From 9622dc651291afa2cc09e7883b0f06359f43a7fc Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Mon, 1 Jun 2026 14:42:53 +0200 Subject: [PATCH 1/6] feat: Add Adaptive Kafka watermark generator --- ...KafkaRecordTimestampWatermarkStrategy.java | 140 ++++++++++++++++++ .../kafka/table/SafeKafkaDynamicSource.java | 35 ++++- ...aRecordTimestampWatermarkStrategyTest.java | 98 ++++++++++++ .../flinkrunner/AbstractITSupport.java | 23 ++- .../flinkrunner/KafkaSourceWatermarkIT.java | 105 +++++++++++++ .../resources/sql/kafka_source_watermark.sql | 36 +++++ .../sql/kafka_source_watermark_produce.sql | 28 ++++ 7 files changed, 457 insertions(+), 8 deletions(-) create mode 100644 connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java create mode 100644 connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java create mode 100644 flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/KafkaSourceWatermarkIT.java create mode 100644 flink-sql-runner/src/test/resources/sql/kafka_source_watermark.sql create mode 100644 flink-sql-runner/src/test/resources/sql/kafka_source_watermark_produce.sql diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java new file mode 100644 index 00000000..f78de399 --- /dev/null +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java @@ -0,0 +1,140 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner.connector.kafka; + +import java.io.Serial; +import java.time.Duration; +import java.util.Arrays; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.data.RowData; + +/** Adaptive watermark strategy based on Kafka record timestamps. 
*/ +@Internal +public final class KafkaRecordTimestampWatermarkStrategy implements WatermarkStrategy { + + @Serial private static final long serialVersionUID = 1L; + + static final int MIN_RECORDS = 250; + static final int SAMPLE_SIZE = 4096; + static final long MIN_OUT_OF_ORDERNESS_MILLIS = 50L; + static final long MAX_OUT_OF_ORDERNESS_MILLIS = Duration.ofDays(1).toMillis(); + + private static final double OUT_OF_ORDERNESS_QUANTILE = 0.95D; + + public static final KafkaRecordTimestampWatermarkStrategy INSTANCE = + new KafkaRecordTimestampWatermarkStrategy(); + + private KafkaRecordTimestampWatermarkStrategy() {} + + @Override + public WatermarkGenerator createWatermarkGenerator( + WatermarkGeneratorSupplier.Context context) { + + return new AdaptiveKafkaRecordTimestampWatermarkGenerator(); + } + + /** + * Watermark generator that derives event time from Kafka record timestamps and adapts its + * out-of-orderness delay from recently observed records. + * + *

<p>The generator watches how far records tend to arrive behind the newest Kafka timestamp seen + * so far. It keeps a rolling sample of those delays and uses the 95th percentile as the safety + * margin before advancing the watermark. This lets the source tolerate typical out-of-order + * records without waiting for rare extreme delays. + * + *

The {@code eventTimestamp} argument is supplied by Flink from the Kafka source record + * timestamp. + */ + private static final class AdaptiveKafkaRecordTimestampWatermarkGenerator + implements WatermarkGenerator { + + /** + * Circular buffer of observed lateness values in milliseconds. + * + *

For a record with timestamp {@code t}, lateness is {@code max(0, maxTimestamp - t)} using + * the maximum timestamp seen before that record. In-order records therefore contribute {@code + * 0}, while older records contribute how far they lag behind the current observed frontier. + */ + private final long[] latenessSamples = new long[SAMPLE_SIZE]; + + /** Highest Kafka record timestamp observed by this generator. */ + private long maxTimestamp = Long.MIN_VALUE; + + /** Last emitted watermark, used to preserve Flink's monotonic watermark contract. */ + private long lastEmittedWatermark = Long.MIN_VALUE; + + /** + * Number of records observed, including the first record that cannot produce a lateness sample. + */ + private long recordCount; + + /** Next slot in the circular lateness sample buffer. */ + private int nextSampleIndex; + + @Override + public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) { + if (maxTimestamp != Long.MIN_VALUE) { + // Sample lateness before updating maxTimestamp, so the sample reflects disorder relative to + // the already observed stream frontier. + latenessSamples[nextSampleIndex] = Math.max(0L, maxTimestamp - eventTimestamp); + nextSampleIndex = (nextSampleIndex + 1) % SAMPLE_SIZE; + } + + maxTimestamp = Math.max(maxTimestamp, eventTimestamp); + recordCount++; + } + + @Override + public void onPeriodicEmit(WatermarkOutput output) { + if (recordCount < MIN_RECORDS || maxTimestamp == Long.MIN_VALUE) { + // Avoid emitting during warm-up, so limited early samples won't distort the estimate. 
+ return; + } + + long outOfOrdernessMillis = calculateOutOfOrdernessMillis(); + long watermark = maxTimestamp - outOfOrdernessMillis - 1; + + if (watermark > lastEmittedWatermark) { + output.emitWatermark(new Watermark(watermark)); + lastEmittedWatermark = watermark; + } + } + + private long calculateOutOfOrdernessMillis() { + int sampleCount = (int) Math.min(recordCount - 1, SAMPLE_SIZE); + if (sampleCount <= 0) { + return MIN_OUT_OF_ORDERNESS_MILLIS; + } + + long[] samples = Arrays.copyOf(latenessSamples, sampleCount); + Arrays.sort(samples); + + // Use a high quantile rather than the maximum, so a single pathological record does not stall + // all event-time progress until it rotates out of the sample buffer. + int quantileIndex = + Math.min(sampleCount - 1, (int) Math.ceil(sampleCount * OUT_OF_ORDERNESS_QUANTILE) - 1); + + return Math.min( + MAX_OUT_OF_ORDERNESS_MILLIS, + Math.max(MIN_OUT_OF_ORDERNESS_MILLIS, samples[quantileIndex])); + } + } +} diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java index c64d78c8..63b445c9 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java @@ -15,6 +15,7 @@ */ package org.apache.flink.streaming.connectors.kafka.table; +import com.datasqrl.flinkrunner.connector.kafka.KafkaRecordTimestampWatermarkStrategy; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -40,6 +41,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import 
org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark; import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.RowData; @@ -82,7 +84,10 @@ /** A version-agnostic Kafka {@link ScanTableSource}. */ @Internal public class SafeKafkaDynamicSource - implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown { + implements ScanTableSource, + SupportsReadingMetadata, + SupportsSourceWatermark, + SupportsWatermarkPushDown { private static final String KAFKA_TRANSFORMATION = "kafka"; @@ -99,6 +104,9 @@ public class SafeKafkaDynamicSource /** Watermark strategy that is used to generate per-partition watermark. */ protected @Nullable WatermarkStrategy watermarkStrategy; + /** Whether to use Kafka record timestamps to generate source watermarks. 
*/ + protected boolean sourceWatermarkEnabled; + // -------------------------------------------------------------------------------------------- // Format attributes // -------------------------------------------------------------------------------------------- @@ -215,6 +223,7 @@ public SafeKafkaDynamicSource( this.producedDataType = physicalDataType; this.metadataKeys = Collections.emptyList(); this.watermarkStrategy = null; + this.sourceWatermarkEnabled = false; // Kafka-specific attributes Preconditions.checkArgument( (topics != null && topicPattern == null) @@ -264,9 +273,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { @Override public DataStream produceDataStream( ProviderContext providerContext, StreamExecutionEnvironment execEnv) { - if (watermarkStrategy == null) { - watermarkStrategy = WatermarkStrategy.noWatermarks(); - } + final WatermarkStrategy watermarkStrategy = getWatermarkStrategy(); DataStreamSource sourceStream = execEnv.fromSource( kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier); @@ -340,6 +347,11 @@ public void applyWatermark(WatermarkStrategy watermarkStrategy) { this.watermarkStrategy = watermarkStrategy; } + @Override + public void applySourceWatermark() { + this.sourceWatermarkEnabled = true; + } + @Override public DynamicTableSource copy() { final SafeKafkaDynamicSource copy = @@ -366,6 +378,7 @@ public DynamicTableSource copy() { copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; + copy.sourceWatermarkEnabled = sourceWatermarkEnabled; return copy; } @@ -403,6 +416,7 @@ public boolean equals(Object o) { && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(tableIdentifier, that.tableIdentifier) && Objects.equals(watermarkStrategy, that.watermarkStrategy) + && sourceWatermarkEnabled == that.sourceWatermarkEnabled && Objects.equals(parallelism, that.parallelism); } @@ -429,6 +443,7 @@ public int hashCode() { 
upsertMode, tableIdentifier, watermarkStrategy, + sourceWatermarkEnabled, parallelism); } @@ -509,6 +524,18 @@ protected KafkaSource createKafkaSource( return kafkaSourceBuilder.build(); } + private WatermarkStrategy getWatermarkStrategy() { + if (watermarkStrategy != null) { + return watermarkStrategy; + } + + if (sourceWatermarkEnabled) { + return KafkaRecordTimestampWatermarkStrategy.INSTANCE; + } + + return WatermarkStrategy.noWatermarks(); + } + private OffsetResetStrategy getResetStrategy(String offsetResetConfig) { return Arrays.stream(OffsetResetStrategy.values()) .filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT))) diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java new file mode 100644 index 00000000..a4c01005 --- /dev/null +++ b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java @@ -0,0 +1,98 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datasqrl.flinkrunner.connector.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.table.data.RowData; +import org.junit.jupiter.api.Test; + +/** Tests for {@link KafkaRecordTimestampWatermarkStrategy}. */ +class KafkaRecordTimestampWatermarkStrategyTest { + + @Test + void testDoesNotEmitBeforeWarmup() { + final WatermarkGenerator generator = createGenerator(); + final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); + + for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS - 1; i++) { + generator.onEvent(null, i, output); + } + generator.onPeriodicEmit(output); + + assertThat(output.watermarks).isEmpty(); + } + + @Test + void testEmitsWatermarkFromKafkaRecordTimestampAfterWarmup() { + final WatermarkGenerator generator = createGenerator(); + final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); + + for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { + generator.onEvent(null, i * 100L, output); + } + generator.onPeriodicEmit(output); + + assertThat(output.watermarks) + .containsExactly( + (KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS - 1) * 100L + - KafkaRecordTimestampWatermarkStrategy.MIN_OUT_OF_ORDERNESS_MILLIS + - 1); + } + + @Test + void testWatermarksAreMonotonic() { + final WatermarkGenerator generator = createGenerator(); + final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); + + for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { + generator.onEvent(null, i * 100L, output); + } + generator.onPeriodicEmit(output); + + for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { + generator.onEvent(null, 0L, output); 
+ } + generator.onPeriodicEmit(output); + + assertThat(output.watermarks).hasSize(1); + } + + private static WatermarkGenerator createGenerator() { + return KafkaRecordTimestampWatermarkStrategy.INSTANCE.createWatermarkGenerator(null); + } + + private static final class CollectingWatermarkOutput implements WatermarkOutput { + + private final List watermarks = new ArrayList<>(); + + @Override + public void emitWatermark(Watermark watermark) { + watermarks.add(watermark.getTimestamp()); + } + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + } +} diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/AbstractITSupport.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/AbstractITSupport.java index eea663d3..c9175832 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/AbstractITSupport.java +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/AbstractITSupport.java @@ -76,7 +76,24 @@ public class AbstractITSupport { DockerImageName.parse("docker.redpanda.com/redpandadata/redpanda:v23.1.2")) .withNetwork(sharedNetwork) .withNetworkAliases("redpanda") - .withExposedPorts(REDPANDA_PORT); + .withExposedPorts(REDPANDA_PORT) + .withCommand( + "redpanda", + "start", + "--overprovisioned", + "--smp", + "1", + "--memory", + "1G", + "--reserve-memory", + "0M", + "--node-id", + "0", + "--check=false", + "--kafka-addr", + "PLAINTEXT://0.0.0.0:9092", + "--advertise-kafka-addr", + "PLAINTEXT://redpanda:9092"); protected static GenericContainer flinkContainer; @@ -85,8 +102,6 @@ public class AbstractITSupport { @SuppressWarnings("resource") @BeforeAll protected void init() throws Exception { - int redisPort = redpandaContainer.getMappedPort(REDPANDA_PORT); - flinkContainer = new GenericContainer<>(DockerImageName.parse("flink-sql-runner")) .withNetwork(sharedNetwork) @@ -94,7 +109,7 @@ protected void init() throws Exception { .withEnv("JDBC_URL", "jdbc:postgresql://postgres:5432/datasqrl") 
.withEnv("JDBC_USERNAME", "postgres") .withEnv("JDBC_PASSWORD", "postgres") - .withEnv("REDPANDA_PORT", String.valueOf(redisPort)) + .withEnv("REDPANDA_PORT", String.valueOf(REDPANDA_PORT)) .withFileSystemBind("target/test-classes/plans", "/it/planfile", BindMode.READ_ONLY) .withFileSystemBind("target/test-classes/sql", "/it/sqlfile", BindMode.READ_ONLY) .withFileSystemBind("target/test-classes/sqrl", "/it/sqrl", BindMode.READ_ONLY) diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/KafkaSourceWatermarkIT.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/KafkaSourceWatermarkIT.java new file mode 100644 index 00000000..61288ecf --- /dev/null +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/KafkaSourceWatermarkIT.java @@ -0,0 +1,105 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datasqrl.flinkrunner; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.nextbreakpoint.flink.client.model.JobStatus; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.sqlobject.SqlObjectPlugin; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; + +class KafkaSourceWatermarkIT extends AbstractITSupport { + + private static final String TOPIC = "source-watermark-it"; + + @Test + void givenKafkaSourceWatermark_whenWindowing_thenWatermarkClosesWindows() throws Exception { + var resultDao = connect(); + resultDao.createTable(); + resultDao.truncateTable(); + + createTopic(); + produceTimestampedRecords(); + + flinkRun("--sqlfile", "/it/sqlfile/kafka_source_watermark.sql"); + + try { + untilAssert( + () -> { + assertThat(resultDao.getRowCount()).isGreaterThan(0); + assertThat(resultDao.getTotalCount()).isGreaterThan(0); + }); + } catch (Throwable t) { + throw new AssertionError(readFlinkLogs(), t); + } + } + + private String readFlinkLogs() throws Exception { + Container.ExecResult result = + flinkContainer.execInContainer( + "bash", "-c", "for f in /opt/flink/log/*; do echo ==== $f ====; tail -200 $f; done"); + return result.getStdout() + result.getStderr(); + } + + private void createTopic() throws Exception { + Container.ExecResult result = + redpandaContainer.execInContainer( + "rpk", "topic", "create", TOPIC, "--brokers", "redpanda:9092", "--partitions", "1"); + + assertThat(result.getExitCode()) + .withFailMessage(result.getStdout() + result.getStderr()) + .isZero(); + } + + private void produceTimestampedRecords() throws Exception { + String jobId = flinkRun("--sqlfile", "/it/sqlfile/kafka_source_watermark_produce.sql"); + + untilAssert( + () -> assertThat(client.getJobStatusInfo(jobId).getStatus()).isEqualTo(JobStatus.FINISHED)); + } + + private ResultDao connect() { + var mappedPort = 
postgresContainer.getMappedPort(5432); + var jdbi = + Jdbi.create( + "jdbc:postgresql://localhost:" + mappedPort + "/datasqrl", "postgres", "postgres"); + jdbi.installPlugin(new SqlObjectPlugin()); + return jdbi.onDemand(ResultDao.class); + } + + interface ResultDao { + + @SqlUpdate( + "CREATE TABLE IF NOT EXISTS source_watermark_results (" + + "window_start TIMESTAMP NOT NULL, " + + "window_end TIMESTAMP NOT NULL, " + + "cnt BIGINT NOT NULL)") + void createTable(); + + @SqlUpdate("TRUNCATE source_watermark_results") + void truncateTable(); + + @SqlQuery("SELECT count(*) FROM source_watermark_results") + long getRowCount(); + + @SqlQuery("SELECT coalesce(sum(cnt), 0) FROM source_watermark_results") + long getTotalCount(); + } +} diff --git a/flink-sql-runner/src/test/resources/sql/kafka_source_watermark.sql b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark.sql new file mode 100644 index 00000000..0a73b2ec --- /dev/null +++ b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark.sql @@ -0,0 +1,36 @@ +CREATE TABLE source_events ( + id BIGINT, + payload STRING, + ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', + WATERMARK FOR ts AS SOURCE_WATERMARK() +) WITH ( + 'connector' = 'kafka-safe', + 'topic' = 'source-watermark-it', + 'properties.bootstrap.servers' = 'redpanda:9092', + 'properties.group.id' = 'source-watermark-it', + 'scan.startup.mode' = 'earliest-offset', + 'format' = 'json' +); + +CREATE TABLE source_watermark_results ( + window_start TIMESTAMP(3) NOT NULL, + window_end TIMESTAMP(3) NOT NULL, + cnt BIGINT NOT NULL +) WITH ( + 'connector' = 'jdbc', + 'driver' = 'org.postgresql.Driver', + 'url' = '${JDBC_URL}', + 'username' = '${JDBC_USERNAME}', + 'password' = '${JDBC_PASSWORD}', + 'table-name' = 'source_watermark_results' +); + +INSERT INTO source_watermark_results +SELECT + CAST(window_start AS TIMESTAMP(3)), + CAST(window_end AS TIMESTAMP(3)), + COUNT(*) +FROM TABLE( + TUMBLE(TABLE source_events, DESCRIPTOR(ts), INTERVAL '1' SECOND) 
+) +GROUP BY window_start, window_end; diff --git a/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_produce.sql b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_produce.sql new file mode 100644 index 00000000..6cb002a1 --- /dev/null +++ b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_produce.sql @@ -0,0 +1,28 @@ +CREATE TABLE source_watermark_records ( + record_id BIGINT +) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '300', + 'rows-per-second' = '1000', + 'fields.record_id.kind' = 'sequence', + 'fields.record_id.start' = '0', + 'fields.record_id.end' = '299' +); + +CREATE TABLE source_watermark_input ( + id BIGINT, + payload STRING, + ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' +) WITH ( + 'connector' = 'kafka-safe', + 'topic' = 'source-watermark-it', + 'properties.bootstrap.servers' = 'redpanda:9092', + 'format' = 'json' +); + +INSERT INTO source_watermark_input +SELECT + CAST(record_id AS BIGINT), + CONCAT('record-', CAST(record_id AS STRING)), + TO_TIMESTAMP_LTZ(1780272000000 + record_id * 100, 3) -- 2026-06-01 00:00:00 +FROM source_watermark_records; From 7125130854d2fd273c44fb208c9fb8ac152a1844 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Tue, 2 Jun 2026 16:18:59 +0200 Subject: [PATCH 2/6] address review comments --- ...KafkaRecordTimestampWatermarkStrategy.java | 63 +++++++++-- .../kafka/SourceWatermarkOptions.java | 103 ++++++++++++++++++ .../kafka/table/SafeKafkaDynamicSource.java | 50 ++++++++- .../table/SafeKafkaDynamicTableFactory.java | 30 ++++- .../SafeUpsertKafkaDynamicTableFactory.java | 18 ++- ...aRecordTimestampWatermarkStrategyTest.java | 54 ++++++++- .../resources/sql/kafka_source_watermark.sql | 6 + 7 files changed, 302 insertions(+), 22 deletions(-) create mode 100644 connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/SourceWatermarkOptions.java diff --git 
a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java index f78de399..164748b2 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java @@ -15,6 +15,7 @@ */ package com.datasqrl.flinkrunner.connector.kafka; +import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import java.io.Serial; import java.time.Duration; import java.util.Arrays; @@ -25,6 +26,7 @@ import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.watermark.WatermarkEmitStrategy; /** Adaptive watermark strategy based on Kafka record timestamps. 
*/ @Internal @@ -37,18 +39,36 @@ public final class KafkaRecordTimestampWatermarkStrategy implements WatermarkStr static final long MIN_OUT_OF_ORDERNESS_MILLIS = 50L; static final long MAX_OUT_OF_ORDERNESS_MILLIS = Duration.ofDays(1).toMillis(); - private static final double OUT_OF_ORDERNESS_QUANTILE = 0.95D; + static final double OUT_OF_ORDERNESS_QUANTILE = 0.95D; - public static final KafkaRecordTimestampWatermarkStrategy INSTANCE = - new KafkaRecordTimestampWatermarkStrategy(); + private final WatermarkEmitStrategy emitStrategy; + private final SourceWatermarkConfig configuration; - private KafkaRecordTimestampWatermarkStrategy() {} + public KafkaRecordTimestampWatermarkStrategy() { + this(WatermarkEmitStrategy.ON_PERIODIC); + } + + public KafkaRecordTimestampWatermarkStrategy(WatermarkEmitStrategy emitStrategy) { + this( + emitStrategy, + new SourceWatermarkConfig( + MIN_RECORDS, + MIN_OUT_OF_ORDERNESS_MILLIS, + MAX_OUT_OF_ORDERNESS_MILLIS, + OUT_OF_ORDERNESS_QUANTILE)); + } + + public KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy emitStrategy, SourceWatermarkConfig configuration) { + this.emitStrategy = emitStrategy; + this.configuration = configuration; + } @Override public WatermarkGenerator createWatermarkGenerator( WatermarkGeneratorSupplier.Context context) { - return new AdaptiveKafkaRecordTimestampWatermarkGenerator(); + return new AdaptiveKafkaRecordTimestampWatermarkGenerator(emitStrategy, configuration); } /** @@ -75,6 +95,9 @@ private static final class AdaptiveKafkaRecordTimestampWatermarkGenerator */ private final long[] latenessSamples = new long[SAMPLE_SIZE]; + private final WatermarkEmitStrategy emitStrategy; + private final SourceWatermarkConfig configuration; + /** Highest Kafka record timestamp observed by this generator. */ private long maxTimestamp = Long.MIN_VALUE; @@ -89,6 +112,12 @@ private static final class AdaptiveKafkaRecordTimestampWatermarkGenerator /** Next slot in the circular lateness sample buffer. 
*/ private int nextSampleIndex; + private AdaptiveKafkaRecordTimestampWatermarkGenerator( + WatermarkEmitStrategy emitStrategy, SourceWatermarkConfig configuration) { + this.emitStrategy = emitStrategy; + this.configuration = configuration; + } + @Override public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) { if (maxTimestamp != Long.MIN_VALUE) { @@ -100,11 +129,23 @@ public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) maxTimestamp = Math.max(maxTimestamp, eventTimestamp); recordCount++; + + if (emitStrategy.isOnEvent()) { + emitIfReady(output); + } } @Override public void onPeriodicEmit(WatermarkOutput output) { - if (recordCount < MIN_RECORDS || maxTimestamp == Long.MIN_VALUE) { + if (!emitStrategy.isOnPeriodic()) { + return; + } + + emitIfReady(output); + } + + private void emitIfReady(WatermarkOutput output) { + if (recordCount < configuration.getMinRecords() || maxTimestamp == Long.MIN_VALUE) { // Avoid emitting during warm-up, so limited early samples won't distort the estimate. return; } @@ -121,7 +162,7 @@ public void onPeriodicEmit(WatermarkOutput output) { private long calculateOutOfOrdernessMillis() { int sampleCount = (int) Math.min(recordCount - 1, SAMPLE_SIZE); if (sampleCount <= 0) { - return MIN_OUT_OF_ORDERNESS_MILLIS; + return configuration.getMinOutOfOrdernessMillis(); } long[] samples = Arrays.copyOf(latenessSamples, sampleCount); @@ -130,11 +171,13 @@ private long calculateOutOfOrdernessMillis() { // Use a high quantile rather than the maximum, so a single pathological record does not stall // all event-time progress until it rotates out of the sample buffer. 
int quantileIndex = - Math.min(sampleCount - 1, (int) Math.ceil(sampleCount * OUT_OF_ORDERNESS_QUANTILE) - 1); + Math.min( + sampleCount - 1, + (int) Math.ceil(sampleCount * configuration.getOutOfOrdernessQuantile()) - 1); return Math.min( - MAX_OUT_OF_ORDERNESS_MILLIS, - Math.max(MIN_OUT_OF_ORDERNESS_MILLIS, samples[quantileIndex])); + configuration.getMaxOutOfOrdernessMillis(), + Math.max(configuration.getMinOutOfOrdernessMillis(), samples[quantileIndex])); } } } diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/SourceWatermarkOptions.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/SourceWatermarkOptions.java new file mode 100644 index 00000000..c7278d3c --- /dev/null +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/SourceWatermarkOptions.java @@ -0,0 +1,103 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datasqrl.flinkrunner.connector.kafka; + +import java.io.Serial; +import java.io.Serializable; +import java.time.Duration; +import lombok.Value; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; + +public class SourceWatermarkOptions { + + public static final ConfigOption SCAN_SOURCE_WATERMARK_MIN_RECORDS = + ConfigOptions.key("scan.source-watermark.min-records") + .intType() + .defaultValue(KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS) + .withDescription( + "Minimum number of records to observe before emitting source watermarks."); + + public static final ConfigOption SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS = + ConfigOptions.key("scan.source-watermark.min-out-of-orderness") + .durationType() + .defaultValue( + Duration.ofMillis(KafkaRecordTimestampWatermarkStrategy.MIN_OUT_OF_ORDERNESS_MILLIS)) + .withDescription("Minimum adaptive out-of-orderness delay for source watermarks."); + + public static final ConfigOption SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS = + ConfigOptions.key("scan.source-watermark.max-out-of-orderness") + .durationType() + .defaultValue( + Duration.ofMillis(KafkaRecordTimestampWatermarkStrategy.MAX_OUT_OF_ORDERNESS_MILLIS)) + .withDescription("Maximum adaptive out-of-orderness delay for source watermarks."); + + public static final ConfigOption SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE = + ConfigOptions.key("scan.source-watermark.out-of-orderness-quantile") + .doubleType() + .defaultValue(KafkaRecordTimestampWatermarkStrategy.OUT_OF_ORDERNESS_QUANTILE) + .withDescription( + "Quantile of observed lateness samples to use as the source watermark delay."); + + public static SourceWatermarkConfig sourceWatermarkConfiguration(ReadableConfig tableOptions) { + var minRecords = tableOptions.get(SCAN_SOURCE_WATERMARK_MIN_RECORDS); + var minOutOfOrderness = 
tableOptions.get(SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS); + var maxOutOfOrderness = tableOptions.get(SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS); + var quantile = tableOptions.get(SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE); + + if (minRecords <= 0) { + throw new ValidationException( + String.format("'%s' must be greater than 0.", SCAN_SOURCE_WATERMARK_MIN_RECORDS.key())); + } + + if (minOutOfOrderness.isNegative()) { + throw new ValidationException( + String.format( + "'%s' must not be negative.", SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS.key())); + } + + if (maxOutOfOrderness.compareTo(minOutOfOrderness) < 0) { + throw new ValidationException( + String.format( + "'%s' must be greater than or equal to '%s'.", + SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS.key(), + SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS.key())); + } + + if (quantile <= 0D || quantile > 1D) { + throw new ValidationException( + String.format( + "'%s' must be greater than 0 and less than or equal to 1.", + SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE.key())); + } + + return new SourceWatermarkConfig( + minRecords, minOutOfOrderness.toMillis(), maxOutOfOrderness.toMillis(), quantile); + } + + @Value + public static class SourceWatermarkConfig implements Serializable { + + @Serial private static final long serialVersionUID = 1L; + + int minRecords; + long minOutOfOrdernessMillis; + long maxOutOfOrdernessMillis; + double outOfOrdernessQuantile; + } +} diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java index 63b445c9..40d43bb3 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java +++ 
b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java @@ -16,6 +16,7 @@ package org.apache.flink.streaming.connectors.kafka.table; import com.datasqrl.flinkrunner.connector.kafka.KafkaRecordTimestampWatermarkStrategy; +import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -52,6 +53,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; +import org.apache.flink.table.watermark.WatermarkEmitStrategy; import org.apache.flink.util.Preconditions; import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; @@ -63,6 +65,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -107,6 +110,15 @@ public class SafeKafkaDynamicSource /** Whether to use Kafka record timestamps to generate source watermarks. */ protected boolean sourceWatermarkEnabled; + /** Emit strategy for source watermarks. */ + protected final WatermarkEmitStrategy sourceWatermarkEmitStrategy; + + /** Idle timeout for source watermarks. */ + protected final Optional sourceWatermarkIdleTimeout; + + /** Adaptive source watermark configuration. 
*/ + protected final SourceWatermarkConfig sourceWatermarkConfig; + // -------------------------------------------------------------------------------------------- // Format attributes // -------------------------------------------------------------------------------------------- @@ -205,7 +217,10 @@ public SafeKafkaDynamicSource( boolean upsertMode, String tableIdentifier, @Nullable Integer parallelism, - DeserFailureHandler deserFailureHandler) { + DeserFailureHandler deserFailureHandler, + WatermarkEmitStrategy sourceWatermarkEmitStrategy, + Optional sourceWatermarkIdleTimeout, + SourceWatermarkConfig sourceWatermarkConfig) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -248,6 +263,18 @@ public SafeKafkaDynamicSource( this.tableIdentifier = tableIdentifier; this.parallelism = parallelism; this.deserFailureHandler = deserFailureHandler; + this.sourceWatermarkEmitStrategy = + Preconditions.checkNotNull( + sourceWatermarkEmitStrategy, + "Source watermark emit strategy must not be null."); + this.sourceWatermarkIdleTimeout = + Preconditions.checkNotNull( + sourceWatermarkIdleTimeout, + "Source watermark idle timeout must not be null."); + this.sourceWatermarkConfig = + Preconditions.checkNotNull( + sourceWatermarkConfig, + "Source watermark configuration must not be null."); } @Override @@ -374,7 +401,10 @@ public DynamicTableSource copy() { upsertMode, tableIdentifier, parallelism, - deserFailureHandler); + deserFailureHandler, + sourceWatermarkEmitStrategy, + sourceWatermarkIdleTimeout, + sourceWatermarkConfig); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -417,6 +447,9 @@ public boolean equals(Object o) { && Objects.equals(tableIdentifier, that.tableIdentifier) && Objects.equals(watermarkStrategy, that.watermarkStrategy) && sourceWatermarkEnabled == that.sourceWatermarkEnabled + && sourceWatermarkEmitStrategy == that.sourceWatermarkEmitStrategy + && 
Objects.equals(sourceWatermarkIdleTimeout, that.sourceWatermarkIdleTimeout) + && Objects.equals(sourceWatermarkConfig, that.sourceWatermarkConfig) && Objects.equals(parallelism, that.parallelism); } @@ -444,6 +477,9 @@ public int hashCode() { tableIdentifier, watermarkStrategy, sourceWatermarkEnabled, + sourceWatermarkEmitStrategy, + sourceWatermarkIdleTimeout, + sourceWatermarkConfig, parallelism); } @@ -530,7 +566,15 @@ private WatermarkStrategy getWatermarkStrategy() { } if (sourceWatermarkEnabled) { - return KafkaRecordTimestampWatermarkStrategy.INSTANCE; + WatermarkStrategy sourceWatermarkStrategy = + new KafkaRecordTimestampWatermarkStrategy( + sourceWatermarkEmitStrategy, sourceWatermarkConfig); + if (sourceWatermarkIdleTimeout.isPresent()) { + sourceWatermarkStrategy = + sourceWatermarkStrategy.withIdleness(sourceWatermarkIdleTimeout.get()); + } + + return sourceWatermarkStrategy; } return WatermarkStrategy.noWatermarks(); diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java index 6dab6c29..fe7de0e3 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka.table; import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; +import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import com.google.auto.service.AutoService; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -47,6 +48,7 @@ import 
org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.watermark.WatermarkEmitStrategy; import org.apache.flink.types.RowKind; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -67,6 +69,11 @@ import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_HANDLER; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.validateDeserFailureHandlerOptions; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_RECORDS; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.sourceWatermarkConfiguration; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; @@ -102,6 +109,8 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopics; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions; +import static 
org.apache.flink.table.factories.FactoryUtil.SOURCE_IDLE_TIMEOUT; +import static org.apache.flink.table.factories.FactoryUtil.WATERMARK_EMIT_STRATEGY; /** * Factory for creating configured instances of {@link SafeKafkaDynamicSource} and {@link @@ -160,6 +169,12 @@ public Set> optionalOptions() { options.add(TRANSACTION_NAMING_STRATEGY); options.add(SCAN_DESER_FAILURE_HANDLER); options.add(SCAN_DESER_FAILURE_TOPIC); + options.add(WATERMARK_EMIT_STRATEGY); + options.add(SOURCE_IDLE_TIMEOUT); + options.add(SCAN_SOURCE_WATERMARK_MIN_RECORDS); + options.add(SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS); + options.add(SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS); + options.add(SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE); return options; } @@ -250,7 +265,10 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedTimestampMillis, context.getObjectIdentifier().asSummaryString(), parallelism, - deserFailureHandler); + deserFailureHandler, + tableOptions.get(WATERMARK_EMIT_STRATEGY), + tableOptions.getOptional(SOURCE_IDLE_TIMEOUT), + sourceWatermarkConfiguration(tableOptions)); } @Override @@ -418,7 +436,10 @@ protected SafeKafkaDynamicSource createKafkaTableSource( long endTimestampMillis, String tableIdentifier, Integer parallelism, - DeserFailureHandler deserFailureHandler) { + DeserFailureHandler deserFailureHandler, + WatermarkEmitStrategy sourceWatermarkEmitStrategy, + Optional sourceWatermarkIdleTimeout, + SourceWatermarkConfig sourceWatermarkConfig) { return new SafeKafkaDynamicSource( physicalDataType, keyDecodingFormat, @@ -438,7 +459,10 @@ protected SafeKafkaDynamicSource createKafkaTableSource( false, tableIdentifier, parallelism, - deserFailureHandler); + deserFailureHandler, + sourceWatermarkEmitStrategy, + sourceWatermarkIdleTimeout, + sourceWatermarkConfig); } protected KafkaDynamicSink createKafkaTableSink( diff --git 
a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java index 83a793f5..2f2467f7 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java @@ -55,6 +55,11 @@ import java.util.stream.Stream; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.*; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_RECORDS; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.sourceWatermarkConfiguration; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; @@ -82,6 +87,8 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopicPattern; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopics; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateScanBoundedMode; +import static 
org.apache.flink.table.factories.FactoryUtil.SOURCE_IDLE_TIMEOUT; +import static org.apache.flink.table.factories.FactoryUtil.WATERMARK_EMIT_STRATEGY; /** Upsert-Kafka factory. */ @AutoService(Factory.class) @@ -123,6 +130,12 @@ public Set> optionalOptions() { options.add(TRANSACTIONAL_ID_PREFIX); options.add(SCAN_PARALLELISM); options.add(TRANSACTION_NAMING_STRATEGY); + options.add(WATERMARK_EMIT_STRATEGY); + options.add(SOURCE_IDLE_TIMEOUT); + options.add(SCAN_SOURCE_WATERMARK_MIN_RECORDS); + options.add(SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS); + options.add(SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS); + options.add(SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE); return options; } @@ -183,7 +196,10 @@ public DynamicTableSource createDynamicTableSource(Context context) { true, context.getObjectIdentifier().asSummaryString(), parallelism, - deserFailureHandler); + deserFailureHandler, + tableOptions.get(WATERMARK_EMIT_STRATEGY), + tableOptions.getOptional(SOURCE_IDLE_TIMEOUT), + sourceWatermarkConfiguration(tableOptions)); } @Override diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java index a4c01005..4ab2f0f7 100644 --- a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java +++ b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java @@ -17,12 +17,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkGenerator; 
import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.watermark.WatermarkEmitStrategy; import org.junit.jupiter.api.Test; /** Tests for {@link KafkaRecordTimestampWatermarkStrategy}. */ @@ -30,7 +32,7 @@ class KafkaRecordTimestampWatermarkStrategyTest { @Test void testDoesNotEmitBeforeWarmup() { - final WatermarkGenerator generator = createGenerator(); + final WatermarkGenerator generator = createOnPeriodicGenerator(); final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS - 1; i++) { @@ -43,7 +45,7 @@ void testDoesNotEmitBeforeWarmup() { @Test void testEmitsWatermarkFromKafkaRecordTimestampAfterWarmup() { - final WatermarkGenerator generator = createGenerator(); + final WatermarkGenerator generator = createOnPeriodicGenerator(); final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { @@ -58,9 +60,39 @@ void testEmitsWatermarkFromKafkaRecordTimestampAfterWarmup() { - 1); } + @Test + void testOnEventEmitStrategyEmitsWhenWarmupCompletes() { + final WatermarkGenerator generator = createOnEventGenerator(); + final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); + + for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { + generator.onEvent(null, i * 100L, output); + } + + assertThat(output.watermarks) + .containsExactly( + (KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS - 1) * 100L + - KafkaRecordTimestampWatermarkStrategy.MIN_OUT_OF_ORDERNESS_MILLIS + - 1); + } + + @Test + void testUsesConfiguredWarmupAndMinimumOutOfOrderness() { + final WatermarkGenerator generator = + createOnPeriodicGenerator(new SourceWatermarkConfig(3, 10, 1000, 0.95D)); + final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); + + generator.onEvent(null, 0L, output); + 
generator.onEvent(null, 100L, output); + generator.onEvent(null, 200L, output); + generator.onPeriodicEmit(output); + + assertThat(output.watermarks).containsExactly(189L); + } + @Test void testWatermarksAreMonotonic() { - final WatermarkGenerator generator = createGenerator(); + final WatermarkGenerator generator = createOnPeriodicGenerator(); final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { @@ -76,8 +108,20 @@ void testWatermarksAreMonotonic() { assertThat(output.watermarks).hasSize(1); } - private static WatermarkGenerator createGenerator() { - return KafkaRecordTimestampWatermarkStrategy.INSTANCE.createWatermarkGenerator(null); + private static WatermarkGenerator createOnPeriodicGenerator() { + return new KafkaRecordTimestampWatermarkStrategy().createWatermarkGenerator(null); + } + + private static WatermarkGenerator createOnPeriodicGenerator( + SourceWatermarkConfig configuration) { + return new KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy.ON_PERIODIC, configuration) + .createWatermarkGenerator(null); + } + + private static WatermarkGenerator createOnEventGenerator() { + return new KafkaRecordTimestampWatermarkStrategy(WatermarkEmitStrategy.ON_EVENT) + .createWatermarkGenerator(null); } private static final class CollectingWatermarkOutput implements WatermarkOutput { diff --git a/flink-sql-runner/src/test/resources/sql/kafka_source_watermark.sql b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark.sql index 0a73b2ec..7105dd2f 100644 --- a/flink-sql-runner/src/test/resources/sql/kafka_source_watermark.sql +++ b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark.sql @@ -9,6 +9,12 @@ CREATE TABLE source_events ( 'properties.bootstrap.servers' = 'redpanda:9092', 'properties.group.id' = 'source-watermark-it', 'scan.startup.mode' = 'earliest-offset', + 'scan.watermark.emit.strategy' = 'on-event', + 
'scan.watermark.idle-timeout' = '1 s', + 'scan.source-watermark.min-records' = '250', + 'scan.source-watermark.min-out-of-orderness' = '50 ms', + 'scan.source-watermark.max-out-of-orderness' = '86400000 ms', + 'scan.source-watermark.out-of-orderness-quantile' = '0.95', 'format' = 'json' ); From 05fc8cbb2d6cb11873e92a7fa05e4a2e5a43658c Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Fri, 5 Jun 2026 16:12:16 +0200 Subject: [PATCH 3/6] add idle source watermark advance logic --- ...KafkaAdminIdleAdvanceReadinessChecker.java | 124 ++++++++++++++++ ...KafkaRecordTimestampWatermarkStrategy.java | 131 +++++++++++----- .../kafka/SourceWatermarkOptions.java | 140 +++++++++++++++--- .../kafka/table/SafeKafkaDynamicSource.java | 11 +- .../table/SafeKafkaDynamicTableFactory.java | 8 + .../SafeUpsertKafkaDynamicTableFactory.java | 8 + ...aAdminIdleAdvanceReadinessCheckerTest.java | 56 +++++++ ...aRecordTimestampWatermarkStrategyTest.java | 93 ++++++++++-- .../flinkrunner/KafkaSourceWatermarkIT.java | 129 ++++++++++++++-- .../kafka_source_watermark_idle_advance.sql | 41 +++++ ...kafka_source_watermark_no_idle_advance.sql | 38 +++++ 11 files changed, 697 insertions(+), 82 deletions(-) create mode 100644 connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java create mode 100644 connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessCheckerTest.java create mode 100644 flink-sql-runner/src/test/resources/sql/kafka_source_watermark_idle_advance.sql create mode 100644 flink-sql-runner/src/test/resources/sql/kafka_source_watermark_no_idle_advance.sql diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java new file mode 100644 index 
00000000..a839fea1 --- /dev/null +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java @@ -0,0 +1,124 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner.connector.kafka; + +import static org.apache.flink.util.Preconditions.checkArgument; + +import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; +import java.io.Serial; +import java.io.Serializable; +import java.lang.ref.Cleaner; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.TimestampType; + +/** Readiness checker for idle source-watermark advancement backed by Kafka AdminClient metadata. 
*/ +@Slf4j +class KafkaAdminIdleAdvanceReadinessChecker implements IdleAdvanceReadinessChecker { + + @Serial private static final long serialVersionUID = 1L; + + private static final Cleaner ADMIN_CLIENT_CLEANER = Cleaner.create(); + + private final Properties kafkaProperties; + private final List topics; + private final long brokerCheckTimeoutMillis; + private final long brokerCheckTtlMillis; + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + private transient Cleaner.Cleanable adminClientCleanable; + + private transient AdminClient adminClient; + private transient long lastCheckMillis = Long.MIN_VALUE; + private transient boolean lastCheckReady; + + KafkaAdminIdleAdvanceReadinessChecker( + Properties kafkaProperties, List topics, SourceWatermarkConfig config) { + this.kafkaProperties = kafkaProperties; + checkArgument(topics != null && !topics.isEmpty(), "topics must not be empty"); + this.topics = List.copyOf(topics); + this.brokerCheckTimeoutMillis = config.idleAdvanceBrokerCheckTimeoutMillis(); + this.brokerCheckTtlMillis = config.idleAdvanceBrokerCheckTtlMillis(); + } + + /** + * Returns whether it is safe to emit wall-clock-derived idle source watermarks. + * + *
<p>
Idle watermark advancement uses wall-clock time to move event time forward while no records + * are arriving. That is only safe when the broker is reachable and the source topics use Kafka + * {@code LogAppendTime}. + */ + @Override + public boolean isReady(long currentTimeMillis) { + if (lastCheckMillis != Long.MIN_VALUE + && currentTimeMillis - lastCheckMillis < brokerCheckTtlMillis) { + return lastCheckReady; + } + + lastCheckMillis = currentTimeMillis; + lastCheckReady = checkBrokerAndTopicTimestampType(); + + return lastCheckReady; + } + + boolean checkBrokerAndTopicTimestampType() { + try { + var configs = + adminClient() + .describeConfigs(mapTopicsToConfigResources()) + .all() + .get(brokerCheckTimeoutMillis, TimeUnit.MILLISECONDS); + + return configs.values().stream() + .allMatch( + config -> { + var timestampType = config.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG); + return TimestampType.LOG_APPEND_TIME.toString().equals(timestampType.value()); + }); + + } catch (Exception e) { + log.debug("Failed to check broker and topic timestamp type", e); + return false; + } + } + + private AdminClient adminClient() { + if (adminClient == null) { + adminClient = AdminClient.create(kafkaProperties); + adminClientCleanable = ADMIN_CLIENT_CLEANER.register(this, adminClient::close); + } + + return adminClient; + } + + private List mapTopicsToConfigResources() { + return topics.stream() + .map(topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic)) + .toList(); + } +} + +/** Determines whether it is currently safe to emit wall-clock-derived idle source watermarks. 
*/ +@FunctionalInterface +interface IdleAdvanceReadinessChecker extends Serializable { + + boolean isReady(long currentTimeMillis); +} diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java index 164748b2..e61a9b1b 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java @@ -17,9 +17,12 @@ import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import java.io.Serial; -import java.time.Duration; +import java.io.Serializable; import java.util.Arrays; +import java.util.List; +import java.util.Properties; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; @@ -34,41 +37,50 @@ public final class KafkaRecordTimestampWatermarkStrategy implements WatermarkStr @Serial private static final long serialVersionUID = 1L; - static final int MIN_RECORDS = 250; - static final int SAMPLE_SIZE = 4096; - static final long MIN_OUT_OF_ORDERNESS_MILLIS = 50L; - static final long MAX_OUT_OF_ORDERNESS_MILLIS = Duration.ofDays(1).toMillis(); - - static final double OUT_OF_ORDERNESS_QUANTILE = 0.95D; + private static final int SAMPLE_SIZE = 4096; private final WatermarkEmitStrategy emitStrategy; - private final SourceWatermarkConfig configuration; + private final SourceWatermarkConfig sourceWatermarkConfig; + private final MillisClock clock; + private final 
IdleAdvanceReadinessChecker idleAdvanceReadinessChecker; - public KafkaRecordTimestampWatermarkStrategy() { - this(WatermarkEmitStrategy.ON_PERIODIC); + public KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy emitStrategy, + SourceWatermarkConfig sourceWatermarkConfig, + Properties kafkaProperties, + List topics) { + this( + emitStrategy, + sourceWatermarkConfig, + System::currentTimeMillis, + new KafkaAdminIdleAdvanceReadinessChecker(kafkaProperties, topics, sourceWatermarkConfig)); } - public KafkaRecordTimestampWatermarkStrategy(WatermarkEmitStrategy emitStrategy) { + @VisibleForTesting + KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy emitStrategy, SourceWatermarkConfig sourceWatermarkConfig) { this( - emitStrategy, - new SourceWatermarkConfig( - MIN_RECORDS, - MIN_OUT_OF_ORDERNESS_MILLIS, - MAX_OUT_OF_ORDERNESS_MILLIS, - OUT_OF_ORDERNESS_QUANTILE)); + emitStrategy, sourceWatermarkConfig, System::currentTimeMillis, currentTimeMillis -> false); } - public KafkaRecordTimestampWatermarkStrategy( - WatermarkEmitStrategy emitStrategy, SourceWatermarkConfig configuration) { + @VisibleForTesting + KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy emitStrategy, + SourceWatermarkConfig sourceWatermarkConfig, + MillisClock clock, + IdleAdvanceReadinessChecker idleAdvanceReadinessChecker) { this.emitStrategy = emitStrategy; - this.configuration = configuration; + this.sourceWatermarkConfig = sourceWatermarkConfig; + this.clock = clock; + this.idleAdvanceReadinessChecker = idleAdvanceReadinessChecker; } @Override public WatermarkGenerator createWatermarkGenerator( WatermarkGeneratorSupplier.Context context) { - return new AdaptiveKafkaRecordTimestampWatermarkGenerator(emitStrategy, configuration); + return new AdaptiveKafkaRecordTimestampWatermarkGenerator( + emitStrategy, sourceWatermarkConfig, clock, idleAdvanceReadinessChecker); } /** @@ -96,11 +108,16 @@ private static final class AdaptiveKafkaRecordTimestampWatermarkGenerator 
private final long[] latenessSamples = new long[SAMPLE_SIZE]; private final WatermarkEmitStrategy emitStrategy; - private final SourceWatermarkConfig configuration; + private final SourceWatermarkConfig config; + private final MillisClock clock; + private final IdleAdvanceReadinessChecker idleAdvanceReadinessChecker; /** Highest Kafka record timestamp observed by this generator. */ private long maxTimestamp = Long.MIN_VALUE; + /** Processing time when the latest record was observed. */ + private long lastRecordWallClockMillis = Long.MIN_VALUE; + /** Last emitted watermark, used to preserve Flink's monotonic watermark contract. */ private long lastEmittedWatermark = Long.MIN_VALUE; @@ -113,9 +130,14 @@ private static final class AdaptiveKafkaRecordTimestampWatermarkGenerator private int nextSampleIndex; private AdaptiveKafkaRecordTimestampWatermarkGenerator( - WatermarkEmitStrategy emitStrategy, SourceWatermarkConfig configuration) { + WatermarkEmitStrategy emitStrategy, + SourceWatermarkConfig config, + MillisClock clock, + IdleAdvanceReadinessChecker idleAdvanceReadinessChecker) { this.emitStrategy = emitStrategy; - this.configuration = configuration; + this.config = config; + this.clock = clock; + this.idleAdvanceReadinessChecker = idleAdvanceReadinessChecker; } @Override @@ -128,6 +150,7 @@ public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) } maxTimestamp = Math.max(maxTimestamp, eventTimestamp); + lastRecordWallClockMillis = clock.currentTimeMillis(); recordCount++; if (emitStrategy.isOnEvent()) { @@ -137,15 +160,15 @@ public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) @Override public void onPeriodicEmit(WatermarkOutput output) { - if (!emitStrategy.isOnPeriodic()) { - return; + if (emitStrategy.isOnPeriodic()) { + emitIfReady(output); } - emitIfReady(output); + emitIdleWatermarkIfReady(output); } private void emitIfReady(WatermarkOutput output) { - if (recordCount < configuration.getMinRecords() || 
maxTimestamp == Long.MIN_VALUE) { + if (recordCount < config.minRecords() || maxTimestamp == Long.MIN_VALUE) { // Avoid emitting during warm-up, so limited early samples won't distort the estimate. return; } @@ -159,10 +182,47 @@ private void emitIfReady(WatermarkOutput output) { } } + private void emitIdleWatermarkIfReady(WatermarkOutput output) { + long idleAdvanceTimeoutMillis = config.idleAdvanceTimeoutMillis(); + + if (idleAdvanceTimeoutMillis <= 0 + || recordCount == 0 + || maxTimestamp == Long.MIN_VALUE + || lastRecordWallClockMillis == Long.MIN_VALUE) { + return; + } + + long currentTimeMillis = clock.currentTimeMillis(); + long idleDurationMillis = currentTimeMillis - lastRecordWallClockMillis; + if (idleDurationMillis < idleAdvanceTimeoutMillis) { + return; + } + + if (!idleAdvanceReadinessChecker.isReady(currentTimeMillis)) { + return; + } + + long outOfOrdernessMillis = calculateOutOfOrdernessMillis(); + + // Move forward from the last Kafka timestamp by wall-clock idle time, then subtract the + // observed lateness estimate and configured safety margin to avoid closing windows too early. + long watermark = + maxTimestamp + + idleDurationMillis + - outOfOrdernessMillis + - config.idleAdvanceSafetyMarginMillis() + - 1; + + if (watermark > lastEmittedWatermark) { + output.emitWatermark(new Watermark(watermark)); + lastEmittedWatermark = watermark; + } + } + private long calculateOutOfOrdernessMillis() { int sampleCount = (int) Math.min(recordCount - 1, SAMPLE_SIZE); if (sampleCount <= 0) { - return configuration.getMinOutOfOrdernessMillis(); + return config.minOutOfOrdernessMillis(); } long[] samples = Arrays.copyOf(latenessSamples, sampleCount); @@ -172,12 +232,17 @@ private long calculateOutOfOrdernessMillis() { // all event-time progress until it rotates out of the sample buffer. 
int quantileIndex = Math.min( - sampleCount - 1, - (int) Math.ceil(sampleCount * configuration.getOutOfOrdernessQuantile()) - 1); + sampleCount - 1, (int) Math.ceil(sampleCount * config.outOfOrdernessQuantile()) - 1); return Math.min( - configuration.getMaxOutOfOrdernessMillis(), - Math.max(configuration.getMinOutOfOrdernessMillis(), samples[quantileIndex])); + config.maxOutOfOrdernessMillis(), + Math.max(config.minOutOfOrdernessMillis(), samples[quantileIndex])); } } + + @FunctionalInterface + interface MillisClock extends Serializable { + + long currentTimeMillis(); + } } diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/SourceWatermarkOptions.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/SourceWatermarkOptions.java index c7278d3c..4d8d63d1 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/SourceWatermarkOptions.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/SourceWatermarkOptions.java @@ -18,7 +18,6 @@ import java.io.Serial; import java.io.Serializable; import java.time.Duration; -import lombok.Value; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; @@ -26,78 +25,171 @@ public class SourceWatermarkOptions { + static final int MIN_RECORDS_DEFAULT = 250; + static final long MIN_OUT_OF_ORDERNESS_MILLIS_DEFAULT = 50L; + static final long MAX_OUT_OF_ORDERNESS_MILLIS_DEFAULT = Duration.ofDays(3).toMillis(); + static final double OUT_OF_ORDERNESS_QUANTILE_DEFAULT = 0.95D; + public static final ConfigOption SCAN_SOURCE_WATERMARK_MIN_RECORDS = ConfigOptions.key("scan.source-watermark.min-records") .intType() - .defaultValue(KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS) + .defaultValue(MIN_RECORDS_DEFAULT) .withDescription( "Minimum number of records to observe 
before emitting source watermarks."); public static final ConfigOption SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS = ConfigOptions.key("scan.source-watermark.min-out-of-orderness") .durationType() - .defaultValue( - Duration.ofMillis(KafkaRecordTimestampWatermarkStrategy.MIN_OUT_OF_ORDERNESS_MILLIS)) + .defaultValue(Duration.ofMillis(MIN_OUT_OF_ORDERNESS_MILLIS_DEFAULT)) .withDescription("Minimum adaptive out-of-orderness delay for source watermarks."); public static final ConfigOption SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS = ConfigOptions.key("scan.source-watermark.max-out-of-orderness") .durationType() - .defaultValue( - Duration.ofMillis(KafkaRecordTimestampWatermarkStrategy.MAX_OUT_OF_ORDERNESS_MILLIS)) + .defaultValue(Duration.ofMillis(MAX_OUT_OF_ORDERNESS_MILLIS_DEFAULT)) .withDescription("Maximum adaptive out-of-orderness delay for source watermarks."); public static final ConfigOption SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE = ConfigOptions.key("scan.source-watermark.out-of-orderness-quantile") .doubleType() - .defaultValue(KafkaRecordTimestampWatermarkStrategy.OUT_OF_ORDERNESS_QUANTILE) + .defaultValue(OUT_OF_ORDERNESS_QUANTILE_DEFAULT) .withDescription( "Quantile of observed lateness samples to use as the source watermark delay."); + public static final ConfigOption SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT = + ConfigOptions.key("scan.source-watermark.idle-advance-timeout") + .durationType() + .defaultValue(Duration.ZERO) + .withDescription( + """ + How long the Kafka source must receive no records before source watermarks may \ + continue advancing based on wall-clock time. This helps event-time windows \ + close after traffic stops. 
A zero or negative duration disables idle watermark \ + advancement."""); + + public static final ConfigOption SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN = + ConfigOptions.key("scan.source-watermark.idle-advance-safety-margin") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + """ + Additional duration subtracted from wall-clock-derived idle source watermarks to \ + keep advancement conservative."""); + + public static final ConfigOption + SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT = + ConfigOptions.key("scan.source-watermark.idle-advance-broker-check-timeout") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription( + """ + Timeout applied to each Kafka AdminClient broker readiness and LogAppendTime \ + metadata check before idle source-watermark advancement."""); + + public static final ConfigOption SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL = + ConfigOptions.key("scan.source-watermark.idle-advance-broker-check-ttl") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + """ + How long to cache Kafka AdminClient broker readiness and LogAppendTime \ + metadata check results for idle source-watermark advancement."""); + public static SourceWatermarkConfig sourceWatermarkConfiguration(ReadableConfig tableOptions) { var minRecords = tableOptions.get(SCAN_SOURCE_WATERMARK_MIN_RECORDS); var minOutOfOrderness = tableOptions.get(SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS); var maxOutOfOrderness = tableOptions.get(SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS); var quantile = tableOptions.get(SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE); + var idleAdvanceTimeout = tableOptions.get(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT); + var idleAdvanceSafetyMargin = + tableOptions.get(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN); + var idleAdvanceBrokerCheckTimeout = + tableOptions.get(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT); + var idleAdvanceBrokerCheckTtl 
= + tableOptions.get(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL); if (minRecords <= 0) { throw new ValidationException( - String.format("'%s' must be greater than 0.", SCAN_SOURCE_WATERMARK_MIN_RECORDS.key())); + "'%s' must be greater than 0.".formatted(SCAN_SOURCE_WATERMARK_MIN_RECORDS.key())); } if (minOutOfOrderness.isNegative()) { throw new ValidationException( - String.format( - "'%s' must not be negative.", SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS.key())); + "'%s' must not be negative.".formatted(SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS.key())); } if (maxOutOfOrderness.compareTo(minOutOfOrderness) < 0) { throw new ValidationException( - String.format( - "'%s' must be greater than or equal to '%s'.", - SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS.key(), - SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS.key())); + "'%s' must be greater than or equal to '%s'." + .formatted( + SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS.key(), + SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS.key())); } if (quantile <= 0D || quantile > 1D) { throw new ValidationException( - String.format( - "'%s' must be greater than 0 and less than or equal to 1.", - SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE.key())); + "'%s' must be greater than 0 and less than or equal to 1." + .formatted(SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE.key())); + } + + if (idleAdvanceTimeout.isNegative()) { + throw new ValidationException( + "'%s' must not be negative.".formatted(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT.key())); + } + + if (idleAdvanceSafetyMargin.isNegative()) { + throw new ValidationException( + "'%s' must not be negative." + .formatted(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN.key())); + } + + if (idleAdvanceBrokerCheckTimeout.isNegative() || idleAdvanceBrokerCheckTimeout.isZero()) { + throw new ValidationException( + "'%s' must be greater than 0." 
+ .formatted(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT.key())); + } + + if (idleAdvanceBrokerCheckTtl.isNegative() || idleAdvanceBrokerCheckTtl.isZero()) { + throw new ValidationException( + "'%s' must be greater than 0." + .formatted(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL.key())); } return new SourceWatermarkConfig( - minRecords, minOutOfOrderness.toMillis(), maxOutOfOrderness.toMillis(), quantile); + minRecords, + minOutOfOrderness.toMillis(), + maxOutOfOrderness.toMillis(), + quantile, + idleAdvanceTimeout.toMillis(), + idleAdvanceSafetyMargin.toMillis(), + idleAdvanceBrokerCheckTimeout.toMillis(), + idleAdvanceBrokerCheckTtl.toMillis()); } - @Value - public static class SourceWatermarkConfig implements Serializable { + public record SourceWatermarkConfig( + int minRecords, + long minOutOfOrdernessMillis, + long maxOutOfOrdernessMillis, + double outOfOrdernessQuantile, + long idleAdvanceTimeoutMillis, + long idleAdvanceSafetyMarginMillis, + long idleAdvanceBrokerCheckTimeoutMillis, + long idleAdvanceBrokerCheckTtlMillis) + implements Serializable { @Serial private static final long serialVersionUID = 1L; - int minRecords; - long minOutOfOrdernessMillis; - long maxOutOfOrdernessMillis; - double outOfOrdernessQuantile; + public SourceWatermarkConfig() { + // Turn off idle advancement by default. 
+ this( + MIN_RECORDS_DEFAULT, + MIN_OUT_OF_ORDERNESS_MILLIS_DEFAULT, + MAX_OUT_OF_ORDERNESS_MILLIS_DEFAULT, + OUT_OF_ORDERNESS_QUANTILE_DEFAULT, + 0L, + 0L, + 0L, + 0L); + } } } diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java index 40d43bb3..3ea6b816 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java @@ -568,8 +568,15 @@ private WatermarkStrategy getWatermarkStrategy() { if (sourceWatermarkEnabled) { WatermarkStrategy sourceWatermarkStrategy = new KafkaRecordTimestampWatermarkStrategy( - sourceWatermarkEmitStrategy, sourceWatermarkConfig); - if (sourceWatermarkIdleTimeout.isPresent()) { + sourceWatermarkEmitStrategy, + sourceWatermarkConfig, + properties, + topics); + + // Skip Flink idleness when idle advancement is enabled, otherwise the source + // may be marked idle before this strategy can emit wall-clock-derived watermarks. 
+ if (sourceWatermarkIdleTimeout.isPresent() + && sourceWatermarkConfig.idleAdvanceTimeoutMillis() <= 0) { sourceWatermarkStrategy = sourceWatermarkStrategy.withIdleness(sourceWatermarkIdleTimeout.get()); } diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java index fe7de0e3..8d3a7e45 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java @@ -70,6 +70,10 @@ import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.validateDeserFailureHandlerOptions; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_RECORDS; import static 
com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE; @@ -175,6 +179,10 @@ public Set> optionalOptions() { options.add(SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS); options.add(SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS); options.add(SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE); + options.add(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT); + options.add(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN); + options.add(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT); + options.add(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL); return options; } diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java index 2f2467f7..69d94f6a 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java @@ -56,6 +56,10 @@ import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.*; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT; import static 
com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_RECORDS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE; @@ -136,6 +140,10 @@ public Set> optionalOptions() { options.add(SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS); options.add(SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS); options.add(SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE); + options.add(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT); + options.add(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN); + options.add(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT); + options.add(SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL); return options; } diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessCheckerTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessCheckerTest.java new file mode 100644 index 00000000..d358b471 --- /dev/null +++ b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessCheckerTest.java @@ -0,0 +1,56 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datasqrl.flinkrunner.connector.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; +import java.util.List; +import java.util.Properties; +import org.junit.jupiter.api.Test; + +class KafkaAdminIdleAdvanceReadinessCheckerTest { + + @Test + void testCachesBrokerCheckUntilConfiguredTtlExpires() { + var checker = + new CountingReadinessChecker( + new SourceWatermarkConfig(250, 50, 1000, 0.95D, 1000, 10_000, 1000, 10_000)); + + assertThat(checker.isReady(1000)).isTrue(); + assertThat(checker.isReady(10_999)).isTrue(); + assertThat(checker.checkCount).isEqualTo(1); + + assertThat(checker.isReady(11_000)).isFalse(); + assertThat(checker.checkCount).isEqualTo(2); + } + + private static final class CountingReadinessChecker + extends KafkaAdminIdleAdvanceReadinessChecker { + + private int checkCount; + + private CountingReadinessChecker(SourceWatermarkConfig config) { + super(new Properties(), List.of("topic"), config); + } + + @Override + boolean checkBrokerAndTopicTimestampType() { + checkCount++; + return checkCount == 1; + } + } +} diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java index 4ab2f0f7..23615edf 100644 --- a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java +++ b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java @@ -15,6 +15,8 @@ */ package com.datasqrl.flinkrunner.connector.kafka; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.MIN_OUT_OF_ORDERNESS_MILLIS_DEFAULT; +import static 
com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.MIN_RECORDS_DEFAULT; import static org.assertj.core.api.Assertions.assertThat; import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; @@ -35,7 +37,7 @@ void testDoesNotEmitBeforeWarmup() { final WatermarkGenerator generator = createOnPeriodicGenerator(); final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); - for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS - 1; i++) { + for (int i = 0; i < MIN_RECORDS_DEFAULT - 1; i++) { generator.onEvent(null, i, output); } generator.onPeriodicEmit(output); @@ -48,16 +50,14 @@ void testEmitsWatermarkFromKafkaRecordTimestampAfterWarmup() { final WatermarkGenerator generator = createOnPeriodicGenerator(); final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); - for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { + for (int i = 0; i < MIN_RECORDS_DEFAULT; i++) { generator.onEvent(null, i * 100L, output); } generator.onPeriodicEmit(output); assertThat(output.watermarks) .containsExactly( - (KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS - 1) * 100L - - KafkaRecordTimestampWatermarkStrategy.MIN_OUT_OF_ORDERNESS_MILLIS - - 1); + (MIN_RECORDS_DEFAULT - 1) * 100L - MIN_OUT_OF_ORDERNESS_MILLIS_DEFAULT - 1); } @Test @@ -65,21 +65,20 @@ void testOnEventEmitStrategyEmitsWhenWarmupCompletes() { final WatermarkGenerator generator = createOnEventGenerator(); final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); - for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { + for (int i = 0; i < MIN_RECORDS_DEFAULT; i++) { generator.onEvent(null, i * 100L, output); } assertThat(output.watermarks) .containsExactly( - (KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS - 1) * 100L - - KafkaRecordTimestampWatermarkStrategy.MIN_OUT_OF_ORDERNESS_MILLIS - - 1); + (MIN_RECORDS_DEFAULT - 1) * 100L - 
MIN_OUT_OF_ORDERNESS_MILLIS_DEFAULT - 1); } @Test void testUsesConfiguredWarmupAndMinimumOutOfOrderness() { final WatermarkGenerator generator = - createOnPeriodicGenerator(new SourceWatermarkConfig(3, 10, 1000, 0.95D)); + createOnPeriodicGenerator( + new SourceWatermarkConfig(3, 10, 1000, 0.95D, 0, 10_000, 1000, 5000)); final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); generator.onEvent(null, 0L, output); @@ -95,12 +94,12 @@ void testWatermarksAreMonotonic() { final WatermarkGenerator generator = createOnPeriodicGenerator(); final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); - for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { + for (int i = 0; i < MIN_RECORDS_DEFAULT; i++) { generator.onEvent(null, i * 100L, output); } generator.onPeriodicEmit(output); - for (int i = 0; i < KafkaRecordTimestampWatermarkStrategy.MIN_RECORDS; i++) { + for (int i = 0; i < MIN_RECORDS_DEFAULT; i++) { generator.onEvent(null, 0L, output); } generator.onPeriodicEmit(output); @@ -108,8 +107,58 @@ void testWatermarksAreMonotonic() { assertThat(output.watermarks).hasSize(1); } + @Test + void testIdleAdvanceEmitsAfterSilenceBeforeWarmupCompletes() { + final MutableMillisClock clock = new MutableMillisClock(); + final WatermarkGenerator generator = + new KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy.ON_PERIODIC, + new SourceWatermarkConfig(250, 50, 1000, 0.95D, 1000, 10_000, 1000, 5000), + clock, + currentTimeMillis -> true) + .createWatermarkGenerator(null); + final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); + + generator.onEvent(null, 10_000L, output); + generator.onPeriodicEmit(output); + + clock.advanceMillis(999); + generator.onPeriodicEmit(output); + + clock.advanceMillis(1); + generator.onPeriodicEmit(output); + + assertThat(output.watermarks).containsExactly(949L); + + clock.advanceMillis(10_000); + generator.onPeriodicEmit(output); + + 
assertThat(output.watermarks).containsExactly(949L, 10_949L); + } + + @Test + void testIdleAdvanceDoesNotEmitWhenReadinessCheckFails() { + final MutableMillisClock clock = new MutableMillisClock(); + final WatermarkGenerator generator = + new KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy.ON_PERIODIC, + new SourceWatermarkConfig(250, 50, 1000, 0.95D, 1000, 10_000, 1000, 5000), + clock, + currentTimeMillis -> false) + .createWatermarkGenerator(null); + final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); + + generator.onEvent(null, 10_000L, output); + clock.advanceMillis(11_000); + generator.onPeriodicEmit(output); + + assertThat(output.watermarks).isEmpty(); + } + private static WatermarkGenerator createOnPeriodicGenerator() { - return new KafkaRecordTimestampWatermarkStrategy().createWatermarkGenerator(null); + return new KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy.ON_PERIODIC, new SourceWatermarkConfig()) + .createWatermarkGenerator(null); } private static WatermarkGenerator createOnPeriodicGenerator( @@ -120,7 +169,8 @@ private static WatermarkGenerator createOnPeriodicGenerator( } private static WatermarkGenerator createOnEventGenerator() { - return new KafkaRecordTimestampWatermarkStrategy(WatermarkEmitStrategy.ON_EVENT) + return new KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy.ON_EVENT, new SourceWatermarkConfig()) .createWatermarkGenerator(null); } @@ -139,4 +189,19 @@ public void markIdle() {} @Override public void markActive() {} } + + private static final class MutableMillisClock + implements KafkaRecordTimestampWatermarkStrategy.MillisClock { + + private long currentTimeMillis; + + @Override + public long currentTimeMillis() { + return currentTimeMillis; + } + + private void advanceMillis(long millis) { + currentTimeMillis += millis; + } + } } diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/KafkaSourceWatermarkIT.java 
b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/KafkaSourceWatermarkIT.java index 61288ecf..c37d3439 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/KafkaSourceWatermarkIT.java +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/KafkaSourceWatermarkIT.java @@ -15,9 +15,15 @@ */ package com.datasqrl.flinkrunner; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import com.nextbreakpoint.flink.client.model.JobStatus; +import com.nextbreakpoint.flink.client.model.TerminationMode; +import java.util.ArrayList; +import java.util.List; import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.sqlobject.SqlObjectPlugin; import org.jdbi.v3.sqlobject.statement.SqlQuery; @@ -28,6 +34,8 @@ class KafkaSourceWatermarkIT extends AbstractITSupport { private static final String TOPIC = "source-watermark-it"; + private static final String IDLE_ADVANCE_TOPIC = "source-watermark-idle-advance-it"; + private static final String NO_IDLE_ADVANCE_TOPIC = "source-watermark-no-idle-advance-it"; @Test void givenKafkaSourceWatermark_whenWindowing_thenWatermarkClosesWindows() throws Exception { @@ -35,10 +43,10 @@ void givenKafkaSourceWatermark_whenWindowing_thenWatermarkClosesWindows() throws resultDao.createTable(); resultDao.truncateTable(); - createTopic(); - produceTimestampedRecords(); + createTopic(TOPIC, false); + produceTimestampedRecords("/it/sqlfile/kafka_source_watermark_produce.sql"); - flinkRun("--sqlfile", "/it/sqlfile/kafka_source_watermark.sql"); + String jobId = flinkRun("--sqlfile", "/it/sqlfile/kafka_source_watermark.sql"); try { untilAssert( @@ -48,6 +56,74 @@ void givenKafkaSourceWatermark_whenWindowing_thenWatermarkClosesWindows() throws }); } catch (Throwable t) { throw new AssertionError(readFlinkLogs(), t); + } finally { + cancelJob(jobId); + } + } + + @Test + void 
givenSparseKafkaSourceWatermark_whenIdleAdvanceEnabled_thenWatermarkClosesWindows() + throws Exception { + var resultDao = connect(); + resultDao.createTable(); + resultDao.truncateTable(); + + createTopic(IDLE_ADVANCE_TOPIC, true); + String jobId = flinkRun("--sqlfile", "/it/sqlfile/kafka_source_watermark_idle_advance.sql"); + + try { + produceJsonRecord(IDLE_ADVANCE_TOPIC, "{\"id\":0,\"payload\":\"idle-record-0\"}"); + + untilAssert( + () -> { + assertThat(resultDao.getRowCount()).isEqualTo(1); + assertThat(resultDao.getTotalCount()).isEqualTo(1); + }); + + produceJsonRecord(IDLE_ADVANCE_TOPIC, "{\"id\":1,\"payload\":\"idle-record-1\"}"); + + untilAssert( + () -> { + assertThat(resultDao.getRowCount()).isEqualTo(2); + assertThat(resultDao.getTotalCount()).isEqualTo(2); + }); + } catch (Throwable t) { + throw new AssertionError(readFlinkLogs(), t); + } finally { + cancelJob(jobId); + } + } + + @Test + void givenSparseKafkaSourceWatermark_whenIdleAdvanceDisabled_thenWatermarkDoesNotCloseWindow() + throws Exception { + var resultDao = connect(); + resultDao.createTable(); + resultDao.truncateTable(); + + createTopic(NO_IDLE_ADVANCE_TOPIC, true); + String jobId = flinkRun("--sqlfile", "/it/sqlfile/kafka_source_watermark_no_idle_advance.sql"); + + try { + untilAssert( + () -> + assertThat(client.getJobStatusInfo(jobId).getStatus()).isEqualTo(JobStatus.RUNNING)); + + produceJsonRecord(NO_IDLE_ADVANCE_TOPIC, "{\"id\":0,\"payload\":\"idle-record-0\"}"); + + await() + .during(5, SECONDS) + .atMost(6, SECONDS) + .pollInterval(500, MILLISECONDS) + .untilAsserted( + () -> { + assertThat(resultDao.getRowCount()).isZero(); + assertThat(resultDao.getTotalCount()).isZero(); + }); + } catch (Throwable t) { + throw new AssertionError(readFlinkLogs(), t); + } finally { + cancelJob(jobId); } } @@ -58,23 +134,58 @@ private String readFlinkLogs() throws Exception { return result.getStdout() + result.getStderr(); } - private void createTopic() throws Exception { - Container.ExecResult 
result = - redpandaContainer.execInContainer( - "rpk", "topic", "create", TOPIC, "--brokers", "redpanda:9092", "--partitions", "1"); + private void createTopic(String topic, boolean logAppendTime) throws Exception { + var command = + new ArrayList<>( + List.of( + "rpk", + "topic", + "create", + topic, + "--brokers", + "redpanda:9092", + "--partitions", + "1")); + if (logAppendTime) { + command.add("--topic-config"); + command.add("message.timestamp.type=LogAppendTime"); + } + + Container.ExecResult result = redpandaContainer.execInContainer(command.toArray(new String[0])); assertThat(result.getExitCode()) .withFailMessage(result.getStdout() + result.getStderr()) .isZero(); } - private void produceTimestampedRecords() throws Exception { - String jobId = flinkRun("--sqlfile", "/it/sqlfile/kafka_source_watermark_produce.sql"); + private void produceTimestampedRecords(String sqlFile) throws Exception { + String jobId = flinkRun("--sqlfile", sqlFile); untilAssert( () -> assertThat(client.getJobStatusInfo(jobId).getStatus()).isEqualTo(JobStatus.FINISHED)); } + private void produceJsonRecord(String topic, String json) throws Exception { + Container.ExecResult result = + redpandaContainer.execInContainer( + "bash", + "-c", + String.format( + "printf '%%s\\n' '%s' | rpk topic produce %s --brokers redpanda:9092", + json, topic)); + + assertThat(result.getExitCode()) + .withFailMessage(result.getStdout() + result.getStderr()) + .isZero(); + } + + private void cancelJob(String jobId) { + try { + client.cancelJob(jobId, TerminationMode.CANCEL); + } catch (Exception ignored) { + } + } + private ResultDao connect() { var mappedPort = postgresContainer.getMappedPort(5432); var jdbi = diff --git a/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_idle_advance.sql b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_idle_advance.sql new file mode 100644 index 00000000..facaf277 --- /dev/null +++ 
b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_idle_advance.sql @@ -0,0 +1,41 @@ +CREATE TABLE idle_advance_events ( + id BIGINT, + payload STRING, + ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', + WATERMARK FOR ts AS SOURCE_WATERMARK() +) WITH ( + 'connector' = 'kafka-safe', + 'topic' = 'source-watermark-idle-advance-it', + 'properties.bootstrap.servers' = 'redpanda:9092', + 'properties.group.id' = 'source-watermark-idle-advance-it', + 'scan.startup.mode' = 'earliest-offset', + 'scan.source-watermark.min-records' = '250', + 'scan.source-watermark.min-out-of-orderness' = '50 ms', + 'scan.source-watermark.idle-advance-timeout' = '1 s', + 'scan.source-watermark.idle-advance-safety-margin' = '1 s', + 'scan.source-watermark.idle-advance-broker-check-timeout' = '1 s', + 'format' = 'json' +); + +CREATE TABLE source_watermark_results ( + window_start TIMESTAMP(3) NOT NULL, + window_end TIMESTAMP(3) NOT NULL, + cnt BIGINT NOT NULL +) WITH ( + 'connector' = 'jdbc', + 'driver' = 'org.postgresql.Driver', + 'url' = '${JDBC_URL}', + 'username' = '${JDBC_USERNAME}', + 'password' = '${JDBC_PASSWORD}', + 'table-name' = 'source_watermark_results' +); + +INSERT INTO source_watermark_results +SELECT + CAST(window_start AS TIMESTAMP(3)), + CAST(window_end AS TIMESTAMP(3)), + COUNT(*) +FROM TABLE( + TUMBLE(TABLE idle_advance_events, DESCRIPTOR(ts), INTERVAL '1' SECOND) +) +GROUP BY window_start, window_end; diff --git a/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_no_idle_advance.sql b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_no_idle_advance.sql new file mode 100644 index 00000000..a61b8ced --- /dev/null +++ b/flink-sql-runner/src/test/resources/sql/kafka_source_watermark_no_idle_advance.sql @@ -0,0 +1,38 @@ +CREATE TABLE no_idle_advance_events ( + id BIGINT, + payload STRING, + ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', + WATERMARK FOR ts AS SOURCE_WATERMARK() +) WITH ( + 'connector' = 'kafka-safe', + 'topic' = 
'source-watermark-no-idle-advance-it', + 'properties.bootstrap.servers' = 'redpanda:9092', + 'properties.group.id' = 'source-watermark-no-idle-advance-it', + 'scan.startup.mode' = 'earliest-offset', + 'scan.source-watermark.min-records' = '250', + 'scan.source-watermark.min-out-of-orderness' = '50 ms', + 'format' = 'json' +); + +CREATE TABLE source_watermark_results ( + window_start TIMESTAMP(3) NOT NULL, + window_end TIMESTAMP(3) NOT NULL, + cnt BIGINT NOT NULL +) WITH ( + 'connector' = 'jdbc', + 'driver' = 'org.postgresql.Driver', + 'url' = '${JDBC_URL}', + 'username' = '${JDBC_USERNAME}', + 'password' = '${JDBC_PASSWORD}', + 'table-name' = 'source_watermark_results' +); + +INSERT INTO source_watermark_results +SELECT + CAST(window_start AS TIMESTAMP(3)), + CAST(window_end AS TIMESTAMP(3)), + COUNT(*) +FROM TABLE( + TUMBLE(TABLE no_idle_advance_events, DESCRIPTOR(ts), INTERVAL '1' SECOND) +) +GROUP BY window_start, window_end; From f5fc775a6402daa81a738c61a4eebfdf0e8ffeb1 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Tue, 9 Jun 2026 14:34:32 +0200 Subject: [PATCH 4/6] address review, fix Flink pkg spotless formatting --- ...KafkaAdminIdleAdvanceReadinessChecker.java | 14 +++++-- ...KafkaRecordTimestampWatermarkStrategy.java | 10 ++--- ...SafeDynamicKafkaDeserializationSchema.java | 1 + .../kafka/table/SafeKafkaDynamicSource.java | 42 ++++++++++++------- .../table/SafeKafkaDynamicTableFactory.java | 10 +++-- .../SafeUpsertKafkaDynamicTableFactory.java | 7 ++-- pom.xml | 4 +- 7 files changed, 52 insertions(+), 36 deletions(-) diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java index a839fea1..84651b3e 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java 
+++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java @@ -28,11 +28,12 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.record.TimestampType; /** Readiness checker for idle source-watermark advancement backed by Kafka AdminClient metadata. */ @Slf4j -class KafkaAdminIdleAdvanceReadinessChecker implements IdleAdvanceReadinessChecker { +public class KafkaAdminIdleAdvanceReadinessChecker implements IdleAdvanceReadinessChecker { @Serial private static final long serialVersionUID = 1L; @@ -50,10 +51,12 @@ class KafkaAdminIdleAdvanceReadinessChecker implements IdleAdvanceReadinessCheck private transient long lastCheckMillis = Long.MIN_VALUE; private transient boolean lastCheckReady; - KafkaAdminIdleAdvanceReadinessChecker( + public KafkaAdminIdleAdvanceReadinessChecker( Properties kafkaProperties, List topics, SourceWatermarkConfig config) { + checkArgument( + topics != null && !topics.isEmpty(), + "Watermark idle advance only supports the 'topic' configuration"); this.kafkaProperties = kafkaProperties; - checkArgument(topics != null && !topics.isEmpty(), "topics must not be empty"); this.topics = List.copyOf(topics); this.brokerCheckTimeoutMillis = config.idleAdvanceBrokerCheckTimeoutMillis(); this.brokerCheckTtlMillis = config.idleAdvanceBrokerCheckTtlMillis(); @@ -93,9 +96,12 @@ boolean checkBrokerAndTopicTimestampType() { var timestampType = config.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG); return TimestampType.LOG_APPEND_TIME.toString().equals(timestampType.value()); }); + } catch (AuthorizationException e) { + throw new RuntimeException( + "Cannot check idle watermark advance readiness: insufficient Kafka permissions.", e); } catch (Exception e) { - log.debug("Failed to check 
broker and topic timestamp type", e); + log.debug("Failed to check idle watermark advance readiness", e); return false; } } diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java index e61a9b1b..e7f65893 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java @@ -19,8 +19,6 @@ import java.io.Serial; import java.io.Serializable; import java.util.Arrays; -import java.util.List; -import java.util.Properties; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.eventtime.Watermark; @@ -47,17 +45,15 @@ public final class KafkaRecordTimestampWatermarkStrategy implements WatermarkStr public KafkaRecordTimestampWatermarkStrategy( WatermarkEmitStrategy emitStrategy, SourceWatermarkConfig sourceWatermarkConfig, - Properties kafkaProperties, - List topics) { + KafkaAdminIdleAdvanceReadinessChecker idleAdvanceReadinessChecker) { this( emitStrategy, sourceWatermarkConfig, System::currentTimeMillis, - new KafkaAdminIdleAdvanceReadinessChecker(kafkaProperties, topics, sourceWatermarkConfig)); + idleAdvanceReadinessChecker); } - @VisibleForTesting - KafkaRecordTimestampWatermarkStrategy( + public KafkaRecordTimestampWatermarkStrategy( WatermarkEmitStrategy emitStrategy, SourceWatermarkConfig sourceWatermarkConfig) { this( emitStrategy, sourceWatermarkConfig, System::currentTimeMillis, currentTimeMillis -> false); diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java 
b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java index a62473d2..3f9a8325 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import javax.annotation.Nullable; + import java.io.IOException; /** A specific {@link KafkaRecordDeserializationSchema} for {@link SafeKafkaDynamicSource}. */ diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java index 3ea6b816..3ec44b50 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java @@ -15,8 +15,6 @@ */ package org.apache.flink.streaming.connectors.kafka.table; -import com.datasqrl.flinkrunner.connector.kafka.KafkaRecordTimestampWatermarkStrategy; -import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -57,6 +55,9 @@ import org.apache.flink.util.Preconditions; import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; +import com.datasqrl.flinkrunner.connector.kafka.KafkaAdminIdleAdvanceReadinessChecker; +import 
com.datasqrl.flinkrunner.connector.kafka.KafkaRecordTimestampWatermarkStrategy; +import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -273,8 +274,7 @@ public SafeKafkaDynamicSource( "Source watermark idle timeout must not be null."); this.sourceWatermarkConfig = Preconditions.checkNotNull( - sourceWatermarkConfig, - "Source watermark configuration must not be null."); + sourceWatermarkConfig, "Source watermark configuration must not be null."); } @Override @@ -404,7 +404,7 @@ public DynamicTableSource copy() { deserFailureHandler, sourceWatermarkEmitStrategy, sourceWatermarkIdleTimeout, - sourceWatermarkConfig); + sourceWatermarkConfig); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -479,7 +479,7 @@ public int hashCode() { sourceWatermarkEnabled, sourceWatermarkEmitStrategy, sourceWatermarkIdleTimeout, - sourceWatermarkConfig, + sourceWatermarkConfig, parallelism); } @@ -566,17 +566,27 @@ private WatermarkStrategy getWatermarkStrategy() { } if (sourceWatermarkEnabled) { - WatermarkStrategy sourceWatermarkStrategy = - new KafkaRecordTimestampWatermarkStrategy( - sourceWatermarkEmitStrategy, - sourceWatermarkConfig, - properties, - topics); - - // Skip Flink idleness when idle advancement is enabled, otherwise the source + boolean idleAdvanceEnabled = sourceWatermarkConfig.idleAdvanceTimeoutMillis() > 0; + WatermarkStrategy sourceWatermarkStrategy; + if (idleAdvanceEnabled) { + KafkaAdminIdleAdvanceReadinessChecker idleAdvanceReadinessChecker = + new KafkaAdminIdleAdvanceReadinessChecker( + properties, topics, sourceWatermarkConfig); + + sourceWatermarkStrategy = + new KafkaRecordTimestampWatermarkStrategy( + sourceWatermarkEmitStrategy, + sourceWatermarkConfig, + 
idleAdvanceReadinessChecker); + } else { + sourceWatermarkStrategy = + new KafkaRecordTimestampWatermarkStrategy( + sourceWatermarkEmitStrategy, sourceWatermarkConfig); + } + + // Only apply Flink idleness when idle advancement is disabled, otherwise the source // may be marked idle before this strategy can emit wall-clock-derived watermarks. - if (sourceWatermarkIdleTimeout.isPresent() - && sourceWatermarkConfig.idleAdvanceTimeoutMillis() <= 0) { + if (sourceWatermarkIdleTimeout.isPresent() && !idleAdvanceEnabled) { sourceWatermarkStrategy = sourceWatermarkStrategy.withIdleness(sourceWatermarkIdleTimeout.get()); } diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java index 8d3a7e45..2cf2b714 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java @@ -16,9 +16,6 @@ package org.apache.flink.streaming.connectors.kafka.table; -import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; -import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; -import com.google.auto.service.AutoService; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; @@ -50,11 +47,16 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.watermark.WatermarkEmitStrategy; import org.apache.flink.types.RowKind; + +import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; +import 
com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; +import com.google.auto.service.AutoService; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.time.Duration; import java.util.HashSet; import java.util.List; @@ -69,11 +71,11 @@ import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_HANDLER; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.validateDeserFailureHandlerOptions; -import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT; +import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_RECORDS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE; diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java 
b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java index 69d94f6a..7bc7ffd5 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java @@ -15,8 +15,6 @@ */ package org.apache.flink.streaming.connectors.kafka.table; -import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; -import com.google.auto.service.AutoService; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.java.tuple.Tuple2; @@ -44,6 +42,9 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; +import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; +import com.google.auto.service.AutoService; + import java.time.Duration; import java.util.Collections; import java.util.HashSet; @@ -55,11 +56,11 @@ import java.util.stream.Stream; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.*; -import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_TIMEOUT; +import static 
com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MAX_OUT_OF_ORDERNESS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_OUT_OF_ORDERNESS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_MIN_RECORDS; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_OUT_OF_ORDERNESS_QUANTILE; diff --git a/pom.xml b/pom.xml index d077e123..8da351a5 100644 --- a/pom.xml +++ b/pom.xml @@ -445,12 +445,12 @@ validate - + **/org/apache/flink/**/*.java - 1.8 + 1.10.0 From 9d7c6356f1b4d951d6c91962f94287942d90df51 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Tue, 9 Jun 2026 16:32:03 +0200 Subject: [PATCH 5/6] address complete idleness in idle advance logic --- ...KafkaRecordTimestampWatermarkStrategy.java | 39 +++++++++++++++-- ...aRecordTimestampWatermarkStrategyTest.java | 43 ++++++++++++++++++- 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java index e7f65893..8756de3e 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategy.java @@ -107,6 +107,7 @@ private static final class AdaptiveKafkaRecordTimestampWatermarkGenerator private final SourceWatermarkConfig config; private final MillisClock clock; private final IdleAdvanceReadinessChecker idleAdvanceReadinessChecker; + private final long createdWallClockMillis; /** Highest Kafka record timestamp observed by this generator. 
*/ private long maxTimestamp = Long.MIN_VALUE; @@ -117,6 +118,9 @@ private static final class AdaptiveKafkaRecordTimestampWatermarkGenerator /** Last emitted watermark, used to preserve Flink's monotonic watermark contract. */ private long lastEmittedWatermark = Long.MIN_VALUE; + /** Whether this output was marked idle before receiving any records. */ + private boolean idle; + /** * Number of records observed, including the first record that cannot produce a lateness sample. */ @@ -134,10 +138,16 @@ private AdaptiveKafkaRecordTimestampWatermarkGenerator( this.config = config; this.clock = clock; this.idleAdvanceReadinessChecker = idleAdvanceReadinessChecker; + this.createdWallClockMillis = clock.currentTimeMillis(); } @Override public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) { + if (idle) { + output.markActive(); + idle = false; + } + if (maxTimestamp != Long.MIN_VALUE) { // Sample lateness before updating maxTimestamp, so the sample reflects disorder relative to // the already observed stream frontier. 
@@ -181,14 +191,20 @@ private void emitIfReady(WatermarkOutput output) { private void emitIdleWatermarkIfReady(WatermarkOutput output) { long idleAdvanceTimeoutMillis = config.idleAdvanceTimeoutMillis(); - if (idleAdvanceTimeoutMillis <= 0 - || recordCount == 0 - || maxTimestamp == Long.MIN_VALUE - || lastRecordWallClockMillis == Long.MIN_VALUE) { + if (idleAdvanceTimeoutMillis <= 0) { return; } long currentTimeMillis = clock.currentTimeMillis(); + if (recordCount == 0) { + markIdleIfReady(output, currentTimeMillis, currentTimeMillis - createdWallClockMillis); + return; + } + + if (maxTimestamp == Long.MIN_VALUE || lastRecordWallClockMillis == Long.MIN_VALUE) { + return; + } + long idleDurationMillis = currentTimeMillis - lastRecordWallClockMillis; if (idleDurationMillis < idleAdvanceTimeoutMillis) { return; @@ -215,6 +231,21 @@ private void emitIdleWatermarkIfReady(WatermarkOutput output) { } } + private void markIdleIfReady( + WatermarkOutput output, long currentTimeMillis, long idleDurationMillis) { + + if (idle || idleDurationMillis < config.idleAdvanceTimeoutMillis()) { + return; + } + + if (!idleAdvanceReadinessChecker.isReady(currentTimeMillis)) { + return; + } + + output.markIdle(); + idle = true; + } + private long calculateOutOfOrdernessMillis() { int sampleCount = (int) Math.min(recordCount - 1, SAMPLE_SIZE); if (sampleCount <= 0) { diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java index 23615edf..b0ca6f34 100644 --- a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java +++ b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaRecordTimestampWatermarkStrategyTest.java @@ -155,6 +155,39 @@ void 
testIdleAdvanceDoesNotEmitWhenReadinessCheckFails() { assertThat(output.watermarks).isEmpty(); } + @Test + void testIdleAdvanceMarksNeverSeenOutputIdleAndReactivatesOnFirstRecord() { + final MutableMillisClock clock = new MutableMillisClock(); + final WatermarkGenerator generator = + new KafkaRecordTimestampWatermarkStrategy( + WatermarkEmitStrategy.ON_PERIODIC, + new SourceWatermarkConfig(250, 50, 1000, 0.95D, 1000, 10_000, 1000, 5000), + clock, + currentTimeMillis -> true) + .createWatermarkGenerator(null); + final CollectingWatermarkOutput output = new CollectingWatermarkOutput(); + + clock.advanceMillis(999); + generator.onPeriodicEmit(output); + + assertThat(output.idleCount).isZero(); + + clock.advanceMillis(1); + generator.onPeriodicEmit(output); + + assertThat(output.idleCount).isOne(); + assertThat(output.activeCount).isZero(); + assertThat(output.watermarks).isEmpty(); + + generator.onPeriodicEmit(output); + + assertThat(output.idleCount).isOne(); + + generator.onEvent(null, 10_000L, output); + + assertThat(output.activeCount).isOne(); + } + private static WatermarkGenerator createOnPeriodicGenerator() { return new KafkaRecordTimestampWatermarkStrategy( WatermarkEmitStrategy.ON_PERIODIC, new SourceWatermarkConfig()) @@ -177,6 +210,8 @@ WatermarkEmitStrategy.ON_EVENT, new SourceWatermarkConfig()) private static final class CollectingWatermarkOutput implements WatermarkOutput { private final List watermarks = new ArrayList<>(); + private int idleCount; + private int activeCount; @Override public void emitWatermark(Watermark watermark) { @@ -184,10 +219,14 @@ public void emitWatermark(Watermark watermark) { } @Override - public void markIdle() {} + public void markIdle() { + idleCount++; + } @Override - public void markActive() {} + public void markActive() { + activeCount++; + } } private static final class MutableMillisClock From ff78db9dfcd5b5f8b0cb4ecd19aaced0b9651993 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Tue, 9 Jun 2026 18:44:07 +0200 
Subject: [PATCH 6/6] fix catching AuthorizationException --- ...KafkaAdminIdleAdvanceReadinessChecker.java | 9 +-- ...aAdminIdleAdvanceReadinessCheckerTest.java | 61 +++++++++++++++++++ 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java index 84651b3e..5798347e 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessChecker.java @@ -96,11 +96,12 @@ boolean checkBrokerAndTopicTimestampType() { var timestampType = config.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG); return TimestampType.LOG_APPEND_TIME.toString().equals(timestampType.value()); }); - } catch (AuthorizationException e) { - throw new RuntimeException( - "Cannot check idle watermark advance readiness: insufficient Kafka permissions.", e); - } catch (Exception e) { + if (e.getCause() instanceof AuthorizationException ae) { + throw new RuntimeException( + "Cannot check idle watermark advance readiness: insufficient Kafka permissions.", ae); + } + log.debug("Failed to check idle watermark advance readiness", e); return false; } diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessCheckerTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessCheckerTest.java index d358b471..9ef22e7e 100644 --- a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessCheckerTest.java +++ 
b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/KafkaAdminIdleAdvanceReadinessCheckerTest.java @@ -16,10 +16,21 @@ package com.datasqrl.flinkrunner.connector.kafka; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import java.util.List; +import java.util.Map; import java.util.Properties; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.junit.jupiter.api.Test; class KafkaAdminIdleAdvanceReadinessCheckerTest { @@ -38,6 +49,56 @@ void testCachesBrokerCheckUntilConfiguredTtlExpires() { assertThat(checker.checkCount).isEqualTo(2); } + @Test + void testThrowsWhenBrokerCheckFailsWithAuthorizationException() throws Exception { + var checker = newReadinessChecker(); + var authorizationException = new AuthorizationException("describe configs denied"); + setAdminClient(checker, adminClientCompletingExceptionally(authorizationException)); + + assertThatThrownBy(checker::checkBrokerAndTopicTimestampType) + .isInstanceOf(RuntimeException.class) + .hasMessage( + "Cannot check idle watermark advance readiness: insufficient Kafka permissions.") + .hasCause(authorizationException); + } + + @Test + void testReturnsFalseWhenBrokerCheckFailsWithoutAuthorizationException() throws Exception { + var checker = newReadinessChecker(); + setAdminClient( + checker, adminClientCompletingExceptionally(new RuntimeException("intermittent problem"))); + + 
assertThat(checker.checkBrokerAndTopicTimestampType()).isFalse(); + } + + private static KafkaAdminIdleAdvanceReadinessChecker newReadinessChecker() { + return new KafkaAdminIdleAdvanceReadinessChecker( + new Properties(), + List.of("topic"), + new SourceWatermarkConfig(250, 50, 1000, 0.95D, 1000, 10_000, 1000, 10_000)); + } + + private static AdminClient adminClientCompletingExceptionally(Exception exception) { + var adminClient = mock(AdminClient.class); + var describeConfigsResult = mock(DescribeConfigsResult.class); + var future = new KafkaFutureImpl>(); + future.completeExceptionally(exception); + + when(adminClient.describeConfigs(anyCollection())).thenReturn(describeConfigsResult); + when(describeConfigsResult.all()).thenReturn(future); + + return adminClient; + } + + private static void setAdminClient( + KafkaAdminIdleAdvanceReadinessChecker checker, AdminClient adminClient) throws Exception { + + var adminClientField = + KafkaAdminIdleAdvanceReadinessChecker.class.getDeclaredField("adminClient"); + adminClientField.setAccessible(true); + adminClientField.set(checker, adminClient); + } + private static final class CountingReadinessChecker extends KafkaAdminIdleAdvanceReadinessChecker {