Eric Bower
·
26 Aug 24
supervisor.ts
1import { Callable, Operation, Result, sleep } from "../deps.ts";
2import { safe } from "./safe.ts";
3import { parallel } from "./parallel.ts";
4import { put } from "../action.ts";
5import { API_ACTION_PREFIX } from "../action.ts";
6
/**
 * Default backoff schedule used when supervising an operation.
 *
 * The delay doubles with each attempt: attempt 1 waits 20ms, attempt 2
 * waits 40ms, and so on up to 10240ms at attempt 10 (with the default `max`).
 *
 * @param attempt - The attempt number to compute a delay for.
 * @param max - The highest attempt number that still yields a delay.
 * @returns The delay in milliseconds, or `-1` once `attempt` exceeds `max`.
 */
export function superviseBackoff(attempt: number, max = 10): number {
  const exhausted = attempt > max;
  return exhausted ? -1 : Math.pow(2, attempt) * 10;
}
12
/**
 * supervise watches whatever {@link Operation} is provided and
 * automatically restarts it whenever it exits.
 *
 * The operation is run through `safe`, so a thrown error is captured as a
 * failed result instead of propagating. After a failure, an action of type
 * `${API_ACTION_PREFIX}supervise` carrying the error is `put`, and the
 * supervisor sleeps for the current backoff delay before restarting. After
 * a successful exit the operation is restarted right away and the attempt
 * counter starts over.
 *
 * Supervision stops for good as soon as `backoff` returns a negative
 * number.
 *
 * @param op - The operation to keep running.
 * @param backoff - Maps the current attempt number (starting at 1) to a
 *   delay in milliseconds; a negative value means "give up". Defaults to
 *   {@link superviseBackoff}.
 * @returns A generator function that runs the supervision loop.
 */
export function supervise<T>(
  op: Callable<T>,
  backoff: (attempt: number) => number = superviseBackoff,
) {
  return function* () {
    let attempt = 1;
    let waitFor = backoff(attempt);

    // A negative delay is the backoff's signal to stop supervising.
    while (waitFor >= 0) {
      const res = yield* safe(op);

      if (res.ok) {
        // Clean exit: reset so the increment below puts us back at attempt 1.
        attempt = 0;
      } else {
        yield* put({
          type: `${API_ACTION_PREFIX}supervise`,
          payload: res.error,
          meta:
            `Exception caught, waiting ${waitFor}ms before restarting operation`,
        });
        yield* sleep(waitFor);
      }

      attempt += 1;
      waitFor = backoff(attempt);
    }
  };
}
48
/**
 * keepAlive wraps every operation in `ops` with {@link supervise} and
 * hands the wrapped operations to `parallel`.
 *
 * @param ops - The operations to supervise.
 * @param backoff - Optional backoff function forwarded to
 *   {@link supervise} for every operation.
 * @returns The results produced by the parallel group.
 */
export function* keepAlive(
  ops: Callable<unknown>[],
  backoff?: (attempt: number) => number,
): Operation<Result<void>[]> {
  const supervised = ops.map((op) => supervise(op, backoff));
  const task = yield* parallel(supervised);
  return yield* task;
}