From 575aafc46e70709ef4c2c6a733fe36ecf6cbb267 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 15 Feb 2024 16:04:44 +0800 Subject: [PATCH] refactor: use tube term instead of queue (#134) Signed-off-by: Zike Yang --- benchmark/bench_test.go | 2 +- common/constants.go | 4 ++-- fs/config.go | 9 +++++---- fs/instance.go | 28 ++++++++++++++-------------- fs/manager.go | 18 +++++++++--------- go.mod | 2 +- perf/perf.go | 16 ++++++++-------- restclient/README.md | 29 +++++++++++++++-------------- restclient/docs/Function.md | 20 ++++++++------------ server/config_loader.go | 10 +++++----- server/server_test.go | 2 +- 11 files changed, 69 insertions(+), 71 deletions(-) diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index ab9e1cab..a757d5cd 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -105,7 +105,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { svrConf := &fs.Config{ ListenAddr: common.DefaultAddr, - QueueBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) { + TubeBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) { return memoryQueueFactory, nil }, } diff --git a/common/constants.go b/common/constants.go index 21bcc9b0..7f376e1a 100644 --- a/common/constants.go +++ b/common/constants.go @@ -17,9 +17,9 @@ package common const ( - PulsarQueueType = "pulsar" + PulsarTubeType = "pulsar" DefaultAddr = "localhost:7300" DefaultPulsarURL = "pulsar://localhost:6650" - DefaultQueueType = PulsarQueueType + DefaultTubeType = PulsarTubeType ) diff --git a/fs/config.go b/fs/config.go index 18b0a114..d9420e9f 100644 --- a/fs/config.go +++ b/fs/config.go @@ -21,10 +21,11 @@ import ( "github.com/functionstream/functionstream/fs/contube" ) -type QueueBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error) +type TubeBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error) +// Config is a struct that holds the configuration for a function stream. type Config struct { - ListenAddr string - PulsarURL string - QueueBuilder QueueBuilder + ListenAddr string // ListenAddr is the address that the function stream REST service will listen on. + PulsarURL string // PulsarURL is the URL of the Pulsar service. It's used for the pulsar_tube + TubeBuilder TubeBuilder // TubeBuilder is a function that will be used to build the tube. } diff --git a/fs/instance.go b/fs/instance.go index a5e1d1fb..23493192 100644 --- a/fs/instance.go +++ b/fs/instance.go @@ -33,12 +33,12 @@ import ( ) type FunctionInstance struct { - ctx context.Context - cancelFunc context.CancelFunc - definition *model.Function - queueFactory contube.TubeFactory - readyCh chan error - index int32 + ctx context.Context + cancelFunc context.CancelFunc + definition *model.Function + tubeFactory contube.TubeFactory + readyCh chan error + index int32 } func NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFactory, index int32) *FunctionInstance { @@ -48,12 +48,12 @@ func NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFa "function-index": index, }) return &FunctionInstance{ - ctx: ctx, - cancelFunc: cancelFunc, - definition: definition, - queueFactory: queueFactory, - readyCh: make(chan error), - index: index, + ctx: ctx, + cancelFunc: cancelFunc, + definition: definition, + tubeFactory: queueFactory, + readyCh: make(chan error), + index: index, } } @@ -113,12 +113,12 @@ func (instance *FunctionInstance) Run() { return } - sourceChan, err := instance.queueFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap()) + sourceChan, err := instance.tubeFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap()) if err != nil { instance.readyCh <- errors.Wrap(err, "Error creating source event queue") return } - sinkChan, err := instance.queueFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap()) + sinkChan, err := instance.tubeFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap()) if err != nil { instance.readyCh <- errors.Wrap(err, "Error creating sink event queue") return diff --git a/fs/manager.go b/fs/manager.go index 5efe79aa..527718c5 100644 --- a/fs/manager.go +++ b/fs/manager.go @@ -28,19 +28,19 @@ import ( ) type FunctionManager struct { - functions map[string][]*FunctionInstance - functionsLock sync.Mutex - eventQueueFactory contube.TubeFactory + functions map[string][]*FunctionInstance + functionsLock sync.Mutex + tubeFactory contube.TubeFactory } func NewFunctionManager(config *Config) (*FunctionManager, error) { - eventQueueFactory, err := config.QueueBuilder(context.Background(), config) + tubeFactory, err := config.TubeBuilder(context.Background(), config) if err != nil { return nil, err } return &FunctionManager{ - functions: make(map[string][]*FunctionInstance), - eventQueueFactory: eventQueueFactory, + functions: make(map[string][]*FunctionInstance), + tubeFactory: tubeFactory, }, nil } @@ -52,7 +52,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error { } fm.functions[f.Name] = make([]*FunctionInstance, f.Replicas) for i := int32(0); i < f.Replicas; i++ { - instance := NewFunctionInstance(f, fm.eventQueueFactory, i) + instance := NewFunctionInstance(f, fm.tubeFactory, i) fm.functions[f.Name][i] = instance go instance.Run() if err := <-instance.WaitForReady(); err != nil { @@ -95,7 +95,7 @@ func (fm *FunctionManager) ListFunctions() (result []string) { func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := fm.eventQueueFactory.NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap()) + c, err := fm.tubeFactory.NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap()) if err != nil { return err } @@ -106,7 +106,7 @@ func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error func (fm *FunctionManager) ConsumeEvent(name string) (contube.Record, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := fm.eventQueueFactory.NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) + c, err := fm.tubeFactory.NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index 4b2d54cc..9cdd1251 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e github.com/gorilla/mux v1.8.1 github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 github.com/tetratelabs/wazero v1.6.0 golang.org/x/time v0.5.0 @@ -48,7 +49,6 @@ require ( github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect - github.com/sirupsen/logrus v1.9.3 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/atomic v1.11.0 // indirect diff --git a/perf/perf.go b/perf/perf.go index 9f40205a..2ea94aa8 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -38,7 +38,7 @@ type Config struct { PulsarURL string RequestRate float64 Func *restclient.Function - QueueBuilder fs.QueueBuilder + QueueBuilder fs.TubeBuilder } type Perf interface { @@ -46,10 +46,10 @@ type Perf interface { } type perf struct { - config *Config - input chan<- contube.Record - output <-chan contube.Record - queueBuilder fs.QueueBuilder + config *Config + input chan<- contube.Record + output <-chan contube.Record + tubeBuilder fs.TubeBuilder } func New(config *Config) Perf { @@ -57,13 +57,13 @@ func New(config *Config) Perf { config: config, } if config.QueueBuilder == nil { - p.queueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { + p.tubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{ PulsarURL: config.PulsarURL, }).ToConfigMap()) } } else { - p.queueBuilder = config.QueueBuilder + p.tubeBuilder = config.QueueBuilder } return p } @@ -96,7 +96,7 @@ func (p *perf) Run(ctx context.Context) { PulsarURL: p.config.PulsarURL, } - queueFactory, err := p.queueBuilder(ctx, config) + queueFactory, err := p.tubeBuilder(ctx, config) if err != nil { slog.Error( "Failed to create Record Queue Factory", diff --git a/restclient/README.md b/restclient/README.md index 6a0e09cc..636fb63e 100644 --- a/restclient/README.md +++ b/restclient/README.md @@ -3,7 +3,9 @@ No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) ## Overview -This API client was generated by the [OpenAPI Generator](https://openapi-generator.tech) project. By using the [OpenAPI-spec](https://www.openapis.org/) from a remote server, you can easily generate an API client. + +This API client was generated by the [OpenAPI Generator](https://openapi-generator.tech) project. By using +the [OpenAPI-spec](https://www.openapis.org/) from a remote server, you can easily generate an API client. - API version: 0.1.0 - Package version: 1.0.0 @@ -44,7 +46,8 @@ ctx := context.WithValue(context.Background(), restclient.ContextServerIndex, 1) ### Templated Server URL -Templated server URL is formatted using default variables from configuration or from context value `restclient.ContextServerVariables` of type `map[string]string`. +Templated server URL is formatted using default variables from configuration or from context +value `restclient.ContextServerVariables` of type `map[string]string`. ```go ctx := context.WithValue(context.Background(), restclient.ContextServerVariables, map[string]string{ @@ -58,7 +61,8 @@ Note, enum values are always validated and all unused variables are silently ign Each operation can use different server URL defined using `OperationServers` map in the `Configuration`. An operation is uniquely identified by `"{classname}Service.{nickname}"` string. -Similar rules for overriding default operation server index and variables applies by using `restclient.ContextOperationServerIndices` and `restclient.ContextOperationServerVariables` context maps. +Similar rules for overriding default operation server index and variables applies by +using `restclient.ContextOperationServerIndices` and `restclient.ContextOperationServerVariables` context maps. ```go ctx := context.WithValue(context.Background(), restclient.ContextOperationServerIndices, map[string]int{ @@ -75,25 +79,22 @@ ctx = context.WithValue(context.Background(), restclient.ContextOperationServerV All URIs are relative to *http://localhost:7300* -Class | Method | HTTP request | Description ------------- | ------------- | ------------- | ------------- -*DefaultAPI* | [**ApiV1ConsumeQueueNameGet**](docs/DefaultAPI.md#apiv1consumequeuenameget) | **Get** /api/v1/consume/{queue_name} | Consumes an event from a queue -*DefaultAPI* | [**ApiV1FunctionFunctionNameDelete**](docs/DefaultAPI.md#apiv1functionfunctionnamedelete) | **Delete** /api/v1/function/{function_name} | Deletes a function -*DefaultAPI* | [**ApiV1FunctionFunctionNamePost**](docs/DefaultAPI.md#apiv1functionfunctionnamepost) | **Post** /api/v1/function/{function_name} | Starts a function -*DefaultAPI* | [**ApiV1FunctionsGet**](docs/DefaultAPI.md#apiv1functionsget) | **Get** /api/v1/functions | Returns a list of functions -*DefaultAPI* | [**ApiV1ProduceQueueNamePut**](docs/DefaultAPI.md#apiv1producequeuenameput) | **Put** /api/v1/produce/{queue_name} | Produces an event to a queue - + Class | Method | HTTP request | Description +--------------|-------------------------------------------------------------------------------------------|---------------------------------------------|-------------------------------- + *DefaultAPI* | [**ApiV1ConsumeQueueNameGet**](docs/DefaultAPI.md#apiv1consumequeuenameget) | **Get** /api/v1/consume/{queue_name} | Consumes an event from a queue + *DefaultAPI* | [**ApiV1FunctionFunctionNameDelete**](docs/DefaultAPI.md#apiv1functionfunctionnamedelete) | **Delete** /api/v1/function/{function_name} | Deletes a function + *DefaultAPI* | [**ApiV1FunctionFunctionNamePost**](docs/DefaultAPI.md#apiv1functionfunctionnamepost) | **Post** /api/v1/function/{function_name} | Starts a function + *DefaultAPI* | [**ApiV1FunctionsGet**](docs/DefaultAPI.md#apiv1functionsget) | **Get** /api/v1/functions | Returns a list of functions + *DefaultAPI* | [**ApiV1ProduceQueueNamePut**](docs/DefaultAPI.md#apiv1producequeuenameput) | **Put** /api/v1/produce/{queue_name} | Produces an event to a queue ## Documentation For Models - - [Function](docs/Function.md) - +- [Function](docs/Function.md) ## Documentation For Authorization Endpoints do not require authorization. - ## Documentation for Utility Methods Due to the fact that model structure members are all pointers, this package contains diff --git a/restclient/docs/Function.md b/restclient/docs/Function.md index 95edbc91..8632d438 100644 --- a/restclient/docs/Function.md +++ b/restclient/docs/Function.md @@ -2,14 +2,14 @@ ## Properties -Name | Type | Description | Notes ------------- | ------------- | ------------- | ------------- -**Name** | Pointer to **string** | | [optional] -**Archive** | **string** | | -**Inputs** | **[]string** | | -**Output** | **string** | | -**Replicas** | Pointer to **int32** | | [optional] -**Config** | Pointer to **map[string]string** | | [optional] + Name | Type | Description | Notes +--------------|----------------------------------|-------------|------------ + **Name** | Pointer to **string** | | [optional] + **Archive** | **string** | | + **Inputs** | **[]string** | | + **Output** | **string** | | + **Replicas** | Pointer to **int32** | | [optional] + **Config** | Pointer to **map[string]string** | | [optional] ## Methods @@ -74,7 +74,6 @@ and a boolean to check if the value has been set. SetArchive sets Archive field to given value. - ### GetInputs `func (o *Function) GetInputs() []string` @@ -94,7 +93,6 @@ and a boolean to check if the value has been set. SetInputs sets Inputs field to given value. - ### GetOutput `func (o *Function) GetOutput() string` @@ -114,7 +112,6 @@ and a boolean to check if the value has been set. SetOutput sets Output field to given value. - ### GetReplicas `func (o *Function) GetReplicas() int32` @@ -165,7 +162,6 @@ SetConfig sets Config field to given value. HasConfig returns a boolean if a field has been set. - [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/server/config_loader.go b/server/config_loader.go index b1ff4d75..6b384b56 100644 --- a/server/config_loader.go +++ b/server/config_loader.go @@ -36,10 +36,10 @@ func LoadConfigFromEnv() *fs.Config { ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), PulsarURL: getEnvWithDefault("PULSAR_URL", common.DefaultPulsarURL), } - queueType := getEnvWithDefault("QUEUE_TYPE", common.DefaultQueueType) - switch queueType { - case common.PulsarQueueType: - loadedConfig.QueueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { + tubeType := getEnvWithDefault("TUBE_TYPE", common.DefaultTubeType) + switch tubeType { + case common.PulsarTubeType: + loadedConfig.TubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{ PulsarURL: c.PulsarURL, }).ToConfigMap()) @@ -54,7 +54,7 @@ func LoadStandaloneConfigFromEnv() *fs.Config { loadedConfig = &fs.Config{ ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), } - loadedConfig.QueueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { + loadedConfig.TubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { return contube.NewMemoryQueueFactory(ctx), nil } }) diff --git a/server/server_test.go b/server/server_test.go index 7528aa08..c431f27f 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -32,7 +32,7 @@ func TestStandaloneBasicFunction(t *testing.T) { conf := &fs.Config{ ListenAddr: "localhost:7301", - QueueBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) { + TubeBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) { return contube.NewMemoryQueueFactory(ctx), nil }, }