Skip to content

Commit

Permalink
Merge pull request #1 from sujoshua/frontConfig
Browse files Browse the repository at this point in the history
job的前端镜像支持传入自定义config+代码清理
  • Loading branch information
sujoshua authored Aug 23, 2024
2 parents 5a99285 + 4bda711 commit 7ba62db
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 49 deletions.
177 changes: 143 additions & 34 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"flag"
"os"
"reflect"
"strings"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down Expand Up @@ -91,40 +92,7 @@ func main() {
os.Exit(1)
}

annString := os.Getenv("FRONT_ANN")
annItems := make(map[string]string)
if annString != "" {
for _, item := range strings.Split(annString, ";") {
splits := strings.Split(item, "=")
if len(splits) == 2 {
annItems[splits[0]] = splits[1]
}
}
}

debug := false
if os.Getenv("DEBUG") != "" {
debug = true
}

config := controller.Config{
ManagerImage: os.Getenv("MANAGER_IMAGE"),
WorkerImage: os.Getenv("WORKER_IMAGE"),
PullPolicy: os.Getenv("PULL_POLICY"),
PullSecret: os.Getenv("PULL_SECRET"),
StorageClass: os.Getenv("STORAGE_CLASS"),
AccessMode: os.Getenv("ACCESS_MODE"),
FrontMode: os.Getenv("FRONT_MODE"),
FrontImage: os.Getenv("FRONT_IMAGE"),
RsyncImage: os.Getenv("RSYNC_IMAGE"),
FrontCmd: os.Getenv("FRONT_CMD"),
RsyncCmd: os.Getenv("RSYNC_CMD"),
FrontHost: os.Getenv("FRONT_HOST"),
FrontTLS: os.Getenv("FRONT_TLS"),
FrontClass: os.Getenv("FRONT_CLASS"),
FrontAnn: annItems,
Debug: debug,
}
config := getConfig()

if err = (&controller.JobReconciler{
Client: mgr.GetClient(),
Expand Down Expand Up @@ -166,3 +134,144 @@ func main() {
os.Exit(1)
}
}

func getConfig() controller.Config {
// TODO: replace ugly env way to config file
annString := os.Getenv("FRONT_ANN")
annItems := make(map[string]string)
if annString != "" {
for _, item := range strings.Split(annString, ";") {
splits := strings.Split(item, "=")
if len(splits) == 2 {
annItems[splits[0]] = splits[1]
}
}
}

debug := false
if os.Getenv("DEBUG") != "" {
debug = true
}

c := controller.Config{
ManagerImage: os.Getenv("MANAGER_IMAGE"),
WorkerImage: os.Getenv("WORKER_IMAGE"),
PullPolicy: os.Getenv("PULL_POLICY"),
PullSecret: os.Getenv("PULL_SECRET"),
StorageClass: os.Getenv("STORAGE_CLASS"),
AccessMode: os.Getenv("ACCESS_MODE"),
FrontMode: os.Getenv("FRONT_MODE"),
FrontImage: os.Getenv("FRONT_IMAGE"),
RsyncImage: os.Getenv("RSYNC_IMAGE"),
FrontCmd: os.Getenv("FRONT_CMD"),
FrontConfig: os.Getenv("FRONT_CONFIG"),
RsyncCmd: os.Getenv("RSYNC_CMD"),
FrontHost: os.Getenv("FRONT_HOST"),
FrontTLS: os.Getenv("FRONT_TLS"),
FrontClass: os.Getenv("FRONT_CLASS"),
FrontAnn: annItems,
Debug: debug,
}

mergeDefaults(&c)

return c
}

// mergeDefaults merges the default values into the given config.
// only support string and map[string]string.
func mergeDefaults(config *controller.Config) {
defaultConfig := &controller.Config{
ManagerImage: "cquptmirror/manager:latest",
WorkerImage: "cquptmirror/worker:latest",
FrontMode: "caddy",
RsyncImage: "",
FrontCmd: "caddy run -c /etc/frontConfig",
FrontConfig: `{
"apps": {
"http": {
"servers": {
"static": {
"listen": [
":80"
],
"read_header_timeout": 10000000000,
"idle_timeout": 30000000000,
"max_header_bytes": 10240,
"routes": [
{
"handle": [
{
"encodings": {
"gzip": {
},
"zstd": {
}
},
"handler": "encode",
"prefer": [
"zstd",
"gzip"
]
},
{
"browse": {
},
"handler": "file_server",
"root": "/data",
"index_names": [
"_noindex"
]
}
]
}
],
"automatic_https": {
"disable": true
}
}
}
}
}}
`,
RsyncCmd: "",
FrontHost: "mirrors.cqupt.edu.cn",
FrontTLS: "",
FrontClass: "traefik",
FrontAnn: map[string]string{
"traefik.ingress.kubernetes.io/router.entrypoints": "web",
},
Debug: false,
}

// 用反射实现
configValue := reflect.ValueOf(config).Elem()
defaultConfigValue := reflect.ValueOf(defaultConfig).Elem()

for i := 0; i < configValue.NumField(); i++ {
configField := configValue.Field(i)
defaultConfigField := defaultConfigValue.Field(i)

switch configField.Kind() {
case reflect.String:
if configField.IsZero() {
configField.SetString(defaultConfigField.String())
}
case reflect.Map:
// only support map[string]string
if configField.IsZero() {
// 新建一个map,防止defaultConfig依然被引用而无法被gc
newMap := make(map[string]string)
for _, key := range defaultConfigField.MapKeys() {
newMap[key.String()] = defaultConfigField.MapIndex(key).String()
}
configField.Set(reflect.ValueOf(newMap))
}
default:
continue
}
}
}
5 changes: 5 additions & 0 deletions internal/controller/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package controller

const (
ConfigMapKind string = "Configmap"
)
38 changes: 28 additions & 10 deletions internal/controller/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,29 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, nil
}

var (
err error
ig *v1.Ingress
frontCM *corev1.ConfigMap
)
disableFront, _, _, _, _, _, _ := r.checkRsyncFront(&job)
if !disableFront {
ig, err = r.desiredIngress(&job)
if err != nil {
return ctrl.Result{}, err
}
frontCM, err = r.desiredFrontConfigmap(&job)
if err != nil {
return ctrl.Result{}, err
}
}

pvc, err := r.desiredPersistentVolumeClaim(&job)
if err != nil {
return ctrl.Result{}, err
}

app, err := r.desiredDeployment(&job, managerName)
app, err := r.desiredDeployment(&job, managerName, frontCM)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -88,15 +105,6 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, err
}

var ig *v1.Ingress
disableFront, _, _, _, _, _, _ := r.checkRsyncFront(&job)
if !disableFront {
ig, err = r.desiredIngress(&job)
if err != nil {
return ctrl.Result{}, err
}
}

applyOpts := []client.PatchOption{client.ForceOwnership, client.FieldOwner("mirror-controller")}

err = r.Patch(ctx, pvc, client.Apply, applyOpts...)
Expand All @@ -120,6 +128,12 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if err != nil {
return ctrl.Result{}, err
}
if frontCM != nil {
err = r.Patch(ctx, frontCM, client.Apply, applyOpts...)
if err != nil {
return ctrl.Result{}, err
}
}
}
} else {
deploy := new(appsv1.Deployment)
Expand All @@ -134,6 +148,10 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
ObjectMeta: metav1.ObjectMeta{Name: job.Name, Namespace: job.Namespace},
})
r.Delete(ctx, deploy)
r.Delete(ctx, &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String(), Kind: ConfigMapKind},
ObjectMeta: metav1.ObjectMeta{Name: job.Name, Namespace: job.Namespace},
})
}
}

Expand Down
60 changes: 56 additions & 4 deletions internal/controller/job_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
package controller

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/CQUPTMirror/kubesync/api/v1beta1"
Expand All @@ -38,6 +40,25 @@ const (
RsyncPort = 873
)

func (r *JobReconciler) getFrontConfig(job *v1beta1.Job) (frontConfig string, err error) {
// TODO add caddy config to job crd
/*
if job.Spec.Config.CaddyConfig != "" {
return job.Spec.Config.CaddyConfig
}
*/
if r.Config.FrontConfig == "" {
return "", nil
}
frontConfig = r.Config.FrontConfig
var buf bytes.Buffer
if err = json.Compact(&buf, []byte(frontConfig)); err != nil {
return "", err
}

return buf.String(), nil
}

func (r *JobReconciler) checkRsyncFront(job *v1beta1.Job) (disableFront, disableRsync bool, frontCmd, rsyncCmd []string, frontMode, frontImage, rsyncImage string) {
frontMode, frontImage, rsyncImage = r.Config.FrontMode, r.Config.FrontImage, r.Config.RsyncImage
frontCmd, rsyncCmd = strings.Split(r.Config.FrontCmd, " "), strings.Split(r.Config.RsyncCmd, " ")
Expand Down Expand Up @@ -110,7 +131,30 @@ func (r *JobReconciler) desiredPersistentVolumeClaim(job *v1beta1.Job) (*corev1.
return &pvc, nil
}

func (r *JobReconciler) desiredDeployment(job *v1beta1.Job, manager string) (*appsv1.Deployment, error) {
func (r *JobReconciler) desiredFrontConfigmap(job *v1beta1.Job) (*corev1.ConfigMap, error) {
caddyConfig, err := r.getFrontConfig(job)
if err != nil {
return nil, err
}

if caddyConfig == "" {
return nil, nil
}

return &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String(), Kind: ConfigMapKind},
ObjectMeta: metav1.ObjectMeta{
Name: job.Name + "-front",
Namespace: job.Namespace,
Labels: map[string]string{"job": job.Name},
},
Data: map[string]string{
"frontConfig": caddyConfig,
},
}, nil
}

func (r *JobReconciler) desiredDeployment(job *v1beta1.Job, manager string, frontCM *corev1.ConfigMap) (*appsv1.Deployment, error) {
enableServiceLinks := false
app := appsv1.Deployment{
TypeMeta: metav1.TypeMeta{APIVersion: appsv1.SchemeGroupVersion.String(), Kind: "Deployment"},
Expand Down Expand Up @@ -204,7 +248,7 @@ func (r *JobReconciler) desiredDeployment(job *v1beta1.Job, manager string) (*ap
}
probe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(ApiPort)},
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt32(ApiPort)},
},
InitialDelaySeconds: 10,
TimeoutSeconds: 5,
Expand Down Expand Up @@ -251,7 +295,7 @@ func (r *JobReconciler) desiredDeployment(job *v1beta1.Job, manager string) (*ap
if !disableFront {
frontProbe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(FrontPort)},
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt32(FrontPort)},
},
InitialDelaySeconds: 10,
TimeoutSeconds: 5,
Expand All @@ -275,6 +319,14 @@ func (r *JobReconciler) desiredDeployment(job *v1beta1.Job, manager string) (*ap
{ContainerPort: FrontPort, Name: "front", Protocol: "TCP"},
},
}
if frontCM != nil {
frontContainer.VolumeMounts = append(frontContainer.VolumeMounts, corev1.VolumeMount{
Name: frontCM.Name,
SubPath: "frontConfig",
MountPath: "/etc/frontConfig",
})
}

if len(frontCmd) > 0 {
frontContainer.Command = frontCmd
}
Expand All @@ -283,7 +335,7 @@ func (r *JobReconciler) desiredDeployment(job *v1beta1.Job, manager string) (*ap
if !disableRsync {
rsyncProbe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(RsyncPort)},
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt32(RsyncPort)},
},
InitialDelaySeconds: 10,
TimeoutSeconds: 5,
Expand Down
1 change: 1 addition & 0 deletions internal/controller/manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {
FrontImage string
RsyncImage string
FrontCmd string
FrontConfig string
RsyncCmd string
FrontHost string
FrontTLS string
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/manager_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (r *ManagerReconciler) desiredDeployment(manager *v1beta1.Manager) (metav1.
enableServiceLinks := false
probe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(ManagerPort)},
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt32(ManagerPort)},
},
InitialDelaySeconds: 10,
TimeoutSeconds: 5,
Expand Down

0 comments on commit 7ba62db

Please sign in to comment.