diff --git a/scheduler/pkg/agent/server.go b/scheduler/pkg/agent/server.go index 447115ec4a..7aa0e9c555 100644 --- a/scheduler/pkg/agent/server.go +++ b/scheduler/pkg/agent/server.go @@ -265,10 +265,11 @@ func (s *Server) Sync(modelName string) { } as.mutex.Lock() + model := latestModel.GetModel() err = as.stream.Send(&pb.ModelOperationMessage{ Operation: pb.ModelOperationMessage_LOAD_MODEL, - ModelVersion: &pb.ModelVersion{Model: latestModel.GetModel(), Version: latestModel.GetVersion()}, - AutoscalingEnabled: AutoscalingEnabled(latestModel.GetModel()) && s.autoscalingServiceEnabled, + ModelVersion: &pb.ModelVersion{Model: model, Version: latestModel.GetVersion()}, + AutoscalingEnabled: util.AutoscalingEnabled(model.DeploymentSpec.GetMinReplicas(), model.DeploymentSpec.GetMaxReplicas()) && s.autoscalingServiceEnabled, }) as.mutex.Unlock() if err != nil { @@ -605,13 +606,13 @@ func calculateDesiredNumReplicas(model *pbs.Model, trigger pb.ModelScalingTrigge // which is hidden in this logic unfortunately as we reject the scaling up / down event. 
// a side effect is that we do not go below 1 replica of a model func checkModelScalingWithinRange(model *pbs.Model, targetNumReplicas int) error { - if !AutoscalingEnabled(model) { - return fmt.Errorf("No autoscaling for model %s", model.GetMeta().GetName()) - } - minReplicas := model.DeploymentSpec.GetMinReplicas() maxReplicas := model.DeploymentSpec.GetMaxReplicas() + if !util.AutoscalingEnabled(minReplicas, maxReplicas) { + return fmt.Errorf("No autoscaling for model %s", model.GetMeta().GetName()) + } + if targetNumReplicas < int(minReplicas) || (targetNumReplicas < 1) { return fmt.Errorf("Violating min replicas %d / %d for model %s", minReplicas, targetNumReplicas, model.GetMeta().GetName()) } @@ -622,18 +623,3 @@ func checkModelScalingWithinRange(model *pbs.Model, targetNumReplicas int) error return nil } - -// if min and max replicas are not set, we do not allow autoscaling -// we check that they are not set if they are equal to zero as per -// `GetMinReplicas` and `GetMaxReplicas` definition -func AutoscalingEnabled(model *pbs.Model) bool { - minReplicas := model.DeploymentSpec.GetMinReplicas() - maxReplicas := model.DeploymentSpec.GetMaxReplicas() - - if (minReplicas == 0) && (maxReplicas == 0) { - // no autoscaling - return false - } else { - return true - } -} diff --git a/scheduler/pkg/agent/server_test.go b/scheduler/pkg/agent/server_test.go index 4ef760f807..11c19f39af 100644 --- a/scheduler/pkg/agent/server_test.go +++ b/scheduler/pkg/agent/server_test.go @@ -21,7 +21,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/seldonio/seldon-core/apis/go/v2/mlops/agent" pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/agent" pbs "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" @@ -29,6 +28,7 @@ import ( testing_utils "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils" "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler" 
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) type mockScheduler struct{} @@ -952,7 +952,7 @@ func TestAutoscalingEnabled(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - enabled := AutoscalingEnabled(test.model) + enabled := util.AutoscalingEnabled(test.model.DeploymentSpec.MinReplicas, test.model.DeploymentSpec.MaxReplicas) g.Expect(enabled).To(Equal(test.enabled)) }) } @@ -1009,13 +1009,13 @@ func TestSubscribe(t *testing.T) { getStream := func(id uint32, context context.Context, port int) *grpc.ClientConn { conn, _ := grpc.NewClient(fmt.Sprintf(":%d", port), grpc.WithTransportCredentials(insecure.NewCredentials())) - grpcClient := agent.NewAgentServiceClient(conn) + grpcClient := pb.NewAgentServiceClient(conn) _, _ = grpcClient.Subscribe( context, - &agent.AgentSubscribeRequest{ + &pb.AgentSubscribeRequest{ ServerName: "dummy", ReplicaIdx: id, - ReplicaConfig: &agent.ReplicaConfig{}, + ReplicaConfig: &pb.ReplicaConfig{}, Shared: true, AvailableMemoryBytes: 0, }, diff --git a/scheduler/pkg/proxy/server.go b/scheduler/pkg/proxy/server.go index 934552a5ef..bea3faaebc 100644 --- a/scheduler/pkg/proxy/server.go +++ b/scheduler/pkg/proxy/server.go @@ -21,8 +21,7 @@ import ( pba "github.com/seldonio/seldon-core/apis/go/v2/mlops/agent" pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/proxy" pbs "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" - - "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) type ProxyServer struct { @@ -56,11 +55,12 @@ func (p *ProxyServer) Start(port uint) error { } func (p *ProxyServer) LoadModel(ctx context.Context, r *pb.LoadModelRequest) (*pb.LoadModelResponse, error) { + model := r.GetRequest().GetModel() m := ModelEvent{ ModelOperationMessage: &pba.ModelOperationMessage{ Operation: pba.ModelOperationMessage_LOAD_MODEL, - ModelVersion: 
&pba.ModelVersion{Model: r.GetRequest().GetModel(), Version: r.GetVersion()}, - AutoscalingEnabled: agent.AutoscalingEnabled(r.GetRequest().GetModel()), + ModelVersion: &pba.ModelVersion{Model: model, Version: r.GetVersion()}, + AutoscalingEnabled: util.AutoscalingEnabled(model.DeploymentSpec.MinReplicas, model.DeploymentSpec.MaxReplicas), }, } p.modelEvents <- m @@ -74,7 +74,8 @@ func (p *ProxyServer) UnloadModel(ctx context.Context, r *pb.UnloadModelRequest) Operation: pba.ModelOperationMessage_UNLOAD_MODEL, ModelVersion: &pba.ModelVersion{ Model: &pbs.Model{Meta: &pbs.MetaData{Name: r.GetModel().GetName()}}, - Version: r.GetVersion()}, + Version: r.GetVersion(), + }, }, } p.modelEvents <- m diff --git a/scheduler/pkg/scheduler/filters/interface.go b/scheduler/pkg/scheduler/filters/interface.go index ece0ddb956..fc9f65fc31 100644 --- a/scheduler/pkg/scheduler/filters/interface.go +++ b/scheduler/pkg/scheduler/filters/interface.go @@ -9,9 +9,7 @@ the Change License after the Change Date as each is defined in accordance with t package filters -import ( - "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" -) +import "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" type ReplicaFilter interface { Name() string diff --git a/scheduler/pkg/scheduler/scheduler.go b/scheduler/pkg/scheduler/scheduler.go index 8b4132c813..ecba18ed2f 100644 --- a/scheduler/pkg/scheduler/scheduler.go +++ b/scheduler/pkg/scheduler/scheduler.go @@ -173,6 +173,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { logger. WithField("candidate_servers", filteredServers). WithField("desired_replicas", desiredReplicas). + WithField("min_replicas", minReplicas). 
Debug("Identified candidate servers for model") // The main logic of trying to find a server for the model is as follows: diff --git a/scheduler/pkg/server/server.go b/scheduler/pkg/server/server.go index d53ea26600..d67471002e 100644 --- a/scheduler/pkg/server/server.go +++ b/scheduler/pkg/server/server.go @@ -47,9 +47,7 @@ const ( sendTimeout = 30 * time.Second // Timeout for sending events to subscribers via grpc `sendMsg` ) -var ( - ErrAddServerEmptyServerName = status.Errorf(codes.FailedPrecondition, "Empty server name passed") -) +var ErrAddServerEmptyServerName = status.Errorf(codes.FailedPrecondition, "Empty server name passed") type SchedulerServer struct { pb.UnimplementedSchedulerServer @@ -445,7 +443,7 @@ func (s *SchedulerServer) ServerStatus( } for _, s := range servers { - resp := createServerStatusResponse(s) + resp := createServerStatusUpdateResponse(s) err := stream.Send(resp) if err != nil { return status.Errorf(codes.Internal, err.Error()) @@ -458,7 +456,7 @@ func (s *SchedulerServer) ServerStatus( if err != nil { return status.Errorf(codes.FailedPrecondition, err.Error()) } - resp := createServerStatusResponse(server) + resp := createServerStatusUpdateResponse(server) err = stream.Send(resp) if err != nil { return status.Errorf(codes.Internal, err.Error()) @@ -467,7 +465,7 @@ func (s *SchedulerServer) ServerStatus( } } -func createServerStatusResponse(s *store.ServerSnapshot) *pb.ServerStatusResponse { +func createServerStatusUpdateResponse(s *store.ServerSnapshot) *pb.ServerStatusResponse { // note we dont count draining replicas in available replicas resp := &pb.ServerStatusResponse{ diff --git a/scheduler/pkg/server/server_status.go b/scheduler/pkg/server/server_status.go index 52a58cdd77..18d45528bd 100644 --- a/scheduler/pkg/server/server_status.go +++ b/scheduler/pkg/server/server_status.go @@ -15,6 +15,8 @@ import ( pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" 
"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) func (s *SchedulerServer) SubscribeModelStatus(req *pb.ModelSubscriptionRequest, stream pb.Scheduler_SubscribeModelStatusServer) error { @@ -164,7 +166,7 @@ func (s *SchedulerServer) SubscribeServerStatus(req *pb.ServerSubscriptionReques } func (s *SchedulerServer) handleModelEventForServerStatus(event coordinator.ModelEventMsg) { - logger := s.logger.WithField("func", "handleServerEvent") + logger := s.logger.WithField("func", "handleModelEventForServerStatus") logger.Debugf("Got server state change for %s", event.String()) err := s.updateServerModelsStatus(event) @@ -182,7 +184,7 @@ func (s *SchedulerServer) StopSendServerEvents() { } func (s *SchedulerServer) updateServerModelsStatus(evt coordinator.ModelEventMsg) error { - logger := s.logger.WithField("func", "sendServerStatusEvent") + logger := s.logger.WithField("func", "updateServerModelStatus") model, err := s.modelStore.GetModel(evt.ModelName) if err != nil { @@ -198,6 +200,19 @@ func (s *SchedulerServer) updateServerModelsStatus(evt coordinator.ModelEventMsg return nil } + latestModel := model.GetLatest() + + if latestModel != nil && latestModel.GetVersion() == evt.ModelVersion && + util.AutoscalingEnabled(latestModel.GetDeploymentSpec().MinReplicas, latestModel.GetDeploymentSpec().MaxReplicas) && + latestModel.DesiredReplicas() > int(latestModel.ModelState().AvailableReplicas) { + err := s.incrementExpectedReplicas(latestModel) + if err != nil { + return err + } else { + return nil + } + } + s.serverEventStream.pendingLock.Lock() // we are coalescing events so we only send one event (the latest status) per server s.serverEventStream.pendingEvents[modelVersion.Server()] = struct{}{} @@ -206,6 +221,27 @@ func (s *SchedulerServer) updateServerModelsStatus(evt coordinator.ModelEventMsg } s.serverEventStream.pendingLock.Unlock() + 
return err +} + +func (s *SchedulerServer) incrementExpectedReplicas(latestModel *store.ModelVersion) error { + // TODO: should there be some sort of velocity check ? + logger := s.logger.WithField("func", "incrementExpectedReplicas") + logger.Debugf("will attempt to scale servers to %d for %v", latestModel.DesiredReplicas(), latestModel) + + server, err := s.modelStore.GetServer(latestModel.Server(), true, true) + if err != nil { + return err + } + newExpectedReplicas := server.ExpectedReplicas + 1 + if newExpectedReplicas < latestModel.DesiredReplicas() { + newExpectedReplicas = latestModel.DesiredReplicas() + } + ssr := createServerStatusUpdateResponse(server) + ssr.ExpectedReplicas = int32(newExpectedReplicas) + ssr.Type = pb.ServerStatusResponse_ScalingRequest + s.sendServerStatusResponse(ssr) + return nil } @@ -228,21 +264,24 @@ func (s *SchedulerServer) sendServerStatus() { logger.Errorf("Failed to get server %s", serverName) continue } - ssr := createServerStatusResponse(server) + ssr := createServerStatusUpdateResponse(server) + s.sendServerStatusResponse(ssr) + } +} - for stream, subscription := range s.serverEventStream.streams { - hasExpired, err := sendWithTimeout(func() error { return stream.Send(ssr) }, s.timeout) - if hasExpired { - // this should trigger a reconnect from the client - close(subscription.fin) - delete(s.serverEventStream.streams, stream) - } - if err != nil { - logger.WithError(err).Errorf("Failed to send server status event to %s", subscription.name) - } +func (s *SchedulerServer) sendServerStatusResponse(ssr *pb.ServerStatusResponse) { + logger := s.logger.WithField("func", "sendServerStatusResponse") + for stream, subscription := range s.serverEventStream.streams { + hasExpired, err := sendWithTimeout(func() error { return stream.Send(ssr) }, s.timeout) + if hasExpired { + // this should trigger a reconnect from the client + close(subscription.fin) + delete(s.serverEventStream.streams, stream) + } + if err != nil { + 
logger.WithError(err).Errorf("Failed to send server status response to %s", subscription.name) } } - } // initial send of server statuses to a new controller @@ -252,7 +291,7 @@ func (s *SchedulerServer) sendCurrentServerStatuses(stream pb.Scheduler_ServerSt return err } for _, server := range servers { - ssr := createServerStatusResponse(server) + ssr := createServerStatusUpdateResponse(server) _, err := sendWithTimeout(func() error { return stream.Send(ssr) }, s.timeout) if err != nil { return err diff --git a/scheduler/pkg/server/server_status_test.go b/scheduler/pkg/server/server_status_test.go index f7d23e99b2..e17bc4c5ff 100644 --- a/scheduler/pkg/server/server_status_test.go +++ b/scheduler/pkg/server/server_status_test.go @@ -146,7 +146,8 @@ func TestModelsStatusEvents(t *testing.T) { } g.Expect(s.modelEventStream.streams[stream]).ToNot(BeNil()) hub.PublishModelEvent(modelEventHandlerName, coordinator.ModelEventMsg{ - ModelName: "foo", ModelVersion: 1}) + ModelName: "foo", ModelVersion: 1, + }) // to allow events to propagate time.Sleep(500 * time.Millisecond) @@ -340,10 +341,13 @@ func TestServersStatusEvents(t *testing.T) { g := NewGomegaWithT(t) type test struct { - name string - loadReq *pba.AgentSubscribeRequest - timeout time.Duration - err bool + name string + loadReq *pba.AgentSubscribeRequest + timeout time.Duration + desiredModelReplicas uint32 + minModelReplicas uint32 + maxModelReplicas uint32 + err bool } tests := []test{ @@ -363,6 +367,39 @@ func TestServersStatusEvents(t *testing.T) { timeout: 1 * time.Millisecond, err: true, }, + { + name: "scaling requested", + loadReq: &pba.AgentSubscribeRequest{ + ServerName: "foo", + }, + timeout: 10 * time.Minute, + desiredModelReplicas: 3, + minModelReplicas: 2, + maxModelReplicas: 3, + err: false, + }, + { + name: "scaling requested", + loadReq: &pba.AgentSubscribeRequest{ + ServerName: "foo", + }, + timeout: 10 * time.Minute, + desiredModelReplicas: 1, + minModelReplicas: 2, + maxModelReplicas: 3, + 
err: false, + }, + { + name: "desired replicas less than available replicas", + loadReq: &pba.AgentSubscribeRequest{ + ServerName: "foo", + }, + timeout: 10 * time.Minute, + desiredModelReplicas: 1, + minModelReplicas: 2, + maxModelReplicas: 3, + err: false, + }, } for _, test := range tests { @@ -374,7 +411,8 @@ func TestServersStatusEvents(t *testing.T) { g.Expect(err).To(BeNil()) err = s.modelStore.UpdateModel(&pb.LoadModelRequest{ Model: &pb.Model{ - Meta: &pb.MetaData{Name: "foo"}, + Meta: &pb.MetaData{Name: "foo"}, + DeploymentSpec: &pb.DeploymentSpec{Replicas: test.desiredModelReplicas, MinReplicas: test.minModelReplicas, MaxReplicas: test.maxModelReplicas}, }, }) g.Expect(err).To(BeNil()) @@ -394,7 +432,8 @@ func TestServersStatusEvents(t *testing.T) { } g.Expect(s.serverEventStream.streams[stream]).ToNot(BeNil()) hub.PublishModelEvent(serverModelEventHandlerName, coordinator.ModelEventMsg{ - ModelName: "foo", ModelVersion: 1}) + ModelName: "foo", ModelVersion: 1, + }) // to allow events to propagate time.Sleep(500 * time.Millisecond) @@ -402,7 +441,6 @@ func TestServersStatusEvents(t *testing.T) { if test.err { g.Expect(s.serverEventStream.streams).To(HaveLen(0)) } else { - var ssr *pb.ServerStatusResponse select { case next := <-stream.msgs: @@ -414,7 +452,16 @@ func TestServersStatusEvents(t *testing.T) { g.Expect(ssr).ToNot(BeNil()) g.Expect(ssr.ServerName).To(Equal("foo")) g.Expect(s.serverEventStream.streams).To(HaveLen(1)) + + if test.desiredModelReplicas > 0 { + g.Expect(ssr.ExpectedReplicas).To(Equal(int32(test.desiredModelReplicas))) + g.Expect(ssr.Type).To(Equal(pb.ServerStatusResponse_ScalingRequest)) + } else { + g.Expect(ssr.ExpectedReplicas).To(Equal(int32(0))) + g.Expect(ssr.Type).To(Equal(pb.ServerStatusResponse_StatusUpdate)) + } } + }) } } diff --git a/scheduler/pkg/store/memory_status.go b/scheduler/pkg/store/memory_status.go index 0c0187782d..0ff7ecf983 100644 --- a/scheduler/pkg/store/memory_status.go +++ 
// AutoscalingEnabled reports whether autoscaling is allowed for a model with
// the given replica bounds.
//
// Autoscaling is disabled only when both minReplicas and maxReplicas are zero;
// a zero value is how an unset bound shows up, as per the `GetMinReplicas` and
// `GetMaxReplicas` definitions. If either bound is non-zero, autoscaling is
// considered enabled.
func AutoscalingEnabled(minReplicas, maxReplicas uint32) bool {
	// Equivalent to !(minReplicas == 0 && maxReplicas == 0).
	return minReplicas != 0 || maxReplicas != 0
}