Skip to content

Commit

Permalink
feat: work on landing zone dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
albanm committed Mar 14, 2022
1 parent 44e01f3 commit 6e0cabc
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ data/
/config/local-*
errors.log
misc/
/qualifelec-ftp
1 change: 1 addition & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module.exports = {
user: '',
password: ''
},
ftpBasePath: '/www/sites/default/files/private/',
dataFairUrl: '',
dataFairAPIKey: ''
}
42 changes: 24 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,16 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
const FTPClient = require('promise-ftp')
const ftp = new FTPClient()
const serverMessage = await ftp.connect({
ftpOptions: {
host: 'localhost',
port: 21,
user: undefined,
password: undefined,
connTimeout: 30000,
pasvTimeout: 30000,
keepalive: 30000,
autoReconnect: true
},
host: 'localhost',
port: 21,
user: undefined,
password: undefined,
connTimeout: 30000,
pasvTimeout: 30000,
autoReconnect: true,
...pluginConfig.ftpOptions
})
pluginConfig.ftpBasePath = pluginConfig.ftpBasePath || '/www/sites/default/files/private/'
await log.info('connecté : ' + serverMessage)

await log.step('Récupération des données de référence liées')
Expand All @@ -30,20 +28,25 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
const contactsOrganismesLines = (await axios.get(`api/v1/datasets/${processingConfig.datasetContactsOrganismes.id}/lines`, { params: { size: 10000 } })).data.results
const contactsOrganismes = contactsOrganismesLines.reduce((a, ca) => { a[ca.ORGANISME] = ca; return a }, {})
await log.info(`${contactsOrganismesLines.length} lignes dans les données de référence "${processingConfig.datasetContactsOrganismes.title}"`)
if (processingConfig.datasetLandingZone) {
await log.info(`synchronisation des fichers déposés sur le jeu de données "${processingConfig.datasetLandingZone.title}"`)
await axios.post(`api/v1/datasets/${processingConfig.datasetLandingZone.id}/_sync_attachments_lines`)
}

// read .tar.gz uploaded by partners, and move content to archive if it is valid or error folder otherwise
const { downloadAndValidate, moveToFtp, sendValidationErrors } = require('./lib/import-validate')
for (const folder of processingConfig.folders) {
log.step(`Import et validation du répertoire ${folder}`)
await log.info('récupération de la liste des fichiers dans le répertoire')
const files = await ftp.list(ftpPath(folder))
console.log(pluginConfig)
const files = await ftp.list(path.join(pluginConfig.ftpBasePath, folder))
const csvs = files.map(f => f.name).filter(f => f.endsWith('.csv'))
if (csvs.length) {
const errors = await downloadAndValidate(ftp, dir, folder, csvs, log)
const errors = await downloadAndValidate(ftp, dir, folder, csvs, pluginConfig.ftpBasePath, log)
if (errors.length) {
await sendValidationErrors(folder, contactsOrganismes, errors, log, sendMail)
}
await moveToFtp(ftp, dir, folder, !!errors.length, log)
await moveToFtp(ftp, dir, folder, !!errors.length, pluginConfig.ftpBasePath, log)
} else {
await log.info('aucun fichier à importer')
}
Expand Down Expand Up @@ -87,7 +90,7 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
log.step(`Traitement du répertoire ${folder}`)
await fs.ensureDir(path.join(dir, folder))
await log.info(`récupération de la liste des fichiers dans le répertoire ${folder}/archive`)
const files = await ftp.list(ftpPath(folder + '/archive'))
const files = await ftp.list(path.join(pluginConfig.ftpBasePath, folder, '/archive'))
let days = Array.from(new Set(files.map(f => f.name.split('-').shift()).filter(f => f.length === 8 && !f.includes('.'))))
.map(day2date)
days.sort()
Expand All @@ -102,7 +105,7 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
await log.info(`nombre de jours déjà traités : ${nbProcessedDays}`)
days = days.slice(nbProcessedDays)
await log.info(`téléchargement de l'état au dernier jour traité ${lastProcessedDay}`)
previousState = await readDailyState(ftp, dir, folder, lastProcessedDay, qualifDomaine, log)
previousState = await readDailyState(ftp, dir, folder, lastProcessedDay, qualifDomaine, pluginConfig.ftpBasePath, log)
previousDay = lastProcessedDay
}

Expand All @@ -112,7 +115,7 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
}

for (const day of days) {
const state = await readDailyState(ftp, dir, folder, day, qualifDomaine, log)
const state = await readDailyState(ftp, dir, folder, day, qualifDomaine, pluginConfig.ftpBasePath, log)
const { stats, bulk } = await require('./lib/diff-bulk')(previousState, state, previousDay, day, historyData)
await log.info(`enregistrement des modifications pour le jour ${day} : ouvertures=${stats.created}, fermetures=${stats.closed}, modifications=${stats.updated}, inchangés=${stats.unmodified}`)
while (bulk.length) {
Expand All @@ -128,6 +131,9 @@ exports.run = async ({ pluginConfig, processingConfig, processingId, dir, axios,
}
}
await repairDomains(axios, dataset, qualifDomaine, log)
}

const ftpPath = (folder) => `/www/sites/default/files/private/${folder}`
if (processingConfig.datasetLandingZone) {
await log.info(`synchronisation des fichers déposés sur le jeu de données "${processingConfig.datasetLandingZone.title}"`)
await axios.post(`api/v1/datasets/${processingConfig.datasetLandingZone.id}/_sync_attachments_lines`)
}
}
7 changes: 3 additions & 4 deletions lib/daily-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ const parserOpts = { delimiter: ';', quote: '', record_delimiter: '\r\n' }

const path = require('path')
const fs = require('fs-extra')
const ftpPath = (folder) => `/www/sites/default/files/private/${folder}/archive`

const { day2date, parseNumber } = require('./format')

// read the 3 files 'entreprises', 'qualifications' and 'liens' and build an object with a daily state
exports.readDailyState = async (ftp, dir, folder, day, qualifDomaine, log) => {
exports.readDailyState = async (ftp, dir, folder, day, qualifDomaine, ftpBasepath, log) => {
const files = {}
await fs.ensureDir(path.join(dir, folder, day))
for (const file of ['qualifications', 'entreprises', 'liens']) {
const filePath = path.join(dir, folder, day, `${file}.csv`)
if (!await fs.exists(filePath)) {
const ftpFilePath = ftpPath(folder) + `/${day.replace(/-/g, '')}-${file}.csv`
if (!await fs.pathExists(filePath)) {
const ftpFilePath = path.join(ftpBasepath, folder, `/archive/${day.replace(/-/g, '')}-${file}.csv`)

await log.debug('téléchargement du fichier ' + ftpFilePath)
// creating empty file before streaming seems to fix some weird bugs with NFS
Expand Down
11 changes: 5 additions & 6 deletions lib/import-validate.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const Iconv = require('iconv').Iconv
const iconv = new Iconv('latin1', 'utf-8')
const parse = require('csv-parse/lib/sync')
const parserOpts = { delimiter: ';', quote: '', record_delimiter: '\r\n' }
const ftpPath = '/www/sites/default/files/private/'

const { parseNumber, date2day } = require('./format')

Expand Down Expand Up @@ -151,7 +150,7 @@ const validateLiensLine = (line, i, errors, context) => {
}

// les fichiers déposés par les organismes sont validés puis déplacés dans ./archive
exports.downloadAndValidate = async (ftp, dir, folder, files, log) => {
exports.downloadAndValidate = async (ftp, dir, folder, files, ftpBasePath, log) => {
// this function logs errors and returns them in an array, ready to be sent to a contact
let errors = []

Expand All @@ -163,7 +162,7 @@ exports.downloadAndValidate = async (ftp, dir, folder, files, log) => {
const filePath = path.join(importFolder, file)
// creating empty file before streaming seems to fix some weird bugs with NFS
await fs.ensureFile(filePath + '.tmp')
await pump(await ftp.get(path.join(ftpPath, folder, file)), fs.createWriteStream(filePath + '.tmp'))
await pump(await ftp.get(path.join(ftpBasePath, folder, file)), fs.createWriteStream(filePath + '.tmp'))
// Try to prevent weird bug with NFS by forcing syncing file before reading it
const fd = await fs.open(filePath + '.tmp', 'r')
await fs.fsync(fd)
Expand Down Expand Up @@ -200,16 +199,16 @@ exports.downloadAndValidate = async (ftp, dir, folder, files, log) => {
return errors
}

exports.moveToFtp = async (ftp, dir, folder, error, log) => {
exports.moveToFtp = async (ftp, dir, folder, error, ftpBasePath, log) => {
const day = date2day(new Date())
const importFolder = path.join(dir, folder, 'import')
const ftpDir = path.join(folder, error ? '/erreur' : '/archive')
for (const file of ['entreprises', 'qualifications', 'liens']) {
const filePath = path.join(importFolder, `${file}.csv`)
if (await fs.pathExists(filePath)) {
await log.info(`${folder}/${file}.csv -> ` + path.join(ftpDir, `${day}-${file}.csv`))
await ftp.put(await fs.readFile(filePath), path.join(ftpPath, ftpDir, `${day}-${file}.csv`))
await ftp.delete(path.join(ftpPath, folder, `${file}.csv`))
await ftp.put(await fs.readFile(filePath), path.join(ftpBasePath, ftpDir, `${day}-${file}.csv`))
await ftp.delete(path.join(ftpBasePath, folder, `${file}.csv`))
}
}
await fs.remove(importFolder)
Expand Down
12 changes: 12 additions & 0 deletions processing-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@
"title": {"type": "string", "title": "Titre"}
}
},
"datasetLandingZone": {
"title": "Jeu de données de dépôt des données",
"type": "object",
"x-fromUrl": "{context.dataFairUrl}/api/v1/datasets?q={q}&select=id,title&{context.ownerFilter}",
"x-itemsProp": "results",
"x-itemTitle": "title",
"x-itemKey": "id",
"properties": {
"id": {"type": "string", "title": "Identifiant"},
"title": {"type": "string", "title": "Titre"}
}
},
"folders": {
"type": "array",
"title": "Répertoires",
Expand Down
3 changes: 2 additions & 1 deletion test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ describe('Hello world processing', () => {
// await fs.emptyDir('data/')
await ademeRGE.run({
pluginConfig: {
ftpOptions: config.ftpOptions
ftpOptions: config.ftpOptions,
ftpBasePath: config.ftpBasePath
},
processingConfig: {
datasetMode: 'create',
Expand Down

0 comments on commit 6e0cabc

Please sign in to comment.