Unverified Commit 81727cd1 authored by Sébastien Han, committed by GitHub

Merge pull request #4964 from leseb/refactor-controller-helpers

ceph: refactor controller helpers
parents d5d0817c 9a5bd4f9
Showing with 874 additions and 302 deletions
......@@ -396,7 +396,7 @@ Custom Resource Definitions. This will help with creating or modifying Rook-Ceph
deployments in the future with the updated schema validation.
```sh
kubectl apply -f upgrade-from-v1.1-crds.yaml
kubectl apply -f upgrade-from-v1.2-crds.yaml
```
......
......@@ -1536,6 +1536,7 @@
"k8s.io/api/storage/v1beta1",
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset",
"k8s.io/apimachinery/pkg/api/errors",
"k8s.io/apimachinery/pkg/api/meta",
"k8s.io/apimachinery/pkg/api/resource",
"k8s.io/apimachinery/pkg/apis/meta/v1",
"k8s.io/apimachinery/pkg/fields",
......
......@@ -175,6 +175,9 @@ spec:
type: boolean
placement: {}
resources: {}
    # somehow this is breaking the status, but let's keep it here so we don't forget it once we move to controller-runtime (see the sketch after this hunk)
# subresources:
# status: {}
additionalPrinterColumns:
- name: DataDirHostPath
type: string
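
Context for the commented-out `subresources` block above (a sketch, not part of this diff): once `status: {}` is enabled on a CRD, controller-runtime persists status through the dedicated status writer, which is what the new `UpdateStatus` helper later in this PR relies on. The `setPhase` helper below is hypothetical.

```go
package example

import (
	"context"

	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// setPhase illustrates the split write path once "subresources: status: {}"
// is set on the CRD: spec changes go through Update(), status changes must
// go through Status().Update(). Hypothetical helper, for illustration only.
func setPhase(c client.Client, pool *cephv1.CephBlockPool, phase string) error {
	if pool.Status == nil {
		pool.Status = &cephv1.Status{}
	}
	pool.Status.Phase = phase
	return c.Status().Update(context.TODO(), pool)
}
```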
......@@ -222,6 +225,8 @@ spec:
properties:
caps:
type: object
subresources:
status: {}
# OLM: END CEPH CLIENT CRD
# OLM: BEGIN CEPH FS CRD
---
......@@ -310,6 +315,8 @@ spec:
- name: Age
type: date
JSONPath: .metadata.creationTimestamp
subresources:
status: {}
# OLM: END CEPH FS CRD
# OLM: BEGIN CEPH NFS CRD
---
......@@ -346,6 +353,8 @@ spec:
annotations: {}
placement: {}
resources: {}
subresources:
status: {}
# OLM: END CEPH NFS CRD
# OLM: BEGIN CEPH OBJECT STORE CRD
---
......@@ -412,6 +421,8 @@ spec:
type: integer
preservePoolsOnDelete:
type: boolean
subresources:
status: {}
# OLM: END CEPH OBJECT STORE CRD
# OLM: BEGIN CEPH OBJECT STORE USERS CRD
---
......@@ -431,6 +442,8 @@ spec:
- objectuser
scope: Namespaced
version: v1
subresources:
status: {}
# OLM: END CEPH OBJECT STORE USERS CRD
# OLM: BEGIN CEPH BLOCK POOL CRD
---
......@@ -474,6 +487,8 @@ spec:
type: integer
minimum: 0
maximum: 9
subresources:
status: {}
# OLM: END CEPH BLOCK POOL CRD
# OLM: BEGIN CEPH VOLUME POOL CRD
---
......@@ -492,6 +507,8 @@ spec:
- rv
scope: Namespaced
version: v1alpha2
subresources:
status: {}
# OLM: END CEPH VOLUME POOL CRD
# OLM: BEGIN OBJECTBUCKET CRD
---
......
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
name: rook-ceph-global-rules
labels:
operator: rook
storage-backend: ceph
rbac.ceph.rook.io/aggregate-to-rook-ceph-global: "true"
rules:
- apiGroups:
- ""
resources:
# Pod access is needed for fencing
- pods
# Node access is needed for determining nodes where mons should run
- nodes
- nodes/proxy
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
# PVs and PVCs are managed by the Rook provisioner
- persistentvolumes
- persistentvolumeclaims
- endpoints
verbs:
- get
- list
- watch
- patch
- create
- update
- delete
- apiGroups:
- storage.k8s.io
resources:
- storageclasses
verbs:
- get
- list
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- get
- list
- watch
- create
- update
- delete
- apiGroups:
- ceph.rook.io
resources:
- "*"
verbs:
- "*"
- apiGroups:
- rook.io
resources:
- "*"
verbs:
- "*"
- apiGroups:
- policy
- apps
resources:
# This is for the clusterdisruption controller
- poddisruptionbudgets
# This is for both clusterdisruption and nodedrain controllers
- deployments
- replicasets
verbs:
- "*"
- apiGroups:
- healthchecking.openshift.io
resources:
- machinedisruptionbudgets
verbs:
- get
- list
- watch
- create
- update
- delete
- apiGroups:
- machine.openshift.io
resources:
- machines
verbs:
- get
- list
- watch
- create
- update
- delete
- apiGroups:
- storage.k8s.io
resources:
- csidrivers
verbs:
- create
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: cephclients.ceph.rook.io
spec:
group: ceph.rook.io
names:
kind: CephClient
listKind: CephClientList
plural: cephclients
singular: cephclient
scope: Namespaced
version: v1
validation:
openAPIV3Schema:
properties:
spec:
properties:
caps:
type: object
# Latest custom resource definitions with schema validations
# Latest custom resource definitions
# Do NOT apply this until after all Rook-Ceph clusters have been updated
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: cephclusters.ceph.rook.io
name: cephclients.ceph.rook.io
spec:
group: ceph.rook.io
names:
kind: CephCluster
listKind: CephClusterList
plural: cephclusters
singular: cephcluster
kind: CephClient
listKind: CephClientList
plural: cephclients
singular: cephclient
scope: Namespaced
version: v1
validation:
......@@ -19,175 +19,10 @@ spec:
properties:
spec:
properties:
annotations: {}
cephVersion:
properties:
allowUnsupported:
type: boolean
image:
type: string
dashboard:
properties:
enabled:
type: boolean
urlPrefix:
type: string
port:
type: integer
minimum: 0
maximum: 65535
ssl:
type: boolean
dataDirHostPath:
pattern: ^/(\S+)
type: string
disruptionManagement:
properties:
machineDisruptionBudgetNamespace:
type: string
managePodBudgets:
type: boolean
osdMaintenanceTimeout:
type: integer
manageMachineDisruptionBudgets:
type: boolean
skipUpgradeChecks:
type: boolean
mon:
properties:
allowMultiplePerNode:
type: boolean
count:
maximum: 9
minimum: 0
type: integer
volumeClaimTemplate: {}
mgr:
properties:
modules:
items:
properties:
name:
type: string
enabled:
type: boolean
network:
properties:
hostNetwork:
type: boolean
provider:
type: string
selectors: {}
storage:
properties:
disruptionManagement:
properties:
machineDisruptionBudgetNamespace:
type: string
managePodBudgets:
type: boolean
osdMaintenanceTimeout:
type: integer
manageMachineDisruptionBudgets:
type: boolean
useAllNodes:
type: boolean
nodes:
items:
properties:
name:
type: string
config:
properties:
metadataDevice:
type: string
storeType:
type: string
pattern: ^(filestore|bluestore)$
databaseSizeMB:
type: string
walSizeMB:
type: string
journalSizeMB:
type: string
osdsPerDevice:
type: string
encryptedDevice:
type: string
pattern: ^(true|false)$
useAllDevices:
type: boolean
deviceFilter:
type: string
devicePathFilter:
type: string
directories:
type: array
items:
properties:
path:
type: string
devices:
type: array
items:
properties:
name:
type: string
config: {}
resources: {}
type: array
useAllDevices:
type: boolean
deviceFilter:
type: string
devicePathFilter:
type: string
directories:
type: array
items:
properties:
path:
type: string
config: {}
storageClassDeviceSets: {}
monitoring:
properties:
enabled:
type: boolean
rulesNamespace:
type: string
rbdMirroring:
properties:
workers:
type: integer
removeOSDsIfOutAndSafeToRemove:
type: boolean
external:
properties:
enable:
type: boolean
placement: {}
resources: {}
additionalPrinterColumns:
- name: DataDirHostPath
type: string
description: Directory used on the K8s nodes
JSONPath: .spec.dataDirHostPath
- name: MonCount
type: string
description: Number of MONs
JSONPath: .spec.mon.count
- name: Age
type: date
JSONPath: .metadata.creationTimestamp
- name: State
type: string
description: Current State
JSONPath: .status.state
- name: Health
type: string
description: Ceph Health
JSONPath: .status.ceph.health
caps:
type: object
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
......@@ -228,6 +63,8 @@ spec:
minimum: 0
maximum: 10
type: integer
requireSafeReplicaSize:
type: boolean
erasureCoded:
properties:
dataChunks:
......@@ -250,6 +87,8 @@ spec:
minimum: 0
maximum: 10
type: integer
requireSafeReplicaSize:
type: boolean
erasureCoded:
properties:
dataChunks:
......@@ -270,6 +109,44 @@ spec:
- name: Age
type: date
JSONPath: .metadata.creationTimestamp
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: cephnfses.ceph.rook.io
spec:
group: ceph.rook.io
names:
kind: CephNFS
listKind: CephNFSList
plural: cephnfses
singular: cephnfs
shortNames:
- nfs
scope: Namespaced
version: v1
validation:
openAPIV3Schema:
properties:
spec:
properties:
rados:
properties:
pool:
type: string
namespace:
type: string
server:
properties:
active:
type: integer
annotations: {}
placement: {}
resources: {}
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
......@@ -334,3 +211,85 @@ spec:
type: integer
preservePoolsOnDelete:
type: boolean
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: cephobjectstoreusers.ceph.rook.io
spec:
group: ceph.rook.io
names:
kind: CephObjectStoreUser
listKind: CephObjectStoreUserList
plural: cephobjectstoreusers
singular: cephobjectstoreuser
shortNames:
- rcou
- objectuser
scope: Namespaced
version: v1
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: cephblockpools.ceph.rook.io
spec:
group: ceph.rook.io
names:
kind: CephBlockPool
listKind: CephBlockPoolList
plural: cephblockpools
singular: cephblockpool
scope: Namespaced
version: v1
validation:
openAPIV3Schema:
properties:
spec:
properties:
failureDomain:
type: string
replicated:
properties:
size:
type: integer
minimum: 0
maximum: 9
targetSizeRatio:
type: number
requireSafeReplicaSize:
type: boolean
erasureCoded:
properties:
dataChunks:
type: integer
minimum: 0
maximum: 9
codingChunks:
type: integer
minimum: 0
maximum: 9
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: volumes.rook.io
spec:
group: rook.io
names:
kind: Volume
listKind: VolumeList
plural: volumes
singular: volume
shortNames:
- rv
scope: Namespaced
version: v1alpha2
subresources:
status: {}
\ No newline at end of file
......@@ -41,8 +41,9 @@ var (
)
// IsReadyToReconcile determines if a controller is ready to reconcile or not
func IsReadyToReconcile(client client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName) (cephv1.ClusterSpec, bool, reconcile.Result) {
func IsReadyToReconcile(client client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName) (cephv1.ClusterSpec, bool, bool, reconcile.Result) {
namespacedName.Name = namespacedName.Namespace
cephClusterExists := true
// Running ceph commands won't work and the controller will keep re-queuing, so it's fine not to check
// Make sure a CephCluster exists before doing anything
......@@ -50,11 +51,12 @@ func IsReadyToReconcile(client client.Client, clustercontext *clusterd.Context,
err := client.Get(context.TODO(), namespacedName, cephCluster)
if err != nil {
if kerrors.IsNotFound(err) {
cephClusterExists = false
logger.Errorf("CephCluster resource %q not found in namespace %q", namespacedName.Name, namespacedName.Namespace)
return cephv1.ClusterSpec{}, false, ImmediateRetryResult
return cephv1.ClusterSpec{}, false, cephClusterExists, ImmediateRetryResult
} else {
logger.Errorf("failed to fetch CephCluster %v", err)
return cephv1.ClusterSpec{}, false, ImmediateRetryResult
return cephv1.ClusterSpec{}, false, cephClusterExists, ImmediateRetryResult
}
}
......@@ -67,36 +69,14 @@ func IsReadyToReconcile(client client.Client, clustercontext *clusterd.Context,
if err != nil {
if strings.Contains(err.Error(), "error calling conf_read_file") {
logger.Info("operator is not ready to run ceph command, cannot reconcile yet.")
return cephCluster.Spec, false, WaitForRequeueIfCephClusterNotReady
return cephCluster.Spec, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}
// We should not arrive here
logger.Errorf("ceph command error %v", err)
return cephCluster.Spec, false, ImmediateRetryResult
return cephCluster.Spec, false, cephClusterExists, ImmediateRetryResult
}
return cephCluster.Spec, true, reconcile.Result{}
return cephCluster.Spec, true, cephClusterExists, reconcile.Result{}
}
return cephCluster.Spec, false, ImmediateRetryResult
}
// Contains checks if an item exists in a given list.
func Contains(list []string, s string) bool {
for _, v := range list {
if v == s {
return true
}
}
return false
}
// Removes any element from a list
func Remove(list []string, s string) []string {
for i, v := range list {
if v == s {
list = append(list[:i], list[i+1:]...)
}
}
return list
return cephCluster.Spec, false, cephClusterExists, ImmediateRetryResult
}
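
For illustration, a minimal sketch (not part of this diff) of how a controller consumes the new four-value return; `reconcileGuard` and its parameters are hypothetical and simply mirror the pool controller changes further down.

```go
package example

import (
	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	"github.com/rook/rook/pkg/clusterd"
	opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// reconcileGuard is a hypothetical helper showing the intended calling
// pattern for the refactored IsReadyToReconcile.
func reconcileGuard(c client.Client, ctx *clusterd.Context, req reconcile.Request, obj *cephv1.CephBlockPool) (reconcile.Result, bool, error) {
	_, isReady, clusterExists, result := opcontroller.IsReadyToReconcile(c, ctx, req.NamespacedName)
	if isReady {
		// Safe to run ceph commands; the caller can reconcile normally.
		return reconcile.Result{}, true, nil
	}
	// The CephCluster is gone and the CR is being deleted: drop the
	// finalizer, there is nothing left to clean up in Ceph.
	if !clusterExists && !obj.GetDeletionTimestamp().IsZero() {
		if err := opcontroller.RemoveFinalizer(c, obj); err != nil {
			return reconcile.Result{}, false, err
		}
		return reconcile.Result{}, false, nil
	}
	// The cluster exists but is not ready yet: requeue with the suggested result.
	return result, false, nil
}
```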
/*
Copyright 2020 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"strings"
"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// contains checks if an item exists in a given list.
func contains(list []string, s string) bool {
for _, v := range list {
if v == s {
return true
}
}
return false
}
// remove removes all occurrences of an element from a list
func remove(list []string, s string) []string {
for i, v := range list {
if v == s {
list = append(list[:i], list[i+1:]...)
}
}
return list
}
// AddFinalizerIfNotPresent adds a finalizer to an object to avoid instant deletion
// of the object without finalizing it.
func AddFinalizerIfNotPresent(client client.Client, obj runtime.Object) error {
objectFinalizer := buildFinalizerName(obj.GetObjectKind().GroupVersionKind().Kind)
accessor, err := meta.Accessor(obj)
if err != nil {
return errors.Wrap(err, "failed to get meta information of object")
}
if !contains(accessor.GetFinalizers(), objectFinalizer) {
logger.Infof("adding finalizer %q on %q", objectFinalizer, accessor.GetName())
accessor.SetFinalizers(append(accessor.GetFinalizers(), objectFinalizer))
// Update CR with finalizer
if err := client.Update(context.TODO(), obj); err != nil {
return errors.Wrapf(err, "failed to add finalizer %q on %q", objectFinalizer, accessor.GetName())
}
}
return nil
}
// RemoveFinalizer removes a finalizer from an object
func RemoveFinalizer(client client.Client, obj runtime.Object) error {
objectFinalizer := buildFinalizerName(obj.GetObjectKind().GroupVersionKind().Kind)
accessor, err := meta.Accessor(obj)
if err != nil {
return errors.Wrap(err, "failed to get meta information of object")
}
if contains(accessor.GetFinalizers(), objectFinalizer) {
logger.Infof("removing finalizer %q on %q", objectFinalizer, accessor.GetName())
accessor.SetFinalizers(remove(accessor.GetFinalizers(), objectFinalizer))
if err := client.Update(context.TODO(), obj); err != nil {
return errors.Wrapf(err, "failed to remove finalizer %q on %q", objectFinalizer, accessor.GetName())
}
}
return nil
}
// buildFinalizerName returns the finalizer name
func buildFinalizerName(kind string) string {
return fmt.Sprintf("%s.%s", strings.ToLower(kind), cephv1.CustomResourceGroup)
}
/*
Copyright 2020 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"testing"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
func TestAddFinalizerIfNotPresent(t *testing.T) {
fakeObject := &cephv1.CephBlockPool{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "rook-ceph",
Finalizers: []string{},
},
}
// Objects to track in the fake client.
object := []runtime.Object{
fakeObject,
}
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, fakeObject)
cl := fake.NewFakeClientWithScheme(s, object...)
assert.Empty(t, fakeObject.Finalizers)
err := AddFinalizerIfNotPresent(cl, fakeObject)
assert.NoError(t, err)
assert.NotEmpty(t, fakeObject.Finalizers)
}
func TestRemoveFinalizer(t *testing.T) {
fakeObject := &cephv1.CephBlockPool{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "rook-ceph",
Finalizers: []string{
"cephblockpool.ceph.rook.io",
},
},
TypeMeta: metav1.TypeMeta{
Kind: "cephblockpool",
},
}
object := []runtime.Object{
fakeObject,
}
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, fakeObject)
cl := fake.NewFakeClientWithScheme(s, object...)
assert.NotEmpty(t, fakeObject.Finalizers)
err := RemoveFinalizer(cl, fakeObject)
assert.NoError(t, err)
assert.Empty(t, fakeObject.Finalizers)
}
/*
Copyright 2020 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"encoding/json"
"github.com/banzaicloud/k8s-objectmatcher/patch"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
// WatchUpdatePredicate is a special update filter for update events:
// do not reconcile if only the status changed; this avoids a reconcile storm loop
//
// returning 'true' means triggering a reconciliation
// returning 'false' means do NOT trigger a reconciliation
func WatchUpdatePredicate() predicate.Funcs {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
changed, err := objectChanged(e.ObjectOld, e.ObjectNew)
if err != nil {
logger.Errorf("failed to check if object changed. %v", err)
}
return changed
},
}
}
// objectChanged checks whether the object has been updated
func objectChanged(oldObj, newObj runtime.Object) (bool, error) {
oldCopy := oldObj.DeepCopyObject()
newCopy := newObj.DeepCopyObject()
// Align resource versions so the diff does not flag that field
accessor := meta.NewAccessor()
currentResourceVersion, err := accessor.ResourceVersion(oldCopy)
if err == nil {
accessor.SetResourceVersion(newCopy, currentResourceVersion)
}
// Calculate diff between old and new object
diff, err := patch.DefaultPatchMaker.Calculate(oldCopy, newCopy)
if err != nil {
return true, errors.Wrap(err, "failed to calculate object diff")
} else if diff.IsEmpty() {
return false, nil
}
logger.Debugf("%v", diff.String())
// It looks like there is a diff
// if the status changed, we do nothing
var patchMap map[string]interface{}
if err := json.Unmarshal(diff.Patch, &patchMap); err != nil {
return true, errors.Wrap(err, "failed to unmarshal object diff")
}
delete(patchMap, "status")
if len(patchMap) == 0 {
return false, nil
}
// Get object meta
objectMeta, err := meta.Accessor(newObj)
if err != nil {
return true, errors.Wrap(err, "failed to get object meta")
}
// Log the spec diff; this also covers the case where the object is deleted
spec := patchMap["spec"]
if spec != nil {
logger.Infof("object %q spec changed with %v", objectMeta.GetName(), spec)
}
return true, nil
}
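
For context, a minimal sketch of wiring the predicate into a controller-runtime watch so status-only updates are filtered out; the hypothetical `watchWithPredicate` wrapper is not in this diff, and the real wiring is the one-line change in the pool controller's `add()` below.

```go
package example

import (
	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// watchWithPredicate registers a watch on CephBlockPool that skips
// status-only update events.
func watchWithPredicate(c controller.Controller) error {
	return c.Watch(
		&source.Kind{Type: &cephv1.CephBlockPool{}},
		&handler.EnqueueRequestForObject{},
		opcontroller.WatchUpdatePredicate(),
	)
}
```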
/*
Copyright 2020 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"testing"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var (
name = "my-pool"
namespace = "rook-ceph"
oldReplicas uint = 3
newReplicas uint = 2
)
func TestObjectChanged(t *testing.T) {
oldObject := &cephv1.CephBlockPool{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: cephv1.PoolSpec{
Replicated: cephv1.ReplicatedSpec{
Size: oldReplicas,
},
},
Status: &cephv1.Status{
Phase: "",
},
}
newObject := &cephv1.CephBlockPool{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: cephv1.PoolSpec{
Replicated: cephv1.ReplicatedSpec{
Size: oldReplicas,
},
},
Status: &cephv1.Status{
Phase: "",
},
}
// Identical
changed, err := objectChanged(oldObject, newObject)
assert.NoError(t, err)
assert.False(t, changed)
// Replica size changed
oldObject.Spec.Replicated.Size = newReplicas
changed, err = objectChanged(oldObject, newObject)
assert.NoError(t, err)
assert.True(t, changed)
}
/*
Copyright 2020 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
"github.com/pkg/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// UpdateStatus updates an object with a given status
func UpdateStatus(client client.Client, obj runtime.Object) error {
accessor, err := meta.Accessor(obj)
if err != nil {
return errors.Wrap(err, "failed to get meta information of object")
}
// Try to update the status
err = client.Status().Update(context.Background(), obj)
// If the object doesn't exist yet, we need to initialize it
if kerrors.IsNotFound(err) {
err = client.Update(context.Background(), obj)
}
if err != nil {
if !kerrors.IsConflict(err) && !kerrors.IsInvalid(err) {
return errors.Wrapf(err, "failed to update object %q status", accessor.GetName())
}
err = client.Status().Update(context.Background(), obj)
if kerrors.IsNotFound(err) {
err = client.Update(context.Background(), obj)
}
if err != nil {
return errors.Wrapf(err, "failed to update object %q status", accessor.GetName())
}
}
return nil
}
/*
Copyright 2020 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"testing"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
func TestUpdateStatus(t *testing.T) {
fakeObject := &cephv1.CephBlockPool{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "rook-ceph",
Finalizers: []string{},
},
Status: &cephv1.Status{
Phase: "",
},
}
// Objects to track in the fake client.
object := []runtime.Object{
fakeObject,
}
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, fakeObject)
cl := fake.NewFakeClientWithScheme(s, object...)
fakeObject.Status.Phase = k8sutil.ReadyStatus
err := UpdateStatus(cl, fakeObject)
assert.NoError(t, err)
assert.Equal(t, fakeObject.Status.Phase, k8sutil.ReadyStatus)
}
......@@ -19,8 +19,6 @@ package pool
import (
"context"
"strconv"
"strings"
"github.com/coreos/pkg/capnslog"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
......@@ -32,7 +30,6 @@ import (
opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
"github.com/rook/rook/pkg/operator/k8sutil"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
......@@ -47,7 +44,6 @@ const (
erasureCodeType = "erasure-coded"
poolApplicationNameRBD = "rbd"
controllerName = "ceph-block-pool-controller"
cephBlockPoolFinalizer = "finalizer.ceph.io"
)
var logger = capnslog.NewPackageLogger("github.com/rook/rook", controllerName)
......@@ -88,7 +84,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
// Watch for changes on the CephBlockPool CRD object
err = c.Watch(&source.Kind{Type: &cephv1.CephBlockPool{}}, &handler.EnqueueRequestForObject{})
err = c.Watch(&source.Kind{Type: &cephv1.CephBlockPool{}}, &handler.EnqueueRequestForObject{}, opcontroller.WatchUpdatePredicate())
if err != nil {
return err
}
......@@ -111,13 +107,6 @@ func (r *ReconcileCephBlockPool) Reconcile(request reconcile.Request) (reconcile
}
func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile.Result, error) {
// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, opcontroller.WaitForRequeueIfCephClusterNotReadyAfter.String())
return reconcileResponse, nil
}
// Fetch the CephBlockPool instance
cephBlockPool := &cephv1.CephBlockPool{}
err := r.client.Get(context.TODO(), request.NamespacedName, cephBlockPool)
......@@ -127,21 +116,47 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
return opcontroller.ImmediateRetryResult, errors.Wrapf(err, "failed to get CephBlockPool")
return reconcile.Result{}, errors.Wrapf(err, "failed to get CephBlockPool")
}
// validate the pool settings
if err := ValidatePool(r.context, cephBlockPool); err != nil {
updateCephBlockPoolStatus(cephBlockPool.GetName(), cephBlockPool.GetNamespace(), k8sutil.ReconcileFailedStatus, r.context)
return reconcile.Result{}, errors.Wrapf(err, "invalid pool CR %q spec", cephBlockPool.Name)
// The CR was just created, initializing status fields
if cephBlockPool.Status == nil {
cephBlockPool.Status = &cephv1.Status{}
cephBlockPool.Status.Phase = k8sutil.Created
err := opcontroller.UpdateStatus(r.client, cephBlockPool)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to set status")
}
}
// Set a finalizer so we can do cleanup before the object goes away
if !opcontroller.Contains(cephBlockPool.GetFinalizers(), cephBlockPoolFinalizer) {
err := r.addFinalizer(cephBlockPool)
if err != nil {
return opcontroller.ImmediateRetryResult, errors.Wrapf(err, "failed to add finalizer")
// Make sure a CephCluster is present, otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
//
// Also, only remove the finalizer if the CephCluster is gone
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph commands but the cluster exists
if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// Remove finalizer
err = opcontroller.RemoveFinalizer(r.client, cephBlockPool)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to remove finalizer")
}
// Return and do not requeue. Successful deletion.
return reconcile.Result{}, nil
}
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, opcontroller.WaitForRequeueIfCephClusterNotReadyAfter.String())
return reconcileResponse, nil
}
// Set a finalizer so we can do cleanup before the object goes away
err = opcontroller.AddFinalizerIfNotPresent(r.client, cephBlockPool)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to add finalizer")
}
// DELETE: the CR was deleted
......@@ -149,56 +164,60 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
logger.Debugf("deleting pool %q", cephBlockPool.Name)
err := deletePool(r.context, cephBlockPool)
if err != nil {
logger.Errorf("could not delete pool %q. %v", cephBlockPool.Name, err)
return opcontroller.ImmediateRetryResult, errors.Wrapf(err, "failed to delete pool %q. ", cephBlockPool.Name)
return reconcile.Result{}, errors.Wrapf(err, "failed to delete pool %q. ", cephBlockPool.Name)
}
// Remove finalizer
err = r.removeFinalizer(cephBlockPool)
err = opcontroller.RemoveFinalizer(r.client, cephBlockPool)
if err != nil {
return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed to remove finalizer")
return reconcile.Result{}, errors.Wrap(err, "failed to remove finalizer")
}
// Return and do not requeue. Successful deletion.
return reconcile.Result{}, nil
}
// CREATE
if cephBlockPool.Status == nil || cephBlockPool.Status.Phase != k8sutil.ReadyStatus {
reconcileResponse, err := r.reconcileCreatePool(cephBlockPool)
if err != nil {
return reconcileResponse, errors.Wrapf(err, "failed to create pool %q.", cephBlockPool.GetName())
}
return reconcile.Result{}, nil
// validate the pool settings
if err := ValidatePool(r.context, cephBlockPool); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "invalid pool CR %q spec", cephBlockPool.Name)
}
// UPDATE
needsUpdate, err := r.needsUpdate(cephBlockPool)
// Start object reconciliation, updating the status accordingly
cephBlockPool.Status.Phase = k8sutil.ReconcilingStatus
err = opcontroller.UpdateStatus(r.client, cephBlockPool)
if err != nil {
return opcontroller.ImmediateRetryResult, errors.Wrapf(err, "failed to check if the pool %q needs to be updated", cephBlockPool.GetName())
return reconcile.Result{}, errors.Wrap(err, "failed to set status")
}
if needsUpdate {
reconcileResponse, err := r.reconcileCreatePool(cephBlockPool)
if err != nil {
return reconcileResponse, errors.Wrapf(err, "failed to create pool %q.", cephBlockPool.GetName())
// CREATE/UPDATE
reconcileResponse, err = r.reconcileCreatePool(cephBlockPool)
if err != nil {
cephBlockPool.Status.Phase = k8sutil.ReconcileFailedStatus
errStatus := opcontroller.UpdateStatus(r.client, cephBlockPool)
if errStatus != nil {
return reconcile.Result{}, errors.Wrap(errStatus, "failed to set status")
}
return reconcile.Result{}, nil
return reconcileResponse, errors.Wrapf(err, "failed to create pool %q", cephBlockPool.GetName())
}
// Set Ready status, we are done reconciling
cephBlockPool.Status.Phase = k8sutil.ReadyStatus
err = opcontroller.UpdateStatus(r.client, cephBlockPool)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to set status")
}
// Return and do not requeue
logger.Debug("done reconciling")
return reconcile.Result{}, nil
}
func (r *ReconcileCephBlockPool) reconcileCreatePool(cephBlockPool *cephv1.CephBlockPool) (reconcile.Result, error) {
updateCephBlockPoolStatus(cephBlockPool.GetName(), cephBlockPool.GetNamespace(), k8sutil.ProcessingStatus, r.context)
err := createPool(r.context, cephBlockPool)
if err != nil {
updateCephBlockPoolStatus(cephBlockPool.GetName(), cephBlockPool.GetNamespace(), k8sutil.FailedStatus, r.context)
return opcontroller.ImmediateRetryResult, errors.Wrapf(err, "failed to create pool %q.", cephBlockPool.GetName())
return reconcile.Result{}, errors.Wrapf(err, "failed to create pool %q.", cephBlockPool.GetName())
}
// Set Ready status
updateCephBlockPoolStatus(cephBlockPool.GetName(), cephBlockPool.GetNamespace(), k8sutil.ReadyStatus, r.context)
// Let's return here so that on the initial creation we don't check for update right away
return reconcile.Result{}, nil
}
......@@ -299,91 +318,3 @@ func ValidatePoolSpec(context *clusterd.Context, namespace string, p *cephv1.Poo
return nil
}
func (r *ReconcileCephBlockPool) needsUpdate(pool *cephv1.CephBlockPool) (bool, error) {
var needUpdates bool
if pool.Spec.Replicated.Size > 0 {
replicatedPoolDetails, err := cephclient.GetPoolDetails(r.context, pool.GetNamespace(), pool.GetName())
if err != nil {
if strings.Contains(err.Error(), "error calling conf_read_file") {
return false, errors.Errorf("ceph %q cluster is not ready, cannot check pool details yet.", pool.GetNamespace())
}
return false, errors.Wrapf(err, "failed to get pool %q details", pool.GetName())
}
// Was the size updated?
if replicatedPoolDetails.Size != pool.Spec.Replicated.Size {
logger.Infof("pool size property changed from %d to %d, updating.", replicatedPoolDetails.Size, pool.Spec.Replicated.Size)
needUpdates = true
}
// Was the target_size_ratio updated?
if replicatedPoolDetails.TargetSizeRatio != pool.Spec.Replicated.TargetSizeRatio {
logger.Infof("pool target_size_ratio property changed from %q to %q, updating.", strconv.FormatFloat(replicatedPoolDetails.TargetSizeRatio, 'f', -1, 32), strconv.FormatFloat(pool.Spec.Replicated.TargetSizeRatio, 'f', -1, 32))
needUpdates = true
}
} else {
erasurePoolDetails, err := cephclient.GetErasureCodeProfileDetails(r.context, pool.GetNamespace(), pool.GetName())
if err != nil {
if strings.Contains(err.Error(), "error calling conf_read_file") {
return false, errors.Errorf("ceph %q cluster is not ready, cannot check pool details yet.", pool.GetNamespace())
}
return false, errors.Wrapf(err, "failed to get pool %q details", pool.GetName())
}
if erasurePoolDetails.CodingChunkCount != pool.Spec.ErasureCoded.CodingChunks {
logger.Infof("pool coding chunk count property changed from %d to %d, updating.", erasurePoolDetails.CodingChunkCount, pool.Spec.ErasureCoded.CodingChunks)
needUpdates = true
}
if erasurePoolDetails.DataChunkCount != pool.Spec.ErasureCoded.DataChunks {
logger.Infof("pool data chunk count property changed from %d to %d, updating.", erasurePoolDetails.DataChunkCount, pool.Spec.ErasureCoded.DataChunks)
needUpdates = true
}
}
return needUpdates, nil
}
func updateCephBlockPoolStatus(name, namespace, status string, context *clusterd.Context) {
updatedCephBlockPool, err := context.RookClientset.CephV1().CephBlockPools(namespace).Get(name, metav1.GetOptions{})
if err != nil {
logger.Errorf("Unable to update the cephBlockPool %s status %v", updatedCephBlockPool.GetName(), err)
return
}
if updatedCephBlockPool.Status == nil {
updatedCephBlockPool.Status = &cephv1.Status{}
} else if updatedCephBlockPool.Status.Phase == status {
return
}
updatedCephBlockPool.Status.Phase = status
_, err = context.RookClientset.CephV1().CephBlockPools(updatedCephBlockPool.Namespace).Update(updatedCephBlockPool)
if err != nil {
logger.Errorf("Unable to update the cephBlockPool %s status %v", updatedCephBlockPool.GetName(), err)
return
}
}
// addFinalizer adds a finalizer on the cluster object to avoid instant deletion
// of the object without finalizing it.
func (r *ReconcileCephBlockPool) addFinalizer(cephBlockPool *cephv1.CephBlockPool) error {
logger.Infof("adding finalizer on %q", cephBlockPool.Name)
cephBlockPool.SetFinalizers(append(cephBlockPool.GetFinalizers(), cephBlockPoolFinalizer))
// Update CR with finalizer
if err := r.client.Update(context.TODO(), cephBlockPool); err != nil {
return errors.Wrapf(err, "failed to add finalizer on %q", cephBlockPool.Name)
}
return nil
}
func (r *ReconcileCephBlockPool) removeFinalizer(cephBlockPool *cephv1.CephBlockPool) error {
logger.Infof("removing finalizer on %q", cephBlockPool.Name)
cephBlockPool.SetFinalizers(opcontroller.Remove(cephBlockPool.GetFinalizers(), cephBlockPoolFinalizer))
if err := r.client.Update(context.TODO(), cephBlockPool); err != nil {
return errors.Wrapf(err, "failed to remove finalizer on %q", cephBlockPool.Name)
}
return nil
}
......@@ -17,6 +17,7 @@ limitations under the License.
package pool
import (
"context"
"testing"
"github.com/pkg/errors"
......@@ -243,7 +244,7 @@ func TestCephBlockPoolController(t *testing.T) {
return "", nil
},
}
context := &clusterd.Context{
c := &clusterd.Context{
Executor: executor,
RookClientset: rookclient.NewSimpleClientset()}
......@@ -254,7 +255,7 @@ func TestCephBlockPoolController(t *testing.T) {
// Create a fake client to mock API calls.
cl := fake.NewFakeClient(object...)
// Create a ReconcileCephBlockPool object with the scheme and fake client.
r := &ReconcileCephBlockPool{client: cl, scheme: s, context: context}
r := &ReconcileCephBlockPool{client: cl, scheme: s, context: c}
// Mock request to simulate Reconcile() being called on an event for a
// watched resource .
......@@ -266,7 +267,7 @@ func TestCephBlockPoolController(t *testing.T) {
}
// Create the CephBlockPool CR
_, err := context.RookClientset.CephV1().CephBlockPools(namespace).Create(pool)
_, err := c.RookClientset.CephV1().CephBlockPools(namespace).Create(pool)
assert.NoError(t, err)
res, err := r.Reconcile(req)
assert.NoError(t, err)
......@@ -289,14 +290,14 @@ func TestCephBlockPoolController(t *testing.T) {
s.AddKnownTypes(cephv1.SchemeGroupVersion, cephCluster)
// Create the CephCluster CR
_, err = context.RookClientset.CephV1().CephClusters(namespace).Create(cephCluster)
_, err = c.RookClientset.CephV1().CephClusters(namespace).Create(cephCluster)
assert.NoError(t, err)
object = append(object, cephCluster)
// Create a fake client to mock API calls.
cl = fake.NewFakeClient(object...)
// Create a ReconcileCephBlockPool object with the scheme and fake client.
r = &ReconcileCephBlockPool{client: cl, scheme: s, context: context}
r = &ReconcileCephBlockPool{client: cl, scheme: s, context: c}
assert.True(t, res.Requeue)
......@@ -313,14 +314,13 @@ func TestCephBlockPoolController(t *testing.T) {
// Create a fake client to mock API calls.
cl = fake.NewFakeClient(objects...)
// Create a ReconcileCephBlockPool object with the scheme and fake client.
r = &ReconcileCephBlockPool{client: cl, scheme: s, context: context}
r = &ReconcileCephBlockPool{client: cl, scheme: s, context: c}
res, err = r.Reconcile(req)
assert.NoError(t, err)
assert.False(t, res.Requeue)
// Get the updated CephBlockPool object.
pool, err = context.RookClientset.CephV1().CephBlockPools(namespace).Get(name, metav1.GetOptions{})
err = r.client.Get(context.TODO(), req.NamespacedName, pool)
assert.NoError(t, err)
assert.Equal(t, "Ready", pool.Status.Phase)
}
......@@ -27,4 +27,6 @@ const (
ReconcilingStatus = "Reconciling"
// ReconcileFailedStatus indicates a reconciliation failed
ReconcileFailedStatus = "ReconcileFailed"
// Created indicates the object just got created
Created = "Created"
)
......@@ -441,6 +441,7 @@ func (h *CephInstaller) UninstallRookFromMultipleNS(gatherLogs bool, systemNames
pools, err := h.k8shelper.RookClientset.CephV1().CephBlockPools(namespace).List(metav1.ListOptions{})
assert.NoError(h.T(), err, "failed to retrieve pool CRs")
for _, pool := range pools.Items {
logger.Infof("found pools: %v", pools)
assert.Failf(h.T(), "pool %q still exists", pool.Name)
}
......
......@@ -338,6 +338,8 @@ spec:
- name: Age
type: date
JSONPath: .metadata.creationTimestamp
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
......@@ -372,6 +374,8 @@ spec:
annotations: {}
placement: {}
resources: {}
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
......@@ -440,6 +444,8 @@ spec:
type: integer
preservePoolsOnDelete:
type: boolean
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
......@@ -457,6 +463,8 @@ spec:
- objectuser
scope: Namespaced
version: v1
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
......@@ -498,6 +506,8 @@ spec:
type: integer
minimum: 0
maximum: 9
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
......@@ -514,6 +524,8 @@ spec:
- rv
scope: Namespaced
version: v1alpha2
subresources:
status: {}
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
......@@ -584,7 +596,9 @@ spec:
osd:
type: string
mds:
type: string`
type: string
subresources:
status: {}`
}
// GetRookOperator returns rook Operator manifest
......