生产者和消费者

这里讨论多线程下的生产者消费者, 尤其是多生产者-单消费者模式. 单线程的生产者-消费者模式最好的语法糖是协程. 这个以后再说.

1. C++/Qt

生产者-消费者问题, 基本的模式是需要有一个共享的数据结构, 生产者向里面加入数据, 消费者从里面取出数据. 在多线程/多进程模式下, 要考虑的问题有两点:

  1. 数据保护
  2. 状态的通知

在绝大多数工程场景下, 我们一般不用考虑公共缓冲区的大小, 也就是假定生产者可以不受约束的向里面加入数据. 那么剩下的问题就是消费者.

最简单的做法, 就是网上大部分的例子一样, 一个忙等待:

1
2
3
4
5
6
7
8
9
while(true)
{
mutex.lock();
if(!buffer.empty())
{
// ....
}
mutex.unlock();
}

这样的问题在于, 消费者线程是没有被阻塞的, 即使是队列为空, 消费者线程也是在不断地轮询检查, 始终占用CPU资源. 尤其是消耗互斥量内核资源.

我们真正需要的模式应该是这样的:

  • 如果队列为空, 那么线程等待
  • 当队列有内容了, 线程就被唤醒并处理数据

这种情况下就需要条件变量了. 条件变量是POSIX中提出的东西, 在Windows下, 要Windows XP之后的版本才能支持(Win10, 比如). 也能找到对应的等价物: SleepConditionVariableCS, WakeConditionVariable等. Qt下, 则可以使用QWaitCondition.

为此, 我们首先声明一个条件变量:

1
QWaitCondition  _bufferNotEmpty;

1.1 生产者:

1
2
3
4
5
6
...
{
QMutexLocker locker(&_mutex);
_messageList.append(...);
_bufferNotEmpty.wakeAll();
}

1.2 消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
while(true)
{
_mutex.lock();
while(_messageList.empty())
{
_bufferNotEmpty.wait(&_mutex);
}

while(!_messageList.empty())
{
//...
}
//...
_mutex.unlock();
}

1.3 使用条件变量的注意事项

条件变量受到假唤醒(spurious wakeups)和偷唤醒(stolen wakeup)的影响.

  • 假唤醒: 不是被一个线程显式唤醒
  • 偷唤醒: 另一个线程在被唤醒的线程之前运行了.
    因此,在睡眠操作返回后,应重新检查通常位于 while 循环) 中的谓词。这是一种通用的编码模式. 就比如上面的:
    1
    2
    3
    4
    while(_messageList.empty())
    {
    _bufferNotEmpty.wait(&_mutex);
    }

2. Win32/C++

Windows在WinXP/Windows Server 2003之后提供了条件变量(WinXP和Windows Server 2003不支持).

  • InitializeConditionVariable(): 创建条件变量
  • SleepConditionVariableCS():
  • WakeConditionVariable():
  • WakeAllConditionVariable():
  • SleepConditionVariableSRW():

2.1 公共数据:

1
2
3
4
5
6
7
8
9
10
11
#define BUFFER_SIZE 10
#define PRODUcER_SLEEP_TIME_MS 500
#define CONSUMER_SLEEP_TIME_MS 2000

LONG Buffer[BUFFER_SIZE]; // 数据缓冲区
ULONG QueueSize; // 数据缓冲区的大小
ULONG QueueStartOffset;

CONDITION_VARIABLE BufferNotEmpty; // 表征缓冲区有内容
CONDITION_VARIABLE BufferNotFull; // 表征缓冲区未满
CRITICAL_SECTION BufferLock; // 临界区保护

2.2 生产者线程

下面是一个支持有限长度缓冲区的生产者线程的核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while(...)
{
// 临界区保护
EnterCriticalSection(&BufferLock);
// 如果缓冲区满, 线程就释放临界区并在这里睡眠掉.
while(QueueSize==BUFFER_SIZE)
{
SleepConditionVariableCS(&BufferNotFull, &BufferLock, INFINITE);
}
// 如果有数据, 才会走下去, 在这里处理缓冲区, 并最后释放临界区
LeaveConditionVariable(&BufferNotEmpty);
// 这是最重要的, 当数据被加入, 则缓冲区不为空, 通知消费者线程...
WakeConditionVariable (&BufferNotEmpty);
}

2.3 消费者线程

下面是对应的消费者线程. 它会检测缓冲区是否为空, 如果为空, 就释放临界区并等待睡眠.

1
2
3
4
5
6
7
8
9
10
11
12
while(true)
{
EnterCriticalSection(&BufferLock);
while(QueueSize==0)
{
SleepConditionVariableCS(&BufferNotEmpty, &BufferLock, INFINITE);
}
// 处理数据....
LeaveCriticalSection(&BufferLock);
// 通知生产者队列不满了
WakeConditionVariable(&BufferNotFull);
}

3. Linux

Linux下面最简单的做法是使用pthread函数. 记得Linux也有local函数的?

4. C#

在.NET里面, 迄今没有看到条件变量, 不知是因为这个概念太深奥, 没有资料介绍, 还是说它被封装到别的地方了. 一般资料里面都说使用Event来通知唤醒. 但是休眠这件事情就不知道怎么使用了.

在.NET中, 提供了BlockingCollection. 在微软的资料里面说, 它就是专门用来实现生产者-消费者模式的数据结构.

BlockingCollection提供了三种操作方式:

  • 使用AddTake
  • 使用TryAddTryTake
  • foreach中使用BlockingCollection<T>.GetConsumingEnumerable()

另外, 它还提供了CompleteAdding()接口, 用来标识生产者完成了添加操作. 这个方法极为方便, 省的我们自己实现队列的时候自己实现这个标志了. 当设置了完成后, 再使用Add(),Take(), Try操作的时候, 会抛出异常InvalidOperationException异常来.

MSDN中给出来的例子中, 似乎是Take会在队列为空的时候阻塞, Add会在队列满的时候阻塞, 也就是不需要自己使用条件变量来进行休眠和通知了. 而TryAddTryTake则不会这么做, 所以会是忙循环的操作. 但是GetConsumingEnumerable()是怎么做的, 就没有看到…

4.1 使用AddTake实现的生产者-消费者:

生产者:

1
2
3
4
5
6
for(...)
{
//...
buffer.Add(value);
}
buffer.CompleteAdding();

使用TryAdd的实现:

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
while(!buffer.IsCompleted)
{
try
{
value = buffer.Take();
}
catch(InvalidOperationException)
{
//...
break;
}
// 处理value
}

4.2 使用TryAddTryTake实现的生产者-消费者:

当使用TryAddTryTake时, 若集合为空, 或者已满, 会发挥false.
下面的例子中还演示了取消操作:

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
while(...)
{
try
{
success = buffer.TryAdd(value, timeout, token);
}
catch(OperationCanceledException) // 收到了取消信号
{
// 取消生产, 设置为已完成( 这个实际上和后面的重复了)
buffer.CompleteAdding();
break;
}
if(success)
{
// 添加成功的处理
}
else
{
// 添加失败的处理...
}
}
// 设置完成标志
buffer.CompleteAdding();

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
while(!buffer.IsComplete)
{
try
{
if(!buffer.TryTake(out value, 0, token))
{
// 处理成功
}
else
{
// 处理失败场景(为空)
}
}
catch(OperationCanceledException)
{
// 用户取消了操作
}
}

4.3 使用foreachGetConsumingEnumerable

使用GetConsumingEnumerable(), 在获取数据的同时从队列中删除数据.
则消费者可以这样实现:

1
2
3
4
foreach(var item in buffer.GetConsumingEnumerable())
{
// 处理数据...
}

另外, 按照MSDN的说法, 如果你只是想枚举集合而不修改, 可以仅使用foreach. 但是这个时候得到的是执行时的数据快照.

不过按照这个说法, 它至少能够保证数据的完整性, 即使其他线程增删了数据, 也不会影响这里. 不过这个很难测试.

4.4 CancelToken

我们可以使用另外一个线程来处理Token. 例如在前面的TryAdd的例子中, 我们可以定义一个线程

1
2
3
4
5
6
7
8
var cts = new CancellationTokenSource();
Task.Run(()=>{
// 比如, 用户输入了C就取消
if(Console.ReadKey(true).KeyChar=='C')
{
cts.Cancel();
}
});

注意的是, 要记得释放掉cts:

1
cts.Dispose();