-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmain.go
81 lines (76 loc) · 2.22 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main
import (
"context"
"fmt"
"io/ioutil"
"log"
"time"
"github.com/kubemq-io/kubemq-go"
"github.com/kubemq-io/kubemq-targets/pkg/uuid"
"github.com/kubemq-io/kubemq-targets/types"
)
func main() {
client, err := kubemq.NewClient(context.Background(),
kubemq.WithAddress("kubemq-cluster", 50000),
kubemq.WithClientId(uuid.New().String()),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}
dat, err := ioutil.ReadFile("./credentials/aws/athena/db.txt")
if err != nil {
log.Fatal(err)
}
db := string(dat)
dat, err = ioutil.ReadFile("./credentials/aws/athena/catalog.txt")
if err != nil {
log.Fatal(err)
}
catalog := string(dat)
dat, err = ioutil.ReadFile("./credentials/aws/athena/query.txt")
if err != nil {
log.Fatal(err)
}
query := string(dat)
dat, err = ioutil.ReadFile("./credentials/aws/athena/outputLocation.txt")
if err != nil {
log.Fatal(err)
}
outputLocation := string(dat)
// query Request
queryRequest := types.NewRequest().
SetMetadataKeyValue("method", "query").
SetMetadataKeyValue("db", db).
SetMetadataKeyValue("output_location", outputLocation).
SetMetadataKeyValue("catalog", catalog).
SetMetadataKeyValue("query", query)
queryResponse, err := client.SetQuery(queryRequest.ToQuery()).
SetChannel("query.aws.athena").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
qryResponse, err := types.ParseResponse(queryResponse.Body)
if err != nil {
log.Fatal(err)
}
executionCode := qryResponse.Metadata["execution_id"]
log.Println(fmt.Sprintf("qry executed, executionCode: %s", executionCode))
// Give query time to end
time.Sleep(2 * time.Second)
// get query result
getResultRequest := types.NewRequest().
SetMetadataKeyValue("method", "get_query_result").
SetMetadataKeyValue("execution_id", executionCode)
getResult, err := client.SetQuery(getResultRequest.ToQuery()).
SetChannel("query.aws.athena").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
getResponse, err := types.ParseResponse(getResult.Body)
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("get resoponse %v, error: %v", getResponse.Data, getResponse.IsError))
}