Skip to content

Commit

Permalink
syncctl: compress config
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Sep 17, 2024
1 parent cc4460c commit b3aa309
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 18 deletions.
16 changes: 8 additions & 8 deletions sync-controller/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ func (j *JobRunner) createSecret(podName string, task TaskDescriptor, configurat
Labels: map[string]string{k8sCreatorLabel: k8sCreatorLabelValue},
Namespace: j.namespace,
},
Immutable: &trueVar,
StringData: configuration.ToMap(),
Immutable: &trueVar,
Data: configuration.ToMap(),
}
return cm
}
Expand Down Expand Up @@ -383,12 +383,12 @@ func (j *JobRunner) createCronJob(jobId string, task TaskDescriptor, configurati
}
initCommand := []string{"sh", "-c", "mkfifo /pipes/stdout; mkfifo /pipes/stderr"}
if !configuration.IsEmpty() {
initCommand = []string{"sh", "-c", "mkfifo /pipes/stdout; mkfifo /pipes/stderr; cp /configmap/* /config/"}
initCommand = []string{"sh", "-c", "mkfifo /pipes/stdout; mkfifo /pipes/stderr; cp /configmap/* /config/; gunzip /config/*.gz"}
items := []v1.KeyToPath{}
for k := range configuration.ToMap() {
for _, k := range configuration.Keys() {
items = append(items, v1.KeyToPath{
Key: k,
Path: k + ".json",
Path: k + ".json.gz",
})
}
volumes = append(volumes, v1.Volume{
Expand Down Expand Up @@ -581,12 +581,12 @@ func (j *JobRunner) createPod(podName string, task TaskDescriptor, configuration
}
initCommand := []string{"sh", "-c", "mkfifo /pipes/stdout; mkfifo /pipes/stderr"}
if !configuration.IsEmpty() {
initCommand = []string{"sh", "-c", "mkfifo /pipes/stdout; mkfifo /pipes/stderr; cp /configmap/* /config/"}
initCommand = []string{"sh", "-c", "mkfifo /pipes/stdout; mkfifo /pipes/stderr; cp /configmap/* /config/; gunzip /config/*.gz"}
items := []v1.KeyToPath{}
for k := range configuration.ToMap() {
for _, k := range configuration.Keys() {
items = append(items, v1.KeyToPath{
Key: k,
Path: k + ".json",
Path: k + ".json.gz",
})
}
volumes = append(volumes, v1.Volume{
Expand Down
48 changes: 38 additions & 10 deletions sync-controller/task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"bytes"
"compress/gzip"
"github.com/jitsucom/bulker/jitsubase/jsonorder"
types2 "github.com/jitsucom/bulker/jitsubase/types"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -79,26 +81,52 @@ func (t *TaskConfiguration) IsEmpty() bool {
return t == nil || (t.Config == nil && t.Catalog == nil && t.State == nil)
}

func (t *TaskConfiguration) ToMap() map[string]string {
func gzipJson(json any) []byte {
var gzipBuffer bytes.Buffer
gzipWriter, _ := gzip.NewWriterLevel(&gzipBuffer, gzip.BestCompression)
encoder := jsonorder.NewEncoder(gzipWriter)
_ = encoder.Encode(json)
_ = gzipWriter.Close()
gzippedData := gzipBuffer.Bytes()
return gzippedData
}

func (t *TaskConfiguration) Keys() []string {
if t == nil {
return nil
}
m := make([]string, 0)
if t.Config != nil {
m = append(m, "config")
}
if t.Catalog != nil {
m = append(m, "catalog")
}
if t.State != nil {
m = append(m, "state")
}
if t.DestinationConfig != nil {
m = append(m, "destinationConfig")
}
return m
}

func (t *TaskConfiguration) ToMap() map[string][]byte {
if t == nil {
return nil
}
m := map[string]string{}
m := map[string][]byte{}
if t.Config != nil {
config, _ := jsonorder.Marshal(t.Config)
m["config"] = string(config)
m["config"] = gzipJson(t.Config)
}
if t.Catalog != nil {
catalog, _ := jsonorder.Marshal(t.Catalog)
m["catalog"] = string(catalog)
m["catalog"] = gzipJson(t.Catalog)
}
if t.State != nil {
state, _ := jsonorder.Marshal(t.State)
m["state"] = string(state)
m["state"] = gzipJson(t.State)
}
if t.DestinationConfig != nil {
destinationConfig, _ := jsonorder.Marshal(t.DestinationConfig)
m["destinationConfig"] = string(destinationConfig)
m["destinationConfig"] = gzipJson(t.DestinationConfig)
}
return m
}

0 comments on commit b3aa309

Please sign in to comment.