From cb3c041451256f6b5363c99eafc3782018eb3b36 Mon Sep 17 00:00:00 2001 From: yongqian Date: Wed, 15 Apr 2026 21:27:41 +0800 Subject: [PATCH 1/6] Supports merging multiple ORC files with the same schema into multiple ORC files. --- .../src/java/org/apache/orc/tools/Driver.java | 2 +- .../java/org/apache/orc/tools/MergeFiles.java | 123 ++++++++++++++++-- .../org/apache/orc/tools/TestMergeFiles.java | 79 +++++++++++ site/_docs/java-tools.md | 30 ++++- 4 files changed, 223 insertions(+), 11 deletions(-) diff --git a/java/tools/src/java/org/apache/orc/tools/Driver.java b/java/tools/src/java/org/apache/orc/tools/Driver.java index b134a1bb80..48acf340ff 100644 --- a/java/tools/src/java/org/apache/orc/tools/Driver.java +++ b/java/tools/src/java/org/apache/orc/tools/Driver.java @@ -93,7 +93,7 @@ public static void main(String[] args) throws Exception { System.err.println(" data - print the data from the ORC file"); System.err.println(" json-schema - scan JSON files to determine their schema"); System.err.println(" key - print information about the keys"); - System.err.println(" merge - merge multiple ORC files into a single ORC file"); + System.err.println(" merge - merge multiple ORC files into one or more ORC files"); System.err.println(" meta - print the metadata about the ORC file"); System.err.println(" scan - scan the ORC file"); System.err.println(" sizes - list size on disk of each column"); diff --git a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java index 39130f82d5..10bb53d694 100644 --- a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java +++ b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java @@ -36,10 +36,14 @@ import java.util.Set; /** - * Merge multiple ORC files that all have the same schema into a single ORC file. + * Merge multiple ORC files that all have the same schema into one or more ORC files. 
+ * When {@code --maxSize} is specified, the tool splits output into multiple part files + * under the given output directory, each not exceeding the specified size threshold. */ public class MergeFiles { + static final String PART_FILE_FORMAT = "part-%05d.orc"; + public static void main(Configuration conf, String[] args) throws Exception { Options opts = createOptions(); CommandLine cli = new DefaultParser().parse(opts, args); @@ -56,9 +60,16 @@ public static void main(Configuration conf, String[] args) throws Exception { } boolean ignoreExtension = cli.hasOption("ignoreExtension"); - List inputFiles = new ArrayList<>(); - OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); + long maxSizeBytes = 0; + if (cli.hasOption("maxSize")) { + maxSizeBytes = Long.parseLong(cli.getOptionValue("maxSize")); + if (maxSizeBytes <= 0) { + System.err.println("--maxSize must be a positive number of bytes."); + System.exit(1); + } + } + List inputStatuses = new ArrayList<>(); String[] files = cli.getArgs(); for (String root : files) { Path rootPath = new Path(root); @@ -66,17 +77,38 @@ public static void main(Configuration conf, String[] args) throws Exception { for (RemoteIterator itr = fs.listFiles(rootPath, true); itr.hasNext(); ) { LocatedFileStatus status = itr.next(); if (status.isFile() && (ignoreExtension || status.getPath().getName().endsWith(".orc"))) { - inputFiles.add(status.getPath()); + inputStatuses.add(status); } } } - if (inputFiles.isEmpty()) { + if (inputStatuses.isEmpty()) { System.err.println("No files found."); System.exit(1); } - List mergedFiles = OrcFile.mergeFiles( - new Path(outputFilename), writerOptions, inputFiles); + List inputFiles = new ArrayList<>(inputStatuses.size()); + for (LocatedFileStatus s : inputStatuses) { + inputFiles.add(s.getPath()); + } + + OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); + + if (maxSizeBytes > 0) { + mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, inputFiles, + new 
Path(outputFilename), maxSizeBytes); + } else { + mergeIntoSingleFile(writerOptions, inputFiles, new Path(outputFilename), outputFilename); + } + } + + /** + * Original single-output behavior (no --maxSize). + */ + private static void mergeIntoSingleFile(OrcFile.WriterOptions writerOptions, + List inputFiles, + Path outputPath, + String outputFilename) throws Exception { + List mergedFiles = OrcFile.mergeFiles(outputPath, writerOptions, inputFiles); List unSuccessMergedFiles = new ArrayList<>(); if (mergedFiles.size() != inputFiles.size()) { @@ -100,11 +132,77 @@ public static void main(Configuration conf, String[] args) throws Exception { } } + /** + * Multi-output behavior when --maxSize is set. + * Input files are grouped by cumulative raw file size; each group is merged into + * a separate part file (part-00000.orc, part-00001.orc, ...) under outputDir. + * A single file whose size already exceeds maxSizeBytes is placed in its own part. + */ + private static void mergeIntoMultipleFiles(Configuration conf, + OrcFile.WriterOptions writerOptions, + List inputStatuses, + List inputFiles, + Path outputDir, + long maxSizeBytes) throws Exception { + FileSystem outFs = outputDir.getFileSystem(conf); + outFs.mkdirs(outputDir); + + // Group input files into batches where each batch's total size <= maxSizeBytes. 
+ List> batches = new ArrayList<>(); + List currentBatch = new ArrayList<>(); + long currentBatchSize = 0; + + for (LocatedFileStatus status : inputStatuses) { + long fileSize = status.getLen(); + if (!currentBatch.isEmpty() && currentBatchSize + fileSize > maxSizeBytes) { + batches.add(currentBatch); + currentBatch = new ArrayList<>(); + currentBatchSize = 0; + } + currentBatch.add(status.getPath()); + currentBatchSize += fileSize; + } + if (!currentBatch.isEmpty()) { + batches.add(currentBatch); + } + + int totalMerged = 0; + List allUnmerged = new ArrayList<>(); + + for (int i = 0; i < batches.size(); i++) { + List batch = batches.get(i); + Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT, i)); + List merged = OrcFile.mergeFiles(partOutput, OrcFile.writerOptions(conf), batch); + totalMerged += merged.size(); + + if (merged.size() != batch.size()) { + Set mergedSet = new HashSet<>(merged); + for (Path p : batch) { + if (!mergedSet.contains(p)) { + allUnmerged.add(p); + } + } + } + } + + if (!allUnmerged.isEmpty()) { + System.err.println("List of files that could not be merged:"); + allUnmerged.forEach(path -> System.err.println(path.toString())); + } + + System.out.printf( + "Output path: %s, Input files size: %d, Merge files size: %d, Output files: %d%n", + outputDir, inputFiles.size(), totalMerged, batches.size()); + if (!allUnmerged.isEmpty()) { + System.exit(1); + } + } + private static Options createOptions() { Options result = new Options(); result.addOption(Option.builder("o") .longOpt("output") - .desc("Output filename") + .desc("Output filename (single-file mode) or output directory (multi-file mode)") .hasArg() .build()); @@ -113,6 +211,15 @@ private static Options createOptions() { .desc("Ignore ORC file extension") .build()); + result.addOption(Option.builder("m") + .longOpt("maxSize") + .desc("Maximum size in bytes for each output ORC file. 
When set, --output is treated as " + + "an output directory and merged files are written as part-00000.orc, " + + "part-00001.orc, etc. Files are grouped at file boundaries so an individual " + + "file larger than this threshold will still be placed in its own part.") + .hasArg() + .build()); + result.addOption(Option.builder("h") .longOpt("help") .desc("Print help message") diff --git a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java index 3fdfeba0c4..2901a337ce 100644 --- a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java +++ b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java @@ -53,6 +53,7 @@ public class TestMergeFiles implements TestConf { @BeforeEach public void openFileSystem() throws Exception { fs = FileSystem.getLocal(conf); + fs.delete(workDir, true); fs.mkdirs(workDir); fs.deleteOnExit(workDir); testFilePath = new Path(workDir + File.separator + "TestMergeFiles.testMerge.orc"); @@ -107,4 +108,82 @@ public void testMerge() throws Exception { assertEquals(10000 + 20000, reader.getNumberOfRows()); } } + + /** + * Verifies that --maxSize splits input files into multiple part files under the output + * directory. Three source files are created; a tight size threshold forces them to be + * written into at least two part files. + */ + @Test + public void testMergeWithMaxSize() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + // Create 3 source ORC files with different row counts. 
+ String[] sourceNames = { + workDir + File.separator + "ms-1.orc", + workDir + File.separator + "ms-2.orc", + workDir + File.separator + "ms-3.orc" + }; + int[] rowCounts = {5000, 5000, 5000}; + for (int f = 0; f < sourceNames.length; f++) { + Writer writer = OrcFile.createWriter(new Path(sourceNames[f]), + OrcFile.writerOptions(conf).setSchema(schema)); + VectorizedRowBatch batch = schema.createRowBatch(); + LongColumnVector x = (LongColumnVector) batch.cols[0]; + BytesColumnVector y = (BytesColumnVector) batch.cols[1]; + for (int r = 0; r < rowCounts[f]; ++r) { + int row = batch.size++; + x.vector[row] = r; + byte[] buffer = ("val-" + r).getBytes(); + y.setRef(row, buffer, 0, buffer.length); + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + } + if (batch.size != 0) { + writer.addRowBatch(batch); + } + writer.close(); + } + + // Measure the size of the first source file to compute a threshold that forces a split. + long singleFileSize = fs.getFileStatus(new Path(sourceNames[0])).getLen(); + // Threshold: slightly larger than one file so at most one file fits per part. 
+ long maxSize = singleFileSize + 1; + + Path outputDir = new Path(workDir + File.separator + "merge-multi-out"); + fs.delete(outputDir, true); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + MergeFiles.main(conf, new String[]{workDir.toString(), + "--output", outputDir.toString(), + "--maxSize", String.valueOf(maxSize)}); + System.out.flush(); + System.setOut(origOut); + String output = myOut.toString(StandardCharsets.UTF_8); + System.out.println(output); + + assertTrue(output.contains("Input files size: 3"), "Should report 3 input files"); + assertTrue(output.contains("Merge files size: 3"), "All 3 files should be merged"); + assertTrue(fs.isDirectory(outputDir), "Output directory should be created"); + + // Verify that multiple part files were created and total row count is correct. + long totalRows = 0; + int partCount = 0; + for (int i = 0; ; i++) { + Path part = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, i)); + if (!fs.exists(part)) { + break; + } + partCount++; + try (Reader reader = OrcFile.createReader(part, OrcFile.readerOptions(conf))) { + totalRows += reader.getNumberOfRows(); + } + } + assertTrue(partCount > 1, "Expected more than one output part file, got: " + partCount); + assertEquals(5000 + 5000 + 5000, totalRows, "Total row count across all parts should match"); + } } diff --git a/site/_docs/java-tools.md b/site/_docs/java-tools.md index a3d546e007..21a1fec1d1 100644 --- a/site/_docs/java-tools.md +++ b/site/_docs/java-tools.md @@ -17,7 +17,7 @@ The subcommands for the tools are: * data - print the data of an ORC file * json-schema (since ORC 1.4) - determine the schema of JSON documents * key (since ORC 1.5) - print information about the encryption keys - * merge (since ORC 2.0.1) - merge multiple ORC files into a single ORC file + * merge (since ORC 2.0.1) - merge multiple ORC files into one or more ORC 
files * meta - print the metadata of an ORC file * scan (since ORC 1.3) - scan the data for benchmarking * sizes (since ORC 1.7.2) - list size on disk of each column @@ -356,13 +356,39 @@ ______________________________________________________________________ ## Java Merge -The merge command can merge multiple ORC files that all have the same schema into a single ORC file. +The merge command can merge multiple ORC files that all have the same schema. By default +it writes a single output file. If `--maxSize` is set, `--output` is treated as a directory +and the tool writes multiple part files (`part-00000.orc`, `part-00001.orc`, …) under it. +Input files are grouped using their on-disk sizes so that each part’s total input size +does not exceed the given threshold (a single input file larger than the threshold is still +merged into its own part). + +`-h,--help` + : Print help + +`-i,--ignoreExtension` + : Include files that do not end in `.orc` + +`-m,--maxSize ` + : Maximum size in bytes for each output part; enables multi-file output under `--output` + +`-o,--output ` + : Output ORC filename (single-file mode) or output directory (when `--maxSize` is set) + +Merge into one ORC file: ~~~ shell % java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/merged.orc /path/to/input_orc/ ______________________________________________________________________ ~~~ +Merge into multiple ORC files under a directory (each part bounded by size): + +~~~ shell +% java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/out_dir/ --maxSize 1073741824 /path/to/input_orc/ +______________________________________________________________________ +~~~ + ## Java Version The version command prints the version of this ORC tool. 
From 5af486009a78909680add1ecf9bd360c4ee1c624 Mon Sep 17 00:00:00 2001 From: yongqian Date: Mon, 20 Apr 2026 11:28:20 +0800 Subject: [PATCH 2/6] Enhance maxSize option validation and output directory checks in MergeFiles --- .../java/org/apache/orc/tools/MergeFiles.java | 31 ++++++++++++++----- .../org/apache/orc/tools/TestMergeFiles.java | 15 +++++---- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java index 10bb53d694..642b554f0b 100644 --- a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java +++ b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java @@ -62,7 +62,12 @@ public static void main(Configuration conf, String[] args) throws Exception { long maxSizeBytes = 0; if (cli.hasOption("maxSize")) { - maxSizeBytes = Long.parseLong(cli.getOptionValue("maxSize")); + try { + maxSizeBytes = Long.parseLong(cli.getOptionValue("maxSize")); + } catch (NumberFormatException e) { + System.err.println("--maxSize requires a numeric value in bytes."); + System.exit(1); + } if (maxSizeBytes <= 0) { System.err.println("--maxSize must be a positive number of bytes."); System.exit(1); @@ -145,7 +150,18 @@ private static void mergeIntoMultipleFiles(Configuration conf, Path outputDir, long maxSizeBytes) throws Exception { FileSystem outFs = outputDir.getFileSystem(conf); - outFs.mkdirs(outputDir); + if (outFs.exists(outputDir)) { + if (!outFs.getFileStatus(outputDir).isDirectory()) { + throw new IllegalArgumentException( + "Output path already exists and is not a directory: " + outputDir); + } + if (outFs.listStatus(outputDir).length > 0) { + throw new IllegalArgumentException( + "Output directory must be empty for multi-file merge: " + outputDir); + } + } else if (!outFs.mkdirs(outputDir)) { + throw new IllegalStateException("Failed to create output directory: " + outputDir); + } // Group input files into batches where each batch's total size 
<= maxSizeBytes. List> batches = new ArrayList<>(); @@ -172,7 +188,7 @@ private static void mergeIntoMultipleFiles(Configuration conf, for (int i = 0; i < batches.size(); i++) { List batch = batches.get(i); Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT, i)); - List merged = OrcFile.mergeFiles(partOutput, OrcFile.writerOptions(conf), batch); + List merged = OrcFile.mergeFiles(partOutput, writerOptions.clone(), batch); totalMerged += merged.size(); if (merged.size() != batch.size()) { @@ -213,10 +229,11 @@ private static Options createOptions() { result.addOption(Option.builder("m") .longOpt("maxSize") - .desc("Maximum size in bytes for each output ORC file. When set, --output is treated as " - + "an output directory and merged files are written as part-00000.orc, " - + "part-00001.orc, etc. Files are grouped at file boundaries so an individual " - + "file larger than this threshold will still be placed in its own part.") + .desc("Maximum cumulative input file size in bytes per output ORC file. When set, " + + "--output is treated as an output directory and merged files are written as " + + "part-00000.orc, part-00001.orc, etc. Input files are grouped at file " + + "boundaries so an individual file larger than this threshold will still be " + + "placed in its own part.") .hasArg() .build()); diff --git a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java index 2901a337ce..48a88954ce 100644 --- a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java +++ b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java @@ -118,7 +118,7 @@ public void testMerge() throws Exception { public void testMergeWithMaxSize() throws Exception { TypeDescription schema = TypeDescription.fromString("struct"); - // Create 3 source ORC files with different row counts. + // Create 3 source ORC files. 
String[] sourceNames = { workDir + File.separator + "ms-1.orc", workDir + File.separator + "ms-2.orc", @@ -158,11 +158,14 @@ public void testMergeWithMaxSize() throws Exception { PrintStream origOut = System.out; ByteArrayOutputStream myOut = new ByteArrayOutputStream(); System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); - MergeFiles.main(conf, new String[]{workDir.toString(), - "--output", outputDir.toString(), - "--maxSize", String.valueOf(maxSize)}); - System.out.flush(); - System.setOut(origOut); + try { + MergeFiles.main(conf, new String[]{workDir.toString(), + "--output", outputDir.toString(), + "--maxSize", String.valueOf(maxSize)}); + System.out.flush(); + } finally { + System.setOut(origOut); + } String output = myOut.toString(StandardCharsets.UTF_8); System.out.println(output); From 2334079dddfd804d415c0fd8b563fb3a95ffd0a4 Mon Sep 17 00:00:00 2001 From: yongqian Date: Wed, 22 Apr 2026 17:19:42 +0800 Subject: [PATCH 3/6] Add --preserveStructure, deterministic input sorting and --overwrite guard to merge tool --- .../java/org/apache/orc/tools/MergeFiles.java | 349 ++++++++++++++++-- .../org/apache/orc/tools/TestMergeFiles.java | 275 +++++++++++++- site/_docs/java-tools.md | 47 ++- 3 files changed, 630 insertions(+), 41 deletions(-) diff --git a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java index 642b554f0b..439130363b 100644 --- a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java +++ b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java @@ -24,13 +24,16 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.orc.OrcFile; +import java.io.IOException; import 
java.util.ArrayList; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -59,6 +62,8 @@ public static void main(Configuration conf, String[] args) throws Exception { return; } boolean ignoreExtension = cli.hasOption("ignoreExtension"); + boolean preserveStructure = cli.hasOption("preserveStructure"); + boolean overwrite = cli.hasOption("overwrite"); long maxSizeBytes = 0; if (cli.hasOption("maxSize")) { @@ -74,8 +79,21 @@ public static void main(Configuration conf, String[] args) throws Exception { } } - List inputStatuses = new ArrayList<>(); String[] files = cli.getArgs(); + OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); + + if (preserveStructure) { + if (files.length != 1) { + System.err.println( + "--preserveStructure requires exactly one input directory."); + System.exit(1); + } + mergePreserveStructure(conf, writerOptions, new Path(files[0]), + new Path(outputFilename), maxSizeBytes, ignoreExtension, overwrite); + return; + } + + List inputStatuses = new ArrayList<>(); for (String root : files) { Path rootPath = new Path(root); FileSystem fs = rootPath.getFileSystem(conf); @@ -91,16 +109,16 @@ public static void main(Configuration conf, String[] args) throws Exception { System.exit(1); } + inputStatuses.sort(Comparator.comparing(FileStatus::getPath)); + List inputFiles = new ArrayList<>(inputStatuses.size()); for (LocatedFileStatus s : inputStatuses) { inputFiles.add(s.getPath()); } - OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); - if (maxSizeBytes > 0) { mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, inputFiles, - new Path(outputFilename), maxSizeBytes); + new Path(outputFilename), maxSizeBytes, overwrite); } else { mergeIntoSingleFile(writerOptions, inputFiles, new Path(outputFilename), outputFilename); } @@ -148,29 +166,133 @@ private static void mergeIntoMultipleFiles(Configuration conf, List inputStatuses, List inputFiles, Path outputDir, - long 
maxSizeBytes) throws Exception { - FileSystem outFs = outputDir.getFileSystem(conf); - if (outFs.exists(outputDir)) { - if (!outFs.getFileStatus(outputDir).isDirectory()) { - throw new IllegalArgumentException( - "Output path already exists and is not a directory: " + outputDir); - } - if (outFs.listStatus(outputDir).length > 0) { - throw new IllegalArgumentException( - "Output directory must be empty for multi-file merge: " + outputDir); + long maxSizeBytes, + boolean overwrite) throws Exception { + DirMergeResult r = mergeBatchedIntoDir(conf, writerOptions, inputStatuses, + outputDir, maxSizeBytes, overwrite); + + if (!r.unmergedFiles.isEmpty()) { + System.err.println("List of files that could not be merged:"); + r.unmergedFiles.forEach(path -> System.err.println(path.toString())); + } + + System.out.printf( + "Output path: %s, Input files size: %d, Merge files size: %d, Output files: %d%n", + outputDir, inputFiles.size(), r.mergedFileCount, r.partFileCount); + if (!r.unmergedFiles.isEmpty()) { + System.exit(1); + } + } + + /** + * Preserve the relative directory structure of {@code inputRoot} under + * {@code outputRoot}: every directory containing ORC files (a "leaf") is merged + * independently, and its relative path is mirrored beneath {@code outputRoot}. + * + *

<p>Leaves are located by a depth-first walk of {@code inputRoot}. A directory + * that contains both ORC files and subdirectories is considered ambiguous and + * raises an error. Hidden entries (names starting with {@code '_'} or + * {@code '.'}) are always skipped, matching Hive/Spark conventions for markers + * like {@code _SUCCESS}, {@code _committed_*} or {@code _temporary}. + * + *

If {@code --maxSize} was not supplied ({@code maxSizeBytes == 0}), each + * leaf is merged into a single {@code part-00000.orc} file; otherwise + * ({@code maxSizeBytes > 0}, already validated in {@link #main}) the files + * under a leaf are split into size-bounded part files exactly as in the flat + * multi-file mode. + */ + private static void mergePreserveStructure(Configuration conf, + OrcFile.WriterOptions writerOptions, + Path inputRoot, + Path outputRoot, + long maxSizeBytes, + boolean ignoreExtension, + boolean overwrite) throws Exception { + FileSystem inFs = inputRoot.getFileSystem(conf); + if (!inFs.exists(inputRoot) || !inFs.getFileStatus(inputRoot).isDirectory()) { + throw new IllegalArgumentException( + "Input path must be an existing directory with --preserveStructure: " + inputRoot); + } + + FileSystem outFs = outputRoot.getFileSystem(conf); + prepareOutputDir(outFs, outputRoot, overwrite, "--preserveStructure"); + + Path qualifiedInputRoot = inFs.makeQualified(inputRoot); + List leaves = new ArrayList<>(); + collectLeafDirs(inFs, qualifiedInputRoot, ignoreExtension, leaves); + + if (leaves.isEmpty()) { + System.err.println("No leaf directories containing ORC files found under: " + inputRoot); + System.exit(1); + } + + long effectiveMax = maxSizeBytes > 0 ? maxSizeBytes : Long.MAX_VALUE; + int totalInputFiles = 0; + int totalMerged = 0; + int totalPartFiles = 0; + List allUnmerged = new ArrayList<>(); + + for (Path leaf : leaves) { + String relative = relativize(qualifiedInputRoot, inFs.makeQualified(leaf)); + Path outputLeaf = relative.isEmpty() ? 
outputRoot : new Path(outputRoot, relative); + + List leafStatuses = + listLeafFiles(inFs, leaf, ignoreExtension); + if (leafStatuses.isEmpty()) { + continue; } - } else if (!outFs.mkdirs(outputDir)) { - throw new IllegalStateException("Failed to create output directory: " + outputDir); + + // Each leaf's output directory lives under a freshly-prepared outputRoot, + // so we don't need (and shouldn't force) overwrite semantics here. + DirMergeResult r = mergeBatchedIntoDir( + conf, writerOptions, leafStatuses, outputLeaf, effectiveMax, false); + + totalInputFiles += leafStatuses.size(); + totalMerged += r.mergedFileCount; + totalPartFiles += r.partFileCount; + allUnmerged.addAll(r.unmergedFiles); + + System.out.printf( + "Leaf: %s -> %s, Input files: %d, Merge files: %d, Output files: %d%n", + leaf, outputLeaf, leafStatuses.size(), r.mergedFileCount, r.partFileCount); + } + + if (!allUnmerged.isEmpty()) { + System.err.println("List of files that could not be merged:"); + allUnmerged.forEach(path -> System.err.println(path.toString())); } - // Group input files into batches where each batch's total size <= maxSizeBytes. + System.out.printf( + "Output root: %s, Leaves: %d, Total input files: %d, " + + "Total merge files: %d, Total output files: %d%n", + outputRoot, leaves.size(), totalInputFiles, totalMerged, totalPartFiles); + if (!allUnmerged.isEmpty()) { + System.exit(1); + } + } + + /** + * Core per-directory merge: groups {@code inputStatuses} into size-bounded batches + * and writes each batch as {@code part-NNNNN.orc} under {@code outputDir}. This + * helper contains no I/O side effects on stdout/stderr or process exit; callers + * aggregate/report results themselves. 
+ */ + private static DirMergeResult mergeBatchedIntoDir(Configuration conf, + OrcFile.WriterOptions writerOptions, + List inputStatuses, + Path outputDir, + long maxSizeBytes, + boolean overwrite) throws Exception { + FileSystem outFs = outputDir.getFileSystem(conf); + prepareOutputDir(outFs, outputDir, overwrite, "multi-file merge"); + List> batches = new ArrayList<>(); List currentBatch = new ArrayList<>(); long currentBatchSize = 0; for (LocatedFileStatus status : inputStatuses) { long fileSize = status.getLen(); - if (!currentBatch.isEmpty() && currentBatchSize + fileSize > maxSizeBytes) { + if (!currentBatch.isEmpty() && currentBatchSize > maxSizeBytes - fileSize) { batches.add(currentBatch); currentBatch = new ArrayList<>(); currentBatchSize = 0; @@ -201,16 +323,157 @@ private static void mergeIntoMultipleFiles(Configuration conf, } } - if (!allUnmerged.isEmpty()) { - System.err.println("List of files that could not be merged:"); - allUnmerged.forEach(path -> System.err.println(path.toString())); + return new DirMergeResult(batches.size(), totalMerged, allUnmerged); + } + + /** + * Ensure {@code outputDir} is an empty directory ready to receive part files. + *

+ * <ul>
+ *   <li>If it does not exist, it is created.</li>
+ *   <li>If it exists and is empty, it is used as-is.</li>
+ *   <li>If it exists, is a directory and non-empty:
+ *     <ul>
+ *       <li>when {@code overwrite} is {@code true}: a warning is printed and the
+ * directory is deleted recursively, then recreated.</li>
+ *       <li>when {@code overwrite} is {@code false}: an
+ * {@link IllegalArgumentException} is thrown so callers do not
+ * accidentally destroy existing data.</li>
+ *     </ul>
+ *   </li>
+ *   <li>If it exists but is not a directory, an exception is thrown.</li>
+ * </ul>
+ */ + private static void prepareOutputDir(FileSystem outFs, + Path outputDir, + boolean overwrite, + String modeLabel) throws IOException { + if (outFs.exists(outputDir)) { + if (!outFs.getFileStatus(outputDir).isDirectory()) { + throw new IllegalArgumentException( + "Output path already exists and is not a directory: " + outputDir); + } + FileStatus[] existing = outFs.listStatus(outputDir); + if (existing.length > 0) { + if (!overwrite) { + throw new IllegalArgumentException( + "Output directory is not empty for " + modeLabel + ": " + outputDir + + " (use --overwrite to delete existing contents)"); + } + System.err.println("Overwriting existing non-empty output directory: " + outputDir + + " (" + existing.length + " entries will be deleted)"); + if (!outFs.delete(outputDir, true)) { + throw new IllegalStateException( + "Failed to clean existing output directory: " + outputDir); + } + if (!outFs.mkdirs(outputDir)) { + throw new IllegalStateException( + "Failed to recreate output directory: " + outputDir); + } + } + } else if (!outFs.mkdirs(outputDir)) { + throw new IllegalStateException("Failed to create output directory: " + outputDir); } + } - System.out.printf( - "Output path: %s, Input files size: %d, Merge files size: %d, Output files: %d%n", - outputDir, inputFiles.size(), totalMerged, batches.size()); - if (!allUnmerged.isEmpty()) { - System.exit(1); + /** + * Recursively walk {@code dir}; append every directory that directly contains at + * least one ORC file to {@code out}. A directory containing both ORC files and + * subdirectories is treated as an error. Hidden entries (names starting with + * {@code '_'} or {@code '.'}) are always skipped to match Hive/Spark conventions + * around {@code _SUCCESS}, {@code _temporary}, etc. 
+ */ + private static void collectLeafDirs(FileSystem fs, + Path dir, + boolean ignoreExtension, + List out) throws IOException { + FileStatus[] children = fs.listStatus(dir); + List orcFiles = new ArrayList<>(); + List subDirs = new ArrayList<>(); + for (FileStatus c : children) { + String name = c.getPath().getName(); + if (isHidden(name)) { + continue; + } + if (c.isDirectory()) { + subDirs.add(c); + } else if (isOrcCandidate(c, ignoreExtension)) { + orcFiles.add(c); + } + } + if (!orcFiles.isEmpty() && !subDirs.isEmpty()) { + throw new IllegalArgumentException( + "Directory contains both ORC files and subdirectories which is ambiguous" + + " under --preserveStructure: " + dir); + } + if (!orcFiles.isEmpty()) { + out.add(dir); + return; + } + subDirs.sort(Comparator.comparing(FileStatus::getPath)); + for (FileStatus sub : subDirs) { + collectLeafDirs(fs, sub.getPath(), ignoreExtension, out); + } + } + + /** + * List direct ORC-file children of a leaf directory (non-recursive), skipping + * hidden entries (names starting with {@code '_'} or {@code '.'}). + */ + private static List listLeafFiles(FileSystem fs, + Path leaf, + boolean ignoreExtension) + throws IOException { + List out = new ArrayList<>(); + for (RemoteIterator it = fs.listLocatedStatus(leaf); it.hasNext(); ) { + LocatedFileStatus s = it.next(); + String name = s.getPath().getName(); + if (isHidden(name)) { + continue; + } + if (isOrcCandidate(s, ignoreExtension)) { + out.add(s); + } + } + out.sort(Comparator.comparing(FileStatus::getPath)); + return out; + } + + private static boolean isHidden(String name) { + return !name.isEmpty() && (name.charAt(0) == '_' || name.charAt(0) == '.'); + } + + private static boolean isOrcCandidate(FileStatus s, boolean ignoreExtension) { + return s.isFile() && (ignoreExtension || s.getPath().getName().endsWith(".orc")); + } + + /** + * Return the portion of {@code child}'s path that lies beneath {@code root}. 
Both + * arguments should already be qualified (same scheme/authority). Returns an empty + * string when {@code child} equals {@code root}. + */ + private static String relativize(Path root, Path child) { + String rootStr = Path.getPathWithoutSchemeAndAuthority(root).toString(); + String childStr = Path.getPathWithoutSchemeAndAuthority(child).toString(); + if (childStr.equals(rootStr)) { + return ""; + } + String prefix = rootStr.endsWith("/") ? rootStr : rootStr + "/"; + if (!childStr.startsWith(prefix)) { + throw new IllegalStateException( + "Child path is not under root: child=" + child + ", root=" + root); + } + return childStr.substring(prefix.length()); + } + + private static final class DirMergeResult { + final int partFileCount; + final int mergedFileCount; + final List unmergedFiles; + + DirMergeResult(int partFileCount, int mergedFileCount, List unmergedFiles) { + this.partFileCount = partFileCount; + this.mergedFileCount = mergedFileCount; + this.unmergedFiles = unmergedFiles; } } @@ -229,14 +492,38 @@ private static Options createOptions() { result.addOption(Option.builder("m") .longOpt("maxSize") - .desc("Maximum cumulative input file size in bytes per output ORC file. When set, " - + "--output is treated as an output directory and merged files are written as " - + "part-00000.orc, part-00001.orc, etc. Input files are grouped at file " - + "boundaries so an individual file larger than this threshold will still be " - + "placed in its own part.") + .desc("Maximum cumulative input file size in bytes per output ORC file. Must be a " + + "positive integer; a value of 0, a negative value, or a non-numeric value " + + "causes the tool to exit with an error. When set, --output is treated as an " + + "output directory and merged files are written as part-00000.orc, " + + "part-00001.orc, etc. 
Input files are grouped at file boundaries so an " + + "individual file larger than this threshold will still be placed in its own " + + "part.") .hasArg() .build()); + result.addOption(Option.builder("p") + .longOpt("preserveStructure") + .desc("Mirror the input directory structure under --output: each directory that " + + "directly contains ORC files (a 'leaf' directory, e.g. a Hive partition " + + "such as d=2025-04-25/h=01) is merged independently and written to the " + + "corresponding relative path under --output. Works with any nesting depth. " + + "Requires exactly one input directory. Hidden files/directories (names " + + "starting with '_' or '.', such as _SUCCESS or _temporary) are always " + + "skipped to match Hive/Spark conventions. A directory that contains both " + + "ORC files and subdirectories is rejected.") + .build()); + + result.addOption(Option.builder() + .longOpt("overwrite") + .desc("Applies to multi-file output modes (--maxSize and --preserveStructure). " + + "If the output directory already exists and is non-empty, delete its " + + "contents before writing merged part files. Without this flag, the tool " + + "aborts with an error when the output directory is non-empty so that " + + "existing data is not silently destroyed. 
Only a long form is provided " + + "to avoid any risk of confusing --overwrite with --output (-o).") + .build()); + result.addOption(Option.builder("h") .longOpt("help") .desc("Print help message") diff --git a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java index 48a88954ce..9b52f27a63 100644 --- a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java +++ b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java @@ -42,6 +42,8 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestMergeFiles implements TestConf { @@ -147,10 +149,9 @@ public void testMergeWithMaxSize() throws Exception { writer.close(); } - // Measure the size of the first source file to compute a threshold that forces a split. - long singleFileSize = fs.getFileStatus(new Path(sourceNames[0])).getLen(); - // Threshold: slightly larger than one file so at most one file fits per part. 
- long maxSize = singleFileSize + 1; + long firstTwo = fs.getFileStatus(new Path(sourceNames[0])).getLen() + + fs.getFileStatus(new Path(sourceNames[1])).getLen(); + long maxSize = firstTwo + 1; Path outputDir = new Path(workDir + File.separator + "merge-multi-out"); fs.delete(outputDir, true); @@ -186,7 +187,271 @@ public void testMergeWithMaxSize() throws Exception { totalRows += reader.getNumberOfRows(); } } - assertTrue(partCount > 1, "Expected more than one output part file, got: " + partCount); + assertEquals(2, partCount, "Expected exactly two output part files, got: " + partCount); assertEquals(5000 + 5000 + 5000, totalRows, "Total row count across all parts should match"); } + + /** + * A single input file that is larger than --maxSize must still be emitted as its + * own part file (we never split an input). + */ + @Test + public void testMergeWithMaxSizeSingleGiantFile() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + Path inputDir = new Path(workDir, "giant-in"); + fs.mkdirs(inputDir); + Path giant = new Path(inputDir, "giant.orc"); + writeOrcFile(giant, schema, 20000); + long giantSize = fs.getFileStatus(giant).getLen(); + assertTrue(giantSize > 0, "Giant source file should have non-zero size"); + + Path outputDir = new Path(workDir, "giant-out"); + fs.delete(outputDir, true); + + long maxSize = Math.max(1L, giantSize / 2); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", outputDir.toString(), + "--maxSize", String.valueOf(maxSize)}); + System.out.flush(); + } finally { + System.setOut(origOut); + } + String output = myOut.toString(StandardCharsets.UTF_8); + System.out.println(output); + + assertTrue(output.contains("Input files size: 1"), "Should report 1 input file"); + assertTrue(output.contains("Merge files 
size: 1"), "The giant file should be merged"); + + Path part0 = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, 0)); + Path part1 = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, 1)); + assertTrue(fs.exists(part0), "Expected part-00000.orc"); + assertFalse(fs.exists(part1), "Expected exactly one part file for a single input"); + try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) { + assertEquals(20000, reader.getNumberOfRows(), + "Single giant file must be preserved intact in its own part"); + } + } + + /** + * By default running the merge against a non-empty output directory must fail so + * that existing data is not silently destroyed. With --overwrite the directory's + * contents are deleted before new part files are written. + */ + @Test + public void testMergeWithMaxSizeOverwriteBehavior() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + Path inputDir = new Path(workDir, "ow-in"); + fs.mkdirs(inputDir); + writeOrcFile(new Path(inputDir, "a.orc"), schema, 100); + + Path outputDir = new Path(workDir, "ow-out"); + fs.delete(outputDir, true); + fs.mkdirs(outputDir); + Path stale = new Path(outputDir, "stale.txt"); + fs.create(stale).close(); + + String[] baseArgs = {inputDir.toString(), + "--output", outputDir.toString(), + "--maxSize", "1048576"}; + + // Without --overwrite: should reject non-empty output directory and leave it untouched. + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, baseArgs)); + assertTrue(ex.getMessage().contains("not empty") + && ex.getMessage().contains("--overwrite"), + "Error should mention --overwrite: " + ex.getMessage()); + assertTrue(fs.exists(stale), + "Existing content must be preserved when overwrite is refused"); + + // With --overwrite: existing content is cleared and fresh part files are written. 
+ String[] owArgs = new String[baseArgs.length + 1]; + System.arraycopy(baseArgs, 0, owArgs, 0, baseArgs.length); + owArgs[baseArgs.length] = "--overwrite"; + + PrintStream origErr = System.err; + ByteArrayOutputStream myErr = new ByteArrayOutputStream(); + System.setErr(new PrintStream(myErr, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, owArgs); + System.err.flush(); + } finally { + System.setErr(origErr); + } + String errOut = myErr.toString(StandardCharsets.UTF_8); + assertTrue(errOut.contains("Overwriting existing non-empty output directory"), + "Expected an overwrite warning on stderr, got:\n" + errOut); + assertFalse(fs.exists(stale), "Pre-existing file should be cleared by --overwrite"); + + Path part0 = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, 0)); + assertTrue(fs.exists(part0), "Expected part-00000.orc after overwrite"); + try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) { + assertEquals(100, reader.getNumberOfRows()); + } + } + + /** + * Creates an ORC file at the given path with {@code rowCount} rows of the fixed + * {@code struct} schema used across these tests. + */ + private void writeOrcFile(Path path, TypeDescription schema, int rowCount) throws Exception { + Writer writer = OrcFile.createWriter(path, + OrcFile.writerOptions(conf).setSchema(schema)); + VectorizedRowBatch batch = schema.createRowBatch(); + LongColumnVector x = (LongColumnVector) batch.cols[0]; + BytesColumnVector y = (BytesColumnVector) batch.cols[1]; + for (int r = 0; r < rowCount; ++r) { + int row = batch.size++; + x.vector[row] = r; + byte[] buffer = ("val-" + r).getBytes(); + y.setRef(row, buffer, 0, buffer.length); + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + } + if (batch.size != 0) { + writer.addRowBatch(batch); + } + writer.close(); + } + + /** + * Two-level nested partitions (d=.../h=01, h=02, h=03). 
With --preserveStructure, + * each leaf partition must be merged independently and its relative path mirrored + * under the output root. Data MUST NOT be cross-mixed between leaves. + */ + @Test + public void testPreserveStructureTwoLevelPartitions() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + Path inputRoot = new Path(workDir, "ps-in"); + Path partitionBase = new Path(inputRoot, "d=2025-04-25"); + + int[] hRows = {100, 200, 300}; + for (int h = 0; h < hRows.length; h++) { + Path leaf = new Path(partitionBase, String.format("h=%02d", h + 1)); + fs.mkdirs(leaf); + writeOrcFile(new Path(leaf, "a.orc"), schema, hRows[h]); + writeOrcFile(new Path(leaf, "b.orc"), schema, hRows[h] / 2); + } + + Path outputRoot = new Path(workDir, "ps-out"); + fs.delete(outputRoot, true); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, new String[]{inputRoot.toString(), + "--output", outputRoot.toString(), + "--preserveStructure"}); + System.out.flush(); + } finally { + System.setOut(origOut); + } + String output = myOut.toString(StandardCharsets.UTF_8); + System.out.println(output); + + assertTrue(output.contains("Leaves: 3"), + "Expected 3 leaves in summary, got:\n" + output); + + for (int h = 0; h < hRows.length; h++) { + Path outLeaf = new Path(outputRoot, "d=2025-04-25/" + String.format("h=%02d", h + 1)); + assertTrue(fs.isDirectory(outLeaf), + "Expected mirrored leaf directory to exist: " + outLeaf); + Path part0 = new Path(outLeaf, String.format(MergeFiles.PART_FILE_FORMAT, 0)); + assertTrue(fs.exists(part0), + "Expected part-00000.orc under leaf: " + outLeaf); + long expectedRows = hRows[h] + hRows[h] / 2; + try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) { + assertEquals(expectedRows, reader.getNumberOfRows(), + "Row count mismatch for " + outLeaf); + 
assertEquals(schema, reader.getSchema()); + } + } + } + + /** + * A directory containing BOTH ORC files and subdirectories is ambiguous under + * --preserveStructure and must be rejected before any output is written. + */ + @Test + public void testPreserveStructureRejectsMixedDirectory() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + Path inputRoot = new Path(workDir, "psmix-in"); + fs.mkdirs(inputRoot); + writeOrcFile(new Path(inputRoot, "stray.orc"), schema, 10); + Path subLeaf = new Path(inputRoot, "h=01"); + fs.mkdirs(subLeaf); + writeOrcFile(new Path(subLeaf, "a.orc"), schema, 20); + + Path outputRoot = new Path(workDir, "psmix-out"); + fs.delete(outputRoot, true); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, new String[]{inputRoot.toString(), + "--output", outputRoot.toString(), + "--preserveStructure"})); + assertTrue(ex.getMessage().contains("both ORC files and subdirectories"), + "Unexpected error message: " + ex.getMessage()); + } + + /** + * Hidden files/directories (names starting with '_' or '.') must always be + * ignored, so files like _SUCCESS or _temporary/*.orc don't pollute the merged + * output. + */ + @Test + public void testPreserveStructureSkipsHidden() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + Path inputRoot = new Path(workDir, "pshidden-in"); + Path leaf = new Path(inputRoot, "h=01"); + fs.mkdirs(leaf); + writeOrcFile(new Path(leaf, "data.orc"), schema, 30); + // _SUCCESS-style hidden file next to the real data — should be ignored by default. + writeOrcFile(new Path(leaf, "_hidden.orc"), schema, 999); + // Hidden sibling directory (e.g. _temporary) whose contents would otherwise + // appear as a second leaf — should be skipped entirely. 
+ Path hiddenDir = new Path(inputRoot, "_temporary"); + fs.mkdirs(hiddenDir); + writeOrcFile(new Path(hiddenDir, "a.orc"), schema, 7); + + Path outputRoot = new Path(workDir, "pshidden-out"); + fs.delete(outputRoot, true); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, new String[]{inputRoot.toString(), + "--output", outputRoot.toString(), + "--preserveStructure"}); + System.out.flush(); + } finally { + System.setOut(origOut); + } + String output = myOut.toString(StandardCharsets.UTF_8); + System.out.println(output); + + assertTrue(output.contains("Leaves: 1"), + "Expected exactly 1 leaf (hidden dir ignored), got:\n" + output); + assertFalse(fs.exists(new Path(outputRoot, "_temporary")), + "Hidden input directory should not be mirrored to output"); + + Path part0 = new Path(outputRoot, "h=01/" + String.format(MergeFiles.PART_FILE_FORMAT, 0)); + assertTrue(fs.exists(part0)); + try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) { + assertEquals(30, reader.getNumberOfRows(), + "Hidden _hidden.orc rows must not be merged in"); + } + } } diff --git a/site/_docs/java-tools.md b/site/_docs/java-tools.md index 21a1fec1d1..4173b8c47d 100644 --- a/site/_docs/java-tools.md +++ b/site/_docs/java-tools.md @@ -359,9 +359,14 @@ ______________________________________________________________________ The merge command can merge multiple ORC files that all have the same schema. By default it writes a single output file. If `--maxSize` is set, `--output` is treated as a directory and the tool writes multiple part files (`part-00000.orc`, `part-00001.orc`, …) under it. -Input files are grouped using their on-disk sizes so that each part’s total input size -does not exceed the given threshold (a single input file larger than the threshold is still -merged into its own part). 
+Input files are sorted by path for deterministic output and grouped using their on-disk +sizes so that each part’s total input size does not exceed the given threshold (a single +input file larger than the threshold is still merged into its own part). + +For the multi-file output modes (`--maxSize` and `--preserveStructure`) the tool refuses +to run when `--output` points to a non-empty existing directory, so that existing data is +not silently destroyed. Use `--overwrite` to delete the directory's current contents +before writing new part files. `-h,--help` : Print help @@ -370,10 +375,30 @@ merged into its own part). : Include files that do not end in `.orc` `-m,--maxSize ` - : Maximum size in bytes for each output part; enables multi-file output under `--output` + : Maximum size in bytes for each output part; enables multi-file output under `--output`. + Must be a positive integer — a value of `0`, a negative value, or a non-numeric value + causes the tool to exit with an error. `-o,--output ` - : Output ORC filename (single-file mode) or output directory (when `--maxSize` is set) + : Output ORC filename (single-file mode) or output directory (when `--maxSize` or + `--preserveStructure` is set) + +`--overwrite` + : If the output directory already exists and is non-empty, delete its contents + before writing merged part files. Only applies to multi-file output modes + (`--maxSize` and `--preserveStructure`). Without this flag, the tool aborts + with an error when the output directory is non-empty so that existing data + is not silently destroyed. Intentionally provided as long-form only to avoid + confusion with `-o,--output`. + +`-p,--preserveStructure` + : Mirror the input directory structure under `--output`. Every directory that directly + contains ORC files (a "leaf" directory, e.g. a Hive partition path such as + `d=2025-04-25/h=01`) is merged independently and written to the corresponding relative + path under `--output`. Works with any nesting depth. 
Requires exactly one input + directory. Hidden files/directories (names starting with `_` or `.`, such as + `_SUCCESS` or `_temporary`) are always skipped to match Hive/Spark conventions. A + directory that contains both ORC files and subdirectories is rejected. Merge into one ORC file: @@ -389,6 +414,18 @@ Merge into multiple ORC files under a directory (each part bounded by size): ______________________________________________________________________ ~~~ +Merge a partitioned input tree while preserving the directory structure (each leaf +partition is merged independently; `--maxSize` is optional): + +~~~ shell +% java -jar orc-tools-X.Y.Z-uber.jar merge \ + --output /path/to/out_dir/ \ + --preserveStructure \ + --maxSize 2147483648 \ + /warehouse/db/table/d=2025-04-25/ +______________________________________________________________________ +~~~ + ## Java Version The version command prints the version of this ORC tool. From d30754deeb7aa00fdc1dc05881d74e165182e874 Mon Sep 17 00:00:00 2001 From: yongqian Date: Wed, 22 Apr 2026 17:44:24 +0800 Subject: [PATCH 4/6] fix style --- java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java index 9b52f27a63..ba8d3109c8 100644 --- a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java +++ b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java @@ -264,8 +264,8 @@ public void testMergeWithMaxSizeOverwriteBehavior() throws Exception { // Without --overwrite: should reject non-empty output directory and leave it untouched. 
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> MergeFiles.main(conf, baseArgs)); - assertTrue(ex.getMessage().contains("not empty") - && ex.getMessage().contains("--overwrite"), + assertTrue(ex.getMessage().contains("not empty") && + ex.getMessage().contains("--overwrite"), "Error should mention --overwrite: " + ex.getMessage()); assertTrue(fs.exists(stale), "Existing content must be preserved when overwrite is refused"); From fef080ea3018db25dd5f55179180ea9877878c1a Mon Sep 17 00:00:00 2001 From: yongqian Date: Wed, 22 Apr 2026 18:11:03 +0800 Subject: [PATCH 5/6] Adjust merging logic to ensure contiguous part file naming and improve unmerged file handling --- .../java/org/apache/orc/tools/MergeFiles.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java index 439130363b..668fb07826 100644 --- a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java +++ b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java @@ -214,9 +214,6 @@ private static void mergePreserveStructure(Configuration conf, "Input path must be an existing directory with --preserveStructure: " + inputRoot); } - FileSystem outFs = outputRoot.getFileSystem(conf); - prepareOutputDir(outFs, outputRoot, overwrite, "--preserveStructure"); - Path qualifiedInputRoot = inFs.makeQualified(inputRoot); List leaves = new ArrayList<>(); collectLeafDirs(inFs, qualifiedInputRoot, ignoreExtension, leaves); @@ -226,6 +223,9 @@ private static void mergePreserveStructure(Configuration conf, System.exit(1); } + FileSystem outFs = outputRoot.getFileSystem(conf); + prepareOutputDir(outFs, outputRoot, overwrite, "--preserveStructure"); + long effectiveMax = maxSizeBytes > 0 ? 
maxSizeBytes : Long.MAX_VALUE; int totalInputFiles = 0; int totalMerged = 0; @@ -305,13 +305,26 @@ private static DirMergeResult mergeBatchedIntoDir(Configuration conf, } int totalMerged = 0; + int partFilesWritten = 0; List allUnmerged = new ArrayList<>(); - for (int i = 0; i < batches.size(); i++) { - List batch = batches.get(i); - Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT, i)); + // Advance the part index only when a file is actually written, so + // part-NNNNN.orc stays contiguous and the reported count matches disk. + for (List batch : batches) { + Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT, partFilesWritten)); List merged = OrcFile.mergeFiles(partOutput, writerOptions.clone(), batch); + + if (merged.isEmpty()) { + // Drop any 0-stripe placeholder left behind and skip this slot. + if (outFs.exists(partOutput)) { + outFs.delete(partOutput, false); + } + allUnmerged.addAll(batch); + continue; + } + totalMerged += merged.size(); + partFilesWritten++; if (merged.size() != batch.size()) { Set mergedSet = new HashSet<>(merged); @@ -323,7 +336,7 @@ private static DirMergeResult mergeBatchedIntoDir(Configuration conf, } } - return new DirMergeResult(batches.size(), totalMerged, allUnmerged); + return new DirMergeResult(partFilesWritten, totalMerged, allUnmerged); } /** From 5e235bfacdee299e7fb6e4653636fbcf6b59b3ba Mon Sep 17 00:00:00 2001 From: yongqian Date: Thu, 23 Apr 2026 14:49:43 +0800 Subject: [PATCH 6/6] Add output path validation to prevent overlap with input roots in MergeFiles --- .../java/org/apache/orc/tools/MergeFiles.java | 56 +++++++++++---- .../org/apache/orc/tools/TestMergeFiles.java | 69 ++++++++++++++----- 2 files changed, 96 insertions(+), 29 deletions(-) diff --git a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java index 668fb07826..04d85e3a8f 100644 --- a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java +++ 
b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java @@ -57,7 +57,7 @@ public static void main(Configuration conf, String[] args) throws Exception { } String outputFilename = cli.getOptionValue("output"); if (outputFilename == null || outputFilename.isEmpty()) { - System.err.println("output filename is null"); + System.err.println("--output path is required."); formatter.printHelp("merge", opts); return; } @@ -81,6 +81,14 @@ public static void main(Configuration conf, String[] args) throws Exception { String[] files = cli.getArgs(); OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); + Path outputPath = new Path(outputFilename); + + // Multi-file modes rewrite the output directory in place, so its subtree must + // not overlap with any input root; otherwise --overwrite could delete files + // that were already enumerated as inputs. + if (preserveStructure || maxSizeBytes > 0) { + assertOutputNotOverlappingInputs(conf, outputPath, files); + } if (preserveStructure) { if (files.length != 1) { @@ -89,7 +97,7 @@ public static void main(Configuration conf, String[] args) throws Exception { System.exit(1); } mergePreserveStructure(conf, writerOptions, new Path(files[0]), - new Path(outputFilename), maxSizeBytes, ignoreExtension, overwrite); + outputPath, maxSizeBytes, ignoreExtension, overwrite); return; } @@ -111,16 +119,41 @@ public static void main(Configuration conf, String[] args) throws Exception { inputStatuses.sort(Comparator.comparing(FileStatus::getPath)); - List inputFiles = new ArrayList<>(inputStatuses.size()); - for (LocatedFileStatus s : inputStatuses) { - inputFiles.add(s.getPath()); - } - if (maxSizeBytes > 0) { - mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, inputFiles, - new Path(outputFilename), maxSizeBytes, overwrite); + mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, + outputPath, maxSizeBytes, overwrite); } else { - mergeIntoSingleFile(writerOptions, inputFiles, new Path(outputFilename), 
outputFilename); + List inputFiles = new ArrayList<>(inputStatuses.size()); + for (LocatedFileStatus s : inputStatuses) { + inputFiles.add(s.getPath()); + } + mergeIntoSingleFile(writerOptions, inputFiles, outputPath, outputFilename); + } + } + + /** + * Reject any configuration where {@code outputPath} equals, lies under, or + * contains any of the given input roots (after qualification). Used by + * multi-file output modes where the output directory is rewritten in place. + */ + private static void assertOutputNotOverlappingInputs(Configuration conf, + Path outputPath, + String[] inputRoots) throws IOException { + FileSystem outFs = outputPath.getFileSystem(conf); + Path qualifiedOutput = outFs.makeQualified(outputPath); + String outStr = qualifiedOutput.toString(); + String outPrefix = outStr.endsWith("/") ? outStr : outStr + "/"; + for (String root : inputRoots) { + Path rootPath = new Path(root); + FileSystem inFs = rootPath.getFileSystem(conf); + Path qualifiedInput = inFs.makeQualified(rootPath); + String inStr = qualifiedInput.toString(); + String inPrefix = inStr.endsWith("/") ? 
inStr : inStr + "/"; + if (outStr.equals(inStr) || outStr.startsWith(inPrefix) || inStr.startsWith(outPrefix)) { + throw new IllegalArgumentException( + "Output path must not overlap with any input path: " + + "output=" + outputPath + ", input=" + root); + } } } @@ -164,7 +197,6 @@ private static void mergeIntoSingleFile(OrcFile.WriterOptions writerOptions, private static void mergeIntoMultipleFiles(Configuration conf, OrcFile.WriterOptions writerOptions, List inputStatuses, - List inputFiles, Path outputDir, long maxSizeBytes, boolean overwrite) throws Exception { @@ -178,7 +210,7 @@ private static void mergeIntoMultipleFiles(Configuration conf, System.out.printf( "Output path: %s, Input files size: %d, Merge files size: %d, Output files: %d%n", - outputDir, inputFiles.size(), r.mergedFileCount, r.partFileCount); + outputDir, inputStatuses.size(), r.mergedFileCount, r.partFileCount); if (!r.unmergedFiles.isEmpty()) { System.exit(1); } diff --git a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java index ba8d3109c8..8012408c92 100644 --- a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java +++ b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java @@ -29,7 +29,6 @@ import org.apache.orc.TestConf; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; -import org.apache.orc.tools.MergeFiles; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -100,7 +99,6 @@ public void testMerge() throws Exception { System.out.flush(); System.setOut(origOut); String output = myOut.toString(StandardCharsets.UTF_8); - System.out.println(output); assertTrue(output.contains("Input files size: 2, Merge files size: 2")); try (Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf))) { @@ -120,15 +118,18 @@ public void testMerge() throws Exception { public void testMergeWithMaxSize() throws Exception { TypeDescription schema = 
TypeDescription.fromString("struct"); - // Create 3 source ORC files. - String[] sourceNames = { - workDir + File.separator + "ms-1.orc", - workDir + File.separator + "ms-2.orc", - workDir + File.separator + "ms-3.orc" + // Keep the input directory disjoint from the output directory; multi-file + // modes reject any overlap between the two. + Path inputDir = new Path(workDir, "ms-in"); + fs.mkdirs(inputDir); + Path[] sources = { + new Path(inputDir, "ms-1.orc"), + new Path(inputDir, "ms-2.orc"), + new Path(inputDir, "ms-3.orc") }; int[] rowCounts = {5000, 5000, 5000}; - for (int f = 0; f < sourceNames.length; f++) { - Writer writer = OrcFile.createWriter(new Path(sourceNames[f]), + for (int f = 0; f < sources.length; f++) { + Writer writer = OrcFile.createWriter(sources[f], OrcFile.writerOptions(conf).setSchema(schema)); VectorizedRowBatch batch = schema.createRowBatch(); LongColumnVector x = (LongColumnVector) batch.cols[0]; @@ -149,18 +150,18 @@ public void testMergeWithMaxSize() throws Exception { writer.close(); } - long firstTwo = fs.getFileStatus(new Path(sourceNames[0])).getLen() - + fs.getFileStatus(new Path(sourceNames[1])).getLen(); + long firstTwo = fs.getFileStatus(sources[0]).getLen() + + fs.getFileStatus(sources[1]).getLen(); long maxSize = firstTwo + 1; - Path outputDir = new Path(workDir + File.separator + "merge-multi-out"); + Path outputDir = new Path(workDir, "ms-out"); fs.delete(outputDir, true); PrintStream origOut = System.out; ByteArrayOutputStream myOut = new ByteArrayOutputStream(); System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); try { - MergeFiles.main(conf, new String[]{workDir.toString(), + MergeFiles.main(conf, new String[]{inputDir.toString(), "--output", outputDir.toString(), "--maxSize", String.valueOf(maxSize)}); System.out.flush(); @@ -168,7 +169,6 @@ public void testMergeWithMaxSize() throws Exception { System.setOut(origOut); } String output = myOut.toString(StandardCharsets.UTF_8); - 
System.out.println(output); assertTrue(output.contains("Input files size: 3"), "Should report 3 input files"); assertTrue(output.contains("Merge files size: 3"), "All 3 files should be merged"); @@ -223,7 +223,6 @@ public void testMergeWithMaxSizeSingleGiantFile() throws Exception { System.setOut(origOut); } String output = myOut.toString(StandardCharsets.UTF_8); - System.out.println(output); assertTrue(output.contains("Input files size: 1"), "Should report 1 input file"); assertTrue(output.contains("Merge files size: 1"), "The giant file should be merged"); @@ -296,6 +295,44 @@ public void testMergeWithMaxSizeOverwriteBehavior() throws Exception { } } + /** + * Multi-file modes must reject configurations where the output path overlaps + * with any input root (equal, under, or containing), so --overwrite cannot + * delete files that were enumerated as inputs. + */ + @Test + public void testMultiFileModeRejectsOutputInsideInput() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + Path inputDir = new Path(workDir, "overlap-in"); + fs.mkdirs(inputDir); + writeOrcFile(new Path(inputDir, "a.orc"), schema, 10); + + // Output directly under inputDir -> overlap. + Path badOutput = new Path(inputDir, "out"); + + IllegalArgumentException maxSizeEx = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", badOutput.toString(), + "--maxSize", "1048576"})); + assertTrue(maxSizeEx.getMessage().contains("must not overlap"), + "Unexpected error: " + maxSizeEx.getMessage()); + + IllegalArgumentException psEx = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", badOutput.toString(), + "--preserveStructure"})); + assertTrue(psEx.getMessage().contains("must not overlap"), + "Unexpected error: " + psEx.getMessage()); + + // Input equal to output is also overlap. 
+ IllegalArgumentException eqEx = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", inputDir.toString(), + "--maxSize", "1048576"})); + assertTrue(eqEx.getMessage().contains("must not overlap"), + "Unexpected error: " + eqEx.getMessage()); + } + /** * Creates an ORC file at the given path with {@code rowCount} rows of the fixed * {@code struct} schema used across these tests. @@ -357,7 +394,6 @@ public void testPreserveStructureTwoLevelPartitions() throws Exception { System.setOut(origOut); } String output = myOut.toString(StandardCharsets.UTF_8); - System.out.println(output); assertTrue(output.contains("Leaves: 3"), "Expected 3 leaves in summary, got:\n" + output); @@ -440,7 +476,6 @@ public void testPreserveStructureSkipsHidden() throws Exception { System.setOut(origOut); } String output = myOut.toString(StandardCharsets.UTF_8); - System.out.println(output); assertTrue(output.contains("Leaves: 1"), "Expected exactly 1 leaf (hidden dir ignored), got:\n" + output);