Events and Event Sourcing

DagNats uses event sourcing as its persistence model – the immutable event log is the source of truth, not the KV snapshots.

The WORKFLOW_HISTORY Stream

Every state change in a workflow run is recorded as an event on the WORKFLOW_HISTORY JetStream stream. Events are published to subjects matching history.{run_id} and are retained indefinitely. The stream applies a 5-second deduplication window: a message whose Nats-Msg-Id header repeats one already seen within the last 5 seconds is discarded.

This stream is append-only. Events are never modified or deleted. The full history of any run can be reconstructed by replaying its events from the stream.
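
The append-only and deduplication behaviour can be illustrated with a small in-memory model. This is a sketch only, not NATS client code: HistoryLog, Entry, Append, and Replay are hypothetical names, the message ID format is invented for illustration, and time is passed as an offset so the example stays deterministic.

```go
package main

import "time"

// Entry is one stored event in the in-memory model.
type Entry struct {
	Subject string
	MsgID   string
	Data    []byte
}

// HistoryLog is a hypothetical stand-in for the WORKFLOW_HISTORY stream.
type HistoryLog struct {
	window  time.Duration            // deduplication window
	entries []Entry                  // append-only: never modified or deleted
	seen    map[string]time.Duration // msgID -> offset at which it was accepted
}

// NewHistoryLog creates an empty log with the given deduplication window.
func NewHistoryLog(window time.Duration) *HistoryLog {
	return &HistoryLog{window: window, seen: make(map[string]time.Duration)}
}

// Append stores an event on history.{runID} unless msgID was already accepted
// less than one window ago. at is the publish time as an offset from an
// arbitrary start. It returns the 1-based position of the stored event, or
// (0, true) when the message was dropped as a duplicate.
func (l *HistoryLog) Append(runID, msgID string, data []byte, at time.Duration) (seq int, duplicate bool) {
	if first, ok := l.seen[msgID]; ok && at-first < l.window {
		return 0, true
	}
	l.seen[msgID] = at
	l.entries = append(l.entries, Entry{Subject: "history." + runID, MsgID: msgID, Data: data})
	return len(l.entries), false
}

// Replay returns the events recorded for one run, in publish order.
func (l *HistoryLog) Replay(runID string) []Entry {
	var out []Entry
	for _, e := range l.entries {
		if e.Subject == "history."+runID {
			out = append(out, e)
		}
	}
	return out
}
```

In this model, a second Append with the same ID at 3*time.Second is dropped, while the same ID at 6*time.Second falls outside the 5-second window and is stored again.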

Event Types

Events are categorized by lifecycle scope:

Workflow lifecycle

| Event | When published |
| --- | --- |
| workflow.started | Run begins execution |
| workflow.completed | All non-auxiliary steps succeed |
| workflow.failed | A step fails permanently |
| workflow.cancelled | Run is cancelled |
| workflow.spawn | Sub-workflow child is created |
| workflow.child.completed | Child sub-workflow finishes successfully |
| workflow.child.failed | Child sub-workflow fails |

Step lifecycle

| Event | When published |
| --- | --- |
| step.completed | Worker calls Complete() |
| step.failed | Worker calls Fail() / FailPermanent() / FailRetryAfter() |
| step.cancelled | Step cancelled during run cancellation |
| step.continue | Worker calls Continue() (agent loop iteration) |

Agent loop

| Event | When published |
| --- | --- |
| agent.loop.iteration | Engine processes a Continue event |

Sleep and wait

| Event | When published |
| --- | --- |
| step.sleep.started | Sleep step timer begins |
| step.sleep.completed | Sleep duration elapses |
| step.wait.started | WaitForEvent step begins watching |
| step.wait.matched | External event matches the condition |
| step.wait.timeout | Wait timeout expires without a match |

Map steps

| Event | When published |
| --- | --- |
| step.map.started | Map step fans out to individual tasks |
| step.map.completed | All map instances finish (or one fails) |
| step.map.instance.completed | One map item finishes |

Compensation (saga)

| Event | When published |
| --- | --- |
| compensate.started | Saga rollback begins |
| compensate.step.completed | One compensation step finishes |
| compensate.failed | Compensation itself fails |
| compensate.completed | All compensation steps succeed |
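
Because the event names share prefixes, a consumer can recover the lifecycle scope from the event type alone. The following is a minimal sketch of that idea; Scope and the scope labels it returns are hypothetical names chosen for illustration, and only the event type strings come from the groupings listed above.

```go
package main

import "strings"

// Scope maps an event type to its lifecycle scope. More specific prefixes
// (step.sleep., step.wait., step.map.) are checked before the generic
// "step." prefix so that they are not swallowed by it.
func Scope(eventType string) string {
	switch {
	case strings.HasPrefix(eventType, "workflow."):
		return "workflow"
	case strings.HasPrefix(eventType, "step.sleep."),
		strings.HasPrefix(eventType, "step.wait."):
		return "sleep-and-wait"
	case strings.HasPrefix(eventType, "step.map."):
		return "map"
	case strings.HasPrefix(eventType, "agent.loop."):
		return "agent-loop"
	case strings.HasPrefix(eventType, "compensate."):
		return "compensation"
	case strings.HasPrefix(eventType, "step."):
		return "step"
	default:
		return "unknown"
	}
}
```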

How Events Drive DAG Resolution

The engine’s core function is dag.Advance(def, run, event) []Action – a pure function that takes an immutable workflow definition, the current run state, and a new event, then returns a list of actions to execute (enqueue tasks, complete the workflow, fail the workflow, etc.).

This function contains no I/O. It calculates which steps have all dependencies satisfied and produces the next set of actions. The engine loop is:

  1. Consume event from WORKFLOW_HISTORY
  2. Load run snapshot from KV
  3. Call Advance() to get actions
  4. Execute actions (publish tasks, update KV, publish new events)
  5. Repeat

Because Advance() is pure, the engine is stateless. On restart, it replays the event stream to reconstruct the current state of all active runs.
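
To make the idea of a pure resolution function concrete, here is a heavily simplified sketch. It is not the DagNats implementation: the Definition, Run, Event, and Action types and the action kind strings are assumptions made for illustration, and the sketch treats every step.failed as permanent and ignores retries, maps, sleeps, and compensation.

```go
package main

import "sort"

// Definition is a hypothetical workflow definition: step name -> dependencies.
type Definition struct {
	Deps map[string][]string
}

// Run is a hypothetical run snapshot.
type Run struct {
	Completed map[string]bool // steps that have finished successfully
	Enqueued  map[string]bool // steps already handed to workers
}

// Event is a simplified history event.
type Event struct {
	Type string
	Step string
}

// Action is an instruction for the engine to execute after Advance returns.
type Action struct {
	Kind string // "enqueue", "complete_workflow", or "fail_workflow"
	Step string
}

// Advance computes the next actions without performing I/O and without
// mutating its inputs.
func Advance(def Definition, run Run, ev Event) []Action {
	if ev.Type == "step.failed" {
		return []Action{{Kind: "fail_workflow", Step: ev.Step}}
	}
	if ev.Type != "workflow.started" && ev.Type != "step.completed" {
		return nil
	}
	// Work on a copy so the caller's snapshot is left untouched.
	done := make(map[string]bool, len(run.Completed)+1)
	for s, ok := range run.Completed {
		if ok {
			done[s] = true
		}
	}
	if ev.Type == "step.completed" {
		done[ev.Step] = true
	}
	allDone := true
	var actions []Action
	for step, deps := range def.Deps {
		if done[step] {
			continue
		}
		allDone = false
		if run.Enqueued[step] {
			continue
		}
		ready := true
		for _, d := range deps {
			if !done[d] {
				ready = false
				break
			}
		}
		if ready {
			actions = append(actions, Action{Kind: "enqueue", Step: step})
		}
	}
	if allDone {
		return []Action{{Kind: "complete_workflow"}}
	}
	// Map iteration order is random; sort for deterministic output.
	sort.Slice(actions, func(i, j int) bool { return actions[i].Step < actions[j].Step })
	return actions
}
```

For a diamond-shaped definition (a, then b and c, then d), a step.completed event for a yields enqueue actions for b and c, and d is only enqueued once both have completed. Since the function touches nothing outside its arguments, it can be unit-tested without a NATS server.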

Replay Semantics

Any run’s complete state can be rebuilt by replaying its events from the WORKFLOW_HISTORY stream. This provides:

  • Crash recovery – the engine restarts and replays, no data lost
  • Debugging – replay a run’s history to understand exactly what happened
  • Auditing – every state transition is permanently recorded with timestamps

Deduplication via Nats-Msg-Id ensures that events republished within the deduplication window (for example, by worker retries or engine restarts) do not create duplicate state transitions.
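
Rebuilding state by replay amounts to folding the ordered events of a run into a state value. The sketch below shows that fold under simplifying assumptions: RunState, its Status strings, and the Replay and apply functions are hypothetical, and only a handful of the event types are handled.

```go
package main

// Event is a simplified history event.
type Event struct {
	Type string
	Step string
}

// RunState is a hypothetical, reduced view of a run.
type RunState struct {
	Status    string          // "pending", "running", "completed", "failed", "cancelled"
	Completed map[string]bool // steps that have finished successfully
}

// apply folds one event into the state. Applying the same step.completed
// event twice leaves the state unchanged.
func apply(s *RunState, ev Event) {
	switch ev.Type {
	case "workflow.started":
		s.Status = "running"
	case "step.completed":
		s.Completed[ev.Step] = true
	case "workflow.completed":
		s.Status = "completed"
	case "workflow.failed":
		s.Status = "failed"
	case "workflow.cancelled":
		s.Status = "cancelled"
	}
}

// Replay rebuilds a run's state from its events, applied in stream order.
func Replay(events []Event) RunState {
	state := RunState{Status: "pending", Completed: make(map[string]bool)}
	for _, ev := range events {
		apply(&state, ev)
	}
	return state
}
```

Replaying a prefix of the history gives the state as of that point, which is what makes the same mechanism usable for debugging as well as recovery.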

Events vs KV Snapshots

DagNats maintains both an event stream and KV snapshots. They serve different purposes:

| Concern | Event stream | KV snapshot |
| --- | --- | --- |
| Authority | Source of truth | Recovery convenience |
| Mutability | Append-only, immutable | Overwritten on each update |
| Use case | Audit, replay, debugging | Fast current-state lookup |
| Optimistic locking | N/A | KV Revision for CAS |

The KV snapshot in workflow_runs stores the latest WorkflowRun state for fast reads. The engine updates it after processing each event. If the KV snapshot is lost or corrupted, it can be rebuilt entirely from the event stream.
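
The revision-based compare-and-set mentioned in the table can be illustrated with an in-memory model. This is a sketch, not the NATS KV client: SnapshotStore, Update, Get, and ErrRevisionMismatch are hypothetical names, and the model keeps a per-key revision counter, which is a simplification.

```go
package main

import "errors"

// ErrRevisionMismatch is returned when a writer's expected revision is stale.
var ErrRevisionMismatch = errors.New("revision mismatch")

type record struct {
	value    string
	revision uint64
}

// SnapshotStore is a hypothetical in-memory stand-in for the workflow_runs bucket.
type SnapshotStore struct {
	records map[string]record
}

// NewSnapshotStore creates an empty store.
func NewSnapshotStore() *SnapshotStore {
	return &SnapshotStore{records: make(map[string]record)}
}

// Get returns the latest value and revision for key.
func (s *SnapshotStore) Get(key string) (value string, revision uint64, ok bool) {
	r, ok := s.records[key]
	return r.value, r.revision, ok
}

// Update overwrites key only if its current revision equals expected.
// An expected revision of 0 means the key must not exist yet.
func (s *SnapshotStore) Update(key, value string, expected uint64) (uint64, error) {
	current := s.records[key].revision // 0 when the key is absent
	if current != expected {
		return current, ErrRevisionMismatch
	}
	next := current + 1
	s.records[key] = record{value: value, revision: next}
	return next, nil
}
```

A writer that read revision 1 and then calls Update with expected revision 1 succeeds; a second writer still holding revision 1 gets ErrRevisionMismatch and has to reload the snapshot before retrying.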

Related pages

  • Runs – the state that events modify
  • Workers – the source of step completion events
  • Workflows and DAGs – the definition that Advance() evaluates against