diff --git a/Cargo.lock b/Cargo.lock index 68e909aa1..f75c23baf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4325,6 +4325,7 @@ dependencies = [ "rand", "rstest", "serde", + "static_assertions", "tokio", "vortex-buffer", "vortex-datetime-dtype", @@ -4424,6 +4425,7 @@ dependencies = [ name = "vortex-dict" version = "0.12.0" dependencies = [ + "arrow-array", "arrow-buffer", "criterion", "hashbrown 0.15.0", diff --git a/Cargo.toml b/Cargo.toml index 6c5681ebe..27cd15bd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,6 +125,7 @@ serde = "1.0.197" serde_json = "1.0.116" serde_test = "1.0.176" simplelog = { version = "0.12.2", features = ["paris"] } +static_assertions = "1" tar = "0.4" tempfile = "3" thiserror = "1.0.58" diff --git a/bench-vortex/benches/bytes_at.rs b/bench-vortex/benches/bytes_at.rs index 9526f3a4d..167d5a0bd 100644 --- a/bench-vortex/benches/bytes_at.rs +++ b/bench-vortex/benches/bytes_at.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use futures::executor::block_on; use futures::StreamExt; -use vortex::array::{PrimitiveArray, VarBinArray}; +use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray}; use vortex::validity::Validity; use vortex::{Context, IntoArray, IntoCanonical}; use vortex_dtype::{DType, Nullability}; @@ -24,7 +24,7 @@ fn array_data_fixture() -> VarBinArray { .unwrap() } -fn array_view_fixture() -> VarBinArray { +fn array_view_fixture() -> VarBinViewArray { let array_data = array_data_fixture(); let mut buffer = Vec::new(); @@ -42,7 +42,7 @@ fn array_view_fixture() -> VarBinArray { .unwrap() .into_canonical() .unwrap() - .into_varbin() + .into_varbinview() .unwrap() } diff --git a/bench-vortex/src/tpch/schema.rs b/bench-vortex/src/tpch/schema.rs index eb4971b66..6d15ec37f 100644 --- a/bench-vortex/src/tpch/schema.rs +++ b/bench-vortex/src/tpch/schema.rs @@ -7,62 +7,62 @@ use lazy_static::lazy_static; lazy_static! 
{ pub static ref NATION: Schema = Schema::new(vec![ Field::new("n_nationkey", DataType::Int64, false), - Field::new("n_name", DataType::Utf8, false), + Field::new("n_name", DataType::Utf8View, false), Field::new("n_regionkey", DataType::Int64, false), - Field::new("n_comment", DataType::Utf8, true), + Field::new("n_comment", DataType::Utf8View, true), ]); pub static ref REGION: Schema = Schema::new(vec![ Field::new("r_regionkey", DataType::Int64, false), - Field::new("r_name", DataType::Utf8, false), - Field::new("r_comment", DataType::Utf8, true), + Field::new("r_name", DataType::Utf8View, false), + Field::new("r_comment", DataType::Utf8View, true), ]); pub static ref PART: Schema = Schema::new(vec![ Field::new("p_partkey", DataType::Int64, false), - Field::new("p_name", DataType::Utf8, false), - Field::new("p_mfgr", DataType::Utf8, false), - Field::new("p_brand", DataType::Utf8, false), - Field::new("p_type", DataType::Utf8, false), + Field::new("p_name", DataType::Utf8View, false), + Field::new("p_mfgr", DataType::Utf8View, false), + Field::new("p_brand", DataType::Utf8View, false), + Field::new("p_type", DataType::Utf8View, false), Field::new("p_size", DataType::Int32, false), - Field::new("p_container", DataType::Utf8, false), + Field::new("p_container", DataType::Utf8View, false), Field::new("p_retailprice", DataType::Float64, false), - Field::new("p_comment", DataType::Utf8, false), + Field::new("p_comment", DataType::Utf8View, false), ]); pub static ref SUPPLIER: Schema = Schema::new(vec![ Field::new("s_suppkey", DataType::Int64, false), - Field::new("s_name", DataType::Utf8, false), - Field::new("s_address", DataType::Utf8, false), + Field::new("s_name", DataType::Utf8View, false), + Field::new("s_address", DataType::Utf8View, false), Field::new("s_nationkey", DataType::Int32, false), - Field::new("s_phone", DataType::Utf8, false), + Field::new("s_phone", DataType::Utf8View, false), Field::new("s_acctbal", DataType::Float64, false), - 
Field::new("s_comment", DataType::Utf8, false), + Field::new("s_comment", DataType::Utf8View, false), ]); pub static ref PARTSUPP: Schema = Schema::new(vec![ Field::new("ps_partkey", DataType::Int64, false), Field::new("ps_suppkey", DataType::Int64, false), Field::new("ps_availqty", DataType::Int64, false), Field::new("ps_supplycost", DataType::Float64, false), - Field::new("ps_comment", DataType::Utf8, false), + Field::new("ps_comment", DataType::Utf8View, false), ]); pub static ref CUSTOMER: Schema = Schema::new(vec![ Field::new("c_custkey", DataType::Int64, false), - Field::new("c_name", DataType::Utf8, false), - Field::new("c_address", DataType::Utf8, false), + Field::new("c_name", DataType::Utf8View, false), + Field::new("c_address", DataType::Utf8View, false), Field::new("c_nationkey", DataType::Int64, false), - Field::new("c_phone", DataType::Utf8, false), + Field::new("c_phone", DataType::Utf8View, false), Field::new("c_acctbal", DataType::Float64, false), - Field::new("c_mktsegment", DataType::Utf8, false), - Field::new("c_comment", DataType::Utf8, false), + Field::new("c_mktsegment", DataType::Utf8View, false), + Field::new("c_comment", DataType::Utf8View, false), ]); pub static ref ORDERS: Schema = Schema::new(vec![ Field::new("o_orderkey", DataType::Int64, false), Field::new("o_custkey", DataType::Int64, false), - Field::new("o_orderstatus", DataType::Utf8, false), + Field::new("o_orderstatus", DataType::Utf8View, false), Field::new("o_totalprice", DataType::Float64, false), Field::new("o_orderdate", DataType::Date32, false), - Field::new("o_orderpriority", DataType::Utf8, false), - Field::new("o_clerk", DataType::Utf8, false), + Field::new("o_orderpriority", DataType::Utf8View, false), + Field::new("o_clerk", DataType::Utf8View, false), Field::new("o_shippriority", DataType::Int32, false), - Field::new("o_comment", DataType::Utf8, false), + Field::new("o_comment", DataType::Utf8View, false), ]); pub static ref LINEITEM: Schema = Schema::new(vec![ 
Field::new("l_orderkey", DataType::Int64, false), @@ -73,13 +73,13 @@ lazy_static! { Field::new("l_extendedprice", DataType::Float64, false), Field::new("l_discount", DataType::Float64, false), Field::new("l_tax", DataType::Float64, false), - Field::new("l_returnflag", DataType::Utf8, false), - Field::new("l_linestatus", DataType::Utf8, false), + Field::new("l_returnflag", DataType::Utf8View, false), + Field::new("l_linestatus", DataType::Utf8View, false), Field::new("l_shipdate", DataType::Date32, false), Field::new("l_commitdate", DataType::Date32, false), Field::new("l_receiptdate", DataType::Date32, false), - Field::new("l_shipinstruct", DataType::Utf8, false), - Field::new("l_shipmode", DataType::Utf8, false), - Field::new("l_comment", DataType::Utf8, false), + Field::new("l_shipinstruct", DataType::Utf8View, false), + Field::new("l_shipmode", DataType::Utf8View, false), + Field::new("l_comment", DataType::Utf8View, false), ]); } diff --git a/encodings/dict/Cargo.toml b/encodings/dict/Cargo.toml index ae7c1615f..de79f9f5b 100644 --- a/encodings/dict/Cargo.toml +++ b/encodings/dict/Cargo.toml @@ -14,6 +14,7 @@ categories = { workspace = true } readme = { workspace = true } [dependencies] +arrow-array = { workspace = true } arrow-buffer = { workspace = true } hashbrown = { workspace = true } num-traits = { workspace = true } diff --git a/encodings/dict/src/compress.rs b/encodings/dict/src/compress.rs index 34c270a9c..035e47729 100644 --- a/encodings/dict/src/compress.rs +++ b/encodings/dict/src/compress.rs @@ -4,11 +4,14 @@ use hashbrown::hash_map::{Entry, RawEntryMut}; use hashbrown::{DefaultHashBuilder, HashMap}; use num_traits::AsPrimitive; use vortex::accessor::ArrayAccessor; -use vortex::array::{PrimitiveArray, VarBinArray}; +use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray}; use vortex::validity::Validity; -use vortex::{ArrayDType, IntoArray}; +use vortex::{ArrayDType, IntoArray, IntoCanonical}; use vortex_dtype::{match_each_native_ptype, 
DType, NativePType, ToBytes}; -use vortex_error::VortexExpect as _; +use vortex_error::{VortexExpect as _, VortexUnwrap}; + +/// Statically assigned code for a null value. +pub const NULL_CODE: u64 = 0; #[derive(Debug)] struct Value(T); @@ -49,7 +52,7 @@ pub fn dict_encode_typed_primitive( ArrayAccessor::::with_iterator(array, |iter| { for ov in iter { match ov { - None => codes.push(0), + None => codes.push(NULL_CODE), Some(&v) => { let code = match lookup_dict.entry(Value(v)) { Entry::Occupied(o) => *o.get(), @@ -89,6 +92,21 @@ pub fn dict_encode_varbin(array: &VarBinArray) -> (PrimitiveArray, VarBinArray) .vortex_expect("Failed to dictionary encode varbin array") } +/// Dictionary encode a VarbinViewArray. +pub fn dict_encode_varbinview(array: &VarBinViewArray) -> (PrimitiveArray, VarBinViewArray) { + let (codes, values) = array + .with_iterator(|iter| dict_encode_typed_varbin(array.dtype().clone(), iter)) + .vortex_unwrap(); + ( + codes, + values + .into_canonical() + .vortex_expect("VarBin to canonical") + .into_varbinview() + .vortex_expect("VarBinView"), + ) +} + fn lookup_bytes<'a, T: NativePType + AsPrimitive>( offsets: &'a [T], bytes: &'a [u8], diff --git a/encodings/dict/src/compute.rs b/encodings/dict/src/compute.rs index ccd42f001..5bf223630 100644 --- a/encodings/dict/src/compute.rs +++ b/encodings/dict/src/compute.rs @@ -66,11 +66,12 @@ impl SliceFn for DictArray { #[cfg(test)] mod test { - use vortex::array::{PrimitiveArray, VarBinArray}; + use vortex::accessor::ArrayAccessor; + use vortex::array::{PrimitiveArray, VarBinViewArray}; use vortex::{IntoArray, IntoArrayVariant, ToArray}; use vortex_dtype::{DType, Nullability}; - use crate::{dict_encode_typed_primitive, dict_encode_varbin, DictArray}; + use crate::{dict_encode_typed_primitive, dict_encode_varbinview, DictArray}; #[test] fn flatten_nullable_primitive() { @@ -90,20 +91,25 @@ mod test { #[test] fn flatten_nullable_varbin() { - let reference = VarBinArray::from_iter( + let reference = 
VarBinViewArray::from_iter( vec![Some("a"), Some("b"), None, Some("a"), None, Some("b")], DType::Utf8(Nullability::Nullable), ); - let (codes, values) = dict_encode_varbin(&reference); + assert_eq!(reference.len(), 6); + let (codes, values) = dict_encode_varbinview(&reference); let dict = DictArray::try_new(codes.into_array(), values.into_array()).unwrap(); - let flattened_dict = dict.to_array().into_varbin().unwrap(); + let flattened_dict = dict.to_array().into_varbinview().unwrap(); assert_eq!( - flattened_dict.offsets().into_primitive().unwrap().buffer(), - reference.offsets().into_primitive().unwrap().buffer() - ); - assert_eq!( - flattened_dict.bytes().into_primitive().unwrap().buffer(), - reference.bytes().into_primitive().unwrap().buffer() + flattened_dict + .with_iterator(|iter| iter + .map(|slice| slice.map(|s| s.to_vec())) + .collect::>()) + .unwrap(), + reference + .with_iterator(|iter| iter + .map(|slice| slice.map(|s| s.to_vec())) + .collect::>()) + .unwrap(), ); } } diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index d7586f2e6..c7c36a859 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -4,12 +4,12 @@ use std::sync::Arc; use fsst::{Decompressor, Symbol}; use serde::{Deserialize, Serialize}; use vortex::array::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use vortex::array::VarBinArray; +use vortex::array::{VarBin, VarBinArray}; use vortex::encoding::ids; use vortex::stats::{ArrayStatisticsCompute, StatsSet}; use vortex::validity::{ArrayValidity, LogicalValidity, Validity}; use vortex::variants::{ArrayVariants, BinaryArrayTrait, Utf8ArrayTrait}; -use vortex::{impl_encoding, Array, ArrayDType, ArrayTrait, IntoCanonical}; +use vortex::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoCanonical}; use vortex_dtype::{DType, Nullability, PType}; use vortex_error::{vortex_bail, VortexExpect, VortexResult}; @@ -73,6 +73,13 @@ impl FSSTArray { vortex_bail!(InvalidArgument: "uncompressed_lengths 
must have integer type and cannot be nullable"); } + if codes.encoding().id() != VarBin::ID { + vortex_bail!( + InvalidArgument: "codes must have varbin encoding, was {}", + codes.encoding().id() + ); + } + // Check: strings must be a Binary array. if !matches!(codes.dtype(), DType::Binary(_)) { vortex_bail!(InvalidArgument: "codes array must be DType::Binary type"); diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index be2a0cbc9..6dd9138e6 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -1,5 +1,5 @@ use vortex::array::{PrimitiveArray, VarBinArray}; -use vortex::{ArrayDType, Canonical, IntoArray, IntoCanonical}; +use vortex::{ArrayDType, Canonical, IntoArray, IntoArrayVariant, IntoCanonical}; use vortex_error::VortexResult; use crate::FSSTArray; @@ -44,12 +44,15 @@ impl IntoCanonical for FSSTArray { let offsets_array = PrimitiveArray::from(offsets).into_array(); let uncompressed_bytes_array = PrimitiveArray::from(uncompressed_bytes).into_array(); - Ok(Canonical::VarBin(VarBinArray::try_new( - offsets_array, - uncompressed_bytes_array, - self.dtype().clone(), - self.validity(), - )?)) + Ok(Canonical::VarBinView( + VarBinArray::try_new( + offsets_array, + uncompressed_bytes_array, + self.dtype().clone(), + self.validity(), + )? 
+ .into_varbinview()?, + )) }) } } diff --git a/encodings/fsst/src/compute.rs b/encodings/fsst/src/compute.rs index a923e8c8e..e2793bd8a 100644 --- a/encodings/fsst/src/compute.rs +++ b/encodings/fsst/src/compute.rs @@ -1,14 +1,22 @@ -use vortex::array::varbin_scalar; +use fsst::Symbol; +use vortex::array::{varbin_scalar, ConstantArray}; use vortex::compute::unary::{scalar_at_unchecked, ScalarAtFn}; -use vortex::compute::{filter, slice, take, ArrayCompute, FilterFn, SliceFn, TakeFn}; -use vortex::{Array, ArrayDType, IntoArray}; +use vortex::compute::{ + compare, filter, slice, take, ArrayCompute, FilterFn, MaybeCompareFn, Operator, SliceFn, TakeFn, +}; +use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; use vortex_buffer::Buffer; +use vortex_dtype::DType; use vortex_error::{vortex_err, VortexResult, VortexUnwrap}; use vortex_scalar::Scalar; use crate::FSSTArray; impl ArrayCompute for FSSTArray { + fn compare(&self, other: &Array, operator: Operator) -> Option> { + MaybeCompareFn::maybe_compare(self, other, operator) + } + fn filter(&self) -> Option<&dyn FilterFn> { Some(self) } @@ -26,6 +34,68 @@ impl ArrayCompute for FSSTArray { } } +impl MaybeCompareFn for FSSTArray { + fn maybe_compare(&self, other: &Array, operator: Operator) -> Option> { + match (ConstantArray::try_from(other), operator) { + (Ok(constant_array), Operator::Eq | Operator::NotEq) => Some(compare_fsst_constant( + self, + &constant_array, + operator == Operator::Eq, + )), + _ => None, + } + } +} + +/// Specialized compare function implementation used when performing equals or not equals against +/// a constant. 
+fn compare_fsst_constant( + left: &FSSTArray, + right: &ConstantArray, + equal: bool, +) -> VortexResult { + let symbols = left.symbols().into_primitive()?; + let symbols_u64 = symbols.maybe_null_slice::(); + + let symbol_lens = left.symbol_lengths().into_primitive()?; + let symbol_lens_u8 = symbol_lens.maybe_null_slice::(); + + let mut compressor = fsst::CompressorBuilder::new(); + for (symbol, symbol_len) in symbols_u64.iter().zip(symbol_lens_u8.iter()) { + compressor.insert(Symbol::from_slice(&symbol.to_le_bytes()), *symbol_len as _); + } + let compressor = compressor.build(); + + let encoded_scalar = match left.dtype() { + DType::Utf8(_) => right + .scalar_value() + .as_buffer_string()? + .map(|scalar| Buffer::from(compressor.compress(scalar.as_bytes()))), + DType::Binary(_) => right + .scalar_value() + .as_buffer()? + .map(|scalar| Buffer::from(compressor.compress(scalar.as_slice()))), + + _ => unreachable!("FSSTArray can only have string or binary data type"), + }; + + match encoded_scalar { + None => { + // Eq and NotEq on null values yield nulls, per the Arrow behavior. 
+ Ok(right.clone().into_array()) + } + Some(encoded_scalar) => { + let rhs = ConstantArray::new(encoded_scalar, left.len()); + + compare( + left.codes(), + rhs, + if equal { Operator::Eq } else { Operator::NotEq }, + ) + } + } +} + impl SliceFn for FSSTArray { fn slice(&self, start: usize, stop: usize) -> VortexResult { // Slicing an FSST array leaves the symbol table unmodified, @@ -87,3 +157,75 @@ impl FilterFn for FSSTArray { .into_array()) } } + +#[cfg(test)] +mod tests { + use vortex::array::{ConstantArray, VarBinArray}; + use vortex::compute::unary::scalar_at_unchecked; + use vortex::compute::{MaybeCompareFn, Operator}; + use vortex::{IntoArray, IntoArrayVariant}; + use vortex_dtype::{DType, Nullability}; + use vortex_scalar::Scalar; + + use crate::{fsst_compress, fsst_train_compressor}; + + #[test] + fn test_compare_fsst() { + let lhs = VarBinArray::from_iter( + [ + Some("hello"), + None, + Some("world"), + None, + Some("this is a very long string"), + ], + DType::Utf8(Nullability::Nullable), + ) + .into_array(); + let compressor = fsst_train_compressor(&lhs).unwrap(); + let lhs = fsst_compress(&lhs, &compressor).unwrap(); + + let rhs = ConstantArray::new("world", lhs.len()).into_array(); + + // Ensure fastpath for Eq exists, and returns correct answer + let equals: Vec = MaybeCompareFn::maybe_compare(&lhs, &rhs, Operator::Eq) + .unwrap() + .unwrap() + .into_bool() + .unwrap() + .boolean_buffer() + .into_iter() + .collect(); + + assert_eq!(equals, vec![false, false, true, false, false]); + + // Ensure fastpath for Eq exists, and returns correct answer + let not_equals: Vec = MaybeCompareFn::maybe_compare(&lhs, &rhs, Operator::NotEq) + .unwrap() + .unwrap() + .into_bool() + .unwrap() + .boolean_buffer() + .into_iter() + .collect(); + + assert_eq!(not_equals, vec![true, true, false, true, true]); + + // Ensure null constants are handled correctly. 
+ let null_rhs = + ConstantArray::new(Scalar::null(DType::Utf8(Nullability::Nullable)), lhs.len()); + let equals_null = MaybeCompareFn::maybe_compare(&lhs, null_rhs.as_ref(), Operator::Eq) + .unwrap() + .unwrap(); + for idx in 0..lhs.len() { + assert!(scalar_at_unchecked(&equals_null, idx).is_null()); + } + + let noteq_null = MaybeCompareFn::maybe_compare(&lhs, null_rhs.as_ref(), Operator::NotEq) + .unwrap() + .unwrap(); + for idx in 0..lhs.len() { + assert!(scalar_at_unchecked(¬eq_null, idx).is_null()); + } + } +} diff --git a/encodings/fsst/tests/fsst_tests.rs b/encodings/fsst/tests/fsst_tests.rs index 4642d80d4..1ca6cdf14 100644 --- a/encodings/fsst/tests/fsst_tests.rs +++ b/encodings/fsst/tests/fsst_tests.rs @@ -102,7 +102,7 @@ fn test_fsst_array_ops() { .clone() .into_canonical() .unwrap() - .into_varbin() + .into_varbinview() .unwrap() .into_array(); diff --git a/fuzz/src/filter.rs b/fuzz/src/filter.rs index 8ff0f1634..b47d0eaef 100644 --- a/fuzz/src/filter.rs +++ b/fuzz/src/filter.rs @@ -1,5 +1,5 @@ use vortex::accessor::ArrayAccessor; -use vortex::array::{BoolArray, PrimitiveArray, StructArray, VarBinArray}; +use vortex::array::{BoolArray, PrimitiveArray, StructArray, VarBinViewArray}; use vortex::validity::{ArrayValidity, Validity}; use vortex::variants::StructArrayTrait; use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; @@ -60,7 +60,7 @@ pub fn filter_canonical_array(array: &Array, filter: &[bool]) -> Array { .into_array() }), DType::Utf8(_) | DType::Binary(_) => { - let utf8 = array.clone().into_varbin().unwrap(); + let utf8 = array.clone().into_varbinview().unwrap(); let values = utf8 .with_iterator(|iter| { iter.zip(filter.iter()) @@ -69,7 +69,7 @@ pub fn filter_canonical_array(array: &Array, filter: &[bool]) -> Array { .collect::>() }) .unwrap(); - VarBinArray::from_iter(values, array.dtype().clone()).into_array() + VarBinViewArray::from_iter(values, array.dtype().clone()).into_array() } DType::Struct(..) 
=> { let struct_array = array.clone().into_struct().unwrap(); diff --git a/fuzz/src/search_sorted.rs b/fuzz/src/search_sorted.rs index 03eb9129f..24f901792 100644 --- a/fuzz/src/search_sorted.rs +++ b/fuzz/src/search_sorted.rs @@ -99,7 +99,7 @@ pub fn search_sorted_canonical_array( }) } DType::Utf8(_) | DType::Binary(_) => { - let utf8 = array.clone().into_varbin().unwrap(); + let utf8 = array.clone().into_varbinview().unwrap(); let opt_values = utf8 .with_iterator(|iter| iter.map(|v| v.map(|u| u.to_vec())).collect::>()) .unwrap(); diff --git a/fuzz/src/slice.rs b/fuzz/src/slice.rs index 85ee760f4..2add1f645 100644 --- a/fuzz/src/slice.rs +++ b/fuzz/src/slice.rs @@ -1,5 +1,5 @@ use vortex::accessor::ArrayAccessor; -use vortex::array::{BoolArray, PrimitiveArray, StructArray, VarBinArray}; +use vortex::array::{BoolArray, PrimitiveArray, StructArray, VarBinViewArray}; use vortex::validity::{ArrayValidity, Validity}; use vortex::variants::StructArrayTrait; use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; @@ -46,11 +46,11 @@ pub fn slice_canonical_array(array: &Array, start: usize, stop: usize) -> Array .into_array() }), DType::Utf8(_) | DType::Binary(_) => { - let utf8 = array.clone().into_varbin().unwrap(); + let utf8 = array.clone().into_varbinview().unwrap(); let values = utf8 .with_iterator(|iter| iter.map(|v| v.map(|u| u.to_vec())).collect::>()) .unwrap(); - VarBinArray::from_iter(Vec::from(&values[start..stop]), array.dtype().clone()) + VarBinViewArray::from_iter(Vec::from(&values[start..stop]), array.dtype().clone()) .into_array() } DType::Struct(..) 
=> { diff --git a/fuzz/src/sort.rs b/fuzz/src/sort.rs index 8fa08b3a9..4af2110b3 100644 --- a/fuzz/src/sort.rs +++ b/fuzz/src/sort.rs @@ -1,7 +1,7 @@ use std::cmp::Ordering; use vortex::accessor::ArrayAccessor; -use vortex::array::{BoolArray, PrimitiveArray, VarBinArray}; +use vortex::array::{BoolArray, PrimitiveArray, VarBinViewArray}; use vortex::compute::unary::scalar_at; use vortex::validity::ArrayValidity; use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; @@ -53,12 +53,12 @@ pub fn sort_canonical_array(array: &Array) -> Array { }) } DType::Utf8(_) | DType::Binary(_) => { - let utf8 = array.clone().into_varbin().unwrap(); + let utf8 = array.clone().into_varbinview().unwrap(); let mut opt_values = utf8 .with_iterator(|iter| iter.map(|v| v.map(|u| u.to_vec())).collect::>()) .unwrap(); sort_opt_slice(&mut opt_values); - VarBinArray::from_iter(opt_values, array.dtype().clone()).into_array() + VarBinViewArray::from_iter(opt_values, array.dtype().clone()).into_array() } DType::Struct(..) 
=> { let mut sort_indices = (0..array.len()).collect::>(); diff --git a/fuzz/src/take.rs b/fuzz/src/take.rs index 59ee93d3f..fe9f41096 100644 --- a/fuzz/src/take.rs +++ b/fuzz/src/take.rs @@ -1,5 +1,5 @@ use vortex::accessor::ArrayAccessor; -use vortex::array::{BoolArray, PrimitiveArray, StructArray, VarBinArray}; +use vortex::array::{BoolArray, PrimitiveArray, StructArray, VarBinViewArray}; use vortex::validity::{ArrayValidity, Validity}; use vortex::variants::StructArrayTrait; use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; @@ -46,11 +46,11 @@ pub fn take_canonical_array(array: &Array, indices: &[usize]) -> Array { .into_array() }), DType::Utf8(_) | DType::Binary(_) => { - let utf8 = array.clone().into_varbin().unwrap(); + let utf8 = array.clone().into_varbinview().unwrap(); let values = utf8 .with_iterator(|iter| iter.map(|v| v.map(|u| u.to_vec())).collect::>()) .unwrap(); - VarBinArray::from_iter( + VarBinViewArray::from_iter( indices.iter().map(|i| values[*i].clone()), array.dtype().clone(), ) diff --git a/pyvortex/pyproject.toml b/pyvortex/pyproject.toml index e55b0a190..5cea0decf 100644 --- a/pyvortex/pyproject.toml +++ b/pyvortex/pyproject.toml @@ -5,7 +5,7 @@ name = "vortex-array" dynamic = ["version", "description", "authors"] readme = "README.md" dependencies = [ - "pyarrow>=15.0.0", + "pyarrow>=17.0.0", "substrait>=0.23.0", ] requires-python = ">= 3.11" @@ -35,7 +35,7 @@ build-backend = "maturin" managed = true dev-dependencies = [ "ipython>=8.26.0", - "pandas>=2.2.2", + "pandas>=2.2.3", "pip", "pyright>=1.1.385", "polars>=1.9.0", diff --git a/pyvortex/python/vortex/encoding.py b/pyvortex/python/vortex/encoding.py index c66b4fcce..ac522d375 100644 --- a/pyvortex/python/vortex/encoding.py +++ b/pyvortex/python/vortex/encoding.py @@ -5,6 +5,24 @@ from ._lib import encoding as _encoding +# HACK: monkey-patch a fixed implementation of the pd.ArrowDtype.type property accessor. 
+# See https://github.com/pandas-dev/pandas/issues/60068 for more details + +_old_ArrowDtype_type = pandas.ArrowDtype.type + + +@property +def __ArrowDtype_type_patched(self): + if pyarrow.types.is_string_view(self.pyarrow_dtype): + return str + if pyarrow.types.is_binary_view(self.pyarrow_dtype): + return bytes + return _old_ArrowDtype_type(self) + + +pandas.ArrowDtype.type = __ArrowDtype_type_patched + + if TYPE_CHECKING: import numpy @@ -52,7 +70,7 @@ def _Array_to_arrow_table(self: _encoding.Array) -> pyarrow.Table: >>> array.to_arrow_table() pyarrow.Table age: int64 - name: string + name: string_view ---- age: [[25,31,33,57]] name: [["Joseph","Narendra","Angela","Mikhail"]] @@ -94,6 +112,7 @@ def _Array_to_pandas(self: _encoding.Array) -> "pandas.DataFrame": 2 33 Angela 3 57 Mikhail + Lift the struct fields to the top-level in the dataframe: """ @@ -296,7 +315,7 @@ def array(obj: pyarrow.Array | list) -> Array: >>> arrow = pyarrow.array(['Hello', 'it', 'is', 'me']) >>> vortex.encoding.array(arrow).to_arrow_array() - + [ "Hello", "it", diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs index bff63bada..3daf16e2f 100644 --- a/pyvortex/src/array.rs +++ b/pyvortex/src/array.rs @@ -427,7 +427,7 @@ impl PyArray { /// >>> a = vortex.encoding.array(['a', 'b', 'c', 'd']) /// >>> indices = vortex.encoding.array([0, 2]) /// >>> a.take(indices).to_arrow_array() - /// + /// /// [ /// "a", /// "c" @@ -438,7 +438,7 @@ impl PyArray { /// >>> a = vortex.encoding.array(['a', 'b', 'c', 'd']) /// >>> indices = vortex.encoding.array([0, 1, 1, 0]) /// >>> a.take(indices).to_arrow_array() - /// + /// /// [ /// "a", /// "b", @@ -482,7 +482,7 @@ impl PyArray { /// /// >>> a = vortex.encoding.array(['a', 'b', 'c', 'd']) /// >>> a.slice(1, 3).to_arrow_array() - /// + /// /// [ /// "b", /// "c" @@ -492,7 +492,7 @@ impl PyArray { /// /// >>> a = vortex.encoding.array(['a', 'b', 'c', 'd']) /// >>> a.slice(3, 3).to_arrow_array() - /// + /// /// [] /// /// Unlike Python, it is an 
error to slice outside the bounds of the array: diff --git a/pyvortex/src/expr.rs b/pyvortex/src/expr.rs index 72230ed44..5b5abe1b2 100644 --- a/pyvortex/src/expr.rs +++ b/pyvortex/src/expr.rs @@ -37,7 +37,7 @@ use crate::dtype::PyDType; /// [ /// 57 /// ] -/// -- child 1 type: string +/// -- child 1 type: string_view /// [ /// "Mikhail" /// ] @@ -55,7 +55,7 @@ use crate::dtype::PyDType; /// 25, /// 31 /// ] -/// -- child 1 type: string +/// -- child 1 type: string_view /// [ /// "Joseph", /// null @@ -72,7 +72,7 @@ use crate::dtype::PyDType; /// [ /// 25 /// ] -/// -- child 1 type: string +/// -- child 1 type: string_view /// [ /// "Joseph" /// ] @@ -90,7 +90,7 @@ use crate::dtype::PyDType; /// [ /// 25 /// ] -/// -- child 1 type: string +/// -- child 1 type: string_view /// [ /// "Joseph" /// ] diff --git a/pyvortex/src/io.rs b/pyvortex/src/io.rs index c49ad0d86..b7622f732 100644 --- a/pyvortex/src/io.rs +++ b/pyvortex/src/io.rs @@ -50,7 +50,7 @@ use crate::{PyArray, TOKIO_RUNTIME}; /// 57, /// null /// ] -/// -- child 1 type: string +/// -- child 1 type: string_view /// [ /// "Joseph", /// null, @@ -80,7 +80,7 @@ use crate::{PyArray, TOKIO_RUNTIME}; /// >>> d.to_arrow_array() /// /// -- is_valid: all not null -/// -- child 0 type: string +/// -- child 0 type: string_view /// [ /// "Joseph", /// null, @@ -100,7 +100,7 @@ use crate::{PyArray, TOKIO_RUNTIME}; /// [ /// 57 /// ] -/// -- child 1 type: string +/// -- child 1 type: string_view /// [ /// "Mikhail" /// ] diff --git a/pyvortex/test/test_array.py b/pyvortex/test/test_array.py index b71167c56..9c7c10e02 100644 --- a/pyvortex/test/test_array.py +++ b/pyvortex/test/test_array.py @@ -9,13 +9,13 @@ def test_primitive_array_round_trip(): def test_array_with_nulls(): - a = pa.array([b"123", None]) + a = pa.array([b"123", None], type=pa.string_view()) arr = vortex.array(a) assert arr.to_arrow_array() == a def test_varbin_array_round_trip(): - a = pa.array(["a", "b", "c"]) + a = pa.array(["a", "b", "c"], 
type=pa.string_view()) arr = vortex.array(a) assert arr.to_arrow_array() == a @@ -24,7 +24,7 @@ def test_varbin_array_take(): a = vortex.array(pa.array(["a", "b", "c", "d"])) assert a.take(vortex.array(pa.array([0, 2]))).to_arrow_array() == pa.array( ["a", "c"], - type=pa.utf8(), + type=pa.string_view(), ) diff --git a/pyvortex/test/test_compress.py b/pyvortex/test/test_compress.py index af42cc327..213228ee6 100644 --- a/pyvortex/test/test_compress.py +++ b/pyvortex/test/test_compress.py @@ -66,12 +66,15 @@ def test_table_encode(): table = pa.table( { "number": pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])]), - "string": pa.chunked_array([pa.array(["a", "b", "c"]), pa.array(["d", "e", "f"])]), + "string": pa.chunked_array( + [pa.array(["a", "b", "c"], type=pa.string_view()), pa.array(["d", "e", "f"], type=pa.string_view())] + ), } ) encoded = vortex.array(table) assert encoded.to_arrow_array().combine_chunks() == pa.StructArray.from_arrays( - [pa.array([0, 1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e", "f"])], names=["number", "string"] + [pa.array([0, 1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e", "f"], type=pa.string_view())], + names=["number", "string"], ) diff --git a/pyvortex/test/test_dataset.py b/pyvortex/test/test_dataset.py index 0a70cbd18..d09961eda 100644 --- a/pyvortex/test/test_dataset.py +++ b/pyvortex/test/test_dataset.py @@ -29,7 +29,7 @@ def ds(tmpdir_factory) -> vortex.dataset.VortexDataset: def test_schema(ds): assert ds.schema == pa.schema( - [("bool", pa.bool_()), ("float", pa.float64()), ("index", pa.int64()), ("string", pa.utf8())] + [("bool", pa.bool_()), ("float", pa.float64()), ("index", pa.int64()), ("string", pa.string_view())] ) @@ -47,21 +47,30 @@ def test_take(ds): def test_to_batches(ds): - assert sum(len(x) for x in ds.to_batches("string", "bool")) == 1_000_000 + assert sum(len(x) for x in ds.to_batches("float", "bool")) == 1_000_000 + + schema = pa.struct( + [("bool", pa.bool_()), ("float", pa.float64()), 
("index", pa.int64()), ("string", pa.string_view())] + ) chunk0 = next(ds.to_batches(columns=["string", "bool"])) - assert chunk0.to_struct_array() == pa.array([record(x) for x in range(1 << 16)]) + assert chunk0.to_struct_array() == pa.array([record(x) for x in range(1 << 16)], type=schema) def test_to_table(ds): - tbl = ds.to_table(columns=["bool", "string"], filter=pc.field("float") > 100) - assert 0 == len(tbl.filter(pc.field("string") <= "10000")) + tbl = ds.to_table(columns=["bool", "float"], filter=pc.field("float") > 100) + # TODO(aduffy): add back once pyarrow supports casting to/from string_view + # assert 0 == len(tbl.filter(pc.field("string") <= "10000")) assert tbl.slice(0, 10) == pa.Table.from_struct_array( - pa.array([record(x, columns={"string", "bool"}) for x in range(10001, 10011)]) + pa.array([record(x, columns={"float", "bool"}) for x in range(10001, 10011)]) ) - assert ds.to_table(columns=["bool", "string"]).schema == pa.schema([("bool", pa.bool_()), ("string", pa.utf8())]) - assert ds.to_table(columns=["string", "bool"]).schema == pa.schema([("string", pa.utf8()), ("bool", pa.bool_())]) + assert ds.to_table(columns=["bool", "string"]).schema == pa.schema( + [("bool", pa.bool_()), ("string", pa.string_view())] + ) + assert ds.to_table(columns=["string", "bool"]).schema == pa.schema( + [("string", pa.string_view()), ("bool", pa.bool_())] + ) def test_to_record_batch_reader_with_polars(ds): diff --git a/requirements-dev.lock b/requirements-dev.lock index 73b3c5a31..5a9bc870c 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -76,7 +76,7 @@ packaging==24.0 # via pytest # via sphinx # via xarray -pandas==2.2.2 +pandas==2.2.3 # via pyvortex # via vortex-array # via xarray @@ -101,7 +101,7 @@ pure-eval==0.2.3 # via stack-data py-cpuinfo==9.0.0 # via pytest-benchmark -pyarrow==15.0.2 +pyarrow==17.0.0 # via vortex-array pydata-sphinx-theme==0.15.4 pygments==2.17.2 diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 
e230d89a8..32ed554b6 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -45,6 +45,7 @@ paste = { workspace = true } pin-project = { workspace = true } rand = { workspace = true } serde = { workspace = true, features = ["derive"] } +static_assertions = { workspace = true } vortex-buffer = { workspace = true } vortex-datetime-dtype = { workspace = true } vortex-dtype = { workspace = true } @@ -95,3 +96,7 @@ harness = false [[bench]] name = "fn" harness = false + +[[bench]] +name = "take_strings" +harness = false diff --git a/vortex-array/benches/take_strings.rs b/vortex-array/benches/take_strings.rs new file mode 100644 index 000000000..43c4fa645 --- /dev/null +++ b/vortex-array/benches/take_strings.rs @@ -0,0 +1,49 @@ +#![allow(clippy::unwrap_used)] + +use criterion::{criterion_group, criterion_main, Criterion}; +use vortex::array::{PrimitiveArray, VarBinArray}; +use vortex::compute::take; +use vortex::validity::Validity; +use vortex::{Array, IntoArray, IntoArrayVariant}; +use vortex_dtype::{DType, Nullability}; + +// Try take with different array frequency. +fn fixture(len: usize) -> VarBinArray { + let values: [Option<&'static str>; 3] = + [Some("inlined"), None, Some("verylongstring--notinlined")]; + + VarBinArray::from_iter( + values.into_iter().cycle().take(len), + DType::Utf8(Nullability::Nullable), + ) +} + +// What fraction of the indices to take. 
+fn indices(len: usize) -> Array { + PrimitiveArray::from_vec( + (0..len) + .filter_map(|x| (x % 2 == 0).then_some(x as u64)) + .collect(), + Validity::NonNullable, + ) + .into_array() +} + +fn bench_varbin(c: &mut Criterion) { + let array = fixture(65_535); + let indices = indices(1024); + + c.bench_function("varbin", |b| b.iter(|| take(&array, &indices).unwrap())); +} + +fn bench_varbinview(c: &mut Criterion) { + let array = fixture(65_535).into_varbinview().unwrap(); + let indices = indices(1024); + + c.bench_function("varbinview", |b| { + b.iter(|| take(array.as_ref(), &indices).unwrap()) + }); +} + +criterion_group!(bench_take, bench_varbin, bench_varbinview); +criterion_main!(bench_take); diff --git a/vortex-array/src/array/chunked/canonical.rs b/vortex-array/src/array/chunked/canonical.rs index 54fbeb2ae..a1dbcd08a 100644 --- a/vortex-array/src/array/chunked/canonical.rs +++ b/vortex-array/src/array/chunked/canonical.rs @@ -1,5 +1,5 @@ -use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer}; -use vortex_dtype::{DType, Nullability, PType, StructDType}; +use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer, ScalarBuffer}; +use vortex_dtype::{DType, PType, StructDType}; use vortex_error::{vortex_bail, vortex_err, ErrString, VortexResult}; use crate::array::chunked::ChunkedArray; @@ -7,9 +7,7 @@ use crate::array::extension::ExtensionArray; use crate::array::null::NullArray; use crate::array::primitive::PrimitiveArray; use crate::array::struct_::StructArray; -use crate::array::varbin::VarBinArray; -use crate::array::BoolArray; -use crate::compute::unary::{scalar_at_unchecked, try_cast}; +use crate::array::{BinaryView, BoolArray, VarBinViewArray}; use crate::validity::Validity; use crate::{ Array, ArrayDType, ArrayValidity, Canonical, IntoArray, IntoArrayVariant, IntoCanonical, @@ -108,12 +106,12 @@ pub(crate) fn try_canonicalize_chunks( Ok(Canonical::Primitive(prim_array)) } DType::Utf8(_) => { - let varbin_array = 
pack_varbin(chunks.as_slice(), validity, dtype)?; - Ok(Canonical::VarBin(varbin_array)) + let varbin_array = pack_views(chunks.as_slice(), dtype, validity)?; + Ok(Canonical::VarBinView(varbin_array)) } DType::Binary(_) => { - let varbin_array = pack_varbin(chunks.as_slice(), validity, dtype)?; - Ok(Canonical::VarBin(varbin_array)) + let varbin_array = pack_views(chunks.as_slice(), dtype, validity)?; + Ok(Canonical::VarBinView(varbin_array)) } DType::Null => { let len = chunks.iter().map(|chunk| chunk.len()).sum(); @@ -188,48 +186,53 @@ fn pack_primitives( )) } -/// Builds a new [VarBinArray] by repacking the values from the chunks into a single +/// Builds a new [VarBinViewArray] by repacking the values from the chunks into a single /// contiguous array. /// /// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have /// been checked to have the same DType already. -fn pack_varbin(chunks: &[Array], validity: Validity, dtype: &DType) -> VortexResult { - let len: usize = chunks.iter().map(|c| c.len()).sum(); - let mut offsets = Vec::with_capacity(len + 1); - offsets.push(0); - let mut data_bytes = Vec::new(); - +fn pack_views( + chunks: &[Array], + dtype: &DType, + validity: Validity, +) -> VortexResult { + let total_len = chunks.iter().map(|a| a.len()).sum(); + let mut views: Vec = Vec::with_capacity(total_len); + let mut buffers = Vec::new(); for chunk in chunks { - let chunk = chunk.clone().into_varbin()?; - let offsets_arr = try_cast( - chunk.offsets().into_primitive()?.as_ref(), - &DType::Primitive(PType::I32, Nullability::NonNullable), - )? - .into_primitive()?; + // Each chunk's views have buffer IDs that are zero-referenced. + // As part of the packing operation, we need to rewrite them to be referenced to the global + // merged buffers list. 
+ let buffers_offset = buffers.len(); + let canonical_chunk = chunk.clone().into_varbinview()?; - let first_offset_value: usize = - usize::try_from(&scalar_at_unchecked(offsets_arr.as_ref(), 0))?; - let primitive_bytes = chunk.sliced_bytes()?.into_primitive()?; - data_bytes.extend_from_slice(primitive_bytes.buffer()); + for buffer in canonical_chunk.buffers() { + let canonical_buffer = buffer.into_canonical()?.into_primitive()?.into_array(); + buffers.push(canonical_buffer); + } - let adjustment_from_previous = *offsets - .last() - .ok_or_else(|| vortex_err!("VarBinArray offsets must have at least one element"))?; - offsets.extend( - offsets_arr - .maybe_null_slice::() - .iter() - .skip(1) - .map(|off| off + adjustment_from_previous - first_offset_value as i32), - ); + for view in canonical_chunk.view_slice() { + if view.is_inlined() { + // Inlined views can be copied directly into the output + views.push(view.as_u128()); + } else { + // Referencing views must have their buffer_index adjusted with new offsets + let view_ref = view.as_view(); + views.push( + BinaryView::new_view( + view.len(), + *view_ref.prefix(), + (buffers_offset as u32) + view_ref.buffer_index(), + view_ref.offset(), + ) + .as_u128(), + ); + } + } } - VarBinArray::try_new( - PrimitiveArray::from(offsets).into_array(), - PrimitiveArray::from(data_bytes).into_array(), - dtype.clone(), - validity, - ) + let views_buffer: Buffer = ScalarBuffer::::from(views).into_inner(); + VarBinViewArray::try_new(Array::from(views_buffer), buffers, dtype.clone(), validity) } #[cfg(test)] @@ -237,31 +240,25 @@ mod tests { use vortex_dtype::{DType, Nullability}; use crate::accessor::ArrayAccessor; - use crate::array::builder::VarBinBuilder; - use crate::array::chunked::canonical::pack_varbin; - use crate::array::{ChunkedArray, StructArray, VarBinArray}; + use crate::array::chunked::canonical::pack_views; + use crate::array::{ChunkedArray, StructArray, VarBinViewArray}; use crate::compute::slice; use 
crate::validity::Validity; use crate::variants::StructArrayTrait; use crate::{ArrayDType, IntoArray, IntoArrayVariant, ToArray}; - fn varbin_array() -> VarBinArray { - let mut builder = VarBinBuilder::::with_capacity(4); - builder.push_value("foo"); - builder.push_value("bar"); - builder.push_value("baz"); - builder.push_value("quak"); - builder.finish(DType::Utf8(Nullability::NonNullable)) + fn stringview_array() -> VarBinViewArray { + VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]) } #[test] pub fn pack_sliced_varbin() { - let array1 = slice(varbin_array().as_ref(), 1, 3).unwrap(); - let array2 = slice(varbin_array().as_ref(), 2, 4).unwrap(); - let packed = pack_varbin( + let array1 = slice(stringview_array().as_ref(), 1, 3).unwrap(); + let array2 = slice(stringview_array().as_ref(), 2, 4).unwrap(); + let packed = pack_views( &[array1, array2], - Validity::NonNullable, &DType::Utf8(Nullability::NonNullable), + Validity::NonNullable, ) .unwrap(); assert_eq!(packed.len(), 4); @@ -279,7 +276,7 @@ mod tests { pub fn pack_nested_structs() { let struct_array = StructArray::try_new( vec!["a".into()].into(), - vec![varbin_array().into_array()], + vec![stringview_array().into_array()], 4, Validity::NonNullable, ) @@ -296,8 +293,12 @@ mod tests { .unwrap() .into_array(); let canonical_struct = chunked.into_struct().unwrap(); - let canonical_varbin = canonical_struct.field(0).unwrap().into_varbin().unwrap(); - let original_varbin = struct_array.field(0).unwrap().into_varbin().unwrap(); + let canonical_varbin = canonical_struct + .field(0) + .unwrap() + .into_varbinview() + .unwrap(); + let original_varbin = struct_array.field(0).unwrap().into_varbinview().unwrap(); let orig_values = original_varbin .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::>()) .unwrap(); diff --git a/vortex-array/src/array/constant/canonical.rs b/vortex-array/src/array/constant/canonical.rs index f77760438..a529f849a 100644 --- 
a/vortex-array/src/array/constant/canonical.rs +++ b/vortex-array/src/array/constant/canonical.rs @@ -1,16 +1,15 @@ -use std::iter; - -use arrow_buffer::BooleanBuffer; +use arrow_array::builder::make_view; +use arrow_buffer::{BooleanBuffer, BufferBuilder}; +use vortex_buffer::Buffer; use vortex_dtype::{match_each_native_ptype, DType, Nullability, PType}; use vortex_error::{vortex_bail, VortexResult}; -use vortex_scalar::{BinaryScalar, BoolScalar, Utf8Scalar}; +use vortex_scalar::{BinaryScalar, BoolScalar, Scalar, Utf8Scalar}; use crate::array::constant::ConstantArray; use crate::array::primitive::PrimitiveArray; -use crate::array::varbin::VarBinArray; -use crate::array::BoolArray; +use crate::array::{BinaryView, BoolArray, VarBinViewArray}; use crate::validity::Validity; -use crate::{ArrayDType, Canonical, IntoCanonical}; +use crate::{ArrayDType, Canonical, IntoArray, IntoCanonical}; impl IntoCanonical for ConstantArray { fn into_canonical(self) -> VortexResult { @@ -39,20 +38,16 @@ impl IntoCanonical for ConstantArray { let value = s.value(); let const_value = value.as_ref().map(|v| v.as_bytes()); - return Ok(Canonical::VarBin(VarBinArray::from_iter( - iter::repeat(const_value).take(self.len()), - DType::Utf8(validity.nullability()), - ))); + return canonical_byte_view(const_value, self.dtype(), self.len()) + .map(Canonical::VarBinView); } if let Ok(b) = BinaryScalar::try_from(scalar) { let value = b.value(); let const_value = value.as_ref().map(|v| v.as_slice()); - return Ok(Canonical::VarBin(VarBinArray::from_iter( - iter::repeat(const_value).take(self.len()), - DType::Binary(validity.nullability()), - ))); + return canonical_byte_view(const_value, self.dtype(), self.len()) + .map(Canonical::VarBinView); } if let Ok(ptype) = PType::try_from(scalar.dtype()) { @@ -67,3 +62,80 @@ impl IntoCanonical for ConstantArray { vortex_bail!("Unsupported scalar type {}", self.dtype()) } } + +fn canonical_byte_view( + scalar_bytes: Option<&[u8]>, + dtype: &DType, + len: usize, 
+) -> VortexResult { + match scalar_bytes { + None => { + let views = ConstantArray::new(Scalar::null(dtype.clone()), len); + + VarBinViewArray::try_new( + views.into_array(), + Vec::new(), + dtype.clone(), + Validity::AllInvalid, + ) + } + Some(scalar_bytes) => { + // Create a view to hold the scalar bytes. + // If the scalar cannot be inlined, allocate a single buffer large enough to hold it. + let view: u128 = make_view(scalar_bytes, 0, 0); + let mut buffers = Vec::new(); + if scalar_bytes.len() >= BinaryView::MAX_INLINED_SIZE { + buffers.push( + PrimitiveArray::new( + Buffer::from(scalar_bytes), + PType::U8, + Validity::NonNullable, + ) + .into_array(), + ); + } + + // Clone our constant view `len` times. + // TODO(aduffy): switch this out for a ConstantArray once we + // add u128 PType, see https://github.com/spiraldb/vortex/issues/1110 + let mut views = BufferBuilder::::new(len); + views.append_n(len, view); + let views = + PrimitiveArray::new(views.finish().into(), PType::U8, Validity::NonNullable) + .into_array(); + + let validity = if dtype.nullability() == Nullability::NonNullable { + Validity::NonNullable + } else { + Validity::AllValid + }; + + VarBinViewArray::try_new(views, buffers, dtype.clone(), validity) + } + } +} + +#[cfg(test)] +mod tests { + use crate::array::ConstantArray; + use crate::compute::unary::scalar_at; + use crate::IntoCanonical; + + #[test] + fn test_canonicalize_const_str() { + let const_array = ConstantArray::new("four".to_string(), 4); + + // Check all values correct. 
+ let canonical = const_array + .into_canonical() + .unwrap() + .into_varbinview() + .unwrap(); + + assert_eq!(canonical.len(), 4); + + for i in 0..=3 { + assert_eq!(scalar_at(&canonical, i).unwrap(), "four".into(),); + } + } +} diff --git a/vortex-array/src/array/constant/variants.rs b/vortex-array/src/array/constant/variants.rs index b17028446..81e3a590f 100644 --- a/vortex-array/src/array/constant/variants.rs +++ b/vortex-array/src/array/constant/variants.rs @@ -53,6 +53,14 @@ impl ArrayVariants for ConstantArray { impl NullArrayTrait for ConstantArray {} impl BoolArrayTrait for ConstantArray { + fn invert(&self) -> VortexResult { + let value = self.scalar_value().as_bool()?; + match value { + None => Ok(self.clone().into_array()), + Some(b) => Ok(ConstantArray::new(!b, self.len()).into_array()), + } + } + fn maybe_null_indices_iter(&self) -> Box> { let value = self .scalar_value() diff --git a/vortex-array/src/array/varbin/arrow.rs b/vortex-array/src/array/varbin/arrow.rs new file mode 100644 index 000000000..5f3e8b3cc --- /dev/null +++ b/vortex-array/src/array/varbin/arrow.rs @@ -0,0 +1,107 @@ +use std::sync::Arc; + +use arrow_array::{ArrayRef, BinaryArray, Datum, LargeBinaryArray, LargeStringArray, StringArray}; +use vortex_dtype::{DType, PType}; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_scalar::Scalar; + +use crate::array::VarBinArray; +use crate::arrow::wrappers::as_offset_buffer; +use crate::compute::unary::try_cast; +use crate::validity::ArrayValidity; +use crate::{ArrayDType, IntoArrayVariant, ToArray}; + +/// Convert the array to Arrow variable length binary array type. 
+pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult { + let offsets = varbin_array + .offsets() + .into_primitive() + .map_err(|err| err.with_context("Failed to canonicalize offsets"))?; + let offsets = match offsets.ptype() { + PType::I32 | PType::I64 => offsets, + PType::U64 => try_cast(offsets, PType::I64.into())?.into_primitive()?, + PType::U32 => try_cast(offsets, PType::I32.into())?.into_primitive()?, + + // Unless it's u64, everything else can be converted into an i32. + _ => try_cast(offsets.to_array(), PType::I32.into()) + .and_then(|a| a.into_primitive()) + .map_err(|err| err.with_context("Failed to cast offsets to PrimitiveArray of i32"))?, + }; + let nulls = varbin_array + .logical_validity() + .to_null_buffer() + .map_err(|err| err.with_context("Failed to get null buffer from logical validity"))?; + + let data = varbin_array + .bytes() + .into_primitive() + .map_err(|err| err.with_context("Failed to canonicalize bytes"))?; + if data.dtype() != &DType::BYTES { + vortex_bail!("Expected bytes to be of type U8, got {}", data.ptype()); + } + let data = data.buffer(); + + // Switch on Arrow DType. 
+ Ok(match varbin_array.dtype() { + DType::Binary(_) => match offsets.ptype() { + PType::I32 => Arc::new(unsafe { + BinaryArray::new_unchecked( + as_offset_buffer::(offsets), + data.clone().into_arrow(), + nulls, + ) + }), + PType::I64 => Arc::new(unsafe { + LargeBinaryArray::new_unchecked( + as_offset_buffer::(offsets), + data.clone().into_arrow(), + nulls, + ) + }), + _ => vortex_bail!("Invalid offsets type {}", offsets.ptype()), + }, + DType::Utf8(_) => match offsets.ptype() { + PType::I32 => Arc::new(unsafe { + StringArray::new_unchecked( + as_offset_buffer::(offsets), + data.clone().into_arrow(), + nulls, + ) + }), + PType::I64 => Arc::new(unsafe { + LargeStringArray::new_unchecked( + as_offset_buffer::(offsets), + data.clone().into_arrow(), + nulls, + ) + }), + _ => vortex_bail!("Invalid offsets type {}", offsets.ptype()), + }, + _ => vortex_bail!( + "expected utf8 or binary instead of {}", + varbin_array.dtype() + ), + }) +} + +/// Create a [`Datum`] from a Utf8 or Binary scalar. +pub(crate) fn varbin_datum(scalar: Scalar) -> VortexResult> { + match scalar.dtype() { + DType::Utf8(_) => Ok(Arc::new( + scalar + .value() + .as_buffer_string()? + .map(StringArray::new_scalar) + .unwrap_or_else(|| arrow_array::Scalar::new(StringArray::new_null(1))), + )), + DType::Binary(_) => Ok(Arc::new( + scalar + .value() + .as_buffer()? 
+ .map(BinaryArray::new_scalar) + .unwrap_or_else(|| arrow_array::Scalar::new(BinaryArray::new_null(1))), + )), + + other => vortex_bail!("Expected Utf8 or Binary scalar, found {other}"), + } +} diff --git a/vortex-array/src/array/varbin/compute/compare.rs b/vortex-array/src/array/varbin/compute/compare.rs index fcc40bd29..5a4ad5a92 100644 --- a/vortex-array/src/array/varbin/compute/compare.rs +++ b/vortex-array/src/array/varbin/compute/compare.rs @@ -7,10 +7,11 @@ use arrow_ord::cmp; use arrow_schema::DataType; use vortex_error::{vortex_bail, VortexResult}; +use crate::array::varbin::arrow::{varbin_datum, varbin_to_arrow}; use crate::array::{ConstantArray, VarBinArray}; use crate::arrow::FromArrowArray; use crate::compute::{MaybeCompareFn, Operator}; -use crate::{Array, IntoCanonical}; +use crate::Array; impl MaybeCompareFn for VarBinArray { fn maybe_compare(&self, other: &Array, operator: Operator) -> Option> { @@ -27,8 +28,9 @@ fn compare_constant( rhs: &ConstantArray, operator: Operator, ) -> VortexResult { - let arrow_lhs = lhs.clone().into_canonical()?.into_arrow()?; - let constant = Arc::::try_from(&rhs.owned_scalar())?; + // Compare using the arrow kernels directly. 
+ let arrow_lhs = varbin_to_arrow(lhs)?; + let constant = varbin_datum(rhs.owned_scalar())?; match arrow_lhs.data_type() { DataType::Binary => { diff --git a/vortex-array/src/array/varbin/compute/filter.rs b/vortex-array/src/array/varbin/compute/filter.rs index 4d220d652..f565a1d68 100644 --- a/vortex-array/src/array/varbin/compute/filter.rs +++ b/vortex-array/src/array/varbin/compute/filter.rs @@ -38,7 +38,7 @@ fn filter_select_var_bin_by_slice( predicate: &dyn BoolArrayTrait, selection_count: usize, ) -> VortexResult { - let offsets = values.offsets().as_primitive(); + let offsets = values.offsets().into_primitive()?; match_each_integer_ptype!(offsets.ptype(), |$O| { filter_select_var_bin_by_slice_primitive_offset( values.dtype().clone(), @@ -137,7 +137,7 @@ fn filter_select_var_bin_by_index( predicate: &dyn BoolArrayTrait, selection_count: usize, ) -> VortexResult { - let offsets = values.offsets().as_primitive(); + let offsets = values.offsets().into_primitive()?; match_each_integer_ptype!(offsets.ptype(), |$O| { filter_select_var_bin_by_index_primitive_offset( values.dtype().clone(), diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index b964a7236..8c17c2cfc 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -3,7 +3,7 @@ use vortex_scalar::Scalar; use crate::array::varbin::{varbin_scalar, VarBinArray}; use crate::compute::unary::ScalarAtFn; -use crate::compute::{ArrayCompute, MaybeCompareFn, Operator, SliceFn, TakeFn}; +use crate::compute::{ArrayCompute, FilterFn, MaybeCompareFn, Operator, SliceFn, TakeFn}; use crate::{Array, ArrayDType}; mod compare; @@ -16,6 +16,10 @@ impl ArrayCompute for VarBinArray { MaybeCompareFn::maybe_compare(self, other, operator) } + fn filter(&self) -> Option<&dyn FilterFn> { + Some(self) + } + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } diff --git a/vortex-array/src/array/varbin/flatten.rs 
b/vortex-array/src/array/varbin/flatten.rs index d00d03290..d7c0ba3aa 100644 --- a/vortex-array/src/array/varbin/flatten.rs +++ b/vortex-array/src/array/varbin/flatten.rs @@ -1,10 +1,58 @@ +use arrow_schema::DataType; use vortex_error::VortexResult; +use crate::array::varbin::arrow::varbin_to_arrow; use crate::array::varbin::VarBinArray; -use crate::{Canonical, IntoCanonical}; +use crate::array::VarBinViewArray; +use crate::arrow::FromArrowArray; +use crate::{Array, ArrayDType, Canonical, IntoCanonical}; impl IntoCanonical for VarBinArray { fn into_canonical(self) -> VortexResult { - Ok(Canonical::VarBin(self)) + let nullable = self.dtype().is_nullable(); + let array_ref = varbin_to_arrow(&self)?; + let casted = arrow_cast::cast(array_ref.as_ref(), &DataType::Utf8View)?; + + VarBinViewArray::try_from(Array::from_arrow(casted, nullable)).map(Canonical::VarBinView) + } +} + +#[cfg(test)] +mod test { + use vortex_dtype::{DType, Nullability}; + + use crate::array::varbin::builder::VarBinBuilder; + use crate::validity::ArrayValidity; + use crate::IntoCanonical; + + #[test] + fn test_canonical_varbin() { + let mut varbin = VarBinBuilder::::with_capacity(10); + varbin.push_null(); + varbin.push_null(); + // inlined value + varbin.push_value("123456789012".as_bytes()); + // non-inlinable value + varbin.push_value("1234567890123".as_bytes()); + let varbin = varbin.finish(DType::Utf8(Nullability::Nullable)); + + let canonical = varbin.into_canonical().unwrap().into_varbinview().unwrap(); + + assert!(!canonical.is_valid(0)); + assert!(!canonical.is_valid(1)); + + // First value is inlined (12 bytes) + assert!(canonical.view_at(2).is_inlined()); + assert_eq!( + canonical.bytes_at(2).unwrap().as_slice(), + "123456789012".as_bytes() + ); + + // Second value is not inlined (13 bytes) + assert!(!canonical.view_at(3).is_inlined()); + assert_eq!( + canonical.bytes_at(3).unwrap().as_slice(), + "1234567890123".as_bytes() + ); } } diff --git a/vortex-array/src/array/varbin/mod.rs 
b/vortex-array/src/array/varbin/mod.rs index 871776ccd..df8ed51de 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -22,6 +22,7 @@ use crate::{impl_encoding, Array, ArrayDType, ArrayTrait, IntoArrayVariant}; mod accessor; mod array; +mod arrow; pub mod builder; mod compute; mod flatten; @@ -205,6 +206,17 @@ impl VarBinArray { let sliced = slice(self.bytes(), start, end)?; Ok(sliced.into_primitive()?.buffer().clone()) } + + /// Consumes self, returning a tuple containing the `DType`, the `bytes` array, + /// the `offsets` array, and the `validity`. + pub fn into_parts(self) -> (DType, Array, Array, Validity) { + ( + self.dtype().clone(), + self.bytes(), + self.offsets(), + self.validity(), + ) + } } impl ArrayTrait for VarBinArray {} diff --git a/vortex-array/src/array/varbinview/accessor.rs b/vortex-array/src/array/varbinview/accessor.rs index a7e079288..00b8c863a 100644 --- a/vortex-array/src/array/varbinview/accessor.rs +++ b/vortex-array/src/array/varbinview/accessor.rs @@ -1,10 +1,11 @@ +use itertools::Itertools; use vortex_error::VortexResult; use crate::accessor::ArrayAccessor; use crate::array::primitive::PrimitiveArray; use crate::array::varbinview::VarBinViewArray; use crate::validity::ArrayValidity; -use crate::IntoArrayVariant; +use crate::IntoCanonical; impl ArrayAccessor<[u8]> for VarBinViewArray { fn with_iterator FnOnce(&mut dyn Iterator>) -> R, R>( @@ -12,22 +13,22 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { f: F, ) -> VortexResult { let views = self.view_slice(); - let bytes: Vec = (0..self.metadata().data_lens.len()) - .map(|i| self.bytes(i).into_primitive()) - .collect::>>()?; + let bytes: Vec = (0..self.metadata().buffer_lens.len()) + .map(|i| self.buffer(i).into_canonical()?.into_primitive()) + .try_collect()?; let validity = self.logical_validity().to_null_buffer()?; match validity { None => { let mut iter = views.iter().map(|view| { if view.is_inlined() { - Some(unsafe { 
&view.inlined.data[..view.size()] }) + Some(unsafe { &view.inlined.data[..view.len() as usize] }) } else { let offset = unsafe { view._ref.offset as usize }; let buffer_idx = unsafe { view._ref.buffer_index as usize }; Some( &bytes[buffer_idx].maybe_null_slice::() - [offset..offset + view.size()], + [offset..offset + view.len() as usize], ) } }); @@ -37,13 +38,13 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { let mut iter = views.iter().zip(validity.iter()).map(|(view, valid)| { if valid { if view.is_inlined() { - Some(unsafe { &view.inlined.data[..view.size()] }) + Some(unsafe { &view.inlined.data[..view.len() as usize] }) } else { let offset = unsafe { view._ref.offset as usize }; let buffer_idx = unsafe { view._ref.buffer_index as usize }; Some( &bytes[buffer_idx].maybe_null_slice::() - [offset..offset + view.size()], + [offset..offset + view.len() as usize], ) } } else { diff --git a/vortex-array/src/array/varbinview/compute.rs b/vortex-array/src/array/varbinview/compute.rs index f288fcb69..8631407ac 100644 --- a/vortex-array/src/array/varbinview/compute.rs +++ b/vortex-array/src/array/varbinview/compute.rs @@ -1,12 +1,21 @@ +use std::sync::Arc; + +use arrow_array::cast::AsArray; +use arrow_array::types::ByteViewType; +use arrow_array::{Datum, GenericByteViewArray}; +use arrow_ord::cmp; +use arrow_schema::DataType; use vortex_buffer::Buffer; -use vortex_error::{vortex_panic, VortexResult}; +use vortex_error::{vortex_bail, VortexResult, VortexUnwrap}; use vortex_scalar::Scalar; use crate::array::varbin::varbin_scalar; -use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE}; +use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE_BYTES}; +use crate::array::{varbinview_as_arrow, ConstantArray}; +use crate::arrow::FromArrowArray; use crate::compute::unary::ScalarAtFn; -use crate::compute::{slice, ArrayCompute, SliceFn}; -use crate::{Array, ArrayDType, IntoArray}; +use crate::compute::{slice, ArrayCompute, MaybeCompareFn, Operator, SliceFn, TakeFn}; +use 
crate::{Array, ArrayDType, IntoArray, IntoCanonical}; impl ArrayCompute for VarBinViewArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { @@ -16,6 +25,10 @@ impl ArrayCompute for VarBinViewArray { fn slice(&self) -> Option<&dyn SliceFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl ScalarAtFn for VarBinViewArray { @@ -25,16 +38,20 @@ impl ScalarAtFn for VarBinViewArray { } fn scalar_at_unchecked(&self, index: usize) -> Scalar { - ::scalar_at(self, index).unwrap_or_else(|err| vortex_panic!(err)) + ::scalar_at(self, index).vortex_unwrap() } } impl SliceFn for VarBinViewArray { fn slice(&self, start: usize, stop: usize) -> VortexResult { Ok(Self::try_new( - slice(self.views(), start * VIEW_SIZE, stop * VIEW_SIZE)?, - (0..self.metadata().data_lens.len()) - .map(|i| self.bytes(i)) + slice( + self.views(), + start * VIEW_SIZE_BYTES, + stop * VIEW_SIZE_BYTES, + )?, + (0..self.metadata().buffer_lens.len()) + .map(|i| self.buffer(i)) .collect::>(), self.dtype().clone(), self.validity().slice(start, stop)?, @@ -42,3 +59,97 @@ impl SliceFn for VarBinViewArray { .into_array()) } } + +/// Take involves creating a new array that references the old array, just with the given set of views. 
+impl TakeFn for VarBinViewArray { + fn take(&self, indices: &Array) -> VortexResult { + let array_ref = varbinview_as_arrow(self); + let indices_arrow = indices.clone().into_canonical()?.into_arrow()?; + + let take_arrow = arrow_select::take::take(&array_ref, &indices_arrow, None)?; + let nullable = take_arrow.is_nullable(); + Ok(Array::from_arrow(take_arrow, nullable)) + } +} + +impl MaybeCompareFn for VarBinViewArray { + fn maybe_compare(&self, other: &Array, operator: Operator) -> Option> { + if let Ok(rhs_const) = ConstantArray::try_from(other) { + Some(compare_constant(self, &rhs_const, operator)) + } else { + None + } + } +} + +fn compare_constant( + lhs: &VarBinViewArray, + rhs: &ConstantArray, + operator: Operator, +) -> VortexResult { + let arrow_lhs = lhs.clone().into_canonical()?.into_arrow()?; + let constant = Arc::::try_from(&rhs.owned_scalar())?; + + match arrow_lhs.data_type() { + DataType::BinaryView => { + compare_constant_arrow(arrow_lhs.as_binary_view(), constant, operator) + } + DataType::Utf8View => { + compare_constant_arrow(arrow_lhs.as_string_view(), constant, operator) + } + _ => { + vortex_bail!("Cannot compare VarBinViewArray with non-binary type"); + } + } +} + +fn compare_constant_arrow( + lhs: &GenericByteViewArray, + rhs: Arc, + operator: Operator, +) -> VortexResult { + let rhs = rhs.as_ref(); + let array = match operator { + Operator::Eq => cmp::eq(lhs, rhs)?, + Operator::NotEq => cmp::neq(lhs, rhs)?, + Operator::Gt => cmp::gt(lhs, rhs)?, + Operator::Gte => cmp::gt_eq(lhs, rhs)?, + Operator::Lt => cmp::lt(lhs, rhs)?, + Operator::Lte => cmp::lt_eq(lhs, rhs)?, + }; + Ok(crate::Array::from_arrow(&array, true)) +} + +#[cfg(test)] +mod tests { + use vortex_dtype::Nullability; + use vortex_scalar::Scalar; + + use crate::array::varbinview::compute::compare_constant; + use crate::array::{ConstantArray, VarBinViewArray}; + use crate::compute::Operator; + use crate::IntoArrayVariant; + + #[test] + fn basic_test() { + let arr = 
VarBinViewArray::from_iter_nullable_str([ + Some("one"), + Some("two"), + Some("three"), + Some("four"), + Some("five"), + Some("six"), + ]); + + let s = Scalar::utf8("seven".to_string(), Nullability::Nullable); + + let constant_array = ConstantArray::new(s, arr.len()); + + let r = compare_constant(&arr, &constant_array, Operator::Eq) + .unwrap() + .into_bool() + .unwrap(); + + assert!(r.boolean_buffer().iter().all(|v| !v)); + } +} diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 7081def6a..bfa5aa8fc 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -1,19 +1,19 @@ use std::fmt::{Debug, Display, Formatter}; -use std::ops::Deref; +use std::slice; use std::sync::Arc; -use std::{mem, slice}; use ::serde::{Deserialize, Serialize}; -use arrow_array::builder::{BinaryViewBuilder, StringViewBuilder}; -use arrow_array::{ArrayRef, BinaryViewArray, StringViewArray}; +use arrow_array::builder::{BinaryViewBuilder, GenericByteViewBuilder, StringViewBuilder}; +use arrow_array::types::{BinaryViewType, ByteViewType, StringViewType}; +use arrow_array::{ArrayRef, BinaryViewArray, GenericByteViewArray, StringViewArray}; use arrow_buffer::ScalarBuffer; -use arrow_schema::DataType; use itertools::Itertools; +use static_assertions::{assert_eq_align, assert_eq_size}; use vortex_dtype::{DType, PType}; -use vortex_error::{vortex_bail, vortex_panic, VortexError, VortexExpect as _, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult}; -use crate::array::varbin::VarBinArray; use crate::array::visitor::{AcceptArrayVisitor, ArrayVisitor}; +use crate::array::PrimitiveArray; use crate::arrow::FromArrowArray; use crate::compute::slice; use crate::encoding::ids; @@ -49,6 +49,11 @@ impl Inlined { inlined.data[..value.len()].copy_from_slice(value); inlined } + + #[inline] + pub fn value(&self) -> &[u8] { + &self.data[0..(self.size as usize)] + } } 
#[derive(Clone, Copy, Debug)] @@ -69,25 +74,95 @@ impl Ref { offset, } } + + #[inline] + pub fn buffer_index(&self) -> u32 { + self.buffer_index + } + + #[inline] + pub fn offset(&self) -> u32 { + self.offset + } + + #[inline] + pub fn prefix(&self) -> &[u8; 4] { + &self.prefix + } } #[derive(Clone, Copy)] #[repr(C, align(8))] pub union BinaryView { + // Numeric representation. This is logically `u128`, but we split it into the high and low + // bits to preserve the alignment. + num: [u64; 2], + + // Inlined representation: strings <= 12 bytes inlined: Inlined, + + // Reference type: strings > 12 bytes. _ref: Ref, } +assert_eq_size!(BinaryView, [u8; 16]); +assert_eq_size!(Inlined, [u8; 16]); +assert_eq_size!(Ref, [u8; 16]); +assert_eq_align!(BinaryView, u64); + impl BinaryView { pub const MAX_INLINED_SIZE: usize = 12; + pub fn new_inlined(value: &[u8]) -> Self { + assert!( + value.len() <= Self::MAX_INLINED_SIZE, + "expected inlined value to be <= 12 bytes, was {}", + value.len() + ); + + Self { + inlined: Inlined::new(value), + } + } + + /// Create a new view over bytes stored in a block. 
+ pub fn new_view(len: u32, prefix: [u8; 4], block: u32, offset: u32) -> Self { + Self { + _ref: Ref::new(len, prefix, block, offset), + } + } + #[inline] - pub fn size(&self) -> usize { - unsafe { self.inlined.size as usize } + pub fn len(&self) -> u32 { + unsafe { self.inlined.size } } + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[inline] pub fn is_inlined(&self) -> bool { - unsafe { self.inlined.size <= Self::MAX_INLINED_SIZE as u32 } + self.len() <= (Self::MAX_INLINED_SIZE as u32) + } + + pub fn as_inlined(&self) -> &Inlined { + unsafe { &self.inlined } + } + + pub fn as_view(&self) -> &Ref { + unsafe { &self._ref } + } + + pub fn as_u128(&self) -> u128 { + let mut tmp = 0u128; + unsafe { + tmp |= self.num[0] as u128; + tmp |= (self.num[1] as u128) << 64; + } + + tmp } } @@ -95,23 +170,25 @@ impl Debug for BinaryView { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut s = f.debug_struct("BinaryView"); if self.is_inlined() { - s.field("inline", unsafe { &self.inlined }); + s.field("inline", &"i".to_string()); } else { - s.field("ref", unsafe { &self._ref }); + s.field("ref", &"r".to_string()); } s.finish() } } // reminder: views are 16 bytes with 8-byte alignment -pub(crate) const VIEW_SIZE: usize = mem::size_of::(); - -impl_encoding!("vortex.varbinview", ids::VAR_BIN_VIEW, VarBinView); +pub(crate) const VIEW_SIZE_BYTES: usize = size_of::(); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VarBinViewMetadata { + // Validity metadata validity: ValidityMetadata, - data_lens: Vec, + + // Length of each buffer. The buffers are primitive byte arrays containing the raw string/binary + // data referenced by views.
+ buffer_lens: Vec, } impl Display for VarBinViewMetadata { @@ -120,10 +197,32 @@ impl Display for VarBinViewMetadata { } } +pub struct Buffers<'a> { + index: u32, + n_buffers: u32, + array: &'a VarBinViewArray, +} + +impl<'a> Iterator for Buffers<'a> { + type Item = Array; + + fn next(&mut self) -> Option { + if self.index >= self.n_buffers { + return None; + } + + let bytes = self.array.buffer(self.index as usize); + self.index += 1; + Some(bytes) + } +} + +impl_encoding!("vortex.varbinview", ids::VAR_BIN_VIEW, VarBinView); + impl VarBinViewArray { pub fn try_new( views: Array, - data: Vec, + buffers: Vec, dtype: DType, validity: Validity, ) -> VortexResult { @@ -131,7 +230,7 @@ impl VarBinViewArray { vortex_bail!(MismatchedTypes: "u8", views.dtype()); } - for d in data.iter() { + for d in buffers.iter() { if !matches!(d.dtype(), &DType::BYTES) { vortex_bail!(MismatchedTypes: "u8", d.dtype()); } @@ -145,15 +244,22 @@ impl VarBinViewArray { vortex_bail!("incorrect validity {:?}", validity); } - let num_views = views.len() / VIEW_SIZE; + let num_views = views.len() / VIEW_SIZE_BYTES; + let buffer_lens: Vec = buffers + .iter() + .map(|buffer| -> VortexResult { + u32::try_from(buffer.len()) + .map_err(|e| vortex_err!("buffer must be within 32-bit range: {e}")) + }) + .try_collect()?; let metadata = VarBinViewMetadata { validity: validity.to_metadata(num_views)?, - data_lens: data.iter().map(|a| a.len()).collect_vec(), + buffer_lens, }; - let mut children = Vec::with_capacity(data.len() + 2); + let mut children = Vec::with_capacity(buffers.len() + 2); children.push(views); - children.extend(data); + children.extend(buffers); if let Some(a) = validity.into_array() { children.push(a) } @@ -161,42 +267,86 @@ impl VarBinViewArray { Self::try_from_parts(dtype, num_views, metadata, children.into(), StatsSet::new()) } - fn view_slice(&self) -> &[BinaryView] { + /// Number of raw string data buffers held by this array. 
+ pub fn buffer_count(&self) -> usize { + self.metadata().buffer_lens.len() + } + + /// Access to the underlying `views` child array as a slice of [BinaryView] structures. + /// + /// This is useful for iteration over the values, as well as for applying filters that may + /// only require hitting the prefixes or inline strings. + pub fn view_slice(&self) -> &[BinaryView] { unsafe { slice::from_raw_parts( - self.views() - .into_primitive() + PrimitiveArray::try_from(self.views()) .vortex_expect("Views must be a primitive array") .maybe_null_slice::() .as_ptr() as _, - self.views().len() / VIEW_SIZE, + self.views().len() / VIEW_SIZE_BYTES, ) } } - fn view_at(&self, index: usize) -> BinaryView { + pub fn view_at(&self, index: usize) -> BinaryView { self.view_slice()[index] } + /// Access to the primitive views array. + /// + /// Variable-sized binary view arrays contain a "view" child array, with 16-byte entries that + /// contain either a pointer into one of the array's owned `buffer`s OR an inlined copy of + /// the string (if the string has 12 bytes or fewer). #[inline] pub fn views(&self) -> Array { self.as_ref() - .child(0, &DType::BYTES, self.len() * VIEW_SIZE) - .vortex_expect("VarBinViewArray is missing its views") + .child(0, &DType::BYTES, self.len() * VIEW_SIZE_BYTES) + .vortex_expect("VarBinViewArray: views child") } + /// Access one of the backing data buffers. + /// + /// # Panics + /// + /// This method panics if the provided index is out of bounds for the set of buffers provided + /// at construction time. #[inline] - pub fn bytes(&self, idx: usize) -> Array { + pub fn buffer(&self, idx: usize) -> Array { self.as_ref() - .child(idx + 1, &DType::BYTES, self.metadata().data_lens[idx]) - .vortex_expect("VarBinViewArray is missing its data buffer") + .child( + idx + 1, + &DType::BYTES, + self.metadata().buffer_lens[idx] as usize, + ) + .vortex_expect("VarBinViewArray: buffer child") + } + + /// Retrieve an iterator over the raw data buffers. 
+ /// These are the BYTE buffers that make up the array's contents. + /// + /// Example + /// + /// ``` + /// use vortex::array::VarBinViewArray; + /// let array = VarBinViewArray::from_iter_str(["a", "b", "c"]); + /// array.buffers().for_each(|block| { + /// // Do something with the `block` + /// }); + /// ``` + pub fn buffers(&self) -> Buffers { + Buffers { + index: 0, + n_buffers: u32::try_from(self.buffer_count()) + .unwrap_or_else(|e| vortex_panic!("n_buffers exceeds u32::MAX: {e}")), + array: self, + } } pub fn validity(&self) -> Validity { self.metadata().validity.to_validity(|| { self.as_ref() .child( - self.metadata().data_lens.len() + 1, + (self.metadata().buffer_lens.len() + 1) as _, &Validity::DTYPE, self.len(), ) @@ -204,6 +354,45 @@ impl VarBinViewArray { }) } + /// Accumulate an iterable set of values into our type here. + #[allow(clippy::same_name_method)] + pub fn from_iter, I: IntoIterator>>( + iter: I, + dtype: DType, + ) -> Self { + match dtype { + DType::Utf8(nullability) => { + let string_view_array = generic_byte_view_builder::( + iter.into_iter(), + |builder, v| { + match v { + None => builder.append_null(), + Some(inner) => { + // SAFETY: the caller must provide valid utf8 values if Utf8 DType is passed. 
+ let utf8 = unsafe { std::str::from_utf8_unchecked(inner.as_ref()) }; + builder.append_value(utf8); + } + } + }, + ); + VarBinViewArray::try_from(Array::from_arrow(&string_view_array, nullability.into())) + .vortex_expect("StringViewArray to VarBinViewArray downcast") + } + DType::Binary(nullability) => { + let binary_view_array = generic_byte_view_builder::( + iter.into_iter(), + |builder, v| match v { + None => builder.append_null(), + Some(bytes) => builder.append_value(bytes.as_ref()), + }, + ); + VarBinViewArray::try_from(Array::from_arrow(&binary_view_array, nullability.into())) + .vortex_expect("BinaryViewArray to VarBinViewArray downcast") + } + other => vortex_panic!("VarBinViewArray must be Utf8 or Binary, was {other}"), + } + } + pub fn from_iter_str, I: IntoIterator>(iter: I) -> Self { let iter = iter.into_iter(); let mut builder = StringViewBuilder::with_capacity(iter.size_hint().0); @@ -211,8 +400,7 @@ impl VarBinViewArray { builder.append_value(s); } let array = Array::from_arrow(&builder.finish(), false); - VarBinViewArray::try_from(array) - .vortex_expect("Failed to convert iterator of nullable strings to VarBinViewArray") + VarBinViewArray::try_from(array).vortex_expect("VarBinViewArray from StringViewBuilder") } pub fn from_iter_nullable_str, I: IntoIterator>>( @@ -223,8 +411,7 @@ impl VarBinViewArray { builder.extend(iter); let array = Array::from_arrow(&builder.finish(), true); - VarBinViewArray::try_from(array) - .vortex_expect("Failed to convert iterator of nullable strings to VarBinViewArray") + VarBinViewArray::try_from(array).vortex_expect("VarBinViewArray from StringViewBuilder") } pub fn from_iter_bin, I: IntoIterator>(iter: I) -> Self { @@ -233,9 +420,8 @@ impl VarBinViewArray { for b in iter { builder.append_value(b); } - let array = Array::from_arrow(&builder.finish(), false); - VarBinViewArray::try_from(array) - .vortex_expect("Failed to convert iterator of bytes to VarBinViewArray") + let array = 
Array::from_arrow(&builder.finish(), false); + VarBinViewArray::try_from(array).vortex_expect("VarBinViewArray from BinaryViewBuilder") } pub fn from_iter_nullable_bin, I: IntoIterator>>( @@ -245,63 +431,79 @@ impl VarBinViewArray { let mut builder = BinaryViewBuilder::with_capacity(iter.size_hint().0); builder.extend(iter); let array = Array::from_arrow(&builder.finish(), true); - VarBinViewArray::try_from(array) - .vortex_expect("Failed to convert iterator of nullable bytes to VarBinViewArray") + VarBinViewArray::try_from(array).vortex_expect("VarBinViewArray from BinaryViewBuilder") } + // TODO(aduffy): do we really need to do this with copying? pub fn bytes_at(&self, index: usize) -> VortexResult> { let view = self.view_at(index); - unsafe { - if !view.is_inlined() { - let data_buf = slice( - self.bytes(view._ref.buffer_index as usize), - view._ref.offset as usize, - (view._ref.size + view._ref.offset) as usize, - )? - .into_primitive()?; - Ok(data_buf.maybe_null_slice::().to_vec()) - } else { - Ok(view.inlined.data[..view.size()].to_vec()) - } + // Expect this to be the common case: strings > 12 bytes. + if !view.is_inlined() { + let view_ref = view.as_view(); + let data_buf = slice( + self.buffer(view_ref.buffer_index() as usize), + view_ref.offset() as usize, + (view.len() + view_ref.offset()) as usize, + )? + .into_primitive()?; + Ok(data_buf.maybe_null_slice::().to_vec()) + } else { + // Return access to the range of bytes around it. + Ok(view.as_inlined().value().to_vec()) } } } +// Generic helper to create an Arrow ByteViewBuilder of the appropriate type.
+fn generic_byte_view_builder( + values: impl Iterator>, + mut append_fn: F, +) -> GenericByteViewArray +where + B: ByteViewType, + V: AsRef<[u8]>, + F: FnMut(&mut GenericByteViewBuilder, Option), +{ + let mut builder = GenericByteViewBuilder::::new(); + + for value in values { + append_fn(&mut builder, value); + } + + builder.finish() +} + impl ArrayTrait for VarBinViewArray {} impl IntoCanonical for VarBinViewArray { fn into_canonical(self) -> VortexResult { - let arrow_dtype = if matches!(self.dtype(), &DType::Utf8(_)) { - &DataType::Utf8 - } else { - &DataType::Binary - }; let nullable = self.dtype().is_nullable(); - let arrow_self = as_arrow(self); - let arrow_varbin = - arrow_cast::cast(arrow_self.deref(), arrow_dtype).map_err(VortexError::ArrowError)?; - let vortex_array = Array::from_arrow(arrow_varbin, nullable); + let arrow_self = varbinview_as_arrow(&self); + let vortex_array = Array::from_arrow(arrow_self, nullable); - Ok(Canonical::VarBin(VarBinArray::try_from(&vortex_array)?)) + Ok(Canonical::VarBinView(VarBinViewArray::try_from( + &vortex_array, + )?)) } } -fn as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { +pub(crate) fn varbinview_as_arrow(var_bin_view: &VarBinViewArray) -> ArrayRef { // Views should be buffer of u8 let views = var_bin_view .views() .into_primitive() - .vortex_expect("Views must be a primitive array"); + .vortex_expect("VarBinViewArray: views child must be primitive"); assert_eq!(views.ptype(), PType::U8); + let nulls = var_bin_view .logical_validity() .to_null_buffer() - .vortex_expect("Failed to convert logical validity to null buffer"); + .vortex_expect("VarBinViewArray: validity child must be bool"); - let data = (0..var_bin_view.metadata().data_lens.len()) - .map(|i| var_bin_view.bytes(i).into_primitive()) + let data = (0..var_bin_view.buffer_count()) + .map(|i| var_bin_view.buffer(i).into_primitive()) .collect::>>() - .vortex_expect("VarBinView byte arrays must be primitive arrays"); + .vortex_expect("VarBinViewArray: 
bytes children must be primitive"); if !data.is_empty() { assert_eq!(data[0].ptype(), PType::U8); assert!(data.iter().map(|d| d.ptype()).all_equal()); @@ -314,17 +516,21 @@ fn as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { // Switch on Arrow DType. match var_bin_view.dtype() { - DType::Binary(_) => Arc::new(BinaryViewArray::new( - ScalarBuffer::::from(views.buffer().clone().into_arrow()), - data, - nulls, - )), - DType::Utf8(_) => Arc::new(StringViewArray::new( - ScalarBuffer::::from(views.buffer().clone().into_arrow()), - data, - nulls, - )), - _ => vortex_panic!("Expected utf8 or binary, got {}", var_bin_view.dtype()), + DType::Binary(_) => Arc::new(unsafe { + BinaryViewArray::new_unchecked( + ScalarBuffer::::from(views.buffer().clone().into_arrow()), + data, + nulls, + ) + }), + DType::Utf8(_) => Arc::new(unsafe { + StringViewArray::new_unchecked( + ScalarBuffer::::from(views.buffer().clone().into_arrow()), + data, + nulls, + ) + }), + _ => vortex_panic!("expected utf8 or binary, got {}", var_bin_view.dtype()), } } @@ -341,8 +547,8 @@ impl ArrayValidity for VarBinViewArray { impl AcceptArrayVisitor for VarBinViewArray { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_child("views", &self.views())?; - for i in 0..self.metadata().data_lens.len() { - visitor.visit_child(format!("bytes_{i}").as_str(), &self.bytes(i))?; + for i in 0..self.metadata().buffer_lens.len() { + visitor.visit_child(format!("bytes_{i}").as_str(), &self.buffer(i))?; } visitor.visit_validity(&self.validity()) } @@ -376,10 +582,10 @@ impl<'a> FromIterator> for VarBinViewArray { mod test { use vortex_scalar::Scalar; - use crate::array::varbinview::{BinaryView, Inlined, Ref, VarBinViewArray, VIEW_SIZE}; + use crate::array::varbinview::{BinaryView, VarBinViewArray, VIEW_SIZE_BYTES}; use crate::compute::slice; use crate::compute::unary::scalar_at; - use crate::{Array, Canonical, IntoCanonical}; + use crate::{Canonical, IntoArray, IntoCanonical}; #[test] 
pub fn varbin_view() { @@ -387,11 +593,11 @@ mod test { VarBinViewArray::from_iter_str(["hello world", "hello world this is a long string"]); assert_eq!(binary_arr.len(), 2); assert_eq!( - scalar_at(binary_arr.as_ref(), 0).unwrap(), + scalar_at(&binary_arr, 0).unwrap(), Scalar::from("hello world") ); assert_eq!( - scalar_at(binary_arr.as_ref(), 1).unwrap(), + scalar_at(&binary_arr, 1).unwrap(), Scalar::from("hello world this is a long string") ); } @@ -399,7 +605,8 @@ mod test { #[test] pub fn slice_array() { let binary_arr = slice( - VarBinViewArray::from_iter_str(["hello world", "hello world this is a long string"]), + VarBinViewArray::from_iter_str(["hello world", "hello world this is a long string"]) + .into_array(), 1, 2, ) @@ -415,19 +622,17 @@ mod test { let binary_arr = VarBinViewArray::from_iter_str(["string1", "string2"]); let flattened = binary_arr.into_canonical().unwrap(); - assert!(matches!(flattened, Canonical::VarBin(_))); + assert!(matches!(flattened, Canonical::VarBinView(_))); - let var_bin: Array = flattened.into(); + let var_bin = flattened.into_varbinview().unwrap().into_array(); assert_eq!(scalar_at(&var_bin, 0).unwrap(), Scalar::from("string1")); assert_eq!(scalar_at(&var_bin, 1).unwrap(), Scalar::from("string2")); } #[test] pub fn binary_view_size_and_alignment() { - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::size_of::(), VIEW_SIZE); - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::align_of::(), 8); + assert_eq!(size_of::(), VIEW_SIZE_BYTES); + assert_eq!(size_of::(), 16); + assert_eq!(align_of::(), 8); } } diff --git a/vortex-array/src/arrow/dtype.rs b/vortex-array/src/arrow/dtype.rs index f84826037..6d39c7f05 100644 --- a/vortex-array/src/arrow/dtype.rs +++ b/vortex-array/src/arrow/dtype.rs @@ -77,8 +77,8 @@ impl FromArrowType<&Field> for DType { match field.data_type() { DataType::Null => Null, DataType::Boolean => Bool(nullability), - DataType::Utf8 | 
DataType::LargeUtf8 => Utf8(nullability), - DataType::Binary | DataType::LargeBinary => Binary(nullability), + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Utf8(nullability), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => Binary(nullability), DataType::Date32 | DataType::Date64 | DataType::Time32(_) @@ -149,8 +149,8 @@ pub fn infer_data_type(dtype: &DType) -> VortexResult { PType::F32 => DataType::Float32, PType::F64 => DataType::Float64, }, - DType::Utf8(_) => DataType::Utf8, - DType::Binary(_) => DataType::Binary, + DType::Utf8(_) => DataType::Utf8View, + DType::Binary(_) => DataType::BinaryView, DType::Struct(struct_dtype, _) => { let mut fields = Vec::with_capacity(struct_dtype.names().len()); for (field_name, field_dt) in struct_dtype @@ -207,12 +207,12 @@ mod test { assert_eq!( infer_data_type(&DType::Utf8(Nullability::NonNullable)).unwrap(), - DataType::Utf8 + DataType::Utf8View ); assert_eq!( infer_data_type(&DType::Binary(Nullability::NonNullable)).unwrap(), - DataType::Binary + DataType::BinaryView ); assert_eq!( @@ -226,7 +226,7 @@ mod test { .unwrap(), DataType::Struct(Fields::from(vec![ FieldRef::from(Field::new("field_a", DataType::Boolean, false)), - FieldRef::from(Field::new("field_b", DataType::Utf8, true)), + FieldRef::from(Field::new("field_b", DataType::Utf8View, true)), ])) ); } @@ -250,7 +250,7 @@ mod test { infer_schema(&schema_nonnull).unwrap(), Schema::new(Fields::from(vec![ Field::new("field_a", DataType::Boolean, false), - Field::new("field_b", DataType::Utf8, false), + Field::new("field_b", DataType::Utf8View, false), Field::new("field_c", DataType::Int32, true), ])) ); diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 547f0c985..a434f0492 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -7,12 +7,11 @@ use arrow_array::types::{ UInt32Type, UInt64Type, UInt8Type, }; use arrow_array::{ - ArrayRef, ArrowPrimitiveType, BinaryArray, 
BooleanArray as ArrowBoolArray, Date32Array, - Date64Array, LargeBinaryArray, LargeStringArray, NullArray as ArrowNullArray, - PrimitiveArray as ArrowPrimitiveArray, StringArray, StructArray as ArrowStructArray, - Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, + ArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray, Date32Array, Date64Array, + NullArray as ArrowNullArray, PrimitiveArray as ArrowPrimitiveArray, + StructArray as ArrowStructArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }; use arrow_buffer::ScalarBuffer; use arrow_schema::{Field, Fields}; @@ -21,14 +20,14 @@ use vortex_dtype::{DType, NativePType, PType}; use vortex_error::{vortex_bail, VortexResult}; use crate::array::{ - BoolArray, ExtensionArray, NullArray, PrimitiveArray, StructArray, TemporalArray, VarBinArray, + varbinview_as_arrow, BoolArray, ExtensionArray, NullArray, PrimitiveArray, StructArray, + TemporalArray, VarBinViewArray, }; -use crate::arrow::wrappers::as_offset_buffer; use crate::compute::unary::try_cast; use crate::encoding::ArrayEncoding; use crate::validity::ArrayValidity; use crate::variants::StructArrayTrait; -use crate::{Array, ArrayDType, IntoArray, ToArray}; +use crate::{Array, ArrayDType, IntoArray}; /// The set of canonical array encodings, also the set of encodings that can be transferred to /// Arrow with zero-copy. @@ -48,21 +47,18 @@ use crate::{Array, ArrayDType, IntoArray, ToArray}; /// /// # Views support /// -/// Binary and String views are a new, better encoding format for nearly all use-cases. 
For now, -/// because DataFusion does not include pervasive support for compute over StringView, we opt to use -/// the [`VarBinArray`] as the canonical encoding (which corresponds to the Arrow `BinaryViewArray`). +/// Binary and String views, also known as "German strings" are a better encoding format for +/// nearly all use-cases. Variable-length binary views are part of the Apache Arrow spec, and are +/// fully supported by the Datafusion query engine. We use them as our canonical string encoding +/// for all `Utf8` and `Binary` typed arrays in Vortex. /// -/// We expect to change this soon once DataFusion is able to finish up some initial support, which -/// is tracked in . #[derive(Debug, Clone)] pub enum Canonical { Null(NullArray), Bool(BoolArray), Primitive(PrimitiveArray), Struct(StructArray), - VarBin(VarBinArray), - // TODO(aduffy): switch to useing VarBinView instead of VarBin - // VarBinView(VarBinViewArray), + VarBinView(VarBinViewArray), Extension(ExtensionArray), } @@ -78,7 +74,7 @@ impl Canonical { Canonical::Bool(a) => bool_to_arrow(a)?, Canonical::Primitive(a) => primitive_to_arrow(a)?, Canonical::Struct(a) => struct_to_arrow(a)?, - Canonical::VarBin(a) => varbin_to_arrow(a)?, + Canonical::VarBinView(a) => varbinview_as_arrow(&a), Canonical::Extension(a) => { if !is_temporal_ext_type(a.id()) { vortex_bail!("unsupported extension dtype with ID {}", a.id().as_ref()) @@ -119,10 +115,10 @@ impl Canonical { } } - pub fn into_varbin(self) -> VortexResult { + pub fn into_varbinview(self) -> VortexResult { match self { - Canonical::VarBin(a) => Ok(a), - _ => vortex_bail!("Cannot unwrap VarBinArray from {:?}", &self), + Canonical::VarBinView(a) => Ok(a), + _ => vortex_bail!("Cannot unwrap VarBinViewArray from {:?}", &self), } } @@ -214,77 +210,6 @@ fn struct_to_arrow(struct_array: StructArray) -> VortexResult { )?)) } -fn varbin_to_arrow(varbin_array: VarBinArray) -> VortexResult { - let offsets = varbin_array - .offsets() - .into_primitive() - 
.map_err(|err| err.with_context("Failed to canonicalize offsets"))?; - let offsets = match offsets.ptype() { - PType::I32 | PType::I64 => offsets, - PType::U64 => offsets.reinterpret_cast(PType::I64), - PType::U32 => offsets.reinterpret_cast(PType::I32), - // Unless it's u64, everything else can be converted into an i32. - _ => try_cast(offsets.to_array(), PType::I32.into()) - .and_then(|a| a.into_primitive()) - .map_err(|err| err.with_context("Failed to cast offsets to PrimitiveArray of i32"))?, - }; - let nulls = varbin_array - .logical_validity() - .to_null_buffer() - .map_err(|err| err.with_context("Failed to get null buffer from logical validity"))?; - - let data = varbin_array - .bytes() - .into_primitive() - .map_err(|err| err.with_context("Failed to canonicalize bytes"))?; - if data.ptype() != PType::U8 { - vortex_bail!("Expected bytes to be of type U8, got {}", data.ptype()); - } - let data = data.buffer(); - - // Switch on Arrow DType. - Ok(match varbin_array.dtype() { - DType::Binary(_) => match offsets.ptype() { - PType::I32 => Arc::new(unsafe { - BinaryArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - PType::I64 => Arc::new(unsafe { - LargeBinaryArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - _ => vortex_bail!("Invalid offsets type {}", offsets.ptype()), - }, - DType::Utf8(_) => match offsets.ptype() { - PType::I32 => Arc::new(unsafe { - StringArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - PType::I64 => Arc::new(unsafe { - LargeStringArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - _ => vortex_bail!("Invalid offsets type {}", offsets.ptype()), - }, - _ => vortex_bail!( - "expected utf8 or binary instead of {}", - varbin_array.dtype() - ), - }) -} - fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult { macro_rules! 
extract_temporal_values { ($values:expr, $prim:ty) => {{ @@ -387,7 +312,7 @@ pub trait IntoArrayVariant { fn into_struct(self) -> VortexResult; - fn into_varbin(self) -> VortexResult; + fn into_varbinview(self) -> VortexResult; fn into_extension(self) -> VortexResult; } @@ -412,8 +337,8 @@ where self.into_canonical()?.into_struct() } - fn into_varbin(self) -> VortexResult { - self.into_canonical()?.into_varbin() + fn into_varbinview(self) -> VortexResult { + self.into_canonical()?.into_varbinview() } fn into_extension(self) -> VortexResult { @@ -443,7 +368,7 @@ impl From for Array { Canonical::Bool(a) => a.into(), Canonical::Primitive(a) => a.into(), Canonical::Struct(a) => a.into(), - Canonical::VarBin(a) => a.into(), + Canonical::VarBinView(a) => a.into(), Canonical::Extension(a) => a.into(), } } @@ -456,7 +381,8 @@ mod test { use arrow_array::cast::AsArray; use arrow_array::types::{Int32Type, Int64Type, UInt64Type}; use arrow_array::{ - Array, PrimitiveArray as ArrowPrimitiveArray, StringArray, StructArray as ArrowStructArray, + Array, PrimitiveArray as ArrowPrimitiveArray, StringViewArray, + StructArray as ArrowStructArray, }; use arrow_buffer::NullBufferBuilder; use arrow_schema::{DataType, Field}; @@ -540,7 +466,7 @@ mod test { nulls.append_n_non_nulls(4); nulls.append_null(); nulls.append_non_null(); - let names = Arc::new(StringArray::from_iter(vec![ + let names = Arc::new(StringViewArray::from_iter(vec![ Some("Joseph"), None, Some("Angela"), @@ -559,7 +485,7 @@ mod test { let arrow_struct = ArrowStructArray::new( vec![ - Arc::new(Field::new("name", DataType::Utf8, true)), + Arc::new(Field::new("name", DataType::Utf8View, true)), Arc::new(Field::new("age", DataType::Int32, true)), ] .into(), diff --git a/vortex-array/src/variants.rs b/vortex-array/src/variants.rs index 8e2db7641..8730dbd72 100644 --- a/vortex-array/src/variants.rs +++ b/vortex-array/src/variants.rs @@ -3,12 +3,15 @@ //! 
When callers only want to make assumptions about the DType, and not about any specific //! encoding, they can use these traits to write encoding-agnostic code. +use std::ops::Not; + use vortex_dtype::field::Field; use vortex_dtype::{DType, ExtDType, FieldNames}; use vortex_error::{vortex_panic, VortexExpect as _, VortexResult}; +use crate::array::BoolArray; use crate::iter::{AccessorRef, VectorizedArrayIter}; -use crate::{Array, ArrayTrait}; +use crate::{Array, ArrayTrait, IntoArray, IntoArrayVariant}; pub trait ArrayVariants { fn as_null_array(&self) -> Option<&dyn NullArrayTrait> { @@ -81,6 +84,21 @@ pub trait ArrayVariants { pub trait NullArrayTrait: ArrayTrait {} pub trait BoolArrayTrait: ArrayTrait { + /// Return a new inverted version of this array. + /// + /// True -> False + /// False -> True + /// Null -> Null + fn invert(&self) -> VortexResult + where + Self: Clone, + { + let bool_array = self.clone().into_bool()?; + let validity = bool_array.validity(); + + BoolArray::try_new(bool_array.boolean_buffer().not(), validity).map(|a| a.into_array()) + } + fn true_count(&self) -> usize { self.statistics() .compute_true_count() diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs index 257677c8d..dcb594388 100644 --- a/vortex-datafusion/src/memory.rs +++ b/vortex-datafusion/src/memory.rs @@ -214,26 +214,22 @@ mod test { use datafusion::prelude::SessionContext; use datafusion_common::{Column, TableReference}; use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator}; - use vortex::array::{PrimitiveArray, StructArray, VarBinArray}; + use vortex::array::{PrimitiveArray, StructArray, VarBinViewArray}; use vortex::validity::Validity; use vortex::{Array, IntoArray}; - use vortex_dtype::{DType, Nullability}; use crate::memory::VortexMemTableOptions; use crate::{can_be_pushed_down, SessionContextExt as _}; fn presidents_array() -> Array { - let names = VarBinArray::from_vec( - vec![ - "Washington", - "Adams", - "Jefferson", - "Madison", 
- "Monroe", - "Adams", - ], - DType::Utf8(Nullability::NonNullable), - ); + let names = VarBinViewArray::from_iter_str([ + "Washington", + "Adams", + "Jefferson", + "Madison", + "Monroe", + "Adams", + ]); let term_start = PrimitiveArray::from_vec( vec![1789u16, 1797, 1801, 1809, 1817, 1825], Validity::NonNullable, diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs index 67aa92b95..ccd9f9b3b 100644 --- a/vortex-sampling-compressor/src/compressors/dict.rs +++ b/vortex-sampling-compressor/src/compressors/dict.rs @@ -1,10 +1,13 @@ use std::collections::HashSet; -use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray}; +use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray, VarBinView, VarBinViewArray}; use vortex::encoding::EncodingRef; use vortex::stats::ArrayStatistics; use vortex::{Array, ArrayDef, IntoArray}; -use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray, DictEncoding}; +use vortex_dict::{ + dict_encode_primitive, dict_encode_varbin, dict_encode_varbinview, Dict, DictArray, + DictEncoding, +}; use vortex_error::VortexResult; use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; @@ -58,6 +61,11 @@ impl EncodingCompressor for DictCompressor { let (codes, values) = dict_encode_varbin(&vb); (codes.into_array(), values.into_array()) } + VarBinView::ID => { + let vb = VarBinViewArray::try_from(array)?; + let (codes, values) = dict_encode_varbinview(&vb); + (codes.into_array(), values.into_array()) + } _ => unreachable!("This array kind should have been filtered out"), }; diff --git a/vortex-sampling-compressor/tests/smoketest.rs b/vortex-sampling-compressor/tests/smoketest.rs index e4be56650..7ea31f331 100644 --- a/vortex-sampling-compressor/tests/smoketest.rs +++ b/vortex-sampling-compressor/tests/smoketest.rs @@ -28,6 +28,7 @@ mod tests { use vortex_datetime_parts::DateTimeParts; use vortex_dict::Dict; use 
vortex_fastlanes::FoR; + use vortex_fsst::FSST; use vortex_sampling_compressor::compressors::alp_rd::ALPRDCompressor; use vortex_sampling_compressor::compressors::bitpacked::BITPACK_WITH_PATCHES; use vortex_sampling_compressor::compressors::delta::DeltaCompressor; @@ -163,7 +164,7 @@ mod tests { .try_into() .unwrap(); for chunk in varbin_col.chunks() { - assert_eq!(chunk.encoding().id(), Dict::ID); + assert!(chunk.encoding().id() == Dict::ID || chunk.encoding().id() == FSST::ID); } let binary_col: ChunkedArray = struct_array diff --git a/vortex-scalar/src/arrow.rs b/vortex-scalar/src/arrow.rs index 32a3a4677..9a02462bb 100644 --- a/vortex-scalar/src/arrow.rs +++ b/vortex-scalar/src/arrow.rs @@ -55,10 +55,10 @@ impl TryFrom<&Scalar> for Arc { }) } DType::Utf8(_) => { - value_to_arrow_scalar!(value.value.as_buffer_string()?, StringArray) + value_to_arrow_scalar!(value.value.as_buffer_string()?, StringViewArray) } DType::Binary(_) => { - value_to_arrow_scalar!(value.value.as_buffer()?, BinaryArray) + value_to_arrow_scalar!(value.value.as_buffer()?, BinaryViewArray) } DType::Struct(..) 
=> { todo!("struct scalar conversion") diff --git a/vortex-serde/src/layouts/tests.rs b/vortex-serde/src/layouts/tests.rs index 9386e5db2..4ddd1c87c 100644 --- a/vortex-serde/src/layouts/tests.rs +++ b/vortex-serde/src/layouts/tests.rs @@ -103,7 +103,7 @@ async fn test_read_projection() { .unwrap() .field(0) .unwrap() - .into_varbin() + .into_varbinview() .unwrap() .with_iterator(|x| { x.map(|x| unsafe { String::from_utf8_unchecked(x.unwrap().to_vec()) }) @@ -135,7 +135,7 @@ async fn test_read_projection() { .unwrap() .field(0) .unwrap() - .into_varbin() + .into_varbinview() .unwrap() .with_iterator(|x| { x.map(|x| unsafe { String::from_utf8_unchecked(x.unwrap().to_vec()) }) @@ -333,7 +333,7 @@ async fn filter_string() { .unwrap(); assert_eq!( names - .into_varbin() + .into_varbinview() .unwrap() .with_iterator(|iter| iter .flatten() @@ -406,7 +406,7 @@ async fn filter_or() { .unwrap(); assert_eq!( names - .into_varbin() + .into_varbinview() .unwrap() .with_iterator(|iter| iter .flatten() @@ -471,7 +471,7 @@ async fn filter_and() { .unwrap(); assert_eq!( names - .into_varbin() + .into_varbinview() .unwrap() .with_iterator(|iter| iter .map(|s| s.map(|st| unsafe { String::from_utf8_unchecked(st.to_vec()) }))