Skip to content

Commit

Permalink
jobs: Let the ExecutablJob emit events on the global emitter
Browse files Browse the repository at this point in the history
The EventEmitter in argument to the create, save, enqueue and delete
methods is now optional, and if not set, retrieved from the global
clientEventManager for the job's user.

This is the first part for fixing chairemobilite#589. For now, the progress of the
jobs will be displayed in the notifications after reconnection, as if
the user was never disconnected.
  • Loading branch information
tahini committed Mar 23, 2023
1 parent 5f99d1b commit 8c6b2d3
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 45 deletions.
58 changes: 26 additions & 32 deletions packages/transition-backend/src/api/services.socketRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,22 +208,19 @@ export default function (socket: EventEmitter, userId?: number) {
}

// TODO Handle the input file and add it to the task
const job: ExecutableJob<BatchRouteJobType> = await ExecutableJob.createJob(
{
user_id: userId,
name: 'batchRoute',
data: {
parameters: {
demandAttributes: parameters,
transitRoutingAttributes
}
},
inputFiles,
hasOutputFiles: true
const job: ExecutableJob<BatchRouteJobType> = await ExecutableJob.createJob({
user_id: userId,
name: 'batchRoute',
data: {
parameters: {
demandAttributes: parameters,
transitRoutingAttributes
}
},
socket
);
await job.enqueue(socket);
inputFiles,
hasOutputFiles: true
});
await job.enqueue();
await job.refresh();
// TODO Do a quick return with task detail instead of waiting for task to finish
callback(Status.createOk(job.attributes.data.results));
Expand Down Expand Up @@ -292,24 +289,21 @@ export default function (socket: EventEmitter, userId?: number) {
try {
socket.emit('progress', { name: 'BatchAccessMap', progress: null });
// TODO Handle the input file and add it to the task
const job = await ExecutableJob.createJob(
{
user_id: userId,
name: 'batchAccessMap',
data: {
parameters: {
batchAccessMapAttributes: parameters,
accessMapAttributes
}
},
inputFiles: {
input: `${directoryManager.userDataDirectory}/${userId}/imports/batchAccessMap.csv`
},
hasOutputFiles: true
const job = await ExecutableJob.createJob({
user_id: userId,
name: 'batchAccessMap',
data: {
parameters: {
batchAccessMapAttributes: parameters,
accessMapAttributes
}
},
socket
);
await job.enqueue(socket);
inputFiles: {
input: `${directoryManager.userDataDirectory}/${userId}/imports/batchAccessMap.csv`
},
hasOutputFiles: true
});
await job.enqueue();
await job.refresh();
// TODO Do a quick return with task detail instead of waiting for task to finish
callback(Status.createOk(job.attributes.data.results));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { directoryManager } from 'chaire-lib-backend/lib//utils/filesystem/direc
import { execJob } from '../../tasks/serverWorkerPool';
import Users from 'chaire-lib-backend/lib/services/users/users';
import { fileManager } from 'chaire-lib-backend/lib/utils/filesystem/fileManager';
import clientEventManager from '../../utils/ClientEventManager';

export type InitialJobData<TData extends JobDataType> = {
inputFiles?: {
Expand All @@ -23,9 +24,7 @@ export type InitialJobData<TData extends JobDataType> = {
};

export class ExecutableJob<TData extends JobDataType> extends Job<TData> {
static async enqueueRunningAndPendingJobs<TData extends JobDataType>(
progressEmitter: EventEmitter
): Promise<boolean> {
static async enqueueRunningAndPendingJobs<TData extends JobDataType>(): Promise<boolean> {
try {
const runningAndPending = await jobsDbQueries.collection({
statuses: ['inProgress', 'pending'],
Expand All @@ -40,7 +39,7 @@ export class ExecutableJob<TData extends JobDataType> extends Job<TData> {
console.log(`Enqueuing ${runningAndPending.jobs.length} in progress and pending jobs`);
for (let i = 0; i < runningAndPending.jobs.length; i++) {
const job = new ExecutableJob<TData>(runningAndPending.jobs[i] as JobAttributes<TData>);
await job.enqueue(progressEmitter);
await job.enqueue();
}

return true;
Expand Down Expand Up @@ -74,6 +73,8 @@ export class ExecutableJob<TData extends JobDataType> extends Job<TData> {
}: Omit<JobAttributes<TData>, 'id' | 'status'> & InitialJobData<TData>,
jobListener?: EventEmitter
): Promise<ExecutableJob<TData>> {
const jobProgressEmitter =
jobListener !== undefined ? jobListener : clientEventManager.getUserEventEmitter(attributes.user_id);
// Check the disk usage if the job has output files
if (hasOutputFiles) {
const diskUsage = Users.getUserDiskUsage(attributes.user_id);
Expand Down Expand Up @@ -111,7 +112,7 @@ export class ExecutableJob<TData extends JobDataType> extends Job<TData> {
}

const id = await jobsDbQueries.create({ status: 'pending', ...attributes });
jobListener?.emit('executableJob.updated', { id, name: attributes.name });
jobProgressEmitter.emit('executableJob.updated', { id, name: attributes.name });
const job = new ExecutableJob<TData>({ id, status: 'pending', ...attributes });
toCopy.forEach(({ filePath, jobFileName }) =>
fileManager.copyFileAbsolute(filePath, `${job.getJobFileDirectory()}/${jobFileName}`, true)
Expand All @@ -134,12 +135,16 @@ export class ExecutableJob<TData extends JobDataType> extends Job<TData> {
}/`;
}

async enqueue(progressEmitter: EventEmitter): Promise<any> {
async enqueue(progressEmitter?: EventEmitter): Promise<any> {
// TODO Handle the cancellation
const jobProgressEmitter =
progressEmitter !== undefined
? progressEmitter
: clientEventManager.getUserEventEmitter(this.attributes.user_id);
await this.save();
execJob('task', [this.attributes.id], {
on: function (payload) {
progressEmitter.emit(payload.event, payload.data);
jobProgressEmitter.emit(payload.event, payload.data);
}
});
}
Expand Down Expand Up @@ -206,20 +211,24 @@ export class ExecutableJob<TData extends JobDataType> extends Job<TData> {
}

async save(jobListener?: EventEmitter): Promise<number> {
const jobProgressEmitter =
jobListener !== undefined ? jobListener : clientEventManager.getUserEventEmitter(this.attributes.user_id);
const { resources, data } = this.attributes;
const updatedId = await jobsDbQueries.update(this.attributes.id, { status: this.status, resources, data });
jobListener?.emit('executableJob.updated', { id: updatedId, name: this.attributes.name });
jobProgressEmitter.emit('executableJob.updated', { id: updatedId, name: this.attributes.name });
return updatedId;
}

async delete(jobListener?: EventEmitter): Promise<number> {
const jobProgressEmitter =
jobListener !== undefined ? jobListener : clientEventManager.getUserEventEmitter(this.attributes.user_id);
// Delete resources used by this task
const fileDirectory = this.getJobFileDirectory();
if (directoryManager.directoryExistsAbsolute(fileDirectory)) {
directoryManager.deleteDirectoryAbsolute(fileDirectory);
}
const id = await jobsDbQueries.delete(this.attributes.id);
jobListener?.emit('executableJob.updated', { id, name: this.attributes.name });
jobProgressEmitter.emit('executableJob.updated', { id, name: this.attributes.name });
return id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ test('Test load job', async () => {
describe('Test resume running and pending', () => {
test('No data', async () => {
mockedJobCollection.mockResolvedValueOnce({ jobs: [], totalCount: 0 })
expect(await ExecutableJob.enqueueRunningAndPendingJobs(progressEmitter)).toEqual(true);
expect(await ExecutableJob.enqueueRunningAndPendingJobs()).toEqual(true);
expect(mockedJobCollection).toHaveBeenCalledWith(expect.objectContaining({
statuses: ['inProgress', 'pending'],
pageIndex: 0,
Expand All @@ -164,7 +164,7 @@ describe('Test resume running and pending', () => {
...newJobAttributes
}];
mockedJobCollection.mockResolvedValueOnce({ jobs: jobsToRun, totalCount: jobsToRun.length })
expect(await ExecutableJob.enqueueRunningAndPendingJobs(progressEmitter)).toEqual(true);
expect(await ExecutableJob.enqueueRunningAndPendingJobs()).toEqual(true);
// Wait for the jobs to have been enqueued and saved
await TestUtils.flushPromises();
expect(mockedJobCollection).toHaveBeenCalledWith(expect.objectContaining({
Expand All @@ -188,7 +188,7 @@ describe('Test resume running and pending', () => {
test('exception', async () => {

mockedJobCollection.mockRejectedValueOnce('exception')
expect(await ExecutableJob.enqueueRunningAndPendingJobs(progressEmitter)).toEqual(false);
expect(await ExecutableJob.enqueueRunningAndPendingJobs()).toEqual(false);
expect(mockedPool).not.toHaveBeenCalled();

});
Expand Down
2 changes: 1 addition & 1 deletion packages/transition-backend/src/socketServerApp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const setupSocketServerApp = async function (server, session) {
// but for jobs in progress, we don't know if they are actively being run or
// not
if (process.env.STARTUP_RESTART_JOBS === undefined || _booleish(process.env.STARTUP_RESTART_JOBS)) {
await ExecutableJob.enqueueRunningAndPendingJobs(serviceLocator.socketEventManager);
await ExecutableJob.enqueueRunningAndPendingJobs();
}

const io = socketIO(server, {
Expand Down

0 comments on commit 8c6b2d3

Please sign in to comment.