Skip to content

Commit

Permalink
Merge pull request #23 from jayjiahua/master
Browse files Browse the repository at this point in the history
feat: 增加旁路检测文件删除并及时释放fd
  • Loading branch information
liuwenping authored Jun 28, 2023
2 parents 9d7030b + 77b7134 commit 4bc1fc2
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 17 deletions.
1 change: 1 addition & 0 deletions filebeat/harvester/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type Source interface {
io.ReadCloser
Name() string
Removed() bool // check if source has been removed
Stat() (os.FileInfo, error)
Continuable() bool // can we continue processing after EOF?
HasState() bool // does this source have a state?
Expand Down
7 changes: 6 additions & 1 deletion filebeat/input/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

package log

import "os"
import (
"os"

"github.com/elastic/beats/libbeat/common/file"
)

// File embeds *os.File and adds the Continuable, HasState and Removed
// methods defined below.
type File struct {
	*os.File
}

// Continuable always reports true for a File.
func (f File) Continuable() bool {
	return true
}

// HasState always reports true for a File.
func (f File) HasState() bool {
	return true
}

// Removed reports whether the embedded file has been removed, as determined
// by file.IsRemoved on the open handle.
func (f File) Removed() bool {
	removed := file.IsRemoved(f.File)
	return removed
}
46 changes: 34 additions & 12 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
// Package log harvests different inputs for new information. Currently
// two harvester types exist:
//
// * log
// * stdin
// - log
//
// The log harvester reads a file line by line. In case the end of a file is found
// with an incomplete line, the line pointer stays at the beginning of the incomplete
// line. As soon as the line is completed, it is read and returned.
// - stdin
//
// The stdin harvester reads data from stdin.
// The log harvester reads a file line by line. In case the end of a file is found
// with an incomplete line, the line pointer stays at the beginning of the incomplete
// line. As soon as the line is completed, it is read and returned.
//
// The stdin harvester reads data from stdin.
package log

import (
Expand Down Expand Up @@ -198,12 +199,33 @@ func (h *Harvester) Run() error {
closeTimeout = time.After(h.config.CloseTimeout)
}

select {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached: %s", source)
// Required when reader loop returns and reader finished
case <-h.done:
removedCheckTick := make(<-chan time.Time)
if h.config.CloseRemoved {
removedCheckTick = time.After(h.config.ScanFrequency)
}

L:
for {
select {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached: %s", source)
break L
// Check whether file is removed
case <-removedCheckTick:
// 通过旁路判断文件是否被删除,避免输出堵塞时未能执行 errorChecks 导致已删文件的 fd 没有被及时释放的问题
if h.reader.fileReader.log.fs.Removed() {
logp.Info("Closing harvester because file was removed: %s", source)
break L
} else {
logp.Debug("harvester", "File was not removed: %s, check again after %v", source, h.config.ScanFrequency)
// update timer
removedCheckTick = time.After(h.config.ScanFrequency)
}
// Required when reader loop returns and reader finished
case <-h.done:
break L
}
}

h.stop()
Expand Down
5 changes: 1 addition & 4 deletions filebeat/input/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ func (f *Log) errorChecks(err error) error {

if f.config.CloseRemoved {
// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
_, statErr := os.Stat(f.fs.Name())

// Error means file does not exist.
if statErr != nil {
if f.fs.Removed() {
return ErrRemoved
}
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/log/stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ func (p Pipe) Name() string { return p.File.Name() }
// Stat returns whatever Stat on the underlying File returns.
func (p Pipe) Stat() (os.FileInfo, error) {
	return p.File.Stat()
}

// Continuable always reports false for a Pipe.
func (p Pipe) Continuable() bool {
	return false
}

// HasState always reports false for a Pipe.
func (p Pipe) HasState() bool {
	return false
}

// Removed always reports false for a Pipe.
func (p Pipe) Removed() bool {
	return false
}
12 changes: 12 additions & 0 deletions libbeat/common/file/file_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build !windows
// +build !windows

package file
Expand Down Expand Up @@ -62,3 +63,14 @@ func ReadOpen(path string) (*os.File, error) {
perm := os.FileMode(0)
return os.OpenFile(path, flag, perm)
}

// IsRemoved reports whether the file held by f has been removed.
//
// The check is made against the open handle rather than the path: the file
// is considered removed once its link count (Nlink) has dropped to zero.
// If Stat on the handle fails, the file is assumed to be removed. If the
// platform-specific stat data is not a *syscall.Stat_t, the link count is
// unknown and the file is reported as not removed.
func IsRemoved(f *os.File) bool {
	info, err := f.Stat()
	if err != nil {
		// If we got an error from a Stat call just assume we are removed.
		return true
	}

	// Use the comma-ok form so an unexpected Sys() value cannot panic the
	// caller; without a link count we cannot claim the file is gone.
	sysStat, ok := info.Sys().(*syscall.Stat_t)
	if !ok {
		return false
	}
	return sysStat.Nlink == 0
}
39 changes: 39 additions & 0 deletions libbeat/common/file/file_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"reflect"
"strconv"
"syscall"
"unsafe"

"golang.org/x/sys/windows"
)

type StateOS struct {
Expand All @@ -31,6 +34,12 @@ type StateOS struct {
Vol uint64 `json:"vol,"`
}

// Handles used by IsRemoved to call into kernel32.dll.
var (
// modkernel32 is the kernel32.dll handle obtained via windows.NewLazySystemDLL.
modkernel32 = windows.NewLazySystemDLL("kernel32.dll")

// procGetFileInformationByHandleEx is the GetFileInformationByHandleEx
// procedure looked up in modkernel32.
procGetFileInformationByHandleEx = modkernel32.NewProc("GetFileInformationByHandleEx")
)

// GetOSState returns the platform specific StateOS
func GetOSState(info os.FileInfo) StateOS {
// os.SameFile must be called to populate the id fields. Otherwise in case for example
Expand Down Expand Up @@ -107,3 +116,33 @@ func ReadOpen(path string) (*os.File, error) {

return os.NewFile(uintptr(handle), path), nil
}

// IsRemoved checks whether the file held by f is removed.
// On Windows, IsRemoved reads the DeletePending flag using GetFileInformationByHandleEx.
// A file is not removed/unlinked as long as at least one process still owns a
// file handle. A deleted file is only marked as deleted, and its file attributes
// can still be read. Only opening a file marked with 'DeletePending' will
// fail.
//
// It returns false if f's handle is invalid, and true if the
// GetFileInformationByHandleEx call itself fails.
func IsRemoved(f *os.File) bool {
hdl := f.Fd()
if hdl == uintptr(syscall.InvalidHandle) {
// No usable handle to query; report "not removed".
return false
}

// Output buffer for the FileStandardInfo query below.
// NOTE(review): field order and sizes presumably mirror the Win32
// FILE_STANDARD_INFO structure — verify against the Windows SDK.
info := struct {
AllocationSize int64
EndOfFile int64
NumberOfLinks int32
DeletePending bool
Directory bool
}{}
infoSz := unsafe.Sizeof(info)

const class = 1 // FileStandardInfo
// Four arguments are passed (handle, info class, buffer, buffer size);
// the two trailing zeros fill the unused Syscall6 slots.
r1, _, _ := syscall.Syscall6(
procGetFileInformationByHandleEx.Addr(), 4, uintptr(hdl), class, uintptr(unsafe.Pointer(&info)), infoSz, 0, 0)
if r1 == 0 {
return true // assume file is removed if syscall errors
}
return info.DeletePending
}

0 comments on commit 4bc1fc2

Please sign in to comment.