feat: optimize parquet reads with bucket and page-level filtering#232
feat: optimize parquet reads with bucket and page-level filtering#232liangjie3138 wants to merge 16 commits intoalibaba:mainfrom
Conversation
|
liangjie.liang seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
|
Thank you for your contribution! This is a highly complex and important feature, and your work on it is greatly appreciated. Given the large scope of this PR, would it be possible to split it into smaller, focused changes? For example, separating the bucket predicate logic from the Parquet point lookup improvements could make each part easier to review and move forward incrementally. Also, could you please fix the CI failures first so we can begin the review process? We truly recognize the effort behind this change and look forward to helping get it merged smoothly. |
There was a problem hiding this comment.
Pull request overview
Implements multi-level Parquet read optimizations (bucket selection + page/row-group filtering) by leveraging Parquet page indexes (ColumnIndex/OffsetIndex) and adding page-level prefetching to reduce I/O and decode work.
Changes:
- Added page-index-based filtering infrastructure (
ColumnIndexFilter,RowRanges) and a page-filtered row-group reader with page-range prefetch support. - Integrated page-level filtering/prefetch into
ParquetFileBatchReader/FileReaderWrapper, and enabled writing page indexes via a new writer option. - Added bucket-id derivation from predicates (
BucketSelectConverter) and expanded scan bucket filtering to support multiple buckets.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| src/paimon/format/parquet/row_ranges.h | Introduces RowRanges abstraction for page/row-range selection. |
| src/paimon/format/parquet/row_ranges.cpp | Implements range union/intersection/overlap/add logic used by page filtering. |
| src/paimon/format/parquet/parquet_writer_builder.cpp | Enables Parquet page index writing behind an option. |
| src/paimon/format/parquet/parquet_format_defs.h | Adds new read/write options for page-index functionality. |
| src/paimon/format/parquet/parquet_file_batch_reader.h | Adds page-index filtering API and logging member. |
| src/paimon/format/parquet/parquet_file_batch_reader.cpp | Wires page-level filtering + eager prepare to start prebuffer earlier. |
| src/paimon/format/parquet/page_filtered_row_group_reader.h | Declares page-filtered row group read + page-range computation. |
| src/paimon/format/parquet/page_filtered_row_group_reader.cpp | Implements decode skipping + page-range prefetch logic for filtered reads. |
| src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp | Adds end-to-end tests for page filtering and page-range computation. |
| src/paimon/format/parquet/file_reader_wrapper.h | Extends wrapper to support page-filtered RG reads and page-range prebuffering. |
| src/paimon/format/parquet/file_reader_wrapper.cpp | Implements page-filtered RG scheduling + unified PreBufferRanges prefetch. |
| src/paimon/format/parquet/column_index_filter.h | Adds ColumnIndex-based predicate evaluation for page selection. |
| src/paimon/format/parquet/column_index_filter.cpp | Implements ColumnIndex-based page matching and RowRanges generation. |
| src/paimon/format/parquet/column_index_filter_test.cpp | Adds RowRanges unit tests + ColumnIndexFilter integration tests. |
| src/paimon/format/parquet/CMakeLists.txt | Registers new parquet sources/tests; adds Arrow source include path. |
| src/paimon/core/operation/key_value_file_store_scan.cpp | Derives bucket filter from predicates when not explicitly set. |
| src/paimon/core/operation/file_store_scan.h | Changes bucket filter to optional<set<int32_t>>; adds helpers. |
| src/paimon/core/operation/file_store_scan.cpp | Updates bucket filtering logic to handle multiple buckets. |
| src/paimon/core/operation/bucket_select_converter.h | Declares predicate→bucket-id derivation helper. |
| src/paimon/core/operation/bucket_select_converter.cpp | Implements bucket-id derivation compatible with Java hashing. |
| src/paimon/core/operation/bucket_select_converter_test.cpp | Adds tests for bucket derivation across predicate shapes/types. |
| src/paimon/core/operation/merge_file_split_read.cpp | Refactors loops to index-based iteration. |
| src/paimon/core/operation/abstract_split_read.cpp | Refactors loop to index-based iteration. |
| src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.cpp | Refactors loop to index-based iteration. |
| src/paimon/common/utils/arrow/arrow_input_stream_adapter.h | Tracks outstanding async reads for safe destruction. |
| src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp | Waits for pending futures; prunes finished futures. |
| src/paimon/CMakeLists.txt | Registers new core operation source + test. |
| cmake_modules/arrow.diff | Patches Arrow Parquet reader to add PreBufferRanges/WhenBufferedRanges and cached page-range reads. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // causing use-after-free in arrow::PoolBuffer::~PoolBuffer(). | ||
| std::mutex pending_futures_mutex_; | ||
| std::vector<arrow::Future<std::shared_ptr<arrow::Buffer>>> pending_futures_; | ||
| }; |
There was a problem hiding this comment.
I'm curious — is this issue introduced by the page-level pushdown feature, or did it exist before?
If it's a pre-existing problem, should we consider submitting a separate fix PR to address it independently?
That way, we can clearly track and verify the bug fix without being coupled to the new feature changes.
| const std::shared_ptr<Predicate>& predicate, | ||
| ::parquet::RowGroupPageIndexReader* rg_page_index_reader, | ||
| const std::map<std::string, int32_t>& column_name_to_index, int64_t row_group_row_count) { | ||
| if (auto leaf_predicate = std::dynamic_pointer_cast<LeafPredicate>(predicate)) { |
There was a problem hiding this comment.
For consistency, could you please place output parameters like rg_page_index_reader at the end of the parameter list — this follows the common style in our codebase.
| auto function_type = function.GetType(); | ||
| const auto& literals = leaf_predicate->Literals(); | ||
| switch (function_type) { | ||
| case Function::Type::IS_NULL: |
There was a problem hiding this comment.
Since function and function_type are needed outside the if block, consider declaring them in the outer scope for clarity.
| if (it == column_name_to_index.end()) { | ||
| // Column not found in file (schema evolution): all values are treated as NULL. | ||
| // Return precise results based on predicate type, matching Java behavior. | ||
| const auto& function = leaf_predicate->GetFunction(); |
There was a problem hiding this comment.
This case seems unlikely to occur, because in C++ Paimon, schema evolution (e.g., missing columns filled with nulls, unsupported predicates not pushed down) is already handled by FieldMappingReader.
Given that, we don’t need to handle missing columns here; it’s already filtered upstream. So reporting an error is just ok.
| // NULL = non_null → no rows. | ||
| bool has_null_literal = !literals.empty() && literals[0].IsNull(); | ||
| return has_null_literal ? RowRanges::CreateSingle(row_group_row_count) | ||
| : RowRanges::CreateEmpty(); |
There was a problem hiding this comment.
Actually, equal predicates with null literals should not match — see java.
We return false on null, rather than allowing null to participate in equality.
| for (int i = 0; i < arrow_schema->num_fields(); ++i) { | ||
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( | ||
| auto empty_array, arrow::MakeEmptyArray(arrow_schema->field(i)->type(), pool)); | ||
| empty_columns.push_back(std::move(empty_array)); |
There was a problem hiding this comment.
Please avoid auto with PAIMON_ASSIGN_OR_RAISE — prefer explicit types for clarity.
| // Build Table from ChunkedArrays, then combine chunks and extract a single RecordBatch | ||
| auto table = arrow::Table::Make(arrow_schema, columns, expected_rows); | ||
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Table> combined_table, | ||
| table->CombineChunks(pool)); |
There was a problem hiding this comment.
I wonder whether CombineChunks copies data during its operation. Does it perform a deep copy of the chunk data, or does it work with references/views to avoid unnecessary duplication?
| int64_t remaining = current_filtered_batch_->num_rows() - filtered_batch_offset_; | ||
| int64_t slice_len = (batch_size_ > 0 && remaining > batch_size_) ? batch_size_ : remaining; | ||
| record_batch = current_filtered_batch_->Slice(filtered_batch_offset_, slice_len); | ||
|
|
There was a problem hiding this comment.
Noted that the returned C ArrowArray must have offset = 0. Otherwise, converting from C ArrowArray to arrow::RecordBatch will fail with an error (see
https://github.com/apache/arrow/blob/main/cpp/src/arrow/c/bridge.cc#L1563 for ref). Therefore, Therefore, the line current_filtered_batch_->Slice(filtered_batch_offset_, slice_len); needs to be adjusted to ensure the resulting array starts at offset 0.
|
|
||
| auto meta_data = file_reader_->parquet_reader()->metadata(); | ||
| int64_t row_count = meta_data->RowGroup(row_group_index)->num_rows(); | ||
|
|
There was a problem hiding this comment.
Not a big issue, but I noticed the declaration for meta_data and row_count is repeated a few times. Maybe we could extract them into a common section to reduce duplication.
| current_filtered_batch_ = full_batch; | ||
| filtered_batch_offset_ = batch_size_; | ||
| record_batch = full_batch->Slice(0, batch_size_); | ||
| } else { |
There was a problem hiding this comment.
It would be great to have a test case for the case that the page-level predicate returns more rows than the batch size. For example, we could set page row count = 3 and try different batch sizes like 1, 2, 3, 5, and 10, to check how the results are split across batches.
Purpose
Linked issue: close ##137
Implement multi-level filtering optimization for Parquet file reading. By leveraging ColumnIndex statistics and bucket predicate derivation, the reader can skip non-matching data at the bucket, row group, and page levels, reducing I/O and decoding overhead.
Main Features
Page-level data filtering
EQUAL,NOT_EQUAL,LESS_THAN,GREATER_THAN,IN,IS_NULL, and compound predicates withAND/OR.data_page_filtercallback.SkipRecords/ReadRecords.Page-level prefetching
Computes the byte ranges of required pages based on
RowRangesandOffsetIndex, and usesArrowPreBufferfor asynchronous prefetching.BucketSelectConverter
Derives target bucket IDs from query predicates, and is compatible with the Java Paimon hash algorithm.
Tests
bucket_select_converter_test.cpp: Covers various predicate combinations,Timestamptype, and Cartesian product computation.column_index_filter_test.cpp: Covers all predicate types (EQUAL,IN,LESS_THAN,GREATER_THAN,IS_NULL, etc.) andAND/ORcompound predicates.page_filtered_row_group_reader_test.cpp: Verifies filtering correctness, edge cases, and prefetching behavior.API and Format
No public API changes. No impact on storage format or protocol.
Documentation
Not applicable.
Generative AI Tooling
Claude Code (Opus 4.6)