Skip to content

Commit

Permalink
Merge branch 'master' into feat.isolateServerUTCommunication
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jan 14, 2025
2 parents 8bf15a2 + e1b756b commit f16e30e
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 19 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ replace (
require (
cloud.google.com/go/bigquery v1.65.0
cloud.google.com/go/pubsub v1.45.3
cloud.google.com/go/storage v1.49.0
cloud.google.com/go/storage v1.50.0
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/DATA-DOG/go-sqlmock v1.5.2
Expand Down Expand Up @@ -94,7 +94,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
github.com/trinodb/trino-go-client v0.320.0
github.com/trinodb/trino-go-client v0.321.0
github.com/urfave/cli/v2 v2.27.5
github.com/viney-shih/go-lock v1.1.2
github.com/xitongsys/parquet-go v1.6.2
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.12.0/go.mod h1:fFLk2dp2oAhDz8QFKwqrjdJvxSp/W2g7nillojlL5Ho=
cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA=
cloud.google.com/go/storage v1.49.0 h1:zenOPBOWHCnojRd9aJZAyQXBYqkJkdQS42dxL55CIMw=
cloud.google.com/go/storage v1.49.0/go.mod h1:k1eHhhpLvrPjVGfo0mOUPEJ4Y2+a/Hv5PiwehZI9qGU=
cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6QJs=
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A=
cloud.google.com/go/trace v1.2.0/go.mod h1:Wc8y/uYyOhPy12KEnXG9XGrvfMz5F5SrYecQlbW1rwM=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
Expand Down Expand Up @@ -1308,8 +1308,8 @@ github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea h1:SXhTLE6pb6eld/
github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea/go.mod h1:WPnis/6cRcDZSUvVmezrxJPkiO87ThFYsoUiMwWNDJk=
github.com/tonistiigi/vt100 v0.0.0-20240514184818-90bafcd6abab h1:H6aJ0yKQ0gF49Qb2z5hI1UHxSQt4JMyxebFR15KnApw=
github.com/tonistiigi/vt100 v0.0.0-20240514184818-90bafcd6abab/go.mod h1:ulncasL3N9uLrVann0m+CDlJKWsIAP34MPcOJF6VRvc=
github.com/trinodb/trino-go-client v0.320.0 h1:z0LJU21PN68YGZzqFczroKv0mARRpdRpvHqu34+Pdh4=
github.com/trinodb/trino-go-client v0.320.0/go.mod h1:F+7TZRD0+0M8XqYsgXT8+EJT1pSlbxTECVD1BDzCc70=
github.com/trinodb/trino-go-client v0.321.0 h1:ViwiBxLNlJARWLCH4Q6MOjWFu/WrsznOM7QzRG/kRlY=
github.com/trinodb/trino-go-client v0.321.0/go.mod h1:F+7TZRD0+0M8XqYsgXT8+EJT1pSlbxTECVD1BDzCc70=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,57 @@
package destination_transformer

import "fmt"
import (
"fmt"
"strings"

type DestTransformer struct{}
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

type DestTransformer struct {
config struct {
destTransformationURL string
}
conf *config.Config
log logger.Logger
stat stats.Stats
}

func (d *DestTransformer) SendRequest(data interface{}) (interface{}, error) {
fmt.Println("Sending request to Service A")
// Add service-specific logic
return "Response from Service A", nil
}

func NewDestTransformer(conf *config.Config, log logger.Logger, stat stats.Stats) *DestTransformer {
handle := &DestTransformer{}
handle.conf = conf
handle.log = log
handle.stat = stat

handle.config.destTransformationURL = handle.conf.GetString("Warehouse.destTransformationURL", "http://localhost:9090")
return handle
}

func (d *DestTransformer) destTransformURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", d.config.destTransformationURL, strings.ToLower(destType))

if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", d.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
switch destType {
case warehouseutils.RS:
return destinationEndPoint + "?" + whSchemaVersionQueryParam
case warehouseutils.CLICKHOUSE:
enableArraySupport := fmt.Sprintf("chEnableArraySupport=%s", fmt.Sprintf("%v", d.conf.GetBool("Warehouse.clickhouse.enableArraySupport", false)))
return destinationEndPoint + "?" + whSchemaVersionQueryParam + "&" + enableArraySupport
default:
return destinationEndPoint + "?" + whSchemaVersionQueryParam
}
}
if destType == warehouseutils.SnowpipeStreaming {
return fmt.Sprintf("%s?whSchemaVersion=%s&whIDResolve=%t", destinationEndPoint, d.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
}
return destinationEndPoint
}
126 changes: 126 additions & 0 deletions processor/internal/http_client/http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package http_client

import (
"fmt"
"github.com/bufbuild/httplb"
"github.com/bufbuild/httplb/resolver"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
"net"
"net/http"
"time"
)

type HTTPDoer interface {
Do(req *http.Request) (*http.Response, error)
}

type HTTPHandle struct {
sentStat stats.Measurement
receivedStat stats.Measurement
cpDownGauge stats.Measurement

conf *config.Config
logger logger.Logger
stat stats.Stats

httpClient HTTPDoer

guardConcurrency chan struct{}

config struct {
maxConcurrency int
maxHTTPConnections int
maxHTTPIdleConnections int
maxIdleConnDuration time.Duration
disableKeepAlives bool

timeoutDuration time.Duration

maxRetry config.ValueLoader[int]
failOnUserTransformTimeout config.ValueLoader[bool]
failOnError config.ValueLoader[bool]
maxRetryBackoffInterval config.ValueLoader[time.Duration]

destTransformationURL string
userTransformationURL string
}
}

type HTTPLBTransport struct {
*http.Transport
}

func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}
}

func NewHTTPClient(conf *config.Config, log logger.Logger, stat stats.Stats) (HTTPHandle, error) {
trans := HTTPHandle{}

trans.conf = conf
trans.logger = log.Child("transformer")
trans.stat = stat

trans.sentStat = stat.NewStat("processor.transformer_sent", stats.CountType)
trans.receivedStat = stat.NewStat("processor.transformer_received", stats.CountType)
trans.cpDownGauge = stat.NewStat("processor.control_plane_down", stats.GaugeType)

trans.config.maxConcurrency = conf.GetInt("Processor.maxConcurrency", 200)
trans.config.maxHTTPConnections = conf.GetInt("Transformer.Client.maxHTTPConnections", 100)
trans.config.maxHTTPIdleConnections = conf.GetInt("Transformer.Client.maxHTTPIdleConnections", 10)
trans.config.maxIdleConnDuration = conf.GetDuration("Transformer.Client.maxIdleConnDuration", 30, time.Second)
trans.config.disableKeepAlives = conf.GetBool("Transformer.Client.disableKeepAlives", true)
trans.config.timeoutDuration = conf.GetDuration("HttpClient.procTransformer.timeout", 600, time.Second)
trans.config.destTransformationURL = conf.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
trans.config.userTransformationURL = conf.GetString("USER_TRANSFORM_URL", trans.config.destTransformationURL)

trans.config.maxRetry = conf.GetReloadableIntVar(30, 1, "Processor.maxRetry")
trans.config.failOnUserTransformTimeout = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnUserTransformTimeout")
trans.config.failOnError = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnError")

trans.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(30, time.Second, "Processor.Transformer.maxRetryBackoffInterval")

trans.guardConcurrency = make(chan struct{}, trans.config.maxConcurrency)

clientType := conf.GetString("Transformer.Client.type", "stdlib")

transport := &http.Transport{
DisableKeepAlives: trans.config.disableKeepAlives,
MaxConnsPerHost: trans.config.maxHTTPConnections,
MaxIdleConnsPerHost: trans.config.maxHTTPIdleConnections,
IdleConnTimeout: trans.config.maxIdleConnDuration,
}
client := &http.Client{
Transport: transport,
Timeout: trans.config.timeoutDuration,
}

switch clientType {
case "stdlib":
trans.httpClient = client
case "recycled":
trans.httpClient = sysUtils.NewRecycledHTTPClient(func() *http.Client {
return client
}, config.GetDuration("Transformer.Client.ttl", 120, time.Second))
case "httplb":
trans.httpClient = httplb.NewClient(
httplb.WithTransport("http", &HTTPLBTransport{
Transport: transport,
}),
httplb.WithResolver(
resolver.NewDNSResolver(
net.DefaultResolver,
resolver.PreferIPv6,
config.GetDuration("Transformer.Client.ttl", 120, time.Second), // TTL value
),
),
)
default:
return HTTPHandle{}, fmt.Errorf("unknown transformer client type: %s", clientType)
}

return trans, nil
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,36 @@
package trackingplan_validation

import "fmt"
import (
"fmt"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type TPValidator struct{}
type TPValidator struct {
config struct {
destTransformationURL string
}
conf *config.Config
log logger.Logger
stat stats.Stats
}

func (t *TPValidator) SendRequest(data interface{}) (interface{}, error) {
fmt.Println("Sending request to Service A")
// Add service-specific logic
return "Response from Service A", nil
}

func NewTPValidator(conf *config.Config, log logger.Logger, stat stats.Stats) *TPValidator {
handle := &TPValidator{}
handle.conf = conf
handle.log = log
handle.stat = stat
handle.config.destTransformationURL = handle.conf.GetString("Warehouse.destTransformationURL", "http://localhost:9090")
return handle
}

func (t *TPValidator) trackingPlanValidationURL() string {
return t.config.destTransformationURL + "/v0/validate"
}
29 changes: 27 additions & 2 deletions processor/internal/user_transformer/user_transformer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,36 @@
package user_transformer

import "fmt"
import (
"fmt"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type UserTransformer struct{}
type UserTransformer struct {
config struct {
userTransformationURL string
}
conf *config.Config
log logger.Logger
stat stats.Stats
}

func (u *UserTransformer) SendRequest(data interface{}) (interface{}, error) {
fmt.Println("Sending request to Service A")
// Add service-specific logic
return "Response from Service A", nil
}

func NewUserTransformer(conf *config.Config, log logger.Logger, stat stats.Stats) *UserTransformer {
handle := &UserTransformer{}
handle.conf = conf
handle.log = log
handle.stat = stat
handle.config.userTransformationURL = handle.conf.GetString("Warehouse.userTransformationURL", "http://localhost:9090")
return handle
}

func (u *UserTransformer) userTransformURL() string {
return u.config.userTransformationURL + "/customTransform"
}
16 changes: 9 additions & 7 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,19 @@ func (proc *Handle) Setup(
if proc.conf == nil {
proc.conf = config.Default
}

// Stats
if proc.statsFactory == nil {
proc.statsFactory = stats.Default
}

proc.setupReloadableVars()
proc.logger = logger.NewLogger().Child("processor")
proc.backendConfig = backendConfig
proc.transformerManager = transformer.NewCommunicationManager()
proc.transformerManager.RegisterService("user_transformer", &user_transformer.UserTransformer{})
proc.transformerManager.RegisterService("destination_transformer", &destination_transformer.DestTransformer{})
proc.transformerManager.RegisterService("trackingplan_validation", &trackingplan_validation.TPValidator{})
proc.transformerManager.RegisterService("user_transformer", user_transformer.NewUserTransformer(proc.conf, proc.logger, proc.statsFactory))
proc.transformerManager.RegisterService("destination_transformer", destination_transformer.NewDestTransformer(proc.conf, proc.logger, proc.statsFactory))
proc.transformerManager.RegisterService("trackingplan_validation", trackingplan_validation.NewTPValidator(proc.conf, proc.logger, proc.statsFactory))

proc.gatewayDB = gatewayDB
proc.routerDB = routerDB
Expand All @@ -426,10 +432,6 @@ func (proc *Handle) Setup(

proc.trackedUsersReporter = trackedUsersReporter

// Stats
if proc.statsFactory == nil {
proc.statsFactory = stats.Default
}
proc.tracer = proc.statsFactory.NewTracer("processor")
proc.stats.statGatewayDBR = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_gateway_db_read", stats.CountType, stats.Tags{
Expand Down

0 comments on commit f16e30e

Please sign in to comment.