A single credit card gets swiped at a gas station in Houston at 2:13 PM. Forty seconds later, the same card number appears at an electronics store in Tokyo. Within those forty seconds, your system needs to ingest both events, correlate them across millions of concurrent transaction streams, recognize the physical impossibility, and fire a fraud alert — all before the Tokyo merchant finishes printing the receipt. This is not a hypothetical scenario. Visa's network is engineered to handle more than 65,000 transaction messages per second, and fraudsters are getting faster every year. Traditional batch jobs that run overnight are worthless here. You need Complex Event Processing, and Apache Flink is the best engine to build it on.
In this guide, we are going to build real-time CEP pipelines from scratch. Not toy examples — complete, compilable Java code that you can adapt for production fraud detection, IoT monitoring, and financial market analysis. By the end, you will understand Flink’s CEP library deeply enough to design your own pattern-matching pipelines for any domain.
What is Complex Event Processing (CEP)?
Complex Event Processing is a methodology for detecting meaningful patterns across streams of events in real time. The key word is patterns. Simple stream processing might filter or transform individual events — “give me all transactions over $1,000.” CEP goes further: it looks for sequences, combinations, and temporal relationships between multiple events.
Simple Events vs Complex Events
A simple event is a single, atomic occurrence: a temperature reading, a stock trade, a log entry. A complex event is a higher-level pattern derived from multiple simple events. For example:
- Simple event: “User #4821 made a $50 purchase at Starbucks.”
- Complex event: “User #4821 made three purchases totaling over $2,000 within five minutes from three different countries.” This complex event only exists because a CEP engine recognized the pattern across the simple events.
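To make the distinction concrete, here is a minimal, framework-free Java sketch (the `Purchase` record and `isSuspicious` method are illustrative names, not Flink API) that derives the complex event above from a time-sorted list of simple events:

```java
import java.util.List;

public class ComplexEventDemo {
    // A simple event: one atomic purchase. (Illustrative type, not Flink API.)
    public record Purchase(String userId, double amount, String country, long timestampMillis) {}

    /**
     * Derives the complex event "three or more purchases totaling over $2,000
     * within five minutes from three or more countries" from a time-sorted
     * list of simple events.
     */
    public static boolean isSuspicious(List<Purchase> window) {
        if (window.size() < 3) return false;
        long span = window.get(window.size() - 1).timestampMillis()
                  - window.get(0).timestampMillis();
        if (span > 5 * 60 * 1000) return false; // not within five minutes
        double total = window.stream().mapToDouble(Purchase::amount).sum();
        long countries = window.stream().map(Purchase::country).distinct().count();
        return total > 2000.0 && countries >= 3;
    }
}
```

A CEP engine does exactly this kind of derivation, except continuously, incrementally, and across millions of interleaved streams rather than over a pre-collected list.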
CEP vs Traditional Processing
Understanding where CEP fits relative to batch and stream processing is crucial:
| Feature | Batch Processing | Stream Processing | CEP |
|---|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds | Milliseconds to seconds |
| Data Model | Bounded datasets | Unbounded streams | Unbounded streams with pattern state |
| Pattern Detection | Post-hoc analysis | Per-event transformations | Multi-event temporal patterns |
| State Management | Minimal (reprocess from scratch) | Windowed aggregations | Pattern match buffers with NFA |
| Use Case Example | Monthly reports | Real-time dashboards | Fraud detection, anomaly sequences |
| Tools | Spark, Hadoop MapReduce | Kafka Streams, Flink DataStream | Flink CEP, Esper, Siddhi |
Real-World CEP Applications
CEP is not a niche technology. It powers some of the most critical systems in the world:
- Fraud Detection: Banks and payment processors use CEP to catch fraudulent transaction patterns in real time — velocity checks, geographic impossibility, unusual merchant categories.
- IoT Monitoring: Manufacturing plants and smart buildings use CEP to detect equipment failure sequences before catastrophic breakdowns occur.
- Algorithmic Trading: Hedge funds detect price-volume patterns across multiple securities within microsecond windows to trigger automated trades.
- Network Security: SIEM platforms use CEP to correlate firewall logs, authentication events, and data transfer patterns to detect multi-stage cyberattacks.
- Supply Chain: Real-time tracking of shipment events to detect delays, rerouting needs, or customs anomalies before they cascade.
Why Apache Flink for CEP?
There are several stream processing engines on the market, but Flink stands apart for CEP workloads. Here is why.
Flink’s Architecture for CEP
Flink was designed from the ground up as a streaming-first engine. Unlike Spark, which bolted streaming onto a batch framework, Flink treats streams as the fundamental data model. This matters enormously for CEP because:
- DataStream API: Flink’s core API operates on unbounded streams, giving you fine-grained control over event processing, keying, and windowing.
- Event Time Processing: Flink natively supports event time semantics with watermarks, which is essential for CEP. When you are matching patterns across events, you need to reason about when events actually happened, not when they arrived at your system.
- Watermarks: Flink’s watermark mechanism tracks the progress of event time through the stream, enabling correct handling of out-of-order events — a constant reality in distributed systems.
- Flink CEP Library (flink-cep): Flink ships a dedicated CEP library that implements a Non-deterministic Finite Automaton (NFA) for pattern matching. You define patterns declaratively, and the engine handles the complex state management internally.
- Exactly-Once Semantics: Flink's checkpointing mechanism guarantees exactly-once state consistency, so pattern matches are neither lost nor double-counted after a failure. (End-to-end exactly-once delivery of alerts additionally requires a transactional sink.)
- Low Latency: Flink processes events within milliseconds, not micro-batches. For CEP, where you need to match patterns as fast as possible, this is non-negotiable.
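The bounded-out-of-orderness watermark strategy used later in this guide can be sketched in a few lines. Flink's built-in implementation tracks the maximum event timestamp seen so far and emits a watermark that lags it by the configured bound (minus one millisecond); the framework-free class below mirrors that logic as an illustration:

```java
public class BoundedOutOfOrdernessDemo {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrdernessMillis;

    public BoundedOutOfOrdernessDemo(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Track the highest event timestamp seen so far. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark lags the max timestamp by the allowed out-of-orderness. */
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    /** An event is "late" if its timestamp is at or below the current watermark. */
    public boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}
```

The key intuition: the watermark is Flink's promise that "no event older than this should still arrive," which is what lets the NFA safely close out partially matched patterns.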
Flink CEP vs the Competition
| Feature | Flink CEP | Kafka Streams | Esper | Spark Structured Streaming | Kinesis Analytics |
|---|---|---|---|---|---|
| Pattern Matching | Built-in NFA-based | Manual (no CEP library) | EPL query language | No native CEP | SQL-based only |
| Latency | True streaming (ms) | True streaming (ms) | In-memory (ms) | Micro-batch (100ms+) | Near real-time |
| Scalability | Distributed cluster | Embedded scaling | Single JVM | Distributed cluster | AWS managed |
| Exactly-Once | Yes | Yes | No | Yes | Yes |
| Fault Tolerance | Checkpointing + savepoints | Changelog topics | Limited | Checkpointing | Managed snapshots |
| Event Time Support | Native watermarks | Timestamp extractors | Limited | Native watermarks | Limited |
| Best For | Complex temporal patterns at scale | Simple event-driven microservices | Prototyping, embedded CEP | Batch + streaming hybrid | AWS-native SQL analytics |
Setting Up Your Flink CEP Project
Prerequisites
Before we write any code, make sure you have:
- Java 11 or 17 (Flink 1.18+ supports both; Java 17 is recommended for new projects)
- Maven 3.8+ or Gradle 7+
- An IDE — IntelliJ IDEA with the Flink plugin is ideal
- Docker (optional, for running Kafka and Flink locally)
Project Structure
Here is the layout we will use throughout this guide:
flink-cep-pipeline/
├── pom.xml
├── src/main/java/com/example/cep/
│ ├── FlinkCEPApplication.java
│ ├── events/
│ │ ├── Transaction.java
│ │ ├── SensorReading.java
│ │ └── StockTick.java
│ ├── patterns/
│ │ ├── FraudPatterns.java
│ │ ├── IoTPatterns.java
│ │ └── StockPatterns.java
│ ├── processors/
│ │ ├── FraudAlertProcessor.java
│ │ ├── AnomalyAlertProcessor.java
│ │ └── TradingSignalProcessor.java
│ └── sources/
│ └── KafkaSourceBuilder.java
└── src/main/resources/
└── log4j2.properties
Maven pom.xml
This is the complete Maven configuration with all the Flink CEP dependencies you need:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-cep-pipeline</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<flink.version>1.18.1</flink.version>
<java.version>17</java.version>
<kafka.version>3.6.1</kafka.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink CEP Library -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.1.0-1.18</version>
</dependency>
<!-- Flink JSON Format -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Clients (for local execution) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Jackson for JSON serialization -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.1</version>
</dependency>
<!-- SLF4J + Log4j2 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.22.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.22.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.22.1</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<transformers>
<transformer implementation=
"org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.cep.FlinkCEPApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Gradle Alternative
If you prefer Gradle, here is the equivalent build.gradle.kts:
plugins {
java
id("com.github.johnrengelman.shadow") version "8.1.1"
}
java {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
val flinkVersion = "1.18.1"
dependencies {
compileOnly("org.apache.flink:flink-streaming-java:$flinkVersion")
compileOnly("org.apache.flink:flink-clients:$flinkVersion")
implementation("org.apache.flink:flink-cep:$flinkVersion")
implementation("org.apache.flink:flink-connector-kafka:3.1.0-1.18")
implementation("org.apache.flink:flink-json:$flinkVersion")
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.22.1")
runtimeOnly("org.apache.logging.log4j:log4j-core:2.22.1")
}
The flink-streaming-java and flink-clients dependencies are marked as provided (Maven) or compileOnly (Gradle) because the Flink cluster already ships them; bundling them into your job JAR would only bloat it and invite version conflicts. When running locally in your IDE, add the provided-scope dependencies to your run configuration's classpath (IntelliJ has an option to include 'provided'-scope dependencies when running).
Understanding Flink CEP Pattern API
The Flink CEP library gives you a declarative API to define event patterns. Under the hood, it compiles your pattern definition into a Non-deterministic Finite Automaton (NFA) that efficiently matches patterns against the incoming event stream. Let us walk through every major concept.
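To build intuition for what that compilation produces, here is a toy, deliberately simplified state machine for the pattern "three consecutive login_failed events." A real flink-cep NFA is non-deterministic, tracks many partial matches at once, and keeps its buffers in managed state; this sketch only conveys the flavor:

```java
public class MiniNfaDemo {
    /**
     * A toy deterministic version of what flink-cep builds internally: the
     * pattern "three login_failed events under strict contiguity" compiled
     * into a state machine. The state is simply the number of consecutive
     * matching events seen so far.
     */
    public static boolean matchesThreeFailures(java.util.List<String> events) {
        int state = 0; // 0 -> 1 -> 2 -> 3 (final)
        for (String event : events) {
            if (event.equals("login_failed")) {
                state++;
                if (state == 3) return true; // reached the final state
            } else {
                state = 0; // strict contiguity: any other event resets the match
            }
        }
        return false;
    }
}
```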
Pattern Basics
Every pattern starts with Pattern.begin() and chains additional states:
// Strict contiguity: events must be directly adjacent
Pattern<Event, ?> strict = Pattern.<Event>begin("start")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("login_failed");
}
})
.next("second") // MUST be the very next event
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("login_failed");
}
})
.next("third")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("login_failed");
}
});
// Relaxed contiguity: allows non-matching events in between
Pattern<Event, ?> relaxed = Pattern.<Event>begin("start")
.where(/* ... */)
.followedBy("end") // matching events can have other events between them
.where(/* ... */);
// Non-deterministic relaxed contiguity:
// matches all possible combinations
Pattern<Event, ?> nonDeterministic = Pattern.<Event>begin("start")
.where(/* ... */)
.followedByAny("end") // considers ALL matching events, not just first
.where(/* ... */);
Contiguity: Strict, Relaxed, Non-Deterministic
This is one of the most important concepts in Flink CEP. Suppose you have the event stream A, C, B1, B2 and your pattern is "A followed by B":
- next() (strict): no match. C appears between A and B1, breaking strict contiguity.
- followedBy() (relaxed): matches {A, B1}. C is skipped, and only the first matching B is taken.
- followedByAny() (non-deterministic relaxed): matches both {A, B1} and {A, B2}, considering every matching event.
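The three contiguity modes can be simulated in plain Java. The sketch below (illustrative only, not Flink API) matches a two-step pattern over the stream A, C, B1, B2 under each mode:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ContiguityDemo {
    /**
     * Finds matches for the two-step pattern "first, then second" over a
     * stream, under the three Flink CEP contiguity modes ("next",
     * "followedBy", "followedByAny"). Flink's real NFA does this
     * incrementally and with managed state; this is only a demonstration.
     */
    public static List<List<String>> match(List<String> events,
                                           Predicate<String> first,
                                           Predicate<String> second,
                                           String mode) {
        List<List<String>> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!first.test(events.get(i))) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (second.test(events.get(j))) {
                    matches.add(List.of(events.get(i), events.get(j)));
                    if (!mode.equals("followedByAny")) break; // take only the first B
                } else if (mode.equals("next")) {
                    break; // strict: a non-matching event in between kills the match
                }
            }
        }
        return matches;
    }
}
```

Running this on the stream from the example reproduces the three outcomes described above: strict finds nothing, relaxed finds {A, B1}, and non-deterministic relaxed finds both {A, B1} and {A, B2}.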
Quantifiers
// Exactly N times
Pattern<Event, ?> exactly3 = Pattern.<Event>begin("failures")
.where(condition)
.times(3); // exactly 3 matching events
// N or more times
Pattern<Event, ?> atLeast3 = Pattern.<Event>begin("failures")
.where(condition)
.timesOrMore(3); // 3 or more matching events
// Range
Pattern<Event, ?> range = Pattern.<Event>begin("failures")
.where(condition)
.times(2, 5); // between 2 and 5 matching events
// One or more (greedy)
Pattern<Event, ?> oneOrMore = Pattern.<Event>begin("failures")
.where(condition)
.oneOrMore()
.greedy(); // match as many as possible
// Optional
Pattern<Event, ?> withOptional = Pattern.<Event>begin("start")
.where(startCondition)
.next("middle")
.where(middleCondition)
.optional() // this state may or may not match
.next("end")
.where(endCondition);
Conditions
// Simple condition — checks current event only
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getAmount() > 1000.0;
}
})
// Iterative condition — can reference previously matched events
.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event event, Context<Event> ctx) {
// Compare with previously matched event
for (Event prev : ctx.getEventsForPattern("start")) {
if (!event.getLocation().equals(prev.getLocation())) {
return true; // different location than start event
}
}
return false;
}
})
// OR condition
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("withdrawal");
}
})
.or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("transfer");
}
})
// Until condition (stop condition for looping patterns)
.oneOrMore()
.until(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("logout");
}
})
Time Constraints
// The entire pattern must complete within 5 minutes
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("first")
.where(/* ... */)
.followedBy("second")
.where(/* ... */)
.followedBy("third")
.where(/* ... */)
.within(Time.minutes(5));
The within() constraint applies to the entire pattern, measured from the first matching event. If the first event matches at T=0 and you set within(Time.minutes(5)), the entire pattern must complete before T=5min. Partially matched patterns that time out are discarded (or can be captured via timeout handling, which we will cover later).
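In miniature, the window check looks like this (a sketch of the semantics only; the exact boundary handling inside Flink's NFA is an implementation detail, so treat the strict inequality here as illustrative):

```java
public class WithinDemo {
    /**
     * The within() semantics in miniature: once the first event of a partial
     * match arrives at firstEventTs, every later event of that match must
     * arrive before firstEventTs + windowMillis, or the partial match is
     * discarded (timed out).
     */
    public static boolean stillWithinWindow(long firstEventTs, long candidateTs,
                                            long windowMillis) {
        return candidateTs - firstEventTs < windowMillis;
    }
}
```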
Hands-On: Credit Card Fraud Detection Pipeline
Let us build our first complete CEP pipeline — a credit card fraud detection system. This is the classic CEP use case, and we will implement three different fraud patterns.
The Transaction Event Class
package com.example.cep.events;
public class Transaction implements java.io.Serializable {
private String transactionId;
private String userId;
private double amount;
private long timestamp;
private String location;
private String merchantCategory;
private String cardNumber;
// Default constructor for serialization
public Transaction() {}
public Transaction(String transactionId, String userId, double amount,
long timestamp, String location, String merchantCategory,
String cardNumber) {
this.transactionId = transactionId;
this.userId = userId;
this.amount = amount;
this.timestamp = timestamp;
this.location = location;
this.merchantCategory = merchantCategory;
this.cardNumber = cardNumber;
}
// Getters and setters
public String getTransactionId() { return transactionId; }
public void setTransactionId(String transactionId) { this.transactionId = transactionId; }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public double getAmount() { return amount; }
public void setAmount(double amount) { this.amount = amount; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public String getLocation() { return location; }
public void setLocation(String location) { this.location = location; }
public String getMerchantCategory() { return merchantCategory; }
public void setMerchantCategory(String mc) { this.merchantCategory = mc; }
public String getCardNumber() { return cardNumber; }
public void setCardNumber(String cardNumber) { this.cardNumber = cardNumber; }
@Override
public String toString() {
return String.format("Transaction{id=%s, user=%s, amount=%.2f, loc=%s, time=%d}",
transactionId, userId, amount, location, timestamp);
}
}
The Fraud Alert Class
package com.example.cep.events;
import java.util.List;
public class FraudAlert implements java.io.Serializable {
private String alertId;
private String userId;
private String patternType;
private String description;
private List<Transaction> matchedTransactions;
private long detectedAt;
public FraudAlert(String alertId, String userId, String patternType,
String description, List<Transaction> matchedTransactions) {
this.alertId = alertId;
this.userId = userId;
this.patternType = patternType;
this.description = description;
this.matchedTransactions = matchedTransactions;
this.detectedAt = System.currentTimeMillis();
}
// Getters
public String getAlertId() { return alertId; }
public String getUserId() { return userId; }
public String getPatternType() { return patternType; }
public String getDescription() { return description; }
public List<Transaction> getMatchedTransactions() { return matchedTransactions; }
public long getDetectedAt() { return detectedAt; }
@Override
public String toString() {
return String.format("FRAUD ALERT [%s] User: %s | Pattern: %s | %s | Transactions: %d",
alertId, userId, patternType, description, matchedTransactions.size());
}
}
Defining Fraud Patterns
Now the interesting part. We will define three fraud detection patterns:
package com.example.cep.patterns;
import com.example.cep.events.Transaction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
public class FraudPatterns {
/**
* Pattern 1: Geographic Impossibility
* Three transactions over $500 within 5 minutes from different locations.
* If a user is spending in New York, then London, then Tokyo within 5 minutes,
* something is very wrong.
*/
public static Pattern<Transaction, ?> geographicImpossibility() {
return Pattern.<Transaction>begin("first")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() > 500.0;
}
})
.followedBy("second")
.where(new IterativeCondition<Transaction>() {
@Override
public boolean filter(Transaction tx, Context<Transaction> ctx) {
if (tx.getAmount() <= 500.0) return false;
for (Transaction first : ctx.getEventsForPattern("first")) {
if (!tx.getLocation().equals(first.getLocation())) {
return true;
}
}
return false;
}
})
.followedBy("third")
.where(new IterativeCondition<Transaction>() {
@Override
public boolean filter(Transaction tx, Context<Transaction> ctx) {
if (tx.getAmount() <= 500.0) return false;
for (Transaction first : ctx.getEventsForPattern("first")) {
for (Transaction second : ctx.getEventsForPattern("second")) {
if (!tx.getLocation().equals(first.getLocation())
&& !tx.getLocation().equals(second.getLocation())) {
return true;
}
}
}
return false;
}
})
.within(Time.minutes(5));
}
/**
* Pattern 2: Card Testing Attack
* A small "test" transaction ($0.01–$5.00) followed by a large transaction
* ($1000+) within 1 minute. Fraudsters often test stolen cards with tiny
* purchases before going big.
*/
public static Pattern<Transaction, ?> cardTestingAttack() {
return Pattern.<Transaction>begin("test_charge")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() >= 0.01 && tx.getAmount() <= 5.0;
}
})
.followedBy("big_charge")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() >= 1000.0;
}
})
.within(Time.minutes(1));
}
/**
* Pattern 3: Transaction Velocity
 * Five or more transactions within 2 minutes. Even legitimate users
* rarely make this many purchases in such a short time.
*/
public static Pattern<Transaction, ?> highVelocity() {
return Pattern.<Transaction>begin("transactions")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() > 0;
}
})
.timesOrMore(5)
.within(Time.minutes(2));
}
}
Processing Matched Patterns
package com.example.cep.processors;
import com.example.cep.events.FraudAlert;
import com.example.cep.events.Transaction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;
import java.util.*;
public class FraudAlertProcessor
extends PatternProcessFunction<Transaction, FraudAlert> {
private final String patternType;
public FraudAlertProcessor(String patternType) {
this.patternType = patternType;
}
@Override
public void processMatch(Map<String, List<Transaction>> match,
Context ctx,
Collector<FraudAlert> out) {
// Collect all matched transactions from all pattern states
List<Transaction> allTransactions = new ArrayList<>();
match.values().forEach(allTransactions::addAll);
// Extract user ID from first transaction
String userId = allTransactions.get(0).getUserId();
// Build a description
String description = buildDescription(match);
// Generate alert
String alertId = UUID.randomUUID().toString();
FraudAlert alert = new FraudAlert(
alertId, userId, patternType, description, allTransactions
);
out.collect(alert);
}
private String buildDescription(Map<String, List<Transaction>> match) {
StringBuilder sb = new StringBuilder();
sb.append("Matched pattern '").append(patternType).append("': ");
double total = 0;
Set<String> locations = new HashSet<>();
int count = 0;
for (List<Transaction> txList : match.values()) {
for (Transaction tx : txList) {
total += tx.getAmount();
locations.add(tx.getLocation());
count++;
}
}
sb.append(count).append(" transactions, ");
sb.append(String.format("total $%.2f, ", total));
sb.append("locations: ").append(locations);
return sb.toString();
}
}
The Complete Fraud Detection Pipeline
Here is the entire pipeline wired together — from Kafka source to fraud alert output:
package com.example.cep;
import com.example.cep.events.FraudAlert;
import com.example.cep.events.Transaction;
import com.example.cep.patterns.FraudPatterns;
import com.example.cep.processors.FraudAlertProcessor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
public class FraudDetectionPipeline {
public static void main(String[] args) throws Exception {
// 1. Set up the streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// Enable checkpointing for exactly-once semantics
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
// 2. Create Kafka source for transactions
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("transactions")
.setGroupId("fraud-detection-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 3. Read from Kafka with event time watermarks
ObjectMapper mapper = new ObjectMapper();
DataStream<Transaction> transactions = env
.fromSource(kafkaSource, WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> {
try {
return mapper.readValue(event, Transaction.class)
.getTimestamp();
} catch (Exception e) {
return timestamp;
}
}), "Kafka Transactions")
.map(json -> mapper.readValue(json, Transaction.class))
.keyBy(Transaction::getUserId); // Key by user for per-user patterns
// 4. Apply Pattern 1: Geographic Impossibility
Pattern<Transaction, ?> geoPattern = FraudPatterns.geographicImpossibility();
PatternStream<Transaction> geoPatternStream = CEP.pattern(
transactions, geoPattern);
DataStream<FraudAlert> geoAlerts = geoPatternStream.process(
new FraudAlertProcessor("GEOGRAPHIC_IMPOSSIBILITY"));
// 5. Apply Pattern 2: Card Testing Attack
Pattern<Transaction, ?> testPattern = FraudPatterns.cardTestingAttack();
PatternStream<Transaction> testPatternStream = CEP.pattern(
transactions, testPattern);
DataStream<FraudAlert> testAlerts = testPatternStream.process(
new FraudAlertProcessor("CARD_TESTING_ATTACK"));
// 6. Apply Pattern 3: High Velocity
Pattern<Transaction, ?> velocityPattern = FraudPatterns.highVelocity();
PatternStream<Transaction> velocityPatternStream = CEP.pattern(
transactions, velocityPattern);
DataStream<FraudAlert> velocityAlerts = velocityPatternStream.process(
new FraudAlertProcessor("HIGH_VELOCITY"));
// 7. Union all alerts and sink to Kafka
DataStream<FraudAlert> allAlerts = geoAlerts
.union(testAlerts)
.union(velocityAlerts);
// Print to console (for development)
allAlerts.print("FRAUD ALERT");
// Sink to Kafka alerts topic
KafkaSink<String> alertSink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("fraud-alerts")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
allAlerts
.map(alert -> mapper.writeValueAsString(alert))
.sinkTo(alertSink);
// 8. Execute the pipeline
env.execute("Credit Card Fraud Detection CEP Pipeline");
}
}
The CEP.pattern() call creates a separate NFA instance per key (per user), so patterns are evaluated independently and do not interfere with each other. The keyBy(Transaction::getUserId) call is critical: it ensures that patterns only match events belonging to the same user.
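The effect of keying can be simulated without Flink. In this illustrative sketch, transactions are grouped by user first, and only then is the velocity check (five or more transactions inside a two-minute span, mirroring Pattern 3 above) applied per group, so one user's activity can never complete another user's partial match:

```java
import java.util.*;

public class KeyedVelocityDemo {
    // Illustrative event type, not the Transaction class from the pipeline.
    public record Tx(String userId, long timestampMillis) {}

    /**
     * Groups transactions by user, then applies the velocity check (five or
     * more transactions inside a two-minute span) independently per user,
     * just as keyBy() partitions the NFA state per key.
     */
    public static Set<String> usersWithHighVelocity(List<Tx> txns) {
        Map<String, List<Long>> byUser = new HashMap<>();
        for (Tx tx : txns) {
            byUser.computeIfAbsent(tx.userId(), k -> new ArrayList<>())
                  .add(tx.timestampMillis());
        }
        Set<String> flagged = new TreeSet<>();
        long windowMillis = 2 * 60 * 1000L;
        for (Map.Entry<String, List<Long>> entry : byUser.entrySet()) {
            List<Long> ts = new ArrayList<>(entry.getValue());
            Collections.sort(ts);
            // Slide a window of five consecutive timestamps per user.
            for (int i = 0; i + 4 < ts.size(); i++) {
                if (ts.get(i + 4) - ts.get(i) <= windowMillis) {
                    flagged.add(entry.getKey());
                    break;
                }
            }
        }
        return flagged;
    }
}
```

Without the grouping step, five transactions from five different users in the same two minutes would falsely trigger the pattern, which is exactly the bug an un-keyed CEP stream would exhibit.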
Hands-On: IoT Sensor Anomaly Detection
Our second pipeline detects anomalies in IoT sensor data. The pattern we want to catch: three consecutive, steadily rising temperature readings above a threshold, followed by a pressure drop, all within one minute. This sequence often indicates an impending equipment failure.
Sensor Event Class
package com.example.cep.events;
public class SensorReading implements java.io.Serializable {
private String sensorId;
private double temperature;
private double pressure;
private long timestamp;
private String location;
public SensorReading() {}
public SensorReading(String sensorId, double temperature, double pressure,
long timestamp, String location) {
this.sensorId = sensorId;
this.temperature = temperature;
this.pressure = pressure;
this.timestamp = timestamp;
this.location = location;
}
public String getSensorId() { return sensorId; }
public void setSensorId(String sensorId) { this.sensorId = sensorId; }
public double getTemperature() { return temperature; }
public void setTemperature(double temperature) { this.temperature = temperature; }
public double getPressure() { return pressure; }
public void setPressure(double pressure) { this.pressure = pressure; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public String getLocation() { return location; }
public void setLocation(String location) { this.location = location; }
@Override
public String toString() {
return String.format("Sensor{id=%s, temp=%.1f, pressure=%.1f, time=%d}",
sensorId, temperature, pressure, timestamp);
}
}
Complete IoT Anomaly Pipeline
package com.example.cep;
import com.example.cep.events.SensorReading;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.*;
public class IoTAnomalyDetectionPipeline {
private static final double TEMP_THRESHOLD = 85.0; // degrees Celsius
private static final double PRESSURE_DROP_THRESHOLD = 10.0; // PSI
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.enableCheckpointing(30_000);
// Simulated sensor data source (replace with Kafka in production)
DataStream<SensorReading> sensorStream = env
.addSource(new SimulatedSensorSource()) // your custom source
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((reading, ts) -> reading.getTimestamp())
)
.keyBy(SensorReading::getSensorId);
// Pattern: 3 consecutive high-temp readings, then a pressure drop
Pattern<SensorReading, ?> anomalyPattern = Pattern
.<SensorReading>begin("rising_temp_1")
.where(new SimpleCondition<SensorReading>() {
@Override
public boolean filter(SensorReading reading) {
return reading.getTemperature() > TEMP_THRESHOLD;
}
})
.next("rising_temp_2")
.where(new IterativeCondition<SensorReading>() {
@Override
public boolean filter(SensorReading reading,
Context<SensorReading> ctx) {
if (reading.getTemperature() <= TEMP_THRESHOLD) return false;
for (SensorReading prev : ctx.getEventsForPattern("rising_temp_1")) {
return reading.getTemperature() > prev.getTemperature();
}
return false;
}
})
.next("rising_temp_3")
.where(new IterativeCondition<SensorReading>() {
@Override
public boolean filter(SensorReading reading,
Context<SensorReading> ctx) {
if (reading.getTemperature() <= TEMP_THRESHOLD) return false;
for (SensorReading prev : ctx.getEventsForPattern("rising_temp_2")) {
return reading.getTemperature() > prev.getTemperature();
}
return false;
}
})
.followedBy("pressure_drop")
.where(new IterativeCondition<SensorReading>() {
@Override
public boolean filter(SensorReading reading,
Context<SensorReading> ctx) {
for (SensorReading prev : ctx.getEventsForPattern("rising_temp_1")) {
double pressureDiff = prev.getPressure() - reading.getPressure();
return pressureDiff > PRESSURE_DROP_THRESHOLD;
}
return false;
}
})
.within(Time.minutes(1));
// Apply pattern and process matches
PatternStream<SensorReading> patternStream =
CEP.pattern(sensorStream, anomalyPattern);
DataStream<String> anomalyAlerts = patternStream.process(
new PatternProcessFunction<SensorReading, String>() {
@Override
public void processMatch(Map<String, List<SensorReading>> match,
Context ctx,
Collector<String> out) {
SensorReading first = match.get("rising_temp_1").get(0);
SensorReading second = match.get("rising_temp_2").get(0);
SensorReading third = match.get("rising_temp_3").get(0);
SensorReading drop = match.get("pressure_drop").get(0);
String alert = String.format(
"ANOMALY DETECTED | Sensor: %s | Location: %s | " +
"Temps: %.1f -> %.1f -> %.1f (threshold: %.1f) | " +
"Pressure drop: %.1f -> %.1f (delta: %.1f)",
first.getSensorId(), first.getLocation(),
first.getTemperature(), second.getTemperature(),
third.getTemperature(), TEMP_THRESHOLD,
first.getPressure(), drop.getPressure(),
first.getPressure() - drop.getPressure()
);
out.collect(alert);
}
}
);
anomalyAlerts.print("IOT ALERT");
env.execute("IoT Sensor Anomaly Detection Pipeline");
}
}
Note the contiguity choices here: we use `next()` (strict contiguity) for the three rising temperature readings — they must be consecutive. But we use `followedBy()` (relaxed contiguity) for the pressure drop, because other normal readings might occur between the temperature spike and the pressure change.
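To build intuition for the difference, here is a tiny plain-Java sketch (no Flink involved; class and method names are ours) that simulates the two contiguity modes over a list of event labels: strict contiguity rejects a match when any intervening event appears, while relaxed contiguity skips over it.

```java
import java.util.List;

public class ContiguityDemo {
    // Strict (like next()): event B must immediately follow event A
    public static boolean strictMatch(List<String> events, String a, String b) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(a) && events.get(i + 1).equals(b)) return true;
        }
        return false;
    }

    // Relaxed (like followedBy()): B may appear any time after A
    public static boolean relaxedMatch(List<String> events, String a, String b) {
        int ai = events.indexOf(a);
        return ai >= 0 && events.subList(ai + 1, events.size()).contains(b);
    }

    public static void main(String[] args) {
        // A normal reading sits between the two events of interest:
        List<String> stream = List.of("HIGH_TEMP", "NORMAL", "PRESSURE_DROP");
        System.out.println(strictMatch(stream, "HIGH_TEMP", "PRESSURE_DROP"));  // false
        System.out.println(relaxedMatch(stream, "HIGH_TEMP", "PRESSURE_DROP")); // true
    }
}
```

This is exactly why the pattern above would never fire with `next("pressure_drop")` on a real sensor feed: normal readings almost always interleave.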
Hands-On: Stock Market Pattern Detection
Our third pipeline detects potential trading signals: a price drop greater than 5% followed by a high volume spike within 10 seconds. This pattern can indicate panic selling followed by institutional buying — a potential buy signal.
StockTick Event Class
package com.example.cep.events;
public class StockTick implements java.io.Serializable {
private String symbol;
private double price;
private long volume;
private long timestamp;
private double previousClose;
public StockTick() {}
public StockTick(String symbol, double price, long volume,
long timestamp, double previousClose) {
this.symbol = symbol;
this.price = price;
this.volume = volume;
this.timestamp = timestamp;
this.previousClose = previousClose;
}
public String getSymbol() { return symbol; }
public void setSymbol(String symbol) { this.symbol = symbol; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
public long getVolume() { return volume; }
public void setVolume(long volume) { this.volume = volume; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public double getPreviousClose() { return previousClose; }
public void setPreviousClose(double pc) { this.previousClose = pc; }
public double getPriceChangePercent() {
if (previousClose == 0) return 0;
return ((price - previousClose) / previousClose) * 100.0;
}
@Override
public String toString() {
return String.format("StockTick{sym=%s, price=%.2f, vol=%d, change=%.2f%%}",
symbol, price, volume, getPriceChangePercent());
}
}
Complete Stock Market Detection Pipeline
package com.example.cep;
import com.example.cep.events.StockTick;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.*;
public class StockPatternDetectionPipeline {
private static final double PRICE_DROP_THRESHOLD = -5.0; // percent
private static final double VOLUME_SPIKE_MULTIPLIER = 3.0; // 3x average
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(10_000);
// Assume a Kafka source producing StockTick JSON
// (using simulated source for this example)
DataStream<StockTick> tickStream = env
.addSource(new SimulatedStockSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockTick>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((tick, ts) -> tick.getTimestamp())
)
.keyBy(StockTick::getSymbol);
// Pattern: Price drop > 5% followed by volume spike within 10 seconds
Pattern<StockTick, ?> buySignalPattern = Pattern
.<StockTick>begin("price_drop")
.where(new SimpleCondition<StockTick>() {
@Override
public boolean filter(StockTick tick) {
return tick.getPriceChangePercent() < PRICE_DROP_THRESHOLD;
}
})
.followedBy("volume_spike")
.where(new IterativeCondition<StockTick>() {
@Override
public boolean filter(StockTick tick, Context<StockTick> ctx) {
for (StockTick drop : ctx.getEventsForPattern("price_drop")) {
// Volume must be at least 3x the volume during the drop
if (tick.getVolume() > drop.getVolume() * VOLUME_SPIKE_MULTIPLIER) {
return true;
}
}
return false;
}
})
.within(Time.seconds(10));
// Apply pattern
PatternStream<StockTick> patternStream =
CEP.pattern(tickStream, buySignalPattern);
DataStream<String> signals = patternStream.process(
new PatternProcessFunction<StockTick, String>() {
@Override
public void processMatch(Map<String, List<StockTick>> match,
Context ctx,
Collector<String> out) {
StockTick drop = match.get("price_drop").get(0);
StockTick spike = match.get("volume_spike").get(0);
String signal = String.format(
"BUY SIGNAL | %s | Drop: %.2f%% (price $%.2f) | " +
"Volume spike: %d -> %d (%.1fx) | " +
"Current price: $%.2f",
drop.getSymbol(),
drop.getPriceChangePercent(),
drop.getPrice(),
drop.getVolume(),
spike.getVolume(),
(double) spike.getVolume() / drop.getVolume(),
spike.getPrice()
);
out.collect(signal);
}
}
);
signals.print("TRADING SIGNAL");
env.execute("Stock Market Pattern Detection Pipeline");
}
}
Advanced CEP Techniques
Once you have the basics working, these advanced techniques will take your CEP pipelines to production quality.
Dynamic Patterns from External Configuration
Hardcoding patterns is fine for getting started, but production systems need to update rules without redeploying. One approach is loading pattern parameters from an external source:
// Load thresholds from a configuration source
public class DynamicFraudPatterns {
public static Pattern<Transaction, ?> fromConfig(FraudRuleConfig config) {
return Pattern.<Transaction>begin("test_charge")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() >= config.getMinTestAmount()
&& tx.getAmount() <= config.getMaxTestAmount();
}
})
.followedBy("big_charge")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() >= config.getLargeTransactionThreshold();
}
})
.within(Time.minutes(config.getTimeWindowMinutes()));
}
}
// Configuration POJO loaded from database, file, or broadcast stream
public class FraudRuleConfig implements java.io.Serializable {
private double minTestAmount = 0.01;
private double maxTestAmount = 5.0;
private double largeTransactionThreshold = 1000.0;
private int timeWindowMinutes = 1;
// getters and setters...
}
Side Outputs for Timeout Handling
When a partial pattern match times out (the within() window expires before the pattern completes), you can capture these timed-out partial matches using TimedOutPartialMatchHandler:
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.OutputTag;
public class FraudAlertWithTimeout
extends PatternProcessFunction<Transaction, FraudAlert>
implements TimedOutPartialMatchHandler<Transaction> {
// Side output for timed-out partial matches
public static final OutputTag<String> TIMEOUT_TAG =
new OutputTag<String>("timed-out-patterns") {};
@Override
public void processMatch(Map<String, List<Transaction>> match,
Context ctx,
Collector<FraudAlert> out) {
// Process fully matched pattern (same as before)
// ...
}
@Override
public void processTimedOutMatch(Map<String, List<Transaction>> match,
Context ctx) {
// A partial match timed out — log it for analysis
StringBuilder sb = new StringBuilder("PARTIAL MATCH TIMEOUT: ");
for (Map.Entry<String, List<Transaction>> entry : match.entrySet()) {
sb.append(entry.getKey()).append("=")
.append(entry.getValue().size()).append(" events; ");
}
// Output to side output
ctx.output(TIMEOUT_TAG, sb.toString());
}
}
// In your pipeline, capture the side output:
SingleOutputStreamOperator<FraudAlert> alerts = patternStream
.process(new FraudAlertWithTimeout());
DataStream<String> timedOutPatterns = alerts
.getSideOutput(FraudAlertWithTimeout.TIMEOUT_TAG);
timedOutPatterns.print("TIMEOUT");
Scaling CEP Jobs
CEP pattern matching is stateful — the NFA maintains partial match buffers per key. Here are the scaling considerations:
- Key Partitioning: Always `keyBy()` your stream before applying CEP patterns. This ensures events for the same entity (user, sensor, stock symbol) go to the same parallel instance.
- Parallelism: Set parallelism based on your key cardinality. If you have 10,000 users, a parallelism of 8-16 is usually sufficient. Flink distributes keys across parallel instances using hash partitioning.
- State Size: Each active partial match consumes memory. If you have long time windows or high-cardinality patterns, monitor your state size carefully.
// Set different parallelism for different pipeline stages
DataStream<Transaction> transactions = env
.fromSource(kafkaSource, watermarkStrategy, "source")
.setParallelism(8) // match Kafka partitions
.map(json -> mapper.readValue(json, Transaction.class))
.setParallelism(8)
.keyBy(Transaction::getUserId);
// CEP pattern matching: parallelism for the matching step is set on the
// operator that consumes the PatternStream, not on the keyed stream itself
PatternStream<Transaction> patternStream = CEP.pattern(transactions, fraudPattern);
DataStream<FraudAlert> alerts = patternStream
.process(new FraudAlertWithTimeout())
.setParallelism(16); // more parallelism for CPU-heavy matching
State Management and Checkpointing
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
// Configure robust checkpointing
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(30_000);
checkpointConfig.setCheckpointTimeout(120_000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setTolerableCheckpointFailureNumber(3);
// Retain checkpoints on cancellation (for savepoint-like recovery)
checkpointConfig.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
Event Time vs Processing Time
This distinction is absolutely critical for CEP. Event time is when the event actually happened (embedded in the event data). Processing time is when your Flink operator processes the event. In a perfect world, these would be identical. In reality, events arrive late, out of order, and at variable rates.
Why Event Time Matters for CEP
Consider a fraud detection pattern: “three transactions within 5 minutes.” If transaction #2 arrives at your system 10 seconds late due to network congestion, processing time would see a gap that does not actually exist. Event time correctly identifies that the three transactions occurred within the 5-minute window, regardless of when they arrived.
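The point is easy to demonstrate with plain timestamps. In the sketch below (no Flink; names and numbers are illustrative), three transactions carry event times spanning just under five minutes, so an event-time 5-minute window matches even though the second transaction arrives long after the third:

```java
import java.util.List;

public class EventTimeDemo {
    // Each event carries both when it happened and when it arrived
    public record Tx(String id, long eventTimeMs, long arrivalTimeMs) {}

    // True if the earliest and latest *event* times fall within the window,
    // regardless of arrival order
    public static boolean withinEventTimeWindow(List<Tx> txs, long windowMs) {
        long min = txs.stream().mapToLong(Tx::eventTimeMs).min().orElseThrow();
        long max = txs.stream().mapToLong(Tx::eventTimeMs).max().orElseThrow();
        return max - min <= windowMs;
    }

    public static void main(String[] args) {
        long t0 = 0;
        List<Tx> txs = List.of(
            new Tx("a", t0,           t0 + 1_000),   // arrives promptly
            new Tx("b", t0 + 120_000, t0 + 400_000), // delayed in transit, arrives last
            new Tx("c", t0 + 290_000, t0 + 291_000));
        System.out.println(withinEventTimeWindow(txs, 300_000)); // true: 290s <= 300s
    }
}
```

A processing-time window keyed on arrival would see transaction "b" outside the window entirely and miss the pattern.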
Watermark Strategies
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
// Strategy 1: Bounded out-of-orderness (most common)
// Assumes events can arrive up to 5 seconds late
WatermarkStrategy<Transaction> strategy1 = WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());
// Strategy 2: Monotonous timestamps (events always in order)
// Only use if you can guarantee ordering
WatermarkStrategy<Transaction> strategy2 = WatermarkStrategy
.<Transaction>forMonotonousTimestamps()
.withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());
// Strategy 3: Custom watermark generator for complex scenarios
WatermarkStrategy<Transaction> strategy3 = WatermarkStrategy
.<Transaction>forGenerator(context -> new WatermarkGenerator<Transaction>() {
private long maxTimestamp = Long.MIN_VALUE;
private static final long MAX_DELAY = 10_000L; // 10 seconds
@Override
public void onEvent(Transaction tx, long eventTimestamp,
WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, tx.getTimestamp());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(
new org.apache.flink.api.common.eventtime.Watermark(
maxTimestamp - MAX_DELAY
)
);
}
})
.withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());
For most CEP use cases, `forBoundedOutOfOrderness()` with a 5-10 second bound is the right choice. Set it too low and you will miss late events. Set it too high and your pattern matching will be delayed by that amount, since Flink cannot process an event-time window until the watermark passes it.
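The bound itself is simple arithmetic: the watermark trails the maximum event time seen by the out-of-orderness bound, and any event whose timestamp is at or behind the current watermark counts as late. A plain-Java sketch of that bookkeeping (a simplified mirror of what the bounded strategy does internally, not Flink's actual class):

```java
public class BoundedWatermark {
    private final long boundMs;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedWatermark(long boundMs) { this.boundMs = boundMs; }

    // Track the highest event time observed so far
    public void onEvent(long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
    }

    // Watermark = highest event time seen, minus the allowed lateness
    public long currentWatermark() { return maxTimestamp - boundMs; }

    // An event already behind the watermark is late
    public boolean isLate(long eventTimeMs) { return eventTimeMs <= currentWatermark(); }

    public static void main(String[] args) {
        BoundedWatermark wm = new BoundedWatermark(5_000); // 5-second bound
        wm.onEvent(100_000);
        wm.onEvent(103_000);
        System.out.println(wm.currentWatermark()); // 98000
        System.out.println(wm.isLate(97_000));     // true
        System.out.println(wm.isLate(99_000));     // false
    }
}
```

This makes the tradeoff concrete: a 60-second bound means every pattern match is announced at least 60 seconds after the fact.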
Connecting to Real Data Sources
Kafka Source Connector
Most production CEP pipelines read from Apache Kafka. Here is a complete, production-ready Kafka source setup:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import com.fasterxml.jackson.databind.ObjectMapper;
// Custom deserializer for Transaction events
public class TransactionDeserializer
implements DeserializationSchema<Transaction> {
private transient ObjectMapper mapper;
@Override
public Transaction deserialize(byte[] message) {
if (mapper == null) mapper = new ObjectMapper();
try {
return mapper.readValue(message, Transaction.class);
} catch (Exception e) {
// Log and skip malformed events
System.err.println("Failed to deserialize: " + new String(message));
return null;
}
}
@Override
public boolean isEndOfStream(Transaction nextElement) {
return false;
}
@Override
public TypeInformation<Transaction> getProducedType() {
return TypeInformation.of(Transaction.class);
}
}
// Build the Kafka source
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
.setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
.setTopics("transactions")
.setGroupId("fraud-detection-v2")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TransactionDeserializer())
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"api-key\" password=\"api-secret\";")
.build();
Kafka Sink for Alerts
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
KafkaSink<String> alertSink = KafkaSink.<String>builder()
.setBootstrapServers("kafka-broker-1:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("fraud-alerts")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("fraud-alert-sink")
.build();
// Wire it up
allAlerts
.map(alert -> mapper.writeValueAsString(alert))
.sinkTo(alertSink);
JDBC Connector for Enrichment
You might want to enrich events with data from a database (for example, looking up a customer’s risk score before applying CEP patterns). Flink’s async I/O is ideal for this:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
// Async enrichment function (Rich variant so we can manage the pool in open())
public class CustomerEnrichment
extends RichAsyncFunction<Transaction, EnrichedTransaction> {
private transient DataSource dataSource;
@Override
public void open(Configuration parameters) {
// Initialize connection pool
dataSource = createConnectionPool();
}
@Override
public void asyncInvoke(Transaction tx,
ResultFuture<EnrichedTransaction> resultFuture) {
CompletableFuture.supplyAsync(() -> {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT risk_score, account_age FROM customers WHERE id = ?")) {
stmt.setString(1, tx.getUserId());
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
return new EnrichedTransaction(tx,
rs.getDouble("risk_score"),
rs.getInt("account_age"));
}
return new EnrichedTransaction(tx, 0.5, 0);
} catch (Exception e) {
return new EnrichedTransaction(tx, 0.5, 0);
}
}).thenAccept(result -> resultFuture.complete(
Collections.singleton(result)));
}
}
// Apply async enrichment before CEP
DataStream<EnrichedTransaction> enriched = AsyncDataStream
.unorderedWait(
transactionStream,
new CustomerEnrichment(),
30, TimeUnit.SECONDS, // timeout
100 // max concurrent requests
);
Flink also supports connectors for Apache Pulsar, Amazon Kinesis, and many other systems through its connector ecosystem. The setup is similar — define a source, assign watermarks, and feed the stream into your CEP patterns.
Deploying and Monitoring
Running Locally for Development
The simplest way to develop is running directly in your IDE. Flink will create a local mini-cluster:
// This works out of the box in your IDE
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Flink automatically creates a local mini-cluster
Docker Compose for Local Flink + Kafka
For integration testing, use this Docker Compose setup to run Flink and Kafka locally:
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.5.3
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for containers on the compose network
      # (e.g. the Flink TaskManagers), localhost:9092 for the host
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  flink-jobmanager:
    image: flink:1.18.1-java17
    ports:
      - "8081:8081" # Flink Web UI
    command: jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        state.backend: rocksdb
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
  flink-taskmanager:
    image: flink:1.18.1-java17
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 2 # Run 2 task managers
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 4
        taskmanager.memory.process.size: 2048m
Deploying to a Flink Cluster
Build your fat JAR and submit it to the cluster:
# Build the fat JAR
mvn clean package -DskipTests
# Submit to standalone cluster
./bin/flink run \
-c com.example.cep.FraudDetectionPipeline \
target/flink-cep-pipeline-1.0.0.jar
# Submit to YARN (per-job mode). Note: recent Flink versions allocate
# TaskManagers dynamically, so the old -yn flag no longer exists.
./bin/flink run -m yarn-cluster \
-ys 8 \
-yjm 2048m \
-ytm 4096m \
-c com.example.cep.FraudDetectionPipeline \
target/flink-cep-pipeline-1.0.0.jar
# -ys: slots per TaskManager, -yjm: JobManager memory, -ytm: TaskManager memory
# Submit to Kubernetes (using Flink Kubernetes Operator)
kubectl apply -f flink-cep-deployment.yaml
Monitoring Your Pipeline
The Flink Web UI (default port 8081) is your primary monitoring interface. Key metrics to watch:
- Checkpoint Duration: If checkpoints take longer than your interval, you will see cascading delays. Keep checkpoint duration under 50% of the checkpoint interval.
- Backpressure: When a downstream operator cannot keep up, backpressure propagates upstream. The Web UI shows this with color-coded task states — red means trouble.
- Throughput (records/second): Monitor input and output rates for each operator. A sudden drop in output with constant input suggests a processing bottleneck.
- State Size: CEP patterns maintain partial match buffers. Watch state size grow over time — unbounded growth indicates a pattern or key space issue.
Performance Optimization
Getting a CEP pipeline to work is one thing. Getting it to handle production volumes efficiently is another. Here are the key tuning levers.
Choosing the Right Parallelism
Parallelism controls how many parallel instances of each operator Flink runs. For CEP pipelines:
- Source parallelism: Match the number of Kafka partitions. If your topic has 16 partitions, set source parallelism to 16.
- CEP operator parallelism: This depends on your key cardinality and pattern complexity. Start with the same parallelism as your source, then increase if you see backpressure on the CEP operator.
- Sink parallelism: Usually lower than CEP parallelism since alert volume is much lower than input volume.
State Backend Selection
| State Backend | State Size | Speed | Best For |
|---|---|---|---|
| HashMapStateBackend (Heap) | Limited by JVM heap | Fastest | Small state, low latency requirements |
| EmbeddedRocksDBStateBackend | Limited by disk | Slower (disk I/O) | Large state, long time windows |
For CEP specifically: if your patterns have short time windows (seconds to minutes) and moderate key cardinality, the heap state backend is fine. For long time windows (hours) or millions of keys with active partial matches, RocksDB is safer.
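Switching backends is a one-line change on the environment; the rest of the job is untouched. A sketch of both options (the RocksDB variant assumes the `flink-statebackend-rocksdb` dependency is on the classpath):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Option 1: heap — fastest access, but all state must fit in TaskManager memory
env.setStateBackend(new HashMapStateBackend());

// Option 2: RocksDB — spills to local disk; passing true enables incremental
// checkpoints so each checkpoint only uploads changed SST files
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
```

Incremental checkpointing is the main reason RocksDB stays viable for CEP jobs with large partial-match buffers: full snapshots of multi-gigabyte state would otherwise blow past the checkpoint timeout.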
Recommended Settings by Use Case
| Setting | Fraud Detection | IoT Monitoring | Market Data |
|---|---|---|---|
| Parallelism | 8–32 | 4–16 | 16–64 |
| Checkpoint Interval | 60s | 30s | 10s |
| State Backend | RocksDB | Heap or RocksDB | Heap |
| Watermark Bound | 5s | 3s | 1s |
| TaskManager Memory | 4–8 GB | 2–4 GB | 8–16 GB |
| Serialization | Avro or Protobuf | Avro | Protobuf (smallest size) |
Serialization Considerations
Flink’s default Java serialization is slow and produces large state snapshots. For production CEP pipelines, register your event types with Flink’s type system or use efficient serialization:
// Register a custom Kryo serializer for your event type
// (ProtobufSerializer here would come from the chill-protobuf dependency)
env.getConfig().registerTypeWithKryoSerializer(
Transaction.class, ProtobufSerializer.class);
// Or use Flink's POJO serialization (automatic for well-formed POJOs)
// Ensure your classes:
// 1. Have a no-arg constructor
// 2. Have public getters/setters for all fields
// 3. Implement Serializable
// For Avro serialization, use Flink's Avro format
// Add dependency: flink-avro
// Then use AvroDeserializationSchema:
import org.apache.flink.formats.avro.AvroDeserializationSchema;
KafkaSource<Transaction> avroSource = KafkaSource.<Transaction>builder()
.setBootstrapServers("localhost:9092")
.setTopics("transactions-avro")
.setGroupId("fraud-detection")
.setValueOnlyDeserializer(
AvroDeserializationSchema.forSpecific(Transaction.class))
.build();
Common Pitfalls and Troubleshooting
Here are the issues that trip up most developers when building Flink CEP pipelines:
| Problem | Cause | Solution |
|---|---|---|
| Pattern never matches | Events arrive out of order; `within()` window too tight; using `next()` when `followedBy()` is needed | Check event ordering, increase time window, switch contiguity mode |
| Too many matches (false positives) | Pattern conditions too loose; `followedByAny()` generating combinatorial explosion | Add tighter conditions, switch to `followedBy()`, shorten time window |
| OutOfMemoryError | Large NFA state from long time windows, high key cardinality, or `followedByAny()` with `oneOrMore()` | Switch to RocksDB state backend, shorten time windows, add `until()` conditions |
| Checkpoint failures | State too large to snapshot within timeout; backpressure causing delays | Increase checkpoint timeout, enable incremental checkpointing with RocksDB, reduce state size |
| Watermark stalling (no progress) | One Kafka partition has no data — its watermark stays at `Long.MIN_VALUE`, blocking the global watermark | Use `withIdleness(Duration.ofMinutes(1))` on the watermark strategy |
| Duplicate alerts after restart | Reprocessing events without checkpointed state | Always restart from a savepoint/checkpoint, enable exactly-once on sinks |
| ClassNotFoundException at runtime | `flink-cep` not in the fat JAR; marked as `provided` by mistake | Ensure `flink-cep` is not marked `provided` — only `flink-streaming-java` and `flink-clients` should be |
Fixing Watermark Stalling
This is one of the most frustrating issues. If one Kafka partition stops producing events, its watermark stays at negative infinity, which blocks the global watermark for the entire job. The fix is simple:
WatermarkStrategy<Transaction> strategy = WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((tx, ts) -> tx.getTimestamp())
.withIdleness(Duration.ofMinutes(1)); // Mark source as idle after 1 min
Debugging Pattern Matches
When patterns are not matching as expected, add a pass-through map before your CEP operator to verify events are flowing and correctly keyed:
// Debug: print events as they enter the CEP operator, then feed the
// debugged stream into CEP.pattern(...) instead of the original one
KeyedStream<Transaction, String> debugged = transactions
.map(tx -> {
System.out.println("CEP INPUT: " + tx);
return tx;
})
.keyBy(Transaction::getUserId);
// Also: check that your conditions actually match
// by testing them in a unit test
@Test
public void testFraudCondition() {
Transaction tx = new Transaction("1", "user1", 600.0,
System.currentTimeMillis(), "NYC", "electronics", "1234");
assertTrue(tx.getAmount() > 500.0); // Verify condition logic
}
Conclusion
Complex Event Processing with Apache Flink gives you the ability to detect sophisticated patterns across millions of events per second with millisecond latency and exactly-once guarantees. We have covered a lot of ground in this guide — from the fundamentals of CEP and the Flink pattern API to three complete, production-style pipelines for fraud detection, IoT monitoring, and financial market analysis.
The key takeaways to remember:
- Choose the right contiguity: `next()` for strict sequences, `followedBy()` for relaxed matching, and `followedByAny()` sparingly (it is expensive).
- Always use event time with proper watermark strategies. Processing time will give you incorrect pattern matches in any real-world system where events arrive out of order.
- Key your streams: CEP patterns should almost always be applied to keyed streams so patterns match within a logical entity (user, sensor, stock symbol).
- Handle timeouts: Implement `TimedOutPartialMatchHandler` to capture and analyze partial matches that do not complete within the time window.
- Monitor state size: CEP is inherently stateful. Use RocksDB for large state, keep time windows as short as possible, and watch for combinatorial explosion with non-deterministic patterns.
- Start simple, iterate: Begin with a single pattern on a small data sample. Verify it works correctly before adding complexity or scaling up.
Flink’s CEP library is one of the most powerful pattern-matching engines available in the open-source ecosystem. With the patterns and techniques in this guide, you have everything you need to build your first production CEP pipeline. Start with the fraud detection example, adapt it to your domain, and scale from there.
References
- Apache Flink CEP Documentation (v1.18)
- Flink Event Time and Watermarks Guide
- Flink Kafka Connector Documentation
- Flink State Backends Reference
- Flink Docker Deployment Guide
- Stream Processing with Apache Flink (O’Reilly) by Fabian Hueske and Vasiliki Kalavri
- Apache Flink: Stream and Batch Processing in a Single Engine — original Flink research paper