高倍扫描

高倍扫描从细胞类别上分为白细胞,红细胞,巨核细胞三类,在细节上有所差别,但是本质上是相同的。我们只讨论白细胞的高倍扫描。

高倍扫描有两种扫描方式,一种是外周血常用的离散方式,即根据提供的坐标,拍摄一张张独立的照片,照片和照片之间没有关联。另一种是连续扫描,和低倍扫描类似,提供的是一个矩形的范围,拍摄连续的照片。和低倍扫描不同的是高倍连续扫描下,相邻的图片之间是有重叠区域的,我们需要将这些图片做拼接和切割,重新生成无重叠的固定大小的图片。

为什么外周血要使用离散扫描方式,而骨髓要使用连续扫描方式?这是一种时间和空间的权衡。在外周血中,白细胞的密度是比较低的,一般需要几个百倍视野的范围内才能有一个白细胞,如果使用连续扫描,会浪费大量的存储空间。相反,一般骨髓样本中的白细胞密度相当高,从几个到几十个,这样我们就不可能拍摄单独的照片了。对于一些特殊的病例,比如急性白血病,白细胞大量增生,我们就应当改用连续扫描,而另一方面,比如对化疗晚期的病人,其体内的白细胞极其稀少,即使是骨髓,一张玻片可能都找不到一百个,此时我们自然需要使用离散扫描的方式。

同样的道理,我们拍摄红细胞的时候必然是连续模式,而巨核细胞自然就是离散方式了。

我们的设备使用的是百倍油镜,即在做百倍扫描时,需要首先在观察区域滴油。跳过这些细节,高倍扫描的处理流程和低倍扫描很类似。

设备高倍扫描的接口为:

1
2
3
4
5
int HighScanMoving( std::vector<std::pair<std::pair<int,int>, 
std::pair<int,int>>> points,
int scan_method,
bool is_oil,
ScanInfo* scan_info)

其参数:

  • points:为一系列点的集合,表示要扫描的范围(连续扫描时)或离散视野的位置。在上一章已经介绍过了。在doLowScan()中,扫描点被_impl->_lowAnalyzer计算并保存到_impl->_whiteScanRegions里面。
  • scan_method:扫描方式。指明是离散模式还是连续模式
  • is_oil:是否在扫描前执行滴油操作。因为我们可能会执行不止一轮高倍扫描,大多数情况下,只有第一轮高倍扫描时需要滴油和涂抹。
  • scan_info:在连续扫描时,其columnrow分别给出了扫描的行数和列数。如果是非连续扫描,column是要扫描的点数,row是1。

我们先考虑离散扫描,它的处理相对简单直接,拍摄到的照片直接保存就可以,不涉及到其他的处理。我们先编写基本的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
bool ScanTask::doHighScan(int cell_type)
{
const auto& regions = scanRegions(cell_type);
RETVAL_LOG_IF(regions.empty(), true, QString("细胞类别%1的扫描区域为空").arg(cell_type));
auto method = scanMethod(cell_type);
auto ratio = _impl->_highZoom / _impl->_lowZoom;
ScanInfo scanInfo;
for(int round=0; round<regions.size(); ++round)
{
auto viewDumpPath = viewDumpDirPath(EViewType::eHighView, cell_type,method, round);
auto sliceDumpPath = sliceDumpDirPath(cell_type, method, round);
emit sigPrepareDump(viewDumpPath);

auto rects = transformScanRegions(regions.at(round), method, _impl->_imageSize, ratio);
auto ret = _impl->_device->HighScanMoving(regions.at(round), method, !_impl->_oiled, &scanInfo);
emit sigScanStart(scanInfo.column, scanInfo.row, method);
_impl->_oiled = true;

int total = scanInfo.column * scanInfo.row;
int index = 0;

std::vector<PicInfor> picList;
while(true)
{
ret = _impl->_device->NormalStatusCheck();
if(ret != DCM_HIGH_SCANMOVING_START && ret != DCM_HIGH_SCANMOVING_FINISHED)
{
return false;
}
else if (index==total && ret==DCM_HIGH_SCANMOVING_FINISHED)
{
break;
}
else
{
_impl->_device->GetRealtimePics(&picList);
while(index<picList.size() && index<total)
{
auto& pic = picList.at(index);
auto column = pic.point.first;
auto row = pic.point.second;
auto imgMat = cvtPicToMat(pic);
//TODO: 连续扫描特殊处理
//......
emit sigImageCaptured({column, row}, imgMat);
emit sigDumpImage(imgMat, column, row, _impl->_highImageFormat, _impl->_highImageQuality, viewDumpPath);

if(method==EScanMethod::eSingleScan)
{
auto path = QString("%1/%2").arg(viewZipDirPath(EViewType::eHighView, cell_type, round)).arg(imageFileName(column,row,_impl->_highImageFormat));
emit sigPackViewInfo(EViewType::eHighView, cell_type, round, index, _impl->_imageSize, rects.at(index), method, path );
}
FREE_AND_CLEAR_BUFFER(pic.matPic);
index++;
}
}
}
//TODO: 如果是连续视野,需要等待拼接结束
//...
if(method==EScanMethod::eMatrixScan)
{
auto path = viewZipDirPath(EViewType::eHighView, cell_type, round);
//TODO: 计算拼接后的连续视野的大小
QSize size{};
//保存视野信息到数据库中
emit sigPackViewInfo(EViewType::eHighView, cell_type, round, 0, _impl->_imageSize, rects.at(0), method, path );
//压缩Slice的目录
emit sigZipDir(sliceDumpPath, basePath());
}
else
{
emit sigZipDir(viewDumpPath, basePath());
}
}
TRACE() << QString("细胞type=%1的高倍扫描已经结束").arg(cell_type);
return true;
}

函数首先会获取上个阶段计算出来的扫描区域和扫描方法。如果是非连续扫描,会只有一组结果,每个点是一个扫描位置的坐标。如果是连续扫描,可能会需要扫描多个区域,结果会给出多组数据,每一组会由四个点组成,分别是扫描范围的矩形的四个角点的坐标——虽然很奇怪,但是设备驱动就是这样要的。

对非连续视野的扫描,从设备得到的照片就是最终的图片,每张照片代表了一个“视野”,都需要写入数据库中。而如果是连续扫描,就和低倍连续扫描一样,最后拼起来的大图才是一个“视野”,高倍连续扫描和低倍连续扫描的区别只是在于低倍扫描不需要做复杂的拼图,直接将拍摄到的照片作为“Slice”处理。

对高倍连续扫描,我们需要将拍摄到的照片发给一个”Jointer”去做拼接处理,并生成Slice。当前我们还没有实现,这部分代码先空着。同样,我们暂时无法计算出拼接后的视野的实际大小。除此之外,代码就已经完成了。

这里唯一需要注意的是transformScanRegions(),它会将高倍视野的区域位置坐标转换到低倍视野中。这个功能是给在低倍连续图中显示扫描的高倍扫描的视野的位置的,对扫描本身没有影响,我们不需要太关注它。

初测试doHighScan()

虽然还没有最终完成连续扫描功能,但是我们已经可以编写测试用例对doHighScan()做测试了。我们使用相同的方式,通过统计signal的个数来做测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void ScanTaskTester::test_doHighScan_Single()
{
//QSKIP("高倍逐点扫描");
const int HIGH_COLUMNS = 12;
const int HIGH_ROWS = 1;
SCAN_PREPARE()
scaner.setHighScanCount(HIGH_COLUMNS, HIGH_ROWS);
task.setScanMethod(ECellType::eWhiteCell, EScanMethod::eSingleScan);
task.setScanRegions(ECellType::eWhiteCell, {
// 非连续扫描只会提供一个round
{
{{1,1},{44,101}},
...
{{2,1},{1897,1938}},
}
});

connect(&task, &ScanTask::sigScanStart, this, [](int a, int b)
{
QCOMPARE(a, HIGH_COLUMNS);
QCOMPARE(b, HIGH_ROWS);
});
QSignalSpy spy31(&task, &ScanTask::sigPrepareDump);
QSignalSpy spy32(&task, &ScanTask::sigScanStart);
QSignalSpy spy33(&task, &ScanTask::sigImageCaptured);
QSignalSpy spy34(&task, &ScanTask::sigDumpImage);
QSignalSpy spy35(&task, &ScanTask::sigPackViewInfo);
QSignalSpy spy36(&task, &ScanTask::sigZipDir);

QCOMPARE(task.doHighScan(ECellType::eWhiteCell), true);

QCOMPARE(spy31.count(), 1);
QCOMPARE(spy32.count(), 1);
QCOMPARE(spy33.count(), HIGH_COLUMNS*HIGH_ROWS);
QCOMPARE(spy34.count(), HIGH_COLUMNS*HIGH_ROWS);
QCOMPARE(spy35.count(), HIGH_COLUMNS*HIGH_ROWS);
QCOMPARE(spy36.count(), 1);
}

同样我们也可以测试未完成的连续扫描。这样还可以先验证一下Jointer缺失下的行为。我们先测试一个最简单的场景——只有一个区域的时候:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void ScanTaskTester::test_doHighScan_Matrix()
{
//QSKIP("test doHighScan() with matrix scan mode");
const int HIGH_COLUMNS = 5;
const int HIGH_ROWS = 5;
SCAN_PREPARE()
scaner.setHighScanCount(HIGH_COLUMNS, HIGH_ROWS);
task.setScanMethod(ECellType::eWhiteCell, EScanMethod::eMatrixScan);
task.setBasePath("D:/temp/testscan");
task.setScanRegions(ECellType::eWhiteCell, {
// round0
{
{{1,2},{1844,15}}, //top-left
{{2,2},{679 ,15}}, //top-right
{{2,2},{679 ,1204}}, //bottom-right
{{1,2},{1844,1204}} //bottom-left
},
// round1
//{
//{{3,3},{1844,15}}, //top-left
//{{4,3},{679 ,15}}, //top-right
//{{4,3},{679 ,1204}}, //bottom-right
//{{3,3},{1844,1204}} //bottom-left
//},
});

QCOMPARE(task.doHighScan(ECellType::eWhiteCell), true);

QSignalSpy spy31(&task, &ScanTask::sigPrepareDump);
QSignalSpy spy32(&task, &ScanTask::sigScanStart);
QSignalSpy spy33(&task, &ScanTask::sigImageCaptured);
QSignalSpy spy34(&task, &ScanTask::sigDumpImage);
QSignalSpy spy35(&task, &ScanTask::sigPackViewInfo);
QSignalSpy spy36(&task, &ScanTask::sigZipDir);

QCOMPARE(task.doHighScan(ECellType::eWhiteCell), true);

QCOMPARE(spy31.count(), 1);
QCOMPARE(spy32.count(), 1);
QCOMPARE(spy33.count(), HIGH_COLUMNS*HIGH_ROWS);
QCOMPARE(spy34.count(), HIGH_COLUMNS*HIGH_ROWS);
QCOMPARE(spy35.count(), 1);
QCOMPARE(spy36.count(), 1);
}

执行集成测试

到现在,我们已经完成了非连续扫描的扫描流程了。我们可以把整个流程串起来测试一下。我们增加一个test_scan(),把几个部分都加进来,模仿一个真正的扫描流程行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
void ScanTaskTester::test_scan()
{
QSKIP("full scan");
SCAN_PREPARE()
// 初始化测试参数
TBarParserRule rule1;
rule1._sample_id_include = true;
rule1._case_no_include = true;
rule1._sample_id_location = QSize(0,10);
rule1._case_no_location = QSize(11, 8);
rule1._sample_type_location = -1;
rule1._sample_type_bm_code = "B";
rule1._sample_type_pb_code = "P";
rule1._sample_type_default = "P";
configer->SetBarParserRule(rule1);
TRACE() << TSHOW(configer->Hospital());
configer->SetHospital("RENJI");
TRACE() << "修改后" << configer->Hospital();
configer->SetPbCount(100);
configer->SetBmCount(100);
configer->SetRedCount(1000);
configer->SetPbScanMethod(EScanMethod::eSingleScan);
configer->SetBmScanMethod(EScanMethod::eMatrixScan);
configer->SetScanWhiteCell(true);
configer->SetScanRedCell(false);
configer->SetScanMegaCell(false);

// 装配
QCOMPARE(task.doPrepare({}), true);


// 测试预览图
{
QSignalSpy spy11(&task, &ScanTask::sigPrepareDump);
QSignalSpy spy12(&task, &ScanTask::sigPreviewed);
QSignalSpy spy13(&task, &ScanTask::sigDumpImage);
QSignalSpy spy14(&task, &ScanTask::sigPackViewInfo);
QSignalSpy spy15(&task, &ScanTask::sigPackSlideInfo);

QCOMPARE(task.doPreview(), true);

QCOMPARE(spy11.size(), 1);
QCOMPARE(spy12.size(), 1);
QCOMPARE(spy13.size(), 2);
QCOMPARE(spy14.size(), 2);
QCOMPARE(spy15.size(), 1);

QCOMPARE(task.scanMethod(ECellType::eWhiteCell), EScanMethod::eSingleScan);
QCOMPARE(task.cellCount(ECellType::eWhiteCell), 100);
QCOMPARE(task.cellCount(ECellType::eRedCell), 0);
QCOMPARE(task.cellCount(ECellType::eMegaCell), 0);
QCOMPARE(task.sampleType(), ESampleType::ePbType);
}

TRACE() << "执行低倍扫描";
{
const int LOW_COLUMNS = 4;
const int LOW_ROWS = 4;
scaner.setLowScanCount(LOW_COLUMNS,LOW_ROWS);
connect(&task, &ScanTask::sigScanStart, this, [](int a, int b)
{
QCOMPARE(a, LOW_COLUMNS);
QCOMPARE(b, LOW_ROWS);
});
QVERIFY2(task.doLowScan(), "low scan failed");
TRACE() << "待扫描区域: round=" << task.scanRegions(ECellType::eWhiteCell).size() << ", position=" << task.scanRegions(ECellType::eWhiteCell).at(0).size();
// 检查低倍扫描的存储目录是否存在...
auto dumpPath = task.viewDumpDirPath(EViewType::eLowView, 0, 0, 0);
QCOMPARE(QDir(dumpPath).exists(), true);
QCOMPARE(QDir(dumpPath).entryList(QDir::Filter::Files).size(), LOW_COLUMNS*LOW_ROWS);

}

#if 1 // 非连续扫描
const int HIGH_COLUMNS = 12;
const int HIGH_ROWS = 1;
task.setScanMethod(ECellType::eWhiteCell, EScanMethod::eSingleScan);
#else // 连续扫描
const int HIGH_COLUMNS = 5;
const int HIGH_ROWS = 5;
task.setScanMethod(ECellType::eWhiteCell, EScanMethod::eMatrixScan);
#endif
TRACE() << QString("执行高倍扫描. 将扫描%1%2列图像").arg(HIGH_COLUMNS).arg(HIGH_ROWS);
scaner.setHighScanCount(HIGH_COLUMNS, HIGH_ROWS);
disconnect(&task, &ScanTask::sigScanStart, this, nullptr);
connect(&task, &ScanTask::sigScanStart, this, [](int a, int b)
{
QCOMPARE(a, HIGH_COLUMNS);
QCOMPARE(b, HIGH_ROWS);
});

QCOMPARE(task.doHighScan(ECellType::eWhiteCell), true);


emit task.sigCompletePacking();

auto packing = task.packing();
QVERIFY2(QDir().exists(packing->dbPath()), "db not exists");
QVERIFY2(QDir().exists(packing->archivePath()), "archive not exist");

}

我们通过检查扫描目录里面的文件数量来判断扫描是否成功。注意一个小细节,在使用QDir::entryList()的时候一定要给出过滤条件。不然,会将...给计算进去。

关于高倍连续扫描

我们在低倍扫描的时候已经认识了连续扫描了,在前面又接触了非连续扫描的方式。两种扫描方式,从逻辑上,区别在于非连续扫描得到的每张照片都是一个独立的“视野”,而一次连续扫描得到的整个覆盖的区域是一个“视野”。在低倍扫描中,因为仅仅是要从中寻找目标细胞,我们对照片的对齐没有精确的要求,可以容忍存在一定的偏差,即使存在重复,也可以通过高倍扫描中的准确的物理位置来识别处理。而高倍连续扫描的结果是要观察的,如果还对不齐就不可接受了。因此需要对图片做匹配拼接处理。

很多人都使用过使用相机或手机做大范围的连续拍摄,并由软件对照片做拼接形成一张照片。我们的连续扫描也是相同的道理。区别在于手机或相机的拍摄更多的需要考虑镜头的扭曲,亮度和色彩的变化,相机的抖动等,而对我们而言这些问题的影响相对要小的多,更多的是要考虑照片数量上去之后的快速处理问题。另外也需要考虑相机镜头暗角带来的亮度不一致的问题。在本文中,我们主要关心的是速度和资源消耗问题。

我们定义一个专门的类来做图像拼接和切割工作:它接收扫描过程中接收的照片,执行拼接并输出拼接切割后的图像。从这个角度看,我们发现,其实不做拼接也是一种“拼接”,因此我们可以将低倍高倍扫描的图像处理都交给这个Jointer的不同派生类来实现。我们先考虑高倍扫描。定义接口IImageJointer

说到这里,再说一句题外话,医学显微扫描是数据量极为庞大的工作。我们曾经见过某国内厂商宣称自己的百倍扫描支持全片扫描,规格是3分钟扫完。呵呵,全片扫描,百倍镜头,大约是10万数量级的照片,500万像素级别的相机,目前为止,去掉虚标的成本,靠谱的帧率,也就是30~40FPS这个数量级。那么算一下就知道了,就算按照50FPS,10万帧照片,哪怕镜头一动不动不用对焦,不用运动,也要2000秒才能拍完。再看存储,就算我们不做拼接计算,图像编码耗时为0,按照每张照片平均3M计算,也是300G的数据量,就算你用固态硬盘做存储,SSD的固态硬盘撑破天500M/s的顺序写速度,又要写多久?哪怕是豪横到用NVME。这就和另一家厂商宣称自己的LIS系统支持3000用户在线一样,从2千万用户数据中检索一个数据用时10毫秒一样。

我曾经访问过一家关系亲密的做20倍组织扫描的厂商,他们为了实现1分钟扫描一张玻片(1.5x1.5cm的范围),使用的是双路至强CPU,万兆以太网接口的镜头,固态硬盘做存储缓冲。

IImageJointer接口类定义

接下来我们先定义接口IImageJointer。它会派生出两个子类:DummyImageJointerHighImageJointer。其中DummyImageJointer只是一个占位符,实际上什么也不做,用于非连续扫描使用;而HighImageJointer则用于连续扫描的图像拼接使用。ImageJointer的生命期在每轮(round)扫描的范围内,所有细胞类型,扫描模式,扫描轮次等都被slice_path所屏蔽。

我们提供了两个接口:prepare()start(),一个设置的是属性信息,一个设置的是运行信息。根本原因是把可配置的参数放到prepare()里面了。我们实际上的类里面这些参数都有默认值,也就是它并不是必须的。

以后还考虑会增加一个NoMatchJointer,用于初始十倍图,以统一处理流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class IImageJointer : public QObject
{
Q_OBJECT
public:
explicit IImageJointer(QObject *parent = nullptr) : QObject(parent){}
~IImageJointer() = default;

virtual void prepare(QSize image_size, QSize slice_slide, QPoint std_offset, int image_channels) = 0;
virtual void start(int columns, int rows, QRect rect, QString slice_path, bool parallel_run, bool make_tile) = 0;
virtual void add(QPoint pos, const cv::Mat& img) = 0;
virtual void waitForFinished() = 0;
// 参数设置
virtual void setSliceSize(const QSize& size) override;
virtual void setStdOffset(const QPoint& offset) override;
virtual void setMaxDeviate(int val) override;
virtual void setSlicePath(const QString& path) override;
virtual void setImageChannels(int v) override;
virtual int sliceColumns() const = 0;
virtual int sliceRows() const =0;
virtual int sliceWidth() const = 0;
virtual int sliceHeight() const = 0;
signals:
};

首先我们定义一个用于非连续扫描的DummyImageJointer类。这个类实际上是一个Dummy类,什么也不用做。

然后是用于高倍连续扫描的HighImageJointer

实现HighImageJointer

工作原理

先看一下我们设计的HighImageJointer的内部的工作流程:

调用者ScanTask::doHighScan()每接受到一张照片,就调用接口HighImageJointer::add()将图像传进来。它会为这张图片启动一个匹配线程,计算出这张照片和前一张照片,以及它顶上的照片的匹配,得到相对位置,并选择和它匹配度更高的那张照片作为它的定位锚点。

另外一个工作线程,切分线程splitProc,会监测匹配结果,当完成一行扫描后,就计算这一行照片的全局坐标,并切割出拼接的切片图图。对每张图片,它会再启动一个工作线程,来完成图像的渲染,保存工作。如果需要生成金字塔图像集,也由这个线程来做。

另外,我们还会有一个垃圾收集线程,负责检测不再使用的图像文件,并释放它们。

doMatch(): 计算图像的匹配

要首先图像拼接,首先要确定两张相邻图片的重叠位置。我们面临的是一个很特化的场景,相机位置不变,每次移动相同的位移,视场不存在变形(不考虑镜头造成的畸变)。在这种情况下,我们使用OpenCV提供的基本的图像匹配方法matchTemplate()就可以了。这个函数用于计算在图像src中寻找图像tmpl的匹配情况,匹配情况被放到result里面,我们需要从中寻找最大或最小值(根据使用的method)。在这里,我们使用归一化相关系数方法,实测各种方法,发现在我们的使用场景下,它对匹配有比较好的敏感性。完全匹配时它的值为1.0,完全不匹配时为0。更具体的信息可以查阅OpenCV的文档。

在进行连续区域扫描时,相机是按照蛇形方式运动,先从左向右扫描第一行,然后移动到下一行,再从右向左扫描。这样,大部分照片都有两个相邻的前序图片:一个是前一张(扫描行首除外),一个是顶上一张(第一行除外)。我们这里使用的一种简化的匹配算法就是使用这种方式,每张照片都和它的两个前序照片进行比较,寻找匹配度最高的那个作为它的锚点。在这种方式下,整个的连续视野也是按照扫描的顺序一点点呈现出来的,比较直观和简单。

当然,我们必须考虑万一我们遇到了大片的连续空白区域,导致匹配无效的情况。对这个问题,主要是工程上的思路:首先,在选区的时候就会尽量排除大片空白的情况;其次,一张照片的两个相邻方向都是空白区域的概率是很低很低的,而且,在这种情况下,实际上,一张图片放的位置怎么样也大概率是看不出来的,我们就使用它的“理论”位置来锚定就是了。遇到这种情况,我们将这张图片标记为位置不可信,从而它的后续照片尽量不使用它作为锚点,从而将拼接错误的影响限制到最低。总而言之,针对匹配失败的问题,可以想出各种解决方案。对这些方案的实现多寡,就是学校的玩具和真实的产品之间的差别。

笔者还曾实现过另一种匹配机制,让一张照片和它四周的四张照片做匹配,最后从某个连续匹配度最高,内容最丰富的“种子”区域开始向四周扩展,最后得到一个关注“热点”区域的拼接实现。它是很特化的,专门用于那种内容很稀疏,很难匹配的场景(主要是某些组织的显微图像)。自然,这种方式的算法复杂度和资源消耗就比这里的这种算法要大得多,它是解决了有无问题之后的优化手段。对于我们这本书来说就过于喧宾夺主了,并且,一般来说,血细胞涂片也不需要搞这么复杂。我们这里的简单算法已经能够用于几乎所有的血液涂片了,对于极少数情况,给用户一个提示就可以了。其实根本不影响最终的判读结果。

说句题外话,写一个能“跑”的软件是很容易的,但是写一个能“用”的软件则是困难,枯燥,乏味的事情,大量的时间是消耗在解决这些各种各样的异常情况的,也是更考验开发人员心智的地方。

首先来看图像匹配的计算函数doMatch()。这个函数是匹配的计算基础,它会被放到线程中多次调用。为了便于测试,我们将其定义为static类型,让它独立出来,不依赖于上下文。

这个函数大部分代码是匹配区域的选择和计算。它会根据传入的两个视野的行列编号来判断它们的相对位置,并在理想的匹配位置周边进行匹配。参数std_offset是理想的偏移量,其中,std_offset.x()是水平运动时的水平位移量(以像素计算),std_offset.y()是垂直运动时垂直方向的位移量。这两个值都是可以在扫描设备中进行配置的。最后会选出匹配度最高的位置,并计算出两张图片的偏移量,以及与理论偏移量之间的偏差(用于后期的矫正使用,和本功能无关)。另外,在matchTemplate()调用之后,我们还对结果做了一次平滑,以期平滑出现的奇点,这个,只是理论上的可能而已。

我们看一下代码。为了便于测试,我们将其实现为static类型的方法,这使得它的参数有一点冗长。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
HighImageJointer::MatchResult HighImageJointer::doMatch(QPoint cur_pos, QPoint src_pos, cv::Mat cur_img, cv::Mat src_img, QPoint std_offset, int delta)
{
int width = cur_img.cols;
int height = cur_img.rows;
int overlap_x, overlap_y;
EMatchDir matchDir;
int src_left, src_top, src_width, src_height, tmpl_left, tmpl_top, tmpl_width, tmpl_height;

if(cur_pos.x() == src_pos.x()+1 && cur_pos.y() == src_pos.y())
{
matchDir = EMatchDir::eMatchLeft;
overlap_x = std_offset.x();
overlap_y = 0;
src_left = std::abs(overlap_x - delta);
src_top = 0;
src_width = width - src_left;
src_height = height;

tmpl_left = 0;
tmpl_width = (width - std::abs(overlap_x)) * 2 / 3;
tmpl_top = std::abs(overlap_y) + delta;
tmpl_height = height - 2*std::abs(overlap_y) - 2*delta;
}
else if (cur_pos.x() == src_pos.x()-1 && cur_pos.y()==src_pos.y())
{
matchDir = EMatchDir::eMatchRight;
overlap_x = - std_offset.x();
overlap_y = 0;
src_left = 0;
src_top = 0;
src_width = width - std::abs(overlap_x) + delta;
src_height = height;

tmpl_left = width - (width-std::abs(overlap_x))*2/3;
tmpl_width = (width - std::abs(overlap_x))*2/3;
tmpl_top = std::abs(overlap_y) + delta;
tmpl_height = height - 2*std::abs(overlap_y) - 2*delta;
}
else if (cur_pos.x()==src_pos.x() && cur_pos.y()==src_pos.y()+1)
{
matchDir = EMatchDir::eMatchUp;
overlap_x = 0;
overlap_y = std_offset.y();
src_left = 0;
src_width = width;
src_top = std::abs(overlap_y) - delta;
src_height = height - std::abs(overlap_y) + delta;

tmpl_left = std::abs(overlap_x) + delta;
tmpl_width = width - 2*std::abs(overlap_x) - 2*delta;
tmpl_top = 0;
tmpl_height = (height-std::abs(overlap_y))*2/3;
}
else if (cur_pos.x()==src_pos.x() && cur_pos.y()==src_pos.y()-1)
{
matchDir = EMatchDir::eMatchDown;
overlap_x = 0;
overlap_y = - std_offset.y();
src_left = 0;
src_width = width;
src_top = 0;
src_height = height - std::abs(overlap_y) + delta;

tmpl_left = std::abs(overlap_x) + delta;
tmpl_width = width - 2*std::abs(overlap_x) - 2*delta;
tmpl_top = height - (height-std::abs(overlap_y))*2/3;
tmpl_height = (height-std::abs(overlap_y))*2/3;
}
else
{
TRACE() << "invalid parameter: " << TSHOW(cur_pos) << TSHOW(src_pos);
return {false, -1, QPoint(), QPoint(), {-1, -1}};
}
cv::Rect src_rect, tmpl_rect;
try
{
src_rect = cv::Rect(src_left, src_top, src_width, src_height);
tmpl_rect = cv::Rect(tmpl_left, tmpl_top, tmpl_width, tmpl_height);
auto src = src_img(src_rect);
auto tmpl = cur_img(tmpl_rect);

cv::Mat match_result;
cv::matchTemplate(src, tmpl, match_result, cv::TM_CCOEFF_NORMED);
cv::blur(match_result, match_result, cv::Size(3,3));
double maxVal;
cv::Point maxPos;
cv::minMaxLoc(match_result, nullptr, &maxVal, nullptr, &maxPos);
int offset_x = maxPos.x + src_rect.x - tmpl_rect.x;
int offset_y = maxPos.y + src_rect.y - tmpl_rect.y;
int delta_x = (matchDir==EMatchDir::eMatchLeft || matchDir==EMatchDir::eMatchRight) ? offset_x - overlap_x : offset_x;
int delta_y = (matchDir==EMatchDir::eMatchLeft || matchDir==EMatchDir::eMatchRight) ? offset_y : offset_y - overlap_y;
return {true, maxVal, {offset_x, offset_y},{delta_x, delta_y}, {overlap_x, overlap_y}};

}
catch(const cv::Exception& e)
{
TRACE() << "比较视野" << TSHOW(cur_pos) << TSHOW(src_pos)
<< TSHOW(src_rect) << TSHOW(tmpl_rect)
<< "出现了错误: " << e.what();
return {false, };
}
}

测试doMatch()函数

因为doMatch()的关键性,我们要尽早开始它的测试。但是匹配的准确情况还真的不容易获得。我们主要是大致判断一下差不多就可以了:就是判断最终的位置是不是在“期望”位置附近,以及匹配度的值在不在范围里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
void ImageJointerTester::test_doMatch()
{
QString _dataRoot{R"(D:\DataStore\TestData\scanstub\2024-02-19-092454\higher\matrix)"};
QPoint cur_pos{1,1};
QPoint prev_pos{0,1};
QPoint up_pos{1,0};
QPoint down_pos{1,2};
QPoint post_pos{2,1};
cv::Mat cur_img = cv::imread(QString("%1/%2_%3.jpg").arg(_dataRoot).arg(cur_pos.x()).arg(cur_pos.y()).toStdString());
cv::Mat prev_img = cv::imread(QString("%1/%2_%3.jpg").arg(_dataRoot).arg(prev_pos.x()).arg(prev_pos.y()).toStdString());
cv::Mat up_img = cv::imread(QString("%1/%2_%3.jpg").arg(_dataRoot).arg(up_pos.x()).arg(up_pos.y()).toStdString());
cv::Mat down_img = cv::imread(QString("%1/%2_%3.jpg").arg(_dataRoot).arg(down_pos.x()).arg(down_pos.y()).toStdString());
cv::Mat post_img = cv::imread(QString("%1/%2_%3.jpg").arg(_dataRoot).arg(post_pos.x()).arg(post_pos.y()).toStdString());

QVERIFY2(!cur_img.empty(), "invalid cur_img");
QVERIFY2(!prev_img.empty(), "invalid prev_img");
QVERIFY2(!up_img.empty(), "invalid up_img");
QVERIFY2(!down_img.empty(), "invalid down_img");
QVERIFY2(!post_img.empty(), "invalid post_img");

HighImageJointer jointer;
const QPoint ori_offset{2203,1843};
const int delta = 30;
const double thd = 0.95;

{
TRACE() << "向左匹配: ";
auto result = HighImageJointer::doMatch(cur_pos, prev_pos, cur_img, prev_img, ori_offset, delta);
QVERIFY2(result._degree>thd, "match failed!");
QCOMPARE(result._ideaOffset.x(), ori_offset.x());
QCOMPARE(result._ideaOffset.y(), 0);
QVERIFY2(std::abs(result._offset.x()-result._ideaOffset.x())<delta, "left match error!");
QVERIFY2(std::abs(result._offset.y())<delta, "vert match error!");
TRACE() << TSHOW(result._offset) << TSHOW(result._degree) << TSHOW(result._ideaOffset);
}

{
TRACE() << "向右匹配: ";
...
}
...
}

我们还打算对这个函数做一下Benchmark测试,看看到底做匹配要花多少时间。QTEST提供了BENCHMARK宏。我们可以使用它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void ImageJointerTester::test_doMatch_BenchMark()
{
TRACE() << "测试doMatch()的性能";
QString dataRoot{R"(D:\DataStore\TestData\scanstub\2024-02-19-092454\higher\matrix)"};
QPoint cur_pos{1,1};
QPoint prev_pos{0,1};
cv::Mat cur_img = cv::imread(QString("%1/%2_%3.jpg").arg(dataRoot).arg(cur_pos.x()).arg(cur_pos.y()).toStdString());
cv::Mat prev_img = cv::imread(QString("%1/%2_%3.jpg").arg(dataRoot).arg(prev_pos.x()).arg(prev_pos.y()).toStdString());

QVERIFY2(!cur_img.empty(), "invalid cur_img");
QVERIFY2(!prev_img.empty(), "invalid prev_img");

HighImageJointer jointer;
const QPoint ori_offset{2203,1843};
const int delta = 30;

QBENCHMARK{
HighImageJointer::doMatch(cur_pos, prev_pos, cur_img, prev_img, ori_offset, delta);
};
}

在一台i5-1135G7的轻便笔记本上运行结果是27ms,这个速度已经比我们想象的要快多了,详细切换到i7上会更快,按照这个性能,实际上做多线程都有点不必要了。

基本数据结构

我们先看两个重要的数据结构,它们本来可以定义到.cpp文件中,但是为了单元测试使用,我们将它们提到头文件中,放到ImageJointer类里面定义。

先看第一个数据结构ViewElement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct ViewElement
{
QPoint _pos; // 行列号
QSize _size; // 图像大小
int _channels; // 通道数
cv::Mat _srcMat; // 图像数据
bool _captured{false}; // 图像数据是否就绪
bool _anchored{false}; // 图像是否已经定位
bool _credit{false}; // 匹配数据是否可信
bool _located{false}; // 是否计算完成了全局坐标
QPoint _anchor; // 锚点视野
int _anchorType; // 锚点标记0:左, 1:右, 2:上, 就是EMatchDir的值
QPoint _offset; // 视野左上角相对于锚点的偏移量
double _matchRatio{-1.0}; // 匹配度
QRect _grect; // 拼接后的全局坐标
QAtomicInteger<int> _viewRefCount{255};
// 下面是调试数据, [0]是pre, [1]是up
ViewElement()=default;
ViewElement(int col, int row, int ref_count): _pos{col,row}, _viewRefCount{ref_count}{};
};


在这里的View使用的扫描仪硬件的术语,指的是扫描仪拍摄的一张照片。它表示扫描得到的一张照片,它的每个字段的含义在注释中写的很清楚了。每个View的_srcMat里面保存了图像的数据,当View数量很高的时候,占用的内存资源是十分可观的——对一张2448x2048的四百万像素的照片,会占用大约15M内存——所以我们需要在不用的时候及时释放它。我们会利用cv::Matrelease()方法来释放它的资源。

ImageJointer中的几个名词要明确一下其含义:

  • View,或者说“视野”,指的是显微镜拍摄得到的一张照片。
  • Slice,或者说“切片”,指的是拼接后又裁出来的一张图片,这些Slice无缝拼接起来组成一个完整的扫描区域。
  • Tile,或者说“瓦片”,特指当我们制作图像金字塔的时候的每个Slice。当制作金字塔的时候,每四张底层图片被缩小合并成一张上层图片。在这个特定的语义环境下,我们称Slice为Tile。

第二个结构SliceElement表示的是一个Slice的数据。我们会约束一个Slice的尺寸不会超过一个View的尺寸,这样,一个Slice的数据最多可能会来自四个相邻的View的部分数据。它们的位置信息和图像数据会保存在Viewpart里面。当凑齐了数据,就会拼接渲染出这个Slice的完整的图像。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct SliceElement
{
// 表示View中的一个部分(或者说Slice的一个部分).
struct ViewPart
{
QPoint _viewPos; // View的行列号
QRect _viewRect; // 在View中的位置
QRect _sliceRect; // 在Slice中的位置
cv::Mat _data; // 图像数据
};
QPoint _pos; // Slice的行列号
bool _isColEnd, _isRowEnd; // 是否是列尾部/行尾
QRect _rect; // 区域坐标. (可以用pos算出来)
QList<ViewPart> _parts; // 组成slice的各个图片部分
SliceElement(const QPoint& pos, const QSize& size, bool is_col_end=false, bool is_row_end=false )
: _pos{pos}, _isColEnd{is_col_end}, _isRowEnd{is_row_end}, _rect{pos.x()*size.width(), pos.y()*size.height(), size.width(), size.height()}, _parts{}
{
}
SliceElement() : SliceElement(QPoint{-2,-2},QSize{2448,2048},false,false){}
bool valid() const {return _pos.x()>=-1 && _pos.y()>=-1; }
};

下面是HighImageJointer_impl的数据结构定义的核心部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct HighImageJointer::Implementation
{
... ...
QHash<QPoint, ViewElement> _viewList;
QHash<QPoint, SliceElement> _sliceList;

QThreadPool _matchPool;
QFutureSynchronizer<void> _matchSync;
QFutureSynchronizer<void> _splitSync;
QThreadPool _slicePool;
QMutex _sliceSyncMutex;
QFutureSynchronizer<void> _sliceSync;

QFutureSynchronizer<void> _viewGCSync;
};

其中,_viewList是扫描得到的每个视野的集合,这是一个QHash,key是视野的行列号。我们使用QHash来保存视野的集合而不是更常被使用的QMap,一方面我们对它的添加顺序不感兴趣,另一方面QPoint没有定义operator <,要想在QMap中用QPoint作为key,我们还要自己定义它的实现,这事就没有必要了。

Qt里面所有的容器都不是线程安全的,但是是可重入的。我们需要分析和规划如何在多线程中尽可能的安全高效地使用它们。

首先看_viewList,每收到一张扫描图片时,会添加一个记录,然后会有这个视野的匹配线程来修改里面的内容,还会有最多两个线程来读取它的内容来做匹配。然后还会有一个线程来计算并修改它的全局坐标。也就是说,会有两个写线程,和若干个读线程。另一方面,我们知道匹配和定位是存在明确的先后关系的,在匹配结束之前,无论如何都不会定位,所以这两个写线程天然不会冲突,不需要互斥保护。因此,对具体一个视野的数据来说它是线程安全的。那么对这个_viewList,如果每个视野都向里面写数据,就会产生竞争——我们不想用锁,因为锁这种东西,一旦锁住了,根本就不知道是什么原因——要想不使用锁,要么事先把所有视野的数据都添加进去,要么所有的添加数据都放在同一个线程里面。对于_viewList来说,视野的个数最多也就不到两千个,而且一旦开始扫描,个数就是确定的,我们就采用实现创建的方式了。我们在start()函数中将所有视野的ViewElement成员都添加到_viewList里面。在后面仅仅是访问内容,而不会涉及到对哈希表本身的修改。

而对_sliceList来说则是另一种情况。要切分多少个Slice只有到最后才能知道,所以必然是动态创建的Slice,就不能提前创建了。我们就采用另一种方法——将添加的操作放在一个线程中。

然后是两个线程池_matchPool_slicePool,分别用于运行匹配的线程和生成Slice的线程(实际上是生成Tile的,单纯生成Slice根本不需要并行化)。

接下来是三个QFutureSynchronizer,分别用于同步监测匹配线程组,拼接线程,切片线程组,我们用它们的waitForFinished()来等待全部线程运行结束。QFutureSynchronizer可以使用addFuture添加多个QFuture,如果是在多线程环境中调用addFuture(),就需要使用锁来保护它。但是对我们来说,所有的调用都是在单一线程进行的,所以也不需要保护了。

最后一个QFutureSynchronizer则用来监测View的图像数据的资源释放。

我们看一下HighImageJointer的类定义的主要成分。为了篇幅,删除了不重要的内容。完整的代码请参看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class FRAMEWORKS_EXPORT HighImageJointer : public IImageJointer
{
Q_OBJECT
public:
enum EMatchDir{eMatchLeft=0, eMatchRight, eMatchUp, eMatchDown};
struct MatchResult
{
bool _valid{false}; // 是否有效. 只要做了比较就有效.
qreal _degree{-1.0}; // 匹配值. 实际上上只要大于0, 就是有效
QPoint _offset{QPoint(-1,-1)}; // 偏移量
QPoint _devia{QPoint(65535,65535)}; // 和理论偏移量的偏差
QPoint _ideaOffset{65535,65536}; // 根据理论值计算出来的偏移量
};
struct ViewElement
{
...
};

struct SliceElement
{
...
};

friend class ImageJointerTester;
explicit HighImageJointer(QObject *parent = nullptr);
~HighImageJointer();

virtual void start(int columns, int rows, QRect rect, QString slice_path, bool parallel_run, bool make_tile) override;
virtual void add(QPoint pos, const cv::Mat& img) override;
virtual void waitForFinished() override;
...

private:
void matchView(QPoint pos);
static MatchResult doMatch(QPoint cur_pos, QPoint src_pos, cv::Mat cur_img, cv::Mat src_img, QPoint std_offset, int delta) ;

void splitViewProc();
void splitLine(int current_row);

void makeSliceProc(int col, int row, bool is_last_col, bool is_last_row);
void makeSlice(int col, int row, bool is_last_col, bool is_last_row);
void makeTile(int col, int row, cv::Mat image, bool is_last_col, bool is_last_row, bool make_pyrmid);

QPoint getPrevPos(const QPoint& pos) const;
QPoint getUpPos(const QPoint& pos) const;

ViewElement& getViewElement(const QPoint& pos);
SliceElement& getSliceElement(QPoint pos);
std::tuple<int, int, int, int> getRowRange(int row) ;

void viewGCProc(int columns, int rows);
static int calcRefCount(int col, int row, int columns, int rows) ;
private:
struct Implementation;
QScopedPointer<Implementation> _impl;
};

start()函数,启动任务

在创建HighImageJointer之后,可以调用几个set方法来修改默认参数设置,然后就调用start()来启动任务。它的几个参数的含义:

  • columnsrows:要扫描的照片的行列数。通过它们可以确定照片的总数
  • slice_path:Slice的保存目录。
  • parallel_run:这个是一个调试开关,用于控制生成Slice是串行还是并行。
  • make_tile:是否生成金字塔图像数据。默认为false

函数会根据要采集的照片的行列数来构造并填充_impl->_viewList。同时还会启动切分线程splitProc()和资源收集线程viewGCProc()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void HighImageJointer::start(int columns, int rows, QRect rect, QString slice_path, bool parallel_run, bool make_tile)
{
_impl->_scanColumns = columns;
_impl->_scanRows = rows;
_impl->_scanRect = rect;
_impl->_sliceDumpPath = slice_path;
_impl->_parallelSlice = parallel_run;
_impl->_makeTile = make_tile;

_impl->_viewList.clear();
_impl->_sliceList.clear();
_impl->_sliceColumns = 0;
_impl->_sliceRows = 0;

for(int row=0; row<rows; ++row)
{
for(int col=0; col<columns; ++col)
{
_impl->_viewList.insert({col,row}, ViewElement(col, row, calcRefCount(col,row, columns, rows)));
}
}

_impl->_splitSync.setFuture(QtConcurrent::run(&HighImageJointer::splitViewProc, this));
_impl->_viewGCSync.setFuture(QtConcurrent::run(&HighImageJointer::viewGCProc, this, columns, rows));
}

add()添加一个视野

ScanTask::doHighScan()中采集到一张照片后,就会调用add()将图片和位置信息发送给HighImageJointer。其中,参数pos是这张照片的行列号,而img则是图像的Mat数据。函数会检查这个View是否是第一个视野,如果是,它是初始定位锚点,我们直接设置它的全局坐标。如果不是,就启动一个工作线程matchView()来指定匹配工作。

这里有一点关于_impl->_oriOffset的说明。因为我们需要做拼接,各个扫描的视野的位置是有浮动的,那么最后拼接完毕后就需要裁掉边上没有对齐的内容,不然边上会出现没有图像的情况,不够美观(起始啥影响都没有,但是没办法,就是永远不缺永远不知道自己要做的是什么的用户)。当时有人就说,不扫描到最后一行,就不可能知道左右要裁掉多少尺寸,所以没法边扫边切。他说的其实没错,但是我们并不需要一点不多裁啊。定一个一定足够富余的尺寸就是了。至于担心裁掉有用的内容就更简单了,在计算扫描区域的时候两边多加一点就可以了。

这里核心问题是,在工程上,第一你要知道自己要解决的主要问题是什么,第二是要达到目标有什么简单的方法,而不是纠结于什么理论解。

_impl->_oriOffset给的默认值是{-256,-256},即在两个方向各自让出了256个像素作为保留尺寸,或者说将视野{0,0}向左上方各偏移了256各像素。这个值有些过度保守,从我们实际扫描的结果看,大概只需要三十多个像素就足够了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void HighImageJointer::add(QPoint pos, const cv::Mat &img)
{
auto& cur_view = getViewElement(pos);
cur_view._srcMat = img;
if(pos.x()==0 && pos.y()==0)
{
cur_view._anchor = QPoint(-1,-1);
cur_view._offset = _impl->_oriOffset;
cur_view._grect = QRect(_impl->_oriOffset, _impl->_imageSize);
cur_view._captured = true;
cur_view._credit = true;
cur_view._viewRefCount.deref();
cur_view._anchored = true;
}
else
{
cur_view._captured = true;
cur_view._anchored = false;
cur_view._credit = false;
_impl->_matchSync.addFuture(QtConcurrent::run(&_impl->_matchPool, &HighImageJointer::matchView, this, pos));
}
}

注意我们将启动拼接线程放到最后,当这个线程开始运行时,它自身的数据已经是ready了的。线程被添加到_impl->_matchSync里面进行跟踪。

匹配线程matchView()

接下来我们看matchView()。这个函数在线程中执行,它将这个视野和它顶上的那个视野,以及它前面扫的视野做匹配,计算出最佳的锚点以及偏移量来。这里的匹配是按照扫描的顺序来的,所以当一个线程开始运行的时候,它要进行匹配的两个视野的数据一定是就绪了的,直接计算就可以了。

当完成和两个邻居的匹配计算后,选择匹配度最高的那个,记录锚点和相对于锚点的偏移量。

在实际的匹配中,不仅仅要匹配,还应该检查匹配结果的可信性。比如,如果匹配区域恰好是完全单调的一片空白区域,那么这个结果就是不可信的,就要抛弃的,或者,如果偏移量距离理想偏移量太大了,也要抛弃。为了篇幅,这些都省略掉了。

另外,还要注意,我们最后才给cur_view._anchored赋值。这个标识值是用来标识这个视野是不是完成了偏移量计算的,在拼接线程中会等待这个值。如果用锁做保护,意味着我们需要为每个视野加一个互斥锁或者信号量,这个代价有点太高。因为这个值只会有一个线程写,只会有一个线程读,并且又是简单值,我们连原子类型都不需要使用。我们在读线程中会使用等待并睡眠的方式,既不会阻塞也不会忙等。在一般的多线程中使用休眠并不是被推荐的做法。但是对我们来说并不是什么很大的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void HighImageJointer::matchView(QPoint pos)
{
auto& cur_view = getViewElement(pos);
QPoint peers[2]{getUpPos(pos), getPrevPos(pos)};
MatchResult results[2];
for(int i=0; i<2; ++i)
{
if(peers[i].x()<0 || peers[i].y()<0)
{
continue;
}
auto& peer_view = getViewElement(peers[i]);
results[i] = doMatch(cur_view._pos, peer_view._pos, cur_view._srcMat, peer_view._srcMat, _impl->_stdOffset, _impl->_moveDevia);
peer_view._viewRefCount.deref();
}
auto matchId = results[0]._degree > results[1]._degree ? 0:1;
cur_view._matchRatio = results[matchId]._degree;
cur_view._anchor = peers[matchId];
cur_view._anchorType = matchId==0 ? EMatchDir::eMatchUp : (pos.x()>peers[matchId].x() ? EMatchDir::eMatchLeft : EMatchDir::eMatchRight );
cur_view._credit = cur_view._matchRatio > _impl->_matchThd;
cur_view._offset = cur_view._credit ? results[matchId]._offset : results[matchId]._ideaOffset;
cur_view._viewRefCount.deref();
cur_view._anchored = true;
}

这个函数因为涉及到修改类属性_impl->_viewList,而_viewList又会对其他函数造成影响,导致我们无法对这个函数做持续的单元测试。总的来说,ImageJointer的实现中,各个函数都耦合在几个类属性上面,很难将其隔离开。这种高度耦合的设计并不是理想的模式。对matchView()单独测试还不是很大的问题,后面的splitViewProc()单独测试的代价更大。我们稍后会看到。

定位和切分函数splitViewProc()

splitViewProc()是一个独立的工作线程,用于“拼接”各个视野并发起切割。函数的逻辑很简单:它按照扫描的顺序检查每个View是否匹配结束了。如果没有,就简单地睡眠一会儿再继续尝试——我们前面测试过,线程完成匹配一个视野低于40ms,做两次匹配耗时不会超过80ms。这里的休眠时间只要大于这个时间就可以了——时间长一点,这个任务可以一次多跑一点,比不断等待睡眠更有效率。当一行完成后,就调用splitLine()函数切分出完成的切片行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void HighImageJointer::splitViewProc()
{
DEC_TRACER();
int controls[][3]{{0, _impl->_scanColumns, 1}, {_impl->_scanColumns-1, -1, -1}};
for(int row=0; row<_impl->_scanRows; ++row)
{
int idx = row % 2;
for(int col=controls[idx][0]; col!=controls[idx][1]; col+=controls[idx][2])
{
QPoint pos{col,row};
auto& view = getViewElement(pos);
while(!view._anchored)
{
QThread::msleep(100);
}
if(col+row!=0)
{
const auto& anchor_view = getViewElement(view._anchor);
view._grect = anchor_view._grect.translated(view._offset);
}
}//-- end of for(int col...
if(row==0)
{
const auto& left_view = getViewElement({0,0});
const auto& right_view = getViewElement({_impl->_scanColumns-1, 0});
_impl->_sliceColumns = right_view._grect.right() / _impl->_sliceSize.width();
}
splitLine(row);
// 释放Mat的内存
for(int col=0; col<_impl->_scanColumns; ++col)
{
auto& view = getViewElement({col, row});
view._viewRefCount.deref();
}
}
auto size = QSize(_impl->_sliceColumns*_impl->_sliceSize.width(), _impl->_sliceRows*_impl->_sliceSize.height());
//emit sigAddHighViewInfo(_impl->_scanRound, 0, size, _impl->_scanRect, _impl->_cellType, _impl->_scanMethod);
}

splitLine()函数会检测都有哪些行的Slice会收到这一行View的影响,并将每个视野中和受影响的Slice重叠的图像拷贝到对应Slice的_parts里面去。同时还会识别出哪些行的Slice是已经被完成了的,对于这些已经完成了的Slice,就会一次调用makeSliceProc()来生成对应的Slice,并可能会进而生成Tile。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
oid HighImageJointer::splitLine(int current_row)
{
// 计算这一行各个视野的上下边的范围
auto [top_up,top_down,bottom_up,bottom_down] = getRowRange(current_row);
int slice_row_begin = _impl->_sliceRows;
int slice_row_finished = bottom_up / _impl->_sliceSize.height() - 1;
int slice_row_end = current_row==_impl->_sliceColumns-1 ? slice_row_finished : bottom_down / _impl->_sliceSize.height();
// 完成已经填充完成的行:
for(int slice_row=slice_row_begin; slice_row<=slice_row_end; ++slice_row)
{
bool is_last_row = (current_row==_impl->_scanRows-1) && (slice_row==slice_row_finished);
for(int slice_col=0; slice_col<_impl->_sliceColumns; ++slice_col)
{
auto& slice = getSliceElement({slice_col, slice_row});
for(int view_col=0; view_col<_impl->_scanColumns; ++view_col)
{
const auto& view = getViewElement({view_col, current_row});
auto cp = slice._rect.intersected(view._grect);
if(cp.isValid())
{
auto s = cp.translated(-view._grect.topLeft());
auto d = cp.translated(-slice._rect.topLeft());
auto r = cv::Rect(s.x(), s.y(), s.width(), s.height());
SliceElement::ViewPart part{view._pos, s, d, view._srcMat(r).clone()};
slice._parts.push_back(part);
}
}
}
// 生成完成的slice
if(slice_row <=slice_row_finished)
{
for(int slice_col=0; slice_col<_impl->_sliceColumns; ++slice_col)
{
makeSliceProc(slice_col, slice_row, slice_col==_impl->_sliceColumns-1, is_last_row);
}
}
}
_impl->_sliceRows = slice_row_finished+1;
}

