Forecasting Tutorial
What Is Pattern Forecasting?
Standard complex event processing detects patterns after they complete. Forecasting goes further: it predicts whether a partially-matched pattern will complete, and estimates when.
For example, an industrial equipment failure pattern might be:
VibrationAnomaly -> TemperatureRise -> BearingFailureWith forecasting, after observing VibrationAnomaly -> TemperatureRise, the engine can estimate the probability that BearingFailure will follow, before it happens. This turns hours of warning into proactive maintenance instead of reactive repair — avoiding unplanned downtime that costs $100K-$300K per hour in heavy industry.
Varpulis implements forecasting using Prediction Suffix Trees (PST) combined with the SASE NFA to form a Pattern Markov Chain (PMC). The model learns transition probabilities from the live event stream and applies them to the pattern structure.
Basic .forecast() Syntax
The .forecast() operator is attached to a sequence pattern, after .within(). In the simplest case, no parameters are needed:
# Zero-config — adaptive warmup, balanced defaults
stream AlertName = EventA as a -> EventB as b
.within(5m)
.forecast()
.where(forecast_confidence > 0.8)
.emit(prob: forecast_probability)For more control, use preset modes or individual parameters:
stream AlertName = EventA as a
-> EventB where condition as b
-> EventC where condition as c
.within(time_window)
.forecast(mode: "accurate", confidence: 0.7)
.where(forecast_confidence > 0.8 and forecast_probability > 0.7)
.emit(fields...)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
mode | str | "balanced" | Preset: "fast", "accurate", or "balanced" |
confidence | float | 0.5 | Minimum probability to emit a forecast |
horizon | duration | 5m | How far ahead to forecast |
warmup | integer | 100 | Minimum events before forecasting starts |
max_depth | integer | 3 | PST context depth for prediction |
hawkes | bool | true | Enable Hawkes intensity modulation |
conformal | bool | true | Enable conformal prediction intervals |
Modes
| Mode | Best for | Hawkes | Conformal | Warmup | Adaptive |
|---|---|---|---|---|---|
"fast" | Low-latency, high-throughput | off | off | 50 | off |
"balanced" | General use (default) | on | on | 100 | on |
"accurate" | Maximum precision | on | on | 200 | on |
Modes provide defaults; explicit parameters override them. For example, .forecast(mode: "fast", conformal: true) uses fast mode but keeps conformal intervals.
Forecast Fields
After .forecast(), these fields become available for use in .where() and .emit():
forecast_probability: a value between 0.0 and 1.0 representing the estimated probability that the pattern will completeforecast_confidence: a value between 0.0 and 1.0 indicating prediction stability — high values mean the model has converged and the prediction is reliableforecast_time: estimated time until pattern completion in nanosecondsforecast_state: current NFA state label (e.g., "state_2_of_4")forecast_context_depth: PST context depth used for predictionforecast_lower/forecast_upper: conformal prediction interval bounds
Step-by-Step Example: Industrial Predictive Maintenance
Step 1: Define the Events
Start with the sensor events that form the failure cascade. In rotating equipment (pumps, turbines, compressors), bearing failure follows a well-known physical sequence: vibration anomaly → temperature rise → mechanical failure.
event VibrationReading:
machine_id: str
amplitude_mm: float
frequency_hz: float
zone: str
ts: timestamp
event TemperatureReading:
machine_id: str
value_celsius: float
zone: str
ts: timestamp
event MachineFailure:
machine_id: str
failure_type: str
severity: str
ts: timestampStep 2: Add the Sequence
stream BearingFailurePattern = VibrationReading as vib
-> TemperatureReading where machine_id == vib.machine_id and value_celsius > 75 as temp
-> MachineFailure where machine_id == vib.machine_id as failure
.within(4h)This matches the full three-step failure cascade within a 4-hour window. Without forecasting, alerts fire only after the failure has already occurred.
Step 3: Add Forecasting
stream BearingFailureForecast = VibrationReading as vib
-> TemperatureReading where machine_id == vib.machine_id and value_celsius > 75 as temp
-> MachineFailure where machine_id == vib.machine_id as failure
.within(4h)
.forecast(mode: "accurate")Now the engine will:
- Learn the sensor stream's statistical properties during warmup (minimum 200 events, extended by adaptive warmup until predictions stabilize)
- For each partially-matched pattern (e.g., vibration seen, waiting for temperature rise), compute the probability of eventual failure
- Only emit forecasts where the probability exceeds the confidence threshold (default 0.5)
Step 4: Filter and Emit
stream BearingFailureForecast = VibrationReading as vib
-> TemperatureReading where machine_id == vib.machine_id and value_celsius > 75 as temp
-> MachineFailure where machine_id == vib.machine_id as failure
.within(4h)
.forecast(mode: "accurate", confidence: 0.5, horizon: 2h)
.where(forecast_confidence > 0.8 and forecast_probability > 0.7)
.emit(
alert_type: "PREDICTED_BEARING_FAILURE",
machine_id: vib.machine_id,
zone: vib.zone,
probability: forecast_probability,
stability: forecast_confidence,
expected_time: forecast_time,
vibration_amplitude: vib.amplitude_mm,
temperature: temp.value_celsius,
severity: if forecast_probability > 0.85 then "critical" else "warning",
recommendation: if forecast_probability > 0.85
then "Schedule emergency bearing replacement within 1 hour"
else "Monitor closely and prepare replacement parts"
)The .where(forecast_confidence > 0.8 and forecast_probability > 0.7) combines two filters: forecast_confidence ensures the model has converged (stable predictions), and forecast_probability sets the minimum completion threshold. A maintenance team can then act on high-probability alerts (>85%) with emergency repairs, while monitoring lower-probability alerts (70-85%) with standby parts ready.
Tuning Parameters
Confidence Threshold
The confidence parameter controls the minimum probability for emitting any forecast at all. Lower values produce more forecasts (higher recall) but more false positives. Higher values produce fewer, more reliable forecasts.
Recommended starting points:
- Exploratory/monitoring:
0.3-- see what the model predicts - Alerting:
0.5-- reasonable balance - Automated action:
0.8-- high confidence required
Horizon
The horizon parameter limits how far ahead the model looks. A longer horizon allows earlier detection but may reduce accuracy because the model must predict further into the future.
Guidelines:
- Set horizon to roughly half the
.within()window for balanced results - For time-sensitive patterns, use a shorter horizon (e.g.,
horizon: 5mfor a.within(10m)pattern) - The model naturally becomes less confident at longer horizons
Warmup Period
The warmup parameter controls how many events the PST observes before it starts making predictions. During warmup, the model learns the event stream's statistical properties -- no forecasts are emitted.
Guidelines:
- Low-volume streams (< 100 events/min):
warmup: 200 - Medium-volume streams (100-10K events/min):
warmup: 500 - High-volume streams (> 10K events/min):
warmup: 1000
More warmup events produce a better-calibrated model, but delay the start of forecasting. In practice, 500-1000 events is sufficient for most workloads.
Working With Forecast Results
Tiered Alerting
Use multiple filters on forecast_probability to create tiered alert levels:
# High-confidence: automated shutdown
stream EmergencyShutdown = VibrationReading as vib
-> TemperatureReading where machine_id == vib.machine_id and value_celsius > 75
-> MachineFailure where machine_id == vib.machine_id
.within(4h)
.forecast(confidence: 0.5, horizon: 2h, warmup: 500)
.where(forecast_probability > 0.9)
.emit(
action: "EMERGENCY_SHUTDOWN",
machine_id: vib.machine_id,
probability: forecast_probability
)
# Medium-confidence: schedule maintenance
stream MaintenanceQueue = VibrationReading as vib
-> TemperatureReading where machine_id == vib.machine_id and value_celsius > 75
-> MachineFailure where machine_id == vib.machine_id
.within(4h)
.forecast(confidence: 0.5, horizon: 2h, warmup: 500)
.where(forecast_probability > 0.6 and forecast_probability <= 0.9)
.emit(
action: "SCHEDULE_MAINTENANCE",
machine_id: vib.machine_id,
probability: forecast_probability,
expected_time: forecast_expected_time
)Using Expected Time
The forecast_expected_time field estimates when the pattern will complete. Use it to prioritize alerts or set SLA timers:
stream UrgentForecast = SensorReading as baseline
-> SensorReading where sensor_id == baseline.sensor_id and value > baseline.value * 2 as spike
-> SystemFailure where system_id == baseline.sensor_id as failure
.within(30m)
.forecast(confidence: 0.5, horizon: 15m, warmup: 500)
.where(forecast_probability > 0.7)
.emit(
alert_type: if forecast_expected_time < 5m then "URGENT" else "WARNING",
sensor_id: baseline.sensor_id,
probability: forecast_probability,
minutes_remaining: forecast_expected_time
)Full Example: Cybersecurity APT Detection
Advanced Persistent Threats (APTs) follow a predictable kill chain: reconnaissance → initial exploitation → lateral movement → data exfiltration. With dwell times of 56-200 days, there is a wide intervention window — but only if you can predict that a partial sequence will progress.
connector SiemKafka = kafka(
brokers: ["siem-kafka:9092"],
group_id: "apt-forecaster"
)
event NetworkScan:
source_ip: str
target_subnet: str
ports_scanned: int
ts: timestamp
event ExploitAttempt:
source_ip: str
target_host: str
cve_id: str
success: bool
ts: timestamp
event LateralMovement:
source_host: str
target_host: str
method: str # "pass_the_hash", "psexec", "wmi", "ssh"
ts: timestamp
event DataExfiltration:
source_host: str
destination_ip: str
bytes_transferred: int
protocol: str
ts: timestamp
# Forecast: scan -> exploit -> lateral move -> exfiltration
stream APTForecast = NetworkScan as scan
-> ExploitAttempt where source_ip == scan.source_ip and success == true as exploit
-> LateralMovement where source_host == exploit.target_host as lateral
-> DataExfiltration where source_host == lateral.target_host as exfil
.within(7d)
.forecast(confidence: 0.4, horizon: 3d, warmup: 2000)
.where(forecast_probability > 0.5)
.emit(
alert_type: "APT_KILL_CHAIN_FORECAST",
severity: if forecast_probability > 0.8 then "critical" else "high",
source_ip: scan.source_ip,
compromised_host: exploit.target_host,
probability: forecast_probability,
expected_time: forecast_expected_time,
recommendation: if forecast_probability > 0.75
then "Isolate compromised host immediately, begin incident response"
else "Increase monitoring on subnet, block lateral movement paths"
)Full Example: IoT Predictive Maintenance
connector SensorMQTT = mqtt(
host: "iot-gateway.local",
port: 1883,
client_id: "forecaster"
)
event VibrationReading:
machine_id: str
amplitude: float
frequency: float
ts: timestamp
event TemperatureSpike:
machine_id: str
temp_celsius: float
ts: timestamp
event MachineFailure:
machine_id: str
error_code: str
ts: timestamp
# Forecast: vibration anomaly -> temp spike -> failure
stream PredictiveMaintenanceAlert = VibrationReading as vib
-> TemperatureSpike where machine_id == vib.machine_id and temp_celsius > 80 as temp
-> MachineFailure where machine_id == vib.machine_id as failure
.within(2h)
.forecast(confidence: 0.5, horizon: 1h, warmup: 1000)
.where(forecast_probability > 0.6)
.emit(
alert_type: "PREDICTED_FAILURE",
machine_id: vib.machine_id,
probability: forecast_probability,
expected_in: forecast_expected_time,
vibration_amplitude: vib.amplitude,
temperature: temp.temp_celsius,
recommendation: if forecast_probability > 0.8
then "Schedule immediate maintenance"
else "Monitor closely, maintenance may be needed"
)How It Works Internally
Pattern Markov Chain (PMC): The
.forecast()operator constructs a PMC from the pattern's NFA (the sequence structure) and a Prediction Suffix Tree (PST) that learns from the live stream.Online Learning: As events flow through the engine, the PST continuously updates its model of event transition probabilities. It uses variable-order Markov modeling -- deeper context where it helps, shallower where it does not.
Forward Algorithm: When a partial match is active (e.g.,
VibrationAnomalymatched, waiting forTemperatureRise), the PMC runs a forward computation over the product of NFA states and PST-predicted transition probabilities to estimate completion probability.Adaptive Pruning: The PST periodically prunes nodes whose distributions do not differ significantly from their parent, keeping the model compact and fast.
Smoothing: Laplace smoothing ensures all event types have non-zero probability, preventing the model from assigning zero probability to unseen transitions.