From 8c13ba0e68f7d034d03afcab4e2ffc05b824e656 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 12:04:41 +0100 Subject: [PATCH 01/30] add initial tracing file --- gossip3/tracing/tracing.go | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 gossip3/tracing/tracing.go diff --git a/gossip3/tracing/tracing.go b/gossip3/tracing/tracing.go new file mode 100644 index 00000000..921c572e --- /dev/null +++ b/gossip3/tracing/tracing.go @@ -0,0 +1,10 @@ +package tracing + +import ( + "github.com/opentracing/opentracing-go" + "go.elastic.co/apm/module/apmot" +) + +func StartElastic() { + opentracing.SetGlobalTracer(apmot.New()) +} From fe5bdb7f0a973f7f848cb486512186b50309dbac Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 12:04:56 +0100 Subject: [PATCH 02/30] add a context to a transaction --- gossip3/messages/messages.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/gossip3/messages/messages.go b/gossip3/messages/messages.go index 25ede5c8..44735f72 100644 --- a/gossip3/messages/messages.go +++ b/gossip3/messages/messages.go @@ -1,6 +1,8 @@ package messages import ( + "context" + "github.com/AsynkronIT/protoactor-go/actor" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/types" @@ -104,6 +106,7 @@ type TransactionWrapper struct { Key []byte Value []byte Metadata MetadataMap + Context context.Context } type MemPoolCleanup struct { From 93d2f8c16ae709c2a94eba102abc9c3c5df0574d Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 12:05:13 +0100 Subject: [PATCH 03/30] rename context to actorContext --- gossip3/actors/validator.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/gossip3/actors/validator.go b/gossip3/actors/validator.go index 4486d59e..b4dd4e1f 100644 --- a/gossip3/actors/validator.go +++ b/gossip3/actors/validator.go @@ -57,11 +57,11 @@ func NewTransactionValidatorProps(currentStateStore storage.Reader) *actor.Props ) } -func (tv *TransactionValidator) Receive(context actor.Context) { - switch msg := context.Message().(type) { +func (tv *TransactionValidator) Receive(actorCtx actor.Context) { + switch msg := actorCtx.Message().(type) { case *validationRequest: tv.Log.Debugw("stateHandler initial", "key", msg.key) - tv.handleRequest(context, msg) + tv.handleRequest(actorCtx, msg) } } @@ -69,7 +69,7 @@ func (tv *TransactionValidator) nextHeight(objectID []byte) uint64 { return nextHeight(tv.reader, objectID) } -func (tv *TransactionValidator) handleRequest(context actor.Context, msg *validationRequest) { +func (tv *TransactionValidator) handleRequest(actorCtx actor.Context, msg *validationRequest) { wrapper := &messages.TransactionWrapper{ Key: msg.key, Value: msg.value, @@ -82,7 +82,7 @@ func (tv *TransactionValidator) handleRequest(context actor.Context, msg *valida _, err := t.UnmarshalMsg(msg.value) if err != nil { tv.Log.Infow("error unmarshaling", "err", err) - context.Respond(wrapper) + actorCtx.Respond(wrapper) return } wrapper.ConflictSetID = t.ConflictSetID() @@ -106,25 +106,25 @@ func (tv *TransactionValidator) handleRequest(context actor.Context, msg *valida currTip = currentState.Signature.NewTip } else if expectedHeight < t.Height { wrapper.PreFlight = true - context.Respond(wrapper) + actorCtx.Respond(wrapper) return } else { tv.Log.Debugf("transaction height %d is lower than current state height %d; ignoring", t.Height, expectedHeight) wrapper.Stale = true - context.Respond(wrapper) + actorCtx.Respond(wrapper) return } } else { if t.Height != 0 { wrapper.PreFlight = true - context.Respond(wrapper) + actorCtx.Respond(wrapper) return } } if !bytes.Equal(crypto.Keccak256(msg.value), msg.key) { tv.Log.Errorw("invalid transaction: key did not match value") - context.Respond(wrapper) + actorCtx.Respond(wrapper) return } @@ -132,13 +132,13 @@ func (tv *TransactionValidator) handleRequest(context actor.Context, msg *valida err = cbornode.DecodeInto(t.Payload, block) if err != nil { tv.Log.Errorw("invalid transaction: payload is not a block") - context.Respond(wrapper) + actorCtx.Respond(wrapper) return } if block.Height != t.Height { tv.Log.Errorw("invalid transaction block height != transaction height", "blockHeight", block.Height, "transHeight", t.Height, "transaction", msg.key) - context.Respond(wrapper) + actorCtx.Respond(wrapper) return } @@ -158,7 +158,7 @@ func (tv *TransactionValidator) handleRequest(context actor.Context, msg *valida if accepted && expectedNewTip { tv.Log.Debugw("accepted", "key", msg.key) wrapper.Accepted = true - context.Respond(wrapper) + actorCtx.Respond(wrapper) return } else { if err == nil && !expectedNewTip { @@ -171,7 +171,7 @@ func (tv *TransactionValidator) handleRequest(context actor.Context, msg *valida tv.Log.Debugw("rejected", "err", err) - context.Respond(wrapper) + actorCtx.Respond(wrapper) } func chainTreeStateHandler(stateTrans *stateTransaction) (nextState []byte, accepted bool, err error) { From 4120c01f4fce7f61b7a0db1f271e044bd38c4f18 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 13:24:24 +0100 Subject: [PATCH 04/30] some basic spans and logging for opentracing --- gossip3/actors/conflictset.go | 15 ++++++++++++ gossip3/actors/conflictsetrouter.go | 2 ++ gossip3/actors/conflictsetrouter_test.go | 5 +++- gossip3/actors/tupelo.go | 2 ++ gossip3/actors/validator.go | 23 ++++++++++++------ gossip3/messages/messages.go | 30 ++++++++++++++++++++++++ 6 files changed, 69 insertions(+), 8 deletions(-) diff --git a/gossip3/actors/conflictset.go b/gossip3/actors/conflictset.go index 6525c3b5..b62f88fa 100644 --- a/gossip3/actors/conflictset.go +++ b/gossip3/actors/conflictset.go @@ -192,16 +192,21 @@ func (cs *ConflictSet) DoneReceive(context actor.Context) { } func (cs *ConflictSet) handleNewTransaction(context actor.Context, msg *messages.TransactionWrapper) { + sp := msg.NewSpan("conflictset-handlenewtransaction") + defer sp.Finish() + cs.Log.Debugw("new transaction", "trans", msg.TransactionID) if !msg.PreFlight && !msg.Accepted { panic(fmt.Sprintf("we should only handle pre-flight or accepted transactions at this level")) } if msg.Accepted { + sp.LogKV("accepted", msg.Accepted) cs.active = true } if !cs.active { + sp.LogKV("snoozing", true) cs.Log.Debugw("snoozing transaction", "t", msg.Key, "height", msg.Transaction.Height) } cs.transactions[string(msg.TransactionID)] = msg @@ -216,13 +221,16 @@ func (cs *ConflictSet) processTransactions(context actor.Context) { } for _, transaction := range cs.transactions { + sp := transaction.NewSpan("conflictset-processing") cs.Log.Debugw("processing transaction", "t", transaction.Key, "height", transaction.Transaction.Height) if !cs.didSign { context.Request(cs.signatureGenerator, transaction) + sp.LogKV("didSign", true) cs.didSign = true } cs.updates++ + sp.Finish() } // do this as a message to make sure we're doing it after all the updates have come in context.Send(context.Self(), &checkStateMsg{atUpdate: cs.updates}) @@ -270,6 +278,8 @@ func (cs *ConflictSet) checkState(context actor.Context, msg *checkStateMsg) { return } if trans := cs.possiblyDone(); trans != nil { + sp := trans.NewSpan("checkState") + defer sp.Finish() // we have a possibly done transaction, lets make a current state if err := cs.createCurrentStateFromTrans(context, trans); err != nil { panic(err) @@ -287,6 +297,7 @@ func (cs *ConflictSet) handleDeadlockedState(context actor.Context) { var lowestTrans *messages.TransactionWrapper for transID, trans := range cs.transactions { + trans.LogKV("deadlocked", true) if lowestTrans == nil { lowestTrans = trans continue @@ -312,6 +323,9 @@ func (cs *ConflictSet) nextView(newWinner *messages.TransactionWrapper) { } func (cs *ConflictSet) createCurrentStateFromTrans(context actor.Context, trans *messages.TransactionWrapper) error { + sp := trans.NewSpan("createCurrentState") + defer sp.Finish() + cs.Log.Debugw("createCurrentStateFromTrans", "t", trans.Key) sigs := cs.signatures[string(trans.TransactionID)] var sigBytes [][]byte @@ -415,6 +429,7 @@ func (cs *ConflictSet) handleCurrentStateWrapper(context actor.Context, currWrap currWrapper.CleanupTransactions = make([]*messages.TransactionWrapper, len(cs.transactions)) i := 0 for _, t := range cs.transactions { + t.LogKV("done", true) currWrapper.CleanupTransactions[i] = t i++ } diff --git a/gossip3/actors/conflictsetrouter.go b/gossip3/actors/conflictsetrouter.go index 1dc5b991..a6f5dbe3 100644 --- a/gossip3/actors/conflictsetrouter.go +++ b/gossip3/actors/conflictsetrouter.go @@ -64,7 +64,9 @@ func (csr *ConflictSetRouter) nextHeight(objectID []byte) uint64 { func (csr *ConflictSetRouter) Receive(context actor.Context) { switch msg := context.Message().(type) { case *messages.TransactionWrapper: + sp := msg.NewSpan("conflictset-router") csr.forwardOrIgnore(context, []byte(msg.ConflictSetID)) + sp.Finish() case *messages.SignatureWrapper: csr.forwardOrIgnore(context, []byte(msg.ConflictSetID)) case *extmsgs.Signature: diff --git a/gossip3/actors/conflictsetrouter_test.go b/gossip3/actors/conflictsetrouter_test.go index d23fa081..c9a49db1 100644 --- a/gossip3/actors/conflictsetrouter_test.go +++ b/gossip3/actors/conflictsetrouter_test.go @@ -1,6 +1,7 @@ package actors import ( + "context" "strconv" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/AsynkronIT/protoactor-go/plugin" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" + "github.com/opentracing/opentracing-go" "github.com/quorumcontrol/storage" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" @@ -311,7 +313,7 @@ func fakeValidateTransaction(t testing.TB, trans *extmsgs.Transaction) *messages bits, err := trans.MarshalMsg(nil) require.Nil(t, err) key := crypto.Keccak256(bits) - + _, ctx := opentracing.StartSpanFromContext(context.Background(), "transaction") wrapper := &messages.TransactionWrapper{ TransactionID: key, Transaction: trans, @@ -321,6 +323,7 @@ func fakeValidateTransaction(t testing.TB, trans *extmsgs.Transaction) *messages PreFlight: true, Accepted: true, Metadata: messages.MetadataMap{"seen": time.Now()}, + Context: ctx, } return wrapper } diff --git a/gossip3/actors/tupelo.go b/gossip3/actors/tupelo.go index d9bd373d..a80f9ce6 100644 --- a/gossip3/actors/tupelo.go +++ b/gossip3/actors/tupelo.go @@ -95,6 +95,7 @@ func (tn *TupeloNode) handleNewCurrentState(context actor.Context, msg *messages // cleanup the transactions ids := make([][]byte, len(msg.CleanupTransactions)) for i, trans := range msg.CleanupTransactions { + trans.StopTrace() ids[i] = trans.TransactionID } context.Send(tn.mempoolStore, &messages.BulkRemove{ObjectIDs: ids}) @@ -144,6 +145,7 @@ func (tn *TupeloNode) handleNewTransaction(context actor.Context) { Memo: fmt.Sprintf("bad transaction: %v", msg.Metadata["error"]), }) } + msg.StopTrace() context.Send(tn.mempoolStore, &messages.Remove{Key: msg.Key}) } } diff --git a/gossip3/actors/validator.go b/gossip3/actors/validator.go index b4dd4e1f..d3caa6e2 100644 --- a/gossip3/actors/validator.go +++ b/gossip3/actors/validator.go @@ -78,6 +78,11 @@ func (tv *TransactionValidator) handleRequest(actorCtx actor.Context, msg *valid Stale: false, Metadata: messages.MetadataMap{"seen": time.Now()}, } + parentSpan := wrapper.StartTrace() + + sp := wrapper.NewSpan("validator") + defer sp.Finish() + var t extmsgs.Transaction _, err := t.UnmarshalMsg(msg.value) if err != nil { @@ -88,6 +93,8 @@ func (tv *TransactionValidator) handleRequest(actorCtx actor.Context, msg *valid wrapper.ConflictSetID = t.ConflictSetID() wrapper.Transaction = &t wrapper.TransactionID = msg.key + parentSpan.SetTag("conflictSetID", string(wrapper.ConflictSetID)) + parentSpan.SetTag("transactionID", string(wrapper.TransactionID)) var currTip []byte objectIDBits, err := tv.reader.Get(t.ObjectID) @@ -160,14 +167,16 @@ func (tv *TransactionValidator) handleRequest(actorCtx actor.Context, msg *valid wrapper.Accepted = true actorCtx.Respond(wrapper) return - } else { - if err == nil && !expectedNewTip { - nextStateCid, _ := cid.Cast(nextState) - newTipCid, _ := cid.Cast(t.NewTip) - err = fmt.Errorf("error: expected new tip: %s but got: %s", nextStateCid.String(), newTipCid.String()) - } - wrapper.Metadata["error"] = err } + sp.LogKV("accepted", false) + + if err == nil && !expectedNewTip { + nextStateCid, _ := cid.Cast(nextState) + newTipCid, _ := cid.Cast(t.NewTip) + err = fmt.Errorf("error: expected new tip: %s but got: %s", nextStateCid.String(), newTipCid.String()) + } + sp.LogKV("error", err) + wrapper.Metadata["error"] = err tv.Log.Debugw("rejected", "err", err) diff --git a/gossip3/messages/messages.go b/gossip3/messages/messages.go index 44735f72..38a4cc52 100644 --- a/gossip3/messages/messages.go +++ b/gossip3/messages/messages.go @@ -4,6 +4,7 @@ import ( "context" "github.com/AsynkronIT/protoactor-go/actor" + "github.com/opentracing/opentracing-go" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/types" ) @@ -109,6 +110,35 @@ type TransactionWrapper struct { Context context.Context } +type contextSpanKey struct{} + +var parentSpanKey = contextSpanKey{} + +// StartTrace starts the parent trace of a transactionwrapper +func (tw *TransactionWrapper) StartTrace() opentracing.Span { + parent, ctx := opentracing.StartSpanFromContext(context.Background(), "transaction") + ctx = context.WithValue(ctx, parentSpanKey, parent) + tw.Context = ctx + return parent +} + +// StartTrace starts the parent trace of a transactionwrapper +func (tw *TransactionWrapper) StopTrace() { + val := tw.Context.Value(parentSpanKey) + val.(opentracing.Span).Finish() +} + +func (tw *TransactionWrapper) NewSpan(name string) opentracing.Span { + sp, ctx := opentracing.StartSpanFromContext(tw.Context, name) + tw.Context = ctx + return sp +} + +func (tw *TransactionWrapper) LogKV(key string, value interface{}) { + sp := opentracing.SpanFromContext(tw.Context) + sp.LogKV(key, value) +} + type MemPoolCleanup struct { Transactions [][]byte } From b4013a00742846eadb9fbf3c0efe5d4c1da16530 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 13:24:33 +0100 Subject: [PATCH 05/30] add elastic and opentracing --- Gopkg.lock | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index e2fdd4ae..a1f8c46a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -75,6 +75,14 @@ pruneopts = "" revision = "8b13a72661dae6e9e5dea04f344f0dc95ea29547" +[[projects]] + digest = "1:5680f8c40e48f07cb77aece3165a866aaf8276305258b3b70db8ec7ad6ddb78d" + name = "github.com/armon/go-radix" + packages = ["."] + pruneopts = "" + revision = "1a2de0c21c94309923825da3df33a4381872c795" + version = "v1.0.0" + [[projects]] branch = "master" digest = "1:c0bec5f9b98d0bc872ff5e834fac186b807b656683bd29cb82fb207a1513fabb" @@ -220,6 +228,30 @@ revision = "9f541cc9db5d55bce703bd99987c9d5cb8eea45e" version = "v1.0.0" +[[projects]] + branch = "master" + digest = "1:10808a231991506e33d92579224ad81e649bc0b2874d99e0c8be8e234a52c3ab" + name = "github.com/elastic/go-sysinfo" + packages = [ + ".", + "internal/registry", + "providers/darwin", + "providers/linux", + "providers/shared", + "providers/windows", + "types", + ] + pruneopts = "" + revision = "59ef8c0eae46c0929e3b219ac86368d4b5934f91" + +[[projects]] + branch = "master" + digest = "1:63509fc7f9aac866d56904b62131b2d1e71291d36513e072c560c3deaad704a0" + name = "github.com/elastic/go-windows" + packages = ["."] + pruneopts = "" + revision = "bb1581babc04d5cb29a2bfa7a9ac6781c730c8dd" + [[projects]] branch = "master" digest = "1:bbae4a0efc62c72b8b9a793b02f89035a8869ae46b6e9afde628e08ef0bb900b" @@ -971,6 +1003,14 @@ revision = "f55edac94c9bbba5d6182a4be46d86a2c9b5b50e" version = "v1.0.2" +[[projects]] + branch = "master" + digest = "1:521a4970efd87aee222f294b7f0c281266fd4325a3e5c31bda725dff082d120a" + name = "github.com/joeshaw/multierror" + packages = ["."] + pruneopts = "" + revision = "69b34d4ec901851247ae7e77d33909caf9df99ed" + [[projects]] branch = "master" digest = "1:560e150c2fcf04a92c22029205edb57cd0282a545dd949165dae5cd65bce1204" @@ -1308,6 +1348,20 @@ revision = "9a47f48565a795472d43519dd49aac781f3034fb" version = "v1.6.0" +[[projects]] + digest = "1:4b73a0659a08dd32fb042bfee9e62f4481bddf18cfc4dbff7dc68084e6ea4460" + name = "github.com/santhosh-tekuri/jsonschema" + packages = [ + ".", + "decoders", + "formats", + "loader", + "mediatypes", + ] + pruneopts = "" + revision = "534765aa6277d702a0aaf99c986492dda697d48f" + version = "v1.2.4" + [[projects]] branch = "master" digest = "1:4176436b905cb4846fc4664cd673d7f941b9aca1ba7d407ce70fe2838695b338" @@ -1447,6 +1501,39 @@ pruneopts = "" revision = "0457bb6b88fc1973573aaf6b5145d8d3ae972390" +[[projects]] + digest = "1:a98315339728e075197d04be6a57a46b141aa382db3728392f0771e9a54cf77b" + name = "go.elastic.co/apm" + packages = [ + ".", + "internal/apmconfig", + "internal/apmcontext", + "internal/apmhostutil", + "internal/apmhttputil", + "internal/apmlog", + "internal/apmschema", + "internal/apmstrings", + "internal/iochan", + "internal/ringbuffer", + "internal/wildcard", + "model", + "module/apmhttp", + "module/apmot", + "stacktrace", + "transport", + ] + pruneopts = "" + revision = "cc1e77d6cd8552403fc3f91464308b1d3ceebab4" + version = "v1.2.0" + +[[projects]] + digest = "1:e05c654acb77a3d7c00aecaf64fdb66310f5f063f896a0d749be8a8d1fb168df" + name = "go.elastic.co/fastjson" + packages = ["."] + pruneopts = "" + revision = "676cec79bd027a8a4618de0991f27cfe632978f1" + version = "v1.0.0" + [[projects]] digest = "1:74f86c458e82e1c4efbab95233e0cf51b7cc02dc03193be9f62cd81224e10401" name = "go.uber.org/atomic" @@ -1543,6 +1630,7 @@ "cpu", "unix", "windows", + "windows/registry", ] pruneopts = "" revision = "b90733256f2e882e81d52f9126de08df5615afd9" @@ -1661,6 +1749,14 @@ revision = "51d6538a90f86fe93ac480b35f37b2be17fef232" version = "v2.2.2" +[[projects]] + branch = "master" + digest = "1:e43acd1190dde7223727f1f8aeac537156d33bf87776d5637e672e1c6b354be4" + name = "howett.net/plist" + packages = ["."] + pruneopts = "" + revision = "591f970eefbbeb04d7b37f334a0c4c3256e32876" + [solve-meta] analyzer-name = "dep" analyzer-version = 1 @@ -1696,6 +1792,7 @@ "github.com/ipsn/go-ipfs/plugin/loader", "github.com/ipsn/go-ipfs/repo/fsrepo", "github.com/mitchellh/go-homedir", + "github.com/opentracing/opentracing-go", "github.com/quorumcontrol/chaintree/chaintree", "github.com/quorumcontrol/chaintree/dag", "github.com/quorumcontrol/chaintree/nodestore", @@ -1718,6 +1815,7 @@ "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", "github.com/tinylib/msgp/msgp", + "go.elastic.co/apm/module/apmot", "golang.org/x/net/context", "google.golang.org/grpc", "google.golang.org/grpc/codes", From bb1d0a3ec579d38aad3e0a2da9bff4cdbaa2cc94 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 14:32:21 +0100 Subject: [PATCH 06/30] add elastic and jaeger --- Gopkg.lock | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index a1f8c46a..fc2f2e32 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -149,6 +149,14 @@ pruneopts = "" revision = "f6d7a1f6fbf35bbf9beb80dc63c56a29dcfb759f" +[[projects]] + branch = "master" + digest = "1:c46fd324e7902268373e1b337436a6377c196e2dbd7b35624c6256d29d494e78" + name = "github.com/codahale/hdrhistogram" + packages = ["."] + pruneopts = "" + revision = "3a0bb77429bd3a61596f5e8a3172445844342120" + [[projects]] digest = "1:3c3f68ebab415344aef64363d23471e953a4715645115604aaf57923ae904f5e" name = "github.com/coreos/go-semver" @@ -1485,6 +1493,40 @@ revision = "af6442a0fcf6e2a1b824f70dd0c734f01e817751" version = "v1.1.0" +[[projects]] + digest = "1:941ab4973b3218a9a6d02d31734f5c762239eb538c0d702fff62d4037af8ab0a" + name = "github.com/uber/jaeger-client-go" + packages = [ + ".", + "config", + "internal/baggage", + "internal/baggage/remote", + "internal/spanlog", + "internal/throttler", + "internal/throttler/remote", + "log", + "rpcmetrics", + "thrift", + "thrift-gen/agent", + "thrift-gen/baggage", + "thrift-gen/jaeger", + "thrift-gen/sampling", + "thrift-gen/zipkincore", + "transport", + "utils", + ] + pruneopts = "" + revision = "1a782e2da844727691fef1757c72eb190c2909f0" + version = "v2.15.0" + +[[projects]] + digest = "1:aa1598d34009b45ce74fdabdd25e4258d7923d1e1b418d4c98482e79607cb9b0" + name = "github.com/uber/jaeger-lib" + packages = ["metrics"] + pruneopts = "" + revision = "ed3a127ec5fef7ae9ea95b01b542c47fbd999ce5" + version = "v1.5.0" + [[projects]] branch = "master" digest = "1:1e10b6e24ab9cbfafc7db7985cc43f01624a5a82d188572baf2ab2257b4427e2" @@ -1815,6 +1857,10 @@ "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", "github.com/tinylib/msgp/msgp", + "github.com/uber/jaeger-client-go", + "github.com/uber/jaeger-client-go/config", + "github.com/uber/jaeger-client-go/log", + "github.com/uber/jaeger-lib/metrics", "go.elastic.co/apm/module/apmot", "golang.org/x/net/context", "google.golang.org/grpc", From 5577e4316801c8496f66faae67f51f43314948d1 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 14:32:36 +0100 Subject: [PATCH 07/30] add a startjaeger --- gossip3/tracing/tracing.go | 40 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/gossip3/tracing/tracing.go b/gossip3/tracing/tracing.go index 921c572e..be7b665a 100644 --- a/gossip3/tracing/tracing.go +++ b/gossip3/tracing/tracing.go @@ -1,10 +1,50 @@ package tracing import ( + "log" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go" + jaegercfg "github.com/uber/jaeger-client-go/config" + jaegerlog "github.com/uber/jaeger-client-go/log" + "github.com/uber/jaeger-lib/metrics" "go.elastic.co/apm/module/apmot" ) func StartElastic() { opentracing.SetGlobalTracer(apmot.New()) } + +func StartJaeger() { + // Sample configuration for testing. Use constant sampling to sample every trace + // and enable LogSpan to log every span via configured Logger. + cfg := jaegercfg.Configuration{ + Sampler: &jaegercfg.SamplerConfig{ + Type: jaeger.SamplerTypeConst, + Param: 1, + }, + Reporter: &jaegercfg.ReporterConfig{ + LogSpans: true, + }, + } + + // Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log + // and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics + // frameworks. + jLogger := jaegerlog.StdLogger + jMetricsFactory := metrics.NullFactory + + // Initialize tracer with a logger and a metrics factory + _, err := cfg.InitGlobalTracer( + "testNode", + jaegercfg.Logger(jLogger), + jaegercfg.Metrics(jMetricsFactory), + ) + if err != nil { + log.Printf("Could not initialize jaeger tracer: %s", err.Error()) + return + } + // defer closer.Close() + + // continue main() +} From 7ec3fa5d3f4a0b0fb4b0e354530e5e647b86fac3 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 14:34:17 +0100 Subject: [PATCH 08/30] use jaeger for now --- cmd/testnode.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/testnode.go b/cmd/testnode.go index b7712d40..d8d16c23 100644 --- a/cmd/testnode.go +++ b/cmd/testnode.go @@ -36,6 +36,7 @@ import ( "github.com/quorumcontrol/tupelo-go-client/p2p" gossip3actors "github.com/quorumcontrol/tupelo/gossip3/actors" "github.com/quorumcontrol/tupelo/gossip3/messages" + "github.com/quorumcontrol/tupelo/gossip3/tracing" "github.com/spf13/cobra" ) @@ -60,6 +61,7 @@ var testnodeCmd = &cobra.Command{ blsKeyHex := os.Getenv("TUPELO_NODE_BLS_KEY_HEX") signer := setupGossipNode(ctx, ecdsaKeyHex, blsKeyHex, "distributed-network", testnodePort) actor.EmptyRootContext.Send(signer.Actor, &messages.StartGossip{}) + tracing.StartJaeger() stopOnSignal(signer) }, } From 2b65dad64641a26a358b9765c1be46c29923df18 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 15 Mar 2019 17:00:29 +0100 Subject: [PATCH 09/30] add fix comment --- gossip3/messages/messages.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip3/messages/messages.go b/gossip3/messages/messages.go index 38a4cc52..c45d7f3a 100644 --- a/gossip3/messages/messages.go +++ b/gossip3/messages/messages.go @@ -122,7 +122,7 @@ func (tw *TransactionWrapper) StartTrace() opentracing.Span { return parent } -// StartTrace starts the parent trace of a transactionwrapper +// StopTrace stops the parent trace of a transactionwrapper func (tw *TransactionWrapper) StopTrace() { val := tw.Context.Value(parentSpanKey) val.(opentracing.Span).Finish() From 4d75722268d3b7d11606481fe812e6cb0c78ede5 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Mon, 18 Mar 2019 17:04:46 +0100 Subject: [PATCH 10/30] tracing for push syncer --- cmd/testnode.go | 2 +- gossip3/actors/gossiper.go | 41 ++++++++---- gossip3/actors/pushsyncer.go | 103 ++++++++++++++++++++++++++-- gossip3/messages/internal.go | 3 +- gossip3/messages/internal_gen.go | 111 ++++++++++++++++++++++++++++--- gossip3/tracing/tracing.go | 17 +++-- 6 files changed, 243 insertions(+), 34 deletions(-) diff --git a/cmd/testnode.go b/cmd/testnode.go index d8d16c23..661b64dd 100644 --- a/cmd/testnode.go +++ b/cmd/testnode.go @@ -76,7 +76,7 @@ func setupNotaryGroup(local *gossip3types.Signer, keys []*PublicKeySet) *gossip3 if local != nil { group.AddSigner(local) } - +w for _, keySet := range keys { ecdsaBytes := hexutil.MustDecode(keySet.EcdsaHexPublicKey) if local != nil && bytes.Equal(crypto.FromECDSAPub(local.DstKey), ecdsaBytes) { diff --git a/gossip3/actors/gossiper.go b/gossip3/actors/gossiper.go index 2c94ccb2..266a2843 100644 --- a/gossip3/actors/gossiper.go +++ b/gossip3/actors/gossiper.go @@ -1,12 +1,14 @@ package actors import ( + "context" "fmt" "strings" "time" "github.com/AsynkronIT/protoactor-go/actor" "github.com/AsynkronIT/protoactor-go/plugin" + "github.com/opentracing/opentracing-go" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" "github.com/quorumcontrol/tupelo/gossip3/messages" @@ -54,14 +56,14 @@ func NewGossiperProps(kind string, storage *actor.PID, system system, pusherProp ) } -func (g *Gossiper) Receive(context actor.Context) { +func (g *Gossiper) Receive(actorContext actor.Context) { // defer func() { // if r := recover(); r != nil { // g.Log.Errorw("recover", "r", r) // panic(r) // } // }() - switch msg := context.Message().(type) { + switch msg := actorContext.Message().(type) { case *actor.Restarting: g.Log.Infow("restarting") case *actor.Terminated: @@ -73,12 +75,12 @@ func (g *Gossiper) Receive(context actor.Context) { go func(pid *actor.PID) { <-timer //fmt.Printf("%s Sending DoOneGossip\n", pid.String()) - context.Send(pid, &messages.DoOneGossip{}) - }(context.Self()) + actorContext.Send(pid, &messages.DoOneGossip{}) + }(actorContext.Self()) return } - if strings.HasPrefix(msg.Who.GetId(), context.Self().GetId()+"/"+remoteSyncerPrefix) { + if strings.HasPrefix(msg.Who.GetId(), actorContext.Self().GetId()+"/"+remoteSyncerPrefix) { g.Log.Debugw("releasing a new remote syncer") g.syncersAvailable++ return @@ -90,7 +92,7 @@ func (g *Gossiper) Receive(context actor.Context) { case *messages.StartGossip: g.Log.Debugw("start gossip") g.validatorClear = true - context.Send(context.Self(), &messages.DoOneGossip{ + actorContext.Send(actorContext.Self(), &messages.DoOneGossip{ Why: "startGosip", }) case *messages.DoOneGossip: @@ -100,27 +102,40 @@ func (g *Gossiper) Receive(context actor.Context) { return } g.Log.Debugw("gossiping again") - localsyncer, err := context.SpawnNamed(g.pusherProps, "pushSyncer") + localsyncer, err := actorContext.SpawnNamed(g.pusherProps, "pushSyncer") if err != nil { panic(fmt.Sprintf("error spawning: %v", err)) } g.pids[currentPusherKey] = localsyncer - context.Send(localsyncer, &messages.DoPush{ + actorContext.Send(localsyncer, &messages.DoPush{ System: g.system, }) case *messages.GetSyncer: - g.Log.Debugw("GetSyncer", "remote", context.Sender().GetId()) + g.Log.Debugw("GetSyncer", "remote", actorContext.Sender().GetId()) + ctx := context.Background() + spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(msg.Context)) + var sp opentracing.Span + if err == nil { + sp = opentracing.StartSpan("pushSyncer-receiver", opentracing.ChildOf(spanContext)) + } else { + g.Log.Warnw("error decoding remote context", "err", err, "msg", msg, "from", actorContext.Sender().String()) + sp = opentracing.StartSpan("pushSyncer-receiver-no-parent") + } + sp.SetTag("actor", actorContext.Self().String()) + ctx = opentracing.ContextWithSpan(ctx, sp) + if g.syncersAvailable > 0 { - receiveSyncer := context.SpawnPrefix(g.pusherProps, remoteSyncerPrefix) + receiveSyncer := actorContext.SpawnPrefix(g.pusherProps, remoteSyncerPrefix) g.syncersAvailable-- available := &messages.SyncerAvailable{} available.SetDestination(extmsgs.ToActorPid(receiveSyncer)) - context.Respond(available) + actorContext.Send(receiveSyncer, &setContext{context: ctx}) + actorContext.Respond(available) } else { - context.Respond(&messages.NoSyncersAvailable{}) + actorContext.Respond(&messages.NoSyncersAvailable{}) } case *messages.Subscribe: - context.Forward(g.storageActor) + actorContext.Forward(g.storageActor) } } diff --git a/gossip3/actors/pushsyncer.go b/gossip3/actors/pushsyncer.go index faafd276..176c6672 100644 --- a/gossip3/actors/pushsyncer.go +++ b/gossip3/actors/pushsyncer.go @@ -1,18 +1,24 @@ package actors import ( + "context" "fmt" "strings" "time" "github.com/AsynkronIT/protoactor-go/actor" "github.com/AsynkronIT/protoactor-go/plugin" + "github.com/opentracing/opentracing-go" "github.com/quorumcontrol/differencedigest/ibf" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" "github.com/quorumcontrol/tupelo/gossip3/messages" ) +type setContext struct { + context context.Context +} + // PushSyncer is the main remote-facing actor that handles // Sending out syncs type PushSyncer struct { @@ -23,6 +29,51 @@ type PushSyncer struct { storageActor *actor.PID remote *actor.PID sendingObjects bool + context context.Context +} + +type contextPushSyncerKey struct{} + +var parentPushSyncerKey = contextPushSyncerKey{} + +func (ps *PushSyncer) startInitiatorTrace() opentracing.Span { + parent, ctx := opentracing.StartSpanFromContext(context.Background(), "push-syncer") + ctx = context.WithValue(ctx, parentPushSyncerKey, parent) + ps.context = ctx + return parent +} + +func (ps *PushSyncer) stopTrace() { + val := ps.context.Value(parentPushSyncerKey) + val.(opentracing.Span).Finish() +} + +func (ps *PushSyncer) newSpan(name string) opentracing.Span { + sp, ctx := opentracing.StartSpanFromContext(ps.context, name) + ps.context = ctx + return sp +} + +func (ps *PushSyncer) serializedContext() (map[string]string, error) { + serializedContext := make(map[string]string) + sp := opentracing.SpanFromContext(ps.context) + err := opentracing.GlobalTracer().Inject(sp.Context(), opentracing.TextMap, opentracing.TextMapCarrier(serializedContext)) + if err != nil { + return nil, fmt.Errorf("error injecting: %v", err) + } + ps.Log.Debugw("serialized", "obj", serializedContext) + return serializedContext, nil +} + +func (ps *PushSyncer) setContext(ctx context.Context) { + parent := opentracing.SpanFromContext(ctx) + ctx = context.WithValue(ctx, parentPushSyncerKey, parent) + ps.context = ctx +} + +func (ps *PushSyncer) LogKV(key string, value interface{}) { + sp := opentracing.SpanFromContext(ps.context) + sp.LogKV(key, value) } func stopDecider(reason interface{}) actor.Directive { @@ -55,7 +106,7 @@ func (syncer *PushSyncer) Receive(context actor.Context) { context.SetReceiveTimeout(syncerReceiveTimeout) case *actor.ReceiveTimeout: syncer.Log.Infow("timeout") - context.Self().Poison() + syncer.poison(context) case *messages.DoPush: context.SetReceiveTimeout(syncerReceiveTimeout) syncer.handleDoPush(context, msg) @@ -80,12 +131,25 @@ func (syncer *PushSyncer) Receive(context actor.Context) { syncer.Log.Debugw("sync complete", "remote", syncer.remote, "length", time.Since(syncer.start)) context.CancelReceiveTimeout() if !syncer.sendingObjects { - context.Self().Poison() + syncer.poison(context) } + case *setContext: + syncer.setContext(msg.context) } } +func (syncer *PushSyncer) poison(context actor.Context) { + syncer.stopTrace() + context.Self().Poison() +} + +func (syncer *PushSyncer) stop(context actor.Context) { + syncer.stopTrace() + context.Self().Stop() +} + func (syncer *PushSyncer) handleDoPush(context actor.Context, msg *messages.DoPush) { + sp := syncer.startInitiatorTrace() syncer.start = time.Now() syncer.Log.Debugw("sync start", "now", syncer.start) var remoteGossiper *actor.PID @@ -94,20 +158,30 @@ func (syncer *PushSyncer) handleDoPush(context actor.Context, msg *messages.DoPu } syncer.remote = remoteGossiper syncer.Log.Debugw("requesting syncer", "remote", remoteGossiper.Id) + sp.SetTag("remote", remoteGossiper.String()) + + serialized, err := syncer.serializedContext() + if err != nil { + syncer.Log.Errorw("error serializing context", "err", err) + } resp, err := context.RequestFuture(remoteGossiper, &messages.GetSyncer{ - Kind: syncer.kind, + Kind: syncer.kind, + Context: serialized, }, 10*time.Second).Result() if err != nil { syncer.Log.Errorw("timeout waiting for remote syncer", "err", err, "remote", remoteGossiper.String()) - context.Self().Stop() + sp.SetTag("error", true) + sp.SetTag("timeout", true) + syncer.stop(context) return } switch remoteSyncer := resp.(type) { case *messages.NoSyncersAvailable: syncer.Log.Debugw("remote busy") - context.Self().Poison() + sp.SetTag("unavailable", true) + syncer.poison(context) case *messages.SyncerAvailable: destination := extmsgs.FromActorPid(remoteSyncer.Destination) syncer.Log.Debugw("requesting strata") @@ -130,6 +204,9 @@ func (syncer *PushSyncer) handleDoPush(context actor.Context, msg *messages.DoPu } func (syncer *PushSyncer) handleProvideStrata(context actor.Context, msg *messages.ProvideStrata) { + sp := syncer.newSpan("handleProvideStrata") + defer sp.Finish() + syncer.Log.Debugw("handleProvideStrata") syncer.start = time.Now() syncer.remote = context.Sender() @@ -146,6 +223,7 @@ func (syncer *PushSyncer) handleProvideStrata(context actor.Context, msg *messag if result == nil { syncer.Log.Debugw("nil result") if count > 0 { + sp.SetTag("hasKeys", true) wantsToSend := count * 2 var sizeToSend int @@ -190,6 +268,8 @@ func (syncer *PushSyncer) handleProvideStrata(context actor.Context, msg *messag } func (syncer *PushSyncer) handleRequestIBF(context actor.Context, msg *messages.RequestIBF) { + sp := syncer.newSpan("handleRequestIBF") + defer sp.Finish() syncer.Log.Debugw("handleRequestIBF") wantsToSend := msg.Count * 2 var sizeToSend int @@ -221,6 +301,9 @@ func (syncer *PushSyncer) handleRequestIBF(context actor.Context, msg *messages. } func (syncer *PushSyncer) handleProvideBloomFilter(context actor.Context, msg *messages.ProvideBloomFilter) { + sp := syncer.newSpan("handleProvideBloomFilter") + defer sp.Finish() + localIBF, err := syncer.getLocalIBF(context, len(msg.Filter.Cells)) if err != nil { syncer.Log.Errorw("error getting local IBF", "err", err) @@ -238,10 +321,14 @@ func (syncer *PushSyncer) handleProvideBloomFilter(context actor.Context, msg *m } func (syncer *PushSyncer) handleRequestKeys(context actor.Context, msg *messages.RequestKeys) { + sp := syncer.newSpan("handleRequestKeys") + defer sp.Finish() syncer.sendPrefixes(context, msg.Keys, context.Sender()) } func (syncer *PushSyncer) getLocalIBF(context actor.Context, size int) (*ibf.InvertibleBloomFilter, error) { + sp := syncer.newSpan("getLocalIBF") + defer sp.Finish() localIBF, err := context.RequestFuture(syncer.storageActor, &messages.GetIBF{ Size: size, }, 30*time.Second).Result() @@ -253,6 +340,8 @@ func (syncer *PushSyncer) getLocalIBF(context actor.Context, size int) (*ibf.Inv } func (syncer *PushSyncer) handleDiff(context actor.Context, diff ibf.DecodeResults, destination *actor.PID) { + sp := syncer.newSpan("handleDiff") + defer sp.Finish() syncer.Log.Debugw("handleDiff") syncer.sendingObjects = true context.RequestWithCustomSender(context.Sender(), requestKeysFromDiff(diff.RightSet), syncer.storageActor) @@ -264,6 +353,8 @@ func (syncer *PushSyncer) handleDiff(context actor.Context, diff ibf.DecodeResul } func (syncer *PushSyncer) sendPrefixes(context actor.Context, prefixes []uint64, destination *actor.PID) { + sp := syncer.newSpan("sendPrefixes") + defer sp.Finish() sender := context.SpawnPrefix(NewObjectSenderProps(syncer.storageActor), "objectSender") for _, pref := range prefixes { context.Send(sender, &messages.SendPrefix{ @@ -280,7 +371,7 @@ func (syncer *PushSyncer) syncDone(context actor.Context) { if sender != nil { context.Request(context.Sender(), &messages.SyncDone{}) } - context.Self().Poison() + syncer.poison(context) } func requestKeysFromDiff(objs []ibf.ObjectId) *messages.RequestKeys { diff --git a/gossip3/messages/internal.go b/gossip3/messages/internal.go index 46967b2e..595cbfc7 100644 --- a/gossip3/messages/internal.go +++ b/gossip3/messages/internal.go @@ -31,7 +31,8 @@ func (dh *DestinationHolder) GetDestination() *extmsgs.ActorPID { } type GetSyncer struct { - Kind string + Kind string + Context map[string]string } func (GetSyncer) TypeCode() int8 { diff --git a/gossip3/messages/internal_gen.go b/gossip3/messages/internal_gen.go index 3dde59a2..751dd52f 100644 --- a/gossip3/messages/internal_gen.go +++ b/gossip3/messages/internal_gen.go @@ -178,6 +178,36 @@ func (z *GetSyncer) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Kind") return } + case "Context": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Context") + return + } + if z.Context == nil { + z.Context = make(map[string]string, zb0002) + } else if len(z.Context) > 0 { + for key := range z.Context { + delete(z.Context, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 string + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Context") + return + } + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Context", za0001) + return + } + z.Context[za0001] = za0002 + } default: err = dc.Skip() if err != nil { @@ -190,10 +220,10 @@ func (z *GetSyncer) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z GetSyncer) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 1 +func (z *GetSyncer) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 // write "Kind" - err = en.Append(0x81, 0xa4, 0x4b, 0x69, 0x6e, 0x64) + err = en.Append(0x82, 0xa4, 0x4b, 0x69, 0x6e, 0x64) if err != nil { return } @@ -202,16 +232,45 @@ func (z GetSyncer) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Kind") return } + // write "Context" + err = en.Append(0xa7, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Context))) + if err != nil { + err = msgp.WrapError(err, "Context") + return + } + for za0001, za0002 := range z.Context { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "Context") + return + } + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Context", za0001) + return + } + } return } // MarshalMsg implements msgp.Marshaler -func (z GetSyncer) MarshalMsg(b []byte) (o []byte, err error) { +func (z *GetSyncer) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 1 + // map header, size 2 // string "Kind" - o = append(o, 0x81, 0xa4, 0x4b, 0x69, 0x6e, 0x64) + o = append(o, 0x82, 0xa4, 0x4b, 0x69, 0x6e, 0x64) o = msgp.AppendString(o, z.Kind) + // string "Context" + o = append(o, 0xa7, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74) + o = msgp.AppendMapHeader(o, uint32(len(z.Context))) + for za0001, za0002 := range z.Context { + o = msgp.AppendString(o, za0001) + o = msgp.AppendString(o, za0002) + } return } @@ -239,6 +298,36 @@ func (z *GetSyncer) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Kind") return } + case "Context": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Context") + return + } + if z.Context == nil { + z.Context = make(map[string]string, zb0002) + } else if len(z.Context) > 0 { + for key := range z.Context { + delete(z.Context, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 string + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Context") + return + } + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Context", za0001) + return + } + z.Context[za0001] = za0002 + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -252,8 +341,14 @@ func (z *GetSyncer) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z GetSyncer) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.Kind) +func (z *GetSyncer) Msgsize() (s int) { + s = 1 + 5 + msgp.StringPrefixSize + len(z.Kind) + 8 + msgp.MapHeaderSize + if z.Context != nil { + for za0001, za0002 := range z.Context { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) + } + } return } diff --git a/gossip3/tracing/tracing.go b/gossip3/tracing/tracing.go index be7b665a..e7177332 100644 --- a/gossip3/tracing/tracing.go +++ b/gossip3/tracing/tracing.go @@ -1,6 +1,7 @@ package tracing import ( + "io" "log" "github.com/opentracing/opentracing-go" @@ -11,11 +12,17 @@ import ( "go.elastic.co/apm/module/apmot" ) +var jaegerCloser io.Closer + func StartElastic() { opentracing.SetGlobalTracer(apmot.New()) } -func StartJaeger() { +func StopJaeger() { + jaegerCloser.Close() +} + +func StartJaeger(serviceName string) { // Sample configuration for testing. Use constant sampling to sample every trace // and enable LogSpan to log every span via configured Logger. cfg := jaegercfg.Configuration{ @@ -24,7 +31,7 @@ func StartJaeger() { Param: 1, }, Reporter: &jaegercfg.ReporterConfig{ - LogSpans: true, + LogSpans: false, }, } @@ -35,8 +42,8 @@ func StartJaeger() { jMetricsFactory := metrics.NullFactory // Initialize tracer with a logger and a metrics factory - _, err := cfg.InitGlobalTracer( - "testNode", + closer, err := cfg.InitGlobalTracer( + serviceName, jaegercfg.Logger(jLogger), jaegercfg.Metrics(jMetricsFactory), ) @@ -44,7 +51,7 @@ func StartJaeger() { log.Printf("Could not initialize jaeger tracer: %s", err.Error()) return } - // defer closer.Close() + jaegerCloser = closer // continue main() } From 42d392bd3c8a1f6d7a9037913f648c41dde4eb49 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Mon, 18 Mar 2019 17:45:05 +0100 Subject: [PATCH 11/30] more tags, etc for tracing --- gossip3/actors/gossiper.go | 4 +++- gossip3/actors/pushsyncer.go | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/gossip3/actors/gossiper.go b/gossip3/actors/gossiper.go index 266a2843..9304c21e 100644 --- a/gossip3/actors/gossiper.go +++ b/gossip3/actors/gossiper.go @@ -67,6 +67,7 @@ func (g *Gossiper) Receive(actorContext actor.Context) { case *actor.Restarting: g.Log.Infow("restarting") case *actor.Terminated: + // this is for when the pushers stop, we can queue up another push if _, ok := g.pids[currentPusherKey]; ok && msg.Who.Equal(g.pids[currentPusherKey]) { g.Log.Debugw("terminate", "doGossip", g.validatorClear) @@ -123,14 +124,15 @@ func (g *Gossiper) Receive(actorContext actor.Context) { sp = opentracing.StartSpan("pushSyncer-receiver-no-parent") } sp.SetTag("actor", actorContext.Self().String()) + sp.SetTag("syncersAvailable", g.syncersAvailable) ctx = opentracing.ContextWithSpan(ctx, sp) if g.syncersAvailable > 0 { receiveSyncer := actorContext.SpawnPrefix(g.pusherProps, remoteSyncerPrefix) + actorContext.Send(receiveSyncer, &setContext{context: ctx}) g.syncersAvailable-- available := &messages.SyncerAvailable{} available.SetDestination(extmsgs.ToActorPid(receiveSyncer)) - actorContext.Send(receiveSyncer, &setContext{context: ctx}) actorContext.Respond(available) } else { actorContext.Respond(&messages.NoSyncersAvailable{}) diff --git a/gossip3/actors/pushsyncer.go b/gossip3/actors/pushsyncer.go index 176c6672..c2734858 100644 --- a/gossip3/actors/pushsyncer.go +++ b/gossip3/actors/pushsyncer.go @@ -220,10 +220,10 @@ func (syncer *PushSyncer) handleProvideStrata(context actor.Context, msg *messag syncer.Log.Debugw("estimating strata") localStrata := localStrataInt.(*ibf.DifferenceStrata) count, result := localStrata.Estimate(msg.Strata) + sp.SetTag("count", count) if result == nil { syncer.Log.Debugw("nil result") if count > 0 { - sp.SetTag("hasKeys", true) wantsToSend := count * 2 var sizeToSend int @@ -238,6 +238,8 @@ func (syncer *PushSyncer) handleProvideStrata(context actor.Context, msg *messag syncer.syncDone(context) return } + sp.SetTag("IBFSize", sizeToSend) + localIBF, err := syncer.getLocalIBF(context, sizeToSend) if err != nil { syncer.Log.Errorw("error getting local IBF", "err", err) @@ -252,18 +254,19 @@ func (syncer *PushSyncer) handleProvideStrata(context actor.Context, msg *messag }, }) } else { + sp.SetTag("synced", true) syncer.Log.Debugw("synced", "remote", context.Sender()) syncer.syncDone(context) } } else { if len(result.LeftSet) == 0 && len(result.RightSet) == 0 { + sp.SetTag("synced", true) syncer.Log.Debugw("synced", "remote", context.Sender()) syncer.syncDone(context) } else { syncer.Log.Debugw("strata", "count", count, "resultL", len(result.LeftSet), "resultR", len(result.RightSet)) syncer.handleDiff(context, *result, extmsgs.FromActorPid(msg.Destination)) } - } } From bb563dd608776bd8ba8a351be7263136d99dd9ae Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Mon, 18 Mar 2019 17:45:19 +0100 Subject: [PATCH 12/30] Enable jaeger tracing for the test node --- cmd/testnode.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/testnode.go b/cmd/testnode.go index 661b64dd..f60f35e6 100644 --- a/cmd/testnode.go +++ b/cmd/testnode.go @@ -61,7 +61,7 @@ var testnodeCmd = &cobra.Command{ blsKeyHex := os.Getenv("TUPELO_NODE_BLS_KEY_HEX") signer := setupGossipNode(ctx, ecdsaKeyHex, blsKeyHex, "distributed-network", testnodePort) actor.EmptyRootContext.Send(signer.Actor, &messages.StartGossip{}) - tracing.StartJaeger() + tracing.StartJaeger("signer-" + signer.ID) stopOnSignal(signer) }, } @@ -76,7 +76,7 @@ func setupNotaryGroup(local *gossip3types.Signer, keys []*PublicKeySet) *gossip3 if local != nil { group.AddSigner(local) } -w + w for _, keySet := range keys { ecdsaBytes := hexutil.MustDecode(keySet.EcdsaHexPublicKey) if local != nil && bytes.Equal(crypto.FromECDSAPub(local.DstKey), ecdsaBytes) { @@ -164,6 +164,7 @@ func stopOnSignal(signers ...*gossip3types.Signer) { for _, signer := range signers { log.Info("gracefully stopping signer") signer.Actor.GracefulStop() + tracing.StopJaeger() } done <- true }() From d2c91bb31771af8b780fef62b31f78ec8f184534 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Tue, 19 Mar 2019 11:40:59 +0100 Subject: [PATCH 13/30] move all the tracing code into a single package that you can easily include into your actor structs --- gossip3/actors/conflictsetrouter_test.go | 5 +- gossip3/actors/gossiper.go | 13 +-- gossip3/actors/pushsyncer.go | 72 +++----------- gossip3/actors/validator.go | 2 +- gossip3/messages/messages.go | 36 +------ gossip3/tracing/interfaces.go | 116 +++++++++++++++++++++++ 6 files changed, 142 insertions(+), 102 deletions(-) create mode 100644 gossip3/tracing/interfaces.go diff --git a/gossip3/actors/conflictsetrouter_test.go b/gossip3/actors/conflictsetrouter_test.go index c9a49db1..bffcf98f 100644 --- a/gossip3/actors/conflictsetrouter_test.go +++ b/gossip3/actors/conflictsetrouter_test.go @@ -1,7 +1,6 @@ package actors import ( - "context" "strconv" "testing" "time" @@ -10,7 +9,6 @@ import ( "github.com/AsynkronIT/protoactor-go/plugin" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" - "github.com/opentracing/opentracing-go" "github.com/quorumcontrol/storage" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" @@ -313,7 +311,6 @@ func fakeValidateTransaction(t testing.TB, trans *extmsgs.Transaction) *messages bits, err := trans.MarshalMsg(nil) require.Nil(t, err) key := crypto.Keccak256(bits) - _, ctx := opentracing.StartSpanFromContext(context.Background(), "transaction") wrapper := &messages.TransactionWrapper{ TransactionID: key, Transaction: trans, @@ -323,8 +320,8 @@ func fakeValidateTransaction(t testing.TB, trans *extmsgs.Transaction) *messages PreFlight: true, Accepted: true, Metadata: messages.MetadataMap{"seen": time.Now()}, - Context: ctx, } + wrapper.StartTrace("transaction") return wrapper } diff --git a/gossip3/actors/gossiper.go b/gossip3/actors/gossiper.go index 9304c21e..a3655ed0 100644 --- a/gossip3/actors/gossiper.go +++ b/gossip3/actors/gossiper.go @@ -12,6 +12,7 @@ import ( extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" "github.com/quorumcontrol/tupelo/gossip3/messages" + "github.com/quorumcontrol/tupelo/gossip3/tracing" ) type system interface { @@ -114,15 +115,15 @@ func (g *Gossiper) Receive(actorContext actor.Context) { }) case *messages.GetSyncer: g.Log.Debugw("GetSyncer", "remote", actorContext.Sender().GetId()) + ctx := context.Background() - spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(msg.Context)) - var sp opentracing.Span - if err == nil { - sp = opentracing.StartSpan("pushSyncer-receiver", opentracing.ChildOf(spanContext)) - } else { - g.Log.Warnw("error decoding remote context", "err", err, "msg", msg, "from", actorContext.Sender().String()) + sp, err := tracing.SpanContextFromSerialized(msg.Context, "pushSyncer-receiver") + + if err != nil { + // g.Log.Warnw("error decoding remote context", "err", err, "msg", msg, "from", actorContext.Sender().String()) sp = opentracing.StartSpan("pushSyncer-receiver-no-parent") } + sp.SetTag("actor", actorContext.Self().String()) sp.SetTag("syncersAvailable", g.syncersAvailable) ctx = opentracing.ContextWithSpan(ctx, sp) diff --git a/gossip3/actors/pushsyncer.go b/gossip3/actors/pushsyncer.go index c2734858..b2b41004 100644 --- a/gossip3/actors/pushsyncer.go +++ b/gossip3/actors/pushsyncer.go @@ -8,11 +8,11 @@ import ( "github.com/AsynkronIT/protoactor-go/actor" "github.com/AsynkronIT/protoactor-go/plugin" - "github.com/opentracing/opentracing-go" "github.com/quorumcontrol/differencedigest/ibf" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" "github.com/quorumcontrol/tupelo/gossip3/messages" + "github.com/quorumcontrol/tupelo/gossip3/tracing" ) type setContext struct { @@ -23,57 +23,13 @@ type setContext struct { // Sending out syncs type PushSyncer struct { middleware.LogAwareHolder + tracing.ContextHolder start time.Time kind string storageActor *actor.PID remote *actor.PID sendingObjects bool - context context.Context -} - -type contextPushSyncerKey struct{} - -var parentPushSyncerKey = contextPushSyncerKey{} - -func (ps *PushSyncer) startInitiatorTrace() opentracing.Span { - parent, ctx := opentracing.StartSpanFromContext(context.Background(), "push-syncer") - ctx = context.WithValue(ctx, parentPushSyncerKey, parent) - ps.context = ctx - return parent -} - -func (ps *PushSyncer) stopTrace() { - val := ps.context.Value(parentPushSyncerKey) - val.(opentracing.Span).Finish() -} - -func (ps *PushSyncer) newSpan(name string) opentracing.Span { - sp, ctx := opentracing.StartSpanFromContext(ps.context, name) - ps.context = ctx - return sp -} - -func (ps *PushSyncer) serializedContext() (map[string]string, error) { - serializedContext := make(map[string]string) - sp := opentracing.SpanFromContext(ps.context) - err := opentracing.GlobalTracer().Inject(sp.Context(), opentracing.TextMap, opentracing.TextMapCarrier(serializedContext)) - if err != nil { - return nil, fmt.Errorf("error injecting: %v", err) - } - ps.Log.Debugw("serialized", "obj", serializedContext) - return serializedContext, nil -} - -func (ps *PushSyncer) setContext(ctx context.Context) { - parent := opentracing.SpanFromContext(ctx) - ctx = context.WithValue(ctx, parentPushSyncerKey, parent) - ps.context = ctx -} - -func (ps *PushSyncer) LogKV(key string, value interface{}) { - sp := opentracing.SpanFromContext(ps.context) - sp.LogKV(key, value) } func stopDecider(reason interface{}) actor.Directive { @@ -134,22 +90,22 @@ func (syncer *PushSyncer) Receive(context actor.Context) { syncer.poison(context) } case *setContext: - syncer.setContext(msg.context) + syncer.SetContext(msg.context) } } func (syncer *PushSyncer) poison(context actor.Context) { - syncer.stopTrace() + syncer.StopTrace() context.Self().Poison() } func (syncer *PushSyncer) stop(context actor.Context) { - syncer.stopTrace() + syncer.StopTrace() context.Self().Stop() } func (syncer *PushSyncer) handleDoPush(context actor.Context, msg *messages.DoPush) { - sp := syncer.startInitiatorTrace() + sp := syncer.StartTrace("push-syncer") syncer.start = time.Now() syncer.Log.Debugw("sync start", "now", syncer.start) var remoteGossiper *actor.PID @@ -160,7 +116,7 @@ func (syncer *PushSyncer) handleDoPush(context actor.Context, msg *messages.DoPu syncer.Log.Debugw("requesting syncer", "remote", remoteGossiper.Id) sp.SetTag("remote", remoteGossiper.String()) - serialized, err := syncer.serializedContext() + serialized, err := syncer.SerializedContext() if err != nil { syncer.Log.Errorw("error serializing context", "err", err) } @@ -204,7 +160,7 @@ func (syncer *PushSyncer) handleDoPush(context actor.Context, msg *messages.DoPu } func (syncer *PushSyncer) handleProvideStrata(context actor.Context, msg *messages.ProvideStrata) { - sp := syncer.newSpan("handleProvideStrata") + sp := syncer.NewSpan("handleProvideStrata") defer sp.Finish() syncer.Log.Debugw("handleProvideStrata") @@ -271,7 +227,7 @@ func (syncer *PushSyncer) handleProvideStrata(context actor.Context, msg *messag } func (syncer *PushSyncer) handleRequestIBF(context actor.Context, msg *messages.RequestIBF) { - sp := syncer.newSpan("handleRequestIBF") + sp := syncer.NewSpan("handleRequestIBF") defer sp.Finish() syncer.Log.Debugw("handleRequestIBF") wantsToSend := msg.Count * 2 @@ -304,7 +260,7 @@ func (syncer *PushSyncer) handleRequestIBF(context actor.Context, msg *messages. } func (syncer *PushSyncer) handleProvideBloomFilter(context actor.Context, msg *messages.ProvideBloomFilter) { - sp := syncer.newSpan("handleProvideBloomFilter") + sp := syncer.NewSpan("handleProvideBloomFilter") defer sp.Finish() localIBF, err := syncer.getLocalIBF(context, len(msg.Filter.Cells)) @@ -324,13 +280,13 @@ func (syncer *PushSyncer) handleProvideBloomFilter(context actor.Context, msg *m } func (syncer *PushSyncer) handleRequestKeys(context actor.Context, msg *messages.RequestKeys) { - sp := syncer.newSpan("handleRequestKeys") + sp := syncer.NewSpan("handleRequestKeys") defer sp.Finish() syncer.sendPrefixes(context, msg.Keys, context.Sender()) } func (syncer *PushSyncer) getLocalIBF(context actor.Context, size int) (*ibf.InvertibleBloomFilter, error) { - sp := syncer.newSpan("getLocalIBF") + sp := syncer.NewSpan("getLocalIBF") defer sp.Finish() localIBF, err := context.RequestFuture(syncer.storageActor, &messages.GetIBF{ Size: size, @@ -343,7 +299,7 @@ func (syncer *PushSyncer) getLocalIBF(context actor.Context, size int) (*ibf.Inv } func (syncer *PushSyncer) handleDiff(context actor.Context, diff ibf.DecodeResults, destination *actor.PID) { - sp := syncer.newSpan("handleDiff") + sp := syncer.NewSpan("handleDiff") defer sp.Finish() syncer.Log.Debugw("handleDiff") syncer.sendingObjects = true @@ -356,7 +312,7 @@ func (syncer *PushSyncer) handleDiff(context actor.Context, diff ibf.DecodeResul } func (syncer *PushSyncer) sendPrefixes(context actor.Context, prefixes []uint64, destination *actor.PID) { - sp := syncer.newSpan("sendPrefixes") + sp := syncer.NewSpan("sendPrefixes") defer sp.Finish() sender := context.SpawnPrefix(NewObjectSenderProps(syncer.storageActor), "objectSender") for _, pref := range prefixes { diff --git a/gossip3/actors/validator.go b/gossip3/actors/validator.go index d3caa6e2..3145ed9e 100644 --- a/gossip3/actors/validator.go +++ b/gossip3/actors/validator.go @@ -78,7 +78,7 @@ func (tv *TransactionValidator) handleRequest(actorCtx actor.Context, msg *valid Stale: false, Metadata: messages.MetadataMap{"seen": time.Now()}, } - parentSpan := wrapper.StartTrace() + parentSpan := wrapper.StartTrace("transaction") sp := wrapper.NewSpan("validator") defer sp.Finish() diff --git a/gossip3/messages/messages.go b/gossip3/messages/messages.go index c45d7f3a..b71a456b 100644 --- a/gossip3/messages/messages.go +++ b/gossip3/messages/messages.go @@ -1,12 +1,10 @@ package messages import ( - "context" - "github.com/AsynkronIT/protoactor-go/actor" - "github.com/opentracing/opentracing-go" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/types" + "github.com/quorumcontrol/tupelo/gossip3/tracing" ) type MetadataMap map[string]interface{} @@ -98,6 +96,8 @@ type CurrentStateWrapper struct { } type TransactionWrapper struct { + tracing.ContextHolder + ConflictSetID string TransactionID []byte Transaction *extmsgs.Transaction @@ -107,36 +107,6 @@ type TransactionWrapper struct { Key []byte Value []byte Metadata MetadataMap - Context context.Context -} - -type contextSpanKey struct{} - -var parentSpanKey = contextSpanKey{} - -// StartTrace starts the parent trace of a transactionwrapper -func (tw *TransactionWrapper) StartTrace() opentracing.Span { - parent, ctx := opentracing.StartSpanFromContext(context.Background(), "transaction") - ctx = context.WithValue(ctx, parentSpanKey, parent) - tw.Context = ctx - return parent -} - -// StopTrace stops the parent trace of a transactionwrapper -func (tw *TransactionWrapper) StopTrace() { - val := tw.Context.Value(parentSpanKey) - val.(opentracing.Span).Finish() -} - -func (tw *TransactionWrapper) NewSpan(name string) opentracing.Span { - sp, ctx := opentracing.StartSpanFromContext(tw.Context, name) - tw.Context = ctx - return sp -} - -func (tw *TransactionWrapper) LogKV(key string, value interface{}) { - sp := opentracing.SpanFromContext(tw.Context) - sp.LogKV(key, value) } type MemPoolCleanup struct { diff --git a/gossip3/tracing/interfaces.go b/gossip3/tracing/interfaces.go new file mode 100644 index 00000000..b202bfbb --- /dev/null +++ b/gossip3/tracing/interfaces.go @@ -0,0 +1,116 @@ +package tracing + +import ( + "context" + "fmt" + + "github.com/opentracing/opentracing-go" +) + +type contextSpanKey struct{} + +var parentSpanKey = contextSpanKey{} + +// ContextHolder is a struct that you can include in your actor structs +// in order to make them easily traceable +type ContextHolder struct { + context context.Context +} + +// Contextable defines an interface for getting and setting a context +// ContentHolder implements the interface. +type Contextable interface { + SetContext(ctx context.Context) + GetContext() context.Context +} + +// Traceable defines the interface necessary to make an +// actor struct traceable. ContextHolder implements this interface. +type Traceable interface { + StartTrace() opentracing.Span + StopTrace() + NewSpan(name string) opentracing.Span + LogKV(key string, value interface{}) + SerializedContext() (map[string]string, error) + RehydrateSerialized(serialized map[string]string, childName string) (opentracing.Span, error) +} + +// StartTrace starts the parent trace of a transactionwrapper +func (ch *ContextHolder) StartTrace(name string) opentracing.Span { + parent, ctx := opentracing.StartSpanFromContext(context.Background(), name) + ctx = context.WithValue(ctx, parentSpanKey, parent) + ch.context = ctx + return parent +} + +// StopTrace stops the parent trace of a transactionwrapper +func (ch *ContextHolder) StopTrace() { + val := ch.context.Value(parentSpanKey) + val.(opentracing.Span).Finish() +} + +// NewSpan returns a new span as a child of whatever span is +// already in the context. +func (ch *ContextHolder) NewSpan(name string) opentracing.Span { + sp, ctx := opentracing.StartSpanFromContext(ch.context, name) + ch.context = ctx + return sp +} + +// LogKV logs a key/value pair to the current span +func (ch *ContextHolder) LogKV(key string, value interface{}) { + sp := opentracing.SpanFromContext(ch.context) + sp.LogKV(key, value) +} + +// SetContext overrides the current context of the ContextHolder +func (ch *ContextHolder) SetContext(ctx context.Context) { + parent := opentracing.SpanFromContext(ctx) + if parent != nil { + ctx = context.WithValue(ctx, parentSpanKey, parent) + } + ch.context = ctx +} + +// GetContext returns the current context +func (ch *ContextHolder) GetContext() context.Context { + return ch.context +} + +// SerializedContext returns a text map of the current span context +func (ch *ContextHolder) SerializedContext() (map[string]string, error) { + serializedContext := make(map[string]string) + sp := opentracing.SpanFromContext(ch.context) + err := opentracing.GlobalTracer().Inject(sp.Context(), opentracing.TextMap, opentracing.TextMapCarrier(serializedContext)) + if err != nil { + return nil, fmt.Errorf("error injecting: %v", err) + } + return serializedContext, nil +} + +// RehydrateSerialized takes the output of SerializedContext and starts a new span with the childName and sets up +// the context to the correct value. +// WARNING : this will overwrite any context that has previously been set (this is usually the first thing to be called) +func (ch *ContextHolder) RehydrateSerialized(serialized map[string]string, childName string) (opentracing.Span, error) { + ctx := context.Background() + sp, err := SpanContextFromSerialized(serialized, childName) + if err != nil { + return nil, fmt.Errorf("error deserializing: %v", err) + } + ctx = opentracing.ContextWithSpan(ctx, sp) + + ch.SetContext(ctx) + return sp, nil +} + +// SpanContextFromSerialized takes the output of SerializedContext and starts a new span with the childName +func SpanContextFromSerialized(serialized map[string]string, childName string) (opentracing.Span, error) { + spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(serialized)) + var sp opentracing.Span + if err != nil { + return nil, fmt.Errorf("error rehydrating: %v", err) + } + + sp = opentracing.StartSpan(childName, opentracing.ChildOf(spanContext)) + return sp, nil +} From f85be65f0d24bf321c915de0a3a6c7e738deb8e5 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Tue, 19 Mar 2019 14:54:26 +0100 Subject: [PATCH 14/30] remove tracing (moved over to tupelo-go-client) --- gossip3/tracing/interfaces.go | 116 ---------------------------------- gossip3/tracing/tracing.go | 57 ----------------- 2 files changed, 173 deletions(-) delete mode 100644 gossip3/tracing/interfaces.go delete mode 100644 gossip3/tracing/tracing.go diff --git a/gossip3/tracing/interfaces.go b/gossip3/tracing/interfaces.go deleted file mode 100644 index b202bfbb..00000000 --- a/gossip3/tracing/interfaces.go +++ /dev/null @@ -1,116 +0,0 @@ -package tracing - -import ( - "context" - "fmt" - - "github.com/opentracing/opentracing-go" -) - -type contextSpanKey struct{} - -var parentSpanKey = contextSpanKey{} - -// ContextHolder is a struct that you can include in your actor structs -// in order to make them easily traceable -type ContextHolder struct { - context context.Context -} - -// Contextable defines an interface for getting and setting a context -// ContentHolder implements the interface. -type Contextable interface { - SetContext(ctx context.Context) - GetContext() context.Context -} - -// Traceable defines the interface necessary to make an -// actor struct traceable. ContextHolder implements this interface. -type Traceable interface { - StartTrace() opentracing.Span - StopTrace() - NewSpan(name string) opentracing.Span - LogKV(key string, value interface{}) - SerializedContext() (map[string]string, error) - RehydrateSerialized(serialized map[string]string, childName string) (opentracing.Span, error) -} - -// StartTrace starts the parent trace of a transactionwrapper -func (ch *ContextHolder) StartTrace(name string) opentracing.Span { - parent, ctx := opentracing.StartSpanFromContext(context.Background(), name) - ctx = context.WithValue(ctx, parentSpanKey, parent) - ch.context = ctx - return parent -} - -// StopTrace stops the parent trace of a transactionwrapper -func (ch *ContextHolder) StopTrace() { - val := ch.context.Value(parentSpanKey) - val.(opentracing.Span).Finish() -} - -// NewSpan returns a new span as a child of whatever span is -// already in the context. -func (ch *ContextHolder) NewSpan(name string) opentracing.Span { - sp, ctx := opentracing.StartSpanFromContext(ch.context, name) - ch.context = ctx - return sp -} - -// LogKV logs a key/value pair to the current span -func (ch *ContextHolder) LogKV(key string, value interface{}) { - sp := opentracing.SpanFromContext(ch.context) - sp.LogKV(key, value) -} - -// SetContext overrides the current context of the ContextHolder -func (ch *ContextHolder) SetContext(ctx context.Context) { - parent := opentracing.SpanFromContext(ctx) - if parent != nil { - ctx = context.WithValue(ctx, parentSpanKey, parent) - } - ch.context = ctx -} - -// GetContext returns the current context -func (ch *ContextHolder) GetContext() context.Context { - return ch.context -} - -// SerializedContext returns a text map of the current span context -func (ch *ContextHolder) SerializedContext() (map[string]string, error) { - serializedContext := make(map[string]string) - sp := opentracing.SpanFromContext(ch.context) - err := opentracing.GlobalTracer().Inject(sp.Context(), opentracing.TextMap, opentracing.TextMapCarrier(serializedContext)) - if err != nil { - return nil, fmt.Errorf("error injecting: %v", err) - } - return serializedContext, nil -} - -// RehydrateSerialized takes the output of SerializedContext and starts a new span with the childName and sets up -// the context to the correct value. -// WARNING : this will overwrite any context that has previously been set (this is usually the first thing to be called) -func (ch *ContextHolder) RehydrateSerialized(serialized map[string]string, childName string) (opentracing.Span, error) { - ctx := context.Background() - sp, err := SpanContextFromSerialized(serialized, childName) - if err != nil { - return nil, fmt.Errorf("error deserializing: %v", err) - } - ctx = opentracing.ContextWithSpan(ctx, sp) - - ch.SetContext(ctx) - return sp, nil -} - -// SpanContextFromSerialized takes the output of SerializedContext and starts a new span with the childName -func SpanContextFromSerialized(serialized map[string]string, childName string) (opentracing.Span, error) { - spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(serialized)) - var sp opentracing.Span - if err != nil { - return nil, fmt.Errorf("error rehydrating: %v", err) - } - - sp = opentracing.StartSpan(childName, opentracing.ChildOf(spanContext)) - return sp, nil -} diff --git a/gossip3/tracing/tracing.go b/gossip3/tracing/tracing.go deleted file mode 100644 index e7177332..00000000 --- a/gossip3/tracing/tracing.go +++ /dev/null @@ -1,57 +0,0 @@ -package tracing - -import ( - "io" - "log" - - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go" - jaegercfg "github.com/uber/jaeger-client-go/config" - jaegerlog "github.com/uber/jaeger-client-go/log" - "github.com/uber/jaeger-lib/metrics" - "go.elastic.co/apm/module/apmot" -) - -var jaegerCloser io.Closer - -func StartElastic() { - opentracing.SetGlobalTracer(apmot.New()) -} - -func StopJaeger() { - jaegerCloser.Close() -} - -func StartJaeger(serviceName string) { - // Sample configuration for testing. Use constant sampling to sample every trace - // and enable LogSpan to log every span via configured Logger. - cfg := jaegercfg.Configuration{ - Sampler: &jaegercfg.SamplerConfig{ - Type: jaeger.SamplerTypeConst, - Param: 1, - }, - Reporter: &jaegercfg.ReporterConfig{ - LogSpans: false, - }, - } - - // Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log - // and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics - // frameworks. - jLogger := jaegerlog.StdLogger - jMetricsFactory := metrics.NullFactory - - // Initialize tracer with a logger and a metrics factory - closer, err := cfg.InitGlobalTracer( - serviceName, - jaegercfg.Logger(jLogger), - jaegercfg.Metrics(jMetricsFactory), - ) - if err != nil { - log.Printf("Could not initialize jaeger tracer: %s", err.Error()) - return - } - jaegerCloser = closer - - // continue main() -} From fd10313ee74e3867c30a14b81ea807f20097ab6a Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Tue, 19 Mar 2019 14:54:39 +0100 Subject: [PATCH 15/30] Bring in new tupelo-go-client (fix/timeout branch) --- Gopkg.lock | 30 +++++++++++++----------------- Gopkg.toml | 2 +- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index fc2f2e32..16cc0363 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -987,6 +987,14 @@ pruneopts = "" revision = "aac704a3f4f27190b4ccc05f303a4931fd1241ff" +[[projects]] + branch = "master" + digest = "1:521a4970efd87aee222f294b7f0c281266fd4325a3e5c31bda725dff082d120a" + name = "github.com/joeshaw/multierror" + packages = ["."] + pruneopts = "" + revision = "69b34d4ec901851247ae7e77d33909caf9df99ed" + [[projects]] digest = "1:7df5a9695a743c3e1626b28bb8741602c8c15527e1efaeaec48ab2ff9a23f74c" name = "github.com/joho/godotenv" @@ -1011,14 +1019,6 @@ revision = "f55edac94c9bbba5d6182a4be46d86a2c9b5b50e" version = "v1.0.2" -[[projects]] - branch = "master" - digest = "1:521a4970efd87aee222f294b7f0c281266fd4325a3e5c31bda725dff082d120a" - name = "github.com/joeshaw/multierror" - packages = ["."] - pruneopts = "" - revision = "69b34d4ec901851247ae7e77d33909caf9df99ed" - [[projects]] branch = "master" digest = "1:560e150c2fcf04a92c22029205edb57cd0282a545dd949165dae5cd65bce1204" @@ -1319,7 +1319,7 @@ version = "v1.1.1" [[projects]] - digest = "1:56769dee14e4fb99c8c3c65e066613c2e6e8b22ae6ac7552c4b8ddd1379b9eb0" + digest = "1:b20ff72e5522808d8073e2cc8c06457fe3ea213393753297f262de220425ca05" name = "github.com/quorumcontrol/tupelo-go-client" packages = [ "bls", @@ -1331,10 +1331,10 @@ "gossip3/testhelpers", "gossip3/types", "p2p", + "tracing", ] pruneopts = "" - revision = "d9679dfa00fd2c7c33e6f406e56530dc56729250" - version = "v0.1.0" + revision = "31c342087121ebf47d1f6ba07e2e575764be7fd9" [[projects]] digest = "1:d367886e3a8134415fad58fb2ac44e2f38aa88068adca7a02d59a555f87085c0" @@ -1804,6 +1804,7 @@ analyzer-version = 1 input-imports = [ "github.com/AsynkronIT/protoactor-go/actor", + "github.com/AsynkronIT/protoactor-go/mailbox", "github.com/AsynkronIT/protoactor-go/plugin", "github.com/AsynkronIT/protoactor-go/router", "github.com/Workiva/go-datastructures/bitarray", @@ -1834,7 +1835,6 @@ "github.com/ipsn/go-ipfs/plugin/loader", "github.com/ipsn/go-ipfs/repo/fsrepo", "github.com/mitchellh/go-homedir", - "github.com/opentracing/opentracing-go", "github.com/quorumcontrol/chaintree/chaintree", "github.com/quorumcontrol/chaintree/dag", "github.com/quorumcontrol/chaintree/nodestore", @@ -1851,17 +1851,13 @@ "github.com/quorumcontrol/tupelo-go-client/gossip3/testhelpers", "github.com/quorumcontrol/tupelo-go-client/gossip3/types", "github.com/quorumcontrol/tupelo-go-client/p2p", + "github.com/quorumcontrol/tupelo-go-client/tracing", "github.com/shibukawa/configdir", "github.com/spf13/cobra", "github.com/spf13/viper", "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", "github.com/tinylib/msgp/msgp", - "github.com/uber/jaeger-client-go", - "github.com/uber/jaeger-client-go/config", - "github.com/uber/jaeger-client-go/log", - "github.com/uber/jaeger-lib/metrics", - "go.elastic.co/apm/module/apmot", "golang.org/x/net/context", "google.golang.org/grpc", "google.golang.org/grpc/codes", diff --git a/Gopkg.toml b/Gopkg.toml index 2a514013..0156331e 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,7 +56,7 @@ [[constraint]] name = "github.com/quorumcontrol/tupelo-go-client" - version = "v0.1.0" + revision = "31c342087121ebf47d1f6ba07e2e575764be7fd9" # fix/timeouts [[constraint]] name = "github.com/gobuffalo/packr" From b681c160eb5a21ec9499c600ad7e1fb7de104154 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Tue, 19 Mar 2019 14:55:22 +0100 Subject: [PATCH 16/30] use new tupelo-go-client tracing and add synchronized dispatchers --- cmd/testnode.go | 6 +- gossip3/actors/conflictsetrouter.go | 3 +- gossip3/actors/gossiper.go | 55 ++++++++++---- gossip3/actors/pushsyncer.go | 17 ++--- gossip3/actors/tupelo.go | 3 +- gossip3/messages/internal.go | 8 +- gossip3/messages/internal_gen.go | 111 ++-------------------------- gossip3/messages/messages.go | 2 +- 8 files changed, 68 insertions(+), 137 deletions(-) diff --git a/cmd/testnode.go b/cmd/testnode.go index f60f35e6..ac3d07b8 100644 --- a/cmd/testnode.go +++ b/cmd/testnode.go @@ -34,9 +34,9 @@ import ( gossip3remote "github.com/quorumcontrol/tupelo-go-client/gossip3/remote" gossip3types "github.com/quorumcontrol/tupelo-go-client/gossip3/types" "github.com/quorumcontrol/tupelo-go-client/p2p" + "github.com/quorumcontrol/tupelo-go-client/tracing" gossip3actors "github.com/quorumcontrol/tupelo/gossip3/actors" "github.com/quorumcontrol/tupelo/gossip3/messages" - "github.com/quorumcontrol/tupelo/gossip3/tracing" "github.com/spf13/cobra" ) @@ -163,13 +163,13 @@ func stopOnSignal(signers ...*gossip3types.Signer) { fmt.Println(sig) for _, signer := range signers { log.Info("gracefully stopping signer") - signer.Actor.GracefulStop() - tracing.StopJaeger() + signer.Actor.GracefulPoison() } done <- true }() fmt.Println("awaiting signal") <-done + tracing.StopJaeger() fmt.Println("exiting") } diff --git a/gossip3/actors/conflictsetrouter.go b/gossip3/actors/conflictsetrouter.go index a6f5dbe3..59d9087a 100644 --- a/gossip3/actors/conflictsetrouter.go +++ b/gossip3/actors/conflictsetrouter.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/AsynkronIT/protoactor-go/actor" + "github.com/AsynkronIT/protoactor-go/mailbox" "github.com/AsynkronIT/protoactor-go/plugin" "github.com/ethereum/go-ethereum/common/hexutil" iradix "github.com/hashicorp/go-immutable-radix" @@ -54,7 +55,7 @@ func NewConflictSetRouterProps(cfg *ConflictSetRouterConfig) *actor.Props { }).WithReceiverMiddleware( middleware.LoggingMiddleware, plugin.Use(&middleware.LogPlugin{}), - ) + ).WithDispatcher(mailbox.NewSynchronizedDispatcher(300)) } func (csr *ConflictSetRouter) nextHeight(objectID []byte) uint64 { diff --git a/gossip3/actors/gossiper.go b/gossip3/actors/gossiper.go index a3655ed0..dee887eb 100644 --- a/gossip3/actors/gossiper.go +++ b/gossip3/actors/gossiper.go @@ -1,18 +1,17 @@ package actors import ( - "context" "fmt" "strings" "time" "github.com/AsynkronIT/protoactor-go/actor" + "github.com/AsynkronIT/protoactor-go/mailbox" "github.com/AsynkronIT/protoactor-go/plugin" - "github.com/opentracing/opentracing-go" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" + "github.com/quorumcontrol/tupelo-go-client/tracing" "github.com/quorumcontrol/tupelo/gossip3/messages" - "github.com/quorumcontrol/tupelo/gossip3/tracing" ) type system interface { @@ -25,6 +24,7 @@ const currentPusherKey = "currentPusher" // Gossiper is the root gossiper type Gossiper struct { middleware.LogAwareHolder + tracing.ContextHolder kind string pids map[string]*actor.PID @@ -42,6 +42,8 @@ type Gossiper struct { const maxSyncers = 3 func NewGossiperProps(kind string, storage *actor.PID, system system, pusherProps *actor.Props) *actor.Props { + supervisor := actor.NewOneForOneStrategy(1, 10, stopDecider) + return actor.PropsFromProducer(func() actor.Actor { return &Gossiper{ kind: kind, @@ -54,7 +56,9 @@ func NewGossiperProps(kind string, storage *actor.PID, system system, pusherProp }).WithReceiverMiddleware( middleware.LoggingMiddleware, plugin.Use(&middleware.LogPlugin{}), - ) + ).WithSupervisor( + supervisor, + ).WithDispatcher(mailbox.NewSynchronizedDispatcher(300)) } func (g *Gossiper) Receive(actorContext actor.Context) { @@ -65,10 +69,17 @@ func (g *Gossiper) Receive(actorContext actor.Context) { // } // }() switch msg := actorContext.Message().(type) { + case *actor.Started: + g.StartTrace("gossiper") + case *actor.Stopping: + g.Log.Warnw("stopping") + g.StopTrace() case *actor.Restarting: g.Log.Infow("restarting") case *actor.Terminated: - + sp := g.NewSpan("terminated") + defer sp.Finish() + sp.SetTag("who", msg.Who.Id) // this is for when the pushers stop, we can queue up another push if _, ok := g.pids[currentPusherKey]; ok && msg.Who.Equal(g.pids[currentPusherKey]) { g.Log.Debugw("terminate", "doGossip", g.validatorClear) @@ -92,6 +103,8 @@ func (g *Gossiper) Receive(actorContext actor.Context) { panic(fmt.Sprintf("unknown actor terminated: %s", msg.Who.GetId())) case *messages.StartGossip: + sp := g.NewSpan("start-gossip") + defer sp.Finish() g.Log.Debugw("start gossip") g.validatorClear = true actorContext.Send(actorContext.Self(), &messages.DoOneGossip{ @@ -99,7 +112,11 @@ func (g *Gossiper) Receive(actorContext actor.Context) { }) case *messages.DoOneGossip: //fmt.Printf("%s Received DoOneGossip\n", context.Self().String()) + sp := g.NewSpan("doOneGossip") + defer sp.Finish() + if _, ok := g.pids[currentPusherKey]; ok { + sp.SetTag("ignoring", true) g.Log.Debugw("ignoring because in progress") return } @@ -114,31 +131,39 @@ func (g *Gossiper) Receive(actorContext actor.Context) { System: g.system, }) case *messages.GetSyncer: + sp := g.NewSpan("getSyncer") + defer sp.Finish() g.Log.Debugw("GetSyncer", "remote", actorContext.Sender().GetId()) - ctx := context.Background() - sp, err := tracing.SpanContextFromSerialized(msg.Context, "pushSyncer-receiver") + // syncerCtx := context.Background() + // remoteSpan, err := tracing.SpanContextFromSerialized(msg.Context, "pushSyncer-receiver") - if err != nil { - // g.Log.Warnw("error decoding remote context", "err", err, "msg", msg, "from", actorContext.Sender().String()) - sp = opentracing.StartSpan("pushSyncer-receiver-no-parent") - } + // if err != nil { + // g.Log.Warnw("error decoding remote context", "err", err, "msg", msg, "from", actorContext.Sender().String()) + // remoteSpan = opentracing.StartSpan("pushSyncer-receiver-no-parent") + // } - sp.SetTag("actor", actorContext.Self().String()) - sp.SetTag("syncersAvailable", g.syncersAvailable) - ctx = opentracing.ContextWithSpan(ctx, sp) + // remoteSpan.SetTag("actor", actorContext.Self().String()) + // sp.SetTag("actor", actorContext.Self().String()) + // remoteSpan.SetTag("syncersAvailable", g.syncersAvailable) + // sp.SetTag("syncersAvailable", g.syncersAvailable) + // syncerCtx = opentracing.ContextWithSpan(syncerCtx, remoteSpan) if g.syncersAvailable > 0 { + sp.SetTag("available", true) receiveSyncer := actorContext.SpawnPrefix(g.pusherProps, remoteSyncerPrefix) - actorContext.Send(receiveSyncer, &setContext{context: ctx}) + actorContext.Send(receiveSyncer, &setContext{context: msg.GetContext()}) g.syncersAvailable-- available := &messages.SyncerAvailable{} available.SetDestination(extmsgs.ToActorPid(receiveSyncer)) actorContext.Respond(available) } else { + sp.SetTag("unavailable", true) actorContext.Respond(&messages.NoSyncersAvailable{}) } case *messages.Subscribe: + sp := g.NewSpan("subscribe") + defer sp.Finish() actorContext.Forward(g.storageActor) } } diff --git a/gossip3/actors/pushsyncer.go b/gossip3/actors/pushsyncer.go index b2b41004..e834c8c6 100644 --- a/gossip3/actors/pushsyncer.go +++ b/gossip3/actors/pushsyncer.go @@ -11,8 +11,8 @@ import ( "github.com/quorumcontrol/differencedigest/ibf" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" + "github.com/quorumcontrol/tupelo-go-client/tracing" "github.com/quorumcontrol/tupelo/gossip3/messages" - "github.com/quorumcontrol/tupelo/gossip3/tracing" ) type setContext struct { @@ -33,7 +33,7 @@ type PushSyncer struct { } func stopDecider(reason interface{}) actor.Directive { - middleware.Log.Infow("actor died", "reason", reason) + middleware.Log.Warnw("actor died", "reason", reason) return actor.StopDirective } @@ -115,16 +115,11 @@ func (syncer *PushSyncer) handleDoPush(context actor.Context, msg *messages.DoPu syncer.remote = remoteGossiper syncer.Log.Debugw("requesting syncer", "remote", remoteGossiper.Id) sp.SetTag("remote", remoteGossiper.String()) - - serialized, err := syncer.SerializedContext() - if err != nil { - syncer.Log.Errorw("error serializing context", "err", err) + getSyncerMsg := &messages.GetSyncer{ + Kind: syncer.kind, } - - resp, err := context.RequestFuture(remoteGossiper, &messages.GetSyncer{ - Kind: syncer.kind, - Context: serialized, - }, 10*time.Second).Result() + getSyncerMsg.SetContext(syncer.GetContext()) + resp, err := context.RequestFuture(remoteGossiper, getSyncerMsg, 10*time.Second).Result() if err != nil { syncer.Log.Errorw("timeout waiting for remote syncer", "err", err, "remote", remoteGossiper.String()) sp.SetTag("error", true) diff --git a/gossip3/actors/tupelo.go b/gossip3/actors/tupelo.go index a80f9ce6..d36cbd56 100644 --- a/gossip3/actors/tupelo.go +++ b/gossip3/actors/tupelo.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/AsynkronIT/protoactor-go/actor" + "github.com/AsynkronIT/protoactor-go/mailbox" "github.com/AsynkronIT/protoactor-go/plugin" "github.com/quorumcontrol/storage" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" @@ -53,7 +54,7 @@ func NewTupeloNodeProps(cfg *TupeloConfig) *actor.Props { }).WithReceiverMiddleware( middleware.LoggingMiddleware, plugin.Use(&middleware.LogPlugin{}), - ) + ).WithDispatcher(mailbox.NewSynchronizedDispatcher(300)) } func (tn *TupeloNode) Receive(context actor.Context) { diff --git a/gossip3/messages/internal.go b/gossip3/messages/internal.go index 595cbfc7..da1175df 100644 --- a/gossip3/messages/internal.go +++ b/gossip3/messages/internal.go @@ -5,6 +5,7 @@ package messages import ( "github.com/quorumcontrol/differencedigest/ibf" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" + "github.com/quorumcontrol/tupelo-go-client/tracing" ) func init() { @@ -30,9 +31,12 @@ func (dh *DestinationHolder) GetDestination() *extmsgs.ActorPID { return dh.Destination } +var _ tracing.Traceable = (*GetSyncer)(nil) + type GetSyncer struct { - Kind string - Context map[string]string + tracing.ContextHolder `msg:"-"` + + Kind string } func (GetSyncer) TypeCode() int8 { diff --git a/gossip3/messages/internal_gen.go b/gossip3/messages/internal_gen.go index 751dd52f..3dde59a2 100644 --- a/gossip3/messages/internal_gen.go +++ b/gossip3/messages/internal_gen.go @@ -178,36 +178,6 @@ func (z *GetSyncer) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Kind") return } - case "Context": - var zb0002 uint32 - zb0002, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err, "Context") - return - } - if z.Context == nil { - z.Context = make(map[string]string, zb0002) - } else if len(z.Context) > 0 { - for key := range z.Context { - delete(z.Context, key) - } - } - for zb0002 > 0 { - zb0002-- - var za0001 string - var za0002 string - za0001, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Context") - return - } - za0002, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Context", za0001) - return - } - z.Context[za0001] = za0002 - } default: err = dc.Skip() if err != nil { @@ -220,10 +190,10 @@ func (z *GetSyncer) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z *GetSyncer) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 +func (z GetSyncer) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 // write "Kind" - err = en.Append(0x82, 0xa4, 0x4b, 0x69, 0x6e, 0x64) + err = en.Append(0x81, 0xa4, 0x4b, 0x69, 0x6e, 0x64) if err != nil { return } @@ -232,45 +202,16 @@ func (z *GetSyncer) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Kind") return } - // write "Context" - err = en.Append(0xa7, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74) - if err != nil { - return - } - err = en.WriteMapHeader(uint32(len(z.Context))) - if err != nil { - err = msgp.WrapError(err, "Context") - return - } - for za0001, za0002 := range z.Context { - err = en.WriteString(za0001) - if err != nil { - err = msgp.WrapError(err, "Context") - return - } - err = en.WriteString(za0002) - if err != nil { - err = msgp.WrapError(err, "Context", za0001) - return - } - } return } // MarshalMsg implements msgp.Marshaler -func (z *GetSyncer) MarshalMsg(b []byte) (o []byte, err error) { +func (z GetSyncer) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 2 + // map header, size 1 // string "Kind" - o = append(o, 0x82, 0xa4, 0x4b, 0x69, 0x6e, 0x64) + o = append(o, 0x81, 0xa4, 0x4b, 0x69, 0x6e, 0x64) o = msgp.AppendString(o, z.Kind) - // string "Context" - o = append(o, 0xa7, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74) - o = msgp.AppendMapHeader(o, uint32(len(z.Context))) - for za0001, za0002 := range z.Context { - o = msgp.AppendString(o, za0001) - o = msgp.AppendString(o, za0002) - } return } @@ -298,36 +239,6 @@ func (z *GetSyncer) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Kind") return } - case "Context": - var zb0002 uint32 - zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Context") - return - } - if z.Context == nil { - z.Context = make(map[string]string, zb0002) - } else if len(z.Context) > 0 { - for key := range z.Context { - delete(z.Context, key) - } - } - for zb0002 > 0 { - var za0001 string - var za0002 string - zb0002-- - za0001, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Context") - return - } - za0002, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Context", za0001) - return - } - z.Context[za0001] = za0002 - } default: bts, err = msgp.Skip(bts) if err != nil { @@ -341,14 +252,8 @@ func (z *GetSyncer) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *GetSyncer) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.Kind) + 8 + msgp.MapHeaderSize - if z.Context != nil { - for za0001, za0002 := range z.Context { - _ = za0002 - s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) - } - } +func (z GetSyncer) Msgsize() (s int) { + s = 1 + 5 + msgp.StringPrefixSize + len(z.Kind) return } diff --git a/gossip3/messages/messages.go b/gossip3/messages/messages.go index b71a456b..73499b6b 100644 --- a/gossip3/messages/messages.go +++ b/gossip3/messages/messages.go @@ -4,7 +4,7 @@ import ( "github.com/AsynkronIT/protoactor-go/actor" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/types" - "github.com/quorumcontrol/tupelo/gossip3/tracing" + "github.com/quorumcontrol/tupelo-go-client/tracing" ) type MetadataMap map[string]interface{} From b8e54d473579c21b8b577da1d3a7a5a528116faf Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Wed, 20 Mar 2019 10:27:34 +0100 Subject: [PATCH 17/30] Lower log level to debug for stopping, use multiline for dispatcher. --- gossip3/actors/gossiper.go | 6 ++++-- gossip3/actors/tupelo.go | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/gossip3/actors/gossiper.go b/gossip3/actors/gossiper.go index dee887eb..288600c5 100644 --- a/gossip3/actors/gossiper.go +++ b/gossip3/actors/gossiper.go @@ -58,7 +58,9 @@ func NewGossiperProps(kind string, storage *actor.PID, system system, pusherProp plugin.Use(&middleware.LogPlugin{}), ).WithSupervisor( supervisor, - ).WithDispatcher(mailbox.NewSynchronizedDispatcher(300)) + ).WithDispatcher( + mailbox.NewSynchronizedDispatcher(300), + ) } func (g *Gossiper) Receive(actorContext actor.Context) { @@ -72,7 +74,7 @@ func (g *Gossiper) Receive(actorContext actor.Context) { case *actor.Started: g.StartTrace("gossiper") case *actor.Stopping: - g.Log.Warnw("stopping") + g.Log.Debugw("stopping") g.StopTrace() case *actor.Restarting: g.Log.Infow("restarting") diff --git a/gossip3/actors/tupelo.go b/gossip3/actors/tupelo.go index d36cbd56..2896b457 100644 --- a/gossip3/actors/tupelo.go +++ b/gossip3/actors/tupelo.go @@ -54,7 +54,9 @@ func NewTupeloNodeProps(cfg *TupeloConfig) *actor.Props { }).WithReceiverMiddleware( middleware.LoggingMiddleware, plugin.Use(&middleware.LogPlugin{}), - ).WithDispatcher(mailbox.NewSynchronizedDispatcher(300)) + ).WithDispatcher( + mailbox.NewSynchronizedDispatcher(300), + ) } func (tn *TupeloNode) Receive(context actor.Context) { From d1309b945e32bcd03c5a7595301830bdcf1fc5ca Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Wed, 20 Mar 2019 11:05:01 +0100 Subject: [PATCH 18/30] better gossiper tracing, fix bug where syncer count increased unbounded. --- gossip3/actors/gossiper.go | 42 ++++++++++++-------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/gossip3/actors/gossiper.go b/gossip3/actors/gossiper.go index 288600c5..d86a2cbd 100644 --- a/gossip3/actors/gossiper.go +++ b/gossip3/actors/gossiper.go @@ -71,19 +71,17 @@ func (g *Gossiper) Receive(actorContext actor.Context) { // } // }() switch msg := actorContext.Message().(type) { - case *actor.Started: - g.StartTrace("gossiper") - case *actor.Stopping: - g.Log.Debugw("stopping") - g.StopTrace() case *actor.Restarting: g.Log.Infow("restarting") case *actor.Terminated: - sp := g.NewSpan("terminated") - defer sp.Finish() - sp.SetTag("who", msg.Who.Id) + sp := g.StartTrace("pushSyncerTerminated") + defer g.StopTrace() + sp.SetTag("who", msg.Who.String()) + sp.SetTag("syncersAvailable", g.syncersAvailable) + // this is for when the pushers stop, we can queue up another push if _, ok := g.pids[currentPusherKey]; ok && msg.Who.Equal(g.pids[currentPusherKey]) { + sp.SetTag("initiator", true) g.Log.Debugw("terminate", "doGossip", g.validatorClear) delete(g.pids, currentPusherKey) timer := time.After(100 * time.Millisecond) @@ -96,17 +94,17 @@ func (g *Gossiper) Receive(actorContext actor.Context) { return } if strings.HasPrefix(msg.Who.GetId(), actorContext.Self().GetId()+"/"+remoteSyncerPrefix) { + sp.SetTag("remoteSyncer", true) g.Log.Debugw("releasing a new remote syncer") g.syncersAvailable++ return } + sp.SetTag("error", true) g.Log.Errorw("unknown actor terminated", "who", msg.Who.GetId(), "pids", g.pids) panic(fmt.Sprintf("unknown actor terminated: %s", msg.Who.GetId())) case *messages.StartGossip: - sp := g.NewSpan("start-gossip") - defer sp.Finish() g.Log.Debugw("start gossip") g.validatorClear = true actorContext.Send(actorContext.Self(), &messages.DoOneGossip{ @@ -125,6 +123,7 @@ func (g *Gossiper) Receive(actorContext actor.Context) { g.Log.Debugw("gossiping again") localsyncer, err := actorContext.SpawnNamed(g.pusherProps, "pushSyncer") if err != nil { + sp.SetTag("error", true) panic(fmt.Sprintf("error spawning: %v", err)) } g.pids[currentPusherKey] = localsyncer @@ -133,26 +132,13 @@ func (g *Gossiper) Receive(actorContext actor.Context) { System: g.system, }) case *messages.GetSyncer: - sp := g.NewSpan("getSyncer") - defer sp.Finish() + sp := g.StartTrace("getSyncer") + defer g.StopTrace() g.Log.Debugw("GetSyncer", "remote", actorContext.Sender().GetId()) - - // syncerCtx := context.Background() - // remoteSpan, err := tracing.SpanContextFromSerialized(msg.Context, "pushSyncer-receiver") - - // if err != nil { - // g.Log.Warnw("error decoding remote context", "err", err, "msg", msg, "from", actorContext.Sender().String()) - // remoteSpan = opentracing.StartSpan("pushSyncer-receiver-no-parent") - // } - - // remoteSpan.SetTag("actor", actorContext.Self().String()) - // sp.SetTag("actor", actorContext.Self().String()) - // remoteSpan.SetTag("syncersAvailable", g.syncersAvailable) - // sp.SetTag("syncersAvailable", g.syncersAvailable) - // syncerCtx = opentracing.ContextWithSpan(syncerCtx, remoteSpan) + sp.SetTag("remote", actorContext.Sender().String()) + sp.SetTag("syncersAvailable", g.syncersAvailable) if g.syncersAvailable > 0 { - sp.SetTag("available", true) receiveSyncer := actorContext.SpawnPrefix(g.pusherProps, remoteSyncerPrefix) actorContext.Send(receiveSyncer, &setContext{context: msg.GetContext()}) g.syncersAvailable-- @@ -164,8 +150,6 @@ func (g *Gossiper) Receive(actorContext actor.Context) { actorContext.Respond(&messages.NoSyncersAvailable{}) } case *messages.Subscribe: - sp := g.NewSpan("subscribe") - defer sp.Finish() actorContext.Forward(g.storageActor) } } From 0f489220aaf2bfb4b696744ef8ff6b6aa4365517 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Wed, 20 Mar 2019 11:05:12 +0100 Subject: [PATCH 19/30] Various tupelo-go-client fixes --- Gopkg.lock | 7 +++++-- Gopkg.toml | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 16cc0363..d69b04d7 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1235,6 +1235,7 @@ packages = [ "prometheus", "prometheus/internal", + "prometheus/promauto", "prometheus/promhttp", ] pruneopts = "" @@ -1319,7 +1320,7 @@ version = "v1.1.1" [[projects]] - digest = "1:b20ff72e5522808d8073e2cc8c06457fe3ea213393753297f262de220425ca05" + digest = "1:ec7330e2593994c54d6f6bfe92d07c1d098aa0fc6279ff985a4f14cf43da9874" name = "github.com/quorumcontrol/tupelo-go-client" packages = [ "bls", @@ -1334,7 +1335,7 @@ "tracing", ] pruneopts = "" - revision = "31c342087121ebf47d1f6ba07e2e575764be7fd9" + revision = "d5122bf934020dd33cf0aab8ce06b668df09f2ad" [[projects]] digest = "1:d367886e3a8134415fad58fb2ac44e2f38aa88068adca7a02d59a555f87085c0" @@ -1835,6 +1836,8 @@ "github.com/ipsn/go-ipfs/plugin/loader", "github.com/ipsn/go-ipfs/repo/fsrepo", "github.com/mitchellh/go-homedir", + "github.com/prometheus/client_golang/prometheus", + "github.com/prometheus/client_golang/prometheus/promauto", "github.com/quorumcontrol/chaintree/chaintree", "github.com/quorumcontrol/chaintree/dag", "github.com/quorumcontrol/chaintree/nodestore", diff --git a/Gopkg.toml b/Gopkg.toml index 0156331e..5d7cb7ef 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,7 +56,7 @@ [[constraint]] name = "github.com/quorumcontrol/tupelo-go-client" - revision = "31c342087121ebf47d1f6ba07e2e575764be7fd9" # fix/timeouts + revision = "d5122bf934020dd33cf0aab8ce06b668df09f2ad" # fix/timeouts [[constraint]] name = "github.com/gobuffalo/packr" From f9f50db463f5c42eab935e643457563445f7fc09 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Wed, 20 Mar 2019 12:04:31 +0100 Subject: [PATCH 20/30] move tracing logic to flags --- cmd/testnode.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/cmd/testnode.go b/cmd/testnode.go index ac3d07b8..f84b5b0b 100644 --- a/cmd/testnode.go +++ b/cmd/testnode.go @@ -44,6 +44,9 @@ var ( BlsSignKeys []*bls.SignKey EcdsaKeys []*ecdsa.PrivateKey testnodePort int + + enableJaegerTracing bool + enableElasticTracing bool ) // testnodeCmd represents the testnode command @@ -60,8 +63,16 @@ var testnodeCmd = &cobra.Command{ ecdsaKeyHex := os.Getenv("TUPELO_NODE_ECDSA_KEY_HEX") blsKeyHex := os.Getenv("TUPELO_NODE_BLS_KEY_HEX") signer := setupGossipNode(ctx, ecdsaKeyHex, blsKeyHex, "distributed-network", testnodePort) + if enableElasticTracing && enableJaegerTracing { + panic("only one tracing library may be used at once") + } + if enableJaegerTracing { + tracing.StartJaeger("signer-" + signer.ID) + } + if enableElasticTracing { + tracing.StartElastic() + } actor.EmptyRootContext.Send(signer.Actor, &messages.StartGossip{}) - tracing.StartJaeger("signer-" + signer.ID) stopOnSignal(signer) }, } @@ -169,11 +180,15 @@ func stopOnSignal(signers ...*gossip3types.Signer) { }() fmt.Println("awaiting signal") <-done - tracing.StopJaeger() + if enableJaegerTracing { + tracing.StopJaeger() + } fmt.Println("exiting") } func init() { rootCmd.AddCommand(testnodeCmd) testnodeCmd.Flags().IntVarP(&testnodePort, "port", "p", 0, "what port will the node listen on") + testnodeCmd.Flags().BoolVar(&enableJaegerTracing, "jaeger-tracing", false, "enable jaeger tracing") + testnodeCmd.Flags().BoolVar(&enableElasticTracing, "elastic-tracing", false, "enable elastic tracing") } From 3ea89d511d6684e9599a877baf71616ca7d98f0f Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Wed, 20 Mar 2019 12:15:43 +0100 Subject: [PATCH 21/30] move all LogKVs to SetTag --- gossip3/actors/conflictset.go | 13 ++++++++----- gossip3/actors/validator.go | 5 +++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/gossip3/actors/conflictset.go b/gossip3/actors/conflictset.go index b62f88fa..efa06278 100644 --- a/gossip3/actors/conflictset.go +++ b/gossip3/actors/conflictset.go @@ -201,12 +201,12 @@ func (cs *ConflictSet) handleNewTransaction(context actor.Context, msg *messages } if msg.Accepted { - sp.LogKV("accepted", msg.Accepted) + sp.SetTag("accepted", msg.Accepted) cs.active = true } if !cs.active { - sp.LogKV("snoozing", true) + sp.SetTag("snoozing", true) cs.Log.Debugw("snoozing transaction", "t", msg.Key, "height", msg.Transaction.Height) } cs.transactions[string(msg.TransactionID)] = msg @@ -226,7 +226,7 @@ func (cs *ConflictSet) processTransactions(context actor.Context) { if !cs.didSign { context.Request(cs.signatureGenerator, transaction) - sp.LogKV("didSign", true) + sp.SetTag("didSign", true) cs.didSign = true } cs.updates++ @@ -297,7 +297,8 @@ func (cs *ConflictSet) handleDeadlockedState(context actor.Context) { var lowestTrans *messages.TransactionWrapper for transID, trans := range cs.transactions { - trans.LogKV("deadlocked", true) + sp := trans.NewSpan("handleDeadlockedState") + defer sp.Finish() if lowestTrans == nil { lowestTrans = trans continue @@ -429,7 +430,9 @@ func (cs *ConflictSet) handleCurrentStateWrapper(context actor.Context, currWrap currWrapper.CleanupTransactions = make([]*messages.TransactionWrapper, len(cs.transactions)) i := 0 for _, t := range cs.transactions { - t.LogKV("done", true) + sp := t.NewSpan("handleCurrentStateWrapper") + defer sp.Finish() + sp.SetTag("done", true) currWrapper.CleanupTransactions[i] = t i++ } diff --git a/gossip3/actors/validator.go b/gossip3/actors/validator.go index 3145ed9e..24c79919 100644 --- a/gossip3/actors/validator.go +++ b/gossip3/actors/validator.go @@ -165,17 +165,18 @@ func (tv *TransactionValidator) handleRequest(actorCtx actor.Context, msg *valid if accepted && expectedNewTip { tv.Log.Debugw("accepted", "key", msg.key) wrapper.Accepted = true + sp.SetTag("accepted", true) actorCtx.Respond(wrapper) return } - sp.LogKV("accepted", false) + sp.SetTag("accepted", false) if err == nil && !expectedNewTip { nextStateCid, _ := cid.Cast(nextState) newTipCid, _ := cid.Cast(t.NewTip) err = fmt.Errorf("error: expected new tip: %s but got: %s", nextStateCid.String(), newTipCid.String()) } - sp.LogKV("error", err) + sp.SetTag("error", err) wrapper.Metadata["error"] = err tv.Log.Debugw("rejected", "err", err) From db95c02476376608ca971ae049e647c6ecb17c0f Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Wed, 20 Mar 2019 17:05:31 +0100 Subject: [PATCH 22/30] back out synch changes --- gossip3/actors/conflictsetrouter.go | 3 +-- gossip3/actors/tupelo.go | 3 --- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/gossip3/actors/conflictsetrouter.go b/gossip3/actors/conflictsetrouter.go index 59d9087a..a6f5dbe3 100644 --- a/gossip3/actors/conflictsetrouter.go +++ b/gossip3/actors/conflictsetrouter.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/AsynkronIT/protoactor-go/actor" - "github.com/AsynkronIT/protoactor-go/mailbox" "github.com/AsynkronIT/protoactor-go/plugin" "github.com/ethereum/go-ethereum/common/hexutil" iradix "github.com/hashicorp/go-immutable-radix" @@ -55,7 +54,7 @@ func NewConflictSetRouterProps(cfg *ConflictSetRouterConfig) *actor.Props { }).WithReceiverMiddleware( middleware.LoggingMiddleware, plugin.Use(&middleware.LogPlugin{}), - ).WithDispatcher(mailbox.NewSynchronizedDispatcher(300)) + ) } func (csr *ConflictSetRouter) nextHeight(objectID []byte) uint64 { diff --git a/gossip3/actors/tupelo.go b/gossip3/actors/tupelo.go index 2896b457..a80f9ce6 100644 --- a/gossip3/actors/tupelo.go +++ b/gossip3/actors/tupelo.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/AsynkronIT/protoactor-go/actor" - "github.com/AsynkronIT/protoactor-go/mailbox" "github.com/AsynkronIT/protoactor-go/plugin" "github.com/quorumcontrol/storage" extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" @@ -54,8 +53,6 @@ func NewTupeloNodeProps(cfg *TupeloConfig) *actor.Props { }).WithReceiverMiddleware( middleware.LoggingMiddleware, plugin.Use(&middleware.LogPlugin{}), - ).WithDispatcher( - mailbox.NewSynchronizedDispatcher(300), ) } From ef9e42368e9fedf132691c6ae1f969e226356b99 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Wed, 20 Mar 2019 17:44:34 +0100 Subject: [PATCH 23/30] instrument the conflictset too --- gossip3/actors/conflictset.go | 81 +++++++++++++++++++++++++++++------ 1 file changed, 67 insertions(+), 14 deletions(-) diff --git a/gossip3/actors/conflictset.go b/gossip3/actors/conflictset.go index efa06278..9f4a5080 100644 --- a/gossip3/actors/conflictset.go +++ b/gossip3/actors/conflictset.go @@ -13,6 +13,7 @@ import ( extmsgs "github.com/quorumcontrol/tupelo-go-client/gossip3/messages" "github.com/quorumcontrol/tupelo-go-client/gossip3/middleware" "github.com/quorumcontrol/tupelo-go-client/gossip3/types" + "github.com/quorumcontrol/tupelo-go-client/tracing" "github.com/quorumcontrol/tupelo/gossip3/messages" ) @@ -26,6 +27,7 @@ type checkStateMsg struct { type ConflictSet struct { middleware.LogAwareHolder + tracing.ContextHolder ID string currentStateStore storage.Reader @@ -89,6 +91,12 @@ func (cs *ConflictSet) Receive(ctx actor.Context) { func (cs *ConflictSet) NormalState(context actor.Context) { switch msg := context.Message().(type) { + case *actor.Started: + sp := cs.StartTrace("conflictSet") + sp.SetTag("self", context.Self().String()) + sp.SetTag("active", cs.active) + case *actor.Stopped: + cs.StopTrace() case *messages.TransactionWrapper: cs.handleNewTransaction(context, msg) // this will be an external signature @@ -187,13 +195,21 @@ func (cs *ConflictSet) handleCommit(context actor.Context, msg *commitNotificati } func (cs *ConflictSet) DoneReceive(context actor.Context) { - cs.Log.Debugw("done cs received message") // do nothing when in the done state + cs.Log.Debugw("done cs received message") + switch context.Message().(type) { + case *actor.Stopped: + cs.StopTrace() + } } func (cs *ConflictSet) handleNewTransaction(context actor.Context, msg *messages.TransactionWrapper) { - sp := msg.NewSpan("conflictset-handlenewtransaction") + sp := cs.NewSpan("handleNewTransaction") defer sp.Finish() + sp.SetTag("transaction", msg.TransactionID) + + transSpan := msg.NewSpan("conflictset-handlenewtransaction") + defer transSpan.Finish() cs.Log.Debugw("new transaction", "trans", msg.TransactionID) if !msg.PreFlight && !msg.Accepted { @@ -201,12 +217,15 @@ func (cs *ConflictSet) handleNewTransaction(context actor.Context, msg *messages } if msg.Accepted { - sp.SetTag("accepted", msg.Accepted) + sp.SetTag("accepted", true) + sp.SetTag("active", true) + transSpan.SetTag("accepted", true) cs.active = true } if !cs.active { sp.SetTag("snoozing", true) + transSpan.SetTag("snoozing", true) cs.Log.Debugw("snoozing transaction", "t", msg.Key, "height", msg.Transaction.Height) } cs.transactions[string(msg.TransactionID)] = msg @@ -216,27 +235,33 @@ func (cs *ConflictSet) handleNewTransaction(context actor.Context, msg *messages } func (cs *ConflictSet) processTransactions(context actor.Context) { + sp := cs.NewSpan("processTransactions") + defer sp.Finish() + if !cs.active { panic(fmt.Errorf("error: processTransactions called on inactive ConflictSet")) } for _, transaction := range cs.transactions { - sp := transaction.NewSpan("conflictset-processing") + transSpan := transaction.NewSpan("conflictset-processing") cs.Log.Debugw("processing transaction", "t", transaction.Key, "height", transaction.Transaction.Height) if !cs.didSign { context.Request(cs.signatureGenerator, transaction) - sp.SetTag("didSign", true) + transSpan.SetTag("didSign", true) cs.didSign = true } cs.updates++ - sp.Finish() + transSpan.Finish() } // do this as a message to make sure we're doing it after all the updates have come in context.Send(context.Self(), &checkStateMsg{atUpdate: cs.updates}) } func (cs *ConflictSet) handleNewSignature(context actor.Context, msg *messages.SignatureWrapper) { + sp := cs.NewSpan("handleNewSignature") + defer sp.Finish() + cs.Log.Debugw("handle new signature", "t", msg.Signature.TransactionID) if msg.Internal { context.Send(cs.signatureSender, msg) @@ -271,15 +296,19 @@ func (cs *ConflictSet) handleNewSignature(context actor.Context, msg *messages.S } func (cs *ConflictSet) checkState(context actor.Context, msg *checkStateMsg) { + sp := cs.NewSpan("checkState") + defer sp.Finish() + cs.Log.Debugw("check state") if cs.updates < msg.atUpdate { cs.Log.Debugw("old update") + sp.SetTag("oldUpdate", true) // we know there will be another check state message with a higher update return } if trans := cs.possiblyDone(); trans != nil { - sp := trans.NewSpan("checkState") - defer sp.Finish() + transSpan := trans.NewSpan("checkState") + defer transSpan.Finish() // we have a possibly done transaction, lets make a current state if err := cs.createCurrentStateFromTrans(context, trans); err != nil { panic(err) @@ -293,12 +322,14 @@ func (cs *ConflictSet) checkState(context actor.Context, msg *checkStateMsg) { } func (cs *ConflictSet) handleDeadlockedState(context actor.Context) { + sp := cs.NewSpan("handleDeadlockedState") + defer sp.Finish() cs.Log.Debugw("handle deadlocked state") var lowestTrans *messages.TransactionWrapper for transID, trans := range cs.transactions { - sp := trans.NewSpan("handleDeadlockedState") - defer sp.Finish() + transSpan := trans.NewSpan("handleDeadlockedState") + defer transSpan.Finish() if lowestTrans == nil { lowestTrans = trans continue @@ -313,6 +344,9 @@ func (cs *ConflictSet) handleDeadlockedState(context actor.Context) { } func (cs *ConflictSet) nextView(newWinner *messages.TransactionWrapper) { + sp := cs.NewSpan("nextView") + defer sp.Finish() + cs.view++ cs.didSign = false cs.transactions = make(transactionMap) @@ -324,8 +358,12 @@ func (cs *ConflictSet) nextView(newWinner *messages.TransactionWrapper) { } func (cs *ConflictSet) createCurrentStateFromTrans(context actor.Context, trans *messages.TransactionWrapper) error { - sp := trans.NewSpan("createCurrentState") + sp := cs.NewSpan("createCurrentStateFromTrans") defer sp.Finish() + transSpan := trans.NewSpan("createCurrentState") + defer transSpan.Finish() + + sp.SetTag("winner", trans.TransactionID) cs.Log.Debugw("createCurrentStateFromTrans", "t", trans.Key) sigs := cs.signatures[string(trans.TransactionID)] @@ -380,6 +418,9 @@ func (cs *ConflictSet) createCurrentStateFromTrans(context actor.Context, trans } func (cs *ConflictSet) validSignature(context actor.Context, currWrapper *messages.CurrentStateWrapper) (bool, error) { + sp := cs.NewSpan("validSignature") + defer sp.Finish() + sig := currWrapper.CurrentState.Signature signerArray, err := bitarray.Unmarshal(sig.Signers) if err != nil { @@ -413,6 +454,9 @@ func (cs *ConflictSet) validSignature(context actor.Context, currWrapper *messag } func (cs *ConflictSet) handleCurrentStateWrapper(context actor.Context, currWrapper *messages.CurrentStateWrapper) error { + sp := cs.NewSpan("handleCurrentStateWrapper") + defer sp.Finish() + cs.Log.Debugw("handleCurrentStateWrapper", "internal", currWrapper.Internal) if !currWrapper.Verified { @@ -430,14 +474,15 @@ func (cs *ConflictSet) handleCurrentStateWrapper(context actor.Context, currWrap currWrapper.CleanupTransactions = make([]*messages.TransactionWrapper, len(cs.transactions)) i := 0 for _, t := range cs.transactions { - sp := t.NewSpan("handleCurrentStateWrapper") - defer sp.Finish() - sp.SetTag("done", true) + transSpan := t.NewSpan("handleCurrentStateWrapper") + defer transSpan.Finish() + transSpan.SetTag("done", true) currWrapper.CleanupTransactions[i] = t i++ } cs.done = true + sp.SetTag("done", true) cs.Log.Debugw("done") cs.behavior.Become(cs.DoneReceive) @@ -445,6 +490,8 @@ func (cs *ConflictSet) handleCurrentStateWrapper(context actor.Context, currWrap context.Send(parent, currWrapper) } } else { + sp.SetTag("badSignature", true) + sp.SetTag("error", true) cs.Log.Errorw("signature not verified") } return nil @@ -452,6 +499,9 @@ func (cs *ConflictSet) handleCurrentStateWrapper(context actor.Context, currWrap // returns a transaction with enough signatures or nil if none yet exist func (cs *ConflictSet) possiblyDone() *messages.TransactionWrapper { + sp := cs.NewSpan("possiblyDone") + defer sp.Finish() + count := cs.notaryGroup.QuorumCount() for tID, sigList := range cs.signatures { cs.Log.Debugw("check count", "t", []byte(tID), "len", len(sigList), "quorumAt", count) @@ -463,6 +513,9 @@ func (cs *ConflictSet) possiblyDone() *messages.TransactionWrapper { } func (cs *ConflictSet) deadlocked() bool { + sp := cs.NewSpan("isDeadlocked") + defer sp.Finish() + unknownSigCount := len(cs.notaryGroup.Signers) - len(cs.signerSigs) quorumAt := cs.notaryGroup.QuorumCount() if len(cs.signerSigs) == 0 { From 0410be1f6377be4319483c3cb66bda39e2000ddf Mon Sep 17 00:00:00 2001 From: Wes Morgan Date: Wed, 20 Mar 2019 10:56:46 -0600 Subject: [PATCH 24/30] Update to latest fix/timeouts commit in client lib --- Gopkg.lock | 7 ++----- Gopkg.toml | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index d69b04d7..7305cdce 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1235,7 +1235,6 @@ packages = [ "prometheus", "prometheus/internal", - "prometheus/promauto", "prometheus/promhttp", ] pruneopts = "" @@ -1320,7 +1319,7 @@ version = "v1.1.1" [[projects]] - digest = "1:ec7330e2593994c54d6f6bfe92d07c1d098aa0fc6279ff985a4f14cf43da9874" + digest = "1:67d0fcdb751140d0bae4e0083e67fed1a2f449773b6f9dedabe5bba3af371c47" name = "github.com/quorumcontrol/tupelo-go-client" packages = [ "bls", @@ -1335,7 +1334,7 @@ "tracing", ] pruneopts = "" - revision = "d5122bf934020dd33cf0aab8ce06b668df09f2ad" + revision = "5efd67f80429ecc87e07c121eb9e692459836c21" [[projects]] digest = "1:d367886e3a8134415fad58fb2ac44e2f38aa88068adca7a02d59a555f87085c0" @@ -1836,8 +1835,6 @@ "github.com/ipsn/go-ipfs/plugin/loader", "github.com/ipsn/go-ipfs/repo/fsrepo", "github.com/mitchellh/go-homedir", - "github.com/prometheus/client_golang/prometheus", - "github.com/prometheus/client_golang/prometheus/promauto", "github.com/quorumcontrol/chaintree/chaintree", "github.com/quorumcontrol/chaintree/dag", "github.com/quorumcontrol/chaintree/nodestore", diff --git a/Gopkg.toml b/Gopkg.toml index 5d7cb7ef..184248af 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,7 +56,7 @@ [[constraint]] name = "github.com/quorumcontrol/tupelo-go-client" - revision = "d5122bf934020dd33cf0aab8ce06b668df09f2ad" # fix/timeouts + revision = "5efd67f80429ecc87e07c121eb9e692459836c21" # fix/timeouts [[constraint]] name = "github.com/gobuffalo/packr" From f628b7d67268c501551d4d9c6a5595af9d005d99 Mon Sep 17 00:00:00 2001 From: Wes Morgan Date: Wed, 20 Mar 2019 11:52:15 -0600 Subject: [PATCH 25/30] Update to latest fix/timeouts commit in client lib --- Gopkg.lock | 4 ++-- Gopkg.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 7305cdce..17d3d503 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1319,7 +1319,7 @@ version = "v1.1.1" [[projects]] - digest = "1:67d0fcdb751140d0bae4e0083e67fed1a2f449773b6f9dedabe5bba3af371c47" + digest = "1:638bf971cf7f3658a37a2a1fa55aad3e5b9bc898b4eb57805c3499927bf92523" name = "github.com/quorumcontrol/tupelo-go-client" packages = [ "bls", @@ -1334,7 +1334,7 @@ "tracing", ] pruneopts = "" - revision = "5efd67f80429ecc87e07c121eb9e692459836c21" + revision = "6c8a5f56dbcac8454b7d65fafc52074a23911f45" [[projects]] digest = "1:d367886e3a8134415fad58fb2ac44e2f38aa88068adca7a02d59a555f87085c0" diff --git a/Gopkg.toml b/Gopkg.toml index 184248af..f619cee6 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,7 +56,7 @@ [[constraint]] name = "github.com/quorumcontrol/tupelo-go-client" - revision = "5efd67f80429ecc87e07c121eb9e692459836c21" # fix/timeouts + revision = "6c8a5f56dbcac8454b7d65fafc52074a23911f45" # fix/timeouts [[constraint]] name = "github.com/gobuffalo/packr" From 3cf96dbfbb3e5f94fb34f9ac0e445e0ac9019b99 Mon Sep 17 00:00:00 2001 From: Wes Morgan Date: Thu, 21 Mar 2019 13:52:05 -0600 Subject: [PATCH 26/30] Update to latest fix/timeouts commit in client lib --- Gopkg.lock | 4 ++-- Gopkg.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 17d3d503..1f646300 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1319,7 +1319,7 @@ version = "v1.1.1" [[projects]] - digest = "1:638bf971cf7f3658a37a2a1fa55aad3e5b9bc898b4eb57805c3499927bf92523" + digest = "1:d31b4c596731714ce7f2149b468d972706ebc12ba098581ea1e4b8b29cb31ddd" name = "github.com/quorumcontrol/tupelo-go-client" packages = [ "bls", @@ -1334,7 +1334,7 @@ "tracing", ] pruneopts = "" - revision = "6c8a5f56dbcac8454b7d65fafc52074a23911f45" + revision = "4ab4685bbbf2787ec55b02ee66cb09d3161b2eaa" [[projects]] digest = "1:d367886e3a8134415fad58fb2ac44e2f38aa88068adca7a02d59a555f87085c0" diff --git a/Gopkg.toml b/Gopkg.toml index f619cee6..6c10de95 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,7 +56,7 @@ [[constraint]] name = "github.com/quorumcontrol/tupelo-go-client" - revision = "6c8a5f56dbcac8454b7d65fafc52074a23911f45" # fix/timeouts + revision = "4ab4685bbbf2787ec55b02ee66cb09d3161b2eaa" # fix/timeouts [[constraint]] name = "github.com/gobuffalo/packr" From aa671e4a53405b13cec34018c1510cf8c04babb7 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 22 Mar 2019 09:33:18 +0100 Subject: [PATCH 27/30] remove typo --- cmd/testnode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/testnode.go b/cmd/testnode.go index f84b5b0b..7e68e3d3 100644 --- a/cmd/testnode.go +++ b/cmd/testnode.go @@ -87,7 +87,7 @@ func setupNotaryGroup(local *gossip3types.Signer, keys []*PublicKeySet) *gossip3 if local != nil { group.AddSigner(local) } - w + for _, keySet := range keys { ecdsaBytes := hexutil.MustDecode(keySet.EcdsaHexPublicKey) if local != nil && bytes.Equal(crypto.FromECDSAPub(local.DstKey), ecdsaBytes) { From ed75107c272d10698163e4e04b4be9d42eb32018 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 22 Mar 2019 10:58:00 +0100 Subject: [PATCH 28/30] upgrade to latest tupelo-go-client --- Gopkg.lock | 5 ++--- Gopkg.toml | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 1f646300..8000b8ad 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1319,7 +1319,7 @@ version = "v1.1.1" [[projects]] - digest = "1:d31b4c596731714ce7f2149b468d972706ebc12ba098581ea1e4b8b29cb31ddd" + digest = "1:8e1c510830f8612ff8bc987dc715bfadafd3ad89c3e83c3887a050cf5200535f" name = "github.com/quorumcontrol/tupelo-go-client" packages = [ "bls", @@ -1334,7 +1334,7 @@ "tracing", ] pruneopts = "" - revision = "4ab4685bbbf2787ec55b02ee66cb09d3161b2eaa" + revision = "9b2fe34399d4f33bf83a8fb9e0bd0ed2f529f759" [[projects]] digest = "1:d367886e3a8134415fad58fb2ac44e2f38aa88068adca7a02d59a555f87085c0" @@ -1814,7 +1814,6 @@ "github.com/ethereum/go-ethereum/crypto", "github.com/ethereum/go-ethereum/log", "github.com/gobuffalo/packr/v2", - "github.com/gobuffalo/packr/v2/file/resolver", "github.com/gogo/protobuf/proto", "github.com/golang/protobuf/proto", "github.com/gorilla/mux", diff --git a/Gopkg.toml b/Gopkg.toml index 6c10de95..65eef014 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,7 +56,7 @@ [[constraint]] name = "github.com/quorumcontrol/tupelo-go-client" - revision = "4ab4685bbbf2787ec55b02ee66cb09d3161b2eaa" # fix/timeouts + revision = "9b2fe34399d4f33bf83a8fb9e0bd0ed2f529f759" # fix/timeouts [[constraint]] name = "github.com/gobuffalo/packr" From 27fc030d8a5585568f3faa5fbb49cd6375b98865 Mon Sep 17 00:00:00 2001 From: Brandon Westcott Date: Fri, 22 Mar 2019 15:09:25 -0400 Subject: [PATCH 29/30] Use latest tupelo-go-client version --- Gopkg.lock | 5 +++-- Gopkg.toml | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 8000b8ad..138c5c7e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1319,7 +1319,7 @@ version = "v1.1.1" [[projects]] - digest = "1:8e1c510830f8612ff8bc987dc715bfadafd3ad89c3e83c3887a050cf5200535f" + digest = "1:b165a648a1600fcd47140ebd672479af92cc859c037701e679e166e870267847" name = "github.com/quorumcontrol/tupelo-go-client" packages = [ "bls", @@ -1334,7 +1334,8 @@ "tracing", ] pruneopts = "" - revision = "9b2fe34399d4f33bf83a8fb9e0bd0ed2f529f759" + revision = "14462356726b7f74cb869ac127a79d6559598733" + version = "v0.1.0" [[projects]] digest = "1:d367886e3a8134415fad58fb2ac44e2f38aa88068adca7a02d59a555f87085c0" diff --git a/Gopkg.toml b/Gopkg.toml index 65eef014..2a514013 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,7 +56,7 @@ [[constraint]] name = "github.com/quorumcontrol/tupelo-go-client" - revision = "9b2fe34399d4f33bf83a8fb9e0bd0ed2f529f759" # fix/timeouts + version = "v0.1.0" [[constraint]] name = "github.com/gobuffalo/packr" From c237998b5a2c32df434d4975bb81c1e888cd7514 Mon Sep 17 00:00:00 2001 From: Wes Morgan Date: Fri, 22 Mar 2019 13:44:16 -0600 Subject: [PATCH 30/30] Bump integration test sub timeout to 90s Seeing several timeouts from this in CI, but passes the majority of the time locally. Weird that it takes this long, but hoping this gets us out of retrying CI builds so much for now. --- gossip3/tupelo_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip3/tupelo_integration_test.go b/gossip3/tupelo_integration_test.go index 5f2a7cf4..7ac0d2d0 100644 --- a/gossip3/tupelo_integration_test.go +++ b/gossip3/tupelo_integration_test.go @@ -230,7 +230,7 @@ func TestLibP2PSigning(t *testing.T) { newTip, err := cid.Cast(trans.NewTip) require.Nil(t, err) - ch, err := client.Subscribe(systems[0].AllSigners()[0], string(trans.ObjectID), newTip, 60*time.Second) + ch, err := client.Subscribe(systems[0].AllSigners()[0], string(trans.ObjectID), newTip, 90*time.Second) require.Nil(t, err) client.SendTransaction(systems[0].GetRandomSigner(), &trans)