贡献者: addis
std::thread
底层通常使用 pthread。
#include <thread>
,#include <mutex>
std::thread th(函数, arg1, arg2, ...)
; 创建一个线程,调用 函数
(可以是函数指针,函数对象,lambda),arg
是 函数
的变量。但如果 函数
有参数是 reference,对应的 arg1
等需要用 std::ref(变量)
或者 std::cref(变量)
th.join()
可以让当前线程休眠等待某个线程自己退出再让当前线程继续运行。
std::mutex m
可以避免多个线程操作同一数据。该类型不可以 copy 只能通过 reference 传参。
m.lock()
给 mutex 上锁,如果已经被别人上锁就暂停该线程并等待解锁。m.try_lock()
试图上锁,如果已经被别人上锁就返回 false
。
m.unlock()
解锁,以便别的线程上锁。
m.lock()
和 m.unlock()
之间发生 throw(这样就无法自动解锁),通常不直接调用他们,而是用 std::lock_guard<std::mutex> guard(m)
。constructor 相当于 lock,destructor 相当于 unlock。
unique_lock<std::mutex>
类似于 lock_guard
但更灵活。可以用 .lock()
和 .unlock()
多次手动上锁和解锁。
std::this_thread::sleep_for(std::chrono::seconds(2));
可以让某个线程休眠若干时间
sleep_for
不是 busy wait 而是像 cin
等待时一样运行 cpu 执行别的东西,也就是让系统接管,直到某个事件发生然后再继续。
例程(编译时要给 linker 加上 -l pthread
):
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;
mutex mut;
// A dummy function
void myfun(int id, int *i)
{
if (id == 1)
this_thread::sleep_for(chrono::milliseconds(100));
lock_guard<mutex> guard(mut);
printf("id = %d, i = %d\n", id, *i);
int j = *i+1;
if (id == 2)
this_thread::sleep_for(chrono::milliseconds(100));
*i = j;
}
int main()
{
int i = 0;
thread th1(myfun, 1, &i);
thread th2(myfun, 2, &i);
myfun(0, &i);
th1.join();
th2.join();
return 0;
}
另外也可以把 mut
声明为 myfun()
中的一个 static
变量。
另外注意 mutex
不光适用于 std::thread
,对任何其他 threading 库例如 pthread
或者 OpenMP 都有效。
static
替换为 thread_local
,可以让其在每个线程中保持自己独立的 copy,使函数变得 thread safe。例如上面的 myfun
中如果声明了 thread_local static vector<int> a;
,那么每个线程第一次调用该函数时就会生成一个独立的变量 a
,若一个线程多次调用该函数,那么每次调用时 a
都会有上次调用结束前的值。
condition_variable
用于 block 一个或者多个线程(也就是让它睡觉,腾出 CPU 算力做别的事情),直到某个另外的线程发送一个信号,可以由操作系统唤醒其中一个或全部。每个线程被 block 以前需要检查某个条件是否满足,如果已经满足就不会被 block,不满足就 block,直到被某个 notify 唤醒。但唤醒可能是假唤醒(跟操作系统有关,即使不 notify 也可能会唤醒),所以唤醒以后还要再确认一遍条件。
wait()
在 block 开始时会释放 mutex。唤醒后(包括假唤醒)会重新锁 mutex 再退出 wait()
(如果没有第二个参数),手动检查完条件后还可以手动对条件变量做一些更改,完了再手动释放 mutex。
wait()
的第二个可选参数可以直接把一个 lambda 函数放进去,用于返回条件,这相当于把 wait()
手动放入一个 while(条件)
循环。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv_producer;
std::condition_variable cv_consumer;
std::queue<int> buffer;
const unsigned int max_buffer_size = 10;
void producer(int id) {
for (int i = 0; i < 20; ++i) {
std::unique_lock<std::mutex> lck(mtx);
// Wait until buffer is not full
cv_producer.wait(lck, []{ return buffer.size() < max_buffer_size; });
buffer.push(i);
std::cout << "Producer " << id << " produced " << i << "\n";
cv_consumer.notify_one(); // Notify one consumer
}
}
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lck(mtx);
// Wait until buffer is not empty
cv_consumer.wait(lck, []{ return !buffer.empty(); });
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed " << item << "\n";
cv_producer.notify_one(); // Notify one producer
}
}
int main() {
std::thread producers[2];
std::thread consumers[4];
for (int i = 0; i < 2; ++i)
producers[i] = std::thread(producer, i);
for (int i = 0; i < 4; ++i)
consumers[i] = std::thread(consumer, i);
for (auto& p : producers) p.join();
for (auto& c : consumers) c.join();
return 0;
}
c++ 没有定义自旋锁,我们可以简单实现一个:
#include <atomic>
#include <thread>
#include <iostream>
class SpinLock {
private:
std::atomic_flag lockFlag = ATOMIC_FLAG_INIT;
public:
void lock() {
while (lockFlag.test_and_set(std::memory_order_acquire)) {
// Spin-wait (busy wait) until the lock is released
}
}
void unlock() {
lockFlag.clear(std::memory_order_release);
}
};
void exampleFunction(SpinLock& spinLock) {
spinLock.lock();
std::cout << 123 << "234" << std::endl;
spinLock.unlock();
}
int main() {
SpinLock spinLock;
std::thread t1(exampleFunction, std::ref(spinLock));
std::thread t2(exampleFunction, std::ref(spinLock));
t1.join();
t2.join();
return 0;
}