Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ScanPerf objective in sampling compressor #1068

Open
wants to merge 50 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
2d92a40
wip
lwwmanning Oct 7, 2024
5baafcc
enable delta but with increased cost
lwwmanning Oct 8, 2024
a28d50f
bitpack with/without patches, only do without for patches
lwwmanning Oct 8, 2024
ceacc3e
only ALPRD after ALP, bitpack with patches (then without)
lwwmanning Oct 8, 2024
a5d19f8
alp used_encodings
lwwmanning Oct 8, 2024
8765d45
wip on mostly_constant
lwwmanning Oct 8, 2024
e52cc4e
Revert "wip on mostly_constant"
lwwmanning Oct 8, 2024
8a5d115
smattered fixes
lwwmanning Oct 8, 2024
7c7873d
estimates of decompression time
lwwmanning Oct 8, 2024
d405b51
wip
lwwmanning Oct 8, 2024
2637b59
throughput benchmarks
lwwmanning Oct 9, 2024
6d677bc
Merge branch 'develop' into wm/objective
lwwmanning Oct 9, 2024
7a2955f
better bench
lwwmanning Oct 9, 2024
00a0c1c
pull out the constants
lwwmanning Oct 9, 2024
686c123
revert run end stats changes
lwwmanning Oct 9, 2024
a746b7c
cost constants
lwwmanning Oct 9, 2024
8409fd1
twiddles
lwwmanning Oct 9, 2024
0d71fb1
better
lwwmanning Oct 9, 2024
675d51f
disable delta for now
lwwmanning Oct 9, 2024
89a594c
fmt
lwwmanning Oct 9, 2024
46131f0
haxors
lwwmanning Oct 9, 2024
45560d4
stuff and things
lwwmanning Oct 9, 2024
340bcce
switch to ScanPerf
lwwmanning Oct 9, 2024
23a9963
decompression bench
lwwmanning Oct 9, 2024
e5a9ca8
asdfa
lwwmanning Oct 9, 2024
73f6166
turn on delta
lwwmanning Oct 9, 2024
507e658
Revert "turn on delta"
lwwmanning Oct 9, 2024
af12eb8
Fmt
lwwmanning Oct 9, 2024
8f4e174
Merge remote-tracking branch 'origin/develop' into wm/scanperf
lwwmanning Oct 10, 2024
9397e97
fix up semantic merge conflicts
lwwmanning Oct 10, 2024
8797bb4
Merge remote-tracking branch 'origin/develop' into wm/scanperf
lwwmanning Oct 17, 2024
0ebf98e
cleanup
lwwmanning Oct 17, 2024
ba50423
extract sampling compressor
lwwmanning Oct 17, 2024
580ad50
rename
lwwmanning Oct 17, 2024
77387af
Merge remote-tracking branch 'origin/develop' into wm/scanperf
lwwmanning Oct 17, 2024
7d2de4c
more moving around
lwwmanning Oct 17, 2024
5bbd9cf
merging origin/develop
lwwmanning Nov 4, 2024
8b2b83e
Merge remote-tracking branch 'origin/develop' into wm/scanperf
lwwmanning Nov 4, 2024
c350e55
Merge remote-tracking branch 'origin/develop' into wm/scanperf
lwwmanning Nov 4, 2024
6c82456
fix doctest
lwwmanning Nov 7, 2024
12704c4
Merge remote-tracking branch 'origin/develop' into wm/scanperf
lwwmanning Nov 7, 2024
aad150f
fix merge conflict
lwwmanning Nov 7, 2024
c5d1101
fmt
lwwmanning Nov 7, 2024
798980b
fix
lwwmanning Nov 7, 2024
6c7027c
fix
lwwmanning Nov 7, 2024
b5b0143
Merge remote-tracking branch 'origin/develop' into wm/scanperf
lwwmanning Nov 7, 2024
3a7d3c9
Revert "fix"
lwwmanning Nov 7, 2024
d414df9
geometric mean, infinite starting value, and don't take things with c…
lwwmanning Nov 7, 2024
ed0abdc
Merge remote-tracking branch 'origin/develop' into wm/scanperf
lwwmanning Nov 7, 2024
dc987c7
fix and fmt
lwwmanning Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ impl EncodingCompressor for ALPCompressor {
constants::ALP_COST
}

/// Decompression throughput assumed for ALP-encoded data, in GiB per second.
///
/// Returns `constants::ALP_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::ALP_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/alp_rd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ impl EncodingCompressor for ALPRDCompressor {
constants::ALP_RD_COST
}

/// Decompression throughput assumed for ALP-RD-encoded data, in GiB per second.
///
/// Returns `constants::ALP_RD_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::ALP_RD_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
8 changes: 8 additions & 0 deletions vortex-sampling-compressor/src/compressors/bitpacked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ impl EncodingCompressor for BitPackedCompressor {
}
}

/// Decompression throughput assumed for bit-packed data, in GiB per second.
///
/// The figure depends on whether this compressor instance allows patches:
/// the with-patches and no-patches variants have separate constants.
fn decompression_gib_per_second(&self) -> f64 {
if self.allow_patches {
constants::BITPACKED_WITH_PATCHES_GIB_PER_S
} else {
constants::BITPACKED_NO_PATCHES_GIB_PER_S
}
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
8 changes: 6 additions & 2 deletions vortex-sampling-compressor/src/compressors/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vortex_error::{vortex_bail, VortexResult};

use super::EncoderMetadata;
use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;
use crate::{constants, SamplingCompressor};

#[derive(Debug)]
pub struct ChunkedCompressor {
Expand All @@ -35,7 +35,11 @@ impl EncodingCompressor for ChunkedCompressor {
}

fn cost(&self) -> u8 {
0
constants::CHUNKED_COST
}

/// Decompression throughput assumed for the chunked wrapper itself, in GiB per second.
///
/// Returns `constants::CHUNKED_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::CHUNKED_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ impl EncodingCompressor for ConstantCompressor {
constants::CONSTANT_COST
}

/// Decompression throughput assumed for constant-encoded data, in GiB per second.
///
/// Returns `constants::CONSTANT_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::CONSTANT_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
(!array.is_empty() && array.statistics().compute_is_constant().unwrap_or(false))
.then_some(self as &dyn EncodingCompressor)
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/date_time_parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ impl EncodingCompressor for DateTimePartsCompressor {
constants::DATE_TIME_PARTS_COST
}

/// Decompression throughput assumed for date-time-parts-encoded data, in GiB per second.
///
/// Returns `constants::DATE_TIME_PARTS_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::DATE_TIME_PARTS_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
if let Ok(temporal_array) = TemporalArray::try_from(array) {
match temporal_array.temporal_metadata() {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ impl EncodingCompressor for DeltaCompressor {
constants::DELTA_COST
}

/// Decompression throughput assumed for delta-encoded data, in GiB per second.
///
/// Returns `constants::DELTA_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::DELTA_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl EncodingCompressor for DictCompressor {
constants::DICT_COST
}

/// Decompression throughput assumed for dictionary-encoded data, in GiB per second.
///
/// Returns `constants::DICT_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::DICT_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
if array.encoding().id() != Primitive::ID
&& array.encoding().id() != VarBin::ID
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl EncodingCompressor for FoRCompressor {
constants::FOR_COST
}

/// Decompression throughput assumed for frame-of-reference-encoded data, in GiB per second.
///
/// Returns `constants::FOR_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::FOR_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ impl EncodingCompressor for FSSTCompressor {
constants::FSST_COST
}

/// Decompression throughput assumed for FSST-encoded data, in GiB per second.
///
/// Returns `constants::FSST_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::FSST_GIB_PER_S
}

