diff --git a/go.mod b/go.mod index f4eb0d78ef..fda714b35b 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/ginkgo/v2 v2.22.1 - github.com/onsi/gomega v1.36.1 + github.com/onsi/gomega v1.36.2 github.com/ory/dockertest/v3 v3.11.0 github.com/oschwald/maxminddb-golang v1.13.1 github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 @@ -110,7 +110,7 @@ require ( google.golang.org/api v0.214.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 google.golang.org/grpc v1.68.1 - google.golang.org/protobuf v1.36.0 + google.golang.org/protobuf v1.36.1 ) require ( diff --git a/go.sum b/go.sum index c080e8365e..9acae8f910 100644 --- a/go.sum +++ b/go.sum @@ -1072,8 +1072,8 @@ github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPl github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= -github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= -github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= @@ -1922,8 +1922,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= -google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc= gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU= gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= diff --git a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bingads_test.go b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bingads_test.go index 5eb703c90d..8754c04cdf 100644 --- a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bingads_test.go +++ b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bingads_test.go @@ -70,11 +70,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{ - URL: "http://localhost/upload1", - client: &http.Client{}, - } - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) bingAdsService.EXPECT().GetBulkUploadUrl().Return(&mockbingads.GetBulkUploadUrlResponse{ UploadUrl: "http://localhost/upload1", RequestId: misc.FastUUID().URN(), @@ -138,8 +134,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) errorMsg := "Error in getting bulk upload url" bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errorMsg)) bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errorMsg)) @@ -181,8 +176,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - ClientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &ClientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) errMsg := "unable to get bulk upload url, check your credentials" bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errMsg)) @@ -227,8 +221,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) bingAdsService.EXPECT().GetBulkUploadUrl().Return(&mockbingads.GetBulkUploadUrlResponse{ UploadUrl: "http://localhost/upload1", RequestId: misc.FastUUID().URN(), @@ -285,8 +278,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{ PercentComplete: int64(100), @@ -308,8 +300,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(nil, fmt.Errorf("failed to get bulk upload status:")) pollInput := common.AsyncPoll{ @@ -327,8 +318,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{ PercentComplete: int64(100), @@ -356,8 +346,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{ PercentComplete: int64(0), @@ -383,8 +372,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{ PercentComplete: int64(0), @@ -410,8 +398,7 @@ var _ = Describe("Bing ads Offline Conversions", func() { initBingads() ctrl := gomock.NewController(GinkgoT()) bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl) - clientI := Client{} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId456").Return(&mockbingads.GetBulkUploadStatusResponse{ PercentComplete: int64(100), @@ -454,10 +441,8 @@ var _ = Describe("Bing ads Offline Conversions", func() { http.ServeFile(w, r, errorsTemplateFilePath) })) defer ts.Close() - client := ts.Client() modifiedURL := ts.URL // Use the test server URL - clientI := Client{client: client, URL: modifiedURL} - bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI) + bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true) UploadStatsInput := common.GetUploadStatsInput{ FailedJobParameters: modifiedURL, @@ -531,11 +516,15 @@ var _ = Describe("Bing ads Offline Conversions", func() { }) It("Transform() Test -> successful ", func() { job := &jobsdb.JobT{ - EventPayload: []byte("{\"type\": \"record\", \"action\": \"insert\", \"fields\": {\"conversionName\": \"Test-Integration\", \"conversionTime\": \"5/22/2023 6:27:54 AM\", \"conversionValue\": \"100\", \"microsoftClickId\": \"click_id\", \"conversionCurrencyCode\": \"USD\"}}"), + EventPayload: []byte("{\n \"type\": \"record\",\n \"action\": \"insert\",\n \"fields\": {\n \"conversionName\": \"Test-Integration\",\n \"conversionTime\": \"5/22/2023 6:27:54 AM\",\n \"conversionValue\": \"100\",\n \"microsoftClickId\": \"click_id\",\n \"conversionCurrencyCode\": \"USD\",\n \"email\":\"test@testmail.com\",\n \"phone\":\"+911234567890\"\n }\n}"), } - uploader := &BingAdsBulkUploader{} + uploader := &BingAdsBulkUploader{ + isHashRequired: true, + } + expectedResp := `{"message":{"fields":{"conversionCurrencyCode":"USD","conversionName":"Test-Integration","conversionTime":"5/22/2023 6:27:54 AM","conversionValue":"100","email":"28a4da98f8812110001ab8ffacde3b38b4725a9e3570c39299fbf2d12c5aa70e","microsoftClickId":"click_id","phone":"8c229df83de8ab269e90918846e326c4008c86481393223d17a30ff5a407b08e"},"action":"insert"},"metadata":{"jobId":0}}` // Execute - _, err := uploader.Transform(job) + resp, err := uploader.Transform(job) + Expect(resp).To(Equal(expectedResp)) Expect(err).To(BeNil()) }) diff --git a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bulk_uploader.go b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bulk_uploader.go index a214d03250..2b603a87ed 100644 --- a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bulk_uploader.go +++ b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bulk_uploader.go @@ -19,15 +19,15 @@ import ( "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" ) -func NewBingAdsBulkUploader(logger logger.Logger, statsFactory stats.Stats, destName string, service bingads.BulkServiceI, client *Client) *BingAdsBulkUploader { +func NewBingAdsBulkUploader(logger logger.Logger, statsFactory stats.Stats, destName string, service bingads.BulkServiceI, isHashRequired bool) *BingAdsBulkUploader { return &BingAdsBulkUploader{ - destName: destName, - service: service, - logger: logger.Child("BingAds").Child("BingAdsBulkUploader"), - statsFactory: statsFactory, - client: *client, - fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destName, 100*bytesize.MB), - eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destName, 1000), + isHashRequired: isHashRequired, + destName: destName, + service: service, + logger: logger.Child("BingAds").Child("BingAdsBulkUploader"), + statsFactory: statsFactory, + fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destName, 100*bytesize.MB), + eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destName, 1000), } } @@ -81,6 +81,12 @@ func (b *BingAdsBulkUploader) Transform(job *jobsdb.JobT) (string, error) { } } } + if b.isHashRequired { + event.Fields, err = hashFields(fields) + if err != nil { + return payload, fmt.Errorf("Unable to hash fields.%w", err) + } + } data := Data{ Message: Message{ Fields: event.Fields, diff --git a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/manager.go b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/manager.go index c304130206..aa54b4c360 100644 --- a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/manager.go +++ b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/manager.go @@ -49,9 +49,7 @@ func newManagerInternal(logger logger.Logger, statsFactory stats.Stats, destinat TokenSource: &token, } session := bingads.NewSession(sessionConfig) - - clientNew := Client{} - bingUploader := NewBingAdsBulkUploader(logger, statsFactory, destination.DestinationDefinition.Name, bingads.NewBulkService(session), &clientNew) + bingUploader := NewBingAdsBulkUploader(logger, statsFactory, destination.DestinationDefinition.Name, bingads.NewBulkService(session), destConfig.IsHashRequired) return bingUploader, nil } diff --git a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/types.go b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/types.go index 220069d2ff..16549f6742 100644 --- a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/types.go +++ b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/types.go @@ -3,25 +3,20 @@ package offline_conversions import ( "encoding/csv" "encoding/json" - "net/http" "github.com/rudderlabs/bing-ads-go-sdk/bingads" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" ) -type Client struct { - URL string - client *http.Client -} type BingAdsBulkUploader struct { - destName string - service bingads.BulkServiceI - logger logger.Logger - statsFactory stats.Stats - client Client - fileSizeLimit int64 - eventsLimit int64 + destName string + service bingads.BulkServiceI + logger logger.Logger + statsFactory stats.Stats + fileSizeLimit int64 + eventsLimit int64 + isHashRequired bool } type Message struct { Fields json.RawMessage `json:"fields"` @@ -41,6 +36,7 @@ type DestinationConfig struct { CustomerAccountID string `json:"customerAccountId"` CustomerID string `json:"customerId"` RudderAccountID string `json:"rudderAccountId"` + IsHashRequired bool `json:"isHashRequired"` } type ActionFileInfo struct { diff --git a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/util.go b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/util.go index 01728d917b..d6272bd46e 100644 --- a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/util.go +++ b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/util.go @@ -3,8 +3,10 @@ package offline_conversions import ( "archive/zip" "bufio" + "crypto/sha256" "encoding/csv" "encoding/json" + stdjson "encoding/json" "fmt" "io" "net/http" @@ -466,3 +468,43 @@ func validateField(fields map[string]interface{}, field string) error { } return nil } + +func calculateHashCode(data string) string { + // Join the strings into a single string with a separator + hash := sha256.New() + hash.Write([]byte(data)) + hashBytes := hash.Sum(nil) + hashCode := fmt.Sprintf("%x", hashBytes) + + return hashCode +} + +func hashFields(input map[string]interface{}) (stdjson.RawMessage, error) { + // Create a new map to hold the hashed fields + hashedMap := make(map[string]interface{}) + + // Iterate over the input map + for key, value := range input { + // Check if the key is "email" or "phone" + if key == "email" || key == "phone" { + // Ensure the value is a string before hashing + if strVal, ok := value.(string); ok { + hashedMap[key] = calculateHashCode(strVal) + } else { + // If not a string, preserve the original value + hashedMap[key] = value + } + } else { + // Preserve other fields unchanged + hashedMap[key] = value + } + } + + // Convert the resulting map to JSON RawMessage + result, err := json.Marshal(hashedMap) + if err != nil { + return nil, err + } + + return json.RawMessage(result), nil +} diff --git a/services/oauth/v2/oauth.go b/services/oauth/v2/oauth.go index 70efa0ca53..1be55950a0 100644 --- a/services/oauth/v2/oauth.go +++ b/services/oauth/v2/oauth.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "strings" "time" "github.com/tidwall/gjson" @@ -141,11 +142,11 @@ func (h *OAuthHandler) FetchToken(fetchTokenParams *RefreshTokenParams) (int, *A authErrCategory: "", errorMessage: "", destDefName: fetchTokenParams.DestDefName, - isTokenFetch: true, flowType: h.RudderFlowType, action: "fetch_token", } - return h.GetTokenInfo(fetchTokenParams, "Fetch token", authStats) + statshandler := NewStatsHandlerFromOAuthStats(authStats) + return h.GetTokenInfo(fetchTokenParams, "Fetch token", statshandler) } /* @@ -176,10 +177,11 @@ func (h *OAuthHandler) RefreshToken(refTokenParams *RefreshTokenParams) (int, *A action: "refresh_token", stats: h.stats, } - return h.GetTokenInfo(refTokenParams, "Refresh token", authStats) + statsHandler := NewStatsHandlerFromOAuthStats(authStats) + return h.GetTokenInfo(refTokenParams, "Refresh token", statsHandler) } -func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse, error) { +func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, statsHandler OAuthStatsHandler) (int, *AuthResponse, error) { log := h.Logger.Withn( logger.NewStringField("Call Type", logTypeName), logger.NewStringField("AccountId", refTokenParams.AccountID), @@ -189,13 +191,13 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN ) log.Debugn("[request] :: Get Token Info request received") startTime := time.Now() - defer func() { - authStats.statName = GetOAuthActionStatName("total_latency") - authStats.isCallToCpApi = false - authStats.SendTimerStats(startTime) - }() h.CacheMutex.Lock(refTokenParams.AccountID) defer h.CacheMutex.Unlock(refTokenParams.AccountID) + defer func() { + statsHandler.SendTiming(startTime, "total_latency", stats.Tags{ + "isCallToCpApi": "false", + }) + }() refTokenBody := RefreshTokenBodyParams{} storedCache, ok := h.Cache.Load(refTokenParams.AccountID) if ok { @@ -205,7 +207,7 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN return http.StatusInternalServerError, nil, errors.New("failed to type assert the stored cache") } // TODO: verify if the storedCache is nil at this point - if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, authStats) { + if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, statsHandler) { return http.StatusOK, cachedSecret, nil } // Refresh token preparation @@ -214,7 +216,7 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN ExpiredSecret: refTokenParams.Secret, } } - statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, authStats, logTypeName) + statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, statsHandler, logTypeName) // handling of refresh token response if statusCode == http.StatusOK { // fetching/refreshing through control plane was successful @@ -241,11 +243,7 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC action: action, stats: h.stats, } - defer func() { - authStatusToggleStats.statName = GetOAuthActionStatName("total_latency") - authStatusToggleStats.isCallToCpApi = false - authStatusToggleStats.SendTimerStats(authErrHandlerTimeStart) - }() + statsHandler := NewStatsHandlerFromOAuthStats(authStatusToggleStats) h.CacheMutex.Lock(params.RudderAccountID) isAuthStatusUpdateActive, isAuthStatusUpdateReqPresent := h.AuthStatusUpdateActiveMap[destinationId] if isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive { @@ -266,6 +264,11 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC h.Cache.Delete(params.RudderAccountID) h.CacheMutex.Unlock(params.RudderAccountID) }() + defer func() { + statsHandler.SendTiming(authErrHandlerTimeStart, "total_latency", stats.Tags{ + "isCallToCpApi": "false", + }) + }() authStatusToggleUrl := fmt.Sprintf("%s/workspaces/%s/destinations/%s/authStatus/toggle", h.ConfigBEURL, params.WorkspaceID, destinationId) @@ -278,14 +281,15 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC RequestType: action, BasicAuthUser: h.Identity(), } - authStatusToggleStats.statName = GetOAuthActionStatName("request_sent") - authStatusToggleStats.isCallToCpApi = true - authStatusToggleStats.SendCountStat() + statsHandler.Increment("request_sent", stats.Tags{ + "isCallToCpApi": "true", + }) cpiCallStartTime := time.Now() statusCode, respBody = h.CpConn.CpApiCall(authStatusInactiveCpReq) - authStatusToggleStats.statName = GetOAuthActionStatName("request_latency") - authStatusToggleStats.SendTimerStats(cpiCallStartTime) + statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{ + "isCallToCpApi": "true", + }) h.Logger.Debugn("[request] :: Response from CP for auth status inactive req", logger.NewIntField("StatusCode", int64(statusCode)), logger.NewStringField("Response", respBody)) @@ -299,18 +303,20 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC } else { msg = fmt.Sprintf("Could not update authStatus to inactive for destination: %v", authStatusToggleRes.Message) } - authStatusToggleStats.statName = GetOAuthActionStatName("request") - authStatusToggleStats.errorMessage = msg - authStatusToggleStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": msg, + "isCallToCpApi": "true", + }) return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error() } h.Logger.Debugn("[request] :: (Write) auth status inactive Response received", logger.NewIntField("StatusCode", int64(statusCode)), logger.NewStringField("Response", respBody)) - authStatusToggleStats.statName = GetOAuthActionStatName("request") - authStatusToggleStats.errorMessage = "" - authStatusToggleStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": "", + "isCallToCpApi": "true", + }) return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error() } @@ -342,14 +348,15 @@ func (h *OAuthHandler) GetRefreshTokenErrResp(response string, accountSecret *Ac // This method hits the Control Plane to get the account information // As well update the account information into the destAuthInfoMap(which acts as an in-memory cache) func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams, refTokenBody RefreshTokenBodyParams, - authStats *OAuthStats, logTypeName string, + statsHandler OAuthStatsHandler, logTypeName string, ) (int, *AuthResponse, error) { + actionType := strings.Join(strings.Fields(strings.ToLower(logTypeName)), "_") refreshUrl := fmt.Sprintf("%s/destination/workspaces/%s/accounts/%s/token", h.ConfigBEURL, refTokenParams.WorkspaceID, refTokenParams.AccountID) res, err := json.Marshal(refTokenBody) if err != nil { - authStats.statName = GetOAuthActionStatName("request") - authStats.errorMessage = "error in marshalling refresh token body" - authStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": "error in marshalling refresh token body", + }) return http.StatusInternalServerError, nil, err } refreshCpReq := &controlplane.Request{ @@ -358,20 +365,21 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams ContentType: "application/json; charset=utf-8", Body: string(res), DestName: refTokenParams.DestDefName, - RequestType: authStats.action, + RequestType: actionType, BasicAuthUser: h.TokenProvider.Identity(), } var accountSecret AccountSecret // Stat for counting number of Refresh Token endpoint calls - authStats.statName = GetOAuthActionStatName("request_sent") - authStats.isCallToCpApi = true - authStats.errorMessage = "" - authStats.SendCountStat() + statsHandler.Increment("request_sent", stats.Tags{ + "isCallToCpApi": "true", + "errorMessage": "", + }) cpiCallStartTime := time.Now() statusCode, response := h.CpConn.CpApiCall(refreshCpReq) - authStats.statName = GetOAuthActionStatName("request_latency") - authStats.SendTimerStats(cpiCallStartTime) + statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{ + "isCallToCpApi": "true", + }) log := h.Logger.Withn(logger.NewIntField("StatusCode", int64(statusCode)), logger.NewIntField("WorkerId", int64(refTokenParams.WorkerID)), @@ -380,9 +388,10 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams // Empty Refresh token response if !routerutils.IsNotEmptyString(response) { - authStats.statName = GetOAuthActionStatName("request") - authStats.errorMessage = "Empty secret" - authStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": "Empty secret", + "isCallToCpApi": "true", + }) // Setting empty accessToken value into in-memory auth info map(cache) h.Logger.Debugn("Empty response from Control-Plane", logger.NewStringField("Response", response), @@ -398,9 +407,10 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams Err: errType, ErrorMessage: refErrMsg, } - authStats.statName = GetOAuthActionStatName("request") - authStats.errorMessage = errType - authStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": errType, + "isCallToCpApi": "true", + }) if authResponse.Err == common.RefTokenInvalidGrant { // Should abort the event as refresh is not going to work // until we have new refresh token for the account @@ -408,9 +418,10 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams } return http.StatusInternalServerError, authResponse, fmt.Errorf("error occurred while fetching/refreshing account info from CP: %s", refErrMsg) } - authStats.statName = GetOAuthActionStatName("request") - authStats.errorMessage = "" - authStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": "", + "isCallToCpApi": "true", + }) log.Debugn("[request] :: (Write) Account Secret received") // Store expirationDate information accountSecret.ExpirationDate = gjson.Get(response, "secret.expirationDate").String() diff --git a/services/oauth/v2/stats.go b/services/oauth/v2/stats.go index b6db502237..75fe46d8cf 100644 --- a/services/oauth/v2/stats.go +++ b/services/oauth/v2/stats.go @@ -2,12 +2,17 @@ package v2 import ( "strconv" + "strings" "time" + "github.com/samber/lo" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/services/oauth/v2/common" ) +const OAUTH_V2_STAT_PREFIX = "oauth_action" + type OAuthStats struct { stats stats.Stats id string // destinationId -> for action == auth_status_inactive, accountId -> for action == refresh_token/fetch_token @@ -18,40 +23,45 @@ type OAuthStats struct { isCallToCpApi bool // is a call being made to control-plane APIs authErrCategory string // for action=refresh_token -> REFRESH_TOKEN, for action=fetch_token -> "", for action=auth_status_inactive -> auth_status_inactive destDefName string - isTokenFetch bool // This stats field is used to identify if a request to get token is arising from processor flowType common.RudderFlow // delivery, delete action string // refresh_token, fetch_token, auth_status_inactive } -func (s *OAuthStats) SendTimerStats(startTime time.Time) { - statsTags := stats.Tags{ - "id": s.id, - "workspaceId": s.workspaceID, - "rudderCategory": s.rudderCategory, - "isCallToCpApi": strconv.FormatBool(s.isCallToCpApi), - "authErrCategory": s.authErrCategory, - "destType": s.destDefName, - "flowType": string(s.flowType), - "action": s.action, +type OAuthStatsHandler struct { + stats stats.Stats + defaultTags stats.Tags +} + +func GetDefaultTagsFromOAuthStats(oauthStats *OAuthStats) stats.Tags { + return stats.Tags{ + "id": oauthStats.id, + "workspaceId": oauthStats.workspaceID, + "rudderCategory": "destination", + "isCallToCpApi": strconv.FormatBool(oauthStats.isCallToCpApi), + "authErrCategory": oauthStats.authErrCategory, + "destType": oauthStats.destDefName, + "flowType": string(oauthStats.flowType), + "action": oauthStats.action, "oauthVersion": "v2", } - s.stats.NewTaggedStat(s.statName, stats.TimerType, statsTags).SendTiming(time.Since(startTime)) } -// SendCountStat Send count type stats related to OAuth(Destination) -func (s *OAuthStats) SendCountStat() { - statsTags := stats.Tags{ - "oauthVersion": "v2", - "id": s.id, - "workspaceId": s.workspaceID, - "rudderCategory": s.rudderCategory, - "errorMessage": s.errorMessage, - "isCallToCpApi": strconv.FormatBool(s.isCallToCpApi), - "authErrCategory": s.authErrCategory, - "destType": s.destDefName, - "isTokenFetch": strconv.FormatBool(s.isTokenFetch), - "flowType": string(s.flowType), - "action": s.action, +func NewStatsHandlerFromOAuthStats(oauthStats *OAuthStats) OAuthStatsHandler { + defaultTags := GetDefaultTagsFromOAuthStats(oauthStats) + return OAuthStatsHandler{ + stats: oauthStats.stats, + defaultTags: defaultTags, } - s.stats.NewTaggedStat(s.statName, stats.CountType, statsTags).Increment() +} + +func (m *OAuthStatsHandler) Increment(statSuffix string, tags stats.Tags) { + statName := strings.Join([]string{OAUTH_V2_STAT_PREFIX, statSuffix}, "_") + allTags := lo.Assign(m.defaultTags, tags) + m.stats.NewTaggedStat(statName, stats.CountType, allTags).Increment() +} + +func (m *OAuthStatsHandler) SendTiming(startTime time.Time, statSuffix string, tags stats.Tags) { + statName := strings.Join([]string{OAUTH_V2_STAT_PREFIX, statSuffix}, "_") + allTags := lo.Assign(m.defaultTags, tags) + m.stats.NewTaggedStat(statName, stats.TimerType, allTags).SendTiming(time.Since(startTime)) } diff --git a/services/oauth/v2/utils.go b/services/oauth/v2/utils.go index a8754673d1..67cab847fb 100644 --- a/services/oauth/v2/utils.go +++ b/services/oauth/v2/utils.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + "github.com/rudderlabs/rudder-go-kit/stats" routerutils "github.com/rudderlabs/rudder-server/router/utils" "github.com/rudderlabs/rudder-server/services/oauth/v2/common" "github.com/rudderlabs/rudder-server/utils/misc" @@ -21,8 +22,8 @@ func GetOAuthActionStatName(stat string) string { return fmt.Sprintf("oauth_action_%v", stat) } -func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, stats *OAuthStats) bool { - if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, stats) { +func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, statsHandler OAuthStatsHandler) bool { + if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, &statsHandler) { return true } if !routerutils.IsNotEmptyString(string(oldSecret)) { @@ -31,12 +32,12 @@ func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiry return bytes.Equal(secret.Secret, oldSecret) } -func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, stats *OAuthStats) bool { +func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, statsHandler *OAuthStatsHandler) bool { date, err := time.Parse(misc.RFC3339Milli, expirationDate) if err != nil { - stats.errorMessage = "parsing failed" - stats.statName = GetOAuthActionStatName("proactive_token_refresh") - stats.SendCountStat() + statsHandler.Increment("proactive_token_refresh", stats.Tags{ + "errorMessage": "parsing failed", + }) return false } return date.Before(time.Now().Add(expirationTimeDiff))