Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Silo metadata filtering #1

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Orleans.Core.Abstractions/Manifest/GrainProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public static class WellKnownGrainTypeProperties
/// </summary>
public const string PlacementStrategy = "placement-strategy";

/// <summary>
/// The name of the placement strategy for grains of this type.
/// </summary>
public const string PlacementFilter = "placement-filter";

/// <summary>
/// The directory policy for grains of this type.
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions src/Orleans.Core.Abstractions/Placement/PlacementAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ public ResourceOptimizedPlacementAttribute() :
{ }
}

/// <summary>
/// Marks a grain class as using the <see cref="SiloMetadataPlacement"/> policy.
/// </summary>
/// <inheritdoc cref="SiloMetadataPlacement"/>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class SiloMetadataPlacementAttribute : PlacementAttribute
{
public SiloMetadataPlacementAttribute() :
base(SiloMetadataPlacement.Singleton)
{ }
}

/// <summary>
/// Ensures that activations of this grain type will not be migrated automatically.
/// </summary>
Expand Down
14 changes: 14 additions & 0 deletions src/Orleans.Core.Abstractions/Placement/SiloMetadataPlacement.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Orleans.Runtime;

/// <summary>
/// A placement strategy which prefers placement on silos with matching metadata.
/// </summary>
/// <remarks>
/// <para>TODO: fill out</para>
/// <para>Silos which are overloaded by definition of the load shedding mechanism are not considered as candidates for new placements.</para>
/// <para><i>This placement strategy is configured by adding the <see cref="Placement.SiloMetadataPlacementAttribute"/> attribute to a grain.</i></para>
/// </remarks>
public sealed class SiloMetadataPlacement : PlacementStrategy
{
internal static readonly SiloMetadataPlacement Singleton = new();
}
12 changes: 12 additions & 0 deletions src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
using Orleans.Core;
using Orleans.Placement.Repartitioning;
using Orleans.GrainDirectory;
using Orleans.Placement;
using Orleans.Runtime.Hosting;
using Orleans.Runtime.MembershipService.SiloMetadata;

