diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index c4e193b89..79298b1a1 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -161,6 +161,7 @@ add_iceberg_lib(iceberg set(ICEBERG_DATA_SOURCES data/data_writer.cc + data/delete_filter.cc data/delete_loader.cc data/equality_delete_writer.cc data/position_delete_writer.cc diff --git a/src/iceberg/data/delete_filter.cc b/src/iceberg/data/delete_filter.cc new file mode 100644 index 000000000..876d644e5 --- /dev/null +++ b/src/iceberg/data/delete_filter.cc @@ -0,0 +1,790 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/delete_filter.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/metadata_columns.h" +#include "iceberg/result.h" +#include "iceberg/row/arrow_array_wrapper.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/table_metadata.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/struct_like_set.h" + +namespace iceberg { + +namespace { + +std::optional FindFieldIndexById(std::span fields, + int32_t field_id) { + for (size_t pos = 0; pos < fields.size(); ++pos) { + if (fields[pos].field_id() == field_id) { + return pos; + } + } + return std::nullopt; +} + +Result RequireFieldIndexById(std::span fields, + int32_t field_id, std::string_view context) { + auto pos = FindFieldIndexById(fields, field_id); + if (pos.has_value()) { + return pos.value(); + } + return InvalidSchema("Cannot find field id {} in {}", field_id, context); +} + +// Views a source row through the equality-delete key schema: fields are selected by +// field id, then exposed by position so StructLikeSet can compare only delete keys. +class ProjectedStructLike : public StructLike { + public: + struct ProjectedField; + using ProjectedSubFields = std::vector; + + struct ProjectedField { + int32_t field_id; + size_t source_field_pos; + std::shared_ptr nested_projected_fields; + }; + + explicit ProjectedStructLike(std::shared_ptr projected_fields) + : projected_fields_(std::move(projected_fields)) { + nested_projected_structs_.reserve(projected_fields_->size()); + for (const auto& projected_field : *projected_fields_) { + nested_projected_structs_.push_back( + projected_field.nested_projected_fields == nullptr + ? nullptr + : std::make_shared( + projected_field.nested_projected_fields)); + } + } + + /// \brief Build field-id based positions from the source row to the equality keys. + /// + /// \param source_type the schema of wrapped rows + /// \param target_type the key schema used by the equality-delete set + static Result> BuildProjection( + const StructType& source_type, const StructType& target_type) { + ProjectedSubFields projected_fields; + projected_fields.reserve(target_type.fields().size()); + for (const auto& target_field : target_type.fields()) { + ICEBERG_ASSIGN_OR_RAISE( + auto source_field_pos, + RequireFieldIndexById(source_type.fields(), target_field.field_id(), + "source projection")); + const auto& source_field = source_type.fields()[source_field_pos]; + + std::shared_ptr nested_projected_fields; + if (*source_field.type() != *target_field.type()) { + if (target_field.type()->type_id() == TypeId::kStruct && + source_field.type()->type_id() == TypeId::kStruct) { + ICEBERG_ASSIGN_OR_RAISE( + nested_projected_fields, + BuildProjection( + internal::checked_cast(*source_field.type()), + internal::checked_cast(*target_field.type()))); + } else if (target_field.type()->is_nested()) { + return NotSupported("Cannot project partial non-struct equality field id {}", + target_field.field_id()); + } + } + + projected_fields.push_back(ProjectedField{ + .field_id = target_field.field_id(), + .source_field_pos = source_field_pos, + .nested_projected_fields = std::move(nested_projected_fields), + }); + } + return std::make_shared(std::move(projected_fields)); + } + + void Wrap(const StructLike& row) { + owned_row_.reset(); + row_ = &row; + } + + void Wrap(std::shared_ptr row) { + owned_row_ = std::move(row); + row_ = owned_row_.get(); + } + + Result GetField(size_t pos) const override { + ICEBERG_PRECHECK(row_ != nullptr, "ProjectedStructLike has no wrapped row"); + if (pos >= projected_fields_->size()) { + return InvalidArgument("Projected field index {} out of range (size: {})", pos, + projected_fields_->size()); + } + + const auto& projected_field = (*projected_fields_)[pos]; + ICEBERG_ASSIGN_OR_RAISE(auto scalar, + row_->GetField(projected_field.source_field_pos)); + if (projected_field.nested_projected_fields == nullptr || + std::holds_alternative(scalar)) { + return scalar; + } + + if (!std::holds_alternative>(scalar)) { + return InvalidSchema("Expected struct field id {} while projecting equality row", + projected_field.field_id); + } + + auto child = std::get>(std::move(scalar)); + if (child == nullptr) { + return Scalar{std::monostate{}}; + } + + auto projected_struct = nested_projected_structs_[pos]; + projected_struct->Wrap(std::move(child)); + return Scalar{std::static_pointer_cast(std::move(projected_struct))}; + } + + size_t num_fields() const override { return projected_fields_->size(); } + + private: + std::shared_ptr owned_row_; + const StructLike* row_ = nullptr; + std::shared_ptr projected_fields_; + std::vector> nested_projected_structs_; +}; + +Status ValidateEqualityIds(const DataFile& delete_file) { + if (delete_file.equality_ids.empty()) { + return InvalidArgument("Equality delete file '{}' has no equality field ids", + delete_file.file_path); + } + return {}; +} + +SchemaField WithType(const SchemaField& field, std::shared_ptr type) { + return SchemaField{field.field_id(), std::string(field.name()), std::move(type), + field.optional(), std::string(field.doc())}; +} + +std::shared_ptr SortStructFieldsById(const std::shared_ptr& type) { + if (type->type_id() != TypeId::kStruct) { + return type; + } + + const auto& struct_type = internal::checked_cast(*type); + auto source_fields = struct_type.fields(); + std::vector> sorted_types; + sorted_types.reserve(source_fields.size()); + bool changed = false; + for (const auto& field : source_fields) { + auto sorted_type = SortStructFieldsById(field.type()); + changed = changed || sorted_type != field.type(); + sorted_types.push_back(std::move(sorted_type)); + } + + const bool needs_sort = + !std::ranges::is_sorted(source_fields, {}, &SchemaField::field_id); + if (!changed && !needs_sort) { + return type; + } + + std::vector fields; + fields.reserve(source_fields.size()); + for (size_t pos = 0; pos < source_fields.size(); ++pos) { + const auto& field = source_fields[pos]; + fields.push_back(sorted_types[pos] == field.type() + ? field + : WithType(field, std::move(sorted_types[pos]))); + } + + if (needs_sort) { + std::ranges::sort(fields, {}, &SchemaField::field_id); + } + return std::make_shared(std::move(fields)); +} + +void SortFieldsById(std::vector& fields) { + for (auto& field : fields) { + auto sorted_type = SortStructFieldsById(field.type()); + if (sorted_type != field.type()) { + field = WithType(field, std::move(sorted_type)); + } + } + std::ranges::sort(fields, {}, &SchemaField::field_id); +} + +Result> ProjectFieldsById( + const Schema& schema, const std::set& selected_ids) { + std::unordered_set unordered_ids(selected_ids.begin(), selected_ids.end()); + ICEBERG_ASSIGN_OR_RAISE(auto projected_schema, schema.Project(unordered_ids)); + std::vector fields(projected_schema->fields().begin(), + projected_schema->fields().end()); + return fields; +} + +Result> ProjectEqualityKeyFields( + const Schema& schema, const std::set& selected_ids) { + ICEBERG_ASSIGN_OR_RAISE(auto fields, ProjectFieldsById(schema, selected_ids)); + // Equality-delete keys keep the projected struct shape; fields are sorted by id + // within each struct level, not flattened by nested leaf ids. + SortFieldsById(fields); + return fields; +} + +bool ContainsFieldId(const SchemaField& field, int32_t field_id); + +bool ContainsFieldId(const Type& type, int32_t field_id) { + if (!type.is_nested()) { + return false; + } + const auto& nested = internal::checked_cast(type); + return std::ranges::any_of(nested.fields(), [field_id](const SchemaField& field) { + return ContainsFieldId(field, field_id); + }); +} + +bool ContainsFieldId(const SchemaField& field, int32_t field_id) { + return field.field_id() == field_id || ContainsFieldId(*field.type(), field_id); +} + +Status ValidateEqualityProjectionField(int32_t field_id, const SchemaField& field) { + if (field.field_id() == field_id) { + if (!field.type()->is_primitive()) { + return InvalidArgument( + "Equality delete field id {} must reference a primitive field", field_id); + } + return {}; + } + + switch (field.type()->type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast(*field.type()); + for (const auto& child : struct_type.fields()) { + if (ContainsFieldId(child, field_id)) { + return ValidateEqualityProjectionField(field_id, child); + } + } + break; + } + case TypeId::kList: + case TypeId::kMap: + if (ContainsFieldId(*field.type(), field_id)) { + return InvalidArgument("Equality delete field id {} must not be nested in {}", + field_id, ToString(field.type()->type_id())); + } + break; + default: + break; + } + + return InvalidSchema("Cannot find equality delete field id {} in projection field {}", + field_id, field.field_id()); +} + +Result> LookupFieldInSchema( + const Schema& schema, int32_t field_id) { + ICEBERG_ASSIGN_OR_RAISE(auto field, schema.FindFieldById(field_id)); + if (!field.has_value()) { + return std::nullopt; + } + if (!field->get().type()->is_primitive()) { + return InvalidArgument("Equality delete field id {} must reference a primitive field", + field_id); + } + + std::set selected_ids = {field_id}; + ICEBERG_ASSIGN_OR_RAISE(auto projected_fields, ProjectFieldsById(schema, selected_ids)); + if (projected_fields.empty()) { + return InvalidSchema("Cannot project field id {} from lookup schema", field_id); + } + if (projected_fields.size() != 1) { + return InvalidSchema("Expected one top-level projection for field id {} but got {}", + field_id, projected_fields.size()); + } + + return DeleteFilter::FieldLookupResult{ + .field = field.value().get(), + .projection_field = std::move(projected_fields[0]), + }; +} + +Result MergeField(SchemaField& existing, const SchemaField& required) { + if (existing.field_id() != required.field_id()) { + return InvalidSchema("Cannot merge field id {} with field id {}", existing.field_id(), + required.field_id()); + } + + if (*existing.type() == *required.type() || !required.type()->is_nested()) { + return false; + } + + if (existing.type()->type_id() == TypeId::kStruct && + required.type()->type_id() == TypeId::kStruct) { + const auto& existing_struct = + internal::checked_cast(*existing.type()); + std::vector fields(existing_struct.fields().begin(), + existing_struct.fields().end()); + const auto& required_struct = + internal::checked_cast(*required.type()); + + bool changed = false; + for (const auto& required_child : required_struct.fields()) { + auto existing_pos = FindFieldIndexById(fields, required_child.field_id()); + if (existing_pos.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(auto child_changed, + MergeField(fields[existing_pos.value()], required_child)); + changed = changed || child_changed; + } else { + fields.push_back(required_child); + changed = true; + } + } + + if (!changed) { + return false; + } + existing = SchemaField(existing.field_id(), std::string(existing.name()), + std::make_shared(std::move(fields)), + existing.optional(), std::string(existing.doc())); + return true; + } + + return InvalidArgument( + "Cannot merge non-struct nested field id {} into delete projection", + required.field_id()); +} + +Result MergeProjectionField(std::vector& fields, + const SchemaField& required_projection) { + auto existing_pos = FindFieldIndexById(fields, required_projection.field_id()); + if (existing_pos.has_value()) { + return MergeField(fields[existing_pos.value()], required_projection); + } + + fields.push_back(required_projection); + return true; +} + +void AddIdOnce(std::vector& ids, std::unordered_set& seen, + int32_t field_id) { + if (seen.insert(field_id).second) { + ids.push_back(field_id); + } +} + +} // namespace + +struct DeleteFilter::EqDeleteGroup { + std::unique_ptr row_projection; + std::unique_ptr delete_set; +}; + +Result DeleteFilter::MakeFieldLookup( + std::shared_ptr table_schema, + std::span> schemas) { + ICEBERG_PRECHECK(table_schema != nullptr, "Table schema must not be null"); + + std::vector> lookup_schemas; + lookup_schemas.reserve(schemas.size() + 1); + const int32_t current_schema_id = table_schema->schema_id(); + lookup_schemas.push_back(std::move(table_schema)); + + std::vector> sorted_fallback_schemas; + sorted_fallback_schemas.reserve(schemas.size()); + for (const auto& schema : schemas) { + ICEBERG_PRECHECK(schema != nullptr, "Schema must not be null"); + if (schema->schema_id() != current_schema_id) { + sorted_fallback_schemas.push_back(schema); + } + } + + // Search fallback schemas from latest to oldest so the highest schema_id wins. + std::ranges::stable_sort(sorted_fallback_schemas, [](const auto& lhs, const auto& rhs) { + return lhs->schema_id() > rhs->schema_id(); + }); + + std::unordered_set seen_schema_ids; + seen_schema_ids.insert(current_schema_id); + for (const auto& schema : sorted_fallback_schemas) { + if (seen_schema_ids.insert(schema->schema_id()).second) { + lookup_schemas.push_back(schema); + } + } + + return [lookup_schemas = std::move(lookup_schemas)]( + int32_t field_id) -> Result> { + for (const auto& schema : lookup_schemas) { + ICEBERG_ASSIGN_OR_RAISE(auto field, LookupFieldInSchema(*schema, field_id)); + if (field.has_value()) { + return field; + } + } + return std::nullopt; + }; +} + +Result DeleteFilter::MakeFieldLookup( + std::shared_ptr table_metadata) { + ICEBERG_PRECHECK(table_metadata != nullptr, "Table metadata must not be null"); + + ICEBERG_ASSIGN_OR_RAISE(auto table_schema, table_metadata->Schema()); + return MakeFieldLookup(std::move(table_schema), table_metadata->schemas); +} + +Result> DeleteFilter::Make( + std::string file_path, std::span> delete_files, + std::shared_ptr table_schema, std::shared_ptr requested_schema, + std::shared_ptr io, bool need_row_pos_col, + std::shared_ptr counter) { + ICEBERG_ASSIGN_OR_RAISE(auto field_lookup, MakeFieldLookup(table_schema)); + return Make(std::move(file_path), delete_files, std::move(requested_schema), + std::move(io), std::move(field_lookup), need_row_pos_col, + std::move(counter)); +} + +Result> DeleteFilter::Make( + std::string file_path, std::span> delete_files, + std::shared_ptr table_metadata, + std::shared_ptr requested_schema, std::shared_ptr io, + bool need_row_pos_col, std::shared_ptr counter) { + ICEBERG_PRECHECK(table_metadata != nullptr, "Table metadata must not be null"); + + ICEBERG_ASSIGN_OR_RAISE(auto field_lookup, MakeFieldLookup(std::move(table_metadata))); + return Make(std::move(file_path), delete_files, std::move(requested_schema), + std::move(io), std::move(field_lookup), need_row_pos_col, + std::move(counter)); +} + +Result> DeleteFilter::Make( + std::string file_path, std::span> delete_files, + std::shared_ptr table_schema, std::shared_ptr requested_schema, + std::shared_ptr io, std::span> schemas, + bool need_row_pos_col, std::shared_ptr counter) { + ICEBERG_ASSIGN_OR_RAISE(auto field_lookup, MakeFieldLookup(table_schema, schemas)); + return Make(std::move(file_path), delete_files, std::move(requested_schema), + std::move(io), std::move(field_lookup), need_row_pos_col, + std::move(counter)); +} + +Result> DeleteFilter::Make( + std::string file_path, std::span> delete_files, + std::shared_ptr requested_schema, std::shared_ptr io, + FieldLookup field_lookup, bool need_row_pos_col, + std::shared_ptr counter) { + ICEBERG_PRECHECK(requested_schema != nullptr, "Requested schema must not be null"); + ICEBERG_PRECHECK(field_lookup != nullptr, "Field lookup must not be null"); + ICEBERG_PRECHECK(delete_files.empty() || io != nullptr, + "FileIO must not be null when delete files are present"); + + auto filter = std::unique_ptr( + new DeleteFilter(std::move(file_path), std::move(requested_schema), std::move(io), + std::move(field_lookup), need_row_pos_col, std::move(counter))); + ICEBERG_RETURN_UNEXPECTED(filter->Init(delete_files)); + return filter; +} + +DeleteFilter::DeleteFilter(std::string file_path, + std::shared_ptr requested_schema, + std::shared_ptr io, FieldLookup field_lookup, + bool need_row_pos_col, std::shared_ptr counter) + : file_path_(std::move(file_path)), + requested_schema_(std::move(requested_schema)), + field_lookup_(std::move(field_lookup)), + need_row_pos_col_(need_row_pos_col), + counter_(std::move(counter)), + delete_loader_(std::move(io)) {} + +DeleteFilter::~DeleteFilter() = default; + +Status DeleteFilter::Init(std::span> delete_files) { + for (const auto& delete_file : delete_files) { + ICEBERG_PRECHECK(delete_file != nullptr, "Delete file must not be null"); + + switch (delete_file->content) { + case DataFile::Content::kPositionDeletes: + pos_deletes_.push_back(delete_file); + break; + case DataFile::Content::kEqualityDeletes: + ICEBERG_RETURN_UNEXPECTED(ValidateEqualityIds(*delete_file)); + eq_deletes_.push_back(delete_file); + break; + case DataFile::Content::kData: + return InvalidArgument("Expected delete file but got data file '{}'", + delete_file->file_path); + default: + return InvalidArgument("Unknown delete file content type {}", + static_cast(delete_file->content)); + } + } + + ICEBERG_ASSIGN_OR_RAISE(required_schema_, ComputeRequiredSchema()); + + // Pre-compute _pos column position for reuse + pos_field_position_ = FindFieldIndexById(required_schema_->fields(), + MetadataColumns::kFilePositionColumnId); + + return {}; +} + +Result> DeleteFilter::ComputeRequiredSchema() const { + if (!HasPositionDeletes() && !HasEqualityDeletes()) { + return requested_schema_; + } + + std::vector required_ids; + std::unordered_set seen_required_ids; + if (HasPositionDeletes() && need_row_pos_col_) { + AddIdOnce(required_ids, seen_required_ids, MetadataColumns::kFilePositionColumnId); + } + + for (const auto& delete_file : eq_deletes_) { + for (int32_t field_id : delete_file->equality_ids) { + AddIdOnce(required_ids, seen_required_ids, field_id); + } + } + + std::vector fields(requested_schema_->fields().begin(), + requested_schema_->fields().end()); + bool changed = false; + + for (int32_t field_id : required_ids) { + if (field_id == MetadataColumns::kFilePositionColumnId || + field_id == MetadataColumns::kIsDeletedColumnId) { + // These columns do not exist in the table schema and will be handled later. + continue; + } + + // Top-level primitive fields already cover equality-delete needs. Nested fields + // still need lookup so we can validate/merge the required subfield projection. + auto existing_pos = FindFieldIndexById(fields, field_id); + if (existing_pos.has_value() && !fields[existing_pos.value()].type()->is_nested()) { + continue; + } + + ICEBERG_ASSIGN_OR_RAISE(auto lookup, field_lookup_(field_id)); + if (!lookup.has_value()) { + return InvalidArgument("Cannot find equality delete field id {}", field_id); + } + ICEBERG_RETURN_UNEXPECTED( + ValidateEqualityProjectionField(field_id, lookup->projection_field)); + + ICEBERG_ASSIGN_OR_RAISE(auto merged, + MergeProjectionField(fields, lookup->projection_field)); + changed = changed || merged; + } + + const bool needs_pos = + HasPositionDeletes() && need_row_pos_col_ && + !FindFieldIndexById(fields, MetadataColumns::kFilePositionColumnId).has_value(); + if (needs_pos) { + fields.push_back(MetadataColumns::kRowPosition); + changed = true; + } + + if (!changed) { + return requested_schema_; + } + + return std::make_shared(std::move(fields)); +} + +const std::shared_ptr& DeleteFilter::RequiredSchema() const { + return required_schema_; +} + +bool DeleteFilter::HasPositionDeletes() const { return !pos_deletes_.empty(); } + +bool DeleteFilter::HasEqualityDeletes() const { return !eq_deletes_.empty(); } + +Status DeleteFilter::EnsurePositionDeletesLoaded() const { + if (!HasPositionDeletes()) { + return {}; + } + + std::lock_guard lock(pos_mutex_); + if (pos_loaded_) { + return {}; + } + + ICEBERG_ASSIGN_OR_RAISE(pos_index_, + delete_loader_.LoadPositionDeletes(pos_deletes_, file_path_)); + pos_loaded_ = true; + return {}; +} + +Status DeleteFilter::EnsureEqualityDeletesLoaded() const { + if (!HasEqualityDeletes()) { + return {}; + } + + std::lock_guard lock(eq_mutex_); + if (eq_loaded_) { + return {}; + } + + std::map, std::vector>> files_by_ids; + for (const auto& delete_file : eq_deletes_) { + // equality_ids were already validated in Init, build the grouping key directly. + std::set ids(delete_file->equality_ids.begin(), + delete_file->equality_ids.end()); + files_by_ids[std::move(ids)].push_back(delete_file); + } + + std::vector> groups; + groups.reserve(files_by_ids.size()); + + for (auto& [field_ids, files] : files_by_ids) { + ICEBERG_ASSIGN_OR_RAISE(auto fields, + ProjectEqualityKeyFields(*required_schema_, field_ids)); + auto equality_type = std::make_shared(std::move(fields)); + + ICEBERG_ASSIGN_OR_RAISE(auto row_projection, ProjectedStructLike::BuildProjection( + *required_schema_, *equality_type)); + auto project_row = std::make_unique(std::move(row_projection)); + ICEBERG_ASSIGN_OR_RAISE(auto delete_set, + delete_loader_.LoadEqualityDeletes(files, *equality_type)); + groups.push_back(std::make_unique(EqDeleteGroup{ + .row_projection = std::move(project_row), + .delete_set = std::move(delete_set), + })); + } + + eq_groups_ = std::move(groups); + eq_loaded_ = true; + return {}; +} + +const std::shared_ptr& DeleteFilter::ExpectedSchema() const { + return requested_schema_; +} + +void DeleteFilter::IncrementDeleteCount(int64_t count) { + if (counter_ != nullptr) { + counter_->Increment(count); + } +} + +Result DeleteFilter::DeletedRowPositions() const { + if (!HasPositionDeletes()) { + return nullptr; + } + ICEBERG_RETURN_UNEXPECTED(EnsurePositionDeletesLoaded()); + return &pos_index_; +} + +Result(const StructLike&)>> DeleteFilter::EqDeletedRowFilter() + const { + if (!HasEqualityDeletes()) { + // No equality deletes: every row is alive. + return [](const StructLike&) -> Result { return true; }; + } + ICEBERG_RETURN_UNEXPECTED(EnsureEqualityDeletesLoaded()); + std::lock_guard lock(eq_mutex_); + if (!eq_deleted_row_filter_cache_) { + eq_deleted_row_filter_cache_ = [this](const StructLike& row) -> Result { + for (const auto& group : eq_groups_) { + auto& projected_row = *group->row_projection; + projected_row.Wrap(row); + ICEBERG_ASSIGN_OR_RAISE(auto matched, group->delete_set->Contains(projected_row)); + if (matched) { + return false; + } + } + return true; + }; + } + return eq_deleted_row_filter_cache_; +} + +Result(const StructLike&)>> +DeleteFilter::FindEqualityDeleteRows() const { + if (!HasEqualityDeletes()) { + // No equality deletes: no row is deleted. + return [](const StructLike&) -> Result { return false; }; + } + ICEBERG_ASSIGN_OR_RAISE(auto alive_filter, EqDeletedRowFilter()); + return [alive_filter = std::move(alive_filter)](const StructLike& row) -> Result { + ICEBERG_ASSIGN_OR_RAISE(auto alive, alive_filter(row)); + return !alive; + }; +} + +Result DeleteFilter::ComputeAliveRows(const ArrowSchema& batch_schema, + const ArrowArray& batch) const { + ICEBERG_PRECHECK(batch.length >= 0, "Batch length must be non-negative"); + + ICEBERG_RETURN_UNEXPECTED(EnsurePositionDeletesLoaded()); + ICEBERG_RETURN_UNEXPECTED(EnsureEqualityDeletesLoaded()); + + AliveRowSelection result; + if (batch.length == 0) { + return result; + } + + result.indices.reserve(batch.length); + ICEBERG_ASSIGN_OR_RAISE(auto row, ArrowArrayStructLike::Make(batch_schema, batch)); + + for (int64_t i = 0; i < batch.length; ++i) { + if (i > 0) { + ICEBERG_RETURN_UNEXPECTED(row->Reset(i)); + } + + bool deleted = false; + if (pos_field_position_.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar, + row->GetField(pos_field_position_.value())); + auto* pos = std::get_if(&pos_scalar); + if (pos == nullptr) { + return InvalidArrowData("Position delete filtering requires non-null int64 _pos"); + } + deleted = pos_index_.IsDeleted(*pos); + } + + if (!deleted) { + for (const auto& eq_group : eq_groups_) { + auto& projected_row = *eq_group->row_projection; + projected_row.Wrap(*row); + ICEBERG_ASSIGN_OR_RAISE(auto matched, + eq_group->delete_set->Contains(projected_row)); + if (matched) { + deleted = true; + break; + } + } + } + + if (!deleted) { + result.indices.push_back(static_cast(i)); + } else if (counter_ != nullptr) { + counter_->Increment(); + } + } + + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/data/delete_filter.h b/src/iceberg/data/delete_filter.h new file mode 100644 index 000000000..4cb9bace6 --- /dev/null +++ b/src/iceberg/data/delete_filter.h @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/delete_filter.h +/// Delete-aware filtering for Arrow C Data batches. + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/data/delete_loader.h" +#include "iceberg/deletes/position_delete_index.h" +#include "iceberg/iceberg_data_export.h" +#include "iceberg/result.h" +#include "iceberg/schema_field.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Result of ComputeAliveRows: indices of rows not matched by any delete. +struct ICEBERG_DATA_EXPORT AliveRowSelection { + /// Zero-based row indices within the batch that are alive (not deleted). + std::vector indices; + + /// Number of alive rows (convenience accessor to avoid size_t casts). + int64_t alive_count() const { return static_cast(indices.size()); } + + bool empty() const { return indices.empty(); } +}; + +/// \brief Counts rows removed by delete filters. +class ICEBERG_DATA_EXPORT DeleteCounter { + public: + void Increment(int64_t count = 1) { + count_.fetch_add(count, std::memory_order_relaxed); + } + int64_t Get() const { return count_.load(std::memory_order_relaxed); } + + private: + std::atomic count_{0}; +}; + +/// \brief Concrete batch-oriented delete filter for merge-on-read data batches. +class ICEBERG_DATA_EXPORT DeleteFilter { + public: + /// \brief Field lookup output for current or fallback equality-delete fields. + /// + /// `field` is the exact field for validation. `projection_field` is the + /// top-level field, possibly with a pruned nested struct path, that must be + /// merged into RequiredSchema so the data reader can materialize the delete + /// column. + struct FieldLookupResult { + SchemaField field; + SchemaField projection_field; + }; + + /// \brief Lookup a field by ID, including fields from table schema fallbacks. + using FieldLookup = std::function>(int32_t)>; + + /// \brief Build a lookup from the current schema and optional table schemas. + /// + /// The current table schema is searched first. `schemas` is the table metadata + /// schema list and may contain `table_schema`; current schema duplicates are ignored + /// and fallback schemas are searched from latest schema id to oldest. + static Result MakeFieldLookup( + std::shared_ptr table_schema, + std::span> schemas = {}); + + /// \brief Build a lookup from table metadata which uses the current schema first, + /// then table metadata schemas as fallback. + static Result MakeFieldLookup( + std::shared_ptr table_metadata); + + /// \brief Create a DeleteFilter with current schema only field lookup. + /// + /// \param need_row_pos_col If true, `_pos` is added to `RequiredSchema` when + /// position deletes are present so `ComputeAliveRows` can apply them. + /// Pass false when the caller owns position filtering externally (e.g. a vectorised + /// reader that applies the position delete index directly to Arrow column buffers). + /// Note that when `need_row_pos_col` is false, `HasPositionDeletes()` may + /// return true but `ComputeAliveRows` will not apply position deletes because `_pos` + /// is absent from `RequiredSchema`. The caller is responsible for applying them. + /// \param counter Optional counter incremented for each deleted row. + static Result> Make( + std::string file_path, std::span> delete_files, + std::shared_ptr table_schema, std::shared_ptr requested_schema, + std::shared_ptr io, bool need_row_pos_col = true, + std::shared_ptr counter = nullptr); + + /// \brief Create a DeleteFilter using table metadata for schema-aware field lookup. + static Result> Make( + std::string file_path, std::span> delete_files, + std::shared_ptr table_metadata, + std::shared_ptr requested_schema, std::shared_ptr io, + bool need_row_pos_col = true, std::shared_ptr counter = nullptr); + + /// \brief Create a DeleteFilter with table schemas for dropped equality fields. + static Result> Make( + std::string file_path, std::span> delete_files, + std::shared_ptr table_schema, std::shared_ptr requested_schema, + std::shared_ptr io, std::span> schemas, + bool need_row_pos_col = true, std::shared_ptr counter = nullptr); + + /// \brief Create a DeleteFilter with a custom field lookup. + static Result> Make( + std::string file_path, std::span> delete_files, + std::shared_ptr requested_schema, std::shared_ptr io, + FieldLookup field_lookup, bool need_row_pos_col = true, + std::shared_ptr counter = nullptr); + + ~DeleteFilter(); + + /// \brief Schema required from the underlying data file reader. + const std::shared_ptr& RequiredSchema() const; + + /// \brief The original schema requested by the caller, before delete columns were + /// added. + const std::shared_ptr& ExpectedSchema() const; + + /// \brief Increment the delete counter by the given count. + /// + /// Allows callers to record deletes that occur outside `ComputeAliveRows` (e.g. when + /// applying deletes in a vectorised path). + void IncrementDeleteCount(int64_t count = 1); + + /// \brief Expose the loaded position delete index for external use. + /// + /// Triggers lazy loading of position delete files on first call. Returns nullptr + /// when there are no position deletes. Returns an error if loading fails. + /// + /// The returned pointer is valid only for the lifetime of this DeleteFilter. + Result DeletedRowPositions() const; + + /// \brief Returns a predicate that is true for rows NOT matched by any equality delete. + /// + /// The returned function is valid for the lifetime of this DeleteFilter and is cached + /// after the first call. When there are no equality deletes, returns a predicate that + /// always returns true (every row is alive). + /// + /// \note The returned predicate is NOT thread-safe: it mutates internal projection + /// state on each call. Do not invoke it concurrently from multiple threads. + Result(const StructLike&)>> EqDeletedRowFilter() const; + + /// \brief Returns a predicate that is true for rows matched by any equality delete. + /// + /// Inverse of `EqDeletedRowFilter()`. When there are no equality deletes, returns a + /// predicate that always returns false (no row is deleted). + Result(const StructLike&)>> FindEqualityDeleteRows() const; + + /// \brief Compute alive rows relative to the supplied Arrow C Data batch. + /// + /// Returns the indices (zero-based, relative to the batch) of rows not matched by + /// any delete. Deleted-row counts are forwarded to the DeleteCounter supplied at + /// construction. + Result ComputeAliveRows(const ArrowSchema& batch_schema, + const ArrowArray& batch) const; + + bool HasPositionDeletes() const; + bool HasEqualityDeletes() const; + + DeleteFilter(const DeleteFilter&) = delete; + DeleteFilter& operator=(const DeleteFilter&) = delete; + + private: + struct EqDeleteGroup; + + DeleteFilter(std::string file_path, std::shared_ptr requested_schema, + std::shared_ptr io, FieldLookup field_lookup, + bool need_row_pos_col, std::shared_ptr counter); + + Status Init(std::span> delete_files); + Result> ComputeRequiredSchema() const; + Status EnsurePositionDeletesLoaded() const; + Status EnsureEqualityDeletesLoaded() const; + + const std::string file_path_; + std::vector> pos_deletes_; + std::vector> eq_deletes_; + + std::shared_ptr requested_schema_; + std::shared_ptr required_schema_; + FieldLookup field_lookup_; + + const bool need_row_pos_col_; + // Position of `_pos` in required_schema_ when existent + std::optional pos_field_position_; + std::shared_ptr counter_; + + // TODO(gangwu): expose a factory hook (e.g. a std::function or a + // virtual newDeleteLoader()) so callers can inject a caching DeleteLoader (analogous to + // SparkDeleteFilter.CachingDeleteLoader in Java). + DeleteLoader delete_loader_; + + mutable std::mutex pos_mutex_; + mutable bool pos_loaded_ = false; + mutable PositionDeleteIndex pos_index_; + + mutable std::mutex eq_mutex_; + mutable bool eq_loaded_ = false; + mutable std::vector> eq_groups_; + mutable std::function(const StructLike&)> eq_deleted_row_filter_cache_; +}; + +} // namespace iceberg diff --git a/src/iceberg/data/meson.build b/src/iceberg/data/meson.build index 0f68deccf..f0877ec64 100644 --- a/src/iceberg/data/meson.build +++ b/src/iceberg/data/meson.build @@ -18,6 +18,7 @@ install_headers( [ 'data_writer.h', + 'delete_filter.h', 'delete_loader.h', 'equality_delete_writer.h', 'position_delete_writer.h', diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index c2947f3fe..0b5f269d5 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -142,6 +142,7 @@ iceberg_sources = files( iceberg_data_sources = files( 'data/data_writer.cc', + 'data/delete_filter.cc', 'data/delete_loader.cc', 'data/equality_delete_writer.cc', 'data/position_delete_writer.cc', diff --git a/src/iceberg/metadata_columns.h b/src/iceberg/metadata_columns.h index 61f07c488..b390a50e8 100644 --- a/src/iceberg/metadata_columns.h +++ b/src/iceberg/metadata_columns.h @@ -50,7 +50,7 @@ struct ICEBERG_EXPORT MetadataColumns { constexpr static int32_t kIsDeletedColumnId = kInt32Max - 3; inline static const SchemaField kIsDeleted = SchemaField::MakeRequired( - kIsDeletedColumnId, "_deleted", binary(), "Whether the row has been deleted"); + kIsDeletedColumnId, "_deleted", boolean(), "Whether the row has been deleted"); constexpr static int32_t kSpecIdColumnId = kInt32Max - 4; inline static const SchemaField kSpecId = diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1d80b29a5..afafc4c14 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -220,6 +220,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES data_writer_test.cc + delete_filter_test.cc delete_loader_test.cc) endif() diff --git a/src/iceberg/test/delete_filter_test.cc b/src/iceberg/test/delete_filter_test.cc new file mode 100644 index 000000000..89d1b6b85 --- /dev/null +++ b/src/iceberg/test/delete_filter_test.cc @@ -0,0 +1,1625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/delete_filter.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/data/equality_delete_writer.h" +#include "iceberg/data/position_delete_writer.h" +#include "iceberg/file_format.h" +#include "iceberg/file_reader.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/metadata_columns.h" +#include "iceberg/parquet/parquet_register.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/arrow_array_wrapper.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/type.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +struct ExportedBatch { + ArrowSchema schema{}; + ArrowArray array{}; + + ExportedBatch() = default; + ~ExportedBatch() { + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } + + ExportedBatch(const ExportedBatch&) = delete; + ExportedBatch& operator=(const ExportedBatch&) = delete; + + ExportedBatch(ExportedBatch&& other) noexcept + : schema(other.schema), array(other.array) { + other.schema.release = nullptr; + other.array.release = nullptr; + } + ExportedBatch& operator=(ExportedBatch&& other) noexcept = delete; +}; + +std::vector FieldNames(const Schema& schema) { + std::vector names; + for (const auto& field : schema.fields()) { + names.emplace_back(field.name()); + } + return names; +} + +std::vector FieldIds(const Schema& schema) { + std::vector ids; + for (const auto& field : schema.fields()) { + ids.push_back(field.field_id()); + } + return ids; +} + +std::vector StructFieldIds(const StructType& struct_type) { + std::vector ids; + for (const auto& field : struct_type.fields()) { + ids.push_back(field.field_id()); + } + return ids; +} + +void ExpectAliveRows(const AliveRowSelection& alive, + const std::vector& expected) { + ASSERT_EQ(alive.alive_count(), static_cast(expected.size())); + EXPECT_EQ(alive.indices, expected); +} + +class CapturingReader : public Reader { + public: + explicit CapturingReader(std::shared_ptr* projection) + : projection_(projection) {} + + Status Open(const ReaderOptions& options) override { + *projection_ = options.projection; + return {}; + } + + Status Close() override { return {}; } + + Result> Next() override { return std::nullopt; } + + Result Schema() override { + ArrowSchema schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(**projection_, &schema)); + return schema; + } + + Result> Metadata() override { + return std::unordered_map{}; + } + + private: + std::shared_ptr* projection_; +}; + +class ScopedReaderFactory { + public: + ScopedReaderFactory(FileFormatType format_type, ReaderFactory factory) + : format_type_(format_type), + previous_(ReaderFactoryRegistry::GetFactory(format_type)) { + ReaderFactoryRegistry::GetFactory(format_type_) = std::move(factory); + } + + ~ScopedReaderFactory() { + ReaderFactoryRegistry::GetFactory(format_type_) = std::move(previous_); + } + + private: + FileFormatType format_type_; + ReaderFactory previous_; +}; + +} // namespace + +class DeleteFilterTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { parquet::RegisterAll(); } + + void SetUp() override { + file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + table_schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "category", string())}); + partition_spec_ = PartitionSpec::Unpartitioned(); + } + + std::shared_ptr Project(std::initializer_list field_ids) const { + std::unordered_set ids(field_ids.begin(), field_ids.end()); + auto result = table_schema_->Project(ids); + EXPECT_TRUE(result.has_value()) << "Projection failed: " << result.error().message; + return std::move(result.value()); + } + + Result MakeBatch(const Schema& schema, + const std::string& json_data) const { + ArrowSchema type_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &type_schema)); + auto arrow_type_result = ::arrow::ImportType(&type_schema); + if (!arrow_type_result.ok()) { + return UnknownError(arrow_type_result.status().ToString()); + } + auto struct_type = ::arrow::struct_(arrow_type_result.MoveValueUnsafe()->fields()); + auto array_result = ::arrow::json::ArrayFromJSONString(struct_type, json_data); + if (!array_result.ok()) { + return UnknownError(array_result.status().ToString()); + } + + ExportedBatch batch; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &batch.schema)); + auto export_status = + ::arrow::ExportArray(*array_result.MoveValueUnsafe(), &batch.array); + if (!export_status.ok()) { + return UnknownError(export_status.ToString()); + } + return batch; + } + + Result> PositionDeleteFile( + const std::string& path, const std::vector& positions, + const std::string& data_path = std::string(kDataPath)) { + PositionDeleteWriterOptions options{ + .path = path, + .schema = table_schema_, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .flush_threshold = 10000, + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + + ICEBERG_ASSIGN_OR_RAISE(auto writer, PositionDeleteWriter::Make(options)); + for (int64_t pos : positions) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteDelete(data_path, pos)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata()); + return metadata.data_files[0]; + } + + Result> EqualityDeleteFile( + const std::string& path, const std::string& json_data, + std::vector equality_field_ids) { + EqualityDeleteWriterOptions options{ + .path = path, + .schema = table_schema_, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .equality_field_ids = std::move(equality_field_ids), + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + + ICEBERG_ASSIGN_OR_RAISE(auto writer, EqualityDeleteWriter::Make(options)); + ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeBatch(*table_schema_, json_data)); + ICEBERG_RETURN_UNEXPECTED(writer->Write(&batch.array)); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata()); + return metadata.data_files[0]; + } + + static constexpr std::string_view kDataPath = "data.parquet"; + + std::shared_ptr file_io_; + std::shared_ptr table_schema_; + std::shared_ptr partition_spec_; +}; + +enum class RequiredSchemaRequest { + kProjectFields, + kIdAndRowPos, +}; + +struct RequiredSchemaCase { + const char* name; + RequiredSchemaRequest request; + std::vector requested_field_ids; + std::vector> equality_ids_by_file; + bool has_pos_delete; + bool need_row_pos_col; + std::vector expected_field_ids; + std::vector expected_field_names; + bool expected_has_position_deletes; + bool expected_has_equality_deletes; +}; + +template +std::string ParamName(const testing::TestParamInfo& info) { + return info.param.name; +} + +class DeleteFilterRequiredSchemaTest + : public DeleteFilterTest, + public testing::WithParamInterface { + protected: + std::shared_ptr RequestedSchema(const RequiredSchemaCase& test_case) { + switch (test_case.request) { + case RequiredSchemaRequest::kProjectFields: { + std::unordered_set ids(test_case.requested_field_ids.begin(), + test_case.requested_field_ids.end()); + auto result = table_schema_->Project(ids); + EXPECT_TRUE(result.has_value()) + << "Projection failed: " << result.error().message; + return std::move(result.value()); + } + case RequiredSchemaRequest::kIdAndRowPos: + return std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), MetadataColumns::kRowPosition}); + } + return nullptr; + } + + std::vector> DeleteFiles( + const RequiredSchemaCase& test_case) { + std::vector> delete_files; + for (size_t index = 0; index < test_case.equality_ids_by_file.size(); ++index) { + delete_files.push_back(std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = std::format("{}-eq-{}.parquet", test_case.name, index), + .file_format = FileFormatType::kParquet, + .equality_ids = test_case.equality_ids_by_file[index], + })); + } + if (test_case.has_pos_delete) { + delete_files.push_back(std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = std::format("{}-pos.parquet", test_case.name), + .file_format = FileFormatType::kParquet, + })); + } + return delete_files; + } +}; + +TEST_P(DeleteFilterRequiredSchemaTest, ComputesRequiredSchema) { + const auto& test_case = GetParam(); + auto delete_files = DeleteFiles(test_case); + auto requested_schema = RequestedSchema(test_case); + + auto filter = + DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_, test_case.need_row_pos_col); + + ASSERT_THAT(filter, IsOk()); + EXPECT_EQ(filter.value()->HasPositionDeletes(), + test_case.expected_has_position_deletes); + EXPECT_EQ(filter.value()->HasEqualityDeletes(), + test_case.expected_has_equality_deletes); + EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), + testing::ElementsAreArray(test_case.expected_field_ids)); + EXPECT_THAT(FieldNames(*filter.value()->RequiredSchema()), + testing::ElementsAreArray(test_case.expected_field_names)); +} + +INSTANTIATE_TEST_SUITE_P( + RequiredSchema, DeleteFilterRequiredSchemaTest, + testing::Values( + RequiredSchemaCase{ + .name = "UnchangedWithoutDeletes", + .request = RequiredSchemaRequest::kProjectFields, + .requested_field_ids = {2, 1}, + .equality_ids_by_file = {}, + .has_pos_delete = false, + .need_row_pos_col = true, + .expected_field_ids = {1, 2}, + .expected_field_names = {"id", "name"}, + .expected_has_position_deletes = false, + .expected_has_equality_deletes = false, + }, + RequiredSchemaCase{ + .name = "AddsEqualityFieldsAndRowPos", + .request = RequiredSchemaRequest::kProjectFields, + .requested_field_ids = {1}, + .equality_ids_by_file = {{2}, {3, 1}}, + .has_pos_delete = true, + .need_row_pos_col = true, + .expected_field_ids = {1, 2, 3, MetadataColumns::kFilePositionColumnId}, + .expected_field_names = {"id", "name", "category", + std::string(MetadataColumns::kRowPosition.name())}, + .expected_has_position_deletes = true, + .expected_has_equality_deletes = true, + }, + RequiredSchemaCase{ + .name = "AddsEqualityFieldsInDeclaredOrder", + .request = RequiredSchemaRequest::kProjectFields, + .requested_field_ids = {1}, + .equality_ids_by_file = {{3, 2}}, + .has_pos_delete = false, + .need_row_pos_col = true, + .expected_field_ids = {1, 3, 2}, + .expected_field_names = {"id", "category", "name"}, + .expected_has_position_deletes = false, + .expected_has_equality_deletes = true, + }, + RequiredSchemaCase{ + .name = "DeduplicatesRowPos", + .request = RequiredSchemaRequest::kIdAndRowPos, + .requested_field_ids = {}, + .equality_ids_by_file = {}, + .has_pos_delete = true, + .need_row_pos_col = true, + .expected_field_ids = {1, MetadataColumns::kFilePositionColumnId}, + .expected_field_names = {"id", + std::string(MetadataColumns::kRowPosition.name())}, + .expected_has_position_deletes = true, + .expected_has_equality_deletes = false, + }, + RequiredSchemaCase{ + .name = "NeedRowPosColFalseOmitsPos", + .request = RequiredSchemaRequest::kProjectFields, + .requested_field_ids = {1}, + .equality_ids_by_file = {}, + .has_pos_delete = true, + .need_row_pos_col = false, + .expected_field_ids = {1}, + .expected_field_names = {"id"}, + .expected_has_position_deletes = true, + .expected_has_equality_deletes = false, + }, + RequiredSchemaCase{ + .name = "NeedRowPosColTrueAppendsPos", + .request = RequiredSchemaRequest::kProjectFields, + .requested_field_ids = {1}, + .equality_ids_by_file = {}, + .has_pos_delete = true, + .need_row_pos_col = true, + .expected_field_ids = {1, MetadataColumns::kFilePositionColumnId}, + .expected_field_names = {"id", + std::string(MetadataColumns::kRowPosition.name())}, + .expected_has_position_deletes = true, + .expected_has_equality_deletes = false, + }, + RequiredSchemaCase{ + .name = "AddsFieldsInJavaOrder", + .request = RequiredSchemaRequest::kProjectFields, + .requested_field_ids = {1}, + .equality_ids_by_file = {{2}, {3}}, + .has_pos_delete = true, + .need_row_pos_col = true, + .expected_field_ids = {1, 2, 3, MetadataColumns::kFilePositionColumnId}, + .expected_field_names = {"id", "name", "category", + std::string(MetadataColumns::kRowPosition.name())}, + .expected_has_position_deletes = true, + .expected_has_equality_deletes = true, + }), + ParamName); + +TEST_F(DeleteFilterTest, EqualityFieldsCanBeTopLevelPrimitiveOrNestedPrimitive) { + auto nested_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", struct_({SchemaField::MakeOptional(5, "city", string())}))}); + auto requested_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + auto eq_by_struct = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-id.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {1}, + }); + + std::vector> top_level_primitive_delete = {eq_by_struct}; + auto top_level_filter = + DeleteFilter::Make(std::string(kDataPath), top_level_primitive_delete, + nested_schema, requested_schema, file_io_); + ASSERT_THAT(top_level_filter, IsOk()); + EXPECT_THAT(FieldIds(*top_level_filter.value()->RequiredSchema()), + testing::ElementsAre(1)); + + auto eq_by_nested_field = std::make_shared(*eq_by_struct); + eq_by_nested_field->equality_ids = {5}; + std::vector> nested_delete = {eq_by_nested_field}; + + auto nested_filter = DeleteFilter::Make(std::string(kDataPath), nested_delete, + nested_schema, requested_schema, file_io_); + + ASSERT_THAT(nested_filter, IsOk()); + EXPECT_THAT(FieldIds(*nested_filter.value()->RequiredSchema()), + testing::ElementsAre(1, 4)); +} + +TEST_F(DeleteFilterTest, RequiredSchemaMergesNestedSibling) { + auto nested_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", + struct_({SchemaField::MakeOptional(5, "city", string()), + SchemaField::MakeOptional(6, "state", string())}))}); + auto requested_schema = std::shared_ptr( + nested_schema->Project(std::unordered_set{1, 5}).value()); + auto eq_by_state = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-state.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {6}, + }); + std::vector> delete_files = {eq_by_state}; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, nested_schema, + requested_schema, file_io_); + + ASSERT_THAT(filter, IsOk()); + EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), testing::ElementsAre(1, 4)); + const auto& info = filter.value()->RequiredSchema()->fields()[1]; + auto info_type = std::dynamic_pointer_cast(info.type()); + ASSERT_NE(info_type, nullptr); + EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(5, 6)); +} + +TEST_F(DeleteFilterTest, StructEqualityFieldErrors) { + auto nested_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", + struct_({SchemaField::MakeOptional(5, "city", string()), + SchemaField::MakeOptional(6, "state", string())}))}); + auto requested_schema = std::shared_ptr( + nested_schema->Project(std::unordered_set{1, 5}).value()); + auto eq_by_info = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-info.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {4}, + }); + std::vector> delete_files = {eq_by_info}; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, nested_schema, + requested_schema, file_io_); + + EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(filter, HasErrorMessage("must reference a primitive field")); +} + +enum class AliveRowsDeleteKind { + kNone, + kPosition, + kEqualityName, + kEqualityNameAndCategory, + kMixedPositionAndEqualityId, +}; + +enum class CounterMode { + kNone, + kAttachCounter, + kNullCounter, +}; + +struct AliveRowsCase { + const char* name; + AliveRowsDeleteKind delete_kind; + std::vector position_delete_positions; + std::string position_delete_data_path; + bool need_row_pos_col; + CounterMode counter_mode; + std::string batch_json; + std::vector expected_alive_rows; + std::optional expected_delete_count; +}; + +class DeleteFilterAliveRowsTest : public DeleteFilterTest, + public testing::WithParamInterface {}; + +TEST_P(DeleteFilterAliveRowsTest, ComputesAliveRows) { + const auto& test_case = GetParam(); + std::vector> delete_files; + switch (test_case.delete_kind) { + case AliveRowsDeleteKind::kNone: + break; + case AliveRowsDeleteKind::kPosition: { + auto data_path = test_case.position_delete_data_path.empty() + ? std::string(kDataPath) + : test_case.position_delete_data_path; + ICEBERG_UNWRAP_OR_FAIL( + auto pos_delete, + PositionDeleteFile(std::format("{}-pos.parquet", test_case.name), + test_case.position_delete_positions, data_path)); + delete_files.push_back(pos_delete); + break; + } + case AliveRowsDeleteKind::kEqualityName: { + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_name, + EqualityDeleteFile(std::format("{}-eq-name.parquet", test_case.name), + R"([[0, "Bob", "unused"]])", {2})); + delete_files.push_back(eq_by_name); + break; + } + case AliveRowsDeleteKind::kEqualityNameAndCategory: { + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_name, + EqualityDeleteFile(std::format("{}-eq-name.parquet", test_case.name), + R"([[0, "Bob", "unused"]])", {2})); + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_category, + EqualityDeleteFile(std::format("{}-eq-category.parquet", test_case.name), + R"([[0, "unused", "red"]])", {3})); + delete_files.push_back(eq_by_name); + delete_files.push_back(eq_by_category); + break; + } + case AliveRowsDeleteKind::kMixedPositionAndEqualityId: { + ICEBERG_UNWRAP_OR_FAIL( + auto pos_delete, + PositionDeleteFile(std::format("{}-pos.parquet", test_case.name), + test_case.position_delete_positions)); + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_id, + EqualityDeleteFile(std::format("{}-eq-id.parquet", test_case.name), + R"([[3, "unused", "unused"]])", {1})); + delete_files.push_back(pos_delete); + delete_files.push_back(eq_by_id); + break; + } + } + + auto requested_schema = Project({1}); + std::shared_ptr counter; + if (test_case.counter_mode == CounterMode::kAttachCounter) { + counter = std::make_shared(); + } + auto filter = + DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_, test_case.need_row_pos_col, counter); + ASSERT_THAT(filter, IsOk()); + ICEBERG_UNWRAP_OR_FAIL( + auto batch, MakeBatch(*filter.value()->RequiredSchema(), test_case.batch_json)); + + auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); + + ASSERT_THAT(alive, IsOk()); + ExpectAliveRows(alive.value(), test_case.expected_alive_rows); + if (test_case.expected_delete_count.has_value()) { + ASSERT_NE(counter, nullptr); + EXPECT_EQ(counter->Get(), test_case.expected_delete_count.value()); + } +} + +INSTANTIATE_TEST_SUITE_P( + AliveRows, DeleteFilterAliveRowsTest, + testing::Values( + AliveRowsCase{ + .name = "AllReturnedWithoutDeletes", + .delete_kind = AliveRowsDeleteKind::kNone, + .position_delete_positions = {}, + .position_delete_data_path = "", + .need_row_pos_col = true, + .counter_mode = CounterMode::kNone, + .batch_json = R"([[1], [2], [3]])", + .expected_alive_rows = {0, 1, 2}, + .expected_delete_count = std::nullopt, + }, + AliveRowsCase{ + .name = "PositionDeletesFilterByRowPos", + .delete_kind = AliveRowsDeleteKind::kPosition, + .position_delete_positions = {1, 3}, + .position_delete_data_path = "", + .need_row_pos_col = true, + .counter_mode = CounterMode::kNone, + .batch_json = R"([[10, 0], [20, 1], [30, 2], [40, 3]])", + .expected_alive_rows = {0, 2}, + .expected_delete_count = std::nullopt, + }, + AliveRowsCase{ + .name = "EqualityDeletesApplyOrSemantics", + .delete_kind = AliveRowsDeleteKind::kEqualityNameAndCategory, + .position_delete_positions = {}, + .position_delete_data_path = "", + .need_row_pos_col = true, + .counter_mode = CounterMode::kNone, + .batch_json = + R"([[1, "Alice", "blue"], [2, "Bob", "blue"], [3, "Carol", "red"], [4, "Dan", "green"]])", + .expected_alive_rows = {0, 3}, + .expected_delete_count = std::nullopt, + }, + AliveRowsCase{ + .name = "MixedDeletesPosBeforeEqCanDeleteAll", + .delete_kind = AliveRowsDeleteKind::kMixedPositionAndEqualityId, + .position_delete_positions = {0, 1}, + .position_delete_data_path = "", + .need_row_pos_col = true, + .counter_mode = CounterMode::kNone, + .batch_json = R"([[1, 0], [2, 1], [3, 2]])", + .expected_alive_rows = {}, + .expected_delete_count = std::nullopt, + }, + AliveRowsCase{ + .name = "EmptyBatchReturnsEmptyBitmap", + .delete_kind = AliveRowsDeleteKind::kNone, + .position_delete_positions = {}, + .position_delete_data_path = "", + .need_row_pos_col = true, + .counter_mode = CounterMode::kNone, + .batch_json = R"([])", + .expected_alive_rows = {}, + .expected_delete_count = std::nullopt, + }, + AliveRowsCase{ + .name = "NeedRowPosColFalseSkipsPosFiltering", + .delete_kind = AliveRowsDeleteKind::kPosition, + .position_delete_positions = {0, 1}, + .position_delete_data_path = "", + .need_row_pos_col = false, + .counter_mode = CounterMode::kNone, + .batch_json = R"([[10], [20], [30]])", + .expected_alive_rows = {0, 1, 2}, + .expected_delete_count = std::nullopt, + }, + AliveRowsCase{ + .name = "CounterCountsPosDeletes", + .delete_kind = AliveRowsDeleteKind::kPosition, + .position_delete_positions = {0, 2}, + .position_delete_data_path = "", + .need_row_pos_col = true, + .counter_mode = CounterMode::kAttachCounter, + .batch_json = R"([[10, 0], [20, 1], [30, 2], [40, 3]])", + .expected_alive_rows = {1, 3}, + .expected_delete_count = 2, + }, + AliveRowsCase{ + .name = "CounterCountsEqDeletes", + .delete_kind = AliveRowsDeleteKind::kEqualityName, + .position_delete_positions = {}, + .position_delete_data_path = "", + .need_row_pos_col = true, + .counter_mode = CounterMode::kAttachCounter, + .batch_json = R"([[1, "Alice"], [2, "Bob"], [3, "Bob"], [4, "Dan"]])", + .expected_alive_rows = {0, 3}, + .expected_delete_count = 2, + }, + AliveRowsCase{ + .name = "NullCounterIsNoOp", + .delete_kind = AliveRowsDeleteKind::kPosition, + .position_delete_positions = {0}, + .position_delete_data_path = "", + .need_row_pos_col = true, + .counter_mode = CounterMode::kNullCounter, + .batch_json = R"([[10, 0], [20, 1]])", + .expected_alive_rows = {1}, + .expected_delete_count = std::nullopt, + }, + AliveRowsCase{ + .name = "PosDeleteOnlyFiltersMatchingPath", + .delete_kind = AliveRowsDeleteKind::kPosition, + .position_delete_positions = {0, 1, 2}, + .position_delete_data_path = "other-data.parquet", + .need_row_pos_col = true, + .counter_mode = CounterMode::kNone, + .batch_json = R"([[10, 0], [20, 1], [30, 2]])", + .expected_alive_rows = {0, 1, 2}, + .expected_delete_count = std::nullopt, + }), + ParamName); + +TEST_F(DeleteFilterTest, TopLevelStructEqualityErrors) { + auto nested_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", struct_({SchemaField::MakeOptional(5, "city", string())}))}); + auto requested_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + + auto eq_by_info = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-info.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {4}, + }); + std::vector> delete_files = {eq_by_info}; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, nested_schema, + requested_schema, file_io_); + + EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(filter, HasErrorMessage("must reference a primitive field")); +} + +TEST_F(DeleteFilterTest, NestedStructFieldEqualityFiltersRows) { + auto nested_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", + struct_({SchemaField::MakeOptional(5, "city", string()), + SchemaField::MakeOptional(6, "state", string())}))}); + auto requested_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + + EqualityDeleteWriterOptions options{ + .path = "eq-city.parquet", + .schema = nested_schema, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .equality_field_ids = {5}, + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + ICEBERG_UNWRAP_OR_FAIL(auto writer, EqualityDeleteWriter::Make(options)); + ICEBERG_UNWRAP_OR_FAIL( + auto delete_batch, + MakeBatch(*nested_schema, + R"([{"id": 0, "info": {"city": "Paris", "state": "FR"}}])")); + ASSERT_THAT(writer->Write(&delete_batch.array), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto eq_by_city_meta, writer->Metadata()); + auto eq_by_city = eq_by_city_meta.data_files[0]; + + std::vector> delete_files = {eq_by_city}; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, nested_schema, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), testing::ElementsAre(1, 4)); + + ICEBERG_UNWRAP_OR_FAIL(auto data_batch, + MakeBatch(*filter.value()->RequiredSchema(), + R"([{"id": 1, "info": {"city": "London"}}, + {"id": 2, "info": {"city": "Paris"}}, + {"id": 3, "info": null}])")); + + auto alive = filter.value()->ComputeAliveRows(data_batch.schema, data_batch.array); + + ASSERT_THAT(alive, IsOk()); + ExpectAliveRows(alive.value(), {0, 2}); +} + +TEST_F(DeleteFilterTest, NestedEqualityWithPartialStructNoOverDelete) { + auto nested_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", + struct_({SchemaField::MakeOptional(5, "city", string()), + SchemaField::MakeOptional(6, "state", string())}))}); + auto requested_schema = std::shared_ptr( + nested_schema->Project(std::unordered_set{1, 5}).value()); + + EqualityDeleteWriterOptions options{ + .path = "eq-state-partial.parquet", + .schema = nested_schema, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .equality_field_ids = {6}, + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + ICEBERG_UNWRAP_OR_FAIL(auto writer, EqualityDeleteWriter::Make(options)); + ICEBERG_UNWRAP_OR_FAIL( + auto delete_batch, + MakeBatch(*nested_schema, + R"([{"id": 0, "info": {"city": "ignored", "state": "CA"}}])")); + ASSERT_THAT(writer->Write(&delete_batch.array), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto eq_by_state_meta, writer->Metadata()); + auto eq_by_state = eq_by_state_meta.data_files[0]; + + std::vector> delete_files = {eq_by_state}; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, nested_schema, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto data_batch, + MakeBatch(*filter.value()->RequiredSchema(), + R"([{"id": 1, "info": {"city": "SF", "state": "CA"}}, + {"id": 2, "info": {"city": "NYC", "state": "NY"}}, + {"id": 3, "info": null}])")); + + auto alive = filter.value()->ComputeAliveRows(data_batch.schema, data_batch.array); + + ASSERT_THAT(alive, IsOk()); + ExpectAliveRows(alive.value(), {1, 2}); +} + +TEST_F(DeleteFilterTest, EqualityDeleteProjectionSortsNestedFieldsById) { + auto nested_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", + struct_({SchemaField::MakeOptional(6, "state", string()), + SchemaField::MakeOptional(5, "city", string())}))}); + auto requested_schema = std::shared_ptr( + nested_schema->Project(std::unordered_set{1}).value()); + auto eq_delete = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-city-state.orc", + .file_format = FileFormatType::kOrc, + .equality_ids = {6, 5}, + }); + std::vector> delete_files = {eq_delete}; + + std::shared_ptr captured_projection; + ScopedReaderFactory reader_factory( + FileFormatType::kOrc, [&captured_projection]() -> Result> { + return std::make_unique(&captured_projection); + }); + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, nested_schema, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + ASSERT_THAT(filter.value()->EqDeletedRowFilter(), IsOk()); + + ASSERT_NE(captured_projection, nullptr); + ASSERT_EQ(captured_projection->fields().size(), 1); + auto info_type = + std::dynamic_pointer_cast(captured_projection->fields()[0].type()); + ASSERT_NE(info_type, nullptr); + EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(5, 6)); +} + +TEST_F(DeleteFilterTest, DroppedTopLevelFieldResolvedBySchemas) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}, + /*schema_id=*/2); + auto historic_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "dropped_value", string())}, + /*schema_id=*/1); + auto requested_schema = current_schema; + auto eq_by_dropped = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-dropped.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {7}, + }); + std::vector> delete_files = {eq_by_dropped}; + std::vector> schemas = {current_schema, historic_schema}; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, current_schema, + requested_schema, file_io_, schemas); + + ASSERT_THAT(filter, IsOk()); + EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), testing::ElementsAre(1, 7)); + EXPECT_THAT(FieldNames(*filter.value()->RequiredSchema()), + testing::ElementsAre("id", "dropped_value")); +} + +TEST_F(DeleteFilterTest, MakeFieldLookupSchemasMayIncludeCurrent) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "current_value", string())}, + /*schema_id=*/2); + auto old_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "old_value", int32())}, + /*schema_id=*/1); + std::vector> schemas = {old_schema, current_schema}; + + auto lookup_result = DeleteFilter::MakeFieldLookup(current_schema, schemas); + ASSERT_THAT(lookup_result, IsOk()); + + auto field_result = lookup_result.value()(7); + ASSERT_THAT(field_result, IsOk()); + ASSERT_TRUE(field_result.value().has_value()); + EXPECT_EQ(field_result.value()->field.name(), "current_value"); + EXPECT_EQ(field_result.value()->field.type()->type_id(), TypeId::kString); +} + +TEST_F(DeleteFilterTest, MakeFieldLookupCurrentSchemaWins) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "current_value", string())}, + /*schema_id=*/3); + auto fallback_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "fallback_value", int32())}, + /*schema_id=*/2); + std::vector> schemas = {fallback_schema}; + + auto lookup_result = DeleteFilter::MakeFieldLookup(current_schema, schemas); + ASSERT_THAT(lookup_result, IsOk()); + + auto field_result = lookup_result.value()(7); + ASSERT_THAT(field_result, IsOk()); + ASSERT_TRUE(field_result.value().has_value()); + EXPECT_EQ(field_result.value()->field.name(), "current_value"); + EXPECT_EQ(field_result.value()->field.type()->type_id(), TypeId::kString); +} + +TEST_F(DeleteFilterTest, MakeFieldLookupLatestFallbackSchemaWins) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}, + /*schema_id=*/3); + auto older_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "old_value", int32())}, + /*schema_id=*/1); + auto newer_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "new_value", string())}, + /*schema_id=*/2); + std::vector> schemas = {older_schema, newer_schema}; + + auto lookup_result = DeleteFilter::MakeFieldLookup(current_schema, schemas); + ASSERT_THAT(lookup_result, IsOk()); + + auto field_result = lookup_result.value()(7); + ASSERT_THAT(field_result, IsOk()); + ASSERT_TRUE(field_result.value().has_value()); + EXPECT_EQ(field_result.value()->field.name(), "new_value"); + EXPECT_EQ(field_result.value()->field.type()->type_id(), TypeId::kString); +} + +TEST_F(DeleteFilterTest, DroppedNestedFieldResolvedBySchemas) { + auto current_schema = std::make_shared( + std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", struct_({SchemaField::MakeOptional(5, "city", string())}))}, + /*schema_id=*/2); + auto historic_schema = std::make_shared( + std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", + struct_({SchemaField::MakeOptional(5, "city", string()), + SchemaField::MakeOptional(6, "state", string())}))}, + /*schema_id=*/1); + auto requested_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + auto eq_by_dropped_nested = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-dropped-state.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {6}, + }); + std::vector> delete_files = {eq_by_dropped_nested}; + std::vector> schemas = {current_schema, historic_schema}; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, current_schema, + requested_schema, file_io_, schemas); + + ASSERT_THAT(filter, IsOk()); + EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), testing::ElementsAre(1, 4)); + const auto& info = filter.value()->RequiredSchema()->fields()[1]; + auto info_type = std::dynamic_pointer_cast(info.type()); + ASSERT_NE(info_type, nullptr); + EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(6)); +} + +TEST_F(DeleteFilterTest, MetadataLookupUsesSchemas) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}, + /*schema_id=*/2); + auto historic_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "dropped_value", string())}, + /*schema_id=*/1); + auto metadata = std::make_shared(TableMetadata{ + .format_version = TableMetadata::kDefaultTableFormatVersion, + .schemas = {historic_schema, current_schema}, + .current_schema_id = current_schema->schema_id(), + }); + auto requested_schema = current_schema; + auto eq_by_dropped = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-dropped.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {7}, + }); + std::vector> delete_files = {eq_by_dropped}; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, metadata, + requested_schema, file_io_); + + ASSERT_THAT(filter, IsOk()); + EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), testing::ElementsAre(1, 7)); +} + +TEST_F(DeleteFilterTest, MetadataLookupPrefersLatestFallbackSchema) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}, + /*schema_id=*/3); + auto older_historic_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "old_name", int32())}, + /*schema_id=*/1); + auto newer_historic_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "new_name", string())}, + /*schema_id=*/2); + auto metadata = std::make_shared(TableMetadata{ + .format_version = TableMetadata::kDefaultTableFormatVersion, + .schemas = {older_historic_schema, newer_historic_schema, current_schema}, + .current_schema_id = current_schema->schema_id(), + }); + auto eq_by_dropped = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-dropped.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {7}, + }); + std::vector> delete_files = {eq_by_dropped}; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, metadata, + current_schema, file_io_); + + ASSERT_THAT(filter, IsOk()); + ASSERT_THAT(FieldIds(*filter.value()->RequiredSchema()), testing::ElementsAre(1, 7)); + const auto& dropped_field = filter.value()->RequiredSchema()->fields()[1]; + EXPECT_EQ(dropped_field.name(), "new_name"); + EXPECT_EQ(dropped_field.type()->type_id(), TypeId::kString); +} + +TEST_F(DeleteFilterTest, DroppedNestedFieldFiltersRowsWithSchemas) { + auto current_schema = std::make_shared( + std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", struct_({SchemaField::MakeOptional(5, "city", string())}))}, + /*schema_id=*/2); + auto historic_schema = std::make_shared( + std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional( + 4, "info", + struct_({SchemaField::MakeOptional(5, "city", string()), + SchemaField::MakeOptional(6, "state", string())}))}, + /*schema_id=*/1); + auto requested_schema = current_schema; + + EqualityDeleteWriterOptions options{ + .path = "eq-dropped-state-filter.parquet", + .schema = historic_schema, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .equality_field_ids = {6}, + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + ICEBERG_UNWRAP_OR_FAIL(auto writer, EqualityDeleteWriter::Make(options)); + ICEBERG_UNWRAP_OR_FAIL( + auto delete_batch, + MakeBatch(*historic_schema, + R"([{"id": 0, "info": {"city": "ignored", "state": "CA"}}])")); + ASSERT_THAT(writer->Write(&delete_batch.array), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto eq_by_state_meta, writer->Metadata()); + auto eq_by_state = eq_by_state_meta.data_files[0]; + std::vector> delete_files = {eq_by_state}; + std::vector> schemas = {current_schema, historic_schema}; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, current_schema, + requested_schema, file_io_, schemas); + ASSERT_THAT(filter, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto data_batch, + MakeBatch(*filter.value()->RequiredSchema(), + R"([{"id": 1, "info": {"city": "SF", "state": "CA"}}, + {"id": 2, "info": {"city": "NYC", "state": "NY"}}, + {"id": 3, "info": null}])")); + + auto alive = filter.value()->ComputeAliveRows(data_batch.schema, data_batch.array); + + ASSERT_THAT(alive, IsOk()); + ExpectAliveRows(alive.value(), {1, 2}); +} + +TEST_F(DeleteFilterTest, DeletionVectorErrorPropagatesFromCompute) { + auto dv_file = std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = "dv.puffin", + .file_format = FileFormatType::kPuffin, + }); + std::vector> delete_files = {dv_file}; + auto requested_schema = Project({1}); + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_); + + ASSERT_THAT(filter, IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto batch, + MakeBatch(*filter.value()->RequiredSchema(), R"([[1, 0]])")); + auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); + ASSERT_THAT(alive, IsError(ErrorKind::kNotSupported)); +} + +TEST_F(DeleteFilterTest, EmptyBatchPropagatesDeleteLoadErrors) { + auto dv_file = std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = "dv-empty.puffin", + .file_format = FileFormatType::kPuffin, + }); + std::vector> delete_files = {dv_file}; + auto requested_schema = Project({1}); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto batch, + MakeBatch(*filter.value()->RequiredSchema(), R"([])")); + + auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); + + ASSERT_THAT(alive, IsError(ErrorKind::kNotSupported)); +} + +TEST_F(DeleteFilterTest, CounterAccumulatesAcrossBatches) { + ICEBERG_UNWRAP_OR_FAIL(auto pos_delete, + PositionDeleteFile("pos-multi-batch.parquet", {1})); + std::vector> delete_files = {pos_delete}; + auto requested_schema = Project({1}); + auto counter = std::make_shared(); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_, + /*need_row_pos_col=*/true, counter); + ASSERT_THAT(filter, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto batch1, MakeBatch(*filter.value()->RequiredSchema(), + R"([[10, 0], [20, 1], [30, 2]])")); + ICEBERG_UNWRAP_OR_FAIL( + auto batch2, MakeBatch(*filter.value()->RequiredSchema(), R"([[40, 3], [50, 4]])")); + + ASSERT_THAT(filter.value()->ComputeAliveRows(batch1.schema, batch1.array), IsOk()); + ASSERT_THAT(filter.value()->ComputeAliveRows(batch2.schema, batch2.array), IsOk()); + EXPECT_EQ(counter->Get(), 1); +} + +enum class MakeErrorDeleteKind { + kNullDeleteFile, + kDataFile, + kEqualityDeleteWithEmptyIds, + kUnknownEqualityFieldId, +}; + +struct MakeErrorCase { + const char* name; + MakeErrorDeleteKind delete_kind; +}; + +class DeleteFilterMakeErrorTest : public DeleteFilterTest, + public testing::WithParamInterface {}; + +TEST_P(DeleteFilterMakeErrorTest, InvalidDeleteFilesError) { + const auto& test_case = GetParam(); + std::vector> delete_files; + switch (test_case.delete_kind) { + case MakeErrorDeleteKind::kNullDeleteFile: + delete_files.push_back(nullptr); + break; + case MakeErrorDeleteKind::kDataFile: + delete_files.push_back(std::make_shared(DataFile{ + .content = DataFile::Content::kData, + .file_path = "data.parquet", + .file_format = FileFormatType::kParquet, + })); + break; + case MakeErrorDeleteKind::kEqualityDeleteWithEmptyIds: + delete_files.push_back(std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-no-ids.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {}, + })); + break; + case MakeErrorDeleteKind::kUnknownEqualityFieldId: + delete_files.push_back(std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-unknown.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {999}, + })); + break; + } + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + table_schema_, file_io_); + + EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument)); +} + +INSTANTIATE_TEST_SUITE_P( + MakeErrors, DeleteFilterMakeErrorTest, + testing::Values( + MakeErrorCase{ + .name = "NullDeleteFile", + .delete_kind = MakeErrorDeleteKind::kNullDeleteFile, + }, + MakeErrorCase{ + .name = "DataFileAsDeleteFile", + .delete_kind = MakeErrorDeleteKind::kDataFile, + }, + MakeErrorCase{ + .name = "EqualityDeleteWithEmptyIds", + .delete_kind = MakeErrorDeleteKind::kEqualityDeleteWithEmptyIds, + }, + MakeErrorCase{ + .name = "UnknownEqualityFieldId", + .delete_kind = MakeErrorDeleteKind::kUnknownEqualityFieldId, + }), + ParamName); + +TEST_F(DeleteFilterTest, EqualityFieldNestedInListOrMapErrors) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(4, "tags", + list(SchemaField::MakeRequired(5, "element", string()))), + SchemaField::MakeOptional(6, "attrs", + map(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", string())))}); + auto requested_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + + for (const auto& [field_id, nested_type] : + {std::pair{5, std::string_view("list")}, std::pair{8, std::string_view("map")}}) { + auto eq_delete = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-nested-container.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {field_id}, + }); + std::vector> delete_files = {eq_delete}; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, schema, + requested_schema, file_io_); + + EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(filter, + HasErrorMessage(std::format("must not be nested in {}", nested_type))); + } +} + +TEST_F(DeleteFilterTest, NullPosInBatchErrors) { + ICEBERG_UNWRAP_OR_FAIL(auto pos_delete, + PositionDeleteFile("pos-null-pos.parquet", {0})); + std::vector> delete_files = {pos_delete}; + auto requested_schema = Project({1}); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto batch, MakeBatch(*filter.value()->RequiredSchema(), + R"([[10, null], [20, null]])")); + + auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); + + EXPECT_THAT(alive, IsError(ErrorKind::kInvalidArrowData)); +} + +TEST_F(DeleteFilterTest, ExpectedSchemaIsRequestedSchema) { + auto requested_schema = Project({1}); + auto eq_by_name = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-name.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {2}, + }); + std::vector> delete_files = {eq_by_name}; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + EXPECT_EQ(filter.value()->ExpectedSchema(), requested_schema); + EXPECT_NE(filter.value()->RequiredSchema(), requested_schema); +} + +TEST_F(DeleteFilterTest, IncrementDeleteCountForwardsToCounter) { + std::vector> delete_files; + auto counter = std::make_shared(); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + table_schema_, file_io_, + /*need_row_pos_col=*/true, counter); + ASSERT_THAT(filter, IsOk()); + + filter.value()->IncrementDeleteCount(3); + filter.value()->IncrementDeleteCount(); + + EXPECT_EQ(counter->Get(), 4); +} + +TEST_F(DeleteFilterTest, DeletedRowPositionsNullWithNoPosDeletes) { + std::vector> delete_files; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + table_schema_, file_io_); + ASSERT_THAT(filter, IsOk()); + + auto index = filter.value()->DeletedRowPositions(); + + ASSERT_THAT(index, IsOk()); + EXPECT_EQ(index.value(), nullptr); +} + +TEST_F(DeleteFilterTest, DeletedRowPositionsLazyLoads) { + ICEBERG_UNWRAP_OR_FAIL(auto pos_delete, + PositionDeleteFile("pos-index.parquet", {1, 3})); + std::vector> delete_files = {pos_delete}; + auto requested_schema = Project({1}); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + + auto index = filter.value()->DeletedRowPositions(); + + ASSERT_THAT(index, IsOk()); + ASSERT_NE(index.value(), nullptr); + EXPECT_TRUE(index.value()->IsDeleted(1)); + EXPECT_TRUE(index.value()->IsDeleted(3)); + EXPECT_FALSE(index.value()->IsDeleted(0)); + EXPECT_FALSE(index.value()->IsDeleted(2)); +} + +TEST_F(DeleteFilterTest, EqDeletedRowFilterTrueWithNoEqDeletes) { + std::vector> delete_files; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + table_schema_, file_io_); + ASSERT_THAT(filter, IsOk()); + + auto predicate_result = filter.value()->EqDeletedRowFilter(); + + ASSERT_THAT(predicate_result, IsOk()); + ASSERT_TRUE(static_cast(predicate_result.value())); + + ICEBERG_UNWRAP_OR_FAIL(auto batch, MakeBatch(*filter.value()->RequiredSchema(), + R"([[1, "Alice", "blue"]])")); + ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, batch.array)); + ICEBERG_UNWRAP_OR_FAIL(auto alive, predicate_result.value()(*row)); + EXPECT_TRUE(alive); +} + +TEST_F(DeleteFilterTest, EqDeletedRowFilterReturnsTrueForAliveRows) { + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_name, + EqualityDeleteFile("eq-filter.parquet", R"([[0, "Bob", "unused"]])", {2})); + std::vector> delete_files = {eq_by_name}; + auto requested_schema = Project({1}); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + + auto predicate_result = filter.value()->EqDeletedRowFilter(); + ASSERT_THAT(predicate_result, IsOk()); + auto& predicate = predicate_result.value(); + ASSERT_TRUE(static_cast(predicate)); + + ICEBERG_UNWRAP_OR_FAIL(auto batch, + MakeBatch(*filter.value()->RequiredSchema(), + R"([[1, "Alice"], [2, "Bob"], [3, "Carol"]])")); + ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, batch.array)); + + ICEBERG_UNWRAP_OR_FAIL(auto alice_alive, predicate(*row)); + EXPECT_TRUE(alice_alive); + + ASSERT_THAT(row->Reset(1), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto bob_alive, predicate(*row)); + EXPECT_FALSE(bob_alive); + + ASSERT_THAT(row->Reset(2), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto carol_alive, predicate(*row)); + EXPECT_TRUE(carol_alive); +} + +TEST_F(DeleteFilterTest, EqDeletedRowFilterIsCached) { + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_name, + EqualityDeleteFile("eq-cache.parquet", R"([[0, "Bob", "unused"]])", {2})); + std::vector> delete_files = {eq_by_name}; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + table_schema_, file_io_); + ASSERT_THAT(filter, IsOk()); + + auto result1 = filter.value()->EqDeletedRowFilter(); + auto result2 = filter.value()->EqDeletedRowFilter(); + ASSERT_THAT(result1, IsOk()); + ASSERT_THAT(result2, IsOk()); + EXPECT_TRUE(static_cast(result1.value())); + EXPECT_TRUE(static_cast(result2.value())); +} + +TEST_F(DeleteFilterTest, FindEqDeleteRowsTrueForDeleted) { + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_name, + EqualityDeleteFile("eq-find.parquet", R"([[0, "Bob", "unused"]])", {2})); + std::vector> delete_files = {eq_by_name}; + auto requested_schema = Project({1}); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_); + ASSERT_THAT(filter, IsOk()); + + auto predicate_result = filter.value()->FindEqualityDeleteRows(); + ASSERT_THAT(predicate_result, IsOk()); + auto& predicate = predicate_result.value(); + ASSERT_TRUE(static_cast(predicate)); + + ICEBERG_UNWRAP_OR_FAIL(auto batch, + MakeBatch(*filter.value()->RequiredSchema(), + R"([[1, "Alice"], [2, "Bob"], [3, "Carol"]])")); + ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, batch.array)); + + ICEBERG_UNWRAP_OR_FAIL(auto alice_deleted, predicate(*row)); + EXPECT_FALSE(alice_deleted); + + ASSERT_THAT(row->Reset(1), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto bob_deleted, predicate(*row)); + EXPECT_TRUE(bob_deleted); +} + +TEST_F(DeleteFilterTest, FindEqDeleteRowsFalseWithNoEqDeletes) { + std::vector> delete_files; + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + table_schema_, file_io_); + ASSERT_THAT(filter, IsOk()); + + auto predicate_result = filter.value()->FindEqualityDeleteRows(); + + ASSERT_THAT(predicate_result, IsOk()); + ASSERT_TRUE(static_cast(predicate_result.value())); + + ICEBERG_UNWRAP_OR_FAIL(auto batch, MakeBatch(*filter.value()->RequiredSchema(), + R"([[1, "Alice", "blue"]])")); + ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, batch.array)); + ICEBERG_UNWRAP_OR_FAIL(auto deleted, predicate_result.value()(*row)); + EXPECT_FALSE(deleted); +} + +TEST_F(DeleteFilterTest, ExplicitFieldLookupFiltersRows) { + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_name, + EqualityDeleteFile("eq-lookup.parquet", R"([[0, "Bob", "unused"]])", {2})); + std::vector> delete_files = {eq_by_name}; + auto requested_schema = Project({1}); + + ICEBERG_UNWRAP_OR_FAIL(auto base_lookup, DeleteFilter::MakeFieldLookup(table_schema_)); + DeleteFilter::FieldLookup custom_lookup = + [base_lookup = std::move(base_lookup)]( + int32_t field_id) -> Result> { + if (field_id == 2) { + return base_lookup(field_id); + } + return std::nullopt; + }; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, requested_schema, + file_io_, std::move(custom_lookup)); + + ASSERT_THAT(filter, IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto batch, + MakeBatch(*filter.value()->RequiredSchema(), + R"([[1, "Alice"], [2, "Bob"], [3, "Carol"]])")); + + auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); + + ASSERT_THAT(alive, IsOk()); + ExpectAliveRows(alive.value(), {0, 2}); +} + +TEST_F(DeleteFilterTest, ExplicitFieldLookupNulloptErrors) { + // A lookup that returns nullopt for the equality field must produce an error + // at Make() time (during ComputeRequiredSchema), not silently skip the field. + auto eq_by_name = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-missing.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {2}, + }); + std::vector> delete_files = {eq_by_name}; + auto requested_schema = Project({1}); + + DeleteFilter::FieldLookup empty_lookup = + [](int32_t) -> Result> { + return std::nullopt; + }; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, requested_schema, + file_io_, std::move(empty_lookup)); + + EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument)); +} + +TEST_F(DeleteFilterTest, ExplicitFieldLookupRejectsListOrMapProjection) { + auto eq_by_element = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-element.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {5}, + }); + std::vector> delete_files = {eq_by_element}; + auto requested_schema = Project({1}); + + DeleteFilter::FieldLookup list_lookup = + [](int32_t field_id) -> Result> { + auto element = SchemaField::MakeRequired(5, "element", string()); + return DeleteFilter::FieldLookupResult{ + .field = element, + .projection_field = SchemaField::MakeOptional(4, "tags", list(element)), + }; + }; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, requested_schema, + file_io_, std::move(list_lookup)); + + EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(filter, HasErrorMessage("must not be nested in list")); +} + +TEST_F(DeleteFilterTest, ExplicitFieldLookupSkipsExistingFields) { + // When the equality field is already in requested_schema, the custom lookup + // must NOT be called + ICEBERG_UNWRAP_OR_FAIL( + auto eq_by_name, + EqualityDeleteFile("eq-already-present.parquet", R"([[0, "Bob", "unused"]])", {2})); + std::vector> delete_files = {eq_by_name}; + auto requested_schema = Project({1, 2}); + + bool lookup_called = false; + DeleteFilter::FieldLookup tracking_lookup = + [&lookup_called]( + int32_t) -> Result> { + lookup_called = true; + return std::nullopt; // would fail if called + }; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, requested_schema, + file_io_, std::move(tracking_lookup)); + + ASSERT_THAT(filter, IsOk()); + EXPECT_FALSE(lookup_called); +} + +TEST_F(DeleteFilterTest, SchemasLookupDeduplicatesCurrentSchemaId) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}, + /*schema_id=*/2); + auto same_id_historic_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(7, "not_historic", string())}, + /*schema_id=*/2); + auto eq_by_dropped = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "eq-dropped.parquet", + .file_format = FileFormatType::kParquet, + .equality_ids = {7}, + }); + std::vector> delete_files = {eq_by_dropped}; + std::vector> schemas = {current_schema, + same_id_historic_schema}; + + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, current_schema, + current_schema, file_io_, schemas); + + EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(filter, HasErrorMessage("Cannot find equality delete field id 7")); +} + +} // namespace iceberg