Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(v2 backend): remove unnecessary sync from persistent agent #1417

Merged
merged 2 commits into from
Nov 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 2 additions & 179 deletions backend/src/common/util/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
pipelineapi "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
pipelineapiv1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
customRun "github.com/tektoncd/pipeline/pkg/apis/run/v1beta1"
prclientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
prclientv1 "github.com/tektoncd/pipeline/pkg/client/clientset/versioned/typed/pipeline/v1"
prsinformers "github.com/tektoncd/pipeline/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -945,13 +943,9 @@ func (pri *PipelineRunInformer) Get(namespace string, name string) (ExecutionSpe
"Error retrieving PipelineRun (%v) in namespace (%v): %v", name, namespace, err)
}
newWorkflow := NewPipelineRun(pipelinerun)
if err := pri.getStatusFromChildReferences(namespace,
fmt.Sprintf("%s=%s", LabelKeyWorkflowRunId, pipelinerun.Labels[LabelKeyWorkflowRunId]),
newWorkflow.Status); err != nil {

return nil, IsNotFound(err), errors.Wrapf(err,
"Error retrieving the Status of the PipelineRun (%v) in namespace (%v): %v", name, namespace, err)
}
// Reduce newWorkflow size
newWorkflow.Spec = pipelineapi.PipelineRunSpec{}
return newWorkflow, false, nil
}

Expand All @@ -971,174 +965,3 @@ func (pri *PipelineRunInformer) List(labels *labels.Selector) (ExecutionSpecList
func (pri *PipelineRunInformer) InformerFactoryStart(stopCh <-chan struct{}) {
pri.factory.Start(stopCh)
}

func (pri *PipelineRunInformer) getStatusFromChildReferences(namespace, selector string, status TektonStatus) error {
if status.ChildReferences == nil {
return nil
}

hasTaskRun, hasCustomRun := false, false
for _, child := range status.ChildReferences {
switch child.Kind {
case "TaskRun":
hasTaskRun = true
case "CustomRun":
hasCustomRun = true
default:
}
}
// TODO: restruct the workflow to contain taskrun/run status, these 2 field
// will be removed in the future
if hasTaskRun {
// fetch taskrun status and insert into Status.TaskRuns
taskruns, err := pri.clientset.TektonV1().TaskRuns(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: selector,
})
if err != nil {
return NewInternalServerError(err, "can't fetch taskruns")
}

taskrunStatuses := make(map[string]*pipelineapi.PipelineRunTaskRunStatus, len(taskruns.Items))
for _, taskrun := range taskruns.Items {
taskrunStatuses[taskrun.Name] = &pipelineapi.PipelineRunTaskRunStatus{
PipelineTaskName: taskrun.Labels["tekton.dev/pipelineTask"],
Status: taskrun.Status.DeepCopy(),
}
}
status.TaskRuns = taskrunStatuses
}
if hasCustomRun {
customRuns, err := pri.clientset.TektonV1beta1().CustomRuns(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: selector,
})
if err != nil {
return NewInternalServerError(err, "can't fetch runs")
}
customRunStatuses := make(map[string]*pipelineapi.PipelineRunRunStatus, len(customRuns.Items))
for _, customRun := range customRuns.Items {
customRunStatus := customRun.Status.DeepCopy()
customRunStatuses[customRun.Name] = &pipelineapi.PipelineRunRunStatus{
PipelineTaskName: customRun.Labels["tekton.dev/pipelineTask"],
Status: customRunStatus,
}
// handle nested status
pri.handleNestedStatusV1beta1(&customRun, customRunStatus, namespace)
}
if status.Runs == nil {
status.Runs = customRunStatuses
} else {
for n, v := range customRunStatuses {
status.Runs[n] = v
}
}
}
return nil
}

// handle nested status case for specific types of Run
func (pri *PipelineRunInformer) handleNestedStatusV1beta1(customRun *pipelineapiv1beta1.CustomRun, customRunStatus *customRun.CustomRunStatus, namespace string) {
var kind string
if customRun.Spec.CustomSpec != nil {
kind = customRun.Spec.CustomSpec.Kind
} else if customRun.Spec.CustomRef != nil {
kind = string(customRun.Spec.CustomRef.Kind)
}
if sort.SearchStrings(childReferencesKinds, kind) >= len(childReferencesKinds) {
return
}

// need to lookup the nested status
obj := make(map[string]interface{})
if err := json.Unmarshal(customRunStatus.ExtraFields.Raw, &obj); err != nil {
return
}
if pri.updateExtraFields(obj, namespace) {
if newStatus, err := json.Marshal(obj); err == nil {
customRunStatus.ExtraFields.Raw = newStatus
}
}
}

// check ExtraFields and update nested status if needed
func (pri *PipelineRunInformer) updateExtraFields(obj map[string]interface{}, namespace string) bool {
updated := false
val, ok := obj["pipelineRuns"]
if !ok {
return false
}
prs, ok := val.(map[string]interface{})
if !ok {
return false
}
// go through the list of pipelineRuns
for _, val := range prs {
probj, ok := val.(map[string]interface{})
if !ok {
continue
}
val, ok := probj["status"]
if !ok {
continue
}
statusobj, ok := val.(map[string]interface{})
if !ok {
continue
}
childRef, ok := statusobj["childReferences"]
if !ok {
continue
}
if children, ok := childRef.([]interface{}); ok {
for _, childObj := range children {
if child, ok := childObj.(map[string]interface{}); ok {
kindI, ok := child["kind"]
if !ok {
continue
}
nameI, ok := child["name"]
if !ok {
continue
}
kind := fmt.Sprintf("%v", kindI)
name := fmt.Sprintf("%v", nameI)
if kind == "TaskRun" {
if taskrunCR, err := pri.clientset.TektonV1().TaskRuns(namespace).Get(context.Background(), name, metav1.GetOptions{}); err == nil {
taskruns, ok := statusobj["taskRuns"]
if !ok {
taskruns = make(map[string]*pipelineapi.PipelineRunTaskRunStatus)
}
if taskrunStatus, ok := taskruns.(map[string]*pipelineapi.PipelineRunTaskRunStatus); ok {
taskrunStatus[name] = &pipelineapi.PipelineRunTaskRunStatus{
PipelineTaskName: taskrunCR.Labels["tekton.dev/pipelineTask"],
Status: taskrunCR.Status.DeepCopy(),
}
statusobj["taskRuns"] = taskrunStatus
updated = true
}
}
} else if kind == "CustomRun" {
if customRunsCR, err := pri.clientset.TektonV1beta1().CustomRuns(namespace).Get(context.Background(), name, metav1.GetOptions{}); err == nil {
customRuns, ok := statusobj["customRuns"]
if !ok {
customRuns = make(map[string]*pipelineapi.PipelineRunRunStatus)
}
if customRunStatus, ok := customRuns.(map[string]*pipelineapi.PipelineRunRunStatus); ok {
customRunStatusStatus := customRunsCR.Status.DeepCopy()
customRunStatus[name] = &pipelineapi.PipelineRunRunStatus{
PipelineTaskName: customRunsCR.Labels["tekton.dev/pipelineTask"],
Status: customRunStatusStatus,
}
statusobj["customRuns"] = customRunStatus
// handle nested status recursively
pri.handleNestedStatusV1beta1(customRunsCR, customRunStatusStatus, namespace)
updated = true
}
}
}
}
}
}

}
return updated
}