From ccfc81adf8147a81e10c6018f94110275a5e8775 Mon Sep 17 00:00:00 2001 From: suyog shinde <omshinde942@gmail.com> Date: Fri, 15 Nov 2024 13:54:06 -0600 Subject: [PATCH] Remove resourcepolicycontroller.go and it's references Remove commented code for platformcontroller.go remove commented code remove commented code remove unused fucntions GetValuesTypes, getChartValueTypes,flatten remove unused imports Remove unused imports, functions, and references; clean up commented code for improved readability. --- platform-operator/main.go | 12 +- platform-operator/platformcontroller.go | 380 ++++-------------- platform-operator/resourcepolicycontroller.go | 315 --------------- 3 files changed, 82 insertions(+), 625 deletions(-) delete mode 100644 platform-operator/resourcepolicycontroller.go diff --git a/platform-operator/main.go b/platform-operator/main.go index eecfd843..fc72ad04 100644 --- a/platform-operator/main.go +++ b/platform-operator/main.go @@ -1,10 +1,10 @@ package main import ( - "flag" - "time" "context" + "flag" "sync" + "time" "github.com/golang/glog" kubeinformers "k8s.io/client-go/informers" @@ -47,7 +47,6 @@ func main() { kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) platformInformerFactory := informers.NewSharedInformerFactory(platformOperatorClient, time.Second*30) platformController := NewPlatformController(kubeClient, platformOperatorClient, kubeInformerFactory, platformInformerFactory) - //resourcePolicyController := NewResourcePolicyController(kubeClient, platformOperatorClient, kubeInformerFactory, platformInformerFactory) var wg sync.WaitGroup wg.Add(1) @@ -57,13 +56,6 @@ func main() { platformController.Run(1, ctx.Done()) }() - /* - go func() { - defer wg.Done() - resourcePolicyController.Run(1, ctx.Done()) - }() - */ - go kubeInformerFactory.Start(ctx.Done()) go platformInformerFactory.Start(ctx.Done()) diff --git a/platform-operator/platformcontroller.go b/platform-operator/platformcontroller.go index ca683d42..006a4db9 100644 --- a/platform-operator/platformcontroller.go +++ b/platform-operator/platformcontroller.go @@ -1,25 +1,23 @@ package main import ( + "context" + gerrors "errors" "fmt" - "time" "io/ioutil" "log" + "reflect" "strconv" "strings" - "context" - gerrors "errors" - "reflect" + "time" - _ "github.com/lib/pq" "net/http" "net/url" - - "gopkg.in/yaml.v3" + + _ "github.com/lib/pq" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "github.com/golang/glog" corev1 "k8s.io/api/core/v1" @@ -63,10 +61,10 @@ const ( MessageResourceSynced = "PlatformStack synced successfully" // Annotations to put on Consumer CRDs. - CREATED_BY_KEY = "created-by" + CREATED_BY_KEY = "created-by" CREATED_BY_VALUE = "kubeplus" - HELMER_HOST = "localhost" - HELMER_PORT = "8090" + HELMER_HOST = "localhost" + HELMER_PORT = "8090" ) // Controller is the controller implementation for Foo resources @@ -76,10 +74,10 @@ type Controller struct { // sampleclientset is a clientset for our own API group platformStackclientset clientset.Interface - deploymentsLister appslisters.DeploymentLister - deploymentsSynced cache.InformerSynced - platformStacksLister listers.ResourceCompositionLister - platformStacksSynced cache.InformerSynced + deploymentsLister appslisters.DeploymentLister + deploymentsSynced cache.InformerSynced + platformStacksLister listers.ResourceCompositionLister + platformStacksSynced cache.InformerSynced // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This @@ -115,14 +113,14 @@ func NewPlatformController( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) controller := &Controller{ - kubeclientset: kubeclientset, - platformStackclientset: platformStackclientset, - deploymentsLister: deploymentInformer.Lister(), - deploymentsSynced: deploymentInformer.Informer().HasSynced, - platformStacksLister: platformStackInformer.Lister(), - platformStacksSynced: platformStackInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "PlatformStacks"), - recorder: recorder, + kubeclientset: kubeclientset, + platformStackclientset: platformStackclientset, + deploymentsLister: deploymentInformer.Lister(), + deploymentsSynced: deploymentInformer.Informer().HasSynced, + platformStacksLister: platformStackInformer.Lister(), + platformStacksSynced: platformStackInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "PlatformStacks"), + recorder: recorder, } glog.Info("Setting up event handlers") @@ -132,8 +130,6 @@ func NewPlatformController( UpdateFunc: func(old, new interface{}) { newDepl := new.(*platformworkflowv1alpha1.ResourceComposition) oldDepl := old.(*platformworkflowv1alpha1.ResourceComposition) - //fmt.Println("New Version:%s", newDepl.ResourceVersion) - //fmt.Println("Old Version:%s", oldDepl.ResourceVersion) if newDepl.ResourceVersion == oldDepl.ResourceVersion { // Periodic resync will send update events for all known Deployments. // Two different versions of the same Deployment will always have different RVs. @@ -144,9 +140,9 @@ func NewPlatformController( } }, DeleteFunc: func(obj interface{}) { - _, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + _, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { - controller.deleteFoo(obj) + controller.deleteFoo(obj) } }, }) @@ -297,14 +293,13 @@ func (c *Controller) handleObject(obj interface{}) { } } - func (c *Controller) deleteFoo(obj interface{}) { fmt.Println("Inside delete Foo") var err error if _, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - panic(err) + panic(err) } foo := obj.(*platformworkflowv1alpha1.ResourceComposition) @@ -333,13 +328,11 @@ func (c *Controller) deleteFoo(obj interface{}) { action := "delete" handleCRD(foo.Name, kind, version, group, plural, action, namespace, chartURL, chartName) - resPolicySpec := foo.Spec.ResPolicy - //fmt.Printf("ResPolicySpec:%v\n",resPolicySpec) + resPolicySpec := foo.Spec.ResPolicy deleteResourcePolicy(resPolicySpec, namespace) - resMonitorSpec := foo.Spec.ResMonitor - //fmt.Printf("ResMonitorSpec:%v\n",resMonitorSpec) + resMonitorSpec := foo.Spec.ResMonitor deleteResourceMonitor(resMonitorSpec, namespace) @@ -367,11 +360,11 @@ func (c *Controller) updateFoo(oldObj, newObj interface{}) { } fmt.Printf("GHI - update\n") fmt.Printf("NS:%s", namespace) - kind := newFoo.Spec.NewResource.Resource.Kind - group := newFoo.Spec.NewResource.Resource.Group - version := newFoo.Spec.NewResource.Resource.Version - plural := newFoo.Spec.NewResource.Resource.Plural - chartURL := newFoo.Spec.NewResource.ChartURL + kind := newFoo.Spec.NewResource.Resource.Kind + group := newFoo.Spec.NewResource.Resource.Group + version := newFoo.Spec.NewResource.Resource.Version + plural := newFoo.Spec.NewResource.Resource.Plural + chartURL := newFoo.Spec.NewResource.ChartURL chartName := newFoo.Spec.NewResource.ChartName fmt.Printf("Kind:%s, Version:%s Group:%s, Plural:%s\n", kind, version, group, plural) fmt.Printf("ChartURL:%s, ChartName:%s\n", chartURL, chartName) @@ -442,14 +435,14 @@ func (c *Controller) syncHandler(key string) error { return nil } - resPolicySpec := foo.Spec.ResPolicy - fmt.Printf("ResPolicySpec:%v\n",resPolicySpec) + resPolicySpec := foo.Spec.ResPolicy + fmt.Printf("ResPolicySpec:%v\n", resPolicySpec) // Instantiate ResourcePolicy object createResourcePolicy(resPolicySpec, namespace) - resMonitorSpec := foo.Spec.ResMonitor - fmt.Printf("ResMonitorSpec:%v\n",resMonitorSpec) + resMonitorSpec := foo.Spec.ResMonitor + fmt.Printf("ResMonitorSpec:%v\n", resMonitorSpec) // Instantiate ResourceMonitor object createResourceMonitor(resMonitorSpec, namespace) @@ -469,23 +462,22 @@ func (c *Controller) updateResourceCompositionStatus(foo *platformworkflowv1alph timeout := 300 count := 0 for { - _, err := c.platformStackclientset.WorkflowsV1alpha1().ResourceCompositions(namespace).Update(context.Background(), fooCopy, metav1.UpdateOptions{}) - //_, err := c.sampleclientset.MoodlecontrollerV1().Moodles(foo.Namespace).Update(fooCopy) - if err != nil { - fmt.Printf("Platformcontroller.go : ERROR in UpdateResourceCompositionStatus %e\n", err) - fmt.Printf("Platformcontroller.go : ERROR %v\n", err) - time.Sleep(1 * time.Second) - count = count + 1 - } else { - fmt.Printf("Successfully updated Resource Composition status.") - break - } - if count >= timeout { - fmt.Printf("\n--") - fmt.Printf("CR instance %v not ready till timeout.\n", foo) - fmt.Printf("---") - break - } + _, err := c.platformStackclientset.WorkflowsV1alpha1().ResourceCompositions(namespace).Update(context.Background(), fooCopy, metav1.UpdateOptions{}) + if err != nil { + fmt.Printf("Platformcontroller.go : ERROR in UpdateResourceCompositionStatus %e\n", err) + fmt.Printf("Platformcontroller.go : ERROR %v\n", err) + time.Sleep(1 * time.Second) + count = count + 1 + } else { + fmt.Printf("Successfully updated Resource Composition status.") + break + } + if count >= timeout { + fmt.Printf("\n--") + fmt.Printf("CR instance %v not ready till timeout.\n", foo) + fmt.Printf("---") + break + } } } @@ -510,15 +502,11 @@ func createResourceMonitor(resMonitorSpec interface{}, namespace string) { } } -func deleteResourceMonitor(resMonitorSpec interface{}, namespace string) { +func deleteResourceMonitor(resMonitorSpec interface{}, namespace string) { fmt.Println("Inside deleteResourceMonitor") resMonitorObject := resMonitorSpec.(platformworkflowv1alpha1.ResourceMonitor) inputResMonitorName := resMonitorObject.ObjectMeta.Name - /*namespace := resMonitorObject.ObjectMeta.Namespace - if namespace == "" { - namespace = "default" - }*/ - fmt.Printf("ResMonitor:%s, Namespace:%s\n",inputResMonitorName,namespace) + fmt.Printf("ResMonitor:%s, Namespace:%s\n", inputResMonitorName, namespace) // Using Typed client config, err := rest.InClusterConfig() @@ -566,15 +554,11 @@ func createResourcePolicy(resPolicySpec interface{}, namespace string) { } } -func deleteResourcePolicy(resPolicySpec interface{}, namespace string) { +func deleteResourcePolicy(resPolicySpec interface{}, namespace string) { fmt.Println("Inside deleteResourcePolicy.") resPolicyObject := resPolicySpec.(platformworkflowv1alpha1.ResourcePolicy) inputResPolicyName := resPolicyObject.ObjectMeta.Name - /*namespace := resPolicyObject.ObjectMeta.Namespace - if namespace == "" { - namespace = "default" - }*/ - fmt.Printf("ResPolicy:%s, Namespace:%s\n",inputResPolicyName,namespace) + fmt.Printf("ResPolicy:%s, Namespace:%s\n", inputResPolicyName, namespace) // Using Typed client config, err := rest.InClusterConfig() @@ -601,91 +585,6 @@ func deleteResourcePolicy(resPolicySpec interface{}, namespace string) { } } -func flatten(yaml_contents map[string]interface{}, types_dict map[string]apiextensionsv1beta1.JSONSchemaProps) map[string]apiextensionsv1beta1.JSONSchemaProps { - for key, value := range yaml_contents { - //fmt.Printf("Key:%s ", key) - //fmt.Printf("Value:%s Type:%T\n", value, value) - _, ok := value.(string) - if ok { - //str_dict := map[string]apiextensionsv1beta1.JSONSchemaProps{"type": apiextensionsv1beta1.JSONSchemaProps{Type: "string"}} - types_dict[key] = apiextensionsv1beta1.JSONSchemaProps{Type: "string"} - } - _, ok = value.(int) - if ok { - //str_dict := map[string]apiextensionsv1beta1.JSONSchemaProps{"type": apiextensionsv1beta1.JSONSchemaProps{Type: "integer"}} - types_dict[key] = apiextensionsv1beta1.JSONSchemaProps{Type: "integer"} - } - _, ok = value.(bool) - if ok { - //str_dict := map[string]apiextensionsv1beta1.JSONSchemaProps{"type": apiextensionsv1beta1.JSONSchemaProps{Type: "boolean"}} - types_dict[key] = apiextensionsv1beta1.JSONSchemaProps{Type: "boolean"} - } - _, ok = value.(map[string]interface{}) - if ok { - value1, _ := value.(map[string]interface{}) - inner_prop_dict := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - inner_prop_dict = flatten(value1, inner_prop_dict) - - var jsonSchemaInner apiextensionsv1beta1.JSONSchemaProps - jsonSchemaInner.Type = "object" - jsonSchemaInner.Properties = inner_prop_dict - - types_dict[key] = jsonSchemaInner - } - _, ok = value.([]interface{}) - if ok { - items_dict := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - items_dict["type"] = apiextensionsv1beta1.JSONSchemaProps{Type: "string"} - //prop_dict["items"] = items_dict - - var jsonSchemaInner apiextensionsv1beta1.JSONSchemaProps - jsonSchemaInner.Type = "array" - jsonSchemaInner.Properties = items_dict - - types_dict[key] = jsonSchemaInner - } - - } - //fmt.Println(types_dict) - return types_dict -} - -func getChartValueTypes(data []byte) map[string]apiextensionsv1beta1.JSONSchemaProps { - - // Create a map to hold the YAML data - var yaml_contents map[string]interface{} - - openAPIV3SchemaPropertiesInnerDetails := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - - // Unmarshal the YAML data into the map - err := yaml.Unmarshal(data, &yaml_contents) - if err != nil { - fmt.Println(err) - return openAPIV3SchemaPropertiesInnerDetails - } - - /*attr_types := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - openAPIV3SchemaObj := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - openAPIV3SchemaProperties := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - openAPIV3SchemaPropertiesInner := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - */ - - openAPIV3SchemaPropertiesInnerDetails = flatten(yaml_contents, openAPIV3SchemaPropertiesInnerDetails) - //fmt.Println(openAPIV3SchemaPropertiesInnerDetails) - //fmt.Println("=====") - - /*openAPIV3SchemaPropertiesInner["type"] = apiextensionsv1beta1.JSONSchemaProps{Type: "object"} - openAPIV3SchemaPropertiesInner["properties"] = openAPIV3SchemaPropertiesInnerDetails - openAPIV3SchemaProperties["spec"] = openAPIV3SchemaPropertiesInner - openAPIV3SchemaObj["type"] = apiextensionsv1beta1.JSONSchemaProps{Type: "object"} - openAPIV3SchemaObj["properties"] = openAPIV3SchemaProperties - attr_types["openAPIV3Schema"] = openAPIV3SchemaObj - - attr_types_json, _ := json.Marshal(attr_types)*/ - return openAPIV3SchemaPropertiesInnerDetails - -} - func handleCRD(rescomposition, kind, version, group, plural, action, namespace, chartURL, chartName string) error { fmt.Printf("Inside handleCRD %s\n", action) cfg, err := rest.InClusterConfig() @@ -700,88 +599,6 @@ func handleCRD(rescomposition, kind, version, group, plural, action, namespace, fmt.Printf("Getting values.yaml of the service Helm chart\n") - //chartValuesBytes := GetValuesYaml(rescomposition, namespace) - /* - valuesYaml := string(chartValuesBytes) - fmt.Printf("%v\n", valuesYaml) - lines := strings.Split(valuesYaml, "\n") - - properties := make([]string,0) - for i:=0; i<len(lines);i++ { - line1 := strings.TrimSpace(lines[i]) - if line1 != "" { - fmt.Printf("ABC %s\n", line1) - parts := strings.Split(line1, ":") - if len(parts) == 2 { - propName := strings.TrimSpace(parts[0]) - properties = append(properties, propName) - } - } - } - - fmt.Printf("=====================\n") - fmt.Printf("%v\n", properties) - fmt.Printf("=====================\n") - */ - /* - for i:=0; i<len(properties); i++ { - fieldName := properties[i] - specProperties[fieldName] = apiextensionsv1beta1.JSONSchemaProps{Type: "string"} - }*/ - - //chartValuesTypes := GetValuesTypes(rescomposition, namespace) - - /* - specProperties := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - specProperties = getChartValueTypes(chartValuesBytes) - - // Add "nodeName" to the specProperties to support node isolation. - specProperties["nodeName"] = apiextensionsv1beta1.JSONSchemaProps{Type: "string"} - - fmt.Printf("SpecProperties:%v\n", specProperties) - - var jsonSchemaInner apiextensionsv1beta1.JSONSchemaProps - jsonSchemaInner.Type = "object" - jsonSchemaInner.Properties = specProperties - - statusProperties := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - statusProperties["helmrelease"] = apiextensionsv1beta1.JSONSchemaProps{Type: "string"} - var jsonSchemaStatus apiextensionsv1beta1.JSONSchemaProps - jsonSchemaStatus.Type = "object" - jsonSchemaStatus.Properties = statusProperties - - specField := make(map[string]apiextensionsv1beta1.JSONSchemaProps) - specField["spec"] = jsonSchemaInner - specField["status"] = jsonSchemaStatus - var jsonSchemaOuter apiextensionsv1beta1.JSONSchemaProps - jsonSchemaOuter.Type = "object" - jsonSchemaOuter.Properties = specField - - crd := &apiextensionsv1beta1.CustomResourceDefinition{ - ObjectMeta: metav1.ObjectMeta{ - Name: plural + "." + group, - Annotations: kubePlusAnnotation, - }, - Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{ - Group: group, - Versions: []apiextensionsv1beta1.CustomResourceDefinitionVersion{ - { - Name: version, - Served: true, - Storage: true, - Schema: &apiextensionsv1beta1.CustomResourceValidation{OpenAPIV3Schema: &jsonSchemaOuter}, - //Schema: &apiextensionsv1beta1.CustomResourceValidation{OpenAPIV3Schema: &apiextensionsv1beta1.JSONSchemaProps{Type: "object"}}, - }, - }, - Names: apiextensionsv1beta1.CustomResourceDefinitionNames{ - Plural: plural, - Kind: kind, - }, - Scope: "Namespaced", - }, - } - */ - crdPresent := false crdList, err := crdClient.CustomResourceDefinitions().List(context.Background(), metav1.ListOptions{}) if err != nil { @@ -798,10 +615,8 @@ func handleCRD(rescomposition, kind, version, group, plural, action, namespace, } group1 := crdObj.Spec.Group version1 := crdObj.Spec.Versions[0].Name - //endpoint := "apis/" + group + "/" + version kind1 := crdObj.Spec.Names.Kind plural1 := crdObj.Spec.Names.Plural - //fmt.Printf("Kind:%s, Group:%s, Version:%s, Endpoint:%s, Plural:%s\n",kind1, group1, version1, endpoint, plural1) if group == group1 && kind == kind1 && version == version1 && plural == plural1 { crdPresent = true @@ -811,17 +626,12 @@ func handleCRD(rescomposition, kind, version, group, plural, action, namespace, if !crdPresent { if action == "create" { - /* _, err1 := crdClient.CustomResourceDefinitions().Create(context.Background(), crd, metav1.CreateOptions{}) - if err1 != nil { - panic(err1.Error()) + resp := CreateCRD(kind, version, group, plural, chartURL, chartName) + respString := string(resp) + fmt.Printf("\nCreate CRD response:%s\n", respString) + if strings.Contains(respString, "Error") { + return gerrors.New(respString) } - */ - resp := CreateCRD(kind, version, group, plural, chartURL, chartName) - respString := string(resp) - fmt.Printf("\nCreate CRD response:%s\n", respString) - if strings.Contains(respString, "Error") { - return gerrors.New(respString) - } } } else { fmt.Printf("CRD Group:%s Version:%s Kind:%s Plural:%s found.\n", group, version, kind, plural) @@ -845,47 +655,28 @@ func handleCRD(rescomposition, kind, version, group, plural, action, namespace, } func GetValuesYaml(platformworkflow, namespace string) []byte { - args := fmt.Sprintf("platformworkflow=%s&namespace=%s", platformworkflow, namespace) - fmt.Printf("Inside GetValuesYaml...\n") - //serviceHost, servicePort := getServiceEndpoint("kubeplus") - //fmt.Printf("After getServiceEndpoint...\n") - var url1 string - url1 = fmt.Sprintf("http://%s:%s/apis/kubeplus/getchartvalues?%s", HELMER_HOST, HELMER_PORT, args) - fmt.Printf("Url:%s\n", url1) - body := queryKubeDiscoveryService(url1) - return body + args := fmt.Sprintf("platformworkflow=%s&namespace=%s", platformworkflow, namespace) + fmt.Printf("Inside GetValuesYaml...\n") + var url1 string + url1 = fmt.Sprintf("http://%s:%s/apis/kubeplus/getchartvalues?%s", HELMER_HOST, HELMER_PORT, args) + fmt.Printf("Url:%s\n", url1) + body := queryKubeDiscoveryService(url1) + return body } func CreateCRD(kind, version, group, plural, chartURL, chartName string) []byte { - args := fmt.Sprintf("kind=%s&version=%s&group=%s&plural=%s&chartURL=%s&chartName=%s",kind, version, group, plural, chartURL, chartName) - fmt.Printf("Inside CreateCRD...\n") - var url1 string + args := fmt.Sprintf("kind=%s&version=%s&group=%s&plural=%s&chartURL=%s&chartName=%s", kind, version, group, plural, chartURL, chartName) + fmt.Printf("Inside CreateCRD...\n") + var url1 string url1 = fmt.Sprintf("http://localhost:5005/registercrd?%s", args) - fmt.Printf("Url:%s\n", url1) - body := queryKubeDiscoveryService(url1) - return body -} - - -func GetValuesTypes(platformworkflow, namespace string) []byte { - args := fmt.Sprintf("platformworkflow=%s&namespace=%s", platformworkflow, namespace) - fmt.Printf("Inside GetValuesTypes...\n") - //serviceHost, servicePort := getServiceEndpoint("kubeplus") - //fmt.Printf("After getServiceEndpoint...\n") - var url1 string - url1 = fmt.Sprintf("http://localhost:5005/getchartvaluetypes?%s", args) - fmt.Printf("Url:%s\n", url1) - body := queryKubeDiscoveryService(url1) - return body + fmt.Printf("Url:%s\n", url1) + body := queryKubeDiscoveryService(url1) + return body } - - func deleteCRDInstances(kind, group, version, plural, namespace string) []byte { fmt.Printf("Inside deleteCRDInstances...\n") args := fmt.Sprintf("kind=%s&group=%s&version=%s&plural=%s&namespace=%s", kind, group, version, plural, namespace) - //serviceHost, servicePort := getServiceEndpoint("kubeplus") - //fmt.Printf("After getServiceEndpoint...\n") var url1 string url1 = fmt.Sprintf("http://%s:%s/apis/kubeplus/deletecrdinstances?%s", HELMER_HOST, HELMER_PORT, args) fmt.Printf("Url:%s\n", url1) @@ -893,7 +684,6 @@ func deleteCRDInstances(kind, group, version, plural, namespace string) []byte { return body } - func updateCRDInstances(kind, group, version, plural, namespace, chartURL, chartName string) []byte { fmt.Printf("Inside updateCRDInstances...\n") encodedChartURL := url.QueryEscape(chartURL) @@ -905,13 +695,9 @@ func updateCRDInstances(kind, group, version, plural, namespace, chartURL, chart return body } - - func deleteChartCRDs(chartName string) []byte { fmt.Printf("Inside deleteChartCRDs...\n") args := fmt.Sprintf("chartName=%s", chartName) - //serviceHost, servicePort := getServiceEndpoint("kubeplus") - //fmt.Printf("After getServiceEndpoint...\n") var url1 string url1 = fmt.Sprintf("http://%s:%s/apis/kubeplus/deletechartcrds?%s", HELMER_HOST, HELMER_PORT, args) fmt.Printf("Url:%s\n", url1) @@ -919,7 +705,6 @@ func deleteChartCRDs(chartName string) []byte { return body } - func getServiceEndpoint(servicename string) (string, string) { fmt.Printf("..Inside getServiceEndpoint...\n") namespace := getKubePlusNamespace() // Use the namespace in which kubeplus is deployed. @@ -931,20 +716,19 @@ func getServiceEndpoint(servicename string) (string, string) { host := discoveryServiceObj.Spec.ClusterIP port := discoveryServiceObj.Spec.Ports[0].Port stringPort := strconv.Itoa(int(port)) - fmt.Printf("Host:%s, Port:%s\n", host, stringPort) + fmt.Printf("Host:%s, Port:%s\n", host, stringPort) return host, stringPort } func getKubePlusNamespace() string { filePath := "/var/run/secrets/kubernetes.io/serviceaccount/namespace" content, err := ioutil.ReadFile(filePath) - if err != nil { - fmt.Printf("Namespace file reading error:%v\n", err) - } - ns := string(content) - ns = strings.TrimSpace(ns) - //fmt.Printf("CRD Hook NS:%s\n", ns) - return ns + if err != nil { + fmt.Printf("Namespace file reading error:%v\n", err) + } + ns := string(content) + ns = strings.TrimSpace(ns) + return ns } func queryKubeDiscoveryService(url1 string) []byte { @@ -967,10 +751,6 @@ func queryKubeDiscoveryService(url1 string) []byte { } defer resp.Body.Close() resp_body, _ := ioutil.ReadAll(resp.Body) - - //fmt.Println(resp.Status) - //fmt.Println(string(resp_body)) - //fmt.Println("Exiting QueryCompositionEndpoint") return resp_body } diff --git a/platform-operator/resourcepolicycontroller.go b/platform-operator/resourcepolicycontroller.go deleted file mode 100644 index ebd1303d..00000000 --- a/platform-operator/resourcepolicycontroller.go +++ /dev/null @@ -1,315 +0,0 @@ -package main - -import ( - "fmt" - "time" - - _ "github.com/lib/pq" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - - "github.com/golang/glog" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" - - workflowcontrollerv1alpha1 "github.com/cloud-ark/kubeplus/platform-operator/pkg/apis/workflowcontroller/v1alpha1" - clientset "github.com/cloud-ark/kubeplus/platform-operator/pkg/generated/clientset/versioned" - platformstackscheme "github.com/cloud-ark/kubeplus/platform-operator/pkg/generated/clientset/versioned/scheme" - informers "github.com/cloud-ark/kubeplus/platform-operator/pkg/generated/informers/externalversions" - listers "github.com/cloud-ark/kubeplus/platform-operator/pkg/generated/listers/workflowcontroller/v1alpha1" -) - -const controllerAgent = "resourcepolicy-controller" - -const ( - // SuccessSyncedResourcePolicy is used as part of the Event 'reason' when ResourcePolicy is synced - SuccessSyncedResourcePolicy = "Synced" - // ErrResourceExists is used as part of the Event 'reason' when a Foo fails - // to sync due to a Deployment of the same name already existing. - ErrResourceExistsResourcePolicy = "ErrResourceExists" - - // MessageResourceExists is the message used for Events when a resource - // fails to sync due to a Deployment already existing - MessageResourceExistsResourcePolicy = "Resource %q already exists and is not managed by ResourcePolicy" - // MessageResourceSynced is the message used for an Event fired when a Foo - // is synced successfully - MessageResourceSyncedResourcePolicy = "ResourcePolicy synced successfully" -) - -// Controller is the controller implementation for Foo resources -type ResourcePolicyController struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - // sampleclientset is a clientset for our own API group - resourcePolicyclientset clientset.Interface - - resourcePolicyLister listers.ResourcePolicyLister - resourcePoliciesSynced cache.InformerSynced - - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder -} - -// NewController returns a new sample controller -func NewResourcePolicyController( - kubeclientset kubernetes.Interface, - resourcePolicyclientset clientset.Interface, - kubeInformerFactory kubeinformers.SharedInformerFactory, - resourcePolicyInformerFactory informers.SharedInformerFactory) *ResourcePolicyController { - - resourcePolicyInformer := resourcePolicyInformerFactory.Workflows().V1alpha1().ResourcePolicies() - - // Create event broadcaster - // Add platformstack-controller types to the default Kubernetes Scheme so Events can be - // logged for platformstack-controller types. - platformstackscheme.AddToScheme(scheme.Scheme) - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgent}) - - controller := &ResourcePolicyController{ - kubeclientset: kubeclientset, - resourcePolicyclientset: resourcePolicyclientset, - resourcePolicyLister: resourcePolicyInformer.Lister(), - resourcePoliciesSynced: resourcePolicyInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ResourcePolicies"), - recorder: recorder, - } - - glog.Info("Setting up event handlers") - // Set up an event handler for when Foo resources change - resourcePolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueFoo, - UpdateFunc: func(old, new interface{}) { - newDepl := new.(*workflowcontrollerv1alpha1.ResourcePolicy) - oldDepl := old.(*workflowcontrollerv1alpha1.ResourcePolicy) - //fmt.Println("New Version:%s", newDepl.ResourceVersion) - //fmt.Println("Old Version:%s", oldDepl.ResourceVersion) - if newDepl.ResourceVersion == oldDepl.ResourceVersion { - // Periodic resync will send update events for all known ResourcePolicies. - // Two different versions of the same ResourcePolicy will always have different RVs. - return - } else { - controller.enqueueFoo(new) - } - }, - /* - DeleteFunc: func(obj interface{}) { - _, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err == nil { - controller.deleteFoo(obj) - } - },*/ - }) - return controller -} - -// Run will set up the event handlers for types we are interested in, as well -// as syncing informer caches and starting workers. It will block until stopCh -// is closed, at which point it will shutdown the workqueue and wait for -// workers to finish processing their current work items. -func (c *ResourcePolicyController) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting ResourcePolicy controller") - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.resourcePoliciesSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - glog.Info("Starting workers") - // Launch two workers to process Foo resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (c *ResourcePolicyController) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. -func (c *ResourcePolicyController) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err := func(obj interface{}) error { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - // Run the syncHandler, passing it the namespace/name string of the - // Foo resource to be synced. - if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing '%s': %s", key, err.Error()) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced '%s'", key) - return nil - }(obj) - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueFoo takes a Foo resource and converts it into a namespace/name -// string which is then put onto the work queue. This method should *not* be -// passed resources of any type other than Foo. -func (c *ResourcePolicyController) enqueueFoo(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) -} - -// handleObject will take any resource implementing metav1.Object and attempt -// to find the Foo resource that 'owns' it. It does this by looking at the -// objects metadata.ownerReferences field for an appropriate OwnerReference. -// It then enqueues that Foo resource to be processed. If the object does not -// have an appropriate OwnerReference, it will simply be skipped. -func (c *ResourcePolicyController) handleObject(obj interface{}) { - var object metav1.Object - var ok bool - if object, ok = obj.(metav1.Object); !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("error decoding object, invalid type")) - return - } - object, ok = tombstone.Obj.(metav1.Object) - if !ok { - runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) - return - } - glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) - } - glog.V(4).Infof("Processing object: %s", object.GetName()) - if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { - // If this object is not owned by a Foo, we should not do anything more - // with it. - if ownerRef.Kind != "Foo" { - return - } - - foo, err := c.resourcePolicyLister.ResourcePolicies(object.GetNamespace()).Get(ownerRef.Name) - if err != nil { - glog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name) - return - } - - c.enqueueFoo(foo) - return - } -} - - -func (c *ResourcePolicyController) deleteFoo(obj interface{}) { - - fmt.Println("Inside delete Foo") - - var err error - if _, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - panic(err) - } -} - -// syncHandler compares the actual state with the desired, and attempts to -// converge the two. It then updates the Status block of the Foo resource -// with the current status of the resource. -func (c *ResourcePolicyController) syncHandler(key string) error { - // Convert the namespace/name string into a distinct namespace and name - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) - return nil - } - - // Get the Foo resource with this namespace/name - foo, err := c.resourcePolicyLister.ResourcePolicies(namespace).Get(name) - if err != nil { - // The Foo resource may no longer exist, in which case we stop - // processing. - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("resourcePolicy '%s' in work queue no longer exists", key)) - return nil - } - return err - } - - kind := foo.Spec.Resource.Kind - version := foo.Spec.Resource.Version - group := foo.Spec.Resource.Group - plural := foo.Spec.Resource.Plural - - fmt.Printf("Custom Resource Kind:%v\n", kind) - fmt.Printf("Custom Resource Version:%v\n", version) - fmt.Printf("Custom Resource Group:%v\n", group) - fmt.Printf("Custom Resource Plural:%v\n", plural) - - c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSyncedResourcePolicy, MessageResourceSyncedResourcePolicy) - return nil -}