namespace Orleans.Hosting
{
Expand Down Expand Up @@ -201,6 +203,9 @@ internal static void AddDefaultServices(ISiloBuilder builder)
services.AddSingleton<PlacementService>();
services.AddSingleton<PlacementStrategyResolver>();
services.AddSingleton<PlacementDirectorResolver>();
services.AddSingleton<PlacementFilterResolver>();
services.AddSingleton<FilterDirectorResolver>();
services.AddSingleton<SiloMetadataCache>();
services.AddSingleton<IPlacementStrategyResolver, ClientObserverPlacementStrategyResolver>();

// Configure the default placement strategy.
Expand All @@ -215,6 +220,13 @@ internal static void AddDefaultServices(ISiloBuilder builder)
services.AddPlacementDirector<ClientObserversPlacement, ClientObserversPlacementDirector>();
services.AddPlacementDirector<SiloRoleBasedPlacement, SiloRoleBasedPlacementDirector>();
services.AddPlacementDirector<ResourceOptimizedPlacement, ResourceOptimizedPlacementDirector>();
services.AddSingleton<ResourceOptimizedPlacementLogic>();
services.AddPlacementDirector<SiloMetadataPlacement, SiloMetadataPlacementDirector>();
services.AddSingleton<SiloStatisticsCache>();

// Placement filters
services.AddPlacementFilter<PreferredSiloMetadataPlacementFilter, PreferredSiloMetadataFilterDirector>(ServiceLifetime.Transient);
services.AddPlacementFilter<RequiredSiloMetadataPlacementFilter, RequiredSiloMetadataFilterDirector>(ServiceLifetime.Transient);

// Versioning
services.TryAddSingleton<VersionSelectorManager>();
Expand Down
17 changes: 17 additions & 0 deletions src/Orleans.Runtime/Hosting/PlacementStrategyExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Placement;
using Orleans.Runtime;
using Orleans.Runtime.Placement;

Expand Down Expand Up @@ -64,6 +65,22 @@ public static void AddPlacementDirector<TStrategy, TDirector>(this IServiceColle
services.AddKeyedSingleton<IPlacementDirector, TDirector>(typeof(TStrategy));
}


/// <summary>
/// Configures a <typeparamref name="TFilter"/> for filtering candidate grain placements.
/// </summary>
/// <typeparam name="TFilter">The placement filter.</typeparam>
/// <param name="services">The service collection.</param>
/// <param name="strategyLifetime">The lifetime of the placement strategy.</param>
/// <returns>The service collection.</returns>
public static void AddPlacementFilter<TFilter, TDirector>(this IServiceCollection services, ServiceLifetime strategyLifetime)
where TFilter : PlacementFilter
where TDirector : class, IFilterDirector
{
services.Add(ServiceDescriptor.DescribeKeyed(typeof(PlacementFilter), typeof(TFilter).Name, typeof(TFilter), strategyLifetime));
services.AddKeyedSingleton<IFilterDirector, TDirector>(typeof(TFilter));
}

/// <summary>
/// Adds a placement director.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public class SiloMetadataCacheOptions
{
public TimeSpan CachePurgeInterval { get; set; } = TimeSpan.FromMinutes(10);
}

public class SiloMetadataCache : IDisposable
{
private readonly IClusterClient _clusterClient;
private readonly ILogger<SiloMetadataCache> _logger;
private readonly IOptions<SiloMetadataCacheOptions> _cacheOptions;
private readonly ConcurrentDictionary<SiloAddress, SiloMetadata> _metadata = new();
private readonly CancellationTokenSource _cts = new();
private PeriodicTimer _pt;

public SiloMetadataCache(IClusterClient clusterClient, ILogger<SiloMetadataCache> logger, IOptions<SiloMetadataCacheOptions> cacheOptions)
{
_clusterClient = clusterClient;
_logger = logger;
_cacheOptions = cacheOptions;
PurgeMetadataCache().Ignore();
}

private async Task PurgeMetadataCache()
{
_pt = new PeriodicTimer(_cacheOptions.Value.CachePurgeInterval);
while (!_cts.IsCancellationRequested)
{
await _pt.WaitForNextTickAsync(_cts.Token);

Dictionary<SiloAddress, SiloStatus> activeSilos;
try
{
var managementClient = _clusterClient.GetGrain<IManagementGrain>(0);
activeSilos = await managementClient.GetHosts(true);
}
catch(Exception ex)
{
_logger.LogWarning(ex, "Failed to get active silos to purge metadata cache. This won't impact the data correctness but there may be extra metadata in memory from defunct silos.");
continue;
}

foreach (var silo in _metadata.Keys.ToList())
{
if (!activeSilos.ContainsKey(silo))
{
_metadata.TryRemove(silo, out _);
}
}
}
}

public SiloMetadata GetMetadata(SiloAddress siloAddress) => _metadata.GetValueOrDefault(siloAddress);

public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata) => _metadata.TryAdd(siloAddress, metadata);

public void Dispose()
{
_cts.Cancel();
_pt.Dispose();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Threading.Tasks;
using Orleans.Runtime.Services;
using Orleans.Services;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public interface ISiloMetadataClient : IGrainServiceClient<ISiloMetadataGrainService>
{
public ValueTask<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress);
}

public class SiloMetadataClient : GrainServiceClient<ISiloMetadataGrainService>, ISiloMetadataClient
{
private readonly SiloMetadataCache _cache;

public SiloMetadataClient(SiloMetadataCache cache, IServiceProvider serviceProvider) : base(serviceProvider)
{
_cache = cache;
}

public ValueTask<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress)
{
var cached = _cache.GetMetadata(siloAddress);
if (cached is not null)
{
return ValueTask.FromResult(cached);
}

return new ValueTask<SiloMetadata>(SlowGetSiloMetadata(siloAddress));
}

private async Task<SiloMetadata> SlowGetSiloMetadata(SiloAddress siloAddress)
{
var grainService = GetGrainService(siloAddress);
var metadata = await grainService.GetSiloMetadata();
_cache.SetMetadata(siloAddress, metadata);
return metadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Services;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

[GenerateSerializer]
public record SiloMetadata
{
[Id(0)]
public Dictionary<string, string> Metadata { get; init; } = new();
}

public interface ISiloMetadataGrainService : IGrainService
{
Task<SiloMetadata> GetSiloMetadata();
}

public class SiloMetadataGrainService : GrainService, ISiloMetadataGrainService
{
private readonly SiloMetadata _siloMetadata;

public SiloMetadataGrainService(IOptions<SiloMetadata> siloMetadata) : base()
{
_siloMetadata = siloMetadata.Value;
}

public SiloMetadataGrainService(IOptions<SiloMetadata> siloMetadata, GrainId grainId, Silo silo, ILoggerFactory loggerFactory) : base(grainId, silo, loggerFactory)
{
_siloMetadata = siloMetadata.Value;
}

public Task<SiloMetadata> GetSiloMetadata()
{
return Task.FromResult(_siloMetadata);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Hosting;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public static class SiloMetadataHostingExtensions
{
public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder)
{
return builder.UseSiloMetadata(builder.Configuration);
}

public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, IConfiguration configuration)
{
// Get the ORLEANS__METADATA section from config
// Key/value pairs in configuration as a Dictionary <string, string> will look like this as environment variables:
// ORLEANS__METADATA__key1=value1
var metadataConfigSection = builder.Configuration.GetSection("ORLEANS").GetSection("METADATA");

return builder.UseSiloMetadata(metadataConfigSection);
}

public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, IConfigurationSection configurationSection)
{
var dictionary = configurationSection.Get<Dictionary<string, string>>();

return builder.UseSiloMetadata(dictionary ?? new Dictionary<string, string>());
}

public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, Dictionary<string, string> metadata)
{
builder.ConfigureServices(services =>
{
services
.AddOptionsWithValidateOnStart<global::Orleans.Runtime.MembershipService.SiloMetadata.SiloMetadata>()
.Configure(m =>
{
foreach (var data in metadata)
{
m.Metadata[data.Key] = data.Value;
}
});

services.AddOptionsWithValidateOnStart<SiloMetadataCacheOptions>();

services
.AddSingleton<SiloMetadataCache>()
.AddGrainService<SiloMetadataGrainService>()
.AddSingleton<ISiloMetadataClient, SiloMetadataClient>()
;
});
return builder;
}
}
20 changes: 20 additions & 0 deletions src/Orleans.Runtime/Placement/FilterDirectorResolver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Placement;

namespace Orleans.Runtime.Placement;

/// <summary>
/// Responsible for resolving an <see cref="IFilterDirector"/> for a <see cref="PlacementFilter"/>.
/// </summary>
public sealed class FilterDirectorResolver
{
private readonly IServiceProvider _services;

public FilterDirectorResolver(IServiceProvider services)
{
_services = services;
}

public IFilterDirector GetFilterDirector(PlacementFilter placementFilter) => _services.GetRequiredKeyedService<IFilterDirector>(placementFilter.GetType());
}
52 changes: 52 additions & 0 deletions src/Orleans.Runtime/Placement/Filtering/PlacementFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
using Orleans.Metadata;

namespace Orleans.Runtime;

public abstract class PlacementFilter
{
/// <summary>
/// Initializes an instance of this type using the provided grain properties.
/// </summary>
/// <param name="properties">
/// The grain properties.
/// </param>
public virtual void Initialize(GrainProperties properties)
{
}

/// <summary>
/// Populates grain properties to specify the preferred placement strategy.
/// </summary>
/// <param name="services">The service provider.</param>
/// <param name="grainClass">The grain class.</param>
/// <param name="grainType">The grain type.</param>
/// <param name="properties">The grain properties which will be populated by this method call.</param>
public void PopulateGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, Dictionary<string, string> properties)
{
var typeName = GetType().Name;
if (properties.TryGetValue(WellKnownGrainTypeProperties.PlacementFilter, out var existingValue))
{
properties[WellKnownGrainTypeProperties.PlacementFilter] = $"{existingValue},{typeName}";
}
else
{
properties[WellKnownGrainTypeProperties.PlacementFilter] = typeName;
}

foreach (var additionalGrainProperty in GetAdditionalGrainProperties(services, grainClass, grainType, properties))
{
properties["placement-filter." + typeName + "." + additionalGrainProperty.Key] = additionalGrainProperty.Value;
}
}

protected string GetPlacementFilterGrainProperty(string key, GrainProperties properties)
{
var typeName = GetType().Name;
return properties.Properties.TryGetValue("placement-filter." + typeName + "." + key, out var value) ? value : null;
}

protected virtual IEnumerable<KeyValuePair<string, string>> GetAdditionalGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, IReadOnlyDictionary<string, string> existingProperties)
=> Array.Empty<KeyValuePair<string, string>>();
}
Loading