From 4fe9466ea29973cfdb88b5e93fbe482bdef93cc0 Mon Sep 17 00:00:00 2001 From: Natalie Serrino <nserrino@pixielabs.ai> Date: Wed, 12 Aug 2020 17:17:02 -0700 Subject: [PATCH] PP-2117: When Kelvin is executing a streaming plan, don't call the batch API when the query completes. Summary: Currently, there are two APIs for Kelvin sending results to the query broker: TransferResultChunk and ReceiveAgentQueryResult. TransferResultChunk is streaming and will subsume ReceiveAgentQueryResult, which is batch. The query broker has changes to receive results from both, but this Kelvin change is needed so that we only call the ReceiveAgentQueryResult API in situations where the Kelvin plan produces a memory sink. ReceiveAgentQueryResult reads memory sinks, but in the new streaming API, the sinks will send the data directly to the query broker as GRPC sinks. As a result, during the temporary phase where we support both APIs, we differentiate between which API to use to send the results based on whether or not there are memory sinks in the final plan. Afterwards, ReceiveAgentQueryResult will be fully deprecated and a lot of this code will go away. Also, deprecated query_str fully from the execute query request, because it is no longer used anywhere. Test Plan: added unit test Reviewers: zasgar, michelle, philkuz, #engineering Reviewed By: zasgar, #engineering JIRA Issues: PP-2117 Differential Revision: https://phab.corp.pixielabs.ai/D5961 GitOrigin-RevId: ae680d647a895adbc965e8b2b36c693534bb9656 --- src/vizier/messages/messagespb/messages.pb.go | 235 +++++++----------- src/vizier/messages/messagespb/messages.proto | 4 +- src/vizier/services/agent/manager/BUILD.bazel | 12 + src/vizier/services/agent/manager/exec.cc | 26 +- src/vizier/services/agent/manager/exec.h | 34 +++ .../services/agent/manager/exec_test.cc | 91 +++++++ 6 files changed, 243 insertions(+), 159 deletions(-) create mode 100644 src/vizier/services/agent/manager/exec_test.cc diff --git a/src/vizier/messages/messagespb/messages.pb.go b/src/vizier/messages/messagespb/messages.pb.go index f24564ffa..bf57cf1ec 100755 --- a/src/vizier/messages/messagespb/messages.pb.go +++ b/src/vizier/messages/messagespb/messages.pb.go @@ -834,10 +834,9 @@ func (m *HeartbeatNack) XXX_DiscardUnknown() { var xxx_messageInfo_HeartbeatNack proto.InternalMessageInfo type ExecuteQueryRequest struct { - QueryID *proto1.UUID `protobuf:"bytes,1,opt,name=query_id,json=queryId,proto3" json:"query_id,omitempty"` - QueryStr string `protobuf:"bytes,2,opt,name=query_str,json=queryStr,proto3" json:"query_str,omitempty"` - Plan *planpb.Plan `protobuf:"bytes,3,opt,name=plan,proto3" json:"plan,omitempty"` - Analyze bool `protobuf:"varint,4,opt,name=analyze,proto3" json:"analyze,omitempty"` + QueryID *proto1.UUID `protobuf:"bytes,1,opt,name=query_id,json=queryId,proto3" json:"query_id,omitempty"` + Plan *planpb.Plan `protobuf:"bytes,3,opt,name=plan,proto3" json:"plan,omitempty"` + Analyze bool `protobuf:"varint,4,opt,name=analyze,proto3" json:"analyze,omitempty"` } func (m *ExecuteQueryRequest) Reset() { *m = ExecuteQueryRequest{} } @@ -879,13 +878,6 @@ func (m *ExecuteQueryRequest) GetQueryID() *proto1.UUID { return nil } -func (m *ExecuteQueryRequest) GetQueryStr() string { - if m != nil { - return m.QueryStr - } - return "" -} - func (m *ExecuteQueryRequest) GetPlan() *planpb.Plan { if m != nil { return m.Plan @@ -1085,96 +1077,95 @@ func init() { } var fileDescriptor_0046fd1b9991f89c = []byte{ - // 1416 bytes of a gzipped FileDescriptorProto + // 1404 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x4d, 0x6f, 0x13, 0xc7, 0x1b, 0xf7, 0xda, 0x4e, 0xe2, 0x4c, 0xde, 0x60, 0x92, 0x80, 0x79, 0xf9, 0x3b, 0xf9, 0xbb, 0x6a, 0x09, 0x50, 0x76, 0xdb, 0x54, 0x15, 0x48, 0xb4, 0x54, 0x38, 0xae, 0x48, 0x50, 0x41, 0x30, 0x01, 0x0e, 0x48, 0xd5, 0x76, 0x76, 0x77, 0xe2, 0x8c, 0x62, 0xef, 0x2e, 0x33, 0xb3, 0x88, 0x70, 0xea, - 0x47, 0xe8, 0xa1, 0xa7, 0xaa, 0xb7, 0x4a, 0x55, 0xaf, 0x3d, 0xf5, 0x23, 0xb4, 0x47, 0x8e, 0x39, - 0x45, 0xc5, 0x5c, 0x7a, 0xe4, 0x23, 0x54, 0xf3, 0xb2, 0xeb, 0xb5, 0xbd, 0x49, 0x68, 0x4f, 0x3d, - 0x79, 0xe6, 0xd9, 0xdf, 0xf3, 0x9b, 0x99, 0xdf, 0x3c, 0x2f, 0x63, 0x70, 0x8d, 0x33, 0xdf, 0x79, - 0x4e, 0x5f, 0x52, 0xc2, 0x9c, 0x1e, 0xe1, 0x1c, 0x77, 0x08, 0xcf, 0x06, 0xb1, 0x97, 0x0d, 0xed, - 0x98, 0x45, 0x22, 0x82, 0x30, 0xee, 0xda, 0x1a, 0x6d, 0xa7, 0x5f, 0xce, 0x5f, 0xeb, 0x50, 0xb1, - 0x9b, 0x78, 0xb6, 0x1f, 0xf5, 0x9c, 0x4e, 0xd4, 0x89, 0x1c, 0x05, 0xf5, 0x92, 0x1d, 0x35, 0x53, - 0x13, 0x35, 0xd2, 0x14, 0xe7, 0xaf, 0xcb, 0x15, 0x7d, 0xcc, 0xc2, 0x48, 0x38, 0x71, 0x17, 0x87, - 0x21, 0x61, 0x4e, 0x40, 0xb9, 0x60, 0xd4, 0x4b, 0x04, 0x09, 0x62, 0x2f, 0x3f, 0x73, 0x25, 0xc2, - 0x38, 0x5e, 0x1c, 0x71, 0x8c, 0x3d, 0x27, 0xf7, 0x75, 0x55, 0x7d, 0x8d, 0x7a, 0xbd, 0x28, 0x74, - 0x92, 0x84, 0x06, 0x7a, 0x17, 0x6a, 0x68, 0x10, 0x6b, 0x12, 0xc1, 0x77, 0x31, 0x23, 0x81, 0xb3, - 0x77, 0x43, 0x9e, 0x52, 0xe0, 0x00, 0x0b, 0xac, 0x4e, 0xa9, 0x87, 0x06, 0x69, 0xe7, 0x44, 0xe1, - 0x84, 0x3d, 0xa7, 0x3e, 0xe1, 0xa9, 0x27, 0xee, 0x90, 0x50, 0xc4, 0x9e, 0xfe, 0x35, 0xf8, 0x8f, - 0x8a, 0xf0, 0x29, 0xa7, 0xc3, 0x45, 0xc4, 0x48, 0xec, 0xe9, 0xdf, 0xbc, 0x08, 0x5c, 0x50, 0xd6, - 0xa5, 0x61, 0xc7, 0x09, 0xf6, 0x43, 0xdc, 0xa3, 0xbe, 0x2b, 0x18, 0xf6, 0xe5, 0x9c, 0x32, 0xa7, - 0x1b, 0x75, 0xa8, 0x8f, 0xbb, 0xb1, 0x97, 0x8e, 0x8c, 0x63, 0x33, 0x77, 0x4c, 0x0f, 0x73, 0x62, - 0x8e, 0xc9, 0x05, 0x16, 0x89, 0xb9, 0xa4, 0xe6, 0x4f, 0x93, 0x60, 0xee, 0x89, 0xda, 0xcd, 0x3d, - 0x7d, 0x47, 0xf0, 0x1b, 0x70, 0x86, 0x91, 0x0e, 0xe5, 0x82, 0x30, 0x57, 0x6d, 0xdc, 0x65, 0xe4, - 0x59, 0x42, 0xb8, 0xa8, 0x5b, 0xab, 0xd6, 0xda, 0xcc, 0xfa, 0x9a, 0x3d, 0x7e, 0xaf, 0x36, 0x32, - 0x1e, 0xb7, 0xa5, 0x03, 0xd2, 0xf8, 0xcd, 0x12, 0x5a, 0x62, 0x05, 0x76, 0xe8, 0x83, 0xb3, 0x63, - 0x2b, 0xf0, 0x38, 0x0a, 0x39, 0xa9, 0x97, 0xd5, 0x12, 0x97, 0xdf, 0x61, 0x09, 0xed, 0xb0, 0x59, - 0x42, 0xcb, 0xac, 0xe8, 0x03, 0x7c, 0x0a, 0x96, 0x92, 0x38, 0xc0, 0x82, 0x8c, 0x1c, 0xa2, 0xa2, - 0x56, 0xf8, 0xa0, 0x68, 0x85, 0xc7, 0x0a, 0x3f, 0x72, 0x04, 0x98, 0x8c, 0x59, 0xe1, 0xd7, 0x60, - 0x79, 0x84, 0xdb, 0x6c, 0xbf, 0xaa, 0xc8, 0x2f, 0x9d, 0x48, 0x9e, 0x6d, 0x7e, 0x31, 0x19, 0x37, - 0xc3, 0xcf, 0xc1, 0xf4, 0x2e, 0xc1, 0x4c, 0x78, 0x04, 0x8b, 0xfa, 0x84, 0xa2, 0xfc, 0x5f, 0x11, - 0xe5, 0x66, 0x0a, 0xda, 0x2c, 0xa1, 0x81, 0x07, 0xbc, 0x03, 0xe6, 0xb2, 0x89, 0x8b, 0xfd, 0xbd, - 0xfa, 0xa4, 0xa2, 0x58, 0x3d, 0x96, 0xe2, 0xb6, 0xbf, 0xb7, 0x59, 0x42, 0xb3, 0xbb, 0xb9, 0x39, - 0xbc, 0x0b, 0xe6, 0x07, 0x44, 0xa1, 0x64, 0x9a, 0x52, 0x4c, 0xff, 0x3f, 0x96, 0xe9, 0x3e, 0x56, - 0x54, 0x83, 0x3d, 0x48, 0x83, 0x94, 0x8c, 0xbc, 0x20, 0x7e, 0x22, 0x88, 0xfb, 0x2c, 0x21, 0x6c, - 0x3f, 0xbb, 0x8f, 0xda, 0xd1, 0x92, 0x7d, 0xa9, 0x1d, 0x1e, 0x4a, 0xfc, 0xe0, 0x42, 0x16, 0xc9, - 0xb8, 0x19, 0x3e, 0x01, 0x50, 0x26, 0x04, 0x89, 0x23, 0x1a, 0x0a, 0xd7, 0x30, 0xd4, 0x81, 0xe2, - 0x7e, 0xbf, 0x88, 0xfb, 0x51, 0x86, 0x36, 0x71, 0xbf, 0x59, 0x42, 0xa7, 0xc5, 0xa8, 0xb1, 0x35, - 0x01, 0x2a, 0x3d, 0xde, 0x69, 0x1e, 0x94, 0xc1, 0xe9, 0x31, 0x0f, 0x99, 0x29, 0xb9, 0x45, 0x69, - 0xb8, 0x13, 0xb9, 0xfa, 0x3a, 0x8f, 0xcb, 0x94, 0x01, 0xcd, 0x56, 0xb8, 0x13, 0xe9, 0xa8, 0x90, - 0x99, 0x22, 0x0a, 0xec, 0x90, 0x82, 0x73, 0x8c, 0xf4, 0xa2, 0xe7, 0xc4, 0xcd, 0x2d, 0x94, 0x2a, - 0xa7, 0x73, 0xe5, 0x6a, 0x71, 0xae, 0x48, 0xa7, 0xc1, 0x52, 0x03, 0xf5, 0xce, 0xb2, 0xe2, 0x4f, - 0x30, 0x02, 0x17, 0xb2, 0xa4, 0x2c, 0x58, 0x4c, 0xa7, 0xcd, 0xb5, 0xe3, 0x12, 0xb3, 0x68, 0xb9, - 0x73, 0xec, 0xa8, 0x8f, 0xa9, 0xb4, 0x3f, 0x5a, 0x60, 0xa9, 0xa8, 0x7a, 0xc0, 0x5b, 0xa0, 0x2a, - 0x25, 0x35, 0x5a, 0x5e, 0xc9, 0xad, 0x9c, 0x96, 0x4d, 0x5b, 0x97, 0x59, 0x5b, 0x97, 0x57, 0xe5, - 0x2c, 0xb5, 0x43, 0xca, 0x0f, 0xb6, 0xc1, 0x8c, 0x49, 0x52, 0x45, 0xa3, 0xd5, 0x7a, 0xaf, 0xe8, - 0x00, 0xca, 0x53, 0x2b, 0xae, 0xfc, 0x41, 0x92, 0x8d, 0x9b, 0x37, 0xc1, 0x72, 0x61, 0xe1, 0x81, - 0x17, 0x41, 0x15, 0x73, 0x1a, 0xa8, 0xed, 0xcd, 0xb5, 0x6a, 0xfd, 0xc3, 0x95, 0xea, 0xed, 0xed, - 0xad, 0x36, 0x52, 0xd6, 0xbb, 0xd5, 0x5a, 0xf9, 0x54, 0xa5, 0xf9, 0x83, 0x05, 0xe0, 0x78, 0x51, - 0xf9, 0x8f, 0x9c, 0x6c, 0x19, 0x2c, 0x16, 0xd4, 0xa4, 0xa6, 0x0f, 0xe6, 0x94, 0xa1, 0x8d, 0x05, - 0x96, 0x38, 0x88, 0xc0, 0x5c, 0xda, 0x9e, 0xdc, 0xdc, 0xb6, 0x55, 0x28, 0xe8, 0x0e, 0x6b, 0x9b, - 0xd6, 0x6c, 0x0f, 0xb5, 0x66, 0xfb, 0x9e, 0xf1, 0x52, 0x2b, 0xcf, 0xf6, 0x72, 0xb3, 0xe6, 0xcf, - 0x15, 0xb0, 0x30, 0xb2, 0x37, 0x78, 0x0b, 0x4c, 0x72, 0x7f, 0x97, 0xf4, 0x70, 0xbd, 0xbc, 0x5a, - 0x19, 0x29, 0xd1, 0x99, 0x2e, 0x59, 0xf3, 0x7d, 0x84, 0xbd, 0xae, 0x3e, 0x93, 0xf1, 0x82, 0x5f, - 0x80, 0xa9, 0x98, 0x45, 0x3e, 0xe1, 0xbc, 0x5e, 0x51, 0x04, 0x2a, 0xef, 0x8d, 0x90, 0x7b, 0x37, - 0x06, 0xbe, 0xb1, 0x67, 0x3f, 0xd0, 0x40, 0xe5, 0x9f, 0x7a, 0xc1, 0x87, 0x60, 0xc1, 0x0c, 0x5d, - 0x9f, 0x11, 0x2c, 0x48, 0x50, 0xaf, 0x2a, 0xa2, 0xb5, 0x13, 0x89, 0x36, 0x34, 0x1e, 0xcd, 0xc7, - 0x43, 0x73, 0xf8, 0x14, 0xc0, 0x94, 0x52, 0x10, 0xd6, 0xa3, 0xa1, 0x62, 0x9d, 0x50, 0xac, 0x57, - 0x4f, 0x64, 0x7d, 0x94, 0xb9, 0xa0, 0xd3, 0xf1, 0xa8, 0x09, 0x7e, 0x08, 0x60, 0x10, 0x11, 0x6e, - 0x4a, 0x8e, 0x6b, 0xb4, 0x93, 0xb5, 0xbe, 0x86, 0x4e, 0xc9, 0x2f, 0x5a, 0xdb, 0x6d, 0xad, 0xce, - 0xa7, 0xa0, 0x2a, 0xc9, 0x8f, 0xab, 0xe0, 0x43, 0xd7, 0x8e, 0x14, 0xbc, 0xf9, 0xbb, 0x05, 0xa6, - 0xb3, 0xca, 0x0e, 0xaf, 0x83, 0x9a, 0x6e, 0x78, 0x26, 0xee, 0x67, 0xd6, 0x17, 0x24, 0x91, 0x7c, - 0x37, 0xc5, 0x9e, 0xfd, 0xf8, 0xf1, 0x56, 0xbb, 0x35, 0xd3, 0x3f, 0x5c, 0x99, 0xd2, 0x01, 0xdb, - 0x46, 0x53, 0x0a, 0xbd, 0x15, 0x40, 0x08, 0xaa, 0x82, 0xf6, 0x74, 0x7b, 0xaf, 0x20, 0x35, 0x1e, - 0x8d, 0xe2, 0xca, 0xbf, 0x8a, 0x62, 0x78, 0x09, 0x2c, 0x70, 0x99, 0x56, 0xa1, 0x4f, 0xdc, 0x30, - 0xe9, 0x79, 0x84, 0xa9, 0x26, 0x5c, 0x41, 0xf3, 0xa9, 0xf9, 0xbe, 0xb2, 0x36, 0x7f, 0xb3, 0x00, - 0x4c, 0x23, 0x32, 0x17, 0x75, 0x2d, 0x30, 0xa5, 0xd9, 0x78, 0xdd, 0x3a, 0xe9, 0xb2, 0x11, 0xe1, - 0x51, 0xc2, 0x7c, 0xa2, 0xdd, 0x51, 0xea, 0x08, 0xd7, 0xc1, 0xac, 0x09, 0x50, 0xd7, 0xa7, 0x01, - 0x53, 0xa7, 0x9c, 0x6e, 0x2d, 0xf4, 0x0f, 0x57, 0x66, 0xb6, 0xb5, 0x7d, 0x63, 0xab, 0x8d, 0xd0, - 0x8c, 0x01, 0x6d, 0xd0, 0x80, 0xc1, 0xcb, 0x60, 0x3a, 0x8e, 0x02, 0x85, 0xd7, 0xf1, 0x3a, 0xdd, - 0x9a, 0xed, 0x1f, 0xae, 0xd4, 0x1e, 0x44, 0x81, 0x04, 0x73, 0x54, 0x8b, 0xa3, 0x40, 0x22, 0x79, - 0xf3, 0x7b, 0x0b, 0xcc, 0xe6, 0xfb, 0x74, 0xa6, 0xa6, 0x95, 0x53, 0xb3, 0x40, 0x87, 0x72, 0x91, - 0x0e, 0xf0, 0x4e, 0x91, 0xec, 0x85, 0xcf, 0xa1, 0x71, 0xb5, 0x86, 0xea, 0xc7, 0x02, 0x98, 0x1b, - 0xea, 0xf9, 0xcd, 0x5f, 0x2d, 0xb0, 0x58, 0xd0, 0xb2, 0x65, 0xd4, 0xe8, 0x96, 0x7f, 0x42, 0xd4, - 0x28, 0x27, 0x19, 0x35, 0x0a, 0xbd, 0x15, 0xc0, 0x0b, 0x60, 0x5a, 0x3b, 0x72, 0x61, 0x44, 0x45, - 0x9a, 0x69, 0x5b, 0x30, 0x78, 0x05, 0x54, 0x65, 0xd9, 0x31, 0x07, 0x38, 0x33, 0x52, 0x8d, 0x64, - 0x12, 0x75, 0x71, 0x88, 0x14, 0x06, 0xd6, 0xc1, 0x14, 0x0e, 0x71, 0x77, 0xff, 0xa5, 0x7e, 0xa1, - 0xd5, 0x50, 0x3a, 0x95, 0x51, 0x71, 0xee, 0xc8, 0xfe, 0x05, 0x9f, 0x81, 0xe5, 0x5c, 0x2b, 0x0c, - 0x48, 0xdc, 0x8d, 0xf6, 0x7b, 0x24, 0x4c, 0x5f, 0xc2, 0x9f, 0xa9, 0x50, 0x31, 0x0f, 0x73, 0x7b, - 0xe4, 0x61, 0x6e, 0x53, 0x66, 0xa7, 0xcf, 0xf1, 0x01, 0x73, 0x3b, 0xe3, 0xc8, 0x77, 0xfc, 0x81, - 0x15, 0x5e, 0x02, 0x65, 0x1a, 0x98, 0x92, 0x3e, 0x26, 0xd3, 0x64, 0xff, 0x70, 0xa5, 0xbc, 0xd5, - 0x46, 0x65, 0x1a, 0x34, 0x0f, 0x2c, 0xb0, 0x54, 0xf4, 0x96, 0x30, 0x0c, 0xd6, 0x89, 0x0c, 0xf0, - 0x63, 0x30, 0x21, 0xff, 0x0a, 0xe8, 0xac, 0x9c, 0x5f, 0xbf, 0xa0, 0x4f, 0x23, 0xff, 0x1b, 0xc4, - 0x9e, 0xfd, 0x15, 0xdd, 0x21, 0x1b, 0xfb, 0x7e, 0x97, 0x6c, 0x4b, 0x08, 0xd2, 0x48, 0x78, 0x15, - 0x4c, 0x6a, 0x84, 0x91, 0x7d, 0x71, 0xc8, 0x67, 0x5b, 0x0d, 0x90, 0x81, 0x0c, 0x55, 0x8b, 0xea, - 0x3f, 0xa8, 0x16, 0xcd, 0x16, 0x38, 0x7b, 0xc4, 0x03, 0xe6, 0x9d, 0x0f, 0xd7, 0x12, 0xaf, 0x5e, - 0x37, 0x4a, 0x07, 0xaf, 0x1b, 0xa5, 0xb7, 0xaf, 0x1b, 0xd6, 0xb7, 0xfd, 0x86, 0xf5, 0x4b, 0xbf, - 0x61, 0xfd, 0xd1, 0x6f, 0x58, 0xaf, 0xfa, 0x0d, 0xeb, 0xcf, 0x7e, 0xc3, 0xfa, 0xab, 0xdf, 0x28, - 0xbd, 0xed, 0x37, 0xac, 0xef, 0xde, 0x34, 0x4a, 0xaf, 0xde, 0x34, 0x4a, 0x07, 0x6f, 0x1a, 0xa5, - 0xa7, 0xb7, 0x62, 0xfa, 0x82, 0x92, 0x2e, 0xf6, 0xb8, 0x8d, 0xa9, 0x93, 0x4d, 0x9c, 0xe3, 0xff, - 0xf8, 0xde, 0x4c, 0x87, 0xde, 0xa4, 0xfa, 0x53, 0xf5, 0xc9, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, - 0x0e, 0x77, 0x9c, 0x01, 0x2a, 0x0f, 0x00, 0x00, + 0x47, 0xe8, 0xa1, 0xa7, 0xaa, 0xb7, 0x4a, 0x55, 0xbf, 0x01, 0x1f, 0xa1, 0x3d, 0x72, 0xcc, 0x29, + 0x2a, 0xe6, 0xd2, 0x23, 0x1f, 0xa1, 0x9a, 0x97, 0x5d, 0xaf, 0xed, 0x4d, 0x42, 0x7b, 0xea, 0xc9, + 0x33, 0xcf, 0xfe, 0x9e, 0xdf, 0xcc, 0xfc, 0xe6, 0x79, 0x19, 0x83, 0x6b, 0x9c, 0xf9, 0xce, 0x73, + 0xfa, 0x92, 0x12, 0xe6, 0xf4, 0x08, 0xe7, 0xb8, 0x43, 0x78, 0x36, 0x88, 0xbd, 0x6c, 0x68, 0xc7, + 0x2c, 0x12, 0x11, 0x84, 0x71, 0xd7, 0xd6, 0x68, 0x3b, 0xfd, 0x72, 0xfe, 0x5a, 0x87, 0x8a, 0xdd, + 0xc4, 0xb3, 0xfd, 0xa8, 0xe7, 0x74, 0xa2, 0x4e, 0xe4, 0x28, 0xa8, 0x97, 0xec, 0xa8, 0x99, 0x9a, + 0xa8, 0x91, 0xa6, 0x38, 0x7f, 0x5d, 0xae, 0xe8, 0x63, 0x16, 0x46, 0xc2, 0x89, 0xbb, 0x38, 0x0c, + 0x09, 0x73, 0x02, 0xca, 0x05, 0xa3, 0x5e, 0x22, 0x48, 0x10, 0x7b, 0xf9, 0x99, 0x2b, 0x11, 0xc6, + 0xf1, 0xe2, 0x88, 0x63, 0xec, 0x39, 0xb9, 0xaf, 0xab, 0xea, 0x6b, 0xd4, 0xeb, 0x45, 0xa1, 0x93, + 0x24, 0x34, 0xd0, 0xbb, 0x50, 0x43, 0x83, 0x58, 0x93, 0x08, 0xbe, 0x8b, 0x19, 0x09, 0x9c, 0xbd, + 0x1b, 0xf2, 0x94, 0x02, 0x07, 0x58, 0x60, 0x75, 0x4a, 0x3d, 0x34, 0x48, 0x3b, 0x27, 0x0a, 0x27, + 0xec, 0x39, 0xf5, 0x09, 0x4f, 0x3d, 0x71, 0x87, 0x84, 0x22, 0xf6, 0xf4, 0xaf, 0xc1, 0x7f, 0x52, + 0x84, 0x4f, 0x39, 0x1d, 0x2e, 0x22, 0x46, 0x62, 0x4f, 0xff, 0xe6, 0x45, 0xe0, 0x82, 0xb2, 0x2e, + 0x0d, 0x3b, 0x4e, 0xb0, 0x1f, 0xe2, 0x1e, 0xf5, 0x5d, 0xc1, 0xb0, 0x2f, 0xe7, 0x94, 0x39, 0xdd, + 0xa8, 0x43, 0x7d, 0xdc, 0x8d, 0xbd, 0x74, 0x64, 0x1c, 0x9b, 0xb9, 0x63, 0x7a, 0x98, 0x13, 0x73, + 0x4c, 0x2e, 0xb0, 0x48, 0xcc, 0x25, 0x35, 0x7f, 0x99, 0x04, 0x73, 0x4f, 0xd4, 0x6e, 0xee, 0xe9, + 0x3b, 0x82, 0xdf, 0x81, 0x33, 0x8c, 0x74, 0x28, 0x17, 0x84, 0xb9, 0x6a, 0xe3, 0x2e, 0x23, 0xcf, + 0x12, 0xc2, 0x45, 0xdd, 0x5a, 0xb5, 0xd6, 0x66, 0xd6, 0xd7, 0xec, 0xf1, 0x7b, 0xb5, 0x91, 0xf1, + 0xb8, 0x2d, 0x1d, 0x90, 0xc6, 0x6f, 0x96, 0xd0, 0x12, 0x2b, 0xb0, 0x43, 0x1f, 0x9c, 0x1d, 0x5b, + 0x81, 0xc7, 0x51, 0xc8, 0x49, 0xbd, 0xac, 0x96, 0xb8, 0xfc, 0x1e, 0x4b, 0x68, 0x87, 0xcd, 0x12, + 0x5a, 0x66, 0x45, 0x1f, 0xe0, 0x53, 0xb0, 0x94, 0xc4, 0x01, 0x16, 0x64, 0xe4, 0x10, 0x15, 0xb5, + 0xc2, 0x47, 0x45, 0x2b, 0x3c, 0x56, 0xf8, 0x91, 0x23, 0xc0, 0x64, 0xcc, 0x0a, 0xbf, 0x05, 0xcb, + 0x23, 0xdc, 0x66, 0xfb, 0x55, 0x45, 0x7e, 0xe9, 0x44, 0xf2, 0x6c, 0xf3, 0x8b, 0xc9, 0xb8, 0x19, + 0x7e, 0x09, 0xa6, 0x77, 0x09, 0x66, 0xc2, 0x23, 0x58, 0xd4, 0x27, 0x14, 0xe5, 0xff, 0x8a, 0x28, + 0x37, 0x53, 0xd0, 0x66, 0x09, 0x0d, 0x3c, 0xe0, 0x1d, 0x30, 0x97, 0x4d, 0x5c, 0xec, 0xef, 0xd5, + 0x27, 0x15, 0xc5, 0xea, 0xb1, 0x14, 0xb7, 0xfd, 0xbd, 0xcd, 0x12, 0x9a, 0xdd, 0xcd, 0xcd, 0xe1, + 0x5d, 0x30, 0x3f, 0x20, 0x0a, 0x25, 0xd3, 0x94, 0x62, 0xfa, 0xff, 0xb1, 0x4c, 0xf7, 0xb1, 0xa2, + 0x1a, 0xec, 0x41, 0x1a, 0xa4, 0x64, 0xe4, 0x05, 0xf1, 0x13, 0x41, 0xdc, 0x67, 0x09, 0x61, 0xfb, + 0xd9, 0x7d, 0xd4, 0x8e, 0x96, 0xec, 0x6b, 0xed, 0xf0, 0x50, 0xe2, 0x07, 0x17, 0xb2, 0x48, 0xc6, + 0xcd, 0xf0, 0x09, 0x80, 0x32, 0x21, 0x48, 0x1c, 0xd1, 0x50, 0xb8, 0x86, 0xa1, 0x0e, 0x14, 0xf7, + 0x87, 0x45, 0xdc, 0x8f, 0x32, 0xb4, 0x89, 0xfb, 0xcd, 0x12, 0x3a, 0x2d, 0x46, 0x8d, 0xad, 0x09, + 0x50, 0xe9, 0xf1, 0x4e, 0xf3, 0xa0, 0x0c, 0x4e, 0x8f, 0x79, 0xc8, 0x4c, 0xc9, 0x2d, 0x4a, 0xc3, + 0x9d, 0xc8, 0xd5, 0xd7, 0x79, 0x5c, 0xa6, 0x0c, 0x68, 0xb6, 0xc2, 0x9d, 0x48, 0x47, 0x85, 0xcc, + 0x14, 0x51, 0x60, 0x87, 0x14, 0x9c, 0x63, 0xa4, 0x17, 0x3d, 0x27, 0x6e, 0x6e, 0xa1, 0x54, 0x39, + 0x9d, 0x2b, 0x57, 0x8b, 0x73, 0x45, 0x3a, 0x0d, 0x96, 0x1a, 0xa8, 0x77, 0x96, 0x15, 0x7f, 0x82, + 0x11, 0xb8, 0x90, 0x25, 0x65, 0xc1, 0x62, 0x3a, 0x6d, 0xae, 0x1d, 0x97, 0x98, 0x45, 0xcb, 0x9d, + 0x63, 0x47, 0x7d, 0x4c, 0xa5, 0xfd, 0xd9, 0x02, 0x4b, 0x45, 0xd5, 0x03, 0xde, 0x02, 0x55, 0x29, + 0xa9, 0xd1, 0xf2, 0x4a, 0x6e, 0xe5, 0xb4, 0x6c, 0xda, 0xba, 0xcc, 0xda, 0xba, 0xbc, 0x2a, 0x67, + 0xa9, 0x1d, 0x52, 0x7e, 0xb0, 0x0d, 0x66, 0x4c, 0x92, 0x2a, 0x1a, 0xad, 0xd6, 0x07, 0x45, 0x07, + 0x50, 0x9e, 0x5a, 0x71, 0xe5, 0x0f, 0x92, 0x6c, 0xdc, 0xbc, 0x09, 0x96, 0x0b, 0x0b, 0x0f, 0xbc, + 0x08, 0xaa, 0x98, 0xd3, 0x40, 0x6d, 0x6f, 0xae, 0x55, 0xeb, 0x1f, 0xae, 0x54, 0x6f, 0x6f, 0x6f, + 0xb5, 0x91, 0xb2, 0xde, 0xad, 0xd6, 0xca, 0xa7, 0x2a, 0xcd, 0x9f, 0x2c, 0x00, 0xc7, 0x8b, 0xca, + 0x7f, 0xe4, 0x64, 0xcb, 0x60, 0xb1, 0xa0, 0x26, 0x35, 0x7d, 0x30, 0xa7, 0x0c, 0x6d, 0x2c, 0xb0, + 0xc4, 0x41, 0x04, 0xe6, 0xd2, 0xf6, 0xe4, 0xe6, 0xb6, 0xad, 0x42, 0x41, 0x77, 0x58, 0xdb, 0xb4, + 0x66, 0x7b, 0xa8, 0x35, 0xdb, 0xf7, 0x8c, 0x97, 0x5a, 0x79, 0xb6, 0x97, 0x9b, 0x35, 0x7f, 0xad, + 0x80, 0x85, 0x91, 0xbd, 0xc1, 0x5b, 0x60, 0x92, 0xfb, 0xbb, 0xa4, 0x87, 0xeb, 0xe5, 0xd5, 0xca, + 0x48, 0x89, 0xce, 0x74, 0xc9, 0x9a, 0xef, 0x23, 0xec, 0x75, 0xf5, 0x99, 0x8c, 0x17, 0xfc, 0x0a, + 0x4c, 0xc5, 0x2c, 0xf2, 0x09, 0xe7, 0xf5, 0x8a, 0x22, 0x50, 0x79, 0x6f, 0x84, 0xdc, 0xbb, 0x31, + 0xf0, 0x8d, 0x3d, 0xfb, 0x81, 0x06, 0x2a, 0xff, 0xd4, 0x0b, 0x3e, 0x04, 0x0b, 0x66, 0xe8, 0xfa, + 0x8c, 0x60, 0x41, 0x82, 0x7a, 0x55, 0x11, 0xad, 0x9d, 0x48, 0xb4, 0xa1, 0xf1, 0x68, 0x3e, 0x1e, + 0x9a, 0xc3, 0xa7, 0x00, 0xa6, 0x94, 0x82, 0xb0, 0x1e, 0x0d, 0x15, 0xeb, 0x84, 0x62, 0xbd, 0x7a, + 0x22, 0xeb, 0xa3, 0xcc, 0x05, 0x9d, 0x8e, 0x47, 0x4d, 0xf0, 0x63, 0x00, 0x83, 0x88, 0x70, 0x53, + 0x72, 0x5c, 0xa3, 0x9d, 0xac, 0xf5, 0x35, 0x74, 0x4a, 0x7e, 0xd1, 0xda, 0x6e, 0x6b, 0x75, 0x3e, + 0x07, 0x55, 0x49, 0x7e, 0x5c, 0x05, 0x1f, 0xba, 0x76, 0xa4, 0xe0, 0xcd, 0xdf, 0x2d, 0x30, 0x9d, + 0x55, 0x76, 0x78, 0x1d, 0xd4, 0x74, 0xc3, 0x33, 0x71, 0x3f, 0xb3, 0xbe, 0x20, 0x89, 0xe4, 0xbb, + 0x29, 0xf6, 0xec, 0xc7, 0x8f, 0xb7, 0xda, 0xad, 0x99, 0xfe, 0xe1, 0xca, 0x94, 0x0e, 0xd8, 0x36, + 0x9a, 0x52, 0xe8, 0xad, 0x00, 0x42, 0x50, 0x15, 0xb4, 0xa7, 0xdb, 0x7b, 0x05, 0xa9, 0xf1, 0x68, + 0x14, 0x57, 0xfe, 0x55, 0x14, 0xc3, 0x4b, 0x60, 0x81, 0xcb, 0xb4, 0x0a, 0x7d, 0xe2, 0x86, 0x49, + 0xcf, 0x23, 0x4c, 0x35, 0xe1, 0x0a, 0x9a, 0x4f, 0xcd, 0xf7, 0x95, 0xb5, 0xf9, 0xca, 0x02, 0x30, + 0x8d, 0xc8, 0x5c, 0xd4, 0xb5, 0xc0, 0x94, 0x66, 0xe3, 0x75, 0xeb, 0xa4, 0xcb, 0x46, 0x84, 0x47, + 0x09, 0xf3, 0x89, 0x76, 0x47, 0xa9, 0x23, 0x5c, 0x07, 0xb3, 0x26, 0x40, 0x5d, 0x9f, 0x06, 0x4c, + 0x9d, 0x72, 0xba, 0xb5, 0xd0, 0x3f, 0x5c, 0x99, 0xd9, 0xd6, 0xf6, 0x8d, 0xad, 0x36, 0x42, 0x33, + 0x06, 0xb4, 0x41, 0x03, 0x06, 0x2f, 0x83, 0xe9, 0x38, 0x0a, 0x14, 0x5e, 0xc7, 0xeb, 0x74, 0x6b, + 0xb6, 0x7f, 0xb8, 0x52, 0x7b, 0x10, 0x05, 0x12, 0xcc, 0x51, 0x2d, 0x8e, 0x02, 0x89, 0xe4, 0xcd, + 0x1f, 0x2d, 0x30, 0x9b, 0xef, 0xd3, 0x99, 0x9a, 0x56, 0x4e, 0xcd, 0x02, 0x1d, 0xca, 0x45, 0x3a, + 0xc0, 0x3b, 0x45, 0xb2, 0x17, 0x3e, 0x87, 0xc6, 0xd5, 0x1a, 0xaa, 0x1f, 0x0b, 0x60, 0x6e, 0xa8, + 0xe7, 0xcb, 0x6a, 0xb7, 0x58, 0xd0, 0xb2, 0x65, 0xd4, 0xe8, 0x96, 0x7f, 0x42, 0xd4, 0x28, 0x27, + 0x19, 0x35, 0x0a, 0xbd, 0x15, 0xc0, 0x2b, 0xa0, 0x2a, 0x2b, 0x8b, 0xd9, 0xe3, 0x99, 0x91, 0x82, + 0x23, 0xf3, 0xa4, 0x8b, 0x43, 0xa4, 0x30, 0xb0, 0x0e, 0xa6, 0x70, 0x88, 0xbb, 0xfb, 0x2f, 0xf5, + 0x23, 0xac, 0x86, 0xd2, 0xa9, 0x29, 0xc5, 0xaf, 0x2c, 0x70, 0xee, 0xc8, 0x46, 0x05, 0x9f, 0x81, + 0xe5, 0x5c, 0xcf, 0x0b, 0x48, 0xdc, 0x8d, 0xf6, 0x7b, 0x24, 0x4c, 0x9f, 0xbc, 0x5f, 0xa8, 0x98, + 0x30, 0x2f, 0x70, 0x7b, 0xe4, 0x05, 0x6e, 0x53, 0x66, 0xa7, 0xef, 0xee, 0x01, 0x73, 0x3b, 0xe3, + 0xc8, 0xb7, 0xf6, 0x81, 0x15, 0x5e, 0x02, 0x65, 0x1a, 0x98, 0xda, 0x3d, 0xa6, 0xc7, 0x64, 0xff, + 0x70, 0xa5, 0xbc, 0xd5, 0x46, 0x65, 0x1a, 0x34, 0x0f, 0x2c, 0xb0, 0x54, 0xf4, 0x68, 0x30, 0x0c, + 0xd6, 0x89, 0x0c, 0xf0, 0x53, 0x30, 0x21, 0xdf, 0xfc, 0x3a, 0xfd, 0xe6, 0xd7, 0x2f, 0xe8, 0xd3, + 0xc8, 0x3f, 0x01, 0xb1, 0x67, 0x7f, 0x43, 0x77, 0xc8, 0xc6, 0xbe, 0xdf, 0x25, 0xdb, 0x12, 0x82, + 0x34, 0x12, 0x5e, 0x05, 0x93, 0x1a, 0x61, 0xc4, 0x5f, 0x1c, 0xf2, 0xd9, 0x56, 0x03, 0x64, 0x20, + 0x43, 0x65, 0xa1, 0xfa, 0x0f, 0xca, 0x42, 0xb3, 0x05, 0xce, 0x1e, 0xf1, 0x52, 0x79, 0xef, 0xc3, + 0xb5, 0xc4, 0xeb, 0x37, 0x8d, 0xd2, 0xc1, 0x9b, 0x46, 0xe9, 0xdd, 0x9b, 0x86, 0xf5, 0x7d, 0xbf, + 0x61, 0xfd, 0xd6, 0x6f, 0x58, 0x7f, 0xf4, 0x1b, 0xd6, 0xeb, 0x7e, 0xc3, 0xfa, 0xb3, 0xdf, 0xb0, + 0xfe, 0xea, 0x37, 0x4a, 0xef, 0xfa, 0x0d, 0xeb, 0x87, 0xb7, 0x8d, 0xd2, 0xeb, 0xb7, 0x8d, 0xd2, + 0xc1, 0xdb, 0x46, 0xe9, 0xe9, 0xad, 0x98, 0xbe, 0xa0, 0xa4, 0x8b, 0x3d, 0x6e, 0x63, 0xea, 0x64, + 0x13, 0xe7, 0xf8, 0x7f, 0xb8, 0x37, 0xd3, 0xa1, 0x37, 0xa9, 0xfe, 0x3d, 0x7d, 0xf6, 0x77, 0x00, + 0x00, 0x00, 0xff, 0xff, 0xee, 0xa4, 0xfa, 0x4c, 0x13, 0x0f, 0x00, 0x00, } func (this *VizierMessage) Equal(that interface{}) bool { @@ -1853,9 +1844,6 @@ func (this *ExecuteQueryRequest) Equal(that interface{}) bool { if !this.QueryID.Equal(that1.QueryID) { return false } - if this.QueryStr != that1.QueryStr { - return false - } if !this.Plan.Equal(that1.Plan) { return false } @@ -2212,12 +2200,11 @@ func (this *ExecuteQueryRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 8) + s := make([]string, 0, 7) s = append(s, "&messages.ExecuteQueryRequest{") if this.QueryID != nil { s = append(s, "QueryID: "+fmt.Sprintf("%#v", this.QueryID)+",\n") } - s = append(s, "QueryStr: "+fmt.Sprintf("%#v", this.QueryStr)+",\n") if this.Plan != nil { s = append(s, "Plan: "+fmt.Sprintf("%#v", this.Plan)+",\n") } @@ -3096,13 +3083,6 @@ func (m *ExecuteQueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x1a } - if len(m.QueryStr) > 0 { - i -= len(m.QueryStr) - copy(dAtA[i:], m.QueryStr) - i = encodeVarintMessages(dAtA, i, uint64(len(m.QueryStr))) - i-- - dAtA[i] = 0x12 - } if m.QueryID != nil { { size, err := m.QueryID.MarshalToSizedBuffer(dAtA[:i]) @@ -3637,10 +3617,6 @@ func (m *ExecuteQueryRequest) Size() (n int) { l = m.QueryID.Size() n += 1 + l + sovMessages(uint64(l)) } - l = len(m.QueryStr) - if l > 0 { - n += 1 + l + sovMessages(uint64(l)) - } if m.Plan != nil { l = m.Plan.Size() n += 1 + l + sovMessages(uint64(l)) @@ -3994,7 +3970,6 @@ func (this *ExecuteQueryRequest) String() string { } s := strings.Join([]string{`&ExecuteQueryRequest{`, `QueryID:` + strings.Replace(fmt.Sprintf("%v", this.QueryID), "UUID", "proto1.UUID", 1) + `,`, - `QueryStr:` + fmt.Sprintf("%v", this.QueryStr) + `,`, `Plan:` + strings.Replace(fmt.Sprintf("%v", this.Plan), "Plan", "planpb.Plan", 1) + `,`, `Analyze:` + fmt.Sprintf("%v", this.Analyze) + `,`, `}`, @@ -5837,38 +5812,6 @@ func (m *ExecuteQueryRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field QueryStr", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowMessages - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthMessages - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthMessages - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.QueryStr = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex case 3: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Plan", wireType) diff --git a/src/vizier/messages/messagespb/messages.proto b/src/vizier/messages/messagespb/messages.proto index 155928c6e..5d88e01c6 100644 --- a/src/vizier/messages/messagespb/messages.proto +++ b/src/vizier/messages/messagespb/messages.proto @@ -112,8 +112,8 @@ message ExecuteQueryRequest { uuidpb.UUID query_id = 1 [ (gogoproto.customname) = "QueryID" ]; - // TODO(philkuz) DEPRECATION planned. - string query_str = 2; + // Formerly used for query string. + reserved 2; pl.carnot.planpb.Plan plan = 3; bool analyze = 4; } diff --git a/src/vizier/services/agent/manager/BUILD.bazel b/src/vizier/services/agent/manager/BUILD.bazel index ab06ee46e..e943c01bb 100644 --- a/src/vizier/services/agent/manager/BUILD.bazel +++ b/src/vizier/services/agent/manager/BUILD.bazel @@ -73,3 +73,15 @@ pl_cc_test( "//src/common/testing/event:cc_library", ], ) + +pl_cc_test( + name = "exec_test", + srcs = ["exec_test.cc"], + deps = [ + ":cc_library", + "//src/common/testing/grpc_utils:cc_library", + "//src/common/event:cc_library", + "//src/common/system:cc_library_mock", + "//src/common/testing/event:cc_library", + ], +) diff --git a/src/vizier/services/agent/manager/exec.cc b/src/vizier/services/agent/manager/exec.cc index 419f274d4..2fde6fec7 100644 --- a/src/vizier/services/agent/manager/exec.cc +++ b/src/vizier/services/agent/manager/exec.cc @@ -57,15 +57,22 @@ class ExecuteQueryMessageHandler::ExecuteQueryTask : public AsyncTask { void Work() override { AgentQueryResultRequest res_req; + auto contains_batch_result_or_s = PlanContainsBatchResults(req_.plan()); + if (!contains_batch_result_or_s.ok()) { + LOG(ERROR) << absl::Substitute("Query failed, reason: $0, plan: $1", + contains_batch_result_or_s.status().msg(), + req_.plan().DebugString()); + } + + bool send_batch_result = qb_stub_ != nullptr && contains_batch_result_or_s.ConsumeValueOrDie(); - auto s = ExecuteQueryInternal((qb_stub_ == nullptr) ? nullptr : res_req.mutable_result()); + auto s = ExecuteQueryInternal(send_batch_result ? res_req.mutable_result() : nullptr); if (!s.ok()) { - LOG(ERROR) << absl::Substitute("Query failed, reason: $0, query: $1", s.ToString(), - req_.query_str()); - LOG(ERROR) << req_.plan().DebugString(); + LOG(ERROR) << absl::Substitute("Query failed, reason: $0, plan: $1", s.ToString(), + req_.plan().DebugString()); } - if (agent_info_->capabilities.collects_data()) { + if (!send_batch_result) { // In distributed mode only non data collecting nodes send data. // TODO(zasgar/philkuz/michelle): We should actually just code in the Querybroker address into // the plan and remove the hardcoding here. @@ -101,16 +108,13 @@ class ExecuteQueryMessageHandler::ExecuteQueryTask : public AsyncTask { { ScopedTimer query_timer(absl::Substitute("query timer: id=$0", query_id_.str())); StatusOr<carnot::CarnotQueryResult> result_or_s; - if (req_.has_plan()) { - result_or_s = carnot_->ExecutePlan(req_.plan(), query_id_, req_.analyze()); - } else { - result_or_s = - carnot_->ExecuteQuery(req_.query_str(), query_id_, CurrentTimeNS(), req_.analyze()); - } + result_or_s = carnot_->ExecutePlan(req_.plan(), query_id_, req_.analyze()); + if (resp == nullptr) { return result_or_s.status(); } + // TODO(nserrino): Deprecate this logic when Kelvin executes all queries in streaming mode. *resp->mutable_query_id() = req_.query_id(); if (!result_or_s.ok()) { *resp->mutable_status() = result_or_s.status().ToProto(); diff --git a/src/vizier/services/agent/manager/exec.h b/src/vizier/services/agent/manager/exec.h index e6190cbb8..0a0a63046 100644 --- a/src/vizier/services/agent/manager/exec.h +++ b/src/vizier/services/agent/manager/exec.h @@ -3,6 +3,7 @@ #include <memory> #include <absl/container/flat_hash_map.h> +#include "src/carnot/plan/plan.h" #include "src/vizier/services/agent/manager/manager.h" #include "src/vizier/services/query_broker/querybrokerpb/service.grpc.pb.h" @@ -48,6 +49,39 @@ class ExecuteQueryMessageHandler : public Manager::MessageHandler { absl::flat_hash_map<sole::uuid, pl::event::RunnableAsyncTaskUPtr> running_queries_; }; +// TODO(nserrino): Delete this function when the batch API is deprecated. +inline StatusOr<bool> PlanContainsBatchResults(const carnot::planpb::Plan& plan_pb) { + auto no_op = [&](const auto&) { return Status::OK(); }; + carnot::plan::Plan plan; + PL_RETURN_IF_ERROR(plan.Init(plan_pb)); + + bool plan_contains_batch_results = false; + + auto s = carnot::plan::PlanWalker() + .OnPlanFragment([&](auto* pf) { + return carnot::plan::PlanFragmentWalker() + .OnMemorySink([&](auto&) { + plan_contains_batch_results = true; + return Status::OK(); + }) + .OnMap(no_op) + .OnAggregate(no_op) + .OnFilter(no_op) + .OnLimit(no_op) + .OnMemorySource(no_op) + .OnUnion(no_op) + .OnJoin(no_op) + .OnGRPCSource(no_op) + .OnGRPCSink(no_op) + .OnUDTFSource(no_op) + .Walk(pf); + }) + .Walk(&plan); + + PL_RETURN_IF_ERROR(s); + return plan_contains_batch_results; +} + } // namespace agent } // namespace vizier } // namespace pl diff --git a/src/vizier/services/agent/manager/exec_test.cc b/src/vizier/services/agent/manager/exec_test.cc new file mode 100644 index 000000000..1d7bbf501 --- /dev/null +++ b/src/vizier/services/agent/manager/exec_test.cc @@ -0,0 +1,91 @@ +#include <gtest/gtest.h> + +#include "src/vizier/services/agent/manager/exec.h" + +#include "src/common/testing/testing.h" + +namespace pl { +namespace vizier { +namespace agent { + +constexpr char kPlanFragmentTmpl[] = R"proto( +dag { + nodes { + id: 1 + } +} +nodes { + id: 1 + dag { + nodes { + id: 1 + sorted_children: 2 + } + nodes { + id: 2 + } + } + nodes { + id: 1 + op { + op_type: MEMORY_SOURCE_OPERATOR + mem_source_op { + name: "cpu" + column_idxs: 0 + column_names: "count" + column_types: INT64 + } + } + } + nodes { + id: 2 + op $0 + } +} +)proto"; + +constexpr char kMemSinkOp[] = R"( +{ + op_type: MEMORY_SINK_OPERATOR + mem_sink_op { + name: "out" + column_names: "count" + column_types: INT64 + column_semantic_types: ST_NONE + } +} +)"; + +constexpr char kGRPCSinkOp[] = R"( +{ + op_type: GRPC_SINK_OPERATOR + grpc_sink_op { + address: "localhost:1234" + grpc_source_id: 0 + } +} +)"; + +TEST(PlanContainsBatchResultsTest, WithBatchResults) { + carnot::planpb::Plan plan_pb; + auto plan_str = absl::Substitute(kPlanFragmentTmpl, kMemSinkOp); + ASSERT_TRUE(google::protobuf::TextFormat::MergeFromString(plan_str, &plan_pb)); + + auto res = PlanContainsBatchResults(plan_pb); + ASSERT_OK(res); + ASSERT_TRUE(res.ConsumeValueOrDie()); +} + +TEST(PlanContainsBatchResultsTest, WithoutBatchResults) { + carnot::planpb::Plan plan_pb; + auto plan_str = absl::Substitute(kPlanFragmentTmpl, kGRPCSinkOp); + ASSERT_TRUE(google::protobuf::TextFormat::MergeFromString(plan_str, &plan_pb)); + + auto res = PlanContainsBatchResults(plan_pb); + ASSERT_OK(res); + ASSERT_FALSE(res.ConsumeValueOrDie()); +} + +} // namespace agent +} // namespace vizier +} // namespace pl -- GitLab