当前位置:嗨网首页>书籍在线阅读

03-k均值聚类算法

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

6.2 k均值聚类算法

k均值聚类(k-means clustering)算法根据每个点与聚类簇中心的相对距离,尝试将数据点分组到某个聚类簇中,聚类簇的数量是预先定义好的。在每一轮k均值的运行过程中,都要计算每个数据点与聚类簇每个中心(称为形心的一个点)之间的距离。数据点将被分配到与其距离最近的形心所在的聚类簇。然后算法将重新计算所有形心,求出分配到每个聚类簇的所有点的均值,并用新的均值替换旧的形心。数据点的分配和形心的重新计算会一直持续下去,直至形心停止移动或迭代达到了一定的次数。

提供给k均值聚类算法的初始数据点的所有维度都需要在量度上具备可比性,否则,k均值聚类算法在进行聚类时将向差最大的维度倾斜。使不同类型的数据(本例中是不同的维度)具有可比性的过程被称为归一化(normalization)。有一种常用的归一化数据的方法是基于每个数据值的z分数(也称为标准分数)进行评估,z分数是相对于其他同类型数据而言的。读取一个数据值,从中减去所有数据的均值,将其结果除以所有数据的标准差,即可求得z分数。真正对可迭代的一串 float 值执行z分数计算的,就是在上一节开头部分设计的 zscores() 函数。

使用k均值聚类算法的主要困难是初始形心如何给出。在以下即将实现的最简单形式的算法中,初始形心是随机分布于数据范围中的。另一个困难是该把数据划分为多少个(k均值中的“k”)聚类簇。经典算法实现中的k值由用户来确定,但用户可能并不知道适合的个数,这需要经过一些实验才能确定。这里将由用户来定义“k”值。

将这些步骤和注意事项全部汇集在一起,就是下面的k均值聚类算法。

(1)初始化全部数据点和k个空聚类。

(2)对全部数据点进行归一化操作。

(3)为每个聚类簇创建与其关联的随机分布的形心。

(4)将每个数据点分配到与其距离最近的形心所关联的聚类簇。

(5)重新计算每个形心,应是其关联聚类簇的中心(均值)。

(6)重复第4步和第5步,直至迭代数量到达最大值或所有形心都停止移动(收敛)。

从概念上讲,k均值聚类算法其实非常简单:在每次迭代过程中,每个数据点都以聚类簇的中心为依据与最近的聚类簇相关联。当有新的数据点与聚类簇关联时,聚类簇的中心就会移动,如图6-1所示。

36.png

图6-1 在某个数据集上运行了3代后的k均值聚类算法示例。星星表示形心。 不同形状代表当前的聚类簇成员状态(一直在变化)

下面将实现一个用于记录状态和运行算法的类,类似于第5章中的 GeneticAlgorithm 。现在回到kmeans.py文件,具体代码如代码清单6-3所示。

代码清单6-3 kmeans.py(续)

Point = TypeVar('Point', bound=DataPoint)
class KMeans(Generic[Point]):
    @dataclass
    class Cluster:
        points: List[Point]
        centroid: DataPoint

KMeans 是一个泛型类。它适用于 DataPointDataPoint 的任何子类,这些类由 Point 类型的 bound 给出定义。 KMeans 包含一个内部类 Cluster ,用于记录操作过程中的各个聚类簇。每个 Cluster 都包含数据点和与之关联的形心。

下面继续介绍 KMeans 类的 __init__() 方法,具体代码如代码清单6-4所示。

代码清单6-4 kmeans.py(续)

def __init__(self, k: int, points: List[Point]) -> None:
    if k < 1: # k-means can't do negative or zero clusters
       raise ValueError("k must be >= 1")
    self._points: List[Point] = points
    self._zscore_normalize()
    # initialize empty clusters with random centroids
    self._clusters: List[KMeans.Cluster] = []
    for _ in range(k):
        rand_point: DataPoint = self._random_point()
        cluster: KMeans.Cluster = KMeans.Cluster([], rand_point)
        self._clusters.append(cluster)
@property
def _centroids(self) -> List[DataPoint]:
    return [x.centroid for x in self._clusters]

KMeans 包含一个与之关联的数组 _points ,它就是数据集中的所有数据点。这些点后续将会被划分到各个聚类簇中,聚类簇则存储在 _clusters 变量中。当 KMeans 被实例化时,它需要知道创建多少个聚类簇( k )。每个聚类簇最初都有一个随机分布的形心。本算法要用到的所有数据点都基于z分数进行了归一化处理。计算出的 _centroids 属性将返回本算法相关聚类簇所关联的所有形心。具体代码如代码清单6-5所示。

代码清单6-5 kmeans.py(续)

def _dimension_slice(self, dimension: int) -> List[float]:
    return [x.dimensions[dimension] for x in self._points]

_dimension_slice() 是一个快捷方法,可被视为返回一列数据。它将返回一个列表,由每个数据点指定索引处的值组成。例如,如果数据点是 DataPoint 类型,则 _dimension_slice(0) 将返回由每个数据点的第一维值组成的列表。这在代码清单6-6所示的归一化方法中很有用。

代码清单6-6 kmeans.py(续)

