
Complex Event Processing with Apache Flink: Building Real-Time CEP Pipelines from Scratch

Published April 5, 2026 · Updated May 27, 2026 · 43 min read

Summary

What this post covers: A production-style guide to building Complex Event Processing pipelines with Apache Flink, including the Pattern API, three end-to-end Java examples (credit card fraud, IoT anomaly, stock pattern detection), event-time handling, Kafka connectors, deployment, and performance tuning.

Key insights:

  • CEP is fundamentally different from batch or per-event stream processing: it maintains stateful NFA pattern buffers across event sequences, which is why batch jobs and Kafka Streams cannot replace it for fraud detection or multi-step anomaly correlation.
  • Pattern contiguity choice dominates correctness and cost: use next() for strict sequences, followedBy() for relaxed matching, and avoid followedByAny() except when truly needed because it triggers combinatorial state growth.
  • Always drive CEP on event time with proper watermark strategies—processing time produces incorrect matches in any real system where events arrive out of order, and this single mistake breaks more production CEP jobs than any other.
  • Apply patterns to keyed streams so matches stay scoped to a logical entity (user, sensor, symbol); patterns on non-keyed streams quickly explode in state size and produce nonsensical cross-entity matches.
  • CEP is inherently stateful, so production readiness depends on RocksDB state backend, short time windows, TimedOutPartialMatchHandler to catch incomplete sequences, and active monitoring of state size to prevent runaway memory growth.

Main topics: What is Complex Event Processing (CEP)?, Why Apache Flink for CEP?, Setting Up Your Flink CEP Project, Understanding Flink CEP Pattern API, Hands-On Credit Card Fraud Detection, Hands-On IoT Sensor Anomaly Detection, Hands-On Stock Market Pattern Detection, Advanced CEP Techniques, Event Time vs Processing Time, Connecting to Real Data Sources, Deploying and Monitoring, Performance Optimization, Common Pitfalls and Troubleshooting, Final Thoughts, References.

Consider a scenario in which a single credit card is used at a gas station in Houston at 2:13 PM, and forty seconds later the same card number appears at an electronics store in Tokyo. Within those forty seconds, a payment-fraud system must ingest both events, correlate them across millions of concurrent transaction streams, recognise the physical impossibility, and emit a fraud alert before the Tokyo merchant finishes printing the receipt. The scenario is far from hypothetical. Visa states that its network can handle more than 65,000 transaction messages per second, and fraudsters act faster every year. Batch jobs that run overnight detect this kind of fraud hours too late. Complex Event Processing is required, and Apache Flink is among the strongest engines on which to implement it.

This guide presents the construction of real-time CEP pipelines from first principles. Rather than illustrative fragments, it provides complete, compilable Java code suitable for adaptation to production fraud detection, IoT monitoring, and financial market analysis. By the end of the guide, the reader will understand Flink’s CEP library in sufficient depth to design pattern-matching pipelines for any domain.

What is Complex Event Processing (CEP)?

Complex Event Processing is a methodology for detecting meaningful patterns across streams of events in real time. The defining term is patterns. Simple stream processing typically filters or transforms individual events, for example by returning all transactions above $1,000. CEP extends this scope by examining sequences, combinations, and temporal relationships across multiple events.

Simple Events vs Complex Events

A simple event is a single, atomic occurrence such as a temperature reading, a stock trade, or a log entry. A complex event is a higher-level pattern derived from multiple simple events. For example:

  • Simple event: “User #4821 made a $50 purchase at Starbucks.”
  • Complex event: “User #4821 made three purchases totalling over $2,000 within five minutes from three different countries.” This complex event exists only because a CEP engine recognised the pattern across the underlying simple events.
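The step from simple events to a complex event can be sketched in plain Java, without any CEP engine. The Purchase class, its fields, and the isSuspicious helper are hypothetical names introduced only for this illustration. The thresholds come from the example above; treating "within five minutes" as an inclusive bound, and checking only runs of three consecutive purchases by one user, are simplifying assumptions.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simple event: a single purchase by one user. */
final class Purchase {
    final double amount;
    final String country;
    final long timestampMs;

    Purchase(double amount, String country, long timestampMs) {
        this.amount = amount;
        this.country = country;
        this.timestampMs = timestampMs;
    }
}

final class ComplexEventSketch {
    static final long FIVE_MINUTES_MS = 5 * 60 * 1000L;

    /**
     * Returns true if any three consecutive purchases (input sorted by time)
     * span at most five minutes, total more than $2,000, and come from
     * three different countries.
     */
    static boolean isSuspicious(List<Purchase> sortedByTime) {
        for (int i = 0; i + 2 < sortedByTime.size(); i++) {
            Purchase first = sortedByTime.get(i);
            Purchase third = sortedByTime.get(i + 2);
            double total = 0.0;
            Set<String> countries = new HashSet<>();
            for (Purchase p : sortedByTime.subList(i, i + 3)) {
                total += p.amount;
                countries.add(p.country);
            }
            boolean inTime = third.timestampMs - first.timestampMs <= FIVE_MINUTES_MS;
            if (inTime && total > 2000.0 && countries.size() == 3) {
                return true;
            }
        }
        return false;
    }
}
```

No single Purchase is suspicious on its own. The result exists only once the three events are examined together, which is the work a CEP engine does continuously and incrementally over an unbounded stream.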

CEP Compared with Traditional Processing

Understanding where CEP fits relative to batch and stream processing is important:

Feature | Batch Processing | Stream Processing | CEP
Latency | Minutes to hours | Milliseconds to seconds | Milliseconds to seconds
Data Model | Bounded datasets | Unbounded streams | Unbounded streams with pattern state
Pattern Detection | Post-hoc analysis | Per-event transformations | Multi-event temporal patterns
State Management | Minimal (reprocess from scratch) | Windowed aggregations | Pattern match buffers with NFA
Use Case Example | Monthly reports | Real-time dashboards | Fraud detection, anomaly sequences
Tools | Spark, Hadoop MapReduce | Kafka Streams, Flink DataStream | Flink CEP, Esper, Siddhi


Real-World CEP Applications

CEP is not a niche technology. It underpins a number of important systems across industries:

  • Fraud Detection: Banks and payment processors use CEP to identify fraudulent transaction patterns in real time, including velocity checks, geographic impossibility, and unusual merchant categories.
  • IoT Monitoring: Manufacturing plants and smart buildings use CEP to detect equipment failure sequences before catastrophic breakdowns occur. For the data infrastructure behind IoT monitoring, see the guide on managing metadata and time-series data for facility sensor signals.
  • Algorithmic Trading: Hedge funds detect price-volume patterns across multiple securities within microsecond windows in order to trigger automated trades.
  • Network Security: SIEM platforms use CEP to correlate firewall logs, authentication events, and data transfer patterns and thereby detect multi-stage cyberattacks.
  • Supply Chain: Real-time tracking of shipment events allows operators to detect delays, rerouting needs, or customs anomalies before they cascade.

[Figure: CEP pipeline from raw events to actionable alerts. Stages: (1) Ingest from an event source (Kafka / Kinesis / API); (2) Stream through Flink ingestion (parse, key, watermark); (3) Match in pattern detection (NFA state machine); (4) React with an alert or action (sink, notify, block). End-to-end latency: milliseconds.]

Why Apache Flink for CEP?

Several stream processing engines exist, but Flink occupies a distinct position for CEP workloads. The reasons are discussed below.

Flink was designed as a streaming-first engine. Unlike Spark, which added streaming capabilities to a batch framework, Flink treats streams as the fundamental data model. The distinction is consequential for CEP for several reasons:

  • DataStream API: The core API operates on unbounded streams and offers fine-grained control over event processing, keying, and windowing.
  • Event Time Processing: Flink natively supports event time semantics with watermarks, a feature that is essential for CEP. Matching patterns across events requires reasoning about when events actually occurred, not when they arrived at the processing system.
  • Watermarks: The watermark mechanism tracks the progress of event time through the stream and enables correct handling of out-of-order events, which are a routine occurrence in distributed systems.
  • Flink CEP Library (flink-cep): Flink ships a dedicated CEP library that implements a Non-deterministic Finite Automaton (NFA) for pattern matching. Patterns are defined declaratively, and the engine handles the associated state management internally.
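  • Exactly-Once Semantics: The checkpointing mechanism provides exactly-once state consistency, so pattern-matching state is restored after a failure without lost or double-counted events. Delivering each fraud alert exactly once end to end additionally requires a transactional or idempotent sink.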
  • Low Latency: Flink processes events within milliseconds rather than in micro-batches. For CEP workloads, where rapid pattern matching is essential, this property is non-negotiable.
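The watermark bullet above can be made concrete with a small model. The class below is a plain-Java sketch, not Flink's implementation: BoundedWatermarkSketch and its method names are hypothetical, the watermark is computed on demand rather than emitted periodically, and the rule it assumes is the bounded-out-of-orderness one, where the watermark equals the largest timestamp seen, minus the allowed delay, minus one millisecond.

```java
/** Simplified model of a bounded-out-of-orderness watermark (illustrative only). */
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Record an event's timestamp; out-of-order events never move the maximum back. */
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** Event time is assumed to have progressed at least this far. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }

    /** An event at or below the current watermark is treated as late. */
    boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }
}
```

With a 5-second bound, an event stamped 10,000 ms yields a watermark of 4,999 ms, so an event stamped 7,000 ms that arrives afterwards is still accepted. Once an event stamped 20,000 ms has been seen, the watermark is 14,999 ms and an event stamped 12,000 ms counts as late.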

[Figure: Apache Flink cluster architecture. A JobManager (scheduler, checkpoints, recovery) coordinates three TaskManagers, each a JVM with task slots. Data flow, partitioned by key: Source (Kafka) → CEP pattern operator (NFA) → Sink (alerts).]

Feature | Flink CEP | Kafka Streams | Esper | Spark Structured Streaming | Kinesis Analytics
Pattern Matching | Built-in NFA-based | Manual (no CEP library) | EPL query language | No native CEP | SQL-based only
Latency | True streaming (ms) | True streaming (ms) | In-memory (ms) | Micro-batch (100ms+) | Near real-time
Scalability | Distributed cluster | Embedded scaling | Single JVM | Distributed cluster | AWS managed
Exactly-Once | Yes | Yes | No | Yes | Yes
Fault Tolerance | Checkpointing + savepoints | Changelog topics | Limited | Checkpointing | Managed snapshots
Event Time Support | Native watermarks | Timestamp extractors | Limited | Native watermarks | Limited
Best For | Complex temporal patterns at scale | Simple event-driven microservices | Prototyping, embedded CEP | Batch + streaming hybrid | AWS-native SQL analytics


Key Takeaway: For workloads that require detection of complex temporal patterns across high-volume event streams with exactly-once guarantees, Flink CEP is the strongest choice. Kafka Streams is well suited to simpler event-driven architectures but lacks a built-in pattern matching engine. Esper offers strong CEP semantics yet does not scale horizontally. For a more detailed treatment of Kafka as the event backbone, see the Apache Kafka multivariate time-series engine guide.

Setting Up Your Flink CEP Project

Prerequisites

Before any code is written, the following components should be in place:

  • Java 11 or 17 (Flink 1.18+ supports both; Java 17 is recommended for new projects)
  • Maven 3.8+ or Gradle 7+
  • An IDE—IntelliJ IDEA with the Flink plugin is well suited
  • Docker (optional, for running Kafka and Flink locally)

Project Structure

The following layout is used throughout this guide:

flink-cep-pipeline/
├── pom.xml
├── src/main/java/com/example/cep/
│   ├── FlinkCEPApplication.java
│   ├── events/
│   │   ├── Transaction.java
│   │   ├── SensorReading.java
│   │   └── StockTick.java
│   ├── patterns/
│   │   ├── FraudPatterns.java
│   │   ├── IoTPatterns.java
│   │   └── StockPatterns.java
│   ├── processors/
│   │   ├── FraudAlertProcessor.java
│   │   ├── AnomalyAlertProcessor.java
│   │   └── TradingSignalProcessor.java
│   └── sources/
│       └── KafkaSourceBuilder.java
└── src/main/resources/
    └── log4j2.properties

Maven pom.xml

The following Maven configuration contains all required Flink CEP dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-cep-pipeline</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.18.1</flink.version>
        <java.version>17</java.version>
        <kafka.version>3.6.1</kafka.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Flink Core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink CEP Library -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Kafka Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>

        <!-- Flink JSON Format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Clients (for local execution) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Jackson for JSON serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.16.1</version>
        </dependency>

        <!-- SLF4J + Log4j2 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.22.1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.22.1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.22.1</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <transformers>
                                <transformer implementation=
                                    "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.cep.FlinkCEPApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Gradle Alternative

For Gradle users, the equivalent build.gradle.kts is shown below:

plugins {
    java
    id("com.github.johnrengelman.shadow") version "8.1.1"
}

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

val flinkVersion = "1.18.1"

dependencies {
    compileOnly("org.apache.flink:flink-streaming-java:$flinkVersion")
    compileOnly("org.apache.flink:flink-clients:$flinkVersion")
    implementation("org.apache.flink:flink-cep:$flinkVersion")
    implementation("org.apache.flink:flink-connector-kafka:3.1.0-1.18")
    implementation("org.apache.flink:flink-json:$flinkVersion")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
    runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.22.1")
    runtimeOnly("org.apache.logging.log4j:log4j-core:2.22.1")
}
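
Tip: The flink-streaming-java and flink-clients dependencies are marked as provided (Maven) or compileOnly (Gradle) because the Flink cluster already includes them. When the job is run locally from an IDE, they must be put back on the run configuration’s classpath. For Maven projects in IntelliJ IDEA, the run configuration offers an option to add dependencies with “provided” scope to the classpath.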

Understanding Flink CEP Pattern API

The Flink CEP library provides a declarative API for defining event patterns. Internally, the library compiles each pattern definition into a Non-deterministic Finite Automaton (NFA) that matches patterns efficiently against the incoming event stream. Each major concept is examined in turn below.

[Figure: Sequence detection on an event stream. Along the time axis, three login_fail events (E1, E2, E3) are interleaved with non-matching events. The third login_fail within the window completes the pattern and fires an alert.]

Pattern Basics

Every pattern starts with Pattern.begin() and chains additional states:

// Strict contiguity: events must be directly adjacent
Pattern<Event, ?> strict = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login_failed");
        }
    })
    .next("second")  // MUST be the very next event
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login_failed");
        }
    })
    .next("third")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login_failed");
        }
    });

// Relaxed contiguity: allows non-matching events in between
Pattern<Event, ?> relaxed = Pattern.<Event>begin("start")
    .where(/* ... */)
    .followedBy("end")  // matching events can have other events between them
    .where(/* ... */);

// Non-deterministic relaxed contiguity:
// matches all possible combinations
Pattern<Event, ?> nonDeterministic = Pattern.<Event>begin("start")
    .where(/* ... */)
    .followedByAny("end")  // considers ALL matching events, not just first
    .where(/* ... */);

Contiguity: Strict, Relaxed, Non-Deterministic

Contiguity is one of the most important concepts in Flink CEP. Consider a scenario in which the event stream contains A, C, B1, B2 and the pattern is “A followed by B”:

  • next()—Strict: No match. C appears between A and B1, which breaks strict contiguity.
  • followedBy()—Relaxed: Matches {A, B1}. C is skipped, and the first matching B is selected.
  • followedByAny()—Non-deterministic relaxed: Matches both {A, B1} and {A, B2}, since all possible matching events are considered.
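The three outcomes above can be reproduced with a small plain-Java model. This is an illustrative sketch of the semantics for a two-step pattern only, not how Flink's NFA is implemented: ContiguitySketch, Mode, and match are hypothetical names, events are plain strings, and an event counts as an "A" or a "B" simply by its first letter.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the three contiguity modes for the pattern "A then B". */
final class ContiguitySketch {
    enum Mode { NEXT, FOLLOWED_BY, FOLLOWED_BY_ANY }

    /** Returns each match as "A-event B-event", in the order the matches complete. */
    static List<String> match(List<String> stream, Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < stream.size(); i++) {
            if (!stream.get(i).startsWith("A")) {
                continue; // only an A can open a partial match
            }
            for (int j = i + 1; j < stream.size(); j++) {
                if (stream.get(j).startsWith("B")) {
                    matches.add(stream.get(i) + " " + stream.get(j));
                    if (mode != Mode.FOLLOWED_BY_ANY) {
                        break; // strict and relaxed modes stop at the first B
                    }
                } else if (mode == Mode.NEXT) {
                    break; // strict: a non-matching event discards the partial match
                }
                // relaxed modes skip non-matching events and keep waiting
            }
        }
        return matches;
    }
}
```

For the stream A, C, B1, B2 the model returns no match for NEXT, the single match "A B1" for FOLLOWED_BY, and both "A B1" and "A B2" for FOLLOWED_BY_ANY. The inner loop that keeps running after a match in the last mode is the source of the combinatorial state growth: every additional B extends every open partial match.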

Quantifiers

// Exactly N times
Pattern<Event, ?> exactly3 = Pattern.<Event>begin("failures")
    .where(condition)
    .times(3);  // exactly 3 matching events

// N or more times
Pattern<Event, ?> atLeast3 = Pattern.<Event>begin("failures")
    .where(condition)
    .timesOrMore(3);  // 3 or more matching events

// Range
Pattern<Event, ?> range = Pattern.<Event>begin("failures")
    .where(condition)
    .times(2, 5);  // between 2 and 5 matching events

// One or more (greedy)
Pattern<Event, ?> oneOrMore = Pattern.<Event>begin("failures")
    .where(condition)
    .oneOrMore()
    .greedy();  // match as many as possible

// Optional
Pattern<Event, ?> withOptional = Pattern.<Event>begin("start")
    .where(startCondition)
    .next("middle")
    .where(middleCondition)
    .optional()  // this state may or may not match
    .next("end")
    .where(endCondition);

Conditions

// Simple condition — checks current event only
.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getAmount() > 1000.0;
    }
})

// Iterative condition — can reference previously matched events
.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event event, Context<Event> ctx) throws Exception {
        // Compare with previously matched event
        for (Event prev : ctx.getEventsForPattern("start")) {
            if (!event.getLocation().equals(prev.getLocation())) {
                return true;  // different location than start event
            }
        }
        return false;
    }
})

// OR condition
.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getType().equals("withdrawal");
    }
})
.or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getType().equals("transfer");
    }
})

// Until condition (stop condition for looping patterns)
.oneOrMore()
.until(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getType().equals("logout");
    }
})

Time Constraints

// The entire pattern must complete within 5 minutes
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("first")
    .where(/* ... */)
    .followedBy("second")
    .where(/* ... */)
    .followedBy("third")
    .where(/* ... */)
    .within(Time.minutes(5));

Caution: The within() constraint applies to the entire pattern and is measured from the first matching event. If the first event matches at T=0 and within(Time.minutes(5)) is configured, the entire pattern must complete before T=5min. Partially matched patterns that time out are discarded, although they may be captured via timeout handling, which is discussed later.
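The timing rule in the caution can be written down as a one-line check. The sketch below is plain Java rather than Flink code; WithinSketch and completesWithin are hypothetical names, and treating the deadline as exclusive (the last event must arrive strictly before the first event's timestamp plus the window) is an assumption that follows the wording "before T=5min".

```java
import java.time.Duration;

/** Toy model of the within() deadline, measured from the first matched event. */
final class WithinSketch {
    /** True if the last event falls strictly before firstEventMs + window. */
    static boolean completesWithin(long firstEventMs, long lastEventMs, Duration window) {
        return lastEventMs - firstEventMs < window.toMillis();
    }
}
```

With a five-minute window and a first event at T=0, a final event at 299,999 ms completes the pattern, while one at 300,000 ms arrives too late and the partial match is dropped. The deadline does not move when intermediate events match, because it is anchored to the first event only.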

Hands-On: Credit Card Fraud Detection Pipeline

The first complete CEP pipeline considered here is a credit card fraud detection system. The use case is canonical for CEP, and three distinct fraud patterns are implemented.

The Transaction Event Class

package com.example.cep.events;

public class Transaction implements java.io.Serializable {
    private String transactionId;
    private String userId;
    private double amount;
    private long timestamp;
    private String location;
    private String merchantCategory;
    private String cardNumber;

    // Default constructor for serialization
    public Transaction() {}

    public Transaction(String transactionId, String userId, double amount,
                       long timestamp, String location, String merchantCategory,
                       String cardNumber) {
        this.transactionId = transactionId;
        this.userId = userId;
        this.amount = amount;
        this.timestamp = timestamp;
        this.location = location;
        this.merchantCategory = merchantCategory;
        this.cardNumber = cardNumber;
    }

    // Getters and setters
    public String getTransactionId() { return transactionId; }
    public void setTransactionId(String transactionId) { this.transactionId = transactionId; }
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }
    public String getMerchantCategory() { return merchantCategory; }
    public void setMerchantCategory(String mc) { this.merchantCategory = mc; }
    public String getCardNumber() { return cardNumber; }
    public void setCardNumber(String cardNumber) { this.cardNumber = cardNumber; }

    @Override
    public String toString() {
        return String.format("Transaction{id=%s, user=%s, amount=%.2f, loc=%s, time=%d}",
            transactionId, userId, amount, location, timestamp);
    }
}

The Fraud Alert Class

package com.example.cep.events;

import java.util.List;

public class FraudAlert implements java.io.Serializable {
    private String alertId;
    private String userId;
    private String patternType;
    private String description;
    private List<Transaction> matchedTransactions;
    private long detectedAt;

    public FraudAlert(String alertId, String userId, String patternType,
                      String description, List<Transaction> matchedTransactions) {
        this.alertId = alertId;
        this.userId = userId;
        this.patternType = patternType;
        this.description = description;
        this.matchedTransactions = matchedTransactions;
        this.detectedAt = System.currentTimeMillis();
    }

    // Getters
    public String getAlertId() { return alertId; }
    public String getUserId() { return userId; }
    public String getPatternType() { return patternType; }
    public String getDescription() { return description; }
    public List<Transaction> getMatchedTransactions() { return matchedTransactions; }
    public long getDetectedAt() { return detectedAt; }

    @Override
    public String toString() {
        return String.format("FRAUD ALERT [%s] User: %s | Pattern: %s | %s | Transactions: %d",
            alertId, userId, patternType, description, matchedTransactions.size());
    }
}

Defining Fraud Patterns

The core logic of the system is captured by three fraud detection patterns, defined below:

package com.example.cep.patterns;

import com.example.cep.events.Transaction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FraudPatterns {

    /**
     * Pattern 1: Geographic Impossibility
     * Three transactions over $500 within 5 minutes from different locations.
     * Spending observed in New York, then London, then Tokyo within 5 minutes
     * is highly indicative of fraudulent activity.
     */
    public static Pattern<Transaction, ?> geographicImpossibility() {
        return Pattern.<Transaction>begin("first")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() > 500.0;
                }
            })
            .followedBy("second")
            .where(new IterativeCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx, Context<Transaction> ctx) throws Exception {
                    if (tx.getAmount() <= 500.0) return false;
                    for (Transaction first : ctx.getEventsForPattern("first")) {
                        if (!tx.getLocation().equals(first.getLocation())) {
                            return true;
                        }
                    }
                    return false;
                }
            })
            .followedBy("third")
            .where(new IterativeCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx, Context<Transaction> ctx) throws Exception {
                    if (tx.getAmount() <= 500.0) return false;
                    for (Transaction first : ctx.getEventsForPattern("first")) {
                        for (Transaction second : ctx.getEventsForPattern("second")) {
                            if (!tx.getLocation().equals(first.getLocation())
                                && !tx.getLocation().equals(second.getLocation())) {
                                return true;
                            }
                        }
                    }
                    return false;
                }
            })
            .within(Time.minutes(5));
    }

    /**
     * Pattern 2: Card Testing Attack
     * A small "test" transaction ($0.01–$5.00) followed by a large transaction
     * ($1000+) within 1 minute. Fraudsters frequently test stolen cards with
     * very small purchases before attempting larger ones.
     */
    public static Pattern<Transaction, ?> cardTestingAttack() {
        return Pattern.<Transaction>begin("test_charge")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() >= 0.01 && tx.getAmount() <= 5.0;
                }
            })
            .followedBy("big_charge")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() >= 1000.0;
                }
            })
            .within(Time.minutes(1));
    }

    /**
     * Pattern 3: Transaction Velocity
     * Five or more transactions within 2 minutes. Even legitimate users
     * rarely conduct this many purchases in such a short interval.
     */
    public static Pattern<Transaction, ?> highVelocity() {
        return Pattern.<Transaction>begin("transactions")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() > 0;
                }
            })
            .timesOrMore(5)
            .within(Time.minutes(2));
    }
}
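When unit-testing patterns such as the velocity rule, it can help to have an independent reference check to compare against. The class below is one possible plain-Java oracle, offered as a sketch under stated assumptions: VelocityOracle and hasBurst are hypothetical names, the input is one user's transaction timestamps already sorted in ascending order, every transaction is assumed to satisfy the amount filter, and the window is treated as exclusive. Unlike the CEP operator, it only answers whether at least one qualifying burst exists and does not enumerate matches.

```java
/** Reference check: do at least minCount timestamps fall inside one window? */
final class VelocityOracle {
    static boolean hasBurst(long[] sortedTimestampsMs, int minCount, long windowMs) {
        // Slide over every run of minCount consecutive timestamps.
        for (int i = 0; i + minCount - 1 < sortedTimestampsMs.length; i++) {
            long span = sortedTimestampsMs[i + minCount - 1] - sortedTimestampsMs[i];
            if (span < windowMs) {
                return true; // minCount events fit inside the window
            }
        }
        return false;
    }
}
```

Calling hasBurst(timestamps, 5, 120_000L) mirrors the thresholds of the velocity pattern: five transactions ten seconds apart qualify, whereas five transactions one minute apart do not.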

Processing Matched Patterns

package com.example.cep.processors;

import com.example.cep.events.FraudAlert;
import com.example.cep.events.Transaction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;

import java.util.*;

public class FraudAlertProcessor
        extends PatternProcessFunction<Transaction, FraudAlert> {

    private final String patternType;

    public FraudAlertProcessor(String patternType) {
        this.patternType = patternType;
    }

    @Override
    public void processMatch(Map<String, List<Transaction>> match,
                             Context ctx,
                             Collector<FraudAlert> out) {
        // Collect all matched transactions from all pattern states
        List<Transaction> allTransactions = new ArrayList<>();
        match.values().forEach(allTransactions::addAll);

        // Extract user ID from first transaction
        String userId = allTransactions.get(0).getUserId();

        // Build a description
        String description = buildDescription(match);

        // Generate alert
        String alertId = UUID.randomUUID().toString();
        FraudAlert alert = new FraudAlert(
            alertId, userId, patternType, description, allTransactions
        );

        out.collect(alert);
    }

    private String buildDescription(Map<String, List<Transaction>> match) {
        StringBuilder sb = new StringBuilder();
        sb.append("Matched pattern '").append(patternType).append("': ");

        double total = 0;
        Set<String> locations = new HashSet<>();
        int count = 0;

        for (List<Transaction> txList : match.values()) {
            for (Transaction tx : txList) {
                total += tx.getAmount();
                locations.add(tx.getLocation());
                count++;
            }
        }

        sb.append(count).append(" transactions, ");
        sb.append(String.format("total $%.2f, ", total));
        sb.append("locations: ").append(locations);

        return sb.toString();
    }
}

The Complete Fraud Detection Pipeline

The pipeline below is wired together end to end, from Kafka source to fraud alert output:

package com.example.cep;

import com.example.cep.events.FraudAlert;
import com.example.cep.events.Transaction;
import com.example.cep.patterns.FraudPatterns;
import com.example.cep.processors.FraudAlertProcessor;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;

public class FraudDetectionPipeline {

    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming execution environment
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Enable checkpointing for exactly-once semantics
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // 2. Create Kafka source for transactions
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("transactions")
            .setGroupId("fraud-detection-group")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        // 3. Read from Kafka with event time watermarks
        ObjectMapper mapper = new ObjectMapper();

        DataStream<Transaction> transactions = env
            .fromSource(kafkaSource, WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> {
                    try {
                        return mapper.readValue(event, Transaction.class)
                            .getTimestamp();
                    } catch (Exception e) {
                        return timestamp;
                    }
                }), "Kafka Transactions")
            .map(json -> mapper.readValue(json, Transaction.class))
            .keyBy(Transaction::getUserId);  // Key by user for per-user patterns

        // 4. Apply Pattern 1: Geographic Impossibility
        Pattern<Transaction, ?> geoPattern = FraudPatterns.geographicImpossibility();
        PatternStream<Transaction> geoPatternStream = CEP.pattern(
            transactions, geoPattern);

        DataStream<FraudAlert> geoAlerts = geoPatternStream.process(
            new FraudAlertProcessor("GEOGRAPHIC_IMPOSSIBILITY"));

        // 5. Apply Pattern 2: Card Testing Attack
        Pattern<Transaction, ?> testPattern = FraudPatterns.cardTestingAttack();
        PatternStream<Transaction> testPatternStream = CEP.pattern(
            transactions, testPattern);

        DataStream<FraudAlert> testAlerts = testPatternStream.process(
            new FraudAlertProcessor("CARD_TESTING_ATTACK"));

        // 6. Apply Pattern 3: High Velocity
        Pattern<Transaction, ?> velocityPattern = FraudPatterns.highVelocity();
        PatternStream<Transaction> velocityPatternStream = CEP.pattern(
            transactions, velocityPattern);

        DataStream<FraudAlert> velocityAlerts = velocityPatternStream.process(
            new FraudAlertProcessor("HIGH_VELOCITY"));

        // 7. Union all alerts and sink to Kafka
        DataStream<FraudAlert> allAlerts = geoAlerts
            .union(testAlerts)
            .union(velocityAlerts);

        // Print to console (for development)
        allAlerts.print("FRAUD ALERT");

        // Sink to Kafka alerts topic
        KafkaSink<String> alertSink = KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                    .setTopic("fraud-alerts")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
            )
            .build();

        allAlerts
            .map(alert -> mapper.writeValueAsString(alert))
            .sinkTo(alertSink);

        // 8. Execute the pipeline
        env.execute("Credit Card Fraud Detection CEP Pipeline");
    }
}
Key Takeaway: The pipeline applies multiple independent patterns to the same keyed stream. Each CEP.pattern() call creates a separate NFA instance per key (per user), so patterns are evaluated independently and do not interfere with one another. The keyBy(Transaction::getUserId) call is essential because it ensures that patterns match only those events belonging to the same user.

Hands-On: IoT Sensor Anomaly Detection

The second pipeline detects anomalies in IoT sensor data. The target pattern is a sensor reporting three consecutive rising temperature readings above a threshold within one minute, followed by a pressure drop. The sequence frequently indicates an impending equipment failure. In a production setting, the detected anomalies would be persisted in a time-series database optimised for preprocessed data, and the underlying sensor readings could be supplied to forecasting models for predictive maintenance.

Sensor Event Class

package com.example.cep.events;

public class SensorReading implements java.io.Serializable {
    private String sensorId;
    private double temperature;
    private double pressure;
    private long timestamp;
    private String location;

    public SensorReading() {}

    public SensorReading(String sensorId, double temperature, double pressure,
                         long timestamp, String location) {
        this.sensorId = sensorId;
        this.temperature = temperature;
        this.pressure = pressure;
        this.timestamp = timestamp;
        this.location = location;
    }

    public String getSensorId() { return sensorId; }
    public void setSensorId(String sensorId) { this.sensorId = sensorId; }
    public double getTemperature() { return temperature; }
    public void setTemperature(double temperature) { this.temperature = temperature; }
    public double getPressure() { return pressure; }
    public void setPressure(double pressure) { this.pressure = pressure; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }

    @Override
    public String toString() {
        return String.format("Sensor{id=%s, temp=%.1f, pressure=%.1f, time=%d}",
            sensorId, temperature, pressure, timestamp);
    }
}

Complete IoT Anomaly Pipeline

package com.example.cep;

import com.example.cep.events.SensorReading;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

public class IoTAnomalyDetectionPipeline {

    private static final double TEMP_THRESHOLD = 85.0; // degrees Celsius
    private static final double PRESSURE_DROP_THRESHOLD = 10.0; // PSI

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.enableCheckpointing(30_000);

        // Simulated sensor data source (replace with Kafka in production)
        DataStream<SensorReading> sensorStream = env
            .addSource(new SimulatedSensorSource()) // your custom source
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((reading, ts) -> reading.getTimestamp())
            )
            .keyBy(SensorReading::getSensorId);

        // Pattern: 3 consecutive high-temp readings, then a pressure drop
        Pattern<SensorReading, ?> anomalyPattern = Pattern
            .<SensorReading>begin("rising_temp_1")
            .where(new SimpleCondition<SensorReading>() {
                @Override
                public boolean filter(SensorReading reading) {
                    return reading.getTemperature() > TEMP_THRESHOLD;
                }
            })
            .next("rising_temp_2")
            .where(new IterativeCondition<SensorReading>() {
                @Override
                public boolean filter(SensorReading reading,
                                      Context<SensorReading> ctx) throws Exception {
                    // getEventsForPattern() declares a checked exception,
                    // so filter() must declare "throws Exception" to compile
                    if (reading.getTemperature() <= TEMP_THRESHOLD) return false;
                    // Each stage holds a single event, so compare with the first one
                    for (SensorReading prev : ctx.getEventsForPattern("rising_temp_1")) {
                        return reading.getTemperature() > prev.getTemperature();
                    }
                    return false;
                }
            })
            .next("rising_temp_3")
            .where(new IterativeCondition<SensorReading>() {
                @Override
                public boolean filter(SensorReading reading,
                                      Context<SensorReading> ctx) throws Exception {
                    if (reading.getTemperature() <= TEMP_THRESHOLD) return false;
                    for (SensorReading prev : ctx.getEventsForPattern("rising_temp_2")) {
                        return reading.getTemperature() > prev.getTemperature();
                    }
                    return false;
                }
            })
            .followedBy("pressure_drop")
            .where(new IterativeCondition<SensorReading>() {
                @Override
                public boolean filter(SensorReading reading,
                                      Context<SensorReading> ctx) throws Exception {
                    // Pressure is compared with the first high-temperature reading
                    for (SensorReading prev : ctx.getEventsForPattern("rising_temp_1")) {
                        double pressureDiff = prev.getPressure() - reading.getPressure();
                        return pressureDiff > PRESSURE_DROP_THRESHOLD;
                    }
                    return false;
                }
            })
            .within(Time.minutes(1));

        // Apply pattern and process matches
        PatternStream<SensorReading> patternStream =
            CEP.pattern(sensorStream, anomalyPattern);

        DataStream<String> anomalyAlerts = patternStream.process(
            new PatternProcessFunction<SensorReading, String>() {
                @Override
                public void processMatch(Map<String, List<SensorReading>> match,
                                         Context ctx,
                                         Collector<String> out) {
                    SensorReading first = match.get("rising_temp_1").get(0);
                    SensorReading second = match.get("rising_temp_2").get(0);
                    SensorReading third = match.get("rising_temp_3").get(0);
                    SensorReading drop = match.get("pressure_drop").get(0);

                    String alert = String.format(
                        "ANOMALY DETECTED | Sensor: %s | Location: %s | " +
                        "Temps: %.1f -> %.1f -> %.1f (threshold: %.1f) | " +
                        "Pressure drop: %.1f -> %.1f (delta: %.1f)",
                        first.getSensorId(), first.getLocation(),
                        first.getTemperature(), second.getTemperature(),
                        third.getTemperature(), TEMP_THRESHOLD,
                        first.getPressure(), drop.getPressure(),
                        first.getPressure() - drop.getPressure()
                    );

                    out.collect(alert);
                }
            }
        );

        anomalyAlerts.print("IOT ALERT");
        env.execute("IoT Sensor Anomaly Detection Pipeline");
    }
}
Tip: The pipeline uses next() (strict contiguity) for the three rising temperature readings because they must be consecutive. By contrast, followedBy() (relaxed contiguity) is used for the pressure drop, since other normal readings may occur between the temperature spike and the pressure change.
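
To make the difference between the two contiguity modes concrete, the following plain-Java sketch simulates them on a list of event labels. It is deliberately not Flink code: the class name ContiguitySketch and both helper methods are hypothetical, and the logic is a simplification, since Flink's NFA also tracks partial matches, time windows, and skip strategies.

```java
import java.util.List;

// Hypothetical helper, not part of Flink: models contiguity on a list of labels.
final class ContiguitySketch {

    // Strict contiguity (like next()): b must be the element right after a.
    static boolean strictMatch(List<String> events, String a, String b) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(a) && events.get(i + 1).equals(b)) {
                return true;
            }
        }
        return false;
    }

    // Relaxed contiguity (like followedBy()): elements between a and b are skipped.
    static boolean relaxedMatch(List<String> events, String a, String b) {
        int first = events.indexOf(a);
        if (first < 0) {
            return false;
        }
        return events.subList(first + 1, events.size()).contains(b);
    }

    public static void main(String[] args) {
        List<String> readings = List.of("HOT", "NORMAL", "PRESSURE_DROP");
        System.out.println(strictMatch(readings, "HOT", "PRESSURE_DROP"));  // false
        System.out.println(relaxedMatch(readings, "HOT", "PRESSURE_DROP")); // true
    }
}
```

The intervening NORMAL reading breaks the strict match but not the relaxed one, which is why the pressure-drop stage in the pipeline uses followedBy().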

Hands-On: Stock Market Pattern Detection

The third pipeline detects potential trading signals, specifically a price drop greater than 5% followed by a high volume spike within 10 seconds. The pattern can indicate panic selling followed by institutional buying, which may represent a potential buy signal.

StockTick Event Class

package com.example.cep.events;

public class StockTick implements java.io.Serializable {
    private String symbol;
    private double price;
    private long volume;
    private long timestamp;
    private double previousClose;

    public StockTick() {}

    public StockTick(String symbol, double price, long volume,
                     long timestamp, double previousClose) {
        this.symbol = symbol;
        this.price = price;
        this.volume = volume;
        this.timestamp = timestamp;
        this.previousClose = previousClose;
    }

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public long getVolume() { return volume; }
    public void setVolume(long volume) { this.volume = volume; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public double getPreviousClose() { return previousClose; }
    public void setPreviousClose(double pc) { this.previousClose = pc; }

    public double getPriceChangePercent() {
        if (previousClose == 0) return 0;
        return ((price - previousClose) / previousClose) * 100.0;
    }

    @Override
    public String toString() {
        return String.format("StockTick{sym=%s, price=%.2f, vol=%d, change=%.2f%%}",
            symbol, price, volume, getPriceChangePercent());
    }
}
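
The percent-change formula in getPriceChangePercent() can be checked with a short worked example. The sketch below is a stand-alone copy of that formula under a hypothetical class name (PriceChangeSketch). The prices are invented for illustration, and the -5.0 threshold is the value the pipeline uses for its price-drop condition.

```java
// Hypothetical stand-alone check; mirrors the formula in getPriceChangePercent().
final class PriceChangeSketch {

    // Percent change of price relative to the previous close (0 when no close is known).
    static double changePercent(double price, double previousClose) {
        if (previousClose == 0) {
            return 0;
        }
        return ((price - previousClose) / previousClose) * 100.0;
    }

    public static void main(String[] args) {
        double threshold = -5.0; // percent, as in the pipeline's price-drop condition

        // 94 against a close of 100 is a drop of about 6%, which is below -5%.
        System.out.println(changePercent(94.0, 100.0) < threshold);

        // 96 against a close of 100 is a drop of about 4%, which is not.
        System.out.println(changePercent(96.0, 100.0) < threshold);
    }
}
```

Because the change is measured against the previous close rather than the previous tick, a symbol that has already fallen more than 5% on the day satisfies the condition on every subsequent tick until the price recovers.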

Complete Stock Market Detection Pipeline

package com.example.cep;

import com.example.cep.events.StockTick;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

public class StockPatternDetectionPipeline {

    private static final double PRICE_DROP_THRESHOLD = -5.0; // percent
    private static final double VOLUME_SPIKE_MULTIPLIER = 3.0; // 3x the volume of the price-drop tick

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(10_000);

        // Assume a Kafka source producing StockTick JSON
        // (using simulated source for this example)
        DataStream<StockTick> tickStream = env
            .addSource(new SimulatedStockSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<StockTick>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner((tick, ts) -> tick.getTimestamp())
            )
            .keyBy(StockTick::getSymbol);

        // Pattern: Price drop > 5% followed by volume spike within 10 seconds
        Pattern<StockTick, ?> buySignalPattern = Pattern
            .<StockTick>begin("price_drop")
            .where(new SimpleCondition<StockTick>() {
                @Override
                public boolean filter(StockTick tick) {
                    return tick.getPriceChangePercent() < PRICE_DROP_THRESHOLD;
                }
            })
            .followedBy("volume_spike")
            .where(new IterativeCondition<StockTick>() {
                @Override
                public boolean filter(StockTick tick, Context<StockTick> ctx)
                        throws Exception {
                    // getEventsForPattern() declares a checked exception,
                    // so filter() must declare "throws Exception" to compile
                    for (StockTick drop : ctx.getEventsForPattern("price_drop")) {
                        // Volume must be more than 3x the volume of the price-drop tick
                        if (tick.getVolume() > drop.getVolume() * VOLUME_SPIKE_MULTIPLIER) {
                            return true;
                        }
                    }
                    return false;
                }
            })
            .within(Time.seconds(10));

        // Apply pattern
        PatternStream<StockTick> patternStream =
            CEP.pattern(tickStream, buySignalPattern);

        DataStream<String> signals = patternStream.process(
            new PatternProcessFunction<StockTick, String>() {
                @Override
                public void processMatch(Map<String, List<StockTick>> match,
                                         Context ctx,
                                         Collector<String> out) {
                    StockTick drop = match.get("price_drop").get(0);
                    StockTick spike = match.get("volume_spike").get(0);

                    String signal = String.format(
                        "BUY SIGNAL | %s | Drop: %.2f%% (price $%.2f) | " +
                        "Volume spike: %d -> %d (%.1fx) | " +
                        "Current price: $%.2f",
                        drop.getSymbol(),
                        drop.getPriceChangePercent(),
                        drop.getPrice(),
                        drop.getVolume(),
                        spike.getVolume(),
                        (double) spike.getVolume() / drop.getVolume(),
                        spike.getPrice()
                    );

                    out.collect(signal);
                }
            }
        );

        signals.print("TRADING SIGNAL");
        env.execute("Stock Market Pattern Detection Pipeline");
    }
}
Caution: The example above illustrates pattern detection for educational purposes and does not constitute investment advice. Production algorithmic trading systems incorporate substantially more signals, risk management, and regulatory safeguards. Trading decisions should not be made on the basis of a single CEP pattern.

Advanced CEP Techniques

Once the fundamentals are in place, the following advanced techniques bring CEP pipelines to production quality.

Dynamic Patterns from External Configuration

Hard-coded patterns are acceptable during initial development, but production systems must update rules without redeployment. One approach is to load pattern parameters from an external source:

// Load thresholds from a configuration source
public class DynamicFraudPatterns {

    public static Pattern<Transaction, ?> fromConfig(FraudRuleConfig config) {
        return Pattern.<Transaction>begin("test_charge")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() >= config.getMinTestAmount()
                        && tx.getAmount() <= config.getMaxTestAmount();
                }
            })
            .followedBy("big_charge")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() >= config.getLargeTransactionThreshold();
                }
            })
            .within(Time.minutes(config.getTimeWindowMinutes()));
    }
}

// Configuration POJO loaded from database, file, or broadcast stream
public class FraudRuleConfig implements java.io.Serializable {
    private double minTestAmount = 0.01;
    private double maxTestAmount = 5.0;
    private double largeTransactionThreshold = 1000.0;
    private int timeWindowMinutes = 1;

    // getters and setters...
}
Tip: For fully dynamic pattern updates without restarting the Flink job, Flink’s Broadcast State can be used to distribute new rule configurations to all parallel instances. The CEP library itself does not support changing patterns at runtime, but a custom operator can re-create patterns when new configurations arrive via a broadcast stream.

Side Outputs for Timeout Handling

When a partial pattern match times out, that is, when the within() window expires before the pattern completes, the timed-out partial matches can be captured using TimedOutPartialMatchHandler:

import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class FraudAlertWithTimeout
        extends PatternProcessFunction<Transaction, FraudAlert>
        implements TimedOutPartialMatchHandler<Transaction> {

    // Side output for timed-out partial matches
    public static final OutputTag<String> TIMEOUT_TAG =
        new OutputTag<String>("timed-out-patterns") {};

    @Override
    public void processMatch(Map<String, List<Transaction>> match,
                             Context ctx,
                             Collector<FraudAlert> out) {
        // Process fully matched pattern (same as before)
        // ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<Transaction>> match,
                                     Context ctx) {
        // A partial match timed out — log it for analysis
        StringBuilder sb = new StringBuilder("PARTIAL MATCH TIMEOUT: ");
        for (Map.Entry<String, List<Transaction>> entry : match.entrySet()) {
            sb.append(entry.getKey()).append("=")
              .append(entry.getValue().size()).append(" events; ");
        }

        // Output to side output
        ctx.output(TIMEOUT_TAG, sb.toString());
    }
}

// In your pipeline, capture the side output:
SingleOutputStreamOperator<FraudAlert> alerts = patternStream
    .process(new FraudAlertWithTimeout());

DataStream<String> timedOutPatterns = alerts
    .getSideOutput(FraudAlertWithTimeout.TIMEOUT_TAG);

timedOutPatterns.print("TIMEOUT");

Scaling CEP Jobs

CEP pattern matching is stateful because the NFA maintains partial match buffers per key. The principal scaling considerations are summarised below:

  • Key Partitioning: The stream should be passed through keyBy() before CEP patterns are applied. This ensures that events for the same entity (user, sensor, stock symbol) are routed to the same parallel instance.
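  • Parallelism: Parallelism should be selected on the basis of event throughput as well as key cardinality. As a rough starting point, a parallelism of 8–16 is often sufficient for around 10,000 users, but the right value depends on per-key event rates and pattern complexity and should be confirmed by load testing. Flink distributes keys across parallel instances using hash partitioning.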
  • State Size: Each active partial match consumes memory. With long time windows or high-cardinality patterns, state size should be monitored carefully.
// Set different parallelism for different pipeline stages
DataStream<Transaction> transactions = env
    .fromSource(kafkaSource, watermarkStrategy, "source")
    .setParallelism(8)  // match Kafka partitions
    .map(json -> mapper.readValue(json, Transaction.class))
    .setParallelism(8)
    .keyBy(Transaction::getUserId);

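// CEP pattern matching can run at a different parallelism. A keyed stream has no
// setParallelism(), so set it on the operator returned by process() instead.
PatternStream<Transaction> patternStream = CEP.pattern(transactions, fraudPattern);

SingleOutputStreamOperator<FraudAlert> alerts = patternStream
    .process(new FraudAlertProcessor("CARD_TESTING_ATTACK"))
    .setParallelism(16);  // more parallelism for CPU-heavy matching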
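
The hash-partitioning point in the list above can be illustrated with a simplified sketch. This is an assumption-laden model, not Flink's actual algorithm: Flink first maps each key to a key group and then maps key groups to subtasks, whereas the hypothetical KeyPartitionSketch below uses a plain hashCode modulo. It demonstrates only the property that matters for CEP, namely that every event with the same key lands on the same parallel instance.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration; Flink's real assignment goes through key groups.
final class KeyPartitionSketch {

    // Maps a key to one of `parallelism` subtask indexes (0 .. parallelism - 1).
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts how many events land on each subtask.
    static Map<Integer, Integer> distribution(List<String> keys, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(subtaskFor(key, parallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> userIds = List.of("user-1", "user-2", "user-3", "user-1");
        // Both "user-1" events map to the same subtask index.
        System.out.println(subtaskFor("user-1", 8) == subtaskFor("user-1", 8));
        // Events per subtask; the exact spread depends on the hash values.
        System.out.println(distribution(userIds, 8));
    }
}
```

A skewed distribution in a check like this hints at hot keys: a single very active user or symbol keeps one subtask busy regardless of how high the parallelism is set.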

State Management and Checkpointing

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// Configure robust checkpointing
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(30_000);
checkpointConfig.setCheckpointTimeout(120_000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setTolerableCheckpointFailureNumber(3);

// Retain checkpoints on cancellation (for savepoint-like recovery)
checkpointConfig.setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

Event Time and Processing Time

The distinction between event time and processing time is of central importance for CEP. Event time is the moment at which the event actually occurred, as embedded in the event data. Processing time is the moment at which the Flink operator processes the event. Under ideal conditions, the two values would coincide. In practice, events arrive late, out of order, and at variable rates.

Why Event Time Matters for CEP

Consider a fraud detection pattern defined as “three transactions within 5 minutes.” If transaction #2 arrives at the system 10 seconds late owing to network congestion, processing time would register a gap that does not actually exist. Event time correctly identifies that the three transactions occurred within the 5-minute window, irrespective of when they arrived.
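
The effect can be reproduced with a few lines of plain Java. The sketch below is not Flink code, and the class name TimeSemanticsSketch and all timestamps are invented for illustration. It delays the last of the three transactions, because that is the case in which the delay alone pushes the sequence outside the 5-minute window when arrival time is used.

```java
// Hypothetical stand-alone sketch; timestamps are in milliseconds.
final class TimeSemanticsSketch {

    static final long WINDOW_MS = 5 * 60 * 1000L; // the 5-minute pattern window

    // True when the span between the earliest and latest timestamp is shorter than the window.
    static boolean withinWindow(long[] timestamps) {
        if (timestamps.length == 0) {
            return false;
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long t : timestamps) {
            min = Math.min(min, t);
            max = Math.max(max, t);
        }
        return max - min < WINDOW_MS;
    }

    public static void main(String[] args) {
        // When the three transactions actually happened.
        long[] eventTimes = {0L, 120_000L, 295_000L};
        // When they reached the operator; the last one is delayed by 10 seconds.
        long[] arrivalTimes = {0L, 120_000L, 305_000L};

        System.out.println(withinWindow(eventTimes));   // true: the pattern matches
        System.out.println(withinWindow(arrivalTimes)); // false: the match is lost
    }
}
```

The same 10-second delay can also work in the other direction, making transactions that were more than 5 minutes apart appear closer together and producing a false alert.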

Watermark Strategies

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;

import java.time.Duration;

// Strategy 1: Bounded out-of-orderness (most common)
// Assumes events can arrive up to 5 seconds late
WatermarkStrategy<Transaction> strategy1 = WatermarkStrategy
    .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());

// Strategy 2: Monotonous timestamps (events always in order)
// Only use if you can guarantee ordering
WatermarkStrategy<Transaction> strategy2 = WatermarkStrategy
    .<Transaction>forMonotonousTimestamps()
    .withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());

// Strategy 3: Custom watermark generator for complex scenarios
WatermarkStrategy<Transaction> strategy3 = WatermarkStrategy
    .<Transaction>forGenerator(context -> new WatermarkGenerator<Transaction>() {
        private static final long MAX_DELAY = 10_000L; // 10 seconds
        // Start high enough that subtracting MAX_DELAY cannot underflow
        // if a watermark is emitted before the first event arrives
        private long maxTimestamp = Long.MIN_VALUE + MAX_DELAY + 1;

        @Override
        public void onEvent(Transaction tx, long eventTimestamp,
                            WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, tx.getTimestamp());
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(
                new org.apache.flink.api.common.eventtime.Watermark(
                    maxTimestamp - MAX_DELAY - 1
                )
            );
        }
    })
    .withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());
Key Takeaway: For most CEP applications, forBoundedOutOfOrderness() with a bound of 5–10 seconds is the appropriate choice. A bound that is too low causes late events to be missed, while a bound that is too high delays pattern matching by the same amount, since Flink cannot process an event-time window until the watermark passes it.
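
The trade-off can be made concrete with the arithmetic behind a bounded-out-of-orderness watermark. The sketch below is plain Java under a hypothetical class name (WatermarkSketch). It is modelled on the usual rule that the watermark trails the highest timestamp seen so far by the configured bound, and it treats an event as late when its timestamp is not greater than the current watermark. The 5-second bound and the timestamps are illustrative values.

```java
// Hypothetical stand-alone sketch of bounded-out-of-orderness arithmetic; not Flink code.
final class WatermarkSketch {

    private final long boundMs;
    private long maxTimestamp;

    WatermarkSketch(long boundMs) {
        this.boundMs = boundMs;
        // Start high enough that subtracting the bound cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMs + 1;
    }

    // Records an event and advances the highest timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the highest timestamp by the bound, minus 1 ms.
    long currentWatermark() {
        return maxTimestamp - boundMs - 1;
    }

    // An event is late when the watermark has already reached or passed its timestamp.
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(5_000L);
        wm.onEvent(20_000L);
        System.out.println(wm.currentWatermark()); // 14999
        System.out.println(wm.isLate(16_000L));    // false: 4 s behind, inside the bound
        System.out.println(wm.isLate(12_000L));    // true: 8 s behind, outside the bound
    }
}
```

Raising the bound to 10 seconds would accept the 12,000 ms event as well, at the cost of holding every match back for up to 10 seconds of event time.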

Connecting to Real Data Sources

Kafka Source Connector

Most production CEP pipelines read from Apache Kafka. For a Python-focused treatment of Kafka consumer implementation, see the Apache Kafka consumer implementation guide in Python. A complete, production-ready Kafka source setup in Java is shown below:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import com.fasterxml.jackson.databind.ObjectMapper;

// Custom deserializer for Transaction events
public class TransactionDeserializer
        implements DeserializationSchema<Transaction> {

    private transient ObjectMapper mapper;

    @Override
    public Transaction deserialize(byte[] message) {
        if (mapper == null) mapper = new ObjectMapper();
        try {
            return mapper.readValue(message, Transaction.class);
        } catch (Exception e) {
            // Log and skip malformed events
            System.err.println("Failed to deserialize: " + new String(message));
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Transaction nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Transaction> getProducedType() {
        return TypeInformation.of(Transaction.class);
    }
}

// Build the Kafka source
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
    .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
    .setTopics("transactions")
    .setGroupId("fraud-detection-v2")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new TransactionDeserializer())
    .setProperty("security.protocol", "SASL_SSL")
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"api-key\" password=\"api-secret\";")
    .build();

Kafka Sink for Alerts

import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

KafkaSink<String> alertSink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka-broker-1:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("fraud-alerts")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("fraud-alert-sink")
    .build();

// Wire it up
allAlerts
    .map(alert -> mapper.writeValueAsString(alert))
    .sinkTo(alertSink);

JDBC Connector for Enrichment

It is often necessary to enrich events with data from a database, for example by looking up a customer’s risk score before CEP patterns are applied. Flink’s asynchronous I/O is well suited to this purpose:

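import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;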

// Async enrichment function
public class CustomerEnrichment
        extends RichAsyncFunction<Transaction, EnrichedTransaction> {

    private transient DataSource dataSource;

    @Override
    public void open(Configuration parameters) {
        // Initialize connection pool
        dataSource = createConnectionPool();
    }

    @Override
    public void asyncInvoke(Transaction tx,
                            ResultFuture<EnrichedTransaction> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement stmt = conn.prepareStatement(
                     "SELECT risk_score, account_age FROM customers WHERE id = ?")) {
                stmt.setString(1, tx.getUserId());
                ResultSet rs = stmt.executeQuery();
                if (rs.next()) {
                    return new EnrichedTransaction(tx,
                        rs.getDouble("risk_score"),
                        rs.getInt("account_age"));
                }
                return new EnrichedTransaction(tx, 0.5, 0);
            } catch (Exception e) {
                return new EnrichedTransaction(tx, 0.5, 0);
            }
        }).thenAccept(result -> resultFuture.complete(
            Collections.singleton(result)));
    }
}

// Apply async enrichment before CEP
DataStream<EnrichedTransaction> enriched = AsyncDataStream
    .unorderedWait(
        transactionStream,
        new CustomerEnrichment(),
        30, TimeUnit.SECONDS, // timeout
        100 // max concurrent requests
    );
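
A timeout is worth handling explicitly. By default, Flink raises an exception when an async request times out, which restarts the job; overriding the timeout method of the async function lets the pipeline emit a fallback instead. The plain-Java sketch below shows the same idea outside Flink. The class name, method name, and the 0.5 default (borrowed from the enrichment function above) are illustrative assumptions rather than part of any Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flink: bounds an async lookup with a timeout
// and substitutes a neutral default when the lookup is slow or fails.
final class EnrichmentFallbackSketch {

    // Neutral risk score, mirroring the 0.5 default used in the enrichment function.
    static final double DEFAULT_RISK_SCORE = 0.5;

    static CompletableFuture<Double> withFallback(
            CompletableFuture<Double> lookup, long timeoutMillis) {
        return lookup
            // If the lookup has not finished in time, complete it with the default.
            .completeOnTimeout(DEFAULT_RISK_SCORE, timeoutMillis, TimeUnit.MILLISECONDS)
            // If the lookup failed, fall back to the default as well.
            .exceptionally(error -> DEFAULT_RISK_SCORE);
    }
}
```

Calling EnrichmentFallbackSketch.withFallback(lookupFuture, 500) yields the looked-up score when the database answers within 500 ms and the neutral default otherwise, so a slow dependency degrades enrichment quality rather than stopping the stream.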

Flink also supports connectors for Apache Pulsar, Amazon Kinesis, and many other systems through its connector ecosystem. The setup is broadly similar: define a source, assign watermarks, and feed the stream into the CEP patterns.

Deploying and Monitoring

Running Locally for Development

The simplest development workflow is to run the job directly within an IDE. Flink will then create a local mini-cluster automatically:

// This works out of the box in your IDE
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Flink automatically creates a local mini-cluster

Docker Compose for Local Flink and Kafka

For integration testing, the following Docker Compose configuration provides a local Flink and Kafka environment:

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.3
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
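      # Two listeners: kafka:29092 for containers on the Compose network (e.g. Flink),
      # localhost:9092 for clients on the host (e.g. a job started from the IDE)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT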
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  flink-jobmanager:
    image: flink:1.18.1-java17
    ports:
      - "8081:8081"  # Flink Web UI
    command: jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        state.backend: rocksdb
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints

  flink-taskmanager:
    image: flink:1.18.1-java17
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 2  # Run 2 task managers
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 4
        taskmanager.memory.process.size: 2048m

Deploying to a Flink Cluster

The fat JAR should be built and submitted to the cluster:

# Build the fat JAR
mvn clean package -DskipTests

# Submit to standalone cluster
./bin/flink run \
  -c com.example.cep.FraudDetectionPipeline \
  target/flink-cep-pipeline-1.0.0.jar

# Submit to YARN cluster
# -ys: slots per TaskManager, -yjm: JobManager memory, -ytm: TaskManager memory
# (TaskManagers are allocated on demand, so the legacy -yn flag is omitted)
./bin/flink run -m yarn-cluster \
  -ys 8 \
  -yjm 2048m \
  -ytm 4096m \
  -c com.example.cep.FraudDetectionPipeline \
  target/flink-cep-pipeline-1.0.0.jar

# Submit to Kubernetes (using Flink Kubernetes Operator)
kubectl apply -f flink-cep-deployment.yaml

Monitoring the Pipeline

The Flink Web UI (default port 8081) is the primary monitoring interface. The most important metrics are summarised below:

  • Checkpoint Duration: If checkpoints take longer than the configured interval, cascading delays appear. Checkpoint duration should be kept below 50% of the checkpoint interval.
  • Backpressure: When a downstream operator cannot keep pace, backpressure propagates upstream. The Web UI indicates this with colour-coded task states, where red signals a problem.
  • Throughput (records/second): Input and output rates for each operator should be monitored. A sudden drop in output rate with constant input suggests a processing bottleneck.
  • State Size: CEP patterns maintain partial match buffers. State size should be observed over time, since unbounded growth indicates a pattern or key-space problem.
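
Two of these checks are simple enough to automate. The sketch below assumes that checkpoint durations and state-size samples have already been collected from the Web UI or a metrics system; the class and method names are hypothetical, and the 50% threshold is the rule of thumb stated above rather than a Flink setting.

```java
import java.util.List;

// Hypothetical health checks over metrics that were collected elsewhere.
final class PipelineHealthSketch {

    // True when the checkpoint took less than half of the configured interval.
    static boolean checkpointWithinBudget(long durationMs, long intervalMs) {
        return durationMs * 2 < intervalMs;
    }

    // True when every sample is larger than the one before it, which is a
    // crude signal that partial-match state is growing without bound.
    static boolean stateGrowsMonotonically(List<Long> stateSizeBytes) {
        if (stateSizeBytes.size() < 2) {
            return false; // not enough samples to judge a trend
        }
        for (int i = 1; i < stateSizeBytes.size(); i++) {
            if (stateSizeBytes.get(i) <= stateSizeBytes.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

A strictly increasing series over a long observation period is only a hint; state that grows and then plateaus once the pattern's time window has elapsed is normal behaviour.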

Performance Optimisation

Making a CEP pipeline functional is one matter; making it handle production volumes efficiently is another. The principal tuning levers are described below.

Choosing the Right Parallelism

Parallelism controls the number of parallel instances of each operator that Flink runs. For CEP pipelines, the following guidelines apply:

  • Source parallelism: Should match the number of Kafka partitions. If the topic has 16 partitions, source parallelism should be set to 16; a higher value leaves the extra source subtasks with no partition to read.
  • CEP operator parallelism: Depends on key cardinality and pattern complexity. A reasonable starting point is the same parallelism as the source, with subsequent increases if backpressure appears on the CEP operator.
  • Sink parallelism: Typically lower than CEP parallelism because alert volume is substantially lower than input volume.
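
The source guideline follows from simple arithmetic, sketched below. The sketch assumes partitions are spread as evenly as possible across source subtasks, which is a simplification of how the Kafka connector assigns them; the class and method names are hypothetical.

```java
// Hypothetical arithmetic helper; it does not call any Flink or Kafka API.
final class SourceParallelismSketch {

    // Source subtasks left without a partition when partitions are spread evenly.
    static int idleSubtasks(int partitions, int parallelism) {
        return Math.max(0, parallelism - partitions);
    }

    // Largest number of partitions a single subtask has to read (ceiling division).
    static int maxPartitionsPerSubtask(int partitions, int parallelism) {
        return (partitions + parallelism - 1) / parallelism;
    }
}
```

With 16 partitions, a parallelism of 20 leaves four subtasks idle, while a parallelism of 12 forces some subtasks to read two partitions and others one, which skews the load. Matching the partition count, or choosing a divisor of it, avoids both effects.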

State Backend Selection

| State Backend | State Size | Speed | Best For |
| --- | --- | --- | --- |
| HashMapStateBackend (Heap) | Limited by JVM heap | Fastest | Small state, low latency requirements |
| EmbeddedRocksDBStateBackend | Limited by disk | Slower (disk I/O) | Large state, long time windows |

For CEP workloads specifically, the heap state backend is adequate when patterns have short time windows (seconds to minutes) and moderate key cardinality. For long time windows on the order of hours, or millions of keys with active partial matches, RocksDB is the safer option.
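
A back-of-the-envelope estimate can make this decision less of a guess. The sketch below multiplies the number of active keys by the partial matches held per key, the events buffered per match, and the bytes per event, then compares the result with a heap budget. All four inputs and the budget are assumptions to be measured or estimated for the workload at hand; the formula ignores serialisation and NFA bookkeeping overhead, and the class is hypothetical rather than a Flink utility.

```java
// Rough sizing heuristic, not a Flink API. Real state also carries overhead
// that this estimate leaves out.
final class StateSizeSketch {

    static long estimateBytes(long activeKeys, long partialMatchesPerKey,
                              long eventsPerMatch, long bytesPerEvent) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(
            Math.multiplyExact(activeKeys, partialMatchesPerKey),
            Math.multiplyExact(eventsPerMatch, bytesPerEvent));
    }

    // Returns the value one would set for Flink's state backend option.
    static String suggestBackend(long estimatedBytes, long heapBudgetBytes) {
        return estimatedBytes <= heapBudgetBytes ? "hashmap" : "rocksdb";
    }
}
```

For example, one million active keys with three partial matches of four 200-byte events each come to roughly 2.4 GB, which exceeds a 1 GiB heap budget and points towards RocksDB, whereas ten thousand keys with smaller buffers fit comfortably on the heap.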

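Typical starting values for the three example workloads are listed below; they are initial settings to tune against measured behaviour rather than fixed recommendations:

| Setting | Fraud Detection | IoT Monitoring | Market Data |
| --- | --- | --- | --- |
| Parallelism | 8–32 | 4–16 | 16–64 |
| Checkpoint Interval | 60s | 30s | 10s |
| State Backend | RocksDB | Heap or RocksDB | Heap |
| Watermark Bound | 5s | 3s | 1s |
| TaskManager Memory | 4–8 GB | 2–4 GB | 8–16 GB |
| Serialization | Avro or Protobuf | Avro | Protobuf (smallest size) |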

Serialisation Considerations

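Serialisation has a direct effect on throughput and snapshot size. Flink serialises well-formed POJOs with its own efficient serialiser, but falls back to the generic Kryo serialiser for types it cannot analyse, which is slower and produces larger state snapshots. For production CEP pipelines, event types should therefore be POJO-compliant, registered with a dedicated serialiser, or encoded with a schema-based format such as Avro: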

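// Register a custom Kryo serializer for a Protobuf-generated type
// (ProtobufSerializer comes from the com.twitter:chill-protobuf dependency)
env.getConfig().registerTypeWithKryoSerializer(
    Transaction.class, ProtobufSerializer.class);

// Or use Flink's POJO serialization (automatic for well-formed POJOs)
// Ensure your classes:
// 1. Are public and have a public no-arg constructor
// 2. Expose every non-static, non-transient field either as a public field
//    or through public getters/setters
// (Implementing java.io.Serializable is not required for POJO serialization)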

// For Avro serialization, use Flink's Avro format
// Add dependency: flink-avro
// Then use AvroDeserializationSchema:
import org.apache.flink.formats.avro.AvroDeserializationSchema;

KafkaSource<Transaction> avroSource = KafkaSource.<Transaction>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("transactions-avro")
    .setGroupId("fraud-detection")
    .setValueOnlyDeserializer(
        AvroDeserializationSchema.forSpecific(Transaction.class))
    .build();
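
To see why the choice of serialiser affects snapshot size, the plain-Java sketch below encodes the same event twice: once with JDK object serialisation and once with a hand-written fixed layout. It does not use Flink's serialisers, and the Event class is a hypothetical stand-in for Transaction; the point is only the gap between a generic, self-describing format and a schema-aware one.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializationSizeSketch {

    // Hypothetical event type standing in for the article's Transaction.
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String userId;
        final double amount;
        final long timestamp;

        Event(String userId, double amount, long timestamp) {
            this.userId = userId;
            this.amount = amount;
            this.timestamp = timestamp;
        }
    }

    // Generic JDK serialization: writes class metadata along with the field values.
    static int jdkSerializedSize(Event event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Schema-aware layout: only the field values, in a fixed order.
    static int compactSize(Event event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(event.userId);      // 2-byte length prefix + encoded characters
            out.writeDouble(event.amount);   // 8 bytes
            out.writeLong(event.timestamp);  // 8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

For an event with a five-character ASCII user id, the fixed layout needs 23 bytes, while the JDK encoding is several times larger because it embeds the class description. Multiplied across millions of buffered partial matches, that difference shows up directly in checkpoint size and duration.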

Common Pitfalls and Troubleshooting

The most frequently encountered issues are summarised below:

| Problem | Cause | Solution |
| --- | --- | --- |
| Pattern never matches | Events arrive out of order; within() window too tight; using next() when followedBy() is needed | Check event ordering, increase time window, switch contiguity mode |
| Too many matches (false positives) | Pattern conditions too loose; using followedByAny() generating combinatorial explosion | Add tighter conditions, switch to followedBy(), shorten time window |
| OutOfMemoryError | Large NFA state from long time windows, high key cardinality, or followedByAny() with oneOrMore() | Switch to RocksDB state backend, shorten time windows, add until() conditions |
| Checkpoint failures | State too large to snapshot within timeout; backpressure causing delays | Increase checkpoint timeout, enable incremental checkpointing with RocksDB, reduce state size |
| Watermark stalling (no progress) | One Kafka partition has no data, so its watermark stays at Long.MIN_VALUE and blocks the global watermark | Use withIdleness(Duration.ofMinutes(1)) on watermark strategy |
| Duplicate alerts after restart | Reprocessing events without checkpointed state | Always restart from savepoint/checkpoint, enable exactly-once on sinks |
| ClassNotFoundException at runtime | flink-cep not in the fat JAR; marked as provided by mistake | Ensure flink-cep is not marked as provided; only flink-streaming-java and flink-clients should be |
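
The first row is the most common surprise, so it helps to see the difference between strict and relaxed contiguity in isolation. The sketch below is a deliberately simplified, Flink-free model that matches event labels in a list and ignores conditions, time windows, and keys; the class and method names are hypothetical.

```java
import java.util.List;

// Toy model of two contiguity modes; it is not how Flink's NFA is implemented.
final class ContiguitySketch {

    // Strict contiguity (in the spirit of next()): the second event must
    // directly follow the first, with nothing in between.
    static boolean matchesStrict(List<String> events, String first, String second) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                return true;
            }
        }
        return false;
    }

    // Relaxed contiguity (in the spirit of followedBy()): unrelated events
    // may occur between the first and the second.
    static boolean matchesRelaxed(List<String> events, String first, String second) {
        int firstIndex = events.indexOf(first);
        return firstIndex >= 0
            && events.subList(firstIndex + 1, events.size()).contains(second);
    }
}
```

For the sequence login, browse, purchase, the strict check for login then purchase fails because browse intervenes, while the relaxed check succeeds. A pattern that never fires on real traffic but works on a hand-built test sequence is often showing exactly this difference.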

 

Fixing Watermark Stalling

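Watermark stalling is among the most difficult issues to diagnose. If a single Kafka partition ceases to produce events, its watermark stops advancing (or never leaves its initial minimum value if the partition was empty from the start). Because an operator's watermark is the minimum across all of its inputs, that one partition holds back the watermark for the entire job. The remedy is straightforward: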

WatermarkStrategy<Transaction> strategy = WatermarkStrategy
    .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((tx, ts) -> tx.getTimestamp())
    .withIdleness(Duration.ofMinutes(1));  // Mark source as idle after 1 min
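
The effect of idleness can be reproduced with a few lines of plain Java. The sketch below models the combined watermark as the minimum over all partitions that are not marked idle. It is a simplified illustration with hypothetical names, not Flink's internal implementation, and it returns Long.MIN_VALUE as a stand-in for "no watermark yet" when every partition is idle.

```java
// Simplified model of watermark combination across partitions.
final class WatermarkSketch {

    static long combinedWatermark(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle partitions do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, partitionWatermarks[i]);
        }
        // With no active partition there is nothing to advance on.
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With partition watermarks of 1000, 2000, and Long.MIN_VALUE and no idleness handling, the combined watermark stays at Long.MIN_VALUE and no time-based pattern can complete or time out. Once the silent partition is marked idle, the combined watermark becomes 1000 and progress resumes.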

Debugging Pattern Matches

When patterns do not match as expected, a pass-through map can be inserted before the CEP operator in order to verify that events are flowing and correctly keyed:

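// Debug: print events as they enter the CEP operator
// (remove before production; System.out is slow at high throughput)
KeyedStream<Transaction, String> keyedTransactions = transactions
    .map(tx -> {
        System.out.println("CEP INPUT: " + tx);
        return tx;
    })
    .keyBy(Transaction::getUserId);
// Pass keyedTransactions, not the original stream, to CEP.pattern(...)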

// Also: check that your conditions actually match
// by testing them in a unit test
@Test
public void testFraudCondition() {
    Transaction tx = new Transaction("1", "user1", 600.0,
        System.currentTimeMillis(), "NYC", "electronics", "1234");
    assertTrue(tx.getAmount() > 500.0);  // Verify condition logic
}

Final Thoughts

Complex Event Processing with Apache Flink supports the detection of sophisticated patterns across millions of events per second with millisecond latency and exactly-once guarantees. The present guide has covered considerable ground, from the fundamentals of CEP and the Flink pattern API to three complete, production-style pipelines for fraud detection, IoT monitoring, and financial market analysis.

The principal lessons may be summarised as follows:

  • Select the appropriate contiguity: next() for strict sequences, followedBy() for relaxed matching, and followedByAny() sparingly, given its computational cost.
  • Always use event time with appropriate watermark strategies. Processing time produces incorrect pattern matches in any real-world system where events arrive out of order.
  • Key the streams: CEP patterns should almost always be applied to keyed streams so that matches remain scoped to a logical entity such as a user, sensor, or stock symbol.
  • Handle timeouts: Implementing TimedOutPartialMatchHandler allows partial matches that do not complete within the time window to be captured and analysed.
  • Monitor state size: CEP is inherently stateful. RocksDB is recommended for large state, time windows should remain as short as possible, and combinatorial explosion in non-deterministic patterns should be monitored.
  • Start simple and iterate: An initial implementation should begin with a single pattern on a small data sample, verified for correctness before complexity or scale are increased.

Flink’s CEP library is among the most capable pattern-matching engines in the open-source ecosystem. The patterns and techniques presented here provide the foundation required to build a first production CEP pipeline. For reproducible deployment of Flink applications, containerisation with Docker simplifies both local development and production rollout. The fraud detection example offers a suitable starting point that can be adapted to the target domain and scaled accordingly.
