Skip to content

Commit

Permalink
Merge branch '8.x' into mergify/bp/8.x/pr-6593
Browse files Browse the repository at this point in the history
  • Loading branch information
jlind23 authored Jan 27, 2025
2 parents 55adbeb + bfdeedd commit 3593c50
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Use a random free port for sub-process communication in containers by default. Avoids port collisions when using host networking in Kubernetes.

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: "elastic-agent"

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6585

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
9 changes: 9 additions & 0 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import (
"github.com/elastic/elastic-agent/version"
)

// CfgOverrider allows for application driven overrides of configuration read from disk.
type CfgOverrider func(cfg *configuration.Configuration)

// New creates a new Agent and bootstrap the required subsystem.
func New(
ctx context.Context,
Expand All @@ -47,6 +50,7 @@ func New(
testingMode bool,
fleetInitTimeout time.Duration,
disableMonitoring bool,
override CfgOverrider,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {

Expand Down Expand Up @@ -96,11 +100,16 @@ func New(
if err := info.InjectAgentConfig(rawConfig); err != nil {
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}

cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}

if override != nil {
override(cfg)
}

// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestLimitsLog(t *testing.T) {
true, // testingMode
time.Millisecond, // fleetInitTimeout
true, // disable monitoring
nil, // no configuration overrides
)
require.NoError(t, err)

Expand Down
9 changes: 5 additions & 4 deletions internal/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ The following actions are possible and grouped based on the actions.
ELASTIC_AGENT_CERT - path to certificate to use for connecting to fleet-server.
ELASTIC_AGENT_CERT_KEY - path to private key use for connecting to fleet-server.
The following vars are need in the scenario that Elastic Agent should automatically fetch its own token.
KIBANA_FLEET_HOST - Kibana host to enable create enrollment token on [$KIBANA_HOST]
Expand Down Expand Up @@ -282,7 +281,7 @@ func runContainerCmd(streams *cli.IOStreams, cfg setupConfig) error {
_, err = os.Stat(paths.AgentConfigFile())
if !os.IsNotExist(err) && !cfg.Fleet.Force {
// already enrolled, just run the standard run
return run(logToStderr, false, initTimeout, isContainer)
return run(containerCfgOverrides, false, initTimeout, isContainer)
}

if cfg.FleetServer.Enable {
Expand Down Expand Up @@ -336,7 +335,7 @@ func runContainerCmd(streams *cli.IOStreams, cfg setupConfig) error {
}
}

return run(logToStderr, false, initTimeout, isContainer)
return run(containerCfgOverrides, false, initTimeout, isContainer)
}

// TokenResp is used to decode a response for generating a service token
Expand Down Expand Up @@ -769,7 +768,7 @@ func runLegacyAPMServer(streams *cli.IOStreams) (*process.Info, error) {
return process.Start(spec.BinaryPath, options...)
}

func logToStderr(cfg *configuration.Configuration) {
func containerCfgOverrides(cfg *configuration.Configuration) {
logsPath := envWithDefault("", "LOGS_PATH")
if logsPath == "" {
// when no LOGS_PATH defined the container should log to stderr
Expand All @@ -786,6 +785,8 @@ func logToStderr(cfg *configuration.Configuration) {
cfg.Settings.EventLoggingConfig.ToFiles = false
cfg.Settings.EventLoggingConfig.ToStderr = true
}

configuration.OverrideDefaultContainerGRPCPort(cfg.Settings.GRPC)
}

func setPaths(statePath, configPath, logsPath, socketPath string, writePaths bool) error {
Expand Down
15 changes: 6 additions & 9 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ const (
flagRunDevelopment = "develop"
)

type (
cfgOverrider func(cfg *configuration.Configuration)
)

func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
cmd := &cobra.Command{
Use: "run",
Expand Down Expand Up @@ -120,7 +116,7 @@ func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
return cmd
}

func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
func run(override application.CfgOverrider, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
// Windows: Mark service as stopped.
// After this is run, the service is considered by the OS to be stopped.
// This must be the first deferred cleanup task (last to execute).
Expand Down Expand Up @@ -164,7 +160,7 @@ func logReturn(l *logger.Logger, err error) error {
return err
}

func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override application.CfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override)
if err != nil {
return err
Expand Down Expand Up @@ -284,7 +280,8 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
l.Info("APM instrumentation disabled")
}

coord, configMgr, _, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
isBootstrap := configuration.IsFleetServerBootstrap(cfg.Fleet)
coord, configMgr, _, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, isBootstrap, override, modifiers...)
if err != nil {
return logReturn(l, err)
}
Expand Down Expand Up @@ -401,7 +398,7 @@ LOOP:
return logReturn(l, err)
}

func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) {
func loadConfig(ctx context.Context, override application.CfgOverrider) (*configuration.Configuration, error) {
pathConfigFile := paths.ConfigFile()
rawConfig, err := config.LoadFile(pathConfigFile)
if err != nil {
Expand Down Expand Up @@ -503,7 +500,7 @@ func defaultLogLevel(cfg *configuration.Configuration, currentLevel string) stri
return ""
}

func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configuration.Configuration, override cfgOverrider) (*configuration.Configuration, error) {
func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configuration.Configuration, override application.CfgOverrider) (*configuration.Configuration, error) {
enrollPath := paths.AgentEnrollFile()
if _, err := os.Stat(enrollPath); err != nil {
//nolint:nilerr // ignore the error, this is expected
Expand Down
48 changes: 38 additions & 10 deletions internal/pkg/agent/configuration/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,30 @@ package configuration

import (
"fmt"
"os"
"strconv"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
)

const (
// DefaultGRPCPort is the default non-zero port in most situations. Ideally we'd always bind to
// port 0 to avoid collisions with other Elastic Agents or applications, but this would be a
// breaking change for users that have to manually whitelist the gRPC port in local firewall rules.
DefaultGRPCPort = uint16(6789)

// DefaultGRPCPortInInstallNamespace is the port used when explicitly installed in an installation
// namespace to allow multiple Elastic Agents on the same machine. Must be zero to avoid collisions.
DefaultGRPCPortInInstallNamespace = uint16(0)

// DefaultGPRCPortInContainer is the port used in a container. Defaults to zero to allow use of
// host networking (hostNetwork: true) in Kubernetes without port collisions between pods.
DefaultGPRCPortInContainer = uint16(0)

// grpcPortContainerEnvVar is the environment variable allowing containers to specify a fixed port.
grpcPortContainerEnvVar = "ELASTIC_AGENT_GRPC_PORT"
)

// GRPCConfig is a configuration of GRPC server.
type GRPCConfig struct {
Address string `config:"address"`
Expand All @@ -20,17 +40,9 @@ type GRPCConfig struct {

// DefaultGRPCConfig creates a default server configuration.
func DefaultGRPCConfig() *GRPCConfig {
// When in an installation namespace, bind to port zero to select a random free port to avoid
// collisions with any already installed Elastic Agent. Ideally we'd always bind to port zero,
// but this would be breaking for users that had to manually whitelist the gRPC port in local
// firewall rules.
//
// Note: this uses local TCP by default. A port of -1 switches to unix domain sockets / named
// pipes. Using domain sockets by default is preferable but is currently blocked because the
// gRPC library endpoint security uses does not support Windows named pipes.
defaultPort := uint16(6789)
defaultPort := DefaultGRPCPort
if paths.InInstallNamespace() {
defaultPort = 0
defaultPort = DefaultGRPCPortInInstallNamespace
}

return &GRPCConfig{
Expand All @@ -41,6 +53,22 @@ func DefaultGRPCConfig() *GRPCConfig {
}
}

// OverrideDefaultContainerGRPCPort is the configuration override used by the container command
// to switch to a more convenient default port.
func OverrideDefaultContainerGRPCPort(cfg *GRPCConfig) {
cfg.Port = DefaultGPRCPortInContainer

// Allow manually specifying the port via an undocumented environment variable in case
// the change from the original DefaultGRPCPort causes unexpected problems.
grpcPortEnv, ok := os.LookupEnv(grpcPortContainerEnvVar)
if ok {
port, err := strconv.Atoi(grpcPortEnv)
if err == nil {
cfg.Port = uint16(port) //nolint:gosec // integer size truncation is fine here.
}
}
}

// String returns the composed listen address for the GRPC.
func (cfg *GRPCConfig) String() string {
return fmt.Sprintf("%s:%d", cfg.Address, cfg.Port)
Expand Down
44 changes: 44 additions & 0 deletions internal/pkg/agent/configuration/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package configuration

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestOverrideDefaultGRPCPort(t *testing.T) {
testcases := []struct {
name string
env string
expected uint16
}{{
name: "no env var",
env: "",
expected: DefaultGPRCPortInContainer,
}, {
name: "valid env var",
env: "1234",
expected: 1234,
}, {
name: "invalid env var",
env: "not a number",
expected: DefaultGPRCPortInContainer,
}}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
cfg := GRPCConfig{}
if tc.env != "" {
os.Setenv(grpcPortContainerEnvVar, tc.env)
defer os.Unsetenv(grpcPortContainerEnvVar)
}
OverrideDefaultContainerGRPCPort(&cfg)
assert.Equal(t, tc.expected, cfg.Port)
})
}
}
13 changes: 12 additions & 1 deletion testing/integration/kubernetes_agent_standalone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,10 @@ func TestKubernetesAgentHelm(t *testing.T) {
steps []k8sTestStep
}{
{
name: "helm standalone agent default kubernetes privileged",
// Configure the perNode and clusterWide agents to both use host networking. On the node that
// runs the clusterWide agent, this tests that two agents do not try to bind to the same
// gRPC control protocol port by default preventing one from starting.
name: "helm standalone agent default kubernetes privileged without host network port collision",
steps: []k8sTestStep{
k8sStepCreateNamespace(),
k8sStepHelmDeploy(agentK8SHelm, "helm-agent", map[string]any{
Expand All @@ -281,6 +284,14 @@ func TestKubernetesAgentHelm(t *testing.T) {
"tag": kCtx.agentImageTag,
"pullPolicy": "Never",
},
"presets": map[string]any{
"clusterWide": map[string]any{
"hostNetwork": true,
},
"perNode": map[string]any{
"hostNetwork": true,
},
},
},
"outputs": map[string]any{
"default": map[string]any{
Expand Down

0 comments on commit 3593c50

Please sign in to comment.