From 8367fdbdd89ea4d3b52bca822d2439576169560c Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Thu, 7 Nov 2024 10:59:48 +0000 Subject: [PATCH] Storage impls have a common read API (#281) --- cmd/conformance/mysql/main.go | 8 +++---- integration/storage_uniformity_test.go | 3 +++ storage/gcp/gcp.go | 18 +++++++++++++--- storage/mysql/mysql.go | 2 +- storage/mysql/mysql_test.go | 4 ++-- storage/posix/files.go | 30 +++++++++++++------------- 6 files changed, 40 insertions(+), 25 deletions(-) diff --git a/cmd/conformance/mysql/main.go b/cmd/conformance/mysql/main.go index 3bcd5040..9c91f3e4 100644 --- a/cmd/conformance/mysql/main.go +++ b/cmd/conformance/mysql/main.go @@ -158,8 +158,8 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { } return } - - tile, err := storage.ReadTile(r.Context(), level, index, width) + impliedSize := (index*256 + width) << (level * 8) + tile, err := storage.ReadTile(r.Context(), level, index, impliedSize) if err != nil { klog.Errorf("/tile/{level}/{index...}: %v", err) w.WriteHeader(http.StatusInternalServerError) @@ -179,7 +179,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { }) mux.HandleFunc("GET /tile/entries/{index...}", func(w http.ResponseWriter, r *http.Request) { - index, _, err := layout.ParseTileIndexWidth(r.PathValue("index")) + index, width, err := layout.ParseTileIndexWidth(r.PathValue("index")) if err != nil { w.WriteHeader(http.StatusBadRequest) if _, werr := w.Write([]byte(fmt.Sprintf("Malformed URL: %s", err.Error()))); werr != nil { @@ -188,7 +188,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { return } - entryBundle, err := storage.ReadEntryBundle(r.Context(), index) + entryBundle, err := storage.ReadEntryBundle(r.Context(), index, width) if err != nil { klog.Errorf("/tile/entries/{index...}: %v", err) w.WriteHeader(http.StatusInternalServerError) diff --git a/integration/storage_uniformity_test.go b/integration/storage_uniformity_test.go index fa59d50a..3619cf78 100644 --- a/integration/storage_uniformity_test.go +++ b/integration/storage_uniformity_test.go @@ -25,6 +25,9 @@ import ( type StorageContract interface { Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture + ReadCheckpoint(ctx context.Context) ([]byte, error) + ReadTile(ctx context.Context, level, index, treeSize uint64) ([]byte, error) + ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error) } var ( diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index dc043a71..86ba444c 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -178,17 +178,29 @@ func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture return s.queue.Add(ctx, e) } -// Get returns the requested object. +func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { + return s.get(ctx, layout.CheckpointPath) +} + +func (s *Storage) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { + return s.get(ctx, layout.TilePath(l, i, sz)) +} + +func (s *Storage) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { + return s.get(ctx, layout.EntriesPath(i, sz)) +} + +// get returns the requested object. // // This is indended to be used to proxy read requests through the personality for debug/testing purposes. 
-func (s *Storage) Get(ctx context.Context, path string) ([]byte, error) { +func (s *Storage) get(ctx context.Context, path string) ([]byte, error) { d, _, err := s.objStore.getObject(ctx, path) return d, err } // init ensures that the storage represents a log in a valid state. func (s *Storage) init(ctx context.Context) error { - cpRaw, err := s.Get(ctx, layout.CheckpointPath) + cpRaw, err := s.get(ctx, layout.CheckpointPath) if err != nil { if errors.Is(err, gcs.ErrObjectNotExist) { // No checkpoint exists, do a forced (possibly empty) integration to create one in a safe diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index 67747303..9c09c0b5 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -188,7 +188,7 @@ func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64 // 3. Partial tile request with full/larger partial tile output: Return trimmed partial tile with correct tile width. // 4. Partial tile request with partial tile (same width) output: Return partial tile. // 5. Partial tile request with smaller partial tile output: Return error. -func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64) ([]byte, error) { +func (s *Storage) ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error) { row := s.db.QueryRowContext(ctx, selectTiledLeavesSQL, index) if err := row.Err(); err != nil { return nil, err diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index a52ce212..ea9643d6 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -213,7 +213,7 @@ func TestReadMissingEntryBundle(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - entryBundle, err := s.ReadEntryBundle(ctx, test.index) + entryBundle, err := s.ReadEntryBundle(ctx, test.index, test.index) if err != nil { t.Errorf("got err: %v", err) } @@ -335,7 +335,7 @@ func TestEntryBundleRoundTrip(t *testing.T) { if err != nil { t.Errorf("Add got err: %v", err) } - entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256) + entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, entryIndex) if err != nil { t.Errorf("ReadEntryBundle got err: %v", err) } diff --git a/storage/posix/files.go b/storage/posix/files.go index 034f40ef..a6499df4 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -148,8 +148,8 @@ func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture return s.queue.Add(ctx, e) } -// GetEntryBundle retrieves the Nth entries bundle for a log of the given size. -func (s *Storage) GetEntryBundle(ctx context.Context, index, logSize uint64) ([]byte, error) { +// ReadEntryBundle retrieves the Nth entries bundle for a log of the given size. +func (s *Storage) ReadEntryBundle(ctx context.Context, index, logSize uint64) ([]byte, error) { return os.ReadFile(filepath.Join(s.path, s.entriesPath(index, logSize))) } @@ -190,7 +190,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e bundleIndex, entriesInBundle := seq/uint64(256), seq%uint64(256) if entriesInBundle > 0 { // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. - part, err := s.GetEntryBundle(ctx, bundleIndex, s.curSize) + part, err := s.ReadEntryBundle(ctx, bundleIndex, s.curSize) if err != nil { return err } @@ -254,7 +254,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e // doIntegrate handles integrating new entries into the log, and updating the checkpoint. 
func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) error { tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { - n, err := s.getTiles(ctx, tileIDs, treeSize) + n, err := s.readTiles(ctx, tileIDs, treeSize) if err != nil { return nil, fmt.Errorf("getTiles: %w", err) } @@ -267,7 +267,7 @@ func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []sto return fmt.Errorf("Integrate: %v", err) } for k, v := range tiles { - if err := s.StoreTile(ctx, uint64(k.Level), k.Index, newSize, v); err != nil { + if err := s.storeTile(ctx, uint64(k.Level), k.Index, newSize, v); err != nil { return fmt.Errorf("failed to set tile(%v): %v", k, err) } } @@ -278,7 +278,7 @@ func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []sto if err != nil { return fmt.Errorf("newCP: %v", err) } - if err := WriteCheckpoint(s.path, cpRaw); err != nil { + if err := writeCheckpoint(s.path, cpRaw); err != nil { return fmt.Errorf("failed to write new checkpoint: %v", err) } } @@ -286,10 +286,10 @@ func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []sto return nil } -func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { +func (s *Storage) readTiles(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { r := make([]*api.HashTile, 0, len(tileIDs)) for _, id := range tileIDs { - t, err := s.GetTile(ctx, id.Level, id.Index, treeSize) + t, err := s.ReadTile(ctx, id.Level, id.Index, treeSize) if err != nil { return nil, err } @@ -298,10 +298,10 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, treeSi return r, nil } -// GetTile returns the tile at the given tile-level and tile-index. +// ReadTile returns the tile at the given tile-level and tile-index. // If no complete tile exists at that location, it will attempt to find a // partial tile for the given tree size at that location. -func (s *Storage) GetTile(_ context.Context, level, index, logSize uint64) (*api.HashTile, error) { +func (s *Storage) ReadTile(_ context.Context, level, index, logSize uint64) (*api.HashTile, error) { p := filepath.Join(s.path, layout.TilePath(level, index, logSize)) t, err := os.ReadFile(p) if err != nil { @@ -319,11 +319,11 @@ func (s *Storage) GetTile(_ context.Context, level, index, logSize uint64) (*api return &tile, nil } -// StoreTile writes a tile out to disk. +// storeTile writes a tile out to disk. // Fully populated tiles are stored at the path corresponding to the level & // index parameters, partially populated (i.e. right-hand edge) tiles are // stored with a .xx suffix where xx is the number of "tile leaves" in hex. 
-func (s *Storage) StoreTile(_ context.Context, level, index, logSize uint64, tile *api.HashTile) error { +func (s *Storage) storeTile(_ context.Context, level, index, logSize uint64, tile *api.HashTile) error { tileSize := uint64(len(tile.Nodes)) klog.V(2).Infof("StoreTile: level %d index %x ts: %x", level, index, tileSize) if tileSize == 0 || tileSize > 256 { @@ -383,7 +383,7 @@ func (s *Storage) initialise(create bool) error { if err != nil { return fmt.Errorf("failed to sign empty checkpoint: %v", err) } - if err := WriteCheckpoint(s.path, n); err != nil { + if err := writeCheckpoint(s.path, n); err != nil { return fmt.Errorf("failed to write empty checkpoint: %v", err) } } @@ -396,8 +396,8 @@ func (s *Storage) initialise(create bool) error { return nil } -// WriteCheckpoint stores a raw log checkpoint on disk. -func WriteCheckpoint(path string, newCPRaw []byte) error { +// writeCheckpoint stores a raw log checkpoint on disk. +func writeCheckpoint(path string, newCPRaw []byte) error { if err := createExclusive(filepath.Join(path, layout.CheckpointPath), newCPRaw); err != nil { return fmt.Errorf("failed to create checkpoint file: %w", err) }
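
The thread running through all of these hunks is that the GCP, MySQL and POSIX implementations now share one read surface (ReadCheckpoint, ReadTile, ReadEntryBundle), pinned down by the StorageContract interface added to the uniformity test. As a rough illustration of what that buys a personality, the sketch below serves that surface over HTTP against any of the three backends. It is only a sketch: readLog, registerReadAPI and the simplified decimal path segments are hypothetical names invented here, not Tessera API; the implied-tree-size calculation simply mirrors the MySQL conformance handler change above.

package readproxy

import (
	"context"
	"net/http"
	"strconv"
)

// readLog is the read-side subset of the StorageContract above; the GCP,
// MySQL and POSIX storage implementations in this patch all satisfy it.
type readLog interface {
	ReadCheckpoint(ctx context.Context) ([]byte, error)
	ReadTile(ctx context.Context, level, index, treeSize uint64) ([]byte, error)
	ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error)
}

// registerReadAPI exposes the common read API over HTTP, backed by any readLog.
// Path parsing is deliberately simplified to plain decimal path segments rather
// than the full tiles path syntax handled by the layout package.
func registerReadAPI(mux *http.ServeMux, s readLog) {
	mux.HandleFunc("GET /checkpoint", func(w http.ResponseWriter, r *http.Request) {
		cp, err := s.ReadCheckpoint(r.Context())
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		_, _ = w.Write(cp)
	})

	mux.HandleFunc("GET /tile/{level}/{index}/{width}", func(w http.ResponseWriter, r *http.Request) {
		level, lErr := strconv.ParseUint(r.PathValue("level"), 10, 64)
		index, iErr := strconv.ParseUint(r.PathValue("index"), 10, 64)
		width, wErr := strconv.ParseUint(r.PathValue("width"), 10, 64)
		if lErr != nil || iErr != nil || wErr != nil || width == 0 || width > 256 {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		// ReadTile takes a tree size rather than a tile width, so derive the
		// smallest tree size which implies a tile of this width at this level,
		// mirroring the MySQL conformance handler in this patch.
		impliedSize := (index*256 + width) << (level * 8)
		tile, err := s.ReadTile(r.Context(), level, index, impliedSize)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		_, _ = w.Write(tile)
	})
}

An entry-bundle handler would follow the same shape, passing the parsed width through to ReadEntryBundle as the MySQL conformance binary now does, leaving construction of the storage itself as the only backend-specific code in a personality.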