[What]Linux 定时器

在用户空间很少用定时器,再来加深一下理解。

  1. socket 选项 SO_RECVTIMEO 和 SO_SNDTIMEO
  2. SIGALRM 信号
  3. I/O 复用系统调用的超时参数

socket 选项 SO_RECVTIMEO 和 SO_SNDTIMEO

SO_RECVTIMEO 和 SO_SNDTIMEO 分别对应设置接收和发送超时。

系统调用 有效选项 系统调用超时后的行为
send SO_SNDTIMEO 返回 -1,errno 的值为 EAGAIN 或 EWOULDBLOCK
sendmsg SO_SNDTIMEO 返回 -1,errno 的值为 EAGAIN 或 EWOULDBLOCK
recv SO_RCVTIMEO 返回 -1,errno 的值为 EAGAIN 或 EWOULDBLOCK
recvmsg SO_RCVTIMEO 返回 -1,errno 的值为 EAGAIN 或 EWOULDBLOCK
accept SO_RCVTIMEO 返回 -1,errno 的值为 EAGAIN 或 EWOULDBLOCK
connect SO_SNDTIMEO 返回 -1,errno 的值为 EINPROGRESS

如下所示为 socket 使用 connect 超时后的效果:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>

int main(int argc, char *argv[])
{

int ret = 0;
if(argc != 3)
{
printf("usage: %s <ip> <port>\n", argv[0]);

return -1;
}

const char *ip = argv[1];
int port = atoi(argv[2]);

struct sockaddr_in address;

memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

int sock_fd = socket(address.sin_family, SOCK_STREAM, 0);
assert(sock_fd > 0);

struct timeval timeout;

timeout.tv_sec = 5;
timeout.tv_usec = 0;
socklen_t len = sizeof(timeout);

ret = setsockopt(sock_fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);
assert(ret == 0);

ret = connect(sock_fd, (struct sockaddr *)&address, sizeof(address));
if(ret == -1)
{
if(errno == EINPROGRESS)
{
printf("connecting timeout\n");
return -1;
}
perror("connect failed:");

return -1;
}

return 0;
}

sigalrm 信号

alarmsetitimer 函数设置的闹钟一旦超时,将会触发 SIGALRM 信号, 为了处理多个定时任务,就需要不断的触发此信号。

基于升序链表的定时器

如果将一个定时器作为一个对象,那么这个对象将可能会包含:

  • 超时时间
  • 超时后执行的任务回调函数
  • 定时器是单次还是循环控制字
  • 函数被执行时需要传入的参数

如下为一个升序定时器链表(定时器超时时间是依次按顺序排列的):

#ifndef __LIST_TIMER__
#define __LIST_TIMER__
#include <time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>

#define BUFFER_SIZE 64

class util_timer;

struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
util_timer *timer;
};

class util_timer
{
public:
util_timer():prev(NULL), next(NULL){}

time_t expire;
void (*cb_func)(client_data* data);
client_data *user_data;
util_timer *prev;
util_timer *next;
};

class sort_timer_list
{
public:
sort_timer_list():head(NULL), tail(NULL){}
~sort_timer_list()
{
util_timer *tmp = head;
while(tmp)
{
head = tmp->next;
delete tmp;
tmp = head;
}
}

void add_timer(util_timer *timer)
{

if(!timer)
{
return;
}
if(!head)
{
head = tail = timer;
return;
}

if(timer->expire < head->expire)
{
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head);
}

void adjust_timer(util_timer *timer)
{

if(!timer)
{
return;
}

util_timer *tmp = timer->next;
if((!tmp) || (timer->expire < tmp->expire))
{
return;
}
if(timer == head)
{
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
}
else
{
timer->prev->next = timer->next;
timer->next->prev = timer->prev;

add_timer(timer, timer->next);
}
}

void del_timer(util_timer *timer)
{

if(!timer)
{
return;
}

if((timer == head) && (timer == tail))
{
delete timer;
head = NULL;
tail = NULL;
return;
}

if(timer == head)
{
head = head->next;
head->prev = NULL;
delete timer;

return;
}

if(timer == tail)
{
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}

timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}

void tick()
{

if(!head)
{
return;
}

printf("timer tick!\n");

time_t cur = time(NULL);
util_timer *tmp = head;

while(tmp)
{
if(cur < tmp->expire)
{
break;
}

tmp->cb_func(tmp->user_data);
head = tmp->next;
if(head)
{
head->prev = NULL;
}
delete tmp;
tmp = head;
}
}
private:
void add_timer(util_timer *timer, util_timer *list_head)
{

util_timer *prev = list_head;
util_timer *tmp = prev->next;

while(tmp)
{
if(timer->expire < tmp->expire)
{
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}

if(!tmp)
{
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}
util_timer *head;
util_timer *tail;
};
#endif

处理非活动连接

服务器通常要定期处理非活动连接,在 Linux 中可以通过开启 KEEPALIVE 选项来定期检查连接是否处于活动状态。

也可以使用定时器来周期性的检查连接:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include "list_timer.h"

#define FD_LIMIT (65535)
#define MAX_EVENT_NUMBER (1024)
#define TIMESLOT (5)

static int pipefd[2];
static sort_timer_list timer_list;
static int epoll_fd = 0;

int setnonblocking(int fd)
{

int old_opt = fcntl(fd, F_GETFL);
int new_opt = old_opt | O_NONBLOCK;
fcntl(fd, F_SETFL, new_opt);

return old_opt;
}
void addfd(int epoll_fd, int fd)
{

struct epoll_event event;

event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);

setnonblocking(fd);
}
void sig_handler(int sig)
{

int save_errno = errno;
int msg = sig;
send(pipefd[1], (char *)&msg, 1, 0);
errno = save_errno;
}
void addsig(int sig)
{

struct sigaction sa;

memset(&sa, 0, sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);

int ret = sigaction(sig, &sa, NULL);
assert(ret != -1);
}
void timer_handler()
{

timer_list.tick();
alarm(TIMESLOT);
}
void cb_func(client_data *user_data)
{

epoll_ctl(epoll_fd, EPOLL_CTL_DEL, user_data->sockfd, 0);

assert(user_data);
close(user_data->sockfd);
printf("close fd %d\n", user_data->sockfd);
}
int main(int argc, char *argv[])
{

if(argc != 2)
{
printf("usage: %s <port>\n", argv[0]);

return -1;
}

int port = atoi(argv[1]);
int ret = 0;
struct sockaddr_in address;

address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(port);

int listenfd = socket(address.sin_family, SOCK_STREAM, 0);
assert(listenfd > 0);

ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret == 0);

ret = listen(listenfd, 5);
assert(ret == 0);

struct epoll_event events[MAX_EVENT_NUMBER];
epoll_fd = epoll_create(5);
assert(epoll_fd > 0);

addfd(epoll_fd, listenfd);

ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
assert(ret != -1);
setnonblocking(pipefd[1]);
addfd(epoll_fd, pipefd[0]);

addsig(SIGALRM);
addsig(SIGTERM);

bool stop_server = false;
client_data *users = new client_data[FD_LIMIT];
bool timeout = false;
alarm(TIMESLOT);

while(!stop_server)
{
int number = epoll_wait(epoll_fd, events, MAX_EVENT_NUMBER, -1);

if((number < 0) && (errno != EINTR))
{
perror("epoll failed:");
break;
}

for(int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
if(sockfd == listenfd)
{
struct sockaddr_in client_addr;
socklen_t client_addrlen = sizeof(client_addr);

int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_addrlen);
printf("client : %s -> %d\n", inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port));

addfd(epoll_fd, connfd);
users[connfd].address = client_addr;
users[connfd].sockfd = connfd;

util_timer *timer = new util_timer;
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
users[connfd].timer = timer;
timer_list.add_timer(timer);
}
else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN))
{
int sig;
char signals[1024];

ret = recv(pipefd[0], signals, sizeof(signals), 0);
if(ret == -1)
{
continue;
}
else if(ret == 0)
{
continue;
}
else
{
for( int i = 0; i < ret; ++i)
{
switch(signals[i])
{
case SIGALRM:
{
timeout = true;
}break;
case SIGTERM:
{
stop_server = true;
}
}
}
}
}
else if(events[i].events & EPOLLIN)
{
memset(users[sockfd].buf, 0, BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
printf("client data: %s\n", users[sockfd].buf);

util_timer *timer = users[sockfd].timer;
if(ret < 0)
{
if(errno != EAGAIN)
{
cb_func(&users[sockfd]);
if(timer)
{
timer_list.del_timer(timer);
}
}
}
else if(ret == 0)
{
cb_func(&users[sockfd]);
if(timer)
{
timer_list.del_timer(timer);
}
}
else
{
if(timer)
{
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
printf("adjust timer once\n");
timer_list.adjust_timer(timer);
}
}
}
else
{

}
}

if(timeout)
{
timer_handler();
timeout = false;
}
}

close(listenfd);
close(pipefd[1]);
close(pipefd[0]);

delete [] users;

return 0;
}

I/O 复用系统调用的超时参数

I/O 复用函数也可以设置超时参数,但由于 I/O 复用完全可能在超时时间到期之前就返回,所以如果要使用它们来定时,就需要不断更新定时参数以反映剩余时间。

其基本思路是:

  1. 在调用 I/O 复用函数前获取当前时间
  2. 在 I/O 复用函数返回后,得到当前时间
  3. 根据二者时间的差值来判断超时时间是否达到,如果达到则进行对应的处理,如果没有到则继续进行剩余时间的 I/O 复用。
Last Updated 2019-12-22 日 22:10.
Render by hexo-renderer-org with Emacs 26.3 (Org mode 9.1.14)