-
Notifications
You must be signed in to change notification settings - Fork 7
/
recv.go
executable file
·218 lines (197 loc) · 6.86 KB
/
recv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package wkproto
import (
"fmt"
"github.com/pkg/errors"
)
// StreamFlag marks the position of a message within a stream.
type StreamFlag uint8

// Stream flag values. The numeric values are written to the wire as a single
// byte, so they are spelled out explicitly.
const (
	// StreamFlagStart marks the start of a stream.
	StreamFlagStart = StreamFlag(0)
	// StreamFlagIng marks a stream that is in progress.
	StreamFlagIng = StreamFlag(1)
	// StreamFlagEnd marks the end of a stream.
	StreamFlagEnd = StreamFlag(2)
)
// RecvPacket is the packet for a received message.
type RecvPacket struct {
Framer
Setting Setting
MsgKey string // used to verify that this message is legitimate (guards against man-in-the-middle tampering)
Expire uint32 // message expiry time; 0 means the message never expires
MessageID int64 // server-side message ID (globally unique)
MessageSeq uint32 // message sequence number (unique per user, monotonically increasing)
ClientMsgNo string // client-side unique identifier
StreamNo string // stream number
StreamSeq uint32 // stream sequence number
StreamFlag StreamFlag // stream flag
Timestamp int32 // server message timestamp (10 digits, second precision)
ChannelID string // channel ID
ChannelType uint8 // channel type
Topic string // topic ID
FromUID string // sender UID
Payload []byte // message content
// ---------- the fields below are not encoded ------------
ClientSeq uint64 // sequence number supplied by the client, unique within that client
}
// GetFrameType returns the frame type of this packet, which is always RECV.
func (r *RecvPacket) GetFrameType() FrameType {
return RECV
}
// Size returns the result of SizeWithProtoVersion for LatestVersion.
func (r *RecvPacket) Size() int {
return r.SizeWithProtoVersion(LatestVersion)
}
// SizeWithProtoVersion returns the size that encodeRecvSize computes for this
// packet under the given protocol version.
func (r *RecvPacket) SizeWithProtoVersion(protVersion uint8) int {
return encodeRecvSize(r, protVersion)
}
// VerityString returns the verification string of the packet.
//
// It concatenates, with no separators and in this order: MessageID,
// MessageSeq, ClientMsgNo, Timestamp, FromUID, ChannelID, ChannelType and the
// Payload interpreted as a string.
func (r *RecvPacket) VerityString() string {
	const layout = "%d%d%s%d%s%s%d%s"
	payload := string(r.Payload)
	return fmt.Sprintf(
		layout,
		r.MessageID,
		r.MessageSeq,
		r.ClientMsgNo,
		r.Timestamp,
		r.FromUID,
		r.ChannelID,
		r.ChannelType,
		payload,
	)
}
// String returns a single-line, human-readable description of the packet
// listing its header, setting, IDs, timestamp, expiry, sender, channel, topic
// and payload.
func (r *RecvPacket) String() string {
	const layout = "recv Header:%s Setting:%d MessageID:%d MessageSeq:%d Timestamp:%d Expire:%d FromUid:%s ChannelID:%s ChannelType:%d Topic:%s Payload:%s"
	payload := string(r.Payload)
	return fmt.Sprintf(
		layout,
		r.Framer,
		r.Setting,
		r.MessageID,
		r.MessageSeq,
		r.Timestamp,
		r.Expire,
		r.FromUID,
		r.ChannelID,
		r.ChannelType,
		r.Topic,
		payload,
	)
}
// decodeRecv decodes data into a *RecvPacket.
//
// frame must hold a Framer value, which becomes the embedded Framer of the
// result. version selects which optional fields are expected in data. Fields
// are read in this order: Setting, MsgKey, FromUID, ChannelID, ChannelType,
// Expire (only when version >= 3), ClientMsgNo, StreamNo/StreamSeq/StreamFlag
// (only when version >= 2 and SettingStream is set), MessageID, MessageSeq,
// Timestamp, Topic (only when SettingTopic is set), and finally the Payload
// via dec.BinaryAll.
//
// It returns an error if frame is not a Framer, or a wrapped error naming the
// first field that failed to decode. On success the error is nil.
func decodeRecv(frame Frame, data []byte, version uint8) (Frame, error) {
	// Use the comma-ok form so an unexpected frame type is reported as an
	// error instead of panicking inside the decoder.
	framer, ok := frame.(Framer)
	if !ok {
		return nil, errors.New("解码RECV失败!frame不是Framer类型")
	}

	dec := NewDecoder(data)
	recvPacket := &RecvPacket{Framer: framer}

	var err error

	// Setting flags; they decide which optional fields follow.
	var setting uint8
	if setting, err = dec.Uint8(); err != nil {
		return nil, errors.Wrap(err, "解码消息设置失败!")
	}
	recvPacket.Setting = Setting(setting)

	// MsgKey
	if recvPacket.MsgKey, err = dec.String(); err != nil {
		return nil, errors.Wrap(err, "解码MsgKey失败!")
	}
	// Sender
	if recvPacket.FromUID, err = dec.String(); err != nil {
		return nil, errors.Wrap(err, "解码FromUID失败!")
	}
	// Channel ID
	if recvPacket.ChannelID, err = dec.String(); err != nil {
		return nil, errors.Wrap(err, "解码ChannelId失败!")
	}
	// Channel type
	if recvPacket.ChannelType, err = dec.Uint8(); err != nil {
		return nil, errors.Wrap(err, "解码ChannelType失败!")
	}
	// Expire is only present from protocol version 3 onwards.
	if version >= 3 {
		if recvPacket.Expire, err = dec.Uint32(); err != nil {
			return nil, errors.Wrap(err, "解码Expire失败!")
		}
	}
	// Client-side unique message number
	if recvPacket.ClientMsgNo, err = dec.String(); err != nil {
		return nil, errors.Wrap(err, "解码ClientMsgNo失败!")
	}
	// Stream fields are only present from version 2 onwards and only when the
	// stream setting bit is set.
	if version >= 2 && recvPacket.Setting.IsSet(SettingStream) {
		if recvPacket.StreamNo, err = dec.String(); err != nil {
			return nil, errors.Wrap(err, "解码StreamNo失败!")
		}
		if recvPacket.StreamSeq, err = dec.Uint32(); err != nil {
			return nil, errors.Wrap(err, "解码StreamSeq失败!")
		}
		var streamFlag uint8
		if streamFlag, err = dec.Uint8(); err != nil {
			return nil, errors.Wrap(err, "解码StreamFlag失败!")
		}
		recvPacket.StreamFlag = StreamFlag(streamFlag)
	}
	// Globally unique message ID
	if recvPacket.MessageID, err = dec.Int64(); err != nil {
		return nil, errors.Wrap(err, "解码MessageId失败!")
	}
	// Message sequence number (unique per user, monotonically increasing)
	if recvPacket.MessageSeq, err = dec.Uint32(); err != nil {
		return nil, errors.Wrap(err, "解码MessageSeq失败!")
	}
	// Message timestamp
	if recvPacket.Timestamp, err = dec.Int32(); err != nil {
		return nil, errors.Wrap(err, "解码Timestamp失败!")
	}
	// Topic is only present when the topic setting bit is set.
	if recvPacket.Setting.IsSet(SettingTopic) {
		if recvPacket.Topic, err = dec.String(); err != nil {
			return nil, errors.Wrap(err, "解密topic消息失败!")
		}
	}
	// Payload: whatever dec.BinaryAll yields after the fields above.
	if recvPacket.Payload, err = dec.BinaryAll(); err != nil {
		return nil, errors.Wrap(err, "解码payload失败!")
	}
	return recvPacket, nil
}
// encodeRecv writes recvPacket to enc in the RECV wire order.
//
// Fields are written in this order: Setting, MsgKey, FromUID, ChannelID,
// ChannelType, Expire (only when version >= 3), ClientMsgNo,
// StreamNo/StreamSeq/StreamFlag (only when version >= 2 and SettingStream is
// set), MessageID, MessageSeq, Timestamp, Topic (only when SettingTopic is
// set), and finally the raw Payload bytes.
//
// It returns the error reported by enc.WriteByte for the setting byte, if
// any; otherwise nil.
func encodeRecv(recvPacket *RecvPacket, enc *Encoder, version uint8) error {
	// Setting flags. The error from WriteByte used to be discarded; propagate
	// it so a failed write is not silently followed by the remaining fields.
	if err := enc.WriteByte(recvPacket.Setting.Uint8()); err != nil {
		return err
	}
	// MsgKey
	enc.WriteString(recvPacket.MsgKey)
	// Sender
	enc.WriteString(recvPacket.FromUID)
	// Channel ID
	enc.WriteString(recvPacket.ChannelID)
	// Channel type
	enc.WriteUint8(recvPacket.ChannelType)
	// Expire is only written from protocol version 3 onwards.
	if version >= 3 {
		enc.WriteUint32(recvPacket.Expire)
	}
	// Client-side unique message number
	enc.WriteString(recvPacket.ClientMsgNo)
	// Stream fields are only written from version 2 onwards and only when the
	// stream setting bit is set.
	if version >= 2 && recvPacket.Setting.IsSet(SettingStream) {
		enc.WriteString(recvPacket.StreamNo)
		enc.WriteUint32(recvPacket.StreamSeq)
		enc.WriteUint8(uint8(recvPacket.StreamFlag))
	}
	// Unique message ID
	enc.WriteInt64(recvPacket.MessageID)
	// Ordered message sequence number
	enc.WriteUint32(recvPacket.MessageSeq)
	// Message timestamp
	enc.WriteInt32(recvPacket.Timestamp)
	// Topic is only written when the topic setting bit is set.
	if recvPacket.Setting.IsSet(SettingTopic) {
		enc.WriteString(recvPacket.Topic)
	}
	// Message content
	enc.WriteBytes(recvPacket.Payload)
	return nil
}
// encodeRecvSize returns the size of packet for the given protocol version,
// following the same field layout and the same version/setting conditions as
// encodeRecv.
//
// Every string field counts as its byte length plus StringFixLenByteSize;
// the payload counts as its raw byte length.
func encodeRecvSize(packet *RecvPacket, version uint8) int {
	// strSize is the contribution of one string field.
	strSize := func(s string) int { return len(s) + StringFixLenByteSize }

	total := 0

	// Fixed-width fields that are always present.
	total += SettingByteSize + ChannelTypeByteSize
	total += MessageIDByteSize + MessageSeqByteSize + TimestampByteSize

	// String fields that are always present.
	total += strSize(packet.MsgKey)
	total += strSize(packet.FromUID)
	total += strSize(packet.ChannelID)
	total += strSize(packet.ClientMsgNo)

	// Expire is only counted from protocol version 3 onwards.
	if version >= 3 {
		total += ExpireByteSize
	}

	// Stream fields are only counted from version 2 onwards and only when the
	// stream setting bit is set.
	if version >= 2 && packet.Setting.IsSet(SettingStream) {
		total += strSize(packet.StreamNo) + StreamSeqByteSize + StreamFlagByteSize
	}

	// Topic is only counted when the topic setting bit is set.
	if packet.Setting.IsSet(SettingTopic) {
		total += strSize(packet.Topic)
	}

	return total + len(packet.Payload)
}