Unverified commit 72f47536 authored by Igor Gov, committed by GitHub

Develop -> main (#544)


* Add support of listening to multiple netns (#418)

* multiple netns listen - initial commit

* multiple netns listen - actual work

* remove redundant log line

* map /proc of host to tapper

* changing kubernetes provider again after big conflict

* revert node-sass version back to 5.0.0

* Rename host_source to hostSource
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* PR fixes - adding comment + typos + naming conventions

* go fmt + making procfs read only

* setns back to the original value after packet source initialized
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* TRA-3842 daemon acceptance tests (#429)

* Update tap_test.go and testsUtils.go

* Update tap_test.go

* Update testsUtils.go

* Update tap_test.go and testsUtils.go

* Update tap_test.go and testsUtils.go

* Update testsUtils.go

* Update tap_test.go

* gofmt

* TRA-3913 support mizu via expose service (#440)

* Update README.md, tapRunner.go, and 4 more files...

* Update testsUtils.go

* Update proxy.go

* Update README.md, testsUtils.go, and 3 more files...

* Update testsUtils.go and provider.go

* fix readme titles (#442)

* Auto close inactive issues  (#441)

* Migrate from SQLite to Basenine and introduce a new filtering syntax (#279)

* Fix the OOMKilled error by calling `debug.FreeOSMemory` periodically
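  For context, the technique named in the item above amounts to a background goroutine that periodically hands freed heap memory back to the OS. The sketch below is illustrative only; the interval and function name are assumptions, not the agent's actual code.

  ```go
  package main

  import (
  	"runtime/debug"
  	"time"
  )

  // startMemoryReleaser periodically forces freed heap memory back to the OS,
  // which can help avoid OOMKilled containers with tight memory limits.
  // The interval here is an assumption for illustration only.
  func startMemoryReleaser(interval time.Duration) {
  	go func() {
  		ticker := time.NewTicker(interval)
  		defer ticker.Stop()
  		for range ticker.C {
  			debug.FreeOSMemory()
  		}
  	}()
  }

  func main() {
  	startMemoryReleaser(500 * time.Millisecond)
  	time.Sleep(2 * time.Second) // let the releaser tick a few times in this demo
  }
  ```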

* Remove `MAX_NUMBER_OF_GOROUTINES` environment variable

* Change the line

* Increase the default value of `TCP_STREAM_CHANNEL_TIMEOUT_MS` to `10000`

* Write the client and integrate to the new real-time database

* Refactor the WebSocket implementation for `/ws`

* Adapt the UI to the new filtering system

* Fix the rest of the issues in the UI

* Increase the buffer of the scanner

* Implement accessing single records

* Increase the buffer of another scanner

* Populate `Request` and `Response` fields of `MizuEntry`

* Add syntax highlighting for the query

* Add database to `Dockerfile`

* Fix some issues

* Update the `realtime_dbms` Git module commit hash

* Upgrade Gin version and print the query string

* Revert "Upgrade Gin version and print the query string"

This reverts commit aa09f904.

* Use WebSocket's itself to query instead of the query string

* Fix some errors related to conversion to HAR

* Fix the issues caused by the latest merge

* Fix the build error

* Fix PR validation GitHub workflow

* Replace the git submodule with latest Basenine version `0.1.0`

Remove `realtime_client.go` and use the official client library `github.com/up9inc/basenine/client/go` instead.

* Move Basenine host and port constants to `shared` module

* Reliably execute and wait for Basenine to become available

* Upgrade Basenine version

* Properly close WebSocket and data channel

* Fix the issues caused by the recent merge commit

* Clean up the TypeScript code

* Update `.gitignore`

* Limit the database size

* Add `Macros` method signature to `Dissector` interface and set the macros provided by the protocol extensions

* Run `go mod tidy` on `agent`

* Upgrade `github.com/up9inc/basenine/client/go` version

* Implement a mechanism to update the query using click events in the UI and use it for protocol macros

* Update the query on click to timestamps

* Fix some issues in the WebSocket and channel handling

* Update the query on clicks to status code

* Update the query on clicks to method, path and service

* Update the query on clicks to is outgoing, source and destination ports

* Add an API endpoint to validate the query against syntax errors

* Move the query background color state into `TrafficPage`

* Fix the logic in `setQuery`

* Display a toast message in case of a syntax error in the query

* Remove a call to `fmt.Printf`

* Upgrade Basenine version to `0.1.3`

* Fix an issue related to getting `MAX_ENTRIES_DB_BYTES` environment variable

* Have the `path` key in request details, in HTTP

* Rearrange the HTTP headers for the querying

* Do the same thing for `cookies` and `queryString`

* Update the query on click to table elements

Add the selectors for `TABLE` type representations in HTTP extension.

* Update the query on click to `bodySize` and `elapsedTime` in `EntryTitle`

* Add the selectors for `TABLE` type representations in AMQP extension

* Add the selectors for `TABLE` type representations in Kafka extension

* Add the selectors for `TABLE` type representations in Redis extension

* Define a struct in `tap/api.go` for the section representation data

* Add the selectors for `BODY` type representations

* Add `request.path` to the HTTP request details

* Change the summary string's field name from `path` to `summary`

* Introduce `queryable` CSS class for queryable UI elements and underline them on hover

* Instead of `N requests` at the bottom, make it `Displaying N results (queried X/Y)` and live update the values

Upgrade Basenine version to `0.2.0`.

* Verify the sha256sum of Basenine executable inside `Dockerfile`

* Pass the start time to web UI through WebSocket and always show the `EntriesList` footer

* Pipe the `stderr` of Basenine as well

* Fix the layout issues related to `CodeEditor` in the UI

* Use the correct `shasum` command in `Dockerfile`

* Upgrade Basenine version to `0.2.1`

* Limit the height of `CodeEditor` container

* Remove `Paused` enum `ConnectionStatus` in UI

* Fix the issue caused by the recent merge

* Add the filtering guide (cheatsheet)

* Update open cheatsheet button's title

* Update cheatsheet content

* Remove the old SQLite code, adapt the `--analyze` related code to Basenine

* Change the method signature of `NewEntry`

* Change the method signature of `Represent`

* Introduce `HTTPPair` field in `MizuEntry` specific to HTTP

* Remove `Entry`, `EntryId` and `EstimatedSizeBytes` fields from `MizuEntry`

Also remove the `getEstimatedEntrySizeBytes` method.

* Remove `gorm.io/gorm` dependency

* Remove unused `sensitiveDataFiltering` folder

* Increase the left margin of open cheatsheet button

* Add `overflow: auto` to the cheatsheet `Modal`

* Fix `GetEntry` method

* Fix the macro for gRPC

* Fix an interface conversion in case of AMQP

* Fix two more interface conversion errors in AMQP

* Make the `syncEntriesImpl` method blocking

* Fix a grammar mistake in the cheatsheet

* Adapt to the changes in the recent merge commit

* Improve the cheatsheet text

* Always display the timestamp in `en-US`

* Upgrade Basenine version to `0.2.2`

* Fix the order of closing Basenine connections and channels

* Don't close the Basenine channels at all

* Upgrade Basenine version to `0.2.3`

* Set the initial filter to `rlimit(100)`

* Make Basenine persistent

* Upgrade Basenine version to `0.2.4`

* Update `debug.Dockerfile`

* Fix a failing test

* Upgrade Basenine version to `0.2.5`

* Revert "Do not show play icon when disconnected (#428)"

This reverts commit 8af2e562.

* Upgrade Basenine version to `0.2.6`

* Make all non-informative things informative

* Make `100` a constant

* Use `===` in JavaScript no matter what

* Remove a forgotten `console.log`

* Add a comment and update the `query` in `syncEntriesImpl`

* Don't call `panic` in `GetEntry`

* Replace `panic` calls in `startBasenineServer` with `logger.Log.Panicf`

* Remove unnecessary `\n` characters in the logs

* Remove the `Reconnect` button (#444)

* Upgrade `github.com/up9inc/basenine/client/go` version (#446)

* Fix the `Analysis` button's style into its original state (#447)

* Fix the `Analysis` button's style into its original state

* Fix the MUI button style into its original state

* Fix the acceptance tests after the merger of #279 (#443)

* Enable acceptance tests

* Fix the acceptance tests

* Move `--headless` from `getDefaultCommandArgs` to `getDefaultTapCommandArgs`

* Fix rest of the failing acceptance tests

* Revert "Enable acceptance tests"

This reverts commit 3f919e865a1133784a917442eb7fd8ca421ce017.

* Revert "Revert "Enable acceptance tests""

This reverts commit c0bfe54b70fa257060e43f7dd25abe8279d23f85.

* Ignore `--headless` in `mizu view`

* Make all non-informative things informative

* Remove `github.com/stretchr/testify` dependency from the acceptance tests

* Move the helper methods `waitTimeout` and `checkDBHasEntries` from `tap_test.go` to `testsUtils.go`

* Split `checkDBHasEntries` method into `getDBEntries` and `assertEntriesAtLeast` methods

* Revert "Revert "Revert "Enable acceptance tests"""

This reverts commit c13342671c43640edd4680cdc403b6b3bbac3d7e.

* Revert "Revert "Revert "Revert "Enable acceptance tests""""

This reverts commit 0f8c436926bb6e633504d90145698b2c85f12011.

* Make `getDBEntries` and `checkEntriesAtLeast` methods return errors instead

* Revert "Revert "Revert "Revert "Revert "Enable acceptance tests"""""

This reverts commit 643fdde009fa1775663f594c16aee63a3684dd28.

* Send the message into this WebSocket connection instead of all (#449)

* Fix the CSS issues in the cheatsheet modal (#448)

* Fix the CSS issues in the cheatsheet modal

* Change the Sass variable names

* moved headless to root config, use headless in view (#450)

* extend cleanup timeout to solve context timeout problem in dump logs (#453)

* Add link to exposing mizu wiki page in README (#455)

* changed logger debug mode to log level (#456)

* fixed acceptance test go sum (#458)

* Ignore `SNYK-JS-JSONSCHEMA-1920922` (#462)

Dependency tree:
`node-sass@5.0.0 > node-gyp@7.1.2 > request@2.88.2 > http-signature@1.2.0 > jsprim@1.4.1 > json-schema@0.2.3`

`node-sass` should fix it first.

* Optimize UI entry feed performance (#452)

* Optimize the React code for feeding the entries

By building `EntryItem` only once and updating the `entries` state on meta query messages.

* Upgrade `react-scrollable-feed-virtualized` version from `1.4.3` to `1.4.8`

* Fix the `isSelected` state

* Set the query text before deciding the background to prevent lags while typing

* Upgrade Basenine version from `0.2.6` to `0.2.7`

* Set the query background color only if the query is same after the HTTP request and use `useEffect` instead

* Upgrade Basenine version from `0.2.7` to `0.2.8`

* Use `CancelToken` of `axios` instead of trying to check the query state

* Turn `updateQuery` function into a state hook

* Update the macro for `http`

* Do the `source.cancel()` call in `axios.CancelToken`

* Reduce client-side logging

* Upgrade Basenine version from `0.2.8` to `0.2.9` (#465)

Fixes the `limit` helper not finishing because of a lack of meta updates.

* Set `response.bodySize` to `0` if it's negative (#466)

* Prevent `elapsedTime` to be negative (#467)

Also fix the `elapsedTime` for Redis.

* changes log format to be more readable (#463)

* Stop reduction of user agent header (#468)

* remove newline in logs, fixed logs time format (#469)

* TRA-3903 better health endpoint for daemon mode (#471)

* Update main.go, status_controller.go, and 2 more files...

* Update status_controller.go and mizuTapperSyncer.go

* fixed redact acceptance test (#472)

* Return `404` instead of `500` if the entry could not be found and display a toast message (#464)

* TRA-3903 add flag to disable pvc creation for daemon mode (#474)

* Update tapRunner.go and tapConfig.go

* Update tapConfig.go

* Revert "Update tapConfig.go"

This reverts commit 5c7c02c4ab652a84878d2555426413ff25c8aa70.

* TRA-3903 - display targeted pods before waiting for all daemon resources to be created (#475)

* WIP

* Update tapRunner.go

* Update tapRunner.go

* Update the UI screenshots (#476)

* Update the UI screenshots

* Update `mizu-ui.png`

* TRA-3903 fix daemon mode in permission restricted configs (#473)

* Update tapRunner.go, permissions-all-namespaces-daemon.yaml, and 2 more files...

* Update tapRunner.go

* Update tapRunner.go and permissions-ns-daemon.yaml

* Update tapRunner.go

* Update tapRunner.go

* Update tapRunner.go

* TRA-3903 minor daemon mode refactor (#479)

* Update common.go and tapRunner.go

* Update common.go

* Don't omit the key-value pair if the value is `false` in `EntryTableSection` (#478)

* Sync entries in batches just as before (using `uploadIntervalSec` parameter) (#477)

* Sync entries in batches just as before (using `uploadIntervalSec` parameter)

* Replace `lastTimeSynced` value with `time.Time{}`

Since it will be overwritten by the very first iteration.
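  A minimal sketch of the batching behavior described in the item above, assuming hypothetical names (`syncEntriesInBatches`, `Entry`) rather than the repo's actual symbols: entries accumulate and are uploaded once per interval, and starting `lastTimeSynced` at the zero `time.Time{}` simply guarantees the very first iteration syncs and overwrites it.

  ```go
  package main

  import (
  	"fmt"
  	"time"
  )

  // Entry stands in for a captured traffic entry (hypothetical type for this sketch).
  type Entry struct{ Summary string }

  // syncEntriesInBatches buffers entries and uploads them once per interval.
  // lastTimeSynced starts as the zero time.Time{}, so the first entry triggers a sync.
  func syncEntriesInBatches(in <-chan Entry, uploadInterval time.Duration, upload func([]Entry)) {
  	var batch []Entry
  	var lastTimeSynced time.Time // zero value: overwritten on the very first iteration

  	for entry := range in {
  		batch = append(batch, entry)
  		if time.Since(lastTimeSynced) >= uploadInterval {
  			upload(batch)
  			batch = nil
  			lastTimeSynced = time.Now()
  		}
  	}
  	if len(batch) > 0 {
  		upload(batch) // flush whatever is left when the channel closes
  	}
  }

  func main() {
  	in := make(chan Entry)
  	go func() {
  		for i := 0; i < 5; i++ {
  			in <- Entry{Summary: fmt.Sprintf("entry-%d", i)}
  			time.Sleep(200 * time.Millisecond)
  		}
  		close(in)
  	}()
  	syncEntriesInBatches(in, time.Second, func(b []Entry) {
  		fmt.Println("uploading", len(b), "entries")
  	})
  }
  ```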

* Clear `focusedEntryId` state in case of a filter is applied (#482)

* Prevent the crash on client-side in case of `text` being undefined in `FancyTextDisplay` (#481)

* Prevent the crash on client-side in case of `text` being undefined in `FancyTextDisplay`

* Use `String(text)` instead

* Refactor watch pods to allow reusing watch wrapper (#470)

Currently shared/kubernetes/watch.go:FilteredWatch only watches pods.
This PR makes it reusable for other types of resources.
This is done in preparation for watching k8s events.
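  The refactored wrapper itself is not part of this diff; the following is only a rough sketch of what a resource-agnostic watch loop can look like, with a small adapter supplying the concrete watcher. All names are hypothetical and the real `shared/kubernetes/watch.go` API may differ.

  ```go
  package main

  import (
  	"context"
  	"fmt"
  	"regexp"

  	corev1 "k8s.io/api/core/v1"
  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  	"k8s.io/apimachinery/pkg/watch"
  	"k8s.io/client-go/kubernetes"
  )

  // WatchCreator abstracts "which resource to watch" so the same event loop can
  // serve pods, events, and other kinds (hypothetical interface for this sketch).
  type WatchCreator interface {
  	NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
  }

  type podWatchCreator struct{ client kubernetes.Interface }

  func (c podWatchCreator) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
  	return c.client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{})
  }

  // FilteredWatch runs the generic loop and forwards only pods whose name matches the regex.
  func FilteredWatch(ctx context.Context, creator WatchCreator, namespace string, nameRegex *regexp.Regexp, out chan<- *corev1.Pod) error {
  	watcher, err := creator.NewWatcher(ctx, namespace)
  	if err != nil {
  		return err
  	}
  	defer watcher.Stop()
  	for {
  		select {
  		case ev, ok := <-watcher.ResultChan():
  			if !ok {
  				return nil
  			}
  			if pod, isPod := ev.Object.(*corev1.Pod); isPod && nameRegex.MatchString(pod.Name) {
  				out <- pod
  			}
  		case <-ctx.Done():
  			return nil
  		}
  	}
  }

  func main() {
  	// Wiring a real clientset requires kubeconfig or in-cluster config; omitted here.
  	fmt.Println("see FilteredWatch above")
  }
  ```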

* Show the source and destination IP in the entry feed (#485)

* Upgrade Basenine version from `0.2.9` to `0.2.10` (#484)

* Upgrade Basenine version from `0.2.9` to `0.2.10`

Fixes the issues in `limit` and `rlimit` helpers that occur when they are on the left operand of a binary expression.

* Upgrade the client hash to latest

* Remove unnecessary `tcpdump` dependency from `Dockerfile` (#491)

* Ignore gob files (#488)

* Ignore gob files

* Remove `*.db` from `.gitignore`

* Update README (#486)

* Add token validity check (#483)

* Add support to auto discover envoy processes (#459)

* discover envoy pids using cluster ips

* add istio flag to cli + rename mtls flag to istio

* add istio.md to docs

* Fixing typos

* Fix minor typos and grammar in docs
Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* Improving daemon documentation (#457)

* Some changes to the doc (#494)

* Warn pods not starting (#493)

Print warning events related to mizu k8s resources.
In non-daemon mode they are printed to the CLI; in daemon mode, to the API server logs.
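  As a rough illustration of the behavior described above (not the PR's actual code), warnings can be surfaced by listing Kubernetes events and keeping only the `Warning` type; namespace handling, field selectors and wording here are assumptions.

  ```go
  package main

  import (
  	"context"
  	"fmt"

  	corev1 "k8s.io/api/core/v1"
  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  	"k8s.io/client-go/kubernetes"
  )

  // printWarningEvents lists events in the given namespace and prints the ones
  // of type "Warning", roughly what "warn pods not starting" implies.
  func printWarningEvents(ctx context.Context, client kubernetes.Interface, namespace string) error {
  	events, err := client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{})
  	if err != nil {
  		return err
  	}
  	for _, ev := range events.Items {
  		if ev.Type == corev1.EventTypeWarning {
  			fmt.Printf("Warning on %s/%s: %s: %s\n", ev.InvolvedObject.Kind, ev.InvolvedObject.Name, ev.Reason, ev.Message)
  		}
  	}
  	return nil
  }

  func main() {
  	// Building a real clientset needs cluster configuration; omitted in this sketch.
  	fmt.Println("see printWarningEvents above")
  }
  ```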

* Remove `tap/tester/` directory (#489)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Disable IPv4 defragmentation and support IPv6 (#487)

* Remove the extra negation on `nodefrag` flag's value

* Support IPv4 fragmentation and IPv6 at the same time

* Re-enable `nodefrag` flag

* Make the `gRPC` and `HTTP/2` distinction (#492)

* Remove the extra negation on `nodefrag` flag's value

* Support IPv4 fragmentation and IPv6 at the same time

* Set `Method` and `StatusCode` fields correctly for `HTTP/2`

* Replace unnecessary `grpc` naming with `http2`

* Make the `gRPC` and `HTTP/2` distinction

* Fix the macros of `http` extension

* Fix the macros of other protocol extensions

* Update the method signature of `Represent`

* Fix the `HTTP/2` support

* Fix some minor issues

* Upgrade Basenine version from `0.2.10` to `0.2.11`

Sorts macros before expanding them and prioritize the long macros.

* Don't regex split the gRPC method name

* Re-enable `nodefrag` flag

* Remove `SetHostname` method in HTTP extension (#496)

* Remove prevPodPhase (#497)

prevPodPhase does not take into account the fact that there may be more
than one tapper pod. Therefore it is not clear what its value
represents. It is only used in a debug print. It is not worth the effort
to fix for that one debug print.
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* minor logging changes (#499)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Use one channel for events instead of three (#495)

Use one channel for events instead of three separate channels by event type
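  A minimal sketch of the single-channel pattern described above, with illustrative names rather than the repo's: the event carries its own type, so one receive loop replaces three per-type channels.

  ```go
  package main

  import "fmt"

  // eventType distinguishes what used to be three separate channels.
  type eventType int

  const (
  	eventAdded eventType = iota
  	eventModified
  	eventDeleted
  )

  // watchEvent carries the type alongside the payload, so a single channel
  // can replace three per-type channels (names are illustrative).
  type watchEvent struct {
  	Type eventType
  	Name string
  }

  func main() {
  	events := make(chan watchEvent, 3)
  	events <- watchEvent{Type: eventAdded, Name: "pod-a"}
  	events <- watchEvent{Type: eventModified, Name: "pod-a"}
  	events <- watchEvent{Type: eventDeleted, Name: "pod-a"}
  	close(events)

  	// One loop handles everything instead of three competing receives.
  	for ev := range events {
  		switch ev.Type {
  		case eventAdded:
  			fmt.Println("added:", ev.Name)
  		case eventModified:
  			fmt.Println("modified:", ev.Name)
  		case eventDeleted:
  			fmt.Println("deleted:", ev.Name)
  		}
  	}
  }
  ```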

* Add response body to the error in case of failure (#503)

* add response body to the error in case of failure

* fix typo + make inline condition

* Remove local dev instruction from readme (#507)

* Rename `URL` field to `Target URI` in the UI to prevent confusion (#509)

* Add HTTP2 Over Cleartext (H2C) support (#510)

* Add HTTP2 Over Cleartext (H2C) support

* Remove a parameter which is a remnant of debugging

* Hide `Encoding` field if it's `undefined` or empty in the UI (#511)

* Show the `EntryItem` as `EntrySummary` in `EntryDetailed` (#506)

* Fix the selected entry behavior by propagating the `focusedEntryId` through WebSocket (before #452) TRA-3983 (#513)

* Revert the select entry behavior into its original state RACING! (before #452) [TRA-3983 alternative 3]

* Remove the remaining `forceSelect`(s)

* Add a missing `focusedEntryId` prop

* Fix the race condition

* Propagate the `focusedEntryId` through WebSocket to prevent racing

* Handle unexpected socket close and replace the default `rlimit(100)` filter with `leftOff(-1)` filter (#508)

* Handle unexpected socket close and replace the default `rlimit(100)` filter with `leftOff(-1)` filter

* Rename `dontClear` parameter to `resetEntriesBuffer` and remove negation

* Add `Queryable` component to show a green add circle icon for the queryable UI elements (#512)

* Add `Queryable` component to show a green circle and use it in `EntryViewLine`

* Refactor `Queryable` component

* Use the `Queryable` component in `EntryDetailed`

* Use the `Queryable` component in `Summary`

* Instead of passing the style to `Queryable`, pass the children components directly

* Make `useTooltip = true` by default in `Queryable`

* Refactor a lot of styling to achieve using `Queryable` in `Protocol` component

* Migrate the last queryable elements in `EntryListItem` to `Queryable` component

* Fix some of the styling issues

* Make horizontal `Protocol` `Queryable` too

* Remove unnecessary child constants

* Revert some of the changes in 2a93f365f5c815dde16e97ac84a835c2ac9016de

* Fix rest of the styling issues

* Fix one more styling issue

* Update the screenshots and text in the cheatsheet according to the change

* Use `let` not `var`

* Add missing dependencies to the React hook

* Bring back `GetEntries` HTTP endpoint (#515)

* Bring back `GetEntries` HTTP endpoint

* Upgrade Basenine version from `0.2.12` to `0.2.13`

* Accept negative `leftOff` value

* Remove `max`es from the validations

* Make `timeoutMs` optional

* Update the route comment

* Add `EntriesResponse` struct

* Disable telemetry by env var MIZU_DISABLE_TELEMTRY (#517)

* Replace `privileged` with specific CAPABILITIES requests  (#514)

* Fix the styling of `Queryable` under `StatusCode` and `Summary` components (#519)

* Fix the CSS issue in `Queryable` inside `EntryViewLine` (#521)

* TRA-4017 Bring back `getOldEntries` method using fetch API and always start streaming from now (#518)

* Bring back `getOldEntries` method using fetch API

* Determine no more data on top based on `leftOff` value

* Remove `entriesBuffer` state

* Always open WebSocket with some `leftOff` value

* Rename `leftOff` state to `leftOffBottom`

* Don't set the `focusedEntryId` through WebSocket if the WebSocket is closed

* Call `setQueriedCurrent` with addition

* Close WebSocket upon reaching to top

* Open WebSocket upon snapping to bottom

* Close the WebSocket on snap broken event instead

* Set queried current value to zero upon filter submit

* Upgrade `react-scrollable-feed-virtualized` version and use `scrollToIndex` function

* Change the footer text format

* Improve no more data top logic

* Fix `closeWebSocket()` call logic in `onSnapBrokenEvent` and handle `data.meta` being `null` in `getOldEntries`

* Fix the issues around fetching old records

* Clean up `EntriesList.module.sass`

* Decrement initial `leftOffTop` value by `2`

* Fix the order of `incomingEntries` in `getOldEntries`

* Request `leftOffTop - 1` from `fetchEntries`

* Limit the front-end total entries fetched through WebSocket count to `10000`

* Lose the UI performance gain that's provided by #452

* Revert "Fix the selected entry behavior by propagating the `focusedEntryId` through WebSocket (before #452) TRA-3983 (#513)"

This reverts commit 873f2525.

* Fix the issues caused by 09371f14



* Upgrade Basenine version from `0.2.13` to `0.2.14`

* Upgrade Basenine version from `0.2.14` to `0.2.15`

* Fix the condition of "Fetch old records" button visibility

* Upgrade Basenine version from `0.2.15` to `0.2.16` and fix the UI code related to fetching old records

* Make `newEntries` constant

* Add type switch for `Base` field of `MizuEntry` (#520)

* Disable version check for devs (#522)

* Report the platform in telemetry (#523)
Co-authored-by: Igor Gov <igor.govorov1@gmail.com>

* Include milliseconds information into the timestamps in the UI (#524)

* Include milliseconds information into the timestamps in the UI

* Upgrade Basenine version from `0.2.16` to `0.2.17`

* Increase the `width` of timestamp

* Fix the CSS issues in queryable vertical protocol element (#526)

* Remove unnecessary fields and split `service` into `src.name` and `dst.name` (#525)

* Remove unnecessary fields and split `service` into `src.name` and `dst.name`

* Don't fall back to IP address but instead display `[Unresolved]` text

* Fix the CSS issues in the plus icon position and replace the separator `->` text with `SwapHorizIcon`

* make description of mizu config options public (#527)

* Fix the glitch (#529)

* Fix the glitch

* Bring back the functionality to "Fetch old records" and "Snap to bottom" buttons

* Fix the CSS issue in `Queryable` component for `src.name` field on heading mode (#530)

* API server stores tappers status (#531)

* Decreased API server boot time (#536)

* Change the connection status text and the toggle connection behavior (#534)

* Update the "Started listening at" timestamp and `queriedTotal` state based on database truncation (#533)

* Send pod info to tapper (#532)

* Alert on acceptance tests failure (#537)

* Fix health tapper status count (#538)

* Fix: acceptance tests (#539)

* Fix a JavaScript error in case of `null` attribute and an interface conversion error in the API server (#540)

* Bringing back the pod watch api server events to make acceptance test more stable (#541)

* TRA-4060 fix proxying error (#542)

* TRA-4062 remove duplicate target pod print (#543)

* Report pods "isTapped" to FE (#535)

* Fix acceptance tests (after pods status request change) (#545)
Co-authored-by: David Levanon <dvdlevanon@gmail.com>
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: RamiBerm <54766858+RamiBerm@users.noreply.github.com>
Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>
Co-authored-by: RoyUP9 <87927115+RoyUP9@users.noreply.github.com>
Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
Co-authored-by: Alon Girmonsky <1990761+alongir@users.noreply.github.com>
Co-authored-by: Igor Gov <igor.govorov1@gmail.com>
Co-authored-by: Alex Haiut <alex@up9.com>
parent 4badaadc
Showing with 943 additions and 456 deletions
@@ -30,3 +30,15 @@ jobs:
 - name: Test
 run: make acceptance-test
+- name: Slack notification on failure
+uses: ravsamhq/notify-slack-action@v1
+if: always()
+with:
+status: ${{ job.status }}
+notification_title: 'Mizu {workflow} has {status_message}'
+message_format: '{emoji} *{workflow}* {status_message} during <{run_url}|run>, after commit: <{commit_url}|{commit_sha}>'
+footer: 'Linked Repo <{repo_url}|{repo}>'
+notify_when: 'failure'
+env:
+SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
name: Close inactive issues
on:
schedule:
- cron: "0 0 * * *"
jobs:
close-issues:
runs-on: ubuntu-latest
permissions:
issues: write
pull-requests: write
steps:
- uses: actions/stale@v3
with:
days-before-issue-stale: 30
days-before-issue-close: 14
stale-issue-label: "stale"
stale-issue-message: "This issue is stale because it has been open for 30 days with no activity."
close-issue-message: "This issue was closed because it has been inactive for 14 days since being marked as stale."
days-before-pr-stale: -1
days-before-pr-close: -1
repo-token: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
name: Security validation
on:
pull_request:
branches:
- 'develop'
- 'main'
jobs:
security:
name: Check for vulnerabilities
runs-on: ubuntu-latest
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
steps:
- uses: actions/checkout@v2
- uses: snyk/actions/setup@master
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '1.16'
- name: Run snyl on all projects
run: snyk test --all-projects
@@ -15,7 +15,6 @@
 # vendor/
 .idea/
 build
-*.db
 # Mac OS
 .DS_Store
@@ -29,3 +28,10 @@ build
 # pprof
 pprof/*
+# Database Files
+*.bin
+*.gob
+# Nohup Files - https://man7.org/linux/man-pages/man1/nohup.1p.html
+nohup.*
@@ -13,7 +13,7 @@ FROM golang:1.16-alpine AS builder
 # Set necessary environment variables needed for our image.
 ENV CGO_ENABLED=1 GOOS=linux GOARCH=amd64
-RUN apk add libpcap-dev gcc g++ make bash
+RUN apk add libpcap-dev gcc g++ make bash perl-utils
 # Move to agent working directory (/agent-build).
 WORKDIR /app/agent-build
@@ -24,7 +24,7 @@ COPY tap/go.mod ../tap/
 COPY tap/api/go.* ../tap/api/
 RUN go mod download
 # cheap trick to make the build faster (As long as go.mod wasn't changes)
-RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get
+RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' | xargs go get
 ARG COMMIT_HASH
 ARG GIT_BRANCH
@@ -41,16 +41,24 @@ RUN go build -ldflags="-s -w \
 -X 'mizuserver/pkg/version.BuildTimestamp=${BUILD_TIMESTAMP}' \
 -X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
+# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
+ADD https://github.com/up9inc/basenine/releases/download/v0.2.19/basenine_linux_amd64 ./basenine_linux_amd64
+ADD https://github.com/up9inc/basenine/releases/download/v0.2.19/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
+RUN shasum -a 256 -c basenine_linux_amd64.sha256
+RUN chmod +x ./basenine_linux_amd64
 COPY devops/build_extensions.sh ..
 RUN cd .. && /bin/bash build_extensions.sh
 FROM alpine:3.14
-RUN apk add bash libpcap-dev tcpdump
+RUN apk add bash libpcap-dev
 WORKDIR /app
 # Copy binary and config files from /build to root folder of scratch container.
 COPY --from=builder ["/app/agent-build/mizuagent", "."]
+COPY --from=builder ["/app/agent-build/basenine_linux_amd64", "/usr/local/bin/basenine"]
 COPY --from=builder ["/app/agent/build/extensions", "extensions"]
 COPY --from=site-build ["/app/ui-build/build", "site"]
 RUN mkdir /app/data/
......
@@ -4,16 +4,21 @@
 A simple-yet-powerful API traffic viewer for Kubernetes enabling you to view all API communication between microservices to help your debug and troubleshoot regressions.
-Think TCPDump and Chrome Dev Tools combined.
+Think TCPDump and Wireshark re-invented for Kubernetes.
 ![Simple UI](assets/mizu-ui.png)
 ## Features
 - Simple and powerful CLI
-- Real-time view of all HTTP requests, REST and gRPC API calls
-- No installation or code instrumentation
-- Works completely on premises
+- Monitoring network traffic in real-time. Supported protocols:
+- [HTTP/1.1](https://datatracker.ietf.org/doc/html/rfc2616) (REST, etc.)
+- [HTTP/2](https://datatracker.ietf.org/doc/html/rfc7540) (gRPC)
+- [AMQP](https://www.rabbitmq.com/amqp-0-9-1-reference.html) (RabbitMQ, Apache Qpid, etc.)
+- [Apache Kafka](https://kafka.apache.org/protocol)
+- [Redis](https://redis.io/topics/protocol)
+- Works with Kubernetes APIs. No installation or code instrumentation
+- Rich filtering
 ## Requirements
@@ -44,15 +49,6 @@ SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/
 ### Development (unstable) Build
 Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page
-## Kubeconfig & Permissions
-While `mizu`most often works out of the box, you can influence its behavior:
-1. [OPTIONAL] Set `KUBECONFIG` environment variable to your Kubernetes configuration. If this is not set, Mizu assumes that configuration is at `${HOME}/.kube/config`
-2. `mizu` assumes user running the command has permissions to create resources (such as pods, services, namespaces) on your Kubernetes cluster (no worries - `mizu` resources are cleaned up upon termination)
-For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
 ## How to Run
 1. Find pods you'd like to tap to in your Kubernetes cluster
@@ -83,7 +79,7 @@ To tap all pods in current namespace -
 ```
-To tap specific pod -
+### To tap specific pod
 ```bash
 $ kubectl get pods
 NAME READY STATUS RESTARTS AGE
@@ -96,7 +92,7 @@ To tap specific pod -
 ^C
 ```
-To tap multiple pods using regex -
+### To tap multiple pods using regex
 ```bash
 $ kubectl get pods
 NAME READY STATUS RESTARTS AGE
@@ -114,20 +110,21 @@ To tap multiple pods using regex -
 ## Configuration
-Mizu can work with config file which should be stored in ${HOME}/.mizu/config.yaml (macOS: ~/.mizu/config.yaml) <br />
-In case no config file found, defaults will be used <br />
+Mizu can optionally work with a config file that can be provided as a CLI argument (using `--set config-path=<PATH>`) or if not provided, will be stored at ${HOME}/.mizu/config.yaml
 In case of partial configuration defined, all other fields will be used with defaults <br />
 You can always override the defaults or config file with CLI flags
 To get the default config params run `mizu config` <br />
 To generate a new config file with default values use `mizu config -r`
-### Telemetry
-By default, mizu reports usage telemetry. It can be disabled by adding a line of `telemetry: false` in the `${HOME}/.mizu/config.yaml` file
-## Advanced Usage
+## Advanced Usage
+### Kubeconfig
+It is possible to change the kubeconfig path using `KUBECONFIG` environment variable or the command like flag
+with `--set kube-config-path=<PATH>`. </br >
+If both are not set - Mizu assumes that configuration is at `${HOME}/.kube/config`
 ### Namespace-Restricted Mode
@@ -142,6 +139,8 @@ using the `--namespace` flag or by setting `tap.namespaces` in the config file
 Setting `mizu-resources-namespace=mizu` resets Mizu to its default behavior
+For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
 ### User agent filtering
 User-agent filtering (like health checks) - can be configured using command-line options:
@@ -182,23 +181,7 @@ and when changed it will support accessing by IP
 ### Run in daemon mode
-Mizu can be ran detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run indefinitely in the cluster.
-Please note that daemon mode requires you to have RBAC creation permissions, see the [permissions](docs/PERMISSIONS.md) doc for more details.
-In order to access a daemon mizu you will have to run `mizu view` after running the `tap --daemon` command.
-To stop the detached mizu instance and clean all cluster side resources, run `mizu clean`
-## How to Run local UI
-- run from mizu/agent `go run main.go --hars-read --hars-dir <folder>`
-- copy Har files into the folder from last command
-- change `MizuWebsocketURL` and `apiURL` in `api.js` file
-- run from mizu/ui - `npm run start`
-- open browser on `localhost:3000`
+Mizu can be run detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run
+indefinitely in the cluster.
+For more information please refer to [DAEMON MODE](docs/DAEMON_MODE.md)
 test: ## Run acceptance tests.
-@go test ./... -timeout 1h
+@go test ./... -timeout 1h -v
@@ -2,11 +2,12 @@ package acceptanceTests
 import (
 "fmt"
-"gopkg.in/yaml.v3"
 "io/ioutil"
 "os"
 "os/exec"
 "testing"
+"gopkg.in/yaml.v3"
 )
 type tapConfig struct {
......
@@ -3,6 +3,7 @@ module github.com/up9inc/mizu/tests
 go 1.16
 require (
+github.com/gorilla/websocket v1.4.2
 github.com/up9inc/mizu/shared v0.0.0
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
 )
......
@@ -211,6 +211,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
 github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
 github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
 github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@@ -303,6 +304,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
......
This diff is collapsed.
@@ -3,6 +3,7 @@ package acceptanceTests
 import (
 "bytes"
 "encoding/json"
+"errors"
 "fmt"
 "io/ioutil"
 "net/http"
@@ -10,9 +11,12 @@ import (
 "os/exec"
 "path"
 "strings"
+"sync"
 "syscall"
+"testing"
 "time"
+"github.com/gorilla/websocket"
 "github.com/up9inc/mizu/shared"
 )
@@ -24,8 +28,26 @@ const (
 defaultServiceName = "httpbin"
 defaultEntriesCount = 50
 waitAfterTapPodsReady = 3 * time.Second
+cleanCommandTimeout = 1 * time.Minute
 )
type PodDescriptor struct {
Name string
Namespace string
}
func isPodDescriptorInPodArray(pods []map[string]interface{}, podDescriptor PodDescriptor) bool {
for _, pod := range pods {
podNamespace := pod["namespace"].(string)
podName := pod["name"].(string)
if podDescriptor.Namespace == podNamespace && strings.Contains(podName, podDescriptor.Name) {
return true
}
}
return false
}
 func getCliPath() (string, error) {
 dir, filePathErr := os.Getwd()
 if filePathErr != nil {
@@ -59,7 +81,11 @@ func getProxyUrl(namespace string, service string) string {
 }
 func getApiServerUrl(port uint16) string {
-return fmt.Sprintf("http://localhost:%v/mizu", port)
+return fmt.Sprintf("http://localhost:%v", port)
 }
+func getWebSocketUrl(port uint16) string {
+return fmt.Sprintf("ws://localhost:%v/ws", port)
+}
 func getDefaultCommandArgs() []string {
@@ -67,8 +93,9 @@ func getDefaultCommandArgs() []string {
 telemetry := "telemetry=false"
 agentImage := "agent-image=gcr.io/up9-docker-hub/mizu/ci:0.0.0"
 imagePullPolicy := "image-pull-policy=Never"
+headless := "headless=true"
-return []string{setFlag, telemetry, setFlag, agentImage, setFlag, imagePullPolicy}
+return []string{setFlag, telemetry, setFlag, agentImage, setFlag, imagePullPolicy, setFlag, headless}
 }
 func getDefaultTapCommandArgs() []string {
@@ -78,6 +105,10 @@ func getDefaultTapCommandArgs() []string {
 return append([]string{tapCommand}, defaultCmdArgs...)
 }
+func getDefaultTapCommandArgsWithDaemonMode() []string {
+return append(getDefaultTapCommandArgs(), "--daemon")
+}
 func getDefaultTapCommandArgsWithRegex(regex string) []string {
 tapCommand := "tap"
 defaultCmdArgs := getDefaultCommandArgs()
@@ -103,6 +134,20 @@ func getDefaultConfigCommandArgs() []string {
 return append([]string{configCommand}, defaultCmdArgs...)
 }
func getDefaultCleanCommandArgs() []string {
cleanCommand := "clean"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{cleanCommand}, defaultCmdArgs...)
}
func getDefaultViewCommandArgs() []string {
viewCommand := "view"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{viewCommand}, defaultCmdArgs...)
}
 func retriesExecute(retriesCount int, executeFunc func() error) error {
 var lastError interface{}
@@ -195,16 +240,57 @@ func executeHttpGetRequest(url string) (interface{}, error) {
 return executeHttpRequest(response, requestErr)
 }
-func executeHttpPostRequest(url string, body interface{}) (interface{}, error) {
+func executeHttpPostRequestWithHeaders(url string, headers map[string]string, body interface{}) (interface{}, error) {
 requestBody, jsonErr := json.Marshal(body)
 if jsonErr != nil {
 return nil, jsonErr
 }
-response, requestErr := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
+request, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
request.Header.Add("Content-Type", "application/json")
for headerKey, headerValue := range headers {
request.Header.Add(headerKey, headerValue)
}
client := &http.Client{}
response, requestErr := client.Do(request)
 return executeHttpRequest(response, requestErr)
 }
func runMizuClean() error {
cliPath, err := getCliPath()
if err != nil {
return err
}
cleanCmdArgs := getDefaultCleanCommandArgs()
cleanCmd := exec.Command(cliPath, cleanCmdArgs...)
commandDone := make(chan error)
go func() {
if err := cleanCmd.Run(); err != nil {
commandDone <- err
}
commandDone <- nil
}()
select {
case err = <-commandDone:
if err != nil {
return err
}
case <-time.After(cleanCommandTimeout):
return errors.New("clean command timed out")
}
return nil
}
 func cleanupCommand(cmd *exec.Cmd) error {
 if err := cmd.Process.Signal(syscall.SIGQUIT); err != nil {
 return err
@@ -218,11 +304,10 @@ func cleanupCommand(cmd *exec.Cmd) error {
 }
 func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
-tapStatus := tapStatusInterface.(map[string]interface{})
-podsInterface := tapStatus["pods"].([]interface{})
+tapPodsInterface := tapStatusInterface.([]interface{})
 var pods []map[string]interface{}
-for _, podInterface := range podsInterface {
+for _, podInterface := range tapPodsInterface {
 pods = append(pods, podInterface.(map[string]interface{}))
 }
@@ -239,6 +324,87 @@ func getLogsPath() (string, error) {
 return logsPath, nil
 }
func daemonCleanup(t *testing.T, viewCmd *exec.Cmd) {
if err := runMizuClean(); err != nil {
t.Logf("error running mizu clean: %v", err)
}
if err := cleanupCommand(viewCmd); err != nil {
t.Logf("failed to cleanup view command, err: %v", err)
}
}
// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
channel := make(chan struct{})
go func() {
defer close(channel)
wg.Wait()
}()
select {
case <-channel:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}
// checkEntriesAtLeast checks whether the number of entries greater than or equal to n
func checkEntriesAtLeast(entries []map[string]interface{}, n int) error {
if len(entries) < n {
return fmt.Errorf("Unexpected entries result - Expected more than %d entries", n-1)
}
return nil
}
// getDBEntries retrieves the entries from the database before the given timestamp.
// Also limits the results according to the limit parameter.
// Timeout for the WebSocket connection is defined by the timeout parameter.
func getDBEntries(timestamp int64, limit int, timeout time.Duration) (entries []map[string]interface{}, err error) {
query := fmt.Sprintf("timestamp < %d and limit(%d)", timestamp, limit)
webSocketUrl := getWebSocketUrl(defaultApiServerPort)
var connection *websocket.Conn
connection, _, err = websocket.DefaultDialer.Dial(webSocketUrl, nil)
if err != nil {
return
}
defer connection.Close()
handleWSConnection := func(wg *sync.WaitGroup) {
defer wg.Done()
for {
_, message, err := connection.ReadMessage()
if err != nil {
return
}
var data map[string]interface{}
if err = json.Unmarshal([]byte(message), &data); err != nil {
return
}
if data["messageType"] == "entry" {
entries = append(entries, data)
}
}
}
err = connection.WriteMessage(websocket.TextMessage, []byte(query))
if err != nil {
return
}
var wg sync.WaitGroup
go handleWSConnection(&wg)
wg.Add(1)
waitTimeout(&wg, timeout)
return
}
 func Contains(slice []string, containsValue string) bool {
 for _, sliceValue := range slice {
 if sliceValue == containsValue {
......
@@ -3,11 +3,11 @@ module mizuserver
 go 1.16
 require (
+github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b
 github.com/djherbis/atime v1.0.0
-github.com/fsnotify/fsnotify v1.4.9
 github.com/getkin/kin-openapi v0.76.0
 github.com/gin-contrib/static v0.0.1
-github.com/gin-gonic/gin v1.7.2
+github.com/gin-gonic/gin v1.7.7
 github.com/go-playground/locales v0.13.0
 github.com/go-playground/universal-translator v0.17.0
 github.com/go-playground/validator/v10 v10.5.0
@@ -16,13 +16,12 @@
 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
 github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
 github.com/patrickmn/go-cache v2.1.0+incompatible
+github.com/up9inc/basenine/client/go v0.0.0-20211215185650-10083bb9a1b3
 github.com/up9inc/mizu/shared v0.0.0
 github.com/up9inc/mizu/tap v0.0.0
 github.com/up9inc/mizu/tap/api v0.0.0
 github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0
-go.mongodb.org/mongo-driver v1.7.1
-gorm.io/driver/sqlite v1.1.4
-gorm.io/gorm v1.21.8
+golang.org/x/text v0.3.5 // indirect
 k8s.io/api v0.21.2
 k8s.io/apimachinery v0.21.2
 k8s.io/client-go v0.21.2
......
This diff is collapsed.
@@ -6,13 +6,10 @@ import (
 "errors"
 "flag"
 "fmt"
-"github.com/up9inc/mizu/shared/kubernetes"
 "io/ioutil"
-v1 "k8s.io/api/core/v1"
 "mizuserver/pkg/api"
 "mizuserver/pkg/config"
 "mizuserver/pkg/controllers"
-"mizuserver/pkg/database"
 "mizuserver/pkg/models"
 "mizuserver/pkg/providers"
 "mizuserver/pkg/routes"
@@ -20,6 +17,7 @@ import (
 "mizuserver/pkg/utils"
 "net/http"
 "os"
+"os/exec"
 "os/signal"
 "path"
 "path/filepath"
@@ -28,10 +26,15 @@ import (
 "syscall"
 "time"
+"github.com/up9inc/mizu/shared/kubernetes"
+v1 "k8s.io/api/core/v1"
+"github.com/antelman107/net-wait-go/wait"
 "github.com/gin-contrib/static"
 "github.com/gin-gonic/gin"
 "github.com/gorilla/websocket"
 "github.com/op/go-logging"
+basenine "github.com/up9inc/basenine/client/go"
 "github.com/up9inc/mizu/shared"
 "github.com/up9inc/mizu/shared/logger"
 "github.com/up9inc/mizu/tap"
@@ -49,10 +52,12 @@ var harsDir = flag.String("hars-dir", "", "Directory to read hars from")
 var extensions []*tapApi.Extension // global
 var extensionsMap map[string]*tapApi.Extension // global
+var startTime int64
 const (
 socketConnectionRetries = 10
 socketConnectionRetryDelay = time.Second * 2
 socketHandshakeTimeout = time.Second * 2
 )
 func main() {
@@ -89,17 +94,17 @@ func main() {
 panic("API server address must be provided with --api-server-address when using --tap")
 }
+hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
+tapOpts := &tap.TapOpts{HostMode: hostMode}
 tapTargets := getTapTargets()
 if tapTargets != nil {
-tap.SetFilterAuthorities(tapTargets)
-logger.Log.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
+tapOpts.FilterAuthorities = tapTargets
+logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
 }
 filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
 filteringOptions := getTrafficFilteringOptions()
-hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
-tapOpts := &tap.TapOpts{HostMode: hostMode}
 tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
 socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
 if err != nil {
@@ -109,7 +114,8 @@ func main() {
 go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
 } else if *apiServerMode {
-database.InitDataBase(config.Config.AgentDatabasePath)
+startBasenineServer(shared.BasenineHost, shared.BaseninePort)
+startTime = time.Now().UnixNano() / int64(time.Millisecond)
 api.StartResolving(*namespace)
 outputItemsChannel := make(chan *tapApi.OutputChannelItem)
@@ -142,6 +148,53 @@ func main() {
 logger.Log.Info("Exiting")
 }
func startBasenineServer(host string, port string) {
cmd := exec.Command("basenine", "-addr", host, "-port", port, "-persistent")
cmd.Dir = config.Config.AgentDatabasePath
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Start()
if err != nil {
logger.Log.Panicf("Failed starting Basenine: %v", err)
}
if !wait.New(
wait.WithProto("tcp"),
wait.WithWait(200*time.Millisecond),
wait.WithBreak(50*time.Millisecond),
wait.WithDeadline(5*time.Second),
wait.WithDebug(true),
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
logger.Log.Panicf("Basenine is not available: %v", err)
}
// Make a channel to gracefully exit Basenine.
channel := make(chan os.Signal)
signal.Notify(channel, os.Interrupt, syscall.SIGTERM)
// Handle the channel.
go func() {
<-channel
cmd.Process.Signal(syscall.SIGTERM)
}()
// Limit the database size to default 200MB
err = basenine.Limit(host, port, config.Config.MaxDBSizeBytes)
if err != nil {
logger.Log.Panicf("Error while limiting database size: %v", err)
}
for _, extension := range extensions {
macros := extension.Dissector.Macros()
for macro, expanded := range macros {
err = basenine.Macro(host, port, macro, expanded)
if err != nil {
logger.Log.Panicf("Error while adding a macro: %v", err)
}
}
}
}
 func loadExtensions() {
 dir, _ := filepath.Abs(filepath.Dir(os.Args[0]))
 extensionsDir := path.Join(dir, "./extensions/")
@@ -154,7 +207,7 @@ func loadExtensions() {
 extensionsMap = make(map[string]*tapApi.Extension)
 for i, file := range files {
 filename := file.Name()
-logger.Log.Infof("Loading extension: %s\n", filename)
+logger.Log.Infof("Loading extension: %s", filename)
 extension := &tapApi.Extension{
 Path: path.Join(extensionsDir, filename),
 }
@@ -166,7 +219,7 @@ func loadExtensions() {
 var ok bool
 dissector, ok = symDissector.(tapApi.Dissector)
 if err != nil || !ok {
-panic(fmt.Sprintf("Failed to load the extension: %s\n", extension.Path))
+panic(fmt.Sprintf("Failed to load the extension: %s", extension.Path))
 }
 dissector.Register(extension)
 extension.Dissector = dissector
@@ -179,7 +232,7 @@ func loadExtensions() {
 })
 for _, extension := range extensions {
-logger.Log.Infof("Extension Properties: %+v\n", extension)
+logger.Log.Infof("Extension Properties: %+v", extension)
 }
 controllers.InitExtensionsMap(extensionsMap)
@@ -200,7 +253,8 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {
 app.Use(static.ServeRoot("/", "./site"))
 app.Use(CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
-api.WebSocketRoutes(app, &eventHandlers)
+api.WebSocketRoutes(app, &eventHandlers, startTime)
+routes.QueryRoutes(app)
 routes.EntriesRoutes(app)
 routes.MetadataRoutes(app)
 routes.StatusRoutes(app)
...@@ -210,7 +264,12 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { ...@@ -210,7 +264,12 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
if _, err := startMizuTapperSyncer(ctx); err != nil { kubernetesProvider, err := kubernetes.NewProviderInCluster()
if err != nil {
logger.Log.Fatalf("error creating k8s provider: %+v", err)
}
if _, err := startMizuTapperSyncer(ctx, kubernetesProvider); err != nil {
logger.Log.Fatalf("error initializing tapper syncer: %+v", err) logger.Log.Fatalf("error initializing tapper syncer: %+v", err)
} }
} }
@@ -245,8 +304,8 @@ func CORSMiddleware() gin.HandlerFunc {
}
}
-func parseEnvVar(env string) map[string][]string {
func parseEnvVar(env string) map[string][]v1.Pod {
-var mapOfList map[string][]string
var mapOfList map[string][]v1.Pod
val, present := os.LookupEnv(env)
@@ -256,12 +315,12 @@ func parseEnvVar(env string) map[string][]string {
err := json.Unmarshal([]byte(val), &mapOfList)
if err != nil {
-panic(fmt.Sprintf("env var %s's value of %s is invalid! must be map[string][]string %v", env, mapOfList, err))
panic(fmt.Sprintf("env var %s's value of %v is invalid! must be map[string][]v1.Pod %v", env, mapOfList, err))
}
return mapOfList
}
-func getTapTargets() []string {
func getTapTargets() []v1.Pod {
nodeName := os.Getenv(shared.NodeNameEnvVar)
tappedAddressesPerNodeDict := parseEnvVar(shared.TappedAddressesPerNodeDictEnvVar)
return tappedAddressesPerNodeDict[nodeName]
@@ -344,10 +403,11 @@ func getSyncEntriesConfig() *shared.SyncEntriesConfig {
}
func determineLogLevel() (logLevel logging.Level) {
-logLevel = logging.INFO
logLevel, err := logging.LogLevel(os.Getenv(shared.LogLevelEnvVar))
-if os.Getenv(shared.DebugModeEnvVar) == "1" {
if err != nil {
-logLevel = logging.DEBUG
logLevel = logging.INFO
}
return
}
@@ -361,7 +421,7 @@ func dialSocketWithRetry(socketAddress string, retryAmount int, retryDelay time.
socketConnection, _, err := dialer.Dial(socketAddress, nil)
if err != nil {
if i < retryAmount {
-logger.Log.Infof("socket connection to %s failed: %v, retrying %d out of %d in %d seconds...", socketAddress, err, i, retryAmount, retryDelay / time.Second)
logger.Log.Infof("socket connection to %s failed: %v, retrying %d out of %d in %d seconds...", socketAddress, err, i, retryAmount, retryDelay/time.Second)
time.Sleep(retryDelay)
}
} else {
@@ -371,13 +431,7 @@ func dialSocketWithRetry(socketAddress string, retryAmount int, retryDelay time.
return nil, lastErr
}
func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) (*kubernetes.MizuTapperSyncer, error) {
-func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, error){
-provider, err := kubernetes.NewProviderInCluster()
-if err != nil {
-return nil, err
-}
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
TargetNamespaces: config.Config.TargetNamespaces,
PodFilterRegex: config.Config.TapTargetRegex.Regexp,
@@ -385,11 +439,12 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
AgentImage: config.Config.AgentImage,
TapperResources: config.Config.TapperResources,
ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy),
-DumpLogs: config.Config.DumpLogs,
LogLevel: config.Config.LogLevel,
IgnoredUserAgents: config.Config.IgnoredUserAgents,
MizuApiFilteringOptions: config.Config.MizuApiFilteringOptions,
MizuServiceAccountExists: true, //assume service account exists since daemon mode will not function without it anyway
-})
Istio: config.Config.Istio,
}, time.Now())
if err != nil {
return nil, err
@@ -405,19 +460,31 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
return
}
logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr)
-case _, ok := <-tapperSyncer.TapPodChangesOut:
case tapPodChangeEvent, ok := <-tapperSyncer.TapPodChangesOut:
if !ok {
logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop")
return
}
-tapStatus := shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)}
providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)}
tappedPodsStatus := utils.GetTappedPodsStatus()
-serializedTapStatus, err := json.Marshal(shared.CreateWebSocketStatusMessage(tapStatus))
serializedTapStatus, err := json.Marshal(shared.CreateWebSocketStatusMessage(tappedPodsStatus))
if err != nil {
logger.Log.Fatalf("error serializing tap status: %v", err)
}
api.BroadcastToBrowserClients(serializedTapStatus)
-providers.TapStatus.Pods = tapStatus.Pods
providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
if !ok {
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
return
}
if providers.TappersStatus == nil {
providers.TappersStatus = make(map[string]shared.TapperStatus)
}
providers.TappersStatus[tapperStatus.NodeName] = tapperStatus
case <-ctx.Done():
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
return
...
@@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
-"mizuserver/pkg/database"
"mizuserver/pkg/holder"
"mizuserver/pkg/providers"
"os"
@@ -14,15 +13,16 @@ import (
"strings"
"time"
-"go.mongodb.org/mongo-driver/bson/primitive"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/models"
"mizuserver/pkg/resolver"
"mizuserver/pkg/utils"
basenine "github.com/up9inc/basenine/client/go"
)
var k8sResolver *resolver.Resolver
@@ -76,7 +76,7 @@ func startReadingFiles(workingDir string) {
sort.Sort(utils.ByModTime(harFiles))
if len(harFiles) == 0 {
-logger.Log.Infof("Waiting for new files\n")
logger.Log.Infof("Waiting for new files")
time.Sleep(3 * time.Second)
continue
}
@@ -99,11 +99,17 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
panic("Channel of captured messages is nil")
}
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
panic(err)
}
connection.InsertMode()
disableOASValidation := false
ctx := context.Background()
doc, contractContent, router, err := loadOAS(ctx)
if err != nil {
-logger.Log.Infof("Disabled OAS validation: %s\n", err.Error())
logger.Log.Infof("Disabled OAS validation: %s", err.Error())
disableOASValidation = true
}
@@ -112,13 +118,13 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
-mizuEntry := extension.Dissector.Analyze(item, primitive.NewObjectID().Hex(), resolvedSource, resolvedDestionation)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation)
baseEntry := extension.Dissector.Summarize(mizuEntry)
-mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry)
mizuEntry.Base = baseEntry
if extension.Protocol.Name == "http" {
if !disableOASValidation {
var httpPair tapApi.HTTPRequestResponsePair
-json.Unmarshal([]byte(mizuEntry.Entry), &httpPair)
json.Unmarshal([]byte(mizuEntry.HTTPPair), &httpPair)
contract := handleOAS(ctx, doc, router, httpPair.Request.Payload.RawRequest, httpPair.Response.Payload.RawResponse, contractContent)
baseEntry.ContractStatus = contract.Status
@@ -128,18 +134,18 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
mizuEntry.ContractContent = contract.Content
}
-var pair tapApi.RequestResponsePair
harEntry, err := utils.NewEntry(mizuEntry.Request, mizuEntry.Response, mizuEntry.StartTime, mizuEntry.ElapsedTime)
-json.Unmarshal([]byte(mizuEntry.Entry), &pair)
-harEntry, err := utils.NewEntry(&pair)
if err == nil {
-rules, _, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
rules, _, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Destination.Name)
baseEntry.Rules = rules
}
}
-database.CreateEntry(mizuEntry)
-baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(baseEntry)
data, err := json.Marshal(mizuEntry)
-BroadcastToBrowserClients(baseEntryBytes)
if err != nil {
panic(err)
}
connection.SendText(string(data))
}
}
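For readers unfamiliar with the Basenine client, here is a minimal, standalone sketch of the insert path used by `startReadingChannel` above: open a connection, switch it to insert mode, and stream each record as a single JSON document. The endpoint and the record fields are illustrative assumptions, not the agent's actual `MizuEntry` schema.

```go
package main

import (
	"encoding/json"

	basenine "github.com/up9inc/basenine/client/go"
)

func main() {
	// Assumed endpoint; the agent uses shared.BasenineHost and shared.BaseninePort.
	connection, err := basenine.NewConnection("localhost", "9099")
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	// Put the connection into insert mode, as startReadingChannel does above.
	connection.InsertMode()

	// Insert one JSON document; these fields are illustrative, not the MizuEntry schema.
	record := map[string]interface{}{"protocol": "http", "status": 200}
	data, err := json.Marshal(record)
	if err != nil {
		panic(err)
	}
	connection.SendText(string(data))
}
```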
@@ -148,7 +154,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
-logger.Log.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
@@ -156,7 +162,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
-logger.Log.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
@@ -171,21 +177,3 @@ func CheckIsServiceIP(address string) bool {
}
return k8sResolver.CheckIsServiceIP(address)
}
-// gives a rough estimate of the size this will take up in the db, good enough for maintaining db size limit accurately
-func getEstimatedEntrySizeBytes(mizuEntry *tapApi.MizuEntry) int {
-sizeBytes := len(mizuEntry.Entry)
-sizeBytes += len(mizuEntry.EntryId)
-sizeBytes += len(mizuEntry.Service)
-sizeBytes += len(mizuEntry.Url)
-sizeBytes += len(mizuEntry.Method)
-sizeBytes += len(mizuEntry.RequestSenderIp)
-sizeBytes += len(mizuEntry.ResolvedDestination)
-sizeBytes += len(mizuEntry.ResolvedSource)
-sizeBytes += 8 // Status bytes (sqlite integer is always 8 bytes)
-sizeBytes += 8 // Timestamp bytes
-sizeBytes += 8 // SizeBytes bytes
-sizeBytes += 1 // IsOutgoing bytes
-return sizeBytes
-}
package api
import (
"encoding/json"
"errors"
"fmt"
"mizuserver/pkg/models"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
)
@@ -39,17 +44,17 @@ func init() {
connectedWebsockets = make(map[int]*SocketConnection, 0)
}
-func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) {
app.GET("/ws", func(c *gin.Context) {
-websocketHandler(c.Writer, c.Request, eventHandlers, false)
websocketHandler(c.Writer, c.Request, eventHandlers, false, startTime)
})
app.GET("/wsTapper", func(c *gin.Context) {
-websocketHandler(c.Writer, c.Request, eventHandlers, true)
websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime)
})
}
-func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) {
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool, startTime int64) {
-conn, err := websocketUpgrader.Upgrade(w, r, nil)
ws, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
logger.Log.Errorf("Failed to set websocket upgrade: %v", err)
return
@@ -59,30 +64,117 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
connectedWebsocketIdCounter++
socketId := connectedWebsocketIdCounter
-connectedWebsockets[socketId] = &SocketConnection{connection: conn, lock: &sync.Mutex{}, eventHandlers: eventHandlers, isTapper: isTapper}
connectedWebsockets[socketId] = &SocketConnection{connection: ws, lock: &sync.Mutex{}, eventHandlers: eventHandlers, isTapper: isTapper}
websocketIdsLock.Unlock()
var connection *basenine.Connection
var isQuerySet bool
// `!isTapper` means it's a connection from the web UI
if !isTapper {
connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
panic(err)
}
}
data := make(chan []byte)
meta := make(chan []byte)
defer func() {
data <- []byte(basenine.CloseChannel)
meta <- []byte(basenine.CloseChannel)
connection.Close()
socketCleanup(socketId, connectedWebsockets[socketId])
}()
eventHandlers.WebSocketConnect(socketId, isTapper)
startTimeBytes, _ := models.CreateWebsocketStartTimeMessage(startTime)
SendToSocket(socketId, startTimeBytes)
for {
-_, msg, err := conn.ReadMessage()
_, msg, err := ws.ReadMessage()
if err != nil {
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
break
}
-eventHandlers.WebSocketMessage(socketId, msg)
if !isTapper && !isQuerySet {
query := string(msg)
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
if err != nil {
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
Type: "error",
AutoClose: 5000,
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
})
SendToSocket(socketId, toastBytes)
break
}
isQuerySet = true
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
for {
bytes := <-data
if string(bytes) == basenine.CloseChannel {
return
}
var dataMap map[string]interface{}
err = json.Unmarshal(bytes, &dataMap)
var base map[string]interface{}
switch dataMap["base"].(type) {
case map[string]interface{}:
base = dataMap["base"].(map[string]interface{})
base["id"] = uint(dataMap["id"].(float64))
default:
logger.Log.Debugf("Base field has an unrecognized type: %+v", dataMap)
continue
}
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(base)
SendToSocket(socketId, baseEntryBytes)
}
}
handleMetaChannel := func(c *basenine.Connection, meta chan []byte) {
for {
bytes := <-meta
if string(bytes) == basenine.CloseChannel {
return
}
var metadata *basenine.Metadata
err = json.Unmarshal(bytes, &metadata)
if err != nil {
logger.Log.Debugf("Error recieving metadata: %v", err.Error())
}
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
SendToSocket(socketId, metadataBytes)
}
}
go handleDataChannel(connection, data)
go handleMetaChannel(connection, meta)
connection.Query(query, data, meta)
} else {
eventHandlers.WebSocketMessage(socketId, msg)
}
}
}
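The handler above validates the query first, then streams results over two channels until a `CloseChannel` sentinel arrives. The sketch below reproduces that flow in isolation with the same client calls (`basenine.Validate`, `NewConnection`, `Query`); the endpoint and the query string are illustrative assumptions.

```go
package main

import (
	"fmt"
	"time"

	basenine "github.com/up9inc/basenine/client/go"
)

func main() {
	host, port := "localhost", "9099" // assumed endpoint
	query := "http"                   // illustrative query; real queries come from the web UI

	// Reject malformed queries up front, as the handler above does before streaming.
	if err := basenine.Validate(host, port, query); err != nil {
		fmt.Println("syntax error:", err)
		return
	}

	connection, err := basenine.NewConnection(host, port)
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	data := make(chan []byte)
	meta := make(chan []byte)

	// Drain both channels until the CloseChannel sentinel arrives, mirroring the goroutines above.
	go func() {
		for b := range data {
			if string(b) == basenine.CloseChannel {
				return
			}
			fmt.Println("record:", string(b))
		}
	}()
	go func() {
		for b := range meta {
			if string(b) == basenine.CloseChannel {
				return
			}
			fmt.Println("metadata:", string(b))
		}
	}()

	connection.Query(query, data, meta)
	time.Sleep(10 * time.Second) // a real caller ties this to its own connection lifecycle
}
```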
func socketCleanup(socketId int, socketConnection *SocketConnection) {
err := socketConnection.connection.Close()
if err != nil {
-logger.Log.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
logger.Log.Errorf("Error closing socket connection for socket id %d: %v", socketId, err)
}
websocketIdsLock.Lock()
...
@@ -65,14 +65,14 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var socketMessageBase shared.WebSocketMessageMetadata
err := json.Unmarshal(message, &socketMessageBase)
if err != nil {
-logger.Log.Infof("Could not unmarshal websocket message %v\n", err)
logger.Log.Infof("Could not unmarshal websocket message %v", err)
} else {
switch socketMessageBase.MessageType {
case shared.WebSocketMessageTypeTappedEntry:
var tappedEntryMessage models.WebSocketTappedEntryMessage
err := json.Unmarshal(message, &tappedEntryMessage)
if err != nil {
-logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
// NOTE: This is where the message comes back from the intermediate WebSocket to code.
h.SocketOutChannel <- tappedEntryMessage.Data
@@ -81,16 +81,15 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var statusMessage shared.WebSocketStatusMessage
err := json.Unmarshal(message, &statusMessage)
if err != nil {
-logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
-providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
BroadcastToBrowserClients(message)
}
case shared.WebsocketMessageTypeOutboundLink:
var outboundLinkMessage models.WebsocketOutboundLinkMessage
err := json.Unmarshal(message, &outboundLinkMessage)
if err != nil {
-logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
handleTLSLink(outboundLinkMessage)
}
...
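The handlers above use a two-step decode: first unmarshal only the message-type discriminator, then unmarshal the full payload into the concrete type selected by the switch. The sketch below shows the same pattern with hypothetical local types; the field names and JSON tags are assumptions, not the actual `shared` message definitions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for the shared message types; names and tags are assumptions.
type messageMetadata struct {
	MessageType string `json:"messageType"`
}

type statusMessage struct {
	MessageType string      `json:"messageType"`
	Status      interface{} `json:"status"`
}

func main() {
	raw := []byte(`{"messageType":"status","status":{"pods":[]}}`)

	// First pass: read only the type discriminator.
	var meta messageMetadata
	if err := json.Unmarshal(raw, &meta); err != nil {
		fmt.Println("could not unmarshal websocket message:", err)
		return
	}

	// Second pass: unmarshal into the concrete type selected by MessageType.
	switch meta.MessageType {
	case "status":
		var msg statusMessage
		if err := json.Unmarshal(raw, &msg); err != nil {
			fmt.Println("could not unmarshal status message:", err)
			return
		}
		fmt.Printf("status update: %+v\n", msg.Status)
	default:
		fmt.Println("unhandled message type:", meta.MessageType)
	}
}
```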
@@ -2,14 +2,19 @@ package controllers
import (
"encoding/json"
-"fmt"
-"github.com/gin-gonic/gin"
-tapApi "github.com/up9inc/mizu/tap/api"
-"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"strconv"
"time"
"github.com/gin-gonic/gin"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)
var extensionsMap map[string]*tapApi.Extension // global
@@ -18,78 +23,113 @@ func InitExtensionsMap(ref map[string]*tapApi.Extension) {
extensionsMap = ref
}
func Error(c *gin.Context, err error) bool {
if err != nil {
logger.Log.Errorf("Error getting entry: %v", err)
c.Error(err)
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": err.Error(),
})
return true // signal that there was an error and the caller should return
}
return false // no error, can continue
}
func GetEntries(c *gin.Context) {
-entriesFilter := &models.EntriesFilter{}
entriesRequest := &models.EntriesRequest{}
-if err := c.BindQuery(entriesFilter); err != nil {
if err := c.BindQuery(entriesRequest); err != nil {
c.JSON(http.StatusBadRequest, err)
}
-err := validation.Validate(entriesFilter)
validationError := validation.Validate(entriesRequest)
-if err != nil {
if validationError != nil {
-c.JSON(http.StatusBadRequest, err)
c.JSON(http.StatusBadRequest, validationError)
}
-order := database.OperatorToOrderMapping[entriesFilter.Operator]
if entriesRequest.TimeoutMs == 0 {
-operatorSymbol := database.OperatorToSymbolMapping[entriesFilter.Operator]
entriesRequest.TimeoutMs = 3000
-var entries []tapApi.MizuEntry
-database.GetEntriesTable().
-Order(fmt.Sprintf("timestamp %s", order)).
-Where(fmt.Sprintf("timestamp %s %v", operatorSymbol, entriesFilter.Timestamp)).
-Limit(entriesFilter.Limit).
-Find(&entries)
-if len(entries) > 0 && order == database.OrderDesc {
-// the entries always order from oldest to newest - we should reverse
-utils.ReverseSlice(entries)
}
-baseEntries := make([]tapApi.BaseEntryDetails, 0)
data, meta, err := basenine.Fetch(shared.BasenineHost, shared.BaseninePort,
-for _, entry := range entries {
entriesRequest.LeftOff, entriesRequest.Direction, entriesRequest.Query,
-baseEntryDetails := tapApi.BaseEntryDetails{}
entriesRequest.Limit, time.Duration(entriesRequest.TimeoutMs)*time.Millisecond)
-if err := models.GetEntry(&entry, &baseEntryDetails); err != nil {
if err != nil {
-continue
c.JSON(http.StatusInternalServerError, validationError)
}
-var pair tapApi.RequestResponsePair
response := &models.EntriesResponse{}
-json.Unmarshal([]byte(entry.Entry), &pair)
var dataSlice []interface{}
-harEntry, err := utils.NewEntry(&pair)
-if err == nil {
for _, row := range data {
-rules, _, _ := models.RunValidationRulesState(*harEntry, entry.Service)
var dataMap map[string]interface{}
-baseEntryDetails.Rules = rules
err = json.Unmarshal(row, &dataMap)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": string(row),
})
return // exit
}
-baseEntries = append(baseEntries, baseEntryDetails)
base := dataMap["base"].(map[string]interface{})
base["id"] = uint(dataMap["id"].(float64))
dataSlice = append(dataSlice, base)
}
var metadata *basenine.Metadata
err = json.Unmarshal(meta, &metadata)
if err != nil {
logger.Log.Debugf("Error recieving metadata: %v", err.Error())
} }
-c.JSON(http.StatusOK, baseEntries)
response.Data = dataSlice
response.Meta = metadata
c.JSON(http.StatusOK, response)
}
func GetEntry(c *gin.Context) {
-var entryData tapApi.MizuEntry
id, _ := strconv.Atoi(c.Param("id"))
-database.GetEntriesTable().
var entry tapApi.MizuEntry
-Where(map[string]string{"entryId": c.Param("entryId")}).
bytes, err := basenine.Single(shared.BasenineHost, shared.BaseninePort, id)
-First(&entryData)
if Error(c, err) {
return // exit
}
err = json.Unmarshal(bytes, &entry)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": string(bytes),
})
return // exit
}
-extension := extensionsMap[entryData.ProtocolName]
extension := extensionsMap[entry.Protocol.Name]
-protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response)
var rules []map[string]interface{}
var isRulesEnabled bool
-if entryData.ProtocolName == "http" {
if entry.Protocol.Name == "http" {
-var pair tapApi.RequestResponsePair
harEntry, _ := utils.NewEntry(entry.Request, entry.Response, entry.StartTime, entry.ElapsedTime)
-json.Unmarshal([]byte(entryData.Entry), &pair)
_, rulesMatched, _isRulesEnabled := models.RunValidationRulesState(*harEntry, entry.Destination.Name)
-harEntry, _ := utils.NewEntry(&pair)
-_, rulesMatched, _isRulesEnabled := models.RunValidationRulesState(*harEntry, entryData.Service)
isRulesEnabled = _isRulesEnabled
inrec, _ := json.Marshal(rulesMatched)
json.Unmarshal(inrec, &rules)
}
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
-Protocol: protocol,
Protocol: entry.Protocol,
Representation: string(representation),
BodySize: bodySize,
-Data: entryData,
Data: entry,
Rules: rules,
IsRulesEnabled: isRulesEnabled,
})
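Alongside the windowed `basenine.Fetch` used by `GetEntries`, single records are addressed by their numeric id, as `GetEntry` does above with `basenine.Single`. A minimal sketch, assuming a Basenine instance on an illustrative local endpoint:

```go
package main

import (
	"encoding/json"
	"fmt"

	basenine "github.com/up9inc/basenine/client/go"
)

func main() {
	host, port := "localhost", "9099" // assumed endpoint

	// Retrieve one record by its numeric id, as GetEntry does above.
	bytes, err := basenine.Single(host, port, 42)
	if err != nil {
		panic(err)
	}

	// Decode generically here instead of into tapApi.MizuEntry.
	var entry map[string]interface{}
	if err := json.Unmarshal(bytes, &entry); err != nil {
		panic(err)
	}
	fmt.Printf("entry 42: %+v\n", entry)
}
```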
...
package controllers
import (
"net/http"
"github.com/gin-gonic/gin"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/shared"
)
type ValidateResponse struct {
Valid bool `json:"valid"`
Message string `json:"message"`
}
func PostValidate(c *gin.Context) {
query := c.PostForm("query")
valid := true
message := ""
err := basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
if err != nil {
valid = false
message = err.Error()
}
c.JSON(http.StatusOK, ValidateResponse{
Valid: valid,
Message: message,
})
}
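One possible way to exercise this handler end to end is to mount it on a Gin engine and post a form-encoded `query` field. The route path and the controllers import path below are assumptions (the actual path is registered by `routes.QueryRoutes`, which is not shown here), and a reachable Basenine instance is needed for `basenine.Validate` to give a meaningful answer.

```go
package main

import (
	"fmt"
	"net/http/httptest"
	"net/url"
	"strings"

	"github.com/gin-gonic/gin"

	"mizuserver/pkg/controllers" // assumed import path, matching the other mizuserver packages above
)

func main() {
	app := gin.Default()
	// Hypothetical path; the real one is defined by routes.QueryRoutes.
	app.POST("/query/validate", controllers.PostValidate)

	form := url.Values{}
	form.Set("query", "http and response.status == 500") // illustrative query string

	req := httptest.NewRequest("POST", "/query/validate", strings.NewReader(form.Encode()))
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	rec := httptest.NewRecorder()
	app.ServeHTTP(rec, req)

	// With no Basenine running, the response simply reports the query as invalid.
	fmt.Println(rec.Code, rec.Body.String())
}
```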