Unverified commit f84c3d38, authored by Vihang Mehta and committed by Copybara

Use k8s informers to watch resources instead of ListWatchers


Summary:
We have seen multiple reports of our operator's monitoring failing on
the k8s watcher, and restarting the watcher also fails because the resource
version (RV) is too old.

On further investigation, it seems we should be using informers to handle
resource watching, so this change switches to informers.
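
As context for the change, the informer-based pattern looks roughly like the sketch
below. This is a minimal, self-contained example against client-go, not the operator's
actual wiring; the in-cluster config, the "pl" namespace, and the print-only handlers
are illustrative assumptions. A shared informer factory does the initial list, keeps a
local cache in sync via watches, and transparently re-lists whenever a watch connection
drops, so the caller no longer tracks resource versions or handles "too old resource
version" errors by hand.

    package main

    import (
        "fmt"
        "time"

        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/cache"
    )

    func main() {
        // In-cluster config; assumes this runs in a pod with RBAC to list/watch pods.
        cfg, err := rest.InClusterConfig()
        if err != nil {
            panic(err)
        }
        clientset := kubernetes.NewForConfigOrDie(cfg)

        // Factory scoped to one namespace ("pl" is just an example here);
        // a resync period of 0 disables periodic re-delivery of cached objects.
        factory := informers.NewSharedInformerFactoryWithOptions(
            clientset, 0, informers.WithNamespace("pl"))

        podInformer := factory.Core().V1().Pods().Informer()
        podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                if pod, ok := obj.(*v1.Pod); ok {
                    fmt.Println("added:", pod.Name)
                }
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                if pod, ok := newObj.(*v1.Pod); ok {
                    fmt.Println("updated:", pod.Name)
                }
            },
            DeleteFunc: func(obj interface{}) {
                if pod, ok := obj.(*v1.Pod); ok {
                    fmt.Println("deleted:", pod.Name)
                }
            },
        })

        // Run blocks until the stop channel closes; the informer handles reconnects
        // and re-lists internally, so there is no manual RV bookkeeping.
        stopper := make(chan struct{})
        defer close(stopper)
        go podInformer.Run(stopper)

        // Wait for the initial list to be reflected in the local cache.
        if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
            panic("cache failed to sync")
        }
        time.Sleep(time.Minute) // keep the example alive briefly
    }

The diff below applies this pattern to the operator's pod, node, and PVC watchers,
replacing the hand-rolled RetryWatcher loops.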

Test Plan: skaffold deploy the operator with some additional logging

Reviewers: michelle, philkuz

Reviewed By: philkuz

JIRA Issues: PP-3066
Signed-off-by: Vihang Mehta <vihang@pixielabs.ai>

Differential Revision: https://phab.corp.pixielabs.ai/D9899

GitOrigin-RevId: 1f776f63db89e527d45d688d2d7d9be2b9a0eae0
parent 5366ed29
Showing 97 additions and 215 deletions
@@ -42,14 +42,13 @@ go_library(
         "@io_k8s_api//core/v1:core",
         "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
         "@io_k8s_apimachinery//pkg/apis/meta/v1/unstructured",
-        "@io_k8s_apimachinery//pkg/fields",
         "@io_k8s_apimachinery//pkg/runtime",
         "@io_k8s_apimachinery//pkg/types",
         "@io_k8s_apimachinery//pkg/watch",
+        "@io_k8s_client_go//informers",
         "@io_k8s_client_go//kubernetes",
         "@io_k8s_client_go//rest",
         "@io_k8s_client_go//tools/cache",
-        "@io_k8s_client_go//tools/watch",
         "@io_k8s_sigs_controller_runtime//:controller-runtime",
         "@io_k8s_sigs_controller_runtime//pkg/client",
         "@org_golang_google_grpc//:go_default_library",
...
@@ -33,12 +33,10 @@ import (
     log "github.com/sirupsen/logrus"
     "google.golang.org/grpc"
     v1 "k8s.io/api/core/v1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/cache"
-    "k8s.io/client-go/tools/watch"
     "sigs.k8s.io/controller-runtime/pkg/client"

     "px.dev/pixie/src/api/proto/cloudpb"
@@ -106,6 +104,7 @@ func (c *concurrentPodMap) write(nameLabel, k8sName string, p *podWrapper) {
 // for the overall Vizier instance.
 type VizierMonitor struct {
     clientset  kubernetes.Interface
+    factory    informers.SharedInformerFactory
     httpClient HTTPClient
     ctx        context.Context
     cancel     func()
@@ -115,7 +114,6 @@ type VizierMonitor struct {
     namespacedName types.NamespacedName

     podStates *concurrentPodMap
-    lastPodRV string

     // States from the various state-updaters, which should be aggregated into a single status.
     nodeState *vizierState
@@ -140,18 +138,16 @@ func (m *VizierMonitor) InitAndStartMonitor(cloudClient *grpc.ClientConn) error
     m.nodeState = okState()
     m.pvcState = okState()

-    err := m.initState()
-    if err != nil {
-        return err
-    }
+    m.factory = informers.NewSharedInformerFactoryWithOptions(m.clientset, 0, informers.WithNamespace(m.namespace))

-    // Watch for future updates in the namespace.
+    // Watch for pod updates in the namespace.
     go m.watchK8sPods()

     // Start PVC monitor.
     pvcStateCh := make(chan *vizierState)
     pvcW := &pvcWatcher{
         clientset: m.clientset,
+        factory:   m.factory,
         namespace: m.namespace,
         state:     pvcStateCh,
     }
@@ -160,8 +156,8 @@ func (m *VizierMonitor) InitAndStartMonitor(cloudClient *grpc.ClientConn) error
     // Start node monitor.
     nodeStateCh := make(chan *vizierState)
     nodeW := &nodeWatcher{
-        clientset: m.clientset,
+        factory: m.factory,
         state:   nodeStateCh,
     }

     go nodeW.start(m.ctx)
@@ -173,83 +169,40 @@ func (m *VizierMonitor) InitAndStartMonitor(cloudClient *grpc.ClientConn) error
     return nil
 }

-func (m *VizierMonitor) initState() error {
-    podList, err := m.clientset.CoreV1().Pods(m.namespace).List(m.ctx, metav1.ListOptions{})
-    if err != nil {
-        return err
-    }
-    m.lastPodRV = podList.ResourceVersion
-
-    // Populate vizierStates with current pod state.
-    for _, pod := range podList.Items {
-        m.handlePod(pod)
-    }
-
-    return nil
-}
-
-func (m *VizierMonitor) handlePod(p v1.Pod) {
-    // We label all of our vizier pods with a name=<componentName>.
-    // For now, this assumes no replicas. If a new pod starts up, it will replace
-    // the status of the previous pod.
-    // In the future we may add special handling for PEMs/multiple kelvins.
-    nameLabel, ok := p.ObjectMeta.Labels["name"]
-    if !ok {
-        nameLabel = ""
-    }
-    k8sName := p.ObjectMeta.Name
-
-    // Delete from our pemState if the pod is set for deletion.
-    if p.ObjectMeta.DeletionTimestamp != nil {
-        m.podStates.delete(nameLabel, k8sName)
-        return
-    }
-
-    m.podStates.write(nameLabel, k8sName, &podWrapper{pod: &p})
-}
-
-func (m *VizierMonitor) watchK8sPods() {
-    for {
-        watcher := cache.NewListWatchFromClient(m.clientset.CoreV1().RESTClient(), "pods", m.namespace, fields.Everything())
-        retryWatcher, err := watch.NewRetryWatcher(m.lastPodRV, watcher)
-        if err != nil {
-            log.WithError(err).Fatal("Could not start watcher for pods")
-        }
-
-        resCh := retryWatcher.ResultChan()
-        loop := true
-        for loop {
-            select {
-            case <-m.ctx.Done():
-                log.Info("Received cancel, stopping K8s watcher")
-                return
-            case c, ok := <-resCh:
-                if !ok {
-                    log.Info("Watcher channel closed, restarting")
-                    loop = false
-                    break
-                }
-                s, ok := c.Object.(*metav1.Status)
-                if ok && s.Status == metav1.StatusFailure {
-                    log.WithField("status", s.Status).WithField("msg", s.Message).WithField("reason", s.Reason).WithField("details", s.Details).Info("Received failure status in pod watcher")
-                    // Try to start up another watcher instance.
-                    // Sleep a second before retrying, so as not to drive up the CPU.
-                    time.Sleep(1 * time.Second)
-                    loop = false
-                    break
-                }
-
-                // Update the lastPodRV, so that if the watcher restarts, it starts at the correct resource version.
-                o, ok := c.Object.(*v1.Pod)
-                if !ok {
-                    continue
-                }
-                m.lastPodRV = o.ObjectMeta.ResourceVersion
-
-                m.handlePod(*o)
-            }
-        }
-    }
-}
+func (m *VizierMonitor) onAddPod(obj interface{}) {
+    pod, ok := obj.(*v1.Pod)
+    if !ok {
+        return
+    }
+    m.podStates.write(pod.ObjectMeta.Labels["name"], pod.ObjectMeta.Name, &podWrapper{pod: pod})
+}
+
+func (m *VizierMonitor) onUpdatePod(oldObj, newObj interface{}) {
+    pod, ok := newObj.(*v1.Pod)
+    if !ok {
+        return
+    }
+    m.podStates.write(pod.ObjectMeta.Labels["name"], pod.ObjectMeta.Name, &podWrapper{pod: pod})
+}
+
+func (m *VizierMonitor) onDeletePod(obj interface{}) {
+    pod, ok := obj.(*v1.Pod)
+    if !ok {
+        return
+    }
+    m.podStates.delete(pod.ObjectMeta.Labels["name"], pod.ObjectMeta.Name)
+}
+
+func (m *VizierMonitor) watchK8sPods() {
+    informer := m.factory.Core().V1().Pods().Informer()
+    stopper := make(chan struct{})
+    defer close(stopper)
+    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    m.onAddPod,
+        UpdateFunc: m.onUpdatePod,
+        DeleteFunc: m.onDeletePod,
+    })
+    informer.Run(stopper)
+}

 // vizierState details the state of Vizier at a snapshot.
...
@@ -21,17 +21,12 @@ package controllers
 import (
     "context"
     "strings"
-    "time"

     "github.com/blang/semver"
     log "github.com/sirupsen/logrus"
     v1 "k8s.io/api/core/v1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/fields"
-    apiwatch "k8s.io/apimachinery/pkg/watch"
-    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/informers"
     "k8s.io/client-go/tools/cache"
-    "k8s.io/client-go/tools/watch"

     "px.dev/pixie/src/shared/status"
 )
@@ -122,10 +117,9 @@ func (n *nodeCompatTracker) state() *vizierState {
 // NodeWatcher is responsible for tracking the nodes from the K8s API and using the NodeInfo to determine
 // whether or not Pixie can successfully collect data on the cluster.
 type nodeWatcher struct {
-    clientset kubernetes.Interface
+    factory informers.SharedInformerFactory

     compatTracker nodeCompatTracker
-    lastRV string

     state chan<- *vizierState
 }
@@ -135,70 +129,41 @@ func (nw *nodeWatcher) start(ctx context.Context) {
         incompatibleCount: 0.0,
         nodeCompatible:    make(map[string]bool),
     }

-    nodeList, err := nw.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
-    if err != nil {
-        log.WithError(err).Fatal("Could not list nodes")
-    }
-    nw.lastRV = nodeList.ResourceVersion
-
-    for i := range nodeList.Items {
-        nw.compatTracker.addNode(&nodeList.Items[i])
-    }
-    nw.state <- nw.compatTracker.state()
-
-    // Start watcher.
-    nw.watchNodes(ctx)
-}
-
-func (nw *nodeWatcher) watchNodes(ctx context.Context) {
-    for {
-        watcher := cache.NewListWatchFromClient(nw.clientset.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything())
-        retryWatcher, err := watch.NewRetryWatcher(nw.lastRV, watcher)
-        if err != nil {
-            log.WithError(err).Fatal("Could not start watcher for nodes")
-        }
-
-        resCh := retryWatcher.ResultChan()
-        loop := true
-        for loop {
-            select {
-            case <-ctx.Done():
-                log.Info("Received cancel, stopping K8s watcher")
-                return
-            case c, ok := <-resCh:
-                if !ok {
-                    loop = false
-                    break
-                }
-                s, ok := c.Object.(*metav1.Status)
-                if ok && s.Status == metav1.StatusFailure {
-                    log.WithField("status", s.Status).WithField("msg", s.Message).WithField("reason", s.Reason).WithField("details", s.Details).Info("Received failure status in node watcher")
-                    // Try to start up another watcher instance.
-                    // Sleep a second before retrying, so as not to drive up the CPU.
-                    time.Sleep(1 * time.Second)
-                    loop = false
-                    break
-                }
-
-                node, ok := c.Object.(*v1.Node)
-                if !ok {
-                    continue
-                }
-                // Update the lastRV, so that if the watcher restarts, it starts at the correct resource version.
-                nw.lastRV = node.ObjectMeta.ResourceVersion
-
-                switch c.Type {
-                case apiwatch.Added:
-                    nw.compatTracker.addNode(node)
-                case apiwatch.Modified:
-                    nw.compatTracker.updateNode(node)
-                case apiwatch.Deleted:
-                    nw.compatTracker.removeNode(node)
-                }
-                nw.state <- nw.compatTracker.state()
-            }
-        }
-    }
-}
+    informer := nw.factory.Core().V1().Nodes().Informer()
+    stopper := make(chan struct{})
+    defer close(stopper)
+    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    nw.onAdd,
+        UpdateFunc: nw.onUpdate,
+        DeleteFunc: nw.onDelete,
+    })
+    informer.Run(stopper)
+}
+
+func (nw *nodeWatcher) onAdd(obj interface{}) {
+    node, ok := obj.(*v1.Node)
+    if !ok {
+        return
+    }
+    nw.compatTracker.addNode(node)
+    nw.state <- nw.compatTracker.state()
+}
+
+func (nw *nodeWatcher) onUpdate(oldObj, newObj interface{}) {
+    node, ok := newObj.(*v1.Node)
+    if !ok {
+        return
+    }
+    nw.compatTracker.updateNode(node)
+    nw.state <- nw.compatTracker.state()
+}
+
+func (nw *nodeWatcher) onDelete(obj interface{}) {
+    node, ok := obj.(*v1.Node)
+    if !ok {
+        return
+    }
+    nw.compatTracker.removeNode(node)
+    nw.state <- nw.compatTracker.state()
+}
@@ -20,16 +20,12 @@ package controllers

 import (
     "context"
-    "time"

-    log "github.com/sirupsen/logrus"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/fields"
-    apiwatch "k8s.io/apimachinery/pkg/watch"
+    "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/cache"
-    "k8s.io/client-go/tools/watch"

     "px.dev/pixie/src/shared/status"
 )
@@ -60,7 +56,7 @@ func metadataPVCState(clientset kubernetes.Interface, pvc *v1.PersistentVolumeCl
 // whether or not Pixie can successfully run the metadata service on this cluster.
 type pvcWatcher struct {
     clientset kubernetes.Interface
-    lastRV    string
+    factory   informers.SharedInformerFactory

     namespace string
@@ -68,73 +64,42 @@ type pvcWatcher struct {
 }

 func (pw *pvcWatcher) start(ctx context.Context) {
-    pvcs, err := pw.clientset.CoreV1().PersistentVolumeClaims(pw.namespace).List(ctx, metav1.ListOptions{})
-    if err != nil {
-        log.WithError(err).Fatal("Could not list pvcs")
-    }
-    pw.lastRV = pvcs.ResourceVersion
-
-    var pvc *v1.PersistentVolumeClaim
-    for i := range pvcs.Items {
-        if pvcs.Items[i].Name == metadataPVC {
-            pvc = &pvcs.Items[i]
-            break
-        }
-    }
-
-    pw.state <- metadataPVCState(pw.clientset, pvc)
-
-    // Start watcher.
-    pw.watchPVCs(ctx)
-}
-
-func (pw *pvcWatcher) watchPVCs(ctx context.Context) {
-    for {
-        watcher := cache.NewListWatchFromClient(pw.clientset.CoreV1().RESTClient(), "persistentvolumeclaims", pw.namespace, fields.Everything())
-        retryWatcher, err := watch.NewRetryWatcher(pw.lastRV, watcher)
-        if err != nil {
-            log.WithError(err).Fatal("Could not start watcher for PVCs")
-        }
-
-        resCh := retryWatcher.ResultChan()
-        loop := true
-        for loop {
-            select {
-            case <-ctx.Done():
-                log.Info("Received cancel, stopping K8s watcher")
-                return
-            case c, ok := <-resCh:
-                if !ok {
-                    loop = false
-                    break
-                }
-                s, ok := c.Object.(*metav1.Status)
-                if ok && s.Status == metav1.StatusFailure {
-                    log.WithField("status", s.Status).WithField("msg", s.Message).WithField("reason", s.Reason).WithField("details", s.Details).Info("Received failure status in PVC watcher")
-                    // Try to start up another watcher instance.
-                    // Sleep a second before retrying, so as not to drive up the CPU.
-                    time.Sleep(1 * time.Second)
-                    loop = false
-                    break
-                }
-
-                pvc, ok := c.Object.(*v1.PersistentVolumeClaim)
-                if !ok {
-                    continue
-                }
-                // Update the lastRV, so that if the watcher restarts, it starts at the correct resource version.
-                pw.lastRV = pvc.ObjectMeta.ResourceVersion
-
-                if pvc.Name != metadataPVC {
-                    continue
-                }
-                switch c.Type {
-                case apiwatch.Added, apiwatch.Modified:
-                    pw.state <- metadataPVCState(pw.clientset, pvc)
-                case apiwatch.Deleted:
-                    pw.state <- metadataPVCState(pw.clientset, nil)
-                }
-            }
-        }
-    }
-}
+    informer := pw.factory.Core().V1().PersistentVolumeClaims().Informer()
+    stopper := make(chan struct{})
+    defer close(stopper)
+    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    pw.onAdd,
+        UpdateFunc: pw.onUpdate,
+        DeleteFunc: pw.onDelete,
+    })
+    informer.Run(stopper)
+}
+
+func (pw *pvcWatcher) isMetadataPVC(obj interface{}) bool {
+    pvc, ok := obj.(*v1.PersistentVolumeClaim)
+    if !ok {
+        return false
+    }
+    return pvc.Namespace == pw.namespace && pvc.Name == metadataPVC
+}
+
+func (pw *pvcWatcher) onAdd(obj interface{}) {
+    if !pw.isMetadataPVC(obj) {
+        return
+    }
+    pw.state <- metadataPVCState(pw.clientset, obj.(*v1.PersistentVolumeClaim))
+}
+
+func (pw *pvcWatcher) onUpdate(oldObj, newObj interface{}) {
+    if !pw.isMetadataPVC(newObj) {
+        return
+    }
+    pw.state <- metadataPVCState(pw.clientset, newObj.(*v1.PersistentVolumeClaim))
+}
+
+func (pw *pvcWatcher) onDelete(obj interface{}) {
+    if !pw.isMetadataPVC(obj) {
+        return
+    }
+    pw.state <- metadataPVCState(pw.clientset, nil)
+}