Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow module enhancement #16043

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public async Task<IActionResult> Details(long id)
return NotFound();
}

var workflowType = await _workflowTypeStore.GetAsync(workflow.WorkflowTypeId);
var workflowType = await _workflowTypeStore.GetByVersionAsync(workflow.WorkflowTypeVersionId);
var blockingActivities = workflow.BlockingActivities.ToDictionary(x => x.ActivityId);
var workflowContext = await _workflowManager.CreateWorkflowExecutionContextAsync(workflowType, workflow);
var activityContexts = await Task.WhenAll(workflowType.Activities.Select(x => _workflowManager.CreateActivityExecutionContextAsync(x, x.Properties)));
Expand Down Expand Up @@ -239,7 +239,7 @@ public async Task<IActionResult> Delete(long id)
return NotFound();
}

var workflowType = await _workflowTypeStore.GetAsync(workflow.WorkflowTypeId);
var workflowType = await _workflowTypeStore.GetByVersionAsync(workflow.WorkflowTypeVersionId);
await _workflowStore.DeleteAsync(workflow);
await _notifier.SuccessAsync(H["Workflow {0} has been deleted.", id]);
return RedirectToAction(nameof(Index), new { workflowTypeId = workflowType.Id });
Expand All @@ -260,7 +260,7 @@ public async Task<IActionResult> Restart(long id)
return NotFound();
}

var workflowType = await _workflowTypeStore.GetAsync(workflow.WorkflowTypeId);
var workflowType = await _workflowTypeStore.GetByVersionAsync(workflow.WorkflowTypeVersionId);

if (workflowType == null)
{
Expand All @@ -278,7 +278,7 @@ public async Task<IActionResult> Restart(long id)
await using var acquiredLock = locker;

// Check if this is a workflow singleton and there's already an halted instance on any activity.
if (workflowType.IsSingleton && await _workflowStore.HasHaltedInstanceAsync(workflowType.WorkflowTypeId))
if (workflowType.IsSingleton && await _workflowStore.HasHaltedInstanceAsync(workflowType.WorkflowTypeVersionId))
{
await _notifier.ErrorAsync(H["Another instance is already running.", id]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public async Task<IActionResult> Index(WorkflowTypeIndexOptions options, PagerPa

options ??= new WorkflowTypeIndexOptions();

var query = _session.Query<WorkflowType, WorkflowTypeIndex>();
var query = _session.Query<WorkflowType, WorkflowTypeIndex>(x => x.Latest);

if (!string.IsNullOrWhiteSpace(options.Search))
{
Expand All @@ -124,7 +124,7 @@ public async Task<IActionResult> Index(WorkflowTypeIndexOptions options, PagerPa
var sqlBuilder = dialect.CreateBuilder(_session.Store.Configuration.TablePrefix);
sqlBuilder.Select();
sqlBuilder.Distinct();
sqlBuilder.Selector(nameof(WorkflowIndex), nameof(WorkflowIndex.WorkflowTypeId), _session.Store.Configuration.Schema);
sqlBuilder.Selector(nameof(WorkflowIndex), nameof(WorkflowIndex.WorkflowTypeVersionId), _session.Store.Configuration.Schema);
sqlBuilder.Table(nameof(WorkflowIndex), alias: null, _session.Store.Configuration.Schema);

// Use existing session connection. Do not use 'using' or dispose the connection.
Expand All @@ -148,7 +148,7 @@ public async Task<IActionResult> Index(WorkflowTypeIndexOptions options, PagerPa
{
WorkflowType = x,
Id = x.Id,
HasInstances = workflowTypeIdsWithInstances.Contains(x.WorkflowTypeId),
HasInstances = workflowTypeIdsWithInstances.Contains(x.WorkflowTypeVersionId),
Name = x.Name,
})
.ToList(),
Expand Down Expand Up @@ -184,7 +184,7 @@ public async Task<IActionResult> BulkEdit(WorkflowTypeIndexOptions options, IEnu

if (itemIds?.Count() > 0)
{
var checkedEntries = await _session.Query<WorkflowType, WorkflowTypeIndex>()
var checkedEntries = await _session.Query<WorkflowType, WorkflowTypeIndex>(x => x.Latest)
.Where(x => x.DocumentId.IsIn(itemIds)).ListAsync();
switch (options.BulkAction)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,14 @@ public async Task<IActionResult> Invoke(string token)
await using var acquiredLock = locker;

// Check if this is not a workflow singleton or there's not already an halted instance on any activity.
if (!workflowType.IsSingleton || !await _workflowStore.HasHaltedInstanceAsync(workflowType.WorkflowTypeId))
if (!workflowType.IsSingleton || !await _workflowStore.HasHaltedInstanceAsync(workflowType.WorkflowTypeVersionId))
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Invoking new workflow of type '{WorkflowTypeId}' with start activity '{ActivityId}'", workflowType.WorkflowTypeId, startActivity.ActivityId);
_logger.LogDebug("Invoking new workflow of type '{WorkflowTypeId}', Version: '{WorkflowTypeVersionId}' with start activity '{ActivityId}'",
workflowType.WorkflowTypeId,
workflowType.WorkflowTypeVersionId,
startActivity.ActivityId);
}

await _workflowManager.StartWorkflowAsync(workflowType, startActivity);
Expand All @@ -163,7 +166,7 @@ public async Task<IActionResult> Invoke(string token)
else
{
// Otherwise, we need to resume all halted workflows on this activity.
var workflows = await _workflowStore.ListAsync(workflowType.WorkflowTypeId, new[] { startActivity.ActivityId });
var workflows = await _workflowStore.ListAsync(workflowType.WorkflowTypeVersionId, new[] { startActivity.ActivityId });

if (!workflows.Any())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public async Task OnActionExecutionAsync(ActionExecutingContext context, ActionE
await using var acquiredLock = locker;

// Check if this is a workflow singleton and there's already an halted instance on any activity.
if (workflowType.IsSingleton && await _workflowStore.HasHaltedInstanceAsync(workflowType.WorkflowTypeId))
if (workflowType.IsSingleton && await _workflowStore.HasHaltedInstanceAsync(workflowType.WorkflowTypeVersionId))
{
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public WorkflowInstanceRouteEntries(IVolatileDocumentManager<WorkflowRouteDocume

protected override async Task<WorkflowRouteDocument> CreateDocumentAsync()
{
var workflowTypeDictionary = (await Session.Query<WorkflowType, WorkflowTypeIndex>().ListAsync()).ToDictionary(x => x.WorkflowTypeId);
var workflowTypeDictionary = (await Session.Query<WorkflowType, WorkflowTypeIndex>(x => x.Latest).ListAsync()).ToDictionary(x => x.WorkflowTypeId);

var skip = 0;
var pageSize = 50;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public WorkflowTypeRouteEntries(IVolatileDocumentManager<WorkflowTypeRouteDocume

protected override async Task<WorkflowTypeRouteDocument> CreateDocumentAsync()
{
var workflowTypeDictionary = (await Session.Query<WorkflowType, WorkflowTypeIndex>().ListAsync()).ToDictionary(x => x.WorkflowTypeId);
var workflowTypeDictionary = (await Session.Query<WorkflowType, WorkflowTypeIndex>(x => x.Latest).ListAsync()).ToDictionary(x => x.WorkflowTypeId);

var workflowTypeRouteEntries =
from workflowType in workflowTypeDictionary.Values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ public class WorkflowIndex : MapIndex
{
public long DocumentId { get; set; }
public string WorkflowTypeId { get; set; }
public string WorkflowTypeVersionId { get; set; }
public string WorkflowId { get; set; }
public int WorkflowStatus { get; set; }
public DateTime? LastExecutedOnUtc { get; set; }
public DateTime CreatedUtc { get; set; }
}

Expand All @@ -33,6 +35,7 @@ public override void Describe(DescribeContext<Workflow> context)
new WorkflowIndex
{
WorkflowTypeId = workflow.WorkflowTypeId,
WorkflowTypeVersionId = workflow.WorkflowTypeVersionId,
WorkflowId = workflow.WorkflowId,
CreatedUtc = workflow.CreatedUtc,
WorkflowStatus = (int)workflow.Status
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Linq;
using OrchardCore.Workflows.Models;
using YesSql.Indexes;
Expand All @@ -11,6 +12,13 @@ public class WorkflowTypeIndex : MapIndex
public string Name { get; set; }
public bool IsEnabled { get; set; }
public bool HasStart { get; set; }
public string DisplayName { get; set; }
public string WorkflowTypeVersionId { get; set; }
public bool Latest { get; set; }
public DateTime CreatedUtc { get; set; }
public string CreatedBy { get; set; }
public DateTime ModifiedUtc { get; set; }
public string ModifiedBy { get; set; }
}

public class WorkflowTypeStartActivitiesIndex : MapIndex
Expand All @@ -33,7 +41,14 @@ public override void Describe(DescribeContext<WorkflowType> context)
WorkflowTypeId = workflowType.WorkflowTypeId,
Name = workflowType.Name,
IsEnabled = workflowType.IsEnabled,
HasStart = workflowType.Activities.Any(x => x.IsStart)
HasStart = workflowType.Activities.Any(x => x.IsStart),
DisplayName = workflowType.DisplayName,
WorkflowTypeVersionId = workflowType.WorkflowTypeVersionId,
Latest = workflowType.Latest,
CreatedUtc = workflowType.CreatedUtc,
ModifiedUtc = workflowType.ModifiedUtc,
ModifiedBy = workflowType.ModifiedBy,
CreatedBy = workflowType.CreatedBy
}
);

Expand Down
97 changes: 97 additions & 0 deletions src/OrchardCore.Modules/OrchardCore.Workflows/Migrations.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using OrchardCore.Data.Migration;
using OrchardCore.Modules;
using OrchardCore.Workflows.Indexes;
using OrchardCore.Workflows.Models;
using OrchardCore.Workflows.Services;
using YesSql;
using YesSql.Services;
using YesSql.Sql;

namespace OrchardCore.Workflows
{
public class Migrations : DataMigration
{
private readonly IWorkflowStore _workflowStore;
private readonly IWorkflowTypeStore _workflowTypeStore;
private readonly ISession _session;
private readonly IClock _clock;

public Migrations(IWorkflowStore workflowStore, IWorkflowTypeStore workflowTypeStore, ISession session, IClock clock)
{
_workflowStore = workflowStore;
_workflowTypeStore = workflowTypeStore;
_session = session;
_clock = clock;
}

public async Task<int> CreateAsync()
{
await SchemaBuilder.CreateMapIndexTableAsync<WorkflowTypeIndex>(table => table
Expand Down Expand Up @@ -147,5 +166,83 @@ await SchemaBuilder.AlterIndexTableAsync<WorkflowBlockingActivitiesIndex>(table

return 3;
}
public async Task<int> UpdateFrom3Async()
{
await SchemaBuilder.AlterIndexTableAsync<WorkflowTypeIndex>(table =>
{
table.AddColumn<string>("DisplayName", c => c.WithLength(255));
table.AddColumn<string>("WorkflowTypeVersionId", c => c.WithLength(26));
table.AddColumn<bool>("Latest");
table.AddColumn<bool>("ModifiedBy");
table.AddColumn<bool>("CreatedBy");
table.AddColumn<DateTime>("CreatedUtc");
table.AddColumn<DateTime>("ModifiedUtc");
});

await SchemaBuilder.AlterIndexTableAsync<WorkflowTypeIndex>(table => table
.DropIndex("IDX_WorkflowTypeIndex_WorkflowTypeVersionId"));

await SchemaBuilder.AlterIndexTableAsync<WorkflowTypeIndex>(table => table

.CreateIndex("IDX_WorkflowTypeIndex_WorkflowTypeVersionId",
"DocumentId",
"WorkflowTypeId",
"Name",
"HasStart",
"IsEnabled",
"WorkflowTypeVersionId",
"DisplayName",
"Latest",
"CreatedUtc",
"ModifiedUtc",
"ModifiedBy")
);

await SchemaBuilder.AlterIndexTableAsync<WorkflowIndex>(table =>
{
table.AddColumn<string>("WorkflowTypeVersionId", c => c.WithLength(26));
});

await SchemaBuilder.AlterIndexTableAsync<WorkflowIndex>(table =>
{
table.DropIndex("IDX_WorkflowIndex_DocumentId");
});

await SchemaBuilder.AlterIndexTableAsync<WorkflowIndex>(table =>
{
table.CreateIndex("IDX_WorkflowIndex_DocumentId",
"DocumentId",
"WorkflowTypeId",
"WorkflowTypeVersionId",
"WorkflowId",
"WorkflowStatus",
"CreatedUtc");
});

var existsedTypes = await _session.Query<WorkflowType, WorkflowTypeIndex>().ListAsync();
var latestVersionMapping = new Dictionary<string, string>();
foreach (var workflowType in existsedTypes)
{
workflowType.DisplayName = workflowType.Name;
workflowType.Latest = true;
workflowType.CreatedUtc = _clock.UtcNow;
workflowType.ModifiedUtc = _clock.UtcNow;
await _workflowTypeStore.SaveAsync(workflowType);
latestVersionMapping[workflowType.WorkflowTypeId] = workflowType.WorkflowTypeVersionId;
}
var existsWorkflows = await _session.Query<Workflow, WorkflowIndex>(index => index.WorkflowTypeId.IsIn(latestVersionMapping.Keys)).ListAsync();

foreach (var workflow in existsWorkflows)
{
workflow.WorkflowTypeVersionId = latestVersionMapping[workflow.WorkflowTypeId];
await _workflowStore.SaveAsync(workflow);
}


return 4;
}



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,22 @@ public async Task ExecuteAsync(RecipeExecutionContext context)
foreach (var token in model.Data.Cast<JsonObject>())
{
var workflow = token.ToObject<WorkflowType>(_jsonSerializerOptions);
workflow.Id = 0;

var existing = await _workflowTypeStore.GetAsync(workflow.WorkflowTypeId);

if (existing is null)
if (urlHelper is not null)
{
workflow.Id = 0;

if (urlHelper is not null)
foreach (var activity in workflow.Activities.Where(a => a.Name == nameof(HttpRequestEvent)))
{
foreach (var activity in workflow.Activities.Where(a => a.Name == nameof(HttpRequestEvent)))
if (!activity.Properties.TryGetPropertyValue("TokenLifeSpan", out var tokenLifeSpan))
{
if (!activity.Properties.TryGetPropertyValue("TokenLifeSpan", out var tokenLifeSpan))
{
continue;
}

activity.Properties["Url"] = ReGenerateHttpRequestEventUrl(urlHelper, workflow, activity, tokenLifeSpan.ToObject<int>());
continue;
}

activity.Properties["Url"] = ReGenerateHttpRequestEventUrl(urlHelper, workflow, activity, tokenLifeSpan.ToObject<int>());
}
}
else
{
await _workflowTypeStore.DeleteAsync(existing);
}

await _workflowTypeStore.SaveAsync(workflow);
await _workflowTypeStore.SaveAsync(workflow, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public Workflow NewWorkflow(WorkflowType workflowType, string correlationId = nu
var workflow = new Workflow
{
WorkflowTypeId = workflowType.WorkflowTypeId,
WorkflowTypeVersionId = workflowType.WorkflowTypeVersionId,
Status = WorkflowStatus.Idle,
State = JObject.FromObject(new WorkflowState
{
Expand Down Expand Up @@ -219,13 +220,13 @@ public async Task<IEnumerable<WorkflowExecutionContext>> TriggerEventAsync(strin
await using var acquiredLock = locker;

// Check if this is a workflow singleton and there's already an halted instance on any activity.
if (workflowType.IsSingleton && await _workflowStore.HasHaltedInstanceAsync(workflowType.WorkflowTypeId))
if (workflowType.IsSingleton && await _workflowStore.HasHaltedInstanceAsync(workflowType.WorkflowTypeVersionId))
{
continue;
}

// Check if the event is exclusive and there's already a correlated instance halted on a starting activity of this type.
if (isExclusive && (await _workflowStore.ListAsync(workflowType.WorkflowTypeId, name, correlationId, isAlwaysCorrelated))
if (isExclusive && (await _workflowStore.ListAsync(workflowType.WorkflowTypeVersionId, name, correlationId, isAlwaysCorrelated))
.Any(x => x.BlockingActivities.Any(x => x.Name == name && x.IsStart)))
{
continue;
Expand Down Expand Up @@ -567,6 +568,7 @@ private async Task PersistAsync(WorkflowExecutionContext workflowContext)
state.ActivityStates = workflowContext.Activities.ToDictionary(x => x.Key, x => x.Value.Activity.Properties);

workflowContext.Workflow.State = JObject.FromObject(state, _jsonSerializerOptions);

await _workflowStore.SaveAsync(workflowContext.Workflow);
}

Expand Down
Loading