diff --git a/java/tools/src/java/org/apache/orc/tools/Driver.java b/java/tools/src/java/org/apache/orc/tools/Driver.java index b134a1bb80..48acf340ff 100644 --- a/java/tools/src/java/org/apache/orc/tools/Driver.java +++ b/java/tools/src/java/org/apache/orc/tools/Driver.java @@ -93,7 +93,7 @@ public static void main(String[] args) throws Exception { System.err.println(" data - print the data from the ORC file"); System.err.println(" json-schema - scan JSON files to determine their schema"); System.err.println(" key - print information about the keys"); - System.err.println(" merge - merge multiple ORC files into a single ORC file"); + System.err.println(" merge - merge multiple ORC files into one or more ORC files"); System.err.println(" meta - print the metadata about the ORC file"); System.err.println(" scan - scan the ORC file"); System.err.println(" sizes - list size on disk of each column"); diff --git a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java index 39130f82d5..04d85e3a8f 100644 --- a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java +++ b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java @@ -24,22 +24,29 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.orc.OrcFile; +import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; /** - * Merge multiple ORC files that all have the same schema into a single ORC file. + * Merge multiple ORC files that all have the same schema into one or more ORC files. 
+ * When {@code --maxSize} is specified, the tool splits output into multiple part files + * under the given output directory, each not exceeding the specified size threshold. */ public class MergeFiles { + static final String PART_FILE_FORMAT = "part-%05d.orc"; + public static void main(Configuration conf, String[] args) throws Exception { Options opts = createOptions(); CommandLine cli = new DefaultParser().parse(opts, args); @@ -50,33 +57,114 @@ public static void main(Configuration conf, String[] args) throws Exception { } String outputFilename = cli.getOptionValue("output"); if (outputFilename == null || outputFilename.isEmpty()) { - System.err.println("output filename is null"); + System.err.println("--output path is required."); formatter.printHelp("merge", opts); return; } boolean ignoreExtension = cli.hasOption("ignoreExtension"); + boolean preserveStructure = cli.hasOption("preserveStructure"); + boolean overwrite = cli.hasOption("overwrite"); - List inputFiles = new ArrayList<>(); - OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); + long maxSizeBytes = 0; + if (cli.hasOption("maxSize")) { + try { + maxSizeBytes = Long.parseLong(cli.getOptionValue("maxSize")); + } catch (NumberFormatException e) { + System.err.println("--maxSize requires a numeric value in bytes."); + System.exit(1); + } + if (maxSizeBytes <= 0) { + System.err.println("--maxSize must be a positive number of bytes."); + System.exit(1); + } + } String[] files = cli.getArgs(); + OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); + Path outputPath = new Path(outputFilename); + + // Multi-file modes rewrite the output directory in place, so its subtree must + // not overlap with any input root; otherwise --overwrite could delete files + // that were already enumerated as inputs. 
+ if (preserveStructure || maxSizeBytes > 0) { + assertOutputNotOverlappingInputs(conf, outputPath, files); + } + + if (preserveStructure) { + if (files.length != 1) { + System.err.println( + "--preserveStructure requires exactly one input directory."); + System.exit(1); + } + mergePreserveStructure(conf, writerOptions, new Path(files[0]), + outputPath, maxSizeBytes, ignoreExtension, overwrite); + return; + } + + List inputStatuses = new ArrayList<>(); for (String root : files) { Path rootPath = new Path(root); FileSystem fs = rootPath.getFileSystem(conf); for (RemoteIterator itr = fs.listFiles(rootPath, true); itr.hasNext(); ) { LocatedFileStatus status = itr.next(); if (status.isFile() && (ignoreExtension || status.getPath().getName().endsWith(".orc"))) { - inputFiles.add(status.getPath()); + inputStatuses.add(status); } } } - if (inputFiles.isEmpty()) { + if (inputStatuses.isEmpty()) { System.err.println("No files found."); System.exit(1); } - List mergedFiles = OrcFile.mergeFiles( - new Path(outputFilename), writerOptions, inputFiles); + inputStatuses.sort(Comparator.comparing(FileStatus::getPath)); + + if (maxSizeBytes > 0) { + mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, + outputPath, maxSizeBytes, overwrite); + } else { + List inputFiles = new ArrayList<>(inputStatuses.size()); + for (LocatedFileStatus s : inputStatuses) { + inputFiles.add(s.getPath()); + } + mergeIntoSingleFile(writerOptions, inputFiles, outputPath, outputFilename); + } + } + + /** + * Reject any configuration where {@code outputPath} equals, lies under, or + * contains any of the given input roots (after qualification). Used by + * multi-file output modes where the output directory is rewritten in place. 
+ */ + private static void assertOutputNotOverlappingInputs(Configuration conf, + Path outputPath, + String[] inputRoots) throws IOException { + FileSystem outFs = outputPath.getFileSystem(conf); + Path qualifiedOutput = outFs.makeQualified(outputPath); + String outStr = qualifiedOutput.toString(); + String outPrefix = outStr.endsWith("/") ? outStr : outStr + "/"; + for (String root : inputRoots) { + Path rootPath = new Path(root); + FileSystem inFs = rootPath.getFileSystem(conf); + Path qualifiedInput = inFs.makeQualified(rootPath); + String inStr = qualifiedInput.toString(); + String inPrefix = inStr.endsWith("/") ? inStr : inStr + "/"; + if (outStr.equals(inStr) || outStr.startsWith(inPrefix) || inStr.startsWith(outPrefix)) { + throw new IllegalArgumentException( + "Output path must not overlap with any input path: " + + "output=" + outputPath + ", input=" + root); + } + } + } + + /** + * Original single-output behavior (no --maxSize). + */ + private static void mergeIntoSingleFile(OrcFile.WriterOptions writerOptions, + List inputFiles, + Path outputPath, + String outputFilename) throws Exception { + List mergedFiles = OrcFile.mergeFiles(outputPath, writerOptions, inputFiles); List unSuccessMergedFiles = new ArrayList<>(); if (mergedFiles.size() != inputFiles.size()) { @@ -100,11 +188,345 @@ public static void main(Configuration conf, String[] args) throws Exception { } } + /** + * Multi-output behavior when --maxSize is set. + * Input files are grouped by cumulative raw file size; each group is merged into + * a separate part file (part-00000.orc, part-00001.orc, ...) under outputDir. + * A single file whose size already exceeds maxSizeBytes is placed in its own part. 
+ */ + private static void mergeIntoMultipleFiles(Configuration conf, + OrcFile.WriterOptions writerOptions, + List inputStatuses, + Path outputDir, + long maxSizeBytes, + boolean overwrite) throws Exception { + DirMergeResult r = mergeBatchedIntoDir(conf, writerOptions, inputStatuses, + outputDir, maxSizeBytes, overwrite); + + if (!r.unmergedFiles.isEmpty()) { + System.err.println("List of files that could not be merged:"); + r.unmergedFiles.forEach(path -> System.err.println(path.toString())); + } + + System.out.printf( + "Output path: %s, Input files size: %d, Merge files size: %d, Output files: %d%n", + outputDir, inputStatuses.size(), r.mergedFileCount, r.partFileCount); + if (!r.unmergedFiles.isEmpty()) { + System.exit(1); + } + } + + /** + * Preserve the relative directory structure of {@code inputRoot} under + * {@code outputRoot}: every directory containing ORC files (a "leaf") is merged + * independently, and its relative path is mirrored beneath {@code outputRoot}. + * + *

+   * <p>Leaves are located by a depth-first walk of {@code inputRoot}. A directory
+   * that contains both ORC files and subdirectories is considered ambiguous and
+   * raises an error. Hidden entries (names starting with {@code '_'} or
+   * {@code '.'}) are always skipped, matching Hive/Spark conventions for markers
+   * like {@code _SUCCESS}, {@code _committed_*} or {@code _temporary}.
+   *

If {@code --maxSize} was not supplied ({@code maxSizeBytes == 0}), each + * leaf is merged into a single {@code part-00000.orc} file; otherwise + * ({@code maxSizeBytes > 0}, already validated in {@link #main}) the files + * under a leaf are split into size-bounded part files exactly as in the flat + * multi-file mode. + */ + private static void mergePreserveStructure(Configuration conf, + OrcFile.WriterOptions writerOptions, + Path inputRoot, + Path outputRoot, + long maxSizeBytes, + boolean ignoreExtension, + boolean overwrite) throws Exception { + FileSystem inFs = inputRoot.getFileSystem(conf); + if (!inFs.exists(inputRoot) || !inFs.getFileStatus(inputRoot).isDirectory()) { + throw new IllegalArgumentException( + "Input path must be an existing directory with --preserveStructure: " + inputRoot); + } + + Path qualifiedInputRoot = inFs.makeQualified(inputRoot); + List leaves = new ArrayList<>(); + collectLeafDirs(inFs, qualifiedInputRoot, ignoreExtension, leaves); + + if (leaves.isEmpty()) { + System.err.println("No leaf directories containing ORC files found under: " + inputRoot); + System.exit(1); + } + + FileSystem outFs = outputRoot.getFileSystem(conf); + prepareOutputDir(outFs, outputRoot, overwrite, "--preserveStructure"); + + long effectiveMax = maxSizeBytes > 0 ? maxSizeBytes : Long.MAX_VALUE; + int totalInputFiles = 0; + int totalMerged = 0; + int totalPartFiles = 0; + List allUnmerged = new ArrayList<>(); + + for (Path leaf : leaves) { + String relative = relativize(qualifiedInputRoot, inFs.makeQualified(leaf)); + Path outputLeaf = relative.isEmpty() ? outputRoot : new Path(outputRoot, relative); + + List leafStatuses = + listLeafFiles(inFs, leaf, ignoreExtension); + if (leafStatuses.isEmpty()) { + continue; + } + + // Each leaf's output directory lives under a freshly-prepared outputRoot, + // so we don't need (and shouldn't force) overwrite semantics here. 
+ DirMergeResult r = mergeBatchedIntoDir( + conf, writerOptions, leafStatuses, outputLeaf, effectiveMax, false); + + totalInputFiles += leafStatuses.size(); + totalMerged += r.mergedFileCount; + totalPartFiles += r.partFileCount; + allUnmerged.addAll(r.unmergedFiles); + + System.out.printf( + "Leaf: %s -> %s, Input files: %d, Merge files: %d, Output files: %d%n", + leaf, outputLeaf, leafStatuses.size(), r.mergedFileCount, r.partFileCount); + } + + if (!allUnmerged.isEmpty()) { + System.err.println("List of files that could not be merged:"); + allUnmerged.forEach(path -> System.err.println(path.toString())); + } + + System.out.printf( + "Output root: %s, Leaves: %d, Total input files: %d, " + + "Total merge files: %d, Total output files: %d%n", + outputRoot, leaves.size(), totalInputFiles, totalMerged, totalPartFiles); + if (!allUnmerged.isEmpty()) { + System.exit(1); + } + } + + /** + * Core per-directory merge: groups {@code inputStatuses} into size-bounded batches + * and writes each batch as {@code part-NNNNN.orc} under {@code outputDir}. This + * helper contains no I/O side effects on stdout/stderr or process exit; callers + * aggregate/report results themselves. 
+ */ + private static DirMergeResult mergeBatchedIntoDir(Configuration conf, + OrcFile.WriterOptions writerOptions, + List inputStatuses, + Path outputDir, + long maxSizeBytes, + boolean overwrite) throws Exception { + FileSystem outFs = outputDir.getFileSystem(conf); + prepareOutputDir(outFs, outputDir, overwrite, "multi-file merge"); + + List> batches = new ArrayList<>(); + List currentBatch = new ArrayList<>(); + long currentBatchSize = 0; + + for (LocatedFileStatus status : inputStatuses) { + long fileSize = status.getLen(); + if (!currentBatch.isEmpty() && currentBatchSize > maxSizeBytes - fileSize) { + batches.add(currentBatch); + currentBatch = new ArrayList<>(); + currentBatchSize = 0; + } + currentBatch.add(status.getPath()); + currentBatchSize += fileSize; + } + if (!currentBatch.isEmpty()) { + batches.add(currentBatch); + } + + int totalMerged = 0; + int partFilesWritten = 0; + List allUnmerged = new ArrayList<>(); + + // Advance the part index only when a file is actually written, so + // part-NNNNN.orc stays contiguous and the reported count matches disk. + for (List batch : batches) { + Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT, partFilesWritten)); + List merged = OrcFile.mergeFiles(partOutput, writerOptions.clone(), batch); + + if (merged.isEmpty()) { + // Drop any 0-stripe placeholder left behind and skip this slot. + if (outFs.exists(partOutput)) { + outFs.delete(partOutput, false); + } + allUnmerged.addAll(batch); + continue; + } + + totalMerged += merged.size(); + partFilesWritten++; + + if (merged.size() != batch.size()) { + Set mergedSet = new HashSet<>(merged); + for (Path p : batch) { + if (!mergedSet.contains(p)) { + allUnmerged.add(p); + } + } + } + } + + return new DirMergeResult(partFilesWritten, totalMerged, allUnmerged); + } + + /** + * Ensure {@code outputDir} is an empty directory ready to receive part files. + *

    + *
+   * <ul>
+   *   <li>If it does not exist, it is created.</li>
+   *   <li>If it exists and is empty, it is used as-is.</li>
+   *   <li>If it exists, is a directory and non-empty:
+   *     <ul>
+   *       <li>when {@code overwrite} is {@code true}: a warning is printed and the
+   *       directory is deleted recursively, then recreated.</li>
+   *       <li>when {@code overwrite} is {@code false}: an
+   *       {@link IllegalArgumentException} is thrown so callers do not
+   *       accidentally destroy existing data.</li>
+   *     </ul>
+   *   </li>
+   *   <li>If it exists but is not a directory, an exception is thrown.</li>
+   * </ul>
+ */ + private static void prepareOutputDir(FileSystem outFs, + Path outputDir, + boolean overwrite, + String modeLabel) throws IOException { + if (outFs.exists(outputDir)) { + if (!outFs.getFileStatus(outputDir).isDirectory()) { + throw new IllegalArgumentException( + "Output path already exists and is not a directory: " + outputDir); + } + FileStatus[] existing = outFs.listStatus(outputDir); + if (existing.length > 0) { + if (!overwrite) { + throw new IllegalArgumentException( + "Output directory is not empty for " + modeLabel + ": " + outputDir + + " (use --overwrite to delete existing contents)"); + } + System.err.println("Overwriting existing non-empty output directory: " + outputDir + + " (" + existing.length + " entries will be deleted)"); + if (!outFs.delete(outputDir, true)) { + throw new IllegalStateException( + "Failed to clean existing output directory: " + outputDir); + } + if (!outFs.mkdirs(outputDir)) { + throw new IllegalStateException( + "Failed to recreate output directory: " + outputDir); + } + } + } else if (!outFs.mkdirs(outputDir)) { + throw new IllegalStateException("Failed to create output directory: " + outputDir); + } + } + + /** + * Recursively walk {@code dir}; append every directory that directly contains at + * least one ORC file to {@code out}. A directory containing both ORC files and + * subdirectories is treated as an error. Hidden entries (names starting with + * {@code '_'} or {@code '.'}) are always skipped to match Hive/Spark conventions + * around {@code _SUCCESS}, {@code _temporary}, etc. 
+ */ + private static void collectLeafDirs(FileSystem fs, + Path dir, + boolean ignoreExtension, + List out) throws IOException { + FileStatus[] children = fs.listStatus(dir); + List orcFiles = new ArrayList<>(); + List subDirs = new ArrayList<>(); + for (FileStatus c : children) { + String name = c.getPath().getName(); + if (isHidden(name)) { + continue; + } + if (c.isDirectory()) { + subDirs.add(c); + } else if (isOrcCandidate(c, ignoreExtension)) { + orcFiles.add(c); + } + } + if (!orcFiles.isEmpty() && !subDirs.isEmpty()) { + throw new IllegalArgumentException( + "Directory contains both ORC files and subdirectories which is ambiguous" + + " under --preserveStructure: " + dir); + } + if (!orcFiles.isEmpty()) { + out.add(dir); + return; + } + subDirs.sort(Comparator.comparing(FileStatus::getPath)); + for (FileStatus sub : subDirs) { + collectLeafDirs(fs, sub.getPath(), ignoreExtension, out); + } + } + + /** + * List direct ORC-file children of a leaf directory (non-recursive), skipping + * hidden entries (names starting with {@code '_'} or {@code '.'}). + */ + private static List listLeafFiles(FileSystem fs, + Path leaf, + boolean ignoreExtension) + throws IOException { + List out = new ArrayList<>(); + for (RemoteIterator it = fs.listLocatedStatus(leaf); it.hasNext(); ) { + LocatedFileStatus s = it.next(); + String name = s.getPath().getName(); + if (isHidden(name)) { + continue; + } + if (isOrcCandidate(s, ignoreExtension)) { + out.add(s); + } + } + out.sort(Comparator.comparing(FileStatus::getPath)); + return out; + } + + private static boolean isHidden(String name) { + return !name.isEmpty() && (name.charAt(0) == '_' || name.charAt(0) == '.'); + } + + private static boolean isOrcCandidate(FileStatus s, boolean ignoreExtension) { + return s.isFile() && (ignoreExtension || s.getPath().getName().endsWith(".orc")); + } + + /** + * Return the portion of {@code child}'s path that lies beneath {@code root}. 
Both + * arguments should already be qualified (same scheme/authority). Returns an empty + * string when {@code child} equals {@code root}. + */ + private static String relativize(Path root, Path child) { + String rootStr = Path.getPathWithoutSchemeAndAuthority(root).toString(); + String childStr = Path.getPathWithoutSchemeAndAuthority(child).toString(); + if (childStr.equals(rootStr)) { + return ""; + } + String prefix = rootStr.endsWith("/") ? rootStr : rootStr + "/"; + if (!childStr.startsWith(prefix)) { + throw new IllegalStateException( + "Child path is not under root: child=" + child + ", root=" + root); + } + return childStr.substring(prefix.length()); + } + + private static final class DirMergeResult { + final int partFileCount; + final int mergedFileCount; + final List unmergedFiles; + + DirMergeResult(int partFileCount, int mergedFileCount, List unmergedFiles) { + this.partFileCount = partFileCount; + this.mergedFileCount = mergedFileCount; + this.unmergedFiles = unmergedFiles; + } + } + private static Options createOptions() { Options result = new Options(); result.addOption(Option.builder("o") .longOpt("output") - .desc("Output filename") + .desc("Output filename (single-file mode) or output directory (multi-file mode)") .hasArg() .build()); @@ -113,6 +535,40 @@ private static Options createOptions() { .desc("Ignore ORC file extension") .build()); + result.addOption(Option.builder("m") + .longOpt("maxSize") + .desc("Maximum cumulative input file size in bytes per output ORC file. Must be a " + + "positive integer; a value of 0, a negative value, or a non-numeric value " + + "causes the tool to exit with an error. When set, --output is treated as an " + + "output directory and merged files are written as part-00000.orc, " + + "part-00001.orc, etc. 
Input files are grouped at file boundaries so an " + + "individual file larger than this threshold will still be placed in its own " + + "part.") + .hasArg() + .build()); + + result.addOption(Option.builder("p") + .longOpt("preserveStructure") + .desc("Mirror the input directory structure under --output: each directory that " + + "directly contains ORC files (a 'leaf' directory, e.g. a Hive partition " + + "such as d=2025-04-25/h=01) is merged independently and written to the " + + "corresponding relative path under --output. Works with any nesting depth. " + + "Requires exactly one input directory. Hidden files/directories (names " + + "starting with '_' or '.', such as _SUCCESS or _temporary) are always " + + "skipped to match Hive/Spark conventions. A directory that contains both " + + "ORC files and subdirectories is rejected.") + .build()); + + result.addOption(Option.builder() + .longOpt("overwrite") + .desc("Applies to multi-file output modes (--maxSize and --preserveStructure). " + + "If the output directory already exists and is non-empty, delete its " + + "contents before writing merged part files. Without this flag, the tool " + + "aborts with an error when the output directory is non-empty so that " + + "existing data is not silently destroyed. 
Only a long form is provided " + + "to avoid any risk of confusing --overwrite with --output (-o).") + .build()); + result.addOption(Option.builder("h") .longOpt("help") .desc("Print help message") diff --git a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java index 3fdfeba0c4..8012408c92 100644 --- a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java +++ b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java @@ -29,7 +29,6 @@ import org.apache.orc.TestConf; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; -import org.apache.orc.tools.MergeFiles; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,6 +41,8 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestMergeFiles implements TestConf { @@ -53,6 +54,7 @@ public class TestMergeFiles implements TestConf { @BeforeEach public void openFileSystem() throws Exception { fs = FileSystem.getLocal(conf); + fs.delete(workDir, true); fs.mkdirs(workDir); fs.deleteOnExit(workDir); testFilePath = new Path(workDir + File.separator + "TestMergeFiles.testMerge.orc"); @@ -97,7 +99,6 @@ public void testMerge() throws Exception { System.out.flush(); System.setOut(origOut); String output = myOut.toString(StandardCharsets.UTF_8); - System.out.println(output); assertTrue(output.contains("Input files size: 2, Merge files size: 2")); try (Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf))) { @@ -107,4 +108,385 @@ public void testMerge() throws Exception { assertEquals(10000 + 20000, reader.getNumberOfRows()); } } + + /** + * Verifies that --maxSize splits input files into multiple part files under the output + * directory. 
Three source files are created; a tight size threshold forces them to be + * written into at least two part files. + */ + @Test + public void testMergeWithMaxSize() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + // Keep the input directory disjoint from the output directory; multi-file + // modes reject any overlap between the two. + Path inputDir = new Path(workDir, "ms-in"); + fs.mkdirs(inputDir); + Path[] sources = { + new Path(inputDir, "ms-1.orc"), + new Path(inputDir, "ms-2.orc"), + new Path(inputDir, "ms-3.orc") + }; + int[] rowCounts = {5000, 5000, 5000}; + for (int f = 0; f < sources.length; f++) { + Writer writer = OrcFile.createWriter(sources[f], + OrcFile.writerOptions(conf).setSchema(schema)); + VectorizedRowBatch batch = schema.createRowBatch(); + LongColumnVector x = (LongColumnVector) batch.cols[0]; + BytesColumnVector y = (BytesColumnVector) batch.cols[1]; + for (int r = 0; r < rowCounts[f]; ++r) { + int row = batch.size++; + x.vector[row] = r; + byte[] buffer = ("val-" + r).getBytes(); + y.setRef(row, buffer, 0, buffer.length); + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + } + if (batch.size != 0) { + writer.addRowBatch(batch); + } + writer.close(); + } + + long firstTwo = fs.getFileStatus(sources[0]).getLen() + + fs.getFileStatus(sources[1]).getLen(); + long maxSize = firstTwo + 1; + + Path outputDir = new Path(workDir, "ms-out"); + fs.delete(outputDir, true); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", outputDir.toString(), + "--maxSize", String.valueOf(maxSize)}); + System.out.flush(); + } finally { + System.setOut(origOut); + } + String output = myOut.toString(StandardCharsets.UTF_8); + + assertTrue(output.contains("Input files size: 3"), "Should report 
3 input files"); + assertTrue(output.contains("Merge files size: 3"), "All 3 files should be merged"); + assertTrue(fs.isDirectory(outputDir), "Output directory should be created"); + + // Verify that multiple part files were created and total row count is correct. + long totalRows = 0; + int partCount = 0; + for (int i = 0; ; i++) { + Path part = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, i)); + if (!fs.exists(part)) { + break; + } + partCount++; + try (Reader reader = OrcFile.createReader(part, OrcFile.readerOptions(conf))) { + totalRows += reader.getNumberOfRows(); + } + } + assertEquals(2, partCount, "Expected exactly two output part files, got: " + partCount); + assertEquals(5000 + 5000 + 5000, totalRows, "Total row count across all parts should match"); + } + + /** + * A single input file that is larger than --maxSize must still be emitted as its + * own part file (we never split an input). + */ + @Test + public void testMergeWithMaxSizeSingleGiantFile() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + Path inputDir = new Path(workDir, "giant-in"); + fs.mkdirs(inputDir); + Path giant = new Path(inputDir, "giant.orc"); + writeOrcFile(giant, schema, 20000); + long giantSize = fs.getFileStatus(giant).getLen(); + assertTrue(giantSize > 0, "Giant source file should have non-zero size"); + + Path outputDir = new Path(workDir, "giant-out"); + fs.delete(outputDir, true); + + long maxSize = Math.max(1L, giantSize / 2); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", outputDir.toString(), + "--maxSize", String.valueOf(maxSize)}); + System.out.flush(); + } finally { + System.setOut(origOut); + } + String output = myOut.toString(StandardCharsets.UTF_8); + + assertTrue(output.contains("Input files size: 1"), 
"Should report 1 input file"); + assertTrue(output.contains("Merge files size: 1"), "The giant file should be merged"); + + Path part0 = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, 0)); + Path part1 = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, 1)); + assertTrue(fs.exists(part0), "Expected part-00000.orc"); + assertFalse(fs.exists(part1), "Expected exactly one part file for a single input"); + try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) { + assertEquals(20000, reader.getNumberOfRows(), + "Single giant file must be preserved intact in its own part"); + } + } + + /** + * By default running the merge against a non-empty output directory must fail so + * that existing data is not silently destroyed. With --overwrite the directory's + * contents are deleted before new part files are written. + */ + @Test + public void testMergeWithMaxSizeOverwriteBehavior() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + Path inputDir = new Path(workDir, "ow-in"); + fs.mkdirs(inputDir); + writeOrcFile(new Path(inputDir, "a.orc"), schema, 100); + + Path outputDir = new Path(workDir, "ow-out"); + fs.delete(outputDir, true); + fs.mkdirs(outputDir); + Path stale = new Path(outputDir, "stale.txt"); + fs.create(stale).close(); + + String[] baseArgs = {inputDir.toString(), + "--output", outputDir.toString(), + "--maxSize", "1048576"}; + + // Without --overwrite: should reject non-empty output directory and leave it untouched. + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, baseArgs)); + assertTrue(ex.getMessage().contains("not empty") && + ex.getMessage().contains("--overwrite"), + "Error should mention --overwrite: " + ex.getMessage()); + assertTrue(fs.exists(stale), + "Existing content must be preserved when overwrite is refused"); + + // With --overwrite: existing content is cleared and fresh part files are written. 
+ String[] owArgs = new String[baseArgs.length + 1]; + System.arraycopy(baseArgs, 0, owArgs, 0, baseArgs.length); + owArgs[baseArgs.length] = "--overwrite"; + + PrintStream origErr = System.err; + ByteArrayOutputStream myErr = new ByteArrayOutputStream(); + System.setErr(new PrintStream(myErr, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, owArgs); + System.err.flush(); + } finally { + System.setErr(origErr); + } + String errOut = myErr.toString(StandardCharsets.UTF_8); + assertTrue(errOut.contains("Overwriting existing non-empty output directory"), + "Expected an overwrite warning on stderr, got:\n" + errOut); + assertFalse(fs.exists(stale), "Pre-existing file should be cleared by --overwrite"); + + Path part0 = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, 0)); + assertTrue(fs.exists(part0), "Expected part-00000.orc after overwrite"); + try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) { + assertEquals(100, reader.getNumberOfRows()); + } + } + + /** + * Multi-file modes must reject configurations where the output path overlaps + * with any input root (equal, under, or containing), so --overwrite cannot + * delete files that were enumerated as inputs. + */ + @Test + public void testMultiFileModeRejectsOutputInsideInput() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + Path inputDir = new Path(workDir, "overlap-in"); + fs.mkdirs(inputDir); + writeOrcFile(new Path(inputDir, "a.orc"), schema, 10); + + // Output directly under inputDir -> overlap. 
+ Path badOutput = new Path(inputDir, "out"); + + IllegalArgumentException maxSizeEx = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", badOutput.toString(), + "--maxSize", "1048576"})); + assertTrue(maxSizeEx.getMessage().contains("must not overlap"), + "Unexpected error: " + maxSizeEx.getMessage()); + + IllegalArgumentException psEx = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", badOutput.toString(), + "--preserveStructure"})); + assertTrue(psEx.getMessage().contains("must not overlap"), + "Unexpected error: " + psEx.getMessage()); + + // Input equal to output is also overlap. + IllegalArgumentException eqEx = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, new String[]{inputDir.toString(), + "--output", inputDir.toString(), + "--maxSize", "1048576"})); + assertTrue(eqEx.getMessage().contains("must not overlap"), + "Unexpected error: " + eqEx.getMessage()); + } + + /** + * Creates an ORC file at the given path with {@code rowCount} rows of the fixed + * {@code struct} schema used across these tests. + */ + private void writeOrcFile(Path path, TypeDescription schema, int rowCount) throws Exception { + Writer writer = OrcFile.createWriter(path, + OrcFile.writerOptions(conf).setSchema(schema)); + VectorizedRowBatch batch = schema.createRowBatch(); + LongColumnVector x = (LongColumnVector) batch.cols[0]; + BytesColumnVector y = (BytesColumnVector) batch.cols[1]; + for (int r = 0; r < rowCount; ++r) { + int row = batch.size++; + x.vector[row] = r; + byte[] buffer = ("val-" + r).getBytes(); + y.setRef(row, buffer, 0, buffer.length); + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + } + if (batch.size != 0) { + writer.addRowBatch(batch); + } + writer.close(); + } + + /** + * Two-level nested partitions (d=.../h=01, h=02, h=03). 
With --preserveStructure, + * each leaf partition must be merged independently and its relative path mirrored + * under the output root. Data MUST NOT be cross-mixed between leaves. + */ + @Test + public void testPreserveStructureTwoLevelPartitions() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>"); + + Path inputRoot = new Path(workDir, "ps-in"); + Path partitionBase = new Path(inputRoot, "d=2025-04-25"); + + int[] hRows = {100, 200, 300}; + for (int h = 0; h < hRows.length; h++) { + Path leaf = new Path(partitionBase, String.format("h=%02d", h + 1)); + fs.mkdirs(leaf); + writeOrcFile(new Path(leaf, "a.orc"), schema, hRows[h]); + writeOrcFile(new Path(leaf, "b.orc"), schema, hRows[h] / 2); + } + + Path outputRoot = new Path(workDir, "ps-out"); + fs.delete(outputRoot, true); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, new String[]{inputRoot.toString(), + "--output", outputRoot.toString(), + "--preserveStructure"}); + System.out.flush(); + } finally { + System.setOut(origOut); + } + String output = myOut.toString(StandardCharsets.UTF_8); + + assertTrue(output.contains("Leaves: 3"), + "Expected 3 leaves in summary, got:\n" + output); + + for (int h = 0; h < hRows.length; h++) { + Path outLeaf = new Path(outputRoot, "d=2025-04-25/" + String.format("h=%02d", h + 1)); + assertTrue(fs.isDirectory(outLeaf), + "Expected mirrored leaf directory to exist: " + outLeaf); + Path part0 = new Path(outLeaf, String.format(MergeFiles.PART_FILE_FORMAT, 0)); + assertTrue(fs.exists(part0), + "Expected part-00000.orc under leaf: " + outLeaf); + long expectedRows = hRows[h] + hRows[h] / 2; + try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) { + assertEquals(expectedRows, reader.getNumberOfRows(), + "Row count mismatch for " + outLeaf); + assertEquals(schema, 
reader.getSchema()); + } + } + } + + /** + * A directory containing BOTH ORC files and subdirectories is ambiguous under + * --preserveStructure and must be rejected before any output is written. + */ + @Test + public void testPreserveStructureRejectsMixedDirectory() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>"); + + Path inputRoot = new Path(workDir, "psmix-in"); + fs.mkdirs(inputRoot); + writeOrcFile(new Path(inputRoot, "stray.orc"), schema, 10); + Path subLeaf = new Path(inputRoot, "h=01"); + fs.mkdirs(subLeaf); + writeOrcFile(new Path(subLeaf, "a.orc"), schema, 20); + + Path outputRoot = new Path(workDir, "psmix-out"); + fs.delete(outputRoot, true); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, + () -> MergeFiles.main(conf, new String[]{inputRoot.toString(), + "--output", outputRoot.toString(), + "--preserveStructure"})); + assertTrue(ex.getMessage().contains("both ORC files and subdirectories"), + "Unexpected error message: " + ex.getMessage()); + } + + /** + * Hidden files/directories (names starting with '_' or '.') must always be + * ignored, so files like _SUCCESS or _temporary/*.orc don't pollute the merged + * output. + */ + @Test + public void testPreserveStructureSkipsHidden() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>"); + + Path inputRoot = new Path(workDir, "pshidden-in"); + Path leaf = new Path(inputRoot, "h=01"); + fs.mkdirs(leaf); + writeOrcFile(new Path(leaf, "data.orc"), schema, 30); + // _SUCCESS-style hidden file next to the real data — should be ignored by default. + writeOrcFile(new Path(leaf, "_hidden.orc"), schema, 999); + // Hidden sibling directory (e.g. _temporary) whose contents would otherwise + // appear as a second leaf — should be skipped entirely. 
+ Path hiddenDir = new Path(inputRoot, "_temporary"); + fs.mkdirs(hiddenDir); + writeOrcFile(new Path(hiddenDir, "a.orc"), schema, 7); + + Path outputRoot = new Path(workDir, "pshidden-out"); + fs.delete(outputRoot, true); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + try { + MergeFiles.main(conf, new String[]{inputRoot.toString(), + "--output", outputRoot.toString(), + "--preserveStructure"}); + System.out.flush(); + } finally { + System.setOut(origOut); + } + String output = myOut.toString(StandardCharsets.UTF_8); + + assertTrue(output.contains("Leaves: 1"), + "Expected exactly 1 leaf (hidden dir ignored), got:\n" + output); + assertFalse(fs.exists(new Path(outputRoot, "_temporary")), + "Hidden input directory should not be mirrored to output"); + + Path part0 = new Path(outputRoot, "h=01/" + String.format(MergeFiles.PART_FILE_FORMAT, 0)); + assertTrue(fs.exists(part0)); + try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) { + assertEquals(30, reader.getNumberOfRows(), + "Hidden _hidden.orc rows must not be merged in"); + } + } } diff --git a/site/_docs/java-tools.md b/site/_docs/java-tools.md index a3d546e007..4173b8c47d 100644 --- a/site/_docs/java-tools.md +++ b/site/_docs/java-tools.md @@ -17,7 +17,7 @@ The subcommands for the tools are: * data - print the data of an ORC file * json-schema (since ORC 1.4) - determine the schema of JSON documents * key (since ORC 1.5) - print information about the encryption keys - * merge (since ORC 2.0.1) - merge multiple ORC files into a single ORC file + * merge (since ORC 2.0.1) - merge multiple ORC files into one or more ORC files * meta - print the metadata of an ORC file * scan (since ORC 1.3) - scan the data for benchmarking * sizes (since ORC 1.7.2) - list size on disk of each column @@ -356,13 +356,76 @@ 
______________________________________________________________________ ## Java Merge -The merge command can merge multiple ORC files that all have the same schema into a single ORC file. +The merge command can merge multiple ORC files that all have the same schema. By default +it writes a single output file. If `--maxSize` is set, `--output` is treated as a directory +and the tool writes multiple part files (`part-00000.orc`, `part-00001.orc`, …) under it. +Input files are sorted by path for deterministic output and grouped using their on-disk +sizes so that each part’s total input size does not exceed the given threshold (a single +input file larger than the threshold is still merged into its own part). + +For the multi-file output modes (`--maxSize` and `--preserveStructure`) the tool refuses +to run when `--output` points to a non-empty existing directory, so that existing data is +not silently destroyed. Use `--overwrite` to delete the directory's current contents +before writing new part files. + +`-h,--help` + : Print help + +`-i,--ignoreExtension` + : Include files that do not end in `.orc` + +`-m,--maxSize <size>` + : Maximum size in bytes for each output part; enables multi-file output under `--output`. + Must be a positive integer — a value of `0`, a negative value, or a non-numeric value + causes the tool to exit with an error. + +`-o,--output <path>` + : Output ORC filename (single-file mode) or output directory (when `--maxSize` or + `--preserveStructure` is set) + +`--overwrite` + : If the output directory already exists and is non-empty, delete its contents + before writing merged part files. Only applies to multi-file output modes + (`--maxSize` and `--preserveStructure`). Without this flag, the tool aborts + with an error when the output directory is non-empty so that existing data + is not silently destroyed. Intentionally provided as long-form only to avoid + confusion with `-o,--output`. 
+ +`-p,--preserveStructure` + : Mirror the input directory structure under `--output`. Every directory that directly + contains ORC files (a "leaf" directory, e.g. a Hive partition path such as + `d=2025-04-25/h=01`) is merged independently and written to the corresponding relative + path under `--output`. Works with any nesting depth. Requires exactly one input + directory. Hidden files/directories (names starting with `_` or `.`, such as + `_SUCCESS` or `_temporary`) are always skipped to match Hive/Spark conventions. A + directory that contains both ORC files and subdirectories is rejected. + +Merge into one ORC file: ~~~ shell % java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/merged.orc /path/to/input_orc/ ______________________________________________________________________ ~~~ +Merge into multiple ORC files under a directory (each part bounded by size): + +~~~ shell +% java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/out_dir/ --maxSize 1073741824 /path/to/input_orc/ +______________________________________________________________________ +~~~ + +Merge a partitioned input tree while preserving the directory structure (each leaf +partition is merged independently; `--maxSize` is optional): + +~~~ shell +% java -jar orc-tools-X.Y.Z-uber.jar merge \ + --output /path/to/out_dir/ \ + --preserveStructure \ + --maxSize 2147483648 \ + /warehouse/db/table/d=2025-04-25/ +______________________________________________________________________ +~~~ + ## Java Version The version command prints the version of this ORC tool.