Skip to content

Commit

Permalink
Merge pull request #240 from graphql-dotnet/fix-websocket-send-queue
Browse files Browse the repository at this point in the history
Fix websocket send queue
  • Loading branch information
rose-a authored May 25, 2020
2 parents fb657d5 + 059b32b commit 47b4abf
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ private async Task<GraphQLResponse<TResponse>> ExecuteQueryAsync<TResponse>(Grap
private async Task<IObservable<GraphQLResponse<TResponse>>> ExecuteSubscriptionAsync<TResponse>(GraphQLRequest request, CancellationToken cancellationToken = default)
{
var result = await ExecuteAsync(request, cancellationToken);
return ((SubscriptionExecutionResult)result).Streams?.Values.SingleOrDefault()?
.SelectMany(executionResult => Observable.FromAsync(token => ExecutionResultToGraphQLResponse<TResponse>(executionResult, token)));
var stream = ((SubscriptionExecutionResult)result).Streams?.Values.SingleOrDefault();

return stream == null
? Observable.Throw<GraphQLResponse<TResponse>>(new InvalidOperationException("the GraphQL execution did not return an observable"))
: stream.SelectMany(executionResult => Observable.FromAsync(token => ExecutionResultToGraphQLResponse<TResponse>(executionResult, token)));
}

private async Task<ExecutionResult> ExecuteAsync(GraphQLRequest request, CancellationToken cancellationToken = default)
Expand Down
6 changes: 6 additions & 0 deletions src/GraphQL.Client/GraphQLHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using GraphQL.Client.Abstractions;
Expand Down Expand Up @@ -57,6 +59,10 @@ public GraphQLHttpClient(GraphQLHttpClientOptions options, IGraphQLWebsocketJson
Options = options ?? throw new ArgumentNullException(nameof(options));
JsonSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer), "please configure the JSON serializer you want to use");
HttpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));

if (!HttpClient.DefaultRequestHeaders.UserAgent.Any())
HttpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue(GetType().Assembly.GetName().Name, GetType().Assembly.GetName().Version.ToString()));

_graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), this);
}

Expand Down
9 changes: 6 additions & 3 deletions src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.IO;
using System.Net.Http;
using System.Net.WebSockets;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
Expand Down Expand Up @@ -84,7 +85,8 @@ public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClient client)

_requestSubscription = _requestSubject
.ObserveOn(_sendLoopScheduler)
.Subscribe(async request => await SendWebSocketRequestAsync(request));
.SelectMany(SendWebSocketRequestAsync)
.Subscribe();
}

#region Send requests
Expand Down Expand Up @@ -339,14 +341,14 @@ private Task QueueWebSocketRequest(GraphQLWebSocketRequest request)
return request.SendTask();
}

private async Task SendWebSocketRequestAsync(GraphQLWebSocketRequest request)
private async Task<Unit> SendWebSocketRequestAsync(GraphQLWebSocketRequest request)
{
try
{
if (_internalCancellationToken.IsCancellationRequested)
{
request.SendCanceled();
return;
return Unit.Default;
}

await InitializeWebSocket();
Expand All @@ -362,6 +364,7 @@ await _clientWebSocket.SendAsync(
{
request.SendFailed(e);
}
return Unit.Default;
}

#endregion
Expand Down
5 changes: 2 additions & 3 deletions src/src.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<!--TODO: remove condition when https://github.com/GitTools/GitVersion/issues/2063 is fixed-->
<ItemGroup Condition=" '$(DEBUG)' != ''">
<PackageReference Update="GitVersionTask" Version="5.2.4">
<ItemGroup>
<PackageReference Update="GitVersionTask" Version="5.3.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down

0 comments on commit 47b4abf

Please sign in to comment.