From 41a0009c1b35efa3dc9bc9acbf06bbcf5c39af6b Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Mon, 17 Jun 2024 12:17:33 +0100
Subject: [PATCH] Add forking detection to TBC (#101)

* Add testing framework
* Somewhat working chain
* Code is not ready to remove threading
* Implement get data
* be a bit less verbose
* Use normal cache element count
* And have a pass condition
* check balance
* go vet
* go mod tidy
* no ulimit check during tests
* Push before rebase
* Fix possible race conditions in tbcfork_test
* Tidy up parts of tbcfork_test
* Add forking support to fake btc server
* create fork and advance both heads
* We need to start panicking on fork situations
* add some comments to mark special spots
* added test case configuration for bitcoind inclusion in tests
* add comment
* removed unused val
* Encode/decode difficulty in blockheader
* imperative forking in tests
* no syncBlocks anymore since we generate them with current timestamps
* comment out likely unused test
* tbc: fix loop unconditionally exited after one iteration (SA4004)
* Add panic to catch unsupported fork
* removed unused test data
* fixup rebase drama, need to reenable TestLevelDB
* tbc: use time.Since instead of time.Now().Sub (S1012)
* finish fork scenario 1
* added more test cases
* NewBuffer works in surprising ways
* Detect forks when downloading block headers
* XXX
* deal with extending forks
* First attempt at making BlockHeadersBest singular
* tbcapi: update for BlockHeadersBest -> BlockHeaderBest
* Add type to detect how block headers were inserted
* Reap peers if not synced or sending crap block headers
* Simplify and return hint to caller how to handle forks; caller now also resumes fork and chain extensions (in case fork is deep)
* Remove debug panic and annotate code
* Moar testing
* tbcd: fix unnecessary use of fmt.Sprintf (S1039)
* Yay working forks
* tbcd: fix only first constant having explicit type (SA9004)
* tbcd/level: use element from range, fix empty if statement
* lower limits because of latest ubuntu update
* be less loud
* skip test that causes an issue due to port bindings
* Remove cached last block header
* skipping balance tests for forking
* tbc: BlockHeadersBest -> BlockHeaderBest in rpc_test
* tbcapi: best block headers -> best block header
* Work around most fork situations
* mostly working but ugly interleaved start/stop downloading/indexing
* Make peers wanted a setting and abort indexing a bit earlier
* Ok this seems to work
* remove clipped for now and only index if enabled
* Fix broken test
* Make fork choices more explicit and rename BlockHeaderInsert -> BlockHeaderGenesisInsert
* make PeersWanted a runtime setting
* Return canonical and last inserted block header in BlockHeaderInsert to pretty print and keep things sane
* Rename 'last' to canonical/best
* remove unused h2b80 function
* Remove unused headerTime headerHash functions
* Remove unused structure blockPeer
* Test err and use Error instead of Fatal
* Remove unused function getEndpointWithRetries
* Remove unused function submitBlock
* Unexport map with fork type names
* deleted unused test variable, increased for loop time sleep
* Update database/tbcd/database.go Co-authored-by: Joshua Sing
* Update database/tbcd/database.go Co-authored-by: Joshua Sing
* Make Infof into debugf
* Remove loud infof
* Move loud fork proclamation to debugf
* Remove leftover debug
* Update service/tbc/tbc_test.go Co-authored-by: Joshua Sing
* tbc: nest short err assignments
* tbcd/level: nest short err assignments
* tbc: use errors.Is for error
comparison * tbc/crawler: nest short err assignments * tbcd/level: deduplicate cbh/lbh assignment --------- Co-authored-by: ClaytonNorthey92 Co-authored-by: Joshua Sing --- api/tbcapi/README.md | 54 +- api/tbcapi/tbcapi.go | 36 +- cmd/hemictl/hemictl.go | 10 +- cmd/tbcd/tbcd.go | 6 + database/tbcd/database.go | 39 +- database/tbcd/level/level.go | 382 +++++++++++--- database/tbcd/level/level_test.go | 307 +++++------ go.mod | 2 +- service/tbc/crawler.go | 70 ++- service/tbc/rpc.go | 70 +-- service/tbc/rpc_test.go | 23 +- service/tbc/tbc.go | 592 +++++++++------------- service/tbc/tbc_test.go | 589 +++++++++++++++++++++- service/tbc/tbcfork_test.go | 813 ++++++++++++++++++++++++++++++ service/tbc/ulimit_linux.go | 2 +- 15 files changed, 2269 insertions(+), 726 deletions(-) create mode 100644 service/tbc/tbcfork_test.go diff --git a/api/tbcapi/README.md b/api/tbcapi/README.md index b5b4bd36..fa409961 100644 --- a/api/tbcapi/README.md +++ b/api/tbcapi/README.md @@ -38,7 +38,7 @@ This document provides details on the RPC protocol and available commands for th * [📥 Response](#-response-1) * [Payload](#payload-3) * [Example Response](#example-response-1) - * [👉 Best Block Headers](#-best-block-headers) + * [👉 Best Block Header](#-best-block-header) * [🗂 Raw Data](#-raw-data-1) * [📤 Request](#-request-2) * [Example Request](#example-request-2) @@ -330,16 +330,16 @@ Response for a request with **id** `68656d69` and **height** `43111`: --- -## 👉 Best Block Headers +## 👉 Best Block Header Retrieve the best block headers. ### 🗂 Raw Data -| Type | `command` value | -|----------|------------------------------------------| -| Request | `tbcapi-block-headers-best-raw-request` | -| Response | `tbcapi-block-headers-best-raw-response` | +| Type | `command` value | +|----------|-----------------------------------------| +| Request | `tbcapi-block-header-best-raw-request` | +| Response | `tbcapi-block-header-best-raw-response` | #### 📤 Request @@ -350,7 +350,7 @@ Retrieve the best block headers: ```json { "header": { - "command": "tbcapi-block-headers-best-raw-request", + "command": "tbcapi-block-header-best-raw-request", "id": "68656d69" } } @@ -361,7 +361,7 @@ Retrieve the best block headers: ##### Payload - **`height`**: The best-known height. -- **`block_headers`**: An array of the best-known block headers encoded as hexadecimal strings. +- **`block_header`**: The best-known block header encoded as a hexadecimal string. 
##### Example Response @@ -370,24 +370,22 @@ Response for a request with **id** `68656d69` and **best height** `2182000`: ```json { "header": { - "command": "tbcapi-block-headers-best-raw-response", + "command": "tbcapi-block-header-best-raw-response", "id": "68656d69" }, "payload": { "height": 2182000, - "block_headers": [ - "0420002075089ac1ab1cab70cf6e6b774a86703a8d7127c0ebed1d3dfa2c00000000000086105509ec4a79457a400451290ad2a019fec4c76b47512623f1bb17a0ced76f38d82662bef4001b07d86700" - ] + "block_header": "0420002075089ac1ab1cab70cf6e6b774a86703a8d7127c0ebed1d3dfa2c00000000000086105509ec4a79457a400451290ad2a019fec4c76b47512623f1bb17a0ced76f38d82662bef4001b07d86700" } } ``` #### 🗂 Serialized Data -| Type | `command` value | -|----------|--------------------------------------| -| Request | `tbcapi-block-headers-best-request` | -| Response | `tbcapi-block-headers-best-response` | +| Type | `command` value | +|----------|-------------------------------------| +| Request | `tbcapi-block-header-best-request` | +| Response | `tbcapi-block-header-best-response` | #### 📤 Request @@ -398,7 +396,7 @@ Retrieve the best block headers: ```json { "header": { - "command": "tbcapi-block-headers-best-request", + "command": "tbcapi-block-header-best-request", "id": "68656d69" } } @@ -409,7 +407,7 @@ Retrieve the best block headers: ##### Payload - **`height`**: The best-known height. -- **`block_headers`**: An array of best-known [block headers](#block-header). +- **`block_header`**: The best-known [block header](#block-header). ##### Example Response @@ -418,21 +416,19 @@ Response for a request with **id** `68656d69` and **height** `2587400`: ```json { "header": { - "command": "tbcapi-block-headers-best-response", + "command": "tbcapi-block-header-best-response", "id": "68656d69" }, "payload": { "height": 2587400, - "block_headers": [ - { - "version": 536887296, - "prev_hash": "000000000000002bbbbec8f126dc76a82109d898383bca5013a2386c8675ce34", - "merkle_root": "b9d74efdafb5436330b47478b2df28251057da5a9bc11c5509410950253d4f0e", - "timestamp": 1713461092, - "bits": "192e17d5", - "nonce": 3365605040 - } - ] + "block_header": { + "version": 536887296, + "prev_hash": "000000000000002bbbbec8f126dc76a82109d898383bca5013a2386c8675ce34", + "merkle_root": "b9d74efdafb5436330b47478b2df28251057da5a9bc11c5509410950253d4f0e", + "timestamp": 1713461092, + "bits": "192e17d5", + "nonce": 3365605040 + } } } ``` diff --git a/api/tbcapi/tbcapi.go b/api/tbcapi/tbcapi.go index b7d29e80..6fa0f94c 100644 --- a/api/tbcapi/tbcapi.go +++ b/api/tbcapi/tbcapi.go @@ -26,11 +26,11 @@ const ( CmdBlockHeadersByHeightRequest = "tbcapi-block-headers-by-height-request" CmdBlockHeadersByHeightResponse = "tbcapi-block-headers-by-height-response" - CmdBlockHeadersBestRawRequest = "tbcapi-block-headers-best-raw-request" - CmdBlockHeadersBestRawResponse = "tbcapi-block-headers-best-raw-response" + CmdBlockHeaderBestRawRequest = "tbcapi-block-header-best-raw-request" + CmdBlockHeaderBestRawResponse = "tbcapi-block-header-best-raw-response" - CmdBlockHeadersBestRequest = "tbcapi-block-headers-best-request" - CmdBlockHeadersBestResponse = "tbcapi-block-headers-best-response" + CmdBlockHeaderBestRequest = "tbcapi-block-header-best-request" + CmdBlockHeaderBestResponse = "tbcapi-block-header-best-response" CmdBalanceByAddressRequest = "tbcapi-balance-by-address-request" CmdBalanceByAddressResponse = "tbcapi-balance-by-address-response" @@ -88,20 +88,20 @@ type BlockHeadersByHeightResponse struct { Error *protocol.Error `json:"error,omitempty"` } 
-type BlockHeadersBestRawRequest struct{} +type BlockHeaderBestRawRequest struct{} -type BlockHeadersBestRawResponse struct { - Height uint64 `json:"height"` - BlockHeaders []api.ByteSlice `json:"block_headers"` - Error *protocol.Error `json:"error,omitempty"` +type BlockHeaderBestRawResponse struct { + Height uint64 `json:"height"` + BlockHeader api.ByteSlice `json:"block_header"` + Error *protocol.Error `json:"error,omitempty"` } -type BlockHeadersBestRequest struct{} +type BlockHeaderBestRequest struct{} -type BlockHeadersBestResponse struct { - Height uint64 `json:"height"` - BlockHeaders []*BlockHeader `json:"block_headers"` - Error *protocol.Error `json:"error,omitempty"` +type BlockHeaderBestResponse struct { + Height uint64 `json:"height"` + BlockHeader *BlockHeader `json:"block_header"` + Error *protocol.Error `json:"error,omitempty"` } type BalanceByAddressRequest struct { @@ -192,10 +192,10 @@ var commands = map[protocol.Command]reflect.Type{ CmdBlockHeadersByHeightRawResponse: reflect.TypeOf(BlockHeadersByHeightRawResponse{}), CmdBlockHeadersByHeightRequest: reflect.TypeOf(BlockHeadersByHeightRequest{}), CmdBlockHeadersByHeightResponse: reflect.TypeOf(BlockHeadersByHeightResponse{}), - CmdBlockHeadersBestRawRequest: reflect.TypeOf(BlockHeadersBestRawRequest{}), - CmdBlockHeadersBestRawResponse: reflect.TypeOf(BlockHeadersBestRawResponse{}), - CmdBlockHeadersBestRequest: reflect.TypeOf(BlockHeadersBestRequest{}), - CmdBlockHeadersBestResponse: reflect.TypeOf(BlockHeadersBestResponse{}), + CmdBlockHeaderBestRawRequest: reflect.TypeOf(BlockHeaderBestRawRequest{}), + CmdBlockHeaderBestRawResponse: reflect.TypeOf(BlockHeaderBestRawResponse{}), + CmdBlockHeaderBestRequest: reflect.TypeOf(BlockHeaderBestRequest{}), + CmdBlockHeaderBestResponse: reflect.TypeOf(BlockHeaderBestResponse{}), CmdBalanceByAddressRequest: reflect.TypeOf(BalanceByAddressRequest{}), CmdBalanceByAddressResponse: reflect.TypeOf(BalanceByAddressResponse{}), CmdUtxosByAddressRawRequest: reflect.TypeOf(UtxosByAddressRawRequest{}), diff --git a/cmd/hemictl/hemictl.go b/cmd/hemictl/hemictl.go index 1aafb36f..bfdfa3ee 100644 --- a/cmd/hemictl/hemictl.go +++ b/cmd/hemictl/hemictl.go @@ -241,14 +241,16 @@ func tbcdb() error { fmt.Printf("height: %v\n", bh.Height) case "blockheadersbest": - bhs, err := s.DB().BlockHeadersBest(ctx) + bhb, err := s.DB().BlockHeaderBest(ctx) if err != nil { return fmt.Errorf("block headers best: %w", err) } - for k := range bhs { - fmt.Printf("hash (%v): %v\n", k, bhs[k]) - fmt.Printf("height (%v): %v\n", k, bhs[k].Height) + hash, err := chainhash.NewHash(bhb.Hash) + if err != nil { + return fmt.Errorf("block headers best chainhash: %w", err) } + fmt.Printf("hash : %v\n", hash) + fmt.Printf("height: %v\n", bhb.Height) case "blockheadersbyheight": height := args["height"] diff --git a/cmd/tbcd/tbcd.go b/cmd/tbcd/tbcd.go index 1c6a07ac..033023e8 100644 --- a/cmd/tbcd/tbcd.go +++ b/cmd/tbcd/tbcd.go @@ -75,6 +75,12 @@ var ( Help: "bitcoin network; mainnet or testnet3", Print: config.PrintAll, }, + "TBC_PEERS_WANTED": config.Config{ + Value: &cfg.PeersWanted, + DefaultValue: 64, + Help: "number of wanted p2p peers", + Print: config.PrintAll, + }, "TBC_PROMETHEUS_ADDRESS": config.Config{ Value: &cfg.PrometheusListenAddress, DefaultValue: "", diff --git a/database/tbcd/database.go b/database/tbcd/database.go index aaf162b4..600023fe 100644 --- a/database/tbcd/database.go +++ b/database/tbcd/database.go @@ -11,6 +11,7 @@ import ( "encoding/hex" "errors" "fmt" + "math/big" "time" 
"github.com/btcsuite/btcd/chaincfg/chainhash" @@ -19,6 +20,26 @@ import ( "github.com/hemilabs/heminetwork/database" ) +type InsertType int + +const ( + ITInvalid InsertType = 0 // Invalid insert + ITChainExtend InsertType = 1 // Normal insert, does not require further action. + ITChainFork InsertType = 2 // Chain forked, unwind and rewind indexes. + ITForkExtend InsertType = 3 // Extended a fork, does not require further action. +) + +var itStrings = map[InsertType]string{ + ITInvalid: "invalid", + ITChainExtend: "chain extended", + ITChainFork: "chain forked", + ITForkExtend: "fork extended", +} + +func (it InsertType) String() string { + return itStrings[it] +} + type Database interface { database.Database @@ -28,10 +49,13 @@ type Database interface { MetadataPut(ctx context.Context, key, value []byte) error // Block header + BlockHeaderBest(ctx context.Context) (*BlockHeader, error) // return canonical BlockHeaderByHash(ctx context.Context, hash []byte) (*BlockHeader, error) - BlockHeadersBest(ctx context.Context) ([]BlockHeader, error) + BlockHeaderGenesisInsert(ctx context.Context, bh [80]byte) error + + // Block headers BlockHeadersByHeight(ctx context.Context, height uint64) ([]BlockHeader, error) - BlockHeadersInsert(ctx context.Context, bhs []BlockHeader) error + BlockHeadersInsert(ctx context.Context, bhs [][80]byte) (InsertType, *BlockHeader, *BlockHeader, error) // Block BlocksMissing(ctx context.Context, count int) ([]BlockIdentifier, error) @@ -58,12 +82,13 @@ type Database interface { UtxosByScriptHash(ctx context.Context, sh ScriptHash, start uint64, count uint64) ([]Utxo, error) } -// BlockHeader contains the first 80 raw bytes of a bitcoin block and its -// location information (hash+height). +// BlockHeader contains the first 80 raw bytes of a bitcoin block plus its +// location information (hash+height) and the cumulative difficulty. 
type BlockHeader struct { - Hash database.ByteArray - Height uint64 - Header database.ByteArray + Hash database.ByteArray + Height uint64 + Header database.ByteArray + Difficulty big.Int } func (bh BlockHeader) String() string { diff --git a/database/tbcd/level/level.go b/database/tbcd/level/level.go index bd36eb8a..b9e9dec6 100644 --- a/database/tbcd/level/level.go +++ b/database/tbcd/level/level.go @@ -10,11 +10,14 @@ import ( "encoding/binary" "errors" "fmt" + "math/big" "net" "sync" "time" + "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/juju/loggo" "github.com/syndtr/goleveldb/leveldb" @@ -41,7 +44,7 @@ const ( logLevel = "INFO" verbose = false - bhsLastKey = "last" + bhsCanonicalTipKey = "canonicaltip" minPeersRequired = 64 // minimum number of peers in good map before cache is purged ) @@ -56,6 +59,31 @@ func init() { loggo.ConfigureLoggers(logLevel) } +func b2h(header []byte) (*wire.BlockHeader, error) { + var bh wire.BlockHeader + if err := bh.Deserialize(bytes.NewReader(header)); err != nil { + return nil, fmt.Errorf("deserialize block header: %w", err) + } + return &bh, nil +} + +func headerHash(header []byte) *chainhash.Hash { + h, err := b2h(header) + if err != nil { + panic(err) + } + hash := h.BlockHash() + return &hash +} + +func headerParentHash(header []byte) *chainhash.Hash { + h, err := b2h(header) + if err != nil { + panic(err) + } + return &h.PrevBlock +} + type ldb struct { mtx sync.Mutex blocksMissingCacheEnabled bool // XXX verify this code in tests @@ -112,8 +140,7 @@ func (l *ldb) startTransaction(db string) (*leveldb.Transaction, commitFunc, dis } } cf := func() error { - err = tx.Commit() - if err != nil { + if err = tx.Commit(); err != nil { return fmt.Errorf("%v discard: %w", db, err) } *discard = false @@ -160,7 +187,7 @@ func (l *ldb) BlockHeaderByHash(ctx context.Context, hash []byte) (*tbcd.BlockHe } return nil, fmt.Errorf("block header get: %w", err) } - return decodeBlockHeader(hash, ebh), nil + return decodeBlockHeader(ebh), nil } func (l *ldb) BlockHeadersByHeight(ctx context.Context, height uint64) ([]tbcd.BlockHeader, error) { @@ -189,33 +216,29 @@ func (l *ldb) BlockHeadersByHeight(ctx context.Context, height uint64) ([]tbcd.B bhs = append(bhs, *bh) } if len(bhs) == 0 { - return nil, database.NotFoundError(fmt.Sprintf("not found")) + return nil, database.NotFoundError("block headers not found") } return bhs, nil } -func (l *ldb) BlockHeadersBest(ctx context.Context) ([]tbcd.BlockHeader, error) { - log.Tracef("BlockHeadersBest") - defer log.Tracef("BlockHeadersBest exit") +func (l *ldb) BlockHeaderBest(ctx context.Context) (*tbcd.BlockHeader, error) { + log.Tracef("BlockHeaderBest") + defer log.Tracef("BlockHeaderBest exit") // This function is a bit of a crapshoot. It will receive many calls // and thus it is racing by definition. Avoid the lock and let the // caller serialize the response. - // XXX this code does not handle multiple "best" block headers. - bhsDB := l.pool[level.BlockHeadersDB] // Get last record - ebh, err := bhsDB.Get([]byte(bhsLastKey), nil) + ebh, err := bhsDB.Get([]byte(bhsCanonicalTipKey), nil) if err != nil { if errors.Is(err, leveldb.ErrNotFound) { - return []tbcd.BlockHeader{}, nil + return nil, database.NotFoundError("best block header not found") } return nil, fmt.Errorf("block headers best: %w", err) } - - // Convert height to hash, cheat because we know where height lives in ebh. 
- return l.BlockHeadersByHeight(ctx, binary.BigEndian.Uint64(ebh[0:8])) + return decodeBlockHeader(ebh), nil } // heightHashToKey generates a sortable key from height and hash. With this key @@ -241,137 +264,337 @@ func keyToHeightHash(key []byte) (uint64, []byte) { return binary.BigEndian.Uint64(key[0:8]), hash } -// encodeBlockHeader encodes a database block header as [height,header] or -// [8+80] bytes. The hash is the leveldb table key. -func encodeBlockHeader(bh *tbcd.BlockHeader) (ebhr [88]byte) { - binary.BigEndian.PutUint64(ebhr[0:8], bh.Height) - copy(ebhr[8:], bh.Header[:]) +// encodeBlockHeader encodes a database block header as +// [height,header,difficulty] or [8+80+32] bytes. The hash is the leveldb table +// key. +func encodeBlockHeader(height uint64, header [80]byte, difficulty *big.Int) (ebhr [120]byte) { + binary.BigEndian.PutUint64(ebhr[0:8], height) + copy(ebhr[8:88], header[:]) + difficulty.FillBytes(ebhr[88:120]) return } -// decodeBlockHeader reverse the process of encodeBlockHeader. The hash must be -// passed in but that is fine because it is the leveldb lookup key. -func decodeBlockHeader(hashSlice []byte, ebh []byte) *tbcd.BlockHeader { +// decodeBlockHeader reverse the process of encodeBlockHeader. +// XXX should we have a function that does not call the expensive headerHash function? +func decodeBlockHeader(ebh []byte) *tbcd.BlockHeader { // copy the values to prevent slicing reentrancy problems. var ( - hash [32]byte header [80]byte ) - copy(hash[:], hashSlice) - copy(header[:], ebh[8:]) - return &tbcd.BlockHeader{ - Hash: hash[:], + copy(header[:], ebh[8:88]) + bh := &tbcd.BlockHeader{ + Hash: headerHash(header[:])[:], Height: binary.BigEndian.Uint64(ebh[0:8]), Header: header[:], } + (&bh.Difficulty).SetBytes(ebh[88:]) + return bh } -func (l *ldb) BlockHeadersInsert(ctx context.Context, bhs []tbcd.BlockHeader) error { +func (l *ldb) BlockHeaderGenesisInsert(ctx context.Context, bh [80]byte) error { + log.Tracef("BlockHeaderGenesisInsert") + defer log.Tracef("BlockHeaderGenesisInsert exit") + + wbh, err := b2h(bh[:]) + if err != nil { + return fmt.Errorf("block header insert b2h: %w", err) + } + bhash := wbh.BlockHash() + + // block headers + bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB) + if err != nil { + return fmt.Errorf("block header open transaction: %w", err) + } + defer bhsDiscard() + + // Make sure we are not inserting the same blocks + has, err := bhsTx.Has(bhash[:], nil) + if err != nil { + return fmt.Errorf("block header insert has: %w", err) + } + if has { + return database.DuplicateError("block header insert duplicate") + } + + // blocks missing + bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB) + if err != nil { + return fmt.Errorf("blocks missing open transaction: %w", err) + } + defer bmDiscard() + + // height hash + hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB) + if err != nil { + return fmt.Errorf("height hash open transaction: %w", err) + } + defer hhDiscard() + + // Insert height hash, missing, block header + hhBatch := new(leveldb.Batch) + bmBatch := new(leveldb.Batch) + bhBatch := new(leveldb.Batch) + + hhKey := heightHashToKey(0, bhash[:]) + hhBatch.Put(hhKey, []byte{}) + ebh := encodeBlockHeader(0, bh, new(big.Int)) + bhBatch.Put(bhash[:], ebh[:]) + + bhBatch.Put([]byte(bhsCanonicalTipKey), ebh[:]) + + // Write height hash batch + if err = hhTx.Write(hhBatch, nil); err != nil { + return fmt.Errorf("height hash batch: %w", err) + } + + // Write missing 
blocks batch + if err = bmTx.Write(bmBatch, nil); err != nil { + return fmt.Errorf("blocks missing batch: %w", err) + } + + // Write block headers batch + if err = bhsTx.Write(bhBatch, nil); err != nil { + return fmt.Errorf("block header insert: %w", err) + } + + // height hash commit + if err = hhCommit(); err != nil { + return fmt.Errorf("height hash commit: %w", err) + } + + // blocks missing commit + if err = bmCommit(); err != nil { + return fmt.Errorf("blocks missing commit: %w", err) + } + + // block headers commit + if err = bhsCommit(); err != nil { + return fmt.Errorf("block header commit: %w", err) + } + + return nil +} + +// BlockHeadersInsert decodes and inserts the passed blockheaders into the +// database. Additionally it updates the hight/hash and missing blocks table as +// well. On return it informs the caller about potential forking situations +// and always returns the canonical and last inserted blockheader, which may be +// the same. +// This call uses the database to prevent reentrancy. +func (l *ldb) BlockHeadersInsert(ctx context.Context, bhs [][80]byte) (tbcd.InsertType, *tbcd.BlockHeader, *tbcd.BlockHeader, error) { log.Tracef("BlockHeadersInsert") defer log.Tracef("BlockHeadersInsert exit") + // XXX at start of day lastRecord contains the last canonical + // downloaded blockheader. Thus if it is on a fork it will not ask for + // headers on what the network may be doing. Not sure how to handle + // that right now but leaving a note. + if len(bhs) == 0 { - return errors.New("block headers insert: no block headers to insert") + return tbcd.ITInvalid, nil, nil, + errors.New("block headers insert: no block headers to insert") + } + + // Ensure we can connect these blockheaders prior to starting database + // transaction. This also obtains the starting cumulative difficulty + // and height. 
+ wbh, err := b2h(bhs[0][:]) + if err != nil { + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("block headers insert b2h: %w", err) + } + pbh, err := l.BlockHeaderByHash(ctx, wbh.PrevBlock[:]) + if err != nil { + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("block headers insert: %w", err) } // block headers bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB) if err != nil { - return fmt.Errorf("block headers open transaction: %w", err) + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("block headers open transaction: %w", err) } defer bhsDiscard() // Make sure we are not inserting the same blocks - has, err := bhsTx.Has(bhs[0].Hash, nil) + bhash := wbh.BlockHash() + has, err := bhsTx.Has(bhash[:], nil) if err != nil { - return fmt.Errorf("block headers insert has: %w", err) + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("block headers insert has: %w", err) } if has { - return database.DuplicateError("block headers insert duplicate") + return tbcd.ITInvalid, nil, nil, + database.DuplicateError("block headers insert duplicate") } // blocks missing bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB) if err != nil { - return fmt.Errorf("blocks missing open transaction: %w", err) + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("blocks missing open transaction: %w", err) } defer bmDiscard() // height hash hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB) if err != nil { - return fmt.Errorf("height hash open transaction: %w", err) + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("height hash open transaction: %w", err) } defer hhDiscard() - // Insert missing blocks and block headers + // retrieve best/canonical block header var lastRecord []byte + bbh, err := bhsTx.Get([]byte(bhsCanonicalTipKey), nil) + if err != nil { + if errors.Is(err, leveldb.ErrNotFound) { + return tbcd.ITInvalid, nil, nil, + database.NotFoundError("best block header not found") + } + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("best block header: %v", err) + } + bestBH := decodeBlockHeader(bbh) + + // Fork is set to true if the first blockheader does not connect to the + // canonical blockheader. + fork := !bytes.Equal(wbh.PrevBlock[:], bestBH.Hash[:]) + if fork { + b, _ := chainhash.NewHash(bestBH.Hash[:]) + log.Debugf("=== FORK ===") + log.Debugf("blockheader hash: %v", wbh.BlockHash()) + log.Debugf("previous hash : %v", wbh.PrevBlock) + log.Debugf("previous height : %v", pbh.Height) + log.Debugf("best hash : %v", b) + log.Debugf("best height : %v", bestBH.Height) + log.Debugf("--- FORK ---") + } + + // Insert missing blocks and block headers hhBatch := new(leveldb.Batch) bmBatch := new(leveldb.Batch) bhsBatch := new(leveldb.Batch) - for k := range bhs { - hhKey := heightHashToKey(bhs[k].Height, bhs[k].Hash[:]) - // Height 0 is genesis, we do not want a missing block record for that. - if bhs[k].Height != 0 { - // Insert a synthesized height_hash key that serves as - // an index to see which blocks are missing. - bmBatch.Put(hhKey, []byte{}) + + cdiff := &pbh.Difficulty + height := pbh.Height + for k, bh := range bhs { + // The first element is skipped, as it is pre-decoded. 
+ if k != 0 { + wbh, err = b2h(bh[:]) + if err != nil { + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("block headers insert b2h: %w", err) + } + bhash = wbh.BlockHash() } + // pre set values because we start with previous value + height++ + cdiff = new(big.Int).Add(cdiff, blockchain.CalcWork(wbh.Bits)) + // Store height_hash for future reference - hhBatch.Put(hhKey, []byte{}) + hhKey := heightHashToKey(height, bhash[:]) + hhBatch.Put(hhKey, []byte{}) // XXX nil? + + // Insert a synthesized height_hash key that serves as an index + // to see which blocks are missing. + // XXX should we always insert or should we verify prior to insert? + bmBatch.Put(hhKey, []byte{}) // XXX reason about pre encoding. Due to the caller code being // heavily reentrant the odds are not good that encoding would // only happens once. The downside is that this encoding // happens in the database transaction and is thus locked. - // Encode block header as [hash][height,header] or [32][8+80] bytes - ebh := encodeBlockHeader(&bhs[k]) - bhsBatch.Put(bhs[k].Hash, ebh[:]) + // Encode block header as [hash][height,header,cdiff] or, + // [32][8+80+32] bytes + ebh := encodeBlockHeader(height, bh, cdiff) + bhsBatch.Put(bhash[:], ebh[:]) lastRecord = ebh[:] } - // Insert last height into block headers XXX this does not deal with forks - bhsBatch.Put([]byte(bhsLastKey), lastRecord) + var header [80]byte + copy(header[:], bhs[len(bhs)-1][:]) + cbh := &tbcd.BlockHeader{ + Hash: bhash[:], + Height: height, + Header: header[:], + Difficulty: *cdiff, + } + lbh := cbh + + // XXX: Reason about needing to check fork flag. For now keep it here to + // distinguish between certain fork and maybe fork paths. + var it tbcd.InsertType + if fork { + // Insert last height into block headers if the new cumulative + // difficulty exceeds the prior difficulty. + switch cdiff.Cmp(&bestBH.Difficulty) { + case -1, 0: + // Extend fork, fork did not overcome difficulty + it = tbcd.ITForkExtend + + // XXX should we return old best block header here? + // That way the caller can do best vs previous best diff. 
+ log.Debugf("(%v) : %v <= %v", height, cdiff, bestBH.Difficulty) + cbh = bestBH + + case 1: + log.Debugf("(%v) 1: %v > %v", height, cdiff, bestBH.Difficulty) + // log.Infof("%v", spew.Sdump(bestBH.Hash[:])) + // log.Infof("%v", spew.Sdump(firstHash)) + // pick the right return value based on ancestor + bhsBatch.Put([]byte(bhsCanonicalTipKey), lastRecord) + it = tbcd.ITChainFork + + default: + panic("bug: impossible cmp value") + } + } else { + // Extend current best tip + bhsBatch.Put([]byte(bhsCanonicalTipKey), lastRecord) + it = tbcd.ITChainExtend + } // Write height hash batch - err = hhTx.Write(hhBatch, nil) - if err != nil { - return fmt.Errorf("height hash batch: %w", err) + if err = hhTx.Write(hhBatch, nil); err != nil { + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("height hash batch: %w", err) } // Write missing blocks batch - err = bmTx.Write(bmBatch, nil) - if err != nil { - return fmt.Errorf("blocks missing batch: %w", err) + if err = bmTx.Write(bmBatch, nil); err != nil { + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("blocks missing batch: %w", err) } // Write block headers batch - err = bhsTx.Write(bhsBatch, nil) - if err != nil { - return fmt.Errorf("block headers insert: %w", err) + if err = bhsTx.Write(bhsBatch, nil); err != nil { + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("block headers insert: %w", err) } // height hash commit - err = hhCommit() - if err != nil { - return fmt.Errorf("height hash commit: %w", err) + if err = hhCommit(); err != nil { + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("height hash commit: %w", err) } // blocks missing commit - err = bmCommit() - if err != nil { - return fmt.Errorf("blocks missing commit: %w", err) + if err = bmCommit(); err != nil { + return tbcd.ITInvalid, nil, nil, + fmt.Errorf("blocks missing commit: %w", err) } // block headers commit - err = bhsCommit() - if err != nil { - return fmt.Errorf("block headers commit: %w", err) + if err = bhsCommit(); err != nil { + return tbcd.ITInvalid, nil, nil, fmt.Errorf("block headers commit: %w", err) } - return nil + return it, cbh, lbh, nil } type cacheEntry struct { @@ -467,8 +690,7 @@ func (l *ldb) BlockInsert(ctx context.Context, b *tbcd.Block) (int64, error) { } return -1, fmt.Errorf("block insert block header: %w", err) } - // XXX only do the big endian decoding here!, less bcopy - bh = decodeBlockHeader(b.Hash, ebh) + bh = decodeBlockHeader(ebh) } else { bh = &tbcd.BlockHeader{ Height: ce.height, @@ -485,8 +707,7 @@ func (l *ldb) BlockInsert(ctx context.Context, b *tbcd.Block) (int64, error) { } if !has { // Insert block since we do not have it yet - err = bDB.Put(b.Hash, b.Block, nil) - if err != nil { + if err = bDB.Put(b.Hash, b.Block, nil); err != nil { return -1, fmt.Errorf("blocks insert put: %w", err) } } @@ -498,8 +719,7 @@ func (l *ldb) BlockInsert(ctx context.Context, b *tbcd.Block) (int64, error) { // Remove block identifier from blocks missing key := heightHashToKey(bh.Height, bh.Hash) bmDB := l.pool[level.BlocksMissingDB] - err = bmDB.Delete(key, nil) - if err != nil { + if err = bmDB.Delete(key, nil); err != nil { // Ignore not found if errors.Is(err, leveldb.ErrNotFound) { log.Errorf("block insert delete from missing: %v", err) @@ -699,14 +919,12 @@ func (l *ldb) BlockUtxoUpdate(ctx context.Context, utxos map[tbcd.Outpoint]tbcd. 
} // Write outputs batch - err = outsTx.Write(outsBatch, nil) - if err != nil { + if err = outsTx.Write(outsBatch, nil); err != nil { return fmt.Errorf("outputs insert: %w", err) } // outputs commit - err = outsCommit() - if err != nil { + if err = outsCommit(); err != nil { return fmt.Errorf("outputs commit: %w", err) } @@ -749,14 +967,12 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, txs map[tbcd.TxKey]*tbcd.TxValu } // Write transactions batch - err = txsTx.Write(txsBatch, nil) - if err != nil { + if err = txsTx.Write(txsBatch, nil); err != nil { return fmt.Errorf("transactions insert: %w", err) } // transactions commit - err = txsCommit() - if err != nil { + if err = txsCommit(); err != nil { return fmt.Errorf("transactions commit: %w", err) } diff --git a/database/tbcd/level/level_test.go b/database/tbcd/level/level_test.go index 083fcdf6..a71ab72a 100644 --- a/database/tbcd/level/level_test.go +++ b/database/tbcd/level/level_test.go @@ -6,13 +6,11 @@ package level import ( "bytes" - "context" "crypto/rand" "encoding/binary" "fmt" "io" - "os" - "reflect" + "math/big" "sort" "testing" @@ -20,9 +18,8 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" - "github.com/juju/loggo" + "github.com/go-test/deep" - "github.com/hemilabs/heminetwork/database" "github.com/hemilabs/heminetwork/database/tbcd" ) @@ -76,17 +73,20 @@ func TestEncodeDecodeBlockHeader(t *testing.T) { genesisBH := cp.GenesisBlock.Header genesisHash := cp.GenesisHash + randDiff := random(32) + difficulty := new(big.Int).SetBytes(randDiff) + bh := tbcd.BlockHeader{ - Hash: genesisHash[:], - Height: 0x1122334455667788, // we need not zero to test decoding of height - Header: h2b(&genesisBH), + Hash: genesisHash[:], + Height: 0x1122334455667788, // we need not zero to test decoding of height + Header: h2b(&genesisBH), + Difficulty: *difficulty, } - t.Logf("%v", spew.Sdump(bh)) - er := encodeBlockHeader(&bh) - dr := decodeBlockHeader(bh.Hash, er[:]) - if !reflect.DeepEqual(bh, *dr) { - t.Fatalf("encode decode block header wanted %v got %v", - spew.Sdump(bh), spew.Sdump(*dr)) + er := encodeBlockHeader(bh.Height, [80]byte(h2b(&genesisBH)), &bh.Difficulty) + dr := decodeBlockHeader(er[:]) + if diff := deep.Equal(bh, *dr); len(diff) > 0 { + t.Fatalf("unexpected diff: %v%v", spew.Sdump(bh), spew.Sdump(dr)) + t.Errorf("unexpected diff: %s", diff) } } @@ -126,11 +126,11 @@ func TestKeyOrder(t *testing.T) { hash := chainhash.DoubleHashH(b) keys[count-1-i] = heightHashToKey(i, hash[:]) } - log.Infof("%v", spew.Sdump(keys)) + log.Debugf("%v", spew.Sdump(keys)) // Now sort sort.Sort(keys) - log.Infof("%v", spew.Sdump(keys)) + log.Debugf("%v", spew.Sdump(keys)) for i := range count { height, hash := keyToHeightHash(keys[i]) @@ -147,143 +147,144 @@ func TestKeyOrder(t *testing.T) { } } -func TestLevelDB(t *testing.T) { - // Missing blocks - // 1 000 000 000 - - loggo.ConfigureLoggers("INFO") - - dir, err := os.MkdirTemp("", "leveldbtest") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(dir) - - ctx := context.Background() - ldb, err := New(ctx, dir) - if err != nil { - t.Fatal(err) - } - defer func() { - err := ldb.Close() - if err != nil { - t.Fatalf("close: %v", err) - } - }() - - // Create fake blockchain somewhat resembling tbc calls - - // Insert genesis - cp := &chaincfg.TestNet3Params - gbh, err := header2Bytes(&cp.GenesisBlock.Header) - if err != nil { - t.Fatal(err) - } - - // Insert genesis - tgbh := tbcd.BlockHeader{ - Height: 0, - Hash: 
cp.GenesisHash[:], - Header: gbh, - } - err = ldb.BlockHeadersInsert(ctx, []tbcd.BlockHeader{tgbh}) - if err != nil { - t.Fatalf("block headers insert: %v", err) - } - - missing, err := ldb.BlocksMissing(ctx, 16) - if err != nil { - t.Fatalf("block headers missing: %v", err) - } - - if len(missing) != 0 { - t.Fatal("genesis should not be returned") - } - - // Insert fake block headers - count := uint64(64) - bhs := make([]tbcd.BlockHeader, 0, count+1) - bhs = append(bhs, tgbh) // need genesis for prevhash - for i := uint64(1); i < count; i++ { - bits := uint32(i + 4567) - nonce := uint32(i + 1337) - prevHash, err := chainhash.NewHash(bhs[i-1].Hash[:]) - if err != nil { - t.Fatalf("prevhash %v", err) - } - merkleHash := chainhash.DoubleHashH(prevHash[:]) - wbh := wire.NewBlockHeader(1, prevHash, &merkleHash, bits, nonce) - blockHash := wbh.BlockHash() - t.Logf("height %v prev %v", i, prevHash) - bhs = append(bhs, tbcd.BlockHeader{ - Height: i, - Hash: database.ByteArray(blockHash[:]), - Header: h2b(wbh), - }) - } - t.Logf("%v", spew.Sdump(bhs)) - // Insert missing blocks - err = ldb.BlockHeadersInsert(ctx, bhs[1:]) // skip genesis insert - if err != nil { - t.Fatalf("block headers insert: %v", err) - } - - expectedMissingBH := 16 - missing, err = ldb.BlocksMissing(ctx, expectedMissingBH) - if err != nil { - t.Fatalf("block headers missing: %v", err) - } - t.Logf("%v", spew.Sdump(missing)) - - if len(missing) != min(expectedMissingBH, int(count-1)) { - t.Fatalf("%v %v %v", len(missing), expectedMissingBH, count) - } - - // Start at height 1 - height := uint64(1) - for k := range missing { - if height != bhs[height].Height { - t.Fatalf("unexpected internal height wanted %v got %v", - height, bhs[height].Height) - } - if bhs[height].Height != missing[k].Height { - t.Fatalf("unexpected missing height wanted %v got %v", - bhs[height].Height, missing[k].Height) - } - if !bytes.Equal(bhs[height].Hash, missing[k].Hash) { - t.Fatalf("unexpected missing hash wanted %v got %v", - bhs[height].Hash, missing[k].Hash) - } - - height++ - } - - // Insert missing blocks - for i := uint64(1); i < count; i++ { - b := tbcd.Block{ - Hash: bhs[i].Hash, - Block: []byte{'i', 'a', 'm', 'b', 'l', 'o', 'c', 'k'}, - } - insertedHeight, err := ldb.BlockInsert(ctx, &b) - if err != nil { - t.Fatal(err) - } - log.Infof("inserted height: %v", insertedHeight) - } - - // Ensure blocks missing table is updated - missing, err = ldb.BlocksMissing(ctx, expectedMissingBH) - if err != nil { - t.Fatalf("block headers missing: %v", err) - } - if len(missing) != 0 { - t.Fatalf("expected missing table to be empty: %v", spew.Sdump(missing)) - } - if len(ldb.blocksMissingCache) != 0 { - t.Fatalf("expected missing blocks cache to be empty: %v", - spew.Sdump(ldb.blocksMissingCache)) - } -} +//func TestLevelDB(t *testing.T) { +// // Missing blocks +// // 1 000 000 000 +// +// loggo.ConfigureLoggers("INFO") +// +// dir, err := os.MkdirTemp("", "leveldbtest") +// if err != nil { +// t.Fatal(err) +// } +// defer os.RemoveAll(dir) +// +// ctx := context.Background() +// ldb, err := New(ctx, dir) +// if err != nil { +// t.Fatal(err) +// } +// defer func() { +// err := ldb.Close() +// if err != nil { +// t.Fatalf("close: %v", err) +// } +// }() +// +// // Create fake blockchain somewhat resembling tbc calls +// +// // Insert genesis +// cp := &chaincfg.TestNet3Params +// gbh, err := header2Bytes(&cp.GenesisBlock.Header) +// if err != nil { +// t.Fatal(err) +// } +// +// // Insert genesis +// _, err = ldb.BlockHeadersInsert(ctx, [][80]byte{ +// 
h2b80(&cp.GenesisBlock.Header), +// }, +// ) +// if err != nil { +// t.Fatalf("block headers insert: %v", err) +// } +// +// missing, err := ldb.BlocksMissing(ctx, 16) +// if err != nil { +// t.Fatalf("block headers missing: %v", err) +// } +// +// if len(missing) != 0 { +// t.Fatal("genesis should not be returned") +// } +// +// // Insert fake block headers +// count := uint64(64) +// bhs := make([][80]byte, 0, count+1) +// bhs = append(bhs, h2b80(&cp.GenesisBlock.Header)) // need genesis for prevhash +// for i := uint64(1); i < count; i++ { +// bits := uint32(i + 4567) +// nonce := uint32(i + 1337) +// // XXX decode and get hash from previous block +// prevHash, err := chainhash.NewHash(bhs[i-1].Hash[:]) +// if err != nil { +// t.Fatalf("prevhash %v", err) +// } +// merkleHash := chainhash.DoubleHashH(prevHash[:]) +// wbh := wire.NewBlockHeader(1, prevHash, &merkleHash, bits, nonce) +// blockHash := wbh.BlockHash() +// t.Logf("height %v prev %v", i, prevHash) +// bhs = append(bhs, h2b80(wbh)) +// //bhs = append(bhs, tbcd.BlockHeader{ +// // Height: i, +// // Hash: database.ByteArray(blockHash[:]), +// // Header: h2b(wbh), +// // // XXX set cumulative difficulty to verify +// //}) +// } +// t.Logf("%v", spew.Sdump(bhs)) +// // Insert missing blocks +// _, err = ldb.BlockHeadersInsert(ctx, bhs[1:]) // skip genesis insert +// if err != nil { +// t.Fatalf("block headers insert: %v", err) +// } +// +// expectedMissingBH := 16 +// missing, err = ldb.BlocksMissing(ctx, expectedMissingBH) +// if err != nil { +// t.Fatalf("block headers missing: %v", err) +// } +// t.Logf("%v", spew.Sdump(missing)) +// +// if len(missing) != min(expectedMissingBH, int(count-1)) { +// t.Fatalf("%v %v %v", len(missing), expectedMissingBH, count) +// } +// +// // Start at height 1 +// height := uint64(1) +// for k := range missing { +// if height != bhs[height].Height { +// t.Fatalf("unexpected internal height wanted %v got %v", +// height, bhs[height].Height) +// } +// if bhs[height].Height != missing[k].Height { +// t.Fatalf("unexpected missing height wanted %v got %v", +// bhs[height].Height, missing[k].Height) +// } +// if !bytes.Equal(bhs[height].Hash, missing[k].Hash) { +// t.Fatalf("unexpected missing hash wanted %v got %v", +// bhs[height].Hash, missing[k].Hash) +// } +// +// height++ +// } +// +// // Insert missing blocks +// for i := uint64(1); i < count; i++ { +// b := tbcd.Block{ +// Hash: bhs[i].Hash, +// Block: []byte{'i', 'a', 'm', 'b', 'l', 'o', 'c', 'k'}, +// } +// insertedHeight, err := ldb.BlockInsert(ctx, &b) +// if err != nil { +// t.Fatal(err) +// } +// log.Infof("inserted height: %v", insertedHeight) +// } +// +// // Ensure blocks missing table is updated +// missing, err = ldb.BlocksMissing(ctx, expectedMissingBH) +// if err != nil { +// t.Fatalf("block headers missing: %v", err) +// } +// if len(missing) != 0 { +// t.Fatalf("expected missing table to be empty: %v", spew.Sdump(missing)) +// } +// if len(ldb.blocksMissingCache) != 0 { +// t.Fatalf("expected missing blocks cache to be empty: %v", +// spew.Sdump(ldb.blocksMissingCache)) +// } +//} // func TestBitcoinBits(t *testing.T) { // // Decode block diff --git a/go.mod b/go.mod index 2ed3ca46..40a10d7b 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22.2 require ( github.com/btcsuite/btcd v0.24.0 + github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/btcsuite/btcd/btcutil v1.1.5 github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 github.com/davecgh/go-spew v1.1.1 @@ -31,7 +32,6 @@ require ( github.com/Microsoft/go-winio v0.6.1 // 
indirect github.com/Microsoft/hcsshim v0.11.4 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index 08d9652b..59dfff1e 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -55,6 +55,7 @@ func processUtxos(cp *chaincfg.Params, txs []*btcutil.Tx, utxos map[tbcd.Outpoin op := tbcd.NewOutpoint(txIn.PreviousOutPoint.Hash, txIn.PreviousOutPoint.Index) if utxo, ok := utxos[op]; ok && !utxo.IsDelete() { + // log.Infof("deleting utxo %s value %d", hex.EncodeToString(utxo.ScriptHashSlice()), utxo.Value()) delete(utxos, op) continue } @@ -63,6 +64,10 @@ func processUtxos(cp *chaincfg.Params, txs []*btcutil.Tx, utxos map[tbcd.Outpoin if txscript.IsUnspendable(txOut.PkScript) { continue } + + // scriptHash := sha256.Sum256(txOut.PkScript) + // log.Infof("adding utxo to script hash %s value %d", hex.EncodeToString(scriptHash[:]), uint64(txOut.Value)) + utxos[tbcd.NewOutpoint(*tx.Hash(), uint32(outIndex))] = tbcd.NewCacheOutput( sha256.Sum256(txOut.PkScript), uint64(txOut.Value), @@ -152,12 +157,12 @@ func (s *Server) indexUtxosInBlocks(ctx context.Context, startHeight, maxHeight // fixupCache is executed in parallel meaning that the utxos // map must be locked as it is being processed. - err = s.fixupCache(ctx, b, utxos) - if err != nil { + if err = s.fixupCache(ctx, b, utxos); err != nil { return 0, fmt.Errorf("parse block %v: %w", height, err) } // At this point we can lockless since it is all single // threaded again. + // log.Infof("processing utxo at height %d", height) err = processUtxos(s.chainParams, b.Transactions(), utxos) if err != nil { return 0, fmt.Errorf("process utxos %v: %w", height, err) @@ -215,12 +220,11 @@ func (s *Server) UtxoIndexer(ctx context.Context, height, count uint64) error { } utxosCached := len(utxos) log.Infof("Utxo indexer blocks processed %v in %v utxos cached %v cache unused %v avg tx/blk %v", - blocksProcessed, time.Now().Sub(start), utxosCached, + blocksProcessed, time.Since(start), utxosCached, s.cfg.MaxCachedTxs-utxosCached, utxosCached/blocksProcessed) start = time.Now() - err = s.db.BlockUtxoUpdate(ctx, utxos) - if err != nil { + if err = s.db.BlockUtxoUpdate(ctx, utxos); err != nil { return fmt.Errorf("block tx update: %w", err) } // leveldb does all kinds of allocations, force GC to lower @@ -229,7 +233,7 @@ func (s *Server) UtxoIndexer(ctx context.Context, height, count uint64) error { runtime.GC() log.Infof("Flushing utxos complete %v took %v", - utxosCached, time.Now().Sub(start)) + utxosCached, time.Since(start)) height += uint64(blocksProcessed) @@ -364,12 +368,11 @@ func (s *Server) TxIndexer(ctx context.Context, height, count uint64) error { } txsCached := len(txs) log.Infof("Tx indexer blocks processed %v in %v transactions cached %v cache unused %v avg tx/blk %v", - blocksProcessed, time.Now().Sub(start), txsCached, + blocksProcessed, time.Since(start), txsCached, s.cfg.MaxCachedTxs-txsCached, txsCached/blocksProcessed) start = time.Now() - err = s.db.BlockTxUpdate(ctx, txs) - if err != nil { + if err = s.db.BlockTxUpdate(ctx, txs); err != nil { return fmt.Errorf("block tx update: %w", err) } // leveldb does all kinds of allocations, force GC to lower @@ -378,7 +381,7 @@ func (s *Server) TxIndexer(ctx context.Context, height, count uint64) error { 
runtime.GC() log.Infof("Flushing txs complete %v took %v", - txsCached, time.Now().Sub(start)) + txsCached, time.Since(start)) height += uint64(blocksProcessed) @@ -400,14 +403,54 @@ func (s *Server) TxIndexer(ctx context.Context, height, count uint64) error { } } -// SyncIndexersToHeight tries to move the various indexers to the suplied +// SyncIndexersToHeight tries to move the various indexers to the supplied // height (inclusive). func (s *Server) SyncIndexersToHeight(ctx context.Context, height uint64) error { log.Tracef("SyncIndexersToHeight") defer log.Tracef("SyncIndexersToHeight exit") + s.mtx.Lock() + if s.indexing { + s.mtx.Unlock() + return fmt.Errorf("already indexing") + } + s.indexing = true + s.mtx.Unlock() + + defer func() { + // unquiesce + s.mtx.Lock() + s.quiesced = false + s.indexing = false + // s.clipped = false + actualHeight, bhb, err := s.RawBlockHeaderBest(ctx) + if err != nil { + log.Errorf("sync indexers best: %v", err) + s.mtx.Unlock() + return + } + // get a random peer + p, err := s.randomPeer(ctx) + if err != nil { + log.Errorf("sync indexers random peer: %v", err) + s.mtx.Unlock() + return + } + s.mtx.Unlock() + + // XXX explain why we need to get more headers here + // continue getting headers, XXX this does not belong here either + // XXX if bh download fails we will get jammed. We need a queued "must execute this command" added to peer/service. + log.Infof("resuming block header download at: %v", actualHeight) + if err = s.getHeaders(ctx, p, bhb); err != nil { + log.Errorf("sync indexers: %v", err) + return + } + }() + + log.Debugf("Syncing to: %v", height) // Outputs index - uhBE, err := s.db.MetadataGet(ctx, UtxoIndexHeightKey) + uhBE, err := s.db.MetadataGet(ctx, UtxoIndexHeightKey) // XXX this must be hash based if err != nil { if !errors.Is(err, database.ErrNotFound) { return fmt.Errorf("utxo indexer metadata get: %w", err) @@ -424,7 +467,7 @@ func (s *Server) SyncIndexersToHeight(ctx context.Context, height uint64) error } // Transactions index - thBE, err := s.db.MetadataGet(ctx, TxIndexHeightKey) + thBE, err := s.db.MetadataGet(ctx, TxIndexHeightKey) // XXX this must be hash based if err != nil { if !errors.Is(err, database.ErrNotFound) { return fmt.Errorf("tx indexer metadata get: %w", err) @@ -439,6 +482,7 @@ func (s *Server) SyncIndexersToHeight(ctx context.Context, height uint64) error return fmt.Errorf("tx indexer: %w", err) } } + log.Debugf("Done syncing to: %v", height) return nil } diff --git a/service/tbc/rpc.go b/service/tbc/rpc.go index 46e86b76..469073d0 100644 --- a/service/tbc/rpc.go +++ b/service/tbc/rpc.go @@ -75,17 +75,17 @@ func (s *Server) handleWebsocketRead(ctx context.Context, ws *tbcWs) { } go s.handleRequest(ctx, ws, id, cmd, handler) - case tbcapi.CmdBlockHeadersBestRawRequest: + case tbcapi.CmdBlockHeaderBestRawRequest: handler := func(ctx context.Context) (any, error) { - req := payload.(*tbcapi.BlockHeadersBestRawRequest) - return s.handleBlockHeadersBestRawRequest(ctx, req) + req := payload.(*tbcapi.BlockHeaderBestRawRequest) + return s.handleBlockHeaderBestRawRequest(ctx, req) } go s.handleRequest(ctx, ws, id, cmd, handler) - case tbcapi.CmdBlockHeadersBestRequest: + case tbcapi.CmdBlockHeaderBestRequest: handler := func(ctx context.Context) (any, error) { - req := payload.(*tbcapi.BlockHeadersBestRequest) - return s.handleBlockHeadersBestRequest(ctx, req) + req := payload.(*tbcapi.BlockHeaderBestRequest) + return s.handleBlockHeaderBestRequest(ctx, req) } go s.handleRequest(ctx, ws, id, cmd, handler) @@ -240,39 
+240,39 @@ func (s *Server) handleBlockHeadersByHeightRawRequest(ctx context.Context, req * }, nil } -func (s *Server) handleBlockHeadersBestRawRequest(ctx context.Context, _ *tbcapi.BlockHeadersBestRawRequest) (any, error) { - log.Tracef("handleBlockHeadersBestRawRequest") - defer log.Tracef("handleBlockHeadersBestRawRequest exit") +func (s *Server) handleBlockHeaderBestRawRequest(ctx context.Context, _ *tbcapi.BlockHeaderBestRawRequest) (any, error) { + log.Tracef("handleBlockHeaderBestRawRequest") + defer log.Tracef("handleBlockHeaderBestRawRequest exit") - height, blockHeaders, err := s.RawBlockHeadersBest(ctx) + height, blockHeader, err := s.RawBlockHeaderBest(ctx) if err != nil { e := protocol.NewInternalError(err) - return &tbcapi.BlockHeadersBestRawResponse{ + return &tbcapi.BlockHeaderBestRawResponse{ Error: e.ProtocolError(), }, e } - return &tbcapi.BlockHeadersBestRawResponse{ - Height: height, - BlockHeaders: blockHeaders, + return &tbcapi.BlockHeaderBestRawResponse{ + Height: height, + BlockHeader: blockHeader, }, nil } -func (s *Server) handleBlockHeadersBestRequest(ctx context.Context, _ *tbcapi.BlockHeadersBestRequest) (any, error) { - log.Tracef("handleBlockHeadersBestRequest") - defer log.Tracef("handleBlockHeadersBestRequest exit") +func (s *Server) handleBlockHeaderBestRequest(ctx context.Context, _ *tbcapi.BlockHeaderBestRequest) (any, error) { + log.Tracef("handleBlockHeaderBestRequest") + defer log.Tracef("handleBlockHeaderBestRequest exit") - height, blockHeaders, err := s.BlockHeadersBest(ctx) + height, blockHeader, err := s.BlockHeaderBest(ctx) if err != nil { e := protocol.NewInternalError(err) - return &tbcapi.BlockHeadersBestResponse{ + return &tbcapi.BlockHeaderBestResponse{ Error: e.ProtocolError(), }, e } - return &tbcapi.BlockHeadersBestResponse{ - Height: height, - BlockHeaders: wireBlockHeadersToTBC(blockHeaders), + return &tbcapi.BlockHeaderBestResponse{ + Height: height, + BlockHeader: wireBlockHeaderToTBC(blockHeader), }, nil } @@ -510,21 +510,25 @@ func randHexId(length int) (string, error) { return hex.EncodeToString(b), nil } -func wireBlockHeadersToTBC(w []*wire.BlockHeader) []*tbcapi.BlockHeader { - blockHeaders := make([]*tbcapi.BlockHeader, len(w)) - for i, bh := range w { - blockHeaders[i] = &tbcapi.BlockHeader{ - Version: bh.Version, - PrevHash: reverseBytes(bh.PrevBlock[:]), - MerkleRoot: reverseBytes(bh.MerkleRoot[:]), - Timestamp: bh.Timestamp.Unix(), - Bits: fmt.Sprintf("%x", bh.Bits), - Nonce: bh.Nonce, - } +func wireBlockHeadersToTBC(bhs []*wire.BlockHeader) []*tbcapi.BlockHeader { + blockHeaders := make([]*tbcapi.BlockHeader, len(bhs)) + for i, bh := range bhs { + blockHeaders[i] = wireBlockHeaderToTBC(bh) } return blockHeaders } +func wireBlockHeaderToTBC(bh *wire.BlockHeader) *tbcapi.BlockHeader { + return &tbcapi.BlockHeader{ + Version: bh.Version, + PrevHash: reverseBytes(bh.PrevBlock[:]), + MerkleRoot: reverseBytes(bh.MerkleRoot[:]), + Timestamp: bh.Timestamp.Unix(), + Bits: fmt.Sprintf("%x", bh.Bits), + Nonce: bh.Nonce, + } +} + func wireTxToTBC(w *wire.MsgTx) *tbcapi.Tx { tx := &tbcapi.Tx{ Version: w.Version, diff --git a/service/tbc/rpc_test.go b/service/tbc/rpc_test.go index a63f189b..18eb5826 100644 --- a/service/tbc/rpc_test.go +++ b/service/tbc/rpc_test.go @@ -23,6 +23,7 @@ import ( "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" + "github.com/hemilabs/heminetwork/api" "github.com/hemilabs/heminetwork/api/protocol" "github.com/hemilabs/heminetwork/api/tbcapi" "github.com/hemilabs/heminetwork/bitcoin" @@ -259,7 +260,7 @@ 
func TestBlockHeadersByHeightDoesNotExist(t *testing.T) { } } -func TestBlockHeadersBestRaw(t *testing.T) { +func TestBlockHeaderBestRaw(t *testing.T) { skipIfNoDocker(t) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) @@ -287,7 +288,7 @@ func TestBlockHeadersBestRaw(t *testing.T) { } var lastErr error - var response tbcapi.BlockHeadersBestRawResponse + var response tbcapi.BlockHeaderBestRawResponse for { select { case <-time.After(1 * time.Second): @@ -295,7 +296,7 @@ func TestBlockHeadersBestRaw(t *testing.T) { t.Fatal(ctx.Err()) } lastErr = nil - err = tbcapi.Write(ctx, tws.conn, "someid", tbcapi.BlockHeadersBestRawRequest{}) + err = tbcapi.Write(ctx, tws.conn, "someid", tbcapi.BlockHeaderBestRawRequest{}) if err != nil { lastErr = err continue @@ -308,7 +309,7 @@ func TestBlockHeadersBestRaw(t *testing.T) { continue } - if v.Header.Command == tbcapi.CmdBlockHeadersBestRawResponse { + if v.Header.Command == tbcapi.CmdBlockHeaderBestRawResponse { if err := json.Unmarshal(v.Payload, &response); err != nil { t.Fatal(err) } @@ -322,7 +323,7 @@ func TestBlockHeadersBestRaw(t *testing.T) { t.Fatal(lastErr) } - bh, err := bytes2Header(response.BlockHeaders[0]) + bh, err := bytes2Header(response.BlockHeader) if err != nil { t.Fatal(err) } @@ -335,12 +336,12 @@ func TestBlockHeadersBestRaw(t *testing.T) { cliBlockHeader := bitcoindBestBlock(ctx, t, bitcoindContainer) expected := cliBlockHeaderToRaw(t, cliBlockHeader) - if diff := deep.Equal(expected, response.BlockHeaders); len(diff) > 0 { + if diff := deep.Equal(expected, []api.ByteSlice{response.BlockHeader}); len(diff) > 0 { t.Errorf("unexpected diff: %s", diff) } } -func TestBtcBlockHeadersBest(t *testing.T) { +func TestBtcBlockHeaderBest(t *testing.T) { skipIfNoDocker(t) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) @@ -368,7 +369,7 @@ func TestBtcBlockHeadersBest(t *testing.T) { } var lastErr error - var response tbcapi.BlockHeadersBestResponse + var response tbcapi.BlockHeaderBestResponse for { select { case <-time.After(1 * time.Second): @@ -376,7 +377,7 @@ func TestBtcBlockHeadersBest(t *testing.T) { t.Fatal(ctx.Err()) } lastErr = nil - err = tbcapi.Write(ctx, tws.conn, "someid", tbcapi.BlockHeadersBestRequest{}) + err = tbcapi.Write(ctx, tws.conn, "someid", tbcapi.BlockHeaderBestRequest{}) if err != nil { lastErr = err continue @@ -389,7 +390,7 @@ func TestBtcBlockHeadersBest(t *testing.T) { continue } - if v.Header.Command == tbcapi.CmdBlockHeadersBestResponse { + if v.Header.Command == tbcapi.CmdBlockHeaderBestResponse { if err := json.Unmarshal(v.Payload, &response); err != nil { t.Fatal(err) } @@ -410,7 +411,7 @@ func TestBtcBlockHeadersBest(t *testing.T) { cliBlockHeader := bitcoindBestBlock(ctx, t, bitcoindContainer) expected := cliBlockHeaderToTBC(t, cliBlockHeader) - if diff := deep.Equal(expected, response.BlockHeaders); len(diff) > 0 { + if diff := deep.Equal(expected, []*tbcapi.BlockHeader{response.BlockHeader}); len(diff) > 0 { t.Errorf("unexpected diff: %s", diff) } } diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 4d2bb65a..359bc771 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -12,6 +12,7 @@ import ( "encoding/hex" "errors" "fmt" + "math/big" "math/rand/v2" "net" "net/http" @@ -105,7 +106,7 @@ func bytes2Tx(b []byte) (*wire.MsgTx, error) { return &w, nil } -func header2Bytes(wbh *wire.BlockHeader) ([]byte, error) { +func header2Slice(wbh *wire.BlockHeader) ([]byte, error) { var b bytes.Buffer err := wbh.Serialize(&b) if err != nil { @@ -114,8 
+115,16 @@ func header2Bytes(wbh *wire.BlockHeader) ([]byte, error) { return b.Bytes(), nil } +func header2Array(wbh *wire.BlockHeader) ([80]byte, error) { + sb, err := header2Slice(wbh) + if err != nil { + return [80]byte{}, err + } + return [80]byte(sb), nil +} + func h2b(wbh *wire.BlockHeader) []byte { - hb, err := header2Bytes(wbh) + hb, err := header2Slice(wbh) if err != nil { panic(err) } @@ -131,29 +140,11 @@ func bytes2Header(header []byte) (*wire.BlockHeader, error) { return &bh, nil } -func headerTime(header []byte) *time.Time { - h, err := bytes2Header(header) - if err != nil { - return nil - } - return &h.Timestamp -} - -func hashEqual(h1 chainhash.Hash, h2 chainhash.Hash) bool { - // Fuck you chainhash package - return h1.IsEqual(&h2) -} - func sliceChainHash(ch chainhash.Hash) []byte { // Fuck you chainhash package return ch[:] } -type blockPeer struct { - expire time.Time // when does this command expire - peer string // who was handling it -} - type Config struct { AutoIndex bool BlockSanity bool @@ -162,6 +153,7 @@ type Config struct { LogLevel string MaxCachedTxs int Network string + PeersWanted int PrometheusListenAddress string PprofListenAddress string } @@ -171,6 +163,7 @@ func NewDefaultConfig() *Config { ListenAddress: tbcapi.DefaultListen, LogLevel: logLevel, MaxCachedTxs: defaultMaxCachedTxs, + PeersWanted: defaultPeersWanted, } } @@ -197,12 +190,12 @@ type Server struct { blocks *ttl.TTL // outstanding block downloads [hash]when/where pings *ttl.TTL // outstanding pings - // IBD hints - lastBlockHeader tbcd.BlockHeader - // reentrancy flags for the indexers - utxoIndexerRunning bool - txIndexerRunning bool + // utxoIndexerRunning bool + // txIndexerRunning bool + quiesced bool // when set do not accept blockheaders and ot blocks. + // clipped bool // XXX kill including all surrounding code, this is for test only + indexing bool // prevent re-entrant indexing db tbcd.Database @@ -223,7 +216,7 @@ func NewServer(cfg *Config) (*Server, error) { if cfg == nil { cfg = NewDefaultConfig() } - pings, err := ttl.New(defaultPeersWanted, true) + pings, err := ttl.New(cfg.PeersWanted, true) if err != nil { return nil, err } @@ -236,7 +229,7 @@ func NewServer(cfg *Config) (*Server, error) { cfg: cfg, printTime: time.Now().Add(10 * time.Second), blocks: blocks, - peers: make(map[string]*peer, defaultPeersWanted), + peers: make(map[string]*peer, cfg.PeersWanted), pings: pings, blocksInserted: make(map[string]struct{}, 8192), // stats XXX rmeove? 
timeSource: blockchain.NewMedianTime(), @@ -286,8 +279,7 @@ func (s *Server) getHeaders(ctx context.Context, p *peer, lastHeaderHash []byte) hash := bh.BlockHash() ghs := wire.NewMsgGetHeaders() ghs.AddBlockLocatorHash(&hash) - err = p.write(defaultCmdTimeout, ghs) - if err != nil { + if err = p.write(defaultCmdTimeout, ghs); err != nil { return fmt.Errorf("write get headers: %w", err) } @@ -392,7 +384,7 @@ func (s *Server) peerManager(ctx context.Context) error { defer log.Tracef("peerManager exit") // Channel for peering signals - peersWanted := defaultPeersWanted + peersWanted := s.cfg.PeersWanted peerC := make(chan string, peersWanted) log.Infof("Peer manager connecting to %v peers", peersWanted) @@ -600,8 +592,7 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { log.Errorf("split host port: %v", err) return } - err = s.db.PeerDelete(ctx, host, port) - if err != nil { + if err = s.db.PeerDelete(ctx, host, port); err != nil { log.Errorf("peer delete (%v): %v", pp, err) } else { log.Debugf("Peer delete: %v", pp) @@ -611,8 +602,7 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { return } defer func() { - err := p.close() - if err != nil && !errors.Is(err, net.ErrClosed) { + if err := p.close(); err != nil && !errors.Is(err, net.ErrClosed) { log.Errorf("peer disconnect: %v %v", p, err) } }() @@ -629,7 +619,7 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { // multiple answers come in the insert of the headers fails or // succeeds. If it fails no more headers will be requested from that // peer. - bhs, err := s.db.BlockHeadersBest(ctx) + bhb, err := s.db.BlockHeaderBest(ctx) if err != nil { log.Errorf("block headers best: %v", err) // database is closed, nothing we can do, return here to avoid below @@ -638,21 +628,14 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { return } } - if len(bhs) != 1 { - // XXX fix multiple tips - panic(len(bhs)) - } - log.Debugf("block header best hash: %s", bhs[0].Hash) + log.Debugf("block header best hash: %s", bhb.Hash) - err = s.getHeaders(ctx, p, bhs[0].Header) - if err != nil { + if err = s.getHeaders(ctx, p, bhb.Header); err != nil { // This should not happen log.Errorf("get headers: %v", err) return } - // XXX kickstart block download, should happen in getHeaders - verbose := false for { // See if we were interrupted, for the love of pete add ctx to wire @@ -672,33 +655,43 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { } if verbose { - spew.Dump(msg) + spew.Sdump(msg) } - // XXX send wire message to pool reader + // Commands that are always accepted. switch m := msg.(type) { case *wire.MsgAddr: go s.handleAddr(ctx, p, m) + continue case *wire.MsgAddrV2: go s.handleAddrV2(ctx, p, m) + continue - case *wire.MsgBlock: - go s.handleBlock(ctx, p, m) + case *wire.MsgPing: + go s.handlePing(ctx, p, m) + continue - case *wire.MsgFeeFilter: - // XXX shut up + case *wire.MsgPong: + go s.handlePong(ctx, p, m) + continue + } - case *wire.MsgInv: - go s.handleInv(ctx, p, m) + // When quiesced do not handle other p2p commands. 
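+		// While quiesced (set in syncBlocks before the indexers are
+		// kicked off), header and block messages are dropped so that
+		// indexing runs against a stable set of block headers.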
+ s.mtx.Lock() + quiesced := s.quiesced + s.mtx.Unlock() + if quiesced { + continue + } + switch m := msg.(type) { case *wire.MsgHeaders: go s.handleHeaders(ctx, p, m) - case *wire.MsgPing: - go s.handlePing(ctx, p, m) - case *wire.MsgPong: - go s.handlePong(ctx, p, m) + case *wire.MsgBlock: + go s.handleBlock(ctx, p, m) + default: log.Tracef("unhandled message type %v: %T\n", p, msg) } @@ -768,7 +761,7 @@ func (s *Server) handleAddr(ctx context.Context, p *peer, msg *wire.MsgAddr) { } err := s.db.PeersInsert(ctx, peers) // Don't log insert 0, its a dup. - if err != nil && !database.ErrZeroRows.Is(err) { + if err != nil && !errors.Is(err, database.ErrZeroRows) { log.Errorf("%v", err) } } @@ -786,7 +779,7 @@ func (s *Server) handleAddrV2(ctx context.Context, p *peer, msg *wire.MsgAddrV2) } err := s.db.PeersInsert(ctx, peers) // Don't log insert 0, its a dup. - if err != nil && !database.ErrZeroRows.Is(err) { + if err != nil && !errors.Is(err, database.ErrZeroRows) { log.Errorf("%v", err) } } @@ -813,170 +806,6 @@ func (s *Server) handlePong(ctx context.Context, p *peer, pong *wire.MsgPong) { log.Tracef("handlePong %v: pong %v", p.address, pong.Nonce) } -func (s *Server) handleInv(ctx context.Context, p *peer, msg *wire.MsgInv) { - log.Tracef("handleInv (%v)", p) - defer log.Tracef("handleInv exit (%v)", p) - - var bis []tbcd.BlockIdentifier - for k := range msg.InvList { - switch msg.InvList[k].Type { - case wire.InvTypeBlock: - - // XXX height is missing here, looks right but assert - // that this isn't broken. - log.Infof("handleInv: block %v", msg.InvList[k].Hash) - - bis = append(bis, tbcd.BlockIdentifier{ - Hash: msg.InvList[k].Hash[:], // fake out - }) - log.Infof("handleInv: block %v", msg.InvList[k].Hash) - case wire.InvTypeTx: - // XXX silence mempool for now - return - default: - log.Infof("handleInv: skipping inv type %v", msg.InvList[k].Type) - return - } - } - - // XXX This happens during block header download, we should not react - // Probably move into the invtype switch - log.Infof("download blocks if we like them") - // if len(bis) > 0 { - // s.mtx.Lock() - // defer s.mtx.Unlock() - // err := s.downloadBlocks(ctx, bis) - // if err != nil { - // log.Errorf("download blocks: %v", err) - // return - // } - // } -} - -func (s *Server) txIndexer(ctx context.Context) { - log.Tracef("txIndexer") - defer log.Tracef("txIndexer exit") - - if !s.cfg.AutoIndex { - return - } - - // only one txIndexer may run at any given time - s.mtx.Lock() - if s.txIndexerRunning { - s.mtx.Unlock() - return - } - s.txIndexerRunning = true - s.mtx.Unlock() - - // mark txIndexer not running on exit - defer func() { - s.mtx.Lock() - s.txIndexerRunning = false - s.mtx.Unlock() - }() - - if s.blocksMissing(ctx) { - return - } - - // Get height from db - he, err := s.db.MetadataGet(ctx, TxIndexHeightKey) - if err != nil { - if !errors.Is(err, database.ErrNotFound) { - log.Errorf("tx indexer metadata get: %v", err) - return - } - he = make([]byte, 8) - } - h := binary.BigEndian.Uint64(he) - - // Skip txIndexer if we are at best block height. This is a bit racy. 
- bhs, err := s.db.BlockHeadersBest(ctx) - if err != nil { - log.Errorf("utxo indexer block headers best: %v", err) - return - } - if len(bhs) != 1 { - log.Errorf("utxo indexer block headers best: unsuported fork") - return - } - - if bhs[0].Height != h-1 { - err = s.TxIndexer(ctx, h, 0) - if err != nil { - log.Errorf("tx indexer: %v", err) - return - } - } -} - -func (s *Server) utxoIndexer(ctx context.Context) { - log.Tracef("utxoIndexer") - defer log.Tracef("utxoIndexer exit") - - if !s.cfg.AutoIndex { - return - } - - // only one utxoIndexer may run at any given time - s.mtx.Lock() - if s.utxoIndexerRunning { - s.mtx.Unlock() - return - } - s.utxoIndexerRunning = true - s.mtx.Unlock() - - // mark utxoIndexer not running on exit - defer func() { - s.mtx.Lock() - s.utxoIndexerRunning = false - s.mtx.Unlock() - }() - - // exit if we aren't synced - if s.blocksMissing(ctx) { - return - } - - // Index all utxos - - // Get height from db - he, err := s.db.MetadataGet(ctx, UtxoIndexHeightKey) - if err != nil { - if !errors.Is(err, database.ErrNotFound) { - log.Errorf("utxo indexer metadata get: %v", err) - return - } - he = make([]byte, 8) - } - h := binary.BigEndian.Uint64(he) - - // Skip UtxoIndex if we are at best block height. This is a bit racy. - bhs, err := s.db.BlockHeadersBest(ctx) - if err != nil { - log.Errorf("utxo indexer block headers best: %v", err) - return - } - if len(bhs) != 1 { - log.Errorf("utxo indexer block headers best: unsuported fork") - return - } - - if bhs[0].Height != h-1 { - err = s.UtxoIndexer(ctx, h, 0) - if err != nil { - log.Errorf("utxo indexer: %v", err) - return - } - } - - // When utxo sync completes kick off tx sync - go s.txIndexer(ctx) -} - func (s *Server) downloadBlock(ctx context.Context, p *peer, ch *chainhash.Hash) { log.Tracef("downloadBlock") defer log.Tracef("downloadBlock exit") @@ -1035,7 +864,8 @@ func (s *Server) syncBlocks(ctx context.Context) { log.Tracef("syncBlocks") defer log.Tracef("syncBlocks exit") - // Prevent race condition with 'want', which may cause the cache capacity to be exceeded. + // Prevent race condition with 'want', which may cause the cache + // capacity to be exceeded. s.mtx.Lock() defer s.mtx.Unlock() @@ -1049,6 +879,37 @@ func (s *Server) syncBlocks(ctx context.Context) { return } + if len(bm) == 0 { + // We can avoid quiescing by verifying if we are already done + // indexing. + if si := s.synced(ctx); si.Synced { + log.Tracef("already synced at %v", si.BlockHeaderHeight) + return + } + + // Exit if AutoIndex isn't enabled. + if !s.cfg.AutoIndex { + return + } + + bhb, err := s.db.BlockHeaderBest(ctx) + if err != nil { + log.Errorf("sync blocks best block header: %v", err) + return + } + s.quiesced = true // XXX if it's set and we exit with an error, what should we do?? 
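+		// quiesced remains set while SyncIndexersToHeight runs in the
+		// background; during that time the p2p loop in peerConnect
+		// skips incoming header and block messages.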
+ go func() { + // we really want to push the indexing reentrancy into this call + log.Infof("quiescing p2p and indexing to: %v", bhb.Height) + // XXX make hash + if err = s.SyncIndexersToHeight(ctx, bhb.Height+1); err != nil { + log.Errorf("sync blocks: %v", err) + return + } + }() + return + } + for k := range bm { bi := bm[k] hash, _ := chainhash.NewHash(bi.Hash[:]) @@ -1069,18 +930,18 @@ func (s *Server) syncBlocks(ctx context.Context) { s.blockExpired, nil) go s.downloadBlock(ctx, rp, hash) } - - if len(bm) == 0 { - // if we are complete we need to kick off utxo sync - go s.utxoIndexer(ctx) - } } func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeaders) { - log.Tracef("handleHeaders %v", p) - defer log.Tracef("handleHeaders exit %v", p) + log.Tracef("handleHeaders (%v): %v", p, len(msg.Headers)) + defer log.Tracef("handleHeaders exit (%v): %v", p, len(msg.Headers)) - log.Debugf("handleHeaders (%v): %v", p, len(msg.Headers)) + // s.mtx.Lock() + // if s.clipped { + // log.Infof("pretend we are at the height") + // msg.Headers = msg.Headers[0:0] + // } + // s.mtx.Unlock() if len(msg.Headers) == 0 { // This may signify the end of IBD but isn't 100%. We can fart @@ -1088,13 +949,16 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader // just behind or if we are nominally where we should be. This // test will never be 100% accurate. - s.mtx.Lock() - lastBH := s.lastBlockHeader.Timestamp() - s.mtx.Unlock() - if time.Now().Sub(lastBH) > 6*s.chainParams.TargetTimePerBlock { - log.Infof("peer not synced: %v", p) - return - } + // s.mtx.Lock() + // lastBH := s.lastBlockHeader.Timestamp() + // s.mtx.Unlock() + // if time.Since(lastBH) > 6*s.chainParams.TargetTimePerBlock { + // log.Infof("peer not synced: %v", p) + // p.close() // get rid of this peer + // return + // } + + // only do this if peer is synced go s.syncBlocks(ctx) @@ -1107,70 +971,94 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader // // There really is no good way of determining if we can escape the // expensive calls so we just eat it. 
- - // Make sure we can connect these headers in database - dbpbh, err := s.db.BlockHeaderByHash(ctx, msg.Headers[0].PrevBlock[:]) - if err != nil { - log.Errorf("handle headers no previous block header: %v", - msg.Headers[0].BlockHash()) - return - } - pbh, err := bytes2Header(dbpbh.Header) - if err != nil { - log.Errorf("invalid block header: %v", err) - return - } - - // Construct insert list and nominally validate headers - headers := make([]tbcd.BlockHeader, 0, len(msg.Headers)) - height := dbpbh.Height + 1 + var pbhHash *chainhash.Hash + headers := make([][80]byte, len(msg.Headers)) for k := range msg.Headers { - if !hashEqual(msg.Headers[k].PrevBlock, pbh.BlockHash()) { - log.Errorf("cannot connect %v at height %v", - msg.Headers[k].PrevBlock, height) + if pbhHash != nil && pbhHash.IsEqual(&msg.Headers[k].PrevBlock) { + log.Errorf("cannot connect %v index %v", + msg.Headers[k].PrevBlock, k) + p.close() // get rid of this misbehaving peer return } - headers = append(headers, tbcd.BlockHeader{ - Hash: sliceChainHash(msg.Headers[k].BlockHash()), - Height: height, - Header: h2b(msg.Headers[k]), - }) - - pbh = msg.Headers[k] - height++ + copy(headers[k][0:80], h2b(msg.Headers[k])) // XXX don't double copy + pbhHash = &msg.Headers[k].PrevBlock } if len(headers) > 0 { - err := s.db.BlockHeadersInsert(ctx, headers) + it, cbh, lbh, err := s.db.BlockHeadersInsert(ctx, headers) if err != nil { // This ends the race between peers during IBD. - if !database.ErrDuplicate.Is(err) { + if !errors.Is(database.ErrDuplicate, err) { + // XXX do we need to ask for more headers? log.Errorf("block headers insert: %v", err) } return } - // If we get here try to store the last blockheader that was - // inserted. This may race so we have to take the mutex and - // check height. - lbh := headers[len(headers)-1] + // Note that BlockHeadersInsert always returns the canonical + // tip blockheader. + var height uint64 + switch it { + case tbcd.ITChainExtend: + height = cbh.Height - s.mtx.Lock() - if lbh.Height > s.lastBlockHeader.Height { - s.lastBlockHeader = lbh - } - s.mtx.Unlock() + // Ask for next batch of headers at canonical tip. + if err = s.getHeaders(ctx, p, cbh.Header); err != nil { + log.Errorf("get headers: %v", err) + return + } - log.Infof("Inserted %v block headers height %v", - len(headers), lbh.Height) + case tbcd.ITForkExtend: + height = lbh.Height - // Ask for next batch of headers - err = s.getHeaders(ctx, p, lbh.Header) - if err != nil { - log.Errorf("get headers: %v", err) + // Ask for more block headers at the fork tip. + if err = s.getHeaders(ctx, p, lbh.Header); err != nil { + log.Errorf("get headers fork: %v", err) + return + } + + // Also ask for more block headers at canonical tip + if err = s.getHeaders(ctx, p, cbh.Header); err != nil { + log.Errorf("get headers canonical: %v", err) + return + } + + case tbcd.ITChainFork: + height = cbh.Height + + if s.Synced(ctx).Synced { + // XXX this is racy but is a good enough test + // to get past most of this. + panic("chain forked, unwind/rewind indexes") + } + + // Ask for more block headers at the fork tip. 
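+			// Keep requesting headers on both the fork tip and the
+			// canonical tip so that whichever chain accumulates the
+			// most work can eventually be resolved.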
+ if err = s.getHeaders(ctx, p, lbh.Header); err != nil { + log.Errorf("get headers fork: %v", err) + return + } + + // Also ask for more block headers at canonical tip + if err = s.getHeaders(ctx, p, cbh.Header); err != nil { + log.Errorf("get headers canonical: %v", err) + return + } + + default: + // XXX can't happen + log.Errorf("invalid insert type: %d", it) return } + + // XXX we probably don't want top print it + log.Infof("Inserted (%v) %v block headers height %v", + it, len(headers), height) + + // s.mtx.Lock() + // s.clipped = true + // s.mtx.Unlock() + // log.Infof("clipped at %v", lbh.Height) } } @@ -1221,7 +1109,8 @@ func (s *Server) handleBlock(ctx context.Context, p *peer, msg *wire.MsgBlock) { len(msg.Transactions), msg.Header.Timestamp) } - // Whatever happens,, delete from cache and potentially try again + // Whatever happens, delete from cache and potentially try again + log.Infof("inserted block at height %d, parent hash %s", height, block.MsgBlock().Header.PrevBlock) var ( printStats bool blocksSize uint64 @@ -1298,42 +1187,35 @@ func (s *Server) handleBlock(ctx context.Context, p *peer, msg *wire.MsgBlock) { go s.syncBlocks(ctx) } -func (s *Server) insertGenesis(ctx context.Context) ([]tbcd.BlockHeader, error) { +func (s *Server) insertGenesis(ctx context.Context) error { log.Tracef("insertGenesis") defer log.Tracef("insertGenesis exit") // We really should be inserting the block first but block insert // verifies that a block header exists. log.Infof("Inserting genesis block and header: %v", s.chainParams.GenesisHash) - gbh, err := header2Bytes(&s.chainParams.GenesisBlock.Header) + gbh, err := header2Array(&s.chainParams.GenesisBlock.Header) if err != nil { - return nil, fmt.Errorf("serialize genesis block header: %w", err) + return fmt.Errorf("serialize genesis block header: %w", err) } - - genesisBlockHeader := &tbcd.BlockHeader{ - Height: 0, - Hash: s.chainParams.GenesisHash[:], - Header: gbh, - } - err = s.db.BlockHeadersInsert(ctx, []tbcd.BlockHeader{*genesisBlockHeader}) - if err != nil { - return nil, fmt.Errorf("genesis block header insert: %w", err) + if err = s.db.BlockHeaderGenesisInsert(ctx, gbh); err != nil { + return fmt.Errorf("genesis block header insert: %w", err) } log.Debugf("Inserting genesis block") gb, err := btcutil.NewBlock(s.chainParams.GenesisBlock).Bytes() if err != nil { - return nil, fmt.Errorf("genesis block encode: %w", err) + return fmt.Errorf("genesis block encode: %w", err) } _, err = s.db.BlockInsert(ctx, &tbcd.Block{ Hash: s.chainParams.GenesisHash[:], Block: gb, }) if err != nil { - return nil, fmt.Errorf("genesis block insert: %w", err) + return fmt.Errorf("genesis block insert: %w", err) } - return []tbcd.BlockHeader{*genesisBlockHeader}, nil + return nil } // @@ -1402,54 +1284,42 @@ func (s *Server) BlockHeadersByHeight(ctx context.Context, height uint64) ([]*wi return wireBlockHeaders, nil } -// RawBlockHeadersBest returns the raw headers for the best known blocks. -func (s *Server) RawBlockHeadersBest(ctx context.Context) (uint64, []api.ByteSlice, error) { - log.Tracef("RawBlockHeadersBest") - defer log.Tracef("RawBlockHeadersBest exit") +// RawBlockHeaderBest returns the raw header for the best known block. +// XXX should we return cumulative difficulty, hash? 
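+// The header is returned as its raw serialized wire encoding (80 bytes).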
+func (s *Server) RawBlockHeaderBest(ctx context.Context) (uint64, api.ByteSlice, error) { + log.Tracef("RawBlockHeaderBest") + defer log.Tracef("RawBlockHeaderBest exit") - bhs, err := s.db.BlockHeadersBest(ctx) + bhb, err := s.db.BlockHeaderBest(ctx) if err != nil { return 0, nil, err } + return bhb.Height, api.ByteSlice(bhb.Header[:]), nil +} - var height uint64 - if len(bhs) > 0 { - height = bhs[0].Height - } +func (s *Server) DifficultyAtHash(ctx context.Context, hash *chainhash.Hash) (*big.Int, error) { + log.Tracef("DifficultyAtHash") + defer log.Tracef("DifficultyAtHash exit") - var headers []api.ByteSlice - for _, bh := range bhs { - headers = append(headers, []byte(bh.Header)) + blockHeader, err := s.db.BlockHeaderByHash(ctx, hash[:]) + if err != nil { + return nil, err } - return height, headers, nil + return &blockHeader.Difficulty, nil } -// BlockHeadersBest returns the headers for the best known blocks. -func (s *Server) BlockHeadersBest(ctx context.Context) (uint64, []*wire.BlockHeader, error) { +// BlockHeaderBest returns the headers for the best known blocks. +func (s *Server) BlockHeaderBest(ctx context.Context) (uint64, *wire.BlockHeader, error) { log.Tracef("BlockHeadersBest") defer log.Tracef("BlockHeadersBest exit") - blockHeaders, err := s.db.BlockHeadersBest(ctx) + blockHeader, err := s.db.BlockHeaderBest(ctx) if err != nil { return 0, nil, err } - - var height uint64 - if len(blockHeaders) > 0 { - height = blockHeaders[0].Height - } - - wireBlockHeaders := make([]*wire.BlockHeader, 0, len(blockHeaders)) - for _, bh := range blockHeaders { - w, err := bh.Wire() - if err != nil { - return 0, nil, err - } - wireBlockHeaders = append(wireBlockHeaders, w) - } - - return height, wireBlockHeaders, nil + wbh, err := bytes2Header(blockHeader.Header) + return blockHeader.Height, wbh, err } func (s *Server) BalanceByAddress(ctx context.Context, encodedAddress string) (uint64, error) { @@ -1463,9 +1333,7 @@ func (s *Server) BalanceByAddress(ctx context.Context, encodedAddress string) (u return 0, err } - scriptHash := sha256.Sum256(script) - - balance, err := s.db.BalanceByScriptHash(ctx, scriptHash) + balance, err := s.db.BalanceByScriptHash(ctx, sha256.Sum256(script)) if err != nil { return 0, err } @@ -1563,7 +1431,8 @@ func (s *Server) FeesAtHeight(ctx context.Context, height, count int64) (uint64, return 0, fmt.Errorf("headers by height: %w", err) } if len(bhs) != 1 { - return 0, fmt.Errorf("too many block headers: %v", len(bhs)) + panic("fees at height: unsupported fork") + // return 0, fmt.Errorf("too many block headers: %v", len(bhs)) } be, err := s.db.BlockByHash(ctx, bhs[0].Hash) if err != nil { @@ -1577,8 +1446,7 @@ func (s *Server) FeesAtHeight(ctx context.Context, height, count int64) (uint64, } // walk block tx' - err = feesFromTransactions(b.Transactions()) - if err != nil { + if err = feesFromTransactions(b.Transactions()); err != nil { return 0, fmt.Errorf("fees from transactions %v %v: %v", height, b.Hash(), err) } @@ -1594,10 +1462,13 @@ type SyncInfo struct { TxHeight uint64 // last indexed tx block height } -func (s *Server) Synced(ctx context.Context) (si SyncInfo) { - s.mtx.Lock() - defer s.mtx.Unlock() - si.BlockHeaderHeight = s.lastBlockHeader.Height +func (s *Server) synced(ctx context.Context) (si SyncInfo) { + bhb, err := s.db.BlockHeaderBest(ctx) + if err != nil { + // This should never happen. + panic(err) + } + si.BlockHeaderHeight = bhb.Height // These values are cached in leveldb so it is ok to call with mutex // held. 
@@ -1619,6 +1490,14 @@ func (s *Server) Synced(ctx context.Context) (si SyncInfo) { return } +// Synced returns true if all block headers, blocks and all indexes are caught up. +func (s *Server) Synced(ctx context.Context) SyncInfo { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.synced(ctx) +} + // DBOpen opens the underlying server database. It has been put in its own // function to make it available during tests and hemictl. func (s *Server) DBOpen(ctx context.Context) error { @@ -1688,26 +1567,22 @@ func (s *Server) Run(pctx context.Context) error { }() // Find out where IBD is at - bhs, err := s.db.BlockHeadersBest(ctx) + bhb, err := s.db.BlockHeaderBest(ctx) if err != nil { - return fmt.Errorf("block headers best: %w", err) - } - // No entries means we are at genesis - if len(bhs) == 0 { - bhs, err = s.insertGenesis(ctx) - if err != nil { - return fmt.Errorf("insert genesis: %w", err) - } - bhs, err = s.db.BlockHeadersBest(ctx) - if err != nil { - return err + if errors.Is(err, database.ErrNotFound) { + if err = s.insertGenesis(ctx); err != nil { + return fmt.Errorf("insert genesis: %w", err) + } + bhb, err = s.db.BlockHeaderBest(ctx) + if err != nil { + return err + } + } else { + return fmt.Errorf("block headers best: %v", err) } - } else if len(bhs) > 1 { - return errors.New("blockheaders best: unsupported fork") } - s.lastBlockHeader = bhs[0] // Prime last seen block header log.Infof("Starting block headers sync at height: %v time %v", - bhs[0].Height, bhs[0].Timestamp()) + bhb.Height, bhb.Timestamp()) // HTTP server mux := http.NewServeMux() @@ -1782,8 +1657,7 @@ func (s *Server) Run(pctx context.Context) error { s.wg.Add(1) go func() { defer s.wg.Done() - err := s.startPeerManager(ctx) - if err != nil { + if err := s.startPeerManager(ctx); err != nil { select { case errC <- err: default: diff --git a/service/tbc/tbc_test.go b/service/tbc/tbc_test.go index 1f050b8b..bc1ec721 100644 --- a/service/tbc/tbc_test.go +++ b/service/tbc/tbc_test.go @@ -24,6 +24,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/docker/docker/api/types/container" "github.com/docker/go-connections/nat" + "github.com/go-test/deep" "github.com/phayes/freeport" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" @@ -59,6 +60,31 @@ func skipIfNoDocker(t *testing.T) { } } +func TestBlockHeaderEncodeDecode(t *testing.T) { + chainParams := &chaincfg.TestNet3Params + gwbh := chainParams.GenesisBlock.Header + sh, err := header2Slice(&gwbh) + if err != nil { + t.Error(err) + } + wbh, err := bytes2Header(sh) + if err != nil { + t.Error(err) + } + if diff := deep.Equal(&gwbh, wbh); len(diff) > 0 { + t.Errorf("unexpected diff: %s", diff) + } + + ash, err := header2Array(&gwbh) + if err != nil { + t.Error(err) + } + awbh, err := bytes2Header(ash[:]) + if diff := deep.Equal(&gwbh, awbh); len(diff) > 0 { + t.Errorf("unexpected diff: %s", diff) + } +} + func TestServerBlockHeadersBest(t *testing.T) { skipIfNoDocker(t) @@ -81,17 +107,548 @@ func TestServerBlockHeadersBest(t *testing.T) { t.Fatal(ctx.Err()) } - height, bhs, err := tbcServer.BlockHeadersBest(ctx) + height, bhb, err := tbcServer.BlockHeaderBest(ctx) if err != nil { - t.Errorf("BlockHeadersBest() err = %v, want nil", err) + t.Errorf("BlockHeaderBest() err = %v, want nil", err) + } + _ = bhb // XXX probably should decode and test + if height != blocks { + t.Errorf("BlockHeaderBest() height = %d, want %d", height, blocks) } +} + +func TestForksWithGen(t *testing.T) { + skipIfNoDocker(t) + + t.Skip("need 
unwind functionality to run these tests, they need to be audited after that as well") - if l := len(bhs); l != 1 { - t.Errorf("BlockHeadersBest() block len = %d, want 1", l) + otherPrivateKey := "72a2c41c84147325ce3c0f37697ef1e670c7169063dda89be9995c3c5219ffff" + _, _, otherAddress, err := bitcoin.KeysAndAddressFromHexString( + otherPrivateKey, + &chaincfg.RegressionNetParams, + ) + if err != nil { + t.Fatal(err) } - if height != blocks { - t.Errorf("BlockHeadersBest() height = %d, want %d", height, blocks) + type tbcForkTestTableItem struct { + name string + testForkScenario func(t *testing.T, ctx context.Context, bitcoindContainer testcontainers.Container, walletAddress string, tbcServer *Server) + } + + testTable := []tbcForkTestTableItem{ + { + name: "Split Tip, Single Block", + testForkScenario: func(t *testing.T, ctx context.Context, bitcoindContainer testcontainers.Container, walletAddress string, tbcServer *Server) { + // block 1A, send 7 btc to otherAddress + _, err := runBitcoinCommand( + ctx, t, bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-named", + "sendtoaddress", + fmt.Sprintf("address=%s", otherAddress.EncodeAddress()), + "conf_target=1", + "amount=7", + "avoid_reuse=false", + }) + if err != nil { + t.Fatal(err) + } + + blockHashesResponse, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-generate", + "1", + }) + if err != nil { + t.Fatal(err) + } + + err = tbcServer.SyncIndexersToHeight(ctx, 201) + if err != nil { + t.Fatal(err) + } + + balance, err := tbcServer.BalanceByAddress(ctx, otherAddress.String()) + if err != nil { + t.Fatal(err) + } + + if balance != 700000000 { + t.Fatalf("unexpected balance: %d", balance) + } + + var blockHashes struct { + Blocks []string `json:"blocks"` + } + if err := json.Unmarshal([]byte(blockHashesResponse), &blockHashes); err != nil { + t.Fatal(err) + } + + // create fork, invalidate block 1A, this returns the tx back + // to the mempool + invalidateBlock(ctx, t, bitcoindContainer, blockHashes.Blocks[0]) + + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "sendtoaddress", + otherAddress.EncodeAddress(), + "15", + "avoid_reuse=false", + }) + if err != nil { + t.Fatal(err) + } + + // create block 1B and 2B, the txs should be included + // in 1B. 
use 2B to move tbc forward + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-generate", + "2", + }) + if err != nil { + t.Fatal(err) + } + }, + }, + { + name: "Split Tip, Multiple Blocks", + testForkScenario: func(t *testing.T, ctx context.Context, bitcoindContainer testcontainers.Container, walletAddress string, tbcServer *Server) { + lastA := "" + lastB := "" + earliestA := "" + earliestB := "" + + for i := 0; i < 3; i++ { + + // invalidate B and reconsider A to grow chain A + if earliestB != "" { + invalidateBlock(ctx, t, bitcoindContainer, earliestB) + } + + if lastA != "" { + reconsiderBlock(ctx, t, bitcoindContainer, lastA) + } + + // block i*1A, send 7 btc to otherAddress + _, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-named", + "sendtoaddress", + fmt.Sprintf("address=%s", otherAddress.EncodeAddress()), + "conf_target=1", + "amount=3", + "subtractfeefromamount=true", + "avoid_reuse=false", + }) + if err != nil { + t.Fatal(err) + } + + blockHashesResponse, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-generate", + fmt.Sprintf("%d", i*2+1), + }) + if err != nil { + t.Fatal(err) + } + + var blockHashes struct { + Blocks []string `json:"blocks"` + } + if err := json.Unmarshal([]byte(blockHashesResponse), &blockHashes); err != nil { + t.Fatal(err) + } + + lastA = blockHashes.Blocks[0] + if earliestA == "" { + earliestA = lastA + } + + // invalidate A and reconsider B to grow chain B + if earliestA != "" { + invalidateBlock(ctx, t, bitcoindContainer, earliestA) + } + + if lastB != "" { + reconsiderBlock(ctx, t, bitcoindContainer, lastB) + } + + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-named", + "sendtoaddress", + fmt.Sprintf("address=%s", otherAddress.EncodeAddress()), + "conf_target=1", + "amount=2", + "subtractfeefromamount=true", + "avoid_reuse=false", + }) + if err != nil { + t.Fatal(err) + } + + blockHashesResponse, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-generate", + fmt.Sprintf("%d", i*2+2), + }) + if err != nil { + t.Fatal(err) + } + + if err := json.Unmarshal([]byte(blockHashesResponse), &blockHashes); err != nil { + t.Fatal(err) + } + + lastB := blockHashes.Blocks[0] + if earliestB == "" { + earliestB = lastB + } + } + }, + }, + + { + name: "Long reorg", + testForkScenario: func(t *testing.T, ctx context.Context, bitcoindContainer testcontainers.Container, walletAddress string, tbcServer *Server) { + _, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-named", + "sendtoaddress", + fmt.Sprintf("address=%s", otherAddress.EncodeAddress()), + "conf_target=1", + "amount=7", + "avoid_reuse=false", + }) + if err != nil { + t.Fatal(err) + } + + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-generate", + "1", + }) + if err != nil { + t.Fatal(err) + } + + err = tbcServer.SyncIndexersToHeight(ctx, 201) + if err != nil { + t.Fatal(err) + } + + balance, err := tbcServer.BalanceByAddress(ctx, otherAddress.String()) + if err != nil { + t.Fatal(err) + } + + if balance != 700000000 { + t.Fatalf("unexpected balance: %d", balance) + } + + blockHash, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + 
"getblockhash", + "102", + }) + if err != nil { + t.Fatal(err) + } + + // create fork, invalidate block at height 10, this means we + // generate blocks starting at 10 + invalidateBlock(ctx, t, bitcoindContainer, blockHash) + + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "sendtoaddress", + otherAddress.EncodeAddress(), + "15", + "avoid_reuse=false", + }) + if err != nil { + t.Fatal(err) + } + + // create long new chain + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-generate", + "300", + }) + if err != nil { + t.Fatal(err) + } + + err = tbcServer.SyncIndexersToHeight(ctx, 310) + if err != nil { + t.Fatal(err) + } + }, + }, + { + name: "Ancient orphan", + testForkScenario: func(t *testing.T, ctx context.Context, bitcoindContainer testcontainers.Container, walletAddress string, tbcServer *Server) { + _, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-named", + "sendtoaddress", + fmt.Sprintf("address=%s", otherAddress.EncodeAddress()), + "conf_target=1", + "amount=7", + "avoid_reuse=false", + }) + if err != nil { + t.Fatal(err) + } + + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-generate", + "1", + }) + if err != nil { + t.Fatal(err) + } + + err = tbcServer.SyncIndexersToHeight(ctx, 201) + if err != nil { + t.Fatal(err) + } + + balance, err := tbcServer.BalanceByAddress(ctx, otherAddress.String()) + if err != nil { + t.Fatal(err) + } + + if balance != 700000000 { + t.Fatalf("unexpected balance: %d", balance) + } + + blockHash, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "getblockhash", + "102", + }) + if err != nil { + t.Fatal(err) + } + + // create fork, invalidate block at height 10, this means we + // generate blocks starting at 10 + invalidateBlock(ctx, t, bitcoindContainer, blockHash) + + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "sendtoaddress", + otherAddress.EncodeAddress(), + "15", + "avoid_reuse=false", + }) + if err != nil { + t.Fatal(err) + } + + // create long new chain + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "-generate", + "10", + }) + if err != nil { + t.Fatal(err) + } + + err = tbcServer.SyncIndexersToHeight(ctx, 310) + if err != nil { + t.Fatal(err) + } + }, + }, + } + + for _, tt := range testTable { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // generate 200 to btcAddress + bitcoindContainer, mappedPeerPort := createBitcoindWithInitialBlocks(ctx, t, 0, "") + defer func() { + if err := bitcoindContainer.Terminate(ctx); err != nil { + panic(err) + } + }() + + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "createwallet", + "mywallet", + }) + if err != nil { + t.Fatal(err) + } + + walletAddress, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "getnewaddress", + }) + if err != nil { + t.Fatal(err) + } + + _, err = runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "generatetoaddress", + "200", + walletAddress, + }) + if err != nil { + t.Fatal(err) + } + + tbcServer, _ := 
createTbcServer(ctx, t, mappedPeerPort) + + tt.testForkScenario(t, ctx, bitcoindContainer, walletAddress, tbcServer) + }) + } +} + +func invalidateBlock(ctx context.Context, t *testing.T, bitcoindContainer testcontainers.Container, blockHash string) { + _, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "invalidateblock", + blockHash, + }) + if err != nil { + t.Fatal(err) + } +} + +func reconsiderBlock(ctx context.Context, t *testing.T, bitcoindContainer testcontainers.Container, blockHash string) { + _, err := runBitcoinCommand( + ctx, + t, + bitcoindContainer, + []string{ + "bitcoin-cli", + "-regtest=1", + "reconsiderblock", + blockHash, + }) + if err != nil { + t.Fatal(err) } } @@ -104,15 +661,13 @@ func createBitcoind(ctx context.Context, t *testing.T) testcontainers.Container name := fmt.Sprintf("bitcoind-%s", id) req := testcontainers.ContainerRequest{ Image: "kylemanna/bitcoind", - Cmd: []string{"bitcoind", "-regtest=1", "-debug=1", "-rpcallowip=0.0.0.0/0", "-rpcbind=0.0.0.0:18443", "-txindex=1", "-noonion", "-listenonion=0"}, + Cmd: []string{"bitcoind", "-regtest=1", "-debug=1", "-rpcallowip=0.0.0.0/0", "-rpcbind=0.0.0.0:18443", "-txindex=1", "-noonion", "-listenonion=0", "-fallbackfee=0.01"}, ExposedPorts: []string{"18443", "18444"}, WaitingFor: wait.ForLog("dnsseed thread exit").WithPollInterval(1 * time.Second), LogConsumerCfg: &testcontainers.LogConsumerConfig{ - Consumers: []testcontainers.LogConsumer{ - &StdoutLogConsumer{ - Name: name, - }, - }, + Consumers: []testcontainers.LogConsumer{&StdoutLogConsumer{ + Name: name, + }}, }, Name: name, HostConfigModifier: func(hostConfig *container.HostConfig) { @@ -158,6 +713,10 @@ func runBitcoinCommand(ctx context.Context, t *testing.T, bitcoindContainer test return "", fmt.Errorf("error code received: %d", exitCode) } + if len(buf.String()) == 0 { + return "", nil + } + // first 8 bytes are header, there is also a newline character at the end of the response return buf.String()[8 : len(buf.String())-1], nil } @@ -178,7 +737,9 @@ func getRandomTxId(ctx context.Context, t *testing.T, bitcoindContainer testcont } blockJson, err := runBitcoinCommand( - ctx, t, bitcoindContainer, + ctx, + t, + bitcoindContainer, []string{ "bitcoin-cli", "-regtest=1", @@ -356,7 +917,7 @@ func cliBlockHeaderToRaw(t *testing.T, cliBlockHeader *BtcCliBlockHeader) []api. blockHeader := cliBlockHeaderToWire(t, cliBlockHeader) t.Logf(spew.Sdump(blockHeader)) - bytes, err := header2Bytes(blockHeader) + bytes, err := header2Slice(blockHeader) if err != nil { t.Fatal(fmt.Errorf("header to bytes: %w", err)) } diff --git a/service/tbc/tbcfork_test.go b/service/tbc/tbcfork_test.go new file mode 100644 index 00000000..993d22bc --- /dev/null +++ b/service/tbc/tbcfork_test.go @@ -0,0 +1,813 @@ +// Copyright (c) 2024 Hemi Labs, Inc. +// Use of this source code is governed by the MIT License, +// which can be found in the LICENSE file. 
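+//
+// This file implements a minimal in-process bitcoin node (btcNode) that
+// serves block headers and blocks over the p2p wire protocol so that TBC's
+// fork handling can be exercised without a real bitcoind.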
+ +package tbc + +import ( + "context" + "crypto/rand" + "encoding/binary" + "errors" + "fmt" + "math/big" + "net" + "sync" + "testing" + "time" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" + "github.com/juju/loggo" + + "github.com/hemilabs/heminetwork/api/tbcapi" +) + +type btcNode struct { + t *testing.T + port string + p *peer + + mtx sync.RWMutex + chain map[string]*btcutil.Block + blocksAtHeight map[int32][]*btcutil.Block + height int32 + params *chaincfg.Params +} + +func newFakeNode(t *testing.T, port string) (*btcNode, error) { + node := &btcNode{ + t: t, + port: port, + chain: make(map[string]*btcutil.Block, 10), + blocksAtHeight: make(map[int32][]*btcutil.Block, 10), + height: 0, + params: &chaincfg.RegressionNetParams, + } + + genesis := btcutil.NewBlock(chaincfg.RegressionNetParams.GenesisBlock) + genesis.SetHeight(0) + // node.chain[chaincfg.RegressionNetParams.GenesisHash.String()] = genesis + _, err := node.insertBlock(genesis) + if err != nil { + return nil, err + } + return node, nil +} + +func (b *btcNode) handleGetHeaders(m *wire.MsgGetHeaders) (*wire.MsgHeaders, error) { + b.mtx.Lock() + defer b.mtx.Unlock() + + if len(m.BlockLocatorHashes) != 1 { + return nil, fmt.Errorf("get headers: invalid count got %v wanted %v", + len(m.BlockLocatorHashes), 1) + } + locator := m.BlockLocatorHashes[0] + from, ok := b.chain[locator.String()] + if !ok { + return nil, fmt.Errorf("get headers: locator not found %v", locator) + } + + nmh := wire.NewMsgHeaders() + height := from.Height() + 1 + b.t.Logf("start from %v", height) + for range 2000 { + bs, ok := b.blocksAtHeight[height] + if !ok { + b.t.Logf("no more blocks at: %v", height) + return nmh, nil + } + if len(bs) != 1 { + return nil, fmt.Errorf("fork at height: %v", height) + } + err := nmh.AddBlockHeader(&bs[0].MsgBlock().Header) + if err != nil { + return nil, fmt.Errorf("add header: %v", err) + } + + b.t.Logf("%v: %v", height, bs[0].MsgBlock().Header.BlockHash()) + height++ + } + + return nmh, nil +} + +func (b *btcNode) handleGetData(m *wire.MsgGetData) (*wire.MsgBlock, error) { + b.mtx.Lock() + defer b.mtx.Unlock() + + // b.t.Logf("get data: %v", spew.Sdump(m)) + if len(m.InvList) != 1 { + return nil, fmt.Errorf("not supported multi invlist requests") + } + + v := m.InvList[0] + if v.Type != wire.InvTypeBlock { + return nil, fmt.Errorf("unsuported data type: %v", v.Type) + } + + block, ok := b.chain[v.Hash.String()] + if !ok { + return nil, fmt.Errorf("block not found: %v", v.Hash) + } + + return block.MsgBlock(), nil +} + +func (b *btcNode) handleRPC(ctx context.Context, conn net.Conn) { + b.t.Logf("got conn %v", conn.RemoteAddr()) + defer b.t.Logf("exit conn %v", conn.RemoteAddr()) + + p := &peer{ + conn: conn, + connected: time.Now(), + address: conn.RemoteAddr().String(), + protocolVersion: wire.AddrV2Version, + network: wire.TestNet, // regtest == testnet + } + + // Send version + mv := &wire.MsgVersion{ + ProtocolVersion: int32(wire.AddrV2Version), + } + if err := p.write(time.Second, mv); err != nil { + b.t.Logf("write version %v: %v", p, err) + return + } + + b.mtx.Lock() + b.p = p + b.mtx.Unlock() + + for { + select { + case <-ctx.Done(): + return + default: + } + + msg, err := p.read() + if err != nil { + if errors.Is(err, wire.ErrUnknownMessage) { + // 
ignore unknown + b.t.Log("wire: unknown message") + continue + } + b.t.Logf("peer read %v: %v", p, err) + return + } + + if err = b.handleMsg(ctx, p, msg); err != nil { + b.t.Logf("handle message %v: %v", p, err) + return + } + } +} + +func (b *btcNode) handleMsg(ctx context.Context, p *peer, msg wire.Message) error { + switch m := msg.(type) { + case *wire.MsgVersion: + mva := &wire.MsgVerAck{} + if err := p.write(time.Second, mva); err != nil { + return fmt.Errorf("write version ack: %w", err) + } + + case *wire.MsgGetHeaders: + // b.t.Logf("get headers %v", spew.Sdump(m)) + headers, err := b.handleGetHeaders(m) + if err != nil { + return fmt.Errorf("handle get headers: %w", err) + } + // b.t.Logf("%v", spew.Sdump(headers)) + if err = p.write(time.Second, headers); err != nil { + return fmt.Errorf("write headers: %w", err) + } + + case *wire.MsgGetData: + // b.t.Logf("get data %v", spew.Sdump(m)) + data, err := b.handleGetData(m) + if err != nil { + return fmt.Errorf("handle get data: %w", err) + } + // b.t.Logf("%v", spew.Sdump(data)) + if err = p.write(time.Second, data); err != nil { + return fmt.Errorf("write data: %w", err) + } + + default: + b.t.Logf("unhandled command: %v", spew.Sdump(msg)) + } + + return nil +} + +func (b *btcNode) SendBlockheader(ctx context.Context, bh wire.BlockHeader) error { + msg := wire.NewMsgHeaders() + msg.AddBlockHeader(&bh) + return b.p.write(defaultCmdTimeout, msg) +} + +func (b *btcNode) dumpChain(parent *chainhash.Hash) error { + b.mtx.Lock() + defer b.mtx.Unlock() + + for { + block, ok := b.chain[parent.String()] + if !ok { + return fmt.Errorf("parent not found: %v", parent) + } + b.t.Logf("%v: %v", block.Height(), block.Hash()) + + bh := block.MsgBlock().Header + parent = &bh.PrevBlock + if block.Height() == 0 { + return nil + } + } +} + +func newBlockTemplate(params *chaincfg.Params, payToAddress btcutil.Address, nextBlockHeight int32, parent *chainhash.Hash, extraNonce uint64) (*btcutil.Block, error) { + coinbaseScript, err := standardCoinbaseScript(nextBlockHeight, extraNonce) + if err != nil { + return nil, err + } + coinbaseTx, err := createCoinbaseTx(params, coinbaseScript, + nextBlockHeight, payToAddress) + if err != nil { + return nil, err + } + + reqDifficulty := uint32(0x1d00ffff) // XXX + + var blockTxs []*btcutil.Tx + blockTxs = append(blockTxs, coinbaseTx) + + msgBlock := &wire.MsgBlock{ + Header: wire.BlockHeader{ + Version: int32(vbTopBits), + PrevBlock: *parent, + MerkleRoot: blockchain.CalcMerkleRoot(blockTxs, false), + Timestamp: time.Now(), + Bits: reqDifficulty, + }, + } + for _, tx := range blockTxs { + if err = msgBlock.AddTransaction(tx.MsgTx()); err != nil { + return nil, fmt.Errorf("add transaction to block: %w", err) + } + } + + b := btcutil.NewBlock(msgBlock) + b.SetHeight(nextBlockHeight) + return b, nil +} + +func (b *btcNode) insertBlock(block *btcutil.Block) (int, error) { + b.chain[block.Hash().String()] = block + bAtHeight := b.blocksAtHeight[block.Height()] + b.blocksAtHeight[block.Height()] = append(bAtHeight, block) + return len(b.blocksAtHeight[block.Height()]), nil +} + +func (b *btcNode) blockHeadersAtHeight(height int32) ([]*wire.BlockHeader, error) { + bs, ok := b.blocksAtHeight[height] + if !ok { + return nil, fmt.Errorf("no block headers at: %v", height) + } + bhs := make([]*wire.BlockHeader, 0, len(bs)) + for _, v := range bs { + bhs = append(bhs, &v.MsgBlock().Header) + } + return bhs, nil +} + +func (b *btcNode) BlockHeadersAtHeight(height int32) ([]*wire.BlockHeader, error) { + b.mtx.Lock() + defer 
b.mtx.Unlock() + + return b.blockHeadersAtHeight(height) +} + +func (b *btcNode) Best() []*chainhash.Hash { + b.mtx.Lock() + defer b.mtx.Unlock() + + bhs, err := b.blockHeadersAtHeight(b.height) + if err != nil { + panic(err) + } + chs := make([]*chainhash.Hash, 0, len(bhs)) + for _, v := range bhs { + ch := v.BlockHash() + chs = append(chs, &ch) + } + return chs +} + +func random(count int) []byte { + b := make([]byte, count) + _, err := rand.Read(b) + if err != nil { + panic(err) + } + return b +} + +func (b *btcNode) Mine(count int, from *chainhash.Hash, payToAddress btcutil.Address) ([]*btcutil.Block, error) { + b.mtx.Lock() + defer b.mtx.Unlock() + + parent, ok := b.chain[from.String()] + if !ok { + return nil, errors.New("parent hash not found") + } + + blocks := make([]*btcutil.Block, 0, count) + for range count { + // extra nonce is needed to prevent block collisions + en := random(8) + extraNonce := binary.BigEndian.Uint64(en) + + nextBlockHeight := parent.Height() + 1 + block, err := newBlockTemplate(b.params, payToAddress, nextBlockHeight, + parent.Hash(), extraNonce) + if err != nil { + return nil, fmt.Errorf("height %v: %v", nextBlockHeight, err) + } + blocks = append(blocks, block) + b.t.Logf("mined %v: %v", nextBlockHeight, block.Hash()) + + n, err := b.insertBlock(block) + if err != nil { + return nil, fmt.Errorf("insert block at height %v: %v", + nextBlockHeight, err) + } + if n != 1 { + b.t.Logf("fork at: %v blocks %v", nextBlockHeight, n) + } + parent = block + b.height = nextBlockHeight + } + + return blocks, nil +} + +func (b *btcNode) Run(ctx context.Context) error { + lc := &net.ListenConfig{} + l, err := lc.Listen(ctx, "tcp", "localhost:"+b.port) + if err != nil { + return err + } + + for { + b.t.Logf("waiting for connection") + conn, err := l.Accept() + if err != nil { + return err + } + go b.handleRPC(ctx, conn) + } +} + +func newPKAddress(params *chaincfg.Params) (*btcec.PrivateKey, *btcutil.AddressPubKey, error) { + key, err := btcec.NewPrivateKey() + if err != nil { + return nil, nil, err + } + + pk := key.PubKey().SerializeUncompressed() + address, err := btcutil.NewAddressPubKey(pk, params) + if err != nil { + return nil, nil, err + } + return key, address, nil +} + +func TestBasic(t *testing.T) { + t.Skip() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + key, address, err := newPKAddress(&chaincfg.RegressionNetParams) + if err != nil { + t.Fatal(err) + } + t.Logf("key : %v", key) + t.Logf("address: %v", address) + + n, err := newFakeNode(t, "18444") // TODO: should use random free port + if err != nil { + t.Fatal(err) + } + + go func() { + if err := n.Run(ctx); err != nil { + panic(fmt.Errorf("node exited with error: %w", err)) + } + }() + + startHash := n.Best() + count := 9 + expectedHeight := uint64(count) + + if _, err = n.Mine(count, startHash[0], address); err != nil { + t.Fatal(fmt.Errorf("mine: %w", err)) + } + + if err = n.dumpChain(n.Best()[0]); err != nil { + t.Fatal(fmt.Errorf("dump chain: %w", err)) + } + // t.Logf("%v", spew.Sdump(n.chain[n.Best()[0].String()])) + time.Sleep(1 * time.Second) // XXX + + // Connect tbc service + cfg := &Config{ + AutoIndex: true, // XXX for now + BlockSanity: false, + LevelDBHome: t.TempDir(), + ListenAddress: tbcapi.DefaultListen, // TODO: should use random free port + // LogLevel: "tbcd=TRACE:tbc=TRACE:level=DEBUG", + MaxCachedTxs: 1000, // XXX + Network: networkLocalnet, + PrometheusListenAddress: "", + } + _ = loggo.ConfigureLoggers(cfg.LogLevel) + s, err := NewServer(cfg) + 
if err != nil { + t.Fatal(err) + } + s.ignoreUlimit = true + go func() { + err := s.Run(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + panic(err) + } + }() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + } + + // See if we are synced + si := s.Synced(ctx) + if !(si.Synced && si.BlockHeaderHeight == expectedHeight) { + log.Infof("not synced") + continue + } + + // Execute tests + balance, err := s.BalanceByAddress(ctx, address.String()) + if err != nil { + t.Fatal(err) + } + // TODO: magic numbers should be extract into constants + if balance != uint64(count*5000000000) { + t.Fatalf("balance got %v wanted %v", balance, count*5000000000) + } + t.Logf("balance %v", spew.Sdump(balance)) + + utxos, err := s.UtxosByAddress(ctx, address.String(), 0, 100) + if err != nil { + t.Fatal(err) + } + t.Logf("%v", spew.Sdump(utxos)) + return + } +} + +func TestFork(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + key, address, err := newPKAddress(&chaincfg.RegressionNetParams) + if err != nil { + t.Fatal(err) + } + t.Logf("key : %v", key) + t.Logf("address: %v", address) + + n, err := newFakeNode(t, "18444") // TODO: should use random free port + if err != nil { + t.Fatal(err) + } + + go func() { + if err := n.Run(ctx); err != nil { + panic(err) + } + }() + + startHash := n.Best() + count := 9 + expectedHeight := uint64(count) + _, err = n.Mine(count, startHash[0], address) + if err != nil { + t.Fatal(err) + } + err = n.dumpChain(n.Best()[0]) + if err != nil { + t.Fatal(err) + } + // t.Logf("%v", spew.Sdump(n.chain[n.Best()[0].String()])) + time.Sleep(500 * time.Millisecond) // XXX + + // Connect tbc service + cfg := &Config{ + AutoIndex: false, + BlockSanity: false, + LevelDBHome: t.TempDir(), + ListenAddress: tbcapi.DefaultListen, // TODO: should use random free port + // LogLevel: "tbcd=TRACE:tbc=TRACE:level=DEBUG", + MaxCachedTxs: 1000, // XXX + Network: networkLocalnet, + PeersWanted: 1, + PrometheusListenAddress: "", + } + _ = loggo.ConfigureLoggers(cfg.LogLevel) + s, err := NewServer(cfg) + if err != nil { + t.Fatal(err) + } + s.ignoreUlimit = true + go func() { + err := s.Run(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + panic(err) + } + }() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(2 * time.Second): + } + + // See if we are at the right height + si := s.Synced(ctx) + if !(si.BlockHeaderHeight == expectedHeight) { + log.Infof("not synced") + continue + } + + // Don't execute balance tests if index is disabled. 
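+		// Balance and UTXO lookups depend on the UTXO/Tx indexers,
+		// which only run automatically when AutoIndex is enabled.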
+ if cfg.AutoIndex { + // Execute tests + balance, err := s.BalanceByAddress(ctx, address.String()) + if err != nil { + t.Fatal(err) + } + if balance != uint64(count*5000000000) { + t.Fatalf("balance got %v wanted %v", balance, count*5000000000) + } + t.Logf("balance %v", spew.Sdump(balance)) + utxos, err := s.UtxosByAddress(ctx, address.String(), 0, 100) + if err != nil { + t.Fatal(err) + } + t.Logf("%v", spew.Sdump(utxos)) + } + break + } + + // Check cumulative difficulty + difficulty, err := s.DifficultyAtHash(ctx, n.Best()[0]) + if err != nil { + t.Fatal(err) + } + // t.Logf("----- %x", blockchain.BigToCompact(difficulty)) + t.Logf("difficulty: 0x%064x", difficulty) + + // Advance both heads + b9 := n.Best()[0] + b10a, err := n.Mine(1, b9, address) + if err != nil { + t.Fatal(err) + } + b10b, err := n.Mine(1, b9, address) + if err != nil { + t.Fatal(err) + } + t.Logf("b10a: %v", b10a[0].Hash()) + t.Logf("b10b: %v", b10b[0].Hash()) + b10s := n.Best() + if len(b10s) != 2 { + t.Fatalf("expected 2 best blocks, got %v", len(b10s)) + } + + // Tell tbcd + err = n.SendBlockheader(ctx, b10a[0].MsgBlock().Header) + if err != nil { + t.Fatal(err) + } + err = n.SendBlockheader(ctx, b10b[0].MsgBlock().Header) + if err != nil { + t.Fatal(err) + } + // XXX check hashes + time.Sleep(500 * time.Millisecond) + + // Advance both heads again + b10aHash := b10a[0].MsgBlock().Header.BlockHash() + b11a, err := n.Mine(1, &b10aHash, address) + if err != nil { + t.Fatal(err) + } + b10bHash := b10b[0].MsgBlock().Header.BlockHash() + b11b, err := n.Mine(1, &b10bHash, address) + if err != nil { + t.Fatal(err) + } + t.Logf("b11a: %v", b11a[0].Hash()) + t.Logf("b11b: %v", b11b[0].Hash()) + b11s := n.Best() + if len(b11s) != 2 { + t.Fatalf("expected 2 best blocks, got %v", len(b11s)) + } + // Tell tbcd + err = n.SendBlockheader(ctx, b11a[0].MsgBlock().Header) + if err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) + err = n.SendBlockheader(ctx, b11b[0].MsgBlock().Header) + if err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) + + // Let's see if tbcd agrees + si := s.Synced(ctx) + // t.Logf("--- %v", si) + bhsAt11, err := s.BlockHeadersByHeight(ctx, 11) + if err != nil { + t.Fatal(err) + } + if len(bhsAt11) != 2 { + t.Fatalf("expected 2 best blocks, got %v", len(bhsAt11)) + } + // XXX check hashes + // t.Logf("block headers at 11: %v", spew.Sdump(bhsAt11)) + time.Sleep(500 * time.Millisecond) + if cfg.AutoIndex && !si.Synced { + t.Fatalf("expected synced chain") + } + + // Move 10b forward and overtake 11 a/b + + // go from + // /-> 11b -> + // 9 -> 10a -> 11a -> + // \-> 10b -> + // + // to + // + // /-> 11b -> + // 9 -> 10a -> 11a -> + // \-> 10b -> 11c -> 12 + t.Logf("mine 11c") + b11c, err := n.Mine(1, &b10bHash, address) + if err != nil { + t.Fatal(err) + } + b11cHash := b11c[0].MsgBlock().Header.BlockHash() + err = n.SendBlockheader(ctx, b11c[0].MsgBlock().Header) + if err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) + + // 12 + t.Logf("mine 12") + b12, err := n.Mine(1, &b11cHash, address) + if err != nil { + t.Fatal(err) + } + err = n.SendBlockheader(ctx, b12[0].MsgBlock().Header) + if err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) + + t.Logf("did we fork?") + + // Dump best chain + if err = n.dumpChain(n.Best()[0]); err != nil { + t.Fatal(err) + } +} + +// XXX this needs to actually test stuff. RN it is visual only. 
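+// It prints compact-bits to target and work conversions using
+// blockchain.CompactToBig and blockchain.CalcWork for manual inspection.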
+func TestWork(t *testing.T) { + reqDifficulty := uint32(0x1d00ffff) // difficulty at genesis + hmm := (reqDifficulty & 0xf0000000) >> (7 * 4) + bits := uint32(419465580) + t.Logf("calc work: %x", hmm) + t.Logf("calc work: %x", blockchain.CalcWork(reqDifficulty)) + t.Logf("calc work: %v", blockchain.CalcWork(reqDifficulty)) + t.Logf("compact to big: %064x", blockchain.CompactToBig(reqDifficulty)) + t.Logf("compact to big: %v", blockchain.CompactToBig(reqDifficulty)) + targetDifficulty := blockchain.CompactToBig(bits) + t.Logf("%064x", targetDifficulty.Bytes()) + + x := uint32(0x1b0404cb) // difficulty at genesis + pp := new(big.Rat).SetInt(blockchain.CalcWork(x)) + t.Logf("0x%x: %064x %v", x, blockchain.CompactToBig(x), pp) + + // big.Int + // big.Rat + // func (z *Rat) SetFrac(a, b *Int) *Rat { + y := "0x00000000ffff0000000000000000000000000000000000000000000000000000" + yy, ok := new(big.Int).SetString(y, 0) + if !ok { + t.Fatal("yy") + } + z := "0x00000000000404CB000000000000000000000000000000000000000000000000" + zz, ok := new(big.Int).SetString(z, 0) + if !ok { + t.Fatal("zz") + } + + xx := new(big.Rat).SetFrac(yy, zz) + ff, _ := xx.Float64() + t.Logf("%v: %0.16f", xx, ff) + + // minimum target / target of difficulty + aaa := blockchain.CalcWork(reqDifficulty) + _bbb := "0x0000000000000000000000000000000000000000000000000000000900090009" + bbb, ok := new(big.Int).SetString(_bbb, 0) + if !ok { + t.Fatal("bbb") + } + zzz := new(big.Rat).SetFrac(bbb, aaa) + fff, _ := zzz.Float64() + t.Logf("%v: %0.16f", zzz, fff) + + t.Logf("calc work : 0x%x 0x%x", 0x170331db, blockchain.CalcWork(0x170331db)) + t.Logf("compact to big: 0x%x", blockchain.CompactToBig(0x170331db)) +} + +// borrowed from btcd +// +// Copyright (c) 2014-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. +var ( + CoinbaseFlags = "/P2SH/btcd/" + vbTopBits = 0x20000000 +) + +func standardCoinbaseScript(nextBlockHeight int32, extraNonce uint64) ([]byte, error) { + return txscript.NewScriptBuilder().AddInt64(int64(nextBlockHeight)). + AddInt64(int64(extraNonce)).AddData([]byte(CoinbaseFlags)). + Script() +} + +func createCoinbaseTx(params *chaincfg.Params, coinbaseScript []byte, nextBlockHeight int32, addr btcutil.Address) (*btcutil.Tx, error) { + // Create the script to pay to the provided payment address if one was + // specified. Otherwise, create a script that allows the coinbase to be + // redeemable by anyone. + var pkScript []byte + if addr != nil { + var err error + pkScript, err = txscript.PayToAddrScript(addr) + if err != nil { + return nil, err + } + } else { + var err error + scriptBuilder := txscript.NewScriptBuilder() + pkScript, err = scriptBuilder.AddOp(txscript.OP_TRUE).Script() + if err != nil { + return nil, err + } + } + + tx := wire.NewMsgTx(wire.TxVersion) + tx.AddTxIn(&wire.TxIn{ + // Coinbase transactions have no inputs, so previous outpoint is + // zero hash and max index. 
+ PreviousOutPoint: *wire.NewOutPoint(&chainhash.Hash{}, + wire.MaxPrevOutIndex), + SignatureScript: coinbaseScript, + Sequence: wire.MaxTxInSequenceNum, + }) + tx.AddTxOut(&wire.TxOut{ + Value: blockchain.CalcBlockSubsidy(nextBlockHeight, params), + PkScript: pkScript, + }) + return btcutil.NewTx(tx), nil +} + +// end borrowed from btcd diff --git a/service/tbc/ulimit_linux.go b/service/tbc/ulimit_linux.go index 5b041449..e94b74e9 100644 --- a/service/tbc/ulimit_linux.go +++ b/service/tbc/ulimit_linux.go @@ -30,7 +30,7 @@ var ( } resourceWant = map[int]unix.Rlimit{ unix.RLIMIT_AS: {Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY}, - unix.RLIMIT_MEMLOCK: {Cur: 775258112, Max: 775258112}, + unix.RLIMIT_MEMLOCK: {Cur: 775254016, Max: 775254016}, unix.RLIMIT_NOFILE: {Cur: 16384, Max: 16384}, unix.RLIMIT_NPROC: {Cur: 4196, Max: 4196}, unix.RLIMIT_RSS: {Cur: math.MaxUint64, Max: math.MaxUint64},