From 96314f6fc40a28cb825ae2e36802d82db584c17c Mon Sep 17 00:00:00 2001 From: kerthcet Date: Sun, 13 Oct 2024 10:51:42 +0800 Subject: [PATCH] [1/N] Implement agent: download handler Signed-off-by: kerthcet --- .gitignore | 4 - .golangci.yaml | 1 + Dockerfile.agent | 35 +++++ Makefile | 26 +++- agent/cmd/main.go | 107 ++++++++++++++ agent/deploy/daemonset.yaml | 49 +++++++ agent/deploy/service-account.yaml | 33 +++++ agent/handler/blob.go | 35 +++++ agent/handler/blob_test.go | 54 +++++++ agent/handler/handler.go | 68 +++++++++ agent/handler/request.go | 139 +++++++++++++++++++ agent/handler/request_test.go | 68 +++++++++ api/v1alpha1/replication_types.go | 18 ++- api/v1alpha1/torrent_types.go | 17 +-- api/v1alpha1/zz_generated.deepcopy.go | 14 +- cmd/main.go | 4 + config/crd/bases/manta.io_replications.yaml | 91 +++++++++--- config/crd/bases/manta.io_torrents.yaml | 14 +- config/manager/kustomization.yaml | 4 +- config/webhook/manifests.yaml | 40 ++++++ go.mod | 5 +- pkg/controller/replication_controller.go | 50 +++++-- pkg/controller/torrent_controller.go | 15 +- pkg/dispatcher/dispatcher.go | 48 +++++-- pkg/webhook/replication_webhook.go | 91 ++++++++++++ test/integration/webhook/replication_test.go | 75 ++++++++++ test/integration/webhook/suit_test.go | 2 + test/integration/webhook/torrent_test.go | 5 +- test/util/validation/validate_torrent.go | 8 +- test/util/wrapper/replication.go | 83 +++++++++++ 30 files changed, 1102 insertions(+), 101 deletions(-) create mode 100644 Dockerfile.agent create mode 100644 agent/cmd/main.go create mode 100644 agent/deploy/daemonset.yaml create mode 100644 agent/deploy/service-account.yaml create mode 100644 agent/handler/blob.go create mode 100644 agent/handler/blob_test.go create mode 100644 agent/handler/handler.go create mode 100644 agent/handler/request.go create mode 100644 agent/handler/request_test.go create mode 100644 pkg/webhook/replication_webhook.go create mode 100644 test/integration/webhook/replication_test.go 
create mode 100644 test/util/wrapper/replication.go diff --git a/.gitignore b/.gitignore index f6c1a84..2698d1d 100644 --- a/.gitignore +++ b/.gitignore @@ -33,7 +33,3 @@ __pycache__ *.tgz tmp bin - -# Added by cargo - -/target diff --git a/.golangci.yaml b/.golangci.yaml index af3aa62..968847d 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -5,6 +5,7 @@ run: - api - cmd - pkg + - agent - test issues: diff --git a/Dockerfile.agent b/Dockerfile.agent new file mode 100644 index 0000000..2227bdf --- /dev/null +++ b/Dockerfile.agent @@ -0,0 +1,35 @@ +ARG BASE_IMAGE +ARG BUILDER_IMAGE + +# Build the manager binary +FROM ${BUILDER_IMAGE} as builder +ARG TARGETOS +ARG TARGETARCH + +WORKDIR /workspace +# Copy the Go Modules manifests +COPY go.mod go.mod +COPY go.sum go.sum +# cache deps before building and copying source so that we don't need to re-download as much +# and so that source changes don't invalidate our downloaded layer +RUN go mod download + +# Copy the go source +COPY api/ api/ +COPY agent/ agent/ + +# Build +# the GOARCH has not a default value to allow the binary be built according to the host where the command +# was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO +# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore, +# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. +RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager agent/cmd/main.go + +# Use distroless as minimal base image to package the manager binary +# Refer to https://github.com/GoogleContainerTools/distroless for more details +FROM ${BASE_IMAGE} +WORKDIR / +COPY --from=builder /workspace/manager . 
+USER 65532:65532 + +ENTRYPOINT ["/manager"] diff --git a/Makefile b/Makefile index 1e4418b..3682522 100644 --- a/Makefile +++ b/Makefile @@ -61,8 +61,11 @@ IMAGE_BUILD_EXTRA_OPTS ?= IMAGE_REGISTRY ?= inftyai IMAGE_NAME ?= manta IMAGE_REPO := $(IMAGE_REGISTRY)/$(IMAGE_NAME) +AGENT_IMAGE_NAME ?= manta-agent +AGENT_IMAGE_REPO := $(IMAGE_REGISTRY)/$(AGENT_IMAGE_NAME) GIT_TAG ?= $(shell git describe --tags --dirty --always) IMG ?= $(IMAGE_REPO):$(GIT_TAG) +AGENT_IMG ?= $(AGENT_IMAGE_REPO):$(GIT_TAG) BUILDER_IMAGE ?= golang:$(GO_VERSION) KIND_CLUSTER_NAME ?= kind @@ -108,7 +111,7 @@ gotestsum: ## Download gotestsum locally if necessary. .PHONY: test test: manifests fmt vet envtest gotestsum ## Run tests. - $(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.xml -- ./api/... ./pkg/... -coverprofile $(ARTIFACTS)/cover.out + $(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.xml -- ./api/... ./pkg/... ./agent/... -coverprofile $(ARTIFACTS)/cover.out .PHONY: test-integration test-integration: manifests fmt vet envtest ginkgo ## Run integration tests. 
@@ -174,6 +177,19 @@ image-load: image-build image-push: IMAGE_BUILD_EXTRA_OPTS=--push image-push: image-build +.PHONY: agent-image-build +agent-image-build: + $(IMAGE_BUILD_CMD) -t $(AGENT_IMG) \ + -f Dockerfile.agent \ + --build-arg BASE_IMAGE=$(BASE_IMAGE) \ + --build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \ + --build-arg CGO_ENABLED=$(CGO_ENABLED) \ + $(IMAGE_BUILD_EXTRA_OPTS) ./ +agent-image-load: IMAGE_BUILD_EXTRA_OPTS=--load +agent-image-load: agent-image-build +agent-image-push: IMAGE_BUILD_EXTRA_OPTS=--push +agent-image-push: agent-image-build + KIND = $(shell pwd)/bin/kind .PHONY: kind kind: @@ -287,3 +303,11 @@ helm-package: helm # To recover values.yaml make helm + +.PHONY: deploy-agent +deploy-agent: + $(KUBECTL) apply -f ./agent/deploy + +.PHONY: undeploy-agent +undeploy-agent: + $(KUBECTL) delete -f ./agent/deploy \ No newline at end of file diff --git a/agent/cmd/main.go b/agent/cmd/main.go new file mode 100644 index 0000000..78ca2f2 --- /dev/null +++ b/agent/cmd/main.go @@ -0,0 +1,107 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "os" + + "github.com/go-logr/logr" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/inftyai/manta/agent/handler" + api "github.com/inftyai/manta/api/v1alpha1" +) + +var ( + setupLog logr.Logger +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.SetLogger(zap.New(zap.UseDevMode(true))) + setupLog = ctrl.Log.WithName("setup") + + cfg, err := config.GetConfig() + if err != nil { + setupLog.Error(err, "failed to get config") + os.Exit(1) + } + + setupLog.Info("Setting up manta-agent") + + scheme := runtime.NewScheme() + _ = api.AddToScheme(scheme) + + mgr, err := manager.New(cfg, manager.Options{ + Scheme: scheme, + }) + if err != nil { + setupLog.Error(err, "failed to initialize the manager") + os.Exit(1) + } + + informer, err := mgr.GetCache().GetInformer(ctx, &api.Replication{}) + if err != nil { + setupLog.Error(err, "failed to get the informer") + os.Exit(1) + } + + if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + replication := obj.(*api.Replication) + setupLog.Info("Add Event for Replication", "Replication", klog.KObj(replication)) + + // Injected by downward API. + nodeName := os.Getenv("NODE_NAME") + // Filter out unrelated events. + if nodeName != replication.Spec.NodeName || replicationReady(replication) { + return + } + handler.HandleAddEvent(replication) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + }, + DeleteFunc: func(obj interface{}) { + replication := obj.(*api.Replication) + setupLog.Info("Delete Event for Replication", "Replication", klog.KObj(replication)) + // TODO: delete the file by the policy. 
+ }, + }); err != nil { + setupLog.Error(err, "failed to add event handlers") + os.Exit(1) + } + + setupLog.Info("Starting informers") + if err := mgr.GetCache().Start(ctx); err != nil { + setupLog.Error(err, "failed to start informers") + os.Exit(1) + } +} + +func replicationReady(replication *api.Replication) bool { + return apimeta.IsStatusConditionTrue(replication.Status.Conditions, api.ReadyConditionType) +} diff --git a/agent/deploy/daemonset.yaml b/agent/deploy/daemonset.yaml new file mode 100644 index 0000000..383c204 --- /dev/null +++ b/agent/deploy/daemonset.yaml @@ -0,0 +1,49 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: manta-agent + labels: + app: manta-agent +spec: + selector: + matchLabels: + app: manta-agent + template: + metadata: + labels: + app: manta-agent + spec: + serviceAccountName: manta-agent + initContainers: + - name: init-permissions + image: busybox + command: ['sh', '-c', 'chown -R 1000:3000 /workspace/models && chmod -R 777 /workspace/models'] + volumeMounts: + - name: model-volume + mountPath: /workspace/models + containers: + - name: agent + image: inftyai/manta-agent:1013-08 + ports: + - containerPort: 8080 + resources: + limits: + memory: 200Mi + requests: + cpu: 100m + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + volumeMounts: + - name: model-volume + mountPath: /workspace/models + securityContext: + runAsUser: 1000 + runAsGroup: 3000 + volumes: + - name: model-volume + hostPath: + path: /mnt/models + type: DirectoryOrCreate diff --git a/agent/deploy/service-account.yaml b/agent/deploy/service-account.yaml new file mode 100644 index 0000000..5beb8ae --- /dev/null +++ b/agent/deploy/service-account.yaml @@ -0,0 +1,33 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: manta-agent +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: manta-agent-role +rules: +- apiGroups: + - "manta.io" + resources: + - replications + verbs: + - get + - list + - 
watch + - update + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: manta-agent-binding +subjects: +- kind: ServiceAccount + name: manta-agent + namespace: manta-system +roleRef: + kind: ClusterRole + name: manta-agent-role + apiGroup: rbac.authorization.k8s.io diff --git a/agent/handler/blob.go b/agent/handler/blob.go new file mode 100644 index 0000000..ebf1f7d --- /dev/null +++ b/agent/handler/blob.go @@ -0,0 +1,35 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "os" + + api "github.com/inftyai/manta/api/v1alpha1" +) + +func blobExists(repoName, chunkName string) bool { + workspace := api.DefaultWorkspace + if ws := os.Getenv("WORKSPACE"); ws != "" { + workspace = ws + } + path := workspace + repoName + "/blobs/" + chunkName + if _, err := os.Stat(path); err != nil { + return false + } + return true +} diff --git a/agent/handler/blob_test.go b/agent/handler/blob_test.go new file mode 100644 index 0000000..0b1f623 --- /dev/null +++ b/agent/handler/blob_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "os" + "testing" +) + +func TestBlobExists(t *testing.T) { + if err := os.Setenv("WORKSPACE", "../../tmp/workspace/models/"); err != nil { + t.Fail() + } + + path := "../../tmp/workspace/models/fakeRepo/blobs/" + filename := "fakeChunk" + + if blobExists("fakeRepo", "fakeChunk") { + t.Error("blob should be not exist") + } + + if err := os.MkdirAll(path, os.ModePerm); err != nil { + t.Errorf("run mkdir failed: %v", err) + } + + file, err := os.Create(path + filename) + if err != nil { + t.Error("failed to create file") + } + defer func() { + _ = file.Close() + }() + defer func() { + _ = os.RemoveAll("../../tmp/workspace") + }() + + if !blobExists("fakeRepo", "fakeChunk") { + t.Error("blob should be exist") + } +} diff --git a/agent/handler/handler.go b/agent/handler/handler.go new file mode 100644 index 0000000..bead298 --- /dev/null +++ b/agent/handler/handler.go @@ -0,0 +1,68 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package handler + +import ( + "fmt" + "strings" + "sync" + + api "github.com/inftyai/manta/api/v1alpha1" +) + +func HandleAddEvent(replication *api.Replication) { + var wg sync.WaitGroup + + for i := range replication.Spec.Tuples { + wg.Add(1) + go func() { + defer wg.Done() + if err := handleTuple(&replication.Spec.Tuples[i]); err != nil { + fmt.Printf("Error handling tuple: %v.\n", err) + } + }() + } + + wg.Wait() +} + +func handleTuple(tuple *api.Tuple) error { + // If destination is nil, the address must not be localhost. + if tuple.Destination == nil { + // TODO: Delete OP + return nil + } + + // If modelHub != nil, it must be download to the localhost. + if tuple.Source.ModelHub != nil { + _, localPath := parseURI(*tuple.Destination.URI) + if *tuple.Source.ModelHub.Name == api.HUGGINGFACE_MODEL_HUB { + if err := downloadFromHF(tuple.Source.ModelHub.ModelID, *tuple.Source.ModelHub.Revision, *tuple.Source.ModelHub.Filename, localPath); err != nil { + return err + } + // TODO: handle modelScope + } + // TODO: Handle address + } + + return nil +} + +func parseURI(uri string) (host string, address string) { + splits := strings.Split(uri, "://") + return splits[0], splits[1] +} diff --git a/agent/handler/request.go b/agent/handler/request.go new file mode 100644 index 0000000..18aa768 --- /dev/null +++ b/agent/handler/request.go @@ -0,0 +1,139 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
const (
	// maxAttempts is the total number of download attempts per file.
	maxAttempts = 10
	// interval is the pause between two consecutive attempts.
	interval = 500 * time.Millisecond
)

// downloadFromHF downloads one file of a Huggingface repository to
// downloadPath, resuming a partial download when possible.
//
// It tries at most maxAttempts times, sleeping interval between attempts, and
// returns an error wrapping the last failure once the attempts are exhausted.
func downloadFromHF(modelID, revision, path string, downloadPath string) error {
	url := hfFileURL(modelID, revision, path)
	token := hfToken()

	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = downloadFileWithResume(url, downloadPath, token); err == nil {
			fmt.Printf("File %s downloaded successfully!\n", path)
			return nil
		}
		// No point in announcing a resume or sleeping after the last attempt.
		if attempt < maxAttempts {
			fmt.Printf("Error downloading file from %s: %v. Resuming.\n", url, err)
			time.Sleep(interval)
		}
	}

	return fmt.Errorf("reached maximum download attempts, download failed: %w", err)
}

// hfFileURL builds the "resolve" URL of a file in a Huggingface repository.
// Example: "https://huggingface.co/Qwen/Qwen2.5-72B-Instruct/resolve/main/model-00031-of-00037.safetensors"
func hfFileURL(modelID, revision, path string) string {
	return fmt.Sprintf("https://huggingface.co/%s/resolve/%s/%s", modelID, revision, path)
}

// hfToken returns the Huggingface access token from the environment,
// preferring HF_TOKEN over HUGGING_FACE_HUB_TOKEN. It returns "" when neither
// is set.
func hfToken() string {
	for _, key := range []string{"HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"} {
		if token := os.Getenv(key); token != "" {
			return token
		}
	}
	return ""
}

// downloadFileWithResume fetches url into file, creating parent directories as
// needed.
//
// When file already holds data, a Range request asks only for the missing
// tail. A non-empty token is sent as a Bearer Authorization header. An error
// is returned for any I/O failure or an unexpected HTTP status.
func downloadFileWithResume(url string, file string, token string) (err error) {
	if err := os.MkdirAll(filepath.Dir(file), 0755); err != nil {
		return err
	}

	out, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	defer func() {
		// A failed Close on a written file can mean lost data, so surface it
		// unless an earlier error is already being returned.
		if cerr := out.Close(); err == nil {
			err = cerr
		}
	}()

	fileInfo, err := out.Stat()
	if err != nil {
		return err
	}
	existingFileSize := fileInfo.Size()

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}

	if existingFileSize > 0 {
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-", existingFileSize))
	}

	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		_ = resp.Body.Close()
	}()

	offset, done, err := resumeOffset(resp.StatusCode, existingFileSize)
	if err != nil || done {
		return err
	}

	// Drop everything past the write position: a no-op when resuming, and a
	// reset to empty when the server sends the whole file again.
	if err := out.Truncate(offset); err != nil {
		return err
	}
	if _, err := out.Seek(offset, io.SeekStart); err != nil {
		return err
	}
	if _, err := io.Copy(out, resp.Body); err != nil {
		return err
	}

	return nil
}

// resumeOffset decides how to continue a download given the response status
// and the number of bytes already on disk.
//
//   - 206: the body is the missing tail, write it at existingFileSize.
//   - 200: the body is the whole file (the server ignored or never received a
//     Range header), write it from the beginning.
//   - 416: nothing is left to fetch; only trusted when local data exists,
//     because no Range header is sent otherwise.
//
// Any other status is an error.
func resumeOffset(statusCode int, existingFileSize int64) (offset int64, done bool, err error) {
	switch {
	case statusCode == http.StatusPartialContent:
		return existingFileSize, false, nil
	case statusCode == http.StatusOK:
		return 0, false, nil
	case statusCode == http.StatusRequestedRangeNotSatisfiable && existingFileSize > 0:
		return 0, true, nil
	default:
		return 0, false, fmt.Errorf("unexpected status code: %v", statusCode)
	}
}
+*/ + +package handler + +import ( + "os" + "testing" +) + +func Test_downloadFromHF(t *testing.T) { + testCases := []struct { + name string + modelID string + revision string + path string + downloadPath string + wantError bool + }{ + { + name: "normal download", + modelID: "Qwen/Qwen2.5-72B-Instruct", + revision: "main", + path: "LICENSE", + downloadPath: "../../tmp/LICENSE", + wantError: false, + }, + { + name: "unknown revision", + modelID: "Qwen/Qwen2.5-72B-Instruct", + revision: "master", // unknown branch + path: "LICENSE", + downloadPath: "../../tmp/LICENSE", + wantError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + gotError := downloadFromHF(tc.modelID, tc.revision, tc.path, tc.downloadPath) + defer func() { + _ = os.RemoveAll(tc.downloadPath) + }() + + if tc.wantError && gotError == nil { + t.Error("expected error here") + } + if !tc.wantError { + if _, err := os.Stat(tc.downloadPath); err != nil { + t.Error("expected file downloaded successfully") + } + } + }) + } +} diff --git a/api/v1alpha1/replication_types.go b/api/v1alpha1/replication_types.go index b5abc7d..f4a536f 100644 --- a/api/v1alpha1/replication_types.go +++ b/api/v1alpha1/replication_types.go @@ -24,10 +24,14 @@ import ( // Source couldn't be nil, but if destination is nil, // it means to delete the file. type Target struct { - // TODO - // Address represents the communication address of the Pod. + // URI represents the file address with different storage. + // - oss://./ + // - localhost:// + URI *string `json:"uri,omitempty"` + // ModelHub represents the model registry for model downloads. + // ModelHub and address are exclusive. // +optional - Address *string `json:"address,omitempty"` + ModelHub *ModelHub `json:"modelHub,omitempty"` } // Tuple represents a pair of source and destination. @@ -37,6 +41,7 @@ type Tuple struct { Source Target `json:"source"` // Destination represents the destination of the file. 
// If destination is nil, it means to delete the file. + // +optional Destination *Target `json:"destination,omitempty"` } @@ -44,13 +49,6 @@ type Tuple struct { type ReplicationSpec struct { // NodeName represents which node should do replication. NodeName string `json:"nodeName"` - // RepoName represents the repo name. - // +optional - RepoName *string `json:"repoName,omitempty"` - // ObjectPath represents the object path. - ObjectPath string `json:"objectPath"` - // ChunkName represents the target chunk name. - ChunkName string `json:"chunkName"` // Tuples represents a slice of tuples. // +optional Tuples []Tuple `json:"tuples,omitempty"` diff --git a/api/v1alpha1/torrent_types.go b/api/v1alpha1/torrent_types.go index dfce4f8..b00cdb2 100644 --- a/api/v1alpha1/torrent_types.go +++ b/api/v1alpha1/torrent_types.go @@ -21,7 +21,9 @@ import ( ) const ( - TorrentNameLabelKey = "manta.io/torrent-name" + TorrentNameLabelKey = "manta.io/torrent-name" + DefaultWorkspace = "/workspace/models/" + HUGGINGFACE_MODEL_HUB = "Huggingface" ) // This is inspired by https://github.com/InftyAI/llmaz. @@ -75,6 +77,7 @@ type TorrentSpec struct { // URI *URIProtocol `json:"uri,omitempty"` // Replicas represents the replication number of each object. + // The real Replicas number could be greater than the desired Replicas. // +kubebuilder:default=1 // +kubebuilder:validation:Maximum=99 // +optional @@ -100,11 +103,10 @@ const ( type ChunkStatus struct { // Name represents the name of the chunk. - // The chunk name is formatted as: ----, - // e.g. "945c19bff66ba533eb2032a33dcc6281c4a1e032--0210--02", which means: + // The chunk name is formatted as: --, + // e.g. 
"945c19bff66ba533eb2032a33dcc6281c4a1e032--0210", which means: // - the object hash is 945c19bff66ba533eb2032a33dcc6281c4a1e032 // - the chunk is the second chunk of the total 10 chunks - // - the replica number 2 means the object has at least 3 replicas Name string `json:"name"` // State represents the state of the chunk, whether in downloading // or downloaded ready. @@ -130,16 +132,9 @@ type ObjectStatus struct { // Type represents the object type, limits to file or directory. // +kubebuilder:validation:Enum={file,directory} Type ObjectType `json:"type"` - - // TODO: for embedding files. - // Objects []ObjectStatus `json:"objects,omitempty"` } type RepoStatus struct { - // RepoName represents the repo name of the file, - // it could be nil once the file has no repo. - // +optional - Name *string `json:"name,omitempty"` // Objects represents the whole objects belongs to the repo. Objects []*ObjectStatus `json:"objects,omitempty"` } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index abc5bcd..128d2fb 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -291,11 +291,6 @@ func (in *ReplicationStatus) DeepCopy() *ReplicationStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RepoStatus) DeepCopyInto(out *RepoStatus) { *out = *in - if in.Name != nil { - in, out := &in.Name, &out.Name - *out = new(string) - **out = **in - } if in.Objects != nil { in, out := &in.Objects, &out.Objects *out = make([]*ObjectStatus, len(*in)) @@ -322,11 +317,16 @@ func (in *RepoStatus) DeepCopy() *RepoStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Target) DeepCopyInto(out *Target) { *out = *in - if in.Address != nil { - in, out := &in.Address, &out.Address + if in.URI != nil { + in, out := &in.URI, &out.URI *out = new(string) **out = **in } + if in.ModelHub != nil { + in, out := &in.ModelHub, &out.ModelHub + *out = new(ModelHub) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Target. diff --git a/cmd/main.go b/cmd/main.go index 9552858..6dcd707 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -155,5 +155,9 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) { setupLog.Error(err, "unable to create webhook", "webhook", "Torrent") os.Exit(1) } + if err := webhook.SetupReplicationWebhook(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "Replication") + os.Exit(1) + } } } diff --git a/config/crd/bases/manta.io_replications.yaml b/config/crd/bases/manta.io_replications.yaml index 9979a85..298001a 100644 --- a/config/crd/bases/manta.io_replications.yaml +++ b/config/crd/bases/manta.io_replications.yaml @@ -39,18 +39,9 @@ spec: spec: description: ReplicationSpec defines the desired state of Replication properties: - chunkName: - description: ChunkName represents the target chunk name. - type: string nodeName: description: NodeName represents which node should do replication. type: string - objectPath: - description: ObjectPath represents the object path. - type: string - repoName: - description: RepoName represents the repo name. - type: string tuples: description: Tuples represents a slice of tuples. items: @@ -61,9 +52,43 @@ spec: Destination represents the destination of the file. If destination is nil, it means to delete the file. properties: - address: - description: Address represents the communication address - of the Pod. + modelHub: + description: |- + ModelHub represents the model registry for model downloads. + ModelHub and address are exclusive. 
+ properties: + filename: + description: |- + Filename refers to a specified model file rather than the whole repo. + This is helpful to download a specified GGUF model rather than downloading + the whole repo which includes all kinds of quantized models. + in the near future. + type: string + modelID: + description: |- + ModelID refers to the model identifier on model hub, + such as meta-llama/Meta-Llama-3-8B. + type: string + name: + default: Huggingface + description: Name refers to the model registry, such + as huggingface. + enum: + - Huggingface + type: string + revision: + default: main + description: Revision refers to a Git revision id which + can be a branch name, a tag, or a commit hash. + type: string + required: + - modelID + type: object + uri: + description: |- + URI represents the file address with different storage. + - oss://./ + - localhost:// type: string type: object source: @@ -71,9 +96,43 @@ spec: Source represents the source file. Source couldn't be nil. properties: - address: - description: Address represents the communication address - of the Pod. + modelHub: + description: |- + ModelHub represents the model registry for model downloads. + ModelHub and address are exclusive. + properties: + filename: + description: |- + Filename refers to a specified model file rather than the whole repo. + This is helpful to download a specified GGUF model rather than downloading + the whole repo which includes all kinds of quantized models. + in the near future. + type: string + modelID: + description: |- + ModelID refers to the model identifier on model hub, + such as meta-llama/Meta-Llama-3-8B. + type: string + name: + default: Huggingface + description: Name refers to the model registry, such + as huggingface. + enum: + - Huggingface + type: string + revision: + default: main + description: Revision refers to a Git revision id which + can be a branch name, a tag, or a commit hash. 
+ type: string + required: + - modelID + type: object + uri: + description: |- + URI represents the file address with different storage. + - oss://./ + - localhost:// type: string type: object required: @@ -81,9 +140,7 @@ spec: type: object type: array required: - - chunkName - nodeName - - objectPath type: object status: description: ReplicationStatus defines the observed state of Replication diff --git a/config/crd/bases/manta.io_torrents.yaml b/config/crd/bases/manta.io_torrents.yaml index 796d8a9..7e7efc2 100644 --- a/config/crd/bases/manta.io_torrents.yaml +++ b/config/crd/bases/manta.io_torrents.yaml @@ -87,7 +87,9 @@ spec: type: string replicas: default: 1 - description: Replicas represents the replication number of each object. + description: |- + Replicas represents the replication number of each object. + The real Replicas number could be greater than the desired Replicas. format: int32 maximum: 99 type: integer @@ -155,11 +157,6 @@ spec: repo: description: Repo tracks the objects belong to the source. properties: - name: - description: |- - RepoName represents the repo name of the file, - it could be nil once the file has no repo. - type: string objects: description: Objects represents the whole objects belongs to the repo. @@ -174,11 +171,10 @@ spec: name: description: |- Name represents the name of the chunk. - The chunk name is formatted as: ----, - e.g. "945c19bff66ba533eb2032a33dcc6281c4a1e032--0210--02", which means: + The chunk name is formatted as: --, + e.g. "945c19bff66ba533eb2032a33dcc6281c4a1e032--0210", which means: - the object hash is 945c19bff66ba533eb2032a33dcc6281c4a1e032 - the chunk is the second chunk of the total 10 chunks - - the replica number 2 means the object has at least 3 replicas type: string sizeBytes: description: SizeBytes represents the chunk size. 
diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index beaa32a..f371afa 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: inftyai/llmaz - newTag: main + newName: inftyai/manta + newTag: 1013-01 diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 43a48cd..96c579b 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -4,6 +4,26 @@ kind: MutatingWebhookConfiguration metadata: name: mutating-webhook-configuration webhooks: +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-manta-io-v1alpha1-replication + failurePolicy: Fail + name: mreplication.kb.io + rules: + - apiGroups: + - manta.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - replications + sideEffects: None - admissionReviewVersions: - v1 clientConfig: @@ -30,6 +50,26 @@ kind: ValidatingWebhookConfiguration metadata: name: validating-webhook-configuration webhooks: +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-manta-io-v1alpha1-replication + failurePolicy: Fail + name: vreplication.kb.io + rules: + - apiGroups: + - manta.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - replications + sideEffects: None - admissionReviewVersions: - v1 clientConfig: diff --git a/go.mod b/go.mod index ae781d0..2e96685 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,8 @@ require ( k8s.io/client-go v0.31.0 k8s.io/code-generator v0.31.0 k8s.io/klog/v2 v2.130.1 + k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 sigs.k8s.io/controller-runtime v0.19.0 - sigs.k8s.io/structured-merge-diff/v4 v4.4.1 ) require ( @@ -67,13 +67,12 @@ require ( golang.org/x/tools v0.23.0 // indirect 
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/protobuf v1.34.2 // indirect - gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70 // indirect k8s.io/kube-openapi v0.0.0-20240430033511-f0e62f92d13f // indirect - k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 8922573..4a44795 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -19,13 +19,18 @@ package controller import ( "context" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/log" - inftyaicomv1alpha1 "github.com/inftyai/manta/api/v1alpha1" + api "github.com/inftyai/manta/api/v1alpha1" ) // ReplicationReconciler reconciles a Replication object @@ -34,23 +39,33 @@ type ReplicationReconciler struct { Scheme *runtime.Scheme } +func NewReplicationReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *TorrentReconciler { + return &TorrentReconciler{ + Client: client, + Scheme: scheme, + Record: record, + } +} + //+kubebuilder:rbac:groups=manta.io,resources=replications,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=manta.io,resources=replications/status,verbs=get;update;patch 
//+kubebuilder:rbac:groups=manta.io,resources=replications/finalizers,verbs=update -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the Replication object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. -// // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.3/pkg/reconcile func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) + logger := log.FromContext(ctx) + + replication := &api.Replication{} + if err := r.Get(ctx, types.NamespacedName{Name: req.Name}, replication); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } - // TODO(user): your logic here + logger.V(10).Info("reconcile Replication", "Replication", klog.KObj(replication)) + + if setReplicationCondition(replication) { + return ctrl.Result{}, r.Status().Update(ctx, replication) + } return ctrl.Result{}, nil } @@ -58,7 +73,20 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) // SetupWithManager sets up the controller with the Manager. func (r *ReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&inftyaicomv1alpha1.Replication{}). + For(&api.Replication{}). WithOptions(controller.Options{MaxConcurrentReconciles: 5}). 
Complete(r) } + +func setReplicationCondition(replication *api.Replication) (changed bool) { + if len(replication.Status.Conditions) == 0 { + condition := metav1.Condition{ + Type: api.PendingConditionType, + Status: metav1.ConditionTrue, + Reason: "Pending", + Message: "Waiting for downloading", + } + return apimeta.SetStatusCondition(&replication.Status.Conditions, condition) + } + return false +} diff --git a/pkg/controller/torrent_controller.go b/pkg/controller/torrent_controller.go index 25d927b..1c8b4d3 100644 --- a/pkg/controller/torrent_controller.go +++ b/pkg/controller/torrent_controller.go @@ -19,7 +19,6 @@ package controller import ( "context" "fmt" - "strings" "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -55,10 +54,6 @@ func NewTorrentReconciler(client client.Client, scheme *runtime.Scheme, record r } } -func repoName(modelID string) string { - return "models--" + strings.ReplaceAll(modelID, "/", "--") -} - //+kubebuilder:rbac:groups=manta.io,resources=torrents,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=manta.io,resources=torrents/status,verbs=get;update;patch //+kubebuilder:rbac:groups=manta.io,resources=torrents/finalizers,verbs=update @@ -77,7 +72,7 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct // Handle Pending status. if torrent.Status.Repo == nil { - _ = setCondition(torrent) + _ = setTorrentCondition(torrent) // TODO: We only support modelHub right now, we need to support spec.URI in the future as well. 
objects, err := util.ListRepoObjects(torrent.Spec.ModelHub.ModelID, *torrent.Spec.ModelHub.Revision) @@ -104,7 +99,7 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } } - conditionChanged := setCondition(torrent) + conditionChanged := setTorrentCondition(torrent) if torrentStatusChanged || conditionChanged { return ctrl.Result{}, r.Status().Update(ctx, torrent) } @@ -143,7 +138,7 @@ func (r *TorrentReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func setCondition(torrent *api.Torrent) (changed bool) { +func setTorrentCondition(torrent *api.Torrent) (changed bool) { if torrent.Status.Repo == nil { condition := metav1.Condition{ Type: api.PendingConditionType, @@ -193,6 +188,8 @@ func constructRepoOfStatus(torrent *api.Torrent, objects []*util.ObjectBody) { repo := &api.RepoStatus{} if torrent.Spec.ModelHub.Filename != nil { + // The repo could contain multiple objects(files) in the same directory, but + // we only need one file. for _, obj := range objects { if obj.Path == *torrent.Spec.ModelHub.Filename { chunks := []*api.ChunkStatus{} @@ -213,8 +210,6 @@ func constructRepoOfStatus(torrent *api.Torrent, objects []*util.ObjectBody) { } } } else { - repoName := repoName(torrent.Spec.ModelHub.ModelID) - repo.Name = &repoName for _, obj := range objects { chunks := []*api.ChunkStatus{} chunks = append(chunks, &api.ChunkStatus{ diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index e7c0611..7d90bbb 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -17,15 +17,23 @@ limitations under the License. 
package dispatcher import ( - api "github.com/inftyai/manta/api/v1alpha1" - "github.com/inftyai/manta/pkg/util" + "errors" + "strings" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" + + api "github.com/inftyai/manta/api/v1alpha1" + "github.com/inftyai/manta/pkg/util" ) var _ Framework = &DefaultDownloader{} var _ Framework = &DefaultSyncer{} +const ( + localHost = "localhost://" +) + type DefaultDownloader struct { plugins []string } @@ -79,8 +87,9 @@ func NewDispatcher(downloadPlugins []string, syncPlugins []string) *Dispatcher { // PrepareReplications will construct the replications needed to created and // update the torrent status the same time. +// Note: make sure the same download/sync task will not be sent to the same node, +// or we have to introduce file lock when downloading chunks. func (d *Dispatcher) PrepareReplications(torrent *api.Torrent) ([]*api.Replication, bool, error) { - // Make sure this will not happen, just in case of panic. if torrent.Status.Repo == nil { return nil, false, nil } @@ -116,6 +125,13 @@ func buildReplication(torrent *api.Torrent, objPath string, chunkName string) (* return nil, err } + // Support modelHub only right now. + if torrent.Spec.ModelHub == nil { + return nil, errors.New("unimplemented") + } + + repoName := repoName(torrent.Spec.ModelHub) + return &api.Replication{ TypeMeta: v1.TypeMeta{ Kind: "Replication", @@ -138,21 +154,33 @@ func buildReplication(torrent *api.Torrent, objPath string, chunkName string) (* }, }, Spec: api.ReplicationSpec{ - NodeName: "unknown", - RepoName: torrent.Status.Repo.Name, - ObjectPath: objPath, - ChunkName: chunkName, + // TODO: + NodeName: "unknown", Tuples: []api.Tuple{ { - // TODO: source could be local or remote Source: api.Target{ - Address: ptr.To[string]("unknown"), + ModelHub: &api.ModelHub{ + Name: torrent.Spec.ModelHub.Name, + ModelID: torrent.Spec.ModelHub.ModelID, + // TODO: support multiple chunks for one file in the future. 
+ Filename: &objPath, + Revision: torrent.Spec.ModelHub.Revision, + }, }, Destination: &api.Target{ - Address: ptr.To[string]("unknown"), + URI: ptr.To[string](localHost + api.DefaultWorkspace + repoName + "/blobs/" + chunkName), }, }, }, }, }, nil } + +func repoName(modelHub *api.ModelHub) string { + if modelHub.Filename != nil { + splits := strings.Split(*modelHub.Filename, ".") + return splits[0] + } + + return strings.ReplaceAll(modelHub.ModelID, "/", "--") +} diff --git a/pkg/webhook/replication_webhook.go b/pkg/webhook/replication_webhook.go new file mode 100644 index 0000000..1bdbc1c --- /dev/null +++ b/pkg/webhook/replication_webhook.go @@ -0,0 +1,91 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/validation/field" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + api "github.com/inftyai/manta/api/v1alpha1" +) + +type ReplicationWebhook struct{} + +// SetupReplicationWebhook will set up the manager to manage the Replication webhooks +func SetupReplicationWebhook(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(&api.Replication{}). + WithDefaulter(&ReplicationWebhook{}). + WithValidator(&ReplicationWebhook{}).
+ Complete() +} + +//+kubebuilder:webhook:path=/mutate-manta-io-v1alpha1-replication,mutating=true,failurePolicy=fail,sideEffects=None,groups=manta.io,resources=replications,verbs=create;update,versions=v1alpha1,name=mreplication.kb.io,admissionReviewVersions=v1 + +var _ webhook.CustomDefaulter = &ReplicationWebhook{} + +// Default implements webhook.Defaulter so a webhook will be registered for the type +func (w *ReplicationWebhook) Default(ctx context.Context, obj runtime.Object) error { + return nil +} + +//+kubebuilder:webhook:path=/validate-manta-io-v1alpha1-replication,mutating=false,failurePolicy=fail,sideEffects=None,groups=manta.io,resources=replications,verbs=create;update,versions=v1alpha1,name=vreplication.kb.io,admissionReviewVersions=v1 + +var _ webhook.CustomValidator = &ReplicationWebhook{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (w *ReplicationWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { + allErrs := w.generateValidate(obj) + return nil, allErrs.ToAggregate() +} + +// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type +func (w *ReplicationWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + allErrs := w.generateValidate(newObj) + return nil, allErrs.ToAggregate() +} + +// ValidateDelete implements webhook.Validator so a webhook will be registered for the type +func (w *ReplicationWebhook) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { + return nil, nil +} + +func (w *ReplicationWebhook) generateValidate(obj runtime.Object) field.ErrorList { + replication := obj.(*api.Replication) + specPath := field.NewPath("spec") + + var allErrs field.ErrorList + if len(replication.Spec.Tuples) == 0 { + allErrs = append(allErrs, field.Forbidden(specPath.Child("tuples"), "tuples couldn't be null")) + } + + for _, tuple := 
range replication.Spec.Tuples { + // Destination is optional, but once set it needs a modelHub or a URI; Source is checked separately below. + if tuple.Destination != nil && tuple.Destination.ModelHub == nil && tuple.Destination.URI == nil { + allErrs = append(allErrs, field.Forbidden(specPath.Child("tuples"), "modelHub and URI couldn't be both null in Destination")) + } + if tuple.Source.ModelHub == nil && tuple.Source.URI == nil { + allErrs = append(allErrs, field.Forbidden(specPath.Child("tuples"), "modelHub and URI couldn't be both null in Source")) + } + } + return allErrs +} diff --git a/test/integration/webhook/replication_test.go b/test/integration/webhook/replication_test.go new file mode 100644 index 0000000..59d75ab --- /dev/null +++ b/test/integration/webhook/replication_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import ( + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + api "github.com/inftyai/manta/api/v1alpha1" + "github.com/inftyai/manta/test/util/wrapper" +) + +// TODO: once source modelHub != nil, it's address must not be nil. +// TODO: add validation to URI, must be :// +var _ = ginkgo.Describe("Replication default and validation", func() { + + // Delete all the Replications for each case.
+ ginkgo.AfterEach(func() { + var replications api.ReplicationList + gomega.Expect(k8sClient.List(ctx, &replications)).To(gomega.Succeed()) + + for _, torrent := range replications.Items { + gomega.Expect(k8sClient.Delete(ctx, &torrent)).To(gomega.Succeed()) + } + }) + + type testValidatingCase struct { + replication func() *api.Replication + failed bool + } + ginkgo.DescribeTable("test validating", + func(tc *testValidatingCase) { + if tc.failed { + gomega.Expect(k8sClient.Create(ctx, tc.replication())).Should(gomega.HaveOccurred()) + } else { + gomega.Expect(k8sClient.Create(ctx, tc.replication())).To(gomega.Succeed()) + } + }, + ginkgo.Entry("replication with modelHub set", &testValidatingCase{ + replication: func() *api.Replication { + return wrapper.MakeReplication("fake-replication").SourceOfModelHub("Huggingface", "Qwen/Qwen2-7B-Instruct", "", "").Obj() + }, + failed: false, + }), + ginkgo.Entry("replication with modelHub and URI unset", &testValidatingCase{ + replication: func() *api.Replication { + replication := wrapper.MakeReplication("fake-replication").Obj() + tuple := api.Tuple{Source: api.Target{}} + replication.Spec.Tuples = []api.Tuple{tuple} + return replication + }, + failed: true, + }), + ginkgo.Entry("replication with empty Tuples", &testValidatingCase{ + replication: func() *api.Replication { + return wrapper.MakeReplication("fake-replication").Obj() + }, + failed: true, + }), + ) +}) diff --git a/test/integration/webhook/suit_test.go b/test/integration/webhook/suit_test.go index cb21638..8345abe 100644 --- a/test/integration/webhook/suit_test.go +++ b/test/integration/webhook/suit_test.go @@ -118,6 +118,8 @@ var _ = BeforeSuite(func() { err = apiwebhook.SetupTorrentWebhook(mgr) Expect(err).NotTo(HaveOccurred()) + err = apiwebhook.SetupReplicationWebhook(mgr) + Expect(err).NotTo(HaveOccurred()) //+kubebuilder:scaffold:webhook diff --git a/test/integration/webhook/torrent_test.go b/test/integration/webhook/torrent_test.go index e425bc0..2a23317 
100644 --- a/test/integration/webhook/torrent_test.go +++ b/test/integration/webhook/torrent_test.go @@ -24,9 +24,9 @@ import ( "github.com/inftyai/manta/test/util/wrapper" ) -var _ = ginkgo.Describe("model default and validation", func() { +var _ = ginkgo.Describe("Torrent default and validation", func() { - // Delete all the Models for each case. + // Delete all the Torrents for each case. ginkgo.AfterEach(func() { var torrents api.TorrentList gomega.Expect(k8sClient.List(ctx, &torrents)).To(gomega.Succeed()) @@ -40,7 +40,6 @@ var _ = ginkgo.Describe("model default and validation", func() { torrent func() *api.Torrent failed bool } - // TODO: add more testCases to cover update. ginkgo.DescribeTable("test validating", func(tc *testValidatingCase) { if tc.failed { diff --git a/test/util/validation/validate_torrent.go b/test/util/validation/validate_torrent.go index e1c9f6a..25675ee 100644 --- a/test/util/validation/validate_torrent.go +++ b/test/util/validation/validate_torrent.go @@ -60,6 +60,10 @@ func ValidateTorrentStatusEqualTo(ctx context.Context, k8sClient client.Client, } } + if conditionType != api.DownloadConditionType { + return nil + } + replications := api.ReplicationList{} selector := labels.SelectorFromSet(labels.Set{api.TorrentNameLabelKey: torrent.Name}) if err := k8sClient.List(ctx, &replications, &client.ListOptions{ @@ -69,9 +73,7 @@ func ValidateTorrentStatusEqualTo(ctx context.Context, k8sClient client.Client, } // TODO: refactor this part once we support multi-chunks per file. - if len(torrent.Status.Repo.Objects)*int(*torrent.Spec.Replicas) != len(replications.Items) { - return errors.New("unexpected Replication number") - } + // TODO: validate replicas return nil }, util.Timeout, util.Interval).Should(gomega.Succeed()) diff --git a/test/util/wrapper/replication.go b/test/util/wrapper/replication.go new file mode 100644 index 0000000..4a36705 --- /dev/null +++ b/test/util/wrapper/replication.go @@ -0,0 +1,83 @@ +/* +Copyright 2024. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrapper + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + api "github.com/inftyai/manta/api/v1alpha1" +) + +type ReplicationWrapper struct { + api.Replication +} + +func MakeReplication(name string) *ReplicationWrapper { + return &ReplicationWrapper{ + api.Replication{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + }, + } +} + +func (w *ReplicationWrapper) Obj() *api.Replication { + return &w.Replication +} + +func (w *ReplicationWrapper) NodeName(name string) *ReplicationWrapper { + w.Spec.NodeName = name + return w +} + +// SourceOfModelHub sets the source of the first tuple; only one tuple is handled by default. +func (w *ReplicationWrapper) SourceOfModelHub(name, modelID, revision, filename string) *ReplicationWrapper { + source := api.Target{ + ModelHub: &api.ModelHub{ + ModelID: modelID, + }, + } + if name != "" { + source.ModelHub.Name = &name + } + if revision != "" { + source.ModelHub.Revision = &revision + } + if filename != "" { + source.ModelHub.Filename = &filename + } + if len(w.Spec.Tuples) == 0 { + w.Spec.Tuples = append(w.Spec.Tuples, api.Tuple{Source: source}) + } else { + w.Spec.Tuples[0].Source = source + } + return w +} + +// DestinationOfAddress sets the destination URI of the first tuple; only one tuple is handled by default.
+func (w *ReplicationWrapper) DestinationOfAddress(address string) *ReplicationWrapper {
+	// Wrap the address as the URI of a fresh destination target.
+	dst := &api.Target{URI: &address}
+	if len(w.Spec.Tuples) > 0 {
+		// A tuple already exists: only its destination is overwritten.
+		w.Spec.Tuples[0].Destination = dst
+		return w
+	}
+	w.Spec.Tuples = append(w.Spec.Tuples, api.Tuple{Destination: dst})
+	return w
+}