Commit aa17e11e authored by David Levanon's avatar David Levanon
Browse files

errors map

parent b0b75e7c
Showing with 89 additions and 60 deletions
+89 -60
package tap
import (
"fmt"
"sync"
"github.com/romana/rlog"
)
type errorsMap struct {
errorsMap map[string]uint
outputLevel int
nErrors int
errorsMapMutex sync.Mutex
}
func NewErrorsMap(outputLevel int) *errorsMap {
return &errorsMap{
errorsMap: make(map[string]uint),
outputLevel: outputLevel,
}
}
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments log.Printf
* Note: Too bad for perf that a... is evaluated
*/
func (e *errorsMap) logError(minOutputLevel int, t string, s string, a ...interface{}) {
e.errorsMapMutex.Lock()
e.nErrors++
nb := e.errorsMap[t]
e.errorsMap[t] = nb + 1
e.errorsMapMutex.Unlock()
if e.outputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
rlog.Errorf(formatStr, a...)
}
}
func (e *errorsMap) Error(t string, s string, a ...interface{}) {
e.logError(0, t, s, a...)
}
func (e *errorsMap) SilentError(t string, s string, a ...interface{}) {
e.logError(2, t, s, a...)
}
func (e *errorsMap) Debug(s string, a ...interface{}) {
rlog.Debugf(s, a...)
}
func (e *errorsMap) Trace(s string, a ...interface{}) {
rlog.Tracef(1, s, a...)
}
func (e *errorsMap) getErrorsSummary() (int, string) {
e.errorsMapMutex.Lock()
errorMapLen := len(e.errorsMap)
errorsSummery := fmt.Sprintf("%v", e.errorsMap)
e.errorsMapMutex.Unlock()
return errorMapLen, errorsSummery
}
......@@ -55,7 +55,7 @@ func initPrivateIPBlocks() {
} {
_, block, err := net.ParseCIDR(cidr)
if err != nil {
Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
tapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
} else {
privateIPBlocks = append(privateIPBlocks, block)
}
......
......@@ -65,6 +65,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
var memprofile = flag.String("memprofile", "", "Write memory profile")
var appStats = api.AppStats{}
var tapErrors *errorsMap
// global
var stats struct {
......@@ -89,47 +90,12 @@ type TapOpts struct {
HostMode bool
}
var outputLevel int
var errorsMap map[string]uint
var errorsMapMutex sync.Mutex
var nErrors uint
//lint:ignore U1000 will be used in the future
var ownIps []string // global
var hostMode bool // global
var extensions []*api.Extension // global
var filteringOptions *api.TrafficFilteringOptions // global
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments log.Printf
* Note: Too bad for perf that a... is evaluated
*/
func logError(minOutputLevel int, t string, s string, a ...interface{}) {
errorsMapMutex.Lock()
nErrors++
nb := errorsMap[t]
errorsMap[t] = nb + 1
errorsMapMutex.Unlock()
if outputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
rlog.Errorf(formatStr, a...)
}
}
func Error(t string, s string, a ...interface{}) {
logError(0, t, s, a...)
}
func SilentError(t string, s string, a ...interface{}) {
logError(2, t, s, a...)
}
func Debug(s string, a ...interface{}) {
rlog.Debugf(s, a...)
}
func Trace(s string, a ...interface{}) {
rlog.Tracef(1, s, a...)
}
func inArrayInt(arr []int, valueToCheck int) bool {
for _, value := range arr {
if value == valueToCheck {
......@@ -223,6 +189,8 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
streamsMap := NewTcpStreamMap()
go streamsMap.closeTimedoutTcpStreamChannels()
var outputLevel int
defer util.Run()()
if *debug {
outputLevel = 2
......@@ -231,7 +199,8 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
} else if *quiet {
outputLevel = -1
}
errorsMap = make(map[string]uint)
tapErrors = NewErrorsMap(outputLevel)
if localhostIPs, err := getLocalhostIPs(); err != nil {
// TODO: think this over
......@@ -337,13 +306,11 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
<-ticker.C
// Since the start
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
errorMapLen, errorsSummery := tapErrors.getErrorsSummary()
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(appStats.StartTime),
nErrors,
tapErrors.nErrors,
errorMapLen,
errorsSummery,
)
......@@ -440,14 +407,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
if done {
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
errorMapLen, _ := tapErrors.getErrorsSummary()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
appStats.PacketsCount,
appStats.ProcessedBytes,
time.Since(appStats.StartTime),
nErrors,
tapErrors.nErrors,
errorMapLen)
}
select {
......@@ -501,9 +466,9 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
log.Printf(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
log.Printf(" overlap packets:\t%d", stats.overlapPackets)
log.Printf(" overlap bytes:\t\t%d", stats.overlapBytes)
log.Printf("Errors: %d", nErrors)
for e := range errorsMap {
log.Printf(" %s:\t\t%d", e, errorsMap[e])
log.Printf("Errors: %d", tapErrors.nErrors)
for e := range tapErrors.errorsMap {
log.Printf(" %s:\t\t%d", e, tapErrors.errorsMap[e])
}
log.Printf("AppStats: %v", GetStats())
}
......@@ -36,7 +36,7 @@ type tcpStream struct {
func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// FSM
if !t.tcpstate.CheckState(tcp, dir) {
SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
tapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
stats.rejectFsm++
if !t.fsmerr {
t.fsmerr = true
......@@ -49,7 +49,7 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
// Options
err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil {
SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
tapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
stats.rejectOpt++
if !*nooptcheck {
return false
......@@ -60,10 +60,10 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
if *checksum {
c, err := tcp.ComputeChecksum()
if err != nil {
SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
tapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
accept = false
} else if c != 0x0 {
SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
tapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
accept = false
}
}
......@@ -97,7 +97,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
// In the original example this was handled with panic().
// I don't know what this error means or how to handle it properly.
SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
tapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
}
stats.overlapBytes += sgStats.OverlapBytes
stats.overlapPackets += sgStats.OverlapPackets
......@@ -108,7 +108,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} else {
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
}
Trace("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
tapErrors.Trace("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
if skip == -1 && *allowmissinginit {
// this is allowed
} else if skip != 0 {
......@@ -127,18 +127,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
dnsSize := binary.BigEndian.Uint16(data[:2])
missing := int(dnsSize) - len(data[2:])
Trace("dnsSize: %d, missing: %d", dnsSize, missing)
tapErrors.Trace("dnsSize: %d, missing: %d", dnsSize, missing)
if missing > 0 {
Debug("Missing some bytes: %d", missing)
tapErrors.Debug("Missing some bytes: %d", missing)
sg.KeepFrom(0)
return
}
p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns)
err := p.DecodeLayers(data[2:], &decoded)
if err != nil {
SilentError("DNS-parser", "Failed to decode DNS: %v", err)
tapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err)
} else {
Trace("DNS: %s", gopacket.LayerDump(dns))
tapErrors.Trace("DNS: %s", gopacket.LayerDump(dns))
}
if len(data) > 2+int(dnsSize) {
sg.KeepFrom(2 + int(dnsSize))
......@@ -173,7 +173,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
tapErrors.Trace("%s: Connection closed", t.ident)
if t.isTapTarget && !t.isClosed {
t.Close()
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment