Commit 82d35c33 authored by David Levanon's avatar David Levanon
Browse files

throttling by cpu usage

No related merge requests found
Showing with 48 additions and 14 deletions
+48 -14
......@@ -16,6 +16,7 @@ type AppStats struct {
MatchedPairs uint64 `json:"matchedPairs"`
DroppedTcpStreams uint64 `json:"droppedTcpStreams"`
LiveTcpStreams uint64 `json:"liveTcpStreams"`
ThrottledPackets uint64 `json:"throttledPackets"`
}
func (as *AppStats) IncMatchedPairs() {
......@@ -55,6 +56,10 @@ func (as *AppStats) DecLiveTcpStreams() {
atomic.AddUint64(&as.LiveTcpStreams, ^uint64(0))
}
func (as *AppStats) IncThrottledPackets() {
atomic.AddUint64(&as.ThrottledPackets, 1)
}
func (as *AppStats) UpdateProcessedBytes(size uint64) {
atomic.AddUint64(&as.ProcessedBytes, size)
}
......@@ -74,6 +79,7 @@ func (as *AppStats) DumpStats() *AppStats {
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)
currentAppStats.DroppedTcpStreams = resetUint64(&as.DroppedTcpStreams)
currentAppStats.ThrottledPackets = resetUint64(&as.ThrottledPackets)
currentAppStats.LiveTcpStreams = as.LiveTcpStreams
return currentAppStats
......
......@@ -19,7 +19,6 @@ import (
"time"
"github.com/shirou/gopsutil/cpu"
"github.com/struCoder/pidusage"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
......@@ -45,6 +44,7 @@ var quiet = flag.Bool("quiet", false, "Be quiet regarding errors")
var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex")
var procfs = flag.String("procfs", "/proc", "The procfs directory, used when mapping host volumes into a container")
var ignoredPorts = flag.String("ignore-ports", "", "A comma separated list of ports to ignore")
var cpuLimit = flag.Float64("cpu-usage", 30.0, "Maximum cpu usage used by the tapper (approximately)")
// capture
var iface = flag.String("i", "en0", "Interface to read packets from")
......@@ -61,6 +61,7 @@ var memprofile = flag.String("memprofile", "", "Write memory profile")
type TapOpts struct {
HostMode bool
IgnoredPorts []uint16
cpuLimit float64
}
var extensions []*api.Extension // global
......@@ -124,7 +125,7 @@ func printNewTapTargets(success bool) {
}
}
func printPeriodicStats(cleaner *Cleaner) {
func printPeriodicStats(cleaner *Cleaner, assembler *tcpAssembler) {
statsPeriod := time.Second * time.Duration(*statsevery)
ticker := time.NewTicker(statsPeriod)
......@@ -154,21 +155,14 @@ func printPeriodicStats(cleaner *Cleaner) {
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
sysInfo, err := pidusage.GetStat(os.Getpid())
if err != nil {
sysInfo = &pidusage.SysInfo{
CPU: -1,
Memory: -1,
}
}
logger.Log.Infof(
"mem: %d, goroutines: %d, cpu: %f, cores: %d/%d, rss: %f",
memStats.HeapAlloc,
runtime.NumGoroutine(),
sysInfo.CPU,
assembler.sysInfo.CPU,
logicalCoreCount,
physicalCoreCount,
sysInfo.Memory)
assembler.sysInfo.Memory)
// Since the last print
cleanStats := cleaner.dumpStats()
......@@ -220,6 +214,10 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI
opts.IgnoredPorts = append(opts.IgnoredPorts, buildIgnoredPortsList(*ignoredPorts)...)
if *cpuLimit > 0 {
opts.cpuLimit = (*cpuLimit / 2) // user limit / 2 on purpose, the throttler is not strict enough
}
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
return assembler
......@@ -240,7 +238,7 @@ func startPassiveTapper(streamsMap *tcpStreamMap, assembler *tcpAssembler) {
}
cleaner.start()
go printPeriodicStats(&cleaner)
go printPeriodicStats(&cleaner, assembler)
assembler.processPackets(*hexdumppkt, mainPacketInputChan)
......
......@@ -11,6 +11,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/struCoder/pidusage"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/dbgctl"
......@@ -28,6 +29,9 @@ type tcpAssembler struct {
ignoredPorts []uint16
liveStreams map[string]bool
liveStreamsLock sync.RWMutex
sysInfo *pidusage.SysInfo
cpuLimit float64
tapperPid int
}
// Context
......@@ -50,7 +54,10 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStr
a := &tcpAssembler{
ignoredPorts: opts.IgnoredPorts,
cpuLimit: opts.cpuLimit,
liveStreams: make(map[string]bool),
tapperPid: os.Getpid(),
sysInfo: &pidusage.SysInfo{CPU: -1, Memory: -1},
}
closeHandler := func(stream *tcpStream) {
......@@ -80,6 +87,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStr
a.streamPool = streamPool
a.streamFactory = streamFactory
a.Assembler = assembler
return a
}
......@@ -94,8 +102,11 @@ func (a *tcpAssembler) buildConnectionId(saddr string, daddr string, sport strin
}
func (a *tcpAssembler) shouldThrottleNewStreams(connectionId string) bool {
logger.Log.Infof("CURRENTLY %d LIVE STREAMS - %s", len(a.liveStreams), connectionId)
return false
if a.cpuLimit == 0 {
return false
}
return a.sysInfo.CPU > a.cpuLimit
}
func (a *tcpAssembler) connectionExists(connectionId string) bool {
......@@ -144,6 +155,7 @@ func (a *tcpAssembler) handlePacket(packetInfo *source.TcpPacketInfo, dumpPacket
if !a.connectionExists(connectionId) {
if a.shouldThrottleNewStreams(connectionId) {
diagnose.AppStats.IncThrottledPackets()
return done
}
}
......@@ -164,10 +176,13 @@ func (a *tcpAssembler) handlePacket(packetInfo *source.TcpPacketInfo, dumpPacket
func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.TcpPacketInfo) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
ticker := time.NewTicker(1000 * time.Millisecond)
out:
for {
select {
case <-ticker.C:
a.updateUsage()
case packetInfo := <-packets:
if a.handlePacket(&packetInfo, dumpPacket) {
break out
......@@ -212,3 +227,18 @@ func (a *tcpAssembler) shouldIgnorePort(port uint16) bool {
return false
}
func (a *tcpAssembler) updateUsage() {
sysInfo, err := pidusage.GetStat(a.tapperPid)
if err != nil {
logger.Log.Warningf("Unable to get CPU Usage for %d", a.tapperPid)
a.sysInfo = &pidusage.SysInfo{
CPU: -1,
Memory: -1,
}
return
}
a.sysInfo = sysInfo
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment