VPL Language Tutorial
A comprehensive guide to writing VPL programs, from basic event processing to advanced pattern matching.
Table of Contents
- Part 1: Basics - Events, streams, filters, emit
- Part 2: Windows and Aggregations
- Part 3: Sequence Patterns
- Part 4: SASE+ Advanced - Kleene, negation, AND/OR
- Part 5: Joins
- Part 6: Contexts - Multi-threaded execution with CPU affinity
Part 1: Basics
Events
Events are the fundamental unit of data in Varpulis. Each event has:
- A type (string identifier)
- A timestamp (automatically assigned or from data)
- Fields (key-value data)
Events arrive from sources like MQTT, files, or the WebSocket API:
{
"event_type": "TemperatureReading",
"timestamp": "2024-01-15T10:30:00Z",
"sensor_id": "S1",
"temperature": 72.5,
"unit": "F"
}
Defining Event Types (Optional)
You can explicitly define event schemas:
event TemperatureReading:
sensor_id: str
temperature: float
unit: str
event HumidityReading:
sensor_id: str
humidity: float
Streams
Streams are continuous flows of events. Create streams with the stream keyword:
// Basic stream: listen for all TemperatureReading events
stream Temperatures = TemperatureReading
// Stream with alias
stream T = Temperatures
Filtering with .where()
Filter events based on conditions:
// Single condition
stream HighTemps = TemperatureReading
.where(temperature > 100)
// Multiple conditions (AND)
stream CriticalTemps = TemperatureReading
.where(temperature > 100 and sensor_id == "critical-zone")
// OR conditions
stream AlertZones = TemperatureReading
.where(sensor_id == "zone-1" or sensor_id == "zone-2")
// Compound conditions
stream Filtered = TemperatureReading
.where((temperature > 90 and humidity > 80) or emergency == true)
Comparison operators: ==, !=, <, <=, >, >=
Logical operators: and, or, not
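The examples above exercise and and or but not not. A minimal sketch of negation, assuming not composes with the other logical operators as listed (the stream and sensor names are illustrative):

```
// Exclude a maintenance sensor while keeping the temperature check
stream ActiveHighTemps = TemperatureReading
    .where(temperature > 100 and not (sensor_id == "maintenance-rig"))
```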
Selecting Fields with .select()
Transform events by selecting specific fields or computing new ones:
stream SimplifiedTemps = TemperatureReading
.select(
sensor: sensor_id,
temp: temperature,
is_high: temperature > 80
)
// Computed fields
stream EnhancedTemps = TemperatureReading
.select(
sensor_id,
temp_celsius: (temperature - 32) * 5 / 9,
reading_time: timestamp
)
Emitting Alerts and Logs
Use .emit() and .print() to output data when conditions are met:
// Emit an alert
stream TempAlerts = TemperatureReading
.where(temperature > 100)
.emit(alert_type: "HighTemperature", message: "Sensor {sensor_id} reading {temperature}°F")
// Print to log
stream TempLog = TemperatureReading
.print("Received: {sensor_id} = {temperature}")
// Emit with severity
stream CriticalAlerts = TemperatureReading
.where(temperature > 150)
.emit(alert_type: "CriticalTemperature", message: "DANGER: {sensor_id} at {temperature}°F", severity: "critical")
Variables and Constants
// Immutable variable
let threshold = 100
let sensor_name = "main-sensor"
// Mutable variable
var counter = 0
// Constants (compile-time)
const MAX_TEMP = 200
const API_KEY = "secret123"
// Use in streams
stream Alerts = TemperatureReading
.where(temperature > threshold)
.emit(alert_type: "High", message: "Above threshold")
Comments
// Single-line comment
/*
Multi-line
comment
*/
# Hash-style comment (also single-line)
Part 2: Windows and Aggregations
Windows collect events over time or count, enabling aggregate calculations.
Tumbling Windows
Non-overlapping, fixed-duration windows:
// 1-minute tumbling window
stream MinuteStats = TemperatureReading
.window(1m)
.aggregate(
avg_temp: avg(temperature),
max_temp: max(temperature),
count: count()
)
// 5-second window
stream RapidStats = SensorReading
.window(5s)
.aggregate(readings: count())
// 1-hour window
stream HourlyReport = Transaction
.window(1h)
.aggregate(
total: sum(amount),
avg_amount: avg(amount)
)
Duration units: s (seconds), m (minutes), h (hours), d (days)
Sliding Windows
Overlapping windows with a slide interval:
// 5-minute window, slides every 1 minute
stream SlidingAvg = TemperatureReading
.window(5m, sliding: 1m)
.aggregate(
rolling_avg: avg(temperature)
)
// 10-second window, slides every 2 seconds
stream RecentTrend = SensorReading
.window(10s, sliding: 2s)
.aggregate(
recent_max: max(value),
recent_min: min(value)
)
Count-Based Windows
Windows based on event count:
// Every 100 events
stream BatchStats = Transaction
.window(100)
.aggregate(
batch_total: sum(amount),
batch_avg: avg(amount)
)
// Sliding count window: 50 events, slide by 10
stream RollingBatch = Reading
.window(50, sliding: 10)
.aggregate(rolling_sum: sum(value))
Aggregation Functions
| Function | Description | Example |
|---|---|---|
| count() | Number of events | count() |
| sum(field) | Sum of field values | sum(amount) |
| avg(field) | Average (SIMD-optimized) | avg(temperature) |
| min(field) | Minimum value (SIMD-optimized) | min(price) |
| max(field) | Maximum value (SIMD-optimized) | max(price) |
| stddev(field) | Standard deviation | stddev(latency) |
| percentile(field, q) | Percentile (0.0-1.0) | percentile(latency, 0.95) |
| median(field) | Median (50th percentile) | median(price) |
| p50(field) | 50th percentile shorthand | p50(latency) |
| p95(field) | 95th percentile shorthand | p95(latency) |
| p99(field) | 99th percentile shorthand | p99(latency) |
| collect(field) | Collect values into array | collect(sensor_id) |
| first(field) | First value in window | first(timestamp) |
| last(field) | Last value in window | last(value) |
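The less common functions in the table (stddev, collect, first, last) can be combined in a single window; a minimal sketch, with illustrative stream and field names:

```
// Summarize each minute: value spread, participating sensors,
// and the timestamps bounding the window
stream WindowSummary = SensorReading
    .window(1m)
    .aggregate(
        spread: stddev(value),
        sensors: collect(sensor_id),
        window_start: first(timestamp),
        window_end: last(timestamp)
    )
```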
Partitioned Windows
Windows partitioned by a key:
// Per-sensor statistics
stream PerSensorStats = TemperatureReading
.partition_by(sensor_id)
.window(1m)
.aggregate(
sensor: sensor_id,
avg_temp: avg(temperature),
readings: count()
)
// Per-customer totals
stream CustomerTotals = Transaction
.partition_by(customer_id)
.window(1h)
.aggregate(
customer: customer_id,
hourly_spend: sum(amount)
)
Percentile Aggregations
Compute percentiles for latency monitoring, SLA tracking, and distribution analysis:
// Latency monitoring with percentile aggregations
stream LatencyStats = RequestEvent
.window(1m)
.aggregate(
median_ms: median(latency_ms),
p50: p50(latency_ms),
p95: p95(latency_ms),
p99: p99(latency_ms),
p999: percentile(latency_ms, 0.999)
)
The percentile(field, q) function takes a quantile between 0.0 and 1.0. The median, p50, p95, and p99 functions are convenient shorthands.
Filtering After Aggregation
Apply conditions to aggregated results:
stream HighVolumeMinutes = Transaction
.window(1m)
.aggregate(
total: sum(amount),
count: count()
)
.having(total > 10000 or count > 100)
.emit(alert_type: "HighVolume", message: "Minute had {count} transactions totaling {total}")
Part 3: Sequence Patterns
Sequence patterns detect events occurring in a specific order using the -> operator.
Basic Sequences
// A followed by B
pattern LoginLogout = Login -> Logout
// A followed by B followed by C
pattern ThreeStep = Start -> Process -> Complete
// Inline sequence in a stream
stream Sessions = Login as l -> Logout where user_id == l.user_id
.within(1h)
.emit(user_id: l.user_id, message: "Session: user logged in and out")
Sequences with Conditions
// Events must match conditions
pattern FailedLogin =
LoginAttempt[status == "failed"] as first
-> LoginAttempt[status == "failed" and user_id == first.user_id] as second
-> LoginAttempt[status == "failed" and user_id == first.user_id] as third
within 5m
stream BruteForceDetection = LoginAttempt[status == "failed"] as first
-> LoginAttempt[status == "failed" and user_id == first.user_id] as second
-> LoginAttempt[status == "failed" and user_id == first.user_id] as third
.within(5m)
.emit(alert_type: "BruteForce", message: "3 failed attempts for {first.user_id}")
Referencing Previous Events
Use aliases to reference earlier events in the sequence:
pattern PriceSpike =
Trade as t1
-> Trade[symbol == t1.symbol and price > t1.price * 1.1] as t2
within 1m
stream Spikes = Trade as t1
-> Trade[symbol == t1.symbol and price > t1.price * 1.1] as t2
.within(1m)
.emit(alert_type: "PriceSpike", message: "{t1.symbol} jumped from {t1.price} to {t2.price}")
Temporal Constraints
Constrain how quickly events must occur:
// Must complete within 5 minutes
pattern QuickCheckout =
CartAdd -> PaymentStart -> PaymentComplete
within 5m
// Different timeouts per step
pattern SlowThenFast =
SlowEvent
-> FastEvent within 10s
-> FinalEvent within 5s
Part 4: SASE+ Advanced
SASE+ extends basic patterns with Kleene closures, negation, and logical operators.
Kleene Plus (+) - One or More
// One or more failed logins followed by success
pattern BruteForceSuccess =
LoginFailed+ -> LoginSuccess
within 10m
stream Attacks = LoginFailed+ -> LoginSuccess
.within(10m)
.emit(alert_type: "BruteForce", message: "Multiple failures followed by success")
Kleene Star (*) - Zero or More
// Start, any number of middle events, then end
pattern FullSession =
SessionStart -> Activity* -> SessionEnd
within 1h
// Optional retries before success
pattern WithRetries =
Request -> Retry* -> Success
within 30s
Negation (NOT) - Absence of Event
// Payment started but not completed
pattern AbandonedPayment =
PaymentStart -> NOT(PaymentComplete) within 5m
stream Abandoned = PaymentStart -> NOT(PaymentComplete)
.within(5m)
.emit(alert_type: "Abandoned", message: "Payment started but not completed")
// Order without confirmation
pattern UnconfirmedOrder =
OrderPlaced -> NOT(OrderConfirmed) within 1h
AND - Both Events (Any Order)
// Both A and B must occur (order doesn't matter)
pattern BothRequired =
AND(DocumentUploaded, SignatureProvided)
within 1h
stream Complete = AND(DocumentUploaded, SignatureProvided)
.within(1h)
.emit(message: "Both document and signature received")
OR - Either Event
// Either payment method
pattern PaymentReceived =
OR(CreditCardPayment, BankTransfer)
stream Payments = OR(CreditCardPayment, BankTransfer)
.emit(message: "Payment received via {match.event_type}")
Complex Combinations
// (A followed by B) AND (C or D), all within 10 minutes
pattern ComplexFlow =
(Start -> Middle) AND (OR(OptionA, OptionB))
within 10m
// Multiple failures, then either success or lockout
pattern AuthResult =
LoginFailed+ -> OR(LoginSuccess, AccountLocked)
within 15m
// Order placed, items added, no cancellation, then shipped
pattern SuccessfulOrder =
OrderPlaced
-> ItemAdded+
-> NOT(OrderCancelled)
-> OrderShipped
within 24h
Partition-By for Patterns
Process patterns independently per partition key:
// Per-user pattern matching
pattern UserFailures =
LoginFailed+ -> LoginSuccess
within 10m
partition by user_id
stream UserAttacks = LoginFailed+ as fails -> LoginSuccess
.within(10m)
.partition_by(user_id)
.emit(alert_type: "UserBruteForce", message: "User {user_id} had multiple failures")
Part 5: Joins
Join multiple event streams based on conditions.
Basic Join
stream EnrichedOrders = join(
stream Orders = OrderEvent,
stream Customers = CustomerEvent
on Orders.customer_id == Customers.id
)
.window(5m)
.select(
order_id: Orders.id,
customer_name: Customers.name,
order_total: Orders.total
)
Multi-Stream Join
stream FullOrderDetails = join(
stream Orders = OrderEvent,
stream Customers = CustomerEvent
on Orders.customer_id == Customers.id,
stream Products = ProductEvent
on Orders.product_id == Products.id,
stream Inventory = InventoryEvent
on Orders.product_id == Inventory.product_id
)
.window(10m)
.select(
order_id: Orders.id,
customer: Customers.name,
product: Products.name,
in_stock: Inventory.quantity > 0
)
Join with Aggregation
stream CustomerStats = join(
stream Orders = OrderEvent,
stream Customers = CustomerEvent
on Orders.customer_id == Customers.id
)
.window(1h)
.aggregate(
customer: Customers.name,
order_count: count(),
total_spent: sum(Orders.amount),
avg_order: avg(Orders.amount)
)
Merge Streams
Combine multiple streams of the same type:
stream AllSensors = merge(
stream Zone1 = SensorReading .where(zone == "1"),
stream Zone2 = SensorReading .where(zone == "2"),
stream Zone3 = SensorReading .where(zone == "3")
)
.window(1m)
.aggregate(
total_sensors: count(),
avg_value: avg(value)
)
Part 6: Contexts
Contexts let you run streams on dedicated OS threads for true multi-core parallelism.
Declaring Contexts
// Declare named execution contexts
context ingestion
context analytics (cores: [2, 3])
context alerts (cores: [4])
Each context gets its own OS thread with a single-threaded Tokio runtime. The optional cores parameter pins the thread to specific CPU cores (Linux only).
Assigning Streams
Use .context() to assign a stream to a context:
context fast (cores: [0])
context slow (cores: [1])
stream RawFilter = SensorReading
.context(fast)
.where(value > 0)
.emit(sensor_id: sensor_id, value: value)
stream HeavyAnalytics = SensorReading
.context(slow)
.window(5m)
.aggregate(avg: avg(value), stddev: stddev(value))
Cross-Context Communication
Send events from one context to another using context: in .emit():
context ingest (cores: [0])
context analyze (cores: [1])
// Filter in the ingest context, forward to analyze
stream Filtered = RawEvent
.context(ingest)
.where(priority > 5)
.emit(context: analyze, data: data, priority: priority)
// Aggregate in the analyze context
stream Stats = Filtered
.context(analyze)
.window(1m)
.aggregate(count: count(), avg_priority: avg(priority))
Cross-context events are delivered via bounded mpsc channels.
Backward Compatibility
Programs without context declarations run exactly as before -- single-threaded with zero overhead. Contexts are purely opt-in.
For a complete tutorial with a multi-stage IoT pipeline, see the Contexts Guide.
Best Practices
1. Start Simple
Begin with basic filters before adding windows and patterns:
// Step 1: Basic filter
stream HighTemps = TemperatureReading
.where(temperature > 100)
// Step 2: Add window
stream HighTempMinutes = TemperatureReading
.where(temperature > 100)
.window(1m)
.aggregate(count: count())
// Step 3: Add alert
stream HighTempAlerts = TemperatureReading
.where(temperature > 100)
.window(1m)
.aggregate(count: count())
.having(count > 5)
.emit(alert_type: "SustainedHighTemp", message: "5+ high readings in 1 minute")
2. Use Partitioning for Scale
// Process per-device independently
stream DeviceAlerts = SensorReading
.partition_by(device_id)
.window(1m)
.aggregate(avg_val: avg(value))
.having(avg_val > threshold)
3. Set Appropriate Timeouts
// Don't wait forever for patterns
pattern QuickMatch = A -> B -> C within 5m
// Different timeouts for different patterns
pattern SlowProcess = Start -> Middle within 1h -> End within 10m
4. Use Aliases for Clarity
pattern ClearPattern =
LoginFailed[user_id == "admin"] as failed_login
-> LoginSuccess[user_id == failed_login.user_id] as success
within 10m
5. Test with Simulation
# Always test with event files first
varpulis check program.vpl
varpulis simulate -p program.vpl -e test_events.evt --verbose
Quick Reference
Stream Operations
| Operation | Syntax | Description |
|---|---|---|
| Filter | .where(condition) | Filter events |
| Select | .select(field: expr, ...) | Transform/project fields |
| Window (tumbling) | .window(1m) | Time-based tumbling window |
| Window (sliding) | .window(5m, sliding: 1m) | Time-based sliding window |
| Window (count) | .window(100) | Count-based window |
| Aggregate | .aggregate(name: func(), ...) | Compute aggregations |
| Pattern | pattern A -> B | Sequence detection (declaration) |
| Partition | .partition_by(field) | Process per-key |
| Context | .context(name) | Assign to execution context |
| Emit | .emit(field: value, ...) | Output alert/event |
| Print | .print("message") | Output log message |
| Having | .having(condition) | Post-aggregation filter |
Pattern Operators
| Operator | Meaning | Example |
|---|---|---|
| -> | Followed by | A -> B -> C |
| + | One or more | A+ |
| * | Zero or more | A* |
| NOT | Absence | NOT(A) |
| AND | Both (any order) | AND(A, B) |
| OR | Either | OR(A, B) |
| within | Time constraint | within 5m |
Duration Units
| Unit | Meaning | Example |
|---|---|---|
| s | Seconds | 30s |
| m | Minutes | 5m |
| h | Hours | 1h |
| d | Days | 1d |
Next Steps
- CLI Reference - All command options
- Windows & Aggregations Reference - Detailed window documentation
- SASE+ Pattern Guide - Advanced pattern matching
- Configuration Guide - MQTT, deployment, and more