分享

Hadoop应用实例

本帖最后由 pig2 于 2014-5-26 14:15 编辑
问题导读:
1.Last.fm网站为什么使用hadoop?
2.Track Statistics(音轨统计)如何通过程序实现的?
3.听用户数。mapreduce是如何实现的?
4.统计音频使用总数mapreduce是如何实现的?
5.facebook如何灵活使用hive来统计各种数据的?
6.facebook如何使用hive统计Hive系统中查询2008-12-01这天的广告曝光数?






Hadoop 在Last.fm的应用

  Last.fm:社会音乐史上的革命

  Last.fm它是一个提供网络电台和网络音乐服务的社区网站,向用户提供很多服务,例如免费听音乐和音乐下载,音乐及重大事件推荐,个性化图表服务以及其他很多服务。每个月大约有2500万人使用Last.fm,因而产生大量需要处理的数据。一个例子就是用户传输他们正在收听的音乐信息(也就是收藏 “scrobbling”)。Last.fm处理并且存储这些数据,以便于用户可以直接访问这些数据(用图表的形式),并且可以利用这些数据来推断用户的个人音乐品味、喜好和喜爱的艺术家,然后用于寻找相似的音乐。


  Hadoop在Last.fm中的应用

  随着Last.fm服务的发展,用户数目从数千增长到数百万,这时,存储、处理和管理这些用户数据渐渐变成一项挑战。幸运的是,当大家认识到Hadoop技术能解决众多问题之后,Hadoop的性能迅速稳定下来,并被大家积极地运用。2006年初,Last.fm开始使用Hadoop,几个月之后便投入实际应用。Last.fm使用Hadoop的理由归纳如下。

  分布式文件系统为它所存储的数据(例如,网志,用户收听音乐的数据)提供冗余备份服务而不增加额外的费用。

  可以方便地通过增添便宜、普通的硬件来满足可扩展性需求。

  当时Last.fm财力有限,Hadoop是免费的。

  开源代码和活跃的社区团体意味着Last.fm能够自由地修改Hadoop,从而增添一些自定义特性和补丁。

  Hadoop提供了一个弹性的容易掌握的框架来进行分布式计算。

  现在,Hadoop已经成为Last.fm基础平台的关键组件,目前包括2个Hadoop集群,涉及50台计算机、300个内核和100 TB的硬盘空间。在这些集群上,运行着数百种执行各种操作的日常作业,例如日志文件分析、A/B测试评测、即时处理和图表生成。本节的例子将侧重于介绍产生图表的处理过程,因为这是Last.fm对Hadoop的第一个应用,它展示出Hadoop在处理大数据集时比其他方法具有更强的功能性和灵活性。

  用Hadoop产生图表
  Last.fm使用用户产生的音轨收听数据来生成许多不同类型的图表,例如针对每个国家或个人音轨数据的一周汇总图表。许多Hadoop程序处理收听数据和产生这些图表,它们可以以天、周或月为单位执行。图16-1展示了这些数据在网站上如何显示的一个例子,本例是音乐的周排行统计数据。

   1.jpg
  ▲图16-1. Last.fm音乐排行统计图表

  通常情况下,Last.fm有两种收听信息。

  用户播放自己的音乐(例如,在PC机或其他设备上听MP3文件),这种信息通过Last.fm的官方客户端应用或一种第三方应用 (有上百种)发送到Last.fm。


  用户收听Last.fm某个网络电台的节目,并在本地计算机上通过流技术缓冲一首歌。Last.fm播放器或站点能被用来访问这些流数据,然后它能给用户提供一些额外的功能,比如允许用户对她收听的音频进行喜爱、跳过或禁止等操作。


  在处理接收到的数据时,我们对它们进行分类:一类是用户提交的收听的音乐数据从现在开始,第一类数据称为“scrobble”(收藏数据);另一类是用户收听的Last.fm的电台数据(从现在开始,第二类数据称为“radio listen”(电台收听数据)。为了避免Last.fm的推荐系统出现信息反馈循环的问题,对数据源的区分是非常重要的,而Last.fm的推荐系统只使用scrobble数据。Last.fm的Hadoop程序的一项重要任务就是接受这些收听数据,做统计并形成能够在Last.fm网站上进行显示和作为其他Hadoop程序输入的数据格式。这一过程是Track Statistics(音轨统计)程序实现的,它就是在以下几节描述的实例。


Track Statistics程序

  音乐收听信息被发送到Last.fm时,会经历验证和转换阶段,最终结果是一系列由空格分隔的文本文件,包含的信息有用户ID(userId)、音乐(磁道)ID(trackId)、这首音乐被收藏的次数(Scrobble)、这首音乐在电台中收听的次数(Radio)以及被选择跳过的次数(Skip)。表16-1包含一些采样的收听数据,后面介绍的例子将用到这些数据,它是Track Statistics程序的输入(真实数据达GB数量级,并且具有更多的属性字段,为了方便介绍,这里省略了其他的字段)。

   2.jpg

  ▲表16-1. 收听数据

  这些文本文件作为初始输入提供给Track Statistics程序,它包括利用这个输入数据计算各种数据值的两个作业和一个用来合并结果的作业(见图16-2)。

  Unique Listeners作业模块统计收听同一首音频的不同用户数,通过累计不同用户对该音频文件的第一次访问而忽略同一用户对这一文件的多次访问,即可得到该数值。Sum作业模块通过对所有用户的所有收听信息进行计数来为每个音频统计收听总数、收藏总数、电台收听总数以及被跳过的总数。

   3.jpg

  ▲图16-2. 音频状态统计作业

  尽管这两个作业模块的输入格式是相同的,我们仍然需要两个作业模块,因为Unique Listeners作业模块负责为每个用户对每个音频产生统计值,而Sum作业模块为每个音频产生统计值。最后Merge作业模块负责合并由这两个模块产生的中间输出数据得到最终统计结果。运行这段程序的最终结果是对每个音频产生以下几项数值:

  •   不同的听众数
  •   音频的收藏次数
  •   音频在电台中的点播次数
  •   音频在电台中被收听的总次数
  •   音频在电台广播中被跳过的次数


  下面我们将详细介绍每个作业模块和它的MapReduce阶段。请注意,由于篇幅有限,所提供的代码段已被简化。要想下载完整的代码,请参考本书“前言”。


  计算不同的听众数
  Unique Listeners作业模块用于计算每个音频的不同收听用户数。
  UniqueListenerMaper UniqueListenerMaper程序处理用空格分隔的原始收听数据,然后对每个track ID(音频ID)产生相应的user ID(用户ID):
  public void map(LongWritable position, Text rawLine, OutputCollector
  IntWritable> output, Reporter reporter) throws IOException {
  String[] parts = (rawLine.toString()).split(" ");
  int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]);
  int radioListens = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]);
  // if track somehow is marked with zero plays - ignore
  if (scrobbles <= 0 && radioListens <= 0) {
  return;
  }
  // if we get to here then user has listened to track,
  // so output user id against track id
  IntWritable trackId = new IntWritable(
  Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]));
  IntWritable userId = new IntWritable(
  Integer.parseInt(parts[TrackStatisticsProgram.COL_USERID]));
  output.collect(trackId, userId);
  }
  UniqueListenersReducer UniqueListenersReducer接收到每个track ID对应的user ID数据列表之后,把这个列表放入Set类型对象以消除重复的用户ID数据。然后输出每个track ID对应的这个集合的大小(不同用户数)。但是如果某个键对应的值太多,在set对象中存储所有的reduce值可能会有内存溢出的危险。实际上还没有出现过这个问题,但是为了避免这一问题,我们可以引入一个额外的MapReduce处理步骤来删除重复数据或使用辅助排序的方法(详细内容请参考第241页的“辅助排序”小节)。
  public void reduce(IntWritable trackId, Iterator values,
  OutputCollector output, Reporter reporter)
  throws IOException {
  Set userIds = new HashSet();
  // add all userIds to the set, duplicates automatically removed (set contract)
  while (values.hasNext()) {
  IntWritable userId = values.next();
  userIds.add(Integer.valueOf(userId.get()));
  }
  // output trackId -> number of unique listeners per track
  output.collect(trackId, new IntWritable(userIds.size()));
  }
  表16-2是这一作业模块的样本输入数据。map输出结果如表16-3所示,reduce输出结果如表16-4所示。
  表16-2. 作业的输入
   4.jpg

  表16-3. map输出
   5.jpg
  ▲
  表16-4. reduce输出
   6.jpg
  ▲

  统计音频使用总数

  Sum作业相对简单,它只为每个音轨累计我们感兴趣的数据。
  SumMapper 输入数据仍然是原始文本文件,但是这一阶段对输入数据的处理完全不同。期望的输出结果是针对每个音轨的一系列累计值(不同用户、播放次数、收藏次数、电台收听次数和跳过次数)。为了方便处理,我们使用一个由Hadoop Record I/O类产生的TrackStats中间对象,它实现了WritableComparable方法(因此可被用作输出)来保存这些数据。mapper创建一个TrackStats对象,根据文件中的每一行数据对它进行值的设定,但是“不同的用户数”(unique listener count)这一项没有填写(这项数据由merge作业模块填写)。
  public void map(LongWritable position, Text rawLine,
  OutputCollector output, Reporter reporter)
  throws IOException {
  String[] parts = (rawLine.toString()).split(" ");
  int trackId = Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]);
  int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]);
  int radio = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]);
  int skip = Integer.parseInt(parts[TrackStatisticsProgram.COL_SKIP]);
  // set number of listeners to 0 (this is calculated later)
  // and other values as provided in text file
  TrackStats trackstat = new TrackStats(0, scrobbles + radio, scrobbles, radio, skip);
  output.collect(new IntWritable(trackId), trackstat);
  }
  SumReducer 在这一过程,reducer执行和mapper相似的函数——对每个音频使用总数情况进行统计,然后返回一个总的统计数据:
  public void reduce(IntWritable trackId, Iterator values,
  OutputCollector output, Reporter reporter)
  throws IOException {
  TrackStats sum = new TrackStats(); // holds the totals for this track
  while (values.hasNext()) {
  TrackStats trackStats = (TrackStats) values.next();
  sum.setListeners(sum.getListeners() + trackStats.getListeners());
  sum.setPlays(sum.getPlays() + trackStats.getPlays());
  sum.setSkips(sum.getSkips() + trackStats.getSkips());
  sum.setScrobbles(sum.getScrobbles() + trackStats.getScrobbles());
  sum.setRadioPlays(sum.getRadioPlays() + trackStats.getRadioPlays());
  }
  output.collect(trackId, sum);
  }
  表16-5是这个部分作业的输入数据(和Unique Listener作业模块的输入一样)。map的输出结果如表16-6所示,reduce的输出结果如表16-7所示。
  表16-5. 作业输入
   7.jpg
  ▲
  表16-6. map输出
   8.jpg
  ▲
  表16-7. reduce 输出
   9.jpg
  ▲

合并结果
  最后一个作业模块需要合并前面两个作业模块产生的输出数据:每个音频对应的不同用户数和每个音频的使用统计信息。为了能够合并这两种不同的输入数据,我们采用了两个不同的mapper(对每一种输入定义一个)。两个中间作业模块被配置之后可以把他们的输出结果写入路径不同的文件,MultipleInputs类用于指定mapper和文件的对应关系。下面的代码展示了作业的JobConf对象是如何设置来完成这一过程的:
  MultipleInputs.addInputPath(conf, sumInputDir,
  SequenceFileInputFormat.class, IdentityMapper.class);
  MultipleInputs.addInputPath(conf, listenersInputDir,
  SequenceFileInputFormat.class, MergeListenersMapper.class);
  虽然单用一个mapper也能处理不同的输入,但是示范解决方案更方便,更巧妙。
  MergeListenersMapper 这个mapper用来处理UniqueListenerJob输出的每个音轨的不同用户数据。它采用和SumMapper相似的方法创建TrackStats对象,但这次它只填写每个音轨的不同用户数信息,不管其他字段:
  public void map(IntWritable trackId, IntWritable uniqueListenerCount,
  OutputCollector output, Reporter reporter)
  throws IOException {
  TrackStats trackStats = new TrackStats();
  trackStats.setListeners(uniqueListenerCount.get());
  output.collect(trackId, trackStats);
  }
  表16-8是mapper的一些输入数据;表16-9是对应的输出结果。
  表16-8. MergeListenersMapper的输入
   10.jpg
  ▲
  表16-9. MergeListenersMapper的输出
   11.jpg
  ▲
  IdentityMapper IdentityMapper被配置用来处理SumJob输出的TrackStats对象,因为不要求对数据进行其他处理,所以它直接输出输入数据(见表16-10)。
  表16-10. IdentityMapper的输入和输出
   12.jpg
  ▲
  SumReducer 前面两个mapper产生同一类型的数据:每个音轨对应一个TrackStats对象,只是数据赋值不同。最后的reduce阶段能够重用前面描述的SumReducer来为每个音轨创建一个新的TrackStats对象,它综合前面两个TrackStats对象的值,然后输出结果(见表16-11)。
  表16-11. SumReducer的最终输出
   13.jpg
  ▲
  最终输出文件被收集后复制到服务器端,在这里一个Web服务程序使Last.fm网站能得到并展示这些数据。如图16-3所示,这个网页展示了一个音频的使用统计信息:接听者总数和播放总次数。
   14.jpg
  ▲图16-3. TrackStats结果

  总结
  Hadoop已经成为Last.fm基础框架的一个重要部件,它用于产生和处理各种各样的数据集,如网页日志信息和用户收听数据。为了让大家能够掌握主要的概念,这里讲述的例子已经被大大地简化;在实际应用中输入数据具有更复杂的结构并且数据处理的代码也更加繁琐。虽然Hadoop本身已经足够成熟可以支持实际应用,但它仍在被大家积极地开发,并且每周Hadoop社区都会为它增加新的特性并提升它的性能。Last.fm很高兴是这个社区的一分子,我们是代码和新想法的贡献者,同时也是对大量开源技术进行利用的终端用户。
  (作者:Adrian Woodhead和Marc de Palol)


Hadoop在Facebook的使用

  发展史
  随着Facebook网站的使用量增加,网站上需要处理和存储的日志和维度数据激增。在这种环境下对任何一种数据处理平台的一个关键性要求是它必须具有快速的支持系统扩展的应变能力。此外,由于工程资源有限,所以系统必须是可信的,并且易于使用和维护。

  最初,Facebook使用的数据仓库都是在Oracle系统上实现的。在我们遇到可扩展性和性能方面的问题之后,开始调查是否有开源技术能够应用到我们的环境中。这次调查工作的一部分内容就是我们部署了一个相对小规模的Hadoop实例对象,并且把一部分核心数据集发布到这个实例对象上。Hadoop对我们来说有相当大的吸引力,一是因为Yahoo!内部就一直使用这一技术来完成后台数据处理需求,二是因为我们熟知Google提出并普及使用的MapReduce模型的简单性和可扩展性。

  我们最初的原型系统开发得非常成功:工程师们都喜欢它能在合理的时间范围内处理大数量级数据的能力,这是我们以前所没有的处理能力。能用自己熟悉的编程语言来进行数据处理工作(使用Hadoop Streaming),他们也感到非常高兴。把我们的核心数据集发布到一个集中式数据仓库也非常方便。几乎同时,我们开始开发Hive工具。这使用户在Hadoop集群上处理数据变得更加容易,因为普通的计算需求都能用大多数程序员和分析师们熟悉的SQL语句来表达。

  因此,集群的规模和使用迅速增长,现在Facebook正在运行世界第二大Hadoop集群系统。在写这篇文章的时候,我们在Hadoop上存放的数据超过了2 PB,每天给它加载的数据超过10 TB。我们的Hadoop系统具有2400个内核,大约9 TB的内存,并且在一天之中的很多时间点,这些硬件设备都是满负荷运行的。根据系统的增长情况,我们能够迅速地进行集群规模扩展,而且我们已经能够利用开放资源的优点,通过修改Hadoop代码让它适应我们的需求。同时我们也对开放资源做出了贡献,比如我们开发的一些Hadoop核心组件,我们提供的Hive的开放资源代码,Hive现在是Hadoop的一个子项目。

  使用情况

  在Facebook,对Hadoop至少有下面四种相互关联但又不同的用法。

  在大规模数据上产生以天和小时为单位的概要信息。这些概要信息在公司内用于各种不同的目的:

  基于这些概要信息产生的报告,可供工程或非工程职能组用来制定产品决策。概要信息包含用户数、网页浏览次数和网站访问时间的增长情况

  提供在Facebook上进行广告营销活动的相关的效果数据

  对网站属性的后台处理,比如计算你喜欢的人和应用程序

  在历史数据上运行即时作业。数据分析结果有助于产品组和执行主管解决问题。

  成为我们日志数据集的实用而长期的存档存储器。

  通过特定的属性进行日志事件查询(用这些属性对日志建立索引),这可以用于维护网站的完整性并且保护用户免受垃圾邮件程序的侵扰。

  数据架构

  图16-4展示了我们数据架构的基本组件以及这些组件间的数据流。
15.jpg
▲图16-4. Facebook的数据仓库架构
  如图16-4所示,数据处理过程中使用了以下组件。
  Scribe(记录器)
  日志数据是由Web服务器以及内部服务如搜索后台(Search backend)产生的。我们使用了Scribe(记录)组件,它是Facebook开发的一个开源日志收集服务,它把几百个日志数据集(每天有几十个TB的数据量)存放在几个NFS(网络文件服务器)上。

  HDFS(Hadoop分布式文件系统)
  大部分的日志数据被复制存入一个中央的HDFS系统。每一天,维度数据也从我们内部的MySQL数据库复制到这个HDFS文件系统。

  Hive/Hadoop(Hive数据仓库)
  我们使用由Facebook开发的Hadoop的一个子项目“Hive”为HDFS收集的所有数据创建一个数据仓库。HDFS中的文件包括来自Scribe的日志数据和来自MySQL的维度数据,它们都作为可以访问的具有逻辑分区的表(table)。Hive提供了一种类SQL的查询语言,它配合MapReduce创建/发布各种概要信息和报表以及对这些表格数据执行历史分析。

  Tools(工具集)
  建立于Hive之上的浏览器界面允许用户通过几次简单的鼠标点击来创建和发出Hive查询(依次启动MapReduce作业)。

  Traditional RDBMS(传统的关系数据库系统)
  我们使用Oracle和MySQL数据库来发布这些概要信息。虽然这些数据库存储的数据量相对较小,但是查询频度很高并且我们需要对查询做出实时响应。

  DataBee
  它是一个内部的ETL(数据提取、转换和加载,即Extraction-Transformation-Loading)工作流软件,它可以为跨所有数据处理作业的可靠批处理提供一个通用的框架。

  Scribe数据存储在网络文件系统层(NFS tier),这些数据被复制作业持续复制到HDFS(集群上)。NFS设备被挂接在Hadoop层,因而复制处理便成为在Hadoop集群上运行的只有map阶段的作业。这使扩展复制处理变得更容易,而且使其具有错误恢复的能力。目前,我们每天用这种方法从Scribe复制6 TB以上的数据到HDFS。我们每天也从MySQL层下载多达4 TB的维度数据到HDFS。这些把数据从MySQL复制出来的map作业,我们很容易在Hadoop集群上进行调度执行。

  Hadoop配置
  Hadoop部署工作的中心思想是一体性(consolidation)。我们使用一个单独的HDFS系统,大量的处理工作在一个单独的MapReduce集群上完成(运行一个单独的jobtracker)。这样做的原因很简单。

  只运行一个集群可以最小化管理成本。

  数据不必复制。对于前面描述的使用情况,我们可以在同一个位置得到所有的数据。

  所有的部门都使用同一个计算机集群可以极大地提升效率。

  我们的用户工作在一个相互协作的环境下,因此对于服务质量的要求还不是很繁重(至少就目前而言)。

  我们也拥有一个单独的共享的Hive元数据存储工具(用MySQL数据库),它管理HDFS上存储的所有Hive表涉及的元数据信息。

  假想的使用情况
  这一节,我们将描述几个典型问题,它们在大型网站上很普遍,由于涉及的开销和规模都太大,所以这些问题很难通过传统的数据仓库管理技术来解决。Hadoop和Hive技术对解决这些问题提供了一种扩展性更好、更有效的方法。

  广告客户的洞察力和广告性能

  Hadoop最普遍的一个用途是为大量数据产生概要信息。通常用于大型广告网络,如Facebook广告网络,Google AdSense,等等,为广告商提供他们所发布的广告的常规汇总统计信息这样一项特有的功能,这样做可以有效地帮助广告商调整他们的广告营销活动。

  在大规模数据集上计算广告效果相关数据是一种数据密集型操作,Hadoop和Hive在可扩展性和计算开销上的优势真的有助于在合理的时间和资金消耗范围内完成这些计算。

  许多广告网络为广告商提供了标准的基于CPC和CPM的广告计费单位。CPC广告计费是根据广告的“点击数计费”(cost-per-click):广告商根据访问这个网站的用户对广告的点击总数付费。另一方面,CPM广告计费是根据在这个网站上看这个广告的人的比例计费。先不管这些标准计费单位,在最近几年,具有更多动态内容的广告支持对个体用户进行不同的内容剪辑(个性化广告定制),这样一种活动也在网络广告业中变得普遍起来。Yahoo!通过SmartAds来实现个性化广告定制,而Facebook给广告商提供了Social Ads。而后者允许广告商把来自用户朋友网络的信息嵌入到广告中;例如,一则Nike广告可能指向某用户的一位朋友,而这个朋友近期刚好也喜欢这个品牌,并且在Facebook上和朋友公开共享这个喜好。另外,Facebook也为广告商提供了Engagement Ad广告形式,通过对广告发表意见/嵌入视频交互,用户可以更有效地和广告交互。总之,在线广告网络为广告商们提供了各种广告发布途径,广告商们感兴趣的是其广告营销活动的相关效果数据,而这种多样性又为计算各种各样的效果数据增添了难度。

  首先,广告商们希望知道总共有多少用户观看或点击了他们的广告以及有多少独立用户。对于动态广告,他们甚至会对这些汇总信息的分类感兴趣,如通过广告单元播放的动态信息分类或通过用户对广告的参与活动分类。例如,一个特定的广告可能向3万个不同用户播放了10万次。类似地,一段嵌入到Engagement Ad的视频可能已经被10万个不同的用户观看。另外,通常我们会针对每则广告、每次广告营销活动和每个帐户汇总报告这些效果相关数据。一个帐号有可能会对应多个广告营销活动,而每个活动可能运行多则网络广告。最后,广告网络通常会根据不同时间间隔报告这些数据。典型的时间段有天、周、月(起始日期相同)和月(固定天数),甚至有时候是整个广告周期。再者,在数据分片和切割的方法中,广告商们也想查看数据的地理分布情况,比如,对于某一则特定广告,亚太地区的浏览者或点击者占多大比率。

  很明显,这里有四种主要的维度层次分类:帐户、广告营销活动和广告维度;时间段维度;交互类型维度;用户维度。最后一个用来指明独立用户的人数,而其他三个是描述广告的相关属性。用户维度也可用来产生浏览和点击用户的地理分布汇总图。总而言之,所有这些信息都有利于广告商们调整广告营销活动,从而提高他们在广告网络上的广告效果。除了这些数据流水线具有多维特性之外,从处理的数据量以及每天数据量的增长速度来看,如果没有Hadoop这样的技术,大型广告网络的规模扩展会非常困难。举个例子,写这篇文章的时候,Facebook为了计算广告的效果数据,每天所处理的日志数据量大约是1 TB数量级(非压缩数据)。2008年1月,每天处理的日志数据量大约是30 GB,那么目前这个数据量已经增长了30倍。随着硬件的增加,Hadoop扩展性增强的特性是使这些数据流能以最小的任务配置修改来适应数据的增长。通常,配置修改涉及增加数据流上进行密集型计算部分Hadoop作业模块的reducer的个数。目前,这些计算模块中最大的部分运行400个reducer(比2008年1月所用的50个reducer增加了8倍)。


  即时分析和产品反馈
  除了产生定期报告之外,数据仓库解决方案的另一种主要应用是能够支持即时分析和产品反馈解决方案。例如,一个典型的网站对其产品做了修改,产品经理或工程师们通常会基于与这个新特性相关的用户交互信息和点击率来推断这个新特性的影响。产品团队甚至希望对这个改变带来的影响做更深入的分析,这个分析有可能针对不同区域和国家进行,例如这个改变是否使美国用户的点击率增加或印度用户的使用减少。使用了Hive和普通的SQL数据库之后,在Hadoop上可以完成很多类似的分析工作。点击率的测定方法可以简单地表达成广告的曝光数和与此新特性相关链接点击次数之间的联系。这种数据能和地理位置信息结合起来用于计算产品的改变对不同区域用户产生的影响。因此,通过对这些数据进行聚集运算,我们可以得到平均点击率在不同地理区域上的分布。Hive系统用几行SQL查询语句就可以简单方便地表达所有这些工作需求(这也将相应地产生多个Hadoop作业)。如果只需要估算,可以使用Hive本身支持的取样函数取一组样本用户数据,然后运行同样的查询语句。其中有些分析工作需要使用自定义map和reduce脚本与Hive SQL联合执行,这种脚本也可以轻松嵌入到Hive查询语句。

  一个更加复杂的分析工作的典型例子是估算在过去一整年里每分钟登录到网站的峰值用户数。这个工作涉及对网页浏览日志文件采样(因为人气网站的网页浏览日志文件总数是很庞大的),根据时间对它们分组,然后运行自定义reduce脚本找出不同时间点的新用户数。这是一个要求同时使用SQL和MapReduce来解决终端用户问题的典型例子,很容易利用Hive来解决这样的问题。


  数据分析
  Hive和Hadoop可以轻松用于为数据分析应用进行训练和打分工作。这些数据分析应用能跨度不同领域,如人气网站、生物信息公司和原油勘探公司。对于在线广告网络产业来说,这种应用的一个典型实例是预测什么样的广告特征能使广告更容易被用户注意。通常,训练阶段涉及确定响应度量标准和预测性的特征。在本例中,评测广告效用的一个良好度量标准可以是点击率。广告的一些有趣的特征可能是广告所属的垂直产业、广告内容、广告在网页中的位置等。Hive可以简便易行地收集训练数据,然后把它们输送到数据分析引擎(通常是R程序或MapReduce应用)。在本例中,不同的广告效果数据和属性特征可以被结构化为Hive的表格。用户可以方便地对数据进行取样(R程序只能处理有限数据集,因此取样是必须的),使用Hive查询语句执行适合的聚集和连接操作然后整合成一个响应表,它包含着决定广告效用最重要的广告特征。然而,取样会有信息损失,有些更加重要的数据分析应用就在MapReduce框架体系之上并行实现流行的数据分析内核程序来减少信息损失。

  一旦模型训练出来,就可以部署,用于根据每天的数据进行打分评估工作。但是大多数数据分析任务并不执行每日评测打分工作。实际上,其中有很多数据分析任务具有即时的性质,要求做到一次性分析,然后结果作为输入进入产品设计过程。


  Hive

  概述

  刚开始使用Hadoop时,我们很快就倾倒于它的可扩展性和有效性。然而,我们担心它是否可以被广泛采用,主要是因为用Java写MapReduce程序的复杂度问题(还有培训用户写这种程序的代价)。我们知道很多公司的工程师和分析师很了解SQL,它是一种查询和分析数据的工具,并且我们也清楚很多人都精通几门脚本语言,如PHP和Python。因此,我们必须开发出一种软件来解决用户精通的脚本语言和Hadoop编程所需语言不同的问题。

  很明显,我们的很多数据集是结构化的,而且能够很容易进行数据分割。这些要求很自然地形成一个结果:我们需要一个系统,它可以把数据模型化成表格和数据块,并且它能够提供类似SQL的查询和分析语言。另外,能把使用用户所选编程语言编写的自定义MapReduce程序嵌入查询这一能力也非常重要。这个系统就是Hive。Hive是一个构建于Hadoop之上的数据仓库架构,在Facebook充当着重要的工具,用于对Hadoop中存储的数据进行查询。在下面几个小节,我们将详细描述这一系统。

  数据的组织

  在所有数据集中,数据的组织形式是一致的,被压缩、分区和排序之后进行存储。

  压缩

  几乎所有数据集都采用gzip codec存储成顺序文件。旧的数据采用bzip重新压缩,这样可以比用gzip编码压缩更多。bzip压缩的速度比gzip慢,但是对旧数据的访问频率要低很多,因此考虑到节省硬盘空间,这种性能损失还是很值得的。

  分区

  大部分数据集是根据日期进行分区的。独立的分区块被加载到Hive系统,PX把每个分区块加载到HDFS的一个单独的目录下。大多数情况下,这种分区只根据相关联的记录日志文件(scribe logfile)的时间戳进行。然而,在某些情况下,我们扫描数据,然后基于日志条目里能找到时间戳进行数据划分。回顾前面的介绍,我们也将根据各种特征进行数据分区(如国家和日期)。

  排序

  在一张表里,每个分区块通常根据某个唯一标识(ID)进行排序(如果ID存在的话)。这样的设计有几个主要的优点:
  在这样的数据集上容易执行取样查询操作;
  我们能基于排序数据建立索引;
  对具有唯一标识的数据进行聚集和连接,运算更有效。
  每日的MapReduce作业会把数据加载成long-term数据格式(和近实时的数据导入处理不同)。

  查询语言
  Hive查询语言和SQL类似。它具有传统的SQL的构造,如 join,group by,where,select,from从句和from从句的子查询。它尽量把SQL命令转换成一系列的MapReduce作业。除普通的SQL从句之外,它还有一些扩展功能,如具有在查询语句中描述自定义mapper和reducer脚本的功能,有对数据进行一次扫描就可以把它们插入多个表、数据块、HDFS和本地文件的功能,有在样本数据而不是全部数据集上执行查询的功能(在测试查询的时候,这个功能非常有用)。Hive metastore存储了表的元数据信息,它提供元数据给Hive编译器,从而进行SQL命令到MapReduce作业的转换。

  通过数据块修剪,map端的聚合和其他一些特色功能,编译器会尝试创建可以优化查询运行时间的方案。

  在数据流水线中使用Hive
  另外,Hive提供了在SQL语句里表达数据流水线的能力,并采用简单方便的方式合并这些数据流,这一功能能够并且已经提供了大量的所需的灵活性。这一功能对于改进中或开发中的系统和产品尤其有用。处理数据流时所需的许多操作都是大家非常了解的SQL操作,如join,group by和distinct aggregation。由于Hive能够把SQL语句转换成一系列Hadoop MapReduce作业,所以创建和维护这些数据流水线就非常容易。在这一节,我们用一个假想的广告网络的例子,通过展示使用Hive来计算广告商所需的某些典型汇总表来说明Hive这些方面的功能。例如,假设某个在线广告网络在Hive系统里把广告信息存储在名为dim_ads的表里,把和某个广告曝光相关的信息存储在名为impression_logs表里,impression_logs表里数据根据日期进行分区,那么在Hive系统中查询2008-12-01这天的广告曝光数(广告网络会把广告营销中的每个和总的广告曝光数定期反馈给广告商)可以表达为如下SQL语句:

  SELECT a.campaign_id, count(1), count(DISTINCT b.user_id)
  FROM dim_ads a JOIN impression_logs b ON(b.ad_id = a.ad_id)
  WHERE b.dateid = '2008-12-01'
  GROUP BY a.campaign_id;
  这也是大家能在其他RDMS(关系数据库系统)如Oracle和DB2等上使用的典型的SQL语句。

  为了从前面同样的连接数据上以广告和帐户为单位计算每天的广告曝光次数,Hive提供了同时做多个group by操作的能力,查询如下所示(类似SQL语句但不是严格意思上的SQL):
  FROM(
  SELECT a.ad_id, a.campaign_id, a.account_id, b.user_id
  FROM dim_ads a JOIN impression_logs b ON (b.ad_id = a.ad_id)
  WHERE b.dateid = '2008-12-01') x
  INSERT OVERWRITE DIRECTORY 'results_gby_adid'
  SELECT x.ad_id, count(1), count(DISTINCT x.user_id) GROUP BY x.ad_id
  INSERT OVERWRITE DIRECTORY 'results_gby_campaignid'
  SELECT x.campaign_id, count(1), count(DISTINCT x.user_id) GROUP BY x.campaign_id
  INSERT OVERWRITE DIRECTORY 'results_gby_accountid'
  SELECT x.account_id, count(1), count(DISTINCT x.user_id) GROUP BY x.account_id;

  在Hive增添的一项优化功能中,其中一项是查询能被转换成一系列适用于“偏斜数据”(skewed data)的Hadoop MapReduce作业。实际上,join操作转换成一个MapReduce作业,三个group by 操作转换成四个MapReduce任务,其中第一个任务通过unique_id产生部分聚集数据。这一功能非常重要,因为impression_logs表的数据在unique_id的分布比在ad_id上的分布更均匀(通常在一个广告网络中,有些广告占主导地位,因为其客户分布更均匀)。因此,通过unique_id计算部分聚集能让数据流水线把工作更均匀地分配到各个reducer。简单改变查询中的日期谓词,同一个相同的查询模板便可以用于计算不同时间段的效果数据。

  但是计算整个广告周期的数据可以采用更好的方法,如果使用前面介绍的计算策略,我们必须扫描impression_logs表中的所有分区。因此,为了计算整个广告周期的数据,一个更可行的方法是在每天的中间表的分区上执行根据ad_id和unique_id的分组操作。这张表上的数据可以和次日的impression_logs合并增量产生整个周期的广告效果数据。例如,要想得到2008-12-01日的广告曝光数据,就需要用到2008-11-30日对应的中间表分区数据块。如下面的Hive查询语句所示:
  INSERT OVERWRITE lifetime_partial_imps PARTITION(dateid='2008-12-01')
  SELECT x.ad_id, x.user_id, sum(x.cnt)
  FROM (
  SELECT a.ad_id, a.user_id, a.cnt
  FROM lifetime_partial_imps a
  WHERE a.dateid = '2008-11-30'
  UNION ALL
  SELECT b.ad_id, b.user_id, 1 as cnt
  FROM impression_log b
  WHERE b.dateid = '2008-12-01'
  ) x
  GROUP BY x.ad_id, x.user_id;

  这个查询为2008-12-01计算局部合计数据,它可以用来计算2008-12-01的数据以及2008-12-02的数据(这里没有展示)。SQL语句转换成一个单独Hadoop MapReduce作业,它实际上是在合并的输入流上做group by计算。在这个SQL语句之后,可以做如下的Hive 查询,它为每个分组计算出实际的数据(与前面对日数据流水线的查询相似)。

  FROM(
  SELECT a.ad_id, a.campaign_id, a.account_id, b.user_id, b.cnt
  FROM dim_ads a JOIN lifetime_partial_imps b ON (b.ad_id = a.ad_id)
  WHERE b.dateid = '2008-12-01') x
  INSERT OVERWRITE DIRECTORY 'results_gby_adid'
  SELECT x.ad_id, sum(x.cnt), count(DISTINCT x.user_id) GROUP BY x.ad_id
  INSERT OVERWRITE DIRECTORY 'results_gby_campaignid'
  SELECT x.campaign_id, sum(x.cnt), count(DISTINCT x.user_id) GROUP BY x.campaign_id
  INSERT OVERWRITE DIRECTORY 'results_gby_accountid'
  SELECT x.account_id, sum(x.cnt), count(DISTINCT x.user_id) GROUP BY x.account_id;

  Hive和Hadoop都是批处理系统,它们计算数据的延迟超出了常用的RDBMS,如Oracle和MySQL。因此,在许多情况下,把Hive和Hadoop系统产生的概要信息加载到传统的RDBMS,让用户通过不同的BI(商业智能)工具或网络门户来使用这些数据仍然很有用。

  存在的问题与未来工作计划

  公平共享
  Hadoop集群通常同时运行多个“日常生产作业”(production daily job)和“即时作业”(ad hoc job),日常生产作业需要在某个时间段内完成计算任务,而即时作业则可能具有不同的优先级以及不同的计算规模。在选择典型安装时,日常生产作业倾向于整夜运行,这时来自用户运行的即时作业的干扰最小。然而,大规模即时作业和生产作业之间的工作时间重合常常是不可避免的,如果没有充分健壮的保障措施,这种作业重合会导致生产作业的延迟。ETL(数据提取、转换和加载)处理也包含几个近实时的作业,它们都必须以小时为间隔地运行(包括从NFS服务器复制Scribe数据以及在某些数据集上以小时为单位的概要数据计算等处理)。它也意味着只要有一个意外作业就会使整个集群当机,使生产处理处于危险境地。

  Facebook开发并且贡献给Hadoop系统的Hadoop公平共享作业调度器(job scheduler)为许多这样的问题提供了解决方案。它为特定作业池中的作业保留保障性计算资源,同时让闲置资源可以被任何作业使用。通过在各个池之间以一种公平手段分配计算资源也可以防止大规模作业霸占集群资源。在集群里,内存是其中一种竞争比较激烈的资源。我们对Hadoop系统做了一些修改,如果发现jobtracker的内存短缺,Hadoop就会减缓或遏制作业提交。这能保证用户进程能够得到合理的进程内存限额,并且为了阻止在同一个节点运行的MapReduce作业影响HDFS后台程序(主要是因为大内存消耗),可以放置一些监控脚本程序。日志目录存储在单独的硬盘分区,并且定期清理,我们认为把MapReduce 中间存储放在单独的硬盘分区上也是有用的。

  空间管理
  硬盘容量管理仍然是一个大挑战——数据的增长带来了硬盘使用的急速增加。数据集日益攀升的许多发展中公司面临着同样的问题。在许多情况下,很多数据实际上是临时数据。这种情况下,我们可以使用Hive的保留期设置,并且可以以bzip格式重新压缩以节省空间。尽管从硬盘存储的观点来看配置可能非常对称,但增加一个高存储密度机器层来管理旧数据可能会有很大好处。这将使Hadoop存储存档数据的消费变得更便宜。然而,对这些数据的访问应该是容易的。目前我们正在为这一个数据存档层的实现而努力工作,统一旧数据处理的方方面面。

  Scribe-HDFS集成
  目前,Scribe编写了几个NFS文件存档器(filer),然后前面所描述的自定义复制作业(copier job)就从这里收集和传送数据给HDFS。我们正致力于让Scribe直接把数据写入其他的FS实例对象。这将简化Scribe的扩展和管理。基于对Scribe的正常运行时间的高要求它的目标HDFS对象可能不同于生产HDFS系统(因此它不会因为用户作业而出现负载/停机的问题)。

  改进Hive
  Hive系统仍然处于活跃的开发阶段。人们关注着几个重要特性的开发,如order by,支持having从句,更多聚集函数,更多内置函数,日期类型,数据类型,等等。同时,我们也在进行大量优化工作,如谓词下推和共同子表达式消除。在集成方面,正在开发JDBC和ODBC驱动程序用于和OLAP及BI工具集成。通过所有这些优化措施,我们希望能够释放MapReduce和Hadoop的潜能,把它更进一步推向非工程化社区以及用于Facebook。该项目相关的更多详细信息,请访问http://hadoop.apache.org/hive/
  (作者:Joydeep Sen Sarma和Ashish Thusoo)


Nutch 搜索引擎

  背景介绍
  Nutch这个框架用于构建立可扩展的网络爬虫(crawler)和搜索引擎。它是Apache软件基金会(Apache Software Foundation)的一个项目,Lucene的一个子项目,遵循Apache许可(2.0)。

  我们并不想多么深入地细究网络爬虫的知识——这个案例研究的目的是展示Hadoop是如何实现搜索引擎各种典型、复杂处理任务的。感兴趣的用户可以在Nutch官方主页(http://lucene.apache.org/nutch)找到项目相关的大量专门信息。可以这样说,为了创建和维护一个搜索引擎,必须要有下面的子系统。

  网页数据库
  这个数据库跟踪网络爬虫要抓取的所有网页和它们的状态,如上一次访问的时间,它的抓取状态信息,刷新间隔,内容校验和,等等。用Nutch的专用名词来说,这个数据库称为CrawlDb。

  爬取网页清单
  网络爬虫定期刷新其Web视图信息,然后下载新的网页(以前没有抓取的)或刷新它们认为已经过期的网页。这些准备爬取的候选网页清单,Nutch称为fetchlist。

  原始网页数据
  网页内容从远程网站下载,以原始的未解释的格式在本地存储成字节数组。Nutch称这种数据为page content。

  解析的网页数据
  网页内容用适合的解析器进行解析——Nutch为各种流行格式的文档提供了解析器,如HTML,PDF,Open Office和Microsoft Office,RSS等。

  链接图数据库
  对于计算基于链接(link)的网页排序(page rank)值来说,如PageRank,这个数据库是必须的。对于Nutch记录的每一个URL,它会包含一串指向它的其他的URL值以及这些URL关联的锚文本(在HTML文件的锚文本元素中得到)。这个数据库称为LinkDb。

  全文检索索引
  这是一个传统的倒排索引,基于搜集到的所有网页元数据与抽取到的纯文本内容而建立。它是使用卓越的Lucene库(http://lucene.apache.org/java)来实现的。

  前面我们简略地提到Hadoop作为一个组件在Nutch系统上得到实现,试图用它提高Nutch系统的可扩展性以及解决那些由集中式数据处理模型引起的一系列瓶颈问题。Nutch也是第一个移植到Hadoop架构之上的公开的概念证明应用,后来它成为Hadoop的一部分,并且事实证明,把Nutch算法和数据结构移植到Hadoop架构所需的工作量惊人地少。这一特点有可能激励大家把Hadoop的开发作为一个子项目,为除Nutch之外的其他应用提供可重用的架构。
  目前,几乎所有的Nutch工具都通过运行一个或多个MapReduce作业来处理数据。

  数据结构
  在Nutch系统中维护着几种主要的数据结构,它们都利用Hadoop I/O类和数据格式来构造。根据数据使用目的和数据创建之后的访问方式,这些数据可以使用Hadoop的映射(map)文件或顺序(sequence)文件进行保存。

  因为数据是MapReduce的作业产生和处理的,而这一过程反过来又会执行几个map和reduce任务,所以它的硬盘存储格式符合常用的Hadoop输出格式,即MapFileOutputFormat和SequenceFileOutputFormat两种格式。精确地说,数据被保存成几个map文件或顺序文件,而文件数和创建数据作业中的reduce任务数相等。为了简单,在下面几节的介绍中,我们忽略格式差异。

  CrawlDb
  CrawlDb存储每个URL的当前状态信息,存储文件是map文件,形式是,这里键使用文本格式,值使用Nutch特定的CrawlDatum类型(它实现Writable接口)。

  为了对这些记录提供快速的随机访问能力(用户想在CrawlDb里面检查个人记录信息的时候),这些数据被存储成map文件而不是顺序文件。

  CrawlDb最初是通过Injector工具创建的,它只是简单地把初始URL列表(种子列表)的纯文本文件转换成一个map文件,格式如前所述。接着,用爬取和解析的网页信息来对它做更新。稍后将对此进行详细介绍。
  LinkDb
  这个数据库为Nutch记载的每个URL存储“入链接”(incoming link)信息。它采用map文件格式进行存储,其中Inlinks是URL列表和锚文本数据。注意,这些信息在网页数据收集阶段并不是立刻可以得到的,但是可以获取反向信息,就是这个页面的“出链接”(outlink)信息。反向链接的信息获取是通过一个MapReduce作业完成的,相关详情可参见后文。

  分段
  在Nutch定义中,“分段”(segment)指的是爬取和解析URL组。图16-5展示了分段的创建和处理过程。
  一个分段(文件系统里的一个目录)包含以下几个部分(它们只不过是一些包含MapFileOutputFormat或SequenceFileOutputFormat格式数据的子目录)。

  content
  content包含下载页面的原始数据,存储为map文件,格式是。为了展示缓存页的视图,这里使用map文件存储数据,因为Nutch需要对文件做快速随机的访问。

  crawl_generate
  它包含将要爬取的URL列表以及从CrawlDb取到的与这些URL页相关的当前状态信息,对应的顺序文件的格式。这个数据采用顺序文件存储,原因有二:第一,这些数据是按顺序逐个处理的;第二,map文件排序键值的不变性不能满足我们的要求。我们需要尽量分散属于同一台主机的URL,以此减少每个目标主机的负载,这就意味着记录信息基本上是随机排列的。

  图16-5. 分割

  crawl_fetch
  它包含数据爬取的状态信息,即爬取是否成功,响应码是什么,等等。这个数据存储在map文件里,格式是。

  crawl_parse
  每个成功爬取并解析的页面的出链接列表都保存在这里,因此Nutch通过学习新的URL可以扩展它的爬取前端页。

  parse_data
  解析过程中收集的元数据;其中还有页面的出链接(frontier)列表。这些信息对于后面介绍的建立反向图(入链接—ink)是相当关键的。

  parse_text
  页面的纯文本内容适合用Lucene进行索引。这些纯文本存储成map文件,格式是,因此要展示搜索结果列表的概要信息(摘要)的时候,Nutch可以快速地访问这些文件。

  Generator工具(图16-5中编号1)运行的时候,CrawlDb里面的数据就会产生一些新的分段,并且开始只包括要爬取的URL列表(是crawl_generat下的子目录)。当这个列表经过几个步骤的处理之后,该分段就从处理工具那里收集输出数据并存放在一系列的子目录里面。

  例如,content从Fetecher工具(2)接收数据,这个工具根据fetchlist的URL列表下载网页原始数据。这个工具也把URL的状态信息存储在crawl_fetch里面,因此这些数据后来可以用于更新CrawlDb的页面状态信息。

  在分段工具中的其他小模块接收来自Parse分段工具(3)的数据,这个工具读入网页内容,然后基于声明的(或检测到的)MIME类型,选择合适的内容解析器,最后把解析结果存为三部分:

  crawl_parse,parse_data和parse_text。然后这些数据被用于更新CrawlDb(4)和创建LinkDb(5)。
  这些分段数据一直保留到它们包含的所有数据都过期为止。Nutch采用的是可配置的最大时间限制的方法,当页面保存的时间段超过这个时间限制后,这个页面会被强制进行重新获取;这将有助于操作员淘汰所有过期的分段数据(因为他能肯定超过这个时间限制之后,这个分段里面的所有页面都已经被重新爬取)。

  分段数据用来创建Lucene索引(【6】——主要是parse_text和parse_data部分的数据),但是它也提供一种数据存储机制来支持对纯文本数据和原始内容数据的快速检索。当Nutch产生摘要信息的时候(和查询最匹配的文档文本片段),需要第一种纯文本数据;第二种原始数据提供了展现页面的缓存视图的能力。这两种用例下,或是要求产生摘要信息或是要求展现缓存页面,都是直接从map文件获取数据。实际上,即使是针对大规模数据,直接从map文件访问数据的效率都很高。

  Nutch系统利用Hadoop进行数据处理的精选实例
  下面几节描述了几种Nutch工具的相关详细信息,主要用于说明Nutch系统如何利用MapReduce模型来完成具体的数据处理任务。

  链接逆转
  爬取到的HTML页面包含HTML链接,这些链接可能指向它本身(内部链接)或指向其他网页。HTML链接从源网页指向目标网页,参见图16-6。
  图16-6. 链接逆转
  然而,许多计算网页重要性(或质量)的算法需要反向链接的信息,也就是那些具有链接指向当前页面的网页。进行网页爬取的时候,我们并不能得到这些信息。另外,如果能把入链接的锚文本也用于索引,索引也会受益,因为这些锚文本可以从语义上丰富当前页面的内容。

  如前所述,Nutch收集出链接信息,然后用这些数据构造一个LinkDb,它包含这种反向链接数据,以入链接和锚文本的形式存放。

  本小节概述一下LinkDb工具的实现过程——为了展现处理过程的清晰画面,忽略了很多细节描述(如URL规范化处理和过滤)。我们主要描述一个典型的实例来解释为什么MapReduce模型能够如此合适地地应用到一个搜索引擎所要求的这种关键数据转换处理任务。大规模搜索引擎需要处理大量的网络图数据(许多页面都有很多出/入链接),Hadoop提供的并行处理和容错机制使得这样的处理成为可能。另外,使用map-sort-reduce基本操作可以很容易地表达链接逆转这一过程,我们将在下面进行介绍。

  下面的代码片段展示了LinkDb工具的作业初始化过程:
  JobConf job = new JobConf(configuration);
  FileInputFormat.addInputPath(job, new Path(segmentPath, "parse_data"));
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(LinkDb.class);
  job.setReducerClass(LinkDb.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Inlinks.class);
  job.setOutputFormat(MapFileOutputFormat.class);
  FileOutputFormat.setOutputPath(job, newLinkDbPath);

  可以看出,这个作业的输入源数据是爬取的URL列表(键)以及相应的ParseData记录,ParseData包含的其中一项数据是每个页面的出链接信息,它是一个数组。一个出链接记录包含目标URL以及相应的锚文本。

  这个作业的输出也是一个URL列表(键),但是值是入链接,它其实只是一个包含目标URL和锚文本的特殊的入链接集合。

  也许出乎我们意料,这些URL一般以纯文本形式存储和处理,而非以java.net.URL或java.net.URI实例的形式。这么做有几个原因:从下载内容里提取的URL通常需要做规范化处理(如把主机名变成小写形式,解析相对路径),或它们是已经坏掉或无效的URL,或它们引用的是不支持的协议。许多规范化和过滤操作能更好地表达成文本模式,它可以跨一个URL的多个组成部分。此外,考虑到链接分析,我们也许仍然想处理和计算这些无效的URL。
  让我们进一步查看map()和reduce()的实现——在这个例子中,它们非常简单以至于这两个函数可以在同一个类里实现:

  public void map(Text fromUrl, ParseData parseData,
  OutputCollector output, Reporter reporter) {
  ...
  Outlink[] outlinks = parseData.getOutlinks();
  Inlinks inlinks = new Inlinks();
  for (Outlink out : outlinks) {
  inlinks.clear(); // instance reuse to avoid excessive GC
  String toUrl = out.getToUrl();
  String anchor = out.getAnchor();
  inlinks.add(new Inlink(fromUrl, anchor));
  output.collect(new Text(toUrl), inlinks);
  }
  }
  从这个代码段可以看到,对每个出链接,我们的map()函数产生一对,其中Inlinks只包含一个Inlink,该Inlink是由fromUrl和它的锚文本组成。链接的指向实现了反转。

  接着,这些只有一个元素的Inlinks用reduce()方法实现聚集处理:
  public void reduce(Text toUrl, Iterator values,
  OutputCollector output, Reporter reporter) {
  Inlinks result = new Inlinks();
  while (values.hasNext()) {
  result.add(values.next());
  }
  output.collect(toUrl, result);
  }
  从这段代码来看,很明显我们已经得到了我们想要的数据——即指向toUrl变量的所有fromUrl列表以及相应的锚文本信息。逆转过程完成。

  然后这些数据以MapFileOutputFormat格式保存,形成新的LinkDb数据库。

  产生fetchlist
  现在来看一个更加复杂的用例。fetchlist产生于CrawlDb的数据(map文件的格式是,其中crawlDatum包含URL的状态信息),它存放准备爬取的URL列表,然后Nutch Fetcher工具处理这个列表。Fetcher工具本身是一个MapReduce应用程序(后面会介绍)。也就是说输入数据(被分成N份)将由N个map任务处理——Fetcher工具强制执行这样的规则,SequenceFileInputFormat格式的数据不能继续切分。前面我们简单提过,fetchlist是通过一个特殊的方法产生的,因此fetchlist的每部分数据(随后由每个map任务处理)必须满足特定的要求。

  1. 来自同一台主机的所有URL最后要放入同一个分区。这是必须的,以便Nutch可以轻松实现in-JVM(java虚拟机里)宿主级封锁来避免目标主机超载。
  2. 为了减少发生宿主级的封锁,来自同一台主机的URL应该尽量分开存放(比如和其他主机的URL充分混合)。
  3. 任何一个单独主机的URL链接数不能多于x个,从而使得具有很多URL的大网站相对于小网站来说,就不会占主导地位(来自小网站的URL仍然有机会被爬取)。
  4. 具有高网页排序值的URL应该优先于低的那些URL。
  5. 在fetchlist中,URL总数不能超过y。
  6. 输出数据分区数应该和最优的爬取map任务数目一致。
  本例中,需要实现两个MapReduce作业来满足所有这些要求,如图16-7所示。同样地,为了简洁,对下面的列表内容,我们将跳过对这些步骤的某些细节描述。

  图16-7. 产生fetchlist

  步骤1:选择,基于网页排序值排序,受限于每台主机的URL数 这一步骤,Nutch运行一个MapReduce作业来选择一些被认为有资格爬取的URL列表,并根据它们的网页排序值(赋给每个页面的浮点数,如PageRank值)对它们进行排序。输入数据来自CrawlDb,后者是一个格式的map文件。这一作业的输出是>格式的sequence文件,根据排序值降序排列。
  首先,我们来看一下作业的设置:
  FileInputFormat.addInputPath(job, crawlDbPath);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(Selector.class);
  job.setPartitionerClass(Selector.class);
  job.setReducerClass(Selector.class);
  FileOutputFormat.setOutputPath(job, tempDir);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(FloatWritable.class);
  job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
  job.setOutputValueClass(SelectorEntry.class);
  Selector类实现了3个函数:mapper,reducer和partitioner。最后一个函数非常有趣:Selector用了一个自定义的Partitioner把来自同一主机的URL分配给同一个reduce任务,这样我们就能满足前面列表的要求3-5。

  如果我们不重写默认的partitioner,来自同一主机的URL最终会输出到不同的分区里面,这样我们就不能跟踪和限制URL总数,因为MapReduce任务彼此之间不做任何交流。那么现在的情况是,属于同一台主机的所有URL都会由同一个reduce任务处理,这意味着我们能控制每台主机可以选择多少个URL。

  实现一个自定义的partitioner,从而把需要在同一个任务中处理的数据最终放入同一个分区,是很简单的。同一个任务处理的数据就会被放在同一个分区里。我们首先来看一下Selector类如何实现Partitioner接口(它只包含一个方法):
  /** Partition by host. */
  public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
  return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
  numReduceTasks);
  }
  这个方法返回在0到numReduceTasks – 1之间的一个整数,numReduceTasks是化简任务数。它简单地用原始的URL替换了键,URL数据从SelectorEntry获取,这样做就可以把URL(不是页面排序值)传递给PartitionUrlByHost类实例对象,并且在这里计算出URL属于的切分号:
  /** Hash by hostname. */
  public int getPartition(Text key, Writable value, int numReduceTasks) {
  String urlString = key.toString();
  URL url = null;
  try {
  url = new URL(urlString);
  } catch (MalformedURLException e) {
  LOG.warn("Malformed URL: '" + urlString + "'");
  }
  int hashCode = (url == null ? urlString : url.getHost()).hashCode();
  // make hosts wind up in different partitions on different runs
  hashCode ^= seed;
  return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
  }
  从这个代码片断能看到,分区号的计算只针对URL的主机部分的地址,这意味着属于同一个主机的所有URL最终会被放入同一个分区。

  这个作业的输出数据根据网页排序值降序排列。因为CrawlDB中有很多记录有同样的排序值,所以我们不能用MapFileOutputFormat来存储输出文件,否则会违反map文件严格基于主键排序的固定规则。

  细心的读者会注意到一点,因为我们不直接使用初始键值,但是我们又想保留这种初始的键值对。这里使用一个SelectorEntry类把初始的键值对传递给下一步骤处理过程。
  Selector.reduce()函数跟踪计算URL的总数和每个主机对应的最大URL数,然后简单地摒弃多余的记录。注意,必须对URL总个数的限制进行近似化处理。

  我们用总的限制数除以ruduce任务的个数得到当前任务允许拥有的URL的个数的限制范围。但是我们并不能肯定每个任务都能够得到平均的分配数;实际上在大多数情况下很难实现,因为在各个主机中分布的URL数目是不均匀的。不管怎么样,对于Nutch来说,这种近似的控制已经足够了。

  步骤2:逆转,基于主机分区,随机排序 在前面,我们用格式存储了一个顺序文件。现在我们必须产生格式的顺序文件来满足前面描述的要求1,2和6。这个处理步骤的输入数据是步骤1的输出数据。

  下面的代码片断展示了这个作业过程的初始设置:
  FileInputFormat.addInputPath(job, tempDir);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(SelectorInverseMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(SelectorEntry.class);
  job.setPartitionerClass(PartitionUrlByHost.class);
  job.setReducerClass(PartitionReducer.class);
  job.setNumReduceTasks(numParts);
  FileOutputFormat.setOutputPath(job, output);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(CrawlDatum.class);
  job.setOutputKeyComparatorClass(HashComparator.class);

  SelectorInverseMapper类简单地删除了当前键(排序值),抽取原始的URL并且把它设置为键,使用SelectorEntry作为值。细心的读者可能质疑:“为什么我们不再进一步,同时再抽取原始的CrawlDatum,把它作为值?”详情参见后文。

  这个作业的最终输出是顺序文件,格式是,但是map阶段我们得到的输出是格式。我们必须指出为map输出采用不同的键/值类对象,必须用setMapOutputKeyClass()和setMapOutputValueClass()这两个类设置函数——否则,Hadoop会假定我们用的类与为reduce和reduce输出声明的类一样(这种矛盾通常会导致作业失败)。

  map阶段的输出使用PartitionUrlByHost类对象进行切分,因此它又把来自同一主机的URL分配到同一个分区。这就满足要求1。

  一旦数据从map任务移到reduce任务,Hadoop就会根据输出数据键comparator的结果对数据排序,这里的comparator是HashComparator类对象。这个类采用简单的哈希机制来混合URL,这个机制可保证来自同一主机的URL会被尽量放在一起。

  为了满足要求6,我们把reduce任务的数量设置成希望的Fetcher map任务的数量(前面提到的numParts),记住,每个reduce分区稍后将用于创建一个单独的Fetcher map任务。

  PartitionReducer类负责完成最后一步,即把数据转换成数据。使用HashComparator的一个令人惊讶的副作用是几个URL可能具有同样的哈希值,并且Hadoop调用reduce()函数时只传送遇到的第一个键对应的值,具有相等键值的记录被认为是一样的而被删除。现在能明白当初为什么我们必须在SelectorEntry类的记录中保留所有的URL值,因为我们可以从遍历的值中抽取URL。下面是这个方法的实现:
  public void reduce(Text key, Iterator values,
  OutputCollector output, Reporter reporter) throws IOException {
  // when using HashComparator, we get only one input key in case of hash collisions
  // so use only URLs extracted from values
  while (values.hasNext()) {
  SelectorEntry entry = values.next();
  output.collect(entry.url, entry.datum);
  }
  }
        最终,reduce任务的输出在Nutch分段目录中crawl_generate子目录下以SequenceFileOutputFormat格式保存。输出文件满足前面的1-6项全部要求。

Fetcher:正在运行的多线程类MapRunner

  Nutch的Fetcher应用程序负责从远程站点下载网页内容。因此,为了尽量减少爬取fetchlist的时间,对于这个处理过程来说使用每个机会来做并行处理相当重要。

  在Fetcher应用中,已经有一级并行机制——输入fetchlist的若干个分区被分配给多个map任务。然而,这么做实际上远远不够:顺序下载来自不同主机(见前一节对HashComparator的介绍)的URL相当浪费时间。因为,Fetcher的map任务使用多个工作线程同时处理这种数据。

  Hadoop 使用MapRunner类来实现对输入数据记录的顺序处理。Fetcher类实现自己的MapRunner类,它使用若干个线程并行处理输入记录。
  先从这个作业的设置开始:
  job.setSpeculativeExecution(false);
  FileInputFormat.addInputPath(job, "segment/crawl_generate");
  job.setInputFormat(InputFormat.class);
  job.setMapRunnerClass(Fetcher.class);
  FileOutputFormat.setOutputPath(job, segment);
  job.setOutputFormat(FetcherOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NutchWritable.class);
  首先,我们关闭推测执行(speculative execution)。我们不能同时让几个map任务从同一个主机下载内容,因为这可能会打破宿主级的负载限制(如并发请求数和每秒请求数)。

  其次,我们使用自定义的InputFormat对象来防止Hadoop将输入数据分区进一步切分为更小的块(分片)导致map任务的数量超过输入分区的数量。这么做又一次保证我们可以控制宿主级的访问限制。

  输出数据用自定义的OutputFormat对象来存储,通过使用NutchWirtable类的数据值,它新建了几个输出map文件和顺序文件。NutchWritable类是GenericWritable的子类,它能传递几种不同Writable类的实例对象,但必须事先声明。

  Fetcher类实现MapRunner接口,我们把这个类设置为作业的MapRunner实现。相关代码如下:
  public void run(RecordReader input,
  OutputCollector output,
  Reporter reporter) throws IOException {
  int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
  feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
  feeder.start();
  for (int i = 0; i < threadCount; i++) { // spawn threads
  new FetcherThread(getConf()).start();
  }
  do { // wait for threads to exit
  try {
  Thread.sleep(1000);
  } catch (InterruptedException e) {}
  reportStatus(reporter);
  } while (activeThreads.get() > 0);
  }
  Fetcher类提前读取许多输入记录数据,使用QueueFeeder线程把输入记录放入为每个主机建立的队列中。然后启动几个FetcherThread实例对象,它们将读取每个主机对应的队列数据,这时QueueFeeder继续读取输入数据来填充这些队列。每个FetcherThread读取全部非空队列中的数据项。

  与此同时,map任务的主线程也在不停运转等待所有的线程完成它们的作业。它定期向系统报告状态以保证Hadoop不会认为这个任务已经死掉并把它杀掉。一旦所有项目处理完,循环过程就结束,控制权返同Hadoop,然后Hadoop认为这个map任务即将完成。

  索引器:使用自定义的OutputFormat类

  这是一个MapReduce应用程序示例,它不会产生顺序文件或map文件,相反它的输出是一个Lucene索引。
  再提一下,因为MapReduce应用可能由几个reduce任务组成,所以这个应用的输出可能包含几个不完整的Lucene索引。
  Nutch Indxer工具使用CrawlDb,LinkDb和Nutch分段爬取状态信息,解析状态,页面元数据和纯文本数据)的信息,因此这个作业的设置部分将包括添加几个输入路径:
  FileInputFormat.addInputPath(job, crawlDbPath);
  FileInputFormat.addInputPath(job, linkDbPath);
  // add segment data
  FileInputFormat.addInputPath(job, "segment/crawl_fetch");
  FileInputFormat.addInputPath(job, "segment/crawl_parse");
  FileInputFormat.addInputPath(job, "segment/parse_data");
  FileInputFormat.addInputPath(job, "segment/parse_text");
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(Indexer.class);
  job.setReducerClass(Indexer.class);
  FileOutputFormat.setOutputPath(job, indexDir);
  job.setOutputFormat(OutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LuceneDocumentWrapper.class);
  分散存储在这些输入位置的一个URL的所有相应记录需要合并起来新建Lucene文档(将被加入索引)。

  Indexer的Mapper类把输入数据(无论是源数据或是实现类)简单地封装到NutchWritable中,这样reduce阶段可能要使用不同的类来接收不同的源数据,而且它仍然能够一致地为map和reduce步骤的输出值声明一个单独的输出值类(类似于NutchWritable类)。

  Reducer方法遍历同一个键(URL)对应的所有值,解封数据(fetch CrawlDatum,CrawlDb CrawlDatum,LinkDb Inlinks,ParseData和ParseText),并用这些信息构建一个Lucene文档,后者被WritableLuceneDocumentWrapper对象封装并被收集。除了所有文本内容外(纯文本数据或是元数据),这个文档也包含类似PageRank值的信息(取自CrawlDb)。Nutch使用这种数值(score)来设置Lucene文档的权重值。
  OutputFormat方法是这个工具最有意思的部分:
  public static class OutputFormat extends
  FileOutputFormat {
  public RecordWriter
  getRecordWriter(final FileSystem fs, JobConf job,
  String name, final Progressable progress) throws IOException {
  final Path out = new Path(FileOutputFormat.getOutputPath(job), name);
  final IndexWriter writer = new IndexWriter(out.toString(),
  new NutchDocumentAnalyzer(job), true);
  return new RecordWriter() {
  boolean closed;
  public void write(WritableComparable key, LuceneDocumentWrapper value)
  throws IOException { // unwrap & index doc
  Document doc = value.get();
  writer.addDocument(doc);
  progress.progress();
  }
  public void close(final Reporter reporter) throws IOException {
  // spawn a thread to give progress heartbeats
  Thread prog = new Thread() {
  public void run() {
  while (!closed) {
  try {
  reporter.setStatus("closing");
  Thread.sleep(1000);
  } catch (InterruptedException e) { continue; }
  catch (Throwable e) { return; }
  }
  }
  };
  try {
  prog.start();
  // optimize & close index
  writer.optimize();
  writer.close();
  } finally {
  closed = true;
  }
  }
  };
  }
  当请求生成一个RecordWriter类的实例对象时,OutputFormat类通过打开一个IndexWriter对象新建一个Lucene索引。然后,针对reduce方法中收集的每个新的输出记录,它解封LuceneDocumentWrapper对象中的Lucene文档,并把它添加到索引。

  reduce任务结束的时候,Hadoop会设法关闭RecordWriter对象。本例中,关闭的过程可能持续较长时间,因为我们想在关闭它之前进行索引优化工作。在这段时间中,因为已经没有任何进度更新,所以Hadoop可能会推断该任务已经被挂起,然后它可能会尝试杀死这个任务。因此,我们首先启动一个后台线程来传送让人安心的进度更新消息,然后才开始索引优化工作。一旦优化完成,我们便停止进度更新线程。现在输出索引得以创建、优化和停止更新,它已经准备好应用于任何搜索应用程序中。

  总结
  这里对Nutch系统的简短综述其实忽略了很多细节,比如错误处理、日志记录、URL过滤和规范化,处理重定向或其他形式的网页“别名”(如镜像),剔除重复

  内容,计算PageRank值等。在这个项目的官方主页和wiki页面(http://wiki.apache.org/nutch),可以找到这些方面的介绍及其他更多信息。

  当前,Nutch正在被很多组织或个人用户使用。然而,运作一个搜索引擎要求有大量的投资来支持硬件配备,系统集成,自定义开发和索引维护;因此,在大多数情况下,Nutch用于构建商业的垂直或针对领域的搜索引擎。

  Nutch正处于积极的开发中,并且该项目紧跟Hadoop的最新版本。因此,它将继续成为使用Hadoop作为核心部件,并且具有良好产出的应用实例。
  (作者:Andrzej Bia?ecki)

Rackspace的日志处理
  Rackspace Hosting一直为企业提供管理系统,以同样的方式,Mailtrust在2007秋变成Rackspace的邮件分部。Rackspace目前在几百台服务器上为100多万用户和几千家公司提供邮件服务。

  要求/问题
  通过系统传输Rackspace用户的邮件产生了相当大的“文件”路径信息,它们以各种格式的日志文件的形式存放,每天大约有150 GB。聚集这些数据对系统发展规划以及了解用户如何使用我们的系统是非常有帮助的,并且,这些记录对系统故障排查也有好处。

  假如一封邮件发送失败或用户无法登陆系统,这时非常重要的事是让我们的客服能找到足够的问题相关信息开始调试。为了能够快速发现这些信息,我们不能把日志文件放在产生它们的机器上或以其原始格式存放。相反,我们使用Hadoop来做大量的日志处理工作,而其结果被Lucene索引之后用来支持客服的查询需求。

  日志
  数量级最大的两种日志格式是由Postfix邮件发送代理和Microsoft Exchange Server产生的。所有通过我们系统的邮件都要在某个地方使用Postfix邮件代理服务器,并且大部分消息都要穿越多个Postfix服务器。Exchange是必须独立的系统,但是其中有一类profix服务器充当一个附加保护层,它们使用SMTP协议在各个环境下的托管邮箱之间传递消息。

  消息要穿越很多机器,但是每个服务器只知道邮件的目的地,然后发送邮件到下一个负责的服务器。因此,为了给消息建立完整的历史信息,我们的日志处理系统需要拥有系统的全局视图。Hadoop给予我们的最大帮助是:随着我们的系统发展壮大,系统日志量也随之增长。为了使我们的日志处理逻辑仍然可行,我们必须确保它能扩展。MapReduce就是一个可以处理这种数据增长的完美系统架构。

  简史
  我们日志处理系统的前几个版本都基于MySQL的,但随着我们拥有越来越多的日志机器,我们达到了一个MySQL服务器能够处理的极限。虽然该数据库模式已经进行了适度的非规范化处理,使其能够较轻松地进行数据切片,但目前MySQL对数据分区的支持仍然很脆弱。我们没有在MySQL上去实现自己的切片和处理方案,而是选择使用Hadoop。

  选择Hadoop
  一旦选择在RDBMS(关系型数据库管理系统)上对数据进行分片存储,你就丧失了SQL在数据集分析处理方面的很多优势。Hadoop使我们能够使用针对小型数据集使用的同样的算法来轻松地并行处理所有数据。

  收集和存储
  日志收集
  产生日志的服务器分布在多个数据中心,但目前我们只有一个单独的Hadoop集群,位于其中一个数据中心(见图16-8)。为了汇总日志数据并把它们放入集群,我们使用syslog-ng(Unix syslog机制的替代机制)和一些简单的脚本来控制在如何Hadoop上新建文件。

  图16-8. Rackspace的Hadoop数据流

  在一个数据中心里,syslog-ng用于从source(源)机器传送日志数据到一组负载均衡的collector(收集器)机器。在这些收集器上,每种类型的日志数据被汇成一个单独的数据流,并且用gzip格式进行轻量级的压缩(图16-8步骤A)。远程收集器的数据通过SSH通道跨数据中心传送到Hadoop集群所在的“本地收集器”(local collector)上(步骤B)。

  一旦压缩的日志流到达本地收集器,数据就会被写入Hadoop(步骤C)。目前我们使用简单的Python脚本把输入数据缓存到本地硬盘,并且定期使用Hadoop命令行界面把数据放入Hadoop集群。当缓存日志数据量达到Hadoop数据块大小的倍数或是缓存已经经过了足够长的时间时,脚本程序开始复制日志缓存数据到Hadoop的各个输入文件夹。

  这种从不同数据中心安全地汇总日志数据的方法在Hadoop支持SOCKS之前就已经有人开发使用了,SOCKS是通过hadoop.rpc.socket.factory.class.default参数和SocksSocketFactory类实现的。通过直接使用远程收集器对SOCKS的支持和HDFS(分布式Hadoop文件系统)的API(应用程序编程接口),我们能够从系统中消除一个磁盘的写入操作和降低系统的复杂性。我们计划在将来的开发中实现一个使用这些特性的替代品。

  一旦原始日志被存放到Hadoop上,这些日志就已经准备好交给我们的MapReduce作业处理了。

  日志存储
  我们的Hadoop集群目前包含15个datanode(数据节点),每个节点都使用普通商用CPU和3个500 GB的硬盘。我们对文件使用默认的复本因子3,这些文件有6个月的存档期限,其中两个复本用于其他用途。

  Hadoop的namenode(域名节点)使用的硬件和datanode相同。为了提供比较高的可用性,我们使用两个辅助namenode和一个虚拟IP,该IP可以很容易地指向3台机器中具有HDFS快照的硬盘。这表明在故障转移情形下,根据辅助namenode的快照时间,我们可能会丢失最多30分钟的数据。虽然这对于我们的日志处理应用来说是可接受的,但是其他Hadoop应用可能要求通过为namenode镜像提供共享存储的能力来实现无损的故障转移。

  日志的MapReduce模型
  处理
  在分布式系统中,唯一标识符令人失望的是它们极少是真正唯一的。所有的电子邮件消息都拥有一个(所谓的)唯一标识符,叫message-id,它由消息发起的主机产生,但是一个不良客户端能够轻松发送重复消息副本。另外,因为Postfix设计者并不相信message-id可以唯一地标识消息,所以他们不得不提出设计一个独立的ID(标识)叫queue-id,在本地机器的生命周期内唯一。

  尽管message-id趋向于成为消息的权威标识,但在Postfix日志中,需要使用queue-id来查找message-id。看例16-1第二行(为了适合页面大小,日志行的格式做了调整),你将发现十六进制字符串1DBD21B48AE,它就是该行消息的queue-id。因为日志收集的时候(可能每隔几小时进行一次),每个消息(包括它的message-id)的信息都输出到单独的行,所以让我们的解析代码保留消息状态是必要的。

  例16-1. Postfix 日志行
  Nov 12 17:36:54 gate8.gate.sat.mlsrvr.com postfix/smtpd[2552]: connect from hostname
  Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/qmgr[9489]: 1DBD21B48AE:
  from=, size=5950, nrcpt=1 (queue active)
  Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtpd[28085]: disconnect from
  hostname
  Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: too many errors
  after DATA from hostname
  Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: disconnect from
  hostname
  Nov 12 17:36:54 gate10.gate.sat.mlsrvr.com postfix/smtpd[10311]: connect from
  hostname
  Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtp[28107]: D42001B48B5:
  to=, relay=hostname[ip], delay=0.32, delays=0.28/0/0/0.04,
  dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 1DBD21B48AE)
  Nov 12 17:36:54 gate20.gate.sat.mlsrvr.com postfix/smtpd[27168]: disconnect from
  hostname
  Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/qmgr[1209]: 645965A0224: removed
  Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/smtp[15928]: 732196384ED: to=
  apreduce@rackspace.com>, relay=hostname[ip], conn_use=2, delay=0.69, delays=0.04/
  0.44/0.04/0.17, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 02E1544C005)
  Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/qmgr[13764]: 732196384ED: removed
  Nov 12 17:36:54 gate1.gate.sat.mlsrvr.com postfix/smtpd[26394]: NOQUEUE: reject: RCP
  T from hostname 554 5.7.1 : Client host rejected: The
  sender's mail server is blocked; from= to=
  uce@rackspace.com> proto=ESMTP helo=
  从MapReduce的角度看,日志的每一行是一个单独的键/值。第一步,我们需要把所有的行和一个单独的queue-id键联系起来,然后执行reduce过程判断日志消息值数据是否能表明这个queue-id对应的数据是完整的。

  类似地,一旦我们拥有一个消息完整的queue-id,在第二步,我们需要根据message-id对消息进行分组。我们把每个完整的queue-id和message-id对应(Map)起来,让它们作为键(key),而它对应的日志行作为值(value)。在Reduce阶段,我们判断针对某个message-id的所有的queue-id是否都表明消息已经离开我们的系统。

  邮件日志的MapReduce作业的两阶段处理和它们的InputFormat与OutputFormat形成了一种“分阶段事件驱动架构”(staged event-driven archilecture,SEDA)应用类型。在SEDA里,一个应用被分解为若干个“阶段”,“阶段”通过数据队列区分。在Hadoop环境下,队列可能是MapReduce作业使用的HDFS中的一个输入文件夹或MapReduce作业在Map和Reduce处理步骤之间形成的隐性的数据队列。

  在图16-9中,各个阶段之间的箭头代表数据队列,虚线箭头表示隐性的MapReduce数据队列。每个阶段都能通过这些队列发送键值对(SEDA称之为事件或消息)给其他处理阶段。

  图16-9. MapReduce链

  阶段1:Map 在我们的邮件日志处理作业的第一阶段,Map阶段的输入或是以行号为键、以对应的日志消息为值的数据,或是以queue-id为键、以对应的日志消息数组作为值的数据。当我们处理来自输入文件数据队列的源日志文件的时候,产生第一种类型的输入,而第二种类型是一种中间格式,它用来表示一个我们已经试图处理但因为queue-id不完整而重新进行数据排队的queue-id的状态信息。

  为了能处理这两种格式的输入,我们实现了Hadoop的InputFormat类,它根据FileSplit输入文件的扩展名把工作委托给底层的SequenceFileRecordReader类或LineRecordReader类处理。这两种输入格式的文件来自HDFS中不同的输入文件夹(数据队列)。

  阶段1:Reduce 在这一阶段,Reduce根据queue-id是否拥有足够的日志行来判定它是否完整。假如queue-id已经完整,便输出以message-id作为键、以HopWritable对象为值的数据对。否则,queue-id被设置为键,日志行数组重新列队并和下一组原始日志进行Map处理。这个过程将持续到queue-id已经完整或操作超时。

  HopWritable 对象是POJO对象(Plain Old Java Objects,简单Java对象),实现了Hadoop的Writable接口。它从一台单独服务器的视角完整地描述一条消息,包括发送地址和IP,消息发送给其他服务器的尝试记录,标准的消息头信息。

  通过实现OutputFormat类完成输出不同的结果,这一过程对应于我们的两个InputFormat对象输入格式。在Hadoop API在版本r0.17.0添加MultipleSequenceFileOutputFormat类之前,我们已经实现MultipleSequenceFileOutputFormat类,它们实现同样的目标:我们需要Reduce作业的输出对根据其键的特点存储到不同的文件。

  阶段2:Map 在邮件日志处理作业的第二个步骤,输入是从上个阶段得到的数据,它是以message-id为键、以HopWritable类对象数据为值的数据对。这一步骤并不包含任何逻辑处理:而是使用标准的SequenceFileInputFormat类和IdentityMapper类简单地合并来自第一阶段的输入数据。

  阶段2:Reduce 在最终的reduce步骤,我们想判断针对某个通过系统的message-id,收集到的所有HopWritable对象是否能表示它经过系统的整个消息路径。一条消息路径实际上是一个有向图(通常是没有循环的,但如果服务器被错误设置,有可能会包含循环)。在这个图里,点代表服务器,可标记多个queue-id,服务器之间消息的传送形成了边。对这个应用,我们使用的是JGraphT图库。

  对于输出,我们又一次使用MultiSequenceFileOutputFormat类对象。如果reducer判定对于某个message-id的所有queue-id能够创建一条完整的消息路径,消息就会被序列化,并排队等候SolrOutputFormat类的处理。否则,消息的HopWritable对象会被列入阶段2:Map阶段,然后使用下一批queue-id等待重新处理。

  SolrOutputFormat类包含一个嵌入式Apache Solr实例对象——Solr wiki(http://wiki.apache. org/solr/EmbeddedSolr)最初提出的一种流行的方法——来产生本地硬盘的索引信息。关闭OutputFormat类包括把硬盘索引压缩到输出文件的最终地址。与使用Solr’s HTTP接口或直接使用Lucene相比,这种方法有以下几个优点:


  我们目前使用默认的HashPartitioner类来决定Reduce任务和特定键之间的对应关系,就是说键是半随机分布的。在以后的新版系统中,我们将实现一个新的Partitioner,它通过发送地址(我们最通用的搜索词)来切分数据。一旦索引以发送者为单位分割,我们就能够使用地址的哈希值来判断在哪里合并或查询索引,并且我们的搜索API也只需要和相关的对地址的哈希值节点进行通信交流。

  合并相近词搜索
  在一系列的MapReduce阶段完成之后,一系列不同计算机会得知新的索引的信息,进而可以进行索引合并。这些搜索节点它们还运行Apache Tomcat和Solr来托管已经完成的索引信息,这些搜索节点不仅具有把索引合并置于本地磁盘的服务(见图16.8步骤D),它们还运行Apache Tomeate和Solr来托管已完成的索引信息。

  来自SolrOutputFormat类的每个压缩文件都是一个完整的Lucene索引,Lucene提供IndexWriter.addIndexes()方法支持快速合并多个索引。我们的MergeAgent服务把每个新索引解压到Lucene RAMDirectory或FSDirectory(根据文件的大小),把它们合并到本地硬盘,然后发送一个请求给Solr实例,后者负责提供索引服务并使更新后的索引能够用于查询处理。

  切片 Query/Management(查询/管理) API 是一个PHP代码层,它主要是处理输出索引在所有搜索节点上的“切片”(sharding)。我们使用一个简单的“一致性哈希”(consistent hashing)来判定搜索节点和索引文件之间的对应关系。目前,索引首先按照创建时间切片,然后再根据其文件名的哈希值切片,但是我们计划将来用对发送地址的哈希值来取代对文件名的哈希值(见阶段2:Reduce)。

  因为HDFS已经处理了Lucene索引的复制问题,所以没有必要在Solr实例中保留多个副本。相反,在故障转移时,相应的搜索节点会被完全删除,然后由其他节点负责合并索引。

  搜索结果 使用这个系统,从产生日志到获得搜索结果供客服团队使用,我们获得了15分钟的周转时间。

  我们的搜索API支持Lucene的全部查询语法,因此我们常常可以看到下面这样的复杂查询:

  sender:"mapreduce@rackspace.com" -recipient:"hadoop@rackspace.com"
  recipient:"@rackspace.com" short-status:deferred timestamp:[1228140900 TO 2145916799]

  查询返回的每个结果都是一个完整的序列化消息路径,它表明了各个服务器和接收者收是否收到了这个消息。现在我们把这个路径用一个2D图展示出来(图16-10),用户可以通过扩展自己感兴趣的节点来和这个图互动,但是在这个数据的可视化方面还有很多需要改进的地方。
  图16-10. 数据树

  为分析进行存档
  除了为客服提供简短词语的搜索功能之外,我们也对日志数据的分析感兴趣。
  每晚,我们运行一系列的MapReduce作业,它们的输入是白天产生的索引数据。我们实现了SolrInputFormat类对象,它可以拖回并解压索引,然后用对的形式输出每个文档。使用这种InputFormat类,我们可以遍历一天产生的所有消息路径,可以回答我们邮件系统的几乎任何问题,包括:
  •    每个域的数据(病毒程序,垃圾邮件,连接状况,收件人)
  •    最有效的垃圾邮件规则
  •    特定用户产生的负载
  •    消息量反弹的原因
  •    连接的地理分布信息
  •    特定机器之间的平均时间延迟


  因为在Hadoop上,我们拥有好几个月的压缩索引信息,所以还能够回顾性地回答夜间日志概要工作忽略的问题。例如,我们近期想确定每个月消息发送量最大的IP地址,这个任务我们可以通过一个简单的一次性MapReduce作业来完成。
  (作者:Stu Hood)

关于Cascading
  Cascading是一个开源的Java库和应用程序编程接口(API),它为MapReduce提供了一个抽象层。它允许开发者构建出能在Hadoop集群上运行的复杂的、关键任务的数据处理应用。

  Cascading项目始于2007年夏天。它的第一个公开版本,即版本0.1,发布于2008年1月。版本1.0发布于2009年1月。从该项目的主页http://www.cascading.org/可以下载二进制版本,源代码以及一些加载项模块。

  map和reduce操作提供了强大的原语操作。然而,在创建复杂的、可以被不同开发者共享的合成性高的代码时,它们粒度级别似乎不合适。再者,许多开发者发现当他们面对实际问题的时候,很难用MapReduce的模式来思考问题。

  为了解决第一个问题,Cascading用简单字段名和一个数据元组模型值来替代MapReduce使用的键和值,而该模型的元组是由值的列表构成的。对第二个问题,Cascading直接从Map和Reduce操作分离出来,引入了更高层次的抽象:Function,Filter,Aggregator和Buffer。

  其他一些可选择的方案在该项目初始版本公开发布的同时基本上也出现了,但Cascading的设计初衷是对它们进行补充和完善。主要是考虑到大部分可选的架构都是对系统强加一些前置和后置条件或有其他方面的要求而已。

  例如,在其他几种MapReduce工具里,运行应用程序之前,你必须对数据进行预格式化处理、过滤或把数据导入HDFS(Hadoop分布式文件系统)。数据准备步骤必须在系统的程序设计抽象之外完成。相反,Cascading提供方法实现把数据准备和管理作为系统程序设计抽象的组成部分。

  该实例研究将首先介绍Cascading的主要概念,最后概括介绍ShareThis如何在自己的基础框架上使用Cascading。

  如果希望进一步了解Cascading处理模型,请参见项目主页上的“Cascading用户手册”。

  字段、元组和管道
  MapReduce模型使用键和值的形式把输入数据和Map函数,Map函数和Reduce函数以及Reduce函数和输出数据联系起来。

  但据我们所知,实际的Hadoop应用程序通常会将多个MapReduce作业链在一起。看一下用MapReduce模型实现的一个典型的字数统计例子。如果需要根据统计出来的数值进行降序排列,这是一个可能的要求,它将需要启动另一个MapReduce作业来进行这项工作。

  因此,理论上来说,键和值的模式不仅把Map和 Reduce绑定到一起,它也把Reduce和下一次的Map绑定了,这样一直进行下去(图16-11)。即键/值对源自输入文件,流过Map和Reduce操作形成的链,并且最后终止到一个输出文件。实现足够多这样链接的MapReduce应用程序,便能看出一系列定义良好的键/值操作,它们被一遍一遍地用来修改键/值数据流的内容。

  图16-11. 基于MapReduce的计数和排序

  Cascading系统通过使用具有相应字段名的元组(与关系型数据库中的表名和列名类似)来替代键/值模式的方法简化了这一处理流程。在处理过程中,由这些字段和元组组成的流数据在它们通过用户定义的、由管道(pipe)链接在一起的操作时得以处理(图16-12)。
  因此,MapReduce的键和值被简化成如下形式。
  字段
  字段是一个String(字符串)类型的名称集合(如“first_name”)、表示位置信息的数值(如2和-1分别是第三和最后一个位置)或是两者混合使用的集合,与列名非常像。因此字段用来声明元组里值的名称和通过名称在元组中选出对应的值。后者就像执行SQL的select语句。

  图16-12. 由字段和元组链接的管道

  元组
  元组就是由java.lang.Comparable类对象组成的数组。元组与数据库中的行或记录类似。

  Map和Reduce操作都被抽象隐藏到一个或多个管道实例之后(图16-13)。

  Each
  Each管道一次只处理一个单独的输入元组。它可以对输入元组执行一个Function或一个Filter操作(后文马上要介绍)。

  GroupBy
  GroupBy管道在分组字段上对元组进行分组。该操作类似于SQL的group by语句。如果元组的字段名相同,它也能把多个输入元组数据流合并成一个元组数据流。
  CoGroup
  CoGroup管道既可以实现元组在相同的字段名上连接,也可以实现基于相同字段的分组。所有的标准连接类型(内连接—inner join,外连接—outer join等)以及自定义连接都可以用于两个或多个元组数据流。

  图16-13. 管道类型

  Every
  Every管道每次只处理元组的一个单独分组的数据,分组数据可以由GroupBy或CoGroup管道产生。Every管道可以对分组数据应用Aggregator或Buffer操作。

  SubAssembly
  SubAssembly管道允许在一个单独的管道内部进行循环嵌套流水线处理,或反过来,一个管道也可以被嵌入更加复杂的流水线处理中。
  所有这些管道被开发者链接在一起形成“管道流水线处理流程”,这里每个流水线可以有很多输入元组流(源数据,source)和很多输出元组流(目标数据,sink)(见图16-14)。

  图16-14. 简单的管道流水线

  从表面上看来,这可能比传统的MapReduce模型更复杂。并且,不可否认,相较于Map,Reduce,Key和Value,这里涉及的概念更多。但实际上,我们引入了更多的概念,它们必须都工作协助提供不同的功能。

  例如,如果一个开发者想对reducer的输出值提供“辅助排序”功能,她将需要实现Map、Reduce,一个“合成”Key(嵌套在父Key中的两个Key),值,partitioner、一个用于“输出值分组”的comparator和一个“输出键”的comparator,所有这些概念以各种方式结合协作使用,并且在后续的应用中几乎不可重用。

  在Cascading里,这项工作只对应一行代码:new GroupBy(, , ),其中previous是数据源管道。

  操作
  如前所述,Cascading通过引入一些替换性操作脱离了MapReduce模式,这些操作或应用于单个元组,或应用于元组分组(图16-15)。

  Function
  Function作用于单个的输入元组,对每个输入,它可能返回0或多个输出元组。Function操作供Each类型的管道使用。
  图16-15. 操作类型

  Filter
  Filter是一种特殊的函数,它的返回值是boolean(布尔)值,用于指示是否把当前的元组从元组流中删除。虽然定义一个函数也能实现这一目的,但是Filter是为实现这一目的而优化过的操作,并且很多过滤器能够通过逻辑运算符(如And、Or、Xor和Not)分组,可以快速创建更复杂的过滤操作。

  Aggregator
  Aggregator对一组元组执行某种操作,这些分组元组是通过一组共同字段分组得到的。比如,字段“last-name”值相同的元组。常见的Aggregator方法是Sum(求和)、Count(计数),Average(均值)、Max(最大)和Min(最小)。

  Buffer
  Buffer和Aggregator操作类似,不同的是,它被优化用来充当一个“滑动窗口”扫描一个唯一分组中所有的元组。当开发者需要有效地为一组排序的元组插入遗漏的值时,或计算动态均值的时候,这个操作非常有用。通常,处理元组分组数据的时候,Aggregator也是一个可选的操作,因为很多Aggregator能够有效地链接起来工作,但有时,Buffer才是处理这种作业的最佳工具。

  管道流水线创建的时候,这些操作便绑定到各管道(图16-16)。

  Each和Every类型的管道提供了一种简单的元组选择机制,它们可以选择一些或所有的输入元组,然后把这些选择的数据传送给它的子操作。并且我们有一个简单的机制把这些操作的结果和原来的输入元组进行合并,然后产生输出元组。这里并不详细说明机制,它使得每个操作只关心参数指定的元组值和字段,而不是当前输入元组的整个字段集。其次,操作在不同应用程序之间重用,这点和Jave方法重用的方式相同。

  图16-16. 操作流程

  例如,在Java中,声明一个方法concatenate(String first, Stringsecond),比直接定义concatenate(Person person)更抽象。第二个方法的定义,concatenate()函数必须“了解”Person对象;而第一个方法的定义并不清楚数据来自哪里。Cascading操作展现了同样的抽象能力。

  Tap类、Scheme对象和Flow对象
  在前面的几个图中,我们多次提到源数据(source)和目标数据(sink)。在Cascading系统中,所有的数据都是读自或写入Tab类实例,但是它们是通过Scheme对象被转换成或取自元组实例对象。

  Tap
  Tap类负责如何访问数据以及从哪个位置访问数据。例如,判断数据是存于HDFS还是存于本地?在Amazon S3中,还是跨HTTP协议进行访问?

  Scheme
  Scheme类负责读取原始数据并把它们转换成元组格式/或把元组数据写入原始数据格式文件,这里的原始数据可以是文本行、Hadoop二进制的顺序文件或是一些专用格式数据。

  注意,Tap类对象不是管道处理流程的一部分,因此它们不是Pipe类型。

  但是当Tap对象在集群上变得可执行的时候,它们就和管道组件关联到一起。当一个管道处理流程与必要的几个源和目标数据Tap实例关联一起后,我们就得到一个Flow对象。Flow对象是在管道处理流程与指定数量的源及目标数据Tap关联时创建的,而Tap对象的功能是输出或获取管道流程期望的字段名。就是说,如果Tap对象输出一个具有字段名“line”的元组(通过读取HDFS上的文件数据),那么这个管道流程头部必须也希望字段名是“line”。否则,连接管道处理流程和Tap的处理程序会立刻失败并报错。

  因此,管道处理流程实际上就是数据处理定义,并且它们本身不是“可执行”的。在它们可以在集群上运行之前,必须连接到源和目标Tap对象。这种把Tap和管道处理流程分开处理的特性使Cascading系统非常强大。

  如果认为管道处理流程和Java类相似,那么Flow就像Java对象实例(图16-17)。也就是说,在同一个应用程序里面,同样的管道处理流程可以被实例化很多次从而形成新的Flow,不用担心它们之间会有任何干扰。如此一来,管道处理流程就可以像标准Java库一样创建和共享。

  图16-17. 流水线处理过程
  Cascading实战
  现在我们知道Cascading是什么,清楚地了解它是如何工作的,但是用Cascading写的应用程序是什么样子呢?我们来看看例16-2。
  例16-2. 字数统计和排序
  Scheme sourceScheme =
  new TextLine(new Fields("line")); ?
  Tap source =
  new Hfs(sourceScheme, inputPath); ?
  Scheme sinkScheme = new TextLine(); ?
  Tap sink =
  new Hfs(sinkScheme, outputPath, SinkMode.REPLACE); ?
  Pipe assembly = new Pipe("wordcount"); ?
  String regexString = "(?
  Function regex = new RegexGenerator(new Fields("word"), regexString);
  assembly =
  new Each(assembly, new Fields("line"), regex); ?
  assembly =
  new GroupBy(assembly, new Fields("word")); ?
  Aggregator count = new Count(new Fields("count"));
  assembly = new Every(assembly, count); ?
  assembly =
  new GroupBy(assembly, new Fields("count"), new Fields("word")); ?
  FlowConnector flowConnector = new FlowConnector();
  Flow flow =
  flowConnector.connect("word-count", source, sink, assembly);
  flow.complete();

   创建一个新的Scheme对象读取简单的文本文件,为每一行名为“line”字段(被Fields对象声明)输出一个新的Tuple对象。

   创建一个新的Scheme对象用于写简单文本文件,并且它期望输出的是一个具有任意多个字段/值的Tuple对象。假如有多个值要输出,这些值在输出文件里将以制表符分隔。

   创建源和目标Tap实例分别指向输入文件和输出目录。目标Tap对象输出数据时将覆盖目录下现有的所有文件。


   构建管道处理流程的头,并把它命名为“wordcount”。这个名称用于绑定源及目标数据到这个管道处理流程。多个头或尾要求必须有自己唯一的名称。
   构建具有一个函数的Each类型管道,它将解析line字段里的每个词,把解析结果放入一个新的Tuple对象。

   构建GroupBy管道,它将创建一个新的Tuple组,实现基于word字段的分组。

   构建一个具有Aggregator操作的Every类型管道,它将对基于不同词的分组Tuple对象分别进行字数统计。统计结果存于count的字段里。

   构建GroupBy类型管道,它将根据数值对count字段进行分组,形成新的Tuple分组,然后对word字段值进行辅助排序。结果是一组基于count字段值升序排列的count字段值和word字段的值列表。

  用Flow对象把管道处理流程和数据源及目标联系起来,然后在集群上执行这个Flow。


  在这个例子里,我们统计输入文件中的不同单词的数量,并根据它们的自然序(升序)进行排序。假如有些词的统计值相同,这些词就根据它们的自然顺序(字母序)排序。

  这个例子有一个明显的问题,即有些词可能会有大写字母;例如,“the”和“The”,当它出现在句首的时候就是“The”。因此我们可以插入一个新的操作来强制所有单词都转换为小写形式,但是我们意识到那些需要从文档中解析词语的所有将来的应用都必须做同样的操作,因此我们决定创建一个可重用的管道SubAssembly,如同我们在传统应用程序中创建一个子程序一样(参见例16-3)。

  例16-3. 创建一个SubAssembly
  public class ParseWordsAssembly extends SubAssembly ?
  {
  public ParseWordsAssembly(Pipe previous)
  {
  String regexString = "(?
  Function regex = new RegexGenerator(new Fields("word"), regexString);
  previous = new Each(previous, new Fields("line"), regex);
  String exprString = "word.toLowerCase()";
  Function expression =
  new ExpressionFunction(new Fields("word"), exprString,String.class); ?
  previous = new Each(previous, new Fields("word"), expression);
  setTails(previous); ?
  }
  }
   声明SubAssembly是子类,它本身是一种管道类型。

   创建一个Java的表达式函数,它将调用toLowerCase()方法来处理“word”字段对应的字符串类型值。我们要传入表达式函数期望的“word”字段的Java类型,这里是String类型。后台用Janino(http://www.janino.net/)来编译。

   我们必须告知SubAssembly的父类这个管道子组件在哪里结束。

  首先,我们新建一个SubAssembly类,它管理我们的“解析词”管道组件。因为这是一个Java类,所以可用于其他任何应用程序,当然这要求它们处理的数据中有word字段(例16-4)。注意,也有办法可以使这个函数更加通用,这些方法在“Cascading用户手册”中都有介绍。

  例16-4. 用一个SubAssembly扩展单词计数和排序
  Scheme sourceScheme = new TextLine(new Fields("line"));
  Tap source = new Hfs(sourceScheme, inputPath);
  Scheme sinkScheme = new TextLine(new Fields("word", "count"));
  Tap sink = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);
  Pipe assembly = new Pipe("wordcount");
  assembly =
  new ParseWordsAssembly(assembly); ?
  assembly = new GroupBy(assembly, new Fields("word"));
  Aggregator count = new Count(new Fields("count"));
  assembly = new Every(assembly, count);
  assembly = new GroupBy(assembly, new Fields("count"), new Fields("word"));
  FlowConnector flowConnector = new FlowConnector();
  Flow flow = flowConnector.connect("word-count", source, sink, assembly);
  flow.complete();
   我们用ParseWordsAssembly管道组件替换了之前例子中的Each类型管道。最后,我们只是用新的SubAssembly类型子管道替代了前面Every类型管道和单词解析函数。有必要的话,还可以继续进行更深入的嵌套处理。

  灵活性
  后退一步,让我们来看看这个新的模型给我们带来了什么好处,或更妙的是,消除了哪些不足。

  可以看出,我们不必再用MapReduce作业模式来考虑问题,或考虑Mapper和Reducer接口的实现问题,后续的MapReduce作业和前面的MapReduce作业如何绑定或链接。在运行的时候,Cascading“规划器”(planner)会算出最优的方法把管道处理流程切分成MapReduce作业,并管理作业之间的链接(图16-18)。

  图16-18. 怎么把Flow翻译成链式MapReduce作业

  因此,开发者可以以任何粒度来构造自己的应用程序。它们可以一开始就只是一个很小的做日志文件过滤处理的应用程序,但是后来可以根据需要不断增添新的功能。

  Cascading是一个API而不是类似SQL的字符串句法,因此它更灵活。首先,开发者能用他们熟悉的语言创建特定领域语言(domain-specific language,DSL),像Groovy,JRuby,Jython,Scala等(示例参见项目网站)。其次,开发者能对Cascading不同的部分进行扩展,像允许自定义Thrift或JSON对象使其能读写,并且允许它们以元组数据流的形式传送。

  Hadoop和Cascading在ShareThis的应用

  ShareThis是一个方便用户共享在线内容的共享网络。通过单击网页上或浏览器插件上的一个按钮,ShareThis允许用户无缝地访问他们的任何在线联系人及在线网络,并且允许他们通过电子邮件,IM,Facebook,Digg,手机SMS等方式共享它们的内容,而这一过程的执行甚至不要求他们离开当前的访问网页。发布者能配置他们的ShareThis按钮来标记服务的全球共享能力,如此推动网络流量,刺激传播活动,追踪在线内容的共享。通过减少网页不需要的内容及提供通过社会网络、隶属组和社区实时的内容发布功能,ShareThis还简化了社区媒体服务。

  ShareThis用户通过在线窗口共享网页和信息时,一个连续的事件数据流就进入ShareThis网络。这些事件首先要过滤和处理,然后传送给各种后台系统,包括 AsterData,Hypertable和Katta。

  这些事件的数据量能达到很大数量级,数据量太大以致于传统的系统无法处理。这种数据的“污染”(dirty)也很严重,主要归咎于流氓软件系统的“注入式攻击”、网页缺陷或错误窗口。因此,ShareThis选择为后台系统部署Hadoop作为预处理和处理协调管理(orchestration)前台。他们也选择使用Amazon Web服务(基于弹性云计算平台EC2)来托管其服务器,并且使用Amazon S3(简单服务存储服务)提供长期的存储功能,目的是利用其弹性的MapReduce模式(Elastic MapReduce,EMR)。

  这里着重介绍“日志处理管道”(图16-19)。日志处理管道只是简单地从S3文件夹(bucket)里读取数据,进行处理(稍后介绍),然后把结果存入另一个文件夹。简单消息队列服务(Simple Queue Service,SQS)用于协调各种事件的处理,用它来标记数据处理执行程序的开始和完成状态。下行数据流是一些其他的处理程序,它们用于拖动数据装载AsterData数据仓库,如从Hypertable系统获取URL列表作为网络爬取工具的下载源,或把下载的网页推入Katta系统来创建Lucene索引。注意,Hadoop系统是ShareThis整个架构的中心组件。它用于协调架构组件之间的数据处理和数据移动工作。

  有了Hadoop系统作为前端处理系统,在所有事件日志文件被加载到AsterData集群或被其他组件使用之前,它会基于一系列规则基于一系列规则对数据进行解析、过滤、清理和组织。AsterData是一个集群化数据仓库系统,它能支持大数据存储,并允许使用标准的SQL语法发出复杂的即时查询请求。ShareThis选择Hadoop集群来进行数据清理和准备工作,然后它把数据加载到AsterData集群实现即时分析和报告处理。尽管使用AsterData也有可能达到我们的目的,但是在处理流程的第一阶段使用Hadoop系统来抵消主数据仓库的负载具有重要意义。

  为了简化开发过程,制定不同架构组件间的数据协调规则以及为这些组件提供面向开发者的接口,Cascading被选为主要的数据处理API。这显示出它和“传统的”Hadoop用例的差别,它们主要是用“Hadoop”来实现对存储数据的查询处理。

  图16-19. ShareThis日志处理管道

  相反的,Cascading和Hadoop的结合使用为端到端的完整解决方案提供了一个更好、更简单的结构,因此对用户来说更有价值。

  对于开发者来说,Cascading的学习过程很简单,它从一个简单的文本解析单元测试(通过创建cascading.ClusterTestCase类的子类)开始,然后把这个单元程序放入有更多规则要求的处理层,并且在整个过程中,与系统维护相关的应用逻辑组织不变。Cascading用以下几种方法帮助保持这种逻辑组织的不变性。首先,独立的操作(Function,Filter等)都可以进行独立写和测试。其次,应用程序被分成不同的处理阶段:一个阶段是解析,一个阶段是根据规则要求进行处理,最后一个阶段是封装/整理数据,所有这些处理都是通过前述的SubAssembly基础类实现的。

  ShareThis的日志文件数据看起来非常像Apache日志文件,它们有日期/时间戳、共享URL、引用页URL和一些元数据。为了让分析下行数据流使用这些数据,这些URL必须先解压(解析查询字符串数据和域名等)。因此需要创建一个高层的SubAssembly对象来封装解析工作,并且,如果字段解析很复杂,SubAssembly子对象就可被嵌入来解析一些特定字段。

  我们使用同样的方式来应用处理规则。当每个Tuple对象通过SubAssembly对象实例的时候,如果有任何规则被触发,该对象就会被标记上标签“坏”(bad)。具有“坏”字标签的Tuple对象,会被附上被标记的原因用于后来的审查工作。

  最后,创建一个切分SubAssembly对象来做两件事。第一,用于对元组数据流进行分流处理,一个数据流针对标记“好”(good)的数据,另一个针对标记“坏”的数据。第二件是,切分器把数据切分成片,如以小时为单位。为了实现这一动作,只需要两个操作:第一个是根据已有数据流的timestamp(时间戳)创建区间段;第二个是使用interval(区间)和good/bad元数据来创建目录路径(例如,“05/good/”中“05”是早上5点,“good”是经过所有规则验证的数据)。这个路径然后被Cascading TemplateTap使用,这是一个特殊的Tap类型,它可以根据Tuple对象值把元组数据流动态输出到不同的路径位置。

  本例中,“path”值被TemplateTap用来创建最终输出路径。

  开发者也创建了第四个SubAssembly类型对象——它用于在单元测试时应用Cascading Assertion(断言)类。这些断言用来复查规则组件和解析SubAssembly做的工作。

  在例16-5的单元测试中,我们看到partitioner没有被检测,但是它被放入另外一个这里没有展示的集成测试中了。

  例16-5. Flow单元测试
  public void testLogParsing() throws IOException
  {
  Hfs source = new Hfs(new TextLine(new Fields("line")), sampleData);
  Hfs sink =
  new Hfs(new TextLine(), outputPath + "/parser", SinkMode.REPLACE);
  Pipe pipe = new Pipe("parser");
  // split "line" on tabs
  pipe = new Each(pipe, new Fields("line"), new RegexSplitter("\t"));
  pipe = new LogParser(pipe);
  pipe = new LogRules(pipe);
  // testing only assertions
  pipe = new ParserAssertions(pipe);
  Flow flow = new FlowConnector().connect(source, sink, pipe);
  flow.complete(); // run the test flow
  // verify there are 98 tuples, 2 fields, and matches the regex pattern
  // for TextLine schemes the tuples are { "offset", "line }
  validateLength(flow, 98, 2, Pattern.compile("^[0-9]+(\\t[^\\t]*){19}$"));
  }
  针对集成和部署,许多Cascading内置属性都可以使该系统和外部系统更容易集成,并进行更大规模的处理工作。

  在生产环境中运行时,所有的SubAssembly对象都连接起来并规划到一个Flow对象里,但是除了有源和目标Tap对象之外,我们也设计了trap(捕捉)Tap类型(图16-20)。通常,当远程的Mapper或Reducer任务的操作抛出一个异常的时候,Flow对象就会失败并杀死它管理的所有MapReduce作业。当一个Flow有trap的时候,所有的异常都会被捕捉并且造成异常的数据信息会被保存到当前这个捕捉程序对应的Tap对象里。然后可以在不终止当前Flow的情况下,继续处理下一个Tuple对象。有时你想让程序在出现错误的时候就停止,但在这里,ShareThis开发者知道在生产系统运行的时候,他们能同时回览并查看“失败”的数据,然后更新其单元测试。丢失几个小时的处理时间比丢失几个坏记录数据更糟糕。

  使用Cascading的事件监听器,Amazon SQS可被集成进来。当一个Flow结束的时候,系统就发送一条消息来通知其他系统它们已经可以从 Amazon S3上获取准备好的数据了。当Flow处理失败的时候,会有不同的消息发送,向其他的进程报警。

  其余的位于不同的独立集群的下行数据流进程将在中断的日志处理管道位置处开始处理。现在日志处理管道一天运行一次,因此没有必要让100个节点的集群闲着运转23个小时。因此我们是每24小时执行一次终止和启用操作。

  将来,在小型的集群上根据业务需求,增加运行间歇期到每6个小时一次或1小时一次都是非常简单的。其他的集群系统可以独立地根据各自负责的业务需要以不同的间隔期启用或关闭。例如,网络数据爬取组件(使用Bixo,它是EMI和ShareThis开发的基于Cascading的网络数据爬取工具)可以在一个小型集群上与Hypertable集群协作连续运转。这种随需应变的模型在Hadoop上运行良好,每个集群都能把工作负载调节到它期望处理的数量级。

  图16-20. ShareThis日志处理Flow

  总结
  对于处理和协调跨不同架构组件的数据的移动这个问题,Hadoop是一个非常强大的平台。它唯一的缺点是它的主要计算模型是MapReduce。
  Cascading的目标是(不用MapRedue模式来考虑设计方案的情况下)帮助开发者通过使用一个逻辑定义良好的API来快速而简单地建立强大的应用程序,而同时又把提高数据分布、复制、分布式处理管理的性能和程序活性的工作都留给了Hadoop。





已有(7)人评论

跳转到指定楼层
kaka100 发表于 2015-4-23 14:45:43
回复

使用道具 举报

qq442586235 发表于 2015-4-24 15:57:12
非常好  长知识了
回复

使用道具 举报

wangzhenqiang 发表于 2015-8-13 11:47:39
牛人,非常值得学习
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条