Task Queue
The task queue is the boundary between the operator and agent pods. The operator submits tasks; agent pods consume them. Redis Streams is the default backend.
Architecture
ArkTeam controller
  │ Submit(prompt, meta)
  ▼
Redis Stream (one per <namespace>.<team>.<role>)
  │ XREADGROUP (consumer group per pod)
  ▼
Agent pod
  │ call LLM, run tool loop
  ▼
Redis Hash (task results, keyed by task ID)
  │ Ack(taskID, result)
  ▼
ArkTeam controller reads result, advances DAG
Redis Streams internals
Consumer groups: Each agent pod joins a consumer group on its role's stream. Redis Streams guarantees at-least-once delivery: if a pod crashes mid-task, the message remains in the group's pending list and is redelivered to another consumer after an idle timeout.
Per-role isolation: In ArkTeam, each role polls its own stream keyed by <namespace>.<team>.<role>. Tasks submitted to one role are invisible to all others.
Results storage: Task results are stored in a Redis Hash keyed by task ID (HSET results <taskID> <result>). The operator retrieves them with HMGET, so the cost is proportional to the number of task IDs requested, not to the total number of stored results.
Backpressure: If a stream’s pending list exceeds maxPublishDepth (10,000 entries by default), new publishes are rejected and the error is returned to the caller.
Configuration
Set TASK_QUEUE_URL to point at your Redis instance:
# Via Helm
helm install ark-operator arkonis/ark-operator \
  --set taskQueueURL=redis.ark-system.svc.cluster.local:6379
# Via env var (local ark run uses in-memory queue by default)
TASK_QUEUE_URL=redis://localhost:6379 ark run team.yaml
STREAM_CHANNEL_URL defaults to TASK_QUEUE_URL. If you want to use separate backends for task queue and streaming, set both explicitly.
The TaskQueue interface
The task queue is defined as an interface. Implement it to swap in any backend:
type TaskQueue interface {
    Submit(ctx context.Context, prompt string, meta map[string]string) (string, error)
    Poll(ctx context.Context) (*Task, error)
    Ack(taskID, result string, usage TokenUsage) error
    Nack(task Task, reason string) error
    Results(ctx context.Context, taskIDs []string) ([]TaskResult, error)
    Close()
}
The StreamChannel interface is for streaming partial LLM output back to the caller:
type StreamChannel interface {
    Publish(key, chunk string) error
    Done(key string) error
    Read(ctx context.Context, key string) (string, error)
}
Register a custom backend:
func init() {
    queue.RegisterQueue("nats", func(url string) (queue.TaskQueue, error) {
        return nats.NewQueue(url)
    })
    queue.RegisterStream("nats", func(url string) (queue.StreamChannel, error) {
        return nats.NewStream(url)
    })
}
The backend is selected by URL scheme: redis://... → Redis adapter, nats://... → NATS adapter, no URL → in-memory (used by ark run).
Built-in backends
| Backend | TaskQueue | StreamChannel | URL scheme |
|---|---|---|---|
| Redis Streams | yes | yes | redis://... |
| In-memory | yes | no | (no URL / ark run) |
NATS and SQS backends are planned for a future release.
Dead-letter queue
Tasks that fail after exhausting retries are moved to a dead-letter stream. The operator runs a DLQ consumer that logs failures and optionally retries with exponential backoff.
Configure retry behavior on ArkAgent:
spec:
  limits:
    maxRetries: 3            # retry up to 3 times before DLQ
    retryBackoffSeconds: 10  # initial backoff; doubles each retry
See also
- Environment Variables reference — TASK_QUEUE_URL, STREAM_CHANNEL_URL
- Observability — trace context propagation across the queue boundary