实现批量命令的并行处理

传统上, Restful的命令都是同步执行的, 一条命令执行完毕后再执行另一条.
但是, 我们的产品的一条命令需要批量执行多条命令的查询, 下载几十甚至上百个文件. 如果串行执行, 会消耗大量的时间在等待上, 包括服务器的命令执行等待和网络传输的等待.

一种做法是简单地启动线程池, 每个命令一个线程, 最后等待完成. 但是并不好, 本身, Qt提供的QNetworkAccessManager支持异步处理. 我们需要实现批量发送命令, 并且同步等待所有结果完成. 即我们将整个的多个命令的发送和接受作为一个阻塞式的任务, 批量发送, 异步处理, 并且等待所有命令结果都收到后再返回.

下面是批量下载文件的接口的简化实现:

  • paths是要下载的图片的网络地址的数组.
  • id_list是每个要下载的图片的标识ID, 用于区分识别.
    最终返回的是一个QMap<id, QPixmap>.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
QMap<QString, QPixmap> DeepApi::batchDownloadImages(const QStringList &paths, const QStringList &id_list)
{
BatchRequestManager mgr(0);
for(int i=0; i<paths.size(); ++i)
{
if(paths.at(i).isEmpty())
continue;
auto reply = getASync(paths.at(i));
mgr.addNetworkReply(reply, id_list.at(i));
}
QMap<QString, QPixmap> result;
auto data = mgr.getAllRaw();
for(auto iter=data.cbegin(); iter!=data.cend(); ++iter)
{
QPixmap pixmap;
pixmap.loadFromData(iter.value());
result.insert(iter.key(), pixmap);
}
return result;
}

getAsync()是命令发送接口, 它最终映射到下面的函数. 它发送一条命令, 并返回一个QNetworkReply的指针. 这个指针我们传递给一个BatchRequestManager来管理和维护.

1
2
3
4
5
6
7
8
9
10
QNetworkReply *DeepApi::sendAsync(const QString &path, const QByteArray &method, const QByteArray &data, const QVariantMap &headers)
{
QNetworkRequest req(_impl->_url.resolved(QUrl(path)));
req.setRawHeader("Authorization", _impl->_token);
for(auto iter=headers.begin(); iter!=headers.end(); ++iter)
{
req.setRawHeader(iter.key().toUtf8(), iter.value().toByteArray());
}
return _impl->_netmgr.sendCustomRequest(req, method, data);
}

BatchRequestManager的声明如下. 它最主要的对外接口是addNetworkReply()getAllRaw(). 另外, 则是slot处理函数replyFinished().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class DEEPLIB_EXPORT BatchRequestManager : public QObject
{
Q_OBJECT
public:
explicit BatchRequestManager(int mode, QObject *parent = nullptr);
virtual ~BatchRequestManager();
void addNetworkReply(QNetworkReply* reply, const QString& id);
QMap<QString, QByteArray> getAllRaw() const;
signals:
void itemReady(const QString& id, const QByteArray& data);
void finished();
private:
void replyFinished(const QString& id);
private:
struct Implementation;
QScopedPointer<Implementation> _impl;
};

addNetworkReply()中, 将QNetworkReply指针的finished()信号关联到replyFinished上面:

1
2
3
4
5
void BatchRequestManager::addNetworkReply(QNetworkReply *reply, const QString &id)
{
connect(reply, &QNetworkReply::finished, this, std::bind(&BatchRequestManager::replyFinished, this, id));
_impl->_total ++;
}

在这里, 使用了std::bind函数将id作为一个附加参数传递进去, 从而能够区分出每一个QNetworkReply指针来.

不知道为啥, 这里不能使用lambda来绑定. 如果使用lambda, 则sender()函数总是nullptr. 原因未知. 可能涉及到绑定的细微的差异.

replyFinished()中, 则是对接收到的每个命令响应做处理. 我们还定义了一个类属性_numOfFinished, 用来统计收到的响应的个数, 以快速判断是否所有的命令都结束了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void BatchRequestManager::replyFinished(const QString &id)
{
QByteArray data;
auto reply = dynamic_cast<QNetworkReply *>(sender());
if(reply->error() == QNetworkReply::NoError)
{
data = reply->readAll();
}
emit itemReady(id, data);
reply->deleteLater();
QMutexLocker locker(&_impl->_mutex);
_impl->_raw_result.insert(id, data);
if(++_impl->_numOfFinished == _impl->_total )
{
emit finished();
}
}