Skip to content

Commit

Permalink
Add rate limit detection and general pause-processing support (#8997)
Browse files Browse the repository at this point in the history
  • Loading branch information
hallipr authored Sep 17, 2024
1 parent 7db5686 commit c605e95
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Sdk.Tools.PipelineWitness.Configuration;
Expand Down Expand Up @@ -38,11 +37,5 @@ protected override async Task ProcessAsync(CancellationToken cancellationToken)
await this.runProcessor.UploadBuildDefinitionBlobsAsync(settings.Account, project);
}
}

protected override Task ProcessExceptionAsync(Exception ex)
{
this.logger.LogError(ex, "Error processing build definitions");
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,5 @@ protected override async Task ProcessAsync(CancellationToken cancellationToken)
} while(!string.IsNullOrEmpty(continuationToken));
}
}

protected override Task ProcessExceptionAsync(Exception ex)
{
this.logger.LogError(ex, "Error processing missing builds");
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ public partial class GitHubActionProcessor
private readonly BlobContainerClient stepsContainerClient;
private readonly BlobContainerClient logsContainerClient;

public GitHubActionProcessor(ILogger<GitHubActionProcessor> logger, BlobServiceClient blobServiceClient, ICredentialStore credentials)
public GitHubActionProcessor(ILogger<GitHubActionProcessor> logger, BlobServiceClient blobServiceClient, GitHubClient githubClient)
{
ArgumentNullException.ThrowIfNull(githubClient);
ArgumentNullException.ThrowIfNull(blobServiceClient);
ArgumentNullException.ThrowIfNull(githubClient);

this.logger = logger;
this.logsContainerClient = blobServiceClient.GetBlobContainerClient(LogsContainerName);
this.runsContainerClient = blobServiceClient.GetBlobContainerClient(RunsContainerName);
this.jobsContainerClient = blobServiceClient.GetBlobContainerClient(JobsContainerName);
this.stepsContainerClient = blobServiceClient.GetBlobContainerClient(StepsContainerName);
this.client = new GitHubClient(new ProductHeaderValue("PipelineWitness", "1.0"), credentials);
this.client = githubClient;
}

public async Task ProcessAsync(string owner, string repository, long runId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,59 +54,80 @@ protected override async Task ProcessAsync(CancellationToken cancellationToken)
DateTimeOffset runMinTime = DateTimeOffset.UtcNow.Subtract(settings.MissingGitHubActionsWorker.LookbackPeriod);
DateTimeOffset runMaxTime = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromHours(1));

foreach (string ownerAndRepository in repositories)
try
{
foreach (string ownerAndRepository in repositories)
{
await ProcessRepositoryAsync(ownerAndRepository, runMinTime, runMaxTime, cancellationToken);
}
}
catch(RateLimitExceededException ex)
{
try
{
string owner = ownerAndRepository.Split('/')[0];
string repository = ownerAndRepository.Split('/')[1];

string[] knownBlobs = await this.processor.GetRunBlobNamesAsync(ownerAndRepository, runMinTime, runMaxTime, cancellationToken);

WorkflowRunsResponse listRunsResponse = await this.client.Actions.Workflows.Runs.List(owner, repository, new WorkflowRunsRequest
{
Created = $"{runMinTime:o}..{runMaxTime:o}",
Status = CheckRunStatusFilter.Completed,
});

var skipCount = 0;
var enqueueCount = 0;

foreach (WorkflowRun run in listRunsResponse.WorkflowRuns)
{
var blobName = this.processor.GetRunBlobName(run);

if (knownBlobs.Contains(blobName, StringComparer.InvariantCultureIgnoreCase))
{
skipCount++;
continue;
}

var queueMessage = new RunCompleteQueueMessage
{
Owner = owner,
Repository = repository,
RunId = run.Id
};

this.logger.LogInformation("Enqueuing missing run {Repository} {RunId} for processing", ownerAndRepository, run.Id);
await this.queue.EnqueueMessageAsync(queueMessage);
enqueueCount++;
}

this.logger.LogInformation("Enqueued {EnqueueCount} missing runs, skipped {SkipCount} existing runs in repository {Repository}", enqueueCount, skipCount, ownerAndRepository);
var rateLimit = await this.client.RateLimit.GetRateLimits();
this.logger.LogInformation("Rate limit details: {RateLimit}", rateLimit.Resources);
}
catch (Exception ex)
catch (Exception rateLimitException)
{
this.logger.LogError(ex, "Error processing repository {Repository}", ownerAndRepository);
this.logger.LogError(rateLimitException, "Error logging rate limit details");
}

var resetRemaining = ex.Reset - DateTimeOffset.UtcNow;
this.logger.LogError("Rate limit exceeded. Pausing processing for {RateLimitReset}", resetRemaining);
throw new PauseProcessingException(resetRemaining);
}
}

protected override Task ProcessExceptionAsync(Exception ex)
private async Task ProcessRepositoryAsync(string ownerAndRepository, DateTimeOffset runMinTime, DateTimeOffset runMaxTime, CancellationToken cancellationToken)
{
this.logger.LogError(ex, "Error processing missing builds");
return Task.CompletedTask;
string owner = ownerAndRepository.Split('/')[0];
string repository = ownerAndRepository.Split('/')[1];

string[] knownBlobs = await this.processor.GetRunBlobNamesAsync(ownerAndRepository, runMinTime, runMaxTime, cancellationToken);

WorkflowRunsResponse listRunsResponse;

try
{
listRunsResponse = await this.client.Actions.Workflows.Runs.List(owner, repository, new WorkflowRunsRequest
{
Created = $"{runMinTime:o}..{runMaxTime:o}",
Status = CheckRunStatusFilter.Completed,
});
}
catch (ApiException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
{
this.logger.LogWarning("Repository {Repository} not found", ownerAndRepository);
return;
}

var skipCount = 0;
var enqueueCount = 0;

foreach (WorkflowRun run in listRunsResponse.WorkflowRuns)
{
var blobName = this.processor.GetRunBlobName(run);

if (knownBlobs.Contains(blobName, StringComparer.InvariantCultureIgnoreCase))
{
skipCount++;
continue;
}

var queueMessage = new RunCompleteQueueMessage
{
Owner = owner,
Repository = repository,
RunId = run.Id
};

this.logger.LogInformation("Enqueuing missing run {Repository} {RunId} for processing", ownerAndRepository, run.Id);
await this.queue.EnqueueMessageAsync(queueMessage);
enqueueCount++;
}

this.logger.LogInformation("Enqueued {EnqueueCount} missing runs, skipped {SkipCount} existing runs in repository {Repository}", enqueueCount, skipCount, ownerAndRepository);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -9,18 +10,21 @@
using Microsoft.ApplicationInsights;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Octokit;

namespace Azure.Sdk.Tools.PipelineWitness.GitHubActions
{
internal class RunCompleteQueueWorker : QueueWorkerBackgroundService
{
private readonly ILogger logger;
private readonly GitHubActionProcessor processor;
private readonly GitHubClient githubClient;

public RunCompleteQueueWorker(
ILogger<RunCompleteQueueWorker> logger,
GitHubActionProcessor processor,
QueueServiceClient queueServiceClient,
GitHubClient githubClient,
TelemetryClient telemetryClient,
IOptionsMonitor<PipelineWitnessSettings> options)
: base(
Expand All @@ -32,6 +36,7 @@ public RunCompleteQueueWorker(
{
this.logger = logger;
this.processor = processor;
this.githubClient = githubClient;
}

internal override async Task ProcessMessageAsync(QueueMessage message, CancellationToken cancellationToken)
Expand All @@ -40,7 +45,29 @@ internal override async Task ProcessMessageAsync(QueueMessage message, Cancellat

var githubMessage = JsonSerializer.Deserialize<RunCompleteQueueMessage>(message.MessageText);

await this.processor.ProcessAsync(githubMessage.Owner, githubMessage.Repository, githubMessage.RunId);
try
{
await this.processor.ProcessAsync(githubMessage.Owner, githubMessage.Repository, githubMessage.RunId);
}
catch(RateLimitExceededException ex)
{
this.logger.LogError(ex, "Rate limit exceeded while processing run {RunId}", githubMessage.RunId);

try
{
var rateLimit = await this.githubClient.RateLimit.GetRateLimits();
this.logger.LogInformation("Rate limit details: {RateLimit}", rateLimit.Resources);
}
catch (Exception rateLimitException)
{
this.logger.LogError(rateLimitException, "Error logging rate limit details");
}

var resetRemaining = ex.Reset - DateTimeOffset.UtcNow;

throw new PauseProcessingException(resetRemaining);
}

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace Azure.Sdk.Tools.PipelineWitness.Services
{
internal class PauseProcessingException : Exception
{
public TimeSpan PauseDuration { get; set; }

public PauseProcessingException(TimeSpan pauseDuration) : base()
{
PauseDuration = pauseDuration;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading.Tasks;
using Azure.Sdk.Tools.PipelineWitness.Configuration;
using Azure.Sdk.Tools.PipelineWitness.Services.WorkTokens;
using Azure.Storage.Queues.Models;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -51,29 +52,32 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var localCancellation = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

// concurrently process and keep the lock alive
var processTask = ProcessAsync(localCancellation.Token);
var processTask = SafelyProcessAsync(localCancellation.Token);
var renewLockTask = RenewLockAsync(asyncLock, this.lockDuration, localCancellation.Token);
await Task.WhenAny(processTask, renewLockTask);

Task.WaitAny([processTask, renewLockTask], stoppingToken);

// whichever task completes first, cancel the other, then wait for both to complete
localCancellation.Cancel();
await Task.WhenAll(processTask, renewLockTask);

// awaiting processTask will have thrown an exception if it failed
Task.WaitAll([processTask, renewLockTask], stoppingToken);

// if processTask completed successfully, attemt to extend the lock for the cooldown period
// the cooldown period prevents the process from running too frequently
await asyncLock.TryExtendAsync(this.cooldownDuration, stoppingToken);
var (success, cooldown) = await processTask;

await asyncLock.TryExtendAsync(cooldown, stoppingToken);
asyncLock.ReleaseOnDispose = false;
}
else
{
this.logger.LogInformation("Lock {LockName} not acquired", this.lockName);
}
}
catch (Exception ex)
catch (Exception ex) when (ex is not OperationCanceledException && !stoppingToken.IsCancellationRequested)
{
await ProcessExceptionAsync(ex);
// outside of a lock, we can't set the cooldown period on the lock, so we ignore the cooldown period in the return value
this.logger.LogError(ex, "Error processing");
}

// Remove the time spent processing from the wait time to maintain the loop period
Expand All @@ -85,8 +89,36 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
}

protected abstract Task ProcessExceptionAsync(Exception ex);

/// <summary>
/// Call ProcessAsync and log any exception that occurs
/// </summary>
private async Task<(bool Success, TimeSpan PauseDuration)> SafelyProcessAsync(CancellationToken cancellationToken)
{
try
{
await ProcessAsync(cancellationToken).ConfigureAwait(false);
return (true, this.cooldownDuration);
}
catch (PauseProcessingException ex)
{
if (ex.InnerException != null)
{
this.logger.LogError(ex.InnerException, "ProcessAsync threw exception");
}

return (false, ex.PauseDuration);
}
catch (Exception ex)
{
this.logger.LogError(ex, "ProcessAsync threw exception");
return (false, TimeSpan.Zero);
}
}

/// <summary>
/// The main process for the defived class
/// </summary>
protected abstract Task ProcessAsync(CancellationToken cancellationToken);

private async Task RenewLockAsync(IAsyncLock asyncLock, TimeSpan duration, CancellationToken cancellationToken)
Expand Down
Loading

0 comments on commit c605e95

Please sign in to comment.