生产者和消费者的实现
生产者和消费者
这里讨论多线程下的生产者消费者, 尤其是多生产者-单消费者模式. 单线程的生产者-消费者模式最好的语法糖是协程. 这个以后再说.
1. C++/Qt
生产者-消费者问题, 基本的模式是需要有一个共享的数据结构, 生产者向里面加入数据, 消费者从里面取出数据. 在多线程/多进程模式下, 要考虑的问题有两点:
- 数据保护
- 状态的通知
在绝大多数工程场景下, 我们一般不用考虑公共缓冲区的大小, 也就是假定生产者可以不受约束的向里面加入数据. 那么剩下的问题就是消费者.
最简单的做法, 就是网上大部分的例子一样, 一个忙等待:
1 | while(true) |
这样的问题在于, 消费者线程是没有被阻塞的, 即使是队列为空, 消费者线程也是在不断地轮询检查, 始终占用CPU资源. 尤其是消耗互斥量内核资源.
我们真正需要的模式应该是这样的:
- 如果队列为空, 那么线程等待
- 当队列有内容了, 线程就被唤醒并处理数据
这种情况下就需要条件变量了. 条件变量是POSIX中提出的东西, 在Windows下, 要Windows XP之后的版本才能支持(Win10, 比如). 也能找到对应的等价物: SleepConditionVariableCS
, WakeConditionVariable
等. Qt下, 则可以使用QWaitCondition
.
为此, 我们首先声明一个条件变量:
1 | QWaitCondition _bufferNotEmpty; |
1.1 生产者:
1 | ... |
1.2 消费者:
1 | ... |
1.3 使用条件变量的注意事项
条件变量受到假唤醒(spurious wakeups)和偷唤醒(stolen wakeup)的影响.
- 假唤醒: 不是被一个线程显式唤醒
- 偷唤醒: 另一个线程在被唤醒的线程之前运行了.
因此,在睡眠操作返回后,应重新检查通常位于 while 循环) 中的谓词。这是一种通用的编码模式. 就比如上面的:1
2
3
4while(_messageList.empty())
{
_bufferNotEmpty.wait(&_mutex);
}
2. Win32/C++
Windows在WinXP/Windows Server 2003之后提供了条件变量(WinXP和Windows Server 2003不支持).
- InitializeConditionVariable(): 创建条件变量
- SleepConditionVariableCS():
- WakeConditionVariable():
- WakeAllConditionVariable():
- SleepConditionVariableSRW():
2.1 公共数据:
1 |
|
2.2 生产者线程
下面是一个支持有限长度缓冲区的生产者线程的核心代码:
1 | while(...) |
2.3 消费者线程
下面是对应的消费者线程. 它会检测缓冲区是否为空, 如果为空, 就释放临界区并等待睡眠.
1 | while(true) |
3. Linux
Linux下面最简单的做法是使用pthread函数. 记得Linux也有local函数的?
4. C#
在.NET里面, 迄今没有看到条件变量, 不知是因为这个概念太深奥, 没有资料介绍, 还是说它被封装到别的地方了. 一般资料里面都说使用Event
来通知唤醒. 但是休眠这件事情就不知道怎么使用了.
在.NET中, 提供了BlockingCollection
. 在微软的资料里面说, 它就是专门用来实现生产者-消费者模式的数据结构.
BlockingCollection
提供了三种操作方式:
- 使用
Add
和Take
- 使用
TryAdd
和TryTake
- 在
foreach
中使用BlockingCollection<T>.GetConsumingEnumerable()
另外, 它还提供了CompleteAdding()
接口, 用来标识生产者完成了添加操作. 这个方法极为方便, 省的我们自己实现队列的时候自己实现这个标志了. 当设置了完成后, 再使用Add()
,Take()
, Try
操作的时候, 会抛出异常InvalidOperationException
异常来.
MSDN中给出来的例子中, 似乎是
Take
会在队列为空的时候阻塞,Add
会在队列满的时候阻塞, 也就是不需要自己使用条件变量来进行休眠和通知了. 而TryAdd
和TryTake
则不会这么做, 所以会是忙循环的操作. 但是GetConsumingEnumerable()
是怎么做的, 就没有看到…
4.1 使用Add
和Take
实现的生产者-消费者:
生产者:
1 | for(...) |
使用TryAdd
的实现:
消费者
1 | while(!buffer.IsCompleted) |
4.2 使用TryAdd
和TryTake
实现的生产者-消费者:
当使用TryAdd
或TryTake
时, 若集合为空, 或者已满, 会发挥false.
下面的例子中还演示了取消操作:
生产者
1 | while(...) |
消费者
1 | while(!buffer.IsComplete) |
4.3 使用foreach
和GetConsumingEnumerable
使用GetConsumingEnumerable()
, 在获取数据的同时从队列中删除数据.
则消费者可以这样实现:
1 | foreach(var item in buffer.GetConsumingEnumerable()) |
另外, 按照MSDN的说法, 如果你只是想枚举集合而不修改, 可以仅使用foreach
. 但是这个时候得到的是执行时的数据快照.
不过按照这个说法, 它至少能够保证数据的完整性, 即使其他线程增删了数据, 也不会影响这里. 不过这个很难测试.
4.4 CancelToken
我们可以使用另外一个线程来处理Token. 例如在前面的TryAdd
的例子中, 我们可以定义一个线程
1 | var cts = new CancellationTokenSource(); |
注意的是, 要记得释放掉cts
:
1 | cts.Dispose(); |