Unverified Commit 01a1cd84 authored by RamiBerm's avatar RamiBerm Committed by GitHub
Browse files

TRA-3417 replace kubectl port-forward with kubectl proxy

TRA-3417 replace kubectl port-forward with kubectl proxy
parents 7abf8b83 466214c4
Showing with 587 additions and 98 deletions
+587 -98
......@@ -8,7 +8,7 @@ type MizuFetchOptions struct {
FromTimestamp int64
ToTimestamp int64
Directory string
MizuPort uint
MizuPort uint16
}
var mizuFetchOptions = MizuFetchOptions{}
......@@ -28,5 +28,5 @@ func init() {
fetchCmd.Flags().StringVarP(&mizuFetchOptions.Directory, "directory", "d", ".", "Provide a custom directory for fetched entries")
fetchCmd.Flags().Int64Var(&mizuFetchOptions.FromTimestamp, "from", 0, "Custom start timestamp for fetched entries")
fetchCmd.Flags().Int64Var(&mizuFetchOptions.ToTimestamp, "to", 0, "Custom end timestamp fetched entries")
fetchCmd.Flags().UintVarP(&mizuFetchOptions.MizuPort, "port", "p", 8899, "Custom port for mizu")
fetchCmd.Flags().Uint16VarP(&mizuFetchOptions.MizuPort, "port", "p", 8899, "Custom port for mizu")
}
......@@ -4,6 +4,8 @@ import (
"archive/zip"
"bytes"
"fmt"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/mizu"
"io"
"io/ioutil"
"log"
......@@ -14,7 +16,8 @@ import (
)
func RunMizuFetch(fetch *MizuFetchOptions) {
resp, err := http.Get(fmt.Sprintf("http://localhost:%v/api/har?from=%v&to=%v", fetch.MizuPort, fetch.FromTimestamp, fetch.ToTimestamp))
mizuProxiedUrl := kubernetes.GetMizuCollectorProxiedHostAndPath(fetch.MizuPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)
resp, err := http.Get(fmt.Sprintf("http://%s/api/har?from=%v&to=%v", mizuProxiedUrl, fetch.FromTimestamp, fetch.ToTimestamp))
if err != nil {
log.Fatal(err)
}
......
......@@ -19,7 +19,6 @@ type MizuTapOptions struct {
AnalyzeDestination string
KubeConfigPath string
MizuImage string
MizuPodPort uint16
PlainTextFilterRegexes []string
TapOutgoing bool
}
......@@ -68,7 +67,6 @@ func init() {
tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces")
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any")
}
......@@ -231,7 +231,6 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.AggregatorPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
isPodReady := false
var portForward *kubernetes.PortForward
for {
select {
case <-added:
......@@ -243,20 +242,23 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
case modifiedPod := <-modified:
if modifiedPod.Status.Phase == "Running" && !isPodReady {
isPodReady = true
var portForwardCreateError error
if portForward, portForwardCreateError = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel); portForwardCreateError != nil {
fmt.Printf("error forwarding port to pod %s\n", portForwardCreateError)
cancel()
} else {
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
time.Sleep(time.Second * 5) // Waiting to be sure port forwarding finished
if tappingOptions.Analyze {
if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?dest=%s", tappingOptions.GuiPort, tappingOptions.AnalyzeDestination)); err != nil {
fmt.Println(err)
} else {
fmt.Printf(mizu.Purple, "Traffic is uploading to UP9 cloud for further analsys")
fmt.Println()
}
go func() {
err := kubernetes.StartProxy(kubernetesProvider, tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)
if err != nil {
fmt.Printf("Error occured while running k8s proxy %v\n", err)
cancel()
}
}()
mizuProxiedUrl := kubernetes.GetMizuCollectorProxiedHostAndPath(tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)
fmt.Printf("Mizu is available at http://%s\n", mizuProxiedUrl)
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
if tappingOptions.Analyze {
if _, err := http.Get(fmt.Sprintf("http://%s/api/uploadEntries?dest=%s", mizuProxiedUrl, tappingOptions.AnalyzeDestination)); err != nil {
fmt.Println(err)
} else {
fmt.Printf(mizu.Purple, "Traffic is uploading to UP9 cloud for further analsys")
fmt.Println()
}
}
}
......@@ -271,9 +273,6 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
cancel()
case <-ctx.Done():
if portForward != nil {
portForward.Stop()
}
return
}
}
......@@ -322,7 +321,7 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
}
func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOptions *MizuTapOptions) {
controlSocket, err := mizu.CreateControlSocket(fmt.Sprintf("ws://localhost:%d/ws", tappingOptions.GuiPort))
controlSocket, err := mizu.CreateControlSocket(fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuCollectorProxiedHostAndPath(tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)))
if err != nil {
fmt.Printf("error establishing control socket connection %s\n", err)
cancel()
......
......@@ -4,15 +4,24 @@ import (
"github.com/spf13/cobra"
)
type MizuViewOptions struct {
GuiPort uint16
}
var mizuViewOptions = &MizuViewOptions{}
var viewCmd = &cobra.Command{
Use: "view",
Short: "Open GUI in browser",
RunE: func(cmd *cobra.Command, args []string) error {
runMizuView()
runMizuView(mizuViewOptions)
return nil
},
}
func init() {
rootCmd.AddCommand(viewCmd)
viewCmd.Flags().Uint16VarP(&mizuViewOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
}
......@@ -8,7 +8,7 @@ import (
"net/http"
)
func runMizuView() {
func runMizuView(mizuViewOptions *MizuViewOptions) {
kubernetesProvider := kubernetes.NewProvider("")
ctx, cancel := context.WithCancel(context.Background())
......@@ -23,11 +23,17 @@ func runMizuView() {
return
}
_, err = http.Get("http://localhost:8899/")
mizuProxiedUrl := kubernetes.GetMizuCollectorProxiedHostAndPath(mizuViewOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)
_, err = http.Get(fmt.Sprintf("http://%s/", mizuProxiedUrl))
if err == nil {
fmt.Printf("Found a running service %s and open port 8899\n", mizu.AggregatorPodName)
fmt.Printf("Found a running service %s and open port %d\n", mizu.AggregatorPodName, mizuViewOptions.GuiPort)
return
}
fmt.Printf("Found service %s, creating port forwarding to 8899\n", mizu.AggregatorPodName)
portForwardApiPod(ctx, kubernetesProvider, cancel, &MizuTapOptions{GuiPort: 8899, MizuPodPort: 8899})
fmt.Printf("Found service %s, creating k8s proxy\n", mizu.AggregatorPodName)
fmt.Printf("Mizu is available at http://%s\n", kubernetes.GetMizuCollectorProxiedHostAndPath(mizuViewOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName))
err = kubernetes.StartProxy(kubernetesProvider, mizuViewOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)
if err != nil {
fmt.Printf("Error occured while running k8s proxy %v\n", err)
}
}
......@@ -6,9 +6,11 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/spf13/cobra v1.1.3
github.com/up9inc/mizu/shared v0.0.0
k8s.io/api v0.21.0
k8s.io/apimachinery v0.21.0
k8s.io/client-go v0.21.0
k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.2
k8s.io/cli-runtime v0.21.2 // indirect
k8s.io/client-go v0.21.2
k8s.io/kubectl v0.21.2
)
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
This diff is collapsed.
package kubernetes
import (
"bytes"
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"net/http"
"net/url"
"strings"
)
type PortForward struct {
stopChan chan struct{}
}
func NewPortForward(kubernetesProvider *Provider, namespace string, podName string, localPort uint16, podPort uint16, cancel context.CancelFunc) (*PortForward, error) {
dialer := getHttpDialer(kubernetesProvider, namespace, podName)
stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
out, errOut := new(bytes.Buffer), new(bytes.Buffer)
forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, podPort)}, stopChan, readyChan, out, errOut)
if err != nil {
return nil, err
}
go func() {
err = forwarder.ForwardPorts() // this is blocking
if err != nil {
fmt.Printf("kubernetes port-forwarding error: %s", err)
cancel()
}
}()
return &PortForward{stopChan: stopChan}, nil
}
func (portForward *PortForward) Stop() {
close(portForward.stopChan)
}
func getHttpDialer(kubernetesProvider *Provider, namespace string, podName string) httpstream.Dialer {
roundTripper, upgrader, err := spdy.RoundTripperFor(&kubernetesProvider.clientConfig)
if err != nil {
panic(err)
}
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName)
hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/")
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL)
}
package kubernetes
import (
"fmt"
"k8s.io/kubectl/pkg/proxy"
"net"
"net/http"
"net/url"
"time"
)
const k8sProxyApiPrefix = "/"
func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace string, mizuServiceName string) error {
filter := &proxy.FilterServer{
AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE),
RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE),
AcceptHosts: proxy.MakeRegexpArrayOrDie(proxy.DefaultHostAcceptRE),
RejectMethods: proxy.MakeRegexpArrayOrDie(proxy.DefaultMethodRejectRE),
}
mizuProxiedUrl := GetMizuCollectorProxiedHostAndPath(mizuPort, mizuNamespace, mizuServiceName)
proxyHandler, err := proxy.NewProxyHandler(k8sProxyApiPrefix, filter, &kubernetesProvider.clientConfig, time.Second * 2)
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle(k8sProxyApiPrefix, proxyHandler)
//work around to make static resources available to the dashboard (all .svgs will not load without this)
mux.Handle("/static/", getRerouteHttpHandler(proxyHandler, mizuProxiedUrl))
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", int(mizuPort)))
if err != nil {
return err
}
server := http.Server{
Handler: mux,
}
return server.Serve(l)
}
func GetMizuCollectorProxiedHostAndPath(mizuPort uint16, mizuNamespace string, mizuServiceName string) string {
return fmt.Sprintf("localhost:%d/api/v1/namespaces/%s/services/%s:80/proxy", mizuPort, mizuNamespace, mizuServiceName)
}
// rewrites requests so they end up reaching the mizu-collector k8s service via the k8s proxy handler
func getRerouteHttpHandler(proxyHandler http.Handler, mizuProxyUrl string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
newUrl, _ := url.Parse(fmt.Sprintf("http://%s%s", mizuProxyUrl, r.URL.Path))
r.URL = newUrl
proxyHandler.ServeHTTP(w, r)
})
}
......@@ -39,6 +39,28 @@ interface HarPageProps {
setAnalyzeStatus: (status: any) => void;
}
const isKubeProxy = () => {
return window.location.href.indexOf("/api/v1/namespaces/") > -1;
}
const getMizuApiUrl = () => {
if (isKubeProxy()) {
return window.location.href;
}
return window.location.origin;
};
const getMizuWebsocketUrl = () => {
if (isKubeProxy()) {
return `ws://${window.location.href.replace(`${window.location.protocol}//`, "")}ws`;
}
return `ws://${window.location.host}/ws`;
}
const mizuApiUrl = getMizuApiUrl();
const mizuWebsocketUrl = getMizuWebsocketUrl();
export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus}) => {
const classes = useLayoutStyles();
......@@ -59,7 +81,7 @@ export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus}) => {
const ws = useRef(null);
const openWebSocket = () => {
ws.current = new WebSocket("ws://localhost:8899/ws");
ws.current = new WebSocket(mizuWebsocketUrl);
ws.current.onopen = () => setConnection(ConnectionStatus.Connected);
ws.current.onclose = () => setConnection(ConnectionStatus.Closed);
}
......@@ -98,11 +120,11 @@ export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus}) => {
useEffect(() => {
openWebSocket();
fetch(`http://localhost:8899/api/tapStatus`)
fetch(`${mizuApiUrl}/api/tapStatus`)
.then(response => response.json())
.then(data => setTappingStatus(data));
fetch(`http://localhost:8899/api/analyzeStatus`)
fetch(`${mizuApiUrl}/api/analyzeStatus`)
.then(response => response.json())
.then(data => setAnalyzeStatus(data));
}, []);
......@@ -111,7 +133,7 @@ export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus}) => {
useEffect(() => {
if (!focusedEntryId) return;
setSelectedHarEntry(null)
fetch(`http://localhost:8899/api/entries/${focusedEntryId}`)
fetch(`${mizuApiUrl}/api/entries/${focusedEntryId}`)
.then(response => response.json())
.then(data => setSelectedHarEntry(data));
}, [focusedEntryId])
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment