
VPL Language Tutorial

A comprehensive guide to writing VPL programs, from basic event processing to advanced pattern matching.

Table of Contents

  1. Part 1: Basics - Events, streams, filters, emit
  2. Part 2: Windows and Aggregations
  3. Part 3: Sequence Patterns
  4. Part 4: SASE+ Advanced - Kleene, negation, AND/OR
  5. Part 5: Joins
  6. Part 6: Contexts - Multi-threaded execution with CPU affinity

Part 1: Basics

Events

Events are the fundamental unit of data in Varpulis. Each event has:

  • A type (string identifier)
  • A timestamp (automatically assigned or from data)
  • Fields (key-value data)

Events arrive from sources like MQTT, files, or the WebSocket API:

json
{
  "event_type": "TemperatureReading",
  "timestamp": "2024-01-15T10:30:00Z",
  "sensor_id": "S1",
  "temperature": 72.5,
  "unit": "F"
}

Defining Event Types (Optional)

You can explicitly define event schemas:

vpl
event TemperatureReading:
    sensor_id: str
    temperature: float
    unit: str

event HumidityReading:
    sensor_id: str
    humidity: float

Streams

Streams are continuous flows of events. Create streams with the stream keyword:

vpl
// Basic stream: listen for all TemperatureReading events
stream Temperatures = TemperatureReading

// Stream with alias
stream T = Temperatures

Filtering with .where()

Filter events based on conditions:

vpl
// Single condition
stream HighTemps = TemperatureReading
    .where(temperature > 100)

// Multiple conditions (AND)
stream CriticalTemps = TemperatureReading
    .where(temperature > 100 and sensor_id == "critical-zone")

// OR conditions
stream AlertZones = TemperatureReading
    .where(sensor_id == "zone-1" or sensor_id == "zone-2")

// Compound conditions
stream Filtered = TemperatureReading
    .where((temperature > 90 and humidity > 80) or emergency == true)

Comparison operators: ==, !=, <, <=, >, >=

Logical operators: and, or, not

Selecting Fields with .select()

Transform events by selecting specific fields or computing new ones:

vpl
stream SimplifiedTemps = TemperatureReading
    .select(
        sensor: sensor_id,
        temp: temperature,
        is_high: temperature > 80
    )

// Computed fields
stream EnhancedTemps = TemperatureReading
    .select(
        sensor_id,
        temp_celsius: (temperature - 32) * 5 / 9,
        reading_time: timestamp
    )

Emitting Alerts and Logs

Use .emit() and .print() to output data when conditions are met:

vpl
// Emit an alert
stream TempAlerts = TemperatureReading
    .where(temperature > 100)
    .emit(alert_type: "HighTemperature", message: "Sensor {sensor_id} reading {temperature}°F")

// Print to log
stream TempLog = TemperatureReading
    .print("Received: {sensor_id} = {temperature}")

// Emit with severity
stream CriticalAlerts = TemperatureReading
    .where(temperature > 150)
    .emit(alert_type: "CriticalTemperature", message: "DANGER: {sensor_id} at {temperature}°F", severity: "critical")

Variables and Constants

vpl
// Immutable variable
let threshold = 100
let sensor_name = "main-sensor"

// Mutable variable
var counter = 0

// Constants (compile-time)
const MAX_TEMP = 200
const API_KEY = "secret123"

// Use in streams
stream Alerts = TemperatureReading
    .where(temperature > threshold)
    .emit(alert_type: "High", message: "Above threshold")

Comments

vpl
// Single-line comment

/*
   Multi-line
   comment
*/

# Hash-style comment (also single-line)

Part 2: Windows and Aggregations

Windows group events by time or by count, enabling aggregate calculations over each window's contents.

Tumbling Windows

Non-overlapping, fixed-duration windows:

vpl
// 1-minute tumbling window
stream MinuteStats = TemperatureReading
    .window(1m)
    .aggregate(
        avg_temp: avg(temperature),
        max_temp: max(temperature),
        count: count()
    )

// 5-second window
stream RapidStats = SensorReading
    .window(5s)
    .aggregate(readings: count())

// 1-hour window
stream HourlyReport = Transaction
    .window(1h)
    .aggregate(
        total: sum(amount),
        avg_amount: avg(amount)
    )

Duration units: s (seconds), m (minutes), h (hours), d (days)
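Conceptually, a tumbling window assigns each event to exactly one bucket by flooring its timestamp to the window width. A minimal Python sketch of that bucketing, for intuition only (not Varpulis internals):

```python
def tumbling_bucket(ts_seconds: float, width_seconds: float) -> float:
    """Return the start time of the tumbling window containing ts_seconds."""
    return ts_seconds - (ts_seconds % width_seconds)

# Events at t = 0..59s all land in the 1-minute bucket starting at 0;
# an event at t = 60s starts the next bucket.
```

Because buckets never overlap, every event contributes to exactly one aggregate result per tumbling window.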

Sliding Windows

Overlapping windows with a slide interval:

vpl
// 5-minute window, slides every 1 minute
stream SlidingAvg = TemperatureReading
    .window(5m, sliding: 1m)
    .aggregate(
        rolling_avg: avg(temperature)
    )

// 10-second window, slides every 2 seconds
stream RecentTrend = SensorReading
    .window(10s, sliding: 2s)
    .aggregate(
        recent_max: max(value),
        recent_min: min(value)
    )
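With a sliding window of width W and slide S, each event belongs to up to W/S overlapping windows. A toy sketch of which window start times contain a given event (illustrative, not engine code; windows before time 0 are clipped):

```python
def sliding_window_starts(ts, width, slide):
    """Start times (newest first) of sliding windows [start, start + width)
    that contain an event at time ts."""
    start = ts - (ts % slide)  # newest window whose start is <= ts
    starts = []
    while start > ts - width:
        if start >= 0:
            starts.append(start)
        start -= slide
    return starts
```

For example, with a 5-minute window sliding every minute (width 300s, slide 60s), an event at t=150s falls into the windows starting at 120s, 60s, and 0s.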

Count-Based Windows

Windows based on event count:

vpl
// Every 100 events
stream BatchStats = Transaction
    .window(100)
    .aggregate(
        batch_total: sum(amount),
        batch_avg: avg(amount)
    )

// Sliding count window: 50 events, slide by 10
stream RollingBatch = Reading
    .window(50, sliding: 10)
    .aggregate(rolling_sum: sum(value))

Aggregation Functions

Function               Description                       Example
count()                Number of events                  count()
sum(field)             Sum of field values               sum(amount)
avg(field)             Average (SIMD-optimized)          avg(temperature)
min(field)             Minimum value (SIMD-optimized)    min(price)
max(field)             Maximum value (SIMD-optimized)    max(price)
stddev(field)          Standard deviation                stddev(latency)
percentile(field, q)   Percentile (0.0-1.0)              percentile(latency, 0.95)
median(field)          Median (50th percentile)          median(price)
p50(field)             50th percentile shorthand         p50(latency)
p95(field)             95th percentile shorthand         p95(latency)
p99(field)             99th percentile shorthand         p99(latency)
collect(field)         Collect values into array         collect(sensor_id)
first(field)           First value in window             first(timestamp)
last(field)            Last value in window              last(value)
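Aggregates like count, sum, avg, min, and max can be maintained incrementally as each event arrives, rather than rescanning the whole window. A hedged Python sketch of that pattern (the class and field names here are illustrative, not the engine's types):

```python
class RunningAggregate:
    """Incrementally maintained count/sum/avg/min/max over a window."""

    def __init__(self):
        self.count = 0
        self.total = 0.0
        self.minimum = float("inf")
        self.maximum = float("-inf")

    def add(self, value: float) -> None:
        # O(1) update per event: no need to keep or rescan raw values
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def avg(self) -> float:
        return self.total / self.count if self.count else 0.0
```

Order statistics such as stddev and percentile need more state (e.g. the collected values or a streaming estimator), which is why they are typically costlier than the simple aggregates.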

Partitioned Windows

Windows partitioned by a key:

vpl
// Per-sensor statistics
stream PerSensorStats = TemperatureReading
    .partition_by(sensor_id)
    .window(1m)
    .aggregate(
        sensor: sensor_id,
        avg_temp: avg(temperature),
        readings: count()
    )

// Per-customer totals
stream CustomerTotals = Transaction
    .partition_by(customer_id)
    .window(1h)
    .aggregate(
        customer: customer_id,
        hourly_spend: sum(amount)
    )

Percentile Aggregations

Compute percentiles for latency monitoring, SLA tracking, and distribution analysis:

vpl
// Latency monitoring with percentile aggregations
stream LatencyStats = RequestEvent
    .window(1m)
    .aggregate(
        median_ms: median(latency_ms),
        p50: p50(latency_ms),
        p95: p95(latency_ms),
        p99: p99(latency_ms),
        p999: percentile(latency_ms, 0.999)
    )

The percentile(field, q) function takes a quantile between 0.0 and 1.0. The median, p50, p95, and p99 functions are convenient shorthands.
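For intuition, a percentile over a window can be computed by sorting the collected values and interpolating at rank q * (n - 1). A minimal sketch of that linear-interpolation method (the engine's exact algorithm may differ, e.g. nearest-rank or a streaming estimator):

```python
def percentile(values, q):
    """Linear-interpolation percentile; q is a quantile in [0.0, 1.0]."""
    s = sorted(values)
    if not s:
        raise ValueError("empty window")
    rank = q * (len(s) - 1)   # fractional rank into the sorted values
    lo = int(rank)
    hi = min(lo + 1, len(s) - 1)
    frac = rank - lo
    return s[lo] + (s[hi] - s[lo]) * frac
```

Under this method, median(x) is exactly percentile(x, 0.5), and p95/p99 are percentile(x, 0.95) and percentile(x, 0.99).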

Filtering After Aggregation

Apply conditions to aggregated results:

vpl
stream HighVolumeMinutes = Transaction
    .window(1m)
    .aggregate(
        total: sum(amount),
        count: count()
    )
    .having(total > 10000 or count > 100)
    .emit(alert_type: "HighVolume", message: "Minute had {count} transactions totaling {total}")

Part 3: Sequence Patterns

Sequence patterns detect events occurring in a specific order using the -> operator.

Basic Sequences

vpl
// A followed by B
pattern LoginLogout = Login -> Logout

// A followed by B followed by C
pattern ThreeStep = Start -> Process -> Complete

// Inline sequence in a stream
stream Sessions = Login as l -> Logout where user_id == l.user_id
    .within(1h)
    .emit(user_id: l.user_id, message: "Session: user logged in and out")

Sequences with Conditions

vpl
// Events must match conditions
pattern FailedLogin =
    LoginAttempt[status == "failed"] as first
    -> LoginAttempt[status == "failed" and user_id == first.user_id] as second
    -> LoginAttempt[status == "failed" and user_id == first.user_id] as third
    within 5m

stream BruteForceDetection = LoginAttempt[status == "failed"] as first
    -> LoginAttempt[status == "failed" and user_id == first.user_id] as second
    -> LoginAttempt[status == "failed" and user_id == first.user_id] as third
    .within(5m)
    .emit(alert_type: "BruteForce", message: "3 failed attempts for {first.user_id}")

Referencing Previous Events

Use aliases to reference earlier events in the sequence:

vpl
pattern PriceSpike =
    Trade as t1
    -> Trade[symbol == t1.symbol and price > t1.price * 1.1] as t2
    within 1m

stream Spikes = Trade as t1
    -> Trade[symbol == t1.symbol and price > t1.price * 1.1] as t2
    .within(1m)
    .emit(alert_type: "PriceSpike", message: "{t1.symbol} jumped from {t1.price} to {t2.price}")

Temporal Constraints

Constrain how quickly events must occur:

vpl
// Must complete within 5 minutes
pattern QuickCheckout =
    CartAdd -> PaymentStart -> PaymentComplete
    within 5m

// Different timeouts per step
pattern SlowThenFast =
    SlowEvent
    -> FastEvent within 10s
    -> FinalEvent within 5s
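The -> operator can be pictured as a small state machine: the match advances one step per qualifying event, and an open match expires if its deadline passes. A toy Python sketch of A -> B within T over a recorded event list (illustrative only, not the engine's matcher):

```python
def match_sequence(events, first_type, second_type, within_seconds):
    """events: list of (timestamp, event_type) in time order.
    Returns the first (a, b) pair completing the sequence in time, or None."""
    pending = None  # the open first_type event waiting for its successor
    for ts, etype in events:
        if pending and ts - pending[0] > within_seconds:
            pending = None  # deadline passed; this match instance expires
        if etype == first_type and pending is None:
            pending = (ts, etype)
        elif etype == second_type and pending:
            return pending, (ts, etype)
    return None
```

Unrelated events between the two steps are simply skipped, which matches the non-contiguous reading of "followed by".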

Part 4: SASE+ Advanced

SASE+ extends basic patterns with Kleene closures, negation, and logical operators.

Kleene Plus (+) - One or More

vpl
// One or more failed logins followed by success
pattern BruteForceSuccess =
    LoginFailed+ -> LoginSuccess
    within 10m

stream Attacks = LoginFailed+ -> LoginSuccess
    .within(10m)
    .emit(alert_type: "BruteForce", message: "Multiple failures followed by success")

Kleene Star (*) - Zero or More

vpl
// Start, any number of middle events, then end
pattern FullSession =
    SessionStart -> Activity* -> SessionEnd
    within 1h

// Optional retries before success
pattern WithRetries =
    Request -> Retry* -> Success
    within 30s
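Kleene closures behave like the regex + and *: LoginFailed+ accumulates one or more repeat events before the next pattern step fires. A toy sketch of that accumulation, skipping over irrelevant event types (one of several selection policies a CEP engine may offer):

```python
def match_kleene_plus(events, repeat_type, final_type):
    """Match repeat_type+ -> final_type over events (dicts with a "type" key).
    Returns (list_of_repeats, final_event), or None if the pattern never completes."""
    run = []
    for ev in events:
        if ev["type"] == repeat_type:
            run.append(ev)               # grow the one-or-more run
        elif ev["type"] == final_type and run:
            return run, ev               # requires at least one repeat (+, not *)
        # other event types are skipped (non-contiguous matching)
    return None
```

For the star variant, the `and run` guard would be dropped, allowing the final event to match with zero repeats.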

Negation (NOT) - Absence of Event

vpl
// Payment started but not completed
pattern AbandonedPayment =
    PaymentStart -> NOT(PaymentComplete) within 5m

stream Abandoned = PaymentStart -> NOT(PaymentComplete)
    .within(5m)
    .emit(alert_type: "Abandoned", message: "Payment started but not completed")

// Order without confirmation
pattern UnconfirmedOrder =
    OrderPlaced -> NOT(OrderConfirmed) within 1h
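Absence detection is deadline-driven: the match fires only if the window elapses without the negated event. A toy batch sketch for PaymentStart -> NOT(PaymentComplete) within T, run over a recorded event list (a streaming engine would do the same with timers as events arrive; the payment_id key here is an illustrative correlation field):

```python
def abandoned_payments(events, within_seconds):
    """events: list of (ts, event_type, payment_id) tuples.
    Returns payment_ids with no PaymentComplete within within_seconds of the start."""
    started = {}    # payment_id -> start timestamp
    completed = {}  # payment_id -> completion timestamp
    for ts, etype, pid in events:
        if etype == "PaymentStart":
            started.setdefault(pid, ts)
        elif etype == "PaymentComplete":
            completed.setdefault(pid, ts)
    return [pid for pid, ts in started.items()
            if pid not in completed or completed[pid] - ts > within_seconds]
```

Note that a completion arriving after the deadline still counts as abandoned: the negated event must be absent within the window, not absent forever.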

AND - Both Events (Any Order)

vpl
// Both A and B must occur (order doesn't matter)
pattern BothRequired =
    AND(DocumentUploaded, SignatureProvided)
    within 1h

stream Complete = AND(DocumentUploaded, SignatureProvided)
    .within(1h)
    .emit(message: "Both document and signature received")

OR - Either Event

vpl
// Either payment method
pattern PaymentReceived =
    OR(CreditCardPayment, BankTransfer)

stream Payments = OR(CreditCardPayment, BankTransfer)
    .emit(message: "Payment received via {match.event_type}")

Complex Combinations

vpl
// (A followed by B) AND (C or D), all within 10 minutes
pattern ComplexFlow =
    (Start -> Middle) AND (OR(OptionA, OptionB))
    within 10m

// Multiple failures, then either success or lockout
pattern AuthResult =
    LoginFailed+ -> OR(LoginSuccess, AccountLocked)
    within 15m

// Order placed, items added, no cancellation, then shipped
pattern SuccessfulOrder =
    OrderPlaced
    -> ItemAdded+
    -> NOT(OrderCancelled)
    -> OrderShipped
    within 24h

Partition-By for Patterns

Process patterns independently per partition key:

vpl
// Per-user pattern matching
pattern UserFailures =
    LoginFailed+ -> LoginSuccess
    within 10m
    partition by user_id

stream UserAttacks = LoginFailed+ as fails -> LoginSuccess
    .within(10m)
    .partition_by(user_id)
    .emit(alert_type: "UserBruteForce", message: "User {user_id} had multiple failures")

Part 5: Joins

Join multiple event streams based on conditions.

Basic Join

vpl
stream EnrichedOrders = join(
    stream Orders = OrderEvent,
    stream Customers = CustomerEvent
        on Orders.customer_id == Customers.id
)
.window(5m)
.select(
    order_id: Orders.id,
    customer_name: Customers.name,
    order_total: Orders.total
)
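A stream join can be pictured as keyed buffers: each arriving event is stored under its join key and matched against buffered events from the other stream that are still inside the window. A simplified hash-join sketch over recorded events (illustrative; the engine's buffering and eviction details are not specified here):

```python
from collections import defaultdict

def window_join(orders, customers, window_seconds):
    """orders: (ts, customer_id, order_id); customers: (ts, customer_id, name).
    Returns (order_id, name) pairs joined on customer_id within the window."""
    by_customer = defaultdict(list)
    for ts, cid, name in customers:
        by_customer[cid].append((ts, name))  # buffer keyed by join key
    results = []
    for ts, cid, oid in orders:
        for cts, name in by_customer[cid]:
            if abs(ts - cts) <= window_seconds:  # only pair within the window
                results.append((oid, name))
    return results
```

The window bound is what keeps state finite: without it, every order would have to be checked against every customer event ever seen.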

Multi-Stream Join

vpl
stream FullOrderDetails = join(
    stream Orders = OrderEvent,
    stream Customers = CustomerEvent
        on Orders.customer_id == Customers.id,
    stream Products = ProductEvent
        on Orders.product_id == Products.id,
    stream Inventory = InventoryEvent
        on Orders.product_id == Inventory.product_id
)
.window(10m)
.select(
    order_id: Orders.id,
    customer: Customers.name,
    product: Products.name,
    in_stock: Inventory.quantity > 0
)

Join with Aggregation

vpl
stream CustomerStats = join(
    stream Orders = OrderEvent,
    stream Customers = CustomerEvent
        on Orders.customer_id == Customers.id
)
.window(1h)
.aggregate(
    customer: Customers.name,
    order_count: count(),
    total_spent: sum(Orders.amount),
    avg_order: avg(Orders.amount)
)

Merge Streams

Combine multiple streams of the same type:

vpl
stream AllSensors = merge(
    stream Zone1 = SensorReading .where(zone == "1"),
    stream Zone2 = SensorReading .where(zone == "2"),
    stream Zone3 = SensorReading .where(zone == "3")
)
.window(1m)
.aggregate(
    total_sensors: count(),
    avg_value: avg(value)
)

Part 6: Contexts

Contexts let you run streams on dedicated OS threads for true multi-core parallelism.

Declaring Contexts

vpl
// Declare named execution contexts
context ingestion
context analytics (cores: [2, 3])
context alerts (cores: [4])

Each context gets its own OS thread with a single-threaded Tokio runtime. The optional cores parameter pins the thread to specific CPU cores (Linux only).

Assigning Streams

Use .context() to assign a stream to a context:

vpl
context fast (cores: [0])
context slow (cores: [1])

stream RawFilter = SensorReading
    .context(fast)
    .where(value > 0)
    .emit(sensor_id: sensor_id, value: value)

stream HeavyAnalytics = SensorReading
    .context(slow)
    .window(5m)
    .aggregate(avg: avg(value), stddev: stddev(value))

Cross-Context Communication

Send events from one context to another using context: in .emit():

vpl
context ingest (cores: [0])
context analyze (cores: [1])

// Filter in the ingest context, forward to analyze
stream Filtered = RawEvent
    .context(ingest)
    .where(priority > 5)
    .emit(context: analyze, data: data, priority: priority)

// Aggregate in the analyze context
stream Stats = Filtered
    .context(analyze)
    .window(1m)
    .aggregate(count: count(), avg_priority: avg(priority))

Cross-context events are delivered via bounded mpsc channels.
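The bounded-channel pattern is easy to reproduce in miniature: a producer blocks when the queue is full (backpressure), which keeps a fast ingest context from overrunning a slower analytics context. A small Python analogue using queue.Queue (Varpulis itself uses Rust mpsc channels; this only illustrates the mechanism):

```python
import queue
import threading

channel = queue.Queue(maxsize=8)  # bounded: put() blocks when full

def producer():
    for i in range(20):
        channel.put(i)    # backpressure: blocks if the consumer lags behind
    channel.put(None)     # sentinel: signal end of stream

def consumer(out):
    while True:
        item = channel.get()
        if item is None:
            break
        out.append(item)

received = []
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=(received,))
t1.start(); t2.start()
t1.join(); t2.join()
```

The bound (here 8) trades memory for latency: a larger buffer absorbs bursts, a smaller one propagates backpressure to the producer sooner.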

Backward Compatibility

Programs without context declarations run exactly as before -- single-threaded with zero overhead. Contexts are purely opt-in.

For a complete tutorial with a multi-stage IoT pipeline, see the Contexts Guide.


Best Practices

1. Start Simple

Begin with basic filters before adding windows and patterns:

vpl
// Step 1: Basic filter
stream HighTemps = TemperatureReading
    .where(temperature > 100)

// Step 2: Add window
stream HighTempMinutes = TemperatureReading
    .where(temperature > 100)
    .window(1m)
    .aggregate(count: count())

// Step 3: Add alert
stream HighTempAlerts = TemperatureReading
    .where(temperature > 100)
    .window(1m)
    .aggregate(count: count())
    .having(count > 5)
    .emit(alert_type: "SustainedHighTemp", message: "5+ high readings in 1 minute")

2. Use Partitioning for Scale

vpl
// Process per-device independently
stream DeviceAlerts = SensorReading
    .partition_by(device_id)
    .window(1m)
    .aggregate(avg_val: avg(value))
    .having(avg_val > threshold)

3. Set Appropriate Timeouts

vpl
// Don't wait forever for patterns
pattern QuickMatch = A -> B -> C within 5m

// Different timeouts for different patterns
pattern SlowProcess = Start -> Middle within 1h -> End within 10m

4. Use Aliases for Clarity

vpl
pattern ClearPattern =
    LoginFailed[user_id == "admin"] as failed_login
    -> LoginSuccess[user_id == failed_login.user_id] as success
    within 10m

5. Test with Simulation

bash
# Always test with event files first
varpulis check program.vpl
varpulis simulate -p program.vpl -e test_events.evt --verbose

Quick Reference

Stream Operations

Operation           Syntax                         Description
Filter              .where(condition)              Filter events
Select              .select(field: expr, ...)      Transform/project fields
Window (tumbling)   .window(1m)                    Time-based tumbling window
Window (sliding)    .window(5m, sliding: 1m)       Time-based sliding window
Window (count)      .window(100)                   Count-based window
Aggregate           .aggregate(name: func(), ...)  Compute aggregations
Pattern             pattern A -> B                 Sequence detection (declaration)
Partition           .partition_by(field)           Process per-key
Context             .context(name)                 Assign to execution context
Emit                .emit(field: value, ...)       Output alert/event
Print               .print("message")              Output log message
Having              .having(condition)             Post-aggregation filter

Pattern Operators

Operator   Meaning            Example
->         Followed by        A -> B -> C
+          One or more        A+
*          Zero or more       A*
NOT        Absence            NOT(A)
AND        Both (any order)   AND(A, B)
OR         Either             OR(A, B)
within     Time constraint    within 5m

Duration Units

Unit   Meaning   Example
s      Seconds   30s
m      Minutes   5m
h      Hours     1h
d      Days      1d
