From 6e0cabc09e921cb74812f6330807ab1c47d6299c Mon Sep 17 00:00:00 2001
From: Alban Mouton
Date: Mon, 14 Mar 2022 10:23:15 +0100
Subject: [PATCH] feat: work on landing zone dataset

---
 .gitignore                    |  1 +
 config/default.js             |  1 +
 index.js                      | 42 ++++++++++++++++++++---------------
 lib/daily-state.js            |  7 +++---
 lib/import-validate.js        | 11 +++++----
 processing-config-schema.json | 12 ++++++++++
 test/test.js                  |  3 ++-
 7 files changed, 48 insertions(+), 29 deletions(-)

diff --git a/.gitignore b/.gitignore
index a8edca0..8cbcaaf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@ data/
 /config/local-*
 errors.log
 misc/
+/qualifelec-ftp
\ No newline at end of file
diff --git a/config/default.js b/config/default.js
index 9d7c847..f0c10ec 100644
--- a/config/default.js
+++ b/config/default.js
@@ -5,6 +5,7 @@ module.exports = {
     user: '',
     password: ''
   },
+  ftpBasePath: '/www/sites/default/files/private/',
   dataFairUrl: '',
   dataFairAPIKey: ''
 }
diff --git a/index.js b/index.js
index c865916..1d93094 100644
--- a/index.js
+++ b/index.js
@@ -7,18 +7,16 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
   const FTPClient = require('promise-ftp')
   const ftp = new FTPClient()
   const serverMessage = await ftp.connect({
-    ftpOptions: {
-      host: 'localhost',
-      port: 21,
-      user: undefined,
-      password: undefined,
-      connTimeout: 30000,
-      pasvTimeout: 30000,
-      keepalive: 30000,
-      autoReconnect: true
-    },
+    host: 'localhost',
+    port: 21,
+    user: undefined,
+    password: undefined,
+    connTimeout: 30000,
+    pasvTimeout: 30000,
+    autoReconnect: true,
     ...pluginConfig.ftpOptions
   })
+  pluginConfig.ftpBasePath = pluginConfig.ftpBasePath || '/www/sites/default/files/private/'
   await log.info('connecté : ' + serverMessage)
 
   await log.step('Récupération des données de référence liées')
@@ -30,20 +28,25 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
   const contactsOrganismesLines = (await axios.get(`api/v1/datasets/${processingConfig.datasetContactsOrganismes.id}/lines`, { params: { size: 10000 } })).data.results
   const contactsOrganismes = contactsOrganismesLines.reduce((a, ca) => { a[ca.ORGANISME] = ca; return a }, {})
   await log.info(`${contactsOrganismesLines.length} lignes dans les données de référence "${processingConfig.datasetContactsOrganismes.title}"`)
+  if (processingConfig.datasetLandingZone) {
+    await log.info(`synchronisation des fichiers déposés sur le jeu de données "${processingConfig.datasetLandingZone.title}"`)
+    await axios.post(`api/v1/datasets/${processingConfig.datasetLandingZone.id}/_sync_attachments_lines`)
+  }
   // read .tar.gz uploaded by partners, and move content to archive if it is valid or error folder otherwise
   const { downloadAndValidate, moveToFtp, sendValidationErrors } = require('./lib/import-validate')
   for (const folder of processingConfig.folders) {
     log.step(`Import et validation du répertoire ${folder}`)
     await log.info('récupération de la liste des fichiers dans le répertoire')
-    const files = await ftp.list(ftpPath(folder))
+    console.log(pluginConfig)
+    const files = await ftp.list(path.join(pluginConfig.ftpBasePath, folder))
     const csvs = files.map(f => f.name).filter(f => f.endsWith('.csv'))
     if (csvs.length) {
-      const errors = await downloadAndValidate(ftp, dir, folder, csvs, log)
+      const errors = await downloadAndValidate(ftp, dir, folder, csvs, pluginConfig.ftpBasePath, log)
      if (errors.length) {
         await sendValidationErrors(folder, contactsOrganismes, errors, log, sendMail)
       }
-      await moveToFtp(ftp, dir, folder, !!errors.length, log)
+      await moveToFtp(ftp, dir, folder, !!errors.length, pluginConfig.ftpBasePath, log)
     } else {
       await log.info('aucun fichier à importer')
     }
   }
@@ -87,7 +90,7 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
     log.step(`Traitement du répertoire ${folder}`)
     await fs.ensureDir(path.join(dir, folder))
     await log.info(`récupération de la liste des fichiers dans le répertoire ${folder}/archive`)
-    const files = await ftp.list(ftpPath(folder + '/archive'))
+    const files = await ftp.list(path.join(pluginConfig.ftpBasePath, folder, '/archive'))
     let days = Array.from(new Set(files.map(f => f.name.split('-').shift()).filter(f => f.length === 8 && !f.includes('.'))))
       .map(day2date)
     days.sort()
@@ -102,7 +105,7 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
       await log.info(`nombre de jours déjà traités : ${nbProcessedDays}`)
       days = days.slice(nbProcessedDays)
       await log.info(`téléchargement de l'état au dernier jour traité ${lastProcessedDay}`)
-      previousState = await readDailyState(ftp, dir, folder, lastProcessedDay, qualifDomaine, log)
+      previousState = await readDailyState(ftp, dir, folder, lastProcessedDay, qualifDomaine, pluginConfig.ftpBasePath, log)
       previousDay = lastProcessedDay
     }
 
@@ -112,7 +115,7 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
   }
 
   for (const day of days) {
-    const state = await readDailyState(ftp, dir, folder, day, qualifDomaine, log)
+    const state = await readDailyState(ftp, dir, folder, day, qualifDomaine, pluginConfig.ftpBasePath, log)
     const { stats, bulk } = await require('./lib/diff-bulk')(previousState, state, previousDay, day, historyData)
     await log.info(`enregistrement des modifications pour le jour ${day} : ouvertures=${stats.created}, fermetures=${stats.closed}, modifications=${stats.updated}, inchangés=${stats.unmodified}`)
     while (bulk.length) {
@@ -128,6 +131,9 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
     }
   }
   await repairDomains(axios, dataset, qualifDomaine, log)
-}
 
-const ftpPath = (folder) => `/www/sites/default/files/private/${folder}`
+  if (processingConfig.datasetLandingZone) {
+    await log.info(`synchronisation des fichiers déposés sur le jeu de données "${processingConfig.datasetLandingZone.title}"`)
+    await axios.post(`api/v1/datasets/${processingConfig.datasetLandingZone.id}/_sync_attachments_lines`)
+  }
+}
diff --git a/lib/daily-state.js b/lib/daily-state.js
index 8a24034..7a21b5b 100644
--- a/lib/daily-state.js
+++ b/lib/daily-state.js
@@ -6,18 +6,17 @@ const parserOpts = { delimiter: ';', quote: '', record_delimiter: '\r\n' }
 const path = require('path')
 const fs = require('fs-extra')
 
-const ftpPath = (folder) => `/www/sites/default/files/private/${folder}/archive`
 
 const { day2date, parseNumber } = require('./format')
 
 // read the 3 files 'entreprises', 'qualifications' and 'liens' and build an object with a daily state
-exports.readDailyState = async (ftp, dir, folder, day, qualifDomaine, log) => {
+exports.readDailyState = async (ftp, dir, folder, day, qualifDomaine, ftpBasepath, log) => {
   const files = {}
   await fs.ensureDir(path.join(dir, folder, day))
 
   for (const file of ['qualifications', 'entreprises', 'liens']) {
     const filePath = path.join(dir, folder, day, `${file}.csv`)
-    if (!await fs.exists(filePath)) {
-      const ftpFilePath = ftpPath(folder) + `/${day.replace(/-/g, '')}-${file}.csv`
+    if (!await fs.pathExists(filePath)) {
+      const ftpFilePath = path.join(ftpBasepath, folder, `/archive/${day.replace(/-/g, '')}-${file}.csv`)
       await log.debug('téléchargement du fichier ' + ftpFilePath)
       // creating empty file before streaming seems to fix some weird bugs with NFS
diff --git a/lib/import-validate.js b/lib/import-validate.js
index b03e1eb..f544029 100644
--- a/lib/import-validate.js
+++ b/lib/import-validate.js
@@ -5,7 +5,6 @@ const Iconv = require('iconv').Iconv
 const iconv = new Iconv('latin1', 'utf-8')
 const parse = require('csv-parse/lib/sync')
 const parserOpts = { delimiter: ';', quote: '', record_delimiter: '\r\n' }
-const ftpPath = '/www/sites/default/files/private/'
 
 const { parseNumber, date2day } = require('./format')
 
@@ -151,7 +150,7 @@ const validateLiensLine = (line, i, errors, context) => {
 }
 
 // les fichiers déposés par les organismes sont validés puis déplacés dans ./archive
-exports.downloadAndValidate = async (ftp, dir, folder, files, log) => {
+exports.downloadAndValidate = async (ftp, dir, folder, files, ftpBasePath, log) => {
   // this function logs errors and returns them in an array, ready to be sent to a contact
   let errors = []
 
@@ -163,7 +162,7 @@ exports.downloadAndValidate = async (ftp, dir, folder, files, ftpBasePath, log) => {
     const filePath = path.join(importFolder, file)
     // creating empty file before streaming seems to fix some weird bugs with NFS
     await fs.ensureFile(filePath + '.tmp')
-    await pump(await ftp.get(path.join(ftpPath, folder, file)), fs.createWriteStream(filePath + '.tmp'))
+    await pump(await ftp.get(path.join(ftpBasePath, folder, file)), fs.createWriteStream(filePath + '.tmp'))
     // Try to prevent weird bug with NFS by forcing syncing file before reading it
     const fd = await fs.open(filePath + '.tmp', 'r')
     await fs.fsync(fd)
@@ -200,7 +199,7 @@ exports.downloadAndValidate = async (ftp, dir, folder, files, ftpBasePath, log) => {
   return errors
 }
 
-exports.moveToFtp = async (ftp, dir, folder, error, log) => {
+exports.moveToFtp = async (ftp, dir, folder, error, ftpBasePath, log) => {
   const day = date2day(new Date())
   const importFolder = path.join(dir, folder, 'import')
   const ftpDir = path.join(folder, error ? '/erreur' : '/archive')
@@ -208,8 +207,8 @@ exports.moveToFtp = async (ftp, dir, folder, error, ftpBasePath, log) => {
     const filePath = path.join(importFolder, `${file}.csv`)
     if (await fs.pathExists(filePath)) {
       await log.info(`${folder}/${file}.csv -> ` + path.join(ftpDir, `${day}-${file}.csv`))
-      await ftp.put(await fs.readFile(filePath), path.join(ftpPath, ftpDir, `${day}-${file}.csv`))
-      await ftp.delete(path.join(ftpPath, folder, `${file}.csv`))
+      await ftp.put(await fs.readFile(filePath), path.join(ftpBasePath, ftpDir, `${day}-${file}.csv`))
+      await ftp.delete(path.join(ftpBasePath, folder, `${file}.csv`))
     }
   }
   await fs.remove(importFolder)
diff --git a/processing-config-schema.json b/processing-config-schema.json
index 8799328..c838867 100644
--- a/processing-config-schema.json
+++ b/processing-config-schema.json
@@ -80,6 +80,18 @@
         "title": {"type": "string", "title": "Titre"}
       }
     },
+    "datasetLandingZone": {
+      "title": "Jeu de données de dépôt des données",
+      "type": "object",
+      "x-fromUrl": "{context.dataFairUrl}/api/v1/datasets?q={q}&select=id,title&{context.ownerFilter}",
+      "x-itemsProp": "results",
+      "x-itemTitle": "title",
+      "x-itemKey": "id",
+      "properties": {
+        "id": {"type": "string", "title": "Identifiant"},
+        "title": {"type": "string", "title": "Titre"}
+      }
+    },
     "folders": {
       "type": "array",
       "title": "Répertoires",
diff --git a/test/test.js b/test/test.js
index a0f0c8c..3fe2e13 100644
--- a/test/test.js
+++ b/test/test.js
@@ -48,7 +48,8 @@ describe('Hello world processing', () => {
     // await fs.emptyDir('data/')
     await ademeRGE.run({
       pluginConfig: {
-        ftpOptions: config.ftpOptions
+        ftpOptions: config.ftpOptions,
+        ftpBasePath: config.ftpBasePath
       },
       processingConfig: {
         datasetMode: 'create',