Windows & Aggregations Reference
Complete reference for window types and aggregation functions in VPL.
Window Types
Tumbling Windows
Non-overlapping, fixed-duration windows. When the duration expires, the window emits and resets.
Syntax:
stream Name = EventType
.window(<duration>)
.aggregate(...)
Parameters:
- <duration>: Window length (e.g., 5s, 1m, 1h)
Behavior:
- Events accumulate until window duration elapses
- When first event arrives, window start time is set
- When an event's timestamp exceeds window_start + duration, the window emits
- Window resets with the triggering event
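The behavior listed above can be sketched in Rust roughly as follows. This is a minimal illustration under stated assumptions, not the actual TumblingWindow from window.rs: the `Event` type, the millisecond timestamps, and the `push` method are hypothetical names chosen for the example, and the boundary check follows the wording "exceeds" literally.

```rust
/// Hypothetical event type for illustration: a millisecond timestamp and a value.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub ts_ms: u64,
    pub value: f64,
}

/// Minimal tumbling window sketch (not the real `TumblingWindow`).
pub struct TumblingSketch {
    duration_ms: u64,
    window_start: Option<u64>,
    events: Vec<Event>,
}

impl TumblingSketch {
    pub fn new(duration_ms: u64) -> Self {
        Self { duration_ms, window_start: None, events: Vec::new() }
    }

    /// Adds an event; returns the completed window if this event closed it.
    pub fn push(&mut self, event: Event) -> Option<Vec<Event>> {
        // The first event sets the window start time.
        let start = *self.window_start.get_or_insert(event.ts_ms);
        // Assumption: strict comparison, following "exceeds" in the text.
        if event.ts_ms > start + self.duration_ms {
            // Emit what has accumulated, then restart with the triggering event.
            let emitted = std::mem::take(&mut self.events);
            self.window_start = Some(event.ts_ms);
            self.events.push(event);
            return Some(emitted);
        }
        self.events.push(event);
        None
    }
}
```

Note that in this sketch a window is only emitted when a later event arrives; a window with no follow-up event stays buffered.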
Example:
stream MinuteStats = SensorReading
.window(1m)
.aggregate(
avg_value: avg(value),
max_value: max(value),
min_value: min(value),
count: count()
)
Implementation: TumblingWindow struct in varpulis-runtime/src/window.rs:15
Sliding Windows
Overlapping windows that slide at a specified interval, providing a rolling view of data.
Syntax:
stream Name = EventType
.window(<size>, sliding: <slide>)
Parameters:
- <size>: Window size (e.g., 5m)
- <slide>: Slide interval (e.g., 1m)
Behavior:
- Maintains events within the window size
- Emits current window contents at each slide interval
- Old events are evicted as they fall outside the window
- Window contents overlap between emissions
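To make the eviction and emission steps above concrete, here is a small Rust sketch. It is an illustration only, not the SlidingWindow struct from window.rs. It assumes events arrive in timestamp order, represents each event as a hypothetical `(timestamp_ms, value)` pair, and schedules the first emission one slide interval after the first event.

```rust
use std::collections::VecDeque;

/// Minimal sliding time window sketch (not the real `SlidingWindow`).
pub struct SlidingSketch {
    size_ms: u64,
    slide_ms: u64,
    next_emit: Option<u64>,
    events: VecDeque<(u64, f64)>,
}

impl SlidingSketch {
    pub fn new(size_ms: u64, slide_ms: u64) -> Self {
        Self { size_ms, slide_ms, next_emit: None, events: VecDeque::new() }
    }

    /// Adds an event; returns a snapshot of the window when a slide interval has elapsed.
    pub fn push(&mut self, ts_ms: u64, value: f64) -> Option<Vec<(u64, f64)>> {
        self.events.push_back((ts_ms, value));
        // Evict events that have fallen outside the window size.
        let cutoff = ts_ms.saturating_sub(self.size_ms);
        while let Some(&(front_ts, _)) = self.events.front() {
            if front_ts < cutoff {
                self.events.pop_front();
            } else {
                break;
            }
        }
        // Assumption: the first event schedules the first emission.
        let next = *self.next_emit.get_or_insert(ts_ms + self.slide_ms);
        if ts_ms >= next {
            self.next_emit = Some(ts_ms + self.slide_ms);
            // Contents are copied, not drained, so consecutive emissions overlap.
            return Some(self.events.iter().copied().collect());
        }
        None
    }
}
```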
Example:
stream RollingAverage = TemperatureReading
.window(5m, sliding: 30s)
.aggregate(
rolling_avg: avg(temperature),
recent_max: max(temperature)
)
Implementation: SlidingWindow struct in varpulis-runtime/src/window.rs:77
Count Windows
Windows based on event count rather than time.
Syntax:
stream Name = EventType
.window(<n>)
.aggregate(...)
Parameters:
- <n>: Number of events per window
Behavior:
- Collects exactly N events
- Emits when count is reached
- Resets to empty after emission
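The three steps above map onto a very small buffer. The following Rust sketch shows one way to express them; `CountSketch` and its `push` method are hypothetical names for illustration and are not taken from the actual CountWindow in window.rs.

```rust
/// Minimal count window sketch (not the real `CountWindow`).
pub struct CountSketch<T> {
    capacity: usize,
    buffer: Vec<T>,
}

impl<T> CountSketch<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "window size must be positive");
        Self { capacity, buffer: Vec::with_capacity(capacity) }
    }

    /// Adds an item; returns the full batch once exactly `capacity` items are collected.
    pub fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.buffer.push(item);
        if self.buffer.len() == self.capacity {
            // Hand the full batch to the caller and leave an empty buffer behind.
            let full = std::mem::replace(&mut self.buffer, Vec::with_capacity(self.capacity));
            Some(full)
        } else {
            None
        }
    }
}
```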
Example:
stream BatchProcessor = Transaction
.window(100)
.aggregate(
batch_total: sum(amount),
batch_count: count()
)
Implementation: CountWindow struct in varpulis-runtime/src/window.rs:146
Sliding Count Windows
Count-based windows with overlap.
Syntax:
stream Name = EventType
.window(<size>, sliding: <slide>)
Parameters:
- <size>: Window size in events
- <slide>: Slide size in events
Example:
stream RollingBatch = Reading
.window(100, sliding: 25)
.aggregate(
rolling_sum: sum(value)
)
Implementation: SlidingCountWindow struct in varpulis-runtime/src/window.rs:194
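Since this section gives no behavior list, the following Rust sketch shows one plausible reading of a count window with overlap. It is not the SlidingCountWindow implementation. In particular, it assumes that nothing is emitted until the window has filled once, and the type and method names are hypothetical.

```rust
use std::collections::VecDeque;

/// Minimal sliding count window sketch (not the real `SlidingCountWindow`).
pub struct SlidingCountSketch<T> {
    size: usize,
    slide: usize,
    since_emit: usize,
    buffer: VecDeque<T>,
}

impl<T: Clone> SlidingCountSketch<T> {
    pub fn new(size: usize, slide: usize) -> Self {
        assert!(size > 0 && slide > 0, "size and slide must be positive");
        Self { size, slide, since_emit: 0, buffer: VecDeque::with_capacity(size) }
    }

    /// Adds an item; returns the last `size` items every `slide` arrivals.
    pub fn push(&mut self, item: T) -> Option<Vec<T>> {
        // Drop the oldest item once the window is full.
        if self.buffer.len() == self.size {
            self.buffer.pop_front();
        }
        self.buffer.push_back(item);
        self.since_emit += 1;
        // Assumption: no emission until the window has filled for the first time.
        if self.buffer.len() == self.size && self.since_emit >= self.slide {
            self.since_emit = 0;
            return Some(self.buffer.iter().cloned().collect());
        }
        None
    }
}
```

With a size of 4 and a slide of 2, this sketch emits items 1 to 4 on the fourth arrival and items 3 to 6 on the sixth, so consecutive emissions share two items.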
Partitioned Windows
Any window type can be partitioned by a key field.
Syntax:
stream Name = EventType
.partition_by(<field>)
.window(<params>)
.aggregate(...)
Behavior:
- Creates independent windows per unique partition key
- Each partition emits separately
- Memory scales with number of unique keys
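The points above can be illustrated with a map from partition key to an independent window. The Rust sketch below uses a tumbling window per key. It is an assumption-laden illustration rather than the PartitionedTumblingWindow implementation, and the string keys, millisecond timestamps, and method names are all hypothetical.

```rust
use std::collections::HashMap;

/// Per-key tumbling windows (not the real `PartitionedTumblingWindow`).
pub struct PartitionedSketch {
    duration_ms: u64,
    // One independent (window_start, values) pair per partition key.
    partitions: HashMap<String, (u64, Vec<f64>)>,
}

impl PartitionedSketch {
    pub fn new(duration_ms: u64) -> Self {
        Self { duration_ms, partitions: HashMap::new() }
    }

    /// Adds a reading; returns that key's completed window if this reading closed it.
    pub fn push(&mut self, key: &str, ts_ms: u64, value: f64) -> Option<Vec<f64>> {
        let duration_ms = self.duration_ms;
        // A new key allocates a new window, which is why memory grows with key count.
        let (start, values) = self
            .partitions
            .entry(key.to_string())
            .or_insert_with(|| (ts_ms, Vec::new()));
        if ts_ms > *start + duration_ms {
            // Only this key's window emits; other partitions are untouched.
            let emitted = std::mem::replace(values, vec![value]);
            *start = ts_ms;
            return Some(emitted);
        }
        values.push(value);
        None
    }

    /// Number of live partitions, useful for monitoring key growth.
    pub fn partition_count(&self) -> usize {
        self.partitions.len()
    }
}
```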
Example:
stream PerDeviceStats = SensorReading
.partition_by(device_id)
.window(1m)
.aggregate(
device: device_id,
avg_reading: avg(value),
readings: count()
)
Implementations:
- PartitionedTumblingWindow in window.rs
- PartitionedSlidingWindow in window.rs
Duration Units
| Unit | Abbreviation | Example |
|---|---|---|
| Milliseconds | ms | 500ms |
| Seconds | s | 30s |
| Minutes | m | 5m |
| Hours | h | 1h |
| Days | d | 1d |
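As an illustration of how the units in this table translate to concrete durations, the Rust sketch below converts a literal into a `std::time::Duration`. The `parse_duration` helper is hypothetical. The actual VPL parser is not shown in this reference and may accept or reject inputs differently.

```rust
use std::time::Duration;

/// Parses a duration literal such as `500ms`, `30s`, `5m`, `1h`, or `1d`.
/// Hypothetical helper for illustration; not the VPL parser.
pub fn parse_duration(literal: &str) -> Option<Duration> {
    // Split at the first non-digit: the digits are the amount, the rest is the unit.
    let split = literal.find(|c: char| !c.is_ascii_digit())?;
    let (amount, unit) = literal.split_at(split);
    let amount: u64 = amount.parse().ok()?;
    let millis_per_unit: u64 = match unit {
        "ms" => 1,
        "s" => 1_000,
        "m" => 60_000,
        "h" => 3_600_000,
        "d" => 86_400_000,
        _ => return None,
    };
    // checked_mul guards against overflow on very large amounts.
    Some(Duration::from_millis(amount.checked_mul(millis_per_unit)?))
}
```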
Examples:
.window(500ms) // Half second
.window(30s) // 30 seconds
.window(5m, sliding: 1m) // 5 min window, 1 min slide
.window(1h) // 1 hour
.window(1d) // 1 day
Aggregation Functions
All aggregations operate on events within a window.
count()
Count the number of events in the window.
Signature: count() -> int
Example:
.aggregate(total_events: count())
sum(field)
Sum numeric field values. SIMD-optimized on x86_64 with AVX2.
Signature: sum(field: string) -> float
Example:
.aggregate(
total_amount: sum(amount),
total_quantity: sum(quantity)
)
Performance: Uses SIMD (AVX2) for 4x parallel f64 addition when available.
avg(field)
Compute the arithmetic mean. SIMD-optimized.
Signature: avg(field: string) -> float | null
Returns: null if no events contain the field.
Example:
.aggregate(average_price: avg(price))
Performance: Extracts values to a contiguous array for SIMD summation.
min(field)
Find the minimum value. SIMD-optimized.
Signature: min(field: string) -> float | null
Example:
.aggregate(lowest_temp: min(temperature))
Performance: Uses SIMD _mm256_min_pd for parallel comparison.
max(field)
Find the maximum value. SIMD-optimized.
Signature: max(field: string) -> float | null
Example:
.aggregate(highest_temp: max(temperature))
Performance: Uses SIMD _mm256_max_pd for parallel comparison.
stddev(field)
Compute the sample standard deviation using Welford's online algorithm.
Signature: stddev(field: string) -> float | null
Returns: null if fewer than 2 values.
Example:
.aggregate(temp_stddev: stddev(temperature))
Algorithm: Single-pass Welford's algorithm for numerical stability.
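For readers unfamiliar with Welford's algorithm, a minimal Rust sketch follows. It shows the standard single-pass update of the running mean and the sum of squared deviations, and returns no result for fewer than 2 values, matching the contract above. The `Welford` struct and its method names are illustrative and not taken from the Varpulis source.

```rust
/// Running mean and squared-deviation accumulator (Welford's online algorithm).
#[derive(Default)]
pub struct Welford {
    count: u64,
    mean: f64,
    m2: f64, // sum of squared deviations from the current mean
}

impl Welford {
    /// Folds one value into the running statistics in O(1).
    pub fn add(&mut self, x: f64) {
        self.count += 1;
        let delta = x - self.mean;
        self.mean += delta / self.count as f64;
        // Uses the deviation from the updated mean, which keeps the update stable.
        self.m2 += delta * (x - self.mean);
    }

    /// Sample standard deviation (divides by n - 1); `None` for fewer than 2 values.
    pub fn sample_stddev(&self) -> Option<f64> {
        if self.count < 2 {
            return None;
        }
        Some((self.m2 / (self.count - 1) as f64).sqrt())
    }
}
```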
percentile(field, q)
Compute the q-th percentile using linear interpolation. The quantile q is specified as a float between 0.0 and 1.0.
Signature: percentile(field: string, q: float) -> float | null
Example:
.aggregate(
p50_latency: percentile(latency, 0.5),
p95_latency: percentile(latency, 0.95),
p99_latency: percentile(latency, 0.99),
p999_latency: percentile(latency, 0.999)
)
Algorithm: Sort-based with linear interpolation. Collects all f64 values, sorts, then interpolates at the quantile position.
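The sort-then-interpolate approach can be sketched in Rust as below. The rank convention used here, position q × (n − 1) in the sorted values, is an assumption. The reference does not state which interpolation convention Varpulis uses, so results at non-trivial quantiles may differ slightly from the real implementation.

```rust
/// Sort-based percentile with linear interpolation between neighboring ranks.
/// Returns `None` for empty input or a quantile outside 0.0..=1.0.
pub fn percentile(values: &[f64], q: f64) -> Option<f64> {
    if values.is_empty() || !(0.0..=1.0).contains(&q) {
        return None;
    }
    let mut sorted = values.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    // Assumed convention: fractional rank along indices 0..=n-1.
    let rank = q * (sorted.len() - 1) as f64;
    let lower = rank.floor() as usize;
    let upper = rank.ceil() as usize;
    let weight = rank - lower as f64;
    Some(sorted[lower] + (sorted[upper] - sorted[lower]) * weight)
}
```

For the values 10, 20, 30, 40 and q = 0.5, the rank is 1.5, so the result lies halfway between 20 and 30.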
median(field)
Shorthand for the 50th percentile.
Signature: median(field: string) -> float | null
Example:
.aggregate(median_price: median(price))
p50(field) / p95(field) / p99(field)
Convenience aliases for common percentiles.
Signature: p50(field) -> float | null (equivalent to percentile(field, 0.5))
Example:
.aggregate(
p50_latency: p50(latency),
p95_latency: p95(latency),
p99_latency: p99(latency)
)
collect(field)
Collect all field values into an array.
Signature: collect(field: string) -> array
Example:
.aggregate(
all_sensors: collect(sensor_id),
all_values: collect(value)
)
first(field)
Return the first value in the window (by event order).
Signature: first(field: string) -> value | null
Example:
.aggregate(
window_start: first(timestamp),
initial_value: first(value)
)
last(field)
Return the last value in the window (by event order).
Signature: last(field: string) -> value | null
Example:
.aggregate(
window_end: last(timestamp),
final_value: last(value)
)
distinct(field) / count_distinct(field)
Count unique values of a field.
Signature: count(distinct(field: string)) -> int
Example:
.aggregate(
unique_users: count(distinct(user_id)),
unique_products: count(distinct(product_id))
)
Incremental Aggregations
For high-throughput scenarios, Varpulis supports incremental aggregation where possible.
IncrementalSum
O(1) updates for sum instead of O(n) recomputation.
Supported operations:
- add(value) - Add a value: O(1)
- remove(value) - Remove a value: O(1)
- sum() - Get current sum: O(1)
- count() - Get current count: O(1)
- avg() - Get current average: O(1)
Use case: Sliding windows where events expire frequently.
Implementation: IncrementalSum in varpulis-runtime/src/simd.rs:391
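The idea behind these O(1) operations is to keep a running total and adjust it as events enter and leave the window. The Rust sketch below illustrates this. It is not the IncrementalSum from simd.rs, and details such as returning `Option` from `avg` are assumptions made for the example.

```rust
/// Running sum with O(1) add and remove (not the real `IncrementalSum`).
#[derive(Default)]
pub struct IncrementalSumSketch {
    sum: f64,
    count: usize,
}

impl IncrementalSumSketch {
    /// An event enters the window.
    pub fn add(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
    }

    /// An event expires from the window; the caller passes the value it added earlier.
    pub fn remove(&mut self, value: f64) {
        self.sum -= value;
        self.count = self.count.saturating_sub(1);
    }

    pub fn sum(&self) -> f64 {
        self.sum
    }

    pub fn count(&self) -> usize {
        self.count
    }

    /// Assumption: an empty window has no average.
    pub fn avg(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}
```

One trade-off of this approach is that repeated floating-point add and subtract can accumulate rounding error over long-running windows, which full recomputation avoids.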
IncrementalMinMax
Maintains min/max with incremental updates.
Supported operations:
- add(value) - Add a value: O(1) amortized
- remove(value) - Remove a value: O(n) worst case
- min() - Get current min: O(1) after sort
- max() - Get current max: O(1) after sort
Implementation: IncrementalMinMax in varpulis-runtime/src/simd.rs:444
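One structure consistent with the complexities listed above is an unsorted buffer that is sorted lazily when min or max is requested. The Rust sketch below follows that reading. Whether the actual IncrementalMinMax in simd.rs works this way is an assumption, and the names here are hypothetical.

```rust
/// Lazily sorted value buffer (an assumed design, not the real `IncrementalMinMax`).
pub struct IncrementalMinMaxSketch {
    values: Vec<f64>,
    sorted: bool,
}

impl IncrementalMinMaxSketch {
    pub fn new() -> Self {
        Self { values: Vec::new(), sorted: true }
    }

    /// Appends a value: O(1) amortized; the buffer is marked as needing a sort.
    pub fn add(&mut self, value: f64) {
        self.values.push(value);
        self.sorted = false;
    }

    /// Removes one occurrence via linear scan: O(n) worst case.
    pub fn remove(&mut self, value: f64) -> bool {
        match self.values.iter().position(|&v| v == value) {
            Some(index) => {
                // swap_remove is O(1) but disturbs the order, so a re-sort is needed.
                self.values.swap_remove(index);
                self.sorted = false;
                true
            }
            None => false,
        }
    }

    fn ensure_sorted(&mut self) {
        if !self.sorted {
            self.values.sort_by(|a, b| a.total_cmp(b));
            self.sorted = true;
        }
    }

    /// Smallest value; O(1) once the buffer is sorted.
    pub fn min(&mut self) -> Option<f64> {
        self.ensure_sorted();
        self.values.first().copied()
    }

    /// Largest value; O(1) once the buffer is sorted.
    pub fn max(&mut self) -> Option<f64> {
        self.ensure_sorted();
        self.values.last().copied()
    }
}
```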
SIMD Optimization
Aggregations use SIMD (Single Instruction Multiple Data) vectorization when available.
Requirements
- x86_64 architecture
- AVX2 instruction set support (most CPUs since 2013)
Performance Characteristics
| Operation | Scalar | SIMD (AVX2) | Speedup |
|---|---|---|---|
| sum | O(n) | O(n/4) | ~4x |
| min | O(n) | O(n/4) | ~4x |
| max | O(n) | O(n/4) | ~4x |
| avg | O(n) | O(n/4) | ~4x |
How It Works
- Field Extraction: Numeric field values are extracted to a contiguous Vec<f64>
- Vectorization: AVX2 processes 4 f64 values per instruction
- Loop Unrolling: 4-way unrolling for better instruction-level parallelism
- Horizontal Reduction: Final aggregation of vector lanes
- Remainder Handling: Scalar processing for remaining elements
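The steps above can be sketched with Rust's `std::arch` intrinsics. This is a simplified illustration, not the code in simd.rs: it uses a single vector accumulator and omits the 4-way unrolling, and the function names are hypothetical. It checks for AVX2 at runtime and falls back to a plain scalar sum elsewhere.

```rust
/// Sums an already extracted, contiguous slice of f64 values using 256-bit vectors.
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2")]
unsafe fn sum_avx2(values: &[f64]) -> f64 {
    use std::arch::x86_64::*;
    let chunks = values.chunks_exact(4);
    let remainder = chunks.remainder();
    // SAFETY: every chunk holds exactly 4 f64 values, and the loads and the
    // store are the unaligned variants, so no alignment is required.
    unsafe {
        // Vectorization: each add processes 4 f64 lanes at once.
        let mut acc = _mm256_setzero_pd();
        for chunk in chunks {
            acc = _mm256_add_pd(acc, _mm256_loadu_pd(chunk.as_ptr()));
        }
        // Horizontal reduction: collapse the 4 lanes into one scalar.
        let mut lanes = [0.0f64; 4];
        _mm256_storeu_pd(lanes.as_mut_ptr(), acc);
        // Remainder handling: scalar sum of the trailing 0 to 3 elements.
        lanes.iter().sum::<f64>() + remainder.iter().sum::<f64>()
    }
}

/// Dispatches to the AVX2 path when the CPU supports it, else sums scalar.
pub fn sum_f64(values: &[f64]) -> f64 {
    #[cfg(target_arch = "x86_64")]
    {
        if is_x86_feature_detected!("avx2") {
            // SAFETY: AVX2 support was just verified at runtime.
            return unsafe { sum_avx2(values) };
        }
    }
    values.iter().sum()
}
```

Because the vector path adds values in a different order than a sequential loop, results can differ from a scalar sum in the last bits for general floating-point inputs.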
Fallback
On non-AVX2 systems, scalar 4-way loop unrolling provides ~2x speedup over naive loops.
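A scalar 4-way unrolled loop of the kind described here might look like the Rust sketch below. It is illustrative only and the function name is hypothetical. The point is that four independent accumulators remove the dependency between consecutive additions, so the CPU can overlap them.

```rust
/// Scalar sum with four independent accumulators (4-way unrolling).
pub fn sum_unrolled(values: &[f64]) -> f64 {
    let mut acc = [0.0f64; 4];
    let chunks = values.chunks_exact(4);
    let remainder = chunks.remainder();
    for chunk in chunks {
        // Each accumulator depends only on its own previous value.
        acc[0] += chunk[0];
        acc[1] += chunk[1];
        acc[2] += chunk[2];
        acc[3] += chunk[3];
    }
    // Trailing 0 to 3 elements that did not fill a chunk.
    let tail: f64 = remainder.iter().sum();
    (acc[0] + acc[1]) + (acc[2] + acc[3]) + tail
}
```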
Memory Considerations
Window Memory Usage
| Window Type | Memory Formula |
|---|---|
| Tumbling | O(events_per_window) |
| Sliding | O(events_per_window) |
| Count | O(window_size) |
| Partitioned | O(partitions × events_per_window) |
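To turn the partitioned formula into a rough number, the Rust sketch below multiplies partitions by events per window by an assumed per-event size. The function and its parameters are hypothetical, and the per-event byte count is something you would need to measure for your own event types; this reference does not specify it.

```rust
/// Rough estimate of buffered bytes for a partitioned time window.
/// All inputs are caller-supplied assumptions, not values from Varpulis.
pub fn estimate_window_bytes(
    events_per_sec_per_partition: u64,
    window_secs: u64,
    partitions: u64,
    bytes_per_event: u64,
) -> u64 {
    // events_per_window = rate × window length, per partition.
    let events_per_window = events_per_sec_per_partition.saturating_mul(window_secs);
    // O(partitions × events_per_window), scaled by the assumed event size.
    partitions
        .saturating_mul(events_per_window)
        .saturating_mul(bytes_per_event)
}
```

For example, 1,000 partitions at 10 events per second each, a 60 second window, and an assumed 200 bytes per event give 1,000 × 600 × 200 = 120,000,000 bytes, or about 120 MB.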
Recommendations
- Use appropriate window sizes: Smaller windows = less memory
- Partition carefully: Each partition key creates a separate window
- Consider count windows: Fixed memory usage vs. time windows
- Monitor partition growth: Unbounded partition keys can exhaust memory
Examples
Real-time Monitoring
stream SensorAlerts = SensorReading
.partition_by(sensor_id)
.window(1m)
.aggregate(
sensor: sensor_id,
avg_value: avg(value),
max_value: max(value),
readings: count()
)
.where(avg_value > threshold or max_value > critical_threshold)
.emit(sensor: sensor, avg_value: avg_value, max_value: max_value)
Rolling Statistics
stream RollingStats = MarketData
.window(5m, sliding: 10s)
.aggregate(
vwap: sum(price * volume) / sum(volume),
high: max(price),
low: min(price),
volatility: stddev(price)
)
Batch Processing
stream BatchReport = Transaction
.window(1000)
.aggregate(
batch_num: count(),
total_value: sum(amount),
avg_value: avg(amount),
unique_customers: count(distinct(customer_id))
)
.print("Processed batch: {batch_num} txns, ${total_value:.2} total")
Latency Percentiles
stream ApiLatency = RequestEvent
.partition_by(endpoint)
.window(1m)
.aggregate(
endpoint: endpoint,
median: median(latency_ms),
p95: p95(latency_ms),
p99: p99(latency_ms),
p999: percentile(latency_ms, 0.999),
count: count()
)
.where(p99 > 500)
.emit(
alert: "high_latency",
endpoint: endpoint,
p99_ms: p99,
request_count: count
)
Multi-Level Aggregation
// First level: per-device minute stats
stream DeviceMinutes = SensorReading
.partition_by(device_id)
.window(1m)
.aggregate(
device: device_id,
minute_avg: avg(value)
)
// Second level: all-device hour stats
stream HourlyOverview = DeviceMinutes
.window(1h)
.aggregate(
active_devices: count(distinct(device)),
overall_avg: avg(minute_avg),
hottest_avg: max(minute_avg)
)
See Also
- Language Tutorial - Learn VPL basics
- Performance Tuning - Optimize window performance
- SASE+ Pattern Guide - Pattern matching over windows