Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::{HashMap, HashSet};
// String keys for table options. `SOURCE_SPLIT_TARGET_SIZE_OPTION` is looked up
// in `self.options` by `source_split_target_size`; the other keys are presumably
// read the same way by their own accessors (not all visible here — confirm).
const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
const DATA_EVOLUTION_ENABLED_OPTION: &str = "data-evolution.enabled";
const GLOBAL_INDEX_ENABLED_OPTION: &str = "global-index.enabled";
// Read by `global_index_row_count_per_shard`, which rejects values <= 0.
const GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION: &str = "global-index.row-count-per-shard";
const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
Expand Down Expand Up @@ -64,6 +65,7 @@ const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
// 256 * 1024 * 1024; presumably a byte count (256 MiB) — confirm against the code
// that consumes this default.
const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str = "dynamic-bucket.target-row-num";
const DEFAULT_DYNAMIC_BUCKET_TARGET_ROW_NUM: i64 = 200_000;
// Fallback used by `global_index_row_count_per_shard` when the option lookup
// yields `None`.
const DEFAULT_GLOBAL_INDEX_ROW_COUNT_PER_SHARD: i64 = 100_000;
const BLOB_AS_DESCRIPTOR_OPTION: &str = "blob-as-descriptor";
const BLOB_DESCRIPTOR_FIELD_OPTION: &str = "blob-descriptor-field";

Expand Down Expand Up @@ -219,6 +221,22 @@ impl<'a> CoreOptions<'a> {
.unwrap_or(false)
}

pub fn global_index_row_count_per_shard(&self) -> crate::Result<i64> {
let value = self
.parse_i64_option(GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION)?
.unwrap_or(DEFAULT_GLOBAL_INDEX_ROW_COUNT_PER_SHARD);
if value <= 0 {
return Err(crate::Error::DataInvalid {
message: format!(
"Option '{}' must be greater than 0, got: {}",
GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION, value
),
source: None,
});
}
Ok(value)
}

pub fn source_split_target_size(&self) -> i64 {
self.options
.get(SOURCE_SPLIT_TARGET_SIZE_OPTION)
Expand Down Expand Up @@ -533,6 +551,10 @@ mod tests {

assert_eq!(core_options.source_split_target_size(), 128 * 1024 * 1024);
assert_eq!(core_options.source_split_open_file_cost(), 4 * 1024 * 1024);
assert_eq!(
core_options.global_index_row_count_per_shard().unwrap(),
100_000
);
}

#[test]
Expand All @@ -546,11 +568,36 @@ mod tests {
SOURCE_SPLIT_OPEN_FILE_COST_OPTION.to_string(),
"8 mb".to_string(),
),
(
GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION.to_string(),
"2048".to_string(),
),
]);
let core_options = CoreOptions::new(&options);

assert_eq!(core_options.source_split_target_size(), 256 * 1024 * 1024);
assert_eq!(core_options.source_split_open_file_cost(), 8 * 1024 * 1024);
assert_eq!(
core_options.global_index_row_count_per_shard().unwrap(),
2048
);
}

#[test]
fn test_global_index_row_count_per_shard_rejects_invalid_values() {
    // Zero, a negative number, and a non-numeric string are each expected to
    // produce a `DataInvalid` error that names the offending option.
    let rejected_inputs = ["0", "-1", "abc"];

    for raw in rejected_inputs {
        let mut options = HashMap::new();
        options.insert(
            GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION.to_string(),
            raw.to_string(),
        );
        let core = CoreOptions::new(&options);

        let outcome = core.global_index_row_count_per_shard();
        let err = outcome.expect_err("invalid rows-per-shard should fail");

        assert!(matches!(err, crate::Error::DataInvalid { message, .. }
            if message.contains(GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION)));
    }
}

#[test]
Expand Down
Loading
Loading