Tag Archives: YARN

Spark Error: Failed to Send RPC to Datanode

This past week we had quite few issues with users not being able to run Spark jobs running in YARN Cluster mode. Particularly a team that was on tight schedule used to get errors like this all the time:

java.io.IOException: Failed to send RPC 8277242275361198650 to datanode-055: java.nio.channels.ClosedChannelException

Mostly accompanied by error messages like:

org.apache.spark.SparkException: Error sending message [message = Heartbeat(9,[Lscala.Tuple2;@e47ba81,BlockManagerId(9, datanode-50 , 43381))]

ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times

These errors basically mean the connection between Spark driver and executors are broken, mainly because executor is killed. This could happen because of a number of reasons:

  1. We realized this happens a lot more often when our cluster is too busy and has hit maximum usage. What it means is that executors are accepted to DataNodes, but they fail to acquire enough memory on the datanode and therefore get killed.
  2. Metaspace attempts to grow beyond the executor(JVM) memory limits, resulting in loss of executors.The best way to stop this error from appearing is to set below properties when launching Spark-Shell or submitting application using spark-submit:spark.driver.extraJavaOptions = -XX:ReservedCodeCacheSize=100M

    -XX:MaxMetaspaceSize=256m

    -XX:CompressedClassSpaceSize=256m

    spark.executor.extraJavaOptions = -XX:ReservedCodeCacheSize=100M

    -XX:MaxMetaspaceSize=256m

    -XX:CompressedClassSpaceSize=256m

    Please note that depending on your project and code, you may need to increase the values mentioned above.

  3. Network is slow for whatever reason. In our case, this was caused by a change in DNS which resulted in turning off caching.This case could be fixed by adjusting spark.executor.heartbeatInterval and spark.network.timeout. Default values for these 2 parameters are 10s and 120s. You can adjust these 2 values based on how your network, the only point to consider here is that the later property, spark.network.timeout, should be greater than the first one.

If none of what mentioned above helps your situation, then it is something you need to take to your cluster’s administrator. There could be something wrong with the datanodes where executors are sent to that admins are not aware of.

Happy coding!

Advertisements

YARN Capacity Scheduler: Queue Priority

Capacity Scheduler is designed to run Hadoop jobs in a shared, multi-tenant cluster in a friendly manner. Its main strength is that it guarantees specific capacity for a certain group of users by supporting multiple queues and allowing users to submit their queries into their dedicated queues. Each queue is given a fraction of total cluster capacity (RAM and CPU) and all jobs submitted to a queue will have access to the capacity dedicated to that queue.

Queue priority in Capacity Scheduler is implemented by assigning higher/lower capacity to the queues which should have higher/lower priority. Another way of making sure of this arrangement is by setting the maximum percentage of cluster resources each queue can use. Therefore to assign lower priority to a queue we should limit the amount of resource it can use.

Doing so on default queue is a bit tricky, as all the jobs submitted to the platform go through the default queue and get their Application Master Container created in there. It is a very small container that controls application execution and requests resources for YARN job submiited to the cluster. Having said that, we can use another setting in our platform that allows assigning higher priority to more important applications by setting mapred.capacity-scheduler.queue.<queue-name>.supports-priority.

To see the settings of each queue in the cluster, you should navigate to resource manager’s web UI and click on Scheduler from left menu. Then click the arrow on the left hand side of each queue to expand the settings. The 2 most important settings to check are Absolute Capacity (Queue capacity in percentage) and Absolute Max Capacity (Maximum queue capacity in percentage (%) as a float. This limits the elasticity for applications in the queue):

C Scheduler