Commit 27e5cea0 authored by Tim Gross's avatar Tim Gross Committed by Tim Gross
Browse files

csi: implement CSI controller detach request/response (#7107)

This changeset implements the minimal structs on the client-side we
need to compile the work-in-progress implementation of the
server-to-controller RPCs. It doesn't include implementing the
`ClientCSI.DettachVolume` RPC on the client.
parent 5291c3ac
Showing with 180 additions and 8 deletions
+180 -8
......@@ -75,3 +75,28 @@ type ClientCSIControllerAttachVolumeResponse struct {
// subsequent `NodeStageVolume` or `NodePublishVolume` calls
PublishContext map[string]string
}
type ClientCSIControllerDetachVolumeRequest struct {
PluginName string
// The ID of the volume to be unpublished for the node
// This field is REQUIRED.
VolumeID string
// The ID of the node. This field is REQUIRED. This must match the NodeID that
// is fingerprinted by the target node for this plugin name.
NodeID string
}
func (c *ClientCSIControllerDetachVolumeRequest) ToCSIRequest() *csi.ControllerUnpublishVolumeRequest {
if c == nil {
return &csi.ControllerUnpublishVolumeRequest{}
}
return &csi.ControllerUnpublishVolumeRequest{
VolumeID: c.VolumeID,
NodeID: c.NodeID,
}
}
type ClientCSIControllerDetachVolumeResponse struct{}
......@@ -237,13 +237,12 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub
return nil, fmt.Errorf("controllerClient not initialized")
}
pbrequest := &csipbv1.ControllerPublishVolumeRequest{
VolumeId: req.VolumeID,
NodeId: req.NodeID,
Readonly: req.ReadOnly,
//TODO: add capabilities
err := req.Validate()
if err != nil {
return nil, err
}
pbrequest := req.ToCSIRepresentation()
resp, err := c.controllerClient.ControllerPublishVolume(ctx, pbrequest)
if err != nil {
return nil, err
......@@ -254,6 +253,27 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub
}, nil
}
func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error) {
if c == nil {
return nil, fmt.Errorf("Client not initialized")
}
if c.controllerClient == nil {
return nil, fmt.Errorf("controllerClient not initialized")
}
err := req.Validate()
if err != nil {
return nil, err
}
upbrequest := req.ToCSIRepresentation()
_, err = c.controllerClient.ControllerUnpublishVolume(ctx, upbrequest)
if err != nil {
return nil, err
}
return &ControllerUnpublishVolumeResponse{}, nil
}
//
// Node Endpoints
//
......
......@@ -360,6 +360,7 @@ func TestClient_RPC_NodeGetCapabilities(t *testing.T) {
func TestClient_RPC_ControllerPublishVolume(t *testing.T) {
cases := []struct {
Name string
Request *ControllerPublishVolumeRequest
ResponseErr error
Response *csipbv1.ControllerPublishVolumeResponse
ExpectedResponse *ControllerPublishVolumeResponse
......@@ -367,16 +368,26 @@ func TestClient_RPC_ControllerPublishVolume(t *testing.T) {
}{
{
Name: "handles underlying grpc errors",
Request: &ControllerPublishVolumeRequest{},
ResponseErr: fmt.Errorf("some grpc error"),
ExpectedErr: fmt.Errorf("some grpc error"),
},
{
Name: "Handles missing NodeID",
Request: &ControllerPublishVolumeRequest{},
Response: &csipbv1.ControllerPublishVolumeResponse{},
ExpectedErr: fmt.Errorf("missing NodeID"),
},
{
Name: "Handles PublishContext == nil",
Request: &ControllerPublishVolumeRequest{VolumeID: "vol", NodeID: "node"},
Response: &csipbv1.ControllerPublishVolumeResponse{},
ExpectedResponse: &ControllerPublishVolumeResponse{},
},
{
Name: "Handles PublishContext != nil",
Name: "Handles PublishContext != nil",
Request: &ControllerPublishVolumeRequest{VolumeID: "vol", NodeID: "node"},
Response: &csipbv1.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
"com.hashicorp/nomad-node-id": "foobar",
......@@ -400,7 +411,54 @@ func TestClient_RPC_ControllerPublishVolume(t *testing.T) {
cc.NextErr = c.ResponseErr
cc.NextPublishVolumeResponse = c.Response
resp, err := client.ControllerPublishVolume(context.TODO(), &ControllerPublishVolumeRequest{})
resp, err := client.ControllerPublishVolume(context.TODO(), c.Request)
if c.ExpectedErr != nil {
require.Error(t, c.ExpectedErr, err)
}
require.Equal(t, c.ExpectedResponse, resp)
})
}
}
func TestClient_RPC_ControllerUnpublishVolume(t *testing.T) {
cases := []struct {
Name string
Request *ControllerUnpublishVolumeRequest
ResponseErr error
Response *csipbv1.ControllerUnpublishVolumeResponse
ExpectedResponse *ControllerUnpublishVolumeResponse
ExpectedErr error
}{
{
Name: "Handles underlying grpc errors",
Request: &ControllerUnpublishVolumeRequest{},
ResponseErr: fmt.Errorf("some grpc error"),
ExpectedErr: fmt.Errorf("some grpc error"),
},
{
Name: "Handles missing NodeID",
Request: &ControllerUnpublishVolumeRequest{},
ExpectedErr: fmt.Errorf("missing NodeID"),
ExpectedResponse: nil,
},
{
Name: "Handles successful response",
Request: &ControllerUnpublishVolumeRequest{VolumeID: "vol", NodeID: "node"},
ExpectedErr: fmt.Errorf("missing NodeID"),
ExpectedResponse: &ControllerUnpublishVolumeResponse{},
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
_, cc, _, client := newTestClient()
defer client.Close()
cc.NextErr = c.ResponseErr
cc.NextUnpublishVolumeResponse = c.Response
resp, err := client.ControllerUnpublishVolume(context.TODO(), c.Request)
if c.ExpectedErr != nil {
require.Error(t, c.ExpectedErr, err)
}
......
......@@ -44,6 +44,10 @@ type Client struct {
NextControllerPublishVolumeErr error
ControllerPublishVolumeCallCount int64
NextControllerUnpublishVolumeResponse *csi.ControllerUnpublishVolumeResponse
NextControllerUnpublishVolumeErr error
ControllerUnpublishVolumeCallCount int64
NextNodeGetCapabilitiesResponse *csi.NodeCapabilitySet
NextNodeGetCapabilitiesErr error
NodeGetCapabilitiesCallCount int64
......@@ -139,6 +143,16 @@ func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
return c.NextControllerPublishVolumeResponse, c.NextControllerPublishVolumeErr
}
// ControllerUnpublishVolume is used to attach a remote volume to a node
func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerUnpublishVolumeCallCount++
return c.NextControllerUnpublishVolumeResponse, c.NextControllerUnpublishVolumeErr
}
func (c *Client) NodeGetCapabilities(ctx context.Context) (*csi.NodeCapabilitySet, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
......
......@@ -36,6 +36,9 @@ type CSIPlugin interface {
// ControllerPublishVolume is used to attach a remote volume to a cluster node.
ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
// ControllerUnpublishVolume is used to deattach a remote volume from a cluster node.
ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
// NodeGetCapabilities is used to return the available capabilities from the
// Node Service.
NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, error)
......@@ -223,14 +226,66 @@ type ControllerPublishVolumeRequest struct {
VolumeID string
NodeID string
ReadOnly bool
//TODO: Add Capabilities
}
func (r *ControllerPublishVolumeRequest) ToCSIRepresentation() *csipbv1.ControllerPublishVolumeRequest {
if r == nil {
return nil
}
return &csipbv1.ControllerPublishVolumeRequest{
VolumeId: r.VolumeID,
NodeId: r.NodeID,
Readonly: r.ReadOnly,
// TODO: add capabilities
}
}
func (r *ControllerPublishVolumeRequest) Validate() error {
if r.VolumeID == "" {
return errors.New("missing VolumeID")
}
if r.NodeID == "" {
return errors.New("missing NodeID")
}
return nil
}
type ControllerPublishVolumeResponse struct {
PublishContext map[string]string
}
type ControllerUnpublishVolumeRequest struct {
VolumeID string
NodeID string
}
func (r *ControllerUnpublishVolumeRequest) ToCSIRepresentation() *csipbv1.ControllerUnpublishVolumeRequest {
if r == nil {
return nil
}
return &csipbv1.ControllerUnpublishVolumeRequest{
VolumeId: r.VolumeID,
NodeId: r.NodeID,
}
}
func (r *ControllerUnpublishVolumeRequest) Validate() error {
if r.VolumeID == "" {
return errors.New("missing VolumeID")
}
if r.NodeID == "" {
// the spec allows this but it would unpublish the
// volume from all nodes
return errors.New("missing NodeID")
}
return nil
}
type ControllerUnpublishVolumeResponse struct{}
type NodeCapabilitySet struct {
HasStageUnstageVolume bool
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment