Substantial runtime

The Substantial runtime enables the execution of durable workflows on a single typegate or across multiple typegates.

Why use it?

  • Long-running "processes": Durable tasks that need to run over extended periods (days, weeks or months), handling retries and restarts seamlessly.
  • Fault-tolerant execution: Ensure reliable execution of tasks, even upon failures, by maintaining a durable state of the latest run.
  • Task orchestration: Coordinate complex sequences of workflows (analogous to microservice interactions).

For example, the workflow below will continue running until a confirmation event is sent to the associated run.

export async function sendEmail(ctx: Context) {
  // 1. A workflow can receive parameters whose type is defined on the typegraph
  const { to } = ctx.kwargs;

  // 2. When a function call produces effects, we can make it durable
  const info = await ctx.save(() => sendSubscriptionEmail(to));
  const timeSent = await ctx.save(() => new Date().toJSON());

  const confirmation = ctx.receive<boolean>("confirmation");
  if (!confirmation) {
    throw new Error(`${to} has denied the subscription sent at ${timeSent}`);
  }

  return `${to} confirmed (${info})`;
}

Additionally, if we were to shut down the Typegate node executing it and then restart it, the state would be preserved. This means that if the subscription email was already sent, it will not be sent again after the relaunch; the same goes for the value of timeSent.

Key Concepts

Backend

This abstraction implements a set of atomic operations that allow Typegate to persist and recover workflow state. Currently, the Redis backend is available, along with fs and memory backends, which are primarily intended for development or testing purposes.
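
On the typegraph side, the backend is instantiated when creating the runtime. Below is a minimal sketch: Backend.redis appears in the full example further down, while the fs and memory constructor names are assumptions to be checked against your SDK version.

import { Backend } from "@typegraph/sdk/runtimes/substantial.ts";

// Production: persist run state in Redis. "REDIS_SECRET" is the name of the
// typegraph secret holding the connection string (see the typegraph example below).
const backend = Backend.redis("REDIS_SECRET");

// Development/testing only (constructor names assumed, check your SDK version):
// const devBackend = Backend.fs();     // persists runs on the local filesystem
// const devBackend = Backend.memory(); // in-memory, state is lost on restart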

Workflows

A special type of function with durable state and an execution mechanism directly tied to time. A workflow can also trigger other workflows (child workflows).

Persistence and Lifecycle

  • Context

The context object contains the workflow input (namely kwargs, as seen in the example above), but it can also be thought of as a namespace that contains all of the core functions used for durability.

It is recreated at every replay.
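
As a quick overview before the next sections, the sketch below gathers the main context functions used throughout this page; Context and sendMail are placeholders, as in the other examples on this page.

// A hypothetical workflow showing the core pieces of `ctx` in one place.
export async function overview(ctx: Context) {
  const { to } = ctx.kwargs;                       // workflow input, typed on the typegraph
  const info = await ctx.save(() => sendMail(to)); // make an effectful call durable
  ctx.sleep(60 * 1000);                            // durable wait (interrupt + replay)
  const ok = ctx.receive<boolean>("confirmation"); // block until an event is received
  await ctx.ensure(() => ok);                      // replay until the predicate holds
  ctx.logger.info("confirmed", info);              // event log attached to the run
  return info;
}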

  • Interrupts

A special state of the program produced by any function that can trigger a workflow replay.

An interrupt pauses the program at the line where it was emitted, then queues the run to be executed again later.

One simple example of such a function is a durable sleep: when you want to wait for a given amount of time, Substantial saves the current time and the end time, interrupts the workflow, then requeues it to execute later.

Any agent (Typegate node) that picks up the workflow will replay it; the cycle repeats until the current time is greater than or equal to the end time.

ctx.sleep(24 * 3600 * 1000); // 1 day
  • Save

A save is one of the main building blocks of a workflow; many functions available on the context object rely on it.

This is mainly because a save call converts any function into a durable one: the function output is saved and persisted in the backend. On subsequent executions, the saved value is retrieved from the backend instead of re-executing the function.

This ensures that when a workflow is resumed (after a Typegate reboot for example) or replayed (after interrupts), the saved function will not be executed again.

// For example, if the output was 7, then after a replay
// save will not execute the function inside but will directly return the already persisted value, 7.
const rand = await ctx.save(() => Math.floor(10 * Math.random()));

// If you keep in mind that the workflow can be replayed many times,
// a save call should make more sense!
const now = await ctx.save(() => Date.now());

// And even more so for functions that can produce external side effects
const result = await ctx.save(() => sendEmail());
Notes
  • Only JSON-serializable values can be persisted. The execution will throw otherwise.
  • Make sure not to rely on mutating outside references inside a save call; it is best to always expect a replay.
let value = 5;
const afterSave = await ctx.save(() => {
  value *= 2;
  return value; // 10 will be stored on the backend
});

console.log(value); // 10 on the first run, 5 on subsequent replays (the function inside save is skipped)
console.log(afterSave); // always 10

// Ideally, what you want is to reuse the saved value if the effect was desired
// especially when you branch
if (afterSave == 10) {
  console.log("All good"); // branch path is now durable even after replays!
} else {
  throw new Error("Unreachable code");
}
  • Send/Receive

You can send events to a workflow through GraphQL. Any receive call in the workflow will wait for the event and will interrupt the workflow if it has not been received yet.

g.expose({
  // ..
  send: sub.send(t.integer()),
});
# Client
query {
  send(run_id: "<workflow_run_id>", event: { name: "myEvent", payload: 1234 })
}
// Workflow
const value = ctx.receive<number>("myEvent"); // 1234
  • Ensure

A function that takes a predicate and interrupts the workflow so that it will be replayed later if the returned value is false.

const secret = ctx.receive<string>("secret");
await ctx.ensure(() => secret == "top_secret");

// continue execution

Run

When a workflow is started, a run is created and Substantial will provide you with a run_id to uniquely identify it.

You can send an event to an ongoing run or abort it using its run_id.
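
For example, with the exposures from the typegraph shown further down (send_single_email and stop), a client interaction could look like the sketch below; the exact field and argument shapes are assumptions here, so check the schema you actually expose.

# Client
query {
  # starting a run returns its run_id
  send_single_email(kwargs: { to: "user@example.com" })
}

query {
  # abort an ongoing run from its run_id
  stop(run_id: "<workflow_run_id>")
}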

Debugging your workflow

Logging

Under the context object, there is a logger namespace that you can use to emit event logs.

ctx.logger.info("Will start a sleep");
const startTime = await ctx.save(() => Date.now());

ctx.logger.warn("Started at", new Date());
ctx.sleep(5000);
ctx.logger.warn("Ended at", new Date());

const diff = await ctx.save(() => Date.now() - startTime);
if (diff < 0) {
  // Always make sure that branches are deterministic across replays
  ctx.logger.error("Time went backwards");
  throw new Error("Time travel");
}
Notes
  • Logger arguments must be JSON-serializable.
  • Similarly to save calls, any path leading to a logger function call must be deterministic.
// BAD
if (Math.random() < 0.5) {
  // Path is not deterministic
  ctx.logger.info("Lucky!");
}

// GOOD
// The solution is to save any non-deterministic value first
const isLucky = await ctx.save(() => Math.random() < 0.5);
if (isLucky) {
  ctx.logger.info("Lucky!");
}

Debugging

Substantial comes with a few utility functions that you can expose on your typegraph.

You can check the state of your workflows in real time.

query AwesomeStatusDebugger($workflow: String!) {
  results_raw(name: $workflow) {
    ongoing {
      count
      runs {
        run_id
        logs {
          timestamp # Time at which the log event was emitted
          level # One of "Info", "Warn" or "Error"
          value # A JSON string of an array of JSON objects
        }
      }
    }

    completed {
      count
      runs {
        run_id
        result {
          status # One of "COMPLETED", "COMPLETED_WITH_ERROR"
          value # Value returned or error thrown by the workflow
        }
        logs {
          timestamp
          level
          value
        }
      }
    }
  }
}

This endpoint can be exposed with queryResultsRaw or queryResults (if you want to enforce the return type shape).

g.expose({
  // ..
  results: sub.queryResults(
    // if you want to enforce the return type
    t.either([t.integer(), t.string()]).rename("ResultOrError"),
  ),
  results_raw: sub.queryResultsRaw(), // if you do not care about the return type
});

Advanced Filters

In practice, you will have many executed workflows, and each run can be in a different state. Substantial provides a way to filter the runs.

g.expose({
  // ..
  search: sub.advancedFilters(),
});
# Client
query {
  search(
    name: "sendEmail"
    filter: {
      and: [
        { status: { contains: "\"COMPLETED\"" } }
        { not: { started_at: { lt: "\"2025-01-15T00:00:00Z\"" } } }
        { not: { eq: "null" } }
      ]
    }
  ) {
    run_id
    started_at
    ended_at
    status
    value
  }
}
  • Specification and syntax

The specification itself is very close to Prisma queries. You can also refer to your GraphQL playground to guide you in expressing your query.

Base specification:

val          ::= json_string

term         ::= { eq: val }
               | { lt: val } | { lte: val }
               | { gt: val } | { gte: val }
               | { in: val } | { contains: val }

special_term ::= { started_at: term }
               | { ended_at: term }
               | { status: term }

not          ::= { not: expr }
or           ::= { or: [expr] }
and          ::= { and: [expr] }

expr         ::= not | or | and | term | special_term
Notes
  • contains: Check if the workflow output is a list that contains the given value or if the given value is a substring of it.
  • in: Check if the workflow output is within a list or is a substring of the given value.
  • status
    • Can be one of "COMPLETED", "COMPLETED_WITH_ERROR", "ONGOING" or "UNKNOWN".

For example, the term

{ status: { contains: "\"COMPLETED\"" } }

..should cover "COMPLETED" and "COMPLETED_WITH_ERROR".
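
Similarly, following the reading of in above, a term such as

{ in: "[1, 2, 3]" }

..should match runs whose output value is one of 1, 2 or 3.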

Child workflows

Child workflows are like any other workflows; they are just run by another workflow (the parent).

If a parent workflow is explicitly stopped or aborted, all of its descendants will also be aborted.

For example, suppose you want to write a workflow that sends a subscription request to a list of emails and then receive a notification for each confirmation or denial, but only during your work hours.

You can easily translate that logic as if you were writing generic sequential code using Substantial workflows.

import {
  nextTimeWhenAdminIsAvailable,
  sendSubscriptionEmail,
  notifyAdmin,
} from "./utils.ts";

export async function sendEmail(ctx: Context) {
  // 1. A workflow can receive parameters whose type is defined on the typegraph
  const { to } = ctx.kwargs;

  // 2. When a function call produces effects, we can make it durable
  const info = await ctx.save(() => sendSubscriptionEmail(to));
  const timeSent = await ctx.save(() => new Date());

  const confirmation = ctx.receive<boolean>("confirmation");
  if (!confirmation) {
    throw new Error(`${to} has denied the subscription sent at ${timeSent}`);
  }

  // 3. In this scenario, we use a durable sleep to wait until the admin
  // is available
  const duration = await ctx.save(() =>
    nextTimeWhenAdminIsAvailable(new Date()),
  );
  ctx.sleep(duration);

  const _ = await ctx.save(() => notifyAdmin(info), {
    retry: {
      minBackoffMs: 1000,
      maxBackoffMs: 5000,
      maxRetries: 4,
    },
  });

  return `${to} confirmed`;
}

export async function sendMultipleEmails(ctx: Context) {
  const { emails } = ctx.kwargs;

  // 1. Persist the state of the child workflows
  const handlersDef = await ctx.save(async () => {
    const handlersDef = [];
    for (const email of emails) {
      const handleDef = await ctx.startChildWorkflow(sendEmail, {
        to: email,
      });
      handlersDef.push(handleDef);
    }

    return handlersDef;
  });

  // 2. Create handles for your child workflows
  const handles = handlersDef.map((def) => ctx.createWorkflowHandle(def));

  // 3. In this example, we wait on all child workflows to complete
  await ctx.ensure(async () => {
    for (const handle of handles) {
      if (!(await handle.hasStopped())) {
        return false;
      }
    }
    return true;
  });

  const ret = await ctx.save(async () => {
    const ret = [];
    for (const handle of handles) {
      const childResult = await handle.result<string>();
      ret.push(childResult);
    }

    return ret;
  });

  return ret;
}

In your typegraph, you will have:

import { Policy, t, typegraph } from "@typegraph/sdk/index.ts";
import {
  SubstantialRuntime,
  Backend,
  WorkflowFile,
} from "@typegraph/sdk/runtimes/substantial.ts";

typegraph(
  {
    name: "substantial-example",
  },
  (g) => {
    const pub = Policy.public();
    const backend = Backend.redis("REDIS_SECRET");
    const file = WorkflowFile.deno("my_workflow.ts", [])
      .import(["sendEmail", "sendMultipleEmails"])
      .build();

    const sub = new SubstantialRuntime(backend, [file]);

    g.expose(
      {
        stop: sub.stop(),
        send_multiple_emails: sub
          .start(t.struct({ emails: t.list(t.email()) }))
          .reduce({ name: "sendMultipleEmails" }),
        send_single_email: sub
          .start(t.struct({ to: t.email() }))
          .reduce({ name: "sendEmail" }),
        results_raw: sub.queryResultsRaw(),
        workers: sub.queryResources(),
        ...sub.internals(), // Required for child workflows
      },
      pub,
    );
  },
);

Versioning

There is no dedicated versioning scheme in Substantial. However, you can still manage versioning using existing features when applying incremental changes to a workflow.

Assume the following workflow:

import { checkStatus, getAmountById, sendEmail, pay } from "./payroll.ts";

export async function payEmployee(ctx: Context) {
  const { id } = ctx.kwargs;

  let period = 0;
  while (true) {
    const isStillEmployed = await ctx.save(() => checkStatus(id));
    if (!isStillEmployed) {
      break;
    }

    const amount = await ctx.save(() => getAmountById(id));
    await ctx.save(() => pay(id, amount));
    await ctx.save(() => sendEmail(id));
    period += 1;

    ctx.logger.info(`Period ${period}: paid ${amount}`);
    ctx.sleep(30 /*days*/ * 24 * 3600 * 1000);
  }

  ctx.logger.info(`Workflow terminated for Employee #${id}`);
}

Now, suppose you want to change the sleep interval from 30 days to 15 days for future runs. Reconciling this change with already-running workflows can be challenging.

Method 1: Abort or wait on all ongoing workflows, then start new ones

This works if your infrastructure lets you safely abort a workflow and restart it, or if your domain logic allows this approach.

In our example, this strategy is feasible only if all employees are paid in sync. Otherwise, you'd need to ensure you only abort at the end of a payroll cycle.

- ctx.sleep(30 /*days*/ * 24 * 3600 * 1000);
+ ctx.sleep(15 /*days*/ * 24 * 3600 * 1000);
  • ✅ Simple, but requires coordination.

Method 2: Create a new workflow under a new name

Instead of modifying the existing workflow, keep it as-is and introduce a new one with the updated logic.

Then, switch over to the new workflow in your orchestration layer. Optionally, you can abort old ones after they finish.

export function payEmployee(ctx: Context) {
  // old logic
}

export function payEmployee15Days(ctx: Context) {
  // new logic
}
  • ✅ Clean separation, allows for drastically different logic.
  • ⚠️ You may need to update callers or orchestration logic.

Method 3: Add conditional logic for new behavior

This approach lets you keep the same workflow name but use branching logic based on a versioning flag (e.g., an optional cycle in ctx.kwargs).

It's especially useful when you have many interconnected workflows and don't want to change references or workflow names.

export function payEmployee(ctx: Context) {
  const { id, cycle } = ctx.kwargs; // 'cycle' is new
  // ..
  while (true) {
    // ..

    if (!cycle) {
      ctx.sleep(30 * 24 * 3600 * 1000); // legacy default
    } else {
      ctx.sleep(cycle * 24 * 3600 * 1000); // new logic
    }
  }
  // ..
}

You can start new workflows with cycle set to 15, while old ones follow the legacy path.

  • ✅ Backwards-compatible and centralized logic.
  • ⚠️ Still requires external coordination to eventually abort the old instances.

Method 4: Add conditional logic with an external trigger

This is very similar to the previous method, the key difference being the source of the flag that decides the branch to pick.

In this setup, you control the code path from an external trigger.

export async function payEmployee(ctx: Context) {
  // ..
  while (true) {
    // ..

    // Function that calls an external service
    const version = await ctx.save(() => getVersion());
    switch (version) {
      case "15-days-patch-v1":
        ctx.sleep(15 /*days*/ * 24 * 3600 * 1000);
        break;

      // You can add new conditions incrementally
      // with potentially drastically different logic
      // ..

      default:
        ctx.sleep(30 /*days*/ * 24 * 3600 * 1000);
    }
  }
  // ..
}

This is especially useful because the workflow will simply use the new delay on the next cycle, automatically, without any need to migrate, and without breaking the event log.

  • ✅ All benefits from the previous methods, resilient to incremental changes.
  • ⚠️ Relying on an external service to inform the workflow can get messy as the workflow grows more complex.