Skip to content

Commit

Permalink
add --latest mode for replicate-logs (#36)
Browse files Browse the repository at this point in the history
* add --latest mode for replicate-logs

this will automatically run watch to get the latest changes

* added commentary about use of background context

---------

Co-authored-by: Robin Bryce <robin.bryce@datatrails.ai>
  • Loading branch information
robinbryce and Robin Bryce authored Nov 5, 2024
1 parent 6dbec55 commit 6732ec6
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 33 deletions.
131 changes: 99 additions & 32 deletions replicatelogs.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package veracity

import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/datatrails/go-datatrails-common/cbor"
"github.com/datatrails/go-datatrails-common/logger"
"github.com/datatrails/go-datatrails-merklelog/massifs"
"github.com/datatrails/go-datatrails-merklelog/massifs/watcher"
"github.com/datatrails/go-datatrails-merklelog/mmr"
"github.com/gosuri/uiprogress"
"github.com/urfave/cli/v2"
Expand All @@ -23,6 +26,9 @@ const (
// 429 errors which provide a valid Retry-After header will honor that header rather than use this.
baseDefaultRetryDelay = 2 * time.Second
defaultConcurrency = 5

// The default data retention policy is 2 years, so this is a generous default for "all data".
tenYearsOfHours = 10 * 365 * 24 * time.Hour
)

var (
Expand Down Expand Up @@ -90,6 +96,11 @@ changes are read from standard input.`,
Value: false,
Aliases: []string{"p"},
},
// --latest asks replicate-logs to discover the changes to replicate itself
// rather than reading them from --changes, --tenant/--massif or stdin.
&cli.BoolFlag{
	Name:  "latest",
	Usage: `find the latest changes automatically. When --latest is set, a list of tenants can be provided to --tenant to limit the tenant logs to be replicated.`,
	Value: false,
},
&cli.IntFlag{
Name: "retries",
Aliases: []string{"r"},
Expand Down Expand Up @@ -119,7 +130,9 @@ By default transient errors are re-tried without limit, and if the error is 429,
return err
}

changes, err := readTenantMassifChanges(cCtx)
// There isn't really a better context. We could implement user
// defined timeouts for "lights out/ci" use cases in future. Humans can ctrl-c
changes, err := readTenantMassifChanges(context.Background(), cCtx, cmd)
if err != nil {
return err
}
Expand Down Expand Up @@ -164,6 +177,8 @@ func replicateChanges(cCtx *cli.Context, cmd *CmdCtx, changes []TenantMassif, pr
return
}

// There isn't really a better context. We could implement user
// defined timeouts for "lights out/ci" use cases in future. Humans can ctrl-c
err = replicator.ReplicateVerifiedUpdates(
context.Background(),
change.Tenant, startMassif, endMassif,
Expand Down Expand Up @@ -239,37 +254,6 @@ func newProgressor(cCtx *cli.Context, barName string, increments int) Progresser
return NewStagedProgress(barName, increments)
}

// readTenantMassifChanges resolves the list of tenant massifs to replicate
// from the command line options.
//
// The sources are mutually exclusive and are tried in this order:
//   - --changes names a file to read; combining it with --tenant or a non-zero
//     --massif yields ErrChangesFlagIsExclusive.
//   - --tenant and/or --massif describe exactly one change.
//   - with none of the above, the changes are decoded from standard input.
//
// Note: GetHeadMassif could provide a default for --massif, but it issues a
// list blobs query and those are 10x more expensive. It is arranged so that
// replicate-logs does not issue *any* list blobs, and so can reasonably be run
// in parallel. The watch command provides the latest massif index, and its
// output is the expected source of the options to this command.
func readTenantMassifChanges(cCtx *cli.Context) ([]TenantMassif, error) {
	path := cCtx.String("changes")
	tenant := cCtx.String("tenant")
	massifIndex := cCtx.Int("massif")

	// True when the caller identified a single change directly on the command line.
	explicit := tenant != "" || massifIndex != 0

	switch {
	case path != "" && explicit:
		return nil, ErrChangesFlagIsExclusive
	case path != "":
		return filePathToTenantMassifs(path)
	case explicit:
		return []TenantMassif{{Tenant: tenant, Massif: massifIndex}}, nil
	}

	decoded, err := stdinToDecodedTenantMassifs()
	if err != nil {
		return nil, err
	}
	return decoded, nil
}

type VerifiedContextReader interface {
massifs.VerifiedContextReader
}
Expand Down Expand Up @@ -624,3 +608,86 @@ func peakBaggedRoot(state massifs.MMRState) []byte {
}
return mmr.HashPeaksRHS(sha256.New(), state.Peaks)
}

// changeCollector captures, in memory, the text written through Outf so that
// it can be parsed afterwards; informational messages go to the logger.
type changeCollector struct {
// log receives messages passed to Logf; Logf does nothing when it is nil.
log logger.Logger
// watchOutput is the concatenation of everything formatted by Outf.
watchOutput string
}

// Logf forwards a formatted informational message to the collector's logger.
// When no logger is configured the message is silently dropped.
func (c *changeCollector) Logf(msg string, args ...any) {
	if c.log != nil {
		c.log.Infof(msg, args...)
	}
}

// Outf formats msg with args and appends the result to the collected output.
func (c *changeCollector) Outf(msg string, args ...any) {
	formatted := fmt.Sprintf(msg, args...)
	c.watchOutput = c.watchOutput + formatted
}

// newWatchConfig builds the watch configuration used when replicate-logs
// discovers changes itself.
//
// The configuration asks for a watch count of 1 with a horizon of
// tenYearsOfHours, has the watcher defaults applied, and reads from the
// command's reader URL. When tenants were supplied on the command line the
// watch is limited to those tenants, keyed with tenantPrefix removed.
//
// If applying the watcher defaults fails, a zero WatchConfig and the error are
// returned.
func newWatchConfig(cCtx *cli.Context, cmd *CmdCtx) (WatchConfig, error) {
	var cfg WatchConfig
	cfg.WatchCount = 1
	cfg.WatchConfig.Horizon = tenYearsOfHours

	if err := watcher.ConfigDefaults(&cfg.WatchConfig); err != nil {
		return WatchConfig{}, err
	}
	cfg.ReaderURL = cmd.readerURL

	// Only install a tenant filter when at least one tenant was requested.
	if selected := CtxGetTenantOptions(cCtx); len(selected) != 0 {
		cfg.WatchTenants = make(map[string]bool)
		for _, name := range selected {
			key := strings.TrimPrefix(name, tenantPrefix)
			cfg.WatchTenants[key] = true
		}
	}
	return cfg, nil
}

// readTenantMassifChanges resolves the list of tenant massifs to replicate.
//
// The sources are consulted in this order and the first that applies wins:
//  1. --latest: WatchForChanges is run with the configuration from
//     newWatchConfig and its collected output is parsed into changes.
//  2. exactly one tenant option: that tenant with the --massif index.
//  3. --changes: the changes are read from the named file.
//  4. otherwise the changes are decoded from standard input.
//
// Supplying more than one tenant without --latest is an error.
//
// ctx is passed to WatchForChanges; cCtx provides the command line options and
// cmd the logger and reader configuration.
func readTenantMassifChanges(ctx context.Context, cCtx *cli.Context, cmd *CmdCtx) ([]TenantMassif, error) {

	// Test the flag's value, not merely whether it was supplied: IsSet would
	// also be true for an explicit --latest=false.
	if cCtx.Bool("latest") {
		// People get tripped up with the `veracity watch -z 90000h | veracity replicate-logs`
		// idiom. It is such a common use case that it should just work.
		cfg, err := newWatchConfig(cCtx, cmd)
		if err != nil {
			return nil, err
		}
		forceProdURL := cCtx.String("data-url") == ""

		reader, err := cfgReader(cmd, cCtx, forceProdURL)
		if err != nil {
			return nil, err
		}

		collector := &changeCollector{log: cmd.log}
		err = WatchForChanges(ctx, cfg, reader, collector)
		if err != nil {
			return nil, err
		}

		// The watch output is parsed exactly as if it had been piped in.
		return scannerToTenantMassifs(bufio.NewScanner(strings.NewReader(collector.watchOutput)))
	}

	tenants := CtxGetTenantOptions(cCtx)
	if len(tenants) == 1 {
		return []TenantMassif{{Tenant: tenants[0], Massif: cCtx.Int("massif")}}, nil
	}
	if len(tenants) > 1 {
		return nil, fmt.Errorf("multiple tenants may only be used with --latest")
	}

	// If --changes is set the tenants and massif indices are read from the identified file
	changesFile := cCtx.String("changes")
	if changesFile != "" {
		return filePathToTenantMassifs(changesFile)
	}

	// No explicit config and --latest not set, read from stdin
	return stdinToDecodedTenantMassifs()
}
13 changes: 12 additions & 1 deletion tests/systemtest/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ testVeracityWatchPublicFindsActivity() {
assertContains "watch-public should find activity" "$output" "$PROD_PUBLIC_TENANT_ID"
}

testVeracityReplicateLogsPublicTenant() {
testVeracityReplicateLogsPublicTenantWatchPipe() {
local output

rm -rf $TEST_TMPDIR/merkelogs
Expand All @@ -84,6 +84,17 @@ testVeracityReplicateLogsPublicTenant() {
assertEquals "should replicate one massif and one seal" "2" "$COUNT"
}

# Replicates the public tenant's log with --latest (no watch pipe) into a
# fresh replica directory and checks the exit code and the number of files
# written.
testVeracityReplicateLogsPublicTenantWatchLatestFlag() {
    local output
    local replicadir="$TEST_TMPDIR/merkelogs"

    # Quote path expansions so rm/find never see a word-split or globbed path.
    rm -rf "$replicadir"
    # VERACITY_INSTALL stays unquoted in case it holds a command plus arguments.
    output=$($VERACITY_INSTALL --data-url "$DATATRAILS_URL/verifiabledata" --tenant="$PROD_PUBLIC_TENANT_ID" replicate-logs --latest --ancestors=0 --replicadir="$replicadir")
    assertEquals "replicate-logs --latest should return a 0 exit code" 0 $?

    COUNT=$(find "$replicadir" -type f | wc -l | tr -d ' ')
    assertEquals "should replicate one massif and one seal" "2" "$COUNT"
}

testVerifySingleEvent() {
# Check if the response status code is 200
local response
Expand Down

0 comments on commit 6732ec6

Please sign in to comment.