Skip to content

Commit

Permalink
[1/N] Implement agent: download handler
Browse files Browse the repository at this point in the history
Signed-off-by: kerthcet <[email protected]>
  • Loading branch information
kerthcet committed Oct 13, 2024
1 parent 4d0a92b commit 96314f6
Show file tree
Hide file tree
Showing 30 changed files with 1,102 additions and 101 deletions.
4 changes: 0 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,3 @@ __pycache__
*.tgz
tmp
bin

# Added by cargo

/target
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ run:
- api
- cmd
- pkg
- agent
- test

issues:
Expand Down
35 changes: 35 additions & 0 deletions Dockerfile.agent
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
ARG BASE_IMAGE
ARG BUILDER_IMAGE

# Build the manager binary
FROM ${BUILDER_IMAGE} as builder
ARG TARGETOS
ARG TARGETARCH

WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download

# Copy the go source
COPY api/ api/
COPY agent/ agent/

# Build
# the GOARCH has not a default value to allow the binary be built according to the host where the command
# was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO
# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore,
# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform.
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager agent/cmd/main.go

# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM ${BASE_IMAGE}
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532

ENTRYPOINT ["/manager"]
26 changes: 25 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ IMAGE_BUILD_EXTRA_OPTS ?=
IMAGE_REGISTRY ?= inftyai
IMAGE_NAME ?= manta
IMAGE_REPO := $(IMAGE_REGISTRY)/$(IMAGE_NAME)
AGENT_IMAGE_NAME ?= manta-agent
AGENT_IMAGE_REPO := $(IMAGE_REGISTRY)/$(AGENT_IMAGE_NAME)
GIT_TAG ?= $(shell git describe --tags --dirty --always)
IMG ?= $(IMAGE_REPO):$(GIT_TAG)
AGENT_IMG ?= $(AGENT_IMAGE_REPO):$(GIT_TAG)
BUILDER_IMAGE ?= golang:$(GO_VERSION)
KIND_CLUSTER_NAME ?= kind

Expand Down Expand Up @@ -108,7 +111,7 @@ gotestsum: ## Download gotestsum locally if necessary.

.PHONY: test
test: manifests fmt vet envtest gotestsum ## Run tests.
$(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.xml -- ./api/... ./pkg/... -coverprofile $(ARTIFACTS)/cover.out
$(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.xml -- ./api/... ./pkg/... ./agent/... -coverprofile $(ARTIFACTS)/cover.out

.PHONY: test-integration
test-integration: manifests fmt vet envtest ginkgo ## Run integration tests.
Expand Down Expand Up @@ -174,6 +177,19 @@ image-load: image-build
image-push: IMAGE_BUILD_EXTRA_OPTS=--push
image-push: image-build

.PHONY: agent-image-build
agent-image-build:
$(IMAGE_BUILD_CMD) -t $(AGENT_IMG) \
-f Dockerfile.agent \
--build-arg BASE_IMAGE=$(BASE_IMAGE) \
--build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \
--build-arg CGO_ENABLED=$(CGO_ENABLED) \
$(IMAGE_BUILD_EXTRA_OPTS) ./
agent-image-load: IMAGE_BUILD_EXTRA_OPTS=--load
agent-image-load: agent-image-build
agent-image-push: IMAGE_BUILD_EXTRA_OPTS=--push
agent-image-push: agent-image-build

KIND = $(shell pwd)/bin/kind
.PHONY: kind
kind:
Expand Down Expand Up @@ -287,3 +303,11 @@ helm-package: helm

# To recover values.yaml
make helm

.PHONY: deploy-agent
deploy-agent:
$(KUBECTL) apply -f ./agent/deploy

.PHONY: undeploy-agent
undeploy-agent:
$(KUBECTL) delete -f ./agent/deploy
107 changes: 107 additions & 0 deletions agent/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"os"

"github.com/go-logr/logr"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/inftyai/manta/agent/handler"
api "github.com/inftyai/manta/api/v1alpha1"
)

var (
setupLog logr.Logger
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
setupLog = ctrl.Log.WithName("setup")

cfg, err := config.GetConfig()
if err != nil {
setupLog.Error(err, "failed to get config")
os.Exit(1)
}

setupLog.Info("Setting up manta-agent")

scheme := runtime.NewScheme()
_ = api.AddToScheme(scheme)

mgr, err := manager.New(cfg, manager.Options{
Scheme: scheme,
})
if err != nil {
setupLog.Error(err, "failed to initialize the manager")
os.Exit(1)
}

informer, err := mgr.GetCache().GetInformer(ctx, &api.Replication{})
if err != nil {
setupLog.Error(err, "failed to get the informer")
os.Exit(1)
}

if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
replication := obj.(*api.Replication)
setupLog.Info("Add Event for Replication", "Replication", klog.KObj(replication))

// Injected by downward API.
nodeName := os.Getenv("NODE_NAME")
// Filter out unrelated events.
if nodeName != replication.Spec.NodeName || replicationReady(replication) {
return
}
handler.HandleAddEvent(replication)
},
UpdateFunc: func(oldObj, newObj interface{}) {
},
DeleteFunc: func(obj interface{}) {
replication := obj.(*api.Replication)
setupLog.Info("Delete Event for Replication", "Replication", klog.KObj(replication))
// TODO: delete the file by the policy.
},
}); err != nil {
setupLog.Error(err, "failed to add event handlers")
os.Exit(1)
}

setupLog.Info("Starting informers")
if err := mgr.GetCache().Start(ctx); err != nil {
setupLog.Error(err, "failed to start informers")
os.Exit(1)
}
}

func replicationReady(replication *api.Replication) bool {
return apimeta.IsStatusConditionTrue(replication.Status.Conditions, api.ReadyConditionType)
}
49 changes: 49 additions & 0 deletions agent/deploy/daemonset.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: manta-agent
labels:
app: manta-agent
spec:
selector:
matchLabels:
app: manta-agent
template:
metadata:
labels:
app: manta-agent
spec:
serviceAccountName: manta-agent
initContainers:
- name: init-permissions
image: busybox
command: ['sh', '-c', 'chown -R 1000:3000 /workspace/models && chmod -R 777 /workspace/models']
volumeMounts:
- name: model-volume
mountPath: /workspace/models
containers:
- name: agent
image: inftyai/manta-agent:1013-08
ports:
- containerPort: 8080
resources:
limits:
memory: 200Mi
requests:
cpu: 100m
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: model-volume
mountPath: /workspace/models
securityContext:
runAsUser: 1000
runAsGroup: 3000
volumes:
- name: model-volume
hostPath:
path: /mnt/models
type: DirectoryOrCreate
33 changes: 33 additions & 0 deletions agent/deploy/service-account.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: manta-agent
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: manta-agent-role
rules:
- apiGroups:
- "manta.io"
resources:
- replications
verbs:
- get
- list
- watch
- update
- patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: manta-agent-binding
subjects:
- kind: ServiceAccount
name: manta-agent
namespace: manta-system
roleRef:
kind: ClusterRole
name: manta-agent-role
apiGroup: rbac.authorization.k8s.io
35 changes: 35 additions & 0 deletions agent/handler/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"os"

api "github.com/inftyai/manta/api/v1alpha1"
)

func blobExists(repoName, chunkName string) bool {
workspace := api.DefaultWorkspace
if ws := os.Getenv("WORKSPACE"); ws != "" {
workspace = ws
}
path := workspace + repoName + "/blobs/" + chunkName
if _, err := os.Stat(path); err != nil {
return false
}
return true
}
54 changes: 54 additions & 0 deletions agent/handler/blob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"os"
"testing"
)

func TestBlobExists(t *testing.T) {
if err := os.Setenv("WORKSPACE", "../../tmp/workspace/models/"); err != nil {
t.Fail()
}

path := "../../tmp/workspace/models/fakeRepo/blobs/"
filename := "fakeChunk"

if blobExists("fakeRepo", "fakeChunk") {
t.Error("blob should be not exist")
}

if err := os.MkdirAll(path, os.ModePerm); err != nil {
t.Errorf("run mkdir failed: %v", err)
}

file, err := os.Create(path + filename)
if err != nil {
t.Error("failed to create file")
}
defer func() {
_ = file.Close()
}()
defer func() {
_ = os.RemoveAll("../../tmp/workspace")
}()

if !blobExists("fakeRepo", "fakeChunk") {
t.Error("blob should be exist")
}
}
Loading

0 comments on commit 96314f6

Please sign in to comment.