forked from streadway/amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathregistry.go
57 lines (48 loc) · 1.18 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package amqp
import (
"sync"
)
// channelRegistry is a mutex-guarded map from channel id to *Channel that
// also carries the counter used by next to generate channel ids.
//
// The embedded sync.RWMutex promotes Lock/Unlock/RLock/RUnlock onto the
// registry itself; the methods of this type take that lock before touching
// either field. Because it embeds a mutex, a registry must not be copied
// once it is in use.
type channelRegistry struct {
	sync.RWMutex
	channels map[uint16]*Channel // registered channels, keyed by channel id
	sequence uint16              // counter that next increments and returns
}
// add registers c under id and returns c, replacing any channel that was
// previously stored at that id.
//
// The channel map is allocated on first use, so add works on a zero-value
// registry. get, remove and removeAll already tolerate a nil map; without
// this guard add would be the only method to panic on one, because
// assigning into a nil map is a runtime error.
func (m *channelRegistry) add(id uint16, c *Channel) *Channel {
	m.Lock()
	defer m.Unlock()

	if m.channels == nil {
		m.channels = make(map[uint16]*Channel)
	}
	m.channels[id] = c
	return c
}
// get looks up the channel registered under id while holding the read lock.
// It returns nil when nothing is registered under that id.
func (m *channelRegistry) get(id uint16) *Channel {
	m.RLock()
	defer m.RUnlock()

	if ch, ok := m.channels[id]; ok {
		return ch
	}
	return nil
}
// remove drops the entry for id from the channel map while holding the
// write lock. Removing an id that is not registered does nothing.
func (m *channelRegistry) remove(id uint16) {
	m.Lock()
	delete(m.channels, id)
	m.Unlock()
}
// next advances the sequence counter under the write lock and returns it as
// the next candidate channel id. The first call on a fresh registry
// returns 1.
//
// The counter is a uint16, so the increment after 65535 wraps to 0. Channel
// 0 is reserved by AMQP for connection-level frames and is never a valid id
// for an application channel, so 0 is skipped and numbering restarts at 1.
//
// Collisions with ids that are still registered are not considered.
func (m *channelRegistry) next() uint16 {
	m.Lock()
	defer m.Unlock()

	m.sequence++
	if m.sequence == 0 {
		// Wrapped past the uint16 maximum: keep the "starting at 1" contract.
		m.sequence = 1
	}
	return m.sequence
}
// removeAll empties the channel map under the write lock and returns every
// channel that was registered. The order of the returned slice follows map
// iteration and is therefore unspecified. The result is never nil, even when
// the registry held no channels.
func (m *channelRegistry) removeAll() []*Channel {
	m.Lock()
	defer m.Unlock()

	// The map's size is fixed for the duration of the loop apart from the
	// entry being deleted, so a slice of exactly that length can be filled
	// by index.
	drained := make([]*Channel, len(m.channels))
	i := 0
	for key, ch := range m.channels {
		drained[i] = ch
		i++
		delete(m.channels, key)
	}
	return drained
}