State Management
Overview
Varpulis provides a pluggable state persistence layer with three storage backends. State management covers two distinct areas:
- Tenant/pipeline state -- which tenants exist, their pipelines, quotas, and usage counters. This is fully persistent with --state-dir.
- Engine processing state -- window contents, pattern matcher partial matches, aggregation accumulators. The checkpoint infrastructure exists but is not yet wired into the engine event loop.
Storage Backends
In-Memory (MemoryStore)
Default backend. No configuration needed.
- No I/O latency (all reads and writes stay in process memory)
- Data lost on restart
- Suitable for development and testing
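As a rough illustration of what "pluggable" means here, the sketch below defines a small key-value trait and an in-memory implementation. The trait name `KvStore`, its methods, and the `InMemoryKv` type are hypothetical and chosen for illustration only; the real `StateStore` trait and `MemoryStore` type may look quite different.

```rust
use std::collections::HashMap;

/// Hypothetical minimal key-value interface (an assumption for this sketch;
/// not the actual `StateStore` trait).
pub trait KvStore {
    fn put(&mut self, key: &str, value: Vec<u8>);
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn delete(&mut self, key: &str) -> bool;
}

/// In-memory backend: a plain HashMap, so all contents are gone
/// once the process exits.
#[derive(Default)]
pub struct InMemoryKv {
    entries: HashMap<String, Vec<u8>>,
}

impl KvStore for InMemoryKv {
    fn put(&mut self, key: &str, value: Vec<u8>) {
        self.entries.insert(key.to_string(), value);
    }

    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.entries.get(key).cloned()
    }

    /// Returns true if the key existed and was removed.
    fn delete(&mut self, key: &str) -> bool {
        self.entries.remove(key).is_some()
    }
}
```

A file-backed or RocksDB-backed type could implement the same trait, which is what lets calling code stay unaware of the storage choice.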
File System (FileStore)
Enabled via --state-dir CLI flag. Stores key-value pairs as files with atomic writes (temp file + rename).
varpulis server --port 9000 --api-key "key" --state-dir /var/lib/varpulis/state
- Tenant and pipeline metadata persists across restarts
- Atomic writes prevent corruption
- Keys with : separators map to subdirectories
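The temp-file-plus-rename technique and the key-to-directory mapping described above can be sketched with the standard library alone. The function names `key_to_path` and `atomic_write`, the `.tmp` suffix, and the exact mapping rule (one path component per `:`-separated segment) are assumptions for illustration, not the FileStore implementation.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Map a key such as "tenant:acme:pipelines" to <root>/tenant/acme/pipelines.
/// Assumption: each ':'-separated segment becomes one path component.
/// This sketch does not sanitize segments such as "..".
pub fn key_to_path(root: &Path, key: &str) -> PathBuf {
    let mut path = root.to_path_buf();
    for segment in key.split(':') {
        path.push(segment);
    }
    path
}

/// Write `data` under `key` by writing a sibling temp file first and then
/// renaming it over the target, so readers never see a half-written file.
pub fn atomic_write(root: &Path, key: &str, data: &[u8]) -> io::Result<PathBuf> {
    let target = key_to_path(root, key);
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let tmp = target.with_extension("tmp");
    {
        let mut file = fs::File::create(&tmp)?;
        file.write_all(data)?;
        file.sync_all()?; // flush to disk before the rename publishes the file
    }
    fs::rename(&tmp, &target)?; // replaces any existing target in one step
    Ok(target)
}
```

Keeping the temp file in the same directory as the target matters: a rename is only atomic when both paths are on the same filesystem.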
RocksDB (RocksDbStore)
Available behind the persistence feature flag. Optimized for write-heavy workloads.
# Build with RocksDB support
cargo build --release --features persistence
// Use in code
use varpulis_runtime::persistence::{RocksDbStore, StateStore};
let store = RocksDbStore::open("/var/lib/varpulis/state")?;
Configuration (set in code):
- Compression: LZ4
- Write buffer: 64 MB
- Max write buffers: 3
- Target file size: 64 MB
Checkpointing
Overview
Engine state checkpointing is fully integrated. All stateful components -- windows, SASE+ pattern matchers, join buffers, variables, watermark trackers -- are checkpointed and restored via EngineCheckpoint.
CheckpointConfig
CheckpointConfig {
    interval: Duration::from_secs(60),   // How often to checkpoint
    max_checkpoints: 3,                  // Retain last N checkpoints
    checkpoint_on_shutdown: true,        // Save on graceful shutdown
    key_prefix: "varpulis".to_string(),  // Storage key prefix
}
Checkpoint Contents
Each EngineCheckpoint contains:
| Field | Description |
|---|---|
| window_states | Events in active windows (count, tumbling, sliding, session, partitioned) |
| sase_states | Active SASE+ runs, captured events, NFA state |
| join_states | Join buffer contents and expiry queues |
| variables | Engine-level var declarations |
| watermark_state | Per-source watermark positions and effective watermark |
| events_processed | Total events processed counter |
| output_events_emitted | Total output events counter |
The top-level Checkpoint wraps per-context EngineCheckpoint entries in context_states: HashMap<String, EngineCheckpoint>.
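To make the nesting concrete, here is a minimal sketch of the two-level layout. The field names follow the table above, but every field type below is a placeholder assumption (opaque byte blobs and strings), and `total_events_processed` is a hypothetical helper, not part of the real API.

```rust
use std::collections::HashMap;

/// Per-context snapshot. Field names mirror the table; the types are
/// placeholders (assumptions), not the real definitions.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct EngineCheckpoint {
    pub window_states: HashMap<String, Vec<u8>>,
    pub sase_states: HashMap<String, Vec<u8>>,
    pub join_states: HashMap<String, Vec<u8>>,
    pub variables: HashMap<String, String>,
    pub watermark_state: Option<Vec<u8>>,
    pub events_processed: u64,
    pub output_events_emitted: u64,
}

/// Top-level wrapper: one EngineCheckpoint per context name.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Checkpoint {
    pub context_states: HashMap<String, EngineCheckpoint>,
}

impl Checkpoint {
    /// Hypothetical helper: sum of events processed across all contexts.
    pub fn total_events_processed(&self) -> u64 {
        self.context_states
            .values()
            .map(|ctx| ctx.events_processed)
            .sum()
    }
}
```

Keying by context name lets each context be restored independently from the same stored checkpoint.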
Recovery Process
For tenant/pipeline state:
- Server starts with --state-dir
- TenantManager loads tenant snapshots from the store
- Pipelines are restored and re-compiled from saved VPL source
- Usage counters and quotas are restored
For engine state:
- CheckpointManager::recover() loads the latest valid checkpoint from the store
- engine.restore_checkpoint() restores window contents, SASE+ runs, join buffers, variables, watermark state, and counters
- Processing resumes from the checkpoint position
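The idea of "latest valid checkpoint", together with the retain-last-N behavior of max_checkpoints, can be sketched as follows. The `StoredCheckpoint` record, the toy `checksum` function, and the `latest_valid` and `prune` helpers are all hypothetical; how CheckpointManager actually validates and prunes checkpoints is not specified in this document.

```rust
/// Hypothetical stored record: an id that grows over time, the payload,
/// and a checksum recorded when the checkpoint was written.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredCheckpoint {
    pub id: u64,
    pub payload: Vec<u8>,
    pub checksum: u32,
}

/// Toy polynomial checksum, standing in for whatever integrity check
/// a real store would use.
pub fn checksum(payload: &[u8]) -> u32 {
    payload
        .iter()
        .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u32))
}

impl StoredCheckpoint {
    pub fn new(id: u64, payload: Vec<u8>) -> Self {
        let checksum = checksum(&payload);
        Self { id, payload, checksum }
    }
}

/// Newest checkpoint whose checksum still matches its payload, if any.
pub fn latest_valid(checkpoints: &[StoredCheckpoint]) -> Option<&StoredCheckpoint> {
    checkpoints
        .iter()
        .filter(|c| checksum(&c.payload) == c.checksum)
        .max_by_key(|c| c.id)
}

/// Keep only the `max_checkpoints` newest entries.
pub fn prune(checkpoints: &mut Vec<StoredCheckpoint>, max_checkpoints: usize) {
    checkpoints.sort_by_key(|c| c.id);
    let excess = checkpoints.len().saturating_sub(max_checkpoints);
    checkpoints.drain(..excess);
}
```

Retaining more than one checkpoint is what makes the fallback possible: if the newest entry fails validation, recovery can use the one before it.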
See the Checkpointing Tutorial for hands-on examples.
Temporal Windows
Window Types
# Tumbling window (non-overlapping)
stream Metrics = Events
    .window(5m)
    .aggregate(count: count())
# Sliding window (overlapping)
stream Metrics = Events
    .window(5m, sliding: 1m)
    .aggregate(avg_val: avg(value))
Memory Management
- Automatic eviction of out-of-window events
- Window contents are held in memory
- Partitioned windows maintain separate state per partition key
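The eviction and per-partition behavior listed above can be illustrated with a small time-based buffer. This is a sketch under stated assumptions: the `PartitionedWindow` type is hypothetical, timestamps are milliseconds, events arrive in timestamp order within a partition, and the window covers the interval (newest - size, newest]. The engine's own window implementation may work differently.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical time-based window with separate state per partition key.
pub struct PartitionedWindow {
    size_ms: u64,
    partitions: HashMap<String, VecDeque<(u64, f64)>>,
}

impl PartitionedWindow {
    pub fn new(size_ms: u64) -> Self {
        Self { size_ms, partitions: HashMap::new() }
    }

    /// Add an event, then evict events in the same partition that have
    /// fallen out of the window relative to the newest timestamp.
    pub fn push(&mut self, key: &str, ts_ms: u64, value: f64) {
        let size_ms = self.size_ms;
        let buf = self.partitions.entry(key.to_string()).or_default();
        buf.push_back((ts_ms, value));
        while buf
            .front()
            .map_or(false, |(ts, _)| ts.saturating_add(size_ms) <= ts_ms)
        {
            buf.pop_front();
        }
    }

    /// Average of the values currently held for `key`, if any.
    pub fn avg(&self, key: &str) -> Option<f64> {
        let buf = self.partitions.get(key)?;
        if buf.is_empty() {
            return None;
        }
        Some(buf.iter().map(|(_, v)| *v).sum::<f64>() / buf.len() as f64)
    }

    /// Number of events currently held for `key`.
    pub fn len(&self, key: &str) -> usize {
        self.partitions.get(key).map_or(0, |b| b.len())
    }
}
```

Because each key owns its own buffer, memory use grows with the number of distinct partition keys as well as with the event rate and window size.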
# Per-zone windowed aggregation
stream ZoneStats = Temperatures
    .partition_by(zone)
    .window(5m)
    .aggregate(
        zone: last(zone),
        avg_temp: avg(value),
        max_temp: max(value)
    )
Watermarks
Per-source watermark tracking handles out-of-order events:
stream Orders = OrderEvent
    .watermark(out_of_order: 10s)
    .allowed_lateness(30s)
    .window(1m)
    .aggregate(total: sum(amount))
    .emit(total_amount: total)
- .watermark(out_of_order: Xs) -- Track watermarks per event source with X seconds tolerance
- .allowed_lateness(Ys) -- Accept events up to Y seconds past the watermark
- Watermark advancement triggers window closure
- Watermark state is included in checkpoints
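A simplified model of these rules is sketched below. The `WatermarkTracker` type and its methods are hypothetical, and two details are assumptions rather than documented behavior: each source's watermark is taken as the highest event time seen minus the out-of-order tolerance, and the effective watermark is the minimum across sources.

```rust
use std::collections::HashMap;

/// Hypothetical per-source watermark tracker (times in milliseconds).
pub struct WatermarkTracker {
    out_of_order_ms: u64,
    allowed_lateness_ms: u64,
    per_source: HashMap<String, u64>,
}

impl WatermarkTracker {
    pub fn new(out_of_order_ms: u64, allowed_lateness_ms: u64) -> Self {
        Self { out_of_order_ms, allowed_lateness_ms, per_source: HashMap::new() }
    }

    /// Record an event. The source watermark trails the highest event time
    /// by the tolerance and never moves backward.
    pub fn observe(&mut self, source: &str, event_ts_ms: u64) {
        let candidate = event_ts_ms.saturating_sub(self.out_of_order_ms);
        let wm = self.per_source.entry(source.to_string()).or_insert(0);
        *wm = (*wm).max(candidate);
    }

    /// Effective watermark: the slowest source bounds overall progress
    /// (an assumption of this sketch).
    pub fn effective(&self) -> Option<u64> {
        self.per_source.values().copied().min()
    }

    /// An event is accepted if it is no more than the allowed lateness
    /// behind the effective watermark.
    pub fn accepts(&self, event_ts_ms: u64) -> bool {
        match self.effective() {
            Some(wm) => event_ts_ms.saturating_add(self.allowed_lateness_ms) >= wm,
            None => true,
        }
    }

    /// A window ending at `window_end_ms` may close once the effective
    /// watermark has reached it.
    pub fn can_close(&self, window_end_ms: u64) -> bool {
        self.effective().map_or(false, |wm| wm >= window_end_ms)
    }
}
```

With a 10 s tolerance and 30 s allowed lateness, a source that has seen an event at t = 100 s has a watermark of 90 s, and events older than 60 s relative to the effective watermark's source would be rejected.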
Encryption at Rest
Available behind the encryption feature flag. Wraps any StateStore backend with AES-256-GCM authenticated encryption.
How It Works
EncryptedStateStore<S> transparently encrypts all data before writing and decrypts after reading:
- Algorithm: AES-256-GCM (authenticated encryption with associated data)
- Nonce: Random 96-bit nonce prepended to each ciphertext
- Key management: 32-byte key from hex-encoded env var or passphrase-derived via Argon2id
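Two of the mechanical details above, decoding a hex-encoded 256-bit key and separating the prepended 96-bit nonce from the ciphertext, can be shown with the standard library alone. This is a standalone sketch: the free functions `key_from_hex` and `split_nonce` below are illustrative stand-ins, not the methods on EncryptedStateStore, and the AES-256-GCM step itself is omitted because it requires a cryptography crate.

```rust
/// Nonce length for AES-GCM in bytes (96 bits).
pub const NONCE_LEN: usize = 12;

/// Decode a 64-character hex string into a 32-byte key.
pub fn key_from_hex(hex: &str) -> Result<[u8; 32], String> {
    let bytes = hex.as_bytes();
    if bytes.len() != 64 {
        return Err(format!("expected 64 hex characters, got {}", bytes.len()));
    }
    let mut key = [0u8; 32];
    for (i, pair) in bytes.chunks(2).enumerate() {
        let hi = (pair[0] as char).to_digit(16).ok_or("invalid hex digit")?;
        let lo = (pair[1] as char).to_digit(16).ok_or("invalid hex digit")?;
        key[i] = (hi * 16 + lo) as u8;
    }
    Ok(key)
}

/// Split a stored blob into (nonce, ciphertext), assuming the nonce is
/// prepended. Returns None if the blob is too short to hold a nonce.
pub fn split_nonce(stored: &[u8]) -> Option<(&[u8], &[u8])> {
    if stored.len() < NONCE_LEN {
        return None;
    }
    Some(stored.split_at(NONCE_LEN))
}
```

Storing the nonce next to the ciphertext is safe because the nonce is not secret; it only has to be unique per encryption under a given key.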
Configuration
# Option 1: Hex-encoded 256-bit key
export VARPULIS_ENCRYPTION_KEY="0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
# Option 2: Passphrase (derived via Argon2id with random salt)
# Use EncryptedStateStore::key_from_passphrase("my-passphrase", &salt)
Usage
use varpulis_runtime::persistence::{EncryptedStateStore, RocksDbStore, StateStore};
let store = RocksDbStore::open("/var/lib/varpulis/state")?;
let key = EncryptedStateStore::<RocksDbStore>::key_from_hex(
    &std::env::var("VARPULIS_ENCRYPTION_KEY")?
)?;
let encrypted_store = EncryptedStateStore::new(store, key);
// Use encrypted_store exactly like any other StateStore
encrypted_store.save_checkpoint("ctx", &checkpoint).await?;
Building with Encryption
cargo build --release --features persistence,encryption
Planned Features
- Exactly-once processing semantics (checkpoint barriers in progress)
- External checkpoint storage (S3)
- Cross-context watermark propagation