Skip to content

Commit

Permalink
refactor: use tube term instead of queue (#134)
Browse files Browse the repository at this point in the history
Signed-off-by: Zike Yang <zike@apache.org>
  • Loading branch information
RobertIndie committed Feb 15, 2024
1 parent 14ca732 commit 575aafc
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 71 deletions.
2 changes: 1 addition & 1 deletion benchmark/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {

svrConf := &fs.Config{
ListenAddr: common.DefaultAddr,
QueueBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) {
TubeBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) {
return memoryQueueFactory, nil
},
}
Expand Down
4 changes: 2 additions & 2 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package common

const (
PulsarQueueType = "pulsar"
PulsarTubeType = "pulsar"

DefaultAddr = "localhost:7300"
DefaultPulsarURL = "pulsar://localhost:6650"
DefaultQueueType = PulsarQueueType
DefaultTubeType = PulsarTubeType
)
9 changes: 5 additions & 4 deletions fs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"github.com/functionstream/functionstream/fs/contube"
)

type QueueBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error)
type TubeBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error)

// Config is a struct that holds the configuration for a function stream.
type Config struct {
ListenAddr string
PulsarURL string
QueueBuilder QueueBuilder
ListenAddr string // ListenAddr is the address that the function stream REST service will listen on.
PulsarURL string // PulsarURL is the URL of the Pulsar service. It's used for the pulsar_tube
TubeBuilder TubeBuilder // TubeBuilder is a function that will be used to build the tube.
}
28 changes: 14 additions & 14 deletions fs/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ import (
)

type FunctionInstance struct {
ctx context.Context
cancelFunc context.CancelFunc
definition *model.Function
queueFactory contube.TubeFactory
readyCh chan error
index int32
ctx context.Context
cancelFunc context.CancelFunc
definition *model.Function
tubeFactory contube.TubeFactory
readyCh chan error
index int32
}

func NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFactory, index int32) *FunctionInstance {
Expand All @@ -48,12 +48,12 @@ func NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFa
"function-index": index,
})
return &FunctionInstance{
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
queueFactory: queueFactory,
readyCh: make(chan error),
index: index,
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
tubeFactory: queueFactory,
readyCh: make(chan error),
index: index,
}
}

Expand Down Expand Up @@ -113,12 +113,12 @@ func (instance *FunctionInstance) Run() {
return
}

sourceChan, err := instance.queueFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap())
sourceChan, err := instance.tubeFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap())
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating source event queue")
return
}
sinkChan, err := instance.queueFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap())
sinkChan, err := instance.tubeFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap())
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating sink event queue")
return
Expand Down
18 changes: 9 additions & 9 deletions fs/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ import (
)

type FunctionManager struct {
functions map[string][]*FunctionInstance
functionsLock sync.Mutex
eventQueueFactory contube.TubeFactory
functions map[string][]*FunctionInstance
functionsLock sync.Mutex
tubeFactory contube.TubeFactory
}

func NewFunctionManager(config *Config) (*FunctionManager, error) {
eventQueueFactory, err := config.QueueBuilder(context.Background(), config)
tubeFactory, err := config.TubeBuilder(context.Background(), config)
if err != nil {
return nil, err
}
return &FunctionManager{
functions: make(map[string][]*FunctionInstance),
eventQueueFactory: eventQueueFactory,
functions: make(map[string][]*FunctionInstance),
tubeFactory: tubeFactory,
}, nil
}

Expand All @@ -52,7 +52,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error {
}
fm.functions[f.Name] = make([]*FunctionInstance, f.Replicas)
for i := int32(0); i < f.Replicas; i++ {
instance := NewFunctionInstance(f, fm.eventQueueFactory, i)
instance := NewFunctionInstance(f, fm.tubeFactory, i)
fm.functions[f.Name][i] = instance
go instance.Run()
if err := <-instance.WaitForReady(); err != nil {
Expand Down Expand Up @@ -95,7 +95,7 @@ func (fm *FunctionManager) ListFunctions() (result []string) {
func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := fm.eventQueueFactory.NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap())
c, err := fm.tubeFactory.NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap())
if err != nil {
return err
}
Expand All @@ -106,7 +106,7 @@ func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error
func (fm *FunctionManager) ConsumeEvent(name string) (contube.Record, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := fm.eventQueueFactory.NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap())
c, err := fm.tubeFactory.NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e
github.com/gorilla/mux v1.8.1
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
github.com/tetratelabs/wazero v1.6.0
golang.org/x/time v0.5.0
Expand Down Expand Up @@ -48,7 +49,6 @@ require (
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/atomic v1.11.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions perf/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,32 @@ type Config struct {
PulsarURL string
RequestRate float64
Func *restclient.Function
QueueBuilder fs.QueueBuilder
QueueBuilder fs.TubeBuilder
}

type Perf interface {
Run(context.Context)
}

type perf struct {
config *Config
input chan<- contube.Record
output <-chan contube.Record
queueBuilder fs.QueueBuilder
config *Config
input chan<- contube.Record
output <-chan contube.Record
tubeBuilder fs.TubeBuilder
}

func New(config *Config) Perf {
p := &perf{
config: config,
}
if config.QueueBuilder == nil {
p.queueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
p.tubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{
PulsarURL: config.PulsarURL,
}).ToConfigMap())
}
} else {
p.queueBuilder = config.QueueBuilder
p.tubeBuilder = config.QueueBuilder
}
return p
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func (p *perf) Run(ctx context.Context) {
PulsarURL: p.config.PulsarURL,
}

