From 96237fe639f8d685a433d7217ca4532d3d70e170 Mon Sep 17 00:00:00 2001 From: Brandon Blaylock Date: Mon, 18 Mar 2024 16:23:47 -0700 Subject: [PATCH] release: 2.1.2 * add store to contrib/dux * add ideas/most2 --- contrib/dux.ts | 144 ++++++++++++++++- deno.json | 2 +- ideas/most2.ts | 421 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 561 insertions(+), 6 deletions(-) create mode 100644 ideas/most2.ts diff --git a/contrib/dux.ts b/contrib/dux.ts index 14e763a..e4e6719 100644 --- a/contrib/dux.ts +++ b/contrib/dux.ts @@ -1,10 +1,12 @@ import type { DatumEither } from "../datum_either.ts"; +import type { Stream } from "./most.ts"; import type { Lens } from "../optic.ts"; +import * as A from "../array.ts"; import * as D from "../datum.ts"; import * as DE from "../datum_either.ts"; +import * as M from "./most.ts"; import * as O from "../optic.ts"; - import { pipe } from "../fn.ts"; // ======= @@ -61,7 +63,7 @@ export type ActionCreator

= /** * Extract an Action type from an ActionCreator * - * @since 8.0.0 + * @since 2.1.0 */ export type ExtractAction = T extends ActionCreator[] ? Action

