-
Notifications
You must be signed in to change notification settings - Fork 1
/
gtable.go
81 lines (65 loc) · 2.59 KB
/
gtable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package gstream
import "github.com/KumKeeHyun/gstream/state"
// GTable is table interface for DSL.
type GTable[K, V any] interface {
// ToValueStream convert this table to GStream.
// GTable[K, V] -> GStream[V]
ToValueStream() GStream[V]
// ToStream convert this table to KeyValueGStream.
// GTable[K, V] -> KeyValueGStream[K, V]
ToStream() KeyValueGStream[K, V]
}
// gtable is the package-private implementation of GTable.
type gtable[K, V any] struct {
	// builder is handed on unchanged to every stream derived from this table.
	builder *builder
	// rid is the routine ID, likewise handed on to derived streams.
	rid routineID
	// kvstore is the read-only key-value store whose Get method is exposed
	// through valueGetter.
	kvstore state.ReadOnlyKeyValueStore[K, V]
	// addChild attaches a downstream graph node that consumes
	// KeyValue[K, Change[V]] records.
	addChild func(*graphNode[KeyValue[K, Change[V]], KeyValue[K, Change[V]]])
}
var _ GTable[any, any] = >able[any, any]{}
// ToValueStream implements GTable. It creates a table-to-value-stream node,
// registers that node through the table's addChild hook, and returns a GStream[V]
// that carries the table's builder and routine ID.
func (t *gtable[K, V]) ToValueStream() GStream[V] {
	node := newTableToValueStreamNode[K, V]()

	// Hook the new node underneath the table before exposing it downstream.
	attach := castAddChild[KeyValue[K, Change[V]], V](t.addChild)
	attach(node)

	// Presumably lets later operators attach their nodes under node; the
	// helper's definition is outside this file.
	downstream := curryingAddChild[KeyValue[K, Change[V]], V, V](node)

	return &gstream[V]{
		builder:  t.builder,
		rid:      t.rid,
		addChild: downstream,
	}
}
// ToStream implements GTable. It creates a table-to-stream node, registers that
// node through the table's addChild hook, and returns a KeyValueGStream[K, V]
// that carries the table's builder and routine ID.
func (t *gtable[K, V]) ToStream() KeyValueGStream[K, V] {
	node := newTableToStreamNode[K, V]()

	// Hook the new node underneath the table before exposing it downstream.
	attach := castAddChild[KeyValue[K, Change[V]], KeyValue[K, V]](t.addChild)
	attach(node)

	return &keyValueGStream[K, V]{
		builder:  t.builder,
		rid:      t.rid,
		addChild: curryingAddChild[KeyValue[K, Change[V]], KeyValue[K, V], KeyValue[K, V]](node),
	}
}
// valueGetter returns the Get method of the table's key-value store as a
// plain function, bound to the store held by the table at the time of the call.
func (t *gtable[K, V]) valueGetter() func(K) (V, error) {
	store := t.kvstore
	return store.Get
}
// -------------------------------
// Aggregate aggregate records by the key.
func Aggregate[K, V, VR any](kvs KeyValueGStream[K, V], initializer func() VR, aggregator func(KeyValue[K, V], VR) VR, stateOpt state.Options[K, VR]) GTable[K, VR] {
kvsImpl := kvs.(*keyValueGStream[K, V])
kvstore := state.NewKeyValueStore(stateOpt)
if closer, ok := kvstore.(Closer); ok {
kvsImpl.builder.sctx.addStore(closer)
}
aggProcessorSupplier := newStreamAggregateSupplier(initializer, aggregator, kvstore)
aggNode := newProcessorNode[KeyValue[K, V], KeyValue[K, Change[VR]]](aggProcessorSupplier)
castAddChild[KeyValue[K, V], KeyValue[K, Change[VR]]](kvsImpl.addChild)(aggNode)
currying := curryingAddChild[KeyValue[K, V], KeyValue[K, Change[VR]], KeyValue[K, Change[VR]]](aggNode)
return >able[K, VR]{
builder: kvsImpl.builder,
rid: kvsImpl.rid,
kvstore: kvstore,
addChild: currying,
}
}
// Count counts the number of records per key. It is Aggregate with an
// initializer that returns 0 and an aggregator that adds one to the running
// count for every record.
func Count[K, V any](kvs KeyValueGStream[K, V], stateOpt state.Options[K, int]) GTable[K, int] {
	return Aggregate(
		kvs,
		func() int { return 0 },
		func(_ KeyValue[K, V], seen int) int { return seen + 1 },
		stateOpt,
	)
}