Skip to content

Commit

Permalink
Improve container and imagefs stats
Browse files Browse the repository at this point in the history
- Use pipeline pattern for controlling the concurrecy of ListContainerStats
- decouple computing container RW layer size and cache it.
- for linux, calculate imageFS using statfs and docker images api
- cache imageFS stats
- pick up kubernetes#104287

Signed-off-by: Xinfeng Liu <[email protected]>

remove store/object_cache.go

Signed-off-by: Xinfeng Liu <[email protected]>

decouple container RW layer size calculation

Signed-off-by: Xinfeng Liu <[email protected]>

use docker images api for imagefs usedBytes

Signed-off-by: Xinfeng Liu <[email protected]>
  • Loading branch information
xinfengliu committed Sep 20, 2023
1 parent 665bb2d commit 21b8469
Show file tree
Hide file tree
Showing 22 changed files with 640 additions and 348 deletions.
3 changes: 2 additions & 1 deletion core/container_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package core
import (
"context"
"fmt"
"time"

"github.com/Mirantis/cri-dockerd/libdocker"
"github.com/docker/docker/api/types"
v1 "k8s.io/cri-api/pkg/apis/runtime/v1"
"time"
)

// RemoveContainer removes the container.
Expand Down
114 changes: 38 additions & 76 deletions core/docker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/Mirantis/cri-dockerd/network/kubenet"
"github.com/Mirantis/cri-dockerd/store"
"github.com/Mirantis/cri-dockerd/streaming"
"github.com/Mirantis/cri-dockerd/utils"
"github.com/blang/semver"
dockertypes "github.com/docker/docker/api/types"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -66,11 +67,9 @@ const (
containerLogPathLabelKey = "io.kubernetes.container.logpath"
sandboxIDLabelKey = "io.kubernetes.sandbox.id"

// The expiration time of version cache.
versionCacheTTL = 60 * time.Second
// The expiration time of 'docker info' cache.
infoCacheTTL = 60 * time.Second
maxMsgSize = 1024 * 1024 * 16
systemInfoCacheMinTTL = time.Minute

maxMsgSize = 1024 * 1024 * 16

defaultCgroupDriver = "cgroupfs"
)
Expand Down Expand Up @@ -156,6 +155,7 @@ func NewDockerService(
checkpointManager: checkpointManager,
networkReady: make(map[string]bool),
containerCleanupInfos: make(map[string]*containerCleanupInfo),
containerStatsCache: newContainerStatsCache(),
}

// check docker version compatibility.
Expand Down Expand Up @@ -216,23 +216,18 @@ func NewDockerService(
plug.Name(),
)

ds.infoCache = store.NewObjectCache(
func() (interface{}, error) {
return ds.client.Info()
},
infoCacheTTL,
)
dockerInfo, err := ds.getDockerInfo()
if err != nil {
return nil, fmt.Errorf("Failed to execute Info() call to the Docker client")
}
logrus.Infof("Docker Info: %+v", dockerInfo)
ds.dockerRootDir = dockerInfo.DockerRootDir

// skipping cgroup driver checks for Windows
if runtime.GOOS == "linux" {
// NOTE: cgroup driver is only detectable in docker 1.11+
cgroupDriver := defaultCgroupDriver
dockerInfo, err := ds.getDockerInfo()
logrus.Infof("Docker Info: %+v", dockerInfo)
if err != nil {
logrus.Error(err, "Failed to execute Info() call to the Docker client")
logrus.Infof("Falling back to use the default driver %s", cgroupDriver)
} else if len(dockerInfo.CgroupDriver) == 0 {
if len(dockerInfo.CgroupDriver) == 0 {
logrus.Info("No cgroup driver is set in Docker")
logrus.Infof("Falling back to use the default driver %s", cgroupDriver)
} else {
Expand All @@ -249,18 +244,11 @@ func NewDockerService(
ds.cgroupDriver = cgroupDriver
}

ds.versionCache = store.NewObjectCache(
func() (interface{}, error) {
v, err := ds.client.Version()
fixAPIVersion(v)
return v, err
},
versionCacheTTL,
)

// Register prometheus metrics.
metrics.Register()

go ds.startStatsCollection()

return ds, nil
}

Expand All @@ -280,14 +268,14 @@ type dockerService struct {
// cgroup driver used by Docker runtime.
cgroupDriver string
checkpointManager store.CheckpointManager
// caches the version of the runtime.
// To be compatible with multiple docker versions, we need to perform
// version checking for some operations. Use this cache to avoid querying
// the docker daemon every time we need to do such checks.
versionCache *store.ObjectCache

// caches "docker info"
infoCache *store.ObjectCache
// cache for 'docker version' and 'docker info'
systemInfoCache utils.Cache

// docker root directory
dockerRootDir string

containerStatsCache *containerStatsCache

// containerCleanupInfos maps container IDs to the `containerCleanupInfo` structs
// needed to clean up after containers have been removed.
Expand Down Expand Up @@ -340,17 +328,18 @@ func (ds *dockerService) AlphaVersion(
}

// getDockerVersion gets the version information from docker.
func (ds *dockerService) getDockerVersion() (v *dockertypes.Version, err error) {
if ds.versionCache != nil {
v, err = ds.getDockerVersionFromCache()
} else {
v, err = ds.client.Version()
fixAPIVersion(v)
}
func (ds *dockerService) getDockerVersion() (*dockertypes.Version, error) {
res, err := ds.systemInfoCache.Memoize("docker_version", systemInfoCacheMinTTL, func() (interface{}, error) {
return ds.client.Version()
})
if err != nil {
return nil, fmt.Errorf("failed to get docker version: %v", err)
return nil, fmt.Errorf("failed to get docker version from dockerd: %v", err)
}
return v, nil
cachedValue := res.(*dockertypes.Version)
// make a copy
v := *cachedValue
fixAPIVersion(&v)
return &v, nil
}

// fixAPIVersion remedy Docker API version (e.g., 1.23) which is not semver compatible by
Expand All @@ -362,16 +351,15 @@ func fixAPIVersion(v *dockertypes.Version) {
}

// getDockerInfo gets the information of "docker info".
func (ds *dockerService) getDockerInfo() (v *dockertypes.Info, err error) {
if ds.infoCache != nil {
v, err = ds.getDockerInfoFromCache()
} else {
v, err = ds.client.Info()
}
func (ds *dockerService) getDockerInfo() (*dockertypes.Info, error) {
res, err := ds.systemInfoCache.Memoize("docker_info", systemInfoCacheMinTTL, func() (interface{}, error) {
return ds.client.Info()
})
if err != nil {
return nil, fmt.Errorf("failed to get docker info: %v", err)
return nil, fmt.Errorf("failed to get docker info from dockerd: %v", err)
}
return v, nil
info := res.(*dockertypes.Info)
return info, nil
}

// UpdateRuntimeConfig updates the runtime config. Currently only handles podCIDR updates.
Expand Down Expand Up @@ -524,29 +512,3 @@ func (ds *dockerService) getDockerAPIVersion() (*semver.Version, error) {
}
return &apiVersion, nil
}

func (ds *dockerService) getDockerVersionFromCache() (*dockertypes.Version, error) {
// We only store on key in the cache.
const dummyKey = "version"
value, err := ds.versionCache.Get(dummyKey)
if err != nil {
return nil, err
}
dv, ok := value.(*dockertypes.Version)
if !ok {
return nil, fmt.Errorf("converted to *dockertype.Version error")
}
return dv, nil
}

func (ds *dockerService) getDockerInfoFromCache() (*dockertypes.Info, error) {
value, err := ds.infoCache.Get("info")
if err != nil {
return nil, err
}
dv, ok := value.(*dockertypes.Info)
if !ok {
return nil, fmt.Errorf("converted to *dockertype.Info error")
}
return dv, nil
}
7 changes: 1 addition & 6 deletions core/docker_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,12 @@ func newTestDockerService() (*dockerService, *libdocker.FakeDockerClient, *clock
network: pm,
checkpointManager: ckm,
networkReady: make(map[string]bool),
dockerRootDir: "/docker/root/dir",
}, c, fakeClock
}

func newTestDockerServiceWithVersionCache() (*dockerService, *libdocker.FakeDockerClient, *clock.FakeClock) {
ds, c, fakeClock := newTestDockerService()
ds.versionCache = store.NewObjectCache(
func() (interface{}, error) {
return ds.getDockerVersion()
},
time.Hour*10,
)
return ds, c, fakeClock
}

Expand Down
81 changes: 0 additions & 81 deletions core/image_linux.go

This file was deleted.

31 changes: 31 additions & 0 deletions core/imagefs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package core

import (
"context"
"time"

"github.com/Mirantis/cri-dockerd/utils"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

// ImageFsStatsCache caches imagefs stats.
var ImageFsStatsCache utils.Cache

const imageFsStatsMinTTL = 30 * time.Second

// ImageFsInfo returns information of the filesystem that is used to store images.
func (ds *dockerService) ImageFsInfo(
_ context.Context,
_ *runtimeapi.ImageFsInfoRequest,
) (*runtimeapi.ImageFsInfoResponse, error) {

res, err := ImageFsStatsCache.Memoize("imagefs", imageFsStatsMinTTL, func() (interface{}, error) {
return ds.imageFsInfo()
})
if err != nil {
return nil, err
}
stats := res.(*runtimeapi.ImageFsInfoResponse)
return stats, nil

}
Loading

0 comments on commit 21b8469

Please sign in to comment.