Skip to content

Commit

Permalink
feat: 单行长日志截断上报
Browse files Browse the repository at this point in the history
  • Loading branch information
yiqiwang-17 committed Dec 18, 2024
1 parent 65941d5 commit b10c073
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 21 deletions.
2 changes: 1 addition & 1 deletion filebeat/input/log/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (h *FileHarvester) newLogFileReader() (reader.Reader, error) {
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := h.config.MaxBytes * 4
encReaderMaxBytes := h.config.MaxBytes

r, err = readfile.NewEncodeReader(reader, readfile.Config{
Codec: h.encoding,
Expand Down
64 changes: 44 additions & 20 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
// This loop is need in case advance detects an line ending which turns out
// This loop is need in case advance detects a line ending which turns out
// not to be one when decoded. If that is the case, reading continues.
for {
// read next 'potential' line from input buffer/reader
Expand All @@ -97,7 +97,7 @@ func (r *LineReader) Next() ([]byte, int, error) {
end -= len(r.nl)
}

sz, err := r.decode(end)
sz, err := r.decode(end, false)
if err != nil {
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
Expand Down Expand Up @@ -176,6 +176,22 @@ func (r *LineReader) advance() error {
// Initial check if buffer has already a newLine character
idx := r.findInBufferIndex(r.inOffset, r.nl)

if r.maxBytes != 0 {
// 如果已找到最后一个换行符索引位置,且超出最大限制,则找到第一行单独处理
if idx != -1 && idx > r.maxBytes {
firstIdx := r.inBuffer.IndexFrom(r.inOffset, r.nl)
if firstIdx-r.maxBytes > 0 {
r.decode(r.maxBytes, true)
} else {
r.decode(firstIdx+len(r.nl), false)
}
err := r.inBuffer.Advance(min(firstIdx+len(r.nl), r.maxBytes))
r.inBuffer.Reset()
r.inOffset = 0
return err
}
}

// fill inBuffer until '\n' sequence has been found in input buffer
for idx == -1 {
// increase search offset to reduce iterations on buffer when looping
Expand Down Expand Up @@ -203,36 +219,41 @@ func (r *LineReader) advance() error {
// Check if buffer has newLine character
idx = r.findInBufferIndex(r.inOffset, r.nl)

// If max bytes limit per line is set, then drop the lines that are longer
// 超出最大限制的长日志处理
if r.maxBytes != 0 {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
skipped := idx + len(r.nl)
r.skippedByteCount += skipped
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
err = r.inBuffer.Advance(skipped)
// 如果已找到最后一个换行符索引位置,且超出最大限制,则找到第一行单独处理
if idx != -1 && idx > r.maxBytes {
firstIdx := r.inBuffer.IndexFrom(r.inOffset, r.nl)
if firstIdx-r.maxBytes > 0 {
r.decode(r.maxBytes, true)
} else {
r.decode(firstIdx+len(r.nl), false)
}
err := r.inBuffer.Advance(min(firstIdx+len(r.nl), r.maxBytes))
r.inBuffer.Reset()
r.inOffset = 0
idx = r.findInBufferIndex(r.inOffset, r.nl)
return err
}

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
// 如果未找到最后一个换行符索引位置,且超出最大限制,则分批上报,先处理最大限制字节数
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
r.skippedByteCount += skipped
idx = r.maxBytes
sz, err := r.decode(idx, true)
if err != nil {
logp.Err("Error skipping until new line, err: %v", err)
return err
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = idx
}
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
idx = r.findInBufferIndex(r.inOffset, r.nl)
err = r.inBuffer.Advance(sz)
r.inBuffer.Reset()
r.inOffset = 0
return err
}
}
}

// found encoded byte sequence for '\n' in buffer
// -> decode input sequence into outBuffer
sz, err := r.decode(idx + len(r.nl))
sz, err := r.decode(idx+len(r.nl), false)
if err != nil {
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
Expand Down Expand Up @@ -296,7 +317,7 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
return skipped, nil
}

func (r *LineReader) decode(end int) (int, error) {
func (r *LineReader) decode(end int, addNl bool) (int, error) {
var err error
buffer := make([]byte, 1024)
inBytes := r.inBuffer.Bytes()
Expand All @@ -323,5 +344,8 @@ func (r *LineReader) decode(end int) (int, error) {
}

r.byteCount += start
if addNl {
r.outBuffer.Write(r.nl)
}
return start, err
}

0 comments on commit b10c073

Please sign in to comment.