
Spark Streaming Programming Explained

This post was last edited by howtodown on 2014-8-24 20:07
Guiding questions:

1. What is Spark Streaming?
2. Which data sources can Spark Streaming accept?
3. What are the two kinds of operations we can perform on a DStream?




Reference: Spark: An Efficient Distributed Computing System

Before looking at Spark Streaming, we first need to know: what is Spark Streaming?

Spark Streaming is a framework, built on Spark, for processing stream data. The basic principle is to split the stream into small time slices (a few seconds) and process those small chunks in a batch-like fashion. Spark Streaming is built on Spark because, on the one hand, Spark's low-latency execution engine (100ms+) can be used for real-time computation, and on the other hand, compared with record-at-a-time frameworks such as Storm, the RDD dataset model makes efficient fault tolerance easier. In addition, the micro-batch approach makes it compatible with the logic and algorithms of both batch and real-time processing, which is convenient for applications that need joint analysis of historical and real-time data.





Overview
Spark Streaming is part of Spark's core API. It supports high-throughput, fault-tolerant processing of real-time data streams.
It can accept data from sources such as Kafka, Flume, Twitter, ZeroMQ and TCP sockets, operate on that data with simple API functions such as map, reduce, join and window, and can even feed the data directly into the built-in machine learning and graph processing libraries.

[Figure: streaming-arch1.png]


Its workflow is shown in the figure below: after receiving real-time data, it divides the data into batches, hands them to the Spark engine for processing, and finally produces a result for each batch.

[Figure: streaming-flow2.png]

The data stream abstraction it supports is called a DStream, with direct support for Kafka and Flume sources. A DStream is a continuous sequence of RDDs. Below is an example to help understand DStreams.

A Quick Example
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Create a StreamingContext with 1-second batches
  val ssc = new StreamingContext(sparkConf, Seconds(1));
  // Get a DStream connected to the listening address:port
  val lines = ssc.socketTextStream(serverIP, serverPort);
  // Split each line of data
  val words = lines.flatMap(_.split(" "));
  // Count the words
  val pairs = words.map(word => (word, 1));
  val wordCounts = pairs.reduceByKey(_ + _);
  // Print the results
  wordCounts.print();
  ssc.start();             // start the computation
  ssc.awaitTermination();  // wait for it to terminate

The full code is available on this page:

https://github.com/apache/incuba ... workWordCount.scala

If you already have Spark installed, you can try the following example.

First, start Netcat, a simple data server available on any Unix-like system.

Use this command to start Netcat:


  $ nc -lk 9999

Then start the example:

  $ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999

Type hello world on the Netcat side and watch the output on the Spark side:

  # TERMINAL 1:
  # Running Netcat
  $ nc -lk 9999
  hello world
  ...

  # TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
  $ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
  ...
  -------------------------------------------
  Time: 1357008430000 ms
  -------------------------------------------
  (hello,1)
  (world,1)
  ...

Basics
Now for how to actually write the code.
First, add the following to your SBT or Maven project:


  groupId = org.apache.spark
  artifactId = spark-streaming_2.10
  version = 0.9.0-incubating
  1. //ÐèҪʹÓÃÒ»ÏÂÊý¾ÝÔ´µÄ£¬»¹ÒªÌí¼ÓÏàÓ¦µÄÒÀÀµ
  2. Source    Artifact
  3. Kafka     spark-streaming-kafka_2.10
  4. Flume     spark-streaming-flume_2.10
  5. Twitter     spark-streaming-twitter_2.10
  6. ZeroMQ     spark-streaming-zeromq_2.10
  7. MQTT     spark-streaming-mqtt_2.10
¸´ÖÆ´úÂë
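In SBT, for example, the coordinates above translate into dependency lines like the following (a sketch; the Kafka line is only needed if you actually use the Kafka source):

```scala
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "0.9.0-incubating",
  // only when the Kafka input source is used:
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "0.9.0-incubating"
)
```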

Next comes instantiation:

  new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
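For example, a minimal sketch (the app name and thread count are arbitrary): a local context running with two threads and one-second batches:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]": two worker threads, so one can receive data
// while the other processes it
val ssc = new StreamingContext("local[2]", "MyStreamingApp", Seconds(1))
```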

These are the DStream operations from the earlier example:

[Figure: streaming-dstream-ops3.png]


Input Sources
³ýÁËsocketsÖ®Í⣬ÎÒÃÇ»¹¿ÉÒÔÕâÑù´´½¨Dstream

  streamingContext.fileStream(dataDirectory)


There are three requirements here:
(1) the files under dataDirectory must all have the same format;
(2) files must be created in this directory by moving or renaming them into it;
(3) once a file is in place, it must not be changed.
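As a minimal sketch, monitoring a directory of plain-text files can use textFileStream, the text-file convenience variant of fileStream (the HDFS path here is hypothetical):

```scala
// Each file moved into the directory becomes part of the stream; the three
// requirements above (same format, atomic move/rename, no later edits) apply.
val fileLines = ssc.textFileStream("hdfs://namenode:9000/streaming/input")
fileLines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
```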

Suppose we want to create a Kafka DStream:

  import org.apache.spark.streaming.kafka._
  KafkaUtils.createStream(streamingContext, kafkaParams, ...)


If you need a custom stream receiver, see https://spark.incubator.apache.o ... stom-receivers.html

Operations
A DStream supports two kinds of operations: transformations and output operations.

Transformations
  Transformation                   Meaning
  map(func)                        Returns a new DStream by applying func to each element
  flatMap(func)                    Like map, but each input item can be mapped to 0 or more outputs
  filter(func)                     Keeps only the elements for which func returns true
  repartition(numPartitions)       Changes the number of partitions to raise the level of parallelism
  union(otherStream)               Merges two streams
  count()                          Counts the number of elements
  reduce(func)                     Aggregates the elements of each RDD with func (two input arguments, one output)
  countByValue()                   When the DStream's elements are of type K, returns a new DStream of (K, Long) pairs, where Long is the frequency of each K
  reduceByKey(func, [numTasks])    For a (K, V) DStream, runs func per key; the default is 2 tasks in local mode and 8 in cluster mode, or you can specify numTasks
  join(otherStream, [numTasks])    Joins a (K, V) and a (K, W) DStream into a new (K, (V, W)) DStream
  cogroup(otherStream, [numTasks]) Combines a (K, V) and a (K, W) DStream into a new (K, Seq[V], Seq[W]) DStream
  transform(func)                  Transforms each underlying RDD through func into a new RDD
  updateStateByKey(func)           Updates the state and value for each key with func; the state can be of any type


UpdateStateByKey Operation
We use this operation when we want to keep some state information and update it continuously. Using it takes two steps:
(1) define the state, which can be of any data type;
(2) define the state update function, which produces the new state from the previous one.
Here is an example:

  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
      // add the new values to the previous running count to get the new count
      val newCount = runningCount.getOrElse(0) + newValues.sum
      Some(newCount)
  }


It can be used on a DStream containing (word, 1) pairs, such as the earlier example:

  val runningCounts = pairs.updateStateByKey[Int](updateFunction _)


It calls the update function for each word in the stream; newValues holds the latest values, and runningCount holds the previous count.


Transform Operation
Like transformWith, transform lets us apply an arbitrary RDD-to-RDD operation to a DStream. For example, suppose we want to join the RDDs inside a DStream with another dataset, but the DStream API does not expose this directly; we can use the transform method to do it. Here is an example:

  val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
  val cleanedDStream = inputDStream.transform(rdd => {
    rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
    ...
  })


We can also apply machine learning and graph algorithms inside it.

Window Operations
[Figure: streaming-dstream-window4.png]

Let's start with an example. Take the earlier word count: suppose we want to compute, every 10 seconds, the word counts over the last 30 seconds.

We can use the following statement:

  // Reduce last 30 seconds of data, every 10 seconds
  val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

This uses the two window parameters:

(1) window length: the window is 30 seconds long, i.e. the last 30 seconds of data;

(2) slide interval: the interval at which the computation is performed.

From this example we can roughly see what a window means: periodically computing over sliding data.

Below are some of the window operation functions:

  Transformation                                                                Meaning
  window(windowLength, slideInterval)                                           New DStream computed over windowed batches of the source stream
  countByWindow(windowLength, slideInterval)                                    Sliding-window count of the elements in the stream
  reduceByWindow(func, windowLength, slideInterval)                             Aggregates the elements in the window with func
  reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])           Per-key aggregation over the window
  reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])  More efficient incremental variant that uses the inverse function invFunc to subtract the batches leaving the window
  countByValueAndWindow(windowLength, slideInterval, [numTasks])                Per-value frequency counts over the window
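For instance, the earlier 30-second/10-second word count can be written with the incremental variant (a sketch; the checkpoint path is hypothetical, and checkpointing must be enabled for this form):

```scala
// invFunc (_ - _) removes the contribution of batches sliding out of the
// window, instead of re-reducing the whole 30 seconds each time
ssc.checkpoint("hdfs://namenode:9000/streaming/checkpoint")
val windowedWordCounts =
  pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
```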


Output Operations
  Output Operation                        Meaning
  print()                                 Prints to the console
  foreachRDD(func)                        Applies func to each RDD in the DStream, e.g. to save it to an external system
  saveAsObjectFiles(prefix, [suffix])     Saves the stream's contents as SequenceFiles, named "prefix-TIME_IN_MS[.suffix]"
  saveAsTextFiles(prefix, [suffix])       Saves the stream's contents as text files, named "prefix-TIME_IN_MS[.suffix]"
  saveAsHadoopFiles(prefix, [suffix])     Saves the stream's contents as Hadoop files, named "prefix-TIME_IN_MS[.suffix]"
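A common foreachRDD pattern is to open one connection per partition rather than per record; a sketch follows, where createConnection stands in for whatever client your external system uses and is not a real API:

```scala
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // one connection per partition amortizes the setup cost
    val connection = createConnection() // hypothetical helper
    partition.foreach(record => connection.send(record.toString))
    connection.close()
  }
}
```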


Persistence
The RDDs in a DStream can also be kept in memory by calling persist(). For window- and state-based operations such as reduceByWindow, reduceByKeyAndWindow and updateStateByKey, however, this is implicit: the system persists them automatically.
Data received over the network (such as from Kafka, Flume or sockets) is by default replicated to two nodes for fault tolerance and is kept in memory in serialized form.


RDD Checkpointing
Stateful operations work across multiple batches of data. They include the window-based operations and updateStateByKey. Because a stateful operation depends on the previous batch's data, it keeps accumulating metadata over time. To clear this data, it supports periodic checkpointing, which saves intermediate results to HDFS. Since checkpointing incurs the cost of writing to HDFS, the interval must be chosen carefully: for small batches (say one second), checkpointing every batch would greatly reduce throughput, while too long an interval causes tasks to grow. Generally, a checkpoint interval of 5-10 seconds is a good choice.

  ssc.checkpoint(hdfsPath)                // set where checkpoints are saved
  dstream.checkpoint(checkpointInterval)  // set the checkpoint interval


¶ÔÓÚ±ØÐëÉèÖüì²éµãµÄDstream£¬±ÈÈçͨ¹ýupdateStateByKeyºÍreduceByKeyAndWindow´´½¨µÄDstream£¬Ä¬ÈÏÉèÖÃÊÇÖÁÉÙ10Ãë¡£

Performance Tuning
Tuning can be approached from two directions:
(1) using the cluster's resources to reduce the time needed to process each batch of data;
(2) setting a suitable size for each batch of data.

Level of Parallelism
For distributed operations such as reduceByKey and reduceByKeyAndWindow, the default is 8 concurrent tasks. You can raise this through the corresponding function argument, or raise the default by changing the spark.default.parallelism parameter.
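Both knobs can be sketched as follows (the value 16 is purely illustrative):

```scala
// per operation, via the optional numTasks argument:
val counts = pairs.reduceByKey(_ + _, 16)

// or globally, on the SparkConf before the context is created:
val conf = new org.apache.spark.SparkConf()
  .setAppName("StreamingApp")
  .set("spark.default.parallelism", "16")
```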

Task Launching Overheads
Launching too many tasks per second is also bad. At, say, 50 tasks per second, the overhead of shipping tasks becomes significant and sub-second latency becomes hard to achieve; the size of each batch of tasks can of course be reduced through compression.

Setting the Right Batch Size
For a streaming program to run stably on a cluster, it must process data as fast as it flows in. The best way to find the right batch size is to first set the batch size to 5-10 seconds with a very low input rate. Once the system is confirmed to keep up with that rate, adjust the size empirically by checking the Total delay in the logs. If the delay stays below the batch duration, the system is stable; if the delay keeps increasing, the system cannot process data as fast as it arrives.

24/7 Operation
By default Spark does not forget metadata such as generated RDDs and processed stages. But Spark Streaming is a 24/7 program, so it needs to clean metadata periodically, configured through spark.cleaner.ttl. For example, if you set it to 600, Spark clears all metadata (and persisted RDDs) older than 10 minutes. This property must be set before the SparkContext is created.

This value is tied to any window operations you use: Spark requires input data to remain persisted in memory until it has expired, so the TTL must be set at least as large as the largest window operation; if it is set smaller, an error is raised.
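As a sketch, for a job whose largest window is 30 seconds, a TTL of one hour is comfortably safe (the app name and values are illustrative):

```scala
// must be set on the conf before the context is created,
// and must exceed the largest window used by the job
val conf = new org.apache.spark.SparkConf()
  .setAppName("AlwaysOnApp")
  .set("spark.cleaner.ttl", "3600") // seconds
val ssc = new org.apache.spark.streaming.StreamingContext(
  conf, org.apache.spark.streaming.Seconds(1))
```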

Monitoring
³ýÁËSparkÄÚÖõļà¿ØÄÜÁ¦£¬»¹¿ÉÒÔStreamingListenerÕâ¸ö½Ó¿ÚÀ´»ñÈ¡Åú´¦ÀíµÄʱ¼ä, ²éѯʱÑÓ, È«²¿µÄ¶Ëµ½¶ËµÄÊÔÑé¡£

Memory Tuning
Spark StreamĬÈϵÄÐòÁл¯·½Ê½ÊÇStorageLevel.MEMORY_ONLY_SER£¬¶ø²»ÊÇRDDµÄStorageLevel.MEMORY_ONLY¡£
ĬÈϵģ¬ËùÓг־û¯µÄRDD¶¼»áͨ¹ý±»SparkµÄLRUËã·¨ÌÞ³ý³öÄڴ棬Èç¹ûÉèÖÃÁËspark.cleaner.ttl£¬¾Í»áÖÜÆÚÐÔµÄÇåÀí£¬µ«ÊÇÕâ¸ö²ÎÊýÉèÖÃÒªºÜ½÷É÷¡£Ò»¸ö¸üºÃµÄ·½·¨ÊÇÉèÖÃspark.streaming.unpersistΪtrue£¬Õâ¾ÍÈÃSparkÀ´¼ÆËãÄÄЩRDDÐèÒª³Ö¾Ã»¯£¬ÕâÑùÓÐÀûÓÚÌá¸ßGCµÄ±íÏÖ¡£
ÍƼöʹÓÃconcurrent mark-and-sweep GC£¬ËäÈ»ÕâÑù»á½µµÍϵͳµÄÍÌÍÂÁ¿£¬µ«ÊÇÕâÑùÓÐÖúÓÚ¸üÎȶ¨µÄ½øÐÐÅú´¦Àí¡£
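These memory settings can be sketched as follows (the GC flag goes into the JVM options of the driver/executors, e.g. via your environment script; names here are illustrative):

```scala
val conf = new org.apache.spark.SparkConf()
  .setAppName("MemoryTunedApp")
  // streaming data is already stored serialized (MEMORY_ONLY_SER) by default
  .set("spark.streaming.unpersist", "true") // let Spark drop RDDs it no longer needs

// JVM flag for concurrent mark-and-sweep GC (set outside the program), e.g.:
//   SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"
```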

Fault-tolerance Properties

Failure of a Worker Node
There are two failure cases to consider:
1. Files on HDFS: because HDFS is a reliable file system, no data is lost.
2. Network sources such as Kafka and Flume: to guard against failure, received data is by default replicated to 2 nodes. But there is one possibility of loss: if the node receiving the data crashes before it has had time to replicate the data to the other node, that data may be lost.

Failure of the Driver Node
To support uninterrupted 24/7 processing, Spark can recover the computation after the driver node fails. Spark Streaming periodically writes data to HDFS, into the checkpoint directory mentioned earlier. After the driver node fails, the StreamingContext can be recovered from it.

For a Spark Streaming program to be recoverable, it needs to do the following:
(1) on first start, create the StreamingContext, create all the streams, then call start();
(2) on restart after recovery, re-create the StreamingContext from the checkpoint data.

Here is a practical example: constructing the StreamingContext through StreamingContext.getOrCreate achieves exactly this.

  // Function to create and setup a new StreamingContext
  def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
  }

  // Get StreamingContext from checkpoint data or create a new one
  val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

  // Do additional setup on context that needs to be done,
  // irrespective of whether it is being started or restarted
  context. ...

  // Start the context
  context.start()
  context.awaitTermination()


In stand-alone deployment mode, the driver node can also be recovered automatically when it fails, by letting another node take over as driver. You can test this locally: submit in supervise mode, then after the program is submitted, use jps to inspect the processes and kill the one that looks like DriverWrapper. With YARN mode you would have to use some other mechanism to restart it.
While we're at it, here is how to submit a program through the client; I left this part out of the earlier summary.

  ./bin/spark-class org.apache.spark.deploy.Client launch
     [client-options] \
     <cluster-url> <application-jar-url> <main-class> \
     [application-options]

  cluster-url: the address of the master.
  application-jar-url: the address of the jar; ideally on HDFS with the hdfs:// prefix, otherwise the jar must exist in the same directory on every node.
  main-class: the class containing the main function of the program to deploy.

  Client Options:
  --memory <count> (memory for the driver, in MB)
  --cores <count> (how many cores to allocate to the driver)
  --supervise (whether to restart the application when the node fails)
  --verbose (print verbose log output)


Recoverability for all input sources will be supported in a future version.


To better understand HDFS-based recovery from driver node failure, here is a simple example:

  Time   Number of lines in input file   Output without driver failure   Output with driver failure
  1      10                              10                              10
  2      20                              20                              20
  3      30                              30                              30
  4      40                              40                              [DRIVER FAILS] no output
  5      50                              50                              no output
  6      60                              60                              no output
  7      70                              70                              [DRIVER RECOVERS] 40, 50, 60, 70
  8      80                              80                              80
  9      90                              90                              90
  10     100                             100                             100






The failure occurs at time 4: batches 40, 50 and 60 produce no output. At time 7 the driver recovers, and everything that was not output is then emitted all at once.



岑玉海

Comments (15)

hlyz2008, 2015-4-21 09:49: Thanks for the generous share.

GreenArrow, 2014-8-24 22:17: Nice, thanks for sharing.

YLV, 2015-4-29 17:00: Thanks for sharing.

evababy, 2015-6-15 09:50: For the TCP approach, how do you write the client and server side in code? After a lot of testing I can only get it working with nc -lk. Any pointers?

小秦琼, 2015-7-17 15:50: Nice article.

zhangblh, 2015-9-14 15:06: New here, learning.
