Unverified Commit 5e94989b authored by CMC's avatar CMC Committed by GitHub
Browse files

Polish/remove cache store (#2688)

* polish: remove cache store

* change to use interface with cache (#2487)

* refactor: :bulb: metrics store cache discard bytes and adopt struct

* refactor: :bulb:

 change to use interface with cache

* change to interface cache value
Co-authored-by: default avatarbugaolengdeyuxiaoer <46627662+bugaolengdeyuxiaoer@users.noreply.github.com>
Showing with 518 additions and 881 deletions
+518 -881
......@@ -455,14 +455,156 @@ func (b BoolValue) Value() interface{} {
}
type InterfaceValue struct {
o interface{}
o interface{}
size int64
}
func (i InterfaceValue) Size() int64 {
return interfaceGetSize(i.o)
if i.size == 0 {
return 0
}
if size, err := calc(reflect.ValueOf(i.o)); err != nil {
logrus.Error(err)
return -1
} else {
return int64(size)
}
}
// Copy creates a deep copy of whatever is passed to it and returns the copy
// in an interface{}. The returned value will need to be asserted to the
// correct type.
func copy(src interface{}) interface{} {
if src == nil {
return nil
}
// Make the interface a reflect.Value
original := reflect.ValueOf(src)
// Make a copy of the same type as the original.
cpy := reflect.New(original.Type()).Elem()
// Recursively copy the original.
copyAndCalcRecursive(original, cpy)
// Return the copy as an interface.
return cpy.Interface()
}
// copyAndCalcRecursive does the actual copying of the interface. It currently has
// limited support for what it can handle. Add as needed.
func copyAndCalcRecursive(original, cpy reflect.Value) (int, error) {
size := 0
// handle according to original's Kind
switch original.Kind() {
case reflect.Ptr:
// Get the actual value being pointed to.
originalValue := original.Elem()
// if it isn't valid, return.
if !originalValue.IsValid() {
return 0, nil
}
cpy.Set(reflect.New(originalValue.Type()))
s, err := copyAndCalcRecursive(originalValue, cpy.Elem())
if err != nil {
return 0, err
}
size += s
case reflect.Interface:
// If this is a nil, don't do anything
if original.IsNil() {
return 0, nil
}
// Get the value for the interface, not the pointer.
originalValue := original.Elem()
// Get the value by calling Elem().
copyValue := reflect.New(originalValue.Type()).Elem()
s, err := copyAndCalcRecursive(originalValue, copyValue)
if err != nil {
return 0, err
}
size += s
cpy.Set(copyValue)
case reflect.Struct:
t, ok := original.Interface().(time.Time)
if ok {
cpy.Set(reflect.ValueOf(t))
return 0, nil
}
// Go through each field of the struct and copy it.
for i := 0; i < original.NumField(); i++ {
// The Type's StructField for a given field is checked to see if StructField.PkgPath
// is set to determine if the field is exported or not because CanSet() returns false
// for settable fields. I'm not sure why. -mohae
if original.Type().Field(i).PkgPath != "" {
continue
}
s, err := copyAndCalcRecursive(original.Field(i), cpy.Field(i))
if err != nil {
return 0, err
}
size += s
}
case reflect.Slice:
if original.IsNil() {
return 0, nil
}
// Make a new slice and copy each element.
cpy.Set(reflect.MakeSlice(original.Type(), original.Len(), original.Cap()))
for i := 0; i < original.Len(); i++ {
s, err := copyAndCalcRecursive(original.Index(i), cpy.Index(i))
if err != nil {
return 0, err
}
size += s
}
case reflect.Map:
if original.IsNil() {
return 0, nil
}
cpy.Set(reflect.MakeMap(original.Type()))
for _, key := range original.MapKeys() {
originalValue := original.MapIndex(key)
copyValue := reflect.New(originalValue.Type()).Elem()
s, err := copyAndCalcRecursive(originalValue, copyValue)
if err != nil {
return 0, err
}
size += s
copyKey := copy(key.Interface())
cpy.SetMapIndex(reflect.ValueOf(copyKey), copyValue)
}
default:
switch original.Kind() {
case reflect.Int8, reflect.Bool, reflect.Uint8:
size += 1
case reflect.Int16, reflect.Uint16:
size += 2
case reflect.Int, reflect.Int32, reflect.Uint32, reflect.Float32:
size += 4
case reflect.Int64, reflect.Uint64, reflect.Float64:
size += 8
case reflect.String:
size += original.Len()
default:
return -1, ValueNotSupportError
}
cpy.Set(original)
}
return size, nil
}
func calc(refValue reflect.Value) (int, error) {
if !refValue.IsValid() {
return 0, nil
}
refValue.Type()
size := 0
switch refValue.Kind() {
......@@ -518,14 +660,27 @@ func calc(refValue reflect.Value) (int, error) {
return size, nil
}
func interfaceGetSize(o interface{}) int64 {
refValue := reflect.ValueOf(o)
size, err := calc(refValue)
func getInterfaceValue(src interface{}) (InterfaceValue, error) {
i := InterfaceValue{}
if src == nil {
return i, nil
}
// Make the interface a reflect.Value
original := reflect.ValueOf(src)
// Make a copy of the same type as the original.
cpy := reflect.New(original.Type()).Elem()
// Recursively copy the original.
s, err := copyAndCalcRecursive(original, cpy)
if err != nil {
logrus.Error(err)
return -1
return i, err
}
return int64(size)
i.size = int64(s)
// Return the copy as an interface.
return i, nil
}
func (i InterfaceValue) String() string {
......@@ -689,7 +844,16 @@ func MarshalValue(o interface{}) (Values, error) {
}
func GetInterfaceValue(o interface{}) (Values, error) {
return Values{InterfaceValue{o: o}}, nil
refValue := reflect.ValueOf(o)
s, err := calc(refValue)
if err != nil {
return nil, err
}
ifv := InterfaceValue{
o: o,
size: int64(s),
}
return Values{ifv}, nil
}
func GetByteValue(d byte) (Values, error) {
......
......@@ -19,7 +19,6 @@ import (
"fmt"
"time"
jsi "github.com/json-iterator/go"
types2 "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/wrangler/pkg/data"
"github.com/sirupsen/logrus"
......@@ -150,11 +149,7 @@ func GetNamespaceAllocatedRes(ctx context.Context, server _interface.SteveServer
hasExpired = true
}
if value != nil {
var nar AllocatedRes
if err = jsi.Unmarshal(value[0].Value().([]byte), &nar); err != nil {
logrus.Errorf("failed to unmarshal for namespace %s in GetNodeAllocatedRes, %v", namespace, err)
continue
}
nar := value[0].Value().(AllocatedRes)
nsAllocatedRes[namespace] = nar
continue
}
......@@ -177,7 +172,7 @@ func GetNamespaceAllocatedRes(ctx context.Context, server _interface.SteveServer
Mem: mem,
PodNum: podNum,
}
value, err = cache.MarshalValue(nar)
value, err = cache.GetInterfaceValue(nar)
if err != nil {
logrus.Errorf("failed to marshal value for namespace %s in GetNamespaceAllocatedRes, %v", namespace, err)
continue
......@@ -187,10 +182,7 @@ func GetNamespaceAllocatedRes(ctx context.Context, server _interface.SteveServer
continue
}
if err = jsi.Unmarshal(value[0].Value().([]byte), &nar); err != nil {
logrus.Errorf("failed to unmarshal for namespace %s in GetNamespaceAllocatedRes, %v", namespace, err)
continue
}
nar = value[0].Value().(AllocatedRes)
nsAllocatedRes[namespace] = nar
}
if hasExpired {
......@@ -220,7 +212,7 @@ func GetNamespaceAllocatedRes(ctx context.Context, server _interface.SteveServer
Mem: mem,
PodNum: podNum,
}
value, err := cache.MarshalValue(nar)
value, err := cache.GetInterfaceValue(nar)
if err != nil {
logrus.Errorf("failed to marshal value for namespace %s in GetNamespaceAllocatedRes goroutine, %v", namespace, err)
continue
......@@ -282,11 +274,7 @@ func GetNodesAllocatedRes(ctx context.Context, server _interface.SteveServer, no
hasExpired = true
}
if value != nil {
var nar AllocatedRes
if err = jsi.Unmarshal(value[0].Value().([]byte), &nar); err != nil {
logrus.Errorf("failed to unmarshal for node %s in GetNodeAllocatedRes, %v", nodeName, err)
continue
}
nar := value[0].Value().(AllocatedRes)
nodesAllocatedRes[nodeName] = nar
continue
}
......@@ -309,7 +297,7 @@ func GetNodesAllocatedRes(ctx context.Context, server _interface.SteveServer, no
Mem: mem,
PodNum: podNum,
}
value, err = cache.MarshalValue(nar)
value, err = cache.GetInterfaceValue(nar)
if err != nil {
logrus.Errorf("failed to marshal value for node %s in GetNodeAllocatedRes, %v", nodeName, err)
continue
......@@ -319,10 +307,7 @@ func GetNodesAllocatedRes(ctx context.Context, server _interface.SteveServer, no
continue
}
if err = jsi.Unmarshal(value[0].Value().([]byte), &nar); err != nil {
logrus.Errorf("failed to unmarshal for node %s in GetNodeAllocatedRes, %v", nodeName, err)
continue
}
nar = value[0].Value().(AllocatedRes)
nodesAllocatedRes[nodeName] = nar
}
if hasExpired {
......@@ -353,7 +338,7 @@ func GetNodesAllocatedRes(ctx context.Context, server _interface.SteveServer, no
Mem: mem,
PodNum: podNum,
}
value, err := cache.MarshalValue(nar)
value, err := cache.GetInterfaceValue(nar)
if err != nil {
logrus.Errorf("failed to marshal value for node %s in GetNodeAllocatedRes goroutine, %v", nodeName, err)
continue
......
......@@ -115,39 +115,39 @@ func (t *ComponentEventTable) GenComponentState(component *cptype.Component) err
}
func (t *ComponentEventTable) DecodeURLQuery() error {
query, ok := t.sdk.InParams["eventTable__urlQuery"].(string)
queryData, ok := t.sdk.InParams["eventTable__urlQuery"].(string)
if !ok {
return nil
}
decode, err := base64.StdEncoding.DecodeString(query)
decoded, err := base64.StdEncoding.DecodeString(queryData)
if err != nil {
return err
}
urlQuery := make(map[string]interface{})
if err := json.Unmarshal(decode, &urlQuery); err != nil {
query := make(map[string]interface{})
if err := json.Unmarshal(decoded, &query); err != nil {
return err
}
t.State.PageNo = uint64(urlQuery["pageNo"].(float64))
t.State.PageSize = uint64(urlQuery["pageSize"].(float64))
sorter := urlQuery["sorterData"].(map[string]interface{})
t.State.Sorter.Field = sorter["field"].(string)
t.State.Sorter.Order = sorter["order"].(string)
t.State.PageNo = uint64(query["pageNo"].(float64))
t.State.PageSize = uint64(query["pageSize"].(float64))
sorterData := query["sorterData"].(map[string]interface{})
t.State.Sorter.Field = sorterData["field"].(string)
t.State.Sorter.Order = sorterData["order"].(string)
return nil
}
func (t *ComponentEventTable) EncodeURLQuery() error {
query := make(map[string]interface{})
query["pageNo"] = int(t.State.PageNo)
query["pageSize"] = int(t.State.PageSize)
query["sorterData"] = t.State.Sorter
urlQuery := make(map[string]interface{})
urlQuery["pageNo"] = int(t.State.PageNo)
urlQuery["pageSize"] = int(t.State.PageSize)
urlQuery["sorterData"] = t.State.Sorter
data, err := json.Marshal(query)
jsonData, err := json.Marshal(urlQuery)
if err != nil {
return err
}
decode := base64.StdEncoding.EncodeToString(data)
t.State.EventTableUQLQuery = decode
decoded := base64.StdEncoding.EncodeToString(jsonData)
t.State.EventTableUQLQuery = decoded
return nil
}
......@@ -375,10 +375,10 @@ func (t *ComponentEventTable) SetComponentValue(ctx context.Context) {
}
}
func (t *ComponentEventTable) Transfer(component *cptype.Component) {
component.Props = t.Props
component.Data = map[string]interface{}{"list": t.Data.List}
component.State = map[string]interface{}{
func (t *ComponentEventTable) Transfer(c *cptype.Component) {
c.Props = t.Props
c.Data = map[string]interface{}{"list": t.Data.List}
c.State = map[string]interface{}{
"clusterName": t.State.ClusterName,
"filterValues": t.State.FilterValues,
"pageNo": t.State.PageNo,
......@@ -387,7 +387,7 @@ func (t *ComponentEventTable) Transfer(component *cptype.Component) {
"total": t.State.Total,
"eventTable__urlQuery": t.State.EventTableUQLQuery,
}
component.Operations = t.Operations
c.Operations = t.Operations
}
func contain(arr []string, target string) bool {
......
......@@ -85,41 +85,41 @@ func (f *ComponentFilter) InitComponent(ctx context.Context) {
}
func (f *ComponentFilter) DecodeURLQuery() error {
query, ok := f.sdk.InParams["filter__urlQuery"].(string)
urlQuery, ok := f.sdk.InParams["filter__urlQuery"].(string)
if !ok {
return nil
}
decode, err := base64.StdEncoding.DecodeString(query)
decoded, err := base64.StdEncoding.DecodeString(urlQuery)
if err != nil {
return err
}
var values Values
if err := json.Unmarshal(decode, &values); err != nil {
var v Values
if err := json.Unmarshal(decoded, &v); err != nil {
return err
}
f.State.Values = values
f.State.Values = v
return nil
}
func (f *ComponentFilter) EncodeURLQuery() error {
jsonData, err := json.Marshal(f.State.Values)
data, err := json.Marshal(f.State.Values)
if err != nil {
return err
}
encoded := base64.StdEncoding.EncodeToString(jsonData)
f.State.FilterURLQuery = encoded
encode := base64.StdEncoding.EncodeToString(data)
f.State.FilterURLQuery = encode
return nil
}
func (f *ComponentFilter) GenComponentState(c *cptype.Component) error {
if c == nil || c.State == nil {
func (f *ComponentFilter) GenComponentState(component *cptype.Component) error {
if component == nil || component.State == nil {
return nil
}
var state State
cont, err := json.Marshal(c.State)
cont, err := json.Marshal(component.State)
if err != nil {
logrus.Errorf("marshal component state failed, content:%v, err:%v", c.State, err)
logrus.Errorf("marshal component state failed, content:%v, err:%v", component.State, err)
return err
}
err = json.Unmarshal(cont, &state)
......@@ -283,14 +283,14 @@ func (f *ComponentFilter) SetComponentValue(ctx context.Context) error {
return nil
}
func (f *ComponentFilter) Transfer(c *cptype.Component) {
c.State = map[string]interface{}{
func (f *ComponentFilter) Transfer(component *cptype.Component) {
component.State = map[string]interface{}{
"clusterName": f.State.ClusterName,
"conditions": f.State.Conditions,
"values": f.State.Values,
"filter__urlQuery": f.State.FilterURLQuery,
}
c.Operations = f.Operations
component.Operations = f.Operations
}
func (f *ComponentFilter) getDisplayName(name string) (string, error) {
......
......@@ -21,18 +21,17 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/erda-project/erda-infra/base/servicehub"
"github.com/erda-project/erda-infra/providers/component-protocol/cptype"
"github.com/erda-project/erda-infra/providers/component-protocol/utils/cputil"
"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/bundle"
"github.com/erda-project/erda/modules/cmp"
"github.com/erda-project/erda/modules/cmp/cache"
"github.com/erda-project/erda/modules/cmp/cmp_interface"
"github.com/erda-project/erda/modules/cmp/component-protocol/components/cmp-dashboard-nodes/common"
"github.com/erda-project/erda/modules/cmp/component-protocol/types"
"github.com/erda-project/erda/modules/cmp/steve/proxy"
"github.com/erda-project/erda/modules/openapi/component-protocol/components/base"
)
......@@ -256,12 +255,8 @@ func (bot *BatchOperationTipModal) DrainNode(nodeIDs []string) error {
return err
}
}
gvk := v1.GroupVersionKind{
Version: "v1",
Kind: "Pod",
}
cacheKey := proxy.CacheKey{
GVK: gvk.String(),
cacheKey := cmp.CacheKey{
Kind: string(apistructs.K8SNode),
ClusterName: bot.SDK.InParams["clusterName"].(string),
}
if _, err := cache.GetFreeCache().Remove(cacheKey.GetKey()); err != nil {
......
......@@ -178,7 +178,7 @@ func (nf *NodeFilter) getState(labels map[string]struct{}) {
Value: "env",
Label: nf.SDK.I18n("env-label"),
Children: []filter.Option{
{Label: nf.SDK.I18n("dev"), Value: "dice/workspace-dev"},
{Label: nf.SDK.I18n("dev"), Value: "dice/workspace-dev=true"},
{Label: nf.SDK.I18n("test"), Value: "dice/workspace-test=true"},
{Label: nf.SDK.I18n("staging"), Value: "dice/workspace-staging=true"},
{Label: nf.SDK.I18n("prod"), Value: "dice/workspace-prod=true"},
......@@ -205,8 +205,8 @@ func (nf *NodeFilter) getState(labels map[string]struct{}) {
Value: "other-label",
Label: nf.SDK.I18n("other-label"),
Children: append([]filter.Option{
{Label: nf.SDK.I18n("lb"), Value: "dice/lb"},
{Label: nf.SDK.I18n("platform"), Value: "dice/platform"},
{Label: nf.SDK.I18n("lb"), Value: "dice/lb=true"},
{Label: nf.SDK.I18n("platform"), Value: "dice/platform=true"},
}, customOps...),
},
}...,
......
......@@ -26,7 +26,6 @@ import (
"time"
"github.com/go-openapi/strfmt"
jsi "github.com/json-iterator/go"
"github.com/pkg/errors"
types2 "github.com/rancher/apiserver/pkg/types"
"github.com/sirupsen/logrus"
......@@ -284,14 +283,14 @@ func (p *ComponentPodsTable) RenderTable() error {
}
cpuStatus, cpuValue, cpuTip := "success", "0", "N/A"
metricsData := getCache(cache.GenerateKey(p.State.ClusterName, name, namespace, metrics.Cpu, metrics.Pod))
metricsData := metrics.GetCache(cache.GenerateKey(p.State.ClusterName, name, namespace, metrics.Cpu, metrics.Pod))
if metricsData != nil && !cpuLimits.IsZero() {
usedCPUPercent := metricsData.Used
cpuStatus, cpuValue, cpuTip = p.parseResPercent(usedCPUPercent, cpuLimits, resource.DecimalSI)
}
memStatus, memValue, memTip := "success", "0", "N/A"
metricsData = getCache(cache.GenerateKey(p.State.ClusterName, name, namespace, metrics.Memory, metrics.Pod))
metricsData = metrics.GetCache(cache.GenerateKey(p.State.ClusterName, name, namespace, metrics.Memory, metrics.Pod))
if metricsData != nil && !memLimits.IsZero() {
usedMemPercent := metricsData.Used
memStatus, memValue, memTip = p.parseResPercent(usedMemPercent, memLimits, resource.BinarySI)
......@@ -706,18 +705,3 @@ func parseResource(str string, format resource.Format) *resource.Quantity {
res, _ := resource.ParseQuantity(str)
return &res
}
func getCache(key string) *metrics.MetricsData {
v, _, err := cache.GetFreeCache().Get(key)
if err != nil {
logrus.Errorf("get metrics %v err :%v", key, err)
}
d := &metrics.MetricsData{}
if v != nil {
err = jsi.Unmarshal(v[0].Value().([]byte), d)
if err != nil {
logrus.Errorf("get metrics %v unmarshal to json err :%v", key, err)
}
}
return d
}
......@@ -25,7 +25,6 @@ import (
"sync"
"github.com/go-openapi/strfmt"
jsi "github.com/json-iterator/go"
"github.com/pkg/errors"
types2 "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/wrangler/pkg/data"
......@@ -303,14 +302,14 @@ func (p *ComponentPodsTable) RenderTable() error {
}
cpuStatus, cpuValue, cpuTip := "success", "0", "N/A"
metricsData := getCache(cache.GenerateKey(p.State.ClusterName, podName, podNamespace, metrics.Cpu, metrics.Pod))
metricsData := metrics.GetCache(cache.GenerateKey(p.State.ClusterName, name, namespace, metrics.Cpu, metrics.Pod))
if metricsData != nil && !cpuLimits.IsZero() {
usedCPUPercent := metricsData.Used
cpuStatus, cpuValue, cpuTip = p.parseResPercent(usedCPUPercent, cpuLimits, resource.DecimalSI)
}
memStatus, memValue, memTip := "success", "0", "N/A"
metricsData = getCache(cache.GenerateKey(p.State.ClusterName, podName, podNamespace, metrics.Memory, metrics.Pod))
metricsData = metrics.GetCache(cache.GenerateKey(p.State.ClusterName, name, namespace, metrics.Memory, metrics.Pod))
if metricsData != nil && !memLimits.IsZero() {
usedMemPercent := metricsData.Used
memStatus, memValue, memTip = p.parseResPercent(usedMemPercent, memLimits, resource.BinarySI)
......@@ -739,19 +738,3 @@ func getRange(length, pageNo, pageSize int) (int, int) {
}
return l, r
}
func getCache(key string) *metrics.MetricsData {
v, _, err := cache.GetFreeCache().Get(key)
if err != nil {
logrus.Errorf("get metrics %v err :%v", key, err)
}
d := &metrics.MetricsData{}
if v != nil {
err = jsi.Unmarshal(v[0].Value().([]byte), d)
if err != nil {
logrus.Errorf("get metrics %v unmarshal to json err :%v", key, err)
}
}
return d
}
......@@ -25,7 +25,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"github.com/erda-project/erda-infra/base/servicehub"
......@@ -39,7 +38,6 @@ import (
cputil2 "github.com/erda-project/erda/modules/cmp/component-protocol/cputil"
cmpTypes "github.com/erda-project/erda/modules/cmp/component-protocol/types"
"github.com/erda-project/erda/modules/cmp/steve/middleware"
"github.com/erda-project/erda/modules/cmp/steve/proxy"
"github.com/erda-project/erda/modules/openapi/component-protocol/components/base"
)
......@@ -196,19 +194,12 @@ func (b *ComponentRestartButton) restartWorkload(userID, orgID, clusterName, kin
return errors.Errorf("failed to marshal body, %v", err)
}
gvk := schema.GroupVersionKind{
Group: "apps",
Version: "v1",
}
switch kind {
case string(apistructs.K8SDeployment):
gvk.Kind = "Deployment"
_, err = client.ClientSet.AppsV1().Deployments(namespace).Patch(b.ctx, name, types.StrategicMergePatchType, data, v1.PatchOptions{})
case string(apistructs.K8SStatefulSet):
gvk.Kind = "StatefulSet"
_, err = client.ClientSet.AppsV1().StatefulSets(namespace).Patch(b.ctx, name, types.StrategicMergePatchType, data, v1.PatchOptions{})
case string(apistructs.K8SDaemonSet):
gvk.Kind = "DaemonSet"
_, err = client.ClientSet.AppsV1().StatefulSets(namespace).Patch(b.ctx, name, types.StrategicMergePatchType, data, v1.PatchOptions{})
default:
return errors.Errorf("invalid workload kind %s (only deployment, statefulSet and daemonSet can be restarted)", kind)
......@@ -217,12 +208,12 @@ func (b *ComponentRestartButton) restartWorkload(userID, orgID, clusterName, kin
return err
}
cacheKey := proxy.CacheKey{
GVK: gvk.String(),
cacheKey := cmp.CacheKey{
Kind: kind,
ClusterName: clusterName,
}
if _, err := cache.GetFreeCache().Remove(cacheKey.GetKey()); err != nil {
logrus.Errorf("failed to remove cache for %s, %v", gvk.String(), err)
logrus.Errorf("failed to remove cache for %s, %v", kind, err)
}
return nil
}
......
......@@ -85,33 +85,33 @@ func (f *ComponentFilter) InitComponent(ctx context.Context) {
}
func (f *ComponentFilter) DecodeURLQuery() error {
query, ok := f.sdk.InParams["filter__urlQuery"].(string)
urlQuery, ok := f.sdk.InParams["filter__urlQuery"].(string)
if !ok {
return nil
}
decode, err := base64.StdEncoding.DecodeString(query)
decoded, err := base64.StdEncoding.DecodeString(urlQuery)
if err != nil {
return err
}
var values Values
if err := json.Unmarshal(decode, &values); err != nil {
var v Values
if err := json.Unmarshal(decoded, &v); err != nil {
return err
}
f.State.Values = values
f.State.Values = v
return nil
}
func (f *ComponentFilter) GenComponentState(c *cptype.Component) error {
if c == nil || c.State == nil {
func (f *ComponentFilter) GenComponentState(component *cptype.Component) error {
if component == nil || component.State == nil {
return nil
}
var state State
data, err := json.Marshal(c.State)
jsonData, err := json.Marshal(component.State)
if err != nil {
return err
}
if err = json.Unmarshal(data, &state); err != nil {
if err = json.Unmarshal(jsonData, &state); err != nil {
return err
}
f.State = state
......@@ -308,13 +308,13 @@ func (f *ComponentFilter) SetComponentValue(ctx context.Context) error {
}
func (f *ComponentFilter) EncodeURLQuery() error {
jsonData, err := json.Marshal(f.State.Values)
data, err := json.Marshal(f.State.Values)
if err != nil {
return err
}
encode := base64.StdEncoding.EncodeToString(jsonData)
f.State.FilterURLQuery = encode
encoded := base64.StdEncoding.EncodeToString(data)
f.State.FilterURLQuery = encoded
return nil
}
......
......@@ -84,32 +84,32 @@ func (w *ComponentWorkloadTable) Render(ctx context.Context, component *cptype.C
}
func (w *ComponentWorkloadTable) DecodeURLQuery() error {
query, ok := w.sdk.InParams["workloadTable__urlQuery"].(string)
queryData, ok := w.sdk.InParams["workloadTable__urlQuery"].(string)
if !ok {
return nil
}
decode, err := base64.StdEncoding.DecodeString(query)
decoded, err := base64.StdEncoding.DecodeString(queryData)
if err != nil {
return err
}
urlQuery := make(map[string]interface{})
if err := json.Unmarshal(decode, &urlQuery); err != nil {
query := make(map[string]interface{})
if err := json.Unmarshal(decoded, &query); err != nil {
return err
}
w.State.PageNo = uint64(urlQuery["pageNo"].(float64))
w.State.PageSize = uint64(urlQuery["pageSize"].(float64))
sorterData := urlQuery["sorterData"].(map[string]interface{})
w.State.Sorter.Field, _ = sorterData["field"].(string)
w.State.Sorter.Order, _ = sorterData["order"].(string)
w.State.PageNo = uint64(query["pageNo"].(float64))
w.State.PageSize = uint64(query["pageSize"].(float64))
sorter := query["sorterData"].(map[string]interface{})
w.State.Sorter.Field, _ = sorter["field"].(string)
w.State.Sorter.Order, _ = sorter["order"].(string)
return nil
}
func (w *ComponentWorkloadTable) EncodeURLQuery() error {
query := make(map[string]interface{})
query["pageNo"] = w.State.PageNo
query["pageSize"] = w.State.PageSize
query["sorterData"] = w.State.Sorter
data, err := json.Marshal(query)
urlQuery := make(map[string]interface{})
urlQuery["pageNo"] = w.State.PageNo
urlQuery["pageSize"] = w.State.PageSize
urlQuery["sorterData"] = w.State.Sorter
data, err := json.Marshal(urlQuery)
if err != nil {
return err
}
......
......@@ -23,7 +23,6 @@ import (
"strings"
"time"
jsi "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/data"
"github.com/sirupsen/logrus"
......@@ -271,7 +270,7 @@ func GetAllProjectsDisplayNameFromCache(bdl *bundle.Bundle, orgID string) (map[u
if err != nil {
return nil, err
}
values, err := cache.MarshalValue(id2displayName)
values, err := cache.GetInterfaceValue(id2displayName)
if err != nil {
return nil, errors.Errorf("failed to marshal cache value for projects dispalyName, %v", err)
}
......@@ -287,7 +286,7 @@ func GetAllProjectsDisplayNameFromCache(bdl *bundle.Bundle, orgID string) (map[u
logrus.Errorf("failed to get all projects displayName in goroutine, %v", err)
return
}
values, err := cache.MarshalValue(id2displayName)
values, err := cache.GetInterfaceValue(id2displayName)
if err != nil {
logrus.Errorf("failed to marshal cache value for projects displayName in goroutine, %v", err)
return
......@@ -298,10 +297,7 @@ func GetAllProjectsDisplayNameFromCache(bdl *bundle.Bundle, orgID string) (map[u
}
}()
}
id2displayName := make(map[uint64]string)
if err := jsi.Unmarshal(values[0].Value().([]byte), &id2displayName); err != nil {
return nil, err
}
id2displayName := values[0].Value().(map[uint64]string)
return id2displayName, nil
}
......
......@@ -21,7 +21,6 @@ import (
"strconv"
"time"
jsi "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
......@@ -223,7 +222,7 @@ func (m *Metric) Store(resp *pb.QueryWithInfluxFormatResponse, metricsRequest *M
}
func SetCache(k string, d interface{}) {
data, err := cache.MarshalValue(d)
data, err := cache.GetInterfaceValue(d)
if err != nil {
logrus.Errorf("cache marshal metrics %v err: %v", k, err)
} else {
......@@ -240,12 +239,14 @@ func GetCache(key string) *MetricsData {
logrus.Errorf("get metrics %v err :%v", key, err)
return nil
}
var d *MetricsData
var (
d *MetricsData
ok bool
)
if v != nil {
d = &MetricsData{}
err = jsi.Unmarshal(v[0].Value().([]byte), d)
if err != nil {
logrus.Errorf("get metrics %v unmarshal to json err :%v", key, err)
d, ok = v[0].Value().(*MetricsData)
if !ok {
logrus.Errorf("get metrics %v assert err", key)
}
}
return d
......
......@@ -24,6 +24,7 @@ import (
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/attributes"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
......@@ -74,6 +75,46 @@ func (a *Aggregator) GetAllClusters() []string {
return clustersNames
}
func (a *Aggregator) IsServerReady(clusterName string) bool {
s, ok := a.servers.Load(clusterName)
if !ok {
return false
}
g := s.(*group)
return g.ready
}
// HasAccess set schemas for apiOp and check access for user in apiOp
func (a *Aggregator) HasAccess(clusterName string, apiOp *types.APIRequest, verb string) (bool, error) {
g, ok := a.servers.Load(clusterName)
if !ok {
return false, errors.Errorf("steve server not found for cluster %s", clusterName)
}
server := g.(*group).server
if err := server.SetSchemas(apiOp); err != nil {
return false, err
}
schema := apiOp.Schemas.LookupSchema(apiOp.Type)
if schema == nil {
return false, errors.Errorf("steve server for cluster %s is not ready", clusterName)
}
user, ok := request.UserFrom(apiOp.Context())
if !ok {
return false, nil
}
access := server.AccessSetLookup.AccessFor(user)
gr := attributes.GR(schema)
ns := apiOp.Namespace
if ns == "" {
ns = "*"
}
return access.Grants(verb, gr, ns, attributes.Resource(schema)), nil
}
func (a *Aggregator) watchClusters(ctx context.Context) {
for {
select {
......@@ -343,7 +384,14 @@ func (a *Aggregator) Serve(clusterName string, apiOp *types.APIRequest) error {
if !group.ready {
return apierrors2.ErrInvoke.InternalError(errors.Errorf("k8s API for cluster %s is not ready, please wait", clusterName))
}
return group.server.Handle(apiOp)
if apiOp.Schemas == nil {
if err := group.server.SetSchemas(apiOp); err != nil {
return err
}
}
group.server.Handle(apiOp)
return nil
}
func (a *Aggregator) createSteve(clusterInfo apistructs.ClusterInfo) (*Server, context.CancelFunc, error) {
......@@ -414,9 +462,10 @@ func (a *Aggregator) list(server *Server, resType string) int {
}
apiOp.Request = req
if err = server.Handle(apiOp); err != nil {
if err = server.SetSchemas(apiOp); err != nil {
logrus.Errorf("failed to preload cache for %s, %v", resType, err)
}
server.Handle(apiOp)
return resp.StatusCode
}
......
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"crypto/sha256"
"encoding/hex"
"os"
"strconv"
"time"
jsi "github.com/json-iterator/go"
"github.com/rancher/apiserver/pkg/apierror"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/wrangler/pkg/schemas/validation"
"github.com/sirupsen/logrus"
"k8s.io/apiserver/pkg/endpoints/request"
"github.com/erda-project/erda/modules/cmp/cache"
"github.com/erda-project/erda/modules/cmp/queue"
)
var queryQueue *queue.QueryQueue
func init() {
queueSize := 10
if size, err := strconv.Atoi(os.Getenv("LIST_QUEUE_SIZE")); err == nil && size > queueSize {
queueSize = size
}
queryQueue = queue.NewQueryQueue(queueSize)
}
type cacheStore struct {
types.Store
ctx context.Context
asl accesscontrol.AccessSetLookup
cache *cache.Cache
clusterName string
}
type CacheKey struct {
GVK string
Namespace string
ClusterName string
}
func (k *CacheKey) GetKey() string {
d := sha256.New()
d.Write([]byte(k.GVK))
d.Write([]byte(k.Namespace))
d.Write([]byte(k.ClusterName))
return hex.EncodeToString(d.Sum(nil))
}
func (c *cacheStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
if !c.hasAccess(apiOp, schema, "list") {
return types.APIObjectList{}, apierror.NewAPIError(validation.PermissionDenied, "access denied")
}
if apiOp.Query.Get("labelSelector") != "" || apiOp.Query.Get("fieldSelector") != "" {
return c.Store.List(apiOp, schema)
}
gvk := attributes.GVK(schema)
key := CacheKey{
GVK: gvk.String(),
Namespace: apiOp.Namespace,
ClusterName: c.clusterName,
}
logrus.Infof("[DEBUG %s] start get cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
values, lexpired, err := c.cache.Get(key.GetKey())
logrus.Infof("[DEBUG %s] end get cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
if values == nil || err != nil {
if apiOp.Namespace != "" {
key := CacheKey{
GVK: gvk.String(),
Namespace: "",
ClusterName: c.clusterName,
}
allNsValues, expired, err := c.cache.Get(key.GetKey())
if allNsValues != nil && err == nil && !expired {
var list types.APIObjectList
logrus.Infof("[DEBUG %s] start jsi unmarshal data from cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
if err = jsi.Unmarshal(allNsValues[0].Value().([]byte), &list); err == nil {
logrus.Infof("[DEBUG %s] end jsi unmarshal data from cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
logrus.Infof("[DEBUG %s] start get by namespace at %s", apiOp.Type, time.Now().Format(time.StampNano))
list := getByNamespace(list, apiOp.Namespace)
logrus.Infof("[DEBUG %s] end get by namespace at %s", apiOp.Type, time.Now().Format(time.StampNano))
return list, nil
}
}
}
logrus.Infof("[DEBUG %s] start list at %s", apiOp.Type, time.Now().Format(time.StampNano))
queryQueue.Acquire(c.clusterName, 1)
list, err := c.Store.List(apiOp, schema)
queryQueue.Release(c.clusterName, 1)
if err != nil {
return types.APIObjectList{}, err
}
logrus.Infof("[DEBUG %s] end list at %s", apiOp.Type, time.Now().Format(time.StampNano))
logrus.Infof("[DEBUG %s] start marshal for cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
vals, err := cache.MarshalValue(list)
logrus.Infof("[DEBUG %s] end marshal for cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
if err != nil {
logrus.Errorf("failed to marshal cache data for %s, %v", gvk.Kind, err)
return types.APIObjectList{}, apierror.NewAPIError(validation.ServerError, "internal error")
}
logrus.Infof("[DEBUG %s] start set cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
if err = c.cache.Set(key.GetKey(), vals, time.Second.Nanoseconds()*30); err != nil {
logrus.Errorf("failed to set cache for %s, %v", gvk.String(), err)
}
logrus.Infof("[DEBUG %s] end set cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
return list, nil
}
if lexpired {
logrus.Infof("list data is expired, need update, key:%s", key.GetKey())
if !cache.ExpireFreshQueue.IsFull() {
task := &queue.Task{
Key: key.GetKey(),
Do: func() {
user, ok := request.UserFrom(apiOp.Context())
if !ok {
logrus.Errorf("user not found in context when steve auth")
return
}
ctx := request.WithUser(c.ctx, user)
newOp := apiOp.WithContext(ctx)
list, err := c.Store.List(newOp, schema)
if err != nil {
logrus.Errorf("failed to list %s in steve cache store, %v", gvk.Kind, err)
return
}
data, err := cache.MarshalValue(list)
if err != nil {
logrus.Errorf("failed to marshal cache data for %s, %v", gvk.Kind, err)
return
}
if err = c.cache.Set(key.GetKey(), data, time.Second.Nanoseconds()*30); err != nil {
logrus.Errorf("failed to set cache for %s, %v", gvk.String(), err)
}
},
}
cache.ExpireFreshQueue.Enqueue(task)
} else {
logrus.Warnf("queue size is full, task is ignored, key:%s", key.GetKey())
}
}
var list types.APIObjectList
logrus.Infof("[DEBUG %s] start unmarshal data from cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
if err = jsi.Unmarshal(values[0].Value().([]byte), &list); err != nil {
logrus.Errorf("failed to marshal list %s result, %v", gvk.Kind, err)
return types.APIObjectList{}, apierror.NewAPIError(validation.ServerError, "internal error")
}
logrus.Infof("[DEBUG %s] end unmarshal data from cache at %s", apiOp.Type, time.Now().Format(time.StampNano))
return list, nil
}
func (c *cacheStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
gvk := attributes.GVK(schema)
key := CacheKey{
GVK: gvk.String(),
ClusterName: c.clusterName,
}
if _, err := c.cache.Remove(key.GetKey()); err != nil {
logrus.Errorf("failed to remove cache for %s, %v", gvk.String(), err)
}
return c.Store.Create(apiOp, schema, data)
}
func (c *cacheStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
gvk := attributes.GVK(schema)
key := CacheKey{
GVK: gvk.String(),
ClusterName: c.clusterName,
}
if _, err := c.cache.Remove(key.GetKey()); err != nil {
logrus.Errorf("failed to remove cache for %s, %v", gvk.String(), err)
}
return c.Store.Update(apiOp, schema, data, id)
}
func (c *cacheStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
gvk := attributes.GVK(schema)
key := CacheKey{
GVK: gvk.String(),
ClusterName: c.clusterName,
}
if _, err := c.cache.Remove(key.GetKey()); err != nil {
logrus.Errorf("failed to remove cache for %s, %v", gvk.String(), err)
}
return c.Store.Delete(apiOp, schema, id)
}
func (c *cacheStore) hasAccess(apiOp *types.APIRequest, schema *types.APISchema, verb string) bool {
user, ok := request.UserFrom(apiOp.Context())
if !ok {
return false
}
access := c.asl.AccessFor(user)
gr := attributes.GR(schema)
ns := apiOp.Namespace
if ns == "" {
ns = "*"
}
return access.Grants(verb, gr, ns, attributes.Resource(schema))
}
func getByNamespace(list types.APIObjectList, namespace string) types.APIObjectList {
res := types.APIObjectList{
Revision: "-1",
}
for _, apiObj := range list.Objects {
if apiObj.Namespace() == namespace {
res.Objects = append(res.Objects, apiObj)
}
}
return res
}
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"net/http"
"testing"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/attributes"
v1 "github.com/rancher/wrangler/pkg/generated/controllers/rbac/v1"
"github.com/rancher/wrangler/pkg/schemas"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"github.com/erda-project/erda/modules/cmp/cache"
)
type rbacInterface struct {
v1.Interface
}
func (r *rbacInterface) Role() v1.RoleController {
return &roleController{}
}
func (r *rbacInterface) RoleBinding() v1.RoleBindingController {
return &roleBindingController{}
}
func (r *rbacInterface) ClusterRole() v1.ClusterRoleController {
return &clusterRoleController{}
}
func (r *rbacInterface) ClusterRoleBinding() v1.ClusterRoleBindingController {
return &clusterRoleBindingController{}
}
type roleController struct {
v1.RoleController
}
func (r *roleController) Cache() v1.RoleCache {
return &roleCache{}
}
func (r *roleController) OnChange(_ context.Context, _ string, _ v1.RoleHandler) {
}
type roleBindingController struct {
v1.RoleBindingController
}
func (r *roleBindingController) Cache() v1.RoleBindingCache {
return &roleBindingCache{}
}
type clusterRoleController struct {
v1.ClusterRoleController
}
func (c *clusterRoleController) Cache() v1.ClusterRoleCache {
return &clusterRoleCache{}
}
func (c *clusterRoleController) OnChange(_ context.Context, _ string, _ v1.ClusterRoleHandler) {
}
type clusterRoleBindingController struct {
v1.ClusterRoleBindingController
}
func (c *clusterRoleBindingController) Cache() v1.ClusterRoleBindingCache {
return &clusterRoleBindingCache{}
}
type roleCache struct {
v1.RoleCache
}
func (r *roleCache) Get(_, name string) (*rbacv1.Role, error) {
return testRoles[name], nil
}
type roleBindingCache struct {
v1.RoleBindingCache
}
func (r *roleBindingCache) GetByIndex(_, key string) ([]*rbacv1.RoleBinding, error) {
return testRoleBindings[key], nil
}
func (r *roleBindingCache) AddIndexer(_ string, _ v1.RoleBindingIndexer) {
}
type clusterRoleCache struct {
v1.ClusterRoleCache
}
func (c *clusterRoleCache) Get(name string) (*rbacv1.ClusterRole, error) {
return testClusterRoles[name], nil
}
type clusterRoleBindingCache struct {
v1.ClusterRoleBindingCache
}
func (c *clusterRoleBindingCache) GetByIndex(_, key string) ([]*rbacv1.ClusterRoleBinding, error) {
return testClusterRoleBindings[key], nil
}
func (c *clusterRoleBindingCache) AddIndexer(_ string, _ v1.ClusterRoleBindingIndexer) {
}
var (
testRoles = map[string]*rbacv1.Role{
// default namespace pods reader
"viewer": {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
},
Rules: []rbacv1.PolicyRule{
{
Verbs: []string{"get"},
APIGroups: []string{""},
Resources: []string{"pods"},
},
},
},
}
testRoleBindings = map[string][]*rbacv1.RoleBinding{
// default namespace pods reader
"viewer": {
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
},
Subjects: []rbacv1.Subject{
{
Kind: "Group",
Name: "viewer",
},
},
RoleRef: rbacv1.RoleRef{
Kind: "Role",
Name: "viewer",
},
},
},
}
testClusterRoles = map[string]*rbacv1.ClusterRole{
// test for admin
"manager": {
Rules: []rbacv1.PolicyRule{
{
Verbs: []string{"*"},
APIGroups: []string{"*"},
Resources: []string{"*"},
},
},
},
}
testClusterRoleBindings = map[string][]*rbacv1.ClusterRoleBinding{
// test for admin
"manager": {
{
Subjects: []rbacv1.Subject{
{
Kind: "Group",
Name: "manager",
},
},
RoleRef: rbacv1.RoleRef{
Kind: "ClusterRole",
Name: "manager",
},
},
},
}
methods = []string{
"get", "list", "update", "create", "update", "delete",
}
)
func TestHasAccess(t *testing.T) {
ctx := context.Background()
cs := cacheStore{
asl: accesscontrol.NewAccessStore(ctx, true, &rbacInterface{}),
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://unit.test", nil)
if err != nil {
t.Error(err)
}
// test admin
managerUser := &user.DefaultInfo{
Name: "manager-01",
UID: "manager-01",
Groups: []string{
"manager",
},
}
managerCtx := request.WithUser(ctx, managerUser)
managerReq := &types.APIRequest{
Request: req.WithContext(managerCtx),
}
for _, res := range []string{"roles", "clusterRoles", "roleBindings", "clusterRoleBindings", "secrets", "nodes"} {
schema := &types.APISchema{
Schema: &schemas.Schema{
Attributes: map[string]interface{}{
"group": "",
"resource": res,
},
},
}
for _, method := range methods {
if !cs.hasAccess(managerReq, schema, method) {
t.Errorf("test failed, user %s expected to have access for %s %s, actual not", method, managerUser.Name, res)
}
}
}
defaultPodsViewer := &user.DefaultInfo{
Name: "defaultPodViewer-01",
UID: "defaultPodViewer-01",
Groups: []string{
"viewer",
},
}
viewerCtx := request.WithUser(ctx, defaultPodsViewer)
viewerReq := &types.APIRequest{
Request: req.WithContext(viewerCtx),
}
for res, resAccess := range map[string]bool{
"pods": true,
"deployments": false,
"statefulSets": false,
"nodes": false,
} {
schema := &types.APISchema{
Schema: &schemas.Schema{
Attributes: map[string]interface{}{
"group": "",
"resource": res,
},
},
}
// all namespaces
viewerReq.Namespace = ""
if cs.hasAccess(viewerReq, schema, "get") {
t.Errorf("test failed, user %s is not expected to have access for get %s in all namespaces, actual have", defaultPodsViewer.Name, res)
}
for i := 1; i < len(methods); i++ {
if cs.hasAccess(viewerReq, schema, methods[i]) {
t.Errorf("test failed, user %s is not expected to have access for %s %s, actual not", methods[i], defaultPodsViewer.Name, res)
}
}
// default namespace
viewerReq.Namespace = "default"
if !cs.hasAccess(viewerReq, schema, "get") && resAccess {
t.Errorf("test failed, user %s is expected to have access for get %s in default namespace, actual not", defaultPodsViewer.Name, res)
} else if cs.hasAccess(viewerReq, schema, "get") && !resAccess {
t.Errorf("test failed, user %s is not expected to have access for get %s in all namespaces, actual have", defaultPodsViewer.Name, res)
}
}
}
type store struct {
types.Store
}
func (s *store) List(_ *types.APIRequest, _ *types.APISchema) (types.APIObjectList, error) {
return types.APIObjectList{
Revision: "-1",
Continue: "false",
Objects: []types.APIObject{
{
Type: "pod",
ID: "test",
},
},
}, nil
}
func (s *store) Create(_ *types.APIRequest, _ *types.APISchema, _ types.APIObject) (types.APIObject, error) {
return types.APIObject{}, nil
}
func (s *store) Update(_ *types.APIRequest, _ *types.APISchema, _ types.APIObject, _ string) (types.APIObject, error) {
return types.APIObject{}, nil
}
func (s *store) Delete(_ *types.APIRequest, _ *types.APISchema, _ string) (types.APIObject, error) {
return types.APIObject{}, nil
}
func TestCacheStoreMethods(t *testing.T) {
ctx := context.Background()
cache, err := cache.New(256<<20, 256*1024)
if err != nil {
t.Error(err)
}
cs := cacheStore{
Store: &store{},
ctx: ctx,
asl: accesscontrol.NewAccessStore(ctx, true, &rbacInterface{}),
cache: cache,
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://unit.test", nil)
if err != nil {
t.Error(err)
}
managerUser := &user.DefaultInfo{
Name: "manager-02",
UID: "manager-02",
Groups: []string{
"manager",
},
}
managerCtx := request.WithUser(ctx, managerUser)
apiOp := &types.APIRequest{
Request: req.WithContext(managerCtx),
}
schema := &types.APISchema{
Schema: &schemas.Schema{
Attributes: map[string]interface{}{
"group": "",
"version": "v1",
"kind": "pod",
},
},
}
_, err = cs.List(apiOp, schema)
if err != nil {
t.Error(err)
}
_, err = cs.List(apiOp, schema)
if err != nil {
t.Error(err)
}
gvk := attributes.GVK(schema)
key := CacheKey{
GVK: gvk.String(),
Namespace: "",
}
if res, _, err := cs.cache.Get(key.GetKey()); res == nil || err != nil {
t.Error("test failed, expected pods in cache, actual not")
}
_, err = cs.Create(apiOp, schema, types.APIObject{})
if res, _, err := cs.cache.Get(key.GetKey()); res != nil && err == nil {
t.Error("test failed, expected no pods in cache, actual have")
}
_, err = cs.List(apiOp, schema)
if err != nil {
t.Error(err)
}
_, err = cs.Update(apiOp, schema, types.APIObject{}, "")
if res, _, err := cs.cache.Get(key.GetKey()); res != nil && err == nil {
t.Error("test failed, expected no pods in cache, actual have")
}
_, err = cs.List(apiOp, schema)
if err != nil {
t.Error(err)
}
_, err = cs.Delete(apiOp, schema, "")
if res, _, err := cs.cache.Get(key.GetKey()); res != nil && err == nil {
t.Error("test failed, expected no pods in cache, actual have")
}
}
......@@ -40,8 +40,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"github.com/erda-project/erda/modules/cmp/cache"
)
var (
......@@ -59,14 +57,8 @@ type Store struct {
func NewProxyStore(ctx context.Context, clusterName string, clientGetter proxy.ClientGetter, asl accesscontrol.AccessSetLookup) types.Store {
return &errorStore{
Store: &cacheStore{
Store: &Store{
clientGetter: clientGetter,
},
ctx: ctx,
asl: asl,
cache: cache.GetFreeCache(),
clusterName: clusterName,
Store: &Store{
clientGetter: clientGetter,
},
}
}
......
......@@ -159,6 +159,7 @@ func setup(ctx context.Context, server *Server) error {
asl := server.AccessSetLookup
if asl == nil {
asl = accesscontrol.NewAccessStore(ctx, true, server.controllers.RBAC)
server.AccessSetLookup = asl
}
sf := schema.NewCollection(ctx, server.BaseSchemas, asl)
......@@ -228,17 +229,20 @@ func (c *Server) ListenAndServe(ctx context.Context, httpsPort, httpPort int, op
return ctx.Err()
}
func (c *Server) Handle(apiOp *types.APIRequest) error {
func (c *Server) SetSchemas(apiOp *types.APIRequest) error {
user, ok := request.UserFrom(apiOp.Request.Context())
if !ok {
return fmt.Errorf("user can not be empty in apiRequest")
}
schemas, err := c.SchemaFactory.Schemas(user)
if err != nil {
logrus.Errorf("handle failed, %v", err)
logrus.Errorf("set schemas failed, %v", err)
return err
}
apiOp.Schemas = schemas
c.APIServer.Handle(apiOp)
return nil
}
func (c *Server) Handle(apiOp *types.APIRequest) {
c.APIServer.Handle(apiOp)
}
......@@ -17,11 +17,14 @@ package cmp
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
......@@ -38,6 +41,8 @@ import (
"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/bundle/apierrors"
"github.com/erda-project/erda/modules/cmp/cache"
"github.com/erda-project/erda/modules/cmp/queue"
"github.com/erda-project/erda/modules/cmp/steve"
"github.com/erda-project/erda/modules/cmp/steve/middleware"
httpapi "github.com/erda-project/erda/pkg/common/httpapi"
......@@ -50,6 +55,16 @@ import (
const OfflineLabel = "dice/offline"
var queryQueue *queue.QueryQueue
func init() {
queueSize := 10
if size, err := strconv.Atoi(os.Getenv("LIST_QUEUE_SIZE")); err == nil && size > queueSize {
queueSize = size
}
queryQueue = queue.NewQueryQueue(queueSize)
}
type SteveServer interface {
GetSteveResource(context.Context, *apistructs.SteveRequest) (types.APIObject, error)
ListSteveResource(context.Context, *apistructs.SteveRequest) ([]types.APIObject, error)
......@@ -65,7 +80,6 @@ type SteveServer interface {
OfflineNode(context.Context, string, string, string, []string) error
OnlineNode(context.Context, *apistructs.SteveRequest) error
Auth(userID, orgID, clusterName string) (apiuser.Info, error)
RestartWorkload(context.Context, *apistructs.SteveRequest) error
}
// GetSteveResource gets k8s resource from steve server.
......@@ -133,13 +147,34 @@ func (p *provider) GetSteveResource(ctx context.Context, req *apistructs.SteveRe
return obj, nil
}
// ListSteveResource lists k8s resource from steve server.
// Required fields: ClusterName, Type.
func (p *provider) ListSteveResource(ctx context.Context, req *apistructs.SteveRequest) ([]types.APIObject, error) {
if req.Type == "" || req.ClusterName == "" {
return nil, apierrors.ErrInvoke.InvalidParameter(errors.New("clusterName and type fields are required"))
func (p *provider) list(apiOp *types.APIRequest, resp *steve.Response, clusterName string) ([]types.APIObject, error) {
logrus.Infof("[DEBUG %s] start request steve aggregator at %s", apiOp.Type, time.Now().Format(time.StampNano))
if err := p.SteveAggregator.Serve(clusterName, apiOp); err != nil {
return nil, apierrors.ErrInvoke.InternalError(err)
}
logrus.Infof("[DEBUG %s] end request steve aggregator at %s", apiOp.Type, time.Now().Format(time.StampNano))
collection, ok := resp.ResponseData.(*types.GenericCollection)
if !ok {
if resp.ResponseData == nil {
return nil, apierrors.ErrInvoke.InternalError(errors.New("null response data"))
}
rawResource, ok := resp.ResponseData.(*types.RawResource)
if !ok {
return nil, apierrors.ErrInvoke.InternalError(errors.Errorf("unknown response data type: %s", reflect.TypeOf(resp.ResponseData).String()))
}
obj := rawResource.APIObject.Data()
return nil, apierrors.ErrInvoke.InternalError(errors.New(obj.String("message")))
}
var objects []types.APIObject
for _, obj := range collection.Data {
objects = append(objects, obj.APIObject)
}
return objects, nil
}
func (p *provider) getApiRequest(ctx context.Context, req *apistructs.SteveRequest) (*types.APIRequest, *steve.Response, error) {
path := strutil.JoinPath("/api/k8s/clusters", req.ClusterName, "v1", string(req.Type), req.Namespace)
var (
......@@ -158,7 +193,7 @@ func (p *provider) ListSteveResource(ctx context.Context, req *apistructs.SteveR
}
url, err := url.ParseRequestURI(fmt.Sprintf("%s?%s", path, query))
if err != nil {
return nil, errors.Errorf("failed to parse url, %v", err)
return nil, nil, errors.Errorf("failed to parse url, %v", err)
}
var user apiuser.Info
......@@ -174,19 +209,18 @@ func (p *provider) ListSteveResource(ctx context.Context, req *apistructs.SteveR
} else {
user, err = p.Auth(req.UserID, req.OrgID, req.ClusterName)
if err != nil {
return nil, err
return nil, nil, err
}
}
withUser := request.WithUser(ctx, user)
r, err := http.NewRequestWithContext(withUser, http.MethodGet, url.String(), nil)
if err != nil {
return nil, apierrors.ErrInvoke.InternalError(err)
return nil, nil, apierrors.ErrInvoke.InternalError(err)
}
resp := &steve.Response{}
apiOp := &types.APIRequest{
Name: req.Name,
Type: string(req.Type),
Method: http.MethodGet,
Namespace: req.Namespace,
......@@ -194,31 +228,128 @@ func (p *provider) ListSteveResource(ctx context.Context, req *apistructs.SteveR
Request: r,
Response: &steve.StatusCodeGetter{Response: resp},
}
return apiOp, resp, nil
}
logrus.Infof("[DEBUG %s] start request steve aggregator at %s", req.Type, time.Now().Format(time.StampNano))
if err := p.SteveAggregator.Serve(req.ClusterName, apiOp); err != nil {
return nil, apierrors.ErrInvoke.InternalError(err)
type CacheKey struct {
Kind string
Namespace string
ClusterName string
}
func (k *CacheKey) GetKey() string {
d := sha256.New()
d.Write([]byte(k.Kind))
d.Write([]byte(k.Namespace))
d.Write([]byte(k.ClusterName))
return hex.EncodeToString(d.Sum(nil))
}
// ListSteveResource lists k8s resource from steve server.
// Required fields: ClusterName, Type.
func (p *provider) ListSteveResource(ctx context.Context, req *apistructs.SteveRequest) ([]types.APIObject, error) {
if req.Type == "" || req.ClusterName == "" {
return nil, apierrors.ErrInvoke.InvalidParameter(errors.New("clusterName and type fields are required"))
}
logrus.Infof("[DEBUG %s] end request steve aggregator at %s", req.Type, time.Now().Format(time.StampNano))
collection, ok := resp.ResponseData.(*types.GenericCollection)
if !ok {
if resp.ResponseData == nil {
return nil, apierrors.ErrInvoke.InternalError(errors.New("null response data"))
apiOp, resp, err := p.getApiRequest(ctx, req)
if err != nil {
return nil, err
}
if !p.SteveAggregator.IsServerReady(req.ClusterName) {
return p.list(apiOp, resp, req.ClusterName)
}
hasAccess, err := p.SteveAggregator.HasAccess(req.ClusterName, apiOp, "list")
if err != nil {
return nil, err
}
if !hasAccess {
return nil, apierrors.ErrInvoke.AccessDenied()
}
key := CacheKey{
Kind: apiOp.Type,
Namespace: apiOp.Namespace,
ClusterName: req.ClusterName,
}
values, lexpired, err := cache.GetFreeCache().Get(key.GetKey())
if values == nil || err != nil {
if apiOp.Namespace != "" {
key := CacheKey{
Kind: apiOp.Type,
Namespace: "",
ClusterName: req.ClusterName,
}
allNsValues, expired, err := cache.GetFreeCache().Get(key.GetKey())
if allNsValues != nil && err == nil && !expired {
return getByNamespace(allNsValues[0].Value().([]types.APIObject), apiOp.Namespace), nil
}
}
rawResource, ok := resp.ResponseData.(*types.RawResource)
if !ok {
return nil, apierrors.ErrInvoke.InternalError(errors.Errorf("unknown response data type: %s", reflect.TypeOf(resp.ResponseData).String()))
queryQueue.Acquire(req.ClusterName, 1)
list, err := p.list(apiOp, resp, req.ClusterName)
queryQueue.Release(req.ClusterName, 1)
if err != nil {
return nil, err
}
vals, err := cache.GetInterfaceValue(list)
if err != nil {
return nil, errors.Errorf("failed to marshal cache data for %s, %v", apiOp.Type, err)
}
if err = cache.GetFreeCache().Set(key.GetKey(), vals, time.Second.Nanoseconds()*30); err != nil {
logrus.Errorf("failed to set cache for %s", apiOp.Type)
}
return list, nil
}
if lexpired {
if !cache.ExpireFreshQueue.IsFull() {
task := &queue.Task{
Key: key.GetKey(),
Do: func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
apiOp, resp, err := p.getApiRequest(ctx, req)
if err != nil {
logrus.Errorf("failed to get api request in task, %v", err)
return
}
list, err := p.list(apiOp, resp, req.ClusterName)
if err != nil {
logrus.Errorf("failed to list %s in task, %v", apiOp.Type, err)
return
}
value, err := cache.GetInterfaceValue(list)
if err != nil {
logrus.Errorf("failed to marshal cache data for %s, %v", apiOp.Type, err)
return
}
if err = cache.GetFreeCache().Set(key.GetKey(), value, time.Second.Nanoseconds()*30); err != nil {
logrus.Errorf("failed to set cache for %s, %v", apiOp.Type, err)
}
},
}
cache.ExpireFreshQueue.Enqueue(task)
} else {
logrus.Warnf("queue size is full, task is ignored, key:%s", key.GetKey())
}
obj := rawResource.APIObject.Data()
return nil, apierrors.ErrInvoke.InternalError(errors.New(obj.String("message")))
}
var objects []types.APIObject
for _, obj := range collection.Data {
objects = append(objects, obj.APIObject)
list := values[0].Value().([]types.APIObject)
return list, nil
}
func getByNamespace(list []types.APIObject, namespace string) []types.APIObject {
var res []types.APIObject
for _, apiObj := range list {
if apiObj.Namespace() == namespace {
res = append(res, apiObj)
}
}
return objects, nil
return res
}
func newReadCloser(obj interface{}) (io.ReadCloser, error) {
......@@ -283,6 +414,14 @@ func (p *provider) UpdateSteveResource(ctx context.Context, req *apistructs.Stev
return types.APIObject{}, apierrors.ErrInvoke.InternalError(errors.New(objData.String("message")))
}
cacheKey := CacheKey{
Kind: apiOp.Type,
ClusterName: req.ClusterName,
}
if _, err = cache.GetFreeCache().Remove(cacheKey.GetKey()); err != nil {
logrus.Errorf("failed to remove cache for %s, %v", apiOp.Type, err)
}
auditCtx := map[string]interface{}{
middleware.AuditClusterName: req.ClusterName,
middleware.AuditResourceType: req.Type,
......@@ -347,6 +486,14 @@ func (p *provider) CreateSteveResource(ctx context.Context, req *apistructs.Stev
return types.APIObject{}, apierrors.ErrInvoke.InternalError(errors.New(objData.String("message")))
}
cacheKey := CacheKey{
Kind: apiOp.Type,
ClusterName: req.ClusterName,
}
if _, err = cache.GetFreeCache().Remove(cacheKey.GetKey()); err != nil {
logrus.Errorf("failed to remove cache for %s, %v", apiOp.Type, err)
}
reqObj, err := data.Convert(req.Obj)
if err != nil {
logrus.Errorf("failed to convert obj in request to data.Object, %v", err)
......@@ -410,6 +557,14 @@ func (p *provider) DeleteSteveResource(ctx context.Context, req *apistructs.Stev
}
}
cacheKey := CacheKey{
Kind: apiOp.Type,
ClusterName: req.ClusterName,
}
if _, err = cache.GetFreeCache().Remove(cacheKey.GetKey()); err != nil {
logrus.Errorf("failed to remove cache for %s, %v", apiOp.Type, err)
}
auditCtx := map[string]interface{}{
middleware.AuditClusterName: req.ClusterName,
middleware.AuditResourceType: req.Type,
......@@ -452,7 +607,6 @@ func (p *provider) PatchNode(ctx context.Context, req *apistructs.SteveRequest)
Name: req.Name,
Type: string(req.Type),
Method: http.MethodPatch,
Namespace: req.Namespace,
ResponseWriter: resp,
Request: r,
Response: &steve.StatusCodeGetter{Response: resp},
......@@ -474,6 +628,15 @@ func (p *provider) PatchNode(ctx context.Context, req *apistructs.SteveRequest)
if objData.String("type") == "error" {
return apierrors.ErrInvoke.InternalError(errors.New(objData.String("message")))
}
cacheKey := CacheKey{
Kind: apiOp.Type,
ClusterName: req.ClusterName,
}
if _, err = cache.GetFreeCache().Remove(cacheKey.GetKey()); err != nil {
logrus.Errorf("failed to remove cache for %s, %v", apiOp.Type, err)
}
return nil
}
......@@ -765,40 +928,6 @@ func (p *provider) OnlineNode(ctx context.Context, req *apistructs.SteveRequest)
return nil
}
func (p *provider) RestartWorkload(ctx context.Context, req *apistructs.SteveRequest) error {
if req.Type != apistructs.K8SDeployment && req.Type != apistructs.K8SStatefulSet &&
req.Type != apistructs.K8SDaemonSet {
return errors.New("only deployment, statefulSet and daemonSet can be restarted")
}
patchBody := map[string]interface{}{
"spec": map[string]interface{}{
"template": map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]interface{}{
"kubectl.kubernetes.io/restartedAt": time.Now().Format("2006-01-02T15:04:05+07:00"),
},
},
},
},
}
req.Obj = patchBody
if err := p.PatchNode(ctx, req); err != nil {
return err
}
auditCtx := map[string]interface{}{
middleware.AuditClusterName: req.ClusterName,
middleware.AuditResourceName: req.Name,
middleware.AuditNamespace: req.Namespace,
middleware.AuditResourceType: req.Type,
}
if err := p.Audit(req.UserID, req.OrgID, middleware.AuditRestartWorkload, auditCtx); err != nil {
logrus.Errorf("failed to audit when offline node, %v", err)
}
return nil
}
// Auth authenticates by userID and orgID.
func (p *provider) Auth(userID, orgID, clusterName string) (apiuser.Info, error) {
scopeID, err := strconv.ParseUint(orgID, 10, 64)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment