Commit b2b4a2ea authored by Mike Cutalo's avatar Mike Cutalo
Browse files

cachelessinfromer

No related merge requests found
Showing with 48 additions and 37 deletions
+48 -37
......@@ -52,67 +52,37 @@ func (s *svc) StartTopologyCaching(ctx context.Context) (<-chan *topologyv1.Upda
}
func (s *svc) startInformers(ctx context.Context, cs ContextClientset) {
// factory := informers.NewSharedInformerFactoryWithOptions(cs, informerResyncTime)
stop := make(chan struct{})
// podInformer := factory.Core().V1().Pods().Informer()
// deploymentInformer := factory.Apps().V1().Deployments().Informer()
// hpaInformer := factory.Autoscaling().V1().HorizontalPodAutoscalers().Informer()
informerHandlers := cache.ResourceEventHandlerFuncs{
AddFunc: s.informerAddHandler,
UpdateFunc: s.informerUpdateHandler,
DeleteFunc: s.informerDeleteHandler,
}
// podInformer.AddEventHandler(informerHandlers)
// deploymentInformer.AddEventHandler(informerHandlers)
// hpaInformer.AddEventHandler(informerHandlers)
// _, podInformer := cache.NewInformer(
// cache.NewListWatchFromClient(cs.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything()),
// &v1.Pod{},
// informerResyncTime,
// informerHandlers,
// )
// _, deploymentInformer := cache.NewInformer(
// cache.NewListWatchFromClient(cs.AppsV1().RESTClient(), "deployments", v1.NamespaceAll, fields.Everything()),
// &appsv1.Deployment{},
// informerResyncTime,
// informerHandlers,
// )
// _, hpaInformer := cache.NewInformer(
// cache.NewListWatchFromClient(cs.AutoscalingV1().RESTClient(), "horizontalpodautoscalers", v1.NamespaceAll, fields.Everything()),
// &autoscalingv1.HorizontalPodAutoscaler{},
// informerResyncTime,
// informerHandlers,
// )
_, podInformer := cache.NewIndexerInformer(
podInformer := NewCachelessInformer(
cs,
cache.NewListWatchFromClient(cs.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything()),
&v1.Pod{},
informerResyncTime,
informerHandlers,
cache.Indexers{},
)
_, deploymentInformer := cache.NewIndexerInformer(
deploymentInformer := NewCachelessInformer(
cs,
cache.NewListWatchFromClient(cs.AppsV1().RESTClient(), "deployments", v1.NamespaceAll, fields.Everything()),
&appsv1.Deployment{},
informerResyncTime,
informerHandlers,
cache.Indexers{},
)
_, hpaInformer := cache.NewIndexerInformer(
hpaInformer := NewCachelessInformer(
cs,
cache.NewListWatchFromClient(cs.AutoscalingV1().RESTClient(), "horizontalpodautoscalers", v1.NamespaceAll, fields.Everything()),
&autoscalingv1.HorizontalPodAutoscaler{},
informerResyncTime,
informerHandlers,
cache.Indexers{},
)
stop := make(chan struct{})
go podInformer.Run(stop)
go deploymentInformer.Run(stop)
go hpaInformer.Run(stop)
......
package k8s
import (
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
func NewCachelessInformer(
cs ContextClientset,
lw cache.ListerWatcher,
objType runtime.Object,
resync time.Duration,
h cache.ResourceEventHandler,
) cache.Controller {
fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
// just to satisify the interface were not going to use this
KnownObjects: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}),
EmitDeltaTypeReplaced: true,
})
return cache.New(&cache.Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resync,
RetryOnError: false,
Process: func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
switch d.Type {
case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
h.OnAdd(d.Object)
case cache.Deleted:
h.OnDelete(d.Object)
}
}
return nil
},
})
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment