Skip to content

Commit

Permalink
longer TerminationGracePeriodSeconds
Browse files Browse the repository at this point in the history
Signed-off-by: shaoyue.chen <[email protected]>
  • Loading branch information
haorenfsa committed Jan 15, 2024
1 parent bfcbe1f commit 5e57619
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 6 deletions.
38 changes: 36 additions & 2 deletions pkg/controllers/component_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/util/rest"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -24,7 +26,7 @@ func (c ComponentConditionGetterImpl) GetMilvusInstanceCondition(ctx context.Con
if mc.Spec.IsStopping() {
reason := v1beta1.ReasonMilvusStopping
msg := MessageMilvusStopped
stopped, err := CheckMilvusStopped(ctx, cli, mc)
stopped, err := CheckMilvusStopped(ctx, cli, mc, false)
if err != nil {
return v1beta1.MilvusCondition{}, err
}
Expand Down Expand Up @@ -158,7 +160,7 @@ func GetComponentConditionGetter() ComponentConditionGetter {

var singletonComponentConditionGetter ComponentConditionGetter = ComponentConditionGetterImpl{}

var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, killIfTooLong bool) (bool, error) {
podList := &corev1.PodList{}
opts := &client.ListOptions{
Namespace: mc.Namespace,
Expand All @@ -171,7 +173,39 @@ var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1
return false, err
}
if len(podList.Items) > 0 {
if killIfTooLong {
return false, ExecKillIfTerminatingTooLong(ctx, podList)
}

Check warning on line 178 in pkg/controllers/component_condition.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/component_condition.go#L177-L178

Added lines #L177 - L178 were not covered by tests
return false, nil
}
return true, nil
}

// gracefulStopTimeout is how long a pod may stay in the terminating state
// (measured from its DeletionTimestamp) before ExecKillIfTerminatingTooLong
// force-kills the milvus process inside it.
var gracefulStopTimeout = time.Second * 30

// ExecKillIfTerminatingTooLong force-kills the milvus process in every pod of
// podList that has been terminating for at least gracefulStopTimeout.
//
// Pods that carry no DeletionTimestamp, or whose DeletionTimestamp is more
// recent than gracefulStopTimeout, are left alone. For the remaining pods the
// command `kill -9 1 8` is exec-ed in the container named by the pod's
// AppLabelComponent label; exec is used rather than a plain signal because,
// per the original author's note, tini ignores SIGKILL.
//
// A failure on one pod does not abort the loop. If any exec failed, the most
// recent error is returned wrapped; otherwise nil is returned.
func ExecKillIfTerminatingTooLong(ctx context.Context, podList *corev1.PodList) error {
	restCli := rest.GetRestClient()
	log := ctrl.LoggerFrom(ctx)

	var lastErr error
	for i := range podList.Items {
		target := &podList.Items[i]

		// Skip pods that are not being deleted or are still within the grace window.
		deletedAt := target.DeletionTimestamp
		if deletedAt == nil || time.Since(deletedAt.Time) < gracefulStopTimeout {
			continue
		}

		podRef := fmt.Sprintf("%s/%s", target.Namespace, target.Name)
		container := target.Labels[AppLabelComponent]
		log.Info("kill milvus process", "pod", podRef, "container", container)

		// stdout/stderr of the kill command are intentionally discarded.
		if _, _, execErr := restCli.Exec(ctx, target.Namespace, target.Name, container, []string{"kill", "-9", "1", "8"}); execErr != nil {
			log.Error(execErr, "kill milvus process err", "pod", podRef, "container", container)
			lastErr = execErr
		}
	}

	if lastErr == nil {
		return nil
	}
	return errors.Wrap(lastErr, "failed to kill some milvus pod")
}
41 changes: 41 additions & 0 deletions pkg/controllers/component_condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package controllers
import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/util/rest"
"github.com/pkg/errors"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -264,3 +266,42 @@ func TestGetComponentErrorDetail(t *testing.T) {
assert.Equal(t, "creating", ret.Deployment.Message)
})
}

