PostgreSQL CDC Tutorial
This tutorial walks you through setting up Varpulis with PostgreSQL Change Data Capture (CDC) for real-time database change streaming. You'll learn to configure logical replication, create CDC pipelines, filter by operation type, handle updates with old/new fields, and process changes from multiple tables.
Prerequisites
- PostgreSQL 10+ with wal_level=logical (we'll configure this below)
- Varpulis built with the cdc feature (cargo build --release --features cdc)
- Basic VPL knowledge (see Getting Started)
Part 1: Setting Up PostgreSQL for CDC
Enable Logical Replication
PostgreSQL logical replication requires wal_level=logical. Edit postgresql.conf:
```
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
```
Or start PostgreSQL with Docker (recommended for testing):
```bash
docker run -d --name varpulis-pg \
  -e POSTGRES_USER=varpulis \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=myapp \
  -p 5432:5432 \
  postgres:16-alpine \
  -c wal_level=logical \
  -c max_replication_slots=4 \
  -c max_wal_senders=4
```
Verify logical replication is enabled:
```bash
docker exec varpulis-pg psql -U varpulis -d myapp -c "SHOW wal_level;"
#  wal_level
# -----------
#  logical
```
Create Tables and Publication
```sql
-- Create tables
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer TEXT NOT NULL,
    amount NUMERIC(10,2) NOT NULL,
    status TEXT DEFAULT 'pending'
);

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    price NUMERIC(10,2) NOT NULL,
    stock INTEGER DEFAULT 0
);

-- Create a publication for CDC
CREATE PUBLICATION varpulis_pub FOR TABLE orders, products;
```
The publication tells PostgreSQL which tables to include in the CDC stream.
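Conceptually, the publication acts as an allow-list: PostgreSQL only sends changes for published tables into the stream the connector reads. The toy Python model below illustrates that filtering step. It is not Varpulis or PostgreSQL code, and audit_log is a hypothetical unpublished table.

```python
# Toy model: a publication acts as an allow-list of tables whose row changes
# reach the CDC stream. Not Varpulis or PostgreSQL code.
PUBLICATION_TABLES = {"orders", "products"}  # mirrors varpulis_pub


def published_changes(changes, publication=PUBLICATION_TABLES):
    """Keep only (table, op, row) changes whose table is in the publication."""
    return [change for change in changes if change[0] in publication]


wal_changes = [
    ("orders", "INSERT", {"id": 1, "customer": "Alice"}),
    ("audit_log", "INSERT", {"id": 7}),  # hypothetical unpublished table
    ("products", "UPDATE", {"id": 3, "stock": 5}),
]

streamed = published_changes(wal_changes)
print([table for table, _op, _row in streamed])  # ['orders', 'products']
```

To stream another table later, add it to the existing publication with ALTER PUBLICATION varpulis_pub ADD TABLE followed by the table name.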
Part 2: Your First CDC Pipeline
The Program
Create cdc_orders.vpl:
```vpl
# Track all new orders in real-time via PostgreSQL CDC
connector PgCdc = cdc_postgres (
    host: "localhost",
    port: 5432,
    dbname: "myapp",
    user: "varpulis",
    password: "secret",
    publication: "varpulis_pub"
)

stream NewOrders = orders.INSERT
    .from(PgCdc)
    .emit(
        customer: customer,
        amount: amount,
        status: status
    )
```
How It Works
The CDC connector uses PostgreSQL's logical replication protocol to stream WAL (Write-Ahead Log) changes as events. Each database change becomes a Varpulis event with:
- event_type: {table}.{OPERATION} (e.g., orders.INSERT, orders.UPDATE)
- _table: the source table name
- _op: the operation type (INSERT, UPDATE, or DELETE)
- Data fields: column values from the changed row
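To make this layout concrete, here is a hypothetical Python model of a CDC event. The class and attribute names are illustrative only; Varpulis exposes the same information as event_type, _table, _op, and the row's columns. The model also shows that selecting on the event type and selecting on _table plus _op pick out the same events, which is the equivalence Part 3 relies on.

```python
from dataclasses import dataclass, field


@dataclass
class CdcEvent:
    """Hypothetical model of one CDC event; not the Varpulis data structure."""
    table: str                                # exposed as _table
    op: str                                   # exposed as _op: INSERT, UPDATE or DELETE
    data: dict = field(default_factory=dict)  # column values of the changed row

    @property
    def event_type(self) -> str:
        # event_type follows the {table}.{OPERATION} pattern
        return f"{self.table}.{self.op}"


events = [
    CdcEvent("orders", "INSERT", {"id": 1, "customer": "Alice"}),
    CdcEvent("orders", "UPDATE", {"id": 1, "status": "shipped"}),
    CdcEvent("products", "DELETE", {"id": 3}),
]

# Selecting by event type and by _table/_op yields the same events.
by_type = [e for e in events if e.event_type == "orders.INSERT"]
by_fields = [e for e in events if e.table == "orders" and e.op == "INSERT"]
assert by_type == by_fields

print([e.event_type for e in events])  # ['orders.INSERT', 'orders.UPDATE', 'products.DELETE']
```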
Test It
Insert some rows:
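```bash
docker exec varpulis-pg psql -U varpulis -d myapp -c "
INSERT INTO orders (customer, amount) VALUES ('Alice', 99.99);
INSERT INTO orders (customer, amount) VALUES ('Bob', 250.00);
"
```
You should see one NewOrders output event for each inserted row. Because the INSERTs omit status, each event carries the column default, 'pending'.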
Part 3: Filtering by Operation
Each CDC event carries an _op field that you can filter on:
```vpl
# Only track new inserts
stream Inserts = orders.INSERT
    .from(PgCdc)
    .where(_op == "INSERT")
    .emit(customer: customer, amount: amount)

# Only track updates
stream Updates = orders.UPDATE
    .from(PgCdc)
    .where(_op == "UPDATE")
    .emit(customer: customer, new_amount: amount)

# Only track deletes
stream Deletes = orders.DELETE
    .from(PgCdc)
    .where(_op == "DELETE")
    .emit(deleted_id: id)
```
Since each stream's source (orders.INSERT, orders.UPDATE, orders.DELETE) already selects a single operation, the _op checks above are redundant there. Selecting on the event type alone, which follows the pattern {table}.{OP}, gives the same result:
```vpl
# Equivalent — filter by event type
stream Inserts = orders.INSERT
    .from(PgCdc)
    .emit(customer: customer, amount: amount)
```
Part 4: Handling Updates
UPDATE events can include both old and new field values when the table's replica identity is set to FULL:
```sql
-- Enable full replica identity for old-value tracking
ALTER TABLE orders REPLICA IDENTITY FULL;
```
With full replica identity, UPDATE events include old_* prefixed fields (for example, old_amount):
```vpl
# Detect order amount changes
stream PriceChanges = orders.UPDATE
    .from(PgCdc)
    .where(_op == "UPDATE")
    .emit(
        customer: customer,
        old_amount: old_amount,
        new_amount: amount,
        change: amount - old_amount
    )
```
Without REPLICA IDENTITY FULL, only the new values and primary key are available in UPDATE events.
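The difference between the two replica identity settings can be sketched as a small Python model. It follows the simplified rule stated above (FULL exposes every old column as old_<column>; otherwise only the new row, including the primary key, is available), and the helper names update_event and amount_change are hypothetical. NUMERIC values are modeled as Decimal, which is an assumption about type mapping.

```python
from decimal import Decimal


def update_event(old_row, new_row, replica_identity="DEFAULT"):
    """Toy model of the fields an orders UPDATE event carries.

    With REPLICA IDENTITY FULL every old column is exposed as old_<column>;
    otherwise only the new row (which includes the primary key) is available.
    """
    event = {"_table": "orders", "_op": "UPDATE", **new_row}
    if replica_identity == "FULL":
        event.update({f"old_{col}": value for col, value in old_row.items()})
    return event


def amount_change(event):
    """Mirror of PriceChanges' amount - old_amount; None when old values are missing."""
    if "old_amount" not in event:
        return None
    return event["amount"] - event["old_amount"]


before = {"id": 1, "customer": "Alice", "amount": Decimal("99.99"), "status": "pending"}
repriced = {**before, "amount": Decimal("120.00")}
shipped = {**before, "status": "shipped"}

print(amount_change(update_event(before, repriced, "FULL")))  # 20.01
print(amount_change(update_event(before, repriced)))          # None
print(amount_change(update_event(before, shipped, "FULL")))   # 0.00
```

The last line is worth noticing: PriceChanges matches every UPDATE on orders, so a status-only change still emits an event whose change is 0. If you only want genuine amount changes, the stream would need an additional condition comparing amount with old_amount.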
Part 5: Multi-Table CDC
A single publication can include multiple tables. Events are automatically routed by their _table field:
```vpl
connector PgCdc = cdc_postgres (
    host: "localhost",
    port: 5432,
    dbname: "myapp",
    user: "varpulis",
    password: "secret",
    publication: "varpulis_pub"
)

# Orders pipeline
stream OrderAlerts = orders.INSERT
    .from(PgCdc)
    .where(amount > 1000)
    .emit(alert: "high_value_order", customer: customer, amount: amount)

# Product stock changes
stream LowStock = products.UPDATE
    .from(PgCdc)
    .where(stock < 10)
    .emit(alert: "low_stock", product: name, remaining: stock)
```
Part 6: Complete Example — Order Processing Pipeline
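The complete program below sends high-value orders to a webhook, aggregates order volume per minute, and detects bursts of orders from one customer. The last two are easiest to reason about with a concrete model, so here is a short Python sketch of each. It assumes .window(1m) groups events into tumbling one-minute buckets and that RapidOrders fires when a customer's three most recent orders span at most five minutes. Both are assumptions for illustration, not documented Varpulis semantics; a real sequence matcher may pair events differently.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal


def order_volume(orders):
    """Assumed tumbling 1-minute buckets: count, sum and average of amount."""
    buckets = defaultdict(list)
    for ts, _customer, amount in orders:
        buckets[ts.replace(second=0, microsecond=0)].append(amount)
    return {
        start: {
            "order_count": len(amounts),
            "total_amount": sum(amounts),
            "avg_amount": sum(amounts) / len(amounts),
        }
        for start, amounts in sorted(buckets.items())
    }


def rapid_orders(orders, window=timedelta(minutes=5)):
    """Assumed pattern: a customer's last 3 orders span at most `window`."""
    history = defaultdict(list)
    alerts = []
    for ts, customer, amount in sorted(orders):
        history[customer].append((ts, amount))
        last3 = history[customer][-3:]
        if len(last3) == 3 and last3[-1][0] - last3[0][0] <= window:
            alerts.append({
                "customer": customer,
                "order_count": 3,
                "total": sum(a for _, a in last3),
            })
    return alerts


t0 = datetime(2024, 1, 1, 12, 0, 10)
orders = [
    (t0, "Alice", Decimal("100")),
    (t0 + timedelta(seconds=40), "Bob", Decimal("20")),
    (t0 + timedelta(minutes=1), "Alice", Decimal("50")),
    (t0 + timedelta(minutes=3), "Alice", Decimal("25")),
]

for start, stats in order_volume(orders).items():
    print(start.strftime("%H:%M"), stats["order_count"], stats["total_amount"])
print(rapid_orders(orders))  # [{'customer': 'Alice', 'order_count': 3, 'total': Decimal('175')}]
```

In this model, Alice's three orders fall 3 minutes apart end to end, so one alert is raised; a fourth Alice order inside the window would raise another, which is one of the choices a real pattern engine may make differently.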
```vpl
connector PgCdc = cdc_postgres (
    host: "localhost",
    port: 5432,
    dbname: "myapp",
    user: "varpulis",
    password: "secret",
    publication: "varpulis_pub"
)

connector AlertWebhook = http (
    url: "http://localhost:8080/alerts",
    method: "POST"
)

# High-value order alerts
stream HighValueOrders = orders.INSERT
    .from(PgCdc)
    .where(amount > 500)
    .emit(
        alert_type: "high_value_order",
        customer: customer,
        amount: amount,
        status: status
    )
    .to(AlertWebhook)

# Order volume per minute
stream OrderVolume = orders.INSERT
    .from(PgCdc)
    .window(1m)
    .aggregate(
        order_count: count(),
        total_amount: sum(amount),
        avg_amount: avg(amount)
    )

# Rapid order pattern: 3+ orders from the same customer within 5 minutes
stream RapidOrders = orders.INSERT as o1
    .from(PgCdc)
    -> orders.INSERT as o2 where o2.customer == o1.customer
    -> orders.INSERT as o3 where o3.customer == o1.customer
    .within(5m)
    .emit(
        alert: "rapid_orders",
        customer: o1.customer,
        order_count: 3,
        total: o1.amount + o2.amount + o3.amount
    )
```
Troubleshooting
"replication slot already exists"
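The connector creates a replication slot on first start. If a previous run left a stale slot behind, find and drop it. pg_drop_replication_slot fails on a slot that is still active, so stop any consumer first. Note that a newly created slot starts at the current WAL position, so changes committed while no slot existed are not replayed.
```sql
-- List existing slots
SELECT slot_name, active FROM pg_replication_slots;
-- Drop a stale slot (replace varpulis_slot with your slot's name)
SELECT pg_drop_replication_slot('varpulis_slot');
```
WAL accumulation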
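Every replication slot, whether or not its consumer is connected, makes PostgreSQL retain WAL from the slot's restart_lsn onward until the consumer confirms it has processed those changes. If the Varpulis connector is down for a long time, WAL accumulates on disk and can eventually fill it. Monitor the WAL retained by each slot with: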
```sql
SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
FROM pg_replication_slots;
```
Drop unused slots to allow WAL cleanup.
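The lag_bytes value is plain 64-bit arithmetic. An LSN such as 16/B374D848 is two hexadecimal numbers holding the high and low 32 bits of a byte position in the WAL, and pg_wal_lsn_diff subtracts two such positions. If a monitoring script already has the LSN strings, a sketch like the following computes the same difference (parse_lsn and lag_bytes are hypothetical helper names):

```python
def parse_lsn(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '16/B374D848' to a 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(current_wal_lsn: str, restart_lsn: str) -> int:
    """Same arithmetic as pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)."""
    return parse_lsn(current_wal_lsn) - parse_lsn(restart_lsn)


print(lag_bytes("0/3000000", "0/1000000"))  # 33554432 (32 MiB)
print(lag_bytes("1/0", "0/FFFFFFFF"))       # 1: crossing the 32-bit boundary
```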
Permission errors
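The PostgreSQL user needs the REPLICATION attribute. Superusers already have replication rights, and the Docker setup above creates varpulis as a superuser, so this mainly applies to other deployments:
```sql
ALTER USER varpulis WITH REPLICATION;
```
"wal_level is not logical"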
Changing wal_level in postgresql.conf takes effect only after a full server restart; a configuration reload is not enough.
Quick Reference
| Concept | Syntax / Value |
|---|---|
| Feature flag | --features cdc |
| Event type format | {table}.{OPERATION}, where OPERATION is INSERT, UPDATE, or DELETE |
| Metadata: table | _table field |
| Metadata: operation | _op field (INSERT, UPDATE, DELETE) |
| Old values (UPDATE) | old_* prefix (requires REPLICA IDENTITY FULL) |
| Publication | CREATE PUBLICATION name FOR TABLE t1, t2; |
| Replication slot | Auto-created by connector, name configurable |
Next Steps
- CDC Architecture: polling architecture, LSN tracking, type mapping
- Configuration Guide: CDC configuration options
- Checkpointing Tutorial: state persistence across restarts
- Language Tutorial: VPL language reference