repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

commit
25afc98
parent
1294504
author
Eric Bower
date
2023-04-17 20:09:15 +0000 UTC
feat: parallel fx
5 files changed,  +95, -88
M deps.ts
+1, -0
1@@ -2,6 +2,7 @@ import React from "https://esm.sh/react@18.2.0";
2 export { React };
3 export type {
4   Channel,
5+  Instruction,
6   Operation,
7   Result,
8   Scope,
M fx/all.ts
+9, -4
 1@@ -1,7 +1,7 @@
 2 import type { Channel, Operation, Result } from "../deps.ts";
 3 import { createChannel, resource, spawn } from "../deps.ts";
 4 import { safe } from "./call.ts";
 5-import type { OpFn } from "../types.ts";
 6+import type { Computation, OpFn } from "../types.ts";
 7 import { map } from "../iter.ts";
 8 
 9 import { toOperation } from "./call.ts";
10@@ -19,8 +19,7 @@ export function all<T>(operations: OpFn<T>[]): Operation<T[]> {
11   });
12 }
13 
14-interface ParallelRet<T> {
15-  wait: () => Operation<Result<T>[]>;
16+interface ParallelRet<T> extends Computation<Result<T>[]> {
17   sequence: Channel<Result<T>, void>;
18   immediate: Channel<Result<T>, void>;
19 }
20@@ -57,5 +56,11 @@ export function* parallel<T>(operations: OpFn<T>[]): Operation<ParallelRet<T>> {
21     return results;
22   }
23 
24-  return { wait, sequence, immediate };
25+  return {
26+    sequence,
27+    immediate,
28+    *[Symbol.iterator]() {
29+      return yield* wait();
30+    },
31+  };
32 }
D test/all.test.ts
+0, -73
 1@@ -1,73 +0,0 @@
 2-import { Operation, run, sleep, spawn } from "../deps.ts";
 3-import { describe, expect, it } from "../test.ts";
 4-
 5-import { all } from "../mod.ts";
 6-
 7-interface Defer<T> {
 8-  promise: Operation<T>;
 9-  resolve: (t: T) => void;
10-  reject: (t: Error) => void;
11-}
12-
13-function defer<T>(): Defer<T> {
14-  let resolve;
15-  let reject;
16-  const promise = new Promise<T>((res, rej) => {
17-    resolve = res;
18-    reject = rej;
19-  });
20-
21-  return { resolve, reject, promise } as any;
22-}
23-
24-describe("all()", () => {
25-  it("should return empty array", async () => {
26-    let actual;
27-    await run(function* () {
28-      actual = yield* all([]);
29-    });
30-    expect(actual).toEqual([]);
31-  });
32-
33-  it("should resolve all async items", async () => {
34-    const two = defer();
35-
36-    function* one() {
37-      yield* sleep(5);
38-      return 1;
39-    }
40-
41-    const result = await run(function* () {
42-      yield* spawn(function* () {
43-        yield* sleep(15);
44-        two.resolve(2);
45-      });
46-      return yield* all([one, () => two.promise]);
47-    });
48-
49-    expect(result).toEqual([1, 2]);
50-  });
51-
52-  it("should stop all operations when there is an error", async () => {
53-    let actual: number[] = [];
54-    const one = defer<number>();
55-    const two = defer<number>();
56-
57-    function* genFn() {
58-      try {
59-        actual = yield* all([() => one.promise, () => two.promise]);
60-      } catch (err) {
61-        actual = [err];
62-      }
63-    }
64-
65-    const err = new Error("error");
66-    one.reject(err);
67-    two.resolve(1);
68-
69-    await run(genFn);
70-
71-    const expected = [err];
72-    expect(actual).toEqual(expected);
73-  });
74-});
M test/parallel.test.ts
+80, -10
  1@@ -1,17 +1,34 @@
  2 import { describe, expect, it } from "../test.ts";
  3-import type { Result } from "../deps.ts";
  4-import { Ok, run, sleep } from "../deps.ts";
  5+import type { Operation, Result } from "../deps.ts";
  6+import { Err, Ok, run, sleep, spawn } from "../deps.ts";
  7 import { parallel } from "../fx/mod.ts";
  8 import { cforEach } from "../iter.ts";
  9 
 10 const test = describe("parallel()");
 11 
 12+interface Defer<T> {
 13+  promise: Operation<T>;
 14+  resolve: (t: T) => void;
 15+  reject: (t: Error) => void;
 16+}
 17+
 18+function defer<T>(): Defer<T> {
 19+  let resolve;
 20+  let reject;
 21+  const promise = new Promise<T>((res, rej) => {
 22+    resolve = res;
 23+    reject = rej;
 24+  });
 25+
 26+  return { resolve, reject, promise } as any;
 27+}
 28+
 29 it(
 30   test,
 31   "should return an immediate channel with results as they are completed",
 32   async () => {
 33     const result = await run(function* () {
 34-      const { wait, immediate } = yield* parallel([
 35+      const results = yield* parallel([
 36         function* () {
 37           yield* sleep(20);
 38           return "second";
 39@@ -23,11 +40,11 @@ it(
 40       ]);
 41 
 42       const res: Result<string>[] = [];
 43-      yield* cforEach(immediate, function* (val) {
 44+      yield* cforEach(results.immediate, function* (val) {
 45         res.push(val);
 46       });
 47 
 48-      yield* wait();
 49+      yield* results;
 50       return res;
 51     });
 52 
 53@@ -40,7 +57,7 @@ it(
 54   "should return a sequence channel with results preserving array order as results",
 55   async () => {
 56     const result = await run(function* () {
 57-      const { wait, sequence } = yield* parallel([
 58+      const results = yield* parallel([
 59         function* () {
 60           yield* sleep(20);
 61           return "second";
 62@@ -52,11 +69,11 @@ it(
 63       ]);
 64 
 65       const res: Result<string>[] = [];
 66-      yield* cforEach(sequence, function* (val) {
 67+      yield* cforEach(results.sequence, function* (val) {
 68         res.push(val);
 69       });
 70 
 71-      yield* wait();
 72+      yield* results;
 73       return res;
 74     });
 75 
 76@@ -69,7 +86,7 @@ it(
 77   "should return all the result in an array, preserving order",
 78   async () => {
 79     const result = await run(function* () {
 80-      const { wait } = yield* parallel([
 81+      const para = yield* parallel([
 82         function* () {
 83           yield* sleep(20);
 84           return "second";
 85@@ -80,9 +97,62 @@ it(
 86         },
 87       ]);
 88 
 89-      return yield* wait();
 90+      return yield* para;
 91     });
 92 
 93     expect(result).toEqual([Ok("second"), Ok("first")]);
 94   },
 95 );
 96+
 97+it(test, "should return empty array", async () => {
 98+  let actual;
 99+  await run(function* () {
100+    const results = yield* parallel([]);
101+    actual = yield* results;
102+  });
103+  expect(actual).toEqual([]);
104+});
105+
106+it(test, "should resolve all async items", async () => {
107+  const two = defer();
108+
109+  function* one() {
110+    yield* sleep(5);
111+    return 1;
112+  }
113+
114+  const result = await run(function* () {
115+    yield* spawn(function* () {
116+      yield* sleep(15);
117+      two.resolve(2);
118+    });
119+    const results = yield* parallel([one, () => two.promise]);
120+    return yield* results;
121+  });
122+
123+  expect(result).toEqual([Ok(1), Ok(2)]);
124+});
125+
126+it(test, "should stop all operations when there is an error", async () => {
127+  let actual: Result<number>[] = [];
128+  const one = defer<number>();
129+  const two = defer<number>();
130+
131+  function* genFn() {
132+    try {
133+      const results = yield* parallel([() => one.promise, () => two.promise]);
134+      actual = yield* results;
135+    } catch (_) {
136+      actual = [Err(new Error("should not get hit"))];
137+    }
138+  }
139+
140+  const err = new Error("error");
141+  one.reject(err);
142+  two.resolve(1);
143+
144+  await run(genFn);
145+
146+  const expected = [Err(err), Ok(1)];
147+  expect(actual).toEqual(expected);
148+});
M types.ts
+5, -1
 1@@ -1,4 +1,8 @@
 2-import type { Operation } from "./deps.ts";
 3+import type { Instruction, Operation } from "./deps.ts";
 4+
 5+export interface Computation<T = any> {
 6+  [Symbol.iterator](): Iterator<Instruction, T, any>;
 7+}
 8 
 9 export type ActionType = string;
10 export interface Action<P = any> {