Commit c3a7311e authored by Michelle Nguyen's avatar Michelle Nguyen
Browse files

Metadata index requests should not block agent messages

Summary:
currently, the metadata index requests are blocking the processing of any agent messages. this should only really happen when a cluster starts up, or when cloud sees any metadata is missing.
when metadata receives a message on nats,
it either calls the agentHandler.HandleMessage function, which puts the message on the correct agent channel.
Or, if it is a request for missing metadata for the cloud indexer, it calls the MetadataTopicListener.HandleMessage function, which makes the request to etcd and sends out the response. this needs to be processed in a separate channel.

Test Plan: ran skaffold

Reviewers: zasgar, #engineering

Reviewed By: zasgar, #engineering

Differential Revision: https://phab.corp.pixielabs.ai/D6183

GitOrigin-RevId: 1ac7e757dd36f4e4647efb3d35e4bc1ba88d1af2
parent e4b326d1
Showing with 49 additions and 4 deletions
+49 -4
......@@ -242,6 +242,13 @@ func (a *AgentTopicListener) onAgentTracepointInfoUpdate(m *messages.TracepointI
}
}
// Stop stops processing any agent messages.
func (a *AgentTopicListener) Stop() {
for _, ah := range a.agentMap {
ah.Stop()
}
}
// ProcessMessages handles all of the agent messages for this agent. If it does not receive a heartbeat from the agent
// after a certain amount of time, it will declare the agent dead and perform deletion.
func (ah *AgentHandler) ProcessMessages() {
......
......@@ -8,6 +8,7 @@ import (
// TopicListener handles NATS messages for a specific topic.
type TopicListener interface {
HandleMessage(*nats.Msg) error
Stop()
}
// SendMessageFn is the function the TopicListener uses to publish messages back to NATS.
......@@ -113,4 +114,8 @@ func (mc *MessageBusController) Close() {
mc.conn.Drain()
mc.conn.Close()
for _, tl := range mc.listeners {
tl.Stop()
}
}
......@@ -26,6 +26,8 @@ type MetadataTopicListener struct {
sendMessage SendMessageFn
mds MetadataStore
mh *MetadataHandler
msgCh chan *nats.Msg
quitCh chan bool
}
// NewMetadataTopicListener creates a new metadata topic listener.
......@@ -34,6 +36,8 @@ func NewMetadataTopicListener(mdStore MetadataStore, mdHandler *MetadataHandler,
sendMessage: sendMsgFn,
mds: mdStore,
mh: mdHandler,
msgCh: make(chan *nats.Msg, 1000),
quitCh: make(chan bool),
}
m.mds.UpdateSubscriberResourceVersion(subscriberName, "")
......@@ -41,11 +45,40 @@ func NewMetadataTopicListener(mdStore MetadataStore, mdHandler *MetadataHandler,
// Subscribe to metadata updates.
mdHandler.AddSubscriber(m)
go m.processMessages()
return m, nil
}
// HandleMessage handles a message on the agent topic.
func (m *MetadataTopicListener) HandleMessage(msg *nats.Msg) error {
m.msgCh <- msg
return nil
}
// ProcessMessages processes the metadata requests.
func (m *MetadataTopicListener) processMessages() {
for {
select {
case msg := <-m.msgCh:
err := m.ProcessMessage(msg)
if err != nil {
log.WithError(err).Error("Failed to process metadata message")
}
case <-m.quitCh:
log.Info("Received quit, stopping metadata listener")
return
}
}
}
// Stop stops processing any metadata messages.
func (m *MetadataTopicListener) Stop() {
m.quitCh <- true
}
// ProcessMessage processes a single message in the metadata topic.
func (m *MetadataTopicListener) ProcessMessage(msg *nats.Msg) error {
c2vMsg := &cvmsgspb.C2VMessage{}
err := proto.Unmarshal(msg.Data, c2vMsg)
if err != nil {
......
......@@ -139,7 +139,7 @@ func TestMetadataTopicListener_MetadataSubscriber(t *testing.T) {
assert.Equal(t, update, updatePb.Update)
}
func TestMetadataTopicListener_HandleMessage(t *testing.T) {
func TestMetadataTopicListener_ProcessMessage(t *testing.T) {
// Set up mock.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
......@@ -180,7 +180,7 @@ func TestMetadataTopicListener_HandleMessage(t *testing.T) {
msg := nats.Msg{}
msg.Data = b
err = mdl.HandleMessage(&msg)
err = mdl.ProcessMessage(&msg)
assert.Nil(t, err)
assert.Equal(t, 1, len(updates))
wrapperPb := &cvmsgspb.V2CMessage{}
......@@ -194,7 +194,7 @@ func TestMetadataTopicListener_HandleMessage(t *testing.T) {
}
}
func TestMetadataTopicListener_HandleMessageBatch(t *testing.T) {
func TestMetadataTopicListener_ProcessMessageBatch(t *testing.T) {
tests := []struct {
name string
numUpdates int
......@@ -277,7 +277,7 @@ func TestMetadataTopicListener_HandleMessageBatch(t *testing.T) {
msg := nats.Msg{}
msg.Data = b
err = mdl.HandleMessage(&msg)
err = mdl.ProcessMessage(&msg)
assert.Nil(t, err)
assert.Equal(t, numBatches, test.expectedNumBatches)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment