Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: init runner groups' result subcommand #59

Merged
merged 4 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion api/types/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,10 @@ package types

// HTTPError is used to render response for error.
type HTTPError struct {
Error string `json:"error"`
ErrorMessage string `json:"error"`
}

// Error implements error interface.
func (herr HTTPError) Error() string {
return herr.ErrorMessage
}
4 changes: 4 additions & 0 deletions api/types/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ type RunnerMetricReport struct {
// PercentileLatencies represents the latency distribution in seconds.
PercentileLatencies [][2]float64 `json:"percentileLatencies,omitempty"`
}

// TODO(weifu): build brand new struct for RunnerGroupsReport to include more
// information, like how many runner groups, service account and flow control.
type RunnerGroupsReport = RunnerMetricReport
41 changes: 41 additions & 0 deletions cmd/kperf/commands/runnergroup/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package runnergroup

import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/runner"

"github.com/urfave/cli"
)

var resultCommand = cli.Command{
Name: "result",
Usage: "show the runner groups' result",
Flags: []cli.Flag{},
Action: func(cliCtx *cli.Context) error {
kubeCfgPath := cliCtx.GlobalString("kubeconfig")

res, err := runner.GetRunnerGroupResult(context.Background(), kubeCfgPath)
if err != nil {
return err
}

return renderRunnerGroupsReport(res)
},
}

// renderRunnerGroupsReport renders runner groups' report into stdio.
func renderRunnerGroupsReport(res *types.RunnerGroupsReport) error {
encoder := json.NewEncoder(os.Stdout)

encoder.SetIndent("", " ")
err := encoder.Encode(res)
if err != nil {
return fmt.Errorf("failed to encode json: %w", err)
}
return nil
}
12 changes: 0 additions & 12 deletions cmd/kperf/commands/runnergroup/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,3 @@ var waitCommand = cli.Command{
return fmt.Errorf("wait - not implemented")
},
}

var resultCommand = cli.Command{
Name: "result",
Usage: "show the result",
Flags: []cli.Flag{},
Action: func(cliCtx *cli.Context) error {
// 1. Check the progress tracker name
// 2. Ensure the jobs finished
// 3. Output the result
return fmt.Errorf("result - not implemented")
},
}
151 changes: 151 additions & 0 deletions portforward/portforward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package portforward

import (
"context"
"fmt"
"net/http"
"net/url"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
kubepf "k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/klog/v2"
)

// PodPortForwarder is used to forward traffic to specific pod's TCP port from
// local listener.
type PodPortForwarder struct {
// targetPort is the target TCP port.
targetPort uint16
// portforwardURL is the pod's portforward URL.
portforwardURL *url.URL
// restCfg is used to create spdy transport.
restCfg *rest.Config

portForwarder *kubepf.PortForwarder
}

// NewPodPortForwarder return a new instance of PodPortForwarder.
func NewPodPortForwarder(kubeCfgPath string, namespace, podName string, targetPort uint16) (*PodPortForwarder, error) {
restCfg, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath)
if err != nil {
return nil, err
}
restCfg.ContentType = "application/vnd.kubernetes.protobuf"

restCli, err := kubernetes.NewForConfig(restCfg)
if err != nil {
return nil, err
}

if err := ensurePodIsRunning(restCli, namespace, podName); err != nil {
return nil, err
}

u := restCli.CoreV1().RESTClient().Post().
Namespace(namespace).
Resource("pods").
Name(podName).
SubResource("portforward").URL()

return &PodPortForwarder{
targetPort: targetPort,
portforwardURL: u,
restCfg: restCfg,
}, nil
}

// Start is to start local listener to forward traffic.
func (pf *PodPortForwarder) Start() error {
transport, upgrader, err := spdy.RoundTripperFor(pf.restCfg)
if err != nil {
return fmt.Errorf("failed to create spdy transport: %w", err)
}

dialer := spdy.NewDialer(
upgrader,
&http.Client{Transport: transport},
"POST",
pf.portforwardURL,
)

startCh := make(chan struct{})

// pick available local port randomly.
kubePortForwarder, err := kubepf.New(
dialer,
[]string{fmt.Sprintf("0:%d", pf.targetPort)},
nil,
startCh,
&debugLogger{},
&debugLogger{},
)
if err != nil {
return fmt.Errorf("failed to init kube port forward: %w", err)
}

errCh := make(chan error, 1)
go func() {
errCh <- kubePortForwarder.ForwardPorts()
}()

select {
case <-startCh:
case err := <-errCh:
return fmt.Errorf("failed to start kube port forward: %w", err)
case <-time.After(120 * time.Second):
return fmt.Errorf("timeout to start kube port forward")
}

pf.portForwarder = kubePortForwarder
return nil
}

// GetLocalPort returns the local listener's port.
func (pf *PodPortForwarder) GetLocalPort() (uint16, error) {
if pf.portForwarder == nil {
return 0, fmt.Errorf("kube port forwarder doesn't start")
}

ports, err := pf.portForwarder.GetPorts()
if err != nil {
return 0, fmt.Errorf("failed to get local port: %w", err)
}
return ports[0].Local, nil
}

// Stop stops port forward.
func (pf *PodPortForwarder) Stop() {
defer klog.Flush()
if pf.portForwarder != nil {
pf.portForwarder.Close()
}
}

// ensurePodIsRunning is to check if the target pod is still running.
func ensurePodIsRunning(restCli kubernetes.Interface, namespace, podName string) error {
pod, err := restCli.CoreV1().
Pods(namespace).
Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to ensure if %s in %s exists: %w",
podName, namespace, err)
}

if pod.Status.Phase != corev1.PodRunning {
return fmt.Errorf("unable to forward port because pod is not running (status=%s)", pod.Status.Phase)
}
return nil
}

type debugLogger struct{}

func (l *debugLogger) Write(data []byte) (int, error) {
klog.V(2).InfoS(string(data))
return len(data), nil
}
23 changes: 23 additions & 0 deletions runner/runnergroup_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package runner

var (
// runnerGroupReleaseLabels is used to mark that helm chart release
// is managed by kperf.
runnerGroupReleaseLabels = map[string]string{
"runnergroups.kperf.io/managed": "true",
}
)

const (
// runnerGroupServerChartName should be aligned with ../manifests/runnergroup/server.
runnerGroupServerChartName = "runnergroup/server"

// runnerGroupServerReleaseName is the helm releas name for runner groups's server.
runnerGroupServerReleaseName = "runnergroup-server"

// runnerGroupServerPort should be aligned with ../manifests/runnergroup/server/templates/pod.yaml.
runnerGroupServerPort uint16 = 8080

// runnerGroupReleaseNamespace is used to host runner groups.
runnerGroupReleaseNamespace = "runnergroups-kperf-io"
)
74 changes: 74 additions & 0 deletions runner/runnergroup_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package runner

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/portforward"
)

// GetRunnerGroupResult gets runner group's aggregated report.
func GetRunnerGroupResult(_ context.Context, kubecfgPath string) (*types.RunnerGroupsReport, error) {
pf, err := portforward.NewPodPortForwarder(
kubecfgPath,
runnerGroupReleaseNamespace,
runnerGroupServerReleaseName,
runnerGroupServerPort,
)
if err != nil {
return nil, fmt.Errorf("failed to init pod portforward: %w", err)
}
defer pf.Stop()

if err = pf.Start(); err != nil {
return nil, fmt.Errorf("failed to start pod port forward: %w", err)
}

localPort, err := pf.GetLocalPort()
if err != nil {
return nil, fmt.Errorf("failed to get local port: %w", err)
}

targetURL := fmt.Sprintf("http://localhost:%d/v1/runnergroups/summary", localPort)

// FIXME(weifu): cleanup nolint
//nolint:gosec
resp, err := http.Get(targetURL)
if err != nil {
return nil, fmt.Errorf("failed to access %s by portforward: %w", targetURL, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
errInRaw, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read error message when http code = %v: %w",
resp.Status, err)
}

herr := types.HTTPError{}
err = json.Unmarshal(errInRaw, &herr)
if err != nil {
return nil, fmt.Errorf("failed to get error when http code = %v: %w",
resp.Status, err)
}
return nil, herr
}

dataInRaw, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read data: %w", err)
}

res := types.RunnerGroupsReport{}
err = json.Unmarshal(dataInRaw, &res)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal to get result: %w\n\n%s",
err, string(dataInRaw))
}
return &res, nil
}
22 changes: 0 additions & 22 deletions runner/runnergroup_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,6 @@ import (
"gopkg.in/yaml.v2"
)

var (
// runnerGroupReleaseLabels is used to mark that helm chart release
// is managed by kperf.
runnerGroupReleaseLabels = map[string]string{
"runnergroups.kperf.io/managed": "true",
}
)

const (
// runnerGroupServerChartName should be aligned with ../manifests/runnergroup/server.
runnerGroupServerChartName = "runnergroup/server"

// runnerGroupServerReleaseName is the helm releas name for runner groups's server.
runnerGroupServerReleaseName = "runnergroup-server"

// runnerGroupServerPort should be aligned with ../manifests/runnergroup/server/templates/pod.yaml.
// runnerGroupServerPort = 8080

// runnerGroupReleaseNamespace is used to host runner groups.
runnerGroupReleaseNamespace = "runnergroups-kperf-io"
)

// CreateRunnerGroupServer creates a long running server to deploy runner groups.
//
// TODO:
Expand Down
1 change: 1 addition & 0 deletions runner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (s *Server) Run() error {

r := mux.NewRouter()
r.HandleFunc("/v1/runnergroups", s.listRunnerGroupsHandler).Methods("GET")
// NOTE: Please update ./runnergroup_result.go if endpoint has been changed.
r.HandleFunc("/v1/runnergroups/summary", s.getRunnerGroupsSummary).Methods("GET")
r.HandleFunc("/v1/runnergroups/{runner_name}/result", s.postRunnerGroupsRunnerResult).Methods("POST")

Expand Down
2 changes: 1 addition & 1 deletion runner/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func renderErrorResponse(w http.ResponseWriter, code int, err error) {
w.WriteHeader(code)

data, _ := json.Marshal(types.HTTPError{
Error: err.Error(),
ErrorMessage: err.Error(),
})
_, _ = w.Write(data)
}
Expand Down