Commit fb347078 authored by Omid Azizi
Browse files

Standalone Stirling profiler

Summary:
Made this to investigate the performance issues on gke:prod,
but good to have around.

Test Plan: None

Reviewers: jps, yzhao, rcheng, #engineering

Reviewed By: yzhao, #engineering

Differential Revision: https://phab.corp.pixielabs.ai/D7537

GitOrigin-RevId: 0146c01421f82dd290409d2f2d98e9cf4761aeee
parent bab3943f
Showing with 198 additions and 34 deletions
+198 -34
......@@ -141,6 +141,14 @@ inline Status StatusAdapter<pl::statuspb::Status>(const pl::statuspb::Status& s)
// The argument expression is guaranteed to be evaluated exactly once.
#define PL_RETURN_IF_ERROR(__status) PL_RETURN_IF_ERROR_IMPL(PL_UNIQUE_NAME(__status__), __status)
// Logs the status message at ERROR severity and terminates the process with exit code 1
// if the given status is not OK; does nothing otherwise.
// The argument expression is evaluated exactly once, so it is safe to pass a function call.
// Wrapped in do/while(false) so the macro behaves as a single statement (e.g. inside if/else).
#define PL_EXIT_IF_ERROR(__status)                             \
  do {                                                         \
    const auto& _pl_exit_if_error_status = (__status);         \
    if (!_pl_exit_if_error_status.ok()) {                      \
      LOG(ERROR) << _pl_exit_if_error_status.msg();            \
      exit(1);                                                 \
    }                                                          \
  } while (false)