其中的函数getRowRange()是一个内部函数,它会他计算出一行View的上下边界的范围,其中上边界的上下范围没有被使用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::tuple<int, int, int, int> HighImageJointer::getRowRange(int row)
{
int top_up = 10000, top_down = 0, bottom_up = 10000, bottom_down = 0;
for(int col=0; col<_impl->_scanColumns; ++col)
{
const auto& view = getViewElement({col, row});
top_up = std::min(top_up, view._grect.top());
top_down = std::max(top_down, view._grect.top());
bottom_up = std::min(bottom_up, view._grect.bottom());
bottom_down = std::max(bottom_down, view._grect.bottom());
}
top_up = std::max(0, top_up);
top_down = std::max(top_up, top_down);
return {top_up, top_down, bottom_up, bottom_down};
}

生成切片 makeSliceProc()

makeSliceProc()会根据设置来决定是为每个Slice启动一个线程来执行makeSlice()还是就在当前现线程中执行。因为涉及到Mat的编码,这个makeSlice()还是比较消耗时间的。这里的串行更多的是为了调试方便。

makeSliceProc的几个参数中,colrow指的是这个Slice的行列号,is_last_colis_last_row用于标记这个Slice是否是最右边的列和最下面的行。当我们需要构造金字塔的时候这两个标记需要用到。当前我们仅仅是生成切片,不做进一步的合并,这两个参数用不到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void HighImageJointer::makeSliceProc(int col, int row, bool is_last_col, bool is_last_row)
{
if(_impl->_parallelSlice)
{
_impl->_sliceSync.addFuture(QtConcurrent::run(&_impl->_slicePool, &HighImageJointer::makeSlice, this, col, row, is_last_col, is_last_row));
}
else
{
makeSlice(col, row, is_last_col, is_last_row);
}
}

void HighImageJointer::makeSlice(int col, int row, bool is_last_col, bool is_last_row)
{
auto& slice = getSliceElement({col, row});
cv::Mat data(_impl->_sliceSize.height(), _impl->_sliceSize.width(), CV_8UC(_impl->_imageChannels));
for(int i=0; i<slice._parts.size(); ++i)
{
auto& part = slice._parts[i];
auto part_rect = cv::Rect(part._sliceRect.x(), part._sliceRect.y(), part._sliceRect.width(), part._sliceRect.height());
part._data.copyTo(data(part_rect));
part._data.release();
}
makeTile(col, row, data, is_last_col, is_last_row, _impl->_makeTile);
}

生成切片

函数makeTile用于保存切片,并在需要时生成Tile。如果直接写文件,测试起来有点麻烦,我们先改一下代码,让它发送一个signal,通过统计这个signal的个数来做单元测试。

1
2
3
4
5
6
7
8
9
10
11
12
void HighImageJointer::makeTile(int col, int row, cv::Mat image, bool is_last_col, bool is_last_row, bool make_pyrmid)
{
dumpSlice(col, row, image);
emit sigDumpSlice(col, row, image);

}
void HighImageJointer::dumpSlice(int col, int row, const cv::Mat &img)
{
auto path = QString("%1/%2_%3.jpg").arg(_impl->_sliceDumpPath).arg(col).arg(row);
auto data = ImageTool::transformMat(img, _impl->_sliceFormat, _impl->_sliceQuality);
ImageTool::dumpFile(data, path);
}

测试流程

现在我们构造一下HighImageJointer的使用流程,测试一下它的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void ImageJointerTester::test_addView()
{
QString dataRoot{R"(D:\DataStore\TestData\scanstub\2024-02-19-092454\higher\matrix)"};
QList<std::pair<QPoint, cv::Mat>> scanDataList;
const int COLUMNS = 5;
const int ROWS = 5;
TRACE() << "开始准备数据. 共有" << COLUMNS << "行" << ROWS << "列共" << COLUMNS*ROWS << "张图片需要拼接";
auto t1 = cv::getTickCount();
for(int row=0; row<ROWS; ++row)
{
for(int scan_col=0; scan_col<COLUMNS; ++scan_col)
{
int col = row%2==0 ? scan_col : COLUMNS-scan_col-1;
cv::Mat img = cv::imread(QString("%1/%2_%3.jpg").arg(dataRoot).arg(col).arg(row).toStdString());
QVERIFY2(!img.empty(), QString("Failed to load image %1_%2.jpg").arg(col).arg(row).toStdString().c_str());
scanDataList.push_back(std::make_pair(QPoint(col,row), img));
}
}
auto t2 = cv::getTickCount();
auto load_msecs = (t2-t1)*1000/cv::getTickFrequency();
TRACE() << QString("数据准备完毕. 共加载了%1个图片文件, 耗时%2 ms").arg(COLUMNS*ROWS).arg(load_msecs);

QBENCHMARK
{
auto t1 = cv::getTickCount();
HighImageJointer jointer;
QSignalSpy spy1(&jointer, &HighImageJointer::sigDumpSlice);
jointer.start(COLUMNS,ROWS, QRect(), "D:\\Temp\\slices", true, false);
for(const auto& item: scanDataList)
{
jointer.add(item.first, item.second);
}
jointer.waitForFinished();
QCOMPARE(spy1.count(), jointer.sliceColumns()*jointer.sliceRows());
}
}

很遗憾的是,程序测试会概率性崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
A crash occurred in D:\CodeLib\QtInPractice\source\ScanerSuit\build\Desktop_Qt_6_7_0_MSVC2019_64bit-Debug\UnitTest\UnitTest_Frameworks_ImageJointer\debug\UnitTest_Frameworks_ImageJointer.exe.
While testing test_addView
Function time: 2855ms Total time: 3941ms

Exception address: 0x00007FF6EF815DA9
Exception code : 0xc0000005
Nearby symbol : std::_Atomic_integral<int,4>::fetch_add

Stack:
# 1: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 2: UnhandledExceptionFilter() - 0x00007FFE98D3DB40
# 3: memset() - 0x00007FFE9B4AAA80
# 4: _C_specific_handler() - 0x00007FFE9B493C30
# 5: _chkstk() - 0x00007FFE9B4A8BB0
# 6: RtlRestoreContext() - 0x00007FFE9B435390
# 7: KiUserExceptionDispatcher() - 0x00007FFE9B4A7CA0
# 8: std::_Atomic_integral<int,4>::fetch_add() - 0x00007FF6EF815D80
# 9: std::_Atomic_integral_facade<int>::fetch_sub() - 0x00007FF6EF815DD0
# 10: QAtomicOps<int>::deref<int>() - 0x00007FF6EF80C030
# 11: QBasicAtomicInteger<int>::deref() - 0x00007FF6EF814AD0
# 12: QArrayData::deref() - 0x00007FF6EF814B00
# 13: QArrayDataPointer<QVariant>::deref() - 0x00007FF6EF814A70
# 14: QArrayDataPointer<QVariant>::~QArrayDataPointer<QVariant>() - 0x00007FF6EF8107C0
# 15: QList<QVariant>::~QList<QVariant>() - 0x00007FF6EF8108D0
# 16: QList<QVariant>::`scalar deleting destructor'() - 0x00007FF6EF811860
# 17: std::_Destroy_in_place<QList<QVariant> >() - 0x00007FF6EF80B840
# 18: std::_Destroy_range<QList<QVariant> *,QList<QVariant> *>() - 0x00007FF6EF80B8F0
# 19: std::destroy<QList<QVariant> *>() - 0x00007FF6EF80C0F0
# 20: QtPrivate::QGenericArrayOps<QList<QVariant> >::destroyAll() - 0x00007FF6EF814BF0
# 21: QArrayDataPointer<QList<QVariant> >::~QArrayDataPointer<QList<QVariant> >() - 0x00007FF6EF810720
# 22: QList<QList<QVariant> >::~QList<QList<QVariant> >() - 0x00007FF6EF810870
# 23: QSignalSpy::~QSignalSpy() - 0x00007FF6EF810CF0
# 24: ImageJointerTester::test_addView() - 0x00007FF6EF8079F0
# 25: ImageJointerTester::qt_static_metacall() - 0x00007FF6EF802BF0
# 26: QFileInfo::filePath() - 0x00007FFE062E77F9
# 27: QFileInfo::filePath() - 0x00007FFE062E77F9
# 28: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 29: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 30: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 31: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 32: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 33: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 34: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 35: QTestLog::enterTestFunction() - 0x00007FFE68FC5CE0
# 36: main() - 0x00007FF6EF808B10
# 37: invoke_main() - 0x00007FF6EF81BC50
# 38: __scrt_common_main_seh() - 0x00007FF6EF81BA00
# 39: __scrt_common_main() - 0x00007FF6EF81B9E0
# 40: mainCRTStartup() - 0x00007FF6EF81BD10
# 41: BaseThreadInitThunk() - 0x00007FFE9AC553D0
# 42: RtlUserThreadStart() - 0x00007FFE9B404830

我们看栈信息,第23行和22行看到了QSignalSpy的析构序列,看上面的都是QList的释放序列。我们尝试把spy1给注释掉,再怎么跑都不崩溃了。或者,我们修改start()的参数,把parallel_run改成false,发现也不会崩溃了。这就很明确了。我们是在多线程中emit sigDumpSlice()的,而所谓QSignalSpy,其实就是把emit的内容给加到一个List里面。我们知道QSignalSpyQList的派生类,而Qt中,或者说,几乎所有的C++库中的容器类都不是线程安全的,会不会是这个原因?在QTEST里面没有说这件事,我们只能自己尝试去看看。

我们去看QSignalSpy的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
QSignalSpy(const QObject *obj, const QMetaMethod &signal)
: m_waiting(false)
{
if (isObjectValid(obj) && isSignalMetaMethodValid(signal) &&
connectToSignal(obj, signal.methodIndex())) {
sig = signal.methodSignature();
initArgs(signal, obj);
}
}

...
bool connectToSignal(const QObject *sender, int sigIndex)
{
static const int memberOffset = QObject::staticMetaObject.methodCount();
const bool connected = QMetaObject::connect(
sender, sigIndex, this, memberOffset, Qt::DirectConnection, nullptr);

if (!connected)
qWarning("QSignalSpy: QMetaObject::connect returned false. Unable to connect.");

return connected;
}
...

看到这里就不用再看下去了。它用的是Qt::DirectConnection!实在是坑死人不偿命啊。

要想规避这个问题,很自然的是加互斥量保护,让线程串行发送信号,但是这个就跟我们用false来启动Jointer一样,毫无意义。那我们就只能不用QSignalSpy了,改为自己跟踪好了:

1
2
3
4
5
6
...
QAtomicInt count1=0;
auto v = connect(&jointer, &HighImageJointer::sigDumpSlice,
this, [&count1](int, int, const cv::Mat&){count1++;});
...
QCOMPARE(count1, jointer.sliceColumns()*jointer.sliceRows());

或者我们还是老老实实生成文件,并检查生成的Slice的个数算了:

1
2
3
4
5
const QString dumpPath{"D:\\Temp\\slices"};
QCOMPARE(QDir(dumpPath).removeRecursively(), true);
QDir().mkpath(dumpPath);
...
QCOMPARE(QDir(dumpPath).entryList({"*.jpg"}, QDir::Filter::Files).size(), jointer.sliceColumns()*jointer.sliceRows());

释放资源

接下来分析一下资源问题。我们说过,一张图的内存占用大约是15M上下,我们必须在图片不再使用时及早释放掉。我们看ViewSlice的使用情况。

我们分析ViewElement的srcMat的使用情况:

  • 一开始创建的默认构造函数,它是空的
  • add()调用之后,它有内容了
  • 它自己的matchView()会访问它
  • 它的相邻视野的matchView()会访问它
  • splitViewProc()会调用splitLine(),在里面会访问它。因为会将一个视野的子区分给多个Slice,我们这里使用了clone(),实际就是在调用后割断了Slice和View之间的数据共享。因此当一个扫描行处理和分配完毕后,就不需要了。

但是我们不能简单地在splitViewProc()里面直接释放掉,因为这个时候下一行图像数据可能还没有处理完毕,还可能需要访问它的数据。换句话说,一个视野的图像数据什么时候被用完是不确定的。我们要想主动删除它,就只能有两种方法:一种是在每次用完后检查是否用完了,一种是专门做一个线程来负责释放。相对来说,后一种方式可能维护性更好,并且也是非破坏性的。

接下来我们要考虑如何设计View的引用计数。从前面的分析看,一个View最多会被引用4次——它自己,0-2个邻居视野,以及splitViewProc()一个。因为这里存在多个写线程的冲突,我们就不能再像前面一样使用一个普通数值来计数了,可以用的,要么是信号量,要么是原子量。不管是使用什么,这个资源管理线程都要等待引用计数变为0时来释放它的资源。而相对于笨重的信号量,QAtomicInteger<T>恰好就是做这件事情的,它也是Qt自己用来做引用计数的数据类型,它专门为引用计数使用提供了ref()dref()两个有明确含义的接口。

我们在ViewElement中的_viewRefCount就是用于记录引用计数的,在ViewElement构造的时候,需要提供它的初始值。我们在start()函数中初始化每个视野的数据的时候使用calcRefCount()来计算这个View会被引用的次数,当这个值变为0的时候就可以释放它了:

1
2
3
4
5
6
7
8
int ImageJointer::calcRefCount(int col, int row, int columns, int rows)
{
int val = 4;
if(row==rows-1) val--;
if(row%2==0 && col==columns-1) val--;
if(row%2==1 && col==0) val--;
return val;
}

在代码中我们能看到在add()matchViewsplitViewProc中对引用计数的修改。

最后就是释放线程viewGCProc(),它会按照和扫描相同的顺序检查每个View,判断是否需要释放资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void ImageJointer::viewGCProc(int columns, int rows)
{
for(int row=0; row<rows; ++row)
{
for(int scan_col=0; scan_col<columns; ++scan_col)
{
int col = row%2==0 ? scan_col : columns-scan_col-1;
auto& view = getViewElement({col, row});
while(view._viewRefCount)
{
QThread::msleep(500);
}
view._srcMat.release();
}
}
}

我们在测试函数test_addView()中增加资源检查:

1
2
3
auto v1 = std::count_if(jointer.viewList().cbegin(), jointer.viewList().cend(), 
[](const HighImageJointer::ViewElement& view){ return !view._srcMat.empty();});
QCOMPARE(v1, 0);

然后就是SliceElement中的资源释放问题。我们看代码,似乎是没有问题的,在makeSlice()中,我们每次真正生成切片后都释放了ViewPart里面的Mat数据。是否真的是这样?我们继续增加测试函数test_addView中的检查内容:

1
2
3
4
5
6
7
8
9
auto& sliceList = jointer.sliceList();
for(auto iter=sliceList.cbegin(); iter!=sliceList.cend(); ++iter)
{
auto c = std::count_if(iter->_parts.cbegin(), iter->_parts.cend(),
[](const HighImageJointer::SliceElement::ViewPart& part){
return !part._data.empty();
});
QCOMPARE(c, 0);
}

运行测试,竟然失败了!

我们修改一下代码,屏蔽掉QCOMPARE,把失败的Slice编号打印出来,

1
2
3
4
5
6
7
8
9
10
11
12
TRACE() << "Slice行数: " << jointer.sliceRows() << ", 列数: " << jointer.sliceColumns();
auto& sliceList = jointer.sliceList();
for(auto iter=sliceList.cbegin(); iter!=sliceList.cend(); ++iter)
{
auto c = std::count_if(iter->_parts.cbegin(), iter->_parts.cend(),
[](const HighImageJointer::SliceElement::ViewPart& part){
return !part._data.empty();
});
if(c>0)
TRACE() << "Slice" << iter.key() << "没有释放干净";
//QCOMPARE(c, 0);
}

函数输入:

1
2
3
4
5
6
7
8
9
10
11
Slice行数:  8 , 列数:  10
... Slice QPoint(4,8) 没有释放干净
... Slice QPoint(2,8) 没有释放干净
... Slice QPoint(1,8) 没有释放干净
... Slice QPoint(3,8) 没有释放干净
... Slice QPoint(0,8) 没有释放干净
... Slice QPoint(5,8) 没有释放干净
... Slice QPoint(9,8) 没有释放干净
... Slice QPoint(7,8) 没有释放干净
... Slice QPoint(8,8) 没有释放干净
... Slice QPoint(6,8) 没有释放干净

是一整行没有释放?切割出的Slice是8行,为什么这里出来了行8?

哦,看我们splitLine()实现,原来我们仅对完成了的行调用了makeSliceProc(),并进而调用makeSlice(),释放了资源。而最后一行,因为识别出到了最后一行扫描行,这些Slice没有完成,我们就没有完成!

知道了问题,我们只要在这里释放掉就可以了。这些数据完全没有用处。

我们在splitViewProc()退出之前添加下面的一行:

1
_impl->_sliceList.removeIf([max_row=_impl->_sliceRows](std::pair<const QPoint &, SliceElement &> item){ return item.first.y()>=max_row; });

其中,QHash::removeIf()是Qt6.1中新增的一个方法,可以大致理解为QHash上面提供了类似于std::remove_if的功能。

到此,HighImageJointer就完成了。接下来将其集成到ScanTask中去,并完成其他的工作。