Version: 2.0.0 (Latest)

Comprehensive Examples

This page demonstrates Padas Domain Language (PDL) usage through input → PDL → output walkthroughs. JSON shown as expected output is illustrative and non-normative (field order, envelope metadata, and numeric details may differ by deployment). For exact syntax, operator semantics, wildcard rules, window validation, and runtime behavior, use Reference; for a compact operator and command list, use Quick Reference.

Table of Contents

  1. Example format
  2. Syntax recap
  3. Single-event examples
  4. Parse examples
  5. Aggregation examples
  6. End-to-end pipelines
  7. Real-world scenarios
  8. Advanced patterns
  9. Production considerations
  10. Running examples

Example format

Unless stated otherwise, each worked example uses:

Purpose: What the snippet demonstrates.
Input: One JSON event, or an array of events for windowed aggregation.
PDL: Query, pipeline stage list, or aggregation clause.
Expected output: Representative JSON after the stage (or aggregate emit).
Notes: Window boundaries, group_key shape, engine version caveats.

Grouped aggregate rows follow flat emit shape with group_key (see Reference → Aggregations and Glossary → Aggregation).

Syntax recap

Minimal reminders only—not a substitute for Quick Reference or Reference.

Query filters

status = "active" AND score > 80
(user_id = 123 OR user_id = 456) AND NOT deleted = true
action IN ["login", "logout"]
email ~= ".*@example\\.com$"
tags ?= "urgent"

?= on arrays checks membership (example: tags ?= "urgent"). For string matching on email domains, prefer ~= as above.

Transformations

eval total = price * qty, tier = if(total > 100, "high", "low")
rename user_id AS id
fields id, total, tier
output result type=string

Parsing

parse_kv raw
parse_json envelope
parse_csv line header="ts,user,evt"

Aggregations

count AS n timespan=5m
sum(amount) AS rev timespan=1h group_by tenant_id
avg(latency) AS avg_ms timespan=1m group_by svc where ok = true

Partitioning

partition_by user_id
partition_by tenant_id, user_id
parse_json body | partition_by user_id | count AS c timespan=5m group_by user_id

Single-event examples

Purpose

Retain events where status is active and score exceeds 80.

Input

{
"user_id": 123,
"status": "active",
"score": 85,
"ts": 1704067300000
}

PDL

status = "active" AND score > 80

Expected output

{
"user_id": 123,
"status": "active",
"score": 85,
"ts": 1704067300000
}

Notes

A non-matching event is dropped for this branch (no output row).


Purpose

Combine numeric threshold with deterministic string match on email using regex (avoid ?= on strings for domain checks).

Input

{
"user_id": 456,
"email": "user@example.com",
"login_count": 5
}

PDL

login_count > 3 AND email ~= ".*@example\\.com$"

Expected output

{
"user_id": 456,
"email": "user@example.com",
"login_count": 5
}

Notes

Escape . in the pattern as needed for your regex engine.


Purpose

Array membership with ?=.

Input

{
"event_id": "e1",
"tags": ["prod", "security", "urgent"]
}

PDL

tags ?= "urgent"

Expected output

{
"event_id": "e1",
"tags": ["prod", "security", "urgent"]
}

Purpose

eval adds derived fields; prior keys remain unless trimmed.

Input

{
"first_name": "John",
"last_name": "Doe",
"age": 30
}

PDL

eval full_name = first_name + " " + last_name, age_group = if(age >= 18, "adult", "minor")

Expected output

{
"first_name": "John",
"last_name": "Doe",
"age": 30,
"full_name": "John Doe",
"age_group": "adult"
}

Purpose

rename then project with fields.

Input

{
"user_id": 1,
"user_name": "jdoe",
"user_email": "jdoe@example.com"
}

PDL

rename user_id AS id, user_name AS username, user_email AS email | fields id, username

Expected output

{
"id": 1,
"username": "jdoe"
}

Parse examples

Purpose

parse_csv without header yields generic column names (field1, …).

Input

{
"line": "2024-01-01,user123,login,success"
}

PDL

parse_csv line

Expected output

{
"line": "2024-01-01,user123,login,success",
"field1": "2024-01-01",
"field2": "user123",
"field3": "login",
"field4": "success"
}

Notes

Supply header= when you want stable names (Reference / Quick Reference).
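
The generic naming rule above can be sketched in plain Rust. This is illustrative only, not the engine's implementation; the helper name parse_csv_generic is an assumption for this sketch.

```rust
// Sketch (not the engine's implementation): headerless CSV parsing
// assigns generic names field1, field2, ... in column order.
fn parse_csv_generic(line: &str) -> Vec<(String, String)> {
    line.split(',')
        .enumerate()
        .map(|(i, v)| (format!("field{}", i + 1), v.to_string()))
        .collect()
}

fn main() {
    for (name, value) in parse_csv_generic("2024-01-01,user123,login,success") {
        println!("{} = {}", name, value);
    }
}
```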


Purpose

parse_kv on space-separated key=value tokens.

Input

{
"raw": "user=john action=login ts=1704067300"
}

PDL

parse_kv raw

Expected output

{
"raw": "user=john action=login ts=1704067300",
"user": "john",
"action": "login",
"ts": "1704067300"
}
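
The tokenization behind this example can be approximated in a few lines of Rust. A minimal sketch under the assumption that keys and values are separated by a single = and tokens by whitespace; the real parser's handling of quoting and malformed tokens is defined in Reference.

```rust
// Sketch of space-separated key=value tokenization; tokens without
// an '=' are skipped. Not the engine's implementation.
fn parse_kv(raw: &str) -> Vec<(String, String)> {
    raw.split_whitespace()
        .filter_map(|tok| tok.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

fn main() {
    for (k, v) in parse_kv("user=john action=login ts=1704067300") {
        println!("{} = {}", k, v);
    }
}
```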

Purpose

parse_json merges object keys from a string field.

Input

{
"payload": "{\"user_id\": 123, \"name\": \"John\", \"active\": true}"
}

PDL

parse_json payload

Expected output

{
"payload": "{\"user_id\": 123, \"name\": \"John\", \"active\": true}",
"user_id": 123,
"name": "John",
"active": true
}

Purpose

parse_regex with named captures ((?P<name>…)).

Input

{
"log_line": "2024-01-20 14:30:25 [ERROR] connection failed host=db-01"
}

PDL

parse_regex log_line "(?P<ts>[^\\s]+ [^\\s]+) \\[(?P<level>[^\\]]+)\\] (?P<msg>.+) host=(?P<host>\\S+)"

Expected output

{
"log_line": "2024-01-20 14:30:25 [ERROR] connection failed host=db-01",
"ts": "2024-01-20 14:30:25",
"level": "ERROR",
"msg": "connection failed",
"host": "db-01"
}

Notes

Tune the pattern to your log format; catastrophic backtracking is a runtime risk on hot paths.

Aggregation examples

Assume all sample rows fall in the same window [window_start, window_end) for illustration.

Purpose

count only, no group_by — single flat row.

Input

[
{"user_id": 123, "ok": true},
{"user_id": 456, "ok": true},
{"user_id": 789, "ok": false}
]

PDL

count AS total_events timespan=5m

Expected output

{
"window_start": 1704067200000,
"window_end": 1704067500000,
"total_events": 3
}
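
The window bounds in this output follow from flooring the event timestamp to a multiple of the timespan. The sketch below illustrates that arithmetic only; the engine's exact event-time alignment rules are in Reference.

```rust
// Tumbling-window assignment sketch: an event timestamp maps to
// [window_start, window_end) by flooring to a multiple of the timespan.
fn window_bounds(event_ts_ms: i64, timespan_ms: i64) -> (i64, i64) {
    let start = event_ts_ms - event_ts_ms.rem_euclid(timespan_ms);
    (start, start + timespan_ms)
}

fn main() {
    // timespan=5m is 300_000 ms; ts=1704067300000 from the input above
    let (start, end) = window_bounds(1_704_067_300_000, 300_000);
    println!("[{}, {})", start, end); // [1704067200000, 1704067500000)
}
```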

Purpose

Multiple reducers, one window, no group_by.

Input

[
{"score": 85},
{"score": 92},
{"score": 45}
]

PDL

count AS total, avg(score) AS avg_score, max(score) AS highest timespan=5m

Expected output

{
"window_start": 1704067200000,
"window_end": 1704067500000,
"total": 3,
"avg_score": 74.0,
"highest": 92
}

Purpose

group_by — one row per key; group_key string matches group field value (single dimension).

Input

[
{"user_id": 123, "score": 85, "department": "engineering"},
{"user_id": 456, "score": 92, "department": "engineering"},
{"user_id": 789, "score": 45, "department": "sales"}
]

PDL

count AS dept_count, avg(score) AS dept_avg timespan=5m group_by department

Expected output

[
{
"group_key": "engineering",
"window_start": 1704067200000,
"window_end": 1704067500000,
"dept_count": 2,
"dept_avg": 88.5
},
{
"group_key": "sales",
"window_start": 1704067200000,
"window_end": 1704067500000,
"dept_count": 1,
"dept_avg": 45.0
}
]

Notes

Multi-group emissions are often represented as a JSON array at the API boundary; tasks may fan out one sink record per element (Reference).


Purpose

Composite group_key when group_by lists multiple fields (order preserved; key segments joined with |).

Input

[
{"tenant": "t1", "region": "us", "bytes": 100},
{"tenant": "t1", "region": "us", "bytes": 200},
{"tenant": "t1", "region": "eu", "bytes": 50}
]

PDL

sum(bytes) AS b timespan=5m group_by tenant, region

Expected output

[
{
"group_key": "t1|us",
"window_start": 1704067200000,
"window_end": 1704067500000,
"b": 300
},
{
"group_key": "t1|eu",
"window_start": 1704067200000,
"window_end": 1704067500000,
"b": 50
}
]
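
The composite key construction above can be sketched as a join over the group_by segments in declaration order. Illustrative only; grouping here uses a BTreeMap to keep output stable, which is not a claim about engine ordering.

```rust
use std::collections::BTreeMap;

// Sketch: group_by tenant, region joins key segments with '|'
// in declaration order to form the composite group_key.
fn composite_group_key(segments: &[&str]) -> String {
    segments.join("|")
}

fn main() {
    let rows = [("t1", "us", 100), ("t1", "us", 200), ("t1", "eu", 50)];
    let mut sums: BTreeMap<String, i64> = BTreeMap::new();
    for (tenant, region, bytes) in rows {
        *sums.entry(composite_group_key(&[tenant, region])).or_insert(0) += bytes;
    }
    for (key, b) in &sums {
        println!("{} -> {}", key, b);
    }
}
```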

Purpose

collect embeds matching raw events in the aggregate row.

Input

[
{"user_id": 123, "score": 85},
{"user_id": 456, "score": 92},
{"user_id": 789, "score": 45}
]

PDL

collect(score > 80) AS high_scores timespan=5m

Expected output

{
"window_start": 1704067200000,
"window_end": 1704067500000,
"high_scores": [
{"user_id": 123, "score": 85},
{"user_id": 456, "score": 92}
]
}

Notes

collect increases memory vs scalar reducers; cap volume in production designs.

End-to-end pipelines

Purpose

parse_kveval → query gate.

Input

{
"raw_log": "user=john score=85 status=active ts=1704067300"
}

PDL

parse_kv raw_log | eval score_num = to_number(score) | score_num > 80

Expected output

{
"raw_log": "user=john score=85 status=active ts=1704067300",
"user": "john",
"score": "85",
"status": "active",
"ts": "1704067300",
"score_num": 85
}

Purpose

Filter high scores, then aggregate per department with group_key.

Input

[
{"user_id": 123, "score": 85, "department": "engineering"},
{"user_id": 456, "score": 92, "department": "engineering"},
{"user_id": 789, "score": 45, "department": "sales"}
]

PDL

score > 80 | count AS high_scores, collect(score > 80) AS top_performers timespan=5m group_by department

Expected output

[
{
"group_key": "engineering",
"window_start": 1704067200000,
"window_end": 1704067500000,
"high_scores": 2,
"top_performers": [
{"user_id": 123, "score": 85, "department": "engineering"},
{"user_id": 456, "score": 92, "department": "engineering"}
]
}
]

Notes

The sales group is absent here because its only row (score 45) fails the score > 80 filter before the aggregate; a group emits only if at least one of its rows reaches the aggregation stage.

Real-world scenarios

Purpose

Chained eval on a nested order payload (indices are a fixed-size illustration, not a generic cart fold).

Input

{
"order": {
"id": "ORD-1",
"items": [
{"price": 999.99, "quantity": 1},
{"price": 29.99, "quantity": 2}
],
"discount_code": "SAVE10",
"shipping_address": {"state": "CA"}
}
}

PDL

eval item_total = order.items[0].price * order.items[0].quantity + order.items[1].price * order.items[1].quantity | eval discount_amount = if(order.discount_code = "SAVE10", item_total * 0.1, 0) | eval final_total = item_total - discount_amount | eval tax = if(order.shipping_address.state = "CA", final_total * 0.08, 0) | eval order_total = final_total + tax

Expected output

{
"item_total": 1059.97,
"discount_amount": 105.997,
"final_total": 953.973,
"tax": 76.31784,
"order_total": 1030.29084
}

Notes

Production pipelines usually unnest line items upstream instead of hard-coding indices. Per eval semantics, the original order object is retained on the event; it is omitted from the output above for brevity.
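
Replaying the eval chain in plain Rust confirms the arithmetic. The 10% discount and 8% CA tax rates are this example's own constants; the helper name order_total is an assumption for this sketch.

```rust
// The eval chain above, replayed step by step to check the arithmetic.
fn order_total(line_items: &[(f64, f64)], save10: bool, ships_to_ca: bool) -> f64 {
    let item_total: f64 = line_items.iter().map(|(price, qty)| price * qty).sum();
    let discount_amount = if save10 { item_total * 0.1 } else { 0.0 };
    let final_total = item_total - discount_amount;
    let tax = if ships_to_ca { final_total * 0.08 } else { 0.0 };
    final_total + tax
}

fn main() {
    let total = order_total(&[(999.99, 1.0), (29.99, 2.0)], true, true);
    println!("{:.5}", total); // 1030.29084, matching the expected output
}
```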


Purpose

Structured log line → fields → alert label using nested metrics.

Input

{
"log_entry": "2024-01-20 14:30:25 [ERROR] Database connection failed",
"metrics": {"cpu_usage": 85.5, "memory_usage": 78.2}
}

PDL

parse_regex log_entry "(?P<ts>\\S+ \\S+) \\[(?P<level>[^\\]]+)\\] (?P<message>.+)" | eval high_cpu = metrics.cpu_usage > 80 | eval high_memory = metrics.memory_usage > 75 | eval system_stress = high_cpu OR high_memory | eval alert_level = case(level = "ERROR" AND system_stress, "critical", level = "ERROR", "warning", "info")

Expected output

{
"ts": "2024-01-20 14:30:25",
"level": "ERROR",
"message": "Database connection failed",
"high_cpu": true,
"high_memory": true,
"system_stress": true,
"alert_level": "critical"
}

Purpose

Flat clickstream rows: windowed counts per action (valid group_by on scalar fields).

Input

[
{"user_id": 123, "action": "view", "page": "/products", "ts": 1},
{"user_id": 123, "action": "click", "page": "/products", "ts": 2},
{"user_id": 123, "action": "purchase", "page": "/checkout", "ts": 3}
]

PDL

count AS n timespan=1h group_by action

Expected output

[
{"group_key": "view", "window_start": 0, "window_end": 3600000, "n": 1},
{"group_key": "click", "window_start": 0, "window_end": 3600000, "n": 1},
{"group_key": "purchase", "window_start": 0, "window_end": 3600000, "n": 1}
]

Notes

window_start / window_end here are placeholders; real values come from event-time assignment in the engine.


Purpose

API-style flat events: request count and average latency per endpoint in one windowed statement.

Input

[
{"endpoint": "/api/users", "status_code": 200, "latency_ms": 150},
{"endpoint": "/api/users", "status_code": 500, "latency_ms": 5000},
{"endpoint": "/api/orders", "status_code": 201, "latency_ms": 300}
]

PDL

count AS requests, avg(latency_ms) AS avg_latency_ms timespan=1m group_by endpoint

Expected output

[
{
"group_key": "/api/users",
"window_start": 1704067200000,
"window_end": 1704067260000,
"requests": 2,
"avg_latency_ms": 2575.0
},
{
"group_key": "/api/orders",
"window_start": 1704067200000,
"window_end": 1704067260000,
"requests": 1,
"avg_latency_ms": 300.0
}
]

Notes

Add a separate where status_code >= 400 aggregate or per-event eval if you need error-only metrics (Reference).

Advanced patterns

Purpose

Partition key then aggregate with rekey=true for aligned downstream routing.

Input

[
{"user_id": "u1", "amount": 10},
{"user_id": "u1", "amount": 20}
]

PDL

partition_by user_id | sum(amount) AS total timespan=1h group_by user_id rekey=true

Expected output

{
"group_key": "u1",
"window_start": 1704067200000,
"window_end": 1704070800000,
"total": 30
}

Notes

rekey behavior and routing contract: Reference.


Purpose

first / last (processing order) vs earliest / latest (envelope time)—see Reference for definitions.

PDL

count AS n, first(phase) AS first_phase, last(phase) AS last_phase timespan=5m group_by job_id
earliest(message) AS first_msg, latest(message) AS last_msg timespan=5m group_by correlation_id

Notes

Envelope timestamps must be consistent with window assignment for earliest/latest to match operator expectations.

Production considerations

  • Filter early with cheap queries before parse_* when predicates do not depend on parsed fields.
  • Project early with fields to drop large blobs before eval or aggregations.
  • Bound ~= and parse_regex patterns to reduce backtracking cost.
  • Size timespan, slide, and group_by cardinality to control state and emit rate; avoid large collect windows without caps.
  • Guard divisions with if; use coalesce for optional paths.
  • Validate PDL with the same padas-pdl / Core version as production.
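
The division-guard guideline mirrors the PDL pattern eval rate = if(total > 0, errors / total, 0). A minimal sketch of the same guard in Rust; the function name and fallback of 0 are illustrative assumptions.

```rust
// Guarded division sketch: return 0 instead of dividing by zero,
// mirroring eval rate = if(total > 0, errors / total, 0).
fn error_rate(errors: f64, total: f64) -> f64 {
    if total > 0.0 { errors / total } else { 0.0 }
}

fn main() {
    println!("{}", error_rate(5.0, 100.0)); // 0.05
    println!("{}", error_rate(5.0, 0.0));   // 0 (guarded)
}
```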

Running examples

Embed the padas-pdl crate (layout varies by repository).

use padas_pdl::{NewPdlProcessor, PdlInput};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut processor = NewPdlProcessor::new();

    // Query: keep events where status is active and score > 80.
    let query_input = PdlInput::json(json!({"status": "active", "score": 85}));
    let _ = processor.process_query_with_input(r#"status = "active" AND score > 80"#, &query_input)?;

    // Pipeline: parse key=value pairs, coerce score to a number, then gate.
    let pipe_input = PdlInput::json(json!({"raw": "user=u1 score=85 status=active"}));
    let _ = processor.process_pipeline_with_input(
        r#"parse_kv raw | eval n = to_number(score) | n > 80"#,
        &pipe_input,
    )?;

    Ok(())
}

See the examples/ directory in padas-pdl for runnable snippets.