Skip to content

Commit

Permalink
v6.0.0 (#18)
Browse files Browse the repository at this point in the history
* New format support

* version bump

* removed heartbeat + small fixes and additions

* Update readme, etc, get ready for release

---------

Co-authored-by: Evan <[email protected]>
  • Loading branch information
ssnyder-intrinio and evanlhyde authored May 26, 2023
1 parent ba98d85 commit 237a777
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 123 deletions.
24 changes: 22 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,49 @@ There are thousands of securities, each with their own feed of activity. We hig
### Trade Message

```java
public record Trade(String symbol, double price, long size, long timestamp, long totalVolume)
public record Trade(String symbol, SubProvider subProvider, char marketCenter, double price, long size, long timestamp, long totalVolume, String conditions)
```

* **symbol** - Ticker symbole.
* **subProvider** - Denotes the detailed source within grouped sources.
* **`NONE`** - No subtype specified.
* **`CTA_A`** - CTA_A in the DELAYED_SIP provider.
* **`CTA_B`** - CTA_B in the DELAYED_SIP provider.
* **`UTP`** - UTP in the DELAYED_SIP provider.
* **`OTC`** - OTC in the DELAYED_SIP provider.
* **`NASDAQ_BASIC`** - NASDAQ Basic in the NASDAQ_BASIC provider.
* **`IEX`** - From the IEX exchange in the REALTIME provider.
* **marketCenter** - Provides the market center
* **price** - the price in USD
* **size** - the size of the last trade.
* **totalVolume** - The number of stocks traded so far today for this symbol.
* **timestamp** - a Unix timestamp in nanoseconds since unix epoch.
* **conditions** - Provides the conditions


### Quote Message

```java
public record Quote(QuoteType type, String symbol, double price, long size, long timestamp)
public record Quote(QuoteType type, String symbol, SubProvider subProvider, char marketCenter, double price, long size, long timestamp, String conditions)
```

* **type** - the quote type
* **`ask`** - represents an ask type
* **`bid`** - represents a bid type
* **subProvider** - Denotes the detailed source within grouped sources.
* **`NONE`** - No subtype specified.
* **`CTA_A`** - CTA_A in the DELAYED_SIP provider.
* **`CTA_B`** - CTA_B in the DELAYED_SIP provider.
* **`UTP`** - UTP in the DELAYED_SIP provider.
* **`OTC`** - OTC in the DELAYED_SIP provider.
* **`NASDAQ_BASIC`** - NASDAQ Basic in the NASDAQ_BASIC provider.
* **`IEX`** - From the IEX exchange in the REALTIME provider.
* **marketCenter** - Provides the market center
* **symbol** - Ticker symbol.
* **price** - the price in USD
* **size** - the size of the last ask or bid).
* **timestamp** - a Unix timestamp in nanoseconds since unix epoch.
* **conditions** - Provides the conditions

## API Keys

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.intrinio</groupId>
<artifactId>IntrinioRealtimeJavaSDK</artifactId>
<version>5.0.0</version>
<version>6.0.0</version>
<packaging>jar</packaging>
<name>IntrinioRealtimeJavaSDK</name>
<build>
Expand Down
2 changes: 1 addition & 1 deletion src/SampleApp/SampleApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ public void run() {
quoteHandler.tryLog();
}
};
timer.schedule(task, 300000, 300000);
timer.schedule(task, 30000, 30000);
}
}
56 changes: 16 additions & 40 deletions src/intrinio/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ public class Client implements WebSocket.Listener {
private OnTrade onTrade = (Trade trade) -> {};
private OnQuote onQuote = (Quote quote) -> {};
private Thread[] processDataThreads;
private Thread heartbeatThread;
private boolean isCancellationRequested = false;
private String HeaderClientInformationKey = "Client-Information";
private String HeaderClientInformationValue = "IntrinioRealtimeJavaSDKv6.0";
private String HeaderMessageVersionKey = "UseNewEquitiesFormat";
private String HeaderMessageVersionValue = "v2";
//endregion Data Members

//region Constructors
Expand Down Expand Up @@ -355,7 +358,7 @@ public void stop() {
//region Private Methods
private void processData(){
while (!this.isCancellationRequested) {
int count, offset, symbolLength;
int count, offset, messageLength;
byte type;
ByteBuffer buffer, offsetBuffer;
try {
Expand All @@ -369,25 +372,23 @@ private void processData(){
for (long i = 0L; i < count; i++) {
buffer.position(0);
type = datum[offset];
symbolLength = datum[offset + 1];
messageLength = datum[offset + 1];
offsetBuffer = buffer.slice(offset, messageLength);
switch (type) {
case 0:
offsetBuffer = buffer.slice(offset, 22 + symbolLength);
Trade trade = Trade.parse(offsetBuffer, symbolLength);
Trade trade = Trade.parse(offsetBuffer);
onTrade.onTrade(trade);
offset += 22 + symbolLength;
break;
case 1:
case 2:
offsetBuffer = buffer.slice(offset, 18 + symbolLength);
Quote quote = Quote.parse(offsetBuffer, symbolLength);
Quote quote = Quote.parse(offsetBuffer);
onQuote.onQuote(quote);
offset += 18 + symbolLength;
break;
default:
Client.Log("Error parsing multi-part message. Type is %d", type);
i = count;
}
offset += messageLength;
}
}
} catch (Exception ex)
Expand All @@ -397,35 +398,11 @@ private void processData(){
}
}

public void heartbeat(){
while (!this.isCancellationRequested) {
try {
Thread.sleep(20000);
//Client.Log("Sending heartbeat");
wsLock.readLock().lock();
try {
if (wsState.isReady()) {
wsState.getWebSocket().sendBinary(ByteBuffer.wrap(new byte[] {}), true).join();
}
} finally {
wsLock.readLock().unlock();
}
} catch (InterruptedException e) {
//Client.Log("Websocket - Heartbeat Interrupted - %s", e.getMessage());
} catch (Exception e) {
Client.Log("Websocket - Heartbeat Error - %s", e.getMessage());
this.onClose(this.wsState.getWebSocket(), 1000, "Heartbeat error");
}
}
}

private void startThreads() throws Exception{
this.isCancellationRequested = false;
this.heartbeatThread = new Thread(() -> heartbeat());
for (int i = 0; i < processDataThreads.length; i++) {
processDataThreads[i] = new Thread(()->processData());
}
heartbeatThread.start();
for (Thread thread : processDataThreads) {
thread.start();
}
Expand All @@ -436,11 +413,6 @@ private void stopThreads(){
try {
Thread.sleep(1000);
}catch (Exception e){}

try {
this.heartbeatThread.join();
}catch (Exception e){}

for (Thread thread : processDataThreads) {
try {
thread.join();
Expand Down Expand Up @@ -496,7 +468,11 @@ private void initializeWebSocket(String token) {
return;
}
HttpClient httpClient = HttpClient.newHttpClient();
CompletableFuture<WebSocket> task = httpClient.newWebSocketBuilder().buildAsync(uri, (WebSocket.Listener) this);
CompletableFuture<WebSocket> task =
httpClient.newWebSocketBuilder()
.header(HeaderMessageVersionKey, HeaderMessageVersionValue)
.header(HeaderClientInformationKey, HeaderClientInformationValue)
.buildAsync(uri, (WebSocket.Listener) this);
try {
WebSocket ws = task.get();
this.wsState.setWebSocket(ws);
Expand Down Expand Up @@ -574,7 +550,7 @@ private boolean tryGetNewToken(){
HttpURLConnection con;
try {
con = (HttpURLConnection) url.openConnection();
con.setRequestProperty("Client-Information", "IntrinioRealtimeJavaSDKv5.0");
con.setRequestProperty(HeaderClientInformationKey, HeaderClientInformationValue);
} catch (IOException e) {
Client.Log("Authorization Failure. Please check your network connection. " + e.getMessage());
return false;
Expand Down
158 changes: 115 additions & 43 deletions src/intrinio/Quote.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,142 @@
* A bid or ask quote. "timestamp" is in nanoseconds since unix epoch.
* @author Intrinio *
*/
public record Quote(QuoteType type, String symbol, double price, long size, long timestamp) {
public record Quote
(QuoteType type,
String symbol,
SubProvider subProvider,
char marketCenter,
double price,
long size,
long timestamp,
String conditions) {

public String toString() {
String s =
"Quote (" +
"Type: " + this.type +
", Symbol: " + this.symbol +
", Price: " + this.price +
", Size: " + this.size +
", Timestamp: " + this.timestamp +
")";
return s;
return
"Quote (" +
"Type: " + this.type +
", Symbol: " + this.symbol +
", SubProvider: " + this.subProvider +
", MarketCenter: " + this.marketCenter +
", Price: " + this.price +
", Size: " + this.size +
", Timestamp: " + this.timestamp +
", Conditions: " + this.conditions +
")";
}

public static Quote parse(byte[] bytes, int symbolLength) {
String symbol = StandardCharsets.US_ASCII.decode(ByteBuffer.wrap(bytes, 2, symbolLength)).toString();

public static Quote parse(byte[] bytes) {
int symbolLength = bytes[2];
int conditionLength = bytes[22 + symbolLength];
String symbol = StandardCharsets.US_ASCII.decode(ByteBuffer.wrap(bytes, 3, symbolLength)).toString();

QuoteType type;
switch (bytes[0]) {
case 1: type = QuoteType.ASK;
break;
case 2: type = QuoteType.BID;
break;
default: type = QuoteType.INVALID;
case 1: type = QuoteType.ASK;
break;
case 2: type = QuoteType.BID;
break;
default: type = QuoteType.INVALID;
}

ByteBuffer priceBuffer = ByteBuffer.wrap(bytes, 2 + symbolLength, 4);

SubProvider subProvider;
switch (bytes[3 + symbolLength]) {
case 0: subProvider = SubProvider.NONE;
break;
case 1: subProvider = SubProvider.CTA_A;
break;
case 2: subProvider = SubProvider.CTA_B;
break;
case 3: subProvider = SubProvider.UTP;
break;
case 4: subProvider = SubProvider.OTC;
break;
case 5: subProvider = SubProvider.NASDAQ_BASIC;
break;
case 6: subProvider = SubProvider.IEX;
break;
default: subProvider = SubProvider.IEX;
}

ByteBuffer priceBuffer = ByteBuffer.wrap(bytes, 6 + symbolLength, 4);
priceBuffer.order(ByteOrder.LITTLE_ENDIAN);
double price = priceBuffer.getFloat();
ByteBuffer sizeBuffer = ByteBuffer.wrap(bytes, 6 + symbolLength, 4);

ByteBuffer sizeBuffer = ByteBuffer.wrap(bytes, 10 + symbolLength, 4);
sizeBuffer.order(ByteOrder.LITTLE_ENDIAN);
long size = Integer.toUnsignedLong(sizeBuffer.getInt());
ByteBuffer timeStampBuffer = ByteBuffer.wrap(bytes, 10 + symbolLength, 8);

ByteBuffer timeStampBuffer = ByteBuffer.wrap(bytes, 14 + symbolLength, 8);
timeStampBuffer.order(ByteOrder.LITTLE_ENDIAN);
long nanoSecondsSinceEpoch = timeStampBuffer.getLong();

return new Quote(type, symbol, price, size, nanoSecondsSinceEpoch);

ByteBuffer marketCenterBuffer = ByteBuffer.wrap(bytes, 4 + symbolLength, 2);
marketCenterBuffer.order(ByteOrder.LITTLE_ENDIAN);
char marketCenter = marketCenterBuffer.getChar();

String condition = "";
if (conditionLength > 0) {
condition = StandardCharsets.US_ASCII.decode(ByteBuffer.wrap(bytes, 23 + symbolLength, conditionLength)).toString();
}

return new Quote(type, symbol, subProvider, marketCenter, price, size, nanoSecondsSinceEpoch, condition);
}

public static Quote parse(ByteBuffer bytes, int symbolLength) {
String symbol = StandardCharsets.US_ASCII.decode(bytes.slice(2, symbolLength)).toString();

public static Quote parse(ByteBuffer bytes) {
int symbolLength = bytes.get(2);
int conditionLength = bytes.get(22 + symbolLength);
String symbol = StandardCharsets.US_ASCII.decode(bytes.slice(3, symbolLength)).toString();

QuoteType type;
switch (bytes.get(0)) {
case 1: type = QuoteType.ASK;
break;
case 2: type = QuoteType.BID;
break;
default: type = QuoteType.INVALID;
case 1: type = QuoteType.ASK;
break;
case 2: type = QuoteType.BID;
break;
default: type = QuoteType.INVALID;
}

SubProvider source;
switch (bytes.get(3 + symbolLength)) {
case 0: source = SubProvider.NONE;
break;
case 1: source = SubProvider.CTA_A;
break;
case 2: source = SubProvider.CTA_B;
break;
case 3: source = SubProvider.UTP;
break;
case 4: source = SubProvider.OTC;
break;
case 5: source = SubProvider.NASDAQ_BASIC;
break;
case 6: source = SubProvider.IEX;
break;
default: source = SubProvider.IEX;
}
ByteBuffer priceBuffer = bytes.slice(2 + symbolLength, 4);

ByteBuffer priceBuffer = bytes.slice(6 + symbolLength, 4);
priceBuffer.order(ByteOrder.LITTLE_ENDIAN);
double price = priceBuffer.getFloat();
ByteBuffer sizeBuffer = bytes.slice(6 + symbolLength, 4);

ByteBuffer sizeBuffer = bytes.slice(10 + symbolLength, 4);
sizeBuffer.order(ByteOrder.LITTLE_ENDIAN);
long size = Integer.toUnsignedLong(sizeBuffer.getInt());
ByteBuffer timeStampBuffer = bytes.slice(10 + symbolLength, 8);

ByteBuffer timeStampBuffer = bytes.slice(14 + symbolLength, 8);
timeStampBuffer.order(ByteOrder.LITTLE_ENDIAN);
long nanoSecondsSinceEpoch = timeStampBuffer.getLong();

return new Quote(type, symbol, price, size, nanoSecondsSinceEpoch);

ByteBuffer marketCenterBuffer = bytes.slice(4 + symbolLength, 2);
marketCenterBuffer.order(ByteOrder.LITTLE_ENDIAN);
char marketCenter = marketCenterBuffer.getChar();

String condition = "";
if (conditionLength > 0) {
condition = StandardCharsets.US_ASCII.decode(bytes.slice(23 + symbolLength, conditionLength)).toString();
}

return new Quote(type, symbol, source, marketCenter, price, size, nanoSecondsSinceEpoch, condition);
}

}
11 changes: 11 additions & 0 deletions src/intrinio/SubProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package intrinio;

public enum SubProvider {
NONE,
CTA_A,
CTA_B,
UTP,
OTC,
NASDAQ_BASIC,
IEX
}
Loading

0 comments on commit 237a777

Please sign in to comment.