From e9dba19f064aed63e58ca54530b90bcc018b72d1 Mon Sep 17 00:00:00 2001 From: Sadiq Khoja Date: Fri, 19 May 2023 11:58:19 -0400 Subject: [PATCH] Features/entities/auto convert sub to entities (#870) * enhance: PATCH datasets/:name can now auto convert pending submissions into entity * use loop instead of recursion in runSequentially * correct SQL query * fix: event.details.data can be null dataset update worker * use entity source table * correct event id and save parent event id in source table * added assertion for audit of auto-created entity * lint it --- lib/model/query/datasets.js | 42 ++++++- lib/model/query/entities.js | 25 ++-- lib/resources/datasets.js | 22 +++- lib/util/problem.js | 3 + lib/util/promise.js | 14 ++- lib/worker/dataset.js | 21 ++++ lib/worker/jobs.js | 4 +- test/integration/api/datasets.js | 202 +++++++++++++++++++++++++++++- test/integration/worker/entity.js | 2 +- 9 files changed, 312 insertions(+), 23 deletions(-) create mode 100644 lib/worker/dataset.js diff --git a/lib/model/query/datasets.js b/lib/model/query/datasets.js index e754c70be..48065ad08 100644 --- a/lib/model/query/datasets.js +++ b/lib/model/query/datasets.js @@ -9,9 +9,9 @@ const { sql } = require('slonik'); const { extender, QueryOptions, equals, updater } = require('../../util/db'); -const { Dataset, Form } = require('../frames'); +const { Dataset, Form, Audit } = require('../frames'); const { validatePropertyName } = require('../../data/dataset'); -const { isEmpty, isNil, either, reduceBy, groupBy, uniqWith, equals: rEquals } = require('ramda'); +const { isEmpty, isNil, either, reduceBy, groupBy, uniqWith, equals: rEquals, map } = require('ramda'); const Problem = require('../../util/problem'); const { construct } = require('../../util/util'); @@ -241,6 +241,11 @@ const getById = (id, extended = false) => ({ maybeOne }) => { return _get(maybeOne, options.withCondition({ id }), true); }; +// Get by Actee ID - return only published dataset +const getByActeeId = (acteeId, extended = false) => ({ maybeOne }) => { + const options = extended ? QueryOptions.extended : QueryOptions.none; + return _get(maybeOne, options.withCondition({ acteeId }), true); +}; //////////////////////////////////////////////////////////////////////////////// @@ -391,12 +396,39 @@ const getDiff = (projectId, xmlFormId, forDraft) => ({ all }) => all(sql` .then(r => Object.keys(r).map(k => ({ name: k.split(',')[0], isNew: forDraft ? k.split(',')[1] === 'true' : undefined, properties: r[k] }))); const update = (datasets, data) => ({ one }) => one(updater(datasets, data)).then(construct(Dataset)); -update.audit = (datasets, data) => (log) => log('dataset.update', datasets, { data }); +update.audit = (datasets, data, autoConvert) => (log) => log('dataset.update', datasets, { data, autoConvert }); + +const _unprocessedSubmissions = (datasetId, fields) => sql` +SELECT ${fields} FROM dataset_form_defs dfd +JOIN submission_defs sd ON sd."formDefId" = dfd."formDefId" +JOIN submissions s ON sd."submissionId" = s.id +JOIN forms f ON s."formId" = f.id +JOIN ( + SELECT DISTINCT ON ((details->'submissionId')::INTEGER) * FROM audits + WHERE action IN ('submission.create', 'submission.update.version') + ORDER BY (details->'submissionId')::INTEGER, id DESC +) audits ON (audits.details->'submissionId')::INTEGER = s.id +LEFT JOIN ( + entity_defs ed + JOIN entity_def_sources es ON es.id = ed."sourceId" + JOIN entities e ON e.id = ed."entityId" AND e."datasetId" = ${datasetId} +) ON es."submissionDefId" = sd.id +WHERE sd.current AND dfd."datasetId" = ${datasetId} +AND (s."reviewState" IS NULL OR s."reviewState" = 'edited') +AND ed.id IS NULL +`; + +const countUnprocessedSubmissions = (datasetId) => ({ oneFirst }) => oneFirst(_unprocessedSubmissions(datasetId, sql`COUNT(1)`)); + +const getUnprocessedSubmissions = (datasetId) => ({ all }) => + all(_unprocessedSubmissions(datasetId, sql`audits.*`)) + .then(map(construct(Audit))); module.exports = { createOrMerge, publishIfExists, - getList, get, getById, + getList, get, getById, getByActeeId, getMetadata, getProperties, getFieldsByFormDefId, - getDiff, update + getDiff, update, countUnprocessedSubmissions, + getUnprocessedSubmissions }; diff --git a/lib/model/query/entities.js b/lib/model/query/entities.js index 1b62581e9..74e10e30d 100644 --- a/lib/model/query/entities.js +++ b/lib/model/query/entities.js @@ -17,6 +17,7 @@ const { odataFilter } = require('../../data/odata-filter'); const { odataToColumnMap, parseSubmissionXml } = require('../../data/entity'); const { isTrue } = require('../../util/http'); const Problem = require('../../util/problem'); +const { runSequentially } = require('../../util/promise'); //////////////////////////////////////////////////////////////////////////////// // ENTITY CREATE @@ -90,8 +91,9 @@ const createSource = (details = null, subDefId = null, eventId = null) => ({ one // Entrypoint to where submissions (a specific version) become entities -const _processSubmissionDef = (event) => async ({ Datasets, Entities, Submissions, Forms, Audits }) => { - const { submissionDefId, submissionId } = event.details; +const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, Audits }) => { + const { submissionId, submissionDefId } = event.details; + const existingEntity = await Entities.getDefBySubmissionId(submissionId); // If the submission has already been used to make an entity, don't try again // and don't log it as an error. @@ -130,7 +132,7 @@ const _processSubmissionDef = (event) => async ({ Datasets, Entities, Submission const partial = await Entity.fromParseEntityData(entityData); - const sourceDetails = { submission: { instanceId: submissionDef.instanceId } }; + const sourceDetails = { submission: { instanceId: submissionDef.instanceId }, parentEventId: parentEvent ? parentEvent.id : undefined }; const sourceId = await Entities.createSource(sourceDetails, submissionDefId, event.id); const entity = await Entities.createNew(dataset, partial, submissionDef, sourceId); @@ -144,9 +146,9 @@ const _processSubmissionDef = (event) => async ({ Datasets, Entities, Submission }); }; -const processSubmissionEvent = (event) => (container) => +const processSubmissionEvent = (event, parentEvent) => (container) => container.db.transaction((trxn) => - container.with({ db: trxn }).Entities._processSubmissionDef(event)) + container.with({ db: trxn }).Entities._processSubmissionEvent(event, parentEvent)) .catch((err) => // err could be any kind of problem, from an entity violation error, to a // database constraint error, to some other kind of error within the processing code. @@ -163,6 +165,14 @@ const processSubmissionEvent = (event) => (container) => errorMessage: err.message, problem: (err.isProblem === true) ? err : null })); +const createEntitiesFromPendingSubmissions = (submissionEvents, parentEvent) => (container) => + // run sequentially because we want to isolate transaction for each submission + runSequentially(submissionEvents.map(event => + () => processSubmissionEvent(event, parentEvent)(container))); + + + + //////////////////////////////////////////////////////////////////////////////// // UPDATING ENTITIES @@ -311,11 +321,12 @@ const del = (entity) => ({ run }) => del.audit = (entity, dataset) => (log) => log('entity.delete', entity.with({ acteeId: dataset.acteeId }), { uuid: entity.uuid }); module.exports = { - createNew, _processSubmissionDef, + createNew, _processSubmissionEvent, createSource, processSubmissionEvent, streamForExport, getDefBySubmissionId, createVersion, countByDatasetId, getById, - getAll, getAllDefs, del + getAll, getAllDefs, del, + createEntitiesFromPendingSubmissions }; diff --git a/lib/resources/datasets.js b/lib/resources/datasets.js index 21f4211fa..c22ffd423 100644 --- a/lib/resources/datasets.js +++ b/lib/resources/datasets.js @@ -13,6 +13,7 @@ const { streamEntityCsv } = require('../data/entity'); const { contentDisposition, withEtag } = require('../util/http'); const { md5sum } = require('../util/crypto'); const { Dataset } = require('../model/frames'); +const Problem = require('../util/problem'); module.exports = (service, endpoint) => { service.get('/projects/:id/datasets', endpoint(({ Projects, Datasets }, { auth, params, queryOptions }) => @@ -27,10 +28,25 @@ module.exports = (service, endpoint) => { .then((dataset) => auth.canOrReject('dataset.read', dataset) .then(() => Datasets.getMetadata(dataset))))); - service.patch('/projects/:projectId/datasets/:name', endpoint(async ({ Datasets }, { params, body, auth }) => { - const dataset = await Datasets.get(params.projectId, params.name).then(getOrNotFound); + service.patch('/projects/:projectId/datasets/:name', endpoint(async ({ Datasets }, { params, body, auth, query }) => { + const dataset = await Datasets.get(params.projectId, params.name, true).then(getOrNotFound); await auth.canOrReject('dataset.update', dataset); - const updatedDataset = await Datasets.update(dataset, Dataset.fromApi(body)); + const newDataset = Dataset.fromApi(body); + + // validate value of convert query parameter + if (query.convert !== undefined && query.convert !== 'true' && query.convert !== 'false') + return Problem.user.unexpectedValue({ field: 'convert', value: query.convert }); + + // return warning if approvalRequired is false and there are pending submissions + if (!newDataset.approvalRequired) { + if (query.convert === undefined) { + const unprocessedSubmissions = await Datasets.countUnprocessedSubmissions(dataset.id); + if (unprocessedSubmissions > 0) { + return Problem.user.pendingSubmissions({ count: unprocessedSubmissions }); + } + } + } + const updatedDataset = await Datasets.update(dataset, newDataset, query.convert === 'true'); return Datasets.getMetadata(updatedDataset); })); diff --git a/lib/util/problem.js b/lib/util/problem.js index cd23d20aa..af3aca838 100644 --- a/lib/util/problem.js +++ b/lib/util/problem.js @@ -114,6 +114,9 @@ const problems = { // problem parsing the entity data (probably JSON) itself invalidEntity: problem(400.28, ({ reason }) => `The entity is invalid. ${reason}`), + // warning: there are pending submissions + pendingSubmissions: problem(400.29, ({ count }) => `There are ${count} pending submissions`), + // no detail information for security reasons. authenticationFailed: problem(401.2, () => 'Could not authenticate with the provided credentials.'), diff --git a/lib/util/promise.js b/lib/util/promise.js index 78d4a42d2..75a901155 100644 --- a/lib/util/promise.js +++ b/lib/util/promise.js @@ -74,11 +74,17 @@ const block = () => { // inspired from https://stackoverflow.com/questions/54867318/sequential-execution-of-promise-all const runSequentially = async (functions) => { - if (functions.length === 0) { - return []; + const results = []; + + for (const fn of functions) { + // reason: we want to run functions sequentially + // Current use: each function is dependent on DB transaction + // we can't do parallel processing with transaction + // eslint-disable-next-line no-await-in-loop + results.push(await fn()); } - const [first, ...rest] = functions; - return [await first(), ...(await runSequentially(rest))]; + + return results; }; module.exports = { diff --git a/lib/worker/dataset.js b/lib/worker/dataset.js new file mode 100644 index 000000000..27de24a22 --- /dev/null +++ b/lib/worker/dataset.js @@ -0,0 +1,21 @@ +// Copyright 2023 ODK Central Developers +// See the NOTICE file at the top-level directory of this distribution and at +// https://github.com/getodk/central-backend/blob/master/NOTICE. +// This file is part of ODK Central. It is subject to the license terms in +// the LICENSE file found in the top-level directory of this distribution and at +// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central, +// including this file, may be copied, modified, propagated, or distributed +// except according to the terms contained in the LICENSE file. + +const { getOrNotFound } = require('../util/promise'); + +const createEntitiesFromPendingSubmissions = async ({ Entities, Datasets }, event) => { + if (event.details.data && !event.details.data.approvalRequired && event.details.autoConvert) { + const dataset = await Datasets.getByActeeId(event.acteeId).then(getOrNotFound); + const pendingSubmissions = await Datasets.getUnprocessedSubmissions(dataset.id); + + await Entities.createEntitiesFromPendingSubmissions(pendingSubmissions, event); + } +}; + +module.exports = { createEntitiesFromPendingSubmissions }; diff --git a/lib/worker/jobs.js b/lib/worker/jobs.js index 9c536ba73..2f4906ee3 100644 --- a/lib/worker/jobs.js +++ b/lib/worker/jobs.js @@ -24,7 +24,9 @@ const jobs = { 'form.update.publish': [ require('./form').updatePublish ], 'upgrade.process.form.draft': [ require('./form').updateDraftSet ], - 'upgrade.process.form': [ require('./form').updatePublish ] + 'upgrade.process.form': [ require('./form').updatePublish ], + + 'dataset.update': [ require('./dataset').createEntitiesFromPendingSubmissions ] }; module.exports = { jobs }; diff --git a/test/integration/api/datasets.js b/test/integration/api/datasets.js index 0d696f12c..a668820aa 100644 --- a/test/integration/api/datasets.js +++ b/test/integration/api/datasets.js @@ -7,6 +7,7 @@ const { getOrNotFound } = require('../../../lib/util/promise'); const { omit } = require('ramda'); const should = require('should'); const { sql } = require('slonik'); +const { QueryOptions } = require('../../../lib/util/db'); /* eslint-disable import/no-dynamic-require */ const { exhaust } = require(appRoot + '/lib/worker/worker'); @@ -2191,7 +2192,7 @@ describe('datasets and entities', () => { it('should reject if the user cannot read', testService(async (service) => { const asAlice = await service.login('alice'); - await asAlice.post('/v1/projects/1/forms') + await asAlice.post('/v1/projects/1/forms?publish=true') .send(testData.forms.simpleEntity) .set('Content-Type', 'application/xml') .expect(200); @@ -2206,7 +2207,7 @@ describe('datasets and entities', () => { const asAlice = await service.login('alice'); - await asAlice.post('/v1/projects/1/forms') + await asAlice.post('/v1/projects/1/forms?publish=true') .send(testData.forms.simpleEntity) .set('Content-Type', 'application/xml') .expect(200); @@ -2218,6 +2219,203 @@ describe('datasets and entities', () => { dataset.body.approvalRequired.should.equal(true); })); + + it('should return bad request if value of convert query param is invalid', testService(async (service) => { + + const asAlice = await service.login('alice'); + + await asAlice.post('/v1/projects/1/forms?publish=true') + .send(testData.forms.simpleEntity) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.patch('/v1/projects/1/datasets/people?convert=dummy') + .send({ approvalRequired: true }) + .expect(400) + .then(({ body }) => { + body.code.should.be.eql(400.8); + }); + + })); + + it('should return warning if there are pending submissions', testService(async (service, container) => { + const asAlice = await service.login('alice'); + + await asAlice.post('/v1/projects/1/forms?publish=true') + .send(testData.forms.simpleEntity) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.patch('/v1/projects/1/datasets/people') + .send({ approvalRequired: true }) + .expect(200); + + await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions') + .send(testData.instances.simpleEntity.one) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.patch('/v1/projects/1/forms/simpleEntity/submissions/one') + .send({ reviewState: 'approved' }) + .expect(200); + + await exhaust(container); + + await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions') + .send(testData.instances.simpleEntity.two) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.patch('/v1/projects/1/datasets/people') + .send({ approvalRequired: false }) + .expect(400) + .then(({ body }) => { + body.code.should.be.eql(400.29); + body.details.count.should.be.eql(1); + }); + })); + + it('should update the flag without automatic conversions', testService(async (service) => { + const asAlice = await service.login('alice'); + + await asAlice.post('/v1/projects/1/forms?publish=true') + .send(testData.forms.simpleEntity) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.patch('/v1/projects/1/datasets/people') + .send({ approvalRequired: true }) + .expect(200); + + await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions') + .send(testData.instances.simpleEntity.one) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.patch('/v1/projects/1/datasets/people?convert=false') + .send({ approvalRequired: false }) + .expect(200) + .then(({ body }) => body.approvalRequired.should.be.false()); + + // there are no entities + await asAlice.get('/v1/projects/1/datasets/people/entities') + .expect(200) + .then(({ body }) => body.should.be.eql([])); + + })); + + it('should automatically convert pending submissions', testService(async (service, container) => { + const asAlice = await service.login('alice'); + + await asAlice.post('/v1/projects/1/forms?publish=true') + .send(testData.forms.simpleEntity) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.patch('/v1/projects/1/datasets/people') + .send({ approvalRequired: true }) + .expect(200); + + await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions') + .send(testData.instances.simpleEntity.one) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions') + .send(testData.instances.simpleEntity.two) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + // There are no entities + await asAlice.get('/v1/projects/1/datasets/people/entities') + .expect(200) + .then(({ body }) => body.length.should.be.eql(0)); + + await asAlice.patch('/v1/projects/1/datasets/people?convert=true') + .send({ approvalRequired: false }) + .expect(200) + .then(({ body }) => body.approvalRequired.should.be.false()); + + await exhaust(container); + + // Entities are created now + await asAlice.get('/v1/projects/1/datasets/people/entities') + .expect(200) + .then(({ body }) => body.length.should.be.eql(2)); + + await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/audits') + .expect(200) + .then(({ body: logs }) => { + logs[0].should.be.an.Audit(); + logs[0].action.should.be.eql('entity.create'); + logs[0].actor.displayName.should.be.eql('Alice'); + + logs[0].details.submission.should.be.a.Submission(); + logs[0].details.submission.xmlFormId.should.be.eql('simpleEntity'); + logs[0].details.submission.currentVersion.instanceName.should.be.eql('one'); + logs[0].details.submission.currentVersion.submitter.displayName.should.be.eql('Alice'); + }); + + + + })); + + it('should log error if there is a problem in a submission while auto converting', testService(async (service, container) => { + const asAlice = await service.login('alice'); + + await asAlice.post('/v1/projects/1/forms?publish=true') + .send(testData.forms.simpleEntity) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.patch('/v1/projects/1/datasets/people') + .send({ approvalRequired: true }) + .expect(200); + + await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions') + .send(testData.instances.simpleEntity.one.replace('Alice (88)', '')) //removing label + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions') + .send(testData.instances.simpleEntity.two) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions') + .send(testData.instances.simpleEntity.three.replace('create="1"', 'create="0"')) // don't create entity + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + // There are no entities + await asAlice.get('/v1/projects/1/datasets/people/entities') + .expect(200) + .then(({ body }) => body.length.should.be.eql(0)); + + await asAlice.patch('/v1/projects/1/datasets/people?convert=true') + .send({ approvalRequired: false }) + .expect(200) + .then(({ body }) => body.approvalRequired.should.be.false()); + + await exhaust(container); + + // One Entity is created + await asAlice.get('/v1/projects/1/datasets/people/entities') + .expect(200) + .then(({ body }) => { + body.length.should.be.eql(1); + }); + + const entityErrors = await container.Audits.get(new QueryOptions({ args: { action: 'entity.create.error' } })); + + entityErrors.length.should.be.eql(1); + entityErrors[0].details.errorMessage.should.match(/Label empty or missing/); + + })); }); }); }); diff --git a/test/integration/worker/entity.js b/test/integration/worker/entity.js index 5d9b44eed..dc43b17d9 100644 --- a/test/integration/worker/entity.js +++ b/test/integration/worker/entity.js @@ -701,7 +701,7 @@ describe('worker: entity', () => { await exhaust(container); - await asAlice.patch('/v1/projects/1/datasets/people') + await asAlice.patch('/v1/projects/1/datasets/people?convert=true') .send({ approvalRequired: false }) .expect(200);