Skip to content

Commit

Permalink
Added HBase, mongo, and Phoenix bolts. Defined a topology.properties …
Browse files Browse the repository at this point in the history
…file for dynamically building topologies from lists of components at runtime
  • Loading branch information
randerzander committed Dec 9, 2014
1 parent 02a55d2 commit fcc634b
Show file tree
Hide file tree
Showing 18 changed files with 1,305 additions and 296 deletions.
226 changes: 0 additions & 226 deletions multilang/resources/backup.py

This file was deleted.

15 changes: 0 additions & 15 deletions multilang/resources/example.py

This file was deleted.

28 changes: 28 additions & 0 deletions multilang/resources/mtr-parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/python

import storm, sys, json, datetime

class ExampleBolt(storm.BasicBolt):
#def initialize(self, stormconf, context):
# Can define init steps here as needed

def quote(text): return '\"' + text + '\"'

def process(self, tup):
report = json.loads(tup.values[0])
src_ip = report['Client']['IP']
target_ip = report['Command'][5]
#local_timestamp = str(datetime.datetime.fromtimestamp(int(report['StartTime'])))
local_time = int(report['StartTime'])*1000
# Parse output by newlines, get last line
last_line = report['Output'].split('\x00')[-2]
if '???' in last_line: return # Don't report on an incomplete route

tokens = last_line.split()
loss = tokens[2].replace('%', '')
stddev = tokens[-1]
avg = tokens[5]

storm.emit([quote(src_ip), quote(target_ip), local_time, loss, avg, stddev])

ExampleBolt().run()
25 changes: 0 additions & 25 deletions multilang/resources/splitsentence.py

This file was deleted.

Binary file modified multilang/resources/storm.pyc
Binary file not shown.
22 changes: 22 additions & 0 deletions multilang/resources/test/mtr-parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/python

import sys, json, datetime

def quote(text): return '\"' + text + '\"'

for line in sys.stdin:
report = json.loads(line)
src_ip = report['Client']['IP']
target_ip = report['Command'][5]
#local_timestamp = str(datetime.datetime.fromtimestamp(int(report['StartTime'])))
local_timestamp = int(report['StartTime'])*1000
# Parse output by newlines, get last line
last_line = report['Output'].split('\x00')[-2]
if '???' in last_line: continue # Don't report on an incomplete route

tokens = last_line.split()
loss = tokens[2].replace('%', '')
stddev = tokens[-1]
avg = tokens[5]

print([quote(src_ip), quote(target_ip), local_timestamp, loss, avg, stddev])
12 changes: 12 additions & 0 deletions phoenix_ddl/mtr.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
drop table if exists mtr;

create table mtr(
source_ip varchar,
target_ip varchar,
local_time time,
loss decimal,
avg_latency decimal,
stddev_latency decimal
constraint my_pk primary key (source_ip, target_ip, local_time)
) ttl=432000
;
1 change: 1 addition & 0 deletions phoenix_queries/empty_mtr.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
delete from mtr;
1 change: 1 addition & 0 deletions phoenix_queries/route_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select route_id, avg(loss), avg(avg_latency), avg(stddev_latency), count(*) from mtr group by route_id;
5 changes: 5 additions & 0 deletions phoenix_queries/route_stats_test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
upsert into mtr values ('192.168.1.2|192.168.1.1', current_time(), '192.168.1.1', '192.168.1.2', 0, 23.4, .4);
upsert into mtr values ('192.168.1.2|192.168.1.1', current_time(), '192.168.1.1', '192.168.1.2', 0, 25, .5);
upsert into mtr values ('192.168.1.2|192.168.1.1', current_time(), '192.168.1.1', '192.168.1.2', 0, 24, .2);

select route_id, avg(loss), avg(avg_latency), avg(stddev_latency), count(*) from mtr group by route_id;
11 changes: 10 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,21 @@
<artifactId>storm-kafka</artifactId>
<version>0.9.1.2.1.5.0-695</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>0.9.1.2.1.5.0-695</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>0.9.3.2.2.0.0-2041</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>2.12.4</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main</sourceDirectory>
Expand Down
Loading

0 comments on commit fcc634b

Please sign in to comment.