diff --git a/authz/authz.go b/authz/authz.go index 0ea5b79bc47204bcd8a9ac0228e53a16fdbc549d..a2bbd84d70a950455e2a004fc74cb6b5111eeb27 100644 --- a/authz/authz.go +++ b/authz/authz.go @@ -8,9 +8,9 @@ import ( "github.com/rancher/rke/templates" ) -func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string) error { +func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error { log.Infof(ctx, "[authz] Creating rke-job-deployer ServiceAccount") - k8sClient, err := k8s.NewClient(kubeConfigPath) + k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport) if err != nil { return err } @@ -24,9 +24,9 @@ func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string) return nil } -func ApplySystemNodeClusterRoleBinding(ctx context.Context, kubeConfigPath string) error { +func ApplySystemNodeClusterRoleBinding(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error { log.Infof(ctx, "[authz] Creating system:node ClusterRoleBinding") - k8sClient, err := k8s.NewClient(kubeConfigPath) + k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport) if err != nil { return err } diff --git a/authz/psp.go b/authz/psp.go index ddf849c27bce0139bda725385b0d55320f256d1f..1d988fb3003ec5842a84842e20926b2b96396ae2 100644 --- a/authz/psp.go +++ b/authz/psp.go @@ -8,9 +8,9 @@ import ( "github.com/rancher/rke/templates" ) -func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string) error { +func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error { log.Infof(ctx, "[authz] Applying default PodSecurityPolicy") - k8sClient, err := k8s.NewClient(kubeConfigPath) + k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport) if err != nil { return err } @@ -21,9 +21,9 @@ func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string) e return nil } -func ApplyDefaultPodSecurityPolicyRole(ctx context.Context, kubeConfigPath string) error { +func ApplyDefaultPodSecurityPolicyRole(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error { log.Infof(ctx, "[authz] Applying default PodSecurityPolicy Role and RoleBinding") - k8sClient, err := k8s.NewClient(kubeConfigPath) + k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport) if err != nil { return err } diff --git a/cluster/addons.go b/cluster/addons.go index dbc934356029de9b2b1253d3cb251d190424ae4e..26301702fd9e14d3bf02085682d31b2c1bdf1ae9 100644 --- a/cluster/addons.go +++ b/cluster/addons.go @@ -88,7 +88,7 @@ func (c *Cluster) doAddonDeploy(ctx context.Context, addonYaml, resourceName str func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, addonName string) error { log.Infof(ctx, "[addons] Saving addon ConfigMap to Kubernetes") - kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath) + kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) if err != nil { return err } @@ -116,7 +116,7 @@ func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, add func (c *Cluster) ApplySystemAddonExcuteJob(addonJob string) error { - if err := k8s.ApplyK8sSystemJob(addonJob, c.LocalKubeConfigPath); err != nil { + if err := k8s.ApplyK8sSystemJob(addonJob, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil { fmt.Println(err) return err } diff --git a/cluster/cluster.go b/cluster/cluster.go index 32ee1394655188f0e1bdecfb91be0c3390208978..0d41c0aa12798bc1f96e383e0ccf458d8f9786e9 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -38,6 +38,7 @@ type Cluster struct { DockerDialerFactory hosts.DialerFactory LocalConnDialerFactory hosts.DialerFactory PrivateRegistriesMap map[string]v3.PrivateRegistry + K8sWrapTransport k8s.WrapTransport } const ( @@ -122,7 +123,8 @@ func ParseCluster( rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath, configDir string, dockerDialerFactory, - localConnDialerFactory hosts.DialerFactory) (*Cluster, error) { + localConnDialerFactory hosts.DialerFactory, + k8sWrapTransport k8s.WrapTransport) (*Cluster, error) { var err error c := &Cluster{ RancherKubernetesEngineConfig: *rkeConfig, @@ -130,6 +132,7 @@ func ParseCluster( DockerDialerFactory: dockerDialerFactory, LocalConnDialerFactory: localConnDialerFactory, PrivateRegistriesMap: make(map[string]v3.PrivateRegistry), + K8sWrapTransport: k8sWrapTransport, } // Setting cluster Defaults c.setClusterDefaults(ctx) @@ -187,7 +190,7 @@ func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error { return fmt.Errorf("Failed to redeploy local admin config with new host") } workingConfig = newConfig - if _, err := GetK8sVersion(kubeCluster.LocalKubeConfigPath); err == nil { + if _, err := GetK8sVersion(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport); err == nil { log.Infof(ctx, "[reconcile] host [%s] is active master on the cluster", cpHost.Address) break } @@ -197,8 +200,8 @@ func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error { return nil } -func isLocalConfigWorking(ctx context.Context, localKubeConfigPath string) bool { - if _, err := GetK8sVersion(localKubeConfigPath); err != nil { +func isLocalConfigWorking(ctx context.Context, localKubeConfigPath string, k8sWrapTransport k8s.WrapTransport) bool { + if _, err := GetK8sVersion(localKubeConfigPath, k8sWrapTransport); err != nil { log.Infof(ctx, "[reconcile] Local config is not vaild, rebuilding admin config") return false } @@ -230,22 +233,22 @@ func getLocalAdminConfigWithNewAddress(localConfigPath, cpAddress string) string } func (c *Cluster) ApplyAuthzResources(ctx context.Context) error { - if err := authz.ApplyJobDeployerServiceAccount(ctx, c.LocalKubeConfigPath); err != nil { + if err := authz.ApplyJobDeployerServiceAccount(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil { return fmt.Errorf("Failed to apply the ServiceAccount needed for job execution: %v", err) } if c.Authorization.Mode == NoneAuthorizationMode { return nil } if c.Authorization.Mode == services.RBACAuthorizationMode { - if err := authz.ApplySystemNodeClusterRoleBinding(ctx, c.LocalKubeConfigPath); err != nil { + if err := authz.ApplySystemNodeClusterRoleBinding(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil { return fmt.Errorf("Failed to apply the ClusterRoleBinding needed for node authorization: %v", err) } } if c.Authorization.Mode == services.RBACAuthorizationMode && c.Services.KubeAPI.PodSecurityPolicy { - if err := authz.ApplyDefaultPodSecurityPolicy(ctx, c.LocalKubeConfigPath); err != nil { + if err := authz.ApplyDefaultPodSecurityPolicy(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil { return fmt.Errorf("Failed to apply default PodSecurityPolicy: %v", err) } - if err := authz.ApplyDefaultPodSecurityPolicyRole(ctx, c.LocalKubeConfigPath); err != nil { + if err := authz.ApplyDefaultPodSecurityPolicyRole(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil { return fmt.Errorf("Failed to apply default PodSecurityPolicy ClusterRole and ClusterRoleBinding: %v", err) } } @@ -262,7 +265,7 @@ func (c *Cluster) deployAddons(ctx context.Context) error { func (c *Cluster) SyncLabelsAndTaints(ctx context.Context) error { if len(c.ControlPlaneHosts) > 0 { log.Infof(ctx, "[sync] Syncing nodes Labels and Taints") - k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath) + k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) if err != nil { return fmt.Errorf("Failed to initialize new kubernetes client: %v", err) } @@ -297,9 +300,9 @@ func (c *Cluster) PrePullK8sImages(ctx context.Context) error { return nil } -func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, crtBundle map[string]pki.CertificatePKI, clusterFilePath, configDir string) error { +func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, crtBundle map[string]pki.CertificatePKI, clusterFilePath, configDir string, k8sWrapTransport k8s.WrapTransport) error { // dialer factories are not needed here since we are not uses docker only k8s jobs - kubeCluster, err := ParseCluster(ctx, &rkeConfig, clusterFilePath, configDir, nil, nil) + kubeCluster, err := ParseCluster(ctx, &rkeConfig, clusterFilePath, configDir, nil, nil, k8sWrapTransport) if err != nil { return err } diff --git a/cluster/plan.go b/cluster/plan.go index bc7aa7558e0c533103e1705d4f0c3b7637577c2d..8b84570145d0dbaa2dd037748bc3605c80864c11 100644 --- a/cluster/plan.go +++ b/cluster/plan.go @@ -18,7 +18,7 @@ const ( func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) (v3.RKEPlan, error) { clusterPlan := v3.RKEPlan{} - myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil) + myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil, nil) // rkeConfig.Nodes are already unique. But they don't have role flags. So I will use the parsed cluster.Hosts to make use of the role flags. uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts) for _, host := range uniqHosts { diff --git a/cluster/reconcile.go b/cluster/reconcile.go index 5398afb9e2e69478dae3f8c3bd18748fc5e3d7dd..7b73df59fda7042225c7b640a8482c24ea42dcfb 100644 --- a/cluster/reconcile.go +++ b/cluster/reconcile.go @@ -27,7 +27,7 @@ func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster) return nil } - kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath) + kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport) if err != nil { return fmt.Errorf("Failed to initialize new kubernetes client: %v", err) } @@ -90,7 +90,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster, } for _, toDeleteHost := range cpToDelete { - kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath) + kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport) if err != nil { return fmt.Errorf("Failed to initialize new kubernetes client: %v", err) } diff --git a/cluster/state.go b/cluster/state.go index 00bac94d06d393958f93a95f43f249bfe2316a04..bc8dd70e2befc4653f56ab3d21b908117d01116d 100644 --- a/cluster/state.go +++ b/cluster/state.go @@ -19,7 +19,7 @@ func (c *Cluster) SaveClusterState(ctx context.Context, rkeConfig *v3.RancherKub if len(c.ControlPlaneHosts) > 0 { // Reinitialize kubernetes Client var err error - c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath) + c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) if err != nil { return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err) } @@ -44,14 +44,14 @@ func (c *Cluster) GetClusterState(ctx context.Context) (*Cluster, error) { log.Infof(ctx, "[state] Found local kube config file, trying to get state from cluster") // to handle if current local admin is down and we need to use new cp from the list - if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath) { + if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport) { if err := rebuildLocalAdminConfig(ctx, c); err != nil { return nil, err } } // initiate kubernetes client - c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath) + c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) if err != nil { log.Warnf(ctx, "Failed to initiate new Kubernetes Client: %v", err) return nil, nil @@ -140,9 +140,9 @@ func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientse } } -func GetK8sVersion(localConfigPath string) (string, error) { +func GetK8sVersion(localConfigPath string, k8sWrapTransport k8s.WrapTransport) (string, error) { logrus.Debugf("[version] Using %s to connect to Kubernetes cluster..", localConfigPath) - k8sClient, err := k8s.NewClient(localConfigPath) + k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport) if err != nil { return "", fmt.Errorf("Failed to create Kubernetes Client: %v", err) } diff --git a/cmd/remove.go b/cmd/remove.go index c8065927ef4375f309f86348bdd9306ee3df4d8a..48d143acfc4324b3adc37ff5a9bd7984f19786aa 100644 --- a/cmd/remove.go +++ b/cmd/remove.go @@ -9,6 +9,7 @@ import ( "github.com/rancher/rke/cluster" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/k8s" "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/types/apis/management.cattle.io/v3" @@ -45,10 +46,11 @@ func ClusterRemove( ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory, + k8sWrapTransport k8s.WrapTransport, local bool, configDir string) error { log.Infof(ctx, "Tearing down Kubernetes cluster") - kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dialerFactory, nil) + kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dialerFactory, nil, k8sWrapTransport) if err != nil { return err } @@ -94,7 +96,7 @@ func clusterRemoveFromCli(ctx *cli.Context) error { if err != nil { return fmt.Errorf("Failed to parse cluster file: %v", err) } - return ClusterRemove(context.Background(), rkeConfig, nil, false, "") + return ClusterRemove(context.Background(), rkeConfig, nil, nil, false, "") } func clusterRemoveLocal(ctx *cli.Context) error { @@ -111,5 +113,5 @@ func clusterRemoveLocal(ctx *cli.Context) error { } rkeConfig.Nodes = []v3.RKEConfigNode{*cluster.GetLocalRKENodeConfig()} } - return ClusterRemove(context.Background(), rkeConfig, nil, true, "") + return ClusterRemove(context.Background(), rkeConfig, nil, nil, true, "") } diff --git a/cmd/up.go b/cmd/up.go index da2692cf8a8e4c1a77886ec32179f5957118a60b..0adf5eebf402cfb405383061b998147c1df3cdfe 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -6,6 +6,7 @@ import ( "github.com/rancher/rke/cluster" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/k8s" "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/types/apis/management.cattle.io/v3" @@ -40,11 +41,12 @@ func ClusterUp( ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, localConnDialerFactory hosts.DialerFactory, + k8sWrapTransport k8s.WrapTransport, local bool, configDir string) (string, string, string, string, error) { log.Infof(ctx, "Building Kubernetes cluster") var APIURL, caCrt, clientCert, clientKey string - kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory) + kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } @@ -102,7 +104,7 @@ func ClusterUp( return APIURL, caCrt, clientCert, clientKey, err } - err = cluster.ConfigureCluster(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.Certificates, clusterFilePath, configDir) + err = cluster.ConfigureCluster(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.Certificates, clusterFilePath, configDir, k8sWrapTransport) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } @@ -131,7 +133,7 @@ func clusterUpFromCli(ctx *cli.Context) error { if err != nil { return fmt.Errorf("Failed to parse cluster file: %v", err) } - _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, false, "") + _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, nil, false, "") return err } @@ -149,6 +151,6 @@ func clusterUpLocal(ctx *cli.Context) error { } rkeConfig.Nodes = []v3.RKEConfigNode{*cluster.GetLocalRKENodeConfig()} } - _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, hosts.LocalHealthcheckFactory, true, "") + _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, hosts.LocalHealthcheckFactory, nil, true, "") return err } diff --git a/cmd/version.go b/cmd/version.go index 142db92a69d0a0792454be2bacaab19f42e77e07..09e80f39fabd83c6f227963736803301b010a502 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -27,7 +27,8 @@ func VersionCommand() cli.Command { func getClusterVersion(ctx *cli.Context) error { localKubeConfig := pki.GetLocalKubeConfig(ctx.String("config"), "") - serverVersion, err := cluster.GetK8sVersion(localKubeConfig) + // not going to use a k8s dialer here.. this is a CLI command + serverVersion, err := cluster.GetK8sVersion(localKubeConfig, nil) if err != nil { return err } diff --git a/k8s/job.go b/k8s/job.go index 80586c967006d41682a4e463748f5a8e4c3acf49..093fc0f64f4c93b73182f807f884f8bc903c85e1 100644 --- a/k8s/job.go +++ b/k8s/job.go @@ -12,7 +12,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error { +func ApplyK8sSystemJob(jobYaml, kubeConfigPath string, k8sWrapTransport WrapTransport) error { job := v1.Job{} if err := decodeYamlResource(&job, jobYaml); err != nil { return err @@ -20,7 +20,7 @@ func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error { if job.Namespace == metav1.NamespaceNone { job.Namespace = metav1.NamespaceSystem } - k8sClient, err := NewClient(kubeConfigPath) + k8sClient, err := NewClient(kubeConfigPath, k8sWrapTransport) if err != nil { return err } diff --git a/k8s/k8s.go b/k8s/k8s.go index b68bd1884ee8859b65858a67c65d3b861ea25f71..97654e2a31e5391c34355b30fc72ead347162674 100644 --- a/k8s/k8s.go +++ b/k8s/k8s.go @@ -2,6 +2,7 @@ package k8s import ( "bytes" + "net/http" "time" yamlutil "k8s.io/apimachinery/pkg/util/yaml" @@ -16,12 +17,17 @@ const ( type k8sCall func(*kubernetes.Clientset, interface{}) error -func NewClient(kubeConfigPath string) (*kubernetes.Clientset, error) { +type WrapTransport func(rt http.RoundTripper) http.RoundTripper + +func NewClient(kubeConfigPath string, k8sWrapTransport WrapTransport) (*kubernetes.Clientset, error) { // use the current admin kubeconfig config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) if err != nil { return nil, err } + if k8sWrapTransport != nil { + config.WrapTransport = k8sWrapTransport + } K8sClientSet, err := kubernetes.NewForConfig(config) if err != nil { return nil, err