Unverified Commit 7d9c647b authored by zq200618's avatar zq200618 Committed by GitHub
Browse files

Feat: support step group with substep in the workflow (#3772)

Signed-off-by: default avatarzhengq2006 <zhengq200618@cmbchina.com>

workflow substep develop

Feat: support step group with substep in the workflow
Signed-off-by: default avatarQiang Zheng <zhengq20018@cmbchina.com>

Feat: support step group with substep in the workflow
Signed-off-by: default avatarQiang Zheng <zhengq20018@cmbchina.com>

Feat: support step group with substep in the workflow
Signed-off-by: default avatarQiang Zheng <zhengq20018@cmbchina.com>

Feat: support step group with substep in the workflow
Signed-off-by: default avatarQiang Zheng <zhengq20018@cmbchina.com>

Feat: support step group with substep in the workflow
Signed-off-by: default avatarQiang Zheng <zhengq20018@cmbchina.com>

Feat: support step group with substep in the workflow
Signed-off-by: default avatarQiang Zheng <zhengq20018@cmbchina.com>

Feat: support step group with substep in the workflow
Signed-off-by: default avatarQiang Zheng <zhengq20018@cmbchina.com>

Feat: support step group with substep in the workflow

Signed-off-by: Qia...
parent 493a6098
Showing with 1583 additions and 675 deletions
+1583 -675
......@@ -268,8 +268,8 @@ type RawComponent struct {
Raw runtime.RawExtension `json:"raw"`
}
// WorkflowStepStatus record the status of a workflow step
type WorkflowStepStatus struct {
// StepStatus record the base status of workflow step, which could be workflow step or subStep
type StepStatus struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
......@@ -277,24 +277,22 @@ type WorkflowStepStatus struct {
// A human readable message indicating details about why the workflowStep is in this state.
Message string `json:"message,omitempty"`
// A brief CamelCase message indicating details about why the workflowStep is in this state.
Reason string `json:"reason,omitempty"`
SubSteps *SubStepsStatus `json:"subSteps,omitempty"`
Reason string `json:"reason,omitempty"`
// FirstExecuteTime is the first time this step execution.
FirstExecuteTime metav1.Time `json:"firstExecuteTime,omitempty"`
// LastExecuteTime is the last time this step execution.
LastExecuteTime metav1.Time `json:"lastExecuteTime,omitempty"`
}
// WorkflowSubStepStatus record the status of a workflow step
// WorkflowStepStatus record the status of a workflow step, include step status and subStep status
type WorkflowStepStatus struct {
StepStatus `json:",inline"`
SubStepsStatus []WorkflowSubStepStatus `json:"subSteps,omitempty"`
}
// WorkflowSubStepStatus record the status of a workflow subStep
type WorkflowSubStepStatus struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Phase WorkflowStepPhase `json:"phase,omitempty"`
// A human readable message indicating details about why the workflowStep is in this state.
Message string `json:"message,omitempty"`
// A brief CamelCase message indicating details about why the workflowStep is in this state.
Reason string `json:"reason,omitempty"`
StepStatus `json:",inline"`
}
// AppStatus defines the observed state of Application
......@@ -347,6 +345,25 @@ type WorkflowStep struct {
// +kubebuilder:pruning:PreserveUnknownFields
Properties *runtime.RawExtension `json:"properties,omitempty"`
SubSteps []WorkflowSubStep `json:"subSteps,omitempty"`
DependsOn []string `json:"dependsOn,omitempty"`
Inputs StepInputs `json:"inputs,omitempty"`
Outputs StepOutputs `json:"outputs,omitempty"`
}
// WorkflowSubStep defines how to execute a workflow subStep.
type WorkflowSubStep struct {
// Name is the unique name of the workflow step.
Name string `json:"name"`
Type string `json:"type"`
// +kubebuilder:pruning:PreserveUnknownFields
Properties *runtime.RawExtension `json:"properties,omitempty"`
DependsOn []string `json:"dependsOn,omitempty"`
Inputs StepInputs `json:"inputs,omitempty"`
......@@ -372,13 +389,6 @@ type WorkflowStatus struct {
StartTime metav1.Time `json:"startTime,omitempty"`
}
// SubStepsStatus record the status of workflow steps.
type SubStepsStatus struct {
StepIndex int `json:"stepIndex,omitempty"`
Mode WorkflowMode `json:"mode,omitempty"`
Steps []WorkflowSubStepStatus `json:"steps,omitempty"`
}
// WorkflowStepPhase describes the phase of a workflow step.
type WorkflowStepPhase string
......
......@@ -612,21 +612,18 @@ func (in StepOutputs) DeepCopy() StepOutputs {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SubStepsStatus) DeepCopyInto(out *SubStepsStatus) {
func (in *StepStatus) DeepCopyInto(out *StepStatus) {
*out = *in
if in.Steps != nil {
in, out := &in.Steps, &out.Steps
*out = make([]WorkflowSubStepStatus, len(*in))
copy(*out, *in)
}
in.FirstExecuteTime.DeepCopyInto(&out.FirstExecuteTime)
in.LastExecuteTime.DeepCopyInto(&out.LastExecuteTime)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubStepsStatus.
func (in *SubStepsStatus) DeepCopy() *SubStepsStatus {
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StepStatus.
func (in *StepStatus) DeepCopy() *StepStatus {
if in == nil {
return nil
}
out := new(SubStepsStatus)
out := new(StepStatus)
in.DeepCopyInto(out)
return out
}
......@@ -692,6 +689,13 @@ func (in *WorkflowStep) DeepCopyInto(out *WorkflowStep) {
*out = new(runtime.RawExtension)
(*in).DeepCopyInto(*out)
}
if in.SubSteps != nil {
in, out := &in.SubSteps, &out.SubSteps
*out = make([]WorkflowSubStep, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.DependsOn != nil {
in, out := &in.DependsOn, &out.DependsOn
*out = make([]string, len(*in))
......@@ -722,13 +726,14 @@ func (in *WorkflowStep) DeepCopy() *WorkflowStep {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkflowStepStatus) DeepCopyInto(out *WorkflowStepStatus) {
*out = *in
if in.SubSteps != nil {
in, out := &in.SubSteps, &out.SubSteps
*out = new(SubStepsStatus)
(*in).DeepCopyInto(*out)
in.StepStatus.DeepCopyInto(&out.StepStatus)
if in.SubStepsStatus != nil {
in, out := &in.SubStepsStatus, &out.SubStepsStatus
*out = make([]WorkflowSubStepStatus, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
in.FirstExecuteTime.DeepCopyInto(&out.FirstExecuteTime)
in.LastExecuteTime.DeepCopyInto(&out.LastExecuteTime)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowStepStatus.
......@@ -741,9 +746,45 @@ func (in *WorkflowStepStatus) DeepCopy() *WorkflowStepStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkflowSubStep) DeepCopyInto(out *WorkflowSubStep) {
*out = *in
if in.Properties != nil {
in, out := &in.Properties, &out.Properties
*out = new(runtime.RawExtension)
(*in).DeepCopyInto(*out)
}
if in.DependsOn != nil {
in, out := &in.DependsOn, &out.DependsOn
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.Inputs != nil {
in, out := &in.Inputs, &out.Inputs
*out = make(StepInputs, len(*in))
copy(*out, *in)
}
if in.Outputs != nil {
in, out := &in.Outputs, &out.Outputs
*out = make(StepOutputs, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowSubStep.
func (in *WorkflowSubStep) DeepCopy() *WorkflowSubStep {
if in == nil {
return nil
}
out := new(WorkflowSubStep)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkflowSubStepStatus) DeepCopyInto(out *WorkflowSubStepStatus) {
*out = *in
in.StepStatus.DeepCopyInto(&out.StepStatus)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowSubStepStatus.
......
......@@ -954,6 +954,13 @@ func (in *WorkflowStep) DeepCopyInto(out *WorkflowStep) {
*out = new(runtime.RawExtension)
(*in).DeepCopyInto(*out)
}
if in.SubSteps != nil {
in, out := &in.SubSteps, &out.SubSteps
*out = make([]common.WorkflowSubStep, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.DependsOn != nil {
in, out := &in.DependsOn, &out.DependsOn
*out = make([]string, len(*in))
......
This diff is collapsed.
......@@ -779,7 +779,7 @@ spec:
steps:
items:
description: WorkflowStepStatus record the status of a workflow
step
step, include step status and subStep status
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this step
......@@ -808,44 +808,42 @@ spec:
about why the workflowStep is in this state.
type: string
subSteps:
description: SubStepsStatus record the status of workflow
steps.
properties:
mode:
description: WorkflowMode describes the mode of workflow
type: string
stepIndex:
type: integer
steps:
items:
description: WorkflowSubStepStatus record the status
of a workflow step
properties:
id:
type: string
message:
description: A human readable message indicating
details about why the workflowStep is in this
state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this
state.
type: string
type:
type: string
required:
- id
type: object
type: array
type: object
items:
description: WorkflowSubStepStatus record the status of
a workflow subStep
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this
step execution.
format: date-time
type: string
id:
type: string
lastExecuteTime:
description: LastExecuteTime is the last time this
step execution.
format: date-time
type: string
message:
description: A human readable message indicating details
about why the workflowStep is in this state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this state.
type: string
type:
type: string
required:
- id
type: object
type: array
type:
type: string
required:
......@@ -1054,6 +1052,57 @@ spec:
properties:
type: object
x-kubernetes-preserve-unknown-fields: true
subSteps:
items:
description: WorkflowSubStep defines how to execute a
workflow subStep.
properties:
dependsOn:
items:
type: string
type: array
inputs:
description: StepInputs defines variable input of
WorkflowStep
items:
properties:
from:
type: string
parameterKey:
type: string
required:
- from
- parameterKey
type: object
type: array
name:
description: Name is the unique name of the workflow
step.
type: string
outputs:
description: StepOutputs defines output variable of
WorkflowStep
items:
properties:
name:
type: string
valueFrom:
type: string
required:
- name
- valueFrom
type: object
type: array
properties:
type: object
x-kubernetes-preserve-unknown-fields: true
type:
type: string
required:
- name
- type
type: object
type: array
type:
type: string
required:
......@@ -1446,7 +1495,7 @@ spec:
steps:
items:
description: WorkflowStepStatus record the status of a workflow
step
step, include step status and subStep status
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this step
......@@ -1475,44 +1524,42 @@ spec:
about why the workflowStep is in this state.
type: string
subSteps:
description: SubStepsStatus record the status of workflow
steps.
properties:
mode:
description: WorkflowMode describes the mode of workflow
type: string
stepIndex:
type: integer
steps:
items:
description: WorkflowSubStepStatus record the status
of a workflow step
properties:
id:
type: string
message:
description: A human readable message indicating
details about why the workflowStep is in this
state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this
state.
type: string
type:
type: string
required:
- id
type: object
type: array
type: object
items:
description: WorkflowSubStepStatus record the status of
a workflow subStep
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this
step execution.
format: date-time
type: string
id:
type: string
lastExecuteTime:
description: LastExecuteTime is the last time this
step execution.
format: date-time
type: string
message:
description: A human readable message indicating details
about why the workflowStep is in this state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this state.
type: string
type:
type: string
required:
- id
type: object
type: array
type:
type: string
required:
......
......@@ -74,6 +74,54 @@ spec:
properties:
type: object
x-kubernetes-preserve-unknown-fields: true
subSteps:
items:
description: WorkflowSubStep defines how to execute a workflow
subStep.
properties:
dependsOn:
items:
type: string
type: array
inputs:
description: StepInputs defines variable input of WorkflowStep
items:
properties:
from:
type: string
parameterKey:
type: string
required:
- from
- parameterKey
type: object
type: array
name:
description: Name is the unique name of the workflow step.
type: string
outputs:
description: StepOutputs defines output variable of WorkflowStep
items:
properties:
name:
type: string
valueFrom:
type: string
required:
- name
- valueFrom
type: object
type: array
properties:
type: object
x-kubernetes-preserve-unknown-fields: true
type:
type: string
required:
- name
- type
type: object
type: array
type:
type: string
required:
......@@ -131,6 +179,54 @@ spec:
properties:
type: object
x-kubernetes-preserve-unknown-fields: true
subSteps:
items:
description: WorkflowSubStep defines how to execute a workflow
subStep.
properties:
dependsOn:
items:
type: string
type: array
inputs:
description: StepInputs defines variable input of WorkflowStep
items:
properties:
from:
type: string
parameterKey:
type: string
required:
- from
- parameterKey
type: object
type: array
name:
description: Name is the unique name of the workflow step.
type: string
outputs:
description: StepOutputs defines output variable of WorkflowStep
items:
properties:
name:
type: string
valueFrom:
type: string
required:
- name
- valueFrom
type: object
type: array
properties:
type: object
x-kubernetes-preserve-unknown-fields: true
type:
type: string
required:
- name
- type
type: object
type: array
type:
type: string
required:
......
# Code generated by KubeVela templates. DO NOT EDIT. Please edit the original cue file.
# Definition source cue file: vela-templates/definitions/internal/step-group.cue
apiVersion: core.oam.dev/v1beta1
kind: WorkflowStepDefinition
metadata:
annotations:
definition.oam.dev/description: step group
name: step-group
namespace: {{ include "systemDefinitionNamespace" . }}
spec:
schematic:
cue:
template: |
// no parameters
parameter: {}
......@@ -779,7 +779,7 @@ spec:
steps:
items:
description: WorkflowStepStatus record the status of a workflow
step
step, include step status and subStep status
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this step
......@@ -808,44 +808,42 @@ spec:
about why the workflowStep is in this state.
type: string
subSteps:
description: SubStepsStatus record the status of workflow
steps.
properties:
mode:
description: WorkflowMode describes the mode of workflow
type: string
stepIndex:
type: integer
steps:
items:
description: WorkflowSubStepStatus record the status
of a workflow step
properties:
id:
type: string
message:
description: A human readable message indicating
details about why the workflowStep is in this
state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this
state.
type: string
type:
type: string
required:
- id
type: object
type: array
type: object
items:
description: WorkflowSubStepStatus record the status of
a workflow subStep
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this
step execution.
format: date-time
type: string
id:
type: string
lastExecuteTime:
description: LastExecuteTime is the last time this
step execution.
format: date-time
type: string
message:
description: A human readable message indicating details
about why the workflowStep is in this state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this state.
type: string
type:
type: string
required:
- id
type: object
type: array
type:
type: string
required:
......@@ -1054,6 +1052,57 @@ spec:
properties:
type: object
x-kubernetes-preserve-unknown-fields: true
subSteps:
items:
description: WorkflowSubStep defines how to execute a
workflow subStep.
properties:
dependsOn:
items:
type: string
type: array
inputs:
description: StepInputs defines variable input of
WorkflowStep
items:
properties:
from:
type: string
parameterKey:
type: string
required:
- from
- parameterKey
type: object
type: array
name:
description: Name is the unique name of the workflow
step.
type: string
outputs:
description: StepOutputs defines output variable of
WorkflowStep
items:
properties:
name:
type: string
valueFrom:
type: string
required:
- name
- valueFrom
type: object
type: array
properties:
type: object
x-kubernetes-preserve-unknown-fields: true
type:
type: string
required:
- name
- type
type: object
type: array
type:
type: string
required:
......@@ -1446,7 +1495,7 @@ spec:
steps:
items:
description: WorkflowStepStatus record the status of a workflow
step
step, include step status and subStep status
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this step
......@@ -1475,44 +1524,42 @@ spec:
about why the workflowStep is in this state.
type: string
subSteps:
description: SubStepsStatus record the status of workflow
steps.
properties:
mode:
description: WorkflowMode describes the mode of workflow
type: string
stepIndex:
type: integer
steps:
items:
description: WorkflowSubStepStatus record the status
of a workflow step
properties:
id:
type: string
message:
description: A human readable message indicating
details about why the workflowStep is in this
state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this
state.
type: string
type:
type: string
required:
- id
type: object
type: array
type: object
items:
description: WorkflowSubStepStatus record the status of
a workflow subStep
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this
step execution.
format: date-time
type: string
id:
type: string
lastExecuteTime:
description: LastExecuteTime is the last time this
step execution.
format: date-time
type: string
message:
description: A human readable message indicating details
about why the workflowStep is in this state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this state.
type: string
type:
type: string
required:
- id
type: object
type: array
type:
type: string
required:
......
# Code generated by KubeVela templates. DO NOT EDIT. Please edit the original cue file.
# Definition source cue file: vela-templates/definitions/internal/step-group.cue
apiVersion: core.oam.dev/v1beta1
kind: WorkflowStepDefinition
metadata:
annotations:
definition.oam.dev/description: step group
name: step-group
namespace: {{ include "systemDefinitionNamespace" . }}
spec:
schematic:
cue:
template: |
// no parameters
parameter: {}
......@@ -652,7 +652,7 @@ The process goes as:
- The HelmTemplate/KustomizePatch controller would read the template from specified source, render the final config. It will compare the config with the Application object -- if there is difference, it will write back to the Application object per se.
- The update of Application will trigger another event, the app controller will apply the HelmTemplate/KustomizePatch objects with new context. But this time, the HelmTemplate/KustomizePatch controller will find no diff after the rendering. So it will skip this time.
### Case 5: Conditional Check
### Case 6: Conditional Check
In this case, users want to execute different steps based on the responseCode. When the `if` condition is not met, the step will be skipped.
......@@ -684,6 +684,28 @@ steps:
type: notification
```
### Case 7: step group
In this case, the user runs multiple workflow steps in the `step-group` workflow type. subSteps in a step group will be executed in dag mode.
```yaml
workflow:
steps:
- type: step-group
name: run-step-group1
subSteps:
- name: sub-step1
type: ...
...
- name: sub-step2
type: ...
...
```
The process is as follows:
- When executing a `step-group` step, the subSteps in the step group are executed in dag mode. A step group will only complete when all subSteps have been executed to completion.
## Considerations
### Comparison with Argo Workflow/Tekton
......
apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
name: step-group-example
namespace: default
spec:
components:
- name: express-server
type: webservice
properties:
image: crccheck/hello-world
port: 8000
workflow:
steps:
- name: step
type: step-group
subSteps:
- name: apply-server
type: apply-component
properties:
component: express-server
......@@ -780,7 +780,7 @@ spec:
steps:
items:
description: WorkflowStepStatus record the status of a workflow
step
step, include step status and subStep status
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this step
......@@ -809,44 +809,42 @@ spec:
about why the workflowStep is in this state.
type: string
subSteps:
description: SubStepsStatus record the status of workflow
steps.
properties:
mode:
description: WorkflowMode describes the mode of workflow
type: string
stepIndex:
type: integer
steps:
items:
description: WorkflowSubStepStatus record the status
of a workflow step
properties:
id:
type: string
message:
description: A human readable message indicating
details about why the workflowStep is in this
state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this
state.
type: string
type:
type: string
required:
- id
type: object
type: array
type: object
items:
description: WorkflowSubStepStatus record the status of
a workflow subStep
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this
step execution.
format: date-time
type: string
id:
type: string
lastExecuteTime:
description: LastExecuteTime is the last time this
step execution.
format: date-time
type: string
message:
description: A human readable message indicating details
about why the workflowStep is in this state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this state.
type: string
type:
type: string
required:
- id
type: object
type: array
type:
type: string
required:
......@@ -1055,6 +1053,57 @@ spec:
properties:
type: object
subSteps:
items:
description: WorkflowSubStep defines how to execute a
workflow subStep.
properties:
dependsOn:
items:
type: string
type: array
inputs:
description: StepInputs defines variable input of
WorkflowStep
items:
properties:
from:
type: string
parameterKey:
type: string
required:
- from
- parameterKey
type: object
type: array
name:
description: Name is the unique name of the workflow
step.
type: string
outputs:
description: StepOutputs defines output variable of
WorkflowStep
items:
properties:
name:
type: string
valueFrom:
type: string
required:
- name
- valueFrom
type: object
type: array
properties:
type: object
type:
type: string
required:
- name
- type
type: object
type: array
type:
type: string
required:
......@@ -1447,7 +1496,7 @@ spec:
steps:
items:
description: WorkflowStepStatus record the status of a workflow
step
step, include step status and subStep status
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this step
......@@ -1476,44 +1525,42 @@ spec:
about why the workflowStep is in this state.
type: string
subSteps:
description: SubStepsStatus record the status of workflow
steps.
properties:
mode:
description: WorkflowMode describes the mode of workflow
type: string
stepIndex:
type: integer
steps:
items:
description: WorkflowSubStepStatus record the status
of a workflow step
properties:
id:
type: string
message:
description: A human readable message indicating
details about why the workflowStep is in this
state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this
state.
type: string
type:
type: string
required:
- id
type: object
type: array
type: object
items:
description: WorkflowSubStepStatus record the status of
a workflow subStep
properties:
firstExecuteTime:
description: FirstExecuteTime is the first time this
step execution.
format: date-time
type: string
id:
type: string
lastExecuteTime:
description: LastExecuteTime is the last time this
step execution.
format: date-time
type: string
message:
description: A human readable message indicating details
about why the workflowStep is in this state.
type: string
name:
type: string
phase:
description: WorkflowStepPhase describes the phase
of a workflow step.
type: string
reason:
description: A brief CamelCase message indicating
details about why the workflowStep is in this state.
type: string
type:
type: string
required:
- id
type: object
type: array
type:
type: string
required:
......
......@@ -74,6 +74,54 @@ spec:
properties:
type: object
subSteps:
items:
description: WorkflowSubStep defines how to execute a workflow
subStep.
properties:
dependsOn:
items:
type: string
type: array
inputs:
description: StepInputs defines variable input of WorkflowStep
items:
properties:
from:
type: string
parameterKey:
type: string
required:
- from
- parameterKey
type: object
type: array
name:
description: Name is the unique name of the workflow step.
type: string
outputs:
description: StepOutputs defines output variable of WorkflowStep
items:
properties:
name:
type: string
valueFrom:
type: string
required:
- name
- valueFrom
type: object
type: array
properties:
type: object
type:
type: string
required:
- name
- type
type: object
type: array
type:
type: string
required:
......@@ -131,6 +179,54 @@ spec:
properties:
type: object
subSteps:
items:
description: WorkflowSubStep defines how to execute a workflow
subStep.
properties:
dependsOn:
items:
type: string
type: array
inputs:
description: StepInputs defines variable input of WorkflowStep
items:
properties:
from:
type: string
parameterKey:
type: string
required:
- from
- parameterKey
type: object
type: array
name:
description: Name is the unique name of the workflow step.
type: string
outputs:
description: StepOutputs defines output variable of WorkflowStep
items:
properties:
name:
type: string
valueFrom:
type: string
required:
- name
- valueFrom
type: object
type: array
properties:
type: object
type:
type: string
required:
- name
- type
type: object
type: array
type:
type: string
required:
......
......@@ -444,21 +444,37 @@ func (p *Parser) parseWorkflowSteps(ctx context.Context, af *Appfile) error {
return err
}
for _, workflowStep := range af.WorkflowSteps {
if wftypes.IsBuiltinWorkflowStepType(workflowStep.Type) {
continue
}
if _, found := af.RelatedWorkflowStepDefinitions[workflowStep.Type]; found {
continue
err := p.parseWorkflowStep(ctx, af, workflowStep.Type)
if err != nil {
return err
}
def := &v1beta1.WorkflowStepDefinition{}
if err := util.GetCapabilityDefinition(ctx, p.client, def, workflowStep.Type); err != nil {
return errors.Wrapf(err, "failed to get workflow step definition %s", workflowStep.Type)
if workflowStep.SubSteps != nil {
for _, workflowSubStep := range workflowStep.SubSteps {
err := p.parseWorkflowStep(ctx, af, workflowSubStep.Type)
if err != nil {
return err
}
}
}
af.RelatedWorkflowStepDefinitions[workflowStep.Type] = def
}
return nil
}
func (p *Parser) parseWorkflowStep(ctx context.Context, af *Appfile, workflowStepType string) error {
if wftypes.IsBuiltinWorkflowStepType(workflowStepType) {
return nil
}
if _, found := af.RelatedWorkflowStepDefinitions[workflowStepType]; found {
return nil
}
def := &v1beta1.WorkflowStepDefinition{}
if err := util.GetCapabilityDefinition(ctx, p.client, def, workflowStepType); err != nil {
return errors.Wrapf(err, "failed to get workflow step definition %s", workflowStepType)
}
af.RelatedWorkflowStepDefinitions[workflowStepType] = def
return nil
}
func (p *Parser) makeWorkload(ctx context.Context, name, typ string, capType types.CapType, props *runtime.RawExtension) (*Workload, error) {
templ, err := p.tmplLoader.LoadTemplate(ctx, p.dm, p.client, typ, capType)
if err != nil {
......
......@@ -83,27 +83,7 @@ func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context,
var tasks []wfTypes.TaskRunner
for _, step := range af.WorkflowSteps {
options := &wfTypes.GeneratorOptions{
ID: generateStepID(step.Name, app.Status.Workflow),
}
generatorName := step.Type
if generatorName == wfTypes.WorkflowStepTypeApplyComponent {
generatorName = wfTypes.WorkflowStepTypeBuiltinApplyComponent
options.StepConvertor = func(lstep v1beta1.WorkflowStep) (v1beta1.WorkflowStep, error) {
copierStep := lstep.DeepCopy()
if err := convertStepProperties(copierStep, app); err != nil {
return lstep, errors.WithMessage(err, "convert [apply-component]")
}
return *copierStep, nil
}
}
genTask, err := taskDiscover.GetTaskGenerator(ctx, generatorName)
if err != nil {
return nil, err
}
task, err := genTask(step, options)
task, err := generateStep(ctx, app, step, taskDiscover, "")
if err != nil {
return nil, err
}
......@@ -112,6 +92,57 @@ func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context,
return tasks, nil
}
func generateStep(ctx context.Context,
app *v1beta1.Application,
step v1beta1.WorkflowStep,
taskDiscover wfTypes.TaskDiscover,
parentStepName string) (wfTypes.TaskRunner, error) {
options := &wfTypes.GeneratorOptions{
ID: generateStepID(step.Name, app.Status.Workflow, parentStepName),
}
generatorName := step.Type
switch {
case generatorName == wfTypes.WorkflowStepTypeApplyComponent:
generatorName = wfTypes.WorkflowStepTypeBuiltinApplyComponent
options.StepConvertor = func(lstep v1beta1.WorkflowStep) (v1beta1.WorkflowStep, error) {
copierStep := lstep.DeepCopy()
if err := convertStepProperties(copierStep, app); err != nil {
return lstep, errors.WithMessage(err, "convert [apply-component]")
}
return *copierStep, nil
}
case generatorName == wfTypes.WorkflowStepTypeStepGroup:
var subTaskRunners []wfTypes.TaskRunner
for _, subStep := range step.SubSteps {
workflowStep := v1beta1.WorkflowStep{
Name: subStep.Name,
Type: subStep.Type,
Properties: subStep.Properties,
DependsOn: subStep.DependsOn,
Inputs: subStep.Inputs,
Outputs: subStep.Outputs,
}
subTask, err := generateStep(ctx, app, workflowStep, taskDiscover, step.Name)
if err != nil {
return nil, err
}
subTaskRunners = append(subTaskRunners, subTask)
}
options.SubTaskRunners = subTaskRunners
}
genTask, err := taskDiscover.GetTaskGenerator(ctx, generatorName)
if err != nil {
return nil, err
}
task, err := genTask(step, options)
if err != nil {
return nil, err
}
return task, nil
}
func convertStepProperties(step *v1beta1.WorkflowStep, app *v1beta1.Application) error {
o := struct {
Component string `json:"component"`
......@@ -360,15 +391,38 @@ func getComponentResources(ctx context.Context, manifest *types.ComponentManifes
return workload, traits, nil
}
func generateStepID(stepName string, wfStatus *common.WorkflowStatus) string {
func getStepID(stepName string, stepsStatus []common.StepStatus) string {
for _, status := range stepsStatus {
if status.Name == stepName {
return status.ID
}
}
return ""
}
func generateStepID(stepName string, workflowStatus *common.WorkflowStatus, parentStepName string) string {
var id string
if wfStatus != nil {
for _, status := range wfStatus.Steps {
if status.Name == stepName {
id = status.ID
if workflowStatus != nil {
workflowStepsStatus := workflowStatus.Steps
if parentStepName != "" {
for _, status := range workflowStepsStatus {
if status.Name == parentStepName {
var stepsStatus []common.StepStatus
for _, status := range status.SubStepsStatus {
stepsStatus = append(stepsStatus, status.StepStatus)
}
id = getStepID(stepName, stepsStatus)
}
}
} else {
var stepsStatus []common.StepStatus
for _, status := range workflowStepsStatus {
stepsStatus = append(stepsStatus, status.StepStatus)
}
id = getStepID(stepName, stepsStatus)
}
}
if id == "" {
id = utils.RandomString(10)
}
......
......@@ -84,7 +84,7 @@ func (t *TaskLoader) GetTaskGenerator(ctx context.Context, name string) (wfTypes
type taskRunner struct {
name string
run func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error)
run func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error)
checkPending func(ctx wfContext.Context) bool
}
......@@ -94,7 +94,7 @@ func (tr *taskRunner) Name() string {
}
// Run execute task.
func (tr *taskRunner) Run(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error) {
func (tr *taskRunner) Run(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error) {
return tr.run(ctx, options)
}
......@@ -103,12 +103,17 @@ func (tr *taskRunner) Pending(ctx wfContext.Context) bool {
return tr.checkPending(ctx)
}
// SubTaskRunners return child step names. it could be null if the step have no sub-step
func (tr *taskRunner) SubTaskRunners() []wfTypes.TaskRunner {
return nil
}
func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, error) {
return func(wfStep v1beta1.WorkflowStep, genOpt *wfTypes.GeneratorOptions) (wfTypes.TaskRunner, error) {
exec := &executor{
handlers: t.handlers,
wfStatus: common.WorkflowStepStatus{
wfStatus: common.StepStatus{
Name: wfStep.Name,
Type: wfStep.Type,
Phase: common.WorkflowStepPhaseSucceeded,
......@@ -154,7 +159,7 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
}
return false
}
tRunner.run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error) {
tRunner.run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error) {
if options.GetTracer == nil {
options.GetTracer = func(id string, step v1beta1.WorkflowStep) monitorContext.Context {
return monitorContext.NewTraceContext(context.Background(), "")
......@@ -177,13 +182,13 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
paramsValue, err := ctx.MakeParameter(params)
if err != nil {
tracer.Error(err, "make parameter")
return common.WorkflowStepStatus{}, nil, errors.WithMessage(err, "make parameter")
return common.StepStatus{}, nil, errors.WithMessage(err, "make parameter")
}
for _, hook := range options.PreStartHooks {
if err := hook(ctx, paramsValue, wfStep); err != nil {
tracer.Error(err, "do preStartHook")
return common.WorkflowStepStatus{}, nil, errors.WithMessage(err, "do preStartHook")
return common.StepStatus{}, nil, errors.WithMessage(err, "do preStartHook")
}
}
......@@ -196,7 +201,7 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
if params != nil {
ps, err := paramsValue.String()
if err != nil {
return common.WorkflowStepStatus{}, nil, errors.WithMessage(err, "params encode")
return common.StepStatus{}, nil, errors.WithMessage(err, "params encode")
}
paramFile = fmt.Sprintf(model.ParameterFieldName+": {%s}\n", ps)
}
......@@ -256,7 +261,7 @@ func (t *TaskLoader) makeValue(ctx wfContext.Context, templ string, id string, p
type executor struct {
handlers providers.Providers
wfStatus common.WorkflowStepStatus
wfStatus common.StepStatus
suspend bool
terminated bool
failedAfterRetries bool
......@@ -314,7 +319,7 @@ func (exec *executor) operation() *wfTypes.Operation {
}
}
func (exec *executor) status() common.WorkflowStepStatus {
func (exec *executor) status() common.StepStatus {
return exec.wfStatus
}
......
......@@ -87,6 +87,14 @@ func suspend(step v1beta1.WorkflowStep, opt *types.GeneratorOptions) (types.Task
return tr, nil
}
func stepGroup(step v1beta1.WorkflowStep, opt *types.GeneratorOptions) (types.TaskRunner, error) {
return &stepGroupTaskRunner{
id: opt.ID,
name: step.Name,
subTaskRunners: opt.SubTaskRunners,
}, nil
}
func newTaskDiscover(ctx monitorContext.Context, providerHandlers providers.Providers, pd *packages.PackageDiscover, pCtx process.Context, templateLoader template.Loader) types.TaskDiscover {
// install builtin provider
workspace.Install(providerHandlers)
......@@ -95,7 +103,8 @@ func newTaskDiscover(ctx monitorContext.Context, providerHandlers providers.Prov
return &taskDiscover{
builtins: map[string]types.TaskGenerator{
types.WorkflowStepTypeSuspend: suspend,
types.WorkflowStepTypeSuspend: suspend,
types.WorkflowStepTypeStepGroup: stepGroup,
},
remoteTaskDiscover: custom.NewTaskLoader(templateLoader.LoadTaskTemplate, pd, providerHandlers, 0, pCtx),
templateLoader: templateLoader,
......@@ -120,8 +129,8 @@ func (tr *suspendTaskRunner) Name() string {
}
// Run make workflow suspend.
func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (common.WorkflowStepStatus, *types.Operation, error) {
stepStatus := common.WorkflowStepStatus{
func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (common.StepStatus, *types.Operation, error) {
stepStatus := common.StepStatus{
ID: tr.id,
Name: tr.name,
Type: types.WorkflowStepTypeSuspend,
......@@ -140,6 +149,46 @@ func (tr *suspendTaskRunner) Pending(ctx wfContext.Context) bool {
return false
}
// SubTaskRunners return child step names. it could be null if the step have no sub-step
func (tr *suspendTaskRunner) SubTaskRunners() []types.TaskRunner {
return nil
}
type stepGroupTaskRunner struct {
id string
name string
subTaskRunners []types.TaskRunner
}
// Name return suspend step name.
func (tr *stepGroupTaskRunner) Name() string {
return tr.name
}
// SubTaskRunners return child step runners. it could be null if the step have no sub-step
func (tr *stepGroupTaskRunner) SubTaskRunners() []types.TaskRunner {
return tr.subTaskRunners
}
// Run make workflow step group.
func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (common.StepStatus, *types.Operation, error) {
phase := common.WorkflowStepPhaseRunning
if tr.subTaskRunners == nil {
phase = common.WorkflowStepPhaseSucceeded
}
return common.StepStatus{
ID: tr.id,
Name: tr.name,
Type: types.WorkflowStepTypeStepGroup,
Phase: phase,
}, &types.Operation{}, nil
}
// Pending check task should be executed or not.
func (tr *stepGroupTaskRunner) Pending(ctx wfContext.Context) bool {
return false
}
// NewViewTaskDiscover will create a client for load task generator.
func NewViewTaskDiscover(pd *packages.PackageDiscover, cli client.Client, cfg *rest.Config, apply kube.Dispatcher, delete kube.Deleter, viewNs string, logLevel int, pCtx process.Context) types.TaskDiscover {
handlerProviders := providers.NewProviders()
......
......@@ -55,13 +55,16 @@ func TestDiscover(t *testing.T) {
})
discover := &taskDiscover{
builtins: map[string]types.TaskGenerator{
"suspend": suspend,
"suspend": suspend,
"stepGroup": stepGroup,
},
remoteTaskDiscover: custom.NewTaskLoader(loadTemplate, nil, nil, 0, pCtx),
}
_, err := discover.GetTaskGenerator(context.Background(), "suspend")
assert.NilError(t, err)
_, err = discover.GetTaskGenerator(context.Background(), "stepGroup")
assert.NilError(t, err)
_, err = discover.GetTaskGenerator(context.Background(), "foo")
assert.NilError(t, err)
_, err = discover.GetTaskGenerator(context.Background(), "crazy")
......@@ -90,3 +93,23 @@ func TestSuspendStep(t *testing.T) {
assert.Equal(t, status.Name, "test")
assert.Equal(t, status.Phase, common.WorkflowStepPhaseSucceeded)
}
func TestStepGroupStep(t *testing.T) {
discover := &taskDiscover{
builtins: map[string]types.TaskGenerator{
"stepGroup": stepGroup,
},
}
gen, err := discover.GetTaskGenerator(context.Background(), "stepGroup")
assert.NilError(t, err)
runner, err := gen(v1beta1.WorkflowStep{Name: "test"}, &types.GeneratorOptions{ID: "124"})
assert.NilError(t, err)
assert.Equal(t, runner.Name(), "test")
assert.Equal(t, runner.Pending(nil), false)
status, act, err := runner.Run(nil, nil)
assert.NilError(t, err)
assert.Equal(t, act.Suspend, false)
assert.Equal(t, status.ID, "124")
assert.Equal(t, status.Name, "test")
assert.Equal(t, status.Phase, common.WorkflowStepPhaseSucceeded)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment