From f360476068df23548b9c0b0c09b6d16cdc528ba7 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Tue, 12 May 2026 14:53:04 +0800 Subject: [PATCH] feat(manifest): add ManifestFilterManager and ManifestMergeManager Implement two manifest management classes for table write operations: - ManifestFilterManager: filters manifest entries by row filter expression, file path, or partition value; supports FailMissingDeletePaths validation. Rewrites manifests that contain matching files, marking entries as DELETED; passes through manifests that cannot contain matching files unchanged. - ManifestMergeManager: merges small manifests using greedy bin-packing, grouping by partition_spec_id (manifests with different specs are never merged). Oversized manifests pass through unchanged. ADDED entries from prior manifests become EXISTING when merged (matching Java semantics). --- src/iceberg/CMakeLists.txt | 2 + .../manifest/manifest_filter_manager.cc | 248 ++++++++++++++++ .../manifest/manifest_filter_manager.h | 171 +++++++++++ .../manifest/manifest_merge_manager.cc | 160 +++++++++++ src/iceberg/manifest/manifest_merge_manager.h | 103 +++++++ src/iceberg/manifest/meson.build | 2 + src/iceberg/meson.build | 2 + src/iceberg/schema.cc | 2 + src/iceberg/schema.h | 2 + src/iceberg/test/CMakeLists.txt | 2 + .../test/manifest_filter_manager_test.cc | 270 ++++++++++++++++++ .../test/manifest_merge_manager_test.cc | 244 ++++++++++++++++ 12 files changed, 1208 insertions(+) create mode 100644 src/iceberg/manifest/manifest_filter_manager.cc create mode 100644 src/iceberg/manifest/manifest_filter_manager.h create mode 100644 src/iceberg/manifest/manifest_merge_manager.cc create mode 100644 src/iceberg/manifest/manifest_merge_manager.h create mode 100644 src/iceberg/test/manifest_filter_manager_test.cc create mode 100644 src/iceberg/test/manifest_merge_manager_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index c4e193b89..76407a94c 100644 --- 
a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -45,8 +45,10 @@ set(ICEBERG_SOURCES location_provider.cc manifest/manifest_adapter.cc manifest/manifest_entry.cc + manifest/manifest_filter_manager.cc manifest/manifest_group.cc manifest/manifest_list.cc + manifest/manifest_merge_manager.cc manifest/manifest_reader.cc manifest/manifest_util.cc manifest/manifest_writer.cc diff --git a/src/iceberg/manifest/manifest_filter_manager.cc b/src/iceberg/manifest/manifest_filter_manager.cc new file mode 100644 index 000000000..43eafb593 --- /dev/null +++ b/src/iceberg/manifest/manifest_filter_manager.cc @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <string>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+// Joins the given paths with ", " for the "Missing delete paths" error message.
+std::string JoinPaths(const std::unordered_set<std::string>& paths) {
+  std::string joined;
+  for (const auto& path : paths) {
+    if (!joined.empty()) joined += ", ";
+    joined += path;
+  }
+  return joined;
+}
+
+}  // namespace
+
+ManifestFilterManager::ManifestFilterManager(ManifestContent content,
+                                             std::shared_ptr<FileIO> file_io)
+    : manifest_content_(content), file_io_(std::move(file_io)) {}
+
+// Registers a row-filter expression; evaluators for it are built lazily per spec.
+void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
+                                              bool case_sensitive) {
+  delete_exprs_.push_back({.expr = std::move(expr), .case_sensitive = case_sensitive});
+}
+
+// Registers an exact path. The path is tracked both as a delete condition and as
+// "not yet matched" so that FailMissingDeletePaths() can report it.
+void ManifestFilterManager::DeleteFile(std::string_view path) {
+  std::string p(path);
+  delete_paths_.insert(p);
+  pending_paths_.insert(std::move(p));
+}
+
+void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues partition) {
+  drop_partitions_.add(spec_id, std::move(partition));
+}
+
+void ManifestFilterManager::FailMissingDeletePaths() {
+  fail_missing_delete_paths_ = true;
+}
+
+bool ManifestFilterManager::DeletesFiles() const {
+  return !delete_exprs_.empty() || !delete_paths_.empty() || !drop_partitions_.empty();
+}
+
+// Paths carry no partition information, so once any path is registered every
+// manifest has to be considered a candidate.
+bool ManifestFilterManager::CanContainDroppedFiles() const {
+  return !delete_paths_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& manifest) {
+  if (drop_partitions_.empty()) return false;
+  // Only manifests whose partition spec matches a registered drop can contain
+  // entries for that partition. The first element of each key is the spec ID.
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& key : drop_partitions_) {
+    if (key.first == spec_id) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& manifest,
+                                                        const TableMetadata& metadata) {
+  if (delete_exprs_.empty()) return false;
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& delete_expr : delete_exprs_) {
+    auto* evaluator_ptr =
+        GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr);
+    // Conservative on error: if the evaluator cannot be built or evaluated, keep the
+    // manifest as a candidate so the per-entry check decides.
+    if (evaluator_ptr == nullptr) return true;
+    auto result = evaluator_ptr->Evaluate(manifest);
+    if (!result.has_value() || result.value()) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& manifest,
+                                                   const TableMetadata& metadata) {
+  // A manifest with no live files cannot contain files to delete. Missing counts are
+  // treated as zero.
+  bool has_live = (manifest.added_files_count.value_or(0) > 0) ||
+                  (manifest.existing_files_count.value_or(0) > 0);
+  if (!has_live) return false;
+
+  return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, metadata) ||
+         CanContainDroppedPartitions(manifest);
+}
+
+// Returns the cached ManifestEvaluator for (spec_id, de), creating it on first use.
+// `de` must be an element of delete_exprs_: its cache slot is derived from its
+// position in that vector.
+Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = manifest_evaluator_cache_[spec_id];
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    // Expressions may have been registered since the cache was last sized.
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter(
+                                          de.expr, spec, *schema, de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+// Returns the cached InclusiveMetricsEvaluator for (spec_id, de), creating it on first
+// use. `de` must be an element of delete_exprs_ (see GetManifestEvaluator).
+Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = metrics_evaluator_cache_[spec_id];
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(
+        vec[idx], InclusiveMetricsEvaluator::Make(de.expr, *schema, de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+// Decides whether a single entry matches any registered delete condition.
+// Side effect: a path match removes the path from pending_paths_.
+Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
+                                                 const TableMetadata& metadata,
+                                                 int32_t manifest_spec_id) {
+  if (!entry.data_file) return false;
+  const DataFile& file = *entry.data_file;
+
+  // Path-based check
+  if (delete_paths_.count(file.file_path)) {
+    pending_paths_.erase(file.file_path);
+    return true;
+  }
+
+  // Partition-drop check; fall back to the manifest's spec when the file has none.
+  int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
+  if (drop_partitions_.contains(spec_id, file.partition)) {
+    return true;
+  }
+
+  // Expression-based check (inclusive metrics).
+  // NOTE(review): only an inclusive evaluator is consulted, so a file that merely
+  // *may* contain matching rows is marked DELETED; confirm whether a strict check
+  // is required before deleting whole files.
+  for (const auto& de : delete_exprs_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto* eval, GetMetricsEvaluator(metadata, spec_id, de));
+    ICEBERG_ASSIGN_OR_RAISE(auto matches, eval->Evaluate(file));
+    if (matches) return true;
+  }
+
+  return false;
+}
+
+// Applies all registered delete conditions to the manifests of `base_snapshot`.
+//
+// Returns the manifests of this manager's content type, with every manifest that
+// contains at least one matching live entry rewritten through `writer_factory`
+// (matching entries DELETED, the rest EXISTING). Fails with InvalidArgument when
+// FailMissingDeletePaths() was called and a registered path matched no entry; this
+// includes the case of a null base snapshot.
+Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
+    const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
+    const ManifestWriterFactory& writer_factory) {
+  // Each call validates on its own: every registered path starts out unmatched, so a
+  // repeated call cannot inherit matches found by an earlier one.
+  pending_paths_ = delete_paths_;
+
+  // No base snapshot → nothing to filter, and no registered path can be found.
+  if (!base_snapshot) {
+    if (fail_missing_delete_paths_ && !pending_paths_.empty()) {
+      return InvalidArgument("Missing delete paths: {}", JoinPaths(pending_paths_));
+    }
+    return std::vector<ManifestFile>{};
+  }
+
+  // Load the relevant manifests from the manifest list
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto list_reader, ManifestListReader::Make(base_snapshot->manifest_list, file_io_));
+  ICEBERG_ASSIGN_OR_RAISE(auto all_manifests, list_reader->Files());
+
+  // Keep only manifests for this manager's content type
+  std::vector<ManifestFile> manifests;
+  manifests.reserve(all_manifests.size());
+  for (const auto& m : all_manifests) {
+    if (m.content == manifest_content_) manifests.push_back(m);
+  }
+
+  // No conditions registered → return unchanged
+  if (!DeletesFiles()) return manifests;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+
+  std::vector<ManifestFile> result;
+  result.reserve(manifests.size());
+
+  for (const auto& manifest : manifests) {
+    // Fast path: metadata skip
+    if (!CanContainDeletedFiles(manifest, metadata)) {
+      result.push_back(manifest);
+      continue;
+    }
+
+    int32_t spec_id = manifest.partition_spec_id;
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+
+    // Read all live entries from the manifest
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, file_io_, schema, spec));
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
+
+    // Decide once per entry and remember the verdicts, so that nothing is evaluated
+    // twice and no writer is created unless every entry evaluated successfully.
+    std::vector<char> delete_flags;
+    bool has_deletes = false;
+    for (const auto& entry : entries) {
+      ICEBERG_ASSIGN_OR_RAISE(auto should_delete, ShouldDelete(entry, metadata, spec_id));
+      delete_flags.push_back(should_delete ? 1 : 0);
+      has_deletes = has_deletes || should_delete;
+    }
+
+    if (!has_deletes) {
+      result.push_back(manifest);
+      continue;
+    }
+
+    // Rewrite the manifest with deleted entries marked
+    ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, manifest_content_));
+    auto flag = delete_flags.cbegin();
+    for (const auto& entry : entries) {
+      if (*flag++ != 0) {
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
+      } else {
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
+      }
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto filtered_manifest, writer->ToManifestFile());
+    result.push_back(std::move(filtered_manifest));
+  }
+
+  // Validate that all registered delete paths were found
+  if (fail_missing_delete_paths_ && !pending_paths_.empty()) {
+    return InvalidArgument("Missing delete paths: {}", JoinPaths(pending_paths_));
+  }
+
+  return result;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_filter_manager.h 
b/src/iceberg/manifest/manifest_filter_manager.h
new file mode 100644
index 000000000..f8aa15d9b
--- /dev/null
+++ b/src/iceberg/manifest/manifest_filter_manager.h
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_filter_manager.h
+/// Filters an existing snapshot's manifest list, marking data files as DELETED
+/// or EXISTING based on row-filter expressions, exact path deletes, and partition drops.
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/partition_value_util.h"
+
+namespace iceberg {
+
+/// \brief Factory type for creating ManifestWriter instances during filtering/merging.
+///
+/// The factory receives the partition spec ID (to look up the spec) and the manifest
+/// content type, and returns a new ManifestWriter ready for writing. The caller is
+/// expected to capture whatever else the writer needs (metadata, FileIO, snapshot ID)
+/// inside the callable.
+using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>(
+    int32_t spec_id, ManifestContent content)>;
+
+/// \brief Filters an existing snapshot's manifest list.
+///
+/// The manager accumulates delete conditions incrementally, then applies them all
+/// at once in a single FilterManifests() call. Manifests that can be ruled out from
+/// their metadata alone are returned unchanged without being opened. All other
+/// manifests are read; those with at least one matching entry are rewritten with the
+/// matching entries marked DELETED, the rest are returned unchanged.
+///
+/// The manager is content-agnostic: pass ManifestContent::kData to process data
+/// manifests, or ManifestContent::kDeletes to process delete manifests.
+///
+/// \note This class is non-copyable and non-movable.
+class ICEBERG_EXPORT ManifestFilterManager {
+ public:
+  /// \brief Construct a manager for manifests of the given content type.
+  ///
+  /// \param content Only manifests with this content type are returned/filtered
+  /// \param file_io File IO used to read the manifest list and manifests
+  ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO> file_io);
+
+  ManifestFilterManager(const ManifestFilterManager&) = delete;
+  ManifestFilterManager& operator=(const ManifestFilterManager&) = delete;
+
+  /// \brief Register a row-filter expression.
+  ///
+  /// Any manifest entry whose column metrics indicate the file may satisfy the
+  /// expression will be marked DELETED.
+  ///
+  /// \param expr The expression to match files against
+  /// \param case_sensitive Whether field name matching is case-sensitive
+  void DeleteByRowFilter(std::shared_ptr<Expression> expr, bool case_sensitive = true);
+
+  /// \brief Register an exact file path for deletion.
+  ///
+  /// Any manifest entry whose file_path matches this path will be marked DELETED.
+  /// Registering a path means no manifest can be skipped by metadata alone.
+  ///
+  /// \param path The exact file path to delete
+  void DeleteFile(std::string_view path);
+
+  /// \brief Register a partition for dropping.
+  ///
+  /// Any manifest entry whose (spec_id, partition) pair matches will be marked DELETED.
+  ///
+  /// \param spec_id The partition spec ID
+  /// \param partition The partition values to drop
+  void DropPartition(int32_t spec_id, PartitionValues partition);
+
+  /// \brief Set a flag that makes FilterManifests() fail if any registered
+  /// delete path was not found in any manifest entry.
+  void FailMissingDeletePaths();
+
+  /// \brief Returns true if any delete condition has been registered.
+  bool DeletesFiles() const;
+
+  /// \brief Apply all accumulated delete conditions to the base snapshot's manifests.
+  ///
+  /// Manifests that cannot possibly contain deleted files are returned unchanged.
+  /// Manifests that do contain deleted files are rewritten using writer_factory.
+  /// Only manifests of the content type given at construction are returned.
+  ///
+  /// \param metadata Table metadata (provides specs and schema for evaluators)
+  /// \param base_snapshot The snapshot whose manifests to filter (may be null, in
+  ///        which case there are no manifests to filter)
+  /// \param writer_factory Factory to create new ManifestWriter instances
+  /// \return The filtered manifest list, or an error (including InvalidArgument when
+  ///         FailMissingDeletePaths() was called and a registered path was not found)
+  Result<std::vector<ManifestFile>> FilterManifests(
+      const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
+      const ManifestWriterFactory& writer_factory);
+
+ private:
+  /// \brief A registered row-filter expression and its binding mode.
+  struct DeleteExpr {
+    std::shared_ptr<Expression> expr;
+    bool case_sensitive;
+  };
+
+  /// \brief Returns true if the manifest might contain files matching any expression.
+  ///
+  /// Errors while building or running an evaluator are treated as "might contain".
+  bool CanContainExpressionDeletes(const ManifestFile& manifest,
+                                   const TableMetadata& metadata);
+
+  /// \brief Returns true if the manifest might contain files in a dropped partition.
+  ///
+  /// Checks whether the manifest's partition_spec_id matches any spec_id registered
+  /// via DropPartition(). Manifests from a different spec cannot contain the dropped
+  /// partition values. A more precise implementation could also compare
+  /// partition_summaries bounds, but that requires decoding binary bounds against the
+  /// PartitionSpec, which is not yet available at this call site.
+  bool CanContainDroppedPartitions(const ManifestFile& manifest);
+
+  /// \brief Returns true if the manifest might contain path-deleted files.
+  ///
+  /// True whenever at least one path has been registered.
+  bool CanContainDroppedFiles() const;
+
+  /// \brief Returns true if the manifest possibly contains any deleted file.
+  ///
+  /// A manifest with no added and no existing files never does.
+  bool CanContainDeletedFiles(const ManifestFile& manifest,
+                              const TableMetadata& metadata);
+
+  /// \brief Get or create a ManifestEvaluator for the given spec and expression.
+  ///
+  /// \param de Must refer to an element of delete_exprs_; its position in that
+  ///        vector selects the cache slot.
+  /// \return A non-owning pointer into the cache, or an error
+  Result<ManifestEvaluator*> GetManifestEvaluator(const TableMetadata& metadata,
+                                                  int32_t spec_id, const DeleteExpr& de);
+
+  /// \brief Get or create an InclusiveMetricsEvaluator for the given spec and expression.
+  ///
+  /// \param de Must refer to an element of delete_exprs_; its position in that
+  ///        vector selects the cache slot.
+  /// \return A non-owning pointer into the cache, or an error
+  Result<InclusiveMetricsEvaluator*> GetMetricsEvaluator(const TableMetadata& metadata,
+                                                         int32_t spec_id,
+                                                         const DeleteExpr& de);
+
+  /// \brief Check whether a single entry should be deleted.
+  ///
+  /// A path match also records the path as found.
+  Result<bool> ShouldDelete(const ManifestEntry& entry, const TableMetadata& metadata,
+                            int32_t manifest_spec_id);
+
+  const ManifestContent manifest_content_;
+  std::shared_ptr<FileIO> file_io_;
+
+  // Registered delete conditions.
+  std::vector<DeleteExpr> delete_exprs_;
+  std::unordered_set<std::string> delete_paths_;
+  // Registered paths that have not been matched by any entry yet.
+  std::unordered_set<std::string> pending_paths_;
+  PartitionSet drop_partitions_;
+  bool fail_missing_delete_paths_{false};
+
+  // Cache: (spec_id, expr_index) → ManifestEvaluator
+  std::unordered_map<int32_t, std::vector<std::unique_ptr<ManifestEvaluator>>>
+      manifest_evaluator_cache_;
+  // Cache: (spec_id, expr_index) → InclusiveMetricsEvaluator
+  std::unordered_map<int32_t, std::vector<std::unique_ptr<InclusiveMetricsEvaluator>>>
+      metrics_evaluator_cache_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_merge_manager.cc b/src/iceberg/manifest/manifest_merge_manager.cc
new file mode 100644
index 000000000..dd5c00928
--- /dev/null
+++ b/src/iceberg/manifest/manifest_merge_manager.cc
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <algorithm>
+#include <iterator>
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes,
+                                           int32_t min_count_to_merge, bool merge_enabled)
+    : target_size_bytes_(target_size_bytes),
+      min_count_to_merge_(min_count_to_merge),
+      merge_enabled_(merge_enabled) {}
+
+// Concatenates new and existing manifests and, when merging applies, replaces groups
+// of small manifests that share a partition spec with merged manifests.
+//
+// The concatenation (new first, then existing) is returned untouched when there is
+// nothing to merge, merging is disabled, or fewer than min_count_to_merge manifests
+// are present.
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
+    const std::vector<ManifestFile>& existing_manifests,
+    const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Combine new then existing (new-first ordering is preserved in output)
+  std::vector<ManifestFile> all;
+  all.reserve(new_manifests.size() + existing_manifests.size());
+  all.insert(all.end(), new_manifests.begin(), new_manifests.end());
+  all.insert(all.end(), existing_manifests.begin(), existing_manifests.end());
+
+  // The empty check is required on its own: with min_count_to_merge <= 0 the count
+  // comparison does not reject an empty list, and all[0] below would be out of range.
+  if (all.empty() || !merge_enabled_ ||
+      std::cmp_less(all.size(), min_count_to_merge_)) {
+    return all;
+  }
+
+  // The first (newest) manifest governs the per-bin minCountToMerge check.
+  const ManifestFile& first = all[0];
+
+  // Group manifests by partition_spec_id — never merge across specs
+  std::map<int32_t, std::vector<ManifestFile>> by_spec;
+  for (const auto& m : all) {
+    by_spec[m.partition_spec_id].push_back(m);
+  }
+
+  // Groups are emitted in ascending spec-ID order (std::map iteration order).
+  std::vector<ManifestFile> result;
+  result.reserve(all.size());
+  for (auto& [spec_id, group] : by_spec) {
+    ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, first, snapshot_id, metadata,
+                                                    file_io, writer_factory));
+    result.insert(result.end(), std::make_move_iterator(merged.begin()),
+                  std::make_move_iterator(merged.end()));
+  }
+  return result;
+}
+
+// Packs the manifests of one spec group into bins and merges each bin.
+//
+// Within the returned list, manifests that are at least target_size_bytes come
+// first (unchanged), followed by one entry per bin in packing order.
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
+    const std::vector<ManifestFile>& group, const ManifestFile& first,
+    int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Greedy packing in input order with a single open bin: a manifest that would push
+  // the open bin past the target closes it and starts the next one.
+  std::vector<std::vector<ManifestFile>> bins;
+  std::vector<ManifestFile> current_bin;
+  int64_t bin_size = 0;
+
+  std::vector<ManifestFile> result;
+
+  for (const auto& manifest : group) {
+    if (manifest.manifest_length >= target_size_bytes_) {
+      // Oversized manifest passes through unchanged without affecting the current bin.
+      result.push_back(manifest);
+    } else if (bin_size + manifest.manifest_length > target_size_bytes_ &&
+               !current_bin.empty()) {
+      bins.push_back(std::move(current_bin));
+      current_bin = {manifest};
+      bin_size = manifest.manifest_length;
+    } else {
+      current_bin.push_back(manifest);
+      bin_size += manifest.manifest_length;
+    }
+  }
+  if (!current_bin.empty()) {
+    bins.push_back(std::move(current_bin));
+  }
+
+  // Process each bin: if the bin contains the newest manifest and is too small,
+  // pass its contents through unchanged (mirrors Java's minCountToMerge logic).
+  for (auto& bin : bins) {
+    bool contains_first = std::ranges::find(bin, first) != bin.end();
+    if (contains_first && std::cmp_less(bin.size(), min_count_to_merge_)) {
+      result.insert(result.end(), bin.begin(), bin.end());
+    } else {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto merged, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory));
+      result.push_back(std::move(merged));
+    }
+  }
+
+  return result;
+}
+
+// Writes all entries of the manifests in `bin` into one new manifest.
+//
+// `bin` must not be empty; a bin holding a single manifest is returned as is.
+Result<ManifestFile> ManifestMergeManager::FlushBin(
+    const std::vector<ManifestFile>& bin, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // A single-manifest bin requires no merging.
+  if (bin.size() == 1) return bin[0];
+
+  // All manifests in a bin share one spec, so the first one is representative.
+  const ManifestFile& first = bin[0];
+  int32_t spec_id = first.partition_spec_id;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, first.content));
+
+  for (const auto& manifest : bin) {
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, file_io, schema, spec));
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      bool is_current =
+          entry.snapshot_id.has_value() && entry.snapshot_id.value() == snapshot_id;
+      if (entry.status == ManifestStatus::kDeleted) {
+        // Carry forward only the current snapshot's deletes; drop older tombstones.
+        if (is_current) {
+          ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
+        }
+      } else if (entry.status == ManifestStatus::kAdded && is_current) {
+        // Files added by the current snapshot retain their ADDED status.
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+      } else {
+        // Files added by prior snapshots (ADDED or EXISTING) become EXISTING.
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
+      }
+    }
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(writer->Close());
+  return writer->ToManifestFile();
+}
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_merge_manager.h b/src/iceberg/manifest/manifest_merge_manager.h
new file mode 100644
index 000000000..7b00cca72
--- /dev/null
+++ b/src/iceberg/manifest/manifest_merge_manager.h
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_merge_manager.h
+/// Merges small manifests into fewer larger ones according to table properties.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_filter_manager.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Merges small manifests into larger ones using greedy bin-packing.
+///
+/// Manifests are grouped by partition_spec_id before merging; manifests with
+/// different spec IDs are never merged together. 
Within a group, manifests are
+/// accumulated into bins until a bin would exceed target_size_bytes, at which
+/// point the bin is closed and a new one started. Manifests whose length is at
+/// least target_size_bytes pass through unchanged.
+///
+/// \note This class is non-copyable and non-movable.
+class ICEBERG_EXPORT ManifestMergeManager {
+ public:
+  /// \brief Construct a merge manager with the given configuration.
+  ///
+  /// \param target_size_bytes Target output manifest size in bytes
+  /// \param min_count_to_merge Minimum number of manifests before any merging occurs
+  /// \param merge_enabled Whether merging is enabled at all
+  ManifestMergeManager(int64_t target_size_bytes, int32_t min_count_to_merge,
+                       bool merge_enabled);
+
+  ManifestMergeManager(const ManifestMergeManager&) = delete;
+  ManifestMergeManager& operator=(const ManifestMergeManager&) = delete;
+
+  /// \brief Merge existing and new manifests according to configured thresholds.
+  ///
+  /// When merging is disabled, or fewer than min_count_to_merge manifests are given
+  /// in total, the result is simply the new manifests followed by the existing ones.
+  ///
+  /// \param existing_manifests Manifests already in the base snapshot
+  /// \param new_manifests Newly written manifests to incorporate
+  /// \param snapshot_id The ID of the snapshot being committed. Used to preserve
+  ///        ADDED/DELETED status for entries written by this snapshot and to suppress
+  ///        stale DELETED tombstones from prior snapshots.
+  /// \param metadata Table metadata (provides specs and schema for readers)
+  /// \param file_io File IO used to open existing manifests for reading
+  /// \param writer_factory Factory to create new ManifestWriter instances
+  /// \return The merged manifest list, or an error
+  Result<std::vector<ManifestFile>> MergeManifests(
+      const std::vector<ManifestFile>& existing_manifests,
+      const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+      const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+      const ManifestWriterFactory& writer_factory);
+
+ private:
+  /// \brief Merge a group of manifests sharing the same spec_id.
+  ///
+  /// \param group The manifests of one partition spec, in input order
+  /// \param first The overall first (newest) manifest across all groups, used to
+  ///        apply the minCountToMerge threshold on the bin that contains it.
+  /// \return Pass-through manifests and merged bins for this group, or an error
+  Result<std::vector<ManifestFile>> MergeGroup(
+      const std::vector<ManifestFile>& group, const ManifestFile& first,
+      int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+      const ManifestWriterFactory& writer_factory);
+
+  /// \brief Write a merged manifest from all manifests in a bin.
+  ///
+  /// A bin holding a single manifest is returned as is. Otherwise entries are
+  /// written snapshot-aware:
+  /// - ADDED from \p snapshot_id → WriteAddedEntry (preserve status)
+  /// - DELETED from \p snapshot_id → WriteDeletedEntry (preserve tombstone)
+  /// - DELETED from older snapshots → dropped (stale tombstones are not carried forward)
+  /// - All other entries → WriteExistingEntry
+  ///
+  /// \param bin A non-empty list of manifests sharing one partition spec
+  Result<ManifestFile> FlushBin(const std::vector<ManifestFile>& bin, int64_t snapshot_id,
+                                const TableMetadata& metadata,
+                                std::shared_ptr<FileIO> file_io,
+                                const ManifestWriterFactory& writer_factory);
+
+  const int64_t target_size_bytes_;
+  const int32_t min_count_to_merge_;
+  const bool merge_enabled_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build index 41e685ffc..d4b039a67 100644 --- a/src/iceberg/manifest/meson.build +++ b/src/iceberg/manifest/meson.build @@ -18,8 +18,10 @@ install_headers( [ 'manifest_entry.h', + 'manifest_filter_manager.h', 'manifest_group.h', 'manifest_list.h', + 'manifest_merge_manager.h', 'manifest_reader.h', 'manifest_writer.h', 'rolling_manifest_writer.h', diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index c2947f3fe..8136f46ac 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -67,8 +67,10 @@ iceberg_sources = files( 'location_provider.cc', 'manifest/manifest_adapter.cc', 'manifest/manifest_entry.cc', + 'manifest/manifest_filter_manager.cc', 'manifest/manifest_group.cc', 'manifest/manifest_list.cc', + 'manifest/manifest_merge_manager.cc', 'manifest/manifest_reader.cc', 'manifest/manifest_util.cc', 
'manifest/manifest_writer.cc', diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index 00905378a..5fdd47998 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -40,6 +40,8 @@ Schema::Schema(std::vector fields, int32_t schema_id) schema_id_(schema_id), cache_(std::make_unique(this)) {} +Schema::~Schema() = default; + Result> Schema::Make(std::vector fields, int32_t schema_id, std::vector identifier_field_ids) { diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h index 3c84bc2af..791ed5c8f 100644 --- a/src/iceberg/schema.h +++ b/src/iceberg/schema.h @@ -57,6 +57,8 @@ class ICEBERG_EXPORT Schema : public StructType { explicit Schema(std::vector fields, int32_t schema_id = kInitialSchemaId); + ~Schema() override; + /// \brief Create a schema. /// /// \param fields The fields that make up the schema. diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1d80b29a5..ca77e4e2f 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -180,6 +180,7 @@ if(ICEBERG_BUILD_BUNDLE) delete_file_index_test.cc manifest_group_test.cc manifest_list_versions_test.cc + manifest_merge_manager_test.cc manifest_reader_stats_test.cc manifest_reader_test.cc manifest_writer_versions_test.cc @@ -205,6 +206,7 @@ if(ICEBERG_BUILD_BUNDLE) SOURCES expire_snapshots_test.cc fast_append_test.cc + manifest_filter_manager_test.cc name_mapping_update_test.cc snapshot_manager_test.cc transaction_test.cc diff --git a/src/iceberg/test/manifest_filter_manager_test.cc b/src/iceberg/test/manifest_filter_manager_test.cc new file mode 100644 index 000000000..8831b6a07 --- /dev/null +++ b/src/iceberg/test/manifest_filter_manager_test.cc @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/manifest_filter_manager.h" + +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/expression/expression.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +class ManifestFilterManagerTest : public MinimalUpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + MinimalUpdateTestBase::SetUp(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + // Two files in different partitions (identity(x)) + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x) { + auto f = std::make_shared(); + f->content = 
DataFile::Content::kData; + f->file_path = table_location_ + path; + f->file_format = FileFormatType::kParquet; + f->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + f->file_size_in_bytes = 1024; + f->record_count = 100; + f->partition_spec_id = spec_->spec_id(); + return f; + } + + // Append files, commit, refresh, and return the current snapshot. + Result> CommitFiles( + std::vector> files) { + ICEBERG_ASSIGN_OR_RAISE(auto fa, table_->NewFastAppend()); + for (const auto& f : files) fa->AppendFile(f); + ICEBERG_RETURN_UNEXPECTED(fa->Commit()); + ICEBERG_RETURN_UNEXPECTED(table_->Refresh()); + return table_->current_snapshot(); + } + + // Returns a factory that opens a manifest writer for kTestSnapshotId at a fresh + // "flt-N.avro" path under table_location_/metadata, using the partition spec looked up + // by id and the schema returned by metadata.Schema(). + // NOTE(review): the lambda captures `metadata` by reference, so the returned factory + // must not outlive it; `mutable` looks unnecessary because no by-copy capture is modified. + ManifestWriterFactory MakeWriterFactory(const TableMetadata& metadata) { + auto fv = metadata.format_version; + return [this, fv, &metadata](int32_t spec_id, ManifestContent content) mutable + -> Result> { + ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + auto path = + std::format("{}/metadata/flt-{}.avro", table_location_, manifest_counter_++); + return ManifestWriter::MakeWriter(fv, kTestSnapshotId, path, file_io_, spec, schema, + content); + }; + } + + // Read all entries from a list of ManifestFiles.
+ Result> ReadAllEntries( + const std::vector& manifests, const TableMetadata& metadata) { + std::vector result; + for (const auto& m : manifests) { + // Each manifest is read with the spec registered under its own partition_spec_id. + ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(m.partition_spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(m, file_io_, schema, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + result.insert(result.end(), entries.begin(), entries.end()); + } + return result; + } + + // Snapshot id handed to every writer created through MakeWriterFactory. + static constexpr int64_t kTestSnapshotId = 55555L; + // Incremented for every manifest path MakeWriterFactory generates. + int manifest_counter_ = 0; + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +// Filtering with a null snapshot is expected to yield an empty manifest list. +TEST_F(ManifestFilterManagerTest, NullSnapshotReturnsEmpty) { + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, nullptr, factory)); + EXPECT_TRUE(result.empty()); +} + +// DeletesFiles() is expected to be false on a fresh manager and true after DeleteFile(). +TEST_F(ManifestFilterManagerTest, DeletesFilesReturnsCorrectState) { + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + EXPECT_FALSE(mgr.DeletesFiles()); + mgr.DeleteFile("/some/path.parquet"); + EXPECT_TRUE(mgr.DeletesFiles()); +} + +// With no delete condition registered, every manifest must come back under its original path. +TEST_F(ManifestFilterManagerTest, NoConditionsReturnsManifestsUnchanged) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + // Load original manifests so we can compare paths + ICEBERG_UNWRAP_OR_FAIL(auto list_reader, + ManifestListReader::Make(snap->manifest_list, file_io_)); + ICEBERG_UNWRAP_OR_FAIL(auto orig_manifests, list_reader->Files()); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ASSERT_EQ(result.size(), orig_manifests.size()); + for
(size_t i = 0; i < result.size(); ++i) { + // No rewrite → same manifest path + EXPECT_EQ(result[i].manifest_path, orig_manifests[i].manifest_path); + } +} + +// Deleting one file by path must mark exactly that entry DELETED and leave the other live. +TEST_F(ManifestFilterManagerTest, DeleteFileByPath) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteFile(file_a_->file_path); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + int deleted_count = 0; + int live_count = 0; + for (const auto& e : entries) { + if (e.status == ManifestStatus::kDeleted) { + ++deleted_count; + ASSERT_NE(e.data_file, nullptr); + EXPECT_EQ(e.data_file->file_path, file_a_->file_path); + } else { + ++live_count; + } + } + EXPECT_EQ(deleted_count, 1); + EXPECT_EQ(live_count, 1); +} + +// An AlwaysTrue row filter must mark every committed entry DELETED. +TEST_F(ManifestFilterManagerTest, RowFilterAlwaysTrueDeletesAll) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteByRowFilter(Expressions::AlwaysTrue()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + // Both committed files must still be listed; without this check the loop below would + // pass vacuously on an empty entry list. + ASSERT_EQ(entries.size(), 2U); + for (const auto& e : entries) { + EXPECT_EQ(e.status, ManifestStatus::kDeleted) << "Expected all entries to be DELETED"; + } +} + +// An AlwaysFalse row filter must not mark any entry DELETED. +TEST_F(ManifestFilterManagerTest, RowFilterAlwaysFalseDeletesNone) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteByRowFilter(Expressions::AlwaysFalse()); + +
ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + // Both committed files must still be listed; without this check the loop below would + // pass vacuously on an empty entry list. + ASSERT_EQ(entries.size(), 2U); + for (const auto& e : entries) { + // AlwaysFalse means nothing can match → entries remain ADDED or EXISTING + EXPECT_NE(e.status, ManifestStatus::kDeleted) << "Expected no entries to be DELETED"; + } +} + +// Dropping the partition of file_a must mark only file_a's entry DELETED. +TEST_F(ManifestFilterManagerTest, DropPartition) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + // Drop partition of file_a (partition_x = 1) + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DropPartition(spec_->spec_id(), + PartitionValues(std::vector{Literal::Long(1L)})); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + int deleted_count = 0; + for (const auto& e : entries) { + if (e.status == ManifestStatus::kDeleted) { + ++deleted_count; + ASSERT_TRUE(e.data_file != nullptr); + EXPECT_EQ(e.data_file->file_path, file_a_->file_path); + } + } + // file_b's entry must survive: two entries in total, exactly one of them DELETED. + EXPECT_EQ(entries.size(), 2U); + EXPECT_EQ(deleted_count, 1); +} + +// With FailMissingDeletePaths() set, a delete path matching no file must surface as +// kInvalidArgument. +TEST_F(ManifestFilterManagerTest, FailMissingDeletePathsReturnsError) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteFile("/does/not/exist.parquet"); + mgr.FailMissingDeletePaths(); + + auto result = mgr.FilterManifests(*metadata, snap, factory); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); +} + +// A path delete and a row filter registered on the same manager are OR-combined. +TEST_F(ManifestFilterManagerTest, MultipleConditionsOrCombined) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + // Both files should be deleted: file_a by
path, file_b by AlwaysTrue expression + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteFile(file_a_->file_path); + mgr.DeleteByRowFilter(Expressions::AlwaysTrue()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + // Both committed files must still be listed; without this check the loop below would + // pass vacuously on an empty entry list. + ASSERT_EQ(entries.size(), 2U); + for (const auto& e : entries) { + EXPECT_EQ(e.status, ManifestStatus::kDeleted); + } +} + +} // namespace iceberg diff --git a/src/iceberg/test/manifest_merge_manager_test.cc b/src/iceberg/test/manifest_merge_manager_test.cc new file mode 100644 index 000000000..ee587c5c6 --- /dev/null +++ b/src/iceberg/test/manifest_merge_manager_test.cc @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "iceberg/manifest/manifest_merge_manager.h" + +#include +#include +#include +#include + +#include +#include + +#include "iceberg/arrow/arrow_io_util.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_filter_manager.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/sort_order.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +// Format version, snapshot id and partition-spec ids used throughout this file. +constexpr int8_t kFormatVersion = 2; +constexpr int64_t kSnapshotId = 12345L; +constexpr int32_t kSpecId0 = 0; +constexpr int32_t kSpecId1 = 1; + +} // namespace + +// Fixture for ManifestMergeManager tests: a mock FileIO, a one-column schema and two +// partition specs (identity on x with id kSpecId0, bucket(8) on x with id kSpecId1). +class ManifestMergeManagerTest : public ::testing::Test { + protected: + // Calls avro::RegisterAll() once for the whole test suite. + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + file_io_ = arrow::MakeMockFileIO(); + + // Simple schema: one long column + schema_ = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "x", int64()), + }); + // NOTE(review): .value() is taken without checking the Result first. + spec0_ = PartitionSpec::Make(kSpecId0, + {PartitionField(1, 1000, "x", Transform::Identity())}) + .value(); + spec1_ = PartitionSpec::Make( + kSpecId1, {PartitionField(1, 1001, "x_bucket", Transform::Bucket(8))}) + .value(); + + // Build minimal TableMetadata with both specs + auto builder = TableMetadataBuilder::BuildFromEmpty(kFormatVersion); + builder->SetCurrentSchema(schema_, schema_->HighestFieldId().value_or(0)); + builder->SetDefaultPartitionSpec(spec0_); + builder->AddPartitionSpec(spec1_); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); + ICEBERG_UNWRAP_OR_FAIL(auto metadata,
builder->Build()); + metadata_ = std::shared_ptr(std::move(metadata)); + } + + // Write a small manifest with N data files and return the ManifestFile descriptor. + // Any spec_id other than kSpecId0 selects spec1_. The returned descriptor reports + // file_size_override as its manifest_length, not the length computed by the writer. + Result WriteManifest(int32_t spec_id, int num_files, + int64_t file_size_override = 512) { + auto path = std::format("manifest-{}.avro", manifest_counter_++); + auto spec = spec_id == kSpecId0 ? spec0_ : spec1_; + ICEBERG_ASSIGN_OR_RAISE(auto writer, ManifestWriter::MakeWriter( + kFormatVersion, kSnapshotId, path, file_io_, + spec, schema_, ManifestContent::kData)); + for (int i = 0; i < num_files; ++i) { + auto f = std::make_shared(); + f->content = DataFile::Content::kData; + // manifest_counter_ was already incremented above, so it differs per manifest. + f->file_path = std::format("data/file-{}-{}.parquet", manifest_counter_, i); + f->file_format = FileFormatType::kParquet; + // Identity spec uses LONG partition values; Bucket spec uses INT + Literal part_val = (spec_id == kSpecId0) ? Literal::Long(i) : Literal::Int(i % 8); + f->partition = PartitionValues(std::vector{part_val}); + f->file_size_in_bytes = 1024; + f->record_count = 10; + f->partition_spec_id = spec_id; + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(f)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, writer->ToManifestFile()); + // Override length so we can control bin-packing behaviour in tests + manifest_file.manifest_length = file_size_override; + return manifest_file; + } + + // Returns a factory that bumps factory_call_count_ on every call and opens a writer at + // a fresh "merged-N.avro" path; any spec_id other than kSpecId0 selects spec1_. + ManifestWriterFactory MakeWriterFactory() { + return [this](int32_t spec_id, + ManifestContent content) -> Result> { + ++factory_call_count_; + auto spec = spec_id == kSpecId0 ? spec0_ : spec1_; + auto path = std::format("merged-{}.avro", manifest_counter_++); + return ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, file_io_, spec, + schema_, content); + }; + } + + // Count total entries across all manifests. + Result CountEntries(const std::vector& manifests) { + int total = 0; + for (const auto& m : manifests) { + auto spec = m.partition_spec_id == kSpecId0 ?
spec0_ : spec1_; + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(m, file_io_, schema_, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + total += static_cast(entries.size()); + } + return total; + } + + std::shared_ptr file_io_; + std::shared_ptr schema_; + std::shared_ptr spec0_; + std::shared_ptr spec1_; + std::shared_ptr metadata_; + // Numbers every manifest path created by WriteManifest and MakeWriterFactory. + int manifest_counter_ = 0; + // Number of times the factory returned by MakeWriterFactory has been invoked. + int factory_call_count_ = 0; +}; + +// With merging disabled, all manifests are returned and the writer factory is never used. +TEST_F(ManifestMergeManagerTest, MergeDisabled) { + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1)); + ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/false); + ICEBERG_UNWRAP_OR_FAIL( + auto result, mgr.MergeManifests({m0, m1}, {m2}, kSnapshotId, *metadata_, file_io_, + MakeWriterFactory())); + // merge disabled → all 3 manifests returned, factory never called + EXPECT_EQ(result.size(), 3U); + EXPECT_EQ(factory_call_count_, 0); +} + +// Fewer manifests than min_count must leave the inputs unmerged. +TEST_F(ManifestMergeManagerTest, BelowMinCountThreshold) { + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1)); + + // min_count=3, only 2 manifests total → no merge + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL(auto result, + mgr.MergeManifests({m0}, {m1}, kSnapshotId, *metadata_, file_io_, + MakeWriterFactory())); + EXPECT_EQ(result.size(), 2U); + EXPECT_EQ(factory_call_count_, 0); +} + +// Exactly min_count small manifests must be merged into a single manifest. +TEST_F(ManifestMergeManagerTest, MergeOccursAtThreshold) { + // 3 small manifests (each 100 bytes), target=1024 → all fit in one bin + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1, /*size=*/100)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3,
/*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, mgr.MergeManifests({m0, m1}, {m2}, kSnapshotId, *metadata_, file_io_, + MakeWriterFactory())); + // All 3 merged into 1 manifest (total 3 entries) + EXPECT_EQ(result.size(), 1U); + ICEBERG_UNWRAP_OR_FAIL(auto count1, CountEntries(result)); + EXPECT_EQ(count1, 3); +} + +// A manifest larger than the target size must be returned as-is while the small ones merge. +TEST_F(ManifestMergeManagerTest, OversizedManifestPassedThrough) { + // m_large exceeds target → must not be merged; m_small fits + ICEBERG_UNWRAP_OR_FAIL(auto m_large, WriteManifest(kSpecId0, 2, /*size=*/2000)); + ICEBERG_UNWRAP_OR_FAIL(auto m_small, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m_small2, WriteManifest(kSpecId0, 1, /*size=*/100)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL(auto result, + mgr.MergeManifests({m_large, m_small}, {m_small2}, kSnapshotId, + *metadata_, file_io_, MakeWriterFactory())); + // m_large passes through; m_small and m_small2 merge into 1 + EXPECT_EQ(result.size(), 2U); + ICEBERG_UNWRAP_OR_FAIL(auto count2, CountEntries(result)); + EXPECT_EQ(count2, 4); // 2 + 1 + 1 + // The counts above would also hold if m_large had been rewritten, so check that it + // comes back under its original path. + int passthrough_count = 0; + for (const auto& m : result) { + if (m.manifest_path == m_large.manifest_path) ++passthrough_count; + } + EXPECT_EQ(passthrough_count, 1); +} + +// Manifests written with different partition specs must end up in separate outputs. +TEST_F(ManifestMergeManagerTest, CrossSpecManifestsNotMerged) { + // Manifests with different spec IDs must never be merged together + ICEBERG_UNWRAP_OR_FAIL(auto m_spec0a, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m_spec0b, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m_spec1a, WriteManifest(kSpecId1, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m_spec1b, WriteManifest(kSpecId1, 1, /*size=*/100)); + + // With 4 manifests (target large enough for each pair), we get 2 merged outputs + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, + mgr.MergeManifests({m_spec0a, m_spec1a}, {m_spec0b, m_spec1b}, kSnapshotId, + *metadata_, file_io_, MakeWriterFactory())); + EXPECT_EQ(result.size(), 2U); + // Verify spec IDs are
preserved per output manifest + // AnyOf alone would also accept two outputs that share one spec, so additionally + // require exactly one output manifest per spec. + int spec0_outputs = 0; + int spec1_outputs = 0; + for (const auto& m : result) { + EXPECT_THAT(m.partition_spec_id, ::testing::AnyOf(kSpecId0, kSpecId1)); + if (m.partition_spec_id == kSpecId0) ++spec0_outputs; + if (m.partition_spec_id == kSpecId1) ++spec1_outputs; + } + EXPECT_EQ(spec0_outputs, 1); + EXPECT_EQ(spec1_outputs, 1); +} + +// Each merged output manifest must be produced by exactly one writer-factory call. +TEST_F(ManifestMergeManagerTest, WriterFactoryCalledOncePerMergedManifest) { + // 4 small manifests in two groups → 2 merged outputs → factory called twice + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId1, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m3, WriteManifest(kSpecId1, 1, /*size=*/100)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL(auto result, + mgr.MergeManifests({m0, m2}, {m1, m3}, kSnapshotId, *metadata_, + file_io_, MakeWriterFactory())); + EXPECT_EQ(result.size(), 2U); + EXPECT_EQ(factory_call_count_, 2); +} + +} // namespace iceberg