repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

commit
a4c160f
parent
544d852
author
Eric Bower
date
2023-11-10 01:17:16 +0000 UTC
refactor(redux): custom queue impl (#19)

17 files changed,  +188, -150
M mod.ts
M npm.ts
M README.md
+1, -1
1@@ -45,7 +45,7 @@ const task = main(function* () {
2     } else {
3       console.error(result.error);
4     }
5-    yield* each.next;
6+    yield* each.next();
7   }
8 });
9 
M deno.lock
+35, -35
 1@@ -109,41 +109,41 @@
 2     "https://deno.land/x/dnt@0.38.1/lib/utils.ts": "878b7ac7003a10c16e6061aa49dbef9b42bd43174853ebffc9b67ea47eeb11d8",
 3     "https://deno.land/x/dnt@0.38.1/mod.ts": "b13349fe77847cf58e26b40bcd58797a8cec5d71b31a1ca567071329c8489de1",
 4     "https://deno.land/x/dnt@0.38.1/transform.ts": "f68743a14cf9bf53bfc9c81073871d69d447a7f9e3453e0447ca2fb78926bb1d",
 5-    "https://deno.land/x/effection@3.0.0-beta.0/lib/abort-signal.ts": "8be1b331b2bc417d70fe4c07e0b806e89972b8eab519ce58beed7ec632ae9048",
 6-    "https://deno.land/x/effection@3.0.0-beta.0/lib/all.ts": "acadab8258228e290192f587c8c532428f9093337a9b7688ae55cbc2cacd5caf",
 7-    "https://deno.land/x/effection@3.0.0-beta.0/lib/async.ts": "3e007ef245abb240de07029f523c7ef74b9bc383db5716f89d261a5150295777",
 8-    "https://deno.land/x/effection@3.0.0-beta.0/lib/call.ts": "69c465573031e6315e375c17e01e820239e01a93107866dd9f5ef584b79d13dd",
 9-    "https://deno.land/x/effection@3.0.0-beta.0/lib/channel.ts": "cdd48c80c05dfbd9b248bd50bf16a80f973e51b5e35b219386e82a65eb4477b6",
10-    "https://deno.land/x/effection@3.0.0-beta.0/lib/context.ts": "108989ac839d6756e30f6c0afc458bfa3975dd0f970d5173b6b8f8473ce4c335",
11-    "https://deno.land/x/effection@3.0.0-beta.0/lib/deps.ts": "91062b4b97089a8cf36550d4f9605d325a0fd19bebc72d15524481a3b56ea669",
12-    "https://deno.land/x/effection@3.0.0-beta.0/lib/each.ts": "756f82d8b4ec43623776416b6de6558e7584bbfd437a6a35d44e088667842d32",
13-    "https://deno.land/x/effection@3.0.0-beta.0/lib/ensure.ts": "c3640cc12c1bc747a8a4086af476840db026d04ea22f45a697d53617b2b1cc66",
14-    "https://deno.land/x/effection@3.0.0-beta.0/lib/events.ts": "b33ef4b8b81c05c1c9b0d21b675f131fb7b0dd5021f440e64eb3a9e1e50aedf9",
15-    "https://deno.land/x/effection@3.0.0-beta.0/lib/filter.ts": "ad3ba0ce59923306620da410393c8f248fb9885a55c8cbd1f3670521cd96cc68",
16-    "https://deno.land/x/effection@3.0.0-beta.0/lib/first.ts": "6dfacc67b5c6b11bf1df1cf272491ed24a88431ea2eb8a36216b848fbfbe2e8e",
17-    "https://deno.land/x/effection@3.0.0-beta.0/lib/instructions.ts": "b88cd8bf32c8b11300046fcc8c76e6c77a8032eb5269b150fb53639732c68c93",
18-    "https://deno.land/x/effection@3.0.0-beta.0/lib/lazy.ts": "92ea526c5ad7d88290f2a87168e038d482f97421379508d85cf2e049ee60639b",
19-    "https://deno.land/x/effection@3.0.0-beta.0/lib/main.ts": "a0deaf1d1d958ef7a5821d8ac3dfbd190a47608d603798a5fc3b0c2309a724da",
20-    "https://deno.land/x/effection@3.0.0-beta.0/lib/map.ts": "26184b51211c87e37396f336c7a8212ea46d9fdea210df208127a6394eee69f9",
21-    "https://deno.land/x/effection@3.0.0-beta.0/lib/mod.ts": "fe2a2e685cc81fe24b1e15af52a17f213e5d062d48a664d6fd28071b13dda780",
22-    "https://deno.land/x/effection@3.0.0-beta.0/lib/op.ts": "483465a66fb9fa7529eb8cafbaf94c085b1a5e72876e5d34e44cd4f875b2141b",
23-    "https://deno.land/x/effection@3.0.0-beta.0/lib/pause.ts": "a690b0d67cf970c34f528df8c61d69eb43deda9817362776f6359f506dc0da45",
24-    "https://deno.land/x/effection@3.0.0-beta.0/lib/pipe.ts": "4a28fa93a1ba53661bafb84265f3fcb5614920bbecc0db1c261e1093da3b2cdf",
25-    "https://deno.land/x/effection@3.0.0-beta.0/lib/queue.ts": "714968cede633c5c336479ad62b695709e5b72076d38ac462caa2aa8ac49da06",
26-    "https://deno.land/x/effection@3.0.0-beta.0/lib/race.ts": "0c43f24ce5006768f5cbac8d6f5dc07848bafa625cc0bc6c24fb6a2f2a8808f2",
27-    "https://deno.land/x/effection@3.0.0-beta.0/lib/result.ts": "44e4bdadad155beb9bbfe41948819bbcb9e27a772283e52e89981bd6636a8687",
28-    "https://deno.land/x/effection@3.0.0-beta.0/lib/run.ts": "b85043bc8b30c0eb0d04654cdd07004b21145f2e3f59f52e39df76558e324ca4",
29-    "https://deno.land/x/effection@3.0.0-beta.0/lib/run/create.ts": "be9139af2fbe15908256d2d159dec8dca079f94cf02d488074c94fa26fc651fa",
30-    "https://deno.land/x/effection@3.0.0-beta.0/lib/run/frame.ts": "b9b85112e3168c0fc91e01b1df60f2e911ee1a44314678944d9cbfa71b0641de",
31-    "https://deno.land/x/effection@3.0.0-beta.0/lib/run/scope.ts": "45a6f4033ef3f5ccf2d450884221ffba0c4edb2d45ac2bedb36ebcffc9d4ec14",
32-    "https://deno.land/x/effection@3.0.0-beta.0/lib/run/task.ts": "b4b019d6e32d4c22c83ea9d1cfd64a3d587587080d459aec00aa9e6ba9d49b2a",
33-    "https://deno.land/x/effection@3.0.0-beta.0/lib/run/types.ts": "c6e413c941613e364604643a084eb432a7ba3ec468e5b0f5341c09b2f54510ef",
34-    "https://deno.land/x/effection@3.0.0-beta.0/lib/run/value.ts": "d57428b45dfeecc9df1e68dadf8697dbc33cd412e6ffcab9d0ba4368e8c1fbd6",
35-    "https://deno.land/x/effection@3.0.0-beta.0/lib/shift-sync.ts": "74ecefa9cb2e145a3c52f363319f8d6296b804600852044b7d14bd53bc10b512",
36-    "https://deno.land/x/effection@3.0.0-beta.0/lib/signal.ts": "935dabb018ae29a0d817819a6eb8c461a5a96e0e302198652a3ff07bb9e21789",
37-    "https://deno.land/x/effection@3.0.0-beta.0/lib/sleep.ts": "44e3a80248dad7a47066a99a7daec9b318e87d5d211adf27776145544d455689",
38-    "https://deno.land/x/effection@3.0.0-beta.0/lib/types.ts": "779952d3537a99545ab78307326ee14b77ff778b0ba3075489f31982d7eccb60",
39-    "https://deno.land/x/effection@3.0.0-beta.0/mod.ts": "ffae461c16d4a1bf24c2179582ab8d5c81ad0df61e4ae2fba51ef5e5bdf90345",
40+    "https://deno.land/x/effection@3.0.0-beta.2/lib/abort-signal.ts": "8be1b331b2bc417d70fe4c07e0b806e89972b8eab519ce58beed7ec632ae9048",
41+    "https://deno.land/x/effection@3.0.0-beta.2/lib/all.ts": "acadab8258228e290192f587c8c532428f9093337a9b7688ae55cbc2cacd5caf",
42+    "https://deno.land/x/effection@3.0.0-beta.2/lib/async.ts": "086b27b253be944c47c633d105f1657e243cd8c0d35b9a0dc5383528d7235dde",
43+    "https://deno.land/x/effection@3.0.0-beta.2/lib/call.ts": "1ab6649c2944b72ffd27a495712562311abf414c548ada9cc1c8edea96c46d37",
44+    "https://deno.land/x/effection@3.0.0-beta.2/lib/channel.ts": "f86b36666463f8f86fc1ac1726a94f0f08dc05559ba710b8eb93581b2b8588e6",
45+    "https://deno.land/x/effection@3.0.0-beta.2/lib/context.ts": "108989ac839d6756e30f6c0afc458bfa3975dd0f970d5173b6b8f8473ce4c335",
46+    "https://deno.land/x/effection@3.0.0-beta.2/lib/deps.ts": "91062b4b97089a8cf36550d4f9605d325a0fd19bebc72d15524481a3b56ea669",
47+    "https://deno.land/x/effection@3.0.0-beta.2/lib/each.ts": "9689346d1db3fedcd87d48c70be5515ad3e18fa4b894755fa53910fb8ad356f3",
48+    "https://deno.land/x/effection@3.0.0-beta.2/lib/ensure.ts": "c3640cc12c1bc747a8a4086af476840db026d04ea22f45a697d53617b2b1cc66",
49+    "https://deno.land/x/effection@3.0.0-beta.2/lib/events.ts": "d962e7403d62948642f5a3161f611f4375932aa8702050575f0d538aab7c3467",
50+    "https://deno.land/x/effection@3.0.0-beta.2/lib/filter.ts": "39f349ee921ba718cf3259e05003255eeeafbb5ca6e437d2d269b1805da2236e",
51+    "https://deno.land/x/effection@3.0.0-beta.2/lib/first.ts": "5bc321069d2e2b87b6623f626a929d5d5ba32bca32ee03b37bdc1a64722eebb9",
52+    "https://deno.land/x/effection@3.0.0-beta.2/lib/instructions.ts": "5fd8638e385068adc6c1a896bba02b736d7c2c26e5124d3d063fdbcaf140abec",
53+    "https://deno.land/x/effection@3.0.0-beta.2/lib/lazy.ts": "92ea526c5ad7d88290f2a87168e038d482f97421379508d85cf2e049ee60639b",
54+    "https://deno.land/x/effection@3.0.0-beta.2/lib/lift.ts": "0c622bf0359f92235547b57efa66139b265a7b259428e6883469de0b3af32f5d",
55+    "https://deno.land/x/effection@3.0.0-beta.2/lib/main.ts": "a0deaf1d1d958ef7a5821d8ac3dfbd190a47608d603798a5fc3b0c2309a724da",
56+    "https://deno.land/x/effection@3.0.0-beta.2/lib/map.ts": "1a0c369dad53affc4b798a04142de637a75f981385acafcafd26bdc569675bc2",
57+    "https://deno.land/x/effection@3.0.0-beta.2/lib/mod.ts": "f7189b02d008baba1166d33779379b12f7104e0b6d373194270ac126a73ba82d",
58+    "https://deno.land/x/effection@3.0.0-beta.2/lib/pause.ts": "a690b0d67cf970c34f528df8c61d69eb43deda9817362776f6359f506dc0da45",
59+    "https://deno.land/x/effection@3.0.0-beta.2/lib/pipe.ts": "4a28fa93a1ba53661bafb84265f3fcb5614920bbecc0db1c261e1093da3b2cdf",
60+    "https://deno.land/x/effection@3.0.0-beta.2/lib/queue.ts": "80c6234cb6eaba9fd1abdae077e73f51897b099ea54f852b9a744e8eba51302f",
61+    "https://deno.land/x/effection@3.0.0-beta.2/lib/race.ts": "0c43f24ce5006768f5cbac8d6f5dc07848bafa625cc0bc6c24fb6a2f2a8808f2",
62+    "https://deno.land/x/effection@3.0.0-beta.2/lib/result.ts": "44e4bdadad155beb9bbfe41948819bbcb9e27a772283e52e89981bd6636a8687",
63+    "https://deno.land/x/effection@3.0.0-beta.2/lib/run.ts": "b85043bc8b30c0eb0d04654cdd07004b21145f2e3f59f52e39df76558e324ca4",
64+    "https://deno.land/x/effection@3.0.0-beta.2/lib/run/create.ts": "be9139af2fbe15908256d2d159dec8dca079f94cf02d488074c94fa26fc651fa",
65+    "https://deno.land/x/effection@3.0.0-beta.2/lib/run/frame.ts": "b9b85112e3168c0fc91e01b1df60f2e911ee1a44314678944d9cbfa71b0641de",
66+    "https://deno.land/x/effection@3.0.0-beta.2/lib/run/scope.ts": "0e164df8b9825ac1aef3ff1e35a85cf6c82ac48318ba9942d76bf477337895ca",
67+    "https://deno.land/x/effection@3.0.0-beta.2/lib/run/task.ts": "b4b019d6e32d4c22c83ea9d1cfd64a3d587587080d459aec00aa9e6ba9d49b2a",
68+    "https://deno.land/x/effection@3.0.0-beta.2/lib/run/types.ts": "c6e413c941613e364604643a084eb432a7ba3ec468e5b0f5341c09b2f54510ef",
69+    "https://deno.land/x/effection@3.0.0-beta.2/lib/run/value.ts": "d57428b45dfeecc9df1e68dadf8697dbc33cd412e6ffcab9d0ba4368e8c1fbd6",
70+    "https://deno.land/x/effection@3.0.0-beta.2/lib/shift-sync.ts": "74ecefa9cb2e145a3c52f363319f8d6296b804600852044b7d14bd53bc10b512",
71+    "https://deno.land/x/effection@3.0.0-beta.2/lib/signal.ts": "da723b43b6bd61ea86dab991e9a6c6249a61d3b1c3c98ef473b160c9383e7d07",
72+    "https://deno.land/x/effection@3.0.0-beta.2/lib/sleep.ts": "44e3a80248dad7a47066a99a7daec9b318e87d5d211adf27776145544d455689",
73+    "https://deno.land/x/effection@3.0.0-beta.2/lib/types.ts": "4595c09ccfaae87c5a1d12006c23e5f4083fcd5658c322350f27801a9a9cb348",
74+    "https://deno.land/x/effection@3.0.0-beta.2/mod.ts": "ffae461c16d4a1bf24c2179582ab8d5c81ad0df61e4ae2fba51ef5e5bdf90345",
75     "https://deno.land/x/expect@v0.3.0/expect.ts": "5e6717eddc9df376f7b2c9be6403e016130bb2edbb1acd261a2d6ea9608ee196",
76     "https://deno.land/x/expect@v0.3.0/matchers.ts": "a37ef4577739247af77a852cdcd69484f999a41ad86ec16bb63a88a7a47a2372",
77     "https://deno.land/x/expect@v0.3.0/mock.ts": "562d4b1d735d15b0b8e935f342679096b64fe452f86e96714fe8616c0c884914",
M deps.ts
+6, -4
 1@@ -2,15 +2,17 @@ export type {
 2   Channel,
 3   Instruction,
 4   Operation,
 5-  Port,
 6   Predicate,
 7+  Queue,
 8+  Reject,
 9+  Resolve,
10   Result,
11   Scope,
12   Signal,
13   Stream,
14   Subscription,
15   Task,
16-} from "https://deno.land/x/effection@3.0.0-beta.0/mod.ts";
17+} from "https://deno.land/x/effection@3.0.0-beta.2/mod.ts";
18 export {
19   action,
20   createChannel,
21@@ -23,15 +25,15 @@ export {
22   expect,
23   filter,
24   getframe,
25-  main,
26   Ok,
27   resource,
28   run,
29+  SignalQueueFactory,
30   sleep,
31   spawn,
32   suspend,
33   useAbortSignal,
34-} from "https://deno.land/x/effection@3.0.0-beta.0/mod.ts";
35+} from "https://deno.land/x/effection@3.0.0-beta.2/mod.ts";
36 
37 import React from "https://esm.sh/react@18.2.0?pin=v122";
38 export { React };
M fx/parallel.test.ts
+4, -5
 1@@ -40,9 +40,9 @@ it(
 2       ]);
 3 
 4       const res: Result<string>[] = [];
 5-      for (const val of yield* each(results.immediate.output)) {
 6+      for (const val of yield* each(results.immediate)) {
 7         res.push(val);
 8-        yield* each.next;
 9+        yield* each.next();
10       }
11 
12       yield* results;
13@@ -70,10 +70,9 @@ it(
14       ]);
15 
16       const res: Result<string>[] = [];
17-      const { output } = results.sequence;
18-      for (const val of yield* each(output)) {
19+      for (const val of yield* each(results.sequence)) {
20         res.push(val);
21-        yield* each.next;
22+        yield* each.next();
23       }
24 
25       yield* results;
M fx/parallel.ts
+4, -4
 1@@ -21,7 +21,7 @@ export function parallel<T>(operations: OpFn<T>[]) {
 2         tasks.push(
 3           yield* spawn(function* () {
 4             const result = yield* safe(op);
 5-            yield* immediate.input.send(result);
 6+            yield* immediate.send(result);
 7             return result;
 8           }),
 9         );
10@@ -30,11 +30,11 @@ export function parallel<T>(operations: OpFn<T>[]) {
11       for (const tsk of tasks) {
12         const res = yield* tsk;
13         results.push(res);
14-        yield* sequence.input.send(res);
15+        yield* sequence.send(res);
16       }
17 
18-      yield* sequence.input.close();
19-      yield* immediate.input.close();
20+      yield* sequence.close();
21+      yield* immediate.close();
22     });
23 
24     function* wait(): Operation<Result<T>[]> {
M matcher.ts
+9, -10
 1@@ -1,51 +1,50 @@
 2 import type { AnyAction } from "./types.ts";
 3-import type { Predicate } from "./deps.ts";
 4 
 5 type ActionType = string;
 6 type GuardPredicate<G extends T, T = unknown> = (arg: T) => arg is G;
 7-type APredicate = (action: AnyAction) => boolean;
 8+type Predicate = (action: AnyAction) => boolean;
 9 type StringableActionCreator<A extends AnyAction = AnyAction> = {
10   (...args: unknown[]): A;
11   toString(): string;
12 };
13-type SubPattern = APredicate | StringableActionCreator | ActionType;
14+type SubPattern = Predicate | StringableActionCreator | ActionType;
15 export type Pattern = SubPattern | SubPattern[];
16 type ActionSubPattern<Guard extends AnyAction = AnyAction> =
17   | GuardPredicate<Guard, AnyAction>
18   | StringableActionCreator<Guard>
19-  | APredicate
20+  | Predicate
21   | ActionType;
22 export type ActionPattern<Guard extends AnyAction = AnyAction> =
23   | ActionSubPattern<Guard>
24   | ActionSubPattern<Guard>[];
25 
26-export function matcher(pattern: ActionPattern): Predicate<AnyAction> {
27+export function matcher(pattern: ActionPattern): Predicate {
28   if (pattern === "*") {
29-    return function* (input) {
30+    return function (input) {
31       return !!input;
32     };
33   }
34 
35   if (typeof pattern === "string") {
36-    return function* (input) {
37+    return function (input) {
38       return pattern === input.type;
39     };
40   }
41 
42   if (Array.isArray(pattern)) {
43-    return function* (input) {
44+    return function (input) {
45       return pattern.some((p) => matcher(p)(input));
46     };
47   }
48 
49   if (typeof pattern === "function" && Object.hasOwn(pattern, "toString")) {
50-    return function* (input) {
51+    return function (input) {
52       return pattern.toString() === input.type;
53     };
54   }
55 
56   if (typeof pattern === "function") {
57-    return function* (input) {
58+    return function (input) {
59       return pattern(input) as boolean;
60     };
61   }
M mod.ts
+0, -2
 1@@ -13,7 +13,6 @@ export {
 2   each,
 3   Err,
 4   getframe,
 5-  main,
 6   Ok,
 7   resource,
 8   run,
 9@@ -25,7 +24,6 @@ export type {
10   Channel,
11   Instruction,
12   Operation,
13-  Port,
14   Result,
15   Scope,
16   Stream,
M npm.ts
+2, -2
 1@@ -32,9 +32,9 @@ async function main() {
 2       },
 3     ],
 4     mappings: {
 5-      "https://deno.land/x/effection@3.0.0-beta.0/mod.ts": {
 6+      "https://deno.land/x/effection@3.0.0-beta.2/mod.ts": {
 7         name: "effection",
 8-        version: "3.0.0-beta.0",
 9+        version: "3.0.0-beta.2",
10       },
11       "https://esm.sh/react@18.2.0?pin=v122": {
12         name: "react",
A queue.ts
+14, -0
 1@@ -0,0 +1,14 @@
 2+import { createQueue } from "./deps.ts";
 3+
 4+export function createFilterQueue<T, TClose>(predicate: (v: T) => boolean) {
 5+  const queue = createQueue<T, TClose>();
 6+
 7+  return {
 8+    ...queue,
 9+    add(value: T) {
10+      if (predicate(value)) {
11+        queue.add(value);
12+      }
13+    },
14+  };
15+}
M redux/fx.ts
+41, -16
  1@@ -1,9 +1,14 @@
  2-import type { Action, Operation, Signal } from "../deps.ts";
  3-import { createContext, each, filter, spawn } from "../deps.ts";
  4+import type { Action, Operation, Queue, Signal, Stream } from "../deps.ts";
  5+import {
  6+  createContext,
  7+  createQueue,
  8+  each,
  9+  SignalQueueFactory,
 10+  spawn,
 11+} from "../deps.ts";
 12 import { call } from "../fx/mod.ts";
 13 import { ActionPattern, matcher } from "../matcher.ts";
 14 import type { ActionWPayload, AnyAction } from "../types.ts";
 15-
 16 import type { StoreLike } from "./types.ts";
 17 
 18 export const ActionContext = createContext<Signal<Action, void>>(
 19@@ -11,6 +16,21 @@ export const ActionContext = createContext<Signal<Action, void>>(
 20 );
 21 export const StoreContext = createContext<StoreLike>("redux:store");
 22 
 23+function createFilterQueue<T, TClose>(
 24+  predicate: (a: T) => boolean,
 25+): Queue<T, TClose> {
 26+  const queue = createQueue<T, TClose>();
 27+
 28+  return {
 29+    ...queue,
 30+    add(value: T) {
 31+      if (predicate(value)) {
 32+        queue.add(value);
 33+      }
 34+    },
 35+  };
 36+}
 37+
 38 export function* put(action: AnyAction | AnyAction[]) {
 39   const store = yield* StoreContext;
 40   if (Array.isArray(action)) {
 41@@ -42,16 +62,22 @@ export function* select<S, R>(selectorFn: (s: S) => R) {
 42   return selectorFn(store.getState() as S);
 43 }
 44 
 45-function* createPatternStream(pattern: ActionPattern) {
 46-  const signal = yield* ActionContext;
 47-  const match = matcher(pattern);
 48-  const fd = filter(match)(signal.stream);
 49-  return fd;
 50+function useActions(pattern: ActionPattern): Stream<AnyAction, void> {
 51+  return {
 52+    *subscribe() {
 53+      const actions = yield* ActionContext;
 54+      const match = matcher(pattern);
 55+      yield* SignalQueueFactory.set(() =>
 56+        createFilterQueue<AnyAction, void>(match) as any
 57+      );
 58+      return yield* actions.subscribe();
 59+    },
 60+  };
 61 }
 62 
 63 export function take<P>(pattern: ActionPattern): Operation<ActionWPayload<P>>;
 64 export function* take(pattern: ActionPattern): Operation<Action> {
 65-  const fd = yield* createPatternStream(pattern);
 66+  const fd = useActions(pattern);
 67   for (const action of yield* each(fd)) {
 68     return action;
 69   }
 70@@ -64,10 +90,10 @@ export function* takeEvery<T>(
 71   op: (action: Action) => Operation<T>,
 72 ) {
 73   return yield* spawn(function* (): Operation<void> {
 74-    const fd = yield* createPatternStream(pattern);
 75+    const fd = useActions(pattern);
 76     for (const action of yield* each(fd)) {
 77       yield* spawn(() => op(action));
 78-      yield* each.next;
 79+      yield* each.next();
 80     }
 81   });
 82 }
 83@@ -77,7 +103,7 @@ export function* takeLatest<T>(
 84   op: (action: Action) => Operation<T>,
 85 ) {
 86   return yield* spawn(function* (): Operation<void> {
 87-    const fd = yield* createPatternStream(pattern);
 88+    const fd = useActions(pattern);
 89     let lastTask;
 90 
 91     for (const action of yield* each(fd)) {
 92@@ -85,7 +111,7 @@ export function* takeLatest<T>(
 93         yield* lastTask.halt();
 94       }
 95       lastTask = yield* spawn(() => op(action));
 96-      yield* each.next;
 97+      yield* each.next();
 98     }
 99   });
100 }
101@@ -96,10 +122,9 @@ export function* takeLeading<T>(
102   op: (action: Action) => Operation<T>,
103 ) {
104   return yield* spawn(function* (): Operation<void> {
105-    const fd = yield* createPatternStream(pattern);
106-    for (const action of yield* each(fd)) {
107+    while (true) {
108+      const action = yield* take(pattern);
109       yield* call(() => op(action));
110-      yield* each.next;
111     }
112   });
113 }
M redux/put.test.ts
+2, -2
 1@@ -12,9 +12,9 @@ it(putTests, "should send actions through channel", async () => {
 2   function* genFn(arg: string) {
 3     yield* spawn(function* () {
 4       const actions = yield* ActionContext;
 5-      for (const action of yield* each(actions.stream)) {
 6+      for (const action of yield* each(actions)) {
 7         actual.push(action.type);
 8-        yield* each.next;
 9+        yield* each.next();
10       }
11     });
12 
M store/context.ts
+2, -3
 1@@ -1,11 +1,10 @@
 2-import { Channel, createChannel, createContext } from "../deps.ts";
 3+import { Channel, createChannel, createContext, Signal } from "../deps.ts";
 4 import type { AnyAction, AnyState } from "../types.ts";
 5 
 6 import type { FxStore } from "./types.ts";
 7 
 8-export const ActionContext = createContext<Channel<AnyAction, void>>(
 9+export const ActionContext = createContext<Signal<AnyAction, void>>(
10   "store:action",
11-  createChannel<AnyAction, void>(),
12 );
13 
14 export const StoreUpdateContext = createContext<Channel<void, void>>(
M store/fx.ts
+53, -45
  1@@ -1,10 +1,18 @@
  2-import { Channel, filter, Operation, spawn, Stream, Task } from "../deps.ts";
  3-import { call, parallel } from "../fx/mod.ts";
  4+import {
  5+  Action,
  6+  each,
  7+  Operation,
  8+  Signal,
  9+  SignalQueueFactory,
 10+  spawn,
 11+  Stream,
 12+} from "../deps.ts";
 13+import { call } from "../fx/mod.ts";
 14 import { ActionPattern, matcher } from "../matcher.ts";
 15 import type { ActionWPayload, AnyAction, AnyState } from "../types.ts";
 16-
 17 import type { FxStore, StoreUpdater, UpdaterCtx } from "./types.ts";
 18 import { ActionContext, StoreContext } from "./context.ts";
 19+import { createFilterQueue } from "../queue.ts";
 20 
 21 export function* updateStore<S extends AnyState>(
 22   updater: StoreUpdater<S> | StoreUpdater<S>[],
 23@@ -17,23 +25,19 @@ export function* updateStore<S extends AnyState>(
 24 }
 25 
 26 export function* emit({
 27-  channel,
 28+  signal,
 29   action,
 30 }: {
 31-  channel: Operation<Channel<AnyAction, void>>;
 32+  signal: Signal<AnyAction, void>;
 33   action: AnyAction | AnyAction[];
 34 }) {
 35-  const { input } = yield* channel;
 36   if (Array.isArray(action)) {
 37     if (action.length === 0) {
 38       return;
 39     }
 40-    const group = yield* parallel(
 41-      action.map((a) => () => input.send(a)),
 42-    );
 43-    yield* group;
 44+    action.map((a) => signal.send(a));
 45   } else {
 46-    yield* input.send(action);
 47+    signal.send(action);
 48   }
 49 }
 50 
 51@@ -51,71 +55,75 @@ export function* select<S, R, P>(
 52 }
 53 
 54 export function* put(action: AnyAction | AnyAction[]) {
 55+  const signal = yield* ActionContext;
 56   return yield* emit({
 57-    channel: ActionContext,
 58+    signal,
 59     action,
 60   });
 61 }
 62 
 63-export function* useActions(pattern: ActionPattern): Stream<AnyAction, void> {
 64-  const { output } = yield* ActionContext;
 65-  const match = matcher(pattern);
 66-  // return a subscription to the filtered actions.
 67-  const result = yield* filter(match)(output);
 68-  return result;
 69+function useActions(pattern: ActionPattern): Stream<AnyAction, void> {
 70+  return {
 71+    *subscribe() {
 72+      const actions = yield* ActionContext;
 73+      const match = matcher(pattern);
 74+      yield* SignalQueueFactory.set(() => createFilterQueue(match) as any);
 75+      return yield* actions.subscribe();
 76+    },
 77+  };
 78 }
 79 
 80 export function take<P>(pattern: ActionPattern): Operation<ActionWPayload<P>>;
 81-export function* take(pattern: ActionPattern): Operation<AnyAction> {
 82-  const actions = yield* useActions(pattern);
 83-  const first = yield* actions.next();
 84-  return first.value as AnyAction;
 85+export function* take(pattern: ActionPattern): Operation<Action> {
 86+  const fd = useActions(pattern);
 87+  for (const action of yield* each(fd)) {
 88+    return action;
 89+  }
 90+
 91+  return { type: "take failed, this should not be possible" };
 92 }
 93 
 94-export function takeEvery<T>(
 95+export function* takeEvery<T>(
 96   pattern: ActionPattern,
 97-  op: (action: AnyAction) => Operation<T>,
 98-): Operation<Task<void>> {
 99-  return spawn(function* () {
100-    const actions = yield* useActions(pattern);
101-    let next = yield* actions.next();
102-    while (!next.done) {
103-      yield* spawn(() => op(next.value as AnyAction));
104-      next = yield* actions.next();
105+  op: (action: Action) => Operation<T>,
106+) {
107+  return yield* spawn(function* (): Operation<void> {
108+    const fd = useActions(pattern);
109+    for (const action of yield* each(fd)) {
110+      yield* spawn(() => op(action));
111+      yield* each.next();
112     }
113   });
114 }
115 
116 export function* takeLatest<T>(
117   pattern: ActionPattern,
118-  op: (action: AnyAction) => Operation<T>,
119-): Operation<Task<void>> {
120-  return yield* spawn(function* () {
121-    const actions = yield* useActions(pattern);
122+  op: (action: Action) => Operation<T>,
123+) {
124+  return yield* spawn(function* (): Operation<void> {
125+    const fd = useActions(pattern);
126+    let lastTask;
127 
128-    let lastTask: Task<T> | undefined;
129-    while (true) {
130-      const action = yield* actions.next();
131-      if (action.done) {
132-        return;
133-      }
134+    for (const action of yield* each(fd)) {
135       if (lastTask) {
136         yield* lastTask.halt();
137       }
138-      lastTask = yield* spawn(() => op(action.value));
139+      lastTask = yield* spawn(() => op(action));
140+      yield* each.next();
141     }
142   });
143 }
144+export const latest = takeLatest;
145 
146 export function* takeLeading<T>(
147   pattern: ActionPattern,
148-  op: (action: AnyAction) => Operation<T>,
149-): Operation<Task<void>> {
150+  op: (action: Action) => Operation<T>,
151+) {
152   return yield* spawn(function* (): Operation<void> {
153     while (true) {
154       const action = yield* take(pattern);
155-      if (!action) continue;
156       yield* call(() => op(action));
157     }
158   });
159 }
160+export const leading = takeLeading;
M store/put.test.ts
+8, -8
 1@@ -1,5 +1,5 @@
 2 import { describe, expect, it } from "../test.ts";
 3-import { sleep, spawn } from "../deps.ts";
 4+import { each, sleep, spawn } from "../deps.ts";
 5 
 6 import { ActionContext, put, take } from "./mod.ts";
 7 import { configureStore } from "./store.ts";
 8@@ -10,13 +10,11 @@ it(putTests, "should send actions through channel", async () => {
 9   const actual: string[] = [];
10 
11   function* genFn(arg: string) {
12-    yield* spawn(function* () {
13-      const actions = yield* ActionContext;
14-      const msgs = yield* actions.output;
15-      let action = yield* msgs.next();
16-      while (!action.done) {
17-        actual.push(action.value.type);
18-        action = yield* msgs.next();
19+    const actions = yield* ActionContext;
20+    const task = yield* spawn(function* () {
21+      for (const action of yield* each(actions)) {
22+        actual.push(action.type);
23+        yield* each.next();
24       }
25     });
26 
27@@ -26,6 +24,8 @@ it(putTests, "should send actions through channel", async () => {
28     yield* put({
29       type: "2",
30     });
31+    actions.close();
32+    yield* task;
33   }
34 
35   const store = configureStore({ initialState: {} });
M store/store.test.ts
+1, -1
1@@ -65,7 +65,7 @@ it(
2         function* () {
3           const store = yield* StoreContext;
4           const chan = yield* StoreUpdateContext;
5-          const msgList = yield* chan.output;
6+          const msgList = yield* chan.subscribe();
7           yield* msgList.next();
8           asserts.assertEquals(store.getState(), {
9             users: { 1: { id: "1", name: "eric" }, 3: { id: "", name: "" } },
M store/store.ts
+5, -3
 1@@ -1,5 +1,6 @@
 2 import {
 3   createScope,
 4+  createSignal,
 5   enablePatches,
 6   Ok,
 7   produceWithPatches,
 8@@ -11,9 +12,8 @@ import { BaseMiddleware, compose } from "../compose.ts";
 9 import type { AnyAction, AnyState, OpFn } from "../types.ts";
10 import { safe } from "../fx/mod.ts";
11 import { Next } from "../query/types.ts";
12-
13 import type { FxStore, Listener, StoreUpdater, UpdaterCtx } from "./types.ts";
14-import { StoreContext, StoreUpdateContext } from "./context.ts";
15+import { ActionContext, StoreContext, StoreUpdateContext } from "./context.ts";
16 import { put } from "./fx.ts";
17 
18 const stubMsg = "This is merely a stub, not implemented";
19@@ -90,7 +90,7 @@ export function createStore<S extends AnyState>({
20 
21   function* notifyChannelMdw(_: UpdaterCtx<S>, next: Next) {
22     const chan = yield* StoreUpdateContext;
23-    yield* chan.input.send();
24+    yield* chan.send();
25     yield* next();
26   }
27 
28@@ -166,7 +166,9 @@ export function configureStore<S extends AnyState>(
29   props: CreateStore<S>,
30 ): FxStore<S> {
31   const store = createStore<S>(props);
32+  const signal = createSignal<AnyAction, void>();
33   // deno-lint-ignore no-explicit-any
34   store.getScope().set(StoreContext, store as any);
35+  store.getScope().set(ActionContext, signal);
36   return store;
37 }
M store/supervisor.ts
+1, -9
 1@@ -1,5 +1,5 @@
 2 import { call, race } from "../fx/mod.ts";
 3-import { take, takeLatest, takeLeading } from "./fx.ts";
 4+import { take } from "./fx.ts";
 5 import { Operation, sleep, spawn, Task } from "../deps.ts";
 6 import type { ActionWPayload, AnyAction, OpFn } from "../types.ts";
 7 import type { CreateActionPayload } from "../query/mod.ts";
 8@@ -8,14 +8,6 @@ const MS = 1000;
 9 const SECONDS = 1 * MS;
10 const MINUTES = 60 * SECONDS;
11 
12-export function* latest(action: string, saga: any) {
13-  yield takeLatest(`${action}`, saga);
14-}
15-
16-export function* leading(action: string, saga: any) {
17-  yield takeLeading(`${action}`, saga);
18-}
19-
20 export function poll(parentTimer: number = 5 * 1000, cancelType?: string) {
21   return function* poller<T>(
22     actionType: string,