ImageMatcher设计与实现

1. 概述

干镜的扫描软件需要做连续扫描, 并将结果拼接成一张完整的大图形式. 涉及到几个难点:

  1. 图像数据量很大. 最低的20倍镜头, 扫描玻片全片大约是2200张图片. 40倍则是接近10000张, 100倍则是6万张上下. 每张图片是2448x2048的分辨率, 24bit彩色图像, 原始数据是15M上下.
  2. 扫描仪扫描图片的速度是极快的, 以20倍扫描为例, 2200张图片大约90秒就能够完成扫描. 总的数据量超过了30G, 已经超过了电脑的内存大小. 如果再加上其他过程需要的内存, 完全会将系统撑死. 因此, 图片的处理必须能够尽量跟的上扫描的速度.
  3. 图像的拼接. 扫描仪的硬件精度不是很好, 在运行扫描过程中, 帧与帧之间大约又±50个像素的误差, 因此无法直接利用硬件(步进电机)的位置作为拼接, 只能作为一个拼接的基准值, 使用图像匹配的方式来做精确的匹配. 而图像匹配的速度是相对比较慢的. 比如想尽一切办法来进行优化. 甚至, 我们都要想办法来尽量拖慢扫描仪的扫描进度(因为它不归我们控制, 我们不好对第三方提出减慢扫描速度的要求).
  4. 扫描的图片中可能存在大片的空白区域, 是真正的空白区域, 图像匹配一定是结果异常的. 但是必须要能够适应, 不能说匹配失败了就终止拼接, 既然是空白, 或者邻域空白, 那么总要找一个合适的地方来放置它.
  5. 要定义一种文件格式, 能够保存并且快速查看这么巨大的图像, 并且不能有太高的资源占用率.

在过程中, 我们考虑过利用GPU来加速图像匹配的速度, 但是实际测试下来, 价值并不大. 因为我们对图像处理很单一, 仅仅是加载图片, 做一次图像匹配检测, 然后就释放. 大量的时间实际上消耗在从CPU到GPU之间的数据总线传输上了, 和CPU多核处理比较起来, 价值很有限.

2. 基本设计

ImageMatcher的核心设计有几点:

  1. 尽量挖掘多核CPU的潜力, 实现拼接的线程化.
  2. 随扫随拼随释放, 图像尽量不在内存中积压
  3. 尽量不使用文件系统做缓存, 一张图片的整个生命周期中都在内存中存在. 因为一旦涉及到硬盘读写, 都会极大的拖累整个系统的性能.
  4. 尽量实现系统的可靠性和可调试性. 为此, 专门实现了一个可以用于多线程环境的日志系统.

3. 类结构

包括下面几个类:

  • ImageProcessor: 实现图片的采集和拼接功能
  • TileArchiverWriterEx和TileArchiverReaderEx: 实现基于Tile的文件存储和解析.
  • ImageView: 一个实现查看TileArchiver格式的图像的控件.
  • ILogger: 日志类的封装, 用于支持多线程下的调试日志输出到文件.

4. ImageProcessor类实现机理

类的主要接口如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class ImageProcessor : public QObject, public ILogger
{
explicit ImageProcessor(const QString& task_name
, const QString& work_path
, DeepLogger::Logger::Ptr logger
, QObject* parent=nullptr);

// 执行相关的接口
// 准备运行环境, 创建工作线程.
void prepare(int cols, int rows);
// 添加一个视野到异步任务中.
void addImageAsync(Offset pos, cv::Mat data, RawImageFuncType raw_func=RawImageFuncType());
// 等待任务结束
void waitForFinished();
int waitingFutures();
// -- 任务管理函数
void startMatchTasks();

// -- 启动图像分割线程
void startSplitTask();

// 启动ArchiverServer
void startTileSaveServer();

// 启动瓦片制作的线程.
void startTileTask(TileArchiverEx::TileIndexKey pos, cv::Mat image, bool is_last_col, bool is_last_row);

//## 图像匹配和拼接相关
// 按照列来执行匹配
void matchProcByCol(int thread_id, int start_col, int end_col, int step);
// 瓦片切割函数.
void splitViewProc();
// 设置视野数据输入标志
void setViewDataInputed(ImageElement& view);
// 等待视野数据输入
void waitingViewDataInputed(ImageElement& view);
// 设置视野数据就绪标志
void setViewDataReady(ImageElement& view);
// 等待视野数据就绪
void waitingViewDataReady(ImageElement& view);
// 设置视野已经匹配完毕
void setViewMatched(ImageElement& view);
// 等待视野匹配完毕
void waitViewMatched(ImageElement& view);
// 设置视野一定定位(全局坐标已经确定), 当前什么也不做
void setViewLocated(ImageElement& view);
// 等待视野全局坐标设置完毕. 当前什么也不做
void waitViewLocated(ImageElement& view);

// 对视野做匹配计算
void matchSurround(ImageElement& view);
// ## Split功能
// 初始化Slice缓存
void initSliceCache();

// 切割一行视野
void splitViewLines(int row);
// 生成Slice的图像(拼接)
void makeSlice(int col, int row, bool is_last_col, bool is_last_row);
//
void makeSliceProc(int col, int row, bool is_last_col, bool is_last_row);
//## 缓存和内存管理功能
// 实现缓存视野的功能: 将视野的数据发送给@ref cacheServer()
void cacheView(Offset pos, cv::Mat image);

// 保存Slice.
void cacheSlice(int col, int row, cv::Mat image);

// 保存cv::Mat的内容
void dumpMat(int col, int row, cv::Mat mat, const QString& base_path, ImageFormat format, int quality);

// 打包成pack的函数. 是对TileArchiver的封装
void archiverServer();
}

4.1 线程规划

本质上ImageProcessor是数据流的处理模式:

图像数据 -> 匹配计算 -> 切割 -> Tile -> 归档

每个处理步骤由一个或几个工作线程完成. 为了实现简单, 这些全部在线程池中进行. 我们使用QtConcurrent::run()来创建工作线程, 而不是实现更复杂的线程对象.

事实上我们也不需要对启动的工作线程做什么额外的控制. 扫描过程是一旦启动就自动进行的, 出错了唯一的可以做的工作就是抛弃数据, 退出程序, 重新扫下一张数据. 主要把shadule线程杀掉就可以了.

线程规划如下:

  • 一个Matcher线程池, 用于匹配计算相邻视野的偏移量位置. 支持两种策略: 每个视野一个工作线程, 和每列视野一个工作线程. 前者为matchProcByView, 后者为matchProcByCol. 对于perView策略, 每个工作线程是临时性的, 完成自己视野的计算就结束了, 对于perColumn来说, 每个线程的生命周期是长期的, 要一直等到整个扫描处理结束后才会逐步结束. 相对来说, 前者更容易管理. 后者因为是持久性线程, 实际上线程数量远远超过了CPU的理论规格数量. 必须对线程池的大小做特别的设置.

  • 一个用于做切分线程splitViewProc, 它处理计算好匹配位置的视野, 并计算出切分的位置

  • 一个切分实施线程池, makeSliceProc, 每个要被切分的视野由一个工作线程所处理, 它会根据splitViewProc计算出来的偏移量, 从各个原始图片中获取图像部分的数据, 并拼接成一张切分图片.

  • 生成Tile的线程池, 每个splitViewProc()生成切分图片后会创建一个makeSliceProc()来做瓦片化处理. 每四个相邻的切分子图会缩小合并成上一级的金字塔. 这四个makeSliceProc线程会保留一个继续做上一级的合并, 其他三个工作线程就结束.

  • 一个Tile格式归档工作线程archiverServer, 它负责生成最终的扫描结果文件. 这个文件由一级级的图像金字塔组成, 并且提供了索引机制, 以便快速定位到具体某一个金字塔图像.

4.2 线程生命周期

4.2.1 启动线程

  • ImageProcessor::prepare()负责启动长生命周期线程. 在扫描开始时进行.

    • 启动切分线程splitViewProc()
    • 启动图像档案文件保存线程archiverServer()
    • 在preColumn模式下, 启动所有的匹配线程matchProcByCol. 如果是perView模式, 则不需要实现启动.
  • splitViewProc()线程中, 负责启动每个子图切割线程makeSliceProc(). 目前我们的做法是按行处理, 当识别到新得到的扫描图片能够拼接出新的一行后, 就启动一行子线程makeSliceProc(), 分别计算. 这些工作线程会一直向上生成金字塔, 最终只剩下一个线程完成最高一张金字塔图像.

4.3 主要数据结构

4.3.1 视野信息

定义结构ImageElement来表示一个视野的处理状态和信息, 使用一个QMap<Offset, ImageElement>来保存所有视野的数据. 其中, Offset是一个自定义结构, 其实就是一个QPoint, 表示这个视野的行列号.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct ImageElement
{
QSharedPointer<QSemaphore> _view_inputed; //!< 数据已经采集.
QSharedPointer<QSemaphore> _view_available; //!< 视野数据可用标志.
QSharedPointer<QSemaphore> _view_matched; //!< 视野是否完成匹配(其offset有意义了)
//!< 被@ref splitViewProc()读
QSharedPointer<QSemaphore> _view_located; //!< 视野的全局坐标是否计算完了.
QAtomicInteger<quint32> _used_flag; //!< 标记视野使用情况的原子量, 比_view_matched更为精细.
//!< 它的几个比特位分别定义如下:
//! BIT0: 被垂直比较(被DOWN比较)
//! BIT1: 被水平比较(被Next比较)
//! BIT2: 完成Split.
//! 这三个被置位后, 就可以删除了. (Cache被复制数据使用, 不涉及)
Offset cord; // 视野的行列号
cv::Mat data; // 图像数据
double degree; // 图像匹配度
bool credible; // 表示本视野的坐标是否可信(如果匹配失败或不可信, 使用默认值, 就是不可信的)
int anchor; // 锚定的边. 0:LEFT, 1:UPPER, 2:RIGHT, -1: 自由
QPoint offset; // 相对于锚定的边的视野的相对位置
QRect grect; // 在全局坐标系中的位置
ImageElement() ...
ImageElement(Offset pos, QSize size) ...
...
};

上面的结构是针对perColumn的场景提出来的. 几个信号量用于标记这个线程的工作状态, 每个match线程会等待检测它的相邻线程的就绪标记, 当检测到邻居数据就绪了, 就可以开始匹配计算. 使用完邻居数据后, 会设置其使用标记_used_flag. 每个线程都有义务在自己或邻居被使用完毕后释放自己的图像数据.

后面的一些属性是匹配要使用的. 计算完匹配后, 会记录下和邻域的相对位置. 这是通过一个anchoroffset标记的. anchro表示它基于它的哪个邻居视野–上面的视野, 或者左/右视野–因为扫描是蛇形扫描, 上一个扫描视野可能在它的左边, 也可能在它的右边.

所有的视野的ImageElement构成一个QMap结构:
ImageMap m_imageCache; //< 视野的缓存
这个Map在扫描开始前就准备好, 在整个扫描匹配过程中不会发生element的增删操作, 涉及到多线程互斥操作的都是信号量或原子量, 就不再需要在访问时特别做互斥保护了.

4.3.2 Tile信息

TileElement用于表示每个Tile的状态和数据:

1
2
3
4
5
6
7
8
9
10
11
struct TileElement
{
TileIndexKey _pos; //!< Tile的位置
bool _is_col_end;//!< 这个Tile是否是本级列末
bool _is_row_end;//!< 这个Tile是否是本级行末
QAtomicInteger<FlagType> _flags; //!< 标志_sub_data中的每个子块是否就绪了.
QVector<cv::Mat> _sub_data; //!< 组成Tile的子数据.
cv::Mat _data; //!< Tile的数据.
QSharedPointer<QMutex> _mutex;
...
};

其中, TileIndexkey表示这个Tile的位置, 它的定义如下. 其三个成员分别表示这个Tile的层级, 以及在这一层中的行列号. 我们使用它来标记定位每一个Tile.

1
2
3
4
5
struct TileIndexKey{
qint32 _level;
qint32 _col;
qint32 _row;
};

由于我们是数据随完成随归档保存, 因此数据文件中的tile是无序的, 并且也不能保证相同的两次扫描数据存储的一致性. 同时, 在Tile中保存的是压缩后的图像数据, 长度也是不确定的. 为此, 我们在数据文件中使用一个索引来标记每个Tile的起始位置和数据长度.

1
2
3
4
5
6
7
struct TileIndexReal{
FlagType _tag;
qint32 _level;
qint32 _col;
qint32 _row;
qint64 _offset;
};

5. ImageProcessor主要处理算法

5.1 添加图像 addImageAsync()

当扫描仪检测线程从扫描仪读取到一张图片后, 调用此函数, 将图片添加到对应的图像缓冲数据中m_imageCache中去. 并设置图像就绪标志.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void ImageProcessor::addImageAsync(Offset pos, cv::Mat data, RawImageFuncType raw_func)
{
...
auto& view = getImageElement(pos);
cv::Mat image = raw_func ? raw_func(pos, data) : data;
image.copyTo(view.data);
setViewDataInputed(view);
}

void ImageProcessor::setViewDataInputed(ImageElement &view)
{
view._view_inputed->release(255);
}

这样, 这个视野自己的匹配线程就可以进行匹配操作, 而它的相邻的视野的匹配线程也可以使用它来做匹配数据了.

