贡献者: addis
std::thread 底层通常使用 pthread。
#include <thread>,#include <mutex>
std::thread th(函数, arg1, arg2, ...); 创建一个线程,调用 函数(可以是函数指针,函数对象,lambda),arg* 是 函数 的变量。但如果 函数 有参数是 reference,对应的 arg* 需要用 std::ref(变量) 或者 std::cref(变量)
th.join() 可以让当前线程休眠等待某个线程自己退出再让当前线程继续运行。
std::mutex m(互斥锁,mutual exclusive lock)可以避免多个线程操作同一数据。该类型不可以 copy 只能通过 reference 传参。
m.lock() 给 mutex 上锁,如果已经被别人上锁就暂停该线程并等待解锁。m.try_lock() 试图上锁,如果已经被别人上锁就返回 false。
m.unlock() 解锁,以便别的线程上锁。
m.lock() 和 m.unlock() 之间发生 throw(这样就无法自动解锁),通常不直接调用他们,而是用 std::lock_guard<std::mutex> guard(m)。constructor 相当于 lock,destructor 相当于 unlock。
std::unique_lock<std::mutex> 类似于 lock_guard 但更灵活。可以用 .lock() 和 .unlock() 多次手动上锁和解锁。
std::this_thread::sleep_for(std::chrono::seconds(2)); 可以让某个线程休眠若干时间
sleep_for 不是 busy wait 而是像 cin 等待时一样运行 cpu 执行别的东西,也就是让系统接管,直到某个事件发生然后再继续。
例程(编译时要给 linker 加上 -l pthread):
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;
mutex mut;
// A dummy function
void myfun(int id, int *i)
{
if (id == 1)
this_thread::sleep_for(chrono::milliseconds(100));
lock_guard<mutex> guard(mut);
printf("id = %d, i = %d\n", id, *i);
int j = *i+1;
if (id == 2)
this_thread::sleep_for(chrono::milliseconds(100));
*i = j;
}
int main()
{
int i = 0;
thread th1(myfun, 1, &i);
thread th2(myfun, 2, &i);
myfun(0, &i);
th1.join();
th2.join();
return 0;
}
另外也可以把 mut 声明为 myfun() 中的一个 static 变量。
另外注意 mutex 不光适用于 std::thread,对任何其他 threading 库例如 pthread 或者 OpenMP 都有效。
static 替换为 thread_local(后者已经包含了前者,但 static thread_local 也是允许的),可以让其在每个线程中保持自己独立的 copy,使函数变得 thread safe。例如上面的 myfun 中如果声明了 thread_local static vector<int> a;,那么每个线程第一次调用该函数时就会生成一个独立的变量 a,若一个线程多次调用该函数,那么每次调用时 a 都会有上次调用结束前的值。
condition_variable 用于 block 一个或者多个线程(也就是让它睡觉,腾出 CPU 算力做别的事情),直到某个另外的线程发送一个信号,可以由操作系统唤醒其中一个或全部。每个线程被 block 以前需要检查某个条件是否满足,如果已经满足就不会被 block,不满足就 block,直到被某个 notify 唤醒。但唤醒可能是假唤醒(跟操作系统有关,即使不 notify 也可能会唤醒),所以唤醒以后还要再确认一遍条件。
wait(),notify 作废,不会保留到以后。
wait() 在 block 开始时会释放 mutex。唤醒后(包括假唤醒)会重新锁 mutex 再退出 wait(),手动检查完条件后还可以手动对条件变量做一些更改,完了再手动释放 mutex。
wait() 的第二个可选参数可以直接把一个 lambda 函数放进去,用于返回条件,这相当于把 wait() 手动放入一个 while(条件) 循环。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;
mutex mtx;
condition_variable cv_producer;
condition_variable cv_consumer;
queue<int> buffer;
const unsigned int max_buffer_size = 10;
int product_id = 0;
void producer(int id) {
for (int i = 0; i < 4; ++i) {
unique_lock<mutex> lck(mtx); // locked
int this_product_id = product_id++;
cout << "+ Producer [" << id << "] producing "
<< this_product_id << "\n";
lck.unlock();
// do some heavy work here in parallel
this_thread::sleep_for(chrono::milliseconds(200));
lck.lock();
auto wake_cond = []{ return buffer.size() < max_buffer_size; };
// wakes when notified and wake_cond is true
// do nothing (no waiting, no unlock) if condition is already true
// 如果 wait() 第一次检查 wake_cond 不要求上锁,那么可能在检查到休眠之间,
// 别人生产了一个且发送了无效的 notify,那这里就可能醒不来了。
// 所以 buffer 必须要在 lock 的时候修改和检查
cv_producer.wait(lck, wake_cond);
// lock re-acquired on wake (or ramain locked)
// followings not thread-safe without lock
// buffer might already be full if not locked
buffer.push(this_product_id);
cout << " Producer [" << id << "] produced "
<< this_product_id << "\n";
// guaranteed waking at least one consumer (lock unnecessary)
// do nothing is all consumers are awake
cv_consumer.notify_one();
// destructor unlocked loc
}
}
void consumer(int id) {
for (int i = 0; i < 2; ++i) {
unique_lock<mutex> lck(mtx); // locked
auto wake_cond = []{ return !buffer.empty(); };
// wakes when notified and wake_cond is true
// do nothing (no waiting, no unlock) if condition is already true
cv_consumer.wait(lck, wake_cond);
// lock re-acqured on wake (or ramain locked)
// not thread-safe if unlocked
// (buffer might be poped by others and empty if unlocked)
int item = buffer.front();
buffer.pop();
cout << "- Consumer [" << id << "] received " << item << "\n";
// loc.unlock();
// do some heavy work here in parallel
this_thread::sleep_for(chrono::milliseconds(200));
// loc.lock();
cout << " Consumer [" << id << "] processed " << item << "\n";
cv_producer.notify_one(); // notify one producer
// destructor unlocked loc
}
}
int main() {
thread producers[2];
thread consumers[4];
int tid = 0;
for (int i = 0; i < 2; ++i)
producers[i] = thread(producer, tid++);
for (int i = 0; i < 4; ++i)
consumers[i] = thread(consumer, tid++);
for (auto& p : producers) p.join();
for (auto& c : consumers) c.join();
cout << "final buffer size: " << buffer.size() << endl;
return 0;
}
c++ 没有定义自旋锁,我们可以简单实现一个:
#include <atomic>
#include <thread>
#include <iostream>
class SpinLock {
private:
std::atomic_flag lockFlag = ATOMIC_FLAG_INIT;
public:
void lock() {
while (lockFlag.test_and_set(std::memory_order_acquire)) {
// Spin-wait (busy wait) until the lock is released
}
}
void unlock() {
lockFlag.clear(std::memory_order_release);
}
};
void exampleFunction(SpinLock& spinLock) {
spinLock.lock();
std::cout << 123 << "234" << std::endl;
spinLock.unlock();
}
int main() {
SpinLock spinLock;
std::thread t1(exampleFunction, std::ref(spinLock));
std::thread t2(exampleFunction, std::ref(spinLock));
t1.join();
t2.join();
return 0;
}
 
 
 
 
 
 
 
 
 
 
 
友情链接: 超理论坛 | ©小时科技 保留一切权利