API Reference
dag
import "github.com/danmestas/dagnats/dag"WorkflowBuilder provides a fluent DSL for constructing WorkflowDefs. Centralizing construction here lets callers express graph topology naturally without touching StepDef internals — the builder enforces invariants and delegates final structural validation to Validate.
dag/priority.go Priority resolution for workflow run ordering.
Index
- func CalculateDelay(policy RetryPolicy, attempt int) time.Duration
- func ExtractDotPath(path string, data []byte) (any, error)
- func IsComplete(def WorkflowDef, completed map[string]bool) bool
- func MarshalConfig(cfg interface{}) json.RawMessage
- func ResolveInput(step StepDef, steps map[string]StepState) ([]byte, error)
- func ResolvePriority(cfg *PriorityConfig, input json.RawMessage) int
- func Validate(def WorkflowDef) error
- func ValidateFragment(fragment []StepDef, cfg PlannerConfig, existingIDs map[string]bool) error
- func ValidateSchema(schema json.RawMessage, data json.RawMessage) error
- type AgentLoopConfig
- type ApprovalConfig
- type CancelOn
- type ConcurrencyLimit
- type KeyedRateLimit
- type MapConfig
- type MapInstanceState
- type Match
- type MatchOp
- type ParentCond
- type PlannerConfig
- type PriorityConfig
- type RateLimit
- type ResolvedMatch
- type RetryPolicy
- type RetryStrategy
- type RunStatus
- type SingletonConfig
- type SingletonMode
- type SleepConfig
- type StepDef
- func EffectiveSteps(def WorkflowDef, run WorkflowRun) []StepDef
- func NamespaceFragment(plannerID string, fragment []StepDef) []StepDef
- func ResolveCompensateChain(def WorkflowDef, completed map[string]bool, failedStepID string) []StepDef
- func ResolveReady(def WorkflowDef, completed map[string]bool, queued map[string]bool) []StepDef
- func ResolveSkipped(def WorkflowDef, completed map[string]bool, queued map[string]bool, steps map[string]StepState) []StepDef
- type StepRef
- func (r StepRef) After(refs …StepRef) StepRef
- func (r StepRef) Compensate(target StepRef) StepRef
- func (r StepRef) ID() string
- func (r StepRef) OnFailure(target StepRef) StepRef
- func (r StepRef) SkipIf(cond *ParentCond) StepRef
- func (r StepRef) WithDetach() StepRef
- func (r StepRef) WithKeyedRateLimit(krl KeyedRateLimit) StepRef
- func (r StepRef) WithLoopDelay(d time.Duration) StepRef
- func (r StepRef) WithMaxDuration(d time.Duration) StepRef
- func (r StepRef) WithMaxItems(n int) StepRef
- func (r StepRef) WithMaxIterations(n int) StepRef
- func (r StepRef) WithRateLimit(rl RateLimit) StepRef
- func (r StepRef) WithRetries(n int) StepRef
- func (r StepRef) WithTaskConcurrency(max int) StepRef
- func (r StepRef) WithTimeout(d time.Duration) StepRef
- type StepState
- type StepStatus
- type StepType
- type StickyStrategy
- type SubWorkflowConfig
- type WaitForEventOpts
- type WorkflowBuilder
- func NewWorkflow(name string) *WorkflowBuilder
- func (b *WorkflowBuilder) Agent(id, task string, metadata map[string]string) StepRef
- func (b *WorkflowBuilder) AgentLoop(id, task string) StepRef
- func (b *WorkflowBuilder) Approval(id string, cfg ApprovalConfig) StepRef
- func (b *WorkflowBuilder) Build() (WorkflowDef, error)
- func (b *WorkflowBuilder) CancelOn(event string, match Match) *WorkflowBuilder
- func (b *WorkflowBuilder) CancelOnWithTimeout(event string, match Match, timeout time.Duration) *WorkflowBuilder
- func (b *WorkflowBuilder) DependsOn(ids …string) *WorkflowBuilder
- func (b *WorkflowBuilder) Map(id, taskType string) StepRef
- func (b *WorkflowBuilder) Name() string
- func (b *WorkflowBuilder) Planner(id, task string, cfg PlannerConfig) StepRef
- func (b *WorkflowBuilder) Sleep(id string, duration time.Duration) StepRef
- func (b *WorkflowBuilder) SubWorkflow(id, workflow string) StepRef
- func (b *WorkflowBuilder) Task(id, task string) StepRef
- func (b *WorkflowBuilder) Version(v string) *WorkflowBuilder
- func (b *WorkflowBuilder) WaitForEvent(id string, opts WaitForEventOpts) StepRef
- func (b *WorkflowBuilder) WithConcurrency(maxRuns, maxSteps int) *WorkflowBuilder
- func (b *WorkflowBuilder) WithIdempotencyKey(dotPath string) *WorkflowBuilder
- func (b *WorkflowBuilder) WithMaxDuration(d time.Duration) *WorkflowBuilder
- func (b *WorkflowBuilder) WithMaxIterations(n int) *WorkflowBuilder
- func (b *WorkflowBuilder) WithPriority(cfg PriorityConfig) *WorkflowBuilder
- func (b *WorkflowBuilder) WithSingleton(mode SingletonMode) *WorkflowBuilder
- func (b *WorkflowBuilder) WithSingletonKey(mode SingletonMode, key string) *WorkflowBuilder
- func (b *WorkflowBuilder) WithSticky(s StickyStrategy) *WorkflowBuilder
- func (b *WorkflowBuilder) WithTimeout(d time.Duration) *WorkflowBuilder
- type WorkflowDef
- type WorkflowRun
func CalculateDelay
func CalculateDelay(policy RetryPolicy, attempt int) time.DurationCalculateDelay returns the delay before the next retry attempt. Attempt is 1-based (first retry = attempt 1).
func ExtractDotPath
func ExtractDotPath(path string, data []byte) (any, error)ExtractDotPath extracts a value from JSON data using a dot-separated path. The path walks nested objects: “data.user.id” accesses data[“user”][“id”]. Returns the raw value (string, number, bool, map, array, or nil). Panics if path is empty (programmer error); returns error for missing keys.
func IsComplete
func IsComplete(def WorkflowDef, completed map[string]bool) boolIsComplete returns true when every step in the definition has been completed or skipped. Auxiliary steps (OnFailure/Compensate targets) that were never triggered don’t block completion — they are expected to remain Pending in the happy path.
func MarshalConfig
func MarshalConfig(cfg interface{}) json.RawMessageMarshalConfig serializes a config struct into raw JSON for StepDef.Config. Panics on nil or marshal failure — both are programmer errors that should be caught at build time.
func ResolveInput
func ResolveInput(step StepDef, steps map[string]StepState) ([]byte, error)ResolveInput builds the input payload for a step from its upstream outputs. No deps → nil (first step receives workflow-level input from the caller). Single dep → pass that step’s output through unchanged. Fan-in → map of dep ID → raw output, so the task can address each upstream.
func ResolvePriority
func ResolvePriority(cfg *PriorityConfig, input json.RawMessage) intResolvePriority computes the priority offset from input data.
func Validate
func Validate(def WorkflowDef) errorValidate checks a WorkflowDef for structural correctness before any run is created. Catching these errors at definition time defines them out of existence at runtime — the engine can safely assume every WorkflowDef it receives has already passed Validate.
func ValidateFragment
func ValidateFragment(fragment []StepDef, cfg PlannerConfig, existingIDs map[string]bool) errorValidateFragment checks a planner-generated DAG fragment against bounds. All IDs must be unique and not collide with existing steps. Tasks must be non-empty and in AllowedTasks if configured. Dependencies must reference only within-fragment steps.
func ValidateSchema
func ValidateSchema(schema json.RawMessage, data json.RawMessage) errorValidateSchema validates data against a JSON Schema subset. Supports: type (string, number, boolean, object, array), required, properties (nested). Returns nil if schema is nil.
type AgentLoopConfig
AgentLoopConfig bounds the iterative behavior of an agent-loop step. Both limits are enforced: whichever fires first terminates the loop.
type AgentLoopConfig struct {
MaxIterations int `json:"max_iterations"`
MaxDuration time.Duration `json:"max_duration,omitempty"`
LoopDelay time.Duration `json:"loop_delay,omitempty"`
}func ParseAgentLoopConfig
func ParseAgentLoopConfig(step StepDef) (AgentLoopConfig, error)ParseAgentLoopConfig extracts AgentLoopConfig from a StepDef’s Config field. Returns an error if the step type is wrong, Config is nil, or the JSON is malformed.
type ApprovalConfig
ApprovalConfig holds configuration for approval gate steps. Subject is the NATS subject where a notification is published when the approval is requested. External systems subscribe to this subject and present approve/reject actions to humans.
type ApprovalConfig struct {
Timeout time.Duration `json:"timeout"`
Subject string `json:"subject"`
Description string `json:"description,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}func ParseApprovalConfig
func ParseApprovalConfig(step StepDef) (ApprovalConfig, error)ParseApprovalConfig extracts ApprovalConfig from a StepDef’s Config field. Returns an error if the step type is wrong, Config is nil, or the JSON is malformed.
type CancelOn
CancelOn specifies an event that cancels a running workflow.
type CancelOn struct {
Event string `json:"event"`
Match Match `json:"match"`
Timeout time.Duration `json:"timeout,omitempty"`
}type ConcurrencyLimit
ConcurrencyLimit controls parallel execution at workflow and step level.
type ConcurrencyLimit struct {
MaxRuns int `json:"max_runs,omitempty"`
MaxSteps int `json:"max_steps,omitempty"`
}type KeyedRateLimit
KeyedRateLimit configures per-key rate limiting using a dot-path expression.
type KeyedRateLimit struct {
Key string `json:"key"`
Limit int `json:"limit"`
Period time.Duration `json:"period"`
Units int `json:"units"`
}type MapConfig
MapConfig controls parallel execution for map steps that fan out. MaxItems caps the array size to prevent unbounded parallelism.
type MapConfig struct {
MaxItems int `json:"max_items"`
}func ParseMapConfig
func ParseMapConfig(step StepDef) (MapConfig, error)ParseMapConfig extracts MapConfig from a StepDef’s Config field.
type MapInstanceState
MapInstanceState tracks runtime state for one map item execution. Each instance represents one parallel task invocation for a map step.
type MapInstanceState struct {
Status StepStatus `json:"status"`
Output json.RawMessage `json:"output,omitempty"`
Error string `json:"error,omitempty"`
}type Match
Match is the builder-time type. Both sides are dot-path strings.
type Match struct {
Left string `json:"left"`
Op MatchOp `json:"op"`
Right string `json:"right"`
}func (Match) Resolve
func (m Match) Resolve(stepOutputs map[string][]byte, workflowInput []byte) (ResolvedMatch, error)Resolve converts a builder-time Match to a runtime ResolvedMatch.
type MatchOp
MatchOp defines comparison operators for event matching.
type MatchOp stringconst MatchOpEq MatchOp = "eq"type ParentCond
ParentCond evaluates a simple comparison on a parent step’s JSON output. Field is a top-level key in the output JSON object. Op is one of the six comparison operators: ==, !=, <, >, <=, >=. Value is the comparison target. Evaluated purely — no I/O, no side effects.
type ParentCond struct {
StepID string `json:"step_id"`
Field string `json:"field"`
Op string `json:"op"`
Value any `json:"value"`
}func SkipIfOutput
func SkipIfOutput(parent StepRef, field, op string, value any) *ParentCondSkipIfOutput creates a ParentCond for use with StepRef.SkipIf. The parent must be in the step’s DependsOn list (enforced by Validate).
func (*ParentCond) Evaluate
func (c *ParentCond) Evaluate(steps map[string]StepState) boolEvaluate returns true when the condition is satisfied against the given step states. Returns false if the parent step has no output, the field is missing, or the types are not comparable.
type PlannerConfig
PlannerConfig holds configuration for planner steps that generate DAG fragments at runtime. MaxSteps bounds the number of steps the planner can emit; MaxDepth bounds the longest dependency chain. AllowedTasks restricts which task types the fragment may reference.
type PlannerConfig struct {
MaxSteps int `json:"max_steps"`
MaxDepth int `json:"max_depth,omitempty"`
AllowedTasks []string `json:"allowed_tasks,omitempty"`
}func ParsePlannerConfig
func ParsePlannerConfig(step StepDef) (PlannerConfig, error)ParsePlannerConfig extracts PlannerConfig from a StepDef’s Config field. Returns an error if the step type is wrong, Config is nil, or the JSON is malformed.
type PriorityConfig
PriorityConfig controls run ordering when concurrency limits create backlogs. Key is a dot-path into input data.
type PriorityConfig struct {
Key string `json:"key"`
Rules map[string]int `json:"rules"`
DefaultOffset int `json:"default_offset"`
}type RateLimit
RateLimit configures global per-task-type rate limiting.
type RateLimit struct {
Limit int `json:"limit"`
Period time.Duration `json:"period"`
}type ResolvedMatch
ResolvedMatch is the runtime type stored in KV waiter entries. Right is resolved to a concrete value when the waiter is created.
type ResolvedMatch struct {
Left string `json:"left"`
Op MatchOp `json:"op"`
Right any `json:"right"`
}func (ResolvedMatch) Evaluate
func (m ResolvedMatch) Evaluate(eventData []byte) (bool, error)Evaluate checks if the match condition holds against event JSON data.
type RetryPolicy
RetryPolicy configures retry behavior for a step or as a workflow default. MaxAttempts=0 means no retries.
type RetryPolicy struct {
MaxAttempts int `json:"max_attempts"`
Strategy RetryStrategy `json:"strategy"`
InitialDelay time.Duration `json:"initial_delay"`
MaxDelay time.Duration `json:"max_delay"`
Multiplier float64 `json:"multiplier,omitempty"`
}func ResolveRetryPolicy
func ResolveRetryPolicy(wfDef WorkflowDef, stepDef StepDef) *RetryPolicyResolveRetryPolicy returns the effective retry policy for a step. Resolution order: step Retry → workflow DefaultRetry → legacy Retries field → nil (no retries).
type RetryStrategy
RetryStrategy selects the backoff algorithm for step retries.
type RetryStrategy intconst (
RetryFixed RetryStrategy = iota // Same delay every attempt
RetryLinear // delay * attempt
RetryExponential // delay * multiplier^(attempt-1)
)func (RetryStrategy) MarshalJSON
func (s RetryStrategy) MarshalJSON() ([]byte, error)func (RetryStrategy) String
func (s RetryStrategy) String() stringfunc (*RetryStrategy) UnmarshalJSON
func (s *RetryStrategy) UnmarshalJSON(data []byte) errortype RunStatus
RunStatus tracks the lifecycle of a workflow run. The zero value (pending) is a safe default — a newly created run has not yet been claimed by the engine.
type RunStatus intconst (
RunStatusPending RunStatus = iota
RunStatusRunning
RunStatusCompleted
RunStatusFailed
RunStatusCancelled
RunStatusCompensated
RunStatusCompensateFailed
)func (RunStatus) IsTerminal
func (r RunStatus) IsTerminal() boolIsTerminal returns true for statuses that represent a finished run.
func (RunStatus) MarshalJSON
func (r RunStatus) MarshalJSON() ([]byte, error)func (RunStatus) String
func (r RunStatus) String() stringfunc (*RunStatus) UnmarshalJSON
func (r *RunStatus) UnmarshalJSON(data []byte) errortype SingletonConfig
SingletonConfig constrains runs to one-at-a-time per key.
type SingletonConfig struct {
Mode SingletonMode `json:"mode"`
Key string `json:"key,omitempty"`
}type SingletonMode
SingletonMode determines behavior on duplicate detection. String type for safe JSON serialization in KV storage.
type SingletonMode stringconst (
SingletonModeSkip SingletonMode = "skip"
SingletonModeCancel SingletonMode = "cancel"
)type SleepConfig
SleepConfig holds configuration for sleep steps. Duration is the durable delay the engine waits before completing.
type SleepConfig struct {
Duration time.Duration `json:"duration"`
}func ParseSleepConfig
func ParseSleepConfig(step StepDef) (SleepConfig, error)ParseSleepConfig extracts SleepConfig from a StepDef’s Config field.
type StepDef
StepDef is the immutable declaration of a single step within a WorkflowDef. DependsOn lists step IDs that must complete before this step is queued. Config holds type-specific configuration as raw JSON — use ParseXxxConfig helpers to extract typed structs.
type StepDef struct {
ID string `json:"id"`
Task string `json:"task"`
DependsOn []string `json:"depends_on,omitempty"`
Retries int `json:"retries,omitempty"`
Timeout time.Duration `json:"timeout"`
Type StepType `json:"type"`
Config json.RawMessage `json:"config,omitempty"`
SkipIf *ParentCond `json:"skip_if,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
Retry *RetryPolicy `json:"retry,omitempty"`
WorkerGroup string `json:"worker_group,omitempty"`
OnFailure string `json:"on_failure,omitempty"`
Compensate string `json:"compensate,omitempty"`
RateLimit *RateLimit `json:"rate_limit,omitempty"`
KeyedRateLimit *KeyedRateLimit `json:"keyed_rate_limit,omitempty"`
MaxTaskConcurrency int `json:"max_task_concurrency,omitempty"`
Singleton bool `json:"singleton,omitempty"`
}func EffectiveSteps
func EffectiveSteps(def WorkflowDef, run WorkflowRun) []StepDefEffectiveSteps returns the combined static + dynamic steps for a running workflow. When no dynamic steps exist, the original slice is returned unchanged to avoid allocation.
func NamespaceFragment
func NamespaceFragment(plannerID string, fragment []StepDef) []StepDefNamespaceFragment prefixes all step IDs and DependsOn references with the planner step ID to prevent collisions. Also forces all steps to StepTypeNormal — planners cannot spawn nested planners.
func ResolveCompensateChain
func ResolveCompensateChain(def WorkflowDef, completed map[string]bool, failedStepID string) []StepDefResolveCompensateChain returns compensate steps for completed steps in reverse topological order. Each step (except the first) gets a DependsOn pointing to the previous — this lets the engine enforce sequential execution using existing resolution logic.
func ResolveReady
func ResolveReady(def WorkflowDef, completed map[string]bool, queued map[string]bool) []StepDefResolveReady returns steps whose dependencies are fully satisfied (completed or skipped) and that have not yet been queued or completed. Both completed and queued are checked to avoid double-dispatching a step already in flight.
func ResolveSkipped
func ResolveSkipped(def WorkflowDef, completed map[string]bool, queued map[string]bool, steps map[string]StepState) []StepDefResolveSkipped returns steps whose dependencies are satisfied AND whose SkipIf condition evaluates to true. These should be marked Skipped by the orchestrator instead of being enqueued. Steps without SkipIf are never returned here.
type StepRef
StepRef is a compile-time-safe handle to a step within a WorkflowBuilder. Returned by Task() and AgentLoop(), it replaces string-based DependsOn with typed references that cannot silently miswire dependencies. The zero value is unusable — only the builder constructs valid StepRefs.
type StepRef struct {
// contains filtered or unexported fields
}func (StepRef) After
func (r StepRef) After(refs ...StepRef) StepRefAfter declares that this step depends on the given steps. Compile-time safe: passing a StepRef from a different builder panics immediately rather than producing a corrupt WorkflowDef discovered at validation time.
func (StepRef) Compensate
func (r StepRef) Compensate(target StepRef) StepRefCompensate designates a step to reverse this step’s side effects during saga compensation. Runs in reverse topo order when a downstream step fails permanently. Panics on zero-value StepRef, cross-builder refs, or self-reference.
func (StepRef) ID
func (r StepRef) ID() stringID returns the step’s string identifier. Useful for bridge code that still needs the raw ID (e.g. serialization boundaries).
func (StepRef) OnFailure
func (r StepRef) OnFailure(target StepRef) StepRefOnFailure designates a step to run when this step fails permanently (retries exhausted). The target step receives error context as input. Panics on zero-value StepRef, cross-builder refs, or self-reference.
func (StepRef) SkipIf
func (r StepRef) SkipIf(cond *ParentCond) StepRefSkipIf sets a condition that, when true, causes this step to be skipped instead of executed. The condition’s StepID must be in DependsOn (enforced by Validate). Skipped steps are treated as “satisfied” for downstream deps.
func (StepRef) WithDetach
func (r StepRef) WithDetach() StepRefWithDetach marks a SubWorkflow step as detached — the parent step completes immediately after spawning the child, without waiting for the child to finish. Panics if called on a non-SubWorkflow step.
func (StepRef) WithKeyedRateLimit
func (r StepRef) WithKeyedRateLimit(krl KeyedRateLimit) StepRefWithKeyedRateLimit sets per-key rate limiting on this step.
func (StepRef) WithLoopDelay
func (r StepRef) WithLoopDelay(d time.Duration) StepRefWithLoopDelay configures the delay between agent loop iterations. The orchestrator waits this duration before re-enqueuing the step. Useful for rate-limited APIs where you need spacing between calls.
func (StepRef) WithMaxDuration
func (r StepRef) WithMaxDuration(d time.Duration) StepRefWithMaxDuration configures the wall-clock bound on an AgentLoop step.
func (StepRef) WithMaxItems
func (r StepRef) WithMaxItems(n int) StepRefWithMaxItems configures the maximum number of items to process for a Map step. Calling this on a non-Map step or with n <= 0 panics — these are programmer errors that should be caught immediately.
func (StepRef) WithMaxIterations
func (r StepRef) WithMaxIterations(n int) StepRefWithMaxIterations configures the iteration bound on an AgentLoop step. Panics if the step is not an AgentLoop — calling this on a Task step indicates a logic error in the caller.
func (StepRef) WithRateLimit
func (r StepRef) WithRateLimit(rl RateLimit) StepRefWithRateLimit sets global per-task-type rate limiting on this step.
func (StepRef) WithRetries
func (r StepRef) WithRetries(n int) StepRefWithRetries sets the maximum number of retry attempts for this step. Zero means no retries — the step fails permanently on first error.
func (StepRef) WithTaskConcurrency
func (r StepRef) WithTaskConcurrency(max int) StepRefWithTaskConcurrency sets the global per-task-type concurrency limit on this step. At most max tasks of this type will execute concurrently across all workflow runs.
func (StepRef) WithTimeout
func (r StepRef) WithTimeout(d time.Duration) StepRefWithTimeout sets the per-attempt timeout on this step.
type StepState
StepState captures mutable runtime state for one step in a run. Output is kept as raw bytes to remain payload-agnostic. Iterations tracks how many agent-loop Continue cycles have completed; used to generate unique dedup IDs for each re-enqueue. LoopStartedAt records when the first iteration began, for MaxDuration enforcement. MapInstances tracks state for each parallel map item when Type == StepTypeMap. WakeAt records when a sleep step should complete, for engine scheduling. ChildRunID links to the spawned child run for SubWorkflow steps.
type StepState struct {
Status StepStatus `json:"status"`
Attempts int `json:"attempts"`
Iterations int `json:"iterations,omitempty"`
LoopStartedAt time.Time `json:"loop_started_at,omitempty"`
Output []byte `json:"output,omitempty"`
Error string `json:"error,omitempty"`
MapInstances []MapInstanceState `json:"map_instances,omitempty"`
WakeAt *time.Time `json:"wake_at,omitempty"`
ChildRunID string `json:"child_run_id,omitempty"`
}type StepStatus
StepStatus tracks the lifecycle of a single step within a run. Queued means the step has been dispatched to NATS but not yet claimed by a worker.
type StepStatus intconst (
StepStatusPending StepStatus = iota
StepStatusQueued
StepStatusRunning
StepStatusCompleted
StepStatusFailed
StepStatusSkipped
StepStatusCancelled
StepStatusRecovered
)func (StepStatus) MarshalJSON
func (s StepStatus) MarshalJSON() ([]byte, error)func (StepStatus) String
func (s StepStatus) String() stringfunc (*StepStatus) UnmarshalJSON
func (s *StepStatus) UnmarshalJSON(data []byte) errortype StepType
StepType distinguishes execution semantics — normal tasks run once, agent loops iterate until a termination signal, and sub-workflows delegate to a nested DAG.
type StepType intconst (
StepTypeNormal StepType = iota
StepTypeAgentLoop
StepTypeSubWorkflow
StepTypeAgent
StepTypeMap
StepTypeSleep
StepTypeWaitForEvent
StepTypeApproval
StepTypePlanner
)func (StepType) MarshalJSON
func (s StepType) MarshalJSON() ([]byte, error)func (StepType) String
func (s StepType) String() stringfunc (*StepType) UnmarshalJSON
func (s *StepType) UnmarshalJSON(data []byte) errortype StickyStrategy
StickyStrategy controls worker affinity for workflow runs.
type StickyStrategy stringconst (
StickyNone StickyStrategy = ""
StickySoft StickyStrategy = "soft"
StickyHard StickyStrategy = "hard"
)type SubWorkflowConfig
SubWorkflowConfig holds configuration for sub-workflow steps. Workflow names the child workflow definition to spawn. Detach controls whether the parent waits for the child to complete: when true, the parent step completes immediately after spawn.
type SubWorkflowConfig struct {
Workflow string `json:"workflow"`
Detach bool `json:"detach,omitempty"`
}func ParseSubWorkflowConfig
func ParseSubWorkflowConfig(step StepDef) (SubWorkflowConfig, error)ParseSubWorkflowConfig extracts SubWorkflowConfig from a StepDef’s Config field.
type WaitForEventOpts
WaitForEventOpts configures a wait-for-event step.
type WaitForEventOpts struct {
Event string `json:"event"`
Match Match `json:"match"`
Timeout time.Duration `json:"timeout"`
}func ParseWaitForEventConfig
func ParseWaitForEventConfig(step StepDef) (WaitForEventOpts, error)ParseWaitForEventConfig extracts WaitForEventOpts from a StepDef’s Config field.
type WorkflowBuilder
WorkflowBuilder accumulates step definitions and wires them into a WorkflowDef on Build(). current tracks the most recently added step so that chained modifier calls (DependsOn, WithTimeout, etc.) always target the right step.
type WorkflowBuilder struct {
// contains filtered or unexported fields
}func NewWorkflow
func NewWorkflow(name string) *WorkflowBuilderNewWorkflow starts a new builder for a workflow with the given name. Version defaults to “1” — override via Version() if needed.
func (*WorkflowBuilder) Agent
func (b *WorkflowBuilder) Agent(id, task string, metadata map[string]string) StepRefAgent appends a Claude Agent SDK step. Metadata carries role and other agent-specific config — the core DAG package is ignorant of what it means.
func (*WorkflowBuilder) AgentLoop
func (b *WorkflowBuilder) AgentLoop(id, task string) StepRefAgentLoop appends an agent-loop step with an initialised (but unconfigured) AgentLoopConfig. Callers must configure bounds via WithMaxIterations / WithMaxDuration before Build() — Validate enforces MaxIterations > 0.
func (*WorkflowBuilder) Approval
func (b *WorkflowBuilder) Approval(id string, cfg ApprovalConfig) StepRefApproval adds a human approval gate step to the workflow. No worker is involved — the engine manages the token and timeout.
func (*WorkflowBuilder) Build
func (b *WorkflowBuilder) Build() (WorkflowDef, error)func (*WorkflowBuilder) CancelOn
func (b *WorkflowBuilder) CancelOn(event string, match Match) *WorkflowBuilderCancelOn registers an event that cancels the workflow.
func (*WorkflowBuilder) CancelOnWithTimeout
func (b *WorkflowBuilder) CancelOnWithTimeout(event string, match Match, timeout time.Duration) *WorkflowBuilderCancelOnWithTimeout registers a cancellation event with timeout.
func (*WorkflowBuilder) DependsOn
func (b *WorkflowBuilder) DependsOn(ids ...string) *WorkflowBuilderDependsOn declares that the active step must not start until all listed step IDs have completed. Kept for backward compatibility — prefer After(StepRef) for new code which provides compile-time safety.
func (*WorkflowBuilder) Map
func (b *WorkflowBuilder) Map(id, taskType string) StepRefMap appends a map step that fans out over an array from its dependency. The step will execute taskType once per item in the input array, up to MapConfig.MaxItems. Returns a StepRef for chaining dependency wiring and calling WithMaxItems to override the default bound of 1000.
func (*WorkflowBuilder) Name
func (b *WorkflowBuilder) Name() stringName returns the workflow name. Used by higher-level packages that need to derive task names from the workflow identity.
func (*WorkflowBuilder) Planner
func (b *WorkflowBuilder) Planner(id, task string, cfg PlannerConfig) StepRefPlanner appends a planner step that generates a DAG fragment at runtime. The worker outputs JSON steps; the engine validates, namespaces, and materializes them into the running workflow.
func (*WorkflowBuilder) Sleep
func (b *WorkflowBuilder) Sleep(id string, duration time.Duration) StepRefSleep adds a durable delay step to the workflow. No worker is involved — the engine handles the timer.
func (*WorkflowBuilder) SubWorkflow
func (b *WorkflowBuilder) SubWorkflow(id, workflow string) StepRefSubWorkflow appends a sub-workflow step that spawns a child workflow execution. The child workflow must be registered in the workflow_defs KV bucket. By default the parent step blocks until the child completes; use WithDetach() on the returned StepRef to fire-and-forget.
func (*WorkflowBuilder) Task
func (b *WorkflowBuilder) Task(id, task string) StepRefTask appends a normal (non-looping) step and returns a StepRef for compile-time-safe dependency wiring and modifier chaining.
func (*WorkflowBuilder) Version
func (b *WorkflowBuilder) Version(v string) *WorkflowBuilderVersion overrides the default workflow version string.
func (*WorkflowBuilder) WaitForEvent
func (b *WorkflowBuilder) WaitForEvent(id string, opts WaitForEventOpts) StepRefWaitForEvent adds a step that waits for an external event to match a condition. No worker is involved — the engine handles event matching.
func (*WorkflowBuilder) WithConcurrency
func (b *WorkflowBuilder) WithConcurrency(maxRuns, maxSteps int) *WorkflowBuilderWithConcurrency sets workflow-level concurrency limits. MaxRuns bounds how many runs of this workflow execute in parallel; MaxSteps bounds how many steps execute concurrently within a single run.
func (*WorkflowBuilder) WithIdempotencyKey
func (b *WorkflowBuilder) WithIdempotencyKey(dotPath string) *WorkflowBuilderWithIdempotencyKey configures a dot-path expression evaluated against workflow input to produce a dedup key. Duplicate runs with the same key value return the existing run ID instead of creating a new one.
func (*WorkflowBuilder) WithMaxDuration
func (b *WorkflowBuilder) WithMaxDuration(d time.Duration) *WorkflowBuilderWithMaxDuration configures the wall-clock bound on the active AgentLoop step. Kept for backward compatibility — prefer StepRef.WithMaxDuration.
func (*WorkflowBuilder) WithMaxIterations
func (b *WorkflowBuilder) WithMaxIterations(n int) *WorkflowBuilderWithMaxIterations configures the iteration bound on the active AgentLoop step. Kept for backward compatibility — prefer StepRef.WithMaxIterations.
func (*WorkflowBuilder) WithPriority
func (b *WorkflowBuilder) WithPriority(cfg PriorityConfig) *WorkflowBuilderWithPriority configures run priority ordering.
func (*WorkflowBuilder) WithSingleton
func (b *WorkflowBuilder) WithSingleton(mode SingletonMode) *WorkflowBuilderWithSingleton configures global singleton constraint.
func (*WorkflowBuilder) WithSingletonKey
func (b *WorkflowBuilder) WithSingletonKey(mode SingletonMode, key string) *WorkflowBuilderWithSingletonKey configures per-entity singleton.
func (*WorkflowBuilder) WithSticky
func (b *WorkflowBuilder) WithSticky(s StickyStrategy) *WorkflowBuilderBuild assembles the WorkflowDef and delegates to Validate. Any structural error (cycle, missing dep, etc.) is surfaced here so callers get a clean error value rather than a panic at execution time. WithSticky configures worker affinity for workflow runs. Soft prefers the same worker; Hard requires it.
func (*WorkflowBuilder) WithTimeout
func (b *WorkflowBuilder) WithTimeout(d time.Duration) *WorkflowBuilderWithTimeout sets the per-attempt timeout on the active step. Kept for backward compatibility — prefer StepRef.WithTimeout for new code.
type WorkflowDef
WorkflowDef is the immutable schema for a workflow. Stored once, referenced by many runs. Version allows schema evolution without breaking existing runs.
type WorkflowDef struct {
Name string `json:"name"`
Version string `json:"version"`
Steps []StepDef `json:"steps"`
DefaultRetry *RetryPolicy `json:"default_retry,omitempty"`
Concurrency *ConcurrencyLimit `json:"concurrency,omitempty"`
Timeout time.Duration `json:"timeout,omitempty"`
InputSchema json.RawMessage `json:"input_schema,omitempty"`
OutputSchema json.RawMessage `json:"output_schema,omitempty"`
AuxSteps map[string]bool `json:"aux_steps,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Sticky StickyStrategy `json:"sticky,omitempty"`
Priority *PriorityConfig `json:"priority,omitempty"`
CancelOn []CancelOn `json:"cancel_on,omitempty"`
Singleton *SingletonConfig `json:"singleton,omitempty"`
}func EffectiveDef
func EffectiveDef(def WorkflowDef, run WorkflowRun) WorkflowDefEffectiveDef returns a WorkflowDef augmented with dynamic steps from the run. The original def is not mutated — a shallow copy is returned with the combined step list and rebuilt AuxSteps.
func WithSchemas
func WithSchemas[I, O any](def WorkflowDef) WorkflowDefWithSchemas generates JSON schemas from Go types I (input) and O (output) and attaches them to the WorkflowDef. Applied after Build(). Supports flat structs with primitive fields, slices, and maps.
type WorkflowRun
WorkflowRun holds live state for a single execution of a WorkflowDef. Steps maps step ID to its current StepState; initialized to pending for all steps. Input preserves the original user-supplied payload so retries can reuse it.
type WorkflowRun struct {
RunID string `json:"run_id"`
WorkflowID string `json:"workflow_id"`
Status RunStatus `json:"status"`
Steps map[string]StepState `json:"steps"`
Input json.RawMessage `json:"input,omitempty"`
CreatedAt time.Time `json:"created_at"`
DynamicSteps []StepDef `json:"dynamic_steps,omitempty"`
ParentRunID string `json:"parent_run_id,omitempty"`
ParentStepID string `json:"parent_step_id,omitempty"`
Deadline *time.Time `json:"deadline,omitempty"`
PriorityOffset int `json:"priority_offset,omitempty"`
SingletonKey string `json:"singleton_key,omitempty"`
}func NewWorkflowRun
func NewWorkflowRun(def WorkflowDef, runID string) WorkflowRunNewWorkflowRun constructs a WorkflowRun with all steps initialized to pending. runID must be non-empty — callers are responsible for providing a unique ID (e.g. nuid.Next()) before calling this constructor.
func (WorkflowRun) EffectiveTime
func (r WorkflowRun) EffectiveTime() time.TimeEffectiveTime returns the priority-adjusted queue position.
Generated by gomarkdoc