
lxd: Make the LXD shutdown sequence more concurrent to avoid long-running operations blocking the shutdown of unrelated instances #15016

Open · wants to merge 9 commits into base: main
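At a high level, the change stops serializing the shutdown steps in daemon.go: the daemon storage volume unmount now runs in a goroutine tracked by a sync.WaitGroup while instances shut down concurrently, and the daemon only waits for the unmount right before stopping storage pools, instead of blocking up front behind a fixed one-minute select. A minimal sketch of that pattern (illustrative names only, not the PR's actual helpers):

```go
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// unmountVolumes and shutdownInstances stand in for the daemon's real
// daemonStorageVolumesUnmount and instancesShutdown helpers.
func unmountVolumes(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second): // pretend the storage backend is slow
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func shutdownInstances(ctx context.Context) {
	time.Sleep(500 * time.Millisecond) // pretend instances take a while to stop
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// Kick off the volume unmount concurrently instead of blocking on it.
	var volumesWg sync.WaitGroup
	volumesWg.Add(1)
	go func() {
		defer volumesWg.Done()
		if err := unmountVolumes(ctx); err != nil {
			log.Printf("Failed to unmount volumes: %v", err)
		}
	}()

	// Instances shut down in parallel with the unmount.
	shutdownInstances(ctx)

	// Only now wait for the unmount, right before stopping storage pools.
	volumesWg.Wait()
	log.Println("Storage volumes unmounted; safe to stop storage pools")
}
```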
6 changes: 6 additions & 0 deletions lxd/api_internal.go
@@ -62,6 +62,7 @@ var apiInternal = []APIEndpoint{
 	internalWarningCreateCmd,
 	internalIdentityCacheRefreshCmd,
 	internalPruneTokenCmd,
+	internalOperationWaitCmd,
 }
 
 var internalShutdownCmd = APIEndpoint{
@@ -142,6 +143,11 @@ var internalPruneTokenCmd = APIEndpoint{
 	Post: APIEndpointAction{Handler: removeTokenHandler, AccessHandler: allowPermission(entity.TypeServer, auth.EntitlementCanEdit)},
 }
 
+var internalOperationWaitCmd = APIEndpoint{
+	Path: "testing/operation-wait",
+	Post: APIEndpointAction{Handler: operationWaitHandler, AccessHandler: allowPermission(entity.TypeServer, auth.EntitlementCanEdit)},
+}
+
 var internalIdentityCacheRefreshCmd = APIEndpoint{
 	Path: "identity-cache-refresh",
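The diff only registers the endpoint; operationWaitHandler itself is defined elsewhere in the PR. Its AccessHandler requires EntitlementCanEdit on the server, so the local unix socket is the natural transport for tests. Assuming internal endpoints are served under /internal and using the snap's default socket path (both assumptions here), a test could reach it roughly like this; real tests would normally go through the LXD client package instead:

```go
package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
)

// A minimal client a test might use to start the blocking test operation
// over the daemon's local unix socket. The socket path and URL layout are
// assumptions for illustration, not taken from the PR.
func main() {
	client := &http.Client{
		Transport: &http.Transport{
			// Route every request through the LXD unix socket.
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return (&net.Dialer{}).DialContext(ctx, "unix", "/var/snap/lxd/common/lxd/unix.socket")
			},
		},
	}

	resp, err := client.Post("http://lxd/internal/testing/operation-wait", "application/json", nil)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	fmt.Println("status:", resp.Status)
}
```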
73 changes: 57 additions & 16 deletions lxd/daemon.go
@@ -1528,7 +1528,7 @@ func (d *Daemon) init() error {
 			return fmt.Errorf("Failed loading containers to restart: %w", err)
 		}
 
-		instancesShutdown(instances)
+		instancesShutdown(d.shutdownCtx, instances, nil)
 		instancesStart(s, instances)
 	}

@@ -2063,43 +2063,67 @@ func (d *Daemon) Stop(ctx context.Context, sig os.Signal) error {
 		if err == nil {
 			instancesLoaded = true
 		}
 
+		// Load cluster configuration.
+		var cancel context.CancelFunc
+		err = d.db.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
+			config, err := clusterConfig.Load(ctx, tx)
+			if err != nil {
+				return err
+			}
+
+			d.globalConfigMu.Lock()
+			d.globalConfig = config
+			d.globalConfigMu.Unlock()
+
+			return nil
+		})
+		if err != nil {
+			logger.Warn("Failed loading cluster configuration", logger.Ctx{"err": err})
+			ctx, cancel = context.WithTimeout(ctx, 5*time.Minute)
+		} else {
+			ctx, cancel = context.WithTimeout(ctx, d.globalConfig.ShutdownTimeout())
+		}
+
+		defer cancel()
 	}
 
 	// Handle shutdown (unix.SIGPWR) and reload (unix.SIGTERM) signals.
 	if sig == unix.SIGPWR || sig == unix.SIGTERM {
+		var opsTracker *OperationTracker
 		if d.db.Cluster != nil {
-			// waitForOperations will block until all operations are done, or it's forced to shut down.
-			// For the latter case, we re-use the shutdown channel which is filled when a shutdown is
-			// initiated using `lxd shutdown`.
-			waitForOperations(ctx, d.db.Cluster, s.GlobalConfig.ShutdownTimeout())
+			opsTracker, err = entityToPendingOperations()
+			if err != nil {
+				logger.Error("Failed to get entity to pending operations map", logger.Ctx{"err": err})
+			}
 		}
 
 		// Unmount daemon image and backup volumes if set.
 		logger.Info("Stopping daemon storage volumes")
-		done := make(chan struct{})
+		var volumesWg sync.WaitGroup
+		volumesWg.Add(1)
 		go func() {
-			err := daemonStorageVolumesUnmount(s)
+			unmountTimeout := time.After(1 * time.Minute)
+			logger.Debug("Unmounting daemon storage volumes")
+			err := daemonStorageVolumesUnmount(ctx, unmountTimeout, s, opsTracker)
 			if err != nil {
 				logger.Error("Failed to unmount image and backup volumes", logger.Ctx{"err": err})
 			}
 
-			done <- struct{}{}
+			volumesWg.Done()
 		}()
 
-		// Only wait 60 seconds in case the storage backend is unreachable.
-		select {
-		case <-time.After(time.Minute):
-			logger.Error("Timed out waiting for image and backup volume")
-		case <-done:
-		}
-
 		// Full shutdown requested.
 		if sig == unix.SIGPWR {
-			instancesShutdown(instances)
+			logger.Debug("Shutting down instances")
+			instancesShutdown(ctx, instances, opsTracker)
 
 			logger.Info("Stopping networks")
 			networkShutdown(s)
 
+			volumesWg.Wait()
+			logger.Debug("Daemon storage volumes unmounted")
+
 			// Unmount storage pools after instances stopped.
 			logger.Info("Stopping storage pools")
 
@@ -2130,6 +2154,23 @@ func (d *Daemon) Stop(ctx context.Context, sig os.Signal) error {
 				}
 			}
 		}
+
+		if d.db.Cluster != nil {
+			// Remove remaining operations before closing the database.
+			err := s.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
+				err := dbCluster.DeleteOperations(ctx, tx.Tx(), s.DB.Cluster.GetNodeID())
+				if err != nil {
+					logger.Error("Failed cleaning up operations")
+				}
+
+				return nil
+			})
+			if err != nil {
+				logger.Error("Failed cleaning up operations", logger.Ctx{"err": err})
+			} else {
+				logger.Debug("Operations deleted from the database")
+			}
+		}
 	}
 
 	if d.gateway != nil {
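OperationTracker and entityToPendingOperations() are introduced elsewhere in the PR, so only their usage is visible here. Reconstructed from that usage (opsTracker.GetOpsTrackingGroup("volumes"), volumesTracker.Mu, volumesTracker.Ops keyed by resource URL, op.Status(), op.Class()), the shape is roughly the following — an assumption-laden sketch inferred from the call sites, not the PR's actual definitions:

```go
package main

import (
	"fmt"
	"sync"
)

// Operation mirrors only the surface the shutdown code touches:
// the diff calls op.Status() and op.Class() on tracked operations.
type Operation interface {
	Status() string // stands in for api.Running etc. in the real code
	Class() int     // stands in for operations.OperationClassToken etc.
}

// OpsTrackingGroup is inferred from volumesTracker.Mu / volumesTracker.Ops:
// a read-write mutex guarding a map from resource URL to pending operation.
type OpsTrackingGroup struct {
	Mu  sync.RWMutex
	Ops map[string]Operation
}

// OperationTracker groups pending operations by entity kind ("volumes",
// presumably also instances and others), so each shutdown step can wait
// only on the operations that touch its own resources.
type OperationTracker struct {
	mu     sync.Mutex
	groups map[string]*OpsTrackingGroup
}

// GetOpsTrackingGroup returns the tracking group for one entity kind,
// creating it on first use.
func (t *OperationTracker) GetOpsTrackingGroup(kind string) *OpsTrackingGroup {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.groups == nil {
		t.groups = map[string]*OpsTrackingGroup{}
	}

	g, ok := t.groups[kind]
	if !ok {
		g = &OpsTrackingGroup{Ops: map[string]Operation{}}
		t.groups[kind] = g
	}

	return g
}

func main() {
	t := &OperationTracker{}
	g := t.GetOpsTrackingGroup("volumes")

	g.Mu.RLock()
	fmt.Println("pending volume operations:", len(g.Ops))
	g.Mu.RUnlock()
}
```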
132 changes: 121 additions & 11 deletions lxd/daemon_storage.go
@@ -6,20 +6,24 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/canonical/lxd/lxd/db"
 	"github.com/canonical/lxd/lxd/db/cluster"
 	"github.com/canonical/lxd/lxd/node"
+	"github.com/canonical/lxd/lxd/operations"
 	"github.com/canonical/lxd/lxd/project"
 	"github.com/canonical/lxd/lxd/rsync"
 	"github.com/canonical/lxd/lxd/state"
 	storagePools "github.com/canonical/lxd/lxd/storage"
 	storageDrivers "github.com/canonical/lxd/lxd/storage/drivers"
 	"github.com/canonical/lxd/shared"
+	"github.com/canonical/lxd/shared/api"
 	"github.com/canonical/lxd/shared/logger"
+	"github.com/canonical/lxd/shared/version"
 )
 
-func daemonStorageVolumesUnmount(s *state.State) error {
+func daemonStorageVolumesUnmount(ctx context.Context, unmountTimeout <-chan time.Time, s *state.State, opsTracker *OperationTracker) error {
 	var storageBackups string
 	var storageImages string
 
@@ -59,21 +63,127 @@ func daemonStorageVolumesUnmount(s *state.State) error {
 		return nil
 	}
 
-	if storageBackups != "" {
-		err := unmount(storageBackups)
-		if err != nil {
-			return fmt.Errorf("Failed to unmount backups storage: %w", err)
-		}
-	}
-
-	if storageImages != "" {
-		err := unmount(storageImages)
-		if err != nil {
-			return fmt.Errorf("Failed to unmount images storage: %w", err)
-		}
-	}
-
-	return nil
+	if opsTracker != nil && (storageBackups != "" || storageImages != "") {
+		tick := time.NewTicker(time.Second)
+		defer tick.Stop()
+
+		// We're only interested in the ongoing operations related to 'volumes' in this function.
+		volumesTracker := opsTracker.GetOpsTrackingGroup("volumes")
+
+		storageBackupsUnmounted := false
+		var storageBackupsBaseURLPrefix string
+		var storageBackupsURLToRemoveFromMap string
+		storageImagesUnmounted := false
+		var storageImagesBaseURLPrefix string
+		var storageImagesURLToRemoveFromMap string
+
+		if storageBackups != "" {
+			poolName, volumeName, err := daemonStorageSplitVolume(storageBackups)
+			if err != nil {
+				return err
+			}
+
+			storageBackupsBaseURLPrefix = api.NewURL().Path(version.APIVersion, "storage-pools", poolName, "volumes", "custom", volumeName).String()
+		} else {
+			storageBackupsUnmounted = true
+		}
+
+		if storageImages != "" {
+			poolName, volumeName, err := daemonStorageSplitVolume(storageImages)
+			if err != nil {
+				return err
+			}
+
+			storageImagesBaseURLPrefix = api.NewURL().Path(version.APIVersion, "storage-pools", poolName, "volumes", "custom", volumeName).String()
+		} else {
+			storageImagesUnmounted = true
+		}
+
+		for {
+			select {
+			case <-unmountTimeout:
+				logger.Warn("Unmounting storage volumes timed out")
+				return fmt.Errorf("Failed to unmount backups and images storage volumes due to timeout")
+			case <-ctx.Done():
+				logger.Warn("Unmounting storage volumes cancelled")
+				return fmt.Errorf("Failed to unmount backups and images storage volumes due to context cancellation")
+			case <-tick.C:
+				volumesTracker.Mu.RLock()
+				for url, op := range volumesTracker.Ops {
+					if op.Status() == api.Running && op.Class() != operations.OperationClassToken {
+						continue
+					}
+
+					// Check whether the operation's resource URL matches the backups volume URL prefix.
+					if storageBackupsBaseURLPrefix != "" && !storageBackupsUnmounted && strings.HasPrefix(url, storageBackupsBaseURLPrefix) {
+						err := unmount(storageBackups)
+						if err != nil {
+							volumesTracker.Mu.RUnlock()
+							return fmt.Errorf("Failed to unmount backups storage: %w", err)
+						}
+
+						storageBackupsURLToRemoveFromMap = url
+						storageBackupsUnmounted = true
+						logger.Debug("Successfully unmounted backups storage volume")
+					}
+
+					if storageImagesBaseURLPrefix != "" && !storageImagesUnmounted && strings.HasPrefix(url, storageImagesBaseURLPrefix) {
+						err := unmount(storageImages)
+						if err != nil {
+							volumesTracker.Mu.RUnlock()
+							return fmt.Errorf("Failed to unmount images storage: %w", err)
+						}
+
+						storageImagesURLToRemoveFromMap = url
+						storageImagesUnmounted = true
+						logger.Debug("Successfully unmounted images storage volume")
+					}
+				}
+
+				volumesTracker.Mu.RUnlock()
+				if storageBackupsUnmounted && storageImagesUnmounted {
+					if storageBackupsURLToRemoveFromMap != "" {
+						volumesTracker.Mu.Lock()
+						logger.Debug("Removing storage backups URL from operations tracker", logger.Ctx{"url": storageBackupsURLToRemoveFromMap})
+						delete(volumesTracker.Ops, storageBackupsURLToRemoveFromMap)
+						volumesTracker.Mu.Unlock()
+					}
+
+					if storageImagesURLToRemoveFromMap != "" {
+						volumesTracker.Mu.Lock()
+						logger.Debug("Removing storage images URL from operations tracker", logger.Ctx{"url": storageImagesURLToRemoveFromMap})
+						delete(volumesTracker.Ops, storageImagesURLToRemoveFromMap)
+						volumesTracker.Mu.Unlock()
+					}
+
+					return nil
+				}
+			}
+		}
+	}
+
+	select {
+	case <-ctx.Done():
+		return fmt.Errorf("Failed to unmount backups and images storage volumes due to context cancellation")
+	case <-unmountTimeout:
+		return fmt.Errorf("Failed to unmount backups and images storage volumes due to timeout")
+	default:
+		if storageBackups != "" {
+			err := unmount(storageBackups)
+			if err != nil {
+				return fmt.Errorf("Failed to unmount backups storage: %w", err)
+			}
+		}
+
+		if storageImages != "" {
+			err := unmount(storageImages)
+			if err != nil {
+				return fmt.Errorf("Failed to unmount images storage: %w", err)
			}
+		}
+
+		return nil
+	}
 }
 
 func daemonStorageMount(s *state.State) error {
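The new daemonStorageVolumesUnmount control flow boils down to one reusable pattern: poll on a ticker, give up on either a hard timeout channel or context cancellation, and act as soon as the tracked work drains. A generic, self-contained sketch of that loop (illustrative names only, not the PR's code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitThenAct polls pending() once per second and runs act() as soon as it
// reports no remaining work, giving up on timeout or context cancellation.
// This mirrors the shape of the new daemonStorageVolumesUnmount loop.
func waitThenAct(ctx context.Context, timeout <-chan time.Time, pending func() int, act func() error) error {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case <-timeout:
			return errors.New("timed out waiting for pending operations")
		case <-ctx.Done():
			return errors.New("cancelled while waiting for pending operations")
		case <-tick.C:
			if pending() == 0 {
				return act()
			}
		}
	}
}

func main() {
	remaining := 3 // pretend three operations are still running

	err := waitThenAct(
		context.Background(),
		time.After(time.Minute),
		func() int { remaining--; return remaining }, // one operation finishes per tick
		func() error { fmt.Println("unmounting volume"); return nil },
	)
	fmt.Println("result:", err)
}
```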