Unverified Commit 27fa0afb authored by M. Mert Yıldıran's avatar M. Mert Yıldıran Committed by GitHub
Browse files

TRA-4331 Implement full data streaming over WebSocket (#819)

* Implement full data streaming over WebSocket

* Fix the linting error

* Make the empty being the criteria

* Use a label to break the nested loop
parent c98c99e4
Showing with 45 additions and 12 deletions
+45 -12
...@@ -110,20 +110,31 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even ...@@ -110,20 +110,31 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
logger.Log.Error(err) logger.Log.Error(err)
} }
out:
for { for {
_, msg, err := ws.ReadMessage() // params[0]: query
if err != nil { // params[1]: enableFullEntries (empty: disable, non-empty: enable)
if _, ok := err.(*websocket.CloseError); ok { params := make([][]byte, 2)
logger.Log.Debugf("Received websocket close message, socket id: %d", socketId) for i := range params {
} else { _, params[i], err = ws.ReadMessage()
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err) if err != nil {
if _, ok := err.(*websocket.CloseError); ok {
logger.Log.Debugf("Received websocket close message, socket id: %d", socketId)
} else {
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
}
break out
} }
}
break enableFullEntries := false
if len(params[1]) > 0 {
enableFullEntries = true
} }
if !isTapper && !isQuerySet { if !isTapper && !isQuerySet {
query := string(msg) query := string(params[0])
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query) err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
if err != nil { if err != nil {
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{ toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
...@@ -150,10 +161,15 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even ...@@ -150,10 +161,15 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
var entry *tapApi.Entry var entry *tapApi.Entry
err = json.Unmarshal(bytes, &entry) err = json.Unmarshal(bytes, &entry)
base := tapApi.Summarize(entry) var message []byte
if enableFullEntries {
message, _ = models.CreateFullEntryWebSocketMessage(entry)
} else {
base := tapApi.Summarize(entry)
message, _ = models.CreateBaseEntryWebSocketMessage(base)
}
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(base) if err := SendToSocket(socketId, message); err != nil {
if err := SendToSocket(socketId, baseEntryBytes); err != nil {
logger.Log.Error(err) logger.Log.Error(err)
} }
} }
...@@ -185,7 +201,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even ...@@ -185,7 +201,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
connection.Query(query, data, meta) connection.Query(query, data, meta)
} else { } else {
eventHandlers.WebSocketMessage(socketId, msg) eventHandlers.WebSocketMessage(socketId, params[0])
} }
} }
} }
......
...@@ -42,6 +42,11 @@ type WebSocketEntryMessage struct { ...@@ -42,6 +42,11 @@ type WebSocketEntryMessage struct {
Data *tapApi.BaseEntry `json:"data,omitempty"` Data *tapApi.BaseEntry `json:"data,omitempty"`
} }
type WebSocketFullEntryMessage struct {
*shared.WebSocketMessageMetadata
Data *tapApi.Entry `json:"data,omitempty"`
}
type WebSocketTappedEntryMessage struct { type WebSocketTappedEntryMessage struct {
*shared.WebSocketMessageMetadata *shared.WebSocketMessageMetadata
Data *tapApi.OutputChannelItem Data *tapApi.OutputChannelItem
...@@ -88,6 +93,16 @@ func CreateBaseEntryWebSocketMessage(base *tapApi.BaseEntry) ([]byte, error) { ...@@ -88,6 +93,16 @@ func CreateBaseEntryWebSocketMessage(base *tapApi.BaseEntry) ([]byte, error) {
return json.Marshal(message) return json.Marshal(message)
} }
func CreateFullEntryWebSocketMessage(entry *tapApi.Entry) ([]byte, error) {
message := &WebSocketFullEntryMessage{
WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{
MessageType: shared.WebSocketMessageTypeFullEntry,
},
Data: entry,
}
return json.Marshal(message)
}
func CreateWebsocketTappedEntryMessage(base *tapApi.OutputChannelItem) ([]byte, error) { func CreateWebsocketTappedEntryMessage(base *tapApi.OutputChannelItem) ([]byte, error) {
message := &WebSocketTappedEntryMessage{ message := &WebSocketTappedEntryMessage{
WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{ WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{
......
...@@ -15,6 +15,7 @@ type WebSocketMessageType string ...@@ -15,6 +15,7 @@ type WebSocketMessageType string
const ( const (
WebSocketMessageTypeEntry WebSocketMessageType = "entry" WebSocketMessageTypeEntry WebSocketMessageType = "entry"
WebSocketMessageTypeFullEntry WebSocketMessageType = "fullEntry"
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry" WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status" WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
WebSocketMessageTypeAnalyzeStatus WebSocketMessageType = "analyzeStatus" WebSocketMessageTypeAnalyzeStatus WebSocketMessageType = "analyzeStatus"
......
...@@ -122,6 +122,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => { ...@@ -122,6 +122,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
ws.current.onopen = () => { ws.current.onopen = () => {
setWsConnection(WsConnectionStatus.Connected); setWsConnection(WsConnectionStatus.Connected);
ws.current.send(query); ws.current.send(query);
ws.current.send("");
} }
ws.current.onclose = () => { ws.current.onclose = () => {
setWsConnection(WsConnectionStatus.Closed); setWsConnection(WsConnectionStatus.Closed);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment