
Built-in Functions

This document lists the built-in functions, operators, and variables in VPL. Each entry is marked as:

  • Implemented - Working in current version
  • Planned - Documented for future implementation

Aggregation Functions (Implemented)

These functions work within .aggregate() blocks on windowed streams.

| Function | Description | Example | Status |
|---|---|---|---|
| count() | Number of elements | count() | Implemented |
| sum(expr) | Sum of values | sum(price) | Implemented |
| avg(expr) | Average | avg(temperature) | Implemented |
| min(expr) | Minimum value | min(latency) | Implemented |
| max(expr) | Maximum value | max(latency) | Implemented |
| first(expr) | First element in window | first(timestamp) | Implemented |
| last(expr) | Last element in window | last(value) | Implemented |
| stddev(expr) | Standard deviation | stddev(values) | Implemented |
| count_distinct(expr) | Count of distinct values | count_distinct(user_id) | Implemented |
| ema(expr, period) | Exponential moving average | ema(price, 12) | Implemented |
| median(expr) | Median (50th percentile) | median(latency) | Implemented |
| percentile(expr, q) | Percentile, with q from 0.0 to 1.0 | percentile(latency, 0.99) | Implemented |
| p50(expr) | 50th percentile | p50(latency) | Implemented |
| p95(expr) | 95th percentile | p95(latency) | Implemented |
| p99(expr) | 99th percentile | p99(latency) | Implemented |
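The table does not say how percentiles are interpolated, or how ema() seeds and weights its average. As a reference for reading results, here is a minimal Python sketch. It assumes linear interpolation between closest ranks and the common EMA smoothing factor 2 / (period + 1). The engine's actual conventions may differ.

```python
import math

def percentile(values, q):
    """q-th percentile of a window, q in [0.0, 1.0]; linear interpolation is an assumption."""
    if not values:
        raise ValueError("percentile of an empty window")
    if not 0.0 <= q <= 1.0:
        raise ValueError("q must be between 0.0 and 1.0")
    xs = sorted(values)
    pos = q * (len(xs) - 1)      # fractional rank within the sorted window
    lo, hi = math.floor(pos), math.ceil(pos)
    return xs[lo] + (xs[hi] - xs[lo]) * (pos - lo)

def median(values):
    # median(expr) is the 50th percentile, like p50(expr)
    return percentile(values, 0.5)

def ema(values, period):
    """EMA seeded with the first value, alpha = 2 / (period + 1) (assumed convention)."""
    alpha = 2.0 / (period + 1)
    result = values[0]
    for x in values[1:]:
        result = alpha * x + (1 - alpha) * result
    return result
```

Under these assumptions, p95(latency) and p99(latency) read as percentile(latency, 0.95) and percentile(latency, 0.99).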

Aggregation Example

varpulis
stream Stats = Trades
    .partition_by(symbol)
    .window(5m)
    .aggregate(
        total_volume: sum(volume),
        avg_price: avg(price),
        trade_count: count(),
        price_range: max(price) - min(price)
    )
    .emit(symbol: symbol, stats: "computed")

Window Operations (Implemented)

Windows are specified as stream operators, not functions.

| Operation | Description | Example | Status |
|---|---|---|---|
| .window(duration) | Tumbling window | .window(5m) | Implemented |
| .window(duration, sliding: interval) | Sliding window | .window(5m, sliding: 1m) | Implemented |
| .partition_by(field) | Partition by key | .partition_by(user_id) | Implemented |
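The following Python sketch shows which windows an event falls into. It assumes windows are half-open intervals [start, end) aligned to timestamp 0; the engine might align them to the first event instead. With .window(5m, sliding: 1m), each event belongs to five overlapping windows. With a tumbling .window(5m), it belongs to exactly one.

```python
MINUTE_MS = 60_000

def tumbling_window(ts_ms, size_ms):
    """The single [start, end) tumbling window containing ts_ms (aligned to t=0, assumed)."""
    start = ts_ms - (ts_ms % size_ms)
    return (start, start + size_ms)

def sliding_windows(ts_ms, size_ms, slide_ms):
    """Every [start, end) window of length size_ms, advancing by slide_ms, that contains ts_ms."""
    windows = []
    start = ts_ms - (ts_ms % slide_ms)   # latest window start at or before the event
    while start > ts_ms - size_ms:       # window still reaches the event
        windows.append((start, start + size_ms))
        start -= slide_ms
    return sorted(windows)
```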

Join Operations (Implemented)

| Function | Description | Example | Status |
|---|---|---|---|
| join(stream1, stream2, ...) | Inner join streams | join(EMA12, EMA26) | Implemented |
| left_join(stream1, stream2, ...) | Left outer join | left_join(Orders, Payments) | Implemented |
| right_join(stream1, stream2, ...) | Right outer join | right_join(Orders, Payments) | Implemented |
| full_join(stream1, stream2, ...) | Full outer join | full_join(Orders, Payments) | Implemented |
| .on(condition) | Join condition | .on(A.symbol == B.symbol) | Implemented |

Join Example

varpulis
# Inner join (default)
stream MACD = join(EMA12, EMA26)
    .on(EMA12.symbol == EMA26.symbol)
    .window(1m)
    .select(
        symbol: EMA12.symbol,
        macd_line: EMA12.ema_12 - EMA26.ema_26
    )
    .emit(event_type: "MACD")

# Left outer join — emits even when payments are missing
stream Enriched = left_join(
    stream orders = OrderStream.on(order_id),
    stream payments = PaymentStream.on(order_id)
)
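The four join variants differ only in which unmatched events they keep. This Python sketch applies them to two lists of events that are already collected in one window and keyed on a shared field. The function name keyed_join and the (left, right) tuple output are illustrative, not part of VPL. A missing side is None, which is what lets the left_join above emit an order whose payment never arrived.

```python
def keyed_join(left, right, key, how="inner"):
    """Join two lists of dict events on `key`; how is "inner", "left", "right" or "full"."""
    out = []
    matched_right = set()
    for l in left:
        partners = [i for i, r in enumerate(right) if r[key] == l[key]]
        for i in partners:
            matched_right.add(i)
            out.append((l, right[i]))
        if not partners and how in ("left", "full"):
            out.append((l, None))        # keep unmatched left event
    if how in ("right", "full"):
        for i, r in enumerate(right):
            if i not in matched_right:
                out.append((None, r))    # keep unmatched right event
    return out
```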

Planned Functions (Not Yet Implemented)

The following functions are planned for future versions but are not currently available.

Planned Aggregations

| Function | Description |
|---|---|
| variance(expr) | Variance |
| collect() | Collect all values into list |

Planned Window Functions

| Function | Description |
|---|---|
| session_window(gap) | Session-based windowing |
| row_number() | Row number in window |
| rank() | Rank in window |
| lag(expr, n) | Value n positions before |
| lead(expr, n) | Value n positions after |

Planned String Functions

| Function | Description |
|---|---|
| len(s) | String length |
| upper(s) / lower(s) | Case conversion |
| trim(s) | Remove whitespace |
| contains(s, sub) | Substring check |
| starts_with(s, pre) / ends_with(s, suf) | Prefix/suffix check |
| split(s, sep) / join(arr, sep) | Split/join strings |
| replace(s, old, new) | String replacement |
| regex_match(s, pat) | Regex matching |

Planned Math Functions

| Function | Description |
|---|---|
| abs(x) | Absolute value |
| round(x, n) / floor(x) / ceil(x) | Rounding |
| sqrt(x) / pow(x, y) | Square root / power |
| log(x) / exp(x) | Logarithm/exponential |

Planned Timestamp Functions

| Function | Description |
|---|---|
| now() | Current timestamp |
| year(ts) / month(ts) / day(ts) | Date extraction |
| hour(ts) / minute(ts) / second(ts) | Time extraction |
| duration_between(t1, t2) | Time difference |

Planned Utility Functions

| Function | Description |
|---|---|
| coalesce(a, b, ...) | First non-null value |
| uuid() | Generate UUID |
| random() | Random number |

Sequence Pattern Built-in Variables (Implemented)

These variables are available in .where() and .emit() once a sequence pattern (built with -> or an all Kleene closure) has matched.

| Variable | Type | Description |
|---|---|---|
| match_count | int | Total events in the matched sequence |
| match_duration_ms | int | Duration from first to last matched event (milliseconds) |
| match_rate | float | Events per second within the match (match_count / (match_duration_ms / 1000)) |
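All three variables follow directly from the matched events' timestamps. Here is a minimal Python sketch, assuming timestamps in milliseconds in arrival order. The documentation does not say how match_rate is reported for a zero-length match, so the sketch returns 0.0 in that case; this is an assumption.

```python
def match_stats(timestamps_ms):
    """Return (match_count, match_duration_ms, match_rate) for one matched sequence."""
    match_count = len(timestamps_ms)
    match_duration_ms = timestamps_ms[-1] - timestamps_ms[0]   # first to last event
    duration_seconds = match_duration_ms / 1000.0
    # Assumption: avoid dividing by zero when all events share one timestamp.
    match_rate = match_count / duration_seconds if duration_seconds > 0 else 0.0
    return match_count, match_duration_ms, match_rate
```

For the brute-force pattern below, three failed logins plus one success give match_count = 4, so failed_attempts is 3. Four requests spread over 1.2 seconds give a match_rate of about 3.33, which passes the HighRate filter.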

Sequence Pattern Example

varpulis
# Detect brute force: 3+ failed logins then success
stream BruteForce = failed_login -> all failed_login as f -> successful_login as s .within(5m)
    .where(match_count >= 4)
    .emit(user: s.user_id, failed_attempts: match_count - 1)

# Detect high event rate (> 2 events/sec)
stream HighRate = request -> all request as r .within(30s)
    .where(match_count >= 3 and match_rate > 2.0)
    .emit(endpoint: r.endpoint, rate: match_rate)

Kleene Aggregate Functions (Implemented)

These functions operate on Kleene (all) captures in sequence patterns. They can be used in .where() and .emit() without needing .trend_aggregate().

| Function | Description | Example |
|---|---|---|
| sum(alias.field) | Sum of field values across Kleene matches | sum(t.amount) |
| avg(alias.field) | Average of field values | avg(t.temperature) |
| min(alias.field) | Minimum field value | min(t.latency) |
| max(alias.field) | Maximum field value | max(t.amount) |
| count(alias) | Number of events matched by alias | count(t) |
| distinct_count(alias.field) | Number of unique values | distinct_count(s.source_ip) |
| first(alias).field | Field from first matched event | first(f).city |
| last(alias).field | Field from last matched event | last(f).city |
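Each Kleene aggregate reduces the list of events captured under one alias. The Python sketch below performs the same reductions over a list of dicts in arrival order. The helper names summarize_capture and distinct_count are hypothetical.

```python
def summarize_capture(events, field):
    """sum/avg/min/max/count/first/last over the events captured by one Kleene alias."""
    values = [e[field] for e in events]
    return {
        "sum": sum(values),
        "avg": sum(values) / len(values),
        "min": min(values),
        "max": max(values),
        "count": len(events),
        "first": events[0],    # first(alias).field reads a field of this event
        "last": events[-1],    # last(alias).field reads a field of this event
    }

def distinct_count(events, field):
    """Number of unique values of `field` among the captured events."""
    return len({e[field] for e in events})
```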

Kleene Aggregate Example

varpulis
# Detect large cumulative transfers
stream LargeTransfers = login as l -> all transfer as t .within(10m)
    .where(sum(t.amount) > 5000)
    .emit(user: l.user_id, total: sum(t.amount), avg: avg(t.amount))

# Detect location changes: compare first and last login cities
stream LocationChange = login -> all login as f .within(1h)
    .where(match_count >= 2)
    .emit(user: f.user_id, from: first(f).city, to: last(f).city)

# Detect distributed attacks from many distinct IPs
stream DDoS = scan -> all scan as s .within(1m)
    .where(distinct_count(s.source_ip) >= 3)
    .emit(target: s.target_ip, unique_ips: distinct_count(s.source_ip))

Absence / Negation (Implemented)

The .not(EventType) operator invalidates any active pattern run if the forbidden event arrives before the run completes. Use it for "A then B, but NOT C in between" patterns.

varpulis
# Order followed by shipment, but NOT if cancellation arrives
stream OrderShipped = order as o -> shipment as s .within(24h)
    .not(cancellation)
    .emit(order_id: o.order_id, status: s.status)
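Conceptually, each order event opens a pattern run. A shipment completes the run, and a cancellation kills it. The Python sketch below models this with a plain list of open runs. It assumes a cancellation invalidates every active run, because the example does not correlate events by order_id. It also assumes a shipment completes and closes all open runs. The real engine's NFA may keep runs open for further matches.

```python
def detect_order_shipped(events, within_ms):
    """events: (ts_ms, event_type, payload) tuples in time order.

    Returns (order_payload, shipment_payload) pairs, mimicking
    order -> shipment .within(...) .not(cancellation).
    """
    active = []     # open runs as (start_ts_ms, order_payload)
    matches = []
    for ts, etype, payload in events:
        # Discard runs whose .within() window has expired.
        active = [run for run in active if ts - run[0] <= within_ms]
        if etype == "cancellation":
            active = []                  # forbidden event invalidates active runs
        elif etype == "shipment":
            for _, order in active:
                matches.append((order, payload))
            active = []                  # assumed: completed runs close
        elif etype == "order":
            active.append((ts, payload))
    return matches
```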

Forecast Built-in Variables (Implemented)

These variables are available in streams that apply the .forecast() operator after a sequence pattern. They are populated by the pattern forecasting engine, which is based on a prediction suffix tree (PST).

| Variable | Type | Description |
|---|---|---|
| forecast_probability | float | Pattern completion probability (0.0–1.0) |
| forecast_confidence | float | Prediction stability (0.0–1.0); high values mean the model has converged |
| forecast_time | int | Expected time to completion (nanoseconds) |
| forecast_state | str | Current NFA state label |
| forecast_context_depth | int | PST context depth used for prediction |
| forecast_lower | float | Lower bound of conformal prediction interval (0.0–1.0) |
| forecast_upper | float | Upper bound of conformal prediction interval (0.0–1.0) |

The forecast_confidence variable measures how stable the prediction is. It is derived from the coefficient of variation (CV, the standard deviation divided by the mean) of the last 20 predictions, computed over a sliding window. A value of 1.0 means predictions are perfectly stable; a value near 0.0 means the model is still learning. Use .where(forecast_confidence > 0.8) to filter out unstable early predictions.
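The exact mapping from CV to forecast_confidence is not specified. The Python sketch below assumes confidence = 1 - CV, clamped to [0.0, 1.0] and computed over the last 20 predicted probabilities. Treat it as an illustration of the idea, not as the engine's formula.

```python
from collections import deque
from statistics import mean, pstdev

class StabilityTracker:
    """Confidence from the coefficient of variation of recent predictions.

    The 1 - CV mapping and the 0.0 result for short or zero-mean histories
    are assumptions made for illustration.
    """

    def __init__(self, window=20):
        self.recent = deque(maxlen=window)   # sliding window of predictions

    def update(self, probability):
        self.recent.append(probability)
        return self.confidence()

    def confidence(self):
        if len(self.recent) < 2:
            return 0.0                       # too little history: still learning
        mu = mean(self.recent)
        if mu == 0:
            return 0.0
        cv = pstdev(self.recent) / mu
        return max(0.0, min(1.0, 1.0 - cv))
```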

The forecast_lower and forecast_upper variables give a calibrated prediction interval with 90% coverage, computed via conformal prediction. Until enough calibration data is available, the interval defaults to [0.0, 1.0] (maximum uncertainty). As the engine observes forecast outcomes (pattern completions and expirations), the interval narrows automatically. Conformal intervals can be disabled per pipeline with conformal: false; in that case, forecast_lower is always 0.0 and forecast_upper is always 1.0.
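A split-conformal interval can be sketched in a few lines of Python. Several details are assumptions for illustration: the nonconformity score (the absolute gap between the predicted probability and the 0/1 outcome), the minimum of 20 calibration outcomes, and the class name. An alpha of 0.1 corresponds to the 90% coverage target.

```python
import math

class ConformalInterval:
    """Split-conformal interval around a predicted completion probability."""

    def __init__(self, alpha=0.1, min_calibration=20):
        self.alpha = alpha                     # 0.1 -> 90% target coverage
        self.min_calibration = min_calibration # assumed threshold
        self.scores = []

    def observe(self, predicted, completed):
        # Outcome is 1 for a completed pattern, 0 for an expired one.
        outcome = 1.0 if completed else 0.0
        self.scores.append(abs(outcome - predicted))

    def interval(self, predicted):
        n = len(self.scores)
        if n < self.min_calibration:
            return (0.0, 1.0)                  # not calibrated: maximum uncertainty
        k = math.ceil((n + 1) * (1 - self.alpha))  # finite-sample corrected rank
        if k > n:
            return (0.0, 1.0)
        q = sorted(self.scores)[k - 1]
        return (max(0.0, predicted - q), min(1.0, predicted + q))
```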

When Hawkes modulation is enabled, forecast_probability is adjusted by tracking Hawkes process intensity. If the events needed for the next NFA transition arrive in a temporal burst, the probability is boosted in proportion to the burst, by up to 5x. At normal event rates, the boost factor is approximately 1.0. Hawkes modulation can be disabled per pipeline with hawkes: false for latency-sensitive workloads. The Hawkes process estimates its parameters with exponential moving averages (EMA) and adapts to regime changes within roughly 20 to 40 events.
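The boost can be pictured with the standard exponential-kernel Hawkes intensity. In this Python sketch, the baseline mu and the kernel parameters alpha and beta are passed in as fixed numbers; the engine estimates them online with EMAs, which is not shown. The boost is intensity / mu clamped to [1, 5], and the boosted probability is capped at 1.0. The floor of 1 and the cap at 1.0 are assumptions.

```python
import math

def hawkes_intensity(now, event_times, mu, alpha, beta):
    """Exponential-kernel Hawkes intensity: mu + sum(alpha * exp(-beta * (now - t_i)))."""
    return mu + sum(alpha * math.exp(-beta * (now - t)) for t in event_times if t <= now)

def modulate(probability, intensity, mu, max_boost=5.0):
    """Scale a forecast probability by intensity / baseline, capped at max_boost."""
    boost = min(max(intensity / mu, 1.0), max_boost)   # assumed floor of 1.0
    return min(1.0, probability * boost)               # keep it a probability
```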

Forecast Modes

The .forecast() operator supports preset modes for common use cases:

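| Mode | warmup (events) | max_depth | hawkes | conformal | Adaptive warmup |
|---|---|---|---|---|---|
| "fast" | 50 | 3 | off | off | off |
| "accurate" | 200 | 5 | on | on | on |
| "balanced" (default) | 100 | 3 | on | on | on |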

Modes provide defaults that can be overridden by explicit parameters:

varpulis
# Zero-config — uses balanced defaults with adaptive warmup
.forecast()

# Fast mode — no Hawkes overhead, no conformal
.forecast(mode: "fast")

# Accurate mode with custom confidence
.forecast(mode: "accurate", confidence: 0.8)

# Fast mode but keep conformal intervals
.forecast(mode: "fast", conformal: true)

When warmup is not set explicitly, the balanced and accurate modes enable adaptive warmup. The engine keeps warming up past the mode's minimum event count (the warmup value) until predictions stabilize, and only then starts emitting forecasts.
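The preset values in the modes table and the override rule can be expressed as a dictionary merge. In this Python sketch, the key names mirror the table columns. Following the rule above, adaptive_warmup is treated as switched off whenever warmup is given explicitly. The function name is hypothetical.

```python
PRESETS = {
    "fast":     {"warmup": 50,  "max_depth": 3, "hawkes": False, "conformal": False, "adaptive_warmup": False},
    "accurate": {"warmup": 200, "max_depth": 5, "hawkes": True,  "conformal": True,  "adaptive_warmup": True},
    "balanced": {"warmup": 100, "max_depth": 3, "hawkes": True,  "conformal": True,  "adaptive_warmup": True},
}

def resolve_forecast_config(mode="balanced", **overrides):
    """Start from the mode preset, then apply explicitly passed parameters."""
    config = dict(PRESETS[mode])      # copy so presets stay unchanged
    if "warmup" in overrides:
        config["adaptive_warmup"] = False   # explicit warmup disables adaptive warmup
    config.update(overrides)
    return config
```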

Forecast Example

varpulis
stream FraudForecast = Transaction as t1
    -> Transaction as t2 where t2.amount > t1.amount * 5
    -> Transaction as t3 where t3.location != t1.location
    .within(5m)
    .forecast(mode: "accurate")
    .where(forecast_confidence > 0.8 and forecast_probability > 0.7)
    .emit(
        probability: forecast_probability,
        stability: forecast_confidence,
        expected_time: forecast_time,
        state: forecast_state,
        confidence_lower: forecast_lower,
        confidence_upper: forecast_upper
    )

See Forecasting Tutorial and Forecasting Architecture for details.


Enrichment Built-in Variables (Implemented)

These variables are available after .enrich():

| Variable | Type | Description | Status |
|---|---|---|---|
| enrich_status | str | "ok", "error", "cached", or "timeout" | Implemented |
| enrich_latency_ms | int | Lookup latency in ms (0 for cache hits) | Implemented |

Enrichment Example

varpulis
connector WeatherAPI = http(url: "https://api.weather.com/v1")

stream Enriched = Temperature as t
    .enrich(WeatherAPI, key: t.city, fields: [forecast, humidity], cache_ttl: 5m)
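    # Accept both fresh ("ok") and cached lookups; cache hits report "cached"
    .where(enrich_status != "error" and enrich_status != "timeout" and forecast == "rain")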
    .emit(city: t.city, forecast: forecast, latency: enrich_latency_ms)
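The four enrich_status values and the zero latency for cache hits suggest a TTL cache in front of the connector, as configured with cache_ttl in the example above. The hypothetical Python wrapper below shows one way the two variables could be produced; CachedEnricher is not a Varpulis API. It assumes failed lookups are not cached. Note that a cache hit reports "cached", not "ok".

```python
import time

class CachedEnricher:
    """Hypothetical lookup wrapper reporting (fields, enrich_status, enrich_latency_ms)."""

    def __init__(self, lookup, cache_ttl_s, clock=time.monotonic):
        self.lookup = lookup            # callable(key) -> dict of fields; may raise
        self.cache_ttl_s = cache_ttl_s
        self.clock = clock
        self.cache = {}                 # key -> (expires_at, fields)

    def enrich(self, key):
        now = self.clock()
        hit = self.cache.get(key)
        if hit is not None and hit[0] > now:
            return hit[1], "cached", 0  # cache hits cost no lookup latency
        try:
            fields = self.lookup(key)
        except TimeoutError:
            return {}, "timeout", round((self.clock() - now) * 1000)
        except Exception:
            return {}, "error", round((self.clock() - now) * 1000)
        self.cache[key] = (now + self.cache_ttl_s, fields)   # only successes are cached
        return fields, "ok", round((self.clock() - now) * 1000)
```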

See Enrichment Reference for details.



Varpulis - Next-generation streaming analytics engine