Wire Protocol

DagNats supports two transport modes for workers: NATS (native) and HTTP (via bridge). Both use the same JSON schemas for task payloads and resolutions, ensuring consistent semantics across languages and runtimes.

NATS Transport

Workers connect directly to NATS JetStream and subscribe to task subjects.

Task Subjects

Task subjects follow the pattern task.{type}.{runID}:

| Subject | Matches |
| --- | --- |
| task.llm.* | All LLM tasks |
| task.http.* | All HTTP tasks |
| task.llm.run-abc | LLM tasks for run-abc only |

Workers create durable pull consumers or ephemeral subscriptions with manual ACK.
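
As a rough illustration of the subject pattern above, the following sketch builds a task subject and checks it against a filter. The helper names TaskSubject and MatchesSubject are hypothetical and not part of DagNats; the matcher only models the NATS single-token wildcard (*) and assumes run IDs contain no dots.

```go
package main

import (
	"fmt"
	"strings"
)

// TaskSubject builds a subject of the form task.{type}.{runID}.
// Hypothetical helper for illustration only.
func TaskSubject(taskType, runID string) string {
	return "task." + taskType + "." + runID
}

// MatchesSubject reports whether subject matches pattern, where "*" in the
// pattern stands for exactly one dot-separated token. The multi-token
// wildcard ">" is deliberately not handled in this sketch.
func MatchesSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	if len(p) != len(s) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != s[i] {
			return false
		}
	}
	return true
}

func main() {
	subj := TaskSubject("llm", "run-abc")
	fmt.Println(subj)
	fmt.Println(MatchesSubject("task.llm.*", subj))  // filter for all LLM tasks
	fmt.Println(MatchesSubject("task.http.*", subj)) // filter for all HTTP tasks
}
```

In a real worker the NATS server performs the matching; the local matcher is only there to make the table's semantics concrete.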

TaskPayload Schema

Published to task subjects when the engine dispatches a step:

{
  "task_id": "run-1.step-a",
  "run_id": "run-1",
  "step_id": "step-a",
  "iteration": 0,
  "attempt": 1,
  "input": {"key": "value"}
}

Canonical Go type: protocol.TaskPayload in protocol/protocol.go.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| task_id | string | Yes | Unique task identifier: {run_id}.{step_id} |
| run_id | string | Yes | Workflow run identifier |
| step_id | string | Yes | Step identifier from workflow DAG |
| iteration | int | No | Agent-loop iteration (0 for first execution) |
| attempt | int | No | Retry attempt number (1-based) |
| input | JSON | No | Step input data as raw JSON |
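
For workers that cannot import the canonical Go package, a struct along these lines could decode the payload. This is a sketch that mirrors the documented fields, not a copy of protocol.TaskPayload; the DecodeTaskPayload helper and its required-field check are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskPayload mirrors the documented JSON fields. It is an illustrative
// stand-in, not the canonical protocol.TaskPayload type.
type TaskPayload struct {
	TaskID    string          `json:"task_id"`
	RunID     string          `json:"run_id"`
	StepID    string          `json:"step_id"`
	Iteration int             `json:"iteration,omitempty"`
	Attempt   int             `json:"attempt,omitempty"`
	Input     json.RawMessage `json:"input,omitempty"` // kept as raw JSON
}

// DecodeTaskPayload parses a payload and rejects it if any field marked
// "Required: Yes" in the table is missing (hypothetical helper).
func DecodeTaskPayload(data []byte) (TaskPayload, error) {
	var p TaskPayload
	if err := json.Unmarshal(data, &p); err != nil {
		return p, err
	}
	if p.TaskID == "" || p.RunID == "" || p.StepID == "" {
		return p, fmt.Errorf("task payload is missing a required field")
	}
	return p, nil
}

func main() {
	raw := []byte(`{"task_id":"run-1.step-a","run_id":"run-1","step_id":"step-a","iteration":0,"attempt":1,"input":{"key":"value"}}`)
	p, err := DecodeTaskPayload(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(p.TaskID, p.Attempt, string(p.Input))
}
```

Keeping input as json.RawMessage defers parsing to the step handler, which matches the table's description of the field as raw JSON.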

Task Resolution

Workers publish lifecycle events back to the history stream:

| Action | Event Type | Subject | Description |
| --- | --- | --- | --- |
| Complete | step.completed | history.{runID} | Task finished successfully with output payload |
| Fail | step.failed | history.{runID} | Task failed with error payload |
| Continue | step.continue | history.{runID} | Agent loop requesting next iteration |

Use protocol.NewStepEvent() to construct events with correct subject and dedup ID.
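
For SDKs in other languages, the subject and dedup ID can be derived from the formats documented on this page (history.{runID} and {run_id}.{step_id}.{event_type}). The sketch below is an assumption about what a port would compute; it does not reproduce the signature or behavior of protocol.NewStepEvent(), and the helper names are hypothetical.

```go
package main

import "fmt"

// Event types listed in the Task Resolution table.
const (
	EventCompleted = "step.completed"
	EventFailed    = "step.failed"
	EventContinue  = "step.continue"
)

// HistorySubject returns the history stream subject for a run.
func HistorySubject(runID string) string {
	return "history." + runID
}

// EventDedupID follows the documented event dedup format
// {run_id}.{step_id}.{event_type}. A NATS publisher would typically send
// this value in the Nats-Msg-Id header.
func EventDedupID(runID, stepID, eventType string) string {
	return runID + "." + stepID + "." + eventType
}

func main() {
	fmt.Println(HistorySubject("run-1"))
	fmt.Println(EventDedupID("run-1", "step-a", EventCompleted))
}
```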

Heartbeat

Workers register in the workers KV bucket on startup via worker.Directory. The bucket has a 60s TTL, so workers must re-PUT their registration every 30s to remain visible. Deregistration happens automatically on TTL expiry or explicit DELETE.
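
The refresh loop described above might look like the following sketch. KeepRegistered and RunFor are hypothetical names, and the put callback stands in for whatever writes the registration to the workers KV bucket; the actual worker.Directory API is not shown here.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// KeepRegistered calls put once immediately and then on every tick of
// interval until ctx is cancelled. It returns the number of successful puts.
// With the documented 60s TTL, a production caller would pass 30*time.Second.
func KeepRegistered(ctx context.Context, interval time.Duration, put func() error) int {
	ok := 0
	try := func() {
		if err := put(); err != nil {
			fmt.Println("registration PUT failed:", err)
			return
		}
		ok++
	}
	try() // register on startup
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ok
		case <-ticker.C:
			try() // refresh before the TTL expires
		}
	}
}

// RunFor is a demo helper that runs the loop for a fixed time with a put
// that always succeeds.
func RunFor(total, interval time.Duration) int {
	ctx, cancel := context.WithTimeout(context.Background(), total)
	defer cancel()
	return KeepRegistered(ctx, interval, func() error { return nil })
}

func main() {
	// Short durations so the demo finishes quickly.
	fmt.Println("successful puts:", RunFor(100*time.Millisecond, 20*time.Millisecond))
}
```

Refreshing at half the TTL leaves room for one missed PUT before the entry expires.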


HTTP Transport

The bridge exposes three endpoints for HTTP workers. All requests require Bearer token authentication via Authorization: Bearer {token} header. The token is configured via the DAGNATS_BRIDGE_TOKEN environment variable.

POST /v1/workers/connect

Registers a worker and maintains a Server-Sent Events (SSE) heartbeat stream. The bridge sends periodic heartbeat events to keep the connection alive and refreshes the worker’s KV TTL.

Request:

{
  "worker_id": "worker-123",
  "task_types": ["llm", "http"],
  "max_tasks": 2
}

Response: SSE stream with heartbeat events every 25 seconds.

Behavior:

  • Worker is registered in the workers KV bucket
  • Heartbeat events are sent every 25s to maintain connection
  • Worker is deregistered on disconnect
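
A client for this endpoint could be sketched as below. The event name and data layout of the heartbeat are not specified on this page, so the parser simply reports whatever event: and data: fields arrive; OpenHeartbeatStream, ReadSSEEvents, and EventNames are hypothetical helpers, and the Accept header is an assumption.

```go
package main

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// OpenHeartbeatStream POSTs the registration body and returns the SSE stream.
// Cancelling ctx or closing the body disconnects the worker.
func OpenHeartbeatStream(ctx context.Context, baseURL, token string, body []byte) (io.ReadCloser, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/v1/workers/connect", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "text/event-stream") // assumption
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("connect: unexpected status %s", resp.Status)
	}
	return resp.Body, nil
}

// ReadSSEEvents is a simplified SSE parser: it calls onEvent at each blank
// line if an event name or data was seen, and ignores comment lines.
func ReadSSEEvents(r io.Reader, onEvent func(name, data string)) error {
	sc := bufio.NewScanner(r)
	name := ""
	var data []string
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			if name != "" || len(data) > 0 {
				onEvent(name, strings.Join(data, "\n"))
			}
			name, data = "", nil
		case strings.HasPrefix(line, ":"):
			// comment line, often used as a keep-alive
		default:
			field, value, _ := strings.Cut(line, ":")
			value = strings.TrimPrefix(value, " ")
			switch field {
			case "event":
				name = value
			case "data":
				data = append(data, value)
			}
		}
	}
	return sc.Err()
}

// EventNames collects the event names found in a raw SSE string.
func EventNames(raw string) []string {
	var names []string
	_ = ReadSSEEvents(strings.NewReader(raw), func(name, _ string) {
		names = append(names, name)
	})
	return names
}

func main() {
	// Parse a canned stream; a worker would pass the body returned by
	// OpenHeartbeatStream to ReadSSEEvents instead.
	fmt.Println(EventNames("event: heartbeat\ndata: {}\n\nevent: heartbeat\ndata: {}\n\n"))
}
```

Because the bridge deregisters the worker on disconnect, a worker would normally keep ReadSSEEvents running for its whole lifetime and reconnect if it returns.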

POST /v1/tasks/poll

Long-polls for available tasks from the TASK_QUEUES stream.

Request:

{
  "task_types": ["llm"],
  "max_tasks": 1,
  "timeout_ms": 30000
}

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| task_types | []string | Yes | Task types to poll for |
| max_tasks | int | Yes | Maximum tasks to return |
| timeout_ms | int | Yes | Long-poll timeout in milliseconds (max 60000) |

Response:

[
  {
    "task_id": "run-1.step-a",
    "run_id": "run-1",
    "step_id": "step-a",
    "iteration": 0,
    "attempt": 1,
    "input": {"prompt": "hello"}
  }
]

Returns an empty array [] on timeout. Each fetched message is stored in an in-memory ack map keyed by task_id.
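
One way a Go HTTP worker might call this endpoint is sketched below. PollRequest, PolledTask, NewPollRequest, DecodeTasks, and PollTasks are hypothetical names; clamping the timeout client-side and the 5-second margin on the HTTP client timeout are illustrative choices, not bridge requirements.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

const MaxPollTimeoutMs = 60000 // documented maximum for timeout_ms

type PollRequest struct {
	TaskTypes []string `json:"task_types"`
	MaxTasks  int      `json:"max_tasks"`
	TimeoutMs int      `json:"timeout_ms"`
}

// PolledTask decodes a subset of the TaskPayload fields.
type PolledTask struct {
	TaskID string          `json:"task_id"`
	RunID  string          `json:"run_id"`
	StepID string          `json:"step_id"`
	Input  json.RawMessage `json:"input,omitempty"`
}

// NewPollRequest clamps timeoutMs to the documented maximum.
func NewPollRequest(types []string, maxTasks, timeoutMs int) PollRequest {
	if timeoutMs > MaxPollTimeoutMs {
		timeoutMs = MaxPollTimeoutMs
	}
	return PollRequest{TaskTypes: types, MaxTasks: maxTasks, TimeoutMs: timeoutMs}
}

// JSON returns the request body as a string, or "" if encoding fails.
func (r PollRequest) JSON() string {
	b, err := json.Marshal(r)
	if err != nil {
		return ""
	}
	return string(b)
}

// DecodeTasks parses the poll response; "[]" yields an empty slice.
func DecodeTasks(data []byte) ([]PolledTask, error) {
	var tasks []PolledTask
	err := json.Unmarshal(data, &tasks)
	return tasks, err
}

// PollTasks performs one long-poll against the bridge.
func PollTasks(ctx context.Context, baseURL, token string, pr PollRequest) ([]PolledTask, error) {
	// Allow the HTTP client slightly longer than the long-poll window.
	client := &http.Client{Timeout: time.Duration(pr.TimeoutMs)*time.Millisecond + 5*time.Second}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/v1/tasks/poll", bytes.NewReader([]byte(pr.JSON())))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("poll: unexpected status %s", resp.Status)
	}
	var tasks []PolledTask
	if err := json.NewDecoder(resp.Body).Decode(&tasks); err != nil {
		return nil, err
	}
	return tasks, nil
}

func main() {
	fmt.Println(NewPollRequest([]string{"llm"}, 1, 30000).JSON())
}
```

An empty result is not an error; a worker would simply poll again.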

POST /v1/tasks/{id}/resolve

Resolves a polled task by completing, failing, pausing, or checkpointing it.

Request (every field is shown for reference; each action reads only the fields listed under Actions):

{
  "action": "complete",
  "output": {"result": "ok"},
  "error": "error message",
  "name": "pause name",
  "duration_ms": 5000,
  "checkpoint": {"state": "..."},
  "data": {"incremental": "..."}
}

Actions:

| Action | Fields Used | Behavior |
| --- | --- | --- |
| complete | output | Publishes step.completed event, ACKs message, removes from ack map |
| fail | error | Publishes step.failed event, ACKs message, removes from ack map |
| pause | duration_ms, checkpoint | Writes checkpoint to KV, NAKs with delay, removes from ack map |
| checkpoint | data | Writes incremental checkpoint to KV, extends ack deadline (InProgress) |

Response: 200 OK on success, 404 Not Found if the task ID is not in the ack map.
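
Since each action reads only a few of the request fields, a client can build one body per action and omit the rest. The sketch below assumes the bridge accepts bodies with unused fields left out; the constructor names are hypothetical, and the pause limit comes from the Implementation Limits table.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

const MaxPauseMs = 3600000 // documented maximum pause duration (1 hour)

// ResolveRequest mirrors the documented request fields. Name appears in the
// sample request but not in the Actions table, so no constructor sets it.
type ResolveRequest struct {
	Action     string          `json:"action"`
	Output     json.RawMessage `json:"output,omitempty"`
	Error      string          `json:"error,omitempty"`
	Name       string          `json:"name,omitempty"`
	DurationMs int64           `json:"duration_ms,omitempty"`
	Checkpoint json.RawMessage `json:"checkpoint,omitempty"`
	Data       json.RawMessage `json:"data,omitempty"`
}

func Complete(output json.RawMessage) ResolveRequest {
	return ResolveRequest{Action: "complete", Output: output}
}

func Fail(message string) ResolveRequest {
	return ResolveRequest{Action: "fail", Error: message}
}

// Pause rejects durations above the documented limit before any request is sent.
func Pause(durationMs int64, checkpoint json.RawMessage) (ResolveRequest, error) {
	if durationMs > MaxPauseMs {
		return ResolveRequest{}, fmt.Errorf("pause of %d ms exceeds limit of %d ms", durationMs, MaxPauseMs)
	}
	return ResolveRequest{Action: "pause", DurationMs: durationMs, Checkpoint: checkpoint}, nil
}

func Checkpoint(data json.RawMessage) ResolveRequest {
	return ResolveRequest{Action: "checkpoint", Data: data}
}

// JSON returns the request body as a string, or "" if encoding fails.
func (r ResolveRequest) JSON() string {
	b, err := json.Marshal(r)
	if err != nil {
		return ""
	}
	return string(b)
}

// ResolvePath returns the endpoint path for a task ID.
func ResolvePath(taskID string) string {
	return "/v1/tasks/" + url.PathEscape(taskID) + "/resolve"
}

func main() {
	fmt.Println(ResolvePath("run-1.step-a"))
	fmt.Println(Complete(json.RawMessage(`{"result":"ok"}`)).JSON())
	fmt.Println(Fail("upstream timeout").JSON())
}
```

A 404 response means the bridge no longer holds the message for that task ID, so a client should not retry the same resolve call.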


WorkerRegistration Schema

Workers register their presence in the workers KV bucket:

{
  "worker_id": "worker-123",
  "task_types": ["llm", "http"],
  "language": "python",
  "transport": "bridge",
  "max_tasks": 2,
  "metadata": {"version": "1.0.0"}
}

Canonical Go type: worker.WorkerRegistration in worker/directory.go.


Task Lifecycle

  1. Connect (HTTP only): Worker registers via /v1/workers/connect and maintains SSE heartbeat
  2. Poll: Worker polls for tasks via NATS subscription or /v1/tasks/poll
  3. Execute: Worker processes task using input from TaskPayload
  4. Resolve: Worker reports the outcome by publishing an event (NATS) or calling /v1/tasks/{id}/resolve (HTTP)

Pause and Checkpoint Semantics

Pause suspends task execution for a fixed duration. The worker writes checkpoint state to KV and NAKs the message with delay. After the delay expires, the task is redelivered to the same or another worker with the checkpoint data available in KV.

Checkpoint saves incremental state without suspending execution. The worker writes data to KV and calls InProgress() to extend the ack deadline. The task remains in-flight and the worker continues execution.

Both mechanisms use the checkpoints KV bucket with keys formatted as {run_id}.{step_id}.
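
The difference between the two mechanisms can be sketched against small local interfaces. AckMsg and CheckpointStore are hypothetical abstractions written for this example (the method names NakWithDelay and InProgress follow the nats.go JetStream message API), and the recorder type is a test double, not DagNats code.

```go
package main

import (
	"fmt"
	"time"
)

const MaxPause = time.Hour // documented pause limit

// AckMsg is the subset of message acknowledgement behavior used here.
type AckMsg interface {
	NakWithDelay(delay time.Duration) error
	InProgress() error
}

// CheckpointStore stands in for the checkpoints KV bucket.
type CheckpointStore interface {
	Put(key string, value []byte) error
}

// CheckpointKey follows the documented key format {run_id}.{step_id}.
func CheckpointKey(runID, stepID string) string {
	return runID + "." + stepID
}

// Pause saves state, then NAKs with a delay so the task is redelivered later.
func Pause(store CheckpointStore, msg AckMsg, runID, stepID string, state []byte, d time.Duration) error {
	if d > MaxPause {
		return fmt.Errorf("pause %s exceeds limit %s", d, MaxPause)
	}
	if err := store.Put(CheckpointKey(runID, stepID), state); err != nil {
		return err
	}
	return msg.NakWithDelay(d)
}

// Checkpoint saves state and extends the ack deadline; the task stays in flight.
func Checkpoint(store CheckpointStore, msg AckMsg, runID, stepID string, data []byte) error {
	if err := store.Put(CheckpointKey(runID, stepID), data); err != nil {
		return err
	}
	return msg.InProgress()
}

// recorder is a test double that logs the calls it receives.
type recorder struct{ calls []string }

func (r *recorder) Put(key string, _ []byte) error {
	r.calls = append(r.calls, "put "+key)
	return nil
}

func (r *recorder) NakWithDelay(d time.Duration) error {
	r.calls = append(r.calls, "nak "+d.String())
	return nil
}

func (r *recorder) InProgress() error {
	r.calls = append(r.calls, "in-progress")
	return nil
}

func main() {
	r := &recorder{}
	_ = Pause(r, r, "run-1", "step-a", []byte(`{"state":"..."}`), 5*time.Second)
	_ = Checkpoint(r, r, "run-1", "step-a", []byte(`{"incremental":"..."}`))
	fmt.Println(r.calls)
}
```

Both paths write to the same key, so a later checkpoint for a step overwrites an earlier one under this key format.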


Idempotency and Deduplication

| Mechanism | ID Format | Scope |
| --- | --- | --- |
| NATS message dedup | Nats-Msg-Id header | JetStream duplicate window (default 2 min) |
| Event dedup | {run_id}.{step_id}.{event_type} | Prevents duplicate events on replay |
| Rate retry dedup | {run_id}.{step_id}.rate_retry | Prevents duplicate retries |

Authentication

| Transport | Method |
| --- | --- |
| HTTP bridge | Bearer token via Authorization: Bearer {token} header. Token set via DAGNATS_BRIDGE_TOKEN. Missing or invalid tokens return 401 Unauthorized. |
| NATS native | NATS native authentication (user/password, tokens, NKey, JWT). |

Implementation Limits

| Parameter | Limit |
| --- | --- |
| Pause duration | 1 hour (3,600,000 ms) |
| Poll timeout | 60 seconds (60,000 ms) |
| Worker KV TTL | 60 seconds |
| Worker heartbeat interval | 30 seconds (NATS), 25 seconds (HTTP SSE) |
| Signal payload size | 1 MiB |

Reference Implementations

  • Go (NATS): worker/ package
  • HTTP SDKs: Implement against the three HTTP endpoints and JSON schemas above
  • All Go types referenced in this document are canonical; implement JSON serialization that matches them