C++并发和同步
1. 概述
C++标准的同步工具:
- 条件变量
- std::condition_variable
- std::condition_variable_any
- future
- latch
- barrier
- 条件变量
Qt中的对应物:
- QWaitCondition
- QFuture
- QPromise
- QFutureSynchronizer
- QFutureWatcher
2. 条件变量
条件变量必须配合互斥量一起使用.
两个条件变量:
std::condition_variable
: 必须配合std::mutex
一起使用.std::condition_variable_any
: 只要某一类型符合称为互斥量的最低标准, 就可以使用.条件变量有
notify_one()
和notify_all()
两个方法, 适用于不同的场景.有些特定的情况下, 可能
std::call_once()
更简单, 使用条件变量过于复杂. 例如初始化若线程只需要等待一次, 可能使用
future
更合适.在生产者中, 生产后用条件变量的
notify_one()
通知消费者在消费者中:
- 先用
std::unique_lock
对互斥量加锁 - 使用条件变量的
wait()
, 传入锁和一个predict来表达等待成立的条件 - 执行后, 对互斥量使用
unlock
解锁.
注意点是,
- 消费者在等待期间, 必须将互斥量解锁, 等待结束后, 要重新加锁. 所以这里要使用
unique_lock
. 当然也可以直接用mutex
(Qt中就这样做, 因为没有对应unique_lock的等价物) - 要注意伪唤醒的影响, 因此条件变量中的predict对象不应该有副作用.
- 先用
其他
C++11中还有一个notify_all_at_thread_exit
.
2.1 利用条件变量构造一个线程安全的队列
定义其接口:
1 | template<typename T> |
在上面的代码中, 在
wait_and_pop
中, 我们没有显示释放lk. 因为这里只有一个队列的pop操作, 不需要提前pop. 这里, 实际上, 甚至也不需要使用unique_lock
, 简单的使用lock_guard
应该也可以.
3. future
C++11开始提供的future相关的类, 位于头文件<future>
中.
std::future
, 独占future, 一个事件只允许关联唯一一个future实例std::shared_future
, 一个事件可以关联多个shared_future实例.async
: 按照异步方式启动一个任务(函数), 并返回std::future
的结果std::promise
packaged_task
lanuch
future_status
3.1 使用std::promise
promise表示一个异步执行的返回值, 它在future中履行.
我们可以定义一对绑定的对象s=f(p,f)
, 其中, p
是状态s
的一个可写视图, 即promise
, 它可以设置为一个特定的值, 并且只能完成一次.
Qt的QPromise可以多次
setValue()
.
而f
是状态s
的一个可读视图, 能够在promise
发出信令之后访问.
从而, 将promise
和future
作为主线程和派生线程之间的异步机制.
简单的使用方式:
- 在主线程中定义一个
promise
对象p
, 并使用std::future<T> f=g.get_future()
分配与p
相关联的future
. - 通过
std::promise<T>&&
传递promise
对象p
, 将其从主线程移动到派生线程.p
必须使用std::move(p)
. - 在派生线程中使用
p.set_value()
设置对应的值. - 在主线程中使用
f.get()
而读出future f
. 主线程会阻塞进行, 直到f
从p
得到通知.
下面的例子会使用多线程, 每个线程独立迭代计算一个Fibonacci的值—-故意这么实现, 以消耗一点计算时间:
1 | template<typename value_t, typename index_t> |
3.2 使用std::packaged_task<>封装任务
std::packaged_task<>
是对任务构造对象的一个简化. 它返回一个带有对应的future
对象的任务. 从而可以简化使用.
它关联了future和函数. 当它被执行时, 它会调用关联的函数或可调用对象, 并将结果保存为future
中, 并令future
准备就绪.
它本身是一个可调用对象, 可以直接调用, 或者封装到std::function
中.
基本描述:
- 它是一个对象, 可以传递
- 它是一个可调用对象, 可以被执行.
例如, 假定有一个函数comp
:
1 | bool comp(float value, int64_t threshold){ |
我们可以这样创建任务使用:
1 | // 创建task, 并得到其future |
使用技巧: 构造一个工厂模板来使用packaged_task:
1 |
|
从C++14开始, 我们可以用
result_of_t<>
来代替result_of<>::type
result_of
和result_of_t
在C++17中被deprecated, 从C++20开始被removed. 不过在VC2022中还留着, 反而invoke_result
倒是编译失败. 不知道是怎么回事.
下面的代码使用make_task
来计算一组Fibonacci的值:
1 | template< |
3.3 使用async
创建任务
C++11提供了std::async()
来简单地创建任务对象.
几个注意的地方:
不一定总是创建新的线程:
可以给std::async()
额外指定一个std::lanuch
类型的参数, 指定是启动新线程执行, 还是在主线程中同步执行任务:std::lanuch::defered
, 表示在调用线程上延迟执行, 相当于在future上面调用了wait()
或get()
.std::lanuch::async
, 表示在另外的线程上执行.
如果没有通过
future.get()
访问对应的future, 则任务可能永远得不到执行需要特别关注future的定义域, 避免任务串行执行. 典型的, 如果调用了future的析构函数, 任务就会同步. 例如, 下面的代码都会串行化:
1 | // future的析构函数导致的串行 |
1 | void test3() |
参考:
QtConcurrent::run和QtConcurrent::task.std::async
有些类似QtConcurrent::run()
和QtConcurrent::task()
. 都是返回一个future对象. 基本上可以互相参考使用.
3.5 future和异常
线程中发生了异常, 会将异常保存到future里面.
3.4 多个线程一起等待
这里指的是多个线程一起等待某一个future
, 而不是同时等待多个future
. 那个是barrier
std::future
仅能被一个线程使用, 其get()
好像仅能被调用一次
在cpprefreence中是这么描述的: The value v stored in the shared state, as std::move(v)
要多个线程等待, 需要使用std::shared_future
.std::shared_future
从std::future
得到. 且支持operator =
.
也可以用std::future
的share()
方法得到.
目前还没有遇到过需要这个东西的地方.
在Qt中也没有看到过等价的东西.
其实, 走到这个地步, 使用shared_future
是不是有价值大部分时候也值得商榷一下. 可能还不如直接用条件变量的notify_all()
更直观.
3.5 链式风格
3.5.1 std::experimental中的支持
主要技术点:
在
std::experimental
里面, 有std::experimental::future
和std::experimental::promise
两个.提供
then
方法.其结果保存在
std::experimental::future
内部, 且只能被取出最多一次. 它被传递个下一级的when
里面的线程函数, 并返回新的future
.也就是说, 下面的代码
1
2
3
4std::experimentalfuture<int> fun_1();
std::experimentalfuture<std::string> func_2(std::experimentalfuture<int> func);
auto result = func1().then(func_2);其中,
func_1()
是第一个被调用的线程, 它的返回值是std::experimental::future<int>
, 则第二个线程函数func_2
的参数就必须是fun_1()
的返回值类型. 依次类推.
通过future
传递参数而不是提取值作为参数的主要考虑是便于异常传递, 这样库不需要考虑线程的处理, 而是交给下一级的线程(称为后续(continuation))来处理.
参见备注, Qt的
QFuture
使用的就是后者. 个人认为这里的方法相对于Qt有笨又蠢又难看, 是一种很差很差的工程实践. 感觉这些人有点走火入魔
实现一个类似于std::async()
的返回std::experimental::future
的工厂函数:
1 | template <typename Func> |
下面例子中, 将一个登录流程拆分后:
1 | std::experimental::future<void> process_login(std::string const& username, std::string const& password) |
在上面的代码中, 异常会沿着调用链向外传递, 在最后的函数中统一处理异常.
评论: Qt对链式处理的支持
从Qt6.0开始, QFuture
里面增加了后续的处理支持. 相对于C++标准库, 我觉得QFuture
提供的接口设计得更好, 更人性化:
QFuture
提供了三类函数:
onCanceled()
onFailed()
then()
每个函数又有多种方式.
但是与std::experimental::future
不同, 在QFuture
中的后续函数的参数并不限定必须是future
, 同样也可以是它里面包含的实际内容. 相应的, onFailed()
则用于处理链式处理中的所有异常.
这种用法, 相当于将前面的例子process_login()
中最后一个后续里面的catch
部分给拿出来放到onFailed()
函数里面, 解耦得更漂亮, 我们看Boot的库, Leaf
也是这种风格. 它的可维护性要比std::experimental
里面的做法要好看的多.
这一点上, Qt要比std::experimental::future
要灵活得多. 个人不喜欢将future作为参数的做法, 太长了, 虽然可以用auto
.
如果软件不允许使用异常 (比如, 很多嵌入式系统中), 则可以考虑将错误状态作为QFuture类型的一部分传递给QFuture
.
例如, 下面的代码, 我们用std::variant
将错误标记和正常的结果封装在一起使用.
不能使用
QVariant
, 因为后者不能携带类型信息.
1 | using NetworkReply = std::variant<QByteArray, QNetworkReply::NetworkError>; |
则:
1 | QFuture<IOResult> future = QtConcurrent::run([url] { |
和前面的std::experimental::future
的then
比起来, 这里的代码更像是人话.
一个完整的链式处理流程大概是这样的:
1 | QFuture<int> testFuture = ...; |
根据testFuture
的不同结果, 会有不同的处理分支:
如果执行成功, 则走到
Block1
. 如果Block1
执行成功, 则会走到下一个then
处–Block4
如果
testFuture
被cancel了或失败了, 则分别走到Block2
或Block3
处, 然后仍然会走到下一个then
的Block4
处如果在
Block2
中又抛出了异常, 则会走到Block3
里面如果在代码中
Block3
所在的onFailed
放到了Block2
所在的onCanceled
的前面, 那么Block2
中抛出的异常会被传递到后面的那个onFailed
里面, 即Block5
如果我们将第一个
onCanceled
给去掉, 如下:
1 | QFuture<int> testFuture = ...; |
如果在testFuture
里面cancel
了操作, 那么仍然会走到Block4, 并走到Block6
中.
Qt得QtFuture::Launch
比std里面得launch
要多了一个取值:
- Sync: 在调用这得线程上执行, 大概类似于
std::launch::deferred
- Async: 在独立得线程上执行, 等于
std::launch::async
. - Inherit: 在它attatch的future所在的线程上执行
比如, 下面的代码, 两个后续都在同一个独立线程上执行.
1 | QFuture<int> future = ...; |
unwarp()
以后要格外关注一下Qt6.4新增的unwarp()
函数. 这个新的函数没有用过.
QtFuture::makeReadyFuture(...)
用于创建一个已经完成的future. 在std里面好像没有对应的东西.
我记得C#的Task里面有一个相同概念的东西, 叫什么名字记不得了, 也是异步函数返回立即结果.
3.6 等待多个future
std::experimental
提供了when_all()
和when_any()
两个函数.
Qt
在Qt中, 从Qt6.3 开始还提供了全局函数:
QtFuture::whenAll
QtFuture::whenAny
实现了相同的功能.
Qt中, 在Qt6.3之前, 如果要实现when_all功能, 我们需要一个QFutureSynchronizer
, 使用waitForFinished()
.
when_any就比较麻烦了. 一种做法或许是使用QFutureSynchronizer::futures()
, 并且connect到后续上面? 反正很麻烦.
这个做法行不行还不好说, 因为
futures()
好像得到的list好像是一个副本了? 以后有机会要试一下才知道.
3.7 latch和barrier
std::experiment::latch
是一个包含计数器的同步对象, 一旦计数器减到0, 就进入就绪状态.
- 使用
count_down()
令计数器减一 - 使用
wait()
等待其就绪 - 使用
is_ready()
检查它是否就绪
这个东西其实就是信号量? 或者说是轻量级的信号量?
std::experimental::barrier
可以指定一组线程阻塞等待在某处, 当达到满足条件的线程数量的时候, 接着运行指定的内容.
这个主要的用途是串行化一组工作线程. 尤其是当只需要一部分线程完成就可以继续的情况.
latch和barrier在Qt里面目前为止还没有看到等价物. 或许信号量就是latch的对等东西? 但是Qt的信号量好像是系统级的同步对象, 不像.NET还有Slim和普通的信号量. 至于Barrier, 如果是等待所有线程完成, 可以用Synchronizer. 如果是部分, 就找不到直接的做法了. 这也说明这个东西其实很小众, 小众到活了这么多年也没有需求:-)
4. std和Qt比较
Qt中和std::future
对应的是QFuture
, 暂时没有找到和std::shared_future
对应的东西, 也没有遇到使用的场景. Qt提供的Watcher, Synchronizer等可以实现类似的需求.
Qt提供了对应于std::promise
的QPromise
, 但是有一些不同的地方. 总体来说, QPromise
要比std::promise
强大和丰富的多, 尤其适合GUI场景使用.
std::promise
只能执行一次set_result()
, 而QPromise
可以多次addResult()
, 设置多个结果, 并且使用resultAt()
获取不同的结果. 对应的,QPromise
提供了finish()
, 表征计算结束.addResult()
函数可以指定位置.QPromise
提供了start()
和suspendIfRequested()
QPromise
还提供了setProgressRange()
和setProgressValue()
方法.
Qt的QPromise
也是moveonly的对象, 这一点和std::promise
一样. Qt建议使用QSharedPointer
使用QPromise
有两种途径: 一种是使用底层的QThread
, 另一种是使用QtConcurrent
. 对于简单的工作线程, 后者一般更简便一些. 而对于更复杂的情况, 尤其是涉及到交互, 更好的做法还是moveToThread
的做法.
QtConcurrent::run和QtConcurrent::task比较
Qt在Concurrent这块的变化一直比较大, 尤其是Qt5和Qt6之间. Qt5的代码很容易编译失败. 一方面是接口变化, 一方面是编译器的语法检查更严格(尤其是从VC2017迁移到VC2022).
run
返回QFuture
,task
返回QTaskBuilder
.run
的线程会立即执行, 而task
创建后需要手工启动执行, 例如, 利用spawn()
, 启动后返回的也是QFuture
. 大概可以认为,QtConcurrent::run(...)
和QtConcurrent::task(...).spawn()
是等价的.run
的参数是在调用run
的时候提供的, 而task
创建的线程, 通过QTaskBuild
的方法withArguments()
方法来提供.