Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add apply kubeyaml rpc to KNE grpc server #464

Merged
merged 6 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions controller/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,17 @@ func (s *server) ShowCluster(ctx context.Context, req *cpb.ShowClusterRequest) (
return &cpb.ShowClusterResponse{State: cpb.ClusterState_CLUSTER_STATE_RUNNING}, nil
}

func (s *server) ApplyCluster(ctx context.Context, req *cpb.ApplyClusterRequest) (*cpb.ApplyClusterResponse, error) {
log.Infof("Received ApplyCluster request: %v", req)
s.muDeploy.Lock()
defer s.muDeploy.Unlock()
d, ok := s.deployments[req.GetName()]
if !ok {
return nil, status.Errorf(codes.NotFound, "cluster %q not found, can only apply config to clusters created using TopologyManager", req.GetName())
}
return nil, d.Cluster.Apply(req.GetConfig())
}

func (s *server) CreateTopology(ctx context.Context, req *cpb.CreateTopologyRequest) (*cpb.CreateTopologyResponse, error) {
log.Infof("Received CreateTopology request: %v", req)
topoPb := req.GetTopology()
Expand Down
87 changes: 52 additions & 35 deletions deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,57 +51,61 @@ var (
setPIDMaxScript = filepath.Join(homedir.HomeDir(), "kne-internal", "set_pid_max.sh")
pullRetryDelay = time.Second
poolRetryDelay = 5 * time.Second
logInfo = log.Info
logWarning = log.Warning
)

func runCommand(writeLogs bool, in []byte, cmd string, args ...string) ([]byte, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice reduction in duplicated code!

c := kexec.Command(cmd, args...)
var out bytes.Buffer
c.SetStdout(&out)
if writeLogs {
outLog := logshim.New(func(v ...interface{}) {
logInfo(append([]interface{}{"(" + cmd + "): "}, v...)...)
})
errLog := logshim.New(func(v ...interface{}) {
logWarning(append([]interface{}{"(" + cmd + "): "}, v...)...)
})
defer func() {
outLog.Close()
errLog.Close()
}()
c.SetStdout(io.MultiWriter(outLog, &out))
c.SetStderr(io.MultiWriter(errLog, &out))
}
if len(in) > 0 {
c.SetStdin(bytes.NewReader(in))
}
err := c.Run()
return out.Bytes(), err
}

// logCommand runs the specified command but records standard output
// with log.Info and standard error with log.Warning.
func logCommand(cmd string, args ...string) error {
c := kexec.Command(cmd, args...)
outLog := logshim.New(func(v ...interface{}) {
log.Info(append([]interface{}{"(" + cmd + "): "}, v...)...)
})
errLog := logshim.New(func(v ...interface{}) {
log.Warning(append([]interface{}{"(" + cmd + "): "}, v...)...)
})
defer func() {
outLog.Close()
errLog.Close()
}()
c.SetStdout(outLog)
c.SetStderr(errLog)
return c.Run()
_, err := runCommand(true, nil, cmd, args...)
return err
}

// logCommandWithInput runs the specified command but records standard output
// with log.Info and standard error with log.Warning. in is sent to
// the standard input of the command.
func logCommandWithInput(in []byte, cmd string, args ...string) error {
alexmasi marked this conversation as resolved.
Show resolved Hide resolved
alexmasi marked this conversation as resolved.
Show resolved Hide resolved
_, err := runCommand(true, in, cmd, args...)
return err
}

// outLogCommand runs the specified command but records standard output
// with log.Info and standard error with log.Warning. Standard output
// and standard error are also returned.
func outLogCommand(cmd string, args ...string) ([]byte, error) {
c := kexec.Command(cmd, args...)
outLog := logshim.New(func(v ...interface{}) {
log.Info(append([]interface{}{"(" + cmd + "): "}, v...)...)
})
errLog := logshim.New(func(v ...interface{}) {
log.Warning(append([]interface{}{"(" + cmd + "): "}, v...)...)
})
defer func() {
outLog.Close()
errLog.Close()
}()
var out bytes.Buffer
c.SetStdout(io.MultiWriter(outLog, &out))
c.SetStderr(io.MultiWriter(errLog, &out))
err := c.Run()
return out.Bytes(), err
return runCommand(true, nil, cmd, args...)
}

// outCommand runs the specified command and returns any standard output
// as well as any errors.
func outCommand(cmd string, args ...string) ([]byte, error) {
c := kexec.Command(cmd, args...)
var stdout bytes.Buffer
c.SetStdout(&stdout)
err := c.Run()
return stdout.Bytes(), err
return runCommand(false, nil, cmd, args...)
}

var (
Expand All @@ -118,6 +122,7 @@ type Cluster interface {
Healthy() error
GetName() string
GetDockerNetworkResourceName() string
Apply([]byte) error
}

type Ingress interface {
Expand Down Expand Up @@ -429,6 +434,14 @@ func (e *ExternalSpec) GetDockerNetworkResourceName() string {
return e.Network
}

func (e *ExternalSpec) Apply(cfg []byte) error {
return kubectlApply(cfg)
}

func kubectlApply(cfg []byte) error {
return logCommandWithInput(cfg, "kubectl", "apply", "-f", "-")
}

func init() {
load.Register("Kind", &load.Spec{
Type: KindSpec{},
Expand Down Expand Up @@ -654,6 +667,10 @@ func (k *KindSpec) GetDockerNetworkResourceName() string {
return "kind"
}

func (k *KindSpec) Apply(cfg []byte) error {
return kubectlApply(cfg)
}

func (k *KindSpec) setupGoogleArtifactRegistryAccess(ctx context.Context) error {
// Create a temporary dir to hold a new docker config that lacks credsStore.
// Then use `docker login` to store the generated credentials directly in
Expand Down
153 changes: 153 additions & 0 deletions deploy/deploy_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deploy

import (
"bytes"
"context"
"fmt"
"testing"
Expand Down Expand Up @@ -39,6 +40,158 @@ func init() {
klog.LogToStderr(false)
}

func TestRunCommand(t *testing.T) {
tests := []struct {
desc string
writeLogs bool
in string
cmd string
args []string
resp []fexec.Response
want string
wantInfos string
wantWarnings string
wantErr string
}{{
desc: "log no input",
writeLogs: true,
cmd: "echo",
args: []string{"hello"},
resp: []fexec.Response{
{Cmd: "echo", Args: []string{"hello"}, Stdout: "hello"},
},
want: "hello",
wantInfos: "(echo): hello",
}, {
desc: "log no input with stderr",
writeLogs: true,
cmd: "echo",
args: []string{"hello"},
resp: []fexec.Response{
{Cmd: "echo", Args: []string{"hello"}, Stdout: "hello", Stderr: "err"},
},
want: "helloerr",
wantInfos: "(echo): hello",
wantWarnings: "(echo): err",
}, {
desc: "log with input",
writeLogs: true,
in: "echo hello",
cmd: "cat",
resp: []fexec.Response{
{Cmd: "cat", Stdout: "hello"},
},
want: "hello",
wantInfos: "(cat): hello",
}, {
desc: "no log no input",
cmd: "echo",
args: []string{"hello"},
resp: []fexec.Response{
{Cmd: "echo", Args: []string{"hello"}, Stdout: "hello"},
},
want: "hello",
}, {
desc: "no log with input",
in: "echo hello",
cmd: "cat",
resp: []fexec.Response{
{Cmd: "cat", Stdout: "hello"},
},
want: "hello",
}, {
desc: "failed command",
cmd: "false",
resp: []fexec.Response{
{Cmd: "false", Err: "failure"},
},
wantErr: "failure",
}}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
if verbose {
fexec.LogCommand = func(s string) {
t.Logf("%s: %s", tt.desc, s)
}
}
cmds := fexec.Commands(tt.resp)
kexec.Command = cmds.Command
defer checkCmds(t, cmds)

origLogInfo := logInfo
defer func() {
logInfo = origLogInfo
}()
var infos bytes.Buffer
logInfo = func(args ...interface{}) {
fmt.Fprint(&infos, args...)
}

origLogWarning := logWarning
defer func() {
logWarning = origLogWarning
}()
var warnings bytes.Buffer
logWarning = func(args ...interface{}) {
fmt.Fprint(&warnings, args...)
}

got, err := runCommand(tt.writeLogs, []byte(tt.in), tt.cmd, tt.args...)
if s := errdiff.Substring(err, tt.wantErr); s != "" {
t.Fatalf("unexpected error: %s", s)
}
if string(got) != tt.want {
t.Errorf("runCommand() got output %v, want %v", string(got), tt.want)
}
if string(infos.Bytes()) != tt.wantInfos {
t.Errorf("runCommand() got info logs %v, want %v", string(infos.Bytes()), tt.wantInfos)
}
if string(warnings.Bytes()) != tt.wantWarnings {
t.Errorf("runCommand() got warning logs %v, want %v", string(warnings.Bytes()), tt.wantWarnings)
}
})
}
}

func TestKubectlApply(t *testing.T) {
tests := []struct {
desc string
cfg string
resp []fexec.Response
wantErr string
}{{
desc: "success",
cfg: "valid kubeyaml",
resp: []fexec.Response{
{Cmd: "kubectl", Args: []string{"apply", "-f", "-"}},
},
}, {
desc: "failed to apply",
cfg: "invalid",
resp: []fexec.Response{
{Cmd: "kubectl", Args: []string{"apply", "-f", "-"}, Err: "invalid yaml"},
},
wantErr: "invalid yaml",
}}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
if verbose {
fexec.LogCommand = func(s string) {
t.Logf("%s: %s", tt.desc, s)
}
}
cmds := fexec.Commands(tt.resp)
kexec.Command = cmds.Command
defer checkCmds(t, cmds)

err := kubectlApply([]byte(tt.cfg))
if s := errdiff.Substring(err, tt.wantErr); s != "" {
t.Fatalf("unexpected error: %s", s)
}
})
}
}

func TestKindSpec(t *testing.T) {
ctx := context.Background()

Expand Down
5 changes: 5 additions & 0 deletions exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type Cmd interface {
SetStdout(io.Writer) // Redirect standard output to the writer
SetStderr(io.Writer) // Redirect standard error to the writer
SetStdin(io.Reader) // Redirect standard in to the reader
Run() error // Operates the same as os/exec/Cmd.Run
}

Expand All @@ -45,7 +46,11 @@ type command struct {
}

func (c command) SetStdout(w io.Writer) { c.cmd.Stdout = w }

func (c command) SetStderr(w io.Writer) { c.cmd.Stderr = w }

func (c command) SetStdin(r io.Reader) { c.cmd.Stdin = r }

func (c command) Run() error {
if err := c.cmd.Run(); err != nil {
return fmt.Errorf("%q failed: %v", c.cmd.String(), err)
Expand Down
4 changes: 4 additions & 0 deletions exec/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Command struct {
unexpected []Response
stdout io.Writer
stderr io.Writer
stdin io.Reader
cnt int
}

Expand All @@ -104,6 +105,9 @@ func (c *Command) SetStdout(w io.Writer) { c.stdout = w }
// Stderr sets standard err to w.
func (c *Command) SetStderr(w io.Writer) { c.stderr = w }

// Stdin sets standard in to r.
func (c *Command) SetStdin(r io.Reader) { c.stdin = r }

// LogCommand is called with the string representation of the command that is
// running. The test program can optionally set this to their own function.
var LogCommand = func(string) {}
Expand Down
14 changes: 13 additions & 1 deletion proto/controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ service TopologyManager {
rpc PushConfig(PushConfigRequest) returns (PushConfigResponse) {}
// Resets config of a device in a topology.
rpc ResetConfig(ResetConfigRequest) returns (ResetConfigResponse) {}
// Applies kubeyaml to a running cluster.
rpc ApplyCluster(ApplyClusterRequest) returns (ApplyClusterResponse) {}
}

// Kind cluster specifications
Expand Down Expand Up @@ -213,7 +215,7 @@ message DeleteTopologyRequest {
message DeleteTopologyResponse {
}

// Request message to view topology info
// Request message to view topology info.
message ShowTopologyRequest {
string topology_name = 1;
}
Expand Down Expand Up @@ -244,3 +246,13 @@ message ResetConfigRequest {
// Returns reset config response.
message ResetConfigResponse {
}

// Request message to apply kubeyaml to a cluster.
message ApplyClusterRequest {
string name = 1;
bytes config = 2;
}

// Returns apply cluster response.
message ApplyClusterResponse {
}
Loading
Loading