在 C++ 开发中,原生的线程库主要有两个,一个是 Linux 下的 <pthread.h>,另一个是 C++11 提供的 <thread>

以前一直用的是 pthread 的 API 写 C++ 的多线程程序,直到听说从 C++11 开始的标准库已经包含了对线程的支持。

推荐阅读:https://chengxumiaodaren.com/docs/concurrent/

在 C++ 开发中,原生的线程库主要有两个,一个是 Linux 下的 <pthread.h>,另一个是 C++11 提供的 <thread>

以前一直用的是 pthread 的 API 写 C++ 的多线程程序,直到听说从 C++11 开始的标准库已经包含了对线程的支持。

pthread

pthread 中的 p 是 POSIX (Portable Operating System Interface) 的缩写,是 IEEE 为了在各种 UNIX 操作系统上运行软件,而定义 API 的一系列互相关联的标准总称。相比于 std::thread 的简便易用,pthread 功能比较强大。

线程的创建和管理

创建线程|pthread_create

每个线程都有一个在进程中唯一的线程标识符,用一个数据类型 pthread_t 表示,该数据类型在 Linux 中就是一个无符号长整型数据。

1
int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);

若创建成功,返回 0;若出错,则返回错误编号:

  • thread 是线程标识符,但这个参数不是由用户指定的,而是由 pthread_create 函数在创建时将新线程的标识符放到这个变量中
  • attr 指定线程的属性,可以用 NULL 表示默认属性
  • start_routine 指定线程开始运行的函数
  • arg 是 start_routine 所需的参数,是一个无类型指针

默认地,线程在被创建时要被赋予一定的属性,这个属性存放在数据类型 pthread_attr_t 中,它包含了线程的调度策略,堆栈的相关信息,join or detach 的状态等。

pthread_attr_initpthread_attr_destroy 函数分别用来创建和销毁 pthread_attr_t,具体函数声明可参考 man 手册帮助。

结束线程|pthread_exitpthread_cancel

当发生以下情形之一时,线程就会结束:

  • 线程运行的函数 return 了,也就是线程的任务已经完成;
  • 线程调用了 pthread_exit()
  • 其他线程调用 pthread_cancel() 结束了线程;
  • 进程调用 exec()exit() 结束;
  • main() 函数先结束了,而且 main() 自己没有调用 pthread_exit() 来等所有线程完成任务。

更抽象地说,线程结束执行的方式共有 3 种,分别是:

  1. 线程将指定函数体中的代码执行完后自行结束;
  2. 线程执行过程中,遇到 pthread_exit() 函数结束执行。
  3. 线程执行过程中,被同一进程中的其它线程(包括主线程)强制终止;

当然,一个线程结束,并不意味着它的所有信息都已经消失,后面会看到僵尸线程的问题。

下面介绍两个函数:

1
void pthread_exit(void *retval);
  • retval 是由用户指定的参数,pthread_exit 完成之后可以通过这个参数获得线程的退出状态/信息。
1
int pthread_cancel(pthread_t thread);
  • 一个线程可以通过调用 pthread_cancel 函数来请求取消同一进程中的线程,这个线程由 thread 参数指定。
  • 如果操作成功则返回 0,失败则返回对应的错误编码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h> // sleep() 函数

// 线程执行的函数
void* thread_Fun(void* arg) {
printf("新建线程开始执行\n");
sleep(10);
}

int main() {
pthread_t myThread;
void* mess;
int value;
int res;

// 创建 myThread 线程
res = pthread_create(&myThread, NULL, thread_Fun, NULL);
if (res != 0) {
printf("线程创建失败\n");
return 0;
}
sleep(1);

// 向 myThread 线程发送 Cancel 信号
res = pthread_cancel(myThread);
if (res != 0) {
printf("终止 myThread 线程失败\n");
return 0;
}

// 获取已终止线程的返回值
res = pthread_join(myThread, &mess);
if (res != 0) {
printf("等待线程失败\n");
return 0;
}

// 如果线程被强制终止,其返回值为 PTHREAD_CANCELED
if (mess == PTHREAD_CANCELED) {
printf("myThread 线程被强制终止\n");
} else {
printf("error\n");
}
return 0;
}
1
2
3
$ ./pthread
新建线程开始执行
myThread 线程被强制终止

一个简单的多线程实现

这是一个非常简单的基于 pthread 的多线程实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 5

void *printHello(void *thread_id) {
long tid;
tid = (long)thread_id;
printf("Hello World! It's me, thread #%ld!\n", tid);
pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc;
long t;
for (t = 0; t < NUM_THREADS; t++) {
printf("In main: creating thread %ld\n", t);
rc = pthread_create(&threads[t], NULL, printHello, (void *)t);
if (rc) {
printf("ERROR; return code frome pthread_create() is %d\n", rc);
exit(-1);
}
}
// Last thing that main() should do
pthread_exit(NULL);
}
1
2
gcc -Wall _pthread.c -lpthread -o pthread
./pthread
1
2
3
4
5
6
7
8
9
10
In main: creating thread 0
In main: creating thread 1
Hello World! It's me, thread #0!
In main: creating thread 2
Hello World! It's me, thread #1!
In main: creating thread 3
Hello World! It's me, thread #2!
In main: creating thread 4
Hello World! It's me, thread #3!
Hello World! It's me, thread #4!

注意输出的顺序可能不同, 要特别注意的是,main() 显示地调用了 pthread_exit() 来等待其他线程的结束(如果不使用这个函数的话,可能 main() 函数结束了也有线程没有执行完毕)

给线程传入初始化参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 8

char *messages[NUM_THREADS];

struct thread_data {
int tid;
int sum;
char *msg;
};

struct thread_data _datas[NUM_THREADS];

void *printHello(void *thread_arg) {
int task_id, sum;
char *hello_msg;
struct thread_data *my_data;

my_data = (struct thread_data *)thread_arg;
task_id = my_data->tid;
sum = my_data->sum;
hello_msg = my_data->msg;

// sleep(1);
printf("Thread %d: %s Sum = %d\n", task_id, hello_msg, sum);
pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int *task_ids[NUM_THREADS];
int rc, t, sum;

sum = 0;
messages[0] = "English: Hello World!";
messages[1] = "French: Bonjour, le monde!";
messages[2] = "Spanish: Hola al mundo";
messages[3] = "Klingon: Nuq neH!";
messages[4] = "German: Guten Tag, Welt!";
messages[5] = "Russian: Zdravstvytye, mir!";
messages[6] = "Japan: Sekai e konnichiwa!";
messages[7] = "Latin: Orbis, te saluto!";

for (t = 0; t < NUM_THREADS; t++) {
sum = sum + t;
_datas[t].tid = t;
_datas[t].sum = sum;
_datas[t].msg = messages[t];
printf("Creating thread %d\n", t);
rc = pthread_create(&threads[t], NULL, printHello, (void *)&_datas[t]);
if (rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
pthread_exit(NULL);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Creating thread 0
Creating thread 1
Thread 0: English: Hello World! Sum = 0
Creating thread 2
Thread 1: French: Bonjour, le monde! Sum = 1
Creating thread 3
Thread 2: Spanish: Hola al mundo Sum = 3
Creating thread 4
Thread 3: Klingon: Nuq neH! Sum = 6
Creating thread 5
Thread 4: German: Guten Tag, Welt! Sum = 10
Creating thread 6
Thread 5: Russian: Zdravstvytye, mir! Sum = 15
Creating thread 7
Thread 6: Japan: Sekai e konnichiwa! Sum = 21
Thread 7: Latin: Orbis, te saluto! Sum = 28

对线程的阻塞|pthread_joinpthread_detach

阻塞时线程之间「同步」的一种方法。

1
int pthread_join(pthread_t thread_id, void **value_ptr);
  • pthread_join 函数会让调用它的线程等待 thread_id 线程运行结束之后再运行(如果是 main() 调用,则阻塞 main 线程,直到 join 的所有线程执行结束 —— 常用于等待 main 中创建的所有线程执行完毕)
  • value_ptr 存放了其他线程的返回值

一个可以被 join 的线程,仅仅可以被另一个线程 join,如果同时有多个线程尝试 join 同一个线程时,最终结果是未知的;另外,线程不能 join 自己。上面提到过,创建一个线程时,要赋予它一定的属性,这其中就包括 joinable or detachable 的属性,只有被声明称 joinable 的线程才可以被其他线程 join。

POSIX 标准的最终版本指出线程应该被设置成 joinable 的,显式设置一个线程为 joinable,需要以下四个步骤:

  • Declare a pthread attribute variable of the pthread_attr_t data type
  • Initialize the attribute variable with pthread_attr_init()
  • Set the attribute detached status with pthread_attr_setdetchstate()
  • When done, free library resources used by the attribute with pthread_attr_destroy()

pthread_join() 函数会一直阻塞调用它的线程,直至目标线程执行结束(接收到目标线程的返回值),阻塞状态才会解除。如果 pthread_join() 函数成功等到了目标线程执行结束(成功获取到目标线程的返回值),返回值为数字 0;反之如果执行失败,函数会根据失败原因返回相应的非零值,每个非零值都对应着不同的宏,例如:

  • EDEADLK:检测到线程发生了死锁。
  • EINVAL:分为两种情况,要么目标线程本身不允许其它线程获取它的返回值,要么事先就已经有线程调用 pthread_join() 函数获取到了目标线程的返回值。
  • ESRCH:找不到指定的 thread 线程。

再次强调,一个线程执行结束的返回值只能由一个 pthread_join() 函数获取,当有多个线程调用 pthread_join() 函数获取同一个线程的执行结果时,哪个线程最先执行 pthread_join() 函数,执行结果就由那个线程获得,其它线程的 pthread_join() 函数都将执行失败。

对于一个默认属性的线程 A 来说,线程占用的资源并不会因为执行结束而得到释放。而通过在其它线程中执行pthread_join(A,NULL);语句,可以轻松实现“及时释放线程 A 所占资源”的目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <errno.h>  //使用宏 ESRCH
#include <pthread.h>
#include <stdio.h>

// 线程执行的函数
void *ThreadFun(void *arg) { pthread_exit("test_msg"); }

int main() {
int res;
void *thread_result;
pthread_t myThread;
// 创建 myThread 线程
res = pthread_create(&myThread, NULL, ThreadFun, NULL);
if (res != 0) {
printf("线程创建失败");
return 0;
}
// 阻塞主线程,等待 myThread 线程执行结束
res = pthread_join(myThread, &thread_result);
if (res != 0) {
printf("1:等待线程失败");
}
// 输出获取到的 myThread 线程的返回值
printf("%s\n", (char *)thread_result);

// 尝试再次获取 myThread 线程的返回值
res = pthread_join(myThread, &thread_result);
if (res == ESRCH) {
printf("2:等待线程失败,线程不存在\n");
}
return 0;
}
1
2
3
$ ./pthread
test_msg
2:等待线程失败,线程不存在

__detachstate 属性值用于指定线程终止执行的时机,该属性的值有两个,分别是:

  • PTHREAD_CREATE_JOINABLE(默认值):线程执行完函数后不会自行释放资源;
  • PTHREAD_CREATE_DETACHED:线程执行完函数后,会自行终止并释放占用的资源。

还有 pthread_detach() 函数,可以直接将目标线程的 __detachstate 属性改为 PTHREAD_CREATE_DETACHED,语法格式如下:

1
int pthread_detach(pthread_t thread);

关于 __detachstate 属性,<pthread.h> 头文件中提供了 2 个与它相关的函数,分别是:

1
2
int pthread_attr_getdetachstate(const pthread_attr_t * attr,int * detachstate);
int pthread_attr_setdetachstate(pthread_attr_t *sttr,int detachstate);

可以如下创建 detach 状态的线程:

1
2
3
4
5
pthread_t tid;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&tid, &attr, THREAD_FUNCTION, arg);

⚠️ 值得注意的是:僵尸线程(zombie thread)是一种已经退出了的 joinable 线程,但是等待其他线程调用 pthread_join 来 join 它,以收集它的退出信息。如果没有其他线程调用 pthread_join 来 join 它的话,它占用的一些系统资源不会被释放,比如堆栈。如果 main() 函数需要长时间运行,并且创建大量 joinable 的线程,就有可能出现堆栈不足的 error。

⚠️ 对于那些不需要 join 的线程,最好利用 pthread_detach,这样它运行结束后,资源就会及时得到释放。注意一个线程被使用 pthread_detach 之后,它就不能再被改成 joinable 的了。

⚠️ 总而言之,创建的每一个线程都应该使用 pthread_join 或者 pthread_detach 其中一个,以防止僵尸线程的出现。

Linux 线程属性之线程栈大小|pthread_attr_t

线程的属性用 pthread_attr_t 类型的变量表示,使用此变量前,必须调用 pthread_attr_init() 函数进行初始化:

1
int pthread_attr_init(pthread_attr_t * attr);

pthread_attr_t 是一种结构体类型,内部包含多种线程属性(更多内容请看参考资料):

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct
{
int __detachstate; // 用于指定线程终止执行的时机
int __schedpolicy; // 指定系统调度该线程所用的算法
struct sched_param __schedparam; // 设置线程的优先级
int __inheritsched; // 默认遵循父线程的属性, 用于自定义线程的调度属性
int __scope; // 用于指定目标线程和哪些线程抢夺 CPU 资源
size_t __guardsize; // 用来设置警戒缓冲区的大小
int __stackaddr_set;
void* __stackaddr;
size_t __stacksize; // 每个线程都有属于自己的内存空间, 线程执行如果需要较大的栈内存,就需要自定义线程拥有的栈大小
} pthread_attr_t;

POSIX 标准没有规定一个线程的堆栈大小,安全可移植的程序不会依赖于具体实现默认的堆栈限制,而是显式地调用 pthread_attr_setstacksize 来分配足够的堆栈空间。

关于堆栈大小的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <pthread.h>

#define N_THREADS 5
#define N 1000
#define MEGEXTRA 1000000

pthread_attr_t _attr;

void* do_work(void* thread_id) {
double A[N][N];
int i, j;
long tid;
size_t my_stack_size;
tid = (long)thread_id;
pthread_attr_getstacksize(&_attr, &my_stack_size);
printf("Thread %ld: stack size = %ld bytes \n", tid, my_stack_size);
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
A[i][j] = ((i * j) / 3.452) + (N - i);
}
}
pthread_exit(NULL);
}

int main(int argc, char* argv[]) {
pthread_t threads[N_THREADS];
size_t stack_size;
int rc;
long t;

pthread_attr_init(&_attr);
pthread_attr_getstacksize(&_attr, &stack_size);
printf("Default stack size = %li\n", stack_size); // 线程栈大小: 8 MB

stack_size = sizeof(double) * N * N + MEGEXTRA;
printf("Amount of stack needed per thread = %li\n", stack_size);

pthread_attr_setstacksize(&_attr, stack_size);
printf("Creating threads with stack size = %li bytes\n", stack_size);

for (t = 0; t < N_THREADS; t++) {
rc = pthread_create(&threads[t], &_attr, do_work, (void*)t);
if (rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
printf("Creating %ld threads.\n", t);
pthread_exit(NULL);
}
1
2
3
4
5
6
7
8
9
10
$ ./pthread 
Default stack size = 8388608
Amount of stack needed per thread = 9000000
Creating threads with stack size = 9000000 bytes
Creating 5 threads.
Thread 1: stack size = 9000000 bytes
Thread 2: stack size = 9000000 bytes
Thread 0: stack size = 9000000 bytes
Thread 3: stack size = 9000000 bytes
Thread 4: stack size = 9000000 bytes

其他相关函数:

1
2
3
4
// 返回 thread ID
pthread_self();
// 比较两个线程的 ID, 如果不同则返回 0, 否则返回一个非零值
pthread_equal(thread_1, thread_2);

互斥锁 Mutex

Mutex 常常被用来保护那些可以被多个线程访问的共享资源,比如可以防止多个线程同时更新同一个数据时出现混乱。

使用互斥锁的一般步骤是:

  • 创建一个互斥锁,即声明一个 pthread_mutex_t 类型的数据,然后初始化,只有初始化之后才能使用;
  • 多个线程尝试锁定这个互斥锁;
  • 只有一个成功锁定互斥锁,成为互斥锁的拥有者,然后进行一些指令;
  • 拥有者解锁互斥锁;
  • 其他线程尝试锁定这个互斥锁,重复上面的过程;
  • 最后互斥锁被显式地调用 pthread_mutex_destroy 来进行销毁。

有两种方式初始化一个互斥锁:

1️⃣ 第一种,利用已经定义的常量初始化,例如:

1
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;

2️⃣ 第二种方式是调用 pthread_mutex_init(mutex, attr) 进行初始化。

当多个线程同时去锁定同一个互斥锁时,失败的那些线程

  • 如果是用 pthread_mutex_lock 函数,那么会被阻塞,直到这个互斥锁被解锁,它们再继续竞争;
  • 如果是用 pthread_mutex_trylock 函数,那么失败者只会返回一个错误。

最后需要指出的是,保护共享数据是程序员的责任。程序员要负责所有可以访问该数据的线程都使用 mutex 这种机制,否则,不使用 mutex 的线程还是有可能对数据造成破坏。

相关函数:

1
2
3
4
5
6
7
int pthread_mutex_init(pthread_mutex_t *__mutex, const pthread_mutexattr_t *__mutexattr);
int pthread_mutex_destroy(pthread_mutex_t *__mutex);
int pthread_mutex_lock(pthread_mutex_t *__mutex);
int pthread_mutex_unlock(pthread_mutex_t *__mutex);
int pthread_mutex_trylock(pthread_mutex_t *__mutex);
int pthread_mutexattr_init(pthread_mutexattr_t *__attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *__attr);

Example

下面是一个利用多线程进行向量点乘的程序,其中需要对 dotstr.sum 这个共同读写的数据进行保护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/*
The following structure contains the necessary information
to allow the function "dotprod" to access its input data and
place its output into the structure. This structure is
unchanged from the sequential version.
*/

typedef struct {
double *a;
double *b;
double sum;
int veclen;
} DOTDATA;

/* Define globally accessible variables and a mutex */

#define NUMTHRDS 4
#define VECLEN 100000
DOTDATA dotstr;
pthread_t callThd[NUMTHRDS];
pthread_mutex_t mutexsum;

/*
The function dotprod is activated when the thread is created.
As before, all input to this routine is obtained from a structure
of type DOTDATA and all output from this function is written into
this structure. The benefit of this approach is apparent for the
multi-threaded program: when a thread is created we pass a single
argument to the activated function - typically this argument
is a thread number. All the other information required by the
function is accessed from the globally accessible structure.
*/

void *dotprod(void *arg) {
/* Define and use local variables for convenience */

int i, start, end, len;
long offset;
double mysum, *x, *y;
offset = (long)arg;

len = dotstr.veclen;
start = offset * len;
end = start + len;
x = dotstr.a;
y = dotstr.b;

/*
Perform the dot product and assign result
to the appropriate variable in the structure.
*/
mysum = 0;
for (i = start; i < end; i++) {
mysum += (x[i] * y[i]);
}

/*
Lock a mutex prior to updating the value in the shared
structure, and unlock it upon updating.
*/
pthread_mutex_lock(&mutexsum);
dotstr.sum += mysum;
printf("Thread %ld did %d to %d: mysum=%f global sum=%f\n", offset, start, end, mysum, dotstr.sum);
pthread_mutex_unlock(&mutexsum);
pthread_exit((void *)0);
}

/*
The main program creates threads which do all the work and then print out result
upon completion. Before creating the threads, The input data is created. Since
all threads update a shared structure, we need a mutex for mutual exclusion.
The main thread needs to wait for all threads to complete, it waits for each one
of the threads. We specify a thread attribute value that allow the main thread to
join with the threads it creates. Note also that we free up handles when they
are no longer needed.
*/

int main(int argc, char *argv[]) {
long i;
double *a, *b;
void *status;
pthread_attr_t attr;

/* Assign storage and initialize values */

a = (double *)malloc(NUMTHRDS * VECLEN * sizeof(double));
b = (double *)malloc(NUMTHRDS * VECLEN * sizeof(double));

for (i = 0; i < VECLEN * NUMTHRDS; i++) {
a[i] = 1;
b[i] = a[i];
}

dotstr.veclen = VECLEN;
dotstr.a = a;
dotstr.b = b;
dotstr.sum = 0;

pthread_mutex_init(&mutexsum, NULL);

/* Create threads to perform the dotproduct */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

for (i = 0; i < NUMTHRDS; i++) {
/* Each thread works on a different set of data.
* The offset is specified by 'i'. The size of
* the data for each thread is indicated by VECLEN.
*/
pthread_create(&callThd[i], &attr, dotprod, (void *)i);
}

pthread_attr_destroy(&attr);

/* Wait on the other threads */
for (i = 0; i < NUMTHRDS; i++) {
pthread_join(callThd[i], &status);
}
/* After joining, print out the results and cleanup */

printf("Sum = %f \n", dotstr.sum);
free(a);
free(b);
pthread_mutex_destroy(&mutexsum);
pthread_exit(NULL);
}
1
2
3
4
5
6
$ ./pthread 
Thread 0 did 0 to 100000: mysum=100000.000000 global sum=100000.000000
Thread 2 did 200000 to 300000: mysum=100000.000000 global sum=200000.000000
Thread 1 did 100000 to 200000: mysum=100000.000000 global sum=300000.000000
Thread 3 did 300000 to 400000: mysum=100000.000000 global sum=400000.000000
Sum = 400000.000000

条件变量 Condition Variable

互斥锁只有两种状态,这限制了它的用途。条件变量允许线程在阻塞的时候等待另一个线程发送的信号,当收到信号后,阻塞的线程就被唤醒并试图锁定与之相关的互斥锁。条件变量要和互斥锁结合使用

条件变量的声明和初始化

通过声明 pthread_cond_t 类型的数据,并且必须先初始化才能使用。

初始化的方法也有两种:

1️⃣ 第一种,利用内部定义的常量,例如:

1
pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;

2️⃣ 第二种,利用函数 pthread_cond_init(cond, attr),其中 attr 由 pthread_condattr_init()pthread_condattr_destroy() 创建和销毁;可以用 pthread_cond_destroy() 销毁一个条件变量。

相关函数:

1
2
3
int pthread_cond_wait(pthread_cond_t *__restrict__ __cond, pthread_mutex_t *__restrict__ __mutex);
int pthread_cond_signal(pthread_cond_t *__cond);
int pthread_cond_broadcast(pthread_cond_t *__cond);
  • pthread_cond_wait() 会阻塞调用它的线程,直到收到某一个信号:这个函数需要在 mutex 已经被锁之后进行调用,并且当线程被阻塞时,会自动解锁 mutex。信号收到后,线程被唤醒,这时 mutex 又会被这个线程锁定。
  • pthread_cond_signal() 函数结束时,必须解锁 mutex,以供 pthread_cond_wait() 锁定mutex。
  • 当不止一个线程在等待信号时,要用 pthread_cond_broadcast() 代替 pthread_cond_signal() 来告诉所有被该条件变量阻塞的线程结束阻塞状态。

Example

下面是一个例子,三个线程共同访问 count 变量,thread 2 和 thread 3 竞争地对其进行加 1 的操作,thread 1 等 count 达到 12 的时候,一次性加 125 。 然后 thread 2 和 thread 3 再去竞争 count 的控制权,直到完成自己的对 count 加 10 次的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 3
#define TCOUNT 10
#define COUNT_LIMIT 12

int count = 0;
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;

void *inc_count(void *t) {
int i;
long my_id = (long)t;

for (i = 0; i < TCOUNT; i++) {
pthread_mutex_lock(&count_mutex);
count++;

/*
Check the value of count and signal waiting thread when condition is
reached. Note that this occurs while mutex is locked.
*/
if (count == COUNT_LIMIT) {
printf("inc_count(): thread %ld, count = %d Threshold reached. ", my_id, count);
pthread_cond_signal(&count_threshold_cv);
printf("Just sent signal.\n");
}
printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id, count);
pthread_mutex_unlock(&count_mutex);

/* Do some work so threads can alternate on mutex lock */
sleep(1);
}
pthread_exit(NULL);
}

void *watch_count(void *t) {
long my_id = (long)t;

printf("Starting watch_count(): thread %ld\n", my_id);

/*
Lock mutex and wait for signal. Note that the pthread_cond_wait routine
will automatically and atomically unlock mutex while it waits.
Also, note that if COUNT_LIMIT is reached before this routine is run by
the waiting thread, the loop will be skipped to prevent pthread_cond_wait
from never returning.
*/
pthread_mutex_lock(&count_mutex);
while (count < COUNT_LIMIT) {
printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id, count);
pthread_cond_wait(&count_threshold_cv, &count_mutex);
printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id, count);
printf("watch_count(): thread %ld Updating the value of count...\n", my_id, count);
count += 125;
printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
}
printf("watch_count(): thread %ld Unlocking mutex.\n", my_id);
pthread_mutex_unlock(&count_mutex);
pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
int i, rc;
long t1 = 1, t2 = 2, t3 = 3;
pthread_t threads[3];
pthread_attr_t attr;

/* Initialize mutex and condition variable objects */
pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init(&count_threshold_cv, NULL);

/* For portability, explicitly create threads in a joinable state */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threads[0], &attr, watch_count, (void *)t1);
pthread_create(&threads[1], &attr, inc_count, (void *)t2);
pthread_create(&threads[2], &attr, inc_count, (void *)t3);

/* Wait for all threads to complete */
for (i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
printf("Main(): Waited and joined with %d threads. Final value of count = %d. Done.\n", NUM_THREADS, count);

/* Clean up and exit */
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&count_threshold_cv);
pthread_exit(NULL);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
$ ./pthread 
Starting watch_count(): thread 1
inc_count(): thread 2, count = 1, unlocking mutex
inc_count(): thread 3, count = 2, unlocking mutex
watch_count(): thread 1 Count= 2. Going into wait...
inc_count(): thread 2, count = 3, unlocking mutex
inc_count(): thread 3, count = 4, unlocking mutex
inc_count(): thread 2, count = 5, unlocking mutex
inc_count(): thread 3, count = 6, unlocking mutex
inc_count(): thread 2, count = 7, unlocking mutex
inc_count(): thread 3, count = 8, unlocking mutex
inc_count(): thread 2, count = 9, unlocking mutex
inc_count(): thread 3, count = 10, unlocking mutex
inc_count(): thread 2, count = 11, unlocking mutex
inc_count(): thread 3, count = 12 Threshold reached. Just sent signal.
inc_count(): thread 3, count = 12, unlocking mutex
watch_count(): thread 1 Condition signal received. Count= 12
watch_count(): thread 1 Updating the value of count...
watch_count(): thread 1 count now = 137.
watch_count(): thread 1 Unlocking mutex.
inc_count(): thread 2, count = 138, unlocking mutex
inc_count(): thread 3, count = 139, unlocking mutex
inc_count(): thread 2, count = 140, unlocking mutex
inc_count(): thread 3, count = 141, unlocking mutex
inc_count(): thread 2, count = 142, unlocking mutex
inc_count(): thread 3, count = 143, unlocking mutex
inc_count(): thread 3, count = 144, unlocking mutex
inc_count(): thread 2, count = 145, unlocking mutex
Main(): Waited and joined with 3 threads. Final value of count = 145. Done.

std::thread

在 C++11 中引入的线程库 std::thread 实际是基于 pthread 实现的,后续主要介绍:

  • 如何使用 std::thread 创建线程
  • 深入剖析 std::thread 的设计原理

使用 std::thread

当你创建了一个(非空的)线程对象时,对应线程就会执行,不需要显式的调用 start 或者 run(pthread 也是)。如果之前你没有用过 pthread,也许不会理解何为“方便得出人意料”:

  • pthread_create 只接受 void *f(void *),所以如果你想调用现成的函数,还需要包装一下;
  • 而且 pthread_create 其函数接受参数(第四个参数)类型为 void *arg,如果要传多个参数,还需要定义结构体,接着将结构体转为 void * 类型再传递进去;
  • 这还没完,传递进去的参数还需要在其内部函数中,重新转型成(可能是一次性的)某个结构体,最后才能取出其中的变量;

创建线程后,调用 Thread.join 就会阻塞到线程执行结束为止(相当于pthread_join)。你也可以选择 detach 该线程,这时候线程会独立执行,不会随调用者终止而结束。

在如下的 demo 中,主线程中使用 std::thread 创建 3 个子线程,线程入口函数是 do_some_work,在主线程运行结束前等待子线程的结束。

注:在构造线程对象 std::thread{do_some_work, i} 的时候,还是建议使用 {} 而不是 (),以防止编译器产生错误的决议,具体原因可以参考文章(深入了解 C++:别再徘徊于 {} 与 () 之间了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// #include <bits/stdc++.h>
#include <iostream>
#include <thread>
#include <vector>

const int N_THREADS = 3;

void do_some_work(int num) { std::cout << "thread: " << num << std::endl; }

int main(int argc, char const* argv[]) {
std::vector<std::thread> thread_list;
thread_list.reserve(N_THREADS);

// start thread
for (int i = 0; i < N_THREADS; i++) {
// 🆗 thread_list.push_back(std::thread{do_some_work, i});
// 🆗 thread_list.push_back(std::thread(do_some_work, i));
thread_list.emplace_back(do_some_work, i);
}
std::cout << "work in main thread" << std::endl;

// main() thread will waiting other threads
for (int i = 0; i < N_THREADS; i++) {
thread_list[i].join();
}
std::cout << "main thread end" << std::endl;
return 0;
}

三个子线程共享输出缓冲区 std::cout,此时没有采取任何机制保护线程间共享数据,因此上面 demo 的输出可能不符合你的预期,即很可能不是按照如下格式输出:

1
2
3
4
$ ./thread
thread: 0
thread: 1
thread: 2

实际输出结果(非常混乱):

1
2
3
4
5
6
$ ./thread
thread: work in main thread
thread: 0
2
thread: 1
main thread end

从输出可以看出:

  • 先创建的线程,未必就先运行;
  • 而且几个线程之间是互相抢 CPU 资源的

线程间数据共享问题及其应对措施,留到后文讲解,下面讲解 std::thread 的设计。

深入剖析 std::thread

g++ 中,thread 是基于 pthread 实现的。本次主要从以下三个方面分 std::thread

  • std::thread 对象不可复制,只具有移动属性
  • 每个线程具有唯一的标志,即线程 id
  • 创建子线程(即构造 std::thread

1. 移动属性

有很多书籍说,std::thread 对象的所有权只能传递不能复制。实际上,就 std::thread 对象,只具有移动属性,不具有复制属性。std::thread 的构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class thread {
private:
id _M_id;

public:
thread() noexcept = default;

template <typename _Callable, typename... _Args, typename = _Require<__not_same<_Callable>>>
explicit thread(_Callable&& __f, _Args&&... __args) {
//...
}

~thread() {
if (joinable()) std::terminate();
}
// 禁止复制(复制构造、复制赋值)
thread(const thread&) = delete;
thread& operator=(const thread&) = delete;

// std::thread 只具有移动属性(移动构造、移动赋值)
thread(thread&& __t) noexcept { swap(__t); }

thread& operator=(thread&& __t) noexcept {
if (joinable()) std::terminate();
swap(__t);
return *this;
}
//...
}

可以发现,std::thread 禁止了复制构造函数、复制赋值表达式,只留下了移动构造函数、移动赋值,使得 std::thread 对象只能移动,不能复制。这就是之前 demo 中使用 emplace_back 函数添加 std::thread 对象的原因,防止触发复制构造函数。所以向 thread_list 中添加 std::thread 对象有以下几种方式:

当 push_back 接受的是右值,底层调用的还是 emplace_back 函数,因此 4 和 5 是等价的。

1
2
3
4
5
6
7
8
9
10
thread_list.push_back(std::thread{do_some_work, i});    // 1.ok

thread_list.emplace_back(do_some_work, i); // 2.ok
thread_list.emplace_back(std::thread{do_some_work, i}); // 2.ok

std::thread trd{do_some_work, i};
thread_list.push_back(trd); // 3.error❌

thread_list.push_back(std::move(trd)); // 4.ok
thread_list.emplace_back(std::move(trd)); // 5.ok

第三种办法报错:

1
2
3
4
5
6
7
8
9
10
/usr/include/c++/9/ext/new_allocator.h: In instantiation of ‘void __gnu_cxx::new_allocator<_Tp>::construct(_Up*, _Args&& ...) [with _Up = std::thread; _Args = {std::thread&}; _Tp = std::thread]’:
/usr/include/c++/9/bits/alloc_traits.h:483:4: required from ‘static void std::allocator_traits<std::allocator<_CharT> >::construct(std::allocator_traits<std::allocator<_CharT> >::allocator_type&, _Up*, _Args&& ...) [with _Up = std::thread; _Args = {std::thread&}; _Tp = std::thread; std::allocator_traits<std::allocator<_CharT> >::allocator_type = std::allocator<std::thread>]’
/usr/include/c++/9/bits/vector.tcc:115:30: required from ‘void std::vector<_Tp, _Alloc>::emplace_back(_Args&& ...) [with _Args = {std::thread&}; _Tp = std::thread; _Alloc = std::allocator<std::thread>]’
_thread.cpp:22:37: required from here
/usr/include/c++/9/ext/new_allocator.h:146:4: error: use of deleted function ‘std::thread::thread(const std::thread&)’
146 | { ::new((void *)__p) _Up(std::forward<_Args>(__args)...); }
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from _thread.cpp:3:
/usr/include/c++/9/thread:142:5: note: declared here
142 | thread(const thread&) = delete;

2. std::thread::id

可以发现,在 std::thread 对象中,只有一个成员变量 _M_id

这个类 id 全称是 std::thread::id,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class id {
// // _M_thread 即 pthread_t 对象,线程的唯一辨识标志
native_handle_type _M_thread;

public:
// _M_thread 默认值是 0
id() noexcept : _M_thread() {}

explicit id(native_handle_type __id) : _M_thread(__id) {}

private:
friend class thread;
friend class hash<thread::id>;
// 为 std::thread::id 对象重载了 == 运算
friend bool operator==(thread::id __x, thread::id __y) noexcept;

friend bool operator<(thread::id __x, thread::id __y) noexcept;
// 为 std::thread::id 对象重载了 << 操作
template <class _CharT, class _Traits>
friend basic_ostream<_CharT, _Traits>& operator<<(basic_ostream<_CharT, _Traits>& __out, thread::id __id);
};

因此,这个 std::thread::id 实际上就是封装了 pthread_t 对象,用作每个线程的标志。

  • 在构造 std::thread 对象的时候,如果没有设置线程入口函数,则线程 _M_id._M_thread 的值是 0

比如下面的 demo,trd 没有设置线程入口函数,trd 调用默认构造函数时,trd_M_id._M_thread 会被初始化为 0

1
2
3
4
5
int main(int argc, char const* argv[]) {
std::thread trd;
std::cout << trd.get_id() << std::endl;
return 0;
}

但是,打印线程标志 trd.get_id(),输出的是却不是0。这仅仅是 std::thread::id 在重载 << 操作符时的设定,用于提示调用者线程没有启动。

1
2
$ g++  thread_.cc -o thread_ && ./thread_
thread::id of a non-executing thread

可以到 std::thread::id 重载的 << 操作符的函数中一探究竟:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<class _CharT, class _Traits>
inline basic_ostream<_CharT, _Traits>& operator<<(basic_ostream<_CharT, _Traits>& __out, thread::id __id) {
// 线程未启动
if (__id == thread::id())
return __out << "thread::id of a non-executing thread";
// 线程成功启动
else
return __out << __id._M_thread;
}

// id的相等判断
inline bool operator==(thread::id __x, thread::id __y) noexcept {
return __x._M_thread == __y._M_thread;
}

因此判断一个线程是否启动,可如下检测:

1
2
3
bool thread_is_active(const std::thread::id& thread_id) { 
return thread_id != std::thread::id();
}

设置了线程入口函数,_M_id._M_thread 才是线程的tid值,由 pthread_create(&tid, NULL, ...) 函数设置:

1
2
3
4
5
6
7
int main(int argc, char const* argv[]) {
std::thread trd{[] { std::cout << "work in sub-thread\n"; }};

std::cout << trd.get_id() << std::endl;
trd.join();
return 0;
}
1
2
3
$ ./thread 
140203273147968
work in sub-thread

by the way

在创建 std::thread 对象 trd 时:

  • 如果设置了线程入口函数,那么就必须使用 trd.join() 或者 trd.detach() 来表达子线程与主线程的运行关系,否则在 std::thread 对象析构时,整个程序会被 std::terminate() 中止
  • 如果没有设置线程入口函数,trd.joinable() 返回值就是 false,因此不会触发 std::terminate()
1
2
3
4
~thread() {
if (joinable())
std::terminate();
}

3. 创建子线程

当构造 std::thread 对象时,设置了线程入口函数,会在相匹配的构造函数里调用 pthread_create 函数创建子线程。先看整体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// std::thread 构造函数
template<typename _Callable,
typename... _Args,
typename = _Require<__not_same<_Callable>>>
explicit thread(_Callable&& __f, _Args&&... __args)
{
static_assert( __is_invocable<typename decay<_Callable>::type,
typename decay<_Args>::type...>::value,
"std::thread arguments must be invocable after conversion to rvalues");

// Create a reference to pthread_create, not just the gthr weak symbol.
auto __depend = reinterpret_cast<void(*)()>(&pthread_create);
// 启动线程
_M_start_thread(_S_make_state(__make_invoker(std::forward<_Callable>(__f),
std::forward<_Args>(__args)...)),
__depend);
}

再细看构造函数执行流程:

  1. 在编译期判断构造 std::thread 对象时设置的线程入口函数 __f 及其参数 __args 能否调用。

比如,下面的 demo 中,线程入口函数 thread_func 有个 int 类型的参数 arg,如果传入的参数 __args 无法隐式转换为 int 类型,或者没有设置 __args,都会触发 std::thread 构造函数中的静态断言 static_assert

报错:error: static assertion failed: std::thread arguments must be invocable after conversion to rvalues

1
2
3
4
5
6
7
8
9
void thread_func(int arg) { }

int main(int argc, char const *argv[]) {
std::thread trd_1{thread_func, "str"}; // arg 类型不对
std::thread trd_2{thread_func}; // 缺少 arg

// ...
return 0;
}
  1. 将线程入口函数 __f 及其参数 __args 进一步封装起来。

这里是使用 __make_invoker 完成的:

1
__make_invoker(std::forward<_Callable>(__f), std::forward<_Args>(__args)...);

__make_invoker 的作用是返回一个 _Invoker 对象,_Invoker 是个仿函数,通过 _Invoker() 就可以以指定的参数 __args 直接执行线程入口函数 __f。类似于 std::bind

1
2
3
4
5
6
7
8
9
10
void print_num(int i) {
std::cout << i << '\n';
}

int main(int argc, const char* argv[]) {
// wrapper
auto invoker = std::bind(print_num, -9);
// 直接调用 invoker() 就可以以指定参数 -9 调用 print_num
invoker();
}
  1. 启动子线程

在调用 _M_start_thread 函数启动子线程前,执行过程:

  • 创建 _State_ptr 的对象,来封装 _Invoker 对象,再传递给 _M_start_thread 函数。
  • 传递 _M_start_thread 函数的过程,由 _S_make_state 函数完成,_S_make_state 最终返回 _State_ptr 对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 基类
struct _State {
virtual ~_State(); // 虚析构函数
virtual void _M_run() = 0; // 线程运行函数
};
using _State_ptr = unique_ptr<_State>; // 父类指针

// 子类
template<typename _Callable>
struct _State_impl : public _State {
_Callable _M_func; // 线程入口函数

_State_impl(_Callable&& __f) : _M_func(std::forward<_Callable>(__f))
{ }

void _M_run() { _M_func(); } // 执行线程入口函数
};

// 传入_Invoker对象,返回 _State_ptr 对象
template<typename _Callable>
static _State_ptr _S_make_state(_Callable&& __f) {
using _Impl = _State_impl<_Callable>;
// 使用子类对象来初始化父类
return _State_ptr{new _Impl{std::forward<_Callable>(__f)}};
}

_S_make_state 函数,将线程入口函数 __f 及其参数 __args 封装到 _State_ptr 对象 _State_ptr_obj 中, 这样最后可以通过 _State_ptr_obj->_M_run() 来调用 __f

下面到了 _M_start_thread 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void thread::_M_start_thread(_State_ptr state, void (*)())
{
const int err = __gthread_create(&_M_id._M_thread,
&execute_native_thread_routine, // 线程执行函数
state.get());
if (err)
__throw_system_error(err);
state.release();
}

// 内部调用的是 pthread_create 函数
static inline int __gthread_create(pthread_t *__threadid, void *(*__func) (void*), void *__args)
{
return pthread_create(__threadid, NULL, __func, __args);
}

// 内部执行线程入口函数
static void* execute_native_thread_routine(void* __p)
{
thread::_State_ptr __t{static_cast<thread::_State*>(__p)};
__t->_M_run(); // 运行线程入口函数
return nullptr;
}

因此,在执行完 _M_start_thread 函数后,才具有 _M_start_thread != 0

Mutex

有时候需要限制多个线程对同一资源的访问,这时候一般会使用 Mutex。Mutex 就是一把锁,只有某些线程可以同时占用它(通过 lock 操作)。当线程不用的时候,就得通过 unlock 操作来释放它。

对于 Mutex,std::threadpthread 差不多,无非是 pthread_mutex_lock(&mutex) 变成了 mutex.lock() 等等。

不过在 std::thread 中,mutex 往往和 lock 系列模板一起使用。这是因为 lock 系列模板包装了 mutex 类,提供了 RAII 风格的加锁解锁

1
2
3
4
5
6
7
{
// 加锁
std::lock_guard<std::mutex> guard(mutex);
...

// 自动解锁
}

mutex 有四种:

  • std::mutex:独占的互斥量,不能递归使用,不带超时功能
  • std::recursive_mutex:递归互斥量,可重入,不带超时功能
  • std::timed_mutex:带超时的互斥量,不能递归
  • std::recursive_timed_mutex:带超时的互斥量,可以递归使用

加解锁方式有三种:

  • std::lock_guard:可以RAII方式加锁
  • std::unique_lock:比 lock_guard 多了个手动加解锁的功能
  • std::scoped_lock:防止多个锁顺序问题导致的死锁问题而出世的一把锁

Condition Variable

有时候线程之间需要某种同步:当某些条件不满足时,停止执行直到该条件被满足。

这时候需要引入 condition variable —— 状态变量。

在经典的「生产者消费者模式」下,生产者和消费者就是通过 condition variable 来实现同步的。

当有限的生产力无法满足日益增长的消费需求时,消费者进程就会去睡一觉,直到它想要的东西生产出来才醒来。

1
2
3
4
5
6
7
8
std::condition_variable condvar;

consumer:
std::unique_lock<std::mutex> ulock(mutex);
condvar.wait(ulock, []{ return msgQueue.size() > 0;});

producer:
condvar.notify_all();
  • condition_variable 需要和 unique_lock 搭配使用
  • 在一个线程调用 wait 之前,它必须持有 unique_lock
  • wait 被调用时,该锁会被释放,线程会陷入沉睡,等待着生产者发过来的唤醒信号
  • 当生产者调用同一个 condition_variablenotify_all 方法时,所有沉睡在该变量前的消费者会被唤醒,并尝试重新获取之前释放的 unique_lock,继续执行下去(注意这里发生了锁争用,只有一个消费者能够获得锁,其他消费者得等待该消费者释放锁)
  • 如果只想叫醒一个线程,可以用 notify_one
  • pthread 中也提供了对应的方法,分别是 pthread_cond_wait, pthread_cond_broadcast, pthread_cond_signal

wait 可以接受两个参数,此时第二个参数用于判断当前是否要沉睡。

1
[]{ return msgQueue.size() > 0;});

相当于

1
2
3
while (msgQueue.size() <= 0) {
condvar.wait()
}

为了防止线程无限等待(可能一直没有唤醒),通过 wait_untilwait_for,你可以设定线程的等待时间。设置 notify_all_at_thread_exit 也许能帮得上忙。

pthread 中,对应的调用是 pthread_cond_timedwait

More

C++11 的线程库还提供了其他多线程编程的概念,比如 futureatomic

future

future 位于头文件 <future> 下,包装了未来某个计算结果的期诺。

当你对所获得的 future 调用 get 时,程序会一直阻塞直到 future 的值被计算出来(如果 future 的值已经计算出来了,get 调用会立刻获得返回值),而这一切都是在后台执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <chrono>
#include <iostream>
#include <future>

using namespace std;

int main()
{
future<int> f1 = async(launch::async, [](){
std::chrono::milliseconds dura(2000);
std::this_thread::sleep_for(dura);
return 0;
});

future<int> f2 = async(launch::async, [](){
std::chrono::milliseconds dura(2000);
std::this_thread::sleep_for(dura);
return 1;
});

cout << "Results are: " << f1.get() << " " << f2.get() << "\n";
return 0;
}
1
2
3
4
5
$ g++ -std=c++11 -pthread ./future.cpp

$ time ./a.out
Results are: 0 1
./a.out 0.00s user 0.00s system 0% cpu 2.012 total # 是两秒左右而不是四秒

除了 asyncpackaged_taskpromise 也都返回一个 future

atomic

atomic 位于头文件 <atomic> 下,实现了类似于 java.util.concurrent.atomic 的功能。它提供了一组轻量级的、作用在单个变量上的原子操作,是 volatile 的替代品,有些时候你也可以用它来替换掉 lock(假如整个 race condition 中只有单个变量)

下面这个例子解释了什么叫做原子操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <atomic>
#include <iostream>
#include <thread>

using namespace std;

const int NUM = 100;

int target = 0;
atomic<int> atomicTarget(0);

template<typename T>
void atomicPlusOne(int trys)
{
while (trys > 0) {
atomicTarget.fetch_add(1);
--trys;
}
}

void plusOne(int trys)
{
while (trys > 0) {
++target;
--trys;
}
}

int main()
{
thread threads[NUM];
thread atomicThreads[NUM];
for (int i = 0; i < NUM; i++) {
atomicThreads[i] = thread(atomicPlusOne<int>, 10000);
}
for (int i = 0; i < NUM; i++) {
threads[i] = thread(plusOne, 10000);
}

for (int i = 0; i < NUM; i++) {
atomicThreads[i].join();
}
for (int i = 0; i < NUM; i++) {
threads[i].join();
}

cout << "Atomic target's value : " << atomicTarget << "\n";
cout << "Non-atomic target's value : " << target << "\n";

return 0;
}
1
2
3
4
# atomicTarget 的值总是固定的,而 target 的值每次运行时各不相同
$ g++ -std=c++11 -pthread ./atom.cpp
Atomic target's value : 1000000
Non-atomic target's value : 842480

Pros & Cons

最后总结下 std::thread 对比于 pthread 的优缺点:

优点:

  1. 简单,易用
  2. 跨平台,pthread 只能用在 POSIX 系统上(其他系统有其独立的 thread 实现)
  3. 提供了更多高级功能,比如 future
  4. 更加 C++(与匿名函数std::bind,RAII 等 C++ 特性更好的集成)

缺点:

  1. 没有 RWlock:有一个类似的 shared_mutex,不过它属于 C++14,你的编译器很有可能不支持
  2. 操作线程和 Mutex 等的 API 较少:毕竟为了跨平台,只能选取各原生实现的子集。如果你需要设置某些属性,需要通过 API 调用返回原生平台上的对应对象,再对返回的对象进行操作。

生产者消费者(pthread & thread 版本)

附上我自己写的,分别用 std::threadpthread 实现的多生产者多消费者程序。

注意行数上的差距。

pthread 版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <pthread.h>
#include <queue>
#include <stdio.h>
#include <unistd.h>

// 注意 pthread_* 函数返回的异常值,为了简单(偷懒),我没有去处理它们

pthread_mutex_t mutex;
pthread_cond_t condvar;

std::queue<int> msgQueue;
struct Produce_range {
int start;
int end;
};

void *producer(void *args)
{
int start = static_cast<Produce_range *>(args)->start;
int end = static_cast<Produce_range *>(args)->end;
for (int x = start; x < end; x++) {
usleep(200 * 1000);
pthread_mutex_lock(&mutex);
msgQueue.push(x);
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&condvar);
printf("Produce message %d\n", x);
}
pthread_exit((void *)0);
return NULL;
}

