repos / starfx

supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

Eric Bower · 23 Feb 24

supervisor.ts

 1import { createAction, take } from "./action.ts";
 2import { call, Callable, Operation, race, sleep, spawn, Task } from "./deps.ts";
 3import type { ActionWithPayload, AnyAction } from "./types.ts";
 4import type { CreateActionPayload } from "./query/mod.ts";
 5import { getIdFromAction } from "./action.ts";
 6
 7const MS = 1000;
 8const SECONDS = 1 * MS;
 9const MINUTES = 60 * SECONDS;
10
/**
 * Creates a supervisor that runs `op` over and over on a fixed interval.
 *
 * The returned `poller` waits for an action of `actionType`, then races a
 * run-sleep-repeat loop against a wait for the cancel action. Once that race
 * settles it goes back to waiting for the next `actionType` action.
 *
 * @param parentTimer Default delay between runs (5 seconds, going by the
 *   millisecond constants in this file). An action can override it with a
 *   truthy `payload.timer`.
 * @param cancelType Action type that ends the current polling loop. When
 *   omitted or empty, `actionType` itself is used, so dispatching the same
 *   action again acts as the cancel signal.
 * @returns A generator function taking the action type to listen for and the
 *   operation to run with the triggering action.
 */
export function poll(parentTimer: number = 5 * SECONDS, cancelType?: string) {
  return function* poller<T>(
    actionType: string,
    op: (action: AnyAction) => Operation<T>,
  ): Operation<T> {
    const cancel = cancelType || actionType;

    // Runs `op`, sleeps for `timer`, and repeats. It never returns on its
    // own, so it only stops when whatever is running it is halted.
    function* fire(action: { type: string }, timer: number) {
      while (true) {
        yield* op(action);
        yield* sleep(timer);
      }
    }

    while (true) {
      const action = yield* take<{ timer?: number }>(actionType);
      // `||` rather than `??`: a missing *or zero* timer falls back to the
      // default interval.
      const timer = action.payload?.timer || parentTimer;
      yield* race([
        fire(action, timer),
        take(`${cancel}`) as Operation<void>,
      ]);
    }
  };
}
34
/**
 * Identifies which timer(s) to clear: either an id string (the literal `"*"`
 * matches every timer) or an action object whose id is derived with
 * `getIdFromAction`.
 */
type ClearTimerPayload = string | { type: string; payload: { key: string } };

/**
 * Action creator for the `"clear-timers"` action. The `timer()` supervisor
 * listens for it to end a pending cache timer early. The payload may be a
 * single target or a list of targets.
 */
export const clearTimers = createAction<
  ClearTimerPayload | ClearTimerPayload[]
>("clear-timers");
40
/**
 * timer() creates a cache timer for each `key` inside of a starfx api
 * endpoint. `key` is a hash of the action type and payload.
 *
 * Why per key? Take an api endpoint that fetches a single app:
 * `fetchApp({ id: 1 })`. With one timer shared by the whole endpoint, calling
 * `fetchApp({ id: 1 })` and then `fetchApp({ id: 2 })` would suppress the
 * second call, so no http request would be sent for app 2. Keeping a timer
 * per key means each distinct payload is cached independently.
 *
 * While a key has a pending timer, further actions with that key are ignored.
 * The timer ends when the delay elapses or when a matching `clearTimers`
 * action is seen.
 *
 * @param timer How long a key stays cached after `op` has completed (5
 *   minutes by default, going by the millisecond constants in this file).
 * @returns A generator function taking the action type to listen for and the
 *   operation to run for the first action of each key.
 */
export function timer(timer: number = 5 * MINUTES) {
  return function* onTimer(
    actionType: string,
    op: (action: AnyAction) => Callable<unknown>,
  ) {
    // Keys that currently have a pending timer. A Map instead of a plain
    // object so a key that happens to equal an Object.prototype member
    // (e.g. "constructor") is not mistaken for an existing entry.
    const active = new Map<string, Task<unknown>>();

    // Runs `op` for the action, then holds the key until the delay elapses
    // or a matching `clearTimers` action is seen, and finally releases it.
    function* activate(action: ActionWithPayload<CreateActionPayload>) {
      yield* call(() => op(action));
      const idA = getIdFromAction(action);

      // True for a `clearTimers` action that targets this action's id,
      // either directly, via an action object, or via the "*" wildcard.
      const matchFn = (
        act: ActionWithPayload<ClearTimerPayload | ClearTimerPayload[]>,
      ) => {
        if (act.type !== `${clearTimers}`) return false;
        if (!act.payload) return false;
        const ids = Array.isArray(act.payload) ? act.payload : [act.payload];
        return ids.some((id) => {
          if (id === "*") {
            return true;
          }
          if (typeof id === "string") {
            return idA === id;
          } else {
            return idA === getIdFromAction(id);
          }
        });
      };
      yield* race([
        sleep(timer),
        // deno-lint-ignore no-explicit-any
        take(matchFn as any) as Operation<void>,
      ]);

      active.delete(action.payload.key);
    }

    while (true) {
      const action = yield* take<CreateActionPayload>(`${actionType}`);
      const key = action.payload.key;
      // A timer is already pending for this key: drop the action.
      if (active.has(key)) continue;

      const task = yield* spawn(function* () {
        yield* activate(action);
      });
      active.set(key, task);
    }
  };
}