Eric Bower
·
2025-06-06
supervisor.ts
1import { type Callable, type Operation, sleep } from "effection";
2import type { Result } from "effection"; // Adjust the import path as needed
3import { API_ACTION_PREFIX, put } from "../action.js";
4import { parallel } from "./parallel.js";
5import { safe } from "./safe.js";
6
/**
 * Default backoff schedule used by {@link supervise}.
 *
 * @param attempt - the attempt number to compute a delay for
 * @param max - the last attempt that still yields a delay (defaults to 10)
 * @returns the delay in milliseconds, or `-1` once `attempt` exceeds `max`
 */
export function superviseBackoff(attempt: number, max = 10): number {
  // attempts 1..10 => 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1280ms, 2560ms, 5120ms, 10240ms
  const exhausted = attempt > max;
  return exhausted ? -1 : Math.pow(2, attempt) * 10;
}
12
/**
 * supervise wraps the provided {@link Operation} so that it is run again
 * every time it exits.
 *
 * Each run goes through `safe`, so a failure comes back as a result
 * instead of propagating. After a failure an action describing the error
 * is dispatched, the supervisor sleeps for the current backoff delay and
 * the attempt counter grows. After a clean exit the counter starts over.
 * Supervision ends as soon as `backoff` returns a negative number.
 *
 * @param op - the operation to keep running
 * @param backoff - maps an attempt number to a delay in milliseconds; a
 *   negative value means "give up". Defaults to {@link superviseBackoff}.
 * @returns a generator function that performs the supervision when run
 */
export function supervise<T>(
  op: Callable<T>,
  backoff: (attemp: number) => number = superviseBackoff,
) {
  return function* () {
    let tries = 1;

    // The delay is recomputed from `tries` after every run of `op`.
    for (let delay = backoff(tries); delay >= 0; delay = backoff(tries)) {
      const outcome = yield* safe(op);

      if (!outcome.ok) {
        yield* put({
          type: `${API_ACTION_PREFIX}supervise`,
          payload: outcome.error,
          meta: `Exception caught, waiting ${delay}ms before restarting operation`,
        });
        yield* sleep(delay);
        tries += 1;
        continue;
      }

      // Clean exit: the next run starts again from the first attempt.
      tries = 1;
    }
  };
}
47
/**
 * keepAlive wraps every operation in `ops` with {@link supervise} and
 * runs the wrapped operations together through `parallel`.
 *
 * @param ops - the operations to supervise
 * @param backoff - optional backoff function handed to each supervisor;
 *   when omitted, {@link supervise} falls back to its own default
 * @returns the results produced by the parallel group
 */
export function* keepAlive(
  ops: Callable<unknown>[],
  backoff?: (attempt: number) => number,
): Operation<Result<void>[]> {
  const supervised = ops.map((op) => supervise(op, backoff));
  const task = yield* parallel(supervised);
  return yield* task;
}