forked from ligato/cn-infra
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplugin_impl_resync.go
138 lines (112 loc) · 3.98 KB
/
plugin_impl_resync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright (c) 2017 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resync
import (
"strings"
"sync"
"time"
"go.ligato.io/cn-infra/v2/infra"
"go.ligato.io/cn-infra/v2/logging"
)
var (
// SingleResyncAcceptTimeout defines timeout for accepting resync start.
SingleResyncAcceptTimeout = time.Second * 1
// SingleResyncAckTimeout defines timeout for resync ack.
SingleResyncAckTimeout = time.Second * 10
)
// Plugin implements Plugin interface, therefore it can be loaded with other plugins.
type Plugin struct {
Deps
mu sync.Mutex
regOrder []string
registrations map[string]*registration
}
// Deps groups dependencies injected into the plugin so that they are
// logically separated from other plugin fields.
type Deps struct {
infra.PluginName
Log logging.PluginLogger
}
// Init initializes variables.
func (p *Plugin) Init() error {
p.registrations = make(map[string]*registration)
return nil
}
// Close TODO set flag that ignore errors => not start Resync while agent is stopping
// TODO kill existing Resync timeout while agent is stopping
func (p *Plugin) Close() error {
//TODO close error report channel
p.mu.Lock()
defer p.mu.Unlock()
p.registrations = make(map[string]*registration)
return nil
}
// Register function is supposed to be called in Init() by all VPP Agent plugins.
// The plugins are supposed to load current state of their objects when newResync() is called.
// The actual CreateNewObjects(), DeleteObsoleteObjects() and ModifyExistingObjects() will be orchestrated
// to ensure their proper order. If an error occurs during Resync, then new Resync is planned.
func (p *Plugin) Register(resyncName string) Registration {
p.mu.Lock()
defer p.mu.Unlock()
if _, found := p.registrations[resyncName]; found {
p.Log.WithField("resyncName", resyncName).
Panic("You are trying to register same resync twice")
return nil
}
// ensure that resync is triggered in the same order as the plugins were registered
p.regOrder = append(p.regOrder, resyncName)
reg := newRegistration(resyncName, make(chan StatusEvent))
p.registrations[resyncName] = reg
return reg
}
// DoResync can be used to start resync procedure outside of after init
func (p *Plugin) DoResync() {
p.startResync()
}
// Call callback on plugins to create/delete/modify objects.
func (p *Plugin) startResync() {
if len(p.regOrder) == 0 {
p.Log.Warnf("No registrations, skipping resync")
return
}
subs := strings.Join(p.regOrder, ", ")
p.Log.Infof("Resync starting for %d registrations (%v)", len(p.regOrder), subs)
resyncStart := time.Now()
for _, regName := range p.regOrder {
if reg, found := p.registrations[regName]; found {
t := time.Now()
p.startSingleResync(regName, reg)
took := time.Since(t).Round(time.Millisecond)
p.Log.Debugf("finished resync for %v took %v", regName, took)
}
}
p.Log.Infof("Resync done (took: %v)", time.Since(resyncStart).Round(time.Millisecond))
// TODO check if there ReportError (if not than report) if error occurred even during Resync
}
func (p *Plugin) startSingleResync(resyncName string, reg *registration) {
started := newStatusEvent(Started)
select {
case reg.statusChan <- started:
// accept
case <-time.After(SingleResyncAcceptTimeout):
p.Log.WithField("regName", resyncName).Warn("Timeout of resync start!")
return
}
select {
case <-started.ReceiveAck():
// ack
case <-time.After(SingleResyncAckTimeout):
p.Log.WithField("regName", resyncName).Warn("Timeout of resync ACK!")
}
}