Why YARN?
- Higher scalability: as the central arbiter of the compute cluster, the JobTracker was responsible for many functions, each of which limited its scalability:
- admission control,
- tracking the liveness of TaskTrackers (to re-execute running tasks or tasks whose output becomes unavailable),
- launching tasks speculatively to route around slow nodes,
- reporting job status to users through a web server,
- recording audit logs and aggregate statistics,
- authenticating users and many other functions;
- Decoupling resource management from the MapReduce programming model
YARN requirements
- Scalability
- Multi-tenancy
- Serviceability: decoupling of upgrade dependencies
- Locality awareness is a key requirement for YARN
- High Cluster Utilization is a top priority for YARN
- Reliability/Availability
- Secure and auditable operation
- Support for Programming Model Diversity
- Flexible Resource Model
- Backward compatibility
Components
Resource Manager (RM)
- central authority arbitrating resources among competing applications; users typically estimate how much of a resource (e.g., memory) a job needs by running it once and reusing that estimate for future runs
- dynamically allocates leases, called containers, to applications
- it enforces global properties (e.g., fairness, capacity), and is not concerned with local optimizations or an application's internal flow
- if an application is non-collaborative (e.g., it ignores preemption requests), the RM can kill its containers after a timeout
- tracks overall resource availability across the cluster's nodes
- resource requests from an AM specify (sketched below):
- number of containers
- resource requirements per container
- locality preferences
- priority ordering among the application's requests
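A minimal sketch of what one such ask might look like, written as a simplified record rather than the actual YARN protocol fields; the names (`num_containers`, `resource_per_container`, `locality`, `priority`) are illustrative:

```python
from dataclasses import dataclass, field

@dataclass
class ResourceRequest:
    """Sketch of one resource ask an AM sends to the RM (hypothetical fields)."""
    num_containers: int                           # how many containers of this shape
    resource_per_container: dict                  # e.g., {"memory_mb": 2048, "vcores": 1}
    locality: list = field(default_factory=list)  # preferred nodes/racks, "*" = anywhere
    priority: int = 0                             # ordering among this application's requests

# Example: ask for 10 containers of <2 GB, 1 core> near two preferred locations
req = ResourceRequest(
    num_containers=10,
    resource_per_container={"memory_mb": 2048, "vcores": 1},
    locality=["node17", "rack4", "*"],
    priority=1,
)
```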
Application Master (AM)
- coordinates the logical plan of a single job by requesting resources from the RM; resource requests specify:
- locality preferences
- properties of the containers
- monitors the progress of the work done inside its containers
- the AM has some flexibility in how it fulfills "preemption" requests (e.g., by checkpointing task state or yielding less important containers), which lets the application preserve work
- preemption significantly increases cluster utilization
- Resource allocations are late binding: the process spawned is not bound to the request, but to the lease
- the AM optimizes for locality among map tasks: given a granted container, it selects a pending task whose input data is close to that container (see the sketch below)
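A sketch of that late-binding choice, assuming a simple node-to-rack topology map; `pick_task_for_container` and the task fields are hypothetical helpers, not MapReduce AM internals:

```python
def pick_task_for_container(container_node, pending_tasks, topology):
    """Late binding sketch: the lease is bound to a node, not to a particular
    task, so the AM picks whichever pending task is 'closest' to that node.
    `topology` maps a node name to its rack (illustrative structure)."""
    def distance(task):
        if container_node in task["input_nodes"]:
            return 0                                  # node-local
        if topology[container_node] in {topology[n] for n in task["input_nodes"]}:
            return 1                                  # rack-local
        return 2                                      # off-rack
    return min(pending_tasks, key=distance)
```

Because the lease names only a node, any pending task can be matched to it; the AM simply prefers node-local, then rack-local, then off-rack work.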
Node Manager (NM)
- container launch context (CLC): every container is described by one (a sketch follows this list); it includes:
- a map of environment variables
- dependencies stored remotely
- security tokens
- payloads for NM services
- command necessary to create the process
- the NM configures the environment for the container
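A rough illustration of the pieces a CLC carries, written as a plain dictionary; the keys are descriptive stand-ins, not the actual YARN API field names:

```python
# Sketch of a container launch context (CLC); keys and values are illustrative.
container_launch_context = {
    "environment": {"JAVA_HOME": "/usr/lib/jvm/default", "APP_ID": "app_0001"},
    "remote_dependencies": ["hdfs:///apps/myjob/job.jar"],   # NM downloads these locally
    "security_tokens": b"<serialized-credentials>",
    "service_payloads": {"shuffle": b"<per-service-data>"},  # for auxiliary NM services
    "command": ["java", "-Xmx1g", "org.example.TaskRunner"], # process to launch
}
```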
Communications
- communication between the RM and the NMs (and between the RM and the AMs) is heartbeat-based for scalability (sketched after this list)
- AM -> RM: heartbeat message: encodes its preferences and constraints
- RM -> AM: container lease
- NM -> RM -> AM: container exit status; the AM, rather than the RM, takes care of fault tolerance
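A sketch of the AM side of this cycle, assuming a hypothetical `rm_client` stub with an `allocate` call; the point is that asks, leases, and exit statuses all piggyback on periodic heartbeats:

```python
import time

def am_heartbeat_loop(rm_client, pending_requests, handle_granted, handle_completed):
    """Sketch of the AM<->RM heartbeat cycle (rm_client and the response shape
    are assumptions for illustration)."""
    while True:
        response = rm_client.allocate(asks=pending_requests)   # AM -> RM: preferences/constraints
        for lease in response["granted_containers"]:           # RM -> AM: container leases
            handle_granted(lease)
        for status in response["completed_containers"]:        # NM -> RM -> AM: exit statuses
            handle_completed(status)    # reacting to failures is the AM's job, not the RM's
        time.sleep(1)                   # heartbeat interval (illustrative)
```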
Fault Tolerance
- RM: single point of failure
- recovers from its own failures by restoring its state from a persistent store on initialization
- once the recovery process is complete, it kills all the containers running in the cluster, including the AMs
- it then launches new instances of each AM
- (in-progress improvement: let AMs survive an RM restart by continuing to use their existing containers)
- NM fails:
- the RM detects the failure by timing out the NM's heartbeat, marks all the containers running on that node as killed, and reports the failure to all running AMs (a sketch of this flow follows the section)
- when the failed NM comes back, it re-synchronizes with the RM and cleans up its local state
- AMs are responsible for reacting to node failures, potentially redoing work done by any containers running on that node during the fault
- AM fails:
- its failure does not affect the availability of the cluster
- the RM may restart the AM
- the platform offers no support to restore the AM's state
- the platform also does not handle the restarted AM's re-synchronization with its own still-running containers; that is left to the AM
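A sketch of the NM-failure path described above (heartbeat timeout, containers marked killed, AMs notified); the data structures, timeout value, and `notify_node_failure` callback are assumptions for illustration:

```python
import time

NM_EXPIRY_SEC = 600   # illustrative heartbeat timeout

def expire_dead_nodes(nodes, running_apps, now=None):
    """RM-side sketch: a node whose heartbeat has not arrived within the timeout
    is marked lost, its containers are marked killed, and every running AM is
    told so it can decide what work to redo."""
    now = now or time.time()
    for node in nodes.values():
        if node["state"] == "RUNNING" and now - node["last_heartbeat"] > NM_EXPIRY_SEC:
            node["state"] = "LOST"
            for container in node["containers"]:
                container["status"] = "KILLED_BY_RM"
            for app in running_apps:
                app.notify_node_failure(node["id"], node["containers"])
```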
How does Tez improve on MapReduce?
- multiple MapReduce jobs -> one Tez (DAG) job
- avoids the unnecessary step of persisting the outputs of intermediate MapReduce jobs to HDFS
Scheduling discipline
a job has many tasks; one plan is one job, and one job is one DAG
- Fairness metric:
- job-level completion time fairness
- Dominant resource fairness (max-min)
- Methods:
- Tetris (packing)
- Delay scheduling (queue based)
- Flow-based scheduling
- Capacity scheduling
Queue based Scheduler
application tasks are put into a container queue and are also replicated into the corresponding rack queue and the cluster-wide queue
Original Hadoop Fair Scheduler
enforces a per-job limit on the number of tasks scheduled; when one of a job's tasks finishes, the freed slot tends to be handed right back to the same job (the "sticky slot" problem, which hurts data locality)
Delay scheduling
- when the job that should run next cannot launch a data-local task, do not give it a non-local slot immediately; skip it and wait for a while (sketched below)
- if the job has been skipped more than a threshold number of times, it times out and is allowed to launch a non-local task
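A minimal sketch of the delay-scheduling loop, assuming hypothetical job helpers (`has_local_task`, `launch_local_task`, `launch_any_task`) and a per-job skip counter:

```python
SKIP_THRESHOLD = 3   # max times a job may be skipped; the value here is illustrative

def assign_slot(free_node, jobs_in_fair_order, skip_count):
    """Walk jobs in fair-share order; a job with no data-local task on the free
    node is skipped instead of being given a non-local slot, until it has been
    skipped too many times."""
    for job in jobs_in_fair_order:
        if job.has_local_task(free_node):
            skip_count[job.id] = 0                       # locality achieved, reset counter
            return job.launch_local_task(free_node)
        if skip_count.get(job.id, 0) >= SKIP_THRESHOLD:  # waited long enough
            skip_count[job.id] = 0
            return job.launch_any_task(free_node)        # allow a non-local launch
        skip_count[job.id] = skip_count.get(job.id, 0) + 1
    return None                                          # leave the slot free this round
```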
Flow based scheduling
Dominant Resource Fairness (multiple resources; jobs may differ in which resources they demand)
Equalize each job's share of its dominant resource across jobs (max-min on dominant shares). E.g., some jobs take a lot of memory while others take a lot of I/O or CPU; placing such complementary demands together keeps both resources busy (see the sketch below).
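A small sketch of the DRF selection rule; the job/cluster structures are illustrative assumptions:

```python
def drf_pick_next(jobs, cluster_capacity):
    """DRF sketch: a job's dominant share is the largest fraction of any single
    resource it has been allocated; serve the job with the smallest dominant
    share next (max-min over dominant shares)."""
    def dominant_share(job):
        return max(job["allocated"][r] / cluster_capacity[r] for r in cluster_capacity)
    return min(jobs, key=dominant_share)

# A memory-heavy job and a CPU-heavy job have complementary dominant resources,
# so DRF can keep both resources highly utilized at once.
jobs = [
    {"name": "mem_heavy", "allocated": {"cpu": 2,  "mem_gb": 32}},  # dominant share 32/256
    {"name": "cpu_heavy", "allocated": {"cpu": 12, "mem_gb": 8}},   # dominant share 12/64
]
next_job = drf_pick_next(jobs, {"cpu": 64, "mem_gb": 256})          # -> mem_heavy
```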
Tetris is a cluster scheduler that packs tasks onto machines based on their multi-resource requirements
- packing heuristic aims to minimize makespan
- average completion time: SRPT (shortest remaining processing time)
- SRPT scores each job by its remaining time (l)
- l is combined with the packing score via a weight; as the weight increases, the scheduler converges to pure SRPT
- this trades off packability against average completion time
- fairness: DRF-style knob
- pure DRF would pick the single job farthest from its fair (dominant-resource) share
- Tetris instead picks from a set of such candidate jobs rather than from only that one job
- packing rule: pick the (machine, task) pair with the highest alignment score (sketch below)
- the dot product <task CPU, memory> * <available CPU, memory>
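A minimal sketch of that alignment score on two resources (CPU, memory); real Tetris also normalizes demands and folds in the SRPT and fairness terms noted above, so this only shows the packing step:

```python
def tetris_pick(machines, tasks):
    """Score each feasible (machine, task) pair by the dot product of the task's
    demand vector and the machine's available-resource vector; schedule the
    highest-scoring pair. Dict fields are illustrative."""
    best, best_score = None, -1.0
    for m in machines:
        for t in tasks:
            if t["cpu"] <= m["free_cpu"] and t["mem"] <= m["free_mem"]:   # feasibility check
                score = t["cpu"] * m["free_cpu"] + t["mem"] * m["free_mem"]
                if score > best_score:
                    best, best_score = (m, t), score
    return best   # None if nothing fits

machines = [{"id": "m1", "free_cpu": 8, "free_mem": 16},
            {"id": "m2", "free_cpu": 2, "free_mem": 64}]
tasks    = [{"id": "t1", "cpu": 4, "mem": 4},
            {"id": "t2", "cpu": 1, "mem": 32}]
print(tetris_pick(machines, tasks))   # picks the memory-heavy task on the memory-rich machine
```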