feat(tools): support Parquet and Arrow import with schema/auto modes #793

Open
gengziyand wants to merge 5 commits into apache:develop from gengziyand:feat/tools-format-to-tsfile

Conversation

@gengziyand

Summary

  • Add Parquet and Arrow format support to the TsFile import tool (alongside existing CSV)
  • Add auto mode (schema-less) import for all three formats
  • Unify schema naming: id_columns → tag_columns, csv_columns → source_columns (backward compatible)
  • Add parquet2tsfile.sh/.bat and arrow2tsfile.sh/.bat scripts
  • Add CLI options: --format, --table_name, --time_precision, --separator

Changes

  • New classes: ParquetSourceReader, ArrowSourceReader, AutoSchemaInferer, ImportExecutor, ImportSchema, ImportSchemaParser, SourceReader,
    SourceBatch, TabletBuilder, TimeConverter, ValueConverter
  • Modified: TsFileTool.java (unified CLI entry point), CsvSourceReader.java (auto mode support)
  • New dependencies: parquet-hadoop 1.14.4, hadoop-common 3.3.6, arrow-vector 15.0.2
  • 189 automated tests across 11 test classes
  • Updated README.md and README-zh.md

Test plan

  • 189 unit and E2E tests passing (JDK 8)
  • Smoke test on Linux JDK 17 (Docker) - 9 scenarios
  • Smoke test on Windows JDK 8 and JDK 17 - 9 scenarios
  • Data correctness verification - 39 checks (table name, TAG/FIELD, cross-format consistency)
  • Boundary tests - 8 scenarios (tab separator, null values, SKIP/DEFAULT, error handling, fail_dir)

Contributor

@CritasWang left a comment

Need to check SKIP handling here:

private List<String> getSchemaColumnNames() {
    List<String> names = new ArrayList<>();
    for (ImportSchema.SourceColumn col : schema.getSourceColumns()) {
        if (!col.isSkip()) {

Contributor

In schema mode these readers drop skipped source columns from the SourceBatch, but TabletBuilder still indexes values by the original source_columns positions. With a schema like time, unused SKIP, value, getSchemaColumnNames() returns only [time, value], while TabletBuilder maps value to index 2, so batch.getValue(row, 2) throws and the import fails (or later columns shift incorrectly). This affects real Parquet/Arrow schemas using SKIP, and the current Parquet test only checks readBatch(), not the ImportExecutor path. Keep placeholder columns for skipped entries or change the builder mapping to use batch column names rather than schema positions.
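
For illustration, a minimal sketch of that name-based mapping (SourceBatch#getColumnNames() and SourceColumn#getName() are assumed accessors, not necessarily the PR's actual API):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: resolve each non-skipped schema column to its position in the
// batch by name, so dropped SKIP columns cannot shift later indices.
// getColumnNames() and getName() are assumed accessors for illustration.
static Map<String, Integer> resolveBatchIndices(
        List<String> batchColumnNames, List<ImportSchema.SourceColumn> schemaColumns) {
    Map<String, Integer> indexByName = new HashMap<>();
    for (int i = 0; i < batchColumnNames.size(); i++) {
        indexByName.put(batchColumnNames.get(i), i);
    }
    Map<String, Integer> result = new HashMap<>();
    for (ImportSchema.SourceColumn col : schemaColumns) {
        if (col.isSkip()) {
            continue; // SKIP columns were dropped from the batch
        }
        Integer idx = indexByName.get(col.getName());
        if (idx == null) {
            throw new IllegalStateException("Column missing from batch: " + col.getName());
        }
        result.put(col.getName(), idx);
    }
    return result;
}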

Contributor

@JackieTien97 left a comment

Overall

Good refactoring — the new SourceReader / ImportSchema / ImportExecutor abstraction cleanly separates format-specific reading from the TsFile writing pipeline, and the auto-mode schema inference is a nice usability improvement. The test coverage (189 tests across 11 classes) is solid. Below are the issues I found.


Bug: time_precision validation rejects "s" but the rest of the code supports it

ImportSchemaParser.validate() only allows ms/us/ns:

if (!"ms".equals(tp) && !"us".equals(tp) && !"ns".equals(tp)) {
    throw new IllegalArgumentException("time_precision must be ms, us, or ns");
}

But TimeConverter, the CLI --time_precision option, the README, and ArrowSourceReader.detectTimestampPrecision() (which returns "s" for SECOND) all support "s". A schema file with time_precision=s will fail at validation.

Fix: Add && !"s".equals(tp) to the check, or extract a VALID_PRECISIONS set.


Resource leak in ArrowSourceReader.ensureReaderOpen()

arrowReader = new ArrowFileReader(new FileInputStream(sourceFile).getChannel(), allocator);

The FileInputStream is created as an anonymous temporary — only its channel is captured. The stream itself is never closed. Even if ArrowFileReader.close() closes the channel, the FileInputStream may hold OS-level resources.

Fix: Store the FileInputStream as a field and close it in ArrowSourceReader.close().


enforcer.skip=true suppresses Maven enforcer

<enforcer.skip>true</enforcer.skip>

For an Apache project, silently skipping the enforcer hides dependency convergence issues. The heavy Hadoop dependency tree likely triggers this — please either resolve the convergence conflicts explicitly, or add a comment explaining why it's safe to skip.


Dependency versions should use properties

parquet-hadoop 1.14.4, hadoop-common 3.3.6, arrow-vector 15.0.2 are hardcoded in pom.xml. These should be managed via <properties> (ideally in the parent POM) for consistent version management. This also makes upgrades more visible.


ImportExecutor.execute() accepts failDir but never uses it

public boolean execute(SourceReader reader, String outputDir, String sourceBaseName, String failDir) {

The failDir parameter is never read in the method body. Either wire it up (e.g., move failed source files there on error) or remove it to avoid misleading callers.


Documentation mismatch on type promotion chain

The README states:

CSV type inference uses a 100-row sampling window with promotion chain: BOOLEAN → INT64 → DOUBLE → STRING

This implies BOOLEAN promotes to INT64, but the code (AutoSchemaInferer.promote()) sends BOOLEAN mixed with any other type directly to STRING:

if (current == InferredType.BOOLEAN || incoming == InferredType.BOOLEAN) {
    return InferredType.STRING;
}

The actual behavior is: INT64 ↔ DOUBLE promotes to DOUBLE; any other mixed pair promotes to STRING. The docs should reflect this.


Minor

  • TagColumn.existsInSource() returns !hasDefault, which means any column with a DEFAULT value is treated as virtual (not in source). The method name is slightly misleading — consider renaming to isVirtual() with inverted semantics, or adding a clarifying comment.
  • In CsvSourceReader.readBatch(), when the chunk size boundary is hit, the boundary line is both added to rows and triggers a return — but currentSize is not updated to include this final line, which subtly makes chunks slightly larger than the configured size. Not a functional issue, just an off-by-one on the accounting.

@JackieTien97 dismissed their stale review April 29, 2026 08:40

Replacing with inline review comments

Contributor

@JackieTien97 left a comment

Overall the new SourceReader / ImportSchema / ImportExecutor abstraction is a clean design that separates format-specific reading from TsFile writing. Auto-mode schema inference is a nice usability win. Test coverage (189 tests across 11 classes) is solid. See inline comments for specific issues.


private static void validate(ImportSchema schema) {
    String tp = schema.getTimePrecision();
    if (!"ms".equals(tp) && !"us".equals(tp) && !"ns".equals(tp)) {

Contributor

Bug: This validation rejects "s" as a valid time_precision, but TimeConverter, the CLI --time_precision option, the README (which lists ms / us / ns / s), and ArrowSourceReader.detectTimestampPrecision() (returns "s" for SECOND) all support it.

A schema file with time_precision=s will fail here.

Suggested fix:

if (!"ms".equals(tp) && !"us".equals(tp) && !"ns".equals(tp) && !"s".equals(tp)) {
    throw new IllegalArgumentException("time_precision must be ms, us, ns, or s");
}

Or extract a VALID_PRECISIONS set for single-source-of-truth.
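
A sketch of that set-based variant inside ImportSchemaParser (the constant name follows the suggestion above; exact placement is an assumption):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch of the suggested single source of truth; JDK 8 compatible.
private static final Set<String> VALID_PRECISIONS =
        new HashSet<>(Arrays.asList("ms", "us", "ns", "s"));

private static void validate(ImportSchema schema) {
    String tp = schema.getTimePrecision();
    if (!VALID_PRECISIONS.contains(tp)) {
        throw new IllegalArgumentException(
                "time_precision must be one of " + VALID_PRECISIONS + ", got: " + tp);
    }
}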

Author

Fixed. time_precision validation now also accepts "s", and the error message was updated accordingly.

private void ensureReaderOpen() throws IOException {
    if (arrowReader == null) {
        allocator = new RootAllocator();
        arrowReader = new ArrowFileReader(new FileInputStream(sourceFile).getChannel(), allocator);

Contributor

Resource leak: The FileInputStream is created as an anonymous temporary — only its channel is captured. The stream itself is never closed. Even if ArrowFileReader.close() closes the channel, the FileInputStream object may hold OS-level resources.

Suggested fix: store it as a field and close it in close():

private FileInputStream fileInputStream;

private void ensureReaderOpen() throws IOException {
    if (arrowReader == null) {
        allocator = new RootAllocator();
        fileInputStream = new FileInputStream(sourceFile);
        arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator);
        // ...
    }
}

public void close() {
    // ... close arrowReader first ...
    if (fileInputStream != null) {
        try { fileInputStream.close(); } catch (IOException e) { ... }
    }
    // ... close allocator ...
}

Author

Done. I now keep the FileInputStream as a field in ArrowSourceReader and close it explicitly in close() instead of creating it anonymously from getChannel().

Comment thread: java/tools/pom.xml (Outdated)
<artifactId>tools</artifactId>
<name>TsFile: Java: Tools</name>
<properties>
    <enforcer.skip>true</enforcer.skip>

Contributor

Silently skipping the Maven enforcer hides dependency convergence issues. The heavy Hadoop/Arrow dependency tree likely triggers this — please either:

  1. Resolve the convergence conflicts explicitly (preferred for Apache projects), or
  2. Add a comment explaining why it's safe to skip and what specific conflict it works around.

Author

Done. I removed enforcer.skip, reproduced the convergence failures locally, and resolved them by managing the conflicting transitive dependency versions in the parent dependencyManagement.

Comment thread: java/tools/pom.xml (Outdated)
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.14.4</version>

Contributor

Hardcoded dependency versions (parquet-hadoop 1.14.4, hadoop-common 3.3.6, arrow-vector 15.0.2) should be managed via <properties> — ideally in the parent POM — for consistent version management across modules. This also makes upgrades more visible in diffs.

Author

Updated. I moved the tools dependency versions out of java/tools/pom.xml and into the parent java/pom.xml dependencyManagement section for centralized version management.

}

public boolean execute(
        SourceReader reader, String outputDir, String sourceBaseName, String failDir) {

Contributor

failDir parameter is accepted but never used in the method body. This is misleading to callers who pass a fail directory expecting it to be wired up.

Either use it (e.g., move/copy failed source files there on error) or remove the parameter.
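
If it is wired up, a minimal sketch could move the failed file aside like this (the helper name and its call site are assumptions, not the PR's code):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: on import failure, move the offending source file into
// failDir so operators can inspect or retry it later.
static void moveToFailDir(String sourceFile, String failDir) throws IOException {
    Path source = Paths.get(sourceFile);
    Path failDirPath = Paths.get(failDir);
    Files.createDirectories(failDirPath);
    Files.move(source, failDirPath.resolve(source.getFileName()),
            StandardCopyOption.REPLACE_EXISTING);
}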

Author

Done. I removed the unused failDir parameter by collapsing the 4-arg execute(...) into the real 3-arg method and updated the call sites in TsFileTool accordingly.

Comment thread: java/tools/README.md (Outdated)
**Auto mode rules:**
- Time column: must be named exactly `time` or `TIME` (case-sensitive, strict match)
- All other columns become FIELD (no tag inference)
- CSV type inference uses a 100-row sampling window with promotion chain: `BOOLEAN → INT64 → DOUBLE → STRING`

Contributor

Doc mismatch with code: This implies BOOLEAN promotes to INT64, but AutoSchemaInferer.promote() sends BOOLEAN mixed with any other type directly to STRING:

if (current == InferredType.BOOLEAN || incoming == InferredType.BOOLEAN) {
    return InferredType.STRING;
}

The actual behavior: INT64 ↔ DOUBLE promotes to DOUBLE; any other mixed pair (including BOOLEAN + numeric) promotes to STRING. The promotion chain description should reflect this.
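
Restated as a sketch consistent with the behavior just described (the promote() signature is assumed; the enum constants are the ones discussed above):

// Restatement of the described promotion rules; not the PR's actual code.
static InferredType promote(InferredType current, InferredType incoming) {
    if (current == incoming) {
        return current; // same type: nothing to promote
    }
    if (current == InferredType.BOOLEAN || incoming == InferredType.BOOLEAN) {
        return InferredType.STRING; // BOOLEAN mixed with anything -> STRING
    }
    if ((current == InferredType.INT64 && incoming == InferredType.DOUBLE)
            || (current == InferredType.DOUBLE && incoming == InferredType.INT64)) {
        return InferredType.DOUBLE; // the only numeric widening
    }
    return InferredType.STRING; // every other mixed pair -> STRING
}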

Author

Fixed. I updated both README.md and README-zh.md so the documented CSV type-promotion behavior now matches the actual implementation.

    return defaultValue;
}

public boolean existsInSource() {

Contributor

Nit: the name existsInSource() is a bit misleading — it actually returns !hasDefault, meaning "is not a virtual column." Consider renaming to isVirtual() (with inverted return) or at minimum adding a one-liner clarifying the semantics.
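
The rename would be roughly (a sketch; the Javadoc wording is an assumption):

/** True if this tag column has no source column and is filled from its DEFAULT value. */
public boolean isVirtual() {
    return hasDefault; // inverted semantics relative to existsInSource()
}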

Author

Updated as suggested. I renamed existsInSource() to isVirtual(), inverted the return semantics, and adjusted all call sites/tests to match the new meaning.

byte[] lineBytes = line.getBytes(StandardCharsets.UTF_8);
long lineSize = lineBytes.length;

if (currentSize > 0 && currentSize + lineSize > chunkSizeBytes) {

Contributor

Minor: when the chunk boundary is hit, the boundary line is added to rows (line 184) and triggers a return — but currentSize is never updated to include this final line. This means chunks can be slightly larger than chunkSizeBytes. Not a functional issue (just an off-by-one on the accounting), but worth noting.
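
A sketch of the corrected accounting (rows, currentSize, and chunkSizeBytes come from the snippet above; parseLine and buildBatch are hypothetical placeholders):

byte[] lineBytes = line.getBytes(StandardCharsets.UTF_8);
long lineSize = lineBytes.length;

if (currentSize > 0 && currentSize + lineSize > chunkSizeBytes) {
    rows.add(parseLine(line)); // boundary line still belongs to this chunk
    currentSize += lineSize;   // the missing update: count the boundary line too
    return buildBatch(rows);   // chunk may slightly exceed chunkSizeBytes
}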

Author

Fixed. I added the missing currentSize update before returning the batch at the chunk boundary.
