-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathmentionjobiter.go
98 lines (80 loc) · 1.92 KB
/
mentionjobiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package borges
import (
"gopkg.in/src-d/core-retrieval.v0/model"
"gopkg.in/src-d/go-queue.v1"
)
type mentionJobIter struct {
storer RepositoryStore
q queue.Queue
iter queue.JobIter
}
// NewMentionJobIter returns a JobIter that returns jobs generated from mentions
// received from a queue (e.g. from rovers).
func NewMentionJobIter(q queue.Queue, storer RepositoryStore) JobIter {
return &mentionJobIter{
storer: storer,
q: q,
}
}
func (i *mentionJobIter) Next() (*Job, error) {
if err := i.initIter(); err != nil {
return nil, err
}
mention, j, err := i.getMention()
if err != nil {
if queue.ErrAlreadyClosed.Is(err) {
if err := i.Close(); err != nil {
return nil, err
}
}
return nil, err
}
ID, err := RepositoryID(getEndpoints(mention.Aliases, mention.Endpoint), mention.IsFork, i.storer)
if err != nil {
return nil, err
}
bj := &Job{RepositoryID: ID}
if err := j.Ack(); err != nil {
return nil, err
}
return bj, nil
}
// initIter initialize the iterator if it is not already initialized.
func (i *mentionJobIter) initIter() error {
if i.iter == nil {
awnd := 1
iter, err := i.q.Consume(awnd)
if err != nil {
return err
}
i.iter = iter
}
return nil
}
// getMention obtains the next Job from the queue and decodes the mention on it.
// If success, a Mention model is returned. Also the job itself is returned, to
// be able to send back the ACK.
func (i *mentionJobIter) getMention() (m *model.Mention, j *queue.Job, err error) {
j, err = i.iter.Next()
if err != nil {
return
}
err = j.Decode(&m)
return
}
func getEndpoints(aliases []string, mainURL string) []string {
if aliases == nil {
return []string{mainURL}
}
// if aliases is not nil it also contains the main URL
return aliases
}
func (i *mentionJobIter) Close() error {
if i.iter != nil {
defer func() { i.iter = nil }()
if err := i.iter.Close(); err != nil {
return err
}
}
return nil
}