Unverified Commit b4b33740 authored by xixi's avatar xixi Committed by GitHub
Browse files

chaosctl add recover subcommand (#3056)


* kill processes works
Signed-off-by: default avatarxixi <i@hexilee.me>

* support podSelector
Signed-off-by: default avatarxixi <i@hexilee.me>

* build ctrl server by fx
Signed-off-by: default avatarxixi <i@hexilee.me>

* make check
Signed-off-by: default avatarxixi <i@hexilee.me>

* add recover subcommand
Signed-off-by: default avatarxixi <i@hexilee.me>

* support httpchaos forcedly recover
Signed-off-by: default avatarxixi <i@hexilee.me>

* add partial pod
Signed-off-by: default avatarxixi <i@hexilee.me>

* support iochaos recover
Signed-off-by: default avatarxixi <i@hexilee.me>

* add PipelineRecover and CleanProcessRecover
Signed-off-by: default avatarxixi <i@hexilee.me>

* support stressChaos
Signed-off-by: default avatarxixi <i@hexilee.me>

* split tcQdisc and iptables rules in ctrl server
Signed-off-by: default avatarxixi <i@hexilee.me>

* ctrl server support clean tcs and iptables
Signed-off-by: default avatarxixi <i@hexilee.me>

* complete recover of network chaos
Signed-off-by: default avatarxixi <i@hexilee.me>

* support label selector
Signed-off-by: default avatarxixi <i@hexilee.me>

* make check
Signed-off-by: default avatarxixi <i@hexilee.me>

* remove specific recovery implementations
Signed-off-by: default avatarxixi <i@hexilee.me>

* add noopRecover
Signed-off-by: default avatarxixi <i@hexilee.me>

* remove mutation from ctrlserver
Signed-off-by: default avatarxixi <i@hexilee.me>

* fix comments
Signed-off-by: default avatarxixi <i@hexilee.me>

* make check
Signed-off-by: default avatarxixi <i@hexilee.me>

* add more examples in description
Signed-off-by: default avatarxixi <i@hexilee.me>

* parse labels by pkg/labels
Signed-off-by: default avatarxixi <i@hexilee.me>

* move PartialPod to pkg/chaosctl
Signed-off-by: default avatarxixi <i@hexilee.me>

* make check
Signed-off-by: default avatarxixi <i@hexilee.me>

* modify description
Signed-off-by: default avatarxixi <i@hexilee.me>
Co-authored-by: default avatarTi Chi Robot <ti-community-prow-bot@tidb.io>
parent 187418f5
Showing with 734 additions and 47 deletions
+734 -47
......@@ -23,6 +23,7 @@ import (
"os"
"time"
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/playground"
"github.com/go-logr/logr"
"go.uber.org/fx"
......@@ -83,6 +84,7 @@ func main() {
fx.Supply(controllermetrics.Registry),
fx.Supply(rootLogger),
fx.Provide(metrics.NewChaosControllerManagerMetricsCollector),
fx.Provide(ctrlserver.New),
fx.Options(
provider.Module,
controllers.Module,
......@@ -105,6 +107,7 @@ type RunParams struct {
AuthCli *authorizationv1.AuthorizationV1Client
DaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
MetricsCollector *metrics.ChaosControllerManagerMetricsCollector
CtrlServer *handler.Server
Objs []types.Object `group:"objs"`
WebhookObjs []types.WebhookObject `group:"webhookObjs"`
......@@ -181,7 +184,7 @@ func Run(params RunParams) error {
go func() {
mutex := http.NewServeMux()
mutex.Handle("/", playground.Handler("GraphQL playground", "/query"))
mutex.Handle("/query", ctrlserver.Handler(params.Logger, mgr.GetClient(), params.Clientset, params.DaemonClientBuilder))
mutex.Handle("/query", params.CtrlServer)
setupLog.Info("setup ctrlserver", "addr", ccfg.ControllerCfg.CtrlAddr)
setupLog.Error(http.ListenAndServe(ccfg.ControllerCfg.CtrlAddr, mutex), "unable to start ctrlserver")
}()
......
// Copyright 2021 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package cmd
import (
"context"
"fmt"
"strings"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
"github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/common"
"github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/recover"
ctrlclient "github.com/chaos-mesh/chaos-mesh/pkg/ctrl/client"
"github.com/chaos-mesh/chaos-mesh/pkg/label"
)
type RecoverOptions struct {
namespace string
labels *[]string
}
func NewRecoverCommand(logger logr.Logger, builders map[string]recover.RecoverBuilder) (*cobra.Command, error) {
o := &RecoverOptions{namespace: "default"}
recoverCmd := &cobra.Command{
Use: `recover (CHAOSTYPE) POD[,POD[,POD...]] [-n NAMESPACE]`,
Short: `Recover certain chaos from certain pods`,
Long: `Recover certain chaos from certain pods.
Currently unimplemented.
Examples:
# Recover network chaos from pods in namespace default
chaosctl recover networkchaos
# Recover network chaos from certain pods in certain namespace
chaosctl recover networkchaos pod1 pod2 pod3 -n NAMESPACE
# Recover network chaos from pods with label key=value
chaosctl recover networkchaos -l key=value`,
ValidArgsFunction: noCompletions,
}
for chaosType, builder := range builders {
recoverCmd.AddCommand(recoverResourceCommand(o, chaosType, builder))
}
recoverCmd.PersistentFlags().StringVarP(&o.namespace, "namespace", "n", "default", "namespace to find pods")
o.labels = recoverCmd.PersistentFlags().StringSliceP("label", "l", nil, "labels to select pods")
err := recoverCmd.RegisterFlagCompletionFunc("namespace", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
client, cancel, err := common.CreateClient(context.TODO(), managerNamespace, managerSvc)
if err != nil {
logger.Error(err, "create client")
return nil, cobra.ShellCompDirectiveNoFileComp
}
defer cancel()
completion, err := client.ListNamespace(context.TODO())
if err != nil {
logger.Error(err, "complete resource")
return nil, cobra.ShellCompDirectiveNoFileComp
}
return completion, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveNoSpace
})
if err != nil {
return nil, errors.Wrap(err, "register completion func for flag `namespace`")
}
err = recoverCmd.RegisterFlagCompletionFunc("label", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return nil, cobra.ShellCompDirectiveNoFileComp
})
if err != nil {
return nil, errors.Wrap(err, "register completion func for flag `label`")
}
return recoverCmd, nil
}
func recoverResourceCommand(option *RecoverOptions, chaosType string, builder recover.RecoverBuilder) *cobra.Command {
return &cobra.Command{
Use: fmt.Sprintf(`%s POD[,POD[,POD...]] [-n NAMESPACE]`, chaosType),
Short: fmt.Sprintf(`Recover %s from certain pods`, chaosType),
Long: fmt.Sprintf(`Recover %s from certain pods`, chaosType),
RunE: func(cmd *cobra.Command, args []string) error {
client, cancel, err := common.CreateClient(context.TODO(), managerNamespace, managerSvc)
if err != nil {
return err
}
defer cancel()
return option.Run(builder(client), client, args)
},
SilenceErrors: true,
SilenceUsage: true,
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
if len(args) != 0 {
return []string{}, cobra.ShellCompDirectiveNoFileComp
}
client, cancel, err := common.CreateClient(context.TODO(), managerNamespace, managerSvc)
if err != nil {
common.PrettyPrint(errors.Wrap(err, "create client").Error(), 0, common.Red)
return nil, cobra.ShellCompDirectiveNoFileComp
}
defer cancel()
return option.List(client)
},
}
}
// Run recover
func (o *RecoverOptions) Run(recover recover.Recover, client *ctrlclient.CtrlClient, args []string) error {
pods, err := o.selectPods(client, args)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, pod := range pods {
err = recover.Recover(ctx, pod)
if err != nil {
return err
}
}
return nil
}
// List pods to recover
func (o *RecoverOptions) List(client *ctrlclient.CtrlClient) ([]string, cobra.ShellCompDirective) {
pods, err := o.selectPods(client, []string{})
if err != nil {
common.PrettyPrint(errors.Wrap(err, "select pods").Error(), 0, common.Red)
return nil, cobra.ShellCompDirectiveNoFileComp
}
var names []string
for _, pod := range pods {
names = append(names, pod.Name)
}
return names, cobra.ShellCompDirectiveNoFileComp
}
func (o *RecoverOptions) selectPods(client *ctrlclient.CtrlClient, names []string) ([]*recover.PartialPod, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
selector := v1alpha1.PodSelectorSpec{
GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
Namespaces: []string{o.namespace},
},
}
if len(names) != 0 {
selector.Pods = map[string][]string{o.namespace: names}
}
if o.labels != nil && len(*o.labels) > 0 {
labels, err := label.ParseLabel(strings.Join(*o.labels, ","))
if err != nil {
return nil, errors.Wrap(err, "parse labels")
}
if len(labels) != 0 {
selector.LabelSelectors = labels
}
}
return recover.SelectPods(ctx, client, selector)
}
......@@ -22,6 +22,7 @@ import (
cm "github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/common"
"github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/debug"
"github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/recover"
)
var managerNamespace, managerSvc string
......@@ -96,6 +97,19 @@ func Execute() {
os.Exit(1)
}
// TODO: register recovers
recoverCommand, err := NewRecoverCommand(rootLogger.WithName("cmd-recover"), map[string]recover.RecoverBuilder{
networkChaos: recover.NewNoopRecover,
ioChaos: recover.NewNoopRecover,
httpChaos: recover.NewNoopRecover,
stressChaos: recover.NewNoopRecover,
})
if err != nil {
cm.PrettyPrint("failed to initialize cmd: ", 0, cm.Red)
cm.PrettyPrint("recover command: "+err.Error(), 1, cm.Red)
os.Exit(1)
}
physicalMachineCommand, err := NewPhysicalMachineCommand()
if err != nil {
cm.PrettyPrint("failed to initialize cmd: ", 0, cm.Red)
......@@ -104,6 +118,7 @@ func Execute() {
}
rootCmd.AddCommand(debugCommand)
rootCmd.AddCommand(recoverCommand)
rootCmd.AddCommand(completionCmd)
rootCmd.AddCommand(forwardCmd)
rootCmd.AddCommand(physicalMachineCommand)
......
......@@ -55,7 +55,7 @@ func (d *httpDebugger) Collect(ctx context.Context, namespace, chaosName string)
Name string
Spec *v1alpha1.PodHttpChaosSpec
Pod struct {
Iptables string
Iptables []string
Processes []struct {
Pid string
Command string
......@@ -93,7 +93,7 @@ func (d *httpDebugger) Collect(ctx context.Context, namespace, chaosName string)
Name: string(podhttpchaos.Name),
}
podResult.Items = append(podResult.Items, common.ItemResult{Name: "iptables list", Value: string(podhttpchaos.Pod.Iptables)})
podResult.Items = append(podResult.Items, common.ItemResult{Name: "iptables list", Value: strings.Join(podhttpchaos.Pod.Iptables, "\n")})
for _, process := range podhttpchaos.Pod.Processes {
var fds []string
for _, fd := range process.Fds {
......
......@@ -17,6 +17,7 @@ package debug
import (
"context"
"strings"
"github.com/hasura/go-graphql-client"
......@@ -54,8 +55,8 @@ func (d *networkDebugger) Collect(ctx context.Context, namespace, chaosName stri
Name string
Pod struct {
Ipset string
TcQdisc string
Iptables string
TcQdisc []string
Iptables []string
}
}
} `graphql:"networkchaos(name: $name)"`
......@@ -87,8 +88,8 @@ func (d *networkDebugger) Collect(ctx context.Context, namespace, chaosName stri
}
podResult.Items = append(podResult.Items, common.ItemResult{Name: "ipset list", Value: podNetworkChaos.Pod.Ipset})
podResult.Items = append(podResult.Items, common.ItemResult{Name: "tc qdisc list", Value: podNetworkChaos.Pod.TcQdisc})
podResult.Items = append(podResult.Items, common.ItemResult{Name: "iptables list", Value: podNetworkChaos.Pod.Iptables})
podResult.Items = append(podResult.Items, common.ItemResult{Name: "tc qdisc list", Value: strings.Join(podNetworkChaos.Pod.TcQdisc, "\n")})
podResult.Items = append(podResult.Items, common.ItemResult{Name: "iptables list", Value: strings.Join(podNetworkChaos.Pod.Iptables, "\n")})
output, err := common.MarshalChaos(podNetworkChaos.Spec)
if err != nil {
return nil, err
......
// Copyright 2021 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package recover
import (
"context"
ctrlclient "github.com/chaos-mesh/chaos-mesh/pkg/ctrl/client"
)
// PartialPod is a subset of the Pod type.
// It contains necessary information for forced recovery.
type PartialPod struct {
Namespace string
Name string
Processes []struct {
Pid, Command string
}
TcQdisc []string
Iptables []string
}
type Recover interface {
// Recover target pod forcedly
Recover(ctx context.Context, pod *PartialPod) error
}
type RecoverBuilder func(client *ctrlclient.CtrlClient) Recover
type noopRecover struct{}
func NewNoopRecover(client *ctrlclient.CtrlClient) Recover {
return &noopRecover{}
}
func (r *noopRecover) Recover(ctx context.Context, pod *PartialPod) error {
return nil
}
// Copyright 2021 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package recover
import (
"context"
"github.com/pkg/errors"
"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
ctrlclient "github.com/chaos-mesh/chaos-mesh/pkg/ctrl/client"
"github.com/chaos-mesh/chaos-mesh/pkg/ctrl/server/model"
)
func SelectPods(ctx context.Context, client *ctrlclient.CtrlClient, selector v1alpha1.PodSelectorSpec) ([]*PartialPod, error) {
selectInput := model.PodSelectorInput{
Pods: map[string]interface{}{},
NodeSelectors: map[string]interface{}{},
Nodes: selector.Nodes,
PodPhaseSelectors: selector.PodPhaseSelectors,
Namespaces: selector.Namespaces,
FieldSelectors: map[string]interface{}{},
LabelSelectors: map[string]interface{}{},
AnnotationSelectors: map[string]interface{}{},
}
for k, v := range selector.Pods {
selectInput.Pods[k] = v
}
for k, v := range selector.NodeSelectors {
selectInput.NodeSelectors[k] = v
}
for k, v := range selector.FieldSelectors {
selectInput.FieldSelectors[k] = v
}
for k, v := range selector.LabelSelectors {
selectInput.LabelSelectors[k] = v
}
for k, v := range selector.AnnotationSelectors {
selectInput.AnnotationSelectors[k] = v
}
podsQuery := new(struct {
Pods []*PartialPod `graphql:"pods(selector: $selector)"`
})
err := client.QueryClient.Query(ctx, podsQuery, map[string]interface{}{"selector": selectInput})
if err != nil {
return nil, errors.Wrapf(err, "select pods with selector: %+v", selector)
}
return podsQuery.Pods, nil
}
......@@ -16,10 +16,9 @@
package ctrl
import (
"net/http"
"github.com/99designs/gqlgen/graphql/handler"
"github.com/go-logr/logr"
"go.uber.org/fx"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
......@@ -28,12 +27,23 @@ import (
"github.com/chaos-mesh/chaos-mesh/pkg/ctrl/server/generated"
)
func Handler(logger logr.Logger, client client.Client, clientset *kubernetes.Clientset, daemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder) http.Handler {
type ServerParams struct {
fx.In
NoCacheReader client.Reader `name:"no-cache"`
Logger logr.Logger
Client client.Client
Clientset *kubernetes.Clientset
DaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
func New(param ServerParams) *handler.Server {
resolvers := &server.Resolver{
DaemonHelper: &server.DaemonHelper{Builder: daemonClientBuilder},
Log: logger,
Client: client,
Clientset: clientset,
DaemonHelper: &server.DaemonHelper{Builder: param.DaemonClientBuilder},
Log: param.Logger.WithName("ctrl-server"),
Client: param.Client,
Clientset: param.Clientset,
NoCacheReader: param.NoCacheReader,
}
return handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolvers}))
}
......@@ -630,6 +630,7 @@ type ComplexityRoot struct {
 
Query struct {
Namespace func(childComplexity int, ns *string) int
Pods func(childComplexity int, selector model.PodSelectorInput) int
}
 
RawIPSet struct {
......@@ -866,8 +867,8 @@ type PodResolver interface {
Processes(ctx context.Context, obj *v1.Pod) ([]*model.Process, error)
Mounts(ctx context.Context, obj *v1.Pod) ([]string, error)
Ipset(ctx context.Context, obj *v1.Pod) (string, error)
TcQdisc(ctx context.Context, obj *v1.Pod) (string, error)
Iptables(ctx context.Context, obj *v1.Pod) (string, error)
TcQdisc(ctx context.Context, obj *v1.Pod) ([]string, error)
Iptables(ctx context.Context, obj *v1.Pod) ([]string, error)
}
type PodConditionResolver interface {
Type(ctx context.Context, obj *v1.PodCondition) (string, error)
......@@ -944,6 +945,7 @@ type ProcessResolver interface {
}
type QueryResolver interface {
Namespace(ctx context.Context, ns *string) ([]*model.Namespace, error)
Pods(ctx context.Context, selector model.PodSelectorInput) ([]*v1.Pod, error)
}
type RawIptablesResolver interface {
Direction(ctx context.Context, obj *v1alpha1.RawIptables) (string, error)
......@@ -3679,6 +3681,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
 
return e.complexity.Query.Namespace(childComplexity, args["ns"].(*string)), true
 
case "Query.pods":
if e.complexity.Query.Pods == nil {
break
}
args, err := ec.field_Query_pods_args(context.TODO(), rawArgs)
if err != nil {
return 0, false
}
return e.complexity.Query.Pods(childComplexity, args["selector"].(model.PodSelectorInput)), true
case "RawIPSet.cidrs":
if e.complexity.RawIPSet.Cidrs == nil {
break
......@@ -4174,6 +4188,7 @@ schema {
 
type Query {
namespace(ns: String): [Namespace!]
pods(selector: PodSelectorInput!): [Pod!]
}
 
type Logger {
......@@ -4222,6 +4237,42 @@ type Fd {
target: String!
}
 
# PodSelectorInput defines the some selectors to select objects.
# If the all selectors are empty, all objects will be used in chaos experiment.
input PodSelectorInput {
# namespaces is a set of namespace to which objects belong.
namespaces: [String!]
# nodes is a set of node name and objects must belong to these nodes.
nodes: [String!]
# pods is a map of string keys and a set values that used to select pods.
# The key defines the namespace which pods belong,
# and the each values is a set of pod names.
pods: Map
# map of string keys and values that can be used to select nodes.
# Selector which must match a node's labels,
# and objects must belong to these selected nodes.
nodeSelectors: Map
# map of string keys and values that can be used to select objects.
# A selector based on fields.
fieldSelectors: Map
# map of string keys and values that can be used to select objects.
# A selector based on labels.
labelSelectors: Map
# map of string keys and values that can be used to select objects.
# A selector based on annotations.
annotationSelectors: Map
# podPhaseSelectors is a set of condition of a pod at the current time.
# supported value: Pending / Running / Succeeded / Failed / Unknown
podPhaseSelectors: [String!]
}
type Pod @goModel(model: "k8s.io/api/core/v1.Pod") {
kind: String!
apiVersion: String!
......@@ -4249,8 +4300,8 @@ type Pod @goModel(model: "k8s.io/api/core/v1.Pod") {
processes: [Process!] @goField(forceResolver: true)
mounts: [String!] @goField(forceResolver: true)
ipset: String! @goField(forceResolver: true)
tcQdisc: String! @goField(forceResolver: true)
iptables: String! @goField(forceResolver: true)
tcQdisc: [String!] @goField(forceResolver: true)
iptables: [String!] @goField(forceResolver: true)
}
 
# PodStatus represents information about the status of a pod. Status may trail the actual
......@@ -5534,6 +5585,21 @@ func (ec *executionContext) field_Query_namespace_args(ctx context.Context, rawA
return args, nil
}
 
func (ec *executionContext) field_Query_pods_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) {
var err error
args := map[string]interface{}{}
var arg0 model.PodSelectorInput
if tmp, ok := rawArgs["selector"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selector"))
arg0, err = ec.unmarshalNPodSelectorInput2githubᚗcomᚋchaosᚑmeshᚋchaosᚑmeshᚋpkgᚋctrlᚋserverᚋmodelᚐPodSelectorInput(ctx, tmp)
if err != nil {
return nil, err
}
}
args["selector"] = arg0
return args, nil
}
func (ec *executionContext) field___Type_enumValues_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) {
var err error
args := map[string]interface{}{}
......@@ -13342,14 +13408,11 @@ func (ec *executionContext) _Pod_tcQdisc(ctx context.Context, field graphql.Coll
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(string)
res := resTmp.([]string)
fc.Result = res
return ec.marshalNString2string(ctx, field.Selections, res)
return ec.marshalOString2string(ctx, field.Selections, res)
}
 
func (ec *executionContext) _Pod_iptables(ctx context.Context, field graphql.CollectedField, obj *v1.Pod) (ret graphql.Marshaler) {
......@@ -13377,14 +13440,11 @@ func (ec *executionContext) _Pod_iptables(ctx context.Context, field graphql.Col
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(string)
res := resTmp.([]string)
fc.Result = res
return ec.marshalNString2string(ctx, field.Selections, res)
return ec.marshalOString2string(ctx, field.Selections, res)
}
 
func (ec *executionContext) _PodCondition_type(ctx context.Context, field graphql.CollectedField, obj *v1.PodCondition) (ret graphql.Marshaler) {
......@@ -18218,6 +18278,45 @@ func (ec *executionContext) _Query_namespace(ctx context.Context, field graphql.
return ec.marshalONamespace2ᚕᚖgithubᚗcomᚋchaosᚑmeshᚋchaosᚑmeshᚋpkgᚋctrlᚋserverᚋmodelᚐNamespaceᚄ(ctx, field.Selections, res)
}
 
func (ec *executionContext) _Query_pods(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
fc := &graphql.FieldContext{
Object: "Query",
Field: field,
Args: nil,
IsMethod: true,
IsResolver: true,
}
ctx = graphql.WithFieldContext(ctx, fc)
rawArgs := field.ArgumentMap(ec.Variables)
args, err := ec.field_Query_pods_args(ctx, rawArgs)
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
fc.Args = args
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().Pods(rctx, args["selector"].(model.PodSelectorInput))
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
return graphql.Null
}
res := resTmp.([]*v1.Pod)
fc.Result = res
return ec.marshalOPod2ᚕᚖk8sᚗioᚋapiᚋcoreᚋv1ᚐPodᚄ(ctx, field.Selections, res)
}
func (ec *executionContext) _Query___type(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
......@@ -21351,6 +21450,85 @@ func (ec *executionContext) ___Type_specifiedByURL(ctx context.Context, field gr
 
// region **************************** input.gotpl *****************************
 
func (ec *executionContext) unmarshalInputPodSelectorInput(ctx context.Context, obj interface{}) (model.PodSelectorInput, error) {
var it model.PodSelectorInput
asMap := map[string]interface{}{}
for k, v := range obj.(map[string]interface{}) {
asMap[k] = v
}
for k, v := range asMap {
switch k {
case "namespaces":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("namespaces"))
it.Namespaces, err = ec.unmarshalOString2ᚕstringᚄ(ctx, v)
if err != nil {
return it, err
}
case "nodes":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("nodes"))
it.Nodes, err = ec.unmarshalOString2ᚕstringᚄ(ctx, v)
if err != nil {
return it, err
}
case "pods":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("pods"))
it.Pods, err = ec.unmarshalOMap2map(ctx, v)
if err != nil {
return it, err
}
case "nodeSelectors":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("nodeSelectors"))
it.NodeSelectors, err = ec.unmarshalOMap2map(ctx, v)
if err != nil {
return it, err
}
case "fieldSelectors":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("fieldSelectors"))
it.FieldSelectors, err = ec.unmarshalOMap2map(ctx, v)
if err != nil {
return it, err
}
case "labelSelectors":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("labelSelectors"))
it.LabelSelectors, err = ec.unmarshalOMap2map(ctx, v)
if err != nil {
return it, err
}
case "annotationSelectors":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("annotationSelectors"))
it.AnnotationSelectors, err = ec.unmarshalOMap2map(ctx, v)
if err != nil {
return it, err
}
case "podPhaseSelectors":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("podPhaseSelectors"))
it.PodPhaseSelectors, err = ec.unmarshalOString2ᚕstringᚄ(ctx, v)
if err != nil {
return it, err
}
}
}
return it, nil
}
// endregion **************************** input.gotpl *****************************
 
// region ************************** interface.gotpl ***************************
......@@ -24790,9 +24968,6 @@ func (ec *executionContext) _Pod(ctx context.Context, sel ast.SelectionSet, obj
}
}()
res = ec._Pod_tcQdisc(ctx, field, obj)
if res == graphql.Null {
atomic.AddUint32(&invalids, 1)
}
return res
}
 
......@@ -24810,9 +24985,6 @@ func (ec *executionContext) _Pod(ctx context.Context, sel ast.SelectionSet, obj
}
}()
res = ec._Pod_iptables(ctx, field, obj)
if res == graphql.Null {
atomic.AddUint32(&invalids, 1)
}
return res
}
 
......@@ -26967,6 +27139,26 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr
return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc)
}
 
out.Concurrently(i, func() graphql.Marshaler {
return rrm(innerCtx)
})
case "pods":
field := field
innerFunc := func(ctx context.Context) (res graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._Query_pods(ctx, field)
return res
}
rrm := func(ctx context.Context) graphql.Marshaler {
return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc)
}
out.Concurrently(i, func() graphql.Marshaler {
return rrm(innerCtx)
})
......@@ -28531,6 +28723,11 @@ func (ec *executionContext) marshalNPodNetworkChaosStatus2githubᚗcomᚋchaos
return ec._PodNetworkChaosStatus(ctx, sel, &v)
}
 
func (ec *executionContext) unmarshalNPodSelectorInput2githubᚗcomᚋchaosᚑmeshᚋchaosᚑmeshᚋpkgᚋctrlᚋserverᚋmodelᚐPodSelectorInput(ctx context.Context, v interface{}) (model.PodSelectorInput, error) {
res, err := ec.unmarshalInputPodSelectorInput(ctx, v)
return res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalNPodSelectorSpec2githubᚗcomᚋchaosᚑmeshᚋchaosᚑmeshᚋapiᚋv1alpha1ᚐPodSelectorSpec(ctx context.Context, sel ast.SelectionSet, v v1alpha1.PodSelectorSpec) graphql.Marshaler {
return ec._PodSelectorSpec(ctx, sel, &v)
}
......
......@@ -45,6 +45,17 @@ type Namespace struct {
Podnetworkchaos []*v1alpha1.PodNetworkChaos `json:"podnetworkchaos"`
}
type PodSelectorInput struct {
Namespaces []string `json:"namespaces"`
Nodes []string `json:"nodes"`
Pods map[string]interface{} `json:"pods"`
NodeSelectors map[string]interface{} `json:"nodeSelectors"`
FieldSelectors map[string]interface{} `json:"fieldSelectors"`
LabelSelectors map[string]interface{} `json:"labelSelectors"`
AnnotationSelectors map[string]interface{} `json:"annotationSelectors"`
PodPhaseSelectors []string `json:"podPhaseSelectors"`
}
type PodStressChaos struct {
StressChaos *v1alpha1.StressChaos `json:"stressChaos"`
Pod *v1.Pod `json:"pod"`
......
......@@ -17,7 +17,9 @@ package server
import (
"context"
"strings"
"github.com/pingcap/errors"
v1 "k8s.io/api/core/v1"
"github.com/chaos-mesh/chaos-mesh/pkg/bpm"
......@@ -30,13 +32,49 @@ func (r *Resolver) GetIpset(ctx context.Context, obj *v1.Pod) (string, error) {
}
// GetIpset returns result of tc qdisc list
func (r *Resolver) GetTcQdisc(ctx context.Context, obj *v1.Pod) (string, error) {
func (r *Resolver) GetTcQdisc(ctx context.Context, obj *v1.Pod) ([]string, error) {
cmd := "tc qdisc list"
return r.ExecBypass(ctx, obj, cmd, bpm.PidNS, bpm.NetNS)
rules, err := r.ExecBypass(ctx, obj, cmd, bpm.PidNS, bpm.NetNS)
if err != nil {
return nil, errors.Wrapf(err, "exec `%s`", cmd)
}
return strings.Split(rules, "\n"), nil
}
// GetIptables returns result of iptables --list
func (r *Resolver) GetIptables(ctx context.Context, obj *v1.Pod) (string, error) {
func (r *Resolver) GetIptables(ctx context.Context, obj *v1.Pod) ([]string, error) {
cmd := "iptables --list"
return r.ExecBypass(ctx, obj, cmd, bpm.PidNS, bpm.NetNS)
rules, err := r.ExecBypass(ctx, obj, cmd, bpm.PidNS, bpm.NetNS)
if err != nil {
return nil, errors.Wrapf(err, "exec `%s`", cmd)
}
return strings.Split(rules, "\n"), nil
}
// cleanTcs returns actually cleaned devices
func (r *Resolver) cleanTcs(ctx context.Context, obj *v1.Pod, devices []string) ([]string, error) {
var cleaned []string
for _, device := range devices {
cmd := "tc qdisc del dev " + device + " root"
_, err := r.ExecBypass(ctx, obj, cmd, bpm.PidNS, bpm.NetNS)
if err != nil {
return cleaned, errors.Wrapf(err, "exec `%s`", cmd)
}
cleaned = append(cleaned, device)
}
return cleaned, nil
}
// cleanIptables returns actually cleaned chains
func (r *Resolver) cleanIptables(ctx context.Context, obj *v1.Pod, chains []string) ([]string, error) {
var cleaned []string
for _, chain := range chains {
cmd := "iptables -F " + chain
_, err := r.ExecBypass(ctx, obj, cmd, bpm.PidNS, bpm.NetNS)
if err != nil {
return cleaned, errors.Wrapf(err, "exec `%s`", cmd)
}
cleaned = append(cleaned, chain)
}
return cleaned, nil
}
......@@ -29,7 +29,8 @@ import (
type Resolver struct {
*DaemonHelper
Log logr.Logger
Client client.Client
Clientset *kubernetes.Clientset
Log logr.Logger
Client client.Client
Clientset *kubernetes.Clientset
NoCacheReader client.Reader
}
......@@ -34,6 +34,7 @@ schema {
type Query {
namespace(ns: String): [Namespace!]
pods(selector: PodSelectorInput!): [Pod!]
}
type Logger {
......@@ -82,6 +83,42 @@ type Fd {
target: String!
}
# PodSelectorInput defines the some selectors to select objects.
# If the all selectors are empty, all objects will be used in chaos experiment.
input PodSelectorInput {
# namespaces is a set of namespace to which objects belong.
namespaces: [String!]
# nodes is a set of node name and objects must belong to these nodes.
nodes: [String!]
# pods is a map of string keys and a set values that used to select pods.
# The key defines the namespace which pods belong,
# and the each values is a set of pod names.
pods: Map
# map of string keys and values that can be used to select nodes.
# Selector which must match a node's labels,
# and objects must belong to these selected nodes.
nodeSelectors: Map
# map of string keys and values that can be used to select objects.
# A selector based on fields.
fieldSelectors: Map
# map of string keys and values that can be used to select objects.
# A selector based on labels.
labelSelectors: Map
# map of string keys and values that can be used to select objects.
# A selector based on annotations.
annotationSelectors: Map
# podPhaseSelectors is a set of condition of a pod at the current time.
# supported value: Pending / Running / Succeeded / Failed / Unknown
podPhaseSelectors: [String!]
}
type Pod @goModel(model: "k8s.io/api/core/v1.Pod") {
kind: String!
apiVersion: String!
......@@ -109,8 +146,8 @@ type Pod @goModel(model: "k8s.io/api/core/v1.Pod") {
processes: [Process!] @goField(forceResolver: true)
mounts: [String!] @goField(forceResolver: true)
ipset: String! @goField(forceResolver: true)
tcQdisc: String! @goField(forceResolver: true)
iptables: String! @goField(forceResolver: true)
tcQdisc: [String!] @goField(forceResolver: true)
iptables: [String!] @goField(forceResolver: true)
}
# PodStatus represents information about the status of a pod. Status may trail the actual
......
......@@ -7,6 +7,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"time"
......@@ -19,6 +20,7 @@ import (
"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
"github.com/chaos-mesh/chaos-mesh/pkg/ctrl/server/generated"
"github.com/chaos-mesh/chaos-mesh/pkg/ctrl/server/model"
podSelector "github.com/chaos-mesh/chaos-mesh/pkg/selector/pod"
)
func (r *attrOverrideSpecResolver) Ino(ctx context.Context, obj *v1alpha1.AttrOverrideSpec) (*int, error) {
......@@ -765,11 +767,11 @@ func (r *podResolver) Ipset(ctx context.Context, obj *v1.Pod) (string, error) {
return r.GetIpset(ctx, obj)
}
func (r *podResolver) TcQdisc(ctx context.Context, obj *v1.Pod) (string, error) {
func (r *podResolver) TcQdisc(ctx context.Context, obj *v1.Pod) ([]string, error) {
return r.GetTcQdisc(ctx, obj)
}
func (r *podResolver) Iptables(ctx context.Context, obj *v1.Pod) (string, error) {
func (r *podResolver) Iptables(ctx context.Context, obj *v1.Pod) ([]string, error) {
return r.GetIptables(ctx, obj)
}
......@@ -1062,6 +1064,73 @@ func (r *queryResolver) Namespace(ctx context.Context, ns *string) ([]*model.Nam
return []*model.Namespace{{Ns: *ns}}, nil
}
func (r *queryResolver) Pods(ctx context.Context, selector model.PodSelectorInput) ([]*v1.Pod, error) {
spec := v1alpha1.PodSelectorSpec{
Pods: map[string][]string{},
NodeSelectors: map[string]string{},
Nodes: selector.Nodes,
PodPhaseSelectors: selector.PodPhaseSelectors,
GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
Namespaces: selector.Namespaces,
FieldSelectors: map[string]string{},
LabelSelectors: map[string]string{},
AnnotationSelectors: map[string]string{},
},
}
for ns, p := range selector.Pods {
if pod, ok := p.(string); ok {
spec.Pods[ns] = []string{pod}
}
if pods, ok := p.([]string); ok {
spec.Pods[ns] = pods
}
}
for k, s := range selector.NodeSelectors {
if selector, ok := s.(string); ok {
spec.NodeSelectors[k] = selector
}
}
for k, s := range selector.FieldSelectors {
if selector, ok := s.(string); ok {
spec.FieldSelectors[k] = selector
}
}
for k, s := range selector.LabelSelectors {
if selector, ok := s.(string); ok {
spec.LabelSelectors[k] = selector
}
}
for k, s := range selector.AnnotationSelectors {
if selector, ok := s.(string); ok {
spec.AnnotationSelectors[k] = selector
}
}
selectImpl := podSelector.New(podSelector.Params{
Client: r.Client,
Reader: r.Client,
})
pods, err := selectImpl.Select(ctx, &v1alpha1.PodSelector{Selector: spec, Mode: v1alpha1.AllMode})
if err != nil {
if errors.Is(err, podSelector.ErrNoPodSelected) {
return nil, nil
}
return nil, err
}
var list []*v1.Pod
for _, pod := range pods {
p := pod
list = append(list, &p.Pod)
}
return list, nil
}
func (r *rawIptablesResolver) Direction(ctx context.Context, obj *v1alpha1.RawIptables) (string, error) {
return string(obj.Direction), nil
}
......
......@@ -37,6 +37,8 @@ import (
"github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/registry"
)
var ErrNoPodSelected = errors.New("no pod is selected")
type SelectImpl struct {
c client.Client
r client.Reader
......@@ -114,8 +116,7 @@ func SelectAndFilterPods(ctx context.Context, c client.Client, r client.Reader,
}
if len(pods) == 0 {
err = errors.New("no pod is selected")
return nil, err
return nil, ErrNoPodSelected
}
filteredPod, err := filterPodsByMode(pods, mode, value)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment