Skip to content

feat: Add Adaptive Kafka watermark generator#346

Open
ferenc-csaky wants to merge 3 commits into
mainfrom
feat/adaptive-watermark-gen
Open

feat: Add Adaptive Kafka watermark generator#346
ferenc-csaky wants to merge 3 commits into
mainfrom
feat/adaptive-watermark-gen

Conversation

@ferenc-csaky

@ferenc-csaky ferenc-csaky commented Jun 1, 2026

Copy link
Copy Markdown
Collaborator

Summary

Adds adaptive Kafka source watermarks for SOURCE_WATERMARK() and introduces opt-in idle watermark advancement for sparse Kafka streams.

The adaptive source watermark strategy estimates out-of-orderness from observed Kafka record timestamps and emits source watermarks once enough records have been observed. Idle advancement is disabled by default and can be enabled explicitly for sources where it is safe to advance event time during periods with no records.

Changes

  • Added adaptive source-watermark configuration:

    • scan.source-watermark.min-records
    • scan.source-watermark.min-out-of-orderness
    • scan.source-watermark.max-out-of-orderness
    • scan.source-watermark.out-of-orderness-quantile
  • Added opt-in idle source-watermark advancement:

    • scan.source-watermark.idle-advance-timeout
    • scan.source-watermark.idle-advance-safety-margin
    • scan.source-watermark.idle-advance-broker-check-timeout
    • scan.source-watermark.idle-advance-broker-check-ttl
  • Idle advancement remains disabled by default.

  • Idle advancement only proceeds when the Kafka readiness check confirms all explicit source topics use LogAppendTime.

  • Broker/topic readiness checks fail on errors, missing configs, topic-pattern sources, empty topic lists, or non-LogAppendTime topics.

  • Readiness checks are cached for 10 s by default to avoid checking Kafka every watermark period.

  • Reuses one lazy transient Kafka AdminClient per source checker/subtask to avoid repeated cold AdminClient bootstrap overhead.

  • Applies Flink withIdleness(...) only when idle advancement is disabled. When idle advancement is enabled, the source watermark generator must remain active so it can emit wall-clock-derived watermarks.

  • Exposes source-watermark options for both:

    • kafka-safe
    • upsert-kafka-safe
  • Added focused unit test coverage for:

    • adaptive watermark behavior
    • idle watermark advancement
    • readiness-check TTL caching
  • Added Kafka source-watermark integration test coverage for:

    • normal adaptive source watermarks
    • sparse-stream idle advancement
    • sparse-stream behavior without idle advancement

Safety Notes

Idle advancement is intentionally opt-in because advancing event time during source silence can mark later bursty traffic as late.

When enabled, idle advancement emits conservative wall-clock-derived watermarks using:

watermark = maxTimestamp + idleDuration - outOfOrderness - safetyMargin - 1

The final -1 preserves Flink’s exclusive watermark boundary so records exactly on the boundary remain on time.

Validation

Passed:

mvn -pl connectors/kafka-safe-connector clean test -Dtest=KafkaRecordTimestampWatermarkStrategyTest,KafkaAdminIdleAdvanceReadinessCheckerTest
mvn -pl flink-sql-runner -am -Dtest=NoSuchTest -Dit.test=KafkaSourceWatermarkIT verify

@ferenc-csaky ferenc-csaky force-pushed the feat/adaptive-watermark-gen branch from f5dd621 to 0c3a726 Compare June 1, 2026 14:50
@ferenc-csaky ferenc-csaky force-pushed the feat/adaptive-watermark-gen branch from 95f82a6 to 7125130 Compare June 2, 2026 15:07
});

} catch (Exception e) {
log.debug("Failed to check broker and topic timestamp type", e);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to ignore the exception here, as we don't want to fail a job for e.g. intermittent network failures, but if the user configures this, they want to know on if they e.g. misconfigured the permissions, and not let it silently fail.

I think we can either check explicitly for org.apache.kafka.common.errors.AuthorizationException here, or add a maximum amount of tolerable failures.

KafkaAdminIdleAdvanceReadinessChecker(
Properties kafkaProperties, List<String> topics, SourceWatermarkConfig config) {
this.kafkaProperties = kafkaProperties;
checkArgument(topics != null && !topics.isEmpty(), "topics must not be empty");

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be null if topic-pattern is used instead of topics

long idleAdvanceTimeoutMillis = config.idleAdvanceTimeoutMillis();

if (idleAdvanceTimeoutMillis <= 0
|| recordCount == 0

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this cause issues for very slow topics, where no records were emitted from a few partitions? Those partitions could hold back the watermark here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants