From 749bee6d559f16c66fab38a384547d0ebe3f36b2 Mon Sep 17 00:00:00 2001
From: Igor Gov <iggvrv@gmail.com>
Date: Wed, 22 Sep 2021 06:24:19 +0300
Subject: [PATCH] Using rlog in agent and tapper & removing unreferenced code
 (#289)

* .
---
 agent/main.go                                | 29 +++-----------
 agent/pkg/rules/rulesHTTP.go                 |  3 +-
 agent/pkg/utils/har.go                       |  7 ++--
 {shared => agent/pkg/utils}/socket_client.go | 15 ++++---
 agent/pkg/utils/utils.go                     | 14 +++----
 cli/mizu/controlSocket.go                    | 42 --------------------
 6 files changed, 25 insertions(+), 85 deletions(-)
 rename {shared => agent/pkg/utils}/socket_client.go (53%)
 delete mode 100644 cli/mizu/controlSocket.go

diff --git a/agent/main.go b/agent/main.go
index f0cff4971..99f4934af 100644
--- a/agent/main.go
+++ b/agent/main.go
@@ -65,6 +65,7 @@ func main() {
 
 		hostApi(nil)
 	} else if *tapperMode {
+		rlog.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
 		if *apiServerAddress == "" {
 			panic("API server address must be provided with --api-server-address when using --tap")
 		}
@@ -77,13 +78,13 @@ func main() {
 
 		filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
 		tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
-		socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
+		socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress)
 		if err != nil {
 			panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
 		}
+		rlog.Infof("Connected successfully to websocket %s", *apiServerAddress)
 
 		go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
-		// go pipeOutboundLinksChannelToSocket(socketConnection, outboundLinkOutputChannel)
 	} else if *apiServerMode {
 		api.StartResolving(*namespace)
 
@@ -122,7 +123,7 @@ func loadExtensions() {
 	extensionsMap = make(map[string]*tapApi.Extension)
 	for i, file := range files {
 		filename := file.Name()
-		log.Printf("Loading extension: %s\n", filename)
+		rlog.Infof("Loading extension: %s\n", filename)
 		extension := &tapApi.Extension{
 			Path: path.Join(extensionsDir, filename),
 		}
@@ -290,7 +291,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
 	for messageData := range messageDataChannel {
 		marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData)
 		if err != nil {
-			rlog.Infof("error converting message to json %s, (%v,%+v)\n", err, err, err)
+			rlog.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
 			continue
 		}
 
@@ -298,26 +299,8 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
 		// and goes into the intermediate WebSocket.
 		err = connection.WriteMessage(websocket.TextMessage, marshaledData)
 		if err != nil {
-			rlog.Infof("error sending message through socket server %s, (%v,%+v)\n", err, err, err)
+			rlog.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
 			continue
 		}
 	}
 }
-
-func pipeOutboundLinksChannelToSocket(connection *websocket.Conn, outboundLinkChannel <-chan *tap.OutboundLink) {
-	for outboundLink := range outboundLinkChannel {
-		if outboundLink.SuggestedProtocol == tap.TLSProtocol {
-			marshaledData, err := models.CreateWebsocketOutboundLinkMessage(outboundLink)
-			if err != nil {
-				rlog.Infof("Error converting outbound link to json %s, (%v,%+v)", err, err, err)
-				continue
-			}
-
-			err = connection.WriteMessage(websocket.TextMessage, marshaledData)
-			if err != nil {
-				rlog.Infof("error sending outbound link message through socket server %s, (%v,%+v)", err, err, err)
-				continue
-			}
-		}
-	}
-}
diff --git a/agent/pkg/rules/rulesHTTP.go b/agent/pkg/rules/rulesHTTP.go
index 84215e7fc..e4dc927b4 100644
--- a/agent/pkg/rules/rulesHTTP.go
+++ b/agent/pkg/rules/rulesHTTP.go
@@ -4,6 +4,7 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"fmt"
+	"github.com/romana/rlog"
 	"reflect"
 	"regexp"
 	"strings"
@@ -65,7 +66,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
 				if err != nil {
 					continue
 				}
-				fmt.Println(matchValue, rule.Value)
+				rlog.Info(matchValue, rule.Value)
 			} else {
 				val := fmt.Sprint(out)
 				matchValue, err = regexp.MatchString(rule.Value, val)
diff --git a/agent/pkg/utils/har.go b/agent/pkg/utils/har.go
index 419bf7568..6386356d0 100644
--- a/agent/pkg/utils/har.go
+++ b/agent/pkg/utils/har.go
@@ -4,13 +4,12 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
-	"strconv"
-	"strings"
-	"time"
-
 	"github.com/google/martian/har"
 	"github.com/romana/rlog"
 	"github.com/up9inc/mizu/tap/api"
+	"strconv"
+	"strings"
+	"time"
 )
 
 // Keep it because we might want cookies in the future
diff --git a/shared/socket_client.go b/agent/pkg/utils/socket_client.go
similarity index 53%
rename from shared/socket_client.go
rename to agent/pkg/utils/socket_client.go
index 4cc618356..aa532852a 100644
--- a/shared/socket_client.go
+++ b/agent/pkg/utils/socket_client.go
@@ -1,8 +1,8 @@
-package shared
+package utils
 
 import (
-	"fmt"
 	"github.com/gorilla/websocket"
+	"github.com/romana/rlog"
 	"time"
 )
 
@@ -11,24 +11,23 @@ const (
 	DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
 )
 
-func ConnectToSocketServer(address string, retries int, retrySleepTime time.Duration, hideTimeoutErrors bool) (*websocket.Conn, error) {
+func ConnectToSocketServer(address string) (*websocket.Conn, error) {
 	var err error
 	var connection *websocket.Conn
 	try := 0
 
 	// Connection to server fails if client pod is up before server.
 	// Retries solve this issue.
-	for try < retries {
+	for try < DEFAULT_SOCKET_RETRIES {
+		rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES)
 		connection, _, err = websocket.DefaultDialer.Dial(address, nil)
 		if err != nil {
+			rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err)
 			try++
-			if !hideTimeoutErrors {
-				fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err)
-			}
 		} else {
 			break
 		}
-		time.Sleep(retrySleepTime)
+		time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME)
 	}
 
 	if err != nil {
diff --git a/agent/pkg/utils/utils.go b/agent/pkg/utils/utils.go
index cdb5a35cb..e4ac80232 100644
--- a/agent/pkg/utils/utils.go
+++ b/agent/pkg/utils/utils.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"github.com/gin-gonic/gin"
 	"github.com/romana/rlog"
-	"log"
 	"net/http"
 	"net/url"
 	"os"
@@ -18,8 +17,8 @@ import (
 func StartServer(app *gin.Engine) {
 	signals := make(chan os.Signal, 2)
 	signal.Notify(signals,
-		os.Interrupt,  	  // this catch ctrl + c
-		syscall.SIGTSTP,  // this catch ctrl + z
+		os.Interrupt,    // this catch ctrl + c
+		syscall.SIGTSTP, // this catch ctrl + z
 	)
 
 	srv := &http.Server{
@@ -36,8 +35,9 @@ func StartServer(app *gin.Engine) {
 	}()
 
 	// Run server.
+	rlog.Infof("Starting the server...")
 	if err := app.Run(":8899"); err != nil {
-		log.Printf("Oops... Server is not running! Reason: %v", err)
+		rlog.Errorf("Server is not running! Reason: %v", err)
 	}
 }
 
@@ -54,15 +54,15 @@ func ReverseSlice(data interface{}) {
 
 func CheckErr(e error) {
 	if e != nil {
-		log.Printf("%v", e)
+		rlog.Infof("%v", e)
 		//panic(e)
 	}
 }
 
 func SetHostname(address, newHostname string) string {
 	replacedUrl, err := url.Parse(address)
-	if err != nil{
-		log.Printf("error replacing hostname to %s in address %s, returning original %v",newHostname, address, err)
+	if err != nil {
+		rlog.Infof("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
 		return address
 	}
 	replacedUrl.Host = newHostname
diff --git a/cli/mizu/controlSocket.go b/cli/mizu/controlSocket.go
deleted file mode 100644
index f35b9377c..000000000
--- a/cli/mizu/controlSocket.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package mizu
-
-import (
-	"encoding/json"
-	"github.com/gorilla/websocket"
-	"github.com/up9inc/mizu/shared"
-	core "k8s.io/api/core/v1"
-	"time"
-)
-
-type ControlSocket struct {
-	connection *websocket.Conn
-}
-
-func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) {
-	connection, err := shared.ConnectToSocketServer(socketServerAddress, 30, 2 * time.Second, true)
-	if err != nil {
-		return nil, err
-	} else {
-		return &ControlSocket{connection: connection}, nil
-	}
-}
-
-func (controlSocket *ControlSocket) SendNewTappedPodsListMessage(pods []core.Pod) error {
-	podInfos := make([]shared.PodInfo, 0)
-	for _, pod := range pods {
-		podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
-	}
-	tapStatus := shared.TapStatus{Pods: podInfos}
-	socketMessage := shared.CreateWebSocketStatusMessage(tapStatus)
-
-	jsonMessage, err := json.Marshal(socketMessage)
-	if err != nil {
-		return err
-	}
-	err = controlSocket.connection.WriteMessage(websocket.TextMessage, jsonMessage)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-- 
GitLab