分享

spark学习技巧

本帖最后由 desehawk 于 2018-4-1 17:52 编辑


公众号:
bigdatatip

功能介绍

本公众号主要分享Spark使用及源码,spark 机器学习,图计算,同时会涉及到,如:hadoop,Hbase,Hive,Kafka。保证文章质量,给大家提供一个好的知识分享平台。


经典文章推荐:
Spark Structured Streaming高级特性:
链接:

开始内容:

一,事件时间窗口操作
使用Structured Streaming基于事件时间的滑动窗口的聚合操作是很简单的,很像分组聚合。在一个分组聚合操作中,聚合值被唯一保存在用户指定的列中。在基于窗口的聚合的情况下,对于行的事件时间的每个窗口,维护聚合值。
如前面的例子,我们运行wordcount操作,希望以10min窗口计算,每五分钟滑动一次窗口。也即,12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 这些十分钟窗口中进行单词统计。12:00 - 12:10意思是在12:00之后到达12:10之前到达的数据,比如一个单词在12:07收到。这个单词会影响12:00 - 12:10, 12:05 - 12:15两个窗口。
结果表将如下所示。
1.png




高可用Redis服务架构分析与搭建

链接
部分内容:
基于内存的Redis应该是目前各种web开发业务中最为常用的key-value数据库了,我们经常在业务中用其存储用户登陆态(Session存储),加速一些热数据的查询(相比较mysql而言,速度有数量级的提升),做简单的消息队列(LPUSH和BRPOP)、订阅发布(PUB/SUB)系统等等。规模比较大的互联网公司,一般都会有专门的团队,将Redis存储以基础服务的形式提供给各个业务调用。

不过任何一个基础服务的提供方,都会被调用方问起的一个问题是:你的服务是否具有高可用性?最好不要因为你的服务经常出问题,导致我这边的业务跟着遭殃。最近我所在的项目中也自己搭了一套小型的“高可用”Redis服务,在此做一下自己的总结和思考。

首先我们要定义一下对于Redis服务来说怎样才算是高可用,即在各种出现异常的情况下,依然可以正常提供服务。或者宽松一些,出现异常的情况下,只经过很短暂的时间即可恢复正常服务。所谓异常,应该至少包含了以下几种可能性:

【异常1】某个节点服务器的某个进程突然down掉(例如某开发手残,把一台服务器的redis-server进程kill了)

【异常2】某台节点服务器down掉,相当于这个节点上所有进程都停了(例如某运维手残,把一个服务器的电源拔了;例如一些老旧机器出现硬件故障)

【异常3】任意两个节点服务器之间的通信中断了(例如某临时工手残,把用于两个机房通信的光缆挖断了)

其实以上任意一种异常都是小概率事件,而做到高可用性的基本指导思想就是:多个小概率事件同时发生的概率可以忽略不计。只要我们设计的系统可以容忍短时间内的单点故障,即可实现高可用性。

对于搭建高可用Redis服务,网上已有了很多方案,例如Keepalived,Codis,Twemproxy,Redis Sentinel。其中Codis和Twemproxy主要是用于大规模的Redis集群中,也是在Redis官方发布Redis Sentinel之前twitter和豌豆荚提供的开源解决方案。我的业务中数据量并不大,所以搞集群服务反而是浪费机器了。最终在Keepalived和Redis Sentinel之间做了个选择,选择了官方的解决方案Redis Sentinel。

Redis Sentinel可以理解为一个监控Redis Server服务是否正常的进程,并且一旦检测到不正常,可以自动地将备份(slave)Redis Server启用,使得外部用户对Redis服务内部出现的异常无感知。我们按照由简至繁的步骤,搭建一个最小型的高可用的Redis服务。


Spark调优系列之硬件要求
链接


内容:估计所有的spark开发者都很关心spark的硬件要求。恰当的硬件配置需要具体情况具体分析,浪尖在这里给出以下建议。

一,存储系统
因为因为大多数Spark工作可能需要从外部存储系统(例如Hadoop文件系统或HBase)中读取输入数据,所以将其尽可能靠近该系统很重要。所以,有如下建议:
1,如果可能,在与HDFS相同的节点上运行Spark。最简单的方式是将spark 的Standalone集群和hadoop集群安装在相同的节点,同时配置好Spark和hadoop的内存使用,避免相互干扰(对于hadoop,每个task的内存配置参数是mapred.child.java.opts; mapreduce.tasktracker.map.tasks.maximum 和mapreduce.tasktracker.reduce.tasks.maximum 决定了task的数目)。也可以将hadoop和spark运行在共同的集群管理器上,如mesos和 yarn。
2,如果不可能,请在与HDFS相同的局域网中的不同节点上运行Spark。
3,对于低延迟数据存储(如HBase),可能优先在与存储系统不同的节点上运行计算任务以避免干扰。

二,本地磁盘
虽然Spark可以在内存中执行大量的计算,但它仍然使用本地磁盘来存储不适合RAM的数据,以及在stage之间,也即shuffle的中间结果。我们建议每个节点至少有4-8块磁盘,并且不需要RAID,仅仅是独立的磁盘挂在节点。在Linux中,使用noatime选项安装磁盘,以减少不必要的写入。在spark任务中,spark.local.dir配置可以十多个磁盘目录,以逗号分开。如果你运行在hdfs上,与hdfs保持一致就很好。
使用noatime选项安装磁盘,要求当挂载文件系统时,可以指定标准Linux安装选项(noatime),这将禁用该文件系统上的atime更新。磁盘挂在命令:
mount -t gfs BlockDevice MountPoint -o noatime
BlockDevice 指定GFS文件系统驻留的块设备。
MountPoint 指定GFS文件系统应安装的目录。
例子:
mount -t gfs /dev/vg01/lvol0 /gfs1 -o noatime

三,内存
单台机器内存从8GB到数百GB,spark都能运行良好。在所有情况下,我们建议仅为Spark分配最多75%的内存;留下其余的操作系统和缓冲区缓存。
需要多少内存取决于你的应用程序。要确定你的应用的特定数据集需要多大内存,请加载部分数据集到内存,然后在Spark UI的Storage界面去看它的内存占用量。
请注意,内存使用受到存储级别和序列化格式的极大影响 - 有关如何减少内存使用的技巧,请参阅另一篇调优的文章。
最后,请注意,对于超过200GB的内存的机器JAVA VM运行状态并不一直表现良好。如果你买的机器内存超过了200GB,那么可以在一个节点上运行多个worker。Spark Standalone模式下,你可以在配置文件 conf/spark-env.sh中设置SPARK_WORKER_INSTANCES的值来设置单节点worker的数目。也可以设置SPARK_WORKER_CORES参数来设置每个Worker的cpu数目。

四,网络
根据以往的经验,假如数据是在内存中,那么spark的应用的瓶颈往往就在网络。用10 Gigabit或者更高的网络,是使spark应用跑的最更快的最佳方式。特别是针对“distributed reduce”应用,如group-bys,reduce-bys和sql joins,就表现的更加明显。在任何给定的应用程序中,你可以通过spark ui查看spark shuffle过程夸网络传输了多少数据。

五,cpu
即使每台机器几十个cpu,spark也可以很好的扩展,因为他在线程之间执行最小的共享cpu。你应该每台机器至少配置8-16个内核。根据cpu负载,可能需要更多的cpu:一旦数据在内存中,大多数应用程序的瓶颈就在CPU和内存。

SparkSql 中外连接查询中的谓词下推规则

链接

部分内容:
SparkSql

SparkSql是架构在spark计算框架之上的分布式Sql引擎,使用DataFrame和DataSet承载结构化和半结构化数据来实现数据复杂查询处理,提供的DSL可以直接使用scala语言完成sql查询,同时也使用thrift server提供服务化的Sql查询功能。SparkSql提供了Data Source API,用户通过这套API可以自己开发一套Connector,直接查询各类数据源,包括NoSql、RDBMS、搜索引擎以及HDFS等分布式FS上的文件等。和SparkSql类似的系统,从Sql和计算框架分离角度看应该就是Hive;从面相的业务类型看有PrestoDB、Impala等(都可以在一定程度上应对即系查询)。

谓词下推

所谓谓词(predicate),英文定义是这样的:A predicate is a function that returns bool (or something that can be implicitly converted to bool),也就是返回值是true或者false的函数,使用过scala或者spark的同学都知道有个filter方法,这个高阶函数传入的参数就是一个返回true或者false的函数。如果是在sql语言中,没有方法,只有表达式,where后边的表达式起的作用正是过滤的作用,而这部分语句被sql层解析处理后,在数据库内部正是以谓词的形式呈现的。


Flink流式处理概念简介

链接


一,抽象层次
Flink提供不同级别的抽象来开发流/批处理应用程序。
1.png
1,stateful streaming
最底层。它通过Process Function嵌入到DataStream API中。它允许用户从一个或多个流自由处理事件,并使用一致的容错状态。此外,用户可以注册事件时间和处理时间回调,允许程序实现复杂的计算。
2,Core APIs
实际上,大多数应用程序不需要上述的低级别抽象,而是针对Core API(如DataStream API(有界/无界流))和DataSet API(有界数据集)进行编程。这些流畅的API为数据处理提供了常见的构建模块,如用户指定的各种转换形式,连接,聚合,窗口,状态等。在这些API中处理的数据类型以各自的编程语言表示为classes。底层的Process Function和DataStream API的整合,使得针对一些特定的操作可以实现更低层次的抽象。DataSet API为有界数据集提供了额外的原函数,如循环/迭代。
3,Table API
Table API是以表为中心的声明式DSL,可能是动态更改表(表示流时)。Table API遵循(扩展)关系模型:Table 具有附加schema(与关系数据库中的表相似),API提供操作,例如select,project,join,group-by,aggregate等。Table API代表的是应该做什么逻辑操作,而不是直接指定如何编写操作的源代码。虽然Table API可以通过各种类型的用户定义的函数进行扩展,但它不如Core API那么具有表达力,但使用起来更加简洁(少写很多代码)。
此外,Table API程序还可以通过在执行之前应用优化规则的优化器。
可以在表和DataStream / DataSet之间无缝转换,允许程序将Table API和DataStream和DataSet API混合使用。
4,SQL
最高层次的抽象就是SQL。无论是语法还是表达,该层次的抽象都很像Table API。SQL抽象与Table API紧密交互,SQL查询可以在Table API中定义的表上执行。

二,Programs and Dataflows
Flink程序的基本构建块是流和转换。在概念上,stream 是data records的(潜在的永无止境的)flow,并且变换是将一个或多个流作为输入的操作,并且作为结果产生一个或多个输出流。
执行时,Flink程序被映射成streaming dataflows,由streams 和转换操作符组成。每个dataflow 从一个或多个sources开始,并以一个或多个sinks结束。dataflows 像任意的有向无环图(DAG)。虽然通过迭代构造允许特殊形式的循环,但是为了简单起见,我们大部分都会任务是DAG。
2.png

通常,程序中的变换和数据流中的运算符之间存在一对一的对应关系。然而,有时,一个变换可能由多个转换算子组成。




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条