Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose isUnquotedPathCharacter for validation #375

Closed
wants to merge 9 commits into from
43 changes: 36 additions & 7 deletions velox/benchmarks/basic/CastBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,52 @@ int main(int argc, char** argv) {
folly::Init init(&argc, &argv);

ExpressionBenchmarkBuilder benchmarkBuilder;

const vector_size_t vectorSize = 1000;
auto vectorMaker = benchmarkBuilder.vectorMaker();
auto invalidInput = vectorMaker.flatVector<facebook::velox::StringView>({""});
auto validInput = vectorMaker.flatVector<facebook::velox::StringView>({""});
auto nanInput = vectorMaker.flatVector<facebook::velox::StringView>({""});
auto decimalInput = vectorMaker.flatVector<int64_t>(
vectorSize, [&](auto j) { return 12345 * j; }, nullptr, DECIMAL(9, 2));
auto shortDecimalInput = vectorMaker.flatVector<int64_t>(
vectorSize,
[&](auto j) { return 123456789 * j; },
nullptr,
DECIMAL(18, 6));
auto longDecimalInput = vectorMaker.flatVector<int128_t>(
vectorSize,
[&](auto j) {
return facebook::velox::HugeInt::build(12345 * j, 56789 * j + 12345);
},
nullptr,
DECIMAL(38, 16));

invalidInput->resize(1000);
validInput->resize(1000);
nanInput->resize(1000);
invalidInput->resize(vectorSize);
validInput->resize(vectorSize);
nanInput->resize(vectorSize);

for (int i = 0; i < 1000; i++) {
for (int i = 0; i < vectorSize; i++) {
nanInput->set(i, "$"_sv);
invalidInput->set(i, StringView::makeInline(std::string("")));
validInput->set(i, StringView::makeInline(std::to_string(i)));
}

benchmarkBuilder
.addBenchmarkSet(
"cast_int",
"cast",
vectorMaker.rowVector(
{"valid", "empty", "nan"}, {validInput, invalidInput, nanInput}))
{"valid",
"empty",
"nan",
"decimal",
"short_decimal",
"long_decimal"},
{validInput,
invalidInput,
nanInput,
decimalInput,
shortDecimalInput,
longDecimalInput}))
.addExpression("try_cast_invalid_empty_input", "try_cast (empty as int) ")
.addExpression(
"tryexpr_cast_invalid_empty_input", "try (cast (empty as int))")
Expand All @@ -56,6 +81,10 @@ int main(int argc, char** argv) {
.addExpression("try_cast_valid", "try_cast (valid as int)")
.addExpression("tryexpr_cast_valid", "try (cast (valid as int))")
.addExpression("cast_valid", "cast(valid as int)")
.addExpression(
"cast_decimal_to_inline_string", "cast (decimal as varchar)")
.addExpression("cast_short_decimal", "cast (short_decimal as varchar)")
.addExpression("cast_long_decimal", "cast (long_decimal as varchar)")
.withIterations(100)
.disableTesting();

Expand Down
11 changes: 11 additions & 0 deletions velox/common/process/ThreadDebugInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
namespace facebook::velox::process {
thread_local const ThreadDebugInfo* threadDebugInfo = nullptr;

// Flag to ensure that printCurrentQueryId() only invokes the callback in
// ThreadDebugInfo once. This is to prevent callback from being recursively
// called in case it induces a fatal signal which ends up calling
// printCurrentQueryId() again.
thread_local bool fatalSignalProcessed = false;

static void printCurrentQueryId() {
const ThreadDebugInfo* info = GetThreadDebugInfo();
if (info == nullptr) {
Expand All @@ -38,6 +44,11 @@ static void printCurrentQueryId() {
write(STDERR_FILENO, info->taskId_.c_str(), info->taskId_.length());
}
write(STDERR_FILENO, "\n", 1);

if (!fatalSignalProcessed && info->callback_) {
fatalSignalProcessed = true;
info->callback_();
}
}

const ThreadDebugInfo* GetThreadDebugInfo() {
Expand Down
3 changes: 3 additions & 0 deletions velox/common/process/ThreadDebugInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <functional>
#include <string>

namespace facebook::velox::process {
Expand All @@ -25,6 +26,8 @@ namespace facebook::velox::process {
struct ThreadDebugInfo {
std::string queryId_;
std::string taskId_;
// Callback to invoke when the debug info is to be dumped. Can be empty.
std::function<void()> callback_;
};

// A RAII class to store thread local debug information.
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ if(VELOX_ENABLE_S3)
target_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp)

target_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS})
target_link_libraries(velox_s3fs Folly::folly ${AWSSDK_LIBRARIES})
target_link_libraries(velox_s3fs Folly::folly ${AWSSDK_LIBRARIES} xsimd)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h"
#include "velox/core/Config.h"
#include "velox/dwio/common/FileSink.h"
#endif

#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
Expand Down Expand Up @@ -52,12 +53,36 @@ fileSystemGenerator() {
};
return filesystemGenerator;
}

std::function<std::unique_ptr<velox::dwio::common::FileSink>(
const std::string&,
const velox::dwio::common::FileSink::Options& options)>
s3WriteFileSinkGenerator() {
static auto s3WriteFileSink =
[](const std::string& fileURI,
const velox::dwio::common::FileSink::Options& options)
-> std::unique_ptr<dwio::common::WriteFileSink> {
if (isS3File(fileURI)) {
auto fileSystem =
filesystems::getFileSystem(fileURI, options.connectorProperties);
return std::make_unique<dwio::common::WriteFileSink>(
fileSystem->openFileForWrite(fileURI),
fileURI,
options.metricLogger,
options.stats);
}
return nullptr;
};

return s3WriteFileSink;
}
#endif

void registerS3FileSystem() {
#ifdef VELOX_ENABLE_S3
if (!s3fs) {
registerFileSystem(isS3File, fileSystemGenerator());
dwio::common::FileSink::registerFactory(s3WriteFileSinkGenerator());
}
#endif
}
Expand Down
14 changes: 14 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,17 @@ target_link_libraries(
velox_core
gtest
gtest_main)

add_executable(velox_s3insert_test S3InsertTest.cpp)
add_test(velox_s3insert_test velox_s3insert_test)
target_link_libraries(
velox_s3insert_test
velox_file
velox_s3fs
velox_hive_config
velox_core
velox_exec_test_lib
velox_dwio_common_exception
velox_exec
gtest
gtest_main)
184 changes: 184 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <folly/init/Init.h>

#include "gtest/gtest.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/tests/MinioServer.h"
#include "velox/dwio/parquet/reader/ParquetReader.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::core;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;
using namespace facebook::velox::connector;
using namespace facebook::velox::connector::hive;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::test;
using namespace facebook::velox::filesystems;

class S3InsertTest : public testing::Test, public VectorTestBase {
public:
static constexpr char const* kMinioConnectionString{"127.0.0.1:7000"};
/// We use static initialization because we want a single version of the
/// Minio server running.
/// Each test must use a unique bucket to avoid concurrency issues.
static void SetUpTestSuite() {
minioServer_ = std::make_shared<MinioServer>(kMinioConnectionString);
minioServer_->start();

ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(3);
filesystems::registerS3FileSystem();
auto hiveConnector =
connector::getConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(
kHiveConnectorId,
minioServer_->hiveConfig(),
ioExecutor_.get());
connector::registerConnector(hiveConnector);
}

static void TearDownTestSuite() {
filesystems::finalizeS3FileSystem();
unregisterConnector(kHiveConnectorId);
minioServer_->stop();
minioServer_ = nullptr;
}

static std::shared_ptr<MinioServer> minioServer_;
static std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
};
std::shared_ptr<MinioServer> S3InsertTest::minioServer_ = nullptr;
std::unique_ptr<folly::IOThreadPoolExecutor> S3InsertTest::ioExecutor_ =
nullptr;

PlanNodePtr createInsertPlan(
PlanBuilder& inputPlan,
const RowTypePtr& outputRowType,
const std::string_view& outputDirectoryPath,
const std::vector<std::string>& partitionedBy = {},
const std::shared_ptr<HiveBucketProperty>& bucketProperty = {},
const connector::hive::LocationHandle::TableType& outputTableType =
connector::hive::LocationHandle::TableType::kNew,
const CommitStrategy& outputCommitStrategy = CommitStrategy::kNoCommit) {
auto insertTableHandle = std::make_shared<core::InsertTableHandle>(
kHiveConnectorId,
HiveConnectorTestBase::makeHiveInsertTableHandle(
outputRowType->names(),
outputRowType->children(),
partitionedBy,
bucketProperty,
HiveConnectorTestBase::makeLocationHandle(
outputDirectoryPath.data(), std::nullopt, outputTableType),
FileFormat::PARQUET));

auto insertPlan = inputPlan.tableWrite(
inputPlan.planNode()->outputType(),
outputRowType->names(),
nullptr,
insertTableHandle,
bucketProperty != nullptr,
outputCommitStrategy);
return insertPlan.planNode();
}

TEST_F(S3InsertTest, s3InsertTest) {
const int64_t kExpectedRows = 1'000;
const std::string_view kOutputDirectory{"s3://writedata/"};

auto rowType = ROW(
{"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), SMALLINT(), DOUBLE()});

auto input = makeRowVector(
{makeFlatVector<int64_t>(kExpectedRows, [](auto row) { return row; }),
makeFlatVector<int32_t>(kExpectedRows, [](auto row) { return row; }),
makeFlatVector<int16_t>(kExpectedRows, [](auto row) { return row; }),
makeFlatVector<double>(kExpectedRows, [](auto row) { return row; })});

minioServer_->addBucket("writedata");

// Insert into s3 with one writer.
auto plan = createInsertPlan(
PlanBuilder().values({input}), rowType, kOutputDirectory);

// Execute the write plan.
auto result = AssertQueryBuilder(plan).copyResults(pool());

// Get the fragment from the TableWriter output.
auto fragmentVector = result->childAt(TableWriteTraits::kFragmentChannel)
->asFlatVector<StringView>();

ASSERT(fragmentVector);

// The fragment contains data provided by the DataSink#finish.
// This includes the target filename, rowCount, etc...
// Extract the filename, row counts, filesize.
std::vector<std::string> writeFiles;
int64_t numRows{0};
int64_t writeFileSize{0};
for (int i = 0; i < result->size(); ++i) {
if (!fragmentVector->isNullAt(i)) {
folly::dynamic obj = folly::parseJson(fragmentVector->valueAt(i));
ASSERT_EQ(obj["targetPath"], kOutputDirectory);
ASSERT_EQ(obj["writePath"], kOutputDirectory);
numRows += obj["rowCount"].asInt();

folly::dynamic writerInfoObj = obj["fileWriteInfos"][0];
const std::string writeFileName =
writerInfoObj["writeFileName"].asString();
const std::string writeFileFullPath =
obj["writePath"].asString() + "/" + writeFileName;
writeFiles.push_back(writeFileFullPath);
writeFileSize += writerInfoObj["fileSize"].asInt();
}
}

ASSERT_EQ(numRows, kExpectedRows);
ASSERT_EQ(writeFiles.size(), 1);

// Verify that the data is written to S3 correctly by scanning the file.
auto tableScan = PlanBuilder(pool_.get()).tableScan(rowType).planNode();
CursorParameters params;
params.planNode = tableScan;
const int numSplitsPerFile = 1;
bool noMoreSplits = false;
auto addSplits = [&](exec::Task* task) {
if (!noMoreSplits) {
auto const splits = HiveConnectorTestBase::makeHiveConnectorSplits(
writeFiles[0], numSplitsPerFile, dwio::common::FileFormat::PARQUET);
for (const auto& split : splits) {
task->addSplit("0", exec::Split(split));
}
task->noMoreSplits("0");
}
noMoreSplits = true;
};
auto scanResult = readCursor(params, addSplits);
assertEqualResults(scanResult.second, {input});
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
return RUN_ALL_TESTS();
}
6 changes: 5 additions & 1 deletion velox/docs/functions/presto/conversion.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ supported conversions to/from JSON are listed in :doc:`json`.
- Y
- Y
- Y
-
- Y
-
-
-
Expand Down Expand Up @@ -482,6 +482,10 @@ Valid examples
SELECT cast(infinity() as varchar); -- 'Infinity'
SELECT cast(true as varchar); -- 'true'
SELECT cast(timestamp '1970-01-01 00:00:00' as varchar); -- '1970-01-01T00:00:00.000'
SELECT cast(cast(22.51 as DECIMAL(5, 3)) as varchar); -- '22.510'
SELECT cast(cast(-22.51 as DECIMAL(4, 2)) as varchar); -- '-22.51'
SELECT cast(cast(0.123 as DECIMAL(3, 3)) as varchar); -- '0.123'
SELECT cast(cast(1 as DECIMAL(6, 2)) as varchar); -- '1.00'

Cast to TIMESTAMP
-----------------
Expand Down
Loading
Loading