Skip to content

Commit

Permalink
WIP: CRM connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
gunndabad committed Oct 17, 2024
1 parent 75073ad commit 8ef1b78
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public static void Main(string[] args)
{
var crmServiceClient = GetCrmServiceClient();
services.AddTrnGenerationApi(configuration);
services.AddDefaultServiceClient(ServiceLifetime.Transient, _ => crmServiceClient.Clone());
services.AddPooledDefaultServiceClient(crmServiceClient, size: 200);
services.AddTransient<IDataverseAdapter, DataverseAdapter>();

services.AddHealthChecks()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
using System.Collections.Concurrent;
using Microsoft.PowerPlatform.Dataverse.Client;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Query;

namespace TeachingRecordSystem.Core.Dqt;

internal sealed class PooledOrganizationService : IOrganizationServiceAsync2, IDisposable
{
private readonly BlockingCollection<ServiceClient> _pool;
private bool _disposed;

private PooledOrganizationService(IEnumerable<ServiceClient> connections)
{
_pool = new BlockingCollection<ServiceClient>(new ConcurrentQueue<ServiceClient>(connections));
}

public static PooledOrganizationService Create(ServiceClient serviceClient, int size)
{
// Even though ServiceClient.Clone() is synchronous it can do network calls
// so we do the invocations in parallel.

var connections = new List<ServiceClient>(size);
connections.AddRange(Enumerable.Range(0, size).Select(_ => serviceClient.Clone()));
return new PooledOrganizationService(connections);
}

public void Dispose()
{
ObjectDisposedException.ThrowIf(_disposed, this);

_pool.Dispose();
_disposed = true;
}

private async Task<TResult> WithPooledConnectionAsync<TResult>(Func<IOrganizationServiceAsync2, Task<TResult>> action, CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(_disposed, this);

var client = _pool.Take(cancellationToken);
try
{
return await action(client);
}
finally
{
_pool.Add(client);
}
}

private async Task WithPooledConnectionAsync(Func<IOrganizationServiceAsync2, Task> action, CancellationToken cancellationToken = default) =>
await WithPooledConnectionAsync(async client =>
{
await action(client);
return 1;
});

private TResult WithPooledConnection<TResult>(Func<IOrganizationServiceAsync2, TResult> action)
{
ObjectDisposedException.ThrowIf(_disposed, this);

var client = _pool.Take();
try
{
return action(client);
}
finally
{
_pool.Add(client);
}
}

private void WithPooledConnection(Action<IOrganizationServiceAsync2> action) =>
WithPooledConnection(client =>
{
action(client);
return 1;
});

public Task AssociateAsync(string entityName, Guid entityId, Relationship relationship, EntityReferenceCollection relatedEntities, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.AssociateAsync(entityName, entityId, relationship, relatedEntities, cancellationToken), cancellationToken);

public Task<Guid> CreateAsync(Entity entity, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.CreateAsync(entity, cancellationToken), cancellationToken);

public Task<Entity> CreateAndReturnAsync(Entity entity, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.CreateAndReturnAsync(entity, cancellationToken), cancellationToken);

public Task DeleteAsync(string entityName, Guid id, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.DeleteAsync(entityName, id, cancellationToken), cancellationToken);

public Task DisassociateAsync(string entityName, Guid entityId, Relationship relationship, EntityReferenceCollection relatedEntities, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.DisassociateAsync(entityName, entityId, relationship, relatedEntities, cancellationToken), cancellationToken);

public Task<OrganizationResponse> ExecuteAsync(OrganizationRequest request, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.ExecuteAsync(request, cancellationToken), cancellationToken);

public Task<Entity> RetrieveAsync(string entityName, Guid id, ColumnSet columnSet, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.RetrieveAsync(entityName, id, columnSet, cancellationToken), cancellationToken);

public Task<EntityCollection> RetrieveMultipleAsync(QueryBase query, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.RetrieveMultipleAsync(query, cancellationToken), cancellationToken);

public Task UpdateAsync(Entity entity, CancellationToken cancellationToken) =>
WithPooledConnectionAsync(s => s.UpdateAsync(entity, cancellationToken), cancellationToken);

public Task<Guid> CreateAsync(Entity entity) =>
WithPooledConnectionAsync(s => s.CreateAsync(entity));

public Task<Entity> RetrieveAsync(string entityName, Guid id, ColumnSet columnSet) =>
WithPooledConnectionAsync(s => s.RetrieveAsync(entityName, id, columnSet));

public Task UpdateAsync(Entity entity) =>
WithPooledConnectionAsync(s => s.UpdateAsync(entity));

public Task DeleteAsync(string entityName, Guid id) =>
WithPooledConnectionAsync(s => s.DeleteAsync(entityName, id));

public Task<OrganizationResponse> ExecuteAsync(OrganizationRequest request) =>
WithPooledConnectionAsync(s => s.ExecuteAsync(request));

public Task AssociateAsync(string entityName, Guid entityId, Relationship relationship, EntityReferenceCollection relatedEntities) =>
WithPooledConnectionAsync(s => s.AssociateAsync(entityName, entityId, relationship, relatedEntities));

public Task DisassociateAsync(string entityName, Guid entityId, Relationship relationship, EntityReferenceCollection relatedEntities) =>
WithPooledConnectionAsync(s => s.DisassociateAsync(entityName, entityId, relationship, relatedEntities));

public Task<EntityCollection> RetrieveMultipleAsync(QueryBase query) =>
WithPooledConnectionAsync(s => s.RetrieveMultipleAsync(query));

public Guid Create(Entity entity) =>
WithPooledConnection(s => s.Create(entity));

public Entity Retrieve(string entityName, Guid id, ColumnSet columnSet) =>
WithPooledConnection(s => s.Retrieve(entityName, id, columnSet));

public void Update(Entity entity) =>
WithPooledConnection(s => s.Update(entity));

public void Delete(string entityName, Guid id) =>
WithPooledConnection(s => s.Delete(entityName, id));

public OrganizationResponse Execute(OrganizationRequest request) =>
WithPooledConnection(s => s.Execute(request));

public void Associate(string entityName, Guid entityId, Relationship relationship, EntityReferenceCollection relatedEntities) =>
WithPooledConnection(s => s.Associate(entityName, entityId, relationship, relatedEntities));

public void Disassociate(string entityName, Guid entityId, Relationship relationship, EntityReferenceCollection relatedEntities) =>
WithPooledConnection(s => s.Disassociate(entityName, entityId, relationship, relatedEntities));

public EntityCollection RetrieveMultiple(QueryBase query) =>
WithPooledConnection(s => s.RetrieveMultiple(query));
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,37 @@ public static IServiceCollection AddCrmQueries(this IServiceCollection services)
return services;
}

public static IServiceCollection AddPooledDefaultServiceClient(
this IServiceCollection services,
ServiceClient baseServiceClient,
int size)
{
services.AddKeyedSingleton(serviceKey: null, PooledOrganizationService.Create(baseServiceClient, size));

services.AddDefaultServiceClient(ServiceLifetime.Singleton, sp => sp.GetRequiredKeyedService<PooledOrganizationService>(serviceKey: null));

return services;
}

public static IServiceCollection AddPooledNamedServiceClient(
this IServiceCollection services,
string name,
ServiceClient baseServiceClient,
int size)
{
services.AddKeyedSingleton(name, PooledOrganizationService.Create(baseServiceClient, size));

services.AddNamedServiceClient(name, ServiceLifetime.Singleton, sp => sp.GetRequiredKeyedService<PooledOrganizationService>(name));

return services;
}

public static IServiceCollection AddDefaultServiceClient(
this IServiceCollection services,
ServiceLifetime lifetime,
Func<IServiceProvider, IOrganizationServiceAsync2> createServiceClient)
Func<IServiceProvider, IOrganizationServiceAsync2> getServiceClient)
{
services.AddServiceClient(name: null, lifetime, createServiceClient);
services.AddServiceClient(name: null, lifetime, getServiceClient);

services.AddSingleton<ReferenceDataCache>();

Expand All @@ -37,9 +62,9 @@ public static IServiceCollection AddNamedServiceClient(
this IServiceCollection services,
string name,
ServiceLifetime lifetime,
Func<IServiceProvider, IOrganizationServiceAsync2> createServiceClient)
Func<IServiceProvider, IOrganizationServiceAsync2> getServiceClient)
{
return AddServiceClient(services, name, lifetime, createServiceClient);
return AddServiceClient(services, name, lifetime, getServiceClient);
}

public static IServiceCollection AddCrmEntityChangesService(
Expand Down Expand Up @@ -73,14 +98,14 @@ private static IServiceCollection AddServiceClient(
this IServiceCollection services,
string? name,
ServiceLifetime lifetime,
Func<IServiceProvider, IOrganizationServiceAsync2> createServiceClient)
Func<IServiceProvider, IOrganizationServiceAsync2> getServiceClient)
{
services.TryAddSingleton<ICrmServiceClientProvider, CrmServiceClientProvider>();

services.Add(ServiceDescriptor.DescribeKeyed(
typeof(IOrganizationServiceAsync2),
name,
implementationFactory: (IServiceProvider serviceProvider, object? key) => createServiceClient(serviceProvider),
implementationFactory: (IServiceProvider serviceProvider, object? key) => getServiceClient(serviceProvider),
lifetime));

services.Add(ServiceDescriptor.DescribeKeyed(
Expand Down

0 comments on commit 8ef1b78

Please sign in to comment.