Preparing Spark Streaming Applications for Production

This article looks at what it takes to run Apache Spark Streaming applications in production: submitting them to a cluster so they survive failures, shutting them down gracefully, monitoring them, and keeping them fast enough to stay ahead of the stream.





Spark Streaming

At Clairvoyant we build real-time data pipelines for our clients. There are plenty of frameworks to build them with, among others Apache Nifi, Apache Flume and Apache Flink. One of the most popular, though, is Spark Streaming.





Spark Streaming is an extension of the Core Apache Spark API for processing live data streams. It is scalable, offers high throughput and is fault-tolerant.





 





[Figure: Spark Streaming — the high-level data flow]





Spark Streaming works on micro-batches. The incoming stream is cut into small batches (all the data received over an interval of X seconds), and Spark Streaming hands every batch to the Spark Engine as an ordinary RDD. Spark builds a directed acyclic graph (Directed Acyclic Graph, DAG) of the operations to run and executes it on the batch.





 





[Figure: Spark Streaming splits the stream into micro-batches]





Because Spark Streaming sits on top of core Spark, whatever Spark can do with an RDD can be done with every micro-batch. Streaming code therefore looks almost exactly like batch code.





To see what a Spark Streaming application actually looks like, here is the classic word count over Kafka. A condensed version of the code:





import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// topics, brokers and groupId come from the command-line arguments
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()



Step by step:





  1. A StreamingContext is created with a batch interval of 2 seconds

  2. A direct Kafka DStream is created, subscribed to the given topics

  3. Ordinary RDD operations are applied to every batch of the DStream: the lines are split into words, the words are counted and printed

  4. The StreamingContext is started and runs until terminated

Apache Kafka will be the source in all the examples that follow.





Running on YARN

To run any application on a Spark cluster, you need to:





  1. Package it as a JAR (for Python, a set of .py files)

  2. Submit it with spark-submit:





$ spark-submit --class "org.apache.spark.testSimpleApp" --master local[4] /path/to/jar/simple-project_2.11-1.0.jar



Here spark-submit is given the master local[4], which runs Spark locally with four threads. That is fine for development, but in production the application is submitted to a cluster, and to understand the submission options we need a quick look at how Spark is organized.





The Spark architecture:





 





[Figure: Spark architecture — the driver and the executors]





Every Spark application consists of a driver and executors. The Spark Driver (the master process) runs the application's main function, creates the SparkContext and turns the user code (written in Java, Scala or Python) into a DAG of stages and tasks. The executors (Executor) are the worker processes: they execute the tasks the driver hands them and hold the application's data.





When, as is usually the case, Spark runs on a Hadoop cluster, the resources are managed by YARN (Yet Another Resource Negotiator). It is YARN that allocates the containers in which the Spark Driver and Executors run. How YARN is specified as the master depends on the Spark version:





Spark 1.6.3 and earlier:

YARN client mode: --master yarn-client

YARN cluster mode: --master yarn-cluster

Spark 2.0 and later:

YARN client mode: --master yarn --deploy-mode client

YARN cluster mode: --master yarn --deploy-mode cluster





The two modes differ in one thing: where the Spark Driver runs. In client mode it runs on the machine that submitted the application; in cluster mode it runs inside the cluster, in a YARN container.





YARN client mode





 





[Figure: YARN client mode — the driver runs on the submitting machine]





In client mode the Spark Driver runs on the machine from which the application was submitted (the one where spark-submit was executed). If that machine fails, or the session is simply closed, the driver dies and the whole Spark application dies with it. For a Spark Streaming application, which is supposed to run for months, that is clearly not an option.





A Spark Streaming application is a long-running one: once started, it should work around the clock until deliberately stopped. So which mode should be used for Spark Streaming? Cluster mode.





YARN cluster mode

 





[Figure: YARN cluster mode — the driver runs inside the cluster]





In cluster mode the Spark Driver runs inside the cluster, in a container managed by YARN. If the driver or its node fails, YARN can restart the application automatically, and the machine that submitted the job can disconnect at any time. Two YARN-related properties control the restart behaviour.





spark.yarn.maxAppAttempts







The maximum number of attempts YARN makes to run the application before declaring it failed. The default comes from the YARN cluster configuration.





spark.yarn.am.attemptFailuresValidityInterval







The validity interval for counting failures of the Application Master (AM). If the AM has been running for at least this long, the failure counter is reset. Without this property, a long-running application would sooner or later exhaust its attempts, no matter how rare the failures.





For example, if we set:





  • spark.yarn.maxAppAttempts=2





  • spark.yarn.am.attemptFailuresValidityInterval=1h

then the application is considered failed only if it crashes twice within one hour; a single crash simply triggers an automatic restart.









Both properties can be passed on the command line via spark-submit:





$ spark-submit \
  --class "org.apache.testSimpleApp" \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  /path/to/jar/simple-project_2.11-1.0.jar



The same properties can also be set programmatically in the SparkConf:





val sparkConf = new SparkConf()
  .setAppName("App")
  .set("spark.yarn.maxAppAttempts", "2")
  .set("spark.yarn.am.attemptFailuresValidityInterval", "1h")

val ssc = new StreamingContext(sparkConf, Seconds(2))



With these settings YARN brings a failed Spark Streaming application back up on its own. But sometimes you need to stop the application yourself, for example to deploy a new version, and you need to do it without losing data. Let's look at how.





The blunt way to stop a Spark Streaming application running on YARN is:





$ yarn application -kill {ApplicationID}
      
      



But what happens to the batch that Spark Streaming was processing at that exact moment?





It is cut off mid-flight: the kill gives the application no chance to finish its work. What we want instead is a graceful shutdown: stop accepting new data, finish the batches already received, and only then exit.





The good news is that when Spark reads from Kafka (and Kafka keeps the data durably on its side), the records themselves are not gone, they are still in Kafka; but the Spark Streaming application must know exactly where to resume, otherwise records are skipped or processed twice.





To make a graceful shutdown possible, the Spark Streaming application must control its own main loop instead of blocking forever.





The first change:





// Start the computation
ssc.start()
ssc.awaitTermination()  <--- REMOVE THIS LINE
      
      



How does the application know when to stop? It needs a shutdown signal it can check between batches. Typical options for a Spark Streaming application:

  1. A marker file: an empty file on HDFS that the job polls; deleting the file requests the shutdown.

  2. A hook in the Spark code itself: the driver exposes some way (an endpoint, a shutdown hook) to request the stop.

  3. An external flag: a record in a database or a coordination service that the job checks periodically.

The simplest and most reliable of these is the marker file: it needs nothing but HDFS, which is already there.





Implementing the marker-file approach in Spark:





The idea: replace awaitTermination with a while loop. On each iteration the application waits with a timeout and then checks whether the marker file still exists on HDFS. While the check returns true, the while loop keeps the application running; as soon as the file disappears, the loop stops the StreamingContext gracefully.





Stopped this way, the application finishes processing the data it has already received and exits cleanly, losing nothing. Shutting down a Spark Streaming application then comes down to deleting its marker file.
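Below is a minimal sketch of such a loop; the marker-file path and the 10-second polling interval are assumptions for illustration:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical marker file; create it before submitting the application:
//   hdfs dfs -touchz /user/spark/streaming/myapp.marker
val markerFile = new Path("/user/spark/streaming/myapp.marker")

ssc.start()

var keepRunning = true
while (keepRunning) {
  // Returns true if the context terminated on its own within the timeout.
  val terminated = ssc.awaitTerminationOrTimeout(10000)
  if (terminated) {
    keepRunning = false
  } else {
    val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
    if (!fs.exists(markerFile)) {
      // Marker removed: finish the current batch, then stop cleanly.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      keepRunning = false
    }
  }
}

To trigger the shutdown, delete the file: hdfs dfs -rm /user/spark/streaming/myapp.marker.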









A production application also has to be monitored. Spark offers several mechanisms for this, including a whole metrics subsystem; the details are here: http://spark.apache.org/docs/latest/monitoring#metrics. Below we focus on two of the most useful tools for streaming jobs.





StreamingListener (Spark ≥ 2.1)





Starting with Apache Spark 2.1, the StreamingListener interface lets an application receive callbacks about its own streaming progress. The available events include:





  • onBatchSubmitted

  • onBatchStarted

  • onBatchCompleted

  • onReceiverStarted

  • onReceiverStopped

  • onReceiverError





By implementing these callbacks you can pull batch-level statistics (the number of records, processing time, scheduling delay and so on) out of the application and feed them into your monitoring. The scheduling delay deserves particular attention: if it keeps growing, the application is no longer keeping up with the incoming data.
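As a sketch of how this can be used (the BatchMonitor class, the threshold and the println-based "alerting" are illustrative assumptions; only StreamingListener itself is Spark API):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs statistics for every completed batch and flags growing delays.
class BatchMonitor(maxDelayMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    val schedulingDelay = info.schedulingDelay.getOrElse(0L)
    println(s"Batch ${info.batchTime}: ${info.numRecords} records, " +
      s"processed in ${info.processingDelay.getOrElse(0L)} ms, " +
      s"scheduling delay $schedulingDelay ms")
    if (schedulingDelay > maxDelayMs) {
      // Plug your alerting of choice in here (mail, pager, chat, ...).
      println(s"ALERT: batches are queueing up ($schedulingDelay ms behind)")
    }
  }
}

// Register the listener before starting the context.
ssc.addStreamingListener(new BatchMonitor(maxDelayMs = 60000))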





The Spark UI





 





[Figure: the Spark web user interface]

The Spark UI is the web interface that ships with Apache Spark. It shows what an application is doing right now and what it has done: jobs, stages and tasks, cached data, environment settings, executors. For a streaming application it adds a dedicated Streaming tab, which makes the Spark UI the first tool to reach for when a Spark Streaming job misbehaves.





The UI also shows how Apache Spark actually executes the code: the DAG of every job, how each RDD and DataFrame is built, which of them are cached, and where the time goes. Since in Spark Streaming every batch is an ordinary job over an RDD or DataFrame, all of this applies to streaming applications as well.





For Spark Streaming specifically, the most valuable page is the Streaming tab: it shows, batch by batch, whether the application keeps up with its input. We will return to it in the section on performance.









Checkpointing

To survive failures, a streaming application must periodically save enough information to fault-tolerant storage such as HDFS. This is what checkpointing does in Spark Streaming. The checkpointed metadata is what allows the driver to recover, and it includes:





  • configuration — the configuration the streaming application was created with;

  • DStream operations — the set of DStream operations that define the application;

  • incomplete batches — batches whose jobs are queued but have not completed yet.





Checkpointing of the data itself (the generated RDDs) is additionally required when the application uses stateful transformations such as updateStateByKey or reduceByKeyAndWindow.





Checkpointing is enabled like this:





val checkpointDirectory = "hdfs://..."   // define checkpoint directory

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc  // Return the StreamingContext
}

// Get StreamingContext from checkpoint data or create a new one
val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

ssc.start() // Start the context
      
      







On restart, getOrCreate rebuilds the StreamingContext from the checkpoint data if the directory is non-empty; otherwise it calls the supplied function and starts from scratch.
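To make the stateful case concrete, here is a hedged sketch extending the word count from the beginning of the article; the updateCount function and variable names are illustrative:

// Running total of word counts across all batches.
// Checkpointing must already be enabled (see above) for this to work.
def updateCount(newValues: Seq[Long], state: Option[Long]): Option[Long] =
  Some(newValues.sum + state.getOrElse(0L))

// words is the DStream[String] from the word count example;
// the running totals survive restarts thanks to the checkpoint.
val totalCounts = words.map(x => (x, 1L)).updateStateByKey(updateCount)
totalCounts.print()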





A Spark checkpointing caveat





Checkpoints in Spark have one important limitation.









The checkpoint stores the serialized DStream graph, which ties it to the exact code that produced it. Deploy a new version of the application (or upgrade Spark itself) and the old checkpoint can no longer be deserialized: it has to be deleted, and the offsets and state stored in it are lost. Keep this in mind before relying on checkpoints as the only place where offsets live; we will come back to this below.





Kafka topics

Since Kafka is the most common source for Spark Streaming, the way the Kafka topics are created matters too. Two settings deserve attention: the number of partitions and the replication factor.





Creating a Kafka topic (specifying both):





kafka-topics --zookeeper <host>:2181 --create --topic <topic-name> --partitions <number-of-partitions> --replication-factor <number-of-replicas>
      
      



 





[Figure: a Kafka topic divided into partitions]





A topic is divided into partitions, and each message written to Kafka lands in exactly one of them. The partition is chosen by the message key: all messages with the same key go to the same partition. If the key is null, the producer spreads messages across partitions in a circle (round-robin).
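A small producer sketch to illustrate; the broker address, topic name and payloads are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Keyed record: every record with key "user-42" lands in the same partition.
producer.send(new ProducerRecord("events", "user-42", "clicked"))

// Record without a key: the producer spreads such records across partitions.
producer.send(new ProducerRecord("events", "page viewed"))

producer.close()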





The number of partitions matters because it bounds the parallelism of consumption: each partition is read by at most one task at a time, so the partition count is the ceiling on how many Spark tasks can read from Kafka in parallel. The replication factor, in turn, determines how many broker failures a topic survives without losing data.





Receiving data from Kafka

There are two approaches for getting data from Kafka into Spark Streaming: the receiver-based approach and the direct approach. The difference boils down to two questions: "how does the data get into Spark?" and "who keeps track of what has already been read?". Let's look at each in turn.









 





[Figure: Spark Streaming — the receiver-based approach]





The receiver-based approach is the older of the two and is not specific to Kafka: receivers exist for many Spark Streaming sources (Kafka, Twitter, Kinesis and others). A receiver is a long-running task inside an executor that continuously pulls data and stores it in Spark's memory; for Kafka it is built on the high-level consumer API, and the consumed offsets are tracked in Kafka itself. To protect the data buffered in memory, the receiver can also persist everything it receives to a write-ahead log (Write Ahead Log, WAL) on fault-tolerant storage and acknowledge the data to Kafka only after it has reached the WAL. After a crash, Spark replays the unprocessed data from the WAL.





Notice the irony of this guarantee: the records are already stored durably and replicated inside Kafka, and now each of them is durably written a second time, into the WAL. Throughput drops and storage is spent twice.





For the receiver-based approach to actually survive failures without data loss, several things must be configured together:





  • checkpointing — the WAL lives in the checkpoint directory, so checkpointing must be enabled;

  • the WAL itself — it is disabled by default and is turned on with the property spark.streaming.receiver.writeAheadLog.enable=true;

  • the StorageLevel of the input stream — with the WAL already keeping a copy in HDFS, in-memory replication is redundant, so use StorageLevel.MEMORY_AND_DISK_SER.





With all of this in place, a Spark Streaming application can crash at any point and recover without losing data. But is the result exactly-once processing?





Not quite: the weak spot is the gap between writing to the WAL and updating the offsets in Kafka. If the application dies after the records are safely in the WAL but before Kafka learns they were received, the same records are delivered again after recovery. The guarantee is therefore at-least-once: nothing is lost, but duplicates are possible.





 





[Figure: Spark Streaming — the direct (receiverless) approach]





The direct approach removes the receivers, and with them the WAL: why write the data out a second time when Kafka already stores it reliably? Instead, for every batch the driver computes the range of offsets to read from each partition, and the executors read exactly those ranges straight from Kafka. For example, batch 1 processes offsets 2000–2050 of a partition and batch 2 processes offsets 2051–2100. If a batch fails, the same offsets are simply read again.





The consequences: no duplicated storage, one Spark partition per Kafka partition, and offsets tracked by Spark Streaming itself rather than by Kafka, which puts exactly-once processing within reach. It also means that keeping those offsets safe is now the application's job.





Managing Kafka offsets

Offsets are how a Spark Streaming application answers the question "where did I stop?". By default, with the direct approach, they are kept in the checkpoint, which, as we saw, does not survive a code upgrade. Moreover, when the offsets are persisted relative to the processing determines the semantics you get (persist them before processing and you have "at most once"; after, and you have "at least once"). That is why Spark Streaming lets you manage offsets yourself.





The first place that comes to mind is Kafka itself: the integration Spark Streaming uses can commit processed offsets back to Kafka. It is the simplest option, but it gives little control over when exactly a record counts as processed. Storing the offsets in an external store of your own is more work, but more flexible.





Whatever the store, the goal is to persist the offsets together with the results of processing, ideally "in one transaction", so that the data and the record of what was processed never diverge.





Let's look at a concrete example.





 





[Figure: storing Kafka offsets in an external store]





Here is how the Kafka DStream is created so that the application resumes from offsets saved earlier, loaded by a loadOffsets function:





val storedOffsets: Option[mutable.Map[TopicPartition, Long]] = loadOffsets(spark, kuduContext)

val kafkaDStream = storedOffsets match {
    case None =>
        LOGGER.info("storedOffsets was None")
        kafkaParams += ("auto.offset.reset" -> "latest")
        KafkaUtils.createDirectStream[String, Array[Byte]]
            (ssc, PreferConsistent, ConsumerStrategies.Subscribe[String, Array[Byte]]
                (topicsSet, kafkaParams)
        )
    case Some(fromOffsets) =>
        LOGGER.info("storedOffsets was Some(" + fromOffsets + ")")
        kafkaParams += ("auto.offset.reset" -> "none")
        KafkaUtils.createDirectStream[String, Array[Byte]]
            (ssc, PreferConsistent, ConsumerStrategies.Assign[String, Array[Byte]]
                (fromOffsets.keys.toList, kafkaParams, fromOffsets)
        )
}
      
      



In this example the offsets live in Kudu (hence the kuduContext), but nothing depends on that choice: Zookeeper, HBase, HDFS, Hive, Impala or any other store you already operate will do.
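The mirror image of loadOffsets is saving the offsets once a batch has been processed. A minimal sketch of that side (saveOffsets here is a hypothetical function writing to whatever store you chose):

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

kafkaDStream.foreachRDD { rdd =>
  // The direct stream knows exactly which offsets back this batch.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Persist only after processing succeeded: at-least-once semantics.
  saveOffsets(offsetRanges) // hypothetical: write to Kudu, HBase, ZooKeeper, ...
}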





That takes care of surviving failures and restarts. The other part of being production-ready is performance: the application has to keep up with the incoming stream.





The rule is simple: if the batch interval is 30 seconds, processing a batch must take no more than 30 seconds.





Otherwise the application falls behind. Suppose a batch takes 40 seconds to process: while it is running, more data has already accumulated, so the next batch is larger and takes even longer. The backlog and the latency grow with every batch until the application becomes unusable.





So batch processing time is the single most important thing to watch, and it must stay below the batch interval. The easiest place to see it is the Spark UI.





The Streaming tab of the Spark UI shows the processing time of every batch against the batch interval. With, say, a 10-second interval, you can tell at a glance whether batches finish within 10 seconds or a queue is building up.





[Figure: Streaming statistics in the Spark UI]

If processing time creeps up towards the batch interval, the application needs tuning. Two simple techniques often help: caching and repartitioning.





Caching and repartitioning





Caching pays off whenever the same data is used more than once within a batch of a Spark Streaming application. That happens more often than it seems: the batch is joined with several datasets, written to more than one destination, or aggregated in several ways.









The rule: if an RDD/DataFrame is consumed by more than one action, persist that RDD/DataFrame so it is computed only once.









With the direct Kafka approach this is especially important: an uncached batch is re-read from Kafka by every action, because the data only flows into Spark when an action runs.





Caching the batch before the actions and unpersisting it afterwards spares a Spark Streaming application all of that repeated work. Just remember to release the memory when the batch is done.
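A minimal sketch of the pattern (writeToStoreA and writeToStoreB stand in for whatever actions your application performs):

dstream.foreachRDD { rdd =>
  // Two actions will consume this batch: cache it so the data
  // is fetched and computed only once.
  rdd.cache()

  writeToStoreA(rdd) // hypothetical first action
  writeToStoreB(rdd) // hypothetical second action

  // Release the memory before the next batch arrives.
  rdd.unpersist()
}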









The second technique is repartitioning. If the incoming stream has too few partitions to keep all the executor cores busy (for example, because the Kafka topic has only a handful of partitions), redistribute the data before the heavy processing:





dstream.repartition(100)
      
      







Repartitioning is not free: it shuffles data across the cluster. It pays off when the work per record is heavy enough that the extra parallelism outweighs the shuffle cost, for example when a topic with 10 partitions feeds executors with many times that number of cores. Measure the batch processing time with and without it before committing.





Good luck taking your Spark Streaming applications to production!

