Unverified Commit 4db57ff6 authored by Donnie Adams's avatar Donnie Adams Committed by GitHub
Browse files

Merge pull request #30313 from thedadams/eks-private-network-2.5

[2.5] Allow communication with EKS clusters with private API endpoint
Showing with 87 additions and 23 deletions
+87 -23
......@@ -11,10 +11,9 @@ import (
"strings"
"sync"
v32 "github.com/rancher/rancher/pkg/apis/management.cattle.io/v3"
"github.com/rancher/norman/httperror"
factory "github.com/rancher/rancher/pkg/dialer"
v32 "github.com/rancher/rancher/pkg/apis/management.cattle.io/v3"
dialer2 "github.com/rancher/rancher/pkg/dialer"
v3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
"github.com/rancher/rancher/pkg/types/config/dialer"
"k8s.io/apimachinery/pkg/runtime/schema"
......@@ -173,7 +172,7 @@ func (r *RemoteService) getTransport() (http.RoundTripper, error) {
return nil, err
}
transport.DialContext = d
if factory.IsCloudDriver(newCluster) {
if dialer2.IsPublicCloudDriver(newCluster) {
transport.Proxy = http.ProxyFromEnvironment
}
}
......
......@@ -10,12 +10,12 @@ import (
"sync"
"time"
v32 "github.com/rancher/rancher/pkg/apis/management.cattle.io/v3"
"github.com/pkg/errors"
"github.com/rancher/norman/types"
v32 "github.com/rancher/rancher/pkg/apis/management.cattle.io/v3"
util "github.com/rancher/rancher/pkg/cluster"
"github.com/rancher/rancher/pkg/clustermanager"
"github.com/rancher/rancher/pkg/dialer"
v3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
"github.com/rancher/rancher/pkg/image"
"github.com/rancher/rancher/pkg/kubectl"
......@@ -219,6 +219,11 @@ func getDesiredImage(cluster *v3.Cluster) string {
}
func (cd *clusterDeploy) deployAgent(cluster *v3.Cluster) error {
// Only clusters WITHOUT a public API endpoint are skipped here: the agent
// cannot be deployed to them automatically. Clusters with a public endpoint
// fall through to the normal deployment path below.
if !dialer.HasPublicAPIEndpoint(cluster) {
	logrus.Debugf("clusterDeploy: deployAgent: cluster [%s] is private so agent cannot be deployed automatically", cluster.Name)
	return nil
}
if cluster.Spec.Internal {
return nil
}
......
......@@ -21,6 +21,7 @@ import (
"github.com/rancher/rancher/pkg/catalog/manager"
"github.com/rancher/rancher/pkg/controllers/management/eksupstreamrefresh"
"github.com/rancher/rancher/pkg/controllers/management/rbac"
"github.com/rancher/rancher/pkg/dialer"
v3 "github.com/rancher/rancher/pkg/generated/controllers/management.cattle.io/v3"
corev1 "github.com/rancher/rancher/pkg/generated/norman/core/v1"
mgmtv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
......@@ -31,6 +32,7 @@ import (
"github.com/rancher/rancher/pkg/ref"
"github.com/rancher/rancher/pkg/systemaccount"
"github.com/rancher/rancher/pkg/types/config"
typesDialer "github.com/rancher/rancher/pkg/types/config/dialer"
"github.com/rancher/rancher/pkg/wrangler"
wranglerv1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
......@@ -44,6 +46,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/aws-iam-authenticator/pkg/token"
"sigs.k8s.io/yaml"
)
......@@ -77,6 +80,7 @@ type eksOperatorController struct {
catalogManager manager.CatalogManager
systemAccountManager *systemaccount.Manager
dynamicClient dynamic.NamespaceableResourceInterface
clientDialer typesDialer.Factory
}
func Register(ctx context.Context, wContext *wrangler.Context, mgmtCtx *config.ManagementContext) {
......@@ -99,6 +103,7 @@ func Register(ctx context.Context, wContext *wrangler.Context, mgmtCtx *config.M
catalogManager: mgmtCtx.CatalogManager,
systemAccountManager: systemaccount.NewManager(mgmtCtx),
dynamicClient: eksCCDynamicClient,
clientDialer: mgmtCtx.Dialer,
}
wContext.Mgmt.Cluster().OnChange(ctx, "eks-operator-controller", e.onClusterChange)
......@@ -281,6 +286,14 @@ func (e *eksOperatorController) onClusterChange(key string, cluster *mgmtv3.Clus
cluster, err = e.generateAndSetServiceAccount(cluster)
if err != nil {
var statusErr error
if strings.Contains(err.Error(), fmt.Sprintf(dialer.WaitForAgentError, cluster.Name)) {
// In this case, the API endpoint is private and rancher is waiting for the import cluster command to be run.
cluster, statusErr = e.setUnknown(cluster, apimgmtv3.ClusterConditionWaiting, "waiting for cluster agent to be deployed")
if statusErr == nil {
e.clusterEnqueueAfter(cluster.Name, enqueueTime)
}
return cluster, statusErr
}
cluster, statusErr = e.setFalse(cluster, apimgmtv3.ClusterConditionWaiting,
fmt.Sprintf("failed to communicate with cluster: %v", err))
if statusErr != nil {
......@@ -391,9 +404,8 @@ func (e *eksOperatorController) updateEKSClusterConfig(cluster *mgmtv3.Cluster,
}
}
// generateAndSetServiceAccount reads the EKSClusterConfig's secret once available and uses its fields to generate a
// service account token. The token, CA cert, and API endpoint are then copied to the cluster status.
func (e *eksOperatorController) generateAndSetServiceAccount(cluster *mgmtv3.Cluster) (*mgmtv3.Cluster, error) {
// getCAAndAPIEndpoint reads the EKSClusterConfig's secret once available. The CA cert and API endpoint are then copied to the cluster status.
func (e *eksOperatorController) getCAAndAPIEndpoint(cluster *mgmtv3.Cluster) (*mgmtv3.Cluster, error) {
backoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 2,
......@@ -419,22 +431,51 @@ func (e *eksOperatorController) generateAndSetServiceAccount(cluster *mgmtv3.Clu
return cluster, fmt.Errorf("failed waiting for cluster [%s] secret: %s", cluster.Name, err)
}
logrus.Infof("generating service account token for cluster [%s]", cluster.Name)
apiEndpoint := string(caSecret.Data["endpoint"])
caCert := string(caSecret.Data["ca"])
if cluster.Status.APIEndpoint == apiEndpoint && cluster.Status.CACert == caCert {
return cluster, nil
}
var currentCluster *mgmtv3.Cluster
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
currentCluster, err = e.clusterClient.Get(cluster.Name, v1.GetOptions{})
if err != nil {
return err
}
currentCluster.Status.APIEndpoint = apiEndpoint
currentCluster.Status.CACert = caCert
currentCluster, err = e.clusterClient.Update(currentCluster)
if err != nil {
return err
}
return nil
})
return currentCluster, err
}
// generateAndSetServiceAccount uses the API endpoint and CA cert to generate a service account token. The token is then
// copied to the cluster status.
//
// The endpoint and CA cert are first synced onto the cluster status by getCAAndAPIEndpoint. The token is then generated
// through the cluster's dialer, so the request is routed by whatever e.clientDialer.ClusterDialer returns for this
// cluster.
//
// Returns the updated cluster on success. If syncing the endpoint/CA, obtaining the dialer, starting the AWS session, or
// generating the token fails, the cluster as it stood before the failing step is returned together with the error, so
// callers can still read cluster.Name. The result of the final Update call is returned as-is.
func (e *eksOperatorController) generateAndSetServiceAccount(cluster *mgmtv3.Cluster) (*mgmtv3.Cluster, error) {
	// getCAAndAPIEndpoint returns the result of its last Get/Update alongside any error, and that value is not
	// guaranteed to be the caller's cluster. Keep the original cluster on failure so the caller can keep using it.
	updated, err := e.getCAAndAPIEndpoint(cluster)
	if err != nil {
		return cluster, err
	}
	cluster = updated

	clusterDialer, err := e.clientDialer.ClusterDialer(cluster.Name)
	if err != nil {
		return cluster, err
	}

	sess, _, err := controller.StartAWSSessions(e.secretsCache, *cluster.Spec.EKSConfig)
	if err != nil {
		return cluster, err
	}

	saToken, err := generateSAToken(sess, cluster.Spec.EKSConfig.DisplayName, cluster.Status.APIEndpoint, cluster.Status.CACert, clusterDialer)
	if err != nil {
		return cluster, err
	}

	// Work on a copy so the object passed in is not mutated.
	cluster = cluster.DeepCopy()
	cluster.Status.ServiceAccountToken = saToken
	return e.clusterClient.Update(cluster)
}
......@@ -567,7 +608,7 @@ func (e *eksOperatorController) deployEKSOperator() error {
return nil
}
func generateSAToken(sess *session.Session, clusterID, endpoint, ca string) (string, error) {
func generateSAToken(sess *session.Session, clusterID, endpoint, ca string, dialer typesDialer.Dialer) (string, error) {
decodedCA, err := base64.StdEncoding.DecodeString(ca)
if err != nil {
return "", err
......@@ -586,15 +627,16 @@ func generateSAToken(sess *session.Session, clusterID, endpoint, ca string) (str
return "", err
}
config := &rest.Config{
restConfig := &rest.Config{
Host: endpoint,
TLSClientConfig: rest.TLSClientConfig{
CAData: decodedCA,
},
BearerToken: awsToken.Token,
Dial: dialer,
}
clientset, err := kubernetes.NewForConfig(config)
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return "", fmt.Errorf("error creating clientset: %v", err)
}
......
......@@ -9,9 +9,8 @@ import (
"strings"
"time"
v32 "github.com/rancher/rancher/pkg/apis/management.cattle.io/v3"
"github.com/rancher/norman/types/slice"
v32 "github.com/rancher/rancher/pkg/apis/management.cattle.io/v3"
v3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
"github.com/rancher/rancher/pkg/tunnelserver"
"github.com/rancher/rancher/pkg/types/config"
......@@ -25,6 +24,10 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)
// WaitForAgentError is the format string of the error reported while a cluster's agent has not connected yet.
// Its single %s verb takes the cluster name.
const WaitForAgentError = "waiting for cluster [%s] agent to connect"
func NewFactory(apiContext *config.ScaledContext) (*Factory, error) {
authorizer := tunnelserver.NewAuthorizer(apiContext)
tunneler := tunnelserver.NewTunnelServer(authorizer)
......@@ -65,6 +68,21 @@ func IsCloudDriver(cluster *v3.Cluster) bool {
cluster.Status.Driver != v32.ClusterDriverRancherD
}
// IsPublicCloudDriver reports whether the cluster satisfies IsCloudDriver and also has a public API endpoint
// according to HasPublicAPIEndpoint.
func IsPublicCloudDriver(cluster *v3.Cluster) bool {
	if !IsCloudDriver(cluster) {
		return false
	}
	return HasPublicAPIEndpoint(cluster)
}
// HasPublicAPIEndpoint reports whether the cluster's API endpoint is considered public.
//
// Only EKS clusters can be reported as non-public: the result is false when the EKS upstream spec is present and its
// PublicAccess flag is explicitly set to false. A missing upstream spec or an unset flag counts as public, as does
// every other driver.
func HasPublicAPIEndpoint(cluster *v3.Cluster) bool {
	if cluster.Status.Driver != v32.ClusterDriverEKS {
		return true
	}

	upstream := cluster.Status.EKSStatus.UpstreamSpec
	if upstream == nil || upstream.PublicAccess == nil {
		// Nothing recorded yet; treat the endpoint as public.
		return true
	}
	return *upstream.PublicAccess
}
func (f *Factory) translateClusterAddress(cluster *v3.Cluster, clusterHostPort, address string) string {
if clusterHostPort != address {
logrus.Tracef("dialerFactory: apiEndpoint clusterHostPort [%s] is not equal to address [%s]", clusterHostPort, address)
......@@ -149,7 +167,7 @@ func (f *Factory) clusterDialer(clusterName, address string) (dialer.Dialer, err
hostPort := hostPort(cluster)
logrus.Tracef("dialerFactory: apiEndpoint hostPort for cluster [%s] is [%s]", clusterName, hostPort)
if (address == hostPort || isProxyAddress(address)) && IsCloudDriver(cluster) {
if (address == hostPort || isProxyAddress(address)) && IsPublicCloudDriver(cluster) {
// For cloud drivers with a public API endpoint we connect directly to the k8s API, not through the tunnel. All others go through the tunnel.
return native()
}
......@@ -214,7 +232,7 @@ func (f *Factory) clusterDialer(clusterName, address string) (dialer.Dialer, err
time.Sleep(wait.Jitter(5*time.Second, 1))
}
return nil, fmt.Errorf("waiting for cluster [%s] agent to connect", cluster.Name)
return nil, fmt.Errorf(WaitForAgentError, cluster.Name)
}
func hostPort(cluster *v3.Cluster) string {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment