Skip to content

Commit

Permalink
v0 basic working prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
Doug Friedman committed Mar 25, 2015
1 parent 83b8a7e commit 83f548b
Showing 1 changed file with 33 additions and 9 deletions.
42 changes: 33 additions & 9 deletions force/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"io/ioutil"
"net/http"
"net/http/cookiejar"
//"reflect"
"strings"
)

type forceStreaming struct {
ClientId string
SubscribedPushTopics map[string]func(...interface{})
SubscribedPushTopics map[string]func([]byte, ...interface{})
Timeout int
forceApi *ForceApi
longPollClient *http.Client
Expand Down Expand Up @@ -47,7 +48,7 @@ func (forceApi *ForceApi) ConnectToStreamingApi() {
PublicSuffixList: publicsuffix.List,
}
jar, _ := cookiejar.New(&cookiejarOptions)
forceApi.stream = &forceStreaming{"", nil, 0, forceApi, &http.Client{Jar: jar}}
forceApi.stream = &forceStreaming{"", map[string]func([]byte, ...interface{}){}, 0, forceApi, &http.Client{Jar: jar}}

//handshake
var params = `{"channel":"/meta/handshake", "supportedConnectionTypes":["long-polling"], "version":"1.0"}`
Expand All @@ -63,25 +64,48 @@ func (forceApi *ForceApi) ConnectToStreamingApi() {
//must handle error here

// connect
connResp, _ := forceApi.stream.connect()
fmt.Println(string(connResp))
connBytes, _ := forceApi.stream.connect()

var connectData []map[string]interface{}
json.Unmarshal(connBytes, &connectData)
for _, msg := range data {
cb := forceApi.stream.SubscribedPushTopics[msg["channel"].(string)]
if cb != nil {
cb(connBytes)
}
fmt.Println(string(connBytes))
}

go func() {
// got to allow disconnect, handle errors
for {
connResp, _ = forceApi.stream.connect()
fmt.Println(string(connResp))
connBytes, _ = forceApi.stream.connect()
json.Unmarshal(connBytes, &connectData)

for _, msg := range connectData {
cb := forceApi.stream.SubscribedPushTopics[msg["channel"].(string)]
if cb != nil {
cb(connBytes)
}
}
//fmt.Println(string(connBytes))
}
}()
}

//here we have to allow the ability to pass in a callback function
func (forceApi *ForceApi) SubscribeToPushTopic(pushTopic string) {
func (forceApi *ForceApi) SubscribeToPushTopic(pushTopic string, callback func([]byte, ...interface{})) ([]byte, error) {
topicString := "/topic/" + pushTopic
subscribeParams := `{ "channel": "/meta/subscribe", "clientId": "` + forceApi.stream.ClientId + `", "subscription": "` + topicString + `"}`

subscribeResp, _ := forceApi.stream.httpPost(subscribeParams)
subscribeBytes, _ := ioutil.ReadAll(subscribeResp.Body)
subscribeBytes, err := ioutil.ReadAll(subscribeResp.Body)

defer subscribeResp.Body.Close()
fmt.Println(string(subscribeBytes))

forceApi.stream.SubscribedPushTopics[topicString] = callback
return subscribeBytes, err

}

func UnsubscribeFromPushTopic(pushTopic string) {
Expand Down

0 comments on commit 83f548b

Please sign in to comment.