workflow

The workflow package provides the core types for defining and executing agentic workflows in Microbus. It is used by both task authors (who interact with Flow) and workflow authors (who build Graph definitions).

Flow

Flow is the carrier object passed to every task endpoint. It provides access to the workflow’s identity, state, and control signals.

Identity Fields (read-only)

  • FlowID - unique identifier for this flow execution
  • WorkflowName - the workflow graph’s name
  • TaskName - the current task being executed
  • StepNum - the current step number in the flow

State Access

Tasks read input from state and write output back:

// Field-based access (one-off reads/writes)
name := flow.GetString("name")
score := flow.GetFloat("score")
flow.Set("approved", true)

// Check for existence
if flow.Has("delay") {
    delay := flow.GetFloat("delay")
    // ...
}

// Struct-based access (parse/diff pattern)
snap := flow.Snapshot() // returns map[string]any
var state MyState
flow.ParseState(&state)
// ... modify state ...
flow.SetChanges(state, snap) // only records fields that differ from snap
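
The parse/diff pattern above can be illustrated without the framework. The following is a minimal sketch, not the package's implementation: orderState, parseState and changedFields are hypothetical names, and the sketch assumes the snapshot holds JSON-decoded values (float64 for numbers, and so on).

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// orderState is a hypothetical typed view over the workflow state.
type orderState struct {
	Name     string  `json:"name"`
	Score    float64 `json:"score"`
	Approved bool    `json:"approved"`
}

// parseState fills a struct from a state map via a JSON round trip.
func parseState(snap map[string]any, out any) error {
	data, err := json.Marshal(snap)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, out)
}

// changedFields returns only the fields of state whose values differ from snap.
// Assumption: snap holds JSON-decoded values, so DeepEqual compares like with like.
func changedFields(state any, snap map[string]any) (map[string]any, error) {
	data, err := json.Marshal(state)
	if err != nil {
		return nil, err
	}
	var current map[string]any
	if err := json.Unmarshal(data, &current); err != nil {
		return nil, err
	}
	changes := map[string]any{}
	for k, v := range current {
		if old, ok := snap[k]; !ok || !reflect.DeepEqual(old, v) {
			changes[k] = v
		}
	}
	return changes, nil
}

func main() {
	// Stand-in for flow.Snapshot().
	snap := map[string]any{"name": "Ada", "score": 0.5, "approved": false}

	var state orderState
	if err := parseState(snap, &state); err != nil {
		panic(err)
	}
	state.Approved = true // the only modification

	changes, err := changedFields(state, snap)
	if err != nil {
		panic(err)
	}
	fmt.Println(changes) // prints map[approved:true]
}
```

Recording only the differing fields keeps a task from overwriting state it merely read, which matters when parallel branches are later merged.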

Control Signals

Tasks can influence execution flow:

flow.Goto(myserviceapi.OtherTask.URL()) // override next transition
flow.Retry()                            // re-execute this task
flow.Sleep(5 * time.Second)             // delay before next execution
flow.Interrupt(payload)                 // pause for external input

Graph

Graph defines the structure of a workflow: tasks, transitions between them, and reducers for fan-in state merging.

graph := workflow.NewGraph(myserviceapi.CreateOrder.URL())
graph.AddTransition(myserviceapi.Validate.URL(), myserviceapi.Charge.URL())
graph.AddTransitionWhen(myserviceapi.Validate.URL(), myserviceapi.Reject.URL(), "valid != true")
graph.AddTransition(myserviceapi.Charge.URL(), myserviceapi.Fulfill.URL())
graph.AddTransition(myserviceapi.Fulfill.URL(), workflow.END)

Transition Types

Method                                        Description
AddTransition(from, to)                       Unconditional transition, always taken
AddTransitionWhen(from, to, when)             Taken only when the boolean expression passed as when holds for the current state
AddTransitionGoto(from, to)                   Only taken when the task calls flow.Goto(to)
AddTransitionForEach(from, to, forEach, as)   Dynamic fan-out: iterates over a state array field

Reducers

Reducers control how state fields are merged at fan-in points. The reducer is normally chosen by the field name’s prefix:

Prefix          Reducer                                         Behavior
sum*            ReducerAdd                                      Sum numeric values
list*           ReducerAppend                                   Concatenate arrays, keeping duplicates
set*            ReducerUnion (array) or ReducerMerge (object)   Deduplicate arrays, or merge objects (on a key conflict the new value wins)
anything else   ReducerReplace                                  Last write wins (default)

The character right after the prefix must be uppercase: sumScore, listMessages, setUsers match; summary, listening, setup do not.
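
The naming rule can be sketched as a small lookup. This is an illustration only: reducerFor and hasPrefixThenUpper are hypothetical helpers, not part of the package, and the handling of a bare prefix such as sum (no following character) and of set* values that are neither arrays nor objects is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
	"unicode/utf8"
)

// hasPrefixThenUpper reports whether field starts with prefix and the
// character right after the prefix is uppercase.
func hasPrefixThenUpper(field, prefix string) bool {
	if !strings.HasPrefix(field, prefix) {
		return false
	}
	// For an empty remainder this yields utf8.RuneError, which is not uppercase.
	r, _ := utf8.DecodeRuneInString(strings.TrimPrefix(field, prefix))
	return unicode.IsUpper(r)
}

// reducerFor returns the reducer name implied by the field name.
// The value is consulted only for set*, which depends on the JSON kind.
func reducerFor(field string, value any) string {
	switch {
	case hasPrefixThenUpper(field, "sum"):
		return "add"
	case hasPrefixThenUpper(field, "list"):
		return "append"
	case hasPrefixThenUpper(field, "set"):
		if _, isObject := value.(map[string]any); isObject {
			return "merge"
		}
		return "union" // assumption: anything that is not an object is treated as an array
	default:
		return "replace"
	}
}

func main() {
	for _, field := range []string{"sumScore", "listMessages", "setUsers", "summary", "listening", "setup"} {
		fmt.Printf("%s -> %s\n", field, reducerFor(field, []any{}))
	}
}
```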

graph.SetReducer(field, reducer) is the escape hatch for fields whose names cannot follow the convention (e.g. dictated by an external schema):

graph.SetReducer("messages", workflow.ReducerAppend) // legacy field name

Explicit reducers are strict: ReducerUnion rejects object values and ReducerMerge rejects array values. Only the prefix-driven set* is polymorphic over JSON kind.
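
To make the strictness concrete, here is a minimal sketch of what union and merge could look like when each accepts only one JSON kind. reduceUnion and reduceMerge are hypothetical names, deduplication by JSON encoding is an assumption, and both sides are assumed to be present already.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// reduceUnion concatenates two arrays and drops duplicates, keeping first occurrences.
// It rejects anything that is not an array.
func reduceUnion(current, incoming any) ([]any, error) {
	a, okA := current.([]any)
	b, okB := incoming.([]any)
	if !okA || !okB {
		return nil, errors.New("union expects array values")
	}
	seen := map[string]bool{}
	out := []any{}
	for _, v := range append(append([]any{}, a...), b...) {
		key, err := json.Marshal(v) // assumption: elements are equal if their JSON encoding is equal
		if err != nil {
			return nil, err
		}
		if !seen[string(key)] {
			seen[string(key)] = true
			out = append(out, v)
		}
	}
	return out, nil
}

// reduceMerge combines two objects; on a key conflict the incoming value wins.
// It rejects anything that is not an object.
func reduceMerge(current, incoming any) (map[string]any, error) {
	a, okA := current.(map[string]any)
	b, okB := incoming.(map[string]any)
	if !okA || !okB {
		return nil, errors.New("merge expects object values")
	}
	out := make(map[string]any, len(a)+len(b))
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		out[k] = v
	}
	return out, nil
}

func main() {
	users, _ := reduceUnion([]any{"ann", "bob"}, []any{"bob", "cy"})
	fmt.Println(users) // prints [ann bob cy]

	_, err := reduceUnion(map[string]any{"ann": true}, []any{"bob"})
	fmt.Println(err) // prints union expects array values
}
```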

Time Budgets

Set per-task execution timeouts:

graph.SetTimeBudget(myserviceapi.SlowTask.URL(), 2*time.Minute)

Validation and Visualization

err := graph.Validate()    // check for structural errors
mermaid := graph.Mermaid() // generate a Mermaid flowchart
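
For readers unfamiliar with Mermaid, the sketch below shows roughly what flowchart text for a small graph looks like. It is not the output format of graph.Mermaid(), which this document does not specify: the edge type, the mermaid function and the n0, n1 node IDs are all hypothetical, and labels containing double quotes are not escaped.

```go
package main

import (
	"fmt"
	"strings"
)

// edge is a hypothetical, simplified stand-in for a transition.
type edge struct{ from, to, when string }

// mermaid renders edges as a top-down Mermaid flowchart.
// Nodes get synthetic IDs because task URLs contain characters Mermaid IDs cannot hold.
func mermaid(edges []edge) string {
	ids := map[string]string{}
	var b strings.Builder
	b.WriteString("flowchart TD\n")
	id := func(name string) string {
		if v, ok := ids[name]; ok {
			return v
		}
		v := fmt.Sprintf("n%d", len(ids))
		ids[name] = v
		fmt.Fprintf(&b, "    %s[\"%s\"]\n", v, name) // declare the node with a quoted label
		return v
	}
	for _, e := range edges {
		from, to := id(e.from), id(e.to)
		if e.when != "" {
			fmt.Fprintf(&b, "    %s -->|\"%s\"| %s\n", from, e.when, to)
		} else {
			fmt.Fprintf(&b, "    %s --> %s\n", from, to)
		}
	}
	return b.String()
}

func main() {
	fmt.Print(mermaid([]edge{
		{"validate", "charge", ""},
		{"validate", "reject", "valid != true"},
	}))
}
```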

Transition

Transition describes a single edge in the graph:

type Transition struct {
    From     string // source task URL
    To       string // target task URL (or workflow.END)
    When     string // boolean expression evaluated against state
    WithGoto bool   // only taken on explicit flow.Goto()
    ForEach  string // state field to iterate over (dynamic fan-out)
    As       string // alias for current element during forEach
}
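
The ForEach and As fields can be pictured as expanding one state into one branch per array element. The sketch below is an assumption about that behavior, not the foreman's implementation: fanOut is a hypothetical helper, and giving every branch a shallow copy of the parent state is a simplification.

```go
package main

import "fmt"

// fanOut returns one child state per element of state[forEach],
// exposing the current element under the alias as.
func fanOut(state map[string]any, forEach, as string) ([]map[string]any, error) {
	items, ok := state[forEach].([]any)
	if !ok {
		return nil, fmt.Errorf("state field %q is not an array", forEach)
	}
	children := make([]map[string]any, 0, len(items))
	for _, item := range items {
		// Assumption: each branch starts from a shallow copy of the parent state.
		child := make(map[string]any, len(state)+1)
		for k, v := range state {
			child[k] = v
		}
		child[as] = item
		children = append(children, child)
	}
	return children, nil
}

func main() {
	state := map[string]any{"listOrders": []any{"o-1", "o-2", "o-3"}}
	children, err := fanOut(state, "listOrders", "order")
	if err != nil {
		panic(err)
	}
	for _, child := range children {
		fmt.Println(child["order"])
	}
}
```

Each branch would then run the target task with its own element, and the reducers decide how the branch results are merged back at the fan-in point.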

Node

Node describes a task or subgraph node registered in a graph:

type Node struct {
    Name       string        // task or subgraph URL
    TimeBudget time.Duration // execution timeout (0 = use foreman default)
    Subgraph   bool          // true if this node is a child workflow
}

Reducer

Reducer is a string constant defining a merge strategy:

const (
    ReducerReplace Reducer = "replace"
    ReducerAppend  Reducer = "append"
    ReducerAdd     Reducer = "add"
    ReducerUnion   Reducer = "union"
    ReducerMerge   Reducer = "merge"
)

RawFlow

RawFlow wraps Flow with additional methods used by the foreman orchestrator. Task authors do not interact with RawFlow directly. It exposes raw state/config/changes access, control signal readers, and mutation methods needed for orchestration bookkeeping.

Further Reading