diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ab043b4b..d35f2aec 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -181,7 +181,7 @@ jobs: tls: ${{ matrix.tls }} mtls: ${{ matrix.mtls }} ext-csharp-version: ${{ needs.versionning.outputs.version }} - core-version: 0.19.3 + core-version: 0.20.0 - name: Setup hosts file run : echo -e "$(kubectl get svc ingress -n armonik -o jsonpath={.status.loadBalancer.ingress[0].ip})\tarmonik.local" | sudo tee -a /etc/hosts @@ -304,7 +304,7 @@ jobs: working-directory: ${{ github.workspace }}/infra type: localhost ext-csharp-version: ${{ needs.versionning.outputs.version }} - core-version: 0.19.3 + core-version: 0.20.0 - name: Run Test timeout-minutes: 20 diff --git a/Client/src/Common/Exceptions/ResultAbortedException.cs b/Client/src/Common/Exceptions/ResultAbortedException.cs new file mode 100644 index 00000000..521ca656 --- /dev/null +++ b/Client/src/Common/Exceptions/ResultAbortedException.cs @@ -0,0 +1,36 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-$CURRENT_YEAR$. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; + +namespace ArmoniK.DevelopmentKit.Client.Common.Exceptions; + +/// +/// Exception when result is aborted +/// +public class ResultAbortedException : Exception + +{ + /// + /// Initializes a new instance of the with the specified error message + /// + /// The error message + public ResultAbortedException(string message) + : base(message) + { + } +} diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 56d10215..b5602e86 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -1,13 +1,13 @@ // This file is part of the ArmoniK project -// +// // Copyright (C) ANEO, 2021-2023. All rights reserved. -// +// // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,6 @@ using System; using System.Collections.Generic; -using System.Data; using System.IO; using System.Linq; using System.Threading; @@ -25,6 +24,7 @@ using ArmoniK.Api.Client; using ArmoniK.Api.Common.Utils; using ArmoniK.Api.gRPC.V1; +using ArmoniK.Api.gRPC.V1.Events; using ArmoniK.Api.gRPC.V1.Results; using ArmoniK.Api.gRPC.V1.Sessions; using ArmoniK.Api.gRPC.V1.SortDirection; @@ -675,40 +675,15 @@ public byte[] GetResult(string taskId, ResultId = resultId, Session = SessionId.Id, }; - - Retry.WhileException(5, - 2000, - retry => - { - using var channel = ChannelPool.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - - Logger.LogDebug("Try {try} for {funcName}", - retry, - nameof(submitterService.WaitForAvailability)); - // TODO: replace with submitterService.TryGetResultStream() => Issue # - var availabilityReply = submitterService.WaitForAvailability(resultRequest, - cancellationToken: cancellationToken); - - switch (availabilityReply.TypeCase) - { - case AvailabilityReply.TypeOneofCase.None: - throw new Exception("Issue with Server !"); - case AvailabilityReply.TypeOneofCase.Ok: - break; - case AvailabilityReply.TypeOneofCase.Error: - throw new - ClientResultsException($"Result in Error - {resultId}\nMessage :\n{string.Join("Inner message:\n", availabilityReply.Error.Errors)}", - resultId); - case AvailabilityReply.TypeOneofCase.NotCompletedTask: - throw new DataException($"Result {resultId} was not yet completed"); - default: - throw new InvalidOperationException(); - } - }, - true, - typeof(IOException), - typeof(RpcException)); + using var channel = ChannelPool.GetChannel(); + var eventsClient = new Events.EventsClient(channel); + eventsClient.WaitForResultsAsync(SessionId.Id, + new List + { + resultId, + }, + cancellationToken) + .Wait(cancellationToken); return Retry.WhileException(5, 200, diff --git a/Client/src/Common/Submitter/EventsClientExt.cs b/Client/src/Common/Submitter/EventsClientExt.cs new file mode 100644 index 00000000..a8439a21 --- /dev/null +++ b/Client/src/Common/Submitter/EventsClientExt.cs @@ -0,0 +1,130 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.gRPC.V1; +using ArmoniK.Api.gRPC.V1.Events; +using ArmoniK.Api.gRPC.V1.Results; +using ArmoniK.DevelopmentKit.Client.Common.Exceptions; + +using Grpc.Core; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; + +internal static class EventsClientExt +{ + private static FiltersAnd ResultsFilter(string resultId) + => new() + { + And = + { + new FilterField + { + Field = new ResultField + { + ResultRawField = new ResultRawField + { + Field = ResultRawEnumField.ResultId, + }, + }, + FilterString = new FilterString + { + Operator = FilterStringOperator.Equal, + Value = resultId, + }, + }, + }, + }; + + internal static async Task WaitForResultsAsync(this Events.EventsClient client, + string sessionId, + ICollection resultIds, + CancellationToken cancellationToken) + { + var resultsNotFound = new HashSet(resultIds); + while (resultsNotFound.Any()) + { + using var streamingCall = client.GetEvents(new EventSubscriptionRequest + { + SessionId = sessionId, + ReturnedEvents = + { + EventsEnum.ResultStatusUpdate, + EventsEnum.NewResult, + }, + ResultsFilters = new Filters + { + Or = + { + resultsNotFound.Select(ResultsFilter), + }, + }, + }, + cancellationToken: cancellationToken); + try + { + while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) + { + var resp = streamingCall.ResponseStream.Current; + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId)) + { + if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } + + if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) + { + throw new ResultAbortedException($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); + } + } + + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId)) + { + if (resp.NewResult.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.NewResult.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } + + if (resp.NewResult.Status == ResultStatus.Aborted) + { + throw new ResultAbortedException($"Result {resp.NewResult.ResultId} has been aborted"); + } + } + } + } + catch (OperationCanceledException) + { + } + catch (RpcException) + { + } + } + } +}