更新于 2026年6月28日

13.3 Label Spreading 算法#

在上一节内容中,我们介绍了一种基于图结构的标签传播算法。标签传播算法的核心思想认为,在样本空间中距离越相近的样本点越有可能具有相同的标签。因此,对于样本空间中的所有样本点,可以通过构建一个有向完全图来表示样本点之间的位置关系,并以此为基础构建一个概率迁移矩阵来确定未知标签的所属类别。在本节内容中,我们将继续学习另外一种同样也是基于图结构的标签扩散算法(Label Spreading)[1]。

如图13-10所示便是标签扩散算法的学习路线图,我们将先介绍其对应的思想以及sklearn的建模过程,然后再来介绍其背后的原理、计算示例和收敛性证明等过程,最后再一步步来从零实现整个算法。

图 13-10 标签扩散算法学习路线图
图 13-10 标签扩散算法学习路线图

13.3.1 Label Spreading算法思想#

从本质上来讲,标签传播和标签扩散这两种算法在思想上并没有太大的差别,都是基于样本点的空间位置来构造得到概率迁移矩阵,然后通过概率迁移矩阵用已知标签的样本点来确定未知标签样本点的类别。相比较于标签传播算法来说,标签扩散算法的改进点表现在两个方面:①使用了标准化的拉普拉斯矩阵(Laplace Matrix )来作为概率迁移矩阵,其目的是能够使得在迭代过程中标签的传播更加平滑;②加入了类似于正则化策略的惩罚参数来增加模型的泛化能力。由于标签扩散算法在思想上与标签传播算法并没有什么本质区别,下面首先直接来看标签扩散算法的示例用法。

13.3.2 Label Spreading 示例代码#

在sklearn中,可以从sklearn.semi_supervised中来导入标签传播算法LabelSpreading模块。接下来,需要构造半监督学习所需要使用到的训练集,即将部分样本的标签置为-1,示例代码如下:

1 def load_data():
2     x, y = load_iris(return_X_y=True)
3     x_train, x_test, y_train, y_test = \
4         train_test_split(x, y, test_size=0.3, random_state=2022)
5     rng = np.random.RandomState(20)
6     random_unlabeled_points = rng.rand(y_train.shape[0]) < 0.8
7     y_mixed = deepcopy(y_train)
8     y_mixed[random_unlabeled_points] = -1
9     return x_train, x_test, y_train, y_test, y_mixed

在上述代码中,第2~4行是导入原始并进行训练集和测试集的划分。第5~8行则是将训练集中$80\%$样本的标签重置为-1,同时也保留了原始重置之前的标签便于后续在训练集上测试模型的效果。第9行是返回最后各个部分的结果。

进一步,便可以导入LabelSpreading进行模型训练和测试,示例代码如下:

1 def test_label_spreding():
2     x_train, x_test, y_train, y_test, y_mixed = load_data()
3     ls = LabelSpreading(alpha=0.2)
4     ls.fit(x_train, y_mixed)
5     print("Label Spreading")
6     print(f"训练集上的准确率为:{ls.score(x_train, y_train)}")
7     print(f"测试集上的准确率为:{ls.score(x_test, y_test)}")

上述代码运行结束后便会得到类似如下所示的结果:

1 训练集上的准确率为0.9714
2 测试集上的准确率为1.0

上述完整示例代码可参见 AllBooKCode/Chapter13/C06_label_spreading.py 文件。

在学会LabelSpreading模块的使用后,再来对比一下标签扩散和标签传播算法各自在包含噪音数据上的表现。下面以sklearn中的手写体数据集为例来构造相应的训练和测试样本,示例代码如下:

 1 from sklearn.datasets import load_digits
 2 from sklearn.preprocessing import StandardScaler
 3 
 4 def load_data(noise_rate=0.1):
 5     n_class = 10
 6     x, y = load_digits(n_class=n_class, return_X_y=True)
 7     x_train, x_test, y_train, y_test = \
 8         train_test_split(x, y, test_size=0.3, random_state=2022)
 9     rng = np.random.RandomState(20)
10     random_unlabeled_points = rng.rand(y_train.shape[0]) < 0.8
11     y_mixed = deepcopy(y_train)
12     y_mixed[random_unlabeled_points] = -1
13     for i in range(len(y_mixed)):
14         if y_mixed[i] == -1:
15             continue
16         if rng.random() < noise_rate:  # 在训练集中,将有标签的样本中的noise_rate替换为随机标签
17             candidate_ids = np.random.permutation(n_class).tolist()
18             candidate_ids.remove(y_mixed[i])
19             y_mixed[i] = candidate_ids[0]
20     ss = StandardScaler()
21     x_train = ss.fit_transform(x_train)
22     x_test = ss.transform(x_test)
23     return x_train, x_test, y_train, y_test, y_mixed

在上述代码中,第1~2行是导入手写体数据集和特征标准化模块。第5~8行是导入10个类别的数据集,并将其中$70\%$用作训练集。第9~12行是将训练集中$80\%$的样本重置为无标签样本。第13~19行则是将训练样本中,有真实标签样本中的noise_rate替换为随机标签(即制造噪音样本)。第20~22行是对训练集和测试集进行标准化处理;第23行是返回最后处理完成的结果。

进一步,可以定义一个函数来对两个模型的结果进行对比,示例代码如下:

 1 def comp(p=0.1):
 2     x_train, x_test, y_train, y_test, y_mixed = load_data(noise_rate=p)
 3     lp = LabelPropagation(max_iter=200)
 4     lp.fit(x_train, y_mixed)
 5     print("LabelPropagation")
 6     print(f"训练集上的准确率为:{lp.score(x_train, y_train)}")
 7     print(f"测试集上的准确率为:{lp.score(x_test, y_test)}")
 8 
 9     ls = LabelSpreading(max_iter=200)
10     paras = {'alpha': np.arange(0.01, 1, 0.02)}
11     gs = GridSearchCV(ls, paras, verbose=1, cv=3)
12     gs.fit(x_train, y_mixed)
13     print("LabelSpreading")
14     print('最佳模型:', gs.best_params_ )
15     print(f"训练集上的准确率为:{gs.score(x_train, y_train)}")
16     print(f"测试集上的准确率为:{gs.score(x_test, y_test)}")
17 
18 if __name__ == '__main__':
19     comp()

在上述代码中,第2行是载入手写体数据集,并将训练集中有标签样本的$10\%$设定为随机标签。第3~7行是标签传播算法分别在训练集和测试集上的表现结果。第9~16行分别是标签扩散算法在训练集和测试集上的结果,同时这里使用了网格搜索与交叉验证来寻找超参数$\alpha$,这部分内容可以参考5.3.2节内容。

在上述代码中运行结束后便会得到类似如下结果:

1 LabelPropagation
2 训练集上的准确率为0.8409
3 测试集上的准确率为0.8241
4 Fitting 3 folds for each of 50 candidates, totalling 150 fits
5 LabelSpreading
6 最佳模型: {'alpha': 0.9899}
7 训练集上的准确率为0.85759
8 测试集上的准确率为0.8444

从上述结果可以看出,无论是在训练集上还是测试集上标签扩散算法的表现结果都要明显好于标签传播算法。同时,经过网格搜索后可知,当$\alpha\approx0.99$时模型的效果最优。上述代码可参见 AllBooKCode/Chapter13/C07_label_spreading_comp.py 文件。

13.3.3 Label Spreading算法原理#

在介绍完标签扩散算法的使用示例及与标签传播算法的对比以后,再来详细介绍其背后的数学原理。设$(x_1,y_1),(x_2,y_2),...,(x_l,y_l)$为已知标签的数据样本,其中$Y_L=\{y_1,y_2,...,y_l\}$是样本点对应的标签;同时样本的类别数为$C$,并且假定所有的类别均出现在$Y_L$中。继续,设$(x_{l+1},y_{l+1}),(x_{l+2},y_{l+2}),...,(x_{l+u},y_{l+u})$为不含标签的样本点,其中$Y_U=\{y_{l+1},y_{l+2},...,y_{l+u}\}$未知,且通常情况下$l\ll u$。此时有$X=\{x_1,...,x_{l+u}\}$,且$x_i\in R^D$。

