[What]Linux I/O 异步

参考链接: I/O模式详解

参考宋宝华老师的书 Linux设备驱动开发详解 ,来理解I/O的异步概念。

异步通知最大的特点就是: 当应用程序向内核发送请求,内核资源满足要求后, 由内核主动向应用程序发送通知 ,类似于中断。

除了异步通知外,应用还可以在发起I/O请求后立即返回。之后再查询I/O完成情况,或者I/O完成后被调回,这个过程叫做异步I/O。

异步通知编程

注意 : 当一个驱动代码应用于多个设备时,也就是同一份驱动对应的多个设备发出异步通知的情况下应用程序端无法区分是哪个设备发出了消息。

  • 这种情况下还是只有使用Poll和select机制。

Linux 中可用的信号如下表:

信号 含义
SIGHUP 1 挂起
SIGINT 2 终端中断(Ctrl + C 发出)
SIGQUIT 3 终端退出
SIGILL 4 无效命令
SIGTRAP 5 跟踪陷进
SIGIOT 6 IOT陷阱
SIGBUS 7 BUS错误
SIGFPE 8 浮点异常
SIGKILL 9 强行终止(不能被捕获或忽略)
SIGUSR1 10 用户定义信号1
SIGSEGV 11 无效的内存段处理
SIGUSR2 12 用户定义信号2
SIGPIPE 13 半关闭管道的写操作已经发生
SIGALRM 14 计时器到期
SIGTERM 15 终止
SIGSTKFLT 16 堆栈错误
SIGCHLD 17 子进程已经停止或退出
SIGCONT 18 如果停止了,继续执行
SIGSTOP 19 停止执行(不能被捕获或忽略)
SIGSTP 20 终端停止信号
SIGTTIN 21 后台进程需要从终端读取输入
SIGTTOU 22 后台进程需要从终端写出
SIGURG 23 紧急的套接字时间
SIGXCPU 24 超额使用CPU分配时间
SIGXFSZ 25 文件尺寸超额
SIGVTALRM 26 虚拟时钟信号
SIGPROF 27 时钟信号描述
SIGWINCH 28 串口尺寸变化
SIGIO 29 I/O
SIGPWR 30 断点重启

一个信号被捕获的意思是当一个信号到达时,有相应的代码处理它。如果一个信号没有被这个进程所捕获,内核将采用默认行为处理。

信号的接收

void (*signal (int signum, void (*handler))(int))) (int);
//等价于
typedef void (*sighandler_t)(int);
/**
* @brief 绑定信号处理函数
* @param signum: 需要接收的信号值
* @param hander: 对应的处理函数
* 若为 SIG_IGN 则表示忽略信号
* 若为 SIG_DFL 则表示采用系统默认方式处理
* @ret 返回最后一次为信号 signum绑定的处理函数的handler值
* 失败则返回 SIG_ERR
*/

sighandler_t signal(int signum, sighandler_t handler));

/**
* @brief 改变进程接收到特定信号后的行为
* @param act : 新动作,为NULL代表以缺省方式处理
* @param oldact: 老动作
* @note 如果 act 和 oldact 都为NULL,那么可以用于检查信号有效性
*/

int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);

捕捉 SIGINT 信号和 SIGTERM 信号:

void sigterm_handler(int signo)
{

printf("Have caught sig N.O. %d\n", signo);

exit(0);
}
int main(void)
{

signal(SIGINT, sigterm_handler);
signal(SIGTERM,sigterm_handler);
while(1);

return 0;
}

示例:接收标准输入信号并读取内容

#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>

#define MAX_LEN 100

void input_handler(int num)
{

char data[MAX_LEN];
int len;

len = read(STDIN_FILENO, &data, MAX_LEN);
data[len] = 0;
printf("input available:%s\n", data);
}
int main()
{

int oflags;

//1. 绑定处理函数
signal(SIGIO, input_handler);
//2. 此进程为STDIN_FILENO 文件拥有者
fcntl(STDIN_FILENO, F_SETOWN, getpid());
//3. 启动异步机制
oflags = fcntl(STDIN_FILENO, F_GETFL);
fcntl(STDIN_FILENO, F_SETFL, oflags | FASYNC);

while(1);
}

信号的释放

为了使设备支持异步通知机制,驱动程序中涉及3项工作:

  1. 支持 F_SETTOWN 命令,能在这个控制命令处理中设置 filp->f_owner 为对应进程ID。
    • 此项工作已由内核完成,驱动无须处理。
  2. 支持 F_SETFL 命令的处理,每当 FASYNC 标志改变时,驱动中的 fasync() 函数将得以执行。
  3. 在设备资源可获得时,调用 kill_fasync() 函数激发相应信号。
struct fasync_struct {
spinlock_t fa_lock;
int magic;
int fa_fd;
struct fasync_struct *fa_next; /* singly linked list */
struct file *fa_file;
struct rcu_head fa_rcu;
};

/**
* @brief 处理标志变更
*/

int fasync_helper(int fd, strut file *filp, int mode, struct fasync_struct **fa);

/**
* @brief 释放信号
*/

void kill_fasync(struct fasync_sturct **fa, int sig, int band);

模板:

static int xxx_fasync(int fd, struct file *filp, int mode)
{

struct xxx_dev *dev = filp->private_data;

return fasync_helper(fd, filp, mode, &dev->async_queue);
}

static ssize_t xxx_write(struct file *filp, const char __user *buf, size_t count,
loff_t *f_ops)

{

struct xxx_dev *dev = filp->private_data;

//产生异步读信号
if(dev->async_queue)
kill_fasync(&dev->async_queue, SIGIO, POLL_IN);
}

static int xxx_release(struct inode *inode, struct file *filp)
{

//将文件从异步通知列表中删除
xxx_fasync(-1, filp, 0);

return 0;
}

异步I/O

应用程序发起I/O动作后,直接开始执行,并不等待I/O结束,要么过一段时间来查询之前的I/O请求完成情况,要么I/O请求完成后自动被调用回调函数。 与异步通知不同的是,异步I/O是内核或库线程主动完成了数据搬移操作,异步通知是被通知后应用程序还要主动去数据搬移。

Linux的AIO有多种实现,其中一种是在用户空间的 glibc 库中实现, 本质上是借用了多线程模型,用开启新线程以同步的方法来做I/O,新的AIO辅助线程与发起AIO的线程以 pthread_cond_signal() 的形式进行线程间同步。

glibc

相关操作函数:

/**
* @brief 请求对一个有效文件描述符进行异步读操作
* @param aiocbp: 包含传输信息,用户空间缓冲区
*/

int aio_read(struct aiocb *aiocbp);

/**
* @brief 请求对一个有效文件描述符进行异步写操作
* @param aiocbp: 包含传输信息,用户空间缓冲区
*/

int aio_write(struct aiocb *aiocbp);

/**
* @brief 确定请求的状态
* @ret EINPROGRESS: 请求尚未完成
* ECANCELED: 请求被应用程序取消了
* -1 发生了错误
*/

int aio_error(struct aiocb *aiocbp);

/**
* @brief 获取请求返回
*/

ssize_t aio_return(struct aiocb *aiocbp);

/**
* @brief 阻塞调用进程,直到异步请求完成
* @param cblist: aiocb 请求列表
* @note 请求列表中任何一个完成都会导致此函数返回
*/

int aio_suspend(const struct aiocb *const cblist[], int n, const struct timespec *timeout);

/**
* @brief 取消对某个文件描述符执行一个或所有的I/O请求
* @param aiocbp: 为NULL时,表示取消所有请求
* @ret AIO_CANCELED: 处理中的请求已经被取消
* AIO_NOTCANCELED: 至少一个请求已经被完成(使用 aio_error() 来遍历哪些被取消)
* AIO_ALLDONE: 所有请求已经被完成了
*/

int aio_cancel(int fd, struct aiocb *aiocbp);

/**
* @brief 同时发起多个传输
* @param mode: LIO_WAIT: 阻塞进程,直到所有I/O都完成
* LIO_NOWAIT: 放入请求队列并理解返回
* @param list: aiocb列表
* @param nent:传输个数
*/

int lio_listio(int mode, struct aiocb *list[], int nent, struct sigevent *sig);

示例:异步读取

#include <aio.h>
...

int fd, ret;
struct aiocb my_aiocb;

fd = open("file.txt", O_RDONLY);
if(fd < 0)
perror("open");

//清零结构体
bzero(&my_aiocb, sizeof(struct aiocb));

my_aiocb.aio_buf = malloc(BUFSIZE + 1);
if(!my_aiocb.aio_buf)
perror("malloc");

my_aiocb.aio_fildes = fd;
my_aiocb.aio_nbytes = BUFSIZE;
my_aiocb.aio_offset = 0;

ret = aio_read(&my_aiocb);

if(ret < 0)
perror("aio_read");

//等待处理完成
while(aio_error(&my_aiocb) == EINPROGRESS)
continue;

if((ret = aio_return(&my_iocb)) > 0)
{
//获取异步读返回值
}
else
{
//读失败
}

aio_suspend()

struct aioct *cblist[MAX_LIST];

bzero((char *)cblist, sizeof(cblist));

cblist[0] = &my_aiocb;
ret = aio_read(&my_aiocb);
ret = aio_suspend(cblist, MAX_LIST, NULL);

lio_listio()

struct aiocb aiocb1, aiocb2;
struct aiocb *list[MAX_LIST];

aiocb1.aio_fildes = fd;
aiocb1.aio_buf = malloc(BUFSIZE + 1);
aiocb1.aio_nbytes = BUFSIZE;
aiocb1.aio_offset = next_offset;
aiocb1.aio_lio_opcode = LIO_READ;

...
bzero((char *)list, sizeof(list));

list[0] = &aiocb1;
list[1] = &aiocb2;
...

ret = lio_listio(LIO_WAIT, list, MAX_LIST, NULL);

内核

内核在2.6以后支持异步I/O,AIO可以一次性发出大量的read/write调用并通过通用块层的I/O调度来获得更好的性能。 用户程序也可以减少过多的同步负载,还可以在业务逻辑中更灵活地进行并发控制和负载均衡。

相较于glibc的用户空间多线程同步等实现也减少了线程的负载和上下文切换等。

对于网络设备而言,在socket层面上,也可以使用AIO,让CPU和网卡的收发动作充分交叠以改善吞吐性能。

在用户空间中,一般要结合 libaio 来进行内核AIO系统调用。

用户空间

int io_setup(int maxevents, io_context_t *ctxp);
int io_destory(io_context_t ctx);
//发送读写请求
int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);
int io_cancel(io_context_t ctx, struct iocb *iocb, struct io_event *evt);
//获取完成事件
int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events,
struct timespec *timeout)
;

//设置完成时的回调
void io_set_callback(struct iocb *iocb, io_callback_t cb);
//写请求准备
void io_prep_pwrite(struct iocb *iocb, int fd, void buf,size_t count, long long offset);
//读请求准备
void io_prep_pread(struct iocb *iocb, int fd, void buf,size_t count, long long offset);
void io_prep_pwritev(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);
void io_prep_preadv(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);

示例:

#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <inttypes.h>
#include <stdlib.h>
#include <libaio.h>

#define BUF_SIZE 4096

int main(int argc , char **argv)
{

io_context_t ctx = 0;
struct iocb cb;
struct iocb *cbs[1];
unsigned char *buf;
struct io_event events[1];
int ret;
int fd;

if(argc < 2)
{
printf("the command format: aior [file name]\n");
exit(1);
}

fd = open(argv[1], O_RDWR | O_DIRECT);
if(fd < 0)
{
perror("open error");
goto err;
}

//allocate aligned memory
ret = posix_memalign((void **)&buf, 512, (BUF_SIZE + 1));
if(ret < 0)
{
perror("posix_memalign failed");
goto err1;
}
memset(buf, 0, BUF_SIZE + 1);

ret = io_setup(128, &ctx);
if(ret < 0)
{
printf("io_setup error:%s", strerror(-ret));
goto err2;
}

//setup I/O control block
io_prep_pread(&cb, fd, buf, BUF_SIZE, 0);

cbs[0] = &cb;

ret = io_submit(ctx, 1, cbs);
if(ret != 1)
{
if(ret < 0)
{
printf("io_submit error:%s", strerror(-ret));
}
else
{
fprintf(stderr, "could not submit IOs");
}
goto err3;
}

//get the reply
ret = io_getevents(ctx, 1, 1, events, NULL):
if(ret != 1)
{
if(ret < 0)
{
printf("io_getevents error:%s", strerror(-ret));
}
else
{
goto err3;
}
}

if(events[0].res2 == 0)
{
printf("%s\n", buf);
}
else
{
printf("AIO error: %s", strerror(-events[0].res));
goto err3;
}

if((ret = io_destory(ctx)) < 0)
{
printf("io_destory error: %s", strerror(-ret));
goto err2;
}

free(buf);
close(fd);
return 0;

err3:
if((ret = io_destory(ctx)) < 0)
{
printf("io_destory error : %s", strerror(-ret));
}
err2:
free(buf);
err1:
close(fd);
err:
return -1;
}

使用场景

  • 当一个简单的任务仅是读写一个对象时,可以使用最简单的阻塞方式。
  • 当需要对多个对象进行I/O操作时考虑使用 selectepoll/libevent
    • 一般 epoll/libevent 用于监控网络 socket
    • select 在处理效率会随着I/O数的增加而下降,这个时候可以使用 epoll 替代
  • 当需要异步读写I/O时那么使用AIO是一个比较好的选择
Last Updated 2018-10-14 日 19:32.
Render by hexo-renderer-org with Emacs 26.1 (Org mode 9.1.14)