Skip to content

Commit

Permalink
Storage impls have a common read API (#281)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter authored Nov 7, 2024
1 parent fe05ce7 commit 8367fdb
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 25 deletions.
8 changes: 4 additions & 4 deletions cmd/conformance/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) {
}
return
}

tile, err := storage.ReadTile(r.Context(), level, index, width)
impliedSize := (index*256 + width) << (level * 8)
tile, err := storage.ReadTile(r.Context(), level, index, impliedSize)
if err != nil {
klog.Errorf("/tile/{level}/{index...}: %v", err)
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -179,7 +179,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) {
})

mux.HandleFunc("GET /tile/entries/{index...}", func(w http.ResponseWriter, r *http.Request) {
index, _, err := layout.ParseTileIndexWidth(r.PathValue("index"))
index, width, err := layout.ParseTileIndexWidth(r.PathValue("index"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if _, werr := w.Write([]byte(fmt.Sprintf("Malformed URL: %s", err.Error()))); werr != nil {
Expand All @@ -188,7 +188,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) {
return
}

entryBundle, err := storage.ReadEntryBundle(r.Context(), index)
entryBundle, err := storage.ReadEntryBundle(r.Context(), index, width)
if err != nil {
klog.Errorf("/tile/entries/{index...}: %v", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down
3 changes: 3 additions & 0 deletions integration/storage_uniformity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (

type StorageContract interface {
Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture
ReadCheckpoint(ctx context.Context) ([]byte, error)
ReadTile(ctx context.Context, level, index, treeSize uint64) ([]byte, error)
ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error)
}

var (
Expand Down
18 changes: 15 additions & 3 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,29 @@ func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture
return s.queue.Add(ctx, e)
}

// Get returns the requested object.
func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) {
return s.get(ctx, layout.CheckpointPath)
}

func (s *Storage) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) {
return s.get(ctx, layout.TilePath(l, i, sz))
}

func (s *Storage) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) {
return s.get(ctx, layout.EntriesPath(i, sz))
}

// get returns the requested object.
//
// This is indended to be used to proxy read requests through the personality for debug/testing purposes.
func (s *Storage) Get(ctx context.Context, path string) ([]byte, error) {
func (s *Storage) get(ctx context.Context, path string) ([]byte, error) {
d, _, err := s.objStore.getObject(ctx, path)
return d, err
}

// init ensures that the storage represents a log in a valid state.
func (s *Storage) init(ctx context.Context) error {
cpRaw, err := s.Get(ctx, layout.CheckpointPath)
cpRaw, err := s.get(ctx, layout.CheckpointPath)
if err != nil {
if errors.Is(err, gcs.ErrObjectNotExist) {
// No checkpoint exists, do a forced (possibly empty) integration to create one in a safe
Expand Down
2 changes: 1 addition & 1 deletion storage/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64
// 3. Partial tile request with full/larger partial tile output: Return trimmed partial tile with correct tile width.
// 4. Partial tile request with partial tile (same width) output: Return partial tile.
// 5. Partial tile request with smaller partial tile output: Return error.
func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64) ([]byte, error) {
func (s *Storage) ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error) {
row := s.db.QueryRowContext(ctx, selectTiledLeavesSQL, index)
if err := row.Err(); err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions storage/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestReadMissingEntryBundle(t *testing.T) {
},
} {
t.Run(test.name, func(t *testing.T) {
entryBundle, err := s.ReadEntryBundle(ctx, test.index)
entryBundle, err := s.ReadEntryBundle(ctx, test.index, test.index)
if err != nil {
t.Errorf("got err: %v", err)
}
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestEntryBundleRoundTrip(t *testing.T) {
if err != nil {
t.Errorf("Add got err: %v", err)
}
entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256)
entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, entryIndex)
if err != nil {
t.Errorf("ReadEntryBundle got err: %v", err)
}
Expand Down
30 changes: 15 additions & 15 deletions storage/posix/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture
return s.queue.Add(ctx, e)
}

// GetEntryBundle retrieves the Nth entries bundle for a log of the given size.
func (s *Storage) GetEntryBundle(ctx context.Context, index, logSize uint64) ([]byte, error) {
// ReadEntryBundle retrieves the Nth entries bundle for a log of the given size.
func (s *Storage) ReadEntryBundle(ctx context.Context, index, logSize uint64) ([]byte, error) {
return os.ReadFile(filepath.Join(s.path, s.entriesPath(index, logSize)))
}

Expand Down Expand Up @@ -190,7 +190,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e
bundleIndex, entriesInBundle := seq/uint64(256), seq%uint64(256)
if entriesInBundle > 0 {
// If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle.
part, err := s.GetEntryBundle(ctx, bundleIndex, s.curSize)
part, err := s.ReadEntryBundle(ctx, bundleIndex, s.curSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e
// doIntegrate handles integrating new entries into the log, and updating the checkpoint.
func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) error {
tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
n, err := s.getTiles(ctx, tileIDs, treeSize)
n, err := s.readTiles(ctx, tileIDs, treeSize)
if err != nil {
return nil, fmt.Errorf("getTiles: %w", err)
}
Expand All @@ -267,7 +267,7 @@ func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []sto
return fmt.Errorf("Integrate: %v", err)
}
for k, v := range tiles {
if err := s.StoreTile(ctx, uint64(k.Level), k.Index, newSize, v); err != nil {
if err := s.storeTile(ctx, uint64(k.Level), k.Index, newSize, v); err != nil {
return fmt.Errorf("failed to set tile(%v): %v", k, err)
}
}
Expand All @@ -278,18 +278,18 @@ func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []sto
if err != nil {
return fmt.Errorf("newCP: %v", err)
}
if err := WriteCheckpoint(s.path, cpRaw); err != nil {
if err := writeCheckpoint(s.path, cpRaw); err != nil {
return fmt.Errorf("failed to write new checkpoint: %v", err)
}
}

return nil
}

func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
func (s *Storage) readTiles(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
r := make([]*api.HashTile, 0, len(tileIDs))
for _, id := range tileIDs {
t, err := s.GetTile(ctx, id.Level, id.Index, treeSize)
t, err := s.ReadTile(ctx, id.Level, id.Index, treeSize)
if err != nil {
return nil, err
}
Expand All @@ -298,10 +298,10 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, treeSi
return r, nil
}

// GetTile returns the tile at the given tile-level and tile-index.
// ReadTile returns the tile at the given tile-level and tile-index.
// If no complete tile exists at that location, it will attempt to find a
// partial tile for the given tree size at that location.
func (s *Storage) GetTile(_ context.Context, level, index, logSize uint64) (*api.HashTile, error) {
func (s *Storage) ReadTile(_ context.Context, level, index, logSize uint64) (*api.HashTile, error) {
p := filepath.Join(s.path, layout.TilePath(level, index, logSize))
t, err := os.ReadFile(p)
if err != nil {
Expand All @@ -319,11 +319,11 @@ func (s *Storage) GetTile(_ context.Context, level, index, logSize uint64) (*api
return &tile, nil
}

// StoreTile writes a tile out to disk.
// storeTile writes a tile out to disk.
// Fully populated tiles are stored at the path corresponding to the level &
// index parameters, partially populated (i.e. right-hand edge) tiles are
// stored with a .xx suffix where xx is the number of "tile leaves" in hex.
func (s *Storage) StoreTile(_ context.Context, level, index, logSize uint64, tile *api.HashTile) error {
func (s *Storage) storeTile(_ context.Context, level, index, logSize uint64, tile *api.HashTile) error {
tileSize := uint64(len(tile.Nodes))
klog.V(2).Infof("StoreTile: level %d index %x ts: %x", level, index, tileSize)
if tileSize == 0 || tileSize > 256 {
Expand Down Expand Up @@ -383,7 +383,7 @@ func (s *Storage) initialise(create bool) error {
if err != nil {
return fmt.Errorf("failed to sign empty checkpoint: %v", err)
}
if err := WriteCheckpoint(s.path, n); err != nil {
if err := writeCheckpoint(s.path, n); err != nil {
return fmt.Errorf("failed to write empty checkpoint: %v", err)
}
}
Expand All @@ -396,8 +396,8 @@ func (s *Storage) initialise(create bool) error {
return nil
}

// WriteCheckpoint stores a raw log checkpoint on disk.
func WriteCheckpoint(path string, newCPRaw []byte) error {
// writeCheckpoint stores a raw log checkpoint on disk.
func writeCheckpoint(path string, newCPRaw []byte) error {
if err := createExclusive(filepath.Join(path, layout.CheckpointPath), newCPRaw); err != nil {
return fmt.Errorf("failed to create checkpoint file: %w", err)
}
Expand Down

0 comments on commit 8367fdb

Please sign in to comment.