Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support RequestPolicy #247

Open
wants to merge 67 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
e80ff1c
start working
shacharPash Jan 30, 2024
d05b4a5
Merge branch 'master' into ClusterSupport2
shacharPash Feb 5, 2024
886b4a6
Merge remote-tracking branch 'origin/master' into ClusterSupport2
shacharPash Feb 13, 2024
1ef32ad
add lcov.net8.0.info to gitignor
shacharPash Feb 14, 2024
8c36efd
Execute with RequestPolicy
shacharPash Feb 15, 2024
f6729ab
start adding RequestPolicys
shacharPash Feb 15, 2024
8cfb4ac
fix infinity loop
shacharPash Feb 15, 2024
502757c
try deleting request policy
shacharPash Feb 15, 2024
b0aa0d1
Fix SerializedCommand Creation with RequestPolicy
shacharPash Feb 15, 2024
47ea11d
add RequestPolicy.AllShards to ft.Create
shacharPash Feb 15, 2024
c585b66
Execute via server
shacharPash Feb 21, 2024
84b679f
Merge branch 'master' into ClusterSupport2
shacharPash Feb 25, 2024
07347e8
format
shacharPash Mar 3, 2024
602838f
use normal execute when redis is not enterprise
shacharPash Mar 4, 2024
d0ec96a
Skip if Oss cluster
shacharPash Mar 4, 2024
1cc477f
add RequestPolicy.AllShards to JSON.MGET
shacharPash Mar 4, 2024
f55f06f
Merge branch 'master' into ClusterSupport2
shacharPash Mar 5, 2024
2aa0f06
Merge branch 'master' into ClusterSupport2
shacharPash Mar 6, 2024
7986883
Merge branch 'master' into ClusterSupport2
shacharPash Mar 12, 2024
0822edb
Merge branch 'master' into ClusterSupport2
shacharPash Mar 19, 2024
ca9393f
fix compilation error
shacharPash Mar 19, 2024
6ce424a
add RequestPolicy to search commands
shacharPash Mar 19, 2024
bc02f46
add allshards to flushall in the tests
shacharPash Mar 19, 2024
fe9c880
support FlushAll
shacharPash Mar 19, 2024
f03a557
make ft.create in bound tests async
shacharPash Mar 20, 2024
d67f71e
dotnet format
shacharPash Mar 20, 2024
64780bf
skip if OSS cluster on bound tests
shacharPash Mar 20, 2024
c2e1e3d
Merge branch 'master' into ClusterSupport2
shacharPash Mar 20, 2024
95b8800
format
shacharPash Mar 20, 2024
dee15eb
dont skip any test
shacharPash Mar 20, 2024
d760cdf
dont skip serach tests
shacharPash Mar 20, 2024
965e2e0
skip everytime
shacharPash Mar 20, 2024
f32516a
dont skip EnterpriseOssCluster
shacharPash Mar 21, 2024
a83614b
delete allowskipping
shacharPash Mar 21, 2024
f21d8da
change FT.CREATE to RequestPolicy.AnyShard
shacharPash Mar 21, 2024
30eacd3
skip 3 envs
shacharPash Apr 1, 2024
a4ba254
try to fix TestJsonTransaction
shacharPash Apr 2, 2024
83b69e7
dont use ExecuteAllShards
shacharPash Apr 16, 2024
499b1b7
Merge branch 'master' into ClusterSupport2
shacharPash Apr 17, 2024
659ce79
Merge branch 'master' into ClusterSupport2
shacharPash Apr 21, 2024
d048b7a
make ft.create AllShrads, and skip
shacharPash Apr 21, 2024
90039d1
fix TestJsonTransaction
shacharPash Apr 24, 2024
5adef85
delete the ExecuteAllShardsOKres
shacharPash Apr 25, 2024
5070f1c
Change ExecuteAnyShard to list
shacharPash Apr 25, 2024
b568b4b
dotnet formet
shacharPash Apr 25, 2024
a8c3000
skip TestMerge if redis is EnterpriseOssCluster
shacharPash Apr 25, 2024
98f84b9
skip what needed and make setinfo allshards
shacharPash May 1, 2024
36b5a3b
delete ClientSetInfo request policy & skip setInfo tests
shacharPash May 1, 2024
433517d
skips
shacharPash May 1, 2024
7f25271
skip TestTSQueryIndex
shacharPash May 1, 2024
c16ae4d
change Is:OSSCluster to StandaloneOSSCluster, and skip all enterprise…
shacharPash May 5, 2024
bc56bbf
comment SetInfoInPipeline
shacharPash May 6, 2024
ccccc7e
merge OK Array to single res and dont send setinfo
shacharPash May 6, 2024
aca7c01
skips
shacharPash May 7, 2024
fa67aa6
add ft.dict to all shards RQP
shacharPash May 7, 2024
c4bb0c7
Add FLUSHALL to the switchcase
shacharPash May 7, 2024
227ced8
replace db.Execute("FLUSHALL"); with db.FlushAll();
shacharPash May 7, 2024
c7d9f2f
dont use raw Execute
shacharPash May 7, 2024
f44ca24
skip TestModulesPipelineWithoutGraph
shacharPash May 7, 2024
760a9dc
skip tests with dbsize
shacharPash May 7, 2024
69c45dd
uncomment SetInfoInPipeline
shacharPash May 7, 2024
496a3d4
print connection data
shacharPash May 9, 2024
5e0c3d2
print num of nodes before create
shacharPash May 9, 2024
a984f4d
change nodes num to 1
shacharPash May 9, 2024
e48e5f8
disable cluster command
shacharPash May 9, 2024
a55f4b2
change NUM_REDIS_CLUSTER_NODES to 3
shacharPash May 9, 2024
7b22632
try
shacharPash May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/modes/.env.enterprise
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ [email protected]
RE_PASS=12345
RE_CLUSTER_NAME=test
RE_USE_OSS_CLUSTER=false
RE_DB_PORT=6379
RE_DB_PORT=6379
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ FodyWeavers.xsd
.idea
tests/NRedisStack.Tests/lcov.net7.0.info
tests/NRedisStack.Tests/lcov.net6.0.info
tests/NRedisStack.Tests/lcov.net8.0.info
tests/NRedisStack.Tests/lcov.info
tests/NRedisStack.Tests/.env
tests/NRedisStack.Tests/redis_ca.pem
Expand Down
2 changes: 1 addition & 1 deletion PackageVerification/PackageVerification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static void Main()
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions);

IDatabase db = redis.GetDatabase();
db.Execute("FLUSHALL");
db.FlushAll();

IJsonCommands json = db.JSON();

Expand Down
219 changes: 192 additions & 27 deletions src/NRedisStack/Auxiliary.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using NRedisStack.Core;
using NRedisStack.Core.Literals;
using NRedisStack.Json.Literals;
using NRedisStack.RedisStackCommands;
using NRedisStack.Search.Literals;
using StackExchange.Redis;

namespace NRedisStack;
Expand All @@ -8,11 +11,27 @@ public static class Auxiliary
{
private static string? _libraryName = $"NRedisStack(.NET_v{Environment.Version})";
private static bool _setInfo = true;
public static bool IsCluster(this IDatabase db) => db.Multiplexer.GetEndPoints().Length > 1;
public static bool IsEnterprise(this IDatabase db) // TODO: check if there is a better way to check if the server is Redis Enterprise without sending a command each time
{
// DPING command is available only in Redis Enterprise
try
{
db.Execute("DPING");
return true;
}
catch (RedisServerException)
{
return false;
}
}

public static void ResetInfoDefaults()
{
_setInfo = true;
_libraryName = $"NRedisStack(.NET_v{Environment.Version})";
}

public static List<object> MergeArgs(RedisKey key, params RedisValue[] items)
{
var args = new List<object>(items.Length + 1) { key };
Expand All @@ -36,7 +55,7 @@ public static object[] AssembleNonNullArguments(params object?[] arguments)

// TODO: add all the signatures of GetDatabase
public static IDatabase GetDatabase(this ConnectionMultiplexer redis,
string? LibraryName)
string? LibraryName)
{
var _db = redis.GetDatabase();
if (LibraryName == null) // the user wants to disable the library name and version sending
Expand All @@ -61,66 +80,212 @@ internal static void SetInfoInPipeline(this IDatabase db)
}
}

// public static RedisResult Execute(this IDatabase db, SerializedCommand command)
// {
// if (_setInfo)
// {
// _setInfo = false;
// db.SetInfoInPipeline();
// }
// return (db.IsCluster()) ? db.ClusterExecute(command)
// : db.Execute(command.Command, command.Args);
// }

// public async static Task<RedisResult> ExecuteAsync(this IDatabaseAsync db, SerializedCommand command)
// {
// if (_setInfo)
// {
// _setInfo = false;
// ((IDatabase)db).SetInfoInPipeline();
// }
// return (db.IsCluster()) ? db.ClusterExecuteAsync(command)
// : db.ExecuteAsync(command.Command, command.Args);
// }

public static RedisResult Execute(this IDatabase db, SerializedCommand command)
{
db.SetInfoInPipeline();
return db.Execute(command.Command, command.Args);

if (!db.IsEnterprise() || !db.IsCluster())
return db.Execute(command.Command, command.Args);

switch (command.Policy)
{
case RequestPolicy.Default: // add is cluster
return db.Execute(command.Command, command.Args);
case RequestPolicy.AllNodes:
return db.ExecuteAllNodes(command);
case RequestPolicy.AllShards:
return db.ExecuteAllShards(command);
case RequestPolicy.AnyShard:
return db.ExecuteAnyShard(command);
case RequestPolicy.MultiShard:
throw new NotImplementedException("MultiShard policy is not implemented yet");
case RequestPolicy.Special:
throw new NotImplementedException("Special policy is not implemented yet");
default:
throw new NotImplementedException("Unknown policy");
}
}

public static async Task<RedisResult> ExecuteAsync(this IDatabaseAsync db, SerializedCommand command)
{
((IDatabase)db).SetInfoInPipeline();
return await db.ExecuteAsync(command.Command, command.Args);

if (!((IDatabase)db).IsCluster())
return await db.ExecuteAsync(command.Command, command.Args);

switch (command.Policy)
{
case RequestPolicy.Default:
return await db.ExecuteAsync(command.Command, command.Args);
case RequestPolicy.AllNodes:
return await db.ExecuteAllNodesAsync(command);
case RequestPolicy.AllShards:
return await db.ExecuteAllShardsAsync(command);
case RequestPolicy.AnyShard:
return await db.ExecuteAnyShardAsync(command);
case RequestPolicy.MultiShard:
// return db.ExecuteMultiShard(command);
throw new NotImplementedException("MultiShard policy is not implemented yet");
case RequestPolicy.Special:
throw new NotImplementedException("Special policy is not implemented yet");
default:
throw new NotImplementedException("Unknown policy");
}
}

public static List<RedisResult> ExecuteBroadcast(this IDatabase db, string command)
=> db.ExecuteBroadcast(new SerializedCommand(command));
public static RedisResult ExecuteAllNodes(this IDatabase db, SerializedCommand command)
{
var redis = db.Multiplexer;
var endpoints = redis.GetEndPoints();
var results = new RedisResult[endpoints.Length];

for (int i = 0; i < endpoints.Length; i++)
{
results[i] = redis.GetServer(endpoints[i]).Execute(command.Command, command.Args);
}

return results.ToRedisResult(command.Command);
}

public static List<RedisResult> ExecuteBroadcast(this IDatabase db, SerializedCommand command)
public static async Task<RedisResult> ExecuteAllNodesAsync(this IDatabaseAsync db, SerializedCommand command)
{
var redis = db.Multiplexer;
var endpoints = redis.GetEndPoints();
var results = new List<RedisResult>(endpoints.Length);
var results = new RedisResult[endpoints.Length];

foreach (var endPoint in endpoints)
for (int i = 0; i < endpoints.Length; i++)
{
var server = redis.GetServer(endPoint);
results[i] = await redis.GetServer(endpoints[i]).ExecuteAsync(command.Command, command.Args);
}

return results.ToRedisResult(command.Command);
}

public static RedisResult ExecuteAllShards(this IDatabase db, string command)
=> db.ExecuteAllShards(new SerializedCommand(command));

public static RedisResult ExecuteAllShards(this IDatabase db, SerializedCommand command)
{
var redis = db.Multiplexer;
var endpoints = redis.GetEndPoints();
var results = new List<RedisResult>();

if (server.IsReplica)
foreach (var endpoint in endpoints)
{
var server = redis.GetServer(endpoint);
if (!server.IsReplica)
{
continue; // Skip replica nodes
results.Add(server.Execute(command.Command, command.Args));
}
// Send your command to the master node

results.Add(server.Multiplexer.GetDatabase().Execute(command));
}
return results;

return results.ToArray().ToRedisResult(command.Command);


}

public static async Task<List<RedisResult>> ExecuteBroadcastAsync(this IDatabaseAsync db, string command)
=> await db.ExecuteBroadcastAsync(new SerializedCommand(command));
public async static Task<RedisResult> ExecuteAllShardsAsync(this IDatabaseAsync db, string command)
=> await db.ExecuteAllShardsAsync(new SerializedCommand(command));

private static async Task<List<RedisResult>> ExecuteBroadcastAsync(this IDatabaseAsync db, SerializedCommand command)
public async static Task<RedisResult> ExecuteAllShardsAsync(this IDatabaseAsync db, SerializedCommand command)
{
var redis = db.Multiplexer;
var endpoints = redis.GetEndPoints();
var results = new List<RedisResult>(endpoints.Length);
var results = new List<RedisResult>();

foreach (var endPoint in endpoints)
foreach (var endpoint in endpoints)
{
var server = redis.GetServer(endPoint);

if (server.IsReplica)
var server = redis.GetServer(endpoint);
if (!server.IsReplica)
{
continue; // Skip replica nodes
results.Add(await server.ExecuteAsync(command.Command, command.Args));
}
// Send your command to the master node
}
var toRedisResult = results.ToArray().ToRedisResult(command.Command);

return toRedisResult;
return RedisResult.Create(results.ToArray());
}

