Skip to content

Commit

Permalink
fix: properly implement IHealthCheckProvider.Check in MongoDB adapter (
Browse files Browse the repository at this point in the history
…#825)

# Motivation

Give more meaningful information when healthcheck fails due to issues
with checks on the mongodb tables.

# Description

- Put a meaningful description in HealthCheckResult.Unhealthy() from the
Mongo Adapter
- Reimplement the Mongo Adapter checks so that they run the checks of
their dependencies and report which dependency failed.

# Testing

- Unit tests and CI tests pass successfully (locally too).
- Calling HealthChecksService should succeed. The following command can
be used.

```bash
docker run --net armonik_network fullstorydev/grpcurl -plaintext armonik.control.submitter:1080 armonik.api.grpc.v1.health_checks.HealthChecksService.CheckHealth
``` 

Output should look like:

```json
{
  "services": [
    {
      "name": "database",
      "healthy": "HEALTH_STATUS_ENUM_HEALTHY"
    },
    {
      "name": "object",
      "healthy": "HEALTH_STATUS_ENUM_HEALTHY"
    },
    {
      "name": "queue",
      "healthy": "HEALTH_STATUS_ENUM_HEALTHY"
    }
  ]
}
```

# Impact

- Easier to debug when there is an issue.


# Checklist

- [x] My code adheres to the coding and style guidelines of the project.
- [x] I have performed a self-review of my code.
- [ ] I have commented my code, particularly in hard-to-understand
areas.
- [x] I have made corresponding changes to the documentation.
- [x] I have thoroughly tested my modifications and added tests when
necessary.
- [x] Tests pass locally and in the CI.
- [x] I have assessed the performance impact of my modifications.
  • Loading branch information
aneojgurhem authored Jan 24, 2025
2 parents 42177b0 + 32d6893 commit 85821fb
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 85 deletions.
19 changes: 15 additions & 4 deletions Adaptors/MongoDB/src/AuthenticationTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using ArmoniK.Core.Adapters.MongoDB.Table.DataModel.Auth;
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Auth.Authentication;
using ArmoniK.Core.Utils;

using JetBrains.Annotations;

Expand Down Expand Up @@ -137,10 +138,20 @@ public void AddCertificates(IEnumerable<AuthData> certificates)
}

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy());
public async Task<HealthCheckResult> Check(HealthCheckTag tag)
{
var result = await HealthCheckResultCombiner.Combine(tag,
$"{nameof(AuthenticationTable)} is not initialized",
sessionProvider_,
userCollectionProvider_,
authCollectionProvider_,
roleCollectionProvider_)
.ConfigureAwait(false);

return isInitialized_ && result.Status == HealthStatus.Healthy
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy(result.Description);
}

/// <inheritdoc />
public async Task Init(CancellationToken cancellationToken)
Expand Down
7 changes: 4 additions & 3 deletions Adaptors/MongoDB/src/Common/MongoCollectionProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)
{
HealthCheckTag.Startup or HealthCheckTag.Readiness => Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("MongoCollection not initialized yet.")),
HealthCheckTag.Liveness => Task.FromResult(isInitialized_ && mongoCollection_ is null
: HealthCheckResult
.Unhealthy($"Mongo Collection<{typeof(TData)}> not initialized yet.")),
HealthCheckTag.Liveness => Task.FromResult(isInitialized_ && mongoCollection_ is not null
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("MongoCollection not initialized yet.")),
: HealthCheckResult.Unhealthy($"Mongo Collection<{typeof(TData)}> not initialized yet.")),
_ => throw new ArgumentOutOfRangeException(nameof(tag),
tag,
null),
Expand Down
19 changes: 15 additions & 4 deletions Adaptors/MongoDB/src/PartitionTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.Storage;
using ArmoniK.Core.Utils;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -59,11 +60,21 @@ public PartitionTable(SessionProvider
activitySource_ = activitySource;
}

public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy());
/// <inheritdoc />
public async Task<HealthCheckResult> Check(HealthCheckTag tag)
{
var result = await HealthCheckResultCombiner.Combine(tag,
$"{nameof(PartitionTable)} is not initialized",
sessionProvider_,
partitionCollectionProvider_)
.ConfigureAwait(false);

return isInitialized_ && result.Status == HealthStatus.Healthy
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy(result.Description);
}

/// <inheritdoc />
public async Task Init(CancellationToken cancellationToken)
{
if (!isInitialized_)
Expand Down
17 changes: 13 additions & 4 deletions Adaptors/MongoDB/src/ResultTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using ArmoniK.Core.Base.Exceptions;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.Storage;
using ArmoniK.Core.Utils;
using ArmoniK.Utils;

using Microsoft.Extensions.Diagnostics.HealthChecks;
Expand Down Expand Up @@ -391,8 +392,16 @@ await resultCollectionProvider_.Init(cancellationToken)
public ILogger Logger { get; }

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy());
public async Task<HealthCheckResult> Check(HealthCheckTag tag)
{
var result = await HealthCheckResultCombiner.Combine(tag,
$"{nameof(ResultTable)} is not initialized",
sessionProvider_,
resultCollectionProvider_)
.ConfigureAwait(false);

return isInitialized_ && result.Status == HealthStatus.Healthy
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy(result.Description);
}
}
17 changes: 13 additions & 4 deletions Adaptors/MongoDB/src/ResultWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Storage;
using ArmoniK.Core.Common.Storage.Events;
using ArmoniK.Core.Utils;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -65,10 +66,18 @@ public ResultWatcher(SessionProvider ses
}

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy());
public async Task<HealthCheckResult> Check(HealthCheckTag tag)
{
var result = await HealthCheckResultCombiner.Combine(tag,
$"{nameof(ResultWatcher)} is not initialized",
sessionProvider_,
resultCollectionProvider_)
.ConfigureAwait(false);

return isInitialized_ && result.Status == HealthStatus.Healthy
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy(result.Description);
}

/// <inheritdoc />
public async Task Init(CancellationToken cancellationToken)
Expand Down
17 changes: 13 additions & 4 deletions Adaptors/MongoDB/src/SessionTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using ArmoniK.Core.Adapters.MongoDB.Table.DataModel;
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Storage;
using ArmoniK.Core.Utils;
using ArmoniK.Utils;

using JetBrains.Annotations;
Expand Down Expand Up @@ -179,10 +180,18 @@ await sessionCollectionProvider_.Init(cancellationToken)
}

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy());
public async Task<HealthCheckResult> Check(HealthCheckTag tag)
{
var result = await HealthCheckResultCombiner.Combine(tag,
$"{nameof(SessionTable)} is not initialized",
sessionProvider_,
sessionCollectionProvider_)
.ConfigureAwait(false);

return isInitialized_ && result.Status == HealthStatus.Healthy
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy(result.Description);
}

/// <inheritdoc />
public async Task<SessionData?> UpdateOneSessionAsync(string sessionId,
Expand Down
17 changes: 13 additions & 4 deletions Adaptors/MongoDB/src/TaskTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using ArmoniK.Core.Base.Exceptions;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.Storage;
using ArmoniK.Core.Utils;
using ArmoniK.Utils;

using Microsoft.Extensions.Diagnostics.HealthChecks;
Expand Down Expand Up @@ -435,10 +436,18 @@ await taskCollection.UpdateManyAsync(data => taskIds.Contains(data.TaskId),
public ILogger Logger { get; }

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy());
public async Task<HealthCheckResult> Check(HealthCheckTag tag)
{
var result = await HealthCheckResultCombiner.Combine(tag,
$"{nameof(TaskTable)} is not initialized",
sessionProvider_,
taskCollectionProvider_)
.ConfigureAwait(false);

return isInitialized_ && result.Status == HealthStatus.Healthy
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy(result.Description);
}

/// <inheritdoc />
public async Task Init(CancellationToken cancellationToken)
Expand Down
17 changes: 13 additions & 4 deletions Adaptors/MongoDB/src/TaskWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Storage;
using ArmoniK.Core.Common.Storage.Events;
using ArmoniK.Core.Utils;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -65,10 +66,18 @@ public TaskWatcher(SessionProvider sessi
}

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy());
public async Task<HealthCheckResult> Check(HealthCheckTag tag)
{
var result = await HealthCheckResultCombiner.Combine(tag,
$"{nameof(TaskWatcher)} is not initialized",
sessionProvider_,
taskCollectionProvider_)
.ConfigureAwait(false);

return isInitialized_ && result.Status == HealthStatus.Healthy
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy(result.Description);
}

/// <inheritdoc />
public async Task Init(CancellationToken cancellationToken)
Expand Down
65 changes: 14 additions & 51 deletions Common/src/Pollster/Pollster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -36,6 +35,7 @@
using ArmoniK.Core.Common.Storage;
using ArmoniK.Core.Common.Stream.Worker;
using ArmoniK.Core.Common.Utils;
using ArmoniK.Core.Utils;
using ArmoniK.Utils;

using Grpc.Core;
Expand Down Expand Up @@ -180,56 +180,19 @@ public async Task<HealthCheckResult> Check(HealthCheckTag tag)
return HealthCheckResult.Unhealthy("End of main loop reached, no more tasks will be executed.");
}

var checks = new List<Task<HealthCheckResult>>
{
pullQueueStorage_.Check(tag),
dataPrefetcher_.Check(tag),
workerStreamHandler_.Check(tag),
objectStorage_.Check(tag),
resultTable_.Check(tag),
sessionTable_.Check(tag),
taskTable_.Check(tag),
};

var exceptions = new List<Exception>();
var data = new Dictionary<string, object>();
var description = new StringBuilder();
var worstStatus = HealthStatus.Healthy;

foreach (var healthCheckResult in await checks.WhenAll()
.ConfigureAwait(false))
{
if (healthCheckResult.Status == HealthStatus.Healthy)
{
continue;
}

if (healthCheckResult.Exception is not null)
{
exceptions.Add(healthCheckResult.Exception);
}

foreach (var (key, value) in healthCheckResult.Data)
{
data[key] = value;
}

if (healthCheckResult.Description is not null)
{
description.AppendLine(healthCheckResult.Description);
}

worstStatus = worstStatus < healthCheckResult.Status
? worstStatus
: healthCheckResult.Status;
}

var result = new HealthCheckResult(worstStatus,
description.ToString(),
new AggregateException(exceptions),
data);

if (worstStatus == HealthStatus.Unhealthy && tag == HealthCheckTag.Liveness)
// no need for description because this check is registered as the agent health check and it will add proper metadata.
var result = await HealthCheckResultCombiner.Combine(tag,
string.Empty,
pullQueueStorage_,
dataPrefetcher_,
workerStreamHandler_,
objectStorage_,
resultTable_,
sessionTable_,
taskTable_)
.ConfigureAwait(false);

if (result.Status == HealthStatus.Unhealthy && tag == HealthCheckTag.Liveness)
{
healthCheckFailedResult_ = result;
}
Expand Down
5 changes: 2 additions & 3 deletions Common/tests/Pollster/PollsterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,8 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)

Console.WriteLine(res.Description);

Assert.AreEqual(new StringBuilder().AppendLine(desc)
.ToString(),
healthResult.Description);
Assert.AreEqual(desc,
healthResult.Description?.Trim());
Assert.AreEqual(new AggregateException(ex).Message,
healthResult.Exception?.Message);
Assert.AreEqual(HealthStatus.Unhealthy,
Expand Down
Loading

0 comments on commit 85821fb

Please sign in to comment.