diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs
index 231dea0eb..4184dac66 100644
--- a/vortex-sampling-compressor/src/compressors/alp.rs
+++ b/vortex-sampling-compressor/src/compressors/alp.rs
@@ -26,6 +26,10 @@ impl EncodingCompressor for ALPCompressor {
         constants::ALP_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::ALP_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support primitive arrays
         let parray = PrimitiveArray::try_from(array).ok()?;
diff --git a/vortex-sampling-compressor/src/compressors/alp_rd.rs b/vortex-sampling-compressor/src/compressors/alp_rd.rs
index 78c421ed9..6f5f84154 100644
--- a/vortex-sampling-compressor/src/compressors/alp_rd.rs
+++ b/vortex-sampling-compressor/src/compressors/alp_rd.rs
@@ -33,6 +33,10 @@ impl EncodingCompressor for ALPRDCompressor {
         constants::ALP_RD_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::ALP_RD_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support primitive arrays
         let parray = PrimitiveArray::try_from(array).ok()?;
diff --git a/vortex-sampling-compressor/src/compressors/bitpacked.rs b/vortex-sampling-compressor/src/compressors/bitpacked.rs
index 7d26bfd06..e1bb4a671 100644
--- a/vortex-sampling-compressor/src/compressors/bitpacked.rs
+++ b/vortex-sampling-compressor/src/compressors/bitpacked.rs
@@ -52,6 +52,14 @@
         }
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        if self.allow_patches {
+            constants::BITPACKED_WITH_PATCHES_GIB_PER_S
+        } else {
+            constants::BITPACKED_NO_PATCHES_GIB_PER_S
+        }
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support primitive arrays
         let parray = PrimitiveArray::try_from(array).ok()?;
diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index aabfbd47b..c21a055c3 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -11,7 +11,7 @@ use vortex_error::{vortex_bail, VortexResult};
 
 use super::EncoderMetadata;
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
-use crate::SamplingCompressor;
+use crate::{constants, SamplingCompressor};
 
 #[derive(Debug)]
 pub struct ChunkedCompressor {
@@ -36,7 +36,11 @@ impl EncodingCompressor for ChunkedCompressor {
     }
 
     fn cost(&self) -> u8 {
-        0
+        constants::CHUNKED_COST
+    }
+
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::CHUNKED_GIB_PER_S
     }
 
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
diff --git a/vortex-sampling-compressor/src/compressors/constant.rs b/vortex-sampling-compressor/src/compressors/constant.rs
index 4b34b53b2..587bcb651 100644
--- a/vortex-sampling-compressor/src/compressors/constant.rs
+++ b/vortex-sampling-compressor/src/compressors/constant.rs
@@ -21,6 +21,10 @@ impl EncodingCompressor for ConstantCompressor {
         constants::CONSTANT_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::CONSTANT_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         (!array.is_empty() && array.statistics().compute_is_constant().unwrap_or(false))
             .then_some(self as &dyn EncodingCompressor)
diff --git a/vortex-sampling-compressor/src/compressors/date_time_parts.rs b/vortex-sampling-compressor/src/compressors/date_time_parts.rs
index 25dbf02ce..026b44189 100644
--- a/vortex-sampling-compressor/src/compressors/date_time_parts.rs
+++ b/vortex-sampling-compressor/src/compressors/date_time_parts.rs
@@ -24,6 +24,10 @@ impl EncodingCompressor for DateTimePartsCompressor {
         constants::DATE_TIME_PARTS_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::DATE_TIME_PARTS_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         if let Ok(temporal_array) = TemporalArray::try_from(array) {
             match temporal_array.temporal_metadata() {
diff --git a/vortex-sampling-compressor/src/compressors/delta.rs b/vortex-sampling-compressor/src/compressors/delta.rs
index ec474f4d2..03eac87bf 100644
--- a/vortex-sampling-compressor/src/compressors/delta.rs
+++ b/vortex-sampling-compressor/src/compressors/delta.rs
@@ -22,6 +22,10 @@ impl EncodingCompressor for DeltaCompressor {
         constants::DELTA_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::DELTA_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support primitive arrays
         let parray = PrimitiveArray::try_from(array).ok()?;
diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs
index 56503e815..b6258f15d 100644
--- a/vortex-sampling-compressor/src/compressors/dict.rs
+++ b/vortex-sampling-compressor/src/compressors/dict.rs
@@ -26,6 +26,10 @@ impl EncodingCompressor for DictCompressor {
         constants::DICT_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::DICT_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         if array.encoding().id() != Primitive::ID
             && array.encoding().id() != VarBin::ID
diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs
index 579fb16c8..d94802cbb 100644
--- a/vortex-sampling-compressor/src/compressors/for.rs
+++ b/vortex-sampling-compressor/src/compressors/for.rs
@@ -24,6 +24,10 @@ impl EncodingCompressor for FoRCompressor {
         constants::FOR_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::FOR_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support primitive arrays
         let parray = PrimitiveArray::try_from(array).ok()?;
diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs
index f1f3baf7f..3cca21dbd 100644
--- a/vortex-sampling-compressor/src/compressors/fsst.rs
+++ b/vortex-sampling-compressor/src/compressors/fsst.rs
@@ -37,6 +37,10 @@ impl EncodingCompressor for FSSTCompressor {
         constants::FSST_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::FSST_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &vortex_array::Array) -> Option<&dyn EncodingCompressor> {
         // FSST arrays must have DType::Utf8.
         //
diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs
index c8fc50071..b19a56fc4 100644
--- a/vortex-sampling-compressor/src/compressors/mod.rs
+++ b/vortex-sampling-compressor/src/compressors/mod.rs
@@ -33,6 +33,8 @@ pub trait EncodingCompressor: Sync + Send + Debug {
 
     fn cost(&self) -> u8;
 
+    fn decompression_gib_per_second(&self) -> f64;
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor>;
 
     fn compress<'a>(
@@ -232,6 +234,36 @@ impl<'a> CompressedArray<'a> {
     pub fn nbytes(&self) -> usize {
         self.array.nbytes()
     }
+
+    pub fn decompression_time_ms(&self, assumed_compression_ratio: f64) -> f64 {
+        const MS_PER_SEC: f64 = 1000.0;
+        const BYTES_PER_GB: f64 = 1_073_741_824.0;
+        let children_time = self
+            .path()
+            .iter()
+            .map(|c| {
+                c.children
+                    .iter()
+                    .zip(self.array.children().iter())
+                    .map(|(c, a)| {
+                        CompressedArray::compressed(a.clone(), c.clone(), None)
+                            .decompression_time_ms(assumed_compression_ratio)
+                    })
+                    .sum::<f64>()
+            })
+            .sum::<f64>();
+        children_time
+            + self
+                .path()
+                .as_ref()
+                .map(|c| {
+                    (MS_PER_SEC / c.compressor().decompression_gib_per_second())
+                        * assumed_compression_ratio
+                        * self.nbytes() as f64
+                        / BYTES_PER_GB
+                })
+                .unwrap_or(0.0)
+    }
 }
 
 impl AsRef<Array> for CompressedArray<'_> {
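
The new `CompressedArray::decompression_time_ms` above charges each node of a compression tree `(1000 / gib_per_s) * assumed_ratio * nbytes / 2^30` milliseconds, then recurses into the children. A standalone sketch of that per-node arithmetic with hypothetical inputs (a 10 MiB compressed leaf, FSST's benchmarked 2.0 GiB/s, the default 10:1 ratio assumption) — not part of the patch:

```rust
// Per-node estimate: ms per GiB of decompression throughput, scaled by the
// assumed ratio of uncompressed to compressed bytes. Hypothetical inputs.
fn node_decompression_time_ms(nbytes: f64, gib_per_s: f64, ratio: f64) -> f64 {
    const MS_PER_SEC: f64 = 1000.0;
    const BYTES_PER_GIB: f64 = 1_073_741_824.0;
    (MS_PER_SEC / gib_per_s) * ratio * nbytes / BYTES_PER_GIB
}

fn main() {
    let nbytes = 10.0 * 1024.0 * 1024.0; // 10 MiB compressed
    // ~48.8 ms: roughly 100 MiB of decompressed output at 2 GiB/s
    println!("{:.1} ms", node_decompression_time_ms(nbytes, 2.0, 10.0));
}
```
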
diff --git a/vortex-sampling-compressor/src/compressors/roaring_bool.rs b/vortex-sampling-compressor/src/compressors/roaring_bool.rs
index 36e24df75..7c7982099 100644
--- a/vortex-sampling-compressor/src/compressors/roaring_bool.rs
+++ b/vortex-sampling-compressor/src/compressors/roaring_bool.rs
@@ -23,6 +23,10 @@ impl EncodingCompressor for RoaringBoolCompressor {
         constants::ROARING_BOOL_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::ROARING_BOOL_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support bool arrays
         if array.encoding().id() != Bool::ID {
diff --git a/vortex-sampling-compressor/src/compressors/roaring_int.rs b/vortex-sampling-compressor/src/compressors/roaring_int.rs
index 6de3bba74..c6e2bb7ad 100644
--- a/vortex-sampling-compressor/src/compressors/roaring_int.rs
+++ b/vortex-sampling-compressor/src/compressors/roaring_int.rs
@@ -20,6 +20,10 @@ impl EncodingCompressor for RoaringIntCompressor {
         constants::ROARING_INT_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::ROARING_INT_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support non-nullable uint arrays
         if !array.dtype().is_unsigned_int() || array.dtype().is_nullable() {
diff --git a/vortex-sampling-compressor/src/compressors/runend.rs b/vortex-sampling-compressor/src/compressors/runend.rs
index 8b1369f72..0bda34bcc 100644
--- a/vortex-sampling-compressor/src/compressors/runend.rs
+++ b/vortex-sampling-compressor/src/compressors/runend.rs
@@ -26,6 +26,10 @@ impl EncodingCompressor for RunEndCompressor {
         constants::RUN_END_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::RUN_END_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         if array.encoding().id() != Primitive::ID {
             return None;
diff --git a/vortex-sampling-compressor/src/compressors/sparse.rs b/vortex-sampling-compressor/src/compressors/sparse.rs
index 748ff72b4..4ee5b0609 100644
--- a/vortex-sampling-compressor/src/compressors/sparse.rs
+++ b/vortex-sampling-compressor/src/compressors/sparse.rs
@@ -20,6 +20,10 @@ impl EncodingCompressor for SparseCompressor {
         constants::SPARSE_COST
     }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::SPARSE_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         array.is_encoding(Sparse::ID).then_some(self)
     }
diff --git a/vortex-sampling-compressor/src/compressors/struct_.rs b/vortex-sampling-compressor/src/compressors/struct_.rs
index ba4e0bd07..c77f25f69 100644
--- a/vortex-sampling-compressor/src/compressors/struct_.rs
+++ b/vortex-sampling-compressor/src/compressors/struct_.rs
@@ -8,7 +8,7 @@ use vortex_array::{Array, ArrayDef, IntoArray};
 use vortex_error::VortexResult;
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
-use crate::SamplingCompressor;
+use crate::{constants, SamplingCompressor};
 
 #[derive(Debug)]
 pub struct StructCompressor;
@@ -19,7 +19,11 @@ impl EncodingCompressor for StructCompressor {
     }
 
     fn cost(&self) -> u8 {
-        0
+        constants::STRUCT_COST
+    }
+
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::STRUCT_GIB_PER_S
     }
 
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
diff --git a/vortex-sampling-compressor/src/compressors/zigzag.rs b/vortex-sampling-compressor/src/compressors/zigzag.rs
index 91e6da53a..0cb4be282 100644
--- a/vortex-sampling-compressor/src/compressors/zigzag.rs
+++ b/vortex-sampling-compressor/src/compressors/zigzag.rs
@@ -22,6 +22,10 @@ impl EncodingCompressor for ZigZagCompressor {
         constants::ZIGZAG_COST
    }
 
+    fn decompression_gib_per_second(&self) -> f64 {
+        constants::ZIGZAG_GIB_PER_S
+    }
+
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support primitive arrays
         let parray = PrimitiveArray::try_from(array).ok()?;
diff --git a/vortex-sampling-compressor/src/constants.rs b/vortex-sampling-compressor/src/constants.rs
index 45f60ccf9..acfe22cee 100644
--- a/vortex-sampling-compressor/src/constants.rs
+++ b/vortex-sampling-compressor/src/constants.rs
@@ -1,26 +1,53 @@
 #![allow(dead_code)]
+#![allow(unused_imports)]
 
-// structural pass-throughs have no cost
-pub const SPARSE_COST: u8 = 0;
-// TODO: struct
-// TODO: chunked
+pub use cost::*;
+pub use decompression::*;
 
-// so fast that we can ignore the cost
-pub const BITPACKED_NO_PATCHES_COST: u8 = 0;
-pub const BITPACKED_WITH_PATCHES_COST: u8 = 0;
-pub const CONSTANT_COST: u8 = 0;
-pub const ZIGZAG_COST: u8 = 0;
+mod cost {
+    // structural pass-throughs have no cost
+    pub const SPARSE_COST: u8 = 0;
+    pub const STRUCT_COST: u8 = 0;
+    pub const CHUNKED_COST: u8 = 0;
 
-// "normal" encodings
-pub const ALP_COST: u8 = 1;
-pub const ALP_RD_COST: u8 = 1;
-pub const DATE_TIME_PARTS_COST: u8 = 1;
-pub const DICT_COST: u8 = 1;
-pub const FOR_COST: u8 = 1;
-pub const FSST_COST: u8 = 1;
-pub const ROARING_BOOL_COST: u8 = 1;
-pub const ROARING_INT_COST: u8 = 1;
-pub const RUN_END_COST: u8 = 1;
+    // so fast that we can ignore the cost
+    pub const BITPACKED_NO_PATCHES_COST: u8 = 0;
+    pub const BITPACKED_WITH_PATCHES_COST: u8 = 0;
+    pub const CONSTANT_COST: u8 = 0;
+    pub const ZIGZAG_COST: u8 = 0;
 
-// "expensive" encodings
-pub const DELTA_COST: u8 = 2;
+    // "normal" encodings
+    pub const ALP_COST: u8 = 1;
+    pub const ALP_RD_COST: u8 = 1;
+    pub const DATE_TIME_PARTS_COST: u8 = 1;
+    pub const DICT_COST: u8 = 1;
+    pub const FOR_COST: u8 = 1;
+    pub const FSST_COST: u8 = 1;
+    pub const ROARING_BOOL_COST: u8 = 1;
+    pub const ROARING_INT_COST: u8 = 1;
+    pub const RUN_END_COST: u8 = 1;
+    pub const DELTA_COST: u8 = 1;
+}
+
+mod decompression {
+    // structural pass-throughs
+    pub const SPARSE_GIB_PER_S: f64 = f64::INFINITY;
+    pub const STRUCT_GIB_PER_S: f64 = f64::INFINITY;
+    pub const CHUNKED_GIB_PER_S: f64 = f64::INFINITY;
+
+    // benchmarked decompression throughput
+    pub const ALP_GIB_PER_S: f64 = 4.9;
+    pub const ALP_RD_GIB_PER_S: f64 = 3.3;
+    pub const BITPACKED_NO_PATCHES_GIB_PER_S: f64 = 36.0;
+    pub const BITPACKED_WITH_PATCHES_GIB_PER_S: f64 = 33.2;
+    pub const CONSTANT_GIB_PER_S: f64 = 200.0;
+    pub const DATE_TIME_PARTS_GIB_PER_S: f64 = 20.0; // this is a guess
+    pub const DELTA_GIB_PER_S: f64 = 4.7;
+    pub const DICT_GIB_PER_S: f64 = 10.0;
+    pub const FOR_GIB_PER_S: f64 = 8.7;
+    pub const FSST_GIB_PER_S: f64 = 2.0;
+    pub const ROARING_BOOL_GIB_PER_S: f64 = 5.0;
+    pub const ROARING_INT_GIB_PER_S: f64 = 5.0;
+    pub const RUN_END_GIB_PER_S: f64 = 10.0;
+    pub const ZIGZAG_GIB_PER_S: f64 = 25.5;
+}
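
The `f64::INFINITY` entries above are deliberate: under the per-node formula, structural pass-throughs contribute exactly zero decompression time, because a finite byte count divided by infinite throughput is zero. A one-line check of that IEEE-754 behavior (illustrative, not from the patch):

```rust
// Why infinity works for pass-through encodings like Sparse/Struct/Chunked:
// the per-node charge starts from (1000.0 / gib_per_s), and finite / +inf == +0.0.
fn main() {
    let ms_per_gib = 1000.0 / f64::INFINITY;
    assert_eq!(ms_per_gib, 0.0);
    println!("structural nodes cost {ms_per_gib} ms/GiB");
}
```
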
diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index 2585251ac..2b0cf84ee 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -1,28 +1,14 @@
-use std::fmt::{Debug, Display, Formatter};
 use std::sync::{Arc, LazyLock};
 
 use compressors::bitpacked::BITPACK_WITH_PATCHES;
-use compressors::chunked::DEFAULT_CHUNKED_COMPRESSOR;
 use compressors::fsst::FSSTCompressor;
-use compressors::struct_::StructCompressor;
-use log::{debug, warn};
-use rand::rngs::StdRng;
-use rand::SeedableRng;
+use compressors::{CompressedArray, CompressionTree, CompressorRef};
 use vortex_alp::{ALPEncoding, ALPRDEncoding};
-use vortex_array::aliases::hash_set::HashSet;
-use vortex_array::array::{ChunkedArray, Constant};
-use vortex_array::compress::{
-    check_dtype_unchanged, check_statistics_unchanged, check_validity_unchanged,
-    CompressionStrategy,
-};
-use vortex_array::compute::slice;
 use vortex_array::encoding::EncodingRef;
-use vortex_array::validity::Validity;
-use vortex_array::{Array, ArrayDType, ArrayDef, Context, IntoCanonical};
+use vortex_array::Context;
 use vortex_bytebool::ByteBoolEncoding;
 use vortex_datetime_parts::DateTimePartsEncoding;
 use vortex_dict::DictEncoding;
-use vortex_error::{VortexExpect as _, VortexResult};
 use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
 use vortex_fsst::FSSTEncoding;
 use vortex_roaring::{RoaringBoolEncoding, RoaringIntEncoding};
@@ -31,21 +17,21 @@ use vortex_runend_bool::RunEndBoolEncoding;
 use vortex_zigzag::ZigZagEncoding;
 
 use crate::compressors::alp::ALPCompressor;
-use crate::compressors::constant::ConstantCompressor;
 use crate::compressors::date_time_parts::DateTimePartsCompressor;
 use crate::compressors::dict::DictCompressor;
 use crate::compressors::r#for::FoRCompressor;
 use crate::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
 use crate::compressors::sparse::SparseCompressor;
 use crate::compressors::zigzag::ZigZagCompressor;
-use crate::compressors::{CompressedArray, CompressionTree, CompressorRef, EncodingCompressor};
-use crate::sampling::stratified_slices;
 
 #[cfg(feature = "arbitrary")]
 pub mod arbitrary;
 pub mod compressors;
 mod constants;
 mod sampling;
+mod sampling_compressor;
+
+pub use sampling_compressor::*;
 
 pub static DEFAULT_COMPRESSORS: LazyLock<[CompressorRef<'static>; 9]> = LazyLock::new(|| {
     [
@@ -95,9 +81,72 @@ pub static ALL_ENCODINGS_CONTEXT: LazyLock<Arc<Context>> = LazyLock::new(|| {
     ]))
 });
 
+#[derive(Debug, Clone)]
+pub struct ScanPerfConfig {
+    /// MiB per second of download throughput
+    mib_per_second: f64,
+    /// Compression ratio to assume when calculating decompression time
+    assumed_compression_ratio: f64,
+}
+
+impl ScanPerfConfig {
+    pub fn download_time_ms(&self, nbytes: u64) -> f64 {
+        const MS_PER_SEC: f64 = 1000.0;
+        const BYTES_PER_MIB: f64 = (1 << 20) as f64;
+        (MS_PER_SEC / self.mib_per_second) * (nbytes as f64 / BYTES_PER_MIB)
+    }
+}
+
+impl Default for ScanPerfConfig {
+    fn default() -> Self {
+        Self {
+            mib_per_second: 500.0, // 500 MiB/s for object storage
+            assumed_compression_ratio: 10.0, // 10:1 ratio of uncompressed data size to compressed data size
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub enum Objective {
     MinSize,
+    ScanPerf(ScanPerfConfig),
+}
+
+impl Objective {
+    pub fn starting_value(&self) -> f64 {
+        match self {
+            // if we're minimizing size, we should never choose a worse compression ratio than "uncompressed"
+            Objective::MinSize => 1.0,
+            // if we're maximizing performance, the units are in milliseconds
+            Objective::ScanPerf(_) => f64::INFINITY,
+        }
+    }
+
+    pub fn evaluate(
+        array: &CompressedArray,
+        base_size_bytes: usize,
+        config: &CompressConfig,
+    ) -> f64 {
+        let num_descendants = array
+            .path()
+            .as_ref()
+            .map(CompressionTree::num_descendants)
+            .unwrap_or(0) as u64;
+        let overhead_bytes = num_descendants * config.overhead_bytes_per_array;
+        let size_in_bytes = array.nbytes() as u64 + overhead_bytes;
+
+        match &config.objective {
+            Objective::MinSize => (size_in_bytes as f64) / (base_size_bytes as f64),
+            Objective::ScanPerf(config) => {
+                let download_time = config.download_time_ms(size_in_bytes);
+                let decompression_time =
+                    array.decompression_time_ms(config.assumed_compression_ratio);
+
+                // we take the geometric mean of download and decompression time
+                (download_time * decompression_time).sqrt()
+            }
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -131,7 +180,7 @@ impl Default for CompressConfig {
             sample_size: 64,
             sample_count: 16,
             max_cost: 3,
-            objective: Objective::MinSize,
+            objective: Objective::ScanPerf(ScanPerfConfig::default()),
             overhead_bytes_per_array: 64,
             target_block_bytesize: 16 * mib,
             target_block_size: 64 * kib,
@@ -139,336 +188,3 @@
         }
     }
 }
-
-#[derive(Debug, Clone)]
-pub struct SamplingCompressor<'a> {
-    compressors: HashSet<CompressorRef<'a>>,
-    options: CompressConfig,
-
-    path: Vec<String>,
-    depth: u8,
-    /// A set of encodings disabled for this ctx.
-    disabled_compressors: HashSet<CompressorRef<'a>>,
-}
-
-impl Display for SamplingCompressor<'_> {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "[{}|{}]", self.depth, self.path.join("."))
-    }
-}
-
-impl CompressionStrategy for SamplingCompressor<'_> {
-    #[allow(clippy::same_name_method)]
-    fn compress(&self, array: &Array) -> VortexResult<Array> {
-        Self::compress(self, array, None).map(CompressedArray::into_array)
-    }
-
-    fn used_encodings(&self) -> HashSet<EncodingRef> {
-        self.compressors
-            .iter()
-            .flat_map(|c| c.used_encodings())
-            .collect()
-    }
-}
-
-impl Default for SamplingCompressor<'_> {
-    fn default() -> Self {
-        Self::new(HashSet::from(*DEFAULT_COMPRESSORS))
-    }
-}
-
-impl<'a> SamplingCompressor<'a> {
-    pub fn new(compressors: HashSet<CompressorRef<'a>>) -> Self {
-        Self::new_with_options(compressors, Default::default())
-    }
-
-    pub fn new_with_options(
-        compressors: HashSet<CompressorRef<'a>>,
-        options: CompressConfig,
-    ) -> Self {
-        Self {
-            compressors,
-            options,
-            path: Vec::new(),
-            depth: 0,
-            disabled_compressors: HashSet::new(),
-        }
-    }
-
-    pub fn named(&self, name: &str) -> Self {
-        let mut cloned = self.clone();
-        cloned.path.push(name.into());
-        cloned
-    }
-
-    // Returns a new ctx used for compressing an auxiliary array.
-    // In practice, this means resetting any disabled encodings back to the original config.
-    pub fn auxiliary(&self, name: &str) -> Self {
-        let mut cloned = self.clone();
-        cloned.path.push(name.into());
-        cloned.disabled_compressors = HashSet::new();
-        cloned
-    }
-
-    pub fn for_compressor(&self, compression: &dyn EncodingCompressor) -> Self {
-        let mut cloned = self.clone();
-        cloned.depth += compression.cost();
-        cloned
-    }
-
-    #[inline]
-    pub fn options(&self) -> &CompressConfig {
-        &self.options
-    }
-
-    pub fn excluding(&self, compressor: CompressorRef<'a>) -> Self {
-        let mut cloned = self.clone();
-        cloned.disabled_compressors.insert(compressor);
-        cloned
-    }
-
-    pub fn including(&self, compressor: CompressorRef<'a>) -> Self {
-        let mut cloned = self.clone();
-        cloned.compressors.insert(compressor);
-        cloned
-    }
-
-    #[allow(clippy::same_name_method)]
-    pub fn compress(
-        &self,
-        arr: &Array,
-        like: Option<&CompressionTree<'a>>,
-    ) -> VortexResult<CompressedArray<'a>> {
-        if arr.is_empty() {
-            return Ok(CompressedArray::uncompressed(arr.clone()));
-        }
-
-        // Attempt to compress using the "like" array, otherwise fall back to sampled compression
-        if let Some(l) = like {
-            if let Some(compressed) = l.compress(arr, self) {
-                let compressed = compressed?;
-
-                check_validity_unchanged(arr, compressed.as_ref());
-                check_dtype_unchanged(arr, compressed.as_ref());
-                check_statistics_unchanged(arr, compressed.as_ref());
-                return Ok(compressed);
-            } else {
-                warn!(
-                    "{} cannot find compressor to compress {} like {}",
-                    self, arr, l
-                );
-            }
-        }
-
-        // Otherwise, attempt to compress the array
-        let compressed = self.compress_array(arr)?;
-
-        check_validity_unchanged(arr, compressed.as_ref());
-        check_dtype_unchanged(arr, compressed.as_ref());
-        check_statistics_unchanged(arr, compressed.as_ref());
-        Ok(compressed)
-    }
-
-    pub fn compress_validity(&self, validity: Validity) -> VortexResult<Validity> {
-        match validity {
-            Validity::Array(a) => Ok(Validity::Array(self.compress(&a, None)?.into_array())),
-            a => Ok(a),
-        }
-    }
-
-    fn compress_array(&self, array: &Array) -> VortexResult<CompressedArray<'a>> {
-        let mut rng = StdRng::seed_from_u64(self.options.rng_seed);
-
-        if array.encoding().id() == Constant::ID {
-            // Not much better we can do than constant!
-            return Ok(CompressedArray::uncompressed(array.clone()));
-        }
-
-        if let Some(cc) = DEFAULT_CHUNKED_COMPRESSOR.can_compress(array) {
-            return cc.compress(array, None, self.clone());
-        }
-
-        if let Some(cc) = StructCompressor.can_compress(array) {
-            return cc.compress(array, None, self.clone());
-        }
-
-        if let Some(cc) = ConstantCompressor.can_compress(array) {
-            return cc.compress(array, None, self.clone());
-        }
-
-        let (mut candidates, too_deep) = self
-            .compressors
-            .iter()
-            .filter(|&encoding| !self.disabled_compressors.contains(encoding))
-            .filter(|&encoding| encoding.can_compress(array).is_some())
-            .partition::<Vec<&dyn EncodingCompressor>, _>(|&encoding| {
-                self.depth + encoding.cost() <= self.options.max_cost
-            });
-
-        if !too_deep.is_empty() {
-            debug!(
-                "{} skipping encodings due to depth/cost: {}",
-                self,
-                too_deep
-                    .iter()
-                    .map(|x| x.id())
-                    .collect::<Vec<_>>()
-                    .join(", ")
-            );
-        }
-
-        debug!("{} candidates for {}: {:?}", self, array, candidates);
-
-        if candidates.is_empty() {
-            debug!(
-                "{} no compressors for array with dtype: {} and encoding: {}",
-                self,
-                array.dtype(),
-                array.encoding().id(),
-            );
-            return Ok(CompressedArray::uncompressed(array.clone()));
-        }
-
-        // We prefer all other candidates to the array's own encoding.
-        // This is because we assume that the array's own encoding is the least efficient, but useful
-        // to destructure an array in the final stages of compression. e.g. VarBin would be DictEncoded
-        // but then the dictionary itself remains a VarBin array. DictEncoding excludes itself from the
-        // dictionary, but we still have a large offsets array that should be compressed.
-        // TODO(ngates): we actually probably want some way to prefer dict encoding over other varbin
-        // encodings, e.g. FSST.
-        if candidates.len() > 1 {
-            candidates.retain(|&compression| compression.id() != array.encoding().id().as_ref());
-        }
-
-        if array.len() <= (self.options.sample_size as usize * self.options.sample_count as usize) {
-            // We're either already within a sample, or we're operating over a sufficiently small array.
-            return find_best_compression(candidates, array, self);
-        }
-
-        // Take a sample of the array, then ask codecs for their best compression estimate.
-        let sample = ChunkedArray::try_new(
-            stratified_slices(
-                array.len(),
-                self.options.sample_size,
-                self.options.sample_count,
-                &mut rng,
-            )
-            .into_iter()
-            .map(|(start, stop)| slice(array, start, stop))
-            .collect::<VortexResult<Vec<_>>>()?,
-            array.dtype().clone(),
-        )?
-        .into_canonical()?
-        .into();
-
-        let best = find_best_compression(candidates, &sample, self)?
-            .into_path()
-            .map(|best_compressor| {
-                debug!(
-                    "{} Compressing array {} with {}",
-                    self, array, best_compressor
-                );
-                best_compressor.compress_unchecked(array, self)
-            })
-            .transpose()?;
-
-        Ok(best.unwrap_or_else(|| CompressedArray::uncompressed(array.clone())))
-    }
-}
-
-fn find_best_compression<'a>(
-    candidates: Vec<&'a dyn EncodingCompressor>,
-    sample: &Array,
-    ctx: &SamplingCompressor<'a>,
-) -> VortexResult<CompressedArray<'a>> {
-    let mut best = None;
-    let mut best_objective = 1.0;
-    let mut best_objective_ratio = 1.0;
-    // for logging
-    let mut best_ratio = 1.0;
-    let mut best_ratio_sample = None;
-
-    for compression in candidates {
-        debug!(
-            "{} trying candidate {} for {}",
-            ctx,
-            compression.id(),
-            sample
-        );
-        if compression.can_compress(sample).is_none() {
-            continue;
-        }
-        let compressed_sample =
-            compression.compress(sample, None, ctx.for_compressor(compression))?;
-
-        let ratio = (compressed_sample.nbytes() as f64) / (sample.nbytes() as f64);
-        let objective = objective_function(&compressed_sample, sample.nbytes(), ctx.options());
-
-        // track the compression ratio, just for logging
-        if ratio < best_ratio {
-            best_ratio = ratio;
-
-            // if we find one with a better compression ratio but worse objective value, save it
-            // for debug logging later.
-            if ratio < best_objective_ratio && objective >= best_objective {
-                best_ratio_sample = Some(compressed_sample.clone());
-            }
-        }
-
-        if objective < best_objective {
-            best_objective = objective;
-            best_objective_ratio = ratio;
-            best = Some(compressed_sample);
-        }
-
-        debug!(
-            "{} with {}: ratio ({}), objective fn value ({}); best so far: ratio ({}), objective fn value ({})",
-            ctx,
-            compression.id(),
-            ratio,
-            objective,
-            best_ratio,
-            best_objective
-        );
-    }
-
-    let best = best.unwrap_or_else(|| CompressedArray::uncompressed(sample.clone()));
-    if best_ratio < best_objective_ratio && best_ratio_sample.is_some() {
-        let best_ratio_sample =
-            best_ratio_sample.vortex_expect("already checked that this Option is Some");
-        debug!(
-            "{} best objective fn value ({}) has ratio {} from {}",
-            ctx,
-            best_objective,
-            best_ratio,
-            best.array().tree_display()
-        );
-        debug!(
-            "{} best ratio ({}) has objective fn value {} from {}",
-            ctx,
-            best_ratio,
-            best_objective,
-            best_ratio_sample.array().tree_display()
-        );
-    }
-
-    Ok(best)
-}
-
-fn objective_function(
-    array: &CompressedArray,
-    base_size_bytes: usize,
-    config: &CompressConfig,
-) -> f64 {
-    let num_descendants = array
-        .path()
-        .as_ref()
-        .map(CompressionTree::num_descendants)
-        .unwrap_or(0) as u64;
-    let overhead_bytes = num_descendants * config.overhead_bytes_per_array;
-    let size_in_bytes = array.nbytes() as u64 + overhead_bytes;
-
-    match &config.objective {
-        Objective::MinSize => (size_in_bytes as f64) / (base_size_bytes as f64),
-    }
-}
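
A worked example of the new `ScanPerf` objective, standalone with hypothetical sizes (the real code paths are `ScanPerfConfig::download_time_ms` and `CompressedArray::decompression_time_ms` above; a single-node tree is assumed here):

```rust
// Default ScanPerfConfig: 500 MiB/s download, 10:1 assumed compression ratio.
fn main() {
    let size_in_bytes = 8.0 * 1024.0 * 1024.0; // 8 MiB compressed (hypothetical)

    // download_time_ms: (1000 / 500 MiB/s) * (bytes / 2^20) = 16 ms
    let download_ms = (1000.0 / 500.0) * (size_in_bytes / (1 << 20) as f64);

    // decompression_time_ms for one ALP node at 4.9 GiB/s, 10:1 ratio: ~15.9 ms
    let decompress_ms = (1000.0 / 4.9) * 10.0 * size_in_bytes / 1_073_741_824.0;

    // the objective is the geometric mean of the two times (lower is better)
    let objective = (download_ms * decompress_ms).sqrt();
    println!("{download_ms:.1} ms download, {decompress_ms:.1} ms decompress -> {objective:.1}");
}
```
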
diff --git a/vortex-sampling-compressor/src/sampling_compressor.rs b/vortex-sampling-compressor/src/sampling_compressor.rs
new file mode 100644
index 000000000..0c16d2e06
--- /dev/null
+++ b/vortex-sampling-compressor/src/sampling_compressor.rs
@@ -0,0 +1,340 @@
+use core::fmt::Formatter;
+use std::fmt::Display;
+
+use log::{debug, warn};
+use rand::rngs::StdRng;
+use rand::SeedableRng as _;
+use vortex_array::aliases::hash_set::HashSet;
+use vortex_array::array::{ChunkedArray, Constant};
+use vortex_array::compress::{
+    check_dtype_unchanged, check_statistics_unchanged, check_validity_unchanged,
+    CompressionStrategy,
+};
+use vortex_array::compute::slice;
+use vortex_array::encoding::EncodingRef;
+use vortex_array::validity::Validity;
+use vortex_array::{Array, ArrayDType as _, ArrayDef as _, IntoCanonical as _};
+use vortex_error::{VortexExpect as _, VortexResult};
+
+use super::compressors::chunked::DEFAULT_CHUNKED_COMPRESSOR;
+use super::compressors::struct_::StructCompressor;
+use super::{CompressConfig, Objective, DEFAULT_COMPRESSORS};
+use crate::compressors::constant::ConstantCompressor;
+use crate::compressors::{CompressedArray, CompressionTree, CompressorRef, EncodingCompressor};
+use crate::sampling::stratified_slices;
+
+#[derive(Debug, Clone)]
+pub struct SamplingCompressor<'a> {
+    compressors: HashSet<CompressorRef<'a>>,
+    options: CompressConfig,
+
+    path: Vec<String>,
+    depth: u8,
+    /// A set of encodings disabled for this ctx.
+    disabled_compressors: HashSet<CompressorRef<'a>>,
+}
+
+impl Display for SamplingCompressor<'_> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "[{}|{}]", self.depth, self.path.join("."))
+    }
+}
+
+impl CompressionStrategy for SamplingCompressor<'_> {
+    #[allow(clippy::same_name_method)]
+    fn compress(&self, array: &Array) -> VortexResult<Array> {
+        Self::compress(self, array, None).map(CompressedArray::into_array)
+    }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        self.compressors
+            .iter()
+            .flat_map(|c| c.used_encodings())
+            .collect()
+    }
+}
+
+impl Default for SamplingCompressor<'_> {
+    fn default() -> Self {
+        Self::new(HashSet::from(*DEFAULT_COMPRESSORS))
+    }
+}
+
+impl<'a> SamplingCompressor<'a> {
+    pub fn new(compressors: HashSet<CompressorRef<'a>>) -> Self {
+        Self::new_with_options(compressors, Default::default())
+    }
+
+    pub fn new_with_options(
+        compressors: HashSet<CompressorRef<'a>>,
+        options: CompressConfig,
+    ) -> Self {
+        Self {
+            compressors,
+            options,
+            path: Vec::new(),
+            depth: 0,
+            disabled_compressors: HashSet::new(),
+        }
+    }
+
+    pub fn named(&self, name: &str) -> Self {
+        let mut cloned = self.clone();
+        cloned.path.push(name.into());
+        cloned
+    }
+
+    // Returns a new ctx used for compressing an auxiliary array.
+    // In practice, this means resetting any disabled encodings back to the original config.
+    pub fn auxiliary(&self, name: &str) -> Self {
+        let mut cloned = self.clone();
+        cloned.path.push(name.into());
+        cloned.disabled_compressors = HashSet::new();
+        cloned
+    }
+
+    pub fn for_compressor(&self, compression: &dyn EncodingCompressor) -> Self {
+        let mut cloned = self.clone();
+        cloned.depth += compression.cost();
+        cloned
+    }
+
+    #[inline]
+    pub fn options(&self) -> &CompressConfig {
+        &self.options
+    }
+
+    pub fn excluding(&self, compressor: CompressorRef<'a>) -> Self {
+        let mut cloned = self.clone();
+        cloned.disabled_compressors.insert(compressor);
+        cloned
+    }
+
+    pub fn including(&self, compressor: CompressorRef<'a>) -> Self {
+        let mut cloned = self.clone();
+        cloned.compressors.insert(compressor);
+        cloned
+    }
+
+    #[allow(clippy::same_name_method)]
+    pub fn compress(
+        &self,
+        arr: &Array,
+        like: Option<&CompressionTree<'a>>,
+    ) -> VortexResult<CompressedArray<'a>> {
+        if arr.is_empty() {
+            return Ok(CompressedArray::uncompressed(arr.clone()));
+        }
+
+        // Attempt to compress using the "like" array, otherwise fall back to sampled compression
+        if let Some(l) = like {
+            if let Some(compressed) = l.compress(arr, self) {
+                let compressed = compressed?;
+
+                check_statistics_unchanged(arr, compressed.as_ref());
+                check_validity_unchanged(arr, compressed.as_ref());
+                check_dtype_unchanged(arr, compressed.as_ref());
+                return Ok(compressed);
+            } else {
+                warn!(
+                    "{} cannot find compressor to compress {} like {}",
+                    self, arr, l
+                );
+            }
+        }
+
+        // Otherwise, attempt to compress the array
+        let compressed = self.compress_array(arr)?;
+
+        check_statistics_unchanged(arr, compressed.as_ref());
+        check_validity_unchanged(arr, compressed.as_ref());
+        check_dtype_unchanged(arr, compressed.as_ref());
+        Ok(compressed)
+    }
+
+    pub fn compress_validity(&self, validity: Validity) -> VortexResult<Validity> {
+        match validity {
+            Validity::Array(a) => Ok(Validity::Array(self.compress(&a, None)?.into_array())),
+            a => Ok(a),
+        }
+    }
+
+    pub(crate) fn compress_array(&self, array: &Array) -> VortexResult<CompressedArray<'a>> {
+        let mut rng = StdRng::seed_from_u64(self.options.rng_seed);
+
+        if array.encoding().id() == Constant::ID {
+            // Not much better we can do than constant!
+            return Ok(CompressedArray::uncompressed(array.clone()));
+        }
+
+        if let Some(cc) = DEFAULT_CHUNKED_COMPRESSOR.can_compress(array) {
+            return cc.compress(array, None, self.clone());
+        }
+
+        if let Some(cc) = StructCompressor.can_compress(array) {
+            return cc.compress(array, None, self.clone());
+        }
+
+        if let Some(cc) = ConstantCompressor.can_compress(array) {
+            return cc.compress(array, None, self.clone());
+        }
+
+        let (mut candidates, too_deep) = self
+            .compressors
+            .iter()
+            .filter(|&encoding| !self.disabled_compressors.contains(encoding))
+            .filter(|&encoding| encoding.can_compress(array).is_some())
+            .partition::<Vec<&dyn EncodingCompressor>, _>(|&encoding| {
+                self.depth + encoding.cost() <= self.options.max_cost
+            });
+
+        if !too_deep.is_empty() {
+            debug!(
+                "{} skipping encodings due to depth/cost: {}",
+                self,
+                too_deep
+                    .iter()
+                    .map(|x| x.id())
+                    .collect::<Vec<_>>()
+                    .join(", ")
+            );
+        }
+
+        debug!("{} candidates for {}: {:?}", self, array, candidates);
+
+        if candidates.is_empty() {
+            debug!(
+                "{} no compressors for array with dtype: {} and encoding: {}",
+                self,
+                array.dtype(),
+                array.encoding().id(),
+            );
+            return Ok(CompressedArray::uncompressed(array.clone()));
+        }
+
+        // We prefer all other candidates to the array's own encoding.
+        // This is because we assume that the array's own encoding is the least efficient, but useful
+        // to destructure an array in the final stages of compression. e.g. VarBin would be DictEncoded
+        // but then the dictionary itself remains a VarBin array. DictEncoding excludes itself from the
+        // dictionary, but we still have a large offsets array that should be compressed.
+        // TODO(ngates): we actually probably want some way to prefer dict encoding over other varbin
+        // encodings, e.g. FSST.
+        if candidates.len() > 1 {
+            candidates.retain(|&compression| compression.id() != array.encoding().id().as_ref());
+        }
+
+        if array.len() <= (self.options.sample_size as usize * self.options.sample_count as usize) {
+            // We're either already within a sample, or we're operating over a sufficiently small array.
+            return find_best_compression(candidates, array, self);
+        }
+
+        // Take a sample of the array, then ask codecs for their best compression estimate.
+        let sample = ChunkedArray::try_new(
+            stratified_slices(
+                array.len(),
+                self.options.sample_size,
+                self.options.sample_count,
+                &mut rng,
+            )
+            .into_iter()
+            .map(|(start, stop)| slice(array, start, stop))
+            .collect::<VortexResult<Vec<_>>>()?,
+            array.dtype().clone(),
+        )?
+        .into_canonical()?
+        .into();
+
+        let best = find_best_compression(candidates, &sample, self)?
+            .into_path()
+            .map(|best_compressor| {
+                debug!(
+                    "{} Compressing array {} with {}",
+                    self, array, best_compressor
+                );
+                best_compressor.compress_unchecked(array, self)
+            })
+            .transpose()?;
+
+        Ok(best.unwrap_or_else(|| CompressedArray::uncompressed(array.clone())))
+    }
+}
+
+pub(crate) fn find_best_compression<'a>(
+    candidates: Vec<&'a dyn EncodingCompressor>,
+    sample: &Array,
+    ctx: &SamplingCompressor<'a>,
+) -> VortexResult<CompressedArray<'a>> {
+    let mut best = None;
+    let mut best_objective = ctx.options().objective.starting_value();
+    let mut best_objective_ratio = 1.0;
+    // for logging
+    let mut best_compression_ratio = 1.0;
+    let mut best_compression_ratio_sample = None;
+
+    for compression in candidates {
+        debug!(
+            "{} trying candidate {} for {}",
+            ctx,
+            compression.id(),
+            sample
+        );
+        if compression.can_compress(sample).is_none() {
+            continue;
+        }
+        let compressed_sample =
+            compression.compress(sample, None, ctx.for_compressor(compression))?;
+
+        let ratio = (compressed_sample.nbytes() as f64) / (sample.nbytes() as f64);
+        let objective = Objective::evaluate(&compressed_sample, sample.nbytes(), ctx.options());
+
+        // track the compression ratio, just for logging
+        if ratio < best_compression_ratio {
+            best_compression_ratio = ratio;
+
+            // if we find one with a better compression ratio but worse objective value, save it
+            // for debug logging later.
+            if ratio < best_objective_ratio && objective >= best_objective {
+                best_compression_ratio_sample = Some(compressed_sample.clone());
+            }
+        }
+
+        // don't consider anything that compresses to be *larger* than uncompressed
+        if objective < best_objective && ratio < 1.0 {
+            best_objective = objective;
+            best_objective_ratio = ratio;
+            best = Some(compressed_sample);
+        }
+
+        debug!(
+            "{} with {}: ratio ({}), objective fn value ({}); best so far: ratio ({}), objective fn value ({})",
+            ctx,
+            compression.id(),
+            ratio,
+            objective,
+            best_compression_ratio,
+            best_objective
+        );
+    }
+
+    let best = best.unwrap_or_else(|| CompressedArray::uncompressed(sample.clone()));
+    if best_compression_ratio < best_objective_ratio && best_compression_ratio_sample.is_some() {
+        let best_ratio_sample =
+            best_compression_ratio_sample.vortex_expect("already checked that this Option is Some");
+        debug!(
+            "{} best objective fn value ({}) has ratio {} from {}",
+            ctx,
+            best_objective,
+            best_compression_ratio,
+            best.array().tree_display()
+        );
+        debug!(
+            "{} best ratio ({}) has objective fn value {} from {}",
+            ctx,
+            best_compression_ratio,
+            best_objective,
+            best_ratio_sample.array().tree_display()
+        );
+    }
+
+    Ok(best)
+}
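
Since `CompressConfig::default()` now selects `Objective::ScanPerf(ScanPerfConfig::default())`, the stock compressor optimizes estimated scan time out of the box. A minimal usage sketch, assuming only the crate-root re-exports shown in `lib.rs` above:

```rust
use vortex_sampling_compressor::SamplingCompressor;

fn main() {
    // Default::default() wires in the ScanPerf objective: candidates are now
    // ranked by the geometric mean of estimated download and decompression
    // time, and anything that compresses larger than uncompressed is skipped.
    let compressor = SamplingCompressor::default();
    // Inherent API from sampling_compressor.rs above:
    //     let compressed = compressor.compress(&array, None)?;
    let _ = compressor;
}
```
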