Eric Bower
·
18 Jan 24
parallel.test.ts
1import { describe, expect, it } from "../test.ts";
2import type { Operation, Result } from "../mod.ts";
3import { each, Err, Ok, parallel, run, sleep, spawn } from "../mod.ts";
4
5const test = describe("parallel()");
6
7interface Defer<T> {
8 promise: Promise<T>;
9 resolve: (t: T) => void;
10 reject: (t: Error) => void;
11}
12
13function defer<T>(): Defer<T> {
14 let resolve: (t: T) => void = () => {};
15 let reject: (t: Error) => void = () => {};
16 const promise = new Promise<T>((res, rej) => {
17 resolve = res;
18 reject = rej;
19 });
20
21 return { resolve, reject, promise };
22}
23
24it(
25 test,
26 "should return an immediate channel with results as they are completed",
27 async () => {
28 const result = await run(function* () {
29 const results = yield* parallel([
30 function* () {
31 yield* sleep(20);
32 return "second";
33 },
34 function* () {
35 yield* sleep(10);
36 return "first";
37 },
38 ]);
39
40 const res: Result<string>[] = [];
41 for (const val of yield* each(results.immediate)) {
42 res.push(val);
43 yield* each.next();
44 }
45
46 yield* results;
47 return res;
48 });
49
50 expect(result).toEqual([Ok("first"), Ok("second")]);
51 },
52);
53
54it(
55 test,
56 "should return a sequence channel with results preserving array order as results",
57 async () => {
58 const result = await run(function* () {
59 const results = yield* parallel([
60 function* () {
61 yield* sleep(20);
62 return "second";
63 },
64 function* () {
65 yield* sleep(10);
66 return "first";
67 },
68 ]);
69
70 const res: Result<string>[] = [];
71 for (const val of yield* each(results.sequence)) {
72 res.push(val);
73 yield* each.next();
74 }
75
76 yield* results;
77 return res;
78 });
79
80 expect(result).toEqual([Ok("second"), Ok("first")]);
81 },
82);
83
84it(
85 test,
86 "should return all the result in an array, preserving order",
87 async () => {
88 const result = await run(function* () {
89 const para = yield* parallel([
90 function* () {
91 yield* sleep(20);
92 return "second";
93 },
94 function* () {
95 yield* sleep(10);
96 return "first";
97 },
98 ]);
99
100 return yield* para;
101 });
102
103 expect(result).toEqual([Ok("second"), Ok("first")]);
104 },
105);
106
107it(test, "should return empty array", async () => {
108 let actual;
109 await run(function* (): Operation<void> {
110 const results = yield* parallel([]);
111 actual = yield* results;
112 });
113 expect(actual).toEqual([]);
114});
115
116it(test, "should resolve all async items", async () => {
117 const two = defer();
118
119 function* one() {
120 yield* sleep(5);
121 return 1;
122 }
123
124 const result = await run(function* () {
125 yield* spawn(function* () {
126 yield* sleep(15);
127 two.resolve(2);
128 });
129 const results = yield* parallel([one, () => two.promise]);
130 return yield* results;
131 });
132
133 expect(result).toEqual([Ok(1), Ok(2)]);
134});
135
136it(test, "should stop all operations when there is an error", async () => {
137 let actual: Result<number>[] = [];
138 const one = defer<number>();
139 const two = defer<number>();
140
141 function* genFn() {
142 try {
143 const results = yield* parallel([() => one.promise, () => two.promise]);
144 actual = yield* results;
145 } catch (_) {
146 actual = [Err(new Error("should not get hit"))];
147 }
148 }
149
150 const err = new Error("error");
151 one.reject(err);
152 two.resolve(1);
153
154 await run(genFn);
155
156 const expected = [Err(err), Ok(1)];
157 expect(actual).toEqual(expected);
158});