void *consumer(void *args)
{
int demand = *static_cast<int *>(args);
while (true) {
pthread_mutex_lock(&mutex);
if (msgQueue.size() <= 0) {
pthread_cond_wait(&condvar, &mutex);
}
if (msgQueue.size() > 0) {
printf("Consume message %d\n", msgQueue.front());
msgQueue.pop();
--demand;
}
pthread_mutex_unlock(&mutex);
if (!demand) break;
}
pthread_exit((void *)0);
return NULL;
}


int main()
{
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condvar, NULL);

pthread_t producer1, producer2, producer3, consumer1, consumer2;

Produce_range range1 = {0, 10};
pthread_create(&producer1, &attr, producer, static_cast<void *>(&range1));
Produce_range range2 = {range1.end, range1.end + 10};
pthread_create(&producer2, &attr, producer, static_cast<void *>(&range2));
Produce_range range3 = {range2.end, range2.end + 10};
pthread_create(&producer3, &attr, producer, static_cast<void *>(&range3));

int consume_demand1 = 20;
int consume_demand2 = 10;
pthread_create(&consumer1, &attr, consumer, static_cast<void *>(&consume_demand1));
pthread_create(&consumer2, &attr, consumer, static_cast<void *>(&consume_demand2));

pthread_join(producer1, NULL);
pthread_join(producer2, NULL);
pthread_join(producer3, NULL);
pthread_join(consumer1, NULL);
pthread_join(consumer2, NULL);
}

std::thread 版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <chrono>
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>

// 注意某些调用可能会抛出std::system_error, 为了简单(偷懒),我没有去捕获
std::mutex mutex;
std::condition_variable condvar;

std::queue<int> msgQueue;

void producer(int start, int end)
{
for (int x = start; x < end; x++) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
{
std::lock_guard<std::mutex> guard(mutex);
msgQueue.push(x);
}
printf("Produce message %d\n", x);
condvar.notify_all();
}
}

void consumer(int demand)
{
while (true) {
std::unique_lock<std::mutex> ulock(mutex);
condvar.wait(ulock, []{ return msgQueue.size() > 0;});
// wait的第二个参数使得显式的double check不再必要
printf("Consume message %d\n", msgQueue.front());
msgQueue.pop();
--demand;
if (!demand) break;
}
}


int main()
{
std::thread producer1(producer, 0, 10);
std::thread producer2(producer, 10, 20);
std::thread producer3(producer, 20, 30);
std::thread consumer1(consumer, 20);
std::thread consumer2(consumer, 10);

producer1.join();
producer2.join();
producer3.join();
consumer1.join();
consumer2.join();
}

参考资料


本站总访问量
本站共发表 94 篇文章 · 总计 325.9k 字
载入天数...载入时分秒...