-
Notifications
You must be signed in to change notification settings - Fork 1
/
task.go
135 lines (126 loc) · 3.95 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package asyncp
import (
"context"
"log"
"reflect"
)
// Task describes a single execution unit: something that handles one Event
// and reports failure through the returned error.
//
//go:generate mockgen -source $GOFILE -package mocks -destination mocks/task.go
type Task interface {
// Execute handles event within ctx and returns only an error.
// Values produced while handling the event are not returned to the caller;
// tasks built by ExtFuncTask in this file publish them through
// responseWriter instead.
Execute(ctx context.Context, event Event, responseWriter ResponseWriter) error
}
// FuncTask adapts a plain function that has the same signature as
// Task.Execute so that it can be used wherever a Task is expected.
type FuncTask func(ctx context.Context, event Event, responseWriter ResponseWriter) error
// Execute invokes the wrapped function with the given context, event and
// response writer and returns its error unchanged.
//
// After the function has finished the response writer is released. A failed
// release is only logged; it never replaces the function's own result.
func (f FuncTask) Execute(ctx context.Context, event Event, responseWriter ResponseWriter) error {
	release := func() {
		if err := responseWriter.Release(); err != nil {
			log.Printf("release response writer: %s", err.Error())
		}
	}
	defer release()

	return f(ctx, event, responseWriter)
}
// Async wraps this task into an AsyncTask configured with the supplied
// options; construction is delegated to WrapAsyncTask.
func (f FuncTask) Async(options ...AsyncOption) *AsyncTask {
	asyncTask := WrapAsyncTask(f, options...)
	return asyncTask
}
// TaskFrom turns an arbitrary handler into a Task.
//
// A value that already implements Task is returned as is. A function with
// the exact Execute signature is converted to FuncTask directly. Everything
// else is handed to ExtFuncTask, which wraps it through reflection and
// panics when the handler is not a function.
func TaskFrom(handler any) Task {
	if task, ok := handler.(Task); ok {
		return task
	}
	if fn, ok := handler.(func(ctx context.Context, event Event, responseWriter ResponseWriter) error); ok {
		return FuncTask(fn)
	}
	return ExtFuncTask(handler)
}
// Reflection type descriptors that ExtFuncTask compares against the
// parameter and result types of a wrapped function. Each descriptor is
// derived from a typed nil pointer followed by Elem(), which yields the type
// itself without needing a value of it (the only way to obtain the
// descriptor of an interface type such as error or context.Context).
var (
// errorType marks results that are reported as the task error.
errorType = reflect.TypeOf((*error)(nil)).Elem()
// contextType marks parameters that receive the execution context.
contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
// eventType marks parameters that receive the incoming event.
eventType = reflect.TypeOf((*Event)(nil)).Elem()
// responseWriterType marks parameters that receive the response writer.
responseWriterType = reflect.TypeOf((*ResponseWriter)(nil)).Elem()
)
// ExtFuncTask wraps a function with an arbitrary signature into a FuncTask
// using reflection. The signature is analysed once, at wrap time; every
// execution then only runs the prepared argument and result mappers.
//
// Parameters are resolved by their type:
//   - context.Context, Event and ResponseWriter parameters receive the
//     corresponding Execute argument;
//   - a parameter of any other type is allocated through newValue and filled
//     by decoding the event payload into it. A decode error aborts the
//     execution before the function is called.
//
// Results are processed in declaration order:
//   - a result of type error is returned as the task error when non-nil;
//   - any other result is written to the response writer, except that nil
//     pointers, interfaces, maps, slices, channels and functions are skipped.
//     Results of kinds that cannot be nil (structs, strings, numbers, ...)
//     are always written.
//
// The first mapper that fails stops the processing and its error is returned.
//
// ExtFuncTask panics if f is not a function.
func ExtFuncTask(f any) FuncTask {
	fv := reflect.ValueOf(f)
	if fv.Kind() != reflect.Func {
		panic("argument must be a function")
	}

	var (
		ft        = fv.Type()
		argMapper = make([]func(context.Context, Event, ResponseWriter) (reflect.Value, error), 0, ft.NumIn())
		retMapper = make([]func(reflect.Value, ResponseWriter) error, 0, ft.NumOut())
	)

	for i := 0; i < ft.NumIn(); i++ {
		inType := ft.In(i)
		switch inType {
		case contextType:
			argMapper = append(argMapper, func(ctx context.Context, _ Event, _ ResponseWriter) (reflect.Value, error) {
				return reflect.ValueOf(ctx), nil
			})
		case eventType:
			argMapper = append(argMapper, func(_ context.Context, event Event, _ ResponseWriter) (reflect.Value, error) {
				return reflect.ValueOf(event), nil
			})
		case responseWriterType:
			argMapper = append(argMapper, func(_ context.Context, _ Event, responseWriter ResponseWriter) (reflect.Value, error) {
				return reflect.ValueOf(responseWriter), nil
			})
		default:
			argMapper = append(argMapper, func(_ context.Context, event Event, _ ResponseWriter) (reflect.Value, error) {
				// A fresh value is allocated per execution so that decoded
				// payloads never leak between events.
				arg, target := newValue(inType)
				if err := event.Payload().Decode(target); err != nil {
					return reflect.Value{}, err
				}
				return arg, nil
			})
		}
	}

	for i := 0; i < ft.NumOut(); i++ {
		outType := ft.Out(i)
		switch outType {
		case errorType:
			retMapper = append(retMapper, func(v reflect.Value, _ ResponseWriter) error {
				if v.IsNil() {
					return nil
				}
				return v.Interface().(error)
			})
		default:
			// reflect.Value.IsNil panics for kinds that cannot hold nil, so
			// decide once, from the static result type, whether the nil
			// check is legal for this result.
			nilable := false
			switch outType.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
				reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
				nilable = true
			}
			retMapper = append(retMapper, func(v reflect.Value, responseWriter ResponseWriter) error {
				if nilable && v.IsNil() {
					return nil
				}
				return responseWriter.WriteResonse(v.Interface())
			})
		}
	}

	return func(ctx context.Context, event Event, responseWriter ResponseWriter) error {
		args := make([]reflect.Value, 0, len(argMapper))
		for _, fm := range argMapper {
			arg, err := fm(ctx, event, responseWriter)
			if err != nil {
				return err
			}
			args = append(args, arg)
		}

		retVals := fv.Call(args)
		for i, fr := range retMapper {
			if err := fr(retVals[i], responseWriter); err != nil {
				return err
			}
		}
		return nil
	}
}
// newValue allocates a fresh, zero-initialised value for a parameter of type
// t that is about to be populated by a decoder.
//
// The first result is a reflect.Value whose type is exactly t, so it can be
// passed to reflect.Value.Call for a parameter of that type regardless of
// whether t is a plain value type, a pointer or a multi-level pointer.
//
// The second result is a pointer to the underlying, fully dereferenced
// storage (a *Base for t = Base, *Base, **Base, ...). It is the decode
// target: everything written through it is visible through the first result.
func newValue(t reflect.Type) (reflect.Value, any) {
	// Strip all pointer levels to find the type that actually holds data.
	base, depth := t, 0
	for base.Kind() == reflect.Ptr {
		base = base.Elem()
		depth++
	}

	ptr := reflect.New(base) // *Base pointing at zeroed storage
	target := ptr.Interface()

	if depth == 0 {
		// Value parameter: hand out the storage itself, not a pointer to it.
		return ptr.Elem(), target
	}

	// Pointer parameter: ptr already covers one level of indirection; wrap
	// it once more for every additional level requested by t.
	val := ptr
	for ; depth > 1; depth-- {
		outer := reflect.New(val.Type())
		outer.Elem().Set(val)
		val = outer
	}
	return val, target
}