0%

C++多线程

互斥锁(mutex)

互斥锁(Mutex)是一种同步机制,用于在多线程程序中保护共享资源,防止多个线程同时访问和修改共享资源而导致竞争条件的发生。互斥锁通过在对共享资源的访问前先获得锁来确保同一时刻只有一个线程能够访问共享资源,其他线程必须等待该线程释放锁后才能访问。

mutex提供了4种互斥类型:

  • std::mutex:独占的互斥量,不能递归使用,不带超时功能
  • std::recursive_mutex:递归互斥量,可重入,不带超时功能
  • std::timed_mutex:带超时的互斥量,不能递归
  • std::recursive_timed_mutex:带超时的互斥量,可以递归使用

1. 创建和初始化互斥锁

在C++中,可以使用std::mutex类来创建和使用互斥锁。通常情况下,我们在全局范围内定义一个互斥锁对象,或者在需要保护的共享资源的类中定义一个互斥锁成员变量。

1
2
#include <mutex>
std::mutex mtx; // 全局范围内定义一个互斥锁对象

2. 加锁和解锁

在访问共享资源之前,线程需要先获取互斥锁,以确保其他线程不会同时访问该资源。获取锁时,线程会阻塞,直到它成功地获得了锁为止。使用完共享资源后,线程需要释放锁,以允许其他线程访问该资源。

1
2
3
mtx.lock();// 加锁
/*访问共享资源的代码*/
mtx.unlock();// 解锁

3. lock_guard

除了lock()unlock()方法外,还可以使用std::lock_guard自动管理锁的加锁和解锁std::lock_guard是一个RAII(资源获取即初始化)类型,它在创建时自动获取锁,在销毁时自动释放锁,从而避免忘记手动解锁而导致的死锁或资源泄漏。

1
2
3
4
5
6
7
8
#include <mutex>

std::mutex mtx;

void someFunction() {
std::lock_guard<std::mutex> guard(mtx); // 自动加锁
// 访问共享资源的代码
} // 在 guard 超出作用域时自动解锁

创建一个名为 guardstd::lock_guard 对象,用于管理名为 mtx 的互斥锁。在 lock 对象的作用域结束时,会自动释放 mtx 互斥锁,即使在作用域内发生异常也会自动释放。这样做可以确保互斥锁在不再需要时被正确释放,避免了手动调用 lock()unlock() 方法可能带来的错误和忘记释放锁的风险。

4. unique_lock

std::unique_lock 也是 C++ 标准库提供的一个 RAII 类型,用于管理互斥锁的加锁和解锁,类似于 std::lock_guard。但与 std::lock_guard 不同的是,std::unique_lock 具有更多的灵活性和功能。它可以在创建时选择是否加锁,也可以手动释放锁,并且可以在未加锁的情况下等待条件变量。下面详细讲解 std::unique_lock 的用法:

  1. 创建 std::unique_lock 对象
1
2
3
#include <mutex>

std::mutex mtx;
  1. 使用 std::unique_lock 自动管理锁
1
2
3
4
void someFunction() {
std::unique_lock<std::mutex> lock(mtx); // 自动加锁
// 访问共享资源的代码
} // 在 lock 超出作用域时自动解锁
  1. 手动控制加锁和解锁

std::unique_lock 允许手动控制锁的加锁和解锁。例如:

1
2
3
4
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 不加锁
lock.lock(); // 手动加锁
// 访问共享资源的代码
lock.unlock(); // 手动解锁
  1. std::unique_lock 还可以在未加锁的情况下等待条件变量,从而避免了手动释放锁后再等待条件变量的复杂过程。
1
2
3
4
5
6
7
8
9
10
#include <condition_variable>

std::condition_variable cv;

void someFunction() {
std::unique_lock<std::mutex> lock(mtx);
// 等待条件变量
cv.wait(lock, []{ /* 条件函数 */ });
// 条件满足后继续执行
}

std::unique_lock 对象 lock 会自动加锁,然后等待条件变量 cv。当条件满足时,会自动解锁并继续执行。

原子操作-atomic

有两个线程,一个要写数据,一个读数据,如果不加锁可能会造成读写值混乱,使用std::mutex可以使得执行不会导致混乱,但是每一次循环都要加锁解锁使得程序开销很大。为了提高性能,C++11提供了原子类型std::atomic,它提供了多线程间的原子操作。原子操作是不可分割的操作,要么完全执行,要么完全不执行,不会被其他线程中断。

原子类型是封装了一个值的类型,它的访问保证不会导致数据的竞争,并且可以用于在不同的线程之间同步内存访问。从效率上来说,原子操作要比互斥量的方式效率要高

  1. 创建 std::atomic 对象
1
2
3
#include <atomic>

std::atomic<int> atomicVariable;

创建了一个名为 atomicVariablestd::atomic<int> 对象,表示一个原子的整型变量。

  1. 原子操作

std::atomic 提供了一系列原子操作,包括读取、写入、加法、减法等。这些操作可以保证在多线程环境中的原子性,从而避免竞争条件。

1
2
3
4
5
6
7
8
9
atomicVariable.store(10); // 将10存储到原子变量中

int value = atomicVariable.load(); // 从原子变量中加载值

atomicVariable.fetch_add(5); // 原子地将5加到atomicVariable上

atomicVariable.fetch_sub(3); // 原子地将atomicVariable减3

int oldValue = atomicVariable.exchange(20); // 原子地将atomicVarible的值交换为 20,并返回之前的值
  1. 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> counter(0); // 声明一个原子整型变量并初始化为 0

void incrementCounter(int numIncrements) {
for (int i = 0; i < numIncrements; ++i) {
counter++; // 原子地递增 counter 的值
}
}

int main() {
constexpr int numThreads = 4; // 定义线程数量
constexpr int numIncrementsPerThread = 1000000; // 每个线程递增的次数

std::thread threads[numThreads]; // 创建线程数组

// 启动多个线程,并分别调用 incrementCounter 函数
for (int i = 0; i < numThreads; ++i) {
threads[i] = std::thread(incrementCounter, numIncrementsPerThread);
}

// 等待所有线程执行完毕
for (int i = 0; i < numThreads; ++i) {
threads[i].join();
}

std::cout << "Final value of counter: " << counter.load() << std::endl; // 输出最终的 counter 值

return 0;
}

运行结果:

1
Final value of counter: 4000000

条件变量condition_varible

用于实现线程之间的条件等待通知机制。它通常与 std::mutex(互斥锁)一起使用,用于在某个条件满足时唤醒等待的线程。主要包括两个动作:

  1. 一个线程等待条件变量的条件成立而挂起(wait)
  2. 另一个线程使条件成立(notify_onenotify_all)

先来看一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool isReady = false;

void waitingThread(){
std::unique_lock<std::mutex> lock(mtx); //自动上锁
while(!isReady){
cv.wait(lock);
}
std::cout << "Condition is met, continuing..." << std::endl;
}

int main(){
std::thread t1(waitingThread);
std::this_thread::sleep_for(std::chrono::seconds(5));//主线程休眠5秒,模拟主线程工作
{
std::unique_lock<std::mutex> lock(mtx);
isReady = true;
}// lock超出作用域时自动释放互斥锁
cv.notify_one(); //通知等待的线程
t1.join();

return 0;
}

1. 等待条件的线程

1
2
3
4
5
6
7
void waitingThread(){
std::unique_lock<std::mutex> lock(mtx); //自动上锁
while(!isReady){
cv.wait(lock);
}
std::cout << "Condition is met, continuing..." << std::endl;
}

它会执行如下步骤:

  1. 获取与条件变量相关联的互斥锁
  2. 进入 while 循环,检查条件是否满足。如果条件已经满足,线程会跳过等待,并继续执行后续代码。
  3. 如果条件尚未满足,则调用 cv.wait(lock) 函数,将当前线程置于阻塞状态,并释放互斥锁。以允许其他线程访问共享资源。
  4. 直到其他线程调用了与条件变量相关联的 notify_one()notify_all() 函数,条件变量被通知。该线程被唤醒,并会重新获取互斥锁,继续执行whie循环,检查条件是否满足。
  5. 如果条件满足,则线程会退出 while 循环,继续执行后续代码。

2. 设置条件并通知等待的线程

主线程负责设置条件并通知等待的线程

1
2
3
std::unique_lock<std::mutex> lock(mtx); // 获取互斥锁
isReady = true;
cv.notify_one(); // 通知等待的线程
  1. 在修改条件之前,必须先获得与条件变量关联的互斥锁,并在修改后立即释放锁。
  2. 然后,通过 cv.notify_one()cv.notify_all() 来通知等待的线程条件已经发生改变。

异步任务-async、future

  • 已经有多线程thread了,为什么还要有async?
    线程毕竟是属于比较低层次的东西,有时候使用有些不便,比如希望获取线程函数的返回结果的时候,就不能直接通过 thread.join()得到结果,这时就必须定义一个变量,在线程函数中去给这个变量赋值,然后join,最后得到结果,这个过程是比较繁琐的。

  • C++11 提供了**std::async()**,用于创建异步任务,即在一个新的线程中调用线程函数,并返回一个 std::future 对象,这个future中存储了线程函数返回的结果。

简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <future>
#include <chrono>

// 耗时操作函数,返回一个整数
int timeConsumingOperation() {
// 模拟一个耗时操作,这里暂停 2 秒钟
std::this_thread::sleep_for(std::chrono::seconds(2));
return 42;
}

int main() {
// 创建一个异步任务,异步执行 timeConsumingOperation 函数
std::future<int> fut = std::async(std::launch::async, timeConsumingOperation);

// 执行其他操作
std::cout << "Performing other tasks..." << std::endl;

// 等待异步操作完成并获取结果
int result = fut.get();

// 输出异步操作的结果
std::cout << "Result of asynchronous operation: " << result << std::endl;

return 0;
}

概括std::async()的用法:

1. 创建异步任务并获取future 对象

1
2
3
#include <future>

std::future<int> fut = std::async(std::launch::async, func);

创建了一个异步任务,异步任务会立即在一个新线程中执行,线程调用函数func(),将函数的返回值赋给了future对象fut

2. 获取异步任务的值

1
auto result = fut.get();

需要获取异步操作的结果时,调用 get() 函数来获取 std::future 对象的值。如果异步操作还没有完成,get() 函数会阻塞当前线程,直到异步操作完成并返回结果。

如何检查异步任务是否完成:

1
bool state = fut.valid();

可以调用 valid() 函数来检查 std::future 对象是否有效。如果 std::future 对象与异步操作相关联,并且异步操作尚未完成,则 valid() 函数返回 true,否则返回 false

3. 异步执行策略

std::async() 函数提供的三种异步执行策略。它们决定了 std::async() 函数创建的异步任务的执行方式。

1. std::launch::async

  • std::launch::async 策略表示创建一个新的线程,在新的线程中异步执行指定的可调用对象。
  • 这意味着异步任务会立即在一个新的线程中执行,不会阻塞当前线程。
  • 使用 std::launch::async 策略创建的异步任务可以实现并行执行,适用于耗时的计算任务和I/O操作等。
1
std::future<int> fut = std::async(std::launch::async, task);

2. std::launch::deferred

  • std::launch::deferred 策略表示延迟执行指定的可调用对象,直到调用 get() 函数时才在调用线程中执行。
  • 这种策略不会创建新的线程,而是在需要时延迟执行。
  • 使用 std::launch::deferred 策略创建的异步任务不会立即执行,直到调用 get() 函数时才执行,适用于延迟执行和惰性求值等场景。
1
std::future<int> fut = std::async(std::launch::deferred, task);

3. std::launch::async | std::launch::deferred

  • std::launch::async | std::launch::deferred 表示由实现自行选择执行策略。
  • 这种策略允许实现根据具体情况自行选择执行方式,可以在新的线程中异步执行,也可以在调用线程中延迟执行。
  • 使用 std::launch::async | std::launch::deferred 策略创建的异步任务有可能在新的线程中执行,也有可能在调用线程中延迟执行,具体取决于实现。
1
std::future<int> fut = std::async(std::launch::async | std::launch::deferred, task);