Skip to content

Commit

Permalink
feat: support Turso Database
Browse files Browse the repository at this point in the history
  • Loading branch information
yudhasubki committed Jun 18, 2024
1 parent a943bbf commit 41e3d85
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 31 deletions.
30 changes: 29 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**Block Queue** is a lightweight and cost-effective queue messaging system with pub/sub mechanism for a cheap, robust, reliable, and durable messaging system.

Crafted atop the robust foundations of [SQLite3](https://www.sqlite.org/index.html) and [NutsDB](https://github.com/nutsdb/nutsdb), Block Queue prioritizes efficiency by minimizing network latency and ensuring cost-effectiveness.
Built on the sturdy foundations of [SQLite3](https://www.sqlite.org/index.html), [NutsDB](https://github.com/nutsdb/nutsdb), and now supporting the [Turso Database](https://turso.tech/), Block Queue prioritizes efficiency by minimizing network latency and ensuring cost-effectiveness.

## Why BlockQueue

Expand All @@ -17,11 +17,15 @@ While Kafka, Redis, or SQS is an excellent product, it is quite complex and requ
### Binary
You can read it on our wiki page at: https://github.com/yudhasubki/blockqueue/wiki/Welcome-to-BlockQueue

### Driver
BlockQueue supports drivers using SQLite or Turso. You can define the driver in the config.yaml under the http.driver setting (either **sqlite** or **turso**).

### Running on Go
```bash
go get -u github.com/yudhasubki/blockqueue
```

Using SQLite:
```go
// github.com/yudhasubki/blockqueue/pkg/sqlite or you can define your own
sqlite, err := sqlite.New(cfg.SQLite.DatabaseName, sqlite.Config{
Expand All @@ -47,6 +51,30 @@ go get -u github.com/yudhasubki/blockqueue
}
```

Using Turso:
```go
// github.com/yudhasubki/blockqueue/pkg/sqlite or you can define your own
sqlite, err := turso.New("libsql://dbname-username.turso.io?authToken=[TOKEN]")
if err != nil {
return err
}

// github.com/yudhasubki/blockqueue/pkg/etcd or you can define your own
etcd, err := etcd.New(
cfg.Etcd.Path,
etcd.WithSync(cfg.Etcd.Sync),
)
if err != nil {
return err
}

stream := blockqueue.New(sqlite, etcd)
err = stream.Run(ctx)
if err != nil {
return err
}
```

## Architecture

![Publish Architecture](https://github.com/yudhasubki/blockqueue/blob/main/docs/img/publisher_architecture.png)
Expand Down
3 changes: 1 addition & 2 deletions blockqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/yudhasubki/blockqueue/pkg/etcd"
bqio "github.com/yudhasubki/blockqueue/pkg/io"
"github.com/yudhasubki/blockqueue/pkg/metric"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
"github.com/yudhasubki/eventpool"
)

Expand All @@ -36,7 +35,7 @@ func init() {
prometheus.Register(metric.MessagePublished)
}

func New[V chan bqio.ResponseMessages](db *sqlite.SQLite, kv *etcd.Etcd) *BlockQueue[V] {
func New[V chan bqio.ResponseMessages](db Driver, kv *etcd.Etcd) *BlockQueue[V] {
blockqueue := &BlockQueue[V]{
db: newDb(db),
mtx: cas.New(),
Expand Down
29 changes: 21 additions & 8 deletions cmd/blockqueue/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
blockqueue "github.com/yudhasubki/blockqueue"
"github.com/yudhasubki/blockqueue/pkg/etcd"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
"github.com/yudhasubki/blockqueue/pkg/turso"
)

type Http struct{}
Expand All @@ -38,12 +39,24 @@ func (h *Http) Run(ctx context.Context, args []string) error {
return err
}

sqlite, err := sqlite.New(cfg.SQLite.DatabaseName, sqlite.Config{
BusyTimeout: cfg.SQLite.BusyTimeout,
})
if err != nil {
slog.Error("failed to open database", "error", err)
return err
var driver blockqueue.Driver
switch cfg.Http.Driver {
case "turso":
turso, err := turso.New(cfg.Turso.URL)
if err != nil {
return err
}
driver = turso
case "sqlite", "":
sqlite, err := sqlite.New(cfg.SQLite.DatabaseName, sqlite.Config{
BusyTimeout: cfg.SQLite.BusyTimeout,
})
if err != nil {
slog.Error("failed to open database", "error", err)
return err
}

driver = sqlite
}

etcd, err := etcd.New(
Expand All @@ -57,7 +70,7 @@ func (h *Http) Run(ctx context.Context, args []string) error {

ctx, cancel := context.WithCancel(ctx)

stream := blockqueue.New(sqlite, etcd)
stream := blockqueue.New(driver, etcd)

err = stream.Run(ctx)
if err != nil {
Expand Down Expand Up @@ -94,7 +107,7 @@ func (h *Http) Run(ctx context.Context, args []string) error {

engine.Stop()
stream.Close()
sqlite.Close()
driver.Close()
etcd.Close()

// handling graceful shutdown
Expand Down
10 changes: 10 additions & 0 deletions cmd/blockqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Config struct {
Http HttpConfig `yaml:"http"`
Logging LoggingConfig `yaml:"logging"`
SQLite SQLiteConfig `yaml:"sqlite"`
Turso TursoConfig `yaml:"turso"`
Job JobConfig `yaml:"job"`
Metric MetricConfig `yaml:"metric"`
}
Expand Down Expand Up @@ -100,6 +101,10 @@ func ReadConfigFile(filename string) (_ Config, err error) {
Level: slog.LevelInfo,
}

if config.Http.Driver == "" {
config.Http.Driver = "sqlite"
}

switch strings.ToUpper(config.Logging.Level) {
case "DEBUG":
logOpts.Level = slog.LevelDebug
Expand All @@ -125,6 +130,7 @@ func ReadConfigFile(filename string) (_ Config, err error) {
type HttpConfig struct {
Port string `yaml:"port"`
Shutdown time.Duration `yaml:"shutdown"`
Driver string `yaml:"driver"`
}

func register(fs *flag.FlagSet) *string {
Expand All @@ -142,6 +148,10 @@ type SQLiteConfig struct {
BusyTimeout int `yaml:"busy_timeout"`
}

type TursoConfig struct {
URL string `yaml:"url"`
}

type EtcdConfig struct {
Path string `yaml:"path"`
Sync bool `yaml:"sync"`
Expand Down
28 changes: 21 additions & 7 deletions cmd/blockqueue/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"os"
"path/filepath"

blockqueue "github.com/yudhasubki/blockqueue"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
"github.com/yudhasubki/blockqueue/pkg/turso"
)

type Migrate struct{}
Expand All @@ -34,12 +36,24 @@ func (m *Migrate) Run(ctx context.Context, args []string) error {
return err
}

sqlite, err := sqlite.New(cfg.SQLite.DatabaseName, sqlite.Config{
BusyTimeout: cfg.SQLite.BusyTimeout,
})
if err != nil {
slog.Error("failed to open database", "error", err)
return err
var driver blockqueue.Driver
switch cfg.Http.Driver {
case "turso":
turso, err := turso.New(cfg.Turso.URL)
if err != nil {
return err
}
driver = turso
case "sqlite", "":
sqlite, err := sqlite.New(cfg.SQLite.DatabaseName, sqlite.Config{
BusyTimeout: cfg.SQLite.BusyTimeout,
})
if err != nil {
slog.Error("failed to open database", "error", err)
return err
}

driver = sqlite
}

_ = filepath.Walk("migration/", func(path string, info os.FileInfo, err error) error {
Expand All @@ -64,7 +78,7 @@ func (m *Migrate) Run(ctx context.Context, args []string) error {
return err
}

_, err = sqlite.Database.Exec(buf.String())
_, err = driver.Conn().Exec(buf.String())
if err != nil {
slog.Error("failed migrate", "filename", path, "error", err)
return err
Expand Down
3 changes: 3 additions & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
http:
port: 8080
shutdown: "30s"
driver: "sqlite"
logging:
level: "debug"
type: "json"
sqlite:
db_name: "blockqueue"
busy_timeout: 5000
turso:
url: "libsql://tursodatabase-username.turso.io"
etcd:
path: "etcdb"
sync: false
Expand Down
30 changes: 17 additions & 13 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ import (
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/yudhasubki/blockqueue/pkg/core"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
)

type Driver interface {
Conn() *sqlx.DB
Close() error
}

type db struct {
*sqlite.SQLite
Database Driver
}

func newDb(sqlite *sqlite.SQLite) *db {
func newDb(driver Driver) *db {
return &db{
SQLite: sqlite,
Database: driver,
}
}

Expand All @@ -39,9 +43,9 @@ func (d *db) getTopics(ctx context.Context, filter core.FilterTopic) (core.Topic
if err != nil {
return topics, err
}
query = d.Database.Rebind(query)
query = d.Database.Conn().Rebind(query)

err = d.Database.Select(&topics, query, args...)
err = d.Database.Conn().SelectContext(ctx, &topics, query, args...)
if err != nil {
return topics, err
}
Expand Down Expand Up @@ -69,9 +73,9 @@ func (d *db) getSubscribers(ctx context.Context, filter core.FilterSubscriber) (
if err != nil {
return subscribers, err
}
query = d.Database.Rebind(query)
query = d.Database.Conn().Rebind(query)

err = d.Database.Select(&subscribers, query, args...)
err = d.Database.Conn().SelectContext(ctx, &subscribers, query, args...)
if err != nil {
return subscribers, err
}
Expand Down Expand Up @@ -134,7 +138,7 @@ func (d *db) createTxSubscribers(ctx context.Context, tx *sqlx.Tx, subscribers c
}

func (d *db) createMessages(ctx context.Context, message core.Message) error {
stmt, err := d.Database.PrepareNamedContext(ctx, "INSERT INTO topic_messages (`id`, `topic_id`, `message`, `status`) VALUES (:id, :topic_id, :message, :status)")
stmt, err := d.Database.Conn().PrepareNamedContext(ctx, "INSERT INTO topic_messages (`id`, `topic_id`, `message`, `status`) VALUES (:id, :topic_id, :message, :status)")
if err != nil {
return err
}
Expand All @@ -158,7 +162,7 @@ func (d *db) updateStatusMessage(ctx context.Context, status core.MessageStatus,
return err
}

_, err = d.Database.ExecContext(ctx, d.Database.Rebind(query), args...)
_, err = d.Database.Conn().ExecContext(ctx, d.Database.Conn().Rebind(query), args...)
if err != nil {
return err
}
Expand Down Expand Up @@ -187,9 +191,9 @@ func (d *db) getMessages(ctx context.Context, filter core.FilterMessage) (core.M
if err != nil {
return messages, err
}
query = d.Database.Rebind(query)
query = d.Database.Conn().Rebind(query)

err = d.Database.Select(&messages, query, args...)
err = d.Database.Conn().SelectContext(ctx, &messages, query, args...)
if err != nil {
return messages, err
}
Expand All @@ -198,7 +202,7 @@ func (d *db) getMessages(ctx context.Context, filter core.FilterMessage) (core.M
}

func (d *db) tx(ctx context.Context, fn func(ctx context.Context, tx *sqlx.Tx) error) error {
tx, err := d.Database.Beginx()
tx, err := d.Database.Conn().Beginx()
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ require (
require (
github.com/antlabs/stl v0.0.1 // indirect
github.com/antlabs/timer v0.0.11 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/lesismal/llib v1.1.12 // indirect
github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -34,9 +36,12 @@ require (
github.com/prometheus/procfs v0.11.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/tidwall/btree v1.6.0 // indirect
github.com/tursodatabase/libsql-client-go v0.0.0-20240416075003-747366ff79c4 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/sys v0.15.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
nhooyr.io/websocket v1.8.10 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/antlabs/stl v0.0.1 h1:TRD3csCrjREeLhLoQ/supaoCvFhNLBTNIwuRGrDIs6Q=
github.com/antlabs/stl v0.0.1/go.mod h1:wvVwP1loadLG3cRjxUxK8RL4Co5xujGaZlhbztmUEqQ=
github.com/antlabs/timer v0.0.11 h1:z75oGFLeTqJHMOcWzUPBKsBbQAz4Ske3AfqJ7bsdcwU=
github.com/antlabs/timer v0.0.11/go.mod h1:JNV8J3yGvMKhCavGXgj9HXrVZkfdQyKCcqXBT8RdyuU=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
Expand Down Expand Up @@ -34,6 +36,8 @@ github.com/lesismal/nbio v1.3.20 h1:btQdW4u8yAo2xg1PeU/gOWR0IPj2wUK+ZeVc5zHIEn4=
github.com/lesismal/nbio v1.3.20/go.mod h1:KWlouFT5cgDdW5sMX8RsHASUMGniea9X0XIellZ0B38=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 h1:JLvn7D+wXjH9g4Jsjo+VqmzTUpl/LX7vfr6VOfSWTdM=
github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06/go.mod h1:FUkZ5OHjlGPjnM2UyGJz9TypXQFgYqw6AFNO1UiROTM=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
Expand Down Expand Up @@ -65,6 +69,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg=
github.com/tidwall/btree v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tursodatabase/libsql-client-go v0.0.0-20240416075003-747366ff79c4 h1:wNN8t3qiLLzFiETD4jL086WemAgQLfARClUx2Jfk78w=
github.com/tursodatabase/libsql-client-go v0.0.0-20240416075003-747366ff79c4/go.mod h1:2Fu26tjM011BLeR5+jwTfs6DX/fNMEWV/3CBZvggrA4=
github.com/xujiajun/mmap-go v1.0.1 h1:7Se7ss1fLPPRW+ePgqGpCkfGIZzJV6JPq9Wq9iv/WHc=
github.com/xujiajun/mmap-go v1.0.1/go.mod h1:CNN6Sw4SL69Sui00p0zEzcZKbt+5HtEnYUsc6BKKRMg=
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 h1:w0si+uee0iAaCJO9q86T6yrhdadgcsoNuh47LrUykzg=
Expand All @@ -77,6 +83,8 @@ golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand Down Expand Up @@ -122,3 +130,5 @@ gopkg.in/guregu/null.v4 v4.0.0/go.mod h1:YoQhUrADuG3i9WqesrCmpNRwm1ypAgSHYqoOcTu
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q=
nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
Loading

0 comments on commit 41e3d85

Please sign in to comment.