0%

线程同步

进程有自己的独立地址空间,因此进程之间重点关注通信。而对于线程来说,除了线程栈外其他数据都是共享的,如果同时读写数据可能造成数据不一致甚至程序崩溃的后果,因此线程之间重点关注同步

线程同步是指在多线程编程中控制多个线程之间的执行顺序或共享资源访问的过程。在多线程环境中,由于线程的并发执行,可能会导致数据竞争、资源冲突等问题。

1. 竞争条件

在多线程编程中,竞争(Race condition)是指两个或多个线程对共享资源的访问产生的不确定性行为。竞争条件通常发生在多个线程同时访问共享资源时,其中至少一个线程试图修改这些资源的值。在多线程并发场景下指令执行的先后顺序由内核决定,同一个线程内部指令按照先后顺序执行,但不同线程之间的指令执行先后顺序是不一定的。如果执行结果依赖于不同线程执行的先后顺序,那么就会形成“竞争条件”,产生非预期的计算结果,导致程序崩溃等问题。

最常见的解决竞争条件的方式是原子操作,其次便是线程同步

原子操作是指不可被中断的操作,在执行过程中不会被其他线程或进程干扰,要么全部执行成功,要么全部不执行,不会出现部分执行的情况。原子操作通常是基本的、不可分割的操作单元。

原子操作通常由硬件或者操作系统提供支持,C++11引入了std::atomic模板类来支持原子操作。

2. 线程同步

常见的线程同步的方式有四种:互斥锁、读写锁、条件变量和信号量

2.1 互斥锁

互斥锁(又名互斥量)强调的是资源之间的访问互斥:每个线程在对共享资源操作前都会尝试先加锁,加锁成功才能操作,操作结束之后解锁。

某个线程对互斥量加锁后,其他线程必须等待该线程释放锁才能继续访问共享资源。如果释放互斥锁时有多个线程阻塞,所有在该互斥锁上的阻塞线程都会变成可运行状态。第一个变成运行状态的线程可以对互斥量加锁,其余线程将会看到互斥量依然被锁住,只能回去再次等待它重新变为可用。

mutex是睡眠等待(sleep waiting)类型的锁,当线程抢互斥锁失败的时候,线程会陷入休眠。优点就是节省CPU资源,缺点就是休眠唤醒会消耗一点时间。

互斥锁的接口通常包括 mutex_init()mutex_lock()mutex_unlock()mutex_destroy()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <pthread.h>

// 初始化互斥锁
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);

// 加锁
pthread_mutex_lock(&mutex);

// 解锁
pthread_mutex_unlock(&mutex);

// 销毁互斥锁
pthread_mutex_destroy(&mutex);

2.2 读写锁

读写锁和互斥量类似,是另一种实现线程同步的方式,但是它将操作分为读、写两种方式,可以多个线程同时占用读模式,但只允许一个线程写入。

  • 写独占:写锁占用时,其他线程加读锁或者写锁时都会阻塞(并非失败)
  • 读共享:读锁占用时,其他线程加写锁时会阻塞,加读锁会成功

读写锁有两种策略:

  • 强读同步:读锁优先,只要写锁没有占用那么就可以加读锁
  • 强写同步:写锁优先,只能等到所有正在等待或者执行的写锁执行完成后才能加读锁

大部分读写锁的实现都采用的是“强写同步”策略,对尝试加锁的操作进行排队,如果前面已经有尝试加写被锁阻塞住的话,后续加读锁也都会被阻塞住(尽管当前时刻是读锁占用的状态)。这样做的目的主要是为了避免“写饥饿”,在“多读少写”的情况下防止数据修改延迟过高。

读写锁的接口通常包括 rwlock_init()rwlock_rdlock()rwlock_wrlock()rwlock_unlock()rwlock_destroy()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <pthread.h>

// 初始化读写锁
pthread_rwlock_t rwlock;
pthread_rwlock_init(&rwlock, NULL);

// 读取加锁
pthread_rwlock_rdlock(&rwlock);

// 写入加锁
pthread_rwlock_wrlock(&rwlock);

// 解锁
pthread_rwlock_unlock(&rwlock);

// 销毁读写锁
pthread_rwlock_destroy(&rwlock);

2.3 条件变量

严格意义上来说,条件变量的主要作用不是处理线程同步, 而是进行线程的阻塞。如果在多线程程序中只使用条件变量无法实现线程的同步, 必须要配合互斥锁来使用。

线程可以等待某个条件变量的发生,如果条件不满足,则线程会阻塞等待,并在条件满足时被唤醒。

条件变量的接口通常包括 cond_init()cond_wait()cond_signal()cond_broadcast()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <pthread.h>

// 初始化条件变量和关联的互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// 等待条件满足
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);

// 发送信号通知条件满足
pthread_cond_signal(&cond);

// 广播通知条件满足
pthread_cond_broadcast(&cond);

2.4 信号量

信号量是一种更为通用的同步原语,用来控制多个线程对共享资源的访问。

信号量本质上是一个非负的整数计数器,表示可用资源的数量。当资源被占用时,计数器减少;当资源释放时,计数器增加。

信号量通常分为两种类型:二进制信号量和计数信号量。

  1. 二进制信号量(Binary Semaphore):
    • 二进制信号量只能取两个值,通常是0和1,分别表示资源的可用和不可用状态。
    • 二进制信号量常用于实现互斥锁(Mutex),用于保护对临界区(Critical Section)的访问,确保同一时间只有一个线程可以访问共享资源。
  2. 计数信号量(Counting Semaphore):
    • 计数信号量可以取多个值,通常是一个非负整数,表示资源的可用数量。
    • 计数信号量常用于限制对资源的访问数量,例如线程池中限制并发执行的线程数量。

信号量提供了两种操作:P操作(等待信号量)和V操作(释放信号量),用于控制资源的访问和释放,也被称为PV原子操作:

  • P操作:即信号量sem减一,若sem小于等于0则P操作被阻塞,直到sem变量大于0为止
  • V操作:即信号量sem加一

信号量的接口通常包括 sem_init()sem_wait()sem_post()sem_destroy()

  1. **sem_init()**:用于初始化一个信号量。

    1
    cCopy codeint sem_init(sem_t *sem, int pshared, unsigned int value);
    • sem:指向要初始化的信号量的指针。
    • pshared:指定信号量是进程共享的还是线程共享的。可以传入 0 表示信号量是线程共享的,非零值表示信号量是进程共享的。
    • value:指定信号量的初始值。
  2. **sem_destroy()**:用于销毁一个信号量。

    1
    cCopy codeint sem_destroy(sem_t *sem);
    • sem:指向要销毁的信号量的指针。
  3. **sem_wait()**:用于等待(阻塞)一个信号量。

    1
    cCopy codeint sem_wait(sem_t *sem);
    • sem:指向要等待的信号量的指针。
  4. **sem_post()**:用于释放一个信号量。

    1
    cCopy codeint sem_post(sem_t *sem);
    • sem:指向要释放的信号量的指针。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

#define NUM_THREADS 5

sem_t semaphore;

void *thread_func(void *arg) {
int id = *((int *)arg);
sem_wait(&semaphore);
printf("Thread %d is printing.\n", id);
sem_post(&semaphore);
pthread_exit(NULL);
}

int main() {
pthread_t threads[NUM_THREADS];
int thread_args[NUM_THREADS];

// 初始化信号量,初始值为1
sem_init(&semaphore, 0, 1);

// 创建多个线程
for (int i = 0; i < NUM_THREADS; ++i) {
thread_args[i] = i;
pthread_create(&threads[i], NULL, thread_func, (void *)&thread_args[i]);
}

// 等待所有线程结束
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_join(threads[i], NULL);
}

// 销毁信号量
sem_destroy(&semaphore);

return 0;
}

3. C++示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
#include <semaphore.h>
#include <unistd.h> // 用于 sleep 函数

using namespace std;

// 共享资源
int shared_data = 0;

// 互斥锁
mutex mtx;

// 读写锁
shared_mutex rw_mtx;

// 条件变量
condition_variable cond_var;
bool ready = false;

// 信号量
sem_t semaphore;

// 互斥锁示例
void mutex_example(int id) {
mtx.lock();
cout << "Thread " << id << " is accessing shared resource with mutex. Shared data: " << shared_data << endl;
shared_data++;
mtx.unlock();
}

// 读写锁示例
void rw_lock_example(int id) {
if (id % 2 == 0) {
// 读操作
shared_lock<shared_mutex> lock(rw_mtx);
cout << "Thread " << id << " is reading shared resource with read-write lock. Shared data: " << shared_data << endl;
} else {
// 写操作
unique_lock<shared_mutex> lock(rw_mtx);
cout << "Thread " << id << " is writing shared resource with read-write lock. Shared data: " << shared_data << endl;
shared_data++;
}
}

// 条件变量示例
void cond_var_example(int id) {
unique_lock<mutex> lock(mtx);
cond_var.wait(lock, []{ return ready; });
cout << "Thread " << id << " is accessing shared resource with condition variable. Shared data: " << shared_data << endl;
shared_data++;
}

// 信号量示例
void semaphore_example(int id) {
sem_wait(&semaphore);
cout << "Thread " << id << " is accessing shared resource with semaphore. Shared data: " << shared_data << endl;
shared_data++;
sem_post(&semaphore);
}

int main() {
// 初始化信号量
sem_init(&semaphore, 0, 1);

// 创建线程
thread t1(mutex_example, 1);
thread t2(mutex_example, 2);
thread t3(rw_lock_example, 3);
thread t4(rw_lock_example, 4);
thread t5(cond_var_example, 5);
thread t6(cond_var_example, 6);
thread t7(semaphore_example, 7);
thread t8(semaphore_example, 8);

// 通知条件变量
sleep(1);
{
lock_guard<mutex> lock(mtx);
ready = true;
}
cond_var.notify_all();

// 等待线程结束
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
t8.join();

// 销毁信号量
sem_destroy(&semaphore);

return 0;
}

4. 死锁

死锁(Deadlock)是指两个或多个进程(线程)在互相等待对方持有的资源而无法继续执行的情况。在死锁状态下,每个进程都在等待某个资源被释放,而该资源被其他进程所持有,导致所有进程都无法继续执行。

死锁发生的主要原因通常是由于多个进程同时持有某些资源,并且每个进程都在等待其他进程释放它所需要的资源。