Commit 36d52172 authored by galal-hussein's avatar galal-hussein Committed by Alena Prokharchyk
Browse files

Vendor Update

parent 40b771ec
Showing with 509 additions and 13 deletions
+509 -13
......@@ -33,12 +33,12 @@ github.com/smartystreets/go-aws-auth 8ef1316913ee4f44bc48c2456e44a5c1c6
github.com/mcuadros/go-version 6d5863ca60fa6fe914b5fd43ed8533d7567c5b0b
github.com/rancher/rdns-server bf662911db6acce4d6a85d2878653f68413b9176
github.com/rancher/types b9117fe98cfc2ea60ebe278c87d289f571fa1677
github.com/rancher/types 3695949a4c95d43a2255287f161faff577b6c0aa
github.com/rancher/norman 39d1bb2830de46f8ef2d1d87c6d315fe1676413a
github.com/rancher/kontainer-engine 96aa9f983324f808c1da73d83d18e411b7e26f7d
github.com/rancher/rke c32c261fea171faa02c03292819df500da60e67e
github.com/rancher/rke 84354d4d0b89ee7732086539dc61051cce56198e
gopkg.in/ldap.v2 v2.5.0
gopkg.in/asn1-ber.v1 v1.1
......
......@@ -93,7 +93,7 @@ func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Clust
}
log.Infof(ctx, "[certificates] Temporarily saving certs to [%s] hosts", backupPlane)
if err := deployBackupCertificates(ctx, backupHosts, kubeCluster); err != nil {
if err := deployBackupCertificates(ctx, backupHosts, kubeCluster, false); err != nil {
return err
}
log.Infof(ctx, "[certificates] Saved certs to [%s] hosts", backupPlane)
......@@ -222,13 +222,13 @@ func saveCertToKubernetes(kubeClient *kubernetes.Clientset, crtName string, crt
}
}
func deployBackupCertificates(ctx context.Context, backupHosts []*hosts.Host, kubeCluster *Cluster) error {
func deployBackupCertificates(ctx context.Context, backupHosts []*hosts.Host, kubeCluster *Cluster, force bool) error {
var errgrp errgroup.Group
for _, host := range backupHosts {
runHost := host
errgrp.Go(func() error {
return pki.DeployCertificatesOnHost(ctx, runHost, kubeCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.TempCertPath, kubeCluster.PrivateRegistriesMap)
return pki.DeployCertificatesOnHost(ctx, runHost, kubeCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.TempCertPath, kubeCluster.PrivateRegistriesMap, force)
})
}
return errgrp.Wait()
......
......@@ -107,7 +107,7 @@ func (c *Cluster) InvertIndexHosts() error {
return nil
}
func (c *Cluster) SetUpHosts(ctx context.Context) error {
func (c *Cluster) SetUpHosts(ctx context.Context, force bool) error {
if c.Authentication.Strategy == X509AuthenticationProvider {
log.Infof(ctx, "[certificates] Deploying kubernetes certificates to Cluster nodes")
hosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
......@@ -116,7 +116,7 @@ func (c *Cluster) SetUpHosts(ctx context.Context) error {
for _, host := range hosts {
runHost := host
errgrp.Go(func() error {
return pki.DeployCertificatesOnPlaneHost(ctx, runHost, c.RancherKubernetesEngineConfig, c.Certificates, c.SystemImages.CertDownloader, c.PrivateRegistriesMap)
return pki.DeployCertificatesOnPlaneHost(ctx, runHost, c.RancherKubernetesEngineConfig, c.Certificates, c.SystemImages.CertDownloader, c.PrivateRegistriesMap, force)
})
}
if err := errgrp.Wait(); err != nil {
......
......@@ -189,7 +189,7 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
currentCluster.Certificates = crtMap
for _, etcdHost := range etcdToAdd {
// deploy certificates on new etcd host
if err := pki.DeployCertificatesOnHost(ctx, etcdHost, currentCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.CertPathPrefix, kubeCluster.PrivateRegistriesMap); err != nil {
if err := pki.DeployCertificatesOnHost(ctx, etcdHost, currentCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.CertPathPrefix, kubeCluster.PrivateRegistriesMap, false); err != nil {
return err
}
......
package cluster
import (
"context"
"fmt"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
)
func RotateRKECertificates(ctx context.Context, kubeCluster, currentCluster *Cluster) error {
if kubeCluster.Authentication.Strategy == X509AuthenticationProvider {
var err error
backupPlane := fmt.Sprintf("%s,%s", EtcdPlane, ControlPlane)
backupHosts := hosts.GetUniqueHostList(kubeCluster.EtcdHosts, kubeCluster.ControlPlaneHosts, nil)
if currentCluster != nil {
kubeCluster.Certificates = currentCluster.Certificates
// this is the case of handling upgrades for API server aggregation layer ca cert and API server proxy client key and cert
if kubeCluster.Certificates[pki.RequestHeaderCACertName].Certificate == nil {
kubeCluster.Certificates, err = regenerateAPIAggregationCerts(kubeCluster, kubeCluster.Certificates)
if err != nil {
return fmt.Errorf("Failed to regenerate Aggregation layer certificates %v", err)
}
}
} else {
log.Infof(ctx, "[certificates] Attempting to recover certificates from backup on [%s] hosts", backupPlane)
kubeCluster.Certificates, err = fetchBackupCertificates(ctx, backupHosts, kubeCluster)
if err != nil {
return err
}
if kubeCluster.Certificates != nil {
log.Infof(ctx, "[certificates] Certificate backup found on [%s] hosts", backupPlane)
// make sure I have all the etcd certs, We need handle dialer failure for etcd nodes https://github.com/rancher/rancher/issues/12898
for _, host := range kubeCluster.EtcdHosts {
certName := pki.GetEtcdCrtName(host.InternalAddress)
if kubeCluster.Certificates[certName].Certificate == nil {
if kubeCluster.Certificates, err = pki.RegenerateEtcdCertificate(ctx,
kubeCluster.Certificates,
host,
kubeCluster.EtcdHosts,
kubeCluster.ClusterDomain,
kubeCluster.KubernetesServiceIP); err != nil {
return err
}
}
}
// this is the case of adding controlplane node on empty cluster with only etcd nodes
if kubeCluster.Certificates[pki.KubeAdminCertName].Config == "" && len(kubeCluster.ControlPlaneHosts) > 0 {
if err := rebuildLocalAdminConfig(ctx, kubeCluster); err != nil {
return err
}
kubeCluster.Certificates, err = regenerateAPICertificate(kubeCluster, kubeCluster.Certificates)
if err != nil {
return fmt.Errorf("Failed to regenerate KubeAPI certificate %v", err)
}
}
// this is the case of handling upgrades for API server aggregation layer ca cert and API server proxy client key and cert
if kubeCluster.Certificates[pki.RequestHeaderCACertName].Certificate == nil {
kubeCluster.Certificates, err = regenerateAPIAggregationCerts(kubeCluster, kubeCluster.Certificates)
if err != nil {
return fmt.Errorf("Failed to regenerate Aggregation layer certificates %v", err)
}
}
}
}
log.Infof(ctx, "[certificates] Rotating RKE certificates")
kubeCluster.Certificates, err = pki.RotateRKECerts(ctx, kubeCluster.Certificates, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.LocalKubeConfigPath, "")
if err != nil {
return fmt.Errorf("Failed to generate Kubernetes certificates: %v", err)
}
log.Infof(ctx, "[certificates] Temporarily saving certs to [%s] hosts", backupPlane)
if err := deployBackupCertificates(ctx, backupHosts, kubeCluster, true); err != nil {
return err
}
log.Infof(ctx, "[certificates] Saved certs to [%s] hosts", backupPlane)
}
return nil
}
package cmd
import (
"context"
"fmt"
"github.com/rancher/rke/cluster"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/urfave/cli"
"k8s.io/client-go/util/cert"
)
func CertCommand() cli.Command {
certFlags := []cli.Flag{
cli.StringFlag{
Name: "config",
Usage: "Specify an alternate cluster YAML file",
Value: pki.ClusterConfig,
EnvVar: "RKE_CONFIG",
},
}
certFlags = append(certFlags, commonFlags...)
return cli.Command{
Name: "cert-rotate",
Usage: "rotate all certificates without ca",
Action: clusterRotateFromCli,
Flags: certFlags,
}
}
func ClusterRotate(
ctx context.Context,
rkeConfig *v3.RancherKubernetesEngineConfig,
dockerDialerFactory, localConnDialerFactory hosts.DialerFactory,
k8sWrapTransport k8s.WrapTransport, configDir string) (string, string, string, string, map[string]pki.CertificatePKI, error) {
log.Infof(ctx, "Rotating Kubernetes cluster certificates")
var APIURL, caCrt, clientCert, clientKey string
kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if err := kubeCluster.TunnelHosts(ctx, false); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
currentCluster, err := kubeCluster.GetClusterState(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if currentCluster == nil {
log.Infof(ctx, "This is a newely built cluster, can't rotate certificates")
return APIURL, caCrt, clientCert, clientKey, nil, nil
}
if err := cluster.RotateRKECertificates(ctx, kubeCluster, currentCluster); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if err := kubeCluster.SetUpHosts(ctx, true); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if err := services.RestartControlPlane(ctx, kubeCluster.ControlPlaneHosts); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
allHosts := hosts.GetUniqueHostList(kubeCluster.EtcdHosts, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts)
if err := services.RestartWorkerPlane(ctx, allHosts); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
err = kubeCluster.SaveClusterState(ctx, &kubeCluster.RancherKubernetesEngineConfig)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
APIURL = fmt.Sprintf("https://" + kubeCluster.ControlPlaneHosts[0].Address + ":6443")
clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Certificate))
clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Key))
caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate))
if err := checkAllIncluded(kubeCluster); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
log.Infof(ctx, "Finished Rotating Kubernetes cluster certificates successfully")
return APIURL, caCrt, clientCert, clientKey, kubeCluster.Certificates, nil
}
func clusterRotateFromCli(ctx *cli.Context) error {
clusterFile, filePath, err := resolveClusterFile(ctx)
if err != nil {
return fmt.Errorf("Failed to resolve cluster file: %v", err)
}
clusterFilePath = filePath
rkeConfig, err := cluster.ParseConfig(clusterFile)
if err != nil {
return fmt.Errorf("Failed to parse cluster file: %v", err)
}
rkeConfig, err = setOptionsFromCLI(ctx, rkeConfig)
if err != nil {
return err
}
_, _, _, _, _, err = ClusterRotate(context.Background(), rkeConfig, nil, nil, nil, "")
return err
}
......@@ -75,6 +75,9 @@ func ClusterUp(
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if kubeCluster.RancherKubernetesEngineConfig.RotateCertificates != nil {
return ClusterRotate(ctx, rkeConfig, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport, configDir)
}
err = kubeCluster.TunnelHosts(ctx, local)
if err != nil {
......@@ -101,7 +104,7 @@ func ClusterUp(
return APIURL, caCrt, clientCert, clientKey, nil, err
}
err = kubeCluster.SetUpHosts(ctx)
err = kubeCluster.SetUpHosts(ctx, false)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
......
......@@ -380,7 +380,7 @@ func GetContainerLogsStdoutStderr(ctx context.Context, dClient *client.Client, c
var containerLog string
clogs, logserr := ReadContainerLogs(ctx, dClient, containerName, follow, tail)
if logserr != nil {
logrus.Debug("logserr: %v", logserr)
logrus.Debugf("logserr: %v", logserr)
return containerLog, fmt.Errorf("Failed to get gather logs from container [%s]: %v", containerName, logserr)
}
defer clogs.Close()
......@@ -435,3 +435,38 @@ func isContainerEnvChanged(containerEnv, imageConfigEnv, dockerfileEnv []string)
allImageEnv := append(imageConfigEnv, dockerfileEnv...)
return sliceEqualsIgnoreOrder(allImageEnv, containerEnv)
}
func DoRestartContainer(ctx context.Context, dClient *client.Client, containerName, hostname string) error {
if dClient == nil {
return fmt.Errorf("Failed to restart container: docker client is nil for container [%s] on host [%s]", containerName, hostname)
}
logrus.Debugf("[restart/%s] Checking if container is running on host [%s]", containerName, hostname)
// not using the wrapper to check if the error is a NotFound error
_, err := dClient.ContainerInspect(ctx, containerName)
if err != nil {
if client.IsErrNotFound(err) {
logrus.Debugf("[restart/%s] Container doesn't exist on host [%s]", containerName, hostname)
return nil
}
return err
}
logrus.Debugf("[restart/%s] Restarting container on host [%s]", containerName, hostname)
err = RestartContainer(ctx, dClient, hostname, containerName)
if err != nil {
return err
}
log.Infof(ctx, "[restart/%s] Successfully restarted container on host [%s]", containerName, hostname)
return nil
}
func RestartContainer(ctx context.Context, dClient *client.Client, hostname, containerName string) error {
if dClient == nil {
return fmt.Errorf("Failed to restart container: docker client is nil for container [%s] on host [%s]", containerName, hostname)
}
restartTimeout := RestartTimeout * time.Second
err := dClient.ContainerRestart(ctx, containerName, &restartTimeout)
if err != nil {
return fmt.Errorf("Can't restart Docker container [%s] for host [%s]: %v", containerName, hostname, err)
}
return nil
}
......@@ -20,12 +20,15 @@ import (
"k8s.io/client-go/util/cert"
)
func DeployCertificatesOnPlaneHost(ctx context.Context, host *hosts.Host, rkeConfig v3.RancherKubernetesEngineConfig, crtMap map[string]CertificatePKI, certDownloaderImage string, prsMap map[string]v3.PrivateRegistry) error {
func DeployCertificatesOnPlaneHost(ctx context.Context, host *hosts.Host, rkeConfig v3.RancherKubernetesEngineConfig, crtMap map[string]CertificatePKI, certDownloaderImage string, prsMap map[string]v3.PrivateRegistry, force bool) error {
crtBundle := GenerateRKENodeCerts(ctx, rkeConfig, host.Address, crtMap)
env := []string{}
for _, crt := range crtBundle {
env = append(env, crt.ToEnv()...)
}
if force {
env = append(env, "FORCE_DEPLOY=true")
}
return doRunDeployer(ctx, host, env, certDownloaderImage, prsMap)
}
......@@ -102,10 +105,13 @@ func RemoveAdminConfig(ctx context.Context, localConfigPath string) {
log.Infof(ctx, "Local admin Kubeconfig removed successfully")
}
func DeployCertificatesOnHost(ctx context.Context, host *hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage, certPath string, prsMap map[string]v3.PrivateRegistry) error {
func DeployCertificatesOnHost(ctx context.Context, host *hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage, certPath string, prsMap map[string]v3.PrivateRegistry, force bool) error {
env := []string{
"CRTS_DEPLOY_PATH=" + certPath,
}
if force {
env = append(env, "FORCE_DEPLOY=true")
}
for _, crt := range crtMap {
env = append(env, crt.ToEnv()...)
}
......
package pki
import (
"context"
"crypto/rsa"
"fmt"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"k8s.io/client-go/util/cert"
)
func RotateRKECerts(ctx context.Context, certs map[string]CertificatePKI, rkeConfig v3.RancherKubernetesEngineConfig, configPath, configDir string) (map[string]CertificatePKI, error) {
caCrt := certs[CACertName].Certificate
caKey := certs[CACertName].Key
if caCrt == nil || caKey == nil {
return certs, fmt.Errorf("CA cert or key is not found")
}
// generate API certificate and key
log.Infof(ctx, "[certificates] Generating Kubernetes API server certificates")
kubernetesServiceIP, err := GetKubernetesServiceIP(rkeConfig.Services.KubeAPI.ServiceClusterIPRange)
clusterDomain := rkeConfig.Services.Kubelet.ClusterDomain
cpHosts := hosts.NodesToHosts(rkeConfig.Nodes, controlRole)
etcdHosts := hosts.NodesToHosts(rkeConfig.Nodes, etcdRole)
if err != nil {
return nil, fmt.Errorf("Failed to get Kubernetes Service IP: %v", err)
}
kubeAPIAltNames := GetAltNames(cpHosts, clusterDomain, kubernetesServiceIP, rkeConfig.Authentication.SANs)
kubeAPICrt, kubeAPIKey, err := GenerateSignedCertAndKey(caCrt, caKey, true, KubeAPICertName, kubeAPIAltNames, certs[KubeAPICertName].Key, nil)
if err != nil {
return nil, err
}
certs[KubeAPICertName] = ToCertObject(KubeAPICertName, "", "", kubeAPICrt, kubeAPIKey)
// generate Kube controller-manager certificate and key
log.Infof(ctx, "[certificates] Generating Kube Controller certificates")
kubeControllerCrt, kubeControllerKey, err := GenerateSignedCertAndKey(caCrt, caKey, false, getDefaultCN(KubeControllerCertName), nil, nil, nil)
if err != nil {
return nil, err
}
certs[KubeControllerCertName] = ToCertObject(KubeControllerCertName, "", "", kubeControllerCrt, kubeControllerKey)
// generate Kube scheduler certificate and key
log.Infof(ctx, "[certificates] Generating Kube Scheduler certificates")
kubeSchedulerCrt, kubeSchedulerKey, err := GenerateSignedCertAndKey(caCrt, caKey, false, getDefaultCN(KubeSchedulerCertName), nil, nil, nil)
if err != nil {
return nil, err
}
certs[KubeSchedulerCertName] = ToCertObject(KubeSchedulerCertName, "", "", kubeSchedulerCrt, kubeSchedulerKey)
// generate Kube Proxy certificate and key
log.Infof(ctx, "[certificates] Generating Kube Proxy certificates")
kubeProxyCrt, kubeProxyKey, err := GenerateSignedCertAndKey(caCrt, caKey, false, getDefaultCN(KubeProxyCertName), nil, nil, nil)
if err != nil {
return nil, err
}
certs[KubeProxyCertName] = ToCertObject(KubeProxyCertName, "", "", kubeProxyCrt, kubeProxyKey)
log.Infof(ctx, "[certificates] Generating Node certificate")
nodeCrt, nodeKey, err := GenerateSignedCertAndKey(caCrt, caKey, false, KubeNodeCommonName, nil, nil, []string{KubeNodeOrganizationName})
if err != nil {
return nil, err
}
certs[KubeNodeCertName] = ToCertObject(KubeNodeCertName, KubeNodeCommonName, KubeNodeOrganizationName, nodeCrt, nodeKey)
// generate Admin certificate and key
log.Infof(ctx, "[certificates] Generating admin certificates and kubeconfig")
if len(configPath) == 0 {
configPath = ClusterConfig
}
localKubeConfigPath := GetLocalKubeConfig(configPath, configDir)
kubeAdminCrt, kubeAdminKey, err := GenerateSignedCertAndKey(caCrt, caKey, false, KubeAdminCertName, nil, nil, []string{KubeAdminOrganizationName})
if err != nil {
return nil, err
}
kubeAdminCertObj := ToCertObject(KubeAdminCertName, KubeAdminCertName, KubeAdminOrganizationName, kubeAdminCrt, kubeAdminKey)
if len(cpHosts) > 0 {
kubeAdminConfig := GetKubeConfigX509WithData(
"https://"+cpHosts[0].Address+":6443",
rkeConfig.ClusterName,
KubeAdminCertName,
string(cert.EncodeCertPEM(caCrt)),
string(cert.EncodeCertPEM(kubeAdminCrt)),
string(cert.EncodePrivateKeyPEM(kubeAdminKey)))
kubeAdminCertObj.Config = kubeAdminConfig
kubeAdminCertObj.ConfigPath = localKubeConfigPath
} else {
kubeAdminCertObj.Config = ""
}
certs[KubeAdminCertName] = kubeAdminCertObj
// generate etcd certificate and key
if len(rkeConfig.Services.Etcd.ExternalURLs) > 0 {
clientCert, err := cert.ParseCertsPEM([]byte(rkeConfig.Services.Etcd.Cert))
if err != nil {
return nil, err
}
clientKey, err := cert.ParsePrivateKeyPEM([]byte(rkeConfig.Services.Etcd.Key))
if err != nil {
return nil, err
}
certs[EtcdClientCertName] = ToCertObject(EtcdClientCertName, "", "", clientCert[0], clientKey.(*rsa.PrivateKey))
caCert, err := cert.ParseCertsPEM([]byte(rkeConfig.Services.Etcd.CACert))
if err != nil {
return nil, err
}
certs[EtcdClientCACertName] = ToCertObject(EtcdClientCACertName, "", "", caCert[0], nil)
}
etcdAltNames := GetAltNames(etcdHosts, clusterDomain, kubernetesServiceIP, []string{})
for _, host := range etcdHosts {
log.Infof(ctx, "[certificates] Generating etcd-%s certificate and key", host.InternalAddress)
etcdCrt, etcdKey, err := GenerateSignedCertAndKey(caCrt, caKey, true, EtcdCertName, etcdAltNames, nil, nil)
if err != nil {
return nil, err
}
etcdName := GetEtcdCrtName(host.InternalAddress)
certs[etcdName] = ToCertObject(etcdName, "", "", etcdCrt, etcdKey)
}
requestHeaderCACrt := certs[RequestHeaderCACertName].Certificate
requestHeaderCAKey := certs[RequestHeaderCACertName].Key
if requestHeaderCACrt == nil || requestHeaderCAKey == nil {
return nil, fmt.Errorf("Request Header CA certificate or key not found")
}
//generate API server proxy client key and certs
log.Infof(ctx, "[certificates] Generating Kubernetes API server proxy client certificates")
apiserverProxyClientCrt, apiserverProxyClientKey, err := GenerateSignedCertAndKey(requestHeaderCACrt, requestHeaderCAKey, true, APIProxyClientCertName, nil, nil, nil)
if err != nil {
return nil, err
}
certs[APIProxyClientCertName] = ToCertObject(APIProxyClientCertName, "", "", apiserverProxyClientCrt, apiserverProxyClientKey)
return certs, nil
}
......@@ -6,6 +6,7 @@ import (
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/util"
"github.com/rancher/types/apis/management.cattle.io/v3"
"golang.org/x/sync/errgroup"
)
......@@ -91,3 +92,39 @@ func doDeployControlHost(ctx context.Context, host *hosts.Host, localConnDialerF
// run scheduler
return runScheduler(ctx, host, localConnDialerFactory, prsMap, processMap[SchedulerContainerName], alpineImage)
}
func RestartControlPlane(ctx context.Context, controlHosts []*hosts.Host) error {
log.Infof(ctx, "[%s] Restarting the Controller Plane..", ControlRole)
var errgrp errgroup.Group
hostsQueue := util.GetObjectQueue(controlHosts)
for w := 0; w < 10; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
runHost := host.(*hosts.Host)
// restart KubeAPI
if err := RestartKubeAPI(ctx, runHost); err != nil {
errList = append(errList, err)
}
// restart KubeController
if err := RestartKubeController(ctx, runHost); err != nil {
errList = append(errList, err)
}
// restart scheduler
err := RestartScheduler(ctx, runHost)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
log.Infof(ctx, "[%s] Successfully restarted Controller Plane..", ControlRole)
return nil
}
......@@ -24,3 +24,7 @@ func runKubeAPI(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, p
func removeKubeAPI(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeAPIContainerName, host.Address)
}
func RestartKubeAPI(ctx context.Context, host *hosts.Host) error {
return docker.DoRestartContainer(ctx, host.DClient, KubeAPIContainerName, host.Address)
}
......@@ -22,3 +22,7 @@ func runKubeController(ctx context.Context, host *hosts.Host, df hosts.DialerFac
func removeKubeController(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeControllerContainerName, host.Address)
}
func RestartKubeController(ctx context.Context, host *hosts.Host) error {
return docker.DoRestartContainer(ctx, host.DClient, KubeControllerContainerName, host.Address)
}
......@@ -23,3 +23,7 @@ func runKubelet(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, p
func removeKubelet(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address)
}
func RestartKubelet(ctx context.Context, host *hosts.Host) error {
return docker.DoRestartContainer(ctx, host.DClient, KubeletContainerName, host.Address)
}
......@@ -22,3 +22,7 @@ func runKubeproxy(ctx context.Context, host *hosts.Host, df hosts.DialerFactory,
func removeKubeproxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeproxyContainerName, host.Address)
}
func RestartKubeproxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRestartContainer(ctx, host.DClient, KubeproxyContainerName, host.Address)
}
......@@ -24,3 +24,7 @@ func runNginxProxy(ctx context.Context, host *hosts.Host, prsMap map[string]v3.P
func removeNginxProxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, NginxProxyContainerName, host.Address)
}
func RestartNginxProxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRestartContainer(ctx, host.DClient, NginxProxyContainerName, host.Address)
}
......@@ -22,3 +22,7 @@ func runScheduler(ctx context.Context, host *hosts.Host, df hosts.DialerFactory,
func removeScheduler(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, SchedulerContainerName, host.Address)
}
func RestartScheduler(ctx context.Context, host *hosts.Host) error {
return docker.DoRestartContainer(ctx, host.DClient, SchedulerContainerName, host.Address)
}
......@@ -6,6 +6,7 @@ import (
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/util"
"github.com/rancher/types/apis/management.cattle.io/v3"
"golang.org/x/sync/errgroup"
)
......@@ -102,3 +103,34 @@ func copyProcessMap(m map[string]v3.Process) map[string]v3.Process {
}
return c
}
func RestartWorkerPlane(ctx context.Context, workerHosts []*hosts.Host) error {
log.Infof(ctx, "[%s] Restarting Worker Plane..", WorkerRole)
var errgrp errgroup.Group
hostsQueue := util.GetObjectQueue(workerHosts)
for w := 0; w < 10; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
runHost := host.(*hosts.Host)
if err := RestartKubelet(ctx, runHost); err != nil {
errList = append(errList, err)
}
if err := RestartKubeproxy(ctx, runHost); err != nil {
errList = append(errList, err)
}
if err := RestartNginxProxy(ctx, runHost); err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
log.Infof(ctx, "[%s] Successfully restarted Worker Plane..", WorkerRole)
return nil
}
package util
import (
"fmt"
"reflect"
"strings"
"github.com/coreos/go-semver/semver"
......@@ -13,3 +15,21 @@ func StrToSemVer(version string) (*semver.Version, error) {
}
return v, nil
}
func GetObjectQueue(l interface{}) chan interface{} {
s := reflect.ValueOf(l)
c := make(chan interface{}, s.Len())
for i := 0; i < s.Len(); i++ {
c <- s.Index(i).Interface()
}
close(c)
return c
}
func ErrList(e []error) error {
if len(e) > 0 {
return fmt.Errorf("%v", e)
}
return nil
}
......@@ -25,4 +25,4 @@ github.com/Microsoft/go-winio ab35fc04b6365e8fcb18e6e9e41ea4a02b10b17
github.com/go-ini/ini 06f5f3d67269ccec1fe5fe4134ba6e982984f7f5
github.com/rancher/norman c032c4611f2eec1652ef37d254f0e8ccf90c80aa
github.com/rancher/types 15fd7dc1c07e229fcc0848438605779021ec2181
github.com/rancher/types 3695949a4c95d43a2255287f161faff577b6c0aa
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment