Unverified commit 37034897, authored by Tim Gross and committed by GitHub
Browse files

MRD: all regions should start pending (#8433)

Deployments should wait until kicked off by `Job.Register` so that we can
assert that all regions have a scheduled deployment before starting any
region. This changeset includes the OSS fixes to support the ENT work.

`IsMultiregionStarter` has no more callers in OSS, so remove it here.
parent 5cfd3146
Showing with 33 additions and 92 deletions
+33 -92
...@@ -163,7 +163,10 @@ func TestDeploymentStatusCommand_Multiregion(t *testing.T) { ...@@ -163,7 +163,10 @@ func TestDeploymentStatusCommand_Multiregion(t *testing.T) {
require.Contains(t, out, eastDeploys[0].ID[0:7]) require.Contains(t, out, eastDeploys[0].ID[0:7])
require.Contains(t, out, "west") require.Contains(t, out, "west")
require.Contains(t, out, westDeploys[0].ID[0:7]) require.Contains(t, out, westDeploys[0].ID[0:7])
require.Contains(t, out, "running")
// this will always be pending because we're not really doing a multiregion
// register here in OSS
require.Contains(t, out, "pending")
require.NotContains(t, out, "<none>") require.NotContains(t, out, "<none>")
......
...@@ -481,7 +481,10 @@ func TestJobStatusCommand_Multiregion(t *testing.T) { ...@@ -481,7 +481,10 @@ func TestJobStatusCommand_Multiregion(t *testing.T) {
require.Contains(t, out, eastDeploys[0].ID[0:7]) require.Contains(t, out, eastDeploys[0].ID[0:7])
require.Contains(t, out, "west") require.Contains(t, out, "west")
require.Contains(t, out, westDeploys[0].ID[0:7]) require.Contains(t, out, westDeploys[0].ID[0:7])
require.Contains(t, out, "running")
// this will always be pending because we're not really doing a multiregion
// register here in OSS
require.Contains(t, out, "pending")
require.NotContains(t, out, "<none>") require.NotContains(t, out, "<none>")
......
...@@ -307,7 +307,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis ...@@ -307,7 +307,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
if existingJob != nil { if existingJob != nil {
existingVersion = existingJob.Version existingVersion = existingJob.Version
} }
err = j.multiregionRegister(args, reply, existingVersion) isRunner, err := j.multiregionRegister(args, reply, existingVersion)
if err != nil { if err != nil {
return err return err
} }
...@@ -334,6 +334,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis ...@@ -334,6 +334,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
reply.JobModifyIndex = existingJob.JobModifyIndex reply.JobModifyIndex = existingJob.JobModifyIndex
} }
// used for multiregion start
args.Job.JobModifyIndex = reply.JobModifyIndex
// If the job is periodic or parameterized, we don't create an eval. // If the job is periodic or parameterized, we don't create an eval.
if args.Job.IsPeriodic() || args.Job.IsParameterized() { if args.Job.IsPeriodic() || args.Job.IsParameterized() {
return nil return nil
...@@ -371,6 +374,15 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis ...@@ -371,6 +374,15 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
reply.EvalID = eval.ID reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex reply.Index = evalIndex
// Kick off a multiregion deployment (enterprise only).
if isRunner {
err = j.multiregionStart(args, reply)
if err != nil {
return err
}
}
return nil return nil
} }
......
...@@ -10,9 +10,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job) (error, error) { ...@@ -10,9 +10,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job) (error, error) {
} }
// multiregionRegister is used to send a job across multiple regions // multiregionRegister is used to send a job across multiple regions
func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, existingVersion uint64) (bool, error) {
existingVersion uint64) error { return false, nil
}
// multiregionStart is used to kick-off a deployment across multiple regions
func (j *Job) multiregionStart(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse) error {
return nil return nil
} }
......
...@@ -1297,6 +1297,9 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { ...@@ -1297,6 +1297,9 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) {
func MultiregionJob() *structs.Job { func MultiregionJob() *structs.Job {
job := Job() job := Job()
update := *structs.DefaultUpdateStrategy
job.Update = update
job.TaskGroups[0].Update = &update
job.Multiregion = &structs.Multiregion{ job.Multiregion = &structs.Multiregion{
Strategy: &structs.MultiregionStrategy{ Strategy: &structs.MultiregionStrategy{
MaxParallel: 1, MaxParallel: 1,
......
...@@ -4175,30 +4175,6 @@ func (j *Job) IsMultiregion() bool { ...@@ -4175,30 +4175,6 @@ func (j *Job) IsMultiregion() bool {
return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0 return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0
} }
// IsMultiregionStarter reports whether a regional job should begin in the
// running state.
//
// It returns true when the job is not multiregion, when its Type is "system"
// or "batch", or when the multiregion strategy is nil or has a MaxParallel of
// 0. Otherwise it returns true only if j.Region matches one of the first
// MaxParallel entries of j.Multiregion.Regions; a region that first matches
// later in the list, or that is not listed at all, yields false.
func (j *Job) IsMultiregionStarter() bool {
	if !j.IsMultiregion() {
		return true
	}

	switch j.Type {
	case "system", "batch":
		return true
	}

	strategy := j.Multiregion.Strategy
	if strategy == nil || strategy.MaxParallel == 0 {
		return true
	}

	for idx, candidate := range j.Multiregion.Regions {
		if candidate.Name == j.Region {
			// The first entry matching this region decides the outcome.
			return idx < strategy.MaxParallel
		}
	}
	return false
}
// VaultPolicies returns the set of Vault policies per task group, per task // VaultPolicies returns the set of Vault policies per task group, per task
func (j *Job) VaultPolicies() map[string]map[string]*Vault { func (j *Job) VaultPolicies() map[string]map[string]*Vault {
policies := make(map[string]map[string]*Vault, len(j.TaskGroups)) policies := make(map[string]map[string]*Vault, len(j.TaskGroups))
......
...@@ -5438,64 +5438,6 @@ func TestMultiregion_CopyCanonicalize(t *testing.T) { ...@@ -5438,64 +5438,6 @@ func TestMultiregion_CopyCanonicalize(t *testing.T) {
require.False(old.Diff(nonEmptyOld)) require.False(old.Diff(nonEmptyOld))
} }
// TestMultiregion_Starter checks Job.IsMultiregionStarter for a job without a
// multiregion block, for a batch job, and for a service job placed in each of
// four regions under MaxParallel values of 0, 1 and 2.
func TestMultiregion_Starter(t *testing.T) {
	require := require.New(t)

	// No multiregion block: the job is always a starter.
	j := &Job{}
	j.Type = "service"
	j.Region = "north"
	require.True(j.IsMultiregionStarter())

	tc := &Multiregion{
		Strategy: &MultiregionStrategy{},
		Regions: []*MultiregionRegion{
			{Name: "north"},
			{Name: "south"},
			{Name: "east"},
			{Name: "west"},
		},
	}

	// Batch jobs are starters even in the last listed region. Assert on b
	// itself; asserting on j here would leave the batch case untested.
	b := &Job{}
	b.Type = "batch"
	b.Multiregion = tc
	b.Region = "west"
	require.True(b.IsMultiregionStarter())

	// MaxParallel of 0: every region is a starter.
	j.Multiregion = tc
	j.Region = "north"
	require.True(j.IsMultiregionStarter())
	j.Region = "south"
	require.True(j.IsMultiregionStarter())
	j.Region = "east"
	require.True(j.IsMultiregionStarter())
	j.Region = "west"
	require.True(j.IsMultiregionStarter())

	// MaxParallel of 1: only the first listed region is a starter.
	tc.Strategy = &MultiregionStrategy{MaxParallel: 1}
	j.Multiregion = tc
	j.Region = "north"
	require.True(j.IsMultiregionStarter())
	j.Region = "south"
	require.False(j.IsMultiregionStarter())
	j.Region = "east"
	require.False(j.IsMultiregionStarter())
	j.Region = "west"
	require.False(j.IsMultiregionStarter())

	// MaxParallel of 2: the first two listed regions are starters.
	tc.Strategy = &MultiregionStrategy{MaxParallel: 2}
	j.Multiregion = tc
	j.Region = "north"
	require.True(j.IsMultiregionStarter())
	j.Region = "south"
	require.True(j.IsMultiregionStarter())
	j.Region = "east"
	require.False(j.IsMultiregionStarter())
	j.Region = "west"
	require.False(j.IsMultiregionStarter())
}
func TestNodeResources_Merge(t *testing.T) { func TestNodeResources_Merge(t *testing.T) {
res := &NodeResources{ res := &NodeResources{
Cpu: NodeCpuResources{ Cpu: NodeCpuResources{
......
...@@ -201,10 +201,10 @@ func (a *allocReconciler) Compute() *reconcileResults { ...@@ -201,10 +201,10 @@ func (a *allocReconciler) Compute() *reconcileResults {
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
} }
if a.deployment == nil { if a.deployment == nil {
// When we create the deployment later, it will be in a paused // When we create the deployment later, it will be in a pending
// state. But we also need to tell Compute we're paused, otherwise we // state. But we also need to tell Compute we're paused, otherwise we
// make placements on the paused deployment. // make placements on the paused deployment.
if !a.job.IsMultiregionStarter() { if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
a.deploymentPaused = true a.deploymentPaused = true
} }
} }
...@@ -555,9 +555,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { ...@@ -555,9 +555,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// A previous group may have made the deployment already // A previous group may have made the deployment already
if a.deployment == nil { if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job) a.deployment = structs.NewDeployment(a.job)
// in a multiregion job, if max_parallel is set, only the first // in multiregion jobs, most deployments start in a pending state
// region starts in the running state if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
if !a.job.IsMultiregionStarter() {
a.deployment.Status = structs.DeploymentStatusPending a.deployment.Status = structs.DeploymentStatusPending
a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment