Summary
What this post covers: A production-style guide to building Complex Event Processing pipelines with Apache Flink, including the Pattern API, three end-to-end Java examples (credit card fraud, IoT anomaly, stock pattern detection), event-time handling, Kafka connectors, deployment, and performance tuning.
Key insights:
- CEP is fundamentally different from batch or per-event stream processing: it maintains stateful NFA pattern buffers across event sequences, which is why batch jobs and Kafka Streams cannot replace it for fraud detection or multi-step anomaly correlation.
- Pattern contiguity choice dominates correctness and cost: use next() for strict sequences, followedBy() for relaxed matching, and avoid followedByAny() except when truly needed, because it triggers combinatorial state growth.
- Always drive CEP on event time with proper watermark strategies. Processing time produces incorrect matches in any real system where events arrive out of order, and this single mistake breaks more production CEP jobs than any other.
- Apply patterns to keyed streams so matches stay scoped to a logical entity (user, sensor, symbol); patterns on non-keyed streams quickly explode in state size and produce nonsensical cross-entity matches.
- CEP is inherently stateful, so production readiness depends on the RocksDB state backend, short time windows, a TimedOutPartialMatchHandler to catch incomplete sequences, and active monitoring of state size to prevent runaway memory growth.
Main topics: What is Complex Event Processing (CEP)?, Why Apache Flink for CEP?, Setting Up Your Flink CEP Project, Understanding Flink CEP Pattern API, Hands-On Credit Card Fraud Detection, Hands-On IoT Sensor Anomaly Detection, Hands-On Stock Market Pattern Detection, Advanced CEP Techniques, Event Time vs Processing Time, Connecting to Real Data Sources, Deploying and Monitoring, Performance Optimization, Common Pitfalls and Troubleshooting, Final Thoughts, References.
Consider a scenario in which a single credit card is used at a gas station in Houston at 2:13 PM, and forty seconds later the same card number appears at an electronics store in Tokyo. Within those forty seconds, a payment-fraud system must ingest both events, correlate them across millions of concurrent transaction streams, recognise the physical impossibility, and emit a fraud alert before the Tokyo merchant finishes printing the receipt. The scenario is far from hypothetical. Visa states that its network can handle more than 65,000 transaction messages per second, and the speed of fraudulent activity continues to increase year on year. Traditional batch jobs executed overnight are of little value in such conditions. Complex Event Processing is required, and Apache Flink is among the strongest engines on which to implement it.
This guide presents the construction of real-time CEP pipelines from first principles. Rather than illustrative fragments, it provides complete, compilable Java code suitable for adaptation to production fraud detection, IoT monitoring, and financial market analysis. By the end of the guide, the reader will understand Flink’s CEP library in sufficient depth to design pattern-matching pipelines for any domain.
What is Complex Event Processing (CEP)?
Complex Event Processing is a methodology for detecting meaningful patterns across streams of events in real time. The defining term is patterns. Simple stream processing typically filters or transforms individual events, for example by returning all transactions above $1,000. CEP extends this scope by examining sequences, combinations, and temporal relationships across multiple events.
Simple Events vs Complex Events
A simple event is a single, atomic occurrence such as a temperature reading, a stock trade, or a log entry. A complex event is a higher-level pattern derived from multiple simple events. For example:
- Simple event: “User #4821 made a $50 purchase at Starbucks.”
- Complex event: “User #4821 made three purchases totalling over $2,000 within five minutes from three different countries.”

This complex event exists only because a CEP engine recognised the pattern across the underlying simple events.
CEP Compared with Traditional Processing
Understanding where CEP fits relative to batch and stream processing is important:
| Feature | Batch Processing | Stream Processing | CEP |
|---|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds | Milliseconds to seconds |
| Data Model | Bounded datasets | Unbounded streams | Unbounded streams with pattern state |
| Pattern Detection | Post-hoc analysis | Per-event transformations | Multi-event temporal patterns |
| State Management | Minimal (reprocess from scratch) | Windowed aggregations | Pattern match buffers with NFA |
| Use Case Example | Monthly reports | Real-time dashboards | Fraud detection, anomaly sequences |
| Tools | Spark, Hadoop MapReduce | Kafka Streams, Flink DataStream | Flink CEP, Esper, Siddhi |
Real-World CEP Applications
CEP is not a niche technology. It underpins a number of important systems across industries:
- Fraud Detection: Banks and payment processors use CEP to identify fraudulent transaction patterns in real time, including velocity checks, geographic impossibility, and unusual merchant categories.
- IoT Monitoring: Manufacturing plants and smart buildings use CEP to detect equipment failure sequences before catastrophic breakdowns occur. For the data infrastructure behind IoT monitoring, see the guide on managing metadata and time-series data for facility sensor signals.
- Algorithmic Trading: Hedge funds detect price-volume patterns across multiple securities within microsecond windows in order to trigger automated trades.
- Network Security: SIEM platforms use CEP to correlate firewall logs, authentication events, and data transfer patterns and thereby detect multi-stage cyberattacks.
- Supply Chain: Real-time tracking of shipment events allows operators to detect delays, rerouting needs, or customs anomalies before they cascade.
Why Apache Flink for CEP?
Several stream processing engines exist, but Flink occupies a distinct position for CEP workloads. The reasons are discussed below.
Flink’s Architecture for CEP
Flink was designed as a streaming-first engine. Unlike Spark, which added streaming capabilities to a batch framework, Flink treats streams as the fundamental data model. The distinction is consequential for CEP for several reasons:
- DataStream API: The core API operates on unbounded streams and offers fine-grained control over event processing, keying, and windowing.
- Event Time Processing: Flink natively supports event time semantics with watermarks, a feature that is essential for CEP. Matching patterns across events requires reasoning about when events actually occurred, not when they arrived at the processing system.
- Watermarks: The watermark mechanism tracks the progress of event time through the stream and enables correct handling of out-of-order events, which are a routine occurrence in distributed systems.
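- Flink CEP Library (flink-cep): Flink ships a dedicated CEP library that implements a Non-deterministic Finite Automaton (NFA) for pattern matching. Patterns are defined declaratively, and the engine handles the associated state management internally.
- Exactly-Once Semantics: Checkpointing gives exactly-once consistency for Flink's internal state, including partially matched patterns. End to end, fraud alerts are neither lost nor duplicated only when the sink also participates, for example a Kafka sink configured with DeliveryGuarantee.EXACTLY_ONCE; an at-least-once sink never loses alerts but may repeat some after a failure.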
- Low Latency: Flink processes events within milliseconds rather than in micro-batches. For CEP workloads, where rapid pattern matching is essential, this property is non-negotiable.
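The watermark bullet can be made concrete with a small plain-Java model of a bounded-out-of-orderness watermark, the strategy that WatermarkStrategy.forBoundedOutOfOrderness() applies in the pipelines below. This is a sketch, not Flink code: the class name is illustrative, and it assumes the formula used by Flink's built-in generator (watermark = highest timestamp seen - allowed delay - 1 ms) and treats an event at or below the current watermark as late, which is how the CEP operator decides whether to drop or side-output an event.

```java
import java.time.Duration;

/**
 * Plain-Java model of a bounded-out-of-orderness watermark (illustrative only).
 * The watermark trails the highest event timestamp seen so far by a fixed delay;
 * an event whose timestamp is at or below the current watermark counts as late.
 */
public class BoundedOutOfOrdernessModel {
    private final long maxDelayMs;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessModel(Duration maxDelay) {
        this.maxDelayMs = maxDelay.toMillis();
    }

    /** Records an event and reports whether it arrived behind the watermark. */
    public boolean onEvent(long eventTimestampMs) {
        boolean late = eventTimestampMs <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
        return late;
    }

    /** Highest timestamp seen, minus the allowed delay, minus 1 ms. */
    public long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) return Long.MIN_VALUE; // no events yet
        return maxTimestamp - maxDelayMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel wm = new BoundedOutOfOrdernessModel(Duration.ofSeconds(5));
        System.out.println(wm.onEvent(10_000));    // false: first event
        System.out.println(wm.onEvent(8_000));     // false: 2 s out of order, inside the 5 s bound
        System.out.println(wm.onEvent(20_000));    // false: watermark advances to 14_999
        System.out.println(wm.onEvent(12_000));    // true: 12_000 <= 14_999, so it is late
        System.out.println(wm.currentWatermark()); // 14999
    }
}
```

Flink emits watermarks periodically (by default every 200 ms) rather than after every event, so in a real job the lateness decision lags slightly behind this per-event model.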
Flink CEP Compared with Alternative Engines
| Feature | Flink CEP | Kafka Streams | Esper | Spark Structured Streaming | Kinesis Data Analytics (SQL) |
|---|---|---|---|---|---|
| Pattern Matching | Built-in NFA-based | Manual (no CEP library) | EPL query language | No native CEP | SQL-based only |
| Latency | True streaming (ms) | True streaming (ms) | In-memory (ms) | Micro-batch (100ms+) | Near real-time |
| Scalability | Distributed cluster | Embedded scaling | Single JVM | Distributed cluster | AWS managed |
| Exactly-Once | Yes | Yes | No | Yes | Yes |
| Fault Tolerance | Checkpointing + savepoints | Changelog topics | Limited | Checkpointing | Managed snapshots |
| Event Time Support | Native watermarks | Timestamp extractors | Limited | Native watermarks | Limited |
| Best For | Complex temporal patterns at scale | Simple event-driven microservices | Prototyping, embedded CEP | Batch + streaming hybrid | AWS-native SQL analytics |
Setting Up Your Flink CEP Project
Prerequisites
Before any code is written, the following components should be in place:
- Java 11 or 17 (Flink 1.18+ supports both; Java 17 is recommended for new projects)
- Maven 3.8+ or Gradle 7+
- An IDE—IntelliJ IDEA with the Flink plugin is well suited
- Docker (optional, for running Kafka and Flink locally)
Project Structure
The following layout is used throughout this guide:
flink-cep-pipeline/
├── pom.xml
├── src/main/java/com/example/cep/
│ ├── FlinkCEPApplication.java
│ ├── FraudDetectionPipeline.java
│ ├── IoTAnomalyDetectionPipeline.java
│ ├── events/
│ │ ├── Transaction.java
│ │ ├── FraudAlert.java
│ │ ├── SensorReading.java
│ │ └── StockTick.java
│ ├── patterns/
│ │ ├── FraudPatterns.java
│ │ ├── IoTPatterns.java
│ │ └── StockPatterns.java
│ ├── processors/
│ │ ├── FraudAlertProcessor.java
│ │ ├── AnomalyAlertProcessor.java
│ │ └── TradingSignalProcessor.java
│ └── sources/
│ └── KafkaSourceBuilder.java
└── src/main/resources/
└── log4j2.properties
Maven pom.xml
The following Maven configuration contains all required Flink CEP dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-cep-pipeline</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<flink.version>1.18.1</flink.version>
<java.version>17</java.version>
<kafka.version>3.6.1</kafka.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink CEP Library -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.1.0-1.18</version>
</dependency>
<!-- Flink JSON Format -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Clients (for local execution) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Jackson for JSON serialization -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.1</version>
</dependency>
<!-- SLF4J + Log4j2 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.22.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.22.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.22.1</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
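<configuration>
<artifactSet>
<excludes>
<!-- Already on the Flink cluster classpath; bundling them causes conflicts -->
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Strip signature files that would invalidate the shaded JAR -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Merge META-INF/services files from connectors and formats -->
<transformer implementation=
"org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation=
"org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.cep.FlinkCEPApplication</mainClass>
</transformer>
</transformers>
</configuration>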
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Gradle Alternative
For Gradle users, the equivalent build.gradle.kts is shown below:
plugins {
java
id("com.github.johnrengelman.shadow") version "8.1.1"
}
java {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
repositories {
mavenCentral()
}
val flinkVersion = "1.18.1"
dependencies {
compileOnly("org.apache.flink:flink-streaming-java:$flinkVersion")
compileOnly("org.apache.flink:flink-clients:$flinkVersion")
implementation("org.apache.flink:flink-cep:$flinkVersion")
implementation("org.apache.flink:flink-connector-kafka:3.1.0-1.18")
implementation("org.apache.flink:flink-json:$flinkVersion")
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.22.1")
runtimeOnly("org.apache.logging.log4j:log4j-core:2.22.1")
}
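The flink-streaming-java and flink-clients dependencies are marked as provided (Maven) or compileOnly (Gradle) because the Flink cluster already includes them. When running locally in an IDE, add them to the run configuration’s classpath; for Maven projects in IntelliJ IDEA, the run configuration option “Add dependencies with ‘provided’ scope to classpath” does this.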
Understanding Flink CEP Pattern API
The Flink CEP library provides a declarative API for defining event patterns. Internally, the library compiles each pattern definition into a Non-deterministic Finite Automaton (NFA) that matches patterns efficiently against the incoming event stream. Each major concept is examined in turn below.
Pattern Basics
Every pattern starts with Pattern.begin() and chains additional states:
// Strict contiguity: events must be directly adjacent
Pattern<Event, ?> strict = Pattern.<Event>begin("start")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("login_failed");
}
})
.next("second") // MUST be the very next event
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("login_failed");
}
})
.next("third")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("login_failed");
}
});
// Relaxed contiguity: allows non-matching events in between
Pattern<Event, ?> relaxed = Pattern.<Event>begin("start")
.where(/* ... */)
.followedBy("end") // matching events can have other events between them
.where(/* ... */);
// Non-deterministic relaxed contiguity:
// matches all possible combinations
Pattern<Event, ?> nonDeterministic = Pattern.<Event>begin("start")
.where(/* ... */)
.followedByAny("end") // considers ALL matching events, not just first
.where(/* ... */);
Contiguity: Strict, Relaxed, Non-Deterministic
Contiguity is one of the most important concepts in Flink CEP. Consider a scenario in which the event stream contains A, C, B1, B2 and the pattern is “A followed by B”:
- next() (strict): No match. C appears between A and B1, which breaks strict contiguity.
- followedBy() (relaxed): Matches {A, B1}. C is skipped, and the first matching B is selected.
- followedByAny() (non-deterministic relaxed): Matches both {A, B1} and {A, B2}, since all possible matching events are considered.
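The three outcomes above can be reproduced with a few lines of plain Java. This is a simplified model of the match sets for a two-step "A then B" pattern, not Flink's NFA: event names starting with "A" or "B" stand in for the where() conditions, and the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of the three contiguity modes for a two-step pattern "A then B".
 * It only reproduces the match sets; it does not model Flink's internal NFA.
 */
public class ContiguityDemo {

    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC }

    /** Returns every {A, B} pair the given mode would emit for the input sequence. */
    static List<String> match(List<String> events, Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("A")) continue; // start state condition
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).startsWith("B");
                if (mode == Mode.STRICT) {
                    if (isB) matches.add("{" + events.get(i) + ", " + events.get(j) + "}");
                    break; // next(): only the immediately following event is considered
                }
                if (isB) {
                    matches.add("{" + events.get(i) + ", " + events.get(j) + "}");
                    if (mode == Mode.RELAXED) break; // followedBy(): first matching B wins
                }
                // Non-matching events (like C) are skipped in both relaxed modes
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("A", "C", "B1", "B2");
        for (Mode m : Mode.values()) {
            System.out.println(m + " -> " + match(stream, m));
        }
    }
}
```

Running main prints an empty list for STRICT, [{A, B1}] for RELAXED, and [{A, B1}, {A, B2}] for NON_DETERMINISTIC.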
Quantifiers
// Exactly N times
Pattern<Event, ?> exactly3 = Pattern.<Event>begin("failures")
.where(condition)
.times(3); // exactly 3 matching events
// N or more times
Pattern<Event, ?> atLeast3 = Pattern.<Event>begin("failures")
.where(condition)
.timesOrMore(3); // 3 or more matching events
// Range
Pattern<Event, ?> range = Pattern.<Event>begin("failures")
.where(condition)
.times(2, 5); // between 2 and 5 matching events
// One or more (greedy)
Pattern<Event, ?> oneOrMore = Pattern.<Event>begin("failures")
.where(condition)
.oneOrMore()
.greedy(); // match as many as possible
// Optional
Pattern<Event, ?> withOptional = Pattern.<Event>begin("start")
.where(startCondition)
.next("middle")
.where(middleCondition)
.optional() // this state may or may not match
.next("end")
.where(endCondition);
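Quantifiers also carry a contiguity mode of their own: iterations of times() and oneOrMore() use relaxed contiguity by default, consecutive() makes them strict, and allowCombinations() makes them non-deterministic relaxed. The plain-Java arithmetic below (no Flink involved; names are illustrative) assumes a stream of n consecutive events that all satisfy the condition and no after-match skip strategy. Under those assumptions, times(k) with relaxed contiguity yields one match per start position (n - k + 1), while allowCombinations() yields every k-element subset, C(n, k), which is why non-deterministic contiguity inflates state so quickly.

```java
/**
 * Counts matches for times(k) over n consecutive events that all satisfy the
 * condition, with no AfterMatchSkipStrategy (illustrative arithmetic only).
 */
public class QuantifierGrowth {

    /** Relaxed contiguity: one match per start position. */
    static long relaxedMatches(int n, int k) {
        return n >= k ? n - k + 1 : 0;
    }

    /** allowCombinations(): every k-element subset of the n events, i.e. C(n, k). */
    static long combinationMatches(int n, int k) {
        if (k < 0 || k > n) return 0;
        long result = 1;
        for (int i = 0; i < k; i++) {
            // After step i, result == C(n, i + 1); the division is always exact
            result = result * (n - i) / (i + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] cases = {{10, 3}, {20, 5}, {50, 5}};
        for (int[] c : cases) {
            System.out.printf("n=%d, k=%d: relaxed=%d, allowCombinations=%d%n",
                c[0], c[1], relaxedMatches(c[0], c[1]), combinationMatches(c[0], c[1]));
        }
    }
}
```

For 20 matching events and times(5), relaxed contiguity produces 16 matches while allowCombinations() produces 15,504, each of which Flink must track as state until it completes or times out.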
Conditions
// Simple condition — checks current event only
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getAmount() > 1000.0;
}
})
// Iterative condition — can reference previously matched events
.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event event, Context<Event> ctx) {
// Compare with previously matched event
for (Event prev : ctx.getEventsForPattern("start")) {
if (!event.getLocation().equals(prev.getLocation())) {
return true; // different location than start event
}
}
return false;
}
})
// OR condition
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("withdrawal");
}
})
.or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("transfer");
}
})
// Until condition (stop condition for looping patterns)
.oneOrMore()
.until(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("logout");
}
})
Time Constraints
// The entire pattern must complete within 5 minutes
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("first")
.where(/* ... */)
.followedBy("second")
.where(/* ... */)
.followedBy("third")
.where(/* ... */)
.within(Time.minutes(5));
The within() constraint applies to the entire pattern and is measured from the first matching event. If the first event matches at T=0 and within(Time.minutes(5)) is configured, the entire pattern must complete before T=5min. Partially matched patterns that time out are discarded, although they can be captured with timeout handling, which is discussed later.
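The timing rule described above fits in a one-line predicate. The sketch below is plain Java, not Flink's internal check; the class and method names are hypothetical, and it treats the boundary as exclusive ("before T=5min"), so a last event at exactly T=5min counts as timed out. If the exact boundary matters, confirm it against the Flink version in use.

```java
import java.time.Duration;

/**
 * Illustrates the within() rule: a match is valid only if its last event
 * occurs less than `window` after its first event (illustrative only).
 */
public class WithinWindowCheck {

    static boolean completesWithin(long firstEventMs, long lastEventMs, Duration window) {
        return lastEventMs - firstEventMs < window.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(5);
        long t0 = 0L; // first matching event at T=0
        long[] lastEvents = {
            Duration.ofMinutes(4).toMillis(),
            Duration.ofMinutes(5).toMillis(),
            Duration.ofMinutes(6).toMillis()
        };
        for (long last : lastEvents) {
            System.out.printf("last event at %d ms -> %s%n", last,
                completesWithin(t0, last, window) ? "match" : "timed out");
        }
    }
}
```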
Hands-On: Credit Card Fraud Detection Pipeline
The first complete CEP pipeline considered here is a credit card fraud detection system. The use case is canonical for CEP, and three distinct fraud patterns are implemented.
The Transaction Event Class
package com.example.cep.events;
public class Transaction implements java.io.Serializable {
private String transactionId;
private String userId;
private double amount;
private long timestamp;
private String location;
private String merchantCategory;
private String cardNumber;
// Default constructor for serialization
public Transaction() {}
public Transaction(String transactionId, String userId, double amount,
long timestamp, String location, String merchantCategory,
String cardNumber) {
this.transactionId = transactionId;
this.userId = userId;
this.amount = amount;
this.timestamp = timestamp;
this.location = location;
this.merchantCategory = merchantCategory;
this.cardNumber = cardNumber;
}
// Getters and setters
public String getTransactionId() { return transactionId; }
public void setTransactionId(String transactionId) { this.transactionId = transactionId; }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public double getAmount() { return amount; }
public void setAmount(double amount) { this.amount = amount; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public String getLocation() { return location; }
public void setLocation(String location) { this.location = location; }
public String getMerchantCategory() { return merchantCategory; }
public void setMerchantCategory(String mc) { this.merchantCategory = mc; }
public String getCardNumber() { return cardNumber; }
public void setCardNumber(String cardNumber) { this.cardNumber = cardNumber; }
@Override
public String toString() {
return String.format("Transaction{id=%s, user=%s, amount=%.2f, loc=%s, time=%d}",
transactionId, userId, amount, location, timestamp);
}
}
The Fraud Alert Class
package com.example.cep.events;
import java.util.List;
public class FraudAlert implements java.io.Serializable {
private String alertId;
private String userId;
private String patternType;
private String description;
private List<Transaction> matchedTransactions;
private long detectedAt;
public FraudAlert(String alertId, String userId, String patternType,
String description, List<Transaction> matchedTransactions) {
this.alertId = alertId;
this.userId = userId;
this.patternType = patternType;
this.description = description;
this.matchedTransactions = matchedTransactions;
this.detectedAt = System.currentTimeMillis();
}
// Getters
public String getAlertId() { return alertId; }
public String getUserId() { return userId; }
public String getPatternType() { return patternType; }
public String getDescription() { return description; }
public List<Transaction> getMatchedTransactions() { return matchedTransactions; }
public long getDetectedAt() { return detectedAt; }
@Override
public String toString() {
return String.format("FRAUD ALERT [%s] User: %s | Pattern: %s | %s | Transactions: %d",
alertId, userId, patternType, description, matchedTransactions.size());
}
}
Defining Fraud Patterns
The core logic of the system is captured by three fraud detection patterns, defined below:
package com.example.cep.patterns;
import com.example.cep.events.Transaction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
public class FraudPatterns {
/**
* Pattern 1: Geographic Impossibility
* Three transactions over $500 within 5 minutes from different locations.
* Spending observed in New York, then London, then Tokyo within 5 minutes
* is highly indicative of fraudulent activity.
*/
public static Pattern<Transaction, ?> geographicImpossibility() {
return Pattern.<Transaction>begin("first")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() > 500.0;
}
})
.followedBy("second")
.where(new IterativeCondition<Transaction>() {
@Override
public boolean filter(Transaction tx, Context<Transaction> ctx) {
if (tx.getAmount() <= 500.0) return false;
for (Transaction first : ctx.getEventsForPattern("first")) {
if (!tx.getLocation().equals(first.getLocation())) {
return true;
}
}
return false;
}
})
.followedBy("third")
.where(new IterativeCondition<Transaction>() {
@Override
public boolean filter(Transaction tx, Context<Transaction> ctx) {
if (tx.getAmount() <= 500.0) return false;
for (Transaction first : ctx.getEventsForPattern("first")) {
for (Transaction second : ctx.getEventsForPattern("second")) {
if (!tx.getLocation().equals(first.getLocation())
&& !tx.getLocation().equals(second.getLocation())) {
return true;
}
}
}
return false;
}
})
.within(Time.minutes(5));
}
/**
* Pattern 2: Card Testing Attack
* A small "test" transaction ($0.01–$5.00) followed by a large transaction
* ($1000+) within 1 minute. Fraudsters frequently test stolen cards with
* very small purchases before attempting larger ones.
*/
public static Pattern<Transaction, ?> cardTestingAttack() {
return Pattern.<Transaction>begin("test_charge")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() >= 0.01 && tx.getAmount() <= 5.0;
}
})
.followedBy("big_charge")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() >= 1000.0;
}
})
.within(Time.minutes(1));
}
/**
* Pattern 3: Transaction Velocity
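* Five or more transactions within 2 minutes (timesOrMore(5)). Even legitimate
* users rarely make this many purchases in such a short interval.
* Without an after-match skip strategy, one burst yields several overlapping
* matches; Pattern.begin("transactions", AfterMatchSkipStrategy.skipPastLastEvent())
* suppresses matches that reuse events from an already-emitted match.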
*/
public static Pattern<Transaction, ?> highVelocity() {
return Pattern.<Transaction>begin("transactions")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() > 0;
}
})
.timesOrMore(5)
.within(Time.minutes(2));
}
}
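The geographicImpossibility() pattern treats any change of location string as suspicious, which would also flag legitimate purchases in two neighbouring towns. If each transaction carried coordinates (an assumption; the Transaction class above stores only a location name), the second and third conditions could instead test the implied travel speed. A minimal plain-Java sketch of that check, using the haversine great-circle distance and a hypothetical ceiling of 1,000 km/h (roughly airliner speed); the class, method names, and threshold are all illustrative:

```java
/**
 * Hypothetical "impossible travel" check: is the implied speed between two
 * card-present transactions faster than any plausible journey?
 */
public class ImpossibleTravel {
    private static final double EARTH_RADIUS_KM = 6371.0;
    private static final double MAX_PLAUSIBLE_KMH = 1000.0; // assumed threshold

    /** Great-circle distance in kilometres (haversine formula). */
    static double distanceKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                 * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    /** True if covering the distance in the elapsed time would exceed the ceiling. */
    static boolean isImpossible(double lat1, double lon1, long t1Ms,
                                double lat2, double lon2, long t2Ms) {
        double km = distanceKm(lat1, lon1, lat2, lon2);
        double hours = Math.abs(t2Ms - t1Ms) / 3_600_000.0;
        if (hours == 0) return km > 0; // simultaneous transactions in two places
        return km / hours > MAX_PLAUSIBLE_KMH;
    }

    public static void main(String[] args) {
        // Houston -> Tokyo, 40 seconds apart (the scenario from the introduction)
        System.out.println(isImpossible(29.76, -95.37, 0L, 35.68, 139.69, 40_000L));
        // Two Houston locations roughly 10 km apart, 10 minutes apart
        System.out.println(isImpossible(29.76, -95.37, 0L, 29.80, -95.46, 600_000L));
    }
}
```

The first call prints true and the second false. Inside an IterativeCondition, the same predicate would compare the current transaction against each event returned by ctx.getEventsForPattern("first").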
Processing Matched Patterns
package com.example.cep.processors;
import com.example.cep.events.FraudAlert;
import com.example.cep.events.Transaction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;
import java.util.*;
public class FraudAlertProcessor
extends PatternProcessFunction<Transaction, FraudAlert> {
private final String patternType;
public FraudAlertProcessor(String patternType) {
this.patternType = patternType;
}
@Override
public void processMatch(Map<String, List<Transaction>> match,
Context ctx,
Collector<FraudAlert> out) {
// Collect all matched transactions from all pattern states
List<Transaction> allTransactions = new ArrayList<>();
match.values().forEach(allTransactions::addAll);
// Extract user ID from first transaction
String userId = allTransactions.get(0).getUserId();
// Build a description
String description = buildDescription(match);
// Generate alert
String alertId = UUID.randomUUID().toString();
FraudAlert alert = new FraudAlert(
alertId, userId, patternType, description, allTransactions
);
out.collect(alert);
}
private String buildDescription(Map<String, List<Transaction>> match) {
StringBuilder sb = new StringBuilder();
sb.append("Matched pattern '").append(patternType).append("': ");
double total = 0;
Set<String> locations = new HashSet<>();
int count = 0;
for (List<Transaction> txList : match.values()) {
for (Transaction tx : txList) {
total += tx.getAmount();
locations.add(tx.getLocation());
count++;
}
}
sb.append(count).append(" transactions, ");
sb.append(String.format("total $%.2f, ", total));
sb.append("locations: ").append(locations);
return sb.toString();
}
}
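FraudAlertProcessor assigns a random UUID to each alert. Because Flink replays events from the last checkpoint after a failure, a match re-emitted during recovery receives a different ID, so downstream systems cannot recognise it as a duplicate. One alternative, sketched here in plain Java with a hypothetical helper name, derives a name-based (version 3) UUID from the pattern type and the matched transaction IDs, so a replayed match yields the same alert ID.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

/** Hypothetical helper: stable alert IDs for idempotent downstream processing. */
public class DeterministicAlertId {

    static String alertId(String patternType, List<String> transactionIds) {
        List<String> sorted = new ArrayList<>(transactionIds);
        Collections.sort(sorted); // independent of the order the match map is iterated
        String key = patternType + "|" + String.join(",", sorted);
        return UUID.nameUUIDFromBytes(key.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        String original = alertId("HIGH_VELOCITY", List.of("tx-1", "tx-2", "tx-3"));
        String replayed = alertId("HIGH_VELOCITY", List.of("tx-3", "tx-1", "tx-2"));
        String other = alertId("CARD_TESTING_ATTACK", List.of("tx-1", "tx-2", "tx-3"));
        System.out.println(original.equals(replayed)); // true
        System.out.println(original.equals(other));    // false
    }
}
```

In processMatch, the IDs would come from getTransactionId() on each entry of allTransactions, replacing the call to UUID.randomUUID().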
The Complete Fraud Detection Pipeline
The pipeline below is wired together end to end, from Kafka source to fraud alert output:
package com.example.cep;
import com.example.cep.events.FraudAlert;
import com.example.cep.events.Transaction;
import com.example.cep.patterns.FraudPatterns;
import com.example.cep.processors.FraudAlertProcessor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
public class FraudDetectionPipeline {
public static void main(String[] args) throws Exception {
// 1. Set up the streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// Checkpoint every 60 seconds; after a failure, operator state (including
// partially matched patterns) is restored exactly once from the last checkpoint
env.enableCheckpointing(60_000);
// 2. Create Kafka source for transactions
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("transactions")
.setGroupId("fraud-detection-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 3. Read from Kafka with event-time watermarks generated per partition.
// The timestamp assigner falls back to the Kafka record timestamp for
// records it cannot parse; those records are dropped in the flatMap below.
// (Parsing twice keeps the example short; a custom DeserializationSchema
// that produces Transaction objects would avoid it.)
ObjectMapper mapper = new ObjectMapper();
DataStream<Transaction> transactions = env
.fromSource(kafkaSource, WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> {
try {
return mapper.readValue(event, Transaction.class)
.getTimestamp();
} catch (Exception e) {
return timestamp;
}
}), "Kafka Transactions")
.flatMap((String json, Collector<Transaction> out) -> {
try {
out.collect(mapper.readValue(json, Transaction.class));
} catch (Exception e) {
// Skip malformed JSON instead of failing (and restarting) the job;
// in production, route it to a dead-letter side output
}
})
.returns(Transaction.class) // lambdas using a Collector need an explicit type hint
.keyBy(Transaction::getUserId); // Key by user for per-user patterns
// 4. Apply Pattern 1: Geographic Impossibility
Pattern<Transaction, ?> geoPattern = FraudPatterns.geographicImpossibility();
PatternStream<Transaction> geoPatternStream = CEP.pattern(
transactions, geoPattern);
DataStream<FraudAlert> geoAlerts = geoPatternStream.process(
new FraudAlertProcessor("GEOGRAPHIC_IMPOSSIBILITY"));
// 5. Apply Pattern 2: Card Testing Attack
Pattern<Transaction, ?> testPattern = FraudPatterns.cardTestingAttack();
PatternStream<Transaction> testPatternStream = CEP.pattern(
transactions, testPattern);
DataStream<FraudAlert> testAlerts = testPatternStream.process(
new FraudAlertProcessor("CARD_TESTING_ATTACK"));
// 6. Apply Pattern 3: High Velocity
Pattern<Transaction, ?> velocityPattern = FraudPatterns.highVelocity();
PatternStream<Transaction> velocityPatternStream = CEP.pattern(
transactions, velocityPattern);
DataStream<FraudAlert> velocityAlerts = velocityPatternStream.process(
new FraudAlertProcessor("HIGH_VELOCITY"));
// 7. Union all alerts and sink to Kafka
DataStream<FraudAlert> allAlerts = geoAlerts
.union(testAlerts)
.union(velocityAlerts);
// Print to console (for development)
allAlerts.print("FRAUD ALERT");
// Sink to Kafka alerts topic
KafkaSink<String> alertSink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("fraud-alerts")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// KafkaSink defaults to DeliveryGuarantee.NONE. AT_LEAST_ONCE flushes pending
// records on every checkpoint, so alerts are not lost, but a failover can
// re-send some; consumers should deduplicate (e.g. on matched transaction IDs).
// EXACTLY_ONCE also needs setTransactionalIdPrefix(...) and delays alert
// visibility for read_committed consumers until the next checkpoint completes.
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
allAlerts
.map(alert -> mapper.writeValueAsString(alert))
.sinkTo(alertSink);
// 8. Execute the pipeline
env.execute("Credit Card Fraud Detection CEP Pipeline");
}
}
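Because CEP.pattern() is applied to a keyed stream, the CEP operator keeps separate NFA state for each key (each user), so patterns are evaluated independently and do not interfere with one another. The keyBy(Transaction::getUserId) call is therefore essential: it ensures that a pattern matches only events belonging to the same user. On a non-keyed stream, Flink runs the CEP operator with a parallelism of 1, and a single match can combine transactions from different users.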
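The effect of keying can be checked without a cluster. The plain-Java sketch below (Java 17; hypothetical names, not Flink's implementation) applies a simplified version of the high-velocity rule, five or more transactions within two minutes, once to the whole stream and once per user. Three users making two purchases each trip the unkeyed check, although no individual user did anything unusual.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class KeyedScopeDemo {

    public record Tx(String userId, long timestampMs) {}

    /** True if at least minCount events fall within windowMs of each other. */
    static boolean velocityHit(List<Tx> txs, int minCount, long windowMs) {
        List<Long> ts = txs.stream().map(Tx::timestampMs).sorted().toList();
        for (int i = 0; i + minCount - 1 < ts.size(); i++) {
            if (ts.get(i + minCount - 1) - ts.get(i) < windowMs) return true;
        }
        return false;
    }

    /** Applies the rule separately to each user, as keyBy(userId) would. */
    static Set<String> flaggedUsers(List<Tx> txs, int minCount, long windowMs) {
        Map<String, List<Tx>> byUser = txs.stream().collect(Collectors.groupingBy(Tx::userId));
        Set<String> flagged = new TreeSet<>();
        byUser.forEach((user, userTxs) -> {
            if (velocityHit(userTxs, minCount, windowMs)) flagged.add(user);
        });
        return flagged;
    }

    public static void main(String[] args) {
        long twoMinutes = 120_000L;
        List<Tx> stream = List.of(
            new Tx("alice", 0), new Tx("bob", 10_000), new Tx("carol", 20_000),
            new Tx("alice", 30_000), new Tx("bob", 40_000), new Tx("carol", 50_000));
        System.out.println(velocityHit(stream, 5, twoMinutes));  // true: unkeyed, mixes users
        System.out.println(flaggedUsers(stream, 5, twoMinutes)); // []: no single user qualifies
    }
}
```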
Hands-On: IoT Sensor Anomaly Detection
The second pipeline detects anomalies in IoT sensor data. The target pattern is a sensor reporting three consecutive rising temperature readings above a threshold within one minute, followed by a pressure drop. The sequence frequently indicates an impending equipment failure. In a production setting, the detected anomalies would be persisted in a time-series database optimised for preprocessed data, and the underlying sensor readings could be supplied to forecasting models for predictive maintenance.
Sensor Event Class
package com.example.cep.events;
public class SensorReading implements java.io.Serializable {
private String sensorId;
private double temperature;
private double pressure;
private long timestamp;
private String location;
public SensorReading() {}
public SensorReading(String sensorId, double temperature, double pressure,
long timestamp, String location) {
this.sensorId = sensorId;
this.temperature = temperature;
this.pressure = pressure;
this.timestamp = timestamp;
this.location = location;
}
public String getSensorId() { return sensorId; }
public void setSensorId(String sensorId) { this.sensorId = sensorId; }
public double getTemperature() { return temperature; }
public void setTemperature(double temperature) { this.temperature = temperature; }
public double getPressure() { return pressure; }
public void setPressure(double pressure) { this.pressure = pressure; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public String getLocation() { return location; }
public void setLocation(String location) { this.location = location; }
@Override
public String toString() {
return String.format("Sensor{id=%s, temp=%.1f, pressure=%.1f, time=%d}",
sensorId, temperature, pressure, timestamp);
}
}
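Before wiring the sensor pattern into Flink, a plain-Java reference check can help pin down expected results in unit tests. The sketch below is an assumption-laden model, not the Flink pattern itself. It treats "consecutive" as strictly adjacent readings (matching next()) and requires the first reading to exceed 85 °C and each of the next two to be hotter than the one before. It also assumes that the pressure drop is measured between the third hot reading and the very next reading against the 10 PSI threshold, and that all four readings must span less than one minute. The class and record names are hypothetical.

```java
import java.util.List;

public class SensorAnomalyReference {

    public record Reading(double temperature, double pressure, long timestampMs) {}

    static final double TEMP_THRESHOLD = 85.0;          // degrees Celsius, as in the pipeline
    static final double PRESSURE_DROP_THRESHOLD = 10.0; // PSI, as in the pipeline
    static final long WINDOW_MS = 60_000L;              // assumed: whole sequence within 1 minute

    /** Returns the index of the first reading of a matching sequence, or -1. */
    static int findAnomaly(List<Reading> readings) {
        for (int i = 0; i + 3 < readings.size(); i++) {
            Reading r1 = readings.get(i), r2 = readings.get(i + 1);
            Reading r3 = readings.get(i + 2), next = readings.get(i + 3);
            boolean hotAndRising = r1.temperature() > TEMP_THRESHOLD
                && r2.temperature() > r1.temperature()
                && r3.temperature() > r2.temperature();
            // Assumed definition of a pressure drop: third hot reading -> following reading
            boolean pressureDrop = r3.pressure() - next.pressure() >= PRESSURE_DROP_THRESHOLD;
            boolean inWindow = next.timestampMs() - r1.timestampMs() < WINDOW_MS;
            if (hotAndRising && pressureDrop && inWindow) return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Reading> failing = List.of(
            new Reading(80.0, 100.0, 0),
            new Reading(86.0, 100.0, 10_000),
            new Reading(88.5, 99.0, 20_000),
            new Reading(91.0, 98.0, 30_000),
            new Reading(90.0, 85.0, 40_000));
        System.out.println(findAnomaly(failing)); // 1
    }
}
```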
Complete IoT Anomaly Pipeline
package com.example.cep;
import com.example.cep.events.SensorReading;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.*;
public class IoTAnomalyDetectionPipeline {
private static final double TEMP_THRESHOLD = 85.0; // degrees Celsius
private static final double PRESSURE_DROP_THRESHOLD = 10.0; // PSI
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.enableCheckpointing(30_000);
// Simulated sensor data source (replace with Kafka in production)
DataStream<SensorReading> sensorStream = env
.addSource(new SimulatedSensorSource()) // your custom source
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((reading, ts) -> reading.getTimestamp())
)
.keyBy(SensorReading::getSensorId);
// Pattern: 3 consecutive high-temp readings, then a pressure drop
Pattern<SensorReading, ?> anomalyPattern = Pattern
.<SensorReading>begin("rising_temp_1")
.where(new SimpleCondition<SensorReading>() {
@Override
public boolean filter(SensorReading reading) {
return reading.getTemperature() > TEMP_THRESHOLD;
}
})
.next("rising_temp_2")
.where(new IterativeCondition<SensorReading>() {
@Override
public boolean filter(SensorReading reading,
Context<SensorReading> ctx) throws Exception { // getEventsForPattern() declares Exception
if (reading.getTemperature() <= TEMP_THRESHOLD) return false;
// rising_temp_1 is a singleton stage, so this loop body runs once
for (SensorReading prev : ctx.getEventsForPattern("rising_temp_1")) {
return reading.getTemperature() > prev.getTemperature();
}
return false;
}
})
.next("rising_temp_3")
.where(new IterativeCondition<SensorReading>() {
@Override
public boolean filter(SensorReading reading,
Context<SensorReading> ctx) throws Exception {
if (reading.getTemperature() <= TEMP_THRESHOLD) return false;
for (SensorReading prev : ctx.getEventsForPattern("rising_temp_2")) {
return reading.getTemperature() > prev.getTemperature();
}
return false;
}
})
.followedBy("pressure_drop")
.where(new IterativeCondition<SensorReading>() {
@Override
public boolean filter(SensorReading reading,
Context<SensorReading> ctx) throws Exception {
// Compare against the pressure at the start of the temperature run
for (SensorReading prev : ctx.getEventsForPattern("rising_temp_1")) {
double pressureDiff = prev.getPressure() - reading.getPressure();
return pressureDiff > PRESSURE_DROP_THRESHOLD;
}
return false;
}
})
.within(Time.minutes(1));
// Apply pattern and process matches
PatternStream<SensorReading> patternStream =
CEP.pattern(sensorStream, anomalyPattern);
DataStream<String> anomalyAlerts = patternStream.process(
new PatternProcessFunction<SensorReading, String>() {
@Override
public void processMatch(Map<String, List<SensorReading>> match,
Context ctx,
Collector<String> out) {
SensorReading first = match.get("rising_temp_1").get(0);
SensorReading second = match.get("rising_temp_2").get(0);
SensorReading third = match.get("rising_temp_3").get(0);
SensorReading drop = match.get("pressure_drop").get(0);
String alert = String.format(
"ANOMALY DETECTED | Sensor: %s | Location: %s | " +
"Temps: %.1f -> %.1f -> %.1f (threshold: %.1f) | " +
"Pressure drop: %.1f -> %.1f (delta: %.1f)",
first.getSensorId(), first.getLocation(),
first.getTemperature(), second.getTemperature(),
third.getTemperature(), TEMP_THRESHOLD,
first.getPressure(), drop.getPressure(),
first.getPressure() - drop.getPressure()
);
out.collect(alert);
}
}
);
anomalyAlerts.print("IOT ALERT");
env.execute("IoT Sensor Anomaly Detection Pipeline");
}
}
The pipeline uses next() (strict contiguity) for the three rising temperature readings because they must be consecutive. By contrast, followedBy() (relaxed contiguity) is used for the pressure drop, since other normal readings may occur between the temperature spike and the pressure change.
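The same rule can be written as plain Java. This makes the contiguity distinction concrete and gives the conditions something to be unit-tested against. The sketch below is a simplified, in-memory model, not Flink CEP. It assumes readings for a single sensor that are already sorted by event time, and it deliberately ignores the one-minute within() window. The names IoTRuleSketch and Reading are hypothetical.

```java
import java.util.List;

// Hypothetical, simplified model of the IoT anomaly rule (not Flink CEP).
// Assumes readings for ONE sensor, already in event-time order; the
// within(1 minute) constraint is deliberately left out.
final class IoTRuleSketch {
    static final double TEMP_THRESHOLD = 85.0;          // degrees Celsius
    static final double PRESSURE_DROP_THRESHOLD = 10.0; // PSI

    record Reading(double temperature, double pressure) {}

    static boolean detectsAnomaly(List<Reading> readings) {
        for (int i = 0; i + 2 < readings.size(); i++) {
            Reading first = readings.get(i);
            Reading second = readings.get(i + 1);
            Reading third = readings.get(i + 2);
            // Strict contiguity (next()): the three rising readings must be adjacent
            boolean risingRun = first.temperature() > TEMP_THRESHOLD
                    && second.temperature() > first.temperature()
                    && third.temperature() > second.temperature();
            if (!risingRun) {
                continue;
            }
            // Relaxed contiguity (followedBy()): any later reading may supply the drop
            for (int j = i + 3; j < readings.size(); j++) {
                double drop = first.pressure() - readings.get(j).pressure();
                if (drop > PRESSURE_DROP_THRESHOLD) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

In this model, a normal reading between the pressure drop and the temperature run does not prevent a match. A normal reading inside the run of three high readings does prevent one, which mirrors the next()/followedBy() split in the pipeline.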
Hands-On: Stock Market Pattern Detection
The third pipeline detects a potential trading signal. The signal is a tick whose price is more than 5% below the previous close, followed within 10 seconds by a tick whose volume exceeds three times that of the drop tick. The pattern can indicate panic selling followed by institutional buying, which may represent a buy signal.
StockTick Event Class
package com.example.cep.events;
public class StockTick implements java.io.Serializable {
private String symbol;
private double price;
private long volume;
private long timestamp;
private double previousClose;
public StockTick() {}
public StockTick(String symbol, double price, long volume,
long timestamp, double previousClose) {
this.symbol = symbol;
this.price = price;
this.volume = volume;
this.timestamp = timestamp;
this.previousClose = previousClose;
}
public String getSymbol() { return symbol; }
public void setSymbol(String symbol) { this.symbol = symbol; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
public long getVolume() { return volume; }
public void setVolume(long volume) { this.volume = volume; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public double getPreviousClose() { return previousClose; }
public void setPreviousClose(double pc) { this.previousClose = pc; }
public double getPriceChangePercent() {
if (previousClose == 0) return 0;
return ((price - previousClose) / previousClose) * 100.0;
}
@Override
public String toString() {
return String.format("StockTick{sym=%s, price=%.2f, vol=%d, change=%.2f%%}",
symbol, price, volume, getPriceChangePercent());
}
}
Complete Stock Market Detection Pipeline
package com.example.cep;
import com.example.cep.events.StockTick;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.*;
public class StockPatternDetectionPipeline {
private static final double PRICE_DROP_THRESHOLD = -5.0; // percent
private static final double VOLUME_SPIKE_MULTIPLIER = 3.0; // 3x the price-drop tick's volume
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(10_000);
// Assume a Kafka source producing StockTick JSON
// (using simulated source for this example)
DataStream<StockTick> tickStream = env
.addSource(new SimulatedStockSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockTick>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((tick, ts) -> tick.getTimestamp())
)
.keyBy(StockTick::getSymbol);
// Pattern: Price drop > 5% followed by volume spike within 10 seconds
Pattern<StockTick, ?> buySignalPattern = Pattern
.<StockTick>begin("price_drop")
.where(new SimpleCondition<StockTick>() {
@Override
public boolean filter(StockTick tick) {
return tick.getPriceChangePercent() < PRICE_DROP_THRESHOLD;
}
})
.followedBy("volume_spike")
.where(new IterativeCondition<StockTick>() {
@Override
public boolean filter(StockTick tick, Context<StockTick> ctx) throws Exception {
for (StockTick drop : ctx.getEventsForPattern("price_drop")) {
// Volume must be at least 3x the volume during the drop
if (tick.getVolume() > drop.getVolume() * VOLUME_SPIKE_MULTIPLIER) {
return true;
}
}
return false;
}
})
.within(Time.seconds(10));
// Apply pattern
PatternStream<StockTick> patternStream =
CEP.pattern(tickStream, buySignalPattern);
DataStream<String> signals = patternStream.process(
new PatternProcessFunction<StockTick, String>() {
@Override
public void processMatch(Map<String, List<StockTick>> match,
Context ctx,
Collector<String> out) {
StockTick drop = match.get("price_drop").get(0);
StockTick spike = match.get("volume_spike").get(0);
String signal = String.format(
"BUY SIGNAL | %s | Drop: %.2f%% (price $%.2f) | " +
"Volume spike: %d -> %d (%.1fx) | " +
"Current price: $%.2f",
drop.getSymbol(),
drop.getPriceChangePercent(),
drop.getPrice(),
drop.getVolume(),
spike.getVolume(),
(double) spike.getVolume() / drop.getVolume(),
spike.getPrice()
);
out.collect(signal);
}
}
);
signals.print("TRADING SIGNAL");
env.execute("Stock Market Pattern Detection Pipeline");
}
}
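A quick worked check of the two conditions helps when choosing thresholds. The hypothetical helper below repeats, in plain Java, the formulas used by StockTick.getPriceChangePercent() and by the volume_spike condition. The sample numbers are illustrative:

- A tick at $94.00 against a $100.00 previous close is a 6% drop, so it qualifies.
- A follow-up volume of 3,500 against a drop-tick volume of 1,000 is a 3.5x spike, so it qualifies.
- A spike of exactly 3x does not qualify, because the comparison is strict.

```java
// Hypothetical plain-Java check (no Flink) of the stock-pattern arithmetic,
// using the same formulas as StockTick and the volume_spike condition.
final class StockSignalMath {
    static final double PRICE_DROP_THRESHOLD = -5.0;   // percent
    static final double VOLUME_SPIKE_MULTIPLIER = 3.0; // relative to the drop tick

    static double priceChangePercent(double price, double previousClose) {
        if (previousClose == 0) return 0; // same guard as StockTick
        return ((price - previousClose) / previousClose) * 100.0;
    }

    static boolean isPriceDrop(double price, double previousClose) {
        return priceChangePercent(price, previousClose) < PRICE_DROP_THRESHOLD;
    }

    // Strictly greater than: exactly 3x the drop volume does not count
    static boolean isVolumeSpike(long spikeVolume, long dropVolume) {
        return spikeVolume > dropVolume * VOLUME_SPIKE_MULTIPLIER;
    }
}
```

One edge case is worth noting. When the drop tick's volume is 0, any positive volume counts as a spike, and the ratio printed in the alert becomes Infinity. A production version might therefore require a minimum drop-tick volume.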
Advanced CEP Techniques
Once the fundamentals are in place, the following advanced techniques bring CEP pipelines to production quality.
Dynamic Patterns from External Configuration
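Hard-coded patterns are acceptable during initial development, but production systems need to change rule thresholds without code changes and rebuilds. One approach is to load pattern parameters from an external source. Open-source Flink CEP fixes the pattern when the job graph is built, so new values take effect only when the job is restarted, for example from a savepoint: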
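import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
// Build the pattern from thresholds loaded from a configuration source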
public class DynamicFraudPatterns {
public static Pattern<Transaction, ?> fromConfig(FraudRuleConfig config) {
return Pattern.<Transaction>begin("test_charge")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() >= config.getMinTestAmount()
&& tx.getAmount() <= config.getMaxTestAmount();
}
})
.followedBy("big_charge")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction tx) {
return tx.getAmount() >= config.getLargeTransactionThreshold();
}
})
.within(Time.minutes(config.getTimeWindowMinutes()));
}
}
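// Configuration POJO, loaded from a database or file when the job starts
public class FraudRuleConfig implements java.io.Serializable {
private double minTestAmount = 0.01;
private double maxTestAmount = 5.0;
private double largeTransactionThreshold = 1000.0;
private int timeWindowMinutes = 1;
public double getMinTestAmount() { return minTestAmount; }
public double getMaxTestAmount() { return maxTestAmount; }
public double getLargeTransactionThreshold() { return largeTransactionThreshold; }
public int getTimeWindowMinutes() { return timeWindowMinutes; }
// setters...
}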
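The "external source" can be as simple as a properties file read at startup. The loader below is a hypothetical sketch. The property keys (fraud.test.min and the others) and the Rules record are assumptions, and the defaults mirror those in FraudRuleConfig. It rejects inconsistent values before they can be compiled into a pattern that never matches.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical startup loader for fraud thresholds; key names are assumptions.
final class FraudRuleLoader {
    record Rules(double minTestAmount, double maxTestAmount,
                 double largeTransactionThreshold, int timeWindowMinutes) {}

    static Rules load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Fall back to the same defaults as FraudRuleConfig when a key is absent
        Rules rules = new Rules(
                Double.parseDouble(props.getProperty("fraud.test.min", "0.01")),
                Double.parseDouble(props.getProperty("fraud.test.max", "5.0")),
                Double.parseDouble(props.getProperty("fraud.large.threshold", "1000.0")),
                Integer.parseInt(props.getProperty("fraud.window.minutes", "1")));
        // Reject inconsistent rules before they are compiled into a pattern
        if (rules.minTestAmount() > rules.maxTestAmount()) {
            throw new IllegalArgumentException("fraud.test.min exceeds fraud.test.max");
        }
        if (rules.timeWindowMinutes() <= 0) {
            throw new IllegalArgumentException("fraud.window.minutes must be positive");
        }
        return rules;
    }
}
```

In main(), the loaded values would be copied into a FraudRuleConfig before DynamicFraudPatterns.fromConfig(...) is called.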
Side Outputs for Timeout Handling
A partial match times out when the within() window expires before the pattern completes. Implementing TimedOutPartialMatchHandler on the PatternProcessFunction captures these timed-out partial matches, which can then be emitted to a side output:
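import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;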
public class FraudAlertWithTimeout
extends PatternProcessFunction<Transaction, FraudAlert>
implements TimedOutPartialMatchHandler<Transaction> {
// Side output for timed-out partial matches
public static final OutputTag<String> TIMEOUT_TAG =
new OutputTag<String>("timed-out-patterns") {};
@Override
public void processMatch(Map<String, List<Transaction>> match,
Context ctx,
Collector<FraudAlert> out) {
// Process fully matched pattern (same as before)
// ...
}
@Override
public void processTimedOutMatch(Map<String, List<Transaction>> match,
Context ctx) {
// A partial match timed out — log it for analysis
StringBuilder sb = new StringBuilder("PARTIAL MATCH TIMEOUT: ");
for (Map.Entry<String, List<Transaction>> entry : match.entrySet()) {
sb.append(entry.getKey()).append("=")
.append(entry.getValue().size()).append(" events; ");
}
// Output to side output
ctx.output(TIMEOUT_TAG, sb.toString());
}
}
// In your pipeline, capture the side output:
SingleOutputStreamOperator<FraudAlert> alerts = patternStream
.process(new FraudAlertWithTimeout());
DataStream<String> timedOutPatterns = alerts
.getSideOutput(FraudAlertWithTimeout.TIMEOUT_TAG);
timedOutPatterns.print("TIMEOUT");
Scaling CEP Jobs
CEP pattern matching is stateful because the NFA maintains partial match buffers per key. The principal scaling considerations are summarised below:
- Key Partitioning: The stream should be passed through keyBy() before CEP patterns are applied. This ensures that events for the same entity (user, sensor, stock symbol) are routed to the same parallel instance.
- Parallelism: Parallelism should be chosen from event throughput and per-event matching cost. It is bounded by key cardinality: a stream with only 10 distinct keys cannot keep more than 10 CEP instances busy. For 10,000 users, a parallelism of 8–16 is a reasonable starting point. Flink hashes each key into one of a fixed number of key groups (set by the job's maximum parallelism) and assigns each parallel instance a contiguous range of those groups.
- State Size: Each active partial match consumes memory. With long time windows or high-cardinality patterns, state size should be monitored carefully.
// Set different parallelism for different pipeline stages
DataStream<Transaction> transactions = env
.fromSource(kafkaSource, watermarkStrategy, "source")
.setParallelism(8) // match Kafka partitions
.map(json -> mapper.readValue(json, Transaction.class))
.setParallelism(8)
.keyBy(Transaction::getUserId);
PatternStream<Transaction> patternStream = CEP.pattern(transactions, fraudPattern);
// The CEP operator is created by process(), so its parallelism is set on the
// returned operator (a keyed stream has no setParallelism() of its own)
SingleOutputStreamOperator<FraudAlert> alerts = patternStream
.process(new FraudAlertWithTimeout())
.setParallelism(16); // more parallelism for CPU-heavy matching
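The key-to-instance routing described in the scaling notes can be sketched in plain Java. Each key maps to one of maxParallelism key groups. Each parallel instance then owns a contiguous range of groups, given by operator index = keyGroup * parallelism / maxParallelism. The sketch reproduces that range arithmetic. However, it uses Math.floorMod on the raw hashCode() as a stand-in for the murmur hash Flink actually applies, so the concrete indices it produces will differ from Flink's.

```java
// Simplified model of keyed routing (not Flink's implementation):
// key -> key group -> parallel instance. The hash is a stand-in.
final class KeyGroupSketch {
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism); // stand-in for murmur hash
    }

    // Each instance owns a contiguous range of key groups
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int operatorIndexForKey(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

Key groups are the unit of assignment. As a result, maxParallelism caps how far the CEP operator can later be scaled out, and rescaling moves whole key groups, together with their partial-match state, between instances.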
State Management and Checkpointing
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
// Configure robust checkpointing
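// true enables incremental checkpoints: only RocksDB files created since the
// previous checkpoint are uploaded
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// Durable location reachable by all TaskManagers; use an S3 or HDFS URI in production
checkpointConfig.setCheckpointStorage("file:///tmp/flink-checkpoints");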
checkpointConfig.setMinPauseBetweenCheckpoints(30_000);
checkpointConfig.setCheckpointTimeout(120_000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setTolerableCheckpointFailureNumber(3);
// Retain checkpoints on cancellation (for savepoint-like recovery)
checkpointConfig.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
Event Time and Processing Time
The distinction between event time and processing time is of central importance for CEP. Event time is the moment at which the event actually occurred, as embedded in the event data. Processing time is the moment at which the Flink operator processes the event. Under ideal conditions, the two values would coincide. In practice, events arrive late, out of order, and at variable rates.
Why Event Time Matters for CEP
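Consider a fraud detection pattern defined as “three transactions within 5 minutes.” Suppose network congestion delays transaction #2 so that it reaches Flink after transaction #3. Processing time then sees the events in the wrong order. If the delay also pushes the arrival past the five-minute mark, processing time measures a gap that does not exist. Event time orders and measures the transactions by when they actually occurred, irrespective of when they arrived. In event-time mode, the CEP operator buffers incoming events and feeds them to the pattern in timestamp order once the watermark passes them.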
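The effect can be reproduced with a few lines of plain Java. This hypothetical sketch is not Flink code. It gives each transaction an event time and an arrival time in milliseconds, then compares the order and first-to-last span that each clock reports against a 5-minute window. The timestamps are invented for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.stream.Collectors;

// Hypothetical illustration (not Flink): the same transactions measured by when
// they occurred (event time) and when they arrived (processing time).
final class EventTimeSketch {
    record Tx(String id, long eventTimeMs, long arrivalTimeMs) {}

    static long clock(Tx tx, boolean useEventTime) {
        return useEventTime ? tx.eventTimeMs() : tx.arrivalTimeMs();
    }

    // Order in which an operator driven by the chosen clock would see the events
    static List<String> order(List<Tx> txs, boolean useEventTime) {
        return txs.stream()
                .sorted(Comparator.comparingLong((Tx tx) -> clock(tx, useEventTime)))
                .map(Tx::id)
                .collect(Collectors.toList());
    }

    // Does the first-to-last span, on the chosen clock, fit inside the window?
    static boolean fitsWindow(List<Tx> txs, long windowMs, boolean useEventTime) {
        LongSummaryStatistics span = txs.stream()
                .mapToLong(tx -> clock(tx, useEventTime))
                .summaryStatistics();
        return span.getMax() - span.getMin() <= windowMs;
    }
}
```

Take three transactions that occur at 0 s, 60 s and 290 s but arrive at 1 s, 320 s and 291 s:

- Event time sees tx1, tx2, tx3 spanning 290 seconds, which is inside the window.
- Arrival order is tx1, tx3, tx2 spanning 319 seconds, which is outside it.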
Watermark Strategies
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import java.time.Duration;
// Strategy 1: Bounded out-of-orderness (most common)
// Assumes events can arrive up to 5 seconds late
WatermarkStrategy<Transaction> strategy1 = WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());
// Strategy 2: Monotonous timestamps (events always in order)
// Only use if you can guarantee ordering
WatermarkStrategy<Transaction> strategy2 = WatermarkStrategy
.<Transaction>forMonotonousTimestamps()
.withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());
// Strategy 3: Custom watermark generator for complex scenarios
WatermarkStrategy<Transaction> strategy3 = WatermarkStrategy
.<Transaction>forGenerator(context -> new WatermarkGenerator<Transaction>() {
private static final long MAX_DELAY = 10_000L; // 10 seconds
// Start just above Long.MIN_VALUE so that (maxTimestamp - MAX_DELAY - 1)
// cannot underflow to a huge positive watermark before the first event
private long maxTimestamp = Long.MIN_VALUE + MAX_DELAY + 1;
@Override
public void onEvent(Transaction tx, long eventTimestamp,
WatermarkOutput output) {
// eventTimestamp is the value produced by the timestamp assigner below
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(
new org.apache.flink.api.common.eventtime.Watermark(
maxTimestamp - MAX_DELAY - 1
)
);
}
})
.withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());
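For most CEP workloads, forBoundedOutOfOrderness() is the appropriate choice. A bound of 5–10 seconds is a common starting point, adjusted to the lateness actually observed in the source. A bound that is too low causes late events to be dropped: the CEP operator discards events whose timestamps are at or below the current watermark, unless they are routed to a side output with sideOutputLateData(). A bound that is too high delays every match by the same amount, since the CEP operator emits a match only after the watermark passes the events it contains.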
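The trade-off can be modelled in plain Java. The sketch below is not Flink code. It mirrors the arithmetic of Flink's bounded-out-of-orderness generator (watermark = highest timestamp seen - bound - 1) and flags an event as late when its timestamp is at or below the current watermark. It simplifies one detail: Flink emits watermarks periodically (every 200 ms by default), whereas this model advances the watermark immediately after each event.

```java
// Plain-Java model of a bounded-out-of-orderness watermark (not Flink code).
final class WatermarkSketch {
    private final long boundMs;
    private long maxTimestamp;

    WatermarkSketch(long boundMs) {
        this.boundMs = boundMs;
        // Start high enough that the initial watermark is Long.MIN_VALUE, not an underflow
        this.maxTimestamp = Long.MIN_VALUE + boundMs + 1;
    }

    /** Returns true if the event is late relative to the watermark before it arrived. */
    boolean onEvent(long timestampMs) {
        boolean late = timestampMs <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
        return late;
    }

    long currentWatermark() {
        return maxTimestamp - boundMs - 1;
    }
}
```

Consider events stamped 10 s, 12 s and then 9 s. With a 2-second bound, the 9-second event is late because the watermark has already reached 9.999 s. With a 5-second bound, the watermark is only at 6.999 s, so the event is accepted, but every match is correspondingly delayed.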
Connecting to Real Data Sources
Kafka Source Connector
Most production CEP pipelines read from Apache Kafka. For a Python-focused treatment of Kafka consumer implementation, see the Apache Kafka consumer implementation guide in Python. A complete, production-ready Kafka source setup in Java is shown below:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import com.fasterxml.jackson.databind.ObjectMapper;
// Custom deserializer for Transaction events
public class TransactionDeserializer
implements DeserializationSchema<Transaction> {
private transient ObjectMapper mapper;
@Override
public Transaction deserialize(byte[] message) {
if (mapper == null) mapper = new ObjectMapper();
try {
return mapper.readValue(message, Transaction.class);
} catch (Exception e) {
// Log and skip malformed events
System.err.println("Failed to deserialize: " + new String(message));
return null;
}
}
@Override
public boolean isEndOfStream(Transaction nextElement) {
return false;
}
@Override
public TypeInformation<Transaction> getProducedType() {
return TypeInformation.of(Transaction.class);
}
}
// Build the Kafka source
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
.setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
.setTopics("transactions")
.setGroupId("fraud-detection-v2")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TransactionDeserializer())
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
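// Placeholder credentials: load real values from a secret store or the
// environment rather than committing them to source control
.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"api-key\" password=\"api-secret\";")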
.build();
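One way to keep the credentials out of source control is to assemble the sasl.jaas.config value at startup from environment variables injected by the deployment platform. The helper below is a hypothetical sketch. The variable names KAFKA_API_KEY and KAFKA_API_SECRET are assumptions. The environment is passed in as a Map so that the method can be tested without touching the real process environment.

```java
import java.util.Map;

// Hypothetical helper: builds a PLAIN-mechanism JAAS string from injected secrets.
final class KafkaSaslConfig {
    static String plainJaasConfig(Map<String, String> env) {
        String user = require(env, "KAFKA_API_KEY");
        String password = require(env, "KAFKA_API_SECRET");
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + escape(user) + "\" password=\"" + escape(password) + "\";";
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + name);
        }
        return value;
    }

    // Escape backslashes and double quotes so the quoted values parse correctly
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

In the source builder, the literal would then be replaced by `.setProperty("sasl.jaas.config", KafkaSaslConfig.plainJaasConfig(System.getenv()))`.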
Kafka Sink for Alerts
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
KafkaSink<String> alertSink = KafkaSink.<String>builder()
.setBootstrapServers("kafka-broker-1:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("fraud-alerts")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
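// Exactly-once requires checkpointing: alerts become visible to read_committed
// consumers only when the checkpoint that commits their transaction completes
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("fraud-alert-sink")
// Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default);
// the sink's own default of 1 hour would otherwise be rejected
.setProperty("transaction.timeout.ms", "900000")
.build();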
// Wire it up
allAlerts
.map(alert -> mapper.writeValueAsString(alert))
.sinkTo(alertSink);
JDBC Enrichment with Async I/O
It is often necessary to enrich events with data from a database, for example by looking up a customer’s risk score before CEP patterns are applied. Flink’s asynchronous I/O is well suited to this purpose:
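import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// Async enrichment function
public class CustomerEnrichment
extends RichAsyncFunction<Transaction, EnrichedTransaction> {
private transient DataSource dataSource;
// Dedicated pool: blocking JDBC calls should not run on the common ForkJoinPool
private transient ExecutorService executor;
@Override
public void open(Configuration parameters) {
// createConnectionPool() is a placeholder for your pool setup (e.g. HikariCP)
dataSource = createConnectionPool();
executor = Executors.newFixedThreadPool(20);
}
@Override
public void close() {
if (executor != null) executor.shutdown();
}
@Override
public void asyncInvoke(Transaction tx,
ResultFuture<EnrichedTransaction> resultFuture) {
CompletableFuture.supplyAsync(() -> {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT risk_score, account_age FROM customers WHERE id = ?")) {
stmt.setString(1, tx.getUserId());
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
return new EnrichedTransaction(tx,
rs.getDouble("risk_score"),
rs.getInt("account_age"));
}
}
// Unknown customer: fall back to neutral defaults
return new EnrichedTransaction(tx, 0.5, 0);
} catch (Exception e) {
// Lookup failed: use the same defaults rather than failing the job
return new EnrichedTransaction(tx, 0.5, 0);
}
}, executor).thenAccept(result -> resultFuture.complete(
Collections.singleton(result)));
}
@Override
public void timeout(Transaction tx,
ResultFuture<EnrichedTransaction> resultFuture) {
// The default timeout() completes exceptionally, which fails the job
resultFuture.complete(Collections.singleton(new EnrichedTransaction(tx, 0.5, 0)));
}
}
// Apply async enrichment before CEP
DataStream<EnrichedTransaction> enriched = AsyncDataStream
.unorderedWait(
transactionStream,
new CustomerEnrichment(),
30, TimeUnit.SECONDS, // timeout
100 // max concurrent requests
);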
Flink also supports connectors for Apache Pulsar, Amazon Kinesis, and many other systems through its connector ecosystem. The setup is broadly similar: define a source, assign watermarks, and feed the stream into the CEP patterns.
Deploying and Monitoring
Running Locally for Development
The simplest development workflow is to run the job directly within an IDE. Flink will then create a local mini-cluster automatically:
// This works out of the box in your IDE
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Flink automatically creates a local mini-cluster
Docker Compose for Local Flink and Kafka
For integration testing, the following Docker Compose configuration provides a local Flink and Kafka environment:
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.5.3
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for containers such as the Flink TaskManagers,
      # localhost:9092 for clients on the host such as a job run from the IDE
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  flink-jobmanager:
    image: flink:1.18.1-java17
    ports:
      - "8081:8081" # Flink Web UI
    command: jobmanager
    # For recovery, file:// checkpoint paths must be on a volume shared by all Flink containers
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        state.backend: rocksdb
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
  flink-taskmanager:
    image: flink:1.18.1-java17
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 2 # Run 2 task managers
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 4
        taskmanager.memory.process.size: 2048m
Deploying to a Flink Cluster
The fat JAR should be built and submitted to the cluster:
# Build the fat JAR
mvn clean package -DskipTests
# Submit to standalone cluster
./bin/flink run \
-c com.example.cep.FraudDetectionPipeline \
target/flink-cep-pipeline-1.0.0.jar
# Submit to YARN in application mode; the number of TaskManagers is derived
# from the job's parallelism and the slots per TaskManager
./bin/flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=2048m \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=8 \
  -c com.example.cep.FraudDetectionPipeline \
  target/flink-cep-pipeline-1.0.0.jar
# Submit to Kubernetes (using Flink Kubernetes Operator)
kubectl apply -f flink-cep-deployment.yaml
Monitoring the Pipeline
The Flink Web UI (default port 8081) is the primary monitoring interface. The most important metrics are summarised below:
- Checkpoint Duration: If checkpoints take longer than the configured interval, cascading delays appear. Checkpoint duration should be kept below 50% of the checkpoint interval.
- Backpressure: When a downstream operator cannot keep pace, backpressure propagates upstream. The Web UI indicates this with colour-coded task states, where red signals a problem.
- Throughput (records/second): Input and output rates for each operator should be monitored. A sudden drop in output rate with constant input suggests a processing bottleneck.
- State Size: CEP patterns maintain partial match buffers. State size should be observed over time, since unbounded growth indicates a pattern or key-space problem.
Performance Optimisation
Making a CEP pipeline functional is one matter; making it handle production volumes efficiently is another. The principal tuning levers are described below.
Choosing the Right Parallelism
Parallelism controls the number of parallel instances of each operator that Flink runs. For CEP pipelines, the following guidelines apply:
- Source parallelism: Should match the number of Kafka partitions. If the topic has 16 partitions, source parallelism should be set to 16; any source instances beyond the partition count receive no data.
- CEP operator parallelism: Depends on key cardinality and pattern complexity. A reasonable starting point is the same parallelism as the source, with subsequent increases if backpressure appears on the CEP operator.
- Sink parallelism: Typically lower than CEP parallelism because alert volume is substantially lower than input volume.
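These guidelines translate into a back-of-the-envelope cluster size. The hypothetical helper below assumes Flink's default slot sharing, under which a job needs as many task slots as its highest operator parallelism. It then derives the number of TaskManagers from the slots each one offers. For example, a 16-partition topic with CEP parallelism 16 and 4 slots per TaskManager needs 4 TaskManagers.

```java
// Hypothetical sizing helper; assumes default slot sharing, where one slot can
// host one parallel instance of every operator in the job.
final class ParallelismSizing {
    static int slotsNeeded(int sourceParallelism, int cepParallelism, int sinkParallelism) {
        return Math.max(sourceParallelism, Math.max(cepParallelism, sinkParallelism));
    }

    static int taskManagersNeeded(int slots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        // Ceiling division: a partly used TaskManager still has to be started
        return (slots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```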
State Backend Selection
| State Backend | State Size | Speed | Best For |
|---|---|---|---|
| HashMapStateBackend (Heap) | Limited by JVM heap | Fastest | Small state, low latency requirements |
| EmbeddedRocksDBStateBackend | Limited by disk | Slower (disk I/O) | Large state, long time windows |
For CEP workloads specifically, the heap state backend is adequate when patterns have short time windows (seconds to minutes) and moderate key cardinality. For long time windows on the order of hours, or millions of keys with active partial matches, RocksDB is the safer option.
Recommended Settings by Use Case
| Setting | Fraud Detection | IoT Monitoring | Market Data |
|---|---|---|---|
| Parallelism | 8–32 | 4–16 | 16–64 |
| Checkpoint Interval | 60s | 30s | 10s |
| State Backend | RocksDB | Heap or RocksDB | Heap |
| Watermark Bound | 5s | 3s | 1s |
| TaskManager Memory | 4–8 GB | 2–4 GB | 8–16 GB |
| Serialization | Avro or Protobuf | Avro | Protobuf (smallest size) |
Serialisation Considerations
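Flink does not use Java serialisation for records or state. It analyses each type and applies its own efficient serialisers to POJOs, tuples, and primitives. Any type it cannot analyse falls back to the general-purpose Kryo serialiser, which is slower and produces larger state snapshots. For production CEP pipelines, event types should therefore be valid Flink POJOs or use an efficient schema-based format: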
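// Option 1: Protobuf-generated classes only. ProtobufSerializer comes from the
// com.twitter:chill-protobuf dependency and does not apply to plain POJOs
env.getConfig().registerTypeWithKryoSerializer(
Transaction.class, ProtobufSerializer.class);
// Option 2: Flink's POJO serialization (automatic for well-formed POJOs)
// Ensure your classes:
// 1. Are public and standalone (nested classes must be static)
// 2. Have a public no-arg constructor
// 3. Make every non-static, non-transient field public or expose it via a public getter and setter
// java.io.Serializable is not required by the POJO serializer
// To make Flink throw instead of silently falling back to Kryo:
// env.getConfig().disableGenericTypes();
// Option 3: Avro serialization with Flink's Avro format
// Add dependency: flink-avro
// Then use AvroDeserializationSchema (forSpecific() expects an Avro-generated
// SpecificRecord class, so Transaction would be generated from an .avsc schema):
import org.apache.flink.formats.avro.AvroDeserializationSchema;
KafkaSource<Transaction> avroSource = KafkaSource.<Transaction>builder()
.setBootstrapServers("localhost:9092")
.setTopics("transactions-avro")
.setGroupId("fraud-detection")
.setValueOnlyDeserializer(
AvroDeserializationSchema.forSpecific(Transaction.class))
.build();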
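A unit test that checks event classes against the POJO rules catches Kryo fallbacks before deployment. The helper below is a rough, reflection-based approximation written for illustration. It is not Flink's own type analysis, and Flink's TypeExtractor remains the authority. It reports which rule a class appears to break, and an empty list means the class likely qualifies.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Rough approximation of Flink's POJO rules (not Flink's TypeExtractor).
final class PojoRulesCheck {
    static List<String> problems(Class<?> type) {
        List<String> problems = new ArrayList<>();
        int mods = type.getModifiers();
        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        if (type.isMemberClass() && !Modifier.isStatic(mods)) {
            problems.add("nested class is not static");
        }
        try {
            if (!Modifier.isPublic(type.getDeclaredConstructor().getModifiers())) {
                problems.add("no-arg constructor is not public");
            }
        } catch (NoSuchMethodException e) {
            problems.add("no no-arg constructor");
        }
        // Walk the hierarchy: inherited fields are serialized too
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                    continue; // not part of the serialized state
                }
                if (Modifier.isPublic(fm) && !Modifier.isFinal(fm)) {
                    continue; // directly accessible
                }
                if (!hasAccessors(type, f)) {
                    problems.add("field '" + f.getName() + "' needs a public getter and setter");
                }
            }
        }
        return problems;
    }

    private static boolean hasAccessors(Class<?> type, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = hasPublicMethod(type, "get" + suffix)
                || (f.getType() == boolean.class && hasPublicMethod(type, "is" + suffix));
        boolean setter = hasPublicMethod(type, "set" + suffix, f.getType());
        return getter && setter;
    }

    // Class.getMethod only finds public methods (including inherited ones)
    private static boolean hasPublicMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

In a test suite, this would typically be called as `assertTrue(PojoRulesCheck.problems(Transaction.class).isEmpty())` for each event type.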
Common Pitfalls and Troubleshooting
The most frequently encountered issues are summarised below:
| Problem | Cause | Solution |
|---|---|---|
| Pattern never matches | Events arrive out of order; within() window too tight; using next() when followedBy() is needed | Check event ordering, increase time window, switch contiguity mode |
| Too many matches (false positives) | Pattern conditions too loose; using followedByAny() generating combinatorial explosion | Add tighter conditions, switch to followedBy(), shorten time window |
| OutOfMemoryError | Large NFA state from long time windows, high key cardinality, or followedByAny() with oneOrMore() | Switch to RocksDB state backend, shorten time windows, add until() conditions |
| Checkpoint failures | State too large to snapshot within timeout; backpressure causing delays | Increase checkpoint timeout, enable incremental checkpointing with RocksDB, reduce state size |
| Watermark stalling (no progress) | One Kafka partition has no data; its watermark stays at Long.MIN_VALUE, blocking the global watermark | Use withIdleness(Duration.ofMinutes(1)) on the watermark strategy |
| Duplicate alerts after restart | Reprocessing events without checkpointed state | Always restart from savepoint/checkpoint, enable exactly-once on sinks |
| ClassNotFoundException at runtime | flink-cep not in the fat JAR; marked as provided by mistake | Ensure flink-cep is not marked as provided; only flink-streaming-java and flink-clients should be |
Fixing Watermark Stalling
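Watermark stalling is among the most difficult issues to diagnose. If a single Kafka partition stops receiving events, its watermark stops advancing; a partition that has never produced data stays at Long.MIN_VALUE. Because a source's watermark is the minimum across its partitions, the global watermark for the entire job is held back. The remedy is to declare an idle timeout, so that quiet partitions are temporarily excluded from that minimum: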
WatermarkStrategy<Transaction> strategy = WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((tx, ts) -> tx.getTimestamp())
.withIdleness(Duration.ofMinutes(1)); // Mark source as idle after 1 min
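The mechanism can be modelled in a few lines of plain Java. This hypothetical sketch is not Flink's implementation. It combines per-partition watermarks the way a source does, by taking the minimum, and shows how marking a silent partition idle lets the combined watermark advance. When every partition is idle, there is no watermark to emit.

```java
import java.util.Collection;
import java.util.OptionalLong;

// Hypothetical model of watermark combination across partitions (not Flink code).
final class IdlenessSketch {
    record PartitionState(long watermark, boolean idle) {}

    static OptionalLong combinedWatermark(Collection<PartitionState> partitions) {
        return partitions.stream()
                .filter(p -> !p.idle())                // idle partitions are ignored
                .mapToLong(PartitionState::watermark)
                .min();                                // the slowest active partition wins
    }
}
```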
Debugging Pattern Matches
When patterns do not match as expected, a pass-through map() can be inserted before the CEP operator to verify that events are flowing and correctly keyed:
// Debug: print events as they enter the CEP operator
transactions
.map(tx -> {
System.out.println("CEP INPUT: " + tx);
return tx;
})
.keyBy(Transaction::getUserId);
// Also: check that your conditions actually match
// by testing them in a unit test
@Test
public void testFraudCondition() {
Transaction tx = new Transaction("1", "user1", 600.0,
System.currentTimeMillis(), "NYC", "electronics", "1234");
assertTrue(tx.getAmount() > 500.0); // Verify condition logic
}
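The test above checks a literal comparison rather than the condition the pattern actually uses. A more useful variant keeps each condition's decision logic in a plain static method that the CEP condition delegates to. The exact predicate the pattern uses can then be tested without a Flink runtime. The class name FraudConditions is hypothetical, and the thresholds in the test mirror FraudRuleConfig's defaults.

```java
// Hypothetical holder for condition logic shared by the pattern and its tests.
// Inside the pattern, a condition would delegate to it, e.g.
//   .where(new SimpleCondition<Transaction>() {
//       public boolean filter(Transaction tx) {
//           return FraudConditions.isTestCharge(tx.getAmount(), 0.01, 5.0);
//       }
//   })
final class FraudConditions {
    private FraudConditions() {}

    // Small "card-testing" charge: inside the inclusive [min, max] range
    static boolean isTestCharge(double amount, double minTestAmount, double maxTestAmount) {
        return amount >= minTestAmount && amount <= maxTestAmount;
    }

    // Large follow-up charge at or above the threshold
    static boolean isLargeCharge(double amount, double largeTransactionThreshold) {
        return amount >= largeTransactionThreshold;
    }
}
```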
Final Thoughts
Complex Event Processing with Apache Flink supports the detection of sophisticated patterns across millions of events per second with millisecond latency and exactly-once guarantees. The present guide has covered considerable ground, from the fundamentals of CEP and the Flink pattern API to three complete, production-style pipelines for fraud detection, IoT monitoring, and financial market analysis.
The principal lessons may be summarised as follows:
- Select the appropriate contiguity: next() for strict sequences, followedBy() for relaxed matching, and followedByAny() sparingly, given its computational cost.
- Always use event time with appropriate watermark strategies. Processing time produces incorrect pattern matches in any real-world system where events arrive out of order.
- Key the streams: CEP patterns should almost always be applied to keyed streams so that matches remain scoped to a logical entity such as a user, sensor, or stock symbol.
- Handle timeouts: Implementing TimedOutPartialMatchHandler allows partial matches that do not complete within the time window to be captured and analysed.
- Monitor state size: CEP is inherently stateful. RocksDB is recommended for large state, time windows should remain as short as possible, and combinatorial explosion in non-deterministic patterns should be monitored.
- Start simple and iterate: An initial implementation should begin with a single pattern on a small data sample, verified for correctness before complexity or scale are increased.
Flink’s CEP library is among the most capable pattern-matching engines in the open-source ecosystem. The patterns and techniques presented here provide the foundation required to build a first production CEP pipeline. For reproducible deployment of Flink applications, containerisation with Docker simplifies both local development and production rollout. The fraud detection example offers a suitable starting point that can be adapted to the target domain and scaled accordingly.
References
- Apache Flink CEP Documentation (v1.18)
- Flink Event Time and Watermarks Guide
- Flink Kafka Connector Documentation
- Flink State Backends Reference
- Flink Docker Deployment Guide
- Stream Processing with Apache Flink (O’Reilly) by Fabian Hueske and Vasiliki Kalavri
- Apache Flink: Stream and Batch Processing in a Single Engine—original Flink research paper