-
Notifications
You must be signed in to change notification settings - Fork 26
/
geminio.go
171 lines (142 loc) · 3.38 KB
/
geminio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package geminio
import (
"context"
"net"
"time"
"github.com/singchia/geminio/options"
)
// RPC related types.

// Request is the request half of an RPC. It exposes the request's meta
// info and payload through getters and allows a few fields to be assigned
// through setters.
type Request interface {
	// Meta info; per the original author's note it shouldn't be changed.
	ID() uint64
	ClientID() uint64
	StreamID() uint64
	Method() string
	Timeout() time.Duration

	// Data returns the application data.
	Data() []byte
	// Custom returns the custom data.
	Custom() []byte

	// Setters for the assignable fields.
	SetClientID(clientID uint64)
	SetStreamID(streamID uint64)
	SetTimeout(timeout time.Duration)
	SetCustom(custom []byte)
}
// Response is the response half of an RPC. It exposes the response's meta
// info, payload and error through getters and allows the payload, error,
// custom data and IDs to be assigned through setters.
type Response interface {
	// Meta info; per the original author's note it shouldn't be changed.
	ID() uint64
	ClientID() uint64
	StreamID() uint64
	Method() string

	// Data returns the application data.
	Data() []byte
	// Error returns the error carried by the response.
	Error() error
	// Custom returns the custom data.
	Custom() []byte

	// Setters for the assignable fields.
	SetClientID(clientID uint64)
	SetStreamID(streamID uint64)
	SetData(data []byte)
	SetError(err error)
	SetCustom(custom []byte)
}
// MethodRPC pairs a method name with an RPC function.
type MethodRPC struct {
	// Method is the method name.
	Method string
	// RPC is the function paired with Method.
	RPC RPC
}
// RPC is the signature of an rpc function: it receives a context together
// with the Request and the Response of one call. RPCer.Register accepts
// values of this type.
type RPC func(ctx context.Context, req Request, rsp Response)
// HijackRPC is the signature of a hijack rpc function, as accepted by
// RPCer.Hijack. Compared with RPC it takes one extra string argument.
// NOTE(review): the string is presumably the name of the hijacked
// method; confirm against the implementation.
type HijackRPC func(context.Context, string, Request, Response)
// Call holds the state of an asynchronous RPC. RPCer.CallAsync takes a
// chan *Call and returns a *Call.
type Call struct {
	// Method is the name of the called method.
	Method string
	// Request is the request of the call.
	Request Request
	// Response is the response of the call.
	Response Response
	// Error is the error of the call, if any.
	Error error
	// Done is a channel of *Call.
	// NOTE(review): presumably this Call is sent on Done once the call
	// completes, in the manner of net/rpc; confirm.
	Done chan *Call
}
// RPCer groups the RPC operations: building requests, calling methods
// synchronously or asynchronously, registering rpc functions and
// hijacking rpc from the remote.
type RPCer interface {
	// NewRequest builds a Request around data.
	NewRequest(
		data []byte,
		opts ...*options.NewRequestOptions,
	) Request

	// Call invokes method with req and returns the Response or an error.
	Call(
		ctx context.Context,
		method string,
		req Request,
		opts ...*options.CallOptions,
	) (Response, error)

	// CallAsync is the asynchronous form of Call; it takes a chan *Call
	// and returns the *Call describing the invocation, or an error.
	CallAsync(
		ctx context.Context,
		method string,
		req Request,
		ch chan *Call,
		opts ...*options.CallOptions,
	) (*Call, error)

	// Register associates rpc with method.
	Register(
		ctx context.Context,
		method string,
		rpc RPC,
	) error

	// Hijack rpc from remote.
	Hijack(
		rpc HijackRPC,
		opts ...*options.HijackOptions,
	) error
}
// Message is the unit handled by a Messager: NewMessage creates one,
// Publish and PublishAsync take one, and Receive returns one.
type Message interface {
	// Done and Error tell the peer that the message was received or
	// errored, respectively.
	Done() error
	Error(err error) error

	// Meta info; per the original author's note it shouldn't be changed.
	ID() uint64
	ClientID() uint64
	StreamID() uint64
	Timeout() time.Duration
	// Topic is empty if not set.
	Topic() string
	// Cnss returns the consistency protocol.
	Cnss() options.Cnss

	// Data returns the application data.
	Data() []byte
	// Custom returns the custom data.
	Custom() []byte

	// Setters; these must be called before Publish.
	SetClientID(clientID uint64)
	SetStreamID(streamID uint64)
	SetTimeout(timeout time.Duration)
	SetTopic(topic string)
	SetCustom(data []byte)
}
// Publish holds the state of an asynchronous publish.
// Messager.PublishAsync takes a chan *Publish and returns a *Publish.
type Publish struct {
	// Message is the message of the publish.
	Message Message
	// Error is the error of the publish, if any.
	Error error
	// Done is a channel of *Publish.
	// NOTE(review): presumably this Publish is sent on Done once the
	// publish completes; confirm.
	Done chan *Publish
}
// Messager groups the messaging operations: building messages,
// publishing them synchronously or asynchronously, and receiving them.
type Messager interface {
	// NewMessage builds a Message around data.
	NewMessage(
		data []byte,
		opts ...*options.NewMessageOptions,
	) Message

	// Publish publishes msg and reports an error, if any.
	Publish(
		ctx context.Context,
		msg Message,
		opts ...*options.PublishOptions,
	) error

	// PublishAsync is the asynchronous form of Publish; it takes a
	// chan *Publish and returns the *Publish describing the operation,
	// or an error.
	PublishAsync(
		ctx context.Context,
		msg Message,
		ch chan *Publish,
		opts ...*options.PublishOptions,
	) (*Publish, error)

	// Receive returns a Message or an error.
	Receive(ctx context.Context) (Message, error)
}
// Raw is a defined type whose underlying type is net.Conn, so it has the
// same method set as net.Conn.
type Raw net.Conn
// RawRPCMessager is the union of the three capabilities declared in this
// file: Raw, RPCer and Messager.
type RawRPCMessager interface {
	// raw
	Raw
	// rpc
	RPCer
	// message
	Messager
}
// Side is an integer tag with two declared values, InitiatorSide and
// RecipientSide. Stream.Side returns a value of this type.
// NOTE(review): presumably it tells whether the local end initiated or
// received the stream; confirm against the implementation.
type Side int

