Unverified Commit a5073ab6 authored by mmaxiaolei, committed by GitHub

refactor: reader process chain (#166)

* refactor: reader process chain
* chore: remove unused code
parent d54f3e6c
Showing with 429 additions and 140 deletions
@@ -46,6 +46,7 @@ import (
_ "github.com/loggie-io/loggie/pkg/sink/loki"
_ "github.com/loggie-io/loggie/pkg/source/dev"
_ "github.com/loggie-io/loggie/pkg/source/file"
_ "github.com/loggie-io/loggie/pkg/source/file/process"
_ "github.com/loggie-io/loggie/pkg/source/grpc"
_ "github.com/loggie-io/loggie/pkg/source/kafka"
_ "github.com/loggie-io/loggie/pkg/source/kubernetes_event"
@@ -59,14 +59,15 @@ type Job struct {
nextOffset int64
currentLineNumber int64
currentLines int64
eofCount int
lastActiveTime time.Time
deleteTime atomic.Value
renameTime atomic.Value
stopTime atomic.Value
identifier string
task *WatchTask
EofCount int
LastActiveTime time.Time
}
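(The eofCount and lastActiveTime fields become the exported EofCount and LastActiveTime because the new processors under pkg/source/file/process live in a separate package and need to read and reset them.)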
func JobUid(fileInfo os.FileInfo) string {
@@ -229,8 +230,8 @@ func (j *Job) Active() (error, bool) {
}
}
j.ChangeStatusTo(JobActive)
j.eofCount = 0
j.lastActiveTime = time.Now()
j.EofCount = 0
j.LastActiveTime = time.Now()
return nil, fdOpen
}
@@ -287,6 +288,10 @@ func (j *Job) Read() {
j.task.activeChan <- j
}
func (j *Job) File() *os.File {
return j.file
}
const tsLayout = "2006-01-02T15:04:05.000Z"
func (j *Job) ProductEvent(endOffset int64, collectTime time.Time, body []byte) {
package file
import (
"errors"
"io"
"sort"
"strings"
"github.com/loggie-io/loggie/pkg/core/log"
)
type JobCollectContext struct {
Job *Job
Filename string
LastOffset int64
BacklogBuffer []byte
ReadBuffer []byte
	// runtime properties
WasSend bool
IsEOF bool
}
func NewJobCollectContextAndValidate(job *Job, readBuffer, backlogBuffer []byte) (*JobCollectContext, error) {
lastOffset, err := validateJob(job)
if err != nil {
return nil, err
}
return &JobCollectContext{
Job: job,
Filename: job.filename,
LastOffset: lastOffset,
BacklogBuffer: backlogBuffer,
ReadBuffer: readBuffer,
}, nil
}
func validateJob(job *Job) (lastOffset int64, err error) {
filename := job.filename
status := job.status
if status == JobStop {
log.Info("Job(uid: %s) file(%s) status(%d) is stop, Job will be ignore", job.Uid(), filename, status)
return 0, errors.New("Job is stop")
}
osFile := job.file
if osFile == nil {
log.Error("Job(uid: %s) file(%s) released,Job will be ignore", job.Uid(), filename)
return 0, errors.New("Job file released")
}
lastOffset, err = osFile.Seek(0, io.SeekCurrent)
if err != nil {
log.Error("can't get offset, file(name:%s) seek error, err: %v", filename, err)
return 0, err
}
return lastOffset, nil
}
type ProcessChain interface {
Process(ctx *JobCollectContext)
}
type abstractProcessChain struct {
DoProcess func(ctx *JobCollectContext)
}
func (ap *abstractProcessChain) Process(ctx *JobCollectContext) {
ap.DoProcess(ctx)
}
type Processor interface {
Order() int
Code() string
Process(processorChain ProcessChain, ctx *JobCollectContext)
}
type sortableProcessor struct {
processors []Processor
}
func newSortableProcessor() *sortableProcessor {
return &sortableProcessor{
processors: make([]Processor, 0),
}
}
func (sp *sortableProcessor) Len() int {
return len(sp.processors)
}
func (sp *sortableProcessor) Less(i, j int) bool {
pi := sp.processors[i]
pj := sp.processors[j]
	// Reverse order: sort descending by Order() so that, after wrapping,
	// the processor with the lowest Order() ends up outermost and runs first
	return pi.Order() > pj.Order()
}
func (sp *sortableProcessor) Swap(i, j int) {
sp.processors[i], sp.processors[j] = sp.processors[j], sp.processors[i]
}
func (sp *sortableProcessor) Sort() {
sort.Sort(sp)
}
func (sp *sortableProcessor) Append(processor Processor) {
sp.processors = append(sp.processors, processor)
}
func (sp *sortableProcessor) Processors() []Processor {
return sp.processors
}
func NewProcessChain(config ReaderConfig) ProcessChain {
l := len(processFactories)
if l == 0 {
return nil
}
sp := newSortableProcessor()
for _, factory := range processFactories {
sp.Append(factory(config))
}
sp.Sort()
processors := sp.Processors()
var processChainName strings.Builder
processChainName.WriteString("end<-")
last := &abstractProcessChain{
DoProcess: func(ctx *JobCollectContext) {
// do nothing
},
}
for _, processor := range processors {
tempProcessor := processor
next := last
last = &abstractProcessChain{
DoProcess: func(ctx *JobCollectContext) {
tempProcessor.Process(next, ctx)
},
}
processChainName.WriteString(tempProcessor.Code())
processChainName.WriteString("<-")
}
processChainName.WriteString("start")
log.Info("process chain: %s", processChainName.String())
return last
}
var processFactories = make([]ProcessFactory, 0)
type ProcessFactory func(config ReaderConfig) Processor
func RegisterProcessor(factory ProcessFactory) {
processFactories = append(processFactories, factory)
}
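With the four processors registered below (lastLine=100, loop=200, source=300, line=500), NewProcessChain sorts them in descending order and wraps them so that the lowest Order() ends up outermost; the startup log should therefore read roughly

process chain: end<-line<-source<-loop<-lastLine<-start

and a call to Process runs lastLine → loop → source → line before reaching the no-op end of the chain.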
package process
import (
"io"
"time"
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/source/file"
)
func init() {
file.RegisterProcessor(makeLastLine)
}
func makeLastLine(config file.ReaderConfig) file.Processor {
return &LastLineProcessor{
inactiveTimeout: config.InactiveTimeout,
}
}
type LastLineProcessor struct {
inactiveTimeout time.Duration
}
func (llp *LastLineProcessor) Order() int {
return 100
}
func (llp *LastLineProcessor) Code() string {
return "lastLine"
}
func (llp *LastLineProcessor) Process(processorChain file.ProcessChain, ctx *file.JobCollectContext) {
ctx.BacklogBuffer = ctx.BacklogBuffer[:0]
// see LoopProcessor.Process()
processorChain.Process(ctx)
// check last line
l := len(ctx.BacklogBuffer)
if l <= 0 {
return
}
job := ctx.Job
	// Before falling back the offset, check whether the file has been inactive long enough to collect the last (unterminated) line
isLastLineSend := false
if ctx.IsEOF && !ctx.WasSend {
if time.Since(job.LastActiveTime) >= llp.inactiveTimeout {
// Send "last line"
endOffset := ctx.LastOffset
job.ProductEvent(endOffset, time.Now(), ctx.BacklogBuffer)
job.LastActiveTime = time.Now()
isLastLineSend = true
			// Skip the '\n' that may be written next.
			// Once this "last line" has been collected, either nothing more will be written,
			// or the next write will start with '\n' before the next line's content,
			// so seek one byte forward to skip that potential '\n'.
_, err := job.File().Seek(1, io.SeekCurrent)
if err != nil {
log.Error("can't set offset, file(name:%s) seek error: %v", ctx.Filename, err)
}
} else {
			// Reset the EOF count so the job stays alive long enough to collect the last line later
job.EofCount = 0
}
}
	// Fall back the offset by the length of the accumulated backlog so those bytes are re-read on the next pass
if !isLastLineSend {
backwardOffset := int64(-l)
_, err := job.File().Seek(backwardOffset, io.SeekCurrent)
if err != nil {
log.Error("can't set offset(%d), file(name:%s) seek error: %v", backwardOffset, ctx.Filename, err)
}
return
}
}
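As a concrete example of the behaviour above: if a file ends in a line without a trailing newline and nothing more is written for at least inactiveTimeout, the backlog is flushed as its own event and the reader seeks one byte forward so that a '\n' written later is not collected again; otherwise the offset is moved back by the backlog length and the partial line is simply re-read on the next pass.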
package process
import (
"bytes"
"time"
"github.com/loggie-io/loggie/pkg/source/file"
)
func init() {
file.RegisterProcessor(makeLine)
}
func makeLine(config file.ReaderConfig) file.Processor {
return &LineProcessor{}
}
type LineProcessor struct {
}
func (lp *LineProcessor) Order() int {
return 500
}
func (lp *LineProcessor) Code() string {
return "line"
}
func (lp *LineProcessor) Process(processorChain file.ProcessChain, ctx *file.JobCollectContext) {
job := ctx.Job
now := time.Now()
readBuffer := ctx.ReadBuffer
read := int64(len(readBuffer))
processed := int64(0)
for processed < read {
index := int64(bytes.IndexByte(readBuffer[processed:], '\n'))
if index == -1 {
break
}
index += processed
endOffset := ctx.LastOffset + index
if len(ctx.BacklogBuffer) != 0 {
ctx.BacklogBuffer = append(ctx.BacklogBuffer, readBuffer[processed:index]...)
job.ProductEvent(endOffset, now, ctx.BacklogBuffer)
// Clean the backlog buffer after sending
ctx.BacklogBuffer = ctx.BacklogBuffer[:0]
} else {
job.ProductEvent(endOffset, now, readBuffer[processed:index])
}
processed = index + 1
}
ctx.LastOffset += read
ctx.WasSend = processed != 0
// The remaining bytes read are added to the backlog buffer
if processed < read {
ctx.BacklogBuffer = append(ctx.BacklogBuffer, readBuffer[processed:]...)
// TODO check whether it is too long to avoid bursting the memory
//if len(backlogBuffer)>max_bytes{
// log.Error
// break
//}
}
}
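As a standalone illustration of this splitting strategy (the splitLines helper below is made up for the example and is not loggie API; its loop mirrors LineProcessor.Process):

package main

import (
	"bytes"
	"fmt"
)

// splitLines mimics the LineProcessor loop: every complete line is emitted,
// and the trailing partial line is carried over in the backlog for the next read.
func splitLines(backlog, readBuffer []byte, emit func(line []byte)) []byte {
	processed := 0
	for processed < len(readBuffer) {
		index := bytes.IndexByte(readBuffer[processed:], '\n')
		if index == -1 {
			break
		}
		index += processed
		if len(backlog) != 0 {
			backlog = append(backlog, readBuffer[processed:index]...)
			emit(backlog)
			backlog = backlog[:0]
		} else {
			emit(readBuffer[processed:index])
		}
		processed = index + 1
	}
	// Keep the unterminated remainder for the next read.
	return append(backlog, readBuffer[processed:]...)
}

func main() {
	emit := func(line []byte) { fmt.Printf("event: %q\n", line) }
	var backlog []byte
	backlog = splitLines(backlog, []byte("foo\nbar"), emit) // event: "foo"; "bar" stays in the backlog
	backlog = splitLines(backlog, []byte("baz\n"), emit)    // event: "barbaz"
	fmt.Printf("backlog: %q\n", backlog)                    // backlog: ""
}

In the real processor the emitted slice is passed to job.ProductEvent together with its end offset, and ctx.LastOffset is advanced by the number of bytes read.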
package process
import (
"github.com/loggie-io/loggie/pkg/source/file"
"time"
)
func init() {
file.RegisterProcessor(makeLoop)
}
func makeLoop(config file.ReaderConfig) file.Processor {
return &LoopProcessor{
maxContinueRead: config.MaxContinueRead,
maxContinueReadTimeout: config.MaxContinueReadTimeout,
}
}
type LoopProcessor struct {
maxContinueRead int
maxContinueReadTimeout time.Duration
continueRead int
startReadTime time.Time
}
func (bp *LoopProcessor) Order() int {
return 200
}
func (bp *LoopProcessor) Code() string {
return "loop"
}
func (bp *LoopProcessor) Process(processorChain file.ProcessChain, ctx *file.JobCollectContext) {
bp.startReadTime = time.Now()
bp.continueRead = 0
for {
// see SourceProcessor.Process
processorChain.Process(ctx)
if ctx.IsEOF {
break
}
bp.continueRead++
		// With a batch of 2048 events at roughly 512 bytes each, at most about one batch (~1MB)
		// is read in a row; maxContinueRead = 16 by default.
		// For SSDs it is recommended to increase maxContinueRead by 3~5x.
if bp.continueRead > bp.maxContinueRead {
break
}
if time.Since(bp.startReadTime) > bp.maxContinueReadTimeout {
break
}
}
	// events were sent, so reset the job's EOF count and refresh its active time
if ctx.WasSend {
ctx.Job.EofCount = 0
ctx.Job.LastActiveTime = time.Now()
}
}
package process
import (
"errors"
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/source/file"
"io"
)
func init() {
file.RegisterProcessor(makeSource)
}
func makeSource(config file.ReaderConfig) file.Processor {
return &SourceProcessor{
readBufferSize: config.ReadBufferSize,
}
}
type SourceProcessor struct {
readBufferSize int
}
func (sp *SourceProcessor) Order() int {
return 300
}
func (sp *SourceProcessor) Code() string {
return "source"
}
func (sp *SourceProcessor) Process(processorChain file.ProcessChain, ctx *file.JobCollectContext) {
job := ctx.Job
ctx.ReadBuffer = ctx.ReadBuffer[:sp.readBufferSize]
l, err := job.File().Read(ctx.ReadBuffer)
if errors.Is(err, io.EOF) || l == 0 {
ctx.IsEOF = true
job.EofCount++
return
}
if err != nil {
ctx.IsEOF = true
log.Error("file(name:%s) read fail: %v", ctx.Filename, err)
return
}
read := int64(l)
ctx.ReadBuffer = ctx.ReadBuffer[:read]
	// see LineProcessor.Process
processorChain.Process(ctx)
}
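For illustration only, a minimal sketch of how a further processor could hook into this chain via file.RegisterProcessor. The DebugProcessor name and its Order value of 400 are assumptions, chosen so that it would run between source (300) and line (500), i.e. after the read buffer is filled but before it is split into lines:

package process

import (
	"github.com/loggie-io/loggie/pkg/core/log"
	"github.com/loggie-io/loggie/pkg/source/file"
)

// Hypothetical example, not part of this change.
func init() {
	file.RegisterProcessor(makeDebug)
}

func makeDebug(config file.ReaderConfig) file.Processor {
	return &DebugProcessor{}
}

type DebugProcessor struct {
}

func (dp *DebugProcessor) Order() int {
	// 400 is an assumed value: it slots this processor between source (300) and line (500).
	return 400
}

func (dp *DebugProcessor) Code() string {
	return "debug"
}

func (dp *DebugProcessor) Process(processorChain file.ProcessChain, ctx *file.JobCollectContext) {
	// At this point SourceProcessor has already filled ctx.ReadBuffer.
	log.Info("file(%s) read %d bytes starting at offset %d", ctx.Filename, len(ctx.ReadBuffer), ctx.LastOffset)
	// Hand over to the next processor in the chain (line splitting).
	processorChain.Process(ctx)
}

Because registration happens in init(), such a file only needs to be blank-imported (as pkg/source/file/process is in the first hunk above) for the factory to be picked up.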
@@ -17,15 +17,12 @@ limitations under the License.
package file
import (
"bytes"
"io"
"sync"
"time"
"github.com/loggie-io/loggie/pkg/core/event"
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/pipeline"
"github.com/pkg/errors"
)
const (
@@ -111,148 +108,25 @@ func (r *Reader) work(index int) {
r.countDown.Done()
}()
readBufferSize := r.config.ReadBufferSize
maxContinueReadTimeout := r.config.MaxContinueReadTimeout
maxContinueRead := r.config.MaxContinueRead
inactiveTimeout := r.config.InactiveTimeout
backlogBuffer := make([]byte, 0, readBufferSize)
readBuffer := make([]byte, readBufferSize)
jobs := r.jobChan
processChain := r.buildProcessChain()
for {
select {
case <-r.done:
return
case job := <-jobs:
filename := job.filename
status := job.status
if status == JobStop {
log.Info("job(uid: %s) file(%s) status(%d) is stop, job will be ignore", job.Uid(), filename, status)
r.watcher.decideJob(job)
continue
}
file := job.file
if file == nil {
log.Error("job(uid: %s) file(%s) released,job will be ignore", job.Uid(), filename)
r.watcher.decideJob(job)
continue
}
lastOffset, err := file.Seek(0, io.SeekCurrent)
ctx, err := NewJobCollectContextAndValidate(job, readBuffer, backlogBuffer)
if err != nil {
log.Error("can't get offset, file(name:%s) seek error, err: %v", filename, err)
r.watcher.decideJob(job)
continue
}
job.currentLines = 0
startReadTime := time.Now()
continueRead := 0
isEOF := false
wasSend := false
readTotal := int64(0)
processed := int64(0)
backlogBuffer = backlogBuffer[:0]
for {
readBuffer = readBuffer[:readBufferSize]
l, readErr := file.Read(readBuffer)
if errors.Is(readErr, io.EOF) || l == 0 {
isEOF = true
job.eofCount++
break
}
if readErr != nil {
log.Error("file(name:%s) read error, err: %v", filename, err)
break
}
read := int64(l)
readBuffer = readBuffer[:read]
now := time.Now()
processed = 0
for processed < read {
index := int64(bytes.IndexByte(readBuffer[processed:], '\n'))
if index == -1 {
break
}
index += processed
endOffset := lastOffset + readTotal + index
if len(backlogBuffer) != 0 {
backlogBuffer = append(backlogBuffer, readBuffer[processed:index]...)
job.ProductEvent(endOffset, now, backlogBuffer)
// Clean the backlog buffer after sending
backlogBuffer = backlogBuffer[:0]
} else {
job.ProductEvent(endOffset, now, readBuffer[processed:index])
}
processed = index + 1
}
readTotal += read
// The remaining bytes read are added to the backlog buffer
if processed < read {
backlogBuffer = append(backlogBuffer, readBuffer[processed:]...)
// TODO check whether it is too long to avoid bursting the memory
// if len(backlogBuffer)>max_bytes{
// log.Error
// break
// }
}
wasSend = processed != 0
if wasSend {
continueRead++
// According to the number of batches 2048, a maximum of one batch can be read,
// and a single event is calculated according to 512 bytes, that is, the maximum reading is 1mb ,maxContinueRead = 16 by default
// SSD recommends that maxContinueRead be increased by 3 ~ 5x
if continueRead > maxContinueRead {
break
}
if time.Since(startReadTime) > maxContinueReadTimeout {
break
}
}
}
if wasSend {
job.eofCount = 0
job.lastActiveTime = time.Now()
}
l := len(backlogBuffer)
if l > 0 {
// When it is necessary to back off the offset, check whether it is inactive to collect the last line
wasLastLineSend := false
if isEOF && !wasSend {
if time.Since(job.lastActiveTime) >= inactiveTimeout {
// Send "last line"
endOffset := lastOffset + readTotal
job.ProductEvent(endOffset, time.Now(), backlogBuffer)
job.lastActiveTime = time.Now()
wasLastLineSend = true
// Ignore the /n that may be written next.
// Because the "last line" of the collection thinks that either it will not be written later,
// or it will write /n first, and then write the content of the next line,
// it is necessary to seek a position later to ignore the /n that may be written
_, err = file.Seek(1, io.SeekCurrent)
if err != nil {
log.Error("can't set offset, file(name:%s) seek error: %v", filename, err)
}
} else {
// Enable the job to escape and collect the last line
job.eofCount = 0
}
}
// Fallback accumulated buffer offset
if !wasLastLineSend {
backwardOffset := int64(-l)
_, err = file.Seek(backwardOffset, io.SeekCurrent)
if err != nil {
log.Error("can't set offset, file(name:%s) seek error: %v", filename, err)
}
}
}
processChain.Process(ctx)
r.watcher.decideJob(job)
}
}
}
func (r *Reader) buildProcessChain() ProcessChain {
return NewProcessChain(r.config)
}
@@ -187,7 +187,7 @@ func (w *Watcher) decideJob(job *Job) {
return
}
// inactive
if job.eofCount > w.config.MaxEofCount {
if job.EofCount > w.config.MaxEofCount {
w.zombieJobChan <- job
return
}
@@ -212,6 +212,7 @@ func (w *Watcher) reportMetric(job *Job) {
// FileSize: fileSize,
SourceFields: job.task.sourceFields,
}
job.currentLines = 0
eventbus.PublishOrDrop(eventbus.FileSourceMetricTopic, collectMetricData)
}
@@ -276,7 +277,7 @@ func (w *Watcher) eventBus(e jobEvent) {
existAckOffset := existRegistry.Offset
fileSize := stat.Size()
// check whether the existAckOffset is larger than the file size
if existAckOffset > fileSize {
if existAckOffset > fileSize+1 {
log.Warn("new job(jobUid:%s) fileName(%s) existRegistry(%+v) ackOffset is larger than file size(%d).is inode repeat?", job.Uid(), filename, existRegistry, fileSize)
// file was truncated,start from the beginning
if job.task.config.RereadTruncated {
@@ -537,7 +538,7 @@ func (w *Watcher) scanZombieJob() {
}
} else {
// release fd
if time.Since(job.lastActiveTime) > w.config.FdHoldTimeoutWhenInactive {
if time.Since(job.LastActiveTime) > w.config.FdHoldTimeoutWhenInactive {
if job.Release() {
w.currentOpenFds--
}