Unverified Commit eb81bd1a authored by James Bartlett's avatar James Bartlett Committed by Copybara
Browse files

Add px/cluster_egress script, for viewing data exfiltration from the cluster.


Summary:
Adds a pxl script for viewing all traffic leaving the cluster. Uses Pixie's `dns_events` to determine domain names for external IPs, instead of the domain names nslookup yields. Allows filtering by traffic containing PII.

This script will be used for the data exfiltration demo.

Test Plan: Tested on a cluster with the new CIDR UDFs.

Reviewers: nserrino, zasgar, vihang, philkuz, michelle

Reviewed By: vihang
Signed-off-by: default avatarJames Bartlett <jamesbartlett@pixielabs.ai>

Differential Revision: https://phab.corp.pixielabs.ai/D11384

GitOrigin-RevId: 0e933b545d7fc61db1ee863917657652f86180e8
parent 3e00c3d6
Showing with 477 additions and 0 deletions
+477 -0
# Copyright 2018- The Pixie Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
import px
def egress_graph(
start_time: str,
filter_for_pii_egress: bool,
include_unknown_protocol: bool,
filter_for_not_ssl: bool,
use_pixie_dns_resolution: bool,
):
"""Get a graphical representation of all traffic leaving the cluster."""
df = traffic_leaving_cluster(
start_time,
filter_for_pii_egress,
include_unknown_protocol,
filter_for_not_ssl,
use_pixie_dns_resolution,
)
df = df.groupby(["protocol", "resolved_domain"]).agg(
ssl_sum=("ssl", px.sum),
count=("ssl", px.count),
bytes_egressed=("bytes_egressed", px.sum),
example_req=("example_req", px.any),
)
df.ssl = px.select(
df.ssl_sum == 0,
"no_ssl",
px.select(df.ssl_sum == df.count, "ssl", "both_ssl_and_no_ssl"),
)
# The Graph widget doesn't support categorical labels for edge color,
# so use a really small number and a really big number to fake a 'latency' for the categorical edge color.
df.edge_color = px.select(
df.ssl == "ssl", 0, px.select(df.ssl == "no_ssl", 201, 101)
)
return df
def egress_data(
start_time: str,
filter_for_pii_egress: bool,
include_unknown_protocol: bool,
filter_for_not_ssl: bool,
use_pixie_dns_resolution: bool,
):
"""Get all traffic leaving the cluster grouped by pod, remote IP address, protocol, and SSL."""
df = traffic_leaving_cluster(
start_time,
filter_for_pii_egress,
include_unknown_protocol,
filter_for_not_ssl,
use_pixie_dns_resolution,
)
df = df.groupby(["protocol", "resolved_domain", "remote_addr", "pod", "ssl"]).agg(
bytes_egressed=("bytes_egressed", px.sum), example_req=("example_req", px.any)
)
return df[
[
"resolved_domain",
"protocol",
"remote_addr",
"pod",
"example_req",
"bytes_egressed",
"ssl",
]
]
def traffic_leaving_cluster(
start_time: str,
filter_for_pii_egress: bool,
include_unknown_protocol: bool,
filter_for_not_ssl: bool,
use_pixie_dns_resolution: bool,
):
"""Get all traffic leaving the cluster aggregated by upid,remote_addr pairs."""
df = unified_events_table(start_time, filter_for_pii_egress)
df = df.groupby(
["node", "pod", "upid", "remote_addr", "remote_port", "protocol"]
).agg(
bytes_egressed=("bytes", px.sum),
example_req=("req", px.any),
)
# Add whether the connection is ssl'd using conn stats.
df = ssl_from_conn_stats(df, start_time)
# Add conn stats info about unknown protocols.
df = add_unknown_from_conn_stats(df, start_time, include_unknown_protocol)
# Resolve IPs to domains using either nslookup or dns_events.
df = reverse_lookup_dns(df, start_time, use_pixie_dns_resolution)
# Filter to only not ssl'd if filter_for_not_ssl is set.
df = df[(not df.ssl) or (not filter_for_not_ssl)]
return df
def ssl_from_conn_stats(df, start_time):
conn_df = px.DataFrame("conn_stats", start_time=start_time)
conn_df = conn_df[conn_df.trace_role == 1]
conn_df.cidrs = px.get_cidrs()
conn_df = conn_df[
conn_df.remote_addr != "127.0.0.1"
and (conn_df.remote_addr != "0.0.0.0" and conn_df.remote_addr != "-")
]
conn_df = conn_df[not px.cidrs_contain_ip(conn_df.cidrs, conn_df.remote_addr)]
conn_df.pod = conn_df.ctx["pod"]
conn_df = conn_df.groupby(
["remote_addr", "protocol", "remote_port", "pod", "upid", "ssl"]
).agg()
conn_df.protocol = px.protocol_name(conn_df.protocol)
merged_df = df.merge(
conn_df,
how="inner",
left_on=["pod", "upid", "remote_addr", "remote_port", "protocol"],
right_on=["pod", "upid", "remote_addr", "remote_port", "protocol"],
suffixes=["", "_y"],
)
return merged_df[
[
"node",
"pod",
"upid",
"remote_addr",
"remote_port",
"protocol",
"ssl",
"bytes_egressed",
"example_req",
]
]
def dns_get_answer_at_idx(df, answer_idx):
df.domain = px.pluck(px.pluck_array(px.pluck(df.req_body, "queries"), 0), "name")
df.addr = px.pluck(
px.pluck_array(px.pluck(df.resp_body, "answers"), answer_idx), "addr"
)
df = df[df.domain != "" and df.addr != ""]
df = df[["domain", "addr"]]
return df
def reverse_lookup_dns(df, start_time: str, use_pixie_dns_resolution: bool):
dns_df = px.DataFrame("dns_events", start_time=start_time)
# Since we don't have an unnest operator, instead we union answers at each index.
dns_df_merged = dns_get_answer_at_idx(dns_df, 0)
dns_df_merged = dns_df_merged.append(dns_get_answer_at_idx(dns_df, 1))
dns_df_merged = dns_df_merged.append(dns_get_answer_at_idx(dns_df, 2))
dns_df_merged = dns_df_merged.append(dns_get_answer_at_idx(dns_df, 3))
dns_df_merged = dns_df_merged.append(dns_get_answer_at_idx(dns_df, 4))
dns_df_merged = dns_df_merged.append(dns_get_answer_at_idx(dns_df, 5))
# Make sure that the mapping from addr -> domain is one-to-many. Should investigate many-to-many situations.
dns_df_merged = dns_df_merged.groupby(["addr"]).agg(
resolved_domain=("domain", px.any)
)
df = df.merge(dns_df_merged, how="left", left_on=["remote_addr"], right_on=["addr"])
df = df.drop(["addr"])
df.resolved_domain = px.select(
(df.resolved_domain == "") or (not use_pixie_dns_resolution),
px.nslookup(df.remote_addr),
df.resolved_domain,
)
return df
def add_unknown_from_conn_stats(df, start_time: str, include_unknown_protocol: bool):
unknown_df = px.DataFrame("conn_stats", start_time=start_time)
unknown_df = unknown_df[unknown_df.protocol == 0]
unknown_df = unknown_df[unknown_df.trace_role == 1]
unknown_df.cidrs = px.get_cidrs()
unknown_df = unknown_df[
unknown_df.remote_addr != "127.0.0.1"
and (unknown_df.remote_addr != "0.0.0.0" and unknown_df.remote_addr != "-")
]
unknown_df = unknown_df[
not px.cidrs_contain_ip(unknown_df.cidrs, unknown_df.remote_addr)
]
unknown_df.pod = unknown_df.ctx["pod"]
unknown_df.service = unknown_df.ctx["service"]
unknown_df.node = unknown_df.ctx["node"]
unknown_df.protocol = px.protocol_name(unknown_df.protocol)
unknown_df = unknown_df.groupby(
["node", "pod", "upid", "remote_addr", "remote_port", "protocol", "ssl"]
).agg(
bytes_sent_min=("bytes_sent", px.min),
bytes_sent_max=("bytes_sent", px.max),
)
unknown_df.bytes_egressed = unknown_df.bytes_sent_max - unknown_df.bytes_sent_min
unknown_df = unknown_df.drop(["bytes_sent_max", "bytes_sent_min"])
unknown_df.example_req = (
"Traffic egressed over unknown protocol. "
+ "Pixie does not currently support tracing the protocol of this traffic."
)
unknown_df = unknown_df[include_unknown_protocol]
df = df.append(unknown_df)
return df
def common_events_setup(table_name: str, start_time: str):
df = px.DataFrame(table_name, start_time=start_time)
df = df[df.trace_role == 1]
df.cidrs = px.get_cidrs()
df = df[
df.remote_addr != "127.0.0.1"
and (df.remote_addr != "0.0.0.0" and df.remote_addr != "-")
]
df = df[not px.cidrs_contain_ip(df.cidrs, df.remote_addr)]
df.pod = df.ctx["pod"]
# Get the node from the hostname so that host processes have the correct node.
df.node = px._exec_hostname()
return df
def filter_for_pii(df, filter_for_pii_egress: bool):
df.req_redacted = px.redact_pii_best_effort(df.req)
df.contains_pii = px.regex_match(".*[<]REDACTED[_].*[>].*", df.req_redacted)
df = df[df.contains_pii or (not filter_for_pii_egress)]
return df.drop(["req_redacted", "contains_pii"])
def unified_events_table(start_time: str, filter_for_pii_egress: bool):
out_columns = [
"time_",
"upid",
"protocol",
"pod",
"node",
"remote_addr",
"remote_port",
"req",
"req_hdrs",
"resp",
"resp_hdrs",
"bytes",
]
# HTTP
http_df = common_events_setup("http_events", start_time)
http_df.req = http_df.req_body
http_df.req_hdrs = http_df.req_headers
http_df.resp = http_df.resp_body
http_df.resp_hdrs = http_df.resp_headers
http_df.protocol = "HTTP"
http_df.bytes = http_df.req_body_size
http_df = filter_for_pii(http_df, filter_for_pii_egress)
http_df = http_df[out_columns]
# PGSQL
pg_df = common_events_setup("pgsql_events", start_time)
pg_df.req_hdrs = '{"req_cmd": "' + pg_df.req_cmd + '"}'
pg_df.resp_hdrs = ""
pg_df.protocol = "PGSQL"
pg_df.bytes = px.length(pg_df.req)
pg_df = filter_for_pii(pg_df, filter_for_pii_egress)
pg_df = pg_df[out_columns]
# MYSQL
mysql_df = common_events_setup("mysql_events", start_time)
mysql_df.req_hdrs = '{"req_cmd": "' + px.itoa(mysql_df.req_cmd) + '"}'
mysql_df.resp_hdrs = ""
mysql_df.req = mysql_df.req_body
mysql_df.resp = mysql_df.resp_body
mysql_df.protocol = "MySQL"
mysql_df.bytes = px.length(mysql_df.req_body)
mysql_df = filter_for_pii(mysql_df, filter_for_pii_egress)
mysql_df = mysql_df[out_columns]
# CQL
cql_df = common_events_setup("cql_events", start_time)
cql_df.req_hdrs = '{"req_op": "' + px.itoa(cql_df.req_op) + '"}'
cql_df.resp_hdrs = '{"resp_op": "' + px.itoa(cql_df.resp_op) + '"}'
cql_df.req = cql_df.req_body
cql_df.resp = cql_df.resp_body
cql_df.protocol = "CQL"
cql_df.bytes = px.length(cql_df.req_body)
cql_df = filter_for_pii(cql_df, filter_for_pii_egress)
cql_df = cql_df[out_columns]
# DNS
dns_df = common_events_setup("dns_events", start_time)
dns_df.req = dns_df.req_body
dns_df.req_hdrs = dns_df.req_header
dns_df.resp = dns_df.resp_body
dns_df.resp_hdrs = dns_df.resp_header
dns_df.protocol = "DNS"
dns_df.bytes = px.length(dns_df.req_body)
dns_df = filter_for_pii(dns_df, filter_for_pii_egress)
dns_df = dns_df[out_columns]
# REDIS
redis_df = common_events_setup("redis_events", start_time)
redis_df.req_hdrs = '{"req_cmd": "' + redis_df.req_cmd + '"}'
redis_df.resp_hdrs = ""
redis_df.req = redis_df.req_args
redis_df.bytes = px.length(redis_df.req_args)
redis_df = filter_for_pii(redis_df, filter_for_pii_egress)
redis_df = redis_df[out_columns]
redis_df.protocol = "Redis"
df = http_df
df = df.append(pg_df)
df = df.append(mysql_df)
df = df.append(cql_df)
df = df.append(dns_df)
df = df.append(redis_df)
return df
---
short: Traffic leaving the cluster
long: >
This view displays a summary of the traffic from the cluster to external endpoints.
{
"variables": [
{
"name": "start_time",
"type": "PX_STRING",
"description": "The start time of the window in time units before now.",
"defaultValue": "-5m"
},
{
"name": "filter_for_pii_egress",
"type": "PX_BOOLEAN",
"description": "Whether to only include results where PII has been detected egressing.",
"defaultValue": "false"
},
{
"name": "include_unknown_protocol",
"type": "PX_BOOLEAN",
"description": "Whether to include results where protocol of traffic is not currently supported by Pixie tracing.",
"defaultValue": "true"
},
{
"name": "filter_for_not_ssl",
"type": "PX_BOOLEAN",
"description": "Include only traffic that is leaving the cluster unencrypted",
"defaultValue": "false"
},
{
"name": "use_pixie_dns_resolution",
"type": "PX_BOOLEAN",
"description": "Use pixie's DNS data to reverrse IP using forward DNS records, instead of nslookup",
"defaultValue": "false"
}
],
"widgets": [
{
"name": "Cluster Egress",
"position": {
"x": 0,
"y": 0,
"w": 12,
"h": 4
},
"displaySpec": {
"@type": "types.px.dev/px.vispb.Graph",
"adjacencyList": {
"fromColumn": "protocol",
"toColumn": "resolved_domain"
},
"edgeWeightColumn": "bytes_egressed",
"edgeColorColumn": "edge_color",
"edgeHoverInfo": [
"resolved_domain",
"protocol",
"bytes_egressed",
"ssl",
"example_req"
],
"edgeLength": 500,
"enableDefaultHierarchy": true,
"edgeThresholds": {
"mediumThreshold": 100,
"highThreshold": 200
}
},
"func": {
"name": "egress_graph",
"args": [
{
"name": "start_time",
"variable": "start_time"
},
{
"name": "filter_for_pii_egress",
"variable": "filter_for_pii_egress"
},
{
"name": "include_unknown_protocol",
"variable": "include_unknown_protocol"
},
{
"name": "filter_for_not_ssl",
"variable": "filter_for_not_ssl"
},
{
"name": "use_pixie_dns_resolution",
"variable": "use_pixie_dns_resolution"
}
]
}
},
{
"name": "Egress by pod, IP",
"position": {
"x": 0,
"y": 4,
"w": 12,
"h": 4
},
"displaySpec": {
"@type": "types.px.dev/px.vispb.Table"
},
"func": {
"name": "egress_data",
"args": [
{
"name": "start_time",
"variable": "start_time"
},
{
"name": "filter_for_pii_egress",
"variable": "filter_for_pii_egress"
},
{
"name": "include_unknown_protocol",
"variable": "include_unknown_protocol"
},
{
"name": "filter_for_not_ssl",
"variable": "filter_for_not_ssl"
},
{
"name": "use_pixie_dns_resolution",
"variable": "use_pixie_dns_resolution"
}
]
}
}
]
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment