-
Notifications
You must be signed in to change notification settings - Fork 0
/
engine.go
390 lines (333 loc) · 10.9 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
// Package storage implements a key-value storage engine that appends records to log files on disk.
package storage
import (
"encoding/binary"
"fmt"
"golang.org/x/sys/unix"
"io"
"os"
"path/filepath"
"sync"
"time"
)
// Binary size units, expressed in bytes. Each unit is 1024 times the previous one.
const (
	KB = 1 << 10
	MB = 1 << 20
	GB = 1 << 30
)
// Default internal settings for the storage engine.
// NewEngine starts from these values; callers can override them with the With* options.
const (
	// defaultTombstone is the sentinel value stored in place of a real value to mark a key as deleted.
	defaultTombstone = "tombstone-jbc46-q42fd-pggmc-kp38y-6mqd8"
	// defaultLogSize is the default size threshold of a single log file, in bytes (10 MB).
	defaultLogSize = 10 * MB
	// defaultKeySize is the default maximum key length, in bytes (1 KB).
	defaultKeySize = 1 * KB
	// defaultCompactionInterval is the default interval configured on the compaction manager.
	defaultCompactionInterval = 1 * time.Hour
)
// Engine is the key-value storage engine. Records are appended to the active write log;
// reads consult the active write log first and then the read-only logs, newest first.
type Engine struct {
	// readLogs lists the read-only log files together with their in-memory indexes.
	// A write log is appended to the end of this slice when it is rotated out, and
	// lookups walk the slice from the last element to the first.
	// TODO: let's see if we can change this to a []log and what's the benefit of using a slice of pointers
	readLogs []*readLog
	// maxLogBytes is the size threshold of a log file in bytes. Once the active write log
	// has reached this size it is closed and a new log file is created before the next
	// record is appended. A smaller threshold produces more log files, and a lookup for a
	// key that is missing has to consult every one of them, from the newest to the oldest,
	// so a larger threshold keeps the number of files (and the lookup cost) down.
	maxLogBytes int64
	// maxKeyBytes is the maximum key length in bytes. Longer keys are rejected with an
	// error and leave the engine unchanged. Every key is held in an in-memory index, so
	// small keys keep the memory footprint low and leave room for more keys.
	maxKeyBytes int64
	// tombStone is the special value that marks a key as deleted. Delete appends it as the
	// key's value, a lookup that reads it back reports the key as not found, and Put
	// refuses to store it as a regular value.
	// NOTE(review): removing tombstoned keys from older log files is presumably the job of
	// the compaction process, which is not visible in this part of the file - confirm.
	tombStone string
	// dataPath is the directory in which the data files are stored.
	dataPath string
	// lockFile is the file used to lock the storage engine for writing, so that only one
	// process writes to the data directory at a time. Close releases the flock held on it.
	lockFile *os.File
	// lock guards the active write log (and the rotation of logs) against concurrent use
	// by goroutines of this process: appends take the write lock, lookups the read lock.
	lock sync.RWMutex
	// writeLog is the log file currently being appended to, together with its index.
	writeLog *writeLog
	// options holds the OptionSetter functions that were passed to NewEngine.
	// Each OptionSetter is a function that modifies the Engine's state, enabling customization
	// of behavior such as setting maximum log sizes, key sizes, or other operational parameters.
	options []OptionSetter
	// compactionManager holds the compaction settings (enabled flag, interval) and state.
	compactionManager *compactionManager
}
// NewEngine creates an Engine that stores its data files under path.
//
// The engine starts from the package defaults (log size, key size, tombstone, compaction
// interval, compaction disabled) and then applies options in order; the first option that
// returns an error aborts construction. Existing data files under path are loaded as
// read-only logs and a fresh write log is created for new records. When compaction has been
// enabled by an option, the background compaction process is started before returning.
//
// path must pass validateDataPath and the data directory lock must be obtainable; otherwise
// an error is returned. On any failure the resources acquired so far (the lock file and, if
// already created, the new write log) are released, so a failed call does not keep the data
// directory locked.
func NewEngine(path string, options ...OptionSetter) (*Engine, error) {
	path = ensureTrailingSlash(path)
	if err := validateDataPath(path); err != nil {
		return nil, err
	}

	lockFile, err := createFlock(path)
	if err != nil {
		return nil, err
	}

	engine := &Engine{
		maxLogBytes: defaultLogSize,
		maxKeyBytes: defaultKeySize,
		tombStone:   defaultTombstone,
		dataPath:    path,
		lockFile:    lockFile,
		options:     options,
		compactionManager: &compactionManager{
			enabled:  false,
			interval: defaultCompactionInterval,
		},
	}

	// Undo partial initialization on every error return below. Closing the lock file's
	// descriptor also drops the flock held on it, so the directory can be opened again.
	ready := false
	defer func() {
		if ready {
			return
		}
		if engine.writeLog != nil && engine.writeLog.file != nil {
			_ = engine.writeLog.file.Close() // best effort: the original error is what matters
		}
		_ = lockFile.Close() // best effort: the original error is what matters
	}()

	for _, option := range options {
		if option == nil {
			// A nil option carries no setting; skipping it avoids a nil-function panic.
			continue
		}
		if err := option(engine); err != nil {
			return nil, err
		}
	}

	dataFiles, err := extractDatafiles(path)
	if err != nil {
		return nil, err
	}
	readLogs, err := initReadLogs(dataFiles)
	if err != nil {
		return nil, err
	}
	engine.readLogs = readLogs

	// createNewFile derives the file name from the number of read logs, so it must run
	// after readLogs has been populated.
	file, err := engine.createNewFile()
	if err != nil {
		return nil, err
	}
	engine.writeLog = &writeLog{file: file, index: make(map[string]int64)}

	// start background compaction process if enabled
	if engine.compactionManager.enabled {
		if err := engine.startBackgroundCompaction(); err != nil {
			return nil, err
		}
	}

	ready = true
	return engine, nil
}
// OptionSetter is a functional option for NewEngine. It receives the Engine under
// construction, adjusts its configuration, and returns an error to reject an invalid
// setting; NewEngine aborts on the first option that returns an error.
type OptionSetter func(*Engine) error
// WithMaxLogSize returns an option that sets the size threshold of a log file to size bytes.
// Applying the option fails, leaving the engine untouched, when size is not positive.
func WithMaxLogSize(size int64) OptionSetter {
	apply := func(target *Engine) error {
		if size > 0 {
			target.maxLogBytes = size
			return nil
		}
		return fmt.Errorf("invalid max log size")
	}
	return apply
}
// WithMaxKeySize returns an option that sets the maximum key length to size bytes.
// Applying the option fails, leaving the engine untouched, when size is not positive.
func WithMaxKeySize(size int64) OptionSetter {
	apply := func(target *Engine) error {
		if size > 0 {
			target.maxKeyBytes = size
			return nil
		}
		return fmt.Errorf("invalid max key size")
	}
	return apply
}
// WithTombStone returns an option that replaces the value used to mark deleted keys.
// Applying the option fails, leaving the engine untouched, when value is empty.
func WithTombStone(value string) OptionSetter {
	apply := func(target *Engine) error {
		if len(value) == 0 {
			return fmt.Errorf("invalid tombstone value")
		}
		target.tombStone = value
		return nil
	}
	return apply
}
// WithCompactionEnabled returns an option that switches compaction on for the engine.
// Applying the option never fails.
func WithCompactionEnabled() OptionSetter {
	enable := func(target *Engine) error {
		target.compactionManager.enabled = true
		return nil
	}
	return enable
}
// WithCompactionInterval returns an option that sets the interval of the compaction process.
// Applying the option fails, leaving the engine untouched, when interval is not positive.
func WithCompactionInterval(interval time.Duration) OptionSetter {
	apply := func(target *Engine) error {
		if interval > 0 {
			target.compactionManager.interval = interval
			return nil
		}
		return fmt.Errorf("invalid compaction interval")
	}
	return apply
}
// Close shuts the engine down: it stops the compaction ticker (if one is running), flushes
// and closes the active write log, releases the flock on the lock file and closes the lock
// file itself.
//
// Every cleanup step is attempted even when an earlier one fails, so a failed Sync does not
// leave the write log open or the data directory locked. The first error encountered is
// returned; nil means every step succeeded. The engine must not be used after Close.
func (e *Engine) Close() error {
	if e.compactionManager.ticker != nil {
		e.compactionManager.ticker.Stop()
	}

	// appendKeyValue swaps e.writeLog under this lock, so take it to close the log that is
	// really the active one and to keep a concurrent append from racing with the shutdown.
	e.lock.Lock()
	defer e.lock.Unlock()

	var firstErr error
	record := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	record(e.writeLog.file.Sync())
	record(e.writeLog.file.Close())
	record(unix.Flock(int(e.lockFile.Fd()), unix.LOCK_UN))
	// Closing the descriptor avoids leaking it and drops the lock even if LOCK_UN failed.
	record(e.lockFile.Close())

	return firstErr
}
// Put stores value under key in the storage engine.
// It returns an error when the key or the value is rejected by validation or when the
// record cannot be appended to the write log.
func (e *Engine) Put(key, value string) error {
	err := e.putKeyValue(key, value)
	return err
}
// putKeyValue checks the key, then the value, and appends the pair to the write log only
// when both checks pass. The first failing check's error is returned unchanged.
func (e *Engine) putKeyValue(key, value string) error {
	keyErr := e.validateKey(key)
	if keyErr != nil {
		return keyErr
	}

	valueErr := e.validateValue(value)
	if valueErr != nil {
		return valueErr
	}

	return e.appendKeyValue(key, value)
}
// Get returns the value most recently stored under key.
// It returns an error when the key is invalid, unknown, deleted, or cannot be read.
func (e *Engine) Get(key string) (string, error) {
	value, err := e.findValueInLogs(key)
	return value, err
}
// findValueInLogs looks key up in the active write log first and then in the read-only
// logs, from the most recent to the oldest, and returns the value of the first hit.
//
// It returns an error when the key fails validation, when the key is in no index
// ("key ... not found"), when the stored value is the tombstone, i.e. the key was deleted
// ("value not found"), or when reading the value from disk fails.
func (e *Engine) findValueInLogs(key string) (string, error) {
	if err := e.validateKey(key); err != nil {
		return "", err
	}

	// Take a consistent snapshot under the read lock: appendKeyValue replaces e.writeLog and
	// appends to e.readLogs under the write lock, so both fields must be read while locked.
	// The snapshot's elements are not affected by later appends to e.readLogs.
	e.lock.RLock()
	active := e.writeLog
	offset, inActive := active.index[key]
	sealed := e.readLogs
	e.lock.RUnlock()

	if inActive {
		return e.readLiveValue(active.file.Name(), offset)
	}

	// NOTE(review): the indexes of read-only logs are read without the lock, on the
	// assumption that nothing modifies them after rotation - confirm against compaction.
	for i := len(sealed) - 1; i >= 0; i-- {
		if offset, ok := sealed[i].index[key]; ok {
			return e.readLiveValue(sealed[i].path, offset)
		}
	}

	return "", fmt.Errorf("key %s not found", key)
}

// readLiveValue reads the value stored at offset in the file at path and reports a
// tombstone as "value not found". A read error takes precedence over the tombstone check.
func (e *Engine) readLiveValue(path string, offset int64) (string, error) {
	value, err := e.readValueFromFile(path, offset)
	if err != nil {
		return "", err
	}
	if value == e.tombStone {
		return "", fmt.Errorf("value not found")
	}
	return value, nil
}
// readValueFromFile returns the value stored at offset in the data file at path.
// On a read failure it returns the empty string together with the error.
func (e *Engine) readValueFromFile(path string, offset int64) (string, error) {
	stored, readErr := openAndReadAtDataFile(path, offset)
	if readErr == nil {
		return stored, nil
	}
	return "", readErr
}
// Delete marks key as deleted in the storage engine.
// Nothing is erased immediately: a tombstone value is recorded for the key instead.
func (e *Engine) Delete(key string) error {
	err := e.deleteKey(key)
	return err
}
// deleteKey checks the key and, when it is valid, appends a record that pairs the key
// with the engine's tombstone value. A validation error is returned unchanged.
func (e *Engine) deleteKey(key string) error {
	err := e.validateKey(key)
	if err == nil {
		err = e.appendKeyValue(key, e.tombStone)
	}
	return err
}
// closeWriteLog retires the active write log: its path and index are registered as the
// newest read-only log, and then the underlying file is closed. The close error, if any,
// is returned; the log stays registered as read-only either way.
func (e *Engine) closeWriteLog() error {
	active := e.writeLog
	retired := &readLog{
		path:  active.file.Name(),
		index: active.index,
	}
	e.readLogs = append(e.readLogs, retired)
	return active.file.Close()
}
// appendKeyValue appends one record for key/value to the active write log and points the
// key's index entry at it.
//
// Record layout (little endian):
//
//	[4-byte key size][key bytes][4-byte value size][value bytes]
//
// The index stores the offset of the value-size field, which is where a reader starts.
// When the active log has reached maxLogBytes it is rotated first: the current log is
// retired to the read-only logs and a new file becomes the write log.
//
// The whole record is assembled in memory and handed to the file in a single Write, so a
// failure cannot leave a key header on disk without its value, as separate writes could.
// It returns an error when the key or value does not fit the 4-byte size fields, when
// rotation fails, or when the write fails; in all of these cases the index is not updated.
func (e *Engine) appendKeyValue(key, value string) error {
	const (
		sizeFieldBytes = 4
		maxFieldBytes  = 1<<32 - 1 // largest length a 4-byte size field can describe
	)
	// Reject lengths that would be silently truncated by the uint32 conversions below.
	if uint64(len(key)) > maxFieldBytes || uint64(len(value)) > maxFieldBytes {
		return fmt.Errorf("key and value cannot be longer than %d bytes", uint64(maxFieldBytes))
	}

	e.lock.Lock()
	defer e.lock.Unlock()

	if e.writeLog.size >= e.maxLogBytes {
		if err := e.closeWriteLog(); err != nil {
			return err
		}
		file, err := e.createNewFile()
		if err != nil {
			return err
		}
		e.writeLog = &writeLog{file: file, index: make(map[string]int64), size: 0}
	}

	valueFieldStart := sizeFieldBytes + len(key)
	record := make([]byte, valueFieldStart+sizeFieldBytes+len(value))
	binary.LittleEndian.PutUint32(record, uint32(len(key)))
	copy(record[sizeFieldBytes:], key)
	binary.LittleEndian.PutUint32(record[valueFieldStart:], uint32(len(value)))
	copy(record[valueFieldStart+sizeFieldBytes:], value)

	// The file is opened in append mode, so the record lands at the current end of file.
	recordPos, err := e.writeLog.file.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}

	written, err := e.writeLog.file.Write(record)
	// Count whatever reached the file, even on a short write, so size tracks the file.
	e.writeLog.size += int64(written)
	if err != nil {
		return err
	}

	// Update the index with the position of the value-size field of this record.
	e.writeLog.index[key] = recordPos + int64(valueFieldStart)
	return nil
}
// validateKey reports whether key may be used with the engine.
// It returns an error for an empty key and for a key longer than maxKeyBytes bytes.
func (e *Engine) validateKey(key string) error {
	switch {
	case len(key) == 0:
		return fmt.Errorf("key cannot be empty")
	case int64(len(key)) > e.maxKeyBytes:
		return fmt.Errorf("key cannot be longer than %d bytes", e.maxKeyBytes)
	default:
		return nil
	}
}
// validateValue reports whether value may be stored by a caller.
// It returns an error when value equals the tombstone, which is reserved for deletions,
// and when value is longer than maxLogBytes bytes, the size threshold of a log file.
func (e *Engine) validateValue(value string) error {
	switch {
	case value == e.tombStone:
		return fmt.Errorf("value cannot be tombstone")
	case int64(len(value)) > e.maxLogBytes:
		return fmt.Errorf("value cannot be longer than %d bytes", e.maxLogBytes)
	}
	return nil
}
// createNewFile opens the next data file in the data directory for appending, creating it
// with mode 0o644 when it does not exist. The file name is the number of read-only logs
// plus one, followed by the data file suffix.
// NOTE(review): the name depends only on len(e.readLogs); confirm it cannot collide with
// an existing file once compaction changes the number of logs.
func (e *Engine) createNewFile() (*os.File, error) {
	const openFlags = os.O_CREATE | os.O_WRONLY | os.O_APPEND

	sequence := len(e.readLogs) + 1
	name := fmt.Sprintf("%d%s", sequence, dataFileFormatSuffix)

	handle, err := os.OpenFile(filepath.Join(e.dataPath, name), openFlags, 0o644)
	if err == nil {
		return handle, nil
	}
	return nil, err
}