diff --git a/.idea/crypto_alerts_api.iml b/.idea/crypto_alerts_api.iml index 24643cc..9f9fc8c 100644 --- a/.idea/crypto_alerts_api.iml +++ b/.idea/crypto_alerts_api.iml @@ -8,5 +8,6 @@ + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml index caa8372..01d4e86 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -3,6 +3,7 @@ + \ No newline at end of file diff --git a/src/constants/constants.ts b/src/constants/constants.ts index 59a502e..41a366a 100644 --- a/src/constants/constants.ts +++ b/src/constants/constants.ts @@ -2,7 +2,8 @@ export default { channels: { pending_notifications: 'pending-notifications', update_alert_triggered: 'update_alert_triggered', - add_alert_listener: 'add_alert_listener' + add_alert_listener: 'add_alert_listener', + delete_alert_listener: 'delete_alert_listener' }, notification_workflow_ids: { crypto_price_alert: 'crypto-price-alert' diff --git a/src/interfaces/alert.model.ts b/src/interfaces/alert.model.ts index 6903329..1f09848 100644 --- a/src/interfaces/alert.model.ts +++ b/src/interfaces/alert.model.ts @@ -3,7 +3,7 @@ import {ObjectId} from "mongodb"; export default interface AlertModel { _id?: ObjectId | undefined; pair: string; - type: 'gt_price' + type: 'gt_price' | 'lt_price'; status: 'active' | 'triggered' | 'cancelled'; price: { current?: number; diff --git a/src/interfaces/create-alert-dto.model.ts b/src/interfaces/create-alert-dto.model.ts index 595be3f..1ebb60d 100644 --- a/src/interfaces/create-alert-dto.model.ts +++ b/src/interfaces/create-alert-dto.model.ts @@ -2,7 +2,7 @@ import { NotificationChannel } from './alert.model'; export default interface CreateAlertDtoModel { pair: string; - type: 'gt_price' + type: 'gt_price' | 'lt_price'; price: { value: number; current?: number; diff --git a/src/repository/alert-repository.ts b/src/repository/alert-repository.ts index 2e12b7a..5a66456 100644 --- a/src/repository/alert-repository.ts +++ b/src/repository/alert-repository.ts @@ -51,12 +51,24 @@ export default class AlertRepository { try { return await (await this.client()) .collection('alerts') - .find().toArray(); + .find() + .map(doc => { + return { + _id: doc._id, + pair: doc.pair, + type: doc.type, + status: doc.status, + price: doc.price, + notification: doc.notification, + triggerInfo: doc.triggerInfo, + created_at: doc.created_at, + } as AlertModel; + }) + .toArray(); } catch (e) { console.error('Error listing alerts', e); throw new AppError('Error listing alerts'); } - } async listActiveAlertSymbols() { @@ -102,4 +114,15 @@ export default class AlertRepository { throw new AppError('Error updating alert'); } } + + async delete(id: string) { + try { + return await (await this.client()) + .collection(this._collectionName) + .findOneAndDelete({_id: new ObjectId(id)}); + } catch (e) { + console.error('Error deleting alert', e); + throw new AppError('Error deleting alert'); + } + } } \ No newline at end of file diff --git a/src/routes/alerts.ts b/src/routes/alerts.ts index b21f98f..d00f074 100644 --- a/src/routes/alerts.ts +++ b/src/routes/alerts.ts @@ -8,11 +8,18 @@ const router = express.Router(); type AlertsResponse = { content: any[] + error?: string }; router.get<{}, AlertsResponse>('/', (req, res) => { console.log('GET /routes/v1/alerts', req.query); - res.json({ content: [] }); + alertService.list() + .then((alerts) => { + res.json({ content: alerts }); + }).catch((e) => { + console.error('Error listing alerts', e); + res.status(500).json({ error: 'Error listing alerts', content: []}); + }); }); router.post('/', (req, res) => { @@ -28,4 +35,17 @@ router.post('/', (req, res) => { }); }); +router.delete<{ id: string }, {}>('/:id', (req, res) => { + console.log('DELETE /routes/v1/alerts/:id', req.params); + alertService.delete(req.params.id) + .then(() => { + console.log('AlertModel deleted'); + res.status(204).end(); + }) + .catch((e: any) => { + console.error('Error deleting alert', e); + res.status(500).json({ error: 'Error deleting alert' }); + }); +}); + export default router; diff --git a/src/services/alerts.service.ts b/src/services/alerts.service.ts index ed0a200..5b77e8d 100644 --- a/src/services/alerts.service.ts +++ b/src/services/alerts.service.ts @@ -5,6 +5,7 @@ import Ajv from "ajv"; import create_alert_dto_schema from "../schemas/create-alert-dto.schema"; import RedisConfig from "../config/redis.config"; import Constants from "../constants/constants"; +import {WithId} from "mongodb"; export default class AlertsService { _alertsRepository = new AlertRepository(); @@ -47,6 +48,37 @@ export default class AlertsService { (await this._publisher).publish(Constants.channels.add_alert_listener, JSON.stringify(alert)); } + async list(): Promise { + return new Promise(async (resolve, reject) => { + this._alertsRepository.list() + .then((alerts) => { + if(alerts) { + resolve(alerts); + } else { + reject('No alerts found'); + } + }) + .catch((e) => reject(e)); + }); + } + + async delete(id: string) { + this._alertsRepository + .delete(id) + .then(async (doc) => { + if(!doc) { + console.error('Alert not found'); + throw new Error('Alert not found'); + } + console.log('Alert deleted ', doc._id); + (await this._publisher).publish(Constants.channels.delete_alert_listener, JSON.stringify(doc)); + }) + .catch((e: any) => { + console.error('Error deleting alert', e); + throw new Error('Error deleting alert'); + }); + } + async updateAsTriggered(message: string) { console.debug('Updating alert as triggered ', message); const alert: AlertModel = JSON.parse(message); diff --git a/src/workers/price-listener.worker.ts b/src/workers/price-listener.worker.ts index fc91572..cf704fe 100644 --- a/src/workers/price-listener.worker.ts +++ b/src/workers/price-listener.worker.ts @@ -4,6 +4,7 @@ import AlertModel from "../interfaces/alert.model"; import MarkPriceUpdate from "../interfaces/mark-price-update.model"; import RedisConfig from "../config/redis.config"; import Constants from "../constants/constants"; +import Alerts from "../routes/alerts"; export default class PriceListenerWorker { @@ -46,7 +47,8 @@ export default class PriceListenerWorker { async startSubscribers() { console.log('Starting price-watcher-worker subscribers'); - (await this._subscriber).subscribe(Constants.channels.add_alert_listener, (message) => this.addListener(message)) + (await this._subscriber).subscribe(Constants.channels.add_alert_listener, (message) => this.addListener(message)); + (await this._subscriber).subscribe(Constants.channels.delete_alert_listener, (message) => this.removeListener(message)) } async startMonitoring(symbols: string[]) { @@ -79,7 +81,7 @@ export default class PriceListenerWorker { if (message.id) { this.handlePendingResponse(message); } else if (message.stream) { - console.log('Stream message: ', message); + // console.info('Stream message: ', message); this.processPriceUpdate(message); } }); @@ -120,6 +122,13 @@ export default class PriceListenerWorker { this._pendingResponses.set(ref.toString(), payload); } + private unsubscribeIfNoAlerts(pair: string) { + const alertsForPair = this._alerts.get(pair); + if (!alertsForPair || !alertsForPair.length) { + this.unsubscribe([pair]); + } + } + private unsubscribe(pairs: string[]) { console.log('Unsubscribing from pairs: ', pairs.join(', ')); const params = pairs @@ -143,6 +152,15 @@ export default class PriceListenerWorker { this._alerts.set(alert.pair, [...alertsForPair]); } + removeListener(message: string) { + const alert: AlertModel = JSON.parse(message); + console.log(`Removing alert ${alert._id} from listener `); + this.unsubscribeIfNoAlerts(alert.pair); + let alertsForPair = this._alerts.get(alert.pair) || []; + alertsForPair = [...alertsForPair.filter(a => a._id !== alert._id)]; + this._alerts.set(alert.pair, [...alertsForPair]); + } + private handlePendingResponse(message: any) { const ref = message.id.toString(); const pendingResponse = this._pendingResponses.get(ref); @@ -199,6 +217,14 @@ export default class PriceListenerWorker { .catch(console.error) } break; + case 'lt_price': + if (Number(price) <= alert.price.value) { + this.evictFromListener(alert); + this.triggerAlert(alert, price) + .then(() => console.log(`Alert ${alert._id} triggered.`)) + .catch(console.error) + } + break; default: console.log(`Unhandled alert type: ${alert.type} for id ${alert._id}`); }