Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
aabs committed Aug 28, 2024
2 parents 764112f + 5f56c12 commit 840481b
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 233 deletions.
2 changes: 1 addition & 1 deletion ActorSrcGen.Abstractions/FirstStepAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = false)]
public sealed class FirstStepAttribute : StepAttribute
{
public FirstStepAttribute(string inputName, int maxDegreeOfParallelism = 0, int maxBufferSize = 0)
public FirstStepAttribute(string inputName, int maxDegreeOfParallelism = 4, int maxBufferSize = 1)
: base(maxDegreeOfParallelism, maxBufferSize)
{
InputName = inputName;
Expand Down
2 changes: 1 addition & 1 deletion ActorSrcGen.Abstractions/LastStepAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = false)]
public sealed class LastStepAttribute : StepAttribute
{
public LastStepAttribute(int maxDegreeOfParallelism = 0, int maxBufferSize = 0)
public LastStepAttribute(int maxDegreeOfParallelism = 4, int maxBufferSize = 1)
{
}
}
2 changes: 1 addition & 1 deletion ActorSrcGen.Abstractions/StepAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = false)]
public class StepAttribute : Attribute
{
public StepAttribute(int maxDegreeOfParallelism = 0, int maxBufferSize = 0)
public StepAttribute(int maxDegreeOfParallelism = 4, int maxBufferSize = 1)
{
MaxBufferSize = maxBufferSize;
MaxDegreeOfParallelism = maxDegreeOfParallelism < 0 ? Environment.ProcessorCount : maxDegreeOfParallelism;
Expand Down
2 changes: 1 addition & 1 deletion ActorSrcGen.Playground/MyPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public async Task<string> ReceiveBackfillRequest(CancellationToken cancellationT
}

// decode
[FirstStep("decode poll request")]
[FirstStep("decode poll request", 8, 1)]
[NextStep(nameof(SetupGapTracking))]
[NextStep(nameof(LogIncomingPollRequest))]
public TRequest DecodePollRequest(string x)
Expand Down
69 changes: 67 additions & 2 deletions ActorSrcGen/Helpers/TypeHelpers.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.CodeAnalysis;
using ActorSrcGen.Model;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.CSharp;
using Microsoft.CodeAnalysis.CSharp.Syntax;
using System.Collections.Immutable;
Expand Down Expand Up @@ -43,7 +44,7 @@ public static string RenderTypename(this ITypeSymbol? ts, bool stripTask = false
t = nt.TypeArguments[0];
}

if (stripCollection && t.AllInterfaces.Any(i => i.Name == "IEnumerable"))
if (stripCollection && t.IsCollection())
{
nt = t as INamedTypeSymbol;
t = nt.TypeArguments[0];
Expand Down Expand Up @@ -72,6 +73,9 @@ public static string RenderTypename(this GenericNameSyntax? ts, Compilation comp
return x.ToDisplayString(SymbolDisplayFormat.MinimallyQualifiedFormat);
}

public static bool IsCollection(this ITypeSymbol ts)
=> ts.Name is "List" or "IEnumerable";

public static bool HasMultipleOnwardSteps(this IMethodSymbol method, GenerationContext ctx)
{
if (ctx.DependencyGraph.TryGetValue(method, out var nextSteps))
Expand Down Expand Up @@ -100,4 +104,65 @@ public static TypeArgumentListSyntax AsTypeArgumentList(ImmutableArray<ITypeSymb
)
);
}
}

public static class MethodExtensions
{
public static bool ReturnTypeIsCollection(this IMethodSymbol method)
{
var t = method.ReturnType;

if (t.Name == "Task")
{
t = t.GetFirstTypeParameter();
}
var returnTypeIsEnumerable = t.IsCollection();
return returnTypeIsEnumerable;
}

public static bool IsAsynchronous(this IMethodSymbol method)
{
return (method.IsAsync || method.ReturnType.Name == "Task");
}

public static int GetMaxDegreeOfParallelism(this IMethodSymbol method)
{
var attr = method.GetBlockAttr();
if (attr != null)
{
var ord = attr.AttributeConstructor.Parameters.First(p => p.Name == "maxDegreeOfParallelism").Ordinal;
return (int)attr.ConstructorArguments[ord].Value;
}

return 1;
}
public static int GetMaxBufferSize(this IMethodSymbol method)
{
var attr = method.GetBlockAttr();
if (attr != null)
{
var ord = attr.AttributeConstructor.Parameters.First(p => p.Name == "maxBufferSize").Ordinal;
return (int)attr.ConstructorArguments[ord].Value;
}

return 1;
}

public static string GetReturnTypeCollectionType(this IMethodSymbol method)
{
if (method.ReturnTypeIsCollection())
{
return method.ReturnType.GetFirstTypeParameter().RenderTypename();
}
return method.ReturnType.RenderTypename(stripTask: true);
}

public static string? GetInputTypeName(this IMethodSymbol method)
{
if (method.Parameters.IsDefaultOrEmpty)
{
return default;
}
return method.Parameters.First().Type.RenderTypename();
}
}
144 changes: 66 additions & 78 deletions ActorSrcGen/Model/ActorVisitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void VisitActor(SyntaxAndSymbol symbol)
VisitMethod(mi);
}
actor.StepNodes = _blockStack.ToList();

foreach (var mi in GetIngestMethods(symbol.Symbol))
{
actor.Ingesters.Add(new IngestMethod(mi));
Expand Down Expand Up @@ -121,9 +121,10 @@ public void VisitActor(SyntaxAndSymbol symbol)
public void VisitMethod(IMethodSymbol method)
{
BlockNode? blockNode = null;
if (IsReturnTypeCollection(method))

if (method.ReturnTypeIsCollection())
{
if (IsAsyncMethod(method))
if (method.IsAsynchronous())
{
blockNode = CreateAsyncManyNode(method);
}
Expand All @@ -134,7 +135,7 @@ public void VisitMethod(IMethodSymbol method)
}
else
{
if (IsAsyncMethod(method))
if (method.IsAsynchronous())
{
blockNode = CreateAsyncNode(method);
}
Expand All @@ -150,11 +151,11 @@ public void VisitMethod(IMethodSymbol method)
blockNode = CreateActionNode(method);
}

blockNode.IsAsync = IsAsyncMethod(method);
blockNode.IsReturnTypeCollection = IsReturnTypeCollection(method);
blockNode.IsAsync = method.IsAsynchronous();
blockNode.IsReturnTypeCollection = method.ReturnTypeIsCollection();
blockNode.Id = ++BlockCounter;
blockNode.NumNextSteps = blockNode.Method.GetNextStepAttrs().Count();

if (blockNode.NumNextSteps > 1)
{
// if we get here, we have to split via a synthetic BroadcastBlock.
Expand All @@ -167,39 +168,27 @@ public void VisitMethod(IMethodSymbol method)

blockNode.IsEntryStep = method.IsStartStep();
blockNode.IsExitStep = method.IsEndStep();
blockNode.MaxDegreeOfParallelism = method.GetMaxDegreeOfParallelism();
blockNode.MaxBufferSize = method.GetMaxBufferSize();
_blockStack.Push(blockNode);
}

private bool IsReturnTypeCollection(IMethodSymbol method)
{
var t = method.ReturnType;
if (t.Name == "Task")
{
t = t.GetFirstTypeParameter();
}
var returnTypeIsEnumerable = t.AllInterfaces.Any(i => i.Name.StartsWith("IEnumerable", StringComparison.InvariantCultureIgnoreCase));
return returnTypeIsEnumerable;
}

private bool IsAsyncMethod(IMethodSymbol method)
{
return (method.IsAsync || method.ReturnType.Name == "Task");
}

private BlockNode CreateActionNode(IMethodSymbol method)
{
string inputTypeName = method.Parameters.First().Type.RenderTypename();
string inputTypeName = method.GetInputTypeName();
return new()
{
Method = method,
NodeType = NodeType.Action,
HandlerBody = $$"""
({{inputTypeName}} x) => {
try
{
{{method.Name}}(x);
}catch{}
}
HandlerBody =
$$"""
({{inputTypeName}} x) => {
try
{
{{method.Name}}(x);
}catch{}
}
"""
};
}
Expand All @@ -218,89 +207,88 @@ private BlockNode CreateIdentityBroadcastNode(IMethodSymbol method)
private BlockNode CreateAsyncManyNode(IMethodSymbol method)
{
var collectionType = method.ReturnType.GetFirstTypeParameter().RenderTypename();
string inputTypeName = method.Parameters.First().Type.RenderTypename();
var returnTypeName = method.ReturnType.RenderTypename(false).ToLowerInvariant();
string inputTypeName = method.GetInputTypeName();
return new()
{
Method = method,
NodeType = NodeType.TransformMany,
HandlerBody = $$"""
async ({{inputTypeName}} x) => {
var result = new List<{{collectionType}}>();
try
{
result.AddRange(await {{method.Name}}(x));
}catch{}
return result;
}
"""
async ({{inputTypeName}} x) => {
var result = new List<{{collectionType}}>();
try
{
result.AddRange(await {{method.Name}}(x));
}catch{}
return result;
}
"""
};
}

private BlockNode CreateAsyncNode(IMethodSymbol method)
{
var collectionType = method.ReturnType.GetFirstTypeParameter().RenderTypename();
string inputTypeName = method.Parameters.First().Type.RenderTypename();
var returnTypeName = method.ReturnType.RenderTypename(false).ToLowerInvariant();
string inputTypeName = method.GetInputTypeName();
return new()
{
Method = method,
NodeType = NodeType.TransformMany,
HandlerBody = $$"""
async ({{inputTypeName}} x) => {
var result = new List<{{collectionType}}>();
try
{
result.Add(await {{method.Name}}(x));
}catch{}
return result;
}
"""
async ({{inputTypeName}} x) => {
try
{
return await {{method.Name}}(x);
}
catch
{
return default;
}
}
"""
};
}

private BlockNode CreateDefaultNode(IMethodSymbol method)
{
string inputTypeName = method.Parameters.First().Type.RenderTypename();
var returnTypeName = method.ReturnType.RenderTypename(false).ToLowerInvariant();
string inputTypeName = method.GetInputTypeName();

return new()
{
Method = method,
NodeType = NodeType.Transform,
HandlerBody = $$"""
({{inputTypeName}} x) => {
try
{
return {{method.Name}}(x);
}
catch
{
return default;
}
}
"""
({{inputTypeName}} x) => {
try
{
return {{method.Name}}(x);
}
catch
{
return default;
}
}
"""
};
}

private BlockNode CreateManyNode(IMethodSymbol method)
{
var collectionType = method.ReturnType.GetFirstTypeParameter().RenderTypename();
string inputTypeName = method.Parameters.First().Type.RenderTypename();
var returnTypeName = method.ReturnType.RenderTypename(false).ToLowerInvariant();
var collectionType = method.GetReturnTypeCollectionType();
string inputTypeName = method.GetInputTypeName();

return new()
{
Method = method,
NodeType = NodeType.Transform,
HandlerBody = $$"""
({{inputTypeName}} x) => {
var result = new List<{{collectionType}}>();
try
{
result.AddRange({{method.Name}}(x));
}catch{}
return result;
}
"""
({{inputTypeName}} x) => {
var result = new List<{{collectionType}}>();
try
{
result.AddRange({{method.Name}}(x));
}catch{}
return result;
}
"""
};
}
}
2 changes: 2 additions & 0 deletions ActorSrcGen/Model/BlockGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class BlockNode
public string OutputTypeName => OutputType.RenderTypename();
public bool IsAsync { get; set; }
public bool IsReturnTypeCollection { get; set; }
public int MaxDegreeOfParallelism { get; set; } = 4;
public int MaxBufferSize { get; set; } = 10;
}

public class IngestMethod
Expand Down
17 changes: 15 additions & 2 deletions ActorSrcGen/Templates/Actor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,21 @@ public virtual string TransformText()
#line default
#line hidden
this.Write(",\r\n new ExecutionDataflowBlockOptions() {\r\n BoundedCapa" +
"city = 1,\r\n MaxDegreeOfParallelism = 1\r\n });\r\n Regi" +
"sterChild(");
"city = ");

#line 44 "C:\dev\aabs\ActorSrcGen\ActorSrcGen\Templates\Actor.tt"
this.Write(this.ToStringHelper.ToStringWithCulture(step.MaxBufferSize));

#line default
#line hidden
this.Write(",\r\n MaxDegreeOfParallelism = ");

#line 45 "C:\dev\aabs\ActorSrcGen\ActorSrcGen\Templates\Actor.tt"
this.Write(this.ToStringHelper.ToStringWithCulture(step.MaxDegreeOfParallelism));

#line default
#line hidden
this.Write("\r\n });\r\n RegisterChild(");

#line 47 "C:\dev\aabs\ActorSrcGen\ActorSrcGen\Templates\Actor.tt"
this.Write(this.ToStringHelper.ToStringWithCulture(blockName));
Expand Down
Loading

0 comments on commit 840481b

Please sign in to comment.