互斥与条件变量

前言:现在我们开始学习如何同步多个线程,从最简单的「互斥」开始,废话不多说,让我们开始吧

0X00 互斥的感性认识

互斥是最简单的同步形式,简单来说,就是一个锁问题,

上锁的时候,只有一个线程能够执行"锁里面"的代码

什么是锁里面的代码呢?代码一般长这个样子:

lock_the_mutex(...);

// 锁里面的代码,我们把它叫做临界区
...
unlock_the_mutex();

那这个锁锁住的是什么呢?锁住的是线程之间共享的资源,也就是说,当前线程上锁的时候,只有当前线程能够修改共享的资源,其他的线程都不行

接下来我们介绍与互斥相关的 c 函数

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mptr);
int pthread_mutex_unlock (pthread_mutex_t *mpir);

0X01 逃不开的经典问题——生产者与消费者问题

接下来我们用学到的知识实现下述的模型:


有数个生产者线程给一块区域填值,等到所有生产者结束操作,消费者才能去动那块区域

书上的源程序在这里:

https://github.com/TensShinet/learn_IPC/blob/master/book_code/mutex/prodcons2.c

这是我的程序,但是这个程序有点问题(只执行了一个线程,其他线程不执行)

#include "./unpipc.h"


#define MAXNITEMS 1000000
#define MAXNTHREADS 100

int nitems;

struct {
    // 互斥锁
    pthread_mutex_t mutex;

    // 共享资源
    int buff[MAXNITEMS];
    // buff 的索引
    int nput;
    // 放进去的值
    int nvalue;
} shared = { PTHREAD_MUTEX_INITIALIZER };

/*
argv 输入 线程数 和 要产生多少 item 的数目

share.nput 记录着要写入的 buff 索引

share.nvalue 记录要写入的值
*/

void *produce(void *);
void *consume(void *);

int main(int argc, char **argv) {

    int i, nthreads, count[MAXNTHREADS];
    pthread_t tid_produce[MAXNTHREADS], tid_consume;

    if(argc != 3) {
        perror("usage:  prodcons2 <#items> <#threads>");
    }

    nitems = min(atoi(argv[1]), MAXNITEMS);
    nthreads = min(atoi(argv[2]), MAXNTHREADS);

    // 设置并发
    thr_setconcurrency(nthreads);
    // 产生所有的生产者
    for(i = 0; i < nthreads; i++) {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count);
    }

    // 等待所有生产者
    for(i = 0; i < nthreads; i++) {
        pthread_join(tid_produce[i], NULL);
        // 看每个线程执行了多少次
        printf("[%d]th thread = %d\n", i, count[i]);
    }

    // 创建消费者
    pthread_create(&tid_consume, NULL, consume, NULL);
    // 等待消费者
    pthread_join(tid_consume, NULL);
    return 0;
}

void *produce(void *arg) {
    for (;;) {   
        // 上锁
        pthread_mutex_lock(&shared.mutex);
        // 数组满了,解锁退出线程
        if (shared.nput >= nitems) {
            pthread_mutex_unlock(&shared.mutex);
            return (NULL);
        }
        shared.buff[shared.nput] = shared.nvalue;
        shared.nput++;
        shared.nvalue++;
        *((int *)arg) += 1;
        pthread_mutex_unlock(&shared.mutex);
    }
}
/*

如果按照我们想象的那样 

buff[i] == i

所以我们的消费者只是用来检测是否出错
*/
void *consume(void *arg) {
    int i;
    printf("nitems %d\n", nitems);
    for (i = 0; i < nitems; i++)
    {
        if (shared.buff[i] != i) {
            printf("buff[%d] = %d\n", i, shared.buff[i]);
        }
    }
    return (NULL);
}

头文件在这里:https://github.com/TensShinet/learn_IPC/blob/master/my_code/mutex/unpipc.h

0X02 条件变量的感性认识

我们从最简单的互斥开始,开始线程之间的同步,紧接着为了优化这个程序的速度,我们为什么不可以做到,生产者一产生,消费者就拿走的效果呢

而不是像之前那样,所有生产者都产生完了,消费者再行动

为了达到这个目的,我们必须在每次消费者消费之前,检查生产者是否完成了对该区域的生产,为了达到这个目的,我们必须不停的解锁,上锁,完成对该区域的检查,这个叫做轮询

有了条件变量以后,不需要这样做,除了上锁,我们还可以等待

一旦某个条件没达成,这个线程就阻塞等待,一旦这个这个条件达成了,这个线程就被唤醒,这就是「条件变量」

接下来我们介绍与条件变量相关的函数

 #include <pthread.h>
 int pthread_cond_wait(pthread_cond_t *cond,
                      pthread_mutex_t *mutex);   
 int pthread_cond_signal(pthread_cond_t *cond);  

前者用来当条件不满足时阻塞,后者被用来设置满足条件。而且条件变量一定会和一个互斥锁绑定在一起

0X03 使用「条件变量」解决生产者与消费者问题

