-
Notifications
You must be signed in to change notification settings - Fork 0
/
QueryParameters.zig
222 lines (194 loc) · 6.52 KB
/
QueryParameters.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
const std = @import("std");
const mem = std.mem;
const meta = std.meta;
const net = std.net;
const protocol = @import("protocol.zig");
const CompressionAlgorithm = protocol.CompressionAlgorithm;
const Consistency = protocol.Consistency;
const MessageReader = protocol.MessageReader;
const MessageWriter = protocol.MessageWriter;
const NamedValue = protocol.NamedValue;
const NotSet = protocol.NotSet;
const OptionID = protocol.OptionID;
const ProtocolVersion = protocol.ProtocolVersion;
const Value = protocol.Value;
const Values = protocol.Values;
const Self = @This();
consistency_level: Consistency,
values: ?Values,
skip_metadata: bool,
page_size: ?u32,
paging_state: ?[]const u8, // NOTE(vincent): not a string
serial_consistency_level: ?Consistency,
timestamp: ?u64,
keyspace: ?[]const u8,
now_in_seconds: ?u32,
const FlagWithValues: u32 = 0x0001;
const FlagSkipMetadata: u32 = 0x0002;
const FlagWithPageSize: u32 = 0x0004;
const FlagWithPagingState: u32 = 0x0008;
const FlagWithSerialConsistency: u32 = 0x0010;
const FlagWithDefaultTimestamp: u32 = 0x0020;
const FlagWithNamedValues: u32 = 0x0040;
const FlagWithKeyspace: u32 = 0x0080;
const FlagWithNowInSeconds: u32 = 0x100;
pub fn write(self: Self, protocol_version: ProtocolVersion, mw: *MessageWriter) !void {
_ = try mw.writeConsistency(self.consistency_level);
// Build the flags value
var flags: u32 = 0;
if (self.values) |v| {
flags |= FlagWithValues;
if (v == .Named) {
flags |= FlagWithNamedValues;
}
}
if (self.skip_metadata) {
flags |= FlagSkipMetadata;
}
if (self.page_size != null) {
flags |= FlagWithPageSize;
}
if (self.paging_state != null) {
flags |= FlagWithPagingState;
}
if (self.serial_consistency_level != null) {
flags |= FlagWithSerialConsistency;
}
if (self.timestamp != null) {
flags |= FlagWithDefaultTimestamp;
}
if (protocol_version.is(5)) {
if (self.keyspace != null) {
flags |= FlagWithKeyspace;
}
if (self.now_in_seconds != null) {
flags |= FlagWithNowInSeconds;
}
}
if (protocol_version.is(5)) {
_ = try mw.writeInt(u32, flags);
} else {
_ = try mw.writeInt(u8, @intCast(flags));
}
// Write the remaining body
if (self.values) |values| {
switch (values) {
.Normal => |normal_values| {
_ = try mw.writeInt(u16, @intCast(normal_values.len));
for (normal_values) |value| {
_ = try mw.writeValue(value);
}
},
.Named => |named_values| {
_ = try mw.writeInt(u16, @intCast(named_values.len));
for (named_values) |v| {
_ = try mw.writeString(v.name);
_ = try mw.writeValue(v.value);
}
},
}
}
if (self.page_size) |ps| {
_ = try mw.writeInt(u32, ps);
}
if (self.paging_state) |ps| {
_ = try mw.writeBytes(ps);
}
if (self.serial_consistency_level) |consistency| {
_ = try mw.writeConsistency(consistency);
}
if (self.timestamp) |ts| {
_ = try mw.writeInt(u64, ts);
}
if (!protocol_version.is(5)) {
return;
}
// The following flags are only valid with protocol v5
if (self.keyspace) |ks| {
_ = try mw.writeString(ks);
}
if (self.now_in_seconds) |s| {
_ = try mw.writeInt(u32, s);
}
}
pub fn read(allocator: mem.Allocator, protocol_version: ProtocolVersion, mr: *MessageReader) !Self {
var params = Self{
.consistency_level = undefined,
.values = null,
.skip_metadata = false,
.page_size = null,
.paging_state = null,
.serial_consistency_level = null,
.timestamp = null,
.keyspace = null,
.now_in_seconds = null,
};
params.consistency_level = try mr.readConsistency();
// The remaining data in the message depends on the flags
// The size of the flags bitmask depends on the protocol version.
var flags: u32 = 0;
if (protocol_version.is(5)) {
flags = try mr.readInt(u32);
} else {
flags = try mr.readInt(u8);
}
if (flags & FlagWithValues == FlagWithValues) {
const n = try mr.readInt(u16);
if (flags & FlagWithNamedValues == FlagWithNamedValues) {
var list = std.ArrayList(NamedValue).init(allocator);
errdefer list.deinit();
var i: usize = 0;
while (i < @as(usize, n)) : (i += 1) {
const nm = NamedValue{
.name = try mr.readString(allocator),
.value = try mr.readValue(allocator),
};
_ = try list.append(nm);
}
params.values = Values{ .Named = try list.toOwnedSlice() };
} else {
var list = std.ArrayList(Value).init(allocator);
errdefer list.deinit();
var i: usize = 0;
while (i < @as(usize, n)) : (i += 1) {
const value = try mr.readValue(allocator);
_ = try list.append(value);
}
params.values = Values{ .Normal = try list.toOwnedSlice() };
}
}
if (flags & FlagSkipMetadata == FlagSkipMetadata) {
params.skip_metadata = true;
}
if (flags & FlagWithPageSize == FlagWithPageSize) {
params.page_size = try mr.readInt(u32);
}
if (flags & FlagWithPagingState == FlagWithPagingState) {
params.paging_state = try mr.readBytes(allocator);
}
if (flags & FlagWithSerialConsistency == FlagWithSerialConsistency) {
const consistency_level = try mr.readConsistency();
if (consistency_level != .Serial and consistency_level != .LocalSerial) {
return error.InvalidSerialConsistency;
}
params.serial_consistency_level = consistency_level;
}
if (flags & FlagWithDefaultTimestamp == FlagWithDefaultTimestamp) {
const timestamp = try mr.readInt(u64);
if (timestamp < 0) {
return error.InvalidNegativeTimestamp;
}
params.timestamp = timestamp;
}
if (!protocol_version.is(5)) {
return params;
}
// The following flags are only valid with protocol v5
if (flags & FlagWithKeyspace == FlagWithKeyspace) {
params.keyspace = try mr.readString(allocator);
}
if (flags & FlagWithNowInSeconds == FlagWithNowInSeconds) {
params.now_in_seconds = try mr.readInt(u32);
}
return params;
}