Skip to content

Commit

Permalink
[SR-4450] support vectorized schema scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
starrocks-xueli committed Sep 15, 2021
1 parent f3b6b43 commit f0eb713
Show file tree
Hide file tree
Showing 36 changed files with 2,726 additions and 1 deletion.
17 changes: 17 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ set(EXEC_FILES
vectorized/es_http_scan_node.cpp
vectorized/es_http_scanner.cpp
vectorized/es_http_components.cpp
vectorized/schema_scanner.cpp
vectorized/schema_scan_node.cpp
vectorized/schema_scanner/schema_tables_scanner.cpp
vectorized/schema_scanner/schema_dummy_scanner.cpp
vectorized/schema_scanner/schema_schemata_scanner.cpp
vectorized/schema_scanner/schema_variables_scanner.cpp
vectorized/schema_scanner/schema_columns_scanner.cpp
vectorized/schema_scanner/schema_charsets_scanner.cpp
vectorized/schema_scanner/schema_collations_scanner.cpp
vectorized/schema_scanner/schema_statistics_scanner.cpp
vectorized/schema_scanner/schema_triggers_scanner.cpp
vectorized/schema_scanner/schema_events_scanner.cpp
vectorized/schema_scanner/schema_views_scanner.cpp
vectorized/schema_scanner/schema_user_privileges_scanner.cpp
vectorized/schema_scanner/schema_schema_privileges_scanner.cpp
vectorized/schema_scanner/schema_table_privileges_scanner.cpp
vectorized/schema_scanner/schema_helper.cpp
parquet/column_chunk_reader.cpp
parquet/column_reader.cpp
parquet/encoding.cpp
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "exec/vectorized/olap_scan_node.h"
#include "exec/vectorized/project_node.h"
#include "exec/vectorized/repeat_node.h"
#include "exec/vectorized/schema_scan_node.h"
#include "exec/vectorized/table_function_node.h"
#include "exec/vectorized/topn_node.h"
#include "exec/vectorized/union_node.h"
Expand Down Expand Up @@ -552,7 +553,7 @@ Status ExecNode::create_vectorized_node(starrocks::RuntimeState* state, starrock
*node = pool->add(new vectorized::EsHttpScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::SCHEMA_SCAN_NODE:
*node = pool->add(new SchemaScanNode(pool, tnode, descs));
*node = pool->add(new vectorized::SchemaScanNode(pool, tnode, descs));
return Status::OK();
default:
return Status::InternalError(strings::Substitute("Vectorized engine not support node: $0", tnode.node_type));
Expand Down
228 changes: 228 additions & 0 deletions be/src/exec/vectorized/schema_scan_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.

#include "exec/vectorized/schema_scan_node.h"

#include <boost/algorithm/string.hpp>

#include "column/column_helper.h"
#include "exec/text_converter.hpp"
#include "exec/vectorized/schema_scanner/schema_helper.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "util/runtime_profile.h"

namespace starrocks::vectorized {

SchemaScanNode::SchemaScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
_is_init(false),
_table_name(tnode.schema_scan_node.table_name),
_tuple_id(tnode.schema_scan_node.tuple_id),
_dest_tuple_desc(nullptr),
_schema_scanner(nullptr) {}

SchemaScanNode::~SchemaScanNode() {}

Status SchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode));
if (tnode.schema_scan_node.__isset.db) {
_scanner_param.db = _pool->add(new std::string(tnode.schema_scan_node.db));
}

if (tnode.schema_scan_node.__isset.table) {
_scanner_param.table = _pool->add(new std::string(tnode.schema_scan_node.table));
}

if (tnode.schema_scan_node.__isset.wild) {
_scanner_param.wild = _pool->add(new std::string(tnode.schema_scan_node.wild));
}

if (tnode.schema_scan_node.__isset.current_user_ident) {
_scanner_param.current_user_ident = _pool->add(new TUserIdentity(tnode.schema_scan_node.current_user_ident));
} else {
if (tnode.schema_scan_node.__isset.user) {
_scanner_param.user = _pool->add(new std::string(tnode.schema_scan_node.user));
}
if (tnode.schema_scan_node.__isset.user_ip) {
_scanner_param.user_ip = _pool->add(new std::string(tnode.schema_scan_node.user_ip));
}
}

if (tnode.schema_scan_node.__isset.ip) {
_scanner_param.ip = _pool->add(new std::string(tnode.schema_scan_node.ip));
}
if (tnode.schema_scan_node.__isset.port) {
_scanner_param.port = tnode.schema_scan_node.port;
}

if (tnode.schema_scan_node.__isset.thread_id) {
_scanner_param.thread_id = tnode.schema_scan_node.thread_id;
}
return Status::OK();
}

Status SchemaScanNode::prepare(RuntimeState* state) {
if (_is_init) {
return Status::OK();
}

if (nullptr == state) {
return Status::InternalError("input pointer is nullptr.");
}

RETURN_IF_ERROR(ScanNode::prepare(state));

// get dest tuple desc
_dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);

if (nullptr == _dest_tuple_desc) {
return Status::InternalError("Failed to get tuple descriptor.");
}

const SchemaTableDescriptor* schema_table =
static_cast<const SchemaTableDescriptor*>(_dest_tuple_desc->table_desc());

if (nullptr == schema_table) {
return Status::InternalError("Failed to get schema table descriptor.");
}

// new one scanner
_schema_scanner.reset(SchemaScanner::create(schema_table->schema_table_type()));

if (nullptr == _schema_scanner.get()) {
return Status::InternalError("schema scanner get nullptr pointer.");
}

_schema_scanner->set_slot_descs(_dest_tuple_desc->slots());

RETURN_IF_ERROR(_schema_scanner->init(&_scanner_param, _pool));

_is_init = true;

return Status::OK();
}

Status SchemaScanNode::open(RuntimeState* state) {
if (!_is_init) {
return Status::InternalError("Open before Init.");
}

if (nullptr == state) {
return Status::InternalError("input pointer is nullptr.");
}

SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(ExecNode::open(state));

if (_scanner_param.user) {
TSetSessionParams param;
param.__set_user(*_scanner_param.user);
}

return _schema_scanner->start(state);
}

Status SchemaScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}

if (nullptr == state || nullptr == row_batch || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}

*eos = true;
return Status::OK();
}

Status SchemaScanNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool* eos) {
VLOG(1) << "SchemaScanNode::GetNext";

DCHECK(state != nullptr && chunk != nullptr && eos != nullptr);
DCHECK(_is_init);

RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());

if (reached_limit() || _is_finished) {
*eos = true;
return Status::OK();
}

*chunk = std::make_shared<vectorized::Chunk>();
const std::vector<SlotDescriptor*>& slot_descs = _schema_scanner->get_slot_descs();
// init column information
for (auto& slot_desc : slot_descs) {
ColumnPtr column = vectorized::ColumnHelper::create_column(slot_desc->type(), slot_desc->is_nullable());
(*chunk)->append_column(std::move(column), slot_desc->id());
}

bool scanner_eos = false;
int row_num = 0;

while (true) {
RETURN_IF_CANCELLED(state);

if (reached_limit()) {
_is_finished = true;
// if row_num is greater than 0, in this call, eos = false, and eos will be set to true
// in the next call
if (row_num == 0) {
*eos = true;
}
return Status::OK();
}

if (row_num >= config::vector_chunk_size) {
return Status::OK();
}

RETURN_IF_ERROR(_schema_scanner->get_next(chunk, &scanner_eos));

if (scanner_eos) {
_is_finished = true;
// if row_num is greater than 0, in this call, eos = false, and eos will be set to true
// in the next call
if (row_num == 0) {
*eos = true;
}
return Status::OK();
}
++row_num;

++_num_rows_returned;
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}

return Status::OK();
}

Status SchemaScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
SCOPED_TIMER(_runtime_profile->total_time_counter());

return ScanNode::close(state);
}

void SchemaScanNode::debug_string(int indentation_level, std::stringstream* out) const {
*out << string(indentation_level * 2, ' ');
*out << "SchemaScanNode(tupleid=" << _tuple_id << " table=" << _table_name;
*out << ")" << std::endl;

for (int i = 0; i < _children.size(); ++i) {
_children[i]->debug_string(indentation_level + 1, out);
}
}

Status SchemaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
return Status::OK();
}

} // namespace starrocks::vectorized
64 changes: 64 additions & 0 deletions be/src/exec/vectorized/schema_scan_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.

#pragma once

#include "exec/scan_node.h"
#include "exec/vectorized/schema_scanner.h"
#include "gen_cpp/Descriptors_types.h"
#include "runtime/descriptors.h"

namespace starrocks {
class TextConverter;
class TupleDescriptor;
class RuntimeState;
class Status;
} // namespace starrocks

namespace starrocks::vectorized {

class SchemaScanNode : public ScanNode {
public:
SchemaScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~SchemaScanNode();

// Prepare conjuncts, create Schema columns to slots mapping
// initialize _schema_scanner
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;

// Prepare conjuncts, create Schema columns to slots mapping
// initialize _schema_scanner
Status prepare(RuntimeState* state) override;

// Start Schema scan using _schema_scanner.
Status open(RuntimeState* state) override;

// Fill the next row batch by calling next() on the _schema_scanner,
Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;

// Fill the next chunk by calling next() on the _schema_scanner,
Status get_next(RuntimeState* state, ChunkPtr* chunk, bool* eos) override;

// Close the _schema_scanner, and report errors.
Status close(RuntimeState* state) override;

// this is no use in this class
Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;

private:
// Write debug string of this into out.
void debug_string(int indentation_level, std::stringstream* out) const override;

bool _is_init;
bool _is_finished = false;
const std::string _table_name;
SchemaScannerParam _scanner_param;
// Tuple id resolved in prepare() to set _tuple_desc;
TupleId _tuple_id;

// Descriptor of dest tuples
const TupleDescriptor* _dest_tuple_desc;
// Jni helper for scanning an schema table.
std::unique_ptr<SchemaScanner> _schema_scanner;
};

} // namespace starrocks::vectorized
Loading

0 comments on commit f0eb713

Please sign in to comment.