-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsession-manager.go
138 lines (124 loc) · 4.07 KB
/
session-manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package telego
import "github.com/davilag/telego/api"
// sessionManager routes incoming updates to per-chat sessions. All of its
// state is touched only from the manageChannels goroutine started by
// newSessionManager.
type sessionManager struct {
	// update receives new updates coming from Telegram.
	update chan api.Update
	// exit receives the chat ID of a conversation whose session must be
	// finished.
	exit chan int
	// requeue receives updates that have to be dispatched again by the
	// manager.
	requeue chan api.Update
	// channels maps a chat ID to the channel used to talk to the live
	// session of that chat.
	channels map[int]chan api.Update
	// telego is the bot instance holding the registered flows and the
	// default handler.
	telego *Telego
}
// newSessionManager initialises a session manager, which keeps track of the
// live sessions by chat ID, and starts its main loop in a new goroutine.
//
// It returns two channels: the first one is where the manager expects new
// updates from Telegram, and the second one is where it expects the chat ID
// of a conversation that has to be finished. A third channel, used to requeue
// updates that were assigned to a session but should be processed as a new
// session, is created here as well but stays internal to the manager.
//
// All three channels are buffered with a capacity of 100.
func newSessionManager(telego *Telego) (chan api.Update, chan int) {
	mgr := &sessionManager{
		update:   make(chan api.Update, 100),
		exit:     make(chan int, 100),
		requeue:  make(chan api.Update, 100),
		channels: make(map[int]chan api.Update),
		telego:   telego,
	}

	go mgr.manageChannels()

	return mgr.update, mgr.exit
}
// manageChannels is the main loop of the session manager. It never returns.
//
// Every iteration runs in two stages. The first stage is a non-blocking poll
// of the exit and requeue channels only, which is what gives them priority
// over fresh updates: sessions are finished as soon as their exit is
// signalled, and requeued updates are handled before new ones. The second
// stage blocks until any of the three channels has something to deliver.
func (s *sessionManager) manageChannels() {
	for {
		// Stage 1: priority poll, never blocks.
		select {
		case chatID := <-s.exit:
			s.doExit(chatID)
			// Go back to polling so that pending exits are drained
			// before anything else is handled.
			continue
		case upd := <-s.requeue:
			s.manageUpdate(upd)
		default:
		}

		// Stage 2: wait for whatever arrives first.
		select {
		case chatID := <-s.exit:
			s.doExit(chatID)
		case upd := <-s.requeue:
			s.manageUpdate(upd)
		case upd := <-s.update:
			s.manageUpdate(upd)
		}
	}
}
// manageUpdate dispatches an update coming from the Telegram API.
//
// If there is a live session for the chat the update belongs to, the update
// is sent to that session. Otherwise a new conversation is started; when that
// conversation has a session channel, the update is sent through it as its
// first message.
//
// Updates that carry no message are dropped: there is no chat ID to route
// them by, and dereferencing the nil message would panic and take the whole
// manager goroutine down.
func (s *sessionManager) manageUpdate(u api.Update) {
	if u.Message == nil {
		return
	}

	chatID := u.Message.Chat.ID
	session, ok := s.channels[chatID]
	if !ok {
		// startConversation returns nil when no session channel was
		// created; in that case there is nothing to forward.
		session = s.startConversation(u)
	}
	if session != nil {
		session <- u
	}
}
// doExit finishes the session of the chat identified by cID: it closes the
// channel used to talk to that session and forgets about it.
//
// It is a no-op when there is no live session for cID (for example when the
// same exit is signalled twice). Looking up a missing key yields a nil
// channel, and closing a nil channel panics, so the close must be guarded.
func (s *sessionManager) doExit(cID int) {
	if session := s.channels[cID]; session != nil {
		close(session)
	}
	delete(s.channels, cID)
}
// getCommandFlows looks up the flow registered for the command contained in
// the message. The boolean is false when the message has no command or when
// no flow has been registered for it.
func getCommandFlows(m *api.Message, s *sessionManager) (Flow, bool) {
	if cmd := m.GetCommand(); cmd != "" {
		flow, found := s.telego.commandFlows[cmd]
		return flow, found
	}
	return Flow{}, false
}
// getKindFlows looks up the flow registered for the kind of the message.
// The boolean is false when no flow has been registered for that kind.
func getKindFlows(m *api.Message, s *sessionManager) (Flow, bool) {
	flow, found := s.telego.kindFlows[m.GetKind()]
	return flow, found
}
// getFlow resolves the flow to execute for a message. Command flows take
// priority over kind flows. When the message matches neither, a flow whose
// only step is the default handler configured in telego is returned.
func getFlow(m *api.Message, s *sessionManager) Flow {
	flow, found := getCommandFlows(m, s)
	if !found {
		flow, found = getKindFlows(m, s)
	}
	if !found {
		flow = Flow{
			ActualStep: s.telego.defaultHandler,
		}
	}
	return flow
}
// startConversation initialises a conversation for the chat the update
// belongs to, using the flow resolved by getFlow for its message.
//
// It returns nil without starting anything when the resolved flow has no
// step to execute. It also returns nil when the flow has no time to live: in
// that case the step is executed once for this update in a new goroutine and
// no session is kept. Otherwise a session channel is created, registered
// under the chat ID, handed to a new session goroutine and returned so that
// the caller can deliver the update through it.
func (s *sessionManager) startConversation(u api.Update) chan api.Update {
	flow := getFlow(u.Message, s)
	if flow.ActualStep == nil {
		return nil
	}

	chatID := u.Message.Chat.ID

	// The session channel stays nil for flows without time to live, since
	// those run a single handler and need nothing to be fed afterwards.
	var session chan api.Update
	if flow.TimeToLive != 0 {
		session = make(chan api.Update)
		s.channels[chatID] = session
	}

	conv := newConversation(chatID, flow, session, s.exit, s.telego)
	if session != nil {
		go conv.createSession(s.requeue)
	} else {
		// One-shot flow: just run the handler for this update.
		go conv.executeUpdate(u)
	}
	return session
}