Skip to content

Commit

Permalink
replace hex keys to identites
Browse files Browse the repository at this point in the history
  • Loading branch information
0xluk committed Mar 6, 2024
1 parent 43019ad commit f18e6de
Show file tree
Hide file tree
Showing 13 changed files with 515 additions and 459 deletions.
14 changes: 7 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func run() error {
}
Qubic struct {
NodePort string `conf:"default:21841"`
FallbackTick uint64 `conf:"default:12769858"`
FallbackTick uint64 `conf:"default:12789267"`
StorageFolder string `conf:"default:store"`
ProcessTickTimeout time.Duration `conf:"default:5s"`
}
Expand Down Expand Up @@ -84,12 +84,6 @@ func run() error {

ps := store.NewPebbleStore(db, nil)

rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, ps, nil)
rpcServer.Start()

shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

fact := factory.NewQubicConnection(cfg.Pool.NodeFetcherTimeout, cfg.Pool.NodeFetcherUrl)
poolConfig := pool.Config{
InitialCap: cfg.Pool.InitialCap,
Expand All @@ -105,6 +99,12 @@ func run() error {
return errors.Wrap(err, "creating new connection pool")
}

rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, ps, chPool)
rpcServer.Start()

shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

p := processor.NewProcessor(chPool, ps, cfg.Qubic.FallbackTick, cfg.Qubic.ProcessTickTimeout)
archiveErrors := make(chan error, 1)

Expand Down
10 changes: 2 additions & 8 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package processor

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/validator"
Expand Down Expand Up @@ -42,16 +41,11 @@ func NewProcessor(pool pool.Pool, ps *store.PebbleStore, fallbackNextProcessingT
}