如果使用得是perViewperThread得模式, 则在这个函数中启动对应视野得matchProcByView()线程. 其他得处理也是一样得.

5.2 视野匹配线程 matchProcByCol()

这个函数是按列做匹配的长生命周期工作线程. 我们的列分布, 准确的说, 是一个线程处理等间隔的几列的视野匹配. 当初设计的初衷是让线程能够尽可能彼此错开计算, 因此数据是1,2,3,4,5这样的顺序, 而如果是两个线程, 则一个线程处理1,3,5, 一个线程处理2,4,6列, 这样尽量避免一个线程处理大量数据而另一个线程闲着.

工作线程根据蛇形扫描顺序, 依次等待它应该接收到的下一个视野的图像数据, 等待相邻视野数据就绪后执行匹配计算, 并将计算出的结果写入到ImageElement中.

这里要注意的一点是, 如果是使用OpenCV的GPU, 一定要创建一个matcher对象, 并且这个matcher对象不能在线程之间共用. 实际测试下来, 发现GPU价值并不大.

匹配处理线程只做匹配, 并记录匹配结果, 并不做其他的事情. 后续的处理由分割线程splitViewProc()来做.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
void ImageProcessor::matchProcByCol(int thread_id, int start_col, int end_col, int step)
{
#ifdef HAS_GPU
auto matcher = cv::cuda::createTemplateMatching(CV_8UC3, cv::TM_CCORR_NORMED);
cv::cuda::Stream stream;
#endif
for(int row=0; row<_scan_rows; ++row)
{
int col_begin = isEven(row) ? start_col : end_col;
int col_end = isEven(row) ? (end_col + step) : (start_col-step);
int col_step = isEven(row) ? step : -step;

for(int col=col_begin; col!=col_end; col+=col_step)
{
Offset pos(col, row);
auto& view = getImageElement(pos);
// 1. 等待数据就绪
waitingViewDataInputed(view);
// 5. 缓存视野. 此时视野数据为原始数据
cacheView(pos, view.data);
// 如果不做匹配, 后面的都不需要处理.
if(!_is_do_match)
{
inspect2(__func__, "do not match!");
setViewDataReady(view);
continue;
}

// 2. 可能的其他操作处理
if( _pre_match_func){
_pre_match_func(pos);
}
// 3. 做暗角矫正
calibViewVignet(view);
// 4. 设置数据可用标志. 设置数据可用后, 其他的线程就可以访问它的数据.
setViewDataReady(view);
// 6. 匹配计算. 计算出偏移量.
// 注意, 计算完毕后设置_view_matched标志, 通知线程splitViewProc()来计算坐标和分割视野.
#ifdef HAS_GPU
matchSurround(view, matcher, stream);
#else
matchSurround(view);
#endif
this->matchTicker().stop();
setViewMatched(view);
}
}
log2(__func__, "matchProc() Finished: ", DSHOW(thread_id));
}

5.3 视野匹配

收到一个视野, 要将它和它正上方的视野, 以及它的扫描前序视野做匹配. 如果是最上面一行, 则不需要和上面的匹配, 如果是扫描过程中一行的第一个视野, 也没有前序视野可以匹配. 这些都是在matchSurround()里面需要处理的.

因为我们是按列处理, 因此, 一个视野正上方的视野数据一定是就绪的, 它的前序视野则需要等待那个处理线程的就绪标志.

匹配使用opencv的matchTemplate()函数进行, 本身没有太多的需要特别说明的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
void ImageProcessor::matchSurround(ImageElement &view)
{
auto pos = view.cord; // 当前view的位置
// 和两个邻居的匹配结果
MatchResult result1{MatchFlag::UnMatch, 0.0, QPoint(), 0.0 }, result2{MatchFlag::UnMatch,0,QPoint(), 0.0};
// 在垂直方向上, 一定是和上面匹配
int direction1 = UP;
// 水平方向上, 如果是偶数行, 扫描从左到右, 因此和左边匹配; 奇数行是和右边匹配.
int direction2 = pos.y() % 2==0 ? LEFT : RIGHT;
cv::Rect srcRect1, tmplRect1, srcRect2, tmplRect2;
// 先和上面的视野匹配. 上面的视野一定是就绪了的, 因此不需要等待.
// 注意, 因为扫描仪的设计, 上下移动的偏差要大于左右, 因此, 匹配时应当以左右的结果为准.
if(!isTopView(pos))
{
auto peer_pos = getUpPos(pos);
auto& peer = getImageElement(peer_pos);
TRACE_IF(peer.data.empty(), QString("视野{%1,%2}上面的视野{%3,%4}图像非法").arg(pos.x()).arg(pos.y()).arg(peer_pos.x()).arg(peer_pos.y()));
// 获取source和tmplate在各自图片中的位置区域. 它们是固定的值.
srcRect1 = getSrcRect(direction1);
tmplRect1 = getTmplRect(direction1);
try
{
// 获取图像内容.
auto srcImg = peer.data(srcRect1);
auto tmplImg = view.data(tmplRect1);
// 执行匹配.返回结果是匹配点在src中的坐标值.即templateMatch的结果.
auto t1 = cv::getCPUTickCount();
result1 = matchHost(srcImg, tmplImg);
auto t2 = cv::getCPUTickCount();
view._click += t2-t1;
if(result1._credit==Matched)
{
view.offset = QPoint(
srcRect1.x + result1._offset.x() - tmplRect1.x,
srcRect1.y + result1._offset.y() - tmplRect1.y
);
}
else // 匹配失败时, 使用默认值处理
{
view.offset = QPoint(this->_vx0, this->_vy0);
}
view.degree = result1._degree;
view.credible = (result1._flag == Matched);
view.anchor = direction1;
inspect2(__func__, "View", DSHOW(pos), "match top view",peer_pos," finished. result is: "
, DSHOW(result1._flag), DSHOW(result1._degree), DSHOW(result1._credit)
, DSHOW(result1._offset), "\n", DSHOW(view.offset)
);
}
catch(cv::Exception& e)
{
error2(__func__, "match upside exception! current: ", pos, ", up view: ", peer_pos, "exception: ", e.what());
}
}
else
{
inspect2(__func__, "current view need not match to up", DSHOW(view.offset));
}
// 和左右比较, 这里需要等待
// 另外, 因为上下的偏差大于左右, 因此应该是优先使用左右匹配的结果.
if(!isLineFirst(pos))
{
...
}
...
trace2(__func__, view.cord, "has finished matched. ", DSHOW(view.offset), DSHOW(view.anchor));
}

5.4 分割线程 splitViewProc()

分割线程是一个独立的线程, 它会按扫描的顺序依次等待每个扫描视野匹配就绪. 然后计算这个视野在全局坐标系中的位置, 也更新ImageElement集合m_imageCache.

当检测到一行扫描视野都完成匹配之后, 就执行这一行数据的分割工作, 分割出一个个相邻的不重叠的子图像.

这个线程中, 会释放掉m_imageCache中不再使用的视野的图像数据.

前面讨论的所有的匹配工作线程的结果都会汇集到这个线程来处理, 然后再分发出tile处理线程分别处理, 因此这个线程是一个数据流程中的中间汇集点, 也是吞吐量性能的瓶颈. 因此这个线程中不做大规模耗时的计算处理, 也不做IO操作. 仅仅做计算和分发.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void ImageProcessor::splitViewProc()
{
int rows = this->_scan_rows;
int cols = this->_scan_cols;
// 蛇形扫描下的先后顺序和坐标关系的定义
struct {
int begin;
int end;
int delta;
} controls[2] = {
{0, cols, 1} // 从左向右扫描, 偶数行
,{cols-1, -1, -1} // 从右向左扫描, 奇数行
};

// 按照扫描顺序遍历每个视野, 等待视野被定位
for(int row=0; row<rows; ++row)
{
int idx = row % 2;
for(int col=controls[idx].begin; col!=controls[idx].end; col+=controls[idx].delta)
{
Offset pos = Offset(col, row);
ImageElement& view = getImageElement(pos);
// 在此阻塞等待视野匹配完成.
waitViewMatched(view);
// 视野0是基础点. 我们可以考虑在这里重新定义它的基地址.
if( col==0 && row==0){
view.grect = view.grect.translated(-_protect_horz, -_protect_vert);
setViewLocated(view);
}
else
{
// 获取锚点的视野. 这里会阻塞等待.
auto anchor_pos = (view.anchor==LEFT || view.anchor==RIGHT) ? getPrevPos(pos) : getUpPos(pos);
auto& anchor_view = getImageElement(anchor_pos);
waitViewLocated(anchor_view);
view.grect = anchor_view.grect.translated(view.offset);
setViewLocated(view);
}
}
trace2(__func__, "scan ", DSHOW(row) );
// 扫描完毕第一行后, 计算出总的Slice的列数.
if(row==0){
initSliceCache();
}
// 将这一行视野的图像都拷贝到相关的Slice中, 并释放相关的内存数据.
splitViewLines(row);
}
}

真正的切割工作是在函数splitViewLines()中进行的:

5.5 行切割函数splitViewLines()

行切割函数在splitViewProc()检测到扫描了一行数据后进行. 为了简化实现, 我们有一定的限制:

  • 函数成功的前提是扫描视野图像的高度要大于要切割出来的子图的高度. 这个不是什么特别大的问题. 我们扫描仪的扫描视野是2048x2448的高度, 而切割出的视图大小要求不能超过1024x1024. 最高也不超过2048.
  • 要求扫描的一行视野在垂直方向上不会有太大的差别. 这个也不是问题. 经过实测, 在扫描仪各个倍率对应允许的扫描宽度范围内, 扫描仪的精度完全足以满足这个限制: 扫描仪只是每次换行时有较大的机械误差, 在同一行扫描的过程中, 其水平准确度还是能够控制在200个像素以内的.
  • 上述要求其实就是要求我们扫描最后一行后, 一定可以切割出新的一行出来. 而不是扫描玩最后一行, 发现由于重叠或参差, 导致是无效数据.
  • 还假设扫描完前两行之后一定可以切割出一行完成的slice. 这个要求收紧一点, 就是slice的高度不能大于视野的高度.

上述限制都是为了让代码实现更简单易懂而加的限制. 完全可以满足.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
void ImageProcessor::splitViewLines(int view_row)
{
// 1. 计算这一行视野的范围, 并进而计算出穿过边界线的Slice的行号的范围
auto left_view = getImageElement(0, view_row);
int top_up = left_view.grect.top(); // 这一行视野中上表面最高的位置
int top_down = top_up; // 这一行视野中上表面最低的位置.在这个位置之下的Slice一定是新建的.
int bottom_up = left_view.grect.bottom(); // 下表面中最高的位置. 在这个位置之上的Slice一定是完成了的.
int bottom_down = bottom_up; // 下表面中最低的位置.
static int pre_finished_slice_rows = 0; // 记录上一行填满的Slice行数. 也是这一行View能够填满的第一行Slice.
static int last_used_view_row = -1; // 记录上一次调用时清除的视野行.
// 新的一轮扫描时需要将静态变量重新初始化
if(view_row==0){
pre_finished_slice_rows = 0;
last_used_view_row = -1;
}
// 检查各列
for(int col=0; col<this->_scan_cols; ++col)
{
auto rect = getImageElement(col, view_row).grect;
top_up = std::min(top_up, rect.top()); //
top_down = std::max(top_down, rect.top());
bottom_up = std::min(bottom_up, rect.bottom());
bottom_down = std::max(bottom_down, rect.bottom());
}

// 计算都有哪些行的Slice收到这一行视野的影响.
int slice_row_begin = top_up / _slice_height;
int slice_row_end = bottom_down / _slice_height;
int slice_row_finished = bottom_up / _slice_height;

// 当前的Slice行号. 如果视野不是最后一行, 则包含不完整的最后一行.
// 如果是最后一行视野, 则只包含完整的最后一行.
// 注意,只有当View尺寸大于Slice尺寸, 最后一行View能够确保产生一行完整的Slice的时候这个假设才有效!
_slice_rows = isBottomView(view_row) ? slice_row_finished : slice_row_end+1;

// 2. 遍历这些Slice, 对每个Slice, 检查是否和这些视野有交集...
try{
for(int slice_row=slice_row_begin; slice_row<_slice_rows; ++slice_row)
{
for(int slice_col=0; slice_col<this->_slice_cols; ++slice_col)
{
// 创建或获取Slice. 并设置其行/列末尾标记.
SliceElement& slice = getSliceElement(Offset(slice_col, slice_row));

// 检查每个视野和每个Slice的重叠.
for(int view_col=0; view_col<this->_scan_cols; ++view_col)
{
auto& view = getImageElement(view_col, view_row);
QRect cp = slice.rect.intersected(view.grect);
if(cp.isValid()) // 视野被这个SLice所使用.
{
// 转成相对于View和Slice的地址位置
QRect s = cp.translated(-view.grect.topLeft());
QRect d = cp.translated(-slice.rect.topLeft());
ViewPart part{ view.cord, s, d/*, view.data(cv::Rect(s.left(), s.top(), s.width(), s.height()))*/};
// 注意, 这里使用复制, slice中带有自己的内存, 和View不共享内存.
// 我们要等所有的部分都齐备了才会拼接这个Slice. 因此, 这里仅仅是保存, 而不会合并.
auto rect = cv::Rect(s.left(), s.top(), s.width(), s.height());
TRACE_IF(!validPart(view.data, rect), QString("拼接异常: rect={x=%1,y=%2, w=%3, h=%4}, data size: {cols=%5, rows=%6}")
.arg(rect.x).arg(rect.y).arg(rect.width).arg(rect.height).arg(view.data.cols).arg(view.data.rows));
view.data(cv::Rect(s.left(), s.top(), s.width(), s.height())).copyTo(part.data);
slice.parts.insert(view.cord, part);
}
else
{
continue;
}
}
}// end of for slice_col
}//-- end of for slice_row
}
catch(cv::Exception& e)
{
error2(__func__, "opencv exception occured slice making :", e.what());
}

// 4. 对已经完成的Slice, 在这里拼接数并导出. makeSlice()会并发执行
for(int row=pre_finished_slice_rows; row<slice_row_finished; ++row)
{
auto is_last_row = isBottomView(view_row) && row==_slice_rows-1;
for(int col=0; col<_slice_cols; ++col)
{
auto is_last_col = col==_slice_cols-1;
// 对makeSliceProc的封装. 可以并发或串行调用. 这里是最主要的并发场景之一.
if(_is_make_slice)
{
makeSlice(col, row, is_last_col, is_last_row);
}
}
}
// 3. 释放不再使用的视野的内存: 因为这一行还会被下一行所使用, 所以正常情况下删除
// 当前行的前面一行; 如果是最后一行就删除两行.
// !! 这个算法不一定完美.
int begin, end;
// 当前行即时第一行也是最后一行.
if( isTopView(view_row) && isBottomView(view_row) )
{
begin = end = view_row;
}
else if (isTopView(view_row)) // 多行下的第一行. 不要删除.
{
begin = 0;
end = -1;
}
else if (isBottomView(view_row)){ // 多行下的最后一行
begin = view_row -1 ;
end = view_row;
}
else { // 多行下的中间行, 只需要删除一行.
begin = end = view_row-1;
}
// 释放内存.
for(auto row=begin; row<=end; ++row)
{
for(int col=0; col<_scan_cols; ++col)
{
getImageElement(col, row).data.release();
}
}
// 5.
pre_finished_slice_rows = slice_row_finished;
}

在上面, 会调用makeSlice()来处理识别出的一个子图. 它会启动一个工作线程makeSliceProc()来进行处理.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void ImageProcessor::makeSlice(int col, int row, bool is_last_col, bool is_last_row)
{
if(_slice_task_parallel)
{
_sync_mutex.lock();
_sync.addFuture(QtConcurrent::run(&this->_slice_thread_pool, this, &ImageProcessor::makeSliceProc, col, row, is_last_col, is_last_row));
log2(__func__, "current waiting tasks: ", DSHOW(_sync.futures().size()));
_sync_mutex.unlock();
}
else {
trace2(__func__, "start makeSliceProc parallel");
makeSliceProc(col, row, is_last_col,is_last_row);
}
}

5.6 数据拼接和瓦片处理 makeSliceProc

在上一级中, 识别出每个Slice, 并将信息保存在SliceElement结构中. SliceElement中的成员parts中保存了构成这个Slice的视野的索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct ViewPart
{
Offset view_id; //!< 所在View的编号
QRect view_rect; //!< 在View中的位置(局部坐标)
QRect slice_rect; //!< 在Slice中的位置(局部坐标)
cv::Mat data; //!< 图像数据
};

struct SliceElement
{
Offset cord; //!< 切片的位置
bool is_col_end;
bool is_row_end;
QRect rect; //!< 切片在全局坐标系中的位置(可以通过cord计算出来)
QMap<Offset, ViewPart> parts; //!< 相关的视野部分的信息
...
};

首先需要完成视野的拼接. 如果不做平滑处理, 则只需要简单地拼接拷贝就可以. 生成Slice后, 则执行打包处理. 打包处理是要根据这张原始Slice向上逐级生成金字塔层次. 每四张第一级的图片合并成一张高一级的, 这样逐级向上, 直到结束.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void ImageProcessor::makeSliceProc(int col, int row, bool is_last_col, bool is_last_row)
{
auto& slice = getSliceElement(col, row);
// 拼接Slice中的各个子区
cv::Mat slice_data(_slice_height, _slice_width, CV_8UC3);
// 简单地拼接, 没有做平滑处理.
for(auto iter=slice.parts.begin(); iter!=slice.parts.end(); ++iter)
{
auto part_img = iter.value().data;
auto part_rect = cvRectFromQRect(iter.value().slice_rect);
part_img.copyTo(slice_data(part_rect));
iter.value().data.release(); // 释parts中的图像内存
}
...
// 单独保存Slice到硬盘
if( _is_export_slice){
cacheSlice(col, row, slice_data);
}
// 打包保存
if( _is_archive)
{
this->startTileTask({0, col, row}, slice_data, is_last_col, is_last_row);
}
...
}

这个工作我们直接在这个线程中进行. 每个线程会检测自己的邻居是否就绪了, 如果这四个邻居都就绪了, 则其中一个线程负责生成上一级图片, 其他三个线程就退出结束. 这里的startTileTask()函数最终是调用了TileArchiverWriterEx::tileTask()来做这件事.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
void TileArchiverWriterEx::tileTask(TileArchiverEx::TileIndexKey start_pos, cv::Mat image,bool is_last_col,bool is_last_row, bool is_save)
{
...
auto pos = start_pos;
// 创建0级的Tile. 0级Tile不涉及到sub.
auto& start_tile = getOrNewElement(pos, is_last_col, is_last_row, 0);
image.copyTo(start_tile._data);

// 第一行的最后一个视野负责设置0级Tile的列数;
// 最后一行Tile的第一个Tile负责设置0级Tile的行数
// 因为只有在所有Tile都生成之后才会用到, 所以不需要考虑同步保护的问题.
if(is_last_col && pos._row==0){
_imageInfo._cols_0 = pos._col+1;
_imageInfo._imgwidth = _imageInfo._cols_0 * _imageInfo._tilewidth;
}
if( is_last_row && pos._col==0){
_imageInfo._rows_0 = pos._row+1;
_imageInfo._imgheight = _imageInfo._rows_0 * _imageInfo._tileheight;
}
_start:
// 保存数据
TileElement& tile = getTileElement(pos);
if( this->_is_debug_open){ // 保存调试
dbgDumpTile(tile._pos, tile._data);
}
// 归档保存此Tile.
if(is_save)
{
this->archiveTile(pos, this->transfer(tile._data));
}
// 判断当前的Tile是否是唯一的Tile. 如果是, 处理就结束了.
if( tile._pos._col==0 && tile._is_col_end && tile._pos._row==0 && tile._is_row_end)
{
_imageInfo._levels = tile._pos._level + 1;
if(is_save)
{
this->archiveTile(NullIndex, QByteArray());
}
return;
}
else
{
// 获取或构造上一级Tile
qint32 flags = (tile._is_col_end && pos._col % 2==0) ? 0x0A : 0; // 最右边列如果是偶数列(缺一列)的子区的右边两个子区不存在
flags |= ( tile._is_row_end && pos._row %2==0) ? 0x0C:0; // 最下面列如果是偶数列(缺一列)的子区的下面两个子区不存在.

auto np = makeNextCoord(pos);
auto sub_col = np.second.x();
auto sub_row = np.second.y();

auto& sub_tile = getOrNewElement(np.first, tile._is_col_end, tile._is_row_end, flags);
// 将图像数据缩小一半填进去
cv::Mat sub_data;
cv::pyrDown(tile._data, sub_data);
setTileSubData(sub_tile, sub_col, sub_row, sub_data);
tile._data.release();
// 检查子区是否都就绪了
if( isTileDataReady(sub_tile))
{
mergeTileSubData(sub_tile);
pos = sub_tile._pos;
goto _start;
}
}
...
return;
}

bool TileArchiverWriterEx::isTileDataReady(TileArchiverWriterEx::TileElement &tile)
{
return tile._flags.testAndSetOrdered(getFullFlag(), getFullFlag()+1);
}

void TileArchiverWriterEx::mergeTileSubData(TileArchiverWriterEx::TileElement &tile)
{
auto size = getTileSize();
auto sub_width = size.width()/2;
auto sub_height=size.height()/2;
cv::Mat result(size.height(), size.width(), CV_8UC3);
cv::Rect dst_rect[4] {
cv::Rect(0, 0, sub_width, sub_height)
, cv::Rect(sub_width, 0, sub_width, sub_height)
, cv::Rect(0, sub_height, sub_width, sub_height)
, cv::Rect(sub_width, sub_height, sub_width, sub_height)
};
for(int i=0; i<4; ++i)
{
if( !tile._sub_data.value(i).empty()){
tile._sub_data[i].copyTo(result(dst_rect[i]));
tile._sub_data[i].release();
}
}
setTileData(tile, result);
}