Skip to content

Commit

Permalink
Add standard columns to kusto tables (#8984)
Browse files Browse the repository at this point in the history
* Add denormalized workflow name to GitHub kusto tables
* Prefix the table module deployment names with the parent deployment name
* Process periodic workers every day
  • Loading branch information
hallipr authored Sep 18, 2024
1 parent 8957dfb commit f2d1dea
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,22 @@ public string GetRunBlobName(WorkflowRun run)

private async Task ProcessWorkflowRunAsync(WorkflowRun run)
{
Workflow workflow = await GetWorkflowAsync(run);
List<WorkflowJob> jobs = await GetJobsAsync(run);

await UploadJobsBlobAsync(run, jobs);
await UploadStepsBlobAsync(run, jobs);
await UploadLogsBlobAsync(run, jobs);
await UploadJobsBlobAsync(workflow, run, jobs);
await UploadStepsBlobAsync(workflow, run, jobs);
await UploadLogsBlobAsync(workflow, run, jobs);

// We upload the run blob last. This allows us to use the existence of the blob as a signal that run processing is complete.
await UploadRunBlobAsync(run);
await UploadRunBlobAsync(workflow, run);
}

private async Task UploadRunBlobAsync(WorkflowRun run)
private async Task UploadRunBlobAsync(Workflow workflow, WorkflowRun run)
{
string repository = run.Repository.FullName;
long workflowId = workflow.Id;
string workflowName = workflow.Name;
long runId = run.Id;
string runName = run.Name;
long attempt = run.RunAttempt;
Expand All @@ -135,23 +138,24 @@ private async Task UploadRunBlobAsync(WorkflowRun run)

if (await blobClient.ExistsAsync())
{
this.logger.LogInformation("Skipping existing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogInformation("Skipping existing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
return;
}

this.logger.LogInformation("Processing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogInformation("Processing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);

string content = JsonConvert.SerializeObject(
new
{
Repository = repository,
Workflow = runName,
run.WorkflowId,
RunId = run.Id,
WorkflowId = workflowId,
WorkflowName = workflowName,
RunId = runId,
RunName = runName,
run.RunNumber,
run.HeadBranch,
run.HeadSha,
run.RunAttempt,
RunAttempt = attempt,
run.Event,
Status = run.Status.StringValue,
Conclusion = run.Conclusion?.StringValue,
Expand All @@ -173,18 +177,20 @@ private async Task UploadRunBlobAsync(WorkflowRun run)
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
{
this.logger.LogInformation("Ignoring existing blob exception for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogInformation("Ignoring existing blob exception for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error processing repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogError(ex, "Error processing repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
throw;
}
}

private async Task UploadJobsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)
private async Task UploadJobsBlobAsync(Workflow workflow, WorkflowRun run, List<WorkflowJob> jobs)
{
string repository = run.Repository.FullName;
long workflowId = workflow.Id;
string workflowName = workflow.Name;
long runId = run.Id;
string runName = run.Name;
long attempt = run.RunAttempt;
Expand All @@ -196,11 +202,11 @@ private async Task UploadJobsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)

if (await blobClient.ExistsAsync())
{
this.logger.LogInformation("Skipping existing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogInformation("Skipping existing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
return;
}

this.logger.LogInformation("Processing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogInformation("Processing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);

StringBuilder builder = new();

Expand All @@ -210,11 +216,13 @@ private async Task UploadJobsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)
new
{
Repository = repository,
Workflow = runName,
run.WorkflowId,
RunId = run.Id,
WorkflowId = workflowId,
WorkflowName = workflowName,
RunId = runId,
RunName = runName,
RunAttempt = attempt,
JobId = job.Id,
job.Name,
JobName = job.Name,
Status = job.Status.StringValue,
Conclusion = job.Conclusion?.StringValue,
CreatedAt = job.CreatedAt?.ToString(TimeFormat),
Expand All @@ -236,18 +244,20 @@ private async Task UploadJobsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
{
this.logger.LogInformation("Ignoring existing blob exception for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogInformation("Ignoring existing blob exception for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error processing repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogError(ex, "Error processing repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
throw;
}
}

private async Task UploadStepsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)
private async Task UploadStepsBlobAsync(Workflow workflow, WorkflowRun run, List<WorkflowJob> jobs)
{
string repository = run.Repository.FullName;
long workflowId = workflow.Id;
string workflowName = workflow.Name;
long runId = run.Id;
string runName = run.Name;
long attempt = run.RunAttempt;
Expand All @@ -262,11 +272,11 @@ private async Task UploadStepsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)

if (await blobClient.ExistsAsync())
{
this.logger.LogInformation("Skipping existing workflow steps for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogInformation("Skipping existing workflow steps for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
return;
}

this.logger.LogInformation("Processing workflow steps for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, runName, runId, attempt);
this.logger.LogInformation("Processing workflow steps for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);

StringBuilder builder = new();

Expand All @@ -278,13 +288,15 @@ private async Task UploadStepsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)
new
{
Repository = repository,
Workflow = runName,
Job = job.Name,
run.WorkflowId,
RunId = run.Id,
WorkflowId = workflowId,
WorkflowName = workflowName,
RunId = runId,
RunName = runName,
RunAttempt = attempt,
JobId = job.Id,
JobName = job.Name,
StepNumber = step.Number,
step.Name,
StepName = step.Name,
Status = step.Status.StringValue,
Conclusion = step.Conclusion?.StringValue,
StartedAt = step.StartedAt?.ToString(TimeFormat),
Expand All @@ -307,9 +319,11 @@ private async Task UploadStepsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)
}
}

private async Task UploadLogsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)
private async Task UploadLogsBlobAsync(Workflow workflow, WorkflowRun run, List<WorkflowJob> jobs)
{
string repository = run.Repository.FullName;
long workflowId = workflow.Id;
string workflowName = workflow.Name;
long runId = run.Id;
string runName = run.Name;
long attempt = run.RunAttempt;
Expand Down Expand Up @@ -390,9 +404,10 @@ private async Task UploadLogsBlobAsync(WorkflowRun run, List<WorkflowJob> jobs)
await blobWriter.WriteLineAsync(JsonConvert.SerializeObject(new
{
Repository = repository,
WorkflowName = runName,
run.WorkflowId,
RunId = run.Id,
WorkflowId = workflowId,
WorkflowName = workflowName,
RunId = runId,
RunAttempt = attempt,
JobId = job.Id,
StepNumber = logLine.Step,
LineNumber = logLine.Number,
Expand Down Expand Up @@ -493,6 +508,12 @@ private async Task<WorkflowRun> GetWorkflowRunAsync(string owner, string reposit
return workflowRun;
}

/// <summary>
/// Fetches the workflow definition that produced the given workflow run.
/// </summary>
/// <param name="run">The workflow run whose repository and workflow id identify the workflow to fetch.</param>
/// <returns>The workflow returned by the GitHub Actions workflows client.</returns>
private async Task<Workflow> GetWorkflowAsync(WorkflowRun run)
{
    // The "owner" segment of the GitHub REST route is the account login. Owner.Name is the
    // account's display name, which may be null or differ from the login, so it must not be
    // used to address the repository.
    string owner = run.Repository.Owner.Login;
    string repository = run.Repository.Name;

    return await this.client.Actions.Workflows.Get(owner, repository, run.WorkflowId);
}

private async Task<List<WorkflowJob>> GetJobsAsync(WorkflowRun run)
{
List<WorkflowJob> jobs = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@

"BuildDefinitionWorker": {
"Enabled": true,
"LoopPeriod": "00:05:00",
"CooldownPeriod": "7.00:00:00",
"LoopPeriod": "01:00:00",
"CooldownPeriod": "1.00:00:00",
"LockName": "BuildDefinitionWorker"
},

"MissingPipelineRunsWorker": {
"Enabled": true,
"LoopPeriod": "01:00:00",
"CooldownPeriod": "7.00:00:00",
"CooldownPeriod": "1.00:00:00",
"LookbackPeriod": "14.00:00:00",
"LockName": "MissingPipelineRunsWorker"
},

"MissingGitHubActionsWorker": {
"Enabled": true,
"LoopPeriod": "01:00:00",
"CooldownPeriod": "7.00:00:00",
"CooldownPeriod": "1.00:00:00",
"LookbackPeriod": "14.00:00:00",
"LockName": "MissingGitHubActionsWorker"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ resource gitHubKustoEventHubsAssignment 'Microsoft.Authorization/roleAssignments
// namespace, we need an event hub per table, so we split our tables across two namespaces.
// https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas
module devOpsTables 'tableResources.bicep' = {
name: 'devOpsTables'
name: '${deployment().name}-devOpsTables'
scope: resourceGroup()
dependsOn:[ kustoScriptInvocation ]
params: {
Expand Down Expand Up @@ -300,7 +300,7 @@ module devOpsTables 'tableResources.bicep' = {
}

module gitHubTables 'tableResources.bicep' = {
name: 'gitHubTables'
name: '${deployment().name}-gitHubTables'
scope: resourceGroup()
dependsOn:[ kustoScriptInvocation ]
params: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
.create-merge table GitHubActionsJob (
Repository: string,
Workflow: string,
WorkflowId: long,
WorkflowName: string,
RunId: long,
RunName: string,
RunAttempt: long,
JobId: long,
Name: string,
JobName: string,
Status: string,
Conclusion: string,
CreatedAt: datetime,
Expand All @@ -24,11 +26,13 @@

.create-or-alter table GitHubActionsJob ingestion json mapping 'GitHubActionsJob_mapping' ```[
{ "column": "Repository", "path": "$['repository']" },
{ "column": "Workflow", "path": "$['workflow']" },
{ "column": "WorkflowId", "path": "$['workflowId']" },
{ "column": "WorkflowName", "path": "$['workflowName']" },
{ "column": "RunId", "path": "$['runId']" },
{ "column": "RunName", "path": "$['runName']" },
{ "column": "RunAttempt", "path": "$['runAttempt']" },
{ "column": "JobId", "path": "$['jobId']" },
{ "column": "Name", "path": "$['name']" },
{ "column": "JobName", "path": "$['jobName']" },
{ "column": "Status", "path": "$['status']" },
{ "column": "Conclusion", "path": "$['conclusion']" },
{ "column": "CreatedAt", "path": "$['createdAt']" },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
.create-merge table GitHubActionsLogLine (
Repository: string,
WorkflowName: string,
WorkflowId: long,
WorkflowName: string,
RunId: long,
RunAttempt: long,
JobId: long,
StepNumber: int,
LineNumber: int,
Expand All @@ -14,9 +15,10 @@

.create-or-alter table GitHubActionsLogLine ingestion json mapping 'GitHubActionsLogLine_mapping' ```[
{ "column": "Repository", "path": "$['repository']" },
{ "column": "WorkflowName", "path": "$['workflowName']" },
{ "column": "WorkflowId", "path": "$['workflowId']" },
{ "column": "WorkflowName", "path": "$['workflowName']" },
{ "column": "RunId", "path": "$['runId']" },
{ "column": "RunAttempt", "path": "$['runAttempt']" },
{ "column": "JobId", "path": "$['jobId']" },
{ "column": "StepNumber", "path": "$['stepNumber']" },
{ "column": "LineNumber", "path": "$['lineNumber']" },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
.create-merge table GitHubActionsRun (
Repository: string,
Workflow: string,
WorkflowId: long,
WorkflowName: string,
RunId: long,
RunName: string,
RunNumber: long,
HeadBranch: string,
HeadSha: string,
Expand All @@ -26,9 +27,10 @@

.create-or-alter table GitHubActionsRun ingestion json mapping 'GitHubActionsRun_mapping' ```[
{ "column": "Repository", "path": "$['repository']" },
{ "column": "Workflow", "path": "$['workflow']" },
{ "column": "WorkflowId", "path": "$['workflowId']" },
{ "column": "WorkflowName", "path": "$['workflowName']" },
{ "column": "RunId", "path": "$['runId']" },
{ "column": "RunName", "path": "$['runName']" },
{ "column": "RunNumber", "path": "$['runNumber']" },
{ "column": "HeadBranch", "path": "$['headBranch']" },
{ "column": "HeadSha", "path": "$['headSha']" },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
.create-merge table GitHubActionsStep (
Repository: string,
Workflow: string,
Job: string,
WorkflowId: long,
WorkflowName: string,
RunId: long,
RunName: string,
RunAttempt: long,
JobId: long,
JobName: string,
StepNumber: int,
Name: string,
StepName: string,
Status: string,
Conclusion: string,
StartedAt: datetime,
Expand All @@ -16,13 +18,15 @@

.create-or-alter table GitHubActionsStep ingestion json mapping 'GitHubActionsStep_mapping' ```[
{ "column": "Repository", "path": "$['repository']" },
{ "column": "Workflow", "path": "$['workflow']" },
{ "column": "Job", "path": "$['job']" },
{ "column": "WorkflowId", "path": "$['workflowId']" },
{ "column": "WorkflowName", "path": "$['workflowName']" },
{ "column": "RunId", "path": "$['runId']" },
{ "column": "RunName", "path": "$['runName']" },
{ "column": "RunAttempt", "path": "$['runAttempt']" },
{ "column": "JobId", "path": "$['jobId']" },
{ "column": "JobName", "path": "$['jobName']" },
{ "column": "StepNumber", "path": "$['stepNumber']" },
{ "column": "Name", "path": "$['name']" },
{ "column": "StepName", "path": "$['stepName']" },
{ "column": "Status", "path": "$['status']" },
{ "column": "Conclusion", "path": "$['conclusion']" },
{ "column": "StartedAt", "path": "$['startedAt']" },
Expand Down

0 comments on commit f2d1dea

Please sign in to comment.