Skip to content

Commit

Permalink
feat: grpc call timeout/deadline (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
catcherwong authored Feb 19, 2022
1 parent c19704e commit 25934e2
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
23 changes: 16 additions & 7 deletions src/Dtmgrpc/DtmgRPCClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
Expand All @@ -19,10 +20,12 @@ public class DtmgRPCClient : IDtmgRPCClient
private static readonly string HTTPPrefix = "http://";

private readonly Driver.IDtmDriver _dtmDriver;
private readonly DtmOptions _options;

public DtmgRPCClient(Driver.IDtmDriver dtmDriver)
public DtmgRPCClient(Driver.IDtmDriver dtmDriver, IOptions<DtmOptions> optionsAccs)
{
this._dtmDriver = dtmDriver;
this._options = optionsAccs.Value;
}

public async Task DtmGrpcCall(TransBase transBase, string operation)
Expand All @@ -31,14 +34,18 @@ public async Task DtmGrpcCall(TransBase transBase, string operation)
var method = new Method<dtmgpb.DtmRequest, Empty>(MethodType.Unary, DtmServiceName, operation, DtmRequestMarshaller, DtmReplyMarshaller);

using var channel = GrpcChannel.ForAddress(transBase.Dtm);
await channel.CreateCallInvoker().AsyncUnaryCall(method, string.Empty, new CallOptions(), dtmRequest);
var callOptions = new CallOptions()
.WithDeadline(DateTime.UtcNow.AddMilliseconds(_options.DtmTimeout));
await channel.CreateCallInvoker().AsyncUnaryCall(method, string.Empty, callOptions, dtmRequest);
}

public async Task<string> GenGid(string grpcServer)
{
using var channel = GrpcChannel.ForAddress(grpcServer);
var client = new dtmgpb.Dtm.DtmClient(channel);
var reply = await client.NewGidAsync(new Empty(), new CallOptions());
var callOptions = new CallOptions()
.WithDeadline(DateTime.UtcNow.AddMilliseconds(_options.DtmTimeout));
var reply = await client.NewGidAsync(new Empty(), callOptions);
return reply.Gid;
}

Expand All @@ -59,9 +66,9 @@ public async Task<TResponse> InvokeBranch<TRequest, TResponse>(TransBase tb, TRe
var grpcMethod = Utils.CreateMethod<TRequest, TResponse>(MethodType.Unary, serviceName, method);

var metadata = Utils.TransInfo2Metadata(tb.Gid, tb.TransType, branchId, op, tb.Dtm);
var callOptions = new CallOptions();
callOptions.WithHeaders(metadata);

var callOptions = new CallOptions()
.WithHeaders(metadata)
.WithDeadline(DateTime.UtcNow.AddMilliseconds(_options.BranchTimeout));
var resp = await channel.CreateCallInvoker().AsyncUnaryCall(grpcMethod, string.Empty, callOptions, msg);
return resp;
}
Expand All @@ -80,7 +87,9 @@ public async Task RegisterBranch(TransBase tb, string branchId, ByteString bd, D

using var channel = GrpcChannel.ForAddress(tb.Dtm);
var client = new dtmgpb.Dtm.DtmClient(channel);
await client.RegisterBranchAsync(request, new CallOptions());
var callOptions = new CallOptions()
.WithDeadline(DateTime.UtcNow.AddMilliseconds(_options.DtmTimeout));
await client.RegisterBranchAsync(request, callOptions);
}

public TransBase TransBaseFromGrpc(ServerCallContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,38 @@
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading.Tasks;
using Xunit;

namespace Dtmgrpc.Tests
namespace Dtmgrpc.IntegrationTests
{
public class GrpcCallInvokerTests
{
private static readonly Marshaller<dtmgpb.DtmGidReply> DtmGidReplyMarshaller = Marshallers.Create(r => r.ToByteArray(), data => dtmgpb.DtmGidReply.Parser.ParseFrom(data));
private static readonly Marshaller<Empty> EmptyMarshaller = Marshallers.Create(r => r.ToByteArray(), data => Empty.Parser.ParseFrom(data));

[Fact(Skip = "local")]
[Fact]
public async Task CallInvokerAsync()
{
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);

var method = new Method<Empty, dtmgpb.DtmGidReply>(MethodType.Unary, "dtmgimp.Dtm", "NewGid", EmptyMarshaller, DtmGidReplyMarshaller);

using var channel = GrpcChannel.ForAddress("http://localhost:36790");
using var channel = GrpcChannel.ForAddress(ITTestHelper.DTMgRPCUrl);
var resp = await channel.CreateCallInvoker().AsyncUnaryCall(method, string.Empty, new CallOptions(), new Empty());

Assert.NotNull(resp.Gid);
}

[Fact]
public async Task DtmTimoutTest()
{
var provider = ITTestHelper.AddDtmGrpc(1);
var client = provider.GetRequiredService<IDtmgRPCClient>();
var ex = await Assert.ThrowsAsync<RpcException>(async () => await client.GenGid(ITTestHelper.DTMgRPCUrl));
Assert.Equal(StatusCode.DeadlineExceeded, ex.StatusCode);
}
}
}
3 changes: 2 additions & 1 deletion tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ public static busi.BusiReq GenBusiReq(bool outFailed, bool inFailed)
};
}

public static ServiceProvider AddDtmGrpc()
public static ServiceProvider AddDtmGrpc(int dtmTimout = 10000)
{
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
var services = new ServiceCollection();
services.AddLogging();
services.AddDtmGrpc(x =>
{
x.DtmGrpcUrl = DTMgRPCUrl;
x.DtmTimeout = dtmTimout;
});

var provider = services.BuildServiceProvider();
Expand Down

0 comments on commit 25934e2

Please sign in to comment.