repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

commit
c6193ae
parent
5dd4a44
author
Eric Bower
date
2023-11-14 02:40:00 +0000 UTC
refactor(fx): use `call` from `effection` (#20)

13 files changed,  +51, -86
M deps.ts
+1, -0
1@@ -15,6 +15,7 @@ export type {
2 } from "https://deno.land/x/effection@3.0.0-beta.2/mod.ts";
3 export {
4   action,
5+  call,
6   createChannel,
7   createContext,
8   createQueue,
M fx/call.test.ts
+0, -23
 1@@ -1,6 +1,5 @@
 2 import { describe, expect, it } from "../test.ts";
 3 import { run } from "../deps.ts";
 4-
 5 import { call } from "./call.ts";
 6 
 7 const tests = describe("call()");
 8@@ -31,28 +30,6 @@ it(tests, "should return an Err()", async () => {
 9   });
10 });
11 
12-it(tests, "should call a normal function with no params", async () => {
13-  function me() {
14-    return "valid";
15-  }
16-
17-  await run(function* () {
18-    const result = yield* call(me);
19-    expect(result).toEqual("valid");
20-  });
21-});
22-
23-it(tests, "should call a normal function with params", async () => {
24-  function me(v: string) {
25-    return "valid " + v;
26-  }
27-
28-  await run(function* () {
29-    const result = yield* call(() => me("fn"));
30-    expect(result).toEqual("valid fn");
31-  });
32-});
33-
34 it(tests, "should call a promise", async () => {
35   const me = () =>
36     new Promise<string>((resolve) => {
M fx/call.ts
+5, -31
 1@@ -1,38 +1,12 @@
 2-import type { OpFn } from "../types.ts";
 3 import type { Operation, Result } from "../deps.ts";
 4-import { action, Err, expect, Ok } from "../deps.ts";
 5+import { call, Err, Ok } from "../deps.ts";
 6+import type { Operator } from "../types.ts";
 7 
 8-export const isFunc = (f: unknown) => typeof f === "function";
 9-export const isPromise = (p: unknown) =>
10-  p && isFunc((p as PromiseLike<unknown>).then);
11-export const isIterator = (it: unknown) =>
12-  it &&
13-  isFunc((it as Iterator<unknown>).next) &&
14-  isFunc((it as Iterator<unknown>).throw);
15+export { call };
16 
17-export function* toOperation<T>(opFn: OpFn<T>): Operation<T> {
18-  const op = opFn();
19-  let result: T;
20-  if (isPromise(op)) {
21-    result = yield* expect(op as Promise<T>);
22-  } else if (isIterator(op)) {
23-    result = yield* op as Operation<T>;
24-  } else {
25-    result = op as T;
26-  }
27-  return result;
28-}
29-
30-export function call<T>(op: OpFn<T>): Operation<T> {
31-  return action(function* (resolve) {
32-    const result = yield* toOperation(op);
33-    resolve(result);
34-  });
35-}
36-
37-export function* safe<T>(opFn: OpFn<T>): Operation<Result<T>> {
38+export function* safe<T>(operator: Operator<T>): Operation<Result<T>> {
39   try {
40-    const value = yield* call(opFn);
41+    const value: T = yield* call(operator as any) as any;
42     return Ok(value);
43   } catch (error) {
44     return Err(error);
M fx/parallel.ts
+2, -2
 1@@ -1,5 +1,5 @@
 2 import type { Channel, Operation, Result } from "../deps.ts";
 3-import type { Computation, OpFn } from "../types.ts";
 4+import type { Computation, Operator } from "../types.ts";
 5 import { createChannel, resource, spawn } from "../deps.ts";
 6 
 7 import { safe } from "./call.ts";
 8@@ -9,7 +9,7 @@ export interface ParallelRet<T> extends Computation<Result<T>[]> {
 9   immediate: Channel<Result<T>, void>;
10 }
11 
12-export function parallel<T>(operations: OpFn<T>[]) {
13+export function parallel<T>(operations: Operator<T>[]) {
14   const sequence = createChannel<Result<T>>();
15   const immediate = createChannel<Result<T>>();
16   const results: Result<T>[] = [];
M fx/race.ts
+14, -7
 1@@ -1,16 +1,21 @@
 2 import type { Operation, Task } from "../deps.ts";
 3 import { action, resource, spawn } from "../deps.ts";
 4-import type { OpFn } from "../types.ts";
 5-
 6-import { toOperation } from "./call.ts";
 7+import type { Operator } from "../types.ts";
 8+import { call } from "./call.ts";
 9 
10 interface OpMap<T = unknown> {
11-  [key: string]: OpFn<T>;
12+  [key: string]: Operator<T>;
13 }
14 
15-export function race(
16+export function race<T>(
17   opMap: OpMap,
18-): Operation<{ [K in keyof OpMap<unknown>]: ReturnType<OpMap[K]> }> {
19+): Operation<
20+  {
21+    [K in keyof OpMap<T>]: OpMap[K] extends (...args: any[]) => any
22+      ? ReturnType<OpMap[K]>
23+      : OpMap[K];
24+  }
25+> {
26   return resource(function* Race(provide) {
27     const keys = Object.keys(opMap);
28     const taskMap: { [key: string]: Task<unknown> } = {};
29@@ -20,7 +25,9 @@ export function race(
30       for (let i = 0; i < keys.length; i += 1) {
31         const key = keys[i];
32         yield* spawn(function* () {
33-          const task = yield* spawn(() => toOperation(opMap[key]));
34+          const task = yield* spawn(function* () {
35+            yield* call(opMap[key] as any);
36+          });
37           taskMap[key] = task;
38           (resultMap as any)[key] = yield* task;
39           resolve(task);
M fx/request.ts
+3, -3
 1@@ -1,12 +1,12 @@
 2-import { expect, useAbortSignal } from "../deps.ts";
 3+import { call, useAbortSignal } from "../deps.ts";
 4 
 5 export function* request(url: string | URL | Request, opts?: RequestInit) {
 6   const signal = yield* useAbortSignal();
 7-  const response = yield* expect(fetch(url, { signal, ...opts }));
 8+  const response = yield* call(fetch(url, { signal, ...opts }));
 9   return response;
10 }
11 
12 export function* json(response: Response) {
13-  const result = yield* expect(response.json());
14+  const result = yield* call(response.json());
15   return result;
16 }
M fx/watch.ts
+3, -4
 1@@ -1,9 +1,8 @@
 2-import type { OpFn } from "../types.ts";
 3-
 4+import type { Operator } from "../types.ts";
 5 import { safe } from "./call.ts";
 6 import { parallel } from "./parallel.ts";
 7 
 8-export function supervise<T>(op: OpFn<T>) {
 9+export function supervise<T>(op: Operator<T>) {
10   return function* () {
11     while (true) {
12       yield* safe(op);
13@@ -11,7 +10,7 @@ export function supervise<T>(op: OpFn<T>) {
14   };
15 }
16 
17-export function* keepAlive(ops: OpFn[]) {
18+export function* keepAlive(ops: Operator<unknown>[]) {
19   const results = yield* parallel(ops.map(supervise));
20   return yield* results;
21 }
M query/middleware.ts
+7, -3
 1@@ -1,6 +1,6 @@
 2 import { call } from "../fx/mod.ts";
 3 import { compose } from "../compose.ts";
 4-import type { OpFn } from "../types.ts";
 5+import type { Operator } from "../types.ts";
 6 
 7 import type {
 8   Action,
 9@@ -153,9 +153,13 @@ export function* performanceMonitor<Ctx extends PerfCtx = PerfCtx>(
10 /**
11  * This middleware will call the `saga` provided with the action sent to the middleware pipeline.
12  */
13-export function wrap<Ctx extends PipeCtx = PipeCtx>(op: (a: Action) => OpFn) {
14+export function wrap<Ctx extends PipeCtx = PipeCtx, T = any>(
15+  op: (a: Action) => Operator<T>,
16+) {
17   return function* (ctx: Ctx, next: Next) {
18-    yield* call(() => op(ctx.action));
19+    yield* call(function* () {
20+      return op(ctx.action);
21+    });
22     yield* next();
23   };
24 }
M query/pipe.ts
+3, -3
 1@@ -1,5 +1,5 @@
 2 import { compose } from "../compose.ts";
 3-import type { OpFn, Payload } from "../types.ts";
 4+import type { Operator, Payload } from "../types.ts";
 5 import { parallel } from "../mod.ts";
 6 
 7 // TODO: remove store deps
 8@@ -24,7 +24,7 @@ import { Ok } from "../deps.ts";
 9 export interface SagaApi<Ctx extends PipeCtx> {
10   use: (fn: Middleware<Ctx>) => void;
11   routes: () => Middleware<Ctx>;
12-  bootup: OpFn;
13+  bootup: Operator<unknown>;
14 
15   /**
16    * Name only
17@@ -127,7 +127,7 @@ export function createPipe<Ctx extends PipeCtx = PipeCtx<any>>(
18   } = { supervisor: takeEvery },
19 ): SagaApi<Ctx> {
20   const middleware: Middleware<Ctx>[] = [];
21-  const visors: { [key: string]: OpFn } = {};
22+  const visors: { [key: string]: Operator<unknown> } = {};
23   const middlewareMap: { [key: string]: Middleware<Ctx> } = {};
24   const actionMap: {
25     [key: string]: CreateActionWithPayload<Ctx, any>;
M store/store.ts
+2, -2
 1@@ -9,7 +9,7 @@ import {
 2   Task,
 3 } from "../deps.ts";
 4 import { BaseMiddleware, compose } from "../compose.ts";
 5-import type { AnyAction, AnyState, OpFn } from "../types.ts";
 6+import type { AnyAction, AnyState, Operator } from "../types.ts";
 7 import { safe } from "../fx/mod.ts";
 8 import { Next } from "../query/types.ts";
 9 import type { FxStore, Listener, StoreUpdater, UpdaterCtx } from "./types.ts";
10@@ -132,7 +132,7 @@ export function createStore<S extends AnyState>({
11     });
12   }
13 
14-  function run<T>(op: OpFn<T>): Task<Result<T>> {
15+  function run<T>(op: Operator<T>): Task<Result<T>> {
16     return scope.run(function* () {
17       return yield* safe(op);
18     });
M store/supervisor.ts
+5, -3
 1@@ -1,7 +1,7 @@
 2 import { call, race } from "../fx/mod.ts";
 3 import { take } from "./fx.ts";
 4 import { Operation, sleep, spawn, Task } from "../deps.ts";
 5-import type { ActionWPayload, AnyAction, OpFn } from "../types.ts";
 6+import type { ActionWPayload, AnyAction, Operator } from "../types.ts";
 7 import type { CreateActionPayload } from "../query/mod.ts";
 8 
 9 const MS = 1000;
10@@ -44,12 +44,14 @@ export function poll(parentTimer: number = 5 * 1000, cancelType?: string) {
11 export function timer(timer: number = 5 * MINUTES) {
12   return function* onTimer(
13     actionType: string,
14-    op: (action: AnyAction) => OpFn,
15+    op: (action: AnyAction) => Operator<unknown>,
16   ) {
17     const map: { [key: string]: Task<unknown> } = {};
18 
19     function* activate(action: ActionWPayload<CreateActionPayload>) {
20-      yield* call(() => op(action));
21+      yield* call(function* () {
22+        return op(action);
23+      });
24       yield* sleep(timer);
25       delete map[action.payload.key];
26     }
M store/types.ts
+2, -2
 1@@ -1,6 +1,6 @@
 2 import type { Operation, Patch, Result, Scope, Task } from "../deps.ts";
 3 import { BaseCtx } from "../mod.ts";
 4-import type { AnyAction, AnyState, OpFn } from "../types.ts";
 5+import type { AnyAction, AnyState, Operator } from "../types.ts";
 6 
 7 export type StoreUpdater<S extends AnyState> = (s: S) => S | void;
 8 
 9@@ -32,7 +32,7 @@ export interface FxStore<S extends AnyState> {
10   getState: () => S;
11   subscribe: (fn: Listener) => () => void;
12   update: (u: StoreUpdater<S> | StoreUpdater<S>[]) => Operation<UpdaterCtx<S>>;
13-  run: <T>(op: OpFn<T>) => Task<Result<T>>;
14+  run: <T>(op: Operator<T>) => Task<Result<T>>;
15   // deno-lint-ignore no-explicit-any
16   dispatch: (a: AnyAction) => any;
17   replaceReducer: (r: (s: S, a: AnyAction) => S) => void;
M types.ts
+4, -3
 1@@ -5,10 +5,11 @@ export interface Computation<T = unknown> {
 2   [Symbol.iterator](): Iterator<Instruction, T, any>;
 3 }
 4 
 5-export type OpFn<T = unknown> =
 6+export type Operator<T> =
 7+  | Operation<T>
 8+  | Promise<T>
 9   | (() => Operation<T>)
10-  | (() => PromiseLike<T>)
11-  | (() => T);
12+  | (() => Promise<T>);
13 
14 export interface QueryState {
15   "@@starfx/loaders": Record<string, LoaderItemState>;