Skip to content

Commit

Permalink
Add sync logic
Browse files Browse the repository at this point in the history
Signed-off-by: kerthcet <[email protected]>
  • Loading branch information
kerthcet committed Nov 9, 2024
1 parent 566e401 commit 6cb257f
Show file tree
Hide file tree
Showing 32 changed files with 440 additions and 251 deletions.
3 changes: 3 additions & 0 deletions Dockerfile.agent
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532

# Expose the http server.
EXPOSE 9090

ENTRYPOINT ["/manager"]
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</p>

<h3 align="center">
A lightweight P2P-based cache system for model distributions.
A lightweight P2P-based cache system for model distributions on Kubernetes.
</h3>

[![stability-alpha](https://img.shields.io/badge/stability-alpha-f4d03f.svg)](https://github.com/mkenney/software-guides/blob/master/STABILITY-BADGES.md#alpha)
Expand Down Expand Up @@ -39,7 +39,7 @@ _Name Story: the inspiration of the name `Manta` is coming from Dota2, called [M

Read the [Installation](./docs/installation.md) guide for instructions.

### Preload Models
### Preheat Models

A toy sample to preheat the `Qwen/Qwen2.5-0.5B-Instruct` model:

Expand Down Expand Up @@ -69,6 +69,8 @@ spec:
zone: zone-a
```

### Delete Models

To remove the model weights when a `Torrent` is deleted, set `reclaimPolicy: Delete` (the default is `Retain`):

```yaml
Expand All @@ -80,8 +82,6 @@ spec:
replicas: 1
hub:
repoID: Qwen/Qwen2.5-0.5B-Instruct
nodeSelector:
zone: zone-a
reclaimPolicy: Delete
```

Expand Down
6 changes: 4 additions & 2 deletions agent/deploy/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ spec:
labels:
app: manta-agent
spec:
hostNetwork: true
serviceAccountName: manta-agent
initContainers:
- name: init-permissions
Expand All @@ -24,9 +25,10 @@ spec:
mountPath: /workspace/models
containers:
- name: agent
image: inftyai/manta-agent:v0.0.1
# image: inftyai/manta-agent:v0.0.1
image: inftyai/test:manta-agent-110810
ports:
- containerPort: 8080
- containerPort: 9090
resources:
limits:
memory: 200Mi
Expand Down
1 change: 1 addition & 0 deletions agent/pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// This may take a long time, the concurrency is controlled by the MaxConcurrentReconciles.
if err := handler.HandleReplication(ctx, replication); err != nil {
logger.Error(err, "error to handle replication", "Replication", klog.KObj(replication))
return ctrl.Result{}, err
} else {
if err := r.updateNodeTracker(ctx, replication); err != nil {
Expand Down
24 changes: 15 additions & 9 deletions agent/pkg/handler/chunk_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"
"net/http"
"os"

"github.com/inftyai/manta/api"
)

const (
Expand All @@ -30,6 +32,7 @@ const (
// SendChunk will send the chunk content via http request.
func SendChunk(w http.ResponseWriter, r *http.Request) {
path := r.URL.Query().Get("path")

if path == "" {
http.Error(w, "path is required", http.StatusBadRequest)
return
Expand All @@ -45,25 +48,29 @@ func SendChunk(w http.ResponseWriter, r *http.Request) {
buffer := make([]byte, buffSize)
for {
n, err := file.Read(buffer)
if err != nil {
if err == io.EOF {
break
} else {
fmt.Println("Error reading file")
http.Error(w, "Error reading file", http.StatusInternalServerError)
return
}
}

if n > 0 {
_, writeErr := w.Write(buffer[:n])
if writeErr != nil {
fmt.Println("Error writing to response:", writeErr)
http.Error(w, "Error writing to response", http.StatusInternalServerError)
return
}
}
if err == io.EOF {
break
}
if err != nil {
http.Error(w, "Error reading file", http.StatusInternalServerError)
return
}
}
}

func recvChunk(blobPath, snapshotPath, peerName string) error {
url := fmt.Sprintf("http://%s:8080/sync?path=%s", peerName, blobPath)
url := fmt.Sprintf("http://%s:%s/sync?path=%s", peerName, api.HttpPort, blobPath)

resp, err := http.Get(url)
if err != nil {
Expand All @@ -87,6 +94,5 @@ func recvChunk(blobPath, snapshotPath, peerName string) error {
return err
}

fmt.Println("Chunk synced successfully")
return nil
}
17 changes: 12 additions & 5 deletions agent/pkg/handler/replication_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func HandleReplication(ctx context.Context, replication *api.Replication) error

if replication.Spec.Source.URI != nil {
host, _ := parseURI(*replication.Spec.Source.URI)
if host == api.URI_REMOTE {
// TODO: handle other URIs.
if host != api.URI_LOCALHOST {
return syncChunk(ctx, replication)
}
// TODO: handle uri with object store.
Expand Down Expand Up @@ -77,18 +78,17 @@ func downloadChunk(ctx context.Context, replication *api.Replication) error {
// TODO: handle modelScope
}
// TODO: Handle address
logger.Info("download file successfully", "file", filename)
}

// symlink can helps to validate the file is downloaded successfully.
// symlink can help to validate the file is downloaded successfully.
// TODO: once we support splitting a file into several chunks, the targetPath should be
// changed here, such as targetPath-0001.
if err := createSymlink(blobPath, targetPath); err != nil {
logger.Error(err, "failed to create symlink")
return err
}

logger.Info("create symlink successfully", "file", filename)
logger.Info("download chunk successfully", "file", filename)
return nil
}

Expand All @@ -97,12 +97,19 @@ func syncChunk(ctx context.Context, replication *api.Replication) error {

logger.Info("start to sync chunks", "Replication", klog.KObj(replication))

// The source URI looks like remote://<node-name>@<path-to-your-file>
sourceSplits := strings.Split(*replication.Spec.Source.URI, "://")
addresses := strings.Split(sourceSplits[1], "@")
nodeName, blobPath := addresses[0], addresses[1]
// The destination URI looks like localhost://<path-to-your-file>
destSplits := strings.Split(*replication.Spec.Destination.URI, "://")
if err := recvChunk(sourceSplits[1], destSplits[1], replication.Spec.NodeName); err != nil {

if err := recvChunk(blobPath, destSplits[1], nodeName); err != nil {
logger.Error(err, "failed to sync chunk")
return err
}

logger.Info("sync chunk successfully", "Replication", klog.KObj(replication), "target", destSplits[0])
return nil
}

Expand Down
16 changes: 10 additions & 6 deletions agent/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package server

import (
"context"
"fmt"
"net/http"
"time"

"github.com/inftyai/manta/agent/pkg/handler"
"github.com/inftyai/manta/api"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func Run(ctx context.Context) {
Expand All @@ -31,13 +32,14 @@ func Run(ctx context.Context) {

mux := http.NewServeMux()
mux.HandleFunc("/sync", handler.SendChunk)
server := &http.Server{Addr: ":8080", Handler: mux}
server := &http.Server{Addr: ":" + api.HttpPort, Handler: mux}

go func() {
fmt.Println("Server started on port 8080")
logger := log.FromContext(ctx)
logger.Info("Server started on port 9090")

if err := server.ListenAndServe(); err != nil {
fmt.Printf("ListenAndServe error: %s\n", err)
logger.Error(err, "listen and server error")
cancel()
}
}()
Expand All @@ -47,9 +49,11 @@ func Run(ctx context.Context) {
ctxShutdown, cancelShutdown := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelShutdown()

logger := log.FromContext(ctx)

if err := server.Shutdown(ctxShutdown); err != nil {
fmt.Printf("Server shutdown error: %s\n", err)
logger.Error(err, "server shutdown error")
}

fmt.Println("Server shutdown successfully")
logger.Info("server shutdown successfully")
}
4 changes: 2 additions & 2 deletions agent/pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/inftyai/manta/agent/pkg/util"
cons "github.com/inftyai/manta/api"
api "github.com/inftyai/manta/api/v1alpha1"
)

const (
syncDuration = 5 * time.Minute

workspace = util.DefaultWorkspace
workspace = cons.DefaultWorkspace
)

var (
Expand Down
5 changes: 4 additions & 1 deletion agent/pkg/util/consts.go → api/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package util
package api

// Shared by both the control plane and the agent.

const (
DefaultWorkspace = "/workspace/models/"
HttpPort = "9090"
)
1 change: 1 addition & 0 deletions api/v1alpha1/replication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Target struct {
// URI represents the file address with different storages, e.g.:
// - oss://<bucket>.<endpoint>/<path-to-your-file>
// - localhost://<path-to-your-file>
// - remote://<node-name>@<path-to-your-file>
// Note: if it's a folder, all the files under the folder will be considered,
// otherwise, only one file will be replicated.
URI *string `json:"uri,omitempty"`
Expand Down
10 changes: 8 additions & 2 deletions api/v1alpha1/torrent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type TorrentSpec struct {
// +optional
Replicas *int32 `json:"replicas,omitempty"`
// ReclaimPolicy represents how to handle the file replicas when Torrent is deleted.
// Be careful with the Delete policy: if two Torrents refer to the same
// repo, deleting either Torrent will remove all of the files.
// +kubebuilder:default=Retain
// +kubebuilder:validation:Enum={Retain,Delete}
// +optional
Expand All @@ -97,8 +99,11 @@ type TorrentSpec struct {
type TrackerState string

const (
PendingTrackerState TrackerState = "Pending"
ReadyTrackerState TrackerState = "Ready"
// Pending means the chunk is waiting for downloading.
PendingTrackerState TrackerState = "Pending"
// Ready means the chunk is ready for downloading or downloaded.
ReadyTrackerState TrackerState = "Ready"
// Deleting means the chunk is being removed.
DeletingTrackerState TrackerState = "Deleting"
)

Expand Down Expand Up @@ -167,6 +172,7 @@ type TorrentStatus struct {
//+kubebuilder:subresource:status
//+kubebuilder:resource:scope=Cluster
//+kubebuilder:printcolumn:name="Phase",type=string,JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Age",type=date,JSONPath=".metadata.creationTimestamp"

// Torrent is the Schema for the torrents API
type Torrent struct {
Expand Down
8 changes: 4 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.

_ "k8s.io/client-go/plugin/pkg/client/auth"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -60,14 +61,13 @@ func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
opts := zap.Options{
Development: true,
}
opts := zap.Options{}
opts.BindFlags(flag.CommandLine)
flag.Parse()

Expand Down Expand Up @@ -133,7 +133,7 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) {
<-certsReady
setupLog.Info("certs ready")

dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New}, []framework.RegisterFunc{gnumber.New})
dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New, gnumber.New})
if err != nil {
setupLog.Error(err, "unable to create dispatcher")
os.Exit(1)
Expand Down
12 changes: 6 additions & 6 deletions config/crd/bases/manta.io_replications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ spec:
uri:
description: "URI represents the file address with different storages,
e.g.:\n\t - oss://<bucket>.<endpoint>/<path-to-your-file>\n\t
- localhost://<path-to-your-file>\nNote: if it's a folder, all
the files under the folder will be considered,\notherwise, only
one file will be replicated."
- localhost://<path-to-your-file>\n\t - remote://<node-name>@<path-to-your-file>\nNote:
if it's a folder, all the files under the folder will be considered,\notherwise,
only one file will be replicated."
type: string
type: object
nodeName:
Expand Down Expand Up @@ -139,9 +139,9 @@ spec:
uri:
description: "URI represents the file address with different storages,
e.g.:\n\t - oss://<bucket>.<endpoint>/<path-to-your-file>\n\t
- localhost://<path-to-your-file>\nNote: if it's a folder, all
the files under the folder will be considered,\notherwise, only
one file will be replicated."
- localhost://<path-to-your-file>\n\t - remote://<node-name>@<path-to-your-file>\nNote:
if it's a folder, all the files under the folder will be considered,\notherwise,
only one file will be replicated."
type: string
type: object
required:
Expand Down
9 changes: 7 additions & 2 deletions config/crd/bases/manta.io_torrents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ spec:
- jsonPath: .status.phase
name: Phase
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -83,8 +86,10 @@ spec:
type: object
reclaimPolicy:
default: Retain
description: ReclaimPolicy represents how to handle the file replicas
when Torrent is deleted.
description: |-
ReclaimPolicy represents how to handle the file replicas when Torrent is deleted.
Be careful with the Delete policy: if two Torrents refer to the same
repo, deleting either Torrent will remove all of the files.
enum:
- Retain
- Delete
Expand Down
1 change: 1 addition & 0 deletions config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ spec:
- "--health-probe-bind-address=:8081"
- "--metrics-bind-address=127.0.0.1:8080"
- "--leader-elect"
- "--zap-log-level=2"
2 changes: 1 addition & 1 deletion config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kind: Kustomization
images:
- name: controller
newName: inftyai/manta
newTag: v0.0.1
newTag: "110905"
Loading

0 comments on commit 6cb257f

Please sign in to comment.