1. 概述

  • C++标准的同步工具:

    • 条件变量
      • std::condition_variable
      • std::condition_variable_any
    • future
    • latch
    • barrier
  • Qt中的对应物:

    • QWaitCondition
    • QFuture
    • QPromise
    • QFutureSynchronizer
    • QFutureWatcher

2. 条件变量

条件变量必须配合互斥量一起使用.
两个条件变量:

  • std::condition_variable: 必须配合std::mutex一起使用.

  • std::condition_variable_any: 只要某一类型符合称为互斥量的最低标准, 就可以使用.

  • 条件变量有notify_one()notify_all()两个方法, 适用于不同的场景.

  • 有些特定的情况下, 可能std::call_once()更简单, 使用条件变量过于复杂. 例如初始化

  • 若线程只需要等待一次, 可能使用future更合适.

  • 在生产者中, 生产后用条件变量的notify_one()通知消费者

  • 在消费者中:

    1. 先用std::unique_lock对互斥量加锁
    2. 使用条件变量的wait(), 传入锁和一个predict来表达等待成立的条件
    3. 执行后, 对互斥量使用unlock解锁.
      注意点是,
    • 消费者在等待期间, 必须将互斥量解锁, 等待结束后, 要重新加锁. 所以这里要使用unique_lock. 当然也可以直接用mutex(Qt中就这样做, 因为没有对应unique_lock的等价物)
    • 要注意伪唤醒的影响, 因此条件变量中的predict对象不应该有副作用.

其他
C++11中还有一个notify_all_at_thread_exit.

2.1 利用条件变量构造一个线程安全的队列

定义其接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
template<typename T>
class threadsafe_queue
{
private:
// 这里必须将_mutex设置为mutable, 才能在empty()和拷贝构造函数中使用.
mutable std::mutex _mutex;
std::queue<T> _data_queue;
std::condition_variable _condition;

public:
threadsafe_queue(){}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other._mutex);
_data_queue = other._data_queue;
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(_mutex);
_data_queue.push(new_value);
_condition.notify_one();
}

bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(_mutex);
if(_data_queue.empty)
{
return false;
}
value = _data_queue.front();
_data_queue.pop();
return true;
}

std::shared_pop<T> try_pop()
{
std::lock_guard<std::mutex> lk(_mutex);
if(_data_queue.empty)
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(_data_queue.front()));
_data_queue.pop();
return res;
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(_mutex);
_condition.wait(lk, [this](){return !_data_queue.empty();});
value = _data_queue.front();
_data_queue.pop();
}
std::shared_pop<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(_mutex);
_condition.wait(lk, [this](){return !_data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(_data_queue.front()));
_data_queue.pop();
return res;
}

bool empty() const
{
std::lock_guard<std::mutex> lk(_mutex);
return _data_queue.empty();
}
};

在上面的代码中, 在wait_and_pop中, 我们没有显示释放lk. 因为这里只有一个队列的pop操作, 不需要提前pop. 这里, 实际上, 甚至也不需要使用unique_lock, 简单的使用lock_guard应该也可以.

3. future

C++11开始提供的future相关的类, 位于头文件<future>中.

  • std::future, 独占future, 一个事件只允许关联唯一一个future实例
  • std::shared_future, 一个事件可以关联多个shared_future实例.
  • async: 按照异步方式启动一个任务(函数), 并返回std::future的结果
  • std::promise
  • packaged_task
  • lanuch
  • future_status

3.1 使用std::promise

promise表示一个异步执行的返回值, 它在future中履行.
我们可以定义一对绑定的对象s=f(p,f), 其中, p是状态s的一个可写视图, 即promise, 它可以设置为一个特定的值, 并且只能完成一次.

Qt的QPromise可以多次setValue().
f是状态s的一个可读视图, 能够在promise发出信令之后访问.
从而, 将promisefuture作为主线程和派生线程之间的异步机制.

简单的使用方式:

  1. 在主线程中定义一个promise对象p, 并使用std::future<T> f=g.get_future()分配与p相关联的future.
  2. 通过std::promise<T>&&传递promise对象p, 将其从主线程移动到派生线程. p必须使用std::move(p).
  3. 在派生线程中使用p.set_value()设置对应的值.
  4. 在主线程中使用f.get()而读出future f. 主线程会阻塞进行, 直到fp得到通知.

下面的例子会使用多线程, 每个线程独立迭代计算一个Fibonacci的值—-故意这么实现, 以消耗一点计算时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
template<typename value_t, typename index_t>
void fibo(index_t n, std::promise<value_t>&& promise)
{
value_t val0 = 0;
value_t val1 = 1;
for (index_t index = 0; index < n; ++index)
{
const value_t tmp = val0;
val0 = val1;
val1 += tmp;
}
// 第三步, 通过set_value设置promise
promise.set_value(val0);
}

int main()
{
std::cout << "Hello World!\n";
const uint64_t num_threads = 32;
std::vector<std::thread> threads;
std::vector<std::future<uint64_t>> results;

for (uint64_t id = 0; id < num_threads; ++id)
{
// 第一步, 定义promise对象, 并通过get_future()获取其future对象
std::promise<uint64_t> promise;
results.emplace_back(promise.get_future());
// 第二步, 将promise移动到派生线程中
threads.emplace_back(fibo<uint64_t, uint64_t>, id, std::move(promise));
}

for (auto& result : results)
{
// 第4步, future利用get()获取promise的结果
std::cout << "result: " << result.get() << std::endl;
}

for (auto& thread : threads)
{
thread.join();
}

std::cout << "Finished" << std::endl;
}

3.2 使用std::packaged_task<>封装任务

std::packaged_task<>是对任务构造对象的一个简化. 它返回一个带有对应的future对象的任务. 从而可以简化使用.

它关联了future和函数. 当它被执行时, 它会调用关联的函数或可调用对象, 并将结果保存为future中, 并令future准备就绪.
它本身是一个可调用对象, 可以直接调用, 或者封装到std::function中.

基本描述:

  1. 它是一个对象, 可以传递
  2. 它是一个可调用对象, 可以被执行.

例如, 假定有一个函数comp:

1
2
3
bool comp(float value, int64_t threshold){
return value < threshold;
}

我们可以这样创建任务使用:

1
2
3
4
5
6
7
// 创建task, 并得到其future
std::packaged_task<bool(float, int64_t)> task(comp);
auto future = task.get_future();
// 执行task:
task(value, threshold);
// 获取执行结果:
std::cout << future.get() << std::endl;

使用技巧: 构造一个工厂模板来使用packaged_task:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

template<
typename Func,
typename ... Args,
//typename Rtn = typename std::result_of<Func(Args...)>::type>
typename Rtn = typename std::result_of_t<Func(Args...)>>
auto make_task(
Func&& func,
Args&& ...args) -> std::packaged_task<Rtn(void)>
{
auto aux = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
auto task = std::packaged_task<Rtn(void)>(aux);
return task;
}

从C++14开始, 我们可以用result_of_t<>来代替result_of<>::type
result_ofresult_of_t在C++17中被deprecated, 从C++20开始被removed. 不过在VC2022中还留着, 反而invoke_result倒是编译失败. 不知道是怎么回事.

下面的代码使用make_task来计算一组Fibonacci的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
template<
typename Func,
typename ... Args,
//typename Rtn = typename std::result_of<Func(Args...)>::type>
typename Rtn = typename std::result_of_t<Func(Args...)>>
auto make_task(
Func&& func,
Args&& ...args) -> std::packaged_task<Rtn(void)>
{
auto aux = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
auto task = std::packaged_task<Rtn(void)>(aux);
return task;
}

uint64_t fibo2(uint64_t n)
{
uint64_t val0 = 0, val1 = 1;
for (uint64_t index = 0; index < n; ++index)
{
const uint64_t tmp = val0; val0 = val1; val1 += tmp;
}
return val0;
}

void test2()
{
std::cout << "test by make_task and packaged_task" << std::endl;

const int64_t num_threads = 32;
std::vector<std::thread> threads;
std::vector<std::future<uint64_t>> results;

for (auto id = 0; id < num_threads; ++id)
{
auto task = make_task(fibo2, id);
results.emplace_back(task.get_future());
threads.emplace_back(std::move(task));
}

for (auto& result : results)
{
std::cout << "result: " << result.get() << std::endl;
}

for (auto& thread : threads)
{
thread.join();
}

std::cout << "Finished" << std::endl;
}

int main()
{
std::cout << "Hello World!\n";
test2();
}

3.3 使用async创建任务

C++11提供了std::async()来简单地创建任务对象.

几个注意的地方:

  • 不一定总是创建新的线程:
    可以给std::async()额外指定一个std::lanuch类型的参数, 指定是启动新线程执行, 还是在主线程中同步执行任务:

    • std::lanuch::defered, 表示在调用线程上延迟执行, 相当于在future上面调用了wait()get().
    • std::lanuch::async, 表示在另外的线程上执行.
  • 如果没有通过future.get()访问对应的future, 则任务可能永远得不到执行

  • 需要特别关注future的定义域, 避免任务串行执行. 典型的, 如果调用了future的析构函数, 任务就会同步. 例如, 下面的代码都会串行化:

1
2
3
4
5
6
7
8
9
10
// future的析构函数导致的串行
for(int id=0; id<num_threads; ++id>)
{
auto future = std::async(std::launch::async, fibo, id);
}
// 临时对象的析构同样导致串行
for(int id=0; id<num_threads; ++id>)
{
std::async(std::launch::async, fibo, id);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void test3()
{
std::cout << "test by std::async" << std::endl;

int num_threads = 32;
std::vector<std::future<uint64_t>> results;
for (auto id = 0; id < num_threads; ++id)
{
auto task = make_task(fibo2, id);
results.emplace_back(
std::async(std::launch::async, fibo2, id)
);
}

for (auto& result : results)
{
std::cout << "result: " << result.get() << std::endl;
}
}

参考:
QtConcurrent::run和QtConcurrent::task.
std::async有些类似QtConcurrent::run()QtConcurrent::task(). 都是返回一个future对象. 基本上可以互相参考使用.

3.5 future和异常

线程中发生了异常, 会将异常保存到future里面.

3.4 多个线程一起等待

这里指的是多个线程一起等待某一个future, 而不是同时等待多个future. 那个是barrier

std::future仅能被一个线程使用, 其get()好像仅能被调用一次

在cpprefreence中是这么描述的: The value v stored in the shared state, as std::move(v)

要多个线程等待, 需要使用std::shared_future.
std::shared_futurestd::future得到. 且支持operator =.
也可以用std::futureshare()方法得到.

目前还没有遇到过需要这个东西的地方.
在Qt中也没有看到过等价的东西.
其实, 走到这个地步, 使用shared_future是不是有价值大部分时候也值得商榷一下. 可能还不如直接用条件变量的notify_all()更直观.

3.5 链式风格

3.5.1 std::experimental中的支持

主要技术点:

  • std::experimental里面, 有std::experimental::futurestd::experimental::promise两个.

  • 提供then方法.

  • 其结果保存在std::experimental::future内部, 且只能被取出最多一次. 它被传递个下一级的when里面的线程函数, 并返回新的future.

    也就是说, 下面的代码

    1
    2
    3
    4
    std::experimentalfuture<int> fun_1();
    std::experimentalfuture<std::string> func_2(std::experimentalfuture<int> func);

    auto result = func1().then(func_2);

    其中, func_1()是第一个被调用的线程, 它的返回值是std::experimental::future<int>, 则第二个线程函数func_2的参数就必须是fun_1()的返回值类型. 依次类推.

通过future传递参数而不是提取值作为参数的主要考虑是便于异常传递, 这样库不需要考虑线程的处理, 而是交给下一级的线程(称为后续(continuation))来处理.

参见备注, Qt的QFuture使用的就是后者. 个人认为这里的方法相对于Qt有笨又蠢又难看, 是一种很差很差的工程实践. 感觉这些人有点走火入魔

实现一个类似于std::async()的返回std::experimental::future的工厂函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template <typename Func>
std::experimental::future<decltype(std::declval<Func>()())>
spawn_async(Func&& func)
{
std::experimental::promise<decltype(std::declval<Func>()())> p;
auto res = p.get_future();

std::thread t([p=std::move(p), f=std::decay_t<Func>(func)]() mutable{
try
{
p.set_value_at_thread_exit(f());
}
catch(...)
{
p.set_exception_at_thread_exit(std::current_exception());
}
});

t.detatch();
return res;
}

下面例子中, 将一个登录流程拆分后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::experimental::future<void> process_login(std::string const& username, std::string const& password)
{
return spaw_async([=](){
return backend.authenticate_user(username, password);
}).then([](std::experimental::future<user_id> id){
return backend.request_current_info(id.get());
}).then([](std::experamental::future<user_data> info_to_display){
try{
update_display(info_to_display.get());
}catch(std::exception& e){
display_error(e);
}
});
}

在上面的代码中, 异常会沿着调用链向外传递, 在最后的函数中统一处理异常.

评论: Qt对链式处理的支持

Qt6.0开始, QFuture里面增加了后续的处理支持. 相对于C++标准库, 我觉得QFuture提供的接口设计得更好, 更人性化:

QFuture提供了三类函数:

  • onCanceled()
  • onFailed()
  • then()
    每个函数又有多种方式.

但是与std::experimental::future不同, 在QFuture中的后续函数的参数并不限定必须是future, 同样也可以是它里面包含的实际内容. 相应的, onFailed()则用于处理链式处理中的所有异常.

这种用法, 相当于将前面的例子process_login()中最后一个后续里面的catch部分给拿出来放到onFailed()函数里面, 解耦得更漂亮, 我们看Boot的库, Leaf也是这种风格. 它的可维护性要比std::experimental里面的做法要好看的多.

这一点上, Qt要比std::experimental::future要灵活得多. 个人不喜欢将future作为参数的做法, 太长了, 虽然可以用auto.

如果软件不允许使用异常 (比如, 很多嵌入式系统中), 则可以考虑将错误状态作为QFuture类型的一部分传递给QFuture.

例如, 下面的代码, 我们用std::variant将错误标记和正常的结果封装在一起使用.

不能使用QVariant, 因为后者不能携带类型信息.

1
2
3
4
using NetworkReply = std::variant<QByteArray, QNetworkReply::NetworkError>;

enum class IOError { FailedToRead, FailedToWrite };
using IOResult = std::variant<QString, IOError>;

则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
QFuture<IOResult> future = QtConcurrent::run([url] {
...
return NetworkReply(QNetworkReply::TimeoutError);
}).then([](NetworkReply reply) {
// 在这里检查错误, 并返回
if (auto error = std::get_if<QNetworkReply::NetworkError>(&reply))
return IOResult(IOError::FailedToRead);

auto data = std::get_if<QByteArray>(&reply);
// try to write *data and return IOError::FailedToWrite on failure
...
});

auto result = future.result();
// 最后检查结果中是否有错误
if (auto filePath = std::get_if<QString>(&result)) {
// do something with *filePath
else
// process the error

和前面的std::experimental::futurethen比起来, 这里的代码更像是人话.

一个完整的链式处理流程大概是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
QFuture<int> testFuture = ...;
auto resultFuture = testFuture.then([](int res) {
// Block 1
}).onCanceled([] {
// Block 2
}).onFailed([] {
// Block 3
}).then([] {
// Block 4
}).onFailed([] {
// Block 5
}).onCanceled([] {
// Block 6
});

根据testFuture的不同结果, 会有不同的处理分支:

  • 如果执行成功, 则走到Block1. 如果Block1执行成功, 则会走到下一个then处– Block4

  • 如果testFuture被cancel了或失败了, 则分别走到Block2Block3处, 然后仍然会走到下一个thenBlock4

  • 如果在Block2中又抛出了异常, 则会走到Block3里面

  • 如果在代码中Block3所在的onFailed放到了Block2所在的onCanceled的前面, 那么Block2中抛出的异常会被传递到后面的那个onFailed里面, 即Block5

  • 如果我们将第一个onCanceled给去掉, 如下:

1
2
3
4
5
6
7
8
9
10
11
12
QFuture<int> testFuture = ...;
auto resultFuture = testFuture.then([](int res) {
// Block 1
}).onFailed([] {
// Block 3
}).then([] {
// Block 4
}).onFailed([] {
// Block 5
}).onCanceled([] {
// Block 6
});

如果在testFuture里面cancel了操作, 那么仍然会走到Block4, 并走到Block6中.

Qt得QtFuture::Launch比std里面得launch要多了一个取值:

  • Sync: 在调用这得线程上执行, 大概类似于std::launch::deferred
  • Async: 在独立得线程上执行, 等于std::launch::async.
  • Inherit: 在它attatch的future所在的线程上执行

比如, 下面的代码, 两个后续都在同一个独立线程上执行.

1
2
3
QFuture<int> future = ...;
future.then(QtFuture::Launch::Async, [](int res){ ... })
.then(QtFuture::Launch::Inherit, [](int res2){ ... });

unwarp()

以后要格外关注一下Qt6.4新增的unwarp()函数. 这个新的函数没有用过.

QtFuture::makeReadyFuture(...)用于创建一个已经完成的future. 在std里面好像没有对应的东西.

我记得C#的Task里面有一个相同概念的东西, 叫什么名字记不得了, 也是异步函数返回立即结果.

3.6 等待多个future

std::experimental提供了when_all()when_any()两个函数.

Qt

在Qt中, 从Qt6.3 开始还提供了全局函数:

  • QtFuture::whenAll
  • QtFuture::whenAny

实现了相同的功能.

Qt中, 在Qt6.3之前, 如果要实现when_all功能, 我们需要一个QFutureSynchronizer, 使用waitForFinished().
when_any就比较麻烦了. 一种做法或许是使用QFutureSynchronizer::futures(), 并且connect到后续上面? 反正很麻烦.

这个做法行不行还不好说, 因为futures()好像得到的list好像是一个副本了? 以后有机会要试一下才知道.

3.7 latch和barrier

std::experiment::latch

是一个包含计数器的同步对象, 一旦计数器减到0, 就进入就绪状态.

  • 使用count_down()令计数器减一
  • 使用wait()等待其就绪
  • 使用is_ready()检查它是否就绪

这个东西其实就是信号量? 或者说是轻量级的信号量?

std::experimental::barrier

可以指定一组线程阻塞等待在某处, 当达到满足条件的线程数量的时候, 接着运行指定的内容.

这个主要的用途是串行化一组工作线程. 尤其是当只需要一部分线程完成就可以继续的情况.

latch和barrier在Qt里面目前为止还没有看到等价物. 或许信号量就是latch的对等东西? 但是Qt的信号量好像是系统级的同步对象, 不像.NET还有Slim和普通的信号量. 至于Barrier, 如果是等待所有线程完成, 可以用Synchronizer. 如果是部分, 就找不到直接的做法了. 这也说明这个东西其实很小众, 小众到活了这么多年也没有需求:-)

4. std和Qt比较

Qt中和std::future对应的是QFuture, 暂时没有找到和std::shared_future对应的东西, 也没有遇到使用的场景. Qt提供的Watcher, Synchronizer等可以实现类似的需求.

Qt提供了对应于std::promiseQPromise, 但是有一些不同的地方. 总体来说, QPromise要比std::promise强大和丰富的多, 尤其适合GUI场景使用.

  • std::promise只能执行一次set_result(), 而QPromise可以多次addResult(), 设置多个结果, 并且使用resultAt()获取不同的结果. 对应的, QPromise提供了finish(), 表征计算结束. addResult()函数可以指定位置.
  • QPromise提供了start()suspendIfRequested()
  • QPromise还提供了setProgressRange()setProgressValue()方法.

Qt的QPromise也是moveonly的对象, 这一点和std::promise一样. Qt建议使用QSharedPointer

使用QPromise有两种途径: 一种是使用底层的QThread, 另一种是使用QtConcurrent. 对于简单的工作线程, 后者一般更简便一些. 而对于更复杂的情况, 尤其是涉及到交互, 更好的做法还是moveToThread的做法.

QtConcurrent::run和QtConcurrent::task比较

Qt在Concurrent这块的变化一直比较大, 尤其是Qt5和Qt6之间. Qt5的代码很容易编译失败. 一方面是接口变化, 一方面是编译器的语法检查更严格(尤其是从VC2017迁移到VC2022).

  • run返回QFuture, task返回QTaskBuilder.
  • run的线程会立即执行, 而task创建后需要手工启动执行, 例如, 利用spawn(), 启动后返回的也是QFuture. 大概可以认为, QtConcurrent::run(...)QtConcurrent::task(...).spawn()是等价的.
  • run的参数是在调用run的时候提供的, 而task创建的线程, 通过QTaskBuild的方法withArguments()方法来提供.