Skip to content

Commit

Permalink
适配最新版本的CAP
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangxiren committed Oct 19, 2023
1 parent a0d91fe commit ed86970
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 22 deletions.
18 changes: 9 additions & 9 deletions samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AspectCore.Extensions.Hosting" Version="2.3.0" />
<PackageReference Include="DotNetCore.CAP.Dashboard" Version="7.0.1" />
<PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="7.0.1" />
<PackageReference Include="MySqlConnector" Version="2.2.3" />
<PackageReference Include="SmartSql" Version="4.1.64" />
<PackageReference Include="SmartSql.DIExtension" Version="4.1.64" />
<PackageReference Include="SmartSql.DyRepository" Version="4.1.64" />
<PackageReference Include="AspectCore.Extensions.Hosting" Version="2.4.0" />
<PackageReference Include="DotNetCore.CAP.Dashboard" Version="7.2.1" />
<PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="7.2.1" />
<PackageReference Include="MySqlConnector" Version="2.2.7" />
<PackageReference Include="SmartSql" Version="4.1.67" />
<PackageReference Include="SmartSql.DIExtension" Version="4.1.67" />
<PackageReference Include="SmartSql.DyRepository" Version="4.1.67" />
<PackageReference Include="SmartSql.Schema" Version="4.1.30" />
<PackageReference Include="SmartSql.ScriptTag" Version="4.1.64" />
<PackageReference Include="SmartSql.TypeHandler" Version="4.1.64" />
<PackageReference Include="SmartSql.ScriptTag" Version="4.1.67" />
<PackageReference Include="SmartSql.TypeHandler" Version="4.1.67" />
</ItemGroup>

<ItemGroup>
Expand Down
1 change: 1 addition & 0 deletions samples/Sample.RabbitMQ.MySql/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void ConfigureServices(IServiceCollection services)
option.Password = Configuration["RabbitMQConfig:Password"]!;
});
options.UseDashboard();
options.UseStorageLock = true;
});

var assembly = Assembly.Load("Sample.RabbitMQ.MySql");
Expand Down
9 changes: 8 additions & 1 deletion src/SmartSql.CAP/ICapRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public interface ICapRepository
{
ISqlMapper SqlMapper { get; }

Task InitializeTablesAsync(string schema, string receivedTableName, string publishedTableName);
Task InitializeTablesAsync(string schema, string receivedTableName, string publishedTableName, bool userStorageLock,
string lockTableName, string pubKey, string recKey, DateTime lastLockTime);

Task<StatisticsDto> GetStatisticsAsync(string receivedTableName, string publishedTableName);

Expand Down Expand Up @@ -44,4 +45,10 @@ Task<List<MessagesOfNeedRetry>> GetMessagesOfNeedRetryAsync(string tableName, in

Task<List<MessagesOfNeedRetry>> GetMessagesOfDelayedAsync(string tableName, string version,
DateTime twoMinutesLater, DateTime oneMinutesAgo);

Task<int> AcquireLockAsync(string tableName, string key, DateTime ttl, string instance, DateTime lastLockTime);

Task ReleaseLockAsync(string tableName, string key, string instance, DateTime lastLockTime);

Task RenewLockAsync(string tableName, string key, double totalSeconds, string instance);
}
19 changes: 17 additions & 2 deletions src/SmartSql.CAP/IDataStorage.SmartSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,39 @@ public class SmartSqlDataStorage : IDataStorage
private readonly IOptions<CapOptions> _capOptions;
private readonly IStorageInitializer _initializer;
private readonly ISerializer _serializer;
private readonly ISnowflakeId _snowflakeId;
private readonly ICapRepository _capRepository;
private readonly string _pubName;
private readonly string _recName;
private readonly string _lockName;

public SmartSqlDataStorage(
IOptions<CapOptions> capOptions,
IStorageInitializer initializer,
ISerializer serializer,
ISnowflakeId snowflakeId,
ICapRepository capRepository)
{
_capOptions = capOptions;
_initializer = initializer;
_serializer = serializer;
_snowflakeId = snowflakeId;
_capRepository = capRepository;
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
_lockName = initializer.GetLockTableName();
}

public async Task<bool> AcquireLockAsync(string key, TimeSpan ttl, string instance,
CancellationToken token = new()) =>
await _capRepository.AcquireLockAsync(_lockName, key, DateTime.Now.Subtract(ttl), instance, DateTime.Now) > 0;

public async Task ReleaseLockAsync(string key, string instance, CancellationToken token = new()) =>
await _capRepository.ReleaseLockAsync(_lockName, key, instance, DateTime.MinValue);

public async Task RenewLockAsync(string key, TimeSpan ttl, string instance, CancellationToken token = new()) =>
await _capRepository.RenewLockAsync(_lockName, key, ttl.TotalSeconds, instance);

public async Task ChangePublishStateToDelayedAsync(string[] ids) =>
await _capRepository.ChangePublishStateToDelayedAsync(_pubName, ids)
.ConfigureAwait(false);
Expand Down Expand Up @@ -72,7 +87,7 @@ await _capRepository.InsertPublishedMessageAsync(_pubName, message.DbId, _capOpt

public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{
await _capRepository.InsertReceivedMessageAsync(_recName, SnowflakeId.Default().NextId().ToString(),
await _capRepository.InsertReceivedMessageAsync(_recName, _snowflakeId.NextId().ToString(),
_capOptions.Value.Version, name, group, content, _capOptions.Value.FailedRetryCount, DateTime.Now,
DateTime.Now.AddSeconds(_capOptions.Value.FailedMessageExpiredAfter), nameof(StatusName.Failed))
.ConfigureAwait(false);
Expand All @@ -82,7 +97,7 @@ public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string g
{
var mdMessage = new MediumMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
DbId = _snowflakeId.NextId().ToString(),
Origin = message,
Added = DateTime.Now,
ExpiresAt = null,
Expand Down
15 changes: 14 additions & 1 deletion src/SmartSql.CAP/IStorageInitializer.SmartSql.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP;
Expand All @@ -9,16 +10,20 @@ namespace SmartSql.CAP;

public class SmartSqlStorageInitializer : IStorageInitializer
{
private readonly IOptions<CapOptions> _capOptions;
private readonly IOptions<SmartSqlOptions> _options;
private readonly ILogger _logger;
private readonly ICapRepository _capRepository;

public SmartSqlStorageInitializer(
ILogger<SmartSqlStorageInitializer> logger,
IOptions<SmartSqlOptions> options,
IOptions<CapOptions> capOptions,
ICapRepository capRepository)
{
_options = options;
_capOptions = capOptions;
_capOptions = capOptions;
_capRepository = capRepository;
_logger = logger;
}
Expand All @@ -33,6 +38,11 @@ public virtual string GetReceivedTableName()
return $"{_options.Value.Schema}.received";
}

public string GetLockTableName()
{
return $"{_options.Value.Schema}.lock";
}

public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
Expand All @@ -47,8 +57,11 @@ public async Task InitializeAsync(CancellationToken cancellationToken)
}

await _capRepository
.InitializeTablesAsync(_options.Value.Schema, GetReceivedTableName(), GetPublishedTableName())
.InitializeTablesAsync(_options.Value.Schema, GetReceivedTableName(), GetPublishedTableName(),
_capOptions.Value.UseStorageLock, GetLockTableName(), $"publish_retry_{_capOptions.Value.Version}",
$"received_retry_{_capOptions.Value.Version}", DateTime.MinValue)
.ConfigureAwait(false);

_logger.LogDebug("Ensuring all create database tables script are applied.");
}
}
34 changes: 34 additions & 0 deletions src/SmartSql.CAP/Maps/CapMySql.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@
PRIMARY KEY (`Id`),
INDEX `IX_ExpiresAt`(`ExpiresAt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

<IsTrue Prepend="" Property="userStorageLock">
CREATE TABLE IF NOT EXISTS `<Placeholder Property="lockTableName" />` (
`Key` varchar(128) NOT NULL,
`Instance` varchar(256) DEFAULT NULL,
`LastLockTime` datetime DEFAULT NULL,
PRIMARY KEY (`Key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT IGNORE INTO `<Placeholder Property="lockTableName" />` (`Key`,`Instance`,`LastLockTime`) VALUES(?pubKey,'',?lastLockTime);
INSERT IGNORE INTO `<Placeholder Property="lockTableName" />` (`Key`,`Instance`,`LastLockTime`) VALUES(?recKey,'',?lastLockTime);
</IsTrue>

</Statement>

<Statement Id="GetStatistics">
Expand Down Expand Up @@ -196,6 +209,27 @@
FOR UPDATE
</Statement>

<Statement Id="AcquireLock">
UPDATE `<Placeholder Property="tableName" />`
SET `Instance`=?instance,`LastLockTime`=?lastLockTime
WHERE `Key`=?key
AND `LastLockTime` &lt; ?ttl
</Statement>

<Statement Id="ReleaseLock">
UPDATE `<Placeholder Property="tableName" />`
SET `Instance`= '',`LastLockTime`=?lastLockTime
WHERE `Key`=?key
AND `Instance`=?instance
</Statement>

<Statement Id="RenewLock">
UPDATE `<Placeholder Property="tableName" />`
SET `LastLockTime`=date_add(`LastLockTime`, interval ?totalSeconds second)
WHERE `Key`=?key
AND `Instance`=?instance
</Statement>

</Statements>

</SmartSqlMap>
33 changes: 33 additions & 0 deletions src/SmartSql.CAP/Maps/CapMySql8.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
PRIMARY KEY (`Id`),
INDEX `IX_ExpiresAt`(`ExpiresAt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

<IsTrue Prepend="" Property="userStorageLock">
CREATE TABLE IF NOT EXISTS `<Placeholder Property="lockTableName" />` (
`Key` varchar(128) NOT NULL,
`Instance` varchar(256) DEFAULT NULL,
`LastLockTime` datetime DEFAULT NULL,
PRIMARY KEY (`Key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT IGNORE INTO `<Placeholder Property="lockTableName" />` (`Key`,`Instance`,`LastLockTime`) VALUES(?pubKey,'',?lastLockTime);
INSERT IGNORE INTO `<Placeholder Property="lockTableName" />` (`Key`,`Instance`,`LastLockTime`) VALUES(?recKey,'',?lastLockTime);
</IsTrue>
</Statement>

<Statement Id="GetStatistics">
Expand Down Expand Up @@ -196,6 +208,27 @@
FOR UPDATE SKIP LOCKED
</Statement>

<Statement Id="AcquireLock">
UPDATE `<Placeholder Property="tableName" />`
SET `Instance`=?instance,`LastLockTime`=?lastLockTime
WHERE `Key`=?key
AND `LastLockTime` &lt; ?ttl
</Statement>

<Statement Id="ReleaseLock">
UPDATE `<Placeholder Property="tableName" />`
SET `Instance`= '',`LastLockTime`=?lastLockTime
WHERE `Key`=?key
AND `Instance`=?instance
</Statement>

<Statement Id="RenewLock">
UPDATE `<Placeholder Property="tableName" />`
SET `LastLockTime`=date_add(`LastLockTime`, interval ?totalSeconds second)
WHERE `Key`=?key
AND `Instance`=?instance
</Statement>

</Statements>

</SmartSqlMap>
32 changes: 32 additions & 0 deletions src/SmartSql.CAP/Maps/CapPostgreSql.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@
"ExpiresAt" TIMESTAMP NULL,
"StatusName" VARCHAR(50) NOT NULL
);

<IsTrue Prepend="" Property="userStorageLock">
CREATE TABLE IF NOT EXISTS <Placeholder Property="lockTableName" />(
""Key"" VARCHAR(128) PRIMARY KEY NOT NULL,
""Instance"" VARCHAR(256),
""LastLockTime"" TIMESTAMP NOT NULL
);

INSERT INTO <Placeholder Property="lockTableName" /> (""Key"",""Instance"",""LastLockTime"") VALUES(@pubKey,'',@lastLockTime) ON CONFLICT DO NOTHING;
INSERT INTO <Placeholder Property="lockTableName" /> (""Key"",""Instance"",""LastLockTime"") VALUES(@recKey,'',@lastLockTime) ON CONFLICT DO NOTHING;
</IsTrue>
</Statement>

<Statement Id="GetStatistics">
Expand Down Expand Up @@ -191,6 +202,27 @@
FOR UPDATE SKIP LOCKED
</Statement>

<Statement Id="AcquireLock">
UPDATE <Placeholder Property="tableName" />
SET "Instance"=@instance,"LastLockTime"=@lastLockTime
WHERE "Key"=@Key
AND "LastLockTime" &lt; @ttl;
</Statement>

<Statement Id="ReleaseLock">
UPDATE <Placeholder Property="tableName" />
SET "Instance"='',"LastLockTime"=@lastLockTime
WHERE "Key"=@key
AND "Instance"=@instance
</Statement>

<Statement Id="RenewLock">
UPDATE <Placeholder Property="tableName" />
SET "LastLockTime"="LastLockTime"+interval @totalSeconds second
WHERE "Key"=@key
AND "Instance"=@instance
</Statement>

</Statements>

</SmartSqlMap>
53 changes: 49 additions & 4 deletions src/SmartSql.CAP/Maps/CapSqlServer.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

<SmartSqlMap Scope="Cap" xmlns="http://SmartSql.net/schemas/SmartSqlMap.xsd">

<ParameterMaps>
<ParameterMap Id="InitializeTablesParamMap">
<Parameter Property="lastLockTime" DbType="DateTime2"/>
</ParameterMap>
</ParameterMaps>

<Statements>

<Statement Id="InitializeTables" SourceChoice="Write">
<Statement Id="InitializeTables" SourceChoice="Write" ParameterMap="InitializeTablesParamMap">
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '<Placeholder Property="schema" />')
BEGIN
EXEC('CREATE SCHEMA [<Placeholder Property="schema" />]')
Expand Down Expand Up @@ -46,6 +52,24 @@
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
END;

<IsTrue Prepend="" Property="userStorageLock">
IF OBJECT_ID(N'<Placeholder Property="lockTableName" />',N'U') IS NULL
BEGIN
CREATE TABLE <Placeholder Property="lockTableName" />(
[Key] [nvarchar](128) NOT NULL,
[Instance] [nvarchar](256) NOT NULL,
[LastLockTime] [datetime2](7) NOT NULL,
CONSTRAINT [PK_<Placeholder Property="lockTableName" />] PRIMARY KEY CLUSTERED
(
[Key] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = ON, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
END;

INSERT INTO <Placeholder Property="lockTableName" /> ([Key],[Instance],[LastLockTime]) VALUES(@pubKey,'',@lastLockTime);
INSERT INTO <Placeholder Property="lockTableName" /> ([Key],[Instance],[LastLockTime]) VALUES(@recKey,'',@lastLockTime);
</IsTrue>
</Statement>

<Statement Id="GetStatistics">
Expand Down Expand Up @@ -193,7 +217,7 @@
SELECT TOP (200) Id, Content, Retries, Added
FROM <Placeholder Property="tableName" /> WITH (READPAST)
WHERE Retries &lt; @retries
AND Version = @version
AND Version = @version
AND Added &lt; @added
AND (StatusName = 'Failed' OR StatusName = 'Scheduled')
</Statement>
Expand All @@ -202,8 +226,29 @@
SELECT Id,Content,Retries,Added,ExpiresAt
FROM <Placeholder Property="tableName" /> WITH (UPDLOCK,READPAST)
WHERE Version = @version
AND ((ExpiresAt &lt; @twoMinutesLater AND StatusName = 'Delayed')
OR (ExpiresAt &lt; @oneMinutesAgo AND StatusName = 'Queued'))
AND ((ExpiresAt &lt; @twoMinutesLater AND StatusName = 'Delayed')
OR (ExpiresAt &lt; @oneMinutesAgo AND StatusName = 'Queued'))
</Statement>

<Statement Id="AcquireLock">
UPDATE <Placeholder Property="tableName" />
SET [Instance]=@instance,[LastLockTime]=@lastLockTime
WHERE [Key]=@Key
AND [LastLockTime] &lt; @ttl;
</Statement>

<Statement Id="ReleaseLock">
UPDATE <Placeholder Property="tableName" />
SET [Instance]='',[LastLockTime]=@lastLockTime
WHERE [Key]=@key
AND [Instance]=@instance
</Statement>

<Statement Id="RenewLock">
UPDATE <Placeholder Property="tableName" />
SET [LastLockTime]=DATEADD(s,@totalSeconds,LastLockTime)
WHERE [Key]=@key
AND [Instance]=@instance
</Statement>

</Statements>
Expand Down
Loading

0 comments on commit ed86970

Please sign in to comment.