Unverified Commit 9235dbcd authored by Omid Azizi's avatar Omid Azizi Committed by Copybara
Browse files

SocketTraceConnector: Clean-up TransferStream


Summary:
Tidying up some of the code for consistency.
Also optimize to make fewer calls to std::chrono::now().

Test Plan: No functional changes.

Reviewers: #stirling, yzhao

Reviewed By: #stirling, yzhao

Subscribers: yzhao
Signed-off-by: default avatarOmid Azizi <oazizi@pixielabs.ai>

Differential Revision: https://phab.corp.pixielabs.ai/D10832

GitOrigin-RevId: 2576c5bd34eecb2ccd55e62876a0e6da75ff7526
parent caac1566
Showing with 36 additions and 30 deletions
+36 -30
......@@ -526,8 +526,6 @@ class ConnTracker : NotCopyMoveable {
using TFrameType = typename TProtocolTraits::frame_type;
using TStateType = typename TProtocolTraits::state_type;
InitFrames<TFrameType>();
if constexpr (std::is_same_v<TFrameType, protocols::http2::Stream>) {
http2_client_streams_.Cleanup(frame_size_limit_bytes, frame_expiry_timestamp);
http2_server_streams_.Cleanup(frame_size_limit_bytes, frame_expiry_timestamp);
......@@ -589,12 +587,14 @@ class ConnTracker : NotCopyMoveable {
data_buffer_total += recv_data().data_buffer().capacity();
size_t parsed_msg_total = 0;
parsed_msg_total += send_data().FramesSize<TFrameType>();
parsed_msg_total += recv_data().FramesSize<TFrameType>();
size_t http2_events_total = 0;
http2_events_total += http2_client_streams_.StreamsSize();
http2_events_total += http2_server_streams_.StreamsSize();
if constexpr (std::is_same_v<TFrameType, protocols::http2::Stream>) {
http2_events_total += http2_client_streams_.StreamsSize();
http2_events_total += http2_server_streams_.StreamsSize();
} else {
parsed_msg_total += send_data().FramesSize<TFrameType>();
parsed_msg_total += recv_data().FramesSize<TFrameType>();
}
return data_buffer_total + http2_events_total + parsed_msg_total;
}
......
......@@ -194,10 +194,6 @@ void SocketTraceConnector::InitProtocolTransferSpecs() {
kNATSTableNum,
{kRoleClient, kRoleServer},
TRANSFER_STREAM_PROTOCOL(nats)}},
// TODO(chengruizhe): Add Mongo table. nullptr should be replaced by the transfer_fn for
// mongo in the future.
{kProtocolMongo,
TransferSpec{false, kHTTPTableNum, {kRoleUnknown, kRoleClient, kRoleServer}, nullptr}},
{kProtocolKafka, TransferSpec{FLAGS_stirling_enable_kafka_tracing,
kKafkaTableNum,
{kRoleClient, kRoleServer},
......@@ -206,13 +202,15 @@ void SocketTraceConnector::InitProtocolTransferSpecs() {
kMuxTableNum,
{kRoleClient, kRoleServer},
TRANSFER_STREAM_PROTOCOL(mux)}},
{kProtocolUnknown, TransferSpec{false /*enabled*/,
// Unknown protocols attached to HTTP table so that they run
// their cleanup functions, but the use of nullptr transfer_fn
// means it won't actually transfer data to the HTTP table.
kHTTPTableNum,
{kRoleUnknown, kRoleClient, kRoleServer},
nullptr /*transfer_fn*/}}};
// TODO(chengruizhe): Update Mongo after implementing protocol parsers.
{kProtocolMongo, TransferSpec{/* enabled */ false,
/* table_num */ static_cast<uint32_t>(-1),
/* trace_roles */ {},
/* transfer_fn */ nullptr}},
{kProtocolUnknown, TransferSpec{/*enabled*/ false,
/* table_num */ static_cast<uint32_t>(-1),
/* trace_roles */ {},
/* transfer_fn */ nullptr}}};
#undef TRANSFER_STREAM_PROTOCOL
for (uint64_t i = 0; i < kNumProtocols; ++i) {
......@@ -590,7 +588,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx,
}
constexpr auto kDebugDumpPeriod = std::chrono::minutes(1);
if (sampling_freq_mgr_.count() % (kDebugDumpPeriod / kSamplingPeriod)) {
if (sampling_freq_mgr_.count() % (kDebugDumpPeriod / kSamplingPeriod) == 0) {
if (debug_level_ >= 1) {
LOG(INFO) << "Context: " << DumpContext(ctx);
LOG(INFO) << "BPF map info: " << BPFMapsInfo(static_cast<BCCWrapper*>(this));
......@@ -612,15 +610,27 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx,
for (const auto& conn_tracker : conn_trackers_mgr_.active_trackers()) {
const auto& transfer_spec = protocol_transfer_specs_[conn_tracker->protocol()];
DataTable* data_table = data_tables[transfer_spec.table_num];
DataTable* data_table = nullptr;
if (transfer_spec.enabled) {
data_table = data_tables[transfer_spec.table_num];
}
UpdateTrackerTraceLevel(conn_tracker);
conn_tracker->IterationPreTick(iteration_time_, cluster_cidrs, proc_parser_.get(),
socket_info_mgr_.get());
if (transfer_spec.enabled && transfer_spec.transfer_fn && data_table != nullptr) {
if (transfer_spec.transfer_fn != nullptr) {
transfer_spec.transfer_fn(*this, ctx, conn_tracker, data_table);
} else {
// If there's no transfer function, then the tracker should not be holding any data.
// http::ProtocolTraits is used as a placeholder; the frames deque is expected to be
// std::monotstate.
ECHECK(conn_tracker->send_data().Empty<protocols::http::Message>());
ECHECK(conn_tracker->recv_data().Empty<protocols::http::Message>());
}
conn_tracker->IterationPostTick();
}
......@@ -1212,7 +1222,7 @@ void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tr
// This is a nop if the containers are already of the right type.
tracker->InitFrames<TFrameType>();
if (tracker->state() == ConnTracker::State::kTransferring) {
if (data_table != nullptr && tracker->state() == ConnTracker::State::kTransferring) {
// ProcessToRecords() parses raw events and produces messages in format that are expected by
// table store. But those messages are not cached inside ConnTracker.
auto records = tracker->ProcessToRecords<TProtocolTraits>();
......@@ -1223,11 +1233,10 @@ void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tr
}
}
auto buffer_expiry_timestamp =
iteration_time() - std::chrono::seconds(FLAGS_datastream_buffer_expiry_duration_secs);
auto message_expiry_timestamp =
iteration_time_ - std::chrono::seconds(FLAGS_messages_expiry_duration_secs);
auto buffer_expiry_timestamp = std::chrono::steady_clock::now() -
std::chrono::seconds(FLAGS_datastream_buffer_expiry_duration_secs);
iteration_time() - std::chrono::seconds(FLAGS_messages_expiry_duration_secs);
tracker->Cleanup<TProtocolTraits>(FLAGS_messages_size_limit_bytes,
FLAGS_datastream_buffer_retention_size,
......
......@@ -170,12 +170,9 @@ class SocketTraceConnector : public SourceConnector, public bpf_tools::BCCWrappe
void AcceptHTTP2Header(std::unique_ptr<HTTP2HeaderEvent> event);
void AcceptHTTP2Data(std::unique_ptr<HTTP2DataEvent> event);
// Transfer of messages to the data table.
void TransferStreams(ConnectorContext* ctx, uint32_t table_num, DataTable* data_table);
void TransferConnStats(ConnectorContext* ctx, DataTable* data_table);
template <typename TProtocolTraits>
void TransferStream(ConnectorContext* ctx, ConnTracker* tracker, DataTable* data_table);
void TransferConnStats(ConnectorContext* ctx, DataTable* data_table);
void set_iteration_time(std::chrono::time_point<std::chrono::steady_clock> time) {
DCHECK(time >= iteration_time_);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment