分享

为 Mahout 增加聚类评估功能

本帖最后由 langke93 于 2016-5-18 13:43 编辑
问题导读
1.聚类(clustering)是无监督学习还是监督学习?
2.什么是聚类评估算法?
3.如何产生聚类需要的向量文件?







聚类算法及聚类评估 Silhouette 简介
聚类算法简介

聚类(clustering)是属于无监督学习(Unsupervised learning)的一种,用来把一组数据划分为几类,每类中的数据尽可能的相似,而不同类之间尽可能的差异最大化。通过聚类,可以为样本选取提供参考,或进行根源分析,或作为其它算法的预处理步骤。
聚类算法中,最经典的要属于 Kmeans 算法,它的基本思想是:假设我们要把一组数据聚成 N 类,那就:
  • 把数据中的每个样本作为一个向量,记作ā
  • 首先随机选取 n 个样本,把这 n 个样本作为 N 类的中心点, 称为 centroid
  • 针对数据中的所有样本,计算到 n 个 centroid 的距离,距离哪个中心点最近,就属于哪一类
  • 在每一类中,重新选取 centroid,假设该类有 k 个样本,则 centroid 为 img003.jpg
  • 重复 2,3 直到 centroid 的变化小于预设的值。
Mahout 是一个开源的机器学习软件,提供了应用推荐、聚类、分类、Logistic 回归分析等算法。特别是由于结合了 Hadoop 的大数据处理能力,每个算法都可以作为独立的 job 方便的部署在 Hadoop 平台上,因此得到了越来越广的应用。在聚类领域,Mahout 提供了 Kmeans,LDA, Canopy 等多种算法。

聚类评估算法 Silhouette 简介
在 Kmeans 中,我们会注意到需要我们预先设置聚合成几类。实际上,在聚类的过程中我们也不可能预先知道,那只能分成 2 类,3 类,……n 类这样进行尝试,并评估每次的聚类效果。
实际上,由于聚类的无监督学习特性,无论什么算法都需要评估效果。在聚类的评估中,有基于外部数据的评价,也有单纯的基于聚类本身的评价,其基本思想就是:在同一类中,各个数据点越近越好,并且和类外的数据点越远越好;前者称为内聚因子(cohension),后者称为离散因子(separation)。
把这两者结合起来,就形成了评价聚类效果的 Silhouette 因子:
首先看如何评价一个点的聚类效果:
a = 一个点到同一聚类内其它点的平均距离
b=min(一个点到其他聚类内的点的平均距离)
Silhouette 因子s = 1 – a/b (a<b) 或b/a -1 (a>=b)
衡量整体聚类的效果,则是所有点的 Silhouette 因子的平均值。范围应该在 (-1,1), 值越大则说明聚类效果越好。

图 1.Silhouette 中内聚、离散因子示意

img001.jpg

以图 1 为例。图 1 显示的是一个具有 9 个点的聚类,三个圆形表示聚成了三类,其中的黄点表示质心(centroid)。为了评估图 1 中深蓝色点的聚类效果,其内聚因子a就是该点到所在圆中其它三个点的平均距离。离散因子b的计算相对复杂:我们需要先求出到该点到右上角圆中的三个点的平均距离,记为 b1;然后求出该点到右下角圆中两个点的平均距离,记为 b2;b1 和 b2 的较小值则为b。
[size=1.166em]在 IBM 的 SPSS Clementine 中,也有 Silhouett 评估算法的实现,不过 IBM 提供的是一个简化版本,把一个点到一个类内的距离的平均值,简化为到该类质心(centroid)的距离,具体来说,就是:

图 2.IBM 关于内聚、离散因子的简化实现


img002.jpg


还是以上面描述的 9 个点聚成 3 类的例子来说明。IBM 的实现把a的实现简化为到深蓝色的点所在的质心的距离。计算b时候,还是要先计算 b1 和 b2,然后求最小值。但 b1 简化为到右上角圆质心的距离;b2 简化为到右下角圆质心的距离。
在下面的内容中,我们尝试利用 IBM 简化后的公式为 Mahout 增加聚类评估功能。



Mahout 聚类过程分析

Mahout 运行环境简介
前面说过,Mahout 是依赖 Hadoop 环境,每一个算法或辅助功能都是作为 Hadoop 的一个单独的 job 来运行,所以必须准备好一个可运行的 Hadoop 环境,(至少本文写作时候使用的 Mahout0.9 还在依赖 Hadoop),如何安装配置一个可运行的 Hadoop 环境不在这篇文章的介绍范围内。请自行参考 Hadoop 网站。需要说明的是,本文采用的 Hadoop 为 2.2.0。
安装完 Hadoop 后,下载 mahout-distribution-0.9,解压缩后的重要内容如下:
  • bin/: 目录下有 Mahout 可执行脚本
  • mahout-examples-0.9-job.jar,各种算法的实现类
  • example/ 各种实现算法的源码
  • conf/ 存放各实现类的配置文件,其中重要的为 driver.classes.default.props,如果增加实现算法类,可以在该文件中增加配置项,从而可以被 Mahout 启动脚本调用。

单独执行 Mahout,是一个实现的各种功能的简介,如下例:
执行 /data01/shanlei/src/mahout-distribution-0.9/bin/mahout
输出:
[mw_shl_code=bash,true] MAHOUT_LOCAL is not set; adding
HADOOP_CONF_DIR to classpath.
Running on hadoop, using /data01/shanlei/hadoop-2.2.0/bin/hadoop and
HADOOP_CONF_DIR=/data01/shanlei/hadoop-2.2.0/conf
p1 is org.apache.mahout.driver.MahoutDriver
MAHOUT-JOB:

/data01/shanlei/src/mahout-distribution-0.9/examples/target/mahout-examples-0.9-job.jar
An example program must be given as the first argument.
Valid program names are:
arff.vector: : Generate Vectors from an ARFF file or directory
assesser: : assesse cluster result using silhoueter algorithm
baumwelch: : Baum-Welch algorithm for unsupervised HMM training
canopy: : Canopy clustering
cat: : Print a file or resource as the logistic regression models would see it
cleansvd: : Cleanup and verification of SVD output
clusterdump: : Dump cluster output to text
clusterpp: : Groups Clustering Output In Clusters
cmdump: : Dump confusion matrix in HTML or text formats
concatmatrices: : Concatenates 2 matrices of same cardinality into a single matrix[/mw_shl_code]



如果要执行某种算法,如上面结果中显示的 canopy,就需要执行 mahout canopy 加上该算法需要的其它参数。
另外,Mahout 算法的输入输出,都是在 Hadoop HDFS 上,因此需要通过 hdfs 命令上传到 hdfs 文件系统;输出大多为 Mahout 特有的二进制格式,需要通过 mahout seqdumper 等命令来导出并转换为可读文本。


准备输入
Mahout 算法使用的 input 需要特定格式的 Vector 文件,不能够直接使用一般的文本文件,因此需要把文本转换为 Vector 文件,好在 Mahout 自身提供了这样的类:
org.apache.mahout.clustering.conversion.InputDriver。
在 Mahout 的 conf 目录中的 driver.classes.default.props 增加如下行:
org.apache.mahout.clustering.conversion.InputDriver = input2Seq : create sequence file from blank separated files,然后就可以为 Mahout 增加一个功能,把空格分隔的文本文件转换为 Mahout 聚类可以使用的向量。
如下面的数据所示,该数据每行为一个包含 6 个属性的向量:
1 4 3 11 4 3
2 2 5 2 10 3
1 1 2 2 10 1
1 4 2 11 5 4
1 1 3 2 10 1
2 4 5 9 5 2
2 6 5 3 8 1
执行 ./mahout input2Seq -i /shanlei/userEnum -o /shanlei/vectors 则产生聚类需要的向量文件。


聚类
以 Kmeans 聚类为例:
./mahout kmeans --input /shanlei/vectors --output /shanlei/kmeans -c /shanlei/k --maxIter 5 -k 8 –cl
-k 8 指明产生 8 类,执行完成后,在/shanlei/kmeans/下会产生: clusters-0,clusters-1,… …,clusters-n-final 目录,每个目录都是一次迭代产生的 centroids, 目录数会受 --maxIter 控制;最后的结果会加上 final。
利用 Mahout 的 clusterdump 功能我们可以查看聚类的结果:
[mw_shl_code=bash,true]./mahout clusterdump -i /shanlei/kmeans/clusters-2-final -o ./centroids.txt
more centroids.txt:
VL-869{n=49 c=[1.163, 5.082, 4.000,
4.000, 4.592, 2.429] r=[0.370, 0.965, 1.030, 1.245, 1.244, 1.161]}
VL-949{n=201 c=[1.229, 4.458, 4.403,
10.040, 6.134, 1.458] r=[0.420, 1.079, 0.836, 1.196, 1.392, 0.852]}
   … …
VL-980{n=146 c=[1.281, 2.000, 4.178,
2.158, 9.911, 1.918] r=[0.449, 0.712, 1.203, 0.570, 0.437, 1.208]}[/mw_shl_code]


VL-869 中的 869 为该类的 id,c=[1.163, 5.082, 4.000, 4.000, 4.592, 2.429] 为 centroid 的坐标,n=49 表示该类中数据点的个数。
如果使用-cl 参数,则在/shanlei/kmeans/下会产生 clusteredPoints,利用 Mahout 的 seqdumper 可以看其内容:

[mw_shl_code=bash,true] Input Path:
hdfs://rac122:18020/shanlei/kmeans/clusteredPoints/part-m-00000
Key class: class
org.apache.hadoop.io.IntWritable Value Class: class
org.apache.mahout.clustering.classify.WeightedPropertyVectorWri
table
Key: 301: Value: wt: 1.0 distance:
6.6834472852629006 vec: 1 = [1.000, 4.000, 3.000, 11.000, 4.000, 3.000]
Key: 980: Value: wt: 1.0 distance:
2.3966504034528384 vec: 2 = [2.000, 2.000, 5.000, 2.000, 10.000, 3.000][/mw_shl_code]

Key 对应的则是相关聚类的 id,distance 为到 centroid 的距离。vec 则是原始的向量。
从 Mahout 的聚类输出结果来看,能够很容易的实现 IBM 简化后的 Silhouette 算法,内聚因子 (a) 可以简单的获取到,而离散因子 (b) 也能够简单的计算实现。下面我们就来设计 Mahout 中的实现。


Mahout 中 Silhouette 实现

算法设计:
遵循 Hadoop 上 MR 程序的设计原则,算法设计考虑了 mapper,reducer 及 combiner 类。
Mapper 设计:

输入目录:聚类的最终结果目录 clusteredPoints(通过命令行参数-i 设置),
输入:
Key:IntWritable,Value:WeightedPropertyVectorWritable
输出:
Key:IntWritable(无意义,常量 1),Value:Text(单个点的 Silhouette 值,格式为“cnt,Silhouette 值”)
Setup 过程:
因为需要计算 separation 时候要访问其它的 centroids,所以在 setup 中读取(通过命令行参数-c 设置)并缓存。
Map 过程:
由于输入的 Value 为 WeightedPropertyVectorWritable,可以通过访问字段 distance 获得参数 a,并遍历缓存的 centroids,针对其 id 不等于 Key 的,逐一计算距离,其最小的就是参数 b。
Map 的结果 Key 使用常量 1,Value 为形如“1,0.23”这样的“cnt,Silhouette 值”格式。


Reducer 设计:
输入:
Key:IntWritable(常量 1),Value: Text (combine 后的中间 Silhouette 值,格式为“cnt,Silhouette 值”)。
输出:
Key:IntWritable(常量 1),Value:整个聚类的 Silhouette 值,格式为“cnt,Silhouette 值”。
输出目录:最终文件的产生目录,通过命令行参数-o 设置。
Reduce 过程:
根据“,”把每个 Value,分解为 cnt,和 Silhouette,最后进行加权平均。
[size=1.166em]Combiner 设计:
为减少数据的 copy,采用 combiner,其实现即为 reducer 的实现。


实现代码:

Mapper 类:

[mw_shl_code=bash,true]public class AssesserMapper extends Mapper<IntWritable,
WeightedPropertyVectorWritable, IntWritable, Text> {
private List<Cluster> clusterModels;
private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);

Configuration conf = context.getConfiguration();
String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN);

clusterModels = Lists.newArrayList();

if (clustersIn != null && !clustersIn.isEmpty()) {
Path clustersInPath = new Path(clustersIn);
clusterModels = populateClusterModels(clustersInPath, conf);

}
}

private static List<Cluster> populateClusterModels(Path clustersIn,
Configuration conf) throws IOException {
List<Cluster> clusterModels = Lists.newArrayList();
Path finalClustersPath = finalClustersPath(conf, clustersIn);
Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
PathFilters.partFilter(), null, false, conf);
while (it.hasNext()) {
ClusterWritable next = (ClusterWritable) it.next();
Cluster cluster = next.getValue();
cluster.configure(conf);
clusterModels.add(cluster);
}
return clusterModels;
}
private static Path finalClustersPath(Configuration conf,
Path clusterOutputPath) throws IOException {
FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath,
PathFilters.finalPartFilter());
log.info("files: {}", clusterOutputPath.toString());
return clusterFiles[0].getPath();
}
protected void map(IntWritable key, WeightedPropertyVectorWritable vw, Context context)
throws IOException, InterruptedException {
int clusterId=key.get();
double cohension,separation=-1,silhouete;
Map<Text,Text> props=vw.getProperties();
cohension=Float.valueOf(props.get(new Text("distance")).toString());
Vector vector = vw.getVector();
for ( Cluster centroid : clusterModels) {
if (centroid.getId()!=clusterId) {

DistanceMeasureCluster distanceMeasureCluster = (DistanceMeasureCluster) centroid;
DistanceMeasure distanceMeasure = distanceMeasureCluster.getMeasure();
double f = distanceMeasure.distance(centroid.getCenter(), vector);

if (f<separation || separation<-0.5) separation=f;
}
}
Text value=new Text(Long.toString(1)+","+Double.toString(silhouete));
IntWritable okey=new IntWritable();
okey.set(1);
context.write(okey, value);
}

}[/mw_shl_code]


Reducer 类:

[mw_shl_code=bash,true]public class AssesserReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
log.info("reducer");
}
private static final Pattern SEPARATOR = Pattern.compile("[\t,]");
public void reduce(IntWritable key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
long cnt=0;
double total=0;
for (Text value : values) {
String[] p=SEPARATOR.split(value.toString());
Long itemCnt=Long.parseLong(p[0]);
double v=Double.parseDouble(p[1]);
total=total+ itemCnt*v;
cnt=cnt+itemCnt;
}

}

}[/mw_shl_code]


Job 类:


[mw_shl_code=bash,true]public class ClusterAssesser extends AbstractJob {

private ClusterAssesser() {

}
public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
//addOption(DefaultOptionCreator.methodOption().create());
addOption(DefaultOptionCreator.clustersInOption()
.withDescription("The input centroids").create());

if (parseArguments(args) == null) {
return -1;
}
Path input = getInputPath();
Path output = getOutputPath();
Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
if (getConf() == null) {
setConf(new Configuration());
}
run(getConf(), input, clustersIn, output);
return 0;
}
private void run(Configuration conf, Path input, Path clustersIn,
Path output)throws IOException, InterruptedException,
ClassNotFoundException {
conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());

Job job = new Job(conf, "Cluster Assesser using silhouete over input: " + input);
job.setJarByClass(ClusterAssesser.class);

job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

job.setMapperClass(AssesserMapper.class);

job.setCombinerClass(AssesserReducer.class);

job.setReducerClass(AssesserReducer.class);
job.setNumReduceTasks(1);

job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true)) {
throw new InterruptedException("Cluster Assesser Job failed processing " + input);
}

}
private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);

public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new ClusterAssesser(), args);
}

}[/mw_shl_code]


编译运行:
编译环境准备:
在从 Mahout 网站下载的包中,同时包含了源码以及可以导入到 eclipse 的工程,导入后,会产生 mahout-core,mahout-distribution,mahout-example 等不同的 projects,我们首先编译一遍,保证没有错误,然后再考虑如何增加自己的代码。
当然,Mahout 在顶层目录也提供了一个编译脚本:compile.sh, 可以在命令行完成编译。
代码编译:
把自己的代码放到 example/src/main/java/目录下,自动编译就可以了。输出产生的类:com.ai.cluster.assesser.ClusterAssesser,然后就被打包到了 examples/target/mahout-examples-0.9-job.jar 中。
配置:
  • 把 examples/target/mahout-examples-0.9-job.jar 覆盖顶层的 mahout-examples-0.9-job.jar
  • 通过在 conf/driver.classes.default.props 文件添加如下行,把我们的实现类加入到 Mahout 的配置中,从而可以通过 Mahout 脚本执行:

[mw_shl_code=bash,true]com.ai.cluster.assesser.ClusterAssesser = assesser : assesse cluster result using silhoueter algorithm
[/mw_shl_code]

运行

利用前面我们做聚类过程分析产生的聚类结果:

[mw_shl_code=bash,true]bin/mahout assesser -i /shanlei/kmeans/clusteredPoints -o /shanlei/silhouete -c
/shanlei/kmeans --tempDir /shanlei/temp[/mw_shl_code]
其中的-c 为输入聚类的中心点,-i 为聚类的点 –o 为最终的输出。

[mw_shl_code=bash,true]查看结果:
bin/mahout seqdumper -i /shanlei/silhouete -o ./a.txt
more a.txt:[/mw_shl_code]
[mw_shl_code=bash,true]Input Path: hdfs://rac122:18020/shanlei/silhouete/part-r-00000
Key class: class org.apache.hadoop.io.IntWritable Value Class: class org.apache.hadoop.io.Text
Key: 1: Value: 1000,0.5217678842906524
Count: 1[/mw_shl_code]

1000 表示共 1000 个点,0.52176 为聚类的 Silhouette 值。大于 0.5,看起来效果还行。



结束语:
不同于其它的套件,Mahout 从发布起就是为处理海量数据、为生产而准备的。直到现在,Mahout 的重心还是在优化各种算法上面,对易用性考虑不多,而且学习成本也很高。但 Mahout 不仅仅提供某些特定的算法,而且还把前期准备中的数据清洗,转换,以及后续的效果评估、图形化展现都集成在一块,方便用户。这不仅是一种发展趋势,也是争取用户的一个关键因素。希望大家都能够加入进来,提供各种各样的辅助功能,让 Mahout 变得易用起来。






欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条