Skip to content

Commit

Permalink
feat(note): add block cache
Browse files Browse the repository at this point in the history
  • Loading branch information
cancue committed May 7, 2024
1 parent ab73742 commit 9bd2842
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 33 deletions.
43 changes: 31 additions & 12 deletions pkg/editor/pool/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (

type SynchronizedBlock struct {
sync.Mutex
blockPath string
fname string
block *block.Block
changed bool
toSyncFile bool
toReadFile bool
blockPath string
fname string
block *block.Block
changed bool
toFile bool
fromFile bool
}

func NewEmptySynchronizedBlock(blockPath string) (*SynchronizedBlock, error) {
Expand All @@ -32,19 +32,38 @@ func NewEmptySynchronizedBlock(blockPath string) (*SynchronizedBlock, error) {
return &SynchronizedBlock{blockPath: blockPath}, nil
}

func NewUnSynchronizedBlock(blockPath string, block *block.Block) (*SynchronizedBlock, error) {
err := os.MkdirAll(blockPath, 0777)
if err != nil {
return nil, err
}
sb, err := NewEmptySynchronizedBlock(blockPath)
if err != nil {
return nil, err
}
sb.block = block
sb.toFile = true
return sb, nil
}

func (sb *SynchronizedBlock) Sync(db *sql.DB, note *Note) (*block.Block, error) {
sb.Lock()
defer sb.Unlock()

if !sb.toReadFile {
if !sb.fromFile {
err := sb.write()
if err != nil {
return nil, err
}
sb.toSyncFile = false
sb.toFile = false
return nil, nil
}

err := local.InsertBlock(db, &note.ID, sb.block)
if err != nil {
return nil, err
}

fname, err := latestFileName(sb.blockPath)
if err != nil {
return nil, err
Expand All @@ -57,8 +76,8 @@ func (sb *SynchronizedBlock) Sync(db *sql.DB, note *Note) (*block.Block, error)
return nil, err
}
}
sb.toReadFile = false
sb.toSyncFile = false
sb.fromFile = false
sb.toFile = false
sb.changed = false
return nil, nil
}
Expand All @@ -79,8 +98,8 @@ func (sb *SynchronizedBlock) Sync(db *sql.DB, note *Note) (*block.Block, error)
if err != nil {
return nil, err
}
sb.toReadFile = false
sb.toSyncFile = false
sb.fromFile = false
sb.toFile = false
sb.changed = false
}

Expand Down
29 changes: 16 additions & 13 deletions pkg/editor/pool/note.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ type Note struct {
}
type Blocks map[common.BlockID]*SynchronizedBlock

func NewNote(path string, replicaID common.ReplicaID, interval time.Duration) (*Note, error) {
func NewNote(db *sql.DB, replicaID common.ReplicaID, path string, interval time.Duration) (*Note, error) {
noteID, err := readNoteID(path)
if err != nil {
return nil, err
}
err = nav.NewNoteBlockIfNeeded(path, noteID.String())
err = nav.NewNoteBlockIfNeeded(db, path, noteID.String())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,30 +93,30 @@ func (note *Note) Contribute(ctx context.Context, bytes []byte) error {
return err
}
db := ctx.Value(identifier.DB).(*sql.DB)
blockIDs, err := local.Insert(db, &note.ID, ctrbs)
blockIDs, err := local.InsertCTRBs(db, &note.ID, ctrbs)
if err != nil {
return err
}

updated := make(map[common.BlockID]bool)

for _, ctrb := range ctrbs {
sb, ok := note.blocks[ctrb.BlockID]
if !ok {
blockPath := filepath.Join(note.path, "blocks", ctrb.BlockID.String())
err := os.MkdirAll(blockPath, 0777)
if err != nil {
return err
}
sb, err = NewEmptySynchronizedBlock(blockPath)
sb, err = NewUnSynchronizedBlock(blockPath, ctrb.Operations.BINS)
if err != nil {
return err
}
sb.block = ctrb.Operations.BINS
note.blocks[ctrb.BlockID] = sb
}
err := sb.Apply(*ctrb)
if err != nil {
return err
}
updated[ctrb.BlockID] = true
}

return note.flagSync(ctx, blockIDs, false)
}

Expand All @@ -138,10 +138,10 @@ func (note *Note) flagSync(ctx context.Context, blockIDs []*uuid.UUID, isFS bool
}
note.blocks[*blockID] = sb
}
if isFS && !sb.toReadFile {
sb.toReadFile = true
if isFS && !sb.fromFile {
sb.fromFile = true
}
sb.toSyncFile = true
sb.toFile = true
}

note.wg.Add(note.dt.Add(func() {
Expand All @@ -154,7 +154,7 @@ func (note *Note) flagSync(ctx context.Context, blockIDs []*uuid.UUID, isFS bool
func (note *Note) sync(ctx context.Context) {
db := ctx.Value(identifier.DB).(*sql.DB)
for blockID, sb := range note.blocks {
if !sb.toSyncFile {
if !sb.toFile && !sb.fromFile {
continue
}
block, err := sb.Sync(db, note)
Expand Down Expand Up @@ -206,6 +206,7 @@ func readBlocks(path string) (Blocks, error) {
if err != nil {
return nil, err
}

var wg sync.WaitGroup
var mutex sync.Mutex
blocks := make(Blocks)
Expand All @@ -224,11 +225,13 @@ func readBlocks(path string) (Blocks, error) {
}
sb.fname = fname
sb.block = block
sb.fromFile = true
mutex.Lock()
blocks[sb.block.BlockID] = sb
mutex.Unlock()
}(bid)
}
wg.Wait()

return blocks, nil
}
28 changes: 24 additions & 4 deletions pkg/editor/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"errors"
"path/filepath"
"strings"
"sync"
"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/notebox/nb-crdt-go/common"
"github.com/notebox/nbfm/pkg/config"
"github.com/notebox/nbfm/pkg/identifier"
local "github.com/notebox/nbfm/pkg/local/note"
)

type Path = string
Expand Down Expand Up @@ -42,20 +44,38 @@ func (pool *Pool) Open(ctx context.Context, path string) (*Note, error) {
if ok {
note.wg.Add(1)
} else {
note, err = NewNote(path, ctx.Value(identifier.ReplicaID).(uint32), pool.syncInterval)
db := ctx.Value(identifier.DB).(*sql.DB)
replicaID := ctx.Value(identifier.ReplicaID).(common.ReplicaID)
note, err = NewNote(db, replicaID, path, pool.syncInterval)
if err != nil {
return nil, err
}
pool.notes[path] = note

note.wg = new(sync.WaitGroup)
note.wg.Add(1)
for _, sb := range note.blocks {
sb.toSyncFile, err = sb.update(ctx.Value(identifier.DB).(*sql.DB), note)

cached, err := local.SelectBlocks(db, &note.ID)
if err != nil {
return nil, err
}
blocksPath := filepath.Join(path, "blocks")
for _, b := range cached {
sb, ok := note.blocks[b.BlockID]
if ok {
sb.toFile, err = sb.update(db, note)
if err != nil {
return nil, err
}
continue
}
sb, err = NewUnSynchronizedBlock(filepath.Join(blocksPath, b.BlockID.String()), b)
if err != nil {
return nil, err
}
note.blocks[b.BlockID] = sb
}

note.wg.Add(1)
note.sync(ctx)

Expand All @@ -68,7 +88,7 @@ func (pool *Pool) Open(ctx context.Context, path string) (*Note, error) {
}()

go pool.watch(ctx, note)
err := note.watcher.AddRecursive(note.path)
err = note.watcher.AddRecursive(note.path)
if err != nil {
return nil, err
}
Expand Down
68 changes: 67 additions & 1 deletion pkg/local/note/ctrb.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ func Prepare(db *sql.DB, path string) error {
);
CREATE INDEX IF NOT EXISTS idx_nb_note_ctrbs_ids ON nb_note_ctrbs (note_id, block_id);
CREATE INDEX IF NOT EXISTS idx_nb_note_ctrbs_block_nonce ON nb_note_ctrbs (block_nonce);
CREATE TABLE IF NOT EXISTS nb_note_blocks (
note_id TEXT NOT NULL,
block_id TEXT NOT NULL,
data BLOB NOT NULL,
UNIQUE (note_id, block_id)
);
CREATE INDEX IF NOT EXISTS idx_nb_note_blocks_ids ON nb_note_blocks (note_id, block_id);
`)
return err
}
Expand All @@ -48,7 +55,7 @@ func isInstalled(db *sql.DB) (bool, error) {
return rows.Next(), nil
}

func Insert(db *sql.DB, noteID *uuid.UUID, contributions []*block.Contribution) ([]*uuid.UUID, error) {
func InsertCTRBs(db *sql.DB, noteID *uuid.UUID, contributions []*block.Contribution) ([]*uuid.UUID, error) {
var blockIDs []*uuid.UUID
q := "INSERT INTO nb_note_ctrbs (note_id, block_id, block_nonce, text_nonce, replica_id, timestamp, ops) VALUES"
v := []any{}
Expand All @@ -72,6 +79,65 @@ func Insert(db *sql.DB, noteID *uuid.UUID, contributions []*block.Contribution)
return blockIDs, err
}

func InsertBlock(db *sql.DB, noteID *uuid.UUID, block *block.Block) error {
data, err := json.Marshal(block)
if err != nil {
return err
}
_, err = db.Exec("INSERT OR REPLACE INTO nb_note_blocks (note_id, block_id, data) VALUES (?, ?, ?)", noteID, block.BlockID, data)
return err
}

func SelectBlockData(db *sql.DB, noteID *uuid.UUID, blockID *uuid.UUID) ([]byte, error) {
rows, err := db.Query("SELECT data FROM nb_note_blocks WHERE note_id = ? AND block_id LIMIT 1", noteID, blockID)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
defer rows.Close()

for rows.Next() {
var data []byte
err := rows.Scan(&data)
if err != nil {
return nil, err
}
return data, nil
}
return nil, nil
}

func SelectBlocks(db *sql.DB, noteID *uuid.UUID) ([]*block.Block, error) {
var blocks []*block.Block

rows, err := db.Query("SELECT data FROM nb_note_blocks WHERE note_id = ?", noteID)
if err != nil {
if err == sql.ErrNoRows {
return blocks, nil
}
return nil, err
}
defer rows.Close()

for rows.Next() {
var data []byte
err := rows.Scan(&data)
if err != nil {
return nil, err
}
var b block.Block
err = json.Unmarshal(data, &b)
if err != nil {
return nil, err
}
blocks = append(blocks, &b)
}

return blocks, nil
}

func SelectAllAfter(db *sql.DB, replicaID uint32, noteID *uuid.UUID, blockID *uuid.UUID, blockNonce common.Nonce, textNonce common.Nonce) ([]*block.Contribution, error) {
rows, err := db.Query("SELECT block_id, block_nonce, text_nonce, replica_id, timestamp, ops FROM nb_note_ctrbs WHERE replica_id = ? AND note_id = ? AND block_id = ? AND (block_nonce > ? OR (block_nonce = ? AND text_nonce > ?)) ORDER BY block_nonce ASC", replicaID, noteID, blockID, blockNonce, blockNonce, textNonce)
if err != nil {
Expand Down
24 changes: 21 additions & 3 deletions pkg/nav/exec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nav

import (
"database/sql"
"fmt"
"os"
"path/filepath"
Expand All @@ -9,6 +10,7 @@ import (

"github.com/google/uuid"
"github.com/notebox/nbfm/pkg/config"
local "github.com/notebox/nbfm/pkg/local/note"
)

func DeleteFile(path string) error {
Expand All @@ -27,7 +29,7 @@ func AddFile(path string) error {
return err
}
noteIDStr := noteID.String()
err = NewNoteBlockIfNeeded(path, noteIDStr)
err = NewNoteBlockIfNeeded(nil, path, noteIDStr)
if err != nil {
return err
}
Expand Down Expand Up @@ -65,7 +67,7 @@ func MoveFile(src, dst string) error {
return sysMoveFile(src, dst)
}

func NewNoteBlockIfNeeded(path, noteIDStr string) error {
func NewNoteBlockIfNeeded(db *sql.DB, path, noteIDStr string) error {
noteBlockPath := filepath.Join(path, "blocks", noteIDStr, NewBlockFileName())
dirPath := filepath.Dir(noteBlockPath)
if _, err := os.Stat(dirPath); !os.IsNotExist(err) {
Expand All @@ -76,7 +78,23 @@ func NewNoteBlockIfNeeded(path, noteIDStr string) error {
if err != nil {
return err
}
return os.WriteFile(noteBlockPath, []byte(fmt.Sprintf(`["%s",{},[[0,0,1]],{"TYPE":[null,"NOTE"]},false,[]]`, noteIDStr)), 0777)

noteID, err := uuid.Parse(noteIDStr)
if err != nil {
return err
}
var data []byte
if db != nil {
data, err = local.SelectBlockData(db, &noteID, &noteID)
if err != nil {
return err
}
}
if data == nil {
data = []byte(fmt.Sprintf(`["%s",{},[[0,0,1]],{"TYPE":[null,"NOTE"]},false,[]]`, noteIDStr))
}

return os.WriteFile(noteBlockPath, data, 0777)
}

func NewBlockFileName() string {
Expand Down

0 comments on commit 9bd2842

Please sign in to comment.