Unverified Commit d6276e10 authored by TzZtzt's avatar TzZtzt Committed by GitHub
Browse files

Use APIReader instead in Fluid CSI Plugin for 0.7.0 (#1530)


* Use api reader instead in Fluid CSI Plugin
Signed-off-by: default avatarTrafalgarZZZ <trafalgarz@outlook.com>

* Constrain fluid-csi-plugin cluster role rbac
Signed-off-by: default avatarTrafalgarZZZ <trafalgarz@outlook.com>
No related merge requests found
Showing with 77 additions and 74 deletions
+77 -74
......@@ -22,22 +22,22 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch"]
verbs: ["get"]
- apiGroups: [""]
resources: ["persistentvolumes/status"]
verbs: ["get", "list", "watch"]
verbs: ["get"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch"]
verbs: ["get"]
- apiGroups: [""]
resources: ["persistentvolumeclaims/status"]
verbs: ["get", "list", "watch"]
verbs: ["get"]
- apiGroups: [""]
resources: ["events"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch", "patch"]
verbs: ["get", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
......
......@@ -117,7 +117,7 @@ func handle() {
}
}()
d := csi.NewDriver(nodeID, endpoint, mgr.GetClient())
d := csi.NewDriver(nodeID, endpoint, mgr.GetClient(), mgr.GetAPIReader())
d.Run()
}
......
......@@ -50,11 +50,12 @@ const (
type driver struct {
client client.Client
apiReader client.Reader
csiDriver *csicommon.CSIDriver
nodeId, endpoint string
}
func NewDriver(nodeID, endpoint string, client client.Client) *driver {
func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader) *driver {
glog.Infof("Driver: %v version: %v", driverName, version)
proto, addr := utils.SplitSchemaAddr(endpoint)
......@@ -80,6 +81,7 @@ func NewDriver(nodeID, endpoint string, client client.Client) *driver {
endpoint: endpoint,
csiDriver: csiDriver,
client: client,
apiReader: apiReader,
}
}
......@@ -93,6 +95,7 @@ func (d *driver) newNodeServer() *nodeServer {
nodeId: d.nodeId,
DefaultNodeServer: csicommon.NewDefaultNodeServer(d.csiDriver),
client: d.client,
apiReader: d.apiReader,
}
}
......
......@@ -40,8 +40,9 @@ import (
type nodeServer struct {
nodeId string
*csicommon.DefaultNodeServer
client client.Client
mutex sync.Mutex
client client.Client
apiReader client.Reader
mutex sync.Mutex
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
......@@ -172,7 +173,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
// from the VolumeId information. So the solution is to get PV according to the VolumeId and to check the ClaimRef
// of the PV. Fluid creates PVC with the same namespace and name for any datasets so the namespace and name of the ClaimRef
// can be used to indicate a specific dataset.
namespace, name, err := getNamespacedNameByVolumeId(ns.client, req.GetVolumeId())
namespace, name, err := getNamespacedNameByVolumeId(ns.apiReader, req.GetVolumeId())
if err != nil {
glog.Errorf("NodeUnstageVolume: can't get namespace and name by volume id %s: %v", req.GetVolumeId(), err)
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", req.GetVolumeId())
......@@ -195,7 +196,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
var labelsToModify common.LabelsToModify
labelsToModify.Delete(fuseLabelKey)
node, err := kubeclient.GetNode(ns.client, ns.nodeId)
node, err := kubeclient.GetNode(ns.apiReader, ns.nodeId)
if err != nil {
glog.Errorf("NodeUnstageVolume: can't get node %s: %v", ns.nodeId, err)
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId)
......@@ -216,7 +217,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
defer ns.mutex.Unlock()
// 1. get dataset namespace and name by volume id
namespace, name, err := getNamespacedNameByVolumeId(ns.client, req.GetVolumeId())
namespace, name, err := getNamespacedNameByVolumeId(ns.apiReader, req.GetVolumeId())
if err != nil {
glog.Errorf("NodeStageVolume: can't get namespace and name by volume id %s: %v", req.GetVolumeId(), err)
return nil, errors.Wrapf(err, "NodeStageVolume: can't get namespace and name by volume id %s", req.GetVolumeId())
......@@ -227,7 +228,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
var labelsToModify common.LabelsToModify
labelsToModify.Add(fuseLabelKey, "true")
node, err := kubeclient.GetNode(ns.client, ns.nodeId)
node, err := kubeclient.GetNode(ns.apiReader, ns.nodeId)
if err != nil {
glog.Errorf("NodeStageVolume: can't get node %s: %v", ns.nodeId, err)
return nil, errors.Wrapf(err, "NodeStageVolume: can't get node %s", ns.nodeId)
......@@ -307,7 +308,7 @@ func checkMountInUse(volumeName string) (bool, error) {
return inUse, err
}
func getNamespacedNameByVolumeId(client client.Client, volumeId string) (namespace, name string, err error) {
func getNamespacedNameByVolumeId(client client.Reader, volumeId string) (namespace, name string, err error) {
pv, err := kubeclient.GetPersistentVolume(client, volumeId)
if err != nil {
return "", "", err
......
......@@ -25,26 +25,26 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"testing"
)
func newGooseEngineRT(client client.Client, name string, namespace string, withRuntimeInfo bool, unittest bool) *GooseFSEngine {
runTimeInfo,_ := base.BuildRuntimeInfo(name,namespace,"GooseFS", v1alpha1.TieredStore{})
func newGooseEngineRT(client client.Client, name string, namespace string, withRuntimeInfo bool, unittest bool) *GooseFSEngine {
runTimeInfo, _ := base.BuildRuntimeInfo(name, namespace, "GooseFS", v1alpha1.TieredStore{})
engine := &GooseFSEngine{
runtime: &datav1alpha1.GooseFSRuntime{},
name: name,
namespace: namespace,
Client: client,
runtimeInfo: nil,
UnitTest: unittest,
Log: log.NullLogger{},
runtime: &datav1alpha1.GooseFSRuntime{},
name: name,
namespace: namespace,
Client: client,
runtimeInfo: nil,
UnitTest: unittest,
Log: log.NullLogger{},
}
if withRuntimeInfo{
if withRuntimeInfo {
engine.runtimeInfo = runTimeInfo
}
return engine
}
func TestGetRuntimeInfo(t *testing.T){
func TestGetRuntimeInfo(t *testing.T) {
runtimeInputs := []*v1alpha1.GooseFSRuntime{
{
ObjectMeta: metav1.ObjectMeta{
......@@ -52,7 +52,7 @@ func TestGetRuntimeInfo(t *testing.T){
Namespace: "fluid",
},
Spec: v1alpha1.GooseFSRuntimeSpec{
Fuse:v1alpha1.GooseFSFuseSpec{
Fuse: v1alpha1.GooseFSFuseSpec{
Global: true,
},
},
......@@ -63,7 +63,7 @@ func TestGetRuntimeInfo(t *testing.T){
Namespace: "fluid",
},
Spec: v1alpha1.GooseFSRuntimeSpec{
Fuse:v1alpha1.GooseFSFuseSpec{
Fuse: v1alpha1.GooseFSFuseSpec{
Global: false,
},
},
......@@ -95,8 +95,8 @@ func TestGetRuntimeInfo(t *testing.T){
}
dataSetInputs := []*v1alpha1.Dataset{
{
ObjectMeta:metav1.ObjectMeta{
Name:"hadoop",
ObjectMeta: metav1.ObjectMeta{
Name: "hadoop",
Namespace: "fluid",
},
},
......@@ -105,10 +105,10 @@ func TestGetRuntimeInfo(t *testing.T){
for _, runtimeInput := range runtimeInputs {
objs = append(objs, runtimeInput.DeepCopy())
}
for _, daemonSetInput := range daemonSetInputs{
for _, daemonSetInput := range daemonSetInputs {
objs = append(objs, daemonSetInput.DeepCopy())
}
for _, dataSetInput := range dataSetInputs{
for _, dataSetInput := range dataSetInputs {
objs = append(objs, dataSetInput.DeepCopy())
}
//scheme := runtime.NewScheme()
......@@ -116,57 +116,56 @@ func TestGetRuntimeInfo(t *testing.T){
//scheme.AddKnownTypes(v1alpha1.GroupVersion,runtimeInput)
fakeClient := fake.NewFakeClientWithScheme(testScheme, objs...)
testCases := []struct{
name string
namespace string
withRuntimeInfo bool
unittest bool
isErr bool
isNil bool
testCases := []struct {
name string
namespace string
withRuntimeInfo bool
unittest bool
isErr bool
isNil bool
}{
{
name: "hbase",
namespace: "fluid",
withRuntimeInfo: false,
unittest: false,
isErr: false,
isNil: false,
name: "hbase",
namespace: "fluid",
withRuntimeInfo: false,
unittest: false,
isErr: false,
isNil: false,
},
{
name: "hbase",
namespace: "fluid",
withRuntimeInfo: false,
unittest: true,
isErr: false,
isNil: false,
name: "hbase",
namespace: "fluid",
withRuntimeInfo: false,
unittest: true,
isErr: false,
isNil: false,
},
{
name: "hbase",
namespace: "fluid",
withRuntimeInfo: true,
isErr: false,
isNil: false,
name: "hbase",
namespace: "fluid",
withRuntimeInfo: true,
isErr: false,
isNil: false,
},
{
name: "hadoop",
namespace: "fluid",
withRuntimeInfo: false,
unittest: false,
isErr: false,
isNil: false,
name: "hadoop",
namespace: "fluid",
withRuntimeInfo: false,
unittest: false,
isErr: false,
isNil: false,
},
}
for _,testCase := range testCases{
engine := newGooseEngineRT(fakeClient,testCase.name,testCase.namespace,testCase.withRuntimeInfo,testCase.unittest)
runtimeInfo,err := engine.getRuntimeInfo()
for _, testCase := range testCases {
engine := newGooseEngineRT(fakeClient, testCase.name, testCase.namespace, testCase.withRuntimeInfo, testCase.unittest)
runtimeInfo, err := engine.getRuntimeInfo()
isNil := runtimeInfo == nil
isErr := err != nil
if isNil != testCase.isNil{
t.Errorf(" want %t, got %t",testCase.isNil,isNil)
if isNil != testCase.isNil {
t.Errorf(" want %t, got %t", testCase.isNil, isNil)
}
if isErr != testCase.isErr{
t.Errorf(" want %t, got %t",testCase.isErr,isErr)
if isErr != testCase.isErr {
t.Errorf(" want %t, got %t", testCase.isErr, isErr)
}
}
}
\ No newline at end of file
}
......@@ -25,7 +25,7 @@ import (
)
// GetNode gets the latest node info
func GetNode(client client.Client, name string) (node *v1.Node, err error) {
func GetNode(client client.Reader, name string) (node *v1.Node, err error) {
key := types.NamespacedName{
Name: name,
}
......
......@@ -32,7 +32,7 @@ const (
persistentVolumeClaimProtectionFinalizerName = "kubernetes.io/pvc-protection"
)
func GetPersistentVolume(client client.Client, name string) (pv *v1.PersistentVolume, err error) {
func GetPersistentVolume(client client.Reader, name string) (pv *v1.PersistentVolume, err error) {
pv = &v1.PersistentVolume{}
err = client.Get(context.TODO(), types.NamespacedName{Name: name}, pv)
if err != nil {
......@@ -335,7 +335,7 @@ func ShouldRemoveProtectionFinalizer(client client.Client, name, namespace strin
}
// IsDatasetPVC check whether the PVC is a dataset PVC
func IsDatasetPVC(client client.Client, name string, namespace string) (find bool, err error) {
func IsDatasetPVC(client client.Reader, name string, namespace string) (find bool, err error) {
pvc := &v1.PersistentVolumeClaim{}
err = client.Get(context.TODO(), types.NamespacedName{
Namespace: namespace,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment