Unverified Commit d41c5db2 authored by Jun Du's avatar Jun Du Committed by GitHub
Browse files

Merge pull request #193 from m1093782566/cloud-part

Open source the cloud part of kubeedge
parents de85efa0 5ec1d19d
Showing with 1270 additions and 20 deletions
+1270 -20
......@@ -31,3 +31,5 @@ jobs:
name: "integration test edge" # names the fourth Tests stage job
- script: make edge_cross_build
name: "cross build edge" # names the fifth Tests stage job
- script: make edgecontroller
name: "build edgecontroller" # names the sixth Tests stage job
......@@ -164,14 +164,6 @@
revision = "2ee87856327ba09384cabd113bc6b5d174e9ec0f"
version = "v3.5.1"
[[projects]]
digest = "1:d8abdc866ebbe05fa3bce50863e23afd4c73c804b90c908caa864e86df1db8a1"
name = "github.com/bouk/monkey"
packages = ["."]
pruneopts = "UT"
revision = "5df1f207ff77e025801505ae4d903133a0b4353f"
version = "v1.0.0"
[[projects]]
branch = "master"
digest = "1:1ed8f94f16010f9e1a419a818ec8ed5765d04561e6bfcc872e3a5a16fbae08b2"
......@@ -623,6 +615,14 @@
pruneopts = "UT"
revision = "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
[[projects]]
branch = "master"
digest = "1:0778dc7fce1b4669a8bfa7ae506ec1f595b6ab0f8989c1c0d22a8ca1144e9972"
name = "github.com/howeyc/gopass"
packages = ["."]
pruneopts = "UT"
revision = "bf9dde6d0d2c004a008c27aaee91170c786f6db8"
[[projects]]
digest = "1:a1038ef593beb4771c8f0f9c26e8b00410acd800af5c6864651d9bf160ea1813"
name = "github.com/hpcloud/tail"
......@@ -637,6 +637,14 @@
revision = "a30252cb686a21eb2d0b98132633053ec2f7f1e5"
version = "v1.0.0"
[[projects]]
digest = "1:a0cefd27d12712af4b5018dc7046f245e1e3b5760e2e848c30b171b570708f9b"
name = "github.com/imdario/mergo"
packages = ["."]
pruneopts = "UT"
revision = "7c29201646fa3de8506f701213473dd407f19646"
version = "v0.3.7"
[[projects]]
digest = "1:e22af8c7518e1eab6f2eab2b7d7558927f816262586cd6ed9f349c97a6c285c4"
name = "github.com/jmespath/go-jmespath"
......@@ -1345,7 +1353,7 @@
version = "kubernetes-1.10.9"
[[projects]]
digest = "1:c07b934022f532afe3a492db0c8582fd6145ac520433e254c24f9d22c48fe7d7"
digest = "1:ae349bea8ee6d5d3150688376e1c7be277720c28e460f2ba9490a46cc237131e"
name = "k8s.io/client-go"
packages = [
"discovery",
......@@ -1480,8 +1488,12 @@
"rest",
"rest/watch",
"testing",
"tools/auth",
"tools/cache",
"tools/clientcmd",
"tools/clientcmd/api",
"tools/clientcmd/api/latest",
"tools/clientcmd/api/v1",
"tools/metrics",
"tools/pager",
"tools/record",
......@@ -1493,6 +1505,7 @@
"util/cert",
"util/exec",
"util/flowcontrol",
"util/homedir",
"util/integer",
"util/retry",
"util/workqueue",
......@@ -1687,7 +1700,6 @@
input-imports = [
"github.com/256dpi/gomqtt/broker",
"github.com/256dpi/gomqtt/packet",
"github.com/256dpi/gomqtt/session",
"github.com/256dpi/gomqtt/topic",
"github.com/256dpi/gomqtt/transport",
"github.com/ServiceComb/go-archaius",
......@@ -1695,7 +1707,6 @@
"github.com/ServiceComb/paas-lager",
"github.com/ServiceComb/paas-lager/rotate",
"github.com/astaxie/beego/orm",
"github.com/bouk/monkey",
"github.com/docker/distribution/reference",
"github.com/docker/docker/api/types",
"github.com/docker/docker/api/types/container",
......@@ -1707,6 +1718,7 @@
"github.com/eclipse/paho.mqtt.golang",
"github.com/go-chassis/go-archaius/core",
"github.com/go-chassis/paas-lager/third_party/forked/cloudfoundry/lager",
"github.com/golang/glog",
"github.com/golang/mock/gomock",
"github.com/gorilla/mux",
"github.com/gorilla/websocket",
......@@ -1720,8 +1732,10 @@
"gopkg.in/yaml.v2",
"k8s.io/api/core/v1",
"k8s.io/apimachinery/pkg/api/equality",
"k8s.io/apimachinery/pkg/api/errors",
"k8s.io/apimachinery/pkg/api/resource",
"k8s.io/apimachinery/pkg/apis/meta/v1",
"k8s.io/apimachinery/pkg/fields",
"k8s.io/apimachinery/pkg/runtime",
"k8s.io/apimachinery/pkg/types",
"k8s.io/apimachinery/pkg/util/clock",
......@@ -1729,10 +1743,13 @@
"k8s.io/apimachinery/pkg/util/runtime",
"k8s.io/apimachinery/pkg/util/sets",
"k8s.io/apimachinery/pkg/util/wait",
"k8s.io/apimachinery/pkg/watch",
"k8s.io/apiserver/pkg/util/feature",
"k8s.io/client-go/kubernetes",
"k8s.io/client-go/kubernetes/fake",
"k8s.io/client-go/rest",
"k8s.io/client-go/tools/cache",
"k8s.io/client-go/tools/clientcmd",
"k8s.io/client-go/util/flowcontrol",
"k8s.io/client-go/util/workqueue",
"k8s.io/kubernetes/pkg/cloudprovider",
......
......@@ -20,3 +20,8 @@ edge_integration_test:
.PHONY: edge_cross_build
edge_cross_build:
cd edge && $(MAKE) cross_build
.PHONY: edgecontroller
edgecontroller:
cd cloud/edgecontroller && $(MAKE)
......@@ -7,21 +7,21 @@
<img src="./docs/images/KubeEdge_logo.png">
KubeEdge is an open source system extending native containerized application orchestration and device management to hosts at the Edge. It is built upon Kubernetes and provides core infrastructure support for networking, application deployment and metadata synchronization between cloud and edge. It also supports **MQTT** and allows developers to author custom logic and enable resource constrained device communication at the Edge. Kubeedge consists of a cloud part and an edge part. The edge part has already been open sourced and the cloud part is coming soon!
KubeEdge is an open source system extending native containerized application orchestration and device management to hosts at the Edge. It is built upon Kubernetes and provides core infrastructure support for networking, application deployment and metadata synchronization between cloud and edge. It also supports **MQTT** and allows developers to author custom logic and enable resource constrained device communication at the Edge. Kubeedge consists of a cloud part and an edge part.
## Advantages
#### Edge Computing
With business logic running at the Edge, much larger volumes of data can be secured & processed locally where the data is produced. This reduces the network bandwidth requirements and consumption between Edge and Cloud. This increases responsiveness, decreases costs, and protects customers' data privacy.
With business logic running at the Edge, much larger volumes of data can be secured & processed locally where the data is produced. Edge nodes can run autonomously which effectively reduces the network bandwidth requirements and consumptions between Edge and Cloud. With data processed at the Edge, the responsiveness is increased dramatically and data privacy is protected.
#### Simplified development
Developers can write regular http or mqtt based applications, containerize these, and run them anywhere - either at the Edge or in the Cloud - whichever is more appropriate.
Developers can write regular http or mqtt based applications, containerize them, and run them anywhere - either at the Edge or in the Cloud - whichever is more appropriate.
#### Kubernetes-native support
With KubeEdge, users can orchestrate apps, manage devices and monitor app and device status on Edge nodes just like a traditional Kubernetes cluster in the Cloud
With KubeEdge, users can orchestrate apps, manage devices and monitor app and device status on Edge nodes just like a traditional Kubernetes cluster in the Cloud. Locations of edge nodes are transparent to customers.
#### Abundant applications
......@@ -33,6 +33,8 @@ KubeEdge is composed of the following components:
- **Edged:** an agent that runs on edge nodes and manages containerized applications.
- **EdgeHub:** a web socket client responsible for interacting with Cloud Service for the edge computing (like Edge Controller as in the KubeEdge Architecture). This includes syncing cloud-side resource updates to the edge, and reporting edge-side host and device status changes to the cloud.
- **CloudHub:** A web socket server responsible for watching changes at the cloud side, caching and sending messages to EdgeHub.
- **EdgeController:** an extended kubernetes controller which manages edge nodes and pods metadata so that the data can be targeted to a specific edge node.
- **EventBus:** an MQTT client to interact with MQTT servers (mosquitto), offering publish and subscribe capabilities to other components.
- **DeviceTwin:** responsible for storing device status and syncing device status to the cloud. It also provides query interfaces for applications.
- **MetaManager:** the message processor between edged and edgehub. It is also responsible for storing/retrieving metadata to/from a lightweight database (SQLite).
......@@ -45,7 +47,7 @@ KubeEdge is composed of the following components:
### Release 1.0
KubeEdge will provide the fundamental infrastructure and basic functionality for IOT/Edge workloads. This includes:
- An open source implementation of the cloud part.
- An open source implementation of the cloud and edge parts.
- Kubernetes application deployment through kubectl from Cloud to Edge nodes.
- Kubernetes configmap and secret deployment through kubectl from Cloud to Edge nodes and their applications.
- Bi-directional multiplexed network communication between Cloud and Edge nodes.
......@@ -54,8 +56,9 @@ KubeEdge will provide the fundamental infrastructure and basic functionality for
- Device twin and MQTT protocol for communication between IOT devices and Edge nodes.
### Release 2.0 and the Future
- Istio-based service mesh across Edge and Cloud.
- Enable function as a service at the Edge
- Istio-based service mesh across Edge and Cloud where micro-services can communicate freely in the mesh.
- Enhance performance and reliability of KubeEdge infrastructure.
- Enable function as a service at the Edge.
- Support more types of device protocols to Edge nodes such as AMQP, BlueTooth, ZigBee, etc.
- Evaluate and enable much larger scale Edge clusters with thousands of Edge nodes and millions of devices.
- Enable intelligent scheduling of applications to large scale Edge clusters.
......@@ -98,6 +101,63 @@ yum-config-manager \
https://download.docker.com/linux/centos/docker-ce.repo
yum update && yum install docker-ce-18.06.1.ce
```
KubeEdge's Cloud(edgecontroller) connects to Kubernetes master to sync updates of node/pod status. If you don't have Kubernetes setup, please follow these steps to install Kubernetes using kubeadm
#### Install kubeadm/kubectl
For Ubuntu:
```shell
apt-get update && apt-get install -y apt-transport-https curl
curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
cat <<EOF >/etc/apt/sources.list.d/kubernetes.list
deb https://apt.kubernetes.io/ kubernetes-xenial main
EOF
apt-get update
apt-get install -y kubelet kubeadm kubectl
apt-mark hold kubelet kubeadm kubectl
```
For CentOS:
```shell
at <<EOF > /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=https://packages.cloud.google.com/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=1
repo_gpgcheck=1
gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg
exclude=kube*
EOF
# Set SELinux in permissive mode (effectively disabling it)
setenforce 0
sed -i 's/^SELINUX=enforcing$/SELINUX=permissive/' /etc/selinux/config
yum install -y kubelet kubeadm kubectl --disableexcludes=kubernetes
systemctl enable --now kubelet
```
#### Install Kubernetes
To initialize Kubernetes master, follow the below step:
```shell
kubeadm init
```
After initializing Kubernetes master, we need to expose insecure port 8080 for edgecontroller/kubectl to work with http connection to api-server
Please follow below steps to enable http port in apiserver
```shell
vi /etc/kubernetes/manifests/kube-apiserver.yaml
# Add the following flags in spec: containers: -command section
- --insecure-port=8080
- --insecure-bind-address=0.0.0.0
```
KubeEdge uses MQTT for communication between deviceTwin and devices. KubeEdge supports 3 MQTT modes:
1) internalMqttMode: internal mqtt broker is enabled
2) bothMqttMode: internal as well as external broker are enabled
......@@ -122,26 +182,86 @@ yum install mosquitto
See [mosquitto official website](https://mosquitto.org/download/) for more information.
### Build Edge
KubeEdge has certificate based authentication/authorization between cloud and edge. Certificates can be generated using openssl. Please follow the steps below to generate certificates.
#### Install openssl
If openssl is not already present using below command to install openssl
```shell
apt-get install openssl
```
#### Generate Certificates
RootCA certificate and a cert/key pair is required to have a setup for KubeEdge. Same cert/key pair can be used in both cloud and edge.
```shell
# Generete Root Key
openssl genrsa -des3 -out rootCA.key 4096
# Generate Root Certificate
openssl req -x509 -new -nodes -key rootCA.key -sha256 -days 1024 -out rootCA.crt
# Generate Key
openssl genrsa -out kubeedge.key 2048
# Generate csr, Fill required details after running the command
openssl req -new -key kubeedge.key -out kubeedge.csr
# Generate Certificate
openssl x509 -req -in kubeedge.csr -CA rootCA.crt -CAkey rootCA.key -CAcreateserial -out kubeedge.crt -days 500 -sha256
```
### Clone KubeEdge
Clone KubeEdge
```shell
git clone https://github.com/kubeedge/kubeedge.git $GOPATH/src/github.com/kubeedge/kubeedge
cd $GOPATH/src/github.com/kubeedge/kubeedge
```
### Build Cloud
```shell
cd $GOPATH/src/github.com/kubeedge/kubeedge/cloud/edgecontroller
make # or `make edgecontroller`
```
### Build Edge
```shell
cd $GOPATH/src/github.com/kubeedge/kubeedge/edge
make # or `make edge_core`
make # or `make edgecontroller`
```
KubeEdge can also be cross compiled to run on ARM based processors.
Please click [Cross Compilation](docs/setup/cross-compilation.md) for the instructions.
## Run KubeEdge
### Run Cloud
```shell
cd $GOPATH/src/github.com/kubeedge/kubeedge/cloud/edgecontroller
# run edge controller
# `conf/` should be in the same directory as the binary
# verify the configurations before running cloud(edgecontroller)
./edgecontroller
```
### Run Edge
We have provided a sample node.json to add a node in kubernetes. Please make sure edge-node is added in kubernetes. Run below steps to add edge-node
```shell
kubectl apply -f $GOPATH/src/github.com/kubeedge/kubeedge/build/node.json
```
Run Edge
```shell
# run mosquitto
mosquitto -d -p 1883
# run edge_core
# `conf/` should be in the same directory as the binary
# verify the configurations before running edge(edge_core)
./edge_core
# or
nohup ./edge_core > edge_core.log 2>&1 &
......@@ -149,7 +269,13 @@ nohup ./edge_core > edge_core.log 2>&1 &
If you are using HuaweiCloud IEF, then the edge node you created should be running (check it in the IEF console page).
### Deploy Application
Try out a sample application deployment by following below steps
```shell
kubectl apply -f $GOPATH/src/github.com/kubeedge/kubeedge/build/deployment.yaml
```
### Run Edge Unit Tests
```shell
......
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
labels:
app: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 90
hostPort: 90
{
"kind": "Node",
"apiVersion": "v1",
"metadata": {
"name": "fb4ebb70-2783-42b8-b3ef-63e2fd6d242e",
"labels": {
"name": "edge-node"
}
}
}
This section contains the source code for KubeEdge cloud side components
## KubeEdge Cloud
At the cloud side, there are two major components: EdgeController and CloudHub.
EdgeController is an extended Kubernetes controller. It watches nodes and pods against APIServer for the cluster.
Upon changes in nodes/pods, KubeEdge will convert the pod/node binding info. in the format of node -- pods.
This way, an edge node can obtain pods targeted for itself. It enhances efficiency and reduces the network bandwidth requirement between cloud & edge.
# make edgecontroller
.PHONY: default edgecontroller
edgecontroller:
go build cmd/edgecontroller.go
package main
import (
_ "github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/cloudhub"
_ "github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/controller"
"github.com/kubeedge/kubeedge/common/beehive/pkg/core"
)
func main() {
core.Run()
}
controller:
kube:
master: http://localhost:8080
namespace: ""
content_type: "application/vnd.kubernetes.protobuf"
qps: 5
burst: 10
node_update_frequency: 10
cloudhub:
address: 0.0.0.0
port: 10000
ca: tmp/rootCA.crt
cert: tmp/test.crt
key: tmp/test.key
keepalive-interval: 30
write-timeout: 30
node-limit: 10
loggerLevel: "INFO"
enableRsyslog: false
logFormatText: true
writers: [file,stdout]
loggerFile: "edgecontroller.log"
modules:
enabled: [controller, cloudhub]
package channelq
import (
"fmt"
"strings"
"sync"
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/cloudhub/common/model"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/log"
"github.com/kubeedge/kubeedge/common/beehive/pkg/core/context"
)
// Read channel buffer size
const (
rChanBufSize = 10
)
// EventSet holds a set of events
type EventSet interface {
Ack() error
Get() (*model.Event, error)
}
// ChannelEventSet is the channel implementation of EventSet
type ChannelEventSet struct {
current model.Event
messages <-chan model.Event
}
// NewChannelEventSet initializes a new ChannelEventSet instance
func NewChannelEventSet(messages <-chan model.Event) *ChannelEventSet {
return &ChannelEventSet{messages: messages}
}
// Ack acknowledges once the event is processed
func (s *ChannelEventSet) Ack() error {
return nil
}
// Get obtains one event from the queue
func (s *ChannelEventSet) Get() (*model.Event, error) {
var ok bool
s.current, ok = <-s.messages
if !ok {
return nil, fmt.Errorf("failed to get message from cluster, reason: channel is closed")
}
return &s.current, nil
}
// ChannelEventQueue is the channel implementation of EventQueue
type ChannelEventQueue struct {
ctx *context.Context
channelPool sync.Map
}
// NewChannelEventQueue initializes a new ChannelEventQueue
func NewChannelEventQueue(ctx *context.Context) (*ChannelEventQueue, error) {
q := ChannelEventQueue{ctx: ctx}
go q.dispatchMessage()
return &q, nil
}
// dispatchMessage gets the message from the cloud , extracts the
// node id from it , gets the channel associated with the node
// and pushes the event on the channel
func (q *ChannelEventQueue) dispatchMessage() {
for {
msg, err := q.ctx.Receive("cloudhub")
if err != nil {
log.LOGGER.Infof("receive not Message format message")
continue
}
resource := msg.Router.Resource
tokens := strings.Split(resource, "/")
var nodeID string
for i, token := range tokens {
if token == "node" && i+1 < len(token) {
nodeID = tokens[i+1]
}
}
if nodeID == "" {
log.LOGGER.Warnf("node id is not found in the message")
continue
}
rChannel, err := q.getRChannel(nodeID)
if err != nil {
log.LOGGER.Infof("fail to get dispatch channel for %s", nodeID)
continue
}
event := model.MessageToEvent(&msg)
select {
case rChannel <- event:
default:
}
}
}
func (q *ChannelEventQueue) getRChannel(nodeID string) (chan model.Event, error) {
channels, ok := q.channelPool.Load(nodeID)
if !ok {
log.LOGGER.Errorf("rChannel for edge node %s is removed", nodeID)
return nil, fmt.Errorf("rChannel not found")
}
rChannel := channels.(chan model.Event)
return rChannel, nil
}
// Connect allocates rChannel for given project and group
func (q *ChannelEventQueue) Connect(info *model.HubInfo) error {
_, ok := q.channelPool.Load(info.NodeID)
if ok {
return fmt.Errorf("edge node %s is already connected", info.NodeID)
}
// allocate a new rchannel with default buffer size
rChannel := make(chan model.Event, rChanBufSize)
_, ok = q.channelPool.LoadOrStore(info.NodeID, rChannel)
if ok {
// rchannel is already allocated
return fmt.Errorf("edge node %s is already connected", info.NodeID)
}
return nil
}
// Close closes rChannel for given project and group
func (q *ChannelEventQueue) Close(info *model.HubInfo) error {
channels, ok := q.channelPool.Load(info.NodeID)
if !ok {
log.LOGGER.Warnf("rChannel for edge node %s is already removed", info.NodeID)
return nil
}
rChannel := channels.(chan model.Event)
close(rChannel)
q.channelPool.Delete(info.NodeID)
return nil
}
// Publish sends message via the rchannel to Edge Controller
func (q *ChannelEventQueue) Publish(info *model.HubInfo, event *model.Event) error {
msg := model.EventToMessage(event)
q.ctx.Send("controller", msg)
return nil
}
// Consume retrieves message from the rChannel for given project and group
func (q *ChannelEventQueue) Consume(info *model.HubInfo) (EventSet, error) {
rChannel, err := q.getRChannel(info.NodeID)
if err != nil {
return nil, err
}
return NewChannelEventSet((<-chan model.Event)(rChannel)), nil
}
// Workload returns the number of queue channels connected to queue
func (q *ChannelEventQueue) Workload() (float64, error) {
return 1, nil
}
package cloudhub
import (
"crypto/x509"
"fmt"
"io/ioutil"
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/cloudhub/channelq"
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/cloudhub/common/util"
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/cloudhub/wsserver"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/config"
"github.com/kubeedge/kubeedge/common/beehive/pkg/core"
"github.com/kubeedge/kubeedge/common/beehive/pkg/core/context"
)
type cloudHub struct {
context *context.Context
}
func init() {
core.Register(&cloudHub{})
}
func (a *cloudHub) Name() string {
return "cloudhub"
}
func (a *cloudHub) Group() string {
return "cloudhub"
}
func (a *cloudHub) Start(c *context.Context) {
a.context = c
var err error
caI := config.CONFIG.GetConfigurationByKey("cloudhub.ca")
certI := config.CONFIG.GetConfigurationByKey("cloudhub.cert")
keyI := config.CONFIG.GetConfigurationByKey("cloudhub.key")
util.HubConfig.Ca, err = ioutil.ReadFile(caI.(string))
if err != nil {
panic(err)
}
util.HubConfig.Cert, err = ioutil.ReadFile(certI.(string))
if err != nil {
panic(err)
}
util.HubConfig.Key, err = ioutil.ReadFile(keyI.(string))
if err != nil {
panic(err)
}
// init filter
pool := x509.NewCertPool()
ok := pool.AppendCertsFromPEM(util.HubConfig.Ca)
if !ok {
panic(fmt.Errorf("fail to load ca content"))
}
eventq, err := channelq.NewChannelEventQueue(c)
// start the cloudhub server
wsserver.StartCloudHub(util.HubConfig, eventq)
wsserver.EventHandler.Context = c
stopchan := make(chan bool)
<-stopchan
}
func (a *cloudHub) Cleanup() {
a.context.Cleanup(a.Name())
}
package model
import (
_ "encoding/json"
"fmt"
"strings"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/log"
"github.com/kubeedge/kubeedge/common/beehive/pkg/core/model"
)
// constants for resource types
const (
ResNode = "node"
ResMember = "membership"
ResTwin = "twin"
ResAuth = "auth_info"
ResDevice = "device"
)
// constants for resource operations
const (
OpGet = "get"
OpResult = "get_result"
OpList = "list"
OpDetail = "detail"
OpDelta = "delta"
OpDoc = "document"
OpUpdate = "updated"
OpInsert = "insert"
OpDelete = "deleted"
OpConnect = "connected"
OpDisConnect = "disconnected"
OpKeepalive = "keepalive"
)
// constants for message group
const (
GpResource = "resource"
)
// constants for message source
const (
SrcCloudHub = "cloudhub"
SrcController = "controller"
SrcManager = "edgemgr"
)
// HubInfo saves identifier information for edge hub
type HubInfo struct {
ProjectID string
NodeID string
}
// UserGroupInfo struct
type UserGroupInfo struct {
Resource string `json:"resource"`
Operation string `json:"operation"`
}
// Event represents message communicated between cloud hub and edge hub
type Event struct {
Group string `json:"msg_group"`
Source string `json:"source"`
UserGroup UserGroupInfo `json:"user_group"`
ID string `json:"msg_id"`
ParentID string `json:"parent_msg_id"`
Timestamp int64 `json:"timestamp"`
Content interface{} `json:"content"`
}
// EventToMessage converts an event to a model message
func EventToMessage(event *Event) model.Message {
var msg model.Message
msg.BuildHeader(event.ID, event.ParentID, event.Timestamp)
msg.BuildRouter(event.Source, event.Group, event.UserGroup.Resource, event.UserGroup.Operation)
msg.FillBody(event.Content)
return msg
}
// MessageToEvent converts a model message to an event
func MessageToEvent(msg *model.Message) Event {
var event Event
event.ID = msg.GetID()
event.ParentID = msg.GetParentID()
event.Timestamp = msg.GetTimestamp()
event.Source = msg.GetSource()
event.Group = msg.GetGroup()
event.Content = msg.GetContent()
event.UserGroup = UserGroupInfo{
Resource: msg.GetResource(),
Operation: msg.GetOperation(),
}
return event
}
// NewResource constructs a resource field using resource type and ID
func NewResource(resType, resID string, info *HubInfo) string {
var prefix string
if info != nil {
prefix = fmt.Sprintf("%s/%s/", "node", info.NodeID)
}
if resID == "" {
return fmt.Sprintf("%s%s", prefix, resType)
}
return fmt.Sprintf("%s%s/%s", prefix, resType, resID)
}
// IsNodeStopped indicates if the node is stopped or running
func (event *Event) IsNodeStopped() bool {
tokens := strings.Split(event.UserGroup.Resource, "/")
if len(tokens) != 2 || tokens[0] != ResNode {
return false
}
if event.UserGroup.Operation == OpDelete {
return true
}
if event.UserGroup.Operation != OpUpdate || event.Content == nil {
return false
}
body, ok := event.Content.(map[string]interface{})
if !ok {
log.LOGGER.Errorf("fail to decode node update message: %s, type is %T", event.GetContent(), event.Content)
// it can't be determined if the node has stopped
return false
}
// trust struct of json body
action, ok := body["action"]
if !ok || action.(string) != "stop" {
return false
}
return true
}
// isFromEdge judges if the event is sent from edge
func (event *Event) IsFromEdge() bool {
return true
}
// IsToEdge judges if the vent should be sent to edge
func (event *Event) IsToEdge() bool {
if event.Source != SrcManager {
return true
}
resource := event.UserGroup.Resource
if strings.HasPrefix(resource, ResNode) {
tokens := strings.Split(resource, "/")
if len(tokens) >= 3 {
resource = strings.Join(tokens[2:], "/")
}
}
// apply special check for edge manager
resOpMap := map[string][]string{
ResMember: {OpGet},
ResTwin: {OpDelta, OpDoc, OpGet},
ResAuth: {OpGet},
ResNode: {OpDelete},
}
for res, ops := range resOpMap {
for _, op := range ops {
if event.UserGroup.Operation == op && strings.Contains(resource, res) {
return false
}
}
}
return true
}
// GetContent dumps the content to string
func (event *Event) GetContent() string {
return fmt.Sprintf("%v", event.Content)
}
package util
import (
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/config"
)
// HubConfig is the config for entire CloudHub
var HubConfig *Config
func init() {
HubConfig = &Config{}
HubConfig.Address, _ = config.CONFIG.GetValue("cloudhub.address").ToString()
HubConfig.Port, _ = config.CONFIG.GetValue("cloudhub.port").ToInt()
HubConfig.KeepaliveInterval, _ = config.CONFIG.GetValue("cloudhub.keepalive-interval").ToInt()
HubConfig.WriteTimeout, _ = config.CONFIG.GetValue("cloudhub.write-timeout").ToInt()
HubConfig.NodeLimit, _ = config.CONFIG.GetValue("cloudhub.node-limit").ToInt()
}
// Config represents configuration options for http access
type Config struct {
Address string
Port int
KeepaliveInterval int
Ca []byte
Cert []byte
Key []byte
WriteTimeout int
NodeLimit int
}
package wsserver
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/cloudhub/channelq"
emodel "github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/cloudhub/common/model"
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/cloudhub/common/util"
bhLog "github.com/kubeedge/kubeedge/common/beehive/pkg/common/log"
"github.com/kubeedge/kubeedge/common/beehive/pkg/core/context"
"github.com/kubeedge/kubeedge/common/beehive/pkg/core/model"
)
// ExitCode exit code
type ExitCode int
const (
webSocketReadFail ExitCode = iota
webSocketWriteFail
eventQueueDisconnect
nodeStop
)
// constants for error message
const (
MsgFormatError = "message format not correct"
)
// constants for api path
const (
PathEvent = "/{project_id}/{node_id}/events"
)
// EventHandler handle all event
var EventHandler *EventHandle
//AccessHandle access handler
type AccessHandle struct {
EventHandle *EventHandle
NodeLimit int
}
// EventHandle processes events between cloud and edge
type EventHandle struct {
KeepaliveInterval int
WriteTimeout int
Nodes sync.Map
nodeConns sync.Map
nodeLocks sync.Map
EventQueue *channelq.ChannelEventQueue
Context *context.Context
}
// JSONReadWriter reads/writes json objects from underline connection
type JSONReadWriter interface {
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
ReadJSON(interface{}) error
WriteJSON(interface{}) error
Close() error
}
func dumpEventMetadata(event *emodel.Event) string {
return fmt.Sprintf("id: %s, parent_id: %s, group: %s, source: %s, resource: %s, operation: %s",
event.ID, event.ParentID, event.Group, event.Source, event.UserGroup.Resource, event.UserGroup.Operation)
}
func trimMessage(msg *model.Message) {
resource := msg.GetResource()
if strings.HasPrefix(resource, emodel.ResNode) {
tokens := strings.Split(resource, "/")
if len(tokens) < 3 {
bhLog.LOGGER.Warnf("event resource %s starts with node but length less than 3", resource)
} else {
msg.SetResourceOperation(strings.Join(tokens[2:], "/"), msg.GetOperation())
}
}
}
// EventReadLoop processes all read requests
func (eh *EventHandle) EventReadLoop(conn JSONReadWriter, info *emodel.HubInfo, stop chan ExitCode) {
for {
var msg model.Message
// set the read timeout as the keepalive interval so that we can disconnect when heart beat is lost
err := conn.SetReadDeadline(time.Now().Add(time.Duration(eh.KeepaliveInterval) * time.Second))
if err != nil {
bhLog.LOGGER.Errorf("SetReadDeadline error, %s", err.Error())
stop <- webSocketReadFail
return
}
err = conn.ReadJSON(&msg)
if err != nil {
bhLog.LOGGER.Errorf("read error, connection for node %s will be closed, reason: %s", info.NodeID, err.Error())
stop <- webSocketReadFail
return
}
msg.SetResourceOperation(fmt.Sprintf("node/%s/%s", info.NodeID, msg.GetResource()), msg.GetOperation())
event := emodel.MessageToEvent(&msg)
bhLog.LOGGER.Infof("event received for node %s %s, content: %s", info.NodeID, dumpEventMetadata(&event), event.Content)
if event.IsFromEdge() {
err := eh.EventQueue.Publish(info, &event)
if err != nil {
// content is not logged since it may contain sensitive information
bhLog.LOGGER.Errorf("fail to publish event for node %s, %s, reason: %s",
info.NodeID, dumpEventMetadata(&event), err.Error())
stop <- eventQueueDisconnect
return
}
}
}
}
func (eh *EventHandle) handleNodeQuery(info *emodel.HubInfo, event *emodel.Event) (bool, error) {
if event.UserGroup.Operation != "request_exist" {
return false, nil
}
msg := model.NewMessage(event.ID)
event.ID = msg.GetID()
event.ParentID = msg.GetParentID()
event.Timestamp = msg.GetTimestamp()
event.UserGroup.Operation = "response_exist"
return true, eh.EventQueue.Publish(info, event)
}
// EventWriteLoop processes all write requests
func (eh *EventHandle) EventWriteLoop(conn JSONReadWriter, info *emodel.HubInfo, stop chan ExitCode) {
events, err := eh.EventQueue.Consume(info)
if err != nil {
bhLog.LOGGER.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
stop <- eventQueueDisconnect
return
}
for {
event, err := events.Get()
if err != nil {
bhLog.LOGGER.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
if err.Error() == MsgFormatError {
// error format message should not impact other message
events.Ack()
continue
}
stop <- eventQueueDisconnect
return
}
isQuery, err := eh.handleNodeQuery(info, event)
if err != nil {
bhLog.LOGGER.Errorf("failed to process node query event for node %s, reason %s", info.NodeID, err.Error())
}
if isQuery {
events.Ack()
continue
}
if event.IsNodeStopped() {
bhLog.LOGGER.Infof("node %s is stopped, will disconnect", info.NodeID)
events.Ack()
stop <- nodeStop
return
}
if !event.IsToEdge() {
bhLog.LOGGER.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpEventMetadata(event), event.Content)
events.Ack()
continue
}
bhLog.LOGGER.Infof("event to send for node %s, %s, content %s", info.NodeID, dumpEventMetadata(event), event.Content)
msg := emodel.EventToMessage(event)
trimMessage(&msg)
err = conn.SetWriteDeadline(time.Now().Add(time.Duration(eh.WriteTimeout) * time.Second))
if err != nil {
bhLog.LOGGER.Errorf("SetWriteDeadline error, %s", err.Error())
stop <- webSocketWriteFail
return
}
err = eh.webSocketWrite(conn, info.NodeID, &msg)
if err != nil {
bhLog.LOGGER.Errorf("write error, connection for node %s will be closed, affected event %s, reason %s",
info.NodeID, dumpEventMetadata(event), err.Error())
stop <- webSocketWriteFail
return
}
events.Ack()
}
}
func (eh *EventHandle) webSocketWrite(conn JSONReadWriter, nodeID string, v interface{}) error {
value, ok := eh.nodeLocks.Load(nodeID)
if !ok {
return fmt.Errorf("node disconnected")
}
mutex := value.(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
return conn.WriteJSON(v)
}
func notifyEventQueueError(conn JSONReadWriter, code ExitCode, nodeID string) {
if code == eventQueueDisconnect {
msg := model.NewMessage("").BuildRouter(emodel.GpResource, emodel.SrcCloudHub, emodel.NewResource(emodel.ResNode, nodeID, nil), emodel.OpDisConnect)
err := conn.WriteJSON(msg)
if err != nil {
bhLog.LOGGER.Errorf("fail to notify node %s event queue disconnected, reason: %s", nodeID, err.Error())
}
}
}
func constructConnectEvent(info *emodel.HubInfo, isConnected bool) *emodel.Event {
connected := emodel.OpConnect
if !isConnected {
connected = emodel.OpDisConnect
}
body := map[string]interface{}{
"event_type": connected,
"timestamp": time.Now().Unix(),
"client_id": info.NodeID}
content, _ := json.Marshal(body)
msg := model.NewMessage("")
return &emodel.Event{
Group: emodel.GpResource,
Source: emodel.SrcCloudHub,
UserGroup: emodel.UserGroupInfo{
Resource: emodel.NewResource(emodel.ResNode, info.NodeID, nil),
Operation: connected,
},
ID: msg.GetID(),
ParentID: msg.GetParentID(),
Timestamp: msg.GetTimestamp(),
Content: string(content),
}
}
// ServeEvent handle the event coming from websocket
func (ah *AccessHandle) ServeEvent(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
projectID := vars["project_id"]
nodeID := vars["node_id"]
if ah.EventHandle.GetNodeCount() >= ah.NodeLimit {
bhLog.LOGGER.Errorf("fail to serve node %s, reach node limit", nodeID)
http.Error(w, "too many Nodes connected", http.StatusTooManyRequests)
return
}
upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
bhLog.LOGGER.Errorf("fail to build websocket connection for node %s, reason %s", nodeID, err.Error())
http.Error(w, "failed to upgrade to websocket protocol", http.StatusInternalServerError)
return
}
info := &emodel.HubInfo{ProjectID: projectID, NodeID: nodeID}
ah.EventHandle.ServeConn(conn, info)
}
// ServeConn starts serving the incoming connection
func (eh *EventHandle) ServeConn(conn JSONReadWriter, info *emodel.HubInfo) {
err := eh.EventQueue.Connect(info)
if err != nil {
bhLog.LOGGER.Errorf("fail to connect to event queue for node %s, reason %s", info.NodeID, err.Error())
notifyEventQueueError(conn, eventQueueDisconnect, info.NodeID)
err = conn.Close()
if err != nil {
bhLog.LOGGER.Errorf("fail to close connection, reason: %s", err.Error())
}
return
}
err = eh.EventQueue.Publish(info, constructConnectEvent(info, true))
if err != nil {
bhLog.LOGGER.Errorf("fail to publish node connect event for node %s, reason %s", info.NodeID, err.Error())
notifyEventQueueError(conn, eventQueueDisconnect, info.NodeID)
err = conn.Close()
if err != nil {
bhLog.LOGGER.Errorf("fail to close connection, reason: %s", err.Error())
}
eh.EventQueue.Close(info)
return
}
eh.nodeConns.Store(info.NodeID, conn)
eh.nodeLocks.Store(info.NodeID, &sync.Mutex{})
eh.Nodes.Store(info.NodeID, true)
bhLog.LOGGER.Infof("edge node %s for project %s connected", info.NodeID, info.ProjectID)
stop := make(chan ExitCode, 2)
go eh.EventReadLoop(conn, info, stop)
go eh.EventWriteLoop(conn, info, stop)
code := <-stop
bhLog.LOGGER.Infof("edge node %s for project %s disconnected", info.NodeID, info.ProjectID)
eh.nodeLocks.Delete(info.NodeID)
eh.nodeConns.Delete(info.NodeID)
err = eh.EventQueue.Publish(info, constructConnectEvent(info, false))
if err != nil {
bhLog.LOGGER.Errorf("fail to publish node disconnect event for node %s, reason %s", info.NodeID, err.Error())
}
notifyEventQueueError(conn, code, info.NodeID)
eh.Nodes.Delete(info.NodeID)
err = conn.Close()
if err != nil {
bhLog.LOGGER.Errorf("fail to close connection, reason: %s", err.Error())
}
eh.EventQueue.Close(info)
}
// ServeQueueWorkload handle workload from queue
func (ah *AccessHandle) ServeQueueWorkload(w http.ResponseWriter, r *http.Request) {
workload, err := ah.EventHandle.GetWorkload()
if err != nil {
bhLog.LOGGER.Errorf("%s", err.Error())
http.Error(w, "fail to get event queue workload", http.StatusInternalServerError)
return
}
_, err = io.WriteString(w, fmt.Sprintf("%f", workload))
if err != nil {
bhLog.LOGGER.Errorf("fail to write string, reason: %s", err.Error())
}
}
// GetNodeCount returns the number of connected Nodes
func (eh *EventHandle) GetNodeCount() int {
var num int
iter := func(key, value interface{}) bool {
num++
return true
}
eh.Nodes.Range(iter)
return num
}
// GetWorkload returns the workload of the event queue
func (eh *EventHandle) GetWorkload() (float64, error) {
return eh.EventQueue.Workload()
}
// returns if the event queue is available or not.
// returns 0 if not available and 1 if available.
func (ah *AccessHandle) getEventQueueAvailability() int {
_, err := ah.EventHandle.GetWorkload()
if err != nil {
bhLog.LOGGER.Errorf("eventq is not available, reason %s", err.Error())
return 0
}
return 1
}
// FilterWriter filter writer
type FilterWriter struct{}
func (f *FilterWriter) Write(p []byte) (n int, err error) {
output := string(p)
if strings.Contains(output, "http: TLS handshake error from") {
return 0, nil
}
return os.Stderr.Write(p)
}
// StartCloudHub starts the cloud hub service
func StartCloudHub(config *util.Config, eventq *channelq.ChannelEventQueue) {
// init certificate
pool := x509.NewCertPool()
ok := pool.AppendCertsFromPEM(config.Ca)
if !ok {
panic(fmt.Errorf("fail to load ca content"))
}
cert, err := tls.X509KeyPair(config.Cert, config.Key)
if err != nil {
panic(err)
}
tlsConfig := tls.Config{
ClientCAs: pool,
ClientAuth: tls.RequestClientCert,
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
CipherSuites: []uint16{tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256},
}
// init handler
ah := &AccessHandle{
EventHandle: &EventHandle{
KeepaliveInterval: config.KeepaliveInterval,
WriteTimeout: config.WriteTimeout,
EventQueue: eventq,
},
NodeLimit: config.NodeLimit,
}
EventHandler = ah.EventHandle
router := mux.NewRouter()
router.HandleFunc(PathEvent, ah.ServeEvent)
// start server
s := http.Server{
Addr: fmt.Sprintf("%s:%d", config.Address, config.Port),
Handler: router,
TLSConfig: &tlsConfig,
ErrorLog: log.New(&FilterWriter{}, "", log.LstdFlags),
}
bhLog.LOGGER.Infof("Start cloud hub service")
go s.ListenAndServeTLS("", "")
}
package config
import (
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/controller/constants"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/config"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/log"
)
// UpdatePodStatusBuffer is the size of channel which save update pod status message from edge
var UpdatePodStatusBuffer int
// UpdateNodeStatusBuffer is the size of channel which save update node status message from edge
var UpdateNodeStatusBuffer int
// QueryConfigMapBuffer is the size of channel which save query configmap message from edge
var QueryConfigMapBuffer int
// QuerySecretBuffer is the size of channel which save query secret message from edge
var QuerySecretBuffer int
func init() {
if psb, err := config.CONFIG.GetValue("update-pod-status-buffer").ToInt(); err != nil {
UpdatePodStatusBuffer = constants.DefaultUpdatePodStatusBuffer
} else {
UpdatePodStatusBuffer = psb
}
log.LOGGER.Infof("update pod status buffer: %d", UpdatePodStatusBuffer)
if nsb, err := config.CONFIG.GetValue("update-node-status-buffer").ToInt(); err != nil {
UpdateNodeStatusBuffer = constants.DefaultUpdateNodeStatusBuffer
} else {
UpdateNodeStatusBuffer = nsb
}
log.LOGGER.Infof("Update node status buffer: %d", UpdateNodeStatusBuffer)
if qcb, err := config.CONFIG.GetValue("query-configmap-buffer").ToInt(); err != nil {
QueryConfigMapBuffer = constants.DefaultQueryConfigMapBuffer
} else {
QueryConfigMapBuffer = qcb
}
log.LOGGER.Infof("query config map buffer: %d", QueryConfigMapBuffer)
if qsb, err := config.CONFIG.GetValue("query-secret-buffer").ToInt(); err != nil {
QuerySecretBuffer = constants.DefaultQuerySecretBuffer
} else {
QuerySecretBuffer = qsb
}
log.LOGGER.Infof("query secret buffer: %d", QuerySecretBuffer)
}
package config
import (
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/controller/constants"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/config"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/log"
"github.com/kubeedge/kubeedge/common/beehive/pkg/core/context"
)
// ContextSendModule is the name send message to
var ContextSendModule string
// ContextReceiveModule is the name receive message from
var ContextReceiveModule string
// ContextResponseModule is the name response message from
var ContextResponseModule string
// Context ...
var Context *context.Context
func init() {
if smn, err := config.CONFIG.GetValue("context-send-module").ToString(); err != nil {
ContextSendModule = constants.DefaultContextSendModuleName
} else {
ContextSendModule = smn
}
log.LOGGER.Infof(" send module name: %s", ContextSendModule)
if rmn, err := config.CONFIG.GetValue("context-receive-module").ToString(); err != nil {
ContextReceiveModule = constants.DefaultContextReceiveModuleName
} else {
ContextReceiveModule = rmn
}
log.LOGGER.Infof("receive module name: %s", ContextReceiveModule)
if rmn, err := config.CONFIG.GetValue("context-response-module").ToString(); err != nil {
ContextResponseModule = constants.DefaultContextResponseModuleName
} else {
ContextResponseModule = rmn
}
log.LOGGER.Infof("response module name: %s", ContextResponseModule)
}
package config
import (
"time"
"github.com/kubeedge/kubeedge/cloud/edgecontroller/pkg/controller/constants"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/config"
"github.com/kubeedge/kubeedge/common/beehive/pkg/common/log"
)
// KubeMaster is the url of edge master(kube api server)
var KubeMaster string
// KubeConfig is the config used connect to edge master
var KubeConfig string
// KubeNamespace is the namespace to watch(default is NamespaceAll)
var KubeNamespace string
// KubeContentType is the content type communicate with edge master(default is "application/vnd.kubernetes.protobuf")
var KubeContentType string
// KubeQPS is the QPS communicate with edge master(default is 1024)
var KubeQPS float32
// KubeBurst default is 10
var KubeBurst int
// KubeUpdateNodeFrequency is the time duration for update node status(default is 20s)
var KubeUpdateNodeFrequency time.Duration
func init() {
if km, err := config.CONFIG.GetValue("controller.kube.master").ToString(); err != nil {
log.LOGGER.Errorf("kube master not set")
} else {
KubeMaster = km
}
log.LOGGER.Infof("kube master: %s", KubeMaster)
if kc, err := config.CONFIG.GetValue("controller.kube.kubeconfig").ToString(); err != nil {
log.LOGGER.Errorf("kube config not set")
} else {
KubeConfig = kc
}
log.LOGGER.Infof("kube config: %s", KubeConfig)
if kn, err := config.CONFIG.GetValue("controller.kube.namespace").ToString(); err != nil {
KubeNamespace = constants.DefaultKubeNamespace
} else {
KubeNamespace = kn
}
log.LOGGER.Infof("kube namespace: %s", KubeNamespace)
if kct, err := config.CONFIG.GetValue("controller.kube.content_type").ToString(); err != nil {
KubeContentType = constants.DefaultKubeContentType
} else {
KubeContentType = kct
}
log.LOGGER.Infof("kube content type: %s", KubeContentType)
if kqps, err := config.CONFIG.GetValue("controller.kube.qps").ToFloat64(); err != nil {
KubeQPS = constants.DefaultKubeQPS
} else {
KubeQPS = float32(kqps)
}
log.LOGGER.Infof("kube QPS: %f", KubeQPS)
if kb, err := config.CONFIG.GetValue("controller.kube.burst").ToInt(); err != nil {
KubeBurst = constants.DefaultKubeBurst
} else {
KubeBurst = kb
}
log.LOGGER.Infof("kube burst: %d", KubeBurst)
if kuf, err := config.CONFIG.GetValue("controller.kube.node_update_frequency").ToInt64(); err != nil {
KubeUpdateNodeFrequency = constants.DefaultKubeUpdateNodeFrequency * time.Second
} else {
KubeUpdateNodeFrequency = time.Duration(kuf) * time.Second
}
log.LOGGER.Infof("kube update frequency: %v", KubeUpdateNodeFrequency)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment