Commit 5da1a31e authored by Seth Hoenig's avatar Seth Hoenig
Browse files

client: enable support for cgroups v2

This PR introduces support for using Nomad on systems with cgroups v2 [1]
enabled as the cgroups controller mounted on /sys/fs/cgroup. Newer Linux
distros like Ubuntu 21.10 are shipping with cgroups v2 only, causing problems
for Nomad users.

Nomad mostly "just works" with cgroups v2 due to the indirection via libcontainer,
but not so for managing cpuset cgroups. Before, Nomad has been making use of
a feature in v1 where a PID could be a member of more than one cgroup. In v2
this is no longer possible, and so the logic around computing cpuset values
must be modified. When Nomad detects v2, it manages cpuset values in-process,
rather than making use of cgroup hierarchy inheritance via shared/reserved
parents.

Nomad will only activate the v2 logic when it detects cgroups2 is mounted at
/sys/fs/cgroup. This means on systems running in hybrid mode with cgroups2
mounted at /sys/fs/cgroup/unified (as is typical) Nomad will continue to
use the v1 logic, and should operat...
parent 1e55a358
Showing with 647 additions and 270 deletions
+647 -270
```release-note:improvement
Enable support for cgroups v2
```
......@@ -96,7 +96,7 @@ jobs:
- client/devicemanager
- client/dynamicplugins
- client/fingerprint
# - client/lib/...
- client/lib/...
- client/logmon
- client/pluginmanager
- client/state
......@@ -105,8 +105,8 @@ jobs:
- client/taskenv
- command
- command/agent
# - drivers/docker
# - drivers/exec
- drivers/docker
- drivers/exec
- drivers/java
- drivers/rawexec
- helper/...
......
......@@ -992,6 +992,7 @@ func TestAlloc_ExecStreaming_ACL_WithIsolation_Image(t *testing.T) {
// TestAlloc_ExecStreaming_ACL_WithIsolation_Chroot asserts that token only needs
// alloc-exec acl policy when chroot isolation is used
func TestAlloc_ExecStreaming_ACL_WithIsolation_Chroot(t *testing.T) {
ci.SkipSlow(t, "flaky on GHA; too much disk IO")
ci.Parallel(t)
if runtime.GOOS != "linux" || unix.Geteuid() != 0 {
......
......@@ -70,7 +70,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
CpusetManager: cgutil.NoopCpusetManager(),
CpusetManager: new(cgutil.NoopCpusetManager),
ServersContactedCh: make(chan struct{}),
}
return conf, cleanup
......
......@@ -365,7 +365,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
invalidAllocs: make(map[string]struct{}),
serversContactedCh: make(chan struct{}),
serversContactedOnce: sync.Once{},
cpusetManager: cgutil.NewCpusetManager(cfg.CgroupParent, logger.Named("cpuset_manager")),
cpusetManager: cgutil.CreateCPUSetManager(cfg.CgroupParent, logger),
EnterpriseClient: newEnterpriseClient(logger),
}
......@@ -657,19 +657,23 @@ func (c *Client) init() error {
// Ensure cgroups are created on linux platform
if runtime.GOOS == "linux" && c.cpusetManager != nil {
err := c.cpusetManager.Init()
if err != nil {
// if the client cannot initialize the cgroup then reserved cores will not be reported and the cpuset manager
// will be disabled. this is common when running in dev mode under a non-root user for example
c.logger.Warn("could not initialize cpuset cgroup subsystem, cpuset management disabled", "error", err)
c.cpusetManager = cgutil.NoopCpusetManager()
// use the client configuration for reservable_cores if set
cores := c.config.ReservableCores
if len(cores) == 0 {
// otherwise lookup the effective cores from the parent cgroup
cores, _ = cgutil.GetCPUsFromCgroup(c.config.CgroupParent)
}
if cpuErr := c.cpusetManager.Init(cores); cpuErr != nil {
// If the client cannot initialize the cgroup then reserved cores will not be reported and the cpuset manager
// will be disabled. this is common when running in dev mode under a non-root user for example.
c.logger.Warn("failed to initialize cpuset cgroup subsystem, cpuset management disabled", "error", cpuErr)
c.cpusetManager = new(cgutil.NoopCpusetManager)
}
}
return nil
}
// reloadTLSConnections allows a client to reload its TLS configuration on the
// fly
// reloadTLSConnections allows a client to reload its TLS configuration on the fly
func (c *Client) reloadTLSConnections(newConfig *nconfig.TLSConfig) error {
var tlsWrap tlsutil.RegionWrapper
if newConfig != nil && newConfig.EnableRPC {
......
......@@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/fingerprint"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
......@@ -30,8 +31,6 @@ import (
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/stretchr/testify/require"
)
......
......@@ -774,7 +774,7 @@ func DefaultConfig() *Config {
CNIConfigDir: "/opt/cni/config",
CNIInterfacePrefix: "eth",
HostNetworks: map[string]*structs.ClientHostNetworkConfig{},
CgroupParent: cgutil.DefaultCgroupParent,
CgroupParent: cgutil.GetCgroupParent(""),
MaxDynamicPort: structs.DefaultMinDynamicPort,
MinDynamicPort: structs.DefaultMaxDynamicPort,
}
......
......@@ -3,55 +3,86 @@ package fingerprint
import (
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/cgutil"
log "github.com/hashicorp/go-hclog"
)
const (
cgroupAvailable = "available"
cgroupUnavailable = "unavailable"
interval = 15
cgroupMountPointAttribute = "unique.cgroup.mountpoint"
cgroupVersionAttribute = "unique.cgroup.version"
cgroupDetectInterval = 15 * time.Second
)
type CGroupFingerprint struct {
logger log.Logger
logger hclog.Logger
lastState string
mountPointDetector MountPointDetector
versionDetector CgroupVersionDetector
}
// An interface to isolate calls to the cgroup library
// This facilitates testing where we can implement
// fake mount points to test various code paths
// MountPointDetector isolates calls to the cgroup library.
//
// This facilitates testing where we can implement fake mount points to test
// various code paths.
type MountPointDetector interface {
// MountPoint returns a cgroup mount-point.
//
// In v1, this is one arbitrary subsystem (e.g. /sys/fs/cgroup/cpu).
//
// In v2, this is the actual root mount point (i.e. /sys/fs/cgroup).
MountPoint() (string, error)
}
// Implements the interface detector which calls the cgroups library directly
// DefaultMountPointDetector implements the interface detector which calls the cgroups
// library directly
type DefaultMountPointDetector struct {
}
// MountPoint calls out to the default cgroup library.
func (b *DefaultMountPointDetector) MountPoint() (string, error) {
func (*DefaultMountPointDetector) MountPoint() (string, error) {
return cgutil.FindCgroupMountpointDir()
}
// CgroupVersionDetector isolates calls to the cgroup library.
type CgroupVersionDetector interface {
// CgroupVersion returns v1 or v2 depending on the cgroups version in use.
CgroupVersion() string
}
// DefaultCgroupVersionDetector implements the version detector which calls the
// cgroups library directly.
type DefaultCgroupVersionDetector struct {
}
// CgroupVersion reports "v2" when cgutil.UseV2 is set and "v1" otherwise.
func (*DefaultCgroupVersionDetector) CgroupVersion() string {
	version := "v1"
	if cgutil.UseV2 {
		version = "v2"
	}
	return version
}
// NewCGroupFingerprint returns a new cgroup fingerprinter
func NewCGroupFingerprint(logger log.Logger) Fingerprint {
f := &CGroupFingerprint{
func NewCGroupFingerprint(logger hclog.Logger) Fingerprint {
return &CGroupFingerprint{
logger: logger.Named("cgroup"),
lastState: cgroupUnavailable,
mountPointDetector: &DefaultMountPointDetector{},
mountPointDetector: new(DefaultMountPointDetector),
versionDetector: new(DefaultCgroupVersionDetector),
}
return f
}
// clearCGroupAttributes clears any node attributes related to cgroups that might
// have been set in a previous fingerprint run.
func (f *CGroupFingerprint) clearCGroupAttributes(r *FingerprintResponse) {
r.RemoveAttribute("unique.cgroup.mountpoint")
r.RemoveAttribute(cgroupMountPointAttribute)
r.RemoveAttribute(cgroupVersionAttribute)
}
// Periodic determines the interval at which the periodic fingerprinter will run.
func (f *CGroupFingerprint) Periodic() (bool, time.Duration) {
return true, interval * time.Second
return true, cgroupDetectInterval
}
//go:build !linux
// +build !linux
package fingerprint
......
//go:build linux
// +build linux
package fingerprint
......@@ -7,31 +6,30 @@ import (
"fmt"
)
const (
cgroupAvailable = "available"
)
// Fingerprint tries to find a valid cgroup mount point
// Fingerprint tries to find a valid cgroup mount point and the version of cgroups
// if a mount-point is present.
func (f *CGroupFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
mount, err := f.mountPointDetector.MountPoint()
if err != nil {
f.clearCGroupAttributes(resp)
return fmt.Errorf("Failed to discover cgroup mount point: %s", err)
return fmt.Errorf("failed to discover cgroup mount point: %s", err)
}
// Check if a cgroup mount point was found
// Check if a cgroup mount point was found.
if mount == "" {
f.clearCGroupAttributes(resp)
if f.lastState == cgroupAvailable {
f.logger.Info("cgroups are unavailable")
f.logger.Warn("cgroups are now unavailable")
}
f.lastState = cgroupUnavailable
return nil
}
resp.AddAttribute("unique.cgroup.mountpoint", mount)
// Check the version in use.
version := f.versionDetector.CgroupVersion()
resp.AddAttribute(cgroupMountPointAttribute, mount)
resp.AddAttribute(cgroupVersionAttribute, version)
resp.Detected = true
if f.lastState == cgroupUnavailable {
......
//go:build linux
// +build linux
package fingerprint
......@@ -11,6 +10,7 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// A fake mount point detector that returns an empty path
......@@ -41,94 +41,116 @@ func (m *MountPointDetectorEmptyMountPoint) MountPoint() (string, error) {
return "", nil
}
func TestCGroupFingerprint(t *testing.T) {
// FakeVersionDetector is a fake version detector for tests; it reports
// whatever version string it was constructed with.
type FakeVersionDetector struct {
	version string
}

// CgroupVersion returns the configured version verbatim.
func (d *FakeVersionDetector) CgroupVersion() string {
	return d.version
}
// newRequest builds a FingerprintRequest for node, paired with a zero-value
// client config.
func newRequest(node *structs.Node) *FingerprintRequest {
	req := new(FingerprintRequest)
	req.Config = &config.Config{}
	req.Node = node
	return req
}
// newNode returns a Node whose Attributes map is initialized and empty.
func newNode() *structs.Node {
	node := new(structs.Node)
	node.Attributes = map[string]string{}
	return node
}
func TestCgroup_MountPoint(t *testing.T) {
ci.Parallel(t)
{
t.Run("mount-point fail", func(t *testing.T) {
f := &CGroupFingerprint{
logger: testlog.HCLogger(t),
lastState: cgroupUnavailable,
mountPointDetector: &MountPointDetectorMountPointFail{},
}
node := &structs.Node{
Attributes: make(map[string]string),
mountPointDetector: new(MountPointDetectorMountPointFail),
versionDetector: new(DefaultCgroupVersionDetector),
}
request := &FingerprintRequest{Config: &config.Config{}, Node: node}
request := newRequest(newNode())
var response FingerprintResponse
err := f.Fingerprint(request, &response)
if err == nil {
t.Fatalf("expected an error")
}
if a, _ := response.Attributes["unique.cgroup.mountpoint"]; a != "" {
t.Fatalf("unexpected attribute found, %s", a)
}
}
require.EqualError(t, err, "failed to discover cgroup mount point: cgroup mountpoint discovery failed")
require.Empty(t, response.Attributes[cgroupMountPointAttribute])
})
{
t.Run("mount-point available", func(t *testing.T) {
f := &CGroupFingerprint{
logger: testlog.HCLogger(t),
lastState: cgroupUnavailable,
mountPointDetector: &MountPointDetectorValidMountPoint{},
mountPointDetector: new(MountPointDetectorValidMountPoint),
versionDetector: new(DefaultCgroupVersionDetector),
}
node := &structs.Node{
Attributes: make(map[string]string),
}
request := &FingerprintRequest{Config: &config.Config{}, Node: node}
request := newRequest(newNode())
var response FingerprintResponse
err := f.Fingerprint(request, &response)
if err != nil {
t.Fatalf("unexpected error, %s", err)
}
if a, ok := response.Attributes["unique.cgroup.mountpoint"]; !ok {
t.Fatalf("unable to find attribute: %s", a)
}
}
require.NoError(t, err)
require.Equal(t, "/sys/fs/cgroup", response.Attributes[cgroupMountPointAttribute])
})
{
t.Run("mount-point empty", func(t *testing.T) {
f := &CGroupFingerprint{
logger: testlog.HCLogger(t),
lastState: cgroupUnavailable,
mountPointDetector: &MountPointDetectorEmptyMountPoint{},
mountPointDetector: new(MountPointDetectorEmptyMountPoint),
versionDetector: new(DefaultCgroupVersionDetector),
}
node := &structs.Node{
Attributes: make(map[string]string),
}
request := &FingerprintRequest{Config: &config.Config{}, Node: node}
var response FingerprintResponse
err := f.Fingerprint(request, &response)
if err != nil {
t.Fatalf("unexpected error, %s", err)
}
if a, _ := response.Attributes["unique.cgroup.mountpoint"]; a != "" {
t.Fatalf("unexpected attribute found, %s", a)
}
}
{
err := f.Fingerprint(newRequest(newNode()), &response)
require.NoError(t, err)
require.Empty(t, response.Attributes[cgroupMountPointAttribute])
})
t.Run("mount-point already present", func(t *testing.T) {
f := &CGroupFingerprint{
logger: testlog.HCLogger(t),
lastState: cgroupAvailable,
mountPointDetector: &MountPointDetectorValidMountPoint{},
mountPointDetector: new(MountPointDetectorValidMountPoint),
versionDetector: new(DefaultCgroupVersionDetector),
}
node := &structs.Node{
Attributes: make(map[string]string),
var response FingerprintResponse
err := f.Fingerprint(newRequest(newNode()), &response)
require.NoError(t, err)
require.Equal(t, "/sys/fs/cgroup", response.Attributes[cgroupMountPointAttribute])
})
}
func TestCgroup_Version(t *testing.T) {
t.Run("version v1", func(t *testing.T) {
f := &CGroupFingerprint{
logger: testlog.HCLogger(t),
lastState: cgroupUnavailable,
mountPointDetector: new(MountPointDetectorValidMountPoint),
versionDetector: &FakeVersionDetector{version: "v1"},
}
request := &FingerprintRequest{Config: &config.Config{}, Node: node}
var response FingerprintResponse
err := f.Fingerprint(request, &response)
if err != nil {
t.Fatalf("unexpected error, %s", err)
}
if a, _ := response.Attributes["unique.cgroup.mountpoint"]; a == "" {
t.Fatalf("expected attribute to be found, %s", a)
err := f.Fingerprint(newRequest(newNode()), &response)
require.NoError(t, err)
require.Equal(t, "v1", response.Attributes[cgroupVersionAttribute])
})
t.Run("without mount-point", func(t *testing.T) {
f := &CGroupFingerprint{
logger: testlog.HCLogger(t),
lastState: cgroupUnavailable,
mountPointDetector: new(MountPointDetectorEmptyMountPoint),
versionDetector: &FakeVersionDetector{version: "v1"},
}
}
var response FingerprintResponse
err := f.Fingerprint(newRequest(newNode()), &response)
require.NoError(t, err)
require.Empty(t, response.Attributes[cgroupMountPointAttribute])
})
}
......@@ -5,9 +5,8 @@ import (
)
func (f *CPUFingerprint) deriveReservableCores(req *FingerprintRequest) ([]uint16, error) {
parent := req.Config.CgroupParent
if parent == "" {
parent = cgutil.DefaultCgroupParent
}
return cgutil.GetCPUsFromCgroup(parent)
// The cpuset cgroup manager is initialized (on linux), but not accessible
// from the finger-printer. So we reach in and grab the information manually.
// We may assume the hierarchy is already setup.
return cgutil.GetCPUsFromCgroup(req.Config.CgroupParent)
}
//go:build !linux
// +build !linux
package cgutil
const (
DefaultCgroupParent = ""
)
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
// system. Here it is a no-op implementation.
func FindCgroupMountpointDir() (string, error) {
return "", nil
}
//go:build linux
package cgutil
import (
"fmt"
"os"
"path/filepath"
cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fscommon"
"github.com/opencontainers/runc/libcontainer/configs"
"golang.org/x/sys/unix"
)
const (
DefaultCgroupParent = "/nomad"
SharedCpusetCgroupName = "shared"
ReservedCpusetCgroupName = "reserved"
lcc "github.com/opencontainers/runc/libcontainer/configs"
)
func GetCPUsFromCgroup(group string) ([]uint16, error) {
cgroupPath, err := getCgroupPathHelper("cpuset", group)
if err != nil {
return nil, err
}
// UseV2 indicates whether only cgroups.v2 is enabled. If cgroups.v2 is not
// enabled or is running in hybrid mode with cgroups.v1, Nomad will make use of
// cgroups.v1
//
// This is a read-only value.
var UseV2 = cgroups.IsCgroup2UnifiedMode()
// GetCgroupParent returns the mount point under the root cgroup in which Nomad
// will create cgroups. The value is resolved by the v2 helper when UseV2 is
// set and by the v1 helper otherwise, so an unset parent gets a name
// appropriate for the cgroups version in use.
func GetCgroupParent(parent string) string {
	if !UseV2 {
		return getParentV1(parent)
	}
	return getParentV2(parent)
}
man := cgroupFs.NewManager(&configs.Cgroup{Path: group}, map[string]string{"cpuset": cgroupPath}, false)
stats, err := man.GetStats()
if err != nil {
return nil, err
// CreateCPUSetManager creates a V1 or V2 CpusetManager depending on system configuration.
func CreateCPUSetManager(parent string, logger hclog.Logger) CpusetManager {
if UseV2 {
return NewCpusetManagerV2(getParentV2(parent), logger.Named("cpuset.v2"))
}
return stats.CPUSetStats.CPUs, nil
return NewCpusetManagerV1(getParentV1(parent), logger.Named("cpuset.v1"))
}
func getCpusetSubsystemSettings(parent string) (cpus, mems string, err error) {
if cpus, err = fscommon.ReadFile(parent, "cpuset.cpus"); err != nil {
return
}
if mems, err = fscommon.ReadFile(parent, "cpuset.mems"); err != nil {
return
// GetCPUsFromCgroup gets the effective cpuset value for the given cgroup.
func GetCPUsFromCgroup(group string) ([]uint16, error) {
if UseV2 {
return getCPUsFromCgroupV2(getParentV2(group))
}
return cpus, mems, nil
return getCPUsFromCgroupV1(getParentV1(group))
}
// cpusetEnsureParent makes sure that the parent directories of current
// are created and populated with the proper cpus and mems files copied
// from their respective parent. It does that recursively, starting from
// the top of the cpuset hierarchy (i.e. cpuset cgroup mount point).
func cpusetEnsureParent(current string) error {
var st unix.Statfs_t
// CgroupScope returns the name of the scope for Nomad's managed cgroups for
// the given allocID and task. The allocID and task are joined with a dot and
// suffixed with ".scope".
//
// e.g. "<allocID>.<task>.scope"
//
// Only useful for v2.
func CgroupScope(allocID, task string) string {
return fmt.Sprintf("%s.%s.scope", allocID, task)
}
parent := filepath.Dir(current)
err := unix.Statfs(parent, &st)
if err == nil && st.Type != unix.CGROUP_SUPER_MAGIC {
// ConfigureBasicCgroups will initialize cgroups for v1.
//
// Not useful in cgroups.v2
func ConfigureBasicCgroups(config *lcc.Config) error {
if UseV2 {
// In v2 the default behavior is to create inherited interface files for
// all mounted subsystems automatically.
return nil
}
// Treat non-existing directory as cgroupfs as it will be created,
// and the root cpuset directory obviously exists.
if err != nil && err != unix.ENOENT {
return &os.PathError{Op: "statfs", Path: parent, Err: err}
}
if err := cpusetEnsureParent(parent); err != nil {
return err
}
if err := os.Mkdir(current, 0755); err != nil && !os.IsExist(err) {
return err
}
return cpusetCopyIfNeeded(current, parent)
}
// cpusetCopyIfNeeded copies the cpuset.cpus and cpuset.mems from the parent
// directory to the current directory if the file's contents are 0
func cpusetCopyIfNeeded(current, parent string) error {
currentCpus, currentMems, err := getCpusetSubsystemSettings(current)
id := uuid.Generate()
// In v1 we must setup the freezer cgroup ourselves.
subsystem := "freezer"
path, err := GetCgroupPathHelperV1(subsystem, filepath.Join(DefaultCgroupV1Parent, id))
if err != nil {
return err
return fmt.Errorf("failed to find %s cgroup mountpoint: %v", subsystem, err)
}
parentCpus, parentMems, err := getCpusetSubsystemSettings(parent)
if err != nil {
if err = os.MkdirAll(path, 0755); err != nil {
return err
}
if isEmptyCpuset(currentCpus) {
if err := fscommon.WriteFile(current, "cpuset.cpus", parentCpus); err != nil {
return err
}
}
if isEmptyCpuset(currentMems) {
if err := fscommon.WriteFile(current, "cpuset.mems", parentMems); err != nil {
return err
}
config.Cgroups.Paths = map[string]string{
subsystem: path,
}
return nil
}
// isEmptyCpuset reports whether str is an empty cpuset value: either the empty
// string or a lone newline.
func isEmptyCpuset(str string) bool {
	switch str {
	case "", "\n":
		return true
	}
	return false
}
// getCgroupPathHelper maps cgroup onto a path beneath the mount point of the
// given subsystem. Any error from locating the mount point or from computing
// the relative path is returned unchanged.
func getCgroupPathHelper(subsystem, cgroup string) (string, error) {
	mountpoint, root, err := cgroups.FindCgroupMountpointAndRoot("", subsystem)
	if err != nil {
		return "", err
	}

	// This is needed for nested containers, because in /proc/self/cgroup we
	// see paths from host, which don't exist in container.
	rel, relErr := filepath.Rel(root, cgroup)
	if relErr != nil {
		return "", relErr
	}

	return filepath.Join(mountpoint, rel), nil
}
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
// system.
//
// Note that in cgroups.v1, this returns one of many subsystems that are mounted.
// e.g. a return value of "/sys/fs/cgroup/systemd" really implies the root is
// "/sys/fs/cgroup", which is interesting on hybrid systems where the 'unified'
// subsystem is mounted as if it were a subsystem, but the actual root is different.
// (i.e. /sys/fs/cgroup/unified).
//
// As far as Nomad is concerned, UseV2 is the source of truth for which hierarchy
// to use, and that will only be a true value if cgroups.v2 is mounted on
// /sys/fs/cgroup (i.e. system is not in v1 or hybrid mode).
//
// ➜ mount -l | grep cgroup
// tmpfs on /sys/fs/cgroup type tmpfs (ro,nosuid,nodev,noexec,mode=755,inode64)
// cgroup2 on /sys/fs/cgroup/unified type cgroup2 (rw,nosuid,nodev,noexec,relatime,nsdelegate)
// cgroup on /sys/fs/cgroup/systemd type cgroup (rw,nosuid,nodev,noexec,relatime,xattr,name=systemd)
// cgroup on /sys/fs/cgroup/memory type cgroup (rw,nosuid,nodev,noexec,relatime,memory)
// (etc.)
func FindCgroupMountpointDir() (string, error) {
mount, err := cgroups.GetCgroupMounts(false)
if err != nil {
......@@ -127,3 +112,18 @@ func FindCgroupMountpointDir() (string, error) {
}
return mount[0].Mountpoint, nil
}
// CopyCpuset copies the cpuset.cpus value from source into destination.
//
// Both arguments are cgroup directories. A read or write failure is returned
// as-is.
func CopyCpuset(source, destination string) error {
	const file = "cpuset.cpus"

	value, err := cgroups.ReadFile(source, file)
	if err != nil {
		return err
	}

	return cgroups.WriteFile(destination, file, value)
}
//go:build linux
package cgutil
import (
"path/filepath"
"strings"
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fs2"
"github.com/stretchr/testify/require"
)
// TestUtil_GetCgroupParent checks the value GetCgroupParent yields for an
// unset and an explicitly configured parent, under the v1 and the v2
// compatibility gates.
func TestUtil_GetCgroupParent(t *testing.T) {
	ci.Parallel(t)

	type testCase struct {
		name  string
		input string
		exp   string
	}

	// check runs each case as a sequential subtest of t.
	check := func(t *testing.T, cases []testCase) {
		for i := range cases {
			tc := cases[i]
			t.Run(tc.name, func(t *testing.T) {
				require.Equal(t, tc.exp, GetCgroupParent(tc.input))
			})
		}
	}

	t.Run("v1", func(t *testing.T) {
		testutil.CgroupsCompatibleV1(t)
		check(t, []testCase{
			{name: "default", input: "", exp: "/nomad"},
			{name: "configured", input: "/bar", exp: "/bar"},
		})
	})

	t.Run("v2", func(t *testing.T) {
		testutil.CgroupsCompatibleV2(t)
		check(t, []testCase{
			{name: "default", input: "", exp: "nomad.slice"},
			{name: "configured", input: "abc.slice", exp: "abc.slice"},
		})
	})
}
// TestUtil_CreateCPUSetManager checks that the manager returned by
// CreateCPUSetManager can be initialized with a single core (cpu 0) under a
// uniquely named parent, in a "v1" and a "v2" subtest. Each subtest first
// calls the matching testutil.CgroupsCompatible helper (presumably skipping
// when the host runs the other cgroups version — confirm in testutil).
func TestUtil_CreateCPUSetManager(t *testing.T) {
ci.Parallel(t)
logger := testlog.HCLogger(t)
t.Run("v1", func(t *testing.T) {
testutil.CgroupsCompatibleV1(t)
// v1 parents are written as an absolute-style path, e.g. "/<id>"
parent := "/" + uuid.Short()
manager := CreateCPUSetManager(parent, logger)
err := manager.Init([]uint16{0})
require.NoError(t, err)
// remove the parent path under CgroupRoot once the manager is initialized
require.NoError(t, cgroups.RemovePath(filepath.Join(CgroupRoot, parent)))
})
t.Run("v2", func(t *testing.T) {
testutil.CgroupsCompatibleV2(t)
// v2 parents are written as a slice name, e.g. "<id>.slice"
parent := uuid.Short() + ".slice"
manager := CreateCPUSetManager(parent, logger)
err := manager.Init([]uint16{0})
require.NoError(t, err)
// remove the parent path under CgroupRoot once the manager is initialized
require.NoError(t, cgroups.RemovePath(filepath.Join(CgroupRoot, parent)))
})
}
// TestUtil_GetCPUsFromCgroup checks that GetCPUsFromCgroup returns a non-empty
// cpu list and no error for the "system.slice" cgroup. Only a "v2" subtest
// exists.
func TestUtil_GetCPUsFromCgroup(t *testing.T) {
ci.Parallel(t)
t.Run("v2", func(t *testing.T) {
testutil.CgroupsCompatibleV2(t)
// relies on a system.slice cgroup already existing on the host
cpus, err := GetCPUsFromCgroup("system.slice") // thanks, systemd!
require.NoError(t, err)
require.NotEmpty(t, cpus)
})
}
// create is a test helper that builds an fs2 manager for the cgroup at
// filepath.Join(CgroupRoot, name) and applies it, failing the test on any
// error.
//
// NOTE(review): rootless and CreationPID are defined outside this view;
// confirm their values before relying on what Apply does with them.
func create(t *testing.T, name string) {
mgr, err := fs2.NewManager(nil, filepath.Join(CgroupRoot, name), rootless)
require.NoError(t, err)
err = mgr.Apply(CreationPID)
require.NoError(t, err)
}
// cleanup is a test helper that removes the cgroup previously made by create
// for the same name, failing the test if removal reports an error.
//
// name is relative to CgroupRoot, exactly as create receives it, so it is
// joined with CgroupRoot before removal. Passing the bare relative name to
// RemovePath would target a path in the working directory and leave the real
// cgroup behind.
func cleanup(t *testing.T, name string) {
	err := cgroups.RemovePath(filepath.Join(CgroupRoot, name))
	require.NoError(t, err)
}
// TestUtil_CopyCpuset checks that CopyCpuset transfers the cpuset.cpus value
// of one cgroup to another. Only a "v2" subtest exists.
func TestUtil_CopyCpuset(t *testing.T) {
ci.Parallel(t)
t.Run("v2", func(t *testing.T) {
testutil.CgroupsCompatibleV2(t)
// source cgroup, seeded with cpus "0-1"
source := uuid.Short() + ".scope"
create(t, source)
defer cleanup(t, source)
require.NoError(t, cgroups.WriteFile(filepath.Join(CgroupRoot, source), "cpuset.cpus", "0-1"))
// destination cgroup, left with whatever cpuset.cpus it was created with
destination := uuid.Short() + ".scope"
create(t, destination)
defer cleanup(t, destination)
err := CopyCpuset(
filepath.Join(CgroupRoot, source),
filepath.Join(CgroupRoot, destination),
)
require.NoError(t, err)
// the destination must now read back the value written to the source;
// surrounding whitespace is trimmed before comparing
value, readErr := cgroups.ReadFile(filepath.Join(CgroupRoot, destination), "cpuset.cpus")
require.NoError(t, readErr)
require.Equal(t, "0-1", strings.TrimSpace(value))
})
}
//go:build !linux
package cgutil
import (
"github.com/hashicorp/go-hclog"
)
const (
// DefaultCgroupParent does not apply to non-Linux operating systems.
DefaultCgroupParent = ""
)
// UseV2 is always false on non-Linux systems.
//
// This is a read-only value.
var UseV2 = false
// CreateCPUSetManager ignores both arguments on non-Linux operating systems
// and hands back a no-op CpusetManager.
func CreateCPUSetManager(string, hclog.Logger) CpusetManager {
	return &NoopCpusetManager{}
}
// FindCgroupMountpointDir always returns an empty mount point and a nil error
// on non-Linux operating systems.
func FindCgroupMountpointDir() (string, error) {
return "", nil
}
// GetCgroupParent ignores its argument on non-Linux operating systems and
// returns DefaultCgroupParent, which is the empty string in this build.
func GetCgroupParent(string) string {
return DefaultCgroupParent
}
// GetCPUsFromCgroup ignores its argument on non-Linux operating systems and
// always returns a nil cpu list with a nil error.
func GetCPUsFromCgroup(string) ([]uint16, error) {
return nil, nil
}
// CgroupScope ignores allocID and task on non-Linux operating systems and
// always returns the empty string.
func CgroupScope(allocID, task string) string {
return ""
}
......@@ -2,18 +2,26 @@ package cgutil
import (
"context"
"fmt"
"path/filepath"
"strings"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/nomad/structs"
)
// CpusetManager is used to set up cpuset cgroups for each task. A pool of shared cpus is managed for
// tasks which don't require any reserved cores and a cgroup is managed separately for each task which
// requires reserved cores.
const (
// CgroupRoot is hard-coded in the cgroups specification.
// It only applies to linux but helpers have references to it in driver(s).
CgroupRoot = "/sys/fs/cgroup"
)
// CpusetManager is used to setup cpuset cgroups for each task.
type CpusetManager interface {
// Init should be called before any tasks are managed to ensure the cgroup parent exists and
// check that proper permissions are granted to manage cgroups.
Init() error
// Init should be called with the initial set of reservable cores before any
// allocations are managed. Ensures the parent cgroup exists and proper permissions
// are available for managing cgroups.
Init([]uint16) error
// AddAlloc adds an allocation to the manager
AddAlloc(alloc *structs.Allocation)
......@@ -26,8 +34,26 @@ type CpusetManager interface {
CgroupPathFor(allocID, taskName string) CgroupPathGetter
}
// CgroupPathGetter is a function which returns the cgroup path and any error which occurred during cgroup initialization.
// It should block until the cgroup has been created or an error is reported.
// NoopCpusetManager is a CpusetManager whose methods do nothing. It is what
// the client falls back to when cpuset management is disabled.
type NoopCpusetManager struct{}

// Init accepts any set of cores and always succeeds.
func (NoopCpusetManager) Init([]uint16) error {
	return nil
}

// AddAlloc does nothing.
func (NoopCpusetManager) AddAlloc(*structs.Allocation) {}

// RemoveAlloc does nothing.
func (NoopCpusetManager) RemoveAlloc(string) {}

// CgroupPathFor returns a getter that immediately yields an empty path and a
// nil error, regardless of the allocation, task, or context given.
func (NoopCpusetManager) CgroupPathFor(string, string) CgroupPathGetter {
	return func(context.Context) (string, error) {
		return "", nil
	}
}
// CgroupPathGetter is a function which returns the cgroup path and any error which
// occurred during cgroup initialization.
//
// It should block until the cgroup has been created or an error is reported.
type CgroupPathGetter func(context.Context) (path string, err error)
type TaskCgroupInfo struct {
......@@ -37,20 +63,25 @@ type TaskCgroupInfo struct {
Error error
}
func NoopCpusetManager() CpusetManager { return noopCpusetManager{} }
type noopCpusetManager struct{}
// identity is the "<allocID>.<taskName>" string that uniquely identifies an
// individual instance of a task within the flat cgroup namespace
type identity string
func (n noopCpusetManager) Init() error {
return nil
}
func (n noopCpusetManager) AddAlloc(alloc *structs.Allocation) {
// makeID builds the "<allocID>.<task>" identity for one task of an allocation.
func makeID(allocID, task string) identity {
	joined := fmt.Sprintf("%s.%s", allocID, task)
	return identity(joined)
}
func (n noopCpusetManager) RemoveAlloc(allocID string) {
// makeScope turns an identity into its scope name by appending ".scope".
func makeScope(id identity) string {
	const suffix = ".scope"
	name := string(id)
	return name + suffix
}
func (n noopCpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter {
return func(context.Context) (string, error) { return "", nil }
// SplitPath breaks p into its parent (the first path element) and the
// remaining cgroup path, which is returned with a leading slash.
//
// p must contain at least 2 elements (parent + cgroup). A leading CgroupRoot
// and any leading or trailing slashes are stripped before splitting.
func SplitPath(p string) (string, string) {
	trimmed := strings.Trim(strings.TrimPrefix(p, CgroupRoot), "/")
	elems := strings.Split(trimmed, "/")
	parent, rest := elems[0], elems[1:]
	return parent, "/" + filepath.Join(rest...)
}
//go:build !linux
// +build !linux
package cgutil
import (
"github.com/hashicorp/go-hclog"
)
func NewCpusetManager(_ string, _ hclog.Logger) CpusetManager { return noopCpusetManager{} }
package cgutil
import (
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/stretchr/testify/require"
)
// TestUtil_SplitPath checks SplitPath against relative paths, slash-wrapped
// paths, and paths prefixed with the cgroup root, for both a single-element
// and a two-element cgroup remainder.
func TestUtil_SplitPath(t *testing.T) {
	ci.Parallel(t)

	cases := []struct {
		input     string
		expParent string
		expCgroup string
	}{
		// foo, /bar
		{"foo/bar", "foo", "/bar"},
		{"/foo/bar/", "foo", "/bar"},
		{"/sys/fs/cgroup/foo/bar", "foo", "/bar"},

		// foo, /bar/baz
		{"/foo/bar/baz/", "foo", "/bar/baz"},
		{"foo/bar/baz", "foo", "/bar/baz"},
		{"/sys/fs/cgroup/foo/bar/baz", "foo", "/bar/baz"},
	}

	for _, tc := range cases {
		parent, cgroup := SplitPath(tc.input)
		require.Equal(t, tc.expParent, parent)
		require.Equal(t, tc.expCgroup, cgroup)
	}
}
//go:build linux
package cgutil
import (
......@@ -10,21 +12,28 @@ import (
"sync"
"time"
"github.com/opencontainers/runc/libcontainer/cgroups"
cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/cgroups/fscommon"
"github.com/opencontainers/runc/libcontainer/configs"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/configs"
"golang.org/x/sys/unix"
)
const (
DefaultCgroupV1Parent = "/nomad"
SharedCpusetCgroupName = "shared"
ReservedCpusetCgroupName = "reserved"
)
func NewCpusetManager(cgroupParent string, logger hclog.Logger) CpusetManager {
// NewCpusetManagerV1 creates a CpusetManager compatible with cgroups.v1
func NewCpusetManagerV1(cgroupParent string, logger hclog.Logger) CpusetManager {
if cgroupParent == "" {
cgroupParent = DefaultCgroupParent
cgroupParent = DefaultCgroupV1Parent
}
return &cpusetManager{
return &cpusetManagerV1{
cgroupParent: cgroupParent,
cgroupInfo: map[string]allocTaskCgroupInfo{},
logger: logger,
......@@ -35,7 +44,7 @@ var (
cpusetReconcileInterval = 30 * time.Second
)
type cpusetManager struct {
type cpusetManagerV1 struct {
// cgroupParent relative to the cgroup root. ex. '/nomad'
cgroupParent string
// cgroupParentPath is the absolute path to the cgroup parent.
......@@ -53,7 +62,7 @@ type cpusetManager struct {
logger hclog.Logger
}
func (c *cpusetManager) AddAlloc(alloc *structs.Allocation) {
func (c *cpusetManagerV1) AddAlloc(alloc *structs.Allocation) {
if alloc == nil || alloc.AllocatedResources == nil {
return
}
......@@ -77,14 +86,14 @@ func (c *cpusetManager) AddAlloc(alloc *structs.Allocation) {
go c.signalReconcile()
}
func (c *cpusetManager) RemoveAlloc(allocID string) {
// RemoveAlloc drops the cgroup info tracked for allocID and then signals a
// reconcile from a separate goroutine so the caller is not blocked on it.
func (c *cpusetManagerV1) RemoveAlloc(allocID string) {
	// Hold the lock only for the map mutation; it is released before the
	// reconcile signal is launched.
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		delete(c.cgroupInfo, allocID)
	}()

	go c.signalReconcile()
}
func (c *cpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter {
func (c *cpusetManagerV1) CgroupPathFor(allocID, task string) CgroupPathGetter {
return func(ctx context.Context) (string, error) {
c.mu.Lock()
allocInfo, ok := c.cgroupInfo[allocID]
......@@ -99,15 +108,21 @@ func (c *cpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter {
return "", fmt.Errorf("task %q not found", task)
}
timer, stop := helper.NewSafeTimer(0)
defer stop()
for {
if taskInfo.Error != nil {
break
}
timer.Reset(100 * time.Millisecond)
if _, err := os.Stat(taskInfo.CgroupPath); os.IsNotExist(err) {
select {
case <-ctx.Done():
return taskInfo.CgroupPath, ctx.Err()
case <-time.After(100 * time.Millisecond):
case <-timer.C:
continue
}
}
......@@ -119,24 +134,25 @@ func (c *cpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter {
}
// allocTaskCgroupInfo maps a task name to the cgroup info tracked for that
// task (task name -> *TaskCgroupInfo).
type allocTaskCgroupInfo map[string]*TaskCgroupInfo
// Init checks that the cgroup parent and expected child cgroups have been created
// If the cgroup parent is set to /nomad then this will ensure that the /nomad/shared
// cgroup is initialized.
func (c *cpusetManager) Init() error {
cgroupParentPath, err := getCgroupPathHelper("cpuset", c.cgroupParent)
func (c *cpusetManagerV1) Init(_ []uint16) error {
cgroupParentPath, err := GetCgroupPathHelperV1("cpuset", c.cgroupParent)
if err != nil {
return err
}
c.cgroupParentPath = cgroupParentPath
// ensures that shared cpuset exists and that the cpuset values are copied from the parent if created
if err := cpusetEnsureParent(filepath.Join(cgroupParentPath, SharedCpusetCgroupName)); err != nil {
if err := cpusetEnsureParentV1(filepath.Join(cgroupParentPath, SharedCpusetCgroupName)); err != nil {
return err
}
parentCpus, parentMems, err := getCpusetSubsystemSettings(cgroupParentPath)
parentCpus, parentMems, err := getCpusetSubsystemSettingsV1(cgroupParentPath)
if err != nil {
return fmt.Errorf("failed to detect parent cpuset settings: %v", err)
}
......@@ -155,7 +171,7 @@ func (c *cpusetManager) Init() error {
return err
}
if err := fscommon.WriteFile(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), "cpuset.mems", parentMems); err != nil {
if err := cgroups.WriteFile(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), "cpuset.mems", parentMems); err != nil {
return err
}
......@@ -168,7 +184,7 @@ func (c *cpusetManager) Init() error {
return nil
}
func (c *cpusetManager) reconcileLoop() {
func (c *cpusetManagerV1) reconcileLoop() {
timer := time.NewTimer(0)
if !timer.Stop() {
<-timer.C
......@@ -189,7 +205,7 @@ func (c *cpusetManager) reconcileLoop() {
}
}
func (c *cpusetManager) reconcileCpusets() {
func (c *cpusetManagerV1) reconcileCpusets() {
c.mu.Lock()
defer c.mu.Unlock()
sharedCpuset := cpuset.New(c.parentCpuset.ToSlice()...)
......@@ -240,13 +256,13 @@ func (c *cpusetManager) reconcileCpusets() {
}
// copy cpuset.mems from parent
_, parentMems, err := getCpusetSubsystemSettings(filepath.Dir(info.CgroupPath))
_, parentMems, err := getCpusetSubsystemSettingsV1(filepath.Dir(info.CgroupPath))
if err != nil {
c.logger.Error("failed to read parent cgroup settings for task", "path", info.CgroupPath, "error", err)
info.Error = err
continue
}
if err := fscommon.WriteFile(info.CgroupPath, "cpuset.mems", parentMems); err != nil {
if err := cgroups.WriteFile(info.CgroupPath, "cpuset.mems", parentMems); err != nil {
c.logger.Error("failed to write cgroup cpuset.mems setting for task", "path", info.CgroupPath, "mems", parentMems, "error", err)
info.Error = err
continue
......@@ -260,30 +276,30 @@ func (c *cpusetManager) reconcileCpusets() {
}
// setCgroupCpusetCPUs will compare an existing cpuset.cpus value with an expected value, overwriting the existing if different
// must hold a lock on cpusetManager.mu before calling
func (_ *cpusetManager) setCgroupCpusetCPUs(path, cpus string) error {
currentCpusRaw, err := fscommon.ReadFile(path, "cpuset.cpus")
// must hold a lock on cpusetManagerV1.mu before calling
func (_ *cpusetManagerV1) setCgroupCpusetCPUs(path, cpus string) error {
currentCpusRaw, err := cgroups.ReadFile(path, "cpuset.cpus")
if err != nil {
return err
}
if cpus != strings.TrimSpace(currentCpusRaw) {
if err := fscommon.WriteFile(path, "cpuset.cpus", cpus); err != nil {
if err := cgroups.WriteFile(path, "cpuset.cpus", cpus); err != nil {
return err
}
}
return nil
}
func (c *cpusetManager) signalReconcile() {
// signalReconcile sends an empty struct on signalCh, blocking until the send
// is accepted. If doneCh becomes readable first it returns without sending,
// so callers are not left blocked forever.
// NOTE(review): the receiver of signalCh is not visible here — presumably
// reconcileLoop; confirm.
func (c *cpusetManagerV1) signalReconcile() {
select {
case c.signalCh <- struct{}{}:
case <-c.doneCh:
}
}
func (c *cpusetManager) getCpuset(group string) (cpuset.CPUSet, error) {
man := cgroupFs.NewManager(
func (c *cpusetManagerV1) getCpuset(group string) (cpuset.CPUSet, error) {
man := fs.NewManager(
&configs.Cgroup{
Path: filepath.Join(c.cgroupParent, group),
},
......@@ -297,15 +313,119 @@ func (c *cpusetManager) getCpuset(group string) (cpuset.CPUSet, error) {
return cpuset.New(stats.CPUSetStats.CPUs...), nil
}
func (c *cpusetManager) getCgroupPathsForTask(allocID, task string) (absolute, relative string) {
// getCgroupPathsForTask builds the two cgroup paths for a task. Both end in
// the leaf "<allocID>-<task>" under the reserved cpuset cgroup:
//   - absolute is rooted at reservedCpusetPath()
//   - relative is rooted at cgroupParent
func (c *cpusetManagerV1) getCgroupPathsForTask(allocID, task string) (absolute, relative string) {
	leaf := fmt.Sprintf("%s-%s", allocID, task)

	absolute = filepath.Join(c.reservedCpusetPath(), leaf)
	relative = filepath.Join(c.cgroupParent, ReservedCpusetCgroupName, leaf)
	return absolute, relative
}
func (c *cpusetManager) sharedCpusetPath() string {
// sharedCpusetPath returns cgroupParentPath joined with SharedCpusetCgroupName.
func (c *cpusetManagerV1) sharedCpusetPath() string {
return filepath.Join(c.cgroupParentPath, SharedCpusetCgroupName)
}
func (c *cpusetManager) reservedCpusetPath() string {
// reservedCpusetPath returns cgroupParentPath joined with
// ReservedCpusetCgroupName.
func (c *cpusetManagerV1) reservedCpusetPath() string {
return filepath.Join(c.cgroupParentPath, ReservedCpusetCgroupName)
}
// getCPUsFromCgroupV1 returns the CPUs reported in the cpuset stats of the
// given cgroup. The cgroup's cpuset path is resolved with
// GetCgroupPathHelperV1 and handed to a libcontainer fs manager, whose
// GetStats result supplies the CPU list. Errors from either step are returned
// unchanged with a nil slice.
func getCPUsFromCgroupV1(group string) ([]uint16, error) {
	cpusetPath, err := GetCgroupPathHelperV1("cpuset", group)
	if err != nil {
		return nil, err
	}

	cfg := &configs.Cgroup{Path: group}
	paths := map[string]string{"cpuset": cpusetPath}

	stats, err := fs.NewManager(cfg, paths, false).GetStats()
	if err != nil {
		return nil, err
	}

	return stats.CPUSetStats.CPUs, nil
}
// getParentV1 returns parent unchanged when it is non-empty, and
// DefaultCgroupV1Parent otherwise.
func getParentV1(parent string) string {
	if parent != "" {
		return parent
	}
	return DefaultCgroupV1Parent
}
// cpusetEnsureParentV1 ensures that current and every ancestor directory of
// it inside the cgroup filesystem exist, and that each has its cpuset.cpus
// and cpuset.mems populated from its own parent when empty.
//
// It recurses upward first, so directories are created and populated from the
// top of the cpuset hierarchy (the cpuset cgroup mount point) down to current.
// The recursion stops as soon as the parent directory exists and is not on a
// cgroup filesystem.
func cpusetEnsureParentV1(current string) error {
	parent := filepath.Dir(current)

	var stat unix.Statfs_t
	switch err := unix.Statfs(parent, &stat); {
	case err == nil:
		// Parent exists: if it is outside cgroupfs we have walked past the
		// mount point and there is nothing more to do.
		if stat.Type != unix.CGROUP_SUPER_MAGIC {
			return nil
		}
	case err != unix.ENOENT:
		return &os.PathError{Op: "statfs", Path: parent, Err: err}
	}
	// A missing parent (ENOENT) is treated as cgroupfs since it is about to
	// be created; the root cpuset directory obviously exists.

	if err := cpusetEnsureParentV1(parent); err != nil {
		return err
	}

	mkErr := os.Mkdir(current, 0755)
	if mkErr != nil && !os.IsExist(mkErr) {
		return mkErr
	}

	return cpusetCopyIfNeededV1(current, parent)
}
// cpusetCopyIfNeededV1 fills in the cpuset.cpus and cpuset.mems files of the
// current directory from the parent directory. Each file is written only when
// its present value in current is empty (per isEmptyCpusetV1); non-empty
// values are left as they are. The settings of both directories are read
// before anything is written.
func cpusetCopyIfNeededV1(current, parent string) error {
	ownCpus, ownMems, err := getCpusetSubsystemSettingsV1(current)
	if err != nil {
		return err
	}
	inheritedCpus, inheritedMems, err := getCpusetSubsystemSettingsV1(parent)
	if err != nil {
		return err
	}

	settings := []struct {
		file      string
		own       string
		inherited string
	}{
		{file: "cpuset.cpus", own: ownCpus, inherited: inheritedCpus},
		{file: "cpuset.mems", own: ownMems, inherited: inheritedMems},
	}

	for _, s := range settings {
		if !isEmptyCpusetV1(s.own) {
			continue
		}
		if err := cgroups.WriteFile(current, s.file, s.inherited); err != nil {
			return err
		}
	}

	return nil
}
// getCpusetSubsystemSettingsV1 reads the raw contents of the cpuset.cpus and
// cpuset.mems files in the directory parent, in that order. It stops at the
// first read error and returns it unchanged alongside whatever values were
// obtained up to that point.
func getCpusetSubsystemSettingsV1(parent string) (cpus, mems string, err error) {
	cpus, err = cgroups.ReadFile(parent, "cpuset.cpus")
	if err != nil {
		return cpus, mems, err
	}

	mems, err = cgroups.ReadFile(parent, "cpuset.mems")
	if err != nil {
		return cpus, mems, err
	}

	return cpus, mems, nil
}
// isEmptyCpusetV1 reports whether str is either the empty string or a single
// newline. Any other value, including other whitespace, is considered
// non-empty.
func isEmptyCpusetV1(str string) bool {
	switch str {
	case "", "\n":
		return true
	default:
		return false
	}
}
// GetCgroupPathHelperV1 returns the filesystem path of cgroup within the
// given subsystem. It looks up the subsystem's mountpoint and root with
// cgroups.FindCgroupMountpointAndRoot, expresses cgroup relative to that
// root, and joins the result onto the mountpoint. Errors from the lookup or
// from computing the relative path are returned unchanged.
func GetCgroupPathHelperV1(subsystem, cgroup string) (string, error) {
	mountpoint, root, err := cgroups.FindCgroupMountpointAndRoot("", subsystem)
	if err != nil {
		return "", err
	}

	// This is needed for nested containers, because in /proc/self/cgroup we
	// see paths from host, which don't exist in container.
	relative, err := filepath.Rel(root, cgroup)
	if err != nil {
		return "", err
	}

	return filepath.Join(mountpoint, relative), nil
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment