Varpulis CLI Reference

Complete reference for all Varpulis command-line interface commands and options.

Installation

bash
# From source
cargo install --path crates/varpulis-cli

# Or build the workspace
cargo build --release
# Binary located at target/release/varpulis

Global Options

| Option | Environment Variable | Description |
|--------|----------------------|-------------|
| -c, --config <PATH> | VARPULIS_CONFIG | Path to configuration file (YAML or TOML) |
| -h, --help | - | Print help information |
| -V, --version | - | Print version information |

Commands

varpulis run

Execute a VPL program with optional MQTT connectivity.

bash
varpulis run --file program.vpl
varpulis run --code 'stream Readings = SensorReading'

Options:

| Option | Description |
|--------|-------------|
| -f, --file <PATH> | Path to the .vpl file to execute |
| -c, --code <STRING> | Inline VPL code to execute |

Notes:

  • Either --file or --code must be provided
  • If the program contains connector declarations with .from(), Varpulis will connect to the specified brokers
  • Press Ctrl+C to stop execution

Note: The config mqtt { } block syntax is deprecated. Use connector declarations with .from() instead. See Connectors.

Example with MQTT:

vpl
connector MqttBroker = mqtt (
    host: "localhost",
    port: 1883,
    client_id: "sensor-monitor"
)

event SensorReading:
    temperature: float

stream Readings = SensorReading
    .from(MqttBroker, topic: "sensors/#")
    .where(temperature > 100)
    .emit(alert_type: "HighTemp", temperature: temperature)

varpulis parse

Parse a VPL file and display the Abstract Syntax Tree (AST).

bash
varpulis parse program.vpl

Arguments:

| Argument | Description |
|----------|-------------|
| <FILE> | Path to the .vpl file to parse |

Output:

  • On success: Prints the AST in Rust debug format
  • On error: Displays parse error with location

varpulis check

Validate the syntax of a VPL file without executing it.

bash
varpulis check program.vpl

Arguments:

| Argument | Description |
|----------|-------------|
| <FILE> | Path to the .vpl file to check |

Output:

Syntax OK
   Statements: 12

Error Output:

Syntax error: unexpected token 'xyz' at line 5, column 10
   Hint: Did you mean 'where'?
   |
   | stream Readings xyz temperature > 100
   |                 ^
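
Because check takes only a file argument, it is easy to script in a pre-commit hook or CI job. The following is a minimal sketch under stated assumptions: the helper name check_all and the VARPULIS_BIN override variable are hypothetical (they are not part of Varpulis), and the loop relies on check exiting non-zero on failure, as listed under Exit Codes.

```bash
# Hypothetical helper: run `varpulis check` on every .vpl file in a directory.
# VARPULIS_BIN is an assumption of this sketch, not a Varpulis setting; it lets
# the binary path be overridden (e.g. target/release/varpulis).
# The body runs in a subshell so its variables do not leak into the caller.
check_all() (
    dir=${1:-.}
    bin=${VARPULIS_BIN:-varpulis}
    failed=0
    # Top-level files only; switch to `find` for nested directories.
    for f in "$dir"/*.vpl; do
        [ -e "$f" ] || continue        # the glob matched nothing
        if ! "$bin" check "$f"; then
            echo "FAILED: $f" >&2
            failed=$((failed + 1))
        fi
    done
    echo "$failed file(s) failed"
    [ "$failed" -eq 0 ]                # exit status: 0 only if every file passed
)
```

A CI step could then call check_all rules/ and fail the build on a non-zero status.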

varpulis demo

Run the built-in HVAC building monitoring demo.

bash
varpulis demo --duration 120 --anomalies --metrics

Options:

| Option | Default | Description |
|--------|---------|-------------|
| -d, --duration <SECS> | 60 | Duration to run the demo in seconds |
| --anomalies | disabled | Enable anomaly simulation |
| --degradation | disabled | Enable gradual degradation simulation |
| --metrics | disabled | Enable Prometheus metrics endpoint |
| --metrics-port <PORT> | 9090 | Port for Prometheus metrics |

Output:

  • Real-time event processing statistics
  • Alert generation when thresholds are exceeded
  • Final summary with event count and throughput

varpulis server

Start the Varpulis WebSocket API server for IDE integration and remote control.

bash
varpulis server --port 9000 --api-key "secret" --metrics

Options:

| Option | Default | Environment Variable | Description |
|--------|---------|----------------------|-------------|
| -p, --port <PORT> | 9000 | - | WebSocket server port |
| --bind <ADDR> | 127.0.0.1 | - | Bind address (use 0.0.0.0 for external access) |
| --api-key <KEY> | none | VARPULIS_API_KEY | API key for authentication (disables auth if not set) |
| --tls-cert <PATH> | none | VARPULIS_TLS_CERT | TLS certificate file (PEM format) |
| --tls-key <PATH> | none | VARPULIS_TLS_KEY | TLS private key file (PEM format) |
| --rate-limit <RPS> | 0 | VARPULIS_RATE_LIMIT | Rate limit in requests/second per client (0 = disabled) |
| --workdir <PATH> | current dir | - | Working directory for file operations |
| --metrics | disabled | - | Enable Prometheus metrics endpoint |
| --metrics-port <PORT> | 9090 | - | Port for Prometheus metrics |

Endpoints:

| Endpoint | Description |
|----------|-------------|
| ws://host:port/ws | WebSocket API (or wss:// with TLS) |
| GET /health | Liveness probe (always returns healthy) |
| GET /ready | Readiness probe (returns ready when engine is loaded) |
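
A deployment script usually waits for the readiness probe before sending traffic. The sketch below shows one way to poll it. It assumes curl is installed, that the probe is served over plain HTTP on the server port (use https:// when TLS is enabled), and that /ready answers with a non-2xx status until the engine is loaded; none of these details are confirmed by this reference. The function name wait_for_ready is hypothetical.

```bash
# Hypothetical helper: poll a readiness URL until it succeeds or attempts run out.
#   $1 = URL, $2 = max attempts (default 30), $3 = seconds between attempts (default 1)
wait_for_ready() (
    url=$1
    attempts=${2:-30}
    delay=${3:-1}
    i=0
    while [ "$i" -lt "$attempts" ]; do
        # -f: treat HTTP 4xx/5xx as failure; -s: silent; --max-time: do not hang
        if curl -fs --max-time 2 -o /dev/null "$url"; then
            return 0
        fi
        i=$((i + 1))
        sleep "$delay"
    done
    return 1
)
```

For example, wait_for_ready http://127.0.0.1:9000/ready 30 1 would give a local server up to roughly 30 seconds to come up.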

REST API (Multi-tenant Pipeline Management):

Authentication: X-API-Key header. When --api-key is set, a default tenant is auto-provisioned.

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /api/v1/pipelines | Deploy a pipeline ({"name": "...", "source": "..."}) |
| GET | /api/v1/pipelines | List all pipelines for the tenant |
| GET | /api/v1/pipelines/:id | Get pipeline details |
| DELETE | /api/v1/pipelines/:id | Delete a pipeline |
| POST | /api/v1/pipelines/:id/events | Inject events ({"event_type": "...", "fields": {...}}) |
| GET | /api/v1/pipelines/:id/metrics | Pipeline metrics (events processed, alerts) |
| POST | /api/v1/pipelines/:id/reload | Hot reload with new source ({"source": "..."}) |
| GET | /api/v1/usage | Tenant usage stats and quota |
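
As an illustration of the deploy endpoint, the sketch below builds the documented request body and posts it with curl. Several details are assumptions rather than documented behavior: plain HTTP on localhost:9000, a JSON Content-Type header, and the reuse of VARPULIS_API_KEY on the client side. VARPULIS_URL and all three function names are hypothetical.

```bash
# Escape backslashes and double quotes so a string can sit inside a JSON literal.
# Limitation of this sketch: the input must be a single line (newlines are not escaped).
json_escape() {
    printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g'
}

# Build the request body documented for POST /api/v1/pipelines.
pipeline_payload() {
    printf '{"name": "%s", "source": "%s"}' "$(json_escape "$1")" "$(json_escape "$2")"
}

# Deploy a pipeline. VARPULIS_URL is a hypothetical variable used only here.
deploy_pipeline() {
    curl -fsS -X POST "${VARPULIS_URL:-http://localhost:9000}/api/v1/pipelines" \
        -H "X-API-Key: $VARPULIS_API_KEY" \
        -H "Content-Type: application/json" \
        -d "$(pipeline_payload "$1" "$2")"
}
```

With a server running, deploy_pipeline demo 'stream X = Y' would send the same source text used in the WebSocket load example.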

WebSocket API Messages:

json
// Load a program
{"type": "load", "source": "stream X = Y"}

// Send an event
{"type": "event", "data": {"event_type": "Sensor", "value": 42}}

// Get status
{"type": "status"}

Example with TLS:

bash
varpulis server \
    --port 9443 \
    --bind 0.0.0.0 \
    --api-key "$(cat /secrets/api-key)" \
    --tls-cert /certs/server.crt \
    --tls-key /certs/server.key \
    --rate-limit 100

varpulis simulate

Play events from an event file through a VPL program.

bash
varpulis simulate --program rules.vpl --events data.evt --workers 8

Options:

| Option | Default | Description |
|--------|---------|-------------|
| -p, --program <PATH> | required | Path to the VPL program (.vpl) |
| -e, --events <PATH> | required | Path to the event file (.evt) |
| --timed | disabled | Replay events with real-time timing delays |
| --streaming | disabled | Read events line-by-line instead of preloading (for huge files) |
| -v, --verbose | disabled | Show each event as it's processed |
| -w, --workers <N> | CPU cores | Number of worker threads for parallel processing |
| --partition-by <FIELD> | auto | Field to use for partitioning events |

Event File Format (.evt):

# Comments start with #
# Format: @<delay_ms> <event_type> { field: value, ... }

@0 TemperatureReading { sensor_id: "S1", value: 72.5 }
@100 TemperatureReading { sensor_id: "S2", value: 68.2 }
@200 HumidityReading { sensor_id: "S1", humidity: 45 }
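
Test data in this format is easy to generate. The following sketch prints synthetic events that follow the pattern of the sample above, with timestamps increasing by 100 ms. The helper names evt_line and gen_readings, the sensor IDs, and the values are all made up for illustration.

```bash
# Print one line in the documented format: @<delay_ms> <event_type> { field: value, ... }
evt_line() {
    printf '@%s %s { %s }\n' "$1" "$2" "$3"
}

# Hypothetical generator: n TemperatureReading events, 100 ms apart,
# alternating between sensors S1 and S2 with synthetic values.
gen_readings() (
    n=$1
    i=0
    while [ "$i" -lt "$n" ]; do
        sensor="S$((i % 2 + 1))"
        evt_line "$((i * 100))" TemperatureReading \
            "sensor_id: \"$sensor\", value: $((70 + i % 5)).5"
        i=$((i + 1))
    done
)
```

Redirecting the output, for example gen_readings 1000 > test.evt, produces a file that can be passed to simulate with --events.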

Processing Modes:

| Mode | Flag | Description |
|------|------|-------------|
| Fast (default) | (none) | Load all events into memory, process as fast as possible |
| Timed | --timed | Replay events with real-time timing delays |
| Streaming | --streaming | Read events line-by-line (lower memory for huge files) |
| Parallel | --workers N | Partition and process in parallel |
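
To see why a partition field matters in parallel mode, consider generic key-hash partitioning: every event with the same key value lands on the same worker, so per-key ordering is preserved. The sketch below demonstrates that idea only. It is not Varpulis's actual partitioning algorithm, which this reference does not describe; cksum stands in for an arbitrary hash, and the function name partition_for is hypothetical.

```bash
# Map a partition key to a worker index in the range [0, workers).
# cksum (a POSIX CRC) stands in for whatever hash a real engine would use.
partition_for() (
    key=$1
    workers=$2
    crc=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
    echo $((crc % workers))
)

# The same key always maps to the same worker index.
for key in S1 S2 S1; do
    echo "$key -> worker $(partition_for "$key" 8)"
done
```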

Output:

Varpulis Event Simulation
============================
Program: rules.vpl
Events:  data.evt
Mode:    fast (parallel)
Workers: 8

Starting simulation...

Simulation Complete
======================
Duration:         1.234s
Events processed: 1000000
Workers used:     8
Alerts generated: 42
Event rate:       810,373.2 events/sec

varpulis config-gen

Generate an example configuration file.

bash
varpulis config-gen --format yaml --output config.yaml

Options:

| Option | Default | Description |
|--------|---------|-------------|
| -f, --format <FORMAT> | yaml | Output format: yaml or toml |
| -o, --output <PATH> | stdout | Output file path |

Example YAML output:

yaml
# Varpulis Configuration File
processing:
  workers: 4
  partition_by: "device_id"
  batch_size: 1000

mqtt:
  broker: "localhost"
  port: 1883
  input_topic: "events/#"
  output_topic: "alerts"
  qos: 1

metrics:
  enabled: true
  port: 9090

Environment Variables

| Variable | Used By | Description |
|----------|---------|-------------|
| VARPULIS_CONFIG | Global | Path to default configuration file |
| VARPULIS_API_KEY | server | API key for WebSocket authentication |
| VARPULIS_TLS_CERT | server | Path to TLS certificate |
| VARPULIS_TLS_KEY | server | Path to TLS private key |
| VARPULIS_RATE_LIMIT | server | Default rate limit (requests/second) |
| RUST_LOG | All | Logging level (e.g., info, debug, trace) |

Configuration File

Varpulis supports configuration files in YAML or TOML format. Use the global --config option or VARPULIS_CONFIG environment variable.

YAML Example

yaml
processing:
  workers: 8
  partition_by: "device_id"
  batch_size: 5000

mqtt:
  broker: "mqtt.example.com"
  port: 8883
  input_topic: "sensors/#"
  output_topic: "varpulis/alerts"
  client_id: "varpulis-prod"
  qos: 1
  tls: true

server:
  port: 9000
  bind: "0.0.0.0"
  api_key_file: "/secrets/api-key"

metrics:
  enabled: true
  port: 9090

TOML Example

toml
[processing]
workers = 8
partition_by = "device_id"
batch_size = 5000

[mqtt]
broker = "mqtt.example.com"
port = 8883
input_topic = "sensors/#"
output_topic = "varpulis/alerts"

[server]
port = 9000
bind = "0.0.0.0"

[metrics]
enabled = true
port = 9090

Exit Codes

| Code | Description |
|------|-------------|
| 0 | Success |
| 1 | Syntax error or validation failure |
| 2 | File not found or I/O error |
| 3 | Configuration error |
| 4 | Runtime error (MQTT connection failed, etc.) |
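
Wrapper scripts can turn these codes into readable messages. The sketch below maps each documented code to its description and reports the outcome of whatever command it wraps. The function names describe_exit_code and run_and_report are hypothetical.

```bash
# Map an exit code to the description from the table above.
describe_exit_code() {
    case "$1" in
        0) echo "Success" ;;
        1) echo "Syntax error or validation failure" ;;
        2) echo "File not found or I/O error" ;;
        3) echo "Configuration error" ;;
        4) echo "Runtime error" ;;
        *) echo "Unknown exit code $1" ;;
    esac
}

# Hypothetical wrapper: run a command, print its exit code in words on stderr,
# and pass the code through to the caller.
run_and_report() {
    code=0
    "$@" || code=$?
    echo "exit $code: $(describe_exit_code "$code")" >&2
    return "$code"
}
```

A typical call would be run_and_report varpulis check program.vpl.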

Examples

Basic Validation Workflow

bash
# Check syntax
varpulis check program.vpl

# View AST for debugging
varpulis parse program.vpl

# Simulate with sample events
varpulis simulate -p program.vpl -e test.evt --verbose

Production Deployment

bash
# Generate config template
varpulis config-gen -f yaml -o /etc/varpulis/config.yaml

# Run with MQTT in production
VARPULIS_CONFIG=/etc/varpulis/config.yaml \
RUST_LOG=info \
varpulis run --file /etc/varpulis/rules.vpl

High-Performance Batch Processing

bash
# Process 10M events with 16 workers
varpulis simulate \
    --program analytics.vpl \
    --events large_dataset.evt \
    --workers 16 \
    --partition-by device_id

Secure Server Deployment

bash
# Production server with TLS and authentication
varpulis server \
    --port 9443 \
    --bind 0.0.0.0 \
    --api-key "$(cat /run/secrets/api-key)" \
    --tls-cert /etc/ssl/varpulis.crt \
    --tls-key /etc/ssl/varpulis.key \
    --rate-limit 1000 \
    --workdir /var/lib/varpulis \
    --metrics \
    --metrics-port 9090

varpulis coordinator

Start the cluster coordinator (control plane for distributed execution).

bash
varpulis coordinator --port 9100 --api-key admin

Options:

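| Option | Default | Environment Variable | Description |
|--------|---------|----------------------|-------------|
| -p, --port <PORT> | 9100 | - | Coordinator port |
| --bind <ADDR> | 127.0.0.1 | - | Bind address |
| --api-key <KEY> | none | VARPULIS_API_KEY | API key for authentication |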

Endpoints:

| Endpoint | Description |
|----------|-------------|
| GET /health | Coordinator health check |
| POST /api/v1/cluster/workers/register | Worker registration |
| POST /api/v1/cluster/workers/{id}/heartbeat | Worker heartbeat |
| GET /api/v1/cluster/workers | List workers |
| POST /api/v1/cluster/pipeline-groups | Deploy pipeline group |
| GET /api/v1/cluster/pipeline-groups | List pipeline groups |
| POST /api/v1/cluster/pipeline-groups/{id}/inject | Inject event |
| DELETE /api/v1/cluster/pipeline-groups/{id} | Tear down group |
| GET /api/v1/cluster/topology | Full routing topology |

See Cluster Architecture for full API documentation.


varpulis server (Cluster Mode)

When --coordinator is provided, the server registers as a cluster worker:

bash
varpulis server --port 9000 --api-key test \
    --coordinator http://localhost:9100 \
    --worker-id worker-0

Additional Cluster Options:

| Option | Default | Environment Variable | Description |
|--------|---------|----------------------|-------------|
| --coordinator <URL> | none | VARPULIS_COORDINATOR | Coordinator URL to register with |
| --worker-id <ID> | auto-generated | VARPULIS_WORKER_ID | Worker identifier |

The worker auto-registers, sends heartbeats every 5 seconds, and retries registration with exponential backoff if the coordinator is unavailable.
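
For readers unfamiliar with exponential backoff, the sketch below computes a typical retry schedule: the delay doubles with each failed attempt until it reaches a cap. The 1-second base and 30-second cap are assumptions chosen for illustration; this reference does not state the values the worker actually uses. The function name backoff_delay is hypothetical.

```bash
# Delay before a given retry attempt: base * 2^attempt seconds, capped at max.
#   $1 = attempt number (0-based), $2 = base seconds (assumed default 1),
#   $3 = cap in seconds (assumed default 30)
backoff_delay() (
    attempt=$1
    delay=${2:-1}
    max=${3:-30}
    i=0
    # Stop doubling once the cap is reached to avoid needless growth.
    while [ "$i" -lt "$attempt" ] && [ "$delay" -lt "$max" ]; do
        delay=$((delay * 2))
        i=$((i + 1))
    done
    if [ "$delay" -gt "$max" ]; then
        delay=$max
    fi
    echo "$delay"
)

# Print the schedule for the first six attempts: 1 2 4 8 16 30
for n in 0 1 2 3 4 5; do
    backoff_delay "$n"
done
```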


Environment Variables

| Variable | Used By | Description |
|----------|---------|-------------|
| VARPULIS_CONFIG | Global | Path to default configuration file |
| VARPULIS_API_KEY | server, coordinator | API key for authentication |
| VARPULIS_TLS_CERT | server | Path to TLS certificate |
| VARPULIS_TLS_KEY | server | Path to TLS private key |
| VARPULIS_RATE_LIMIT | server | Default rate limit (requests/second) |
| VARPULIS_COORDINATOR | server | Coordinator URL for cluster registration |
| VARPULIS_WORKER_ID | server | Worker identifier in cluster mode |
| RUST_LOG | All | Logging level (e.g., info, debug, trace) |