queueFactory, err := p.queueBuilder(ctx, config)
queueFactory, err := p.tubeBuilder(ctx, config)
if err != nil {
slog.Error(
"Failed to create Record Queue Factory",
Expand Down
29 changes: 15 additions & 14 deletions restclient/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)

## Overview
This API client was generated by the [OpenAPI Generator](https://openapi-generator.tech) project. By using the [OpenAPI-spec](https://www.openapis.org/) from a remote server, you can easily generate an API client.

This API client was generated by the [OpenAPI Generator](https://openapi-generator.tech) project. By using
the [OpenAPI-spec](https://www.openapis.org/) from a remote server, you can easily generate an API client.

- API version: 0.1.0
- Package version: 1.0.0
Expand Down Expand Up @@ -44,7 +46,8 @@ ctx := context.WithValue(context.Background(), restclient.ContextServerIndex, 1)

### Templated Server URL

Templated server URL is formatted using default variables from configuration or from context value `restclient.ContextServerVariables` of type `map[string]string`.
Templated server URL is formatted using default variables from configuration or from context
value `restclient.ContextServerVariables` of type `map[string]string`.

```go
ctx := context.WithValue(context.Background(), restclient.ContextServerVariables, map[string]string{
Expand All @@ -58,7 +61,8 @@ Note, enum values are always validated and all unused variables are silently ign

Each operation can use different server URL defined using `OperationServers` map in the `Configuration`.
An operation is uniquely identified by `"{classname}Service.{nickname}"` string.
Similar rules for overriding default operation server index and variables applies by using `restclient.ContextOperationServerIndices` and `restclient.ContextOperationServerVariables` context maps.
Similar rules for overriding default operation server index and variables applies by
using `restclient.ContextOperationServerIndices` and `restclient.ContextOperationServerVariables` context maps.

```go
ctx := context.WithValue(context.Background(), restclient.ContextOperationServerIndices, map[string]int{
Expand All @@ -75,25 +79,22 @@ ctx = context.WithValue(context.Background(), restclient.ContextOperationServerV

All URIs are relative to *http://localhost:7300*

Class | Method | HTTP request | Description
------------ | ------------- | ------------- | -------------
*DefaultAPI* | [**ApiV1ConsumeQueueNameGet**](docs/DefaultAPI.md#apiv1consumequeuenameget) | **Get** /api/v1/consume/{queue_name} | Consumes an event from a queue
*DefaultAPI* | [**ApiV1FunctionFunctionNameDelete**](docs/DefaultAPI.md#apiv1functionfunctionnamedelete) | **Delete** /api/v1/function/{function_name} | Deletes a function
*DefaultAPI* | [**ApiV1FunctionFunctionNamePost**](docs/DefaultAPI.md#apiv1functionfunctionnamepost) | **Post** /api/v1/function/{function_name} | Starts a function
*DefaultAPI* | [**ApiV1FunctionsGet**](docs/DefaultAPI.md#apiv1functionsget) | **Get** /api/v1/functions | Returns a list of functions
*DefaultAPI* | [**ApiV1ProduceQueueNamePut**](docs/DefaultAPI.md#apiv1producequeuenameput) | **Put** /api/v1/produce/{queue_name} | Produces an event to a queue

Class | Method | HTTP request | Description
--------------|-------------------------------------------------------------------------------------------|---------------------------------------------|--------------------------------
*DefaultAPI* | [**ApiV1ConsumeQueueNameGet**](docs/DefaultAPI.md#apiv1consumequeuenameget) | **Get** /api/v1/consume/{queue_name} | Consumes an event from a queue
*DefaultAPI* | [**ApiV1FunctionFunctionNameDelete**](docs/DefaultAPI.md#apiv1functionfunctionnamedelete) | **Delete** /api/v1/function/{function_name} | Deletes a function
*DefaultAPI* | [**ApiV1FunctionFunctionNamePost**](docs/DefaultAPI.md#apiv1functionfunctionnamepost) | **Post** /api/v1/function/{function_name} | Starts a function
*DefaultAPI* | [**ApiV1FunctionsGet**](docs/DefaultAPI.md#apiv1functionsget) | **Get** /api/v1/functions | Returns a list of functions
*DefaultAPI* | [**ApiV1ProduceQueueNamePut**](docs/DefaultAPI.md#apiv1producequeuenameput) | **Put** /api/v1/produce/{queue_name} | Produces an event to a queue

## Documentation For Models

- [Function](docs/Function.md)

- [Function](docs/Function.md)

## Documentation For Authorization

Endpoints do not require authorization.


## Documentation for Utility Methods

Due to the fact that model structure members are all pointers, this package contains
Expand Down
20 changes: 8 additions & 12 deletions restclient/docs/Function.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

## Properties

Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**Name** | Pointer to **string** | | [optional]
**Archive** | **string** | |
**Inputs** | **[]string** | |
**Output** | **string** | |
**Replicas** | Pointer to **int32** | | [optional]
**Config** | Pointer to **map[string]string** | | [optional]
Name | Type | Description | Notes
--------------|----------------------------------|-------------|------------
**Name** | Pointer to **string** | | [optional]
**Archive** | **string** | |
**Inputs** | **[]string** | |
**Output** | **string** | |
**Replicas** | Pointer to **int32** | | [optional]
**Config** | Pointer to **map[string]string** | | [optional]

## Methods

Expand Down Expand Up @@ -74,7 +74,6 @@ and a boolean to check if the value has been set.

SetArchive sets Archive field to given value.


### GetInputs

`func (o *Function) GetInputs() []string`
Expand All @@ -94,7 +93,6 @@ and a boolean to check if the value has been set.

SetInputs sets Inputs field to given value.


### GetOutput

`func (o *Function) GetOutput() string`
Expand All @@ -114,7 +112,6 @@ and a boolean to check if the value has been set.

SetOutput sets Output field to given value.


### GetReplicas

`func (o *Function) GetReplicas() int32`
Expand Down Expand Up @@ -165,7 +162,6 @@ SetConfig sets Config field to given value.

HasConfig returns a boolean if a field has been set.


[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)


10 changes: 5 additions & 5 deletions server/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func LoadConfigFromEnv() *fs.Config {
ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr),
PulsarURL: getEnvWithDefault("PULSAR_URL", common.DefaultPulsarURL),
}
queueType := getEnvWithDefault("QUEUE_TYPE", common.DefaultQueueType)
switch queueType {
case common.PulsarQueueType:
loadedConfig.QueueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
tubeType := getEnvWithDefault("TUBE_TYPE", common.DefaultTubeType)
switch tubeType {
case common.PulsarTubeType:
loadedConfig.TubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{
PulsarURL: c.PulsarURL,
}).ToConfigMap())
Expand All @@ -54,7 +54,7 @@ func LoadStandaloneConfigFromEnv() *fs.Config {
loadedConfig = &fs.Config{
ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr),
}
loadedConfig.QueueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
loadedConfig.TubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
return contube.NewMemoryQueueFactory(ctx), nil
}
})
Expand Down
2 changes: 1 addition & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestStandaloneBasicFunction(t *testing.T) {

conf := &fs.Config{
ListenAddr: "localhost:7301",
QueueBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) {
TubeBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) {
return contube.NewMemoryQueueFactory(ctx), nil
},
}
Expand Down

0 comments on commit 575aafc

Please sign in to comment.