Feature/bitcoin demo #2

Open · wants to merge 56 commits into base: master

Commits (56)
aa63ffd
first rough draft at combining orders
emaxerrno Aug 21, 2015
3fcfaaa
First pass
emaxerrno Aug 25, 2015
27f6521
Adding front end build system for price demo
emaxerrno Aug 31, 2015
accda76
Simpler react-flux no redux, more docs, bigger community
emaxerrno Sep 1, 2015
4349ab1
Adding simple project structure
emaxerrno Sep 1, 2015
02e665a
Adding d3 as a dependency
emaxerrno Sep 1, 2015
53b1fde
Going with a simpler setup for play
emaxerrno Sep 2, 2015
7a97a63
Working demo with iteratees and websockets
emaxerrno Sep 2, 2015
ea6998f
Working websocket for index.html tests
emaxerrno Sep 2, 2015
70b7a22
Work in progress for kafka consumer
emaxerrno Sep 2, 2015
5a11bab
Adding a kafka consumer
emaxerrno Sep 3, 2015
1c897c3
Cleaned up the application.scala into subpackages
emaxerrno Sep 3, 2015
bb60235
Adding logger with log4back
emaxerrno Sep 3, 2015
13f5883
Updating log4j config
emaxerrno Sep 3, 2015
1fbe903
Adding css for stockcharts
emaxerrno Sep 3, 2015
8ce6de4
Adding first jschart attempt
emaxerrno Sep 3, 2015
a737b5d
Hacked up the UI to work with stock charts.
emaxerrno Sep 3, 2015
6c8bac8
Working demo, but UI is broken, the charting engine blows.
emaxerrno Sep 4, 2015
9ef7e9d
tmp changes. Staging half of the work
emaxerrno Sep 9, 2015
c599cf1
Initial prototype for porting over goog charts
emaxerrno Sep 9, 2015
4bb90c2
Working bar chart example
emaxerrno Sep 10, 2015
e2b67d0
Line chart working.
emaxerrno Sep 10, 2015
ec1d6bd
Adding font awesome to the dashboard
emaxerrno Sep 11, 2015
b1f54f7
Adding proper arrows for up / down change
emaxerrno Sep 11, 2015
0704ddd
Adding Price & moving avg for matched orders of bitcoin
emaxerrno Sep 14, 2015
989a829
Adding kafka
emaxerrno Sep 14, 2015
e53e2d0
reordering
emaxerrno Sep 15, 2015
05fc23b
Adding moving average operators and code.
emaxerrno Sep 15, 2015
2c031cd
Reworked deployment files for a larger deployment
emaxerrno Sep 16, 2015
b340d67
Increasing the python version
emaxerrno Sep 16, 2015
7551b6a
remove verbose output
emaxerrno Sep 16, 2015
44ca4fb
Testing the logging from the framework
emaxerrno Sep 16, 2015
6ed9532
renaming json files
emaxerrno Sep 16, 2015
04611c7
Fixed logging
emaxerrno Sep 16, 2015
cbfcaa0
Made an amazing python bash wrapper for runner.bash
emaxerrno Sep 16, 2015
28ae5f9
Fix the order type to be 'match' from bitcoin
emaxerrno Sep 16, 2015
108e612
Return the average per data point
emaxerrno Sep 16, 2015
6ba3163
More logs on proc timer
emaxerrno Sep 16, 2015
ce6dd15
Print if match order found
emaxerrno Sep 16, 2015
f96cbf8
Adding 1000 to the time.
emaxerrno Sep 17, 2015
d88fdff
Update for logging.critical fixes
emaxerrno Sep 17, 2015
d8af24a
Fixing parsing from json
emaxerrno Sep 17, 2015
e09bb4f
fixed the moving average to be closer to the real value
emaxerrno Sep 17, 2015
d6af115
Logging properly
emaxerrno Sep 17, 2015
366dd95
Fixing logs for moving avg
emaxerrno Sep 17, 2015
0e973b9
Added deployment file for latest price
emaxerrno Sep 17, 2015
f25890d
Fixed the backend; front end and middle tier now all hooked up
emaxerrno Sep 17, 2015
210ff51
Emitting latest price when it actually changes
emaxerrno Sep 17, 2015
994498c
Adding a comment on why we need 0' millis in emitter
emaxerrno Sep 17, 2015
12bc177
Add a comment on timer agg for moving avg
emaxerrno Sep 17, 2015
b896744
Adding consumer stop for kafka queue
emaxerrno Sep 18, 2015
d77f0f7
Adding a kafka_utils for unifying init of clients
emaxerrno Sep 18, 2015
22b5c27
Fixing latest price that seems to be enqueueing finally
emaxerrno Sep 18, 2015
16663d9
tmp work with the pykafka driver
emaxerrno Sep 18, 2015
758a405
Using concord-py==0.2.21 proper server stop
emaxerrno Sep 18, 2015
dd5eb69
Added an iterator for the multitopic consumer
emaxerrno Sep 18, 2015
26 changes: 0 additions & 26 deletions CoinbasePricePrinter.py

This file was deleted.

57 changes: 57 additions & 0 deletions concord/src/CoinbaseMatchedOrderMovingAverage.py
@@ -0,0 +1,57 @@
import json
import sys
import time
import concord
from collections import deque
from concord.computation import (Computation, Metadata, serve_computation,
                                 StreamGrouping)
from models.CoinbaseOrder import CoinbaseOrder
from utils.time_utils import (time_millis, bottom_of_current_second,
                              nseconds_from_now_in_millis)
from utils.kafka_utils import local_kafka_producer


# TODO(agallego): use pandas / numpy
# stores 1k numbers as the avg. simpler, but def not correct
class MovingAvg(deque):
    def __init__(self, size=1000):
        super(MovingAvg, self).__init__(maxlen=size)

    @property
    def average(self):
        if len(self) <= 0: return 0
        return sum(self) / len(self)


class CoinbaseMatchedOrderMovingAverage(Computation):
    def __init__(self):
        self.moving_average = MovingAvg()
        self.producer = local_kafka_producer('match-avg')

    def init(self, ctx):
        self.concord_logger.info("Matched order moving avg init")

    def process_record(self, ctx, record):
        order = CoinbaseOrder(record.data)
        if order.type == 'match':
            self.concord_logger.info("Found matched order at price: %s",
                                     str(order.price))
            # it has to include 0's in the millisecond and microsecond
            # parts of the time to be reasonable for updates to kafka
            sec = (bottom_of_current_second() * 1000) + 1000
            self.moving_average.append(order.price)
            ctx.set_timer(str(sec), sec)

    def process_timer(self, ctx, key, time):
        d = {
            'time': int(key),
            'avg': self.moving_average.average
        }
        self.producer.produce(json.dumps(d))
        self.concord_logger.info("Saving to kafka for time: %s, avg price: %s",
                                 key, str(self.moving_average.average))

    def metadata(self):
        return Metadata(name='match-orders-sec-avg',
                        istreams=[('btcusd', StreamGrouping.GROUP_BY)])


serve_computation(CoinbaseMatchedOrderMovingAverage())
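The windowing trick in `MovingAvg` is easy to verify in isolation: a `deque` constructed with `maxlen` silently evicts the oldest sample on `append`, so the "moving" average only ever covers the last `size` prices. A minimal stand-alone sketch with no concord dependencies:

```python
from collections import deque

class MovingAvg(deque):
    def __init__(self, size=1000):
        # maxlen makes the deque a fixed-size ring: appending past
        # capacity drops the oldest element automatically
        super(MovingAvg, self).__init__(maxlen=size)

    @property
    def average(self):
        if len(self) == 0:
            return 0
        return sum(self) / len(self)

window = MovingAvg(size=3)
for price in [10.0, 20.0, 30.0, 40.0]:
    window.append(price)

# only 20.0, 30.0, 40.0 survive the eviction
print(window.average)  # -> 30.0
```

Note the caveat in the file's own TODO: averaging the last 1000 matched prices is a count-based window, not a time-based one, so a burst of trades can flush the window in well under a second.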
104 changes: 104 additions & 0 deletions concord/src/CoinbasePricePrinter.py
@@ -0,0 +1,104 @@
import json
import sys
import time
import unicodedata
import logging
import concord
from concord.computation import (Computation, Metadata, serve_computation)
from models.CoinbaseOrder import CoinbaseOrder
import cachetools

log = logging.getLogger(__name__)

def time_millis(): return int(round(time.time() * 1000))
def next_second(sec=1): return time_millis() + (sec * 1000)


class CoinbasePricePrinter(Computation):
    def __init__(self):
        from cachetools import TTLCache

        def missing_order(order_type):
            # CoinbaseOrder expects a json string, not a dict
            return CoinbaseOrder(
                json.dumps({'time': '1970-01-01T00:00:00Z',
                            'sequence': 0,
                            'price': 0,
                            'type': order_type}))

        self.order_cache = TTLCache(10, 300, missing=missing_order)
        self.volume_cache = TTLCache(10, 300, missing=lambda x: 0)

    def update_volume(self, coinbase_order, context):
        current = self.volume_cache[coinbase_order.time]
        self.volume_cache[coinbase_order.time] = (
            current + (coinbase_order.price * coinbase_order.volume))
        context.set_state(CoinbasePricePrinter.volume_key(coinbase_order.time),
                          CoinbaseOrder.to_json(
                              self.volume_cache[coinbase_order.time]))

    # TODO(agallego) map(filter(test, cache), cache)
    def update_orders(self, coinbase_order, context):
        if not coinbase_order.valid(): return
        current = self.order_cache[coinbase_order.type]
        if not current.valid():
            self.order_cache[coinbase_order.type] = coinbase_order
        elif CoinbaseOrder.is_combinable(coinbase_order, current):
            self.order_cache[coinbase_order.type] = CoinbaseOrder.combine(
                current, coinbase_order)
        else:
            raise Exception("Couldn't combine with any orders", coinbase_order)

        context.set_state(CoinbasePricePrinter.durable_key(coinbase_order),
                          CoinbaseOrder.to_json(
                              self.order_cache[coinbase_order.type]))

    def init(self, ctx):
        log.info("Price Printer initialized")
        ctx.set_timer("loop", next_second())

    def process_record(self, ctx, record):
        try:
            c = CoinbaseOrder(record.data)
            self.update_volume(c, ctx)
            self.update_orders(c, ctx)
        except Exception as e:
            log.exception(e)

    def process_timer(self, ctx, key, time):
        # the TTL caches fall back to their `missing` defaults when a
        # bucket has seen nothing this second
        ctx.produce_record('btcusd-volume', self.volume_key(time),
                           CoinbaseOrder.to_json(self.volume_cache[time]))
        ctx.produce_record('btcusd-buy', self.buy_key(time),
                           CoinbaseOrder.to_json(self.order_cache['buy']))
        ctx.produce_record('btcusd-sell', self.sell_key(time),
                           CoinbaseOrder.to_json(self.order_cache['sell']))
        ctx.set_timer(key, next_second())

    def metadata(self):
        return Metadata(name='coinbase-price-printer',
                        istreams=['btcusd'], ostreams=[])

    @staticmethod
    def durable_key(coinbase_order):
        key_map = {
            'sell': CoinbasePricePrinter.sell_key(coinbase_order.time),
            'buy': CoinbasePricePrinter.buy_key(coinbase_order.time),
        }
        return key_map[coinbase_order.type]

    @staticmethod
    def to_bottom_of_second(time_stamp_millis):
        return time_stamp_millis - (time_stamp_millis % 1000)

    @staticmethod
    def volume_key(time_stamp):
        return 'volume-' + str(
            CoinbasePricePrinter.to_bottom_of_second(time_stamp))

    @staticmethod
    def sell_key(time_stamp):
        return 'sell-' + str(
            CoinbasePricePrinter.to_bottom_of_second(time_stamp))

    @staticmethod
    def buy_key(time_stamp):
        return 'buy-' + str(
            CoinbasePricePrinter.to_bottom_of_second(time_stamp))


serve_computation(CoinbasePricePrinter())
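The static key helpers in this file all funnel through `to_bottom_of_second`: truncating a millisecond timestamp to the start of its second means every order observed within the same wall-clock second maps to the same durable key. A quick dependency-free check (the sample timestamps are made up):

```python
def to_bottom_of_second(time_stamp_millis):
    # drop the sub-second remainder so keys bucket per second
    return time_stamp_millis - (time_stamp_millis % 1000)

def volume_key(time_stamp):
    return 'volume-' + str(to_bottom_of_second(time_stamp))

# two events 876ms apart inside the same second share one bucket
assert to_bottom_of_second(1442500000123) == 1442500000000
assert volume_key(1442500000123) == volume_key(1442500000999)
print(volume_key(1442500000123))  # -> volume-1442500000000
```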
2 changes: 1 addition & 1 deletion CoinbaseSource.py → concord/src/CoinbaseSource.py
@@ -41,7 +41,7 @@ def __init__(self):
         self.queue = Queue()

     def init(self, ctx):
         ctx.set_timer('loop', time_millis() + 1000)  # start in 1 sec
-        log.info("Coinbase initialized")
+        self.concord_logger.info("Coinbase initialized")

     def process_timer(self, ctx, key, time):
         while not self.queue.empty():
             ctx.produce_record('btcusd', 'empty', self.queue.get())
31 changes: 31 additions & 0 deletions concord/src/LatestMatchPrice.py
@@ -0,0 +1,31 @@
import json
import sys
import time
import concord
from concord.computation import (Computation, Metadata, serve_computation,
                                 StreamGrouping)
from models.CoinbaseOrder import CoinbaseOrder
from utils.time_utils import (time_millis, bottom_of_current_second,
                              nseconds_from_now_in_millis)
from utils.kafka_utils import local_kafka_producer


class LatestMatchPrice(Computation):
    def __init__(self):
        self.producer = local_kafka_producer('latest-match-price')

    def init(self, ctx):
        self.concord_logger.info("Latest price init")

    def process_record(self, ctx, record):
        order = CoinbaseOrder(record.data)
        if order.type == 'match':
            self.concord_logger.info("Emitting latest price: %s",
                                     str(order.price))
            d = {'time': time_millis(), 'price': order.price}
            self.producer.produce(json.dumps(d))

    def metadata(self):
        return Metadata(name='latest-price',
                        istreams=[('btcusd', StreamGrouping.GROUP_BY)])


serve_computation(LatestMatchPrice())
Empty file added concord/src/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions concord/src/coinbase_latest_price.json
@@ -0,0 +1,11 @@
{
    "zookeeper_hosts": "localhost:2181",
    "zookeeper_path": "/concord",
    "executable_arguments": ["LatestMatchPrice.py"],
    "executable_name": "runner.bash",
    "compress_files": ["runner.bash", "."],
    "exclude_compress_files": [".*\\.pyc$", ".*\\.log$", ".*\\.json"],
    "computation_name": "latest-price",
    "mem": 256,
    "cpus": 0.5
}
11 changes: 11 additions & 0 deletions concord/src/coinbase_moving_avg.json
@@ -0,0 +1,11 @@
{
    "zookeeper_hosts": "localhost:2181",
    "zookeeper_path": "/concord",
    "executable_arguments": ["CoinbaseMatchedOrderMovingAverage.py"],
    "executable_name": "runner.bash",
    "compress_files": ["runner.bash", "."],
    "exclude_compress_files": [".*\\.pyc$", ".*\\.log$", ".*\\.json"],
    "computation_name": "match-orders-sec-avg",
    "mem": 1024,
    "cpus": 1.0
}
File renamed without changes.
5 changes: 3 additions & 2 deletions coinbase_source.json → concord/src/coinbase_source.json
@@ -1,9 +1,10 @@
 {
     "zookeeper_hosts": "localhost:2181",
-    "zookeeper_path": "/bolt",
+    "zookeeper_path": "/concord",
     "executable_arguments": ["CoinbaseSource.py"],
     "executable_name": "runner.bash",
-    "compress_files": ["CoinbaseSource.py", "runner.bash", "requirements.txt"],
+    "compress_files": ["runner.bash", "."],
+    "exclude_compress_files": [".*\\.pyc$", ".*\\.log$", ".*\\.json"],
     "computation_name": "coinbase-indx",
     "mem": 1024,
     "cpus": 1.0
11 changes: 11 additions & 0 deletions concord/src/models/Avg.py
@@ -0,0 +1,11 @@
class Avg:
    def __init__(self):
        self.array = []
        self.count = 0.0

    def avg(self):
        if self.count == 0:
            return 0
        return float(sum(self.array)) / self.count

    def append(self, point):
        self.array.append(point)
        self.count = self.count + 1
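Unlike the bounded `MovingAvg` deque elsewhere in this PR, `Avg` is a cumulative all-time average: every point is retained, so memory grows with the stream. A quick stand-alone sanity check of the model:

```python
class Avg:
    def __init__(self):
        self.array = []      # every point ever appended is kept
        self.count = 0.0     # float so the division never truncates

    def avg(self):
        if self.count == 0:
            return 0
        return float(sum(self.array)) / self.count

    def append(self, point):
        self.array.append(point)
        self.count = self.count + 1

a = Avg()
for p in [1, 2, 3]:
    a.append(p)
print(a.avg())  # -> 2.0
```

The same result could be kept in O(1) memory by storing only a running sum and count; the list is presumably retained here for debuggability.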
49 changes: 49 additions & 0 deletions concord/src/models/CoinbaseOrder.py
@@ -0,0 +1,49 @@
# coding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import time
import json
import dateutil


class CoinbaseOrder:
    def __init__(self, json_data):
        r = json.loads(json_data)
        for key in r: setattr(self, key, r[key])
        from dateutil.parser import parse
        self.time = parse(self.time)
        self.sequence = int(self.sequence)
        if self.type in ['match', 'open', 'close']:
            self.price = float(self.price)

    def valid(self):
        return self.sequence > 0

    @staticmethod
    def to_json(coinbase_order):
        return json.dumps(coinbase_order,
                          default=lambda o: o.__dict__,
                          sort_keys=True)

    @staticmethod
    def from_json(byte_array):
        # the constructor already does the json.loads
        return CoinbaseOrder(byte_array)

    @staticmethod
    def combine(lhs_coinbase, rhs_coinbase):
        if not CoinbaseOrder.is_combinable(lhs_coinbase, rhs_coinbase):
            raise Exception("Cannot combine orders")
        ret = lhs_coinbase
        ret.price += rhs_coinbase.price
        return ret

    @staticmethod
    def is_combinable(lhs_coinbase, rhs_coinbase):
        try:
            return (lhs_coinbase.type == rhs_coinbase.type and
                    lhs_coinbase.side == rhs_coinbase.side)
        except AttributeError:
            return False
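The parsing pattern in `CoinbaseOrder.__init__` (decode json, `setattr` every field, then coerce `time`/`sequence`/`price`) can be sketched stand-alone. This is a pared-down version: the sample payload is hypothetical (it merely mirrors the shape of a Coinbase websocket 'match' message), and it uses stdlib `strptime` in place of dateutil so the sketch has no third-party dependencies:

```python
import json
from datetime import datetime

class Order:
    def __init__(self, json_data):
        r = json.loads(json_data)
        for key in r:
            setattr(self, key, r[key])
        # stdlib stand-in for dateutil.parser.parse
        self.time = datetime.strptime(self.time, '%Y-%m-%dT%H:%M:%SZ')
        self.sequence = int(self.sequence)
        if self.type in ['match', 'open', 'close']:
            self.price = float(self.price)

    def valid(self):
        return self.sequence > 0

# hypothetical wire payload; fields arrive as strings and get coerced
raw = json.dumps({'time': '2015-09-17T12:00:00Z', 'sequence': '42',
                  'price': '230.51', 'type': 'match', 'side': 'buy'})
o = Order(raw)
assert o.valid() and o.price == 230.51
```

One design wrinkle worth noting: because `setattr` copies whatever keys the feed sends, any message missing `side` makes `is_combinable` raise, which is why that method guards with a try/except.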
Empty file added concord/src/models/__init__.py
Empty file.
6 changes: 5 additions & 1 deletion requirements.txt → concord/src/requirements.txt
@@ -1,7 +1,8 @@
autobahn==0.10.5.post2
backports.ssl-match-hostname==3.4.0.2
cachetools==1.0.3
cffi==1.1.2
concord-py==0.2.5
concord-py==0.2.21
cryptography==1.0
enum34==1.0.4
graphviz==0.4.3
@@ -10,8 +11,11 @@
ipaddress==1.0.14
kazoo==2.0
pyasn1==0.1.8
pycparser==2.14
pykafka==1.1.1
pyOpenSSL==0.15.1
python-dateutil==2.4.2
six==1.9.0
tabulate==0.7.5
thrift==0.9.2
trace2html==0.2.1
Twisted==15.3.0
39 changes: 39 additions & 0 deletions concord/src/runner.bash
@@ -0,0 +1,39 @@
#!/bin/bash
# set -x

SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
    exec_dir="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
    SOURCE="$(readlink "$SOURCE")"
    # if $SOURCE was a relative symlink, we need to resolve it
    # relative to the path where the symlink file was located
    [[ $SOURCE != /* ]] && SOURCE="$exec_dir/$SOURCE"
done
exec_dir="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
original=${PWD}
requirements=$(find "$exec_dir" -name requirements.txt)

echo "Mesos directory: ${PWD}"
echo "Exec directory: ${exec_dir}"

if [[ -f $requirements ]]; then
    # need to overcome pip 128 chars path - software... :'(
    work_dir=$(mktemp -d -p "$original")
    symlink_dir=$(mktemp -d)
    ln -s "$work_dir" "$symlink_dir/concord"
    dir=$symlink_dir/concord
    cd "$dir"
    echo "Installing venv in $dir"
    virtualenv "$dir/env"
    "$dir/env/bin/pip" install -r "$requirements"
    cd "$exec_dir"
    exec "$dir/env/bin/python" "$original/$@"
else
    exec python "$original/$@"
fi

# only reached if exec itself failed to replace the shell
rc=$?
if [[ $rc != 0 ]]; then
    echo "Client exited with: $rc"
    exit $rc
fi