: never; @@ -134,8 +136,6 @@ function tagFactory(...tags: string[]): ActionType { * The simplest way to create an action. * Generally, for all but the simplest of applications, using * actionCreatorsFactory is a better move. - * - * @since 7.0.0 */ export function actionFactory

(type: string): ActionFunction

{ return ((value: P) => ({ type, value })) as ActionFunction

; @@ -274,7 +274,7 @@ export function asyncReducerFactory( /** * Filters actions by first section of action type to bypass sections of the store * - * @since 7.1.0 + * @since 2.1.0 */ export const filterReducer = ( match: string, @@ -282,3 +282,137 @@ export const filterReducer = ( ): Reducer => (state, action) => action.type.startsWith(match) ? reducer(state, action) : state; + +// ============ +// MetaReducers +// ============ + +export type MetaReducer = ( + reducer: Reducer, +) => Reducer; + +export function metaReducerFn( + metareducer: MetaReducer, +): MetaReducer { + return metareducer; +} + +// ======= +// Effects +// ======= + +/** + * @since 2.1.2 + */ +export type Effect = ( + a: A, +) => Stream; + +/** + * @since 2.1.2 + */ +export type EffectWide = ( + a: A, +) => ActionType | Promise | Stream; + +function liftAction( + action: ActionType | Promise | Stream, +): Stream { + if ("type" in action) { + return M.wrap(action); + } else if ("then" in action) { + return M.fromPromise(action); + } else { + return action; + } +} + +/** + * @since 2.1.2 + */ +export function caseEff

( + action: ActionCreator

, + effect: EffectWide>, +): Effect { + return (a) => action.match(a) ? liftAction(effect(a)) : M.empty(); +} + +/** + * @since 2.1.2 + */ +export function caseEffs[]>( + actionCreators: A, + effect: EffectWide>, +): Effect { + return (a) => + actionCreators.some(({ match }) => match(a)) + ? liftAction(effect(> a)) + : M.empty(); +} + +/** + * @since 2.1.2 + */ +export function effectFn( + ...cases: ReadonlyArray> +): Effect { + return (action) => + pipe( + cases, + A.map((a) => liftAction(a(action))), + M.mergeArray, + ); +} + +// ========== +// MetaEffect +// ========== + +/** + * @since 2.1.2 + */ +export type MetaEffect = ( + effect: Effect, +) => Effect; + +/** + * @since 2.1.2 + */ +export function metaEffectFn( + metaEffect: MetaEffect, +): MetaEffect { + return metaEffect; +} + +// ===== +// Store +// ===== + +/** + * @since 2.1.2 + */ +export type Store = { + readonly state: Stream; + readonly dispatch: (action: ActionType) => void; +}; + +/** + * @since 2.1.2 + */ +export function createStore( + initial: S, + reducer: Reducer, + effect: Effect = () => M.empty(), +): Store { + const [dispatch, dispatched] = M.createAdapter(); + const state = pipe( + dispatched, + M.mergeMapConcurrently( + (a) => M.startWith(a, effect(a)), + Number.POSITIVE_INFINITY, + ), + M.scan(reducer, initial), + ); + + return { state, dispatch }; +} diff --git a/deno.json b/deno.json index 4ebafdb..0a94b0b 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@baetheus/fun", - "version": "2.1.1", + "version": "2.1.2", "exports": { "./applicable": "./applicable.ts", "./array": "./array.ts", diff --git a/ideas/most2.ts b/ideas/most2.ts new file mode 100644 index 0000000..8b16ea7 --- /dev/null +++ b/ideas/most2.ts @@ -0,0 +1,421 @@ +import type { Intersect } from "../kind.ts"; +import { isNotNil } from "../nil.ts"; +import { pipe, todo } from "../fn.ts"; + +export type Stream = { + run(sink: Sink, env: R): Disposable; +}; + +export type Sink = { + event(a: A): void; + end(): void; +}; + +export type TypeOf = U extends Stream ? A : never; + +export type EnvOf = U extends Stream ? R : never; + +export function stream( + run: (sink: Sink, env: R) => Disposable, +): Stream { + return { run }; +} + +export function sink( + event: (a: A) => void, + end: () => void, +): Sink { + return { event, end }; +} + +export function disposable(dispose: () => void): Disposable { + return { [Symbol.dispose]: dispose }; +} + +export function dispose(disposable: Disposable): void { + disposable[Symbol.dispose](); +} + +export const DISPOSE_NONE: Disposable = disposable(() => undefined); + +export const EMPTY: Stream = stream( + (sink) => { + sink.end(); + return DISPOSE_NONE; + }, +); + +export const NEVER: Stream = stream(() => DISPOSE_NONE); + +export type Timeout = { + // deno-lint-ignore no-explicit-any + setTimeout( + f: (...a: Args) => void, + timeoutMillis: number, + ...a: Args + ): Disposable; +}; + +export function at(time: number): Stream { + return stream((sink: Sink, { setTimeout }: Timeout) => + setTimeout( + ({ event, end }) => { + event(undefined); + end(); + }, + time, + sink, + ) + ); +} + +export function continueWith( + second: () => Stream, +): (first: Stream) => Stream { + return (first) => + stream((snk, env) => { + let d = first.run( + sink(snk.event, () => { + d = second().run(snk, env); + }), + env, + ); + return d; + }); +} + +export function periodic(period: number): Stream { + return pipe( + at(period), + continueWith(() => periodic(period)), + ); +} + +export function map( + fai: (a: A) => I, +): (ua: Stream) => Stream { + return (ua) => + stream(({ event, end }, env) => + ua.run(sink((a) => event(fai(a)), end), env) + ); +} + +export function tap( + fa: (a: A) => void, +): (ua: Stream) => Stream { + return (ua) => + stream(({ event, end }, env) => + ua.run( + sink((a) => { + fa(a); + event(a); + }, end), + env, + ) + ); +} + +export function filter( + refinement: (a: A) => a is B, +): (s: Stream) => Stream; +export function filter( + predicate: (a: A) => boolean, +): (s: Stream) => Stream; +export function filter( + predicate: (a: A) => boolean, +): (ua: Stream) => Stream { + return (ua) => + stream(({ event, end }, env) => + ua.run( + sink((a) => { + if (predicate(a)) { + event(a); + } + }, end), + env, + ) + ); +} + +export function scan( + foao: (o: O, a: A) => O, + init: O, +): (ua: Stream) => Stream { + return (ua) => + stream(({ event, end }, env) => { + let hold = init; + return ua.run( + sink((a) => { + hold = foao(hold, a); + event(hold); + }, end), + env, + ); + }); +} + +// deno-lint-ignore no-explicit-any +export function merge[]>( + ...streams: Streams +): Stream< + TypeOf, + Intersect>[0]> +> { + return stream(({ event, end }, env) => { + let count = streams.length; + const disposables: Disposable[] = []; + const done = () => disposables.forEach(dispose); + + // last stream disposes and pass an end to the sink + const snk = sink(event, () => { + if (--count === 0) { + done(); + end(); + } + }); + + // dump the merged disposables into our stateful array + disposables.push(...streams.map((s) => s.run(snk, env))); + return disposable(done); + }); +} + +// deno-lint-ignore no-explicit-any +export function combine[]>( + init: { readonly [K in keyof Streams]: TypeOf }, + ...streams: Streams +): Stream< + { readonly [K in keyof Streams]: TypeOf }, + Intersect>[0]> +> { + return stream(({ event, end }, env) => { + let count = streams.length; + const values = [...init] as { [K in keyof Streams]: TypeOf }; + const disposables: Disposable[] = []; + const done = () => disposables.forEach(dispose); + + disposables.push( + ...streams.map((s, i) => + s.run( + sink((a: TypeOf) => { + values[i] = a; + event(values); + }, () => { + if (--count === 0) { + done(); + end(); + } + }), + env, + ) + ), + ); + + return disposable(done); + }); +} + +export type SeedValue = { readonly seed: S; readonly value: V }; + +export function loop( + stepper: (seed: S, a: A) => SeedValue, + seed: S, +): (ua: Stream) => Stream { + return (ua) => + stream(({ event, end }, env) => { + let hold: S = seed; + return ua.run( + sink((a) => { + const { seed, value } = stepper(hold, a); + hold = seed; + event(value); + }, end), + env, + ); + }); +} + +export function join(concurrency = 1): ( + ua: Stream, R2>, +) => Stream { + return ( + ua: Stream, R2>, + ): Stream => + stream(({ event, end }, env) => { + let done = false; + const queue: Stream[] = []; + const running = new Map, Disposable>(); // WeakMap? + + function startInner(strm: Stream) { + const snk = sink(event, () => { + if (running.has(snk)) { + running.delete(snk); + } + + if (queue.length > 0 && running.size < concurrency) { + startInner(queue.shift() as Stream); + } + + if (done && queue.length === 0 && running.size === 0) { + end(); + } + }); + + running.set(snk, strm.run(snk, env)); + } + + return ua.run( + sink((strm) => { + if (running.size < concurrency) { + startInner(strm); + } else { + queue.push(strm); + } + }, () => { + done = true; + }), + env, + ); + }); +} + +export function takeUntil( + predicate: (a: A) => boolean, +): (ua: Stream) => Stream { + return (ua) => + stream(({ event, end }, env) => { + const dsp = ua.run( + sink((a) => { + if (predicate(a)) { + end(); + } else { + event(a); + } + }, end), + env, + ); + return dsp; + }); +} + +// export function flatMap( +// faui: (a: A) => Stream, +// count = 1, +// ): (ua: Stream) => Stream { +// return (ua) => +// stream(({ event, end }, env) => { +// const queue = new Array>(); +// const running = new Map, Disposable>(); + +// const = () => { +// const snk = sink(event, () => { +// const disposable = running.get(snk); +// if (isNotNil(disposable)) { +// dispose(disposable); +// running.delete(snk); +// } +// }); + +// }; + +// ua.run( +// sink((a) => { +// const strm = faui(a); + +// if (running.size < count) { +// const snk = sink(event, () => { +// const disposable = running.get(snk); +// if (isNotNil(disposable)) { +// dispose(disposable); +// running.delete(snk); +// } + +// }); +// } +// }, () => {}), +// env, +// ); +// }); +// } + +export function indexed( + fsi: (s: S) => [I, S], + init: S, +): (ua: Stream) => Stream<[I, A]> { + return loop((s, a) => { + const [index, seed] = fsi(s); + return { seed, value: [index, a] }; + }, init); +} + +export function withIndex( + start: number = 0, + step: number = 1, +): (ua: Stream) => Stream<[number, A], R> { + return indexed((i) => [i, i + step], start); +} + +export function withCount( + ua: Stream, +): Stream<[number, A], R> { + return pipe(ua, withIndex(1)); +} + +export function createAdapter(): [ + (value: A) => void, + Stream, +] { + const dispatcher = { dispatch: (_: A) => {} }; + const dispatch = (a: A) => dispatcher.dispatch(a); + return [ + dispatch, + stream(({ event, end }) => { + dispatcher.dispatch = event; + return disposable(() => { + dispatcher.dispatch = () => {}; + end(); + }); + }), + ]; +} + +export function fromPromise(ua: Promise): Stream { + return stream((sink) => { + const dispatcher = { dispatch: sink.event }; + const done = disposable(() => { + dispatcher.dispatch = () => {}; + }); + ua.then((a) => dispatcher.dispatch(a)).finally(() => dispose(done)); + return done; + }); +} + +export function run( + sink: Sink, + env: R, +): (ua: Stream) => Disposable { + return (ua) => ua.run(sink, env); +} + +export function runPromise(env: R): (ua: Stream) => Promise { + return (ua) => + new Promise((resolve) => run(sink(() => {}, resolve), env)(ua)); +} + +const test = pipe( + periodic(1000), + withCount, + map(([i]) => + pipe( + periodic(200), + withCount, + map(([j]): [number, number] => [i, j]), + takeUntil(([_, j]) => j > 3), + ) + ), + join(Number.POSITIVE_INFINITY), +); + +const timeout: Timeout = { setTimeout } as unknown as Timeout; + +pipe(test, run(sink(console.log, () => console.log("done")), timeout));