Tez

Zhaoyu Luo bio photo By Zhaoyu Luo

Why Tez?

  1. allow users to model computation as a DAG
    • runtime query optimization, such as pruning data partitions, based on online information
    • YARN-compatible security, data-locality awareness, resource-reuse, fault-tolerance and speculation
    • pluggable API

Tez

  • Component re-use without hindering customizability of the performance-critical data plane
  • App(Hive) -> DAG(Calcite) -> AM(Vertex Manager) --physical DAG--> YARN
  • provide shared features used by Apache Hive, Pig, Spark etc:
    • negotiating resources from YARN to run application’s tasks
    • handling security within the clusters
    • recovering from hardware failures
    • publishing metrics and statistics
  • where is better than MapReduce:
    • Expressiveness: more functions than map and reduce
    • Data-plane Customizability: data transformations and data movements that define he data plane need to be completely customizable
    • Late-binding Runtime Optimizations: make late-binding decisions, by enabling updates to key abstractions at runtime
  • use Event Based Control Plane: asynchronnous, non-blocking, push-based method of communication
  • Automatic Partition Cardinality Estimation: determine the correct number of tasks in reduce phase
    • depends on the size of the data being shuffled from the mappers to the reducers
    • depends on whether it is accurately available only at runtime
  • Scheduling Optimizations: 在有partial input的情况提前启动task
    • may incur deadlock
      • do deadlock detection and preemption to take care of that
  • user level isolation: running tasks for single user in the containers of an application
    • each task runs in its own container process, the resource allocations are much finer grained
  • run spark on tez: use RDD’s lineage dependency to generate post-compilation Spark DAG, then generates Tez DAG

Component

  • DAG: what could not be converted to DAG? loop, recursive query
  • Task: represents a unit of work in a vertex
  • Vertex: core application logic: “processor”
    • each vertex can be associated with a VertexManager
    • can map to many “tasks”
    • parallelism <- static / dynamically controlled by a vertex manager
  • VertexManager:
    • it is provided a context object that notifies it about state changes like task completions
      • then it could make changes to its own vertex’s state
    • control vertex parallelism
  • Edges
    • connectivity pattern
    • one-one
    • broadcast
    • scatter-gather
    • transport mechanism: data format and physical transport mechanism (disk or memory)
      • Tez is data format agnostic: IPO can choose their own data format
    • scheduling
    • sequential (downstream node’s run may be blocked by previous)
    • concurrent
    • Data source
    • persisted
    • ephemeral

API

  • DAG API
    • create a rich work flow
    • data sources may be invoked at runtime to determine the optimal reading pattern for the initial input (since reading is always slow)
      • e.g., read data after knowing the join key space
  • Runtime API
    • Processor -> program runs in each task corresponding to vertex
      1. initialize(context)
        • run(list, list)
        • handle_events() -> taskfinish
        • interm.ouput(read/write)
    • Output
      1. Init(context)
        • writes()
        • handle_events()
    • Input
      1. Input
        • Init()
        • Reads()
        • handle_events()

Execution Efficiency

  • Locality Aware Scheduling (等一会): the framework automatically relaxes locality from node to rack and so on with delay scheduling used to add a wait period before each relaxation
  • Speculation: clone slow task
  • Container Reuse
  • Session: run AM in session mode, so that it allow tasks from multiple DAGs to reuse containers
  • Shared Object Registry

Fault Tolerance

  • Task re-execution based fault tolerance depends on deterministic and side-effect free task execution
    • it could not handle network streaming failure: input is lost
  • task output failure: 不停查dependency information找上一个missing intermediate data,直到找到源头并recompute
  • if Tez AM fails: YARN will restart AM on another node and the AM can recover its state from the checkpoint data

Professor’s nonsense

  • Dryad -> Tez -> REEF
  • What is interesting about Tez
    • Built-in instantiation
      • security/isolation
      • built-in performance optimization
      • fault tolerance, stragglers
      • scheduling, resource negotiation
    • Dataplane-specific
    • Computation logic structure
    • Partitioning