Commit de1410a0 authored by Mahmood Ali's avatar Mahmood Ali
Browse files

note places that need to be updated for task dependencies

along with logic

[ci skip]
parent 830c0b16
Showing with 26 additions and 0 deletions
+26 -0
......@@ -29,6 +29,11 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
)
// TODO (TaskDependencies): to coordinate task blocking, introduce a helper
// struct that is passed the Tasks and serves as the condition for unblocking
// the TaskRunner.Run function.
// It needs to be set as a field in allocRunner and taskRunner and passed along when creating TRs.
// allocRunner is used to run all the tasks in a given allocation
type allocRunner struct {
// id is the ID of the allocation. Can be accessed without a lock
......@@ -399,6 +404,12 @@ func (ar *allocRunner) TaskStateUpdated() {
func (ar *allocRunner) handleTaskStateUpdates() {
defer close(ar.taskStateUpdateHandlerCh)
// TODO (TaskDependencies): Update the logic here to inspect completed tasks
// and unblock the appropriate task runners,
// e.g. unblock main tasks once all prestart tasks have started and
// those with prestart/block_until=completed have finished
//
for done := false; !done; {
select {
case <-ar.taskStateUpdatedCh:
......
......@@ -326,6 +326,10 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tr.logger.Error("alloc missing task group")
return nil, fmt.Errorf("alloc missing task group")
}
// TODO (TaskDependencies): Set the appropriate RestartPolicy here depending on
// job and hook types; consider having restart policy be a task field rather than
// a group setting
tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type)
// Get the driver
......@@ -460,6 +464,10 @@ func (tr *TaskRunner) Run() {
}
}
// TODO (TaskDependencies): Block here until the phase corresponding to this
// task is reached. Use channels for coordination, and take killCtx and
// shutdownCtx into account as well, as is done above
MAIN:
for !tr.Alloc().TerminalStatus() {
select {
......@@ -511,6 +519,8 @@ MAIN:
if resultCh, err := handle.WaitCh(context.Background()); err != nil {
tr.logger.Error("wait task failed", "error", err)
} else {
// TODO (TaskDependencies): wait for deadline if task is prestart/until_completed
// and cause it to fail if resultCh hasn't returned by deadline
select {
case <-tr.killCtx.Done():
// We can go through the normal should restart check since
......
......@@ -113,6 +113,8 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
continue
}
// TODO (TaskDependencies): update this call to use
// alloc.AllocatedResources.Comparable()
used.Add(alloc.ComparableResources())
}
......
......@@ -2879,6 +2879,9 @@ func (n *NodeReservedNetworkResources) ParseReservedHostPorts() ([]uint64, error
}
// AllocatedResources is the set of resources to be used by an allocation.
// TODO (TaskDependencies): Update this struct and initializer to track
// task hooks and have Comparable() return cpu/memory according to the following formula:
// MAX(SUM(prestart_completed_hooks), SUM(main))+SUM(prestart_running_hooks)
type AllocatedResources struct {
// Tasks is a mapping of task name to the resources for the task.
Tasks map[string]*AllocatedTaskResources
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment