Unverified commit 9601ddd5, authored by Alena Prokharchyk and committed by GitHub

Merge pull request #335 from moelsayed/generate_plan

Generate deployment plan
parents 6ea9ff01 78b41f8d
Showing with 520 additions and 719 deletions
......@@ -54,17 +54,22 @@ const (
func (c *Cluster) DeployControlPlane(ctx context.Context) error {
// Deploy Etcd Plane
if err := services.RunEtcdPlane(ctx, c.EtcdHosts, c.Services.Etcd, c.LocalConnDialerFactory, c.PrivateRegistriesMap); err != nil {
etcdProcessHostMap := c.getEtcdProcessHostMap(nil)
if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdProcessHostMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap); err != nil {
return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
}
// Deploy Control plane
processMap := map[string]v3.Process{
services.SidekickContainerName: c.BuildSidecarProcess(),
services.KubeAPIContainerName: c.BuildKubeAPIProcess(),
services.KubeControllerContainerName: c.BuildKubeControllerProcess(),
services.SchedulerContainerName: c.BuildSchedulerProcess(),
}
if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,
c.EtcdHosts,
c.Services,
c.SystemImages.KubernetesServicesSidecar,
c.Authorization.Mode,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap); err != nil {
c.PrivateRegistriesMap,
processMap); err != nil {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
}
// Apply Authz configuration after deploying controlplane
......@@ -76,14 +81,22 @@ func (c *Cluster) DeployControlPlane(ctx context.Context) error {
func (c *Cluster) DeployWorkerPlane(ctx context.Context) error {
// Deploy Worker Plane
if err := services.RunWorkerPlane(ctx, c.ControlPlaneHosts,
c.WorkerHosts,
c.EtcdHosts,
c.Services,
c.SystemImages.NginxProxy,
c.SystemImages.KubernetesServicesSidecar,
processMap := map[string]v3.Process{
services.SidekickContainerName: c.BuildSidecarProcess(),
services.KubeproxyContainerName: c.BuildKubeProxyProcess(),
services.NginxProxyContainerName: c.BuildProxyProcess(),
}
kubeletProcessHostMap := make(map[*hosts.Host]v3.Process)
for _, host := range hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) {
kubeletProcessHostMap[host] = c.BuildKubeletProcess(host)
}
allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
if err := services.RunWorkerPlane(ctx, allHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap); err != nil {
c.PrivateRegistriesMap,
processMap,
kubeletProcessHostMap,
); err != nil {
return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
}
return nil
......@@ -284,3 +297,11 @@ func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineC
return kubeCluster.deployAddons(ctx)
}
func (c *Cluster) getEtcdProcessHostMap(readyEtcdHosts []*hosts.Host) map[*hosts.Host]v3.Process {
etcdProcessHostMap := make(map[*hosts.Host]v3.Process)
for _, host := range c.EtcdHosts {
etcdProcessHostMap[host] = c.BuildEtcdProcess(host, readyEtcdHosts)
}
return etcdProcessHostMap
}
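getEtcdProcessHostMap is called from two places: DeployControlPlane passes nil so every configured etcd host ends up in the --initial-cluster list, while the etcd reconcile path (further down in this diff) passes only the already-ready members. A minimal sketch of the two call shapes, assuming the cluster package context; the function name below is illustrative only, not part of this change:

// Illustrative sketch only, not part of this change.
func exampleEtcdProcessMaps(c *Cluster, readyHosts []*hosts.Host) {
	fresh := c.getEtcdProcessHostMap(nil)             // DeployControlPlane: --initial-cluster lists every etcd host
	reconciled := c.getEtcdProcessHostMap(readyHosts) // reconcileEtcd: --initial-cluster lists only the ready members
	_, _ = fresh, reconciled
}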
......@@ -41,6 +41,9 @@ const (
KubeProxyPort = "10256"
FlannetVXLANPortUDP = "8472"
ProtocolTCP = "TCP"
ProtocolUDP = "UDP"
FlannelNetworkPlugin = "flannel"
FlannelImage = "flannel_image"
FlannelCNIImage = "flannel_cni_image"
......@@ -96,6 +99,19 @@ const (
RBACConfig = "RBACConfig"
)
var EtcdPortList = []string{
EtcdPort1,
EtcdPort2,
}
var ControlPlanePortList = []string{
KubeAPIPort,
}
var WorkerPortList = []string{
KubeletPort,
}
func (c *Cluster) deployNetworkPlugin(ctx context.Context) error {
log.Infof(ctx, "[network] Setting up network plugin: %s", c.Network.Plugin)
switch c.Network.Plugin {
......@@ -262,27 +278,17 @@ func (c *Cluster) deployTCPPortListeners(ctx context.Context, currentCluster *Cl
workerHosts = c.WorkerHosts
}
// deploy etcd listeners
etcdPortList := []string{
EtcdPort1,
EtcdPort2,
}
if err := c.deployListenerOnPlane(ctx, etcdPortList, etcdHosts, EtcdPortListenContainer); err != nil {
if err := c.deployListenerOnPlane(ctx, EtcdPortList, etcdHosts, EtcdPortListenContainer); err != nil {
return err
}
// deploy controlplane listeners
controlPlanePortList := []string{
KubeAPIPort,
}
if err := c.deployListenerOnPlane(ctx, controlPlanePortList, cpHosts, CPPortListenContainer); err != nil {
if err := c.deployListenerOnPlane(ctx, ControlPlanePortList, cpHosts, CPPortListenContainer); err != nil {
return err
}
// deploy worker listeners
workerPortList := []string{
KubeletPort,
}
if err := c.deployListenerOnPlane(ctx, workerPortList, workerHosts, WorkerPortListenContainer); err != nil {
if err := c.deployListenerOnPlane(ctx, WorkerPortList, workerHosts, WorkerPortListenContainer); err != nil {
return err
}
log.Infof(ctx, "[network] Port listener containers deployed successfully")
......@@ -360,17 +366,13 @@ func removeListenerFromPlane(ctx context.Context, hostPlane []*hosts.Host, conta
func (c *Cluster) runServicePortChecks(ctx context.Context) error {
var errgrp errgroup.Group
// check etcd <-> etcd
etcdPortList := []string{
EtcdPort1,
EtcdPort2,
}
// one etcd host is a pass
if len(c.EtcdHosts) > 1 {
log.Infof(ctx, "[network] Running etcd <-> etcd port checks")
for _, host := range c.EtcdHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
if err := errgrp.Wait(); err != nil {
......@@ -382,7 +384,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.ControlPlaneHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
if err := errgrp.Wait(); err != nil {
......@@ -392,7 +394,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.WorkerHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
if err := errgrp.Wait(); err != nil {
......@@ -400,13 +402,10 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
}
// check control plane -> workers
log.Infof(ctx, "[network] Running control plane -> etcd port checks")
workerPortList := []string{
KubeletPort,
}
for _, host := range c.ControlPlaneHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, workerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
if err := errgrp.Wait(); err != nil {
......@@ -414,13 +413,10 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
}
// check workers -> control plane
log.Infof(ctx, "[network] Running workers -> control plane port checks")
controlPlanePortList := []string{
KubeAPIPort,
}
for _, host := range c.WorkerHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, controlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
return errgrp.Wait()
......
package cluster
import (
"context"
"fmt"
"strconv"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) (v3.RKEPlan, error) {
clusterPlan := v3.RKEPlan{}
myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil)
// rkeConfig.Nodes are already unique, but they carry no role flags; use the parsed cluster hosts, which do.
uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts)
for _, host := range uniqHosts {
clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host))
}
return clusterPlan, nil
}
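A usage sketch for the new GeneratePlan entry point, assuming the package is imported as github.com/rancher/rke/cluster; the single-node config below is a placeholder for illustration, not a default introduced by this change:

// Sketch: generate a plan for one all-role node and inspect it.
package main

import (
	"context"
	"fmt"

	"github.com/rancher/rke/cluster"
	"github.com/rancher/types/apis/management.cattle.io/v3"
)

func main() {
	rkeConfig := &v3.RancherKubernetesEngineConfig{
		Nodes: []v3.RKEConfigNode{{
			Address:          "1.1.1.1",
			InternalAddress:  "1.1.1.1",
			HostnameOverride: "node1",
			Role:             []string{"controlplane", "worker", "etcd"},
		}},
	}
	plan, err := cluster.GeneratePlan(context.Background(), rkeConfig)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, node := range plan.Nodes {
		fmt.Printf("%s: %d processes, %d port checks\n", node.Address, len(node.Processes), len(node.PortChecks))
	}
}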
func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host) v3.RKEConfigNodePlan {
processes := []v3.Process{}
portChecks := []v3.PortCheck{}
// Every host gets a sidecar, a kubelet, and a kube-proxy.
processes = append(processes, myCluster.BuildSidecarProcess())
processes = append(processes, myCluster.BuildKubeletProcess(host))
processes = append(processes, myCluster.BuildKubeProxyProcess())
portChecks = append(portChecks, BuildPortChecksFromPortList(host, WorkerPortList, ProtocolTCP)...)
// Only worker-only hosts need the nginx proxy to reach the control plane.
if host.IsWorker && !host.IsControl {
processes = append(processes, myCluster.BuildProxyProcess())
}
if host.IsControl {
processes = append(processes, myCluster.BuildKubeAPIProcess())
processes = append(processes, myCluster.BuildKubeControllerProcess())
processes = append(processes, myCluster.BuildSchedulerProcess())
portChecks = append(portChecks, BuildPortChecksFromPortList(host, ControlPlanePortList, ProtocolTCP)...)
}
if host.IsEtcd {
processes = append(processes, myCluster.BuildEtcdProcess(host, nil))
portChecks = append(portChecks, BuildPortChecksFromPortList(host, EtcdPortList, ProtocolTCP)...)
}
return v3.RKEConfigNodePlan{
Address: host.Address,
Processes: processes,
PortChecks: portChecks,
}
}
func (c *Cluster) BuildKubeAPIProcess() v3.Process {
etcdConnString := services.GetEtcdConnString(c.EtcdHosts)
args := []string{}
Command := []string{
"/opt/rke/entrypoint.sh",
"kube-apiserver",
"--insecure-bind-address=127.0.0.1",
"--bind-address=0.0.0.0",
"--insecure-port=0",
"--secure-port=6443",
"--cloud-provider=",
"--allow_privileged=true",
"--kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname",
"--service-cluster-ip-range=" + c.Services.KubeAPI.ServiceClusterIPRange,
"--admission-control=ServiceAccount,NamespaceLifecycle,LimitRanger,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds",
"--runtime-config=batch/v2alpha1",
"--runtime-config=authentication.k8s.io/v1beta1=true",
"--storage-backend=etcd3",
"--client-ca-file=" + pki.GetCertPath(pki.CACertName),
"--tls-cert-file=" + pki.GetCertPath(pki.KubeAPICertName),
"--tls-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--service-account-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--etcd-cafile=" + pki.GetCertPath(pki.CACertName),
"--etcd-certfile=" + pki.GetCertPath(pki.KubeAPICertName),
"--etcd-keyfile=" + pki.GetKeyPath(pki.KubeAPICertName),
}
args = append(args, "--etcd-servers="+etcdConnString)
if c.Authorization.Mode == services.RBACAuthorizationMode {
args = append(args, "--authorization-mode=RBAC")
}
if c.Services.KubeAPI.PodSecurityPolicy {
args = append(args, "--runtime-config=extensions/v1beta1/podsecuritypolicy=true", "--admission-control=PodSecurityPolicy")
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
}
for arg, value := range c.Services.KubeAPI.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(true, services.KubeAPIPort),
}
return v3.Process{
Command: Command,
Args: args,
VolumesFrom: VolumesFrom,
Binds: Binds,
NetworkMode: "host",
RestartPolicy: "always",
Image: c.Services.KubeAPI.Image,
HealthCheck: healthCheck,
}
}
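The Build*Process functions set HealthCheck.URL via services.GetHealthCheckURL, which is not part of this diff. A plausible sketch, assuming it takes an integer port and mirrors the scheme/loopback/port/endpoint formatting that the old getHealthz used (see the healthcheck.go hunk later in this diff):

// Plausible reconstruction only; the actual helper lives in the services
// package and is not shown in this diff.
func GetHealthCheckURL(useTLS bool, port int) string {
	if useTLS {
		return fmt.Sprintf("%s%s:%d%s", HTTPSProtoPrefix, HealthzAddress, port, HealthzEndpoint)
	}
	return fmt.Sprintf("%s%s:%d%s", HTTPProtoPrefix, HealthzAddress, port, HealthzEndpoint)
}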
func (c *Cluster) BuildKubeControllerProcess() v3.Process {
Command := []string{"/opt/rke/entrypoint.sh",
"kube-controller-manager",
"--address=0.0.0.0",
"--cloud-provider=",
"--leader-elect=true",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeControllerCertName),
"--enable-hostpath-provisioner=false",
"--node-monitor-grace-period=40s",
"--pod-eviction-timeout=5m0s",
"--v=2",
"--allocate-node-cidrs=true",
"--cluster-cidr=" + c.ClusterCIDR,
"--service-cluster-ip-range=" + c.Services.KubeController.ServiceClusterIPRange,
"--service-account-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--root-ca-file=" + pki.GetCertPath(pki.CACertName),
}
args := []string{}
if c.Authorization.Mode == services.RBACAuthorizationMode {
args = append(args, "--use-service-account-credentials=true")
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
}
for arg, value := range c.Services.KubeController.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(false, services.KubeControllerPort),
}
return v3.Process{
Command: Command,
Args: args,
VolumesFrom: VolumesFrom,
Binds: Binds,
NetworkMode: "host",
RestartPolicy: "always",
Image: c.Services.KubeController.Image,
HealthCheck: healthCheck,
}
}
func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process {
Command := []string{"/opt/rke/entrypoint.sh",
"kubelet",
"--v=2",
"--address=0.0.0.0",
"--cluster-domain=" + c.ClusterDomain,
"--pod-infra-container-image=" + c.Services.Kubelet.InfraContainerImage,
"--cgroups-per-qos=True",
"--enforce-node-allocatable=",
"--hostname-override=" + host.HostnameOverride,
"--cluster-dns=" + c.ClusterDNSServer,
"--network-plugin=cni",
"--cni-conf-dir=/etc/cni/net.d",
"--cni-bin-dir=/opt/cni/bin",
"--resolv-conf=/etc/resolv.conf",
"--allow-privileged=true",
"--cloud-provider=",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeNodeCertName),
"--require-kubeconfig=True",
"--fail-swap-on=" + strconv.FormatBool(c.Services.Kubelet.FailSwapOn),
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
"/usr/libexec/kubernetes/kubelet-plugins:/usr/libexec/kubernetes/kubelet-plugins",
"/etc/cni:/etc/cni:ro",
"/opt/cni:/opt/cni:ro",
"/etc/resolv.conf:/etc/resolv.conf",
"/sys:/sys",
"/var/lib/docker:/var/lib/docker:rw",
"/var/lib/kubelet:/var/lib/kubelet:shared",
"/var/run:/var/run:rw",
"/run:/run",
"/etc/ceph:/etc/ceph",
"/dev:/host/dev",
"/var/log/containers:/var/log/containers",
"/var/log/pods:/var/log/pods",
}
for arg, value := range c.Services.Kubelet.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(true, services.KubeletPort),
}
return v3.Process{
Command: Command,
VolumesFrom: VolumesFrom,
Binds: Binds,
NetworkMode: "host",
RestartPolicy: "always",
Image: c.Services.Kubelet.Image,
PidMode: "host",
Privileged: true,
HealthCheck: healthCheck,
}
}
func (c *Cluster) BuildKubeProxyProcess() v3.Process {
Command := []string{"/opt/rke/entrypoint.sh",
"kube-proxy",
"--v=2",
"--healthz-bind-address=0.0.0.0",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeProxyCertName),
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
}
for arg, value := range c.Services.Kubeproxy.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(false, services.KubeproxyPort),
}
return v3.Process{
Command: Command,
VolumesFrom: VolumesFrom,
Binds: Binds,
NetworkMode: "host",
RestartPolicy: "always",
PidMode: "host",
Privileged: true,
HealthCheck: healthCheck,
Image: c.Services.Kubeproxy.Image,
}
}
func (c *Cluster) BuildProxyProcess() v3.Process {
nginxProxyEnv := ""
for i, host := range c.ControlPlaneHosts {
nginxProxyEnv += fmt.Sprintf("%s", host.InternalAddress)
if i < (len(c.ControlPlaneHosts) - 1) {
nginxProxyEnv += ","
}
}
Env := []string{fmt.Sprintf("%s=%s", services.NginxProxyEnvName, nginxProxyEnv)}
return v3.Process{
Env: Env,
NetworkMode: "host",
RestartPolicy: "always",
HealthCheck: v3.HealthCheck{},
Image: c.SystemImages.NginxProxy,
}
}
func (c *Cluster) BuildSchedulerProcess() v3.Process {
Command := []string{"/opt/rke/entrypoint.sh",
"kube-scheduler",
"--leader-elect=true",
"--v=2",
"--address=0.0.0.0",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeSchedulerCertName),
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
}
for arg, value := range c.Services.Scheduler.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(false, services.SchedulerPort),
}
return v3.Process{
Command: Command,
Binds: Binds,
VolumesFrom: VolumesFrom,
NetworkMode: "host",
RestartPolicy: "always",
Image: c.Services.Scheduler.Image,
HealthCheck: healthCheck,
}
}
func (c *Cluster) BuildSidecarProcess() v3.Process {
return v3.Process{
NetworkMode: "none",
Image: c.SystemImages.KubernetesServicesSidecar,
HealthCheck: v3.HealthCheck{},
}
}
func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host) v3.Process {
nodeName := pki.GetEtcdCrtName(host.InternalAddress)
initCluster := ""
if len(etcdHosts) == 0 {
initCluster = services.GetEtcdInitialCluster(c.EtcdHosts)
} else {
initCluster = services.GetEtcdInitialCluster(etcdHosts)
}
clusterState := "new"
if host.ExistingEtcdCluster {
clusterState = "existing"
}
args := []string{"/usr/local/bin/etcd",
"--name=etcd-" + host.HostnameOverride,
"--data-dir=/etcd-data",
"--advertise-client-urls=https://" + host.InternalAddress + ":2379,https://" + host.InternalAddress + ":4001",
"--listen-client-urls=https://0.0.0.0:2379",
"--initial-advertise-peer-urls=https://" + host.InternalAddress + ":2380",
"--listen-peer-urls=https://0.0.0.0:2380",
"--initial-cluster-token=etcd-cluster-1",
"--initial-cluster=" + initCluster,
"--initial-cluster-state=" + clusterState,
"--peer-client-cert-auth",
"--client-cert-auth",
"--trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
"--peer-trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
"--cert-file=" + pki.GetCertPath(nodeName),
"--key-file=" + pki.GetKeyPath(nodeName),
"--peer-cert-file=" + pki.GetCertPath(nodeName),
"--peer-key-file=" + pki.GetKeyPath(nodeName),
}
Binds := []string{
"/var/lib/etcd:/etcd-data:z",
"/etc/kubernetes:/etc/kubernetes:z",
}
for arg, value := range c.Services.Etcd.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
args = append(args, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.EtcdHealthCheckURL,
}
return v3.Process{
Args: args,
Binds: Binds,
NetworkMode: "host",
Image: c.Services.Etcd.Image,
HealthCheck: healthCheck,
}
}
func BuildPortChecksFromPortList(host *hosts.Host, portList []string, proto string) []v3.PortCheck {
portChecks := []v3.PortCheck{}
for _, port := range portList {
intPort, _ := strconv.Atoi(port)
portChecks = append(portChecks, v3.PortCheck{
Address: host.Address,
Port: intPort,
Protocol: proto,
})
}
return portChecks
}
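A small usage sketch for the port-check helper, within the cluster package; it assumes EtcdPort1/EtcdPort2 hold the usual 2379/2380 client and peer ports that BuildEtcdProcess above listens on:

// Illustrative only: the PortCheck entries an etcd host gets in its node plan.
func examplePortChecks() {
	host := &hosts.Host{RKEConfigNode: v3.RKEConfigNode{Address: "1.1.1.1"}}
	for _, check := range BuildPortChecksFromPortList(host, EtcdPortList, ProtocolTCP) {
		// prints 1.1.1.1:2379/TCP and 1.1.1.1:2380/TCP, assuming those port constants
		fmt.Printf("%s:%d/%s\n", check.Address, check.Port, check.Protocol)
	}
}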
......@@ -197,7 +197,10 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
return err
}
etcdHost.ToAddEtcdMember = false
if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdHosts, kubeCluster.Services.Etcd, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap); err != nil {
readyHosts := getReadyEtcdHosts(kubeCluster.EtcdHosts)
etcdProcessHostMap := kubeCluster.getEtcdProcessHostMap(readyHosts)
if err := services.ReloadEtcdCluster(ctx, readyHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap, etcdProcessHostMap); err != nil {
return err
}
}
......@@ -220,3 +223,14 @@ func syncLabels(ctx context.Context, currentCluster, kubeCluster *Cluster) {
}
}
}
func getReadyEtcdHosts(etcdHosts []*hosts.Host) []*hosts.Host {
readyEtcdHosts := []*hosts.Host{}
for _, host := range etcdHosts {
if !host.ToAddEtcdMember {
readyEtcdHosts = append(readyEtcdHosts, host)
host.ExistingEtcdCluster = true
}
}
return readyEtcdHosts
}
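A quick sketch of getReadyEtcdHosts within the cluster package: hosts still waiting to be added are filtered out, and the members that remain are flagged as an existing cluster so their rebuilt process uses --initial-cluster-state=existing:

// Illustrative only, not part of this change.
func exampleReadyEtcdHosts() {
	joining := &hosts.Host{ToAddEtcdMember: true}
	member := &hosts.Host{}
	ready := getReadyEtcdHosts([]*hosts.Host{joining, member})
	fmt.Println(len(ready), member.ExistingEtcdCluster) // 1 true
}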
......@@ -9,13 +9,13 @@ import (
"golang.org/x/sync/errgroup"
)
func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process) error {
log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole)
var errgrp errgroup.Group
for _, host := range controlHosts {
runHost := host
errgrp.Go(func() error {
return doDeployControlHost(ctx, runHost, etcdHosts, controlServices, sidekickImage, authorizationMode, localConnDialerFactory, prsMap)
return doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, processMap)
})
}
if err := errgrp.Wait(); err != nil {
......@@ -66,24 +66,24 @@ func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force b
return nil
}
func doDeployControlHost(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
func doDeployControlHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process) error {
if host.IsWorker {
if err := removeNginxProxy(ctx, host); err != nil {
return err
}
}
// run sidekick
if err := runSidekick(ctx, host, sidekickImage, prsMap); err != nil {
if err := runSidekick(ctx, host, prsMap, processMap[SidekickContainerName]); err != nil {
return err
}
// run kubeapi
if err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, localConnDialerFactory, prsMap); err != nil {
if err := runKubeAPI(ctx, host, localConnDialerFactory, prsMap, processMap[KubeAPIContainerName]); err != nil {
return err
}
// run kubecontroller
if err := runKubeController(ctx, host, controlServices.KubeController, authorizationMode, localConnDialerFactory, prsMap); err != nil {
if err := runKubeController(ctx, host, localConnDialerFactory, prsMap, processMap[KubeControllerContainerName]); err != nil {
return err
}
// run scheduler
return runScheduler(ctx, host, controlServices.Scheduler, localConnDialerFactory, prsMap)
return runScheduler(ctx, host, localConnDialerFactory, prsMap, processMap[SchedulerContainerName])
}
......@@ -7,22 +7,21 @@ import (
"context"
etcdclient "github.com/coreos/etcd/client"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
)
func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
const (
EtcdHealthCheckURL = "https://127.0.0.1:2379/health"
)
func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdProcessHostMap map[*hosts.Host]v3.Process, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole)
initCluster := getEtcdInitialCluster(etcdHosts)
for _, host := range etcdHosts {
nodeName := pki.GetEtcdCrtName(host.InternalAddress)
imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, nodeName)
imageCfg, hostCfg, _ := getProcessConfig(etcdProcessHostMap[host])
err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap)
if err != nil {
return err
......@@ -60,49 +59,6 @@ func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) e
return nil
}
func buildEtcdConfig(host *hosts.Host, etcdService v3.ETCDService, initCluster, nodeName string) (*container.Config, *container.HostConfig) {
clusterState := "new"
if host.ExistingEtcdCluster {
clusterState = "existing"
}
imageCfg := &container.Config{
Image: etcdService.Image,
Cmd: []string{"/usr/local/bin/etcd",
"--name=etcd-" + host.HostnameOverride,
"--data-dir=/etcd-data",
"--advertise-client-urls=https://" + host.InternalAddress + ":2379,https://" + host.InternalAddress + ":4001",
"--listen-client-urls=https://0.0.0.0:2379",
"--initial-advertise-peer-urls=https://" + host.InternalAddress + ":2380",
"--listen-peer-urls=https://0.0.0.0:2380",
"--initial-cluster-token=etcd-cluster-1",
"--initial-cluster=" + initCluster,
"--initial-cluster-state=" + clusterState,
"--peer-client-cert-auth",
"--client-cert-auth",
"--trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
"--peer-trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
"--cert-file=" + pki.GetCertPath(nodeName),
"--key-file=" + pki.GetKeyPath(nodeName),
"--peer-cert-file=" + pki.GetCertPath(nodeName),
"--peer-key-file=" + pki.GetKeyPath(nodeName),
},
}
hostCfg := &container.HostConfig{
RestartPolicy: container.RestartPolicy{Name: "always"},
Binds: []string{
"/var/lib/etcd:/etcd-data:z",
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
}
for arg, value := range etcdService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}
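buildEtcdConfig's role is taken over by the generic getProcessConfig helper, which is referenced throughout this change but defined elsewhere in the services package. A plausible reconstruction, assuming it simply maps the declarative v3.Process fields used above onto Docker's container and host configs plus the health-check URL consumed by runHealthcheck:

// Plausible reconstruction only; the real helper is not shown in this diff.
func getProcessConfig(process v3.Process) (*container.Config, *container.HostConfig, string) {
	imageCfg := &container.Config{
		Entrypoint: process.Command,
		Cmd:        process.Args,
		Env:        process.Env,
		Image:      process.Image,
	}
	hostCfg := &container.HostConfig{
		VolumesFrom:   process.VolumesFrom,
		Binds:         process.Binds,
		NetworkMode:   container.NetworkMode(process.NetworkMode),
		PidMode:       container.PidMode(process.PidMode),
		Privileged:    process.Privileged,
		RestartPolicy: container.RestartPolicy{Name: process.RestartPolicy},
	}
	return imageCfg, hostCfg, process.HealthCheck.URL
}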
func AddEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, etcdHost.HostnameOverride)
peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress)
......@@ -164,17 +120,9 @@ func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*ho
return nil
}
func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry) error {
readyEtcdHosts := []*hosts.Host{}
for _, host := range etcdHosts {
if !host.ToAddEtcdMember {
readyEtcdHosts = append(readyEtcdHosts, host)
host.ExistingEtcdCluster = true
}
}
initCluster := getEtcdInitialCluster(readyEtcdHosts)
for _, host := range readyEtcdHosts {
imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, pki.GetEtcdCrtName(host.InternalAddress))
func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdProcessHostMap map[*hosts.Host]v3.Process) error {
for host, process := range etcdProcessHostMap {
imageCfg, hostCfg, _ := getProcessConfig(process)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
return err
}
......@@ -182,7 +130,8 @@ func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService
time.Sleep(10 * time.Second)
var healthy bool
for _, host := range readyEtcdHosts {
if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key); healthy {
_, _, healthCheckURL := getProcessConfig(etcdProcessHostMap[host])
if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key, healthCheckURL); healthy {
break
}
}
......
package services
import (
"fmt"
"testing"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
const (
TestInitEtcdClusterString = "etcd-etcd1=https://1.1.1.1:2380,etcd-etcd2=https://2.2.2.2:2380"
TestEtcdImage = "etcd/etcdImage:latest"
TestEtcdNamePrefix = "--name=etcd-"
TestEtcdVolumeBind = "/var/lib/etcd:/etcd-data:z"
TestEtcdExtraArgs = "--foo=bar"
)
func TestEtcdConfig(t *testing.T) {
etcdHosts := []*hosts.Host{
&hosts.Host{
RKEConfigNode: v3.RKEConfigNode{
Address: "1.1.1.1",
InternalAddress: "1.1.1.1",
Role: []string{"etcd"},
HostnameOverride: "etcd1",
},
DClient: nil,
},
&hosts.Host{
RKEConfigNode: v3.RKEConfigNode{
Address: "2.2.2.2",
InternalAddress: "2.2.2.2",
Role: []string{"etcd"},
HostnameOverride: "etcd2",
},
DClient: nil,
},
}
etcdService := v3.ETCDService{}
etcdService.Image = TestEtcdImage
etcdService.ExtraArgs = map[string]string{"foo": "bar"}
// Test init cluster string
initCluster := getEtcdInitialCluster(etcdHosts)
assertEqual(t, initCluster, TestInitEtcdClusterString, "")
for _, host := range etcdHosts {
nodeName := pki.GetEtcdCrtName(host.InternalAddress)
imageCfg, hostCfg := buildEtcdConfig(host, etcdService, TestInitEtcdClusterString, nodeName)
assertEqual(t, isStringInSlice(TestEtcdNamePrefix+host.HostnameOverride, imageCfg.Cmd), true,
fmt.Sprintf("Failed to find [%s] in Etcd command", TestEtcdNamePrefix+host.HostnameOverride))
assertEqual(t, TestEtcdImage, imageCfg.Image,
fmt.Sprintf("Failed to verify [%s] as Etcd Image", TestEtcdImage))
assertEqual(t, isStringInSlice(TestEtcdVolumeBind, hostCfg.Binds), true,
fmt.Sprintf("Failed to find [%s] in volume binds of Etcd Service", TestEtcdVolumeBind))
assertEqual(t, isStringInSlice(TestEtcdExtraArgs, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in extra args of Etcd Service", TestEtcdExtraArgs))
}
}
......@@ -39,7 +39,7 @@ func getEtcdClient(ctx context.Context, etcdHost *hosts.Host, localConnDialerFac
return etcdclient.New(cfg)
}
func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte) bool {
func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte, url string) bool {
logrus.Debugf("[etcd] Check etcd cluster health")
for i := 0; i < 3; i++ {
dialer, err := getEtcdDialer(localConnDialerFactory, host)
......@@ -59,7 +59,7 @@ func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFacto
TLSHandshakeTimeout: 10 * time.Second,
},
}
healthy, err := getHealthEtcd(hc, host)
healthy, err := getHealthEtcd(hc, host, url)
if err != nil {
logrus.Debug(err)
time.Sleep(5 * time.Second)
......@@ -73,9 +73,9 @@ func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFacto
return false
}
func getHealthEtcd(hc http.Client, host *hosts.Host) (string, error) {
func getHealthEtcd(hc http.Client, host *hosts.Host, url string) (string, error) {
healthy := struct{ Health string }{}
resp, err := hc.Get("https://127.0.0.1:2379/health")
resp, err := hc.Get(url)
if err != nil {
return healthy.Health, fmt.Errorf("Failed to get /health for host [%s]: %v", host.Address, err)
}
......@@ -90,7 +90,7 @@ func getHealthEtcd(hc http.Client, host *hosts.Host) (string, error) {
return healthy.Health, nil
}
func getEtcdInitialCluster(hosts []*hosts.Host) string {
func GetEtcdInitialCluster(hosts []*hosts.Host) string {
initialCluster := ""
for i, host := range hosts {
initialCluster += fmt.Sprintf("etcd-%s=https://%s:2380", host.HostnameOverride, host.InternalAddress)
......
......@@ -6,6 +6,8 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"github.com/rancher/rke/hosts"
......@@ -20,14 +22,18 @@ const (
HTTPSProtoPrefix = "https://"
)
func runHealthcheck(ctx context.Context, host *hosts.Host, port int, useTLS bool, serviceName string, localConnDialerFactory hosts.DialerFactory) error {
func runHealthcheck(ctx context.Context, host *hosts.Host, serviceName string, localConnDialerFactory hosts.DialerFactory, url string) error {
log.Infof(ctx, "[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address)
port, err := getPortFromURL(url)
if err != nil {
return err
}
client, err := getHealthCheckHTTPClient(host, port, localConnDialerFactory)
if err != nil {
return fmt.Errorf("Failed to initiate new HTTP client for service [%s] for host [%s]", serviceName, host.Address)
}
for retries := 0; retries < 10; retries++ {
if err = getHealthz(client, useTLS, serviceName, host.Address, port); err != nil {
if err = getHealthz(client, serviceName, host.Address, url); err != nil {
logrus.Debugf("[healthcheck] %v", err)
time.Sleep(5 * time.Second)
continue
......@@ -58,14 +64,10 @@ func getHealthCheckHTTPClient(host *hosts.Host, port int, localConnDialerFactory
}, nil
}
func getHealthz(client *http.Client, useTLS bool, serviceName, hostAddress string, port int) error {
proto := HTTPProtoPrefix
if useTLS {
proto = HTTPSProtoPrefix
}
resp, err := client.Get(fmt.Sprintf("%s%s:%d%s", proto, HealthzAddress, port, HealthzEndpoint))
func getHealthz(client *http.Client, serviceName, hostAddress, url string) error {
resp, err := client.Get(url)
if err != nil {
return fmt.Errorf("Failed to check %s for service [%s] on host [%s]: %v", HealthzEndpoint, serviceName, hostAddress, err)
return fmt.Errorf("Failed to check %s for service [%s] on host [%s]: %v", url, serviceName, hostAddress, err)
}
if resp.StatusCode != http.StatusOK {
statusBody, _ := ioutil.ReadAll(resp.Body)
......@@ -73,3 +75,12 @@ func getHealthz(client *http.Client, useTLS bool, serviceName, hostAddress strin
}
return nil
}
func getPortFromURL(url string) (int, error) {
port := strings.Split(strings.Split(url, ":")[2], "/")[0]
intPort, err := strconv.Atoi(port)
if err != nil {
return 0, err
}
return intPort, nil
}
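getPortFromURL assumes every health-check URL carries an explicit port (scheme://host:port/path). As an aside, the same extraction via net/url, which also reports URLs that lack a port; this sketch is not part of the change and would need the net/url import:

// Illustrative alternative only.
func getPortFromURLAlt(rawURL string) (int, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return 0, err
	}
	if u.Port() == "" {
		return 0, fmt.Errorf("no explicit port in %q", rawURL)
	}
	return strconv.Atoi(u.Port())
}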
......@@ -2,75 +2,20 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeAPI(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
etcdConnString := GetEtcdConnString(etcdHosts)
imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode)
func runKubeAPI(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeAPIProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeAPIProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, KubeAPIPort, true, KubeAPIContainerName, df)
return runHealthcheck(ctx, host, KubeAPIContainerName, df, healthCheckURL)
}
func removeKubeAPI(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeAPIContainerName, host.Address)
}
func buildKubeAPIConfig(host *hosts.Host, kubeAPIService v3.KubeAPIService, etcdConnString, authorizationMode string) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeAPIService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kube-apiserver",
"--insecure-bind-address=127.0.0.1",
"--bind-address=0.0.0.0",
"--insecure-port=0",
"--secure-port=6443",
"--cloud-provider=",
"--allow_privileged=true",
"--kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname",
"--service-cluster-ip-range=" + kubeAPIService.ServiceClusterIPRange,
"--admission-control=ServiceAccount,NamespaceLifecycle,LimitRanger,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds",
"--runtime-config=batch/v2alpha1",
"--runtime-config=authentication.k8s.io/v1beta1=true",
"--storage-backend=etcd3",
"--client-ca-file=" + pki.GetCertPath(pki.CACertName),
"--tls-cert-file=" + pki.GetCertPath(pki.KubeAPICertName),
"--tls-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--service-account-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--etcd-cafile=" + pki.GetCertPath(pki.CACertName),
"--etcd-certfile=" + pki.GetCertPath(pki.KubeAPICertName),
"--etcd-keyfile=" + pki.GetKeyPath(pki.KubeAPICertName)},
}
imageCfg.Cmd = append(imageCfg.Cmd, "--etcd-servers="+etcdConnString)
if authorizationMode == RBACAuthorizationMode {
imageCfg.Cmd = append(imageCfg.Cmd, "--authorization-mode=RBAC")
}
if kubeAPIService.PodSecurityPolicy {
imageCfg.Cmd = append(imageCfg.Cmd, "--runtime-config=extensions/v1beta1/podsecuritypolicy=true", "--admission-control=PodSecurityPolicy")
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
}
for arg, value := range kubeAPIService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}
package services
import (
"fmt"
"testing"
"github.com/rancher/rke/hosts"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
const (
TestEtcdConnString = "https://1.1.1.1:2379,https://2.2.2.2:2379"
TestKubeAPIImage = "rancher/k8s:latest"
TestInsecureBindAddress = "--insecure-bind-address=127.0.0.1"
TestKubeAPIVolumeBind = "/etc/kubernetes:/etc/kubernetes:z"
TestKubeAPIExtraArgs = "--foo=bar"
)
func TestKubeAPIConfig(t *testing.T) {
etcdHosts := []*hosts.Host{
&hosts.Host{
RKEConfigNode: v3.RKEConfigNode{
Address: "1.1.1.1",
InternalAddress: "1.1.1.1",
Role: []string{"etcd"},
HostnameOverride: "etcd1",
},
DClient: nil,
},
&hosts.Host{
RKEConfigNode: v3.RKEConfigNode{
Address: "2.2.2.2",
InternalAddress: "2.2.2.2",
Role: []string{"etcd"},
HostnameOverride: "etcd2",
},
DClient: nil,
},
}
cpHost := &hosts.Host{
RKEConfigNode: v3.RKEConfigNode{
Address: "3.3.3.3",
InternalAddress: "3.3.3.3",
Role: []string{"controlplane"},
HostnameOverride: "node1",
},
DClient: nil,
}
kubeAPIService := v3.KubeAPIService{}
kubeAPIService.Image = TestKubeAPIImage
kubeAPIService.ServiceClusterIPRange = "10.0.0.0/16"
kubeAPIService.ExtraArgs = map[string]string{"foo": "bar"}
// Test init cluster string
etcdConnString := GetEtcdConnString(etcdHosts)
assertEqual(t, etcdConnString, TestEtcdConnString, "")
imageCfg, hostCfg := buildKubeAPIConfig(cpHost, kubeAPIService, etcdConnString, "")
// Test image and host config
assertEqual(t, isStringInSlice(TestInsecureBindAddress, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in Entrypoint of KubeAPI", TestInsecureBindAddress))
assertEqual(t, TestKubeAPIImage, imageCfg.Image,
fmt.Sprintf("Failed to find correct image [%s] in KubeAPI Config", TestKubeAPIImage))
assertEqual(t, isStringInSlice(TestKubeAPIVolumeBind, hostCfg.Binds), true,
fmt.Sprintf("Failed to find [%s] in volume binds of KubeAPI", TestKubeAPIVolumeBind))
assertEqual(t, isStringInSlice(TestKubeAPIExtraArgs, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in extra args of KubeAPI", TestKubeAPIExtraArgs))
}
......@@ -2,63 +2,20 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeController(ctx context.Context, host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode)
func runKubeController(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, controllerProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(controllerProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, KubeControllerPort, false, KubeControllerContainerName, df)
return runHealthcheck(ctx, host, KubeControllerContainerName, df, healthCheckURL)
}
func removeKubeController(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeControllerContainerName, host.Address)
}
func buildKubeControllerConfig(kubeControllerService v3.KubeControllerService, authorizationMode string) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeControllerService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kube-controller-manager",
"--address=0.0.0.0",
"--cloud-provider=",
"--leader-elect=true",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeControllerCertName),
"--enable-hostpath-provisioner=false",
"--node-monitor-grace-period=40s",
"--pod-eviction-timeout=5m0s",
"--v=2",
"--allocate-node-cidrs=true",
"--cluster-cidr=" + kubeControllerService.ClusterCIDR,
"--service-cluster-ip-range=" + kubeControllerService.ServiceClusterIPRange,
"--service-account-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--root-ca-file=" + pki.GetCertPath(pki.CACertName),
},
}
if authorizationMode == RBACAuthorizationMode {
imageCfg.Cmd = append(imageCfg.Cmd, "--use-service-account-credentials=true")
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
}
for arg, value := range kubeControllerService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}
package services
import (
"fmt"
"testing"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
const (
TestKubeControllerClusterCidr = "10.0.0.0/16"
TestKubeControllerServiceClusterIPRange = "10.1.0.0/16"
TestKubeControllerImage = "rancher/k8s:latest"
TestKubeControllerVolumeBind = "/etc/kubernetes:/etc/kubernetes:z"
TestKubeControllerExtraArgs = "--foo=bar"
TestClusterCidrPrefix = "--cluster-cidr="
TestServiceIPRangePrefix = "--service-cluster-ip-range="
)
func TestKubeControllerConfig(t *testing.T) {
kubeControllerService := v3.KubeControllerService{}
kubeControllerService.Image = TestKubeControllerImage
kubeControllerService.ClusterCIDR = TestKubeControllerClusterCidr
kubeControllerService.ServiceClusterIPRange = TestKubeControllerServiceClusterIPRange
kubeControllerService.ExtraArgs = map[string]string{"foo": "bar"}
imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, "")
// Test image and host config
assertEqual(t, isStringInSlice(TestClusterCidrPrefix+TestKubeControllerClusterCidr, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in KubeController Command", TestClusterCidrPrefix+TestKubeControllerClusterCidr))
assertEqual(t, isStringInSlice(TestServiceIPRangePrefix+TestKubeControllerServiceClusterIPRange, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in KubeController Command", TestServiceIPRangePrefix+TestKubeControllerServiceClusterIPRange))
assertEqual(t, TestKubeControllerImage, imageCfg.Image,
fmt.Sprintf("Failed to verify [%s] as KubeController Image", TestKubeControllerImage))
assertEqual(t, isStringInSlice(TestKubeControllerVolumeBind, hostCfg.Binds), true,
fmt.Sprintf("Failed to find [%s] in volume binds of KubeController", TestKubeControllerVolumeBind))
assertEqual(t, isStringInSlice(TestKubeControllerExtraArgs, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in extra args of KubeController", TestKubeControllerExtraArgs))
assertEqual(t, true, hostCfg.NetworkMode.IsHost(), "")
}
......@@ -2,79 +2,20 @@ package services
import (
"context"
"fmt"
"strconv"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService)
func runKubelet(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeletProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeletProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, KubeletPort, true, KubeletContainerName, df)
return runHealthcheck(ctx, host, KubeletContainerName, df, healthCheckURL)
}
func removeKubelet(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address)
}
func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeletService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kubelet",
"--v=2",
"--address=0.0.0.0",
"--cluster-domain=" + kubeletService.ClusterDomain,
"--pod-infra-container-image=" + kubeletService.InfraContainerImage,
"--cgroups-per-qos=True",
"--enforce-node-allocatable=",
"--hostname-override=" + host.HostnameOverride,
"--cluster-dns=" + kubeletService.ClusterDNSServer,
"--network-plugin=cni",
"--cni-conf-dir=/etc/cni/net.d",
"--cni-bin-dir=/opt/cni/bin",
"--resolv-conf=/etc/resolv.conf",
"--allow-privileged=true",
"--cloud-provider=",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeNodeCertName),
"--volume-plugin-dir=/var/lib/kubelet/volumeplugins",
"--require-kubeconfig=True",
"--fail-swap-on=" + strconv.FormatBool(kubeletService.FailSwapOn),
},
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
"/etc/cni:/etc/cni:ro,z",
"/opt/cni:/opt/cni:ro,z",
"/etc/resolv.conf:/etc/resolv.conf",
"/sys:/sys",
"/var/lib/docker:/var/lib/docker:rw,z",
"/var/lib/kubelet:/var/lib/kubelet:shared,z",
"/var/run:/var/run:rw",
"/run:/run",
"/etc/ceph:/etc/ceph",
"/dev:/host/dev",
"/var/log/containers:/var/log/containers:z",
"/var/log/pods:/var/log/pods:z"},
NetworkMode: "host",
PidMode: "host",
Privileged: true,
RestartPolicy: container.RestartPolicy{Name: "always"},
}
for arg, value := range kubeletService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}
package services
import (
"fmt"
"testing"
"github.com/rancher/rke/hosts"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
const (
TestKubeletClusterDomain = "cluster.local"
TestKubeletClusterDNSServer = "10.1.0.3"
TestKubeletInfraContainerImage = "test/test:latest"
TestKubeletImage = "rancher/k8s:latest"
TestKubeletVolumeBind = "/etc/kubernetes:/etc/kubernetes:z"
TestKubeletExtraArgs = "--foo=bar"
TestClusterDomainPrefix = "--cluster-domain="
TestClusterDNSServerPrefix = "--cluster-dns="
TestInfraContainerImagePrefix = "--pod-infra-container-image="
TestHostnameOverridePrefix = "--hostname-override="
)
func TestKubeletConfig(t *testing.T) {
host := &hosts.Host{
RKEConfigNode: v3.RKEConfigNode{
Address: "1.1.1.1",
InternalAddress: "1.1.1.1",
Role: []string{"worker", "controlplane", "etcd"},
HostnameOverride: "node1",
},
DClient: nil,
}
kubeletService := v3.KubeletService{}
kubeletService.Image = TestKubeletImage
kubeletService.ClusterDomain = TestKubeletClusterDomain
kubeletService.ClusterDNSServer = TestKubeletClusterDNSServer
kubeletService.InfraContainerImage = TestKubeletInfraContainerImage
kubeletService.ExtraArgs = map[string]string{"foo": "bar"}
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService)
// Test image and host config
assertEqual(t, isStringInSlice(TestClusterDomainPrefix+TestKubeletClusterDomain, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestClusterDomainPrefix+TestKubeletClusterDomain))
assertEqual(t, isStringInSlice(TestClusterDNSServerPrefix+TestKubeletClusterDNSServer, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestClusterDNSServerPrefix+TestKubeletClusterDNSServer))
assertEqual(t, isStringInSlice(TestInfraContainerImagePrefix+TestKubeletInfraContainerImage, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestInfraContainerImagePrefix+TestKubeletInfraContainerImage))
assertEqual(t, TestKubeletImage, imageCfg.Image,
fmt.Sprintf("Failed to verify [%s] as Kubelet Image", TestKubeletImage))
assertEqual(t, isStringInSlice(TestHostnameOverridePrefix+host.HostnameOverride, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestHostnameOverridePrefix+host.HostnameOverride))
assertEqual(t, isStringInSlice(TestKubeletVolumeBind, hostCfg.Binds), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Volume Binds", TestKubeletVolumeBind))
assertEqual(t, isStringInSlice(TestKubeletExtraArgs, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in Kubelet extra args", TestKubeletExtraArgs))
assertEqual(t, true, hostCfg.Privileged,
"Failed to verify that Kubelet is privileged")
assertEqual(t, true, hostCfg.PidMode.IsHost(),
"Failed to verify that Kubelet has host PID mode")
assertEqual(t, true, hostCfg.NetworkMode.IsHost(),
"Failed to verify that Kubelet has host Network mode")
}
......@@ -2,51 +2,20 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeproxy(ctx context.Context, host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService)
func runKubeproxy(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeProxyProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeProxyProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, KubeproxyPort, false, KubeproxyContainerName, df)
return runHealthcheck(ctx, host, KubeproxyContainerName, df, healthCheckURL)
}
func removeKubeproxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeproxyContainerName, host.Address)
}
func buildKubeproxyConfig(host *hosts.Host, kubeproxyService v3.KubeproxyService) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeproxyService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kube-proxy",
"--v=2",
"--healthz-bind-address=0.0.0.0",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeProxyCertName),
},
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
Privileged: true,
}
for arg, value := range kubeproxyService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}
package services
import (
"fmt"
"testing"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
const (
TestKubeproxyImage = "rancher/k8s:latest"
TestKubeproxyVolumeBind = "/etc/kubernetes:/etc/kubernetes:z"
TestKubeproxyExtraArgs = "--foo=bar"
)
func TestKubeproxyConfig(t *testing.T) {
kubeproxyService := v3.KubeproxyService{}
kubeproxyService.Image = TestKubeproxyImage
kubeproxyService.ExtraArgs = map[string]string{"foo": "bar"}
imageCfg, hostCfg := buildKubeproxyConfig(nil, kubeproxyService)
// Test image and host config
assertEqual(t, TestKubeproxyImage, imageCfg.Image,
fmt.Sprintf("Failed to verify [%s] as KubeProxy Image", TestKubeproxyImage))
assertEqual(t, isStringInSlice(TestKubeproxyVolumeBind, hostCfg.Binds), true,
fmt.Sprintf("Failed to find [%s] in KubeProxy Volume Binds", TestKubeproxyVolumeBind))
assertEqual(t, isStringInSlice(TestKubeproxyExtraArgs, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in KubeProxy extra args", TestKubeproxyExtraArgs))
assertEqual(t, true, hostCfg.Privileged,
"Failed to verify that KubeProxy is privileged")
assertEqual(t, true, hostCfg.NetworkMode.IsHost(),
"Failed to verify that KubeProxy has host Network mode")
}
......@@ -2,9 +2,7 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/types/apis/management.cattle.io/v3"
......@@ -15,48 +13,11 @@ const (
NginxProxyEnvName = "CP_HOSTS"
)
func RollingUpdateNginxProxy(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string, prsMap map[string]v3.PrivateRegistry) error {
nginxProxyEnv := buildProxyEnv(cpHosts)
for _, host := range workerHosts {
imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
if err := docker.DoRollingUpdateContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err
}
}
return nil
}
func runNginxProxy(ctx context.Context, host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string, prsMap map[string]v3.PrivateRegistry) error {
nginxProxyEnv := buildProxyEnv(cpHosts)
imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
func runNginxProxy(ctx context.Context, host *hosts.Host, prsMap map[string]v3.PrivateRegistry, proxyProcess v3.Process) error {
imageCfg, hostCfg, _ := getProcessConfig(proxyProcess)
return docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole, prsMap)
}
func removeNginxProxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, NginxProxyContainerName, host.Address)
}
func buildNginxProxyConfig(host *hosts.Host, nginxProxyEnv, nginxProxyImage string) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: nginxProxyImage,
Env: []string{fmt.Sprintf("%s=%s", NginxProxyEnvName, nginxProxyEnv)},
Cmd: []string{fmt.Sprintf("%s=%s", NginxProxyEnvName, nginxProxyEnv)},
}
hostCfg := &container.HostConfig{
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
}
return imageCfg, hostCfg
}
func buildProxyEnv(cpHosts []*hosts.Host) string {
proxyEnv := ""
for i, cpHost := range cpHosts {
proxyEnv += fmt.Sprintf("%s", cpHost.InternalAddress)
if i < (len(cpHosts) - 1) {
proxyEnv += ","
}
}
return proxyEnv
}
package services
import (
"fmt"
"testing"
"github.com/rancher/rke/hosts"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
const (
TestNginxProxyImage = "test/test:latest"
TestNginxProxyConnectionString = "1.1.1.1,2.2.2.2"
)
func TestNginxProxyConfig(t *testing.T) {
cpHosts := []*hosts.Host{
&hosts.Host{
RKEConfigNode: v3.RKEConfigNode{
Address: "1.1.1.1",
InternalAddress: "1.1.1.1",
Role: []string{"controlplane"},
HostnameOverride: "cp1",
},
DClient: nil,
},
&hosts.Host{
RKEConfigNode: v3.RKEConfigNode{
Address: "2.2.2.2",
InternalAddress: "2.2.2.2",
Role: []string{"controlplane"},
HostnameOverride: "cp1",
},
DClient: nil,
},
}
nginxProxyImage := TestNginxProxyImage
nginxProxyEnv := buildProxyEnv(cpHosts)
assertEqual(t, TestNginxProxyConnectionString, nginxProxyEnv,
fmt.Sprintf("Failed to verify nginx connection string [%s]", TestNginxProxyConnectionString))
imageCfg, hostCfg := buildNginxProxyConfig(nil, nginxProxyEnv, nginxProxyImage)
// Test image and host config
assertEqual(t, TestNginxProxyImage, imageCfg.Image,
fmt.Sprintf("Failed to verify [%s] as Nginx Proxy Image", TestNginxProxyImage))
assertEqual(t, true, hostCfg.NetworkMode.IsHost(),
"Failed to verify that Nginx Proxy has host Network mode")
}
......@@ -2,51 +2,20 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runScheduler(ctx context.Context, host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService)
func runScheduler(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, schedulerProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(schedulerProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, SchedulerPort, false, SchedulerContainerName, df)
return runHealthcheck(ctx, host, SchedulerContainerName, df, healthCheckURL)
}
func removeScheduler(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, SchedulerContainerName, host.Address)
}
func buildSchedulerConfig(host *hosts.Host, schedulerService v3.SchedulerService) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: schedulerService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kube-scheduler",
"--leader-elect=true",
"--v=2",
"--address=0.0.0.0",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeSchedulerCertName),
},
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
}
for arg, value := range schedulerService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}