-
Notifications
You must be signed in to change notification settings - Fork 0
/
FetchADFMetrics.cs
188 lines (160 loc) · 8.42 KB
/
FetchADFMetrics.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
using Microsoft.Azure.Management.DataFactory;
using Microsoft.Azure.Management.DataFactory.Models;
using Microsoft.Azure.Management.ResourceManager;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
namespace AzureDataFactoryNewRelic.FetchMetrics
{
public static class FetchADFMetrics
{
[FunctionName("FetchADFMetrics")]
public static async Task Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, ILogger log)
{
log.LogInformation($"FetchADFMetrics function executed at: {DateTime.Now}");
// Set variables
var tenantID = Environment.GetEnvironmentVariable("TenantId", EnvironmentVariableTarget.Process);
var applicationId = Environment.GetEnvironmentVariable("ApplicationId", EnvironmentVariableTarget.Process);
var authenticationKey = Environment.GetEnvironmentVariable("AuthenticationKey", EnvironmentVariableTarget.Process);
var subscriptionId = Environment.GetEnvironmentVariable("SubscriptionId", EnvironmentVariableTarget.Process);
var resourceGroup = Environment.GetEnvironmentVariable("ResourceGroup", EnvironmentVariableTarget.Process);
var minuteInterval = Environment.GetEnvironmentVariable("MinuteInterval", EnvironmentVariableTarget.Process);
var newRelicInsightsInsertAPIKey = Environment.GetEnvironmentVariable("NewRelicInsightsInsertAPIKey", EnvironmentVariableTarget.Process);
var newRelicAccountId = Environment.GetEnvironmentVariable("NewRelicAccountId", EnvironmentVariableTarget.Process);
// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var factoryClient = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
// Get list of factories
List<Factory> factories = new List<Factory>();
string nextPageLink = null;
do
{
try
{
Microsoft.Rest.Azure.IPage<Factory> newFactories;
if (string.IsNullOrEmpty(nextPageLink))
{
newFactories = await factoryClient.Factories.ListByResourceGroupAsync(resourceGroup);
}
else
{
newFactories = await factoryClient.Factories.ListByResourceGroupNextAsync(nextPageLink);
}
nextPageLink = newFactories.NextPageLink;
factories.AddRange(newFactories);
}
catch (Exception e)
{
log.LogError("Error fetching Factories", e);
log.LogError($"Exception type = {e.GetType().Name}");
log.LogError($"Exception message = {e.Message}; source = {e.Source}");
log.LogError($"Exception stack trace = {e.StackTrace}");
return;
}
}
while (!string.IsNullOrEmpty(nextPageLink));
if (factories.Count == 0)
{
log.LogInformation($"No Factories found in resource group {resourceGroup}");
return;
}
// Get list of pipeline runs in the last interval of minutes by factory
List<PipelineRun> pipelineRuns = new List<PipelineRun>();
nextPageLink = null;
foreach (var f in factories)
{
do
{
try
{
string token = null;
do
{
var updatedPipelineRuns = await factoryClient.PipelineRuns.QueryByFactoryAsync(
resourceGroup,
f.Name,
new RunFilterParameters(
DateTime.UtcNow.AddMinutes(-5),
DateTime.UtcNow,
token
)
);
pipelineRuns.AddRange(updatedPipelineRuns.Value);
token = updatedPipelineRuns.ContinuationToken;
}
while (!string.IsNullOrEmpty(token));
}
catch (Exception e)
{
log.LogError("Error fetching PipelineRuns", e);
log.LogError($"Exception type = {e.GetType().Name}");
log.LogError($"Exception message = {e.Message}; source = {e.Source}");
log.LogError($"Exception stack trace = {e.StackTrace}");
return;
}
}
while (!string.IsNullOrEmpty(nextPageLink));
}
if (pipelineRuns.Count == 0)
{
log.LogInformation($"No pipelineRun updates in the last {minuteInterval} minutes");
return;
}
// Insert PipelineRun objects into New Relic Insights
var handler = new HttpClientHandler();
if (handler.SupportsAutomaticDecompression)
{
handler.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;
}
using (var httpClient = new HttpClient(handler))
{
httpClient.DefaultRequestHeaders.Add("X-Insert-Key", newRelicInsightsInsertAPIKey);
httpClient.DefaultRequestHeaders.Add("Content-Type", "application/json");
foreach (var p in pipelineRuns)
{
var jobj = new JObject(p);
jobj.Add("eventType", "ADFPipelineRun"); // eventType is required
//Need to convert DateTimes into Unix timestamps in seconds or milliseconds
if (p.LastUpdated.HasValue)
{
jobj["LastUpdated"] = ((DateTimeOffset)p.LastUpdated.Value.ToUniversalTime()).ToUnixTimeSeconds();
jobj["timestamp"] = ((DateTimeOffset)p.LastUpdated.Value.ToUniversalTime()).ToUnixTimeSeconds(); // If LastUpdated is populated, use this for the Insights timestamp
}
if (p.RunStart.HasValue)
{
jobj["RunStart"] = ((DateTimeOffset)p.RunStart.Value.ToUniversalTime()).ToUnixTimeSeconds();
}
if (p.RunEnd.HasValue)
{
jobj["RunEnd"] = ((DateTimeOffset)p.RunEnd.Value.ToUniversalTime()).ToUnixTimeSeconds();
}
try
{
var response = await httpClient.PostAsJsonAsync($"https://insights-collector.newrelic.com/v1/accounts/{newRelicAccountId}/events", p);
response.EnsureSuccessStatusCode();
string responseBody = await response.Content.ReadAsStringAsync();
log.LogInformation("Event insertion into Insights succeeeded");
log.LogInformation(responseBody);
}
catch (Exception e)
{
log.LogError("Error inserting PipelineRun into Insights", e);
log.LogError($"Exception type = {e.GetType().Name}");
log.LogError($"Exception message = {e.Message}; source = {e.Source}");
log.LogError($"Exception stack trace = {e.StackTrace}");
}
}
}
}
}
}