内核里处理的竞态主要通过以下方法处理: 信号量(互斥量)、自旋锁、读写信号量、读写自旋锁、等待队列、完成量。
- 信号量(互斥量)
//初始化信号量,val表示可并发使用的资源数量
void sema_init(struct semaphore *sem, int val);
//获取一个资源,资源数量减1,获取不到则线程休眠
void down (&sem);
//可中断的down操作,休眠中可被中断唤醒
int down_interruptible(&sem);
//尝试获取,不成功不会阻塞,直接返回非0
int down_trylock(&sem);
//释放信号量
void up(&sem);
上述获取函数中,down_interruptible和down_trylock都有返回值,返回为0表示获取信号量成功,非0则说明未获取到。对 down_interruptible 来说是在等待休眠中被打断了,对down_trylock则是说明信号量已全被占用。
使用这两种down肯定要始终检查返回值。
互斥量就是信号量初始化为1的情况,也是最常用的情况:
DECLARE_MUTEX(name);
DECLARE_MUTEX_LOCKED(name)
//或者
void init_MUTEX(struct semaphore *sem);
void init_MUTEX_LOCKED(struct semaphore *sem);
example:
//用信号量保证设备同时只被一个线程打开
static DECLARE_MUTEX(mutex);
static xxx_open(struct inode *inode, struct file *filp)
{
......
if(0 != down_trylock(&mutex)) {
//返回设备忙
return -EBUSY;
}
......
//正常返回,设备文件打开
return 0;
}
static xxx_release(struct inode *inode, struct file *filp)
{
//close时释放互斥量
up(&mutex);
return 0;
}
***关闭设备时一定要 up !! ***
- 自旋锁
自旋锁也是一个互斥设备,线程获取到锁后可使用某个资源,使用完后释放该锁。在未释放时,另一线程申请该锁,则陷入自旋,即忙等待,期间一直对锁进行测试,一旦可用就锁定。对SMP及preemptive的单处理器就是如此,对于non-preemptive的单处理器自旋锁什么也不需要做,因为线程执行中并不会被抢占,一旦某个线程陷入自旋,就不会有其他线程来释放它等待的锁。
自旋锁的实现描述:
do {
preempt_disable(); __acquire(lock); (void)(lock);
}while(0)
即关闭抢占 -> 请求锁 -> 锁定(置位)
//初始化
spinlock_t lock = SPIN_LOCK_UNLOCKED;
//或者
void spinlock_init(spinlock_t *lock);
//请求锁
void spin_lock(spinlock_t *lock);
//请求锁、保存中断状态、禁止中断
void spin_lock_irqsave(&lock, unsigned long flag);
//请求锁、禁止中断
void spin_lock_irq(&lock);
//请求锁、禁止软中断、允许硬件中断
void spin_lock_bh(&lock)
//---------------------------------------------------
//释放
void spin_unlock(spinlock_t *lock);
//其他lock都有对应的unlock
void spin_unlock_irqrestore(&lock, unsigned long falg);
void spin_unlock_irq(&lock);
void spin_unlock_bh(&iock);
//-------------------------------------------------
//try 版本,锁被占用立即返回非0
int spin_trylock(&lock);
int spin_trylock_bh(&lock);
获取自旋锁期间的代码段是禁止休眠的,
这里要注意禁止使用一些可能引起休眠的函数,包括copy_to_user, copy_from_user, kmalloc等
- 读写信号量
基本机制就是: 允许多个读取者获得信号量,但只允许一个写入者,且一旦一个写入者获取到锁,就会禁止其他读取/写入者获取信号量,即完全锁定了。
void init_rwsem(struct rw_semaphore *sem);
//读
void down_read(&sem);
int dow_read_trylock(&sem);
void up_read(&sem);
//写
void down_write(&sem);
int down_write_trylock(&sem);
void up_write(&sem);
//一段时间内禁止获取写入者信号量
void downgrade_write(&sem);
downgrade_write的意义:
读写信号量中写入者优先级比读取者高,且排斥读取者,这就可能会出现在写入频繁时,读取者长时间获取不到锁。因此可以在某次写入者释放锁后,调用downgrade_write可在一段时间内禁止写入,即将保护资源变成了只读。
- 读写自旋锁
使用和自旋锁API相似,如下:
void rwlock_init(rwlock_t *lock);
void read_lock(&lock);
void read_lock_irqsave(&lock);
void read_lock_irq(&lock);
void read_lock_bh(&lock);
//释放锁参考自旋锁部分
//写入锁多一个try版本,读取者并没有try版本
int lock_write_rrylock(&lock);
对自旋锁似乎并不是很在意读取者饥饿的情况。
- 完成量 & 等待队列
信号量和自旋锁都用于处理并发时的资源保护,等待队列和完成量则是用于内核线程间的同步。
完成量可以很好的实现同步,接口也很简洁:
//初始化
DECLARE_COMPLETION(cmp);
//或者,动态的创建
void init_completion(struct completion *cmp);
//等待完成量,非中断等待,不可杀
void wait_for_completion(&cmp);
//通知完成量
//唤醒一个等待线程
void complete(&cmp);
//唤醒所有等待线程
void complete_all(&cmp);
等待队列更加古老,完成量是在其基础上封装的,
唤醒在等待队列上休眠的线程需要两个条件,一个是condition为真,另一个是对应的wake_up被调用。
基本原理是线程调用wait_even_xxx
后,线程被置为TASK_INTERRUPTIBLE(或UNINTERRUPTIBLE),并被调度出去。调用wake_up就是将这些线程重新放回就绪队列。线程被再次调度后,首先判断condition是否为真,如果不是,重复进入等待。
#define wait_event(wq, condition) \
do { \
if (condition) \
break; \
__wait_event(wq, condition); \
} while (0)
//-------------------------------------------------------------------------
#define __wait_event(wq, condition) \
do { \
DEFINE_WAIT(__wait); \
\
for (;;) { \
prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \
if (condition) \
break; \
schedule(); \
} \
finish_wait(&wq, &__wait); \
} while (0)
使用步骤如下:
//初始化等待队列头
DECLARE_WAIT_QUEUE_HEAD(queue_head);
init_waitqueue_head(wait_queue_head_t *queue_head);
//等待事件
wait_event(queue_head, condition)
wait_event_interruptible(queue_head, condition);
wait_event_timeout(queue_head, condition, timeout);
wait_event_interruptible_timeout(queue_head, condition, timeout);
//唤醒
void wake_up(&queue_head);
void wake_up_interruptible(&queue_head);
另一种方法,直接向等待队列中增加一个任务,控制程序调度,实际上就是手动控制wait_event的过程,显然麻烦了。
/*接口*/
//定义一个等待队列节点
DECLARE_WAITQUEUE(queue, tsk);
// 添加/移除等待队列
void fastcall add_wait_queue(&queue_head, &queue);
void fastcall remove_wait_queue(&queue_head, &queue);
/*例子*/
DECLEARE_WAITQUEUE(queue,current); //保存current指针
add_wait_queue(&queue_head, &queue);
while(!conditon)
{
set_current_state(TASK_INTERRUPTIBLE);
if(signal_pending(currrent))
/*处理信号*/
schedule();
}
set_current_state(TASK_RUNNING);
remove_wait_queue(q,&wait);