Skip to main content

Substantial

Substantial runtime

The Substantial runtime enables the execution of durable workflows in one or across multiple typegates.

Why use it?

  • Long-running "processes": Durable tasks that need to run over extended periods (days, weeks or months), handling retries and restarts seamlessly.
  • Fault-tolerant execution: Ensure reliable execution of tasks, even upon failures, by maintaining a durable state of the latest run.
  • Task orchestration: Coordinate complex sequences of workflows (analogous to microservice interactions).

For example, the workflow below will continue running until a confirmation event is sent to the associated run.

export async function sendEmail(ctx: Context) {
// 1. A workflow can receive parameters whose type is defined on the typegraph
const { to } = ctx.kwargs;

// 2. When a function call produces effects, we can make it durable
const info = await ctx.save(() => sendSubscriptionEmail(to));
const timeSent = await ctx.save(() => new Date().toJSON());

const confirmation = ctx.receive<boolean>("confirmation");
if (!confirmation) {
throw new Error(`${to} has denied the subscription sent at ${timeSent}`);
}

return `${to} confirmed (${info})`;
}

Additionally, if we were to shut down the Typegate node executing it and then restart it, the state will be preserved. This means that if the subscription email was already sent, upon relaunch, it will not be sent again, same thing for the value of timeSent.

Key Concepts

Backend

This abstraction implements a set of atomic operations that allows Typegate to persist and recover the workflow state. Currently, we have the Redis backend available, along with others like fs and memory, which are primarily intended for development or testing purposes.

Workflows

A special type of function with durable state and an execution mechanism directly tied to time. A workflow can also trigger other workflows (child workflows).

Persistence and Lifecycle

  • Context

The context object contains the workflow input (namely kwargs as seen in the example above), but it can also be thought as a namespace that contains all of the core functions used for durableness.

It is recreated at every replay.

  • Interrupts

A special state of the program that is produced by any function that can trigger a workflow replay.

An interrupt will pause the program at the line it was emitted then queue it back to execute later.

One simple example of such function is when you want to wait for a given amount of time, Substantial will save the current time and the end time, interrupts the workflow then requeue it to execute later.

Any agent (Typegate node) that picks the workflow, will replay it, the cycle repeats until the actual current time is greater or equal to the end time.

await ctx.sleep(24 * 3600 * 1000); // 1 day
  • Save

A save is one of the main building blocks of a workflow, many functions available on the context object rely on it.

This is mainly because a save call converts any function into a durable one: the function output is saved and persisted in the backend. On subsequent executions, the saved value is retrieved from the backend instead of re-executing the function.

This ensures that when a workflow is resumed (after a Typegate reboot for example) or replayed (after interrupts), the saved function will not be executed again.

// For example, if the output was 7 then after replay,
// save will not execute the function inside but directly return already persisted value, which was 7.
const rand = await ctx.save(() => Math.floor(10 * Math.random()));

// If you keep in mind that the workflow can be replayed many times
// A save call should make more sense!
const now = await ctx.save(() => Date.now());

// And even more for functions that can produce external side-effects
const result = await ctx.save(() => sendEmail());
Notes
  • Only JSON-compliant values can be persisted. The execution will throw otherwise.
  • Make sure to not rely on changing outside references inside a save call, best is to always expect a replay.
let value = 5;
const afterSave = await save(() => {
value *= 2;
return save; // 10 will be stored on the Backend
});

console.log(value); // 10 on the first replay, 5 on the next replay (save call function was skipped)
console.log(afterSave); // always 10

// Ideally, what you want is to reuse the saved value if the effect was desired
// especially when you branch
if (afterSave == 10) {
console.log("All good"); // branch path is now durable even after replays!
} else {
throw new Error("Unreachable code");
}
  • Send/Receive

You can send events to a workflow through GraphQL, any receive call on the workflow will await for it and will interrupt the workflow if it hasn't been received yet.

g.expose({
// ..
send: sub.send(t.integer()),
});
# Client
query {
send(run_id: "<workflow_run_id>", event: { name: "myEvent", payload: 1234 })
}
// Workflow
const value = ctx.receive<number>("myEvent"); // 1234
  • Ensure

It's a function that takes a predicate, and will interrupt the workflow so that it will be replayed later if the returned value is false.

const secret = ctx.receive<string>("secret");
await ctx.ensure(() => secret == "top_secret");
//
// continue execution

Run

When a workflow is started, a run is created and Substantial will provide you a run_id to uniquely identify it.

You can send an event or abort an ongoing run from its run_id.

Advanced Filters

In practice, you will have many workflows that have been executed, each run can be in different states. Substantial provides a way to filter the runs.

g.expose({
// ..
search: sub.advancedFilters(),
});
# Client
query {
search(
name: "sendEmail"
filter: {
and: [
{ status: { contains: "\"COMPLETED\"" } }
{ not: { started_at: { lt: "\"2025-01-15T00:00:00Z\"" } } }
{ not: { eq: "null" } }
]
}
) {
run_id
started_at
ended_at
status
value
}
}
  • Specification and syntax

The specification itself is very close to Prisma queries. You can also refer to your GraphQL playground for guiding you into expressing your query.

Base specification:

val           ::=  json_string

term ::= { eq: val }
| { lt: val } | { lte: val }
| { gt: val } | { gte: val }
| { in: val } | { contains: val }

special_term ::= { started_at: term }
| { ended_at: term }
| { status: term }

not ::= { not: expr }
or ::= { or: [expr] }
and ::= { and: [expr] }

expr ::= not | or | and | term | special_term
Notes
  • contains: Check if the workflow output is a list that contains the given value or if the given value is a substring of it.
  • in: Check if the workflow output is within a list or is a substring of the given value.
  • status
    • Can be one of "COMPLETED", "COMPLETED_WITH_ERROR", "ONGOING" or "UNKNOWN".

For example, the term

{ status: { contains: "\"COMPLETED\"" } }

..should cover "COMPLETED" and "COMPLETED_WITH_ERROR".

Child workflows

Child workflows are like any other workflows, they are just run by another workflow (parent).

If a workflow parent is explicitly stopped or aborted, all of its descendants will also be aborted.

For example, suppose you want to write a workflow that sends a subscription request to a list of emails and then receive a notification for each confirmation or denial, but only during your work hours.

You can easily translate that logic as if you were writing generic sequential code using Substantial workflows.

import {
nextTimeWhenAdminIsAvailable,
sendSubscriptionEmail,
notifyAdmin,
} from "./utils.ts";

export async function sendEmail(ctx: Context) {
// 1. A workflow can receive parameters whose type is defined on the typegraph
const { to } = ctx.kwargs;

// 2. When a function call produces effects, we can make it durable
const info = await ctx.save(() => sendSubscriptionEmail(to));
const timeSent = await ctx.save(() => new Date());

const confirmation = ctx.receive<boolean>("confirmation");
if (!confirmation) {
throw new Error(`${to} has denied the subscription sent at ${timeSent}`);
}

// 3. In this scenario, we use a durable sleep to wait until the admin
// is available
const duration = await ctx.save(() =>
nextTimeWhenAdminIsAvailable(new Date()),
);
ctx.sleep(duration);

const _ = await ctx.save(() => notifyAdmin(info), {
retry: {
minBackoffMs: 1000,
maxBackoffMs: 5000,
maxRetries: 4,
},
});

return `${to} confirmed`;
}

export async function sendMultipleEmails(ctx: Context) {
const { emails } = ctx.kwargs;

// 1. Persist the state of the child workflows
const handlersDef = await ctx.save(async () => {
const handlersDef = [];
for (const email of emails) {
const handleDef = await ctx.startChildWorkflow(sendEmail, {
to: email,
});
handlersDef.push(handleDef);
}

return handlersDef;
});

// 2. Create handles for your child workflows
const handles = handlersDef.map((def) => ctx.createWorkflowHandle(def));

// 3. In this example, we wait on all child workflows to complete
await ctx.ensure(async () => {
for (const handle of handles) {
if (!(await handle.hasStopped())) {
return false;
}
}
return true;
});

const ret = await ctx.save(async () => {
const ret = [];
for (const handle of handles) {
const childResult = await handle.result<string>();
ret.push(childResult);
}

return ret;
});

return ret;
}

In your typegraph, you will have:

import { Policy, t, typegraph } from "@typegraph/sdk/index.ts";
import {
SubstantialRuntime,
Backend,
WorkflowFile,
} from "@typegraph/sdk/runtimes/substantial.ts";

typegraph(
{
name: "substantial-example",
},
(g) => {
const pub = Policy.public();
const backend = Backend.redis("REDIS_SECRET");
const file = WorkflowFile.deno("my_workflow.ts", [])
.import(["sendEmail", "sendMultipleEmails"])
.build();

const sub = new SubstantialRuntime(backend, [file]);

g.expose(
{
stop: sub.stop(),
send_multiple_emails: sub
.start(t.struct({ emails: t.list(t.email()) }))
.reduce({ name: "sendMultipleEmails" }),
send_single_email: sub
.start(t.struct({ to: t.email() }))
.reduce({ name: "sendEmail" }),
results_raw: sub.queryResultsRaw(),
workers: sub.queryResources(),
...sub.internals(), // Required for child workflows
},
pub,
);
},
);