diff --git a/crates/paimon/src/arrow/format/mosaic.rs b/crates/paimon/src/arrow/format/mosaic.rs index b7e3c47a..74727ba6 100644 --- a/crates/paimon/src/arrow/format/mosaic.rs +++ b/crates/paimon/src/arrow/format/mosaic.rs @@ -28,6 +28,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::StreamExt; use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess}; +use std::collections::HashSet; use std::io; pub(crate) struct MosaicFormatReader; @@ -46,16 +47,28 @@ impl FormatFileReader for MosaicFormatReader { row_selection: Option>, ) -> crate::Result { // Mosaic predicates are currently residual; callers must re-check them for exact filtering. - let target_schema = build_target_arrow_schema(read_fields)?; - validate_mosaic_schema(&target_schema)?; - let file_bytes = reader.read(0..file_size).await?; let mosaic_reader = MosaicReader::new(MemoryInputFile::new(file_bytes), file_size) .map_err(mosaic_read_error)?; - let projected_names = read_fields + + let file_column_names = mosaic_reader + .schema() + .columns + .iter() + .map(|column| column.name.as_str()) + .collect::>(); + let existing_read_fields = read_fields + .iter() + .filter(|field| file_column_names.contains(field.name())) + .cloned() + .collect::>(); + let read_schema = build_target_arrow_schema(&existing_read_fields)?; + validate_mosaic_schema(&read_schema)?; + let projected_names = existing_read_fields .iter() .map(|field| field.name().to_string()) .collect::>(); + let all_projected_columns_missing = !read_fields.is_empty() && projected_names.is_empty(); let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); Ok(try_stream! { @@ -82,24 +95,25 @@ impl FormatFileReader for MosaicFormatReader { } } - let mut row_group_reader = if projected_names.is_empty() { - mosaic_reader - .row_group_reader_by_names(row_group_index, &[]) - .map_err(mosaic_read_error)? + let batch = if all_projected_columns_missing { + let row_count = selected_indices + .as_ref() + .map_or(row_group_rows, UInt64Array::len); + empty_batch(read_schema.clone(), row_count)? } else { let names = projected_names .iter() .map(String::as_str) .collect::>(); - mosaic_reader + let mut row_group_reader = mosaic_reader .row_group_reader_by_names(row_group_index, &names) - .map_err(mosaic_read_error)? - }; + .map_err(mosaic_read_error)?; - let batch = row_group_reader - .read_columns() - .map_err(mosaic_read_error)?; - let batch = take_rows(batch, selected_indices.as_ref(), &target_schema)?; + let batch = row_group_reader + .read_columns() + .map_err(mosaic_read_error)?; + take_rows(batch, selected_indices.as_ref(), &read_schema)? + }; for chunk in split_batch(batch, batch_size) { yield chunk; } @@ -251,15 +265,7 @@ fn take_rows( }; if batch.num_columns() == 0 { - return RecordBatch::try_new_with_options( - target_schema.clone(), - Vec::new(), - &RecordBatchOptions::new().with_row_count(Some(indices.len())), - ) - .map_err(|e| Error::UnexpectedError { - message: format!("Failed to build empty Mosaic RecordBatch: {e}"), - source: Some(Box::new(e)), - }); + return empty_batch(target_schema.clone(), indices.len()); } let columns = batch @@ -287,15 +293,7 @@ fn ensure_schema(batch: RecordBatch, target_schema: &SchemaRef) -> crate::Result } if batch.num_columns() == 0 { - return RecordBatch::try_new_with_options( - target_schema.clone(), - Vec::new(), - &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())), - ) - .map_err(|e| Error::UnexpectedError { - message: format!("Failed to build empty Mosaic RecordBatch: {e}"), - source: Some(Box::new(e)), - }); + return empty_batch(target_schema.clone(), batch.num_rows()); } RecordBatch::try_new(target_schema.clone(), batch.columns().to_vec()).map_err(|e| { @@ -306,6 +304,18 @@ fn ensure_schema(batch: RecordBatch, target_schema: &SchemaRef) -> crate::Result }) } +fn empty_batch(schema: SchemaRef, row_count: usize) -> crate::Result { + RecordBatch::try_new_with_options( + schema, + Vec::new(), + &RecordBatchOptions::new().with_row_count(Some(row_count)), + ) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build empty Mosaic RecordBatch: {e}"), + source: Some(Box::new(e)), + }) +} + fn split_batch(batch: RecordBatch, batch_size: usize) -> Vec { if batch_size == 0 || batch.num_rows() <= batch_size { return vec![batch]; @@ -400,6 +410,10 @@ mod tests { ] } + fn field(id: i32, name: &str, data_type: DataType) -> DataField { + DataField::new(id, name.to_string(), data_type) + } + fn arrow_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("id", ArrowDataType::Int32, false), @@ -526,32 +540,128 @@ mod tests { } #[tokio::test] - async fn test_unsupported_type_returns_error() { - let unsupported = vec![DataField::new( + async fn test_read_projection_with_missing_column() { + let fields = data_fields(); + let projected = vec![ + fields[0].clone(), + field(3, "new_score", DataType::Int(IntType::with_nullable(true))), + fields[1].clone(), + ]; + let data = write_mosaic(&sample_batch()); + let batches = read_batches(data, &projected, None).await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 5); + assert_eq!(batches[0].num_columns(), 2); + assert_eq!(batches[0].schema().field(0).name(), "id"); + assert_eq!(batches[0].schema().field(1).name(), "name"); + let ids = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.values(), &[1, 2, 3, 4, 5]); + } + + #[tokio::test] + async fn test_read_projection_with_missing_unsupported_column() { + let fields = data_fields(); + let projected = vec![ + fields[0].clone(), + field( + 3, + "new_items", + DataType::Array(ArrayType::new(DataType::Int(IntType::new()))), + ), + ]; + let data = write_mosaic(&sample_batch()); + let batches = read_batches(data, &projected, None).await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 5); + assert_eq!(batches[0].num_columns(), 1); + assert_eq!(batches[0].schema().field(0).name(), "id"); + } + + #[tokio::test] + async fn test_read_projection_with_existing_unsupported_column_returns_error() { + let projected = vec![field( 0, - "items".to_string(), + "id", DataType::Array(ArrayType::new(DataType::Int(IntType::new()))), )]; - let result = MosaicFormatReader - .read_batch_stream( - Box::new(TestFileRead { data: Bytes::new() }), - 0, - &unsupported, - None, - None, - None, - ) - .await; - let err = match result { - Ok(_) => panic!("expected unsupported Mosaic type error"), - Err(err) => err, - }; + let data = write_mosaic(&sample_batch()); + let err = read_batches(data, &projected, None).await.unwrap_err(); assert!( - matches!(err, Error::Unsupported { message } if message.contains("Mosaic format does not support column 'items'")) + matches!(err, Error::Unsupported { message } if message.contains("Mosaic format does not support column 'id'")) ); } + #[tokio::test] + async fn test_read_projection_all_columns_missing() { + let projected = vec![ + field(3, "new_score", DataType::Int(IntType::with_nullable(true))), + field( + 4, + "new_name", + DataType::VarChar(VarCharType::with_nullable(true, 20).unwrap()), + ), + ]; + let data = write_mosaic(&sample_batch()); + let batches = read_batches(data, &projected, None).await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 5); + assert_eq!(batches[0].num_columns(), 0); + assert!(batches[0].schema().fields().is_empty()); + } + + #[tokio::test] + async fn test_read_projection_all_columns_missing_with_row_selection() { + let projected = vec![field( + 3, + "new_score", + DataType::Int(IntType::with_nullable(true)), + )]; + let data = write_mosaic(&sample_batch()); + let batches = read_batches(data, &projected, Some(vec![RowRange::new(1, 3)])) + .await + .unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 3); + assert_eq!(batches[0].num_columns(), 0); + } + + #[tokio::test] + async fn test_read_projection_with_missing_column_and_row_selection() { + let fields = data_fields(); + let projected = vec![ + fields[2].clone(), + field(3, "new_id", DataType::Int(IntType::with_nullable(true))), + ]; + let data = write_mosaic(&sample_batch()); + let batches = read_batches( + data, + &projected, + Some(vec![RowRange::new(0, 1), RowRange::new(4, 4)]), + ) + .await + .unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 3); + assert_eq!(batches[0].num_columns(), 1); + assert_eq!(batches[0].schema().field(0).name(), "score"); + let scores = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(scores.values(), &[10, 20, 50]); + } + #[test] fn test_validate_row_type_as_unsupported() { let unsupported = vec![DataField::new( diff --git a/crates/paimon/src/table/data_file_reader.rs b/crates/paimon/src/table/data_file_reader.rs index 8a0cfc70..f08ad91e 100644 --- a/crates/paimon/src/table/data_file_reader.rs +++ b/crates/paimon/src/table/data_file_reader.rs @@ -463,3 +463,185 @@ pub(super) fn append_null_row_id_column( let array: Arc = Arc::new(Int64Array::new_null(batch.num_rows())); insert_column_at(batch, array, insert_index, output_schema) } + +#[cfg(all(test, feature = "mosaic"))] +mod tests { + use super::*; + use crate::arrow::build_target_arrow_schema; + use crate::io::FileIOBuilder; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{ArrayType, DataFileMeta, DataType, IntType, VarCharType}; + use crate::table::source::DataSplitBuilder; + use arrow_array::{Int32Array, StringArray}; + use bytes::Bytes; + use futures::TryStreamExt; + use paimon_mosaic_core::spec::COMPRESSION_NONE; + use paimon_mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions}; + use std::io; + + struct MemOutputFile { + data: Vec, + } + + impl MemOutputFile { + fn new() -> Self { + Self { data: Vec::new() } + } + } + + impl OutputFile for MemOutputFile { + fn write(&mut self, data: &[u8]) -> io::Result<()> { + self.data.extend_from_slice(data); + Ok(()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + + fn pos(&self) -> u64 { + self.data.len() as u64 + } + } + + fn data_field(id: i32, name: &str, data_type: DataType) -> DataField { + DataField::new(id, name.to_string(), data_type) + } + + fn data_file(file_name: &str, file_size: i64, row_count: i64, schema_id: i64) -> DataFileMeta { + DataFileMeta { + file_name: file_name.to_string(), + file_size, + row_count, + min_key: Vec::new(), + max_key: Vec::new(), + key_stats: BinaryTableStats::empty(), + value_stats: BinaryTableStats::empty(), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id, + level: 0, + extra_files: Vec::new(), + creation_time: None, + delete_row_count: None, + embedded_index: None, + file_source: None, + value_stats_cols: None, + external_path: None, + first_row_id: None, + write_cols: None, + } + } + + fn write_mosaic(batch: &RecordBatch) -> Bytes { + let out = MemOutputFile::new(); + let mut writer = MosaicWriter::new( + out, + batch.schema().as_ref(), + WriterOptions { + compression: COMPRESSION_NONE, + num_buckets: 2, + row_group_max_size: u64::MAX, + ..Default::default() + }, + ) + .unwrap(); + writer.write_batch(batch).unwrap(); + writer.close().unwrap(); + Bytes::from(writer.output().data.to_vec()) + } + + #[tokio::test] + async fn test_mosaic_physical_missing_column_is_null_filled() { + let physical_fields = vec![ + data_field(0, "id", DataType::Int(IntType::with_nullable(false))), + data_field( + 1, + "name", + DataType::VarChar(VarCharType::with_nullable(true, 20).unwrap()), + ), + ]; + let read_fields = vec![ + physical_fields[0].clone(), + data_field( + 2, + "items", + DataType::Array(ArrayType::new(DataType::Int(IntType::new()))), + ), + physical_fields[1].clone(), + ]; + + let physical_arrow_schema = build_target_arrow_schema(&physical_fields).unwrap(); + let batch = RecordBatch::try_new( + physical_arrow_schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + let data = write_mosaic(&batch); + + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let table_path = "memory:/mosaic_schema_evolution"; + let bucket_path = format!("{table_path}/bucket-0"); + let file_name = "part-0.mosaic"; + let file_path = format!("{bucket_path}/{file_name}"); + file_io + .new_output(&file_path) + .unwrap() + .write(data.clone()) + .await + .unwrap(); + + let table_schema_id = 1; + let split = DataSplitBuilder::new() + .with_snapshot(1) + .with_partition(crate::spec::BinaryRow::new(0)) + .with_bucket(0) + .with_bucket_path(bucket_path) + .with_total_buckets(1) + .with_data_files(vec![data_file( + file_name, + data.len() as i64, + 3, + table_schema_id, + )]) + .build() + .unwrap(); + let schema_manager = SchemaManager::new(file_io.clone(), table_path.to_string()); + let reader = DataFileReader::new( + file_io, + schema_manager, + table_schema_id, + read_fields.clone(), + read_fields.clone(), + Vec::new(), + ); + let stream = reader.read(&[split]).unwrap(); + let batches = stream.try_collect::>().await.unwrap(); + + assert_eq!(batches.len(), 1); + let result = &batches[0]; + assert_eq!(result.num_rows(), 3); + assert_eq!(result.num_columns(), 3); + assert_eq!(result.schema().field(0).name(), "id"); + assert_eq!(result.schema().field(1).name(), "items"); + assert_eq!(result.schema().field(2).name(), "name"); + assert_eq!(result.column(1).null_count(), 3); + + let ids = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.values(), &[1, 2, 3]); + let names = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(names.value(0), "a"); + assert_eq!(names.value(2), "c"); + } +}