Unverified Commit ffc2b498 authored by Snow Pettersen's avatar Snow Pettersen Committed by GitHub
Browse files

experimentation: add configurable termination service (#1211)

Wires up the termination framework as a configurable service that allows configuring an arbitrary set of termination criteria,
including custom ones that can be registered by adding them to the terminator.CriteriaFactories map. This allows for
custom pluggable termination criteria that may rely on internal systems (e.g. check for pages using an internal library)
that can hook into the monitoring system provided by the termination framework.

Includes a single termination criteria as an example, which simply terminates experiments after they have run for a configured
amount.

Rejigs the monitoring class a bit to allow for per config type termination criteria
parent 423a7be3
Showing with 1579 additions and 70 deletions
+1579 -70
syntax = "proto3";
package clutch.config.service.chaos.experimentation.terminator.v1;
option go_package = "github.com/lyft/clutch/backend/api/config/service/chaos/experimentation/terminator/v1;terminatorv1";
import "validate/validate.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/any.proto";
message Config {
message PerConfigTypeConfig {
// List of termination criteria to evaluate for each config type.
repeated google.protobuf.Any termination_criteria = 2 [ (validate.rules).repeated = {min_items : 1} ];
}
// Mapping from typeUrl of registered experiment type to its termination configuration.
map<string, PerConfigTypeConfig> per_config_type_configuration = 1;
// The interval at which the outer loop should poll for active experiments.
google.protobuf.Duration outer_loop_interval = 2 [ (validate.rules).duration.gt.seconds = 0 ];
// The interval at which the inner loop should evaluate the termination criteria for each monitored experiment.
// This should likely be less than outer_loop_interval as the checks should be relatively cheap.
google.protobuf.Duration per_experiment_check_interval = 3 [ (validate.rules).duration.gt.seconds = 0 ];
}
// Termination criterion that will terminate an experiment after a configured max duration. This is helpful in ensuring
// that there is an upper limit to how long experiments will run for.
message MaxTimeTerminationCriterion {
// The maximum duration experiments can run for before being terminated.
google.protobuf.Duration max_duration = 1 [ (validate.rules).duration.gt.seconds = 0 ];
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: config/service/chaos/experimentation/terminator/v1/termination.proto
package terminatorv1
import (
_ "github.com/envoyproxy/protoc-gen-validate/validate"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
anypb "google.golang.org/protobuf/types/known/anypb"
durationpb "google.golang.org/protobuf/types/known/durationpb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Config struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Mapping from typeUrl of registered experiment type to its termination configuration.
PerConfigTypeConfiguration map[string]*Config_PerConfigTypeConfig `protobuf:"bytes,1,rep,name=per_config_type_configuration,json=perConfigTypeConfiguration,proto3" json:"per_config_type_configuration,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// The interval at which the outer loop should poll for active experiments.
OuterLoopInterval *durationpb.Duration `protobuf:"bytes,2,opt,name=outer_loop_interval,json=outerLoopInterval,proto3" json:"outer_loop_interval,omitempty"`
// The interval at which the inner loop should evaluate the termination criteria for each monitored experiment.
// This should likely be less than outer_loop_interval as the checks should be relatively cheap.
PerExperimentCheckInterval *durationpb.Duration `protobuf:"bytes,3,opt,name=per_experiment_check_interval,json=perExperimentCheckInterval,proto3" json:"per_experiment_check_interval,omitempty"`
}
func (x *Config) Reset() {
*x = Config{}
if protoimpl.UnsafeEnabled {
mi := &file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Config) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Config) ProtoMessage() {}
func (x *Config) ProtoReflect() protoreflect.Message {
mi := &file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Config.ProtoReflect.Descriptor instead.
func (*Config) Descriptor() ([]byte, []int) {
return file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescGZIP(), []int{0}
}
func (x *Config) GetPerConfigTypeConfiguration() map[string]*Config_PerConfigTypeConfig {
if x != nil {
return x.PerConfigTypeConfiguration
}
return nil
}
func (x *Config) GetOuterLoopInterval() *durationpb.Duration {
if x != nil {
return x.OuterLoopInterval
}
return nil
}
func (x *Config) GetPerExperimentCheckInterval() *durationpb.Duration {
if x != nil {
return x.PerExperimentCheckInterval
}
return nil
}
// Termination criterion that will terminate an experiment after a configured max duration. This is helpful in ensuring
// that there is an upper limit to how long experiments will run for.
type MaxTimeTerminationCriterion struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// The maximum duration experiments can run for before being terminated.
MaxDuration *durationpb.Duration `protobuf:"bytes,1,opt,name=max_duration,json=maxDuration,proto3" json:"max_duration,omitempty"`
}
func (x *MaxTimeTerminationCriterion) Reset() {
*x = MaxTimeTerminationCriterion{}
if protoimpl.UnsafeEnabled {
mi := &file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *MaxTimeTerminationCriterion) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*MaxTimeTerminationCriterion) ProtoMessage() {}
func (x *MaxTimeTerminationCriterion) ProtoReflect() protoreflect.Message {
mi := &file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use MaxTimeTerminationCriterion.ProtoReflect.Descriptor instead.
func (*MaxTimeTerminationCriterion) Descriptor() ([]byte, []int) {
return file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescGZIP(), []int{1}
}
func (x *MaxTimeTerminationCriterion) GetMaxDuration() *durationpb.Duration {
if x != nil {
return x.MaxDuration
}
return nil
}
type Config_PerConfigTypeConfig struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// List of termination criteria to evaluate for each config type.
TerminationCriteria []*anypb.Any `protobuf:"bytes,2,rep,name=termination_criteria,json=terminationCriteria,proto3" json:"termination_criteria,omitempty"`
}
func (x *Config_PerConfigTypeConfig) Reset() {
*x = Config_PerConfigTypeConfig{}
if protoimpl.UnsafeEnabled {
mi := &file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Config_PerConfigTypeConfig) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Config_PerConfigTypeConfig) ProtoMessage() {}
func (x *Config_PerConfigTypeConfig) ProtoReflect() protoreflect.Message {
mi := &file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Config_PerConfigTypeConfig.ProtoReflect.Descriptor instead.
func (*Config_PerConfigTypeConfig) Descriptor() ([]byte, []int) {
return file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescGZIP(), []int{0, 0}
}
func (x *Config_PerConfigTypeConfig) GetTerminationCriteria() []*anypb.Any {
if x != nil {
return x.TerminationCriteria
}
return nil
}
var File_config_service_chaos_experimentation_terminator_v1_termination_proto protoreflect.FileDescriptor
var file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDesc = []byte{
0x0a, 0x44, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x2f, 0x63, 0x68, 0x61, 0x6f, 0x73, 0x2f, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e,
0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x6f,
0x72, 0x2f, 0x76, 0x31, 0x2f, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x39, 0x63, 0x6c, 0x75, 0x74, 0x63, 0x68, 0x2e, 0x63,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x63, 0x68,
0x61, 0x6f, 0x73, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x2e, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x76,
0x31, 0x1a, 0x17, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2f, 0x76, 0x61, 0x6c, 0x69,
0x64, 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x81, 0x05, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67,
0x12, 0xa4, 0x01, 0x0a, 0x1d, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x5f,
0x74, 0x79, 0x70, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x61, 0x2e, 0x63, 0x6c, 0x75, 0x74, 0x63,
0x68, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x2e, 0x63, 0x68, 0x61, 0x6f, 0x73, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e,
0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x6f,
0x72, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x50, 0x65, 0x72, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x54, 0x79, 0x70, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75,
0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x1a, 0x70, 0x65, 0x72,
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x54, 0x79, 0x70, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67,
0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x55, 0x0a, 0x13, 0x6f, 0x75, 0x74, 0x65, 0x72,
0x5f, 0x6c, 0x6f, 0x6f, 0x70, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42,
0x0a, 0xfa, 0x42, 0x07, 0xaa, 0x01, 0x04, 0x2a, 0x02, 0x08, 0x00, 0x52, 0x11, 0x6f, 0x75, 0x74,
0x65, 0x72, 0x4c, 0x6f, 0x6f, 0x70, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x68,
0x0a, 0x1d, 0x70, 0x65, 0x72, 0x5f, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74,
0x5f, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18,
0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x42, 0x0a, 0xfa, 0x42, 0x07, 0xaa, 0x01, 0x04, 0x2a, 0x02, 0x08, 0x00, 0x52, 0x1a, 0x70, 0x65,
0x72, 0x45, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x68, 0x65, 0x63, 0x6b,
0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x1a, 0x68, 0x0a, 0x13, 0x50, 0x65, 0x72, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x54, 0x79, 0x70, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
0x51, 0x0a, 0x14, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63,
0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x41, 0x6e, 0x79, 0x42, 0x08, 0xfa, 0x42, 0x05, 0x92, 0x01, 0x02, 0x08, 0x01, 0x52, 0x13, 0x74,
0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72,
0x69, 0x61, 0x1a, 0xa4, 0x01, 0x0a, 0x1f, 0x50, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67,
0x54, 0x79, 0x70, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x6b, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x55, 0x2e, 0x63, 0x6c, 0x75, 0x74, 0x63, 0x68,
0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e,
0x63, 0x68, 0x61, 0x6f, 0x73, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72,
0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x50, 0x65, 0x72, 0x43, 0x6f,
0x6e, 0x66, 0x69, 0x67, 0x54, 0x79, 0x70, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x67, 0x0a, 0x1b, 0x4d, 0x61, 0x78,
0x54, 0x69, 0x6d, 0x65, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43,
0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x6f, 0x6e, 0x12, 0x48, 0x0a, 0x0c, 0x6d, 0x61, 0x78, 0x5f,
0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19,
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x0a, 0xfa, 0x42, 0x07, 0xaa, 0x01,
0x04, 0x2a, 0x02, 0x08, 0x00, 0x52, 0x0b, 0x6d, 0x61, 0x78, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x42, 0x64, 0x5a, 0x62, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x6c, 0x79, 0x66, 0x74, 0x2f, 0x63, 0x6c, 0x75, 0x74, 0x63, 0x68, 0x2f, 0x62, 0x61, 0x63,
0x6b, 0x65, 0x6e, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f,
0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x63, 0x68, 0x61, 0x6f, 0x73, 0x2f, 0x65, 0x78,
0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65,
0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x76, 0x31, 0x3b, 0x74, 0x65, 0x72, 0x6d,
0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescOnce sync.Once
file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescData = file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDesc
)
func file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescGZIP() []byte {
file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescOnce.Do(func() {
file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescData = protoimpl.X.CompressGZIP(file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescData)
})
return file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDescData
}
var file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_config_service_chaos_experimentation_terminator_v1_termination_proto_goTypes = []interface{}{
(*Config)(nil), // 0: clutch.config.service.chaos.experimentation.terminator.v1.Config
(*MaxTimeTerminationCriterion)(nil), // 1: clutch.config.service.chaos.experimentation.terminator.v1.MaxTimeTerminationCriterion
(*Config_PerConfigTypeConfig)(nil), // 2: clutch.config.service.chaos.experimentation.terminator.v1.Config.PerConfigTypeConfig
nil, // 3: clutch.config.service.chaos.experimentation.terminator.v1.Config.PerConfigTypeConfigurationEntry
(*durationpb.Duration)(nil), // 4: google.protobuf.Duration
(*anypb.Any)(nil), // 5: google.protobuf.Any
}
var file_config_service_chaos_experimentation_terminator_v1_termination_proto_depIdxs = []int32{
3, // 0: clutch.config.service.chaos.experimentation.terminator.v1.Config.per_config_type_configuration:type_name -> clutch.config.service.chaos.experimentation.terminator.v1.Config.PerConfigTypeConfigurationEntry
4, // 1: clutch.config.service.chaos.experimentation.terminator.v1.Config.outer_loop_interval:type_name -> google.protobuf.Duration
4, // 2: clutch.config.service.chaos.experimentation.terminator.v1.Config.per_experiment_check_interval:type_name -> google.protobuf.Duration
4, // 3: clutch.config.service.chaos.experimentation.terminator.v1.MaxTimeTerminationCriterion.max_duration:type_name -> google.protobuf.Duration
5, // 4: clutch.config.service.chaos.experimentation.terminator.v1.Config.PerConfigTypeConfig.termination_criteria:type_name -> google.protobuf.Any
2, // 5: clutch.config.service.chaos.experimentation.terminator.v1.Config.PerConfigTypeConfigurationEntry.value:type_name -> clutch.config.service.chaos.experimentation.terminator.v1.Config.PerConfigTypeConfig
6, // [6:6] is the sub-list for method output_type
6, // [6:6] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension type_name
6, // [6:6] is the sub-list for extension extendee
0, // [0:6] is the sub-list for field type_name
}
func init() { file_config_service_chaos_experimentation_terminator_v1_termination_proto_init() }
func file_config_service_chaos_experimentation_terminator_v1_termination_proto_init() {
if File_config_service_chaos_experimentation_terminator_v1_termination_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Config); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*MaxTimeTerminationCriterion); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Config_PerConfigTypeConfig); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_config_service_chaos_experimentation_terminator_v1_termination_proto_goTypes,
DependencyIndexes: file_config_service_chaos_experimentation_terminator_v1_termination_proto_depIdxs,
MessageInfos: file_config_service_chaos_experimentation_terminator_v1_termination_proto_msgTypes,
}.Build()
File_config_service_chaos_experimentation_terminator_v1_termination_proto = out.File
file_config_service_chaos_experimentation_terminator_v1_termination_proto_rawDesc = nil
file_config_service_chaos_experimentation_terminator_v1_termination_proto_goTypes = nil
file_config_service_chaos_experimentation_terminator_v1_termination_proto_depIdxs = nil
}
// Code generated by protoc-gen-validate. DO NOT EDIT.
// source: config/service/chaos/experimentation/terminator/v1/termination.proto
package terminatorv1
import (
"bytes"
"errors"
"fmt"
"net"
"net/mail"
"net/url"
"regexp"
"strings"
"time"
"unicode/utf8"
"github.com/golang/protobuf/ptypes"
)
// ensure the imports are used
var (
_ = bytes.MinRead
_ = errors.New("")
_ = fmt.Print
_ = utf8.UTFMax
_ = (*regexp.Regexp)(nil)
_ = (*strings.Reader)(nil)
_ = net.IPv4len
_ = time.Duration(0)
_ = (*url.URL)(nil)
_ = (*mail.Address)(nil)
_ = ptypes.DynamicAny{}
)
// define the regex for a UUID once up-front
var _termination_uuidPattern = regexp.MustCompile("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
// Validate checks the field values on Config with the rules defined in the
// proto definition for this message. If any rules are violated, an error is returned.
func (m *Config) Validate() error {
if m == nil {
return nil
}
for key, val := range m.GetPerConfigTypeConfiguration() {
_ = val
// no validation rules for PerConfigTypeConfiguration[key]
if v, ok := interface{}(val).(interface{ Validate() error }); ok {
if err := v.Validate(); err != nil {
return ConfigValidationError{
field: fmt.Sprintf("PerConfigTypeConfiguration[%v]", key),
reason: "embedded message failed validation",
cause: err,
}
}
}
}
if d := m.GetOuterLoopInterval(); d != nil {
dur, err := ptypes.Duration(d)
if err != nil {
return ConfigValidationError{
field: "OuterLoopInterval",
reason: "value is not a valid duration",
cause: err,
}
}
gt := time.Duration(0*time.Second + 0*time.Nanosecond)
if dur <= gt {
return ConfigValidationError{
field: "OuterLoopInterval",
reason: "value must be greater than 0s",
}
}
}
if d := m.GetPerExperimentCheckInterval(); d != nil {
dur, err := ptypes.Duration(d)
if err != nil {
return ConfigValidationError{
field: "PerExperimentCheckInterval",
reason: "value is not a valid duration",
cause: err,
}
}
gt := time.Duration(0*time.Second + 0*time.Nanosecond)
if dur <= gt {
return ConfigValidationError{
field: "PerExperimentCheckInterval",
reason: "value must be greater than 0s",
}
}
}
return nil
}
// ConfigValidationError is the validation error returned by Config.Validate if
// the designated constraints aren't met.
type ConfigValidationError struct {
field string
reason string
cause error
key bool
}
// Field function returns field value.
func (e ConfigValidationError) Field() string { return e.field }
// Reason function returns reason value.
func (e ConfigValidationError) Reason() string { return e.reason }
// Cause function returns cause value.
func (e ConfigValidationError) Cause() error { return e.cause }
// Key function returns key value.
func (e ConfigValidationError) Key() bool { return e.key }
// ErrorName returns error name.
func (e ConfigValidationError) ErrorName() string { return "ConfigValidationError" }
// Error satisfies the builtin error interface
func (e ConfigValidationError) Error() string {
cause := ""
if e.cause != nil {
cause = fmt.Sprintf(" | caused by: %v", e.cause)
}
key := ""
if e.key {
key = "key for "
}
return fmt.Sprintf(
"invalid %sConfig.%s: %s%s",
key,
e.field,
e.reason,
cause)
}
var _ error = ConfigValidationError{}
var _ interface {
Field() string
Reason() string
Key() bool
Cause() error
ErrorName() string
} = ConfigValidationError{}
// Validate checks the field values on MaxTimeTerminationCriterion with the
// rules defined in the proto definition for this message. If any rules are
// violated, an error is returned.
func (m *MaxTimeTerminationCriterion) Validate() error {
if m == nil {
return nil
}
if d := m.GetMaxDuration(); d != nil {
dur, err := ptypes.Duration(d)
if err != nil {
return MaxTimeTerminationCriterionValidationError{
field: "MaxDuration",
reason: "value is not a valid duration",
cause: err,
}
}
gt := time.Duration(0*time.Second + 0*time.Nanosecond)
if dur <= gt {
return MaxTimeTerminationCriterionValidationError{
field: "MaxDuration",
reason: "value must be greater than 0s",
}
}
}
return nil
}
// MaxTimeTerminationCriterionValidationError is the validation error returned
// by MaxTimeTerminationCriterion.Validate if the designated constraints
// aren't met.
type MaxTimeTerminationCriterionValidationError struct {
field string
reason string
cause error
key bool
}
// Field function returns field value.
func (e MaxTimeTerminationCriterionValidationError) Field() string { return e.field }
// Reason function returns reason value.
func (e MaxTimeTerminationCriterionValidationError) Reason() string { return e.reason }
// Cause function returns cause value.
func (e MaxTimeTerminationCriterionValidationError) Cause() error { return e.cause }
// Key function returns key value.
func (e MaxTimeTerminationCriterionValidationError) Key() bool { return e.key }
// ErrorName returns error name.
func (e MaxTimeTerminationCriterionValidationError) ErrorName() string {
return "MaxTimeTerminationCriterionValidationError"
}
// Error satisfies the builtin error interface
func (e MaxTimeTerminationCriterionValidationError) Error() string {
cause := ""
if e.cause != nil {
cause = fmt.Sprintf(" | caused by: %v", e.cause)
}
key := ""
if e.key {
key = "key for "
}
return fmt.Sprintf(
"invalid %sMaxTimeTerminationCriterion.%s: %s%s",
key,
e.field,
e.reason,
cause)
}
var _ error = MaxTimeTerminationCriterionValidationError{}
var _ interface {
Field() string
Reason() string
Key() bool
Cause() error
ErrorName() string
} = MaxTimeTerminationCriterionValidationError{}
// Validate checks the field values on Config_PerConfigTypeConfig with the
// rules defined in the proto definition for this message. If any rules are
// violated, an error is returned.
func (m *Config_PerConfigTypeConfig) Validate() error {
if m == nil {
return nil
}
if len(m.GetTerminationCriteria()) < 1 {
return Config_PerConfigTypeConfigValidationError{
field: "TerminationCriteria",
reason: "value must contain at least 1 item(s)",
}
}
for idx, item := range m.GetTerminationCriteria() {
_, _ = idx, item
if v, ok := interface{}(item).(interface{ Validate() error }); ok {
if err := v.Validate(); err != nil {
return Config_PerConfigTypeConfigValidationError{
field: fmt.Sprintf("TerminationCriteria[%v]", idx),
reason: "embedded message failed validation",
cause: err,
}
}
}
}
return nil
}
// Config_PerConfigTypeConfigValidationError is the validation error returned
// by Config_PerConfigTypeConfig.Validate if the designated constraints aren't met.
type Config_PerConfigTypeConfigValidationError struct {
field string
reason string
cause error
key bool
}
// Field function returns field value.
func (e Config_PerConfigTypeConfigValidationError) Field() string { return e.field }
// Reason function returns reason value.
func (e Config_PerConfigTypeConfigValidationError) Reason() string { return e.reason }
// Cause function returns cause value.
func (e Config_PerConfigTypeConfigValidationError) Cause() error { return e.cause }
// Key function returns key value.
func (e Config_PerConfigTypeConfigValidationError) Key() bool { return e.key }
// ErrorName returns error name.
func (e Config_PerConfigTypeConfigValidationError) ErrorName() string {
return "Config_PerConfigTypeConfigValidationError"
}
// Error satisfies the builtin error interface
func (e Config_PerConfigTypeConfigValidationError) Error() string {
cause := ""
if e.cause != nil {
cause = fmt.Sprintf(" | caused by: %v", e.cause)
}
key := ""
if e.key {
key = "key for "
}
return fmt.Sprintf(
"invalid %sConfig_PerConfigTypeConfig.%s: %s%s",
key,
e.field,
e.reason,
cause)
}
var _ error = Config_PerConfigTypeConfigValidationError{}
var _ interface {
Field() string
Reason() string
Key() bool
Cause() error
ErrorName() string
} = Config_PerConfigTypeConfigValidationError{}
......@@ -36,6 +36,7 @@ import (
authzservice "github.com/lyft/clutch/backend/service/authz"
awsservice "github.com/lyft/clutch/backend/service/aws"
"github.com/lyft/clutch/backend/service/chaos/experimentation/experimentstore"
"github.com/lyft/clutch/backend/service/chaos/experimentation/terminator"
pgservice "github.com/lyft/clutch/backend/service/db/postgres"
"github.com/lyft/clutch/backend/service/envoyadmin"
"github.com/lyft/clutch/backend/service/github"
......@@ -78,6 +79,7 @@ var Services = service.Factory{
awsservice.Name: awsservice.New,
envoyadmin.Name: envoyadmin.New,
experimentstore.Name: experimentstore.New,
terminator.Name: terminator.New,
github.Name: github.New,
k8sservice.Name: k8sservice.New,
loggingsink.Name: loggingsink.New,
......
......@@ -15,10 +15,12 @@ import (
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
experimentationv1 "github.com/lyft/clutch/backend/api/chaos/experimentation/v1"
serverexperimentation "github.com/lyft/clutch/backend/api/chaos/serverexperimentation/v1"
xdsconfigv1 "github.com/lyft/clutch/backend/api/config/module/chaos/experimentation/xds/v1"
terminatorv1 "github.com/lyft/clutch/backend/api/config/service/chaos/experimentation/terminator/v1"
"github.com/lyft/clutch/backend/internal/test/integration/helper/envoytest"
"github.com/lyft/clutch/backend/mock/service/chaos/experimentation/experimentstoremock"
"github.com/lyft/clutch/backend/module/chaos/serverexperimentation/xds/internal/xdstest"
......@@ -81,20 +83,34 @@ func TestEnvoyFaultsTimeBasedTermination(t *testing.T) {
ts := xdstest.NewTestModuleServer(New, true, xdsConfig)
defer ts.Stop()
criteria := &testCriteria{}
criterion := &testCriterion{}
terminator := terminator.NewTestMonitor(
ts.Storer,
[]string{"type.googleapis.com/clutch.chaos.serverexperimentation.v1.HTTPFaultConfig"},
[]terminator.TerminationCriteria{criteria},
ts.Logger.Sugar(),
ts.Scope)
terminator.CriterionFactories["type.googleapis.com/google.protobuf.StringValue"] = &testCriterionFactory{testCriterion: criterion}
anyString, err := anypb.New(&wrapperspb.StringValue{})
assert.NoError(t, err)
typedConfig := &terminatorv1.Config{
PerConfigTypeConfiguration: map[string]*terminatorv1.Config_PerConfigTypeConfig{
"type.googleapis.com/clutch.chaos.serverexperimentation.v1.HTTPFaultConfig": {
TerminationCriteria: []*anypb.Any{anyString},
},
},
OuterLoopInterval: durationpb.New(time.Second),
PerExperimentCheckInterval: durationpb.New(time.Second),
}
anyConfig, err := anypb.New(typedConfig)
assert.NoError(t, err)
terminator, err := terminator.NewMonitor(anyConfig, ts.Logger, ts.Scope)
assert.NoError(t, err)
// Cancel to ensure that the terminator doesn't leak into other tests.
ctx, cancel := context.WithCancel(context.Background())
terminator.Run(ctx)
defer cancel()
terminator.Run(ctx)
e, err := envoytest.NewEnvoyHandle()
assert.NoError(t, err)
......@@ -113,7 +129,7 @@ func TestEnvoyFaultsTimeBasedTermination(t *testing.T) {
})
assert.NoError(t, err, "did not see faults enabled")
criteria.start()
criterion.start()
// Since we've enabled a time based automatic termination, we expect to see faults get disabled on their own after some time.
err = awaitExpectedReturnValueForSimpleCall(t, e, awaitReturnValueParams{
......@@ -233,21 +249,29 @@ func awaitExpectedReturnValueForSimpleCall(t *testing.T, e *envoytest.EnvoyHandl
return nil
}
type testCriteria struct {
type testCriterionFactory struct {
testCriterion *testCriterion
}
func (t *testCriterionFactory) Create(*anypb.Any) (terminator.TerminationCriterion, error) {
return t.testCriterion, nil
}
type testCriterion struct {
startCheckingTime bool
sync.Mutex
}
// We want the time check to be low but also avoid races, so this lets us prevent the criteria from activating until
// we know that we're seeing faults enabled.
func (t *testCriteria) start() {
func (t *testCriterion) start() {
t.Lock()
defer t.Unlock()
t.startCheckingTime = true
}
func (t *testCriteria) ShouldTerminate(experiment *experimentationv1.Experiment, experimentConfig proto.Message) (string, error) {
func (t *testCriterion) ShouldTerminate(experiment *experimentationv1.Experiment, experimentConfig proto.Message) (string, error) {
t.Lock()
defer t.Unlock()
......
package terminator
import (
"fmt"
"time"
"github.com/golang/protobuf/ptypes/any"
"google.golang.org/protobuf/proto"
experimentationv1 "github.com/lyft/clutch/backend/api/chaos/experimentation/v1"
terminatorv1 "github.com/lyft/clutch/backend/api/config/service/chaos/experimentation/terminator/v1"
)
type maxTimeTerminationCriterion struct {
maxTime time.Duration
}
func (m maxTimeTerminationCriterion) ShouldTerminate(experiment *experimentationv1.Experiment, experimentConfig proto.Message) (string, error) {
startTime := experiment.StartTime.AsTime()
if startTime.Add(m.maxTime).Before(time.Now()) {
return fmt.Sprintf("timed out (max duration %s)", m.maxTime), nil
}
return "", nil
}
type maxTimeTerminationFactory struct{}
func (maxTimeTerminationFactory) Create(cfg *any.Any) (TerminationCriterion, error) {
typedConfig := &terminatorv1.MaxTimeTerminationCriterion{}
err := cfg.UnmarshalTo(typedConfig)
if err != nil {
return nil, err
}
maxTime := typedConfig.MaxDuration.AsDuration()
return maxTimeTerminationCriterion{maxTime: maxTime}, nil
}
......@@ -2,6 +2,8 @@ package terminator
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
......@@ -11,55 +13,111 @@ import (
"google.golang.org/protobuf/types/known/anypb"
experimentationv1 "github.com/lyft/clutch/backend/api/chaos/experimentation/v1"
terminatorv1 "github.com/lyft/clutch/backend/api/config/service/chaos/experimentation/terminator/v1"
"github.com/lyft/clutch/backend/service"
"github.com/lyft/clutch/backend/service/chaos/experimentation/experimentstore"
)
type TerminationCriteria interface {
const Name = "clutch.module.chaos.experimentation.termination"
func TypeUrl(message proto.Message) string {
return "type.googleapis.com/" + string(message.ProtoReflect().Descriptor().FullName())
}
var CriterionFactories = map[string]CriterionFactory{
TypeUrl(&terminatorv1.MaxTimeTerminationCriterion{}): &maxTimeTerminationFactory{},
}
type CriterionFactory interface {
Create(cfg *anypb.Any) (TerminationCriterion, error)
}
type TerminationCriterion interface {
// ShouldTerminate determines whether the provided experiment should be terminated.
// To signal that termination should occur, return a non-empty string with a nil error.
ShouldTerminate(experiment *experimentationv1.Experiment, experimentConfig proto.Message) (string, error)
}
type Monitor interface {
Run(ctx context.Context)
func New(cfg *anypb.Any, logger *zap.Logger, scope tally.Scope) (service.Service, error) {
m, err := NewMonitor(cfg, logger, scope)
if err != nil {
m.Run(context.Background())
}
return m, nil
}
// TODO(snowp): Remove this once we have a proper service object that we can create.
func NewTestMonitor(store experimentstore.Storer, enabledConfigTypes []string, criterias []TerminationCriteria, log *zap.SugaredLogger, stats tally.Scope) Monitor {
return &monitor{
store: store,
enabledConfigTypes: enabledConfigTypes,
criterias: criterias,
outerLoopInterval: 1,
perExperimentCheckInterval: 1,
log: log,
activeMonitoringRoutines: trackingGauge{gauge: stats.Gauge("active_monitoring_routines")},
criteriaEvaluationSuccess: stats.Counter("criteria_success"),
criteriaEvaluationFailure: stats.Counter("criteria_failure"),
terminationCount: stats.Counter("terminations"),
marshallingErrors: stats.Counter("unpack_error"),
func NewMonitor(cfg *anypb.Any, logger *zap.Logger, scope tally.Scope) (*Monitor, error) {
typedConfig := &terminatorv1.Config{}
err := cfg.UnmarshalTo(typedConfig)
if err != nil {
return nil, err
}
store, ok := service.Registry[experimentstore.Name]
if !ok {
return nil, errors.New("could not find experiment store service")
}
storer, ok := store.(experimentstore.Storer)
if !ok {
return nil, errors.New("service was not the correct type")
}
terminationCriteria := map[string][]TerminationCriterion{}
for configType, perConfigTypeConfig := range typedConfig.PerConfigTypeConfiguration {
perConfigCriteria := []TerminationCriterion{}
for _, c := range perConfigTypeConfig.TerminationCriteria {
factory, ok := CriterionFactories[c.TypeUrl]
if !ok {
return nil, fmt.Errorf("terminator module configured with unknown criterion '%s'", c.TypeUrl)
}
criterion, err := factory.Create(c)
if err != nil {
return nil, fmt.Errorf("failed to create termination criteria '%s': %s", c.TypeUrl, err)
}
perConfigCriteria = append(perConfigCriteria, criterion)
}
terminationCriteria[configType] = perConfigCriteria
}
return &Monitor{
store: storer,
terminationCriteriaByTypeUrl: terminationCriteria,
outerLoopInterval: typedConfig.OuterLoopInterval.AsDuration(),
perExperimentCheckInterval: typedConfig.PerExperimentCheckInterval.AsDuration(),
log: logger.Sugar(),
activeMonitoringRoutines: trackingGauge{gauge: scope.Gauge("active_monitoring_routines")},
criterionEvaluationSuccessCount: scope.Counter("criterion_success"),
criterionEvaluationFailureCount: scope.Counter("criterion_failure"),
terminationCount: scope.Counter("terminations"),
marshallingErrorCount: scope.Counter("unpack_error"),
}, nil
}
type monitor struct {
store experimentstore.Storer
enabledConfigTypes []string
criterias []TerminationCriteria
type Monitor struct {
store experimentstore.Storer
terminationCriteriaByTypeUrl map[string][]TerminationCriterion
outerLoopInterval time.Duration
perExperimentCheckInterval time.Duration
log *zap.SugaredLogger
criteriaEvaluationSuccess tally.Counter
criteriaEvaluationFailure tally.Counter
activeMonitoringRoutines trackingGauge
terminationCount tally.Counter
marshallingErrors tally.Counter
activeMonitoringRoutines trackingGauge
criterionEvaluationSuccessCount tally.Counter
criterionEvaluationFailureCount tally.Counter
terminationCount tally.Counter
marshallingErrorCount tally.Counter
}
func (m *monitor) Run(ctx context.Context) {
for _, configType := range m.enabledConfigTypes {
func (m *Monitor) Run(ctx context.Context) {
for configType, criteria := range m.terminationCriteriaByTypeUrl {
// For each monitored config type, start a single goroutine that polls the active experiments at a fixed interval.
// Whenever a new (new to this goroutine) experiment is found, open up a goroutine that periodically evaluates all
// the termination criteria for the experiment.
......@@ -67,7 +125,7 @@ func (m *monitor) Run(ctx context.Context) {
// This approach ensures provides a steady DB pressure (mostly outer loop, some from triggering termination) and relatively high
// fairness, as checking the termination conditions for one experiment should not be delaying the checks for another experiment
// unless we're under very heavy load.
go func(configType string) {
go func(configType string, criteria []TerminationCriterion) {
trackedExperiments := map[string]context.CancelFunc{}
ticker := time.NewTicker(m.outerLoopInterval)
......@@ -78,11 +136,11 @@ func (m *monitor) Run(ctx context.Context) {
case <-ticker.C:
es, err := m.store.GetExperiments(context.Background(), configType, experimentationv1.GetExperimentsRequest_STATUS_RUNNING)
if err != nil {
m.log.Errorw("failed to retrieve experiments from experiment store", "err", err, "enableConfigTypes", m.enabledConfigTypes)
m.log.Errorw("failed to retrieve experiments from experiment store", "err", err, "configType", configType)
continue
}
activeExperiments := m.monitorNewExperiments(es, trackedExperiments)
activeExperiments := m.monitorNewExperiments(es, trackedExperiments, criteria)
// For all experiments that we're tracking that no longer appear to be active, cancel the goroutine
// and clean it up.
......@@ -94,13 +152,13 @@ func (m *monitor) Run(ctx context.Context) {
}
}
}
}(configType)
}(configType, criteria)
}
}
// Iterates over all the provided experiments, spawning a goroutine to montior each experiment that doesn't already have
// a monitoring routine. Returns a set containing all the active experiment ids for further processing.
func (m *monitor) monitorNewExperiments(es []*experimentationv1.Experiment, trackedExperiments map[string]context.CancelFunc) map[string]struct{} {
func (m *Monitor) monitorNewExperiments(es []*experimentationv1.Experiment, trackedExperiments map[string]context.CancelFunc, criteria []TerminationCriterion) map[string]struct{} {
// For each active experiment, create a monitoring goroutine if necessary.
activeExperiments := map[string]struct{}{}
for _, e := range es {
......@@ -112,14 +170,14 @@ func (m *monitor) monitorNewExperiments(es []*experimentationv1.Experiment, trac
m.activeMonitoringRoutines.inc()
go func() {
defer m.activeMonitoringRoutines.dec()
m.monitorSingleExperiment(ctx, e)
m.monitorSingleExperiment(ctx, e, criteria)
}()
}
}
return activeExperiments
}
func (m *monitor) monitorSingleExperiment(ctx context.Context, e *experimentationv1.Experiment) {
func (m *Monitor) monitorSingleExperiment(ctx context.Context, e *experimentationv1.Experiment, criteria []TerminationCriterion) {
ticker := time.NewTicker(m.perExperimentCheckInterval)
terminated := false
......@@ -129,7 +187,7 @@ func (m *monitor) monitorSingleExperiment(ctx context.Context, e *experimentatio
unpackedConfig, err := anypb.UnmarshalNew(e.Config, proto.UnmarshalOptions{})
if err != nil {
m.log.Errorw("failed to unmarshal experiment", "runId", e.RunId)
m.marshallingErrors.Inc(1)
m.marshallingErrorCount.Inc(1)
return
}
......@@ -143,16 +201,16 @@ func (m *monitor) monitorSingleExperiment(ctx context.Context, e *experimentatio
// loop can race and restart this goroutine.
continue
}
for _, c := range m.criterias {
for _, c := range criteria {
terminationReason, err := c.ShouldTerminate(e, unpackedConfig)
// TODO(snowp): The logs here might get spammy, rate limit or terminate montioring routine somehow?
if err != nil {
m.criteriaEvaluationFailure.Inc(1)
m.log.Errorw("error while evaluating termination criteria", "runId", e.RunId, "error", err)
m.criterionEvaluationFailureCount.Inc(1)
m.log.Errorw("error while evaluating termination criterion", "runId", e.RunId, "error", err)
continue
}
m.criteriaEvaluationSuccess.Inc(1)
m.criterionEvaluationSuccessCount.Inc(1)
if terminationReason != "" {
err = m.store.CancelExperimentRun(context.Background(), e.RunId, terminationReason)
......
......@@ -11,13 +11,57 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
experimentationv1 "github.com/lyft/clutch/backend/api/chaos/experimentation/v1"
serverexperimentationv1 "github.com/lyft/clutch/backend/api/chaos/serverexperimentation/v1"
terminatorv1 "github.com/lyft/clutch/backend/api/config/service/chaos/experimentation/terminator/v1"
"github.com/lyft/clutch/backend/mock/service/chaos/experimentation/experimentstoremock"
"github.com/lyft/clutch/backend/service"
"github.com/lyft/clutch/backend/service/chaos/experimentation/experimentstore"
)
const testConfigType = "type.googleapis.com/clutch.chaos.serverexperimentation.v1.HTTPFaultConfig"
func TestConfigLoad(t *testing.T) {
builtInCriterionConfig := &terminatorv1.MaxTimeTerminationCriterion{
MaxDuration: durationpb.New(time.Hour),
}
builtInCriterionAnyConfig, err := anypb.New(builtInCriterionConfig)
assert.NoError(t, err)
cfg := &terminatorv1.Config{
PerConfigTypeConfiguration: map[string]*terminatorv1.Config_PerConfigTypeConfig{testConfigType: {TerminationCriteria: []*anypb.Any{builtInCriterionAnyConfig}}},
OuterLoopInterval: durationpb.New(time.Second),
PerExperimentCheckInterval: durationpb.New(time.Second),
}
any, err := anypb.New(cfg)
assert.NoError(t, err)
service.Registry[experimentstore.Name] = &experimentstoremock.MockStorer{}
// Happy path testing.
m, err := NewMonitor(any, zap.NewNop(), tally.NoopScope)
assert.NoError(t, err)
assert.Len(t, m.terminationCriteriaByTypeUrl, 1)
assert.Len(t, m.terminationCriteriaByTypeUrl[testConfigType], 1)
// If configured with a criteria we don't have registered we should error out.
otherProto := &wrapperspb.StringValue{}
otherProtoAny, err := anypb.New(otherProto)
assert.NoError(t, err)
cfg.PerConfigTypeConfiguration[testConfigType].TerminationCriteria = append(cfg.PerConfigTypeConfiguration[testConfigType].TerminationCriteria, otherProtoAny)
any, err = anypb.New(cfg)
assert.NoError(t, err)
m, err = NewMonitor(any, zap.NewNop(), tally.NoopScope)
assert.Nil(t, m)
assert.EqualError(t, err, "terminator module configured with unknown criterion 'type.googleapis.com/google.protobuf.StringValue'")
}
func TestTerminator(t *testing.T) {
l, err := zap.NewDevelopment()
assert.NoError(t, err)
......@@ -25,22 +69,21 @@ func TestTerminator(t *testing.T) {
testScope := tally.NewTestScope("", map[string]string{})
store := &experimentstoremock.SimpleStorer{}
criteria := &testCriteria{}
monitor := monitor{
store: store,
enabledConfigTypes: []string{"type.googleapis.com/clutch.chaos.serverexperimentation.v1.HTTPFaultConfig"},
criterias: []TerminationCriteria{criteria},
outerLoopInterval: time.Millisecond,
perExperimentCheckInterval: time.Millisecond,
log: l.Sugar(),
criterion := &testCriterion{}
monitor := Monitor{
store: store,
terminationCriteriaByTypeUrl: map[string][]TerminationCriterion{testConfigType: {criterion}},
outerLoopInterval: time.Millisecond,
perExperimentCheckInterval: time.Millisecond,
log: l.Sugar(),
activeMonitoringRoutines: trackingGauge{
gauge: testScope.Gauge("active_routines"),
value: 0,
},
criteriaEvaluationSuccess: testScope.Counter("criteria_success"),
criteriaEvaluationFailure: testScope.Counter("criteria_failure"),
terminationCount: testScope.Counter("terminations"),
marshallingErrors: testScope.Counter("unpack_error"),
criterionEvaluationSuccessCount: testScope.Counter("criterion_success"),
criterionEvaluationFailureCount: testScope.Counter("criterion_failure"),
terminationCount: testScope.Counter("terminations"),
marshallingErrorCount: testScope.Counter("unpack_error"),
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
......@@ -50,17 +93,17 @@ func TestTerminator(t *testing.T) {
awaitGaugeValue(ctx, t, testScope, "active_routines", 1)
es, err := store.GetExperiments(ctx, "type.googleapis.com/clutch.chaos.serverexperimentation.v1.HTTPFaultConfig", experimentationv1.GetExperimentsRequest_STATUS_RUNNING)
es, err := store.GetExperiments(ctx, testConfigType, experimentationv1.GetExperimentsRequest_STATUS_RUNNING)
assert.NoError(t, err)
assert.Len(t, es, 1)
criteria.update(true)
criterion.update(true)
// Ensure that we tear down the monitoring goroutines once we're done with an experiment.
awaitGaugeValue(ctx, t, testScope, "active_routines", 0)
assert.Equal(t, int64(1), testScope.Snapshot().Counters()["terminations+"].Value())
es, err = store.GetExperiments(ctx, "type.googleapis.com/clutch.chaos.serverexperimentation.v1.HTTPFaultConfig", experimentationv1.GetExperimentsRequest_STATUS_RUNNING)
es, err = store.GetExperiments(ctx, testConfigType, experimentationv1.GetExperimentsRequest_STATUS_RUNNING)
assert.NoError(t, err)
assert.Len(t, es, 0)
}
......@@ -107,13 +150,13 @@ func createTestExperiment(t *testing.T, faultHttpStatus int, storer *experiments
return experiment
}
type testCriteria struct {
type testCriterion struct {
evaluation bool
sync.Mutex
}
func (t *testCriteria) ShouldTerminate(experiment *experimentationv1.Experiment, experimentConfig proto.Message) (string, error) {
func (t *testCriterion) ShouldTerminate(experiment *experimentationv1.Experiment, experimentConfig proto.Message) (string, error) {
t.Lock()
defer t.Unlock()
......@@ -124,7 +167,7 @@ func (t *testCriteria) ShouldTerminate(experiment *experimentationv1.Experiment,
return "", nil
}
func (t *testCriteria) update(evaluation bool) {
func (t *testCriterion) update(evaluation bool) {
t.Lock()
defer t.Unlock()
......
......@@ -7846,6 +7846,181 @@ export namespace clutch {
}
}
 
/** Namespace chaos. */
namespace chaos {
/** Namespace experimentation. */
namespace experimentation {
/** Namespace terminator. */
namespace terminator {
/** Namespace v1. */
namespace v1 {
/** Properties of a Config. */
interface IConfig {
/** Config perConfigTypeConfiguration */
perConfigTypeConfiguration?: ({ [k: string]: clutch.config.service.chaos.experimentation.terminator.v1.Config.IPerConfigTypeConfig }|null);
/** Config outerLoopInterval */
outerLoopInterval?: (google.protobuf.IDuration|null);
/** Config perExperimentCheckInterval */
perExperimentCheckInterval?: (google.protobuf.IDuration|null);
}
/** Represents a Config. */
class Config implements IConfig {
/**
* Constructs a new Config.
* @param [properties] Properties to set
*/
constructor(properties?: clutch.config.service.chaos.experimentation.terminator.v1.IConfig);
/** Config perConfigTypeConfiguration. */
public perConfigTypeConfiguration: { [k: string]: clutch.config.service.chaos.experimentation.terminator.v1.Config.IPerConfigTypeConfig };
/** Config outerLoopInterval. */
public outerLoopInterval?: (google.protobuf.IDuration|null);
/** Config perExperimentCheckInterval. */
public perExperimentCheckInterval?: (google.protobuf.IDuration|null);
/**
* Verifies a Config message.
* @param message Plain object to verify
* @returns `null` if valid, otherwise the reason why it is not
*/
public static verify(message: { [k: string]: any }): (string|null);
/**
* Creates a Config message from a plain object. Also converts values to their respective internal types.
* @param object Plain object
* @returns Config
*/
public static fromObject(object: { [k: string]: any }): clutch.config.service.chaos.experimentation.terminator.v1.Config;
/**
* Creates a plain object from a Config message. Also converts values to other types if specified.
* @param message Config
* @param [options] Conversion options
* @returns Plain object
*/
public static toObject(message: clutch.config.service.chaos.experimentation.terminator.v1.Config, options?: $protobuf.IConversionOptions): { [k: string]: any };
/**
* Converts this Config to JSON.
* @returns JSON object
*/
public toJSON(): { [k: string]: any };
}
namespace Config {
/** Properties of a PerConfigTypeConfig. */
interface IPerConfigTypeConfig {
/** PerConfigTypeConfig terminationCriteria */
terminationCriteria?: (google.protobuf.IAny[]|null);
}
/** Represents a PerConfigTypeConfig. */
class PerConfigTypeConfig implements IPerConfigTypeConfig {
/**
* Constructs a new PerConfigTypeConfig.
* @param [properties] Properties to set
*/
constructor(properties?: clutch.config.service.chaos.experimentation.terminator.v1.Config.IPerConfigTypeConfig);
/** PerConfigTypeConfig terminationCriteria. */
public terminationCriteria: google.protobuf.IAny[];
/**
* Verifies a PerConfigTypeConfig message.
* @param message Plain object to verify
* @returns `null` if valid, otherwise the reason why it is not
*/
public static verify(message: { [k: string]: any }): (string|null);
/**
* Creates a PerConfigTypeConfig message from a plain object. Also converts values to their respective internal types.
* @param object Plain object
* @returns PerConfigTypeConfig
*/
public static fromObject(object: { [k: string]: any }): clutch.config.service.chaos.experimentation.terminator.v1.Config.PerConfigTypeConfig;
/**
* Creates a plain object from a PerConfigTypeConfig message. Also converts values to other types if specified.
* @param message PerConfigTypeConfig
* @param [options] Conversion options
* @returns Plain object
*/
public static toObject(message: clutch.config.service.chaos.experimentation.terminator.v1.Config.PerConfigTypeConfig, options?: $protobuf.IConversionOptions): { [k: string]: any };
/**
* Converts this PerConfigTypeConfig to JSON.
* @returns JSON object
*/
public toJSON(): { [k: string]: any };
}
}
/** Properties of a MaxTimeTerminationCriterion. */
interface IMaxTimeTerminationCriterion {
/** MaxTimeTerminationCriterion maxDuration */
maxDuration?: (google.protobuf.IDuration|null);
}
/** Represents a MaxTimeTerminationCriterion. */
class MaxTimeTerminationCriterion implements IMaxTimeTerminationCriterion {
/**
* Constructs a new MaxTimeTerminationCriterion.
* @param [properties] Properties to set
*/
constructor(properties?: clutch.config.service.chaos.experimentation.terminator.v1.IMaxTimeTerminationCriterion);
/** MaxTimeTerminationCriterion maxDuration. */
public maxDuration?: (google.protobuf.IDuration|null);
/**
* Verifies a MaxTimeTerminationCriterion message.
* @param message Plain object to verify
* @returns `null` if valid, otherwise the reason why it is not
*/
public static verify(message: { [k: string]: any }): (string|null);
/**
* Creates a MaxTimeTerminationCriterion message from a plain object. Also converts values to their respective internal types.
* @param object Plain object
* @returns MaxTimeTerminationCriterion
*/
public static fromObject(object: { [k: string]: any }): clutch.config.service.chaos.experimentation.terminator.v1.MaxTimeTerminationCriterion;
/**
* Creates a plain object from a MaxTimeTerminationCriterion message. Also converts values to other types if specified.
* @param message MaxTimeTerminationCriterion
* @param [options] Conversion options
* @returns Plain object
*/
public static toObject(message: clutch.config.service.chaos.experimentation.terminator.v1.MaxTimeTerminationCriterion, options?: $protobuf.IConversionOptions): { [k: string]: any };
/**
* Converts this MaxTimeTerminationCriterion to JSON.
* @returns JSON object
*/
public toJSON(): { [k: string]: any };
}
}
}
}
}
/** Namespace db. */
namespace db {
 
......
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment