Skip to content

Commit

Permalink
Add support for missing build processing (#8424)
Browse files Browse the repository at this point in the history
* Add support for missing build processing
* Add rust to list of monitored repos
  • Loading branch information
hallipr authored Sep 5, 2024
1 parent ea8390c commit f9bbdd2
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,30 @@ public async Task UploadBuildBlobsAsync(string account, Guid projectId, int buil
await UploadBuildBlobAsync(account, build);
}

public async Task<string[]> GetBuildBlobNamesAsync(string projectName, DateTimeOffset minTime, DateTimeOffset maxTime, CancellationToken cancellationToken)
{
DateTimeOffset minDay = minTime.ToUniversalTime().Date;
DateTimeOffset maxDay = maxTime.ToUniversalTime().Date;

DateTimeOffset[] days = Enumerable.Range(0, (int)(maxDay - minDay).TotalDays + 1)
.Select(offset => minDay.AddDays(offset))
.ToArray();

List<string> blobNames = [];

foreach (DateTimeOffset day in days)
{
string blobPrefix = $"{projectName}/{day:yyyy/MM/dd}/";

await foreach (BlobItem blob in this.buildsContainerClient.GetBlobsAsync(prefix: blobPrefix, cancellationToken: cancellationToken))
{
blobNames.Add(blob.Name);
}
}

return [.. blobNames];
}

public string GetBuildBlobName(Build build)
{
long changeTime = ((DateTimeOffset)build.LastChangedDate).ToUnixTimeSeconds();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Sdk.Tools.PipelineWitness.Configuration;
using Azure.Sdk.Tools.PipelineWitness.Services;
using Azure.Sdk.Tools.PipelineWitness.Services.WorkTokens;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.TeamFoundation.Build.WebApi;
using Microsoft.VisualStudio.Services.WebApi;

namespace Azure.Sdk.Tools.PipelineWitness.AzurePipelines
{
public class MissingAzurePipelineRunsWorker : PeriodicLockingBackgroundService
{
private readonly ILogger<MissingAzurePipelineRunsWorker> logger;
private readonly AzurePipelinesProcessor runProcessor;
private readonly BuildCompleteQueue buildCompleteQueue;
private readonly IOptions<PipelineWitnessSettings> options;
private readonly EnhancedBuildHttpClient buildClient;

public MissingAzurePipelineRunsWorker(
ILogger<MissingAzurePipelineRunsWorker> logger,
AzurePipelinesProcessor runProcessor,
IAsyncLockProvider asyncLockProvider,
VssConnection vssConnection,
BuildCompleteQueue buildCompleteQueue,
IOptions<PipelineWitnessSettings> options)
: base(
logger,
asyncLockProvider,
options.Value.MissingPipelineRunsWorker)
{
this.logger = logger;
this.runProcessor = runProcessor;
this.buildCompleteQueue = buildCompleteQueue;
this.options = options;

ArgumentNullException.ThrowIfNull(vssConnection);

this.buildClient = vssConnection.GetClient<EnhancedBuildHttpClient>();
}

protected override async Task ProcessAsync(CancellationToken cancellationToken)
{
var settings = this.options.Value;

// search for builds that completed within this window
var buildMinTime = DateTimeOffset.UtcNow.Subtract(settings.MissingPipelineRunsWorker.LookbackPeriod);
var buildMaxTime = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromHours(1));

foreach (string project in settings.Projects)
{
var knownBlobs = await this.runProcessor.GetBuildBlobNamesAsync(project, buildMinTime, buildMaxTime, cancellationToken);

string continuationToken = null;
do
{
var completedBuilds = await this.buildClient.GetBuildsAsync2(
project,
minFinishTime: buildMinTime.DateTime,
maxFinishTime: buildMaxTime.DateTime,
statusFilter: BuildStatus.Completed,
continuationToken: continuationToken,
cancellationToken: cancellationToken);

var skipCount = 0;
var enqueueCount = 0;
foreach (var build in completedBuilds)
{
var blobName = this.runProcessor.GetBuildBlobName(build);

if (knownBlobs.Contains(blobName, StringComparer.InvariantCultureIgnoreCase))
{
skipCount++;
continue;
}

var queueMessage = new BuildCompleteQueueMessage
{
Account = settings.Account,
ProjectId = build.Project.Id,
BuildId = build.Id
};

this.logger.LogInformation("Enqueuing missing build {Project} {BuildId} for processing", build.Project.Name, build.Id);
await this.buildCompleteQueue.EnqueueMessageAsync(queueMessage);
enqueueCount++;
}

this.logger.LogInformation("Enqueued {EnqueueCount} missing builds, skipped {SkipCount} existing builds in project {Project}", enqueueCount, skipCount, project);

continuationToken = completedBuilds.ContinuationToken;
} while(!string.IsNullOrEmpty(continuationToken));
}
}

protected override Task ProcessExceptionAsync(Exception ex)
{
this.logger.LogError(ex, "Error processing missing builds");
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public class PeriodicProcessSettings
/// </summary>
public TimeSpan CooldownPeriod { get; set; }

/// <summary>
/// Gets or sets the amount of history to process in each iteration
/// </summary>
public TimeSpan LookbackPeriod { get; set; }

/// <summary>
/// Gets or sets the name of the distributed lock
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ public class PipelineWitnessSettings
/// </summary>
public PeriodicProcessSettings BuildDefinitionWorker { get; set; }

/// <summary>
/// Gets or sets the loops settins for the Missing Azure Pipline Runs worker
/// </summary>
public PeriodicProcessSettings MissingPipelineRunsWorker { get; set; }

/// <summary>
/// Gets or sets the loops settins for the Missing GitHub Actions worker
/// </summary>
public PeriodicProcessSettings MissingGitHubActionsWorker { get; set; }

/// <summary>
/// Gets or sets the artifact name used by the pipeline owners extraction build
/// </summary>
Expand All @@ -109,5 +119,15 @@ public class PipelineWitnessSettings
/// Gets or sets the container to use for async locks
/// </summary>
public string CosmosAsyncLockContainer { get; set; }

/// <summary>
/// Gets or sets the list of monitored GitHub repositories (Overrides GitHubRepositoriesSource)
/// </summary>
public string[] GitHubRepositories { get; set; }

/// <summary>
/// Gets or sets the url for a list of monitored GitHub repositories
/// </summary>
public string GitHubRepositoriesSource { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Net.Http;
using System.Net.Http.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Azure.Sdk.Tools.PipelineWitness.Configuration;

public class PostConfigureSettings : IPostConfigureOptions<PipelineWitnessSettings>
{
private readonly ILogger logger;

public PostConfigureSettings(ILogger<PostConfigureSettings> logger)
{
this.logger = logger;
}

public void PostConfigure(string name, PipelineWitnessSettings options)
{
if (options.GitHubRepositories == null || options.GitHubRepositories.Length == 0)
{
options.GitHubRepositories = [];

if (string.IsNullOrEmpty(options.GitHubRepositoriesSource))
{
this.logger.LogWarning("No GitHubRepositories or GitHubRepositoriesSource configured");
return;
}

try
{
this.logger.LogInformation("Replacing settings property GitHubRepositories with values from {Source}", options.GitHubRepositoriesSource);
using var client = new HttpClient();

options.GitHubRepositories = client.GetFromJsonAsync<string[]>(options.GitHubRepositoriesSource)
.ConfigureAwait(true)
.GetAwaiter()
.GetResult();
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error loading repository list from source");
return;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,27 @@ private async Task<IActionResult> ProcessWorkflowRunEventAsync()

if (action == "completed")
{
var queueMessage = new RunCompleteQueueMessage
{
Owner = eventMessage.GetProperty("repository").GetProperty("owner").GetProperty("login").GetString(),
Repository = eventMessage.GetProperty("repository").GetProperty("name").GetString(),
RunId = eventMessage.GetProperty("workflow_run").GetProperty("id").GetInt64(),
};

this.logger.LogInformation("Enqueuing GitHubRunCompleteMessage for {Owner}/{Repository} run {RunId}", queueMessage.Owner, queueMessage.Repository, queueMessage.RunId);
string owner = eventMessage.GetProperty("repository").GetProperty("owner").GetProperty("login").GetString();
string repository = eventMessage.GetProperty("repository").GetProperty("name").GetString();
long runId = eventMessage.GetProperty("workflow_run").GetProperty("id").GetInt64();

await this.queueClient.SendMessageAsync(JsonSerializer.Serialize(queueMessage));
if (this.settings.GitHubRepositories.Contains($"{owner}/{repository}", StringComparer.InvariantCultureIgnoreCase))
{
this.logger.LogInformation("Enqueuing GitHubRunCompleteMessage for {Owner}/{Repository} run {RunId}", owner, repository, runId);

var queueMessage = new RunCompleteQueueMessage
{
Owner = owner,
Repository = repository,
RunId = runId,
};

await this.queueClient.SendMessageAsync(JsonSerializer.Serialize(queueMessage));
}
else
{
this.logger.LogInformation("Skipping message for unknown repostory {Owner}/{Repository}", owner, repository);
}
}

return Ok();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,32 @@ public async Task ProcessAsync(string owner, string repository, long runId)
}
}

public async Task<string[]> GetRunBlobNamesAsync(string repository, DateTimeOffset minTime, DateTimeOffset maxTime, CancellationToken cancellationToken)
{
DateTimeOffset minDay = minTime.ToUniversalTime().Date;
DateTimeOffset maxDay = maxTime.ToUniversalTime().Date;

DateTimeOffset[] days = Enumerable.Range(0, (int)(maxDay - minDay).TotalDays + 1)
.Select(offset => minDay.AddDays(offset))
.ToArray();

List<string> blobNames = [];

foreach (DateTimeOffset day in days)
{
string blobPrefix = $"{repository}/{day:yyyy/MM/dd}/".ToLower();

AsyncPageable<BlobItem> blobs = this.runsContainerClient.GetBlobsAsync(prefix: blobPrefix, cancellationToken: cancellationToken);

await foreach (BlobItem blob in blobs)
{
blobNames.Add(blob.Name);
}
}

return blobNames.ToArray();
}

public string GetRunBlobName(WorkflowRun run)
{
string repository = run.Repository.FullName;
Expand Down
Loading

0 comments on commit f9bbdd2

Please sign in to comment.