Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer support signature algorithm v4 #258

Merged
merged 10 commits into from
Feb 29, 2024
5 changes: 4 additions & 1 deletion consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type LogHubConfig struct {
//:param AutoCommitDisabled: whether to disable commit checkpoint automatically, default is false, means auto commit checkpoint
// Note that if you set autocommit to false, you must use InitConsumerWorkerWithCheckpointTracker instead of InitConsumerWorker
//:param AutoCommitIntervalInSec: default auto commit interval, default is 30

//:param AuthVersion: signature algorithm version, default is sls.AuthV1
//:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4
Endpoint string
AccessKeyID string
AccessKeySecret string
Expand All @@ -73,6 +74,8 @@ type LogHubConfig struct {
SecurityToken string
AutoCommitDisabled bool
AutoCommitIntervalInMS int64
AuthVersion sls.AuthVersionType
Region string
}

const (
Expand Down
26 changes: 17 additions & 9 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type ConsumerClient struct {
option LogHubConfig
client *sls.Client
client sls.ClientInterface
consumerGroup sls.ConsumerGroup
logger log.Logger
}
Expand All @@ -33,19 +33,27 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
if option.AutoCommitIntervalInMS == 0 {
option.AutoCommitIntervalInMS = 60 * 1000
}
client := &sls.Client{
Endpoint: option.Endpoint,
AccessKeyID: option.AccessKeyID,
AccessKeySecret: option.AccessKeySecret,
SecurityToken: option.SecurityToken,
UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName,
}
var client sls.ClientInterface
if option.CredentialsProvider != nil {
client = client.WithCredentialsProvider(option.CredentialsProvider)
client = sls.CreateNormalInterfaceV2(option.Endpoint, option.CredentialsProvider)
} else {
client = sls.CreateNormalInterface(option.Endpoint,
option.AccessKeyID,
option.AccessKeySecret,
option.SecurityToken)
}
client.SetUserAgent(option.ConsumerGroupName + "_" + option.ConsumerName)

if option.HTTPClient != nil {
client.SetHTTPClient(option.HTTPClient)
}
if option.AuthVersion != "" {
client.SetAuthVersion(option.AuthVersion)
}
if option.Region != "" {
client.SetRegion(option.Region)
}

consumerGroup := sls.ConsumerGroup{
ConsumerGroupName: option.ConsumerGroupName,
Timeout: option.HeartbeatTimeoutInSecond,
Expand Down
2 changes: 1 addition & 1 deletion consumer/consumer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestConsumerClient_createConsumerGroup(t *testing.T) {
}
}

func internalGetConsumerGroup(client *sls.Client, project, logstore, groupName string) (sls.ConsumerGroup, error) {
func internalGetConsumerGroup(client sls.ClientInterface, project, logstore, groupName string) (sls.ConsumerGroup, error) {
cgs, err := client.ListConsumerGroup(project, logstore)
if err != nil {
return sls.ConsumerGroup{}, err
Expand Down
14 changes: 8 additions & 6 deletions example/alert/alert_example.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package main

import (
sls "github.com/aliyun/aliyun-log-go-sdk"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
)

func main() {
Expand All @@ -12,11 +13,12 @@ func main() {
logstore := "002"
dashboardName := "dashboardtest"
alertName := "test-alert"
client := &sls.Client{
Endpoint: "cn-hangzhou.log.aliyuncs.com",
AccessKeyID: accessKeyID,
AccessKeySecret: accessKeySecret,
}
client := sls.CreateNormalInterface(
"cn-hangzhou.log.aliyuncs.com",
accessKeyID,
accessKeySecret,
"",
)
chart := sls.Chart{
Title: "chart-1234567",
Type: "table",
Expand Down
13 changes: 7 additions & 6 deletions example/consumer/reset_checkpoint_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker cons
return "", nil
}

func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, shardId int) error {
func updateCheckpoint(config consumerLibrary.LogHubConfig, client sls.ClientInterface, shardId int) error {
from := fmt.Sprintf("%d", time.Now().Unix())
cursor, err := client.GetCursor(config.Project, config.Logstore, shardId, from)
if err != nil {
Expand All @@ -67,11 +67,12 @@ func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, s
}

func UpdateConsumerGroupCheckPoint(config consumerLibrary.LogHubConfig) error {
client := &sls.Client{
Endpoint: config.Endpoint,
AccessKeyID: config.AccessKeyID,
AccessKeySecret: config.AccessKeySecret,
}
client := sls.CreateNormalInterface(
config.Endpoint,
config.AccessKeyID,
config.AccessKeySecret,
"",
)
shards, err := client.ListShards(config.Project, config.Logstore)
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions example/metric_agg/metric_agg_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package main
import (
"encoding/json"
"fmt"

sls "github.com/aliyun/aliyun-log-go-sdk"
)

func crud(client sls.Client, sourceProject string, aggRules *sls.MetricAggRules, testId string) {
func crud(client *sls.Client, sourceProject string, aggRules *sls.MetricAggRules, testId string) {
err := client.CreateMetricAggRules(sourceProject, aggRules)
if err != nil {
panic(err)
Expand Down Expand Up @@ -137,9 +138,9 @@ func main() {

testId := "metric_agg_rules1"
aggRules := sqlConfig(accessKeyID, accessKeySecret, testId)
crud(*client, sourceProject, aggRules, testId)
crud(client, sourceProject, aggRules, testId)

testId = "metric_agg_rules2"
aggRules = promqlConfig(accessKeyID, accessKeySecret, testId)
crud(*client, sourceProject, aggRules, testId)
crud(client, sourceProject, aggRules, testId)
}
44 changes: 44 additions & 0 deletions example/signv4/signv4.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package main

import (
"fmt"
"os"
"os/signal"
"syscall"

sls "github.com/aliyun/aliyun-log-go-sdk"
consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
"github.com/aliyun/aliyun-log-go-sdk/util"
"github.com/go-kit/kit/log/level"
)

func main() {
Expand All @@ -20,3 +27,40 @@ func main() {

client.GetProject("example-project") // call client API
}

func CreateSignV4Consumer() {
accessKeyId, accessKeySecret := "", "" // replace with your access key and secret
endpoint := "cn-hangzhou-intranet.log.aliyuncs.com" // replace with your endpoint
region, err := util.ParseRegion(endpoint) // parse region from endpoint
if err != nil {
panic(err)
}
option := consumerLibrary.LogHubConfig{
Endpoint: endpoint,
AccessKeyID: accessKeyId,
AccessKeySecret: accessKeySecret,
Project: "example-project",
Logstore: "example-logstore",
ConsumerGroupName: "example-consumer-group",
ConsumerName: "example-consumer-group-consumer-1",
CursorPosition: consumerLibrary.END_CURSOR,

AuthVersion: sls.AuthV4, // use signature v4
Region: region, // region must be set if using signature v4
}
// create consumer
consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
consumerWorker.Start()
if _, ok := <-ch; ok {
level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
consumerWorker.StopAndWait()
}
}

func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
fmt.Println(shardId, logGroupList)
checkpointTracker.SaveCheckPoint(true)
return "", nil
}
Loading