From 4e08aa7f687ade78e2d8894a324dbc10d312f99f Mon Sep 17 00:00:00 2001 From: Mateo Greil Date: Tue, 7 May 2024 21:12:05 +0200 Subject: [PATCH] :sparkles: GetCandles --- internal/protocols/stream/request.go | 6 ++ internal/protocols/stream/response.go | 27 ++++++++ internal/protocols/types.go | 13 ++++ xapi.go | 97 +++++++++++++++++++++------ xapi_test.go | 28 ++++++-- 5 files changed, 146 insertions(+), 25 deletions(-) create mode 100644 internal/protocols/types.go diff --git a/internal/protocols/stream/request.go b/internal/protocols/stream/request.go index 1bac046..b555dfa 100644 --- a/internal/protocols/stream/request.go +++ b/internal/protocols/stream/request.go @@ -4,3 +4,9 @@ type Request struct { Command string `json:"command"` StreamSessionId string `json:"streamSessionId"` } + +type GetCandlesRequest struct { + Command string `json:"command"` + StreamSessionId string `json:"streamSessionId"` + Symbol string `json:"symbol"` +} diff --git a/internal/protocols/stream/response.go b/internal/protocols/stream/response.go index 479ba2e..c76f2f7 100644 --- a/internal/protocols/stream/response.go +++ b/internal/protocols/stream/response.go @@ -1,5 +1,32 @@ package stream type Response struct { + Command string `json:"command"` +} + +type KeepAliveResponse struct { + Command string `json:"command"` + Data KeepAliveData `json:"data"` +} + +type KeepAliveData struct { + Timestamp int `json:"timestamp"` +} + +type ResponseCandle struct { Command string `json:"response"` + Data Candle `json:"data"` +} + +// TODO: Move it to a common package (stream and socket use it) +type Candle struct { + Close float64 `json:"close"` + Ctm int64 `json:"ctm"` + CtmString string `json:"ctmString"` + High float64 `json:"high"` + Low float64 `json:"low"` + Open float64 `json:"open"` + QuoteId int `json:"quoteId"` + Symbol string `json:"symbol"` + Vol float64 `json:"vol"` } diff --git a/internal/protocols/types.go b/internal/protocols/types.go new file mode 100644 index 0000000..85e8380 --- /dev/null +++ b/internal/protocols/types.go @@ -0,0 +1,13 @@ +package xapi + +type Candle struct { + Close float64 `json:"close"` + Ctm int64 `json:"ctm"` + CtmString string `json:"ctmString"` + High float64 `json:"high"` + Low float64 `json:"low"` + Open float64 `json:"open"` + QuoteId int `json:"quoteId"` + Symbol string `json:"symbol"` + Vol float64 `json:"vol"` +} diff --git a/xapi.go b/xapi.go index f16165e..18797b3 100644 --- a/xapi.go +++ b/xapi.go @@ -5,15 +5,18 @@ import ( "fmt" "time" - socket "github.com/MateoGreil/xapi-go/internal/protocols/socket" - stream "github.com/MateoGreil/xapi-go/internal/protocols/stream" + "github.com/MateoGreil/xapi-go/internal/protocols/socket" + "github.com/MateoGreil/xapi-go/internal/protocols/stream" "github.com/gorilla/websocket" ) type client struct { - conn *websocket.Conn - streamConn *websocket.Conn - streamSessionId string + conn *websocket.Conn + streamConn *websocket.Conn + streamSessionId string + socketMessageChannel chan interface{} + streamMessageChannel chan interface{} + CandlesChannel chan stream.Candle } const ( @@ -48,33 +51,58 @@ func NewClient(userId string, password string, connectionType string) (*client, if err != nil { return nil, err } - getKeepAlive(conn, streamSessionId) + getKeepAlive(streamConn, streamSessionId) c := &client{ - conn: conn, - streamConn: streamConn, - streamSessionId: streamSessionId, + conn: conn, + streamConn: streamConn, + streamSessionId: streamSessionId, + socketMessageChannel: make(chan interface{}), + streamMessageChannel: make(chan interface{}), + CandlesChannel: make(chan stream.Candle), } go c.pingSocket() go c.pingStream() go c.listenStream() + go c.socketWriteJSON() + go c.streamWriteJSON() return c, nil } +func (c *client) SubscribeCandles(symbol string) { + request := stream.GetCandlesRequest{ + Command: "getCandles", + StreamSessionId: c.streamSessionId, + Symbol: symbol, + } + c.streamMessageChannel <- request +} + func (c *client) listenStream() { for { - _, msg, err := c.streamConn.ReadMessage() + _, message, err := c.streamConn.ReadMessage() if err != nil { fmt.Println(err.Error()) } - fmt.Println("Stream message:", msg) response := stream.Response{} - err = json.Unmarshal(msg, &response) + err = json.Unmarshal(message, &response) if err != nil { + fmt.Printf("message: %s\n", message) fmt.Println(err.Error()) } switch response.Command { + case "candle": + responseCandle := stream.ResponseCandle{} + err = json.Unmarshal(message, &responseCandle) + if err != nil { + fmt.Println(err.Error()) + } + c.CandlesChannel <- responseCandle.Data + case "keepAlive": + fmt.Printf("keepAlive received\n") + default: + fmt.Printf("Unknown stream message: %s\n", message) } } } @@ -85,12 +113,12 @@ func (c *client) pingSocket() { Command: "ping", Arguments: nil, } - c.conn.WriteJSON(request) - response := socket.Response{} - err := c.conn.ReadJSON(&response) - if err != nil { - fmt.Println(err.Error()) - } + c.socketMessageChannel <- request + // response := socket.Response{} + // err := c.conn.ReadJSON(&response) + // if err != nil { + // fmt.Println(err.Error()) + // } time.Sleep(pingInterval) } } @@ -101,11 +129,27 @@ func (c *client) pingStream() { Command: "ping", StreamSessionId: c.streamSessionId, } - c.streamConn.WriteJSON(request) + c.streamMessageChannel <- request time.Sleep(pingInterval) } } +func (c *client) streamWriteJSON() { + for { + message := <-c.streamMessageChannel + c.streamConn.WriteJSON(message) + fmt.Printf("messageStream: %+v\n", message) + } +} + +func (c *client) socketWriteJSON() { + for { + message := <-c.socketMessageChannel + c.conn.WriteJSON(message) + fmt.Printf("messageSocket: %+v\n", message) + } +} + func login(conn *websocket.Conn, userId string, password string) (string, error) { request := socket.Request{ Command: "login", @@ -133,4 +177,19 @@ func getKeepAlive(conn *websocket.Conn, streamSessionId string) { StreamSessionId: streamSessionId, } conn.WriteJSON(keepAliveReq) + _, message, err := conn.ReadMessage() + if err != nil { + // TODO: Handle errors + fmt.Println(err.Error()) + } + response := stream.KeepAliveResponse{} + err = json.Unmarshal(message, &response) + if err != nil { + // TODO: Handle errors + fmt.Println(err.Error()) + } + if response.Command != "keepAlive" { + // TODO: Handle errors + fmt.Println(err.Error()) + } } diff --git a/xapi_test.go b/xapi_test.go index 62b1c1a..14daee9 100644 --- a/xapi_test.go +++ b/xapi_test.go @@ -4,15 +4,11 @@ import ( "fmt" "os" "testing" + "time" ) func TestNewClient(t *testing.T) { - _, err := NewClient(os.Getenv("XAPI_USER_ID"), os.Getenv("XAPI_PASSWORD"), "demo") - if err != nil { - t.Error(err) - } - - _, err = NewClient(os.Getenv("XAPI_USER_ID"), "wrong-password", "demo") + _, err := NewClient(os.Getenv("XAPI_USER_ID"), "wrong-password", "demo") if err.Error() != "userPasswordCheck: Invalid login or password" { t.Error(err) } @@ -22,4 +18,24 @@ func TestNewClient(t *testing.T) { fmt.Println(err) t.Error(err) } + + _, err = NewClient(os.Getenv("XAPI_USER_ID"), os.Getenv("XAPI_PASSWORD"), "demo") + if err != nil { + t.Error(err) + } +} + +func TestSuscribeCandles(t *testing.T) { + xapiClient, err := NewClient(os.Getenv("XAPI_USER_ID"), os.Getenv("XAPI_PASSWORD"), "demo") + if err != nil { + t.Error(err) + } + + xapiClient.SubscribeCandles("EURUSD") + select { + case candle := <-xapiClient.CandlesChannel: + fmt.Printf("%+v\n", candle) + case <-time.After(2 * time.Minute): + t.Error("Did not receive candles") + } }