接下来我们用「条件变量」解决生产者与消费者问题

现在我们的问题是:

如何在生产者产生条目以后,消费者能够立即使用

#include "./unpipc.h"

#define MAXNITEMS 1000000
#define MAXNTHREADS 100

int nitems = 0;

struct
{
    // 互斥锁
    pthread_mutex_t mutex;

    // 共享资源
    int buff[MAXNITEMS];
    // buff 的索引
    int nput;
    // 放进去的值
    int nvalue;
} put = {PTHREAD_MUTEX_INITIALIZER};

struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int nready; /* number ready for consumer */
} nready = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};

/*
argv 输入 线程数 和 要产生多少 item 的数目

share.nput 记录着要写入的 buff 索引

share.nvalue 记录要写入的值

条件变量的结构体设置三个值

mutex 用于锁
cond 用于条件满足的 signal
nready 如果生产者产生了一个 item nready++ 然后设置条件
消费者线程被唤醒,执行操作以后,nready--
*/

void *produce(void *);
void *consume(void *);

int main(int argc, char **argv)
{
    int i, nthreads, count[MAXNTHREADS];
    pthread_t tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
    {
        printf("usage:  prodcons2 <#items> <#threads>");
        return 0;
    }

    nitems = min(atoi(argv[1]), MAXNITEMS);
    nthreads = min(atoi(argv[2]), MAXNTHREADS);

    // 设置并发
    // pthread_setconcurrency(nthreads + 1);
    // 产生所有的生产者
    for (i = 0; i < nthreads; i++)
    {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count);
    }
    // 创建消费者
    pthread_create(&tid_consume, NULL, consume, NULL);

    for (i = 0; i < nthreads; i++)
    {   
        pthread_join(tid_produce[i], NULL);
        // 看每个线程执行了多少次
        printf("[%d]th thread = %d\n", i, count[i]);
    }
    // 等待消费者
    pthread_join(tid_consume, NULL);
    return 0;
}

void *produce(void *arg)
{

    for (;;)
    {
        // 上锁
        pthread_mutex_lock(&put.mutex);
        // 数组满了,解锁退出线程
        if (put.nput >= nitems)
        {
            pthread_mutex_unlock(&put.mutex);
            return (NULL);
        }
        put.buff[put.nput] = put.nvalue;
        put.nput++;
        put.nvalue++;
        *((int *)arg) += 1;
        pthread_mutex_unlock(&put.mutex);

        // 完成一次操作
        pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0)
            // 唤醒另外一个线程
            pthread_cond_signal(&nready.cond);
        nready.nready++;
        pthread_mutex_unlock(&nready.mutex);
    }
}
/*

如果按照我们想象的那样 

buff[i] == i

所以我们的消费者只是用来检测是否出错
*/
void *consume(void *arg)
{
    int i;
    for (i = 0; i < nitems; i++)
    {
        pthread_mutex_lock(&nready.mutex);
        while (nready.nready == 0)
            pthread_cond_wait(&nready.cond, &nready.mutex);
        nready.nready--;
        pthread_mutex_unlock(&nready.mutex);

        if (put.buff[i] != i)
        {
            printf("buff[%d] = %d\n", i, put.buff[i]);
        }
    }
    return (NULL);
}

头文件在这里:https://github.com/TensShinet/learn_IPC/blob/master/my_code/mutex/unpipc.h

虽然这个代码看起来复杂其实是有套路的,我们要做的就是记住这个套路,给条件变量发送信号的代码大体如下:

struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    // 维护本条件的各个变量
} var = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};

pthread_mutex_lock(&var.mutex);

// 条件为真
// 发信号
pthread_cond_signal(&var.cond);
pthread_mutex_unlock(&var.mutex);


pthread_mutex_lock(&var.mutex);
while(条件为假)
    // 阻塞
    pthread_mutex_wait(&var.cond, &var.mutex);

// 修改条件
...
    
 pthread_mutex_unlock(&var.mutex);

默认情况下,pthread_mutex_lock 是阻塞函数,如果两个线程使用同一个互斥锁,那么后使用的会被阻塞,直到这个锁被解开,上述的代码就有这个问题,重复上锁,导致阻塞,修改方式如下

int t = 0;
// 完成一次操作
pthread_mutex_lock(&nready.mutex);
t = nready.nready == 0 ? 1 : 0
nready.nready++;
pthread_mutex_unlock(&nready.mutex);

if(t) {
    // 唤醒另外一个线程
    pthread_cond_signal(&nready.cond);
}

0X04「条件变量」高级用法

这部分的内容在《UNIX 网络编程——进程间通信》7.6 以后,本博客只稍微介绍一点

#include <pthread.h>

// 唤起所有被相同条件阻塞的线程
int pthread_cond_broadcast (pthread_cond_t *cpir);

// 阻塞有时间限制
int pthread_cond_timedwait (pthread_cond_t *cpir, pthread_mutex_t *mptr,
const struct timespec abstime);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342