Skip to content

Commit

Permalink
Merge branch 'master' into feat.fetchRemoteSchemaAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Jan 3, 2025
2 parents b013c87 + 18f4bdf commit 7e43ff9
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 138 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo/v2 v2.22.1
github.com/onsi/gomega v1.36.1
github.com/onsi/gomega v1.36.2
github.com/ory/dockertest/v3 v3.11.0
github.com/oschwald/maxminddb-golang v1.13.1
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
Expand Down Expand Up @@ -110,7 +110,7 @@ require (
google.golang.org/api v0.214.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576
google.golang.org/grpc v1.68.1
google.golang.org/protobuf v1.36.0
google.golang.org/protobuf v1.36.1
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,8 @@ github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPl
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw=
github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
Expand Down Expand Up @@ -1922,8 +1922,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{
URL: "http://localhost/upload1",
client: &http.Client{},
}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&mockbingads.GetBulkUploadUrlResponse{
UploadUrl: "http://localhost/upload1",
RequestId: misc.FastUUID().URN(),
Expand Down Expand Up @@ -138,8 +134,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)
errorMsg := "Error in getting bulk upload url"
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errorMsg))
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errorMsg))
Expand Down Expand Up @@ -181,8 +176,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
ClientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &ClientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)
errMsg := "unable to get bulk upload url, check your credentials"
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errMsg))

Expand Down Expand Up @@ -227,8 +221,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&mockbingads.GetBulkUploadUrlResponse{
UploadUrl: "http://localhost/upload1",
RequestId: misc.FastUUID().URN(),
Expand Down Expand Up @@ -285,8 +278,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand All @@ -308,8 +300,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(nil, fmt.Errorf("failed to get bulk upload status:"))
pollInput := common.AsyncPoll{
Expand All @@ -327,8 +318,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand Down Expand Up @@ -356,8 +346,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(0),
Expand All @@ -383,8 +372,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(0),
Expand All @@ -410,8 +398,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId456").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand Down Expand Up @@ -454,10 +441,8 @@ var _ = Describe("Bing ads Offline Conversions", func() {
http.ServeFile(w, r, errorsTemplateFilePath)
}))
defer ts.Close()
client := ts.Client()
modifiedURL := ts.URL // Use the test server URL
clientI := Client{client: client, URL: modifiedURL}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

UploadStatsInput := common.GetUploadStatsInput{
FailedJobParameters: modifiedURL,
Expand Down Expand Up @@ -531,11 +516,15 @@ var _ = Describe("Bing ads Offline Conversions", func() {
})
It("Transform() Test -> successful ", func() {
job := &jobsdb.JobT{
EventPayload: []byte("{\"type\": \"record\", \"action\": \"insert\", \"fields\": {\"conversionName\": \"Test-Integration\", \"conversionTime\": \"5/22/2023 6:27:54 AM\", \"conversionValue\": \"100\", \"microsoftClickId\": \"click_id\", \"conversionCurrencyCode\": \"USD\"}}"),
EventPayload: []byte("{\n \"type\": \"record\",\n \"action\": \"insert\",\n \"fields\": {\n \"conversionName\": \"Test-Integration\",\n \"conversionTime\": \"5/22/2023 6:27:54 AM\",\n \"conversionValue\": \"100\",\n \"microsoftClickId\": \"click_id\",\n \"conversionCurrencyCode\": \"USD\",\n \"email\":\"[email protected]\",\n \"phone\":\"+911234567890\"\n }\n}"),
}
uploader := &BingAdsBulkUploader{}
uploader := &BingAdsBulkUploader{
isHashRequired: true,
}
expectedResp := `{"message":{"fields":{"conversionCurrencyCode":"USD","conversionName":"Test-Integration","conversionTime":"5/22/2023 6:27:54 AM","conversionValue":"100","email":"28a4da98f8812110001ab8ffacde3b38b4725a9e3570c39299fbf2d12c5aa70e","microsoftClickId":"click_id","phone":"8c229df83de8ab269e90918846e326c4008c86481393223d17a30ff5a407b08e"},"action":"insert"},"metadata":{"jobId":0}}`
// Execute
_, err := uploader.Transform(job)
resp, err := uploader.Transform(job)
Expect(resp).To(Equal(expectedResp))
Expect(err).To(BeNil())
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
)

func NewBingAdsBulkUploader(logger logger.Logger, statsFactory stats.Stats, destName string, service bingads.BulkServiceI, client *Client) *BingAdsBulkUploader {
func NewBingAdsBulkUploader(logger logger.Logger, statsFactory stats.Stats, destName string, service bingads.BulkServiceI, isHashRequired bool) *BingAdsBulkUploader {
return &BingAdsBulkUploader{
destName: destName,
service: service,
logger: logger.Child("BingAds").Child("BingAdsBulkUploader"),
statsFactory: statsFactory,
client: *client,
fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destName, 100*bytesize.MB),
eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destName, 1000),
isHashRequired: isHashRequired,
destName: destName,
service: service,
logger: logger.Child("BingAds").Child("BingAdsBulkUploader"),
statsFactory: statsFactory,
fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destName, 100*bytesize.MB),
eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destName, 1000),
}
}

Expand Down Expand Up @@ -81,6 +81,12 @@ func (b *BingAdsBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
}
}
}
if b.isHashRequired {
event.Fields, err = hashFields(fields)
if err != nil {
return payload, fmt.Errorf("Unable to hash fields.%w", err)
}
}
data := Data{
Message: Message{
Fields: event.Fields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ func newManagerInternal(logger logger.Logger, statsFactory stats.Stats, destinat
TokenSource: &token,
}
session := bingads.NewSession(sessionConfig)

clientNew := Client{}
bingUploader := NewBingAdsBulkUploader(logger, statsFactory, destination.DestinationDefinition.Name, bingads.NewBulkService(session), &clientNew)
bingUploader := NewBingAdsBulkUploader(logger, statsFactory, destination.DestinationDefinition.Name, bingads.NewBulkService(session), destConfig.IsHashRequired)
return bingUploader, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,20 @@ package offline_conversions
import (
"encoding/csv"
"encoding/json"
"net/http"

"github.com/rudderlabs/bing-ads-go-sdk/bingads"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type Client struct {
URL string
client *http.Client
}
type BingAdsBulkUploader struct {
destName string
service bingads.BulkServiceI
logger logger.Logger
statsFactory stats.Stats
client Client
fileSizeLimit int64
eventsLimit int64
destName string
service bingads.BulkServiceI
logger logger.Logger
statsFactory stats.Stats
fileSizeLimit int64
eventsLimit int64
isHashRequired bool
}
type Message struct {
Fields json.RawMessage `json:"fields"`
Expand All @@ -41,6 +36,7 @@ type DestinationConfig struct {
CustomerAccountID string `json:"customerAccountId"`
CustomerID string `json:"customerId"`
RudderAccountID string `json:"rudderAccountId"`
IsHashRequired bool `json:"isHashRequired"`
}

type ActionFileInfo struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package offline_conversions
import (
"archive/zip"
"bufio"
"crypto/sha256"
"encoding/csv"
"encoding/json"
stdjson "encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -466,3 +468,43 @@ func validateField(fields map[string]interface{}, field string) error {
}
return nil
}

func calculateHashCode(data string) string {
// Join the strings into a single string with a separator
hash := sha256.New()
hash.Write([]byte(data))
hashBytes := hash.Sum(nil)
hashCode := fmt.Sprintf("%x", hashBytes)

return hashCode
}

func hashFields(input map[string]interface{}) (stdjson.RawMessage, error) {
// Create a new map to hold the hashed fields
hashedMap := make(map[string]interface{})

// Iterate over the input map
for key, value := range input {
// Check if the key is "email" or "phone"
if key == "email" || key == "phone" {
// Ensure the value is a string before hashing
if strVal, ok := value.(string); ok {
hashedMap[key] = calculateHashCode(strVal)
} else {
// If not a string, preserve the original value
hashedMap[key] = value
}
} else {
// Preserve other fields unchanged
hashedMap[key] = value
}
}

// Convert the resulting map to JSON RawMessage
result, err := json.Marshal(hashedMap)
if err != nil {
return nil, err
}

return json.RawMessage(result), nil
}
Loading

0 comments on commit 7e43ff9

Please sign in to comment.