首先,我们同样需要先计算任意两个样本点之间的权重距离,如式(13-20)所示

$$ W_{ij}= \begin{cases} \exp\frac{-\|x_i-x_j\|^2}{2\sigma^2}, & \text{if } i \neq j \\[2ex] 0, & \text{if } i=j \\ \end{cases}\tag{13-20} $$

从式(13-20)中可以看出,当 $i=j$ 时,即同一个样本点之间的权重距离为0,这样处理是为了避免样本点自身权重过大的问题。同时可以知道,$W$是一个形状为$(l+u,l+u)$的对称矩阵。

进一步,根据式(13-21)来构造样本点之间概率迁移矩阵

$$ S=D^{-\frac{1}{2}}WD^{-\frac{1}{2}}\tag{13-21} $$

其中 $D$ 为一个对角矩阵,对角线上每个值为权重 $W$ 中对应行所有所有元素的和,即$D_{ii}=\sum_{j=1}W_{ij}$。同时,$S$ 是标准化后的拉普拉斯矩阵,形状与权重矩阵$W$相同。

接着,再定义一个形状为$(l+u,C)$的标签矩阵$Y$,其每一行用于表示每个样本属于各个类别的概率分布。

最后,只需要迭代式(13-22)便可以求解得到未知标签样本的预测结果:

$$ F(t+1)=\alpha SF(t)+(1-\alpha)Y\tag{13-22} $$

其中$F(0)=Y$,$\alpha\in(0,1)$ 用来平衡模型在迭代过程中对于初始标签矩阵的依赖程度,$\alpha$​ 越大则表示模型更加依赖于样本的分布情况,即式(13-22) 中等式右边的第一项,这样可以一定程度上增强模型的泛化能力。

13.3.4 Label Spreading 收敛性证明#

在介绍完标签扩散算法的原理后,再来简单看一下为什么算法多次迭代后便会收敛结论。根据式(13-22)可知,此时有

$$ \begin{aligned} F(1)&=\alpha SF(0)+(1-\alpha)Y\\[1ex] F(2)&=\alpha SF(1)+(1-\alpha)Y\\[1ex] \vdots\\[1ex] F(t)&=\alpha SF(t-1)+(1-\alpha)Y\\[1ex] \end{aligned}\tag{13-23} $$

进一步可得

$$ \begin{aligned} F(t)=(\alpha S)^tY+(1-\alpha)\sum_{i=0}^{t-1}(\alpha S)^iY \end{aligned}\tag{13-24} $$

又因为$0<\alpha < 1$所以有

$$ \lim_{t \to \infty}(\alpha S)^t=0;\;\;\lim_{t \to \infty}\sum_{i=0}^{t-1}(\alpha S)^i=(I-\alpha S)^{-1}\tag{13-25} $$

因此

$$ F^{\ast}=\lim_{t \to \infty}F(t)=(1-\alpha)(I-\alpha S)^{-1}Y\tag{13-26} $$

由于对于分类任务来说,$F$表示样本所属类别的概率分布,所以式(13-26)还可以等价写为

$$ F^{\ast}=(I-\alpha S)^{-1}Y\tag{13-27} $$

也就是说,式(13-22)的整个迭代过程将会等价地收敛于式(13-27)中的结果,因此在实现标签扩散算法时还可以通过式(13-27)来直接计算得到标签概率矩阵。

接着再来看式(13-25)中极限结果的证明,此时有

$$ \begin{aligned} \lim_{t\to \infty}\sum_{i=0}^{t-1}(\alpha S)^i&=\lim_{t\to\infty}\left[(\alpha S)^0+(\alpha S)^1+...+(\alpha S)^{t-1}\right]\\[2ex] &=\lim_{t\to\infty}\left[ I+\left((\alpha S)^0+(\alpha S)^1+...+(\alpha S)^{t-2}\right)\alpha S\right]\\[2ex] &=I+\lim_{t\to\infty}\left[(\alpha S)^0+(\alpha S)^1+...+(\alpha S)^{t-2}+(\alpha S)^{t-1}-(\alpha S)^{t-1}\right]\alpha S\\[2ex] &=I+\alpha S\lim_{t\to\infty}\sum_{i=0}^{t-1}(\alpha S)^i -\lim_{t\to\infty}(\alpha S)^t\\[2ex] &=I+\alpha S\lim_{t\to\infty}\sum_{i=0}^{t-1}(\alpha S)^i \end{aligned}\tag{13-28} $$

最终,根据式(13-28)便可以得到式(13-25)所示的结果。

13.3.5 从零实现Label Spreading算法迭代法#

由于标签扩散算法和标签传播算法在整个梳理逻辑上非常类似,所以只需要在标签传播算法的实现上稍作改动即可。同标签传播算法一样,标签扩散算法既可以通过以迭代的方式来求解标签矩阵,也可以直接通过收敛后的计算公式来求解。以下完整示例代码可参见 AllBooKCode/Chapter13/C08_label_spreading_imp.py 文件。

1. 核函数实现

首先,根据式(13-20)可知需要先定义一个函数来求解样本点之间的权重值,在sklearn实现中称之为核函数,示例代码如下:

1 from sklearn.metrics.pairwise import euclidean_distances
2 def kernel(X, y=None, gamma=None):
3     if gamma is None:
4         gamma = 1.0 / X.shape[1]
5     K = euclidean_distances(X, y, squared=True)
6     K *= -gamma
7     np.exp(K, K)  # <==> K = np.exp(K)
8     # np.fill_diagonal(K, 0)
9     return K

在上述代码中,第1行是导入euclidean_distances模块来快速实现样本点之间距离的计算。第3~4行判断gamma是否为空,如果为空则取特征维度数的倒数。第5~7行是根据式(13-20)来计算权重矩阵,其中np.exp(K, K)等价于 K = np.exp(K)。由于$2\sigma^2$是一个常数,因此在实现过程中可以直接将其视为一个参数。第8行是将对角线的值置为0 ,但是在实际情况中由于若某些样本点之间相隔太远(或本身是异常点)则会导致除了 $W_{ii}\neq0$之外,其它位置上的值均会为0。如果此时再将对线上的元素也置为0,则会出现 $D_{ii}=0$ 的情况,进而致计算 $D^{-\frac{1}{2}}$ 的时候出错,所以实际实现的时候会进行取舍,即不进行这一步。

2. 概率迁移矩阵实现

在实现权重矩阵的计算之后,我们便可以定义一个LabelSpreading类来构建完整的算法模型。首先实现概率迁移矩阵的计算过程,示例代码如下:

 1 class LabelSpreading():
 2     def __init__(self, gamma=20, alpha=0.2,
 3                  max_iter=30, tol=1e-3):
 4         self.max_iter = max_iter
 5         self.tol = tol
 6         self.gamma = gamma
 7         self.alpha = alpha
 8 
 9     def _get_kernel(self, X, y=None):
10         return kernel(X, y, gamma=self.gamma)
11 
12     def _build_graph(self):
13         W = self._get_kernel(self.X_)
14         D = np.diag(np.sum(W, axis=1))
15         D_inv_sqrt = np.linalg.inv(np.sqrt(D))
16         laplacian = np.matmul(np.matmul(D_inv_sqrt, W), D_inv_sqrt)
17         return laplacian

在上述代码中,第2~7行是定义模型的4个基本参数。第9~10行是返回构造完成的权重矩阵,需要注意的是这里的y不是指的训练标签而是另外一个矩阵,当yNone时(即模型训练时)计算的是训练集X中各个样本点两两之间的权重距离,当y不为None时(即模型预测时)计算的是训练集X中各个样本与测试集y中各个样本之间的权重距离,原因在上一节内容我们已经介绍过就不再赘述。第12~17行是根据返回的权重矩阵通过式(13-21)计算得到概率迁移矩阵,即拉普拉斯矩阵$S$。

3. 模型拟合实现

在构建完成概率迁移矩阵后,便可以根据式(13-22)来迭代完成标签矩阵的计算过程,示例代码如下:

 1     def fit(self, X, y):
 2         self.X_ = X
 3         self.graph_matrix = self._build_graph()
 4         classes = np.unique(y)
 5         self.classes_ = (classes[classes != -1])
 6         n_samples, n_classes = len(y), len(self.classes_)
 7         alpha = self.alpha
 8         if alpha is None or alpha <= 0.0 or alpha >= 1.0:
 9             raise ValueError("alpha必须大于0小于1")
10         self.label_distributions_ = np.zeros((n_samples, n_classes))
11         for label in self.classes_:
12             self.label_distributions_[y == label, self.classes_ == label] = 1
13         y_static = np.copy(self.label_distributions_)
14         y_static *= 1 - alpha
15         l_previous = np.zeros((self.X_.shape[0], n_classes))
16         for self.n_iter_ in range(self.max_iter):
17             if np.abs(self.label_distributions_ - l_previous).sum() < self.tol:
18                 break
19             l_previous = self.label_distributions_
20             self.label_distributions_ = np.matmul(self.graph_matrix, self.label_distributions_)
21             self.label_distributions_ = alpha*self.label_distributions_ + y_static
22         else:
23             logging.warning('max_iter=%d was reached without convergence.' % self.max_iter)
24             self.n_iter_ += 1
25         normalizer = np.sum(self.label_distributions_, axis=1, keepdims=True)
26         normalizer[normalizer == 0] = 1
27         self.label_distributions_ /= normalizer
28         self.transduction_ = self.classes_[np.argmax(self.label_distributions_, axis=1)]
29         return self

在上述代码中,第1行X表示训练样本形状为[n_samples,n_features]y表示训练标签形状为[n_samples,],同时未知标签样本的标签需要用-1进行标识。第2-3行是得到概率迁移矩阵,形状为[n_samples,n_samples]。第4-6行用于得到标签类别的取值情况以及样本数。第10-12行是初始化标签矩阵用来记录训练集中每个样本的类别所属概率,然后再在已知的标签中把对应类别置为1,最后会得到类似下面这样一个矩阵:

1 [[0,0,1],
2  [0,0,0],
3  [1,0,0],
4  [0,0,0]]  

其中全0表示该样本没有标签。

第14~15行是计算式(13-22)中的第2项。第16~28行是根据式(13-22)迭代计算标签矩阵,其中第17~18行判断误差是否小于阈值,满足条件则退出迭代。第20~21行是完成一次式(13-22)的计算过程。第25~27行是对最后的标签矩阵进行标准化,其中第26行为平滑处理。第28行则是得到训练集中每个样本对应的标签。

4. 模型预测实现

在完成模型拟合的代码实现后,便可以来预测新样本的所属类别,示例代码如下:

1     def predict_proba(self, X):
2         weight_matrices = self._get_kernel(self.X_, X).T # [X.shape[0],n_train_sample]
3         probabilities = np.matmul(weight_matrices, self.label_distributions_)
4         normalizer = np.sum(probabilities, axis=1, keepdims=True)
5         normalizer[normalizer == 0] = 1
6         probabilities /= normalizer
7         return probabilities

在上述代码中,第1行X是待预测的样本,形状为[n_samples, n_features]。第2行是计算训练样本self._X到预测样本X的权重距离,其形状为[X.shape[0],n_train_sample]。第3~6行则是计算测试数据中每个样本点所属类别的概率值,同时由于第2行中计算的训练样本点到测试样本点的单向权重距离,并不是所有样本点两两之间的权重距离,因此并不需要计算概率迁移矩阵。第7行是返回计算后的概率值。

这里值得一提的是,因为在训练过程训练集中既含有已知标签的样本也含有未知标签的样本,因此是通过self._get_kernel(self.X_)来建立各个样本点之间的权重距离;而在测试时训练集中的所有样本均已知标签,因此是通过self._get_kernel(self.X_,X)来建立各个样本点(已知标签和未知标签样本点)之间的权重距离。

最后,只需要再定义一个预测函数来取probabilities中最大值对应的标签即可,示例代码如下:

1     def predict(self, X):
2         probas = self.predict_proba(X)
3         return self.classes_[np.argmax(probas, axis=1)].ravel()

在实现完整个模型之后,可以通过如下方式进行使用:

1 def test_label_spreading():
2     x_train, x_test, y_train, y_test, y_mixed = load_data(noise_rate=0.1)
3     model = LabelSpreading()
4     model.fit(x_train, y_mixed)
5     logging.info(f"模型在训练集上的准确率为: {model.score(x_train, y_train)}")
6     logging.info(f"模型在测试集上的准确率为: {model.score(x_test, y_test)}")
7 
8 if __name__ == '__main__':
9     test_label_spreading()

上述代码运行结束可以得到类似如下所示的结果:

1 模型在训练集上的准确率为: 0.8417
2 模型在测试集上的准确率为: 0.8259

13.3.6 从零实现Label Spreading算法非迭代法#

在13.3.4节的收敛性证明中,我们已经知道了可以直接通过式(13-27)来直接求得$F(0)=Y$迭代$n$次后对应的结果$F^{\ast}$。经过分析可知,在基于13.2.7节代码的基础之上,只需要修改fit()方法中的部分逻辑即可。

根据式(13-27)可知,可以先完成前面矩阵取逆的计算,然后再完成矩阵乘法部分。由于这里只是把迭代部分变成了公式直接计算,因此只需要把13.3.5节中fit()方法里第13~24行全部替换为如下两行代码即可:

1        inv = np.linalg.inv(np.eye(self.graph_matrix.shape[0]) - self.alpha * self.graph_matrix)
2        self.label_distributions_ = np.matmul(inv, self.label_distributions_)

上述代码运行结束后便可以得到类似如下结果:

1 模型在训练集上的准确率为: 0.8656
2 模型在测试集上的准确率为: 0.8444

可以发现,非迭代法求解得到的模型在测试集上的准确率同迭代法求解得到的模型一样。上述完整示例代码可参见 AllBooKCode/Chapter13/C09_label_spreading_simple.py 文件。

13.3.7 小结#

在本节内容中,我们首先介绍了标签扩散算法的基本原理以及其在sklearn中的使用示例;然后通过手写体分类对比实验验证了标签扩散算法在泛化能力上优于标签传播算法;最后,我们进一步详细地介绍了如何从零开始实现标签扩散标签传播算法以及其收敛性证明。

总结一下,在本章中,我们首先再次回顾了半监督学习的基本概念;然后介绍了3种常见的半监督学习方法,并且从本质上来将Self-Training算法还可以看成是一种特殊的模型训练策略;进一步介绍了各个算法的具体原理以及在sklearn中的示例用法;最后详细介绍了如何从零开始一步一步实现各个算法。

引用#

[1] Zhou D, Bousquet O, Lal T, et al. Learning with local and global consistency[J]. Advances in neural information processing systems, 2003, 16.

[2] Xiaojin Zhu and Zoubin Ghahramani. Learning from labeled and unlabeled data with label propagation. Technical Report CMU-CALD-02-107, Carnegie Mellon University, 2002

[3] https://en.wikipedia.org/wiki/Semi-supervised_learning

[4] https://scikit-learn.org/stable/modules/semi_supervised.html

[5] Olivier Chapelle, Bernhard Schölkopf, and Alexander Zien, Semi-Supervised Learning, MIT Press.

阅读 --

5.3 sklearn接口与示例代码

在这节内容中,我们首先通过一个引例介绍了K近邻分类器的主要思想,接着介绍了K值对算法结果的影响,以及介绍了衡量样本间距离的不同度量方式,最后我们通过开源的sklearn框架介绍了如何建模及使用K近邻分类器,并且同时还总结了sklearn中模 …