C++ 多线程笔记(std::thread)

                     

贡献者: addis

   例程(编译时需要链接 pthread 库,例如给 g++ 加上 -pthread 或 -lpthread 选项):

#include <thread>
#include <mutex>
#include <chrono>
using namespace std;

// Guards both the shared counter passed to myfun() and the output it prints.
std::mutex mut;

// Demo worker: prints the caller's id together with the current value of *i,
// then stores that value plus one back into *i. The print and the
// read-modify-write happen while `mut` is held.
//   id - label printed in the output; 1 and 2 also select an artificial delay
//   i  - pointer to the counter shared between the callers
void myfun(int id, int *i)
{
    const auto pause = std::chrono::milliseconds(100);

    // Caller 1 waits 100 ms before it even tries to take the lock.
    if (id == 1) {
        std::this_thread::sleep_for(pause);
    }

    std::lock_guard<std::mutex> guard(mut);
    printf("id = %d, i = %d\n", id, *i);
    const int next = *i + 1;

    // Caller 2 stalls between reading and writing back, still holding the
    // lock, so no other caller can touch *i in the meantime.
    if (id == 2) {
        std::this_thread::sleep_for(pause);
    }
    *i = next;
}

int main()
{
    int i = 0;
    thread th1(myfun, 1, &i);
    thread th2(myfun, 2, &i);
    myfun(0, &i);
    th1.join();
    th2.join();
    return 0;
}

   另外也可以把 mut 声明为 myfun() 中的一个 static 变量。

   还要注意,std::mutex 并不只适用于 std::thread:在常见平台上,pthread、OpenMP 等线程库底层使用的是同一套系统线程,因此 std::mutex 对它们创建的线程同样有效(不过 C++ 标准本身并不保证这一点)。

  

未完成:互斥锁、条件变量、读写锁以及自旋锁分别什么时候用?

thread_local 变量

1. std::condition_variable

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

using namespace std;
// Single mutex protecting `buffer`, `product_id` and the console output.
std::mutex mtx;
// Producers wait on this while the buffer is full.
std::condition_variable cv_producer;
// Consumers wait on this while the buffer is empty.
std::condition_variable cv_consumer;
// FIFO of product ids handed from producers to consumers.
std::queue<int> buffer;
// Producers do not push while the buffer already holds this many items.
const unsigned int max_buffer_size{10};
// Next id to hand out; incremented under `mtx` by each producer.
int product_id{0};

// Producer thread body: four times over, reserve a product id, spend 200 ms
// "building" it without the lock, then push it into `buffer` and wake one
// consumer.
//   id - label used in this producer's log lines
void producer(int id) {
    for (int round = 0; round < 4; ++round) {
        int ticket = 0;
        {
            // Phase 1 (locked): take the next product id and announce it.
            std::lock_guard<std::mutex> hold(mtx);
            ticket = product_id++;
            std::cout << "+ Producer [" << id << "] producing " << ticket << "\n";
        }

        // Phase 2 (unlocked): the slow part runs in parallel with the others.
        std::this_thread::sleep_for(std::chrono::milliseconds(200));

        // Phase 3 (locked): publish the product.
        std::unique_lock<std::mutex> hold(mtx);

        // wait() returns once the predicate is true. If it is already true
        // there is no waiting and no unlocking; otherwise the thread sleeps
        // (releasing the lock) until notified and the predicate holds.
        // The predicate must be checked with the lock held: if the first
        // check were done unlocked, another thread could change the buffer
        // and send its notify between our check and our going to sleep; that
        // notify would be lost and this thread might never wake up. So the
        // buffer is only ever modified and inspected under the lock.
        cv_producer.wait(hold, [] { return buffer.size() < max_buffer_size; });

        // The lock is held again here (or was never released). Without it the
        // push would not be thread-safe and the buffer might already be full.
        buffer.push(ticket);
        std::cout << "  Producer [" << id << "] produced " << ticket << "\n";

        // Wakes one waiting consumer, if any; does nothing when no consumer
        // is waiting. Holding the lock is not required for notify_one().
        cv_consumer.notify_one();

        // `hold` goes out of scope here and its destructor releases the lock.
    }
}

// Consumer thread body: twice over, wait for an item in `buffer`, take it,
// spend 200 ms "processing" it without the lock, then report completion.
//   id - label used in this consumer's log lines
void consumer(int id) {
    for (int i = 0; i < 2; ++i) {
        std::unique_lock<std::mutex> lck(mtx); // locked

        // Returns once the buffer is non-empty. If it already is, there is no
        // waiting and no unlocking; otherwise the thread sleeps (releasing
        // the lock) until notified and the predicate holds.
        cv_consumer.wait(lck, [] { return !buffer.empty(); });

        // The lock is held here. Without it another consumer could pop the
        // last item between our check and our front()/pop().
        int item = buffer.front();
        buffer.pop();
        std::cout << "- Consumer [" << id << "] received " << item << "\n";

        // A slot became free the moment the item was popped, so wake one
        // waiting producer now instead of after the slow processing step.
        cv_producer.notify_one();

        // Release the lock for the slow part so that producers and the other
        // consumers can run in parallel with it (mirrors producer()).
        lck.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        lck.lock();

        // Re-locked only so that the log line is not interleaved with others.
        std::cout << "  Consumer [" << id << "] processed " << item << "\n";

        // lck's destructor releases the lock at the end of the iteration.
    }
}

// Starts two producer threads (ids 0-1) and four consumer threads (ids 2-5),
// waits for all of them, then prints how many items are left in the buffer.
int main() {
    std::thread producers[2];
    std::thread consumers[4];

    // Ids are handed out in start order: producers first, then consumers.
    int next_id = 0;
    for (auto& t : producers) {
        t = std::thread(producer, next_id++);
    }
    for (auto& t : consumers) {
        t = std::thread(consumer, next_id++);
    }

    for (auto& t : producers) {
        t.join();
    }
    for (auto& t : consumers) {
        t.join();
    }

    // All workers have been joined, so reading the buffer needs no lock here.
    std::cout << "final buffer size: " << buffer.size() << std::endl;

    return 0;
}

2. 自旋锁

   C++ 标准库没有提供自旋锁,我们可以用 std::atomic_flag 简单实现一个:

#include <atomic>
#include <iostream>
#include <mutex>
#include <thread>

// Minimal busy-waiting mutual-exclusion lock built on std::atomic_flag.
// It offers lock(), try_lock() and unlock(), so it can be driven by the
// standard lock wrappers such as std::lock_guard and std::unique_lock.
class SpinLock {
private:
    // Set while some thread holds the lock, clear otherwise.
    std::atomic_flag lockFlag = ATOMIC_FLAG_INIT;

public:
    // Blocks (by spinning) until the lock has been acquired.
    void lock() {
        // test_and_set() returns the previous value: true means somebody else
        // already holds the lock, so keep trying.
        while (lockFlag.test_and_set(std::memory_order_acquire)) {
            // Spin-wait (busy wait) until the lock is released
        }
    }

    // Makes one attempt to acquire the lock without waiting.
    // Returns true if the lock was acquired, false if it was already held.
    bool try_lock() {
        return !lockFlag.test_and_set(std::memory_order_acquire);
    }

    // Releases the lock; must only be called by the current holder.
    void unlock() {
        lockFlag.clear(std::memory_order_release);
    }
};

// Prints one line to std::cout while holding `spinLock`, so that the output
// of concurrent callers sharing the same lock is not interleaved.
//   spinLock - lock shared by all threads that call this function
void exampleFunction(SpinLock& spinLock) {
    // RAII guard: the lock is released when `guard` goes out of scope, even
    // if the stream insertion below throws.
    std::lock_guard<SpinLock> guard(spinLock);
    std::cout << 123 << "234" << std::endl;
}

int main() {
    SpinLock spinLock;
    std::thread t1(exampleFunction, std::ref(spinLock));
    std::thread t2(exampleFunction, std::ref(spinLock));

    t1.join();
    t2.join();

    return 0;
}

                     

© 保留一切权利