def _zscore_normalize(self) -> None:
    zscored: List[List[float]] = [[] for _ in range(len(self._points))]
    for dimension in range(self._points[0].num_dimensions):
        dimension_slice: List[float] = self._dimension_slice(dimension)
        for index, zscore in enumerate(zscores(dimension_slice)):
            zscored[index].append(zscore)
    for i in range(len(self._points)):
        self._points[i].dimensions = tuple(zscored[i])

_zscore_normalize() 把每个数据点的 dimensions 元组中的值都替换为其等价的z分数。这里用到了之前为 float 序列定义的 zscores() 函数。尽管 dimensions 元组中的值被替换了,但 DataPoint 中的 _originals 元组没有被替换。这一点很有用,如果存了两份原始数据,则用户在算法运行完毕后仍能获取归一化处理之前各个维度的原始值。具体代码如代码清单6-7所示。

代码清单6-7 kmeans.py(续)

def _random_point(self) -> DataPoint:
    rand_dimensions: List[float] = []
    for dimension in range(self._points[0].num_dimensions):
        values: List[float] = self._dimension_slice(dimension)
        rand_value: float = uniform(min(values), max(values))
        rand_dimensions.append(rand_value)
    return DataPoint(rand_dimensions)

代码清单6-4中的 __init__() 方法将用上述 _random_point() 方法为每个聚类簇创建最初的随机形心。为每个数据点生成的随机值将被限制在现有数据点的值域内。 _random_point() 方法用之前为 DataPoint 定义的构造函数从一个可迭代值序列中新建一个数据点。

下面介绍为数据点查找合适归属聚类簇的方法,具体代码如代码清单6-8所示。

代码清单6-8 kmeans.py(续)

# Find the closest cluster centroid to each point and assign the point to that cluster
def _assign_clusters(self) -> None:
    for point in self._points:
        closest: DataPoint = min(self._centroids, key=partial(DataPoint.distance, point))
        idx: int = self._centroids.index(closest)
        cluster: KMeans.Cluster = self._clusters[idx]
        cluster.points.append(point)

在本书中我们已经创建了几个能够在列表中找到最小值或最大值的函数。上述函数也是类似的。当前情况是要查找与每个数据点的距离都最短的聚类簇形心,然后将该数据点分配到这一聚类簇中。唯一稍显复杂的地方就是用到了 partial() 做中介的函数,其作为 min()keypartial() 的参数为一个函数,在调用该函数之前为其提供一些参数。在当前情况下,我们将把要计算的数据点作为 other 参数,提供给 DataPoint.distance() 方法,从而计算出每个形心到该数据点的距离,并由 min() 返回这些距离的最小值。具体代码如代码清单6-9所示。

代码清单6-9 kmeans.py(续)

# Find the center of each cluster and move the centroid to there
def _generate_centroids(self) -> None:
    for cluster in self._clusters:
        if len(cluster.points) == 0: # keep the same centroid if no points
            continue
        means: List[float] = []
        for dimension in range(cluster.points[0].num_dimensions):
            dimension_slice: List[float] = [p.dimensions[dimension] for p in cluster.                  points]
            means.append(mean(dimension_slice))
        cluster.centroid = DataPoint(means)

每个数据点被分配到聚类簇之后,都会计算新的形心。这涉及计算聚类簇中每个点的每个维度的均值。然后将每个维度的均值组合在一起,以求得聚类簇中的“中心点”(mean point),该点将成为新的形心。注意,此处不能使用 _dimension_slice() ,因为当前这些数据点只是全部数据点的子集(仅归属于某聚类簇的点)。请问该如何重写 _dimension_slice() ,以使其更加通用呢?

下面介绍实际运行算法的方法,具体代码如代码清单6-10所示。

代码清单6-10 kmeans.py(续)

def run(self, max_iterations: int = 100) -> List[KMeans.Cluster]:
    for iteration in range(max_iterations):
        for cluster in self._clusters: # clear all clusters
            cluster.points.clear()
        self._assign_clusters() # find cluster each point is closest to
        old_centroids: List[DataPoint] = deepcopy(self._centroids) # record
        self._generate_centroids() # find new centroids
        if old_centroids == self._centroids: # have centroids moved?
            print(f"Converged after {iteration} iterations")
            return self._clusters
    return self._clusters

run() 是原始算法最纯粹的体现。唯一可能会令你感到意外的改动是,在每次迭代开始时会移除所有数据点因为如果不这么做, _assign_clusters() 方法最终会在每个聚类簇中放入重复的数据点。

我们不妨将 k 设为2,并用一些测试用的 DataPoint 进行一次快速检验,具体代码如代码清单6-11所示。

代码清单6-11 kmeans.py(续)

if __name__ == "__main__":
   point1: DataPoint = DataPoint([2.0, 1.0, 1.0])
   point2: DataPoint = DataPoint([2.0, 2.0, 5.0])
   point3: DataPoint = DataPoint([3.0, 1.5, 2.5])
   kmeans_test: KMeans[DataPoint] = KMeans(2, [point1, point2, point3])
   test_clusters: List[KMeans.Cluster] = kmeans_test.run()
   for index, cluster in enumerate(test_clusters):
       print(f"Cluster {index}: {cluster.points}")

由于随机性的存在,你的结果可能会有所不同。结果应该如下所示:

Converged after 1 iterations
Cluster 0: [(2.0, 1.0, 1.0), (3.0, 1.5, 2.5)]
Cluster 1: [(2.0, 2.0, 5.0)]