fn can_compress(&self, array: &vortex_array::Array) -> Option<&dyn EncodingCompressor> {
// FSST arrays must have DType::Utf8.
//
Expand Down
32 changes: 32 additions & 0 deletions vortex-sampling-compressor/src/compressors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub trait EncodingCompressor: Sync + Send + Debug {

fn cost(&self) -> u8;

/// Throughput, in GiB per second, assumed when decompressing data produced by
/// this compressor.
///
/// `CompressedArray::decompression_time_ms` divides by this value, so an
/// implementation returning `f64::INFINITY` contributes no time of its own.
fn decompression_gib_per_second(&self) -> f64;

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor>;

fn compress<'a>(
Expand Down Expand Up @@ -222,6 +224,36 @@ impl<'a> CompressedArray<'a> {
pub fn nbytes(&self) -> usize {
self.array.nbytes()
}

/// Estimates the time needed to decompress this array, in milliseconds.
///
/// The estimate is this node's own term plus, recursively, the estimate of every
/// child. A node's own term is `assumed_compression_ratio * nbytes` (an approximation
/// of the decompressed size) divided by its compressor's
/// `decompression_gib_per_second()`.
///
/// Entries of the compression tree's `children` are paired positionally with the
/// children of the underlying array; pairing stops at the shorter of the two lists.
///
/// Returns `0.0` when this array has no compression tree.
pub fn decompression_time_ms(&self, assumed_compression_ratio: f64) -> f64 {
    const MS_PER_SEC: f64 = 1000.0;
    // 2^30 bytes: one GiB, matching the GiB/s unit of the throughput figures.
    const BYTES_PER_GIB: f64 = 1_073_741_824.0;

    // Without a compression tree there is nothing to account for.
    let tree = match self.path().as_ref() {
        Some(tree) => tree,
        None => return 0.0,
    };

    // Recurse into each (child tree, child array) pair with the same assumed ratio.
    let children_ms: f64 = tree
        .children
        .iter()
        .zip(self.array.children().iter())
        .map(|(child_tree, child_array)| {
            CompressedArray::new(child_array.clone(), child_tree.clone())
                .decompression_time_ms(assumed_compression_ratio)
        })
        .sum();

    // Milliseconds per GiB for this encoding, scaled by the estimated size in GiB.
    let own_ms = (MS_PER_SEC / tree.compressor().decompression_gib_per_second())
        * assumed_compression_ratio
        * self.nbytes() as f64
        / BYTES_PER_GIB;

    children_ms + own_ms
}
}

impl AsRef<Array> for CompressedArray<'_> {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/roaring_bool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl EncodingCompressor for RoaringBoolCompressor {
constants::ROARING_BOOL_COST
}

/// Decompression throughput assumed for roaring-bool-encoded data, in GiB per second.
///
/// Returns `constants::ROARING_BOOL_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::ROARING_BOOL_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support bool arrays
if array.encoding().id() != Bool::ID {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/roaring_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ impl EncodingCompressor for RoaringIntCompressor {
constants::ROARING_INT_COST
}

/// Decompression throughput assumed for roaring-int-encoded data, in GiB per second.
///
/// Returns `constants::ROARING_INT_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::ROARING_INT_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support non-nullable uint arrays
if !array.dtype().is_unsigned_int() || array.dtype().is_nullable() {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/runend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl EncodingCompressor for RunEndCompressor {
constants::RUN_END_COST
}

/// Decompression throughput assumed for run-end-encoded data, in GiB per second.
///
/// Returns `constants::RUN_END_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::RUN_END_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
if array.encoding().id() != Primitive::ID {
return None;
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/sparse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ impl EncodingCompressor for SparseCompressor {
constants::SPARSE_COST
}

/// Decompression throughput assumed for the sparse wrapper itself, in GiB per second.
///
/// Returns `constants::SPARSE_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::SPARSE_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
array.is_encoding(Sparse::ID).then_some(self)
}
Expand Down
8 changes: 6 additions & 2 deletions vortex-sampling-compressor/src/compressors/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use vortex_array::{Array, ArrayDef, IntoArray};
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;
use crate::{constants, SamplingCompressor};

#[derive(Debug)]
pub struct StructCompressor;
Expand All @@ -18,7 +18,11 @@ impl EncodingCompressor for StructCompressor {
}

fn cost(&self) -> u8 {
0
constants::STRUCT_COST
}

/// Decompression throughput assumed for the struct wrapper itself, in GiB per second.
///
/// Returns `constants::STRUCT_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::STRUCT_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/zigzag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl EncodingCompressor for ZigZagCompressor {
constants::ZIGZAG_COST
}

/// Decompression throughput assumed for zigzag-encoded data, in GiB per second.
///
/// Returns `constants::ZIGZAG_GIB_PER_S`.
fn decompression_gib_per_second(&self) -> f64 {
constants::ZIGZAG_GIB_PER_S
}

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
69 changes: 48 additions & 21 deletions vortex-sampling-compressor/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,53 @@
#![allow(dead_code)]
#![allow(unused_imports)]

// structural pass-throughs have no cost
pub const SPARSE_COST: u8 = 0;
// TODO: struct
// TODO: chunked
pub use cost::*;
pub use decompression::*;

// so fast that we can ignore the cost
pub const BITPACKED_NO_PATCHES_COST: u8 = 0;
pub const BITPACKED_WITH_PATCHES_COST: u8 = 0;
pub const CONSTANT_COST: u8 = 0;
pub const ZIGZAG_COST: u8 = 0;
mod cost {
// structural pass-throughs have no cost
pub const SPARSE_COST: u8 = 0;
pub const STRUCT_COST: u8 = 0;
pub const CHUNKED_COST: u8 = 0;

// "normal" encodings
pub const ALP_COST: u8 = 1;
pub const ALP_RD_COST: u8 = 1;
pub const DATE_TIME_PARTS_COST: u8 = 1;
pub const DICT_COST: u8 = 1;
pub const FOR_COST: u8 = 1;
pub const FSST_COST: u8 = 1;
pub const ROARING_BOOL_COST: u8 = 1;
pub const ROARING_INT_COST: u8 = 1;
pub const RUN_END_COST: u8 = 1;
// so fast that we can ignore the cost
pub const BITPACKED_NO_PATCHES_COST: u8 = 0;
pub const BITPACKED_WITH_PATCHES_COST: u8 = 0;
pub const CONSTANT_COST: u8 = 0;
pub const ZIGZAG_COST: u8 = 0;

// "expensive" encodings
pub const DELTA_COST: u8 = 2;
// "normal" encodings
pub const ALP_COST: u8 = 1;
pub const ALP_RD_COST: u8 = 1;
pub const DATE_TIME_PARTS_COST: u8 = 1;
pub const DICT_COST: u8 = 1;
pub const FOR_COST: u8 = 1;
pub const FSST_COST: u8 = 1;
pub const ROARING_BOOL_COST: u8 = 1;
pub const ROARING_INT_COST: u8 = 1;
pub const RUN_END_COST: u8 = 1;
pub const DELTA_COST: u8 = 1;
}

/// Per-encoding decompression throughput figures, all expressed in GiB per second.
mod decompression {
    // Structural pass-throughs. Their throughput is infinite, so dividing by it
    // yields a zero time contribution for the wrapper itself.
    pub const CHUNKED_GIB_PER_S: f64 = f64::INFINITY;
    pub const SPARSE_GIB_PER_S: f64 = f64::INFINITY;
    pub const STRUCT_GIB_PER_S: f64 = f64::INFINITY;

    // Benchmarked decompression throughput.

    // Floating-point encodings.
    pub const ALP_GIB_PER_S: f64 = 4.9;
    pub const ALP_RD_GIB_PER_S: f64 = 3.3;

    // Bit-packing, without and with patches.
    pub const BITPACKED_NO_PATCHES_GIB_PER_S: f64 = 36.0;
    pub const BITPACKED_WITH_PATCHES_GIB_PER_S: f64 = 33.2;

    // Remaining encodings, in alphabetical order.
    pub const CONSTANT_GIB_PER_S: f64 = 200.0;
    // This value is a guess rather than a measurement.
    pub const DATE_TIME_PARTS_GIB_PER_S: f64 = 20.0;
    pub const DELTA_GIB_PER_S: f64 = 4.7;
    pub const DICT_GIB_PER_S: f64 = 10.0;
    pub const FOR_GIB_PER_S: f64 = 8.7;
    pub const FSST_GIB_PER_S: f64 = 2.0;
    pub const ROARING_BOOL_GIB_PER_S: f64 = 5.0;
    pub const ROARING_INT_GIB_PER_S: f64 = 5.0;
    pub const RUN_END_GIB_PER_S: f64 = 10.0;
    pub const ZIGZAG_GIB_PER_S: f64 = 25.5;
}
Loading
Loading