[What]Linux 并发基本操作

参考宋宝华老师的书 Linux设备驱动开发详解 ,来整理驱动并发操作。

并发与竞态

在并发运行模式下,如果不对共享资源进行保护,那么就会破坏该资源的操作(竞态)。

在Linux内核中,主要的竞态发生于以下几种:

  1. 多核上运行各自的进程(或线程)导致竞态
    • 这种情况是真正的并行运行
  2. 单核上的多个进程(或线程)由于优先级和时间片的关系导致竞态
  3. 进程(或线程)被中断(硬中断、软中断、Tasklet、底半部)打断导致竞态
  4. 中断被更高优先级的中断打断导致竞态
    • Linux2.6.35之后取消了中断嵌套
  5. 多核中断和中断之间的打断导致竞态

解决这些问题的根本在于对共享资源的互斥访问,访问共享资源的 代码区域 称为临界区(Critical Sections).

编译乱序和执行乱序以及解决

除了对共享资源的注意外,还需要理解编译器和CPU对一些代码的非顺序操作而导致的结果异常。

编译乱序优化

编译乱序优化:编译器可以对访存的指令进行乱序,减少逻辑上不必要的访存,以及尽量提高Cache命中率和CPU的Load/Store单元的工作效率。 因此在打开编译器优化以后,可能会看到生成的汇编并没有严格按照代码逻辑顺序执行。

但这种乱序优化就可能会导致代码的运行结果与预期不符合。

解决

使用 barrier() 保证其后面的代码不能跑到前面去。

/*include/linux/compiler-gcc.h*/
/* The "volatile" is due to gcc bugs */
/**
* @brief 保证此宏前的语句和后的语句代码不会乱序
*/

#define barrier() __asm__ __volatile__("": : :"memory")
/**
* @note volatile 用于指定编译器每次读变量的操作都需要直接读内存
* 对于volatile型变量读取,使用此宏
*/

#define barrier_data(ptr) __asm__ __volatile__("": :"r"(ptr) :"memory")

执行乱序(Out-of-Order Execution)

在处理器中执行时,后发射的指令还是可能先执行完。高级的CPU可以根据自己缓存的组织特性,将访存指令重新排序执行。

  • 为了较高的缓存命中率,连续地址的访问可能会先执行。
  • 有的还允许访存的非阻塞,即如果前面一条访存指令缓存不命中,造成长延时的存储访问,后面的访存指令可以先执行,以便从缓存中取数。
    • 所有即使是汇编上看顺序正确的指令,其执行的顺序也是不可预知的。
对于大多数体结构而言,尽管每个CPU都是乱序执行,但是这一乱序对于单个程序执行是不可见的,因为单个CPU在碰到依赖点(后面的指令依赖于前面执行的执行结果)
的时候会等待,所以程序员可能感觉不到这个乱序过程。但是这个依赖点等待的过程,在SMP处理器里面对于其他核是不可见的。

比如CPU0上有语句

{
  while(f == 0);
  print x
}

CPU1上有语句
{
  x = 42;
  f = 1;
}

当CPU1乱序执行则可能 "f = 1" 在 "x = 42" 之前执行, 那么CPU0 打印的 "x" 值就不一定是 42!
虽然单个CPU在碰到依赖点会等待,当程序在访问外设的寄存器时,这些寄存器的访问顺序在CPU逻辑上构不成依赖关系。

解决

处理器为了解决多核间一个核的内存行为对另外一个核的可见问题和访问外设寄存器的顺序问题,引入了一些内存屏障指令。

Linux内核的自旋锁、互斥体等互斥逻辑都需要使用内存屏障指令:在请求获得锁时调用屏障指令,在解锁时也调用屏障指令。

对于外设寄存器的读写,Linux封装了以下读写宏以保证顺序执行:

static inline u8 ioread8(const volatile void __iomem *addr);
static inline u16 ioread16(const volatile void __iomem *addr);
static inline u32 ioread32(const volatile void __iomem *addr);

static inline void iowrite8(u8 value, volatile void __iomem *addr);
static inline void iowrite16(u16 value, volatile void __iomem *addr);
static inline void iowrite32(u32 value, volatile void __iomem *addr);

中断屏蔽(仅单核使用)

单CPU范围内 可以在进入临界区之前屏蔽中断,在退出临界区之后打开中断避免竞态。 (由于设备驱动是应用在各种CPU上的,无法保证是单CPU,所以不推荐驱动使用此方法)

屏蔽中断后使得中断与进程之间的并发不再发生,而且由于Linux内核的进程调度等操作都依赖中断来实现, 内核抢占进程之间的并发也得以避免了。

由于Linux的异步I/O、进程调度等很多重要操作都依赖于中断,所以要尽量保证中断屏蔽时间够短。

使用流程:

/**
* @brief 简单的关闭和打开
*/

//关闭中断
local_irq_disable();

//临界区处理

//打开中断
local_irq_enable();


/**
* @brief 保存现场
*/

unsigned long flags;
//关闭中断并保存目前CPU中断信息位
local_irq_save(flags);

//临界区处理

//恢复中断信息并打开中断
local_irq_restore(flags);

/**
* @brief 仅仅操作底半
*/

local_bh_disable();

local_bh_enable();

原子操作

原子操作主要用于保证对整形变量的修改是互斥的。

typedef struct {
int counter;
} atomic_t;
//定义原子变量并置0
atomic_t v = ATOMIC_INIT(0);

//设置原子变量v的值为i
#define atomic_set(v, i) WRITE_ONCE(((v)->counter), (i))
//读取
#define atomic_read(v) READ_ONCE((v)->counter)

static inline void atomic_add(int i, atomic_t *v);
static inline void atomic_sub(int i, atomic_t *v);

//自加1
static inline void atomic_inc(atomic_t *v);
//自减1
static inline void atomic_dec(atomic_t *);

//减去值后判断是否为0
#define atomic_sub_and_test(i, v)(atomic_sub_return((i), (v)) == 0)
//自减后判断是否为0
#define atomic_dec_and_test(v)(atomic_dec_return(v) == 0)
//自增后判断是否为0
#define atomic_inc_and_test(v)(atomic_inc_return(v) == 0)


//设置地址 addr 处第 nr 位为1
static inline void set_bit(int nr, volatile unsigned long *addr);
static inline void clear_bit(int nr, volatile unsigned long *addr);
//翻转
static inline void change_bit(int nr, volatile unsigned long *addr);

//先测试可以操作 然后再操作
static inline int test_and_set_bit(int nr, volatile unsigned long *addr);
static inline int test_and_clear_bit(int nr, volatile unsigned long *addr);
static inline int test_and_change_bit(int nr, volatile unsigned long *addr);

示例:设备最多只能被一个进程打开

static atomic_t xxx_available = ATOMIC_INIT(1);

static int xxx_open(struct inode *inode, struct file *filp)
{

...
if(atomic_dec_and_test(&xxx_available) == false)
{
//设备已经被打开
atomic_inc(&xxx_available);

return -EBUSY;
}
...
return 0;
}

static int xxx_release(struct inode *inode, struct file *filp)
{

atomic_inc(&xxx_available);

return 0;
}

自旋锁(Spin Lock)

自旋锁的基本使用

获得自旋锁的进程可以操作资源,等待自旋锁的进程就在原地死等,所以在使用自旋锁的场合也应该尽快退出。

自旋锁相关操作函数如下:

//定义自旋锁
spinlock_t lock;

//初始化自旋锁
#define spin_lock_init(_lock) \
do {\
spinlock_check(_lock);\
raw_spin_lock_init(&(_lock)->rlock);\
} while (0)


/**
* @brief 以阻塞的形式获取自旋锁
*/

static __always_inline void spin_lock(spinlock_t *lock);
/**
* @brief 以非阻塞的形式获取自旋锁
* @ret 成功返回 1
*/

static __always_inline int spin_trylock(spinlock_t *lock);
/**
* @brief 释放自旋锁
*/

static __always_inline void spin_unlock(spinlock_t *lock);

范例:

spinlock_t lock;

spin_lock_init(&lock);

spin_lock(&lock);
//临界区处理
spin_unlock(&lock);

自旋锁的衍生

自旋锁可以避免临界区不受本CPU和其他CPU的进程打扰,但可能会受到中断和底半部的影响。 所以需要使用其衍生方法:

  • 一般在中断中使用 spin_lock()/spin_unlock() , 在进程中使用 spin_lock_irqsave()/spin_unlock_irqrestore()
//include/linux/spinlock.h
//关闭中断 + 获取自旋锁
static __always_inline void spin_lock_irq(spinlock_t *lock);
//打开中断 + 释放自旋锁
static __always_inline void spin_unlock_irq(spinlock_t *lock);

//关闭中断 + 保存状态 + 获取自旋锁(在进程中使用)
#define spin_lock_irqsave(lock, flags)\
do {\
raw_spin_lock_irqsave(spinlock_check(lock), flags);\
} while (0)


//打开中断 + 恢复状态 + 释放自旋锁(在进程中使用)
static __always_inline void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);

//关闭底半 + 获取自旋锁
static __always_inline void spin_lock_bh(spinlock_t *lock);
//打开底半 + 释放自旋锁
static __always_inline void spin_unlock_bh(spinlock_t *lock);

需要注意的问题

使用自旋锁需要注意的问题:

  1. 在占用锁时间极短的情况下,使用自旋锁才合理
  2. 当递归使用自旋锁时,可能导致系统死锁
  3. 在自旋锁锁定期间不能调用可能引起进程调度的函数
    • 如果此时启动一个进程,进程由阻塞迟迟不返回,那系统将崩溃
  4. 在单核情况下编程时,也应该认为自己的CPU是多核的,因为驱动需要跨平台。

实例

展示设备只能被最多一个进程打开:

int xxx_count = 0; //定义文件打开的次数

static int xxx_open(struct inode *inode, struct file *filp)
{

...
spin_lock(&xxx_lock);
if(xxx_count)
{
//文件已经被打开则退出
spin_unlock(&xxx_lock);
return -EBUSY;
}
xxx_count++;
spin_unlock(&xxx_lock);

return 0;
}
static int xxx_release(struct inode *inode, struct file *filp)
{

...
spin_lock(&xxx_lock);
xxx_count--;
spin_unlock(&xxx_lock);

return 0;
}

读写自旋锁

读写自旋锁允许读并发,写只能有一个进程操作,读写不能同时操作。

相关操作函数如下:

//file: include/linux/rwlock.h
//定义并初始化
rwlock_t my_rwlock;
rwlock_init(&my_rwlock);

//读锁定
#define read_lock(lock) _raw_read_lock(lock)
#define read_lock_irqsave(lock, flags) ...
#define read_lock_irq(lock) _raw_read_lock_irq(lock)
#define read_lock_bh(lock) _raw_read_lock_bh(lock)

//读解锁
#define read_unlock(lock) _raw_read_unlock(lock)
#define read_unlock_irqrestore(lock, flags) ...
#define read_unlock_irq(lock) _raw_read_unlock_irq(lock)
#define read_unlock_bh(lock) _raw_read_unlock_bh(lock)

//写锁定
#define write_lock(lock) _raw_write_lock(lock)
#define write_lock_irqsave(lock, flags) ...
#define write_lock_irq(lock) _raw_write_lock_irq(lock)
#define write_lock_bh(lock) _raw_write_lock_bh(lock)

//写解锁
#define write_unlock(lock) _raw_write_unlock(lock)
#define write_unlock_irqrestore(lock, flags) ...
#define write_unlock_irq(lock) _raw_write_unlock_irq(lock)
#define write_unlock_bh(lock) _raw_write_unlock_bh(lock)

一般操作如下:

rwlock_t lock;
rwlock_init(&lock);

read_lock(&lock);
//临界资源
read_unlock(&lock);

write_lock_irqsave(&lock, flags);
//临界资源
write_unlock_irqrestore(&lock, flags);

顺序锁

顺序锁是对读写锁的优化,读操作不会被写操作阻塞,读操作不用等待写操作完成,写操作也不用等待读操作完成。 但是写和写操作之间仍然是互斥的,同一时刻只能有一个写操作获取共享资源。

虽然读写之间不互相排斥,但如果读执行单元在读操作期间,写操作已经发生,那么就需要重新读取数据。

相关的操作函数:

//file: include/linux/seqlock.h

//获取顺序锁
static inline void write_seqlock(seqlock_t *sl);
static inline void write_seqlock_irq(seqlock_t *sl);
static inline void write_seqlock_bh(seqlock_t *sl)
#define write_seqlock_irqsave(lock, flags) ...

//释放
static inline void write_sequnlock(seqlock_t *sl);

static inline void write_sequnlock_bh(seqlock_t *sl);
static inline void write_sequnlock_irq(seqlock_t *sl);
static inline void write_sequnlock_irqrestore(seqlock_t *sl, unsigned long flags);

示例:

seqlock_t lock;
seqlock_init(&lock);

/**
* @brief 写操作
*/

write_seqlock(&lock);
//写操作
...
write_sequnlock(&seqlock_a);

/**
* @brief 读操作
*/

do
{
//读之前需要此函数并返回一个序号
seqnum = read_seqbegin(&lock);
//读操作
...
//读之后需要检查是否需要重读
}while(read_seqretry(&lock, seqnum));

读-复制-更新

RCU(Read-Copy-Update)在读端没有锁、内存屏障、原子指令类的开销,几乎可以认为是直接读,RCU在写执行单元访问它的共享资源前首先复制一个副本, 然后对副本进行修改,最后使用回调机制在适当的时机 把指向原来数据的指针重新指向新的被修改的数据。 这个时机就是所有引用数据的CPU都退出对 共享数据读操作的时候。等待适当时机的时期称为宽限期(Grace Period).

RCU既允许多个读操作又允许多个写操作。 但是RCU不能替代读写锁,因为如果写操作比较多时,对读执行单元的性能提高不能弥补写执行单元同步导致的损失。

操作函数:

//file: include/linux/rcupdate.h
//读锁定
static inline void rcu_read_lock(void);
static inline void rcu_read_lock_bh(void);

//读解锁
static inline void rcu_read_unlock(void);
static inline void rcu_read_unlock_bh(void);

/**
* @brief 同步
* @note 将写操作阻塞直到当前已经存在的读操作完成,写操作才继续下一步。
* 它并不需要等待后续读临界区完成
*/

void synchronize_rcu(void);

struct callback_head {
struct callback_head *next;
void (*func)(struct callback_head *head);
} __attribute__((aligned(sizeof(void *))));
#define rcu_head callback_head

typedef void (*rcu_callback_t)(struct rcu_head *head);

/**
* @brief 挂接回调,把 func 挂接到RCU回调链上然后立即返回。
* func会在宽限期结束后被执行
* @note 不会使写执行单元阻塞,可以在中断上下文或软中断中使用。
*/

void call_rcu(struct rcu_head *head, rcu_callback_t func);

//给RCU保护的指针赋一个新值
#define rcu_assign_pointer(p, v) smp_store_release(&p, RCU_INITIALIZER(v))
//获取一个RCU保护的指针
#define rcu_dereference(p) rcu_dereference_check(p, 0)

/**
* @brief 举例
* @note 写端将结构体地址赋值给全局指针 gp,读端再访问
*/

struct foo{
int a;
int b;
int c;
};
struct foo *gp = NULL;

p = kmalloc(sizeof(*p), GFP_KERNEL);
p->a = 1;
p->b = 2;
p->c = 3;
rcu_assign_pointer(gp, p);

//读端访问
rcu_read_lock()
p = rcu_dereference(gp);
if(p != NULL)
{
do_something_with(p->a, p->b, p->c);
}
rcu_read_unlock();

/**
* @brief 对链表操作的RCU
* @note include/linux/rculist.h
*/

//把元素new插入rcu保护链表head开头
static inline void list_add_rcu(struct list_head *new, struct list_head *head);
//插入到尾
static inline void list_add_tail_rcu(struct list_head *new,
struct list_head *head)
;

//删除元素
static inline void list_del_rcu(struct list_head *entry);
//新元素new取代旧old
static inline void list_replace_rcu(struct list_head *old,
struct list_head *new)
;

//遍历链表
#define list_for_each_entry_rcu(pos, head, member) ...

/**
* @brief 示例链表操作
*/

struct foo{
struct list_head list;
int a;
int b;
int c;
};
LIST_HEAD(head);

p = kmalloc(sizeof(*p), GFP_KERNEL);
p->a = 1;
p->b = 2;
p->c = 3;
list_add_rcu(&p->list, &head);

//读端
rcu_read_lock();
list_for_each_entry_rcu(p, head, list)
{
do_something_with(p->a, p->b, p->c);
}
rcu_read_unlock();

下面的例子演示RCU保护的链表删除节点N的工作,删除N后 等待一个宽限期结束后再释放N的内存。

struct el
{
struct list_head lp;
long key;
int data;
};
DEFINE_SPINLOCK(listmutex);
LIST_HEAD(head);

int search(long key, int *result)
{

struct el *p;

rcu_read_lock();
list_for_each_entry_rcu(p, &head, lp)
{
if(p->key == key)
{
*result = p->data;
rcu_read_unlock();
return 1;
}
}
rcu_read_unlock();

return 0;
}

int delete(long key)
{

struct el *p;

spin_lock(&listmutex);
list_for_each_entry_rcu(p, &head, lp)
{
if(p->key == key)
{
list_del_rcu(&p->lp);
spin_unlock(&listmutex);
synchronize_rcu();
kfree(p);
return 1;
}
}
spin_unlock(&listmutex);

return 0;
}

信号量

目前来说信号量更多的被用于同步。

与信号量相关操作函数:

//定义信号量
struct semaphore sem;

//初始化信号量
void sema_init(struct semaphore *sem, int val);

/**
* @brief 获取信号量
* @note 如果获取失败则会睡眠等待,所以不能在中断上下文中调用
*/

void down(struct semaphore *sem);

/**
* @brief 获取信号量
* @note 此函数导致的睡眠可以被信号打断,并返回负数
*
* if(down_interruptible(&sem)
* return -ERESTARTSYS;
*
*/

int dowm_interruptible(struct semaphore *sem);

//超时获取不到信号量则自动退出
int down_timeout(struct semaphore *sem, long jiffies);

//非阻塞形式获取信号量,可以在中断上下文使用
int dowm_trylock(struct semaphore *sem);

//释放信号量
void up(struct semaphore *sem);

互斥体

互斥量更适合用于互斥,因为它可以被同一个进程重复获取而不导致睡眠。

相关函数:

//定义并初始化
struct mutex my_mutex;
mutex_init(&my_mutex);

//获取互斥量
void mutex_lock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
int mutex_trylock(struct mutex *lock);

//释放互斥量
void mutex_unlock(struct mutex *lock);

自旋锁和互斥体选用的3项原则:

  1. 若临界区比较小宜使用自旋锁,否则使用互斥体。
    • 互斥体依赖于自旋锁,且互斥体引起阻塞后会执行进程切换操作,耗时相对较多。
  2. 如果临界区包含可能引起阻塞的代码,只能使用互斥体。
    • 如果使用自旋锁,临界区阻塞后切换到另一个进程。而另一个进程也在获取本自旋锁,就会一直死等下去。
  3. 如果临界区在中断或软中断中使用,那么应该使用自旋锁。
    • 如果使用互斥体,则应该使用 mutex_trylock() 的方式。

完成量

完成量用于一个执行单元等待另一个执行单元执行完某事。

操作函数:

//定义完成量
struct completion my_completion;

//赋值完成量为0
init_completion(&my_completion);
reinit_completion(&my_completion);

//等待完成量
void wait_for_completion(struct completion *c);

//唤醒一个完成量
void complete(struct completion *c);
//唤醒所有完成量
void complete_all(struct completion *c);
Last Updated 2018-10-14 日 19:32.
Render by hexo-renderer-org with Emacs 26.1 (Org mode 9.1.14)