From d4fa445e54457bcd9ca143dc083b9d14eae87f09 Mon Sep 17 00:00:00 2001 From: chenminghua8 Date: Tue, 22 Oct 2024 19:10:37 +0800 Subject: [PATCH 1/5] =?UTF-8?q?support=20dataSketches=20include=20Quantile?= =?UTF-8?q?=20Sketches=E3=80=81Theta=20Sketch=E3=80=81Frequency=20Sketches?= =?UTF-8?q?.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: chenminghua8 --- be/src/exprs/agg/ds_agg.h | 691 ++++++++++++++++++ be/src/exprs/agg/ds_hll_count_distinct.h | 240 ------ .../exprs/agg/factory/aggregate_factory.hpp | 18 +- .../agg/factory/aggregate_resolver_approx.cpp | 28 +- be/src/types/CMakeLists.txt | 2 +- .../types/{hll_sketch.cpp => ds_sketch.cpp} | 5 +- be/src/types/ds_sketch.h | 566 ++++++++++++++ be/src/types/hll_sketch.h | 143 ---- .../aggregate-functions/ds_hll.md | 26 + .../com/starrocks/catalog/FunctionSet.java | 141 +++- .../sql/analyzer/FunctionAnalyzer.java | 94 ++- .../rule/tree/PreAggregateTurnOnRule.java | 3 + .../starrocks/sql/parser/SyntaxSugars.java | 1 + .../test_agg_function/R/test_datasketches.sql | 89 +++ .../test_agg_function/T/test_datasketches.sql | 56 ++ 15 files changed, 1688 insertions(+), 415 deletions(-) create mode 100644 be/src/exprs/agg/ds_agg.h delete mode 100644 be/src/exprs/agg/ds_hll_count_distinct.h rename be/src/types/{hll_sketch.cpp => ds_sketch.cpp} (97%) create mode 100644 be/src/types/ds_sketch.h delete mode 100644 be/src/types/hll_sketch.h create mode 100644 docs/en/sql-reference/sql-functions/aggregate-functions/ds_hll.md create mode 100644 test/sql/test_agg_function/R/test_datasketches.sql create mode 100644 test/sql/test_agg_function/T/test_datasketches.sql diff --git a/be/src/exprs/agg/ds_agg.h b/be/src/exprs/agg/ds_agg.h new file mode 100644 index 0000000000000..04a9ca7977549 --- /dev/null +++ b/be/src/exprs/agg/ds_agg.h @@ -0,0 +1,691 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "column/binary_column.h" +#include "column/object_column.h" +#include "column/type_traits.h" +#include "column/vectorized_fwd.h" +#include "exprs/agg/aggregate.h" +#include "gutil/casts.h" +#include "types/ds_sketch.h" + +namespace starrocks { + +enum SketchType { + HLL = 0, + QUANTILE = 1, + FREQUENT = 2, + THETA =3, +}; + +template +struct DSSketchState { +}; + +template +struct DSSketchState { + using ColumnType = RunTimeColumnType; + std::unique_ptr ds_sketch_wrapper = nullptr; + int64_t memory_usage = 0; + + void init(FunctionContext* ctx) { + uint8_t log_k; + datasketches::target_hll_type tgt_type; + std::tie(log_k, tgt_type) = _parse_hll_sketch_args(ctx); + ds_sketch_wrapper = std::make_unique(log_k, tgt_type, &memory_usage); + } + + bool is_inited() const { + return ds_sketch_wrapper != nullptr; + } + + void merge(const BinaryColumn* sketch_data_column, size_t row_num) { + DSSketchState other_state; + other_state.deserialize(sketch_data_column->get(row_num).get_slice(), &memory_usage); + if (UNLIKELY(!is_inited())) { + ds_sketch_wrapper = std::make_unique( + other_state.ds_sketch_wrapper->get_lg_config_k(), other_state.ds_sketch_wrapper->get_target_type(), &memory_usage); + } + ds_sketch_wrapper->merge(*other_state.ds_sketch_wrapper); + } + + void update(const Column* data_column, size_t row_num) const { + uint64_t value = 0; + const ColumnType* column = down_cast(data_column); + + if constexpr (lt_is_string) { + Slice s = column->get_slice(row_num); + value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED); + } else { + const auto& v = column->get_data(); + value = HashUtil::murmur_hash64A(&v[row_num], sizeof(v[row_num]), HashUtil::MURMUR_SEED); + } + ds_sketch_wrapper->update(value); + } + + void update_batch_single_state_with_frame(const Column* data_column, int64_t frame_start, int64_t frame_end) const { + const ColumnType* column = down_cast(data_column); + if constexpr (lt_is_string) { + uint64_t value = 0; + for (size_t i = frame_start; i < frame_end; ++i) { + Slice s = column->get_slice(i); + value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED); + + if (value != 0) { + ds_sketch_wrapper->update(value); + } + } + } else { + uint64_t value = 0; + const auto& v = column->get_data(); + for (size_t i = frame_start; i < frame_end; ++i) { + value = HashUtil::murmur_hash64A(&v[i], sizeof(v[i]), HashUtil::MURMUR_SEED); + + if (value != 0) { + ds_sketch_wrapper->update(value); + } + } + } + } + + size_t serialize(uint8_t* dst) const { + return ds_sketch_wrapper->serialize(dst); + } + + size_t serialize_size() const { + return ds_sketch_wrapper->serialize_size(); + } + + void deserialize(const Slice& slice, int64_t* memory_usage) { + ds_sketch_wrapper = std::make_unique(slice, memory_usage); + } + + void get_values(Column* dst, size_t start, size_t end) const { + Int64Column* column = down_cast(dst); + int64_t result = 0L; + if (LIKELY(ds_sketch_wrapper != nullptr)) { + result = ds_sketch_wrapper->estimate_cardinality(); + } + for (size_t i = start; i < end; ++i) { + column->append(result); + } + } + + static std::string getFunName() { + return "ds_hll_count_distinct"; + } + +private: + // parse log_k and target type from args + static std::tuple _parse_hll_sketch_args(FunctionContext* ctx) { + uint8_t log_k = DEFAULT_HLL_LOG_K; + datasketches::target_hll_type tgt_type = datasketches::HLL_6; + if (ctx->get_num_args() == 2) { + log_k = (uint8_t)ColumnHelper::get_const_value(ctx->get_constant_column(1)); + } else if (ctx->get_num_args() == 3) { + log_k = (uint8_t)ColumnHelper::get_const_value(ctx->get_constant_column(1)); + Column* tgt_type_column = ColumnHelper::get_data_column(ctx->get_constant_column(2).get()); + std::string tgt_type_str = tgt_type_column->get(0).get_slice().to_string(); + std::transform(tgt_type_str.begin(), tgt_type_str.end(), tgt_type_str.begin(), ::toupper); + if (tgt_type_str == "HLL_4") { + tgt_type = datasketches::HLL_4; + } else if (tgt_type_str == "HLL_8") { + tgt_type = datasketches::HLL_8; + } else { + tgt_type = datasketches::HLL_6; + } + } + return {log_k, tgt_type}; + } +}; + +template +struct DSSketchState { + using CppType = RunTimeCppType; + using ColumnType = RunTimeColumnType; + using SketchWarapperType = DataSketchesQuantile; + uint32_t ranks_size; + std::unique_ptr ranks = nullptr; + std::unique_ptr ds_sketch_wrapper = nullptr; + int64_t memory_usage = 0; + + + void init(FunctionContext* ctx) { + DatumArray datum_array; + uint16_t k; + std::tie(k, datum_array) = _parse_sketch_args(ctx); + if (datum_array.size() < 1) { + ranks_size = 1; + ranks = std::make_unique(ranks_size); + *ranks.get() = 0.5; + } else { + ranks_size = datum_array.size(); + ranks = std::make_unique(ranks_size); + double* ranks_prt = ranks.get(); + for (Datum rank : datum_array) { + *ranks_prt = rank.get_double(); + ranks_prt++; + } + } + if (ranks_size == 0) { + ranks_size = 0; + } + ds_sketch_wrapper = std::make_unique(k, &memory_usage); + } + + bool is_inited() const { + return ds_sketch_wrapper != nullptr; + } + + void update(const Column* data_column, size_t row_num) const { + const ColumnType* column = down_cast(data_column); + const auto& values = column->get_data(); + ds_sketch_wrapper->update(values[row_num]); + } + + void update_batch_single_state_with_frame(const Column* data_column, int64_t frame_start, int64_t frame_end) const { + const ColumnType* column = down_cast(data_column); + const auto& values = column->get_data(); + for (size_t i = frame_start; i < frame_end; ++i) { + ds_sketch_wrapper->update(values[i]); + } + } + + void merge(const BinaryColumn* sketch_data_column, size_t row_num) { + DSSketchState other_state; + other_state.deserialize(sketch_data_column->get(row_num).get_slice(), &memory_usage); + if (UNLIKELY(!is_inited())) { + ranks_size = other_state.ranks_size; + ranks = std::make_unique(ranks_size); + double* ranks_prt = ranks.get(); + for (int i = 0; i < ranks_size; i++) { + *ranks_prt = other_state.ranks.get()[i]; + ranks_prt++; + } + ds_sketch_wrapper = std::make_unique( + other_state.ds_sketch_wrapper->get_k(), &memory_usage); + } + ds_sketch_wrapper->merge(*other_state.ds_sketch_wrapper); + } + + size_t serialize(uint8_t* dst) const { + size_t offset = 0; + memcpy(dst + offset, &ranks_size, sizeof(ranks_size)); + offset = offset + sizeof(uint32_t); + memcpy(dst + offset, ranks.get(), ranks_size * sizeof(double)); + offset = offset + ranks_size * sizeof(double); + size_t ser_sketch_size = ds_sketch_wrapper->serialize(dst + offset); + return offset + ser_sketch_size; + } + + size_t serialize_size() const { + return sizeof(uint32_t) + ranks_size * sizeof(double) + ds_sketch_wrapper->serialize_size(); + } + + void deserialize(const Slice& slice, int64_t* memory_usage) { + uint8_t* ptr = (uint8_t*)slice.get_data(); + size_t offset = 0; + memcpy(&ranks_size, ptr + offset, sizeof(uint32_t)); + if (ranks_size == 0) { + ranks_size = 0; + } + offset = offset + sizeof(uint32_t); + ranks = std::make_unique(ranks_size); + memcpy(ranks.get(), ptr + offset, ranks_size * sizeof(double)); + offset = offset + ranks_size * sizeof(double); + const Slice sketch_data_slice = Slice(slice.get_data() + offset, slice.size - offset); + ds_sketch_wrapper = std::make_unique(sketch_data_slice, memory_usage); + + } + + void get_values(Column* dst, size_t start, size_t end) const { + auto* array_column = down_cast(dst); + auto& offset_column = array_column->offsets_column(); + auto& elements_column = array_column->elements_column(); + auto* nullable_column = down_cast(elements_column.get()); + auto* result_column = down_cast(nullable_column->data_column().get()); + + std::vector result; + if (LIKELY(ds_sketch_wrapper != nullptr)) { + result = ds_sketch_wrapper->get_quantiles(ranks.get(), ranks_size); + } + + uint32_t index =0; + for (size_t row = start; row < end; row++) { + for (CppType result_data : result) { + result_column->append(result_data); + nullable_column->null_column()->append(0); + index++; + } + offset_column->append(index); + } + } + + static std::string getFunName() { + return "ds_quantile"; + } + +private: + // parse k and rank_arr from args + static std::tuple _parse_sketch_args(FunctionContext* ctx) { + uint16_t k = DEFAULT_QUANTILE_K; + if (ctx->get_num_args() > 1) { + if (ctx->get_num_args() > 2) { + k = ColumnHelper::get_const_value(ctx->get_constant_column(2)); + if (k <= 1) { + k = DEFAULT_QUANTILE_K; + } + int i = 1; + while ((1 << i) < k) { + i += 1; + } + k = 1 << i; + } + Column* ranks_column = ColumnHelper::get_data_column(ctx->get_constant_column(1).get()); + if (ranks_column->is_array()) { + DatumArray rank_arr = ranks_column->get(0).get_array(); + return {k, rank_arr}; + } else { + DatumArray rank_arr; + double rank_value = ranks_column->get(0).get_double(); + rank_arr.push_back(rank_value); + return {k, rank_arr}; + } + } + DatumArray rank_arr; + return {k, rank_arr}; + } +}; + +template +struct SpecialCppType { + using CppType = RunTimeCppType; +}; +template <> +struct SpecialCppType { + using CppType = std::string; +}; +template <> +struct SpecialCppType { + using CppType = std::string; +}; +template <> +struct SpecialCppType { + using CppType = std::string; +}; +template <> +struct SpecialCppType { + using CppType = std::string; +}; + +template +struct DSSketchState { + using OriginalCppType = RunTimeCppType; + using CppType = SpecialCppType::CppType; + using ColumnType = RunTimeColumnType; + using SketchWarapperType = DataSketchesFrequent; + uint64_t counter_num; + uint8_t lg_max_map_size; + uint8_t lg_start_map_size; + std::unique_ptr ds_sketch_wrapper = nullptr; + int64_t memory_usage = 0; + + void init(FunctionContext* ctx) { + std::tie(counter_num, lg_max_map_size, lg_start_map_size) = _parse_sketch_args(ctx); + ds_sketch_wrapper = std::make_unique(lg_max_map_size, lg_start_map_size, &memory_usage); + } + + bool is_inited() const { + return ds_sketch_wrapper != nullptr; + } + + void update(const Column* data_column, size_t row_num) const { + if constexpr (!IsSlice) { + const ColumnType* column = down_cast(data_column); + const auto& values = column->get_data(); + ds_sketch_wrapper->update(values[row_num]); + } else { + const BinaryColumn* column = down_cast(data_column); + const Slice data = column->get_slice(row_num); + ds_sketch_wrapper->update(std::string(data.get_data(), data.size)); + } + } + + void update_batch_single_state_with_frame(const Column* data_column, int64_t frame_start, int64_t frame_end) const { + if constexpr (!IsSlice) { + const ColumnType* column = down_cast(data_column); + const auto& values = column->get_data(); + for (size_t i = frame_start; i < frame_end; ++i) { + ds_sketch_wrapper->update(values[i]); + } + } else { + const BinaryColumn* column = down_cast(data_column); + for (size_t i = frame_start; i < frame_end; ++i) { + const Slice data = column->get_slice(i); + ds_sketch_wrapper->update(std::string(data.get_data(), data.size)); + } + + } + } + + void merge(const BinaryColumn* sketch_data_column, size_t row_num) { + DSSketchState other_state; + other_state.deserialize(sketch_data_column->get(row_num).get_slice(), &memory_usage); + if (UNLIKELY(!is_inited())) { + counter_num = other_state.counter_num; + lg_max_map_size = other_state.lg_max_map_size; + lg_start_map_size = other_state.lg_start_map_size; + ds_sketch_wrapper = std::make_unique(lg_max_map_size, lg_max_map_size, &memory_usage); + } + ds_sketch_wrapper->merge(*other_state.ds_sketch_wrapper); + } + + size_t serialize(uint8_t* dst) const { + size_t offset = 0; + memcpy(dst + offset, &counter_num, sizeof(uint64_t)); + offset = offset + sizeof(uint64_t); + memcpy(dst + offset, &lg_max_map_size, sizeof(uint8_t)); + offset = offset + sizeof(uint8_t); + memcpy(dst + offset, &lg_start_map_size, sizeof(uint8_t)); + offset = offset + sizeof(uint8_t); + size_t ser_sketch_size = ds_sketch_wrapper->serialize(dst + offset); + return offset + ser_sketch_size; + } + + size_t serialize_size() const { + return sizeof(uint64_t) + sizeof(uint8_t) + sizeof(uint8_t) + ds_sketch_wrapper->serialize_size(); + } + + void deserialize(const Slice& slice, int64_t* memory_usage) { + uint8_t* ptr = (uint8_t*)slice.get_data(); + size_t offset = 0; + memcpy(&counter_num, ptr + offset, sizeof(uint64_t)); + offset = offset + sizeof(uint64_t); + memcpy(&lg_max_map_size, ptr + offset, sizeof(uint8_t)); + offset = offset + sizeof(uint8_t); + memcpy(&lg_start_map_size, ptr + offset, sizeof(uint8_t)); + offset = offset + sizeof(uint8_t); + const Slice sketch_data_slice = Slice(slice.get_data() + offset, slice.size - offset); + ds_sketch_wrapper = std::make_unique(sketch_data_slice, lg_max_map_size, + lg_start_map_size, memory_usage); + } + + void get_values(Column* dst, size_t start, size_t end) const { + auto* array_column = down_cast(dst); + auto& offset_column = array_column->offsets_column(); + auto& elements_column = array_column->elements_column(); + + auto* nullable_struct_column = down_cast(elements_column.get()); + auto* struct_column = down_cast(nullable_struct_column->data_column().get()); + auto* value_column = down_cast(struct_column->fields_column()[0].get()); + auto* count_column = down_cast(struct_column->fields_column()[1].get()); + auto* lower_bound_column = down_cast(struct_column->fields_column()[2].get()); + auto* upper_bound_column = down_cast(struct_column->fields_column()[3].get()); + + std::vector> result; + if (LIKELY(ds_sketch_wrapper != nullptr)) { + result = ds_sketch_wrapper->get_frequent_items(0); + } + uint32_t index =0; + for (size_t row = start; row < end; row++) { + uint32_t counter_num_index = 0; + for (FrequentRow frequentRow : result) { + if (counter_num_index >= counter_num) { + break; + } + if constexpr (!IsSlice) { + value_column->append_datum(frequentRow.value); + } else { + std::string value = frequentRow.value; + uint8_t value_data[value.length() + 1]; + std::memcpy(value_data, value.data(), value.length()); + value_data[value.length()] = '\0'; + value_column->append_datum(Slice(value_data, value.length() + 1)); + } + count_column->append_datum(frequentRow.count); + lower_bound_column->append_datum(frequentRow.lower_bound); + upper_bound_column->append_datum(frequentRow.upper_bound); + nullable_struct_column->null_column()->append(0); + index++; + counter_num_index++; + } + offset_column->append(index); + } + } + + static std::string getFunName() { + return "ds_frequent"; + } + +private: + // parse threshold lg_max_map_size and lg_start_map_size from args + static std::tuple _parse_sketch_args(FunctionContext* ctx) { + uint64_t counter_num = DEFAULT_COUNTER_NUM; + uint8_t lg_max_map_size = DEFAULT_FREQUENT_LG_MAX_SIZE; + uint8_t lg_start_map_size = DEFAULT_FREQUENT_LG_MIn_SIZE; + if (ctx->get_num_args() > 1) { + counter_num = ColumnHelper::get_const_value(ctx->get_constant_column(1)); + if (ctx->get_num_args() > 2) { + lg_max_map_size = ColumnHelper::get_const_value(ctx->get_constant_column(2)); + if (ctx->get_num_args() > 3) { + lg_start_map_size = ColumnHelper::get_const_value(ctx->get_constant_column(3)); + } + } + } + if (lg_max_map_size <= lg_start_map_size) { + lg_max_map_size = lg_start_map_size; + } + return {counter_num, lg_max_map_size, lg_start_map_size}; + + } +}; + +template +struct DSSketchState { + using CppType = SpecialCppType::CppType; + using ColumnType = RunTimeColumnType; + using SketchWarapperType = DataSketchesTheta; + + std::unique_ptr ds_sketch_wrapper = nullptr; + int64_t memory_usage = 0; + + void init(FunctionContext* ctx) { + ds_sketch_wrapper = std::make_unique(&memory_usage); + } + + bool is_inited() const { + return ds_sketch_wrapper != nullptr; + } + + void merge(const BinaryColumn* sketch_data_column, size_t row_num) { + DSSketchState other_state; + other_state.deserialize(sketch_data_column->get(row_num).get_slice(), &memory_usage); + if (UNLIKELY(!is_inited())) { + ds_sketch_wrapper = std::make_unique(&memory_usage); + } + ds_sketch_wrapper->merge(*other_state.ds_sketch_wrapper); + } + + void update(const Column* data_column, size_t row_num) const { + uint64_t value = 0; + const ColumnType* column = down_cast(data_column); + + if constexpr (lt_is_string) { + Slice s = column->get_slice(row_num); + value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED); + } else { + const auto& v = column->get_data(); + value = HashUtil::murmur_hash64A(&v[row_num], sizeof(v[row_num]), HashUtil::MURMUR_SEED); + } + ds_sketch_wrapper->update(value); + } + + void update_batch_single_state_with_frame(const Column* data_column, int64_t frame_start, int64_t frame_end) const { + const ColumnType* column = down_cast(data_column); + if constexpr (lt_is_string) { + uint64_t value = 0; + for (size_t i = frame_start; i < frame_end; ++i) { + Slice s = column->get_slice(i); + value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED); + + if (value != 0) { + ds_sketch_wrapper->update(value); + } + } + } else { + uint64_t value = 0; + const auto& v = column->get_data(); + for (size_t i = frame_start; i < frame_end; ++i) { + value = HashUtil::murmur_hash64A(&v[i], sizeof(v[i]), HashUtil::MURMUR_SEED); + + if (value != 0) { + ds_sketch_wrapper->update(value); + } + } + } + } + + size_t serialize(uint8_t* dst) const { + return ds_sketch_wrapper->serialize(dst); + } + + size_t serialize_size() const { + return ds_sketch_wrapper->serialize_size(); + } + + void deserialize(const Slice& slice, int64_t* memory_usage) { + ds_sketch_wrapper = std::make_unique(slice, memory_usage); + } + + void get_values(Column* dst, size_t start, size_t end) const { + Int64Column* column = down_cast(dst); + int64_t result = 0L; + if (LIKELY(ds_sketch_wrapper != nullptr)) { + result = ds_sketch_wrapper->estimate_cardinality(); + } + for (size_t i = start; i < end; ++i) { + column->append(result); + } + } + + static std::string getFunName() { + return "ds_theta"; + } +}; + +template , typename T = RunTimeCppType> +class DataSketchesAggregateFunction final + : public AggregateFunctionBatchHelper> { +public: + using ColumnType = RunTimeColumnType; + + void reset(FunctionContext* ctx, const Columns& args, AggDataPtr state) const override { + if (this->data(state).is_inited()) { + ctx->add_mem_usage(-this->data(state).memory_usage); + this->data(state).ds_sketch_wrapper->clear(); + } + } + + void update(FunctionContext* ctx, const Column** columns, AggDataPtr __restrict state, + size_t row_num) const override { + // init state if needed + _init_if_needed(ctx, state); + int64_t prev_memory = this->data(state).memory_usage; + const Column* data_column = ColumnHelper::get_data_column(columns[0]); + this->data(state).update(data_column, row_num); + ctx->add_mem_usage(this->data(state).memory_usage - prev_memory); + } + + void update_batch_single_state_with_frame(FunctionContext* ctx, AggDataPtr __restrict state, const Column** columns, + int64_t peer_group_start, int64_t peer_group_end, int64_t frame_start, + int64_t frame_end) const override { + // init state if needed + _init_if_needed(ctx, state); + int64_t prev_memory = this->data(state).memory_usage; + const Column* data_column = ColumnHelper::get_data_column(columns[0]); + this->data(state).update_batch_single_state_with_frame(data_column, frame_start, frame_end); + ctx->add_mem_usage(this->data(state).memory_usage - prev_memory); + } + + void merge(FunctionContext* ctx, const Column* column, AggDataPtr __restrict state, size_t row_num) const override { + DCHECK(column->is_binary()); + const BinaryColumn* sketch_data_column = down_cast(column); + int64_t prev_memory = this->data(state).memory_usage; + this->data(state).merge(sketch_data_column, row_num); + ctx->add_mem_usage(this->data(state).memory_usage - prev_memory); + } + + void get_values(FunctionContext* ctx, ConstAggDataPtr __restrict state, Column* dst, size_t start, + size_t end) const override { + DCHECK_GT(end, start); + this->data(state).get_values(dst, start, end); + } + + void serialize_to_column([[maybe_unused]] FunctionContext* ctx, ConstAggDataPtr __restrict state, + Column* to) const override { + DCHECK(to->is_binary()); + auto* column = down_cast(to); + if (UNLIKELY(!this->data(state).is_inited())) { + column->append_default(); + } else { + size_t size = this->data(state).serialize_size(); + uint8_t result[size]; + size = this->data(state).serialize(result); + column->append(Slice(result, size)); + } + } + + void convert_to_serialize_format([[maybe_unused]] FunctionContext* ctx, const Columns& src, size_t chunk_size, + ColumnPtr* dst) const override { + auto* result = down_cast((*dst).get()); + + Bytes& bytes = result->get_bytes(); + bytes.reserve(chunk_size * 10); + result->get_offset().resize(chunk_size + 1); + + size_t old_size = bytes.size(); + // convert to const Column* + const auto* data_column = ColumnHelper::get_data_column(src[0].get()); + for (size_t i = 0; i < chunk_size; ++i) { + StateType state; + state.init(ctx); + state.update(data_column, i); + size_t new_size = old_size + state.serialize_size(); + bytes.resize(new_size); + state.serialize(bytes.data() + old_size); + result->get_offset()[i + 1] = new_size; + old_size = new_size; + } + } + + void finalize_to_column(FunctionContext* ctx __attribute__((unused)), ConstAggDataPtr __restrict state, + Column* to) const override { + // this->data(state).finalize_to_column(to); + this->data(state).get_values(to, 0, 1); + } + + std::string get_name() const override { return StateType::getFunName(); } + +private: + // init hll sketch if needed + void _init_if_needed(FunctionContext* ctx, AggDataPtr __restrict state) const { + if (UNLIKELY(!this->data(state).is_inited())) { + this->data(state).init(ctx); + } + } +}; + +} // namespace starrocks diff --git a/be/src/exprs/agg/ds_hll_count_distinct.h b/be/src/exprs/agg/ds_hll_count_distinct.h deleted file mode 100644 index e83914e0de2cc..0000000000000 --- a/be/src/exprs/agg/ds_hll_count_distinct.h +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include "column/binary_column.h" -#include "column/object_column.h" -#include "column/type_traits.h" -#include "column/vectorized_fwd.h" -#include "exprs/agg/aggregate.h" -#include "gutil/casts.h" -#include "types/hll_sketch.h" - -namespace starrocks { - -struct HLLSketchState { - std::unique_ptr hll_sketch = nullptr; - int64_t memory_usage = 0; -}; - -/** - * RETURN_TYPE: TYPE_BIGINT - * ARGS_TYPE: ALL TYPE - * SERIALIZED_TYPE: TYPE_VARCHAR - */ -template > -class HllSketchAggregateFunction final - : public AggregateFunctionBatchHelper> { -public: - using ColumnType = RunTimeColumnType; - - void reset(FunctionContext* ctx, const Columns& args, AggDataPtr state) const override { - if (this->data(state).hll_sketch != nullptr) { - ctx->add_mem_usage(-this->data(state).hll_sketch->mem_usage()); - this->data(state).hll_sketch->clear(); - } - } - - void update_state(FunctionContext* ctx, AggDataPtr state, uint64_t value) const { - int64_t prev_memory = this->data(state).hll_sketch->mem_usage(); - this->data(state).hll_sketch->update(value); - ctx->add_mem_usage(this->data(state).hll_sketch->mem_usage() - prev_memory); - } - - void update(FunctionContext* ctx, const Column** columns, AggDataPtr __restrict state, - size_t row_num) const override { - // init state if needed - _init_if_needed(ctx, columns, state); - - uint64_t value = 0; - const ColumnType* column = down_cast(columns[0]); - - if constexpr (lt_is_string) { - Slice s = column->get_slice(row_num); - value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED); - } else { - const auto& v = column->get_data(); - value = HashUtil::murmur_hash64A(&v[row_num], sizeof(v[row_num]), HashUtil::MURMUR_SEED); - } - update_state(ctx, state, value); - } - - void update_batch_single_state_with_frame(FunctionContext* ctx, AggDataPtr __restrict state, const Column** columns, - int64_t peer_group_start, int64_t peer_group_end, int64_t frame_start, - int64_t frame_end) const override { - // init state if needed - _init_if_needed(ctx, columns, state); - const ColumnType* column = down_cast(columns[0]); - if constexpr (lt_is_string) { - uint64_t value = 0; - for (size_t i = frame_start; i < frame_end; ++i) { - Slice s = column->get_slice(i); - value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED); - - if (value != 0) { - update_state(ctx, state, value); - } - } - } else { - uint64_t value = 0; - const auto& v = column->get_data(); - for (size_t i = frame_start; i < frame_end; ++i) { - value = HashUtil::murmur_hash64A(&v[i], sizeof(v[i]), HashUtil::MURMUR_SEED); - - if (value != 0) { - update_state(ctx, state, value); - } - } - } - } - - void merge(FunctionContext* ctx, const Column* column, AggDataPtr __restrict state, size_t row_num) const override { - DCHECK(column->is_binary()); - const BinaryColumn* hll_column = down_cast(column); - DataSketchesHll hll(hll_column->get(row_num).get_slice(), &(this->data(state).memory_usage)); - if (UNLIKELY(this->data(state).hll_sketch == nullptr)) { - this->data(state).hll_sketch = std::make_unique( - hll.get_lg_config_k(), hll.get_target_type(), &(this->data(state).memory_usage)); - } - int64_t prev_memory = this->data(state).hll_sketch->mem_usage(); - this->data(state).hll_sketch->merge(hll); - ctx->add_mem_usage(this->data(state).hll_sketch->mem_usage() - prev_memory); - } - - void get_values(FunctionContext* ctx, ConstAggDataPtr __restrict state, Column* dst, size_t start, - size_t end) const override { - DCHECK_GT(end, start); - Int64Column* column = down_cast(dst); - int64_t result = 0L; - if (LIKELY(this->data(state).hll_sketch != nullptr)) { - result = this->data(state).hll_sketch->estimate_cardinality(); - } - for (size_t i = start; i < end; ++i) { - column->get_data()[i] = result; - } - } - - void serialize_to_column([[maybe_unused]] FunctionContext* ctx, ConstAggDataPtr __restrict state, - Column* to) const override { - DCHECK(to->is_binary()); - auto* column = down_cast(to); - if (UNLIKELY(this->data(state).hll_sketch == nullptr)) { - column->append_default(); - } else { - size_t size = this->data(state).hll_sketch->serialize_size(); - uint8_t result[size]; - size = this->data(state).hll_sketch->serialize(result); - column->append(Slice(result, size)); - } - } - - void convert_to_serialize_format([[maybe_unused]] FunctionContext* ctx, const Columns& src, size_t chunk_size, - ColumnPtr* dst) const override { - const ColumnType* column = down_cast(src[0].get()); - auto* result = down_cast((*dst).get()); - - Bytes& bytes = result->get_bytes(); - bytes.reserve(chunk_size * 10); - result->get_offset().resize(chunk_size + 1); - - size_t old_size = bytes.size(); - uint64_t value = 0; - uint8_t log_k; - datasketches::target_hll_type tgt_type; - // convert to const Column* - std::vector src_datas; - src_datas.reserve(src.size()); - std::transform(src.begin(), src.end(), std::back_inserter(src_datas), - [](const ColumnPtr& col) { return col.get(); }); - const Column** src_datas_ptr = src_datas.data(); - std::tie(log_k, tgt_type) = _parse_hll_sketch_args(ctx, src_datas_ptr); - for (size_t i = 0; i < chunk_size; ++i) { - int64_t memory_usage = 0; - DataSketchesHll hll{log_k, tgt_type, &memory_usage}; - if constexpr (lt_is_string) { - Slice s = column->get_slice(i); - value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED); - } else { - auto v = column->get_data()[i]; - value = HashUtil::murmur_hash64A(&v, sizeof(v), HashUtil::MURMUR_SEED); - } - if (value != 0) { - hll.update(value); - } - - size_t new_size = old_size + hll.serialize_size(); - bytes.resize(new_size); - hll.serialize(bytes.data() + old_size); - - result->get_offset()[i + 1] = new_size; - old_size = new_size; - } - } - - void finalize_to_column(FunctionContext* ctx __attribute__((unused)), ConstAggDataPtr __restrict state, - Column* to) const override { - DCHECK(to->is_numeric()); - - auto* column = down_cast(to); - if (UNLIKELY(this->data(state).hll_sketch == nullptr)) { - column->append(0L); - } else { - column->append(this->data(state).hll_sketch->estimate_cardinality()); - } - } - - std::string get_name() const override { return "ds_hll_count_distinct"; } - -private: - // init hll sketch if needed - void _init_if_needed(FunctionContext* ctx, const Column** columns, AggDataPtr __restrict state) const { - if (UNLIKELY(this->data(state).hll_sketch == nullptr)) { - uint8_t log_k; - datasketches::target_hll_type tgt_type; - std::tie(log_k, tgt_type) = _parse_hll_sketch_args(ctx, columns); - this->data(state).hll_sketch = _init_hll_sketch(log_k, tgt_type, &(this->data(state).memory_usage)); - } - } - - // parse log_k and target type from args - std::tuple _parse_hll_sketch_args(FunctionContext* ctx, - const Column** columns) const { - uint8_t log_k = DEFAULT_HLL_LOG_K; - datasketches::target_hll_type tgt_type = datasketches::HLL_6; - if (ctx->get_num_args() == 2) { - log_k = (uint8_t)(columns[1]->get(0).get_int32()); - } else if (ctx->get_num_args() == 3) { - log_k = (uint8_t)(columns[1]->get(0).get_int32()); - std::string tgt_type_str = columns[2]->get(0).get_slice().to_string(); - std::transform(tgt_type_str.begin(), tgt_type_str.end(), tgt_type_str.begin(), ::toupper); - if (tgt_type_str == "HLL_4") { - tgt_type = datasketches::HLL_4; - } else if (tgt_type_str == "HLL_8") { - tgt_type = datasketches::HLL_8; - } else { - tgt_type = datasketches::HLL_6; - } - } - return {log_k, tgt_type}; - } - - // init hll sketch with default log_k and target type - std::unique_ptr _init_hll_sketch(uint8_t log_k, datasketches::target_hll_type tgt_type, - int64_t* memory_usage) const { - return std::make_unique(log_k, tgt_type, memory_usage); - } -}; - -} // namespace starrocks diff --git a/be/src/exprs/agg/factory/aggregate_factory.hpp b/be/src/exprs/agg/factory/aggregate_factory.hpp index e96ed6a55bf59..7bd582e26e71d 100644 --- a/be/src/exprs/agg/factory/aggregate_factory.hpp +++ b/be/src/exprs/agg/factory/aggregate_factory.hpp @@ -31,7 +31,7 @@ #include "exprs/agg/count.h" #include "exprs/agg/covariance.h" #include "exprs/agg/distinct.h" -#include "exprs/agg/ds_hll_count_distinct.h" +#include "exprs/agg/ds_agg.h" #include "exprs/agg/exchange_perf.h" #include "exprs/agg/group_concat.h" #include "exprs/agg/histogram.h" @@ -188,9 +188,6 @@ class AggregateFactory { template static AggregateFunctionPtr MakeHllNdvAggregateFunction(); - template - static AggregateFunctionPtr MakeHllSketchAggregateFunction(); - template static AggregateFunctionPtr MakeHllRawAggregateFunction(); @@ -259,6 +256,9 @@ class AggregateFactory { template static auto MakeRetractMaxAggregateFunction(); + + template + static AggregateFunctionPtr MakeDataSketchesAggregateFunction(); }; // The function should be placed by alphabetical order @@ -394,11 +394,6 @@ AggregateFunctionPtr AggregateFactory::MakeHllNdvAggregateFunction() { return std::make_shared>(); } -template -AggregateFunctionPtr AggregateFactory::MakeHllSketchAggregateFunction() { - return std::make_shared>(); -} - template AggregateFunctionPtr AggregateFactory::MakeHllRawAggregateFunction() { return std::make_shared>(); @@ -442,4 +437,9 @@ auto AggregateFactory::MakeRetractMaxAggregateFunction() { MaxElement>>>(); } +template +AggregateFunctionPtr AggregateFactory::MakeDataSketchesAggregateFunction() { + return std::make_shared>(); +} + } // namespace starrocks diff --git a/be/src/exprs/agg/factory/aggregate_resolver_approx.cpp b/be/src/exprs/agg/factory/aggregate_resolver_approx.cpp index b5ba782e6fbe1..ee5faf98999ac 100644 --- a/be/src/exprs/agg/factory/aggregate_resolver_approx.cpp +++ b/be/src/exprs/agg/factory/aggregate_resolver_approx.cpp @@ -38,9 +38,6 @@ struct HLLUnionBuilder { resolver->add_aggregate_mapping( "approx_count_distinct", false, AggregateFactory::MakeHllNdvAggregateFunction()); - - resolver->add_aggregate_mapping_variadic( - "ds_hll_count_distinct", false, AggregateFactory::MakeHllSketchAggregateFunction()); } } }; @@ -57,10 +54,35 @@ struct ApproxTopKBuilder { } }; +struct DataSketchesBuilder { + template + void operator()(AggregateFuncResolver* resolver) { + if constexpr (lt_is_fixedlength || lt_is_string) { + resolver->add_aggregate_mapping>( + "ds_hll_count_distinct", false, + AggregateFactory::MakeDataSketchesAggregateFunction()); + resolver->add_aggregate_mapping>( + "ds_theta", false, + AggregateFactory::MakeDataSketchesAggregateFunction()); + } + if constexpr (lt_is_integer || lt_is_float) { + resolver->add_aggregate_mapping>( + "ds_quantile", false, + AggregateFactory::MakeDataSketchesAggregateFunction()); + } + if constexpr (lt_is_integer || lt_is_float || lt_is_string) { + resolver->add_aggregate_mapping>( + "ds_frequent", false, + AggregateFactory::MakeDataSketchesAggregateFunction()); + } + } +}; + void AggregateFuncResolver::register_approx() { for (auto type : aggregate_types()) { type_dispatch_all(type, HLLUnionBuilder(), this); type_dispatch_all(type, ApproxTopKBuilder(), this); + type_dispatch_all(type, DataSketchesBuilder(), this); } add_aggregate_mapping("hll_union", false, AggregateFactory::MakeHllUnionAggregateFunction()); diff --git a/be/src/types/CMakeLists.txt b/be/src/types/CMakeLists.txt index 956737d93a7e3..fd0e53a8c3deb 100644 --- a/be/src/types/CMakeLists.txt +++ b/be/src/types/CMakeLists.txt @@ -18,8 +18,8 @@ add_library(Types STATIC array_type_info.cpp bitmap_value.cpp date_value.cpp + ds_sketch.cpp hll.cpp - hll_sketch.cpp logical_type.cpp map_type_info.cpp struct_type_info.cpp diff --git a/be/src/types/hll_sketch.cpp b/be/src/types/ds_sketch.cpp similarity index 97% rename from be/src/types/hll_sketch.cpp rename to be/src/types/ds_sketch.cpp index 9d19060d69290..8d77a62c973bf 100644 --- a/be/src/types/hll_sketch.cpp +++ b/be/src/types/ds_sketch.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "types/hll_sketch.h" +#include "types/ds_sketch.h" #include "common/logging.h" #include "runtime/mem_pool.h" @@ -54,9 +54,6 @@ void DataSketchesHll::merge(const DataSketchesHll& other) { _sketch_union = std::make_unique(other.get_lg_config_k(), alloc_type(_memory_usage)); } auto o_sketch = other.get_hll_sketch(); - if (o_sketch == nullptr) { - return; - } _sketch_union->update(*o_sketch); this->mark_changed(); } diff --git a/be/src/types/ds_sketch.h b/be/src/types/ds_sketch.h new file mode 100644 index 0000000000000..523db2b97f5b9 --- /dev/null +++ b/be/src/types/ds_sketch.h @@ -0,0 +1,566 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include +#include +#include +#include "runtime/memory/counting_allocator.h" +#include "runtime/memory/mem_chunk.h" +#include "runtime/memory/mem_chunk_allocator.h" +#include "util/slice.h" + +#undef IS_BIG_ENDIAN +#include +#include + +namespace starrocks { + +class DataSketchesHll { +public: + using alloc_type = STLCountingAllocator; + using hll_sketch_type = datasketches::hll_sketch_alloc; + using hll_union_type = datasketches::hll_union_alloc; + // default lg_k value for HLL + static const datasketches::target_hll_type DEFAULT_HLL_TGT_TYPE = datasketches::HLL_6; + + explicit DataSketchesHll(uint8_t log_k, datasketches::target_hll_type tgt_type, int64_t* memory_usage) + : _memory_usage(memory_usage), _tgt_type(tgt_type) { + this->_sketch_union = std::make_unique(log_k, alloc_type(_memory_usage)); + } + + DataSketchesHll(const DataSketchesHll& other) = delete; + DataSketchesHll& operator=(const DataSketchesHll& other) = delete; + + DataSketchesHll(DataSketchesHll&& other) noexcept + : _memory_usage(std::move(other._memory_usage)), + _sketch_union(std::move(other._sketch_union)), + _tgt_type(other._tgt_type) {} + DataSketchesHll& operator=(DataSketchesHll&& other) noexcept { + if (this != &other) { + this->_memory_usage = std::move(other._memory_usage); + this->_sketch_union = std::move(other._sketch_union); + this->_tgt_type = other._tgt_type; + } + return *this; + } + + explicit DataSketchesHll(const Slice& src, int64_t* memory_usage); + + ~DataSketchesHll() = default; + + // Returns sketch's configured lg_k value. + uint8_t get_lg_config_k() const { + if (UNLIKELY(_sketch_union == nullptr)) { + return DEFAULT_HLL_LOG_K; + } + return _sketch_union->get_lg_config_k(); + } + + // Returns the sketch's target HLL mode (from #target_hll_type). + datasketches::target_hll_type get_target_type() const { + if (UNLIKELY(_sketch_union == nullptr)) { + return DEFAULT_HLL_TGT_TYPE; + } + return _sketch_union->get_target_type(); + } + + // Add a hash value to this HLL value + // NOTE: input must be a hash_value + void update(uint64_t hash_value); + + // merge with other HLL value + void merge(const DataSketchesHll& other); + + // Return max size of serialized binary + size_t max_serialized_size() const; + int64_t mem_usage() const { return _memory_usage == nullptr ? 0L : *_memory_usage; } + + // Input slice should have enough capacity for serialize, which + // can be got through max_serialized_size(). If insufficient buffer + // is given, this will cause process crash. + // Return actual size of serialized binary. + size_t serialize(uint8_t* dst) const; + + // Now, only empty HLL support this funciton. + bool deserialize(const Slice& slice); + + int64_t estimate_cardinality() const; + + // No need to check is_valid for datasketches HLL, + // return ture for compatibility. + static bool is_valid(const Slice& slice); + + // only for debug + std::string to_string() const; + + uint64_t serialize_size() const; + + // common interface + void clear() { + if (_sketch_union != nullptr) { + _sketch_union->reset(); + _is_changed = true; // Mark as changed after reset + } + } + + // get hll_sketch object which is lazy initialized + hll_sketch_type* get_hll_sketch() const { + if (_is_changed) { + if (_sketch_union == nullptr) { + return nullptr; + } + _sketch = std::make_unique(_sketch_union->get_result(_tgt_type)); + _is_changed = false; + } + return _sketch.get(); + } + + inline void mark_changed() { _is_changed = true; } + +private: + int64_t* _memory_usage; + std::unique_ptr _sketch_union = nullptr; + datasketches::target_hll_type _tgt_type = DEFAULT_HLL_TGT_TYPE; + // lazy value of union state + mutable std::unique_ptr _sketch = nullptr; + mutable bool _is_changed = true; +}; + +template +class DataSketchesQuantile { +public: + using alloc_type = STLCountingAllocator; + using quantile_sketch_type = datasketches::quantiles_sketch, alloc_type>; + + explicit DataSketchesQuantile(uint16_t k, int64_t* memory_usage) + : _memory_usage(memory_usage) { + this->_sketch = std::make_unique(k, std::less(), alloc_type(_memory_usage)); + } + + DataSketchesQuantile(const DataSketchesQuantile& other) = delete; + DataSketchesQuantile& operator=(const DataSketchesQuantile& other) = delete; + + DataSketchesQuantile(DataSketchesQuantile&& other) noexcept + : _memory_usage(std::move(other._memory_usage)), + _sketch(std::move(other._sketch)) {} + DataSketchesQuantile& operator=(DataSketchesQuantile&& other) noexcept { + if (this != &other) { + this->_memory_usage = std::move(other._memory_usage); + this->_sketch = std::move(other._sketch); + } + return *this; + } + + explicit DataSketchesQuantile(const Slice& src, int64_t* memory_usage) : _memory_usage(memory_usage) { + if (!deserialize(src)) { + LOG(WARNING) << "Failed to init DataSketchesQuantile from slice, will be reset to 0."; + } + } + + ~DataSketchesQuantile() = default; + + uint16_t get_k() const { + return _sketch->get_k(); + } + + void update(T value) { + _sketch->update(value); + } + + void merge(const DataSketchesQuantile& other) { + if (UNLIKELY(_sketch == nullptr)) { + _sketch = std::make_unique(other._sketch->get_k(), std::less(), alloc_type(_memory_usage)); + } + _sketch.get()->merge(*other._sketch); + } + + int64_t mem_usage() const { return _memory_usage == nullptr ? 0L : *_memory_usage; } + + size_t serialize(uint8_t* dst) const { + if (_sketch == nullptr) { + return 0; + } + auto serialize_compact = _sketch->serialize(); + std::copy(serialize_compact.begin(), serialize_compact.end(), dst); + return _sketch->get_serialized_size_bytes(); + } + + uint64_t serialize_size() const { + if (_sketch == nullptr) { + return 0; + } + return _sketch->get_serialized_size_bytes(); + } + + bool deserialize(const Slice& slice) { + DCHECK(_sketch == nullptr); + + if (!is_valid(slice)) { + return false; + } + try { + _sketch = std::make_unique( + quantile_sketch_type::deserialize((uint8_t*)slice.data, slice.size, datasketches::serde(), + std::less(), alloc_type(_memory_usage))); + } catch (std::logic_error& e) { + LOG(WARNING) << "DataSketchesQuantile deserialize error: " << e.what(); + return false; + } + return true; + } + + std::vector get_quantiles(const double* ranks, uint32_t size) const { + std::vector result; + if (_sketch == nullptr) { + return result; + } + try { + std::vector quantiles = _sketch->get_quantiles(ranks, size); + for (T quantile : quantiles) { + result.push_back(quantile); + } + } catch (std::logic_error& e) { + LOG(WARNING) << "DataSketchesQuantile get_quantiles error: " << e.what(); + result.clear(); + } + return result; + } + + static bool is_valid(const Slice& slice) { + if (slice.size < 1) { + return false; + } + return true; + } + + void clear() { + *_memory_usage = 0; + this->_sketch = std::make_unique(_sketch->get_k(), std::less(), alloc_type(_memory_usage)); + } + + std::string to_string() const { + if (_sketch == nullptr) { + return ""; + } + datasketches::string str = _sketch->to_string(); + return std::string(str.begin(), str.end()); + } + + private: + int64_t* _memory_usage; + mutable std::unique_ptr _sketch = nullptr; +}; + +template +struct FrequentRow { + T value; + uint64_t count; + uint64_t lower_bound; + uint64_t upper_bound; +}; + +template +class DataSketchesFrequent { +public: + using alloc_type = STLCountingAllocator; + using frequent_sketch_type = datasketches::frequent_items_sketch, std::equal_to, alloc_type>; + + explicit DataSketchesFrequent(uint8_t lg_max_map_size, uint8_t lg_start_map_size, int64_t* memory_usage) + : _memory_usage(memory_usage), _lg_max_map_size(lg_max_map_size) , _lg_start_map_size(lg_start_map_size){ + _sketch = std::make_unique( + _lg_max_map_size, _lg_start_map_size, std::equal_to(), alloc_type(_memory_usage)); + } + + DataSketchesFrequent(const DataSketchesFrequent& other) = delete; + DataSketchesFrequent& operator=(const DataSketchesFrequent& other) = delete; + + DataSketchesFrequent(DataSketchesFrequent&& other) noexcept + : _memory_usage(std::move(other._memory_usage)), _lg_max_map_size(other._lg_max_map_size) , + _lg_start_map_size(other._lg_start_map_size), _sketch(std::move(other._sketch)) {} + + DataSketchesFrequent& operator=(DataSketchesFrequent&& other) noexcept { + if (this != &other) { + this->_memory_usage = std::move(other._memory_usage); + this->_lg_max_map_size = other._lg_max_map_size; + this->_lg_start_map_size = other._lg_start_map_size; + this->_sketch = std::move(other._sketch); + } + return *this; + } + + explicit DataSketchesFrequent(const Slice& src, uint8_t lg_max_map_size, uint8_t lg_start_map_size, int64_t* memory_usage) + : _memory_usage(memory_usage), _lg_max_map_size(lg_max_map_size), _lg_start_map_size(lg_start_map_size) { + if (!deserialize(src)) { + LOG(WARNING) << "Failed to init DataSketchesFrequent from slice, will be reset to 0."; + } + } + + ~DataSketchesFrequent() = default; + + void update(T value) { + uint32_t old_active_items = _sketch->get_num_active_items(); + _sketch->update(value); + uint32_t new_active_items = _sketch->get_num_active_items(); + if (old_active_items != new_active_items) { + // *_memory_usage = *_memory_usage + sizeof(T); + } + } + + void merge(const DataSketchesFrequent& other) { + if (UNLIKELY(_sketch == nullptr)) { + _sketch = std::make_unique( + _lg_max_map_size, _lg_start_map_size, std::equal_to(), alloc_type(_memory_usage)); + } + _sketch.get()->merge(*other._sketch); + } + + int64_t mem_usage() const { return _memory_usage == nullptr ? 0L : *_memory_usage; } + + size_t serialize(uint8_t* dst) const { + if (_sketch == nullptr) { + return 0; + } + auto serialize_compact = _sketch->serialize(); + std::copy(serialize_compact.begin(), serialize_compact.end(), dst); + return _sketch->get_serialized_size_bytes(); + } + + uint64_t serialize_size() const { + if (_sketch == nullptr) { + return 0; + } + return _sketch->get_serialized_size_bytes(); + } + + bool deserialize(const Slice& slice) { + DCHECK(_sketch == nullptr); + + if (!is_valid(slice)) { + return false; + } + try { + _sketch = std::make_unique( + frequent_sketch_type::deserialize((uint8_t*)slice.data, slice.size, datasketches::serde(), + std::equal_to(), alloc_type(_memory_usage))); + } catch (std::logic_error& e) { + LOG(WARNING) << "DataSketchesFrequent deserialize error: " << e.what(); + return false; + } + return true; + } + + std::vector> get_frequent_items(uint64_t threshold) const { + std::vector> result; + if (_sketch == nullptr) { + return result; + } + try { + auto frequent_items = _sketch->get_frequent_items(datasketches::NO_FALSE_POSITIVES, threshold); + for (auto item : frequent_items) { + FrequentRow frequent_row = FrequentRow {item.get_item(), item.get_estimate(), item.get_lower_bound(), + item.get_upper_bound()}; + result.push_back(frequent_row); + } + } catch (std::logic_error& e) { + LOG(WARNING) << "DataSketchesFrequent get_quantiles error: " << e.what(); + result.clear(); + } + return result; + } + + static bool is_valid(const Slice& slice) { + if (slice.size < 1) { + return false; + } + return true; + } + + void clear() { + *_memory_usage = 0; + this->_sketch = std::make_unique( + _lg_max_map_size, _lg_start_map_size, std::equal_to(), alloc_type(_memory_usage)); + } + + std::string to_string() const { + if (_sketch == nullptr) { + return ""; + } + datasketches::string str = _sketch->to_string(); + return std::string(str.begin(), str.end()); + } + +private: + int64_t* _memory_usage; + uint8_t _lg_max_map_size; + uint8_t _lg_start_map_size; + mutable std::unique_ptr _sketch = nullptr; +}; + +class DataSketchesTheta { +public: + using alloc_type = STLCountingAllocator; + using theta_sketch_type = datasketches::update_theta_sketch_alloc; + using theta_union_type = datasketches::theta_union_alloc; + using theta_wrapped_type = datasketches::wrapped_compact_theta_sketch_alloc; + using sketch_data_alloc_type = typename std::allocator_traits::template rebind_alloc; + using sketch_data_type = std::vector; + + explicit DataSketchesTheta(int64_t* memory_usage) : _memory_usage(memory_usage) { + _sketch = std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); + } + + DataSketchesTheta(const DataSketchesTheta& other) = delete; + DataSketchesTheta& operator=(const DataSketchesTheta& other) = delete; + + DataSketchesTheta(DataSketchesTheta&& other) noexcept + : _memory_usage(std::move(other._memory_usage)), _sketch(std::move(other._sketch)) { + if (other._sketch_union != nullptr) { + this->_sketch_union = std::move(other._sketch_union); + } + } + + DataSketchesTheta& operator=(DataSketchesTheta&& other) noexcept { + if (this != &other) { + this->_memory_usage = std::move(other._memory_usage); + this->_sketch = std::move(other._sketch); + if (other._sketch_union != nullptr) { + this->_sketch_union = std::move(other._sketch_union); + } + } + return *this; + } + + explicit DataSketchesTheta(const Slice& src, int64_t* memory_usage) + : _memory_usage(memory_usage) { + if (!deserialize(src)) { + LOG(WARNING) << "Failed to init DataSketchesFrequent from slice, will be reset to 0."; + } + } + + ~DataSketchesTheta() = default; + + void update(uint64_t hash_value) { + _sketch->update(hash_value); + _is_changed = true; + } + + void merge(const DataSketchesTheta& other) { + if (_sketch_union == nullptr) { + _sketch_union = std::make_unique(theta_union_type::builder(alloc_type(_memory_usage)).build()); + } + _sketch_union->update(other._sketch->compact()); + if (other._sketch_union != nullptr) { + _sketch_union->update(other._sketch_union->get_result()); + } + _is_changed = true; + } + + int64_t mem_usage() const { return _memory_usage == nullptr ? 0L : *_memory_usage; } + + size_t serialize(uint8_t* dst) const { + serialize_if_needed(); + std::copy(_sketch_data->begin(), _sketch_data->end(), dst); + return _sketch_data->size(); + } + + uint64_t serialize_size() const { + serialize_if_needed(); + return _sketch_data->size(); + } + + void serialize_if_needed() const { + if (UNLIKELY(_sketch == nullptr)) { + _sketch = std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); + } + if (_is_changed) { + auto resultTheta_union = theta_union_type(theta_union_type::builder(alloc_type(_memory_usage)).build()); + resultTheta_union.update(_sketch->compact()); + if (_sketch_union != nullptr) { + resultTheta_union.update(_sketch_union->get_result()); + } + auto sketch_ser = resultTheta_union.get_result().serialize(); + _sketch_data = std::make_unique(sketch_data_type(sketch_ser.begin(),sketch_ser.end(), sketch_ser.get_allocator())); + _is_changed = false; + } + } + + bool deserialize(const Slice& slice) { + if (!is_valid(slice)) { + return false; + } + DCHECK(_sketch == nullptr); + _sketch = std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); + try { + auto sketch_warp = theta_wrapped_type::wrap((uint8_t*)slice.data, slice.size); + if (_sketch_union == nullptr) { + _sketch_union = std::make_unique(theta_union_type::builder(alloc_type(_memory_usage)).build()); + } + _sketch_union->update(sketch_warp); + } catch (std::logic_error& e) { + LOG(WARNING) << "DataSketchesFrequent deserialize error: " << e.what(); + return false; + } + return true; + } + + static bool is_valid(const Slice& slice) { + if (slice.size < 1) { + return false; + } + return true; + } + + int64_t estimate_cardinality() const { + if (_sketch == nullptr && _sketch_union == nullptr) { + return 0; + } + if (_sketch_union == nullptr) { + return _sketch->get_estimate(); + } else { + auto resultTheta_union = theta_union_type(theta_union_type::builder(alloc_type(_memory_usage)).build()); + resultTheta_union.update(_sketch_union->get_result()); + if (_sketch != nullptr) { + resultTheta_union.update(_sketch->compact()); + } + return resultTheta_union.get_result().get_estimate(); + } + } + + void clear() { + if (_sketch != nullptr) { + _sketch->reset(); + } + + if (_sketch_union != nullptr) { + _sketch_union.reset(); + } + } + +private: + int64_t* _memory_usage; + mutable std::unique_ptr _sketch = nullptr; + mutable std::unique_ptr _sketch_union = nullptr; + mutable std::unique_ptr _sketch_data = nullptr; + mutable bool _is_changed = true; +}; + +} // namespace starrocks diff --git a/be/src/types/hll_sketch.h b/be/src/types/hll_sketch.h deleted file mode 100644 index b4db90268f7d7..0000000000000 --- a/be/src/types/hll_sketch.h +++ /dev/null @@ -1,143 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include "datasketches/hll.hpp" -#include "runtime/memory/counting_allocator.h" -#include "runtime/memory/mem_chunk.h" -#include "runtime/memory/mem_chunk_allocator.h" - -namespace starrocks { - -class Slice; - -class DataSketchesHll { -public: - using alloc_type = STLCountingAllocator; - using hll_sketch_type = datasketches::hll_sketch_alloc; - using hll_union_type = datasketches::hll_union_alloc; - // default lg_k value for HLL - static const datasketches::target_hll_type DEFAULT_HLL_TGT_TYPE = datasketches::HLL_6; - - explicit DataSketchesHll(uint8_t log_k, datasketches::target_hll_type tgt_type, int64_t* memory_usage) - : _memory_usage(memory_usage), _tgt_type(tgt_type) { - this->_sketch_union = std::make_unique(log_k, alloc_type(_memory_usage)); - } - - DataSketchesHll(const DataSketchesHll& other) = delete; - DataSketchesHll& operator=(const DataSketchesHll& other) = delete; - - DataSketchesHll(DataSketchesHll&& other) noexcept - : _memory_usage(std::move(other._memory_usage)), - _sketch_union(std::move(other._sketch_union)), - _tgt_type(other._tgt_type) {} - DataSketchesHll& operator=(DataSketchesHll&& other) noexcept { - if (this != &other) { - this->_memory_usage = std::move(other._memory_usage); - this->_sketch_union = std::move(other._sketch_union); - this->_tgt_type = other._tgt_type; - } - return *this; - } - - explicit DataSketchesHll(const Slice& src, int64_t* memory_usage); - - ~DataSketchesHll() = default; - - // Returns sketch's configured lg_k value. - uint8_t get_lg_config_k() const { - if (UNLIKELY(_sketch_union == nullptr)) { - return DEFAULT_HLL_LOG_K; - } - return _sketch_union->get_lg_config_k(); - } - - // Returns the sketch's target HLL mode (from #target_hll_type). - datasketches::target_hll_type get_target_type() const { - if (UNLIKELY(_sketch_union == nullptr)) { - return DEFAULT_HLL_TGT_TYPE; - } - return _sketch_union->get_target_type(); - } - - // Add a hash value to this HLL value - // NOTE: input must be a hash_value - void update(uint64_t hash_value); - - // merge with other HLL value - void merge(const DataSketchesHll& other); - - // Return max size of serialized binary - size_t max_serialized_size() const; - int64_t mem_usage() const { return _memory_usage == nullptr ? 0L : *_memory_usage; } - - // Input slice should have enough capacity for serialize, which - // can be got through max_serialized_size(). If insufficient buffer - // is given, this will cause process crash. - // Return actual size of serialized binary. - size_t serialize(uint8_t* dst) const; - - // Now, only empty HLL support this funciton. - bool deserialize(const Slice& slice); - - int64_t estimate_cardinality() const; - - // No need to check is_valid for datasketches HLL, - // return ture for compatibility. - static bool is_valid(const Slice& slice); - - // only for debug - std::string to_string() const; - - uint64_t serialize_size() const; - - // common interface - void clear() { - if (_sketch_union != nullptr) { - _sketch_union->reset(); - _is_changed = true; // Mark as changed after reset - } - } - - // get hll_sketch object which is lazy initialized - hll_sketch_type* get_hll_sketch() const { - if (_is_changed) { - if (_sketch_union == nullptr) { - return nullptr; - } - _sketch = std::make_unique(_sketch_union->get_result(_tgt_type)); - _is_changed = false; - } - return _sketch.get(); - } - - inline void mark_changed() { _is_changed = true; } - -private: - int64_t* _memory_usage; - std::unique_ptr _sketch_union = nullptr; - datasketches::target_hll_type _tgt_type = DEFAULT_HLL_TGT_TYPE; - // lazy value of union state - mutable std::unique_ptr _sketch = nullptr; - mutable bool _is_changed = true; -}; - -} // namespace starrocks diff --git a/docs/en/sql-reference/sql-functions/aggregate-functions/ds_hll.md b/docs/en/sql-reference/sql-functions/aggregate-functions/ds_hll.md new file mode 100644 index 0000000000000..cfe5d9025ff97 --- /dev/null +++ b/docs/en/sql-reference/sql-functions/aggregate-functions/ds_hll.md @@ -0,0 +1,26 @@ +# DS_HLL + + + +Returns the approximate value of aggregate function similar to the result of COUNT(DISTINCT col). Like APPROX_COUNT_DISTINCT(expr). + +It is faster than the COUNT and DISTINCT combination and uses a fixed-size memory, so less memory is used for columns of high cardinality. + +It is slower than APPROX_COUNT_DISTINCT(expr) but with higher precision. Which takes advantages of Apache Datasketches. + +## Syntax + +```Haskell +DS_HLL(expr) +``` + +## Examples + +```plain text +MySQL > select DS_HLL(query_id) from log_statis group by datetime; ++-----------------------------------+ +| DS_HLL(`query_id`) | ++-----------------------------------+ +| 17721 | ++-----------------------------------+ +``` diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java b/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java index b321fe0ba41d4..7317517036329 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java @@ -253,6 +253,10 @@ public class FunctionSet { public static final String APPROX_COUNT_DISTINCT = "approx_count_distinct"; public static final String APPROX_COUNT_DISTINCT_HLL_SKETCH = "approx_count_distinct_hll_sketch"; public static final String DS_HLL_COUNT_DISTINCT = "ds_hll_count_distinct"; + public static final String DS_HLL = "ds_hll"; + public static final String DS_QUANTILE = "ds_quantile"; + public static final String DS_FREQUENT = "ds_frequent"; + public static final String DS_THETA = "ds_theta"; public static final String APPROX_TOP_K = "approx_top_k"; public static final String AVG = "avg"; public static final String COUNT = "count"; @@ -623,6 +627,9 @@ public class FunctionSet { .add(FunctionSet.UTC_TIMESTAMP) .add(FunctionSet.MD5_SUM) .add(FunctionSet.DS_HLL_COUNT_DISTINCT) + .add(FunctionSet.DS_QUANTILE) + .add(FunctionSet.DS_FREQUENT) + .add(FunctionSet.DS_THETA) .add(FunctionSet.MD5_SUM_NUMERIC) .add(FunctionSet.BITMAP_EMPTY) .add(FunctionSet.HLL_EMPTY) @@ -1116,19 +1123,6 @@ private void initAggregateBuiltins() { Lists.newArrayList(t), Type.BIGINT, Type.VARBINARY, true, false, true)); - // ds_hll_count_distinct(col) - addBuiltin(AggregateFunction.createBuiltin(DS_HLL_COUNT_DISTINCT, - Lists.newArrayList(t), Type.BIGINT, Type.VARBINARY, - true, false, true)); - // ds_hll_count_distinct(col, log_k) - addBuiltin(AggregateFunction.createBuiltin(DS_HLL_COUNT_DISTINCT, - Lists.newArrayList(t, Type.INT), Type.BIGINT, Type.VARBINARY, - true, false, true)); - // ds_hll_count_distinct(col, log_k, tgt_type) - addBuiltin(AggregateFunction.createBuiltin(DS_HLL_COUNT_DISTINCT, - Lists.newArrayList(t, Type.INT, Type.VARCHAR), Type.BIGINT, Type.VARBINARY, - true, false, true)); - // HLL_RAW addBuiltin(AggregateFunction.createBuiltin(HLL_RAW, Lists.newArrayList(t), Type.HLL, Type.VARBINARY, @@ -1317,6 +1311,9 @@ private void initAggregateBuiltins() { // causal inference functions. registerBuiltinHypothesisTestingFunctions(); + + // DataSketches functions. + registerBuiltinDsFunction(); } private void registerBuiltinHypothesisTestingFunctions() { @@ -1565,6 +1562,124 @@ private void registerBuiltinApproxTopKWindowFunction() { registerBuiltinForTypes.accept(Type.DATE_TYPES); } + private void registerBuiltinDsFunction() { + for (Type t : Type.getSupportedTypes()) { + if (t.isFunctionType()) { + continue; + } + if (t.isNull()) { + continue; // NULL is handled through type promotion. + } + if (t.isChar()) { + continue; // promoted to STRING + } + // ds_hll_count_distinct(col) + addBuiltin(AggregateFunction.createBuiltin(DS_HLL_COUNT_DISTINCT, + Lists.newArrayList(t), Type.BIGINT, Type.VARBINARY, + true, false, true)); + // ds_hll_count_distinct(col, log_k) + addBuiltin(AggregateFunction.createBuiltin(DS_HLL_COUNT_DISTINCT, + Lists.newArrayList(t, Type.INT), Type.BIGINT, Type.VARBINARY, + true, false, true)); + // ds_hll_count_distinct(col, log_k, tgt_type) + addBuiltin(AggregateFunction.createBuiltin(DS_HLL_COUNT_DISTINCT, + Lists.newArrayList(t, Type.INT, Type.VARCHAR), Type.BIGINT, Type.VARBINARY, + true, false, true)); + // ds_theta(col) + addBuiltin(AggregateFunction.createBuiltin(DS_THETA, + Lists.newArrayList(t), Type.BIGINT, Type.VARBINARY, + true, false, true)); + } + + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.DOUBLE), Type.ARRAY_DOUBLE, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.FLOAT), Type.ARRAY_FLOAT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.INT), Type.ARRAY_INT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.BIGINT), Type.ARRAY_BIGINT, Type.VARBINARY, + false, false, true)); + + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.DOUBLE, Type.DOUBLE), Type.ARRAY_DOUBLE, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.FLOAT, Type.DOUBLE), Type.ARRAY_FLOAT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.INT, Type.DOUBLE), Type.ARRAY_INT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.BIGINT, Type.DOUBLE), Type.ARRAY_BIGINT, Type.VARBINARY, + false, false, true)); + + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.DOUBLE, Type.DOUBLE, Type.INT), Type.ARRAY_DOUBLE, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.FLOAT, Type.DOUBLE, Type.INT), Type.ARRAY_FLOAT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.INT, Type.DOUBLE, Type.INT), Type.ARRAY_INT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.BIGINT, Type.DOUBLE, Type.INT), Type.ARRAY_BIGINT, Type.VARBINARY, + false, false, true)); + + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.DOUBLE, Type.ARRAY_DOUBLE), Type.ARRAY_DOUBLE, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.FLOAT, Type.ARRAY_DOUBLE), Type.ARRAY_FLOAT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.INT, Type.ARRAY_DOUBLE), Type.ARRAY_INT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.BIGINT, Type.ARRAY_DOUBLE), Type.ARRAY_BIGINT, Type.VARBINARY, + false, false, true)); + + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.DOUBLE, Type.ARRAY_DOUBLE, Type.INT), Type.ARRAY_DOUBLE, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.FLOAT, Type.ARRAY_DOUBLE, Type.INT), Type.ARRAY_FLOAT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.INT, Type.ARRAY_DOUBLE, Type.INT), Type.ARRAY_INT, Type.VARBINARY, + false, false, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_QUANTILE, + Lists.newArrayList(Type.BIGINT, Type.ARRAY_DOUBLE, Type.INT), Type.ARRAY_BIGINT, Type.VARBINARY, + false, false, true)); + + ImmutableList DS_FREQUENT_SUPPORTED_TYPES = + ImmutableList.builder() + .addAll(Type.FLOAT_TYPES) + .addAll(Type.INTEGER_TYPES) + .addAll(Type.STRING_TYPES) + .addAll(Type.DATE_TYPES) + .build(); + for (Type type : DS_FREQUENT_SUPPORTED_TYPES) { + ArrayType retType = DS_FREQUENT_RET_TYPE_BUILDER.apply(type); + addBuiltin(AggregateFunction.createBuiltin(DS_FREQUENT, + Lists.newArrayList(type), retType, Type.VARBINARY, + false, true, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_FREQUENT, + Lists.newArrayList(type, Type.BIGINT), retType, Type.VARBINARY, + false, true, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_FREQUENT, + Lists.newArrayList(type, Type.BIGINT, Type.INT), retType, Type.VARBINARY, + false, true, true)); + addBuiltin(AggregateFunction.createBuiltin(DS_FREQUENT, + Lists.newArrayList(type, Type.BIGINT, Type.INT, Type.INT), retType, Type.VARBINARY, + false, true, true)); + } + } + public List getBuiltinFunctions() { List builtinFunctions = Lists.newArrayList(); for (Map.Entry> entry : vectorizedFunctions.entrySet()) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/FunctionAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/FunctionAnalyzer.java index 86f90526fdbb2..fc18377940d54 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/FunctionAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/FunctionAnalyzer.java @@ -341,7 +341,10 @@ private static void analyzeBuiltinAggFunction(FunctionName fnName, || fnName.getFunction().equals(FunctionSet.MAX) || fnName.getFunction().equals(FunctionSet.NDV) || fnName.getFunction().equals(FunctionSet.APPROX_COUNT_DISTINCT) - || fnName.getFunction().equals(FunctionSet.DS_HLL_COUNT_DISTINCT)) + || fnName.getFunction().equals(FunctionSet.DS_HLL_COUNT_DISTINCT) + || fnName.getFunction().equals(FunctionSet.DS_QUANTILE) + || fnName.getFunction().equals(FunctionSet.DS_FREQUENT) + || fnName.getFunction().equals(FunctionSet.DS_THETA)) && !arg.getType().canApplyToNumeric()) { throw new SemanticException(Type.NOT_SUPPORT_AGG_ERROR_MSG); } @@ -534,7 +537,7 @@ private static void analyzeBuiltinAggFunction(FunctionName fnName, // check the second parameter: tgt_type if (argSize == 3) { if (!(functionCallExpr.getChild(2) instanceof StringLiteral)) { - throw new SemanticException(fnName + " 's second parameter's data type is wrong "); + throw new SemanticException(fnName + " 's third parameter's data type is wrong "); } String tgtType = ((LiteralExpr) functionCallExpr.getChild(2)).getStringValue(); if (!SUPPORTED_TGT_TYPES.contains(tgtType)) { @@ -544,6 +547,93 @@ private static void analyzeBuiltinAggFunction(FunctionName fnName, } } + // ds_quantile + if (fnName.getFunction().equals(FunctionSet.DS_QUANTILE)) { + int argSize = functionCallExpr.getChildren().size(); + if (argSize > 3 || argSize < 1) { + throw new SemanticException(fnName + " requires one/two/three parameters: ds_quantile(col, ranks, k)"); + } + if (!functionCallExpr.getChild(0).getType().isNumericType()) { + throw new SemanticException( + "ds_quantile requires the first parameter's type is numeric type"); + } + if (argSize >= 2) { + if (functionCallExpr.getChild(1) instanceof ArrayExpr) { + ArrayExpr ranksArrExpr = (ArrayExpr) functionCallExpr.getChild(1); + int ranksSize = ranksArrExpr.getChildren().size(); + for (int i = 0; i < ranksSize; i++) { + if (!(ranksArrExpr.getChild(i) instanceof DecimalLiteral)) { + throw new SemanticException(fnName + " 's second parameter's data type is wrong."); + } + double rank = ((LiteralExpr) ranksArrExpr.getChild(i)).getDoubleValue(); + if (rank < 0 || rank > 1) { + throw new SemanticException( + fnName + " rank should be between 0 and 1."); + } + } + } else if ((functionCallExpr.getChild(1) instanceof DecimalLiteral)) { + double rank = ((LiteralExpr) functionCallExpr.getChild(1)).getDoubleValue(); + if (rank < 0 || rank > 1) { + throw new SemanticException( + fnName + " rank should be between 0 and 1."); + } + } else { + throw new SemanticException(fnName + " 's second parameter's data type is wrong."); + } + } + if (argSize == 3) { + if (!(functionCallExpr.getChild(2) instanceof IntLiteral)) { + throw new SemanticException(fnName + " 's third parameter's data type is wrong."); + } + long k = ((LiteralExpr) functionCallExpr.getChild(2)).getLongValue(); + if (k < 2 || k > 32768) { + throw new SemanticException( + fnName + " third parameter'value should be between 2 and 32768."); + } + } + } + + // ds_frequent + if (fnName.getFunction().equals(FunctionSet.DS_FREQUENT)) { + int argSize = functionCallExpr.getChildren().size(); + if (argSize > 4) { + throw new SemanticException(fnName + " requires one/two/three/four parameters: ds_frequent(" + + "col, counter_num, lg_max_map_size, lg_start_map_size)"); + } + if (argSize >= 2) { + if (!(functionCallExpr.getChild(1) instanceof IntLiteral)) { + throw new SemanticException(fnName + " 's second parameter's data type is wrong."); + } + long counterNum = ((LiteralExpr) functionCallExpr.getChild(1)).getLongValue(); + if (counterNum < 1) { + throw new SemanticException( + fnName + " second parameter'value must be greater than 1."); + } + } + + if (argSize >= 3) { + if (!(functionCallExpr.getChild(2) instanceof IntLiteral)) { + throw new SemanticException(fnName + " 's third parameter's data type is wrong."); + } + long lgMaxMapSize = ((LiteralExpr) functionCallExpr.getChild(2)).getLongValue(); + if (lgMaxMapSize < 3 || lgMaxMapSize > 21) { + throw new SemanticException( + fnName + " third parameter'value should be between 3 and 21."); + } + } + + if (argSize == 4) { + if (!(functionCallExpr.getChild(3) instanceof IntLiteral)) { + throw new SemanticException(fnName + " 's third parameter's data type is wrong."); + } + long lgStartMapSize = ((LiteralExpr) functionCallExpr.getChild(3)).getLongValue(); + if (lgStartMapSize < 3 || lgStartMapSize > 21) { + throw new SemanticException( + fnName + " fourth parameter'value should be between 3 and 21."); + } + } + } + if (fnName.getFunction().equals(FunctionSet.COVAR_POP) || fnName.getFunction().equals(FunctionSet.COVAR_SAMP) || fnName.getFunction().equals(FunctionSet.CORR)) { if (functionCallExpr.getChildren().size() != 2) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/PreAggregateTurnOnRule.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/PreAggregateTurnOnRule.java index dddb609e5d308..0422b3761d061 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/PreAggregateTurnOnRule.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/PreAggregateTurnOnRule.java @@ -71,6 +71,9 @@ private static class PreAggregateVisitor extends OptExpressionVisitor Date: Wed, 23 Oct 2024 10:37:02 +0800 Subject: [PATCH 2/5] fix format exception. Signed-off-by: chenminghua8 --- be/src/exprs/agg/ds_agg.h | 90 ++++++------------ .../agg/factory/aggregate_resolver_approx.cpp | 3 +- be/src/types/ds_sketch.h | 94 ++++++++++--------- .../com/starrocks/catalog/FunctionSet.java | 10 ++ 4 files changed, 90 insertions(+), 107 deletions(-) diff --git a/be/src/exprs/agg/ds_agg.h b/be/src/exprs/agg/ds_agg.h index 04a9ca7977549..d6f4811ae0668 100644 --- a/be/src/exprs/agg/ds_agg.h +++ b/be/src/exprs/agg/ds_agg.h @@ -28,12 +28,11 @@ enum SketchType { HLL = 0, QUANTILE = 1, FREQUENT = 2, - THETA =3, + THETA = 3, }; template -struct DSSketchState { -}; +struct DSSketchState {}; template struct DSSketchState { @@ -48,16 +47,15 @@ struct DSSketchState { ds_sketch_wrapper = std::make_unique(log_k, tgt_type, &memory_usage); } - bool is_inited() const { - return ds_sketch_wrapper != nullptr; - } + bool is_inited() const { return ds_sketch_wrapper != nullptr; } void merge(const BinaryColumn* sketch_data_column, size_t row_num) { DSSketchState other_state; other_state.deserialize(sketch_data_column->get(row_num).get_slice(), &memory_usage); if (UNLIKELY(!is_inited())) { - ds_sketch_wrapper = std::make_unique( - other_state.ds_sketch_wrapper->get_lg_config_k(), other_state.ds_sketch_wrapper->get_target_type(), &memory_usage); + ds_sketch_wrapper = + std::make_unique(other_state.ds_sketch_wrapper->get_lg_config_k(), + other_state.ds_sketch_wrapper->get_target_type(), &memory_usage); } ds_sketch_wrapper->merge(*other_state.ds_sketch_wrapper); } @@ -101,13 +99,9 @@ struct DSSketchState { } } - size_t serialize(uint8_t* dst) const { - return ds_sketch_wrapper->serialize(dst); - } + size_t serialize(uint8_t* dst) const { return ds_sketch_wrapper->serialize(dst); } - size_t serialize_size() const { - return ds_sketch_wrapper->serialize_size(); - } + size_t serialize_size() const { return ds_sketch_wrapper->serialize_size(); } void deserialize(const Slice& slice, int64_t* memory_usage) { ds_sketch_wrapper = std::make_unique(slice, memory_usage); @@ -124,9 +118,7 @@ struct DSSketchState { } } - static std::string getFunName() { - return "ds_hll_count_distinct"; - } + static std::string getFunName() { return "ds_hll_count_distinct"; } private: // parse log_k and target type from args @@ -162,7 +154,6 @@ struct DSSketchState { std::unique_ptr ds_sketch_wrapper = nullptr; int64_t memory_usage = 0; - void init(FunctionContext* ctx) { DatumArray datum_array; uint16_t k; @@ -186,9 +177,7 @@ struct DSSketchState { ds_sketch_wrapper = std::make_unique(k, &memory_usage); } - bool is_inited() const { - return ds_sketch_wrapper != nullptr; - } + bool is_inited() const { return ds_sketch_wrapper != nullptr; } void update(const Column* data_column, size_t row_num) const { const ColumnType* column = down_cast(data_column); @@ -215,8 +204,8 @@ struct DSSketchState { *ranks_prt = other_state.ranks.get()[i]; ranks_prt++; } - ds_sketch_wrapper = std::make_unique( - other_state.ds_sketch_wrapper->get_k(), &memory_usage); + ds_sketch_wrapper = + std::make_unique(other_state.ds_sketch_wrapper->get_k(), &memory_usage); } ds_sketch_wrapper->merge(*other_state.ds_sketch_wrapper); } @@ -228,7 +217,7 @@ struct DSSketchState { memcpy(dst + offset, ranks.get(), ranks_size * sizeof(double)); offset = offset + ranks_size * sizeof(double); size_t ser_sketch_size = ds_sketch_wrapper->serialize(dst + offset); - return offset + ser_sketch_size; + return offset + ser_sketch_size; } size_t serialize_size() const { @@ -244,11 +233,10 @@ struct DSSketchState { } offset = offset + sizeof(uint32_t); ranks = std::make_unique(ranks_size); - memcpy(ranks.get(), ptr + offset, ranks_size * sizeof(double)); + memcpy(ranks.get(), ptr + offset, ranks_size * sizeof(double)); offset = offset + ranks_size * sizeof(double); const Slice sketch_data_slice = Slice(slice.get_data() + offset, slice.size - offset); ds_sketch_wrapper = std::make_unique(sketch_data_slice, memory_usage); - } void get_values(Column* dst, size_t start, size_t end) const { @@ -263,7 +251,7 @@ struct DSSketchState { result = ds_sketch_wrapper->get_quantiles(ranks.get(), ranks_size); } - uint32_t index =0; + uint32_t index = 0; for (size_t row = start; row < end; row++) { for (CppType result_data : result) { result_column->append(result_data); @@ -274,9 +262,7 @@ struct DSSketchState { } } - static std::string getFunName() { - return "ds_quantile"; - } + static std::string getFunName() { return "ds_quantile"; } private: // parse k and rank_arr from args @@ -320,7 +306,7 @@ struct SpecialCppType { }; template <> struct SpecialCppType { - using CppType = std::string; + using CppType = std::string; }; template <> struct SpecialCppType { @@ -348,13 +334,11 @@ struct DSSketchState { ds_sketch_wrapper = std::make_unique(lg_max_map_size, lg_start_map_size, &memory_usage); } - bool is_inited() const { - return ds_sketch_wrapper != nullptr; - } + bool is_inited() const { return ds_sketch_wrapper != nullptr; } void update(const Column* data_column, size_t row_num) const { if constexpr (!IsSlice) { - const ColumnType* column = down_cast(data_column); + const ColumnType* column = down_cast(data_column); const auto& values = column->get_data(); ds_sketch_wrapper->update(values[row_num]); } else { @@ -366,7 +350,7 @@ struct DSSketchState { void update_batch_single_state_with_frame(const Column* data_column, int64_t frame_start, int64_t frame_end) const { if constexpr (!IsSlice) { - const ColumnType* column = down_cast(data_column); + const ColumnType* column = down_cast(data_column); const auto& values = column->get_data(); for (size_t i = frame_start; i < frame_end; ++i) { ds_sketch_wrapper->update(values[i]); @@ -377,7 +361,6 @@ struct DSSketchState { const Slice data = column->get_slice(i); ds_sketch_wrapper->update(std::string(data.get_data(), data.size)); } - } } @@ -402,7 +385,7 @@ struct DSSketchState { memcpy(dst + offset, &lg_start_map_size, sizeof(uint8_t)); offset = offset + sizeof(uint8_t); size_t ser_sketch_size = ds_sketch_wrapper->serialize(dst + offset); - return offset + ser_sketch_size; + return offset + ser_sketch_size; } size_t serialize_size() const { @@ -419,8 +402,8 @@ struct DSSketchState { memcpy(&lg_start_map_size, ptr + offset, sizeof(uint8_t)); offset = offset + sizeof(uint8_t); const Slice sketch_data_slice = Slice(slice.get_data() + offset, slice.size - offset); - ds_sketch_wrapper = std::make_unique(sketch_data_slice, lg_max_map_size, - lg_start_map_size, memory_usage); + ds_sketch_wrapper = std::make_unique(sketch_data_slice, lg_max_map_size, lg_start_map_size, + memory_usage); } void get_values(Column* dst, size_t start, size_t end) const { @@ -439,7 +422,7 @@ struct DSSketchState { if (LIKELY(ds_sketch_wrapper != nullptr)) { result = ds_sketch_wrapper->get_frequent_items(0); } - uint32_t index =0; + uint32_t index = 0; for (size_t row = start; row < end; row++) { uint32_t counter_num_index = 0; for (FrequentRow frequentRow : result) { @@ -466,9 +449,7 @@ struct DSSketchState { } } - static std::string getFunName() { - return "ds_frequent"; - } + static std::string getFunName() { return "ds_frequent"; } private: // parse threshold lg_max_map_size and lg_start_map_size from args @@ -489,7 +470,6 @@ struct DSSketchState { lg_max_map_size = lg_start_map_size; } return {counter_num, lg_max_map_size, lg_start_map_size}; - } }; @@ -502,13 +482,9 @@ struct DSSketchState { std::unique_ptr ds_sketch_wrapper = nullptr; int64_t memory_usage = 0; - void init(FunctionContext* ctx) { - ds_sketch_wrapper = std::make_unique(&memory_usage); - } + void init(FunctionContext* ctx) { ds_sketch_wrapper = std::make_unique(&memory_usage); } - bool is_inited() const { - return ds_sketch_wrapper != nullptr; - } + bool is_inited() const { return ds_sketch_wrapper != nullptr; } void merge(const BinaryColumn* sketch_data_column, size_t row_num) { DSSketchState other_state; @@ -558,13 +534,9 @@ struct DSSketchState { } } - size_t serialize(uint8_t* dst) const { - return ds_sketch_wrapper->serialize(dst); - } + size_t serialize(uint8_t* dst) const { return ds_sketch_wrapper->serialize(dst); } - size_t serialize_size() const { - return ds_sketch_wrapper->serialize_size(); - } + size_t serialize_size() const { return ds_sketch_wrapper->serialize_size(); } void deserialize(const Slice& slice, int64_t* memory_usage) { ds_sketch_wrapper = std::make_unique(slice, memory_usage); @@ -581,9 +553,7 @@ struct DSSketchState { } } - static std::string getFunName() { - return "ds_theta"; - } + static std::string getFunName() { return "ds_theta"; } }; template , typename T = RunTimeCppType> diff --git a/be/src/exprs/agg/factory/aggregate_resolver_approx.cpp b/be/src/exprs/agg/factory/aggregate_resolver_approx.cpp index ee5faf98999ac..e1088f5f9cbe0 100644 --- a/be/src/exprs/agg/factory/aggregate_resolver_approx.cpp +++ b/be/src/exprs/agg/factory/aggregate_resolver_approx.cpp @@ -62,8 +62,7 @@ struct DataSketchesBuilder { "ds_hll_count_distinct", false, AggregateFactory::MakeDataSketchesAggregateFunction()); resolver->add_aggregate_mapping>( - "ds_theta", false, - AggregateFactory::MakeDataSketchesAggregateFunction()); + "ds_theta", false, AggregateFactory::MakeDataSketchesAggregateFunction()); } if constexpr (lt_is_integer || lt_is_float) { resolver->add_aggregate_mapping>( diff --git a/be/src/types/ds_sketch.h b/be/src/types/ds_sketch.h index 523db2b97f5b9..610c09eba08f4 100644 --- a/be/src/types/ds_sketch.h +++ b/be/src/types/ds_sketch.h @@ -17,12 +17,12 @@ #pragma once -#include -#include - #include #include #include +#include +#include + #include "runtime/memory/counting_allocator.h" #include "runtime/memory/mem_chunk.h" #include "runtime/memory/mem_chunk_allocator.h" @@ -151,8 +151,7 @@ class DataSketchesQuantile { using alloc_type = STLCountingAllocator; using quantile_sketch_type = datasketches::quantiles_sketch, alloc_type>; - explicit DataSketchesQuantile(uint16_t k, int64_t* memory_usage) - : _memory_usage(memory_usage) { + explicit DataSketchesQuantile(uint16_t k, int64_t* memory_usage) : _memory_usage(memory_usage) { this->_sketch = std::make_unique(k, std::less(), alloc_type(_memory_usage)); } @@ -160,8 +159,7 @@ class DataSketchesQuantile { DataSketchesQuantile& operator=(const DataSketchesQuantile& other) = delete; DataSketchesQuantile(DataSketchesQuantile&& other) noexcept - : _memory_usage(std::move(other._memory_usage)), - _sketch(std::move(other._sketch)) {} + : _memory_usage(std::move(other._memory_usage)), _sketch(std::move(other._sketch)) {} DataSketchesQuantile& operator=(DataSketchesQuantile&& other) noexcept { if (this != &other) { this->_memory_usage = std::move(other._memory_usage); @@ -178,17 +176,14 @@ class DataSketchesQuantile { ~DataSketchesQuantile() = default; - uint16_t get_k() const { - return _sketch->get_k(); - } + uint16_t get_k() const { return _sketch->get_k(); } - void update(T value) { - _sketch->update(value); - } + void update(T value) { _sketch->update(value); } void merge(const DataSketchesQuantile& other) { if (UNLIKELY(_sketch == nullptr)) { - _sketch = std::make_unique(other._sketch->get_k(), std::less(), alloc_type(_memory_usage)); + _sketch = std::make_unique(other._sketch->get_k(), std::less(), + alloc_type(_memory_usage)); } _sketch.get()->merge(*other._sketch); } @@ -220,7 +215,7 @@ class DataSketchesQuantile { try { _sketch = std::make_unique( quantile_sketch_type::deserialize((uint8_t*)slice.data, slice.size, datasketches::serde(), - std::less(), alloc_type(_memory_usage))); + std::less(), alloc_type(_memory_usage))); } catch (std::logic_error& e) { LOG(WARNING) << "DataSketchesQuantile deserialize error: " << e.what(); return false; @@ -254,7 +249,8 @@ class DataSketchesQuantile { void clear() { *_memory_usage = 0; - this->_sketch = std::make_unique(_sketch->get_k(), std::less(), alloc_type(_memory_usage)); + this->_sketch = + std::make_unique(_sketch->get_k(), std::less(), alloc_type(_memory_usage)); } std::string to_string() const { @@ -265,9 +261,9 @@ class DataSketchesQuantile { return std::string(str.begin(), str.end()); } - private: - int64_t* _memory_usage; - mutable std::unique_ptr _sketch = nullptr; +private: + int64_t* _memory_usage; + mutable std::unique_ptr _sketch = nullptr; }; template @@ -282,20 +278,23 @@ template class DataSketchesFrequent { public: using alloc_type = STLCountingAllocator; - using frequent_sketch_type = datasketches::frequent_items_sketch, std::equal_to, alloc_type>; + using frequent_sketch_type = + datasketches::frequent_items_sketch, std::equal_to, alloc_type>; explicit DataSketchesFrequent(uint8_t lg_max_map_size, uint8_t lg_start_map_size, int64_t* memory_usage) - : _memory_usage(memory_usage), _lg_max_map_size(lg_max_map_size) , _lg_start_map_size(lg_start_map_size){ - _sketch = std::make_unique( - _lg_max_map_size, _lg_start_map_size, std::equal_to(), alloc_type(_memory_usage)); + : _memory_usage(memory_usage), _lg_max_map_size(lg_max_map_size) , _lg_start_map_size(lg_start_map_size){ + _sketch = std::make_unique(_lg_max_map_size, _lg_start_map_size, std::equal_to(), + alloc_type(_memory_usage)); } DataSketchesFrequent(const DataSketchesFrequent& other) = delete; DataSketchesFrequent& operator=(const DataSketchesFrequent& other) = delete; DataSketchesFrequent(DataSketchesFrequent&& other) noexcept - : _memory_usage(std::move(other._memory_usage)), _lg_max_map_size(other._lg_max_map_size) , - _lg_start_map_size(other._lg_start_map_size), _sketch(std::move(other._sketch)) {} + : _memory_usage(std::move(other._memory_usage)), + _lg_max_map_size(other._lg_max_map_size), + _lg_start_map_size(other._lg_start_map_size), + _sketch(std::move(other._sketch)) {} DataSketchesFrequent& operator=(DataSketchesFrequent&& other) noexcept { if (this != &other) { @@ -307,8 +306,9 @@ class DataSketchesFrequent { return *this; } - explicit DataSketchesFrequent(const Slice& src, uint8_t lg_max_map_size, uint8_t lg_start_map_size, int64_t* memory_usage) - : _memory_usage(memory_usage), _lg_max_map_size(lg_max_map_size), _lg_start_map_size(lg_start_map_size) { + explicit DataSketchesFrequent(const Slice& src, uint8_t lg_max_map_size, uint8_t lg_start_map_size, + int64_t* memory_usage) + : _memory_usage(memory_usage), _lg_max_map_size(lg_max_map_size), _lg_start_map_size(lg_start_map_size) { if (!deserialize(src)) { LOG(WARNING) << "Failed to init DataSketchesFrequent from slice, will be reset to 0."; } @@ -317,9 +317,9 @@ class DataSketchesFrequent { ~DataSketchesFrequent() = default; void update(T value) { - uint32_t old_active_items = _sketch->get_num_active_items(); + uint32_t old_active_items = _sketch->get_num_active_items(); _sketch->update(value); - uint32_t new_active_items = _sketch->get_num_active_items(); + uint32_t new_active_items = _sketch->get_num_active_items(); if (old_active_items != new_active_items) { // *_memory_usage = *_memory_usage + sizeof(T); } @@ -327,8 +327,8 @@ class DataSketchesFrequent { void merge(const DataSketchesFrequent& other) { if (UNLIKELY(_sketch == nullptr)) { - _sketch = std::make_unique( - _lg_max_map_size, _lg_start_map_size, std::equal_to(), alloc_type(_memory_usage)); + _sketch = std::make_unique(_lg_max_map_size, _lg_start_map_size, std::equal_to(), + alloc_type(_memory_usage)); } _sketch.get()->merge(*other._sketch); } @@ -360,7 +360,7 @@ class DataSketchesFrequent { try { _sketch = std::make_unique( frequent_sketch_type::deserialize((uint8_t*)slice.data, slice.size, datasketches::serde(), - std::equal_to(), alloc_type(_memory_usage))); + std::equal_to(), alloc_type(_memory_usage))); } catch (std::logic_error& e) { LOG(WARNING) << "DataSketchesFrequent deserialize error: " << e.what(); return false; @@ -374,10 +374,10 @@ class DataSketchesFrequent { return result; } try { - auto frequent_items = _sketch->get_frequent_items(datasketches::NO_FALSE_POSITIVES, threshold); + auto frequent_items = _sketch->get_frequent_items(datasketches::NO_FALSE_POSITIVES, threshold); for (auto item : frequent_items) { - FrequentRow frequent_row = FrequentRow {item.get_item(), item.get_estimate(), item.get_lower_bound(), - item.get_upper_bound()}; + FrequentRow frequent_row = FrequentRow{item.get_item(), item.get_estimate(), + item.get_lower_bound(), item.get_upper_bound()}; result.push_back(frequent_row); } } catch (std::logic_error& e) { @@ -396,8 +396,8 @@ class DataSketchesFrequent { void clear() { *_memory_usage = 0; - this->_sketch = std::make_unique( - _lg_max_map_size, _lg_start_map_size, std::equal_to(), alloc_type(_memory_usage)); + this->_sketch = std::make_unique(_lg_max_map_size, _lg_start_map_size, std::equal_to(), + alloc_type(_memory_usage)); } std::string to_string() const { @@ -432,7 +432,7 @@ class DataSketchesTheta { DataSketchesTheta& operator=(const DataSketchesTheta& other) = delete; DataSketchesTheta(DataSketchesTheta&& other) noexcept - : _memory_usage(std::move(other._memory_usage)), _sketch(std::move(other._sketch)) { + : _memory_usage(std::move(other._memory_usage)), _sketch(std::move(other._sketch)) { if (other._sketch_union != nullptr) { this->_sketch_union = std::move(other._sketch_union); } @@ -449,8 +449,7 @@ class DataSketchesTheta { return *this; } - explicit DataSketchesTheta(const Slice& src, int64_t* memory_usage) - : _memory_usage(memory_usage) { + explicit DataSketchesTheta(const Slice& src, int64_t* memory_usage) : _memory_usage(memory_usage) { if (!deserialize(src)) { LOG(WARNING) << "Failed to init DataSketchesFrequent from slice, will be reset to 0."; } @@ -465,7 +464,8 @@ class DataSketchesTheta { void merge(const DataSketchesTheta& other) { if (_sketch_union == nullptr) { - _sketch_union = std::make_unique(theta_union_type::builder(alloc_type(_memory_usage)).build()); + _sketch_union = + std::make_unique(theta_union_type::builder(alloc_type(_memory_usage)).build()); } _sketch_union->update(other._sketch->compact()); if (other._sketch_union != nullptr) { @@ -489,7 +489,8 @@ class DataSketchesTheta { void serialize_if_needed() const { if (UNLIKELY(_sketch == nullptr)) { - _sketch = std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); + _sketch = + std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); } if (_is_changed) { auto resultTheta_union = theta_union_type(theta_union_type::builder(alloc_type(_memory_usage)).build()); @@ -498,7 +499,8 @@ class DataSketchesTheta { resultTheta_union.update(_sketch_union->get_result()); } auto sketch_ser = resultTheta_union.get_result().serialize(); - _sketch_data = std::make_unique(sketch_data_type(sketch_ser.begin(),sketch_ser.end(), sketch_ser.get_allocator())); + _sketch_data = std::make_unique(sketch_data_type( + sketch_ser.begin(),sketch_ser.end(), sketch_ser.get_allocator())); _is_changed = false; } } @@ -508,11 +510,13 @@ class DataSketchesTheta { return false; } DCHECK(_sketch == nullptr); - _sketch = std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); + _sketch = + std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); try { auto sketch_warp = theta_wrapped_type::wrap((uint8_t*)slice.data, slice.size); if (_sketch_union == nullptr) { - _sketch_union = std::make_unique(theta_union_type::builder(alloc_type(_memory_usage)).build()); + _sketch_union = std::make_unique( + theta_union_type::builder(alloc_type(_memory_usage)).build()); } _sketch_union->update(sketch_warp); } catch (std::logic_error& e) { diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java b/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java index 7317517036329..589ecbc1608fb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java @@ -1680,6 +1680,16 @@ private void registerBuiltinDsFunction() { } } + public static final java.util.function.Function DS_FREQUENT_RET_TYPE_BUILDER = + (Type itemType) -> { + List fields = Lists.newArrayList(); + fields.add(new StructField("value", itemType)); + fields.add(new StructField("count", Type.BIGINT)); + fields.add(new StructField("lower_bound", Type.BIGINT)); + fields.add(new StructField("upper_bound", Type.BIGINT)); + return new ArrayType(new StructType(fields, true)); + }; + public List getBuiltinFunctions() { List builtinFunctions = Lists.newArrayList(); for (Map.Entry> entry : vectorizedFunctions.entrySet()) { From 2540ae92c8f56b378f27165b52b1e79e5791959f Mon Sep 17 00:00:00 2001 From: chenminghua8 Date: Thu, 24 Oct 2024 09:33:02 +0800 Subject: [PATCH 3/5] fix format exception. Signed-off-by: chenminghua8 --- be/src/exprs/agg/ds_agg.h | 2 +- be/src/types/ds_sketch.h | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/be/src/exprs/agg/ds_agg.h b/be/src/exprs/agg/ds_agg.h index d6f4811ae0668..1b9e76103b6b9 100644 --- a/be/src/exprs/agg/ds_agg.h +++ b/be/src/exprs/agg/ds_agg.h @@ -204,7 +204,7 @@ struct DSSketchState { *ranks_prt = other_state.ranks.get()[i]; ranks_prt++; } - ds_sketch_wrapper = + ds_sketch_wrapper = std::make_unique(other_state.ds_sketch_wrapper->get_k(), &memory_usage); } ds_sketch_wrapper->merge(*other_state.ds_sketch_wrapper); diff --git a/be/src/types/ds_sketch.h b/be/src/types/ds_sketch.h index 610c09eba08f4..ca55602bf20b6 100644 --- a/be/src/types/ds_sketch.h +++ b/be/src/types/ds_sketch.h @@ -282,7 +282,7 @@ class DataSketchesFrequent { datasketches::frequent_items_sketch, std::equal_to, alloc_type>; explicit DataSketchesFrequent(uint8_t lg_max_map_size, uint8_t lg_start_map_size, int64_t* memory_usage) - : _memory_usage(memory_usage), _lg_max_map_size(lg_max_map_size) , _lg_start_map_size(lg_start_map_size){ + : _memory_usage(memory_usage), _lg_max_map_size(lg_max_map_size), _lg_start_map_size(lg_start_map_size) { _sketch = std::make_unique(_lg_max_map_size, _lg_start_map_size, std::equal_to(), alloc_type(_memory_usage)); } @@ -464,7 +464,7 @@ class DataSketchesTheta { void merge(const DataSketchesTheta& other) { if (_sketch_union == nullptr) { - _sketch_union = + _sketch_union = std::make_unique(theta_union_type::builder(alloc_type(_memory_usage)).build()); } _sketch_union->update(other._sketch->compact()); @@ -489,7 +489,7 @@ class DataSketchesTheta { void serialize_if_needed() const { if (UNLIKELY(_sketch == nullptr)) { - _sketch = + _sketch = std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); } if (_is_changed) { @@ -499,8 +499,8 @@ class DataSketchesTheta { resultTheta_union.update(_sketch_union->get_result()); } auto sketch_ser = resultTheta_union.get_result().serialize(); - _sketch_data = std::make_unique(sketch_data_type( - sketch_ser.begin(),sketch_ser.end(), sketch_ser.get_allocator())); + _sketch_data = std::make_unique( + sketch_data_type(sketch_ser.begin(), sketch_ser.end(), sketch_ser.get_allocator())); _is_changed = false; } } @@ -510,8 +510,7 @@ class DataSketchesTheta { return false; } DCHECK(_sketch == nullptr); - _sketch = - std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); + _sketch = std::make_unique(theta_sketch_type::builder(alloc_type(_memory_usage)).build()); try { auto sketch_warp = theta_wrapped_type::wrap((uint8_t*)slice.data, slice.size); if (_sketch_union == nullptr) { From 02dc2b06483bdabf0141c80a67b797a98e3388f1 Mon Sep 17 00:00:00 2001 From: chenminghua8 Date: Thu, 24 Oct 2024 10:20:16 +0800 Subject: [PATCH 4/5] fix format exception. Signed-off-by: chenminghua8 --- be/src/types/constexpr.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/be/src/types/constexpr.h b/be/src/types/constexpr.h index d4a6c78172bec..7268cd15bd5d2 100644 --- a/be/src/types/constexpr.h +++ b/be/src/types/constexpr.h @@ -31,6 +31,12 @@ constexpr int HLL_EMPTY_SIZE = 1; const static int MAX_HLL_LOG_K = 20; const static uint8_t DEFAULT_HLL_LOG_K = 17; +const static uint16_t DEFAULT_QUANTILE_K = 128; +const static uint64_t DEFAULT_COUNTER_NUM = 10; + +const static uint8_t DEFAULT_FREQUENT_LG_MIn_SIZE = 3; +const static uint8_t DEFAULT_FREQUENT_LG_MAX_SIZE = 21; + // For JSON type constexpr int kJsonDefaultSize = 128; constexpr int kJsonMetaDefaultFormatVersion = 1; From 358b65285819035591ecbf9088556e0bc57fcbc9 Mon Sep 17 00:00:00 2001 From: chenminghua8 Date: Thu, 24 Oct 2024 14:52:50 +0800 Subject: [PATCH 5/5] fix format exception. Signed-off-by: chenminghua8 --- be/src/exprs/agg/ds_agg.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/exprs/agg/ds_agg.h b/be/src/exprs/agg/ds_agg.h index 1b9e76103b6b9..d4a5a8473c0d7 100644 --- a/be/src/exprs/agg/ds_agg.h +++ b/be/src/exprs/agg/ds_agg.h @@ -320,7 +320,7 @@ struct SpecialCppType { template struct DSSketchState { using OriginalCppType = RunTimeCppType; - using CppType = SpecialCppType::CppType; + using CppType = typename SpecialCppType::CppType; using ColumnType = RunTimeColumnType; using SketchWarapperType = DataSketchesFrequent; uint64_t counter_num; @@ -475,7 +475,7 @@ struct DSSketchState { template struct DSSketchState { - using CppType = SpecialCppType::CppType; + using CppType = typename SpecialCppType::CppType; using ColumnType = RunTimeColumnType; using SketchWarapperType = DataSketchesTheta;