-
Notifications
You must be signed in to change notification settings - Fork 4
/
entroq.go
1520 lines (1360 loc) · 48.4 KB
/
entroq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2019 Chris Monson <shiblon@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package entroq contains the main task queue client and data definitions. The
// client relies on a backend to implement the actual transactional
// functionality, the interface for which is also defined here.
package entroq
import (
"context"
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
	// DefaultClaimPollTime is the poll interval used for claims when none is
	// given; claimQueryFromOpts starts from it, and WaitTryClaim substitutes
	// it when a query's PollTime is zero.
	DefaultClaimPollTime = 30 * time.Second

	// DefaultClaimDuration is how long a successful claim holds a task when
	// no duration is given via ClaimFor.
	DefaultClaimDuration = 30 * time.Second
)
// TaskID contains the identifying parts of a task. If IDs don't match
// (identifier and version together), then operations fail on those tasks.
//
// Also contains the name of the queue in which this task resides. Can be
// omitted, as it does not affect functionality, but might be required for
// authorization, which is performed based on queue name. Present whenever
// using tasks as a source of IDs.
type TaskID struct {
	// ID is the task's unique identifier.
	ID uuid.UUID `json:"id"`
	// Version must match the task's current version for operations to succeed.
	Version int32 `json:"version"`
	// Queue is the (optional) name of the queue holding the task; omitted
	// from JSON when empty.
	Queue string `json:"queue,omitempty"`
}
// NewTaskID builds a TaskID from an identifier and a version, then applies
// each IDOption in order so callers can attach extra metadata (such as the
// queue name via WithIDQueue).
func NewTaskID(id uuid.UUID, version int32, opts ...IDOption) *TaskID {
	result := new(TaskID)
	result.ID = id
	result.Version = version
	for i := range opts {
		opts[i](result)
	}
	return result
}
// IDOption customizes a TaskID after its identifier and version have been
// set, allowing additional ID-related metadata to be supplied.
type IDOption func(*TaskID)

