repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

commit
1294504
parent
58a048f
author
Eric Bower
date
2023-04-16 19:10:49 +0000 UTC
parallel api
5 files changed,  +152, -33
M deps.ts
+3, -0
 1@@ -3,6 +3,7 @@ export { React };
 2 export type {
 3   Channel,
 4   Operation,
 5+  Result,
 6   Scope,
 7   Task,
 8 } from "https://deno.land/x/effection@3.0.0-alpha.7/mod.ts";
 9@@ -11,8 +12,10 @@ export {
10   createChannel,
11   createContext,
12   createScope,
13+  Err,
14   expect,
15   getframe,
16+  Ok,
17   resource,
18   run,
19   sleep,
M fx/all.ts
+44, -2
 1@@ -1,5 +1,6 @@
 2-import type { Operation } from "../deps.ts";
 3-import { resource, spawn } from "../deps.ts";
 4+import type { Channel, Operation, Result } from "../deps.ts";
 5+import { createChannel, resource, spawn } from "../deps.ts";
 6+import { safe } from "./call.ts";
 7 import type { OpFn } from "../types.ts";
 8 import { map } from "../iter.ts";
 9 
10@@ -17,3 +18,44 @@ export function all<T>(operations: OpFn<T>[]): Operation<T[]> {
11     yield* provide(results);
12   });
13 }
14+
15+interface ParallelRet<T> {
16+  wait: () => Operation<Result<T>[]>;
17+  sequence: Channel<Result<T>, void>;
18+  immediate: Channel<Result<T>, void>;
19+}
20+
21+export function* parallel<T>(operations: OpFn<T>[]): Operation<ParallelRet<T>> {
22+  const sequence = createChannel<Result<T>>();
23+  const immediate = createChannel<Result<T>>();
24+  const results: Result<T>[] = [];
25+
26+  const task = yield* spawn(function* () {
27+    const tasks = [];
28+    for (const op of operations) {
29+      tasks.push(
30+        yield* spawn(function* () {
31+          const result = yield* safe(op);
32+          yield* immediate.input.send(result);
33+          return result;
34+        }),
35+      );
36+    }
37+
38+    for (const tsk of tasks) {
39+      const res = yield* tsk;
40+      results.push(res);
41+      yield* sequence.input.send(res);
42+    }
43+
44+    yield* sequence.input.close();
45+    yield* immediate.input.close();
46+  });
47+
48+  function* wait(): Operation<Result<T>[]> {
49+    yield* task;
50+    return results;
51+  }
52+
53+  return { wait, sequence, immediate };
54+}
M fx/call.ts
+1, -31
 1@@ -1,38 +1,8 @@
 2 import type { Operation, Task } from "../deps.ts";
 3-import { action, expect, spawn } from "../deps.ts";
 4+import { action, Err, expect, Ok, Result, spawn } from "../deps.ts";
 5 import type { OpFn } from "../types.ts";
 6 import { ErrContext } from "../context.ts";
 7 
 8-export interface ResultOk<T> {
 9-  type: "ok";
10-  ok: true;
11-  value: T;
12-}
13-
14-export interface ResultErr {
15-  type: "err";
16-  ok: false;
17-  error: Error;
18-}
19-
20-export type Result<T> = ResultOk<T> | ResultErr;
21-
22-export function Ok<T>(value: T): ResultOk<T> {
23-  return {
24-    type: "ok",
25-    ok: true,
26-    value,
27-  };
28-}
29-
30-export function Err(error: Error): ResultErr {
31-  return {
32-    type: "err",
33-    ok: false,
34-    error,
35-  };
36-}
37-
38 export const isFunc = (f: unknown) => typeof f === "function";
39 export const isPromise = (p: unknown) =>
40   p && isFunc((p as PromiseLike<unknown>).then);
M iter.ts
+16, -0
 1@@ -1,5 +1,21 @@
 2 import type { Channel, Operation } from "./deps.ts";
 3 
 4+export function* cforEach<T>(
 5+  chan: Channel<T, void>,
 6+  each?: (val: T) => Operation<void>,
 7+) {
 8+  const { output } = chan;
 9+  const msgList = yield* output;
10+  while (true) {
11+    const next = yield* msgList;
12+    if (next.done) {
13+      return next.value;
14+    } else if (each) {
15+      yield* each(next.value);
16+    }
17+  }
18+}
19+
20 export function* forEach<T>(
21   chan: Operation<Channel<T, void>>,
22   each?: (val: T) => Operation<void>,
A test/parallel.test.ts
+88, -0
 1@@ -0,0 +1,88 @@
 2+import { describe, expect, it } from "../test.ts";
 3+import type { Result } from "../deps.ts";
 4+import { Ok, run, sleep } from "../deps.ts";
 5+import { parallel } from "../fx/mod.ts";
 6+import { cforEach } from "../iter.ts";
 7+
 8+const test = describe("parallel()");
 9+
10+it(
11+  test,
12+  "should return an immediate channel with results as they are completed",
13+  async () => {
14+    const result = await run(function* () {
15+      const { wait, immediate } = yield* parallel([
16+        function* () {
17+          yield* sleep(20);
18+          return "second";
19+        },
20+        function* () {
21+          yield* sleep(10);
22+          return "first";
23+        },
24+      ]);
25+
26+      const res: Result<string>[] = [];
27+      yield* cforEach(immediate, function* (val) {
28+        res.push(val);
29+      });
30+
31+      yield* wait();
32+      return res;
33+    });
34+
35+    expect(result).toEqual([Ok("first"), Ok("second")]);
36+  },
37+);
38+
39+it(
40+  test,
41+  "should return a sequence channel with results preserving array order as results",
42+  async () => {
43+    const result = await run(function* () {
44+      const { wait, sequence } = yield* parallel([
45+        function* () {
46+          yield* sleep(20);
47+          return "second";
48+        },
49+        function* () {
50+          yield* sleep(10);
51+          return "first";
52+        },
53+      ]);
54+
55+      const res: Result<string>[] = [];
56+      yield* cforEach(sequence, function* (val) {
57+        res.push(val);
58+      });
59+
60+      yield* wait();
61+      return res;
62+    });
63+
64+    expect(result).toEqual([Ok("second"), Ok("first")]);
65+  },
66+);
67+
68+it(
69+  test,
70+  "should return all the result in an array, preserving order",
71+  async () => {
72+    const result = await run(function* () {
73+      const { wait } = yield* parallel([
74+        function* () {
75+          yield* sleep(20);
76+          return "second";
77+        },
78+        function* () {
79+          yield* sleep(10);
80+          return "first";
81+        },
82+      ]);
83+
84+      return yield* wait();
85+    });
86+
87+    expect(result).toEqual([Ok("second"), Ok("first")]);
88+  },
89+);