Unverified Commit 42b9410f authored by Omid Azizi's avatar Omid Azizi Committed by Copybara
Browse files

ConnTrackersManager: Active trackers list

Summary: This helps avoid iterating through all trackers. The next diff will present an optimization where we reduce the number of iterations through the more expensive map.

Test Plan: Existing tests.

Reviewers: #stirling, yzhao

Reviewed By: #stirling, yzhao

Subscribers: yzhao

Differential Revision: https://phab.corp.pixielabs.ai/D8430

GitOrigin-RevId: 5d8f7a673d3951a09a16f20333b563d086b02436
parent 30826662
Showing with 52 additions and 120 deletions
+52 -120
......@@ -473,9 +473,6 @@ bool ConnTracker::SetProtocol(TrafficProtocol protocol, std::string_view reason)
TrafficProtocol old_protocol = traffic_class_.protocol;
traffic_class_.protocol = protocol;
if (manager_ != nullptr) {
manager_->UpdateProtocol(this);
}
CONN_TRACE(1) << absl::Substitute("Protocol changed: $0->$1", magic_enum::enum_name(old_protocol),
magic_enum::enum_name(protocol));
return true;
......
......@@ -111,25 +111,17 @@ uint64_t GetConnMapKey(uint32_t pid, uint32_t fd) {
ConnTrackersManager::ConnTrackersManager() : trackers_pool_(kMaxConnTrackerPoolSize) {}
void ConnTrackersManager::UpdateProtocol(px::stirling::ConnTracker* tracker) {
if (tracker->ReadyForDestruction()) {
return;
}
DebugChecks();
}
ConnTracker& ConnTrackersManager::GetOrCreateConnTracker(struct conn_id_t conn_id) {
const uint64_t conn_map_key = GetConnMapKey(conn_id.upid.pid, conn_id.fd);
DCHECK_NE(conn_map_key, 0) << "Connection map key cannot be 0, pid must be wrong";
ConnTrackerGenerations& conn_trackers = conn_id_to_conn_tracker_generations_[conn_map_key];
ConnTrackerGenerations& conn_trackers = conn_id_tracker_generations_[conn_map_key];
auto [conn_tracker_ptr, created] = conn_trackers.GetOrCreate(conn_id.tsid, &trackers_pool_);
if (created) {
++num_trackers_;
active_trackers_.push_back(conn_tracker_ptr);
conn_tracker_ptr->manager_ = this;
UpdateProtocol(conn_tracker_ptr);
}
DebugChecks();
......@@ -139,8 +131,8 @@ ConnTracker& ConnTrackersManager::GetOrCreateConnTracker(struct conn_id_t conn_i
StatusOr<const ConnTracker*> ConnTrackersManager::GetConnTracker(uint32_t pid, uint32_t fd) const {
const uint64_t conn_map_key = GetConnMapKey(pid, fd);
auto tracker_set_it = conn_id_to_conn_tracker_generations_.find(conn_map_key);
if (tracker_set_it == conn_id_to_conn_tracker_generations_.end()) {
auto tracker_set_it = conn_id_tracker_generations_.find(conn_map_key);
if (tracker_set_it == conn_id_tracker_generations_.end()) {
return error::NotFound("Could not find the tracker with pid=$0 fd=$1.", pid, fd);
}
......@@ -157,10 +149,23 @@ StatusOr<const ConnTracker*> ConnTrackersManager::GetConnTracker(uint32_t pid, u
}
void ConnTrackersManager::CleanupTrackers() {
{
auto iter = active_trackers_.begin();
while (iter != active_trackers_.end()) {
const auto& tracker = *iter;
if (tracker->ReadyForDestruction()) {
active_trackers_.erase(iter++);
++num_trackers_ready_for_destruction_;
} else {
++iter;
}
}
}
// Outer loop iterates through tracker sets (keyed by PID+FD),
// while inner loop iterates through generations of trackers for that PID+FD pair.
auto iter = conn_id_to_conn_tracker_generations_.begin();
while (iter != conn_id_to_conn_tracker_generations_.end()) {
auto iter = conn_id_tracker_generations_.begin();
while (iter != conn_id_tracker_generations_.end()) {
auto& tracker_generations = iter->second;
int num_erased = tracker_generations.CleanupGenerations(&trackers_pool_);
......@@ -169,7 +174,7 @@ void ConnTrackersManager::CleanupTrackers() {
num_trackers_ready_for_destruction_ -= num_erased;
if (tracker_generations.empty()) {
conn_id_to_conn_tracker_generations_.erase(iter++);
conn_id_tracker_generations_.erase(iter++);
} else {
++iter;
}
......@@ -178,60 +183,19 @@ void ConnTrackersManager::CleanupTrackers() {
DebugChecks();
}
Status ConnTrackersManager::TestOnlyCheckConsistency() const {
// A set used for looking for duplicate trackers.
std::set<ConnTracker*> trackers_set;
for (const auto& [protocol, conn_trackers_list] : conn_trackers_by_protocol_) {
for (auto iter = conn_trackers_list.begin(); iter != conn_trackers_list.end(); ++iter) {
ConnTracker* tracker = *iter;
// Check that tracker exists (i.e. that the pointer is valid).
// If the pointer is not valid, this will likely cause a crash or ASAN error.
const uint64_t conn_map_key =
GetConnMapKey(tracker->conn_id().upid.pid, tracker->conn_id().fd);
DCHECK_NE(conn_map_key, 0) << "Connection map key cannot be 0, pid must be wrong";
auto tracker_set_it = conn_id_to_conn_tracker_generations_.find(conn_map_key);
if (tracker_set_it == conn_id_to_conn_tracker_generations_.end()) {
return error::Internal("Tracker $0 in the protocol lists not found.",
ToString(tracker->conn_id()));
}
const auto& tracker_generations = tracker_set_it->second;
if (!tracker_generations.Contains(tracker->conn_id().tsid)) {
return error::Internal("Tracker $0 in the protocol lists not found.",
ToString(tracker->conn_id()));
}
// Check that the pointer only shows up once across all lists.
auto [unused, inserted] = trackers_set.insert(tracker);
if (!inserted) {
return error::Internal("Tracker $0 found in two lists.", ToString(tracker->conn_id()));
}
}
}
return Status::OK();
void ConnTrackersManager::DebugChecks() const {
DCHECK_EQ(num_trackers_, active_trackers_.size() + num_trackers_ready_for_destruction_);
}
void ConnTrackersManager::DebugChecks() const {}
std::string ConnTrackersManager::DebugInfo() const {
std::string out;
for (const auto& [protocol, conn_trackers_list] : conn_trackers_by_protocol_) {
absl::StrAppend(&out,
absl::Substitute("protocol=$0 num_trackers=$1", magic_enum::enum_name(protocol),
conn_trackers_list.size()));
for (auto iter = conn_trackers_list.begin(); iter != conn_trackers_list.end(); ++iter) {
ConnTracker* tracker = *iter;
absl::StrAppend(&out,
absl::Substitute(" conn_tracker=$0 zombie=$1 ready_for_destruction=$2\n",
tracker->ToString(), tracker->IsZombie(),
tracker->ReadyForDestruction()));
}
out += absl::Substitute("trackers: allocated=$0 active=$1\n", num_trackers_,
active_trackers_.size());
for (const auto& tracker : active_trackers_) {
out +=
absl::Substitute(" conn_tracker=$0 zombie=$1 ready_for_destruction=$2\n",
tracker->ToString(), tracker->IsZombie(), tracker->ReadyForDestruction());
}
return out;
......
......@@ -92,17 +92,14 @@ class ConnTrackersManager {
*/
ConnTracker& GetOrCreateConnTracker(struct conn_id_t conn_id);
const std::list<ConnTracker*>& active_trackers() const { return active_trackers_; }
/**
* Returns the latest generation of a connection tracker for the given pid and fd.
* If there is no tracker for {pid, fd}, returns error::NotFound.
*/
StatusOr<const ConnTracker*> GetConnTracker(uint32_t pid, uint32_t fd) const;
/**
* If a connection tracker has its protocol changed, then one must manually call this function.
*/
void UpdateProtocol(ConnTracker* tracker);
/**
* Deletes trackers that are ReadyForDestruction().
* Call this only after accumulating enough trackers to clean-up, to avoid the performance
......@@ -110,23 +107,11 @@ class ConnTrackersManager {
*/
void CleanupTrackers();
/**
* Checks the consistency of the data structures.
* Useful for catching bugs. Meant for use in testing.
* Could be expensive if called too regularly in production.
* See DebugChecks() for simpler checks that can be used in production.
*/
Status TestOnlyCheckConsistency() const;
/**
* Returns extensive debug information about the connection trackers.
*/
std::string DebugInfo() const;
const auto& conn_id_to_conn_tracker_generations() const {
return conn_id_to_conn_tracker_generations_;
}
private:
// Simple consistency DCHECKs meant for enforcing invariants.
void DebugChecks() const;
......@@ -134,13 +119,9 @@ class ConnTrackersManager {
// A map from conn_id (PID+FD+TSID) to tracker. This is for easy update on BPF events.
// Structured as two nested maps to be explicit about "generations" of trackers per PID+FD.
// Key is {PID, FD} for outer map, and tsid for inner map.
absl::flat_hash_map<uint64_t, ConnTrackerGenerations> conn_id_to_conn_tracker_generations_;
absl::flat_hash_map<uint64_t, ConnTrackerGenerations> conn_id_tracker_generations_;
// A set of lists of pointers to all the contained trackers, organized by protocol
// This is for easy access to the trackers during TransferData().
// Key is protocol.
// TODO(jps): Convert to vector?
absl::flat_hash_map<TrafficProtocol, std::list<ConnTracker*>> conn_trackers_by_protocol_;
std::list<ConnTracker*> active_trackers_;
// Keep track of total number of trackers, and other counts.
// Used to check for consistency.
......
......@@ -28,30 +28,27 @@ class ConnTrackersManagerTest : public ::testing::Test {
protected:
ConnTrackersManagerTest() : rng_(37), probability_dist_(0.0, 1.0) {}
ConnTrackersManager trackers_;
ConnTrackersManager trackers_mgr_;
std::default_random_engine rng_;
std::uniform_real_distribution<double> probability_dist_;
void CleanupTrackers() {
VLOG(1) << "CleanupTrackers";
trackers_.CleanupTrackers();
trackers_mgr_.CleanupTrackers();
}
void TransferStreamsProxy(double mark_for_death_probability, int death_countdown) {
for (const auto& [conn_id, conn_tracker_gen] :
trackers_.conn_id_to_conn_tracker_generations()) {
for (auto& [tsid, tracker] : conn_tracker_gen.generations()) {
if (probability_dist_(rng_) < mark_for_death_probability) {
tracker->MarkForDeath(death_countdown);
}
for (auto& tracker : trackers_mgr_.active_trackers()) {
if (probability_dist_(rng_) < mark_for_death_probability) {
tracker->MarkForDeath(death_countdown);
}
}
}
void TrackerEvent(struct conn_id_t conn_id, TrafficProtocol protocol) {
VLOG(1) << "TrackerEvent";
ConnTracker& tracker = trackers_.GetOrCreateConnTracker(conn_id);
ConnTracker& tracker = trackers_mgr_.GetOrCreateConnTracker(conn_id);
tracker.SetConnID(conn_id);
tracker.SetProtocol(protocol, "for testing");
}
......@@ -59,6 +56,7 @@ class ConnTrackersManagerTest : public ::testing::Test {
// This is a stress on ConnTrackersManager.
// Each iteration, a different action is taken, and the consistency of the structure is checked.
// The test also relies on the ConnTrackersManager internal checks (e.g. DebugChecks()).
// ASAN runs can also identify issues while being stressed.
TEST_F(ConnTrackersManagerTest, Fuzz) {
constexpr int kIters = 1000000;
......@@ -93,8 +91,6 @@ TEST_F(ConnTrackersManagerTest, Fuzz) {
} else {
CleanupTrackers();
}
ASSERT_OK(trackers_.TestOnlyCheckConsistency());
}
}
......
......@@ -377,28 +377,22 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx,
data_table->SetConsumeRecordsCutoffTime(perf_buffer_drain_time_);
}
// TODO(yzhao): Can have a ConnTrackersManager::GetAllConnTrackers() to return all ConnTracker
// objects, and hide generations.
for (const auto& [conn_id, conn_tracker_gen] :
conn_trackers_mgr_.conn_id_to_conn_tracker_generations()) {
for (auto& [tsid, conn_tracker] : conn_tracker_gen.generations()) {
DCHECK_LT(conn_tracker->traffic_class().protocol, protocol_transfer_specs_.size());
const auto& transfer_spec = protocol_transfer_specs_[conn_tracker->traffic_class().protocol];
DataTable* data_table = data_tables[transfer_spec.table_num];
if (data_table == nullptr) {
continue;
}
for (const auto& conn_tracker : conn_trackers_mgr_.active_trackers()) {
const auto& transfer_spec = protocol_transfer_specs_[conn_tracker->traffic_class().protocol];
DataTable* data_table = data_tables[transfer_spec.table_num];
if (data_table == nullptr) {
continue;
}
UpdateTrackerTraceLevel(conn_tracker.get());
UpdateTrackerTraceLevel(conn_tracker);
conn_tracker->IterationPreTick(iteration_start_time_, cluster_cidrs, proc_parser_.get(),
socket_info_mgr_.get());
if (transfer_spec.enabled && transfer_spec.transfer_fn) {
transfer_spec.transfer_fn(*this, ctx, conn_tracker.get(), data_table);
}
conn_tracker->IterationPostTick();
conn_tracker->IterationPreTick(iteration_start_time_, cluster_cidrs, proc_parser_.get(),
socket_info_mgr_.get());
if (transfer_spec.enabled && transfer_spec.transfer_fn) {
transfer_spec.transfer_fn(*this, ctx, conn_tracker, data_table);
}
conn_tracker->IterationPostTick();
}
// Once we've cleared all the debug trace levels for this pid, we can remove it from the list.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment