Unverified commit 9edf205c authored by Travis Nielsen, committed by GitHub

Merge pull request #5035 from leseb/file-crd-controller-runtime

ceph: convert Filesystem controller to the controller-runtime
parents d9a1ea50 ca0a30f3
Showing with 523 additions and 273 deletions
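This merge replaces the legacy StartWatch-based filesystem CRD watcher with a controller-runtime controller. The new controller file itself is collapsed further down, so the following is only an orientation sketch of what such a registration looks like with the controller-runtime version this code base appears to use; the placeholder bodies, the controller name string, and the watch wiring are illustrative assumptions, while the names ReconcileCephFilesystem and Add mirror what the rest of the diff references.

	package file

	import (
		cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
		"github.com/rook/rook/pkg/clusterd"
		"k8s.io/apimachinery/pkg/runtime"
		"sigs.k8s.io/controller-runtime/pkg/client"
		"sigs.k8s.io/controller-runtime/pkg/controller"
		"sigs.k8s.io/controller-runtime/pkg/handler"
		"sigs.k8s.io/controller-runtime/pkg/manager"
		"sigs.k8s.io/controller-runtime/pkg/reconcile"
		"sigs.k8s.io/controller-runtime/pkg/source"
	)

	// ReconcileCephFilesystem reconciles CephFilesystem CRs; the field names match
	// the ones the test file in this diff fills in (client, scheme, context).
	type ReconcileCephFilesystem struct {
		client  client.Client
		scheme  *runtime.Scheme
		context *clusterd.Context
	}

	// Add registers the controller with the manager; this is what the new entry
	// in AddToManagerFuncs calls into. The controller name string is illustrative.
	func Add(mgr manager.Manager, context *clusterd.Context) error {
		r := &ReconcileCephFilesystem{client: mgr.GetClient(), scheme: mgr.GetScheme(), context: context}
		c, err := controller.New("ceph-filesystem-controller", mgr, controller.Options{Reconciler: r})
		if err != nil {
			return err
		}
		// Watch CephFilesystem objects and enqueue one request per object.
		return c.Watch(&source.Kind{Type: &cephv1.CephFilesystem{}}, &handler.EnqueueRequestForObject{})
	}

	// Reconcile would fetch the CephFilesystem, verify the CephCluster is ready,
	// then create or update the filesystem and its MDS daemons. The body here is
	// a placeholder, not the PR's implementation.
	func (r *ReconcileCephFilesystem) Reconcile(request reconcile.Request) (reconcile.Result, error) {
		return reconcile.Result{}, nil
	}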
......@@ -39,7 +39,6 @@ import (
"github.com/rook/rook/pkg/operator/ceph/config"
"github.com/rook/rook/pkg/operator/ceph/controller"
"github.com/rook/rook/pkg/operator/ceph/csi"
"github.com/rook/rook/pkg/operator/ceph/file"
"github.com/rook/rook/pkg/operator/ceph/nfs"
"github.com/rook/rook/pkg/operator/ceph/object/bucket"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
......@@ -466,17 +465,13 @@ func (c *ClusterController) initializeCluster(cluster *cluster, clusterObj *ceph
bucketController, _ := bucket.NewBucketController(c.context.KubeConfig, bucketProvisioner)
go bucketController.Run(cluster.stopCh)
// Start file system CRD watcher
fileController := file.NewFilesystemController(cluster.Info, c.context, cluster.Namespace, c.rookImage, cluster.Spec, cluster.ownerRef, cluster.Spec.DataDirHostPath)
fileController.StartWatch(cluster.Namespace, cluster.stopCh)
// Start nfs ganesha CRD watcher
ganeshaController := nfs.NewCephNFSController(cluster.Info, c.context, cluster.Spec.DataDirHostPath, cluster.Namespace, c.rookImage, cluster.Spec, cluster.ownerRef)
ganeshaController.StartWatch(cluster.Namespace, cluster.stopCh)
// Populate childControllers
logger.Debug("populating child controllers, so cluster CR spec updates will be propagaged to other CR")
cluster.childControllers = []childController{
fileController,
ganeshaController,
}
......
......@@ -20,6 +20,7 @@ import (
"github.com/pkg/errors"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/operator/ceph/cluster/crash"
"github.com/rook/rook/pkg/operator/ceph/file"
"github.com/rook/rook/pkg/operator/ceph/object"
objectuser "github.com/rook/rook/pkg/operator/ceph/object/user"
"github.com/rook/rook/pkg/operator/ceph/pool"
......@@ -33,6 +34,7 @@ var AddToManagerFuncs = []func(manager.Manager, *clusterd.Context) error{
pool.Add,
objectuser.Add,
object.Add,
file.Add,
}
// AddToManager adds all the registered controllers to the passed manager.
......
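For orientation, the AddToManager function named in the comment above simply walks this slice so every registered controller, now including file.Add, is attached to the one controller-runtime manager. A minimal sketch under that assumption (the lowercase name and the explicit funcs parameter are editorial, added for self-containment):

	package operator

	import (
		"github.com/rook/rook/pkg/clusterd"
		"sigs.k8s.io/controller-runtime/pkg/manager"
	)

	// addToManager sketches how the registered Add functions are applied
	// to a single controller-runtime manager, stopping at the first failure.
	func addToManager(m manager.Manager, c *clusterd.Context, funcs []func(manager.Manager, *clusterd.Context) error) error {
		for _, f := range funcs {
			if err := f(m, c); err != nil {
				return err
			}
		}
		return nil
	}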
......@@ -94,6 +94,20 @@ func WatchControllerPredicate() predicate.Funcs {
return true
}
case *cephv1.CephFilesystem:
objNew := e.ObjectNew.(*cephv1.CephFilesystem)
logger.Debug("update event from the parent object CephFilesystem")
diff := cmp.Diff(objOld.Spec, objNew.Spec, resourceQtyComparer)
if diff != "" ||
objOld.GetDeletionTimestamp() != objNew.GetDeletionTimestamp() ||
objOld.GetGeneration() != objNew.GetGeneration() {
// Checking if diff is not empty so we don't print it when the CR gets deleted
if diff != "" {
logger.Infof("CR has changed for %q. diff=%s", objNew.Name, diff)
}
return true
}
}
logger.Debug("wont update unknown object")
return false
......@@ -154,6 +168,8 @@ func WatchPredicateForNonCRDObject(owner runtime.Object, scheme *runtime.Scheme)
logger.Debugf("object %q matched on delete", object.GetName())
return true
}
logger.Debugf("object %q did not match on delete", object.GetName())
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
......@@ -170,6 +186,7 @@ func WatchPredicateForNonCRDObject(owner runtime.Object, scheme *runtime.Scheme)
return objectChanged
}
logger.Debugf("object %q did not match on update", object.GetName())
return false
},
GenericFunc: func(e event.GenericEvent) bool {
......
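The predicate above decides whether an update event is worth reconciling by diffing the old and new spec with cmp.Diff and a resourceQtyComparer defined elsewhere in this package. As an illustration of why such a comparer is needed at all (resource.Quantity carries unexported fields that cmp refuses to inspect), here is a self-contained example that assumes the comparer has its usual form:

	package main

	import (
		"fmt"

		"github.com/google/go-cmp/cmp"
		"k8s.io/apimachinery/pkg/api/resource"
	)

	// Assumed form of resourceQtyComparer: compare quantities by value,
	// since cmp cannot look inside resource.Quantity's unexported fields.
	var resourceQtyComparer = cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })

	func main() {
		oldSpec := map[string]resource.Quantity{"memory": resource.MustParse("1Gi")}
		newSpec := map[string]resource.Quantity{"memory": resource.MustParse("2Gi")}
		// A non-empty diff is what makes the predicate return true and trigger a reconcile.
		fmt.Println(cmp.Diff(oldSpec, newSpec, resourceQtyComparer))
	}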
(The diff for one file is collapsed and not shown.)
......@@ -14,40 +14,271 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package mds to manage a rook filesystem.
// Package file to manage a rook filesystem
package file
import (
"context"
"os"
"testing"
"github.com/coreos/pkg/capnslog"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/operator/test"
exectest "github.com/rook/rook/pkg/util/exec/test"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
func TestFilesystemChanged(t *testing.T) {
// no change
old := cephv1.FilesystemSpec{MetadataServer: cephv1.MetadataServerSpec{ActiveCount: 1, ActiveStandby: true}}
new := cephv1.FilesystemSpec{MetadataServer: cephv1.MetadataServerSpec{ActiveCount: 1, ActiveStandby: true}}
changed := filesystemChanged(old, new)
assert.False(t, changed)
// changed properties
new = cephv1.FilesystemSpec{MetadataServer: cephv1.MetadataServerSpec{ActiveCount: 2, ActiveStandby: true}}
assert.True(t, filesystemChanged(old, new))
new = cephv1.FilesystemSpec{MetadataServer: cephv1.MetadataServerSpec{ActiveCount: 1, ActiveStandby: false}}
assert.True(t, filesystemChanged(old, new))
}
const (
fsGet = `{
"mdsmap":{
"epoch":49,
"flags":50,
"ever_allowed_features":32,
"explicitly_allowed_features":32,
"created":"2020-03-17 13:17:43.743717",
"modified":"2020-03-17 15:22:51.020576",
"tableserver":0,
"root":0,
"session_timeout":60,
"session_autoclose":300,
"min_compat_client":"-1 (unspecified)",
"max_file_size":1099511627776,
"last_failure":0,
"last_failure_osd_epoch":0,
"compat":{
"compat":{
},
"ro_compat":{
},
"incompat":{
"feature_1":"base v0.20",
"feature_2":"client writeable ranges",
"feature_3":"default file layouts on dirs",
"feature_4":"dir inode in separate object",
"feature_5":"mds uses versioned encoding",
"feature_6":"dirfrag is stored in omap",
"feature_8":"no anchor table",
"feature_9":"file layout v2",
"feature_10":"snaprealm v2"
}
},
"max_mds":1,
"in":[
0
],
"up":{
"mds_0":4463
},
"failed":[
],
"damaged":[
],
"stopped":[
],
"info":{
"gid_4463":{
"gid":4463,
"name":"myfs-a",
"rank":0,
"incarnation":5,
"state":"up:active",
"state_seq":3,
"addr":"172.17.0.12:6801/175789278",
"addrs":{
"addrvec":[
{
"type":"v2",
"addr":"172.17.0.12:6800",
"nonce":175789278
},
{
"type":"v1",
"addr":"172.17.0.12:6801",
"nonce":175789278
}
]
},
"export_targets":[
],
"features":4611087854031667199,
"flags":0
}
},
"data_pools":[
3
],
"metadata_pool":2,
"enabled":true,
"fs_name":"myfs",
"balancer":"",
"standby_count_wanted":1
},
"id":1
}`
mdsCephAuthGetOrCreateKey = `{"key":"AQCvzWBeIV9lFRAAninzm+8XFxbSfTiPwoX50g=="}`
)
var (
name = "my-fs"
namespace = "rook-ceph"
)
func TestCephFilesystemController(t *testing.T) {
// Set DEBUG logging
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
os.Setenv("ROOK_LOG_LEVEL", "DEBUG")
//
// TEST 1 SETUP
//
// FAILURE because no CephCluster
//
// A CephFilesystem resource with metadata and spec.
fs := &cephv1.CephFilesystem{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: cephv1.FilesystemSpec{
MetadataServer: cephv1.MetadataServerSpec{
ActiveCount: 1,
},
},
TypeMeta: controllerTypeMeta,
}
cephCluster := &cephv1.CephCluster{}
// Objects to track in the fake client.
object := []runtime.Object{
fs,
}
executor := &exectest.MockExecutor{
MockExecuteCommandWithOutputFile: func(command, outfile string, args ...string) (string, error) {
if args[0] == "status" {
return `{"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil
}
if args[0] == "fs" && args[1] == "get" {
return fsGet, nil
}
if args[0] == "auth" && args[1] == "get-or-create-key" {
return mdsCephAuthGetOrCreateKey, nil
}
return "", nil
},
}
clientset := test.New(t, 3)
c := &clusterd.Context{
Executor: executor,
RookClientset: rookclient.NewSimpleClientset(),
Clientset: clientset,
}
// Register operator types with the runtime scheme.
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{})
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{})
// Create a fake client to mock API calls.
cl := fake.NewFakeClientWithScheme(s, object...)
// Create a ReconcileCephFilesystem object with the scheme and fake client.
r := &ReconcileCephFilesystem{client: cl, scheme: s, context: c}
// Mock request to simulate Reconcile() being called on an event for a
// watched resource .
req := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: name,
Namespace: namespace,
},
}
logger.Info("STARTING PHASE 1")
res, err := r.Reconcile(req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
logger.Info("PHASE 1 DONE")
//
// TEST 2:
//
// FAILURE we have a cluster but it's not ready
//
cephCluster = &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Namespace: namespace,
},
Status: cephv1.ClusterStatus{
Phase: "",
},
}
object = append(object, cephCluster)
// Create a fake client to mock API calls.
cl = fake.NewFakeClientWithScheme(s, object...)
// Create a ReconcileCephFilesystem object with the scheme and fake client.
r = &ReconcileCephFilesystem{client: cl, scheme: s, context: c}
logger.Info("STARTING PHASE 2")
res, err = r.Reconcile(req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
logger.Info("PHASE 2 DONE")
//
// TEST 3:
//
// SUCCESS! The CephCluster is ready
//
// Mock clusterInfo
secrets := map[string][]byte{
"cluster-name": []byte("foo-cluster"),
"fsid": []byte(name),
"mon-secret": []byte("monsecret"),
"admin-secret": []byte("adminsecret"),
}
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rook-ceph-mon",
Namespace: namespace,
},
Data: secrets,
Type: k8sutil.RookType,
}
_, err = c.Clientset.CoreV1().Secrets(namespace).Create(secret)
assert.NoError(t, err)
// Add ready status to the CephCluster
cephCluster.Status.Phase = k8sutil.ReadyStatus
// Create a fake client to mock API calls.
cl = fake.NewFakeClientWithScheme(s, object...)
// Create a ReconcileCephFilesystem object with the scheme and fake client.
r = &ReconcileCephFilesystem{client: cl, scheme: s, context: c}
logger.Info("STARTING PHASE 3")
res, err = r.Reconcile(req)
assert.NoError(t, err)
assert.False(t, res.Requeue)
err = r.client.Get(context.TODO(), req.NamespacedName, fs)
assert.NoError(t, err)
assert.Equal(t, "Ready", fs.Status.Phase, fs)
logger.Info("PHASE 3 DONE")
}
func TestGetFilesystemObject(t *testing.T) {
// get a current version filesystem object, should return with no error and no migration needed
filesystem, err := getFilesystemObject(&cephv1.CephFilesystem{})
assert.NotNil(t, filesystem)
assert.Nil(t, err)
// try to get an object that isn't a filesystem, should return with an error
filesystem, err = getFilesystemObject(&map[string]string{})
assert.Nil(t, filesystem)
assert.NotNil(t, err)
}
......@@ -29,6 +29,7 @@ import (
"github.com/rook/rook/pkg/operator/ceph/pool"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
const (
......@@ -50,14 +51,12 @@ func createFilesystem(
clusterInfo *cephconfig.ClusterInfo,
context *clusterd.Context,
fs cephv1.CephFilesystem,
rookVersion string,
clusterSpec *cephv1.ClusterSpec,
ownerRefs metav1.OwnerReference,
dataDirHostPath string,
scheme *runtime.Scheme,
) error {
if err := validateFilesystem(context, fs); err != nil {
return err
}
if len(fs.Spec.DataPools) != 0 {
var dataPools []*model.Pool
......@@ -66,18 +65,18 @@ func createFilesystem(
}
f := newFS(fs.Name, fs.Spec.MetadataPool.ToModel(""), dataPools, fs.Spec.MetadataServer.ActiveCount)
if err := f.doFilesystemCreate(context, clusterInfo.CephVersion, fs.Namespace); err != nil {
return errors.Wrapf(err, "failed to create filesystem %s", fs.Name)
return errors.Wrapf(err, "failed to create filesystem %q", fs.Name)
}
}
filesystem, err := client.GetFilesystem(context, fs.Namespace, fs.Name)
if err != nil {
return errors.Wrapf(err, "failed to get filesystem %s", fs.Name)
return errors.Wrapf(err, "failed to get filesystem %q", fs.Name)
}
if fs.Spec.MetadataServer.ActiveStandby {
if err = client.AllowStandbyReplay(context, fs.Namespace, fs.Name, fs.Spec.MetadataServer.ActiveStandby); err != nil {
return errors.Wrapf(err, "failed to set allow_standby_replay to filesystem %s", fs.Name)
return errors.Wrapf(err, "failed to set allow_standby_replay to filesystem %q", fs.Name)
}
}
......@@ -88,8 +87,8 @@ func createFilesystem(
}
}
logger.Infof("start running mdses for filesystem %s", fs.Name)
c := mds.NewCluster(clusterInfo, context, rookVersion, clusterSpec, fs, filesystem, ownerRefs, dataDirHostPath)
logger.Infof("start running mdses for filesystem %q", fs.Name)
c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, filesystem, ownerRefs, dataDirHostPath, scheme)
if err := c.Start(); err != nil {
return err
}
......@@ -115,7 +114,7 @@ func deleteFilesystem(context *clusterd.Context, cephVersion cephver.CephVersion
return nil
}
func validateFilesystem(context *clusterd.Context, f cephv1.CephFilesystem) error {
func validateFilesystem(context *clusterd.Context, f *cephv1.CephFilesystem) error {
if f.Name == "" {
return errors.New("missing name")
}
......
......@@ -25,6 +25,7 @@ import (
"testing"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/clusterd"
cephconfig "github.com/rook/rook/pkg/daemon/ceph/config"
cephtest "github.com/rook/rook/pkg/daemon/ceph/test"
......@@ -41,7 +42,7 @@ import (
func TestValidateSpec(t *testing.T) {
context := &clusterd.Context{Executor: &exectest.MockExecutor{}}
fs := cephv1.CephFilesystem{}
fs := &cephv1.CephFilesystem{}
// missing name
assert.NotNil(t, validateFilesystem(context, fs))
......@@ -116,14 +117,14 @@ func TestCreateFilesystem(t *testing.T) {
clusterInfo := &cephconfig.ClusterInfo{FSID: "myfsid"}
// start a basic cluster
err := createFilesystem(clusterInfo, context, fs, "v0.1", &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/")
err := createFilesystem(clusterInfo, context, fs, &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/", scheme.Scheme)
assert.Nil(t, err)
validateStart(t, context, fs)
assert.ElementsMatch(t, []string{}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated))
testopk8s.ClearDeploymentsUpdated(deploymentsUpdated)
// starting again should be a no-op
err = createFilesystem(clusterInfo, context, fs, "v0.1", &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/")
err = createFilesystem(clusterInfo, context, fs, &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/", scheme.Scheme)
assert.Nil(t, err)
validateStart(t, context, fs)
assert.ElementsMatch(t, []string{"rook-ceph-mds-myfs-a", "rook-ceph-mds-myfs-b"}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated))
......@@ -144,8 +145,8 @@ func TestCreateFilesystem(t *testing.T) {
Clientset: clientset}
//Create another filesystem which should fail
err = createFilesystem(clusterInfo, context, fs, "v0.1", &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/")
assert.Equal(t, "failed to create filesystem myfs: cannot create multiple filesystems. enable ROOK_ALLOW_MULTIPLE_FILESYSTEMS env variable to create more than one", err.Error())
err = createFilesystem(clusterInfo, context, fs, &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/", scheme.Scheme)
assert.Equal(t, "failed to create filesystem \"myfs\": cannot create multiple filesystems. enable ROOK_ALLOW_MULTIPLE_FILESYSTEMS env variable to create more than one", err.Error())
}
func TestCreateNopoolFilesystem(t *testing.T) {
......@@ -183,12 +184,12 @@ func TestCreateNopoolFilesystem(t *testing.T) {
clusterInfo := &cephconfig.ClusterInfo{FSID: "myfsid"}
// start a basic cluster
err := createFilesystem(clusterInfo, context, fs, "v0.1", &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/")
err := createFilesystem(clusterInfo, context, fs, &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/", scheme.Scheme)
assert.Nil(t, err)
validateStart(t, context, fs)
// starting again should be a no-op
err = createFilesystem(clusterInfo, context, fs, "v0.1", &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/")
err = createFilesystem(clusterInfo, context, fs, &cephv1.ClusterSpec{}, metav1.OwnerReference{}, "/var/lib/rook/", scheme.Scheme)
assert.Nil(t, err)
validateStart(t, context, fs)
......
......@@ -20,8 +20,6 @@ import (
"fmt"
"strconv"
apps "k8s.io/api/apps/v1"
"github.com/pkg/errors"
"github.com/rook/rook/pkg/operator/ceph/config"
"github.com/rook/rook/pkg/operator/ceph/config/keyring"
......@@ -65,11 +63,6 @@ func (c *Cluster) generateKeyring(m *mdsConfig) (string, error) {
return keyring, s.CreateOrUpdate(m.ResourceName, keyring)
}
func (c *Cluster) associateKeyring(existingKeyring string, d *apps.Deployment) error {
s := keyring.GetSecretStoreForDeployment(c.context, d)
return s.CreateOrUpdate(d.GetName(), existingKeyring)
}
func (c *Cluster) setDefaultFlagsMonConfigStore(mdsID string) error {
monStore := config.GetMonStore(c.context, c.fs.Namespace)
who := fmt.Sprintf("mds.%s", mdsID)
......
......@@ -36,6 +36,8 @@ import (
"github.com/rook/rook/pkg/operator/k8sutil"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
var logger = capnslog.NewPackageLogger("github.com/rook/rook", "op-mds")
......@@ -62,6 +64,7 @@ type Cluster struct {
fsID string
ownerRef metav1.OwnerReference
dataDirHostPath string
scheme *runtime.Scheme
}
type mdsConfig struct {
......@@ -74,22 +77,22 @@ type mdsConfig struct {
func NewCluster(
clusterInfo *cephconfig.ClusterInfo,
context *clusterd.Context,
rookVersion string,
clusterSpec *cephv1.ClusterSpec,
fs cephv1.CephFilesystem,
fsdetails *client.CephFilesystemDetails,
ownerRef metav1.OwnerReference,
dataDirHostPath string,
scheme *runtime.Scheme,
) *Cluster {
return &Cluster{
clusterInfo: clusterInfo,
context: context,
rookVersion: rookVersion,
clusterSpec: clusterSpec,
fs: fs,
fsID: strconv.Itoa(fsdetails.ID),
ownerRef: ownerRef,
dataDirHostPath: dataDirHostPath,
scheme: scheme,
}
}
......@@ -136,7 +139,7 @@ func (c *Cluster) Start() error {
}
// create unique key for each mds saved to k8s secret
keyring, err := c.generateKeyring(mdsConfig)
_, err := c.generateKeyring(mdsConfig)
if err != nil {
return errors.Wrapf(err, "failed to generate keyring for %q", resourceName)
}
......@@ -159,30 +162,30 @@ func (c *Cluster) Start() error {
d := c.makeDeployment(mdsConfig)
logger.Debugf("starting mds: %+v", d)
// Set owner ref to cephFilesystem object
err = controllerutil.SetControllerReference(&c.fs, d, c.scheme)
if err != nil {
return errors.Wrapf(err, "failed to set owner reference for ceph filesystem %q secret", d.Name)
}
// Set the deployment hash as an annotation
err = patch.DefaultAnnotator.SetLastAppliedAnnotation(d)
if err != nil {
return errors.Wrapf(err, "failed to set annotation for deployment %q", d.Name)
}
createdDeployment, createErr := c.context.Clientset.AppsV1().Deployments(c.fs.Namespace).Create(d)
_, createErr := c.context.Clientset.AppsV1().Deployments(c.fs.Namespace).Create(d)
if createErr != nil {
if !kerrors.IsAlreadyExists(createErr) {
return errors.Wrapf(createErr, "failed to create mds deployment %s", mdsConfig.ResourceName)
}
logger.Infof("deployment for mds %s already exists. updating if needed", mdsConfig.ResourceName)
createdDeployment, err = c.context.Clientset.AppsV1().Deployments(c.fs.Namespace).Get(d.Name, metav1.GetOptions{})
_, err = c.context.Clientset.AppsV1().Deployments(c.fs.Namespace).Get(d.Name, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "failed to get existing mds deployment %s for update", d.Name)
}
}
if err := c.associateKeyring(keyring, createdDeployment); err != nil {
logger.Warningf("failed to associate keyring with deployment for %q. %v", resourceName, err)
}
// keyring must be generated before update-and-wait since no keyring will prevent the
// deployment from reaching ready state
if createErr != nil && kerrors.IsAlreadyExists(createErr) {
daemon := string(config.MdsType)
if err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace, daemon, daemonLetterID, c.clusterSpec.SkipUpgradeChecks, false); err != nil {
......@@ -194,7 +197,7 @@ func (c *Cluster) Start() error {
}
if err := c.scaleDownDeployments(replicas, desiredDeployments); err != nil {
return errors.Wrapf(err, "failed to scale down mds deployments")
return errors.Wrap(err, "failed to scale down mds deployments")
}
return nil
......
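The scheme threaded through createFilesystem and NewCluster in the hunks above exists so this Start path can call controllerutil.SetControllerReference and make the CephFilesystem CR the controlling owner of each MDS deployment, letting Kubernetes garbage-collect the deployments when the CR is deleted. A minimal sketch of that pattern; the helper name is hypothetical and not part of this diff:

	package mds

	import (
		cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
		appsv1 "k8s.io/api/apps/v1"
		"k8s.io/apimachinery/pkg/runtime"
		"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	)

	// setFilesystemOwner (hypothetical helper) marks the CephFilesystem as the
	// controlling owner of an MDS deployment. controllerutil needs the runtime
	// scheme to resolve the owner's GroupVersionKind for the owner reference.
	func setFilesystemOwner(fs *cephv1.CephFilesystem, d *appsv1.Deployment, scheme *runtime.Scheme) error {
		return controllerutil.SetControllerReference(fs, d, scheme)
	}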
......@@ -19,6 +19,7 @@ package mds
import (
"testing"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/operator/ceph/config"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
......@@ -62,10 +63,10 @@ func testDeploymentObject(t *testing.T, network cephv1.NetworkSpec) (*apps.Deplo
CephVersion: cephver.Nautilus,
}
clientset := testop.New(t, 1)
c := NewCluster(
clusterInfo,
&clusterd.Context{Clientset: clientset},
"rook/rook:myversion",
&cephv1.ClusterSpec{
CephVersion: cephv1.CephVersionSpec{Image: "ceph/ceph:testversion"},
Network: network,
......@@ -74,6 +75,7 @@ func testDeploymentObject(t *testing.T, network cephv1.NetworkSpec) (*apps.Deplo
&client.CephFilesystemDetails{ID: 15},
metav1.OwnerReference{},
"/var/lib/rook/",
scheme.Scheme,
)
mdsTestConfig := &mdsConfig{
DaemonID: "myfs-a",
......
......@@ -31,7 +31,6 @@ import (
"github.com/rook/rook/pkg/operator/ceph/agent"
"github.com/rook/rook/pkg/operator/ceph/cluster"
"github.com/rook/rook/pkg/operator/ceph/csi"
"github.com/rook/rook/pkg/operator/ceph/file"
"github.com/rook/rook/pkg/operator/ceph/provisioner"
"github.com/rook/rook/pkg/operator/discover"
"github.com/rook/rook/pkg/operator/k8sutil"
......@@ -83,7 +82,7 @@ type Operator struct {
// New creates an operator instance
func New(context *clusterd.Context, volumeAttachmentWrapper attachment.Attachment, rookImage, securityAccount string) *Operator {
schemes := []k8sutil.CustomResource{cluster.ClusterResource, file.FilesystemResource, attachment.VolumeResource}
schemes := []k8sutil.CustomResource{cluster.ClusterResource, attachment.VolumeResource}
operatorNamespace := os.Getenv(k8sutil.PodNamespaceEnvVar)
o := &Operator{
......
......@@ -23,7 +23,6 @@ import (
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/daemon/ceph/agent/flexvolume/attachment"
"github.com/rook/rook/pkg/operator/ceph/cluster"
"github.com/rook/rook/pkg/operator/ceph/file"
"github.com/rook/rook/pkg/operator/test"
"github.com/stretchr/testify/assert"
)
......@@ -37,9 +36,9 @@ func TestOperator(t *testing.T) {
assert.NotNil(t, o.clusterController)
assert.NotNil(t, o.resources)
assert.Equal(t, context, o.context)
assert.Equal(t, len(o.resources), 3)
assert.Equal(t, len(o.resources), 2)
for _, r := range o.resources {
if r.Name != cluster.ClusterResource.Name && r.Name != file.FilesystemResource.Name && r.Name != attachment.VolumeResource.Name {
if r.Name != cluster.ClusterResource.Name && r.Name != attachment.VolumeResource.Name {
assert.Fail(t, fmt.Sprintf("Resource %s is not valid", r.Name))
}
}
......
......@@ -82,8 +82,13 @@ func (f *FilesystemOperation) Delete(name, namespace string) error {
return err
}
crdCheckerFunc := func() error {
_, err := f.k8sh.RookClientset.CephV1().CephFilesystems(namespace).Get(name, metav1.GetOptions{})
return err
}
logger.Infof("Deleted filesystem %s in namespace %s", name, namespace)
return nil
return f.k8sh.WaitForCustomResourceDeletion(namespace, crdCheckerFunc)
}
// List lists filesystems in Rook
......
......@@ -116,6 +116,19 @@ func (p *PoolOperation) DeletePool(blockClient *BlockOperation, namespace, poolN
}
}
logger.Infof("deleting pool %q", poolName)
return p.k8sh.RookClientset.CephV1().CephBlockPools(namespace).Delete(poolName, &metav1.DeleteOptions{})
logger.Infof("deleting pool CR %q", poolName)
err := p.k8sh.RookClientset.CephV1().CephBlockPools(namespace).Delete(poolName, &metav1.DeleteOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to delete pool CR. %v", err)
}
crdCheckerFunc := func() error {
_, err := p.k8sh.RookClientset.CephV1().CephBlockPools(namespace).Get(poolName, metav1.GetOptions{})
return err
}
return p.k8sh.WaitForCustomResourceDeletion(namespace, crdCheckerFunc)
}
......@@ -350,7 +350,7 @@ func (k8sh *K8sHelper) DeleteResource(args ...string) error {
func (k8sh *K8sHelper) WaitForCustomResourceDeletion(namespace string, checkerFunc func() error) error {
// wait for the operator to finalize and delete the CRD
for i := 0; i < 10; i++ {
for i := 0; i < 30; i++ {
err := checkerFunc()
if err == nil {
logger.Infof("custom resource %s still exists", namespace)
......
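The hunk above only raises the retry count in WaitForCustomResourceDeletion from 10 to 30 iterations; the rest of the loop is elided. For context, the overall polling pattern looks roughly like the sketch below, where the 2-second interval, the NotFound check, and the final timeout error are assumptions rather than a copy of the real helper:

	package utils

	import (
		"errors"
		"fmt"
		"time"

		kerrors "k8s.io/apimachinery/pkg/api/errors"
	)

	// waitForCustomResourceDeletion (sketch) polls checkerFunc until the resource
	// is gone. checkerFunc is expected to Get the resource and return its error.
	func waitForCustomResourceDeletion(checkerFunc func() error) error {
		for i := 0; i < 30; i++ {
			err := checkerFunc()
			if err == nil {
				// Resource still exists; the operator has not finalized it yet.
				fmt.Println("custom resource still exists")
				time.Sleep(2 * time.Second) // interval is an assumption
				continue
			}
			if kerrors.IsNotFound(err) {
				fmt.Println("custom resource deleted")
				return nil
			}
			return err // unexpected error from the API server
		}
		return errors.New("gave up waiting for custom resource deletion")
	}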