Skip to content

Commit

Permalink
Merge branch 'trinodb:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
kavyabala23 authored Oct 31, 2024
2 parents a83fca4 + 8818f05 commit 0aa8156
Show file tree
Hide file tree
Showing 181 changed files with 1,868 additions and 1,398 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/upload-test-results.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ on:
types:
- completed

permissions:
actions: read

defaults:
run:
shell: bash --noprofile --norc -euo pipefail {0}
Expand Down Expand Up @@ -52,6 +55,7 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ vars.TEST_RESULTS_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.TEST_RESULTS_AWS_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: us-east-2
BRANCH_NAME: ${{ github.event.workflow_run.head_branch }}
if: env.S3_BUCKET != '' && env.AWS_ACCESS_KEY_ID != '' && env.AWS_SECRET_ACCESS_KEY != ''
shell: bash --noprofile --norc -euo pipefail {0}
run: |
Expand Down Expand Up @@ -92,9 +96,10 @@ jobs:
continue;
fi
jq -c \
--argjson addObj '{"branch":"${{ github.event.workflow_run.head_branch }}","git_sha":"${{ github.event.workflow_run.head_sha }}","workflow_name":"${{ github.event.workflow.name }}","workflow_run":"${{ github.event.workflow_run.id }}","workflow_conclusion":"${{ github.event.workflow_run.conclusion }}","workflow_job":"","workflow_run_attempt":"${{ github.event.workflow_run.run_attempt }}","timestamp":""}' \
--argjson addObj '{"branch":"","git_sha":"${{ github.event.workflow_run.head_sha }}","workflow_name":"${{ github.event.workflow.name }}","workflow_run":"${{ github.event.workflow_run.id }}","workflow_conclusion":"${{ github.event.workflow_run.conclusion }}","workflow_job":"","workflow_run_attempt":"${{ github.event.workflow_run.run_attempt }}","timestamp":""}' \
--arg timestamp "$(date -u '+%F %T.%3NZ')" \
'. + $addObj | .timestamp=$timestamp' "$filename" | gzip -c > "$artifact_id"
--arg branch "$BRANCH_NAME" \
'. + $addObj | .branch=$branch | .timestamp=$timestamp' "$filename" | gzip -c > "$artifact_id"
aws s3 cp --no-progress "$artifact_id" "s3://$S3_BUCKET/tests/results/type=$(basename "$filename" .ndjson)/repo=$(basename "${{ github.repository }}")/date_created=$(date -u '+%Y-%m-%d')/$artifact_id"
done
2 changes: 1 addition & 1 deletion client/trino-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>464-SNAPSHOT</version>
<version>465-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import io.trino.client.Column;
import io.trino.client.JsonCodec;
import io.trino.client.QueryResults;
import io.trino.client.RawQueryData;
import io.trino.client.StatementStats;
import io.trino.client.TypedQueryData;
import io.trino.client.uri.PropertyName;
import io.trino.client.uri.TrinoUri;
import okhttp3.mockwebserver.MockResponse;
Expand Down Expand Up @@ -136,7 +136,7 @@ static String createResults(MockWebServer server)
null,
null,
ImmutableList.of(new Column("_col0", BIGINT, new ClientTypeSignature(BIGINT))),
RawQueryData.of(ImmutableList.of(ImmutableList.of(123))),
TypedQueryData.of(ImmutableList.of(ImmutableList.of(123))),
StatementStats.builder()
.setState("FINISHED")
.setProgressPercentage(OptionalDouble.empty())
Expand Down
2 changes: 1 addition & 1 deletion client/trino-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>464-SNAPSHOT</version>
<version>465-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public ResultRows toRows(List<Column> columns, QueryData data)
}

verify(columns != null && !columns.isEmpty(), "Columns must be set when decoding data");
if (data instanceof RawQueryData) {
RawQueryData rawData = (RawQueryData) data;
if (data instanceof TypedQueryData) {
TypedQueryData rawData = (TypedQueryData) data;
if (rawData.isNull()) {
return NULL_ROWS; // for backward compatibility instead of null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
* Class represents QueryData of already typed values
*
*/
public class RawQueryData
public class TypedQueryData
implements QueryData
{
private final Iterable<List<Object>> iterable;

private RawQueryData(Iterable<List<Object>> values)
private TypedQueryData(Iterable<List<Object>> values)
{
this.iterable = values == null ? null : unmodifiableIterable(values);
}
Expand All @@ -42,7 +42,7 @@ public Iterable<List<Object>> getIterable()

public static QueryData of(@Nullable Iterable<List<Object>> values)
{
return new RawQueryData(values);
return new TypedQueryData(values);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private String newQueryResults(MockWebServer server)
Stream.of(new Column("id", INTEGER, new ClientTypeSignature("integer")),
new Column("name", VARCHAR, new ClientTypeSignature("varchar")))
.collect(toList()),
RawQueryData.of(IntStream.range(0, numRecords)
TypedQueryData.of(IntStream.range(0, numRecords)
.mapToObj(index -> Stream.of((Object) index, "a").collect(toList()))
.collect(toList())),
StatementStats.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private String newQueryResults(String state)
Stream.of(new Column("id", INTEGER, new ClientTypeSignature("integer")),
new Column("name", VARCHAR, new ClientTypeSignature("varchar")))
.collect(toList()),
RawQueryData.of(IntStream.range(0, numRecords)
TypedQueryData.of(IntStream.range(0, numRecords)
.mapToObj(index -> Stream.of((Object) index, "a").collect(toList()))
.collect(toList())),
new StatementStats(state, state.equals("QUEUED"), true, OptionalDouble.of(0), OptionalDouble.of(0), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
Expand Down
2 changes: 1 addition & 1 deletion client/trino-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>464-SNAPSHOT</version>
<version>465-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import io.trino.client.ClientTypeSignature;
import io.trino.client.Column;
import io.trino.client.QueryResults;
import io.trino.client.RawQueryData;
import io.trino.client.StatementStats;
import io.trino.client.TypedQueryData;
import io.trino.server.protocol.spooling.QueryDataJacksonModule;
import io.trino.spi.type.StandardTypes;
import okhttp3.mockwebserver.MockResponse;
Expand Down Expand Up @@ -98,7 +98,7 @@ private String newQueryResults(Integer partialCancelId, Integer nextUriId, List<
partialCancelId == null ? null : server.url(format("/v1/statement/partialCancel/%s.%s", queryId, partialCancelId)).uri(),
nextUriId == null ? null : server.url(format("/v1/statement/%s/%s", queryId, nextUriId)).uri(),
responseColumns,
RawQueryData.of(data),
TypedQueryData.of(data),
new StatementStats(state, state.equals("QUEUED"), true, OptionalDouble.of(0), OptionalDouble.of(0), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
null,
ImmutableList.of(),
Expand Down
2 changes: 1 addition & 1 deletion core/trino-grammar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>464-SNAPSHOT</version>
<version>465-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion core/trino-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>464-SNAPSHOT</version>
<version>465-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import io.opentelemetry.api.trace.Tracer;
import io.trino.client.QueryError;
import io.trino.client.QueryResults;
import io.trino.client.RawQueryData;
import io.trino.client.StatementStats;
import io.trino.client.TypedQueryData;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.QueryState;
Expand Down Expand Up @@ -281,7 +281,7 @@ private static QueryResults createQueryResults(
null,
nextUri,
null,
RawQueryData.of(null),
TypedQueryData.of(null),
StatementStats.builder()
.setState(state.toString())
.setQueued(state == QUEUED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private SqlQueryExecution(
EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory,
TaskDescriptorStorage taskDescriptorStorage)
{
try (SetThreadName _ = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + stateMachine.getQueryId())) {
this.slug = requireNonNull(slug, "slug is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
Expand Down Expand Up @@ -396,7 +396,7 @@ public BasicQueryInfo getBasicQueryInfo()
@Override
public void start()
{
try (SetThreadName _ = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + stateMachine.getQueryId())) {
try {
if (!stateMachine.transitionToPlanning()) {
// query already started or finished
Expand Down Expand Up @@ -456,7 +456,7 @@ public void start()
@Override
public void addStateChangeListener(StateChangeListener<QueryState> stateChangeListener)
{
try (SetThreadName _ = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + stateMachine.getQueryId())) {
stateMachine.addStateChangeListener(stateChangeListener);
}
}
Expand Down Expand Up @@ -607,7 +607,7 @@ public void cancelStage(StageId stageId)
{
requireNonNull(stageId, "stageId is null");

try (SetThreadName _ = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + stateMachine.getQueryId())) {
QueryScheduler scheduler = queryScheduler.get();
if (scheduler != null) {
scheduler.cancelStage(stageId);
Expand All @@ -620,7 +620,7 @@ public void failTask(TaskId taskId, Exception reason)
{
requireNonNull(taskId, "stageId is null");

try (SetThreadName _ = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + stateMachine.getQueryId())) {
QueryScheduler scheduler = queryScheduler.get();
if (scheduler != null) {
scheduler.failTask(taskId, reason);
Expand Down Expand Up @@ -693,7 +693,7 @@ public QueryId getQueryId()
@Override
public QueryInfo getQueryInfo()
{
try (SetThreadName _ = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + stateMachine.getQueryId())) {
// acquire reference to scheduler before checking finalQueryInfo, because
// state change listener sets finalQueryInfo and then clears scheduler when
// the query finishes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public void createQuery(QueryExecution queryExecution)
queryTracker.expireQuery(queryExecution.getQueryId());
});

try (SetThreadName _ = new SetThreadName("Query-%s", queryExecution.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + queryExecution.getQueryId())) {
try (var ignoredStartScope = scopedSpan(tracer.spanBuilder("query-start")
.setParent(Context.current().with(queryExecution.getSession().getQuerySpan()))
.startSpan())) {
Expand Down
4 changes: 2 additions & 2 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,14 @@ public void recordHeartbeat()

public TaskInfo getTaskInfo()
{
try (SetThreadName _ = new SetThreadName("Task-%s", taskId)) {
try (SetThreadName _ = new SetThreadName("Task-" + taskId)) {
return createTaskInfo(taskHolderReference.get());
}
}

public TaskStatus getTaskStatus()
{
try (SetThreadName _ = new SetThreadName("Task-%s", taskId)) {
try (SetThreadName _ = new SetThreadName("Task-" + taskId)) {
return createTaskStatus(taskHolderReference.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public SqlTaskExecution(
this.splitMonitor = requireNonNull(splitMonitor, "splitMonitor is null");
this.driverAndTaskTerminationTracker = new DriverAndTaskTerminationTracker(taskStateMachine);

try (SetThreadName _ = new SetThreadName("Task-%s", taskId)) {
try (SetThreadName _ = new SetThreadName("Task-" + taskId)) {
List<DriverFactory> driverFactories = localExecutionPlan.getDriverFactories();
// index driver factories
Set<PlanNodeId> partitionedSources = ImmutableSet.copyOf(localExecutionPlan.getPartitionedSourceOrder());
Expand Down Expand Up @@ -195,7 +195,7 @@ public SqlTaskExecution(
// this must be synchronized to prevent a concurrent call to checkTaskCompletion() from proceeding before all task lifecycle drivers are created
public synchronized void start()
{
try (SetThreadName _ = new SetThreadName("Task-%s", getTaskId())) {
try (SetThreadName _ = new SetThreadName("Task-" + getTaskId())) {
// Signal immediate termination complete if task termination has started
if (taskStateMachine.getState().isTerminating()) {
taskStateMachine.terminationComplete();
Expand Down Expand Up @@ -263,7 +263,7 @@ public void addSplitAssignments(List<SplitAssignment> splitAssignments)
return;
}

try (SetThreadName _ = new SetThreadName("Task-%s", taskId)) {
try (SetThreadName _ = new SetThreadName("Task-" + taskId)) {
// update our record of split assignments and schedule drivers for new partitioned splits
Set<PlanNodeId> updatedUnpartitionedSources = updateSplitAssignments(splitAssignments);
for (PlanNodeId planNodeId : updatedUnpartitionedSources) {
Expand Down Expand Up @@ -415,7 +415,7 @@ private synchronized void enqueueDriverSplitRunner(boolean forceRunSplit, List<D
@Override
public void onSuccess(Object result)
{
try (SetThreadName _ = new SetThreadName("Task-%s", taskId)) {
try (SetThreadName _ = new SetThreadName("Task-" + taskId)) {
// record driver is finished
if (remainingSplitRunners.decrementAndGet() == 0) {
checkTaskCompletion();
Expand All @@ -428,7 +428,7 @@ public void onSuccess(Object result)
@Override
public void onFailure(Throwable cause)
{
try (SetThreadName _ = new SetThreadName("Task-%s", taskId)) {
try (SetThreadName _ = new SetThreadName("Task-" + taskId)) {
taskStateMachine.failed(cause);

// record driver is finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public SqlTaskExecution create(
cpuTimerEnabled);

LocalExecutionPlan localExecutionPlan;
try (SetThreadName _ = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
try (SetThreadName _ = new SetThreadName("Task-" + taskStateMachine.getTaskId())) {
try (var ignoredSpan = scopedSpan(tracer, "local-planner")) {
localExecutionPlan = planner.plan(
taskContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void run(SchedulerContext context)
CpuTimer timer = new CpuTimer(Ticker.systemTicker(), false);
long previousCpuNanos = 0;
long previousScheduledNanos = 0;
try (SetThreadName _ = new SetThreadName("SplitRunner-%s-%s", taskId, splitId)) {
try (SetThreadName _ = new SetThreadName("SplitRunner-" + taskId + "-" + splitId)) {
while (!split.isFinished()) {
try (var ignored2 = processSpan.makeCurrent()) {
ListenableFuture<Void> blocked = split.processFor(SPLIT_RUN_QUANTA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public synchronized TimeSharingTaskHandle addTask(
public void removeTask(TaskHandle taskHandle)
{
TimeSharingTaskHandle handle = (TimeSharingTaskHandle) taskHandle;
try (SetThreadName _ = new SetThreadName("Task-%s", handle.getTaskId())) {
try (SetThreadName _ = new SetThreadName("Task-" + handle.getTaskId())) {
// Skip additional scheduling if the task was already destroyed
if (!doRemoveTask(handle)) {
return;
Expand Down Expand Up @@ -542,7 +542,7 @@ private class TaskRunner
@Override
public void run()
{
try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
try (SetThreadName runnerName = new SetThreadName("SplitRunner-" + runnerId)) {
while (!closed && !Thread.currentThread().isInterrupted()) {
// select next worker
PrioritizedSplitRunner split;
Expand All @@ -555,7 +555,7 @@ public void run()
}

String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
try (SetThreadName splitName = new SetThreadName(threadId)) {
try (SetThreadName _ = new SetThreadName(threadId)) {
RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread(), split.getTaskHandle().getTaskId(), split::getInfo);
runningSplitInfos.add(splitInfo);
runningSplits.add(split);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ private synchronized void scheduleRetry()
@Override
public synchronized void cancelStage(StageId stageId)
{
try (SetThreadName _ = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + queryStateMachine.getQueryId())) {
coordinatorStagesScheduler.cancelStage(stageId);
DistributedStagesScheduler distributedStagesScheduler = this.distributedStagesScheduler.get();
if (distributedStagesScheduler != null) {
Expand All @@ -444,7 +444,7 @@ public synchronized void cancelStage(StageId stageId)
@Override
public void failTask(TaskId taskId, Throwable failureCause)
{
try (SetThreadName _ = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + queryStateMachine.getQueryId())) {
stageManager.failTaskRemotely(taskId, failureCause);
}
}
Expand Down Expand Up @@ -1272,7 +1272,7 @@ public void schedule()
{
checkState(started.compareAndSet(false, true), "already started");

try (SetThreadName _ = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
try (SetThreadName _ = new SetThreadName("Query-" + queryStateMachine.getQueryId())) {
stageSchedulers.values().forEach(StageScheduler::start);
while (!executionSchedule.isFinished()) {
List<ListenableFuture<Void>> blockedStages = new ArrayList<>();
Expand Down
Loading

0 comments on commit 0aa8156

Please sign in to comment.