In this blog post, I will discuss best practices for YARN resource management. The fundamental idea of MRv2 (YARN) is to split the two major functionalities, resource management and job scheduling/monitoring, into separate daemons: a global ResourceManager (RM) and a per-application ApplicationMaster (AM).
The ResourceManager (RM) and the per-node slave, the NodeManager (NM), form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system.
Please read the Hadoop documentation on YARN concepts and architecture before reading this article.
This blog post covers the following topics regarding YARN resource management, and also provides best practices for each topic:
In a MapR Hadoop cluster, warden sets the default resource allocation for the operating system, MapR-FS, MapR Hadoop services, and MapReduce v1 and YARN applications. Details are described in the MapR documentation: Resource Allocation for Jobs and Applications.
YARN can manage three system resources: memory, CPU, and disks. Once warden finishes its calculations, it sets the environment variable YARN_NODEMANAGER_OPTS used to start the NM.
For example, if you run “vi /proc/<pid_for_NM>/environ”, you can find the settings below:
YARN_NODEMANAGER_OPTS= -Dnodemanager.resource.memory-mb=10817 -Dnodemanager.resource.cpu-vcores=4 -Dnodemanager.resource.io-spindles=2.0
These can be overridden by setting the three corresponding configurations in yarn-site.xml on the NM nodes and restarting the NM.
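For illustration, the overrides look like this in yarn-site.xml (property names follow the Hadoop 2.5 / MapR convention matching the -D options above; the values shown are examples from this node, not recommendations — "yarn.nodemanager.resource.io-spindles" is MapR-specific):

```xml
<!-- yarn-site.xml on each NM node; example values only -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>10817</value>
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>4</value>
</property>
<property>
  <name>yarn.nodemanager.resource.io-spindles</name>
  <value>2.0</value>
</property>
```

Restart the NodeManager after editing for the new values to take effect.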
To view the available resources on each node, go to the RM UI (http://<IP_of_RM>:8088/cluster/nodes) and check the “Mem Avail”, “Vcores Avail”, and “Disk Avail” columns for each node.
In this step, make sure warden fully accounts for all services when allocating resources, because some services do not have dedicated parameters in warden.conf, e.g., Drill and Impala. If you plan to allocate 10% of total memory to Drill and 5% to Impala on a node, carve out that 15% of memory using the parameters service.command.os.heapsize.percent/max/min.
If memory is over-allocated to YARN, heavy swapping may occur and the kernel OOM killer may be triggered to kill container processes.
The error below is a sign of OS-level OOM, and probably means memory is over-allocated to YARN:
os::commit_memory(0x0000000000000000, xxxxxxxxx, 0) failed; error='Cannot allocate memory' (errno=12)
If we see that, double-check whether warden takes into account all memory-consuming services on that node, and reduce the memory allocated by warden if needed.
As of Hadoop 2.5.1, two resources, memory and CPU, have minimum and maximum allocation units in YARN, set by the configurations below in yarn-site.xml.
Basically, it means the RM can only allocate memory to containers in increments of "yarn.scheduler.minimum-allocation-mb", not exceeding "yarn.scheduler.maximum-allocation-mb";
and it can only allocate CPU vcores to containers in increments of "yarn.scheduler.minimum-allocation-vcores", not exceeding "yarn.scheduler.maximum-allocation-vcores".
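For reference, the four settings are shown below with the stock Hadoop 2.5 defaults (verify these against your own distribution before relying on them):

```xml
<!-- yarn-site.xml on the RM node; values shown are the common defaults -->
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>1024</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>8192</value>
</property>
<property>
  <name>yarn.scheduler.minimum-allocation-vcores</name>
  <value>1</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>32</value>
</property>
```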
If changes are required, set the above configurations in yarn-site.xml on the RM nodes and restart the RM.
For example, if one job asks for 1025 MB of memory per map container (mapreduce.map.memory.mb=1025), the RM will give it one 2048 MB container (2 * yarn.scheduler.minimum-allocation-mb).
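The rounding behavior can be sketched as follows (a simplified model of the scheduler's request normalization, not the actual YARN code):

```python
import math

def normalize_request(request_mb, min_alloc_mb=1024, max_alloc_mb=8192):
    """Round a container memory request up to the next multiple of the
    minimum allocation; requests above the maximum allocation are rejected."""
    granted = math.ceil(request_mb / min_alloc_mb) * min_alloc_mb
    if granted > max_alloc_mb:
        raise ValueError("request exceeds yarn.scheduler.maximum-allocation-mb")
    return granted

print(normalize_request(1025))  # 1025 MB rounds up to 2048 MB
```

Asking for even one megabyte over a multiple of the minimum allocation wastes almost a full increment per container, so it pays to size requests at exact multiples.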
If you have a huge MapReduce job that asks for a 9999 MB map container, the job will be killed with the error message below in the AM log:
MAP capability required is more than the supported max container capability in the cluster. Killing the Job. mapResourceRequest: 9999 maxContainerCapability:8192
If a Spark on YARN job asks for a container larger than "yarn.scheduler.maximum-allocation-mb", the error below will show up:
Exception in thread "main" java.lang.IllegalArgumentException: Required executor memory (99999+6886 MB) is above the max threshold (8192 MB) of this cluster!
In both cases, you can increase “yarn.scheduler.maximum-allocation-mb” in yarn-site.xml and restart the RM.
So in this step, you need to be familiar with the lower and upper bounds of the resource requirements for each mapper and reducer of your jobs, and set the minimum and maximum allocation units accordingly.
The NodeManager can monitor the memory usage (virtual and physical) of each container. If “yarn.nodemanager.vmem-check-enabled” is true and a container's virtual memory exceeds “yarn.nodemanager.vmem-pmem-ratio” times "mapreduce.reduce.memory.mb" or "mapreduce.map.memory.mb", the container will be killed;
if “yarn.nodemanager.pmem-check-enabled” is true and its physical memory exceeds "mapreduce.reduce.memory.mb" or "mapreduce.map.memory.mb", the container will be killed.
The parameters below can be set in yarn-site.xml on each NM node to override the default behavior.
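A sketch of those overrides (the ratio shown is the stock Hadoop 2.5 default; note that MapR 4.1 ships with the virtual memory checker already disabled):

```xml
<!-- yarn-site.xml on each NM node -->
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>  <!-- disable the virtual memory checker -->
</property>
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>true</value>   <!-- keep the physical memory checker on -->
</property>
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>2.1</value>    <!-- virtual:physical ratio used by the vmem checker -->
</property>
```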
This is a sample error for a container killed by virtual memory checker:
Current usage: 347.3 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.
And this is a sample error for physical memory checker:
Current usage: 2.1gb of 2.0gb physical memory used; 1.1gb of 3.15gb virtual memory used. Killing container.
As of Hadoop 2.5.1 in MapR 4.1.0, the virtual memory checker is disabled and the physical memory checker is enabled by default.
Since CentOS/RHEL 6 allocates virtual memory aggressively due to its OS behavior, you should disable the virtual memory checker or increase yarn.nodemanager.vmem-pmem-ratio to a relatively larger value.
If the above errors occur, it is also possible that the MapReduce job has a memory leak, or that the memory for each container is simply not enough. Try to check the application logic and also tune the container memory request: "mapreduce.reduce.memory.mb" or "mapreduce.map.memory.mb".
A MapReduce v2 job has three different container types: Mapper, Reducer, and AM.
Mappers and Reducers can ask for three resources — memory, CPU, and disk — while the AM can only ask for memory and CPU.
Below is a summary of the resource-request configurations for the three container types.
The default values are from Hadoop 2.5.1 of MapR 4.1, and they can be overridden in mapred-site.xml on the client node, or set in applications such as MapReduce Java code, Pig, and the Hive CLI.
Each container is actually a JVM process, and the “-Xmx” value in the java-opts above should fit within the allocated memory size. One best practice is to set it to 0.8 * (container memory allocation). For example, if the requested mapper container has mapreduce.map.memory.mb=4096, we can set mapreduce.map.java.opts=-Xmx3277m.
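Applying the 0.8 rule to a 4 GB mapper/reducer and a 1 GB AM might look like this in mapred-site.xml (the values are illustrative, not recommendations; verify the property names against your Hadoop version):

```xml
<!-- mapred-site.xml on the client node; illustrative sizing only -->
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>4096</value>
</property>
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx3277m</value>   <!-- ~0.8 * 4096 MB -->
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>4096</value>
</property>
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx3277m</value>
</property>
<property>
  <name>yarn.app.mapreduce.am.resource.mb</name>
  <value>1024</value>
</property>
<property>
  <name>yarn.app.mapreduce.am.command-opts</name>
  <value>-Xmx819m</value>    <!-- ~0.8 * 1024 MB -->
</property>
```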
There are many factors which can affect the memory requirement for each container. Such factors include the number of Mappers/Reducers, the file type (plain text, Parquet, ORC), the data compression algorithm, the type of operations (sort, group-by, aggregation, join), data skew, etc. You should be familiar with the nature of your MapReduce job and figure out the minimum memory requirements for the Mapper, Reducer, and AM. Any of the container types can run out of memory and be killed by the physical/virtual memory checker if it doesn't meet its minimum memory requirement. If so, you need to check the AM log and the failed container log to find out the cause.
For example, if the MapReduce job sorts Parquet files, the Mapper needs to cache the whole Parquet row group in memory. I have done tests showing that the larger the row group size of the Parquet files, the more Mapper memory is needed. In this case, make sure the Mapper memory is large enough to avoid triggering OOM.
Another example is the AM running out of memory. Normally, the AM's 1 GB Java heap size is enough for many jobs. However, if the job writes lots of Parquet files, then during the commit phase of the job the AM will call ParquetOutputCommitter.commitJob(). It first reads the footers of all output Parquet files, and then writes a metadata file named “_metadata” in the output directory.
This step may cause the AM to run out of memory, with the stack trace below in the AM log:
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.StringCoding$StringEncoder.encode(StringCoding.java:300)
    at java.lang.StringCoding.encode(StringCoding.java:344)
    at java.lang.String.getBytes(String.java:916)
    at parquet.org.apache.thrift.protocol.TCompactProtocol.writeString(TCompactProtocol.java:298)
    at parquet.format.ColumnChunk.write(ColumnChunk.java:512)
    at parquet.format.RowGroup.write(RowGroup.java:521)
    at parquet.format.FileMetaData.write(FileMetaData.java:923)
    at parquet.format.Util.write(Util.java:56)
    at parquet.format.Util.writeFileMetaData(Util.java:30)
    at parquet.hadoop.ParquetFileWriter.serializeFooter(ParquetFileWriter.java:322)
    at parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:342)
    at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:51)
    ... 10 more
The solution is to increase the memory request for the AM, or to disable this Parquet feature with “set parquet.enable.summary-metadata false”.
Besides figuring out the minimum memory requirement for each container, sometimes we need to balance job performance against resource capacity. For example, jobs doing sorting may need a relatively larger “mapreduce.task.io.sort.mb” to avoid or reduce the number of spill files. If the whole system has enough memory capacity, we can increase both “mapreduce.task.io.sort.mb” and the container memory to get better job performance.
In this step, we need to make sure each type of container meets its proper resource requirements. If OOM happens, always check the AM logs first to figure out which container failed and what the cause was, per the stack trace.
Since there are three types of resources, different containers from different jobs may ask for different amounts of each. This can result in one of the resources becoming the bottleneck. Suppose we have a cluster with capacity (1000 GB RAM, 16 cores, 16 disks) and each Mapper container needs (10 GB RAM, 1 core, 0.5 disks): at most 16 Mappers can run in parallel, because CPU cores become the bottleneck here.
As a result, (840 GB RAM, 8 disks) of resources are left unused. If you run into this situation, check the RM UI (http://<ip_of_rm>:8088/cluster/nodes) to figure out which resource is the bottleneck. You can then allocate the leftover resources to jobs that can improve performance with them. For example, you can allocate more memory to sorting jobs that used to spill to disk.
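The arithmetic behind this example can be sketched as follows (a back-of-the-envelope check, not actual YARN scheduler logic):

```python
def max_parallel_containers(cluster, per_container):
    """For each resource, compute how many containers fit;
    the resource allowing the fewest containers is the bottleneck."""
    fits = {r: int(cluster[r] // per_container[r]) for r in cluster}
    bottleneck = min(fits, key=fits.get)
    return fits[bottleneck], bottleneck

cluster = {"ram_gb": 1000, "cores": 16, "disks": 16}
mapper = {"ram_gb": 10, "cores": 1, "disks": 0.5}

count, bottleneck = max_parallel_containers(cluster, mapper)
print(count, bottleneck)  # 16 cores -> only 16 mappers in parallel
leftover_ram = cluster["ram_gb"] - count * mapper["ram_gb"]  # 840 GB idle
leftover_disks = cluster["disks"] - count * mapper["disks"]  # 8 disks idle
```

Running the same check with your own cluster capacity and per-container requests quickly shows which resource caps your parallelism and how much of the others sits idle.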
In this blog post, you’ve learned best practices for YARN resource management. If you have any further questions, please ask them in the comments section below.