Unverified Commit e6968512 authored by David Levanon's avatar David Levanon Committed by GitHub
Browse files

Tapper Refactor (#396)

* introduce tcp_assembler and tcp_packet_source - the motivation is to … (#380)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extensions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester
...
parent 4e50e17d
develop Fix_images_imports Remove-the-using-of-leftOff-from-websocket TRA-4276_Folder_structure_refactor TRA-4579_Fix_acceptance_test_of_right_panel_checkboxes TRA-4602_Timeline_bars_to_traffic_statistics TRA-4612_Traffic_stats_add_time_range_filter UI/Service-map-GUI-improvements UI/fix/general-gui-fixes UI/servicemap-dynamic-protocol-filters Update_material_ui_to_v5 acceptance_test_by_demand alongir-patch-1 alongir-patch-2 bug/TRA-3831_fix_negative_body bug/TRA-4387_debugging bug/TRA-4387_fix_tapping_status bug/ui/TRA-4169_apply_qury_by_enter bug/ui/TRA-4176_plus_icon_is_not_visible bug/ui/TRA-4437_line_numbers_checkbox_not_always_working bug/ui/TRA-4513_grpc_heading_overlap_request_size- chore/ignore-gob chore/makefile-lint-rule chore/remove-tap-tester ci/lint ci_improvments codegen-placeholders common_migration daemon_docs debug/disable_redaction debug/oas_no_reset debug/profile debug/profile_nimrod debug/profile_nimrod_stable debug/profile_tmp debug/reading-channel debug/triggering_tcp_kprobe_with_go debug/verbose_watch_logs dep_update edit-docs example-limiting exp/oas-gen-2 exp/oas-gen-develop feat/acceptance-test-image-caching feat/accumulative-stats feat/accumulative-stats-with-time-frame feat/add-nginx feat/add-option-to-pass-context feat/add-protocols-to-the-endpoint feat/add-timing-endpoint-for-stats feat/afpacket feat/amqp-req-res-matcher feat/basenine-mongodb feat/basenine-mongodb-mess feat/basenine-mongodb-roee feat/basenine-separate-container feat/basenine-xml-helper feat/bfl-syntax-highlighting feat/capture-source-indicator feat/change-db feat/cypress-10-migration feat/derive-summary-method-fields feat/docker-from-arm64-to-amd64 feat/ebpf-arm64 feat/ebpf-go-abi0 feat/entry-item-as-summary feat/fetch-before-anything-else feat/filter-selected-entries feat/flush-reset feat/golang-tls feat/graphql feat/improve-go-tls-address-availability feat/indexing feat/insertion-filter feat/install_helm_rename feat/log-total-active-tcp-streams-and-goroutines 
feat/merge-stats-endpoints-and-add-auto-interval feat/namespace-field-ui feat/oas-better-clustering feat/oas-counters feat/oas-form-posts feat/oas-improvements feat/oas-parameter-patterns feat/path-segments feat/query-fetch-first feat/reclustering feat/reply-http-endpoint feat/request-response-size feat/socket-full-entries feat/split-service-src-dst-name feat/tapper-capabilities feat/ui-paging-mode feat/ui-paging-mode-3 feat/upgrade-basenine-json-redact-helpers feature/TRA-3821_k8s_version_check feature/TRA-3842_daemon_mode feature/TRA-3842_daemon_mode1 feature/TRA-3842_daemon_tests feature/TRA-3850_mizu_clean_command feature/TRA-3860_move_config_to_configmap feature/TRA-3868_move_watches_to_shared feature/TRA-3903_daemon_followup feature/TRA-3903_fix_daemon_mode_ns_restricted feature/TRA-3991/use-caps-instead-of-priv feature/TRA-4065_support_inflight_tap_target_update feature/TRA-4075_integrate_user_management feature/TRA-4075_notify_first_login feature/TRA-4077_login_design feature/TRA-4090_get_standalone_config feature/TRA-4120_consistent_tap_config feature/TRA-4151_kubernetes_provider_singleton feature/TRA-4167_mizu_post_install_check feature/TRA-4190_telemetry_execution_time feature/TRA-4192_tag_traffic_namespace feature/TRA-4197_PCAP_tests feature/TRA-4202_role_management feature/TRA-4203_user_management_be feature/TRA-4211_workspaces_be feature/TRA-4215_post_install_proxy_check feature/TRA-4239_pre_check feature/TRA-4259_service_map_ui_improvement feature/TRA-4263_extensibility_refactor feature/TRA-4263_partial_revert feature/TRA-4266_rabbitmq_acceptance_test feature/TRA-4320_common_entries_controller_code feature/TRA-4323_depedency_injection_oas_servicemap feature/TRA-4355_cleanup_install feature/TRA-4365_Support_multiple_workspaces feature/TRA-4442_UI_performance feature/TRA-4602_Traffic_statistics feature/TRA-4622_Remove_rules_feature_UI feature/add-tls-tapper feature/allow-port-forward feature/allow_custom_resources feature/auto-discover-envoy-processes 
feature/build-push-to-docker-hub feature/change_logger feature/change_redact_to_opt_in feature/changelog feature/close-finished-live-streams feature/close-gopacket-conn-immediately feature/david-poc feature/elasticsearch feature/fe_report_is_tapped feature/fix-service-mesh feature/fix-tls-missing-address feature/fix-tls-not-listening feature/improve_tls_info_with_kprobes feature/limit-fd-to-address-map feature/log-fixes feature/multiarch_build feature/multiarch_build_static feature/rabbitmq_test feature/redis_test feature/redis_test_backup feature/remove_contract feature/remove_duplicate_data feature/remove_redundant_field feature/remove_rules feature/reorginize-some-tls-code-after-refactor feature/service-mesh-badge feature/service_mapping feature/set-pcap-bpf-filter feature/stop-tapping-self-tapper-traffic feature/support-linkerd feature/support-listen-to-multiple-netns feature/support-tcp-keepalive feature/support-tls-big-buffers feature/switch_debug_mode_with_log_level feature/tap_telemetry feature/throttling feature/throttling-by-live-streams feature/tls-ebpf-error-handling feature/ui/Mizu_enterprise_preparing feature/ui/Mizu_enterprise_ui_bugs feature/ui/Mizu_support_node16 feature/ui/TRA-4089_Mizu_ent_frame feature/ui/TRA-4122_OAS_dialog feature/ui/TRA-4122_OAS_dialog_2 feature/ui/TRA-4133_Mizu_build feature/ui/TRA-4159_Mizu_state_management feature/ui/TRA-4192_workspace_management feature/ui/TRA-4205_Mizu_ent_react_router feature/ui/TRA-4321_remove_reset_button feature/ui/TRA-4519_oas_searchable_dropdown feature/ui/traffic-viewer fix/TRA-3986_check_token_validity fix/TRA-4219_blue_bar_many_workspace_support fix/TRA-4396_check_pull_image_flag fix/acceptance-tests-fetch-50 fix/acceptance-tests-fetch-wait fix/acceptance-tests-setup-improve fix/add-tests-for-buckets-statistics fix/add-timing-params-to-stats-endpoint fix/amqp-nil-response-payload fix/amqp-tests fix/better-close-other-dissectors fix/better-close-other-dissectors-tmp-1 fix/check-filter-by-method 
fix/cli-linker-issues fix/cli-semver-error fix/cli-windows-build fix/delete_daemonset_on_helm_uninstall_enterprise fix/delete_daemonset_on_helm_uninstall_enterprise2 fix/dockerfile fix/entry-title-responsive fix/eslint_warning fix/go-ebpf-arm64 fix/go-ebpf-log-level fix/idle-cpu-usage fix/initialize_tapper_before_ws fix/kafka-api-key-name fix/kafka-nested fix/limit-rlimit fix/log-when-cannot-start-tappers fix/log_permission_error_in_resource_creation fix/matcher-map fix/matcher-map-tmp fix/no_tls_on_enterprise fix/nodefrag fix/oas-service-names fix/path_refresh_support fix/print_http_error_response_details fix/protos fix/queryable-body fix/queryable-src-name-css fix/regration-fixes-24.3 fix/remove-grpc-related-mods fix/remove-isselected-state fix/remove-up9-analyze-feature fix/req-res-matcher fix/resolve_src_in_istio fix/revert-static-cli fix/selected-entries-original fix/selected-entries-original2 fix/selected-entries-original5 fix/selected-entries-original6 fix/some-golang-inspection-solutions fix/spawn-only-two-goroutines fix/support_workspaces_in_oas fix/take_ownership_over_daemonset fix/tls-tapper-client-hello-log-revert fix/toleration-to-api-server fix/tooltip-css fix/trim-query fix/ui-feed-perf fix/ui/Service-map-GUI-issues_TRA-4499 fix/ui/Service-map-filtering-is-not-updated-after-refresh_TRA-4497 fix/ui/ServiceMapModal-filters fix/ui/TRA-4219_blue_bar_many_workspace_support fix/ui/service-graph-protocols-unSelection-_TRA-4522 fix/update_tap_targets_over_ws fix/websocket-disconnects fix_cluster_url_excaping fix_rbac_error_handling fix_ver_param githubactions_test go_mod_fix helm_version_update hotfix/TRA-4451_TLS_icon_position_develop hotfix/TRA-4451_fix_TLS_icon_position hotfix/install_typo lib_upgrades main master more_extensible_agent move_general_functions_from_timeline_stats_to_helpers oas-feed-rework origin/ui/TRA-4204_user_managment react_and_node_major_packages_upgrades readme_change refactor/cleanup_unused_debug_print refactor/generalize_watch_loop 
refactor/loadoas-log-level refactor/merge_watch_channels refactor/tap-logging refactor_ws remove_install_agent_features revert-941-develop revert/paging revert/pr-603 sararozenfeld-patch-1 sararozenfeld-patch-2 static_code_analysis support_stop_oas_service_map task/add_info_to_acceptance_test_slack task/delete-ebpf-object-files task/update_permission_examples tech_depth/golint test-leon test/API_calls test/enter_fix test/eslint test/first_test test/pcap-redis test/permission-acceptance test/right_side_check test/sanity_test test/service_map test/service_map_test test/tap_gui_test test/tap_ignored_user_agents test/tap_regex test/tap_regex_masking test/tep_redact_test test/ui-big-test test/unit-http tmp-acceptance-tests ui-common_package_update_143 ui/-download-request-replay ui/-replay-phase-2 ui/Link ui/Service-map-GUI-improvements ui/Service-map-split-to-ui-common ui/TRA-4255_show-50-recent-entries-and-continue-to-stream ui/TRA-4256_gui_oas_window ui/TRA-4406_add-an-information-icon ui/add-insertion-filter-to-settings-page ui/add-props-to-checkbox ui/fix/selectList-sticky-header-fix ui/fix/show-50-recent-records-on-load ui/mizu-ui-style-lint ui/replay-mizu-requests ui/with-loading ui_common_local_build validation_rules_fix 37.0 37.0-dev2 37.0-dev1 37.0-dev0 36.0 36.0-dev27 36.0-dev26 36.0-dev25 36.0-dev24 36.0-dev23 36.0-dev22 36.0-dev21 36.0-dev20 36.0-dev19 36.0-dev18 36.0-dev17 36.0-dev16 36.0-dev15 36.0-dev14 36.0-dev13 36.0-dev12 36.0-dev11 36.0-dev10 36.0-dev9 36.0-dev8 36.0-dev7 36.0-dev6 36.0-dev5 36.0-dev4 36.0-dev3 36.0-dev2 36.0-dev1 36.0-dev0 35.1 35.0 35.0-dev22 35.0-dev21 35.0-dev20 35.0-dev19 35.0-dev18 35.0-dev17 35.0-dev16 35.0-dev15 35.0-dev14 35.0-dev13 35.0-dev12 35.0-dev11 35.0-dev10 35.0-dev9 35.0-dev8 35.0-dev7 35.0-dev6 35.0-dev5 35.0-dev4 35.0-dev3 35.0-dev2 35.0-dev1 35.0-dev0 34.0 34.0-dev14 34.0-dev13 34.0-dev12 34.0-dev11 34.0-dev10 34.0-dev9 34.0-dev8 34.0-dev7 34.0-dev6 34.0-dev5 34.0-dev4 34.0-dev3 34.0-dev2 34.0-dev1 34.0-dev0 33.1 
33.0 33.0-dev40 33.0-dev39 33.0-dev38 33.0-dev37 33.0-dev36 33.0-dev35 33.0-dev34 33.0-dev33 33.0-dev32 33.0-dev31 33.0-dev30 33.0-dev29 33.0-dev28 33.0-dev27 33.0-dev26 33.0-dev25 33.0-dev24 33.0-dev23 33.0-dev22 33.0-dev21 33.0-dev20 33.0-dev19 33.0-dev18 33.0-dev17 33.0-dev16 33.0-dev15 33.0-dev14 33.0-dev13 33.0-dev12 33.0-dev11 33.0-dev10 33.0-dev9 33.0-dev8 33.0-dev7 33.0-dev6 33.0-dev5 33.0-dev4 33.0-dev3 33.0-dev2 33.0-dev1 33.0-dev0 32.0 32.0-dev23 32.0-dev22 32.0-dev21 32.0-dev20 32.0-dev19 32.0-dev18 32.0-dev17 32.0-dev16 32.0-dev15 32.0-dev14 32.0-dev13 32.0-dev12 32.0-dev11 32.0-dev10 32.0-dev9 32.0-dev8 32.0-dev7 32.0-dev6 32.0-dev5 32.0-dev4 32.0-dev3 32.0-dev2 32.0-dev1 32.0-dev0 31.1 31.0 31.0-dev68 31.0-dev67 31.0-dev66 31.0-dev65 31.0-dev64 31.0-dev63 31.0-dev62 31.0-dev61 31.0-dev60 31.0-dev59 31.0-dev58 31.0-dev57 31.0-dev56 31.0-dev55 31.0-dev54 31.0-dev53 31.0-dev52 31.0-dev51 31.0-dev50 31.0-dev49 31.0-dev48 31.0-dev47 31.0-dev46 31.0-dev45 31.0-dev44 31.0-dev43 31.0-dev42 31.0-dev41 31.0-dev40 31.0-dev39 31.0-dev38 31.0-dev37 31.0-dev36 31.0-dev35 31.0-dev34 31.0-dev33 31.0-dev32 31.0-dev31 31.0-dev30 31.0-dev29 31.0-dev28 31.0-dev27 31.0-dev26 31.0-dev25 31.0-dev24 31.0-dev23 31.0-dev22 31.0-dev21 31.0-dev20 31.0-dev19 31.0-dev18 31.0-dev17 31.0-dev16 31.0-dev15 31.0-dev14 31.0-dev13 31.0-dev12 31.0-dev11 31.0-dev10 31.0-dev9 31.0-dev8 31.0-dev7 31.0-dev6 31.0-dev5 31.0-dev4 31.0-dev3 31.0-dev2 31.0-dev1 31.0-dev0 30.4 30.3 30.2 30.1 30.0 30.0-dev38 30.0-dev37 30.0-dev36 30.0-dev35 30.0-dev34 30.0-dev33 30.0-dev32 30.0-dev31 30.0-dev30 30.0-dev29 30.0-dev28 30.0-dev27 30.0-dev26 30.0-dev25 30.0-dev24 30.0-dev23 30.0-dev22 30.0-dev21 30.0-dev20 30.0-dev19 30.0-dev18 30.0-dev17 30.0-dev16 30.0-dev15 30.0-dev14 30.0-dev13 30.0-dev12 30.0-dev11 30.0-dev10 30.0-dev9 30.0-dev8 30.0-dev7 30.0-dev6 30.0-dev5 30.0-dev4 30.0-dev3 30.0-dev2 30.0-dev1 30.0-dev0 29.0 29.0-dev23 29.0-dev22 29.0-dev21 29.0-dev20 29.0-dev19 29.0-dev18 29.0-dev17 
29.0-dev16 29.0-dev15 29.0-dev14 29.0-dev13 29.0-dev12 29.0-dev11 29.0-dev10 29.0-dev9 29.0-dev8 29.0-dev7 29.0-dev6 29.0-dev5 29.0-dev4 29.0-dev3 29.0-dev2 29.0-dev1 29.0-dev0 28.0 28.0-dev32 28.0-dev31 28.0-dev30 28.0-dev29 28.0-dev28 28.0-dev27 28.0-dev26 28.0-dev25 28.0-dev24 28.0-dev23 28.0-dev22 28.0-dev21 28.0-dev20 28.0-dev19 28.0-dev18 28.0-dev17 28.0-dev16 28.0-dev15 28.0-dev14 28.0-dev13 28.0-dev12 28.0-dev11 28.0-dev10 28.0-dev9 28.0-dev8 28.0-dev7 28.0-dev6 28.0-dev5 28.0-dev4 28.0-dev3 28.0-dev2 28.0-dev1 28.0-dev0 27.2 27.1 27.0 27.0-dev11 27.0-dev10 27.0-dev9 27.0-dev8 27.0-dev7 27.0-dev6 27.0-dev5 27.0-dev4 27.0-dev3 27.0-dev2 27.0-dev1 27.0-dev0 26.0 26.0-dev27 26.0-dev26 26.0-dev25 26.0-dev24 26.0-dev23 26.0-dev22 26.0-dev21 26.0-dev20 26.0-dev19 26.0-dev18 26.0-dev17 26.0-dev16 26.0-dev15 26.0-dev14 26.0-dev13 26.0-dev12 26.0-dev11 26.0-dev10 26.0-dev9 26.0-dev8 26.0-dev7 26.0-dev6 26.0-dev5 26.0-dev4 26.0-dev3 26.0-dev2 26.0-dev1 26.0-dev0 0.25.4 0.25.3 0.25.2 0.25.1 0.25.0 0.24.11 0.24.10 0.24.9 0.24.8 0.24.7 0.24.6 0.24.5 0.24.4 0.24.3 0.24.2 0.24.1 0.24.0 0.23.13 0.23.12 0.23.11 0.23.10 0.23.9 0.23.8 0.23.7 0.23.6 0.23.5 0.23.4 0.23.3 0.23.2 0.23.1 0.23.0 0.22.47 0.22.46 0.22.45 0.22.44 0.22.43 0.22.42 0.22.41 0.22.40 0.22.39 0.22.38 0.22.37 0.22.36 0.22.35 0.22.34 0.22.33 0.22.32 0.22.31 0.22.30 0.22.29 0.22.28 0.22.27 0.22.26 0.22.25 0.22.24 0.22.23 0.22.22 0.22.21 0.22.20 0.22.19 0.22.18 0.22.17 0.22.16 0.22.15 0.22.14 0.22.13 0.22.12 0.22.11 0.22.10 0.22.9 0.22.8 0.22.7 0.22.6 0.22.5 0.22.4 0.22.3 0.22.2 0.22.1 0.22.0 0.21.63 0.21.62 0.21.61 0.21.60 0.21.59 0.21.58 0.21.57 0.21.56 0.21.55 0.21.54 0.21.53 0.21.52 0.21.51 0.21.50 0.21.49 0.21.48 0.21.47 0.21.46 0.21.45 0.21.44 0.21.43 0.21.42 0.21.41 0.21.40 0.21.39 0.21.38 0.21.37 0.21.36 0.21.35 0.21.34 0.21.33 0.21.32 0.21.31 0.21.30 0.21.29 0.21.28 0.21.27 0.21.26 0.21.25 0.21.24 0.21.23 0.21.22 0.21.21 0.21.20 0.21.19 0.21.18 0.21.17 0.21.16 0.21.15 0.21.14 0.21.13 0.21.12 0.21.11 
0.21.10 0.21.9 0.21.8 0.21.7 0.21.6 0.21.5 0.21.4 0.21.3 0.21.2 0.21.1 0.21.0 0.20.0 0.19.84 0.19.83 0.19.82 0.19.81 0.19.80 0.19.79 0.19.78 0.19.77 0.19.76 0.19.75 0.19.74 0.19.73 0.19.72 0.19.71 0.19.70 0.19.69 0.19.68 0.19.67 0.19.66 0.19.65 0.19.64 0.19.63 0.19.62 0.19.61 0.19.60 0.19.59 0.19.58 0.19.57 0.19.56 0.19.55 0.19.54 0.19.53 0.19.52 0.19.51 0.19.50 0.19.49 0.19.48 0.19.47 0.19.46 0.19.45 0.19.44 0.19.43 0.19.42 0.19.41 0.19.40 0.19.39 0.19.38 0.19.37 0.19.36 0.19.35 0.19.34 0.19.33 0.19.32 0.19.31 0.19.30 0.19.29 0.19.28 0.19.27 0.19.26 0.19.25 0.19.24 0.19.23 0.19.22 0.19.21 0.19.20 0.19.19 0.19.18 0.19.17 0.19.16 0.19.15 0.19.14 0.19.13 0.19.12 0.19.11 0.19.10 0.19.9 0.19.8 0.19.7 0.19.6 0.19.5 0.19.4 0.19.3 0.19.2 0.19.1 0.19.0 0.18.13 0.18.12 0.18.11 0.18.10 0.18.9 0.18.8 0.18.7 0.18.6 0.18.5 0.18.4 0.18.3 0.18.2 0.18.1 0.17.10 0.17.9 0.17.8 0.17.7 0.17.6 0.17.5 0.17.4 0.17.3
No related merge requests found
Showing with 710 additions and 393 deletions
+710 -393
package diagnose
import (
"fmt"
"os"
"runtime"
"runtime/pprof"
"strconv"
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
var AppStats = api.AppStats{}
// StartMemoryProfiler launches a background goroutine that periodically
// dumps heap profiles to disk. envDumpPath overrides the default dump
// directory ("/app/pprof") and envTimeInterval (seconds, as a string)
// overrides the default 60s period. Fatal logger calls abort the process
// on unrecoverable filesystem errors.
func StartMemoryProfiler(envDumpPath string, envTimeInterval string) {
	dumpPath := "/app/pprof"
	if envDumpPath != "" {
		dumpPath = envDumpPath
	}
	timeInterval := 60
	if envTimeInterval != "" {
		// Ignore non-numeric or non-positive values: a zero/negative
		// interval would make the loop below spin without sleeping.
		if i, err := strconv.Atoi(envTimeInterval); err == nil && i > 0 {
			timeInterval = i
		}
	}
	// Was logger.Log.Info, which does not interpret the %s verb; use Infof.
	logger.Log.Infof("Profiling is on, results will be written to %s", dumpPath)
	go func() {
		if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
			if err := os.Mkdir(dumpPath, 0777); err != nil {
				logger.Log.Fatal("could not create directory for profile: ", err)
			}
		}
		for {
			t := time.Now()
			filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
			logger.Log.Infof("Writing memory profile to %s\n", filename)
			f, err := os.Create(filename)
			if err != nil {
				logger.Log.Fatal("could not create memory profile: ", err)
			}
			runtime.GC() // flush up-to-date allocation statistics into the profile
			if err := pprof.WriteHeapProfile(f); err != nil {
				logger.Log.Fatal("could not write memory profile: ", err)
			}
			_ = f.Close()
			time.Sleep(time.Second * time.Duration(timeInterval))
		}
	}()
}
// DumpMemoryProfile writes a single heap profile to filename.
// An empty filename means profiling is disabled and the call is a no-op.
func DumpMemoryProfile(filename string) error {
	if len(filename) == 0 {
		return nil
	}

	out, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer out.Close()

	return pprof.WriteHeapProfile(out)
}
package tap
package diagnose
import (
"fmt"
"sync"
"github.com/google/gopacket/examples/util"
"github.com/up9inc/mizu/shared/logger"
)
var TapErrors *errorsMap
// errorsMap aggregates tapper errors by error type so they can be counted
// and periodically summarized.
//
// NOTE(review): the unexported outputLevel/nErrors fields appear to be
// duplicates of the exported OutputLevel/ErrorsCount fields (this looks like
// a refactor transition — confirm which pair is live and remove the other).
type errorsMap struct {
	errorsMap map[string]uint // per-error-type occurrence counters
	outputLevel int
	nErrors uint
	OutputLevel int // verbosity threshold: 2 debug, 1 verbose, 0 default, -1 quiet
	ErrorsCount uint // total errors recorded across all types
	errorsMapMutex sync.Mutex // guards errorsMap and the counters above
}
func NewErrorsMap(outputLevel int) *errorsMap {
// InitializeErrorsMap builds the package-level TapErrors map, deriving the
// output verbosity level from the debug/verbose/quiet flags
// (debug=2, verbose=1, quiet=-1, default 0).
func InitializeErrorsMap(debug bool, verbose bool, quiet bool) {
	defer util.Run()()

	level := 0
	switch {
	case debug:
		level = 2
	case verbose:
		level = 1
	case quiet:
		level = -1
	}

	TapErrors = newErrorsMap(level)
}
// newErrorsMap returns an empty errorsMap configured with the given
// verbosity level.
//
// NOTE(review): both the unexported outputLevel and exported OutputLevel
// fields are initialized to the same value — likely a transitional duplicate
// from the refactor; confirm and drop the unused one.
func newErrorsMap(outputLevel int) *errorsMap {
	return &errorsMap{
		errorsMap: make(map[string]uint),
		outputLevel: outputLevel,
		OutputLevel: outputLevel,
	}
}
......@@ -28,12 +46,12 @@ func NewErrorsMap(outputLevel int) *errorsMap {
*/
func (e *errorsMap) logError(minOutputLevel int, t string, s string, a ...interface{}) {
e.errorsMapMutex.Lock()
e.nErrors++
e.ErrorsCount++
nb := e.errorsMap[t]
e.errorsMap[t] = nb + 1
e.errorsMapMutex.Unlock()
if e.outputLevel >= minOutputLevel {
if e.OutputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
logger.Log.Errorf(formatStr, a...)
}
......@@ -51,10 +69,17 @@ func (e *errorsMap) Debug(s string, a ...interface{}) {
logger.Log.Debugf(s, a...)
}
func (e *errorsMap) getErrorsSummary() (int, string) {
// GetErrorsSummary returns the number of distinct error types recorded so
// far, together with a printable rendering of the per-type counters.
func (e *errorsMap) GetErrorsSummary() (int, string) {
	e.errorsMapMutex.Lock()
	defer e.errorsMapMutex.Unlock()

	summary := fmt.Sprintf("%v", e.errorsMap)
	return len(e.errorsMap), summary
}
// PrintSummary logs the total number of recorded errors followed by a
// per-error-type breakdown.
func (e *errorsMap) PrintSummary() {
	// Guard the map and counters: logError mutates them concurrently.
	e.errorsMapMutex.Lock()
	defer e.errorsMapMutex.Unlock()
	logger.Log.Infof("Errors: %d", e.ErrorsCount)
	for t := range e.errorsMap {
		// Log the error-type key; the original passed the receiver `e`
		// to the %s verb instead of `t`.
		logger.Log.Infof("  %s:\t\t%d", t, e.errorsMap[t])
	}
}
package diagnose
import "github.com/up9inc/mizu/shared/logger"
// tapperInternalStats holds the low-level IP/TCP reassembly counters that
// PrintStatsSummary reports. Field meanings follow the labels used there.
type tapperInternalStats struct {
	Ipdefrag int // IPv4 fragments reassembled by the defragmenter
	MissedBytes int // bytes the TCP reassembler reported as missed
	Pkt int // total TCP packets seen
	Sz int // reassembled TCP bytes
	Totalsz int // total TCP payload bytes (pre-reassembly)
	RejectFsm int // packets rejected by the TCP state machine
	RejectOpt int // packets rejected due to TCP options
	RejectConnFsm int // connections rejected by the state machine
	Reassembled int // reassembled chunk count
	OutOfOrderBytes int
	OutOfOrderPackets int
	BiggestChunkBytes int // size in bytes of the largest reassembled chunk
	BiggestChunkPackets int // packet count of the largest reassembled chunk
	OverlapBytes int // bytes overlapping previously-seen segments
	OverlapPackets int
}
var InternalStats *tapperInternalStats
func InitializeTapperInternalStats() {
InternalStats = &tapperInternalStats{}
}
// PrintStatsSummary logs a human-readable dump of every reassembly counter.
func (stats *tapperInternalStats) PrintStatsSummary() {
	logger.Log.Infof("IPdefrag:\t\t%d", stats.Ipdefrag)
	logger.Log.Infof("TCP stats:")

	// Emit the TCP counters in a fixed order so the log layout stays stable.
	tcpCounters := []struct {
		format string
		value  int
	}{
		{" missed bytes:\t\t%d", stats.MissedBytes},
		{" total packets:\t\t%d", stats.Pkt},
		{" rejected FSM:\t\t%d", stats.RejectFsm},
		{" rejected Options:\t%d", stats.RejectOpt},
		{" reassembled bytes:\t%d", stats.Sz},
		{" total TCP bytes:\t%d", stats.Totalsz},
		{" conn rejected FSM:\t%d", stats.RejectConnFsm},
		{" reassembled chunks:\t%d", stats.Reassembled},
		{" out-of-order packets:\t%d", stats.OutOfOrderPackets},
		{" out-of-order bytes:\t%d", stats.OutOfOrderBytes},
		{" biggest-chunk packets:\t%d", stats.BiggestChunkPackets},
		{" biggest-chunk bytes:\t%d", stats.BiggestChunkBytes},
		{" overlap packets:\t%d", stats.OverlapPackets},
		{" overlap bytes:\t\t%d", stats.OverlapBytes},
	}
	for _, c := range tcpCounters {
		logger.Log.Infof(c.format, c.value)
	}
}
......@@ -3,6 +3,8 @@ package tap
import (
"net"
"strings"
"github.com/up9inc/mizu/tap/diagnose"
)
var privateIPBlocks []*net.IPNet
......@@ -55,7 +57,7 @@ func initPrivateIPBlocks() {
} {
_, block, err := net.ParseCIDR(cidr)
if err != nil {
tapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
diagnose.TapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
} else {
privateIPBlocks = append(privateIPBlocks, block)
}
......
......@@ -7,11 +7,11 @@ const (
)
type OutboundLink struct {
Src string
DstIP string
DstPort int
Src string
DstIP string
DstPort int
SuggestedResolvedName string
SuggestedProtocol OutboundLinkProtocol
SuggestedProtocol OutboundLinkProtocol
}
func NewOutboundLinkWriter() *OutboundLinkWriter {
......@@ -26,11 +26,11 @@ type OutboundLinkWriter struct {
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int, SuggestedResolvedName string, SuggestedProtocol OutboundLinkProtocol) {
olw.OutChan <- &OutboundLink{
Src: src,
DstIP: DstIP,
DstPort: DstPort,
Src: src,
DstIP: DstIP,
DstPort: DstPort,
SuggestedResolvedName: SuggestedResolvedName,
SuggestedProtocol: SuggestedProtocol,
SuggestedProtocol: SuggestedProtocol,
}
}
......
......@@ -9,28 +9,17 @@
package tap
import (
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/examples/util"
"github.com/google/gopacket/ip4defrag"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
"github.com/up9inc/mizu/tap/source"
)
const cleanPeriod = time.Second * 10
......@@ -62,28 +51,6 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
var memprofile = flag.String("memprofile", "", "Write memory profile")
var appStats = api.AppStats{}
var tapErrors *errorsMap
// global
var stats struct {
ipdefrag int
missedBytes int
pkt int
sz int
totalsz int
rejectFsm int
rejectOpt int
rejectConnFsm int
reassembled int
outOfOrderBytes int
outOfOrderPackets int
biggestChunkBytes int
biggestChunkPackets int
overlapBytes int
overlapPackets int
}
type TapOpts struct {
HostMode bool
}
......@@ -110,353 +77,121 @@ func inArrayString(arr []string, valueToCheck string) bool {
return false
}
// Context
// The assembler context
type Context struct {
CaptureInfo gopacket.CaptureInfo
}
func GetStats() api.AppStats {
return appStats
}
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
// StartPassiveTapper stores the tapper configuration in package-level state
// and starts the packet-capture loop on a background goroutine.
// outputItems receives the dissected traffic items produced by the tapper.
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
	hostMode = opts.HostMode
	extensions = extensionsRef
	filteringOptions = options
	if GetMemoryProfilingEnabled() {
	// NOTE(review): both the legacy startMemoryProfiler() and the new
	// diagnose.StartMemoryProfiler(...) are invoked here — this looks like a
	// merge artifact; the profiler would be started twice. Confirm and keep
	// only the diagnose variant.
	startMemoryProfiler()
	diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
	}
	go startPassiveTapper(outputItems)
}
func startMemoryProfiler() {
dumpPath := "/app/pprof"
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
if envDumpPath != "" {
dumpPath = envDumpPath
}
timeInterval := 60
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
if envTimeInterval != "" {
if i, err := strconv.Atoi(envTimeInterval); err == nil {
timeInterval = i
}
}
logger.Log.Info("Profiling is on, results will be written to %s", dumpPath)
go func() {
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dumpPath, 0777); err != nil {
logger.Log.Fatal("could not create directory for profile: ", err)
}
}
for {
t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
func printPeriodicStats(cleaner *Cleaner) {
statsPeriod := time.Second * time.Duration(*statsevery)
ticker := time.NewTicker(statsPeriod)
logger.Log.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
logger.Log.Fatal("could not create memory profile: ", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
logger.Log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Second * time.Duration(timeInterval))
}
}()
for {
<-ticker.C
// Since the start
errorMapLen, errorsSummery := diagnose.TapErrors.GetErrorsSummary()
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(diagnose.AppStats.StartTime),
diagnose.TapErrors.ErrorsCount,
errorMapLen,
errorsSummery,
)
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
logger.Log.Infof(
"mem: %d, goroutines: %d",
memStats.HeapAlloc,
runtime.NumGoroutine(),
)
// Since the last print
cleanStats := cleaner.dumpStats()
logger.Log.Infof(
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
cleanStats.flushed,
cleanStats.closed,
cleanStats.deleted,
)
currentAppStats := diagnose.AppStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
logger.Log.Infof("app stats - %v", string(appStatsJSON))
}
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
streamsMap := NewTcpStreamMap()
go streamsMap.closeTimedoutTcpStreamChannels()
var outputLevel int
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
diagnose.InitializeTapperInternalStats()
defer util.Run()()
if *debug {
outputLevel = 2
} else if *verbose {
outputLevel = 1
} else if *quiet {
outputLevel = -1
var bpffilter string
if len(flag.Args()) > 0 {
bpffilter = strings.Join(flag.Args(), " ")
}
tapErrors = NewErrorsMap(outputLevel)
packetSource, err := source.NewTcpPacketSource(*fname, *iface, source.TcpPacketSourceBehaviour{
SnapLength: *snaplen,
Promisc: *promisc,
Tstype: *tstype,
DecoderName: *decoder,
Lazy: *lazy,
BpfFilter: bpffilter,
})
var handle *pcap.Handle
var err error
if *fname != "" {
if handle, err = pcap.OpenOffline(*fname); err != nil {
logger.Log.Fatalf("PCAP OpenOffline error: %v", err)
}
} else {
// This is a little complicated because we want to allow all possible options
// for creating the packet capture handle... instead of all this you can
// just call pcap.OpenLive if you want a simple handle.
inactive, err := pcap.NewInactiveHandle(*iface)
if err != nil {
logger.Log.Fatalf("could not create: %v", err)
}
defer inactive.CleanUp()
if err = inactive.SetSnapLen(*snaplen); err != nil {
logger.Log.Fatalf("could not set snap length: %v", err)
} else if err = inactive.SetPromisc(*promisc); err != nil {
logger.Log.Fatalf("could not set promisc mode: %v", err)
} else if err = inactive.SetTimeout(time.Second); err != nil {
logger.Log.Fatalf("could not set timeout: %v", err)
}
if *tstype != "" {
if t, err := pcap.TimestampSourceFromString(*tstype); err != nil {
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
} else if err := inactive.SetTimestampSource(t); err != nil {
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
}
}
if handle, err = inactive.Activate(); err != nil {
logger.Log.Fatalf("PCAP Activate error: %v", err)
}
defer handle.Close()
}
if len(flag.Args()) > 0 {
bpffilter := strings.Join(flag.Args(), " ")
logger.Log.Infof("Using BPF filter %q", bpffilter)
if err = handle.SetBPFFilter(bpffilter); err != nil {
logger.Log.Fatalf("BPF filter error: %v", err)
}
if err != nil {
logger.Log.Fatal(err)
}
var dec gopacket.Decoder
var ok bool
decoderName := *decoder
if decoderName == "" {
decoderName = handle.LinkType().String()
}
if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok {
logger.Log.Fatal("No decoder named", decoderName)
}
source := gopacket.NewPacketSource(handle, dec)
source.Lazy = *lazy
source.NoCopy = true
logger.Log.Info("Starting to read packets")
appStats.SetStartTime(time.Now())
defragger := ip4defrag.NewIPv4Defragmenter()
defer packetSource.Close()
var emitter api.Emitter = &api.Emitting{
AppStats: &appStats,
OutputChannel: outputItems,
if err != nil {
logger.Log.Fatal(err)
}
streamFactory := NewTcpStreamFactory(emitter, streamsMap)
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
packets := make(chan source.TcpPacketInfo)
assembler := NewTcpAssembler(outputItems, streamsMap)
var assemblerMutex sync.Mutex
logger.Log.Info("Starting to read packets")
diagnose.AppStats.SetStartTime(time.Now())
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
go packetSource.ReadPackets(!*nodefrag, packets)
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
cleaner := Cleaner{
assembler: assembler,
assemblerMutex: &assemblerMutex,
assembler: assembler.Assembler,
assemblerMutex: &assembler.assemblerMutex,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,
}
cleaner.start()
go func() {
statsPeriod := time.Second * time.Duration(*statsevery)
ticker := time.NewTicker(statsPeriod)
for {
<-ticker.C
// Since the start
errorMapLen, errorsSummery := tapErrors.getErrorsSummary()
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(appStats.StartTime),
tapErrors.nErrors,
errorMapLen,
errorsSummery,
)
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
logger.Log.Infof(
"mem: %d, goroutines: %d",
memStats.HeapAlloc,
runtime.NumGoroutine(),
)
// Since the last print
cleanStats := cleaner.dumpStats()
logger.Log.Infof(
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
cleanStats.flushed,
cleanStats.closed,
cleanStats.deleted,
)
currentAppStats := appStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
logger.Log.Infof("app stats - %v", string(appStatsJSON))
}
}()
if GetMemoryProfilingEnabled() {
startMemoryProfiler()
}
for {
packet, err := source.NextPacket()
if err == io.EOF {
break
} else if err != nil {
if err.Error() != "Timeout Expired" {
logger.Log.Debugf("Error: %T", err)
}
continue
}
packetsCount := appStats.IncPacketsCount()
logger.Log.Debugf("PACKET #%d", packetsCount)
data := packet.Data()
appStats.UpdateProcessedBytes(uint64(len(data)))
if *hexdumppkt {
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
go printPeriodicStats(&cleaner)
// defrag the IPv4 packet if required
if !*nodefrag {
ip4Layer := packet.Layer(layers.LayerTypeIPv4)
if ip4Layer == nil {
continue
}
ip4 := ip4Layer.(*layers.IPv4)
l := ip4.Length
newip4, err := defragger.DefragIPv4(ip4)
if err != nil {
logger.Log.Fatal("Error while de-fragmenting", err)
} else if newip4 == nil {
logger.Log.Debugf("Fragment...")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
stats.ipdefrag++
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
logger.Log.Panic("Not a PacketBuilder")
}
nextDecoder := newip4.NextLayerType()
_ = nextDecoder.Decode(newip4.Payload, pb)
}
}
assembler.processPackets(*hexdumppkt, packets)
tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil {
appStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
if err != nil {
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
}
}
c := Context{
CaptureInfo: packet.Metadata().CaptureInfo,
}
stats.totalsz += len(tcp.Payload)
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
assemblerMutex.Lock()
assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
assemblerMutex.Unlock()
}
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
if done {
errorMapLen, _ := tapErrors.getErrorsSummary()
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
appStats.PacketsCount,
appStats.ProcessedBytes,
time.Since(appStats.StartTime),
tapErrors.nErrors,
errorMapLen)
}
select {
case <-signalChan:
logger.Log.Infof("Caught SIGINT: aborting")
done = true
default:
// NOP: continue
}
if done {
break
}
if diagnose.TapErrors.OutputLevel >= 2 {
assembler.dumpStreamPool()
}
assemblerMutex.Lock()
closed := assembler.FlushAll()
assemblerMutex.Unlock()
logger.Log.Debugf("Final flush: %d closed", closed)
if outputLevel >= 2 {
streamPool.Dump()
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
logger.Log.Errorf("Error dumping memory profile %v\n", err)
}
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
logger.Log.Fatal(err)
}
_ = pprof.WriteHeapProfile(f)
_ = f.Close()
}
assembler.waitAndDump()
streamFactory.WaitGoRoutines()
assemblerMutex.Lock()
logger.Log.Debugf("%s", assembler.Dump())
assemblerMutex.Unlock()
if !*nodefrag {
logger.Log.Infof("IPdefrag:\t\t%d", stats.ipdefrag)
}
logger.Log.Infof("TCP stats:")
logger.Log.Infof(" missed bytes:\t\t%d", stats.missedBytes)
logger.Log.Infof(" total packets:\t\t%d", stats.pkt)
logger.Log.Infof(" rejected FSM:\t\t%d", stats.rejectFsm)
logger.Log.Infof(" rejected Options:\t%d", stats.rejectOpt)
logger.Log.Infof(" reassembled bytes:\t%d", stats.sz)
logger.Log.Infof(" total TCP bytes:\t%d", stats.totalsz)
logger.Log.Infof(" conn rejected FSM:\t%d", stats.rejectConnFsm)
logger.Log.Infof(" reassembled chunks:\t%d", stats.reassembled)
logger.Log.Infof(" out-of-order packets:\t%d", stats.outOfOrderPackets)
logger.Log.Infof(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
logger.Log.Infof(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets)
logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes)
logger.Log.Infof("Errors: %d", tapErrors.nErrors)
for e := range tapErrors.errorsMap {
logger.Log.Infof(" %s:\t\t%d", e, tapErrors.errorsMap[e])
}
logger.Log.Infof("AppStats: %v", GetStats())
diagnose.InternalStats.PrintStatsSummary()
diagnose.TapErrors.PrintSummary()
logger.Log.Infof("AppStats: %v", diagnose.AppStats)
}
package source
import (
"fmt"
"io"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/ip4defrag"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/diagnose"
)
// TcpPacketSource wraps a gopacket packet source together with the pcap
// handle it reads from, an IPv4 defragmenter, and the behaviour options the
// source was created with.
type TcpPacketSource struct {
	source    *gopacket.PacketSource      // decoded-packet iterator over handle
	handle    *pcap.Handle                // live or offline pcap capture handle
	defragger *ip4defrag.IPv4Defragmenter // reassembles fragmented IPv4 packets
	Behaviour *TcpPacketSourceBehaviour   // capture options used to open the handle
}
// TcpPacketSourceBehaviour groups the options applied when opening a
// capture handle in NewTcpPacketSource.
type TcpPacketSourceBehaviour struct {
	SnapLength  int    // pcap snapshot length
	Promisc     bool   // put the interface into promiscuous mode
	Tstype      string // pcap timestamp source name; empty selects the default
	DecoderName string // gopacket decoder name; empty means use the handle's link type
	Lazy        bool   // enable lazy decoding on the packet source
	BpfFilter   string // BPF filter expression; empty means no filter
}
// TcpPacketInfo couples a captured packet with the source it was read from.
type TcpPacketInfo struct {
	Packet gopacket.Packet
	Source *TcpPacketSource
}
// NewTcpPacketSource opens a packet capture source: from a pcap file when
// filename is non-empty, otherwise live from interfaceName, configured
// according to behaviour. On success the caller owns the returned source and
// must Close it. Fix: previously the already-open pcap handle was leaked when
// setting the BPF filter failed or no decoder matched; it is now closed on
// those error paths.
func NewTcpPacketSource(filename string, interfaceName string,
	behaviour TcpPacketSourceBehaviour) (*TcpPacketSource, error) {
	var err error

	result := &TcpPacketSource{
		defragger: ip4defrag.NewIPv4Defragmenter(),
		Behaviour: &behaviour,
	}

	if filename != "" {
		if result.handle, err = pcap.OpenOffline(filename); err != nil {
			return result, fmt.Errorf("PCAP OpenOffline error: %v", err)
		}
	} else {
		// This is a little complicated because we want to allow all possible options
		// for creating the packet capture handle... instead of all this you can
		// just call pcap.OpenLive if you want a simple handle.
		inactive, err := pcap.NewInactiveHandle(interfaceName)
		if err != nil {
			return result, fmt.Errorf("could not create: %v", err)
		}
		defer inactive.CleanUp()
		if err = inactive.SetSnapLen(behaviour.SnapLength); err != nil {
			return result, fmt.Errorf("could not set snap length: %v", err)
		} else if err = inactive.SetPromisc(behaviour.Promisc); err != nil {
			return result, fmt.Errorf("could not set promisc mode: %v", err)
		} else if err = inactive.SetTimeout(time.Second); err != nil {
			return result, fmt.Errorf("could not set timeout: %v", err)
		}
		if behaviour.Tstype != "" {
			if t, err := pcap.TimestampSourceFromString(behaviour.Tstype); err != nil {
				return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
			} else if err := inactive.SetTimestampSource(t); err != nil {
				return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
			}
		}
		if result.handle, err = inactive.Activate(); err != nil {
			return result, fmt.Errorf("PCAP Activate error: %v", err)
		}
	}
	if behaviour.BpfFilter != "" {
		logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
		if err = result.handle.SetBPFFilter(behaviour.BpfFilter); err != nil {
			// The handle is already open here; close it so it does not leak
			// when we return without handing the source to the caller.
			result.Close()
			return nil, fmt.Errorf("BPF filter error: %v", err)
		}
	}

	var dec gopacket.Decoder
	var ok bool
	if behaviour.DecoderName == "" {
		behaviour.DecoderName = result.handle.LinkType().String()
	}
	if dec, ok = gopacket.DecodersByLayerName[behaviour.DecoderName]; !ok {
		// Same leak as above: release the open handle before bailing out.
		result.Close()
		return nil, fmt.Errorf("no decoder named %v", behaviour.DecoderName)
	}
	result.source = gopacket.NewPacketSource(result.handle, dec)
	result.source.Lazy = behaviour.Lazy
	result.source.NoCopy = true

	return result, nil
}
// Close releases the underlying pcap handle, if one was opened.
// It is safe to call on a source whose handle was never created.
func (source *TcpPacketSource) Close() {
	if source.handle == nil {
		return
	}
	source.handle.Close()
}
// ReadPackets reads packets from the capture source in an endless loop and
// forwards them on the packets channel. When ipdefrag is true, IPv4
// fragments are reassembled first: non-IPv4 packets are skipped, incomplete
// fragments are held back, and reassembled packets are re-decoded before
// being forwarded. It returns io.EOF when the capture source is exhausted
// (e.g. end of a pcap file).
// Fix: the defrag condition was inverted (`if !ipdefrag`), which disabled
// defragmentation exactly when the caller requested it (the caller passes
// !*nodefrag, matching the original inline loop's `if !*nodefrag`).
func (source *TcpPacketSource) ReadPackets(ipdefrag bool, packets chan<- TcpPacketInfo) error {
	for {
		packet, err := source.source.NextPacket()

		if err == io.EOF {
			return err
		} else if err != nil {
			// Live captures time out periodically while waiting for traffic;
			// that is expected and not worth logging.
			if err.Error() != "Timeout Expired" {
				logger.Log.Debugf("Error: %T", err)
			}
			continue
		}

		// defrag the IPv4 packet if required
		if ipdefrag {
			ip4Layer := packet.Layer(layers.LayerTypeIPv4)
			if ip4Layer == nil {
				continue
			}
			ip4 := ip4Layer.(*layers.IPv4)
			l := ip4.Length
			newip4, err := source.defragger.DefragIPv4(ip4)
			if err != nil {
				logger.Log.Fatal("Error while de-fragmenting", err)
			} else if newip4 == nil {
				logger.Log.Debugf("Fragment...")
				continue // packet fragment, we don't have whole packet yet.
			}
			if newip4.Length != l {
				// The packet grew after defragmentation, i.e. it was
				// reassembled - decode the payload again so upper layers
				// see the whole packet.
				diagnose.InternalStats.Ipdefrag++
				logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
				pb, ok := packet.(gopacket.PacketBuilder)
				if !ok {
					logger.Log.Panic("Not a PacketBuilder")
				}
				nextDecoder := newip4.NextLayerType()
				_ = nextDecoder.Decode(newip4.Payload, pb)
			}
		}

		packets <- TcpPacketInfo{
			Packet: packet,
			Source: source,
		}
	}
}
package tap
import (
"encoding/hex"
"os"
"os/signal"
"sync"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
"github.com/up9inc/mizu/tap/source"
)
// tcpAssembler embeds a reassembly.Assembler together with the stream pool
// and stream factory it was built from, plus a mutex serializing all calls
// into the embedded assembler.
type tcpAssembler struct {
	*reassembly.Assembler
	streamPool     *reassembly.StreamPool
	streamFactory  *tcpStreamFactory
	assemblerMutex sync.Mutex // guards calls on the embedded Assembler
}
// context carries the capture info of the packet currently being fed to the
// assembler; it is passed as the assembler context to AssembleWithContext.
type context struct {
	CaptureInfo gopacket.CaptureInfo
}

// GetCaptureInfo returns the capture info recorded for this packet.
func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
	return c.CaptureInfo
}
// NewTcpAssembler builds a TCP reassembler wired to a new stream factory
// that emits output items on outputItems and registers streams in
// streamsMap. Buffered-page limits are taken from the tap configuration.
// Fix: the two limit getters were swapped - the per-connection limit was
// assigned to the total and vice versa; each limit now comes from its
// matching getter.
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap) *tcpAssembler {
	var emitter api.Emitter = &api.Emitting{
		AppStats:      &diagnose.AppStats,
		OutputChannel: outputItems,
	}

	streamFactory := NewTcpStreamFactory(emitter, streamsMap)
	streamPool := reassembly.NewStreamPool(streamFactory)
	assembler := reassembly.NewAssembler(streamPool)

	maxBufferedPagesTotal := GetMaxBufferedPagesTotal()
	maxBufferedPagesPerConnection := GetMaxBufferedPagesPerConnection()
	logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d",
		maxBufferedPagesTotal, maxBufferedPagesPerConnection)
	assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
	assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection

	return &tcpAssembler{
		Assembler:     assembler,
		streamPool:    streamPool,
		streamFactory: streamFactory,
	}
}
// processPackets consumes packets from the packets channel, feeds each TCP
// packet into the reassembler and updates the global diagnose counters. It
// returns when the channel is closed, when SIGINT is caught, or after
// *maxcount packets (when that flag is positive); before returning it
// flushes all pending connections out of the assembler.
func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.TcpPacketInfo) {
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)

	for packetInfo := range packets {
		packetsCount := diagnose.AppStats.IncPacketsCount()
		logger.Log.Debugf("PACKET #%d", packetsCount)
		packet := packetInfo.Packet
		data := packet.Data()
		diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
		// Hex-dump the raw packet bytes when requested (debug aid).
		if dumpPacket {
			logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
		}

		tcp := packet.Layer(layers.LayerTypeTCP)
		if tcp != nil {
			diagnose.AppStats.IncTcpPacketsCount()
			tcp := tcp.(*layers.TCP)
			// Optional TCP checksum verification, controlled by the *checksum flag.
			if *checksum {
				err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
				if err != nil {
					logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
				}
			}
			// The capture info travels with the packet as the assembler context.
			c := context{
				CaptureInfo: packet.Metadata().CaptureInfo,
			}
			diagnose.InternalStats.Totalsz += len(tcp.Payload)
			logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
			// Serialize access to the shared assembler.
			a.assemblerMutex.Lock()
			a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
			a.assemblerMutex.Unlock()
		}

		// Stop after *maxcount packets; zero or negative disables the limit.
		done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
		if done {
			errorMapLen, _ := diagnose.TapErrors.GetErrorsSummary()
			logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
				diagnose.AppStats.PacketsCount,
				diagnose.AppStats.ProcessedBytes,
				time.Since(diagnose.AppStats.StartTime),
				diagnose.TapErrors.ErrorsCount,
				errorMapLen)
		}

		// Non-blocking check for SIGINT between packets.
		select {
		case <-signalChan:
			logger.Log.Infof("Caught SIGINT: aborting")
			done = true
		default:
			// NOP: continue
		}

		if done {
			break
		}
	}

	// Flush whatever is still buffered in the assembler before returning.
	a.assemblerMutex.Lock()
	closed := a.FlushAll()
	a.assemblerMutex.Unlock()
	logger.Log.Debugf("Final flush: %d closed", closed)
}
// dumpStreamPool dumps the current contents of the reassembly stream pool
// (diagnostic output).
func (a *tcpAssembler) dumpStreamPool() {
	a.streamPool.Dump()
}
// waitAndDump blocks until all stream-factory goroutines have finished and
// then logs the assembler's internal state at debug level.
func (a *tcpAssembler) waitAndDump() {
	a.streamFactory.WaitGoRoutines()

	a.assemblerMutex.Lock()
	defer a.assemblerMutex.Unlock()
	logger.Log.Debugf("%s", a.Dump())
}
package tap
import (
"fmt"
"io"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/ip4defrag"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/diagnose"
)
// tcpPacketSource wraps a gopacket packet source together with the pcap
// handle it reads from, an IPv4 defragmenter, and the behaviour options the
// source was created with.
type tcpPacketSource struct {
	source    *gopacket.PacketSource      // decoded-packet iterator over handle
	handle    *pcap.Handle                // live or offline pcap capture handle
	defragger *ip4defrag.IPv4Defragmenter // reassembles fragmented IPv4 packets
	behaviour *tcpPacketSourceBehaviour   // capture options used to open the handle
}
// tcpPacketSourceBehaviour groups the options applied when opening a
// capture handle in NewTcpPacketSource.
type tcpPacketSourceBehaviour struct {
	snapLength  int    // pcap snapshot length
	promisc     bool   // put the interface into promiscuous mode
	tstype      string // pcap timestamp source name; empty selects the default
	decoderName string // gopacket decoder name; empty means use the handle's link type
	lazy        bool   // enable lazy decoding on the packet source
	bpfFilter   string // BPF filter expression; empty means no filter
}
// tcpPacketInfo couples a captured packet with the source it was read from.
type tcpPacketInfo struct {
	packet gopacket.Packet
	source *tcpPacketSource
}
// NewTcpPacketSource opens a packet capture source: from a pcap file when
// filename is non-empty, otherwise live from interfaceName, configured
// according to behaviour. On success the caller owns the returned source and
// must close it. Fix: previously the already-open pcap handle was leaked when
// setting the BPF filter failed or no decoder matched; it is now closed on
// those error paths.
func NewTcpPacketSource(filename string, interfaceName string,
	behaviour tcpPacketSourceBehaviour) (*tcpPacketSource, error) {
	var err error

	result := &tcpPacketSource{
		defragger: ip4defrag.NewIPv4Defragmenter(),
		behaviour: &behaviour,
	}

	if filename != "" {
		if result.handle, err = pcap.OpenOffline(filename); err != nil {
			return result, fmt.Errorf("PCAP OpenOffline error: %v", err)
		}
	} else {
		// This is a little complicated because we want to allow all possible options
		// for creating the packet capture handle... instead of all this you can
		// just call pcap.OpenLive if you want a simple handle.
		inactive, err := pcap.NewInactiveHandle(interfaceName)
		if err != nil {
			return result, fmt.Errorf("could not create: %v", err)
		}
		defer inactive.CleanUp()
		if err = inactive.SetSnapLen(behaviour.snapLength); err != nil {
			return result, fmt.Errorf("could not set snap length: %v", err)
		} else if err = inactive.SetPromisc(behaviour.promisc); err != nil {
			return result, fmt.Errorf("could not set promisc mode: %v", err)
		} else if err = inactive.SetTimeout(time.Second); err != nil {
			return result, fmt.Errorf("could not set timeout: %v", err)
		}
		if behaviour.tstype != "" {
			if t, err := pcap.TimestampSourceFromString(behaviour.tstype); err != nil {
				return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
			} else if err := inactive.SetTimestampSource(t); err != nil {
				return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
			}
		}
		if result.handle, err = inactive.Activate(); err != nil {
			return result, fmt.Errorf("PCAP Activate error: %v", err)
		}
	}
	if behaviour.bpfFilter != "" {
		logger.Log.Infof("Using BPF filter %q", behaviour.bpfFilter)
		if err = result.handle.SetBPFFilter(behaviour.bpfFilter); err != nil {
			// The handle is already open here; close it so it does not leak
			// when we return without handing the source to the caller.
			result.close()
			return nil, fmt.Errorf("BPF filter error: %v", err)
		}
	}

	var dec gopacket.Decoder
	var ok bool
	if behaviour.decoderName == "" {
		behaviour.decoderName = result.handle.LinkType().String()
	}
	if dec, ok = gopacket.DecodersByLayerName[behaviour.decoderName]; !ok {
		// Same leak as above: release the open handle before bailing out.
		result.close()
		return nil, fmt.Errorf("no decoder named %v", behaviour.decoderName)
	}
	result.source = gopacket.NewPacketSource(result.handle, dec)
	result.source.Lazy = behaviour.lazy
	result.source.NoCopy = true

	return result, nil
}
// close releases the underlying pcap handle, if one was opened.
// It is safe to call on a source whose handle was never created.
func (source *tcpPacketSource) close() {
	if source.handle == nil {
		return
	}
	source.handle.Close()
}
// readPackets reads packets from the capture source in an endless loop and
// forwards them on the packets channel. When ipdefrag is true, IPv4
// fragments are reassembled first: non-IPv4 packets are skipped, incomplete
// fragments are held back, and reassembled packets are re-decoded before
// being forwarded. It returns io.EOF when the capture source is exhausted
// (e.g. end of a pcap file).
// Fix: the defrag condition was inverted (`if !ipdefrag`), disabling
// defragmentation exactly when the parameter requested it.
func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- tcpPacketInfo) error {
	for {
		packet, err := source.source.NextPacket()

		if err == io.EOF {
			return err
		} else if err != nil {
			// Live captures time out periodically while waiting for traffic;
			// that is expected and not worth logging.
			if err.Error() != "Timeout Expired" {
				logger.Log.Debugf("Error: %T", err)
			}
			continue
		}

		// defrag the IPv4 packet if required
		if ipdefrag {
			ip4Layer := packet.Layer(layers.LayerTypeIPv4)
			if ip4Layer == nil {
				continue
			}
			ip4 := ip4Layer.(*layers.IPv4)
			l := ip4.Length
			newip4, err := source.defragger.DefragIPv4(ip4)
			if err != nil {
				logger.Log.Fatal("Error while de-fragmenting", err)
			} else if newip4 == nil {
				logger.Log.Debugf("Fragment...")
				continue // packet fragment, we don't have whole packet yet.
			}
			if newip4.Length != l {
				// The packet grew after defragmentation, i.e. it was
				// reassembled - decode the payload again so upper layers
				// see the whole packet.
				diagnose.InternalStats.Ipdefrag++
				logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
				pb, ok := packet.(gopacket.PacketBuilder)
				if !ok {
					logger.Log.Panic("Not a PacketBuilder")
				}
				nextDecoder := newip4.NextLayerType()
				_ = nextDecoder.Decode(newip4.Payload, pb)
			}
		}

		packets <- tcpPacketInfo{
			packet: packet,
			source: source,
		}
	}
}
......@@ -9,6 +9,7 @@ import (
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
)
/* It's a connection (bidirectional)
......@@ -36,11 +37,11 @@ type tcpStream struct {
func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// FSM
if !t.tcpstate.CheckState(tcp, dir) {
tapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
stats.rejectFsm++
diagnose.TapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
diagnose.InternalStats.RejectFsm++
if !t.fsmerr {
t.fsmerr = true
stats.rejectConnFsm++
diagnose.InternalStats.RejectConnFsm++
}
if !*ignorefsmerr {
return false
......@@ -49,8 +50,8 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
// Options
err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil {
tapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
stats.rejectOpt++
diagnose.TapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
diagnose.InternalStats.RejectOpt++
if !*nooptcheck {
return false
}
......@@ -60,15 +61,15 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
if *checksum {
c, err := tcp.ComputeChecksum()
if err != nil {
tapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
diagnose.TapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
accept = false
} else if c != 0x0 {
tapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
diagnose.TapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
accept = false
}
}
if !accept {
stats.rejectOpt++
diagnose.InternalStats.RejectOpt++
}
return accept
}
......@@ -79,28 +80,28 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
// update stats
sgStats := sg.Stats()
if skip > 0 {
stats.missedBytes += skip
diagnose.InternalStats.MissedBytes += skip
}
stats.sz += length - saved
stats.pkt += sgStats.Packets
diagnose.InternalStats.Sz += length - saved
diagnose.InternalStats.Pkt += sgStats.Packets
if sgStats.Chunks > 1 {
stats.reassembled++
diagnose.InternalStats.Reassembled++
}
stats.outOfOrderPackets += sgStats.QueuedPackets
stats.outOfOrderBytes += sgStats.QueuedBytes
if length > stats.biggestChunkBytes {
stats.biggestChunkBytes = length
diagnose.InternalStats.OutOfOrderPackets += sgStats.QueuedPackets
diagnose.InternalStats.OutOfOrderBytes += sgStats.QueuedBytes
if length > diagnose.InternalStats.BiggestChunkBytes {
diagnose.InternalStats.BiggestChunkBytes = length
}
if sgStats.Packets > stats.biggestChunkPackets {
stats.biggestChunkPackets = sgStats.Packets
if sgStats.Packets > diagnose.InternalStats.BiggestChunkPackets {
diagnose.InternalStats.BiggestChunkPackets = sgStats.Packets
}
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
// In the original example this was handled with panic().
// I don't know what this error means or how to handle it properly.
tapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
diagnose.TapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
}
stats.overlapBytes += sgStats.OverlapBytes
stats.overlapPackets += sgStats.OverlapPackets
diagnose.InternalStats.OverlapBytes += sgStats.OverlapBytes
diagnose.InternalStats.OverlapPackets += sgStats.OverlapPackets
var ident string
if dir == reassembly.TCPDirClientToServer {
......@@ -108,7 +109,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} else {
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
}
tapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
diagnose.TapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
if skip == -1 && *allowmissinginit {
// this is allowed
} else if skip != 0 {
......@@ -127,18 +128,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
dnsSize := binary.BigEndian.Uint16(data[:2])
missing := int(dnsSize) - len(data[2:])
tapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing)
diagnose.TapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing)
if missing > 0 {
tapErrors.Debug("Missing some bytes: %d", missing)
diagnose.TapErrors.Debug("Missing some bytes: %d", missing)
sg.KeepFrom(0)
return
}
p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns)
err := p.DecodeLayers(data[2:], &decoded)
if err != nil {
tapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err)
diagnose.TapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err)
} else {
tapErrors.Debug("DNS: %s", gopacket.LayerDump(dns))
diagnose.TapErrors.Debug("DNS: %s", gopacket.LayerDump(dns))
}
if len(data) > 2+int(dnsSize) {
sg.KeepFrom(2 + int(dnsSize))
......@@ -147,7 +148,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if length > 0 {
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
appStats.IncReassembledTcpPayloadsCount()
diagnose.AppStats.IncReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for i := range t.clients {
......@@ -173,7 +174,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
tapErrors.Debug("%s: Connection closed", t.ident)
diagnose.TapErrors.Debug("%s: Connection closed", t.ident)
if t.isTapTarget && !t.isClosed {
t.Close()
}
......
......@@ -7,6 +7,7 @@ import (
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/diagnose"
)
type tcpStreamMap struct {
......@@ -44,9 +45,9 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
stream.Close()
appStats.IncDroppedTcpStreams()
diagnose.AppStats.IncDroppedTcpStreams()
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n",
appStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
......
......@@ -2,7 +2,6 @@ package main
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path"
......@@ -34,7 +33,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
continue
}
fmt.Printf("Loading extension: %s\n", filename)
logger.Log.Infof("Loading extension: %s\n", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
......@@ -69,7 +68,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
})
for _, extension := range extensions {
fmt.Printf("Extension Properties: %+v\n", extension)
logger.Log.Infof("Extension Properties: %+v\n", extension)
}
return extensions, nil
......@@ -93,7 +92,7 @@ func internalRun() error {
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
fmt.Printf("Tapping, press enter to exit...\n")
logger.Log.Infof("Tapping, press enter to exit...\n")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
return nil
......@@ -105,9 +104,9 @@ func main() {
if err != nil {
switch err := err.(type) {
case *errors.Error:
fmt.Printf("Error: %v\n", err.ErrorStack())
logger.Log.Errorf("Error: %v\n", err.ErrorStack())
default:
fmt.Printf("Error: %v\n", err)
logger.Log.Errorf("Error: %v\n", err)
}
os.Exit(1)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment