Substantial
Substantial runtime
The Substantial runtime enables the execution of durable workflows in one or accross multiple typegates.
Why use it?
- Long-running "processes": Durable tasks that need to run over extended periods (days, weeks or months), handling retries and restarts seamlessly.
- Fault-tolerant execution: Ensure reliable execution of tasks, even upon failures, by maintaining a durable state of the latest run.
- Task orchestration: Coordinate complex sequences of workflows (analogous to microservices interactions).
For example, the workflow bellow will continue running until a confirmation
event is sent to the associated run.
export async function sendEmail(ctx: Context) {
// 1. A workflow can receive parameters whose type is defined on the typegraph
const { to } = ctx.kwargs;
// 2. When a function call produces effects, we can make it durable
const info = await ctx.save(() => sendSubscriptionEmail(to));
const timeSent = await ctx.save(() => new Date().toJSON());
const confirmation = ctx.receive<boolean>("confirmation");
if (!confirmation) {
throw new Error(`${to} has denied the subscription sent at ${timeSent}`);
}
return `${to} confirmed (${info})`;
}
Additionally, if we were to shut down the Typegate node executing it and then restart it, the state will be preserved. This means that if the subscription email was already sent, upon relaunch, it will not be sent again, same thing for the value of timeSent
.
Key Concepts
Workflows
A special type of function with durable state and an execution mechanism directly tied to time. A workflow can also trigger other workflows (child workflows).
Backend
This abstraction implements a set of atomic operations that allows Typegate to persist and recover the workflow state. Currently, we have the Redis backend available, along with others like fs and memory, which are primarily intended for development or testing purposes.
Run
When a workflow is started, a run is created and Substantial will provide you a run_id
to uniquely identify it.
You can send an event or abort an ongoing run from its run_id
.
Child workflows
Child workflows are like any other workflows, they are just run by another workflow (parent).
If a workflow parent is explicitly stopped or aborted, all of its descendants will also be aborted.
For example, suppose you want to write a workflow that sends a subscription request to a list of emails and then receive a notification for each confirmation or denial, but only during your work hours.
You can easily translate that logic as if you were writing generic sequential code using Substantial workflows.
import {
nextTimeWhenAdminIsAvailable,
sendSubscriptionEmail,
notifyAdmin,
} from "./utils.ts";
export async function sendEmail(ctx: Context) {
// 1. A workflow can receive parameters whose type is defined on the typegraph
const { to } = ctx.kwargs;
// 2. When a function call produces effects, we can make it durable
const info = await ctx.save(() => sendSubscriptionEmail(to));
const timeSent = await ctx.save(() => new Date());
const confirmation = ctx.receive<boolean>("confirmation");
if (!confirmation) {
throw new Error(`${to} has denied the subscription sent at ${timeSent}`);
}
// 3. In this scenario, we use a durable sleep to wait until the admin
// is available
const duration = await ctx.save(() =>
nextTimeWhenAdminIsAvailable(new Date()),
);
ctx.sleep(duration);
const _ = await ctx.save(() => notifyAdmin(info), {
retry: {
minBackoffMs: 1000,
maxBackoffMs: 5000,
maxRetries: 4,
},
});
return `${to} confirmed`;
}
export async function sendMultipleEmails(ctx: Context) {
const { emails } = ctx.kwargs;
// 1. Persist the state of the child workflows
const handlersDef = await ctx.save(async () => {
const handlersDef = [];
for (const email of emails) {
const handleDef = await ctx.startChildWorkflow(sendEmail, {
to: email,
});
handlersDef.push(handleDef);
}
return handlersDef;
});
// 2. Create handles for your child workflows
const handles = handlersDef.map((def) => ctx.createWorkflowHandle(def));
// 3. In this example, we wait on all child workflows to complete
await ctx.ensure(async () => {
for (const handle of handles) {
if (!(await handle.hasStopped())) {
return false;
}
}
return true;
});
const ret = await ctx.save(async () => {
const ret = [];
for (const handle of handles) {
const childResult = await handle.result<string>();
ret.push(childResult);
}
return ret;
});
return ret;
}
In your typegraph, you will have:
import { Policy, t, typegraph } from "@typegraph/sdk/index.ts";
import {
SubstantialRuntime,
Backend,
WorkflowFile,
} from "@typegraph/sdk/runtimes/substantial.ts";
typegraph(
{
name: "substantial-example",
},
(g) => {
const pub = Policy.public();
const backend = Backend.redis("REDIS_SECRET");
const file = WorkflowFile.deno("my_workflow.ts", [])
.import(["sendEmail", "sendMultipleEmails"])
.build();
const sub = new SubstantialRuntime(backend, [file]);
g.expose(
{
stop: sub.stop(),
send_multiple_emails: sub
.start(t.struct({ emails: t.list(t.email()) }))
.reduce({ name: "sendMultipleEmails" }),
send_single_email: sub
.start(t.struct({ to: t.email() }))
.reduce({ name: "sendEmail" }),
results_raw: sub.queryResultsRaw(),
workers: sub.queryResources(),
...sub.internals(), // Required for child workflows
},
pub,
);
},
);