-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
128 lines (106 loc) · 4.1 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
using System.Collections.Concurrent;
using System.Text;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Extensions.Logging;
// Connection settings. Replace each placeholder with the value for your
// own Azure resources before running.
const string storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
const string blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
const string eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
const string eventHubName = "<< NAME OF THE EVENT HUB >>";
const string consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

// The blob container client is handed to the processor as its first
// constructor argument.
var checkpointStore = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(checkpointStore, consumerGroup, eventHubsConnectionString, eventHubName);

// Console logger shared by both handlers below.
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("EventHubConsumer");

// Per-partition count of events handled since the last checkpoint.
var partitionEventCount = new ConcurrentDictionary<string, int>();

// The run is time-boxed: the token is signaled once this duration elapses.
var runDuration = TimeSpan.FromSeconds(60);
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(runDuration);

// Handlers are attached before processing starts.
processor.ProcessEventAsync += processEventHandlerAsync;
processor.ProcessErrorAsync += processErrorHandler;

try
{
    await processor.StartProcessingAsync(cancellationSource.Token);

    // Wait here until the token is signaled; the handlers are invoked
    // by the processor in the meantime.
    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (OperationCanceledException) when (cancellationSource.IsCancellationRequested)
{
    // Expected: the time box elapsed and the token was signaled.
}
finally
{
    // Stopping may take up to the processor's configured TryTimeout,
    // which is 60 seconds by default.
    await processor.StopProcessingAsync();

    // Detach the handlers once the processor has stopped.
    processor.ProcessEventAsync -= processEventHandlerAsync;
    processor.ProcessErrorAsync -= processErrorHandler;
}
// Handles one invocation of the processor's ProcessEventAsync event.
//
// Logs the partition id, the body length, and the body decoded as UTF-8,
// then records a checkpoint for the partition once enough events have been
// handled since the previous checkpoint.
//
// args: the invocation context supplied by the processor (the event data,
//       the partition it came from, and a cancellation token).
//
// Never throws: every exception is caught and logged here.
async Task processEventHandlerAsync(ProcessEventArgs args)
{
    // Number of events handled on a partition between checkpoints.
    const int eventsPerCheckpoint = 50;

    try
    {
        // If the cancellation token is signaled, then the
        // processor has been asked to stop. It will invoke
        // this handler with any events that were in flight;
        // these will not be lost if not processed.
        //
        // It is up to the handler to decide whether to take
        // action to process the event or to cancel immediately.
        if (args.CancellationToken.IsCancellationRequested)
        {
            return;
        }

        // Guard against an invocation that carries no event, so that
        // args.Data is not dereferenced while null below (see the
        // ProcessEventArgs.HasEvent documentation).
        if (!args.HasEvent)
        {
            return;
        }

        var partition = args.Partition.PartitionId;
        var eventBody = args.Data.EventBody.ToArray();
        logger.LogInformation("Event from partition {partition} with length {eventBodyLength}.", partition, eventBody.Length);

        // Log the payload as an argument to a constant template rather than
        // as the template itself, so braces in the payload can never be
        // treated as placeholders. The rendered text is unchanged.
        var message = Encoding.UTF8.GetString(eventBody);
        logger.LogInformation("{message}", message);

        var eventsSinceLastCheckpoint = partitionEventCount.AddOrUpdate(
            key: partition,
            addValue: 1,
            updateValueFactory: (_, currentCount) => currentCount + 1);

        if (eventsSinceLastCheckpoint >= eventsPerCheckpoint)
        {
            await args.UpdateCheckpointAsync();

            // Start counting again only after the checkpoint succeeded.
            partitionEventCount[partition] = 0;
        }
    }
    catch (Exception ex)
    {
        // It is very important that you always guard against
        // exceptions in your handler code; the processor does
        // not have enough understanding of your code to
        // determine the correct action to take. Any
        // exceptions from your handlers go uncaught by
        // the processor and will NOT be redirected to
        // the error handler.
        logger.LogCritical(ex, "Unexpected failure.");
    }
}
// Handles one invocation of the processor's ProcessErrorAsync event by
// logging the failed operation and its exception.
//
// args: the error context supplied by the processor.
//
// Returns an already-completed task; never throws.
Task processErrorHandler(ProcessErrorEventArgs args)
{
    try
    {
        var failedOperation = args.Operation;
        var failure = args.Exception;

        logger.LogError(
            "Error in the EventProcessorClient. Operation: '{operation}' Exception: {exception}",
            failedOperation,
            failure);
    }
    catch (Exception unexpected)
    {
        // Handler code must always guard against its own exceptions:
        // the processor cannot know the right reaction to them, so
        // anything thrown from a handler goes uncaught by the
        // processor and is NOT handled in any way.
        logger.LogCritical(unexpected, "Unexpected failure.");
    }

    return Task.CompletedTask;
}