实现批量命令的并行处理
传统上, Restful的命令都是同步执行的, 一条命令执行完毕后再执行另一条.
但是, 我们的产品的一条命令需要批量执行多条命令的查询, 下载几十甚至上百个文件. 如果串行执行, 会消耗大量的时间在等待上, 包括服务器的命令执行等待和网络传输的等待.
一种做法是简单地启动线程池, 每个命令一个线程, 最后等待完成. 但是并不好, 本身, Qt提供的QNetworkAccessManager
支持异步处理. 我们需要实现批量发送命令, 并且同步等待所有结果完成. 即我们将整个的多个命令的发送和接受作为一个阻塞式的任务, 批量发送, 异步处理, 并且等待所有命令结果都收到后再返回.
下面是批量下载文件的接口的简化实现:
paths
是要下载的图片的网络地址的数组.
id_list
是每个要下载的图片的标识ID, 用于区分识别.
最终返回的是一个QMap<id, QPixmap>
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| QMap<QString, QPixmap> DeepApi::batchDownloadImages(const QStringList &paths, const QStringList &id_list) { BatchRequestManager mgr(0); for(int i=0; i<paths.size(); ++i) { if(paths.at(i).isEmpty()) continue; auto reply = getASync(paths.at(i)); mgr.addNetworkReply(reply, id_list.at(i)); } QMap<QString, QPixmap> result; auto data = mgr.getAllRaw(); for(auto iter=data.cbegin(); iter!=data.cend(); ++iter) { QPixmap pixmap; pixmap.loadFromData(iter.value()); result.insert(iter.key(), pixmap); } return result; }
|
getAsync()
是命令发送接口, 它最终映射到下面的函数. 它发送一条命令, 并返回一个QNetworkReply
的指针. 这个指针我们传递给一个BatchRequestManager
来管理和维护.
1 2 3 4 5 6 7 8 9 10
| QNetworkReply *DeepApi::sendAsync(const QString &path, const QByteArray &method, const QByteArray &data, const QVariantMap &headers) { QNetworkRequest req(_impl->_url.resolved(QUrl(path))); req.setRawHeader("Authorization", _impl->_token); for(auto iter=headers.begin(); iter!=headers.end(); ++iter) { req.setRawHeader(iter.key().toUtf8(), iter.value().toByteArray()); } return _impl->_netmgr.sendCustomRequest(req, method, data); }
|
BatchRequestManager
的声明如下. 它最主要的对外接口是addNetworkReply()
和getAllRaw()
. 另外, 则是slot处理函数replyFinished()
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class DEEPLIB_EXPORT BatchRequestManager : public QObject { Q_OBJECT public: explicit BatchRequestManager(int mode, QObject *parent = nullptr); virtual ~BatchRequestManager(); void addNetworkReply(QNetworkReply* reply, const QString& id); QMap<QString, QByteArray> getAllRaw() const; signals: void itemReady(const QString& id, const QByteArray& data); void finished(); private: void replyFinished(const QString& id); private: struct Implementation; QScopedPointer<Implementation> _impl; };
|
在addNetworkReply()
中, 将QNetworkReply
指针的finished()
信号关联到replyFinished
上面:
1 2 3 4 5
| void BatchRequestManager::addNetworkReply(QNetworkReply *reply, const QString &id) { connect(reply, &QNetworkReply::finished, this, std::bind(&BatchRequestManager::replyFinished, this, id)); _impl->_total ++; }
|
在这里, 使用了std::bind
函数将id作为一个附加参数传递进去, 从而能够区分出每一个QNetworkReply
指针来.
不知道为啥, 这里不能使用lambda来绑定. 如果使用lambda, 则sender()
函数总是nullptr. 原因未知. 可能涉及到绑定的细微的差异.
在replyFinished()
中, 则是对接收到的每个命令响应做处理. 我们还定义了一个类属性_numOfFinished
, 用来统计收到的响应的个数, 以快速判断是否所有的命令都结束了.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| void BatchRequestManager::replyFinished(const QString &id) { QByteArray data; auto reply = dynamic_cast<QNetworkReply *>(sender()); if(reply->error() == QNetworkReply::NoError) { data = reply->readAll(); } emit itemReady(id, data); reply->deleteLater(); QMutexLocker locker(&_impl->_mutex); _impl->_raw_result.insert(id, data); if(++_impl->_numOfFinished == _impl->_total ) { emit finished(); } }
|