Unverified Commit 9dcddb5b authored by travisn

ceph: upgrade all daemons during ceph upgrade


The CephCluster CR contains settings that are needed by other
CRs to configure the Ceph daemons. When the CephCluster CR
is updated, the updates will now be passed on to each of the
CR controllers to ensure the daemons are updated properly
without requiring an operator restart.

When the controllers are called from another controller,
a per-controller mutex ensures that only a single goroutine
handles CRs at any given time, preventing contention across
multiple CRs of the same type.
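
The diff below implements this with a `childController` interface, a list of child controllers held by the cluster, and a per-controller `orchestrationMutex`. A condensed sketch of that pattern, with simplified names and types (this is illustrative, not the exact Rook code):

```go
// Sketch of the parent/child controller notification pattern described above.
// Names and types are simplified from the diff below.
package main

import (
	"fmt"
	"sync"
)

// clusterSpec stands in for cephv1.ClusterSpec.
type clusterSpec struct {
	cephImage string
}

// childController is implemented by the controllers of CRs owned by the CephCluster.
type childController interface {
	// ParentClusterChanged is called when the CephCluster CR is updated,
	// for example when a newer ceph version is requested.
	ParentClusterChanged(cluster clusterSpec)
}

// filesystemController serializes its orchestration with a mutex so that only
// a single goroutine handles its CRs at any given time.
type filesystemController struct {
	cephImage          string
	orchestrationMutex sync.Mutex
}

func (c *filesystemController) ParentClusterChanged(cluster clusterSpec) {
	if cluster.cephImage == c.cephImage {
		return // nothing to do if the ceph version did not change
	}
	c.orchestrationMutex.Lock()
	defer c.orchestrationMutex.Unlock()
	c.cephImage = cluster.cephImage
	fmt.Printf("updating filesystem daemons to %s\n", c.cephImage)
}

// cluster notifies all registered child controllers after its own orchestration.
type cluster struct {
	spec             clusterSpec
	childControllers []childController
}

func (c *cluster) doOrchestration() {
	// ... orchestrate mons, mgr, osds ...
	for _, child := range c.childControllers {
		child.ParentClusterChanged(c.spec)
	}
}

func main() {
	fs := &filesystemController{cephImage: "ceph/ceph:v13"}
	parent := &cluster{
		spec:             clusterSpec{cephImage: "ceph/ceph:v14"},
		childControllers: []childController{fs},
	}
	parent.doOrchestration()
}
```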
Signed-off-by: travisn <tnielsen@redhat.com>
parent 284b00fb
Showing with 279 additions and 97 deletions
......@@ -428,7 +428,8 @@ cephosd: configuring osd devices: {"Entries":{"sdb":{"Data":-1,"Metadata":null},
## Solution
After you have either updated the CRD with the correct settings, or you have cleaned the partitions or file system from your devices,
you can trigger the operator to analyze the devices again by restarting the operator. Each time the operator starts, it
will ensure all the desired devices are configured.
will ensure all the desired devices are configured. The operator automatically deploys OSDs in most scenarios, but an operator restart
will cover any scenarios that it does not detect automatically.
```
# Restart the operator to ensure devices are configured. A new pod will automatically be started when the current operator pod is deleted.
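# For example (hypothetical command; the namespace and label depend on how the operator
# was deployed, the Rook example manifests use rook-ceph-system and app=rook-ceph-operator):
kubectl -n rook-ceph-system delete pod -l app=rook-ceph-operator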
......@@ -542,4 +543,4 @@ You don't need to restart the pod, the effect will be immediate.
To disable logging to file, simply set `log_to_file` to `false`.
For Ceph Luminous/Mimic releases, `mon_cluster_log_file` and `cluster_log_file` can be set to `/var/log/ceph/XXXX` in the config override ConfigMap to enable logging. See the [Advanced Documentation](Documentation/advanced-configuration.md#kubernetes) for information about how to use the config override ConfigMap.
\ No newline at end of file
For Ceph Luminous/Mimic releases, `mon_cluster_log_file` and `cluster_log_file` can be set to `/var/log/ceph/XXXX` in the config override ConfigMap to enable logging. See the [Advanced Documentation](advanced-configuration.md#custom-cephconf-settings) for information about how to use the config override ConfigMap.
......@@ -13,6 +13,11 @@
### Ceph
- Ceph Nautilus (`v14`) is now supported by Rook and is the default version deployed by the examples.
- An operator restart is no longer needed to apply changes to the cluster in the following scenarios:
  - When a node is added to the cluster, OSDs will be automatically configured if needed.
  - When a device is attached to a storage node, OSDs will be automatically configured if needed.
  - Any change to the CephCluster CR will trigger updates to the cluster.
  - Upgrading the Ceph version will update all Ceph daemons (in v0.9, mds and rgw daemons were skipped).
- Ceph status is surfaced in the CephCluster CR and periodically updated by the operator (default is 60s). The interval can be configured with the `ROOK_CEPH_STATUS_CHECK_INTERVAL` env var (a parsing sketch follows this excerpt).
- A `CephNFS` CRD will start NFS daemon(s) for exporting CephFS volumes or RGW buckets. See the [NFS documentation](Documentation/ceph-nfs-crd.md).
- Selinux labeling for mounts can now be toggled with the [ROOK_ENABLE_SELINUX_RELABELING](https://github.com/rook/rook/issues/2417) environment variable.
......
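The changelog above notes that the Ceph status check interval defaults to 60s and can be changed with the `ROOK_CEPH_STATUS_CHECK_INTERVAL` env var. A minimal sketch of how such an interval variable might be parsed, assuming a Go duration string such as `"45s"` (illustrative only, not the operator's actual parsing code):

```go
// Minimal sketch: read an interval from an environment variable,
// falling back to a 60s default when it is unset or invalid.
package main

import (
	"fmt"
	"os"
	"time"
)

func statusCheckInterval() time.Duration {
	const defaultInterval = 60 * time.Second
	raw := os.Getenv("ROOK_CEPH_STATUS_CHECK_INTERVAL")
	if raw == "" {
		return defaultInterval
	}
	d, err := time.ParseDuration(raw) // e.g. "45s" or "2m"
	if err != nil || d <= 0 {
		fmt.Printf("invalid ROOK_CEPH_STATUS_CHECK_INTERVAL %q, using default %s\n", raw, defaultInterval)
		return defaultInterval
	}
	return d
}

func main() {
	fmt.Println("ceph status check interval:", statusCheckInterval())
}
```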
......@@ -57,6 +57,13 @@ type cluster struct {
orchestrationRunning bool
orchestrationNeeded bool
orchMux sync.Mutex
childControllers []childController
}
// childController is implemented by the controllers of CRs that are owned by the CephCluster
type childController interface {
// ParentClusterChanged is called when the CephCluster CR is updated, for example for a newer ceph version
ParentClusterChanged(cluster cephv1.ClusterSpec, clusterInfo *cephconfig.ClusterInfo)
}
func newCluster(c *cephv1.CephCluster, context *clusterd.Context) *cluster {
......@@ -222,6 +229,11 @@ func (c *cluster) doOrchestration(rookImage string, cephVersion cephver.CephVers
logger.Infof("Done creating rook instance in namespace %s", c.Namespace)
// Notify the child controllers that the cluster spec might have changed
for _, child := range c.childControllers {
child.ParentClusterChanged(*c.Spec, clusterInfo)
}
return nil
}
......@@ -293,7 +305,7 @@ func clusterChanged(oldCluster, newCluster cephv1.ClusterSpec, clusterRef *clust
// any change in the crd will trigger an orchestration
if !reflect.DeepEqual(oldCluster, newCluster) {
diff := cmp.Diff(oldCluster, newCluster)
logger.Infof("The Cluster CRD has changed. diff=%s", diff)
logger.Infof("The Cluster CR has changed. diff=%s", diff)
return true, diff
}
......
......@@ -306,24 +306,28 @@ func (c *ClusterController) onAdd(obj interface{}) {
}
// Start pool CRD watcher
poolController := pool.NewPoolController(c.context)
poolController.StartWatch(cluster.Namespace, cluster.stopCh)
poolController := pool.NewPoolController(c.context, cluster.Namespace)
poolController.StartWatch(cluster.stopCh)
// Start object store CRD watcher
objectStoreController := object.NewObjectStoreController(cluster.Info, c.context, c.rookImage, cluster.Spec.CephVersion, cluster.Spec.Network.HostNetwork, cluster.ownerRef, cluster.Spec.DataDirHostPath)
objectStoreController.StartWatch(cluster.Namespace, cluster.stopCh)
objectStoreController := object.NewObjectStoreController(cluster.Info, c.context, cluster.Namespace, c.rookImage, cluster.Spec.CephVersion, cluster.Spec.Network.HostNetwork, cluster.ownerRef, cluster.Spec.DataDirHostPath)
objectStoreController.StartWatch(cluster.stopCh)
// Start object store user CRD watcher
objectStoreUserController := objectuser.NewObjectStoreUserController(c.context, cluster.ownerRef)
objectStoreUserController.StartWatch(cluster.Namespace, cluster.stopCh)
objectStoreUserController := objectuser.NewObjectStoreUserController(c.context, cluster.Namespace, cluster.ownerRef)
objectStoreUserController.StartWatch(cluster.stopCh)
// Start file system CRD watcher
fileController := file.NewFilesystemController(cluster.Info, c.context, c.rookImage, cluster.Spec.CephVersion, cluster.Spec.Network.HostNetwork, cluster.ownerRef, cluster.Spec.DataDirHostPath)
fileController.StartWatch(cluster.Namespace, cluster.stopCh)
fileController := file.NewFilesystemController(cluster.Info, c.context, cluster.Namespace, c.rookImage, cluster.Spec.CephVersion, cluster.Spec.Network.HostNetwork, cluster.ownerRef, cluster.Spec.DataDirHostPath)
fileController.StartWatch(cluster.stopCh)
// Start nfs ganesha CRD watcher
ganeshaController := nfs.NewCephNFSController(cluster.Info, c.context, c.rookImage, cluster.Spec.CephVersion, cluster.Spec.Network.HostNetwork, cluster.ownerRef)
ganeshaController.StartWatch(cluster.Namespace, cluster.stopCh)
ganeshaController := nfs.NewCephNFSController(cluster.Info, c.context, cluster.Namespace, c.rookImage, cluster.Spec.CephVersion, cluster.Spec.Network.HostNetwork, cluster.ownerRef)
ganeshaController.StartWatch(cluster.stopCh)
cluster.childControllers = []childController{
poolController, objectStoreController, objectStoreUserController, fileController, ganeshaController,
}
// Start mon health checker
healthChecker := mon.NewHealthChecker(cluster.mons)
......
......@@ -20,6 +20,7 @@ package file
import (
"fmt"
"reflect"
"sync"
"github.com/coreos/pkg/capnslog"
opkit "github.com/rook/operator-kit"
......@@ -57,19 +58,22 @@ var filesystemResourceRookLegacy = opkit.CustomResource{
// FilesystemController represents a controller for filesystem custom resources
type FilesystemController struct {
clusterInfo *cephconfig.ClusterInfo
context *clusterd.Context
rookVersion string
cephVersion cephv1.CephVersionSpec
hostNetwork bool
ownerRef metav1.OwnerReference
dataDirHostPath string
clusterInfo *cephconfig.ClusterInfo
context *clusterd.Context
namespace string
rookVersion string
cephVersion cephv1.CephVersionSpec
hostNetwork bool
ownerRef metav1.OwnerReference
dataDirHostPath string
orchestrationMutex sync.Mutex
}
// NewFilesystemController creates a controller for watching filesystem custom resources
func NewFilesystemController(
clusterInfo *cephconfig.ClusterInfo,
context *clusterd.Context,
namespace string,
rookVersion string,
cephVersion cephv1.CephVersionSpec,
hostNetwork bool,
......@@ -79,6 +83,7 @@ func NewFilesystemController(
return &FilesystemController{
clusterInfo: clusterInfo,
context: context,
namespace: namespace,
rookVersion: rookVersion,
cephVersion: cephVersion,
hostNetwork: hostNetwork,
......@@ -88,7 +93,7 @@ func NewFilesystemController(
}
// StartWatch watches for instances of Filesystem custom resources and acts on them
func (c *FilesystemController) StartWatch(namespace string, stopCh chan struct{}) error {
func (c *FilesystemController) StartWatch(stopCh chan struct{}) error {
resourceHandlerFuncs := cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
......@@ -96,12 +101,12 @@ func (c *FilesystemController) StartWatch(namespace string, stopCh chan struct{}
DeleteFunc: c.onDelete,
}
logger.Infof("start watching filesystem resource in namespace %s", namespace)
watcher := opkit.NewWatcher(FilesystemResource, namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
logger.Infof("start watching filesystem resource in namespace %s", c.namespace)
watcher := opkit.NewWatcher(FilesystemResource, c.namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
go watcher.Watch(&cephv1.CephFilesystem{}, stopCh)
// watch for events on all legacy types too
c.watchLegacyFilesystems(namespace, stopCh, resourceHandlerFuncs)
c.watchLegacyFilesystems(c.namespace, stopCh, resourceHandlerFuncs)
return nil
}
......@@ -120,6 +125,9 @@ func (c *FilesystemController) onAdd(obj interface{}) {
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
err = createFilesystem(c.clusterInfo, c.context, *filesystem, c.rookVersion, c.cephVersion, c.hostNetwork, c.filesystemOwners(filesystem), c.dataDirHostPath)
if err != nil {
logger.Errorf("failed to create filesystem %s: %+v", filesystem.Name, err)
......@@ -150,6 +158,9 @@ func (c *FilesystemController) onUpdate(oldObj, newObj interface{}) {
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
// if the filesystem is modified, allow the filesystem to be created if it wasn't already
logger.Infof("updating filesystem %s", newFS.Name)
err = createFilesystem(c.clusterInfo, c.context, *newFS, c.rookVersion, c.cephVersion, c.hostNetwork, c.filesystemOwners(newFS), c.dataDirHostPath)
......@@ -158,6 +169,33 @@ func (c *FilesystemController) onUpdate(oldObj, newObj interface{}) {
}
}
func (c *FilesystemController) ParentClusterChanged(cluster cephv1.ClusterSpec, clusterInfo *cephconfig.ClusterInfo) {
c.clusterInfo = clusterInfo
if cluster.CephVersion.Image == c.cephVersion.Image {
logger.Debugf("No need to update the file system after the parent cluster changed")
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
c.cephVersion = cluster.CephVersion
filesystems, err := c.context.RookClientset.CephV1().CephFilesystems(c.namespace).List(metav1.ListOptions{})
if err != nil {
logger.Errorf("failed to retrieve filesystems to update the ceph version. %+v", err)
return
}
for _, fs := range filesystems.Items {
logger.Infof("updating the ceph version for filesystem %s to %s", fs.Name, c.cephVersion.Image)
err = createFilesystem(c.clusterInfo, c.context, fs, c.rookVersion, c.cephVersion, c.hostNetwork, c.filesystemOwners(&fs), c.dataDirHostPath)
if err != nil {
logger.Errorf("failed to update filesystem %s. %+v", fs.Name, err)
} else {
logger.Infof("updated filesystem %s to ceph version %s", fs.Name, c.cephVersion.Image)
}
}
}
func (c *FilesystemController) onDelete(obj interface{}) {
filesystem, migrationNeeded, err := getFilesystemObject(obj)
if err != nil {
......@@ -170,6 +208,9 @@ func (c *FilesystemController) onDelete(obj interface{}) {
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
err = deleteFilesystem(c.context, c.clusterInfo.CephVersion, *filesystem)
if err != nil {
logger.Errorf("failed to delete filesystem %s: %+v", filesystem.Name, err)
......@@ -293,3 +334,14 @@ func convertRookLegacyFilesystem(legacyFilesystem *cephbeta.Filesystem) *cephv1.
return filesystem
}
func (c *FilesystemController) acquireOrchestrationLock() {
logger.Debugf("Acquiring lock for filesystem orchestration")
c.orchestrationMutex.Lock()
logger.Debugf("Acquired lock for filesystem orchestration")
}
func (c *FilesystemController) releaseOrchestrationLock() {
c.orchestrationMutex.Unlock()
logger.Debugf("Released lock for filesystem orchestration")
}
......@@ -87,7 +87,7 @@ func TestMigrateFilesystemObject(t *testing.T) {
}
clusterInfo := &cephconfig.ClusterInfo{FSID: "myfsid"}
controller := NewFilesystemController(clusterInfo, context, "", cephv1.CephVersionSpec{}, false, metav1.OwnerReference{}, "/var/lib/rook/")
controller := NewFilesystemController(clusterInfo, context, legacyFilesystem.Namespace, "", cephv1.CephVersionSpec{}, false, metav1.OwnerReference{}, "/var/lib/rook/")
// convert the legacy filesystem object in memory and assert that a migration is needed
convertedFilesystem, migrationNeeded, err := getFilesystemObject(legacyFilesystem)
......
......@@ -19,6 +19,7 @@ package nfs
import (
"reflect"
"sync"
"github.com/coreos/pkg/capnslog"
opkit "github.com/rook/operator-kit"
......@@ -44,19 +45,22 @@ var CephNFSResource = opkit.CustomResource{
// CephNFSController represents a controller for CephNFS custom resources
type CephNFSController struct {
clusterInfo *cephconfig.ClusterInfo
context *clusterd.Context
rookImage string
cephVersion cephv1.CephVersionSpec
hostNetwork bool
ownerRef metav1.OwnerReference
clusterInfo *cephconfig.ClusterInfo
context *clusterd.Context
namespace string
rookImage string
cephVersion cephv1.CephVersionSpec
hostNetwork bool
ownerRef metav1.OwnerReference
orchestrationMutex sync.Mutex
}
// NewCephNFSController creates a controller for watching CephNFS custom resources
func NewCephNFSController(clusterInfo *cephconfig.ClusterInfo, context *clusterd.Context, rookImage string, cephVersion cephv1.CephVersionSpec, hostNetwork bool, ownerRef metav1.OwnerReference) *CephNFSController {
func NewCephNFSController(clusterInfo *cephconfig.ClusterInfo, context *clusterd.Context, namespace, rookImage string, cephVersion cephv1.CephVersionSpec, hostNetwork bool, ownerRef metav1.OwnerReference) *CephNFSController {
return &CephNFSController{
clusterInfo: clusterInfo,
context: context,
namespace: namespace,
rookImage: rookImage,
cephVersion: cephVersion,
hostNetwork: hostNetwork,
......@@ -65,7 +69,7 @@ func NewCephNFSController(clusterInfo *cephconfig.ClusterInfo, context *clusterd
}
// StartWatch watches for instances of CephNFS custom resources and acts on them
func (c *CephNFSController) StartWatch(namespace string, stopCh chan struct{}) error {
func (c *CephNFSController) StartWatch(stopCh chan struct{}) error {
resourceHandlerFuncs := cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
......@@ -73,8 +77,8 @@ func (c *CephNFSController) StartWatch(namespace string, stopCh chan struct{}) e
DeleteFunc: c.onDelete,
}
logger.Infof("start watching ceph nfs resource in namespace %s", namespace)
watcher := opkit.NewWatcher(CephNFSResource, namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
logger.Infof("start watching ceph nfs resource in namespace %s", c.namespace)
watcher := opkit.NewWatcher(CephNFSResource, c.namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
go watcher.Watch(&cephv1.CephNFS{}, stopCh)
return nil
......@@ -87,6 +91,9 @@ func (c *CephNFSController) onAdd(obj interface{}) {
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
err := c.upCephNFS(*nfs, 0)
if err != nil {
logger.Errorf("failed to create NFS Ganesha %s. %+v", nfs.Name, err)
......@@ -106,6 +113,9 @@ func (c *CephNFSController) onUpdate(oldObj, newObj interface{}) {
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
logger.Infof("Updating the ganesha server from %d to %d active count", oldNFS.Spec.Server.Active, newNFS.Spec.Server.Active)
if oldNFS.Spec.Server.Active < newNFS.Spec.Server.Active {
err := c.upCephNFS(*newNFS, oldNFS.Spec.Server.Active)
......@@ -128,15 +138,56 @@ func (c *CephNFSController) onDelete(obj interface{}) {
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
err := c.downCephNFS(*nfs, 0)
if err != nil {
logger.Errorf("failed to delete file system %s. %+v", nfs.Name, err)
}
}
func (c *CephNFSController) ParentClusterChanged(cluster cephv1.ClusterSpec, clusterInfo *cephconfig.ClusterInfo) {
c.clusterInfo = clusterInfo
if cluster.CephVersion.Image == c.cephVersion.Image || !c.clusterInfo.CephVersion.IsAtLeastNautilus() {
logger.Debugf("No need to update the nfs daemons after the parent cluster changed")
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
c.cephVersion = cluster.CephVersion
nfses, err := c.context.RookClientset.CephV1().CephNFSes(c.namespace).List(metav1.ListOptions{})
if err != nil {
logger.Errorf("failed to retrieve NFSes to update the ceph version. %+v", err)
return
}
for _, nfs := range nfses.Items {
logger.Infof("updating the ceph version for nfs %s to %s", nfs.Name, c.cephVersion.Image)
err := c.upCephNFS(nfs, 0)
if err != nil {
logger.Errorf("failed to update nfs %s. %+v", nfs.Name, err)
} else {
logger.Infof("updated nfs %s to ceph version %s", nfs.Name, c.cephVersion.Image)
}
}
}
func nfsChanged(oldNFS, newNFS cephv1.NFSGaneshaSpec) bool {
if oldNFS.Server.Active != newNFS.Server.Active {
return true
}
return false
}
func (c *CephNFSController) acquireOrchestrationLock() {
logger.Debugf("Acquiring lock for nfs orchestration")
c.orchestrationMutex.Lock()
logger.Debugf("Acquired lock for nfs orchestration")
}
func (c *CephNFSController) releaseOrchestrationLock() {
c.orchestrationMutex.Unlock()
logger.Debugf("Released lock for nfs orchestration")
}
......@@ -76,9 +76,7 @@ func (c *CephNFSController) upCephNFS(n cephv1.CephNFS, oldActive int) error {
return fmt.Errorf("failed to create ganesha service. %+v", err)
}
if err = c.addServerToDatabase(n, name); err != nil {
logger.Warningf("Failed to add ganesha server %s to database. It may already be added. %+v", name, err)
}
c.addServerToDatabase(n, name)
}
return nil
......@@ -97,22 +95,20 @@ func (c *CephNFSController) addRADOSConfigFile(n cephv1.CephNFS, name string) er
return c.context.Executor.ExecuteCommand(false, "", "rados", "--pool", n.Spec.RADOS.Pool, "--namespace", n.Spec.RADOS.Namespace, "create", config)
}
func (c *CephNFSController) addServerToDatabase(n cephv1.CephNFS, name string) error {
func (c *CephNFSController) addServerToDatabase(n cephv1.CephNFS, name string) {
logger.Infof("Adding ganesha %s to grace db", name)
if err := c.runGaneshaRadosGraceJob(n, name, "add", 10*time.Minute); err != nil {
logger.Errorf("failed to add %s to grace db. %+v", name, err)
}
return nil
}
func (c *CephNFSController) removeServerFromDatabase(n cephv1.CephNFS, name string) error {
func (c *CephNFSController) removeServerFromDatabase(n cephv1.CephNFS, name string) {
logger.Infof("Removing ganesha %s from grace db", name)
if err := c.runGaneshaRadosGraceJob(n, name, "remove", 10*time.Minute); err != nil {
logger.Errorf("failed to remmove %s from grace db. %+v", name, err)
}
return nil
}
func (c *CephNFSController) runGaneshaRadosGraceJob(n cephv1.CephNFS, name, action string, timeout time.Duration) error {
......@@ -219,9 +215,7 @@ func (c *CephNFSController) downCephNFS(n cephv1.CephNFS, newActive int) error {
name := k8sutil.IndexToName(i)
// Remove from grace db
if err := c.removeServerFromDatabase(n, name); err != nil {
logger.Warningf("failed to remove server %s from grace db. %+v", name, err)
}
c.removeServerFromDatabase(n, name)
// Delete the mds deployment
k8sutil.DeleteDeployment(c.context.Clientset, n.Namespace, instanceName(n, name))
......
......@@ -19,6 +19,7 @@ package object
import (
"fmt"
"reflect"
"sync"
"github.com/coreos/pkg/capnslog"
opkit "github.com/rook/operator-kit"
......@@ -57,19 +58,22 @@ var ObjectStoreResourceRookLegacy = opkit.CustomResource{
// ObjectStoreController represents a controller object for object store custom resources
type ObjectStoreController struct {
clusterInfo *daemonconfig.ClusterInfo
context *clusterd.Context
rookImage string
cephVersion cephv1.CephVersionSpec
hostNetwork bool
ownerRef metav1.OwnerReference
dataDirHostPath string
clusterInfo *daemonconfig.ClusterInfo
context *clusterd.Context
namespace string
rookImage string
cephVersion cephv1.CephVersionSpec
hostNetwork bool
ownerRef metav1.OwnerReference
dataDirHostPath string
orchestrationMutex sync.Mutex
}
// NewObjectStoreController creates a controller for watching object store custom resources
func NewObjectStoreController(
clusterInfo *daemonconfig.ClusterInfo,
context *clusterd.Context,
namespace string,
rookImage string,
cephVersion cephv1.CephVersionSpec,
hostNetwork bool,
......@@ -79,6 +83,7 @@ func NewObjectStoreController(
return &ObjectStoreController{
clusterInfo: clusterInfo,
context: context,
namespace: namespace,
rookImage: rookImage,
cephVersion: cephVersion,
hostNetwork: hostNetwork,
......@@ -88,7 +93,7 @@ func NewObjectStoreController(
}
// StartWatch watches for instances of ObjectStore custom resources and acts on them
func (c *ObjectStoreController) StartWatch(namespace string, stopCh chan struct{}) error {
func (c *ObjectStoreController) StartWatch(stopCh chan struct{}) error {
resourceHandlerFuncs := cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
......@@ -96,12 +101,12 @@ func (c *ObjectStoreController) StartWatch(namespace string, stopCh chan struct{
DeleteFunc: c.onDelete,
}
logger.Infof("start watching object store resources in namespace %s", namespace)
watcher := opkit.NewWatcher(ObjectStoreResource, namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
logger.Infof("start watching object store resources in namespace %s", c.namespace)
watcher := opkit.NewWatcher(ObjectStoreResource, c.namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
go watcher.Watch(&cephv1.CephObjectStore{}, stopCh)
// watch for events on all legacy types too
c.watchLegacyObjectStores(namespace, stopCh, resourceHandlerFuncs)
c.watchLegacyObjectStores(c.namespace, stopCh, resourceHandlerFuncs)
return nil
}
......@@ -120,19 +125,10 @@ func (c *ObjectStoreController) onAdd(obj interface{}) {
return
}
cfg := clusterConfig{
clusterInfo: c.clusterInfo,
context: c.context,
store: *objectstore,
rookVersion: c.rookImage,
cephVersion: c.cephVersion,
hostNetwork: c.hostNetwork,
ownerRefs: c.storeOwners(objectstore),
DataPathMap: cephconfig.NewStatelessDaemonDataPathMap(cephconfig.RgwType, objectstore.Name, c.clusterInfo.Name, c.dataDirHostPath),
}
if err = cfg.createStore(); err != nil {
logger.Errorf("failed to create object store %s. %+v", objectstore.Name, err)
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
c.createOrUpdateStore(true, objectstore)
}
func (c *ObjectStoreController) onUpdate(oldObj, newObj interface{}) {
......@@ -160,19 +156,31 @@ func (c *ObjectStoreController) onUpdate(oldObj, newObj interface{}) {
return
}
logger.Infof("applying object store %s changes", newStore.Name)
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
c.createOrUpdateStore(true, newStore)
}
func (c *ObjectStoreController) createOrUpdateStore(update bool, objectstore *cephv1.CephObjectStore) {
action := "create"
if update {
action = "update"
}
logger.Infof("%s object store %s", action, objectstore.Name)
cfg := clusterConfig{
c.clusterInfo,
c.context,
*newStore,
c.rookImage,
c.cephVersion,
c.hostNetwork,
c.storeOwners(newStore),
cephconfig.NewStatelessDaemonDataPathMap(cephconfig.RgwType, newStore.Name, c.clusterInfo.Name, c.dataDirHostPath),
clusterInfo: c.clusterInfo,
context: c.context,
store: *objectstore,
rookVersion: c.rookImage,
cephVersion: c.cephVersion,
hostNetwork: c.hostNetwork,
ownerRefs: c.storeOwners(objectstore),
DataPathMap: cephconfig.NewStatelessDaemonDataPathMap(cephconfig.RgwType, objectstore.Name, c.clusterInfo.Name, c.dataDirHostPath),
}
if err = cfg.updateStore(); err != nil {
logger.Errorf("failed to create (modify) object store %s. %+v", newStore.Name, err)
if err := cfg.createOrUpdate(update); err != nil {
logger.Errorf("failed to %s object store %s. %+v", action, objectstore.Name, err)
}
}
......@@ -188,12 +196,42 @@ func (c *ObjectStoreController) onDelete(obj interface{}) {
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
cfg := clusterConfig{context: c.context, store: *objectstore}
if err = cfg.deleteStore(); err != nil {
logger.Errorf("failed to delete object store %s. %+v", objectstore.Name, err)
}
}
func (c *ObjectStoreController) ParentClusterChanged(cluster cephv1.ClusterSpec, clusterInfo *daemonconfig.ClusterInfo) {
c.clusterInfo = clusterInfo
if cluster.CephVersion.Image == c.cephVersion.Image {
logger.Debugf("No need to update the object store after the parent cluster changed")
return
}
c.acquireOrchestrationLock()
defer c.releaseOrchestrationLock()
c.cephVersion = cluster.CephVersion
objectStores, err := c.context.RookClientset.CephV1().CephObjectStores(c.namespace).List(metav1.ListOptions{})
if err != nil {
logger.Errorf("failed to retrieve object stores to update the ceph version. %+v", err)
return
}
for _, store := range objectStores.Items {
logger.Infof("updating the ceph version for object store %s to %s", store.Name, c.cephVersion.Image)
c.createOrUpdateStore(true, &store)
if err != nil {
logger.Errorf("failed to update object store %s. %+v", store.Name, err)
} else {
logger.Infof("updated object store %s to ceph version %s", store.Name, c.cephVersion.Image)
}
}
}
func (c *ObjectStoreController) storeOwners(store *cephv1.CephObjectStore) []metav1.OwnerReference {
// Only set the cluster crd as the owner of the object store resources.
// If the object store crd is deleted, the operator will explicitly remove the object store resources.
......@@ -325,3 +363,14 @@ func convertRookLegacyObjectStore(legacyObjectStore *cephbeta.ObjectStore) *ceph
return objectStore
}
func (c *ObjectStoreController) acquireOrchestrationLock() {
logger.Debugf("Acquiring lock for object store orchestration")
c.orchestrationMutex.Lock()
logger.Debugf("Acquired lock for object store orchestration")
}
func (c *ObjectStoreController) releaseOrchestrationLock() {
c.orchestrationMutex.Unlock()
logger.Debugf("Released lock for object store orchestration")
}
......@@ -92,7 +92,7 @@ func TestMigrateObjectStoreObject(t *testing.T) {
RookClientset: rookfake.NewSimpleClientset(legacyObjectStore),
}
info := testop.CreateConfigDir(1)
controller := NewObjectStoreController(info, context, "", cephv1.CephVersionSpec{}, false, metav1.OwnerReference{}, "/var/lib/rook/")
controller := NewObjectStoreController(info, context, legacyObjectStore.Namespace, "", cephv1.CephVersionSpec{}, false, metav1.OwnerReference{}, "/var/lib/rook/")
// convert the legacy objectstore object in memory and assert that a migration is needed
convertedObjectStore, migrationNeeded, err := getObjectStoreObject(legacyObjectStore)
......
......@@ -25,6 +25,7 @@ import (
opkit "github.com/rook/operator-kit"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
cephconfig "github.com/rook/rook/pkg/daemon/ceph/config"
"github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/pkg/operator/k8sutil"
"k8s.io/api/core/v1"
......@@ -51,20 +52,22 @@ var ObjectStoreUserResource = opkit.CustomResource{
// ObjectStoreUserController represents a controller object for object store user custom resources
type ObjectStoreUserController struct {
context *clusterd.Context
ownerRef metav1.OwnerReference
context *clusterd.Context
ownerRef metav1.OwnerReference
namespace string
}
// NewObjectStoreUserController creates a controller for watching object store user custom resources
func NewObjectStoreUserController(context *clusterd.Context, ownerRef metav1.OwnerReference) *ObjectStoreUserController {
func NewObjectStoreUserController(context *clusterd.Context, namespace string, ownerRef metav1.OwnerReference) *ObjectStoreUserController {
return &ObjectStoreUserController{
context: context,
ownerRef: ownerRef,
context: context,
ownerRef: ownerRef,
namespace: namespace,
}
}
// StartWatch watches for instances of ObjectStoreUser custom resources and acts on them
func (c *ObjectStoreUserController) StartWatch(namespace string, stopCh chan struct{}) error {
func (c *ObjectStoreUserController) StartWatch(stopCh chan struct{}) error {
resourceHandlerFuncs := cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
......@@ -72,8 +75,8 @@ func (c *ObjectStoreUserController) StartWatch(namespace string, stopCh chan str
DeleteFunc: c.onDelete,
}
logger.Infof("start watching object store user resources in namespace %s", namespace)
watcher := opkit.NewWatcher(ObjectStoreUserResource, namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
logger.Infof("start watching object store user resources in namespace %s", c.namespace)
watcher := opkit.NewWatcher(ObjectStoreUserResource, c.namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
go watcher.Watch(&cephv1.CephObjectStoreUser{}, stopCh)
return nil
......@@ -107,6 +110,10 @@ func (c *ObjectStoreUserController) onDelete(obj interface{}) {
}
}
func (c *ObjectStoreUserController) ParentClusterChanged(cluster cephv1.ClusterSpec, clusterInfo *cephconfig.ClusterInfo) {
logger.Debugf("No need to update object store users after the parent cluster changed")
}
func (c *ObjectStoreUserController) storeUserOwners(store *cephv1.CephObjectStoreUser) []metav1.OwnerReference {
// Only set the cluster crd as the owner of the object store user resources.
// If the object store user crd is deleted, the operator will explicitly remove the object store user resources.
......
......@@ -27,6 +27,7 @@ import (
cephbeta "github.com/rook/rook/pkg/apis/ceph.rook.io/v1beta1"
"github.com/rook/rook/pkg/clusterd"
ceph "github.com/rook/rook/pkg/daemon/ceph/client"
cephconfig "github.com/rook/rook/pkg/daemon/ceph/config"
"github.com/rook/rook/pkg/daemon/ceph/model"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
......@@ -63,18 +64,20 @@ var PoolResourceRookLegacy = opkit.CustomResource{
// PoolController represents a controller object for pool custom resources
type PoolController struct {
context *clusterd.Context
context *clusterd.Context
namespace string
}
// NewPoolController creates a controller for watching pool custom resources
func NewPoolController(context *clusterd.Context) *PoolController {
func NewPoolController(context *clusterd.Context, namespace string) *PoolController {
return &PoolController{
context: context,
context: context,
namespace: namespace,
}
}
// StartWatch watches for instances of Pool custom resources and acts on them
func (c *PoolController) StartWatch(namespace string, stopCh chan struct{}) error {
func (c *PoolController) StartWatch(stopCh chan struct{}) error {
resourceHandlerFuncs := cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
......@@ -82,12 +85,12 @@ func (c *PoolController) StartWatch(namespace string, stopCh chan struct{}) erro
DeleteFunc: c.onDelete,
}
logger.Infof("start watching pool resources in namespace %s", namespace)
watcher := opkit.NewWatcher(PoolResource, namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
logger.Infof("start watching pool resources in namespace %s", c.namespace)
watcher := opkit.NewWatcher(PoolResource, c.namespace, resourceHandlerFuncs, c.context.RookClientset.CephV1().RESTClient())
go watcher.Watch(&cephv1.CephBlockPool{}, stopCh)
// watch for events on all legacy types too
c.watchLegacyPools(namespace, stopCh, resourceHandlerFuncs)
c.watchLegacyPools(c.namespace, stopCh, resourceHandlerFuncs)
return nil
}
......@@ -151,6 +154,10 @@ func (c *PoolController) onUpdate(oldObj, newObj interface{}) {
}
}
func (c *PoolController) ParentClusterChanged(cluster cephv1.ClusterSpec, clusterInfo *cephconfig.ClusterInfo) {
logger.Debugf("No need to update the pool after the parent cluster changed")
}
func poolChanged(old, new cephv1.PoolSpec) bool {
if old.Replicated.Size != new.Replicated.Size {
logger.Infof("pool replication changed from %d to %d", old.Replicated.Size, new.Replicated.Size)
......
......@@ -220,7 +220,7 @@ func TestMigratePoolObject(t *testing.T) {
Clientset: clientset,
RookClientset: rookfake.NewSimpleClientset(legacyPool),
}
controller := NewPoolController(context)
controller := NewPoolController(context, legacyPool.Namespace)
// convert the legacy pool object in memory and assert that a migration is needed
convertedPool, migrationNeeded, err := getPoolObject(legacyPool)
......