Unverified Commit 01ba50ea authored by Dmitry Yusupov's avatar Dmitry Yusupov Committed by GitHub
Browse files

Merge pull request #3463 from sabbot/node-add-remove

Add/remove nodes via EdgeFS cluster's CRD
parents 41005b53 aea098b8
Showing with 719 additions and 308 deletions
+719 -308
......@@ -90,4 +90,30 @@ kubectl exec -it -n $CLUSTER_NAME rook-edgefs-mgr-xxxx-xxx -- toolbox
efscli system status -v 1
```
## EdgeFS Nodes update
Nodes can be added and removed over time by updating the Cluster CRD, for example with `kubectl edit Cluster -n rook-edgefs`.
This will bring up your default text editor and allow you to add and remove storage nodes from the cluster.
This feature is only available when `useAllNodes` has been set to `false` and `resurrect` mode is not used.
### 1. Add node example
#### a. Edit Cluster CRD `kubectl edit Cluster -n rook-edgefs`
#### b. Add new node section with desired configuration in storage section of Cluster CRD
In this example we are adding a new node `node3072ub16` with two drives, `sdb` and `sdc`, on it.
```yaml
- config: null
devices:
- FullPath: ""
config: null
name: sdb
- FullPath: ""
config: null
name: sdc
name: node3072ub16
resources: {}
```
#### c. Save the CRD; the operator will update all target nodes and related pods of the EdgeFS cluster.
#### d. Login to EdgeFS mgr toolbox and adjust FlexHash table to a new configuration using `efscli system fhtable` command.
......@@ -15,6 +15,10 @@ limitations under the License.
*/
package v1beta1
import (
"fmt"
)
const (
DeploymentRtlfs = "rtlfs"
DeploymentRtrd = "rtrd"
......@@ -29,6 +33,47 @@ type ClusterDeploymentConfig struct {
NeedPrivileges bool
}
// GetRtlfsDevices returns the cluster's array of Rtlfs devices.
// Rtlfs devices are identical across all nodes of the cluster, so the
// list is taken from the first non-gateway node's device configuration.
// An empty slice is returned when every node is a gateway.
func (deploymentConfig *ClusterDeploymentConfig) GetRtlfsDevices() []RtlfsDevice {
	for _, devConfig := range deploymentConfig.DevConfig {
		if devConfig.IsGatewayNode {
			// gateway nodes carry no storage devices; keep looking
			continue
		}
		return devConfig.Rtlfs.Devices
	}
	return []RtlfsDevice{}
}
// CompatibleWith reports whether newConfig can replace the current cluster
// configuration. DeploymentType and TransportKey are immutable once a
// cluster has been deployed, so any mismatch makes the update incompatible
// and is returned as an error.
func (deploymentConfig *ClusterDeploymentConfig) CompatibleWith(newConfig ClusterDeploymentConfig) (bool, error) {
	switch {
	case deploymentConfig.DeploymentType != newConfig.DeploymentType:
		return false, fmt.Errorf("DeploymentType `%s` != `%s` for updated cluster configuration", deploymentConfig.DeploymentType, newConfig.DeploymentType)
	case deploymentConfig.TransportKey != newConfig.TransportKey:
		return false, fmt.Errorf("TransportKey `%s` != `%s` for updated cluster configuration", deploymentConfig.TransportKey, newConfig.TransportKey)
	}
	return true, nil
}
// NodesDifference produces the set difference A\B over node names, where
// A is the receiver's configuration and B is the argument.
// In case of
//   A: existing cluster configuration
//   B: updated cluster configuration
// A\B -> nodes to delete from cluster
// B\A -> nodes to add to cluster
func (deploymentConfig *ClusterDeploymentConfig) NodesDifference(B ClusterDeploymentConfig) []string {
	onlyInA := []string{}
	for name := range deploymentConfig.DevConfig {
		_, inB := B.DevConfig[name]
		if inB {
			continue
		}
		onlyInA = append(onlyInA, name)
	}
	return onlyInA
}
type DevicesConfig struct {
Rtrd RTDevices
RtrdSlaves []RTDevices
......@@ -37,6 +82,17 @@ type DevicesConfig struct {
IsGatewayNode bool
}
// GetRtrdDeviceCount returns the total number of rtrd devices on this node:
// the primary container's devices plus those of all slave containers.
// Slave devices are only counted when the primary container has devices.
func (dc *DevicesConfig) GetRtrdDeviceCount() int {
	total := len(dc.Rtrd.Devices)
	if total == 0 {
		return 0
	}
	for _, slave := range dc.RtrdSlaves {
		total += len(slave.Devices)
	}
	return total
}
type DevicesResurrectOptions struct {
NeedToResurrect bool
NeedToZap bool
......
......@@ -119,38 +119,38 @@ func (c *cluster) createInstance(rookImage string, isClusterUpdate bool) error {
}
dro := ParseDevicesResurrectMode(c.Spec.DevicesResurrectMode)
logger.Infof("DevicesResurrect mode: '%s' options %+v", c.Spec.DevicesResurrectMode, dro)
logger.Infof("DevicesResurrect options: %+v", dro)
// In case of Cluster update we need to retrieve cluster config from cluster ConfigMap
var deploymentConfig edgefsv1beta1.ClusterDeploymentConfig
if isClusterUpdate {
// Retrieve existing cluster config from Kubernetes ConfigMap
existingConfig, err := c.retrieveDeploymentConfig()
if err != nil {
return fmt.Errorf("failed to retrive DeploymentConfig for cluster %s. %s", c.Namespace, err)
}
// do not allow DeploymentConfig recover in case of restore option is set. Because there is no devices information in config map
if dro.NeedToResurrect {
logger.Warningf("Cluster %s targets upgrade not allowed due `restore` option in devicesResurrectMode", c.Namespace)
} else {
clusterReconfiguration, err := c.createClusterReconfigurationSpec(existingConfig, clusterNodes, dro)
if err != nil {
return fmt.Errorf("failed to create Reconfiguration specification for cluster %s. %s", c.Namespace, err)
}
deploymentConfig, err = c.retrieveDeploymentConfig(clusterNodes)
if err != nil {
logger.Errorf("Failed to retrieve deploymentConfig %+v", err)
return err
}
logger.Debugf("Recovered ClusterConfig: %s", ToJSON(existingConfig))
c.PrintDeploymentConfig(&clusterReconfiguration.DeploymentConfig)
logger.Debugf("Recovered deploymentConfig: %+v", deploymentConfig)
}
// Unlabel nodes
for _, nodeName := range clusterReconfiguration.ClusterNodesToDelete {
//c.UnlabelNode(node)
logger.Infof("Unlabeling host `%s` as [%s] cluster's target node", nodeName, c.Namespace)
c.UnlabelTargetNode(nodeName)
}
} else {
deploymentConfig, err = c.createDeploymentConfig(clusterNodes, dro)
if err != nil {
logger.Errorf("Failed to create deploymentConfig %+v", err)
return err
}
for _, nodeName := range clusterReconfiguration.ClusterNodesToAdd {
//c.LabelNode(node)
logger.Infof("Labeling host `%s` as [%s] cluster's target node", nodeName, c.Namespace)
c.LabelTargetNode(nodeName)
}
if err := c.createClusterConfigMap(clusterNodes, deploymentConfig, dro.NeedToResurrect); err != nil {
logger.Errorf("Failed to create/update Edgefs cluster configuration: %+v", err)
return err
}
if err := c.createClusterConfigMap(clusterReconfiguration.DeploymentConfig, dro.NeedToResurrect); err != nil {
logger.Errorf("Failed to create/update Edgefs cluster configuration: %+v", err)
return err
}
//
......@@ -159,7 +159,7 @@ func (c *cluster) createInstance(rookImage string, isClusterUpdate bool) error {
//
if c.Spec.SkipHostPrepare == false && dro.NeedToResurrect == false {
err = c.prepareHostNodes(rookImage, deploymentConfig)
err = c.prepareHostNodes(rookImage, clusterReconfiguration.DeploymentConfig)
if err != nil {
logger.Errorf("Failed to create preparation jobs. %+v", err)
}
......@@ -167,17 +167,22 @@ func (c *cluster) createInstance(rookImage string, isClusterUpdate bool) error {
logger.Infof("EdgeFS node preparation will be skipped due skipHostPrepare=true or resurrect cluster option")
}
if err := c.createClusterConfigMap(clusterReconfiguration.DeploymentConfig, dro.NeedToResurrect); err != nil {
logger.Errorf("Failed to create/update Edgefs cluster configuration: %+v", err)
return err
}
//
// Create and start EdgeFS Targets StatefulSet
//
// Do not update targets when its clusterUpdate and restore option set.
// Do not update targets when clusterUpdate and restore option set.
// Because we can't recover information from 'restored' cluster's config map and deploymentConfig is incorrect
// Rest of deployments should be updated as is
if !(isClusterUpdate && dro.NeedToResurrect) {
c.targets = target.New(c.context, c.Namespace, "latest", c.Spec.ServiceAccount, c.Spec.Storage, c.Spec.DataDirHostPath, c.Spec.DataVolumeSize,
edgefsv1beta1.GetTargetAnnotations(c.Spec.Annotations), edgefsv1beta1.GetTargetPlacement(c.Spec.Placement), c.Spec.Network,
c.Spec.Resources, c.Spec.ResourceProfile, c.Spec.ChunkCacheSize, c.ownerRef, deploymentConfig)
c.Spec.Resources, c.Spec.ResourceProfile, c.Spec.ChunkCacheSize, c.ownerRef, clusterReconfiguration.DeploymentConfig)
err = c.targets.Start(rookImage, clusterNodes, dro)
if err != nil {
......@@ -191,6 +196,7 @@ func (c *cluster) createInstance(rookImage string, isClusterUpdate bool) error {
c.mgrs = mgr.New(c.context, c.Namespace, "latest", c.Spec.ServiceAccount, c.Spec.DataDirHostPath, c.Spec.DataVolumeSize,
edgefsv1beta1.GetMgrAnnotations(c.Spec.Annotations), edgefsv1beta1.GetMgrPlacement(c.Spec.Placement), c.Spec.Network, c.Spec.Dashboard,
v1.ResourceRequirements{}, c.Spec.ResourceProfile, c.ownerRef)
err = c.mgrs.Start(rookImage)
if err != nil {
return fmt.Errorf("failed to start the edgefs mgr. %+v", err)
......@@ -261,11 +267,11 @@ func (c *cluster) validateClusterSpec() error {
}
if len(c.Spec.DataDirHostPath) == 0 && c.Spec.DataVolumeSize.Value() == 0 {
return fmt.Errorf("DataDirHostPath or DataVolumeSize EdgeFS cluster's options not specified.")
return fmt.Errorf("DataDirHostPath or DataVolumeSize EdgeFS cluster's options not specified")
}
if len(c.Spec.DataDirHostPath) > 0 && c.Spec.DataVolumeSize.Value() != 0 {
return fmt.Errorf("Both deployment options DataDirHostPath and DataVolumeSize are specified. Should be only one deployment option in cluster specification.")
return fmt.Errorf("Both deployment options DataDirHostPath and DataVolumeSize are specified. Should be only one deployment option in cluster specification")
}
if len(c.Spec.Storage.Directories) > 0 &&
......
/*
Copyright 2019 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
import (
"fmt"
"os"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookv1alpha2 "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
"github.com/rook/rook/pkg/operator/discover"
"github.com/rook/rook/pkg/operator/edgefs/cluster/target"
"github.com/rook/rook/pkg/operator/edgefs/cluster/target/config"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/sys"
)
// ClusterReconfigureSpec describes the outcome of comparing an existing
// EdgeFS cluster deployment against an updated Cluster CRD: the resulting
// deployment configuration plus the node membership changes to apply.
type ClusterReconfigureSpec struct {
	DeploymentConfig     edgefsv1beta1.ClusterDeploymentConfig // merged deployment configuration to converge to
	ClusterNodesToDelete []string                              // node names present before the update but absent now
	ClusterNodesToAdd    []string                              // node names introduced by the update
}
// createClusterReconfigurationSpec builds a ClusterReconfigureSpec by merging
// the existing cluster configuration with the currently valid nodes: nodes
// already present keep their stored device configuration, new nodes get a
// freshly detected one. It then computes the node add/delete sets, verifies
// compatibility with the existing deployment, aligns slave containers across
// nodes and validates the resulting deployment configuration.
func (c *cluster) createClusterReconfigurationSpec(existingConfig edgefsv1beta1.ClusterDeploymentConfig, validNodes []rookv1alpha2.Node, dro edgefsv1beta1.DevicesResurrectOptions) (ClusterReconfigureSpec, error) {
	deploymentType, err := c.getClusterDeploymentType()
	if err != nil {
		return ClusterReconfigureSpec{}, err
	}

	logger.Debugf("ClusterSpec: %s", ToJSON(c.Spec))
	transportKey := getClusterTransportKey(deploymentType)
	reconfigSpec := ClusterReconfigureSpec{
		DeploymentConfig: edgefsv1beta1.ClusterDeploymentConfig{
			DeploymentType: deploymentType,
			TransportKey:   transportKey,
			DevConfig:      make(map[string]edgefsv1beta1.DevicesConfig, 0),
			NeedPrivileges: c.needPrivelege(deploymentType),
		},
	}

	// Iterate over cluster nodes
	for _, node := range validNodes {
		// Copy devices configuration for an already existing devicesConfig.
		// We can't modify devices config for an already existing node recorded in the config map.
		var devicesConfig edgefsv1beta1.DevicesConfig
		if nodeDevConfig, ok := existingConfig.DevConfig[node.Name]; ok {
			devicesConfig = nodeDevConfig
		} else {
			// create new devices config for a newly added node
			devicesConfig, err = c.createDevicesConfig(deploymentType, node, dro)
			if err != nil {
				// best-effort: skip the node rather than failing the whole reconfiguration
				logger.Warningf("Can't create DevicesConfig for %s node, Error: %+v", node.Name, err)
				continue
			}
		}

		reconfigSpec.DeploymentConfig.DevConfig[node.Name] = devicesConfig
	}

	// Calculate nodes to delete from cluster (present before, absent now)
	reconfigSpec.ClusterNodesToDelete = existingConfig.NodesDifference(reconfigSpec.DeploymentConfig)
	// Calculate nodes to add to cluster (absent before, present now)
	reconfigSpec.ClusterNodesToAdd = reconfigSpec.DeploymentConfig.NodesDifference(existingConfig)

	// DeploymentType and TransportKey must not change across an update.
	_, err = existingConfig.CompatibleWith(reconfigSpec.DeploymentConfig)
	if err != nil {
		return reconfigSpec, err
	}

	err = c.alignSlaveContainers(&reconfigSpec.DeploymentConfig)
	if err != nil {
		return reconfigSpec, err
	}

	err = c.validateDeploymentConfig(reconfigSpec.DeploymentConfig, dro)
	if err != nil {
		return reconfigSpec, err
	}

	return reconfigSpec, nil
}
// getClusterDeploymentType derives the EdgeFS deployment type from the
// cluster spec: rtlfs when directories are configured, rtrd when device
// specifications exist, autoRtlfs when neither a data dir host path nor a
// data volume size is fully configured. An error is returned when no type
// can be determined.
func (c *cluster) getClusterDeploymentType() (string, error) {
	hasDataLocation := len(c.Spec.DataDirHostPath) > 0 || c.Spec.DataVolumeSize.Value() != 0
	switch {
	case len(c.Spec.Storage.Directories) > 0 && hasDataLocation:
		return edgefsv1beta1.DeploymentRtlfs, nil
	case c.HasDevicesSpecification() && hasDataLocation:
		return edgefsv1beta1.DeploymentRtrd, nil
	case len(c.Spec.DataDirHostPath) == 0 || c.Spec.DataVolumeSize.Value() == 0:
		return edgefsv1beta1.DeploymentAutoRtlfs, nil
	}
	return "", fmt.Errorf("Can't determine deployment type for [%s] cluster", c.Namespace)
}
// needPrivelege reports whether the cluster pods require privileged mode:
// either host networking is configured (server/broker interface names set)
// or the deployment uses raw disks (rtrd).
func (c *cluster) needPrivelege(deploymentType string) bool {
	// Set privileges==true in case of HostNetwork
	if len(c.Spec.Network.ServerIfName) > 0 || len(c.Spec.Network.BrokerIfName) > 0 {
		return true
	}
	// rtrd accesses raw block devices and needs privileges as well
	return deploymentType == edgefsv1beta1.DeploymentRtrd
}
// getClusterTransportKey maps a deployment type to its transport key:
// rtlfs and autoRtlfs both use the rtlfs transport, everything else rtrd.
func getClusterTransportKey(deploymentType string) string {
	switch deploymentType {
	case edgefsv1beta1.DeploymentRtlfs, edgefsv1beta1.DeploymentAutoRtlfs:
		return edgefsv1beta1.DeploymentRtlfs
	default:
		return edgefsv1beta1.DeploymentRtrd
	}
}
// createDevicesConfig creates a DevicesConfig for a specific node.
// It resolves the node, applies its store config (zone), short-circuits for
// gateway-labeled nodes (empty devices) and for 'restore' mode (no config
// map available), and otherwise either detects the node's raw disks (rtrd)
// or uses the configured directories (rtlfs).
func (c *cluster) createDevicesConfig(deploymentType string, node rookv1alpha2.Node, dro edgefsv1beta1.DevicesResurrectOptions) (edgefsv1beta1.DevicesConfig, error) {
	devicesConfig := edgefsv1beta1.DevicesConfig{}
	devicesConfig.Rtrd.Devices = make([]edgefsv1beta1.RTDevice, 0)
	devicesConfig.RtrdSlaves = make([]edgefsv1beta1.RTDevices, 0)
	devicesConfig.Rtlfs.Devices = make([]edgefsv1beta1.RtlfsDevice, 0)

	n := c.resolveNode(node.Name)
	if n == nil {
		return devicesConfig, fmt.Errorf("Can't resolve node '%s'", node.Name)
	}
	storeConfig := config.ToStoreConfig(n.Config)

	// Apply Node's zone value
	devicesConfig.Zone = storeConfig.Zone

	// If node labeled as gateway then return empty devices and skip RTDevices detection
	if c.isGatewayLabeledNode(c.context.Clientset, node.Name) {
		devicesConfig.IsGatewayNode = true
		logger.Infof("Skipping node [%s] devices as labeled as gateway node", node.Name)
		return devicesConfig, nil
	}

	// Skip device detection in case of 'restore' option.
	// If dro.NeedToResurrect is true then there is no cluster's config map available
	if dro.NeedToResurrect {
		logger.Infof("Skipping node [%s] devices due 'restore' option", node.Name)
		devicesConfig.Rtlfs.Devices = target.GetRtlfsDevices(c.Spec.Storage.Directories, &storeConfig)
		if dro.SlaveContainers > 0 {
			// preserve the previous number of slave containers across restore
			devicesConfig.RtrdSlaves = make([]edgefsv1beta1.RTDevices, dro.SlaveContainers)
		}
		return devicesConfig, nil
	}

	if deploymentType == edgefsv1beta1.DeploymentRtrd {
		// Discover the node's raw disks via the rook discover daemon.
		rookSystemNS := os.Getenv(k8sutil.PodNamespaceEnvVar)
		nodeDevices, _ := discover.ListDevices(c.context, rookSystemNS, n.Name)
		logger.Infof("[%s] available devices: ", n.Name)
		for _, dev := range nodeDevices[n.Name] {
			logger.Infof("\tName: %s, Size: %s, Type: %s, Rotational: %t, Empty: %t", dev.Name, edgefsv1beta1.ByteCountBinary(dev.Size), dev.Type, dev.Rotational, dev.Empty)
		}

		availDevs, deviceErr := discover.GetAvailableDevices(c.context, n.Name, c.Namespace,
			n.Devices, n.Selection.DeviceFilter, n.Selection.GetUseAllDevices())
		if deviceErr != nil {
			// Devices were specified but we couldn't find any.
			// User needs to fix CRD.
			return devicesConfig, fmt.Errorf("failed to create DevicesConfig for node %s cluster %s: %v", n.Name, c.Namespace, deviceErr)
		}

		// Selects Disks from availDevs and translate to RTDevices
		availDisks := []sys.LocalDisk{}
		logger.Infof("[%s] selected devices: ", n.Name)
		for _, dev := range availDevs {
			for _, disk := range nodeDevices[n.Name] {
				if disk.Name == dev.Name {
					diskType := "SSD"
					if disk.Rotational {
						diskType = "HDD"
					}
					logger.Infof("\tName: %s, Type: %s, Size: %s", disk.Name, diskType, edgefsv1beta1.ByteCountBinary(disk.Size))
					availDisks = append(availDisks, disk)
				}
			}
		}

		// Distribute the selected disks over one or more target containers.
		rtDevices, err := target.GetContainersRTDevices(n.Name, c.Spec.MaxContainerCapacity.Value(), availDisks, &storeConfig)
		if err != nil {
			// best-effort: fall back to a single empty container
			logger.Warningf("Can't get rtDevices for node %s due %v", n.Name, err)
			rtDevices = make([]edgefsv1beta1.RTDevices, 1)
		}
		if len(rtDevices) > 0 {
			// first container is the primary; the rest become slaves
			devicesConfig.Rtrd.Devices = rtDevices[0].Devices
			if len(rtDevices) > 1 {
				// NOTE: the redundant `make` that was immediately overwritten
				// here has been removed; the slice assignment below suffices.
				devicesConfig.RtrdSlaves = rtDevices[1:]
			}
		}
	} else {
		// rtlfs/autoRtlfs: devices come from the configured directories
		devicesConfig.Rtlfs.Devices = target.GetRtlfsDevices(c.Spec.Storage.Directories, &storeConfig)
	}

	return devicesConfig, nil
}
// validateZones checks zone numbering consistency across the cluster:
// either every node carries a valid (non-zero) zone number, or none does.
// A partial zone assignment is rejected.
func validateZones(deploymentConfig edgefsv1beta1.ClusterDeploymentConfig) error {
	zonedNodes := 0
	for _, devCfg := range deploymentConfig.DevConfig {
		if devCfg.Zone > 0 {
			zonedNodes++
		}
	}
	if zonedNodes > 0 && zonedNodes != len(deploymentConfig.DevConfig) {
		return fmt.Errorf("Valid Zone number must be propagated to all nodes")
	}
	return nil
}
// validateDeploymentConfig verifies that a deployment configuration is
// complete and meets EdgeFS minimum requirements: transport/deployment type
// present, consistent zone numbering, and at least 3 storage devices in
// total across all nodes (device checks are relaxed in resurrect mode,
// where device information is unavailable).
func (c *cluster) validateDeploymentConfig(deploymentConfig edgefsv1beta1.ClusterDeploymentConfig, dro edgefsv1beta1.DevicesResurrectOptions) error {
	if len(deploymentConfig.TransportKey) == 0 || len(deploymentConfig.DeploymentType) == 0 {
		return fmt.Errorf("ClusterDeploymentConfig has no valid TransportKey or DeploymentType")
	}

	// Zone numbering must be all-or-nothing across nodes.
	err := validateZones(deploymentConfig)
	if err != nil {
		return err
	}

	deploymentNodesCount := len(deploymentConfig.DevConfig)
	if deploymentConfig.TransportKey == edgefsv1beta1.DeploymentRtlfs {
		// Check directories devices count on all nodes.
		// The condition is `< 3`, so the message says "at least 3"
		// (previous wording "more then 3" contradicted the check).
		if len(c.Spec.Storage.Directories)*deploymentNodesCount < 3 {
			return fmt.Errorf("Rtlfs devices count should be at least 3 in total over all cluster nodes")
		}
	} else if deploymentConfig.TransportKey == edgefsv1beta1.DeploymentRtrd {
		// Check that all deployment nodes have available disk devices
		devicesCount := 0
		for nodeName, devCfg := range deploymentConfig.DevConfig {
			// gateway nodes carry no storage devices
			if devCfg.IsGatewayNode {
				continue
			}
			if len(devCfg.Rtrd.Devices) == 0 && !dro.NeedToResurrect {
				return fmt.Errorf("Node %s has no available devices", nodeName)
			}
			devicesCount += devCfg.GetRtrdDeviceCount()
		}

		// Check new deployment devices count
		if !dro.NeedToResurrect && devicesCount < 3 {
			return fmt.Errorf("Disk devices count should be at least 3 in total over all cluster nodes")
		}
	}
	return nil
}
// alignSlaveContainers pads every non-gateway node's RtrdSlaves list with
// empty RTDevices stubs so that all nodes run the same number of slave
// containers (the cluster-wide maximum).
func (c *cluster) alignSlaveContainers(deploymentConfig *edgefsv1beta1.ClusterDeploymentConfig) error {
	// Find the largest slave-container count across non-gateway nodes.
	maxContainerCount := 0
	for _, devCfg := range deploymentConfig.DevConfig {
		if devCfg.IsGatewayNode {
			continue
		}
		if cnt := len(devCfg.RtrdSlaves); cnt > maxContainerCount {
			maxContainerCount = cnt
		}
	}

	// Pad each non-gateway node up to that maximum with stub entries.
	for nodeName, devCfg := range deploymentConfig.DevConfig {
		if devCfg.IsGatewayNode {
			continue
		}
		stubsNeeded := maxContainerCount - len(devCfg.RtrdSlaves)
		if stubsNeeded <= 0 {
			continue
		}
		for i := 0; i < stubsNeeded; i++ {
			devCfg.RtrdSlaves = append(devCfg.RtrdSlaves, edgefsv1beta1.RTDevices{})
		}
		// devCfg is a copy; write the padded record back into the map
		deploymentConfig.DevConfig[nodeName] = devCfg
	}
	return nil
}
......@@ -18,10 +18,8 @@ package cluster
import (
"encoding/json"
"fmt"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
"github.com/rook/rook/pkg/operator/edgefs/cluster/target"
"github.com/rook/rook/pkg/operator/k8sutil"
"k8s.io/api/core/v1"
......@@ -39,12 +37,12 @@ const (
// As we relying on StatefulSet, we want to build global ConfigMap shared
// to all the nodes in the cluster. This way configuration is simplified and
// available to all subcomponents at any point it time.
func (c *cluster) createClusterConfigMap(nodes []rookalpha.Node, deploymentConfig edgefsv1beta1.ClusterDeploymentConfig, resurrect bool) error {
func (c *cluster) createClusterConfigMap(deploymentConfig edgefsv1beta1.ClusterDeploymentConfig, resurrect bool) error {
cm := make(map[string]edgefsv1beta1.SetupNode)
dnsRecords := make([]string, len(nodes))
for i := 0; i < len(nodes); i++ {
dnsRecords := make([]string, len(deploymentConfig.DevConfig))
for i := 0; i < len(deploymentConfig.DevConfig); i++ {
dnsRecords[i] = target.CreateQualifiedHeadlessServiceName(i, c.Namespace)
}
......@@ -65,8 +63,8 @@ func (c *cluster) createClusterConfigMap(nodes []rookalpha.Node, deploymentConfi
}
// Fully resolve the storage config and resources for all nodes
for _, node := range nodes {
devConfig := deploymentConfig.DevConfig[node.Name]
for nodeName := range deploymentConfig.DevConfig {
devConfig := deploymentConfig.DevConfig[nodeName]
rtDevices := devConfig.Rtrd.Devices
rtSlaveDevices := devConfig.RtrdSlaves
rtlfsDevices := devConfig.Rtlfs.Devices
......@@ -146,9 +144,9 @@ func (c *cluster) createClusterConfigMap(nodes []rookalpha.Node, deploymentConfi
nodeConfig.Ccowd.BgConfig.TrlogDeleteAfterHours = c.Spec.TrlogKeepDays * 24
}
cm[node.Name] = nodeConfig
cm[nodeName] = nodeConfig
logger.Debugf("Resolved Node %s = %+v", node.Name, cm[node.Name])
logger.Debugf("Resolved Node %s = %+v", nodeName, cm[nodeName])
}
nesetupJson, err := json.Marshal(&cm)
......@@ -178,78 +176,5 @@ func (c *cluster) createClusterConfigMap(nodes []rookalpha.Node, deploymentConfi
}
}
// Success. Do the labeling so that StatefulSet scheduler will
// select the right nodes.
for _, node := range nodes {
k := c.Namespace
err = c.AddLabelsToNode(c.context.Clientset, node.Name, map[string]string{k: "cluster"})
logger.Debugf("added label %s from %s: %+v", k, node.Name, err)
}
return nil
}
// retrieveDeploymentConfig reconstructs the ClusterDeploymentConfig from the
// cluster's Kubernetes ConfigMap ("nesetup" key). For each requested node it
// restores the stored device configuration, and infers the cluster-wide
// DeploymentType/TransportKey/NeedPrivileges from the first non-gateway
// node encountered.
func (c *cluster) retrieveDeploymentConfig(nodes []rookalpha.Node) (edgefsv1beta1.ClusterDeploymentConfig, error) {
	deploymentConfig := edgefsv1beta1.ClusterDeploymentConfig{
		DevConfig: make(map[string]edgefsv1beta1.DevicesConfig, 0),
	}

	cm, err := c.context.Clientset.CoreV1().ConfigMaps(c.Namespace).Get(configName, metav1.GetOptions{})
	if err != nil {
		// config map is missing or unreadable; caller decides how to proceed
		return deploymentConfig, err
	}

	setup := map[string]edgefsv1beta1.SetupNode{}
	if nesetup, ok := cm.Data["nesetup"]; ok {
		err = json.Unmarshal([]byte(nesetup), &setup)
		if err != nil {
			// NOTE(review): decode failure is only logged; the loop below then
			// sees an empty setup map and the final check returns an error
			logger.Errorf("invalid JSON in cluster configmap. %+v", err)
		}
		// deployment type is derived once, from the first non-gateway node
		deploymentTypeAchived := false
		for _, node := range nodes {
			if nodeConfig, ok := setup[node.Name]; ok {
				// restore this node's stored device configuration
				devicesConfig := edgefsv1beta1.DevicesConfig{}
				devicesConfig.Rtlfs = nodeConfig.Rtlfs
				devicesConfig.Rtrd = nodeConfig.Rtrd
				devicesConfig.RtrdSlaves = nodeConfig.RtrdSlaves
				devicesConfig.IsGatewayNode = false
				if nodeConfig.NodeType == "gateway" {
					devicesConfig.IsGatewayNode = true
				}
				devicesConfig.Zone = nodeConfig.Ccowd.Zone
				deploymentConfig.DevConfig[node.Name] = devicesConfig

				// we can't detect deployment type on gw node, move to next one
				if !devicesConfig.IsGatewayNode && !deploymentTypeAchived {
					// infer deployment type from which device kind is populated
					if len(nodeConfig.Rtrd.Devices) > 0 {
						deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentRtrd
						deploymentConfig.TransportKey = "rtrd"
						// raw-disk deployments require privileged pods
						deploymentConfig.NeedPrivileges = true
					} else if len(nodeConfig.Rtlfs.Devices) > 0 {
						deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentRtlfs
						deploymentConfig.TransportKey = "rtlfs"
					} else if len(nodeConfig.RtlfsAutodetect) > 0 {
						deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentAutoRtlfs
						deploymentConfig.TransportKey = "rtlfs"
					}
					// hostNetwork option specified (non-default server interface)
					if nodeConfig.Ccowd.Network.ServerInterfaces != "eth0" {
						deploymentConfig.NeedPrivileges = true
					}
					deploymentTypeAchived = true
				}
			}
		}

		if deploymentConfig.DeploymentType == "" || deploymentConfig.TransportKey == "" {
			return deploymentConfig, fmt.Errorf("Can't retrieve DeploymentConfig from config map. Unknown DeploymentType or TransportKey values")
		}
	}

	return deploymentConfig, nil
}
......@@ -286,8 +286,8 @@ func (c *ClusterController) onUpdate(oldObj, newObj interface{}) {
}
logger.Infof("update event for cluster %s is supported, orchestrating update now", newCluster.Namespace)
logger.Infof("old cluster: %+v", oldCluster.Spec)
logger.Infof("new cluster: %+v", newCluster.Spec)
logger.Debugf("old cluster: %+v", oldCluster.Spec)
logger.Debugf("new cluster: %+v", newCluster.Spec)
cluster, ok := c.clusterMap[newCluster.Namespace]
if !ok {
......@@ -375,9 +375,7 @@ func (c *ClusterController) handleDelete(clust *edgefsv1beta1.Cluster, retryInte
}
for _, node := range cluster.targets.Storage.Nodes {
k := cluster.Namespace
err := cluster.RemoveLabelOffNode(cluster.context.Clientset, node.Name, []string{k})
logger.Infof("removed label %s from %s: %+v", k, node.Name, err)
cluster.UnlabelTargetNode(node.Name)
}
// delete associated node labels
......
......@@ -30,6 +30,7 @@ import (
)
const (
// DefaultContainerMaxCapacity - max allowed container disks capacity; if exceeded, a new container will be added
DefaultContainerMaxCapacity = "132Ti"
)
......@@ -167,17 +168,12 @@ func GetContainersRTDevices(nodeName string, maxContainerCapacity int64, nodeDis
}
containersRtDevices := make([]edgefsv1beta1.RTDevices, len(containers))
logger.Infof("Node [%s] target's devices distribution:", nodeName)
for i, container := range containers {
rtDevices, err := getRTDevices(container, storeConfig)
if err != nil {
return nil, err
}
// Just for debugging
for _, rt := range rtDevices {
logger.Infof("[%s] Container[%d] Device: %s, Name: %s, Journal: %s", nodeName, i, rt.Device, rt.Name, rt.Journal)
}
containersRtDevices[i].Devices = rtDevices
}
return containersRtDevices, nil
......
......@@ -18,13 +18,14 @@ package target
import (
"fmt"
"strconv"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
"github.com/rook/rook/pkg/operator/edgefs/cluster/target/config"
"github.com/rook/rook/pkg/operator/k8sutil"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strconv"
)
const (
......@@ -96,6 +97,10 @@ func (c *Cluster) makeCorosyncContainer(containerImage string) v1.Container {
},
},
},
{
Name: "K8S_NAMESPACE",
Value: c.Namespace,
},
},
}
}
......@@ -406,7 +411,7 @@ func (c *Cluster) createPodSpec(rookImage string, dro edgefsv1beta1.DevicesResur
if c.deploymentConfig.DeploymentType == edgefsv1beta1.DeploymentRtlfs {
// RTLFS with specified folders
for _, folder := range c.deploymentConfig.Directories {
for _, folder := range c.deploymentConfig.GetRtlfsDevices() {
volumes = append(volumes, v1.Volume{
Name: folder.Name,
VolumeSource: v1.VolumeSource{
......
......@@ -43,7 +43,7 @@ func TestStorageSpecConfig(t *testing.T) {
Name: "node1",
Location: "zone1",
Config: map[string]string{
"rtTransport": "rtrd",
"rtTransport": edgefsv1beta1.DeploymentRtrd,
"useAllSSD": "true",
"useMetadataOffload": "false",
},
......
......@@ -40,14 +40,9 @@ const (
appNameFmt = "rook-edgefs-target-%s"
targetLabelKey = "edgefs-target-id"
defaultServiceAccountName = "rook-edgefs-cluster"
unknownID = -1
labelingRetries = 5
//deployment types
deploymentRtlfs = "rtlfs"
deploymentRtrd = "rtrd"
deploymentAutoRtlfs = "autoRtlfs"
nodeTypeLabelFmt = "%s-nodetype"
nodeTypeLabelFmt = "%s-nodetype"
sleepTime = 5 // time beetween statefulset update check
)
// Cluster keeps track of the Targets
......@@ -140,7 +135,7 @@ func (c *Cluster) Start(rookImage string, nodes []rookalpha.Node, dro edgefsv1be
if _, err := UpdateStatefulsetAndWait(c.context, statefulSet, c.Namespace); err != nil {
logger.Errorf("failed to update statefulset %s. %+v", statefulSet.Name, err)
return err
return nil
}
} else {
logger.Infof("stateful set %s created in namespace %s", statefulSet.Name, statefulSet.Namespace)
......@@ -156,6 +151,9 @@ func UpdateStatefulsetAndWait(context *clusterd.Context, sts *appsv1.StatefulSet
return nil, fmt.Errorf("failed to get statefulset %s. %+v", sts.Name, err)
}
// set updateTime annotation to force rolling update of Statefulset
sts.Spec.Template.Annotations["UpdateTime"] = time.Now().Format(time.RFC850)
_, err = context.Clientset.AppsV1().StatefulSets(namespace).Update(sts)
if err != nil {
return nil, fmt.Errorf("failed to update statefulset %s. %+v", sts.Name, err)
......@@ -171,11 +169,11 @@ func UpdateStatefulsetAndWait(context *clusterd.Context, sts *appsv1.StatefulSet
}
logger.Infof("Statefulset %s update in progress... status=%+v", statefulset.Name, statefulset.Status)
//
statefulsetReplicas := *statefulset.Spec.Replicas
if statefulset.Status.ObservedGeneration != original.Status.ObservedGeneration &&
statefulset.Status.UpdatedReplicas == original.Status.UpdatedReplicas &&
statefulset.Status.ReadyReplicas == original.Status.ReadyReplicas &&
statefulset.Status.CurrentReplicas == original.Status.CurrentReplicas {
statefulsetReplicas == statefulset.Status.ReadyReplicas &&
statefulsetReplicas == statefulset.Status.CurrentReplicas &&
statefulsetReplicas == statefulset.Status.UpdatedReplicas {
logger.Infof("Statefulset '%s' update is done", statefulset.Name)
return statefulset, nil
}
......
......@@ -16,6 +16,7 @@ limitations under the License.
package cluster
import (
"encoding/json"
"fmt"
"os"
"strconv"
......@@ -23,12 +24,9 @@ import (
"time"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
rookv1alpha2 "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
"github.com/rook/rook/pkg/operator/discover"
"github.com/rook/rook/pkg/operator/edgefs/cluster/target"
"github.com/rook/rook/pkg/operator/edgefs/cluster/target/config"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/sys"
"k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
......@@ -76,7 +74,17 @@ func ParseDevicesResurrectMode(resurrectMode string) edgefsv1beta1.DevicesResurr
return drm
}
func (c *cluster) getClusterNodes() ([]rookalpha.Node, error) {
// ToJSON serializes obj to its JSON string representation. On marshaling
// failure the error is logged and an empty string is returned — callers use
// this best-effort for logging/debug output only.
func ToJSON(obj interface{}) string {
	bytes, err := json.Marshal(obj)
	if err != nil {
		// fixed typo in log message ("convertation" -> "conversion")
		logger.Errorf("JSON conversion failed: %+v", err)
		return ""
	}
	return string(bytes)
}
func (c *cluster) getClusterNodes() ([]rookv1alpha2.Node, error) {
if c.Spec.Storage.UseAllNodes {
c.Spec.Storage.Nodes = nil
// Resolve all storage nodes
......@@ -87,7 +95,7 @@ func (c *cluster) getClusterNodes() ([]rookalpha.Node, error) {
return nil, err
}
for nodeName := range allNodeDevices {
storageNode := rookalpha.Node{
storageNode := rookv1alpha2.Node{
Name: nodeName,
}
c.Spec.Storage.Nodes = append(c.Spec.Storage.Nodes, storageNode)
......@@ -98,197 +106,134 @@ func (c *cluster) getClusterNodes() ([]rookalpha.Node, error) {
return validNodes, nil
}
func (c *cluster) createDeploymentConfig(nodes []rookalpha.Node, dro edgefsv1beta1.DevicesResurrectOptions) (edgefsv1beta1.ClusterDeploymentConfig, error) {
deploymentConfig := edgefsv1beta1.ClusterDeploymentConfig{DevConfig: make(map[string]edgefsv1beta1.DevicesConfig, 0)}
// Fill deploymentConfig devices struct
for _, node := range nodes {
n := c.resolveNode(node.Name)
storeConfig := config.ToStoreConfig(n.Config)
if n == nil {
return deploymentConfig, fmt.Errorf("node %s did not resolve to start target", node.Name)
}
devicesConfig := edgefsv1beta1.DevicesConfig{}
devicesConfig.Rtrd.Devices = make([]edgefsv1beta1.RTDevice, 0)
devicesConfig.Rtlfs.Devices = make([]edgefsv1beta1.RtlfsDevice, 0)
// Apply Node's zone value
devicesConfig.Zone = storeConfig.Zone
// If node labeled as gateway then return empty devices and skip RTDevices detection
if c.isGatewayLabeledNode(c.context.Clientset, node.Name) {
devicesConfig.IsGatewayNode = true
deploymentConfig.DevConfig[node.Name] = devicesConfig
logger.Infof("Skipping node [%s] devices as labeled as gateway node", node.Name)
continue
}
// Skip device detection in case of 'restore' option.
// If dro.NeedToResurrect is true then there is no cluster's config map available
if dro.NeedToResurrect {
logger.Infof("Skipping node [%s] devices due 'restore' option", node.Name)
devicesConfig.Rtlfs.Devices = target.GetRtlfsDevices(c.Spec.Storage.Directories, &storeConfig)
if dro.SlaveContainers > 0 {
devicesConfig.RtrdSlaves = make([]edgefsv1beta1.RTDevices, dro.SlaveContainers)
}
deploymentConfig.DevConfig[node.Name] = devicesConfig
continue
}
rookSystemNS := os.Getenv(k8sutil.PodNamespaceEnvVar)
nodeDevices, _ := discover.ListDevices(c.context, rookSystemNS, n.Name)
logger.Infof("[%s] available devices: ", n.Name)
for _, dev := range nodeDevices[n.Name] {
logger.Infof("\tName: %s, Size: %s, Type: %s, Rotational: %t, Empty: %t", dev.Name, edgefsv1beta1.ByteCountBinary(dev.Size), dev.Type, dev.Rotational, dev.Empty)
}
// retrieveDeploymentConfig restore ClusterDeploymentConfig from cluster's Kubernetes ConfigMap
func (c *cluster) retrieveDeploymentConfig() (edgefsv1beta1.ClusterDeploymentConfig, error) {
availDevs, deviceErr := discover.GetAvailableDevices(c.context, n.Name, c.Namespace,
n.Devices, n.Selection.DeviceFilter, n.Selection.GetUseAllDevices())
if deviceErr != nil {
// Devices were specified but we couldn't find any.
// User needs to fix CRD.
return deploymentConfig, fmt.Errorf("failed to get devices for node %s cluster %s: %v",
n.Name, c.Namespace, deviceErr)
}
// Selects Disks from availDevs and translate to RTDevices
availDisks := []sys.LocalDisk{}
logger.Infof("[%s] selected devices: ", n.Name)
for _, dev := range availDevs {
for _, disk := range nodeDevices[n.Name] {
if disk.Name == dev.Name {
diskType := "SSD"
if disk.Rotational {
diskType = "HDD"
}
logger.Infof("\tName: %s, Type: %s, Size: %s", disk.Name, diskType, edgefsv1beta1.ByteCountBinary(disk.Size))
availDisks = append(availDisks, disk)
}
}
}
rtDevices, err := target.GetContainersRTDevices(n.Name, c.Spec.MaxContainerCapacity.Value(), availDisks, &storeConfig)
if err != nil {
logger.Warningf("Can't get rtDevices for node %s due %v", n.Name, err)
rtDevices = make([]edgefsv1beta1.RTDevices, 1)
}
if len(rtDevices) > 0 {
devicesConfig.Rtrd.Devices = rtDevices[0].Devices
// append to RtrdSlaves in case of additional containers
if len(rtDevices) > 1 {
devicesConfig.RtrdSlaves = make([]edgefsv1beta1.RTDevices, len(rtDevices)-1)
devicesConfig.RtrdSlaves = rtDevices[1:]
}
}
devicesConfig.Rtlfs.Devices = target.GetRtlfsDevices(c.Spec.Storage.Directories, &storeConfig)
deploymentConfig.DevConfig[node.Name] = devicesConfig
deploymentConfig := edgefsv1beta1.ClusterDeploymentConfig{
DevConfig: make(map[string]edgefsv1beta1.DevicesConfig, 0),
}
err := ValidateSlaveContainers(&deploymentConfig)
cm, err := c.context.Clientset.CoreV1().ConfigMaps(c.Namespace).Get(configName, metav1.GetOptions{})
if err != nil {
return deploymentConfig, err
}
if apierrs.IsNotFound(err) {
// When cluster config map doesn't exist, return config with empty DevicesConfig and current DeploymentType
deploymentType, err := c.getClusterDeploymentType()
if err != nil {
return deploymentConfig, err
}
err = ValidateZones(&deploymentConfig)
if err != nil {
deploymentConfig.DeploymentType = deploymentType
deploymentConfig.TransportKey = getClusterTransportKey(deploymentType)
return deploymentConfig, nil
}
return deploymentConfig, err
}
// Add Directories to deploymentConfig
clusterStorageConfig := config.ToStoreConfig(c.Spec.Storage.Config)
deploymentConfig.Directories = target.GetRtlfsDevices(c.Spec.Storage.Directories, &clusterStorageConfig)
if len(c.Spec.Storage.Directories) > 0 && (len(c.Spec.DataDirHostPath) > 0 || c.Spec.DataVolumeSize.Value() != 0) {
deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentRtlfs
deploymentConfig.TransportKey = "rtlfs"
// Check directories devices count on all nodes
if len(c.Spec.Storage.Directories)*len(nodes) < 3 {
return deploymentConfig, fmt.Errorf("Rtlfs devices should be more then 3 on all nodes summary")
setup := map[string]edgefsv1beta1.SetupNode{}
if nesetup, ok := cm.Data["nesetup"]; ok {
err = json.Unmarshal([]byte(nesetup), &setup)
if err != nil {
logger.Errorf("invalid JSON in cluster configmap. %+v", err)
return deploymentConfig, fmt.Errorf("invalid JSON in cluster configmap. %+v", err)
}
} else if c.HasDevicesSpecification() && (len(c.Spec.DataDirHostPath) > 0 || c.Spec.DataVolumeSize.Value() != 0) {
deploymentTypeAchived := false
for nodeKey, nodeConfig := range setup {
devicesConfig := edgefsv1beta1.DevicesConfig{}
// Check all deployment nodes has available disk devices
devicesCount := 0
for nodeName, devCfg := range deploymentConfig.DevConfig {
devicesConfig.Rtlfs = nodeConfig.Rtlfs
devicesConfig.Rtrd = nodeConfig.Rtrd
devicesConfig.RtrdSlaves = nodeConfig.RtrdSlaves
if devCfg.IsGatewayNode {
continue
devicesConfig.IsGatewayNode = false
if nodeConfig.NodeType == "gateway" {
devicesConfig.IsGatewayNode = true
}
devicesConfig.Zone = nodeConfig.Ccowd.Zone
deploymentConfig.DevConfig[nodeKey] = devicesConfig
// we can't detect deployment type on gw node, move to next one
if !devicesConfig.IsGatewayNode && !deploymentTypeAchived {
if len(nodeConfig.Rtrd.Devices) > 0 {
deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentRtrd
deploymentConfig.TransportKey = edgefsv1beta1.DeploymentRtrd
deploymentConfig.NeedPrivileges = true
} else if len(nodeConfig.Rtlfs.Devices) > 0 {
deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentRtlfs
deploymentConfig.TransportKey = edgefsv1beta1.DeploymentRtlfs
} else if len(nodeConfig.RtlfsAutodetect) > 0 {
deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentAutoRtlfs
deploymentConfig.TransportKey = edgefsv1beta1.DeploymentRtlfs
}
if len(devCfg.Rtrd.Devices) == 0 && !dro.NeedToResurrect {
return deploymentConfig, fmt.Errorf("Node %s has no available devices", nodeName)
// hostNetwork option specified
if nodeConfig.Ccowd.Network.ServerInterfaces != "eth0" {
deploymentConfig.NeedPrivileges = true
}
deploymentTypeAchived = true
}
devicesCount += len(devCfg.Rtrd.Devices)
}
// Check new deployment devices count
if !dro.NeedToResurrect && devicesCount < 3 {
return deploymentConfig, fmt.Errorf("Disk devices should be more then 3 on all nodes summary")
if deploymentConfig.DeploymentType == "" || deploymentConfig.TransportKey == "" {
return deploymentConfig, fmt.Errorf("Can't retrieve DeploymentConfig from config map. Unknown DeploymentType or TransportKey values")
}
deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentRtrd
deploymentConfig.TransportKey = "rtrd"
deploymentConfig.NeedPrivileges = true
} else if len(c.Spec.DataDirHostPath) == 0 || c.Spec.DataVolumeSize.Value() == 0 {
deploymentConfig.DeploymentType = edgefsv1beta1.DeploymentAutoRtlfs
deploymentConfig.TransportKey = "rtlfs"
} else {
return deploymentConfig, fmt.Errorf("Unknown deployment type! Cluster spec:\n %+v", c)
}
// Set privileges==true in case of HostNetwork
if len(c.Spec.Network.ServerIfName) > 0 || len(c.Spec.Network.BrokerIfName) > 0 {
deploymentConfig.NeedPrivileges = true
return deploymentConfig, nil
}
func (c *cluster) PrintRTDevices(containerIndex int, rtDevices edgefsv1beta1.RTDevices) {
if len(rtDevices.Devices) == 0 {
logger.Infof("\t\tContainer[%d] Stub container. No devices assigned", containerIndex)
return
}
return deploymentConfig, nil
for _, device := range rtDevices.Devices {
logger.Infof("\t\tContainer[%d] Device: %s, Name: %s, Journal: %s", containerIndex, device.Device, device.Name, device.Journal)
}
}
// ValidateSlaveContainers validates containers count for each deployment node, container's count MUST be equal for for each node
func ValidateSlaveContainers(deploymentConfig *edgefsv1beta1.ClusterDeploymentConfig) error {
func (c *cluster) PrintRtlfsDevices(containerIndex int, rtlfsDevices edgefsv1beta1.RtlfsDevices) {
isFirstNode := true
prevNodeContainersCount := 0
nodeContainersCount := 0
for _, device := range rtlfsDevices.Devices {
logger.Infof("\t\tContainer[%d] Path: %s, Name: %s, MaxSize: %s", containerIndex, device.Path, device.Name, edgefsv1beta1.ByteCountBinary(device.Maxsize))
}
}
func (c *cluster) PrintDeploymentConfig(deploymentConfig *edgefsv1beta1.ClusterDeploymentConfig) {
logger.Infof("[%s] DeploymentConfig: ", c.Namespace)
logger.Infof("DeploymentType: %s", deploymentConfig.DeploymentType)
logger.Infof("TransportKey: %s", deploymentConfig.TransportKey)
logger.Infof("Directories: %+v", deploymentConfig.Directories)
logger.Infof("NeedPrivileges: %t", deploymentConfig.NeedPrivileges)
for nodeName, nodeDevConfig := range deploymentConfig.DevConfig {
// Skip GW node
logger.Infof("\tNode [%s] devices:", nodeName)
if nodeDevConfig.IsGatewayNode {
logger.Infof("\t\tContainer[0] Configured as Edgefs gateway. No devices selected")
continue
}
nodeContainersCount = len(nodeDevConfig.RtrdSlaves)
if isFirstNode {
prevNodeContainersCount = nodeContainersCount
isFirstNode = false
}
if nodeContainersCount != prevNodeContainersCount {
return fmt.Errorf("Node [%s] has different containers count %d then others nodes %d", nodeName, nodeContainersCount, prevNodeContainersCount)
}
}
return nil
}
// ValidateZones validates all nodes in cluster that each one has valid zone number or all of them has zone == 0
func ValidateZones(deploymentConfig *edgefsv1beta1.ClusterDeploymentConfig) error {
validZonesFound := 0
for _, nodeDevConfig := range deploymentConfig.DevConfig {
if nodeDevConfig.Zone > 0 {
validZonesFound = validZonesFound + 1
switch deploymentConfig.DeploymentType {
case edgefsv1beta1.DeploymentRtrd:
c.PrintRTDevices(0, nodeDevConfig.Rtrd)
for index, slaveDevices := range nodeDevConfig.RtrdSlaves {
c.PrintRTDevices(index+1, slaveDevices)
}
case edgefsv1beta1.DeploymentRtlfs:
c.PrintRtlfsDevices(0, nodeDevConfig.Rtlfs)
case edgefsv1beta1.DeploymentAutoRtlfs:
logger.Infof("\t\tContainer[0] Path: /mnt/disks/disk0")
logger.Infof("\t\tContainer[0] Path: /mnt/disks/disk1")
logger.Infof("\t\tContainer[0] Path: /mnt/disks/disk2")
default:
logger.Errorf("[%s] Unknown DeploymentType '%s'", c.Namespace, deploymentConfig.DeploymentType)
}
}
if validZonesFound > 0 && len(deploymentConfig.DevConfig) != validZonesFound {
return fmt.Errorf("Valid Zone number must be propagated to all nodes")
}
return nil
}
func (c *cluster) resolveNode(nodeName string) *rookalpha.Node {
func (c *cluster) resolveNode(nodeName string) *rookv1alpha2.Node {
// Fully resolve the storage config and resources for this node
rookNode := c.Spec.Storage.ResolveNode(nodeName)
if rookNode == nil {
......@@ -301,7 +246,7 @@ func (c *cluster) resolveNode(nodeName string) *rookalpha.Node {
rookNode.Resources = k8sutil.MergeResourceRequirements(rookNode.Resources, c.Spec.Resources)
// Ensure no invalid dirs are specified
var validDirs []rookalpha.Directory
var validDirs []rookv1alpha2.Directory
for _, dir := range rookNode.Directories {
if dir.Path == k8sutil.DataDir || dir.Path == c.Spec.DataDirHostPath {
logger.Warningf("skipping directory %s that would conflict with the dataDirHostPath", dir.Path)
......@@ -314,7 +259,11 @@ func (c *cluster) resolveNode(nodeName string) *rookalpha.Node {
return rookNode
}
func (c *cluster) AddLabelsToNode(cs clientset.Interface, nodeName string, labels map[string]string) error {
// LabelTargetNode marks the given Kubernetes node as a member of this
// EdgeFS cluster by adding the "<namespace>=cluster" label to it.
func (c *cluster) LabelTargetNode(nodeName string) {
	// AddLabelsToNode retries internally; log a terminal failure instead of
	// dropping the returned error silently.
	if err := c.AddLabelsToNode(nodeName, map[string]string{c.Namespace: "cluster"}); err != nil {
		logger.Errorf("failed to label node %s for cluster %s: %+v", nodeName, c.Namespace, err)
	}
}
func (c *cluster) AddLabelsToNode(nodeName string, labels map[string]string) error {
tokens := make([]string, 0, len(labels))
for k, v := range labels {
tokens = append(tokens, "\""+k+"\":\""+v+"\"")
......@@ -323,7 +272,7 @@ func (c *cluster) AddLabelsToNode(cs clientset.Interface, nodeName string, label
patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
var err error
for attempt := 0; attempt < labelingRetries; attempt++ {
_, err = cs.CoreV1().Nodes().Patch(nodeName, types.MergePatchType, []byte(patch))
_, err = c.context.Clientset.CoreV1().Nodes().Patch(nodeName, types.MergePatchType, []byte(patch))
if err != nil {
if !apierrs.IsConflict(err) {
return err
......@@ -336,13 +285,17 @@ func (c *cluster) AddLabelsToNode(cs clientset.Interface, nodeName string, label
return err
}
// UnlabelTargetNode removes this cluster's membership label (keyed by the
// cluster namespace) from the given Kubernetes node.
func (c *cluster) UnlabelTargetNode(nodeName string) {
	// RemoveLabelOffNode tolerates a missing label; log any terminal failure
	// instead of dropping the returned error silently.
	if err := c.RemoveLabelOffNode(nodeName, []string{c.Namespace}); err != nil {
		logger.Errorf("failed to unlabel node %s for cluster %s: %+v", nodeName, c.Namespace, err)
	}
}
// RemoveLabelOffNode is for cleaning up labels temporarily added to node,
// won't fail if target label doesn't exist or has been removed.
func (c *cluster) RemoveLabelOffNode(cs clientset.Interface, nodeName string, labelKeys []string) error {
func (c *cluster) RemoveLabelOffNode(nodeName string, labelKeys []string) error {
var node *v1.Node
var err error
for attempt := 0; attempt < labelingRetries; attempt++ {
node, err = cs.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
node, err = c.context.Clientset.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
......@@ -355,7 +308,7 @@ func (c *cluster) RemoveLabelOffNode(cs clientset.Interface, nodeName string, la
}
delete(node.Labels, labelKey)
}
_, err = cs.CoreV1().Nodes().Update(node)
_, err = c.context.Clientset.CoreV1().Nodes().Update(node)
if err != nil {
if !apierrs.IsConflict(err) {
return err
......@@ -399,6 +352,10 @@ func (c *cluster) getNodeLabels(cs clientset.Interface, nodeName string) (map[st
func (c *cluster) HasDevicesSpecification() bool {
if c.Spec.Storage.UseAllDevices != nil && *c.Spec.Storage.UseAllDevices {
return true
}
if len(c.Spec.Storage.DeviceFilter) > 0 || len(c.Spec.Storage.Devices) > 0 {
return true
}
......
......@@ -22,6 +22,7 @@ import (
"reflect"
"github.com/coreos/pkg/capnslog"
"github.com/google/go-cmp/cmp"
opkit "github.com/rook/operator-kit"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
......@@ -189,6 +190,26 @@ func (c *ISCSIController) ParentClusterChanged(cluster edgefsv1beta1.ClusterSpec
}
// serviceChanged reports whether the ISCSI service spec differs from its
// previous version. A cheap reflect.DeepEqual pre-check gates the more
// expensive cmp.Diff, whose output is also logged for operators.
func serviceChanged(oldService, newService edgefsv1beta1.ISCSISpec) bool {
	if reflect.DeepEqual(oldService, newService) {
		return false
	}
	var diff string
	// cmp.Diff can panic on unexpected values, so compute the diff inside a
	// recover-guarded closure; a panic leaves diff empty.
	func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Warningf("Encountered an issue getting ISCSI service change differences: %v", err)
			}
		}()
		// resource.Quantity has non-exportable fields, so we use its comparator method
		quantityComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })
		diff = cmp.Diff(oldService, newService, quantityComparer)
		logger.Infof("The ISCSI Service has changed. diff=%s", diff)
	}()
	return diff != ""
}
......
......@@ -22,6 +22,7 @@ import (
"reflect"
"github.com/coreos/pkg/capnslog"
"github.com/google/go-cmp/cmp"
opkit "github.com/rook/operator-kit"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
......@@ -189,6 +190,26 @@ func (c *ISGWController) ParentClusterChanged(cluster edgefsv1beta1.ClusterSpec)
}
// serviceChanged reports whether the ISGW service spec differs from its
// previous version. A cheap reflect.DeepEqual pre-check gates the more
// expensive cmp.Diff, whose output is also logged for operators.
func serviceChanged(oldService, newService edgefsv1beta1.ISGWSpec) bool {
	if reflect.DeepEqual(oldService, newService) {
		return false
	}
	var diff string
	// cmp.Diff can panic on unexpected values, so compute the diff inside a
	// recover-guarded closure; a panic leaves diff empty.
	func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Warningf("Encountered an issue getting ISGW service change differences: %v", err)
			}
		}()
		// resource.Quantity has non-exportable fields, so we use its comparator method
		quantityComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })
		diff = cmp.Diff(oldService, newService, quantityComparer)
		logger.Infof("The ISGW Service has changed. diff=%s", diff)
	}()
	return diff != ""
}
......
......@@ -22,6 +22,7 @@ import (
"reflect"
"github.com/coreos/pkg/capnslog"
"github.com/google/go-cmp/cmp"
opkit "github.com/rook/operator-kit"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
......@@ -188,6 +189,28 @@ func (c *NFSController) serviceOwners(service *edgefsv1beta1.NFS) []metav1.Owner
}
// serviceChanged reports whether the NFS service spec differs from its
// previous version; any change in the CRD triggers an orchestration.
// A cheap reflect.DeepEqual pre-check gates the more expensive cmp.Diff,
// whose output is also logged for operators.
func serviceChanged(oldService, newService edgefsv1beta1.NFSSpec) bool {
	if reflect.DeepEqual(oldService, newService) {
		return false
	}
	var diff string
	// cmp.Diff can panic on unexpected values, so compute the diff inside a
	// recover-guarded closure; a panic leaves diff empty.
	func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Warningf("Encountered an issue getting nfs service change differences: %v", err)
			}
		}()
		// resource.Quantity has non-exportable fields, so we use its comparator method
		quantityComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })
		diff = cmp.Diff(oldService, newService, quantityComparer)
		logger.Infof("The NFS Service has changed. diff=%s", diff)
	}()
	return diff != ""
}
......
......@@ -22,6 +22,7 @@ import (
"reflect"
"github.com/coreos/pkg/capnslog"
"github.com/google/go-cmp/cmp"
opkit "github.com/rook/operator-kit"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
......@@ -189,6 +190,26 @@ func (c *S3Controller) ParentClusterChanged(cluster edgefsv1beta1.ClusterSpec) {
}
// serviceChanged reports whether the S3 service spec differs from its
// previous version. A cheap reflect.DeepEqual pre-check gates the more
// expensive cmp.Diff, whose output is also logged for operators.
func serviceChanged(oldService, newService edgefsv1beta1.S3Spec) bool {
	if reflect.DeepEqual(oldService, newService) {
		return false
	}
	var diff string
	// cmp.Diff can panic on unexpected values, so compute the diff inside a
	// recover-guarded closure; a panic leaves diff empty.
	func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Warningf("Encountered an issue getting S3 service change differences: %v", err)
			}
		}()
		// resource.Quantity has non-exportable fields, so we use its comparator method
		quantityComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })
		diff = cmp.Diff(oldService, newService, quantityComparer)
		logger.Infof("The S3 Service has changed. diff=%s", diff)
	}()
	return diff != ""
}
......
......@@ -22,6 +22,7 @@ import (
"reflect"
"github.com/coreos/pkg/capnslog"
"github.com/google/go-cmp/cmp"
opkit "github.com/rook/operator-kit"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
......@@ -189,6 +190,26 @@ func (c *S3XController) ParentClusterChanged(cluster edgefsv1beta1.ClusterSpec)
}
// serviceChanged reports whether the S3X service spec differs from its
// previous version. A cheap reflect.DeepEqual pre-check gates the more
// expensive cmp.Diff, whose output is also logged for operators.
func serviceChanged(oldService, newService edgefsv1beta1.S3XSpec) bool {
	if reflect.DeepEqual(oldService, newService) {
		return false
	}
	var diff string
	// cmp.Diff can panic on unexpected values, so compute the diff inside a
	// recover-guarded closure; a panic leaves diff empty.
	func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Warningf("Encountered an issue getting S3X service change differences: %v", err)
			}
		}()
		// resource.Quantity has non-exportable fields, so we use its comparator method
		quantityComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })
		diff = cmp.Diff(oldService, newService, quantityComparer)
		logger.Infof("The S3X Service has changed. diff=%s", diff)
	}()
	return diff != ""
}
......
......@@ -22,6 +22,7 @@ import (
"reflect"
"github.com/coreos/pkg/capnslog"
"github.com/google/go-cmp/cmp"
opkit "github.com/rook/operator-kit"
edgefsv1beta1 "github.com/rook/rook/pkg/apis/edgefs.rook.io/v1beta1"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
......@@ -188,6 +189,26 @@ func (c *SWIFTController) ParentClusterChanged(cluster edgefsv1beta1.ClusterSpec
}
// serviceChanged reports whether the SWIFT service spec differs from its
// previous version. A cheap reflect.DeepEqual pre-check gates the more
// expensive cmp.Diff, whose output is also logged for operators.
func serviceChanged(oldService, newService edgefsv1beta1.SWIFTSpec) bool {
	if reflect.DeepEqual(oldService, newService) {
		return false
	}
	var diff string
	// cmp.Diff can panic on unexpected values, so compute the diff inside a
	// recover-guarded closure; a panic leaves diff empty.
	func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Warningf("Encountered an issue getting SWIFT service change differences: %v", err)
			}
		}()
		// resource.Quantity has non-exportable fields, so we use its comparator method
		quantityComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })
		diff = cmp.Diff(oldService, newService, quantityComparer)
		logger.Infof("The SWIFT Service has changed. diff=%s", diff)
	}()
	return diff != ""
}
......
......@@ -173,7 +173,7 @@ spec:
serviceAccountName: rook-edgefs-system
containers:
- name: rook-edgefs-operator
image: edgefs/edgefs-operator:v1beta1 #JUST FOR TRANSITION TO Beta1. MUST BE CHANGED TO rook/edgefs:master after merge
image: rook/edgefs:master
imagePullPolicy: "Always"
args: ["edgefs", "operator"]
env:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment