repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

commit
e829ea4
parent
25afc98
author
Eric Bower
date
2023-04-20 01:13:03 +0000 UTC
chore(fx): replaced `all` with `parallel`
7 files changed,  +69, -78
D fx/all.ts
+0, -66
 1@@ -1,66 +0,0 @@
 2-import type { Channel, Operation, Result } from "../deps.ts";
 3-import { createChannel, resource, spawn } from "../deps.ts";
 4-import { safe } from "./call.ts";
 5-import type { Computation, OpFn } from "../types.ts";
 6-import { map } from "../iter.ts";
 7-
 8-import { toOperation } from "./call.ts";
 9-
10-export function all<T>(operations: OpFn<T>[]): Operation<T[]> {
11-  return resource(function* All(provide) {
12-    const tasks = yield* map(
13-      operations.map((o) => () => toOperation(o)),
14-      spawn,
15-    );
16-
17-    const results = yield* map(tasks, (task) => task);
18-
19-    yield* provide(results);
20-  });
21-}
22-
23-interface ParallelRet<T> extends Computation<Result<T>[]> {
24-  sequence: Channel<Result<T>, void>;
25-  immediate: Channel<Result<T>, void>;
26-}
27-
28-export function* parallel<T>(operations: OpFn<T>[]): Operation<ParallelRet<T>> {
29-  const sequence = createChannel<Result<T>>();
30-  const immediate = createChannel<Result<T>>();
31-  const results: Result<T>[] = [];
32-
33-  const task = yield* spawn(function* () {
34-    const tasks = [];
35-    for (const op of operations) {
36-      tasks.push(
37-        yield* spawn(function* () {
38-          const result = yield* safe(op);
39-          yield* immediate.input.send(result);
40-          return result;
41-        }),
42-      );
43-    }
44-
45-    for (const tsk of tasks) {
46-      const res = yield* tsk;
47-      results.push(res);
48-      yield* sequence.input.send(res);
49-    }
50-
51-    yield* sequence.input.close();
52-    yield* immediate.input.close();
53-  });
54-
55-  function* wait(): Operation<Result<T>[]> {
56-    yield* task;
57-    return results;
58-  }
59-
60-  return {
61-    sequence,
62-    immediate,
63-    *[Symbol.iterator]() {
64-      return yield* wait();
65-    },
66-  };
67-}
M fx/call.ts
+2, -2
1@@ -1,6 +1,6 @@
2-import type { Operation, Task } from "../deps.ts";
3-import { action, Err, expect, Ok, Result, spawn } from "../deps.ts";
4 import type { OpFn } from "../types.ts";
5+import type { Err, Ok, Operation, Result, Task } from "../deps.ts";
6+import { action, expect, spawn } from "../deps.ts";
7 import { ErrContext } from "../context.ts";
8 
9 export const isFunc = (f: unknown) => typeof f === "function";
M fx/emit.ts
+2, -2
 1@@ -1,7 +1,7 @@
 2 import type { Channel, Operation } from "../deps.ts";
 3 import type { Action } from "../types.ts";
 4 
 5-import { all } from "./all.ts";
 6+import { parallel } from "./parallel.ts";
 7 
 8 export function* emit({
 9   channel,
10@@ -15,7 +15,7 @@ export function* emit({
11     if (action.length === 0) {
12       return;
13     }
14-    yield* all(action.map((a) => () => input.send(a)));
15+    yield* parallel(action.map((a) => () => input.send(a)));
16   } else {
17     yield* input.send(action);
18   }
M fx/mod.ts
+1, -1
1@@ -1,4 +1,4 @@
2-export * from "./all.ts";
3+export * from "./parallel.ts";
4 export * from "./call.ts";
5 export * from "./cancel.ts";
6 export * from "./cancelled.ts";
A fx/parallel.ts
+52, -0
 1@@ -0,0 +1,52 @@
 2+import type { Channel, Operation, Result } from "../deps.ts";
 3+import type { Computation, OpFn } from "../types.ts";
 4+import { createChannel, resource, spawn } from "../deps.ts";
 5+import { safe } from "./call.ts";
 6+
 7+interface ParallelRet<T> extends Computation<Result<T>[]> {
 8+  sequence: Channel<Result<T>, void>;
 9+  immediate: Channel<Result<T>, void>;
10+}
11+
12+export function parallel<T>(operations: OpFn<T>[]) {
13+  const sequence = createChannel<Result<T>>();
14+  const immediate = createChannel<Result<T>>();
15+  const results: Result<T>[] = [];
16+
17+  return resource<ParallelRet<T>>(function* (provide) {
18+    const task = yield* spawn(function* () {
19+      const tasks = [];
20+      for (const op of operations) {
21+        tasks.push(
22+          yield* spawn(function* () {
23+            const result = yield* safe(op);
24+            yield* immediate.input.send(result);
25+            return result;
26+          }),
27+        );
28+      }
29+
30+      for (const tsk of tasks) {
31+        const res = yield* tsk;
32+        results.push(res);
33+        yield* sequence.input.send(res);
34+      }
35+
36+      yield* sequence.input.close();
37+      yield* immediate.input.close();
38+    });
39+
40+    function* wait(): Operation<Result<T>[]> {
41+      yield* task;
42+      return results;
43+    }
44+
45+    yield* provide({
46+      sequence,
47+      immediate,
48+      *[Symbol.iterator]() {
49+        return yield* wait();
50+      },
51+    });
52+  });
53+}
M fx/watch.ts
+3, -3
 1@@ -1,7 +1,7 @@
 2-import { OpFn } from "../types.ts";
 3+import type { OpFn } from "../types.ts";
 4 
 5 import { safe } from "./call.ts";
 6-import { all } from "./all.ts";
 7+import { parallel } from "./parallel.ts";
 8 
 9 export function supervise<T>(op: OpFn<T>) {
10   return function* () {
11@@ -12,5 +12,5 @@ export function supervise<T>(op: OpFn<T>) {
12 }
13 
14 export function* keepAlive(ops: OpFn[]) {
15-  return yield* all(ops.map(supervise));
16+  return yield* parallel(ops.map(supervise));
17 }
M redux.ts
+9, -4
 1@@ -1,9 +1,10 @@
 2 import type { Channel, Scope } from "./deps.ts";
 3+import type { Action, OpFn, StoreLike } from "./types.ts";
 4+import type { ActionPattern } from "./matcher.ts";
 5+
 6 import { createChannel, createContext, createScope } from "./deps.ts";
 7 import { contextualize } from "./context.ts";
 8-import { call, emit, once } from "./fx/mod.ts";
 9-import type { Action, OpFn, StoreLike } from "./types.ts";
10-import { ActionPattern } from "./matcher.ts";
11+import { call, emit, once, parallel } from "./fx/mod.ts";
12 
13 export const ActionContext = createContext<Channel<Action, void>>(
14   "redux:action",
15@@ -48,7 +49,11 @@ export function createFxMiddleware(scope: Scope = createScope()) {
16     return (next: (a: Action) => T) => (action: Action) => {
17       const result = next(action); // hit reducers
18       scope.run(function* () {
19-        yield* put(action);
20+        if (Array.isArray(action)) {
21+          yield* parallel(action.map((a) => () => put(a)));
22+        } else {
23+          yield* put(action);
24+        }
25       });
26       return result;
27     };