From 83f548bd0f1244583d37e63c34c062ec6978db60 Mon Sep 17 00:00:00 2001 From: Doug Friedman Date: Tue, 24 Mar 2015 23:10:25 -0400 Subject: [PATCH] v0 basic working prototype --- force/stream.go | 42 +++++++++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/force/stream.go b/force/stream.go index cd44fcd..b53a336 100644 --- a/force/stream.go +++ b/force/stream.go @@ -7,12 +7,13 @@ import ( "io/ioutil" "net/http" "net/http/cookiejar" + //"reflect" "strings" ) type forceStreaming struct { ClientId string - SubscribedPushTopics map[string]func(...interface{}) + SubscribedPushTopics map[string]func([]byte, ...interface{}) Timeout int forceApi *ForceApi longPollClient *http.Client @@ -47,7 +48,7 @@ func (forceApi *ForceApi) ConnectToStreamingApi() { PublicSuffixList: publicsuffix.List, } jar, _ := cookiejar.New(&cookiejarOptions) - forceApi.stream = &forceStreaming{"", nil, 0, forceApi, &http.Client{Jar: jar}} + forceApi.stream = &forceStreaming{"", map[string]func([]byte, ...interface{}){}, 0, forceApi, &http.Client{Jar: jar}} //handshake var params = `{"channel":"/meta/handshake", "supportedConnectionTypes":["long-polling"], "version":"1.0"}` @@ -63,25 +64,48 @@ func (forceApi *ForceApi) ConnectToStreamingApi() { //must handle error here // connect - connResp, _ := forceApi.stream.connect() - fmt.Println(string(connResp)) + connBytes, _ := forceApi.stream.connect() + + var connectData []map[string]interface{} + json.Unmarshal(connBytes, &connectData) + for _, msg := range data { + cb := forceApi.stream.SubscribedPushTopics[msg["channel"].(string)] + if cb != nil { + cb(connBytes) + } + fmt.Println(string(connBytes)) + } + go func() { // got to allow disconnect, handle errors for { - connResp, _ = forceApi.stream.connect() - fmt.Println(string(connResp)) + connBytes, _ = forceApi.stream.connect() + json.Unmarshal(connBytes, &connectData) + + for _, msg := range connectData { + cb := forceApi.stream.SubscribedPushTopics[msg["channel"].(string)] + if cb != nil { + cb(connBytes) + } + } + //fmt.Println(string(connBytes)) } }() } //here we have to allow the ability to pass in a callback function -func (forceApi *ForceApi) SubscribeToPushTopic(pushTopic string) { +func (forceApi *ForceApi) SubscribeToPushTopic(pushTopic string, callback func([]byte, ...interface{})) ([]byte, error) { topicString := "/topic/" + pushTopic subscribeParams := `{ "channel": "/meta/subscribe", "clientId": "` + forceApi.stream.ClientId + `", "subscription": "` + topicString + `"}` + subscribeResp, _ := forceApi.stream.httpPost(subscribeParams) - subscribeBytes, _ := ioutil.ReadAll(subscribeResp.Body) + subscribeBytes, err := ioutil.ReadAll(subscribeResp.Body) + defer subscribeResp.Body.Close() - fmt.Println(string(subscribeBytes)) + + forceApi.stream.SubscribedPushTopics[topicString] = callback + return subscribeBytes, err + } func UnsubscribeFromPushTopic(pushTopic string) {