Skip to content

Commit

Permalink
[Exec] more comprehensive exec node tracing (#578)
Browse files Browse the repository at this point in the history
* remove nil checkings

* add tracking for chunk data pack generation

* update storage tracers

* name fix

* hot fix

Co-authored-by: Kan Zhang <kan@axiomzen.co>
  • Loading branch information
ramtinms and Kay-Zee authored Mar 29, 2021
1 parent 5ad176e commit b71dec4
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 89 deletions.
86 changes: 36 additions & 50 deletions engine/execution/computation/computer/computer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,15 @@ func (e *blockComputer) ExecuteBlock(
program *programs.Programs,
) (*execution.ComputationResult, error) {

if e.tracer != nil {
span, _ := e.tracer.StartSpanFromContext(ctx, trace.EXEComputeBlock)
defer func() {
span.SetTag("block.collectioncount", len(block.CompleteCollections))
span.LogFields(
log.String("block.hash", block.ID().String()),
)
span.Finish()
}()
}
// call tracer
span, _ := e.tracer.StartSpanFromContext(ctx, trace.EXEComputeBlock)
defer func() {
span.SetTag("block.collectioncount", len(block.CompleteCollections))
span.LogFields(
log.String("block.hash", block.ID().String()),
)
span.Finish()
}()

results, err := e.executeBlock(ctx, block, stateView, program)
if err != nil {
Expand Down Expand Up @@ -152,10 +151,8 @@ func (e *blockComputer) executeBlock(
e.log.Debug().Hex("block_id", logging.Entity(block)).Msg("executing system chunk")

var colSpan opentracing.Span
if e.tracer != nil {
colSpan, _ = e.tracer.StartSpanFromContext(ctx, trace.EXEComputeSystemCollection)
defer colSpan.Finish()
}
colSpan, _ = e.tracer.StartSpanFromContext(ctx, trace.EXEComputeSystemCollection)
defer colSpan.Finish()

serviceAddress := e.vmCtx.Chain.ServiceAddress()

Expand Down Expand Up @@ -199,18 +196,17 @@ func (e *blockComputer) executeCollection(
collection *entity.CompleteCollection,
) ([]flow.Event, []flow.Event, []flow.TransactionResult, uint32, uint64, error) {

// call tracing
startedAt := time.Now()
var colSpan opentracing.Span
if e.tracer != nil {
colSpan, _ = e.tracer.StartSpanFromContext(ctx, trace.EXEComputeCollection)
defer func() {
colSpan.SetTag("collection.txcount", len(collection.Transactions))
colSpan.LogFields(
log.String("collection.hash", collection.Guarantee.CollectionID.String()),
)
colSpan.Finish()
}()
}
colSpan, _ = e.tracer.StartSpanFromContext(ctx, trace.EXEComputeCollection)
defer func() {
colSpan.SetTag("collection.txCount", len(collection.Transactions))
colSpan.LogFields(
log.String("collection.hash", collection.Guarantee.CollectionID.String()),
)
colSpan.Finish()
}()

var (
events []flow.Event
Expand Down Expand Up @@ -261,37 +257,23 @@ func (e *blockComputer) executeTransaction(
txIndex uint32,
) ([]flow.Event, []flow.Event, flow.TransactionResult, uint64, error) {

startedAt := time.Now()
var txSpan opentracing.Span
var traceID string
// call tracing
txSpan = e.tracer.StartSpanFromParent(colSpan, trace.EXEComputeTransaction)

if e.tracer != nil {
txSpan = e.tracer.StartSpanFromParent(colSpan, trace.EXEComputeTransaction)

if sc, ok := txSpan.Context().(jaeger.SpanContext); ok {
traceID = sc.TraceID().String()
}

defer func() {
// Attach runtime metrics to the transaction span.
//
// Each duration is the sum of all sub-programs in the transaction.
//
// For example, metrics.Parsed() returns the total time spent parsing the transaction itself,
// as well as any imported programs.
txSpan.SetTag("transaction.proposer", txBody.ProposalKey.Address.String())
txSpan.SetTag("transaction.payer", txBody.Payer.String())
txSpan.LogFields(
log.String("transaction.ID", txBody.ID().String()),
log.Int64(trace.EXEParseDurationTag, int64(txMetrics.Parsed())),
log.Int64(trace.EXECheckDurationTag, int64(txMetrics.Checked())),
log.Int64(trace.EXEInterpretDurationTag, int64(txMetrics.Interpreted())),
log.Int64(trace.EXEValueEncodingDurationTag, int64(txMetrics.ValueEncoded())),
log.Int64(trace.EXEValueDecodingDurationTag, int64(txMetrics.ValueDecoded())),
)
txSpan.Finish()
}()
if sc, ok := txSpan.Context().(jaeger.SpanContext); ok {
traceID = sc.TraceID().String()
}

defer func() {
txSpan.LogFields(
log.String("transaction.ID", txBody.ID().String()),
)
txSpan.Finish()
}()

e.log.Debug().
Hex("tx_id", logging.Entity(txBody)).
Msg("executing transaction")
Expand Down Expand Up @@ -330,6 +312,9 @@ func (e *blockComputer) executeTransaction(
Msg("transaction executed successfully")
}

mergeSpan := e.tracer.StartSpanFromParent(txSpan, trace.EXEMergeTransactionView)
defer mergeSpan.Finish()

if tx.Err == nil {
err := collectionView.MergeView(txView)
if err != nil {
Expand All @@ -340,6 +325,7 @@ func (e *blockComputer) executeTransaction(
e.log.Info().
Str("txHash", tx.ID.String()).
Str("traceID", traceID).
Int64("timeSpentInMS", time.Since(startedAt).Milliseconds()).
Msg("transaction executed")

return tx.Events, tx.ServiceEvents, txResult, tx.GasUsed, nil
Expand Down
15 changes: 8 additions & 7 deletions engine/execution/computation/computer/computer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/onflow/flow-go/fvm/state"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/mempool/entity"
"github.com/onflow/flow-go/module/trace"
"github.com/onflow/flow-go/utils/unittest"
)

Expand All @@ -39,7 +40,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {

vm := new(computermock.VirtualMachine)

exe, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
exe, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

// create a block with 1 collection with 2 transactions
Expand All @@ -66,7 +67,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {

vm := new(computermock.VirtualMachine)

exe, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
exe, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

// create an empty block
Expand Down Expand Up @@ -94,7 +95,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {

vm := new(computermock.VirtualMachine)

exe, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
exe, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

collectionCount := 2
Expand Down Expand Up @@ -218,7 +219,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {

vm := fvm.NewVirtualMachine(emittingRuntime)

exe, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
exe, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

//vm.On("Run", mock.Anything, mock.Anything, mock.Anything).
Expand Down Expand Up @@ -279,7 +280,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {

vm := fvm.NewVirtualMachine(rt)

exe, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
exe, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

const collectionCount = 2
Expand Down Expand Up @@ -348,7 +349,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {

vm := fvm.NewVirtualMachine(rt)

exe, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
exe, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

block := generateBlock(collectionCount, transactionCount, rag)
Expand Down Expand Up @@ -444,7 +445,7 @@ func Test_FreezeAccountChecksAreIncluded(t *testing.T) {
err = accounts.Create([]flow.AccountPublicKey{key.PublicKey(1000)}, address)
require.NoError(t, err)

exe, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
exe, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

block := generateBlockWithVisitor(1, 1, fag, func(txBody *flow.TransactionBody) {
Expand Down
3 changes: 2 additions & 1 deletion engine/execution/computation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/mempool/entity"
module "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/module/trace"
"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -88,7 +89,7 @@ func TestComputeBlockWithStorage(t *testing.T) {
me := new(module.Local)
me.On("NodeID").Return(flow.ZeroID)

blockComputer, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
blockComputer, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

programsCache, err := NewProgramsCache(10)
Expand Down
5 changes: 3 additions & 2 deletions engine/execution/computation/programs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/mempool/entity"
module "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/module/trace"
"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func TestPrograms_TestContractUpdates(t *testing.T) {
me := new(module.Local)
me.On("NodeID").Return(flow.ZeroID)

blockComputer, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
blockComputer, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

programsCache, err := NewProgramsCache(10)
Expand Down Expand Up @@ -162,7 +163,7 @@ func TestPrograms_TestBlockForks(t *testing.T) {
me := new(module.Local)
me.On("NodeID").Return(flow.ZeroID)

blockComputer, err := computer.NewBlockComputer(vm, execCtx, nil, nil, zerolog.Nop())
blockComputer, err := computer.NewBlockComputer(vm, execCtx, nil, trace.NewNoopTracer(), zerolog.Nop())
require.NoError(t, err)

programsCache, err := NewProgramsCache(10)
Expand Down
10 changes: 5 additions & 5 deletions engine/execution/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,22 +1090,22 @@ func (e *Engine) saveExecutionResults(
chunk := generateChunk(i, startState, endState, collectionID, blockID)

// chunkDataPack
allRegisters := view.AllRegisters()

sp := e.tracer.StartSpanFromParent(span, trace.EXEGenerateChunkDataPacks)
allRegisters := view.AllRegisters()
proof, err := e.execState.GetProof(childCtx, chunk.StartState, allRegisters)

if err != nil {
return nil, fmt.Errorf(
"error reading registers with proofs for chunk number [%v] of block [%x] ", i, blockID,
)
}

chdp := generateChunkDataPack(chunk, collectionID, proof)

chdps[i] = chdp
chdps[i] = generateChunkDataPack(chunk, collectionID, proof)
sp.Finish()
// TODO use view.SpockSecret() as an input to spock generator
chunks[i] = chunk
startState = endState

}

executionResult, err := e.generateExecutionResultForBlock(childCtx, executableBlock.Block, chunks, endState, serviceEvents)
Expand Down
31 changes: 15 additions & 16 deletions engine/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,8 @@ func CommitDelta(ldg ledger.Ledger, delta delta.Delta, baseState flow.StateCommi
}

func (s *state) CommitDelta(ctx context.Context, delta delta.Delta, baseState flow.StateCommitment) (flow.StateCommitment, error) {
if s.tracer != nil {
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXECommitDelta)
defer span.Finish()
}
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXECommitDelta)
defer span.Finish()

return CommitDelta(s.ls, delta, baseState)
}
Expand All @@ -280,10 +278,8 @@ func (s *state) GetRegisters(
commit flow.StateCommitment,
registerIDs []flow.RegisterID,
) ([]flow.RegisterValue, error) {
if s.tracer != nil {
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXEGetRegisters)
defer span.Finish()
}
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXEGetRegisters)
defer span.Finish()

_, values, err := s.getRegisters(commit, registerIDs)
if err != nil {
Expand All @@ -304,6 +300,9 @@ func (s *state) GetProof(
registerIDs []flow.RegisterID,
) (flow.StorageProof, error) {

span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXEGetRegistersWithProofs)
defer span.Finish()

query, err := makeQuery(commit, registerIDs)

if err != nil {
Expand All @@ -322,17 +321,12 @@ func (s *state) StateCommitmentByBlockID(ctx context.Context, blockID flow.Ident
}

func (s *state) ChunkDataPackByChunkID(ctx context.Context, chunkID flow.Identifier) (*flow.ChunkDataPack, error) {
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXEPersistStateCommitment)
defer span.Finish()

return s.chunkDataPacks.ByChunkID(chunkID)
}

func (s *state) GetExecutionResultID(ctx context.Context, blockID flow.Identifier) (flow.Identifier, error) {
if s.tracer != nil {
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXEGetExecutionResultID)
defer span.Finish()
}
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXEGetExecutionResultID)
defer span.Finish()

result, err := s.results.ByBlockID(blockID)
if err != nil {
Expand All @@ -355,6 +349,7 @@ func (s *state) PersistExecutionState(ctx context.Context, header *flow.Header,
// but it's the closes thing to atomicity we could have
batch := badgerstorage.NewBatch(s.db)

sp, _ := s.tracer.StartSpanFromContext(ctx, trace.EXEPersistChunkDataPack)
for _, chunkDataPack := range chunkDataPacks {
err := s.chunkDataPacks.BatchStore(chunkDataPack, batch)
if err != nil {
Expand All @@ -366,21 +361,25 @@ func (s *state) PersistExecutionState(ctx context.Context, header *flow.Header,
return fmt.Errorf("cannot index chunk data pack by blockID: %w", err)
}
}
sp.Finish()

sp, _ = s.tracer.StartSpanFromContext(ctx, trace.EXEPersistStateCommitment)
err := s.commits.BatchStore(blockID, endState, batch)
if err != nil {
return fmt.Errorf("cannot store state commitment: %w", err)
}
sp.Finish()

sp, _ = s.tracer.StartSpanFromContext(ctx, trace.EXEPersistEvents)
err = s.events.BatchStore(blockID, events, batch)
if err != nil {
return fmt.Errorf("cannot store events: %w", err)
}

err = s.serviceEvents.BatchStore(blockID, events, batch)
if err != nil {
return fmt.Errorf("cannot store service events: %w", err)
}
sp.Finish()

executionResult := &executionReceipt.ExecutionResult

Expand Down
3 changes: 2 additions & 1 deletion engine/execution/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
ledger "github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/trace"
storage "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/storage/mocks"
"github.com/onflow/flow-go/utils/unittest"
Expand Down Expand Up @@ -49,7 +50,7 @@ func prepareTest(f func(t *testing.T, es state.ExecutionState)) func(*testing.T)
myReceipts := new(storage.MyExecutionReceipts)

es := state.NewExecutionState(
ls, stateCommitments, blocks, headers, collections, chunkDataPacks, results, receipts, myReceipts, events, serviceEvents, txResults, badgerDB, nil,
ls, stateCommitments, blocks, headers, collections, chunkDataPacks, results, receipts, myReceipts, events, serviceEvents, txResults, badgerDB, trace.NewNoopTracer(),
)

f(t, es)
Expand Down
Loading

0 comments on commit b71dec4

Please sign in to comment.