diff --git a/abci/client/client.go b/abci/client/client.go index c5afa771e2..82a4dd2bce 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -42,6 +42,8 @@ func NewClient(logger log.Logger, addr, transport string, mustConnect bool) (Cli return NewSocketClient(logger, addr, mustConnect), nil case "grpc": return NewGRPCClient(logger, addr, mustConnect), nil + case "routed": + return NewRoutedClientWithAddr(logger, addr, mustConnect) default: return nil, fmt.Errorf("unknown abci transport %s", transport) } diff --git a/abci/client/routed_client.go b/abci/client/routed_client.go new file mode 100644 index 0000000000..6743073db6 --- /dev/null +++ b/abci/client/routed_client.go @@ -0,0 +1,375 @@ +package abciclient + +import ( + "context" + "fmt" + "reflect" + "runtime" + "strings" + "time" + + "github.com/hashicorp/go-multierror" + + "github.com/dashpay/tenderdash/abci/types" + "github.com/dashpay/tenderdash/libs/log" + "github.com/dashpay/tenderdash/libs/service" +) + +type routedClient struct { + service.Service + logger log.Logger + routing Routing + defaultClient ClientInfo +} + +var _ Client = (*routedClient)(nil) + +type RequestType string +type Routing map[RequestType][]ClientInfo + +type ClientInfo struct { + Client + // ClientID is an unique, human-readable, client identifier + ClientID string +} + +// NewRoutedClientWithAddr returns a new ABCI client that routes requests to multiple +// underlying clients based on the request type. +// +// It takes a comma-separated list of routing rules, consisting of colon-separated: request type, transport, address. +// Special request type "*" is used for default client. +// +// Example: +// +// ``` +// +// "Info:socket:unix:///tmp/socket.1,Info:socket:unix:///tmp/socket.2,CheckTx:socket:unix:///tmp/socket.1,*:socket:unix:///tmp/socket.3" +// +// ``` +// +// # Arguments +// - `logger` - The logger to use for the client. +// - `addr` - comma-separated list of routing rules, consisting of request type, transport name and client address separated with colon. +// Special request type "*" is used for default client. +func NewRoutedClientWithAddr(logger log.Logger, addr string, mustConnect bool) (Client, error) { + // Split the routing rules + routing := make(Routing) + clients := make(map[string]Client) + var defaultClient Client + + rules := strings.Split(addr, ",") + + for _, rule := range rules { + parts := strings.SplitN(rule, ":", 3) + if len(parts) != 3 { + return nil, fmt.Errorf("invalid routing rule: %s", rule) + } + requestType := strings.TrimSpace(parts[0]) + transport := strings.TrimSpace(parts[1]) + address := strings.TrimSpace(parts[2]) + + // Create a new client if it doesn't exist + clientName := fmt.Sprintf("%s:%s", transport, address) + if _, ok := clients[clientName]; !ok { + c, err := NewClient(logger, address, transport, mustConnect) + if err != nil { + return nil, err + } + clients[clientName] = c + } + + // Add the client to the routing table + if requestType == "*" { + if defaultClient != nil { + return nil, fmt.Errorf("multiple default clients") + } + defaultClient = clients[clientName] + continue + } + + client := clients[clientName] + routing[RequestType(requestType)] = append(routing[RequestType(requestType)], ClientInfo{client, clientName}) + } + + if defaultClient == nil { + return nil, fmt.Errorf("no default client defined for routed client address %s", addr) + } + + return NewRoutedClient(logger, defaultClient, routing) +} + +// NewRoutedClient returns a new ABCI client that routes requests to the +// appropriate underlying client based on the request type. +// +// # Arguments +// +// - `logger` - The logger to use for the client. +// - `defaultClient` - The default client to use when no specific client is +// configured for a request type. +// - `routing` - The clients to route requests to. +// +// See docs of routedClient.delegate() for more details about implemented logic. +func NewRoutedClient(logger log.Logger, defaultClient Client, routing Routing) (Client, error) { + defaultClientID := "" + if s, ok := defaultClient.(fmt.Stringer); ok { + defaultClientID = fmt.Sprintf("DEFAULT:%s", s.String()) + } else { + defaultClientID = "DEFAULT" + } + + cli := &routedClient{ + logger: logger, + routing: routing, + defaultClient: ClientInfo{defaultClient, defaultClientID}, + } + + cli.Service = service.NewBaseService(logger, "RoutedClient", cli) + return cli, nil +} + +func (cli *routedClient) OnStart(ctx context.Context) error { + var errs error + for _, clients := range cli.routing { + for _, client := range clients { + if !client.IsRunning() { + if err := client.Start(ctx); err != nil { + errs = multierror.Append(errs, err) + } + } + } + } + + if !cli.defaultClient.IsRunning() { + if err := cli.defaultClient.Start(ctx); err != nil { + errs = multierror.Append(errs, err) + } + } + + return errs +} + +func (cli *routedClient) OnStop() { + for _, clients := range cli.routing { + for _, client := range clients { + if client.IsRunning() { + switch c := client.Client.(type) { + case *socketClient: + c.Stop() + case *localClient: + c.Stop() + case *grpcClient: + c.Stop() + } + } + } + } +} + +// delegate calls the given function on the appropriate client with the given +// arguments. +// +// It executes the given function on all clients configured in the routing table. +// If no client is configured for the given function, it calls the function on the +// default client. +// +// If more than one client is configured for the given function, it calls the +// function on each client in turn, and returns first result where any of returned +// values is non-zero. Results of subsequent calls are silently dropped. +// +// If all clients return only zero values, it returns response from last client, which is effectively +// also a zero value. +// +// If the function returns only 1 value, it assumes it is of type `error`. +// Otherwise it assumes response is `result, error`. +// +// When a function call returns an error, error is returned and remaining functions are NOT called. +func (cli *routedClient) delegate(args ...interface{}) (firstResult any, err error) { + // Get the caller function name; it will be our request type + pc, _, _, _ := runtime.Caller(1) + funcObj := runtime.FuncForPC(pc) + funcName := funcObj.Name() + // remove package name + funcName = funcName[strings.LastIndex(funcName, ".")+1:] + + clients, ok := cli.routing[RequestType(funcName)] + if !ok { + clients = []ClientInfo{cli.defaultClient} + cli.logger.Trace("no client found for method, falling back to default client", "method", funcName) + } + // client that provided first non-zero result + winner := "" + + startAll := time.Now() + + var ret any + for _, client := range clients { + start := time.Now() + + zerosReturned, results := cli.call(client, funcName, args...) + if ret, err = parseReturned(funcName, results); err != nil { + cli.logger.Error("abci client returned error", "client_id", client.ClientID, "err", err) + return ret, err + } + + // return first non-zero result + if !zerosReturned && firstResult == nil { + firstResult = ret + winner = client.ClientID + } + + cli.logger.Trace("routed ABCI request to a client", + "method", funcName, + "client_id", client.ClientID, + "nil", zerosReturned, + "took", time.Since(start).String()) + } + + cli.logger.Trace("routed ABCI request execution successful", + "method", funcName, + "client_id", winner, + "took", time.Since(startAll).String(), + "nil", firstResult == nil) + + if firstResult == nil { + firstResult = ret + } + + return firstResult, err +} + +// call calls the given function on the given client with the given arguments. +// It returns whether all returned values are zero, and these values itself. +func (cli *routedClient) call(client Client, funcName string, args ...interface{}) (onlyZeros bool, result []interface{}) { + method := reflect.ValueOf(client).MethodByName(funcName) + if !method.IsValid() { + panic(fmt.Sprintf("no method %s on client %T", funcName, client)) + } + + arguments := make([]reflect.Value, 0, len(args)) + for _, arg := range args { + arguments = append(arguments, reflect.ValueOf(arg)) + } + + values := method.Call(arguments) + + onlyZeros = true + + result = make([]interface{}, 0, len(values)) + for _, v := range values { + if !v.IsZero() { + onlyZeros = false + } + result = append(result, v.Interface()) + } + + return onlyZeros, result +} + +func parseReturned(funcName string, ret []interface{}) (any, error) { + switch len(ret) { + case 0: + // should never happen + return nil, fmt.Errorf("no result from any client for ABCI method %s", funcName) + case 1: + err, _ := ret[0].(error) + return nil, err + + case 2: + err, _ := ret[1].(error) + return ret[0], err + default: + panic(fmt.Sprintf("unexpected number of return values: %d", len(ret))) + } +} + +// Error returns an error if the client was stopped abruptly. +func (cli *routedClient) Error() error { + var errs error + for _, clients := range cli.routing { + for _, client := range clients { + err := client.Error() + if err != nil { + errs = multierror.Append(errs, err) + } + } + } + + return errs +} + +/// Implement the Application interface + +func (cli *routedClient) Flush(ctx context.Context) error { + _, err := cli.delegate(ctx) + return err +} + +func (cli *routedClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { + result, err := cli.delegate(ctx, msg) + return result.(*types.ResponseEcho), err +} + +func (cli *routedClient) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseInfo), err +} + +func (cli *routedClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseCheckTx), err +} + +func (cli *routedClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseQuery), err +} + +func (cli *routedClient) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseInitChain), err +} + +func (cli *routedClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseListSnapshots), err +} + +func (cli *routedClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseOfferSnapshot), err +} + +func (cli *routedClient) LoadSnapshotChunk(ctx context.Context, req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseLoadSnapshotChunk), err +} + +func (cli *routedClient) ApplySnapshotChunk(ctx context.Context, req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseApplySnapshotChunk), err +} + +func (cli *routedClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponsePrepareProposal), err +} + +func (cli *routedClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseProcessProposal), err +} + +func (cli *routedClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseExtendVote), err +} + +func (cli *routedClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseVerifyVoteExtension), err +} + +func (cli *routedClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + result, err := cli.delegate(ctx, req) + return result.(*types.ResponseFinalizeBlock), err +} diff --git a/abci/client/routed_client_test.go b/abci/client/routed_client_test.go new file mode 100644 index 0000000000..b796bf6ec6 --- /dev/null +++ b/abci/client/routed_client_test.go @@ -0,0 +1,152 @@ +package abciclient_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + abciclient "github.com/dashpay/tenderdash/abci/client" + "github.com/dashpay/tenderdash/abci/server" + "github.com/dashpay/tenderdash/abci/types" + "github.com/dashpay/tenderdash/abci/types/mocks" + "github.com/dashpay/tenderdash/libs/log" +) + +// TestRouting tests the RoutedClient. +// +// Given 3 clients: defaultApp, consensusApp and queryApp: +// * when a request of type Info is made, it should be routed to defaultApp +// * when a request of type FinalizeBlock is made, it should be first routed to queryApp, then to consensusApp +// * when a request of type CheckTx is made, it should be routed to queryApp +// * when a request of type PrepareProposal is made, it should be routed to to consensusApp +func TestRouting(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // infoMtx blocks Info until we finish the test + var infoMtx sync.Mutex + infoMtx.Lock() + infoExecuted := false + + logger := log.NewTestingLogger(t) + + defaultApp, defaultSocket := startApp(ctx, t, logger, "default") + defer defaultApp.AssertExpectations(t) + + defaultApp.On("Info", mock.Anything, mock.Anything).Return(&types.ResponseInfo{ + Data: "info", + }, nil).Run(func(_args mock.Arguments) { + t.Log("Info: before lock") + infoMtx.Lock() + defer infoMtx.Unlock() + t.Log("Info: after lock") + infoExecuted = true + }).Once() + + queryApp, querySocket := startApp(ctx, t, logger, "query") + defer queryApp.AssertExpectations(t) + queryApp.On("CheckTx", mock.Anything, mock.Anything).Return(&types.ResponseCheckTx{ + Priority: 1, + }, nil).Once() + queryApp.On("FinalizeBlock", mock.Anything, mock.Anything).Return(&types.ResponseFinalizeBlock{}, nil).Once() + + consensusApp, consensusSocket := startApp(ctx, t, logger, "consensus") + defer consensusApp.AssertExpectations(t) + consensusApp.On("PrepareProposal", mock.Anything, mock.Anything).Return(&types.ResponsePrepareProposal{ + AppHash: []byte("apphash"), + }, nil).Once() + consensusApp.On("FinalizeBlock", mock.Anything, mock.Anything).Return(&types.ResponseFinalizeBlock{ + RetainHeight: 1, + }, nil).Once() + + addr := fmt.Sprintf("CheckTx:socket:%s", querySocket) + + fmt.Sprintf(",FinalizeBlock:socket:%s,FinalizeBlock:socket:%s", querySocket, consensusSocket) + + fmt.Sprintf(",PrepareProposal:socket:%s", consensusSocket) + + fmt.Sprintf(",*:socket:%s", defaultSocket) + + logger.Info("configuring routed abci client with address", "addr", addr) + routedClient, err := abciclient.NewRoutedClientWithAddr(logger, addr, true) + assert.NoError(t, err) + err = routedClient.Start(ctx) + assert.NoError(t, err) + + // Test routing + wg := sync.WaitGroup{} + + // Info is called from separate thread, as we want it to block + // to see if we can execute other calls (on other clients) without blocking + wg.Add(1) + go func() { + // info is locked, so it should finish last + _, err := routedClient.Info(ctx, &types.RequestInfo{}) + require.NoError(t, err) + wg.Done() + }() + + // CheckTx + _, err = routedClient.CheckTx(ctx, &types.RequestCheckTx{}) + assert.NoError(t, err) + + // FinalizeBlock + _, err = routedClient.FinalizeBlock(ctx, &types.RequestFinalizeBlock{}) + assert.NoError(t, err) + + // PrepareProposal + _, err = routedClient.PrepareProposal(ctx, &types.RequestPrepareProposal{}) + assert.NoError(t, err) + + // unlock info + assert.False(t, infoExecuted) + infoMtx.Unlock() + wg.Wait() + assert.True(t, infoExecuted) +} + +func startApp(ctx context.Context, t *testing.T, logger log.Logger, id string) (*mocks.Application, string) { + app := mocks.NewApplication(t) + defer app.AssertExpectations(t) + + addr := fmt.Sprintf("unix://%s/%s", t.TempDir(), "/socket."+id) + + server, err := server.NewServer(logger, addr, "socket", app) + require.NoError(t, err) + err = server.Start(ctx) + require.NoError(t, err) + + return app, addr +} + +// / TestRoutedClientGrpc tests the RoutedClient correctly forwards requests to a gRPC server. +func TestRoutedClientGrpc(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + logger := log.NewTestingLogger(t) + + // app := types.NewBaseApplication() + app := mocks.NewApplication(t) + defer app.AssertExpectations(t) + app.On("Echo", mock.Anything, mock.Anything).Return( + func(_ctx context.Context, msg *types.RequestEcho) (*types.ResponseEcho, error) { + return &types.ResponseEcho{Message: msg.Message}, nil + }).Maybe() + app.On("Info", mock.Anything, mock.Anything).Return(&types.ResponseInfo{}, nil).Once() + + grpcServer := server.NewGRPCServer(logger, "tcp://127.0.0.1:1234", app) + require.NoError(t, grpcServer.Start(ctx)) + + addr := "*:grpc:127.0.0.1:1234" + logger.Info("configuring routed abci client with address", "addr", addr) + client, err := abciclient.NewRoutedClientWithAddr(logger, addr, true) + require.NoError(t, err) + require.NoError(t, client.Start(ctx)) + + _, err = client.Info(ctx, &types.RequestInfo{}) + assert.NoError(t, err) + +} diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 26adc2318f..9dacbc71d5 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -99,6 +99,14 @@ func (cli *socketClient) OnStop() { cli.drainQueue() } +func (cli *socketClient) String() string { + if err := cli.Error(); err != nil { + return fmt.Sprintf("%T(%s):err=%s", cli, cli.addr, err.Error()) + } + + return fmt.Sprintf("%T(%s)", cli, cli.addr) +} + // Error returns an error if the client was stopped abruptly. func (cli *socketClient) Error() error { cli.mtx.Lock() diff --git a/cmd/tenderdash/commands/run_node.go b/cmd/tenderdash/commands/run_node.go index 8abe55755c..7ad2cb782d 100644 --- a/cmd/tenderdash/commands/run_node.go +++ b/cmd/tenderdash/commands/run_node.go @@ -51,7 +51,7 @@ func AddNodeFlags(cmd *cobra.Command, conf *cfg.Config) { conf.ProxyApp, "proxy app address, or one of: 'kvstore',"+ " 'persistent_kvstore', 'e2e' or 'noop' for local testing.") - cmd.Flags().String("abci", conf.ABCI, "specify abci transport (socket | grpc)") + cmd.Flags().String("abci", conf.ABCI, "specify abci transport (socket | grpc | routed)") // rpc flags cmd.Flags().String("rpc.laddr", conf.RPC.ListenAddress, "RPC listen address. Port required") diff --git a/config/config.go b/config/config.go index 985236acef..6600d8e357 100644 --- a/config/config.go +++ b/config/config.go @@ -165,7 +165,7 @@ type BaseConfig struct { //nolint: maligned // This should be set in viper so it can unmarshal into this struct RootDir string `mapstructure:"home"` - // TCP or UNIX socket address of the ABCI application, + // TCP or UNIX socket address of the ABCI application,or routing rules for routed ABCI client, // or the name of an ABCI application compiled in with the Tendermint binary ProxyApp string `mapstructure:"proxy-app"` @@ -223,7 +223,7 @@ type BaseConfig struct { //nolint: maligned // A JSON file containing the private key to use for p2p authenticated encryption NodeKey string `mapstructure:"node-key-file"` - // Mechanism to connect to the ABCI application: socket | grpc + // Mechanism to connect to the ABCI application: socket | grpc | routed ABCI string `mapstructure:"abci"` // If true, query the ABCI app on connecting to a new peer diff --git a/config/toml.go b/config/toml.go index d3e7aa2eca..7eb8bcc845 100644 --- a/config/toml.go +++ b/config/toml.go @@ -87,7 +87,12 @@ const defaultConfigTemplate = `# This is a TOML config file. ####################################################################### # TCP or UNIX socket address of the ABCI application, +# or routing rules for routed multi-app setup, # or the name of an ABCI application compiled in with the Tendermint binary +# Example for routed multi-app setup: +# abci = "routed" +# proxy-app = "Info:socket:unix:///tmp/socket.1,Info:socket:unix:///tmp/socket.2,CheckTx:socket:unix:///tmp/socket.1,*:socket:unix:///tmp/socket.3" + proxy-app = "{{ .BaseConfig.ProxyApp }}" # A custom human readable name for this node @@ -146,7 +151,7 @@ genesis-file = "{{ js .BaseConfig.Genesis }}" # Path to the JSON file containing the private key to use for node authentication in the p2p protocol node-key-file = "{{ js .BaseConfig.NodeKey }}" -# Mechanism to connect to the ABCI application: socket | grpc +# Mechanism to connect to the ABCI application: socket | grpc | routed abci = "{{ .BaseConfig.ABCI }}" # If true, query the ABCI app on connecting to a new peer