repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

commit
cdc5721
parent
6d5ec93
author
Eric Bower
date
2023-07-30 15:30:16 +0000 UTC
fix: event inside takeEvery can trigger same event (#5)

This is a regression from `saga-query`.  This is an issue that came up
in `redux-saga`.

[See ref to original issue.](https://github.com/redux-saga/redux-saga/issues/277)

[Discussion in discord](https://discordapp.com/channels/700803887132704931/1108053742835736636)
7 files changed,  +114, -69
M deps.ts
+3, -6
 1@@ -12,8 +12,11 @@ export {
 2   createChannel,
 3   createContext,
 4   createScope,
 5+  Err,
 6   expect,
 7+  filter,
 8   getframe,
 9+  Ok,
10   resource,
11   run,
12   sleep,
13@@ -22,12 +25,6 @@ export {
14 } from "https://deno.land/x/effection@3.0.0-alpha.9/mod.ts";
15 
16 export type { Result };
17-export function Ok<T>(value: T): Result<T> {
18-  return { ok: true, value };
19-}
20-export function Err<T>(error: Error): Result<T> {
21-  return { ok: false, error };
22-}
23 
24 import React from "https://esm.sh/react@18.2.0?pin=v122";
25 export { React };
M iter.ts
+0, -11
 1@@ -14,14 +14,3 @@ export function* forEach<T>(
 2     }
 3   }
 4 }
 5-
 6-export function* map<T, R>(
 7-  values: T[],
 8-  each: (value: T) => Operation<R>,
 9-): Operation<R[]> {
10-  const results: R[] = [];
11-  for (const value of values) {
12-    results.push(yield* each(value));
13-  }
14-  return results;
15-}
M query/pipe.test.ts
+4, -5
 1@@ -372,7 +372,7 @@ it(tests, "middleware order of execution", async () => {
 2   asserts.assert(acc === "abcdefg");
 3 });
 4 
 5-it.ignore(tests, "retry with actionFn", async () => {
 6+it(tests, "retry with actionFn", async () => {
 7   let acc = "";
 8   let called = false;
 9 
10@@ -402,7 +402,7 @@ it.ignore(tests, "retry with actionFn", async () => {
11   asserts.assertEquals(acc, "agag");
12 });
13 
14-it.ignore(tests, "retry with actionFn with payload", async () => {
15+it(tests, "retry with actionFn with payload", async () => {
16   let acc = "";
17   const api = createPipe();
18   api.use(api.routes());
19@@ -425,12 +425,11 @@ it.ignore(tests, "retry with actionFn with payload", async () => {
20   );
21 
22   const store = await configureStore({ initialState: {} });
23-  const task = store.run(api.bootup);
24+  store.run(api.bootup);
25   store.dispatch(action({ page: 1 }));
26 
27   await sleep(150);
28-  asserts.assertEquals(acc, "aagg");
29-  await task;
30+  asserts.assertEquals(acc, "agag");
31 });
32 
33 it(tests, "should only call thunk once", async () => {
M store/fx.ts
+41, -40
  1@@ -1,4 +1,4 @@
  2-import { Channel, Operation, spawn, Task } from "../deps.ts";
  3+import { Channel, filter, Operation, spawn, Stream, Task } from "../deps.ts";
  4 import { call, parallel } from "../fx/mod.ts";
  5 import { ActionPattern, matcher } from "../matcher.ts";
  6 
  7@@ -6,6 +6,7 @@ import type {
  8   ActionWPayload,
  9   AnyAction,
 10   AnyState,
 11+  FxStore,
 12   StoreUpdater,
 13   UpdaterCtx,
 14 } from "./types.ts";
 15@@ -15,9 +16,10 @@ export function* updateStore<S extends AnyState>(
 16   updater: StoreUpdater<S> | StoreUpdater<S>[],
 17 ): Operation<UpdaterCtx<S>> {
 18   const store = yield* StoreContext;
 19-  const ctx = yield* store.update(updater as any);
 20-  // TODO: fix type
 21-  return ctx as any;
 22+  // had to cast the store since StoreContext has a generic store type
 23+  const st = store as FxStore<S>;
 24+  const ctx = yield* st.update(updater);
 25+  return ctx;
 26 }
 27 
 28 export function* emit({
 29@@ -32,31 +34,15 @@ export function* emit({
 30     if (action.length === 0) {
 31       return;
 32     }
 33-    yield* parallel(action.map((a) => () => input.send(a)));
 34+    const group = yield* parallel(
 35+      action.map((a) => () => input.send(a)),
 36+    );
 37+    yield* group;
 38   } else {
 39     yield* input.send(action);
 40   }
 41 }
 42 
 43-export function* once({
 44-  channel,
 45-  pattern,
 46-}: {
 47-  channel: Operation<Channel<AnyAction, void>>;
 48-  pattern: ActionPattern;
 49-}) {
 50-  const { output } = yield* channel;
 51-  const msgList = yield* output;
 52-  let next = yield* msgList.next();
 53-  while (!next.done) {
 54-    const match = matcher(pattern);
 55-    if (match(next.value)) {
 56-      return next.value;
 57-    }
 58-    next = yield* msgList.next();
 59-  }
 60-}
 61-
 62 export function* select<S, R, P>(selectorFn: (s: S, p?: P) => R, p?: P) {
 63   const store = yield* StoreContext;
 64   return selectorFn(store.getState() as S, p);
 65@@ -69,24 +55,35 @@ export function* put(action: AnyAction | AnyAction[]) {
 66   });
 67 }
 68 
 69+export function* useActions(pattern: ActionPattern): Stream<AnyAction, void> {
 70+  const match = matcher(pattern);
 71+  const { output } = yield* ActionContext;
 72+  // deno-lint-ignore require-yield
 73+  function* fn(a: AnyAction) {
 74+    return match(a);
 75+  }
 76+  // return a subscription to the filtered actions.
 77+  const result = yield* filter(fn)(output);
 78+  return result;
 79+}
 80+
 81 export function take<P>(pattern: ActionPattern): Operation<ActionWPayload<P>>;
 82 export function* take(pattern: ActionPattern): Operation<AnyAction> {
 83-  const action = yield* once({
 84-    channel: ActionContext,
 85-    pattern,
 86-  });
 87-  return action as AnyAction;
 88+  const actions = yield* useActions(pattern);
 89+  const first = yield* actions.next();
 90+  return first.value as AnyAction;
 91 }
 92 
 93-export function* takeEvery<T>(
 94+export function takeEvery<T>(
 95   pattern: ActionPattern,
 96   op: (action: AnyAction) => Operation<T>,
 97 ): Operation<Task<void>> {
 98-  return yield* spawn(function* () {
 99-    while (true) {
100-      const action = yield* take(pattern);
101-      if (!action) continue;
102-      yield* spawn(() => op(action));
103+  return spawn(function* () {
104+    const actions = yield* useActions(pattern);
105+    let next = yield* actions.next();
106+    while (!next.done) {
107+      yield* spawn(() => op(next.value as AnyAction));
108+      next = yield* actions.next();
109     }
110   });
111 }
112@@ -96,14 +93,18 @@ export function* takeLatest<T>(
113   op: (action: AnyAction) => Operation<T>,
114 ): Operation<Task<void>> {
115   return yield* spawn(function* () {
116-    let lastTask;
117+    const actions = yield* useActions(pattern);
118+
119+    let lastTask: Task<T> | undefined;
120     while (true) {
121-      const action = yield* take(pattern);
122+      const action = yield* actions.next();
123+      if (action.done) {
124+        return;
125+      }
126       if (lastTask) {
127         yield* lastTask.halt();
128       }
129-      if (!action) continue;
130-      lastTask = yield* spawn(() => op(action));
131+      lastTask = yield* spawn(() => op(action.value));
132     }
133   });
134 }
135@@ -112,7 +113,7 @@ export function* takeLeading<T>(
136   pattern: ActionPattern,
137   op: (action: AnyAction) => Operation<T>,
138 ): Operation<Task<void>> {
139-  return yield* spawn(function* () {
140+  return yield* spawn(function* (): Operation<void> {
141     while (true) {
142       const action = yield* take(pattern);
143       if (!action) continue;
M store/store.test.ts
+4, -4
 1@@ -1,4 +1,4 @@
 2-import { createScope } from "../deps.ts";
 3+import { createScope, Operation, Result } from "../deps.ts";
 4 import { parallel } from "../fx/mod.ts";
 5 import { asserts, describe, it } from "../test.ts";
 6 
 7@@ -61,7 +61,7 @@ it(
 8     const store = createStore({ scope, initialState });
 9     await register(store);
10 
11-    await scope.run(function* (): any {
12+    await scope.run(function* (): Operation<Result<void>[]> {
13       const result = yield* parallel([
14         function* () {
15           const store = yield* StoreContext;
16@@ -122,9 +122,9 @@ it(tests, "emit Action and update store", async () => {
17   const store = createStore({ scope, initialState });
18   await register(store);
19 
20-  await scope.run(function* (): any {
21+  await scope.run(function* (): Operation<void> {
22     const result = yield* parallel([
23-      function* (): any {
24+      function* (): Operation<void> {
25         const action = yield* take<UpdateUserProps>("UPDATE_USER");
26         yield* updateStore(updateUser(action.payload));
27       },
M store/take-helper.test.ts
+61, -2
 1@@ -1,11 +1,69 @@
 2 import { describe, expect, it } from "../test.ts";
 3+import { sleep } from "../deps.ts";
 4 
 5 import type { AnyAction } from "./types.ts";
 6-import { configureStore, take, takeEvery } from "./mod.ts";
 7+import { configureStore } from "./mod.ts";
 8+import { take, takeEvery, takeLatest, takeLeading } from "./fx.ts";
 9 
10 const testEvery = describe("takeEvery()");
11+const testLatest = describe("takeLatest()");
12+const testLeading = describe("takeLeading()");
13 
14-it(testEvery, "should work", async () => {
15+it(testLatest, "should cancel previous tasks and only use latest", async () => {
16+  const actual: string[] = [];
17+  function* worker(action: AnyAction) {
18+    if (action.payload !== "3") {
19+      yield* sleep(3000);
20+    }
21+    actual.push(action.payload);
22+  }
23+
24+  function* root() {
25+    const task = yield* takeLatest("ACTION", worker);
26+    yield* take("CANCEL_WATCHER");
27+    yield* task.halt();
28+  }
29+  const store = await configureStore({ initialState: {} });
30+  const task = store.run(root);
31+
32+  store.dispatch({ type: "ACTION", payload: "1" });
33+  store.dispatch({ type: "ACTION", payload: "2" });
34+  store.dispatch({ type: "ACTION", payload: "3" });
35+  store.dispatch({ type: "CANCEL_WATCHER" });
36+
37+  await task;
38+
39+  expect(actual).toEqual(["3"]);
40+});
41+
42+it(testLeading, "should keep first action and discard the rest", async () => {
43+  let called = 0;
44+  const actual: string[] = [];
45+  function* worker(action: AnyAction) {
46+    called += 1;
47+    yield* sleep(100);
48+    actual.push(action.payload);
49+  }
50+
51+  function* root() {
52+    const task = yield* takeLeading("ACTION", worker);
53+    yield* sleep(150);
54+    yield* task.halt();
55+  }
56+  const store = await configureStore({ initialState: {} });
57+  const task = store.run(root);
58+
59+  store.dispatch({ type: "ACTION", payload: "1" });
60+  store.dispatch({ type: "ACTION", payload: "2" });
61+  store.dispatch({ type: "ACTION", payload: "3" });
62+
63+  await task;
64+
65+  expect(actual).toEqual(["1"]);
66+  expect(called).toEqual(1);
67+});
68+
69+it(testEvery, "should receive all actions", async () => {
70   const loop = 10;
71   const actual: string[][] = [];
72 
73@@ -18,6 +76,7 @@ it(testEvery, "should work", async () => {
74     yield* task.halt();
75   }
76 
77+  // deno-lint-ignore require-yield
78   function* worker(arg1: string, arg2: string, action: AnyAction) {
79     actual.push([arg1, arg2, action.payload]);
80   }
M store/types.ts
+1, -1
1@@ -35,7 +35,7 @@ export interface FxStore<S extends AnyState> {
2   getScope: () => Scope;
3   getState: () => S;
4   subscribe: (fn: Listener) => () => void;
5-  update: (u: StoreUpdater<S>) => Operation<UpdaterCtx<S>>;
6+  update: (u: StoreUpdater<S> | StoreUpdater<S>[]) => Operation<UpdaterCtx<S>>;
7   run: <T>(op: OpFn<T>) => Task<Result<T>>;
8   // deno-lint-ignore no-explicit-any
9   dispatch: (a: AnyAction) => any;