Eric Bower
·
26 Aug 24
parallel.ts
1import type { Callable, Channel, Operation, Result } from "../deps.ts";
2import type { Computation } from "../types.ts";
3import { createChannel, resource, spawn } from "../deps.ts";
4import { safe } from "./safe.ts";
5
6export interface ParallelRet<T> extends Computation<Result<T>[]> {
7 sequence: Channel<Result<T>, void>;
8 immediate: Channel<Result<T>, void>;
9}
10
11/**
12 * The goal of `parallel` is to make it easier to cooridnate multiple async
13 * operations in parallel, with different ways to receive completed tasks.
14 *
15 * All tasks are called with {@link fx.safe} which means they will never
16 * throw an exception. Instead all tasks will return a Result object that
17 * the end development must evaluate in order to grab the value.
18 *
19 * @example
20 * ```ts
21 * import { parallel } from "starfx";
22 *
23 * function* run() {
24 * const task = yield* parallel([job1, job2]);
25 * // wait for all tasks to complete before moving to next yield point
26 * const results = yield* task;
27 * // job1 = results[0];
28 * // job2 = results[1];
29 * }
30 * ```
31 *
32 * Instead of waiting for all tasks to complete, we can instead loop over
33 * tasks as they arrive:
34 *
35 * @example
36 * ```ts
37 * function* run() {
38 * const task = yield* parallel([job1, job2]);
39 * for (const job of yield* each(task.immediate)) {
40 * // job2 completes first then it will be first in list
41 * console.log(job);
42 * yield* each.next();
43 * }
44 * }
45 * ```
46 *
47 * Or we can instead loop over tasks in order of the array provided to
48 * parallel:
49 *
50 * @example
51 * ```ts
52 * function* run() {
53 * const task = yield* parallel([job1, job2]);
54 * for (const job of yield* each(task.sequence)) {
55 * // job1 then job2 will be returned regardless of when the jobs
56 * // complete
57 * console.log(job);
58 * yield* each.next();
59 * }
60 * }
61 * ```
62 */
63export function parallel<T>(operations: Callable<T>[]) {
64 const sequence = createChannel<Result<T>>();
65 const immediate = createChannel<Result<T>>();
66 const results: Result<T>[] = [];
67
68 return resource<ParallelRet<T>>(function* (provide) {
69 const task = yield* spawn(function* () {
70 const tasks = [];
71 for (const op of operations) {
72 tasks.push(
73 yield* spawn(function* () {
74 const result = yield* safe(op);
75 yield* immediate.send(result);
76 return result;
77 }),
78 );
79 }
80
81 for (const tsk of tasks) {
82 const res = yield* tsk;
83 results.push(res);
84 yield* sequence.send(res);
85 }
86
87 yield* sequence.close();
88 yield* immediate.close();
89 });
90
91 function* wait(): Operation<Result<T>[]> {
92 yield* task;
93 return results;
94 }
95
96 yield* provide({
97 sequence,
98 immediate,
99 *[Symbol.iterator]() {
100 return yield* wait();
101 },
102 });
103 });
104}