Skip to content

Commit

Permalink
Add OOM Protection Support for Multi-Stage Queries (apache#13598)
Browse files Browse the repository at this point in the history
Track CPU and memory usage in multi-stage queries if query resource usage tracking is enabled.
  • Loading branch information
vrajat authored Sep 3, 2024
1 parent 3f324a4 commit d008709
Show file tree
Hide file tree
Showing 29 changed files with 399 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.auth.TableAuthorizationResult;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
Expand Down Expand Up @@ -210,6 +212,8 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
}

Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId), ThreadExecutionContext.TaskType.MSE);

long executionStartTimeNs = System.nanoTime();
QueryDispatcher.QueryResult queryResults;
try {
Expand All @@ -228,6 +232,8 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
return new BrokerResponseNative(
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage));
} finally {
Tracing.getThreadAccountant().clear();
}
long executionEndTimeNs = System.nanoTime();
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, executionEndTimeNs - executionStartTimeNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,15 @@ public int getTaskId() {
return taskEntry == null ? -1 : taskEntry.getTaskId();
}

public void setThreadTaskStatus(@Nonnull String queryId, int taskId, @Nonnull Thread anchorThread) {
_currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, anchorThread));
/**
 * Returns the task type of the {@code TaskEntry} currently held in {@code _currentThreadTaskStatus}.
 *
 * @return the entry's task type, or {@link ThreadExecutionContext.TaskType#UNKNOWN} when no entry is set
 */
@Override
public ThreadExecutionContext.TaskType getTaskType() {
  TaskEntry current = _currentThreadTaskStatus.get();
  if (current != null) {
    return current.getTaskType();
  }
  // No task is registered, so there is no type to report.
  return ThreadExecutionContext.TaskType.UNKNOWN;
}

/**
 * Replaces the current task status with a new {@code TaskEntry} built from the given values.
 *
 * @param queryId ID of the query the task belongs to
 * @param taskId ID of the task within the query
 * @param taskType type of the task, stored on the new entry
 * @param anchorThread anchor thread stored on the new entry
 */
public void setThreadTaskStatus(@Nonnull String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
    @Nonnull Thread anchorThread) {
  TaskEntry entry = new TaskEntry(queryId, taskId, taskType, anchorThread);
  _currentThreadTaskStatus.set(entry);
}
}

Expand All @@ -117,15 +124,17 @@ public static class TaskEntry implements ThreadExecutionContext {
private final String _queryId;
private final int _taskId;
private final Thread _anchorThread;
private final TaskType _taskType;

/**
 * Tells whether this entry describes an anchor thread.
 *
 * @return {@code true} when the entry's task ID equals {@code CommonConstants.Accounting.ANCHOR_TASK_ID}
 */
public boolean isAnchorThread() {
return _taskId == CommonConstants.Accounting.ANCHOR_TASK_ID;
}

public TaskEntry(String queryId, int taskId, Thread anchorThread) {
/**
 * Creates an entry describing a task running on behalf of a query.
 *
 * @param queryId ID of the query the task belongs to
 * @param taskId ID of the task within the query
 * @param taskType type of the task
 * @param anchorThread anchor thread associated with the query
 */
public TaskEntry(String queryId, int taskId, TaskType taskType, Thread anchorThread) {
  // Assigned in parameter order; the fields are independent of each other.
  _queryId = queryId;
  _taskId = taskId;
  _taskType = taskType;
  _anchorThread = anchorThread;
}

public String getQueryId() {
Expand All @@ -140,6 +149,11 @@ public Thread getAnchorThread() {
return _anchorThread;
}

/**
 * Returns the task type this entry was constructed with.
 */
@Override
public TaskType getTaskType() {
return _taskType;
}

@Override
public String toString() {
return "TaskEntry{" + "_queryId='" + _queryId + '\'' + ", _taskId=" + _taskId + ", _rootThread=" + _anchorThread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,10 @@ public void createExecutionContextInner(@Nullable String queryId, int taskId, @N
// is anchor thread
assert queryId != null;
_threadLocalEntry.get().setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID,
Thread.currentThread());
ThreadExecutionContext.TaskType.UNKNOWN, Thread.currentThread());
} else {
// not anchor thread
_threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId,
_threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getTaskType(),
parentContext.getAnchorThread());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
Expand Down Expand Up @@ -152,7 +153,8 @@ public void shutDown() {
* <p>This execution entry point should be asynchronously called by the request handler and caller should not wait
* for results/exceptions.</p>
*/
public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> requestMetadata) {
public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> requestMetadata,
@Nullable ThreadExecutionContext parentContext) {
long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
Expand All @@ -163,7 +165,7 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map
// run pre-stage execution for all pipeline breakers
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, workerMetadata, stagePlan,
opChainMetadata, requestId, deadlineMs);
opChainMetadata, requestId, deadlineMs, parentContext);

// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
Expand Down Expand Up @@ -196,7 +198,7 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map
// run OpChain
OpChainExecutionContext executionContext =
new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata,
workerMetadata, pipelineBreakerResult);
workerMetadata, pipelineBreakerResult, parentContext);
OpChain opChain;
if (workerMetadata.isLeafStageWorker()) {
opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,6 +53,10 @@ public void runJob() {
TransferableBlock returnedErrorBlock = null;
Throwable thrown = null;
try {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
Tracing.ThreadAccountantOps.setupWorker(operatorChain.getId().getStageId(),
ThreadExecutionContext.TaskType.MSE, threadResourceUsageProvider,
operatorChain.getParentContext());
LOGGER.trace("({}): Executing", operatorChain);
TransferableBlock result = operatorChain.getRoot().nextBlock();
while (!result.isEndOfStreamBlock()) {
Expand All @@ -76,6 +83,7 @@ public void runJob() {
} else if (isFinished) {
operatorChain.close();
}
Tracing.ThreadAccountantOps.clear();
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ private TransferableBlock consumeGroupBy() {
TransferableBlock block = _input.nextBlock();
while (block.isDataBlock()) {
_groupByExecutor.processBlock(block);
sampleAndCheckInterruption();
block = _input.nextBlock();
}
return block;
Expand All @@ -187,6 +188,7 @@ private TransferableBlock consumeAggregation() {
TransferableBlock block = _input.nextBlock();
while (block.isDataBlock()) {
_aggregationExecutor.processBlock(block);
sampleAndCheckInterruption();
block = _input.nextBlock();
}
return block;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ private void buildBroadcastHashTable()
hashCollection.add(row);
}
_currentRowsInHashTable += container.size();
sampleAndCheckInterruption();
rightBlock = _rightInput.nextBlock();
}
if (rightBlock.isErrorBlock()) {
Expand Down Expand Up @@ -297,6 +298,7 @@ private TransferableBlock buildJoinedDataBlock() throws ProcessingException {
}
assert leftBlock.isDataBlock();
List<Object[]> rows = buildJoinedRows(leftBlock);
sampleAndCheckInterruption();
if (!rows.isEmpty()) {
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ protected TransferableBlock getNextBlock() {
}
if (block.isSuccessfulEndOfStreamBlock()) {
updateEosBlock(block, _statMap);
} else if (block.isDataBlock()) {
sampleAndCheckInterruption();
}
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected TransferableBlock getNextBlock() {
earlyTerminate();
}
}
sampleAndCheckInterruption();
return block;
} catch (QueryCancelledException e) {
LOGGER.debug("Query was cancelled! for opChain: {}", _context.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ public MultiStageOperator(OpChainExecutionContext context) {

public abstract void registerExecution(long time, int numRows);

/**
 * Samples resource usage of the operator via {@code Tracing.ThreadAccountantOps.sample()} and calls
 * {@link #earlyTerminate()} when {@code Tracing.ThreadAccountantOps.isInterrupted()} reports an interruption.
 *
 * <p>Operators should call this once for every block of data they process.
 * NOTE(review): the original comment ties this cadence to blocks holding 10000 rows or more; confirm the intended
 * sampling granularity for smaller blocks.</p>
 */
protected void sampleAndCheckInterruption() {
  Tracing.ThreadAccountantOps.sample();
  boolean interrupted = Tracing.ThreadAccountantOps.isInterrupted();
  if (interrupted) {
    earlyTerminate();
  }
}

/**
* Returns the next block from the operator. It should return non-empty data blocks followed by an end-of-stream (EOS)
* block when all the data is processed, or an error block if an error occurred. After it returns EOS or error block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +37,7 @@ public class OpChain implements AutoCloseable {
private final OpChainId _id;
private final MultiStageOperator _root;
private final Consumer<OpChainId> _finishCallback;
private final ThreadExecutionContext _parentContext;

public OpChain(OpChainExecutionContext context, MultiStageOperator root) {
this(context, root, (id) -> {
Expand All @@ -46,6 +48,7 @@ public OpChain(OpChainExecutionContext context, MultiStageOperator root, Consume
_id = context.getId();
_root = root;
_finishCallback = finishCallback;
_parentContext = context.getParentContext();
}

public OpChainId getId() {
Expand All @@ -56,6 +59,10 @@ public Operator<TransferableBlock> getRoot() {
return _root;
}

/**
 * Returns the parent thread execution context that was read from the {@code OpChainExecutionContext} when this
 * OpChain was constructed.
 * NOTE(review): the value comes from a getter annotated {@code @Nullable}, so this may return {@code null}.
 */
public ThreadExecutionContext getParentContext() {
return _parentContext;
}

@Override
public String toString() {
return "OpChain{" + _id + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public int getVirtualServerId() {
return _virtualServerId;
}

/**
 * Returns the stage ID component of this OpChain ID.
 */
public int getStageId() {
return _stageId;
}

@Override
public String toString() {
return String.format("%s_%s_%s", _requestId, _virtualServerId, _stageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ protected void constructRightBlockSet() {
_rightRowSet.add(new Record(row));
}
}
sampleAndCheckInterruption();
block = _rightChildOperator.nextBlock();
}
if (block.isErrorBlock()) {
Expand Down Expand Up @@ -153,6 +154,7 @@ protected TransferableBlock constructResultBlockSet() {
rows.add(row);
}
}
sampleAndCheckInterruption();
if (!rows.isEmpty()) {
return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ private TransferableBlock consumeInputBlocks() {
for (Object[] row : container) {
SelectionOperatorUtils.addToPriorityQueue(row, _priorityQueue, _numRowsToKeep);
}
sampleAndCheckInterruption();
}
block = _input.nextBlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private TransferableBlock computeBlocks()
_partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
}
_numRows += containerSize;
sampleAndCheckInterruption();
block = _input.nextBlock();
}
// Early termination if the block is an error block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.utils.CommonConstants;


Expand All @@ -48,12 +49,13 @@ public class OpChainExecutionContext {
@Nullable
private final PipelineBreakerResult _pipelineBreakerResult;
private final boolean _traceEnabled;
private final ThreadExecutionContext _parentContext;

private ServerPlanRequestContext _leafStageContext;

public OpChainExecutionContext(MailboxService mailboxService, long requestId, long deadlineMs,
Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata,
@Nullable PipelineBreakerResult pipelineBreakerResult) {
@Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable ThreadExecutionContext parentContext) {
_mailboxService = mailboxService;
_requestId = requestId;
_deadlineMs = deadlineMs;
Expand All @@ -65,6 +67,7 @@ public OpChainExecutionContext(MailboxService mailboxService, long requestId, lo
_id = new OpChainId(requestId, workerMetadata.getWorkerId(), stageMetadata.getStageId());
_pipelineBreakerResult = pipelineBreakerResult;
_traceEnabled = Boolean.parseBoolean(opChainMetadata.get(CommonConstants.Broker.Request.TRACE));
_parentContext = parentContext;
}

public MailboxService getMailboxService() {
Expand Down Expand Up @@ -123,4 +126,9 @@ public ServerPlanRequestContext getLeafStageContext() {
/**
 * Sets the leaf stage context held by this execution context.
 *
 * @param leafStageContext the leaf stage request context to store
 */
public void setLeafStageContext(ServerPlanRequestContext leafStageContext) {
_leafStageContext = leafStageContext;
}

/**
 * Returns the parent thread execution context passed to the constructor.
 *
 * @return the parent context, or {@code null} if none was supplied
 */
@Nullable
public ThreadExecutionContext getParentContext() {
return _parentContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,6 +51,14 @@ private PipelineBreakerExecutor() {

private static final Logger LOGGER = LoggerFactory.getLogger(PipelineBreakerExecutor.class);

/**
 * Convenience overload for callers that have no parent thread execution context. It forwards every argument to the
 * eight-argument {@code executePipelineBreakers} and passes {@code null} as the parent context.
 *
 * @param scheduler scheduler service, forwarded unchanged
 * @param mailboxService mailbox service, forwarded unchanged
 * @param workerMetadata worker metadata, forwarded unchanged
 * @param stagePlan stage plan, forwarded unchanged
 * @param opChainMetadata request metadata, forwarded unchanged
 * @param requestId request ID
 * @param deadlineMs execution deadline
 * @return whatever the eight-argument overload returns
 */
@Nullable
public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
    MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan,
    Map<String, String> opChainMetadata, long requestId, long deadlineMs) {
  final ThreadExecutionContext noParentContext = null;
  return executePipelineBreakers(scheduler, mailboxService, workerMetadata, stagePlan, opChainMetadata, requestId,
      deadlineMs, noParentContext);
}

/**
* Execute a pipeline breaker and collect the results (synchronously). Currently, pipeline breaker executor can only
* execute mailbox receive pipeline breaker.
Expand All @@ -61,14 +70,16 @@ private PipelineBreakerExecutor() {
* @param opChainMetadata request metadata, including query options
* @param requestId request ID
* @param deadlineMs execution deadline
* @param parentContext Parent thread metadata
* @return pipeline breaker result;
* - If exception occurs, exception block will be wrapped in {@link TransferableBlock} and assigned to each PB node.
* - Normal stats will be attached to each PB node and downstream execution should return with stats attached.
*/
@Nullable
public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan,
Map<String, String> opChainMetadata, long requestId, long deadlineMs) {
Map<String, String> opChainMetadata, long requestId, long deadlineMs,
@Nullable ThreadExecutionContext parentContext) {
PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(), pipelineBreakerContext);
if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
Expand All @@ -78,7 +89,7 @@ public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerServ
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
OpChainExecutionContext opChainExecutionContext =
new OpChainExecutionContext(mailboxService, requestId, deadlineMs, opChainMetadata,
stagePlan.getStageMetadata(), workerMetadata, null);
stagePlan.getStageMetadata(), workerMetadata, null, parentContext);
return execute(scheduler, pipelineBreakerContext, opChainExecutionContext);
} catch (Exception e) {
LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId,
Expand Down
Loading

0 comments on commit d008709

Please sign in to comment.