Skip to content

Commit

Permalink
Refactor NewAerospikeBackend function (#164)
Browse files Browse the repository at this point in the history
* First commit

* unit test description

* Fix comments

* Mock aerospike client instantiation
  • Loading branch information
guscarreon authored Mar 26, 2024
1 parent 58cffa3 commit ab2b292
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 64 deletions.
80 changes: 54 additions & 26 deletions backends/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backends
import (
"context"
"errors"
"fmt"
"time"

as "github.com/aerospike/aerospike-client-go/v6"
Expand Down Expand Up @@ -52,37 +53,22 @@ type AerospikeBackend struct {
}

// NewAerospikeBackend validates config.Aerospike and returns an AerospikeBackend
func NewAerospikeBackend(cfg config.Aerospike, metrics *metrics.Metrics) *AerospikeBackend {
var hosts []*as.Host

clientPolicy := as.NewClientPolicy()
// cfg.User and cfg.Password are optional parameters
// if left blank in the config, they will default to the empty
// string and be ignored
clientPolicy.User = cfg.User
clientPolicy.Password = cfg.Password

// Aerospike's connection idle deadline default is 55 seconds. If greater than zero, this
// value will override
if cfg.ConnIdleTimeoutSecs > 0 {
clientPolicy.IdleTimeout = time.Duration(cfg.ConnIdleTimeoutSecs) * time.Second
}
type NewAerospikeClientFunc func(*as.ClientPolicy, ...*as.Host) (*as.Client, as.Error)

// Aerospike's default connection queue size per node is 256.
// If cfg.ConnQueueSize is greater than zero, it will override the default.
if cfg.ConnQueueSize > 0 {
clientPolicy.ConnectionQueueSize = cfg.ConnQueueSize
}
func NewAerospikeBackend(cfg config.Aerospike, metrics *metrics.Metrics) *AerospikeBackend {
return newAerospikeBackend(as.NewClientWithPolicyAndHost, cfg, metrics)
}

if len(cfg.Host) > 1 {
hosts = append(hosts, as.NewHost(cfg.Host, cfg.Port))
log.Info("config.backend.aerospike.host is being deprecated in favor of config.backend.aerospike.hosts")
}
for _, host := range cfg.Hosts {
hosts = append(hosts, as.NewHost(host, cfg.Port))
func newAerospikeBackend(newAerospikeClient NewAerospikeClientFunc, cfg config.Aerospike, metrics *metrics.Metrics) *AerospikeBackend {
clientPolicy := generateAerospikeClientPolicy(cfg)
hosts, err := generateHostsList(cfg)
if err != nil {
log.Fatalf("Error creating Aerospike backend: %s", err.Error())
return nil
}

client, err := as.NewClientWithPolicyAndHost(clientPolicy, hosts...)
client, err := newAerospikeClient(clientPolicy, hosts...)
if err != nil {
log.Fatalf("Error creating Aerospike backend: %s", classifyAerospikeError(err).Error())
panic("AerospikeBackend failure. This shouldn't happen.")
Expand Down Expand Up @@ -110,6 +96,48 @@ func NewAerospikeBackend(cfg config.Aerospike, metrics *metrics.Metrics) *Aerosp
}
}

// generateAerospikeClientPolicy returns an Aerospike ClientPolicy object configured according to values
// in config.Aerospike fields
func generateAerospikeClientPolicy(cfg config.Aerospike) *as.ClientPolicy {
clientPolicy := as.NewClientPolicy()
// cfg.User and cfg.Password are optional parameters
// if left blank in the config, they will default to the empty
// string and be ignored
clientPolicy.User = cfg.User
clientPolicy.Password = cfg.Password

// Connection idle timeout default is 55 seconds
if cfg.ConnIdleTimeoutSecs > 0 {
clientPolicy.IdleTimeout = time.Duration(cfg.ConnIdleTimeoutSecs) * time.Second
}

// Default connection queue size per node is 256
if cfg.ConnQueueSize > 0 {
clientPolicy.ConnectionQueueSize = cfg.ConnQueueSize
}

return clientPolicy
}

func generateHostsList(cfg config.Aerospike) ([]*as.Host, error) {
var hosts []*as.Host

if cfg.Port <= 0 {
return nil, fmt.Errorf("Cannot connect to Aerospike host at port %d", cfg.Port)
}
if len(cfg.Host) > 1 {
hosts = append(hosts, as.NewHost(cfg.Host, cfg.Port))
log.Info("config.backend.aerospike.host is being deprecated in favor of config.backend.aerospike.hosts")
}
for _, host := range cfg.Hosts {
hosts = append(hosts, as.NewHost(host, cfg.Port))
}
if len(hosts) == 0 {
return nil, errors.New("Cannot connect to empty Aerospike host(s)")
}
return hosts, nil
}

// Get creates an aerospike key based on the UUID key parameter, perfomrs the client's Get call
// and validates results. Can return a KEY_NOT_FOUND error or other Aerospike server errors
func (a *AerospikeBackend) Get(ctx context.Context, key string) (string, error) {
Expand Down
Loading

0 comments on commit ab2b292

Please sign in to comment.