Commit 4fe9466e authored by Natalie Serrino
Browse files

PP-2117: When Kelvin is executing a streaming plan, don't call the batch API...

PP-2117: When Kelvin is executing a streaming plan, don't call the batch API when the query completes.

Summary:
Currently, there are two APIs for Kelvin sending results to the query broker: TransferResultChunk and ReceiveAgentQueryResult. TransferResultChunk is streaming and will subsume ReceiveAgentQueryResult, which is batch. The query broker has changes to receive results from both, but this Kelvin change is needed so that we only call the ReceiveAgentQueryResult API in situations where the Kelvin plan produces a memory sink.

ReceiveAgentQueryResult reads memory sinks, but in the new streaming API, the sinks will send the data directly to the query broker as GRPC sinks. As a result, during the temporary phase where we support both APIs, we differentiate between which API to use to send the results based on whether or not there are memory sinks in the final plan.

Afterwards, ReceiveAgentQueryResult will be fully deprecated and a lot of this code will go away.

Also, fully removed the deprecated query_str field from the execute query request, because it is no longer used anywhere.

Test Plan: added unit test

Reviewers: zasgar, michelle, philkuz, #engineering

Reviewed By: zasgar, #engineering

JIRA Issues: PP-2117

Differential Revision: https://phab.corp.pixielabs.ai/D5961

GitOrigin-RevId: ae680d647a895adbc965e8b2b36c693534bb9656
parent 80736260
No related merge requests found
Showing with 243 additions and 159 deletions
+243 -159
This diff is collapsed.
......@@ -112,8 +112,8 @@ message ExecuteQueryRequest {
uuidpb.UUID query_id = 1 [
(gogoproto.customname) = "QueryID"
];
// TODO(philkuz) DEPRECATION planned.
string query_str = 2;
// Formerly used for query string.
reserved 2;
pl.carnot.planpb.Plan plan = 3;
bool analyze = 4;
}
......
......@@ -73,3 +73,15 @@ pl_cc_test(
"//src/common/testing/event:cc_library",
],
)
# C++ test target built from exec_test.cc.
pl_cc_test(
name = "exec_test",
srcs = ["exec_test.cc"],
deps = [
":cc_library",
"//src/common/testing/grpc_utils:cc_library",
"//src/common/event:cc_library",
"//src/common/system:cc_library_mock",
"//src/common/testing/event:cc_library",
],
)
......@@ -57,15 +57,22 @@ class ExecuteQueryMessageHandler::ExecuteQueryTask : public AsyncTask {
void Work() override {
AgentQueryResultRequest res_req;
auto contains_batch_result_or_s = PlanContainsBatchResults(req_.plan());
if (!contains_batch_result_or_s.ok()) {
LOG(ERROR) << absl::Substitute("Query failed, reason: $0, plan: $1",
contains_batch_result_or_s.status().msg(),
req_.plan().DebugString());
}
bool send_batch_result = qb_stub_ != nullptr && contains_batch_result_or_s.ConsumeValueOrDie();
auto s = ExecuteQueryInternal((qb_stub_ == nullptr) ? nullptr : res_req.mutable_result());
auto s = ExecuteQueryInternal(send_batch_result ? res_req.mutable_result() : nullptr);
if (!s.ok()) {
LOG(ERROR) << absl::Substitute("Query failed, reason: $0, query: $1", s.ToString(),
req_.query_str());
LOG(ERROR) << req_.plan().DebugString();
LOG(ERROR) << absl::Substitute("Query failed, reason: $0, plan: $1", s.ToString(),
req_.plan().DebugString());
}
if (agent_info_->capabilities.collects_data()) {
if (!send_batch_result) {
// In distributed mode only non data collecting nodes send data.
// TODO(zasgar/philkuz/michelle): We should actually just code in the Querybroker address into
// the plan and remove the hardcoding here.
......@@ -101,16 +108,13 @@ class ExecuteQueryMessageHandler::ExecuteQueryTask : public AsyncTask {
{
ScopedTimer query_timer(absl::Substitute("query timer: id=$0", query_id_.str()));
StatusOr<carnot::CarnotQueryResult> result_or_s;
if (req_.has_plan()) {
result_or_s = carnot_->ExecutePlan(req_.plan(), query_id_, req_.analyze());
} else {
result_or_s =
carnot_->ExecuteQuery(req_.query_str(), query_id_, CurrentTimeNS(), req_.analyze());
}
result_or_s = carnot_->ExecutePlan(req_.plan(), query_id_, req_.analyze());
if (resp == nullptr) {
return result_or_s.status();
}
// TODO(nserrino): Deprecate this logic when Kelvin executes all queries in streaming mode.
*resp->mutable_query_id() = req_.query_id();
if (!result_or_s.ok()) {
*resp->mutable_status() = result_or_s.status().ToProto();
......
......@@ -3,6 +3,7 @@
#include <memory>
#include <absl/container/flat_hash_map.h>
#include "src/carnot/plan/plan.h"
#include "src/vizier/services/agent/manager/manager.h"
#include "src/vizier/services/query_broker/querybrokerpb/service.grpc.pb.h"
......@@ -48,6 +49,39 @@ class ExecuteQueryMessageHandler : public Manager::MessageHandler {
absl::flat_hash_map<sole::uuid, pl::event::RunnableAsyncTaskUPtr> running_queries_;
};
// TODO(nserrino): Delete this function when the batch API is deprecated.
//
// Reports whether the given plan proto contains at least one memory sink operator.
//
// The proto is first loaded into a carnot::plan::Plan, then every plan fragment is
// walked. Visiting a memory sink sets the result to true; every other operator kind
// handled here is visited without any effect.
//
// Returns the non-OK status if initializing the plan or walking it fails, otherwise
// true iff a memory sink was visited.
inline StatusOr<bool> PlanContainsBatchResults(const carnot::planpb::Plan& plan_pb) {
  carnot::plan::Plan parsed_plan;
  PL_RETURN_IF_ERROR(parsed_plan.Init(plan_pb));

  bool found_memory_sink = false;

  // Handler shared by every operator kind that does not influence the result.
  auto ignore_op = [&](const auto&) { return Status::OK(); };

  // Handler for memory sinks: records that one was seen.
  auto mark_memory_sink = [&](auto&) {
    found_memory_sink = true;
    return Status::OK();
  };

  // Walks a single plan fragment, dispatching each operator to the handlers above.
  auto visit_fragment = [&](auto* fragment) {
    return carnot::plan::PlanFragmentWalker()
        .OnMemorySink(mark_memory_sink)
        .OnMap(ignore_op)
        .OnAggregate(ignore_op)
        .OnFilter(ignore_op)
        .OnLimit(ignore_op)
        .OnMemorySource(ignore_op)
        .OnUnion(ignore_op)
        .OnJoin(ignore_op)
        .OnGRPCSource(ignore_op)
        .OnGRPCSink(ignore_op)
        .OnUDTFSource(ignore_op)
        .Walk(fragment);
  };

  auto walk_status = carnot::plan::PlanWalker().OnPlanFragment(visit_fragment).Walk(&parsed_plan);
  PL_RETURN_IF_ERROR(walk_status);
  return found_memory_sink;
}
} // namespace agent
} // namespace vizier
} // namespace pl
#include <gtest/gtest.h>
#include "src/vizier/services/agent/manager/exec.h"
#include "src/common/testing/testing.h"
namespace pl {
namespace vizier {
namespace agent {
// Text-format plan template shared by the tests in this file. It describes a single
// plan fragment (id 1) whose node 1 is a memory source reading column "count" from
// table "cpu", and whose node 2 takes its operator from the `$0` placeholder, which
// the tests fill in with absl::Substitute.
constexpr char kPlanFragmentTmpl[] = R"proto(
dag {
nodes {
id: 1
}
}
nodes {
id: 1
dag {
nodes {
id: 1
sorted_children: 2
}
nodes {
id: 2
}
}
nodes {
id: 1
op {
op_type: MEMORY_SOURCE_OPERATOR
mem_source_op {
name: "cpu"
column_idxs: 0
column_names: "count"
column_types: INT64
}
}
}
nodes {
id: 2
op $0
}
}
)proto";
// Text-format operator body for a memory sink named "out" with a single INT64
// "count" column. Substituted into kPlanFragmentTmpl's `$0` placeholder by the
// WithBatchResults test.
constexpr char kMemSinkOp[] = R"(
{
op_type: MEMORY_SINK_OPERATOR
mem_sink_op {
name: "out"
column_names: "count"
column_types: INT64
column_semantic_types: ST_NONE
}
}
)";
// Text-format operator body for a GRPC sink addressed to "localhost:1234".
// Substituted into kPlanFragmentTmpl's `$0` placeholder by the WithoutBatchResults
// test.
constexpr char kGRPCSinkOp[] = R"(
{
op_type: GRPC_SINK_OPERATOR
grpc_sink_op {
address: "localhost:1234"
grpc_source_id: 0
}
}
)";
// A plan whose second node is a memory sink must be reported as containing batch results.
TEST(PlanContainsBatchResultsTest, WithBatchResults) {
  // Build the plan text by dropping the memory sink operator into the template.
  const auto plan_text = absl::Substitute(kPlanFragmentTmpl, kMemSinkOp);

  carnot::planpb::Plan plan_pb;
  ASSERT_TRUE(google::protobuf::TextFormat::MergeFromString(plan_text, &plan_pb));

  auto contains_batch_or_s = PlanContainsBatchResults(plan_pb);
  ASSERT_OK(contains_batch_or_s);
  ASSERT_TRUE(contains_batch_or_s.ConsumeValueOrDie());
}
// A plan whose second node is a GRPC sink (no memory sink anywhere) must be reported
// as not containing batch results.
TEST(PlanContainsBatchResultsTest, WithoutBatchResults) {
  // Build the plan text by dropping the GRPC sink operator into the template.
  const auto plan_text = absl::Substitute(kPlanFragmentTmpl, kGRPCSinkOp);

  carnot::planpb::Plan plan_pb;
  ASSERT_TRUE(google::protobuf::TextFormat::MergeFromString(plan_text, &plan_pb));

  auto contains_batch_or_s = PlanContainsBatchResults(plan_pb);
  ASSERT_OK(contains_batch_or_s);
  ASSERT_FALSE(contains_batch_or_s.ConsumeValueOrDie());
}
} // namespace agent
} // namespace vizier
} // namespace pl
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment