diff --git a/.vscode/settings.json b/.vscode/settings.json index 063394a..8851028 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -12,6 +12,7 @@ "cenkalti", "Chasinga", "cmds", + "Cobrass", "coverpkg", "coverprofile", "cubiest", diff --git a/README.md b/README.md index 48c2f95..990d999 100644 --- a/README.md +++ b/README.md @@ -36,17 +36,11 @@ ___Reactive extensions for go (limited set of functionality as required by snivilised)___ -This project has been setup for the currency needs of __snivilised__ projects. Ordinarily, I would have used [___RxGo___](https://github.com/ReactiveX/RxGo) for reactive functionality, but it is currently not in a working state and seems to be abandoned. So ___lorax___ will aim to fill the gap, implementing the functionality required by __snivilised__ projects (in particular ___extendio___, which requires the ability to navigate a directory tree concurrently). +This project has been setup for the currency needs of __snivilised__ projects. Ordinarily, I would have used [___RxGo___](https://github.com/ReactiveX/RxGo) for reactive functionality, but it is currently not in a working state and seems to be abandoned. So ___lorax/rx___ will aim to fill the gap, implementing the functionality required by __snivilised__ projects (in particular ___extendio___, which requires the ability to navigate a directory tree concurrently). -The intention is to write a version of RxGo with identical semantics as the original, having fixed some of the outstanding issues in RxGo reported by other users and possibly introduce new features. If and when version 3 of RxGo becomes available, then lorax/rx will be considered surplice to requirements and retired. +The intention is to write a version of RxGo with identical semantics as the original, having fixed some of the outstanding issues in RxGo reported by other users and possibly introduce new features. 
If and when version 3 of RxGo becomes available, then ___lorax/rx___ will be considered surplice to requirements and retired. -___lorax___ will also provide concurrent functionality via an abstraction that implements the __worker pool__ pattern. - -## 📚 Usage - -Please refer to the documentation already available on [___RxGo___](https://github.com/ReactiveX/RxGo). The documentation here focuses on the specific differences of usage between lorax/rx vs RxGo. - -💤 tbd +___lorax/boost___ also provides a worker pool implementation using [🐜🐜🐜 ants](https://github.com/panjf2000/ants). ## 🎀 Features @@ -61,218 +55,17 @@ Please refer to the documentation already available on [___RxGo___](https://gith + i18n with [go-i18n](https://github.com/nicksnyder/go-i18n) + linting configuration and pre-commit hooks, (see: [linting-golang](https://freshman.tech/linting-golang/)). -## 🚀 Rx - -In the absence of a version 3 of [___RxGo___](https://github.com/ReactiveX/RxGo), lorax provides an alternative implementation, ___rx___, that uses generics over reflection. Use of generics simplifies some issues but introduces complexity in other areas. For example, the ___AverageXXX___ (eg ___AverageFloat32___) rxgo operator contains 'overloads' for the different numeric types. This overloading is not required when generics are in play and in __rx__, all the ___AverageXXX___ variants are replaced by a single ___Average___ operator. - -Conversely, now that the channels have been made type specific, it is now no longer possible to send any type of value through them. An object of type __T__ can be sent via an instantiation of the newly modified ___Item___, which is now a generic, based upon type __T__. But there are times when we need to send values which are not of type T. To keep using the same channel, a solution was required to resolve this issue. This solution involves the introduction of a new field on ___Item[T]___; ___opaque___ and a discriminator field ___disc___. 
The opaque field is defined as ___any___ and its type is identified by the ___disc___ member. There are additional helpers to create the ___Item___ instance similar to the existing ___Of___ function and also getter functions. - -The following sections describe the issues encountered when implementing generics and how to use different aspects of the rx library using the new paradigm. - -### 💎 Item - -The new representation of an Item is as follows: - -```go -Item[T any] struct { - V T - E error - opaque any - disc enums.ItemDiscriminator -} -``` - -To minimise change, the ___V___ member remains in place and the semantics behind it remain the same. The opaque field is used to carry items not of type T. The presence of V is mutually exclusive with opaque and they are distinguished by the discriminator. An opaque instance of ___Item[T]___ is used whenever the type of value that needs to be send is independent of type ___T___. For example consider this __rx__ code: - -```go -func (op *allOperator[T]) end(ctx context.Context, dst chan<- Item[T]) { - if op.all { - True[T]().SendContext(ctx, dst) - } -} -``` - -The ___end___ method on ___allOperator___ needs to send a boolean true value (which is of course independent of type ___T___) through the channel. The ___True[T]___ helper function, creates an instance of ___Item[T]___ with the opaque field set to ___true___ and the discriminator set to ___enums.ItemDiscBoolean___. - -#### 🎓 Item helpers - -The receiver of an ___Item[T]___ needs to know what type of instance it is. Most of the time, the item will just carry a native value ___T___ and just because of the scenario in which the item is being received, the receiver knows that its a native instance. In these cases, the receiver can directly access the V member. However, there are times when the receiver does not know what type the item is. - -Various try/must helpers have been defined to make this task easier for clients. 
Eg, consider this helper for boolean values: - -```go -func TryBool[T any](item Item[T]) (bool, error) -``` - -This will return the opaque field cast as a bool, if the discriminator says its valid to do so or an error otherwise. There are other similar helpers for a variety of supported auxiliary types (see __must-try.go__). There are also ___MustXXX___ equivalents for each type that panic instead of returning an error. - -### 🔢 Calculator - -Various operators need to be able to compute simple numerical calculations on item values. This is no longer possible to do directly when the type of T is unknown. This has been resolved by the introduction of a calculator that must be specified as an option when using any operation that requires a numerical calculation. - -The following is an illustration of using the new ___WithCalc___ option used in conjunction with the ___Assert___ api: - -```go - rx.Assert(ctx, - testObservable[float32](ctx, - float32(1), float32(20), - ).Average(rx.WithCalc(rx.Calc[float32]())), - rx.HasItem[float32]{ - Expected: 10.5, - }, - ) -``` - -Since the calc operation is common to a variety of functions/operators, to aid simplicity of use, its better to have to only specify this once, rather than pass in a calc directly into the function/operator in use; doing so in this case where multiple operators are in play, would require passing in a calc to each and every one. ___WithCalc___ allows us to have to only specify this once. Of course, with the calculator being an option, it is easier to forget to pass one in when it is required. In this case, the operation will fail and an error returned. Any operation requiring a calc is documented as such. - -NB: rx.Calc is a pre-defined calculator instance that works for all numeric scalar types. - -### 🌀 Iteration - -The ___Range___ function needed special attention, because of the need to send numeric values through channels. 
Consider the legacy implementation of ___rangeIterable___: - -```go - for idx := i.start; idx <= i.start+i.count-1; idx++ { - select { - case <-ctx.Done(): - return - case next <- Of(idx): - } - } -``` - -The index ___idx___ is an int and is sent as such, but this is no longer possible. We could have used item as a ___Num___ instance, but doing so would have caused rippling knock-on effects through a chain of operators. That is to say, when ___Range___ is used in combination with an operator like ___Max___, then Max would also have to be aware of ___Item___ being a ___Num___. This leaky abstraction is undesirable, so instead, the iterator variable is declared to be of type T and therefore can be carried by a native ___Item___. This works ok for ___numeric___ scalar types (float32, int, ...), but for compound types this is a bit more complicated, imposing constraints on the type T (see ___ProxyField___). - -We now have 2 Range functions, 1 for numeric scalar types and the other for compound struct types. The core of both of these are the same: - -```go - for idx, _ := i.iterator.Start(); i.iterator.While(*idx); i.iterator.Increment(idx) { - select { - case <-ctx.Done(): - return - case next <- Of(*idx): - } - } -``` - -The iterator contains methods, that should be implemented by the client for the type ___T___. ___Start___ creates an initial value of type ___T___ and returns a pointer to it. ___While___ is a condition function that returns true whilst the iteration should continue to run and ___Increment___ receives a pointer to the index variable so that it can be incremented. - -The following is an example of how to use the Range operator for scalar types: - -```go - obs := rx.Range(&rx.NumericRangeIterator[int]{ - StartAt: 5, - By: 1, - Whilst: rx.LessThan(8), - }) -``` - -___NumericRangeIterator___ is defined for all numeric types and is therefore able to use the native operators for calculation operations. 
- -For struct types, the above is identical, but instead using ___RangeIteratorByProxy___: +___RxGo___ -```go - obs := rx.RangePF(&rx.RangeIteratorByProxy[widget, int]{ - StartAt: widget{id: 5}, - By: widget{id: 1}, - Whilst: rx.LessThanPF(widget{id: 8}), - }) -``` - -The user can also perform reverse iteration by using a negative ___By___ value and then using the pre-0defined ___rx.MoreThanPF___ for the ___Whilst___ condition function. - -#### 🎙️ Proxy field - -In order for ___RangePF___ to work on struct types, a constraint has to be placed on type ___T___. We need type ___T___ to have certain methods and is defined as: - -```go - Numeric interface { - constraints.Integer | constraints.Signed | constraints.Unsigned | constraints.Float - } - - ProxyField[T any, O Numeric] interface { - Field() O - Inc(index *T, by T) *T - Index(i int) *T - } -``` - -So for a type ___T___, ___T___ has to nominate a member field (the proxy) which will act as the iterator value of type O and this is the purpose of the ___Field___ function. ___Inc___ is the function that performs the increment, by the value specified as ___By___ and ___Index___ allows us to derive an index value of type ___T___ from an integer source. - -This makes for a flexible approach that allows the type T to be in control of incrementing the index value over each iteration. - -So given a domain type widget: - -```go -type widget struct { - id int - name string - amount int -} -``` - -we nominate ___id___ as the proxy field: - -```go -func (w widget) Field() int { - return w.id -} -``` - -Our incrementor is defined as: - -```go -func (w widget) Inc(index *widget, by widget) *widget { - index.id += by.id - - return index -} -``` - -This may look strange, but is necessary since the type ___T___ can not be defined with pointer receivers with respect to the ___Numeric___ constraint. 
The reason for this is to keep in line with the original rxgo functionality of being able to compose an observable with literal scalar values and we can't take the address of literal scalars that would be required in order to be able to define ___Inc___ as: - -```go -func (w *widget) Inc(by widget) *widget -``` - -So ___Numeric___ receivers on ___T___ being of the non pointer variety is a strict invariant. - -The aspect to focus on in ___widget.Inc___ is that ___index___ is incremented with the ___by___ value, not ___w.id___. Effectively, widget is passed a pointer to its original self as index, but w is the copy of index in which we're are running. For this to work properly, the original widget (index) must be incremented, not the copy (w), which would have no effect, resulting in an infinite loop owing to the exit condition never being met. - -#### 📨 Envelope - -The above description regarding pointer receivers on T may appear to be burdensome for prospective types. However, there is a mitigation for this in the form of the type ___Envelope[T any, O Numeric]___. This serves 2 purposes: - -+ __permit pointer receiver:__ The envelope wraps the type T addressed as a pointer and also contains a __numeric__ member P of type O. This is particularly useful large struct instances, where copying by value could be non trivial and thus inefficient. - -+ __satisfy ProxyField constraint:__ The presence of the proxy field P means that Envelope is able to implement all the methods on the ___ProxyField___ constraint, freeing the client from this obligation. - -The following is an example of how to use the ___Envelope___ with the iterator ___RangeIteratorByProxy___: - -```go - obs := rx.RangePF(&rx.RangeIteratorByProxy[rx.Envelope[nugget, int], int]{ - StartAt: rx.Envelope[nugget, int]{P: 5}, - By: rx.Envelope[nugget, int]{P: 1}, - Whilst: rx.LessThanPF(rx.Envelope[nugget, int]{P: 8}), - }) -``` - -### 🎭 Map - -The ___Map___ functionality poses a new challenge. 
If we wanted to map a value of type ___T___ to a value other than ___T___, the mapped to value could not be sent through the channel because it is of the wrong type. A work-around would be to use an opaque instance of Item, but then that could easily become very messy as we no longer have consistent types of emitted values which would be difficult to keep track of. - -So, in the short term, what we say is that the ___Map___ operator works, but can only map to different values of the same type, but this is also a little too restricting. Mapping to a different type is not a niche feature, but we can speculate as to what a solution would look like (this is not implemented yet, to be addressed under issue [#230](https://github.com/snivilised/lorax/issues/230)) - -The essence of the problem is that we need to represent the new type O required for ___Map___. But we can't introduce O to Item, as that would mean every other generic type would also need to be aware of O. This is incorrect as the type O would only be needed for Map and nothing else. There is no such facility to be able to define a void type. - -To fix this, we need some kind of bridging mechanism. This could be either a function or a struct, which would be defined to accept this second generic parameter. The bridge connects a source observable based on type T to a new observable chain based on type O. - -## 😵‍💫 Trouble shooting - -### Infinite range iteration +


-Be careful how the range iterator is specified. Make sure that the incrementor defined by ___By___ tends towards the exit condition specified by ___Whilst___, otherwise infinity will ensue. +To support concurrency features, ___lorax/rx___ uses the reactive model provided by [RxGo](https://github.com/ReactiveX/RxGo). However, since ___RxGo___ seems to be a dead project with its last release in April 2021 and its unit tests not currently fully running successfully, the decision has been made to re-implement this locally. One of the main reasons for the project no longer being actively maintained is the release of generics feature in Go version 1.18, and supporting generics in RxGo would require significant effort to re-write the entire library. While work on this has begun, it's unclear when this will be delivered. Despite this, the reactive model's support for concurrency is highly valued, and ___lorax/rx___ aims to make use of a minimal functionality set for parallel processing during directory traversal. The goal is to replace it with the next version of RxGo when/if it becomes available. -### Missing calc +See: -Make sure than any operator that needs a calculator is provided one via the ___WithCalc___ option. A missing calc will result in an error that indicates as such. 
++ [🔆 boost worker pool](./resources/doc/WORKER-POOL.md) ++ [🦑 rx](./resources/doc/RX.md) ++ [🌐 i18n](./resources/doc/i18n-README.md) ++ [🔨 dev notes](./resources/doc/DEVELOPER-INFO.md) diff --git a/boost/worker-pool-func-manifold.go b/boost/worker-pool-func-manifold.go index 144dc21..b5bc5bb 100644 --- a/boost/worker-pool-func-manifold.go +++ b/boost/worker-pool-func-manifold.go @@ -136,15 +136,13 @@ func manifoldFuncResponse[I, O any](ctx context.Context, if job, ok := input.(Job[I]); ok { payload, e := mf(job.Input) - output := JobOutput[O]{ - ID: job.ID, - SequenceNo: job.SequenceNo, - Payload: payload, - Error: e, - } - if wi != nil { - _ = respond(ctx, wi, &output) + _ = respond(ctx, wi, &JobOutput[O]{ + ID: job.ID, + SequenceNo: job.SequenceNo, + Payload: payload, + Error: e, + }) } } } diff --git a/internal/ants/ants_test.go b/internal/ants/ants_test.go index 2dc99fa..88b10fb 100644 --- a/internal/ants/ants_test.go +++ b/internal/ants/ants_test.go @@ -69,6 +69,7 @@ var _ = Describe("Ants", func() { const poolSize = 10 pool, err := ants.NewPool(ctx, + ants.WithSize(poolSize), ants.WithMaxBlockingTasks(1), ) Expect(err).To(Succeed(), "create TimingPool failed") @@ -132,7 +133,9 @@ var _ = Describe("Ants", func() { pool, _ := ants.NewPoolWithFunc(ctx, func(i boost.InputParam) { demoPoolFunc(i) wg.Done() - }) + }, + ants.WithSize(AntsSize), + ) defer pool.Release(ctx) for i := 0; i < n; i++ { @@ -158,7 +161,10 @@ var _ = Describe("Ants", func() { pool, _ := ants.NewPoolWithFunc(ctx, func(i boost.InputParam) { demoPoolFunc(i) wg.Done() - }, ants.WithPreAlloc(true)) + }, + ants.WithSize(AntsSize), + ants.WithPreAlloc(true), + ) defer pool.Release(ctx) for i := 0; i < n; i++ { diff --git a/resources/doc/RX.md b/resources/doc/RX.md new file mode 100644 index 0000000..4d7048f --- /dev/null +++ b/resources/doc/RX.md @@ -0,0 +1,247 @@ +# 🌟 lorax/rx: ___Go Concurrency with Functional/Reactive Extensions___ + + + + + + + + + + + + + + + + + + + +## 📚 Usage + +


+ +Please refer to the documentation already available on [___RxGo___](https://github.com/ReactiveX/RxGo). The documentation here focuses on the specific differences of usage between lorax/rx vs RxGo. + +💤 tbd + +## 🚀 Rx + +In the absence of a version 3 of [___RxGo___](https://github.com/ReactiveX/RxGo), lorax provides an alternative implementation, ___rx___, that uses generics over reflection. Use of generics simplifies some issues but introduces complexity in other areas. For example, the ___AverageXXX___ (eg ___AverageFloat32___) rxgo operator contains 'overloads' for the different numeric types. This overloading is not required when generics are in play and in __rx__, all the ___AverageXXX___ variants are replaced by a single ___Average___ operator. + +Conversely, now that the channels have been made type specific, it is now no longer possible to send any type of value through them. An object of type __T__ can be sent via an instantiation of the newly modified ___Item___, which is now a generic, based upon type __T__. But there are times when we need to send values which are not of type T. To keep using the same channel, a solution was required to resolve this issue. This involves the introduction of a new field on ___Item[T]___; ___opaque___ and a discriminator field ___disc___. The opaque field is defined as ___any___ and its type is identified by the ___disc___ member. There are additional helpers to create the ___Item___ instance similar to the existing ___Of___ function and also getter functions. + +Clearly, the introduction of generics means that this implementation is not backwards compatible with the existing ___RxGo___. However, effort has been exerted to maintain _semantic_ compatibility where ever possible. + +The following sections describe the issues encountered when implementing generics and how to use different aspects of the rx library using the new paradigm. 
+ +### 💎 Item + +The new representation of an Item is as follows: + +```go +Item[T any] struct { + V T + E error + opaque any + disc enums.ItemDiscriminator +} +``` + +To minimise change, the ___V___ member remains in place and the semantics behind it remain the same. The opaque field is used to carry items not of type T. The presence of V is mutually exclusive with opaque and they are distinguished by the discriminator. An opaque instance of ___Item[T]___ is used whenever the type of value that needs to be sent is independent of type ___T___. For example, consider this __rx__ code: + +```go +func (op *allOperator[T]) end(ctx context.Context, dst chan<- Item[T]) { + if op.all { + True[T]().SendContext(ctx, dst) + } +} +``` + +The ___end___ method on ___allOperator___ needs to send a boolean true value (which is of course independent of type ___T___) through the channel. The ___True[T]___ helper function creates an instance of ___Item[T]___ with the opaque field set to ___true___ and the discriminator set to ___enums.ItemDiscBoolean___. + +#### 🎓 Item helpers + +The receiver of an ___Item[T]___ needs to know what type of instance it is. Most of the time, the item will just carry a native value ___T___ and, because of the scenario in which the item is being received, the receiver knows that it's a native instance. In these cases, the receiver can directly access the V member. However, there are times when the receiver does not know what type the item is. + +Various try/must helpers have been defined to make this task easier for clients. E.g., consider this helper for boolean values: + +```go +func TryBool[T any](item Item[T]) (bool, error) +``` + +This will return the opaque field cast as a bool if the discriminator says it's valid to do so, or an error otherwise. There are other similar helpers for a variety of supported auxiliary types (see __must-try.go__). There are also ___MustXXX___ equivalents for each type that panic instead of returning an error. 
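The opaque/discriminator idea described above can be illustrated with a small standalone sketch. It is not the lorax source: `itemDisc`, `discNative`, `discBoolean` and `errNotBool` are hypothetical names standing in for `enums.ItemDiscriminator` and its values, and `Of`, `True`, `TryBool` and `MustBool` are re-implemented here only to show how a typed channel might carry both native and opaque items.

```go
package main

import (
	"errors"
	"fmt"
)

// itemDisc is a hypothetical stand-in for enums.ItemDiscriminator.
type itemDisc int

const (
	discNative  itemDisc = iota // V carries a value of type T
	discBoolean                 // opaque carries a bool
)

// Item follows the shape shown in the text; the rest of this file is an
// assumption made for illustration only.
type Item[T any] struct {
	V      T
	E      error
	opaque any
	disc   itemDisc
}

// Of wraps a native value of type T.
func Of[T any](v T) Item[T] { return Item[T]{V: v, disc: discNative} }

// True creates an opaque boolean item that is independent of T.
func True[T any]() Item[T] { return Item[T]{opaque: true, disc: discBoolean} }

var errNotBool = errors.New("item does not carry a bool")

// TryBool returns the opaque bool only when the discriminator allows it.
func TryBool[T any](item Item[T]) (bool, error) {
	if item.disc != discBoolean {
		return false, errNotBool
	}
	b, ok := item.opaque.(bool)
	if !ok {
		return false, errNotBool
	}
	return b, nil
}

// MustBool panics instead of returning an error.
func MustBool[T any](item Item[T]) bool {
	b, err := TryBool(item)
	if err != nil {
		panic(err)
	}
	return b
}

func main() {
	// A single chan Item[int] carries a native int and an opaque bool.
	ch := make(chan Item[int], 2)
	ch <- Of(42)
	ch <- True[int]()
	close(ch)

	for item := range ch {
		if b, err := TryBool(item); err == nil {
			fmt.Println("opaque bool:", b)
			continue
		}
		fmt.Println("native value:", item.V)
	}
}
```

The receiver in `main` does not know in advance which kind of item arrives, so it asks the try helper first and falls back to `V`; a receiver that already knows the item is native would read `V` directly.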
+ +### 🔢 Calculator + +Various operators need to be able to compute simple numerical calculations on item values. This is no longer possible to do directly when the type of T is unknown. This has been resolved by the introduction of a calculator that must be specified as an option when using any operation that requires a numerical calculation. + +The following is an illustration of using the new ___WithCalc___ option used in conjunction with the ___Assert___ api: + +```go + rx.Assert(ctx, + testObservable[float32](ctx, + float32(1), float32(20), + ).Average(rx.WithCalc(rx.Calc[float32]())), + rx.HasItem[float32]{ + Expected: 10.5, + }, + ) +``` + +Since the calc operation is common to a variety of functions/operators, to aid simplicity of use, it's better to specify it only once, rather than pass a calc directly into the function/operator in use; doing so where multiple operators are in play would require passing a calc to each and every one. ___WithCalc___ allows us to specify it only once. Of course, with the calculator being an option, it is easier to forget to pass one in when it is required. In this case, the operation will fail and an error is returned. Any operation requiring a calc is documented as such. + +NB: rx.Calc is a pre-defined calculator instance that works for all numeric scalar types. + +### 🌀 Iteration + +The ___Range___ function needed special attention, because of the need to send numeric values through channels. Consider the legacy implementation of ___rangeIterable___: + +```go + for idx := i.start; idx <= i.start+i.count-1; idx++ { + select { + case <-ctx.Done(): + return + case next <- Of(idx): + } + } +``` + +The index ___idx___ is an int and is sent as such, but this is no longer possible. We could have used item as a ___Num___ instance, but doing so would have caused rippling knock-on effects through a chain of operators. 
That is to say, when ___Range___ is used in combination with an operator like ___Max___, then Max would also have to be aware of ___Item___ being a ___Num___. This leaky abstraction is undesirable, so instead, the iterator variable is declared to be of type T and therefore can be carried by a native ___Item___. This works ok for ___numeric___ scalar types (float32, int, ...), but for compound types this is a bit more complicated, imposing constraints on the type T (see ___ProxyField___). + +We now have 2 Range functions, 1 for numeric scalar types and the other for compound struct types. The core of both is the same: + +```go + for idx, _ := i.iterator.Start(); i.iterator.While(*idx); i.iterator.Increment(idx) { + select { + case <-ctx.Done(): + return + case next <- Of(*idx): + } + } +``` + +The iterator contains methods that should be implemented by the client for the type ___T___. ___Start___ creates an initial value of type ___T___ and returns a pointer to it. ___While___ is a condition function that returns true whilst the iteration should continue to run and ___Increment___ receives a pointer to the index variable so that it can be incremented. + +The following is an example of how to use the Range operator for scalar types: + +```go + obs := rx.Range(&rx.NumericRangeIterator[int]{ + StartAt: 5, + By: 1, + Whilst: rx.LessThan(8), + }) +``` + +___NumericRangeIterator___ is defined for all numeric types and is therefore able to use the native operators for calculation operations. + +For struct types, the approach is identical, but uses ___RangeIteratorByProxy___ instead: + +```go + obs := rx.RangePF(&rx.RangeIteratorByProxy[widget, int]{ + StartAt: widget{id: 5}, + By: widget{id: 1}, + Whilst: rx.LessThanPF(widget{id: 8}), + }) +``` + +The user can also perform reverse iteration by using a negative ___By___ value and then using the pre-defined ___rx.MoreThanPF___ for the ___Whilst___ condition function. 
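To make the Start/While/Increment contract concrete, here is a minimal standalone sketch of an iterator-driven range for scalar types, including reverse iteration with a negative step. All names in it (`numeric`, `rangeIterator`, `numericIterator`, `lessThan`, `moreThan`, `emit`, `runRange`) are hypothetical and chosen for illustration; the real `rx.NumericRangeIterator` may well be implemented differently.

```go
package main

import (
	"context"
	"fmt"
)

// numeric is a simplified stand-in for the Numeric constraint in the text.
type numeric interface {
	~int | ~int64 | ~float32 | ~float64
}

// rangeIterator is a hypothetical interface named after the three calls
// made by the loop shown above.
type rangeIterator[T any] interface {
	Start() (*T, error)
	While(current T) bool
	Increment(index *T) *T
}

type numericIterator[T numeric] struct {
	StartAt T
	By      T
	Whilst  func(current T) bool
}

// Start copies StartAt so the iterator itself is never mutated.
func (i *numericIterator[T]) Start() (*T, error) {
	index := i.StartAt
	return &index, nil
}

func (i *numericIterator[T]) While(current T) bool { return i.Whilst(current) }

// Increment mutates the index through the pointer it is given.
func (i *numericIterator[T]) Increment(index *T) *T {
	*index += i.By
	return index
}

func lessThan[T numeric](limit T) func(T) bool {
	return func(current T) bool { return current < limit }
}

func moreThan[T numeric](limit T) func(T) bool {
	return func(current T) bool { return current > limit }
}

// emit sends each index value on a channel until While reports false
// or the context is cancelled.
func emit[T any](ctx context.Context, it rangeIterator[T]) <-chan T {
	next := make(chan T)
	go func() {
		defer close(next)
		idx, err := it.Start()
		if err != nil {
			return
		}
		for ; it.While(*idx); it.Increment(idx) {
			select {
			case <-ctx.Done():
				return
			case next <- *idx:
			}
		}
	}()
	return next
}

// runRange collects every emitted value into a slice.
func runRange(startAt, by int, whilst func(int) bool) []int {
	it := &numericIterator[int]{StartAt: startAt, By: by, Whilst: whilst}
	var values []int
	for v := range emit[int](context.Background(), it) {
		values = append(values, v)
	}
	return values
}

func main() {
	fmt.Println(runRange(5, 1, lessThan[int](8)))  // forward: [5 6 7]
	fmt.Println(runRange(8, -1, moreThan[int](5))) // reverse: [8 7 6]
}
```

In this sketch the step and the exit condition have to agree: a positive `By` paired with `moreThan`, or a negative one paired with `lessThan`, would never reach the exit condition once the loop has started.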
+ +#### 🎙️ Proxy field + +In order for ___RangePF___ to work on struct types, a constraint has to be placed on type ___T___. Type ___T___ must provide certain methods; the constraint is defined as: + +```go + Numeric interface { + constraints.Integer | constraints.Signed | constraints.Unsigned | constraints.Float + } + + ProxyField[T any, O Numeric] interface { + Field() O + Inc(index *T, by T) *T + Index(i int) *T + } +``` + +So for a type ___T___, ___T___ has to nominate a member field (the proxy) which will act as the iterator value of type O and this is the purpose of the ___Field___ function. ___Inc___ is the function that performs the increment, by the value specified as ___By___ and ___Index___ allows us to derive an index value of type ___T___ from an integer source. + +This makes for a flexible approach that allows the type T to be in control of incrementing the index value over each iteration. + +So given a domain type widget: + +```go +type widget struct { + id int + name string + amount int +} +``` + +we nominate ___id___ as the proxy field: + +```go +func (w widget) Field() int { + return w.id +} +``` + +Our incrementor is defined as: + +```go +func (w widget) Inc(index *widget, by widget) *widget { + index.id += by.id + + return index +} +``` + +This may look strange, but is necessary since the type ___T___ cannot be defined with pointer receivers with respect to the ___Numeric___ constraint. The reason for this is to keep in line with the original rxgo functionality of being able to compose an observable with literal scalar values, and we can't take the address of literal scalars, which would be required in order to define ___Inc___ as: + +```go +func (w *widget) Inc(by widget) *widget +``` + +So ___Numeric___ receivers on ___T___ being of the non-pointer variety is a strict invariant. + +The aspect to focus on in ___widget.Inc___ is that ___index___ is incremented with the ___by___ value, not ___w.id___. 
Effectively, widget is passed a pointer to its original self as index, but w is a copy of index, and it is on this copy that the method runs. For this to work properly, the original widget (index) must be incremented, not the copy (w), which would have no effect, resulting in an infinite loop owing to the exit condition never being met. + +#### 📨 Envelope + +The above description regarding pointer receivers on T may appear to be burdensome for prospective types. However, there is a mitigation for this in the form of the type ___Envelope[T any, O Numeric]___. This serves 2 purposes: + ++ __permit pointer receiver:__ The envelope wraps the type T addressed as a pointer and also contains a __numeric__ member P of type O. This is particularly useful for large struct instances, where copying by value could be non-trivial and thus inefficient. + ++ __satisfy ProxyField constraint:__ The presence of the proxy field P means that Envelope is able to implement all the methods on the ___ProxyField___ constraint, freeing the client from this obligation. + +The following is an example of how to use the ___Envelope___ with the iterator ___RangeIteratorByProxy___: + +```go + obs := rx.RangePF(&rx.RangeIteratorByProxy[rx.Envelope[nugget, int], int]{ + StartAt: rx.Envelope[nugget, int]{P: 5}, + By: rx.Envelope[nugget, int]{P: 1}, + Whilst: rx.LessThanPF(rx.Envelope[nugget, int]{P: 8}), + }) +``` + +### 🎭 Map + +The ___Map___ functionality poses a new challenge. If we wanted to map a value of type ___T___ to a value of a type other than ___T___, the mapped-to value could not be sent through the channel because it is of the wrong type. A work-around would be to use an opaque instance of Item, but then that could easily become very messy as we no longer have consistent types of emitted values which would be difficult to keep track of. + +So, in the short term, what we say is that the ___Map___ operator works, but can only map to different values of the same type, but this is also a little too restricting. 
Mapping to a different type is not a niche feature, but we can speculate as to what a solution would look like (this is not implemented yet, to be addressed under issue [#230](https://github.com/snivilised/lorax/issues/230)) + +The essence of the problem is that we need to represent the new type O required for ___Map___. But we can't introduce O to Item, as that would mean every other generic type would also need to be aware of O. This is incorrect as the type O would only be needed for Map and nothing else. There is no such facility to be able to define a void type. + +To fix this, we need some kind of bridging mechanism. This could be either a function or a struct, which would be defined to accept this second generic parameter. The bridge connects a source observable based on type T to a new observable chain based on type O. + +## 😵‍💫 Trouble shooting + +### Infinite range iteration + +Be careful how the range iterator is specified. Make sure that the incrementor defined by ___By___ tends towards the exit condition specified by ___Whilst___, otherwise infinity will ensue. + +### Missing calc + +Make sure than any operator that needs a calculator is provided one via the ___WithCalc___ option. A missing calc will result in an error that indicates as such. diff --git a/resources/doc/WORKER-POOL.md b/resources/doc/WORKER-POOL.md new file mode 100644 index 0000000..8c143f4 --- /dev/null +++ b/resources/doc/WORKER-POOL.md @@ -0,0 +1,165 @@ +# 🔆 lorax/boost: ___🐜🐜🐜 ants based worker pool___ + + + + + + + + + + + + + + + + + + + +## 📚 Usage + +Please refer to the documentation already available at [🐜🐜🐜 ___ants___](https://github.com/panjf2000/ants). The documentation here focuses on the functionality provided that augments the underlying ___ants___ implementation. + +Included in this repo are some executable examples that help explain how the pool works and demonstrates some key characteristics that will aid in understanding of how to correctly use this package. 
For a more detailed explanation of the _Options_, the reader is encouraged to read the ants documentation.
+
+## 🎀 Additional Features
+
+The ___ants___ implementation was chosen because it has already proven itself in production, having a wide install base, and because it addresses scalability and reliability issues. However, a review of its features revealed that it lacks a few supplementary features, including the following:
+
++ __no top level client defined context__: this means there is no way for the client to cancel an operation using idiomatic ___Go___ techniques.
++ __no job return error__: that is to say, whenever a job is executed, there is no notification of whether it executed successfully or not. Rather, it has been implemented on a _fire and forget_ basis.
++ __no job output__: similar to the lack of an error result for each job, there is no way for the result of an operation to be collated; eg the client may request that the pool perform some task that produces a result. In the ants implementation, there is no native way to return an output for each job.
++ __no input channel__: the client needs direct access to the pool instance in order to submit tasks with a function call. An input channel has benefits, including but not limited to reduced coupling. With an input channel, the client can pass this channel to another entity capable of generating a workload without having direct access to the pool itself; all that entity needs to do is write to the channel.
+
+## 💫 ManifoldFuncPool
+
+### 🚀 Quick start
+
+#### 📌 Create pool with output
+
+```go
+  pool, err := boost.NewManifoldFuncPool(
+    ctx, func(input int) (int, error) {
+      // client implementation; output = something
+
+      return output, nil
+    }, &wg,
+    boost.WithSize(PoolSize),
+    boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend),
+  )
+```
+
+Creates an _int_ based manifold worker pool.
The ___ManifoldFuncPool___ is a generic whose type parameters represent the input type _I_ and the output type _O_. In this example, the input and output types are both _int_ as denoted by the signature of the manifold function:
+
+> func(input int) (int, error)
+
+NB: It is not mandatory to require workers to send outputs. If the ___WithOutput___ option is not specified, then an output will still occur, but will be ignored.
+
+#### 📌 Submit work
+
+There are 2 ways to submit work to the pool, either directly or by input channel:
+
++ direct(Post):
+
+```go
+  pool.Post(ctx, 42)
+
+  ...
+  pool.Conclude(ctx)
+```
+
+Sends a job to the pool with int based input value 42. Typically, the Post would be issued multiple times as needs demand. At some point we are done submitting work. The end of the workload needs to be communicated to the pool. This is the purpose of invoking Conclude.
+
++ via input channel(Source):
+
+```go
+  inputCh := pool.Source(ctx, wg)
+  inputCh <- 42
+
+  ...
+  close(inputCh)
+```
+
+Sends a job to the pool with int based input value 42, via the input channel. At the end of the workload, all we need to do is close the channel; we do not need to invoke ___Conclude___ explicitly as this is done automatically on our behalf as a result of the channel closure.
+
+#### 📌 Consume outputs
+
+Outputs can be consumed simply by invoking ___pool.Observe___ which returns a channel:
+
+```go
+  select {
+  case output := <-pool.Observe():
+    fmt.Printf("🍒 payload: '%v', id: '%v', seq: '%v' (e: '%v')\n",
+      output.Payload, output.ID, output.SequenceNo, output.Error,
+    )
+  case <-ctx.Done():
+    return
+  }
+```
+
+Each output is represented by a ___JobOutput___ which contains a _Payload_ field representing the job's result and some supplementary metadata fields, including a sequence number and a job ID.
+
+It is possible to range over the output channel as illustrated:
+
+```go
+  for output := range pool.Observe() {
+    fmt.Printf("🍒 payload: '%v', id: '%v', seq: '%v' (e: '%v')\n",
+      output.Payload, output.ID, output.SequenceNo, output.Error,
+    )
+  }
+```
+
+This will work in success cases, but what happens if a worker send timeout occurs? The worker will send a cancellation request and the context will be cancelled as a result. But since the range operator is not pre-empted as a result of this cancellation, it will continue to block, waiting for either more content or channel closure. If the main goroutine is blocking on a WaitGroup, which it almost certainly should be, the program will deadlock on the wait. For this reason, it is recommended to use a select statement as shown.
+
+#### 📌 Monitor the cancellation channel
+
+Currently, the only reason for a worker to request a cancellation is that it is unable to send an output. Any cancellation request must be addressed by the client; this means invoking the cancel function associated with the context.
+
+The client can delegate this responsibility to a predefined function in boost, ___StartCancellationMonitor___:
+
+```go
+  if cc := pool.CancelCh(); cc != nil {
+    boost.StartCancellationMonitor(ctx, cancel, &wg, cc, func() {
+      fmt.Print("🔴 cancellation received, cancelling...\n")
+    })
+  }
+```
+
+Note that the client is able to pass in a callback function, which is invoked if cancellation occurs. Also, note that there is no need to increment the wait group as that is done internally.
+
+## 📝 Design
+
+In designing the augmented functionality, it was discovered that there could conceivably be more than 1 abstraction, depending on the client's needs. From the perspective of ___snivilised___ projects, the key requirement was to have a pool that could execute jobs and, for each one, return an error and an output. The name given to this implementation is the ___ManifoldFuncPool___.
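
To make that requirement concrete, here is a small sketch of the idea using only the standard library. It is not how ___boost___ or ___ants___ implement it: `result` and `runManifold` are hypothetical names invented for this illustration. Each job is an input passed to a single function of the shape `func(I) (O, error)`, and every job yields both an output and an error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result is a hypothetical envelope pairing a job's input with its output
// and error.
type result[I, O any] struct {
	Input  I
	Output O
	Err    error
}

// runManifold executes fn once per input using a fixed number of workers
// (workers must be at least 1) and returns one result per job, in completion
// order rather than submission order.
func runManifold[I, O any](workers int, inputs []I, fn func(I) (O, error)) []result[I, O] {
	jobs := make(chan I)
	// Sized to hold every result, so a worker never blocks on send.
	results := make(chan result[I, O], len(inputs))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for in := range jobs {
				out, err := fn(in)
				results <- result[I, O]{Input: in, Output: out, Err: err}
			}
		}()
	}

	for _, in := range inputs {
		jobs <- in
	}
	close(jobs)    // signal the end of the workload
	wg.Wait()      // wait until every worker has finished
	close(results) // only now is it safe to close the output channel

	collected := make([]result[I, O], 0, len(inputs))
	for r := range results {
		collected = append(collected, r)
	}
	return collected
}

func main() {
	double := func(n int) (int, error) {
		if n < 0 {
			return 0, errors.New("negative input")
		}
		return n * 2, nil
	}

	for _, r := range runManifold(2, []int{1, 2, -3}, double) {
		fmt.Printf("input: %v, output: %v, err: %v\n", r.Input, r.Output, r.Err)
	}
}
```

The sketch sidesteps the harder problems discussed in this document (client cancellation, a slow consumer, deciding when to close the output channel while work is still in flight) by buffering every result and collecting them only after all workers have finished.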
+
+In ___ants___, there are 2 main implementations of worker pool, ___Pool___ and ___PoolFunc___ (the latter is named ___PoolWithFunc___ in the ants API).
+
++ ___Pool___: accepts new jobs represented by a function. Each function can implement any logic, so the pool is in fact able to execute a stream of heterogeneous tasks.
++ ___PoolFunc___: the pool is created with a pre-defined function and accepts new jobs specified as an input to this pool function. So every job the pool executes runs the same functionality but with a different input.
+
+___ManifoldFuncPool___ is based on the ___PoolFunc___ implementation. However, ___PoolFunc___ does not return either an output or an error. ___ManifoldFuncPool___ adds this behaviour by allowing the client to define a function (_manifold function_) whose signature allows for an input of a specific type, along with an output and error. ___ManifoldFuncPool___ therefore provides a mapping from the _manifold function_ to the ants function (_PoolFunc_).
+
+As previously mentioned, ___boost___ could provide many more worker pool abstractions, eg there could be a ___ManifoldTaskPool___ based upon the ___Pool___ implementation. However, ___ManifoldTaskPool___ is not currently defined as there is no established need for one. Similarly, boost could provide a ___PoolFunc___ based pool whose client function only returns an error. Future versions of lorax/boost could provide these alternative implementations if such a need arises.
+
+### Context
+
+The ___NewManifoldFuncPool___ constructor function accepts a context, which works in exactly the way one would expect. Any internal goroutine works with this context. If the client cancels this context, then this will be propagated to all child goroutines including the workers in the pool.
+
+### Cancellation
+
+The need to send output back to the client for each job presents us with an additional problem. Once the need for output has been declared via use of the ___WithOutput___ option, there is an obligation on the client to consume it.
Failure to consume will result in the eventual blockage of the entire worker pool; the pool will get to a state where all workers are blocking on their attempt to send the output, the output buffer is full and new incoming requests can no longer be dispatched to workers, as they are all busy, resulting in deadlock. This may just be a programming error, but it would be undesirable for the pool to simply end up in deadlock.
+
+This has been alleviated by the use of a timeout mechanism. The ___WithOutput___ option takes a timeout parameter defined as a ___time.Duration___. When the worker times out attempting to send the output, it will then send a cancellation request back to the client via a separate cancellation channel (obtained by invoking ___ManifoldFuncPool.CancelCh___).
+
+Since context cancellation should only be initiated by the client, the onus is on them to cancel the context. However, the way in which this would be done amounts to some boilerplate code, so ___boost___ also provides this as a function ___StartCancellationMonitor___, which starts a goroutine that monitors the cancellation channel for requests and on seeing one, cancels the associated context. This results in all child goroutines abandoning their work when they are able and exiting gracefully. This means that we can avoid the deadlock and leaked goroutines.
+
+### Conclude
+
+The pool needs to close the output channel so the consumer knows to exit its read loop, but it can only do so once it's clear there are no more outstanding jobs to complete and all workers are idle. We can't close the channel prematurely as that would result in a panic when a worker attempts to send the output.
+
+Typically, the use of the ___WithOutput___ option looks like this:
+
+> boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend)
+
+The ___CheckCloseInterval___ parameter is internally required by ___pool.Conclude___.
To counter the problem described above, ___Conclude___ needs to check periodically whether it's safe to close the output channel; this check is implemented within another goroutine. ___CheckCloseInterval___ denotes the amount of time it will wait before checking again.
diff --git a/rx/LICENSE b/rx/LICENSE
new file mode 100644
index 0000000..1c00bfa
--- /dev/null
+++ b/rx/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Joe Chasinga
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.