diff --git a/examples/README.md b/examples/README.md index f56c56c0ea9..703cc9ff0ab 100644 --- a/examples/README.md +++ b/examples/README.md @@ -8,12 +8,14 @@ This folder lists some examples to run Proton in various use cases. For more rea - cdc: demonstrates how to use Debezium to sync database changes from MySQL to Proton, via Redpanda and show live updates(UPSERT and DELETE) in Proton via changelog stream. +- clickhouse: demonstrates how to read from ClickHouse or write to ClickHouse with the new External Table feature. + - ecommerce: a combination of Proton, Redpanda, owl-shop and Redpanda Console. Owl Shop is an imaginary ecommerce shop that simulates microservices exchanging data via Apache Kafka. Sample data streams are: clickstreams(frontend events), customer info, customer orders. [Learn more](https://docs.timeplus.com/proton-kafka#tutorial) - fraud_detection: demonstrates how to leverage proton to build a real-time fraud detection where proton is used as a real-time feature store. -- hackernews: just two containers: Proton and [a bytewax-based data loader](https://github.com/timeplus-io/proton-python-driver/tree/develop/example/bytewax). Inspired by https://bytewax.io/blog/polling-hacker-news, you can call Hacker News HTTP API with Bytewax and send latest news to Proton for SQL-based analysis. - - grafana: an example of how to use Grafana to connect to Proton and visualize the query results. +- hackernews: just two containers: Proton and [a bytewax-based data loader](https://github.com/timeplus-io/proton-python-driver/tree/develop/example/bytewax). Inspired by https://bytewax.io/blog/polling-hacker-news, you can call Hacker News HTTP API with Bytewax and send latest news to Proton for SQL-based analysis. + - jdbc: demonstrates how to connect to Proton via JDBC using DBeaver or Metabase. 
\ No newline at end of file diff --git a/examples/clickhouse/README.md b/examples/clickhouse/README.md new file mode 100644 index 00000000000..c47f334f21b --- /dev/null +++ b/examples/clickhouse/README.md @@ -0,0 +1,76 @@ +# Demo for ClickHouse External Table + +This docker compose file demonstrates how to read from ClickHouse or write to ClickHouse with the new [External Table](https://docs.timeplus.com/proton-clickhouse-external-table) feature. + +A YouTube video tutorial is available for visual learners: TBD + +## Start the example + +Simply run `docker compose up` in this folder. Five docker containers are in the stack: + +1. ghcr.io/timeplus-io/proton:latest, as the streaming SQL engine. +2. clickhouse/clickhouse-server:latest, as the ClickHouse server. +3. quay.io/cloudhut/owl-shop:latest, as the data generator. [Owl Shop](https://github.com/cloudhut/owl-shop) is an imaginary ecommerce shop that simulates microservices exchanging data via Apache Kafka. +4. docker.redpanda.com/redpandadata/redpanda, as the Kafka-compatible streaming message bus. +5. docker.redpanda.com/redpandadata/console, as the web UI to explore data in Kafka/Redpanda. + +When all containers are up and running, a few topics will be created in Redpanda with live demo data. + +## Read data from Redpanda, apply ETL and write to ClickHouse +Open the `proton client` in the proton container. Run the following SQL to create an external stream to read live data from Redpanda. + +```sql +CREATE EXTERNAL STREAM frontend_events(raw string) +SETTINGS type='kafka', + brokers='redpanda:9092', + topic='owlshop-frontend-events'; +``` + +Open the `clickhouse client` in the clickhouse container. Run the following SQL to create a regular MergeTree table. 
+ +```sql +CREATE TABLE events +( + _tp_time DateTime64(3), + url String, + method String, + ip String +) +ENGINE=MergeTree() +PRIMARY KEY (_tp_time, url); +``` + +Go back to `proton client`, run the following SQL to create an external table to connect to ClickHouse: +```sql +CREATE EXTERNAL TABLE ch_local +SETTINGS type='clickhouse', + address='clickhouse:9000', + table='events'; +``` + +Then create a materialized view to read data from Redpanda, extract the values and turn the IP to masked md5, and send data to the external table. By doing so, the transformed data will be written to ClickHouse continuously. + +```sql +CREATE MATERIALIZED VIEW mv INTO ch_local AS + SELECT now64() AS _tp_time, + raw:requestedUrl AS url, + raw:method AS method, + lower(hex(md5(raw:ipAddress))) AS ip + FROM frontend_events; +``` + +## Read data from ClickHouse + +You can run the following SQL to query ClickHouse: + +```sql +SELECT * FROM ch_local; +``` + +Or apply SQL functions or group by, such as + +```sql +SELECT method, count() AS cnt FROM ch_local GROUP BY method +``` + +Please note, Proton will read all rows with selected columns from the ClickHouse and apply aggregation locally. Check [External Table](https://docs.timeplus.com/proton-clickhouse-external-table) documentation for details. 
\ No newline at end of file diff --git a/examples/clickhouse/docker-compose.yml b/examples/clickhouse/docker-compose.yml new file mode 100644 index 00000000000..76b75cce0a8 --- /dev/null +++ b/examples/clickhouse/docker-compose.yml @@ -0,0 +1,54 @@ +version: '3.7' +name: proton-ch-demo +volumes: + redpanda: null +services: + proton: + image: ghcr.io/timeplus-io/proton:latest + pull_policy: always + + clickhouse: + image: clickhouse/clickhouse-server:latest + ports: + - 9000:9000 + ulimits: + nofile: + soft: 262144 + hard: 262144 + + redpanda: + image: docker.redpanda.com/redpandadata/redpanda:v23.2.15 + command: + - redpanda start + - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 + - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092 + - --smp 1 + - --memory 1G + - --mode dev-container + volumes: + - redpanda:/var/lib/redpanda/data + + redpanda-console: + image: docker.redpanda.com/redpandadata/console:v2.3.5 + entrypoint: /bin/sh + command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console" + environment: + CONFIG_FILEPATH: /tmp/config.yml + CONSOLE_CONFIG_FILE: | + kafka: + brokers: ["redpanda:9092"] + ports: + - 8080:8080 + depends_on: + - redpanda + + owl-shop: + image: quay.io/cloudhut/owl-shop:latest + #platform: 'linux/amd64' + environment: + - SHOP_KAFKA_BROKERS=redpanda:9092 + - SHOP_KAFKA_TOPICREPLICATIONFACTOR=1 + - SHOP_TRAFFIC_INTERVAL_RATE=1 + - SHOP_TRAFFIC_INTERVAL_DURATION=0.1s + depends_on: + - redpanda diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 97bf7b74a79..44a21789ffe 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -247,7 +247,11 @@ else() message(FATAL "rdkafka is not enabled which is required") endif() +add_subdirectory(ClickHouse) +add_object_library(clickhouse_clickhouse ClickHouse) + add_subdirectory(Storages/ExternalStream) +add_subdirectory(Storages/ExternalTable) # proton: end set (DBMS_COMMON_LIBRARIES) @@ -264,6 +268,7 @@ endif() target_link_libraries 
(dbms PRIVATE klog) target_link_libraries (dbms PRIVATE external_stream) +target_link_libraries (dbms PRIVATE external_table) target_link_libraries (dbms PRIVATE checkpoint) set (all_modules dbms) diff --git a/src/ClickHouse/CMakeLists.txt b/src/ClickHouse/CMakeLists.txt new file mode 100644 index 00000000000..3ca0cf6c964 --- /dev/null +++ b/src/ClickHouse/CMakeLists.txt @@ -0,0 +1,3 @@ +# if (ENABLE_TESTS) +# add_subdirectory(tests) +# endif () diff --git a/src/ClickHouse/Client.cpp b/src/ClickHouse/Client.cpp new file mode 100644 index 00000000000..c04b5bd0237 --- /dev/null +++ b/src/ClickHouse/Client.cpp @@ -0,0 +1,273 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int DEADLOCK_AVOIDED; +extern const int TIMEOUT_EXCEEDED; +extern const int UNKNOWN_PACKET_FROM_SERVER; +extern const int UNEXPECTED_PACKET_FROM_SERVER; +} + +namespace ClickHouse +{ + +namespace +{ + +size_t calculatePollInterval(const ConnectionTimeouts & timeouts) +{ + const auto & receive_timeout = timeouts.receive_timeout; + constexpr size_t default_poll_interval = 1'000'000; /// in microseconds + constexpr size_t min_poll_interval = 5'000; /// in microseconds + return std::max(min_poll_interval, std::min(receive_timeout.totalMicroseconds(), default_poll_interval)); +} + +} + +Client::Client(DB::ConnectionPool::Entry connection_, ConnectionTimeouts timeouts_, Poco::Logger * logger_) + : connection(std::move(connection_)) + , timeouts(std::move(timeouts_)) + , poll_interval(calculatePollInterval(timeouts)) + , logger(logger_) +{ + connection->setCompatibleWithClickHouse(); +} + +void Client::reset() +{ + cancelled = false; + processed_rows = 0; + server_exception = nullptr; +} + +void Client::executeQuery(const String & query, const String & query_id, bool fail_quick) +{ + assert(!has_running_query); + has_running_query = true; + + reset(); + + bool suppress_error_log {false}; + while (true) + { + try + { + connection->sendQuery( + timeouts, + query, + {} 
/*query_parameters*/, + query_id, + QueryProcessingStage::Complete, + nullptr /*settings*/, + nullptr /*client_info*/, + false); + + break; + } + catch (const Exception & e) + { + if (fail_quick) + e.rethrow(); + + /// connection lost + if (!connection->checkConnected()) + { + if (!suppress_error_log) + LOG_ERROR(logger, "Connection lost"); + /// mark the connection as disconnected so that sendQuery will reconnect + connection->disconnect(); + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + /// Retry when the server said "Client should retry" and no rows have been received yet. + else if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED) + { + if (!suppress_error_log) + LOG_ERROR(logger, "Got a transient error from the server, will retry in 1 second"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + else + { + has_running_query = false; + e.rethrow(); + } + + /// Otherwise, it will keep generating the same error log again and again until the connection is back. + suppress_error_log = true; + } + } +} + +void Client::executeInsertQuery(const String & query, const String & query_id) +{ + executeQuery(query, query_id); + receiveEndOfQuery(); +} + +std::optional Client::pollData() +{ + if (!has_running_query) + return std::nullopt; + + while (true) + { + Stopwatch receive_watch(CLOCK_MONOTONIC_COARSE); + + while (true) + { + if (!cancelled) + { + double elapsed = receive_watch.elapsedSeconds(); + if (elapsed > timeouts.receive_timeout.totalSeconds()) + { + cancelQuery(); + + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded while receiving data from server. Waited for {} seconds, timeout is {} seconds", static_cast(elapsed), timeouts.receive_timeout.totalSeconds()); + + } + } + + /// Poll for changes after a cancellation check, otherwise the check is never reached + /// because of progress updates from the server. 
+ + if (connection->poll(poll_interval)) + break; + } + + if (!receiveAndProcessPacket()) + { + has_running_query = false; + return std::nullopt; + } + + return std::move(polled_data); + } +} + +void Client::cancelQuery() +{ + if (!has_running_query) + return; + + LOG_INFO(logger, "Query cancelled."); + connection->sendCancel(); + cancelled = true; + has_running_query = false; +} + +/// Receive a part of the result, or progress info or an exception and process it. +/// Returns true if one should continue receiving packets. +bool Client::receiveAndProcessPacket() +{ + assert(has_running_query); + + Packet packet = connection->receivePacket(); + + switch (packet.type) + { + case Protocol::Server::PartUUIDs: + return true; + + case Protocol::Server::Data: + processed_rows += packet.block.rows(); + polled_data = std::move(packet.block); + return true; + + case Protocol::Server::Progress: + // on_progress(packet.progress); + return true; + + case Protocol::Server::ProfileInfo: + // on_profile_info(packet.profile_info); + return true; + + case Protocol::Server::Totals: + // on_totals(packet.block); + return true; + + case Protocol::Server::Extremes: + // on_extremes(packet.block); + return true; + + case Protocol::Server::EndOfStream: + onEndOfStream(); + return true; + + case Protocol::Server::Exception: + onServerException(std::move(packet.exception)); + return false; + + case Protocol::Server::Log: + /// on_server_log(packet.block); + return true; + + case Protocol::Server::ProfileEvents: + /// on_profile_event(packet.block); + return true; + + default: + throw Exception( + ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from server {}", packet.type, connection->getDescription()); + } +} + +/// Process Log packets, exit when receive Exception or EndOfStream +bool Client::receiveEndOfQuery() +{ + while (true) + { + Packet packet = connection->receivePacket(); + + switch (packet.type) + { + case Protocol::Server::EndOfStream: + onEndOfStream(); + return true; 
+ + case Protocol::Server::Exception: + onServerException(std::move(packet.exception)); + return false; + + case Protocol::Server::Log: + /// onLogData(packet.block); + break; + + case Protocol::Server::Progress: + /// onProgress(packet.progress); + break; + + case Protocol::Server::ProfileEvents: + /// onProfileEvents(packet.block); + break; + + default: + throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER, + "Unexpected packet from server (expected Exception, EndOfStream, Log, Progress or ProfileEvents. Got {})", + String(Protocol::Server::toString(packet.type))); + } + } +} + +void Client::onEndOfStream() +{ + has_running_query = false; +} + +void Client::onServerException(std::unique_ptr && exception) +{ + server_exception.swap(exception); + has_running_query = false; +} + +void Client::throwServerExceptionIfAny() +{ + if (server_exception) + server_exception->rethrow(); +} + +} + +} diff --git a/src/ClickHouse/Client.h b/src/ClickHouse/Client.h new file mode 100644 index 00000000000..8cd99bb9c54 --- /dev/null +++ b/src/ClickHouse/Client.h @@ -0,0 +1,60 @@ +#pragma once + +#include + +namespace DB +{ + +namespace ClickHouse +{ + +/// This is a client that is compatible with the ClickHouse protocol and can be used to talk to ClickHouse servers. +/// Note: +/// * This client is designed to be used in the ClickHouse ExternalTable, so it's not 100% compatible with the ClickHouse protocol; it only needs to keep the ExternalTable functional. +/// * A client object should not be shared across multiple threads. +class Client final +{ +public: + + Client(DB::ConnectionPool::Entry connection_, ConnectionTimeouts timeouts_, Poco::Logger * logger_); + + /// Sends the query to the server to execute. For insert queries, use `executeInsertQuery` instead. + /// Make sure to keep calling the `pollData` method until it returns an empty optional; until then, the + /// client won't be able to execute another query. 
+ void executeQuery(const String & query, const String & query_id = "", bool fail_quick = false); + /// Sends an insert query to the server to execute. The difference between this and executeQuery is that, + /// after calling this method, there is no need to call the `pollData` method. + void executeInsertQuery(const String & query, const String & query_id = ""); + /// Cancels the currently running query; does nothing if no query is running. + void cancelQuery(); + /// Polls data for a query previously sent with `executeQuery`. When no more data is available, + /// the returned optional will be empty. + std::optional pollData(); + /// Throws the server exception received from the ClickHouse server, if any (during `pollData` or `executeInsertQuery`). + void throwServerExceptionIfAny(); + +private: + bool receiveAndProcessPacket(); + bool receiveEndOfQuery(); + + void reset(); + + void onEndOfStream(); + void onServerException(std::unique_ptr && exception); + + DB::ConnectionPool::Entry connection; + ConnectionTimeouts timeouts; + size_t poll_interval; + + bool has_running_query {false}; + bool cancelled {false}; + size_t processed_rows {0}; + Block polled_data; + std::unique_ptr server_exception; + + Poco::Logger * logger; +}; + +} + +} diff --git a/src/ClickHouse/Sink.cpp b/src/ClickHouse/Sink.cpp new file mode 100644 index 00000000000..087807bc106 --- /dev/null +++ b/src/ClickHouse/Sink.cpp @@ -0,0 +1,82 @@ +#include +#include +#include + +namespace DB +{ + +namespace ClickHouse +{ + +namespace +{ + +String constructInsertQuery(const String & database, const String & table, const Block & header) +{ + assert(header.columns()); + const auto & col_names = header.getNames(); + + auto query = "INSERT INTO " + (database.empty() ? 
"" : backQuoteIfNeed(database) + ".") + backQuoteIfNeed(table) + " (" + backQuoteIfNeed(col_names[0]); + for (const auto & name : std::vector(std::next(col_names.begin()), col_names.end())) + query.append(", " + backQuoteIfNeed(name)); + query.append(") VALUES "); + + return query; +} + +} + +Sink::Sink( + const String & database, + const String & table, + const Block & header, + std::unique_ptr client_, + ContextPtr context_, + Poco::Logger * logger_) + : SinkToStorage(header, ProcessorID::ExternalTableDataSinkID) + , insert_into(constructInsertQuery(database, table, header)) + , client(std::move(client_)) + , context(context_) + , logger(logger_) +{ + buf = std::make_unique(); + auto format_settings = getFormatSettings(context); + format_settings.values.no_commas_between_rows = true; + output_format = FormatFactory::instance().getOutputFormat("Values", *buf, header, context, {}, format_settings); + output_format->setAutoFlush(); + + LOG_INFO(logger, "ready to send data to ClickHouse table {} with {}", table, insert_into); +} + +namespace +{ + +class BufferResetter +{ +public: +explicit BufferResetter(WriteBufferFromOwnString & buf_): buf(buf_) {} +~BufferResetter() { buf.restart(); } + +private: + WriteBufferFromOwnString & buf; +}; + +} + +void Sink::consume(Chunk chunk) +{ + if (!chunk.rows()) + return; + + BufferResetter reset_buffer(*buf); /// makes sure buf gets reset afterwards + buf->write(insert_into.data(), insert_into.size()); + auto block = getHeader().cloneWithColumns(chunk.detachColumns()); + output_format->write(block); + + client->executeInsertQuery(buf->str()); + client->throwServerExceptionIfAny(); +} + +} + +} diff --git a/src/ClickHouse/Sink.h b/src/ClickHouse/Sink.h new file mode 100644 index 00000000000..e028617b59b --- /dev/null +++ b/src/ClickHouse/Sink.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +namespace ClickHouse +{ + +class Sink final : public SinkToStorage +{ +public: + Sink( + const String & 
database, + const String & table, + const Block & header, + std::unique_ptr client_, + ContextPtr context_, + Poco::Logger * logger_); + + String getName() const override { return "ClickHouseSink"; } + + void consume(Chunk chunk) override; + +private: + String insert_into; + + std::unique_ptr client; + + std::unique_ptr buf; + OutputFormatPtr output_format; + + ContextPtr context; + Poco::Logger * logger; +}; + +} + +} diff --git a/src/ClickHouse/Source.cpp b/src/ClickHouse/Source.cpp new file mode 100644 index 00000000000..9c80b22fb65 --- /dev/null +++ b/src/ClickHouse/Source.cpp @@ -0,0 +1,66 @@ +#include +#include + +namespace DB +{ + +namespace ClickHouse +{ + +namespace +{ +String constructSelectQuery(const String & database, const String & table, const Block & header) +{ + assert(header.columns()); + const auto & col_names = header.getNames(); + + auto query = "SELECT " + backQuoteIfNeed(col_names[0]); + for (const auto & name : std::vector(std::next(col_names.begin()), col_names.end())) + query.append(", " + backQuoteIfNeed(name)); + query.append(" FROM " + (database.empty() ? 
"" : backQuoteIfNeed(database) + ".") + table); + + return query; +} + +} + +Source::Source( + const String & database, + const String & table, + const Block & header, + std::unique_ptr client_, + ContextPtr context_) + : ISource(header, true, ProcessorID::ClickHouseSourceID) + , client(std::move(client_)) + , query(constructSelectQuery(database, table, header)) + , context(context_) +{ +} + +Chunk Source::generate() +{ + if (isCancelled()) + { + if (started) + client->cancelQuery(); + + return {}; + } + + if (!started) + { + started = true; + client->executeQuery(query); + } + + auto block = client->pollData(); + client->throwServerExceptionIfAny(); + if (!block) + return {}; + + return {block->getColumns(), block->rows()}; +} + +} + +} diff --git a/src/ClickHouse/Source.h b/src/ClickHouse/Source.h new file mode 100644 index 00000000000..8a4d320c4f9 --- /dev/null +++ b/src/ClickHouse/Source.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include + +namespace DB +{ + +namespace ClickHouse +{ + +class Source final : public ISource +{ +public: + Source( + const String & database, + const String & table, + const Block & header, + std::unique_ptr client_, + ContextPtr context_); + + String getName() const override { return "ClickHouseSource"; } + +protected: + Chunk generate() override; + +private: + bool started {false}; + + std::unique_ptr client; + String query; + + ContextPtr context; +}; + +} + +} diff --git a/src/Client/CMakeLists.txt b/src/Client/CMakeLists.txt index 119414a8a70..83bbe418246 100644 --- a/src/Client/CMakeLists.txt +++ b/src/Client/CMakeLists.txt @@ -1,3 +1,3 @@ if (ENABLE_EXAMPLES) add_subdirectory(examples) -endif() \ No newline at end of file +endif() diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index c7d5d441203..b2948c29509 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -985,6 +985,10 @@ void Connection::initBlockInput() } block_in = std::make_unique(*maybe_compressed_in, server_revision); + /// 
proton: starts + if (compatible_with_clickhouse) + block_in->setCompatibleWithClickHouse(); + /// proton: ends } } @@ -995,6 +999,10 @@ void Connection::initBlockLogsInput() { /// Have to return superset of SystemLogsQueue::getSampleBlock() columns block_logs_in = std::make_unique(*in, server_revision); + /// proton: starts + if (compatible_with_clickhouse) + block_logs_in->setCompatibleWithClickHouse(); + /// proton: ends } } @@ -1004,6 +1012,10 @@ void Connection::initBlockProfileEventsInput() if (!block_profile_events_in) { block_profile_events_in = std::make_unique(*in, server_revision); + /// proton: starts + if (compatible_with_clickhouse) + block_profile_events_in->setCompatibleWithClickHouse(); + /// proton: ends } } @@ -1085,4 +1097,17 @@ ServerConnectionPtr Connection::createConnection(const ConnectionParameters & pa parameters.security); } +/// proton: starts +void Connection::setCompatibleWithClickHouse() +{ + compatible_with_clickhouse = true; + if (block_in) + block_in->setCompatibleWithClickHouse(); + if (block_logs_in) + block_logs_in->setCompatibleWithClickHouse(); + if (block_profile_events_in) + block_profile_events_in->setCompatibleWithClickHouse(); +} +/// proton: ends + } diff --git a/src/Client/Connection.h b/src/Client/Connection.h index 243420b0593..40f1c004d40 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -155,6 +155,10 @@ class Connection : public IServerConnection if (in) in->setAsyncCallback(std::move(async_callback)); } + + /// proton: starts + void setCompatibleWithClickHouse(); + /// proton: ends private: String host; UInt16 port; @@ -273,6 +277,10 @@ class Connection : public IServerConnection void initBlockProfileEventsInput(); [[noreturn]] void throwUnexpectedPacket(UInt64 packet_type, const char * expected) const; + + /// proton: starts + bool compatible_with_clickhouse {false}; + /// proton: ends }; class AsyncCallbackSetter diff --git a/src/Common/IFactoryWithAliases.h b/src/Common/IFactoryWithAliases.h 
index 35bb9277f80..cdb9100a6f5 100644 --- a/src/Common/IFactoryWithAliases.h +++ b/src/Common/IFactoryWithAliases.h @@ -35,6 +35,16 @@ class IFactoryWithAliases : public IHints<2, IFactoryWithAliases> return name; } + /// proton: starts + String getClickHouseAliasToOrName(const String & name) const + { + if (clickhouse_names.contains(name)) + return clickhouse_names.at(name); + else + return name; + } + /// proton: ends + std::unordered_map case_insensitive_name_mapping; public: @@ -81,6 +91,32 @@ class IFactoryWithAliases : public IHints<2, IFactoryWithAliases> throw Exception(factory_name + ": alias name '" + alias_name + "' is not unique", ErrorCodes::LOGICAL_ERROR); } + /// proton: starts + /// Register the name used by ClickHouse for value + /// real_name have to be already registered. + void registerClickHouseAlias(const String & alias_name, const String & alias_or_real_name) + { + const auto & creator_map = getMap(); + const auto & case_insensitive_creator_map = getCaseInsensitiveMap(); + const String factory_name = getFactoryName(); + + String real_dict_name; + String real_name = alias_or_real_name; + if (auto it = aliases.find(real_name); it != aliases.end()) + real_name = it->second; + if (creator_map.count(real_name)) + real_dict_name = real_name; + else if (auto real_name_lowercase = Poco::toLower(real_name); case_insensitive_creator_map.count(real_name_lowercase)) + real_dict_name = real_name_lowercase; + else + throw Exception(factory_name + ": can't create ClickHouse alias '" + alias_name + "', the real name '" + alias_or_real_name + "' is not registered", + ErrorCodes::LOGICAL_ERROR); + + if (!clickhouse_names.emplace(alias_name, real_dict_name).second) + throw Exception(factory_name + ": ClickHouse alias name '" + alias_name + "' is not unique", ErrorCodes::LOGICAL_ERROR); + } + /// proton: ends + std::vector getAllRegisteredNames() const override { std::vector result; @@ -144,6 +180,11 @@ class IFactoryWithAliases : public IHints<2, 
IFactoryWithAliases> /// Case insensitive aliases AliasMap case_insensitive_aliases; + + /// proton: starts + /// ClickHouse names map to data_types from previous two maps + AliasMap clickhouse_names; + /// proton: ends }; } diff --git a/src/DataTypes/DataTypeAggregateFunction.cpp b/src/DataTypes/DataTypeAggregateFunction.cpp index c8a09dff73b..1c2a676d589 100644 --- a/src/DataTypes/DataTypeAggregateFunction.cpp +++ b/src/DataTypes/DataTypeAggregateFunction.cpp @@ -159,7 +159,7 @@ SerializationPtr DataTypeAggregateFunction::doGetDefaultSerialization() const } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { String function_name; AggregateFunctionPtr function; @@ -228,7 +228,7 @@ static DataTypePtr create(const ASTPtr & arguments) ErrorCodes::BAD_ARGUMENTS); for (size_t i = argument_types_start_idx; i < arguments->children.size(); ++i) - argument_types.push_back(DataTypeFactory::instance().get(arguments->children[i])); + argument_types.push_back(DataTypeFactory::instance().get(arguments->children[i]/* proton: starts */, compatible_with_clickhouse/* proton: ends */)); if (function_name.empty()) throw Exception("Logical error: empty name of aggregate function passed", ErrorCodes::LOGICAL_ERROR); @@ -259,6 +259,10 @@ void setVersionToAggregateFunctions(DataTypePtr & type, bool if_empty, std::opti void registerDataTypeAggregateFunction(DataTypeFactory & factory) { factory.registerDataType("aggregate_function", create); + + /// proton: starts + factory.registerClickHouseAlias("AggregateFunction", "aggregate_function"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeArray.cpp b/src/DataTypes/DataTypeArray.cpp index 3fc290f8732..9afdb169e74 100644 --- a/src/DataTypes/DataTypeArray.cpp +++ b/src/DataTypes/DataTypeArray.cpp @@ -59,18 +59,22 @@ size_t DataTypeArray::getNumberOfDimensions() const } -static DataTypePtr create(const ASTPtr & 
arguments) +static DataTypePtr create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.size() != 1) throw Exception("array data type family must have exactly one argument - type of elements", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - return std::make_shared(DataTypeFactory::instance().get(arguments->children[0])); + return std::make_shared(DataTypeFactory::instance().get(arguments->children[0]/* proton: starts */, compatible_with_clickhouse/* proton: ends */)); } void registerDataTypeArray(DataTypeFactory & factory) { factory.registerDataType("array", create); + + /// proton: starts + factory.registerClickHouseAlias("Array", "array"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeCustomGeo.cpp b/src/DataTypes/DataTypeCustomGeo.cpp index 23c4086267e..98059d7c76f 100644 --- a/src/DataTypes/DataTypeCustomGeo.cpp +++ b/src/DataTypes/DataTypeCustomGeo.cpp @@ -38,6 +38,13 @@ void registerDataTypeDomainGeo(DataTypeFactory & factory) return std::make_pair(DataTypeFactory::instance().get("array(polygon)"), std::make_unique(std::make_unique())); }); + + /// proton: starts + factory.registerClickHouseAlias("Point", "point"); + factory.registerClickHouseAlias("Ring", "ring"); + factory.registerClickHouseAlias("Polygon", "polygon"); + factory.registerClickHouseAlias("MultiPolygon", "multi_polygon"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeCustomSimpleAggregateFunction.cpp b/src/DataTypes/DataTypeCustomSimpleAggregateFunction.cpp index 7cc4866eb38..90cb39ca218 100644 --- a/src/DataTypes/DataTypeCustomSimpleAggregateFunction.cpp +++ b/src/DataTypes/DataTypeCustomSimpleAggregateFunction.cpp @@ -68,7 +68,7 @@ String DataTypeCustomSimpleAggregateFunction::getName() const } -static std::pair create(const ASTPtr & arguments) +static std::pair create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { String function_name; 
AggregateFunctionPtr function; @@ -119,7 +119,7 @@ static std::pair create(const ASTPtr & argum ErrorCodes::BAD_ARGUMENTS); for (size_t i = 1; i < arguments->children.size(); ++i) - argument_types.push_back(DataTypeFactory::instance().get(arguments->children[i])); + argument_types.push_back(DataTypeFactory::instance().get(arguments->children[i]/* proton: starts */, compatible_with_clickhouse/* proton: ends */)); if (function_name.empty()) throw Exception("Logical error: empty name of aggregate function passed", ErrorCodes::LOGICAL_ERROR); @@ -129,7 +129,7 @@ static std::pair create(const ASTPtr & argum DataTypeCustomSimpleAggregateFunction::checkSupportedFunctions(function); - DataTypePtr storage_type = DataTypeFactory::instance().get(argument_types[0]->getName()); + DataTypePtr storage_type = DataTypeFactory::instance().get(argument_types[0]->getName()/* proton: starts */, compatible_with_clickhouse/* proton: ends */); if (!function->getReturnType()->equals(*removeLowCardinality(storage_type))) { @@ -145,6 +145,10 @@ static std::pair create(const ASTPtr & argum void registerDataTypeDomainSimpleAggregateFunction(DataTypeFactory & factory) { factory.registerDataTypeCustom("simple_aggregate_function", create); + + /// proton: starts + factory.registerClickHouseAlias("SimpleAggregateFunction", "simple_aggregate_function"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeDate.cpp b/src/DataTypes/DataTypeDate.cpp index d8daf8b97ef..08befbaaf49 100644 --- a/src/DataTypes/DataTypeDate.cpp +++ b/src/DataTypes/DataTypeDate.cpp @@ -18,6 +18,10 @@ SerializationPtr DataTypeDate::doGetDefaultSerialization() const void registerDataTypeDate(DataTypeFactory & factory) { factory.registerSimpleDataType("date", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive); + + /// proton: starts + factory.registerClickHouseAlias("Date", "date"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeDate32.cpp b/src/DataTypes/DataTypeDate32.cpp index 
02945162d24..174e7e533df 100644 --- a/src/DataTypes/DataTypeDate32.cpp +++ b/src/DataTypes/DataTypeDate32.cpp @@ -18,6 +18,10 @@ void registerDataTypeDate32(DataTypeFactory & factory) { factory.registerSimpleDataType( "date32", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive); + + /// proton: starts + factory.registerClickHouseAlias("Date32", "date32"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeDomainBool.cpp b/src/DataTypes/DataTypeDomainBool.cpp index db8e7bd1066..8662a07bba5 100644 --- a/src/DataTypes/DataTypeDomainBool.cpp +++ b/src/DataTypes/DataTypeDomainBool.cpp @@ -15,6 +15,10 @@ void registerDataTypeDomainBool(DataTypeFactory & factory) }); factory.registerAlias("boolean", "bool"); + + /// proton: starts + factory.registerClickHouseAlias("Bool", "bool"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeEnum.cpp b/src/DataTypes/DataTypeEnum.cpp index 07b9cf88c46..dc33957c854 100644 --- a/src/DataTypes/DataTypeEnum.cpp +++ b/src/DataTypes/DataTypeEnum.cpp @@ -239,7 +239,7 @@ static void autoAssignNumberForEnum(const ASTPtr & arguments) } template -static DataTypePtr createExact(const ASTPtr & arguments) +static DataTypePtr createExact(const ASTPtr & arguments, [[maybe_unused]] bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.empty()) throw Exception("Data type enum cannot be empty", ErrorCodes::EMPTY_DATA_PASSED); @@ -279,7 +279,7 @@ static DataTypePtr createExact(const ASTPtr & arguments) return std::make_shared(values); } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.empty()) throw Exception("Data type enum cannot be empty", ErrorCodes::EMPTY_DATA_PASSED); @@ -301,10 +301,10 @@ static DataTypePtr create(const ASTPtr & arguments) Int64 value = value_literal->value.get(); if (value > 
std::numeric_limits::max() || value < std::numeric_limits::min()) - return createExact(arguments); + return createExact(arguments/* proton: starts */, compatible_with_clickhouse/* proton: ends */); } - return createExact(arguments); + return createExact(arguments/* proton: starts */, compatible_with_clickhouse/* proton: ends */); } void registerDataTypeEnum(DataTypeFactory & factory) @@ -315,6 +315,12 @@ void registerDataTypeEnum(DataTypeFactory & factory) /// MySQL /// factory.registerAlias("ENUM", "enum", DataTypeFactory::CaseInsensitive); + + /// proton: starts + factory.registerClickHouseAlias("Enum8", "enum8"); + factory.registerClickHouseAlias("Enum16", "enum16"); + factory.registerClickHouseAlias("Enum", "enum"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeFactory.cpp b/src/DataTypes/DataTypeFactory.cpp index afa5bd26f2d..6fd3161350b 100644 --- a/src/DataTypes/DataTypeFactory.cpp +++ b/src/DataTypes/DataTypeFactory.cpp @@ -34,7 +34,7 @@ DataTypePtr DataTypeFactory::get(TypeIndex type) const } /// proton: ends. -DataTypePtr DataTypeFactory::get(const String & full_name) const +DataTypePtr DataTypeFactory::get(const String & full_name/* proton: starts*/, bool compatible_with_clickhouse/* proton: ends*/) const { /// Data type parser can be invoked from coroutines with small stack. 
/// Value 315 is known to cause stack overflow in some test configurations (debug build, sanitizers) @@ -49,21 +49,21 @@ DataTypePtr DataTypeFactory::get(const String & full_name) const ParserDataType parser; ASTPtr ast = parseQuery(parser, full_name.data(), full_name.data() + full_name.size(), "data type", 0, data_type_max_parse_depth); - return get(ast); + return get(ast, compatible_with_clickhouse); } -DataTypePtr DataTypeFactory::get(const ASTPtr & ast) const +DataTypePtr DataTypeFactory::get(const ASTPtr & ast/* proton: starts */, bool compatible_with_clickhouse/* proton: ends*/) const { if (const auto * func = ast->as()) { if (func->parameters) throw Exception("Data type cannot have multiple parenthesized parameters.", ErrorCodes::ILLEGAL_SYNTAX_FOR_DATA_TYPE); - return get(func->name, func->arguments); + return get(func->name, func->arguments, compatible_with_clickhouse); } if (const auto * ident = ast->as()) { - return get(ident->name(), {}); + return get(ident->name(), {}, compatible_with_clickhouse); } if (const auto * lit = ast->as()) @@ -75,11 +75,15 @@ DataTypePtr DataTypeFactory::get(const ASTPtr & ast) const throw Exception("Unexpected AST element for data type.", ErrorCodes::UNEXPECTED_AST_STRUCTURE); } -DataTypePtr DataTypeFactory::get(const String & family_name_param, const ASTPtr & parameters) const +DataTypePtr DataTypeFactory::get(const String & family_name_param, const ASTPtr & parameters/* proton: starts */, bool compatible_with_clickhouse/* proton: ends */) const { - String family_name = getAliasToOrName(family_name_param); + String family_name; + if (compatible_with_clickhouse) + family_name = getAliasToOrName(getClickHouseAliasToOrName(family_name_param)); + else + family_name = getAliasToOrName(family_name_param); - if (endsWith(family_name, "_with_dictionary")) + if (endsWith(family_name, "_with_dictionary")/* proton: starts */ || (compatible_with_clickhouse && endsWith(family_name, "WithDictionary"))/* proton: ends */) { ASTPtr 
low_cardinality_params = std::make_shared(); String param_name = family_name.substr(0, family_name.size() - strlen("_with_dictionary")); @@ -96,7 +100,7 @@ DataTypePtr DataTypeFactory::get(const String & family_name_param, const ASTPtr return get("low_cardinality", low_cardinality_params); } - return findCreatorByName(family_name)(parameters); + return findCreatorByName(family_name)(parameters, compatible_with_clickhouse); } DataTypePtr DataTypeFactory::getCustom(DataTypeCustomDescPtr customization) const @@ -138,7 +142,7 @@ void DataTypeFactory::registerSimpleDataType(const String & name, SimpleCreator throw Exception("DataTypeFactory: the data type " + name + " has been provided " " a null constructor", ErrorCodes::LOGICAL_ERROR); - registerDataType(name, [name, creator](const ASTPtr & ast) + registerDataType(name, [name, creator](const ASTPtr & ast/* proton: starts */, bool compatible_with_clickhouse [[maybe_unused]]/* proton: ends */) { if (ast) throw Exception("Data type " + name + " cannot have arguments", ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS); @@ -148,9 +152,9 @@ void DataTypeFactory::registerSimpleDataType(const String & name, SimpleCreator void DataTypeFactory::registerDataTypeCustom(const String & family_name, CreatorWithCustom creator, CaseSensitiveness case_sensitiveness) { - registerDataType(family_name, [creator](const ASTPtr & ast) + registerDataType(family_name, [creator](const ASTPtr & ast/* proton: starts */, bool compatible_with_clickhouse [[maybe_unused]]/* proton: ends */) { - auto res = creator(ast); + auto res = creator(ast, compatible_with_clickhouse); res.first->setCustomization(std::move(res.second)); return res.first; @@ -159,7 +163,7 @@ void DataTypeFactory::registerDataTypeCustom(const String & family_name, Creator void DataTypeFactory::registerSimpleDataTypeCustom(const String &name, SimpleCreatorWithCustom creator, CaseSensitiveness case_sensitiveness) { - registerDataTypeCustom(name, [creator](const ASTPtr & /*ast*/) + 
registerDataTypeCustom(name, [creator](const ASTPtr & /*ast*//* proton: starts */, bool compatible_with_clickhouse [[maybe_unused]]/* proton: ends */) { return creator(); }, case_sensitiveness); diff --git a/src/DataTypes/DataTypeFactory.h b/src/DataTypes/DataTypeFactory.h index 6301548548d..56c82972dc9 100644 --- a/src/DataTypes/DataTypeFactory.h +++ b/src/DataTypes/DataTypeFactory.h @@ -20,12 +20,12 @@ using DataTypePtr = std::shared_ptr; /** Creates a data type by name of data type family and parameters. */ -class DataTypeFactory final : private boost::noncopyable, public IFactoryWithAliases> +class DataTypeFactory final : private boost::noncopyable, public IFactoryWithAliases> { private: using SimpleCreator = std::function; using DataTypesDictionary = std::unordered_map; - using CreatorWithCustom = std::function(const ASTPtr & parameters)>; + using CreatorWithCustom = std::function(const ASTPtr & parameters, bool compatible_with_clickhouse [[maybe_unused]])>; using SimpleCreatorWithCustom = std::function()>; public: @@ -35,9 +35,9 @@ class DataTypeFactory final : private boost::noncopyable, public IFactoryWithAli DataTypePtr get(TypeIndex type) const; /// proton: ends. - DataTypePtr get(const String & full_name) const; - DataTypePtr get(const String & family_name, const ASTPtr & parameters) const; - DataTypePtr get(const ASTPtr & ast) const; + DataTypePtr get(const String & full_name, bool compatible_with_clickhouse = false) const; /// proton: updated + DataTypePtr get(const String & family_name, const ASTPtr & parameters, bool compatible_with_clickhouse = false) const; /// proton: updated + DataTypePtr get(const ASTPtr & ast, bool compatible_with_clickhouse = false) const; /// proton: updated DataTypePtr getCustom(DataTypeCustomDescPtr customization) const; /// Register a type family by its name. 
diff --git a/src/DataTypes/DataTypeFixedString.cpp b/src/DataTypes/DataTypeFixedString.cpp index 6b87b530261..635621c9df0 100644 --- a/src/DataTypes/DataTypeFixedString.cpp +++ b/src/DataTypes/DataTypeFixedString.cpp @@ -44,7 +44,7 @@ SerializationPtr DataTypeFixedString::doGetDefaultSerialization() const } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, [[maybe_unused]] bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.size() != 1) throw Exception("The fixed_string data type family must have exactly one argument - size in bytes", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); @@ -60,6 +60,10 @@ static DataTypePtr create(const ASTPtr & arguments) void registerDataTypeFixedString(DataTypeFactory & factory) { factory.registerDataType("fixed_string", create); + + /// proton: starts + factory.registerClickHouseAlias("FixedString", "fixed_string"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeIPv4andIPv6.cpp b/src/DataTypes/DataTypeIPv4andIPv6.cpp index 6d091182cd7..c73afc245f1 100644 --- a/src/DataTypes/DataTypeIPv4andIPv6.cpp +++ b/src/DataTypes/DataTypeIPv4andIPv6.cpp @@ -12,6 +12,13 @@ void registerDataTypeIPv4andIPv6(DataTypeFactory & factory) factory.registerAlias("inet4", "ipv4", DataTypeFactory::CaseInsensitive); factory.registerSimpleDataType("ipv6", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive); factory.registerAlias("inet6", "ipv6", DataTypeFactory::CaseInsensitive); + + /// proton: starts + factory.registerClickHouseAlias("IPv4", "ipv4"); + factory.registerClickHouseAlias("INET4", "inet4"); + factory.registerClickHouseAlias("IPv6", "ipv6"); + factory.registerClickHouseAlias("INET6", "inet6"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeInterval.cpp b/src/DataTypes/DataTypeInterval.cpp index f82d5066fb8..1a1717722ab 100644 --- a/src/DataTypes/DataTypeInterval.cpp +++ 
b/src/DataTypes/DataTypeInterval.cpp @@ -24,6 +24,20 @@ void registerDataTypeInterval(DataTypeFactory & factory) factory.registerSimpleDataType("interval_month", [] { return DataTypePtr(std::make_shared(IntervalKind::Month)); }); factory.registerSimpleDataType("interval_quarter", [] { return DataTypePtr(std::make_shared(IntervalKind::Quarter)); }); factory.registerSimpleDataType("interval_year", [] { return DataTypePtr(std::make_shared(IntervalKind::Year)); }); + + /// proton: starts + factory.registerClickHouseAlias("IntervalNanosecond", "interval_nanosecond"); + factory.registerClickHouseAlias("IntervalMicrosecond", "interval_microsecond"); + factory.registerClickHouseAlias("IntervalMillisecond", "interval_millisecond"); + factory.registerClickHouseAlias("IntervalSecond", "interval_second"); + factory.registerClickHouseAlias("IntervalMinute", "interval_minute"); + factory.registerClickHouseAlias("IntervalHour", "interval_hour"); + factory.registerClickHouseAlias("IntervalDay", "interval_day"); + factory.registerClickHouseAlias("IntervalWeek", "interval_week"); + factory.registerClickHouseAlias("IntervalMonth", "interval_month"); + factory.registerClickHouseAlias("IntervalQuarter", "interval_quarter"); + factory.registerClickHouseAlias("IntervalYear", "interval_year"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeLowCardinality.cpp b/src/DataTypes/DataTypeLowCardinality.cpp index bae6c1f70cc..95323b30331 100644 --- a/src/DataTypes/DataTypeLowCardinality.cpp +++ b/src/DataTypes/DataTypeLowCardinality.cpp @@ -150,18 +150,22 @@ SerializationPtr DataTypeLowCardinality::doGetDefaultSerialization() const } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.size() != 1) throw Exception("The low_cardinality data type family must have single argument - type of elements", 
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - return std::make_shared(DataTypeFactory::instance().get(arguments->children[0])); + return std::make_shared(DataTypeFactory::instance().get(arguments->children[0]/* proton: starts */, compatible_with_clickhouse/* proton: ends */)); } void registerDataTypeLowCardinality(DataTypeFactory & factory) { factory.registerDataType("low_cardinality", create); + + /// proton: starts + factory.registerClickHouseAlias("LowCardinality", "low_cardinality"); + /// proton: ends } diff --git a/src/DataTypes/DataTypeMap.cpp b/src/DataTypes/DataTypeMap.cpp index 6bca147022d..6face18f595 100644 --- a/src/DataTypes/DataTypeMap.cpp +++ b/src/DataTypes/DataTypeMap.cpp @@ -127,7 +127,7 @@ bool DataTypeMap::checkKeyType(DataTypePtr key_type) return true; } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.size() != 2) throw Exception("The map data type family must have two arguments: key and value types", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); @@ -136,7 +136,7 @@ static DataTypePtr create(const ASTPtr & arguments) nested_types.reserve(arguments->children.size()); for (const ASTPtr & child : arguments->children) - nested_types.emplace_back(DataTypeFactory::instance().get(child)); + nested_types.emplace_back(DataTypeFactory::instance().get(child/* proton: starts */, compatible_with_clickhouse/* proton: ends */)); return std::make_shared(nested_types); } @@ -145,5 +145,9 @@ static DataTypePtr create(const ASTPtr & arguments) void registerDataTypeMap(DataTypeFactory & factory) { factory.registerDataType("map", create); + + /// proton: starts + factory.registerClickHouseAlias("Map", "map"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeNested.cpp b/src/DataTypes/DataTypeNested.cpp index f0ef3c638d1..0d69396ac26 100644 --- a/src/DataTypes/DataTypeNested.cpp +++ 
b/src/DataTypes/DataTypeNested.cpp @@ -32,7 +32,7 @@ String DataTypeNestedCustomName::getName() const return s.str(); } -static std::pair create(const ASTPtr & arguments) +static std::pair create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.empty()) throw Exception("The nested cannot be empty", ErrorCodes::EMPTY_DATA_PASSED); @@ -48,7 +48,7 @@ static std::pair create(const ASTPtr & argum if (!name_type) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Data type Nested accepts only pairs with name and type"); - auto nested_type = DataTypeFactory::instance().get(name_type->type); + auto nested_type = DataTypeFactory::instance().get(name_type->type/* proton: starts */, compatible_with_clickhouse/* proton: ends */); nested_types.push_back(std::move(nested_type)); nested_names.push_back(name_type->name); } @@ -61,7 +61,11 @@ static std::pair create(const ASTPtr & argum void registerDataTypeNested(DataTypeFactory & factory) { - return factory.registerDataTypeCustom("nested", create); + factory.registerDataTypeCustom("nested", create); + + /// proton: starts + factory.registerClickHouseAlias("Nested", "nested"); + /// proton: ends } DataTypePtr createNested(const DataTypes & types, const Names & names) diff --git a/src/DataTypes/DataTypeNothing.cpp b/src/DataTypes/DataTypeNothing.cpp index 09019b2c83b..180f55846c5 100644 --- a/src/DataTypes/DataTypeNothing.cpp +++ b/src/DataTypes/DataTypeNothing.cpp @@ -26,6 +26,10 @@ SerializationPtr DataTypeNothing::doGetDefaultSerialization() const void registerDataTypeNothing(DataTypeFactory & factory) { factory.registerSimpleDataType("nothing", [] { return DataTypePtr(std::make_shared()); }); + + /// proton: starts + factory.registerClickHouseAlias("Nothing", "nothing"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeNullable.cpp b/src/DataTypes/DataTypeNullable.cpp index d2e192e2b7f..574900d76ca 100644 --- a/src/DataTypes/DataTypeNullable.cpp +++ 
b/src/DataTypes/DataTypeNullable.cpp @@ -61,12 +61,12 @@ SerializationPtr DataTypeNullable::doGetDefaultSerialization() const } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.size() != 1) throw Exception("Nullable data type family must have exactly one argument - nested type", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - DataTypePtr nested_type = DataTypeFactory::instance().get(arguments->children[0]); + DataTypePtr nested_type = DataTypeFactory::instance().get(arguments->children[0]/* proton: starts */, compatible_with_clickhouse/* proton: ends */); return std::make_shared(nested_type); } @@ -75,6 +75,10 @@ static DataTypePtr create(const ASTPtr & arguments) void registerDataTypeNullable(DataTypeFactory & factory) { factory.registerDataType("nullable", create); + + /// proton: starts + factory.registerClickHouseAlias("Nullable", "nullable"); + /// proton: ends } diff --git a/src/DataTypes/DataTypeObject.cpp b/src/DataTypes/DataTypeObject.cpp index 30b5864c1af..9d43a911fe2 100644 --- a/src/DataTypes/DataTypeObject.cpp +++ b/src/DataTypes/DataTypeObject.cpp @@ -67,5 +67,9 @@ void registerDataTypeObject(DataTypeFactory & factory) factory.registerSimpleDataType( "json", [] { return std::make_shared("json", false); }, DataTypeFactory::CaseInsensitive); /// factory.registerSimpleDataType("nullable_json", [] { return std::make_shared("json", true); }, DataTypeFactory::CaseInsensitive); + + /// proton: starts + factory.registerClickHouseAlias("JSON", "json"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeString.cpp b/src/DataTypes/DataTypeString.cpp index caea32299e5..29283806167 100644 --- a/src/DataTypes/DataTypeString.cpp +++ b/src/DataTypes/DataTypeString.cpp @@ -39,7 +39,7 @@ SerializationPtr DataTypeString::doGetDefaultSerialization() const return std::make_shared(); } -static DataTypePtr 
create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, [[maybe_unused]] bool compatible_with_clickhouse = false) /// proton: updated { if (arguments && !arguments->children.empty()) { @@ -94,5 +94,8 @@ void registerDataTypeString(DataTypeFactory & factory) /// factory.registerAlias("VARBINARY", "string", DataTypeFactory::CaseInsensitive); /// factory.registerAlias("GEOMETRY", "string", DataTypeFactory::CaseInsensitive); //mysql + /// proton: starts + factory.registerClickHouseAlias("String", "string"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeTuple.cpp b/src/DataTypes/DataTypeTuple.cpp index 5bcb7a3edba..c063b73f418 100644 --- a/src/DataTypes/DataTypeTuple.cpp +++ b/src/DataTypes/DataTypeTuple.cpp @@ -339,7 +339,7 @@ SerializationInfoPtr DataTypeTuple::getSerializationInfo(const IColumn & column) } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.empty()) throw Exception("The tuple cannot be empty", ErrorCodes::EMPTY_DATA_PASSED); @@ -354,11 +354,11 @@ static DataTypePtr create(const ASTPtr & arguments) { if (const auto * name_and_type_pair = child->as()) { - nested_types.emplace_back(DataTypeFactory::instance().get(name_and_type_pair->type)); + nested_types.emplace_back(DataTypeFactory::instance().get(name_and_type_pair->type/* proton: starts */, compatible_with_clickhouse/* proton: ends */)); names.emplace_back(name_and_type_pair->name); } else - nested_types.emplace_back(DataTypeFactory::instance().get(child)); + nested_types.emplace_back(DataTypeFactory::instance().get(child/* proton: starts */, compatible_with_clickhouse/* proton: ends */)); } if (names.empty()) @@ -373,6 +373,10 @@ static DataTypePtr create(const ASTPtr & arguments) void registerDataTypeTuple(DataTypeFactory & factory) { factory.registerDataType("tuple", create); + + /// proton: starts + 
factory.registerClickHouseAlias("Tuple", "tuple"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypeUUID.cpp b/src/DataTypes/DataTypeUUID.cpp index e4044b3afd5..d87783bb01c 100644 --- a/src/DataTypes/DataTypeUUID.cpp +++ b/src/DataTypes/DataTypeUUID.cpp @@ -29,6 +29,10 @@ MutableColumnPtr DataTypeUUID::createColumn() const void registerDataTypeUUID(DataTypeFactory & factory) { factory.registerSimpleDataType("uuid", [] { return DataTypePtr(std::make_shared()); }); + + /// proton: starts + factory.registerClickHouseAlias("UUID", "uuid"); + /// proton: ends } } diff --git a/src/DataTypes/DataTypesDecimal.cpp b/src/DataTypes/DataTypesDecimal.cpp index 1f7313fa946..a65a51f5403 100644 --- a/src/DataTypes/DataTypesDecimal.cpp +++ b/src/DataTypes/DataTypesDecimal.cpp @@ -65,7 +65,7 @@ SerializationPtr DataTypeDecimal::doGetDefaultSerialization() const } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments, [[maybe_unused]] bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.size() != 2) throw Exception("The decimal data type family must have exactly two arguments: precision and scale", @@ -85,7 +85,7 @@ static DataTypePtr create(const ASTPtr & arguments) } template -static DataTypePtr createExact(const ASTPtr & arguments) +static DataTypePtr createExact(const ASTPtr & arguments, [[maybe_unused]] bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.size() != 1) throw Exception("The decimal data type family must have exactly two arguments: precision and scale", @@ -113,6 +113,15 @@ void registerDataTypeDecimal(DataTypeFactory & factory) /// factory.registerAlias("DEC", "decimal", DataTypeFactory::CaseInsensitive); /// factory.registerAlias("NUMERIC", "decimal", DataTypeFactory::CaseInsensitive); /// factory.registerAlias("FIXED", "decimal", DataTypeFactory::CaseInsensitive); + + /// proton: starts + 
factory.registerClickHouseAlias("Decimal32", "decimal32"); + factory.registerClickHouseAlias("Decimal64", "decimal64"); + factory.registerClickHouseAlias("Decimal128", "decimal128"); + factory.registerClickHouseAlias("Decimal256", "decimal256"); + + factory.registerClickHouseAlias("Decimal", "decimal"); + /// proton: ends } /// Explicit template instantiations. diff --git a/src/DataTypes/DataTypesNumber.cpp b/src/DataTypes/DataTypesNumber.cpp index 96eaadc9efd..2f5263bd64d 100644 --- a/src/DataTypes/DataTypesNumber.cpp +++ b/src/DataTypes/DataTypesNumber.cpp @@ -14,7 +14,7 @@ namespace ErrorCodes } template -static DataTypePtr createNumericDataType(const ASTPtr & arguments) +static DataTypePtr createNumericDataType(const ASTPtr & arguments/* proton: starts */, [[maybe_unused]] bool compatible_with_clickhouse = false/* proton: ends */) { if (arguments) { @@ -32,7 +32,6 @@ static DataTypePtr createNumericDataType(const ASTPtr & arguments) return std::make_shared>(); } - void registerDataTypeNumbers(DataTypeFactory & factory) { factory.registerDataType("uint8", createNumericDataType); @@ -91,6 +90,36 @@ void registerDataTypeNumbers(DataTypeFactory & factory) /// factory.registerAlias("SET", "uint64", DataTypeFactory::CaseInsensitive); /// MySQL /// factory.registerAlias("YEAR", "uint16", DataTypeFactory::CaseInsensitive); /// factory.registerAlias("TIME", "int64", DataTypeFactory::CaseInsensitive); + + /// proton: starts + factory.registerClickHouseAlias("UInt8", "uint8"); + factory.registerClickHouseAlias("UInt16", "uint16"); + factory.registerClickHouseAlias("UInt32", "uint32"); + factory.registerClickHouseAlias("UInt64", "uint64"); + + factory.registerClickHouseAlias("Int8", "int8"); + factory.registerClickHouseAlias("Int16", "int16"); + factory.registerClickHouseAlias("Int32", "int32"); + factory.registerClickHouseAlias("Int64", "int64"); + + factory.registerClickHouseAlias("Float32", "float32"); + factory.registerClickHouseAlias("Float64", "float64"); + + 
factory.registerClickHouseAlias("UInt128", "uint128"); + factory.registerClickHouseAlias("UInt256", "uint256"); + + factory.registerClickHouseAlias("Int128", "int128"); + factory.registerClickHouseAlias("Int256", "int256"); + + factory.registerClickHouseAlias("BYTE", "byte"); + factory.registerClickHouseAlias("SMALLINT", "smallint"); + factory.registerClickHouseAlias("INT", "int"); + factory.registerClickHouseAlias("UINT", "uint"); + factory.registerClickHouseAlias("INTEGER", "integer"); + factory.registerClickHouseAlias("BIGINT", "bigint"); + factory.registerClickHouseAlias("FLOAT", "float"); + factory.registerClickHouseAlias("DOUBLE", "double"); + /// proton: ends } } diff --git a/src/DataTypes/registerDataTypeDateTime.cpp b/src/DataTypes/registerDataTypeDateTime.cpp index bdc936daf85..d1b4dbb6a4d 100644 --- a/src/DataTypes/registerDataTypeDateTime.cpp +++ b/src/DataTypes/registerDataTypeDateTime.cpp @@ -58,7 +58,7 @@ getArgument(const ASTPtr & arguments, size_t argument_index, const char * argume return argument->value.get(); } -static DataTypePtr create(const ASTPtr & arguments) +static DataTypePtr create(const ASTPtr & arguments/* proton: starts */, [[maybe_unused]] bool compatible_with_clickhouse/* proton: ends */) { if (!arguments || arguments->children.empty()) return std::make_shared(); @@ -77,7 +77,7 @@ static DataTypePtr create(const ASTPtr & arguments) return std::make_shared(timezone.value_or(String{})); } -static DataTypePtr create32(const ASTPtr & arguments) +static DataTypePtr create32(const ASTPtr & arguments, [[maybe_unused]] bool compatible_with_clickhouse = false) /// proton: updated { if (!arguments || arguments->children.empty()) return std::make_shared(); @@ -90,7 +90,7 @@ static DataTypePtr create32(const ASTPtr & arguments) return std::make_shared(timezone); } -static DataTypePtr create64(const ASTPtr & arguments) +static DataTypePtr create64(const ASTPtr & arguments, [[maybe_unused]] bool compatible_with_clickhouse = false) /// proton: 
updated { if (!arguments || arguments->children.empty()) return std::make_shared(DataTypeDateTime64::default_scale); @@ -111,6 +111,12 @@ void registerDataTypeDateTime(DataTypeFactory & factory) factory.registerDataType("datetime64", create64, DataTypeFactory::CaseInsensitive); /// factory.registerAlias("TIMESTAMP", "datetime", DataTypeFactory::CaseInsensitive); + + /// proton: starts + factory.registerClickHouseAlias("Datetime", "datetime"); + factory.registerClickHouseAlias("Datetime32", "datetime32"); + factory.registerClickHouseAlias("Datetime64", "datetime64"); + /// proton: ends } } diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 73e30f81b8c..08ff2975483 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -230,6 +230,9 @@ struct FormatSettings bool interpret_expressions = true; bool deduce_templates_of_expressions = true; bool accurate_types_of_literals = true; + /// proton: starts + bool no_commas_between_rows = false; + /// proton: ends } values; struct diff --git a/src/Formats/NativeReader.cpp b/src/Formats/NativeReader.cpp index bdc144a929e..068682a04e3 100644 --- a/src/Formats/NativeReader.cpp +++ b/src/Formats/NativeReader.cpp @@ -140,7 +140,7 @@ Block NativeReader::read() /// Type String type_name; readStringBinary(type_name, istr); - column.type = data_type_factory.get(type_name); + column.type = data_type_factory.get(type_name/* proton: starts */, compatible_with_clickhouse/* proton: ends */); setVersionToAggregateFunctions(column.type, true, server_revision); diff --git a/src/Formats/NativeReader.h b/src/Formats/NativeReader.h index 1f9eb8b9764..fa1e3240aab 100644 --- a/src/Formats/NativeReader.h +++ b/src/Formats/NativeReader.h @@ -39,6 +39,10 @@ class NativeReader Block read(); + /// proton: starts + void setCompatibleWithClickHouse() { compatible_with_clickhouse = true; } + /// proton: ends + private: ReadBuffer & istr; Block header; @@ -55,6 +59,10 @@ class NativeReader PODArray 
avg_value_size_hints; void updateAvgValueSizeHints(const Block & block); + + /// proton: starts + bool compatible_with_clickhouse {false}; + /// proton: ends }; } diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 9cffb39a70a..63a9dfec8c0 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -892,6 +892,11 @@ void InterpreterCreateQuery::handleExternalStreamCreation(ASTCreateQuery & creat if (!create.is_external) return; + if (create.storage + && create.storage->engine + && create.storage->engine->name == "ExternalTable") + return; + auto sharding_expr_field = Field(""); String sharding_expr; @@ -916,10 +921,6 @@ void InterpreterCreateQuery::handleExternalStreamCreation(ASTCreateQuery & creat create.storage->set(create.storage->engine, makeASTFunction("ExternalStream", sharding_expr_ast)); } - - - if (create.storage->engine->name != "ExternalStream") - throw Exception(ErrorCodes::INCORRECT_QUERY, "External stream requires ExternalStream engine"); } /// proton: ends diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index 57a6267b223..3aefd1ccfb4 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -120,6 +120,11 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue throw Exception(ErrorCodes::INCORRECT_QUERY, "It {} is not a Dictionary", table_id.getNameForLogs()); /// proton: ends + /// proton: starts + if (ast_drop_query.is_external_table && !table->isExternalTable()) + throw Exception(ErrorCodes::INCORRECT_QUERY, "It {} is not an External Table", table_id.getNameForLogs()); + /// proton: ends + /// Now get UUID, so we can wait for table data to be finally dropped table_id.uuid = database->tryGetTableUUID(table_id.table_name); @@ -137,6 +142,11 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_,
ASTDropQue if (query.kind == ASTDropQuery::Kind::Detach) { + /// proton: starts + if (table->isExternalTable()) + throw Exception("Cannot DETACH external table", ErrorCodes::NOT_IMPLEMENTED); + /// proton: ends + context_->checkAccess(drop_storage, table_id); if (table->isDictionary()) @@ -174,6 +184,11 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue } else if (query.kind == ASTDropQuery::Kind::Truncate) { + /// proton: starts + if (table->isExternalTable()) + throw Exception("Cannot TRUNCATE external table", ErrorCodes::SYNTAX_ERROR); + /// proton: ends + if (table->isDictionary()) throw Exception("Cannot TRUNCATE dictionary", ErrorCodes::SYNTAX_ERROR); diff --git a/src/Parsers/ASTCreateQuery.cpp b/src/Parsers/ASTCreateQuery.cpp index c76fd41b0ad..608349d77e9 100644 --- a/src/Parsers/ASTCreateQuery.cpp +++ b/src/Parsers/ASTCreateQuery.cpp @@ -36,7 +36,8 @@ ASTPtr ASTStorage::clone() const void ASTStorage::formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const { - if (engine) + if (engine + /*proton: starts*/ && engine->name != "ExternalTable" /*proton: ends*/) { s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "ENGINE" << (s.hilite ? hilite_none : "") << " = "; engine->formatImpl(s, state, frame); @@ -277,6 +278,8 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat what = "MATERIALIZED VIEW"; else if (is_random) what = "RANDOM STREAM"; + else if (storage && storage->engine && storage->engine->name == "ExternalTable") + what = "EXTERNAL TABLE"; /// proton: ends. 
settings.ostr diff --git a/src/Parsers/ASTDropQuery.h b/src/Parsers/ASTDropQuery.h index 2e67eaf3692..ffda9138fa2 100644 --- a/src/Parsers/ASTDropQuery.h +++ b/src/Parsers/ASTDropQuery.h @@ -33,6 +33,10 @@ class ASTDropQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnClu bool no_delay{false}; + /// proton: starts + bool is_external_table{false}; + /// proton: ends + // We detach the object permanently, so it will not be reattached back during server restart. bool permanently{false}; diff --git a/src/Parsers/ParserCreateExternalTableQuery.cpp b/src/Parsers/ParserCreateExternalTableQuery.cpp new file mode 100644 index 00000000000..8c0cddd9c3b --- /dev/null +++ b/src/Parsers/ParserCreateExternalTableQuery.cpp @@ -0,0 +1,83 @@ +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +bool DB::ParserCreateExternalTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected, [[ maybe_unused ]] bool hint) +{ + ParserKeyword s_create("CREATE"); + ParserKeyword s_attach("ATTACH"); + ParserKeyword s_or_replace("OR REPLACE"); + ParserKeyword s_external_table("EXTERNAL TABLE"); + ParserKeyword s_if_not_exists("IF NOT EXISTS"); + ParserKeyword s_settings("SETTINGS"); + + ParserCompoundIdentifier table_name_p(true, true); + ParserSetQuery settings_p(/* parse_only_internals_ = */ true); + + ASTPtr table; + ASTPtr settings; + + bool attach = false; + bool or_replace = false; + bool if_not_exists = false; + + if (s_create.ignore(pos, expected)) + { + if (s_or_replace.ignore(pos, expected)) + or_replace = true; + } + else if (s_attach.ignore(pos, expected)) + attach = true; + else + return false; + + if (!s_external_table.ignore(pos, expected)) + return false; + + if (!or_replace && s_if_not_exists.ignore(pos, expected)) + if_not_exists = true; + + if (!table_name_p.parse(pos, table, expected)) + return false; + + if (s_settings.ignore(pos, expected)) + { + if (!settings_p.parse(pos, settings, expected)) + return false; + } + + auto 
create_query = std::make_shared(); + node = create_query; + + create_query->is_external = true; + create_query->create_or_replace = or_replace; + create_query->if_not_exists = if_not_exists; + + auto * table_id = table->as(); + create_query->database = table_id->getDatabase(); + create_query->table = table_id->getTable(); + if (attach) + { + create_query->uuid = table_id->uuid; + create_query->attach = attach; + } + if (create_query->database) + create_query->children.push_back(create_query->database); + if (create_query->table) + create_query->children.push_back(create_query->table); + + auto storage = std::make_shared(); + storage->set(storage->engine, makeASTFunction("ExternalTable")); + storage->set(storage->settings, settings); + create_query->set(create_query->storage, storage); + + return true; +} + +} diff --git a/src/Parsers/ParserCreateExternalTableQuery.h b/src/Parsers/ParserCreateExternalTableQuery.h new file mode 100644 index 00000000000..be45586bb33 --- /dev/null +++ b/src/Parsers/ParserCreateExternalTableQuery.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace DB +{ + +/// Query like this: +/// CREATE [OR REPLACE] EXTERNAL TABLE [IF NOT EXISTS] [db.]name +/// [SETTINGS name = value, ...] +class ParserCreateExternalTableQuery : public DB::IParserBase +{ +protected: + const char * getName() const override { return "CREATE EXTERNAL TABLE query"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected, [[ maybe_unused ]] bool hint) override; +}; + +} diff --git a/src/Parsers/ParserCreateQuery.cpp b/src/Parsers/ParserCreateQuery.cpp index 7f4f75933af..0f94a62dbc3 100644 --- a/src/Parsers/ParserCreateQuery.cpp +++ b/src/Parsers/ParserCreateQuery.cpp @@ -19,6 +19,7 @@ #include /// proton: starts +#include #include /// proton: ends. @@ -1147,8 +1148,10 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected, /// proton: starts. 
Add to parse MaterializedViewQuery ParserCreateMaterializedViewQuery streaming_view_p; + ParserCreateExternalTableQuery external_table_p; return table_p.parse(pos, node, expected) + || external_table_p.parse(pos, node, expected) || database_p.parse(pos, node, expected) || view_p.parse(pos, node, expected) || dictionary_p.parse(pos, node, expected) diff --git a/src/Parsers/ParserDropExternalTableQuery.cpp b/src/Parsers/ParserDropExternalTableQuery.cpp new file mode 100644 index 00000000000..8b1ac9a4f17 --- /dev/null +++ b/src/Parsers/ParserDropExternalTableQuery.cpp @@ -0,0 +1,52 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +bool DB::ParserDropExternalTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected, [[ maybe_unused ]] bool hint) +{ + ParserKeyword s_drop("DROP"); + ParserKeyword s_external_table("EXTERNAL TABLE"); + ParserKeyword s_if_exists("IF EXISTS"); + + ParserCompoundIdentifier table_name_p(true, true); + + ASTPtr table; + + bool if_exists = false; + + if (!s_drop.ignore(pos, expected)) + return false; + + if (!s_external_table.ignore(pos, expected)) + return false; + + if (s_if_exists.ignore(pos, expected)) + if_exists = true; + + if (!table_name_p.parse(pos, table, expected)) + return false; + + auto query = std::make_shared(); + node = query; + + query->kind = ASTDropQuery::Drop; + query->is_external_table = true; + query->if_exists = if_exists; + + auto * table_id = table->as(); + query->database = table_id->getDatabase(); + query->table = table_id->getTable(); + if (query->database) + query->children.push_back(query->database); + if (query->table) + query->children.push_back(query->table); + + return true; +} + +} diff --git a/src/Parsers/ParserDropExternalTableQuery.h b/src/Parsers/ParserDropExternalTableQuery.h new file mode 100644 index 00000000000..5da057c5977 --- /dev/null +++ b/src/Parsers/ParserDropExternalTableQuery.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ + +/// Query like 
this: +/// DROP EXTERNAL TABLE [IF EXISTS] [db.]name +class ParserDropExternalTableQuery : public DB::IParserBase +{ +protected: + const char * getName() const override { return "DROP EXTERNAL TABLE query"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected, [[ maybe_unused ]] bool hint) override; +}; + +} diff --git a/src/Parsers/ParserQueryWithOutput.cpp b/src/Parsers/ParserQueryWithOutput.cpp index d1d01a33fb8..2a0617371df 100644 --- a/src/Parsers/ParserQueryWithOutput.cpp +++ b/src/Parsers/ParserQueryWithOutput.cpp @@ -24,9 +24,10 @@ #include #include #include -#include "Common/Exception.h" +#include /// proton : starts +#include #include #include #include @@ -51,6 +52,7 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec ParserAlterQuery alter_p; ParserRenameQuery rename_p; ParserDropQuery drop_p; + ParserDropExternalTableQuery drop_external_table_p; ParserCheckQuery check_p; ParserOptimizeQuery optimize_p; ParserKillQueryQuery kill_query_p; @@ -87,6 +89,9 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec || alter_p.parse(pos, query, expected) || rename_p.parse(pos, query, expected) || drop_p.parse(pos, query, expected) + /// proton: starts + || drop_external_table_p.parse(pos, query, expected) + /// proton: ends || check_p.parse(pos, query, expected) || kill_query_p.parse(pos, query, expected) || optimize_p.parse(pos, query, expected) diff --git a/src/Processors/Formats/Impl/ValuesRowOutputFormat.cpp b/src/Processors/Formats/Impl/ValuesRowOutputFormat.cpp index abe7c42caae..4f9e437f5ac 100644 --- a/src/Processors/Formats/Impl/ValuesRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ValuesRowOutputFormat.cpp @@ -37,6 +37,10 @@ void ValuesRowOutputFormat::writeRowEndDelimiter() void ValuesRowOutputFormat::writeRowBetweenDelimiter() { + /// proton: starts + if (format_settings.values.no_commas_between_rows) + return; + /// proton: ends writeCString(",", out); } diff --git
a/src/Processors/ProcessorID.h b/src/Processors/ProcessorID.h index e044e22670c..68279452cfe 100644 --- a/src/Processors/ProcessorID.h +++ b/src/Processors/ProcessorID.h @@ -247,6 +247,9 @@ enum class ProcessorID : UInt32 GenerateRandomSourceID = 10'045, SourceFromQueryPipelineID = 10'046, ConvertingAggregatedToChunksSourceShuffledID = 10'047, + /// proton: starts + ClickHouseSourceID = 11'000, + /// proton: ends /// Sink Processors EmptySinkID = 20'000, diff --git a/src/Storages/ExternalTable/CMakeLists.txt b/src/Storages/ExternalTable/CMakeLists.txt new file mode 100644 index 00000000000..bd39e1ba864 --- /dev/null +++ b/src/Storages/ExternalTable/CMakeLists.txt @@ -0,0 +1,11 @@ +include("${proton_SOURCE_DIR}/cmake/dbms_glob_sources.cmake") + +add_headers_and_sources(external_table .) + +add_library(external_table ${external_table_headers} ${external_table_sources}) + +target_link_libraries(external_table PUBLIC clickhouse_parsers ch_contrib::abseil_swiss_tables) + +# if (ENABLE_TESTS) +# add_subdirectory(tests) +# endif () diff --git a/src/Storages/ExternalTable/ClickHouse.cpp b/src/Storages/ExternalTable/ClickHouse.cpp new file mode 100644 index 00000000000..9dc8b43ecdf --- /dev/null +++ b/src/Storages/ExternalTable/ClickHouse.cpp @@ -0,0 +1,135 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ExternalTable +{ + +ClickHouse::ClickHouse(const String & name, ExternalTableSettingsPtr settings) + : timeouts( /// TODO do not hard-code it, allow customization via settings + /*connection_timeout_=*/ 1 * 60 * 1'000'000, + /*send_timeout_=*/ 1 * 60 * 1'000'000, + /*receive_timeout_=*/ 1 * 60 * 1'000'000, + /*tcp_keep_alive_timeout_=*/ 5 * 60 * 1'000'000 + ) + , database(settings->database.value) + , table(settings->table.changed ? 
settings->table.value : name) + , logger(&Poco::Logger::get("ExternalTable-ClickHouse-" + table)) +{ + assert(settings->type.value == "clickhouse"); + + auto addr = settings->address.value; + auto pos = addr.find_first_of(':'); + if (pos == String::npos) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid ClickHouse address, expected format ':'"); + auto host = addr.substr(0, pos); + auto port = std::stoi(addr.substr(pos + 1)); + if (!port) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid port in ClickHouse address"); + + pool = DB::ConnectionPoolFactory::instance().get( + /*max_connections=*/ 100, + /*host=*/ host, + /*port=*/ port, + /*default_database=*/ settings->database.value, + /*user=*/ settings->user.value, + /*password=*/ settings->password.value, + /*quota_key=*/ "", + /*cluster=*/ "", + /*cluster_secret=*/ "", + "TimeplusProton", + settings->compression.value ? Protocol::Compression::Enable : Protocol::Compression::Disable, + settings->secure.value ? Protocol::Secure::Enable : Protocol::Secure::Disable, + /*priority=*/ 0); +} + +void ClickHouse::startup() +{ + LOG_INFO(logger, "startup"); +} + +Pipe ClickHouse::read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & /*query_info*/, + ContextPtr context, + QueryProcessingStage::Enum /*processed_stage*/, + size_t /*max_block_size*/, + size_t /*num_streams*/) +{ + auto header = storage_snapshot->getSampleBlockForColumns(column_names); + auto client = std::make_unique(pool->get(timeouts), timeouts, logger); + auto source = std::make_shared(database, table, std::move(header), std::move(client), context); + return Pipe(std::move(source)); +} + +SinkToStoragePtr ClickHouse::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) +{ + auto client = std::make_unique(pool->get(timeouts), timeouts, logger); + return std::make_shared(database, table, metadata_snapshot->getSampleBlock(), std::move(client), context, 
logger); +} + +ColumnsDescription ClickHouse::getTableStructure() +{ + ColumnsDescription ret {}; + + DB::ClickHouse::Client client = {pool->get(timeouts), timeouts, logger}; + auto query = fmt::format("DESCRIBE TABLE {}{}", + database.empty() ? "" : backQuoteIfNeed(database) + ".", + backQuoteIfNeed(table)); + /// This has to fail quickly otherwise it will block Proton from starting. + client.executeQuery(query, /*query_id=*/"", /*fail_quick=*/true); + LOG_INFO(logger, "Receiving table schema"); + while (true) + { + const auto & block = client.pollData(); + if (!block) + break; + + auto rows = block->rows(); + if (!rows) + continue; + + const auto & cols = block.value().getColumns(); + const auto & factory = DataTypeFactory::instance(); + for (size_t i = 0; i < rows; ++i) + { + ColumnDescription col_desc {}; + { + const auto & col = block->getByName("name"); + col_desc.name = col.column->getDataAt(i).toString(); + } + { + const auto & col = block->getByName("type"); + col_desc.type = factory.get(col.column->getDataAt(i).toString(), /*compatible_with_clickhouse=*/true); + } + { + const auto & col = block->getByName("comment"); + col_desc.comment = col.column->getDataAt(i).toString(); + } + ret.add(col_desc, String(), false, false); + } + } + + client.throwServerExceptionIfAny(); + return ret; +} + +} + +void registerClickHouseExternalTable(ExternalTableFactory & factory) +{ + factory.registerExternalTable("clickhouse", [](const String & name, ExternalTableSettingsPtr settings) + { + return std::make_unique(name, std::move(settings)); + }); +} + +} diff --git a/src/Storages/ExternalTable/ClickHouse.h b/src/Storages/ExternalTable/ClickHouse.h new file mode 100644 index 00000000000..a87eb418a0a --- /dev/null +++ b/src/Storages/ExternalTable/ClickHouse.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +namespace ExternalTable +{ + +class ClickHouse final : public IExternalTable +{ +public: + explicit ClickHouse(const String 
& name, ExternalTableSettingsPtr settings); + + void startup() override; + void shutdown() override {} + + ColumnsDescription getTableStructure() override; + + Pipe read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum /*processed_stage*/, + size_t max_block_size, + size_t /*num_streams*/) override; + + SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override; + +private: + ConnectionPoolPtr pool; + ConnectionTimeouts timeouts; + String database; + String table; + + Poco::Logger * logger; +}; + +} + +} diff --git a/src/Storages/ExternalTable/ExternalTableFactory.cpp b/src/Storages/ExternalTable/ExternalTableFactory.cpp new file mode 100644 index 00000000000..67289211b87 --- /dev/null +++ b/src/Storages/ExternalTable/ExternalTableFactory.cpp @@ -0,0 +1,41 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int UNKNOWN_TYPE; +} + +void registerClickHouseExternalTable(ExternalTableFactory & factory); + +ExternalTableFactory & ExternalTableFactory::instance() +{ + static DB::ExternalTableFactory ret; + return ret; +} + +void ExternalTableFactory::registerExternalTable(const String & type, Creator creator) +{ + if (creators.contains(type)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "ExternalTableFactory: type {} is already registered", type); + + creators[type] = std::move(creator); +} + +IExternalTablePtr ExternalTableFactory::getExternalTable(const String & name, ExternalTableSettingsPtr settings) const +{ + auto type = settings->type.value; + if (!creators.contains(type)) + throw Exception(ErrorCodes::UNKNOWN_TYPE, "Unknown external table type {}", type); + + return creators.at(type)(name, std::move(settings)); +} + +ExternalTableFactory::ExternalTableFactory() +{ + registerClickHouseExternalTable(*this); +} + +} diff --git 
a/src/Storages/ExternalTable/ExternalTableFactory.h b/src/Storages/ExternalTable/ExternalTableFactory.h new file mode 100644 index 00000000000..da925003a90 --- /dev/null +++ b/src/Storages/ExternalTable/ExternalTableFactory.h @@ -0,0 +1,27 @@ +#pragma once + +#include +#include "Storages/ExternalTable/IExternalTable.h" +#include "Storages/ExternalTable/ExternalTableSettings.h" + +namespace DB +{ + +/// Allows creating an IExternalTable by the name of its type. +class ExternalTableFactory final : private boost::noncopyable +{ +public: + static ExternalTableFactory & instance(); + + using Creator = std::function; + + IExternalTablePtr getExternalTable(const String & name, ExternalTableSettingsPtr settings) const; + void registerExternalTable(const String & type, Creator creator); + +private: + ExternalTableFactory(); + + std::unordered_map creators; +}; + +} diff --git a/src/Storages/ExternalTable/ExternalTableSettings.cpp b/src/Storages/ExternalTable/ExternalTableSettings.cpp new file mode 100644 index 00000000000..103b1a7e564 --- /dev/null +++ b/src/Storages/ExternalTable/ExternalTableSettings.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_SETTING; +} + +IMPLEMENT_SETTINGS_TRAITS(ExternalTableSettingsTraits, LIST_OF_EXTERNAL_TABLE_SETTINGS) + +void ExternalTableSettings::loadFromQuery(ASTStorage & storage) +{ + if (storage.settings) + { + try + { + applyChanges(storage.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + e.addMessage("for storage " + storage.engine->name); + throw; + } + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage.set(storage.settings, settings_ast); + } +} + +} diff --git a/src/Storages/ExternalTable/ExternalTableSettings.h b/src/Storages/ExternalTable/ExternalTableSettings.h new file mode 100644 index 00000000000..e99cd972706 --- /dev/null +++
b/src/Storages/ExternalTable/ExternalTableSettings.h @@ -0,0 +1,33 @@ +#pragma once + +#include + +namespace DB +{ + +class ASTStorage; + +#define LIST_OF_EXTERNAL_TABLE_SETTINGS(M) \ + M(String, type, "", "External table type", 0) \ + /* ClickHouse settings */ \ + M(String, address, "", "The address of the ClickHouse server to connect to", 0) \ + M(String, user, "default", "The user to be used to connect to the ClickHouse server", 0) \ + M(String, password, "", "The password to be used to connect to the ClickHouse server", 0) \ + M(Bool, secure, false, "Indicates whether to use a secure connection", 0) \ + M(Bool, compression, true, "Indicates if compression should be enabled", 0) \ + M(String, database, "default", "The database to connect to", 0) \ + M(String, table, "", "The ClickHouse table to which the external table is mapped", 0) + +DECLARE_SETTINGS_TRAITS(ExternalTableSettingsTraits, LIST_OF_EXTERNAL_TABLE_SETTINGS) + + +/// Settings for the ExternalTable engine. +/// Could be loaded from a CREATE EXTERNAL TABLE query (SETTINGS clause). +struct ExternalTableSettings : public BaseSettings +{ + void loadFromQuery(ASTStorage & storage_def); +}; + +using ExternalTableSettingsPtr = std::unique_ptr; + +} diff --git a/src/Storages/ExternalTable/IExternalTable.h b/src/Storages/ExternalTable/IExternalTable.h new file mode 100644 index 00000000000..4f014a87f95 --- /dev/null +++ b/src/Storages/ExternalTable/IExternalTable.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include "QueryPipeline/Pipe.h" + +namespace DB +{ + +/// The interface for an External Table implementation to implement.
+class IExternalTable +{ +public: + virtual ~IExternalTable() = default; + + virtual void startup() = 0; + virtual void shutdown() = 0; + + virtual ColumnsDescription getTableStructure() = 0; + + virtual Pipe read( + const Names & /*column_names*/, + const StorageSnapshotPtr & /*storage_snapshot*/, + SelectQueryInfo & /*query_info*/, + ContextPtr /*context*/, + QueryProcessingStage::Enum /*processed_stage*/, + size_t /*max_block_size*/, + size_t /*num_streams*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading data from this type of external table is not supported"); + } + + virtual SinkToStoragePtr write(const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, ContextPtr /* context */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Ingesting data to this type of external table is not supported"); + } +}; + +using IExternalTablePtr = std::unique_ptr; + +} diff --git a/src/Storages/ExternalTable/StorageExternalTable.cpp b/src/Storages/ExternalTable/StorageExternalTable.cpp new file mode 100644 index 00000000000..5776bc96be0 --- /dev/null +++ b/src/Storages/ExternalTable/StorageExternalTable.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +StorageExternalTable::StorageExternalTable( + const StorageID & table_id, + std::unique_ptr settings, + bool is_attach, + ContextPtr context_) +: IStorage(table_id) +, WithContext(context_) +{ + external_table = ExternalTableFactory::instance().getExternalTable(table_id.getTableName(), std::move(settings)); + + /// Two situations: + /// * Create a new table. In this case, we want the create query to fail if it fails to fetch the columns description, so that users know that there is something wrong with the connection and they can try again once the issue is resolved. + /// * Attach a table (Proton restarts).
In this case, even if it fails to fetch the columns description, we want to make sure that: + /// - it does not terminate Proton, otherwise Proton will never start again + /// - it does not block Proton from starting, otherwise Proton will get stuck + /// So, we let it keep retrying in the background, and hope it will eventually succeed (until the user drops the table). + /// + /// TODO: we could cache the table structure, so that when Proton restarts it could read from the cache directly. + try + { + fetchColumnsDescription(); + } + catch (const Exception & e) + { + if (!is_attach) + e.rethrow(); + + LOG_ERROR(&Poco::Logger::get("ExternalTable"), + "Failed to fetch table structure for {}, error: {}. Will keep retrying in background", + getStorageID().getFullTableName(), + e.what()); + background_jobs.scheduleOrThrowOnError([this](){ + while (!is_dropped) + { + try + { + std::this_thread::sleep_for(std::chrono::seconds(5)); + fetchColumnsDescription(); + return; + } + catch (const Exception &) { } + } + }); + } +} + +Pipe StorageExternalTable::read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context_, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) +{ + return external_table->read(column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams); +} + +SinkToStoragePtr StorageExternalTable::write( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context_) +{ + return external_table->write(query, metadata_snapshot, context_); +} + +void StorageExternalTable::fetchColumnsDescription() +{ + auto desc = external_table->getTableStructure(); + auto metadata = getInMemoryMetadata(); + metadata.setColumns(std::move(desc)); + setInMemoryMetadata(metadata); +} + +void registerStorageExternalTable(StorageFactory & factory) +{ + auto creator_fn = [](const
StorageFactory::Arguments & args) + { + if (!args.storage_def->settings) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "External table requires correct settings setup"); + + auto settings = std::make_unique(); + settings->loadFromQuery(*args.storage_def); + + return StorageExternalTable::create( + args.table_id, + std::move(settings), + args.attach, + args.getContext()->getGlobalContext()); + }; + + factory.registerStorage( + "ExternalTable", + creator_fn, + StorageFactory::StorageFeatures{ + .supports_settings = true, + .supports_schema_inference = true, + }); +} + +} diff --git a/src/Storages/ExternalTable/StorageExternalTable.h b/src/Storages/ExternalTable/StorageExternalTable.h new file mode 100644 index 00000000000..c5d86689484 --- /dev/null +++ b/src/Storages/ExternalTable/StorageExternalTable.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +class StorageExternalTable final : public shared_ptr_helper, public IStorage, public WithContext +{ + friend struct shared_ptr_helper; + +public: + String getName() const override { return "ExternalTable"; } + + bool isRemote() const override { return true; } + bool isExternalTable() const override { return true; } + bool squashInsert() const noexcept override { return false; } + + void startup() override { external_table->startup(); } + void shutdown() override { external_table->shutdown(); } + + Pipe read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context_, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) override; + + SinkToStoragePtr write( + const ASTPtr & /*query*/, + const StorageMetadataPtr & /*metadata_snapshot*/, + ContextPtr /*context*/) override; + +protected: + StorageExternalTable( + const StorageID & table_id, + std::unique_ptr settings, + bool is_attach, + ContextPtr context_); + +private: + void 
fetchColumnsDescription(); + + IExternalTablePtr external_table; + ThreadPool background_jobs {1}; +}; + +} diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index f746c7eabc4..8a64ed71cc2 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -234,6 +234,8 @@ class IStorage : public std::enable_shared_from_this, public TypePromo virtual bool supportsAccurateSeekTo() const noexcept { return false; } virtual bool supportsStreamingQuery() const { return false; } + + virtual bool isExternalTable() const { return false; } /// proton: ends. /// Return list of virtual columns (like _part, _table, etc). In the vast diff --git a/src/Storages/Streaming/StorageMaterializedView.cpp b/src/Storages/Streaming/StorageMaterializedView.cpp index 942e8603402..fed2501ea8b 100644 --- a/src/Storages/Streaming/StorageMaterializedView.cpp +++ b/src/Storages/Streaming/StorageMaterializedView.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include @@ -201,7 +202,7 @@ StorageMaterializedView::StorageMaterializedView( if (!target_table) throw Exception(ErrorCodes::INCORRECT_QUERY, "Target stream is not found", target_table_id.getFullTableName()); - if (!target_table->as() && !target_table->as()) + if (!target_table->as() && !target_table->as() && !target_table->as()) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedView doesn't support target storage is {}", target_table->getName()); } } diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 11ebaa2f61e..20544c5fb13 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -20,6 +20,7 @@ void registerStorageView(StorageFactory & factory); /// proton: starts. 
void registerStorageStream(StorageFactory & factory); void registerStorageMaterializedView(StorageFactory & factory); +void registerStorageExternalTable(StorageFactory & factory); void registerStorageExternalStream(StorageFactory & factory); void registerStorageRandom(StorageFactory & factory); /// proton: ends. @@ -59,6 +60,7 @@ void registerStorages() /// proton: starts. registerStorageStream(factory); registerStorageMaterializedView(factory); + registerStorageExternalTable(factory); registerStorageExternalStream(factory); registerStorageRandom(factory); /// proton: ends.