# Built-in Functions
This document lists all built-in functions in VPL. Each function is marked with a status:
- **Implemented**: working in the current version
- **Planned**: documented for future implementation
## Aggregation Functions (Implemented)
These functions work within .aggregate() blocks on windowed streams.
| Function | Description | Example | Status |
|---|---|---|---|
| count() | Number of elements | count() | Implemented |
| sum(expr) | Sum of values | sum(price) | Implemented |
| avg(expr) | Average | avg(temperature) | Implemented |
| min(expr) | Minimum value | min(latency) | Implemented |
| max(expr) | Maximum value | max(latency) | Implemented |
| first(expr) | First element in window | first(timestamp) | Implemented |
| last(expr) | Last element in window | last(value) | Implemented |
| stddev(expr) | Standard deviation | stddev(values) | Implemented |
| count_distinct(expr) | Count of distinct values | count_distinct(user_id) | Implemented |
| ema(expr, period) | Exponential moving average | ema(price, 12) | Implemented |
| median(expr) | Median (50th percentile) | median(latency) | Implemented |
| percentile(expr, q) | Percentile (0.0–1.0) | percentile(latency, 0.99) | Implemented |
| p50(expr) | 50th percentile | p50(latency) | Implemented |
| p95(expr) | 95th percentile | p95(latency) | Implemented |
| p99(expr) | 99th percentile | p99(latency) | Implemented |
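The ema(expr, period) row above does not spell out the recurrence. A minimal Python sketch, assuming the common smoothing convention alpha = 2 / (period + 1) (whether VPL uses exactly this convention is an assumption):

```python
def ema(values, period):
    """Exponential moving average with the common alpha = 2/(period+1)
    smoothing convention (an assumption, not confirmed for VPL).
    The first value seeds the average; each later value is blended in:
    ema = alpha * x + (1 - alpha) * ema."""
    alpha = 2.0 / (period + 1)
    result = None
    for x in values:
        result = x if result is None else alpha * x + (1 - alpha) * result
    return result
```

With period 12 the most recent value gets weight 2/13, so the average reacts to new prices but smooths out single-event spikes.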
### Aggregation Example

```
stream Stats = Trades
  .partition_by(symbol)
  .window(5m)
  .aggregate(
    total_volume: sum(volume),
    avg_price: avg(price),
    trade_count: count(),
    price_range: max(price) - min(price)
  )
  .emit(symbol: symbol, stats: "computed")
```

## Window Operations (Implemented)
Windows are specified as stream operators, not functions.
| Operation | Description | Example | Status |
|---|---|---|---|
| .window(duration) | Tumbling window | .window(5m) | Implemented |
| .window(duration, sliding: interval) | Sliding window | .window(5m, sliding: 1m) | Implemented |
| .partition_by(field) | Partition by key | .partition_by(user_id) | Implemented |
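The difference between the two window kinds can be sketched outside VPL. Below is a hypothetical Python model of the semantics; aligning bucket boundaries to multiples of the window size or slide interval is an assumption about VPL's alignment, not documented behavior:

```python
def tumbling_bucket(ts, size):
    """Tumbling window: each event belongs to exactly one
    window [start, start + size)."""
    return ts - ts % size

def sliding_buckets(ts, size, slide):
    """Sliding window: an event at ts falls into every window whose
    span covers it; window starts are assumed to be multiples of `slide`."""
    start = ts - ts % slide          # latest window start covering ts
    starts = []
    while start > ts - size:         # walk back while the window still covers ts
        starts.append(start)
        start -= slide
    return sorted(starts)
```

For example, with a 5-minute window sliding every minute, one event lands in five overlapping windows, which is why sliding windows emit more (and partially redundant) aggregates than tumbling ones.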
## Join Operations (Implemented)
| Function | Description | Example | Status |
|---|---|---|---|
| join(stream1, stream2, ...) | Inner join streams | join(EMA12, EMA26) | Implemented |
| left_join(stream1, stream2, ...) | Left outer join | left_join(Orders, Payments) | Implemented |
| right_join(stream1, stream2, ...) | Right outer join | right_join(Orders, Payments) | Implemented |
| full_join(stream1, stream2, ...) | Full outer join | full_join(Orders, Payments) | Implemented |
| .on(condition) | Join condition | .on(A.symbol == B.symbol) | Implemented |
### Join Example

```
# Inner join (default)
stream MACD = join(EMA12, EMA26)
  .on(EMA12.symbol == EMA26.symbol)
  .window(1m)
  .select(
    symbol: EMA12.symbol,
    macd_line: EMA12.ema_12 - EMA26.ema_26
  )
  .emit(event_type: "MACD")

# Left outer join — emits even when payments are missing
stream Enriched = left_join(
  stream orders = OrderStream.on(order_id),
  stream payments = PaymentStream.on(order_id)
)
```

## Planned Functions (Not Yet Implemented)
The following functions are planned for future versions but are not currently available.
### Planned Aggregations
| Function | Description |
|---|---|
| variance(expr) | Variance |
| collect() | Collect all values into list |
### Planned Window Functions
| Function | Description |
|---|---|
| session_window(gap) | Session-based windowing |
| row_number() | Row number in window |
| rank() | Rank in window |
| lag(expr, n) | Value n positions before |
| lead(expr, n) | Value n positions after |
### Planned String Functions
| Function | Description |
|---|---|
| len(s) | String length |
| upper(s) / lower(s) | Case conversion |
| trim(s) | Remove whitespace |
| contains(s, sub) | Substring check |
| starts_with(s, pre) / ends_with(s, suf) | Prefix/suffix check |
| split(s, sep) / join(arr, sep) | Split/join strings |
| replace(s, old, new) | String replacement |
| regex_match(s, pat) | Regex matching |
### Planned Math Functions
| Function | Description |
|---|---|
| abs(x) | Absolute value |
| round(x, n) / floor(x) / ceil(x) | Rounding |
| sqrt(x) / pow(x, y) | Power functions |
| log(x) / exp(x) | Logarithm/exponential |
### Planned Timestamp Functions
| Function | Description |
|---|---|
| now() | Current timestamp |
| year(ts) / month(ts) / day(ts) | Date extraction |
| hour(ts) / minute(ts) / second(ts) | Time extraction |
| duration_between(t1, t2) | Time difference |
### Planned Utility Functions
| Function | Description |
|---|---|
| coalesce(a, b, ...) | First non-null value |
| uuid() | Generate UUID |
| random() | Random number |
## Sequence Pattern Built-in Variables (Implemented)
These variables are available in .where() and .emit() clauses after a sequence pattern match (a -> sequence step or an all Kleene pattern).
| Variable | Type | Description |
|---|---|---|
| match_count | int | Total events in the matched sequence |
| match_duration_ms | int | Duration from first to last matched event (milliseconds) |
| match_rate | float | Events per second within the match (match_count / duration_seconds) |
### Sequence Pattern Example

```
# Detect brute force: 3+ failed logins then success
stream BruteForce = failed_login -> all failed_login as f -> successful_login as s .within(5m)
  .where(match_count >= 4)
  .emit(user: s.user_id, failed_attempts: match_count - 1)

# Detect high event rate (> 2 events/sec)
stream HighRate = request -> all request as r .within(30s)
  .where(match_count >= 3 and match_rate > 2.0)
  .emit(endpoint: r.endpoint, rate: match_rate)
```

## Kleene Aggregate Functions (Implemented)
These functions operate on Kleene (all) captures in sequence patterns. They can be used in .where() and .emit() without needing .trend_aggregate().
| Function | Description | Example |
|---|---|---|
| sum(alias.field) | Sum of field values across Kleene matches | sum(t.amount) |
| avg(alias.field) | Average of field values | avg(t.temperature) |
| min(alias.field) | Minimum field value | min(t.latency) |
| max(alias.field) | Maximum field value | max(t.amount) |
| count(alias) | Number of events matched by alias | count(t) |
| distinct_count(alias.field) | Number of unique values | distinct_count(s.source_ip) |
| first(alias).field | Field from first matched event | first(f).city |
| last(alias).field | Field from last matched event | last(f).city |
### Kleene Aggregate Example

```
# Detect large cumulative transfers
stream LargeTransfers = login as l -> all transfer as t .within(10m)
  .where(sum(t.amount) > 5000)
  .emit(user: l.user_id, total: sum(t.amount), avg: avg(t.amount))

# Detect location changes: compare first and last login cities
stream LocationChange = login -> all login as f .within(1h)
  .where(match_count >= 2)
  .emit(user: f.user_id, from: first(f).city, to: last(f).city)

# Detect distributed attacks from many distinct IPs
stream DDoS = scan -> all scan as s .within(1m)
  .where(distinct_count(s.source_ip) >= 3)
  .emit(target: s.target_ip, unique_ips: distinct_count(s.source_ip))
```

## Absence / Negation (Implemented)
The .not(EventType) operator invalidates active pattern runs if the forbidden event arrives. Use it for "A then B, but NOT C in between" patterns.
```
# Order followed by shipment, but NOT if cancellation arrives
stream OrderShipped = order as o -> shipment as s .within(24h)
  .not(cancellation)
  .emit(order_id: o.order_id, status: s.status)
```

## Forecast Built-in Variables (Implemented)
These variables are available in streams that use the .forecast() operator after a sequence pattern. They are populated by the PST-based pattern forecasting engine.
| Variable | Type | Description |
|---|---|---|
| forecast_probability | float | Pattern completion probability (0.0–1.0) |
| forecast_confidence | float | Prediction stability (0.0–1.0); high values mean the model has converged |
| forecast_time | int | Expected time to completion (nanoseconds) |
| forecast_state | str | Current NFA state label |
| forecast_context_depth | int | PST context depth used for prediction |
| forecast_lower | float | Lower bound of conformal prediction interval (0.0–1.0) |
| forecast_upper | float | Upper bound of conformal prediction interval (0.0–1.0) |
The forecast_confidence variable measures how stable the prediction is. It is computed from the coefficient of variation (CV) over a sliding window of the last 20 predictions. A value of 1.0 means predictions are perfectly stable; a value near 0.0 means the model is still learning. Use .where(forecast_confidence > 0.8) to filter out unstable early predictions.
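A minimal Python sketch of this stability metric, assuming the mapping confidence = max(0, 1 - CV); the exact mapping the engine uses from CV to confidence is not stated here:

```python
from collections import deque
import statistics

class StabilityTracker:
    """Sketch of a CV-based stability score over the last N predictions.
    Assumes confidence = max(0, 1 - CV), where CV = stddev / mean of the
    recent predictions; identical predictions therefore score 1.0.
    This is an illustration, not VPL's actual implementation."""
    def __init__(self, window=20):
        self.history = deque(maxlen=window)

    def update(self, probability):
        self.history.append(probability)
        if len(self.history) < 2:
            return 0.0                   # still learning: no spread to measure
        mean = statistics.mean(self.history)
        if mean == 0:
            return 0.0
        cv = statistics.pstdev(self.history) / mean
        return max(0.0, 1.0 - cv)
```

A stream of near-identical probabilities drives CV toward 0 and confidence toward 1.0, which is the behavior the .where(forecast_confidence > 0.8) filter relies on.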
The forecast_lower and forecast_upper variables provide calibrated 90%-coverage prediction intervals via conformal prediction. Before sufficient calibration data is available, the interval defaults to [0.0, 1.0] (maximum uncertainty). As the engine observes forecast outcomes (pattern completions and expirations), the interval narrows automatically. Conformal intervals can be disabled per-pipeline with conformal: false — in that case, forecast_lower is always 0.0 and forecast_upper is always 1.0.
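The interval mechanics can be illustrated with a split-conformal sketch in Python. The nonconformity score, the calibration threshold, and the quantile correction below are illustrative assumptions, not the engine's actual implementation:

```python
class ConformalInterval:
    """Sketch of split conformal prediction for a probability forecast.
    Nonconformity = |observed outcome - predicted probability|; the interval
    is prediction +/- the ~90th-percentile score, clipped to [0, 1].
    Illustrative only; parameter choices are assumptions."""
    def __init__(self, coverage=0.9, min_calibration=10):
        self.scores = []
        self.coverage = coverage
        self.min_calibration = min_calibration

    def record(self, predicted, outcome):
        """outcome: 1.0 if the pattern completed, 0.0 if it expired."""
        self.scores.append(abs(outcome - predicted))

    def interval(self, predicted):
        if len(self.scores) < self.min_calibration:
            return (0.0, 1.0)            # maximum uncertainty before calibration
        ranked = sorted(self.scores)
        # conformal quantile with a finite-sample correction
        k = min(len(ranked) - 1, int(self.coverage * (len(ranked) + 1)))
        q = ranked[k]
        return (max(0.0, predicted - q), min(1.0, predicted + q))
```

This reproduces the documented behavior: intervals start at [0.0, 1.0] and narrow as calibration outcomes accumulate.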
The forecast_probability is modulated by Hawkes process intensity tracking when enabled. When events needed for the next NFA transition arrive in a temporal burst, the probability is boosted proportionally (up to 5x). During normal event rates, the boost factor is ~1.0. Hawkes modulation can be disabled per-pipeline with hawkes: false for latency-sensitive workloads. The Hawkes process uses EMA-based parameter estimation, adapting to regime changes in ~20-40 events.
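A rough Python sketch of the boost mechanism, using an exponentially decaying intensity with the 5x cap from above. All parameter values, names, and the baseline normalization are illustrative assumptions, not VPL's estimated Hawkes parameters:

```python
import math

class HawkesBoost:
    """Sketch of Hawkes-style burst detection: event intensity decays
    exponentially between arrivals and jumps on each arrival. The boost
    factor compares current intensity to the baseline rate, capped at 5x."""
    def __init__(self, baseline=1.0, excitation=1.0, decay=1.0, cap=5.0):
        self.baseline = baseline      # events/sec under normal load (assumed)
        self.excitation = excitation  # intensity jump per event (assumed)
        self.decay = decay            # 1/seconds; how fast bursts fade (assumed)
        self.cap = cap
        self.intensity = baseline
        self.last_ts = None

    def on_event(self, ts):
        if self.last_ts is not None:
            dt = ts - self.last_ts
            # decay the excess intensity back toward the baseline
            excess = self.intensity - self.baseline
            self.intensity = self.baseline + excess * math.exp(-self.decay * dt)
        self.intensity += self.excitation
        self.last_ts = ts
        return min(self.cap, self.intensity / self.baseline)
```

Tightly spaced events let the excitation terms stack before decaying, so a burst drives the boost toward the cap, while widely spaced events keep it low.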
### Forecast Modes
The .forecast() operator supports preset modes for common use cases:
| Mode | warmup | max_depth | hawkes | conformal | Adaptive warmup |
|---|---|---|---|---|---|
"fast" | 50 | 3 | off | off | off |
"accurate" | 200 | 5 | on | on | on |
"balanced" (default) | 100 | 3 | on | on | on |
Modes provide defaults that can be overridden by explicit parameters:
```
# Zero-config — uses balanced defaults with adaptive warmup
.forecast()

# Fast mode — no Hawkes overhead, no conformal
.forecast(mode: "fast")

# Accurate mode with custom confidence
.forecast(mode: "accurate", confidence: 0.8)

# Fast mode but keep conformal intervals
.forecast(mode: "fast", conformal: true)
```

When warmup is not explicitly set, adaptive warmup is enabled (for the balanced and accurate modes). It extends the warmup beyond the minimum event count until predictions stabilize, then starts emitting.
### Forecast Example

```
stream FraudForecast = Transaction as t1
  -> Transaction as t2 where t2.amount > t1.amount * 5
  -> Transaction as t3 where t3.location != t1.location
  .within(5m)
  .forecast(mode: "accurate")
  .where(forecast_confidence > 0.8 and forecast_probability > 0.7)
  .emit(
    probability: forecast_probability,
    stability: forecast_confidence,
    expected_time: forecast_time,
    state: forecast_state,
    confidence_lower: forecast_lower,
    confidence_upper: forecast_upper
  )
```

See the Forecasting Tutorial and the Forecasting Architecture documents for details.
## Enrichment Built-in Variables (Implemented)
These variables are available after .enrich():
| Variable | Type | Description | Status |
|---|---|---|---|
| enrich_status | str | "ok", "error", "cached", or "timeout" | Implemented |
| enrich_latency_ms | int | Lookup latency in ms (0 for cache hits) | Implemented |
### Enrichment Example

```
connector WeatherAPI = http(url: "https://api.weather.com/v1")

stream Enriched = Temperature as t
  .enrich(WeatherAPI, key: t.city, fields: [forecast, humidity], cache_ttl: 5m)
  .where(enrich_status == "ok" and forecast == "rain")
  .emit(city: t.city, forecast: forecast, latency: enrich_latency_ms)
```

See the Enrichment Reference for details.