Unverified Commit 1f2f63d1 authored by M. Mert Yıldıran's avatar M. Mert Yıldıran Committed by GitHub
Browse files

Implement AMQP request-response matcher (#1091)

* Implement the basis of AMQP request-response matching

* Fix `package.json`

* Add `ExchangeDeclareOk`

* Add `ConnectionCloseOk`

* Add `BasicConsumeOk`

* Add `QueueBindOk`

* Add `representEmptyResponse` and fix `BasicPublish` and `BasicDeliver`

* Fix ident and matcher, add `connectionOpen`, `channelOpen`, `connectionTune`, `basicCancel`

* Fix linter

* Fix the unit tests

* #run_acceptance_tests

* #run_acceptance_tests

* Fix the tests #run_acceptance_tests

* Log don't panic

* Don't skip AMQP acceptance tests #run_acceptance_tests

* Revert "Don't skip AMQP acceptance tests #run_acceptance_tests"

This reverts commit c60e9cf7.

* Remove `Details` section from `representEmpty`

* Add `This request or response has no data.` text
parent e2544aea
Showing with 980 additions and 83 deletions
+980 -83
...@@ -13,4 +13,4 @@ test-pull-bin: ...@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect: test-pull-expect:
@mkdir -p expect @mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect15/amqp/\* expect @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect16/amqp/\* expect
...@@ -4,16 +4,20 @@ go 1.17 ...@@ -4,16 +4,20 @@ go 1.17
require ( require (
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0
) )
require ( require (
github.com/davecgh/go-spew v1.1.0 // indirect github.com/davecgh/go-spew v1.1.0 // indirect
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/up9inc/mizu/tap/dbgctl v0.0.0 // indirect github.com/up9inc/mizu/tap/dbgctl v0.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
) )
replace github.com/up9inc/mizu/logger v0.0.0 => ../../../logger
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../../dbgctl replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../../dbgctl
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
......
...@@ -5,8 +5,8 @@ import ( ...@@ -5,8 +5,8 @@ import (
"fmt" "fmt"
"sort" "sort"
"strconv" "strconv"
"time"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
...@@ -25,14 +25,14 @@ var connectionMethodMap = map[int]string{ ...@@ -25,14 +25,14 @@ var connectionMethodMap = map[int]string{
61: "connection unblocked", 61: "connection unblocked",
} }
// var channelMethodMap = map[int]string{ var channelMethodMap = map[int]string{
// 10: "channel open", 10: "channel open",
// 11: "channel open-ok", 11: "channel open-ok",
// 20: "channel flow", 20: "channel flow",
// 21: "channel flow-ok", 21: "channel flow-ok",
// 40: "channel close", 40: "channel close",
// 41: "channel close-ok", 41: "channel close-ok",
// } }
var exchangeMethodMap = map[int]string{ var exchangeMethodMap = map[int]string{
10: "exchange declare", 10: "exchange declare",
...@@ -94,29 +94,41 @@ type AMQPWrapper struct { ...@@ -94,29 +94,41 @@ type AMQPWrapper struct {
Details interface{} `json:"details"` Details interface{} `json:"details"`
} }
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, captureSize int, emitter api.Emitter, capture api.Capture) { type emptyResponse struct {
request := &api.GenericMessage{ }
IsRequest: true,
CaptureTime: captureTime, const emptyMethod = "empty"
Payload: AMQPPayload{
Data: &AMQPWrapper{ func getIdent(reader api.TcpReader, methodFrame *MethodFrame) (ident string) {
Method: method, tcpID := reader.GetTcpID()
Url: "", // To match methods to their Ok(s)
Details: event, methodId := methodFrame.MethodId - methodFrame.MethodId%10
},
}, if reader.GetIsClient() {
} ident = fmt.Sprintf(
item := &api.OutputChannelItem{ "%s_%s_%s_%s_%d_%d_%d",
Protocol: protocol, tcpID.SrcIP,
Capture: capture, tcpID.DstIP,
Timestamp: captureTime.UnixNano() / int64(time.Millisecond), tcpID.SrcPort,
ConnectionInfo: connectionInfo, tcpID.DstPort,
Pair: &api.RequestResponsePair{ methodFrame.ChannelId,
Request: *request, methodFrame.ClassId,
Response: api.GenericMessage{}, methodId,
}, )
} else {
ident = fmt.Sprintf(
"%s_%s_%s_%s_%d_%d_%d",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
methodFrame.ChannelId,
methodFrame.ClassId,
methodId,
)
} }
emitter.Emit(item)
return
} }
func representProperties(properties map[string]interface{}, rep []interface{}) ([]interface{}, string, string) { func representProperties(properties map[string]interface{}, rep []interface{}) ([]interface{}, string, string) {
...@@ -460,6 +472,36 @@ func representQueueDeclare(event map[string]interface{}) []interface{} { ...@@ -460,6 +472,36 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
return rep return rep
} }
func representQueueDeclareOk(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]api.TableData{
{
Name: "Queue",
Value: event["queue"].(string),
Selector: `response.queue`,
},
{
Name: "Message Count",
Value: fmt.Sprintf("%g", event["messageCount"].(float64)),
Selector: `response.messageCount`,
},
{
Name: "Consumer Count",
Value: fmt.Sprintf("%g", event["consumerCount"].(float64)),
Selector: `response.consumerCount`,
},
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Details",
Data: string(details),
})
return rep
}
func representExchangeDeclare(event map[string]interface{}) []interface{} { func representExchangeDeclare(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0) rep := make([]interface{}, 0)
...@@ -571,7 +613,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} { ...@@ -571,7 +613,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
x, _ := json.Marshal(value) x, _ := json.Marshal(value)
outcome = string(x) outcome = string(x)
default: default:
panic("Unknown data type for the server property!") logger.Log.Info("Unknown data type for the server property!")
} }
headers = append(headers, api.TableData{ headers = append(headers, api.TableData{
Name: name, Name: name,
...@@ -593,6 +635,65 @@ func representConnectionStart(event map[string]interface{}) []interface{} { ...@@ -593,6 +635,65 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
return rep return rep
} }
func representConnectionStartOk(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]api.TableData{
{
Name: "Mechanism",
Value: event["mechanism"].(string),
Selector: `response.mechanism`,
},
{
Name: "Mechanism",
Value: event["mechanism"].(string),
Selector: `response.response`,
},
{
Name: "Locale",
Value: event["locale"].(string),
Selector: `response.locale`,
},
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Details",
Data: string(details),
})
if event["clientProperties"] != nil {
headers := make([]api.TableData, 0)
for name, value := range event["clientProperties"].(map[string]interface{}) {
var outcome string
switch v := value.(type) {
case string:
outcome = v
case map[string]interface{}:
x, _ := json.Marshal(value)
outcome = string(x)
default:
logger.Log.Info("Unknown data type for the client property!")
}
headers = append(headers, api.TableData{
Name: name,
Value: outcome,
Selector: fmt.Sprintf(`response.clientProperties["%s"]`, name),
})
}
sort.Slice(headers, func(i, j int) bool {
return headers[i].Name < headers[j].Name
})
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Client Properties",
Data: string(headersMarshaled),
})
}
return rep
}
func representConnectionClose(event map[string]interface{}) []interface{} { func representConnectionClose(event map[string]interface{}) []interface{} {
replyCode := "" replyCode := ""
...@@ -750,3 +851,122 @@ func representBasicConsume(event map[string]interface{}) []interface{} { ...@@ -750,3 +851,122 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
return rep return rep
} }
func representBasicConsumeOk(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]api.TableData{
{
Name: "Consumer Tag",
Value: event["consumerTag"].(string),
Selector: `response.consumerTag`,
},
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Details",
Data: string(details),
})
return rep
}
func representConnectionOpen(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]api.TableData{
{
Name: "Virtual Host",
Value: event["virtualHost"].(string),
Selector: `request.virtualHost`,
},
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Details",
Data: string(details),
})
return rep
}
func representConnectionTune(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]api.TableData{
{
Name: "Channel Max",
Value: fmt.Sprintf("%g", event["channelMax"].(float64)),
Selector: `request.channelMax`,
},
{
Name: "Frame Max",
Value: fmt.Sprintf("%g", event["frameMax"].(float64)),
Selector: `request.frameMax`,
},
{
Name: "Heartbeat",
Value: fmt.Sprintf("%g", event["heartbeat"].(float64)),
Selector: `request.heartbeat`,
},
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Details",
Data: string(details),
})
return rep
}
func representBasicCancel(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]api.TableData{
{
Name: "Consumer Tag",
Value: event["consumerTag"].(string),
Selector: `response.consumerTag`,
},
{
Name: "NoWait",
Value: strconv.FormatBool(event["noWait"].(bool)),
Selector: `request.noWait`,
},
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Details",
Data: string(details),
})
return rep
}
func representBasicCancelOk(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]api.TableData{
{
Name: "Consumer Tag",
Value: event["consumerTag"].(string),
Selector: `response.consumerTag`,
},
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Details",
Data: string(details),
})
return rep
}
func representEmpty(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
return rep
}
...@@ -46,22 +46,12 @@ func (d dissecting) Ping() { ...@@ -46,22 +46,12 @@ func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name) log.Printf("pong %s", protocol.Name)
} }
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error { func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
r := AmqpReader{b} r := AmqpReader{b}
var remaining int var remaining int
var header *HeaderFrame var header *HeaderFrame
connectionInfo := &api.ConnectionInfo{
ClientIP: reader.GetTcpID().SrcIP,
ClientPort: reader.GetTcpID().SrcPort,
ServerIP: reader.GetTcpID().DstIP,
ServerPort: reader.GetTcpID().DstPort,
IsOutgoing: true,
}
eventBasicPublish := &BasicPublish{ eventBasicPublish := &BasicPublish{
Exchange: "", Exchange: "",
RoutingKey: "", RoutingKey: "",
...@@ -83,6 +73,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ...@@ -83,6 +73,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
var lastMethodFrameMessage Message var lastMethodFrameMessage Message
var ident string
isClient := reader.GetIsClient()
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
for { for {
frameVal, err := r.readFrame() frameVal, err := r.readFrame()
if err == io.EOF { if err == io.EOF {
...@@ -121,16 +115,22 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ...@@ -121,16 +115,22 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
switch lastMethodFrameMessage.(type) { switch lastMethodFrameMessage.(type) {
case *BasicPublish: case *BasicPublish:
eventBasicPublish.Body = f.Body eventBasicPublish.Body = f.Body
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) reqResMatcher.emitEvent(isClient, ident, basicMethodMap[40], *eventBasicPublish, reader)
reqResMatcher.emitEvent(!isClient, ident, emptyMethod, &emptyResponse{}, reader)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.Body = f.Body eventBasicDeliver.Body = f.Body
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) reqResMatcher.emitEvent(!isClient, ident, basicMethodMap[60], *eventBasicDeliver, reader)
reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
} }
case *MethodFrame: case *MethodFrame:
reader.GetParent().SetProtocol(&protocol) reader.GetParent().SetProtocol(&protocol)
lastMethodFrameMessage = f.Method lastMethodFrameMessage = f.Method
ident = getIdent(reader, f)
switch m := f.Method.(type) { switch m := f.Method.(type) {
case *BasicPublish: case *BasicPublish:
eventBasicPublish.Exchange = m.Exchange eventBasicPublish.Exchange = m.Exchange
...@@ -146,7 +146,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ...@@ -146,7 +146,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) reqResMatcher.emitEvent(isClient, ident, queueMethodMap[20], *eventQueueBind, reader)
case *QueueBindOk:
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[21], m, reader)
case *BasicConsume: case *BasicConsume:
eventBasicConsume := &BasicConsume{ eventBasicConsume := &BasicConsume{
...@@ -158,7 +161,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ...@@ -158,7 +161,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) reqResMatcher.emitEvent(isClient, ident, basicMethodMap[20], *eventBasicConsume, reader)
case *BasicConsumeOk:
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[21], m, reader)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag eventBasicDeliver.ConsumerTag = m.ConsumerTag
...@@ -177,7 +183,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ...@@ -177,7 +183,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) reqResMatcher.emitEvent(isClient, ident, queueMethodMap[10], *eventQueueDeclare, reader)
case *QueueDeclareOk:
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[11], m, reader)
case *ExchangeDeclare: case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{ eventExchangeDeclare := &ExchangeDeclare{
...@@ -190,17 +199,19 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ...@@ -190,17 +199,19 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) reqResMatcher.emitEvent(isClient, ident, exchangeMethodMap[10], *eventExchangeDeclare, reader)
case *ExchangeDeclareOk:
reqResMatcher.emitEvent(isClient, ident, exchangeMethodMap[11], m, reader)
case *ConnectionStart: case *ConnectionStart:
eventConnectionStart := &ConnectionStart{ // In our tests, *ConnectionStart does not result in *ConnectionStartOk
VersionMajor: m.VersionMajor, reqResMatcher.emitEvent(!isClient, ident, connectionMethodMap[10], m, reader)
VersionMinor: m.VersionMinor, reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
ServerProperties: m.ServerProperties,
Mechanisms: m.Mechanisms, case *ConnectionStartOk:
Locales: m.Locales, // In our tests, *ConnectionStart does not result in *ConnectionStartOk
} reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[11], m, reader)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ConnectionClose: case *ConnectionClose:
eventConnectionClose := &ConnectionClose{ eventConnectionClose := &ConnectionClose{
...@@ -209,7 +220,40 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ...@@ -209,7 +220,40 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
ClassId: m.ClassId, ClassId: m.ClassId,
MethodId: m.MethodId, MethodId: m.MethodId,
} }
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[50], *eventConnectionClose, reader)
case *ConnectionCloseOk:
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[51], m, reader)
case *connectionOpen:
eventConnectionOpen := &connectionOpen{
VirtualHost: m.VirtualHost,
}
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[40], *eventConnectionOpen, reader)
case *connectionOpenOk:
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[41], m, reader)
case *channelOpen:
reqResMatcher.emitEvent(isClient, ident, channelMethodMap[10], m, reader)
case *channelOpenOk:
reqResMatcher.emitEvent(isClient, ident, channelMethodMap[11], m, reader)
case *connectionTune:
// In our tests, *connectionTune does not result in *connectionTuneOk
reqResMatcher.emitEvent(!isClient, ident, connectionMethodMap[30], m, reader)
reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
case *connectionTuneOk:
// In our tests, *connectionTune does not result in *connectionTuneOk
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[31], m, reader)
case *basicCancel:
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[30], m, reader)
case *basicCancelOk:
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[31], m, reader)
} }
default: default:
...@@ -220,9 +264,17 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ...@@ -220,9 +264,17 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{}) request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{}) reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
}
reqDetails["method"] = request["method"] reqDetails["method"] = request["method"]
resDetails["method"] = response["method"]
return &api.Entry{ return &api.Entry{
Protocol: protocol.ProtocolSummary, Protocol: protocol.ProtocolSummary,
Capture: item.Capture, Capture: item.Capture,
...@@ -236,13 +288,15 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, ...@@ -236,13 +288,15 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP, IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort, Port: item.ConnectionInfo.ServerPort,
}, },
Namespace: namespace, Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing, Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails, Request: reqDetails,
RequestSize: item.Pair.Request.CaptureSize, Response: resDetails,
Timestamp: item.Timestamp, RequestSize: item.Pair.Request.CaptureSize,
StartTime: item.Pair.Request.CaptureTime, ResponseSize: item.Pair.Response.CaptureSize,
ElapsedTime: 0, Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
} }
} }
...@@ -283,6 +337,21 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { ...@@ -283,6 +337,21 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
case basicMethodMap[20]: case basicMethodMap[20]:
summary = entry.Request["queue"].(string) summary = entry.Request["queue"].(string)
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary) summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
case connectionMethodMap[40]:
summary = entry.Request["virtualHost"].(string)
summaryQuery = fmt.Sprintf(`request.virtualHost == "%s"`, summary)
case connectionMethodMap[30]:
summary = fmt.Sprintf("%g", entry.Request["channelMax"].(float64))
summaryQuery = fmt.Sprintf(`request.channelMax == "%s"`, summary)
case connectionMethodMap[31]:
summary = fmt.Sprintf("%g", entry.Request["channelMax"].(float64))
summaryQuery = fmt.Sprintf(`request.channelMax == "%s"`, summary)
case basicMethodMap[30]:
summary = entry.Request["consumerTag"].(string)
summaryQuery = fmt.Sprintf(`request.consumerTag == "%s"`, summary)
case basicMethodMap[31]:
summary = entry.Request["consumerTag"].(string)
summaryQuery = fmt.Sprintf(`request.consumerTag == "%s"`, summary)
} }
return &api.BaseEntry{ return &api.BaseEntry{
...@@ -306,6 +375,8 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { ...@@ -306,6 +375,8 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) { func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
representation := make(map[string]interface{}) representation := make(map[string]interface{})
var repRequest []interface{} var repRequest []interface{}
var repResponse []interface{}
switch request["method"].(string) { switch request["method"].(string) {
case basicMethodMap[40]: case basicMethodMap[40]:
repRequest = representBasicPublish(request) repRequest = representBasicPublish(request)
...@@ -323,9 +394,45 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin ...@@ -323,9 +394,45 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin
repRequest = representQueueBind(request) repRequest = representQueueBind(request)
case basicMethodMap[20]: case basicMethodMap[20]:
repRequest = representBasicConsume(request) repRequest = representBasicConsume(request)
case connectionMethodMap[40]:
repRequest = representConnectionOpen(request)
case channelMethodMap[10]:
repRequest = representEmpty(request)
case connectionMethodMap[30]:
repRequest = representConnectionTune(request)
case basicMethodMap[30]:
repRequest = representBasicCancel(request)
} }
switch response["method"].(string) {
case queueMethodMap[11]:
repResponse = representQueueDeclareOk(response)
case exchangeMethodMap[11]:
repResponse = representEmpty(response)
case connectionMethodMap[11]:
repResponse = representConnectionStartOk(response)
case connectionMethodMap[51]:
repResponse = representEmpty(response)
case basicMethodMap[21]:
repResponse = representBasicConsumeOk(response)
case queueMethodMap[21]:
repResponse = representEmpty(response)
case connectionMethodMap[41]:
repResponse = representEmpty(response)
case channelMethodMap[11]:
repResponse = representEmpty(request)
case connectionMethodMap[31]:
repResponse = representConnectionTune(request)
case basicMethodMap[31]:
repResponse = representBasicCancelOk(request)
case emptyMethod:
repResponse = representEmpty(response)
}
representation["request"] = repRequest representation["request"] = repRequest
representation["response"] = repResponse
object, err = json.Marshal(representation) object, err = json.Marshal(representation)
return return
} }
...@@ -336,7 +443,7 @@ func (d dissecting) Macros() map[string]string { ...@@ -336,7 +443,7 @@ func (d dissecting) Macros() map[string]string {
} }
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return nil return createResponseRequestMatcher()
} }
var Dissector dissecting var Dissector dissecting
......
package amqp
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{channel_id}_{class_id}_{method_id}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) SetMaxTry(value int) {
}
func (matcher *requestResponseMatcher) emitEvent(isRequest bool, ident string, method string, event interface{}, reader api.TcpReader) {
reader.GetParent().SetProtocol(&protocol)
var item *api.OutputChannelItem
if isRequest {
item = matcher.registerRequest(ident, method, event, reader.GetCaptureTime(), reader.GetReadProgress().Current())
} else {
item = matcher.registerResponse(ident, method, event, reader.GetCaptureTime(), reader.GetReadProgress().Current())
}
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: reader.GetTcpID().SrcIP,
ClientPort: reader.GetTcpID().SrcPort,
ServerIP: reader.GetTcpID().DstIP,
ServerPort: reader.GetTcpID().DstPort,
IsOutgoing: true,
}
item.Capture = reader.GetParent().GetOrigin()
reader.GetEmitter().Emit(item)
}
}
func (matcher *requestResponseMatcher) registerRequest(ident string, method string, request interface{}, captureTime time.Time, captureSize int) *api.OutputChannelItem {
requestAMQPMessage := api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: AMQPPayload{
Data: &AMQPWrapper{
Method: method,
Url: "",
Details: request,
},
},
}
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
responseAMQPMessage := response.(*api.GenericMessage)
if responseAMQPMessage.IsRequest {
return nil
}
return matcher.preparePair(&requestAMQPMessage, responseAMQPMessage)
}
matcher.openMessagesMap.Store(ident, &requestAMQPMessage)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, method string, response interface{}, captureTime time.Time, captureSize int) *api.OutputChannelItem {
responseAMQPMessage := api.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: AMQPPayload{
Data: &AMQPWrapper{
Method: method,
Url: "",
Details: response,
},
},
}
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
requestAMQPMessage := request.(*api.GenericMessage)
if !requestAMQPMessage.IsRequest {
return nil
}
return matcher.preparePair(requestAMQPMessage, &responseAMQPMessage)
}
matcher.openMessagesMap.Store(ident, &responseAMQPMessage)
return nil
}
func (matcher *requestResponseMatcher) preparePair(requestAMQPMessage *api.GenericMessage, responseAMQPMessage *api.GenericMessage) *api.OutputChannelItem {
return &api.OutputChannelItem{
Protocol: protocol,
Timestamp: requestAMQPMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: nil,
Pair: &api.RequestResponsePair{
Request: *requestAMQPMessage,
Response: *responseAMQPMessage,
},
}
}
...@@ -81,10 +81,10 @@ func (msg *ConnectionStart) read(r io.Reader) (err error) { ...@@ -81,10 +81,10 @@ func (msg *ConnectionStart) read(r io.Reader) (err error) {
} }
type ConnectionStartOk struct { type ConnectionStartOk struct {
ClientProperties Table ClientProperties Table `json:"clientProperties"`
Mechanism string Mechanism string `json:"mechanism"`
Response string Response string `json:"response"`
Locale string Locale string `json:"locale"`
} }
func (msg *ConnectionStartOk) read(r io.Reader) (err error) { func (msg *ConnectionStartOk) read(r io.Reader) (err error) {
...@@ -135,9 +135,9 @@ func (msg *connectionSecureOk) read(r io.Reader) (err error) { ...@@ -135,9 +135,9 @@ func (msg *connectionSecureOk) read(r io.Reader) (err error) {
} }
type connectionTune struct { type connectionTune struct {
ChannelMax uint16 ChannelMax uint16 `json:"channelMax"`
FrameMax uint32 FrameMax uint32 `json:"frameMax"`
Heartbeat uint16 Heartbeat uint16 `json:"heartbeat"`
} }
func (msg *connectionTune) read(r io.Reader) (err error) { func (msg *connectionTune) read(r io.Reader) (err error) {
...@@ -181,7 +181,7 @@ func (msg *connectionTuneOk) read(r io.Reader) (err error) { ...@@ -181,7 +181,7 @@ func (msg *connectionTuneOk) read(r io.Reader) (err error) {
} }
type connectionOpen struct { type connectionOpen struct {
VirtualHost string VirtualHost string `json:"virtualHost"`
reserved1 string reserved1 string
reserved2 bool reserved2 bool
} }
...@@ -580,9 +580,9 @@ func (msg *QueueDeclare) read(r io.Reader) (err error) { ...@@ -580,9 +580,9 @@ func (msg *QueueDeclare) read(r io.Reader) (err error) {
} }
type QueueDeclareOk struct { type QueueDeclareOk struct {
Queue string Queue string `json:"queue"`
MessageCount uint32 MessageCount uint32 `json:"messageCount"`
ConsumerCount uint32 ConsumerCount uint32 `json:"consumerCount"`
} }
func (msg *QueueDeclareOk) read(r io.Reader) (err error) { func (msg *QueueDeclareOk) read(r io.Reader) (err error) {
...@@ -840,7 +840,7 @@ func (msg *BasicConsume) read(r io.Reader) (err error) { ...@@ -840,7 +840,7 @@ func (msg *BasicConsume) read(r io.Reader) (err error) {
} }
type BasicConsumeOk struct { type BasicConsumeOk struct {
ConsumerTag string ConsumerTag string `json:"consumerTag"`
} }
func (msg *BasicConsumeOk) read(r io.Reader) (err error) { func (msg *BasicConsumeOk) read(r io.Reader) (err error) {
...@@ -853,8 +853,8 @@ func (msg *BasicConsumeOk) read(r io.Reader) (err error) { ...@@ -853,8 +853,8 @@ func (msg *BasicConsumeOk) read(r io.Reader) (err error) {
} }
type basicCancel struct { type basicCancel struct {
ConsumerTag string ConsumerTag string `json:"consumerTag"`
NoWait bool NoWait bool `json:"noWait"`
} }
func (msg *basicCancel) read(r io.Reader) (err error) { func (msg *basicCancel) read(r io.Reader) (err error) {
...@@ -873,7 +873,7 @@ func (msg *basicCancel) read(r io.Reader) (err error) { ...@@ -873,7 +873,7 @@ func (msg *basicCancel) read(r io.Reader) (err error) {
} }
type basicCancelOk struct { type basicCancelOk struct {
ConsumerTag string ConsumerTag string `json:"consumerTag"`
} }
func (msg *basicCancelOk) read(r io.Reader) (err error) { func (msg *basicCancelOk) read(r io.Reader) (err error) {
......
This diff is collapsed.
...@@ -26,11 +26,11 @@ ...@@ -26,11 +26,11 @@
"@craco/craco": "^6.4.3", "@craco/craco": "^6.4.3",
"@types/jest": "^26.0.24", "@types/jest": "^26.0.24",
"@types/node": "^12.20.54", "@types/node": "^12.20.54",
"sass": "^1.52.3",
"react": "^17.0.2", "react": "^17.0.2",
"react-copy-to-clipboard": "^5.1.0", "react-copy-to-clipboard": "^5.1.0",
"react-dom": "^17.0.2", "react-dom": "^17.0.2",
"recoil": "^0.7.2" "recoil": "^0.7.2",
"sass": "^1.52.3"
}, },
"dependencies": { "dependencies": {
"@craco/craco": "^6.4.3", "@craco/craco": "^6.4.3",
...@@ -72,6 +72,7 @@ ...@@ -72,6 +72,7 @@
"devDependencies": { "devDependencies": {
"@rollup/plugin-node-resolve": "^13.3.0", "@rollup/plugin-node-resolve": "^13.3.0",
"@svgr/rollup": "^6.2.1", "@svgr/rollup": "^6.2.1",
"@types/ace": "^0.0.48",
"cross-env": "^7.0.3", "cross-env": "^7.0.3",
"env-cmd": "^10.1.0", "env-cmd": "^10.1.0",
"gh-pages": "^4.0.0", "gh-pages": "^4.0.0",
......
...@@ -28,6 +28,10 @@ const SectionsRepresentation: React.FC<any> = ({ data, color }) => { ...@@ -28,6 +28,10 @@ const SectionsRepresentation: React.FC<any> = ({ data, color }) => {
} }
} }
if (sections.length === 0) {
sections.push(<div>This request or response has no data.</div>);
}
return <React.Fragment>{sections}</React.Fragment>; return <React.Fragment>{sections}</React.Fragment>;
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment