在 C++ 开发中,原生的线程库主要有两个,一个是 Linux 下的 <pthread.h>
,另一个是 C++11 提供的 <thread>
以前一直用的是 pthread 的 API 写 C++ 的多线程程序,直到听说从 C++11 开始的标准库已经包含了对线程的支持。
推荐阅读:https://chengxumiaodaren.com/docs/concurrent/
在 C++ 开发中,原生的线程库主要有两个,一个是 Linux 下的 <pthread.h>
,另一个是 C++11 提供的 <thread>
。
以前一直用的是 pthread 的 API 写 C++ 的多线程程序,直到听说从 C++11 开始的标准库已经包含了对线程的支持。
pthread pthread 中的 p 是 POSIX (Portable Operating System Interface) 的缩写,是 IEEE 为了在各种 UNIX 操作系统上运行软件,而定义 API 的一系列互相关联的标准总称。相比于 std::thread
的简便易用,pthread
功能比较强大。
线程的创建和管理 创建线程|pthread_create
每个线程都有一个在进程中唯一的线程标识符,用一个数据类型 pthread_t
表示,该数据类型在 Linux 中就是一个无符号长整型数据。
1 int pthread_create (pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *arg) ;
若创建成功,返回 0;若出错,则返回错误编号:
thread 是线程标识符,但这个参数不是由用户指定的,而是由 pthread_create
函数在创建时将新线程的标识符放到这个变量中
attr 指定线程的属性,可以用 NULL 表示默认属性
start_routine 指定线程开始运行的函数
arg 是 start_routine 所需的参数,是一个无类型指针
默认地,线程在被创建时要被赋予一定的属性,这个属性存放在数据类型 pthread_attr_t
中,它包含了线程的调度策略,堆栈的相关信息,join
or detach
的状态等。
pthread_attr_init
和 pthread_attr_destroy
函数分别用来创建和销毁 pthread_attr_t
,具体函数声明可参考 man 手册帮助。
结束线程|pthread_exit
、pthread_cancel
当发生以下情形之一时,线程就会结束:
线程运行的函数 return 了,也就是线程的任务已经完成;
线程调用了 pthread_exit()
;
其他线程调用 pthread_cancel()
结束了线程;
进程调用 exec()
或 exit()
结束;
main()
函数先结束了,而且 main()
自己没有调用 pthread_exit()
来等所有线程完成任务。
更抽象地说,线程结束执行的方式共有 3 种,分别是:
线程将指定函数体中的代码执行完后自行结束;
线程执行过程中,遇到 pthread_exit()
函数结束执行。
线程执行过程中,被同一进程中的其它线程(包括主线程)强制终止;
当然,一个线程结束,并不意味着它的所有信息都已经消失,后面会看到僵尸线程 的问题。
下面介绍两个函数:
1 void pthread_exit (void *retval) ;
retval
是由用户指定的参数,pthread_exit
完成之后可以通过这个参数获得线程的退出状态/信息。
1 int pthread_cancel (pthread_t thread) ;
一个线程可以通过调用 pthread_cancel
函数来请求取消同一进程中的线程,这个线程由 thread
参数指定。
如果操作成功则返回 0,失败则返回对应的错误编码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 #include <pthread.h> #include <stdio.h> #include <stdlib.h> void * thread_Fun (void * arg) { printf ("新建线程开始执行\n" ); sleep (10 ); } int main () { pthread_t myThread; void * mess; int value; int res; res = pthread_create (&myThread, NULL , thread_Fun, NULL ); if (res != 0 ) { printf ("线程创建失败\n" ); return 0 ; } sleep (1 ); res = pthread_cancel (myThread); if (res != 0 ) { printf ("终止 myThread 线程失败\n" ); return 0 ; } res = pthread_join (myThread, &mess); if (res != 0 ) { printf ("等待线程失败\n" ); return 0 ; } if (mess == PTHREAD_CANCELED) { printf ("myThread 线程被强制终止\n" ); } else { printf ("error\n" ); } return 0 ; }
1 2 3 $ ./pthread 新建线程开始执行 myThread 线程被强制终止
一个简单的多线程实现 这是一个非常简单的基于 pthread 的多线程实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NUM_THREADS 5 void *printHello (void *thread_id) { long tid; tid = (long )thread_id; printf ("Hello World! It's me, thread #%ld!\n" , tid); pthread_exit(NULL ); } int main (int argc, char *argv[]) { pthread_t threads[NUM_THREADS]; int rc; long t; for (t = 0 ; t < NUM_THREADS; t++) { printf ("In main: creating thread %ld\n" , t); rc = pthread_create(&threads[t], NULL , printHello, (void *)t); if (rc) { printf ("ERROR; return code frome pthread_create() is %d\n" , rc); exit (-1 ); } } pthread_exit(NULL ); }
1 2 gcc -Wall _pthread.c -lpthread -o pthread ./pthread
1 2 3 4 5 6 7 8 9 10 In main: creating thread 0 In main: creating thread 1 Hello World! It's me, thread #0! In main: creating thread 2 Hello World! It' s me, thread In main: creating thread 3 Hello World! It's me, thread #2! In main: creating thread 4 Hello World! It' s me, thread Hello World! It's me, thread #4!
注意输出的顺序可能不同, 要特别注意的是,main()
显示地调用了 pthread_exit()
来等待其他线程的结束(如果不使用这个函数的话,可能 main()
函数结束了也有线程没有执行完毕)
给线程传入初始化参数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NUM_THREADS 8 char *messages[NUM_THREADS];struct thread_data { int tid; int sum; char *msg; }; struct thread_data _datas [NUM_THREADS ];void *printHello (void *thread_arg) { int task_id, sum; char *hello_msg; struct thread_data *my_data ; my_data = (struct thread_data *)thread_arg; task_id = my_data->tid; sum = my_data->sum; hello_msg = my_data->msg; printf ("Thread %d: %s Sum = %d\n" , task_id, hello_msg, sum); pthread_exit(NULL ); } int main (int argc, char *argv[]) { pthread_t threads[NUM_THREADS]; int *task_ids[NUM_THREADS]; int rc, t, sum; sum = 0 ; messages[0 ] = "English: Hello World!" ; messages[1 ] = "French: Bonjour, le monde!" ; messages[2 ] = "Spanish: Hola al mundo" ; messages[3 ] = "Klingon: Nuq neH!" ; messages[4 ] = "German: Guten Tag, Welt!" ; messages[5 ] = "Russian: Zdravstvytye, mir!" ; messages[6 ] = "Japan: Sekai e konnichiwa!" ; messages[7 ] = "Latin: Orbis, te saluto!" ; for (t = 0 ; t < NUM_THREADS; t++) { sum = sum + t; _datas[t].tid = t; _datas[t].sum = sum; _datas[t].msg = messages[t]; printf ("Creating thread %d\n" , t); rc = pthread_create(&threads[t], NULL , printHello, (void *)&_datas[t]); if (rc) { printf ("ERROR; return code from pthread_create() is %d\n" , rc); exit (-1 ); } } pthread_exit(NULL ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Creating thread 0 Creating thread 1 Thread 0: English: Hello World! Sum = 0 Creating thread 2 Thread 1: French: Bonjour, le monde! Sum = 1 Creating thread 3 Thread 2: Spanish: Hola al mundo Sum = 3 Creating thread 4 Thread 3: Klingon: Nuq neH! Sum = 6 Creating thread 5 Thread 4: German: Guten Tag, Welt! Sum = 10 Creating thread 6 Thread 5: Russian: Zdravstvytye, mir! Sum = 15 Creating thread 7 Thread 6: Japan: Sekai e konnichiwa! Sum = 21 Thread 7: Latin: Orbis, te saluto! Sum = 28
对线程的阻塞|pthread_join
、pthread_detach
阻塞时线程之间「同步 」的一种方法。
1 int pthread_join (pthread_t thread_id, void **value_ptr) ;
pthread_join
函数会让调用它的线程等待 thread_id
线程运行结束之后再运行(如果是 main()
调用,则阻塞 main 线程,直到 join 的所有线程执行结束 —— 常用于等待 main 中创建的所有线程执行完毕)
value_ptr
存放了其他线程的返回值
一个可以被 join 的线程,仅仅可以被另一个线程 join,如果同时有多个线程尝试 join 同一个线程时,最终结果是未知的;另外,线程不能 join 自己。上面提到过,创建一个线程时,要赋予它一定的属性,这其中就包括 joinable or detachable 的属性,只有被声明称 joinable 的线程才可以被其他线程 join。
POSIX 标准的最终版本指出线程应该被设置成 joinable 的,显式设置一个线程为 joinable,需要以下四个步骤:
Declare a pthread attribute variable of the pthread_attr_t
data type
Initialize the attribute variable with pthread_attr_init()
Set the attribute detached status with pthread_attr_setdetchstate()
When done, free library resources used by the attribute with pthread_attr_destroy()
pthread_join()
函数会一直阻塞调用它的线程,直至目标线程执行结束(接收到目标线程的返回值),阻塞状态才会解除。如果 pthread_join() 函数成功等到了目标线程执行结束(成功获取到目标线程的返回值),返回值为数字 0;反之如果执行失败,函数会根据失败原因返回相应的非零值,每个非零值都对应着不同的宏,例如:
EDEADLK
:检测到线程发生了死锁。
EINVAL
:分为两种情况,要么目标线程本身不允许其它线程获取它的返回值,要么事先就已经有线程调用 pthread_join()
函数获取到了目标线程的返回值。
ESRCH
:找不到指定的 thread 线程。
再次强调,一个线程执行结束的返回值只能由一个 pthread_join()
函数获取,当有多个线程调用 pthread_join()
函数获取同一个线程的执行结果时,哪个线程最先执行 pthread_join()
函数,执行结果就由那个线程获得,其它线程的 pthread_join()
函数都将执行失败。
对于一个默认属性的线程 A 来说,线程占用的资源并不会因为执行结束而得到释放。而通过在其它线程中执行pthread_join(A,NULL);
语句,可以轻松实现“及时释放线程 A 所占资源”的目的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #include <errno.h> #include <pthread.h> #include <stdio.h> void *ThreadFun (void *arg) { pthread_exit("test_msg" ); }int main () { int res; void *thread_result; pthread_t myThread; res = pthread_create(&myThread, NULL , ThreadFun, NULL ); if (res != 0 ) { printf ("线程创建失败" ); return 0 ; } res = pthread_join(myThread, &thread_result); if (res != 0 ) { printf ("1:等待线程失败" ); } printf ("%s\n" , (char *)thread_result); res = pthread_join(myThread, &thread_result); if (res == ESRCH) { printf ("2:等待线程失败,线程不存在\n" ); } return 0 ; }
1 2 3 $ ./pthread test_msg 2:等待线程失败,线程不存在
__detachstate
属性值用于指定线程终止执行的时机,该属性的值有两个,分别是:
PTHREAD_CREATE_JOINABLE
(默认值):线程执行完函数后不会自行释放资源;
PTHREAD_CREATE_DETACHED
:线程执行完函数后,会自行终止并释放占用的资源。
还有 pthread_detach()
函数,可以直接将目标线程的 __detachstate
属性改为 PTHREAD_CREATE_DETACHED
,语法格式如下:
1 int pthread_detach (pthread_t thread) ;
关于 __detachstate 属性,<pthread.h> 头文件中提供了 2 个与它相关的函数,分别是:
1 2 int pthread_attr_getdetachstate (const pthread_attr_t * attr,int * detachstate) ;int pthread_attr_setdetachstate (pthread_attr_t *sttr,int detachstate) ;
可以如下创建 detach 状态的线程:
1 2 3 4 5 pthread_t tid;pthread_attr_t attr;pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_create(&tid, &attr, THREAD_FUNCTION, arg);
⚠️ 值得注意的是:僵尸线程(zombie thread)是一种已经退出了的 joinable 线程,但是等待其他线程调用 pthread_join
来 join 它,以收集它的退出信息 。如果没有其他线程调用 pthread_join
来 join 它的话,它占用的一些系统资源不会被释放,比如堆栈。如果 main()
函数需要长时间运行,并且创建大量 joinable 的线程,就有可能出现堆栈不足的 error。
⚠️ 对于那些不需要 join 的线程,最好利用 pthread_detach
,这样它运行结束后,资源就会及时得到释放 。注意一个线程被使用 pthread_detach
之后,它就不能再被改成 joinable 的了。
⚠️ 总而言之,创建的每一个线程都应该使用 pthread_join
或者 pthread_detach
其中一个,以防止僵尸线程的出现。
Linux 线程属性之线程栈大小|pthread_attr_t
线程的属性用 pthread_attr_t
类型的变量表示,使用此变量前,必须调用 pthread_attr_init()
函数进行初始化:
1 int pthread_attr_init (pthread_attr_t * attr) ;
pthread_attr_t
是一种结构体类型,内部包含多种线程属性(更多内容请看参考资料):
1 2 3 4 5 6 7 8 9 10 11 12 typedef struct { int __detachstate; int __schedpolicy; struct sched_param __schedparam ; int __inheritsched; int __scope; size_t __guardsize; int __stackaddr_set; void * __stackaddr; size_t __stacksize; } pthread_attr_t ;
POSIX 标准没有规定一个线程的堆栈大小,安全可移植的程序不会依赖于具体实现默认的堆栈限制,而是显式地调用 pthread_attr_setstacksize
来分配足够的堆栈空间。
关于堆栈大小的一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 #include <pthread.h> #define N_THREADS 5 #define N 1000 #define MEGEXTRA 1000000 pthread_attr_t _attr;void * do_work (void * thread_id) { double A[N][N]; int i, j; long tid; size_t my_stack_size; tid = (long )thread_id; pthread_attr_getstacksize(&_attr, &my_stack_size); printf ("Thread %ld: stack size = %ld bytes \n" , tid, my_stack_size); for (i = 0 ; i < N; i++) { for (j = 0 ; j < N; j++) { A[i][j] = ((i * j) / 3.452 ) + (N - i); } } pthread_exit(NULL ); } int main (int argc, char * argv[]) { pthread_t threads[N_THREADS]; size_t stack_size; int rc; long t; pthread_attr_init(&_attr); pthread_attr_getstacksize(&_attr, &stack_size); printf ("Default stack size = %li\n" , stack_size); stack_size = sizeof (double ) * N * N + MEGEXTRA; printf ("Amount of stack needed per thread = %li\n" , stack_size); pthread_attr_setstacksize(&_attr, stack_size); printf ("Creating threads with stack size = %li bytes\n" , stack_size); for (t = 0 ; t < N_THREADS; t++) { rc = pthread_create(&threads[t], &_attr, do_work, (void *)t); if (rc) { printf ("ERROR; return code from pthread_create() is %d\n" , rc); exit (-1 ); } } printf ("Creating %ld threads.\n" , t); pthread_exit(NULL ); }
1 2 3 4 5 6 7 8 9 10 $ ./pthread Default stack size = 8388608 Amount of stack needed per thread = 9000000 Creating threads with stack size = 9000000 bytes Creating 5 threads. Thread 1: stack size = 9000000 bytes Thread 2: stack size = 9000000 bytes Thread 0: stack size = 9000000 bytes Thread 3: stack size = 9000000 bytes Thread 4: stack size = 9000000 bytes
其他相关函数:
1 2 3 4 pthread_self(); pthread_equal(thread_1, thread_2);
互斥锁 Mutex Mutex 常常被用来保护那些可以被多个线程访问的共享资源,比如可以防止多个线程同时更新同一个数据时出现混乱。
使用互斥锁的一般步骤是:
创建一个互斥锁,即声明一个 pthread_mutex_t
类型的数据,然后初始化,只有初始化之后才能使用;
多个线程尝试锁定这个互斥锁;
只有一个成功锁定互斥锁,成为互斥锁的拥有者,然后进行一些指令;
拥有者解锁互斥锁;
其他线程尝试锁定这个互斥锁,重复上面的过程;
最后互斥锁被显式地调用 pthread_mutex_destroy
来进行销毁。
有两种方式初始化一个互斥锁:
1️⃣ 第一种,利用已经定义的常量初始化,例如:
1 pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
2️⃣ 第二种方式是调用 pthread_mutex_init(mutex, attr)
进行初始化。
当多个线程同时去锁定同一个互斥锁时,失败的那些线程
如果是用 pthread_mutex_lock
函数,那么会被阻塞,直到这个互斥锁被解锁,它们再继续竞争;
如果是用 pthread_mutex_trylock
函数,那么失败者只会返回一个错误。
最后需要指出的是,保护共享数据是程序员的责任。程序员要负责所有可以访问该数据的线程都使用 mutex
这种机制,否则,不使用 mutex
的线程还是有可能对数据造成破坏。
相关函数:
1 2 3 4 5 6 7 int pthread_mutex_init (pthread_mutex_t *__mutex, const pthread_mutexattr_t *__mutexattr) ;int pthread_mutex_destroy (pthread_mutex_t *__mutex) ;int pthread_mutex_lock (pthread_mutex_t *__mutex) ;int pthread_mutex_unlock (pthread_mutex_t *__mutex) ;int pthread_mutex_trylock (pthread_mutex_t *__mutex) ;int pthread_mutexattr_init (pthread_mutexattr_t *__attr) ;int pthread_mutexattr_destroy (pthread_mutexattr_t *__attr) ;
Example 下面是一个利用多线程进行向量点乘的程序,其中需要对 dotstr.sum
这个共同读写的数据进行保护。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 #include <pthread.h> #include <stdio.h> #include <stdlib.h> typedef struct { double *a; double *b; double sum; int veclen; } DOTDATA; #define NUMTHRDS 4 #define VECLEN 100000 DOTDATA dotstr; pthread_t callThd[NUMTHRDS];pthread_mutex_t mutexsum;void *dotprod (void *arg) { int i, start, end, len; long offset; double mysum, *x, *y; offset = (long )arg; len = dotstr.veclen; start = offset * len; end = start + len; x = dotstr.a; y = dotstr.b; mysum = 0 ; for (i = start; i < end; i++) { mysum += (x[i] * y[i]); } pthread_mutex_lock(&mutexsum); dotstr.sum += mysum; printf ("Thread %ld did %d to %d: mysum=%f global sum=%f\n" , offset, start, end, mysum, dotstr.sum); pthread_mutex_unlock(&mutexsum); pthread_exit((void *)0 ); } int main (int argc, char *argv[]) { long i; double *a, *b; void *status; pthread_attr_t attr; a = (double *)malloc (NUMTHRDS * VECLEN * sizeof (double )); b = (double *)malloc (NUMTHRDS * VECLEN * sizeof (double )); for (i = 0 ; i < VECLEN * NUMTHRDS; i++) { a[i] = 1 ; b[i] = a[i]; } dotstr.veclen = VECLEN; dotstr.a = a; dotstr.b = b; dotstr.sum = 0 ; pthread_mutex_init(&mutexsum, NULL ); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (i = 0 ; i < NUMTHRDS; i++) { pthread_create(&callThd[i], &attr, dotprod, (void *)i); } pthread_attr_destroy(&attr); for (i = 0 ; i < NUMTHRDS; i++) { pthread_join(callThd[i], &status); } printf ("Sum = %f \n" , dotstr.sum); free (a); free (b); pthread_mutex_destroy(&mutexsum); pthread_exit(NULL ); }
1 2 3 4 5 6 $ ./pthread Thread 0 did 0 to 100000: mysum=100000.000000 global sum =100000.000000 Thread 2 did 200000 to 300000: mysum=100000.000000 global sum =200000.000000 Thread 1 did 100000 to 200000: mysum=100000.000000 global sum =300000.000000 Thread 3 did 300000 to 400000: mysum=100000.000000 global sum =400000.000000 Sum = 400000.000000
条件变量 Condition Variable 互斥锁只有两种状态,这限制了它的用途。条件变量允许线程在阻塞的时候等待另一个线程发送的信号,当收到信号后,阻塞的线程就被唤醒并试图锁定与之相关的互斥锁。条件变量要和互斥锁结合使用 。
条件变量的声明和初始化 通过声明 pthread_cond_t
类型的数据,并且必须先初始化才能使用。
初始化的方法也有两种:
1️⃣ 第一种,利用内部定义的常量,例如:
1 pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;
2️⃣ 第二种,利用函数 pthread_cond_init(cond, attr)
,其中 attr 由 pthread_condattr_init()
和 pthread_condattr_destroy()
创建和销毁;可以用 pthread_cond_destroy()
销毁一个条件变量。
相关函数:
1 2 3 int pthread_cond_wait (pthread_cond_t *__restrict__ __cond, pthread_mutex_t *__restrict__ __mutex) ;int pthread_cond_signal (pthread_cond_t *__cond) ;int pthread_cond_broadcast (pthread_cond_t *__cond) ;
pthread_cond_wait()
会阻塞调用它的线程,直到收到某一个信号:这个函数需要在 mutex 已经被锁之后进行调用,并且当线程被阻塞时,会自动解锁 mutex。信号收到后,线程被唤醒,这时 mutex 又会被这个线程锁定。
pthread_cond_signal()
函数结束时,必须解锁 mutex,以供 pthread_cond_wait()
锁定mutex。
当不止一个线程在等待信号时 ,要用 pthread_cond_broadcast()
代替 pthread_cond_signal()
来告诉所有被该条件变量阻塞的线程结束阻塞状态。
Example 下面是一个例子,三个线程共同访问 count
变量,thread 2 和 thread 3 竞争地对其进行加 1 的操作,thread 1 等 count 达到 12 的时候,一次性加 125 。 然后 thread 2 和 thread 3 再去竞争 count 的控制权,直到完成自己的对 count 加 10 次的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NUM_THREADS 3 #define TCOUNT 10 #define COUNT_LIMIT 12 int count = 0 ;pthread_mutex_t count_mutex;pthread_cond_t count_threshold_cv;void *inc_count (void *t) { int i; long my_id = (long )t; for (i = 0 ; i < TCOUNT; i++) { pthread_mutex_lock(&count_mutex); count++; if (count == COUNT_LIMIT) { printf ("inc_count(): thread %ld, count = %d Threshold reached. " , my_id, count); pthread_cond_signal(&count_threshold_cv); printf ("Just sent signal.\n" ); } printf ("inc_count(): thread %ld, count = %d, unlocking mutex\n" , my_id, count); pthread_mutex_unlock(&count_mutex); sleep(1 ); } pthread_exit(NULL ); } void *watch_count (void *t) { long my_id = (long )t; printf ("Starting watch_count(): thread %ld\n" , my_id); pthread_mutex_lock(&count_mutex); while (count < COUNT_LIMIT) { printf ("watch_count(): thread %ld Count= %d. Going into wait...\n" , my_id, count); pthread_cond_wait(&count_threshold_cv, &count_mutex); printf ("watch_count(): thread %ld Condition signal received. Count= %d\n" , my_id, count); printf ("watch_count(): thread %ld Updating the value of count...\n" , my_id, count); count += 125 ; printf ("watch_count(): thread %ld count now = %d.\n" , my_id, count); } printf ("watch_count(): thread %ld Unlocking mutex.\n" , my_id); pthread_mutex_unlock(&count_mutex); pthread_exit(NULL ); } int main (int argc, char *argv[]) { int i, rc; long t1 = 1 , t2 = 2 , t3 = 3 ; pthread_t threads[3 ]; pthread_attr_t attr; pthread_mutex_init(&count_mutex, NULL ); pthread_cond_init(&count_threshold_cv, NULL ); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_create(&threads[0 ], &attr, watch_count, (void *)t1); pthread_create(&threads[1 ], &attr, inc_count, (void *)t2); pthread_create(&threads[2 ], &attr, inc_count, (void *)t3); for (i = 0 ; i < NUM_THREADS; i++) { pthread_join(threads[i], NULL ); } printf ("Main(): Waited and joined with %d threads. Final value of count = %d. Done.\n" , NUM_THREADS, count); pthread_attr_destroy(&attr); pthread_mutex_destroy(&count_mutex); pthread_cond_destroy(&count_threshold_cv); pthread_exit(NULL ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 $ ./pthread Starting watch_count(): thread 1 inc_count(): thread 2, count = 1, unlocking mutex inc_count(): thread 3, count = 2, unlocking mutex watch_count(): thread 1 Count= 2. Going into wait ... inc_count(): thread 2, count = 3, unlocking mutex inc_count(): thread 3, count = 4, unlocking mutex inc_count(): thread 2, count = 5, unlocking mutex inc_count(): thread 3, count = 6, unlocking mutex inc_count(): thread 2, count = 7, unlocking mutex inc_count(): thread 3, count = 8, unlocking mutex inc_count(): thread 2, count = 9, unlocking mutex inc_count(): thread 3, count = 10, unlocking mutex inc_count(): thread 2, count = 11, unlocking mutex inc_count(): thread 3, count = 12 Threshold reached. Just sent signal. inc_count(): thread 3, count = 12, unlocking mutex watch_count(): thread 1 Condition signal received. Count= 12 watch_count(): thread 1 Updating the value of count... watch_count(): thread 1 count now = 137. watch_count(): thread 1 Unlocking mutex. inc_count(): thread 2, count = 138, unlocking mutex inc_count(): thread 3, count = 139, unlocking mutex inc_count(): thread 2, count = 140, unlocking mutex inc_count(): thread 3, count = 141, unlocking mutex inc_count(): thread 2, count = 142, unlocking mutex inc_count(): thread 3, count = 143, unlocking mutex inc_count(): thread 3, count = 144, unlocking mutex inc_count(): thread 2, count = 145, unlocking mutex Main(): Waited and joined with 3 threads. Final value of count = 145. Done.
std::thread 在 C++11 中引入的线程库 std::thread
实际是基于 pthread
实现的,后续主要介绍:
如何使用 std::thread
创建线程
深入剖析 std::thread
的设计原理
使用 std::thread 当你创建了一个(非空的)线程对象时,对应线程就会执行,不需要显式的调用 start
或者 run
(pthread 也是)。如果之前你没有用过 pthread,也许不会理解何为“方便得出人意料”:
pthread_create
只接受 void *f(void *)
,所以如果你想调用现成的函数,还需要包装一下;
而且 pthread_create
其函数接受参数(第四个参数)类型为 void *arg
,如果要传多个参数,还需要定义结构体,接着将结构体转为 void *
类型再传递进去;
这还没完,传递进去的参数还需要在其内部函数中,重新转型成(可能是一次性的)某个结构体,最后才能取出其中的变量;
创建线程后,调用 Thread.join
就会阻塞到线程执行结束为止(相当于pthread_join
)。你也可以选择 detach
该线程,这时候线程会独立执行,不会随调用者终止而结束。
在如下的 demo 中,主线程中使用 std::thread
创建 3 个子线程,线程入口函数是 do_some_work
,在主线程运行结束前等待子线程的结束。
注:在构造线程对象 std::thread{do_some_work, i}
的时候,还是建议使用 {}
而不是 ()
,以防止编译器产生错误的决议,具体原因可以参考文章(深入了解 C++:别再徘徊于 {} 与 () 之间了 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <iostream> #include <thread> #include <vector> const int N_THREADS = 3 ;void do_some_work (int num) { std::cout << "thread: " << num << std::endl; }int main (int argc, char const * argv[]) { std::vector<std::thread> thread_list; thread_list.reserve (N_THREADS); for (int i = 0 ; i < N_THREADS; i++) { thread_list.emplace_back (do_some_work, i); } std::cout << "work in main thread" << std::endl; for (int i = 0 ; i < N_THREADS; i++) { thread_list[i].join (); } std::cout << "main thread end" << std::endl; return 0 ; }
三个子线程共享输出缓冲区 std::cout
,此时没有采取任何机制保护线程间共享数据,因此上面 demo 的输出可能不符合你的预期,即很可能不是按照如下格式输出:
1 2 3 4 $ ./thread thread: 0 thread: 1 thread: 2
实际输出结果(非常混乱):
1 2 3 4 5 6 $ ./thread thread: work in main thread thread: 0 2 thread: 1 main thread end
从输出可以看出:
先创建的线程,未必就先运行;
而且几个线程之间是互相抢 CPU 资源的
线程间数据共享问题及其应对措施,留到后文讲解,下面讲解 std::thread
的设计。
深入剖析 std::thread 在 g++
中,thread
是基于 pthread
实现的 。本次主要从以下三个方面分 std::thread
:
std::thread
对象不可复制,只具有移动属性
每个线程具有唯一的标志,即线程 id
创建子线程(即构造 std::thread
)
1. 移动属性 有很多书籍说,std::thread
对象的所有权只能传递不能复制。实际上,就 std::thread
对象,只具有移动属性,不具有复制属性。std::thread
的构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class thread { private : id _M_id; public : thread () noexcept = default ; template <typename _Callable, typename ... _Args, typename = _Require<__not_same<_Callable>>> explicit thread (_Callable&& __f, _Args&&... __args) { } ~thread () { if (joinable ()) std::terminate (); } thread (const thread&) = delete ; thread& operator =(const thread&) = delete ; thread (thread&& __t ) noexcept { swap (__t ); } thread& operator =(thread&& __t ) noexcept { if (joinable ()) std::terminate (); swap (__t ); return *this ; } }
可以发现,std::thread
禁止了复制构造函数、复制赋值表达式,只留下了移动构造函数、移动赋值,使得 std::thread
对象只能移动,不能复制。这就是之前 demo 中使用 emplace_back
函数添加 std::thread
对象的原因,防止触发复制构造函数。所以向 thread_list 中添加 std::thread
对象有以下几种方式:
当 push_back 接受的是右值,底层调用的还是 emplace_back 函数,因此 4 和 5 是等价的。
1 2 3 4 5 6 7 8 9 10 thread_list.push_back (std::thread{do_some_work, i}); thread_list.emplace_back (do_some_work, i); thread_list.emplace_back (std::thread{do_some_work, i}); std::thread trd{do_some_work, i}; thread_list.push_back (trd); thread_list.push_back (std::move (trd)); thread_list.emplace_back (std::move (trd));
第三种办法报错:
1 2 3 4 5 6 7 8 9 10 /usr/include/c++/9 /ext/new_allocator.h: In instantiation of ‘void __gnu_cxx::new_allocator<_Tp>::construct (_Up*, _Args&& ...) [with _Up = std::thread; _Args = {std::thread&}; _Tp = std::thread]’: /usr/include/c++/9 /bits/alloc_traits.h:483 :4 : required from ‘static void std::allocator_traits<std::allocator<_CharT> >::construct (std::allocator_traits<std::allocator<_CharT> >::allocator_type&, _Up*, _Args&& ...) [with _Up = std::thread; _Args = {std::thread&}; _Tp = std::thread; std::allocator_traits<std::allocator<_CharT> >::allocator_type = std::allocator<std::thread>]’ /usr/include/c++/9 /bits/vector.tcc:115 :30 : required from ‘void std::vector<_Tp, _Alloc>::emplace_back (_Args&& ...) [with _Args = {std::thread&}; _Tp = std::thread; _Alloc = std::allocator<std::thread>]’ _thread.cpp:22 :37 : required from here /usr/include/c++/9 /ext/new_allocator.h:146 :4 : error: use of deleted function ‘std::thread::thread (const std::thread&)’ 146 | { ::new ((void *)__p) _Up(std::forward<_Args>(__args)...); } | ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ In file included from _thread.cpp:3 : /usr/include/c++/9 /thread:142 :5 : note: declared here 142 | thread (const thread&) = delete ;
2. std::thread::id 可以发现,在 std::thread
对象中,只有一个成员变量 _M_id
。
这个类 id 全称是 std::thread::id
,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class id { native_handle_type _M_thread; public : id () noexcept : _M_thread() {} explicit id (native_handle_type __id) : _M_thread(__id) { } private : friend class thread ; friend class hash <thread::id>; friend bool operator ==(thread::id __x, thread::id __y) noexcept ; friend bool operator <(thread::id __x, thread::id __y) noexcept ; template <class _CharT , class _Traits > friend basic_ostream<_CharT, _Traits>& operator <<(basic_ostream<_CharT, _Traits>& __out, thread::id __id); };
因此,这个 std::thread::id
实际上就是封装了 pthread_t
对象,用作每个线程的标志。
在构造 std::thread
对象的时候,如果没有设置线程入口函数,则线程 _M_id._M_thread
的值是 0
比如下面的 demo,trd
没有设置线程入口函数,trd
调用默认构造函数时,trd
的 _M_id._M_thread
会被初始化为 0
1 2 3 4 5 int main (int argc, char const * argv[]) { std::thread trd; std::cout << trd.get_id () << std::endl; return 0 ; }
但是,打印线程标志 trd.get_id()
,输出的是却不是0。这仅仅是 std::thread::id
在重载 <<
操作符时的设定,用于提示调用者线程没有启动。
1 2 $ g++ thread_.cc -o thread_ && ./thread_ thread::id of a non-executing thread
可以到 std::thread::id
重载的 <<
操作符的函数中一探究竟:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 template <class _CharT , class _Traits >inline basic_ostream<_CharT, _Traits>& operator <<(basic_ostream<_CharT, _Traits>& __out, thread::id __id) { if (__id == thread::id ()) return __out << "thread::id of a non-executing thread" ; else return __out << __id._M_thread; } inline bool operator ==(thread::id __x, thread::id __y) noexcept { return __x._M_thread == __y._M_thread; }
因此判断一个线程是否启动,可如下检测:
1 2 3 bool thread_is_active (const std::thread::id& thread_id) { return thread_id != std::thread::id (); }
设置了线程入口函数,_M_id._M_thread
才是线程的tid
值,由 pthread_create(&tid, NULL, ...)
函数设置:
1 2 3 4 5 6 7 int main (int argc, char const * argv[]) { std::thread trd{[] { std::cout << "work in sub-thread\n" ; }}; std::cout << trd.get_id () << std::endl; trd.join (); return 0 ; }
1 2 3 $ ./thread 140203273147968 work in sub-thread
by the way
在创建 std::thread
对象 trd
时:
如果设置了线程入口函数,那么就必须使用 trd.join()
或者 trd.detach()
来表达子线程与主线程的运行关系,否则在 std::thread
对象析构时,整个程序会被 std::terminate()
中止 。
如果没有设置线程入口函数,trd.joinable()
返回值就是 false
,因此不会触发 std::terminate()
。
1 2 3 4 ~thread () { if (joinable ()) std::terminate (); }
3. 创建子线程 当构造 std::thread
对象时,设置了线程入口函数,会在相匹配的构造函数里调用 pthread_create
函数创建子线程。先看整体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 template <typename _Callable, typename ... _Args, typename = _Require<__not_same<_Callable>>> explicit thread (_Callable&& __f, _Args&&... __args){ static_assert ( __is_invocable<typename decay<_Callable>::type, typename decay<_Args>::type...>::value, "std::thread arguments must be invocable after conversion to rvalues" ); auto __depend = reinterpret_cast <void (*)()>(&pthread_create); _M_start_thread(_S_make_state(__make_invoker(std::forward<_Callable>(__f), std::forward<_Args>(__args)...)), __depend); }
再细看构造函数执行流程:
在编译期判断构造 std::thread
对象时设置的线程入口函数 __f
及其参数 __args
能否调用。
比如,下面的 demo 中,线程入口函数 thread_func
有个 int
类型的参数 arg
,如果传入的参数 __args
无法隐式转换为 int
类型,或者没有设置 __args
,都会触发 std::thread
构造函数中的静态断言 static_assert
。
报错:error: static assertion failed: std::thread arguments must be invocable after conversion to rvalues
1 2 3 4 5 6 7 8 9 void thread_func (int arg) { }int main (int argc, char const *argv[]) { std::thread trd_1{thread_func, "str" }; std::thread trd_2{thread_func}; return 0 ; }
将线程入口函数 __f
及其参数 __args
进一步封装起来。
这里是使用 __make_invoker
完成的:
1 __make_invoker(std::forward<_Callable>(__f), std::forward<_Args>(__args)...);
__make_invoker
的作用是返回一个 _Invoker
对象,_Invoker
是个仿函数,通过 _Invoker()
就可以以指定的参数 __args
直接执行线程入口函数 __f
。类似于 std::bind
:
1 2 3 4 5 6 7 8 9 10 void print_num (int i) { std::cout << i << '\n' ; } int main (int argc, const char * argv[]) { auto invoker = std::bind (print_num, -9 ); invoker (); }
启动子线程
在调用 _M_start_thread
函数启动子线程前,执行过程:
创建 _State_ptr
的对象,来封装 _Invoker
对象,再传递给 _M_start_thread
函数。
传递 _M_start_thread
函数的过程,由 _S_make_state
函数完成,_S_make_state
最终返回 _State_ptr
对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 struct _State { virtual ~_State(); virtual void _M_run() = 0 ; }; using _State_ptr = unique_ptr<_State>; template <typename _Callable>struct _State_impl : public _State { _Callable _M_func; _State_impl(_Callable&& __f) : _M_func(std::forward<_Callable>(__f)) { } void _M_run() { _M_func(); } }; template <typename _Callable>static _State_ptr _S_make_state(_Callable&& __f) { using _Impl = _State_impl<_Callable>; return _State_ptr{new _Impl{std::forward<_Callable>(__f)}}; }
_S_make_state
函数,将线程入口函数 __f
及其参数 __args
封装到 _State_ptr
对象 _State_ptr_obj
中, 这样最后可以通过 _State_ptr_obj->_M_run()
来调用 __f
。
下面到了 _M_start_thread
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void thread::_M_start_thread(_State_ptr state, void (*)()){ const int err = __gthread_create(&_M_id._M_thread, &execute_native_thread_routine, state.get ()); if (err) __throw_system_error(err); state.release (); } static inline int __gthread_create(pthread_t *__threadid, void *(*__func) (void *), void *__args){ return pthread_create (__threadid, NULL , __func, __args); } static void * execute_native_thread_routine (void * __p) { thread::_State_ptr __t {static_cast <thread::_State*>(__p)}; __t ->_M_run(); return nullptr ; }
因此,在执行完 _M_start_thread
函数后,才具有 _M_start_thread != 0
。
Mutex 有时候需要限制多个线程对同一资源的访问,这时候一般会使用 Mutex
。Mutex 就是一把锁,只有某些线程可以同时占用它(通过 lock 操作)。当线程不用的时候,就得通过 unlock 操作来释放它。
对于 Mutex,std::thread
和 pthread
差不多,无非是 pthread_mutex_lock(&mutex)
变成了 mutex.lock()
等等。
不过在 std::thread
中,mutex 往往和 lock 系列模板一起使用。这是因为 lock 系列模板包装了 mutex 类,提供了 RAII 风格的加锁解锁 。
1 2 3 4 5 6 7 { std::lock_guard<std::mutex> guard (mutex) ; ... }
mutex 有四种:
std::mutex
:独占的互斥量,不能递归使用,不带超时功能
std::recursive_mutex
:递归互斥量,可重入,不带超时功能
std::timed_mutex
:带超时的互斥量,不能递归
std::recursive_timed_mutex
:带超时的互斥量,可以递归使用
加解锁方式有三种:
std::lock_guard
:可以RAII方式加锁
std::unique_lock
:比 lock_guard
多了个手动加解锁的功能
std::scoped_lock
:防止多个锁顺序问题导致的死锁问题而出世的一把锁
Condition Variable 有时候线程之间需要某种同步:当某些条件不满足时,停止执行直到该条件被满足。
这时候需要引入 condition variable —— 状态变量。
在经典的「生产者消费者模式」下,生产者和消费者就是通过 condition variable 来实现同步的。
当有限的生产力无法满足日益增长的消费需求时,消费者进程就会去睡一觉,直到它想要的东西生产出来才醒来。
1 2 3 4 5 6 7 8 std::condition_variable condvar; consumer: std::unique_lock<std::mutex> ulock (mutex) ; condvar.wait (ulock, []{ return msgQueue.size () > 0 ;}); producer: condvar.notify_all ();
condition_variable
需要和 unique_lock
搭配使用
在一个线程调用 wait
之前,它必须持有 unique_lock
锁
当 wait
被调用时,该锁会被释放,线程会陷入沉睡,等待着生产者发过来的唤醒信号
当生产者调用同一个 condition_variable
的 notify_all
方法时,所有沉睡在该变量前的消费者会被唤醒,并尝试重新获取之前释放的 unique_lock
,继续执行下去(注意这里发生了锁争用,只有一个消费者能够获得锁,其他消费者得等待该消费者释放锁)
如果只想叫醒一个线程,可以用 notify_one
pthread
中也提供了对应的方法,分别是 pthread_cond_wait
, pthread_cond_broadcast
, pthread_cond_signal
wait
可以接受两个参数,此时第二个参数用于判断当前是否要沉睡。
1 []{ return msgQueue.size () > 0 ;});
相当于
1 2 3 while (msgQueue.size () <= 0 ) { condvar.wait () }
为了防止线程无限等待(可能一直没有唤醒),通过 wait_until
和 wait_for
,你可以设定线程的等待时间。设置 notify_all_at_thread_exit
也许能帮得上忙。
在 pthread
中,对应的调用是 pthread_cond_timedwait
。
More C++11 的线程库还提供了其他多线程编程的概念,比如 future
和 atomic
。
future future 位于头文件 <future>
下,包装了未来某个计算结果的期诺。
当你对所获得的 future
调用 get
时,程序会一直阻塞直到 future 的值被计算出来(如果 future 的值已经计算出来了,get 调用会立刻获得返回值),而这一切都是在后台执行的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <chrono> #include <iostream> #include <future> using namespace std;int main () { future<int > f1 = async (launch::async, [](){ std::chrono::milliseconds dura (2000 ); std::this_thread::sleep_for (dura); return 0 ; }); future<int > f2 = async (launch::async, [](){ std::chrono::milliseconds dura (2000 ); std::this_thread::sleep_for (dura); return 1 ; }); cout << "Results are: " << f1. get () << " " << f2. get () << "\n" ; return 0 ; }
1 2 3 4 5 $ g++ -std=c++11 -pthread ./future.cpp $ time ./a.out Results are: 0 1 ./a.out 0.00s user 0.00s system 0% cpu 2.012 total
除了 async
, packaged_task
和 promise
也都返回一个 future
。
atomic atomic
位于头文件 <atomic>
下,实现了类似于 java.util.concurrent.atomic
的功能。它提供了一组轻量级的、作用在单个变量上的原子操作,是 volatile
的替代品,有些时候你也可以用它来替换掉 lock
(假如整个 race condition 中只有单个变量)
下面这个例子解释了什么叫做原子操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 #include <atomic> #include <iostream> #include <thread> using namespace std;const int NUM = 100 ;int target = 0 ;atomic<int > atomicTarget (0 ) ;template <typename T>void atomicPlusOne (int trys) { while (trys > 0 ) { atomicTarget.fetch_add (1 ); --trys; } } void plusOne (int trys) { while (trys > 0 ) { ++target; --trys; } } int main () { thread threads[NUM]; thread atomicThreads[NUM]; for (int i = 0 ; i < NUM; i++) { atomicThreads[i] = thread (atomicPlusOne<int >, 10000 ); } for (int i = 0 ; i < NUM; i++) { threads[i] = thread (plusOne, 10000 ); } for (int i = 0 ; i < NUM; i++) { atomicThreads[i].join (); } for (int i = 0 ; i < NUM; i++) { threads[i].join (); } cout << "Atomic target's value : " << atomicTarget << "\n" ; cout << "Non-atomic target's value : " << target << "\n" ; return 0 ; }
1 2 3 4 $ g++ -std=c++11 -pthread ./atom.cpp Atomic target's value : 1000000 Non-atomic target' s value : 842480
Pros & Cons 最后总结下 std::thread
对比于 pthread
的优缺点:
优点:
简单,易用
跨平台,pthread
只能用在 POSIX 系统上(其他系统有其独立的 thread 实现)
提供了更多高级功能,比如 future
更加 C++(与匿名函数 ,std::bind
,RAII 等 C++ 特性更好的集成)
缺点:
没有 RWlock:有一个类似的 shared_mutex
,不过它属于 C++14,你的编译器很有可能不支持
操作线程和 Mutex 等的 API 较少:毕竟为了跨平台,只能选取各原生实现的子集。如果你需要设置某些属性,需要通过 API 调用返回原生平台上的对应对象,再对返回的对象进行操作。
生产者消费者(pthread & thread 版本) 附上我自己写的,分别用 std::thread
和 pthread
实现的多生产者多消费者程序。
注意行数上的差距。
pthread 版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 #include <pthread.h> #include <queue> #include <stdio.h> #include <unistd.h> pthread_mutex_t mutex;pthread_cond_t condvar;std ::queue <int > msgQueue;struct Produce_range { int start; int end; }; void *producer (void *args) { int start = static_cast<Produce_range *>(args)->start; int end = static_cast<Produce_range *>(args)->end; for (int x = start; x < end; x++) { usleep(200 * 1000 ); pthread_mutex_lock(&mutex); msgQueue.push(x); pthread_mutex_unlock(&mutex); pthread_cond_signal(&condvar); printf ("Produce message %d\n" , x); } pthread_exit((void *)0 ); return NULL ; } void *consumer (void *args) { int demand = *static_cast<int *>(args); while (true ) { pthread_mutex_lock(&mutex); if (msgQueue.size() <= 0 ) { pthread_cond_wait(&condvar, &mutex); } if (msgQueue.size() > 0 ) { printf ("Consume message %d\n" , msgQueue.front()); msgQueue.pop(); --demand; } pthread_mutex_unlock(&mutex); if (!demand) break ; } pthread_exit((void *)0 ); return NULL ; } int main () { pthread_attr_t attr; pthread_attr_init(&attr); pthread_mutex_init(&mutex, NULL ); pthread_cond_init(&condvar, NULL ); pthread_t producer1, producer2, producer3, consumer1, consumer2; Produce_range range1 = {0 , 10 }; pthread_create(&producer1, &attr, producer, static_cast<void *>(&range1)); Produce_range range2 = {range1.end, range1.end + 10 }; pthread_create(&producer2, &attr, producer, static_cast<void *>(&range2)); Produce_range range3 = {range2.end, range2.end + 10 }; pthread_create(&producer3, &attr, producer, static_cast<void *>(&range3)); int consume_demand1 = 20 ; int consume_demand2 = 10 ; pthread_create(&consumer1, &attr, consumer, static_cast<void *>(&consume_demand1)); pthread_create(&consumer2, &attr, consumer, static_cast<void *>(&consume_demand2)); pthread_join(producer1, NULL ); pthread_join(producer2, NULL ); pthread_join(producer3, NULL ); pthread_join(consumer1, NULL ); pthread_join(consumer2, NULL ); }
std::thread 版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 #include <chrono> #include <condition_variable> #include <future> #include <mutex> #include <queue> std::mutex mutex; std::condition_variable condvar; std::queue<int > msgQueue; void producer (int start, int end) { for (int x = start; x < end; x++) { std::this_thread::sleep_for (std::chrono::milliseconds (200 )); { std::lock_guard<std::mutex> guard (mutex) ; msgQueue.push (x); } printf ("Produce message %d\n" , x); condvar.notify_all (); } } void consumer (int demand) { while (true ) { std::unique_lock<std::mutex> ulock (mutex) ; condvar.wait (ulock, []{ return msgQueue.size () > 0 ;}); printf ("Consume message %d\n" , msgQueue.front ()); msgQueue.pop (); --demand; if (!demand) break ; } } int main () { std::thread producer1 (producer, 0 , 10 ) ; std::thread producer2 (producer, 10 , 20 ) ; std::thread producer3 (producer, 20 , 30 ) ; std::thread consumer1 (consumer, 20 ) ; std::thread consumer2 (consumer, 10 ) ; producer1. join (); producer2. join (); producer3. join (); consumer1. join (); consumer2. join (); }
参考资料