-
Notifications
You must be signed in to change notification settings - Fork 165
/
polling.go
202 lines (179 loc) · 4.62 KB
/
polling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package zmq4
/*
#include <zmq.h>
#include "zmq4.h"
*/
import "C"
import (
"fmt"
"time"
)
// Polled is the element type of the lists returned by (*Poller)Poll and
// (*Poller)PollAll.
type Polled struct {
	// Socket is the socket this entry reports on.
	Socket *Socket
	// Events holds the event(s) reported for Socket by the poll call.
	// In the result of (*Poller)PollAll it can be zero.
	Events State
}
// Poller holds a set of sockets together with the events each one is
// polled for.
//
// items and socks are parallel slices: items[i] is the poll item that
// belongs to socks[i].
type Poller struct {
	items []C.zmq_pollitem_t // poll items passed to zmq4_poll
	socks []*Socket          // sockets, index-aligned with items
}
// NewPoller creates a new Poller that has no sockets registered yet.
func NewPoller() *Poller {
	p := new(Poller)
	p.items = []C.zmq_pollitem_t{}
	p.socks = []*Socket{}
	return p
}
// Add registers a socket with the poller.
//
// events is a bitwise OR of zmq.POLLIN and zmq.POLLOUT.
//
// The return value is the id of the new item, which is its index in the
// poller. It can be used as a handle to (*Poller)Update and as an index
// into the result of (*Poller)PollAll.
func (p *Poller) Add(soc *Socket, events State) int {
	// The new item goes at the end, so its id is the current length.
	id := len(p.items)
	p.items = append(p.items, C.zmq_pollitem_t{
		socket: soc.soc,
		fd:     0,
		events: C.short(events),
	})
	p.socks = append(p.socks, soc)
	return id
}
// Update replaces the events mask of the item with the given id.
//
// The poller's bitmask for that id is overwritten with the events
// parameter.
//
// Returns the previous mask, or ErrorNoSocket if the id was out of range.
func (p *Poller) Update(id int, events State) (previous State, err error) {
	if id < 0 || id >= len(p.items) {
		return 0, ErrorNoSocket
	}
	item := &p.items[id]
	previous = State(item.events)
	item.events = C.short(events)
	return previous, nil
}
// UpdateBySocket replaces the events mask of a socket in the poller.
//
// The poller's bitmask for the first item that holds the given socket is
// overwritten with the events parameter.
//
// Returns the previous mask, or ErrorNoSocket if the socket didn't match
// any item.
func (p *Poller) UpdateBySocket(soc *Socket, events State) (previous State, err error) {
	for i := range p.socks {
		if p.socks[i] != soc {
			continue
		}
		item := &p.items[i]
		old := State(item.events)
		item.events = C.short(events)
		return old, nil
	}
	return 0, ErrorNoSocket
}
// Remove removes the item with the given id from the poller.
//
// Items that come after the removed one move down by one position, so
// the ids previously returned for them by (*Poller)Add each decrease by
// one.
//
// Returns ErrorNoSocket if the id was out of range.
func (p *Poller) Remove(id int) error {
	if id < 0 || id >= len(p.items) {
		return ErrorNoSocket
	}
	last := len(p.items) - 1

	// Shift the tail down over the removed entry. When id is the last
	// index the source slices are empty and copy does nothing.
	copy(p.items[id:], p.items[id+1:])
	copy(p.socks[id:], p.socks[id+1:])

	// Zero the vacated slot before shrinking, so the backing arrays do not
	// keep a stale *Socket (or its poll item) reachable after removal.
	p.items[last] = C.zmq_pollitem_t{}
	p.socks[last] = nil

	p.items = p.items[:last]
	p.socks = p.socks[:last]
	return nil
}
// RemoveBySocket removes the first item that holds the given socket from
// the poller, by handing its id to (*Poller)Remove.
//
// Returns ErrorNoSocket if the socket didn't match any item.
func (p *Poller) RemoveBySocket(soc *Socket) error {
	for i := range p.socks {
		if p.socks[i] != soc {
			continue
		}
		return p.Remove(i)
	}
	return ErrorNoSocket
}
// Poll performs input/output multiplexing over the sockets in the poller.
//
// If timeout < 0, wait forever until a matching event is detected.
//
// Only sockets with matching socket events are returned in the list.
//
// Example:
//
//	poller := zmq.NewPoller()
//	poller.Add(socket0, zmq.POLLIN)
//	poller.Add(socket1, zmq.POLLIN)
//	// Process messages from both sockets
//	for {
//	    sockets, _ := poller.Poll(-1)
//	    for _, socket := range sockets {
//	        switch s := socket.Socket; s {
//	        case socket0:
//	            msg, _ := s.Recv(0)
//	            // Process msg
//	        case socket1:
//	            msg, _ := s.Recv(0)
//	            // Process msg
//	        }
//	    }
//	}
func (p *Poller) Poll(timeout time.Duration) ([]Polled, error) {
	// Only report the sockets that had a matching event.
	const allSockets = false
	return p.poll(timeout, allSockets)
}
// PollAll is like (*Poller)Poll, but it returns a list of all sockets,
// in the same order as they were added to the poller, not just those
// sockets that had an event.
//
// For each socket in the list, you have to check the Events field to see
// if there was actually an event.
//
// When error is not nil, the return list contains no sockets.
func (p *Poller) PollAll(timeout time.Duration) ([]Polled, error) {
	// Report every registered socket, with or without an event.
	const allSockets = true
	return p.poll(timeout, allSockets)
}
// poll is the shared implementation of (*Poller)Poll and (*Poller)PollAll.
//
// timeout is converted to whole milliseconds before it is handed to
// zmq4_poll. Any negative timeout is passed as -1. A positive timeout
// shorter than one millisecond is raised to one millisecond, so that
// integer division does not collapse it to 0, which zmq_poll treats as
// "return immediately".
//
// When all is true the result has one entry per registered socket, in the
// order the poller holds them. Otherwise it only has entries for sockets
// whose returned events overlap the events they were registered for.
//
// A poller without sockets returns an empty list and a nil error without
// polling. If any registered socket is not open, ErrorSocketClosed is
// returned. On any error the returned list is empty.
func (p *Poller) poll(timeout time.Duration, all bool) ([]Polled, error) {
	lst := make([]Polled, 0, len(p.items))
	if len(p.items) == 0 {
		return lst, nil
	}

	var ctx *Context
	for _, soc := range p.socks {
		if !soc.opened {
			return lst, ErrorSocketClosed
		}
		// assume all sockets have the same context
		ctx = soc.ctx
	}

	// Timeout in milliseconds, as passed to zmq4_poll.
	var ms int64
	switch {
	case timeout < 0:
		ms = -1
	case timeout > 0:
		ms = int64(timeout / time.Millisecond)
		if ms == 0 {
			// Sub-millisecond wait: keep it a blocking poll instead of
			// silently turning it into a non-blocking one.
			ms = 1
		}
	}

	var rv C.int
	var err error
	// Repeat the call for as long as it fails and ctx.retry reports that
	// the error should be retried.
	for {
		rv, err = C.zmq4_poll(&p.items[0], C.int(len(p.items)), C.long(ms))
		if rv >= 0 || ctx == nil || !ctx.retry(err) {
			break
		}
	}
	if rv < 0 {
		return lst, errget(err)
	}

	for i, it := range p.items {
		if all || it.events&it.revents != 0 {
			lst = append(lst, Polled{p.socks[i], State(it.revents)})
		}
	}
	return lst, nil
}
// String returns the poller as a string: the word "Poller" followed by
// the list of its items, where each item is its socket formatted
// together with the events it is registered for.
func (p *Poller) String() string {
	parts := make([]string, len(p.items))
	for i := range p.items {
		parts[i] = fmt.Sprintf("%v%v", p.socks[i], State(p.items[i].events))
	}
	return fmt.Sprint("Poller", parts)
}