Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for running external commands #356

Merged
merged 8 commits into from
Feb 17, 2025
175 changes: 109 additions & 66 deletions cmd/inventory/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"sort"
"strconv"
"time"

"github.com/hibiken/asynq"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -79,9 +80,10 @@ func NewTaskCommand() *cli.Command {
Required: true,
},
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
},
Action: func(ctx *cli.Context) error {
Expand All @@ -99,19 +101,22 @@ func NewTaskCommand() *cli.Command {
Aliases: []string{"a"},
Flags: []cli.Flag{
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
&cli.IntFlag{
Name: "page",
Usage: "page number to retrieve",
Value: 1,
Name: "page",
Aliases: []string{"p"},
Usage: "page number to retrieve",
Value: 1,
},
&cli.IntFlag{
Name: "size",
Usage: "page size to use",
Value: 50,
Name: "size",
Aliases: []string{"s"},
Usage: "page size to use",
Value: 50,
},
},
Action: func(ctx *cli.Context) error {
Expand All @@ -124,19 +129,22 @@ func NewTaskCommand() *cli.Command {
Aliases: []string{"p"},
Flags: []cli.Flag{
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
&cli.IntFlag{
Name: "page",
Usage: "page number to retrieve",
Value: 1,
Name: "page",
Aliases: []string{"p"},
Usage: "page number to retrieve",
Value: 1,
},
&cli.IntFlag{
Name: "size",
Usage: "page size to use",
Value: 50,
Name: "size",
Aliases: []string{"s"},
Usage: "page size to use",
Value: 50,
},
},
Action: func(ctx *cli.Context) error {
Expand All @@ -149,19 +157,22 @@ func NewTaskCommand() *cli.Command {
Aliases: []string{"ar"},
Flags: []cli.Flag{
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
&cli.IntFlag{
Name: "page",
Usage: "page number to retrieve",
Value: 1,
Name: "page",
Aliases: []string{"p"},
Usage: "page number to retrieve",
Value: 1,
},
&cli.IntFlag{
Name: "size",
Usage: "page size to use",
Value: 50,
Name: "size",
Aliases: []string{"s"},
Usage: "page size to use",
Value: 50,
},
},
Action: func(ctx *cli.Context) error {
Expand All @@ -173,19 +184,22 @@ func NewTaskCommand() *cli.Command {
Usage: "list completed tasks",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
&cli.IntFlag{
Name: "page",
Usage: "page number to retrieve",
Value: 1,
Name: "page",
Aliases: []string{"p"},
Usage: "page number to retrieve",
Value: 1,
},
&cli.IntFlag{
Name: "size",
Usage: "page size to use",
Value: 50,
Name: "size",
Aliases: []string{"s"},
Usage: "page size to use",
Value: 50,
},
},
Action: func(ctx *cli.Context) error {
Expand All @@ -198,19 +212,22 @@ func NewTaskCommand() *cli.Command {
Aliases: []string{"r"},
Flags: []cli.Flag{
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
&cli.IntFlag{
Name: "page",
Usage: "page number to retrieve",
Value: 1,
Name: "page",
Aliases: []string{"p"},
Usage: "page number to retrieve",
Value: 1,
},
&cli.IntFlag{
Name: "size",
Usage: "page size to use",
Value: 50,
Name: "size",
Aliases: []string{"s"},
Usage: "page size to use",
Value: 50,
},
},
Action: func(ctx *cli.Context) error {
Expand All @@ -223,19 +240,22 @@ func NewTaskCommand() *cli.Command {
Aliases: []string{"s"},
Flags: []cli.Flag{
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
&cli.IntFlag{
Name: "page",
Usage: "page number to retrieve",
Value: 1,
Name: "page",
Aliases: []string{"p"},
Usage: "page number to retrieve",
Value: 1,
},
&cli.IntFlag{
Name: "size",
Usage: "page size to use",
Value: 50,
Name: "size",
Aliases: []string{"s"},
Usage: "page size to use",
Value: 50,
},
},
Action: func(ctx *cli.Context) error {
Expand All @@ -249,17 +269,28 @@ func NewTaskCommand() *cli.Command {
Flags: []cli.Flag{
&cli.StringFlag{
Name: "task",
Aliases: []string{"t"},
Usage: "name of task to enqueue",
Required: true,
},
&cli.StringFlag{
Name: "payload",
Usage: "task payload",
},
&cli.PathFlag{
Name: "payload-file",
Usage: "path to a payload file",
},
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
&cli.DurationFlag{
Name: "timeout",
Usage: "set timeout for task",
Value: 30 * time.Minute,
},
},
Action: func(ctx *cli.Context) error {
Expand All @@ -268,16 +299,23 @@ func NewTaskCommand() *cli.Command {
defer client.Close()

taskName := ctx.String("task")
timeout := ctx.Duration("timeout")
queue := ctx.String("queue")
payloadFile := ctx.String("payload")

_, ok := registry.TaskRegistry.Get(taskName)
if !ok {
return fmt.Errorf("Task %q not found in the registry", taskName)
}

var payload []byte
if payloadFile != "" {
payloadData := ctx.String("payload")
payloadFile := ctx.Path("payload-file")
switch {
case payloadData != "" && payloadFile != "":
return fmt.Errorf("Cannot use --payload and --payload-file at the same time")
case payloadData != "":
payload = []byte(payloadData)
case payloadFile != "":
data, err := os.ReadFile(payloadFile)
if err != nil {
return fmt.Errorf("Cannot read payload file: %w", err)
Expand All @@ -286,7 +324,11 @@ func NewTaskCommand() *cli.Command {
}

task := asynq.NewTask(taskName, payload)
info, err := client.EnqueueContext(ctx.Context, task, asynq.Queue(queue))
opts := []asynq.Option{
asynq.Queue(queue),
asynq.Timeout(timeout),
}
info, err := client.EnqueueContext(ctx.Context, task, opts...)
if err != nil {
return fmt.Errorf("Cannot enqueue %q task: %w", taskName, err)
}
Expand All @@ -301,9 +343,10 @@ func NewTaskCommand() *cli.Command {
Aliases: []string{"i"},
Flags: []cli.Flag{
&cli.StringFlag{
Name: "queue",
Usage: "name of queue to use",
Value: "default",
Name: "queue",
Aliases: []string{"q"},
Usage: "name of queue to use",
Value: "default",
},
&cli.StringFlag{
Name: "id",
Expand Down
77 changes: 77 additions & 0 deletions pkg/common/tasks/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

package tasks

import (
"context"
"errors"
"os/exec"

"github.com/hibiken/asynq"

"github.com/gardener/inventory/pkg/core/registry"
asynqutils "github.com/gardener/inventory/pkg/utils/asynq"
)

// ErrNoCommand is an error which is returned when the task for executing
// external commands was called without specifying a command as part of the
// payload.
var ErrNoCommand = errors.New("no command specified")

const (
// CommandTaskType is the name of the task for executing external
// commands.
CommandTaskType = "common:task:command"
)

// CommandPayload represents the payload of the task for executing external
// commands.
type CommandPayload struct {
// Command specifies the path to the command to be executed
Command string `yaml:"command" json:"command"`

// Args specifies any optional arguments to be passed to the command.
Args []string `yaml:"args" json:"args"`

// Dir specifies the working directory of the command. If not specified
// then the external command will be executed in the calling process'
// current directory.
Dir string `yaml:"dir" json:"dir"`
}

// HandleCommandTask executes the command specified as part of the payload.
func HandleCommandTask(ctx context.Context, task *asynq.Task) error {
data := task.Payload()
var payload CommandPayload
if err := asynqutils.Unmarshal(data, &payload); err != nil {
return asynqutils.SkipRetry(err)
}

if payload.Command == "" {
return asynqutils.SkipRetry(ErrNoCommand)
}

path, err := exec.LookPath(payload.Command)
if err != nil {
return asynqutils.SkipRetry(err)
}

logger := asynqutils.GetLogger(ctx)
logger.Info(
"executing command",
"command", path,
"args", payload.Args,
"dir", payload.Dir,
)

cmd := exec.CommandContext(ctx, path, payload.Args...)
cmd.Dir = payload.Dir

return cmd.Run()
}

func init() {
registry.TaskRegistry.MustRegister(CommandTaskType, asynq.HandlerFunc(HandleCommandTask))
}
Loading