diff --git a/.gitmodules b/.gitmodules index b24bc75e1..a28696951 100644 --- a/.gitmodules +++ b/.gitmodules @@ -14,3 +14,7 @@ path = contrib/googletest url = https://github.com/google/googletest.git branch = master +[submodule "contrib/folly"] + path = contrib/folly + url = https://github.com/facebook/folly.git + branch = master diff --git a/CMakeLists.txt b/CMakeLists.txt index 050e14a2f..c97362104 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,7 @@ cmake_dependent_option (CH_ODBC_ENABLE_TESTING "Enable test targets" ON "BUILD_T option (CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES "Prefer bundled over system variants of third party libraries" ON) cmake_dependent_option (CH_ODBC_PREFER_BUNDLED_POCO "Prefer bundled over system variants of Poco library" ON "CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES" OFF) cmake_dependent_option (CH_ODBC_PREFER_BUNDLED_SSL "Prefer bundled over system variants of SSL library" ON "CH_ODBC_PREFER_BUNDLED_POCO" OFF) +cmake_dependent_option (CH_ODBC_PREFER_BUNDLED_FOLLY "Prefer bundled over system variants of Folly library" ON "CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES" OFF) cmake_dependent_option (CH_ODBC_PREFER_BUNDLED_GOOGLETEST "Prefer bundled over system variants of Google Test library" ON "CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES" OFF) cmake_dependent_option (CH_ODBC_PREFER_BUNDLED_NANODBC "Prefer bundled over system variants of nanodbc library" ON "CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES" OFF) option (CH_ODBC_RUNTIME_LINK_STATIC "Link with compiler and language runtime statically" OFF) @@ -131,6 +132,11 @@ if (NOT CH_ODBC_PREFER_BUNDLED_POCO) find_package (Poco COMPONENTS Foundation Net NetSSL) endif () +if (NOT CH_ODBC_PREFER_BUNDLED_FOLLY) + message (WARNING "Folly: using system variant of the library currently not supported") +# find_package (Folly) +endif () + if (CH_ODBC_ENABLE_TESTING) if (NOT CH_ODBC_PREFER_BUNDLED_GOOGLETEST) find_package (GTest) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 8391275bd..e4285fc1b 
100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -90,6 +90,15 @@ if (MSVC) endif () endif () +# if (CH_ODBC_PREFER_BUNDLED_FOLLY OR NOT Folly_FOUND) +# if (NOT Folly_FOUND AND NOT CH_ODBC_PREFER_BUNDLED_FOLLY) +# message (WARNING "Folly: unable to find system Folly, falling back to using the bundled variant of the library") +# endif () +# +# add_subdirectory (folly) + set (FOLLY_INLUDE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/folly" CACHE INTERNAL "") +# endif () + if (CH_ODBC_ENABLE_TESTING) if (CH_ODBC_PREFER_BUNDLED_GOOGLETEST OR NOT GTEST_FOUND) if (NOT GTEST_FOUND AND NOT CH_ODBC_PREFER_BUNDLED_GOOGLETEST) diff --git a/contrib/folly b/contrib/folly new file mode 160000 index 000000000..9d9dad169 --- /dev/null +++ b/contrib/folly @@ -0,0 +1 @@ +Subproject commit 9d9dad1690926668e53c3a98be6e94126d534d00 diff --git a/driver/CMakeLists.txt b/driver/CMakeLists.txt index f6a9c48af..1653b4b28 100644 --- a/driver/CMakeLists.txt +++ b/driver/CMakeLists.txt @@ -132,6 +132,7 @@ if (UNICODE) endif () target_include_directories (${libname}-impl + PUBLIC ${FOLLY_INLUDE_DIR} # TODO: switch to linking with folly lib target? 
PUBLIC ${PROJECT_BINARY_DIR} PUBLIC ${PROJECT_SOURCE_DIR} ) diff --git a/driver/format/ODBCDriver2.cpp b/driver/format/ODBCDriver2.cpp index 2e8b43f57..ae9441e6d 100644 --- a/driver/format/ODBCDriver2.cpp +++ b/driver/format/ODBCDriver2.cpp @@ -1,6 +1,6 @@ #include "driver/format/ODBCDriver2.h" -ODBCDriver2ResultSet::ODBCDriver2ResultSet(std::istream & stream, std::unique_ptr && mutator) +ODBCDriver2ResultSet::ODBCDriver2ResultSet(AmortizedIStreamReader & stream, std::unique_ptr && mutator) : ResultSet(stream, std::move(mutator)) { std::int32_t num_header_rows = 0; @@ -57,7 +57,7 @@ ODBCDriver2ResultSet::ODBCDriver2ResultSet(std::istream & stream, std::unique_pt } bool ODBCDriver2ResultSet::readNextRow(Row & row) { - if (raw_stream.peek() == EOF) + if (stream.eof()) return false; for (std::size_t i = 0; i < row.fields.size(); ++i) { @@ -68,11 +68,7 @@ bool ODBCDriver2ResultSet::readNextRow(Row & row) { } void ODBCDriver2ResultSet::readSize(std::int32_t & dest) { - constexpr auto size = sizeof(dest); - raw_stream.read(reinterpret_cast(&dest), size); - - if (raw_stream.gcount() != size) - throw std::runtime_error("Incomplete result received, expected size: " + std::to_string(size)); + stream.read(reinterpret_cast(&dest), sizeof(std::int32_t)); } void ODBCDriver2ResultSet::readValue(std::string & dest, bool * is_null) { @@ -80,20 +76,23 @@ void ODBCDriver2ResultSet::readValue(std::string & dest, bool * is_null) { readSize(size); if (size >= 0) { - dest.resize(size); // TODO: switch to uninitializing resize(). + resize_without_initialization(dest, size); if (is_null) *is_null = false; if (size > 0) { - raw_stream.read(dest.data(), size); - - if (raw_stream.gcount() != size) - throw std::runtime_error("Incomplete result received, expected size: " + std::to_string(size)); + try { + stream.read(dest.data(), size); + } + catch (...) 
{ + dest.clear(); + throw; + } } } else /*if (size == -1) */{ - dest.resize(0); + dest.clear(); if (is_null) *is_null = true; @@ -235,10 +234,10 @@ void ODBCDriver2ResultSet::readValue(std::string & src, DataSourceType::template to_value>::convert(src, dest); } -ODBCDriver2ResultReader::ODBCDriver2ResultReader(std::istream & stream, std::unique_ptr && mutator) - : ResultReader(stream, std::move(mutator)) +ODBCDriver2ResultReader::ODBCDriver2ResultReader(std::istream & raw_stream, std::unique_ptr && mutator) + : ResultReader(raw_stream, std::move(mutator)) { - if (stream.peek() == EOF) + if (stream.eof()) return; result_set = std::make_unique(stream, releaseMutator()); diff --git a/driver/format/ODBCDriver2.h b/driver/format/ODBCDriver2.h index 17ee76b8e..4ce658af9 100755 --- a/driver/format/ODBCDriver2.h +++ b/driver/format/ODBCDriver2.h @@ -8,7 +8,7 @@ class ODBCDriver2ResultSet : public ResultSet { public: - explicit ODBCDriver2ResultSet(std::istream & stream, std::unique_ptr && mutator); + explicit ODBCDriver2ResultSet(AmortizedIStreamReader & stream, std::unique_ptr && mutator); virtual ~ODBCDriver2ResultSet() override = default; protected: @@ -61,7 +61,7 @@ class ODBCDriver2ResultReader : public ResultReader { public: - explicit ODBCDriver2ResultReader(std::istream & stream, std::unique_ptr && mutator); + explicit ODBCDriver2ResultReader(std::istream & raw_stream, std::unique_ptr && mutator); virtual ~ODBCDriver2ResultReader() override = default; virtual bool advanceToNextResultSet() override; diff --git a/driver/format/RowBinaryWithNamesAndTypes.cpp b/driver/format/RowBinaryWithNamesAndTypes.cpp index 633377151..0b672bc43 100644 --- a/driver/format/RowBinaryWithNamesAndTypes.cpp +++ b/driver/format/RowBinaryWithNamesAndTypes.cpp @@ -2,7 +2,7 @@ #include -RowBinaryWithNamesAndTypesResultSet::RowBinaryWithNamesAndTypesResultSet(std::istream & stream, std::unique_ptr && mutator) 
+RowBinaryWithNamesAndTypesResultSet::RowBinaryWithNamesAndTypesResultSet(AmortizedIStreamReader & stream, std::unique_ptr && mutator) : ResultSet(stream, std::move(mutator)) { std::uint64_t num_columns = 0; @@ -35,7 +35,7 @@ RowBinaryWithNamesAndTypesResultSet::RowBinaryWithNamesAndTypesResultSet(std::is } bool RowBinaryWithNamesAndTypesResultSet::readNextRow(Row & row) { - if (raw_stream.peek() == EOF) + if (stream.eof()) return false; for (std::size_t i = 0; i < row.fields.size(); ++i) { @@ -53,10 +53,7 @@ void RowBinaryWithNamesAndTypesResultSet::readSize(std::uint64_t & res) { std::uint8_t shift = 0; while (true) { - auto byte = raw_stream.get(); - - if (raw_stream.fail() || byte == EOF) - throw std::runtime_error("Incomplete result received, expected: at least 1 more byte"); + const int byte = stream.get(); const std::uint64_t chunk = (byte & 0b01111111); const std::uint64_t segment = (chunk << shift); @@ -80,11 +77,7 @@ void RowBinaryWithNamesAndTypesResultSet::readSize(std::uint64_t & res) { } void RowBinaryWithNamesAndTypesResultSet::readValue(bool & dest) { - auto byte = raw_stream.get(); - - if (raw_stream.fail() || byte == EOF) - throw std::runtime_error("Incomplete result received, expected size: 1"); - + const int byte = stream.get(); dest = (byte != 0); } @@ -95,11 +88,15 @@ void RowBinaryWithNamesAndTypesResultSet::readValue(std::string & res) { } void RowBinaryWithNamesAndTypesResultSet::readValue(std::string & dest, const std::uint64_t size) { - dest.resize(size); // TODO: switch to uninitializing resize(). - raw_stream.read(dest.data(), dest.size()); + resize_without_initialization(dest, size); - if (raw_stream.gcount() != dest.size()) - throw std::runtime_error("Incomplete result received, expected size: " + std::to_string(size)); + try { + stream.read(dest.data(), dest.size()); + } + catch (...) 
{ + dest.clear(); + throw; + } } void RowBinaryWithNamesAndTypesResultSet::readValue(Field & dest, ColumnInfo & column_info) { @@ -285,11 +282,7 @@ void RowBinaryWithNamesAndTypesResultSet::readValue(DataSourceType && mutator) - : ResultReader(stream, std::move(mutator)) +RowBinaryWithNamesAndTypesResultReader::RowBinaryWithNamesAndTypesResultReader(std::istream & raw_stream, std::unique_ptr && mutator) + : ResultReader(raw_stream, std::move(mutator)) { - if (stream.peek() == EOF) + if (stream.eof()) return; result_set = std::make_unique(stream, releaseMutator()); diff --git a/driver/format/RowBinaryWithNamesAndTypes.h b/driver/format/RowBinaryWithNamesAndTypes.h index 92424d1aa..c0b8e34e3 100755 --- a/driver/format/RowBinaryWithNamesAndTypes.h +++ b/driver/format/RowBinaryWithNamesAndTypes.h @@ -8,7 +8,7 @@ class RowBinaryWithNamesAndTypesResultSet : public ResultSet { public: - explicit RowBinaryWithNamesAndTypesResultSet(std::istream & stream, std::unique_ptr && mutator); + explicit RowBinaryWithNamesAndTypesResultSet(AmortizedIStreamReader & stream, std::unique_ptr && mutator); virtual ~RowBinaryWithNamesAndTypesResultSet() override = default; protected: @@ -23,11 +23,7 @@ class RowBinaryWithNamesAndTypesResultSet template void readPOD(T & dest) { - constexpr auto size = sizeof(T); - raw_stream.read(reinterpret_cast(&dest), size); - - if (raw_stream.gcount() != size) - throw std::runtime_error("Incomplete result received, expected size: " + std::to_string(size)); + stream.read(reinterpret_cast(&dest), sizeof(T)); } void readValue(Field & dest, ColumnInfo & column_info); @@ -73,7 +69,7 @@ class RowBinaryWithNamesAndTypesResultReader : public ResultReader { public: - explicit RowBinaryWithNamesAndTypesResultReader(std::istream & stream, std::unique_ptr && mutator); + explicit RowBinaryWithNamesAndTypesResultReader(std::istream & raw_stream, std::unique_ptr && mutator); virtual ~RowBinaryWithNamesAndTypesResultReader() override = default; virtual bool 
advanceToNextResultSet() override; diff --git a/driver/result_set.cpp b/driver/result_set.cpp index 6f1e9cdf1..64a7263d9 100644 --- a/driver/result_set.cpp +++ b/driver/result_set.cpp @@ -117,8 +117,8 @@ SQLRETURN Row::extractField(std::size_t column_idx, BindingInfo & binding_info) return fields[column_idx].extract(binding_info); } -ResultSet::ResultSet(std::istream & stream, std::unique_ptr && mutator) - : raw_stream(stream) +ResultSet::ResultSet(AmortizedIStreamReader & str, std::unique_ptr && mutator) + : stream(str) , result_mutator(std::move(mutator)) , string_pool(1000000) , row_pool(1000000) @@ -296,8 +296,8 @@ void ResultSet::retireRow(Row && row) { row_pool.put(std::move(row)); } -ResultReader::ResultReader(std::istream & stream, std::unique_ptr && mutator) - : raw_stream(stream) +ResultReader::ResultReader(std::istream & raw_stream, std::unique_ptr && mutator) + : stream(raw_stream) , result_mutator(std::move(mutator)) { } diff --git a/driver/result_set.h b/driver/result_set.h index 850eb61c3..7a547557f 100644 --- a/driver/result_set.h +++ b/driver/result_set.h @@ -85,7 +85,7 @@ class ResultMutator { class ResultSet { public: - explicit ResultSet(std::istream & stream, std::unique_ptr && mutator); + explicit ResultSet(AmortizedIStreamReader & str, std::unique_ptr && mutator); virtual ~ResultSet(); @@ -111,7 +111,7 @@ class ResultSet { virtual bool readNextRow(Row & row) = 0; protected: - std::istream & raw_stream; + AmortizedIStreamReader & stream; std::unique_ptr result_mutator; std::vector columns_info; std::deque row_set; @@ -139,7 +139,7 @@ class ResultReader { virtual bool advanceToNextResultSet() = 0; protected: - std::istream & raw_stream; + AmortizedIStreamReader stream; std::unique_ptr result_mutator; std::unique_ptr result_set; }; diff --git a/driver/utils/utils.h b/driver/utils/utils.h index b5f80f3af..54d10d51a 100755 --- a/driver/utils/utils.h +++ b/driver/utils/utils.h @@ -33,6 +33,13 @@ #include +#if defined(_MSC_VER) && _MSC_VER > 1916 // 
Not supported yet for Visual Studio 2019 and later.
+# define resize_without_initialization(container, size) container.resize(size)
+#else
+# include
+# define resize_without_initialization(container, size) folly::resizeWithoutInitialization(container, size)
+#endif
+
 class Environment;
 class Connection;
 class Descriptor;
@@ -143,6 +150,142 @@ class ObjectPool {
     std::deque cache_;
 };
 
+// A restricted wrapper around std::istream, that tries to reduce the number of std::istream::read() calls at the cost of extra std::memcpy().
+// Maintains internal buffer of pre-read characters making AmortizedIStreamReader::read() calls for small counts more efficient.
+// Handles incomplete reads and terminated std::istream more aggressively, by throwing exceptions.
+class AmortizedIStreamReader
+{
+public:
+    // Wraps raw_stream; only a reference is kept, so the stream must outlive this reader.
+    explicit AmortizedIStreamReader(std::istream & raw_stream)
+        : raw_stream_(raw_stream)
+    {
+    }
+
+    ~AmortizedIStreamReader() {
+        // Put back any pre-read characters, just in case...
+        // The index runs one past the element being put back, so that the loop also terminates
+        // when offset_ == 0 (an unsigned `i >= offset_` test would never become false there).
+        try {
+            for (std::size_t i = buffer_.size(); i > offset_; --i) {
+                if (!raw_stream_.putback(buffer_[i - 1]))
+                    break; // Best effort only: once the stream rejects a character, stop trying.
+            }
+        }
+        catch (...) {
+            // A destructor must not throw; the put-back is optional, so give up silently.
+        }
+    }
+
+    AmortizedIStreamReader(const AmortizedIStreamReader &) = delete;
+    AmortizedIStreamReader(AmortizedIStreamReader &&) noexcept = delete;
+    AmortizedIStreamReader & operator= (const AmortizedIStreamReader &) = delete;
+    AmortizedIStreamReader & operator= (AmortizedIStreamReader &&) noexcept = delete;
+
+    // Returns true when no buffered bytes remain and the underlying stream cannot supply more.
+    // May attempt a read from the underlying stream to find that out.
+    bool eof() {
+        if (available() > 0)
+            return false;
+
+        if (raw_stream_.eof() || raw_stream_.fail())
+            return true;
+
+        tryPrepare(1);
+
+        if (available() > 0)
+            return false;
+
+        return (raw_stream_.eof() || raw_stream_.fail());
+    }
+
+    // Consumes and returns the next byte; throws std::runtime_error if none can be obtained.
+    char get() {
+        tryPrepare(1);
+
+        if (available() < 1)
+            throw std::runtime_error("Incomplete input stream, expected at least 1 more byte");
+
+        return buffer_[offset_++];
+    }
+
+    // Consumes exactly count bytes, copying them to str unless str is null.
+    // Throws std::runtime_error, without advancing the read position, if fewer than count bytes can be obtained.
+    AmortizedIStreamReader & read(char * str, std::size_t count) {
+        tryPrepare(count);
+
+        if (available() < count)
+            throw std::runtime_error("Incomplete input stream, expected at least " + std::to_string(count) + " more bytes");
+
+        if (str) // If str == nullptr, just silently consume requested amount of data.
+            std::memcpy(str, &buffer_[offset_], count);
+
+        offset_ += count;
+
+        return *this;
+    }
+
+private:
+    // Number of buffered bytes that have not been consumed yet.
+    std::size_t available() const {
+        if (offset_ < buffer_.size())
+            return (buffer_.size() - offset_);
+
+        return 0;
+    }
+
+    // Tries to make at least count unconsumed bytes available in buffer_, requesting at least
+    // min_read_size bytes from the underlying stream per call. A short read is not reported here:
+    // the buffer is trimmed to what was received and callers check available() afterwards.
+    void tryPrepare(std::size_t count) {
+        const auto avail = available();
+
+        if (avail < count) {
+            static constexpr std::size_t min_read_size = 1 << 13; // 8 KB
+
+            const auto to_read = std::max(min_read_size, count - avail);
+            const auto tail_capacity = buffer_.capacity() - buffer_.size();
+            const auto free_capacity = tail_capacity + offset_;
+
+            if (tail_capacity < to_read) { // Reallocation or at least compacting have to be done.
+                if (free_capacity < to_read) { // Reallocation is unavoidable. Compact the buffer while doing it.
+                    if (avail > 0) {
+                        decltype(buffer_) tmp;
+                        resize_without_initialization(tmp, avail + to_read);
+                        std::memcpy(&tmp[0], &buffer_[offset_], avail);
+                        buffer_.swap(tmp);
+                    }
+                    else {
+                        buffer_.clear();
+                        resize_without_initialization(buffer_, to_read);
+                    }
+                }
+                else { // Compacting the buffer is enough.
+                    std::memmove(&buffer_[0], &buffer_[offset_], avail);
+                    resize_without_initialization(buffer_, avail + to_read);
+                }
+                offset_ = 0;
+            }
+            else {
+                resize_without_initialization(buffer_, buffer_.size() + to_read);
+            }
+
+            raw_stream_.read(&buffer_[offset_ + avail], to_read);
+
+            // gcount() is a signed std::streamsize that is never negative; convert it once for the unsigned arithmetic below.
+            const auto received = static_cast<std::size_t>(raw_stream_.gcount());
+
+            if (received < to_read)
+                buffer_.resize(buffer_.size() - (to_read - received)); // Drop the tail that was not filled.
+        }
+    }
+
+private:
+    std::istream & raw_stream_;
+    std::size_t offset_ = 0; // Index in buffer_ of the first unconsumed byte.
+    std::string buffer_; // Pre-read bytes; everything before offset_ has already been consumed.
+};
+
 // Parses "Value List Arguments" of catalog functions.
 // Effectively, parses a comma-separated list of possibly single-quoted values
 // into a set of values. Escaping support is not supposed is such quoted values.