Commit 419fcb55 authored by Tim Gross's avatar Tim Gross
Browse files

backport of commit c2b7ddaa1eef68367791c2c561864487df5d3b58

parent debd2d96
No related merge requests found
Showing with 16 additions and 19 deletions
+16 -19
......@@ -201,6 +201,12 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
}()
socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)
client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
t := time.NewTimer(0)
// Step 1: Wait for the plugin to initially become available.
......@@ -210,7 +216,7 @@ WAITFORREADY:
case <-ctx.Done():
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
if err != nil || !pluginHealthy {
h.logger.Debug("CSI Plugin not ready", "error", err)
......@@ -232,9 +238,9 @@ WAITFORREADY:
}
// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(socketPath)
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
if err != nil {
h.logger.Error("CSI Plugin registration failed", "error", err)
h.logger.Error("CSI plugin registration failed", "error", err)
event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
event.SetMessage(fmt.Sprintf("failed to register plugin: %s, reason: %v", h.task.CSIPluginConfig.ID, err))
h.eventEmitter.EmitEvent(event)
......@@ -249,9 +255,9 @@ WAITFORREADY:
deregisterPluginFn()
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
if err != nil {
h.logger.Error("CSI Plugin fingerprinting failed", "error", err)
h.logger.Error("CSI plugin fingerprinting failed", "error", err)
}
// The plugin has transitioned to a healthy state. Emit an event.
......@@ -281,13 +287,9 @@ WAITFORREADY:
}
}
func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) {
func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) {
// At this point we know the plugin is ready and we can fingerprint it
// to get its vendor name and version
client := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
info, err := client.PluginInfo()
if err != nil {
return nil, fmt.Errorf("failed to probe plugin: %v", err)
......@@ -351,16 +353,11 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err
}, nil
}
func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socketPath string) (bool, error) {
_, err := os.Stat(socketPath)
if err != nil {
return false, fmt.Errorf("failed to stat socket: %v", err)
}
client := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) {
probeCtx, probeCancelFn := context.WithTimeout(ctx, 5*time.Second)
defer probeCancelFn()
healthy, err := client.PluginProbe(ctx)
healthy, err := client.PluginProbe(probeCtx)
if err != nil {
return false, fmt.Errorf("failed to probe plugin: %v", err)
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment