[spark] Add numRowsRead custom metric and improve Spark UI operator name for FlussScan #3196
Merged
luoyuxia merged 4 commits into apache:main on Apr 27, 2026
Conversation
Pull request overview
This PR enhances the Fluss Spark connector’s observability and Spark UI readability by adding a custom numRowsRead scan metric and improving scan/table naming shown in Spark’s operator UI.
Changes:
- Add a numRowsRead custom metric for Fluss scans via Spark DataSource V2 custom metrics.
- Improve the Spark UI scan description (FlussScan.description) to include the table path and scan type.
- Refactor partition readers to move scan iteration logic into next0() so the base reader can centrally maintain the row counter and report task metrics.
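For reference, a driver-side custom metric and its executor-side task counterpart in Spark's DataSource V2 API might look roughly like the following. This is a minimal sketch: the class and constant names (`FlussMetricNames`, `NumRowsReadMetric`, `NumRowsReadTaskMetric`) are illustrative, not necessarily what the PR's FlussMetrics.scala defines.

```scala
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}

// Hypothetical names; a shared constant keeps driver and executor sides in sync.
object FlussMetricNames {
  val NumRowsRead = "numRowsRead"
}

// Driver-side metric: Spark calls aggregateTaskMetrics to combine the
// per-task values into the single number shown in the SQL UI.
class NumRowsReadMetric extends CustomMetric {
  override def name(): String = FlussMetricNames.NumRowsRead
  override def description(): String = "number of rows read"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
    taskMetrics.sum.toString
}

// Executor-side value, reported from each partition reader's
// currentMetricsValues(); matched to the driver metric by name.
class NumRowsReadTaskMetric(rows: Long) extends CustomTaskMetric {
  override def name(): String = FlussMetricNames.NumRowsRead
  override def value(): Long = rows
}
```

The name returned by the task metric must match the name of a metric advertised by the scan's supportedCustomMetrics(), otherwise Spark silently drops the reported values.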
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMetrics.scala | Introduces Fluss custom metric/task-metric implementations and a shared metric name constant. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala | Adds centralized row counting + executor-side currentMetricsValues() reporting and next0() hook. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala | Adds scan description improvements and advertises supported custom metrics. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala | Refactors next() implementation to next0() for centralized metric accounting. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala | Refactors next() implementation to next0() for centralized metric accounting. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala | Refactors next() implementation to next0() for centralized metric accounting. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala | Refactors next() implementation to next0() for centralized metric accounting. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala | Makes Spark table name() return the concise table path string for better UI readability. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala | Adds UT coverage validating scan description and numRowsRead metric accumulation for append/log scans. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala | Adds UT coverage validating scan description and numRowsRead metric accumulation for upsert/PK scans. |
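The scan-side wiring described in the table above (a richer description() plus advertised custom metrics) can be sketched as follows. This is a simplified, hypothetical shape, not the PR's actual FlussScan; the metric definition is inlined so the example stands alone.

```scala
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType

// Hypothetical sketch; the real FlussScan has many more members.
class SketchFlussScan(tablePath: String, scanType: String) extends Scan {
  override def readSchema(): StructType = ??? // omitted in this sketch

  // Rendered as the operator detail in the Spark SQL UI, so including
  // the table path and scan type makes plans much easier to read.
  override def description(): String =
    s"FlussScan(table=$tablePath, type=$scanType)"

  // The driver aggregates matching task metrics reported by readers.
  override def supportedCustomMetrics(): Array[CustomMetric] = Array(
    new CustomMetric {
      override def name(): String = "numRowsRead"
      override def description(): String = "number of rows read"
      override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
        taskMetrics.sum.toString
    }
  )
}
```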
@luoyuxia please help take a look, thank you!
Purpose
Linked issue: close #3160
Brief change log
This PR adds custom metrics support to the Spark connector for Fluss table scans, enabling users to monitor row read counts via the Spark UI. It also fixes AbstractSparkTable.name() to return the correct table path.
- numRowsRead custom metric: introduces FlussMetrics with custom metric classes (FlussNumRowsReadMetric, FlussNumRowsReadTaskMetric) for driver-side aggregation and executor-side reporting.
- AbstractSparkTable.name() now returns tableInfo.getTablePath.toString (e.g. db.table) instead of the verbose TableInfo.toString.
- next() → next0(): moves scan logic to next0() in concrete readers so the base class can centrally maintain the row counter.
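The next() → next0() refactor is a template-method pattern: the base reader owns the counter, and concrete readers only implement the scan-advancing step. A simplified, self-contained sketch (names and signatures are illustrative, not the connector's actual API, where the base class also implements Spark's PartitionReader):

```scala
// Hypothetical base reader: counts rows in exactly one place.
abstract class BasePartitionReader {
  private var rowsRead: Long = 0L

  // Concrete readers implement only the logic that advances the scan.
  protected def next0(): Boolean

  // final wrapper: every successful advance bumps the shared counter,
  // regardless of which concrete reader is in use.
  final def next(): Boolean = {
    val hasNext = next0()
    if (hasNext) rowsRead += 1
    hasNext
  }

  // In the real reader this value would be surfaced through
  // currentMetricsValues() as a CustomTaskMetric.
  def numRowsRead: Long = rowsRead
}

// Example concrete reader iterating over an in-memory batch.
class InMemoryReader(rows: Iterator[String]) extends BasePartitionReader {
  override protected def next0(): Boolean =
    rows.hasNext && { rows.next(); true }
}
```

Centralizing the counter this way means none of the four concrete readers (append, upsert, and their lake variants) can forget to update the metric.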
Tests
- SparkLogTableReadTest: "Spark Read: log table scan metrics"
- SparkPrimaryKeyTableReadTest: "Spark Read: primary key table scan metrics"
API and Format
Documentation