Get most out of Spark on YARN
Published in: Technology
Transcript
- 1. Page 1 © Hortonworks Inc. 2014 Get most out of Spark on YARN Oleg Zhurakousky, Hortonworks @z_oleg
- 2. Spark
  - “Apache Spark™ is a general engine for data processing.”
  - Word Count in Spark’s Scala API:

        // `spark` is assumed to be an existing SparkContext
        val file = spark.textFile("hdfs://...")
        val counts = file.flatMap(line => line.split(" "))
                         .map(word => (word, 1))
                         .reduceByKey(_ + _)
        counts.saveAsTextFile("hdfs://...")
- 3. Spark Under the Hood
  - Diagram labels (figure not reproduced):
    - RDDs: (Hadoop|FlatMapped|Filter|MapPartitions|Shuffled)RDD
    - stage0: (Hadoop|FlatMapped|Filter|MapPartitions)RDD
    - stage1: ShuffledRDD
    - Tasks: ShuffleMapTask (flatMap | map), ResultTask (reduceByKey)
    - Layers: Spark API; Spark Compiler / Optimizer; DAG Runtime Execution Engine; Spark Cluster, YARN, Mesos
    - Other labels: Client, Cluster, DAGScheduler, ActiveJob, Task, SparkAM
- 4. Spark 101
  - Spark provides:
    - API
    - Task execution engine
    - Libraries for SQL, machine learning, graph processing
  - Resilient Distributed Dataset (RDD)
    - Immutable, distributed data collection
    - Two types of RDD functions:
      - Transformations (e.g. map, filter) create a new RDD
      - Actions (e.g. collect, count) trigger execution of the Spark DAG
    - Transformations are pipelined until an action is called
    - Spark provides factory methods to create RDDs:
      - From an in-memory collection
      - From various data sources, e.g. HDFS, local files
    - You can also create an RDD manually
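The split between transformations and actions described on this slide can be sketched in a few lines. This is a minimal example, assuming `spark-core` is on the classpath and using a local master (`local[2]`) instead of YARN so it can run on a laptop. The object name `LazyRddSketch` and the sample data are illustrative and do not come from the talk.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyRddSketch {
  // Builds a small pipeline and runs it; returns (count, collected values).
  def run(sc: SparkContext): (Long, Seq[Int]) = {
    // Factory method: create an RDD from an in-memory collection.
    val numbers = sc.parallelize(1 to 10)

    // Transformations only describe new RDDs; nothing executes yet.
    val evens   = numbers.filter(_ % 2 == 0)
    val squared = evens.map(n => n * n)

    // Actions trigger execution of the DAG built above.
    (squared.count(), squared.collect().toSeq)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode for illustration; on a cluster the master
    // would normally be supplied by spark-submit instead.
    val conf = new SparkConf().setAppName("lazy-rdd-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (count, values) = run(sc)
      println(s"count=$count values=${values.mkString(",")}")
    } finally {
      sc.stop()
    }
  }
}
```

Note that `squared` is evaluated twice here, once per action; caching the RDD would avoid the recomputation, at the cost of memory.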
- 5. Spark 101 - 2
  - Spark Driver
    - Client-side application that creates the Spark Context
  - Spark Context
    - Talks to the Spark Driver and the Cluster Manager to launch Spark Executors
  - Cluster Manager
    - E.g. YARN, Spark Standalone, Mesos
  - Executors
    - Spark worker bees
- 6. Demo
- 7. Extensibility – Why?
  - Integrate with native features of the target system (e.g., YARN Shuffle)
  - Optimize by benefiting from side effects
    - KV grouping
    - Sorting
  - Unified security and monitoring
  - Hybrid execution environment
  - Streaming and micro-batching
  - Cross-context data sharing
  - Specialization over generalization
- 8. Extensibility
  - Spark is a framework with many extensibility points
  - RDD
    - Add additional operations, optimize existing operations
  - SparkContext
    - Execution delegation, hybrid execution environments
  - ShuffleReaders/ShuffleWriters
    - Delegate reads/writes to other source/target storage systems
  - Additional operations
    - RDD and Context
  - Data broadcasting and caching
    - Cross-context sharing of cached/broadcast data
  - More...
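The talk does not show how its extensions are implemented. As one lightweight illustration of "adding operations to an RDD", the sketch below uses Scala's implicit-class pattern to attach a new method to every RDD without touching Spark itself. `RddExtensions`, `RichRdd` and `countWhere` are hypothetical names invented for this example; `spark-core` on the classpath and a local master are assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object RddExtensions {
  // Hypothetical helper, not part of Spark: adds `countWhere` to any RDD.
  implicit class RichRdd[T](rdd: RDD[T]) {
    // Counts the elements that satisfy the predicate (filter + count).
    def countWhere(p: T => Boolean): Long = rdd.filter(p).count()
  }
}

object RddExtensionsDemo {
  import RddExtensions._

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val conf = new SparkConf().setAppName("rdd-ext-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val words = sc.parallelize(Seq("a", "bb", "ccc"))
      // The new operation reads like a built-in RDD method.
      println(words.countWhere(_.length > 1))
    } finally {
      sc.stop()
    }
  }
}
```

Deeper extension points named on the slide, such as custom shuffle readers/writers or execution delegation in SparkContext, need more than this pattern and are not sketched here.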
- 9. Demo
- 10. Spark on YARN – Why?
  - Workloads run natively in Hadoop
    - Diagram labels (figure not reproduced): HDFS (Redundant, Reliable Storage); YARN (Cluster Resource Management); BATCH (MapReduce); SQL (Tez); STREAMING (Storm, S4,…); GRAPH (Giraph); SPARK; HPC MPI (OpenMPI); ONLINE (HBase); OTHER (Search) (Weave…)
  - Run the Spark workload in Hadoop instead of shipping the data to the code, with predictable performance and quality of service
- 11. Spark On YARN Benefits
  - Ship the code to the data instead of the other way around
  - Efficient locality-aware access to data stored in HDFS
  - Leverage existing Hadoop clusters
  - Single provisioning, management, monitoring story
  - Simplified deployment and operations
  - Scale-up for production use with minimal IT involvement
  - Secure, multi-tenant workload management
- 12. Demo
- 13. Spark on YARN – Key Features
  - Flexible deployment options
  - Support for secure mode
  - Monitoring and metrics
  - Distributed cache and local resource management
- 14. Integration with Hadoop Security
  - Kerberos support: kinit and submit
    - kinit -kt /etc/security/keytabs/myuser.headless.keytab myuser
  - HiveContext: accessing the Hive Metastore in secure mode
    - Minimum configuration: hive.metastore.uris
    - Be careful with extra configuration, as Spark does not support some Hive configurations.
    - The SPARK-5111 patch is needed for a basic Kerberos setup to work against Hadoop 2.6 (connecting to the Hive Metastore).
  - Spark ThriftServer:
    - Minimum configuration: Kerberos keytab/principal, SASL, authorization
    - The Spark Thrift server has to be co-located with the Hive Thrift server.
    - The Spark user has to be able to access the Hive keytab.
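The "kinit and submit" flow on this slide is two commands run in order: obtain a Kerberos ticket from the keytab, then submit the job to YARN. The sketch below only builds those two command lines; it does not execute them. The keytab path and principal are the ones on the slide. The main class `com.example.MyApp` and the jar `my-app.jar` are hypothetical, and `--master yarn-cluster` is the Spark 1.x spelling, which later Spark releases replace with `--master yarn --deploy-mode cluster`.

```scala
object KerberosSubmitSketch {
  // Step 1: obtain a Kerberos ticket from a keytab (same form as on the slide).
  def kinitCommand(keytab: String, principal: String): Seq[String] =
    Seq("kinit", "-kt", keytab, principal)

  // Step 2: submit the application to YARN in cluster mode (Spark 1.x syntax).
  def submitCommand(mainClass: String, appJar: String): Seq[String] =
    Seq("spark-submit", "--master", "yarn-cluster", "--class", mainClass, appJar)

  def main(args: Array[String]): Unit = {
    val kinit  = kinitCommand("/etc/security/keytabs/myuser.headless.keytab", "myuser")
    // Hypothetical application class and jar, for illustration only.
    val submit = submitCommand("com.example.MyApp", "my-app.jar")

    println(kinit.mkString(" "))
    println(submit.mkString(" "))
    // To actually run them, one option is scala.sys.process:
    //   import scala.sys.process._ ; kinit.! ; submit.!
  }
}
```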
- 15. Application Timeline Service (ATS) Integration
- 16. OrcFile support
  - saveAsOrcFile
    - Save the table as an ORC format file
  - orcFile
    - Read an ORC format file as a table source
  - Other features:
    - Column pruning, self-contained schema support, predicate pushdown, different compression methods
  - External data source API:
    - Refer to the latest PR in the JIRA (SPARK-2883) with the support
  - import org.apache.spark.sql.hive.orc._
  - Operate under HiveContext
  - Tech Preview in HDP 2.2, targeted for rewrite to the Spark Data Source API
- 17. Spark on YARN - Beyond the basics - Multi-tenancy & workload management
- 18. Key Themes
  - Theme: Long Running Services Support
    - Security: Kerberos token renewal
    - Log Aggregation: view logs for running applications
    - Fault Tolerance: AM retry and container keep-alive
    - Service Registry: directory for services running in YARN
  - Theme: Workload Mgmt
    - CPU Scheduling
    - CPU Isolation through CGroups
    - Node Labels for scheduling constraints
  - Theme: Reliable and Secure Operations
    - YARN Rolling Upgrades support
    - Work Preserving Restart
    - Timeline Server support in secure clusters
- 19. Enhancements to Support Long Running Services
  - Security capability
    - YARN-941: YARN updates the Kerberos token for a Long Running Service after the token expires
  - Log aggregation capability
    - YARN-2468: Aggregate and capture logs during the lifetime of a Long Running Service
  - Fault tolerance capability
    - YARN-1489: When the ApplicationMaster (AM) restarts, do not kill all associated containers; reconnect to the AM
    - YARN-611/YARN-614: Tolerate AM failures for Long Running Services
  - Service registry capability
    - YARN-913: Service Registry that publishes the hosts and ports that each service comes up on
- 20. Node Labels: Apply Node Constraints
  - Diagram labels (figure not reproduced): App A, App B; nodes labeled L1 or A; captions Deploy/Allocate, Isolate, nodes, labels
- 21. CPU Scheduling
  - What
    - Admin tells YARN how much CPU capacity is available in the cluster
    - Applications specify the CPU capacity needed for each container
    - YARN Capacity Scheduler schedules applications taking CPU capacity availability into account
  - Why
    - Applications (for example Storm, HBase, machine learning) need predictable access to CPU as a resource
    - CPU has become the bottleneck instead of memory in certain clusters (128 GB RAM, 6 CPUs)
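The "CPU is the bottleneck" point can be made concrete with a little arithmetic. The sketch below assumes a scheduler that accounts for both memory and CPU, and treats the slide's 6 CPUs as 6 YARN vcores. The container size (4 GB, 1 vcore) is a hypothetical value chosen for illustration. `ContainerFitSketch` and its members are made-up names, not YARN APIs.

```scala
object ContainerFitSketch {
  final case class Resources(memoryGb: Int, vcores: Int)

  // Containers that fit on one node: limited by whichever resource runs out first.
  def containersPerNode(node: Resources, container: Resources): Int =
    math.min(node.memoryGb / container.memoryGb, node.vcores / container.vcores)

  def main(args: Array[String]): Unit = {
    val node      = Resources(memoryGb = 128, vcores = 6) // node size from the slide
    val container = Resources(memoryGb = 4, vcores = 1)   // hypothetical container size

    println(s"fit by memory only: ${node.memoryGb / container.memoryGb}")
    println(s"fit by CPU only:    ${node.vcores / container.vcores}")
    println(s"schedulable:        ${containersPerNode(node, container)}")
  }
}
```

With these numbers memory alone would allow 32 containers, but only 6 can each get a dedicated vcore, which is why a memory-only scheduler would oversubscribe the CPUs on such a node.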
- 22. CGroup Isolation
  - What
    - Admin enables CGroups for CPU isolation of YARN application workloads
  - Why
    - Applications need guaranteed access to CPU resources
    - To ensure SLAs, the CPU allocations given to an application container need to be enforced
- 23. Default Queue Mapping
  - What
    - Admin can define a default queue per user or group
    - If no queue is specified for an application, the YARN Capacity Scheduler uses the user's or group's default queue
  - Why
    - Queues are required for enforcing SLAs, so they should be easy to use
    - Users and applications want to submit YARN apps/jobs without having to specify a queue
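The resolution rule on this slide can be modeled in a few lines. The sketch below parses a mapping string in the `u:<user>:<queue>,g:<group>:<queue>` form used by the Capacity Scheduler's `yarn.scheduler.capacity.queue-mappings` property and picks the first matching entry. It is an illustrative model, not YARN's implementation: the queue, user and group names are hypothetical, and it assumes the default behaviour where an explicitly requested queue is not overridden by a mapping.

```scala
object QueueMappingSketch {
  sealed trait Mapping { def queue: String }
  final case class UserMapping(user: String, queue: String) extends Mapping
  final case class GroupMapping(group: String, queue: String) extends Mapping

  // Parses entries of the form "u:<user>:<queue>" or "g:<group>:<queue>".
  def parse(spec: String): Seq[Mapping] =
    spec.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split(":") match {
        case Array("u", user, queue)  => UserMapping(user, queue)
        case Array("g", group, queue) => GroupMapping(group, queue)
        case _ => throw new IllegalArgumentException(s"Bad mapping: $entry")
      }
    }

  // An explicitly requested queue wins; otherwise the first matching entry is used.
  def resolve(mappings: Seq[Mapping], user: String, groups: Set[String],
              requested: Option[String]): Option[String] =
    requested.orElse(mappings.collectFirst {
      case UserMapping(u, q) if u == user            => q
      case GroupMapping(g, q) if groups.contains(g)  => q
    })

  def main(args: Array[String]): Unit = {
    // Hypothetical users, groups and queues.
    val mappings = parse("u:alice:analytics,g:etl:batch")
    println(resolve(mappings, "alice", Set("etl"), None))
    println(resolve(mappings, "bob", Set("etl"), None))
    println(resolve(mappings, "bob", Set("etl"), Some("adhoc")))
  }
}
```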
- 24. ResourceManager REST API
  - What
    - Submit YARN applications through REST
    - Get YARN application status through REST
    - Kill YARN applications through REST
  - Why
    - External applications need to interact with YARN (schedule/monitor) without embedding a Java library
    - Enable administration from a remote system
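A status check and a kill request against the ResourceManager can be sketched with nothing but the JDK's HTTP client classes. The paths below follow the ResourceManager web services layout (`/ws/v1/cluster/apps/<app_id>` and its `/state` sub-resource). The host name, port and application id in the usage note are hypothetical, and the sketch assumes an unsecured cluster: on a Kerberized cluster the calls would need SPNEGO authentication, which is not handled here.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object RmRestSketch {
  // URL builders for the ResourceManager web services.
  def appsUrl(rmAddress: String): String = s"http://$rmAddress/ws/v1/cluster/apps"
  def appUrl(rmAddress: String, appId: String): String = s"${appsUrl(rmAddress)}/$appId"
  def appStateUrl(rmAddress: String, appId: String): String = s"${appUrl(rmAddress, appId)}/state"

  // JSON body that asks the ResourceManager to kill an application.
  val killBody: String = """{"state":"KILLED"}"""

  // Sends one request and returns (HTTP status code, response body).
  def send(url: String, method: String, body: Option[String] = None): (Int, String) = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod(method)
      conn.setRequestProperty("Accept", "application/json")
      body.foreach { b =>
        conn.setDoOutput(true)
        conn.setRequestProperty("Content-Type", "application/json")
        val out = conn.getOutputStream
        try out.write(b.getBytes("UTF-8")) finally out.close()
      }
      val status = conn.getResponseCode
      val stream = if (status < 400) conn.getInputStream else conn.getErrorStream
      val text =
        if (stream == null) ""
        else {
          val src = Source.fromInputStream(stream, "UTF-8")
          try src.mkString finally src.close()
        }
      (status, text)
    } finally {
      conn.disconnect()
    }
  }
}
```

For example, `RmRestSketch.send(RmRestSketch.appUrl("rm-host:8088", appId), "GET")` would fetch an application's status, and `RmRestSketch.send(RmRestSketch.appStateUrl("rm-host:8088", appId), "PUT", Some(RmRestSketch.killBody))` would request a kill, where `rm-host` and `appId` stand in for a real ResourceManager address and application id.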
- 25. When Things Go Wrong
  - Where to look
    - yarn application -list (get the list of running applications)
    - yarn logs -applicationId <app_id>
    - Check the Spark environment: http://<host>:8088/proxy/<job_id>/environment/
  - Common issues
    - Submitted a job but nothing happens
      - Job stays in the ACCEPTED state when it asks for more memory/cores than are available
      - May need to kill unresponsive/stale jobs
    - Insufficient HDFS access
      - May lead to failures such as “Loading data to table default.testtable Failed with exception Unable to move sourcehdfs://red1:8020/tmp/hive-spark/hive_2015-03-04_12-45-42_404_3643812080461575333-1/-ext-10000/kv1.txt to destination hdfs://red1:8020/apps/hive/warehouse/testtable/kv1.txt”
    - Wrong host in Beeline, shown as an invalid URL error
      - “Error: Invalid URL: jdbc:hive2://localhost:10001 (state=08S01,code=0)”
    - Error about a closed SQLContext: restart the Thrift Server
  - Grant user/group necessary HDF
- 26. Thank you!