From 4b249c23d7c8d198f350525f73216fb8bb7a39d9 Mon Sep 17 00:00:00 2001 From: crimson <1291463831@qq.com> Date: Fri, 23 Feb 2024 10:12:40 +0800 Subject: [PATCH 1/5] consumer support signature v4 --- consumer/config.go | 5 ++++- consumer/consumer_client.go | 8 +++++++ example/signv4.go | 44 +++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/consumer/config.go b/consumer/config.go index a3f303e2..2e55453a 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -46,7 +46,8 @@ type LogHubConfig struct { //:param AutoCommitDisabled: whether to disable commit checkpoint automatically, default is false, means auto commit checkpoint // Note that if you set autocommit to false, you must use InitConsumerWorkerWithCheckpointTracker instead of InitConsumerWorker //:param AutoCommitIntervalInSec: default auto commit interval, default is 30 - + //:param AuthVersion: signature algorithm version, default is sls.AuthV1 + //:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4 Endpoint string AccessKeyID string AccessKeySecret string @@ -73,6 +74,8 @@ type LogHubConfig struct { SecurityToken string AutoCommitDisabled bool AutoCommitIntervalInMS int64 + AuthVersion sls.AuthVersionType + Region string } const ( diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index cc6e0597..b0659c49 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -46,6 +46,14 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient if option.HTTPClient != nil { client.SetHTTPClient(option.HTTPClient) } + if option.AuthVersion != "" { + client.SetAuthVersion(option.AuthVersion) + } + if option.Region != "" { + client.SetRegion(option.Region) + } else if option.AuthVersion == sls.AuthV4 { + level.Error(logger).Log("error", "region must be set when using signature v4") + } consumerGroup := sls.ConsumerGroup{ ConsumerGroupName: option.ConsumerGroupName, Timeout: option.HeartbeatTimeoutInSecond, diff --git a/example/signv4.go b/example/signv4.go index 6df15e2b..b52d8da0 100644 --- a/example/signv4.go +++ b/example/signv4.go @@ -1,8 +1,15 @@ package main import ( + "fmt" + "os" + "os/signal" + "syscall" + sls "github.com/aliyun/aliyun-log-go-sdk" + consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer" "github.com/aliyun/aliyun-log-go-sdk/util" + "github.com/go-kit/kit/log/level" ) func CreateSignV4Client() { @@ -20,3 +27,40 @@ func CreateSignV4Client() { client.GetProject("example-project") // call client API } + +func CreateSignV4Consumer() { + accessKeyId, accessKeySecret := "", "" // replace with your access key and secret + endpoint := "cn-hangzhou-share.log.aliyuncs.com" // replace with your endpoint + region, err := util.ParseRegion(endpoint) // parse region from endpoint + if err != nil { + panic(err) + } + option := consumerLibrary.LogHubConfig{ + Endpoint: endpoint, + AccessKeyID: accessKeyId, + AccessKeySecret: accessKeySecret, + Project: "example-project", + Logstore: "example-logstore", + ConsumerGroupName: "example-consumer-group", + ConsumerName: "example-consumer-group-consumer-1", + CursorPosition: consumerLibrary.END_CURSOR, + + AuthVersion: sls.AuthV4, // use signature v4 + Region: region, // region must be set if using signature v4 + } + // create consumer + consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + consumerWorker.Start() + if _, ok := <-ch; ok { + level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName) + consumerWorker.StopAndWait() + } +} + +func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { + fmt.Println(shardId, logGroupList) + checkpointTracker.SaveCheckPoint(true) + return "", nil +} From c487941b288e2edbcb32db56d239eb48de730472 Mon Sep 17 00:00:00 2001 From: crimson <1291463831@qq.com> Date: Fri, 23 Feb 2024 17:24:45 +0800 Subject: [PATCH 2/5] refine new client --- consumer/consumer_client.go | 19 ++++++++++--------- example/alert/alert_example.go | 14 ++++++++------ example/consumer/reset_checkpoint_demo.go | 13 +++++++------ example/metric_agg/metric_agg_example.go | 7 ++++--- 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index b0659c49..3aad5e08 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -33,16 +33,17 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient if option.AutoCommitIntervalInMS == 0 { option.AutoCommitIntervalInMS = 60 * 1000 } - client := &sls.Client{ - Endpoint: option.Endpoint, - AccessKeyID: option.AccessKeyID, - AccessKeySecret: option.AccessKeySecret, - SecurityToken: option.SecurityToken, - UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName, - } + var client sls.ClientInterface if option.CredentialsProvider != nil { - client = client.WithCredentialsProvider(option.CredentialsProvider) + client = sls.CreateNormalInterfaceV2(option.Endpoint, option.CredentialsProvider) + } else { + client = sls.CreateNormalInterface(option.Endpoint, + option.AccessKeyID, + option.AccessKeySecret, + option.SecurityToken) } + client.SetUserAgent(option.ConsumerGroupName + "_" + option.ConsumerName) + if option.HTTPClient != nil { client.SetHTTPClient(option.HTTPClient) } @@ -61,7 +62,7 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient } consumerClient := &ConsumerClient{ option, - client, + client.(*sls.Client), consumerGroup, logger, } diff --git a/example/alert/alert_example.go b/example/alert/alert_example.go index 08062d97..6a11faeb 100644 --- a/example/alert/alert_example.go +++ b/example/alert/alert_example.go @@ -1,8 +1,9 @@ package main import ( - sls "github.com/aliyun/aliyun-log-go-sdk" "time" + + sls "github.com/aliyun/aliyun-log-go-sdk" ) func main() { @@ -12,11 +13,12 @@ func main() { logstore := "002" dashboardName := "dashboardtest" alertName := "test-alert" - client := &sls.Client{ - Endpoint: "cn-hangzhou.log.aliyuncs.com", - AccessKeyID: accessKeyID, - AccessKeySecret: accessKeySecret, - } + client := sls.CreateNormalInterface( + "cn-hangzhou.log.aliyuncs.com", + accessKeyID, + accessKeySecret, + "", + ) chart := sls.Chart{ Title: "chart-1234567", Type: "table", diff --git a/example/consumer/reset_checkpoint_demo.go b/example/consumer/reset_checkpoint_demo.go index 323a4c71..3f158369 100644 --- a/example/consumer/reset_checkpoint_demo.go +++ b/example/consumer/reset_checkpoint_demo.go @@ -56,7 +56,7 @@ func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker cons return "", nil } -func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, shardId int) error { +func updateCheckpoint(config consumerLibrary.LogHubConfig, client sls.ClientInterface, shardId int) error { from := fmt.Sprintf("%d", time.Now().Unix()) cursor, err := client.GetCursor(config.Project, config.Logstore, shardId, from) if err != nil { @@ -67,11 +67,12 @@ func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, s } func UpdateConsumerGroupCheckPoint(config consumerLibrary.LogHubConfig) error { - client := &sls.Client{ - Endpoint: config.Endpoint, - AccessKeyID: config.AccessKeyID, - AccessKeySecret: config.AccessKeySecret, - } + client := sls.CreateNormalInterface( + config.Endpoint, + config.AccessKeyID, + config.AccessKeySecret, + "", + ) shards, err := client.ListShards(config.Project, config.Logstore) if err != nil { return err diff --git a/example/metric_agg/metric_agg_example.go b/example/metric_agg/metric_agg_example.go index 412d8ef5..9849b193 100644 --- a/example/metric_agg/metric_agg_example.go +++ b/example/metric_agg/metric_agg_example.go @@ -3,10 +3,11 @@ package main import ( "encoding/json" "fmt" + sls "github.com/aliyun/aliyun-log-go-sdk" ) -func crud(client sls.Client, sourceProject string, aggRules *sls.MetricAggRules, testId string) { +func crud(client *sls.Client, sourceProject string, aggRules *sls.MetricAggRules, testId string) { err := client.CreateMetricAggRules(sourceProject, aggRules) if err != nil { panic(err) @@ -137,9 +138,9 @@ func main() { testId := "metric_agg_rules1" aggRules := sqlConfig(accessKeyID, accessKeySecret, testId) - crud(*client, sourceProject, aggRules, testId) + crud(client, sourceProject, aggRules, testId) testId = "metric_agg_rules2" aggRules = promqlConfig(accessKeyID, accessKeySecret, testId) - crud(*client, sourceProject, aggRules, testId) + crud(client, sourceProject, aggRules, testId) } From 63adedf702d9e52e1da761f5c7b8fb0ede27ff4a Mon Sep 17 00:00:00 2001 From: crimson <1291463831@qq.com> Date: Fri, 23 Feb 2024 17:28:44 +0800 Subject: [PATCH 3/5] rm error msg --- consumer/consumer_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 3aad5e08..c8d2a1d5 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -52,9 +52,8 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient } if option.Region != "" { client.SetRegion(option.Region) - } else if option.AuthVersion == sls.AuthV4 { - level.Error(logger).Log("error", "region must be set when using signature v4") } + consumerGroup := sls.ConsumerGroup{ ConsumerGroupName: option.ConsumerGroupName, Timeout: option.HeartbeatTimeoutInSecond, From 85b2b745a2f650025880d527915455c77d8c702c Mon Sep 17 00:00:00 2001 From: crimson <1291463831@qq.com> Date: Tue, 27 Feb 2024 16:08:13 +0800 Subject: [PATCH 4/5] refine --- example/signv4/signv4.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/example/signv4/signv4.go b/example/signv4/signv4.go index c0a8553a..d2f88a90 100644 --- a/example/signv4/signv4.go +++ b/example/signv4/signv4.go @@ -29,9 +29,9 @@ func main() { } func CreateSignV4Consumer() { - accessKeyId, accessKeySecret := "", "" // replace with your access key and secret - endpoint := "cn-hangzhou-share.log.aliyuncs.com" // replace with your endpoint - region, err := util.ParseRegion(endpoint) // parse region from endpoint + accessKeyId, accessKeySecret := "", "" // replace with your access key and secret + endpoint := "cn-hangzhou-intranet.log.aliyuncs.com" // replace with your endpoint + region, err := util.ParseRegion(endpoint) // parse region from endpoint if err != nil { panic(err) } From a248f2f78c65ff1dbd74949cd814fe743fa2303d Mon Sep 17 00:00:00 2001 From: crimson <1291463831@qq.com> Date: Tue, 27 Feb 2024 16:19:11 +0800 Subject: [PATCH 5/5] use clientInterface --- consumer/consumer_client.go | 4 ++-- consumer/consumer_client_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index c8d2a1d5..4c8b8a2f 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -11,7 +11,7 @@ import ( type ConsumerClient struct { option LogHubConfig - client *sls.Client + client sls.ClientInterface consumerGroup sls.ConsumerGroup logger log.Logger } @@ -61,7 +61,7 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient } consumerClient := &ConsumerClient{ option, - client.(*sls.Client), + client, consumerGroup, logger, } diff --git a/consumer/consumer_client_test.go b/consumer/consumer_client_test.go index 407dd5d5..01e9831b 100644 --- a/consumer/consumer_client_test.go +++ b/consumer/consumer_client_test.go @@ -61,7 +61,7 @@ func TestConsumerClient_createConsumerGroup(t *testing.T) { } } -func internalGetConsumerGroup(client *sls.Client, project, logstore, groupName string) (sls.ConsumerGroup, error) { +func internalGetConsumerGroup(client sls.ClientInterface, project, logstore, groupName string) (sls.ConsumerGroup, error) { cgs, err := client.ListConsumerGroup(project, logstore) if err != nil { return sls.ConsumerGroup{}, err