Skip to content

Commit

Permalink
Undo various unrelated changes.
Browse files Browse the repository at this point in the history
To reduce the size of the kvstore PR this change removes various unrelated changes.
  • Loading branch information
deverton-godaddy committed Mar 6, 2024
1 parent 8989e3d commit 695941e
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 231 deletions.
90 changes: 51 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import (
"io"
"os"
"os/signal"
"syscall"

ciliumClient "github.com/cilium/cilium/pkg/client"
ciliumCommand "github.com/cilium/cilium/pkg/command"
ciliumKvStore "github.com/cilium/cilium/pkg/kvstore"
ciliumLogging "github.com/cilium/cilium/pkg/logging"
nomadApi "github.com/hashicorp/nomad/api"
cilium_client "github.com/cilium/cilium/pkg/client"
cilium_command "github.com/cilium/cilium/pkg/command"
cilium_kvstore "github.com/cilium/cilium/pkg/kvstore"
cilium_logging "github.com/cilium/cilium/pkg/logging"
nomad_api "github.com/hashicorp/nomad/api"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/cosmonic-labs/netreap/internal/zaplogrus"
"github.com/cosmonic-labs/netreap/reapers"
Expand Down Expand Up @@ -68,7 +66,7 @@ func main() {
Before: func(ctx *cli.Context) error {
// Borrow the parser from Cilium
kvStoreOpt := ctx.String("kvstore-opts")
if m, err := ciliumCommand.ToStringMapStringE(kvStoreOpt); err != nil {
if m, err := cilium_command.ToStringMapStringE(kvStoreOpt); err != nil {
return fmt.Errorf("unable to parse %s: %w", kvStoreOpt, err)
} else {
conf.kvStoreOpts = m
Expand Down Expand Up @@ -103,19 +101,15 @@ func configureLogging(debug bool) (logger *zap.Logger, err error) {
zap.ReplaceGlobals(logger)

// Bridge Cilium logrus to netreap zap
ciliumLogging.DefaultLogger.SetReportCaller(true)
ciliumLogging.DefaultLogger.SetOutput(io.Discard)
ciliumLogging.DefaultLogger.AddHook(zaplogrus.NewZapLogrusHook(logger))
cilium_logging.DefaultLogger.SetReportCaller(true)
cilium_logging.DefaultLogger.SetOutput(io.Discard)
cilium_logging.DefaultLogger.AddHook(zaplogrus.NewZapLogrusHook(logger))

return logger, nil
}

func run(ctx context.Context, conf config) error {

// Nomad with Docker defaults to SIGTERM for stopping containersq
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer cancel()

logger, err := configureLogging(conf.debug)
if err != nil {
return fmt.Errorf("can't initialize zap logger: %v", err)
Expand All @@ -125,14 +119,14 @@ func run(ctx context.Context, conf config) error {
// Step 0: Construct the clients

// Looks for the default Cilium socket path or uses the value from CILIUM_SOCK
ciliumClient, err := ciliumClient.NewDefaultClient()
cilium_client, err := cilium_client.NewDefaultClient()
if err != nil {
return fmt.Errorf("error when connecting to cilium agent: %s", err)
}

// Fetch kvstore config from Cilium if not set
if conf.kvStore == "" || len(conf.kvStoreOpts) == 0 {
resp, err := ciliumClient.ConfigGet()
resp, err := cilium_client.ConfigGet()
if err != nil {
return fmt.Errorf("unable to retrieve cilium configuration: %s", err)
}
Expand All @@ -153,20 +147,20 @@ func run(ctx context.Context, conf config) error {
}
}

err = ciliumKvStore.Setup(ctx, conf.kvStore, conf.kvStoreOpts, nil)
err = cilium_kvstore.Setup(ctx, conf.kvStore, conf.kvStoreOpts, nil)
if err != nil {
return fmt.Errorf("unable to connect to Cilium kvstore: %s", err)
}

// DefaultConfig fetches configuration data from well-known nomad variables (e.g. NOMAD_ADDR,
// NOMAD_CACERT), so we'll just leverage that for now.
nomadClient, err := nomadApi.NewClient(nomadApi.DefaultConfig())
nomad_client, err := nomad_api.NewClient(nomad_api.DefaultConfig())
if err != nil {
return fmt.Errorf("unable to connect to Nomad: %s", err)
}

// Get the node ID of the instance we're running on
self, err := nomadClient.Agent().Self()
self, err := nomad_client.Agent().Self()
if err != nil {
return fmt.Errorf("unable to query local agent info: %s", err)
}
Expand All @@ -181,36 +175,54 @@ func run(ctx context.Context, conf config) error {
return fmt.Errorf("unable to get local node ID")
}

// Step 2: Start the reapers
egroup, ctx := errgroup.WithContext(ctx)
// Step 1: Leader election
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

zap.S().Debug("Starting endpoint reaper")
endpointReaper, err := reapers.NewEndpointReaper(ciliumClient, nomadClient.Allocations(), nomadClient.EventStream(), nodeID)
zap.L().Debug("Starting node reaper")
node_reaper, err := reapers.NewNodeReaper(ctx, cilium_kvstore.Client(), nomad_client.Nodes(), nomad_client.EventStream(), os.Getenv("NOMAD_ALLOC_ID"))
if err != nil {
return err
}
egroup.Go(func() error {
return endpointReaper.Run(ctx)
})

zap.S().Debug("Starting policy reaper")
policiesReaper, err := reapers.NewPoliciesReaper(ciliumKvStore.Client(), conf.policiesPrefix, ciliumClient)
nodeFailChan, err := node_reaper.Run()
if err != nil {
return fmt.Errorf("unable to start node reaper: %s", err)
}

// Step 2: Start the reapers
zap.L().Debug("Starting endpoint reaper")
endpoint_reaper, err := reapers.NewEndpointReaper(cilium_client, nomad_client.Allocations(), nomad_client.EventStream(), nodeID)
if err != nil {
return err
}
egroup.Go(func() error {
return policiesReaper.Run(ctx)
})
endpointFailChan, err := endpoint_reaper.Run(ctx)

zap.S().Debug("Starting node reaper")
nodeReaper, err := reapers.NewNodeReaper(ciliumKvStore.Client(), nomadClient.Nodes(), nomadClient.EventStream(), os.Getenv("NOMAD_ALLOC_ID"))
zap.S().Debug("Starting policies reaper")
policies_reaper, err := reapers.NewPoliciesReaper(cilium_kvstore.Client(), conf.policiesPrefix, cilium_client)
if err != nil {
return err
}
egroup.Go(func() error {
return nodeReaper.Run(ctx)
})
policiesFailChan, err := policies_reaper.Run(ctx)

// Wait for interrupt or client failure
select {
case <-c:
zap.S().Info("Received interrupt, shutting down")
cancel()
case <-nodeFailChan:
zap.S().Error("nomad node reaper client failed, shutting down")
cancel()
case <-endpointFailChan:
zap.S().Error("endpoint reaper kvstore client failed, shutting down")
cancel()
case <-policiesFailChan:
zap.S().Error("policies reaper kvstore client failed, shutting down")
cancel()
}

// Step 4: Wait interrupt or go routine error
return egroup.Wait()
return nil
}
Loading

0 comments on commit 695941e

Please sign in to comment.