分享

用Hadoop实现KMeans算法

desehawk 2014-6-28 12:00:32 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 10356
本帖最后由 desehawk 于 2014-6-28 12:19 编辑
问题导读:
什么是质心文件?
mapreduce的从哪里读取质心文件?
Driver驱动程序如何比较两次的质心是否相同?






在我们阅读的时候,我们首先知道什么是KMeans:
K-means算法是最为经典的基于划分的聚类方法,是十大经典数据挖掘算法之一。K-means算法的基本思想是:以空间中k个点为中心进行聚类,对最靠近他们的对象归类。通过迭代的方法,逐次更新各聚类中心的值,直至得到最好的聚类结果。



虽然已经发展到了hadoop2.4,但是对于一些算法只要明白其中的含义,是和语言无关的,无论是使用Java、C++、python等,
本文以Hadoop1.0.3为例。

从理论上来讲用MapReduce技术实现KMeans算法是很Natural的想法:在Mapper中逐个计算样本点离哪个中心最近,然后Emit(样本点所属的簇编号,样本点);在Reducer中属于同一个质心的样本点在一个链表中,方便我们计算新的中心,然后Emit(质心编号,质心)。但是技术上的事并没有理论层面那么简单。

Mapper和Reducer都要用到K个中心(我习惯称之为质心),Mapper要读这些质心,Reducer要写这些质心。另外Mapper还要读存储样本点的数据文件。我先后尝试以下3种方法,只有第3种是可行的,如果你不想被我误导,请直接跳过前两种。

一、用一个共享变量在存储K个质心

由于K很小,所以我们认为用一个Vector<Sample>来存储K个质心是没有问题的。以下代码是错误的:

  1. class MyJob extends Tool{
  2.   static Vector<Sample> centers=new Vector<Sample>(K);
  3.   static class MyMapper extends Mapper{
  4.     //read centers
  5.   } 
  6.   static class MyMapper extends Reducer{
  7.     //update centers
  8.   }
  9.   void run(){
  10.     until ( convergence ){
  11.       map();
  12.       reduce();
  13.     }
  14. }
复制代码

发生这种错误是因为对hadoop执行流程不清楚,对数据流不清楚。简单地说Mapper和Reducer作为MyJob的内部静态类,它们应该是独立的--它们不应该与MyJob有任何交互,因为Mapper和Reducer分别在Task Tracker的不同JVM中运行,而MyJob以及MyJob的内部其他类都在客户端上运行,自然不能在不同的JVM中共享一个变量。

详细的流程是这样的:
首先在客户端上,JVM加载MyJob时先初始化静态变量,执行static块。然后提交作业到Job Tracker。
在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer线程获得了MyJob类静态变量的初始拷贝(这份拷贝是指MyJob执行完静态块之后静态变量的模样)。
在Task Tracker上,Mapper和Reducer分别地读写MyJob的静态变量的本地拷贝,但是并不影响原始的MyJob中的静态变量的值。

二、用分布式缓存文件存储K个质心
既然不能通过共享外部类变量的方式,那我们通过文件在map和reduce之间传递数据总可以吧,Mapper从文件中读取质心,Reducer把更新后的质心再写入这个文件。这里的问题是:如果确定要把质心放在文件中,那Mapper就需要从2个文件中读取数据--质心文件和样本数据文件。虽然有MutipleInputs可以指定map()的输入文件有多个,并可以为每个输入文件分别指定解析方式,但是MutipleInputs不能保证每条记录从不同文件中传给map()的顺序。在我们的KMeans中,我们希望质心文件全部被读入后再逐条读入样本数据。

于是乎就想到了DistributedCache,它主要用于Mapper和Reducer之间共享数据。DistributedCacheFile是缓存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式读取它。于是我又有了一个错误的思路:

  1. class MyMaper{
  2.     Vector<Sample> centers=new Vector<Sample>(K);
  3.     void setup(){
  4.         //读取cacheFile,给centers赋值
  5.     }
  6.     void map(){
  7.         //计算样本离哪个质心最近
  8.     }
  9. }
  10. class MyReducer{
  11.     Vector<Sample> centers=new Vector<Sample>(K);
  12.     void reduce(){
  13.         //更新centers
  14.     }
  15.     void cleanup(){
  16.         //把centers写回cacheFile
  17.     }
  18. }
复制代码



错因:DistributedCacheFile是只读的,在任务运行前,TaskTracker从JobTracker文件系统复制文件到本地磁盘作为缓存,这是单向的复制,是不能写回的。试想在分布式环境下,如果不同的mapper和reducer可以把缓存文件写回的话,那岂不又需要一套复杂的文件共享机制,严重地影响hadoop执行效率。

三、用分布式缓存文件存储样本数据
其实DistributedCache还有一个特点,它更适合于“大文件”(各节点内存容不下)缓存在本地。仅存储了K个质心的文件显然是小文件,与之相比样本数据文件才是大文件。

此时我们需要2个质心文件:一个存放上一次的质心prevCenterFile,一个存放reducer更新后的质心currCenterFile。Mapper从prevCenterFile中读取质心,Reducer把更新后有质心写入currCenterFile。在Driver中读入prevCenterFile和currCenterFile,比较前后两次的质心是否相同(或足够地接近),如果相同则停止迭代,否则就用currCenterFile覆盖prevCenterFile(使用fs.rename),进入下一次的迭代。

这时候Mapper就是这样的:
  1. class MyMaper{
  2.     Vector<Sample> centers=new Vector<Sample>(K);
  3.     void map(){
  4.         //逐条读取质心,给centers赋值
  5.     }
  6.     void cleanup(){
  7.         //逐行读取cacheFile,计算每个样本点离哪个质心最近
  8.         //然后Emit(样本点所属的簇编号,样本点)
  9.     }
  10. }
复制代码





源代码
试验数据是在Mahout项目中作为example提供的,600个样本点,每个样本是一个60维的浮点向量。 synthetic_control.data.zip (118.04 KB, 下载次数: 8)

已有(1)人评论

跳转到指定楼层
微笑的老颜 发表于 2014-6-30 11:05:32
好帖子支持一下 感谢分享
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条