Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Silo Metadata and Placement Filtering #9271

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Orleans.Core.Abstractions/Manifest/GrainProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public static class WellKnownGrainTypeProperties
/// </summary>
public const string PlacementStrategy = "placement-strategy";

/// <summary>
/// The name of the placement strategy for grains of this type.
/// </summary>
public const string PlacementFilter = "placement-filter";

/// <summary>
/// The directory policy for grains of this type.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using Orleans.Metadata;
using Orleans.Runtime;

#nullable enable
namespace Orleans.Placement;

/// <summary>
/// Base for all placement filter marker attributes.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public abstract class PlacementFilterAttribute : Attribute, IGrainPropertiesProviderAttribute
{
/// <summary>
/// Gets the placement filter strategy.
/// </summary>
public PlacementFilterStrategy PlacementFilterStrategy { get; private set; }

protected PlacementFilterAttribute(PlacementFilterStrategy placement)
{
ArgumentNullException.ThrowIfNull(placement);
PlacementFilterStrategy = placement;
}

/// <inheritdoc />
public virtual void Populate(IServiceProvider services, Type grainClass, GrainType grainType, Dictionary<string, string> properties)
=> PlacementFilterStrategy?.PopulateGrainProperties(services, grainClass, grainType, properties);
}
80 changes: 80 additions & 0 deletions src/Orleans.Core.Abstractions/Placement/PlacementFilterStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using Orleans.Metadata;
using Orleans.Runtime;

#nullable enable
namespace Orleans.Placement;

/// <summary>
/// Represents a strategy for filtering silos which a grain can be placed on.
/// </summary>
public abstract class PlacementFilterStrategy
{
public int Order { get; private set; }

protected PlacementFilterStrategy(int order)
{
Order = order;
}

/// <summary>
/// Initializes an instance of this type using the provided grain properties.
/// </summary>
/// <param name="properties">
/// The grain properties.
/// </param>
public void Initialize(GrainProperties properties)
{
var orderProperty = GetPlacementFilterGrainProperty("order", properties);
if (!int.TryParse(orderProperty, out var parsedOrder))
{
throw new ArgumentException("Invalid order property value.");
}

Order = parsedOrder;

AdditionalInitialize(properties);
}

public virtual void AdditionalInitialize(GrainProperties properties)
{
}

/// <summary>
/// Populates grain properties to specify the preferred placement strategy.
/// </summary>
/// <param name="services">The service provider.</param>
/// <param name="grainClass">The grain class.</param>
/// <param name="grainType">The grain type.</param>
/// <param name="properties">The grain properties which will be populated by this method call.</param>
public void PopulateGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, Dictionary<string, string> properties)
{
var typeName = GetType().Name;
if (properties.TryGetValue(WellKnownGrainTypeProperties.PlacementFilter, out var existingValue))
{
properties[WellKnownGrainTypeProperties.PlacementFilter] = $"{existingValue},{typeName}";
}
else
{
properties[WellKnownGrainTypeProperties.PlacementFilter] = typeName;
}

properties[$"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.order"] = Order.ToString(CultureInfo.InvariantCulture);

foreach (var additionalGrainProperty in GetAdditionalGrainProperties(services, grainClass, grainType, properties))
{
properties[$"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.{additionalGrainProperty.Key}"] = additionalGrainProperty.Value;
}
}

protected string? GetPlacementFilterGrainProperty(string key, GrainProperties properties)
{
var typeName = GetType().Name;
return properties.Properties.TryGetValue($"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.{key}", out var value) ? value : null;
}

protected virtual IEnumerable<KeyValuePair<string, string>> GetAdditionalGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, IReadOnlyDictionary<string, string> existingProperties)
=> Array.Empty<KeyValuePair<string, string>>();
}
11 changes: 11 additions & 0 deletions src/Orleans.Core/Placement/IPlacementFilterDirector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Collections.Generic;
using Orleans.Runtime;
using Orleans.Runtime.Placement;

#nullable enable
namespace Orleans.Placement;

public interface IPlacementFilterDirector
{
IEnumerable<SiloAddress> Filter(PlacementFilterStrategy filterStrategy, PlacementFilterContext context, IEnumerable<SiloAddress> silos);
}
6 changes: 6 additions & 0 deletions src/Orleans.Core/Placement/PlacementFilterContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
using Orleans.Runtime;

#nullable enable
namespace Orleans.Placement;

public readonly record struct PlacementFilterContext(GrainType GrainType, GrainInterfaceType InterfaceType, ushort InterfaceVersion);
26 changes: 26 additions & 0 deletions src/Orleans.Core/Placement/PlacementFilterExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Microsoft.Extensions.DependencyInjection;

#nullable enable
namespace Orleans.Placement;

public static class PlacementFilterExtensions
{
/// <summary>
/// Configures a <typeparamref name="TFilter"/> for filtering candidate grain placements.
/// </summary>
/// <typeparam name="TFilter">The placement filter.</typeparam>
/// <typeparam name="TDirector">The placement filter director.</typeparam>
/// <param name="services">The service collection.</param>
/// <param name="strategyLifetime">The lifetime of the placement strategy.</param>
/// <returns>The service collection.</returns>
public static IServiceCollection AddPlacementFilter<TFilter, TDirector>(this IServiceCollection services, ServiceLifetime strategyLifetime)
where TFilter : PlacementFilterStrategy
where TDirector : class, IPlacementFilterDirector
{
services.Add(ServiceDescriptor.DescribeKeyed(typeof(PlacementFilterStrategy), typeof(TFilter).Name, typeof(TFilter), strategyLifetime));
services.AddKeyedSingleton<IPlacementFilterDirector, TDirector>(typeof(TFilter));

return services;
}

}
8 changes: 5 additions & 3 deletions src/Orleans.Core/Runtime/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal static class Constants
public static readonly GrainType DirectoryCacheValidatorType = SystemTargetGrainId.CreateGrainType("dir.cache-validator");
public static readonly GrainType ClientDirectoryType = SystemTargetGrainId.CreateGrainType("dir.client");
public static readonly GrainType SiloControlType = SystemTargetGrainId.CreateGrainType("silo-control");
public static readonly GrainType SiloMetadataType = SystemTargetGrainId.CreateGrainType("silo-metadata");
public static readonly GrainType CatalogType = SystemTargetGrainId.CreateGrainType("catalog");
public static readonly GrainType MembershipServiceType = SystemTargetGrainId.CreateGrainType("clustering");
public static readonly GrainType SystemMembershipTableType = SystemTargetGrainId.CreateGrainType("clustering.dev");
Expand All @@ -27,8 +28,8 @@ internal static class Constants
public static readonly GrainType ActivationMigratorType = SystemTargetGrainId.CreateGrainType("migrator");
public static readonly GrainType ActivationRepartitionerType = SystemTargetGrainId.CreateGrainType("repartitioner");
public static readonly GrainType ActivationRebalancerMonitorType = SystemTargetGrainId.CreateGrainType("rebalancer-monitor");
public static readonly GrainType GrainDirectoryPartition = SystemTargetGrainId.CreateGrainType("dir.grain.part");
public static readonly GrainType GrainDirectory = SystemTargetGrainId.CreateGrainType("dir.grain");
public static readonly GrainType GrainDirectoryPartitionType = SystemTargetGrainId.CreateGrainType("dir.grain.part");
public static readonly GrainType GrainDirectoryType = SystemTargetGrainId.CreateGrainType("dir.grain");

public static readonly GrainId SiloDirectConnectionId = GrainId.Create(
GrainType.Create(GrainTypePrefix.SystemPrefix + "silo"),
Expand All @@ -41,6 +42,7 @@ internal static class Constants
{DirectoryServiceType, "DirectoryService"},
{DirectoryCacheValidatorType, "DirectoryCacheValidator"},
{SiloControlType, "SiloControl"},
{SiloMetadataType, "SiloMetadata"},
{ClientDirectoryType, "ClientDirectory"},
{CatalogType,"Catalog"},
{MembershipServiceType,"MembershipService"},
Expand All @@ -57,7 +59,7 @@ internal static class Constants
{ActivationMigratorType, "ActivationMigrator"},
{ActivationRepartitionerType, "ActivationRepartitioner"},
{ActivationRebalancerMonitorType, "ActivationRebalancerMonitor"},
{GrainDirectory, "GrainDirectory"},
{GrainDirectoryType, "GrainDirectory"},
}.ToFrozenDictionary();

public static string SystemTargetName(GrainType id) => SingletonSystemTargetNames.TryGetValue(id, out var name) ? name : id.ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public DistributedGrainDirectory(
ILocalSiloDetails localSiloDetails,
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider,
IInternalGrainFactory grainFactory) : base(Constants.GrainDirectory, localSiloDetails.SiloAddress, loggerFactory)
IInternalGrainFactory grainFactory) : base(Constants.GrainDirectoryType, localSiloDetails.SiloAddress, loggerFactory)
{
_serviceProvider = serviceProvider;
_membershipService = membershipService;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal sealed partial class GrainDirectoryPartition(
IInternalGrainFactory grainFactory)
: SystemTarget(CreateGrainId(localSiloDetails.SiloAddress, partitionIndex), localSiloDetails.SiloAddress, loggerFactory), IGrainDirectoryPartition, IGrainDirectoryTestHooks
{
internal static SystemTargetGrainId CreateGrainId(SiloAddress siloAddress, int partitionIndex) => SystemTargetGrainId.Create(Constants.GrainDirectoryPartition, siloAddress, partitionIndex.ToString(CultureInfo.InvariantCulture));
internal static SystemTargetGrainId CreateGrainId(SiloAddress siloAddress, int partitionIndex) => SystemTargetGrainId.Create(Constants.GrainDirectoryPartitionType, siloAddress, partitionIndex.ToString(CultureInfo.InvariantCulture));
private readonly Dictionary<GrainId, GrainAddress> _directory = [];
private readonly int _partitionIndex = partitionIndex;
private readonly DistributedGrainDirectory _owner = owner;
Expand Down Expand Up @@ -665,7 +665,7 @@ private async IAsyncEnumerable<List<GrainAddress>> GetRegisteredActivations(Dire
async Task<List<GrainAddress>> GetRegisteredActivationsFromClusterMember(MembershipVersion version, RingRange range, SiloAddress siloAddress, bool isValidation)
{
var stopwatch = ValueStopwatch.StartNew();
var client = _grainFactory.GetSystemTarget<IGrainDirectoryClient>(Constants.GrainDirectory, siloAddress);
var client = _grainFactory.GetSystemTarget<IGrainDirectoryClient>(Constants.GrainDirectoryType, siloAddress);
var result = await InvokeOnClusterMember(
siloAddress,
async () =>
Expand Down
7 changes: 5 additions & 2 deletions src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@
using Orleans.Serialization.Internal;
using Orleans.Core;
using Orleans.Placement.Repartitioning;
using Orleans.GrainDirectory;
using Orleans.Runtime.Hosting;
using Orleans.Runtime.Placement.Filtering;

namespace Orleans.Hosting
{
Expand Down Expand Up @@ -206,6 +205,10 @@ internal static void AddDefaultServices(ISiloBuilder builder)
// Configure the default placement strategy.
services.TryAddSingleton<PlacementStrategy, RandomPlacement>();

// Placement filters
services.AddSingleton<PlacementFilterStrategyResolver>();
services.AddSingleton<PlacementFilterDirectorResolver>();

// Placement directors
services.AddPlacementDirector<RandomPlacement, RandomPlacementDirector>();
services.AddPlacementDirector<PreferLocalPlacement, PreferLocalPlacementDirector>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#nullable enable

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public interface ISiloMetadataCache
{
SiloMetadata GetSiloMetadata(SiloAddress siloAddress);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading.Tasks;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

internal interface ISiloMetadataClient
{
Task<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Threading.Tasks;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

[Alias("Orleans.Runtime.MembershipService.SiloMetadata.ISiloMetadataSystemTarget")]
internal interface ISiloMetadataSystemTarget : ISystemTarget
{
[Alias("GetSiloMetadata")]
Task<SiloMetadata> GetSiloMetadata();
}
105 changes: 105 additions & 0 deletions src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadaCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

internal class SiloMetadataCache(
ISiloMetadataClient siloMetadataClient,
MembershipTableManager membershipTableManager,
ILogger<SiloMetadataCache> logger)
: ISiloMetadataCache, ILifecycleParticipant<ISiloLifecycle>, IDisposable
{
private readonly ConcurrentDictionary<SiloAddress, SiloMetadata> _metadata = new();
private readonly CancellationTokenSource _cts = new();

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
Task? task = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This previous code was copied from DistributedGrainDirectory. Should there be a follow up issue to update usages of the prior pattern with this one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should clean up the other instances.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #9360

Task OnStart(CancellationToken _)
{
task = Task.Run(() => this.ProcessMembershipUpdates(_cts.Token));
return Task.CompletedTask;
}

async Task OnStop(CancellationToken ct)
{
await _cts.CancelAsync().ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
if (task is not null)
{
await task.WaitAsync(ct).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
}

lifecycle.Subscribe(
nameof(ClusterMembershipService),
ServiceLifecycleStage.RuntimeServices,
OnStart,
OnStop);
}

private async Task ProcessMembershipUpdates(CancellationToken ct)
{
try
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Starting to process membership updates.");
await foreach (var update in membershipTableManager.MembershipTableUpdates.WithCancellation(ct))
{
// Add entries for members that aren't already in the cache
foreach (var membershipEntry in update.Entries.Where(e => e.Value.Status is SiloStatus.Active or SiloStatus.Joining))
{
if (!_metadata.ContainsKey(membershipEntry.Key))
{
try
{
var metadata = await siloMetadataClient.GetSiloMetadata(membershipEntry.Key).WaitAsync(ct);
_metadata.TryAdd(membershipEntry.Key, metadata);
}
catch(Exception exception)
{
logger.LogError(exception, "Error fetching metadata for silo {Silo}", membershipEntry.Key);
}
}
}

// Remove entries for members that are now dead
foreach (var membershipEntry in update.Entries.Where(e => e.Value.Status == SiloStatus.Dead))
{
_metadata.TryRemove(membershipEntry.Key, out _);
}

// Remove entries for members that are no longer in the table
foreach (var silo in _metadata.Keys.ToList())
{
if (!update.Entries.ContainsKey(silo))
{
_metadata.TryRemove(silo, out _);
}
}
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Ignore and continue shutting down.
}
catch (Exception exception)
{
logger.LogError(exception, "Error processing membership updates");
}
finally
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Stopping membership update processor");
}
}

public SiloMetadata GetSiloMetadata(SiloAddress siloAddress) => _metadata.GetValueOrDefault(siloAddress) ?? SiloMetadata.Empty;

public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata) => _metadata.TryAdd(siloAddress, metadata);

public void Dispose() => _cts.Cancel();
}
Loading
Loading