repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

commit
6d5ec93
parent
3db088e
author
Eric Bower
date
2023-07-30 14:37:14 +0000 UTC
fix: ensure compose mdw returns aggregate `Result` (#6)

There was a regression compared to `saga-query` for `createPipe()` where
we were swallowing errors in the mdw pipeline.  This is largely because
we are experimenting with the idea of ensuring all operations inside
`starfx` are safe-by-default and return a `Result` instead of `throw`.

This effort is a design decision to treat errors-as-data.

In order to support safe-by-default inside our `compose()` function, we
need to aggregate the `Result` of every mdw call and then attach an
aggregate `Result` onto the `ctx` provided to `compose()`.

So now we require at least one field to always exist on `ctx` which is
`ctx.result` which returns `Result<any[]>`.

If an error exists inside the aggregate results, we return it unwrapped
as an error.  If no error exists in the results then we return a single
`Result` with a list of values unwrapped.

Here is a test that demonstrates the new behavior:

```ts
it(tests, "error inside endpoint mdw", async () => {
  let called = false;
  const query = createPipe();
  query.use(function* (ctx, next) {
    yield* next();
    if (!ctx.result.ok) {
      called = true;
    }
  });

  query.use(query.routes());

  const fetchUsers = query.create(
    `/users`,
    { supervisor: takeEvery },
    function* processUsers() {
      throw new Error("some error");
    },
  );

  const store = await configureStore({
    initialState: {
      ...createQueryState(),
      users: {},
    },
  });
  store.run(query.bootup);
  store.dispatch(fetchUsers());
  asserts.assertEquals(called, true);
});
```
9 files changed,  +173, -52
M compose.test.ts
+66, -9
  1@@ -1,12 +1,12 @@
  2-import { describe, expect, it } from "./test.ts";
  3+import { asserts, describe, expect, it } from "./test.ts";
  4 
  5-import { Ok, run, sleep } from "./deps.ts";
  6+import { Err, Ok, Result, run, sleep } from "./deps.ts";
  7 import { compose } from "./compose.ts";
  8 
  9 const tests = describe("compose()");
 10 
 11 it(tests, "should compose middleware", async () => {
 12-  const mdw = compose<{ one: string; three: string }>([
 13+  const mdw = compose<{ one: string; three: string; result: Result<any[]> }>([
 14     function* (ctx, next) {
 15       ctx.one = "two";
 16       yield* next();
 17@@ -17,19 +17,22 @@ it(tests, "should compose middleware", async () => {
 18     },
 19   ]);
 20   const actual = await run(function* () {
 21-    return yield* mdw({ one: "", three: "" });
 22+    const ctx = { one: "", three: "", result: Ok([]) };
 23+    yield* mdw(ctx);
 24+    return ctx;
 25   });
 26 
 27-  const expected = Ok({
 28+  const expected = {
 29     // we should see the mutation
 30     one: "two",
 31     three: "four",
 32-  });
 33+    result: Ok([undefined, undefined]),
 34+  };
 35   expect(actual).toEqual(expected);
 36 });
 37 
 38 it(tests, "order of execution", async () => {
 39-  const mdw = compose<{ actual: string }>([
 40+  const mdw = compose<{ actual: string; result: Result<any[]> }>([
 41     function* (ctx, next) {
 42       ctx.actual += "a";
 43       yield* next();
 44@@ -52,8 +55,62 @@ it(tests, "order of execution", async () => {
 45   ]);
 46 
 47   const actual = await run(function* () {
 48-    return yield* mdw({ actual: "" });
 49+    const ctx = { actual: "", result: Ok([]) };
 50+    yield* mdw(ctx);
 51+    return ctx;
 52   });
 53-  const expected = Ok({ actual: "abcdefg" });
 54+  const expected = {
 55+    actual: "abcdefg",
 56+    result: Ok([undefined, undefined, undefined]),
 57+  };
 58+  expect(actual).toEqual(expected);
 59+});
 60+
 61+it(tests, "result of each mdw is aggregated to `ctx.result`", async () => {
 62+  const mdw = compose<{ result: Result<any[]> }>([
 63+    function* (_, next) {
 64+      yield* next();
 65+      return "two";
 66+    },
 67+    function* (_, next) {
 68+      yield* next();
 69+      return "one";
 70+    },
 71+  ]);
 72+  const actual = await run(function* () {
 73+    const ctx = { result: Ok([]) };
 74+    yield* mdw(ctx);
 75+    return ctx;
 76+  });
 77+
 78+  const expected = {
 79+    result: Ok(["one", "two"]),
 80+  };
 81+
 82+  expect(actual).toEqual(expected);
 83+});
 84+
 85+it(tests, "when error is discovered return in `ctx.result`", async () => {
 86+  const err = new Error("boom");
 87+  const mdw = compose<{ result: Result<any[]> }>([
 88+    function* (_, next) {
 89+      yield* next();
 90+      throw err;
 91+    },
 92+    function* (_, next) {
 93+      yield* next();
 94+      asserts.fail();
 95+    },
 96+  ]);
 97+  const actual = await run(function* () {
 98+    const ctx = { result: Ok([]) };
 99+    yield* mdw(ctx);
100+    return ctx;
101+  });
102+
103+  const expected = {
104+    result: Err(err),
105+  };
106+
107   expect(actual).toEqual(expected);
108 });
M compose.ts
+29, -14
 1@@ -1,14 +1,19 @@
 2 import { call } from "./fx/mod.ts";
 3 import type { Next } from "./query/mod.ts";
 4-import type { Instruction, Operation } from "./deps.ts";
 5-import { Ok } from "./deps.ts";
 6+import { Err, Instruction, Operation, Result } from "./deps.ts";
 7+import { resultAll } from "./result.ts";
 8 
 9 // deno-lint-ignore no-explicit-any
10-export type BaseCtx = Record<string, any>;
11+export interface BaseCtx<T extends any[] = any[]> {
12+  // deno-lint-ignore no-explicit-any
13+  [key: string]: any;
14+  result: Result<T>;
15+}
16+
17 export type BaseMiddleware<Ctx extends BaseCtx = BaseCtx, T = unknown> = (
18   ctx: Ctx,
19   next: Next,
20-) => Operation<T>;
21+) => Operation<T | undefined>;
22 
23 export function compose<Ctx extends BaseCtx = BaseCtx, T = unknown>(
24   middleware: BaseMiddleware<Ctx, T>[],
25@@ -24,12 +29,15 @@ export function compose<Ctx extends BaseCtx = BaseCtx, T = unknown>(
26   }
27 
28   return function* composeFn(context: Ctx, mdw?: BaseMiddleware<Ctx, T>) {
29+    // deno-lint-ignore no-explicit-any
30+    const results: Result<any>[] = [];
31     // last called middleware #
32     let index = -1;
33 
34-    function* dispatch(i: number): Generator<Instruction, T | undefined, void> {
35+    function* dispatch(i: number): Generator<Instruction, void, void> {
36       if (i <= index) {
37-        throw new Error("next() called multiple times");
38+        results.push(Err(new Error("next() called multiple times")));
39+        return;
40       }
41       index = i;
42       let fn: BaseMiddleware<Ctx, T> | undefined = middleware[i];
43@@ -40,15 +48,22 @@ export function compose<Ctx extends BaseCtx = BaseCtx, T = unknown>(
44         return;
45       }
46       const nxt = dispatch.bind(null, i + 1);
47-      const result = yield* fn(context, nxt);
48-      return result;
49-    }
50+      // wrap mdw in a safe call
51+      const result = yield* call(() =>
52+        (fn as BaseMiddleware<Ctx, T>)(context, nxt)
53+      );
54 
55-    const result = yield* call(() => dispatch(0));
56-    if (result.ok) {
57-      return Ok(context);
58-    } else {
59-      return result;
60+      // exit early if an error is discovered
61+      if (!result.ok) {
62+        context.result = result;
63+        return;
64+      }
65+
66+      results.push(result);
67+      // aggregate results on each pass of the mdw
68+      context.result = resultAll(results);
69     }
70+
71+    yield* dispatch(0);
72   };
73 }
M deno.lock
+2, -0
1@@ -265,6 +265,8 @@
2     "https://esm.sh/v128/@types/prop-types@15.7.5/index.d.ts": "6a386ff939f180ae8ef064699d8b7b6e62bc2731a62d7fbf5e02589383838dea",
3     "https://esm.sh/v128/@types/react@18.2.14/global.d.ts": "549df62b64a71004aee17685b445a8289013daf96246ce4d9b087d13d7a27a61",
4     "https://esm.sh/v128/@types/react@18.2.14/index.d.ts": "9aac3002309f46089b251d57e7b16c20bb0e8be711867bd308009fb33562b46c",
5+    "https://esm.sh/v128/@types/react@18.2.15/global.d.ts": "549df62b64a71004aee17685b445a8289013daf96246ce4d9b087d13d7a27a61",
6+    "https://esm.sh/v128/@types/react@18.2.15/index.d.ts": "f55ede57bc191e4c9c326fc86393abaa7b20ea3b34199e4c0b118a703a4481b6",
7     "https://esm.sh/v128/@types/scheduler@0.16.3/tracing.d.ts": "f5a8b384f182b3851cec3596ccc96cb7464f8d3469f48c74bf2befb782a19de5",
8     "https://esm.sh/v128/csstype@3.1.2/index.d.ts": "4c68749a564a6facdf675416d75789ee5a557afda8960e0803cf6711fa569288"
9   },
M query/pipe.test.ts
+38, -19
 1@@ -188,13 +188,13 @@ it(
 2 );
 3 
 4 it(tests, "error handling", async () => {
 5+  let called = false;
 6   const api = createPipe<RoboCtx>();
 7   api.use(api.routes());
 8-  api.use(function* upstream(_, next) {
 9-    try {
10-      yield* next();
11-    } catch (_) {
12-      asserts.assert(true);
13+  api.use(function* upstream(ctx, next) {
14+    yield* next();
15+    if (!ctx.result.ok) {
16+      called = true;
17     }
18   });
19   api.use(function* fail() {
20@@ -206,9 +206,11 @@ it(tests, "error handling", async () => {
21   const store = await configureStore({ initialState: {} });
22   store.run(api.bootup);
23   store.dispatch(action());
24+  asserts.assertStrictEquals(called, true);
25 });
26 
27 it(tests, "error handling inside create", async () => {
28+  let called = false;
29   const api = createPipe<RoboCtx>();
30   api.use(api.routes());
31   api.use(function* fail() {
32@@ -218,31 +220,48 @@ it(tests, "error handling inside create", async () => {
33   const action = api.create(
34     `/error`,
35     { supervisor: takeEvery },
36-    function* (_, next) {
37-      try {
38-        yield* next();
39-      } catch (_) {
40-        asserts.assert(true);
41+    function* (ctx, next) {
42+      yield* next();
43+      if (!ctx.result.ok) {
44+        called = true;
45       }
46     },
47   );
48   const store = await configureStore({ initialState: {} });
49   store.run(api.bootup);
50   store.dispatch(action());
51+  asserts.assertStrictEquals(called, true);
52 });
53 
54-it(tests, "error handling - error handler", async () => {
55-  const api = createPipe<RoboCtx>();
56-  api.use(api.routes());
57-  api.use(function* upstream() {
58-    throw new Error("failure");
59+it(tests, "error inside endpoint mdw", async () => {
60+  let called = false;
61+  const query = createPipe();
62+  query.use(function* (ctx, next) {
63+    yield* next();
64+    if (!ctx.result.ok) {
65+      called = true;
66+    }
67   });
68 
69-  const action = api.create(`/error`, { supervisor: takeEvery });
70-  const store = await configureStore({ initialState: {} });
71-  store.run(api.bootup);
72+  query.use(query.routes());
73 
74-  store.dispatch(action());
75+  const fetchUsers = query.create(
76+    `/users`,
77+    { supervisor: takeEvery },
78+    function* processUsers() {
79+      throw new Error("some error");
80+    },
81+  );
82+
83+  const store = await configureStore({
84+    initialState: {
85+      ...createQueryState(),
86+      users: {},
87+    },
88+  });
89+  store.run(query.bootup);
90+  store.dispatch(fetchUsers());
91+  asserts.assertEquals(called, true);
92 });
93 
94 it(tests, "create fn is an array", async () => {
M query/pipe.ts
+5, -3
 1@@ -1,4 +1,3 @@
 2-import { call } from "../fx/mod.ts";
 3 import { compose } from "../compose.ts";
 4 import type { OpFn, Payload } from "../types.ts";
 5 import { parallel } from "../mod.ts";
 6@@ -20,6 +19,7 @@ import type {
 7   Supervisor,
 8 } from "./types.ts";
 9 import { API_ACTION_PREFIX } from "../action.ts";
10+import { Ok } from "../deps.ts";
11 
12 export interface SagaApi<Ctx extends PipeCtx> {
13   use: (fn: Middleware<Ctx>) => void;
14@@ -149,9 +149,10 @@ export function createPipe<Ctx extends PipeCtx = PipeCtx<any>>(
15       key,
16       payload: options,
17       actionFn,
18+      result: Ok(undefined),
19     } as unknown as Ctx;
20     const fn = compose(middleware);
21-    yield* call(() => fn(ctx));
22+    yield* fn(ctx);
23     return ctx;
24   }
25 
26@@ -226,7 +227,8 @@ export function createPipe<Ctx extends PipeCtx = PipeCtx<any>>(
27         return;
28       }
29 
30-      yield* call(() => match(ctx, next));
31+      const result = yield* match(ctx, next);
32+      return result;
33     }
34 
35     return router;
M query/types.ts
+2, -1
 1@@ -1,4 +1,4 @@
 2-import type { Operation } from "../deps.ts";
 3+import type { Operation, Result } from "../deps.ts";
 4 import type { LoadingItemState, LoadingPayload, Payload } from "../types.ts";
 5 
 6 type IfAny<T, Y, N> = 0 extends 1 & T ? Y : N;
 7@@ -12,6 +12,7 @@ export interface PipeCtx<P = any> extends Payload<P> {
 8     CreateAction<PipeCtx>,
 9     CreateActionWithPayload<PipeCtx<P>, P>
10   >;
11+  result: Result<unknown[]>;
12 }
13 
14 export interface LoaderCtx<P = unknown> extends PipeCtx<P> {
A result.ts
+14, -0
 1@@ -0,0 +1,14 @@
 2+import { Ok, Result } from "./deps.ts";
 3+
 4+export function resultAll<T>(results: Result<T>[]): Result<T[]> {
 5+  const agg: T[] = [];
 6+  for (let i = 0; i < results.length; i += 1) {
 7+    const result = results[i];
 8+    if (result.ok) {
 9+      agg.push(result.value);
10+    } else {
11+      return result;
12+    }
13+  }
14+  return Ok(agg);
15+}
M store/store.ts
+10, -4
 1@@ -1,6 +1,7 @@
 2 import {
 3   createScope,
 4   enablePatches,
 5+  Ok,
 6   produceWithPatches,
 7   Result,
 8   Scope,
 9@@ -75,6 +76,7 @@ export function createStore<S extends AnyState>({
10 
11     const [nextState, patches, _] = produceWithPatches(getState(), (draft) => {
12       // TODO: check for return value inside updater
13+      // deno-lint-ignore no-explicit-any
14       upds.forEach((updater) => updater(draft as any));
15     });
16     ctx.patches = patches;
17@@ -112,15 +114,17 @@ export function createStore<S extends AnyState>({
18     const ctx = {
19       updater,
20       patches: [],
21+      result: Ok([]),
22     };
23-    const result = yield* mdw(ctx);
24+    yield* mdw(ctx);
25     // TODO: dev mode only?
26-    if (!result.ok) {
27-      console.error(result);
28+    if (!ctx.result.ok) {
29+      console.error(ctx.result);
30     }
31-    return result;
32+    return ctx;
33   }
34 
35+  // deno-lint-ignore no-explicit-any
36   function dispatch(action: AnyAction | AnyAction[]): Task<any> {
37     return scope.run(function* () {
38       yield* put(action);
39@@ -143,6 +147,7 @@ export function createStore<S extends AnyState>({
40     // refer to pieces of business logic -- that can also mutate state
41     dispatch,
42     // stubs so `react-redux` is happy
43+    // deno-lint-ignore no-explicit-any
44     replaceReducer<S = any>(
45       _nextReducer: (_s: S, _a: AnyAction) => void,
46     ): void {
47@@ -156,6 +161,7 @@ export function register<S extends AnyState>(store: FxStore<S>) {
48   const scope = store.getScope();
49   return scope.run(function* () {
50     // TODO: fix type
51+    // deno-lint-ignore no-explicit-any
52     yield* StoreContext.set(store as any);
53   });
54 }
M store/types.ts
+7, -2
 1@@ -1,17 +1,19 @@
 2 import type { Operation, Patch, Result, Scope, Task } from "../deps.ts";
 3+import { BaseCtx } from "../mod.ts";
 4 import type { OpFn } from "../types.ts";
 5 
 6 export type StoreUpdater<S extends AnyState> = (s: S) => S | void;
 7 
 8 export type Listener = () => void;
 9 
10-export interface UpdaterCtx<S extends AnyState> {
11+export interface UpdaterCtx<S extends AnyState> extends BaseCtx {
12   updater: StoreUpdater<S> | StoreUpdater<S>[];
13   patches: Patch[];
14 }
15 
16 export interface AnyAction {
17   type: string;
18+  // deno-lint-ignore no-explicit-any
19   [key: string]: any;
20 }
21 
22@@ -20,6 +22,7 @@ export interface ActionWPayload<P> {
23   payload: P;
24 }
25 
26+// deno-lint-ignore no-explicit-any
27 export type AnyState = Record<string, any>;
28 
29 declare global {
30@@ -32,9 +35,11 @@ export interface FxStore<S extends AnyState> {
31   getScope: () => Scope;
32   getState: () => S;
33   subscribe: (fn: Listener) => () => void;
34-  update: (u: StoreUpdater<S>) => Operation<Result<UpdaterCtx<S>>>;
35+  update: (u: StoreUpdater<S>) => Operation<UpdaterCtx<S>>;
36   run: <T>(op: OpFn<T>) => Task<Result<T>>;
37+  // deno-lint-ignore no-explicit-any
38   dispatch: (a: AnyAction) => any;
39   replaceReducer: (r: (s: S, a: AnyAction) => S) => void;
40+  // deno-lint-ignore no-explicit-any
41   [Symbol.observable]: () => any;
42 }