Eric Bower
·
2025-06-06
parallel.test.ts
1import type { Operation, Result } from "../index.js";
2import { Err, Ok, each, parallel, run, sleep, spawn } from "../index.js";
3import { expect, test } from "../test.js";
4
5interface Defer<T> {
6 promise: Promise<T>;
7 resolve: (t: T) => void;
8 reject: (t: Error) => void;
9}
10
11function defer<T>(): Defer<T> {
12 let resolve: (t: T) => void = () => {};
13 let reject: (t: Error) => void = () => {};
14 const promise = new Promise<T>((res, rej) => {
15 resolve = res;
16 reject = rej;
17 });
18
19 return { resolve, reject, promise };
20}
21
22test("should return an immediate channel with results as they are completed", async () => {
23 const result = await run(function* () {
24 const results = yield* parallel([
25 function* () {
26 yield* sleep(20);
27 return "second";
28 },
29 function* () {
30 yield* sleep(10);
31 return "first";
32 },
33 ]);
34
35 const res: Result<string>[] = [];
36 for (const val of yield* each(results.immediate)) {
37 res.push(val);
38 yield* each.next();
39 }
40
41 yield* results;
42 return res;
43 });
44
45 expect(result).toEqual([Ok("first"), Ok("second")]);
46});
47
48test("should return a sequence channel with results preserving array order as results", async () => {
49 const result = await run(function* () {
50 const results = yield* parallel([
51 function* () {
52 yield* sleep(20);
53 return "second";
54 },
55 function* () {
56 yield* sleep(10);
57 return "first";
58 },
59 ]);
60
61 const res: Result<string>[] = [];
62 for (const val of yield* each(results.sequence)) {
63 res.push(val);
64 yield* each.next();
65 }
66
67 yield* results;
68 return res;
69 });
70
71 expect(result).toEqual([Ok("second"), Ok("first")]);
72});
73
74test("should return all the result in an array, preserving order", async () => {
75 const result = await run(function* () {
76 const para = yield* parallel([
77 function* () {
78 yield* sleep(20);
79 return "second";
80 },
81 function* () {
82 yield* sleep(10);
83 return "first";
84 },
85 ]);
86
87 return yield* para;
88 });
89
90 expect(result).toEqual([Ok("second"), Ok("first")]);
91});
92
93test("should return empty array", async () => {
94 let actual;
95 await run(function* (): Operation<void> {
96 const results = yield* parallel([]);
97 actual = yield* results;
98 });
99 expect(actual).toEqual([]);
100});
101
102test("should resolve all async items", async () => {
103 const two = defer();
104
105 function* one() {
106 yield* sleep(5);
107 return 1;
108 }
109
110 const result = await run(function* () {
111 yield* spawn(function* () {
112 yield* sleep(15);
113 two.resolve(2);
114 });
115 const results = yield* parallel([one, () => two.promise]);
116 return yield* results;
117 });
118
119 expect(result).toEqual([Ok(1), Ok(2)]);
120});
121
122test("should stop all operations when there is an error", async () => {
123 let actual: Result<number>[] = [];
124 const one = defer<number>();
125 const two = defer<number>();
126
127 function* genFn() {
128 try {
129 const results = yield* parallel([() => one.promise, () => two.promise]);
130 actual = yield* results;
131 } catch (_) {
132 actual = [Err(new Error("should not get hit"))];
133 }
134 }
135
136 const err = new Error("error");
137 one.reject(err);
138 two.resolve(1);
139
140 await run(genFn);
141
142 const expected = [Err(err), Ok(1)];
143 expect(actual).toEqual(expected);
144});