Skip to content

Commit

Permalink
Expose "host" in connect()
Browse files Browse the repository at this point in the history
Don't "null" terminate connecting string
  • Loading branch information
lbilli committed May 9, 2019
1 parent d26ad20 commit a359b44
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Jib"
uuid = "f310f2d2-a263-11e8-3998-47bd686f18f7"
authors = ["Luca Billi <[email protected]>"]
version = "0.5.1"
version = "0.6.0"

[deps]
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Expand Down
7 changes: 6 additions & 1 deletion src/client.jl
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
module Client

using Sockets
using Sockets: TCPSocket
using TimeZones

include("core.jl")
include("versions.jl")

"""
Connection()
Hold a connection to IB TWS or IBGateway.
"""
struct Connection
socket::TCPSocket
id::Int
Expand Down
47 changes: 33 additions & 14 deletions src/connect.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,64 @@ using Sockets

using ..Client: Core, Connection, Version
using ..Reader: read_msg
using ..Requests: Encoder.Enc, startApi
using ..Requests: startApi

import ..from_ibtime

function connect(port::Int, clientId::Int, connectOptions::String="", optionalCapabilities::String="")
"""
connect([host, ]port, clientId, connectOptions="", optionalCapabilities="")
Connect to host `host` on port `port` and set client ID `clientId`.
Return a [`Connection`](@ref) instance.
"""
function connect(host, port::Int, clientId::Int, connectOptions::String="", optionalCapabilities::String="")

s = Sockets.connect(port)
s = Sockets.connect(host, port)

@assert isopen(s)

o = Enc()
# Init string
m, M = Int(typemin(Version)), Int(typemax(Version))

o(init_string(connectOptions))
buf = IOBuffer()
print(buf, m==M ? "v$m" : "v$m..$M")
isempty(connectOptions) || print(buf, " ", connectOptions)

Core.write_one(s, o.buf, true)
# Handshake
Core.write_one(s, buf, true)

res = read_msg(s)

@assert length(res) == 2

@info "Connected" ver=res[1] t=res[2]
@info "Connected" V=res[1] T=res[2]

v = parse(Int, res[1])
m v M || error("Unsupported version.")

ib = Connection(s, clientId, connectOptions, Version(parse(Int, res[1])), from_ibtime(res[2]))
ib = Connection(s, clientId, connectOptions, Version(v), from_ibtime(res[2]))

startApi(ib, clientId, optionalCapabilities)

ib
end

disconnect(ib::Connection) = close(ib.socket)
function connect(port::Int, clientId::Int, connectOptions::String="", optionalCapabilities::String="")

localip = getalladdrinfo("localhost")

function init_string(options::String)
@assert !isempty(localip)

m, M = Int(typemin(Version)), Int(typemax(Version))
connect(localip[1], port, clientId, connectOptions, optionalCapabilities)
end

res = m == M ? "v$m" : "v$m..$M"

isempty(options) ? res : "$res $options"
end
"""
disconnect(ib)
Close the socket connection.
"""
disconnect(ib::Connection) = close(ib.socket)

end
5 changes: 2 additions & 3 deletions src/core.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ const API_SIGN = "API\0"
const HEADTYPE = UInt32 # sizeof(HEADTYPE) == 4 bytes
const MAX_LEN = 0xffffff

using Sockets

isascii(m) = all(x -> x < 0x80, m) # ASCII

function write_one(socket::TCPSocket, buf::IOBuffer, api_sign::Bool=false)
function write_one(socket, buf, api_sign=false)

msg = take!(buf)

Expand All @@ -22,7 +21,7 @@ function write_one(socket::TCPSocket, buf::IOBuffer, api_sign::Bool=false)
end


function read_one(socket::TCPSocket)
function read_one(socket)

len = ntoh(read(socket, HEADTYPE))

Expand Down
2 changes: 1 addition & 1 deletion src/decoder.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ function decode(msg, w, ver)
# The first field is the message ID
id::Int = pop(it)

# The second field (version) is unused for id < 75 and != 3, 5, 11, 17
# The second field (version) is ignored for id < 75 and != 3, 5, 11, 17
if id < 75 && id [3, 5, 11, 17] ||
id == 3 && ver < Client.MARKET_CAP_PRICE ||
id == 5 && ver < Client.ORDER_CONTAINER ||
Expand Down
1 change: 1 addition & 0 deletions src/encoder.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct Enc
end
Enc() = Enc(IOBuffer(sizehint=64))


"""
(e::Enc)(::T)
Expand Down
7 changes: 3 additions & 4 deletions src/field.jl
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""
Wrap inbound fields and define conversions to
String, Int, Float64, Bool and Enums
Field()
Wrap inbound fields and define conversions to
`String`, `Int`, `Float64`, `Bool` and `Enums`.
"""
struct Field{T<:AbstractString}
value::T
Expand Down
69 changes: 35 additions & 34 deletions src/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import ...ComboLeg,


"""
Some utility functions
slurp(::Type{T}, it)
Utility functions to read from an iterator `it` and convert to predefined types `T`.
"""
slurp(::Type{T}, it) where {T<:Union{Bool,Int,Enum{Int32},Float64,String}} = convert(T, pop(it))

Expand All @@ -35,10 +36,11 @@ slurp!(x::T, idx, it) where {T} = setfield!.(Ref(x), idx, slurp(fieldtype.(T, id


"""
Convert a String[] into a NamedTuple() like this:
tagvalue2nt(x)
["tag1", "value1", "tag2", "value2", ...] -> (tag1="value1", tag2="value2", ...)
Convert a `String[]` into a `NamedTuple()` like this:
["tag1", "value1", "tag2", "value2", ...] -> (tag1="value1", tag2="value2", ...)
"""
function tagvalue2nt(x)

Expand All @@ -65,21 +67,20 @@ end

function fill_df(eltypes, names, n, it)

dt = DataFrame(eltypes, names, n)
df = DataFrame(eltypes, names, n)

nr, nc = size(dt)

for r 1:nr, c 1:nc
dt[r, c] = pop(it)
for r 1:nrow(df), c 1:ncol(df)
df[r, c] = pop(it)
end

dt
df
end


"""
Collection of parsers indexed by message ID
process::Dict{Int,Function}
Collection of parsers indexed by message ID
"""
const process = Dict{Int,Function}(

Expand Down Expand Up @@ -392,7 +393,7 @@ const process = Dict{Int,Function}(

n::Int = pop(it)

dt = if ver < Client.SYNT_REALTIME_BARS
df = if ver < Client.SYNT_REALTIME_BARS

tmp = fill_df([String,Float64,Float64,Float64,Float64,Int,Float64,String,Int],
[:time, :open, :high, :low, :close, :volume, :wap, :hasGaps, :count],
Expand All @@ -409,7 +410,7 @@ const process = Dict{Int,Function}(
n, it)
end

w.historicalData(reqId, dt)
w.historicalData(reqId, df)
end,

# BOND_CONTRACT_DATA
Expand Down Expand Up @@ -717,7 +718,7 @@ const process = Dict{Int,Function}(

n::Int = pop(it)

dt = if ver Client.SERVICE_DATA_TYPE
df = if ver Client.SERVICE_DATA_TYPE

fill_df([String,String,String,String,Union{Int,Nothing}], [:exchange, :secType, :listingExch, :serviceDataType, :aggGroup], n, it)

Expand All @@ -732,7 +733,7 @@ const process = Dict{Int,Function}(
tmp
end

w.mktDepthExchanges(dt)
w.mktDepthExchanges(df)
end,

# TICK_REQ_PARAMS
Expand All @@ -744,9 +745,9 @@ const process = Dict{Int,Function}(
reqId::Int,
n::Int = it

dt = fill_df([Int,String,String], [:bit, :exchange, :exchangeLetter], n, it)
df = fill_df([Int,String,String], [:bit, :exchange, :exchangeLetter], n, it)

w.smartComponents(reqId, dt)
w.smartComponents(reqId, df)
end,

# NEWS_ARTICLE
Expand All @@ -766,9 +767,9 @@ const process = Dict{Int,Function}(

n::Int = pop(it)

dt = fill_df([String,String], [:providerCode, :providerName], n, it)
df = fill_df([String,String], [:providerCode, :providerName], n, it)

w.newsProviders(dt)
w.newsProviders(df)
end,

# HISTORICAL_NEWS
Expand All @@ -786,9 +787,9 @@ const process = Dict{Int,Function}(
reqId::Int,
n::Int = it

dt = fill_df([Float64,Int], [:price, :size], n, it)
df = fill_df([Float64,Int], [:price, :size], n, it)

w.histogramData(reqId, dt)
w.histogramData(reqId, df)
end,

# HISTORICAL_DATA_UPDATE
Expand Down Expand Up @@ -820,9 +821,9 @@ const process = Dict{Int,Function}(
marketRuleId::Int,
n::Int = it

dt = fill_df([Float64,Float64], [:lowEdge, :increment], n, it)
df = fill_df([Float64,Float64], [:lowEdge, :increment], n, it)

w.marketRule(marketRuleId, dt)
w.marketRule(marketRuleId, df)
end,

# PNL
Expand Down Expand Up @@ -865,15 +866,15 @@ const process = Dict{Int,Function}(
reqId::Int,
n::Int = it

dt = fill_df([Int,Int,Float64,Int], [:time, :ignore, :price, :size], n, it)
df = fill_df([Int,Int,Float64,Int], [:time, :ignore, :price, :size], n, it)

deletecols!(dt, :ignore)
deletecols!(df, :ignore)

# TODO: Convert dt[:time] to [Zoned]DateTime
# TODO: Convert df[:time] to [Zoned]DateTime

done::Bool = pop(it)

w.historicalTicks(reqId, dt, done)
w.historicalTicks(reqId, df, done)
end,

# HISTORICAL_TICKS_BID_ASK
Expand All @@ -882,14 +883,14 @@ const process = Dict{Int,Function}(
reqId::Int,
n::Int = it

dt = fill_df([Int,Int,Float64,Float64,Int,Int], [:time, :mask, :priceBid, :priceAsk, :sizeBid, :sizeAsk], n, it)
df = fill_df([Int,Int,Float64,Float64,Int,Int], [:time, :mask, :priceBid, :priceAsk, :sizeBid, :sizeAsk], n, it)

# TODO: Convert dt[:time] to [Zoned]DateTime
# TODO: Unmask dt[:mask]
# TODO: Convert df[:time] to [Zoned]DateTime
# TODO: Unmask df[:mask]

done::Bool = pop(it)

w.historicalTicksBidAsk(reqId, dt, done)
w.historicalTicksBidAsk(reqId, df, done)
end,

# HISTORICAL_TICKS_LAST
Expand All @@ -898,14 +899,14 @@ const process = Dict{Int,Function}(
reqId::Int,
n::Int = it

dt = fill_df([Int,Int,Float64,Int,String,String], [:time, :mask, :price, :size, :exchange, :specialConditions], n, it)
df = fill_df([Int,Int,Float64,Int,String,String], [:time, :mask, :price, :size, :exchange, :specialConditions], n, it)

# TODO: Convert dt[:time] to [Zoned]DateTime
# TODO: Unmask dt[:mask]
# TODO: Convert df[:time] to [Zoned]DateTime
# TODO: Unmask df[:mask]

done::Bool = pop(it)

w.historicalTicksLast(reqId, dt, done)
w.historicalTicksLast(reqId, df, done)
end,

# TICK_BY_TICK
Expand Down
6 changes: 2 additions & 4 deletions src/reader.jl
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
module Reader

using Sockets

using ..Client: Connection, Core.read_one

import ..Wrapper

include("decoder.jl")


function read_msg(socket::TCPSocket)
function read_msg(socket)

msg = read_one(socket)

Expand Down Expand Up @@ -44,7 +42,7 @@ Return number of messages processed.
function check_all(ib::Connection, w::Wrapper, flush::Bool=false)

count = 0
while bytesavailable(ib.socket) > 0 || ib.socket.status == Sockets.StatusOpen # =3
while bytesavailable(ib.socket) > 0 || ib.socket.status == Base.StatusOpen # =3

msg = read_msg(ib.socket)

Expand Down
Loading

0 comments on commit a359b44

Please sign in to comment.