repos / starfx

A supercharged async flow-control library.
git clone https://github.com/neurosnap/starfx.git

Eric Bower · 30 Jul 24

action.ts

  1import {
  2  call,
  3  Callable,
  4  createContext,
  5  createSignal,
  6  each,
  7  Operation,
  8  Signal,
  9  SignalQueueFactory,
 10  spawn,
 11  Stream,
 12} from "./deps.ts";
 13import { ActionPattern, matcher } from "./matcher.ts";
 14import type { Action, ActionWithPayload, AnyAction } from "./types.ts";
 15import { createFilterQueue } from "./queue.ts";
 16import { ActionFnWithPayload } from "./types.ts";
 17
 18export const ActionContext = createContext(
 19  "starfx:action",
 20  createSignal<AnyAction, void>(),
 21);
 22
 23export function useActions(pattern: ActionPattern): Stream<AnyAction, void> {
 24  return {
 25    *subscribe() {
 26      const actions = yield* ActionContext;
 27      const match = matcher(pattern);
 28      yield* SignalQueueFactory.set(() => createFilterQueue(match) as any);
 29      return yield* actions.subscribe();
 30    },
 31  };
 32}
 33
 34export function emit({
 35  signal,
 36  action,
 37}: {
 38  signal: Signal<AnyAction, void>;
 39  action: AnyAction | AnyAction[];
 40}) {
 41  if (Array.isArray(action)) {
 42    if (action.length === 0) {
 43      return;
 44    }
 45    action.map((a) => signal.send(a));
 46  } else {
 47    signal.send(action);
 48  }
 49}
 50
 51export function* put(action: AnyAction | AnyAction[]) {
 52  const signal = yield* ActionContext;
 53  return emit({
 54    signal,
 55    action,
 56  });
 57}
 58
 59export function take<P>(
 60  pattern: ActionPattern,
 61): Operation<ActionWithPayload<P>>;
 62export function* take(pattern: ActionPattern): Operation<Action> {
 63  const fd = useActions(pattern);
 64  for (const action of yield* each(fd)) {
 65    return action;
 66  }
 67
 68  return { type: "take failed, this should not be possible" };
 69}
 70
 71export function* takeEvery<T>(
 72  pattern: ActionPattern,
 73  op: (action: AnyAction) => Operation<T>,
 74) {
 75  const fd = useActions(pattern);
 76  for (const action of yield* each(fd)) {
 77    yield* spawn(() => op(action));
 78    yield* each.next();
 79  }
 80}
 81
 82export function* takeLatest<T>(
 83  pattern: ActionPattern,
 84  op: (action: AnyAction) => Operation<T>,
 85) {
 86  const fd = useActions(pattern);
 87  let lastTask;
 88
 89  for (const action of yield* each(fd)) {
 90    if (lastTask) {
 91      yield* lastTask.halt();
 92    }
 93    lastTask = yield* spawn(() => op(action));
 94    yield* each.next();
 95  }
 96}
 97
 98export function* takeLeading<T>(
 99  pattern: ActionPattern,
100  op: (action: AnyAction) => Operation<T>,
101) {
102  while (true) {
103    const action = yield* take(pattern);
104    yield* call(() => op(action));
105  }
106}
107
/**
 * Operation that completes once `predicate` evaluates to a truthy value.
 *
 * The predicate is evaluated once up front; if it is not yet truthy, it is
 * re-evaluated after every action (`take("*")`) until it is.
 *
 * @param predicate - Callable evaluated through `call`; its result decides
 *   whether waiting is over.
 */
export function* waitFor(
  predicate: Callable<boolean>,
) {
  const init = yield* call(predicate as any);
  if (init) {
    return;
  }

  while (true) {
    yield* take("*");
    // Evaluate the predicate exactly as the initial check does. Wrapping it
    // in `() => predicate` would hand back the predicate itself without
    // invoking it, and a function predicate is always truthy.
    const result = yield* call(predicate as any);
    if (result) {
      return;
    }
  }
}
124
/**
 * Derives an id string from an action or an action creator.
 *
 * @param action - Either a function, whose `toString()` result is used, or
 *   an action object, whose `payload.key` is used.
 * @returns The id string.
 */
export function getIdFromAction(
  action: ActionWithPayload<{ key: string }> | ActionFnWithPayload,
): string {
  if (typeof action === "function") {
    return action.toString();
  }
  return action.payload.key;
}
130
/**
 * Exported prefix constant, currently the empty string. It is not referenced
 * elsewhere in this file.
 * NOTE(review): presumably applied to API action types by other modules; confirm.
 */
export const API_ACTION_PREFIX = "";
132
/**
 * Creates an action creator for `actionType`.
 *
 * The returned function builds `{ type: actionType, payload }` from its
 * optional argument, and its `toString()` returns `actionType`.
 *
 * @param actionType - Non-empty action type string.
 * @returns The action creator function.
 * @throws Error when `actionType` is falsy (e.g. the empty string).
 */
export function createAction(actionType: string): () => Action;
export function createAction<P>(
  actionType: string,
): (p: P) => ActionWithPayload<P>;
export function createAction(actionType: string) {
  if (!actionType) {
    throw new Error("createAction requires non-empty string");
  }

  const fn = (payload?: unknown) => {
    return { type: actionType, payload };
  };
  // Stringifying the creator yields its action type.
  fn.toString = () => actionType;

  return fn;
}