supercharged async flow control library.
git clone https://github.com/neurosnap/starfx.git

Eric Bower · 27 Aug 24

supervisors.md

---
title: Supervisors
description: Learn how supervisor tasks work
---

A supervisor task monitors child tasks and manages their health. Structuring
side-effects and business logic around supervisor tasks leads to patterns that
make code easier to read and maintain.

[Supplemental reading from Erlang](https://www.erlang.org/doc/design_principles/des_princ)

The most basic version of a supervisor is simply an infinite loop that calls a
child task:

```ts
import { call, sleep } from "starfx";

function* supervisor() {
  while (true) {
    try {
      // run the child; control only comes back when it returns or throws
      yield* call(someTask);
    } catch (err) {
      // log the failure, then loop around to restart the child
      console.error(err);
    }
  }
}

function* someTask() {
  yield* sleep(10 * 1000);
  throw new Error("boom!");
}
```

Here we `call` some task that should always be in a running and healthy state.
If it raises an exception, we log it and try to run the task again.
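To make the restart behaviour concrete without depending on `starfx`, here is a minimal sketch in plain TypeScript. It assumes a synchronous child and a hypothetical `maxRestarts` cap (neither `supervise` nor `maxRestarts` is part of the `starfx` API) so that the loop terminates and its result can be inspected:

```ts
// Hypothetical helper, not part of starfx: restart a failing child a bounded
// number of times and record every failure that was observed.
type Child<T> = () => T;

interface SuperviseResult<T> {
  value?: T; // set when the child eventually returns
  errors: string[]; // messages of every caught failure, in order
}

function supervise<T>(child: Child<T>, maxRestarts: number): SuperviseResult<T> {
  const errors: string[] = [];
  // one initial attempt plus up to `maxRestarts` restarts
  for (let attempt = 0; attempt <= maxRestarts; attempt++) {
    try {
      return { value: child(), errors };
    } catch (err) {
      // same idea as the `catch` in the supervisor loop: note it and retry
      errors.push(err instanceof Error ? err.message : String(err));
    }
  }
  return { errors };
}

// A child that fails twice before succeeding.
let calls = 0;
function flakyTask(): string {
  calls += 1;
  if (calls < 3) throw new Error(`boom #${calls}`);
  return "healthy";
}

const result = supervise(flakyTask, 5);
console.log(result); // two recorded failures, then the successful value
```

A long-running supervisor would normally loop forever and often waits between restarts; the cap here only exists to keep the example finite.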

Building on top of that simple supervisor, we can have tasks that always listen
for events and, if they fail, restart them.

```ts
import { parallel, put, run, take } from "starfx";

function* watchFetch() {
  while (true) {
    // suspend until the next FETCH_USERS action is emitted
    const action = yield* take("FETCH_USERS");
    console.log(action);
  }
}

function* send() {
  yield* put({ type: "FETCH_USERS" });
  yield* put({ type: "FETCH_USERS" });
  yield* put({ type: "FETCH_USERS" });
}

await run(
  parallel([watchFetch, send]),
);
```

Here we create a supervisor function using the `take` helper to call a function
for every `FETCH_USERS` event emitted.

While inside a `while` loop, you get full access to its powerful flow control.
As another example, let's say we only want to respond to a login action when the
user isn't logged in and, conversely, only listen for a logout action when the
user is logged in:

```ts
import { take } from "starfx";

function* watchAuth() {
  while (true) {
    const login = yield* take("LOGIN");
    // e.g. fetch token with creds inside `login.payload`
    const logout = yield* take("LOGOUT");
    // e.g. destroy token from `logout.payload`
  }
}
```

Interesting, we've essentially created a finite state machine within a
while-loop!
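The state-machine reading can be checked with a small sketch in plain TypeScript. This is not how `starfx` implements `take`; `authFlow` and `createDriver` are hypothetical names, and the driver simply stands in for the action channel by forwarding an action only when the flow is waiting for it:

```ts
// Plain-TypeScript sketch, not starfx: each `yield` names the single action
// type the flow will accept next, which is what makes it a state machine.
type ActionType = "LOGIN" | "LOGOUT";

function* authFlow(log: string[]): Generator<ActionType, never, ActionType> {
  while (true) {
    yield "LOGIN"; // state: logged out, waiting for LOGIN
    log.push("logged in");
    yield "LOGOUT"; // state: logged in, waiting for LOGOUT
    log.push("logged out");
  }
}

// Hypothetical driver: forwards an action only when it matches what the flow
// is currently waiting for, and reports whether it was accepted.
function createDriver(log: string[]) {
  const flow = authFlow(log);
  let waitingFor = flow.next().value; // run to the first `yield`
  return (action: ActionType): boolean => {
    if (action !== waitingFor) return false; // ignored in the current state
    waitingFor = flow.next(action).value; // advance to the next state
    return true;
  };
}

const log: string[] = [];
const dispatch = createDriver(log);
const accepted = [
  dispatch("LOGOUT"), // ignored: nobody is logged in yet
  dispatch("LOGIN"), // accepted
  dispatch("LOGIN"), // ignored: already logged in
  dispatch("LOGOUT"), // accepted
];
console.log(accepted, log);
```

The position of the generator inside the loop is the current state, so no separate state variable is needed.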

We also built a helper that will abstract the while loop if you don't need it:

```ts
import { takeEvery } from "starfx";

function* watchFetch() {
  yield* takeEvery("FETCH_USERS", function* (action) {
    console.log(action);
  });
}
```

However, this means that we are going to make the same request 3 times. We
probably want a throttle or debounce so we only make a fetch request once within
some interval.

```ts
import { takeLeading } from "starfx";

function* watchFetch() {
  yield* takeLeading("FETCH_USERS", function* (action) {
    console.log(action);
  });
}
```

That's better: now only one task can be alive at a time.

Both thunks and endpoints simply listen for
[actions](/thunks#anatomy-of-an-action) being emitted onto a channel -- which is
just an event emitter -- and then call the middleware stack with that action.
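As a rough mental model of "listen on a channel, then call the middleware stack", here is a sketch in plain TypeScript. It is an illustration only and says nothing about `starfx` internals; `Action`, `Channel`, and `compose` are hypothetical names:

```ts
// Plain-TypeScript sketch, not starfx internals.
interface Action {
  type: string;
  payload?: unknown;
}

type Listener = (action: Action) => void;
type Middleware = (action: Action, next: () => void) => void;

// The "channel": a bare-bones event emitter keyed by action type.
class Channel {
  private listeners = new Map<string, Listener[]>();

  on(type: string, listener: Listener): void {
    const existing = this.listeners.get(type) ?? [];
    this.listeners.set(type, [...existing, listener]);
  }

  emit(action: Action): void {
    for (const listener of this.listeners.get(action.type) ?? []) {
      listener(action);
    }
  }
}

// Chain middleware so each one decides when to hand off to the next.
function compose(stack: Middleware[]): Listener {
  return (action) => {
    const step = (index: number): void => {
      if (index >= stack.length) return;
      stack[index](action, () => step(index + 1));
    };
    step(0);
  };
}

const trace: string[] = [];
const channel = new Channel();
channel.on(
  "FETCH_USERS",
  compose([
    (action, next) => {
      trace.push(`before ${action.type}`);
      next();
      trace.push(`after ${action.type}`);
    },
    (_action, next) => {
      trace.push("request");
      next();
    },
  ]),
);

channel.emit({ type: "FETCH_USERS" });
channel.emit({ type: "OTHER" }); // no listener registered, so nothing runs
console.log(trace);
```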

Both thunks and endpoints support overriding the default `takeEvery` supervisor
with either one of our officially supported supervisors, `takeLatest` and
`takeLeading`, or a user-defined supervisor.

Because every thunk and endpoint has its own supervisor task monitoring the
health of its children, we allow the end-developer to change the default
supervisor -- which is `takeEvery`:

```ts
import { takeLatest } from "starfx";

// assumes `thunks` and `dispatch` were created elsewhere
const someAction = thunks.create("some-action", { supervisor: takeLatest });
dispatch(someAction()); // this task gets cancelled
dispatch(someAction()); // this task gets cancelled
dispatch(someAction()); // this task lives
```

This is the power of supervisors and is fundamental to how `starfx` works.
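To compare the three policies side by side, here is a simplified model in plain TypeScript. It is a sketch under heavy assumptions, not the `starfx` implementation: tasks are generators that pause once before finishing, `createSupervisor` is a hypothetical helper, and "time" only advances when `tick()` is called:

```ts
// Plain-TypeScript sketch, not starfx: three dispatches arrive before any
// task has had a chance to finish.
type Policy = "every" | "latest" | "leading";

function createSupervisor(policy: Policy, finished: number[]) {
  let live: { id: number; task: Generator<void, void> }[] = [];
  let nextId = 1;

  function* work(id: number): Generator<void, void> {
    yield; // pretend to wait on a request
    finished.push(id);
  }

  return {
    dispatch(): void {
      if (policy === "leading" && live.length > 0) return; // drop while busy
      if (policy === "latest") {
        // cancel whatever is still running before starting the new task
        for (const entry of live) entry.task.return(undefined);
        live = [];
      }
      const id = nextId++;
      const task = work(id);
      task.next(); // start the task; it pauses at its `yield`
      live.push({ id, task });
    },
    tick(): void {
      // advance every live task one step and forget the ones that finished
      live = live.filter((entry) => !entry.task.next().done);
    },
  };
}

function simulate(policy: Policy): number[] {
  const finished: number[] = [];
  const supervisor = createSupervisor(policy, finished);
  supervisor.dispatch();
  supervisor.dispatch();
  supervisor.dispatch();
  supervisor.tick();
  return finished;
}

console.log(simulate("every")); // [1, 2, 3]
console.log(simulate("latest")); // [3]
console.log(simulate("leading")); // [1]
```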

# poll

When activated, call a thunk or endpoint once every N milliseconds indefinitely
until cancelled.

```ts
import { poll } from "starfx";

const fetchUsers = api.get("/users", { supervisor: poll() });
store.dispatch(fetchUsers());
// fetch users
// sleep 5000
// fetch users
// sleep 5000
// fetch users
store.dispatch(fetchUsers());
// cancelled
```

The default interval for `poll()` is **5 seconds**.

You can optionally provide a cancel action instead of dispatching the thunk a
second time:

```ts
import { createAction, poll } from "starfx";

const cancelPoll = createAction("cancel-poll");
const fetchUsers = api.get("/users", {
  supervisor: poll(5 * 1000, `${cancelPoll}`),
});
store.dispatch(fetchUsers());
// fetch users
// sleep 5000
// fetch users
// sleep 5000
// fetch users
store.dispatch(cancelPoll());
// cancelled
```
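The call/sleep rhythm in the comments above can be reproduced with a small sketch in plain TypeScript. Nothing here sleeps or performs a request, and it is not how `starfx` implements `poll`; `pollSteps`, `runPoll`, and the `cancelAfterCalls` parameter are hypothetical and only describe the sequence of steps:

```ts
// Plain-TypeScript sketch, not starfx: the generator describes what a poller
// would do, and the consumer decides when to cancel it.
type Step = { kind: "call" } | { kind: "sleep"; ms: number };

function* pollSteps(intervalMs = 5 * 1000): Generator<Step, void> {
  while (true) {
    yield { kind: "call" }; // e.g. fetch users
    yield { kind: "sleep", ms: intervalMs }; // wait before the next call
  }
}

// Pull steps until the requested number of calls has happened, which stands
// in for a cancel action arriving.
function runPoll(intervalMs: number, cancelAfterCalls: number): string[] {
  const seen: string[] = [];
  let callCount = 0;
  for (const step of pollSteps(intervalMs)) {
    if (step.kind === "call") {
      callCount += 1;
      seen.push("call");
      if (callCount === cancelAfterCalls) break; // `break` ends the generator
    } else {
      seen.push(`sleep ${step.ms}`);
    }
  }
  seen.push("cancelled");
  return seen;
}

console.log(runPoll(5000, 3));
// ["call", "sleep 5000", "call", "sleep 5000", "call", "cancelled"]
```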

# timer

Only call a thunk or endpoint at most once every N milliseconds.

```ts
import { timer } from "starfx";

const fetchUsers = api.get("/users", { supervisor: timer(1000) });
store.dispatch(fetchUsers());
store.dispatch(fetchUsers());
// sleep(100);
store.dispatch(fetchUsers());
// sleep(1000);
store.dispatch(fetchUsers());
// called: 2 times
```

The default value provided to `timer()` is **5 minutes**. This means an endpoint
using `timer()` with no arguments runs at most once every **5 minutes**.

## clearTimers

Want to clear a timer and refetch?

```ts
import { clearTimers, timer } from "starfx";

const fetchUsers = api.get("/users", { supervisor: timer(1000) });
store.dispatch(fetchUsers());
store.dispatch(clearTimers(fetchUsers()));
store.dispatch(fetchUsers());
// called: 2 times
store.dispatch(clearTimers("*")); // clear all timers
```
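The "at most once per window" rule and the effect of clearing a timer can be modelled with a small sketch in plain TypeScript. This is not the `starfx` implementation: `createTimerGate` is a hypothetical helper, keys are plain strings, and the clock is injected so the example does not depend on real time:

```ts
// Plain-TypeScript sketch, not starfx.
function createTimerGate(windowMs: number, now: () => number) {
  const expiresAt = new Map<string, number>(); // key -> end of its window

  return {
    // Returns true when the call is allowed to run.
    tryCall(key: string): boolean {
      const expiry = expiresAt.get(key);
      if (expiry !== undefined && now() < expiry) return false; // cooling down
      expiresAt.set(key, now() + windowMs);
      return true;
    },
    // Forget one key, or every key when passed "*".
    clear(key: string): void {
      if (key === "*") expiresAt.clear();
      else expiresAt.delete(key);
    },
  };
}

let clock = 0; // fake time in milliseconds
const gate = createTimerGate(1000, () => clock);

const outcomes: boolean[] = [];
outcomes.push(gate.tryCall("/users")); // runs
outcomes.push(gate.tryCall("/users")); // skipped
clock += 100;
outcomes.push(gate.tryCall("/users")); // skipped
clock += 1000;
outcomes.push(gate.tryCall("/users")); // runs: the window has passed

gate.clear("/users");
const afterClear = gate.tryCall("/users"); // runs again right away
console.log(outcomes, afterClear);
```

The first four calls mirror the `timer(1000)` example, where only two of the four dispatches result in a request.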