Skip to content

protocol

import "github.com/danmestas/dagnats/protocol"

Wire types shared between the engine, workers, and API. This package defines the event schema, task payload format, and resolution types that flow through NATS subjects.

Key Types

TypeDescription
EventEnvelope for all workflow and step lifecycle events
EventTypeString enum for event types (workflow.started, step.completed, etc.)
TaskPayloadMessage body published to task subjects when the engine dispatches a step
TaskResolutionResolution sent by HTTP workers via the bridge resolve endpoint
FailureTypeCategorizes failures: permanent, transient, rate_limited

Event Types

ConstantValueScope
EventWorkflowStartedworkflow.startedWorkflow
EventWorkflowCompletedworkflow.completedWorkflow
EventWorkflowFailedworkflow.failedWorkflow
EventWorkflowCancelledworkflow.cancelledWorkflow
EventStepStartedstep.startedStep
EventStepCompletedstep.completedStep
EventStepFailedstep.failedStep
EventStepContinuestep.continueStep (agent loop)
EventApprovalGrantedapproval.grantedStep (approval)
EventApprovalRejectedapproval.rejectedStep (approval)

Event Structure

type Event struct {
    Type        EventType       `json:"type"`
    RunID       string          `json:"run_id"`
    StepID      string          `json:"step_id,omitempty"`
    Timestamp   time.Time       `json:"timestamp"`
    Payload     json.RawMessage `json:"payload,omitempty"`
    TraceParent string          `json:"trace_parent,omitempty"`
}

Constructor Functions

FunctionDescription
NewWorkflowEvent(typ, runID, payload)Creates a workflow-scoped event
NewStepEvent(typ, runID, stepID, payload)Creates a step-scoped event

Both constructors set the timestamp and provide methods for NATS subject routing (NATSSubject()) and deduplication IDs (NATSMsgID()).

TaskPayload Structure

type TaskPayload struct {
    TaskID    string          `json:"task_id"`
    RunID     string          `json:"run_id"`
    StepID    string          `json:"step_id"`
    Iteration int             `json:"iteration,omitempty"`
    Attempt   int             `json:"attempt,omitempty"`
    Input     json.RawMessage `json:"input,omitempty"`
}

TaskResolution Structure

Used by HTTP workers to resolve tasks via the bridge:

type TaskResolution struct {
    Action     string          `json:"action"`
    Output     json.RawMessage `json:"output,omitempty"`
    Error      string          `json:"error,omitempty"`
    DurationMs int             `json:"duration_ms,omitempty"`
    Checkpoint json.RawMessage `json:"checkpoint,omitempty"`
    Data       json.RawMessage `json:"data,omitempty"`
}

Actions: complete, fail, pause, checkpoint.