// Declared Side values; the numeric values are 0 and 1.
const (
	InitiatorSide Side = iota
	RecipientSide
)
// Stream is a RawRPCMessager that additionally exposes per-stream meta
// info.
type Stream interface {
	// A stream is a geminio: it offers the raw, rpc and message
	// capabilities of RawRPCMessager.
	RawRPCMessager
	// Meta info for a stream.
	StreamID() uint64
	ClientID() uint64
	// Meta returns a byte slice of meta data.
	Meta() []byte
	// Side returns the Side value of the stream.
	Side() Side
	// Peer returns a string describing the peer.
	// NOTE(review): the exact format is not visible here; confirm.
	Peer() string
}
// Multiplexer is a stream multiplexer: it opens, accepts and lists
// Streams.
type Multiplexer interface {
	// OpenStream returns a Stream or an error; opts are optional.
	OpenStream(opts ...*options.OpenStreamOptions) (Stream, error)
	// AcceptStream returns a Stream or an error.
	// NOTE(review): presumably blocks until the peer opens a stream;
	// confirm.
	AcceptStream() (Stream, error)
	// ListStreams returns a slice of Streams.
	ListStreams() []Stream
}
// End combines a Stream, a Multiplexer and a net.Listener in a single
// interface.
type End interface {
	// End is a default stream with streamID 1.
	// Close on the default stream will close everything from the End.
	Stream
	// End is a stream multiplexer.
	Multiplexer
	// End is a net.Listener:
	// Accept is a wrapper for AcceptStream,
	// Addr is a wrapper for LocalAddr.
	net.Listener
	// Close is also part of the embedded Stream (through Raw) and of
	// net.Listener; it is restated here explicitly.
	Close() error
}