Understanding Watermarks in Apache Flink

What are Watermarks?

Watermarks are Apache Flink's mechanism for tracking the progress of event time and for handling out-of-order events in stream processing. A watermark carries a timestamp t and asserts that no more events with a timestamp of t or earlier are expected on that stream. Flink uses this signal to decide when event-time computations, such as window operations, can be triggered.

Flink distinguishes three notions of time:

  • Event Time: the time at which an event actually occurred, as recorded in the event data itself. For more detail, see Understanding Event Time in Apache Flink.
  • Ingestion Time: the time at which an event enters the Flink pipeline.
  • Processing Time: the time at which an event is processed by a Flink operator.

Watermarks

  • Definition: A watermark is a timestamp that flows as part of the data stream and denotes the progress of event time.
  • Purpose: Watermarks tell Flink when event-time operations such as windows can fire, and which events count as late.
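
To make the definition concrete, here is a minimal plain-Java sketch of how a watermark can be derived from the timestamps seen so far. It does not use Flink's classes: BoundedOutOfOrdernessSketch, watermarksFor, and the 5-second bound are illustrative names and values chosen for this example. The extra 1 ms subtraction is meant to mirror the behavior of Flink's built-in bounded-out-of-orderness generator, so treat it as an approximation rather than the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for a bounded-out-of-orderness watermark generator (not Flink's API). */
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low, but high enough that the first watermark computation cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Records an event timestamp; out-of-order events never move the maximum backwards. */
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** Watermark = highest timestamp seen, minus the tolerated delay, minus 1 ms. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }

    /** Feeds the timestamps in arrival order and returns the watermark after each one. */
    static List<Long> watermarksFor(long maxOutOfOrdernessMs, long... timestamps) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(maxOutOfOrdernessMs);
        List<Long> result = new ArrayList<>();
        for (long ts : timestamps) {
            gen.onEvent(ts);
            result.add(gen.currentWatermark());
        }
        return result;
    }

    public static void main(String[] args) {
        // The event stamped 7000 arrives after the event stamped 12000 (out of order).
        System.out.println(watermarksFor(5000, 10_000, 12_000, 7_000, 16_000));
    }
}
```

Running it prints [4999, 6999, 6999, 10999]: the out-of-order event at 7000 leaves the watermark unchanged, because the watermark only follows the highest timestamp seen.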

Generating Watermarks

Watermarks can be generated in two main ways:

Punctuated Watermarks: These are emitted at specific points in the stream, often when certain events are encountered.

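Periodic Watermarks: These are emitted at regular intervals (200 ms by default, configurable via ExecutionConfig.setAutoWatermarkInterval), typically based on the highest timestamp seen so far.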
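
The difference between the two styles can be sketched in plain Java. The Generator interface below is a simplified stand-in loosely modeled on the two hooks of Flink's WatermarkGenerator (onEvent and onPeriodicEmit); the Event class, its isMarker flag, and the run helper that simulates a timer by counting events are all assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkStylesSketch {
    /** Hypothetical event: a timestamp plus a flag marking "special" events. */
    static final class Event {
        final long timestampMs;
        final boolean isMarker;

        Event(long timestampMs, boolean isMarker) {
            this.timestampMs = timestampMs;
            this.isMarker = isMarker;
        }
    }

    /** Simplified stand-in for a watermark generator with two hooks. */
    interface Generator {
        void onEvent(Event event, List<Long> out); // called for every event
        void onPeriodicEmit(List<Long> out);       // called when the timer fires
    }

    /** Punctuated: emits a watermark only when a marker event is encountered. */
    static final class Punctuated implements Generator {
        public void onEvent(Event event, List<Long> out) {
            if (event.isMarker) {
                out.add(event.timestampMs);
            }
        }

        public void onPeriodicEmit(List<Long> out) {
            // Nothing to do: this style reacts to events, not to the timer.
        }
    }

    /** Periodic: tracks the highest timestamp and emits it whenever the timer fires. */
    static final class Periodic implements Generator {
        private long maxTimestampMs = Long.MIN_VALUE;

        public void onEvent(Event event, List<Long> out) {
            maxTimestampMs = Math.max(maxTimestampMs, event.timestampMs);
        }

        public void onPeriodicEmit(List<Long> out) {
            if (maxTimestampMs != Long.MIN_VALUE) {
                out.add(maxTimestampMs);
            }
        }
    }

    /** Drives a generator; the simulated timer fires after every eventsPerTick events. */
    static List<Long> run(Generator generator, List<Event> events, int eventsPerTick) {
        List<Long> emitted = new ArrayList<>();
        int seen = 0;
        for (Event event : events) {
            generator.onEvent(event, emitted);
            if (++seen % eventsPerTick == 0) {
                generator.onPeriodicEmit(emitted);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(1000, false), new Event(2000, true),
            new Event(3000, false), new Event(4000, false));
        System.out.println(run(new Punctuated(), events, 2)); // [2000]
        System.out.println(run(new Periodic(), events, 2));   // [2000, 4000]
    }
}
```

The punctuated generator emits once, at the marker event, while the periodic one emits at every simulated timer tick regardless of which events arrived.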

Example of Watermark Generation

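Here’s a code example demonstrating how to assign timestamps and generate watermarks using the built-in bounded-out-of-orderness strategy, which emits watermarks periodically and tolerates events that arrive up to 5 seconds out of order. The Event and EventTimeWindowFunction classes are assumed to be defined elsewhere in the application: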

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

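        // Event and EventSource are application classes, not part of Flink.
        DataStream<Event> stream = env.addSource(new EventSource());

        // Periodic watermarks that trail the highest timestamp seen by 5 seconds.
        WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            // Read the event-time timestamp (epoch milliseconds) from each event.
            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event event, long recordTimestamp) {
                    return event.getTimestamp();
                }
            });

        DataStream<Event> watermarkedStream = stream.assignTimestampsAndWatermarks(watermarkStrategy);

        // Group by key; each 10-second tumbling window fires once the watermark passes its end.
        watermarkedStream
            .keyBy(event -> event.getKey())
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .process(new EventTimeWindowFunction())
            .print();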

        env.execute("Watermark Example");
    }
}

Example Source Function

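Here’s a simple source function that generates events, attaches timestamps, and emits its own watermarks. When a WatermarkStrategy is applied downstream, as in the example above, the strategy's timestamps and watermarks replace the ones emitted here: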

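import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Emits one Event roughly every 100 ms, stamped with the current wall-clock time.
public class EventSource implements SourceFunction<Event> {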
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            long timestamp = System.currentTimeMillis();
            ctx.collectWithTimestamp(new Event("key", timestamp), timestamp);
            ctx.emitWatermark(new Watermark(timestamp - 5000));
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Handling Parallel Streams

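In a distributed environment, Flink processes streams in parallel, and each parallel sub-task emits its own watermarks. An operator that receives several input streams tracks the latest watermark from each one and uses the minimum as its own event-time clock. This guarantees that the operator never advances past a timestamp that one of its inputs might still deliver.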
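
The minimum rule can be illustrated with a small plain-Java sketch. It is not Flink's internal implementation: MinWatermarkSketch, onWatermark, and the channel numbering are hypothetical names for this example, which only models the bookkeeping an operator with several inputs has to do.

```java
import java.util.Arrays;

/** Plain-Java sketch of combining watermarks from several input channels (not Flink's API). */
class MinWatermarkSketch {
    private final long[] channelWatermarks;
    private long combinedWatermark = Long.MIN_VALUE;

    MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Until a channel reports a watermark, assume it may still deliver arbitrarily old events.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator's combined watermark. */
    long onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The combined watermark advances only when the slowest channel advances.
        combinedWatermark = Math.max(combinedWatermark, min);
        return combinedWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkSketch operator = new MinWatermarkSketch(3);
        System.out.println(operator.onWatermark(0, 9000)); // still Long.MIN_VALUE: channels 1 and 2 are silent
        operator.onWatermark(1, 7000);
        System.out.println(operator.onWatermark(2, 8000)); // 7000
        System.out.println(operator.onWatermark(1, 9500)); // 8000
    }
}
```

Note how the first watermark has no visible effect: a single silent channel holds the combined watermark back, which is why slow or idle inputs delay window results downstream.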

Example of Parallel Watermark Handling

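When a source runs with a parallelism of 4, as below, each of the four sub-tasks generates its own watermarks, and the downstream window operator advances its event-time clock to the minimum of the watermarks it has received. A sub-task that stops emitting holds that minimum back; WatermarkStrategy.withIdleness can mark such inputs as idle so that they no longer stall the others.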

DataStream<Event> stream = env.addSource(new ParallelEventSource()).setParallelism(4);

WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
        @Override
        public long extractTimestamp(Event event, long recordTimestamp) {
            return event.getTimestamp();
        }
    });

DataStream<Event> watermarkedStream = stream.assignTimestampsAndWatermarks(watermarkStrategy);

watermarkedStream
    .keyBy(event -> event.getKey())
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new EventTimeWindowFunction())
    .print();

Watermark Strategy for Parallel Sources

Here’s an example of a parallel source function generating events and watermarks:

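import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Each parallel instance runs its own copy of run() and emits its own watermarks.
public class ParallelEventSource extends RichParallelSourceFunction<Event> {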
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            long timestamp = System.currentTimeMillis();
            ctx.collectWithTimestamp(new Event("key", timestamp), timestamp);
            ctx.emitWatermark(new Watermark(timestamp - 5000));
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Benefits of Watermarks

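Handling Late Data: Watermarks give Flink a precise definition of lateness: an event is late if it arrives after the watermark has already passed the end of its window. By default such events are dropped; with allowed lateness or a late-data side output configured on the window, they can still be added to the correct window or captured separately.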

Event Time Processing: With watermarks, Flink can process events based on their actual occurrence time, making it suitable for applications where timing is critical.

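Out-of-Order Event Handling: Watermarks enable Flink to handle out-of-order events gracefully. The watermark trails the newest timestamp by a configured bound (5 seconds in the examples above), so events that arrive out of order within that bound still land in the correct window.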
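
How a watermark decides when a window fires and when an event counts as late can be sketched with a few lines of arithmetic. This is a plain-Java illustration rather than Flink code: WindowLatenessSketch and its method names are hypothetical, it assumes non-negative timestamps and tumbling windows with no offset, and the firing and lateness conditions are written to approximate how Flink's event-time windows behave.

```java
/** Plain-Java sketch of tumbling-window firing and lateness checks (not Flink's API). */
class WindowLatenessSketch {
    /** Start of the tumbling window that contains the timestamp (non-negative timestamps, no offset). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Largest timestamp that still belongs to the window starting at windowStartMs. */
    static long windowMaxTimestamp(long windowStartMs, long windowSizeMs) {
        return windowStartMs + windowSizeMs - 1;
    }

    /** A window can fire once the watermark has reached its last timestamp. */
    static boolean canFire(long windowStartMs, long windowSizeMs, long watermarkMs) {
        return watermarkMs >= windowMaxTimestamp(windowStartMs, windowSizeMs);
    }

    /** An event is late if its window, plus any allowed lateness, is already behind the watermark. */
    static boolean isLate(long timestampMs, long windowSizeMs, long allowedLatenessMs, long watermarkMs) {
        long start = windowStart(timestampMs, windowSizeMs);
        return windowMaxTimestamp(start, windowSizeMs) + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long size = 10_000; // 10-second windows, as in the examples above
        System.out.println(windowStart(12_345, size));            // 10000
        System.out.println(canFire(10_000, size, 19_998));        // false
        System.out.println(canFire(10_000, size, 19_999));        // true
        System.out.println(isLate(12_345, size, 0, 25_000));      // true: dropped by default
        System.out.println(isLate(12_345, size, 10_000, 25_000)); // false: within allowed lateness
    }
}
```

With zero allowed lateness, the event stamped 12345 is late once the watermark reaches 19999; granting 10 seconds of allowed lateness keeps its window open until the watermark reaches 29999.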

Conclusion

Watermarks are a critical component in Apache Flink for dealing with event time and out-of-order data. By generating and using watermarks, Flink can accurately perform time-based computations like windowing and aggregations, even when events arrive out of order or late. This makes Flink a powerful tool for real-time stream processing applications.

For more detailed information, you can refer to the Apache Flink Documentation on Watermarks and Event Time.
