Skip to content

Commit

Permalink
Context propagation in facts gathering flow (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
balanza authored Jan 10, 2025
1 parent 162b64a commit abd0f06
Show file tree
Hide file tree
Showing 66 changed files with 422 additions and 185 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ To create a new plugin (check the [example](plugin_examples/dummy.go) dummy plug

- Create a new Golang package. This is as simple as creating a new folder (it can be created anywhere, it doesn't need to be in the Agent code directory) with `.go` file inside. Name the Golang file with a meaningful name (even though, it is not relevant for the usage itself).
- The `.go` file implements the `main` package and imports the `go-plugin` package as seen in the example.
- Implement the gathering function with the `func (s exampleGatherer) Gather(factsRequests []gatherers.FactRequest) ([]gatherers.Fact, error)` signature. This function must gather the facts from the system where the Agent is running.
- Implement the gathering function with the `func (s exampleGatherer) Gather(ctx context.Context, factsRequests []gatherers.FactRequest) ([]gatherers.Fact, error)` signature. This function must gather the facts from the system where the Agent is running.
- This function receives a list of fact gathering requests to gather, which entirely depends on the gathering code nature.
- Copy the `main()` function from the [example](plugin_examples/dummy.go) file. Simply replace the gatherer struct name there.
- Once the plugin is implemented, it must be compiled. Use the next command for that: `go build -o /usr/etc/trento/example ./your_plugin_folder/example.go`. The `-o` flag specifies the destination of the created binary, which the Agent needs to load. This folder is the same specified in the `--plugins-folder` flag in the Agent execution. In this case, the used name for the output in the `-o` flag is relevant, as this name is the gatherer name that must be used in the server side checks declaration.
Expand Down
28 changes: 26 additions & 2 deletions cmd/facts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package cmd

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -71,7 +76,7 @@ func NewFactsListCmd() *cobra.Command {
return gatherCmd
}

func gather(*cobra.Command, []string) {
func gather(cmd *cobra.Command, _ []string) {
var gatherer = viper.GetString("gatherer")
var argument = viper.GetString("argument")
var pluginsFolder = viper.GetString("plugins-folder")
Expand Down Expand Up @@ -108,7 +113,26 @@ func gather(*cobra.Command, []string) {
},
}

value, err := g.Gather(factRequest)
ctx, cancel := context.WithCancel(cmd.Context())

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
cancelled := false
go func() {
<-signals
log.Info("Caught signal!")
cancelled = true
cancel()

}()

value, err := g.Gather(ctx, factRequest)

if cancelled {
log.Info("Gathering cancelled")
return
}

if err != nil {
log.Errorf("Error gathering fact \"%s\" with argument \"%s\"", gatherer, argument)
cleanupAndFatal(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/factsengine/factsengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *FactsEngine) Listen(ctx context.Context) error {
}
}()
queue := fmt.Sprintf(agentsQueue, c.agentID)
if err := c.factsServiceAdapter.Listen(queue, exchange, agentsEventsRoutingKey, c.handleEvent); err != nil {
if err := c.factsServiceAdapter.Listen(queue, exchange, agentsEventsRoutingKey, c.makeEventHandler(ctx)); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/factsengine/factsengine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewFactsEngineIntegrationTestGatherer() *FactsEngineIntegrationTestGatherer
return &FactsEngineIntegrationTestGatherer{}
}

func (s *FactsEngineIntegrationTestGatherer) Gather(requests []entities.FactRequest) ([]entities.Fact, error) {
func (s *FactsEngineIntegrationTestGatherer) Gather(_ context.Context, requests []entities.FactRequest) ([]entities.Fact, error) {
facts := []entities.Fact{}
for i, req := range requests {
fact := entities.Fact{
Expand Down
4 changes: 3 additions & 1 deletion internal/factsengine/gatherers/_todo/crmmon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gatherers

import (
"context"

log "github.com/sirupsen/logrus"
"github.com/trento-project/agent/pkg/factsengine/entities"
)
Expand All @@ -23,7 +25,7 @@ func NewCrmMonGatherer(executor CommandExecutor) *CrmMonGatherer {
}
}

func (g *CrmMonGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.FactsGatheredItem, error) {
func (g *CrmMonGatherer) Gather(_ context.Context, factsRequests []entities.FactRequest) ([]entities.FactsGatheredItem, error) {
log.Infof("Starting crmmon facts gathering process")

crmmon, err := g.executor.Exec("crm_mon", "--output-as", "xml")
Expand Down
7 changes: 4 additions & 3 deletions internal/factsengine/gatherers/_todo/crmmon_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gatherers // nolint

import (
"context"
"errors"
"io"
"os"
Expand Down Expand Up @@ -51,7 +52,7 @@ func (suite *CrmMonTestSuite) TestCrmMonGather() {
},
}

factResults, err := p.Gather(factRequests)
factResults, err := p.Gather(context.Background(), factRequests)

expectedResults := []entities.FactsGatheredItem{
{
Expand Down Expand Up @@ -90,7 +91,7 @@ func (suite *CrmMonTestSuite) TestCrmMonGatherCmdNotFound() {
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "crm_mon not found")
}
Expand All @@ -117,7 +118,7 @@ func (suite *CrmMonTestSuite) TestCrmMonGatherError() {
},
}

factResults, err := p.Gather(factRequests)
factResults, err := p.Gather(context.Background(), factRequests)

expectedResults := []entities.FactsGatheredItem{
{
Expand Down
5 changes: 4 additions & 1 deletion internal/factsengine/gatherers/ascsers_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ func (g *AscsErsClusterGatherer) SetCache(cache *factscache.FactsCache) {
g.cache = cache
}

func (g *AscsErsClusterGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
func (g *AscsErsClusterGatherer) Gather(
_ context.Context,
factsRequests []entities.FactRequest,
) ([]entities.Fact, error) {
log.Infof("Starting %s facts gathering process", AscsErsClusterGathererName)
var cibdata cib.Root

Expand Down
10 changes: 5 additions & 5 deletions internal/factsengine/gatherers/ascsers_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGatherCmdNotFound() {
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: cibadmin-command-error - "+
"error running cibadmin command: cibadmin not found")
Expand All @@ -74,7 +74,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGatherCacheCastingError(
},
}

_, err = p.Gather(factRequests)
_, err = p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: ascsers-cluster-decoding-error - "+
"error decoding cibadmin output: error casting the command output")
Expand All @@ -98,7 +98,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGatherInvalidInstanceNam
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: ascsers-cluster-cib-error - "+
"error parsing cibadmin output: incorrect InstanceName property value: PRD_ASCS00")
Expand All @@ -123,7 +123,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGatherInvalidInstanceNum
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: ascsers-cluster-cib-error - "+
"error parsing cibadmin output: "+
Expand Down Expand Up @@ -195,7 +195,7 @@ func (suite *AscsErsClusterTestSuite) TestAscsErsClusterGather() {
},
}

results, err := p.Gather(factRequests)
results, err := p.Gather(context.Background(), factRequests)

// nolint:dupl
expectedFacts := []entities.Fact{
Expand Down
4 changes: 3 additions & 1 deletion internal/factsengine/gatherers/cibadmin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gatherers

import (
"context"

log "github.com/sirupsen/logrus"
"github.com/trento-project/agent/internal/factsengine/factscache"
"github.com/trento-project/agent/pkg/factsengine/entities"
Expand Down Expand Up @@ -53,7 +55,7 @@ func memoizeCibAdmin(args ...interface{}) (interface{}, error) {
return executor.Exec("cibadmin", "--query", "--local")
}

func (g *CibAdminGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
func (g *CibAdminGatherer) Gather(_ context.Context, factsRequests []entities.FactRequest) ([]entities.Fact, error) {
log.Infof("Starting %s facts gathering process", CibAdminGathererName)

content, err := factscache.GetOrUpdate(g.cache, CibAdminGathererCache, memoizeCibAdmin, g.executor)
Expand Down
13 changes: 7 additions & 6 deletions internal/factsengine/gatherers/cibadmin_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gatherers_test

import (
"context"
"errors"
"io"
"os"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (suite *CibAdminTestSuite) TestCibAdminGatherCmdNotFound() {
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: cibadmin-command-error - "+
"error running cibadmin command: cibadmin not found")
Expand All @@ -71,7 +72,7 @@ func (suite *CibAdminTestSuite) TestCibAdminInvalidXML() {
},
}

_, err := p.Gather(factRequests)
_, err := p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: cibadmin-decoding-error - "+
"error decoding cibadmin output: EOF")
Expand Down Expand Up @@ -116,7 +117,7 @@ func (suite *CibAdminTestSuite) TestCibAdminGather() {
},
}

factResults, err := p.Gather(factRequests)
factResults, err := p.Gather(context.Background(), factRequests)

expectedResults := []entities.Fact{
{
Expand Down Expand Up @@ -234,11 +235,11 @@ func (suite *CibAdminTestSuite) TestCibAdminGatherWithCache() {
},
}

factResults, err := p.Gather(factRequests)
factResults, err := p.Gather(context.Background(), factRequests)
suite.NoError(err)
suite.ElementsMatch(expectedResults, factResults)

_, err = p.Gather(factRequests)
_, err = p.Gather(context.Background(), factRequests)
suite.NoError(err)

entries := cache.Entries()
Expand All @@ -263,7 +264,7 @@ func (suite *CibAdminTestSuite) TestCibAdminGatherCacheCastingError() {
},
}

_, err = p.Gather(factRequests)
_, err = p.Gather(context.Background(), factRequests)

suite.EqualError(err, "fact gathering error: cibadmin-decoding-error - "+
"error decoding cibadmin output: error casting the command output")
Expand Down
6 changes: 5 additions & 1 deletion internal/factsengine/gatherers/corosynccmapctl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gatherers

import (
"context"
"strings"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -80,7 +81,10 @@ func corosyncCmapctlOutputToMap(corosyncCmapctlOutput string) *entities.FactValu
return outputMap
}

func (s *CorosyncCmapctlGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
func (s *CorosyncCmapctlGatherer) Gather(
_ context.Context,
factsRequests []entities.FactRequest,
) ([]entities.Fact, error) {
facts := []entities.Fact{}
log.Infof("Starting %s facts gathering process", CorosyncCmapCtlGathererName)

Expand Down
10 changes: 6 additions & 4 deletions internal/factsengine/gatherers/corosynccmapctl_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gatherers_test

import (
"context"
"io"
"os"
"os/exec"
Expand All @@ -26,6 +27,7 @@ func (suite *CorosyncCmapctlTestSuite) SetupTest() {
suite.mockExecutor = new(utilsMocks.CommandExecutor)
}

// nolint:dupl
func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlGathererNoArgumentProvided() {
mockOutputFile, _ := os.Open(helpers.GetFixturePath("gatherers/corosynccmap-ctl.output"))
mockOutput, _ := io.ReadAll(mockOutputFile)
Expand All @@ -45,7 +47,7 @@ func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlGathererNoArgumentProv
},
}

factResults, err := c.Gather(factRequests)
factResults, err := c.Gather(context.Background(), factRequests)

expectedResults := []entities.Fact{
{
Expand Down Expand Up @@ -85,7 +87,7 @@ func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlGathererNonExistingKey
},
}

factResults, err := c.Gather(factRequests)
factResults, err := c.Gather(context.Background(), factRequests)

expectedResults := []entities.Fact{
{
Expand Down Expand Up @@ -120,7 +122,7 @@ func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlCommandNotFound() {
Type: "corosync-cmapctl-command-error",
}

factResults, err := c.Gather(factRequests)
factResults, err := c.Gather(context.Background(), factRequests)

suite.EqualError(err, expectedError.Error())

Expand Down Expand Up @@ -157,7 +159,7 @@ func (suite *CorosyncCmapctlTestSuite) TestCorosyncCmapctlGatherer() {
},
}

factResults, err := c.Gather(factRequests)
factResults, err := c.Gather(context.Background(), factRequests)

expectedResults := []entities.Fact{
{
Expand Down
6 changes: 5 additions & 1 deletion internal/factsengine/gatherers/corosyncconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gatherers

import (
"bufio"
"context"
"fmt"
"os"
"regexp"
Expand Down Expand Up @@ -49,7 +50,10 @@ func NewCorosyncConfGatherer(configFile string) *CorosyncConfGatherer {
}
}

func (s *CorosyncConfGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
func (s *CorosyncConfGatherer) Gather(
_ context.Context,
factsRequests []entities.FactRequest,
) ([]entities.Fact, error) {
facts := []entities.Fact{}
log.Infof("Starting corosync.conf file facts gathering process")

Expand Down
Loading

0 comments on commit abd0f06

Please sign in to comment.