From bce42a376b5fc30bc4f4c1888462c51dd5362d89 Mon Sep 17 00:00:00 2001
From: ethfoo <ethfoo@163.com>
Date: Mon, 11 Jul 2022 15:54:24 +0800
Subject: [PATCH] Fix(grpc source): move private fields from header to meta

---
 Dockerfile               |  2 +-
 pkg/source/grpc/batch.go | 16 +++++++++-------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index e88fca3..c4e9e43 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -8,7 +8,7 @@ ARG TARGETOS
 WORKDIR /
 COPY . .
 # Build
-#RUN CGO_ENABLED=1 go build -mod=vendor -a -o loggie cmd/loggie/main.go
+#RUN make build
 
 RUN if [ "$TARGETARCH" = "arm64" ]; then apt-get update && apt-get install -y gcc-aarch64-linux-gnu && export CC=aarch64-linux-gnu-gcc && export CC_FOR_TARGET=gcc-aarch64-linux-gnu; fi \
   && GOOS=$TARGETOS GOARCH=$TARGETARCH CC=$CC CC_FOR_TARGET=$CC_FOR_TARGET make build
diff --git a/pkg/source/grpc/batch.go b/pkg/source/grpc/batch.go
index efe9850..6278ae8 100644
--- a/pkg/source/grpc/batch.go
+++ b/pkg/source/grpc/batch.go
@@ -65,16 +65,19 @@ func (b *batch) size() int {
 
 func (b *batch) append(e api.Event) {
 	currentIndex := b.eventIndex
-	header := e.Header()
-	header[batchEventIndexKey] = currentIndex
-	header[batchIndexKey] = b.index
+	e.Meta().Set(batchEventIndexKey, currentIndex)
+	e.Meta().Set(batchIndexKey, b.index)
+
 	b.events[currentIndex] = e
 	b.eventIndex++
 }
 
 func (b *batch) ack(e api.Event) {
-	header := e.Header()
-	index := header[batchEventIndexKey].(int32)
+	indexRaw, exist := e.Meta().Get(batchEventIndexKey)
+	if !exist {
+		return
+	}
+	index := indexRaw.(int32)
 	delete(b.events, index)
 
 	if len(b.events) == 0 {
@@ -166,8 +169,7 @@ func (bc *batchChain) run() {
 			}
 		case es := <-bc.ackEvents:
 			for _, e := range es {
-				header := e.Header()
-				if batchIndex, ok := header[batchIndexKey]; ok {
+				if batchIndex, ok := e.Meta().Get(batchIndexKey); ok {
 					if b, exist := bs[batchIndex.(uint32)]; exist {
 						b.ack(e)
 						if b.isDone() {
-- 
GitLab