Unverified Commit 88c5092b authored by hc-github-team-nomad-core's avatar hc-github-team-nomad-core Committed by GitHub
Browse files

Merge pull request #12627 from hashicorp/backport/csi-plugin-expected-counts/locally-flowing-sawfly

This pull request was automerged via backport-assistant
parents 026d6624 33176d07
Showing with 371 additions and 612 deletions
+371 -612
......@@ -2098,6 +2098,39 @@ func CSIVolume(plugin *structs.CSIPlugin) *structs.CSIVolume {
}
}
func CSIPluginJob(pluginType structs.CSIPluginType, pluginID string) *structs.Job {
job := new(structs.Job)
switch pluginType {
case structs.CSIPluginTypeController:
job = Job()
job.ID = fmt.Sprintf("mock-controller-%s", pluginID)
job.Name = "job-plugin-controller"
job.TaskGroups[0].Count = 2
case structs.CSIPluginTypeNode:
job = SystemJob()
job.ID = fmt.Sprintf("mock-node-%s", pluginID)
job.Name = "job-plugin-node"
case structs.CSIPluginTypeMonolith:
job = SystemJob()
job.ID = fmt.Sprintf("mock-monolith-%s", pluginID)
job.Name = "job-plugin-monolith"
}
job.TaskGroups[0].Name = "plugin"
job.TaskGroups[0].Tasks[0].Name = "plugin"
job.TaskGroups[0].Tasks[0].Driver = "docker"
job.TaskGroups[0].Tasks[0].Services = nil
job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{
ID: pluginID,
Type: pluginType,
MountDir: "/csi",
}
job.Canonicalize()
return job
}
func Events(index uint64) *structs.Events {
return &structs.Events{
Index: index,
......
......@@ -853,7 +853,7 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
if err := upsertNodeCSIPlugins(txn, node, index); err != nil {
if err := upsertCSIPluginsForNode(txn, node, index); err != nil {
return fmt.Errorf("csi plugin update failed: %v", err)
}
......@@ -1178,11 +1178,11 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv
}
}
// upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health. It's called
// upsertCSIPluginsForNode indexes csi plugins for volume retrieval, with health. It's called
// on upsertNodeEvents, so that event driven health changes are updated
func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error {
loop := func(info *structs.CSIInfo) error {
upsertFn := func(info *structs.CSIInfo) error {
raw, err := txn.First("csi_plugins", "id", info.PluginID)
if err != nil {
return fmt.Errorf("csi_plugin lookup error: %s %v", info.PluginID, err)
......@@ -1226,7 +1226,7 @@ func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
inUseNode := map[string]struct{}{}
for _, info := range node.CSIControllerPlugins {
err := loop(info)
err := upsertFn(info)
if err != nil {
return err
}
......@@ -1234,7 +1234,7 @@ func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
}
for _, info := range node.CSINodePlugins {
err := loop(info)
err := upsertFn(info)
if err != nil {
return err
}
......@@ -3228,7 +3228,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
return err
}
if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
if err := s.updatePluginForTerminalAlloc(index, copyAlloc, txn); err != nil {
return err
}
......@@ -3337,7 +3337,7 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return err
}
if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil {
if err := s.updatePluginForTerminalAlloc(index, alloc, txn); err != nil {
return err
}
......@@ -4779,7 +4779,7 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, txn *txn) error {
plugIns := make(map[string]*structs.CSIPlugin)
loop := func(job *structs.Job, delete bool) error {
upsertFn := func(job *structs.Job, delete bool) error {
for _, tg := range job.TaskGroups {
for _, t := range tg.Tasks {
if t.CSIPluginConfig == nil {
......@@ -4813,13 +4813,13 @@ func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, t
}
if prev != nil {
err := loop(prev, true)
err := upsertFn(prev, true)
if err != nil {
return err
}
}
err := loop(job, false)
err := upsertFn(job, false)
if err != nil {
return err
}
......@@ -5064,10 +5064,11 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return nil
}
// updatePluginWithAlloc updates the CSI plugins for an alloc when the
// updatePluginForTerminalAlloc updates the CSI plugins for an alloc when the
// allocation is updated or inserted with a terminal server status.
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
func (s *StateStore) updatePluginForTerminalAlloc(index uint64, alloc *structs.Allocation,
txn *txn) error {
if !alloc.ServerTerminalStatus() {
return nil
}
......@@ -5123,7 +5124,9 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J
plug = plug.Copy()
}
plug.UpdateExpectedWithJob(alloc.Job, summary, alloc.ServerTerminalStatus())
plug.UpdateExpectedWithJob(alloc.Job, summary,
alloc.Job.Status == structs.JobStatusDead)
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
......
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment