
Custom Receivers in Spark Streaming

Last edited by 坎蒂丝_Swan on 2014-12-29 16:09

Guiding question

  How do you define custom Receivers in Spark Streaming?






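Defining a custom Receiver
  import java.io.{BufferedReader, InputStreamReader}
  import java.net.Socket
  import org.apache.spark.storage.StorageLevel
  // Assumed package for the Spark 0.9-era API; later releases replaced NetworkReceiver.
  import org.apache.spark.streaming.dstream.NetworkReceiver

  class SocketTextStreamReceiver(host: String, port: Int)
    extends NetworkReceiver[String] {
    // Groups received records into blocks, stored serialized in memory on two nodes.
    protected lazy val blocksGenerator: BlockGenerator =
      new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

    protected def onStart() = {
      blocksGenerator.start()
      val socket = new Socket(host, port)
      val dataInputStream = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), "UTF-8"))
      // Read line by line until the peer closes the connection (readLine returns null).
      var data: String = dataInputStream.readLine()
      while (data != null) {
        blocksGenerator += data
        data = dataInputStream.readLine()
      }
    }

    protected def onStop() {
      blocksGenerator.stop()
    }
  }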
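The heart of onStart() in the receiver is a blocking read loop: it keeps calling readLine() and forwards each line until the stream ends. The following is a minimal sketch of just that loop in plain Scala, with no Spark dependency, so it can be run on its own. `ReadLoopSketch`, `forwardLines`, and the `sink` callback are hypothetical names introduced for illustration; `sink` stands in for `blocksGenerator += data`.

```scala
import java.io.{BufferedReader, StringReader}
import scala.collection.mutable.ArrayBuffer

object ReadLoopSketch {
  // Reads lines until end of stream (readLine returns null) and hands
  // each one to `sink`. Returns the number of lines forwarded.
  def forwardLines(reader: BufferedReader, sink: String => Unit): Int = {
    var count = 0
    var line = reader.readLine()
    while (line != null) {
      sink(line)
      count += 1
      line = reader.readLine()
    }
    count
  }

  def main(args: Array[String]): Unit = {
    // A StringReader replaces the socket input stream in this sketch.
    val collected = ArrayBuffer.empty[String]
    val reader = new BufferedReader(new StringReader("hello world\nhello hello\n"))
    val n = forwardLines(reader, line => collected.append(line))
    println(s"$n lines: ${collected.mkString(" | ")}")
    // prints: 2 lines: hello world | hello hello
  }
}
```

Because the loop blocks on readLine(), it only returns once the other side closes the connection, which is why the receiver runs it inside onStart() rather than on the driver.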


An Actor as Receiver
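  import akka.actor.{Actor, IO, IOManager}
  import akka.util.ByteString
  // Assumed package for the Spark 0.9-era actor Receiver trait.
  import org.apache.spark.streaming.receivers.Receiver

  class SocketTextStreamReceiver(host: String,
      port: Int,
      bytesToString: ByteString => String) extends Actor with Receiver {
    // Open the connection when the actor starts.
    override def preStart = IOManager(context.system).connect(host, port)
    // Convert each chunk read from the socket to a String and push it to Spark.
    def receive = {
      case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
    }
  }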


A Sample Spark Application
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  // Brings reduceByKey into scope for pair DStreams in this Spark version.
  import org.apache.spark.streaming.StreamingContext._

  val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
    Seconds(batchDuration))

  // Use the custom receiver
  val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
    "localhost", 8445))

  /* Or use the custom actor Receiver instead
  val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
    "localhost", 8445, z => z.utf8String)), "SocketReceiver") */

  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()

After the application has been submitted successfully, start Netcat and type some input to test it:
  $ nc -l localhost 8445
  hello world hello hello
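To check what the application should print for this input, the word-count pipeline can be mirrored on an ordinary Scala collection. This is only a local sketch, assuming all four words arrive within a single batch; `WordCountSketch` and `wordCounts` are hypothetical names, and `groupBy` plus a sum stands in for `reduceByKey(_ + _)`.

```scala
object WordCountSketch {
  // Local analogue of
  // lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val counts = wordCounts(Seq("hello world hello hello"))
    // Sort before printing; Map iteration order is not guaranteed.
    counts.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"($w,$c)") }
    // prints:
    // (hello,3)
    // (world,1)
  }
}
```

In the streaming job the counts are computed per batch, so input that is split across batch intervals would show up as separate, smaller counts.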

Multiple input streams can be merged as follows:
  val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
    "localhost", 8445, z => z.utf8String)), "SocketReceiver")

  // Another socket stream receiver
  val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
    "localhost", 8446, z => z.utf8String)), "SocketReceiver")

  val union = lines.union(lines2)
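The same pairwise union extends to more than two streams by reducing over a collection of them. The sketch below models that pattern on plain Scala sequences rather than DStreams, so it runs without Spark. `UnionSketch`, `unionAll`, and the third source are hypothetical, and `++` stands in for `union` applied to a single batch.

```scala
object UnionSketch {
  // Merges any number of per-source batches into one; the result holds
  // the elements of every input. An empty list of sources gives an empty batch.
  def unionAll[T](batches: Seq[Seq[T]]): Seq[T] =
    batches.reduceOption(_ ++ _).getOrElse(Seq.empty)

  def main(args: Array[String]): Unit = {
    val fromPort8445 = Seq("hello world")
    val fromPort8446 = Seq("hello hello")
    val fromPort8447 = Seq("goodbye") // hypothetical third source
    val merged = unionAll(Seq(fromPort8445, fromPort8446, fromPort8447))
    println(merged.mkString(" | "))
    // prints: hello world | hello hello | goodbye
  }
}
```

With real streams, the equivalent would presumably be along the lines of `streams.reduce(_ union _)`, after which the merged stream can be processed like any single input stream.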







This article is reposted from 岑玉海: http://www.cnblogs.com/cenyuhai/p/3577583.html