dag

import "github.com/danmestas/dagnats/dag"

Pure DAG logic with zero NATS dependencies. This package defines the workflow data model, validation rules, and state machine for advancing runs through their steps.

Key Types

| Type | Description |
| --- | --- |
| WorkflowDef | Complete workflow definition: name, version, steps, retry policy, concurrency limits, timeout, input/output schemas |
| StepDef | Individual step within a workflow: task type, dependencies, retry, timeout, conditional skip |
| WorkflowRun | Runtime snapshot of an executing workflow: status, per-step state, timestamps |
| StepState | Per-step execution state: status, attempts, iterations, output, error |
| RetryPolicy | Structured retry configuration: strategy, delays, max attempts |
| AgentLoopConfig | Configuration for agent-loop steps: max iterations, duration, delay |
| ConcurrencyLimit | Workflow-level parallelism: max parallel runs and steps |
| ParentCond | Conditional skip predicate evaluated against parent step output |

Key Functions

| Function | Description |
| --- | --- |
| Validate(def) | Validates a workflow definition: unique IDs, valid references, acyclic graph (Kahn’s algorithm) |
| Advance(run, event) | State machine: applies an event to a run snapshot and returns the next set of ready steps |
| ResolveRetryPolicy(def, step) | Resolves the effective retry policy for a step (step > workflow default > legacy) |
| ValidateSchema(schema, data) | Validates JSON data against a JSON Schema |
| ExtractDotPath(path, data) | Extracts a value from JSON data using dot-notation path |
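
To make the three checks attributed to Validate concrete (unique IDs, valid references, acyclic graph via Kahn's algorithm), here is a minimal, self-contained sketch. It does not use the dag package: the `step` type, `validateSteps`, and `errCycle` are hypothetical names invented for illustration, and the package's real implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical stand-in for a workflow step (not the package's StepDef):
// an ID plus the IDs of the steps it depends on.
type step struct {
	ID        string
	DependsOn []string
}

// errCycle is a hypothetical sentinel error for this sketch.
var errCycle = errors.New("dependency cycle detected")

// validateSteps checks unique IDs, valid references, and acyclicity.
// On success it returns the step IDs in one valid execution order.
func validateSteps(steps []step) ([]string, error) {
	// Pass 1: register every ID and reject duplicates.
	indegree := make(map[string]int, len(steps))
	for _, s := range steps {
		if _, dup := indegree[s.ID]; dup {
			return nil, fmt.Errorf("duplicate step ID %q", s.ID)
		}
		indegree[s.ID] = 0
	}
	// Pass 2: build dependency edges and reject unknown references.
	children := make(map[string][]string)
	for _, s := range steps {
		for _, dep := range s.DependsOn {
			if _, ok := indegree[dep]; !ok {
				return nil, fmt.Errorf("step %q depends on unknown step %q", s.ID, dep)
			}
			children[dep] = append(children[dep], s.ID)
			indegree[s.ID]++
		}
	}
	// Kahn's algorithm: repeatedly remove steps with no unmet dependencies.
	var queue, order []string
	for _, s := range steps {
		if indegree[s.ID] == 0 {
			queue = append(queue, s.ID)
		}
	}
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		order = append(order, id)
		for _, c := range children[id] {
			indegree[c]--
			if indegree[c] == 0 {
				queue = append(queue, c)
			}
		}
	}
	// Any step never removed sits on a cycle.
	if len(order) != len(steps) {
		return nil, errCycle
	}
	return order, nil
}

func main() {
	order, err := validateSteps([]step{
		{ID: "fetch"},
		{ID: "process", DependsOn: []string{"fetch"}},
	})
	fmt.Println(order, err) // [fetch process] <nil>
}
```

A side effect of Kahn's algorithm is that a successful pass also yields a topological order, which is why the sketch returns it alongside the error.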

Step Types

The StepType enum controls execution semantics:

| Type | Constant | Description |
| --- | --- | --- |
| normal | StepTypeNormal | Runs once, completes or fails |
| agent_loop | StepTypeAgentLoop | Iterates until termination signal |
| sub_workflow | StepTypeSubWorkflow | Delegates to a nested workflow |
| agent | StepTypeAgent | Single autonomous agent execution |
| map | StepTypeMap | Fan-out over input collection |
| sleep | StepTypeSleep | Pause for a duration |
| wait_for_event | StepTypeWaitForEvent | Wait for an external signal |
| approval | StepTypeApproval | Human-in-the-loop gate |
| planner | StepTypePlanner | Dynamic step generation |
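
As an illustration of how the type strings and constants in the table relate, the sketch below assumes a string-backed enum. It redeclares the constants locally instead of importing the dag package. The underlying type is an assumption, and `parseStepType` is a hypothetical helper that the package does not necessarily provide.

```go
package main

import "fmt"

// StepType is assumed to be string-backed for this sketch; the real
// definition in the dag package may differ.
type StepType string

// Values mirror the Type column of the table above.
const (
	StepTypeNormal       StepType = "normal"
	StepTypeAgentLoop    StepType = "agent_loop"
	StepTypeSubWorkflow  StepType = "sub_workflow"
	StepTypeAgent        StepType = "agent"
	StepTypeMap          StepType = "map"
	StepTypeSleep        StepType = "sleep"
	StepTypeWaitForEvent StepType = "wait_for_event"
	StepTypeApproval     StepType = "approval"
	StepTypePlanner      StepType = "planner"
)

// parseStepType is a hypothetical helper: it converts a raw string (for
// example, one read from a config file) and rejects unknown values.
func parseStepType(s string) (StepType, error) {
	switch t := StepType(s); t {
	case StepTypeNormal, StepTypeAgentLoop, StepTypeSubWorkflow,
		StepTypeAgent, StepTypeMap, StepTypeSleep,
		StepTypeWaitForEvent, StepTypeApproval, StepTypePlanner:
		return t, nil
	}
	return "", fmt.Errorf("unknown step type %q", s)
}

func main() {
	for _, raw := range []string{"map", "wait_for_event", "cron"} {
		t, err := parseStepType(raw)
		fmt.Printf("%q -> %q, err=%v\n", raw, t, err)
	}
}
```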

Run Status

| Status | Terminal | Description |
| --- | --- | --- |
| pending | No | Created, not yet started |
| running | No | At least one step is executing |
| completed | Yes | All steps completed successfully |
| failed | Yes | One or more steps failed |
| cancelled | Yes | Cancelled by user or system |
| compensate_failed | Yes | Saga compensation failed |
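
The Terminal column can be read as a predicate: once a run reaches a terminal status, no further events should advance it. A minimal sketch of that check follows. `RunStatus`, the `Status*` constants, and `IsTerminal` are names assumed for illustration; only the status strings come from the table.

```go
package main

import "fmt"

// RunStatus is a hypothetical string-backed type for this sketch; only the
// status strings themselves are taken from the table above.
type RunStatus string

const (
	StatusPending          RunStatus = "pending"
	StatusRunning          RunStatus = "running"
	StatusCompleted        RunStatus = "completed"
	StatusFailed           RunStatus = "failed"
	StatusCancelled        RunStatus = "cancelled"
	StatusCompensateFailed RunStatus = "compensate_failed"
)

// IsTerminal reports whether the status is marked "Yes" in the Terminal column.
func (s RunStatus) IsTerminal() bool {
	switch s {
	case StatusCompleted, StatusFailed, StatusCancelled, StatusCompensateFailed:
		return true
	}
	return false
}

func main() {
	for _, s := range []RunStatus{StatusPending, StatusRunning, StatusCompleted, StatusCompensateFailed} {
		fmt.Printf("%s terminal=%v\n", s, s.IsTerminal())
	}
}
```

A caller polling a run could use such a predicate as its loop exit condition, stopping as soon as the status is terminal.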

Usage

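package main

import (
    "log"
    "time"

    "github.com/danmestas/dagnats/dag"
)

func main() {
    def := dag.WorkflowDef{ // two-step pipeline: "process" depends on "fetch"
        Name:    "pipeline",
        Version: "1.0",
        Steps: []dag.StepDef{
            {ID: "fetch", Task: "fetch", Timeout: 2 * time.Minute, Type: dag.StepTypeNormal},
            {ID: "process", Task: "process", Timeout: 5 * time.Minute, Type: dag.StepTypeNormal, DependsOn: []string{"fetch"}},
        },
    }
    if err := dag.Validate(def); err != nil { // rejects duplicate IDs, bad references, cycles
        log.Fatal(err)
    }
}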