Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unnest: respect kPreferredOutputBatchRows strictly #493

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 148 additions & 22 deletions velox/exec/Unnest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/exec/Unnest.h"
#include <iostream>
#include "velox/common/base/Nulls.h"
#include "velox/vector/FlatVector.h"

Expand All @@ -29,7 +30,8 @@ Unnest::Unnest(
operatorId,
unnestNode->id(),
"Unnest"),
withOrdinality_(unnestNode->withOrdinality()) {
withOrdinality_(unnestNode->withOrdinality()),
maxOutputSize_(outputBatchRows()) {
const auto& inputType = unnestNode->sources()[0]->outputType();
const auto& unnestVariables = unnestNode->unnestVariables();
for (const auto& variable : unnestVariables) {
Expand Down Expand Up @@ -111,20 +113,61 @@ RowVectorPtr Unnest::getOutput() {
}

const auto size = input_->size();
const auto maxOutputSize = outputBatchRows();

// Limit the number of input rows to keep output batch size within
// 'maxOutputSize' if possible. Process each input row fully. Do not break
// single row's output into multiple batches.
// 'maxOutputSize_'. A row whose output would push the batch past that limit
// is processed only partially; the rest of its output is emitted in
// subsequent batches.
vector_size_t numInput = 0;
vector_size_t numElements = 0;
for (auto row = nextInputRow_; row < size; ++row) {
numElements += rawMaxSizes_[row];
vector_size_t partialProcessRowStartSize = -1;
vector_size_t firstRowEndSize = -1;
// Process first row.
if (nextInputRow_ < size) {
auto firstRow = nextInputRow_;
firstRowEndSize = rawMaxSizes_[firstRow];
vector_size_t remainingSize = firstRowEndSize - firstRowStartSize_;
if (numElements + remainingSize > maxOutputSize_) {
// This row's output is split across multiple batches.
// Emit only the range [firstRowStartSize_, firstRowEndSize), not the
// full [0, rawMaxSizes_[row]).
firstRowEndSize = firstRowStartSize_ + maxOutputSize_ - numElements;
// This batch is filled up to exactly maxOutputSize_ elements.
numElements = maxOutputSize_;
partialProcessRowStartSize = firstRowEndSize;
} else {
// No need to split this row.
numElements += remainingSize;
}
++numInput;

if (numElements >= maxOutputSize) {
}
// Middle rows are never split.
// If only one row is processed, there is no separate end row. When there is
// an end row, its start offset is always 0.
vector_size_t endRowEndSize = -1;
for (auto row = nextInputRow_ + 1; row < size; ++row) {
if (numElements >= maxOutputSize_) {
break;
}
vector_size_t remainingSize = rawMaxSizes_[row];
if (numElements + remainingSize > maxOutputSize_) {
// This is the end row.
// Its output is split across multiple batches.
// Emit only the range [0, endRowEndSize), not the full
// [0, rawMaxSizes_[row]).
endRowEndSize = maxOutputSize_ - numElements;
// This batch is filled up to exactly maxOutputSize_ elements.
numElements = maxOutputSize_;
partialProcessRowStartSize = endRowEndSize;
++numInput;
break;
} else {
// This row fits entirely; do not split it.
numElements += remainingSize;
++numInput;
}
}
// The end row is not partial; use its full size.
if (endRowEndSize == -1 && numInput > 1) {
endRowEndSize = rawMaxSizes_[nextInputRow_ + numInput - 1];
}

if (numElements == 0) {
Expand All @@ -134,9 +177,20 @@ RowVectorPtr Unnest::getOutput() {
return nullptr;
}

auto output = generateOutput(nextInputRow_, numInput, numElements);

nextInputRow_ += numInput;
std::cout << "row1 startSize " << firstRowStartSize_ << " endSize "
<< firstRowEndSize << " end row endSize " << endRowEndSize
<< std::endl;

auto output = generateOutput(
nextInputRow_, numInput, numElements, firstRowEndSize, endRowEndSize);
std::cout << "generate output" << output->toString(0, 300) << std::endl;
if (partialProcessRowStartSize != -1) {
firstRowStartSize_ = partialProcessRowStartSize;
nextInputRow_ += numInput - 1;
} else {
firstRowStartSize_ = 0;
nextInputRow_ += numInput;
}

if (nextInputRow_ >= size) {
input_ = nullptr;
Expand All @@ -150,17 +204,29 @@ void Unnest::generateRepeatedColumns(
vector_size_t start,
vector_size_t size,
vector_size_t numElements,
std::vector<VectorPtr>& outputs) {
std::vector<VectorPtr>& outputs,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize) {
// Create "indices" buffer to repeat rows as many times as there are elements
// in the array (or map) in unnestDecoded.
auto repeatedIndices = allocateIndices(numElements, pool());
auto* rawRepeatedIndices = repeatedIndices->asMutable<vector_size_t>();
vector_size_t index = 0;
for (auto row = start; row < start + size; ++row) {
if (size > 0) {
for (auto i = firstRowStartSize_; i < firstRowEndSize; i++) {
rawRepeatedIndices[index++] = start;
}
}
for (auto row = start + 1; row < start + size - 1; ++row) {
for (auto i = 0; i < rawMaxSizes_[row]; i++) {
rawRepeatedIndices[index++] = row;
}
}
if (size > 1) {
for (auto i = 0; i < endRowEndSize; i++) {
rawRepeatedIndices[index++] = start + size - 1;
}
}

// Wrap "replicated" columns in a dictionary using 'repeatedIndices'.
for (const auto& projection : identityProjections_) {
Expand All @@ -176,7 +242,9 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel(
column_index_t channel,
vector_size_t start,
vector_size_t size,
vector_size_t numElements) {
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize) {
BufferPtr elementIndices = allocateIndices(numElements, pool());
auto* rawElementIndices = elementIndices->asMutable<vector_size_t>();

Expand All @@ -191,7 +259,45 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel(
// Make dictionary index for elements column since they may be out of order.
vector_size_t index = 0;
bool identityMapping = true;
for (auto row = start; row < start + size; ++row) {
if (firstRowStartSize_ != 0) {
identityMapping = false;
}

auto firstEndRowGenerator =
[&](vector_size_t row, vector_size_t startSize, vector_size_t endSize) {
if (!currentDecoded.isNullAt(row)) {
const auto offset = currentOffsets[currentIndices[row]];
const auto unnestSize = currentSizes[currentIndices[row]];
if (index != offset || endSize != rawMaxSizes_[row] ||
unnestSize < endSize) {
identityMapping = false;
}
auto currentUnnestSize = std::min(endSize, unnestSize);
std::cout << "for channel " << channel << " numElements "
<< numElements << " for row " << row << "startSize "
<< startSize << "endSize " << endSize << " offset "
<< offset << " unnestSize " << unnestSize << std::endl;
for (auto i = startSize; i < currentUnnestSize; i++) {
rawElementIndices[index++] = offset + i;
}

for (auto i = currentUnnestSize; i < endSize; ++i) {
bits::setNull(rawNulls, index++, true);
}
} else if (endSize - startSize > 0) {
identityMapping = false;

for (auto i = startSize; i < endSize; ++i) {
bits::setNull(rawNulls, index++, true);
}
}
};

if (size > 0) {
firstEndRowGenerator(start, firstRowStartSize_, firstRowEndSize);
}

for (auto row = start + 1; row < start + size - 1; ++row) {
const auto maxSize = rawMaxSizes_[row];

if (!currentDecoded.isNullAt(row)) {
Expand All @@ -217,40 +323,59 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel(
}
}
}

if (size > 1) {
firstEndRowGenerator(start + size - 1, 0, endRowEndSize);
}

return {elementIndices, nulls, identityMapping};
}

VectorPtr Unnest::generateOrdinalityVector(
vector_size_t start,
vector_size_t size,
vector_size_t numElements) {
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize) {
auto ordinalityVector =
BaseVector::create<FlatVector<int64_t>>(BIGINT(), numElements, pool());

// Set the ordinality at each result row to be the index of the element in
// the original array (or map) plus one.
auto* rawOrdinality = ordinalityVector->mutableRawValues();
for (auto row = start; row < start + size; ++row) {
if (size > 0) {
const auto maxSize = firstRowEndSize - firstRowStartSize_;
std::iota(rawOrdinality, rawOrdinality + maxSize, firstRowStartSize_ + 1);
rawOrdinality += maxSize;
}
for (auto row = start + 1; row < start + size - 1; ++row) {
const auto maxSize = rawMaxSizes_[row];
std::iota(rawOrdinality, rawOrdinality + maxSize, 1);
rawOrdinality += maxSize;
}
if (size > 1) {
std::iota(rawOrdinality, rawOrdinality + endRowEndSize, 1);
rawOrdinality += endRowEndSize;
}

return ordinalityVector;
}

RowVectorPtr Unnest::generateOutput(
vector_size_t start,
vector_size_t size,
vector_size_t numElements) {
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize) {
std::vector<VectorPtr> outputs(outputType_->size());
generateRepeatedColumns(start, size, numElements, outputs);
generateRepeatedColumns(
start, size, numElements, outputs, firstRowEndSize, endRowEndSize);

// Create unnest columns.
vector_size_t outputsIndex = identityProjections_.size();
for (auto channel = 0; channel < unnestChannels_.size(); ++channel) {
const auto unnestChannelEncoding =
generateEncodingForChannel(channel, start, size, numElements);
const auto unnestChannelEncoding = generateEncodingForChannel(
channel, start, size, numElements, firstRowEndSize, endRowEndSize);

auto& currentDecoded = unnestDecoded_[channel];
if (currentDecoded.base()->typeKind() == TypeKind::ARRAY) {
Expand All @@ -272,7 +397,8 @@ RowVectorPtr Unnest::generateOutput(

if (withOrdinality_) {
// Ordinality column is always at the end.
outputs.back() = generateOrdinalityVector(start, size, numElements);
outputs.back() = generateOrdinalityVector(
start, size, numElements, firstRowEndSize, endRowEndSize);
}

return std::make_shared<RowVector>(
Expand Down
20 changes: 16 additions & 4 deletions velox/exec/Unnest.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@ class Unnest : public Operator {

private:
// Generate output for 'size' input rows starting from 'start' input row.
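// The start offset within the first input row is read from the class member
// 'firstRowStartSize_'.
//
// @param start First input row to include in the output.
// @param size Number of input rows to include in the output.
// @param outputSize Pre-computed number of output rows.
// @param firstRowEndSize Exclusive end offset of the elements to emit from
// the first input row.
// @param endRowEndSize Number of elements to emit from the last input row;
// used only when 'size' is greater than 1.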
RowVectorPtr generateOutput(
vector_size_t start,
vector_size_t size,
vector_size_t outputSize);
vector_size_t outputSize,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize);

// Invoked by generateOutput function above to generate the repeated output
// columns.
void generateRepeatedColumns(
vector_size_t start,
vector_size_t size,
vector_size_t numElements,
std::vector<VectorPtr>& outputs);
std::vector<VectorPtr>& outputs,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize);

struct UnnestChannelEncoding {
BufferPtr indices;
Expand All @@ -71,22 +76,29 @@ class Unnest : public Operator {
column_index_t channel,
vector_size_t start,
vector_size_t size,
vector_size_t numElements);
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize);

// Invoked by generateOutput for the ordinality column.
VectorPtr generateOrdinalityVector(
vector_size_t start,
vector_size_t size,
vector_size_t numElements);
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize);

const bool withOrdinality_;
std::vector<column_index_t> unnestChannels_;

std::vector<DecodedVector> unnestDecoded_;

const uint32_t maxOutputSize_;
BufferPtr maxSizes_;
vector_size_t* rawMaxSizes_{nullptr};

vector_size_t firstRowStartSize_ = 0;

std::vector<const vector_size_t*> rawSizes_;
std::vector<const vector_size_t*> rawOffsets_;
std::vector<const vector_size_t*> rawIndices_;
Expand Down
Loading
Loading