API Reference

worker

import "github.com/danmestas/dagnats/worker"

func HandleTyped

func HandleTyped[I, O any](w *Worker, taskType string, fn TypedHandlerFunc[I, O])

HandleTyped registers a typed task handler that automatically marshals/unmarshals JSON. Combines Typed() and Handle() into a single call so workers don’t need to know about the wrapping.

type Checkpointable

Checkpointable is the subset of TaskContext for checkpoint operations. Kept for backward compatibility: code that type-asserts to Checkpointable still compiles. Prefer using TaskContext methods directly.

type Checkpointable interface {
    Checkpoint(state []byte) error
    LoadCheckpoint() ([]byte, error)
    Pause(name string, duration time.Duration) error
}

type Directory

Directory provides worker visibility via NATS KV. Each worker writes its registration to the “workers” bucket; the bucket’s TTL ensures stale entries are purged automatically.

type Directory struct {
    // contains filtered or unexported fields
}

func NewDirectory

func NewDirectory(js jetstream.JetStream) *Directory

NewDirectory creates a Directory backed by the “workers” KV bucket. Panics if js is nil or the bucket does not exist — both are programmer errors indicating missing setup.

func (*Directory) Deregister

func (d *Directory) Deregister(workerID string) error

Deregister removes the worker’s entry from the directory. Panics if workerID is empty. Returns nil if the key does not exist.

func (*Directory) List

func (d *Directory) List() ([]WorkerRegistration, error)

List returns all currently registered workers. Returns an empty slice when no workers are registered. Skips entries that fail to unmarshal (TTL expiry race).
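The skip-on-decode-failure behaviour described for List can be sketched as follows. This is an illustration only: `decodeEntries` is a hypothetical helper, the struct is a trimmed local copy of WorkerRegistration, and the real List reads its values from the KV bucket rather than from a slice.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkerRegistration is a trimmed local copy of the documented struct.
type WorkerRegistration struct {
	WorkerID  string   `json:"worker_id"`
	TaskTypes []string `json:"task_types"`
}

// decodeEntries is a hypothetical helper: it keeps every value that
// unmarshals cleanly and silently skips the rest.
func decodeEntries(values [][]byte) []WorkerRegistration {
	regs := make([]WorkerRegistration, 0, len(values)) // non-nil even when empty
	for _, v := range values {
		var reg WorkerRegistration
		if err := json.Unmarshal(v, &reg); err != nil {
			continue // e.g. a value that disappeared between listing and read
		}
		regs = append(regs, reg)
	}
	return regs
}

func main() {
	values := [][]byte{
		[]byte(`{"worker_id":"w-1","task_types":["resize"]}`),
		nil, // simulates an entry that expired mid-listing
	}
	fmt.Println(len(decodeEntries(values))) // prints: 1
	fmt.Println(len(decodeEntries(nil)))    // prints: 0
}
```

Callers can therefore range over the result without a nil check, but should not treat a short list as proof that a worker is down.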

func (*Directory) Register

func (d *Directory) Register(reg WorkerRegistration) error

Register writes the worker’s registration to the KV bucket. The worker must call Register periodically (before the 60s TTL) to maintain its presence. Panics on empty WorkerID or TaskTypes.
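One way to keep an entry alive is a ticker loop that re-registers well inside the 60s TTL. The sketch below is self-contained and uses local stand-ins: the `registrar` interface, `keepRegistered`, and `countingDir` are hypothetical names, and the refresh interval is an assumption rather than something the package prescribes.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// WorkerRegistration is a trimmed local copy of the documented struct.
type WorkerRegistration struct {
	WorkerID  string
	TaskTypes []string
}

// registrar is a hypothetical interface matching the documented
// Register signature, so the loop can be exercised without NATS.
type registrar interface {
	Register(reg WorkerRegistration) error
}

// keepRegistered registers immediately, then again on every tick until
// ctx is cancelled. In production, every would be well under the 60s TTL
// (20s is an assumed value, not a documented one).
func keepRegistered(ctx context.Context, d registrar, reg WorkerRegistration, every time.Duration) error {
	if err := d.Register(reg); err != nil {
		return err
	}
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
		}
		if ctx.Err() != nil { // cancelled while a tick was also pending
			return nil
		}
		if err := d.Register(reg); err != nil {
			return err
		}
	}
}

// countingDir is a test double that cancels the loop after stopAfter writes.
type countingDir struct {
	calls, stopAfter int
	cancel           context.CancelFunc
}

func (c *countingDir) Register(WorkerRegistration) error {
	c.calls++
	if c.calls >= c.stopAfter {
		c.cancel()
	}
	return nil
}

func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	d := &countingDir{stopAfter: 3, cancel: cancel}
	reg := WorkerRegistration{WorkerID: "w-1", TaskTypes: []string{"resize"}}
	_ = keepRegistered(ctx, d, reg, time.Millisecond)
	return d.calls
}

func main() {
	fmt.Println(runDemo()) // prints: 3
}
```

On shutdown, cancelling the context and then calling Deregister removes the entry at once instead of waiting for the TTL to lapse.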

type HandlerFunc

HandlerFunc is the function signature for task handlers registered with a Worker.

type HandlerFunc func(ctx TaskContext) error

func Typed

func Typed[I, O any](fn TypedHandlerFunc[I, O]) HandlerFunc

Typed wraps a TypedHandlerFunc into a HandlerFunc by handling JSON serialization. Marshal/unmarshal failures are wrapped in NonRetryableError because bad serialization will not fix itself on retry.
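The wrapping described above might look roughly like the sketch below. It re-declares minimal local stand-ins for TaskContext, HandlerFunc, TypedHandlerFunc, and NonRetryableError so that it runs on its own. `typedSketch` is a hypothetical name, and completing the step with the marshalled output is an assumption the source does not confirm.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented shapes; not the real package types.
type TaskContext interface {
	Input() []byte
	Complete(output []byte) error
}

type HandlerFunc func(ctx TaskContext) error

type TypedHandlerFunc[I, O any] func(ctx TaskContext, input I) (O, error)

type NonRetryableError struct{ Err error }

func (e *NonRetryableError) Error() string { return e.Err.Error() }
func (e *NonRetryableError) Unwrap() error { return e.Err }

// typedSketch is a hypothetical re-implementation of the wrapping idea.
func typedSketch[I, O any](fn TypedHandlerFunc[I, O]) HandlerFunc {
	return func(ctx TaskContext) error {
		var in I
		if err := json.Unmarshal(ctx.Input(), &in); err != nil {
			// Bad input will not fix itself on retry.
			return &NonRetryableError{Err: fmt.Errorf("decode input: %w", err)}
		}
		out, err := fn(ctx, in)
		if err != nil {
			return err // handler errors keep their own retry semantics
		}
		raw, err := json.Marshal(out)
		if err != nil {
			return &NonRetryableError{Err: fmt.Errorf("encode output: %w", err)}
		}
		return ctx.Complete(raw) // assumption: the wrapper completes the step
	}
}

// fakeCtx is an in-memory context for the demo.
type fakeCtx struct{ in, out []byte }

func (c *fakeCtx) Input() []byte           { return c.in }
func (c *fakeCtx) Complete(o []byte) error { c.out = o; return nil }

type sumIn struct{ A, B int }
type sumOut struct{ Sum int }

func runSum(input string) (string, error) {
	h := typedSketch[sumIn, sumOut](func(_ TaskContext, in sumIn) (sumOut, error) {
		return sumOut{Sum: in.A + in.B}, nil
	})
	ctx := &fakeCtx{in: []byte(input)}
	err := h(ctx)
	return string(ctx.out), err
}

func isNonRetryable(err error) bool {
	var nre *NonRetryableError
	return errors.As(err, &nre)
}

func main() {
	out, _ := runSum(`{"A":2,"B":3}`)
	fmt.Println(out) // prints: {"Sum":5}
	_, err := runSum(`not json`)
	fmt.Println(isNonRetryable(err)) // prints: true
}
```

The handler body only ever sees `sumIn` and `sumOut`; the raw bytes stay inside the wrapper.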

type NonRetryableError

NonRetryableError wraps an error to signal that retrying will not help. The worker framework detects this via errors.As and calls ctx.Fail() instead of NakWithDelay, causing immediate permanent failure.

type NonRetryableError struct {
    Err error
}

func NewNonRetryableError

func NewNonRetryableError(err error) *NonRetryableError

NewNonRetryableError wraps err so the worker framework skips retries. Panics if err is nil — a nil non-retryable error is a programmer mistake.

func (*NonRetryableError) Error

func (e *NonRetryableError) Error() string

func (*NonRetryableError) Unwrap

func (e *NonRetryableError) Unwrap() error
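Because the type implements Unwrap, both errors.As and errors.Is see through additional wrapping. The sketch below uses a local copy of the type; `classify` is a hypothetical stand-in for the framework's decision, and the message returned by Error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented type for a self-contained example.
type NonRetryableError struct{ Err error }

func NewNonRetryableError(err error) *NonRetryableError {
	if err == nil {
		panic("NewNonRetryableError: nil error")
	}
	return &NonRetryableError{Err: err}
}

// Error delegates to the wrapped error (assumed message format).
func (e *NonRetryableError) Error() string { return e.Err.Error() }
func (e *NonRetryableError) Unwrap() error { return e.Err }

var (
	errBadInput  = errors.New("bad input")
	errTransient = errors.New("connection reset")
)

// classify is a hypothetical stand-in for the framework's decision:
// nil completes, a NonRetryableError fails permanently, anything else retries.
func classify(err error) string {
	var nre *NonRetryableError
	switch {
	case err == nil:
		return "complete"
	case errors.As(err, &nre):
		return "fail"
	default:
		return "retry"
	}
}

// wrappedBadInput adds an extra layer of context around the marker error.
func wrappedBadInput() error {
	return fmt.Errorf("resize step: %w", NewNonRetryableError(errBadInput))
}

func main() {
	fmt.Println(classify(nil))                         // prints: complete
	fmt.Println(classify(wrappedBadInput()))           // prints: fail
	fmt.Println(classify(errTransient))                // prints: retry
	fmt.Println(errors.Is(wrappedBadInput(), errBadInput)) // prints: true
}
```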

type Signaler

Signaler is the subset of TaskContext for signal operations. Kept for backward compatibility — code that type-asserts to Signaler still compiles. Prefer using TaskContext methods directly.

type Signaler interface {
    WaitForSignal(
        name string, timeout time.Duration,
    ) ([]byte, error)
    SendSignal(runID, name string, data []byte) error
}
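The backward-compatibility note can be illustrated with a type assertion against an in-memory context. Everything other than the Signaler interface itself (`memCtx`, `awaitApproval`, `demoTimeout`, and the error messages) is hypothetical, and the real timeout and delivery semantics may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Signaler copies the documented interface.
type Signaler interface {
	WaitForSignal(name string, timeout time.Duration) ([]byte, error)
	SendSignal(runID, name string, data []byte) error
}

// memCtx is a hypothetical in-memory stand-in for a task context.
type memCtx struct {
	runID string
	slots map[string]chan []byte
}

func newMemCtx(runID string) *memCtx {
	return &memCtx{runID: runID, slots: map[string]chan []byte{}}
}

// slot returns the one-element mailbox for a run/signal pair.
func (c *memCtx) slot(runID, name string) chan []byte {
	key := runID + "/" + name
	if c.slots[key] == nil {
		c.slots[key] = make(chan []byte, 1)
	}
	return c.slots[key]
}

func (c *memCtx) SendSignal(runID, name string, data []byte) error {
	select {
	case c.slot(runID, name) <- data:
		return nil
	default:
		return errors.New("signal already pending")
	}
}

func (c *memCtx) WaitForSignal(name string, timeout time.Duration) ([]byte, error) {
	select {
	case data := <-c.slot(c.runID, name):
		return data, nil
	case <-time.After(timeout):
		return nil, errors.New("timed out waiting for signal")
	}
}

const demoTimeout = 10 * time.Millisecond

// awaitApproval shows the legacy pattern: assert to Signaler, then wait.
func awaitApproval(ctx any, timeout time.Duration) (string, error) {
	s, ok := ctx.(Signaler)
	if !ok {
		return "", errors.New("context does not support signals")
	}
	data, err := s.WaitForSignal("approval", timeout)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	ctx := newMemCtx("run-1")
	_ = ctx.SendSignal("run-1", "approval", []byte("yes"))
	fmt.Println(awaitApproval(ctx, demoTimeout)) // prints: yes <nil>
}
```

New code can skip the assertion and call WaitForSignal on the TaskContext directly, as the description recommends.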

type TaskContext

TaskContext is the interface workers use to interact with the DagNats engine. It covers step completion, checkpointing, signals, and streaming. Workers call exactly one completion method (Complete, Fail, FailPermanent, FailRetryAfter, or Continue) per execution.

Checkpoint and signal methods depend on optional KV buckets (“checkpoints” and “signals”). They return an error if the bucket was not provisioned at startup — check your natsutil.SetupAll call.

type TaskContext interface {
    // Step identity and input
    Input() []byte
    RunID() string
    StepID() string
    RetryCount() int

    // Step completion — call exactly one per execution
    Complete(output []byte) error
    Fail(err error) error
    FailPermanent(err error) error
    FailRetryAfter(err error, after time.Duration) error
    Continue(output []byte) error

    // Streaming and heartbeat
    PutStream(data []byte) error
    Heartbeat() error

    // Checkpointing — save/restore handler state across retries
    Checkpoint(state []byte) error
    LoadCheckpoint() ([]byte, error)
    Pause(name string, duration time.Duration) error

    // Signals — coordinate between steps
    WaitForSignal(
        name string, timeout time.Duration,
    ) ([]byte, error)
    SendSignal(runID, name string, data []byte) error
}
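As an illustration of the checkpoint methods, the sketch below sums a list of numbers and saves progress after each item, so a retried execution resumes where the previous attempt stopped. It runs against an in-memory fake. `stepCtx`, `memCtx`, `newSumHandler`, and the `failAt` hook are hypothetical, and the fake assumes LoadCheckpoint returns an empty slice with a nil error when nothing has been saved, which the source does not specify.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// stepCtx is the subset of the documented TaskContext used here.
type stepCtx interface {
	Input() []byte
	Complete(output []byte) error
	Checkpoint(state []byte) error
	LoadCheckpoint() ([]byte, error)
}

// progress is the state persisted between attempts.
type progress struct {
	Next int `json:"next"`
	Sum  int `json:"sum"`
}

// newSumHandler returns a handler that fails once it reaches index failAt
// (use -1 for no injected failure).
func newSumHandler(failAt int) func(stepCtx) error {
	return func(ctx stepCtx) error {
		var nums []int
		if err := json.Unmarshal(ctx.Input(), &nums); err != nil {
			return err
		}
		var p progress
		saved, err := ctx.LoadCheckpoint()
		if err != nil {
			return err
		}
		if len(saved) > 0 { // resume from the previous attempt
			if err := json.Unmarshal(saved, &p); err != nil {
				return err
			}
		}
		for i := p.Next; i < len(nums); i++ {
			if i == failAt {
				return errors.New("simulated transient failure")
			}
			p = progress{Next: i + 1, Sum: p.Sum + nums[i]}
			state, err := json.Marshal(p)
			if err != nil {
				return err
			}
			if err := ctx.Checkpoint(state); err != nil {
				return err
			}
		}
		out, err := json.Marshal(p.Sum)
		if err != nil {
			return err
		}
		return ctx.Complete(out) // exactly one completion call per execution
	}
}

// memCtx is an in-memory fake that survives across attempts.
type memCtx struct {
	input, saved, output []byte
	checkpoints          int
}

func (c *memCtx) Input() []byte                   { return c.input }
func (c *memCtx) Complete(o []byte) error         { c.output = o; return nil }
func (c *memCtx) Checkpoint(s []byte) error       { c.saved = s; c.checkpoints++; return nil }
func (c *memCtx) LoadCheckpoint() ([]byte, error) { return c.saved, nil }

func runWithOneRetry() (string, int) {
	ctx := &memCtx{input: []byte(`[1,2,3,4]`)}
	_ = newSumHandler(2)(ctx)  // first attempt stops at index 2
	_ = newSumHandler(-1)(ctx) // retry resumes from the checkpoint
	return string(ctx.output), ctx.checkpoints
}

func main() {
	fmt.Println(runWithOneRetry()) // prints: 10 4
}
```

Four checkpoint writes rather than six show that the retry skipped the two items already processed.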

type TypedHandlerFunc

TypedHandlerFunc is a task handler with typed input and output. The worker.Typed wrapper handles JSON marshal/unmarshal so handlers work with concrete Go types instead of raw []byte.

type TypedHandlerFunc[I, O any] func(ctx TaskContext, input I) (O, error)

type Worker

Worker subscribes to task subjects and dispatches messages to registered handlers. Each task type gets its own JetStream subscription; messages are ack’d after the handler returns so failures are retried by JetStream’s MaxDeliver policy.

type Worker struct {
    // contains filtered or unexported fields
}

func NewWorker

func NewWorker(nc *nats.Conn, tel *observe.Telemetry, opts ...WorkerOption) *Worker

NewWorker creates a Worker using the given connection and optional telemetry bundle. Panics if nc is nil or if JetStream cannot be initialised — both are programmer errors at startup. When tel is nil, a noop telemetry is used so callers are not forced to import observe for simple use cases.

func (*Worker) Handle

func (w *Worker) Handle(taskType string, handler HandlerFunc)

Handle registers a HandlerFunc for the given task type. Panics on empty taskType or nil handler — both are programmer errors.

func (*Worker) HandleSingleton

func (w *Worker) HandleSingleton(taskType string, handler HandlerFunc)

HandleSingleton registers a handler that runs as a single-partition elastic consumer group. Only one consumer processes messages at a time across all worker instances. Implicitly enables partitioned mode if not already configured.

func (*Worker) Start

func (w *Worker) Start()

Start creates JetStream subscriptions for all registered task types. Panics if any subscription fails — stream misconfiguration is a startup error. Binds optional KV buckets for checkpoints and signals (nil if not present). When groups are configured, subscribes to group-specific subjects.

func (*Worker) Stop

func (w *Worker) Stop()

Stop unsubscribes all active subscriptions. Safe to call after Start.

type WorkerOption

WorkerOption configures optional Worker behavior.

type WorkerOption func(*Worker)

func WithGroups

func WithGroups(groups ...string) WorkerOption

WithGroups configures the worker to subscribe only to specific worker groups. When provided, the worker subscribes to task.{taskType}.{group}.> instead of task.{taskType}.>.
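The two subject patterns can be made concrete with a small helper. `subjectsFor` is a hypothetical function written for this illustration; only the patterns themselves come from the description above.

```go
package main

import "fmt"

// subjectsFor builds the subscription subjects for one task type:
// task.{taskType}.> without groups, task.{taskType}.{group}.> per group.
func subjectsFor(taskType string, groups ...string) []string {
	if len(groups) == 0 {
		return []string{"task." + taskType + ".>"}
	}
	subjects := make([]string, 0, len(groups))
	for _, g := range groups {
		subjects = append(subjects, "task."+taskType+"."+g+".>")
	}
	return subjects
}

func main() {
	fmt.Println(subjectsFor("resize"))               // prints: [task.resize.>]
	fmt.Println(subjectsFor("resize", "gpu", "cpu")) // prints: [task.resize.gpu.> task.resize.cpu.>]
}
```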

func WithPartitions

func WithPartitions(n int) WorkerOption

WithPartitions configures pcgroups elastic consumer groups with the given partition count. A value of 0 (the default) selects the legacy consumer.
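WorkerOption follows Go's functional-options pattern. The sketch below shows how options of this shape are typically applied by a constructor. The Worker fields and `newWorkerSketch` are hypothetical, since the real struct's fields are unexported and not documented here.

```go
package main

import "fmt"

// Worker is a local stand-in; the field names are assumptions.
type Worker struct {
	groups     []string
	partitions int // 0 means the legacy consumer
}

// WorkerOption mirrors the documented type.
type WorkerOption func(*Worker)

func WithGroups(groups ...string) WorkerOption {
	return func(w *Worker) { w.groups = append(w.groups, groups...) }
}

func WithPartitions(n int) WorkerOption {
	return func(w *Worker) { w.partitions = n }
}

// newWorkerSketch starts from defaults and applies each option in order.
func newWorkerSketch(opts ...WorkerOption) *Worker {
	w := &Worker{}
	for _, opt := range opts {
		opt(w)
	}
	return w
}

func main() {
	w := newWorkerSketch(WithGroups("gpu"), WithPartitions(4))
	fmt.Println(w.groups, w.partitions) // prints: [gpu] 4
}
```

With the real package, the same options are passed as the trailing arguments to NewWorker.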

type WorkerRegistration

WorkerRegistration is the directory entry for a running worker. The directory is observability-only — the engine never reads it. Workers register on startup and maintain their entry via periodic heartbeat writes (the KV bucket has a 60s TTL).

type WorkerRegistration struct {
    WorkerID  string            `json:"worker_id"`
    TaskTypes []string          `json:"task_types"`
    Language  string            `json:"language"`
    Transport string            `json:"transport"`
    MaxTasks  int               `json:"max_tasks"`
    Metadata  map[string]string `json:"metadata,omitempty"`
}
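For reference, the struct tags above produce the following JSON. The struct is copied locally so the example runs on its own; the field values, including the "nats" transport string, are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkerRegistration is copied from the documented definition.
type WorkerRegistration struct {
	WorkerID  string            `json:"worker_id"`
	TaskTypes []string          `json:"task_types"`
	Language  string            `json:"language"`
	Transport string            `json:"transport"`
	MaxTasks  int               `json:"max_tasks"`
	Metadata  map[string]string `json:"metadata,omitempty"`
}

// encode returns the JSON form of a registration as a string.
func encode(reg WorkerRegistration) (string, error) {
	raw, err := json.Marshal(reg)
	return string(raw), err
}

func main() {
	out, err := encode(WorkerRegistration{
		WorkerID:  "w-1",
		TaskTypes: []string{"resize"},
		Language:  "go",
		Transport: "nats", // illustrative value
		MaxTasks:  4,
	})
	if err != nil {
		panic(err)
	}
	// Metadata is nil, so omitempty drops the key entirely.
	fmt.Println(out)
	// prints: {"worker_id":"w-1","task_types":["resize"],"language":"go","transport":"nats","max_tasks":4}
}
```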

Generated by gomarkdoc