-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcronGetTrends.js
163 lines (154 loc) · 4.89 KB
/
cronGetTrends.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import cron from 'cron';
import twitterClient from './clients/twitterClient';
import util from 'util';
import request from 'request';
import serverConfig from './config/ServerConfig';
import { logger } from './service/LoggerService';
import RedisClient from './clients/RedisClient';
const CronJob = cron.CronJob;
const server_uri =
'http://' +
serverConfig.hostname +
':' +
serverConfig.port +
serverConfig.endpoint;
const locationType = {
1: 'Country',
2: 'City'
};
// Initialize queue with trending locations on server start
let queue = [];
// Run this Cron Job every 15 minutes to get trending locations and update trends when the queue is empty
const getTrendingLocationsJob = new CronJob('0 */15 * * * *', () => {
if (queue.length == 0) {
getCountryWoeids.then(result => {
logger.info(
'Getting trending hashtags country and city wise',
result.length
);
queue = Array.from(result);
});
}
});
// Rate limiting : Job to run every 15 minutes making requests to trends api
const getTrendsJob = new CronJob('0 */15 * * * *', () => {
let trendingLocations = queue.splice(0, 70);
if (trendingLocations.length) {
getTrendsByCountry(trendingLocations);
logger.info(
'Getting trends for cities and countries. Total number of locations: ' +
queue.length +
'. Getting trends for ' +
trendingLocations.length +
'locations.'
);
}
});
const getGlobalTrendsJob = new CronJob('0 */15 * * * *', () => {
twitterClient.get(
'trends/place.json',
{ id: 1 },
(error, trends, response) => {
if (error) {
logger.error('Error while getting global trends: ', error);
} else {
const globalTrends = getTrendingHashTag(trends[0].trends, 5, 1);
const value = JSON.stringify(globalTrends);
RedisClient.client.set('global-trend', value, (err, res) => {
if (err) {
logger.error(
'Failed to update redis cache with global trends: ' + err
);
}
});
}
}
);
});
// Returns object with this format [{ country: 'United States', woeid: 23424977, countryCode: 'US' }]
const getCountryWoeids = new Promise((resolve, reject) => {
twitterClient.get('trends/available', (error, availableWoeids, response) => {
if (error) {
logger.error('Error in getting available trends: ' + error);
reject(error);
}
const availableCountries = availableWoeids.map(loc => {
const newObj = {
country: loc.country,
woeid: loc.parentid,
countryCode: loc.countryCode,
locationType: locationType[1] // Country
};
return newObj;
});
// Remove first element as that is global trends
availableCountries.splice(0, 1);
const uniqueWoeids = {};
const distinctCountries = [];
availableCountries.forEach(element => {
if (!uniqueWoeids[element.woeid]) {
distinctCountries.push(element);
uniqueWoeids[element.woeid] = true;
}
});
availableWoeids = availableWoeids.filter(
item => item.placeType.name == 'Town'
);
const availableCities = availableWoeids.map(loc => {
const newObj = {
country: loc.country,
woeid: loc.woeid,
city: loc.name,
countryCode: loc.countryCode,
locationType: locationType[2] // City
};
return newObj;
});
let places = [];
places = distinctCountries.concat(availableCities);
resolve(places);
});
});
const getTrendsByCountry = countryWoeids => {
Promise.all(
countryWoeids.map(item =>
twitterClient
.get('trends/place.json', { id: item.woeid })
.then(result => getTrendingHashTag(result[0].trends, 5, item))
.catch(error =>
logger.error('Error in getting trending hashtag from Twitter' + error)
)
)
).then(result => {
// POST the data to sentiment analyser
const options = {
uri: server_uri,
json: { trends: result },
method: 'POST'
};
request(options, (err, res, body) => {
if (err) {
logger.error('Error in posting data to Sentiment Analyser: ' + err);
}
logger.info('Successfully posted data to Sentiment Analyser');
});
});
};
const getTrendingHashTag = (trends, numOfTrends, countryWoeid) => {
let filteredTrends = trends.filter(trend => trend.tweet_volume != null);
filteredTrends.sort((a, b) => b.tweet_volume - a.tweet_volume);
filteredTrends = filteredTrends.slice(0, numOfTrends);
filteredTrends = filteredTrends.map(item => {
const newObj = {
name: item.name,
tweetVolume: item.tweet_volume,
rank: null,
url: item.url
};
return newObj;
});
filteredTrends.forEach((item, index) => (item['rank'] = index + 1));
const mergedObj = { ...countryWoeid, twitterTrendInfo: filteredTrends };
return mergedObj;
};
module.exports = { getTrendsJob, getTrendingLocationsJob, getGlobalTrendsJob };