Skip to content

Commit

Permalink
Merge branch 'master' of github.com:EventStore/EventStore-Client-Dotn…
Browse files Browse the repository at this point in the history
…et into DEVEX-185-Rebranding
  • Loading branch information
w1am committed Jan 8, 2025
2 parents 4c238c7 + f267b97 commit 9c693e6
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ public static async ValueTask<T> TraceClientOperation<T>(
this ActivitySource source,
Func<ValueTask<T>> tracedOperation,
string operationName,
ActivityTagsCollection? tags = null
Func<ActivityTagsCollection?>? tagsFactory = null
) {
if (source.HasNoActiveListeners())
return await tracedOperation().ConfigureAwait(false);

var tags = tagsFactory?.Invoke();

using var activity = StartActivity(source, operationName, ActivityKind.Client, tags, Activity.Current?.Context);

try {
Expand Down Expand Up @@ -47,7 +52,7 @@ public static void TraceSubscriptionEvent(
.WithRequiredTag(TelemetryTags.EventStore.EventId, resolvedEvent.OriginalEvent.EventId.ToString())
.WithRequiredTag(TelemetryTags.EventStore.EventType, resolvedEvent.OriginalEvent.EventType)
// Ensure consistent server.address attribute when connecting to cluster via dns discovery
.WithGrpcChannelServerTags(channelInfo)
.WithGrpcChannelServerTags(settings, channelInfo)
.WithClientSettingsServerTags(settings)
.WithOptionalTag(
TelemetryTags.Database.User,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@ namespace EventStore.Client.Diagnostics;

static class ActivityTagsCollectionExtensions {
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ActivityTagsCollection WithGrpcChannelServerTags(this ActivityTagsCollection tags, ChannelInfo? channelInfo) {
public static ActivityTagsCollection WithGrpcChannelServerTags(this ActivityTagsCollection tags, EventStoreClientSettings settings, ChannelInfo? channelInfo) {
if (channelInfo is null)
return tags;

var authorityParts = channelInfo.Channel.Target.Split(':');

return tags
.WithRequiredTag(TelemetryTags.Server.Address, authorityParts[0])
.WithRequiredTag(TelemetryTags.Server.Port, int.Parse(authorityParts[1]));
}
var authorityParts = channelInfo.Channel.Target.Split([':'], StringSplitOptions.RemoveEmptyEntries);

string host = authorityParts[0];
int port = authorityParts.Length == 1
? settings.ConnectivitySettings.Insecure ? 80 : 443
: int.Parse(authorityParts[1]);

return tags
.WithRequiredTag(TelemetryTags.Server.Address, host)
.WithRequiredTag(TelemetryTags.Server.Port, port);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ActivityTagsCollection WithClientSettingsServerTags(this ActivityTagsCollection source, EventStoreClientSettings settings) {
Expand Down
28 changes: 14 additions & 14 deletions src/EventStore.Client/Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,7 @@ ValueTask<IWriteResult> AppendToStreamInternal(
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(Settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);

return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags);
return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, AppendTags);

async ValueTask<IWriteResult> Operation() {
using var call = new StreamsClient(channelInfo.CallInvoker)
Expand Down Expand Up @@ -157,6 +151,12 @@ await call.RequestStream

return HandleWrongExpectedRevision(response, header, operationOptions);
}

ActivityTagsCollection AppendTags() => new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(Settings, channelInfo)
.WithClientSettingsServerTags(Settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);
}

IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
Expand Down Expand Up @@ -282,16 +282,10 @@ ValueTask<IWriteResult> AppendInternal(
IEnumerable<EventData> events,
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(_channelInfo)
.WithClientSettingsServerTags(_settings)
.WithOptionalTag(TelemetryTags.Database.User, _settings.DefaultCredentials?.Username);

return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(
Operation,
TracingConstants.Operations.Append,
tags
AppendTags
);

async ValueTask<IWriteResult> Operation() {
Expand All @@ -310,6 +304,12 @@ async ValueTask<IWriteResult> Operation() {

return await complete.Task.ConfigureAwait(false);
}

ActivityTagsCollection AppendTags() => new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(_settings, _channelInfo)
.WithClientSettingsServerTags(_settings)
.WithOptionalTag(TelemetryTags.Database.User, _settings.DefaultCredentials?.Username);
}

async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
Expand Down

0 comments on commit 9c693e6

Please sign in to comment.