Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context propagation in facts gathering flow #360

Merged
merged 12 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ To create a new plugin (check the [example](plugin_examples/dummy.go) dummy plug

- Create a new Golang package. This is as simple as creating a new folder (it can be created anywhere, it doesn't need to be in the Agent code directory) with `.go` file inside. Name the Golang file with a meaningful name (even though, it is not relevant for the usage itself).
- The `.go` file implements the `main` package and imports the `go-plugin` package as seen in the example.
- Implement the gathering function with the `func (s exampleGatherer) Gather(factsRequests []gatherers.FactRequest) ([]gatherers.Fact, error)` signature. This function must gather the facts from the system where the Agent is running.
- Implement the gathering function with the `func (s exampleGatherer) Gather(ctx context.Context, factsRequests []gatherers.FactRequest) ([]gatherers.Fact, error)` signature. This function must gather the facts from the system where the Agent is running.
- This function receives a list of fact gathering requests to gather, which entirely depends on the gathering code nature.
- Copy the `main()` function from the [example](plugin_examples/dummy.go) file. Simply replace the gatherer struct name there.
- Once the plugin is implemented, it must be compiled. Use the next command for that: `go build -o /usr/etc/trento/example ./your_plugin_folder/example.go`. The `-o` flag specifies the destination of the created binary, which the Agent needs to load. This folder is the same specified in the `--plugins-folder` flag in the Agent execution. In this case, the used name for the output in the `-o` flag is relevant, as this name is the gatherer name that must be used in the server side checks declaration.
Expand Down
28 changes: 26 additions & 2 deletions cmd/facts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package cmd

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -71,7 +76,7 @@ func NewFactsListCmd() *cobra.Command {
return gatherCmd
}

func gather(*cobra.Command, []string) {
func gather(cmd *cobra.Command, _ []string) {
var gatherer = viper.GetString("gatherer")
var argument = viper.GetString("argument")
var pluginsFolder = viper.GetString("plugins-folder")
Expand Down Expand Up @@ -108,7 +113,26 @@ func gather(*cobra.Command, []string) {
},
}

value, err := g.Gather(factRequest)
ctx, cancel := context.WithCancel(cmd.Context())

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
cancelled := false
go func() {
<-signals
log.Info("Caught signal!")
cancelled = true
cancel()

}()

value, err := g.Gather(ctx, factRequest)

if cancelled {
balanza marked this conversation as resolved.
Show resolved Hide resolved
log.Info("Gathering cancelled")
return
}

if err != nil {
log.Errorf("Error gathering fact \"%s\" with argument \"%s\"", gatherer, argument)
cleanupAndFatal(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/factsengine/factsengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *FactsEngine) Listen(ctx context.Context) error {
}
}()
queue := fmt.Sprintf(agentsQueue, c.agentID)
if err := c.factsServiceAdapter.Listen(queue, exchange, agentsEventsRoutingKey, c.handleEvent); err != nil {
if err := c.factsServiceAdapter.Listen(queue, exchange, agentsEventsRoutingKey, c.makeEventHandler(ctx)); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/factsengine/factsengine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewFactsEngineIntegrationTestGatherer() *FactsEngineIntegrationTestGatherer
return &FactsEngineIntegrationTestGatherer{}
}

func (s *FactsEngineIntegrationTestGatherer) Gather(requests []entities.FactRequest) ([]entities.Fact, error) {
func (s *FactsEngineIntegrationTestGatherer) Gather(_ context.Context, requests []entities.FactRequest) ([]entities.Fact, error) {
facts := []entities.Fact{}
for i, req := range requests {
fact := entities.Fact{
Expand Down
4 changes: 3 additions & 1 deletion internal/factsengine/gatherers/_todo/crmmon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gatherers

import (
"context"

log "github.com/sirupsen/logrus"
"github.com/trento-project/agent/pkg/factsengine/entities"
)
Expand All @@ -23,7 +25,7 @@ func NewCrmMonGatherer(executor CommandExecutor) *CrmMonGatherer {
}
}

func (g *CrmMonGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.FactsGatheredItem, error) {
func (g *CrmMonGatherer) Gather(_ context.Context, factsRequests []entities.FactRequest) ([]entities.FactsGatheredItem, error) {
log.Infof("Starting crmmon facts gathering process")

crmmon, err := g.executor.Exec("crm_mon", "--output-as", "xml")
Expand Down
7 changes: 4 additions & 3 deletions internal/factsengine/gatherers/_todo/crmmon_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gatherers // nolint

import (
"context"
"errors"
"io"
"os"
Expand Down Expand Up @@ -51,7 +52,7 @@ func (suite *CrmMonTestSuite) TestCrmMonGather() {
},
}

factResults, err := p.Gather(factRequests)
factResults, err := p.Gather(context.Background(), factRequests)

expectedResults := []entities.FactsGatheredItem{
{
Expand Down Expand Up @@ -90,7 +91,7 @@ func (suite *CrmMonTestSuite) TestCrmMonGatherCmdNotFound() {
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "crm_mon not found")
}
Expand All @@ -117,7 +118,7 @@ func (suite *CrmMonTestSuite) TestCrmMonGatherError() {
},
}

factResults, err := p.Gather(factRequests)
factResults, err := p.Gather(context.Background(), factRequests)

expectedResults := []entities.FactsGatheredItem{
{
Expand Down
5 changes: 4 additions & 1 deletion internal/factsengine/gatherers/ascsers_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ func (g *AscsErsClusterGatherer) SetCache(cache *factscache.FactsCache) {
g.cache = cache
}

func (g *AscsErsClusterGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
func (g *AscsErsClusterGatherer) Gather(
_ context.Context,
factsRequests []entities.FactRequest,
) ([]entities.Fact, error) {
log.Infof("Starting %s facts gathering process", AscsErsClusterGathererName)
var cibdata cib.Root

Expand Down
10 changes: 5 additions & 5 deletions internal/factsengine/gatherers/ascsers_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGatherCmdNotFound() {
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: cibadmin-command-error - "+
"error running cibadmin command: cibadmin not found")
Expand All @@ -74,7 +74,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGatherCacheCastingError(
},
}

_, err = p.Gather(factRequests)
_, err = p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: ascsers-cluster-decoding-error - "+
"error decoding cibadmin output: error casting the command output")
Expand All @@ -98,7 +98,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGatherInvalidInstanceNam
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: ascsers-cluster-cib-error - "+
"error parsing cibadmin output: incorrect InstanceName property value: PRD_ASCS00")
Expand All @@ -123,7 +123,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGatherInvalidInstanceNum
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: ascsers-cluster-cib-error - "+
"error parsing cibadmin output: "+
Expand Down Expand Up @@ -195,7 +195,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGather() {
},
}

results, err := p.Gather(factRequests)
results, err := p.Gather(context.Background(), factRequests)

// nolint:dupl
expectedFacts := []entities.Fact{
Expand Down
4 changes: 3 additions & 1 deletion internal/factsengine/gatherers/cibadmin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gatherers

import (
"context"

log "github.com/sirupsen/logrus"
"github.com/trento-project/agent/internal/factsengine/factscache"
"github.com/trento-project/agent/pkg/factsengine/entities"
Expand Down Expand Up @@ -53,7 +55,7 @@ func memoizeCibAdmin(args ...interface{}) (interface{}, error) {
return executor.Exec("cibadmin", "--query", "--local")
}

func (g *CibAdminGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
func (g *CibAdminGatherer) Gather(_ context.Context, factsRequests []entities.FactRequest) ([]entities.Fact, error) {
log.Infof("Starting %s facts gathering process", CibAdminGathererName)

content, err := factscache.GetOrUpdate(g.cache, CibAdminGathererCache, memoizeCibAdmin, g.executor)
Expand Down
13 changes: 7 additions & 6 deletions internal/factsengine/gatherers/cibadmin_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gatherers_test

import (
"context"
"errors"
"io"
"os"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (suite *CibAdminTestSuite) TestCibAdminGatherCmdNotFound() {
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: cibadmin-command-error - "+
"error running cibadmin command: cibadmin not found")
Expand All @@ -71,7 +72,7 @@ func (suite *CibAdminTestSuite) TestCibAdminInvalidXML() {
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: cibadmin-decoding-error - "+
"error decoding cibadmin output: EOF")
Expand Down Expand Up @@ -116,7 +117,7 @@ func (suite *CibAdminTestSuite) TestCibAdminGather() {
},
}

factResults, err := p.Gather(factRequests)
factResults, err := p.Gather(context.Background(), factRequests)

expectedResults := []entities.Fact{
{
Expand Down Expand Up @@ -234,11 +235,11 @@ func (suite *CibAdminTestSuite) TestCibAdminGatherWithCache() {
},
}

factResults, err := p.Gather(factRequests)
factResults, err := p.Gather(context.Background(), factRequests)
suite.NoError(err)
suite.ElementsMatch(expectedResults, factResults)

_, err = p.Gather(factRequests)
_, err = p.Gather(context.Background(), factRequests)
suite.NoError(err)

entries := cache.Entries()
Expand All @@ -263,7 +264,7 @@ func (suite *CibAdminTestSuite) TestCibAdminGatherCacheCastingError() {
},
}

_, err = p.Gather(factRequests)
_, err = p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: cibadmin-decoding-error - "+
"error decoding cibadmin output: error casting the command output")
Expand Down
6 changes: 5 additions & 1 deletion internal/factsengine/gatherers/corosynccmapctl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gatherers

import (
"context"
"strings"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -80,7 +81,10 @@ func corosyncCmapctlOutputToMap(corosyncCmapctlOutput string) *entities.FactValu
return outputMap
}

func (s *CorosyncCmapctlGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
func (s *CorosyncCmapctlGatherer) Gather(
_ context.Context,
factsRequests []entities.FactRequest,
) ([]entities.Fact, error) {
facts := []entities.Fact{}
log.Infof("Starting %s facts gathering process", CorosyncCmapCtlGathererName)

Expand Down
10 changes: 6 additions & 4 deletions internal/factsengine/gatherers/corosynccmapctl_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gatherers_test

import (
"context"
"io"
"os"
"os/exec"
Expand All @@ -26,6 +27,7 @@ func (suite *CorosyncCmapctlTestSuite) SetupTest() {
suite.mockExecutor = new(utilsMocks.CommandExecutor)
}

// nolint:dupl
func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlGathererNoArgumentProvided() {
mockOutputFile, _ := os.Open(helpers.GetFixturePath("gatherers/corosynccmap-ctl.output"))
mockOutput, _ := io.ReadAll(mockOutputFile)
Expand All @@ -45,7 +47,7 @@ func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlGathererNoArgumentProv
},
}

factResults, err := c.Gather(factRequests)
factResults, err := c.Gather(context.Background(), factRequests)

expectedResults := []entities.Fact{
{
Expand Down Expand Up @@ -85,7 +87,7 @@ func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlGathererNonExistingKey
},
}

factResults, err := c.Gather(factRequests)
factResults, err := c.Gather(context.Background(), factRequests)

expectedResults := []entities.Fact{
{
Expand Down Expand Up @@ -120,7 +122,7 @@ func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlCommandNotFound() {
Type: "corosync-cmapctl-command-error",
}

factResults, err := c.Gather(factRequests)
factResults, err := c.Gather(context.Background(), factRequests)

suite.EqualError(err, expectedError.Error())

Expand Down Expand Up @@ -157,7 +159,7 @@ func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlGatherer() {
},
}

factResults, err := c.Gather(factRequests)
factResults, err := c.Gather(context.Background(), factRequests)

expectedResults := []entities.Fact{
{
Expand Down
6 changes: 5 additions & 1 deletion internal/factsengine/gatherers/corosyncconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gatherers

import (
"bufio"
"context"
"fmt"
"os"
"regexp"
Expand Down Expand Up @@ -49,7 +50,10 @@ func NewCorosyncConfGatherer(configFile string) *CorosyncConfGatherer {
}
}

func (s *CorosyncConfGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
func (s *CorosyncConfGatherer) Gather(
_ context.Context,
factsRequests []entities.FactRequest,
) ([]entities.Fact, error) {
facts := []entities.Fact{}
log.Infof("Starting corosync.conf file facts gathering process")

Expand Down
Loading
Loading