Unverified Commit cd8d57d5 authored by RecallSong's avatar RecallSong Committed by GitHub
Browse files

check container log only (#2644)

* check container log only

* change max time range

* rename decode

* fix invalid field

* add test file

* add test case
parent efb21e21
Showing with 139 additions and 14 deletions
+139 -14
...@@ -194,6 +194,7 @@ type Request interface { ...@@ -194,6 +194,7 @@ type Request interface {
const ( const (
defaultQueryCount = 50 defaultQueryCount = 50
maxQueryCount = 700 maxQueryCount = 700
maxTimeRange = 30 * 24 * int64(time.Hour)
) )
func getLimit(count int64) int { func getLimit(count int64) int {
...@@ -223,8 +224,11 @@ func toQuerySelector(req Request) (*storage.Selector, error) { ...@@ -223,8 +224,11 @@ func toQuerySelector(req Request) (*storage.Selector, error) {
if sel.End <= 0 { if sel.End <= 0 {
sel.End = time.Now().UnixNano() sel.End = time.Now().UnixNano()
} }
if sel.Start < 0 { if sel.Start <= 0 {
sel.Start = 0 sel.Start = sel.End - maxTimeRange
if sel.Start < 0 {
sel.Start = 0
}
} else if sel.Start > 0 && req.GetCount() >= 0 { } else if sel.Start > 0 && req.GetCount() >= 0 {
// avoid duplicating previous log // avoid duplicating previous log
// TODO: check by offset // TODO: check by offset
...@@ -233,6 +237,8 @@ func toQuerySelector(req Request) (*storage.Selector, error) { ...@@ -233,6 +237,8 @@ func toQuerySelector(req Request) (*storage.Selector, error) {
if sel.End < sel.Start { if sel.End < sel.Start {
return nil, errors.NewInvalidParameterError("(start,end]", "start must be less than end") return nil, errors.NewInvalidParameterError("(start,end]", "start must be less than end")
} else if sel.End-sel.Start > maxTimeRange {
return nil, errors.NewInvalidParameterError("(start,end]", "time range is too large")
} }
if len(req.GetRequestId()) > 0 { if len(req.GetRequestId()) > 0 {
......
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package query
import (
"math"
"reflect"
"testing"
"github.com/erda-project/erda/modules/core/monitor/log/storage"
)
func Test_toQuerySelector(t *testing.T) {
tests := []struct {
name string
req Request
want *storage.Selector
wantErr bool
}{
{
req: &LogRequest{
ID: "testid",
Start: 1,
End: math.MaxInt64,
},
wantErr: true,
},
{
req: &LogRequest{
ID: "testid",
Start: 10,
End: 1,
},
wantErr: true,
},
{
req: &LogRequest{
ID: "testid",
Start: 0,
End: 10,
},
want: &storage.Selector{
Start: 0,
End: 10,
Scheme: "",
Filters: []*storage.Filter{
{
Key: "id",
Op: storage.EQ,
Value: "testid",
},
},
},
},
{
req: &LogRequest{
ID: "testid",
Start: 1,
End: 100,
Count: -200,
Source: "container",
},
want: &storage.Selector{
Start: 1,
End: 100,
Scheme: "container",
Filters: []*storage.Filter{
{
Key: "id",
Op: storage.EQ,
Value: "testid",
},
{
Key: "source",
Op: storage.EQ,
Value: "container",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := toQuerySelector(tt.req)
if (err != nil) != tt.wantErr {
t.Errorf("toQuerySelector() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("toQuerySelector() = %v, want %v", got, tt.want)
}
})
}
}
...@@ -99,7 +99,8 @@ func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (_ store ...@@ -99,7 +99,8 @@ func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (_ store
if _, ok := values["id"]; !ok { if _, ok := values["id"]; !ok {
return nil, fmt.Errorf("id is required") return nil, fmt.Errorf("id is required")
} }
if _, ok := values["source"]; !ok { source, ok := values["source"]
if !ok {
return nil, fmt.Errorf("source is required") return nil, fmt.Errorf("source is required")
} }
_, hasStream := values["stream"] _, hasStream := values["stream"]
...@@ -113,11 +114,13 @@ func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (_ store ...@@ -113,11 +114,13 @@ func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (_ store
if err != nil { if err != nil {
return nil, err return nil, err
} }
if meta == nil { if source == "container" {
return storekit.EmptyIterator{}, nil if meta == nil {
} return storekit.EmptyIterator{}, nil
if meta.Tags["dice_application_id"] != applicationID { }
return storekit.EmptyIterator{}, nil if meta.Tags["dice_application_id"] != applicationID {
return storekit.EmptyIterator{}, nil
}
} }
table = p.getTableName(meta) table = p.getTableName(meta)
} else if len(clusterName) > 0 { } else if len(clusterName) > 0 {
......
...@@ -26,7 +26,7 @@ import ( ...@@ -26,7 +26,7 @@ import (
// Skip . // Skip .
var Skip = errors.New("skip") var Skip = errors.New("skip")
func (p *provider) decodeLog(key, value []byte, topic *string, timestamp time.Time) (interface{}, error) { func (p *provider) decodeData(key, value []byte, topic *string, timestamp time.Time) (interface{}, error) {
data := &metric.Metric{} data := &metric.Metric{}
if err := json.Unmarshal(value, data); err != nil { if err := json.Unmarshal(value, data); err != nil {
p.stats.DecodeError(value, err) p.stats.DecodeError(value, err)
......
...@@ -70,7 +70,7 @@ func (p *provider) Init(ctx servicehub.Context) error { ...@@ -70,7 +70,7 @@ func (p *provider) Init(ctx servicehub.Context) error {
// add consumer task // add consumer task
for i := 0; i < p.Cfg.Parallelism; i++ { for i := 0; i < p.Cfg.Parallelism; i++ {
ctx.AddTask(func(ctx context.Context) error { ctx.AddTask(func(ctx context.Context) error {
r, err := p.Kafka.NewBatchReader(&p.Cfg.Input, kafka.WithReaderDecoder(p.decodeLog)) r, err := p.Kafka.NewBatchReader(&p.Cfg.Input, kafka.WithReaderDecoder(p.decodeData))
if err != nil { if err != nil {
return err return err
} }
......
...@@ -142,7 +142,10 @@ const ( ...@@ -142,7 +142,10 @@ const (
MetricTagTTLFixed = "fixed" MetricTagTTLFixed = "fixed"
) )
const esMaxValue = float64(math.MaxInt64) const (
esMaxValue = float64(math.MaxInt64)
esMinValue = float64(math.MinInt64)
)
func processInvalidFields(m *metric.Metric) { func processInvalidFields(m *metric.Metric) {
fields := m.Fields fields := m.Fields
...@@ -152,7 +155,7 @@ func processInvalidFields(m *metric.Metric) { ...@@ -152,7 +155,7 @@ func processInvalidFields(m *metric.Metric) {
for k, v := range fields { for k, v := range fields {
switch val := v.(type) { switch val := v.(type) {
case float64: case float64:
if val > esMaxValue { if val < esMinValue || esMaxValue < val {
fields[k] = strconv.FormatFloat(val, 'f', -1, 64) fields[k] = strconv.FormatFloat(val, 'f', -1, 64)
} }
} }
......
...@@ -107,10 +107,14 @@ func (w *channelWriter) run(bw BatchWriter, capacity int, timeout time.Duration, ...@@ -107,10 +107,14 @@ func (w *channelWriter) run(bw BatchWriter, capacity int, timeout time.Duration,
} }
// StdoutWriter . // StdoutWriter .
type StdoutWriter struct{} type StdoutWriter struct {
Filter func(val Data) bool
}
// DefaultStdoutWriter . // DefaultStdoutWriter .
var DefaultStdoutWriter = StdoutWriter{} var DefaultStdoutWriter = StdoutWriter{
Filter: func(val Data) bool { return true },
}
func (w StdoutWriter) Write(val Data) error { func (w StdoutWriter) Write(val Data) error {
w.WriteN(val) w.WriteN(val)
...@@ -119,6 +123,9 @@ func (w StdoutWriter) Write(val Data) error { ...@@ -119,6 +123,9 @@ func (w StdoutWriter) Write(val Data) error {
func (w StdoutWriter) WriteN(vals ...Data) (int, error) { func (w StdoutWriter) WriteN(vals ...Data) (int, error) {
for _, val := range vals { for _, val := range vals {
if w.Filter != nil && !w.Filter(val) {
continue
}
sb := &strings.Builder{} sb := &strings.Builder{}
enc := json.NewEncoder(sb) enc := json.NewEncoder(sb)
enc.SetIndent("", "\t") enc.SetIndent("", "\t")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment