repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

starfx / fx
Eric Bower · 26 Aug 24

parallel.ts

  1import type { Callable, Channel, Operation, Result } from "../deps.ts";
  2import type { Computation } from "../types.ts";
  3import { createChannel, resource, spawn } from "../deps.ts";
  4import { safe } from "./safe.ts";
  5
  6export interface ParallelRet<T> extends Computation<Result<T>[]> {
  7  sequence: Channel<Result<T>, void>;
  8  immediate: Channel<Result<T>, void>;
  9}
 10
 11/**
 12 * The goal of `parallel` is to make it easier to cooridnate multiple async
 13 * operations in parallel, with different ways to receive completed tasks.
 14 *
 15 * All tasks are called with {@link fx.safe} which means they will never
 16 * throw an exception.  Instead all tasks will return a Result object that
 17 * the end development must evaluate in order to grab the value.
 18 *
 19 * @example
 20 * ```ts
 21 * import { parallel } from "starfx";
 22 *
 23 * function* run() {
 24 *  const task = yield* parallel([job1, job2]);
 25 *  // wait for all tasks to complete before moving to next yield point
 26 *  const results = yield* task;
 27 *  // job1 = results[0];
 28 *  // job2 = results[1];
 29 * }
 30 * ```
 31 *
 32 * Instead of waiting for all tasks to complete, we can instead loop over
 33 * tasks as they arrive:
 34 *
 35 * @example
 36 * ```ts
 37 * function* run() {
 38 *  const task = yield* parallel([job1, job2]);
 39 *  for (const job of yield* each(task.immediate)) {
 40 *    // job2 completes first then it will be first in list
 41 *    console.log(job);
 42 *    yield* each.next();
 43 *  }
 44 * }
 45 * ```
 46 *
 47 * Or we can instead loop over tasks in order of the array provided to
 48 * parallel:
 49 *
 50 * @example
 51 * ```ts
 52 * function* run() {
 53 *  const task = yield* parallel([job1, job2]);
 54 *  for (const job of yield* each(task.sequence)) {
 55 *    // job1 then job2 will be returned regardless of when the jobs
 56 *    // complete
 57 *    console.log(job);
 58 *    yield* each.next();
 59 *  }
 60 * }
 61 * ```
 62 */
 63export function parallel<T>(operations: Callable<T>[]) {
 64  const sequence = createChannel<Result<T>>();
 65  const immediate = createChannel<Result<T>>();
 66  const results: Result<T>[] = [];
 67
 68  return resource<ParallelRet<T>>(function* (provide) {
 69    const task = yield* spawn(function* () {
 70      const tasks = [];
 71      for (const op of operations) {
 72        tasks.push(
 73          yield* spawn(function* () {
 74            const result = yield* safe(op);
 75            yield* immediate.send(result);
 76            return result;
 77          }),
 78        );
 79      }
 80
 81      for (const tsk of tasks) {
 82        const res = yield* tsk;
 83        results.push(res);
 84        yield* sequence.send(res);
 85      }
 86
 87      yield* sequence.close();
 88      yield* immediate.close();
 89    });
 90
 91    function* wait(): Operation<Result<T>[]> {
 92      yield* task;
 93      return results;
 94    }
 95
 96    yield* provide({
 97      sequence,
 98      immediate,
 99      *[Symbol.iterator]() {
100        return yield* wait();
101      },
102    });
103  });
104}