NATS Transport Architecture

Varpulis uses NATS in two distinct roles: as a data connector for event ingestion/output, and as an internal transport layer for cluster communication between the coordinator and workers.

Overview

NATS dual-role overview

| Aspect | Data Connector | Cluster Transport |
|---|---|---|
| Purpose | User event streaming (IoT, market data) | Coordinator-worker communication |
| Feature flag | `nats` | `nats-transport` |
| Crate | `varpulis-runtime` | `varpulis-cluster` |
| Subject prefix | User-defined in VPL | `varpulis.cluster.*` |
| Configuration | VPL connector declarations | Cluster startup parameters |

Cluster Transport Layer

Subject Namespace

All cluster subjects use the varpulis.cluster prefix. NATS uses . as a separator, * for single-token wildcard, and > for multi-token wildcard.
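These matching rules can be sketched as a small std-only matcher. This is illustrative only, not part of the Varpulis codebase: `*` consumes exactly one token, and a trailing `>` consumes one or more remaining tokens.

```rust
/// Returns true if `subject` matches `pattern` under NATS subject rules:
/// `.` separates tokens, `*` matches exactly one token, and a trailing
/// `>` matches one or more remaining tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let sub: Vec<&str> = subject.split('.').collect();
    for (i, p) in pat.iter().enumerate() {
        match *p {
            // `>` must consume at least one remaining token.
            ">" => return i < sub.len(),
            // `*` matches any single token, as long as one exists here.
            "*" => {
                if i >= sub.len() {
                    return false;
                }
            }
            // Literal tokens must match exactly.
            token => {
                if sub.get(i) != Some(&token) {
                    return false;
                }
            }
        }
    }
    // Without a trailing `>`, the token counts must line up exactly.
    pat.len() == sub.len()
}

fn main() {
    assert!(subject_matches("varpulis.cluster.heartbeat.>", "varpulis.cluster.heartbeat.worker-0"));
    assert!(subject_matches("varpulis.cluster.heartbeat.*", "varpulis.cluster.heartbeat.worker-0"));
    // `>` does not match zero tokens.
    assert!(!subject_matches("varpulis.cluster.heartbeat.>", "varpulis.cluster.heartbeat"));
    println!("subject matching ok");
}
```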

| Subject | Direction | Pattern | Purpose |
|---|---|---|---|
| `varpulis.cluster.register` | Worker -> Coordinator | Request/Reply | Worker self-registration |
| `varpulis.cluster.heartbeat.{worker_id}` | Worker -> Coordinator | Pub/Sub | Health monitoring |
| `varpulis.cluster.cmd.{worker_id}.{cmd}` | Coordinator -> Worker | Request/Reply | Pipeline commands |
| `varpulis.cluster.cmd.{worker_id}.>` | (subscription) | Wildcard | Worker subscribes to all its commands |
| `varpulis.cluster.pipeline.{group}.{from}.{to}` | Worker -> Worker | Pub/Sub | Inter-pipeline event routing |
| `varpulis.cluster.raft.{node_id}.{rpc}` | Node -> Node | Request/Reply | Raft consensus RPCs |
| `varpulis.cluster.raft.{node_id}.>` | (subscription) | Wildcard | Node subscribes to all its RPCs |
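The subject shapes in this table can be mirrored by small builder functions. The real helpers live in `varpulis-cluster`'s `nats_transport.rs`; the function names below are hypothetical, chosen only to illustrate the layout:

```rust
// Hypothetical subject builders mirroring the table above. The actual
// helpers in nats_transport.rs may use different names.
const PREFIX: &str = "varpulis.cluster";

/// Subject a worker publishes its heartbeats on.
fn heartbeat_subject(worker_id: &str) -> String {
    format!("{PREFIX}.heartbeat.{worker_id}")
}

/// Subject the coordinator sends a specific command on.
fn cmd_subject(worker_id: &str, cmd: &str) -> String {
    format!("{PREFIX}.cmd.{worker_id}.{cmd}")
}

/// Wildcard subscription a worker uses to receive all of its commands.
fn cmd_subscription(worker_id: &str) -> String {
    format!("{PREFIX}.cmd.{worker_id}.>")
}

/// Subject for a Raft RPC addressed to one node.
fn raft_subject(node_id: &str, rpc: &str) -> String {
    format!("{PREFIX}.raft.{node_id}.{rpc}")
}

fn main() {
    assert_eq!(cmd_subject("worker-0", "deploy"), "varpulis.cluster.cmd.worker-0.deploy");
    assert_eq!(cmd_subscription("worker-0"), "varpulis.cluster.cmd.worker-0.>");
    println!("{}", heartbeat_subject("worker-0"));
    println!("{}", raft_subject("node-1", "append_entries"));
}
```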

Communication Patterns

Worker Registration

When a worker starts with NATS transport enabled, it sends a registration request to the coordinator:

Worker registration sequence

Registration payload:

```json
{
  "worker_id": "worker-0",
  "address": "http://localhost:9000",
  "api_key": "test",
  "capacity": {"cpu_cores": 8, "pipelines_running": 0, "max_pipelines": 100}
}
```

Response:

```json
{
  "worker_id": "worker-0",
  "status": "registered",
  "heartbeat_interval_secs": 5
}
```

Heartbeats

Workers publish heartbeats on their dedicated subject. The coordinator subscribes to the wildcard varpulis.cluster.heartbeat.> to receive all heartbeats:

Heartbeat pub/sub fan-in

The coordinator extracts the worker_id from the subject's last segment and updates the worker's health metrics.
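That extraction step can be sketched as follows (an illustrative std-only function, not the actual coordinator code), treating the worker id as the single token after the `varpulis.cluster.heartbeat.` prefix:

```rust
/// Recover the worker id from a heartbeat subject, returning None for
/// subjects that are not well-formed single-token heartbeat subjects.
fn worker_id_from_heartbeat(subject: &str) -> Option<&str> {
    subject
        .strip_prefix("varpulis.cluster.heartbeat.")
        // The worker id occupies exactly one subject token.
        .filter(|id| !id.is_empty() && !id.contains('.'))
}

fn main() {
    assert_eq!(
        worker_id_from_heartbeat("varpulis.cluster.heartbeat.worker-0"),
        Some("worker-0")
    );
    // Unrelated subjects yield None.
    assert_eq!(worker_id_from_heartbeat("varpulis.cluster.register"), None);
    println!("heartbeat parsing ok");
}
```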

Command Dispatch

The coordinator sends commands to specific workers using request/reply. Each worker subscribes to varpulis.cluster.cmd.{worker_id}.>:

Command dispatch sequence

Available commands:

| Command | Purpose | Payload |
|---|---|---|
| `deploy` | Deploy a pipeline | `{name, source}` |
| `undeploy` | Remove a pipeline | `{pipeline_id}` |
| `inject` | Send a single event | `{pipeline_id, event_type, fields}` |
| `inject_batch` | Send batch of events | `{pipeline_id, events}` |
| `checkpoint` | Snapshot pipeline state | `{pipeline_id}` |
| `restore` | Restore from checkpoint | `{pipeline_id, checkpoint}` |
| `drain` | Graceful shutdown | `{}` |
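On the worker side, the command name arrives as the final token of the subject. A minimal sketch of that dispatch-side parsing (illustrative only; the real logic lives in `nats_worker.rs`):

```rust
/// Given a subject received on the worker's `varpulis.cluster.cmd.{worker_id}.>`
/// subscription, recover the command name, or None if the subject is not a
/// command addressed to this worker.
fn command_from_subject<'a>(subject: &'a str, worker_id: &str) -> Option<&'a str> {
    subject
        .strip_prefix("varpulis.cluster.cmd.")?
        .strip_prefix(worker_id)?
        // Require the separating dot so "worker-1" never matches "worker-10".
        .strip_prefix('.')
}

fn main() {
    assert_eq!(
        command_from_subject("varpulis.cluster.cmd.worker-0.deploy", "worker-0"),
        Some("deploy")
    );
    // Commands for other workers are ignored.
    assert_eq!(
        command_from_subject("varpulis.cluster.cmd.worker-1.deploy", "worker-0"),
        None
    );
    println!("command parsing ok");
}
```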

Architecture Diagram

Full NATS architecture


Comparison: NATS vs REST Transport

The cluster supports both REST (HTTP) and NATS transport for coordinator-worker communication.

| Feature | REST Transport | NATS Transport |
|---|---|---|
| Registration | `POST /api/v1/cluster/workers/register` | `varpulis.cluster.register` (req/reply) |
| Heartbeats | `POST /api/v1/cluster/workers/{id}/heartbeat` | `varpulis.cluster.heartbeat.{id}` (pub) |
| Commands | `POST /workers/{id}/api/v1/...` | `varpulis.cluster.cmd.{id}.{cmd}` (req/reply) |
| Discovery | Workers need coordinator URL | Workers need NATS URL |
| Connection | Point-to-point HTTP | Multiplexed over single NATS connection |
| Overhead | TCP connection per request | Persistent connection, message framing |
| Inter-pipeline | Via MQTT topics | Via `varpulis.cluster.pipeline.*` subjects |

NATS transport is recommended when:

  • You already run a NATS server for data connectors
  • You need lower-latency command dispatch
  • You want a single transport layer for both data and control planes
  • You're using NATS for inter-pipeline event routing (replaces MQTT for internal comms)

Deployment

Requirements

  • nats-server v2.9+ (recommended: v2.10+ for enhanced JetStream support)
  • Default port: 4222
  • No JetStream required for basic cluster transport (core NATS is sufficient)

Starting nats-server

```bash
# Standalone (development)
nats-server

# With monitoring
nats-server -m 8222

# Docker
docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:latest

# Docker Compose (included in Varpulis cluster compose)
# See deploy/docker/docker-compose.cluster.yml
```

Building with NATS Transport

```bash
# Data connector only
cargo build --release --features nats

# Cluster transport only
cargo build --release --features nats-transport

# Both
cargo build --release --features nats,nats-transport

# Everything
cargo build --release --features all-connectors,nats-transport
```

Authentication

NATS transport currently supports:

  • No auth (development)
  • User/password via connection options
  • Token authentication

For production, configure authentication on your nats-server:

```
# nats-server.conf
authorization {
  users = [
    {user: "varpulis", password: "secret"}
  ]
}
```
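Clients then supply the credentials through the connection URL, which is the standard NATS client convention (the token value below is illustrative):

```
# User/password embedded in the connection URL
nats://varpulis:secret@localhost:4222

# Token authentication
nats://my-token@localhost:4222
```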

TLS

TLS is handled by the NATS server configuration. Clients connect using tls:// URLs:

```bash
nats-server --tls \
  --tlscert=/path/to/server.crt \
  --tlskey=/path/to/server.key
```

Workers and coordinators connect via tls://host:4222 instead of nats://host:4222.


Crate Structure

The NATS transport implementation spans two crates:

varpulis-cluster (transport layer)

| File | Purpose |
|---|---|
| `nats_transport.rs` | Subject helpers, `connect_nats()`, `nats_request()`, `nats_publish()`, `NatsTransportError` |
| `nats_coordinator.rs` | `run_coordinator_nats_handler()` -- registration and heartbeat subscriptions |
| `nats_worker.rs` | `run_worker_nats_handler()` -- command subscription and dispatch |

varpulis-runtime (data connector)

| File | Purpose |
|---|---|
| `connector/nats.rs` | `NatsSource`, `NatsSink`, `NatsConfig`, JSON parsing |
| `connector/managed_nats.rs` | `ManagedNatsConnector` -- shared client, health tracking |
| `connector/managed_registry.rs` | Factory: creates NATS connectors from `ConnectorConfig` |

See Also

Varpulis - Next-generation streaming analytics engine