Skip to content

Commit

Permalink
Refactor deployments storage on redeploy and undeployment (#483)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->

**What this PR does / why we need it**:
<!-- Explain here the context and why you're making the change. What is
the problem you're trying to solve. --->

The terminated endpoint means the user has triggered Undeployment and
there's no deployed endpoint in the cluster. When we redeploy this
terminated endpoint, the deployment history should show all previous
successful deployment logs as Not Deployed and the new deployment as
Pending.

However, when redeploying the terminated endpoint, the deployment
history shows the previous successful deployment as the current and
deployed one:

Before redeployment of terminated endpoint:

<img width="500" alt="image"
src="https://github.com/caraml-dev/merlin/assets/8122852/90d3cc87-9e73-4317-aaea-c37bc3c6b510">

Actual view from redeployment of terminated endpoint:

<img width="500" alt="image"
src="https://github.com/caraml-dev/merlin/assets/8122852/e363bac3-b7b2-4d1f-b942-b8dea9a6e4eb">

## Fixes introduced by this PR:

Before redeployment, the current label is given to the latest
successful/terminated endpoint:

<img width="500" alt="image"
src="https://github.com/caraml-dev/merlin/assets/8122852/76ccf983-f51d-43a8-8e8c-c3ba6547e21e">

During redeployment:

<img width="500" alt="image"
src="https://github.com/caraml-dev/merlin/assets/8122852/5aa74859-1ec7-4d36-98f8-6928b7aad6ae">

After redeployment succeeds:

<img width="500" alt="image"
src="https://github.com/caraml-dev/merlin/assets/8122852/f17e1420-4aa6-4112-981d-bff3e5064798">

*Note: Please ignore the time displayed as `in 7 hours` -- this is
because I'm running locally and the time zone is not translated
correctly.*

---

Behind the scenes, we refactor the deployments table for a given version
endpoint on redeployment and undeployment:

1. Redeployment
  a. new deployment status = running/serving,
  b. old successful deployment status = terminated
2. Undeployment
  a. old successful deployment status = terminated

**Which issue(s) this PR fixes**:
<!--
*Automatically closes linked issue when PR is merged.
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
-->

* Fixes redeployment of terminated endpoint
* Fixes duplicate link icon on Mlflow run

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note
NONE
```

**Checklist**

- [x] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduce API
changes
  • Loading branch information
ariefrahmansyah authored Nov 19, 2023
1 parent 281a15d commit 95a6183
Show file tree
Hide file tree
Showing 16 changed files with 441 additions and 79 deletions.
4 changes: 4 additions & 0 deletions api/models/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ type Deployment struct {
Error string `json:"error"`
CreatedUpdated
}

// IsSuccess reports whether the deployment's status is one of the
// successful states, i.e. EndpointRunning or EndpointServing.
func (d *Deployment) IsSuccess() bool {
	switch d.Status {
	case EndpointRunning, EndpointServing:
		return true
	default:
		return false
	}
}
11 changes: 9 additions & 2 deletions api/queue/work/model_service_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,15 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {

// record the deployment result
deployment.UpdatedAt = time.Now()
if _, err := depl.DeploymentStorage.Save(deployment); err != nil {
log.Warnf("unable to update deployment history", err)
if deployment.IsSuccess() {
if err := depl.DeploymentStorage.OnDeploymentSuccess(deployment); err != nil {
log.Errorf("unable to update deployment history on successful deployment (ID: %+v): %s", deployment.ID, err)
}
} else {
// If failed, only update the latest deployment
if _, err := depl.DeploymentStorage.Save(deployment); err != nil {
log.Errorf("unable to update deployment history for failed deployment (ID: %+v): %s", deployment.ID, err)
}
}

// if redeployment failed, we only update the previous endpoint status from pending to previous status
Expand Down
28 changes: 15 additions & 13 deletions api/queue/work/model_service_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -328,7 +328,6 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -368,7 +367,6 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -430,7 +428,6 @@ func TestExecuteDeployment(t *testing.T) {
}

mockStorage.AssertNumberOfCalls(t, "Save", 1)
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 2)

savedEndpoint := mockStorage.Calls[1].Arguments[0].(*models.VersionEndpoint)
assert.Equal(t, tt.model.ID, savedEndpoint.VersionModelID)
Expand All @@ -445,8 +442,11 @@ func TestExecuteDeployment(t *testing.T) {
}

if tt.deployErr != nil {
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 2)
assert.Equal(t, models.EndpointFailed, savedEndpoint.Status)
} else {
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 1)
mockDeploymentStorage.AssertNumberOfCalls(t, "OnDeploymentSuccess", 1)
assert.Equal(t, models.EndpointRunning, savedEndpoint.Status)
assert.Equal(t, url, savedEndpoint.URL)
assert.Equal(t, "", savedEndpoint.InferenceServiceName)
Expand Down Expand Up @@ -527,7 +527,7 @@ func TestExecuteRedeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestExecuteRedeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -657,7 +657,7 @@ func TestExecuteRedeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -723,7 +723,6 @@ func TestExecuteRedeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -798,7 +797,6 @@ func TestExecuteRedeployment(t *testing.T) {
}

mockStorage.AssertNumberOfCalls(t, "Save", 1)
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 2)

savedEndpoint := mockStorage.Calls[1].Arguments[0].(*models.VersionEndpoint)
assert.Equal(t, tt.model.ID, savedEndpoint.VersionModelID)
Expand All @@ -814,8 +812,12 @@ func TestExecuteRedeployment(t *testing.T) {

assert.Equal(t, tt.expectedEndpointStatus, savedEndpoint.Status)
if tt.deployErr == nil {
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 1)
mockDeploymentStorage.AssertNumberOfCalls(t, "OnDeploymentSuccess", 1)
assert.Equal(t, url, savedEndpoint.URL)
assert.Equal(t, modelSvcName, savedEndpoint.InferenceServiceName)
} else {
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 2)
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions api/service/version_endpoint_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,12 @@ func (k *endpointService) UndeployEndpoint(ctx context.Context, environment *mod
}

endpoint.Status = models.EndpointTerminated
err = k.storage.Save(endpoint)
if err != nil {

if err := k.storage.Save(endpoint); err != nil {
return nil, err
}

if err := k.deploymentStorage.Undeploy(model.ID.String(), version.ID.String(), endpoint.ID.String()); err != nil {
return nil, err
}

Expand Down
86 changes: 86 additions & 0 deletions api/storage/deployment_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package storage

import (
"fmt"

"gorm.io/gorm"

"github.com/caraml-dev/merlin/models"
Expand All @@ -27,6 +29,10 @@ type DeploymentStorage interface {
ListInModelVersion(modelID, versionID, endpointUUID string) ([]*models.Deployment, error)
// Save save the deployment to underlying storage
Save(deployment *models.Deployment) (*models.Deployment, error)
// OnDeploymentSuccess updates the new deployment status to successful on DB and update all previous deployment status for that version endpoint to terminated.
OnDeploymentSuccess(newDeployment *models.Deployment) error
// Undeploy updates all successful deployment status to terminated on DB
Undeploy(modelID, versionID, endpointUUID string) error
// GetFirstSuccessModelVersionPerModel Return mapping of model id and the first model version with a successful model version
GetFirstSuccessModelVersionPerModel() (map[models.ID]models.ID, error)
Delete(modelID models.ID, versionID models.ID) error
Expand Down Expand Up @@ -83,3 +89,83 @@ func (d *deploymentStorage) GetFirstSuccessModelVersionPerModel() (map[models.ID
// Delete issues a delete for the deployment records whose version_id and
// version_model_id match the given version and model IDs, and returns the
// error reported by the database layer, if any.
func (d *deploymentStorage) Delete(modelID models.ID, versionID models.ID) error {
	scoped := d.db.Where("version_id = ? AND version_model_id = ?", versionID, modelID)
	result := scoped.Delete(models.Deployment{})
	return result.Error
}

// OnDeploymentSuccess saves newDeployment and, in the same transaction, marks
// every other running/serving deployment of the same version endpoint as
// terminated.
//
// newDeployment must already have a non-zero ID and a successful status
// (running or serving); otherwise an error is returned before any database
// work is done. Any failure while querying, saving, or committing rolls the
// transaction back (where still possible) and is returned to the caller.
func (d *deploymentStorage) OnDeploymentSuccess(newDeployment *models.Deployment) (err error) {
	if newDeployment.ID == 0 {
		return fmt.Errorf("newDeployment.ID must not be empty")
	}

	if !newDeployment.IsSuccess() {
		return fmt.Errorf("newDeployment.Status must be running or serving")
	}

	tx := d.db.Begin()
	if tx.Error != nil {
		return fmt.Errorf("failed to begin transaction: %w", tx.Error)
	}

	// err is the named result, so this closure sees the final return value
	// and can also surface a commit failure instead of silently dropping it.
	defer func() {
		if err != nil {
			tx.Rollback()
			return
		}
		if commitErr := tx.Commit().Error; commitErr != nil {
			err = fmt.Errorf("failed to commit transaction: %w", commitErr)
		}
	}()

	var deployments []*models.Deployment
	err = tx.Where("version_model_id = ? AND version_id = ? AND version_endpoint_id = ? AND status IN ('running', 'serving') AND id != ?",
		newDeployment.VersionModelID, newDeployment.VersionID, newDeployment.VersionEndpointID, newDeployment.ID).Find(&deployments).Error
	if err != nil {
		return err
	}

	for i := range deployments {
		// Set older successful deployment to terminated
		deployments[i].Status = models.EndpointTerminated
	}

	// Save the new deployment together with the terminated ones so that all
	// rows are written by a single Save call inside the transaction.
	deployments = append(deployments, newDeployment)

	return tx.Save(deployments).Error
}

// Undeploy marks every running/serving deployment of the given version
// endpoint as terminated, inside a single transaction.
//
// modelID, versionID and endpointUUID are matched against the
// version_model_id, version_id and version_endpoint_id columns respectively.
// When no running/serving deployment matches, nothing is written and nil is
// returned. Any failure while querying, saving, or committing is returned to
// the caller.
func (d *deploymentStorage) Undeploy(modelID, versionID, endpointUUID string) (err error) {
	tx := d.db.Begin()
	if tx.Error != nil {
		return fmt.Errorf("failed to begin transaction: %w", tx.Error)
	}

	// err is the named result, so this closure sees the final return value
	// and can also surface a commit failure instead of silently dropping it.
	defer func() {
		if err != nil {
			tx.Rollback()
			return
		}
		if commitErr := tx.Commit().Error; commitErr != nil {
			err = fmt.Errorf("failed to commit transaction: %w", commitErr)
		}
	}()

	var deployments []*models.Deployment
	err = tx.Where("version_model_id = ? AND version_id = ? AND version_endpoint_id = ? AND status IN ('running', 'serving')",
		modelID, versionID, endpointUUID).Find(&deployments).Error
	if err != nil {
		return err
	}

	// GORM rejects Save on an empty slice with ErrEmptySlice; having no
	// successful deployment to terminate is not an error for the caller.
	if len(deployments) == 0 {
		return nil
	}

	for i := range deployments {
		deployments[i].Status = models.EndpointTerminated
	}

	return tx.Save(deployments).Error
}
Loading

0 comments on commit 95a6183

Please sign in to comment.