Skip to content

Commit

Permalink
[1/N] Dispatcher implementation
Browse files Browse the repository at this point in the history
Signed-off-by: kerthcet <[email protected]>
  • Loading branch information
kerthcet committed Oct 22, 2024
1 parent 840d1a9 commit 47305b1
Show file tree
Hide file tree
Showing 42 changed files with 1,603 additions and 362 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ Dockerfile.cross
*~
.DS_Store
artifacts
cache

__pycache__
*.pyc
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ _Name Story: the inspiration of the name `Manta` is coming from Dota2, called [M

![architecture](./docs/assets/arch.png)

> Note: [llmaz](https://github.com/InftyAI/llmaz) is just one kind of integrations, **Manta** can be deployed independently.
> Note: [llmaz](https://github.com/InftyAI/llmaz) is just one kind of integration; **Manta** can be deployed and used independently.
6 changes: 5 additions & 1 deletion agent/deploy/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ spec:
mountPath: /workspace/models
containers:
- name: agent
image: inftyai/manta-agent:1016-06
image: inftyai/manta-agent:1022-01
ports:
- containerPort: 8080
resources:
Expand All @@ -36,6 +36,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
# Set the nodeTracker.spec.sizeLimit.
- name: SIZE_LIMIT
value: "990Mi"
# Use a Hugging Face mirror if you are behind the GFW in China.
- name: HF_ENDPOINT
value: https://hf-mirror.com
volumeMounts:
Expand Down
13 changes: 9 additions & 4 deletions agent/pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -51,8 +52,8 @@ func NewReplicationReconciler(client client.Client, scheme *runtime.Scheme) *Rep
}
}

// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
// The agent Replication reconciler focuses only on the downloading and replicating process;
// it is not concerned with Replication lifecycle management.
func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

Expand All @@ -64,7 +65,6 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Filter out unrelated events.
if replication.Spec.NodeName != NODE_NAME ||
replicationReady(replication) ||
replication.DeletionTimestamp != nil ||
len(replication.Status.Conditions) == 0 {
logger.V(10).Info("Skip replication", "Replication", klog.KObj(replication))
return ctrl.Result{}, nil
Expand Down Expand Up @@ -106,7 +106,10 @@ func (r *ReplicationReconciler) updateNodeTracker(ctx context.Context, replicati
return nil
}
}
nodeTracker.Spec.Chunks = append(nodeTracker.Spec.Chunks, api.ChunkTracker{ChunkName: chunkName})
nodeTracker.Spec.Chunks = append(nodeTracker.Spec.Chunks, api.ChunkTracker{
ChunkName: chunkName,
SizeBytes: replication.Spec.SizeBytes,
})
if err := r.Client.Update(ctx, nodeTracker); err != nil {
return err
}
Expand All @@ -129,6 +132,7 @@ func setReplicationCondition(replication *api.Replication, conditionType string)
Reason: "Downloading",
Message: "Downloading chunks",
}
replication.Status.Phase = ptr.To[string](api.DownloadConditionType)
return apimeta.SetStatusCondition(&replication.Status.Conditions, condition)
}

Expand All @@ -139,6 +143,7 @@ func setReplicationCondition(replication *api.Replication, conditionType string)
Reason: "Ready",
Message: "Download chunks successfully",
}
replication.Status.Phase = ptr.To[string](api.ReadyConditionType)
return apimeta.SetStatusCondition(&replication.Status.Conditions, condition)
}

Expand Down
36 changes: 28 additions & 8 deletions agent/pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ func syncChunks(ctx context.Context, c client.Client) {
} else {
logger.Info("Syncing the chunks")

if chunkNames, err := walkThroughChunks(workspace); err != nil {
if chunks, err := walkThroughChunks(workspace); err != nil {
logger.Error(err, "Failed to walk through chunks")
} else {
nodeTracker := &api.NodeTracker{}
if err := c.Get(ctx, types.NamespacedName{Name: os.Getenv("NODE_NAME")}, nodeTracker); err != nil {
logger.Error(err, "Failed to get nodeTracker", "nodeTracker", os.Getenv("NODE_NAME"))
} else {
UpdateChunks(nodeTracker, chunkNames)
UpdateChunks(nodeTracker, chunks)
if err := c.Update(ctx, nodeTracker); err != nil {
logger.Error(err, "Failed to update nodeTracker", "NodeTracker", nodeTracker.Name)
}
Expand All @@ -105,12 +105,13 @@ func syncChunks(ctx context.Context, c client.Client) {
}
}

func UpdateChunks(nt *api.NodeTracker, chunkNames []string) {
nt.Spec.Chunks = make([]api.ChunkTracker, 0, len(chunkNames))
for _, name := range chunkNames {
func UpdateChunks(nt *api.NodeTracker, chunks []chunkInfo) {
nt.Spec.Chunks = make([]api.ChunkTracker, 0, len(chunks))
for _, chunk := range chunks {
nt.Spec.Chunks = append(nt.Spec.Chunks,
api.ChunkTracker{
ChunkName: name,
ChunkName: chunk.Name,
SizeBytes: chunk.SizeBytes,
},
)
}
Expand Down Expand Up @@ -138,6 +139,7 @@ func findOrCreateNodeTracker(ctx context.Context, c client.Client) error {
}

nodeTracker.Name = nodeName
nodeTracker.Labels = node.Labels
nodeTracker.OwnerReferences = []v1.OwnerReference{
{
Kind: "Node",
Expand All @@ -148,13 +150,24 @@ func findOrCreateNodeTracker(ctx context.Context, c client.Client) error {
Controller: ptr.To(true),
},
}

sizeLimit := os.Getenv("SIZE_LIMIT")
if sizeLimit != "" {
nodeTracker.Spec.SizeLimit = ptr.To[string](sizeLimit)
}

return c.Create(newCtx, &nodeTracker)
}

return nil
}

func walkThroughChunks(path string) (chunks []string, err error) {
// chunkInfo describes a single chunk discovered on local disk by
// walkThroughChunks; UpdateChunks copies it into an api.ChunkTracker.
type chunkInfo struct {
// Name is the chunk name, taken from the base name of the chunk's target path.
Name string
// SizeBytes is the file size in bytes as reported by os.Stat.
SizeBytes int64
}

func walkThroughChunks(path string) (chunks []chunkInfo, err error) {
fileMap := make(map[string]struct{})

repos, err := os.ReadDir(path)
Expand Down Expand Up @@ -199,12 +212,19 @@ func walkThroughChunks(path string) (chunks []string, err error) {
if err != nil {
return nil, err
}
fileInfo, err := os.Stat(filePath)
if err != nil {
return nil, err
}

chunkName := filepath.Base(targetPath)

// To avoid duplicated files
if _, ok := fileMap[chunkName]; !ok {
chunks = append(chunks, chunkName)
chunks = append(chunks, chunkInfo{
Name: chunkName,
SizeBytes: fileInfo.Size(),
})
fileMap[chunkName] = struct{}{}
}
}
Expand Down
2 changes: 1 addition & 1 deletion agent/pkg/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_walkThroughChunks(t *testing.T) {
t.Error(err)
}

wantFiles := []string{"blob1", "blob2", "blob-same", "blobA", "blobB"}
wantFiles := []chunkInfo{{Name: "blob1"}, {Name: "blob2"}, {Name: "blob-same"}, {Name: "blobA"}, {Name: "blobB"}}

if diff := cmp.Diff(chunks, wantFiles); diff != "" {
t.Errorf("unexpected files, diff %v", diff)
Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/nodetracker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
type ChunkTracker struct {
// ChunkName represents the name of the chunk.
ChunkName string `json:"chunkName"`
// SizeBytes represents the chunk size.
SizeBytes int64 `json:"sizeBytes"`
}

// NodeTrackerSpec defines the desired state of NodeTracker
Expand All @@ -34,6 +36,11 @@ type NodeTrackerSpec struct {
// Chunks represents a list of chunks replicated in this node.
// +optional
Chunks []ChunkTracker `json:"chunks,omitempty"`
// SizeLimit sets the maximum space reserved for chunks.
// If nil, no limit is intended and the whole disk should be usable;
// for now, 1TiB is used instead.
// +optional
SizeLimit *string `json:"sizeLimit,omitempty"`
}

// NodeTrackerStatus defines the observed state of NodeTracker
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/replication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,16 @@ const (
type ReplicationStatus struct {
// Conditions represents the Replication conditions.
Conditions []metav1.Condition `json:"conditions,omitempty"`
// Phase represents the current state.
// +optional
Phase *string `json:"phase,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:scope=Cluster
//+kubebuilder:printcolumn:name="node",type=string,JSONPath=".spec.nodeName"
//+kubebuilder:printcolumn:name="phase",type=string,JSONPath=".status.phase"

// Replication is the Schema for the replications API
type Replication struct {
Expand Down
11 changes: 7 additions & 4 deletions api/v1alpha1/torrent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,11 @@ type ChunkStatus struct {
// - the object hash is 945c19bff66ba533eb2032a33dcc6281c4a1e032
// - the chunk is the second chunk of the total 10 chunks
Name string `json:"name"`
// State represents the state of the chunk, whether in downloading
// or downloaded ready.
// Note that once all the Replicas are replicated, the State will transmit into Ready.
State TrackerState `json:"state"`
// SizeBytes represents the chunk size.
SizeBytes int64 `json:"sizeBytes"`
// State represents the state of the chunk, whether in pending or tracked already.
// Chunks in Pending state will bring in Replication creations.
State TrackerState `json:"state"`
}

type ObjectType string
Expand Down Expand Up @@ -155,11 +154,15 @@ type TorrentStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
// Repo tracks the objects belong to the source.
Repo *RepoStatus `json:"repo,omitempty"`
// Phase represents the current state.
// +optional
Phase *string `json:"phase,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:scope=Cluster
//+kubebuilder:printcolumn:name="Phase",type=string,JSONPath=".status.phase"

// Torrent is the Schema for the torrents API
type Torrent struct {
Expand Down
36 changes: 24 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
inftyaicomv1alpha1 "github.com/inftyai/manta/api/v1alpha1"
"github.com/inftyai/manta/pkg/cert"
"github.com/inftyai/manta/pkg/controller"
"github.com/inftyai/manta/pkg/dispatcher"
"github.com/inftyai/manta/pkg/dispatcher/framework"
"github.com/inftyai/manta/pkg/dispatcher/plugins/diskaware"
"github.com/inftyai/manta/pkg/dispatcher/plugins/nodeselector"
"github.com/inftyai/manta/pkg/webhook"
//+kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -128,24 +132,32 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) {
<-certsReady
setupLog.Info("certs ready")

if err := (&controller.ReplicationReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New}, []framework.RegisterFunc{})
if err != nil {
setupLog.Error(err, "unable to create dispatcher")
os.Exit(1)
}

if err := controller.NewReplicationReconciler(
mgr.GetClient(),
mgr.GetScheme(),
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Replication")
os.Exit(1)
}
if err := (&controller.NodeTrackerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
if err := controller.NewNodeTrackerReconciler(
mgr.GetClient(),
mgr.GetScheme(),
dispatcher,
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NodeTracker")
os.Exit(1)
}
if err := (&controller.TorrentReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
if err := controller.NewTorrentReconciler(
mgr.GetClient(),
mgr.GetScheme(),
dispatcher,
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Torrent")
os.Exit(1)
}
Expand Down
11 changes: 11 additions & 0 deletions config/crd/bases/manta.io_nodetrackers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,21 @@ spec:
chunkName:
description: ChunkName represents the name of the chunk.
type: string
sizeBytes:
description: SizeBytes represents the chunk size.
format: int64
type: integer
required:
- chunkName
- sizeBytes
type: object
type: array
sizeLimit:
description: |-
SizeLimit sets the maximum space reserved for chunks.
If nil, no limit is intended and the whole disk should be usable;
for now, 1TiB is used instead.
type: string
type: object
status:
description: NodeTrackerStatus defines the observed state of NodeTracker
Expand Down
12 changes: 11 additions & 1 deletion config/crd/bases/manta.io_replications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ spec:
singular: replication
scope: Cluster
versions:
- name: v1alpha1
- additionalPrinterColumns:
- jsonPath: .spec.nodeName
name: node
type: string
- jsonPath: .status.phase
name: phase
type: string
name: v1alpha1
schema:
openAPIV3Schema:
description: Replication is the Schema for the replications API
Expand Down Expand Up @@ -203,6 +210,9 @@ spec:
- type
type: object
type: array
phase:
description: Phase represents the current state.
type: string
type: object
type: object
served: true
Expand Down
14 changes: 10 additions & 4 deletions config/crd/bases/manta.io_torrents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ spec:
singular: torrent
scope: Cluster
versions:
- name: v1alpha1
- additionalPrinterColumns:
- jsonPath: .status.phase
name: Phase
type: string
name: v1alpha1
schema:
openAPIV3Schema:
description: Torrent is the Schema for the torrents API
Expand Down Expand Up @@ -154,6 +158,9 @@ spec:
- type
type: object
type: array
phase:
description: Phase represents the current state.
type: string
repo:
description: Repo tracks the objects belong to the source.
properties:
Expand Down Expand Up @@ -182,9 +189,8 @@ spec:
type: integer
state:
description: |-
State represents the state of the chunk, whether in downloading
or downloaded ready.
Note that once all the Replicas are replicated, the State will transmit into Ready.
State represents the state of the chunk, whether in pending or tracked already.
Chunks in Pending state will bring in Replication creations.
type: string
required:
- name
Expand Down
Loading

0 comments on commit 47305b1

Please sign in to comment.