Skip to content

Commit

Permalink
feat: German strings, attempt 3 (#1082)
Browse files Browse the repository at this point in the history
Implementing VarBinView as our canonical string type. Concretely, this
involves

1. Changing `Canonical` variant from `VarBin` to `VarBinView` and
updating all of those method names accordingly
2. `impl IntoCanonical for VarBinArray` that involves doing a
single-pass construction of views
3. Changing the Arrow type from `Utf8/Binary` to `Utf8View/BinaryView`
and propagating that in Datafusion and Python bindings
4. Changing how ChunkedArray canonicalize works to repack views instead
of repacking arrays
  • Loading branch information
a10y authored Oct 22, 2024
1 parent e8a2bf5 commit b9335aa
Show file tree
Hide file tree
Showing 47 changed files with 1,212 additions and 427 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ serde = "1.0.197"
serde_json = "1.0.116"
serde_test = "1.0.176"
simplelog = { version = "0.12.2", features = ["paris"] }
static_assertions = "1"
tar = "0.4"
tempfile = "3"
thiserror = "1.0.58"
Expand Down
6 changes: 3 additions & 3 deletions bench-vortex/benches/bytes_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use futures::StreamExt;
use vortex::array::{PrimitiveArray, VarBinArray};
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::validity::Validity;
use vortex::{Context, IntoArray, IntoCanonical};
use vortex_dtype::{DType, Nullability};
Expand All @@ -24,7 +24,7 @@ fn array_data_fixture() -> VarBinArray {
.unwrap()
}

fn array_view_fixture() -> VarBinArray {
fn array_view_fixture() -> VarBinViewArray {
let array_data = array_data_fixture();
let mut buffer = Vec::new();

Expand All @@ -42,7 +42,7 @@ fn array_view_fixture() -> VarBinArray {
.unwrap()
.into_canonical()
.unwrap()
.into_varbin()
.into_varbinview()
.unwrap()
}

Expand Down
58 changes: 29 additions & 29 deletions bench-vortex/src/tpch/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,62 +7,62 @@ use lazy_static::lazy_static;
lazy_static! {
pub static ref NATION: Schema = Schema::new(vec![
Field::new("n_nationkey", DataType::Int64, false),
Field::new("n_name", DataType::Utf8, false),
Field::new("n_name", DataType::Utf8View, false),
Field::new("n_regionkey", DataType::Int64, false),
Field::new("n_comment", DataType::Utf8, true),
Field::new("n_comment", DataType::Utf8View, true),
]);
pub static ref REGION: Schema = Schema::new(vec![
Field::new("r_regionkey", DataType::Int64, false),
Field::new("r_name", DataType::Utf8, false),
Field::new("r_comment", DataType::Utf8, true),
Field::new("r_name", DataType::Utf8View, false),
Field::new("r_comment", DataType::Utf8View, true),
]);
pub static ref PART: Schema = Schema::new(vec![
Field::new("p_partkey", DataType::Int64, false),
Field::new("p_name", DataType::Utf8, false),
Field::new("p_mfgr", DataType::Utf8, false),
Field::new("p_brand", DataType::Utf8, false),
Field::new("p_type", DataType::Utf8, false),
Field::new("p_name", DataType::Utf8View, false),
Field::new("p_mfgr", DataType::Utf8View, false),
Field::new("p_brand", DataType::Utf8View, false),
Field::new("p_type", DataType::Utf8View, false),
Field::new("p_size", DataType::Int32, false),
Field::new("p_container", DataType::Utf8, false),
Field::new("p_container", DataType::Utf8View, false),
Field::new("p_retailprice", DataType::Float64, false),
Field::new("p_comment", DataType::Utf8, false),
Field::new("p_comment", DataType::Utf8View, false),
]);
pub static ref SUPPLIER: Schema = Schema::new(vec![
Field::new("s_suppkey", DataType::Int64, false),
Field::new("s_name", DataType::Utf8, false),
Field::new("s_address", DataType::Utf8, false),
Field::new("s_name", DataType::Utf8View, false),
Field::new("s_address", DataType::Utf8View, false),
Field::new("s_nationkey", DataType::Int32, false),
Field::new("s_phone", DataType::Utf8, false),
Field::new("s_phone", DataType::Utf8View, false),
Field::new("s_acctbal", DataType::Float64, false),
Field::new("s_comment", DataType::Utf8, false),
Field::new("s_comment", DataType::Utf8View, false),
]);
pub static ref PARTSUPP: Schema = Schema::new(vec![
Field::new("ps_partkey", DataType::Int64, false),
Field::new("ps_suppkey", DataType::Int64, false),
Field::new("ps_availqty", DataType::Int64, false),
Field::new("ps_supplycost", DataType::Float64, false),
Field::new("ps_comment", DataType::Utf8, false),
Field::new("ps_comment", DataType::Utf8View, false),
]);
pub static ref CUSTOMER: Schema = Schema::new(vec![
Field::new("c_custkey", DataType::Int64, false),
Field::new("c_name", DataType::Utf8, false),
Field::new("c_address", DataType::Utf8, false),
Field::new("c_name", DataType::Utf8View, false),
Field::new("c_address", DataType::Utf8View, false),
Field::new("c_nationkey", DataType::Int64, false),
Field::new("c_phone", DataType::Utf8, false),
Field::new("c_phone", DataType::Utf8View, false),
Field::new("c_acctbal", DataType::Float64, false),
Field::new("c_mktsegment", DataType::Utf8, false),
Field::new("c_comment", DataType::Utf8, false),
Field::new("c_mktsegment", DataType::Utf8View, false),
Field::new("c_comment", DataType::Utf8View, false),
]);
pub static ref ORDERS: Schema = Schema::new(vec![
Field::new("o_orderkey", DataType::Int64, false),
Field::new("o_custkey", DataType::Int64, false),
Field::new("o_orderstatus", DataType::Utf8, false),
Field::new("o_orderstatus", DataType::Utf8View, false),
Field::new("o_totalprice", DataType::Float64, false),
Field::new("o_orderdate", DataType::Date32, false),
Field::new("o_orderpriority", DataType::Utf8, false),
Field::new("o_clerk", DataType::Utf8, false),
Field::new("o_orderpriority", DataType::Utf8View, false),
Field::new("o_clerk", DataType::Utf8View, false),
Field::new("o_shippriority", DataType::Int32, false),
Field::new("o_comment", DataType::Utf8, false),
Field::new("o_comment", DataType::Utf8View, false),
]);
pub static ref LINEITEM: Schema = Schema::new(vec![
Field::new("l_orderkey", DataType::Int64, false),
Expand All @@ -73,13 +73,13 @@ lazy_static! {
Field::new("l_extendedprice", DataType::Float64, false),
Field::new("l_discount", DataType::Float64, false),
Field::new("l_tax", DataType::Float64, false),
Field::new("l_returnflag", DataType::Utf8, false),
Field::new("l_linestatus", DataType::Utf8, false),
Field::new("l_returnflag", DataType::Utf8View, false),
Field::new("l_linestatus", DataType::Utf8View, false),
Field::new("l_shipdate", DataType::Date32, false),
Field::new("l_commitdate", DataType::Date32, false),
Field::new("l_receiptdate", DataType::Date32, false),
Field::new("l_shipinstruct", DataType::Utf8, false),
Field::new("l_shipmode", DataType::Utf8, false),
Field::new("l_comment", DataType::Utf8, false),
Field::new("l_shipinstruct", DataType::Utf8View, false),
Field::new("l_shipmode", DataType::Utf8View, false),
Field::new("l_comment", DataType::Utf8View, false),
]);
}
1 change: 1 addition & 0 deletions encodings/dict/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = { workspace = true }
readme = { workspace = true }

[dependencies]
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
hashbrown = { workspace = true }
num-traits = { workspace = true }
Expand Down
26 changes: 22 additions & 4 deletions encodings/dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use hashbrown::hash_map::{Entry, RawEntryMut};
use hashbrown::{DefaultHashBuilder, HashMap};
use num_traits::AsPrimitive;
use vortex::accessor::ArrayAccessor;
use vortex::array::{PrimitiveArray, VarBinArray};
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::validity::Validity;
use vortex::{ArrayDType, IntoArray};
use vortex::{ArrayDType, IntoArray, IntoCanonical};
use vortex_dtype::{match_each_native_ptype, DType, NativePType, ToBytes};
use vortex_error::VortexExpect as _;
use vortex_error::{VortexExpect as _, VortexUnwrap};

/// Statically assigned code for a null value.
pub const NULL_CODE: u64 = 0;

#[derive(Debug)]
struct Value<T>(T);
Expand Down Expand Up @@ -49,7 +52,7 @@ pub fn dict_encode_typed_primitive<T: NativePType>(
ArrayAccessor::<T>::with_iterator(array, |iter| {
for ov in iter {
match ov {
None => codes.push(0),
None => codes.push(NULL_CODE),
Some(&v) => {
let code = match lookup_dict.entry(Value(v)) {
Entry::Occupied(o) => *o.get(),
Expand Down Expand Up @@ -89,6 +92,21 @@ pub fn dict_encode_varbin(array: &VarBinArray) -> (PrimitiveArray, VarBinArray)
.vortex_expect("Failed to dictionary encode varbin array")
}

/// Dictionary encode a VarbinViewArray.
pub fn dict_encode_varbinview(array: &VarBinViewArray) -> (PrimitiveArray, VarBinViewArray) {
let (codes, values) = array
.with_iterator(|iter| dict_encode_typed_varbin(array.dtype().clone(), iter))
.vortex_unwrap();
(
codes,
values
.into_canonical()
.vortex_expect("VarBin to canonical")
.into_varbinview()
.vortex_expect("VarBinView"),
)
}

fn lookup_bytes<'a, T: NativePType + AsPrimitive<usize>>(
offsets: &'a [T],
bytes: &'a [u8],
Expand Down
28 changes: 17 additions & 11 deletions encodings/dict/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ impl SliceFn for DictArray {

#[cfg(test)]
mod test {
use vortex::array::{PrimitiveArray, VarBinArray};
use vortex::accessor::ArrayAccessor;
use vortex::array::{PrimitiveArray, VarBinViewArray};
use vortex::{IntoArray, IntoArrayVariant, ToArray};
use vortex_dtype::{DType, Nullability};

use crate::{dict_encode_typed_primitive, dict_encode_varbin, DictArray};
use crate::{dict_encode_typed_primitive, dict_encode_varbinview, DictArray};

#[test]
fn flatten_nullable_primitive() {
Expand All @@ -90,20 +91,25 @@ mod test {

#[test]
fn flatten_nullable_varbin() {
let reference = VarBinArray::from_iter(
let reference = VarBinViewArray::from_iter(
vec![Some("a"), Some("b"), None, Some("a"), None, Some("b")],
DType::Utf8(Nullability::Nullable),
);
let (codes, values) = dict_encode_varbin(&reference);
assert_eq!(reference.len(), 6);
let (codes, values) = dict_encode_varbinview(&reference);
let dict = DictArray::try_new(codes.into_array(), values.into_array()).unwrap();
let flattened_dict = dict.to_array().into_varbin().unwrap();
let flattened_dict = dict.to_array().into_varbinview().unwrap();
assert_eq!(
flattened_dict.offsets().into_primitive().unwrap().buffer(),
reference.offsets().into_primitive().unwrap().buffer()
);
assert_eq!(
flattened_dict.bytes().into_primitive().unwrap().buffer(),
reference.bytes().into_primitive().unwrap().buffer()
flattened_dict
.with_iterator(|iter| iter
.map(|slice| slice.map(|s| s.to_vec()))
.collect::<Vec<_>>())
.unwrap(),
reference
.with_iterator(|iter| iter
.map(|slice| slice.map(|s| s.to_vec()))
.collect::<Vec<_>>())
.unwrap(),
);
}
}
11 changes: 9 additions & 2 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::sync::Arc;
use fsst::{Decompressor, Symbol};
use serde::{Deserialize, Serialize};
use vortex::array::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::array::VarBinArray;
use vortex::array::{VarBin, VarBinArray};
use vortex::encoding::ids;
use vortex::stats::{ArrayStatisticsCompute, StatsSet};
use vortex::validity::{ArrayValidity, LogicalValidity, Validity};
use vortex::variants::{ArrayVariants, BinaryArrayTrait, Utf8ArrayTrait};
use vortex::{impl_encoding, Array, ArrayDType, ArrayTrait, IntoCanonical};
use vortex::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoCanonical};
use vortex_dtype::{DType, Nullability, PType};
use vortex_error::{vortex_bail, VortexExpect, VortexResult};

Expand Down Expand Up @@ -73,6 +73,13 @@ impl FSSTArray {
vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable");
}

if codes.encoding().id() != VarBin::ID {
vortex_bail!(
InvalidArgument: "codes must have varbin encoding, was {}",
codes.encoding().id()
);
}

// Check: strings must be a Binary array.
if !matches!(codes.dtype(), DType::Binary(_)) {
vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
Expand Down
17 changes: 10 additions & 7 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use vortex::array::{PrimitiveArray, VarBinArray};
use vortex::{ArrayDType, Canonical, IntoArray, IntoCanonical};
use vortex::{ArrayDType, Canonical, IntoArray, IntoArrayVariant, IntoCanonical};
use vortex_error::VortexResult;

use crate::FSSTArray;
Expand Down Expand Up @@ -44,12 +44,15 @@ impl IntoCanonical for FSSTArray {
let offsets_array = PrimitiveArray::from(offsets).into_array();
let uncompressed_bytes_array = PrimitiveArray::from(uncompressed_bytes).into_array();

Ok(Canonical::VarBin(VarBinArray::try_new(
offsets_array,
uncompressed_bytes_array,
self.dtype().clone(),
self.validity(),
)?))
Ok(Canonical::VarBinView(
VarBinArray::try_new(
offsets_array,
uncompressed_bytes_array,
self.dtype().clone(),
self.validity(),
)?
.into_varbinview()?,
))
})
}
}
Loading

0 comments on commit b9335aa

Please sign in to comment.