repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

commit
cab4009
parent
7fa2483
author
Eric Bower
date
2024-01-23 04:48:25 +0000 UTC
refactor: remove `log` (#35)

refactor: removed wrapped spawn from take helpers

BREAKING CHANGE: take helper fn signatures are different
BREAKING CHANGE: no more log or `LogContext`
12 files changed,  +70, -103
D log.ts
M mod.ts
M action.ts
+21, -27
 1@@ -1,6 +1,7 @@
 2 import {
 3   call,
 4   createContext,
 5+  createSignal,
 6   each,
 7   Operation,
 8   Signal,
 9@@ -12,8 +13,9 @@ import { ActionPattern, matcher } from "./matcher.ts";
10 import type { Action, ActionWithPayload, AnyAction } from "./types.ts";
11 import { createFilterQueue } from "./queue.ts";
12 
13-export const ActionContext = createContext<Signal<AnyAction, void>>(
14+export const ActionContext = createContext(
15   "starfx:action",
16+  createSignal<AnyAction, void>(),
17 );
18 
19 export function useActions(pattern: ActionPattern): Stream<AnyAction, void> {
20@@ -68,51 +70,43 @@ export function* takeEvery<T>(
21   pattern: ActionPattern,
22   op: (action: Action) => Operation<T>,
23 ) {
24-  return yield* spawn(function* (): Operation<void> {
25-    const fd = useActions(pattern);
26-    for (const action of yield* each(fd)) {
27-      yield* spawn(() => op(action));
28-      yield* each.next();
29-    }
30-  });
31+  const fd = useActions(pattern);
32+  for (const action of yield* each(fd)) {
33+    yield* spawn(() => op(action));
34+    yield* each.next();
35+  }
36 }
37 
38 export function* takeLatest<T>(
39   pattern: ActionPattern,
40   op: (action: Action) => Operation<T>,
41 ) {
42-  return yield* spawn(function* (): Operation<void> {
43-    const fd = useActions(pattern);
44-    let lastTask;
45+  const fd = useActions(pattern);
46+  let lastTask;
47 
48-    for (const action of yield* each(fd)) {
49-      if (lastTask) {
50-        yield* lastTask.halt();
51-      }
52-      lastTask = yield* spawn(() => op(action));
53-      yield* each.next();
54+  for (const action of yield* each(fd)) {
55+    if (lastTask) {
56+      yield* lastTask.halt();
57     }
58-  });
59+    lastTask = yield* spawn(() => op(action));
60+    yield* each.next();
61+  }
62 }
63-export const latest = takeLatest;
64 
65 export function* takeLeading<T>(
66   pattern: ActionPattern,
67   op: (action: Action) => Operation<T>,
68 ) {
69-  return yield* spawn(function* (): Operation<void> {
70-    while (true) {
71-      const action = yield* take(pattern);
72-      yield* call(() => op(action));
73-    }
74-  });
75+  while (true) {
76+    const action = yield* take(pattern);
77+    yield* call(() => op(action));
78+  }
79 }
80-export const leading = takeLeading;
81 
82 export const API_ACTION_PREFIX = "@@starfx";
83 export const createAction = (curType: string) => {
84   if (!curType) throw new Error("createAction requires non-empty string");
85-  const type = `${API_ACTION_PREFIX}/${curType}`;
86+  const type = `${API_ACTION_PREFIX}:${curType}`;
87   const action = () => ({ type });
88   action.toString = () => type;
89   return action;
M fx/supervisor.ts
+7, -9
 1@@ -1,7 +1,8 @@
 2 import { Callable, Operation, Result, sleep } from "../deps.ts";
 3 import { safe } from "./safe.ts";
 4 import { parallel } from "./parallel.ts";
 5-import { log } from "../log.ts";
 6+import { put } from "../action.ts";
 7+import { API_ACTION_PREFIX } from "../action.ts";
 8 
 9 export function superviseBackoff(attempt: number, max = 10): number {
10   if (attempt > max) return -1;
11@@ -30,14 +31,11 @@ export function supervise<T>(
12       if (res.ok) {
13         attempt = 0;
14       } else {
15-        yield* log({
16-          type: "error:supervise",
17-          payload: {
18-            message:
19-              `Exception caught, waiting ${waitFor}ms before restarting operation`,
20-            error: res.error,
21-            op,
22-          },
23+        yield* put({
24+          type: `${API_ACTION_PREFIX}:supervise`,
25+          payload: res.error,
26+          meta:
27+            `Exception caught, waiting ${waitFor}ms before restarting operation`,
28         });
29         yield* sleep(waitFor);
30       }
D log.ts
+0, -23
 1@@ -1,23 +0,0 @@
 2-import { createChannel, createContext } from "./deps.ts";
 3-import type { ActionWithPayload } from "./types.ts";
 4-
 5-export interface LogMessage {
 6-  [key: string]: any;
 7-}
 8-export type LogAction = ActionWithPayload<LogMessage>;
 9-
10-export function createLogger(type: string) {
11-  return function (payload: LogMessage) {
12-    return log({ type, payload });
13-  };
14-}
15-
16-export function* log(action: LogAction) {
17-  const chan = yield* LogContext;
18-  yield* chan.send(action);
19-}
20-
21-export const LogContext = createContext(
22-  "starfx:logger",
23-  createChannel<LogAction>(),
24-);
M mod.ts
+0, -1
1@@ -3,7 +3,6 @@ export * from "./query/mod.ts";
2 export * from "./types.ts";
3 export * from "./compose.ts";
4 export * from "./action.ts";
5-export * from "./log.ts";
6 export * from "./supervisor.ts";
7 export {
8   action,
M query/mdw.ts
+2, -2
 1@@ -12,8 +12,8 @@ import type {
 2 import type { Next } from "../types.ts";
 3 import { mergeRequest } from "./util.ts";
 4 import * as fetchMdw from "./fetch.ts";
 5-import { log } from "../log.ts";
 6 import { call, Callable } from "../deps.ts";
 7+import { put } from "../action.ts";
 8 export * from "./fetch.ts";
 9 
10 /**
11@@ -33,7 +33,7 @@ export function* err<Ctx extends ThunkCtx = ThunkCtx>(
12 ) {
13   ctx.result = yield* safe(next);
14   if (!ctx.result.ok) {
15-    yield* log({
16+    yield* put({
17       type: "error:query",
18       payload: {
19         message:
M query/thunk.ts
+2, -4
 1@@ -134,8 +134,7 @@ export function createThunks<Ctx extends ThunkCtx = ThunkCtx<any>>(
 2     yield* next();
 3   }
 4 
 5-  const createType = (post: string) =>
 6-    `${API_ACTION_PREFIX}${post.startsWith("/") ? "" : "/"}${post}`;
 7+  const createType = (post: string) => `${API_ACTION_PREFIX}:${post}`;
 8 
 9   function* onApi<P extends CreateActionPayload>(
10     action: ActionWithPayload<P>,
11@@ -197,8 +196,7 @@ export function createThunks<Ctx extends ThunkCtx = ThunkCtx<any>>(
12 
13     const tt = req ? (req as any).supervisor : supervisor;
14     function* curVisor() {
15-      const task = yield* tt(type, onApi);
16-      yield* task;
17+      yield* tt(type, onApi);
18     }
19     visors[name] = curVisor;
20 
M store/store.ts
+7, -10
 1@@ -14,8 +14,8 @@ import type { AnyAction, AnyState, Next } from "../types.ts";
 2 import { safe } from "../fx/mod.ts";
 3 import type { FxStore, Listener, StoreUpdater, UpdaterCtx } from "./types.ts";
 4 import { StoreContext, StoreUpdateContext } from "./context.ts";
 5-import { log } from "../log.ts";
 6 import { ActionContext, emit } from "../action.ts";
 7+import { API_ACTION_PREFIX } from "../action.ts";
 8 
 9 const stubMsg = "This is merely a stub, not implemented";
10 
11@@ -93,9 +93,9 @@ export function createStore<S extends AnyState>({
12   }
13 
14   function* logMdw(ctx: UpdaterCtx<S>, next: Next) {
15-    yield* log({
16-      type: "store",
17-      payload: { ctx },
18+    dispatch({
19+      type: `${API_ACTION_PREFIX}:store`,
20+      payload: ctx,
21     });
22     yield* next();
23   }
24@@ -134,12 +134,9 @@ export function createStore<S extends AnyState>({
25     yield* mdw(ctx);
26 
27     if (!ctx.result.ok) {
28-      yield* log({
29-        type: "error:store",
30-        payload: {
31-          message: `Exception raised when calling store updaters`,
32-          error: ctx.result.error,
33-        },
34+      dispatch({
35+        type: `${API_ACTION_PREFIX}:store`,
36+        payload: ctx.result.error,
37       });
38     }
39 
M test/action.test.ts
+2, -2
 1@@ -5,10 +5,10 @@ const tests = describe("createAction()");
 2 
 3 it(tests, "should return action type when stringified", () => {
 4   const undo = createAction("UNDO");
 5-  expect(`${API_ACTION_PREFIX}/UNDO`).toEqual(`${undo}`);
 6+  expect(`${API_ACTION_PREFIX}:UNDO`).toEqual(`${undo}`);
 7 });
 8 
 9 it(tests, "return object with type", () => {
10   const undo = createAction("UNDO");
11-  expect(undo()).toEqual({ type: `${API_ACTION_PREFIX}/UNDO` });
12+  expect(undo()).toEqual({ type: `${API_ACTION_PREFIX}:UNDO` });
13 });
M test/api.test.ts
+1, -2
 1@@ -229,8 +229,7 @@ it(tests, "run() from a normal saga", () => {
 2   }
 3 
 4   function* watchAction() {
 5-    const task = yield* takeEvery(`${action2}`, onAction);
 6-    yield* task;
 7+    yield* takeEvery(`${action2}`, onAction);
 8   }
 9 
10   const store = configureStore({ initialState: { users: {} } });
M test/supervisor.test.ts
+13, -12
 1@@ -1,15 +1,15 @@
 2 import { describe, expect, it } from "../test.ts";
 3 import {
 4   call,
 5-  each,
 6-  LogAction,
 7-  LogContext,
 8   Operation,
 9   run,
10   spawn,
11   supervise,
12   superviseBackoff,
13 } from "../mod.ts";
14+import { ActionWithPayload } from "../types.ts";
15+import { take } from "../action.ts";
16+import { API_ACTION_PREFIX } from "../action.ts";
17 
18 const test = describe("supervise()");
19 
20@@ -38,6 +38,8 @@ describe("superviseBackoff", () => {
21   });
22 });
23 
24+type LogAction = ActionWithPayload<{ message: string }>;
25+
26 it(test, "should recover with backoff pressure", async () => {
27   const err = console.error;
28   console.error = () => {};
29@@ -53,26 +55,25 @@ it(test, "should recover with backoff pressure", async () => {
30       throw new Error("boom!");
31     }
32     yield* spawn(function* () {
33-      const chan = yield* LogContext;
34-      for (const action of yield* each(chan)) {
35+      while (true) {
36+        const action = yield* take<LogAction["payload"]>("*");
37         actions.push(action);
38-        yield* each.next();
39       }
40     });
41     yield* call(supervise(op, backoff));
42   });
43 
44   expect(actions.length).toEqual(3);
45-  expect(actions[0].type).toEqual("error:supervise");
46-  expect(actions[0].payload.message).toEqual(
47+  expect(actions[0].type).toEqual(`${API_ACTION_PREFIX}:supervise`);
48+  expect(actions[0].meta).toEqual(
49     "Exception caught, waiting 1ms before restarting operation",
50   );
51-  expect(actions[1].type).toEqual("error:supervise");
52-  expect(actions[1].payload.message).toEqual(
53+  expect(actions[1].type).toEqual(`${API_ACTION_PREFIX}:supervise`);
54+  expect(actions[1].meta).toEqual(
55     "Exception caught, waiting 2ms before restarting operation",
56   );
57-  expect(actions[2].type).toEqual("error:supervise");
58-  expect(actions[2].payload.message).toEqual(
59+  expect(actions[2].type).toEqual(`${API_ACTION_PREFIX}:supervise`);
60+  expect(actions[2].meta).toEqual(
61     "Exception caught, waiting 3ms before restarting operation",
62   );
63 
M test/take-helper.test.ts
+8, -5
 1@@ -2,6 +2,7 @@ import { describe, expect, it } from "../test.ts";
 2 import { configureStore } from "../store/mod.ts";
 3 import type { AnyAction } from "../mod.ts";
 4 import { sleep, take, takeEvery, takeLatest, takeLeading } from "../mod.ts";
 5+import { spawn } from "../deps.ts";
 6 
 7 const testEvery = describe("takeEvery()");
 8 const testLatest = describe("takeLatest()");
 9@@ -17,7 +18,7 @@ it(testLatest, "should cancel previous tasks and only use latest", async () => {
10   }
11 
12   function* root() {
13-    const task = yield* takeLatest("ACTION", worker);
14+    const task = yield* spawn(() => takeLatest("ACTION", worker));
15     yield* take("CANCEL_WATCHER");
16     yield* task.halt();
17   }
18@@ -44,7 +45,7 @@ it(testLeading, "should keep first action and discard the rest", async () => {
19   }
20 
21   function* root() {
22-    const task = yield* takeLeading("ACTION", worker);
23+    const task = yield* spawn(() => takeLeading("ACTION", worker));
24     yield* sleep(150);
25     yield* task.halt();
26   }
27@@ -66,9 +67,11 @@ it(testEvery, "should receive all actions", async () => {
28   const actual: string[][] = [];
29 
30   function* root() {
31-    const task = yield* takeEvery(
32-      "ACTION",
33-      (action) => worker("a1", "a2", action),
34+    const task = yield* spawn(() =>
35+      takeEvery(
36+        "ACTION",
37+        (action) => worker("a1", "a2", action),
38+      )
39     );
40     yield* take("CANCEL_WATCHER");
41     yield* task.halt();
M types.ts
+7, -6
 1@@ -42,16 +42,17 @@ export interface Payload<P = any> {
 2   payload: P;
 3 }
 4 
 5-export interface AnyAction {
 6+export interface Action {
 7   type: string;
 8-  // deno-lint-ignore no-explicit-any
 9-  [key: string]: any;
10 }
11 
12-export interface Action {
13-  type: string;
14+// https://github.com/redux-utilities/flux-standard-action
15+export interface AnyAction extends Action {
16+  payload?: any;
17+  meta?: any;
18+  error?: boolean;
19 }
20 
21-export interface ActionWithPayload<P> extends Action {
22+export interface ActionWithPayload<P> extends AnyAction {
23   payload: P;
24 }