Unverified commit be6d6c27, authored by Jerry R. Jackson and committed by GitHub
Browse files

Move marking an updating node earlier so we can catch ssh errors (#277)


* Move the test for an updating node earlier so we can catch ssh errors

* Also move the mark for an updating node earlier

* add reference in go.mod for release build

* Make sure to remove the updating annotation on noop upgrade

* catch failures in "Exists" as well as "Update"

* remove debugging output
Co-authored-by: Jerry Jackson <jerry@weave.works>
parent f38f7d1e
Showing with 83 additions and 58 deletions
+83 -58
......@@ -54,7 +54,7 @@ import (
const (
planKey string = "wks.weave.works/node-plan"
upgradeCountKey string = "wks.weave.works/upgrade-count"
updateCountKey string = "wks.weave.works/update-count"
maxUpgradeAttempts int = 5
masterLabel string = "node-role.kubernetes.io/master"
originalMasterLabel string = "wks.weave.works/original-master"
......@@ -111,6 +111,7 @@ type MachineActuator struct {
// Create the machine.
func (a *MachineActuator) Create(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine) error {
contextLog := log.WithFields(log.Fields{"context": ctx, "cluster": *cluster, "machine": *machine})
log.Infof("........................CREATING: %s...........................", machine.Name)
contextLog.Info("creating machine...")
if err := a.create(ctx, cluster, machine); err != nil {
contextLog.Errorf("failed to create machine: %v", err)
......@@ -466,6 +467,21 @@ func (a *MachineActuator) update(ctx context.Context, cluster *clusterv1.Cluster
if err != nil {
return err
}
addr := a.getMachineAddress(machine)
node, err := a.findNodeByPrivateAddress(addr)
if err != nil {
return gerrors.Wrapf(err, "failed to find node by address: %s", addr)
}
contextLog := log.WithFields(log.Fields{"machine": machine.Name, "cluster": cluster.Name, "node": node.Name})
updatingNode, err := a.findUpdatingNode()
if err != nil {
return err
}
if updatingNode != nil && updatingNode.Name != node.Name {
msg := "Can't update node %s; node %s is currently updating..."
contextLog.Infof(msg, node.Name, updatingNode.Name)
return fmt.Errorf(msg, node.Name, updatingNode.Name)
}
installer, closer, err := a.connectTo(machine, c, m)
if err != nil {
return gerrors.Wrapf(err, "failed to establish connection to machine %s", machine.Name)
......@@ -474,26 +490,17 @@ func (a *MachineActuator) update(ctx context.Context, cluster *clusterv1.Cluster
// Bootstrap - set plan on seed node if not present before any updates can occur
if err := a.initializeMasterPlanIfNecessary(c); err != nil {
return err
}
ids, err := installer.IDs()
if err != nil {
return gerrors.Wrapf(err, "failed to read machine %s's IDs", machine.Name)
return a.restartableFailure(node, err)
}
node, err := a.findNodeByID(ids.MachineID, ids.SystemUUID)
if err != nil {
return gerrors.Wrapf(err, "failed to find node by id: %s/%s", ids.MachineID, ids.SystemUUID)
}
contextLog := log.WithFields(log.Fields{"machine": machine.Name, "cluster": cluster.Name, "node": node.Name})
nodePlan, err := a.getNodePlan(c, machine, a.getMachineAddress(machine), installer)
if err != nil {
return gerrors.Wrapf(err, "Failed to get node plan for machine %s", machine.Name)
return a.restartableFailure(node, gerrors.Wrapf(err, "Failed to get node plan for machine %s", machine.Name))
}
planJSON := nodePlan.ToJSON()
currentPlan := node.Annotations[planKey]
if currentPlan == planJSON {
contextLog.Info("Machine and node have matching plans; nothing to do")
return nil
return a.clearUpdateCount(node)
}
if diffedPlan, err := plandiff.GetUnifiedDiff(currentPlan, planJSON); err == nil {
......@@ -507,7 +514,7 @@ func (a *MachineActuator) update(ctx context.Context, cluster *clusterv1.Cluster
nodeIsMaster := isMaster(node)
if nodeIsMaster {
if err := a.prepareForMasterUpdate(); err != nil {
return err
return a.restartableFailure(node, err)
}
}
upOrDowngrade := isUpOrDowngrade(machine, node)
......@@ -520,33 +527,25 @@ func (a *MachineActuator) update(ctx context.Context, cluster *clusterv1.Cluster
nodeStyleVersion := "v" + version
originalNeedsUpdate, err := a.checkIfOriginalMasterNotAtVersion(nodeStyleVersion)
if err != nil {
return err
return a.restartableFailure(node, err)
}
contextLog.Infof("Original needs update: %t", originalNeedsUpdate)
masterNeedsUpdate, err := a.checkIfMasterNotAtVersion(nodeStyleVersion)
if err != nil {
return err
return a.restartableFailure(node, err)
}
contextLog.Infof("Master needs update: %t", masterNeedsUpdate)
updatingNode, err := a.findUpdatingNode()
if err != nil {
return err
}
if updatingNode != nil && isMaster(updatingNode) && updatingNode != node {
contextLog.Infof("Master is updating: %s", updatingNode.Name)
return fmt.Errorf("Master %s is currently updating...", updatingNode.Name)
}
isOriginal, err := a.isOriginalMaster(node)
if err != nil {
return err
return a.restartableFailure(node, err)
}
contextLog.Infof("Is original: %t", isOriginal)
if (!isOriginal && originalNeedsUpdate) || (!nodeIsMaster && masterNeedsUpdate) || (updatingNode != nil && updatingNode != node) {
return errors.New("A higher priority node currently needs updating")
if (!isOriginal && originalNeedsUpdate) || (!nodeIsMaster && masterNeedsUpdate) {
return a.restartableFailure(node, errors.New("A higher priority node currently needs updating"))
}
isController, err := a.isControllerNode(node)
if err != nil {
return err
return a.restartableFailure(node, err)
}
contextLog.Infof("Is controller: %t", isController)
if nodeIsMaster {
......@@ -579,18 +578,21 @@ func (a *MachineActuator) update(ctx context.Context, cluster *clusterv1.Cluster
return nil
}
// restartableFailure clears the node's update-count annotation so a later
// reconcile can retry the update from scratch, then surfaces the original
// cause. If clearing the annotation itself fails, that error wins.
func (a *MachineActuator) restartableFailure(node *corev1.Node, cause error) error {
	if clearErr := a.clearUpdateCount(node); clearErr != nil {
		return clearErr
	}
	return cause
}
// clearUpdateCount marks the node as no longer updating by resetting its
// update-count annotation to the empty string.
func (a *MachineActuator) clearUpdateCount(node *corev1.Node) error {
	const cleared = "" // empty annotation value means "not updating"
	return a.setNodeAnnotation(node, updateCountKey, cleared)
}
// kubeadmUpOrDowngrade does upgrade or downgrade a machine.
// Parameter k8sversion specified here represents the version of both Kubernetes and Kubeadm.
func (a *MachineActuator) kubeadmUpOrDowngrade(machine *clusterv1.Machine, node *corev1.Node, installer *os.OS,
k8sVersion, planKey, planJSON string, ntype nodeType) error {
tooManyRetries, err := a.incUpgradeCount(node)
if err != nil {
return err
}
if tooManyRetries {
return nil
}
b := plan.NewBuilder()
b.AddResource(
"upgrade:node-unlock-kubernetes",
......@@ -658,7 +660,7 @@ func (a *MachineActuator) kubeadmUpOrDowngrade(machine *clusterv1.Machine, node
return err
}
log.Info("Finished with uncordon...")
if err = a.setNodeAnnotation(node, upgradeCountKey, ""); err != nil {
if err = a.clearUpdateCount(node); err != nil {
return err
}
......@@ -683,13 +685,6 @@ func (a *MachineActuator) performActualUpdate(
node *corev1.Node,
nodePlan *plan.Plan,
cluster *baremetalspecv1.BareMetalClusterProviderSpec) error {
tooManyRetries, err := a.incUpgradeCount(node)
if err != nil {
return err
}
if tooManyRetries {
return nil
}
if err := drain.Drain(node, a.clientSet, drain.Params{
Force: true,
DeleteLocalData: true,
......@@ -703,11 +698,7 @@ func (a *MachineActuator) performActualUpdate(
if err := a.uncordon(node); err != nil {
return err
}
if err := a.setNodeAnnotation(node, upgradeCountKey, ""); err != nil {
return err
}
return nil
return a.clearUpdateCount(node)
}
func (a *MachineActuator) getNodePlan(providerSpec *baremetalspecv1.BareMetalClusterProviderSpec, machine *clusterv1.Machine, machineAddress string, installer *os.OS) (*plan.Plan, error) {
......@@ -886,7 +877,7 @@ func (a *MachineActuator) findUpdatingNode() (*corev1.Node, error) {
}
var updatingNode *corev1.Node = nil
for _, node := range nodes {
if node.Annotations[upgradeCountKey] != "" {
if node.Annotations[updateCountKey] != "" {
if updatingNode == nil {
updatingNode = node
} else {
......@@ -1059,6 +1050,28 @@ func (a *MachineActuator) exists(ctx context.Context, cluster *clusterv1.Cluster
a.updateMachine(machine, ip)
contextLog.Infof("Updated machine: %s", ip)
}
addr := a.getMachineAddress(machine)
if node, err := a.findNodeByPrivateAddress(addr); err == nil {
contextLog := log.WithFields(log.Fields{"machine": machine.Name, "cluster": cluster.Name, "node": node.Name})
updatingNode, err := a.findUpdatingNode()
if err != nil {
return true, err
}
if updatingNode != nil && updatingNode.Name != node.Name {
msg := "Can't update node %s; node %s is currently updating..."
contextLog.Infof(msg, node.Name, updatingNode.Name)
return true, fmt.Errorf(msg, node.Name, updatingNode.Name)
}
tooManyRetries, err := a.incUpdateCount(node)
if err != nil {
return true, err
}
if tooManyRetries {
return true, nil
}
count, e := a.getUpdateCount(node)
contextLog.Infof("Update count for %s incremented to: %d, ERR: %v\n", node.Name, count, e)
}
os, closer, err := a.connectTo(machine, c, m)
if err != nil {
return false, gerrors.Wrapf(err, "failed to establish connection to machine %s", machine.Name)
......@@ -1094,26 +1107,38 @@ func (a *MachineActuator) findNodeByID(machineID, systemUUID string) (*corev1.No
return nil, apierrs.NewNotFound(schema.GroupResource{Group: "", Resource: "nodes"}, "")
}
// findNodeByPrivateAddress returns the cluster node whose private address
// matches addr, or a Kubernetes NotFound error when no node matches.
func (a *MachineActuator) findNodeByPrivateAddress(addr string) (*corev1.Node, error) {
	nodes, err := a.clientSet.CoreV1().Nodes().List(metav1.ListOptions{})
	if err != nil {
		return nil, gerrors.Wrap(err, "failed to list nodes")
	}
	// Iterate by index rather than by value: corev1.Node is a large struct,
	// and ranging by value would copy every element on each iteration.
	for i := range nodes.Items {
		if getNodePrivateAddress(&nodes.Items[i]) == addr {
			return &nodes.Items[i], nil
		}
	}
	return nil, apierrs.NewNotFound(schema.GroupResource{Group: "", Resource: "nodes"}, "")
}
var r = rand.New(rand.NewSource(time.Now().Unix()))
func (a *MachineActuator) incUpgradeCount(node *corev1.Node) (bool, error) {
count, err := a.getUpgradeCount(node)
func (a *MachineActuator) incUpdateCount(node *corev1.Node) (bool, error) {
count, err := a.getUpdateCount(node)
if err != nil {
return false, err
}
switch count {
case 0:
return false, a.setNodeAnnotation(node, upgradeCountKey, "0")
return false, a.setNodeAnnotation(node, updateCountKey, "1")
case maxUpgradeAttempts:
return true, fmt.Errorf("Maximum number of upgrade attempts exceeded for: %s", node.Name)
return true, nil
default:
return false, a.setNodeAnnotation(node, upgradeCountKey, fmt.Sprintf("%d", count+1))
return false, a.setNodeAnnotation(node, updateCountKey, fmt.Sprintf("%d", count+1))
}
}
func (a *MachineActuator) getUpgradeCount(node *corev1.Node) (int, error) {
countAnnotation := node.Annotations[upgradeCountKey]
func (a *MachineActuator) getUpdateCount(node *corev1.Node) (int, error) {
countAnnotation := node.Annotations[updateCountKey]
if countAnnotation == "" {
return 0, nil
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment