Skip to content

Commit

Permalink
add time based interval triger for compaction procss
Browse files Browse the repository at this point in the history
  • Loading branch information
rezkam committed Nov 14, 2023
1 parent 6ba78ff commit 3c0171f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 7 deletions.
28 changes: 26 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@ import (
"log/slog"
"os"
"path/filepath"
"sync"
"time"
)

type compactionManager struct {
enabled bool
interval time.Duration
ticker *time.Ticker
lock sync.Mutex
}

// compact orchestrates the compaction process for the storage engine.
// It ensures that only one compaction process can run at a time and manages the creation,
// execution, and cleanup of the compaction environment.
func (e *Engine) compact() error {
// Acquire a lock to ensure single execution of the compaction process
e.compactionLock.Lock()
defer e.compactionLock.Unlock()
e.compactionManager.lock.Lock()
defer e.compactionManager.lock.Unlock()

// Define the path for the compaction directory
compactionPath := filepath.Join(e.dataPath, "compaction")
Expand Down Expand Up @@ -162,3 +170,19 @@ func isLogInSnapshot(log *readLog, snapshotReadLogs []*readLog) bool {
}
return false
}

func (e *Engine) startBackgroundCompaction() error {
if !e.compactionManager.enabled {
return fmt.Errorf("compaction is not enabled")
}

e.compactionManager.ticker = time.NewTicker(e.compactionManager.interval)
go func() {
for range e.compactionManager.ticker.C {
if err := e.compact(); err != nil {
slog.Warn("failed to run compaction", "err", err)
}
}
}()
return nil
}
46 changes: 41 additions & 5 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"sync"
"time"
)

const (
Expand All @@ -21,9 +22,10 @@ const (
// default internal constants for the storage engine
// can be overridden by the user with provided options
const (
defaultTombstone = "tombstone-jbc46-q42fd-pggmc-kp38y-6mqd8"
defaultLogSize = 10 * MB
defaultKeySize = 1 * KB
defaultTombstone = "tombstone-jbc46-q42fd-pggmc-kp38y-6mqd8"
defaultLogSize = 10 * MB
defaultKeySize = 1 * KB
defaultCompactionInterval = 1 * time.Hour
)

// Engine represents the storage engine for key-value storage
Expand Down Expand Up @@ -61,8 +63,8 @@ type Engine struct {
// Each OptionSetter is a function that modifies the Engine's state, enabling customization
// of behavior such as setting maximum log sizes, key sizes, or other operational parameters.
options []OptionSetter
// compactionLock is a mutex to ensure only one compaction process runs at a time
compactionLock sync.Mutex
// compactionManager handles all compaction-related processes
compactionManager *compactionManager
}

// NewEngine creates a new Engine instance with default settings which can be overridden with optional settings
Expand All @@ -86,6 +88,10 @@ func NewEngine(path string, options ...OptionSetter) (*Engine, error) {
dataPath: path,
lockFile: lockFile,
options: options,
compactionManager: &compactionManager{
enabled: false,
interval: defaultCompactionInterval,
},
}

for _, option := range options {
Expand Down Expand Up @@ -113,6 +119,14 @@ func NewEngine(path string, options ...OptionSetter) (*Engine, error) {

engine.writeLog = &writeLog{file: file, index: make(map[string]int64)}

// start background compaction process if enabled
if engine.compactionManager.enabled {
err := engine.startBackgroundCompaction()
if err != nil {
return nil, err
}
}

return engine, nil
}

Expand Down Expand Up @@ -154,7 +168,29 @@ func WithTombStone(value string) OptionSetter {
}
}

// WithCompactionEnabled enables compaction for the storage engine
func WithCompactionEnabled() OptionSetter {
return func(engine *Engine) error {
engine.compactionManager.enabled = true
return nil
}
}

// WithCompactionInterval sets the interval for the compaction process
func WithCompactionInterval(interval time.Duration) OptionSetter {
return func(engine *Engine) error {
if interval <= 0 {
return fmt.Errorf("invalid compaction interval")
}
engine.compactionManager.interval = interval
return nil
}
}

func (e *Engine) Close() error {
if e.compactionManager.ticker != nil {
e.compactionManager.ticker.Stop()
}

if err := e.writeLog.file.Sync(); err != nil {
return err
Expand Down

0 comments on commit 3c0171f

Please sign in to comment.