Unverified Commit 4d9b4ffa authored by yufeiyu, committed by GitHub

Update DSP (#389)

Co-authored-by: yuyufei <yufeiyu@tencent.com>
parent fb918cee
Showing with 90305 additions and 3253 deletions
......@@ -15,6 +15,7 @@ require (
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f
google.golang.org/grpc v1.43.0
google.golang.org/protobuf v1.27.1
k8s.io/api v0.22.3
k8s.io/apimachinery v0.22.3
k8s.io/apiserver v0.22.3
......@@ -149,7 +150,6 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/warnings.v0 v0.1.1 // indirect
......@@ -169,7 +169,7 @@ require (
require (
cloud.google.com/go v0.84.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/fsnotify/fsnotify v1.5.1
github.com/json-iterator/go v1.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/robfig/cron/v3 v3.0.1
......
package dsp
import (
"math"
"math/cmplx"
"github.com/mjibson/go-dsp/fft"
"github.com/montanaflynn/stats"
)
func AutoCorrelation(samples []float64) []float64 {
N := len(samples)
if N == 0 {
return []float64{}
}
x := make([]float64, N)
mean, _ := stats.Mean(samples)
std, _ := stats.StdDevP(samples)
for i := range x {
x[i] = (samples[i] - mean) / std
}
f := fft.FFTReal(x)
var p []float64
for i := range f {
p = append(p, math.Pow(cmplx.Abs(f[i]), 2))
}
pi := fft.IFFTReal(p)
var ac []float64
for i := range pi {
ac = append(ac, real(pi[i])/float64(N))
}
return ac
}
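AutoCorrelation computes the normalized circular ACF via the Wiener-Khinchin relation: standardize the samples, take the FFT, form the power spectrum, and inverse-transform it. A minimal in-package sketch of how this can be sanity-checked on a pure sine wave (the helper name autoCorrelationSketch is not part of this commit and is shown only for illustration):

package dsp

import (
    "fmt"
    "math"
)

// autoCorrelationSketch: for a 5 Hz sine sampled at 1000 Hz over one second
// (five full periods), the normalized circular ACF should be ~1 at a lag of
// one period (200 samples) and ~-1 at half a period (100 samples).
func autoCorrelationSketch() {
    sampleRate := 1000.0
    frequency := 5.0
    samples := make([]float64, 1000)
    for i := range samples {
        t := float64(i) / sampleRate
        samples[i] = math.Sin(2 * math.Pi * frequency * t)
    }

    ac := AutoCorrelation(samples)
    fmt.Printf("lag 200 (one period):  %.3f\n", ac[200]) // ~  1.000
    fmt.Printf("lag 100 (half period): %.3f\n", ac[100]) // ~ -1.000
}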
......@@ -2,14 +2,15 @@ package dsp
import (
"math"
"math/rand"
"testing"
"github.com/go-echarts/go-echarts/v2/charts"
"github.com/go-echarts/go-echarts/v2/opts"
"github.com/go-echarts/go-echarts/v2/types"
"github.com/stretchr/testify/assert"
)
func TestSignal_Period(t *testing.T) {
func TestAutoCorrelation(t *testing.T) {
epsilon := 1e-15
sampleRate := 1000.0 // Hz
sampleInterval := 1.0 / sampleRate // seconds
......@@ -17,63 +18,44 @@ func TestSignal_Period(t *testing.T) {
for i := 0.0; i < 1; i += sampleInterval {
x = append(x, i)
}
frequency := 5.0
expected := []float64{
frequency,
frequency,
frequency,
frequency,
frequency,
}
y := make([]float64, len(x))
for i := range x {
y[i] = math.Sin(2.*math.Pi*frequency*x[i]) +
(1./3.)*math.Sin(3.*2.*math.Pi*frequency*x[i]) +
(1./5.)*math.Sin(5.*2.*math.Pi*frequency*x[i]) +
(1./7.)*math.Sin(7.*2.*math.Pi*frequency*x[i])
f := []func(float64) float64{
func(x float64) float64 {
return math.Sin(2.0*math.Pi*frequency*x) + 1.0
},
func(x float64) float64 {
return math.Sin(2.0*math.Pi*frequency*x+rand.Float64()) + 1.0
},
func(x float64) float64 {
return math.Sin(2.0*math.Pi*frequency*x) + 0.5*math.Sin(2.0*math.Pi*4*frequency*x) + 3
},
func(x float64) float64 {
return math.Sin(2.0*math.Pi*frequency*x) + rand.Float64()*2.0
},
func(x float64) float64 {
growthRatio := 0.1
period := 1.0 / frequency
return (1.0 + math.Floor(x/period)*growthRatio) * math.Sin(2.0*math.Pi*frequency*x)
},
}
sine := &Signal{
SampleRate: sampleRate,
Samples: y,
}
var lines []*charts.Line
for i := range expected {
var y []float64
for j := range x {
y = append(y, f[i](x[j]))
}
s := &Signal{
SampleRate: sampleRate,
Samples: y,
}
normalized, err := s.Normalize()
assert.NoError(t, err)
assert.InEpsilon(t, expected[i], normalized.Frequencies()[0], epsilon)
cor := AutoCorrelation(sine.Samples)
assert.InEpsilon(t, -1, cor[100], epsilon)
assert.InEpsilon(t, 1, cor[200], epsilon)
lines = append(lines, s.Plot()) //nolint // SA4010: this result of append is never used, except maybe in other appends
xAxis := make([]int, 0)
yAxis := make([]opts.LineData, 0)
for i := range cor {
xAxis = append(xAxis, i)
yAxis = append(yAxis, opts.LineData{Value: cor[i], Symbol: "cycle"})
}
acfLine := charts.NewLine()
acfLine.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{Width: "3000px", Theme: types.ThemeChalk}),
charts.WithTitleOpts(opts.Title{Title: "Auto Correlation"}))
acfLine.SetXAxis(xAxis).AddSeries("Auto Correlation", yAxis)
/*
Uncomment code below to see what above signals look like
Uncomment code below to see what the signal and its auto-correlation function look like
*/
//http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
// page := components.NewPage()
// for i := range lines {
// page.AddCharts(lines[i])
// }
// page.AddCharts(sine.Plot(), acfLine)
// page.Render(w)
//})
//fmt.Println("Open your browser and access 'http://localhost:7001'")
......
......@@ -14,13 +14,13 @@ const (
defaultLowAmplitudeThreshold = 1.0
defaultMinNumOfSpectrumItems = 3
defaultMaxNumOfSpectrumItems = 100
defaultFFTMarginFraction = 0.1
defaultFFTMarginFraction = 0.0
defaultMaxValueMarginFraction = 0.0
defaultFFTMinValue = 0.01
)
type Estimator interface {
GetEstimation(signal *Signal, cycleDuration time.Duration) *Signal
GetEstimation(signal *Signal, periodLength time.Duration) *Signal
String() string
}
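For context, an Estimator produces a one-period estimate from a multi-period history; maxValueEstimator and fftEstimator below are the two implementations in this package. A hypothetical third implementation, not part of this commit and shown only to illustrate the interface contract, could average the values observed at the same offset across periods (it mirrors the shape of maxValueEstimator, including the marginFraction padding):

package dsp

import (
    "fmt"
    "time"
)

// meanValueEstimator is a hypothetical Estimator: for each position within a
// period it predicts the mean of the values observed at that position across
// all historical periods, padded by marginFraction.
type meanValueEstimator struct {
    marginFraction float64
}

func (m *meanValueEstimator) GetEstimation(signal *Signal, periodLength time.Duration) *Signal {
    nSamplesPerPeriod := int(periodLength.Seconds() * signal.SampleRate)
    if nSamplesPerPeriod <= 0 || len(signal.Samples) < nSamplesPerPeriod {
        return nil
    }
    nSamples := len(signal.Samples)
    nPeriods := nSamples / nSamplesPerPeriod

    estimation := make([]float64, 0, nSamplesPerPeriod)
    for i := nSamples - nSamplesPerPeriod; i < nSamples; i++ {
        sum := 0.0
        for j := 0; j < nPeriods; j++ {
            sum += signal.Samples[i-nSamplesPerPeriod*j]
        }
        mean := sum / float64(nPeriods)
        estimation = append(estimation, mean*(1.0+m.marginFraction))
    }
    return &Signal{SampleRate: signal.SampleRate, Samples: estimation}
}

func (m *meanValueEstimator) String() string {
    return fmt.Sprintf("Mean Value Estimator {marginFraction: %f}", m.marginFraction)
}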
......@@ -50,18 +50,18 @@ type fftEstimator struct {
marginFraction float64
}
func (m *maxValueEstimator) GetEstimation(signal *Signal, cycleDuration time.Duration) *Signal {
nSamplesPerCycle := int(cycleDuration.Seconds() * signal.SampleRate)
estimation := make([]float64, 0, nSamplesPerCycle)
func (m *maxValueEstimator) GetEstimation(signal *Signal, periodLength time.Duration) *Signal {
nSamplesPerPeriod := int(periodLength.Seconds() * signal.SampleRate)
estimation := make([]float64, 0, nSamplesPerPeriod)
nSamples := len(signal.Samples)
nCycles := nSamples / nSamplesPerCycle
nPeriods := nSamples / nSamplesPerPeriod
for i := nSamples - nSamplesPerCycle; i < nSamples; i++ {
for i := nSamples - nSamplesPerPeriod; i < nSamples; i++ {
maxValue := signal.Samples[i]
for j := 1; j < nCycles; j++ {
if maxValue < signal.Samples[i-nSamplesPerCycle*j] {
maxValue = signal.Samples[i-nSamplesPerCycle*j]
for j := 1; j < nPeriods; j++ {
if maxValue < signal.Samples[i-nSamplesPerPeriod*j] {
maxValue = signal.Samples[i-nSamplesPerPeriod*j]
}
}
estimation = append(estimation, maxValue*(1.0+m.marginFraction))
......@@ -81,7 +81,7 @@ func (m *maxValueEstimator) String() string {
return fmt.Sprintf("Max Value Estimator {marginFraction: %f}", marginFraction)
}
func (f *fftEstimator) GetEstimation(signal *Signal, cycleDuration time.Duration) *Signal {
func (f *fftEstimator) GetEstimation(signal *Signal, periodLength time.Duration) *Signal {
minNumOfSpectrumItems, maxNumOfSpectrumItems := f.minNumOfSpectrumItems, f.maxNumOfSpectrumItems
if minNumOfSpectrumItems == 0 {
minNumOfSpectrumItems = defaultMinNumOfSpectrumItems
......@@ -146,15 +146,15 @@ func (f *fftEstimator) GetEstimation(signal *Signal, cycleDuration time.Duration
x := fft.IFFT(X)
nSamples := len(x)
nSamplesPerCycle := int(cycleDuration.Seconds() * signal.SampleRate)
nSamplesPerPeriod := int(periodLength.Seconds() * signal.SampleRate)
samples := make([]float64, nSamplesPerCycle)
for i := nSamples - nSamplesPerCycle; i < nSamples; i++ {
samples := make([]float64, nSamplesPerPeriod)
for i := nSamples - nSamplesPerPeriod; i < nSamples; i++ {
a := real(x[i])
if a <= 0.0 {
a = defaultFFTMinValue
}
samples[i+nSamplesPerCycle-nSamples] = a * (1.0 + marginFraction)
samples[i+nSamplesPerPeriod-nSamples] = a * (1.0 + marginFraction)
}
return &Signal{
......
......@@ -29,12 +29,12 @@ func TestFftEstimator_GetEstimation(t *testing.T) {
SampleRate: origSignal.SampleRate,
Samples: origSignal.Samples[:1440*7],
}
assert.True(t, origSignal.IsPeriodic(Day))
assert.Equal(t, Day, origSignal.FindPeriod())
e1 := &fftEstimator{}
e1 := &fftEstimator{marginFraction: 0.1}
s[1] = e1.GetEstimation(origSignal, Day)
e2 := &maxValueEstimator{}
e2 := &maxValueEstimator{marginFraction: 0.1}
s[2] = e2.GetEstimation(origSignal, Day)
x := make([]string, 0)
......
package dsp
import (
"math"
"math/cmplx"
"sort"
"time"
"github.com/mjibson/go-dsp/fft"
)
var PeriodicityAmplitudeThreshold = 0.
type FrequencySpectrum struct {
Amplitudes []float64
Frequencies []float64
}
// FrequencySpectrum returns the frequency spectrum of the signal.
func (s *Signal) FrequencySpectrum() *FrequencySpectrum {
// Use FFT to convert the signal from time to frequency domain
fs := fft.FFTReal(s.Samples)
sampleLength := float64(len(s.Samples))
var amplitudes []float64
for i := range fs {
// Calculate the modulus since the result of the FFT is an array of complex numbers with both real and imaginary parts
amplitudes = append(amplitudes, cmplx.Abs(fs[i])/sampleLength)
}
spectrumLength := float64(len(amplitudes))
// Use the first half of the slice since the spectrum is conjugate symmetric
amplitudes = amplitudes[0 : len(amplitudes)/2]
var frequencies []float64
for i := range amplitudes {
// Calculate which frequencies the spectrum contains
frequencies = append(frequencies, float64(i)*s.SampleRate/spectrumLength)
}
return &FrequencySpectrum{
// Ignore the zero-frequency term since it only affects the signal's relative position
// on the y-axis in the time domain and has nothing to do with periodicity.
Amplitudes: amplitudes[1:],
Frequencies: frequencies[1:],
}
}
// Frequencies returns the signal frequency components in hertz in descending order.
func (s *Signal) Frequencies() []float64 {
f := s.FrequencySpectrum()
sort.Sort(sort.Reverse(f))
return f.Frequencies
}
// IsPeriodic checks whether the signal is periodic and its period is approximately
// equal to the given value
func (s *Signal) IsPeriodic(cycleDuration time.Duration) bool {
// The signal length must be at least twice the period
si, m := s.Truncate(cycleDuration)
if m < 2 {
return false
}
secondsPerCycle := cycleDuration.Seconds()
frequencies := si.Frequencies()
for _, freq := range frequencies {
t := 1.0 / freq
if t > secondsPerCycle {
return false
}
m := secondsPerCycle / t
epsilon := math.Abs(m - float64(int(m)))
if epsilon < 1e-3 {
return true
}
}
return false
}
func (f *FrequencySpectrum) Len() int {
return len(f.Amplitudes)
}
func (f *FrequencySpectrum) Less(i, j int) bool {
return f.Amplitudes[i] < f.Amplitudes[j]
}
func (f *FrequencySpectrum) Swap(i, j int) {
f.Amplitudes[i], f.Amplitudes[j] = f.Amplitudes[j], f.Amplitudes[i]
f.Frequencies[i], f.Frequencies[j] = f.Frequencies[j], f.Frequencies[i]
}
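A quick way to see what FrequencySpectrum and Frequencies return: for a single 5 Hz sine sampled at 1000 Hz for one second, the bin spacing is SampleRate/N = 1 Hz, so the strongest component reported by Frequencies() should be 5 Hz. A minimal in-package sketch (the helper name frequencySpectrumSketch is not part of this commit):

package dsp

import (
    "fmt"
    "math"
)

// frequencySpectrumSketch: with a 1 Hz bin spacing, the dominant component of
// a 5 Hz sine should be reported as 5 Hz.
func frequencySpectrumSketch() {
    sampleRate := 1000.0
    frequency := 5.0
    samples := make([]float64, 1000)
    for i := range samples {
        t := float64(i) / sampleRate
        samples[i] = math.Sin(2 * math.Pi * frequency * t)
    }

    s := &Signal{SampleRate: sampleRate, Samples: samples}
    fmt.Println(s.Frequencies()[0]) // ~5 (Hz), the strongest component
}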
package dsp
// isPeak fits x over [i-k, i] and [i, i+k] with linear regression; i is considered a peak
// if the left segment's slope is positive and the right segment's slope is negative.
func isPeak(x []float64, i int, k int) bool {
n := len(x)
if i < 1 || i > n-1 {
return false
}
l := max(0, i-k)
r := min(n-1, i+k)
p := []point{}
for j := l; j <= i; j++ {
p = append(p, point{x: float64(j), y: x[j]})
}
slope, _ := linearRegressionLSE(p)
if slope <= 0 {
return false
}
p = []point{}
for j := i; j <= r; j++ {
p = append(p, point{x: float64(j), y: x[j]})
}
slope, _ = linearRegressionLSE(p)
if slope >= 0 {
return false
}
return true
}
func min(x, y int) int {
if x <= y {
return x
}
return y
}
func max(x, y int) int {
if x >= y {
return x
}
return y
}
type point struct {
x, y float64
}
// y_hat = ax + b
func linearRegressionLSE(points []point) (float64, float64) {
var sumX, sumY, sumXY, sumXX float64
for _, p := range points {
sumX += p.x
sumY += p.y
sumXY += p.x * p.y
sumXX += p.x * p.x
}
n := float64(len(points))
a := (n*sumXY - sumX*sumY) / (n*sumXX - sumX*sumX)
b := (sumY - a*sumX) / n
return a, b
}
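isPeak and linearRegressionLSE are small enough to check by hand: points lying exactly on y = 2x + 1 should recover slope 2 and intercept 1, and the apex of a triangular bump should be reported as a peak. A quick in-package sketch (the helper name linearRegressionSketch is illustrative only):

package dsp

import "fmt"

// linearRegressionSketch: exact points on y = 2x + 1 give a ~ 2 and b ~ 1,
// and the apex of a triangular bump is detected as a peak.
func linearRegressionSketch() {
    points := []point{}
    for i := 0; i < 10; i++ {
        x := float64(i)
        points = append(points, point{x: x, y: 2*x + 1})
    }
    a, b := linearRegressionLSE(points)
    fmt.Printf("a=%.2f b=%.2f\n", a, b) // a=2.00 b=1.00

    // Values rise up to index 5 and fall afterwards, so index 5 is a peak
    // when fitted over a window of k=3 samples on each side.
    x := []float64{0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0}
    fmt.Println(isPeak(x, 5, 3)) // true
}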
package dsp
import (
"testing"
"github.com/go-echarts/go-echarts/v2/charts"
"github.com/go-echarts/go-echarts/v2/opts"
"github.com/go-echarts/go-echarts/v2/types"
)
func TestLinerRegression(t *testing.T) {
var s, _ = readCsvFile("test_data/input8.csv")
s.Samples = s.Samples[:1440*14]
cor := AutoCorrelation(s.Samples)
xAxis := make([]int, 0)
yAxis := make([]opts.LineData, 0)
for i := range cor {
xAxis = append(xAxis, i)
yAxis = append(yAxis, opts.LineData{Value: cor[i], Symbol: "cycle"})
}
acfLine := charts.NewLine()
acfLine.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{Width: "3000px", Theme: types.ThemeChalk}),
charts.WithTitleOpts(opts.Title{Title: "Auto Correlation"}))
acfLine.SetXAxis(xAxis).AddSeries("Auto Correlation", yAxis)
xAxis = make([]int, 0)
yAxis = make([]opts.LineData, 0)
for i := 1440*7 - 240; i < 1440*7+240; i++ {
xAxis = append(xAxis, i)
yAxis = append(yAxis, opts.LineData{Value: cor[i], Symbol: "cycle"})
}
line := charts.NewLine()
line.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{Width: "1000px", Theme: types.ThemeMacarons}),
charts.WithTitleOpts(opts.Title{Title: ""}))
line.SetXAxis(xAxis).AddSeries("", yAxis)
points := []point{}
xAxis = make([]int, 0)
yAxis = make([]opts.LineData, 0)
// left
for i := 1440*7 - 240; i < 1440*7; i++ {
points = append(points, point{x: float64(i), y: cor[i]})
xAxis = append(xAxis, i)
}
a, b := linearRegressionLSE(points)
for i := range xAxis {
yAxis = append(yAxis, opts.LineData{Value: a*float64(xAxis[i]) + b, Symbol: "cycle"})
}
// right
points = []point{}
for i := 1440 * 7; i < 1440*7+240; i++ {
points = append(points, point{x: float64(i), y: cor[i]})
xAxis = append(xAxis, i)
}
a, b = linearRegressionLSE(points)
for i := range xAxis[240:] {
yAxis = append(yAxis, opts.LineData{Value: a*float64(xAxis[240+i]) + b, Symbol: "cycle"})
}
linerLine := charts.NewLine()
linerLine.SetXAxis(xAxis).AddSeries("", yAxis)
line.Overlap(linerLine)
/*
Uncomment the code below to see what the auto-correlation function and its linear regression fits look like
*/
//http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
// page := components.NewPage()
// page.AddCharts(s.Plot(), acfLine, line)
// page.Render(w)
//})
//fmt.Println("Open your browser and access 'http://localhost:7001'")
//http.ListenAndServe(":7001", nil)
}
func TestLinerRegression2(t *testing.T) {
var s, _ = readCsvFile("test_data/input10.csv")
cor := AutoCorrelation(s.Samples)
xAxis := make([]int, 0)
yAxis := make([]opts.LineData, 0)
for i := range cor {
xAxis = append(xAxis, i)
yAxis = append(yAxis, opts.LineData{Value: cor[i], Symbol: "cycle"})
}
acfLine := charts.NewLine()
acfLine.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{Width: "3000px", Theme: types.ThemeChalk}),
charts.WithTitleOpts(opts.Title{Title: "Auto Correlation"}))
acfLine.SetXAxis(xAxis).AddSeries("Auto Correlation", yAxis)
xAxis = make([]int, 0)
yAxis = make([]opts.LineData, 0)
for i := 1440 - 240; i < 1440+240; i++ {
xAxis = append(xAxis, i)
yAxis = append(yAxis, opts.LineData{Value: cor[i], Symbol: "cycle"})
}
line := charts.NewLine()
line.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{Width: "1000px", Theme: types.ThemeMacarons}),
charts.WithTitleOpts(opts.Title{Title: ""}))
line.SetXAxis(xAxis).AddSeries("", yAxis)
points := []point{}
xAxis = make([]int, 0)
yAxis = make([]opts.LineData, 0)
// left
for i := 1440 - 240; i < 1440; i++ {
points = append(points, point{x: float64(i), y: cor[i]})
xAxis = append(xAxis, i)
}
a, b := linearRegressionLSE(points)
for i := range xAxis {
yAxis = append(yAxis, opts.LineData{Value: a*float64(xAxis[i]) + b, Symbol: "cycle"})
}
// right
points = []point{}
for i := 1440; i < 1440+240; i++ {
points = append(points, point{x: float64(i), y: cor[i]})
xAxis = append(xAxis, i)
}
a, b = linearRegressionLSE(points)
for i := range xAxis[240:] {
yAxis = append(yAxis, opts.LineData{Value: a*float64(xAxis[240+i]) + b, Symbol: "cycle"})
}
linerLine := charts.NewLine()
linerLine.SetXAxis(xAxis).AddSeries("", yAxis)
line.Overlap(linerLine)
/*
Uncomment the code below to see what the auto-correlation function and its linear regression fits look like
*/
//http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
// page := components.NewPage()
// page.AddCharts(s.Plot(), acfLine, line)
// page.Render(w)
//})
//fmt.Println("Open your browser and access 'http://localhost:7001'")
//http.ListenAndServe(":7001", nil)
}
......@@ -56,89 +56,17 @@ func (p *periodicSignalPrediction) QueryRealtimePredictedValuesOnce(ctx context.
panic("implement me")
}
func preProcessTimeSeriesList(tsList []*common.TimeSeries, config *internalConfig) ([]*common.TimeSeries, error) {
var wg sync.WaitGroup
n := len(tsList)
wg.Add(n)
tsCh := make(chan *common.TimeSeries, n)
for _, ts := range tsList {
go func(ts *common.TimeSeries) {
defer wg.Done()
if err := preProcessTimeSeries(ts, config, Hour); err != nil {
klog.ErrorS(err, "Dsp failed to pre process time series.")
} else {
tsCh <- ts
}
}(ts)
}
wg.Wait()
close(tsCh)
tsList = make([]*common.TimeSeries, 0, n)
for ts := range tsCh {
tsList = append(tsList, ts)
}
return tsList, nil
}
func preProcessTimeSeries(ts *common.TimeSeries, config *internalConfig, unit time.Duration) error {
if ts == nil || len(ts.Samples) == 0 {
return fmt.Errorf("empty time series")
}
intervalSeconds := int64(config.historyResolution.Seconds())
for i := 1; i < len(ts.Samples); i++ {
diff := ts.Samples[i].Timestamp - ts.Samples[i-1].Timestamp
// If a gap in time series is larger than one hour,
// drop all samples before [i].
if diff > 3600 {
ts.Samples = ts.Samples[i:]
return preProcessTimeSeries(ts, config, unit)
}
// The samples should be in chronological order.
// If the difference between two consecutive sample timestamps is not an integral multiple of the interval,
// the time series is not valid.
if diff%intervalSeconds != 0 || diff <= 0 {
return fmt.Errorf("invalid time series")
}
}
newSamples := []common.Sample{ts.Samples[0]}
for i := 1; i < len(ts.Samples); i++ {
times := (ts.Samples[i].Timestamp - ts.Samples[i-1].Timestamp) / intervalSeconds
unitDiff := (ts.Samples[i].Value - ts.Samples[i-1].Value) / float64(times)
// Fill the missing samples if any
for j := int64(1); j < times; j++ {
s := common.Sample{
Value: ts.Samples[i-1].Value + unitDiff*float64(j),
Timestamp: ts.Samples[i-1].Timestamp + intervalSeconds*j,
}
newSamples = append(newSamples, s)
}
newSamples = append(newSamples, ts.Samples[i])
func findPeriod(ts *common.TimeSeries, sampleInterval time.Duration) time.Duration {
signal := SamplesToSignal(ts.Samples, sampleInterval)
si, m := signal.Truncate(Week)
if m > 1 {
return si.FindPeriod()
}
// Truncate samples to an integral multiple of the unit
secondsPerUnit := int64(unit.Seconds())
samplesPerUnit := int(secondsPerUnit / intervalSeconds)
beginIndex := len(newSamples)
for beginIndex-samplesPerUnit >= 0 {
beginIndex -= samplesPerUnit
si, m = signal.Truncate(Day)
if m > 1 {
return si.FindPeriod()
}
ts.Samples = newSamples[beginIndex:]
return nil
}
// isPeriodicTimeSeries reports whether the time series has the specified periodicity
func isPeriodicTimeSeries(ts *common.TimeSeries, sampleInterval time.Duration, cycleDuration time.Duration) bool {
signal := SamplesToSignal(ts.Samples, sampleInterval)
return signal.IsPeriodic(cycleDuration)
return -1
}
func SamplesToSignal(samples []common.Sample, sampleInterval time.Duration) *Signal {
......@@ -290,49 +218,35 @@ func (p *periodicSignalPrediction) updateAggregateSignals(queryExpr string, hist
}
var chosenEstimator Estimator
var signal *Signal
var nCycles int
var cycleDuration time.Duration = 0
if isPeriodicTimeSeries(ts, config.historyResolution, Hour) {
cycleDuration = Hour
klog.V(4).InfoS("This is a periodic time series.", "queryExpr", queryExpr, "labels", ts.Labels, "cycleDuration", cycleDuration)
} else if isPeriodicTimeSeries(ts, config.historyResolution, Day) {
cycleDuration = Day
klog.V(4).InfoS("This is a periodic time series.", "queryExpr", queryExpr, "labels", ts.Labels, "cycleDuration", cycleDuration)
} else if isPeriodicTimeSeries(ts, config.historyResolution, Week) {
cycleDuration = Week
klog.V(4).InfoS("This is a periodic time series.", "queryExpr", queryExpr, "labels", ts.Labels, "cycleDuration", cycleDuration)
var nPeriods int
var periodLength time.Duration = 0
p := findPeriod(ts, config.historyResolution)
if p == Day || p == Week {
periodLength = p
klog.V(4).InfoS("This is a periodic time series.", "queryExpr", queryExpr, "labels", ts.Labels, "periodLength", periodLength)
} else {
klog.V(4).InfoS("This is not a periodic time series.", "queryExpr", queryExpr, "labels", ts.Labels)
}
if cycleDuration > 0 {
if periodLength > 0 {
signal = SamplesToSignal(ts.Samples, config.historyResolution)
signal, nCycles = signal.Truncate(cycleDuration)
if nCycles >= 2 {
chosenEstimator = bestEstimator(queryExpr, config.estimators, signal, nCycles, cycleDuration)
signal, nPeriods = signal.Truncate(periodLength)
if nPeriods >= 2 {
chosenEstimator = bestEstimator(queryExpr, config.estimators, signal, nPeriods, periodLength)
}
}
if chosenEstimator != nil {
estimatedSignal := chosenEstimator.GetEstimation(signal, cycleDuration)
estimatedSignal := chosenEstimator.GetEstimation(signal, periodLength)
intervalSeconds := int64(config.historyResolution.Seconds())
nextTimestamp := ts.Samples[len(ts.Samples)-1].Timestamp + intervalSeconds
// Hack (temporary):
// The dsp prediction only appends one cycle of points, so when the cycle is an hour the estimated signal contains at most one hour of samples.
// As a result, a tsp predictWindowSeconds larger than 3600 is always out of date, because the end timestamp of the predicted data is always ts.Samples[len(ts.Samples)-1].Timestamp + Hour within one model update interval loop.
// If the cycle is an hour, append 24 hours of points to avoid out-of-date predicted data.
// Alternative option 1: Reduce predictWindowSeconds in tsp to less than one hour and shorten the model update interval to one hour as well.
// Alternative option 2: Drop support for the hour cycle, since it is too short in production environments and dsp currently cannot handle it well.
cycles := 1
if cycleDuration == Hour {
cycles = 24
}
n := len(estimatedSignal.Samples)
samples := make([]common.Sample, n*cycles)
for c := 0; c < cycles; c++ {
samples := make([]common.Sample, n*nPeriods)
for k := 0; k < nPeriods; k++ {
for i := range estimatedSignal.Samples {
samples[i+c*n] = common.Sample{
samples[i+k*n] = common.Sample{
Value: estimatedSignal.Samples[i],
Timestamp: nextTimestamp,
}
......@@ -357,23 +271,23 @@ func (p *periodicSignalPrediction) updateAggregateSignals(queryExpr string, hist
p.a.SetSignals(queryExpr, signals)
}
func bestEstimator(id string, estimators []Estimator, signal *Signal, totalCycles int, cycleDuration time.Duration) Estimator {
samplesPerCycle := len(signal.Samples) / totalCycles
func bestEstimator(id string, estimators []Estimator, signal *Signal, nPeriods int, periodLength time.Duration) Estimator {
samplesPerPeriod := len(signal.Samples) / nPeriods
history := &Signal{
SampleRate: signal.SampleRate,
Samples: signal.Samples[:(totalCycles-1)*samplesPerCycle],
Samples: signal.Samples[:(nPeriods-1)*samplesPerPeriod],
}
actual := &Signal{
SampleRate: signal.SampleRate,
Samples: signal.Samples[(totalCycles-1)*samplesPerCycle:],
Samples: signal.Samples[(nPeriods-1)*samplesPerPeriod:],
}
minPE := math.MaxFloat64
var bestEstimator Estimator
for i := range estimators {
estimated := estimators[i].GetEstimation(history, cycleDuration)
estimated := estimators[i].GetEstimation(history, periodLength)
if estimated != nil {
pe, err := accuracy.PredictionError(actual.Samples, estimated.Samples)
klog.V(6).InfoS("Testing estimators ...", "key", id, "estimator", estimators[i].String(), "pe", pe, "error", err)
......@@ -384,7 +298,7 @@ func bestEstimator(id string, estimators []Estimator, signal *Signal, totalCycle
}
}
klog.V(4).InfoS("Got the best estimator.", "key", id, "estimator", bestEstimator.String(), "minPE", minPE, "totalCycles", totalCycles)
klog.V(4).InfoS("Got the best estimator.", "key", id, "estimator", bestEstimator.String(), "minPE", minPE, "periods", nPeriods)
return bestEstimator
}
......
package dsp
import (
"fmt"
"sync"
"time"
"github.com/montanaflynn/stats"
"k8s.io/klog/v2"
"github.com/gocrane/crane/pkg/common"
)
func fillMissingData(ts *common.TimeSeries, config *internalConfig, unit time.Duration) error {
if ts == nil || len(ts.Samples) == 0 {
return fmt.Errorf("empty time series")
}
intervalSeconds := int64(config.historyResolution.Seconds())
for i := 1; i < len(ts.Samples); i++ {
diff := ts.Samples[i].Timestamp - ts.Samples[i-1].Timestamp
// If a gap in time series is larger than one hour,
// drop all samples before [i].
if diff > 3600 {
ts.Samples = ts.Samples[i:]
return fillMissingData(ts, config, unit)
}
// The samples should be in chronological order.
// If the difference between two consecutive sample timestamps is not an integral multiple of the interval,
// the time series is not valid.
if diff%intervalSeconds != 0 || diff <= 0 {
return fmt.Errorf("invalid time series")
}
}
newSamples := []common.Sample{ts.Samples[0]}
for i := 1; i < len(ts.Samples); i++ {
times := (ts.Samples[i].Timestamp - ts.Samples[i-1].Timestamp) / intervalSeconds
unitDiff := (ts.Samples[i].Value - ts.Samples[i-1].Value) / float64(times)
// Fill the missing samples if any
for j := int64(1); j < times; j++ {
s := common.Sample{
Value: ts.Samples[i-1].Value + unitDiff*float64(j),
Timestamp: ts.Samples[i-1].Timestamp + intervalSeconds*j,
}
newSamples = append(newSamples, s)
}
newSamples = append(newSamples, ts.Samples[i])
}
// Truncate samples to an integral multiple of the unit
secondsPerUnit := int64(unit.Seconds())
samplesPerUnit := int(secondsPerUnit / intervalSeconds)
beginIndex := len(newSamples)
for beginIndex-samplesPerUnit >= 0 {
beginIndex -= samplesPerUnit
}
ts.Samples = newSamples[beginIndex:]
return nil
}
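fillMissingData linearly interpolates gaps of up to one hour and then trims the series to a whole number of unit-sized blocks counted back from the end. A rough in-package sketch of that behavior (not part of this commit; constructing internalConfig with only historyResolution set is an assumption made here for illustration):

package dsp

import (
    "fmt"
    "time"

    "github.com/gocrane/crane/pkg/common"
)

// fillMissingDataSketch: a two-hour, 1-minute-resolution series with one
// missing sample is repaired by linear interpolation and then truncated to a
// whole number of hours.
func fillMissingDataSketch() {
    config := &internalConfig{historyResolution: time.Minute}

    samples := make([]common.Sample, 0, 120)
    for i := 0; i < 121; i++ {
        if i == 60 {
            continue // drop one sample to create a gap
        }
        samples = append(samples, common.Sample{
            Timestamp: int64(i * 60),
            Value:     float64(i),
        })
    }
    ts := &common.TimeSeries{Samples: samples}

    if err := fillMissingData(ts, config, time.Hour); err != nil {
        fmt.Println("error:", err)
        return
    }
    // Expect 120 samples (two whole hours); the gap at minute 60 has been
    // filled with the interpolated value 60.
    fmt.Println(len(ts.Samples), ts.Samples[59].Value) // 120 60
}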
func deTrend() error {
return nil
}
func removeExtremeOutliers(ts *common.TimeSeries) error {
values := make([]float64, len(ts.Samples))
for i := 0; i < len(ts.Samples); i++ {
values[i] = ts.Samples[i].Value
}
var highThreshold, lowThreshold float64
var err error
highThreshold, err = stats.Percentile(values, 99.9)
if err != nil {
return err
}
lowThreshold, err = stats.Percentile(values, 0.1)
if err != nil {
return err
}
for i := 1; i < len(ts.Samples); i++ {
if ts.Samples[i].Value > highThreshold || ts.Samples[i].Value < lowThreshold {
ts.Samples[i].Value = ts.Samples[i-1].Value
}
}
return nil
}
func preProcessTimeSeries(ts *common.TimeSeries, config *internalConfig, unit time.Duration) error {
var err error
err = fillMissingData(ts, config, unit)
if err != nil {
return err
}
_ = deTrend()
return removeExtremeOutliers(ts)
}
func preProcessTimeSeriesList(tsList []*common.TimeSeries, config *internalConfig) ([]*common.TimeSeries, error) {
var wg sync.WaitGroup
n := len(tsList)
wg.Add(n)
tsCh := make(chan *common.TimeSeries, n)
for _, ts := range tsList {
go func(ts *common.TimeSeries) {
defer wg.Done()
if err := preProcessTimeSeries(ts, config, Hour); err != nil {
klog.ErrorS(err, "Dsp failed to pre process time series.")
} else {
tsCh <- ts
}
}(ts)
}
wg.Wait()
close(tsCh)
tsList = make([]*common.TimeSeries, 0, n)
for ts := range tsCh {
tsList = append(tsList, ts)
}
return tsList, nil
}
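preProcessTimeSeriesList fans the per-series work out to goroutines and keeps only the series that pass preProcessTimeSeries; anything that fails (for example an empty series) is logged and dropped. A rough in-package usage sketch (not part of this commit; it reuses the internalConfig assumption above and uses a day-long series so the percentile-based outlier removal has enough points to work with):

package dsp

import (
    "fmt"
    "time"

    "github.com/gocrane/crane/pkg/common"
)

// preProcessListSketch: one well-formed day-long series (1-minute resolution)
// and one empty series go in; only the well-formed one should come out.
func preProcessListSketch() {
    config := &internalConfig{historyResolution: time.Minute}

    good := &common.TimeSeries{}
    for i := 0; i < 1440; i++ {
        good.Samples = append(good.Samples, common.Sample{
            Timestamp: int64(i * 60),
            Value:     float64(i % 10),
        })
    }
    bad := &common.TimeSeries{} // empty: fails with "empty time series"

    out, _ := preProcessTimeSeriesList([]*common.TimeSeries{good, bad}, config)
    fmt.Println(len(out)) // expected: 1, only the valid series remains
}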
package dsp
import (
"math/rand"
"testing"
"time"
"github.com/go-echarts/go-echarts/v2/charts"
"github.com/go-echarts/go-echarts/v2/opts"
"github.com/gocrane/crane/pkg/common"
"github.com/stretchr/testify/assert"
)
func TestRemoveExtremeOutliers(t *testing.T) {
n := 10080
ts := &common.TimeSeries{
Samples: make([]common.Sample, n),
}
rand.Seed(time.Now().UnixNano())
for i := 0; i < n; i++ {
ts.Samples[i] = common.Sample{
Value: rand.Float64(),
}
}
ts.Samples[1000].Value = 3.5
ts.Samples[3000].Value = -1
ts.Samples[8000].Value = 1000
_ = removeExtremeOutliers(ts)
assert.Equal(t, ts.Samples[999].Value, ts.Samples[1000].Value)
assert.Equal(t, ts.Samples[2999].Value, ts.Samples[3000].Value)
assert.Equal(t, ts.Samples[7999].Value, ts.Samples[8000].Value)
}
func TestRemoveExtremeOutliers2(t *testing.T) {
var s, _ = readCsvFile("test_data/input14.csv")
origLine := s.Plot("green")
ts := &common.TimeSeries{
Samples: make([]common.Sample, len(s.Samples)),
}
for i := 0; i < len(s.Samples); i++ {
ts.Samples[i].Value = s.Samples[i]
}
_ = removeExtremeOutliers(ts)
xAxis := make([]int, 0)
yAxis := make([]opts.LineData, 0)
for i := range ts.Samples {
xAxis = append(xAxis, i)
yAxis = append(yAxis, opts.LineData{Value: ts.Samples[i].Value, Symbol: "cycle"})
}
line := charts.NewLine()
line.SetXAxis(xAxis).AddSeries("", yAxis)
origLine.Overlap(line)
/*
Uncomment the code below to compare the original signal with the one after
extreme outliers have been removed
*/
//http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
// page := components.NewPage()
// page.AddCharts(origLine)
// page.Render(w)
//})
//fmt.Println("Open your browser and access 'http://localhost:7001'")
//http.ListenAndServe(":7001", nil)
}
......@@ -3,6 +3,8 @@ package dsp
import (
"fmt"
"math/cmplx"
"math/rand"
"sort"
"time"
"github.com/go-echarts/go-echarts/v2/charts"
......@@ -11,6 +13,12 @@ import (
"github.com/mjibson/go-dsp/fft"
)
var (
// MaxAutoCorrelationPeakSearchIntervalSeconds is the maximum search interval for validating if
// a periodicity hint is a peak of the ACF.
MaxAutoCorrelationPeakSearchIntervalSeconds = (time.Hour * 4).Seconds()
)
// Signal represents a discrete signal.
type Signal struct {
// SampleRate is the sampling rate in hertz
......@@ -159,11 +167,73 @@ func (s *Signal) Filter(threshold float64) *Signal {
}
}
func (s *Signal) FindPeriod() time.Duration {
x := make([]float64, len(s.Samples))
copy(x, s.Samples)
N := len(s.Samples)
// Use FFT to compute the periodogram of a random permutation of the samples and record
// its maximum power max|X(f)|. Repeat this 100 times, sort the recorded maxima, and use
// the 99th value (roughly the 99th percentile) as the power threshold.
var maxPowers []float64
rand.Seed(time.Now().UnixNano())
for i := 0; i < 100; i++ {
rand.Shuffle(len(x), func(i, j int) {
x[i], x[j] = x[j], x[i]
})
X := fft.FFTReal(x)
pmax := 0.
for k := 1; k < len(X)/2; k++ {
p := cmplx.Abs(X[k])
if p > pmax {
pmax = p
}
}
maxPowers = append(maxPowers, pmax)
}
sort.Float64s(maxPowers)
pThreshold := maxPowers[98]
X := fft.FFTReal(s.Samples)
var hints []int
for k := 2; k < len(X)/2; k++ {
p := cmplx.Abs(X[k])
// If a frequency has more power than the threshold, regard its period as
// a candidate fundamental period for further verification.
if p > pThreshold {
if len(hints) == 0 || hints[len(hints)-1] > N/k {
hints = append(hints, N/k)
}
}
}
// Use the auto-correlation function (ACF) to verify the candidate periods.
// The fundamental period should correspond to the highest peak of the ACF.
cor := AutoCorrelation(s.Samples)
maxCorVal := 0.
j := -1
maxR := int(MaxAutoCorrelationPeakSearchIntervalSeconds * s.SampleRate)
for i := range hints {
r := min(maxR, hints[i]/2)
if isPeak(cor, hints[i], r) && maxCorVal < cor[hints[i]] {
j = i
maxCorVal = cor[hints[j]]
}
}
if j >= 0 {
return time.Duration(float64(hints[j])/s.SampleRate) * time.Second
} else {
return -1
}
}
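FindPeriod ties the pieces together: the permutation-based power threshold proposes candidate periods from the periodogram, and the ACF peak test picks the strongest one. A minimal in-package sketch (the helper name findPeriodSketch is illustrative; Day is the package's existing one-day duration constant): seven exact daily cycles sampled once per minute should be detected as a one-day period.

package dsp

import (
    "fmt"
    "math"
)

// findPeriodSketch: a signal sampled once per minute (SampleRate = 1/60 Hz)
// containing seven exact daily cycles should be detected as having a
// one-day period.
func findPeriodSketch() {
    samplesPerDay := 1440
    samples := make([]float64, samplesPerDay*7)
    for i := range samples {
        samples[i] = math.Sin(2 * math.Pi * float64(i) / float64(samplesPerDay))
    }
    s := &Signal{SampleRate: 1.0 / 60.0, Samples: samples}
    fmt.Println(s.FindPeriod() == Day) // true
}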
func (s *Signal) String() string {
return fmt.Sprintf("SampleRate: %.5fHz, Samples: %v, Duration: %.1fs", s.SampleRate, len(s.Samples), s.Duration())
}
func (s *Signal) Plot(o ...charts.GlobalOpts) *charts.Line {
func (s *Signal) Plot(color string, o ...charts.GlobalOpts) *charts.Line {
x := make([]string, 0)
y := make([]opts.LineData, 0)
for i := 0; i < s.Num(); i++ {
......@@ -178,7 +248,16 @@ func (s *Signal) Plot(o ...charts.GlobalOpts) *charts.Line {
if o != nil {
line.SetGlobalOptions(o...)
}
line.SetXAxis(x).AddSeries("sample value", y)
if color != "" {
line.SetXAxis(x).AddSeries("sample value", y, charts.WithAreaStyleOpts(
opts.AreaStyle{
Color: color,
Opacity: 0.1,
}),
charts.WithLineStyleOpts(opts.LineStyle{Color: color}))
} else {
line.SetXAxis(x).AddSeries("sample value", y)
}
return line
}
......@@ -81,35 +81,40 @@ func TestSignal_Denormalize(t *testing.T) {
assert.InEpsilonSlice(t, signal.Samples, denormalized.Samples, amplitude*0.01)
}
func TestSignal_IsPeriodic(t *testing.T) {
func TestSignal_FindPeriod(t *testing.T) {
signals := make([]*Signal, nInputs)
for i := 0; i < nInputs; i++ {
s, err := readCsvFile(fmt.Sprintf("test_data/input%d.csv", i))
assert.NoError(t, err)
s, _ = s.Truncate(Week)
signals[i] = s
}
cycleDurations := []time.Duration{
Day,
Day,
Day,
Day,
Day,
Day,
signals[15].SampleRate = 1. / 15.
periods := []time.Duration{
Day,
Day,
Week,
-1,
-1,
-1,
-1,
-1,
Week,
Day,
Day,
Day,
Day,
Day,
Day,
Day,
Week,
Day, // i: 11
Week,
Week,
Week,
Week,
}
for i := 0; i < nInputs; i++ {
assert.Equal(t, periodic[i], signals[i].IsPeriodic(cycleDurations[i]))
if periodic[i] {
assert.Equal(t, periods[i], signals[i].FindPeriod(), "i: %d", i)
}
}
}
......@@ -132,19 +137,19 @@ func TestSignal_IsPeriodic(t *testing.T) {
// } else {
// o = charts.WithInitializationOpts(opts.Initialization{Width: "2000px", Theme: types.ThemeRoma})
// }
// p = p.AddCharts(signals[i].Plot(o))
// p = p.AddCharts(signals[i].Plot("", o))
// }
// p.Render(w)
// _ = p.Render(w)
// })
// fmt.Println("Open your browser and access 'http://localhost:7001'")
// http.ListenAndServe(":7001", nil)
// _ = http.ListenAndServe(":7001", nil)
//}
var periodic = []bool{
true,
true,
true,
true,
false, //true,
false,
false,
false,
......
......@@ -16403,7 +16403,7 @@ ts,value
1634635260,41.705590
1634635320,40.784530
1634635380,41.533260
1634635440,41.834990
1634635440,0.834990
1634635500,41.530670
1634635560,42.468340
1634635620,41.820900
......
This diff is collapsed.