分享

企业大数据经典案例项目:新闻网站实时分析系统

问题导读


1.如何离线采集数据?
2.Hive和HBase如何关联?
3.如何求出热点话题Top10?
4.如何实时采集数据?
5.实时处理数据如何存入Redis中?




本次项目是基于企业大数据经典案例项目(大数据日志分析),全方位、全流程讲解大数据项目的业务分析、技术选型、架构设计、集群规划、安装部署、整合继承与开发和web可视化交互设计。


一、业务需求分析
(一)捕获用户浏览日志信息
(二)实时分析前20名流量最高的新闻话题
(三)实时统计当前线上已曝光的新闻话题
(四)统计哪个时段用户浏览量最高

二、系统架构图设计
20181127174800373.png


三、系统数据流程设计
20181205193002219.png


四、集群资源规划设计
2018120519311727.png


五、数据结构
20181205193141526.png


六、步骤详解
6.1数据的采集
6.1.1离线采集

    Flume+HBase+Hive
    使用HBase?
    首先数据是实时的用户查询日志信息,为了做离线的统计,Flume+HDFS:需要将数据存储HDFS,写入的压力非常大,单位时间内写入的数据量比较大,直接向HDFS中写入的效率比较低,而HBase,高速随机读写的数据库,使用HBase去接收Flume传送过来的数据,当数据被传入HBase之后,可以使用Hive去关联HBase中的数据,之后就可以做一些统计分析的工作。
  flume-sinks是HBase的时候:
    payloadColume:  获取到要写入到Hbase中的列,如果没有做列的配置,默认值就是pCol
    incrementColume:  特有的列,不参与计算,系统保有,如果这一列缺省,同样表中只有有一列内容,而且在操作过程中很有可能报错
    flume+hbase整合中默认的行键类型有四种,默认为uuid
    timestamp:  精度到毫秒 10^-3
    random: 随机数
    nano: 纳米 10^-9

6.1.1.1关联Flume+HBase
  使用sink是hbase或者asynchbase,但是这两种方式其中定义EventSerializer满足不了需求,日志数据有6列,数据以\t分割,但是默认的hbasesink只能写入到一列中,所以可以使用正则或者自定义hbase-sink。

  使用正则表达式:
    Flume提供了两个序列化器。
SimpleHbaseEventSerializer(org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)按原样将事件主体写入HBase,并可选择增加Hbase中的列。RegexHbaseEventSerializer(org.apache.flume.sink.hbase.RegexHbaseEventSerializer)根据给定的正则表达式打破事件体,并将每个部分写入不同的列。
[mw_shl_code=text,true]#define sinks(未测试)
a1.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink
#a1.sinks.k1.type = hbase
a1.sinks.k1.table  =  new_log
a1.sinks.k1.columnFamily  = cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
#        \t        匹配一个制表符。等价于 \x09 和 \cI。
a1.sinks.k1.serializer.regex = (.*?)\x09(.*?)\x09(.*?)\x09(.*?)\x09(.*?)\x09(.*?)
a1.sinks.k1.serializer.colNames  = datetime,userid,searchname,retorder,cliorder,cliurl
[/mw_shl_code]
自定义flume-hbase-sink:
  修改flume的源码包flume-hbase-sink model,添加一下MyAsyncHbaseEventSerializer 类和SimpleRowKeyGenerator的自定义方法
[mw_shl_code=java,true]/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.sink.hbase;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
*  自定义的Flume-Hbase-Sink,主要完成的功能:
*    1. 可以指定任意的分隔符
*    2. 可以完成任意的列内容的分割
*    3. 自定义行键
*/

public class MyAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  private byte[] payload;
  private byte[] payloadColumn;
  private String separator; //自定义的分隔符
  private byte[] incrementColumn;
  private String rowPrefix;
  private byte[] incrementRow;
  private KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {

      try {
        String[] columns = new String(payloadColumn).split(",");
        String[] values = new String(payload).split(separator);
        System.out.println("------------>columns: "+ Arrays.toString(columns));
        System.out.println("------------>values: "+ Arrays.toString(values));
        byte[] rowKey = SimpleRowKeyGenerator.getMyRowKey(values[0],values[1]);

        for (int i = 0; i < columns.length; i++){
          byte[] column = columns.getBytes(Charsets.UTF_8);
          byte[] value = values.getBytes(Charsets.UTF_8);

          PutRequest putRequest =  new PutRequest(table, rowKey, cf, column, value);
          actions.add(putRequest);
        }


      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
          incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub

  }

  @Override
  public void configure(Context context) {
    //获取到要写入到hbase中的列,如果没有做列的配置,默认值就是pCol

    String pCol = context.getString("payloadColumn", "pCol");
    String sep = context.getString("separator", ",");

    //特有的列,不参与计算,系统保有,如果这一列缺省,同样表中只有有一列内容,这样的话,而且在操作过程中很可能报错
    String iCol = context.getString("incrementColumn", "iCol");
    rowPrefix = context.getString("rowPrefix", "default");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      //获取了所有的列,交给成员标量payloadColumn
      payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }
    if(sep != null && !sep.isEmpty()){
      //指定分隔符
      separator = sep;
      System.out.println("-----------separator: "+separator +"--------------------------");

    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    this.payload = event.getBody();
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }

}[/mw_shl_code]
SimpleRowKeyGenerator类的一个自定义方法:
[mw_shl_code=java,true]SimpleRowKeyGenerator类的一个自定义方法:
/**
   *  自定义行键,必须要跟业务相关,同时在定义过程中要尽量避免出现hbase热点问题
   *
   * @param datetime    00:00:00
   *    考虑到热点问题,如果出现时间戳,手机号等待类似连续的数据,尽量倒置
   * @param userid      23908140386148713
   * @return
   * @throws UnsupportedEncodingException
   */
  public static byte[] getMyRowKey(String datetime, String userid) throws UnsupportedEncodingException {
    return (userid + "_" + datetime).getBytes("UTF8");
  }
[/mw_shl_code]
编译打包:mvn clean package -DskipTests
将项目target目录下面的flume-ng-hbase-sink-1.8.0.jar 替换掉集群中的flume中对应的jar包
然后
配置flume-hbase-sink.conf:
[mw_shl_code=text,true]#########################################################
##
##主要作用是文件中的新增内容,将数据打入到HBase中
##    注意:Flume agent的运行,主要就是配置source channel sink
##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

#对于source的配置描述 监听文件中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /home/hadoop/data/projects/news/data/news-logs.log


#对于sink的配置描述 使用hbase做数据的消费
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = new_log
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.MyAsyncHbaseEventSerializer
# 将flume采集到的数据,写入到hbase表中对应的列
a1.sinks.k1.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl
# 自定义的分隔符
a1.sinks.k1.serializer.separator =  \\t



#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/data/projects/news/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/data/projects/news/channel

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1[/mw_shl_code]
  启动flume:
[mw_shl_code=shell,true] nohup bin/flume-ng agent -n a1 -c conf -f conf/flume-hbase-sink.conf >/dev/null 2>&1 &[/mw_shl_code]
  测试: 进入到hbase shell  >>scan 'new_log'

6.1.1.2 hive和habse关联
[参考hive官网中hbase集成]https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration#HBaseIntegration-HiveHBaseIntegration


Here’s an example which instead targets a distributed HBase cluster where a quorum of 3 zookeepers is used to elect the HBase master:
[mw_shl_code=shell,true]
$HIVE_SRC/build/dist/bin/hive --auxpath $HIVE_SRC/build/dist/lib/hive-hbase-handler-0.9.0.jar,$HIVE_SRC/build/dist/lib/hbase-0.92.0.jar,$HIVE_SRC/build/dist/lib/zookeeper-3.3.4.jar,$HIVE_SRC/build/dist/lib/guava-r09.jar --hiveconf hbase.zookeeper.quorum=zk1.yoyodyne.com,zk2.yoyodyne.com,zk3.yoyodyne.com[/mw_shl_code]

创建一个hive的外部表
[mw_shl_code=sql,true]CREATE EXTERNAL TABLE new_log(
id string,
datetime string,
userid string,
searchname string,
retorder int,
cliorder int,
cliurl string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:datetime,cf:userid,cf:searchname,cf:retorder,cf:cliorder,cf:cliurl")
TBLPROPERTIES ("hbase.table.name" = "new_log", "hbase.mapred.output.outputtable" = "new_log");[/mw_shl_code]
注意:
hbase.columns.mapping
是必须要配置的。 hbase.table.name 配置是可选的,它允许使用Hive中不同表名的表去操作HBase中已知的一张表。如:hive中已知的表 hbase_table_1 可以操作HBase中的表。如果未指定,Hive和HBase的表名称需要相同。 hbase.mapred.output.outputtable 配置是可选的,如果计划向表中插入数据(使用 hbase.mapreduce.TableOutputFormat操作该属性),则需要它。

问题1:
使用select * from new_log limit 5; 可以查到数据,但是使用select count(*) from new_log;统计的数据量为0,
解决方法:进入hive shell,输入set hive.compute.query.using.stats=false,然后运行下查询语句,发现正常。
原因:hive.compute.query.using.stats 默认值: false,当set hive.compute.query.using.stats设置为true时,hive在进行一些如:min,max和count(1)的查询的时候,将使用存储在Metastore中的统计信息来回答。对于基本统计信息收集,需要将配置属性hive.stats.autogather设置为true。详细请参照:hive官网配置参数

问题2:
使用HBase和Hive查询出来的数据量是253637,而flume源news-logs.log的数据量是1724258,剩下的数据去哪了?

???
[mw_shl_code=text,true]追踪源码        追踪日志信息        通过分析        版本不匹配        时间不同步                存在脏数据
打印非常详细的日志
        源码中定义一个静态变量Long统计数据条数
        考虑问题的思路:
                1. 日志输出有没有问题
                2. 代码有没有问题
                3. 服务器是否有问题
                4. 各个软件之间的兼容性是否有问题
                5. 编写的程序的稳定性
[/mw_shl_code]
6.1.1.3 相关的查询分析操作
1.求出热点话题Top10(对应实时的统计:实时分析前20名流量最高的新闻话题)

[mw_shl_code=sql,true]select searchname, count(1) count from new_log group by searchname distribute by searchname sort by count desc limit 10;
[/mw_shl_code]
复习:
hive的排序方式:
oder by: 全局排序,尽量少用,会将所有的数据先拉到一起来排序,相当于是生成一个reduce作业,要尽量去避免只有一个reduce作业的情况。
distribute by:  按照某个字段进行分区
sort by: 针对某一个分区的排序
distribute by 和 sort by 联合在一起可以达到order by 的效果
有一个:  cluster by ,但是比较局限,只支持asc,不能在cluster by后加一个desc,而且要求分区字段和排序字段相同。
group by 只是分组,没有分区和排序的功能。

2.统计当前线上已曝光的新闻话题
[mw_shl_code=sql,true]select distinct searchname from new_log;
[/mw_shl_code]
3.离线统计各个时段的新闻浏览量(统计哪个时段用户浏览量最高)
[mw_shl_code=sql,true]select
        hour(datetime) hour,
        count(1) count
from new_log
group by hour(datetime)            
distribute by hour sort by count desc     
limit 10;
[/mw_shl_code]
6.1.1.3 将离线的统计结果导入到HBase(可以考虑建立二级索引)
为了方便后期数据可视化:
创建一张结果输出表,用于存储hive统计的结果,这张表关联hbase。
[mw_shl_code=sql,true]CREATE TABLE hour_news_count (
        hour string,
        count int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:count")
TBLPROPERTIES ("hbase.table.name" = "hour_news_count", "hbase.mapred.output.outputtable" = "hour_news_count");
[/mw_shl_code]
导入数据集:
[mw_shl_code=sql,true]insert into hour_news_count
select
        hour(datetime) hour,
        count(1) count
from new_log
group by hour(datetime)
distribute by hour sort by count desc
limit 10;
[/mw_shl_code]
数据导入到HBase或者Hive或者mysql或者redis之后,就可以进行数据可视化了。最后使用SSM+Echars+H5展示即可。

实时采集
6.1.2.1 Flume+Kafka

kafka-sink中设置数据确认: 0: 不等待, 1: 只要有leader即可, -1: 需要所有副本都写入成功
kafka.producer.acks 1 How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.

首先在Kafka里面创建一个topic:
[mw_shl_code=shell,true]kafka-topics.sh --create --topic news-logs --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3 --replication-factor 2
[/mw_shl_code]
指定flume-kafka-sink.conf配置文件(flume1.8不支持kafka2.1):
[mw_shl_code=text,true]#########################################################
##
##主要作用是监听文件中的新增数据,采集到数据之后,导入到kafka
##    注意:Flume agent的运行,主要就是配置source channel sink
##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配置描述 监听文件中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /home/hadoop/data/projects/news/data/news_log_rt.log


#对于sink的配置描述 使用kafka日志做数据的消费
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.topic = news-logs
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#a1.sinks.k1.kafka.producer.compression.type = snappy

#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

[/mw_shl_code]
6.2 数据的统计分析
6.2.1 使用的软件架构:

SparkStreaming+Kafka+web+scala+java+maven+hbase/mysql/redis

6.2.2 项目创建
小技巧:在创建maven项目时,添加一个kv键值对,key=archetypeCatalog,value=local|internet,建议使用local,默认是internet。创建的maven项目是由原型,采用默认每次都要从互联网去下载原型,网络不好时,下载效率非常慢;使用local第一次会将网络上的原型下载到本地仓库,后期直接使用本地仓库中的原型,效率高

6.2.3 实时任务的统计
1. 先获取kafka中的数据:
receiver V.S. Direct
在流计算中有三种语义需要处理:
receiver情况 receiver问题
at least once: 至少一次 一条记录被重复消费
at most once: 最多一次 一条记录由于失败的原因没有被成功消费
exactly once: 恰好一次 保证数据的完整
之前解决方法: kafka --> zk记录offset, sparkstreming,如何解决receiver带来的问题: 开启checkpoint,开启wal
现在的解决方法: KafkaUtils.createDirectStream:保证数据的完整

分布式中遵循的原则: 移动计算不移动数据
数据的本地性: 数据和计算它的代码之间的距离
  • process: 进程级别(数据和代码在同一个进程)
  • node: 数据和代码在同一节点的不同进程中
  • rack: 不同节点,机架
[mw_shl_code=scala,true]def createStream(ssc: StreamingContext, topics: String): DStream[String] = {
    /**
      * ConsumerStrategy: 主要就是用来指定kafka的相关配置信息的。
      * "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
      * "key.deserializer" -> classOf[StringDeserializer],
      * "value.deserializer" -> classOf[StringDeserializer],
      * "group.id" -> "use_a_separate_group_id_for_each_stream",
      * "auto.offset.reset" -> "latest",
      * "enable.auto.commit" -> (false: java.lang.Boolean)
      *
      */
    val kafkaParams = Map(
       "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
//      "bootstrap.servers" -> "hadoop:19092,hadoop:29092,hadoop:39092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "news_logs_topgroup"
    )
    /**
      * LocationStrategy: 主要指定的数据加载的策略
      * 主要是读取数据性能问题的考虑,选择不同的数据本地性级别来加载数据。
      * 核心原因在于,新版的api要预先fentch一批数据放到buffer
      * PreferBrokers:  如果brokers和spark作业的executor在相同的节点的时候,可以选择这种方式。
      * PreferConsistent: 最常用的一种数据加载策略,为每一个executor,只会去分发不同partition中的数据。
      * PreferFixed:  固定的方式,在某些host上加载不同的partition,
      * PreferFixed(hostMap:collection.Map[TopicPartition,String])
      * Map[TopicPartition,String](
      * (new TopicPartition("news-logs",0) -> "hadoop03"),
      * (new TopicPartition("news-logs",1) -> "hadoop01"),
      * (new TopicPartition("news-logs",2) -> "hadoop02")
      * )
      * Topic:news-logs PartitionCount:3        ReplicationFactor:2     Configs:
      * Topic: news-logs        Partition: 0    Leader: 3       Replicas: 3,1   Isr: 1,3
      * Topic: news-logs        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
      * Topic: news-logs        Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3
      */
         val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topics.split(","), kafkaParams)
      )
        val messages: DStream[String] = kafkaStream.map(record => record.value())
        messages
[/mw_shl_code]
2.实时分析前20名流量最高的新闻话题
(1)需求分析: 实际上求的每一个新闻话题的wordcount,然后排序,求最高的前20,实时分析前20名的流量最高,就要计算截止到目前为止的数据流量,不是单一批次
(2)使用算子: updataStateByKey,需要启动一个checkpoint的位置,用来存储截止到目前为止之前的数据
         截止到目前的所有话题的浏览量
         总的数据量 = 当前批次的数据量 + 之前的数据量
         之前的数据:  A —> 10 B —> 11
         新来了一批数据:  C —> 2 B —> 1
         updateFunc: (Seq[V], Option[S]) 相当于 a left join b a表所有的都显示,b表中对应不上的显示null
(3)新闻话题就是log中的第三列: searchname
(4)前20名 —> 发现streaming无法进行排序,要么是用core,要么是用sql进行排序
         DStream.foreach( rdd --> smDF:rdd.toDF --> createOrReplaceTempView  -->retDF: sql --> retDF.write.mode(SaveMode.Overwrite).jdbc

3.实时统计当前线上已曝光的新闻话题
[mw_shl_code=sql,true]select distinct searchname from t; 略
[/mw_shl_code]
4.统计哪个时段用户浏览量最高统计哪个时段用户浏览量最高
      步骤:
  • 读取kafka的数据  --> createStream(ssc, topics)
  • 统计各个小时的浏览量  --> updateStateByKey
  • 使用sparkStreaming直接将结果落地到mysql数据库
[mw_shl_code=scala,true] usbDStream.foreachRDD(rdd =>{
      if(! rdd.isEmpty()){
        rdd.foreachPartition(partition => {
          if(!partition.isEmpty){
           /* val qr = new QueryRunner(DBCPUtils.getDataSource)
            val sql = "INSERT OVERWRITE b_hour_topics values(?, ?)"
            partition.foreach{
              case (hour, count) =>{
                val objs = Array(hour, count)
                qr.update(sql,objs :_ *)
              }
            }
            qr.batch(sql,params)*/

            classOf[com.mysql.jdbc.Driver]
            val connection = DriverManager.getConnection(DBCPUtils.url,DBCPUtils.username, DBCPUtils.password)
          val sql = "INSERT INTO b_hour_topics values(?, ?)"
          val ps = connection.prepareStatement(sql)
          partition.foreach{
             case(hour, count) => {
              ps.setString(1,hour)
              ps.setInt(2,count)
              ps.addBatch()
            }}
          ps.executeBatch()
          ps.close()
          connection.close()
       }
     })
   }
})
[/mw_shl_code]
但是在操作过程中发现出现大量的数据重复,数据乱码等存在一些问题,故采取换一种方法:

配置文件(dbcp-config.properties):
[mw_shl_code=text,true]#连接设置
driverClassName=com.mysql.jdbc.Driver
#在mysql5.8中需要加useSSL=false
url=jdbc:mysql://hadoop01:3306/news_db?useSSL=false
username=root
password=root

#<!-- 初始化连接 -->
initialSize=10

#最大连接数量
maxActive=50

#<!-- 最大空闲连接 -->
maxIdle=20

#<!-- 最小空闲连接 -->
minIdle=5

#<!-- 超时等待时间以毫秒为单位 6000毫秒/1000等于60秒 -->
maxWait=60000

#JDBC驱动建立连接时附带的连接属性属性的格式必须为这样:[属性名=property;]
#注意:"user" 与 "password" 两个属性会被明确地传递,因此这里不需要包含他们。
connectionProperties=useUnicode=true;characterEncoding=utf8
#指定由连接池所创建的连接的自动提交(auto-commit)状态。
defaultAutoCommit=true
[/mw_shl_code]
先获取JDBC连接:
[mw_shl_code=java,true]package rk.news.utils;

import org.apache.commons.dbcp.BasicDataSourceFactory;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

/**
* @Author rk
* @Date 2018/12/1 14:59
* @Description:
*      dbcp和p0都是常用的数据库的连接池
**/
public class DBCPUtils {

    private static DataSource ds;
    private DBCPUtils(){}
    public static String url;
    public static String username;
    public static String password;
    static {
        try {
            Properties properties = new Properties();
            String path = "dbcp-config.properties";
            InputStream in = DBCPUtils.class.getClassLoader().getResourceAsStream(path);
            properties.load(in);
            url = properties.getProperty("url");
            username = properties.getProperty("username");
            password = properties.getProperty("password");
            ds = BasicDataSourceFactory.createDataSource(properties);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static DataSource getDataSource(){
        return ds;
    }

    public static Connection getConnection(){
        try {
            return ds.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void release(Connection con, Statement st, ResultSet rs){
        try {
                if(rs != null) {
                    rs.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }finally {
            try {
                if (st != null) {
                    st.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (con != null) {
                        con.close();
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

[/mw_shl_code]
封装对象HourTopics :
[mw_shl_code=java,true]package rk.news.entity;

/**
* @Author rk
* @Date 2018/12/3 17:04
* @Description:
*      create table b_hour_top(
*          hour varchar(10),
*          times int
*        );
**/
public class HourTopics {
    private String hour;
    private int times;

    public String getHour() {
        return hour;
    }

    public void setHour(String hour) {
        this.hour = hour;
    }

    public int getTimes() {
        return times;
    }

    public void setTimes(int times) {
        this.times = times;
    }
}

[/mw_shl_code]
统计各时间TopN接口(IHourTopicsDao ):
[mw_shl_code=java,true]public interface IHourTopicsDao {
    void insert(HourTopics ht);
    void insertBatch(List<HourTopics> hts);
}
[/mw_shl_code]
统计各时间实现类(DefaultHourTopicsDaoImpl):
[mw_shl_code=java,true]public class DefaultHourTopicsDaoImpl implements IHourTopicsDao {
    private QueryRunner qr = new QueryRunner(DBCPUtils.getDataSource());

    String insertSQL = "INSERT INTO b_hour_topics (hour, times) values(?,?)";
    String updateSQL = "UPDATE b_hour_topics SET times = ? WHERE hour = ? ";
    String selectSQL = "SELECT times FROM b_hour_topics WHERE hour = ?";
    @Override
    public void insert(HourTopics ht) {
        try {
            //首先查看当前小时对应的数据是否存在,如果不存在,则插入,如果存在,覆盖
            Integer times = qr.query(selectSQL, new ScalarHandler<>(), ht.getHour());
            if (times == null ){//数据库中没有
                qr.update(insertSQL, ht.getHour(), ht.getTimes());
            }else{
                qr.update(updateSQL, ht.getTimes(), ht.getHour());
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void insertBatch(List<HourTopics> hts) {
        //进行插入的数据
        List<HourTopics> insertList = new ArrayList<>();
        //更新的数据
        List<HourTopics> updateList = new ArrayList<>();

        try {
            //一条一条的判断当前数据库中是否已经存在对应的数据
            for(HourTopics ht : hts){
                Integer times = qr.query(selectSQL, new ScalarHandler<>(), ht.getHour());
                if (times == null ){//数据库中没有
                    insertList.add(ht);
                }else{
                    updateList.add(ht);
                }
            }
            //执行插入
            if(!insertList.isEmpty()){
                Object[][] insertParams = new Object[insertList.size()][];

                for (int i = 0; i < insertList.size(); i++){
                    HourTopics ht = insertList.get(i);
                    Object[] obj = {ht.getHour(),ht.getTimes()};
                    insertParams = obj;
                }
                qr.batch(insertSQL,insertParams);
            }
            //执行更新
            if(!updateList.isEmpty()){
                Object[][] updateParams = new Object[updateList.size()][];

                for (int i = 0; i < updateList.size(); i++){
                    HourTopics ht = updateList.get(i);
                    Object[] obj = {ht.getTimes(),ht.getHour()};
                    updateParams = obj;
                }
                qr.batch(updateSQL,updateParams);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

[/mw_shl_code]
小技巧:TRUNCATE和DELETE的区别:
TRUNCATE和DELETE的区别 当加入表中有1000w条记录
TRUNCATE TABLE ‘b_hour_topics’; 执行 2条:第一条 drop talbe  第二条:  create table
DELETE FROM ‘b_hour_topics’; 执行1000w次

在实时或准实时统计中,不建议将数据落地到mysql中,高频操作,mysql服务承载压力有限,如果可以到达大概每秒10w条,可以考虑,一般建议将结果落地到hbase, redis, es, ignite等。这里我们使用redis,将话题数据插入到redis中。

但是插入到数据库是一个action操作,所以需要先把数据进行持久化。
[mw_shl_code=text,true]    /**
      * MEMORY_ONLY:  效率最快,最消耗资源
      *         数据是没有经过序列化保存在内存中,容易造成OOM,gc的频率和object的个数成正比 不建议使用(100条以内可以考虑)
      * MEMORY_ONLY_SER:  针对上述的优化,数据经过序列化存储,一个partition的数据就只有一个object(默认3M以上进入老年代)
      *          这里的性能消耗在于:序列化和反序列化 kryo
      * MERORY_AND_DISK (不建议)
      * MEMORY_AND_DISK_SER(建议)
      * DISK_ONLY(不用)
      * MEMORY_ONLY_2,MEMORY_AND_DISK_2(不建议,除非对数据的容错性要求非常高)
      * OFF_HEAP(experimental)  堆外内存: 以上数据都要占用executor的内存
      *     executor中的  spark.storge.memoeyFraction:  0.6(持久化的数据,占到了executor内存的60%)
      *                   spark.shuffle.memoeyFraction: 0.2(shuffle,占到了executor内存的20%)
      *
      * alluxio(tachyon --> 基于内存的hdfs版本)
      */
        batchTopics2Count.persist(StorageLevel.MEMORY_ONLY)
[/mw_shl_code]
[mw_shl_code=text,true]    /**
      * 将话题数据插入到redis中
      */
    batchTopics2Count.foreachRDD(rdd => {
      if(! rdd.isEmpty()){
        rdd.foreachPartition(partition =>{
          if(! partition.isEmpty){
            val jedis = JedisUtil.getJedis
            partition.foreach{case (topic, topicNum) =>{
              jedis.sadd(Constants.NEW_TOPICS, topic)
            }}
            JedisUtil.close(jedis)
          }
        })
      }
    })
[/mw_shl_code]
之后再将数据写入到redis
首先编写一个配置文件:

[mw_shl_code=text,true]########################################
##
##这个是redis的配置文件
##
########################################
#host or ip
host=hadoop01

#port redis connect port
port=6379

##最大空闲连接树
maxIdle=10

##最大连接数
maxTotal=100

##创建连接超时时间
maxWaitMillis=2000

##获取连接测试是否可用
testOnBorrow=true

##超时时间
timeout=20000

##认证面
#password=hadoop

[/mw_shl_code]
然后通过配置文件获取一个Jedis连接:
[mw_shl_code=java,true]package rk.news.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import rk.news.conf.RedisConfig;

import java.io.IOException;
import java.util.Properties;

/**
* @Author rk
* @Date 2018/12/3 19:02
* @Description:
**/
public class JedisUtil {
    private static JedisPool jedisPool;
    private JedisUtil(){}
    private static Properties prop;
    static {
        prop = new Properties();
        try {
            prop.load(JedisUtil.class.getClassLoader().getResourceAsStream("redis.conf"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Jedis getJedis(){
        if(jedisPool == null){
            synchronized (JedisUtil.class){
                JedisPoolConfig poolConfig = new JedisPoolConfig();
                poolConfig.setMaxIdle(Integer.valueOf(prop.getProperty(RedisConfig.MAX_IDLE)));
                poolConfig.setMaxTotal(Integer.valueOf(prop.getProperty(RedisConfig.MAX_TOTAL)));
                poolConfig.setMaxWaitMillis(Integer.valueOf(prop.getProperty(RedisConfig.MAX_WAIT_MILLIS)));
                jedisPool = new JedisPool(poolConfig,
                        prop.getProperty(RedisConfig.HOST),
                        Integer.valueOf(prop.getProperty(RedisConfig.PORT)),
                        Integer.valueOf(prop.getProperty(RedisConfig.TIME_OUT)));
            }
        }
        return jedisPool.getResource();
    }
    public static void close(Jedis jedis){
        if (jedis != null){
            jedis.close();
        }
    }
}

[/mw_shl_code]
获取一个redis的key:
[mw_shl_code=java,true]package rk.news.conf;

/**
* @Author rk
* @Date 2018/12/3 19:09
* @Description:
**/
public class Constants {
    public static String NEW_TOPICS = "news.topics";
}

[/mw_shl_code]
写入到redis中:
[mw_shl_code=java,true] /**
      * 将话题数据插入到redis中
      *     实时或准实时统计中,不建议将数据落地到mysql中,高频操作,mysql服务承载压力有限,如果可以到达大概每秒10w条,可以考虑
      *     所以建议,可以将结果落地到hbase, redis, es, ignite等
      *
      */
    batchTopics2Count.foreachRDD(rdd => {
      if(! rdd.isEmpty()){
        rdd.foreachPartition(partition =>{
          if(! partition.isEmpty){
            val jedis = JedisUtil.getJedis
            partition.foreach{case (topic, topicNum) =>{
              jedis.sadd(Constants.NEW_TOPICS, topic)
            }}
            JedisUtil.close(jedis)
          }
        })
      }
    })
[/mw_shl_code]
后面获取到redis中的数据:
[mw_shl_code=java,true] /**
     * redis基本数据:
     *      string: len set get
     *      list:   lpush
     *      hash:   hget
     *      set:    sadd smembers   scard: 获取长度
     *      zset:   z
     */
    @Override
    public long getAllTopics() {
        Jedis jedis = JedisUtil.getJedis();
        Long totalTopics = jedis.scard(Constants.NEW_TOPICS);
        JedisUtil.close(jedis);
        return totalTopics;
    }
[/mw_shl_code]

代码下载: NewsProject.zip (290.1 KB, 下载次数: 1, 售价: 1 云币)

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条