// TestExecKillIfTerminatingTooLong covers the four outcomes of
// ExecKillIfTerminatingTooLong for a two-pod list: nothing being deleted,
// deletion still within the grace window, all kills succeeding, and one kill
// failing. The subtests share the same pod list and mock client, so they must
// run in the order written.
func TestExecKillIfTerminatingTooLong(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	restMock := rest.NewMockRestClient(mockCtrl)
	rest.SetRestClient(restMock)
	ctx := context.Background()

	pods := &corev1.PodList{Items: make([]corev1.Pod, 2)}

	// markDeleted stamps every pod in the list with the given deletion time.
	markDeleted := func(at time.Time) {
		for i := range pods.Items {
			pods.Items[i].DeletionTimestamp = &metav1.Time{Time: at}
		}
	}
	// expectExec registers exactly one expected Exec call returning execErr.
	expectExec := func(execErr error) {
		restMock.EXPECT().
			Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
			Return("", "", execErr).
			Times(1)
	}

	t.Run("delete not sent yet", func(t *testing.T) {
		assert.NoError(t, ExecKillIfTerminatingTooLong(ctx, pods))
	})

	t.Run("delete sent, but not timeout", func(t *testing.T) {
		markDeleted(time.Now())
		assert.NoError(t, ExecKillIfTerminatingTooLong(ctx, pods))
	})

	t.Run("kill ok", func(t *testing.T) {
		markDeleted(time.Now().Add(-time.Hour))
		restMock.EXPECT().
			Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
			Return("", "", nil).
			Times(2)
		assert.NoError(t, ExecKillIfTerminatingTooLong(ctx, pods))
	})

	t.Run("kill 1 ok,1 error", func(t *testing.T) {
		markDeleted(time.Now().Add(-time.Hour))
		// Registration order matters: the failing expectation is matched first.
		expectExec(errors.New("test"))
		expectExec(nil)
		assert.Error(t, ExecKillIfTerminatingTooLong(ctx, pods))
	})
}
2 changes: 1 addition & 1 deletion pkg/controllers/deployment_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func updateSomeFieldsOnlyWhenRolling(template *corev1.PodTemplateSpec, updater d
},
}
}
template.Spec.TerminationGracePeriodSeconds = int64Ptr(300)
template.Spec.TerminationGracePeriodSeconds = int64Ptr(1800)
}

func updateSidecars(template *corev1.PodTemplateSpec, updater deploymentUpdater) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/milvus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *MilvusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

if controllerutil.ContainsFinalizer(milvus, ForegroundDeletionFinalizer) {
stopped, err := CheckMilvusStopped(ctx, r.Client, *milvus)
stopped, err := CheckMilvusStopped(ctx, r.Client, *milvus, true)
if !stopped || err != nil {
return ctrl.Result{RequeueAfter: unhealthySyncInterval}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/milvus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

var mockCheckMilvusStopRet = false
var mockCheckMilvusStopErr error = nil
var mockCheckMilvusStop = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
var mockCheckMilvusStop = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, kill bool) (bool, error) {
return mockCheckMilvusStopRet, mockCheckMilvusStopErr
}

Expand Down
1 change: 0 additions & 1 deletion pkg/util/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func NewK8sClientsForConfig(config *rest.Config) (*K8sClients, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to create dynamic client")
}

return &K8sClients{
ClientSet: clientSet,
ExtClientSet: extClientSet,
Expand Down
97 changes: 97 additions & 0 deletions pkg/util/rest/rest_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package rest

import (
"bytes"
"context"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
ctrl "sigs.k8s.io/controller-runtime"
)

//go:generate mockgen -source=./rest_client.go -destination=./rest_client_mock.go -package=rest RestClient

// RestClient is the seam through which controllers run commands inside pods.
// A mock for it is generated by the go:generate directive above.
type RestClient interface {
// Exec runs cmd in the given container of pod namespace/pod and returns the
// command's captured stdout and stderr, or a non-nil err on failure.
Exec(ctx context.Context, namespace, pod, container string, cmd []string) (stdout string, stderr string, err error)
}

// RestClientImpl is the RestClient implementation built by newRestClientImpl
// on top of a client-go REST client.
type RestClientImpl struct {
// restClient is used by Exec to build the pod "exec" subresource request.
restClient rest.Interface
// config is passed to the SPDY executor when Exec opens the stream.
config *rest.Config
// scheme provides the parameter codec used to encode PodExecOptions.
scheme *runtime.Scheme
}

// singletonRestClient is the package-wide RestClient returned by GetRestClient.
// It is assigned in init and can be replaced through SetRestClient.
var singletonRestClient RestClient

// GetRestClient returns the package-wide RestClient: the one built in init,
// or whatever was last installed with SetRestClient.
func GetRestClient() RestClient {
return singletonRestClient
}

// SetRestClient replaces the package-wide RestClient with r. It exists so unit
// tests can install a mock; the previous client is not saved or restored.
func SetRestClient(r RestClient) {
singletonRestClient = r
}

func init() {
config := ctrl.GetConfigOrDie()
restClient, err := newRestClientImpl(config)
if err != nil {
panic(err)
}
singletonRestClient = restClient
}

// newRestClientImpl builds a RestClientImpl that targets the core/v1 API group.
//
// The supplied config is copied before it is customised, so the caller's
// *rest.Config is never modified. On the copy, the serializer and group
// version are set for core/v1, and APIPath is defaulted to "/api" when empty:
// the legacy core group is served under /api, and with an empty APIPath
// client-go would build request URLs rooted at /v1 instead of /api/v1.
//
// It returns an error if config is nil, if corev1 cannot be registered in the
// scheme, or if the underlying REST client cannot be created.
func newRestClientImpl(config *rest.Config) (*RestClientImpl, error) {
	if config == nil {
		return nil, errors.New("rest config is nil")
	}

	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		return nil, errors.Wrap(err, "failed to add corev1 to scheme")
	}

	// Work on a private copy so shared configs are not altered as a side effect.
	cfg := rest.CopyConfig(config)
	cfg.NegotiatedSerializer = serializer.NewCodecFactory(scheme)
	cfg.GroupVersion = &corev1.SchemeGroupVersion
	if cfg.APIPath == "" {
		cfg.APIPath = "/api"
	}

	restClient, err := rest.RESTClientFor(cfg)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create rest client")
	}

	return &RestClientImpl{
		restClient: restClient,
		config:     cfg,
		scheme:     scheme,
	}, nil
}

// Exec runs cmd inside the given container of pod namespace/pod by POSTing to
// the pod's "exec" subresource and streaming the result over SPDY.
//
// Stdin is not attached. On success it returns everything the command wrote
// to stdout and stderr. On failure both strings are empty and err is wrapped
// with either "failed to create executor" or "failed to exec command".
//
// NOTE(review): ctx is accepted but never used in this body, so cancelling it
// does not interrupt a running stream.
func (c RestClientImpl) Exec(ctx context.Context, namespace, pod, container string, cmd []string) (stdout string, stderr string, err error) {
	execOpts := &corev1.PodExecOptions{
		Container: container,
		Command:   cmd,
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
	}
	request := c.restClient.Post().
		Resource("pods").
		Namespace(namespace).
		Name(pod).
		SubResource("exec").
		VersionedParams(execOpts, runtime.NewParameterCodec(c.scheme))

	executor, err := remotecommand.NewSPDYExecutor(c.config, "POST", request.URL())
	if err != nil {
		return "", "", errors.Wrap(err, "failed to create executor")
	}

	// Collect both output streams in memory; they are returned only on success.
	var outBuf, errBuf bytes.Buffer
	streamOpts := remotecommand.StreamOptions{
		Stdout: &outBuf,
		Stderr: &errBuf,
	}
	if err = executor.Stream(streamOpts); err != nil {
		return "", "", errors.Wrap(err, "failed to exec command")
	}

	return outBuf.String(), errBuf.String(), nil
}

0 comments on commit 5e57619

Please sign in to comment.