README
soflow
Easily run distributed workflows with AWS Simple Workflow Service and Lambda
Table of contents
Installation
A minimum of node 10 is required
yarn add soflow
Basic usage
You can find implementation examples over at Skalar/soflow-examples
Defining tasks
tasks.js
async function addOrderToDatabase(data) {
// do your stuff
return result
}
addOrderToDatabase.config = {
concurrency: 100, // ReservedConcurrentExecutions in lambda context
type: 'both', // deploy as lambda function and register activity type
memorySize: 128, // only enforced by lambda
scheduleToStartTimeout: 10, // only applies when run as activity
startToCloseTimeout: 60,
scheduleToCloseTimeout: 20, // only applies to activities
}
exports.addOrderToDatabase = addOrderToDatabase
Defining workflows
workflows.js
async function CreateOrder({
input: {confirmationEmailRecipient, products, customerId},
tasks: {addOrderToDatabase, sendOrderConfirmation},
}) {
const orderData = await addOrderToDatabase(customerId, products)
await sendOrderConfirmation(orderData)
return orderData
}
CreateOrder.config = {
startToCloseTimeout: 30,
tasks: {
addOrderToDatabase: {type: 'faas'},
sendOrderConfirmation: {type: 'activity'},
},
}
exports.CreateOrder = CreateOrder
Deploying AWS resources
const {SWF} = require('soflow')
const deployPromise = SWF.Orchestration.setup({
progressIndicator: true // default: false
deciderEnvironment: {
// environment variables available in lambda decider worklow functions
MY_CUSTOM_ENVIRONMENT_VARIABLE: 'myvalue',
}
// File glob patterns to include in the lambda package.
// Everything needed by your tasks must be included (including the soflow npm module).
// Default: [`${tasksPath}/**`, `${workflowsPath}/**`]
files: [
[
'stripped_node_modules/**',
// Provided callback can return a new filename or true/false for whether to include the file
path => path.replace('stripped_node_modules', 'node_modules')
],
'workflows/**',
'tasks/**',
'lib/**',
],
})
Running workers
Decider worker
This worker serves as the workflow decider/conductor.
#!/usr/bin/env node
const {SWF} = require('soflow')
const workflows = require('./workflows')
const tasks = require('./tasks')
const deciderWorker = new SWF.DeciderWorker({
workflows,
tasks,
concurrency: 2,
})
deciderWorker.on('error', error => {
console.error(error)
process.exit(1)
})
deciderWorker.start()
Activity worker
This worker executes scheduled activity tasks.
#!/usr/bin/env node
const {SWF} = require('soflow')
const workflows = require('./workflows')
const tasks = require('./tasks')
const activityWorker = new SWF.ActivityWorker({
workflows,
tasks,
concurrency: 2,
})
activityWorker.on('error', error => {
console.error(error)
process.exit(1)
})
activityWorker.start()
Lambda decider
Soflow supports running SWF deciders as Lamda functions.
Due to the nature of Lambda and SWF, the implementation has some important details.
Enable lambda decider
When the lambda decider is enabled, soflow enables a scheduled CloudWatch event rule that triggers the decider lambda function every minute.
The lambda function will run for 65-130 seconds, exiting when there no longer time (60s + 5s slack) to do an empty poll. This is to prevent decision tasks being temporarily "stuck". This means between 1 and 2 deciders are running at any given time, each able to handle multiple decision tasks concurrenctly.Note that it can take up to 1 minute for the first invocation to happen.
await SWF.Orchestration.Lambda.enableDecider()
Disable lambda decider
Disables the CloudWatch event rule. It can take up to 2 minutes for all deciders to be shut down.
await SWF.Orchestration.Lambda.disableDecider()
Manually invoke lambda decider function
May be used with enableDecider() to ensure a decider is running immediately, or to temporarily scale up the decider capacity.
await SWF.Orchestration.Lambda.invokeDecider()
Manually shut down running lambda deciders
await SWF.Orchestration.Lambda.shutdownDeciders()
Executing workflow
const {SWF} = require('soflow')
async function startCreateOrderWorkflow() {
// Initiate workflow execution
const execution = await SWF.executeWorkflow({
type: 'CreateOrder',
workflowId: 'CreateOrder-12345',
input: {productIds: [1, 2, 3]},
})
// Optionally await workflow result
const result = await execution.promise
}
Terminating workflow executions
const {SWF} = require('soflow')
async function terminationExample() {
await SWF.terminateAllExecutions() // terminate ALL workflow executions within namespace
await SWF.terminateExecution({workflowId: 'myid'})
}
Tearing down AWS resources
Warning: this removes all AWS resources within the given namespace
const {SWF} = require('soflow')
async function teardownExample() {
await SWF.Orchstration.teardown({
removeBucket: true, // default: false
progressIndicator: true, // default: false
})
}
Executing workflow without AWS
Soflow provides a limited LocalWorkflow backend, with the same API as the SWF backend.
This can be useful during development or testing, but be aware that it:
- runs all workflow (decider) functions in the current process
- does not enforce worklow timeouts
- only allows workflow signaling within the same process
- runs tasks in separate child processes, on the local node
- only enforces task startToCloseTimeout
- is not able to terminate workflow executions
const {LocalWorkflow} = require('soflow')
async function runWorkflowWithoutSWF() {
const execution = await LocalWorkflow.executeWorkflow({
type: 'CreateOrder'
workflowId: 'order-1234'
input: {}
// ...
})
// Optionally await workflow result
const result = await execution.promise
}
Configuration
You can provide configuration as
environment variables
, viasoflow.config
orpassed as an argument
to a soflow function.
const {SWF, config} = require('soflow')
config.update({
namespace: 'mynamespace',
swfDomain: 'MyDomain'
})
// above code must be required/invoked before your code that uses soflow.
SWF.executeWorkflow({namespace: 'othernamespace', ...})
Variable name | ENV variable | Description |
---|---|---|
namespace |
SOFLOW_NAMESPACE |
Prefix for all AWS resources (globally unique) default: undefined |
version |
SOFLOW_WORKFLOWS_VERSION |
Developer-specified workflows version to use default: undefined |
swfDomain |
SOFLOW_SWF_DOMAIN |
Under which AWS SWF Domain to operate default: 'SoFlow' |
codeRoot |
SOFLOW_CODE_ROOT |
Path to root directory of code to be deployed default: process.cwd() |
tasksPath |
SOFLOW_TASKS_PATH |
Requireable path to tasks, relative to codeRoot default: 'tasks' |
workflowsPath |
SOFLOW_WORKFLOWS_PATH |
Requireable path to workflows, relative to codeRoot default: 'workflows' |
soflowPath |
SOFLOW_PATH |
Requireable path to soflow, relative to codeRoot default: 'node_modules/soflow' |
s3Bucket |
SOFLOW_S3_BUCKET |
Name of S3 bucket to for lambda packages default: namespace |
s3Prefix |
SOFLOW_S3_PREFIX |
Key prefix for S3 objects default: 'soflow/' |
awsRegion |
SOFLOW_AWS_REGION |
Which AWS region to operate in default: 'eu-west-1' |
executionRetention |
SOFLOW_EXECUTION_RETENTION |
Number of days to keep workflow executions. note: Can only be set the first time an SWF domain is created, after which it is immutable default: 1 |
Development
Starting dev environment
# Bring up a local dynamodb and s3 as well as linting every time the code changes.
docker-compose up --build
# Or you could use the tmux-session script:
ln -s $PWD/scripts/tmux-session ~/.bin/soflow
soflow # start or resume tmux dev session
# brings up linting, unit and integration tests with file watching
soflow clean # stops/cleans docker containers, tmux session
Running tests
Requires the development environment to be running
# Unit and integration tests for all node targets
docker-compose exec dev scripts/test
# Unit tests with file watching and verbose output
docker-compose exec dev ash -c \
"scripts/unit-tests --watch --verbose"
# Integration tests with 'local' profile
docker-compose exec dev ash -c \
"PROFILES=local scripts/integration-tests --verbose"