Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: database backend add ssdb support #1280

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
"errors"
"fmt"
"math/rand"
"net/url"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -21,6 +23,7 @@
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/etcddb"
"github.com/aptly-dev/aptly/database/goleveldb"
"github.com/aptly-dev/aptly/database/ssdb"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/files"
"github.com/aptly-dev/aptly/http"
Expand All @@ -29,6 +32,7 @@
"github.com/aptly-dev/aptly/swift"
"github.com/aptly-dev/aptly/task"
"github.com/aptly-dev/aptly/utils"
"github.com/seefan/gossdb/v2/conf"
"github.com/smira/commander"
"github.com/smira/flag"
)
Expand Down Expand Up @@ -300,6 +304,21 @@
context.database, err = goleveldb.NewDB(dbPath)
case "etcd":
context.database, err = etcddb.NewDB(context.config().DatabaseBackend.URL)
case "ssdb":
var cfg conf.Config
u, e := url.Parse(context.config().DatabaseBackend.URL)

if e != nil {
return nil, e
}
cfg.Port, e = strconv.Atoi(u.Port())
cfg.Host = strings.Split(u.Host, ":")[0]
if e != nil {
return nil, e
}
password, _ := u.User.Password()
cfg.Password = password
context.database, err = ssdb.NewOpenDB(&cfg)

Check warning on line 321 in context/context.go

View check run for this annotation

Codecov / codecov/patch

context/context.go#L307-L321

Added lines #L307 - L321 were not covered by tests
default:
context.database, err = goleveldb.NewDB(context.dbPath())
}
Expand Down
129 changes: 129 additions & 0 deletions database/ssdb/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package ssdb

import (
"fmt"

"github.com/aptly-dev/aptly/database"
"github.com/seefan/gossdb/v2/conf"
"github.com/seefan/gossdb/v2/pool"
)

const (
delOpt = "del"
)

type bWriteData struct {
key []byte
value []byte
opts string
err error
}

type Batch struct {
cfg *conf.Config
// key-value chan
w chan bWriteData
p map[string]interface{}
d []string
db *pool.Client
}

// func internalOpenBatch...
func internalOpenBatch(_ database.Storage) *Batch {
b := &Batch{
w: make(chan bWriteData),
p: make(map[string]interface{}),
}
b.run()

return b
}

func (b *Batch) run() {
go func() {
for {
select {
case w, ok := <-b.w:
{
if !ok {
ssdbLog("ssdb batch write chan closed")
return
}

if w.opts == "write" {
ssdbLog("ssdb batch write")
var err error
if len(b.p) > 0 && len(b.d) == 0 {
err = b.db.MultiSet(b.p)
ssdbLog("ssdb batch set errinfo: ", err)
} else if len(b.d) > 0 && len(b.p) == 0 {
err = b.db.MultiDel(b.d...)
ssdbLog("ssdb batch del errinfo: ", err)
} else if len(b.p) == 0 && len(b.d) == 0 {
err = nil
} else {
err = fmt.Errorf("ssdb batch does not support both put and delete operations")
}

Check warning on line 66 in database/ssdb/batch.go

View check run for this annotation

Codecov / codecov/patch

database/ssdb/batch.go#L63-L66

Added lines #L63 - L66 were not covered by tests
ssdbLog("ssdb batch write errinfo: ", err)
b.w <- bWriteData{
err: err,
}
ssdbLog("ssdb batch write end")
} else {
ssdbLog("ssdb batch", w.opts)
if w.opts == "put" {
b.p[string(w.key)] = w.value
} else if w.opts == delOpt {
b.d = append(b.d, string(w.key))
}
}
}
}
}
}()
}

func (b *Batch) stop() {
ssdbLog("ssdb batch stop")
close(b.w)
}

func (b *Batch) Put(key, value []byte) (err error) {
// err = b.db.Set(string(key), string(value))
w := bWriteData{
key: key,
value: value,
opts: "put",
}

b.w <- w
return nil
}

func (b *Batch) Delete(key []byte) (err error) {
/* err = b.db.Del(string(key))
return */
w := bWriteData{
key: key,
opts: delOpt,
}

b.w <- w
return nil
}

func (b *Batch) Write() (err error) {
defer b.stop()
w := bWriteData{
opts: "write",
}

b.w <- w
result := <-b.w
return result.err
}

// batch should implement database.Batch
var (
_ database.Batch = &Batch{}
)
62 changes: 62 additions & 0 deletions database/ssdb/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package ssdb

import (
"os"
"strconv"

"github.com/aptly-dev/aptly/database"
"github.com/seefan/gossdb/v2"
"github.com/seefan/gossdb/v2/conf"
"github.com/seefan/gossdb/v2/pool"
)

var defaultBufSize = 102400
var defaultPoolSize = 1

func internalOpen(cfg *conf.Config) (*pool.Client, error) {
ssdbLog("internalOpen")

cfg.ReadBufferSize = defaultBufSize
cfg.WriteBufferSize = defaultBufSize
cfg.MaxPoolSize = defaultPoolSize
cfg.PoolSize = defaultPoolSize
cfg.MinPoolSize = defaultPoolSize
cfg.MaxWaitSize = 100 * defaultPoolSize
cfg.RetryEnabled = true

//override by env
if os.Getenv("SSDB_READBUFFERSIZE") != "" {
readBufSize, err := strconv.Atoi(os.Getenv("SSDB_READBUFFERSIZE"))
if err != nil {
cfg.ReadBufferSize = readBufSize
}

Check warning on line 32 in database/ssdb/database.go

View check run for this annotation

Codecov / codecov/patch

database/ssdb/database.go#L29-L32

Added lines #L29 - L32 were not covered by tests
}

if os.Getenv("SSDB_WRITEBUFFERSIZE") != "" {
writeBufSize, err := strconv.Atoi(os.Getenv("SSDB_WRITEBUFFERSIZE"))
if err != nil {
cfg.WriteBufferSize = writeBufSize
}

Check warning on line 39 in database/ssdb/database.go

View check run for this annotation

Codecov / codecov/patch

database/ssdb/database.go#L36-L39

Added lines #L36 - L39 were not covered by tests
}

var cfgs = []*conf.Config{cfg}
err := gossdb.Start(cfgs...)
if err != nil {
return nil, err
}

Check warning on line 46 in database/ssdb/database.go

View check run for this annotation

Codecov / codecov/patch

database/ssdb/database.go#L45-L46

Added lines #L45 - L46 were not covered by tests

return gossdb.NewClient()
}

func NewDB(cfg *conf.Config) (database.Storage, error) {
return &Storage{cfg: cfg}, nil
}

func NewOpenDB(cfg *conf.Config) (database.Storage, error) {
db, err := NewDB(cfg)
if err != nil {
return nil, err
}

Check warning on line 59 in database/ssdb/database.go

View check run for this annotation

Codecov / codecov/patch

database/ssdb/database.go#L58-L59

Added lines #L58 - L59 were not covered by tests

return db, db.Open()
}
Loading
Loading