Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
cheng-zhongliang committed Oct 15, 2023
1 parent f509922 commit 56ccc6b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 86 deletions.
14 changes: 7 additions & 7 deletions epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,26 @@ type fdEvent struct {
evs uint32
}

type poller struct {
type poll struct {
fd int
fdEvents map[int]*fdEvent
events []syscall.EpollEvent
}

func newPoller() (*poller, error) {
func openPoll() (*poll, error) {
fd, err := syscall.EpollCreate1(0)
if err != nil {
return nil, err
}

return &poller{
return &poll{
fd: fd,
fdEvents: make(map[int]*fdEvent),
events: make([]syscall.EpollEvent, initialNEvent),
}, nil
}

func (ep *poller) add(ev *Event) error {
func (ep *poll) add(ev *Event) error {
op := syscall.EPOLL_CTL_ADD
es, ok := ep.fdEvents[ev.fd]
if ok {
Expand Down Expand Up @@ -82,7 +82,7 @@ func (ep *poller) add(ev *Event) error {
return syscall.EpollCtl(ep.fd, op, ev.fd, &epEv)
}

func (ep *poller) del(ev *Event) error {
func (ep *poll) del(ev *Event) error {
es := ep.fdEvents[ev.fd]

if ev.events&EvRead != 0 {
Expand Down Expand Up @@ -114,7 +114,7 @@ func (ep *poller) del(ev *Event) error {
return syscall.EpollCtl(ep.fd, op, ev.fd, &epEv)
}

func (ep *poller) polling(cb func(ev *Event, res uint32), timeout time.Duration) error {
func (ep *poll) wait(cb func(ev *Event, res uint32), timeout time.Duration) error {
n, err := syscall.EpollWait(ep.fd, ep.events, int(timeout.Milliseconds()))
if err != nil && !temporaryErr(err) {
return err
Expand Down Expand Up @@ -151,6 +151,6 @@ func (ep *poller) polling(cb func(ev *Event, res uint32), timeout time.Duration)
return nil
}

func (ep *poller) close() error {
func (ep *poll) close() error {
return syscall.Close(ep.fd)
}
91 changes: 31 additions & 60 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ const (
// EvLoopNoblock is the flag to control event base loop not block.
EvLoopNoblock = 002

// evListInserted is the flag to indicate the event is in the event list.
evListInserted = 0x01
// evListActive is the flag to indicate the event is in the active event list.
evListActive = 0x02
// evListTimeout is the flag to indicate the event is in the timeout event heap.
evListTimeout = 0x04

// HPri is the high priority.
HPri eventPriority = 0b00
// MPri is the middle priority.
MPri eventPriority = 0b01
// LPri is the low priority.
LPri eventPriority = 0b10

// evListInserted is the flag to indicate the event is in the event list.
evListInserted = 0x01
// evListActive is the flag to indicate the event is in the active event list.
evListActive = 0x02
// evListTimeout is the flag to indicate the event is in the timeout event heap.
evListTimeout = 0x04
)

// eventPriority is the priority of the event.
Expand All @@ -48,34 +48,28 @@ type eventPriority uint8
type Event struct {
// base is the event base of the event.
base *EventBase

// ele is the element in the total event list.
ele *element[*Event]
ele element[*Event]
// activeEle is the element in the active event list.
activeEle *element[*Event]
activeEle element[*Event]
// index is the index in the event heap.
index int

// fd is the file descriptor to watch.
fd int
// events is the events to watch. Such as EvRead or EvWrite.
events uint32

// cb is the callback function when the event is triggered.
cb func(fd int, events uint32, arg any)
// arg is the argument passed to the callback function.
arg any

// res is the result passed to the callback function.
res uint32
// flags is the status of the event in the event list.
flags int

// timeout is the timeout of the event.
timeout time.Duration
// deadline is the deadline for the event.
deadline time.Time

// priority is the priority of the event.
priority eventPriority
}
Expand All @@ -100,8 +94,8 @@ func (ev *Event) Assign(base *EventBase, fd int, events uint32, callback func(fd
ev.flags = 0
ev.timeout = 0
ev.deadline = time.Time{}
ev.ele = nil
ev.activeEle = nil
ev.ele = element[*Event]{}
ev.activeEle = element[*Event]{}
ev.index = -1
}

Expand All @@ -112,13 +106,10 @@ func (ev *Event) Attach(timeout time.Duration) error {
if ev.events&(EvRead|EvWrite|EvTimeout) == 0 {
return ErrEventInvalid
}

if ev.flags&evListInserted != 0 {
return ErrEventExists
}

ev.timeout = timeout

return ev.base.addEvent(ev)
}

Expand All @@ -128,7 +119,6 @@ func (ev *Event) Detach() error {
if ev.flags&evListInserted == 0 {
return ErrEventNotExists
}

return ev.base.delEvent(ev)
}

Expand Down Expand Up @@ -164,8 +154,8 @@ func (ev *Event) SetPriority(priority eventPriority) {

// EventBase is the base of all events.
type EventBase struct {
// poller is the event poller to watch events.
poller *poller
// poll is the event poller to watch events.
poll *poll
// evList is the list of all events.
evList *list[*Event]
// activeEvList is the list of active events.
Expand All @@ -178,38 +168,32 @@ type EventBase struct {

// NewBase creates a new event base.
func NewBase() (*EventBase, error) {
poller, err := newPoller()
bs := new(EventBase)
p, err := openPoll()
if err != nil {
return nil, err
}

return &EventBase{
poller: poller,
evList: newList[*Event](),
activeEvLists: []*list[*Event]{newList[*Event](), newList[*Event](), newList[*Event]()},
evHeap: new(eventHeap),
nowTimeCache: time.Time{},
}, nil
bs.poll = p
bs.evList = newList[*Event]()
bs.activeEvLists = []*list[*Event]{newList[*Event](), newList[*Event](), newList[*Event]()}
bs.evHeap = new(eventHeap)
bs.nowTimeCache = time.Time{}
return bs, nil
}

// Loop loops events.
// If flags is EvLoopOnce, it will block until an event is triggered. Then it will exit.
// If flags is EvLoopNoblock, it will not block.
func (bs *EventBase) Loop(flags int) error {
bs.clearTimeCache()

for {
err := bs.poller.polling(bs.onActive, bs.waitTime(flags&EvLoopNoblock != 0))
err := bs.poll.wait(bs.onActive, bs.waitTime(flags&EvLoopNoblock != 0))
if err != nil {
return err
}

bs.updateTimeCache()

bs.onTimeout()

bs.handleActiveEvents()

if flags&EvLoopOnce != 0 {
return nil
}
Expand All @@ -222,9 +206,9 @@ func (bs *EventBase) Dispatch() error {
return bs.Loop(0)
}

// Shutdown breaks event loop and close the poller.
// Shutdown breaks event loop and close the poll.
func (bs *EventBase) Shutdown() error {
return bs.poller.close()
return bs.poll.close()
}

// addEvent adds an event to the event base.
Expand All @@ -233,28 +217,21 @@ func (bs *EventBase) addEvent(ev *Event) error {
ev.deadline = bs.now().Add(ev.timeout)
bs.eventQueueInsert(ev, evListTimeout)
}

bs.eventQueueInsert(ev, evListInserted)

if ev.events&(EvRead|EvWrite) != 0 {
return bs.poller.add(ev)
return bs.poll.add(ev)
}

return nil
}

// delEvent deletes an event from the event base.
func (bs *EventBase) delEvent(ev *Event) error {
bs.eventQueueRemove(ev, evListTimeout)

bs.eventQueueRemove(ev, evListActive)

bs.eventQueueRemove(ev, evListInserted)

if ev.events&(EvRead|EvWrite) != 0 {
return bs.poller.del(ev)
return bs.poll.del(ev)
}

return nil
}

Expand All @@ -279,9 +256,7 @@ func (bs *EventBase) onTimeout() {
if ev.deadline.After(now) {
break
}

bs.eventQueueRemove(ev, evListTimeout)

bs.onActive(ev, EvTimeout)
}
}
Expand All @@ -291,7 +266,6 @@ func (bs *EventBase) onActive(ev *Event, res uint32) {
ev.res |= res
return
}

ev.res = res
bs.eventQueueInsert(ev, evListActive)
}
Expand All @@ -312,7 +286,6 @@ func (bs *EventBase) handleActiveEvents() {
bs.delEvent(ev)
}
e = next

ev.cb(ev.fd, ev.res, ev.arg)
}
}
Expand All @@ -322,29 +295,27 @@ func (bs *EventBase) eventQueueInsert(ev *Event, which int) {
if ev.flags&which != 0 {
return
}

ev.flags |= which
switch which {
case evListInserted:
ev.ele = bs.evList.pushBack(ev)
bs.evList.pushBack(ev, &ev.ele)
case evListActive:
ev.activeEle = bs.activeEvLists[ev.priority].pushBack(ev)
bs.activeEvLists[ev.priority].pushBack(ev, &ev.activeEle)
case evListTimeout:
ev.index = bs.evHeap.pushEvent(ev)
bs.evHeap.pushEvent(ev)
}
}

func (bs *EventBase) eventQueueRemove(ev *Event, which int) {
if ev.flags&which == 0 {
return
}

ev.flags &^= which
switch which {
case evListInserted:
bs.evList.remove(ev.ele)
bs.evList.remove(&ev.ele)
case evListActive:
bs.activeEvLists[ev.priority].remove(ev.activeEle)
bs.activeEvLists[ev.priority].remove(&ev.activeEle)
case evListTimeout:
bs.evHeap.removeEvent(ev.index)
}
Expand Down
31 changes: 14 additions & 17 deletions kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,26 @@ const (
maxNEvent = 0x1000
)

type poller struct {
type poll struct {
fd int
changes []syscall.Kevent_t
events []syscall.Kevent_t
}

func newPoller() (*poller, error) {
func openPoll() (*poll, error) {
fd, err := syscall.Kqueue()
if err != nil {
return nil, err
}

return &poller{
return &poll{
fd: fd,
changes: make([]syscall.Kevent_t, initialNEvent),
events: make([]syscall.Kevent_t, initialNEvent),
}, nil
}

func (kq *poller) add(ev *Event) error {
func (kq *poll) add(ev *Event) error {
ET := uint16(0)
if ev.events&EvET != 0 {
ET = syscall.EV_CLEAR
Expand All @@ -61,7 +61,7 @@ func (kq *poller) add(ev *Event) error {
return nil
}

func (kq *poller) del(ev *Event) error {
func (kq *poll) del(ev *Event) error {
if ev.events&EvRead != 0 {
kq.changes = append(kq.changes, syscall.Kevent_t{
Ident: uint64(ev.fd),
Expand All @@ -79,8 +79,14 @@ func (kq *poller) del(ev *Event) error {
return nil
}

func (kq *poller) polling(cb func(ev *Event, res uint32), timeout time.Duration) error {
n, err := syscall.Kevent(kq.fd, kq.changes, kq.events, timespec(timeout))
func (kq *poll) wait(cb func(ev *Event, res uint32), timeout time.Duration) error {
var timespec *syscall.Timespec = nil
if timeout >= 0 {
ts := syscall.NsecToTimespec(timeout.Nanoseconds())
timespec = &ts
}

n, err := syscall.Kevent(kq.fd, kq.changes, kq.events, timespec)
if err != nil && !temporaryErr(err) {
return err
}
Expand Down Expand Up @@ -117,15 +123,6 @@ func (kq *poller) polling(cb func(ev *Event, res uint32), timeout time.Duration)
return nil
}

func (kq *poller) close() error {
func (kq *poll) close() error {
return syscall.Close(kq.fd)
}

func timespec(d time.Duration) *syscall.Timespec {
if d >= 0 {
ts := syscall.NsecToTimespec(d.Nanoseconds())
return &ts
} else {
return nil
}
}
Loading

0 comments on commit 56ccc6b

Please sign in to comment.