func (p *Processor) Start() error {
var titfError *TickInTheFutureError
for {
err := p.processOneByOne()
if err != nil {
if errors.As(err, &titfError) {
log.Printf("Processing failed: %s", err.Error())
time.Sleep(1 * time.Second)
continue
}
log.Printf("Processing failed: %s", err.Error())
time.Sleep(1 * time.Second)
}
}
}
Expand Down Expand Up @@ -86,7 +80,7 @@ func (p *Processor) processOneByOne() error {
if err != nil {
return errors.Wrap(err, "getting next processing tick")
}
fmt.Printf("Next tick to process: %d\n", nextTick)
log.Printf("Next tick to process: %d\n", nextTick)
tickInfo, err := client.GetTickInfo(ctx)
if err != nil {
return errors.Wrap(err, "getting tick info")
Expand Down
570 changes: 283 additions & 287 deletions protobuff/archive.pb.go

Large diffs are not rendered by default.

19 changes: 5 additions & 14 deletions protobuff/archive.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ message TickData {
uint64 timestamp = 4;
bytes var_struct = 5;
bytes time_lock = 6;
repeated string transaction_digests_hex = 7;
repeated string transaction_ids = 7;
repeated int64 contract_fees = 8;
string signature_hex = 9;
}
Expand All @@ -25,15 +25,15 @@ message GetTickDataResponse {
}

message Transaction {
string source_pubkey_hex = 1;
string dest_pubkey_hex = 2;
string source_id = 1;
string dest_id = 2;
int64 amount = 3;
uint32 tick_number = 4;
uint32 input_type = 5;
uint32 input_size = 6;
string input_hex = 7;
string signature_hex = 8;
string digest_hex = 9;
string tx_id = 9;
}

message GetTransactionRequest {
Expand All @@ -56,15 +56,6 @@ message GetTickTransactionsResponse {
repeated Transaction transactions = 1;
}

//type QuorumDiff struct {
// SaltedResourceTestingDigest uint64
// SaltedSpectrumDigest [32]byte
// SaltedUniverseDigest [32]byte
// SaltedComputerDigest [32]byte
// ExpectedNextTickTxDigest [32]byte
// Signature [SignatureSize]byte
//}

message QuorumDiff {
string salted_resource_testing_digest_hex = 1;
string salted_spectrum_digest_hex = 2;
Expand Down Expand Up @@ -114,7 +105,7 @@ message GetComputorsResponse {
}

message IdentityInfo {
string pubkey_hex = 1;
string id = 1;
uint32 tick_number = 2;
int64 balance = 3;
int64 incoming_amount = 4;
Expand Down
30 changes: 19 additions & 11 deletions rpc/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/qubic/go-archiver/store"
qubic "github.com/qubic/go-node-connector"
"github.com/qubic/go-node-connector/types"
"github.com/silenceper/pool"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -25,15 +26,15 @@ type Server struct {
listenAddrGRPC string
listenAddrHTTP string
store *store.PebbleStore
qc *qubic.Client
pool pool.Pool
}

func NewServer(listenAddrGRPC, listenAddrHTTP string, store *store.PebbleStore, qc *qubic.Client) *Server {
func NewServer(listenAddrGRPC, listenAddrHTTP string, store *store.PebbleStore, p pool.Pool) *Server {
return &Server{
listenAddrGRPC: listenAddrGRPC,
listenAddrHTTP: listenAddrHTTP,
store: store,
qc: qc,
pool: p,
}
}

Expand All @@ -60,12 +61,7 @@ func (s *Server) GetTickTransactions(ctx context.Context, req *protobuff.GetTick
return &protobuff.GetTickTransactionsResponse{Transactions: txs.Transactions}, nil
}
func (s *Server) GetTransaction(ctx context.Context, req *protobuff.GetTransactionRequest) (*protobuff.GetTransactionResponse, error) {
id := types.Identity(req.TxId)
digest, err := id.ToPubKey(true)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid tx id: %v", err)
}
tx, err := s.store.GetTransaction(ctx, digest[:])
tx, err := s.store.GetTransaction(ctx, req.TxId)
if err != nil {
if errors.Cause(err) == store.ErrNotFound {
return nil, status.Errorf(codes.NotFound, "transaction not found")
Expand Down Expand Up @@ -98,7 +94,14 @@ func (s *Server) GetComputors(ctx context.Context, req *protobuff.GetComputorsRe
return &protobuff.GetComputorsResponse{Computors: computors}, nil
}
func (s *Server) GetIdentityInfo(ctx context.Context, req *protobuff.GetIdentityInfoRequest) (*protobuff.GetIdentityInfoResponse, error) {
addr, err := s.qc.GetIdentity(ctx, req.Identity)
qcv, err := s.pool.Get()
if err != nil {
return nil, status.Errorf(codes.Internal, "getting qubic pooled client connection: %v", err)
}
defer s.pool.Put(qcv)

client := qcv.(*qubic.Client)
addr, err := client.GetIdentity(ctx, req.Identity)
if err != nil {
return nil, status.Errorf(codes.Internal, "getting identity info: %v", err)
}
Expand All @@ -111,8 +114,13 @@ func (s *Server) GetIdentityInfo(ctx context.Context, req *protobuff.GetIdentity
siblings = append(siblings, hex.EncodeToString(sibling[:]))
}

var addrID types.Identity
addrID, err = addrID.FromPubKey(addr.AddressData.PublicKey, false)
if err != nil {
return nil, status.Errorf(codes.Internal, "getting address id: %v", err)
}
return &protobuff.GetIdentityInfoResponse{IdentityInfo: &protobuff.IdentityInfo{
PubkeyHex: hex.EncodeToString(addr.AddressData.PublicKey[:]),
Id: addrID.String(),
TickNumber: addr.Tick,
Balance: addr.AddressData.IncomingAmount - addr.AddressData.OutgoingAmount,
IncomingAmount: addr.AddressData.IncomingAmount,
Expand Down
4 changes: 2 additions & 2 deletions store/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func computorsKey(epochNumber uint64) []byte {
return key
}

func tickTxKey(digest []byte) ([]byte, error) {
func tickTxKey(txID string) ([]byte, error) {
key := []byte{Transaction}
key = append(key, digest...)
key = append(key, []byte(txID)...)

return key, nil
}
Expand Down
29 changes: 8 additions & 21 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package store
import (
"context"
"encoding/binary"
"encoding/hex"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/protobuff"
Expand Down Expand Up @@ -135,14 +134,9 @@ func (s *PebbleStore) SetTickTransactions(ctx context.Context, txs *protobuff.Tr
defer batch.Close()

for _, tx := range txs.GetTransactions() {
digest, err := hex.DecodeString(tx.DigestHex)
key, err := tickTxKey(tx.TxId)
if err != nil {
return errors.Wrapf(err, "decoding hex digest: %s", tx.DigestHex)
}

key, err := tickTxKey(digest)
if err != nil {
return errors.Wrapf(err, "creating tx key for digest: %s", digest)
return errors.Wrapf(err, "creating tx key for id: %s", tx.TxId)
}

serialized, err := protojson.MarshalOptions{EmitDefaultValues: true}.Marshal(tx)
Expand Down Expand Up @@ -173,20 +167,15 @@ func (s *PebbleStore) GetTickTransactions(ctx context.Context, tickNumber uint64
return nil, errors.Wrap(err, "getting tick data")
}

txs := make([]*protobuff.Transaction, 0, len(td.TransactionDigestsHex))
for _, digestHex := range td.TransactionDigestsHex {
digest, err := hex.DecodeString(digestHex)
if err != nil {
return nil, errors.Wrapf(err, "decoding hex digest: %s", digestHex)
}

tx, err := s.GetTransaction(ctx, digest)
txs := make([]*protobuff.Transaction, 0, len(td.TransactionIds))
for _, txID := range td.TransactionIds {
tx, err := s.GetTransaction(ctx, txID)
if err != nil {
if errors.Is(err, ErrNotFound) {
return nil, ErrNotFound
}

return nil, errors.Wrapf(err, "getting tx for digest: %s", digestHex)
return nil, errors.Wrapf(err, "getting tx for id: %s", txID)
}

txs = append(txs, tx)
Expand All @@ -195,8 +184,8 @@ func (s *PebbleStore) GetTickTransactions(ctx context.Context, tickNumber uint64
return &protobuff.Transactions{Transactions: txs}, nil
}

func (s *PebbleStore) GetTransaction(ctx context.Context, digest []byte) (*protobuff.Transaction, error) {
key, err := tickTxKey(digest)
func (s *PebbleStore) GetTransaction(ctx context.Context, txID string) (*protobuff.Transaction, error) {
key, err := tickTxKey(txID)
if err != nil {
return nil, errors.Wrap(err, "getting tx key")
}
Expand Down Expand Up @@ -246,5 +235,3 @@ func (s *PebbleStore) GetLastProcessedTick(ctx context.Context) (uint64, error)

return binary.LittleEndian.Uint64(value), nil
}


Loading

0 comments on commit f18e6de

Please sign in to comment.