public static RedisResult ExecuteAnyShard(this IDatabase db, SerializedCommand command)
{
var server = GetAnyPrimary(db.Multiplexer);
return server.Execute(command.Command, command.Args);
}

public static async Task<RedisResult> ExecuteAnyShardAsync(this IDatabaseAsync db, SerializedCommand command)
{
var server = GetAnyPrimary(db.Multiplexer);
return await server.ExecuteAsync(command.Command, command.Args);
}

public static IServer GetAnyPrimary(IConnectionMultiplexer muxer)
{
foreach (var endpoint in muxer.GetEndPoints())
{
var server = muxer.GetServer(endpoint);
if (!server.IsReplica) return server;
}
throw new InvalidOperationException("Requires a primary endpoint (found none)");
}

public static RedisResult ToRedisResult(this RedisResult[] results, string command)
{
switch (command)
{
case FT.ALIASADD:
case FT.ALIASDEL:
case FT.ALIASUPDATE:
case FT.ALTER:
case FT.CREATE:
case FT.DROPINDEX:
case FT.DICTADD:
case FT.DICTDEL:
case RedisCoreCommands.FLUSHALL:
return results.OKArraytoResult();
case JSON.MGET:
// TODO: implement
break;
}
return results[0]; // TODO: check if this is the correct behavior
}

results.Add(await server.Multiplexer.GetDatabase().ExecuteAsync(command));
public static RedisResult OKArraytoResult(this RedisResult[] results)
{
foreach (var result in results)
{
if (result.ToString() != "OK")
{
return result; // return the problematic result
}
}
return results;
return results[0]; // return the first result (which is OK, like the others)
}

// TODO: check if implementing MultiShard and Special policies is nessesary

public static string GetNRedisStackVersion()
{
Version version = typeof(Auxiliary).Assembly.GetName().Version!;
Expand Down
11 changes: 11 additions & 0 deletions src/NRedisStack/CoreCommands/CoreCommandBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,16 @@ private static SerializedCommand BlockingCommandWithKeysAndTimeout(String comman

return new SerializedCommand(command, args);
}

public static SerializedCommand FlushAll(bool? async)
{
List<object> args = new List<object>();
if (async != null)
{
args.Add(async.Value ? CoreArgs.ASYNC : CoreArgs.SYNC);
}

return new SerializedCommand(RedisCoreCommands.FLUSHALL, RequestPolicy.AllShards, args);
}
}
}
13 changes: 13 additions & 0 deletions src/NRedisStack/CoreCommands/CoreCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -479,5 +479,18 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val
}
return result[0].Entries;
}

/// <summary>
/// Removes all keys from all databases.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="async">if set, flushes the databases asynchronously</param>
/// <returns><see langword="true"/> if everything was done correctly, Error otherwise.</returns>
/// <remarks><seealso href="https://redis.io/commands/flushall"/></remarks>
public static bool FlushAll(this IDatabase db, bool? async = null)
{
var command = CoreCommandBuilder.FlushAll(async);
return db.Execute(command).OKtoBoolean();
}
}

2 changes: 2 additions & 0 deletions src/NRedisStack/CoreCommands/Literals/CommandArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace NRedisStack.Core.Literals
{
internal static class CoreArgs
{
public const string ASYNC = "ASYNC";
public const string BLOCK = "BLOCK";
public const string COUNT = "COUNT";
public const string GROUP = "GROUP";
Expand All @@ -11,6 +12,7 @@ internal static class CoreArgs
public const string NOACK = "NOACK";
public const string RIGHT = "RIGHT";
public const string STREAMS = "STREAMS";
public const string SYNC = "SYNC";
public const string lib_name = "LIB-NAME";
public const string lib_ver = "LIB-VER";
}
Expand Down
1 change: 1 addition & 0 deletions src/NRedisStack/CoreCommands/Literals/Commands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal static class RedisCoreCommands
public const string BZPOPMIN = "BZPOPMIN";
public const string CLIENT = "CLIENT";
public const string SETINFO = "SETINFO";
public const string FLUSHALL = "FLUSHALL";
public const string XREAD = "XREAD";
public const string XREADGROUP = "XREADGROUP";
}
Expand Down
2 changes: 1 addition & 1 deletion src/NRedisStack/Json/JsonCommandBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public static SerializedCommand MGet(RedisKey[] keys, string path)
var args = keys.Cast<object>().ToList();

args.Add(path);
return new SerializedCommand(JSON.MGET, args);
return new SerializedCommand(JSON.MGET, RequestPolicy.AllShards, args);
}

public static SerializedCommand NumIncrby(RedisKey key, string path, double value)
Expand Down
Loading
Loading