// WithIDQueue returns an IDOption that records q as the task ID's queue.
func WithIDQueue(q string) IDOption {
	setQueue := func(tid *TaskID) {
		tid.Queue = q
	}
	return setQueue
}
// String produces a human-readable representation of the form
//
//	<id>:v<version> (in "<queue>")
//
// The queue is always included, quoted, even when it is empty.
func (t TaskID) String() string {
	return fmt.Sprintf("%s:v%d (in %q)", t.ID, t.Version, t.Queue)
}
// AsDeletion builds a ModifyArg that deletes the task identified by t,
// carrying over its ID, version, and queue.
func (t TaskID) AsDeletion() ModifyArg {
	queueOpt := WithIDQueue(t.Queue)
	return Deleting(t.ID, t.Version, queueOpt)
}
// AsDependency builds a ModifyArg declaring a dependency on the task
// identified by t, carrying over its ID, version, and queue.
func (t TaskID) AsDependency() ModifyArg {
	queueOpt := WithIDQueue(t.Queue)
	return DependingOn(t.ID, t.Version, queueOpt)
}
// TaskData contains just the data, not the identifier or metadata. Used for insertions.
type TaskData struct {
	// Queue is the name of the queue the task belongs to.
	Queue string `json:"queue"`
	// At is the task's arrival time; a task whose At is in the future is not
	// yet available.
	At time.Time `json:"at"`
	// Value is the task's byte payload.
	Value []byte `json:"value"`
	// Attempt indicates which "attempt number" this task is on. Used by workers.
	Attempt int32 `json:"attempt"`
	// Err contains error information for this task. Used by workers.
	Err string `json:"err"`
	// ID is an optional task ID to be used for task insertion.
	// Default (uuid.Nil) causes the backend to assign one, and that is
	// sufficient for many cases. If you desire to make a database entry that
	// *references* a task, however, in that case it can make sense to specify
	// an explicit task ID for insertion. This allows a common workflow cycle
	//
	//	consume task -> db update -> insert tasks
	//
	// to be done safely, where the database update needs to refer to
	// to-be-inserted tasks.
	ID uuid.UUID `json:"id"`
	// skipCollidingID indicates that a collision on insertion is not fatal,
	// and the insertion can be removed if that happens, and then the
	// modification can be retried. Being unexported, it is not serialized to JSON.
	skipCollidingID bool
	// These timings are here so that journaling can restore full state.
	// Usually they are blank, and there are no convenience methods to allow
	// them to be set. Leave them at default values in all cases.
	Created time.Time `json:"created"`
	Modified time.Time `json:"modified"`
}
// String formats the task data as `"queue"::at`, appending `::id` only when
// an explicit ID (anything other than uuid.Nil) is set. The value is never
// included.
func (t *TaskData) String() string {
	base := fmt.Sprintf("%q::%v", t.Queue, t.At)
	if t.ID == uuid.Nil {
		return base
	}
	return base + "::" + t.ID.String()
}
// Task represents a unit of work, with a byte slice value payload.
// Note that Claims is the number of times a task has successfully been claimed.
// This is different than the version number, which increments for
// every modification, not just claims.
type Task struct {
	// Queue is the name of the queue the task currently resides in.
	Queue string `json:"queue"`
	// ID is the task's identifier; with Version it names one exact revision.
	ID uuid.UUID `json:"id"`
	// Version increments on every modification (see above).
	Version int32 `json:"version"`
	// At is the arrival time; the task is unavailable while At is in the future.
	At time.Time `json:"at"`
	// Claimant identifies who claimed the task (compare ClaimQuery.Claimant).
	Claimant uuid.UUID `json:"claimant"`
	// Claims counts successful claims (see above).
	Claims int32 `json:"claims"`
	// Value is the task's byte payload.
	Value []byte `json:"value"`
	Created time.Time `json:"created"`
	Modified time.Time `json:"modified"`
	// FromQueue specifies the previous queue for a task that is moving to another queue.
	// Usually not present, can be used for change authorization (since two queues are in play, there).
	FromQueue string `json:"fromqueue,omitempty"`
	// Worker retry logic uses these fields when moving tasks and when retrying them.
	// It is left up to the consumer to determine how many attempts is too many
	// and to produce a suitable retry or move error.
	Attempt int32 `json:"attempt"`
	Err string `json:"err"`
}
// String returns a multi-line, human-readable rendering of the task: a
// header with the queue (plus the source queue when it differs), ID, and
// version, followed by tab-indented lines of metadata and the quoted value.
func (t *Task) String() string {
	var qInfo string
	switch {
	case t.FromQueue != "" && t.FromQueue != t.Queue:
		qInfo = fmt.Sprintf("%q <- %q", t.Queue, t.FromQueue)
	default:
		qInfo = fmt.Sprintf("%q", t.Queue)
	}
	header := fmt.Sprintf("Task [%s %s:v%d]\n\t", qInfo, t.ID, t.Version)
	details := []string{
		fmt.Sprintf("at=%q claimant=%s claims=%d attempt=%d err=%q", t.At, t.Claimant, t.Claims, t.Attempt, t.Err),
		fmt.Sprintf("val=%q", string(t.Value)),
	}
	return header + strings.Join(details, "\n\t")
}
// AsDeletion returns a ModifyArg that deletes this task when passed to
// Modify, e.g.,
//
//	cli.Modify(ctx, task1.AsDeletion())
//
// It is equivalent to
//
//	cli.Modify(ctx, Deleting(task1.ID, task1.Version, WithIDQueue(task1.Queue)))
func (t *Task) AsDeletion() ModifyArg {
	return t.IDVersion().AsDeletion()
}
// AsChange returns a ModifyArg that applies the given changes to this task
// when passed to Modify, e.g.,
//
//	cli.Modify(ctx, task1.AsChange(ArrivalTimeBy(2 * time.Minute)))
//
// It is equivalent to
//
//	cli.Modify(ctx, Changing(task1, ArrivalTimeBy(2 * time.Minute)))
func (t *Task) AsChange(args ...ChangeArg) ModifyArg {
	change := Changing(t, args...)
	return change
}
// AsDependency returns a ModifyArg that makes a Modify call depend on this
// exact task revision, e.g.,
//
//	cli.Modify(ctx, task.AsDependency())
//
// It is equivalent to
//
//	cli.Modify(ctx, DependingOn(task.ID, task.Version, WithIDQueue(task.Queue)))
func (t *Task) AsDependency() ModifyArg {
	return t.IDVersion().AsDependency()
}
// IDVersion returns the TaskID (identifier, version, and queue) for this
// task, suitable for building deletions or dependencies.
func (t *Task) IDVersion() *TaskID {
	return NewTaskID(t.ID, t.Version, WithIDQueue(t.Queue))
}
// Data extracts this task's insertable data. The returned TaskData shares
// the Value slice with the task (it is not copied). Claimant, Claims,
// Version, and FromQueue are not part of TaskData and are dropped.
func (t *Task) Data() *TaskData {
	d := new(TaskData)
	d.Queue, d.At, d.Value = t.Queue, t.At, t.Value
	d.ID = t.ID
	d.Attempt, d.Err = t.Attempt, t.Err
	d.Created, d.Modified = t.Created, t.Modified
	return d
}
// Copy returns a copy of the task whose Value bytes are duplicated, so the
// copy's value can be modified independently. A nil value becomes an empty,
// non-nil slice.
func (t *Task) Copy() *Task {
	dup := *t
	dup.Value = append(make([]byte, 0, len(t.Value)), t.Value...)
	return &dup
}
// CopyOmitValue returns a copy of the task with its Value set to nil.
func (t *Task) CopyOmitValue() *Task {
	dup := *t
	dup.Value = nil
	return &dup
}
// CopyWithValue copies the task, duplicating its value when ok is true
// (Copy) and omitting it when ok is false (CopyOmitValue).
func (t *Task) CopyWithValue(ok bool) *Task {
	if !ok {
		return t.CopyOmitValue()
	}
	return t.Copy()
}
// ClaimQuery contains information necessary to attempt to make a claim on a task in a specific queue.
// Client calls build it via claimQueryFromOpts, which defaults Duration to
// DefaultClaimDuration and PollTime to DefaultClaimPollTime.
type ClaimQuery struct {
	Queues []string // Queues to attempt to claim from. Only one wins.
	Claimant uuid.UUID // The ID of the process trying to make the claim.
	Duration time.Duration // How long the task should be claimed for if successful.
	PollTime time.Duration // Length of time between (possibly interruptible) sleep and polling; WaitTryClaim treats 0 as DefaultClaimPollTime.
}
// TasksQuery holds information for a tasks query.
type TasksQuery struct {
	// Queue is the queue whose tasks are listed.
	Queue string
	// Claimant, when non-zero, limits results to tasks that are expired or
	// belong to this claimant (see Backend.Tasks).
	Claimant uuid.UUID
	// Limit bounds the number of tasks returned; <= 0 means no limit.
	Limit int
	// IDs restricts results to these task IDs (versions are not considered).
	// Empty means all tasks matching the other criteria.
	IDs []uuid.UUID
	// OmitValues specifies that only metadata should be returned.
	// Backends are not required to honor this flag, though any
	// service receiving it in a request should ensure that values
	// are not passed over the wire.
	OmitValues bool
}
// QueuesQuery modifies a queue listing request. Build it with QueuesOpt
// values such as MatchPrefix, MatchExact, and LimitQueues.
type QueuesQuery struct {
	// MatchPrefix specifies allowable prefix matches. If empty, limitations
	// are not set based on prefix matching. All prefix match conditions are ORed.
	// If both this and MatchExact are empty or nil, no limitations are set on
	// queue name: all will be returned.
	MatchPrefix []string
	// MatchExact specifies allowable exact matches. If empty, limitations are
	// not set on exact queue names.
	MatchExact []string
	// Limit specifies an upper bound on the number of queue names returned.
	Limit int
}
// newQueuesQuery builds a QueuesQuery by applying each option, in order, to
// a zero-valued query.
func newQueuesQuery(opts ...QueuesOpt) *QueuesQuery {
	var q QueuesQuery
	for i := range opts {
		opts[i](&q)
	}
	return &q
}
// QueueStat holds high-level information about a queue.
// Note that available + claimed may not add up to size. This is because a task
// can be unavailable (At in the future) without being claimed by anyone.
type QueueStat struct {
	Name string `json:"name"` // The queue name.
	Size int `json:"size"` // The total number of tasks.
	Claimed int `json:"claimed"` // The number of currently claimed tasks.
	Available int `json:"available"` // The number of available tasks.
	MaxClaims int `json:"maxClaims"` // The maximum number of claims for a task in the queue.
}
// QueuesFromStats converts QueueStats output into the older Queues output
// (queue name -> task count), so backend implementers only need to define
// one function (similar to how WaitTryClaim or PollTryClaim can make
// implementing Claim in terms of TryClaim easier).
//
// It accepts the (stats, err) pair directly so it can wrap a QueueStats
// call: a non-nil err is returned unchanged with a nil map.
func QueuesFromStats(stats map[string]*QueueStat, err error) (map[string]int, error) {
	if err != nil {
		return nil, err
	}
	// The result has exactly one entry per stat, so size it up front.
	qs := make(map[string]int, len(stats))
	for name, stat := range stats {
		qs[name] = stat.Size
	}
	return qs, nil
}
// BackendClaimFunc makes a claim attempt described by a ClaimQuery. It is a
// convenience type for backends, e.g. for passing a TryClaim implementation
// to PollTryClaim or WaitTryClaim.
type BackendClaimFunc func(context.Context, *ClaimQuery) (*Task, error)
// PollTryClaim repeatedly calls tc until it yields a task or an error,
// sleeping between attempts with exponential backoff that starts at one
// second and doubles up to a 30-second cap. The backoff schedule is fixed:
// eq.PollTime is not consulted. Backend implementations may choose to use
// this as their Claim implementation.
//
// Sleeps are interrupted by ctx cancellation, in which case the context's
// error is returned (wrapped).
func PollTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc) (*Task, error) {
	const (
		startInterval    = time.Second
		maxCheckInterval = 30 * time.Second
	)
	for wait := startInterval; ; {
		task, err := tc(ctx, eq)
		switch {
		case err != nil:
			return nil, fmt.Errorf("poll try claim: %w", err)
		case task != nil:
			return task, nil
		}
		// Nothing claimable yet: sleep, but never longer than the cap and
		// never past context cancellation.
		select {
		case <-ctx.Done():
			return nil, fmt.Errorf("poll try claim: %w", ctx.Err())
		case <-time.After(wait):
			if wait *= 2; wait > maxCheckInterval {
				wait = maxCheckInterval
			}
		}
	}
}
// Waiter can wait for an event on a given key (e.g., queue name).
type Waiter interface {
	// Wait waits for an event on the given set of keys, calling cond after
	// poll intervals until one of them is notified, cond returns true, or the
	// context is canceled.
	//
	// If cond is nil, this function returns when the channel is notified,
	// the poll interval is exceeded, or the context is canceled. Only the last
	// event causes a non-nil error.
	//
	// If poll is 0, it can never be exceeded.
	//
	// A common use is to use poll=0 and cond=nil, causing this to simply wait
	// for a notification.
	Wait(ctx context.Context, keys []string, poll time.Duration, cond func() bool) error
}

// Notifier can be notified on a given key (e.g., queue name).
type Notifier interface {
	// Notify signals an event on the key. Wakes up one waiter, if any, or is
	// dropped if no waiters are waiting.
	Notify(key string)
}

// NotifyWaiter can wait for and notify events; it combines Notifier and Waiter.
type NotifyWaiter interface {
	Notifier
	Waiter
}
// NotifyModified sends one notification per unique task ID among the
// inserted and changed tasks that are already available (At not in the
// future), using each task's queue as the notification key. If an ID appears
// in both slices, the queue from changed is used.
func NotifyModified(n Notifier, inserted, changed []*Task) {
	now := time.Now()
	queueByID := make(map[uuid.UUID]string)
	record := func(tasks []*Task) {
		for _, task := range tasks {
			if task.At.After(now) {
				continue // Not yet available; nobody can claim it.
			}
			queueByID[task.ID] = task.Queue
		}
	}
	record(inserted)
	record(changed)
	for _, q := range queueByID {
		n.Notify(q)
	}
}
// WaitTryClaim runs a loop in which the TryClaim function is called. If no
// tasks are available, the given Waiter is used to wait for a task to become
// available on one of the queried queues.
//
// The Waiter should exit (more or less) immediately if the context is
// canceled, and should return a nil error if the wait was successful
// (something became available).
//
// A zero eq.PollTime is treated as DefaultClaimPollTime.
//
// This function never returns a nil task together with a nil error, matching
// the Backend.Claim contract. A Waiter may return successfully without the
// claim condition having produced a task, for example when woken by a
// notification. In that case a claim is attempted directly, and waiting
// resumes if that attempt also finds nothing.
func WaitTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc, w Waiter) (*Task, error) {
	var (
		task    *Task
		condErr error
	)
	pollTime := eq.PollTime
	if pollTime == 0 {
		pollTime = DefaultClaimPollTime
	}
	// tryClaim makes one claim attempt and reports whether to stop trying:
	// either a task was claimed or an error occurred.
	tryClaim := func() bool {
		task, condErr = tc(ctx, eq)
		return task != nil || condErr != nil
	}
	for {
		if err := w.Wait(ctx, eq.Queues, pollTime, tryClaim); err != nil {
			return nil, fmt.Errorf("wait try claim: %w", err)
		}
		// Wait succeeded; if the condition did not already produce a result,
		// try once more ourselves before going back to waiting.
		if task != nil || condErr != nil || tryClaim() {
			break
		}
	}
	if condErr != nil {
		return nil, fmt.Errorf("wait try claim condition: %w", condErr)
	}
	return task, nil
}
// Backend describes all of the functions that any backend has to implement
// to be used as the storage for task queues.
type Backend interface {
	// Queues returns a mapping from all known queues to their task counts.
	Queues(ctx context.Context, qq *QueuesQuery) (map[string]int, error)
	// QueueStats returns statistics for the specified queues query. Richer
	// than just calling Queues, as it can return more than just the size.
	QueueStats(ctx context.Context, qq *QueuesQuery) (map[string]*QueueStat, error)
	// Tasks retrieves all tasks from the given queue. If tq.Claimant is
	// specified (non-zero), limits those tasks to those that are either
	// expired or belong to the given claimant. Otherwise returns them all.
	Tasks(ctx context.Context, tq *TasksQuery) ([]*Task, error)
	// TryClaim attempts to claim a task from the "top" (or close to it) of the
	// given queue. When claimed, a task is held for the duration specified
	// from the time of the claim. If claiming until a specific wall-clock time
	// is desired, the task should be immediately modified after it is claimed
	// to set At to a specific time. Returns a nil task and a nil error if
	// there is nothing to claim. Will fail with a DependencyError if a
	// specific task ID is requested but not present.
	TryClaim(ctx context.Context, cq *ClaimQuery) (*Task, error)
	// Claim is a blocking version of TryClaim, attempting to claim a task
	// from a queue, and blocking until canceled or a task becomes available.
	//
	// Will fail with a DependencyError if a specific task ID is requested but
	// not present. Never returns both a nil task and a nil error
	// simultaneously: a failure to claim a task is an error (potentially just
	// a timeout).
	Claim(ctx context.Context, cq *ClaimQuery) (*Task, error)
	// Modify attempts to atomically modify the task store, and only succeeds
	// if all dependencies are available and all mutations are either expired
	// or already owned by this claimant. The Modification type provides useful
	// functions for determining whether dependencies are good or bad. This
	// function is intended to return a DependencyError if the transaction could
	// not proceed because dependencies were missing or already claimed (and
	// not expired) by another claimant.
	Modify(ctx context.Context, mod *Modification) (inserted []*Task, changed []*Task, err error)
	// Time returns the time as the backend understands it, in UTC.
	Time(ctx context.Context) (time.Time, error)
	// Close closes any underlying connections. The backend is expected to take
	// ownership of all such connections, so this cleans them up.
	Close() error
}
// EntroQ is a client interface for accessing the task queue.
type EntroQ struct {
	// backend receives every storage operation the client performs.
	backend Backend
	// clientID is the default claimant ID used by "bare" calls. New sets it
	// to a fresh random UUID unless WithClaimantID overrides it.
	clientID uuid.UUID
}
// Option is an option that modifies how EntroQ clients are created.
type Option func(*EntroQ)

// WithClaimantID sets the default claimant ID for this client, replacing
// the random one that New assigns.
func WithClaimantID(id uuid.UUID) Option {
	return func(client *EntroQ) { client.clientID = id }
}
// BackendOpener opens a connection to a backend. A client is bound to a
// specific backend by passing one of these functions to New, which calls it
// exactly once with New's context.
type BackendOpener func(context.Context) (Backend, error)
// New opens a backend with opener and returns a client that uses it. The
// client gets a fresh random claimant ID unless an Option (such as
// WithClaimantID) overrides it. For example, to use an in-memory
// implementation:
//
//	cli, err := New(ctx, mem.Opener())
func New(ctx context.Context, opener BackendOpener, opts ...Option) (*EntroQ, error) {
	b, err := opener(ctx)
	if err != nil {
		return nil, fmt.Errorf("backend connection: %w", err)
	}
	client := &EntroQ{backend: b, clientID: uuid.New()}
	for i := range opts {
		opts[i](client)
	}
	return client, nil
}
// Close shuts down the client's backend and returns whatever error the
// backend's Close reports.
func (c *EntroQ) Close() error {
	b := c.backend
	return b.Close()
}
// ID reports this client's default claimant ID, which "bare" calls such as
// Modify and Claim use. Changing the ID per call is usually unnecessary and
// can be dangerous; when needed, use the "As" variants (e.g., ModifyAs).
func (c *EntroQ) ID() uuid.UUID {
	id := c.clientID
	return id
}
// Queues returns a mapping from queue names to their task counts, filtered
// by the given options (all queues when none are given).
func (c *EntroQ) Queues(ctx context.Context, opts ...QueuesOpt) (map[string]int, error) {
	return c.backend.Queues(ctx, newQueuesQuery(opts...))
}
// QueueStats returns a mapping from queue names to their statistics,
// filtered by the given options.
func (c *EntroQ) QueueStats(ctx context.Context, opts ...QueuesOpt) (map[string]*QueueStat, error) {
	return c.backend.QueueStats(ctx, newQueuesQuery(opts...))
}
// QueuesEmpty reports whether every queue selected by opts has no tasks. At
// least one option is required; calling it with none is an error, since
// that would check every queue.
func (c *EntroQ) QueuesEmpty(ctx context.Context, opts ...QueuesOpt) (bool, error) {
	if len(opts) == 0 {
		return false, errors.New("empty check: no queue options specified")
	}
	sizes, err := c.Queues(ctx, opts...)
	if err != nil {
		return false, fmt.Errorf("empty check: %w", err)
	}
	empty := true
	for _, n := range sizes {
		if n > 0 {
			empty = false
			break
		}
	}
	return empty, nil
}
// WaitQueuesEmpty blocks until QueuesEmpty reports true for opts, checking
// every two seconds. It returns early on a check error or context
// cancellation.
func (c *EntroQ) WaitQueuesEmpty(ctx context.Context, opts ...QueuesOpt) error {
	const pollInterval = 2 * time.Second
	for {
		empty, err := c.QueuesEmpty(ctx, opts...)
		switch {
		case err != nil:
			return fmt.Errorf("wait empty: %w", err)
		case empty:
			return nil
		}
		select {
		case <-time.After(pollInterval):
		case <-ctx.Done():
			return fmt.Errorf("wait empty: %w", ctx.Err())
		}
	}
}
// Tasks lists the tasks in the given queue, narrowed by any TasksOpt
// options (claimant, limit, IDs, value omission).
func (c *EntroQ) Tasks(ctx context.Context, queue string, opts ...TasksOpt) ([]*Task, error) {
	query := new(TasksQuery)
	query.Queue = queue
	for _, apply := range opts {
		apply(c, query)
	}
	return c.backend.Tasks(ctx, query)
}
// TasksOpt is an option that can be passed into Tasks to control what it
// returns. It receives the calling client so options can use its defaults.
type TasksOpt func(*EntroQ, *TasksQuery)

// LimitSelf restricts results to tasks claimed by this client's default
// claimant ID, or expired tasks.
func LimitSelf() TasksOpt {
	return func(client *EntroQ, query *TasksQuery) {
		query.Claimant = client.clientID
	}
}

// LimitClaimant restricts results to tasks claimed by id, or expired tasks.
func LimitClaimant(id uuid.UUID) TasksOpt {
	return func(_ *EntroQ, query *TasksQuery) {
		query.Claimant = id
	}
}

// LimitTasks caps the number of tasks returned; a value <= 0 means "no limit".
func LimitTasks(limit int) TasksOpt {
	return func(_ *EntroQ, query *TasksQuery) {
		query.Limit = limit
	}
}

// OmitValues asks the tasks query to return only metadata, not values.
func OmitValues() TasksOpt {
	return func(_ *EntroQ, query *TasksQuery) {
		query.OmitValues = true
	}
}

// WithTaskID adds task IDs to the set that a tasks query may return. With no
// IDs specified, the default is "all that match other specs". Versions are
// not part of the ID.
func WithTaskID(ids ...uuid.UUID) TasksOpt {
	return func(_ *EntroQ, query *TasksQuery) {
		query.IDs = append(query.IDs, ids...)
	}
}
// QueuesOpt modifies how queue requests are made.
type QueuesOpt func(*QueuesQuery)

// MatchPrefix adds allowable prefix matches for a queue listing.
func MatchPrefix(prefixes ...string) QueuesOpt {
	return func(query *QueuesQuery) {
		query.MatchPrefix = append(query.MatchPrefix, prefixes...)
	}
}

// MatchExact adds allowable exact matches for a queue listing.
func MatchExact(matches ...string) QueuesOpt {
	return func(query *QueuesQuery) {
		query.MatchExact = append(query.MatchExact, matches...)
	}
}

// LimitQueues caps the number of queue names returned.
func LimitQueues(limit int) QueuesOpt {
	return func(query *QueuesQuery) {
		query.Limit = limit
	}
}
// ClaimOpt modifies limits on a task claim.
type ClaimOpt func(*ClaimQuery)

// ClaimAs sets the claimant ID for a claim operation. When not set, the
// client's default claimant ID is used.
func ClaimAs(id uuid.UUID) ClaimOpt {
	return func(cq *ClaimQuery) { cq.Claimant = id }
}

// ClaimPollTime sets the polling time for a claim. Set to DefaultClaimPollTime if left at 0.
func ClaimPollTime(d time.Duration) ClaimOpt {
	return func(cq *ClaimQuery) { cq.PollTime = d }
}

// ClaimFor sets how long a successful claim lasts, measured from the time
// of the claim.
func ClaimFor(duration time.Duration) ClaimOpt {
	return func(cq *ClaimQuery) { cq.Duration = duration }
}
// From adds the given queue(s) to a claim, merging them with any queues
// already present and dropping duplicates. The resulting order is
// unspecified: it follows Go map iteration order.
func From(qs ...string) ClaimOpt {
	return func(cq *ClaimQuery) {
		seen := make(map[string]struct{})
		for _, list := range [][]string{qs, cq.Queues} {
			for _, q := range list {
				seen[q] = struct{}{}
			}
		}
		merged := make([]string, 0, len(seen))
		for q := range seen {
			merged = append(merged, q)
		}
		cq.Queues = merged
	}
}
// asStatusCode walks err's Unwrap chain and returns the first gRPC status
// code that is not codes.Unknown, along with true. If no such code is found
// (including when err is nil), it returns codes.OK and false.
func asStatusCode(err error) (codes.Code, bool) {
	// The status package does not expose its error type, and the GRPCStatus
	// interface is an implementation detail of that package, so peel the
	// chain one layer at a time and ask status.Code about each layer.
	e := err
	for e != nil {
		code := status.Code(e)
		if code != codes.Unknown {
			return code, true
		}
		e = errors.Unwrap(e)
	}
	return codes.OK, false
}
// IsCanceled reports whether err represents cancellation: either it wraps
// context.Canceled, or its Unwrap chain carries gRPC code Canceled.
func IsCanceled(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, context.Canceled):
		return true
	}
	code, ok := asStatusCode(err)
	return ok && code == codes.Canceled
}
// IsTimeout reports whether err represents a timeout: either it wraps
// context.DeadlineExceeded, or its Unwrap chain carries gRPC code
// DeadlineExceeded.
func IsTimeout(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, context.DeadlineExceeded):
		return true
	}
	code, ok := asStatusCode(err)
	return ok && code == codes.DeadlineExceeded
}
// claimQueryFromOpts builds a ClaimQuery for claimant with the default claim
// duration and poll time, then applies opts in order so they can override
// those defaults.
func claimQueryFromOpts(claimant uuid.UUID, opts ...ClaimOpt) *ClaimQuery {
	q := new(ClaimQuery)
	q.Claimant = claimant
	q.Duration = DefaultClaimDuration
	q.PollTime = DefaultClaimPollTime
	for _, apply := range opts {
		apply(q)
	}
	return q
}
// Claim blocks until it obtains the next unclaimed task from the given
// queues, or until the context is done. On success, the task's claimant is
// the client's default (or the one set via ClaimAs), and its arrival time is
// computed from the claim duration (DefaultClaimDuration unless ClaimFor is
// given). At least one queue must be specified via From.
func (c *EntroQ) Claim(ctx context.Context, opts ...ClaimOpt) (*Task, error) {
	q := claimQueryFromOpts(c.clientID, opts...)
	if len(q.Queues) != 0 {
		return c.backend.Claim(ctx, q)
	}
	return nil, errors.New("no queues specified for claim")
}
// TryClaim makes a single attempt to claim a task from the given queues. If
// no task is available, it returns a nil task *and* a nil error, so the
// caller can decide whether to retry. It can fail if certain (optional)
// dependency tasks are not present, which can be used, for example, to
// ensure that configuration tasks haven't changed. At least one queue must
// be specified via From.
func (c *EntroQ) TryClaim(ctx context.Context, opts ...ClaimOpt) (*Task, error) {
	q := claimQueryFromOpts(c.clientID, opts...)
	if len(q.Queues) != 0 {
		return c.backend.TryClaim(ctx, q)
	}
	return nil, errors.New("no queues specified for try claim")
}
// RenewFor extends the given task's lease (its arrival time) by duration
// from now and returns the updated task. It is the single-task form of
// RenewAllFor.
func (c *EntroQ) RenewFor(ctx context.Context, task *Task, duration time.Duration) (*Task, error) {
	renewed, err := c.RenewAllFor(ctx, []*Task{task}, duration)
	if err == nil {
		return renewed[0], nil
	}
	return nil, fmt.Errorf("renew task: %w", err)
}
// RenewAllFor extends the leases (arrival times) of all given tasks by
// duration in one Modify call and returns the updated tasks. An empty input
// returns (nil, nil). It is an error if the backend does not report exactly
// one changed task per input task.
func (c *EntroQ) RenewAllFor(ctx context.Context, tasks []*Task, duration time.Duration) (result []*Task, err error) {
	if len(tasks) == 0 {
		return nil, nil
	}
	// The default claimant is always used here (via Modify). These are
	// high-level convenience functions and deliberately don't support the
	// per-request claimant IDs that a gRPC proxy would need.
	modArgs := make([]ModifyArg, 0, len(tasks))
	taskIDs := make([]string, 0, len(tasks))
	for _, t := range tasks {
		modArgs = append(modArgs, Changing(t, ArrivalTimeBy(duration)))
		taskIDs = append(taskIDs, t.IDVersion().String())
	}
	_, changed, modErr := c.Modify(ctx, modArgs...)
	switch {
	case modErr != nil:
		return nil, fmt.Errorf("renewal failed for tasks %q: %w", taskIDs, modErr)
	case len(changed) != len(tasks):
		return nil, fmt.Errorf("renewal expected %d updated tasks, got %d", len(tasks), len(changed))
	}
	return changed, nil
}
// DoWithRenewAll runs f while keeping all given tasks' leases renewed. It
// renews every lease/2, each time for lease, and returns the most recently
// renewed tasks once f completes successfully.
//
// If a renewal fails (other than by cancellation), f's context is canceled
// and the renewal error is returned without waiting for f to finish.
// Otherwise f's error, if any, is returned.
func (c *EntroQ) DoWithRenewAll(ctx context.Context, tasks []*Task, lease time.Duration, f func(context.Context) error) ([]*Task, error) {
	// The use of contexts is subtle, here. There are cases where a gRPC client
	// context cancelation can result in a delayed call to the server that
	// actually makes it there. There appears, in other words, to be a race
	// condition between client cancelation and actual server calls.
	//
	// This means we can't use two goroutines in this group, one for each of
	// renewal and user-defined calls, because we can get into a situation
	// where the following sequence happens:
	//
	// - task is renewed
	// - just before the user function finished (closing doneCh), another renewal is started
	// - user function finishes before the server call goes out
	// - context is canceled, client call is canceled
	// - but the call is already en route to the server
	//
	// In this case, the renewed value comes back to us for the first renewal,
	// but the second is detached and we don't have its version anymore, so we
	// send back the wrong task version and any attempt to change it fails.
	//
	// Thus, we use a special group context that can be canceled after a
	// blocking renewal loop exits with an error. If it exits cleanly, then it
	// got a signal from the completion of the user function already.
	//
	// This means that the only thing that really cancels the renewal loop is
	// cancelation of the *parent context*, which will also cancel the user
	// function and an appropriate error will be returned to the caller (who
	// canceled this to begin with).
	g, gctx := errgroup.WithContext(ctx)
	gctx, cancelGroup := context.WithCancel(gctx)
	// Release the cancelable context on every return path, not just the
	// renewal-failure path (where it is also called explicitly to stop f).
	// Calling a CancelFunc more than once is harmless.
	defer cancelGroup()

	doneCh := make(chan struct{})
	g.Go(func() error {
		defer close(doneCh)
		return f(gctx)
	})

	renewed := tasks
	if err := func() error {
		for {
			select {
			case <-doneCh:
				return nil
			case <-ctx.Done():
				return fmt.Errorf("renew all stopped: %w", ctx.Err())
			case <-time.After(lease / 2):
				r, err := c.RenewAllFor(ctx, renewed, lease)
				if err != nil {
					return fmt.Errorf("renew all lease: %w", err)
				}
				renewed = r
			}
		}
	}(); err != nil && !IsCanceled(err) {
		cancelGroup() // Terminate the user-provided function early.
		return nil, fmt.Errorf("renew loop: %w", err)
	}

	if err := g.Wait(); err != nil {
		return nil, fmt.Errorf("renew all: %w", err)
	}
	// The tasks were replaced on every renewal; return the final versions.
	return renewed, nil
}
// DoWithRenew runs the provided function while keeping the given task lease renewed.
//
// It is the single-task form of DoWithRenewAll: the task is wrapped in a
// one-element slice, and the last renewed version of that task is returned
// once f finishes successfully.
func (c *EntroQ) DoWithRenew(ctx context.Context, task *Task, lease time.Duration, f func(context.Context) error) (*Task, error) {
	single := []*Task{task}
	renewedAll, renewErr := c.DoWithRenewAll(ctx, single, lease, f)
	if renewErr != nil {
		return nil, fmt.Errorf("renew: %w", renewErr)
	}
	// Exactly one task went in, so the renewed slice holds exactly one.
	final := renewedAll[0]
	return final, nil
}
// Modify allows a batch modification operation to be done, gated on the
// existence of all task IDs and versions specified. Deletions, Updates, and
// Dependencies must be present. The transaction all fails or all succeeds.
//
// Returns the inserted tasks and the changed tasks, or an error if it could
// not proceed. If the error was due to missing dependencies, a
// *DependencyError is returned, which can be checked for by calling
// AsDependency(err).
//
// If the only problems reported are insertion ID collisions, and every
// colliding insert was marked as skippable, those inserts are dropped and the
// modification is retried. Any other dependency problem, an unskippable
// collision, or a collision report that matches none of the requested inserts
// is returned as an error.
func (c *EntroQ) Modify(ctx context.Context, modArgs ...ModifyArg) (inserted []*Task, changed []*Task, err error) {
	mod := NewModification(c.clientID, modArgs...)
	for {
		ins, chg, err := c.backend.Modify(ctx, mod)
		if err == nil {
			return ins, chg, nil
		}
		depErr, ok := AsDependency(err)
		// If not a dependency error, pass it on out.
		if !ok {
			return nil, nil, fmt.Errorf("modify: %w", err)
		}
		// If anything is missing or there's a claim problem, bail.
		if depErr.HasMissing() || depErr.HasClaims() {
			return nil, nil, fmt.Errorf("modify non-ins deps: %w", err)
		}
		// No collisions? Not sure what's going on.
		if !depErr.HasCollisions() {
			return nil, nil, fmt.Errorf("non-collision errors: %w", err)
		}
		// If we get this far, the only errors we have are insertion collisions.
		// If any of them cannot be skipped, we bail out. Otherwise we remove
		// them and try again.
		collidingIDs := make(map[uuid.UUID]bool)
		for _, ins := range depErr.Inserts {
			collidingIDs[ins.ID] = true
		}
		var newInserts []*TaskData
		for _, td := range mod.Inserts {
			if collidingIDs[td.ID] {
				if td.skipCollidingID {
					// Skippable, so skip.
					continue
				}
				// Can't skip this one. Bail.
				return nil, nil, fmt.Errorf("unskippable collision: %w", err)
			}
			newInserts = append(newInserts, td)
		}
		// If no insert was removed, retrying would submit the identical
		// modification and fail the same way again, looping forever.
		if len(newInserts) == len(mod.Inserts) {
			return nil, nil, fmt.Errorf("modify collisions match no inserts: %w", err)
		}
		log.Printf("Trying modification again due to skippable colliding inserts. %v -> %v", mod.Inserts, newInserts)
		mod.Inserts = newInserts
	}
}
// ModifyArg is a functional option accepted by Modify, which does batch
// modifications to the task store. Modify hands its ModifyArgs to
// NewModification. Each one edits the pending *Modification in place, for
// example by appending inserts (Inserting) or overriding the claimant
// (ModifyAs).
type ModifyArg func(mod *Modification)
// ModifyAs overrides the claimant ID used for a single Modify call. When this
// option is absent, the client's own ID is used. This is rarely needed and
// can be dangerous; use only with extreme care.
func ModifyAs(id uuid.UUID) ModifyArg {
	setClaimant := func(mod *Modification) {
		mod.Claimant = id
	}
	return setClaimant
}
// Inserting creates an insert modification from TaskData:
//
//	cli.Modify(ctx,
//		Inserting(&TaskData{
//			Queue: "myqueue",
//			At:    time.Now().Add(1 * time.Minute),
//			Value: []byte("hi there"),
//		}))
//
// Or, better still,
//
//	cli.Modify(ctx,
//		InsertingInto("myqueue",
//			WithArrivalTimeIn(1 * time.Minute),
//			WithValue([]byte("hi there"))))
//
// The given TaskData pointers are appended to the modification's inserts
// as-is (not copied).
func Inserting(tds ...*TaskData) ModifyArg {
	return func(m *Modification) {
		m.Inserts = append(m.Inserts, tds...)
	}
}
// InsertingInto creates an insert modification. Use like this:
//
// cli.Modify(InsertingInto("my queue name", WithValue([]byte("hi there"))))
func InsertingInto(q string, insertArgs ...InsertArg) ModifyArg {
return func(m *Modification) {
data := &TaskData{Queue: q}
for _, arg := range insertArgs {
arg(m, data)