#define PL_CHECK_OK_PREPEND(to_call, msg) \
do { \
auto _s = (to_call); \
......
......@@ -36,6 +36,14 @@ pl_cc_binary(
],
)
# Standalone Stirling profiler binary, built from stirling_profiler.cc
# and linked against the Stirling library target.
pl_cc_binary(
name = "stirling_profiler",
srcs = ["stirling_profiler.cc"],
deps = [
"//src/stirling:cc_library",
],
)
cc_image(
name = "stirling_wrapper_image",
base = "//:pl_cc_bpf_debug_image",
......
......@@ -16,25 +16,28 @@
#include "src/stirling/source_connectors/dynamic_tracer/dynamic_tracing/dynamic_tracer.h"
#include "src/stirling/stirling.h"
using pl::Status;
using pl::StatusOr;
using pl::stirling::IndexPublication;
using pl::stirling::PrintRecordBatch;
using pl::stirling::SourceRegistry;
using pl::stirling::Stirling;
using pl::stirling::stirlingpb::Publish;
using pl::stirling::stirlingpb::Subscribe;
using ::pl::ProcessStatsMonitor;
using ::pl::Status;
using ::pl::StatusOr;
using ::pl::stirling::IndexPublication;
using ::pl::stirling::PrintRecordBatch;
using ::pl::stirling::SourceRegistry;
using ::pl::stirling::Stirling;
using ::pl::stirling::stirlingpb::InfoClass;
using ::pl::stirling::stirlingpb::Publish;
using ::pl::stirling::stirlingpb::Subscribe;
using DynamicTracepointDeployment =
pl::stirling::dynamic_tracing::ir::logical::TracepointDeployment;
::pl::stirling::dynamic_tracing::ir::logical::TracepointDeployment;
using pl::types::ColumnWrapperRecordBatch;
using pl::types::TabletID;
using ::pl::types::ColumnWrapperRecordBatch;
using ::pl::types::TabletID;
// Put this in global space, so we can kill it in the signal handler.
Stirling* g_stirling = nullptr;
pl::ProcessStatsMonitor* g_process_stats_monitor = nullptr;
absl::flat_hash_map<uint64_t, ::pl::stirling::stirlingpb::InfoClass> g_table_info_map;
ProcessStatsMonitor* g_process_stats_monitor = nullptr;
absl::flat_hash_map<uint64_t, InfoClass> g_table_info_map;
absl::base_internal::SpinLock g_callback_state_lock;
Status StirlingWrapperCallback(uint64_t table_id, TabletID /* tablet_id */,
......@@ -46,7 +49,7 @@ Status StirlingWrapperCallback(uint64_t table_id, TabletID /* tablet_id */,
if (iter == g_table_info_map.end()) {
return pl::error::Internal("Encountered unknown table id $0", table_id);
}
const pl::stirling::stirlingpb::InfoClass& table_info = iter->second;
const InfoClass& table_info = iter->second;
PrintRecordBatch(table_info.schema().name(), table_info.schema(), *record_batch);
......@@ -149,7 +152,7 @@ int main(int argc, char** argv) {
stirling->RegisterDataPushCallback(StirlingWrapperCallback);
// Start measuring process stats after init.
pl::ProcessStatsMonitor process_stats_monitor;
ProcessStatsMonitor process_stats_monitor;
g_process_stats_monitor = &process_stats_monitor;
LOG(INFO) << "Trace spec:\n" << trace_program.text;
......@@ -177,7 +180,7 @@ int main(int argc, char** argv) {
std::thread run_thread = std::thread(&Stirling::Run, stirling.get());
// Wait for the thread to return.
// This should never happen unless --timeout_secs is specified.
// This should never happen.
run_thread.join();
return 0;
......
#include <unistd.h>
#include <thread>
#include "src/common/base/base.h"
#include "src/shared/upid/upid.h"
#include "src/stirling/core/pub_sub_manager.h"
#include "src/stirling/core/source_registry.h"
#include "src/stirling/source_connectors/perf_profiler/perf_profile_connector.h"
#include "src/stirling/source_connectors/perf_profiler/stack_traces_table.h"
#include "src/stirling/stirling.h"
using ::pl::ProcessStatsMonitor;
using ::pl::Status;
using ::pl::StatusOr;
using ::pl::stirling::IndexPublication;
using ::pl::stirling::PerfProfileConnector;
using ::pl::stirling::SourceRegistry;
using ::pl::stirling::Stirling;
using ::pl::stirling::stirlingpb::InfoClass;
using ::pl::stirling::stirlingpb::Publish;
using ::pl::stirling::stirlingpb::Subscribe;
using ::pl::md::UPID;
using ::pl::types::ColumnWrapperRecordBatch;
using ::pl::types::TabletID;
// Command-line arguments of the profiler.
struct Args {
  // PID given on the command line; stays 0 until ParseArgs() stores a parsed value.
  uint32_t pid{0};
};
// Process-wide state.
// The Stirling and stats-monitor pointers live at global scope so that SignalHandler()
// can reach them; both are null until main() assigns them.
Stirling* g_stirling{nullptr};
ProcessStatsMonitor* g_process_stats_monitor{nullptr};
// Table id -> table info; populated in main() via IndexPublication() and read by the callback.
absl::flat_hash_map<uint64_t, InfoClass> g_table_info_map;
// Set by the data callback after it has handled a batch; polled by main().
std::atomic<bool> g_data_received{false};
// Parsed command-line arguments (written by ParseArgs()).
Args g_args;
// Parses the command line, which must consist of exactly one argument: the target PID.
// On success, stores the PID in g_args.pid and returns OK.
// Returns an Internal error if the argument count is wrong or the PID is not a valid number.
Status ParseArgs(int argc, char** argv) {
  constexpr int kExpectedArgc = 2;  // Program name + <pid>.
  if (argc != kExpectedArgc) {
    return ::pl::error::Internal("Usage: ./stirling_profiler <pid>");
  }

  const std::string_view pid_arg = argv[1];
  if (!absl::SimpleAtoi(pid_arg, &g_args.pid)) {
    return ::pl::error::Internal("PID is not a valid number: $0", pid_arg);
  }

  return Status::OK();
}
// Data-push callback registered with Stirling in main().
//
// For every row of the batch whose UPID has a pid equal to g_args.pid, writes
// "<stack trace string> <count>" followed by a newline to stdout.
// Sets g_data_received once the batch has been handled, whether or not any row matched.
//
// CHECK-fails if table_id is missing from g_table_info_map, or if the table's schema name
// is not "stack_traces.beta". Otherwise always returns Status::OK().
Status StirlingWrapperCallback(uint64_t table_id, TabletID /* tablet_id */,
                               std::unique_ptr<ColumnWrapperRecordBatch> record_batch) {
  // Find the table info from the publications.
  auto iter = g_table_info_map.find(table_id);
  CHECK(iter != g_table_info_map.end());
  const InfoClass& table_info = iter->second;

  // Only the stack traces table is expected here.
  CHECK_EQ(table_info.schema().name(), "stack_traces.beta");

  auto& upid_col = (*record_batch)[pl::stirling::kStackTraceUPIDIdx];
  auto& stack_trace_str_col = (*record_batch)[pl::stirling::kStackTraceStackTraceStrIdx];
  auto& count_col = (*record_batch)[pl::stirling::kStackTraceCountIdx];

  for (size_t i = 0; i < stack_trace_str_col->Size(); ++i) {
    UPID upid(upid_col->Get<pl::types::UInt128Value>(i).val);
    // Skip samples that belong to other processes.
    if (g_args.pid == upid.pid()) {
      std::cout << stack_trace_str_col->Get<pl::types::StringValue>(i);
      std::cout << " ";
      std::cout << count_col->Get<pl::types::Int64Value>(i).val;
      std::cout << "\n";
    }
  }

  // Tell main() that a batch has arrived.
  g_data_received = true;

  return Status::OK();
}
// Signal handler installed by main(): stops Stirling, calls PrintCPUTime() on the process
// stats monitor, and exits using the signal number as the exit code.
// Either global may still be null if the signal arrives early, hence the checks.
void SignalHandler(int signum) {
  std::cerr << "\n\nStopping, might take a few seconds ..." << std::endl;

  // Stop() has to run before exiting: it releases BPF resources, which would otherwise leak.
  if (g_stirling) {
    g_stirling->Stop();
  }

  if (g_process_stats_monitor) {
    g_process_stats_monitor->PrintCPUTime();
  }

  exit(signum);
}
// Entry point of the standalone profiler.
// Usage: ./stirling_profiler <pid>
// Runs Stirling with only the perf profiler source registered and lets
// StirlingWrapperCallback print the samples of the requested PID.
int main(int argc, char** argv) {
  // Register signal handlers to clean-up on exit.
  constexpr int kHandledSignals[] = {SIGINT, SIGQUIT, SIGTERM, SIGHUP};
  for (const int sig : kHandledSignals) {
    signal(sig, SignalHandler);
  }

  pl::EnvironmentGuard env_guard(&argc, argv);

  PL_EXIT_IF_ERROR(ParseArgs(argc, argv));

  // Make Stirling, with the perf profiler as its only source.
  auto registry = std::make_unique<SourceRegistry>();
  registry->RegisterOrDie<PerfProfileConnector>("perf_profiler");
  std::unique_ptr<Stirling> stirling = Stirling::Create(std::move(registry));
  g_stirling = stirling.get();
  stirling->RegisterDataPushCallback(StirlingWrapperCallback);

  // Index the published tables for the callback, then subscribe to all of them.
  Publish publication;
  stirling->GetPublishProto(&publication);
  IndexPublication(publication, &g_table_info_map);
  PL_CHECK_OK(stirling->SetSubscription(pl::stirling::SubscribeToAllInfoClasses(publication)));

  // Start measuring process stats after init.
  ProcessStatsMonitor process_stats_monitor;
  g_process_stats_monitor = &process_stats_monitor;

  // Run Stirling on its own thread.
  std::thread run_thread(&Stirling::Run, stirling.get());

  // Poll once per second until the callback reports data, giving up after kMaxWaitSecs.
  constexpr int kMaxWaitSecs = 100;
  int waited_secs = 0;
  do {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    ++waited_secs;
  } while (!g_data_received && waited_secs < kMaxWaitSecs);

  stirling->Stop();

  // Wait for the thread to return.
  run_thread.join();

  return 0;
}
......@@ -24,21 +24,24 @@
#include "src/carnot/planner/probes/tracepoint_generator.h"
#endif
using pl::Status;
using pl::StatusOr;
using pl::stirling::IndexPublication;
using pl::stirling::PrintRecordBatch;
using pl::stirling::SourceRegistry;
using pl::stirling::SourceRegistrySpecifier;
using pl::stirling::Stirling;
using pl::stirling::stirlingpb::Publish;
using pl::stirling::stirlingpb::Subscribe;
using ::pl::ProcessStatsMonitor;
using ::pl::Status;
using ::pl::StatusOr;
using ::pl::stirling::IndexPublication;
using ::pl::stirling::PrintRecordBatch;
using ::pl::stirling::SourceRegistry;
using ::pl::stirling::SourceRegistrySpecifier;
using ::pl::stirling::Stirling;
using ::pl::stirling::stirlingpb::InfoClass;
using ::pl::stirling::stirlingpb::Publish;
using ::pl::stirling::stirlingpb::Subscribe;
using DynamicTracepointDeployment =
pl::stirling::dynamic_tracing::ir::logical::TracepointDeployment;
::pl::stirling::dynamic_tracing::ir::logical::TracepointDeployment;
using pl::types::ColumnWrapperRecordBatch;
using pl::types::TabletID;
using ::pl::types::ColumnWrapperRecordBatch;
using ::pl::types::TabletID;
DEFINE_string(sources, "kProd",
"[kAll|kProd|kMetrics|kTracers|kProfiler] Choose sources to enable.");
......@@ -55,7 +58,7 @@ DEFINE_bool(enable_heap_profiler, false, "If true, heap profiling is enabled.");
// Put this in global space, so we can kill it in the signal handler.
Stirling* g_stirling = nullptr;
pl::ProcessStatsMonitor* g_process_stats_monitor = nullptr;
ProcessStatsMonitor* g_process_stats_monitor = nullptr;
//-----------------------------------------------------------------------------
// Callback/Printing Code
......@@ -63,7 +66,7 @@ pl::ProcessStatsMonitor* g_process_stats_monitor = nullptr;
absl::flat_hash_set<std::string> g_table_print_enables;
absl::flat_hash_map<uint64_t, ::pl::stirling::stirlingpb::InfoClass> g_table_info_map;
absl::flat_hash_map<uint64_t, InfoClass> g_table_info_map;
absl::base_internal::SpinLock g_callback_state_lock;
Status StirlingWrapperCallback(uint64_t table_id, TabletID /* tablet_id */,
......@@ -75,7 +78,7 @@ Status StirlingWrapperCallback(uint64_t table_id, TabletID /* tablet_id */,
if (iter == g_table_info_map.end()) {
return pl::error::Internal("Encountered unknown table id $0", table_id);
}
const pl::stirling::stirlingpb::InfoClass& table_info = iter->second;
const InfoClass& table_info = iter->second;
if (g_table_print_enables.contains(table_info.schema().name())) {
// Only output enabled tables (lookup by name).
......@@ -292,7 +295,7 @@ int main(int argc, char** argv) {
}
// Start measuring process stats after init.
pl::ProcessStatsMonitor process_stats_monitor;
ProcessStatsMonitor process_stats_monitor;
g_process_stats_monitor = &process_stats_monitor;
// Run Data Collector.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment