Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OAuth to kafka plugins #629

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion plugin/input/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ If set, the plugin will use SASL authentications mechanism.

<br>

**`sasl_mechanism`** *`string`* *`default=SCRAM-SHA-512`* *`options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512`*
**`sasl_mechanism`** *`string`* *`default=SCRAM-SHA-512`* *`options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512|OAUTHBEARER`*

SASL mechanism to use.

Expand All @@ -99,6 +99,15 @@ SASL password.

<br>

**`sasl_oauth`** *`xoauth.Config`*

SASL OAuth config.
* `client_id` - client ID
* `client_secret` - client secret
* `token_url` - token url

<br>

**`is_ssl_enabled`** *`bool`* *`default=false`*

If set, the plugin will use SSL/TLS connections method.
Expand Down
17 changes: 16 additions & 1 deletion plugin/input/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/xoauth"
"github.com/ozontech/file.d/xscram"
"github.com/ozontech/file.d/xtls"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -120,7 +121,7 @@ type Config struct {
// > @3@4@5@6
// >
// > SASL mechanism to use.
SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512"` // *
SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512|OAUTHBEARER"` // *

// > @3@4@5@6
// >
Expand All @@ -132,6 +133,14 @@ type Config struct {
// > SASL password.
SaslPassword string `json:"sasl_password" default:"password"` // *

// > @3@4@5@6
// >
// > SASL OAuth config.
// > * `client_id` - client ID
// > * `client_secret` - client secret
// > * `token_url` - token url
SaslOauth xoauth.Config `json:"sasl_oauth" child:"true"` // *

// > @3@4@5@6
// >
// > If set, the plugin will use SSL/TLS connections method.
Expand Down Expand Up @@ -241,6 +250,12 @@ func NewConsumerGroup(c *Config, l *zap.SugaredLogger) sarama.ConsumerGroup {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return xscram.NewClient(xscram.SHA256) }
case sarama.SASLTypeSCRAMSHA512:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return xscram.NewClient(xscram.SHA512) }
case sarama.SASLTypeOAuth:
provider, err := xoauth.NewSaramaTokenProvider(c.SaslOauth)
if err != nil {
l.Fatalf("can't create OAuth token provider: %s", err.Error())
}
config.Net.SASL.TokenProvider = provider
}
}

Expand Down
9 changes: 9 additions & 0 deletions plugin/output/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ SASL password.

<br>

**`sasl_oauth`** *`xoauth.Config`*

SASL OAuth config.
* `client_id` - client ID
* `client_secret` - client secret
* `token_url` - token url

<br>

**`is_ssl_enabled`** *`bool`* *`default=false`*

If set, the plugin will use SSL/TLS connections method.
Expand Down
15 changes: 15 additions & 0 deletions plugin/output/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/xoauth"
"github.com/ozontech/file.d/xscram"
"github.com/ozontech/file.d/xtls"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -146,6 +147,14 @@ type Config struct {
// > SASL password.
SaslPassword string `json:"sasl_password" default:"password"` // *

// > @3@4@5@6
// >
// > SASL OAuth config.
// > * `client_id` - client ID
// > * `client_secret` - client secret
// > * `token_url` - token url
SaslOauth xoauth.Config `json:"sasl_oauth" child:"true"` // *

// > @3@4@5@6
// >
// > If set, the plugin will use SSL/TLS connections method.
Expand Down Expand Up @@ -322,6 +331,12 @@ func NewProducer(c *Config, l *zap.SugaredLogger) sarama.SyncProducer {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return xscram.NewClient(xscram.SHA256) }
case sarama.SASLTypeSCRAMSHA512:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return xscram.NewClient(xscram.SHA512) }
case sarama.SASLTypeOAuth:
provider, err := xoauth.NewSaramaTokenProvider(c.SaslOauth)
if err != nil {
l.Fatalf("can't create OAuth token provider: %s", err.Error())
}
config.Net.SASL.TokenProvider = provider
}
}

Expand Down
22 changes: 22 additions & 0 deletions xoauth/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package xoauth

import "errors"

type Config struct {
ClientID string `json:"client_id"`
ClientSecret string `json:"client_secret"`
TokenURL string `json:"token_url"`
}

func (c Config) validate() error {
if c.ClientID == "" {
return errors.New("'client_id' must be set")
}
if c.ClientSecret == "" {
return errors.New("'client_secret' must be set")
}
if c.TokenURL == "" {
return errors.New("'token_url' must be set")
}
return nil
}
42 changes: 42 additions & 0 deletions xoauth/sarama_token_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package xoauth

import (
"context"

"github.com/Shopify/sarama"
"golang.org/x/oauth2"
cred "golang.org/x/oauth2/clientcredentials"
)

// saramaTokenProvider implements sarama.AccessTokenProvider
type saramaTokenProvider struct {
tokenSource oauth2.TokenSource
}

// NewSaramaTokenProvider creates a new sarama.AccessTokenProvider with the provided clientID and clientSecret.
// The provided tokenURL is used to perform the 2 legged client credentials flow.
func NewSaramaTokenProvider(cfg Config) (sarama.AccessTokenProvider, error) {
if err := cfg.validate(); err != nil {
return nil, err
}

credCfg := cred.Config{
ClientID: cfg.ClientID,
ClientSecret: cfg.ClientSecret,
TokenURL: cfg.TokenURL,
}

return &saramaTokenProvider{
tokenSource: credCfg.TokenSource(context.Background()),
}, nil
}

// Token returns a new sarama.AccessToken
func (t *saramaTokenProvider) Token() (*sarama.AccessToken, error) {
token, err := t.tokenSource.Token()
if err != nil {
return nil, err
}

return &sarama.AccessToken{Token: token.AccessToken}, nil
}
Loading