Skip to content

Commit

Permalink
Optimize IO handling by draining FDs on every wake-up.
Browse files Browse the repository at this point in the history
A single IO wake-up can correspond to multiple actual IO events/waiting IO.
Currently, after handling a single event we go back to waiting on the FD, where we will
be immediatly woke again because of the already waiting IO.

This increases context switches and can increase latency.

By handling all the IO possible on every wakeup before waiting again we can reduce both of these.
  • Loading branch information
rrb3942 committed Nov 17, 2022
1 parent 950a6b6 commit d9e3fbe
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 42 deletions.
37 changes: 20 additions & 17 deletions ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,26 +278,29 @@ void ipc_handle_job(int fd)
ipc_job job;
int n;

/* read one IPC job from the pipe; even if the read is blocking,
* we are here triggered from the reactor, on a READ event, so
* we shouldn;t ever block */
n = read(fd, &job, sizeof(job) );
if (n==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
//Process all jobs until handle is drained
while (1) {
/* read one IPC job from the pipe; even if the read is blocking,
* we are here triggered from the reactor, on a READ event, so
* we shouldn;t ever block */
n = read(fd, &job, sizeof(job) );
if (n==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
}
}

LM_DBG("received job type %d[%s] from process %d\n",
job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);
LM_DBG("received job type %d[%s] from process %d\n",
job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);

/* custom handling for RPC type */
if (job.handler_type==ipc_rpc_type) {
((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
} else {
/* generic registered type */
ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
/* custom handling for RPC type */
if (job.handler_type==ipc_rpc_type) {
((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
} else {
/* generic registered type */
ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
}
}

return;
Expand Down
11 changes: 9 additions & 2 deletions net/net_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,11 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)

switch(fm->type){
case F_UDP_READ:
n = protos[((struct socket_info*)fm->data)->proto].net.
read( fm->data /*si*/, &read);
do {
n = protos[((struct socket_info*)fm->data)->proto].net.
read( fm->data /*si*/, &read);
//Continue reading packets until we get an error
} while (n == 0);
break;
case F_TIMER_JOB:
handle_timer_job();
Expand Down Expand Up @@ -327,6 +330,10 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
post_run_handle_script_reload();

pt_become_idle();

if (n == 1) {
n = 0;
}
return n;
}

Expand Down
6 changes: 3 additions & 3 deletions net/proto_udp/proto_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ static int udp_read_req(struct socket_info *si, int* bytes_read)
/* coverity[overrun-buffer-arg: FALSE] - union has 28 bytes, CID #200029 */
len=recvfrom(bind_address->socket, buf, BUF_SIZE,0,&ri.src_su.s,&fromlen);
if (len==-1){
if (errno==EAGAIN)
return 0;
if ((errno==EINTR)||(errno==EWOULDBLOCK)|| (errno==ECONNREFUSED))
if (errno==EAGAIN || errno==EWOULDBLOCK || errno==EINTR)
return 1;
if (errno==ECONNREFUSED)
return -1;
LM_ERR("recvfrom:[%d] %s\n", errno, strerror(errno));
return -2;
Expand Down
43 changes: 23 additions & 20 deletions timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -840,32 +840,35 @@ void handle_timer_job(void)
struct os_timer *t;
ssize_t l;

/* read one "os_timer" pointer from the pipe (non-blocking) */
l = read( timer_fd_out, &t, sizeof(t) );
if (l==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
/* Read events until epipe is empty */
while(1) {
/* read one "os_timer" pointer from the pipe (non-blocking) */
l = read( timer_fd_out, &t, sizeof(t) );
if (l==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
}
}

/* run the handler */
if (t->flags&TIMER_FLAG_IS_UTIMER) {
/* run the handler */
if (t->flags&TIMER_FLAG_IS_UTIMER) {

if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
LM_WARN("utimer job <%s> has a %lld us delay in execution\n",
t->label, *ijiffies-t->trigger_time);
t->u.utimer_f( t->time , t->t_param);
t->trigger_time = 0;
if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
LM_WARN("utimer job <%s> has a %lld us delay in execution\n",
t->label, *ijiffies-t->trigger_time);
t->u.utimer_f( t->time , t->t_param);
t->trigger_time = 0;

} else {
} else {

if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
LM_WARN("timer job <%s> has a %lld us delay in execution\n",
t->label, *ijiffies-t->trigger_time);
t->u.timer_f( (unsigned int)t->time , t->t_param);
t->trigger_time = 0;
if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
LM_WARN("timer job <%s> has a %lld us delay in execution\n",
t->label, *ijiffies-t->trigger_time);
t->u.timer_f( (unsigned int)t->time , t->t_param);
t->trigger_time = 0;

}
}

return;
Expand Down

0 comments on commit d9e3fbe

Please sign in to comment.