#include <pthread.h>
int pthread_equal(pthread_t tid1, pthread_t tid2)
对比两个线程id是否相等
#include <pthread.h>
pthread_t pthread_self(void)
获取自身的tid
3.创建线程
#include <pthread.h>
int pthread_create(pthread_t *restrict tidp, const pthread_attr_t *restrict attr, void *(*start_rtn)(void *), void *restrict arg); 成功返回0,否则返回错误编号
当函数成功返回的时候,新创建的线程ID会被设置成tidp指向的内存单元。
新创建的线程从start_rtn函数的地址开始运行,该函数只有一个无类型的指针参数arg。如果需要向start_rtn传递的参数有一个以上,需要把这些参数放到一个结构中,然后将结构地址作为arg参数传入
4.线程终止
1.线程正常返回,返回值是线程的退出码
2.倍同一进程中的其他线程取消
3.线程调用pthread_exit
#include <pthread.h>
void pthread_exit(void *rval_ptr)
int pthread_join(pthread_t thread, void **rval_ptr);
如果线程调用pthread_join 将会一直阻塞,直到指定的线程调用pthread_exit,或者从启动历程中返回或者被取消。
如果只是从它的启动例程返回,rval_ptr就包含返回码。如果线程被取消,rval_ptr指定的内存单元会被设置成PTHREAD_CANCELED。如果不关心返回值,可以把rval_ptr设为NULL。
5.线程取消
#include <pthread.h>
int pthread_cancel(pthread_t tid);
//成功返回 0 否则返回错误编号
该函数并不等待线程终止,只是提出请求
6.线程退出清理
#include <pthread.h>
void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);
触发时机:
当调用pthread_exit时
响应取消请求时
execute设置非0时
execute设置为0时,将删除上一次 pthread_cleanup_push调用建立的程序
进程函数与线程函数对比
7.线程同步
增量操作通常分三步
1.把内存单元读入寄存器
2.把寄存器中对变量做增量操作
3.把新的值写回单元
8.互斥量(mutex)
本质上是一把锁,在访问共享资源前对mutex进行设置(加锁),访问完成后释放(解锁)互斥量。
任何其他试图再次对互斥量加锁的都会被阻塞,直到当前线程释放互斥量。如果释放互斥量时有一个以上的线程阻塞,那么所有该锁上的阻塞线程都会变成可运行状态,第一个变为可运行的线程就可以对互斥量加锁。
如果允许其中的某一个线程在没有加锁的情况下访问共享资源,那么互斥量就没用了
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(phtread_mutex_t *mutex);
//成功返回0 否则返回错误编号
默认初始化变量只需要把attr设置为NULL
#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex)
//成功返回 0 否则返回错误编号
如果线程不希望被阻塞,可以使用trylock尝试对互斥量进行加锁,如果处于未锁住状态,就锁住互斥量 ,如果成功返回0,锁住互斥量,否则失败,返回EBUSY
#include <stdlib.h>
#include <pthread.h>
struct foo{
int f_count;
pthread_mutex_t f_lock;
int f_id;
};
struct foo*
fool_alloc(int id)
{
struct foo *fp;
if((fp = (struct foo*)malloc(sizeof(struct foo))) != NULL){
fp->f_count = 1;
fp->f_id = id;
if(pthread_mutex_init(&fp->f_lock,NULL) != 0){
free(fp);
return NULL;
}
}
return fp;
}
void
foo_hold(struct foo *fp)
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
void
foo_rele(struct foo *fp)
{
pthread_mutex_lock(&fp->f_lock);
if(--fp->f_count == 0){
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
}
9.避免死锁
如果线程试图对同一个互斥量加锁两次,那么自身就会陷入死锁状态。
程序中使用一个以上的互斥量时,如果允许一个线程一直占用第一个互斥量,并且试图在锁住第二个互斥量时处于阻塞状态,拥有第二个互斥量的线程也在试图锁住第一个互斥量,互相都不释放锁并且阻塞的情况。
假设 需要对互斥量AB同时加锁。如果所有线程总是在互斥量B加锁之前锁住互斥量A,那么使用这两个互斥量就不会产生死锁。
如果涉及太多的锁和数据结构,可以采用另外的方法。在这种情况下,可以先释放占有的锁,然后过一段时间再试。这种情况可以使用pthread_mutex_trylock接口避免死锁,如果已经占用某些锁 而且pthread_mutex_trylock接口返回成功,那么就可以继续。
#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id) % NHASH)
struct foo *fh[NHASH]; //pointer to array fh
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo{
int f_count;
pthread_mutex_t f_lock;
int f_id;
struct foo * f_next; //pointer to next foo
/***more stuff.....***/
};
struct foo*
fool_alloc(int id)
{
struct foo *fp;
int idx;
if((fp = (struct foo*)malloc(sizeof(struct foo))) != NULL){ //allocate memory of object foo
fp->f_count = 1;
fp->f_id = id;
if(pthread_mutex_init(&fp->f_lock,NULL) != 0){
free(fp); //if mutex init error free memory of struct foo fp;
return NULL;
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
}
return fp;
}
void
foo_hold(struct foo *fp)
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
struct foo*
foo_find(int id)
{
struct foo *fp;
pthread_mutex_lock(&hashlock);
for(fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next){
if(fp->f_id == id) {
foo_hold(fp);
break;
}
}
pthread_mutex_unlock(&hashlock);
return fp;
}
void
foo_rele(struct foo *fp)
{
struct foo *tfp;
int idx;
pthread_mutex_lock(&fp->f_lock);
if(fp->f_count == 1){
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_lock(&hashlock);
pthread_mutex_lock(&fp->f_lock);
if(fp->f_count != 1){
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
return;
}
idx = HASH(fp->f_id);
tfp = fh[idx];
if(tfp == fp){
fh[idx] = fp->f_next;
}else
{
while(tfp->f_next != fp)
tfp = tfp->f_next;
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else{
fp->f_count--;
pthread_mutex_destroy(&fp->f_lock);
}
}
上面的功能可以看出 锁的颗粒度的大小严重影响多线程程序的复杂度 颗粒度越小 复杂度越大
10.pthread_mutex_timedlock
访问已加锁的互斥量的时候,允许绑定一个时间,如果在这个时间内都没有加上锁,那么就会返回 ETIMEDOUT
11.读写锁 (reader-writer lock)或者叫共享互斥锁(shared-exclusive lock)
1.读模式下加锁 有多线程可以占有
2.写模式下加锁 一次只有一个线程可以占有
3.不加锁
当读写锁 是处于写加锁状态的时候,所有试图访问该锁的线程都会阻塞。
当读写锁 是处于读加锁状态的时候,所有以读模式对它进行加锁的线程都可以得到访问权限,但是以写模式对此进行加锁的线程都会阻塞,一直到所有线程都释放他们的读锁位置 (当以写模式对此进行加锁到来的时候,后续进行的读模式进行加锁都会被阻塞,为了防止出现等待的写模式锁的请求一直得不到满足的情况)
读写锁非常适用于数据结构的读取远大于写的情况
初始化:
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t * restrict rwlock,
const pthread_rwlockattr_t * restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
//成功返回0 否则返回错误编号
读写锁操作:
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
//成功返回0 否则返回错误编号
#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
可以获取锁的时候返回0,否则返回EBUSY
代码示例:
多个工作线程获取单个主线程分配给他们的作业
#include <stdlib.h>
#include <pthread.h>
struct job {
struct job *j_next;
struct job *j_prev;
pthread_t j_id;
/** stuff here**/
};
struct queue{
struct job *q_head;
struct job *q_tail;
pthread_rwlock_t q_lock;
};
/*
*Initialize a queue
*/
int
queue_init(struct queue *qp)
{
int err;
qp->q_head = NULL;
qp->q_tail = NULL;
err = pthread_rwlock_init(&qp->q_lock,NULL);
if(err != 0)
{
return(err);
}
/**continue init data....**/
return 0;
};
/*
*Insert a job at the head of the queue.
*/
void
job_insert(struct queue *qp , struct job *jb)
{
pthread_rwlock_wrlock(&qp->q_lock); // lock with read mod
jb->j_next = qp->q_head;
jb->j_prev = NULL;
if(qp->q_head != NULL)
qp->q_head->j_prev = jb;
else
qp->q_tail = jb; /* list was empty*/
qp->q_head = jb;
pthread_rwlock_unlock(&qp->q_lock);
}
/*
* Append a job on the tail of the queue
*/
void
job_append(struct queue *qp , struct job *jb)
{
pthread_rwlock_wrlock(&qp->q_lock);
jb->j_next = NULL;
jb->j_prev = qp->q_tail;
if(qp->q_tail != NULL)
qp->q_tail->j_next = jb;
else
qp->q_head->j_next = jb;
qp->q_tail = jb;
pthread_rwlock_unlock(&qp->q_lock);
}
/*
*Remove the given job from a queue
*/
void
job_remove(struct queue *qp, struct job *jb)
{
pthread_rwlock_wrlock(&qp->q_lock);
if(jb == qp->q_head){ //if job at the head of a queue
qp->q_head = jb->j_next;
if(qp->q_tail == jb) // if only one job
qp->q_tail = NULL;
else
jb->j_next->j_prev = jb->j_prev;
}
else if (jb == qp->q_tail){
qp->q_tail = jb->j_prev;//if job at the head of a queue
jb->j_prev->j_next = jb->j_next;
} else {
jb->j_prev->j_next = jb->j_next;
jb->j_next->j_prev = jb->j_prev;
}
pthread_rwlock_unlock(&qp->q_lock);
}
/*
*Find a job by given thread ID
*/
struct job *
job_find(struct queue *qp, pthread_t id)
{
struct job *jp;
if(pthread_rwlock_rdlock(&qp->q_lock) != 0)
return NULL;
for(jp = qp->q_head ; jp != NULL ;jp = jp->j_next)
if(pthread_equal(jp->j_id,id))
break;
pthread_rwlock_unlock(&qp->q_lock);
return jp;
}
12.条件变量
条件变量本身是由互斥量保护的。所以改变条件状态之前必须首先锁住互斥量。
初始化:
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
//成功返回0 否则返回错误编号
使用接口:
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr);
//当条件满足时 返回0 否则 阻塞 如果错误返回错误编号
使用wait的时候 mutex必须是锁住的,函数自动把调用线程放到等待条件的线程列表上,然后对互斥量进行解锁。
当wait函数返回时,互斥量再次被锁住。
通知条件已满足
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond); //至少唤醒一个等待该条件线程
int pthread_cond_broadcast(pthread_cond_t *cond); //唤醒等待该条件的所有进程
//成功返回0 错误返回错误编号
一定要在改变条件状态后再给线程发信号
代码示例
#include <pthread.h>
struct msg{
struct msg *m_next;
/*more stuff here*/
};
struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
void
process_msg(void)
{
struct msg *mp;
for(;;){
pthread_mutex_lock(&qlock);
while(workq == NULL)
{
pthread_cond_wait(&qready, &qlock);
mp = workq;
workq = mp->m_next;
pthread_mutex_unlock(&qlock);
}
}
}
void
enqueue_msg(struct msg *mp)
{
pthread_mutex_lock(&qlock);
mp->m_next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
pthread_cond_signal(&qready);
}
process_msg中 先锁住qlock互斥量,如果工作队列wq为空,则需要等待qready条件,此时把qlock锁释放掉,线程阻塞,一直到有某个线程调用了 enqueue_msg 通知qready已经ok,线程被唤醒 然后继续执行下面的工作处理逻辑。
13.自旋锁
与互斥量相似,但是不是通过休眠使进程阻塞,而是在获取锁之前一直处于忙等(自旋)阻塞状态。
用于:锁被持有的时间短。
#include <pthread.h>
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
//成功返回0 否则返回错误编号
pshared参数表示进程共享属性,表面自旋锁是如何获取的。
如果设置为:PTHREAD_PROCESS_SHARED 则该锁可以被 可以访问锁底层内存的线程 所获取
否则设置为:PTHREAD_PROCESS_PRIVATE 只能被初始化该锁的 进程内部的线程 所访问
14.屏障
屏障允许每个线程等待,直到所有的合作线程都到达某个点,然后从该点继续执行。
#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr,
unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_wait(pthread_barrier_t * barrier); //返回0 或者PTHREAD_BARRIER_SERIAL_THREAD 否则返回错误编号
count : 在允许所以线程继续允许之前,必须到达屏障的线程数目。
调用wait函数的线程在屏障计数,未满足条件的时候,会进入休眠状态。如果这个线程是最后一个调用pthread_barrier_wait的线程,就满足了屏障计数,所有的线程都被唤醒。
对于任一一个线程,如果wait函数返回了PTHREAD_BARRIER_SERIAL_THREAD。说明这个线程可以作为主线程,剩下的线程返回值只会为0。
一旦达到屏障计数值,并且线程处于非阻塞状态,屏障就可以被复用。
但是在destroy之后,又调用init 时会对计数器进行初始化。否则复用的时候计数器不会改变