Skip to content

Commit

Permalink
feat: 行日志支持高性能采集模式
Browse files Browse the repository at this point in the history
  • Loading branch information
jayjiahua authored Dec 11, 2023
2 parents 6aece75 + 03facf9 commit d63e5b8
Show file tree
Hide file tree
Showing 29 changed files with 3,357 additions and 59 deletions.
9 changes: 9 additions & 0 deletions filebeat/input/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
CloseEOF: false,
CloseTimeout: 0,
},
LudicrousMode: false,
}
)

Expand Down Expand Up @@ -114,6 +115,9 @@ type config struct {
ForceCRI bool `config:"force_cri_logs"`
CRIFlags bool `config:"cri_flags"`
} `config:"docker-json"`

// ludicrous mode, the collection speed of the single-line-log can reach 100+MB/s !!!
LudicrousMode bool `config:"ludicrous_mode"`
}

type LogConfig struct {
Expand Down Expand Up @@ -232,3 +236,8 @@ func (c *config) normalizeGlobPatterns() error {
c.Paths = paths
return nil
}

// IsLudicrousModeActivated reports whether the high-throughput ("ludicrous")
// collection path should take effect. It only applies to plain single-line
// logs: any JSON or multiline configuration disables it.
func (c *config) IsLudicrousModeActivated() bool {
	if !c.LudicrousMode {
		return false
	}
	// The fast path only supports the plain single-line scenario.
	return c.JSON == nil && c.Multiline == nil
}
12 changes: 9 additions & 3 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"fmt"
file_helper "github.com/elastic/beats/libbeat/common/file"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -310,10 +311,15 @@ func (h *Harvester) Run() error {
data.Event.Timestamp = ts
}
} else if &text != nil {
if fields == nil {
fields = common.MapStr{}
if h.config.IsLudicrousModeActivated() {
normalizedText := strings.ReplaceAll(text, "\r\n", "\n")
data.Event.Texts = strings.Split(normalizedText, "\n")
} else {
if fields == nil {
fields = common.MapStr{}
}
fields["data"] = text
}
fields["data"] = text
}

data.Event.Fields = fields
Expand Down
57 changes: 34 additions & 23 deletions filebeat/input/log/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
// Package log harvests different inputs for new information. Currently
// two harvester types exist:
//
// * log
// * stdin
// - log
//
// The log harvester reads a file line by line. In case the end of a file is found
// with an incomplete line, the line pointer stays at the beginning of the incomplete
// line. As soon as the line is completed, it is read and returned.
// - stdin
//
// The stdin harvesters reads data from stdin.
// The log harvester reads a file line by line. In case the end of a file is found
// with an incomplete line, the line pointer stays at the beginning of the incomplete
// line. As soon as the line is completed, it is read and returned.
//
// The stdin harvester reads data from stdin.
package log

import (
Expand Down Expand Up @@ -59,7 +60,7 @@ var (
ErrContextNotExists = errors.New("harvester context dose not exists")
)

//ReuseMessage
// ReuseMessage
type ReuseMessage struct {
message reader.Message
error error
Expand Down Expand Up @@ -104,7 +105,7 @@ func NewReuseHarvester(
return r, nil
}

//Next: 按行读取文件内容,并根据harvester offset返回
// Next: 按行读取文件内容,并根据harvester offset返回
func (r *ReuseHarvester) Next() (reader.Message, error) {
select {
case <-r.done:
Expand All @@ -116,7 +117,7 @@ func (r *ReuseHarvester) Next() (reader.Message, error) {
}
}

//OnMessage:
// OnMessage:
func (r *ReuseHarvester) OnMessage(message ReuseMessage) error {
select {
case <-r.done:
Expand All @@ -128,19 +129,19 @@ func (r *ReuseHarvester) OnMessage(message ReuseMessage) error {
}
}

//Stop: 停止harvester
// Stop shuts the reuse harvester down by closing its done channel.
// Safe to call more than once: closeOnce guarantees a single close.
func (r *ReuseHarvester) Stop() {
	r.closeOnce.Do(func() { close(r.done) })
}

//HasState
// HasState reports whether the underlying file reader tracks persistent
// state (delegates directly to fileReader.HasState).
func (r *ReuseHarvester) HasState() bool {
	return r.fileReader.HasState()
}

//GetState
// GetState returns this reuse harvester's current file state snapshot.
func (r *ReuseHarvester) GetState() file.State {
	return r.State
}
Expand Down Expand Up @@ -233,7 +234,7 @@ func (m *FileReaderManager) cleanup() {
}
}

//FileHarvester:
// FileHarvester:
type FileHarvester struct {
config config
state file.State
Expand All @@ -258,7 +259,7 @@ type FileHarvester struct {
forwarder chan *ReuseHarvester
}

//newFileHarvester: get file harvester
// newFileHarvester: get file harvester
func newFileHarvester(reuseReader *ReuseHarvester) (*FileHarvester, error) {
r := &FileHarvester{
config: reuseReader.Config,
Expand All @@ -281,7 +282,7 @@ func newFileHarvester(reuseReader *ReuseHarvester) (*FileHarvester, error) {
return r, nil
}

//addForwarder:
// addForwarder:
func (h *FileHarvester) AddForwarder(reuseReader *ReuseHarvester) error {
h.forwardersLock.Lock()
defer h.forwardersLock.Unlock()
Expand Down Expand Up @@ -314,17 +315,17 @@ func (h *FileHarvester) AddForwarder(reuseReader *ReuseHarvester) error {
return nil
}

//HasReuseReader
// HasReuseReader
func (h *FileHarvester) HasReuseReader() bool {
return len(h.forwarders) > 0
}

//HasState
// HasState
func (h *FileHarvester) HasState() bool {
return h.source.HasState()
}

//Run: 从最小的Offset读取一行,并发送给所有匹配的reader
// Run: 从最小的Offset读取一行,并发送给所有匹配的reader
func (h *FileHarvester) Run() {
defer func() {
L:
Expand Down Expand Up @@ -494,7 +495,7 @@ func (h *FileHarvester) Close() {
})
}

//Setup: 打开文件FD,首次执行会直接转到第一个state.offset
// Setup: 打开文件FD,首次执行会直接转到第一个state.offset
func (h *FileHarvester) Setup() error {
err := h.open()
if err != nil {
Expand All @@ -510,7 +511,7 @@ func (h *FileHarvester) Setup() error {
return nil
}

//Close: 关闭FD
// Close: 关闭FD
func (h *FileHarvester) closeFile() {
if h.source != nil {
logp.Info("file harvester is close, file:%s", h.state.Source)
Expand Down Expand Up @@ -629,12 +630,12 @@ func (h *FileHarvester) initFileOffset(file *os.File) (int64, error) {
//
// It creates a chain of readers which looks as following:
//
// limit -> (multiline -> timeout) -> strip_newline -> json -> encode -> line -> log_file
// limit -> (multiline -> timeout) -> strip_newline -> json -> encode -> line -> log_file
//
// Each reader on the left, contains the reader on the right and calls `Next()` to fetch more data.
// At the base of all readers is the log_file reader. That means the data is flowing in the opposite direction:
//
// log_file -> line -> encode -> json -> strip_newline -> (timeout -> multiline) -> limit
// log_file -> line -> encode -> json -> strip_newline -> (timeout -> multiline) -> limit
//
// log_file implements io.Reader interface and encode reader is an adapter for io.Reader to
// reader.Reader also handling file encodings. All other readers implement reader.Reader
Expand All @@ -655,6 +656,8 @@ func (h *FileHarvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

logp.Info("NewLogFileReader: %s, LudicrousMode: %v", h.state.Source, h.config.IsLudicrousModeActivated())

// Configure MaxBytes limit for EncodeReader as multiplied by 4
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
Expand All @@ -665,14 +668,22 @@ func (h *FileHarvester) newLogFileReader() (reader.Reader, error) {
Codec: h.encoding,
BufferSize: h.config.BufferSize,
MaxBytes: encReaderMaxBytes,
BatchMode: h.config.IsLudicrousModeActivated(),
})
if err != nil {
return nil, err
}

if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.ForceCRI, h.config.DockerJSON.CRIFlags)
r = readjson.New(
r,
h.config.DockerJSON.Stream,
h.config.DockerJSON.Partial,
h.config.DockerJSON.ForceCRI,
h.config.DockerJSON.CRIFlags,
h.config.IsLudicrousModeActivated(),
)
}

if h.config.JSON != nil {
Expand Down
29 changes: 29 additions & 0 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const FlagField = "log.flags"
// Output can optionally publish a subset of Meta, or ignore Meta.
type Event struct {
Timestamp time.Time
Texts []string
Meta common.MapStr
Fields common.MapStr
Private interface{} // for beats private use
Expand Down Expand Up @@ -77,3 +78,31 @@ func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
func (e *Event) Delete(key string) error {
return e.Fields.Delete(key)
}

// HasTexts reports whether this event carries a batch of pre-split lines
// in the Texts field.
func (e *Event) HasTexts() bool {
	return len(e.Texts) != 0
}

// Count returns the number of log lines this event represents: the length
// of the batched Texts slice, or 1 for a regular single-line event.
func (e *Event) Count() int {
	if n := len(e.Texts); n > 0 {
		return n
	}
	return 1
}

// GetTexts returns the log lines carried by this event.
//
// If the batch field Texts is populated, it is returned as-is. Otherwise,
// if Fields["data"] holds a string (the regular single-line case), a
// one-element slice wrapping it is returned. In every other case the
// result is nil.
func (e *Event) GetTexts() []string {
	if e.HasTexts() {
		// Batch mode: the lines are already split and available.
		return e.Texts
	}
	if e.Fields == nil {
		return nil
	}
	// Single map lookup with a type assertion; a missing key yields a nil
	// interface value, which fails the assertion exactly like a non-string.
	if text, ok := e.Fields["data"].(string); ok {
		return []string{text}
	}
	return nil
}
15 changes: 15 additions & 0 deletions libbeat/common/streambuf/streambuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,21 @@ func (b *Buffer) IndexFrom(from int, seq []byte) int {
return idx + from + b.mark
}

// LastIndexFrom returns the offset of the last occurrence of seq in the
// unprocessed buffer, considering data from offset from onward.
// Returns -1 if seq can not be found or the buffer is in an error state.
func (b *Buffer) LastIndexFrom(from int, seq []byte) int {
	if b.err != nil {
		return -1
	}

	if idx := bytes.LastIndex(b.data[b.mark+from:], seq); idx >= 0 {
		// Translate the slice-relative hit back to a buffer offset.
		return idx + from + b.mark
	}
	return -1
}

// IndexByte returns offset of byte in unprocessed buffer.
// Returns -1 if byte not in buffer.
func (b *Buffer) IndexByte(byte byte) int {
Expand Down
1 change: 1 addition & 0 deletions libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Config struct {
Codec encoding.Encoding
BufferSize int
MaxBytes int
BatchMode bool
}

// New creates a new Encode reader from input reader by applying
Expand Down
17 changes: 13 additions & 4 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type LineReader struct {
decoder transform.Transformer

skippedByteCount int // number of bytes skipped, when the line is too long
batchMode bool
}

// New creates a new reader object
Expand All @@ -67,6 +68,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
decoder: config.Codec.NewDecoder(),
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
batchMode: config.BatchMode,
}, nil
}

Expand Down Expand Up @@ -161,11 +163,18 @@ func (r *LineReader) Next() ([]byte, int, error) {
return bytes, sz, nil
}

// findInBufferIndex locates the newline sequence to split on. In batch mode
// it returns the LAST occurrence, so one pass can hand back a whole run of
// complete lines; otherwise it returns the first occurrence as usual.
func (r *LineReader) findInBufferIndex(from int, seq []byte) int {
	if !r.batchMode {
		return r.inBuffer.IndexFrom(from, seq)
	}
	return r.inBuffer.LastIndexFrom(from, seq)
}

// Reads from the buffer until a new line character is detected
// Returns an error otherwise
func (r *LineReader) advance() error {
// Initial check if buffer has already a newLine character
idx := r.inBuffer.IndexFrom(r.inOffset, r.nl)
idx := r.findInBufferIndex(r.inOffset, r.nl)

// fill inBuffer until '\n' sequence has been found in input buffer
for idx == -1 {
Expand All @@ -192,7 +201,7 @@ func (r *LineReader) advance() error {
}

// Check if buffer has newLine character
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
idx = r.findInBufferIndex(r.inOffset, r.nl)

// If max bytes limit per line is set, then drop the lines that are longer
if r.maxBytes != 0 {
Expand All @@ -204,7 +213,7 @@ func (r *LineReader) advance() error {
err = r.inBuffer.Advance(skipped)
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
idx = r.findInBufferIndex(r.inOffset, r.nl)
}

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
Expand All @@ -216,7 +225,7 @@ func (r *LineReader) advance() error {
return err
}
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
idx = r.findInBufferIndex(r.inOffset, r.nl)
}
}
}
Expand Down
Loading

0 comments on commit d63e5b8

Please sign in to comment.