
Spark Cluster Capacity Planning

Maatary edited this page Sep 11, 2017 · 20 revisions

Work in Progress

Intro

A Spark cluster is, in essence, a multi-node (i.e. distributed) parallel processing engine. Hence, the first phase of deploying a distributed application is to size the cluster to meet your requirements. This activity is called Cluster Capacity Planning. In the following we present a heuristic for it and show how we applied it to our own problem.

Requirements


Two scenarios: (i) dumping the catalogue and (ii) continuous update.

(i) We have about 1.5 million records in our catalogue. We would like Spark to process them in 30 minutes at most. By process we mean transforming them into unreconciled BibFrame records.

(ii) In the update scenario, which corresponds to records being added, deleted or updated daily, we would like to keep up with a rate of 1000 records per second.

Given that the dump scenario outpaces the update scenario in terms of requirements, the next section is limited to the capacity planning of the dump scenario. This means that the cluster will be over-provisioned for the update scenario. However, using the same heuristic, one can easily decide how to shrink the cluster to meet the requirements of the update scenario.


Heuristic

The heuristic provides the minimal requirement. It is not the exact size that you will need, but it gives you a lower bound.

Note that we use Spark Streaming to process our dump. This has the advantage that we do not need to load the whole dump in memory to process it. It also makes results available as records are processed, rather than having to wait until the full dump is processed before seeing any results.

  • Step1: decide your processing time requirement

    • How many records do you need to process, and within what time constraint?
    • 11 million records in under 1 hour. If we process them in mini-batches of 100K, each mini-batch must be processed in about 0.5 minutes.
    • Time constraint = 100K records per 30 seconds
  • Step2: determine the required parallelism and partition size

    • Apply the following formula to find the number of parallel processing tasks (Npr) you will need, where Ptr is the processing time of one record, Nr the number of records and time the time constraint:

      • Npr = (Ptr * Nr) / time => Npr = (5 ms * 100000) / 30000 ms => Npr = 16.7 => 17 parallel processing tasks.

      • The formula simply means: given the total number of records, how many sets of records do I need to process in parallel for the total processing time to stay under the time constraint? This assumes that the records within a set are processed in sequence, so that the total time for one set is the number of records in the set multiplied by the processing time of one record.

    • With the number of parallel processing tasks, determine the number of records per task

      • Num records per parallel task = Nr / Npr => 100K / 17 = 5882
    • Calculate the maximum size of each partition

      • Size of a record * num records per parallel task => 12 KB * 5882 = 70584 KB ~ 70 MB
  • Step3: set the virtual Spark cluster parameters

    • Decide the number of executors and the number of cores per executor such that they comply with the following formula:

      • num of executors * num of cores per executor >= num of parallel tasks, i.e. Npr
      • We will spread the executors over as many nodes as possible to gain ingestion speed (there is only one network interface per node)
      • 4 * 5 >= 17. In practice one should reserve a few cores (2 to 3) for the OS and Spark itself. Hence we are going to run executors of 4 cores on 4 machines with at least 6 cores each.
    • Calculate the memory of each executor

      • Executor memory >= memory per task * number of tasks per executor + memory for internals + overhead
  • Step4: size the parameters of the physical cluster
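
The arithmetic of the steps above can be sketched in a few lines of Python. This is only an illustration of the formulas as written, not part of the application: the function names are made up for this example, the record count per task is rounded up (so it yields 5883 rather than 5882), and the "internal" and "overhead" memory figures in the last call are placeholder values chosen for the example, not measurements from this page.

```python
import math


def required_parallelism(time_per_record_ms, num_records, time_budget_ms):
    """Npr = (Ptr * Nr) / time, rounded up to a whole number of parallel tasks."""
    return math.ceil(time_per_record_ms * num_records / time_budget_ms)


def records_per_task(num_records, parallelism):
    """Largest number of records a single parallel task has to handle."""
    return math.ceil(num_records / parallelism)


def partition_size_kb(record_size_kb, records_in_partition):
    """Upper bound on the size of one partition, in KB."""
    return record_size_kb * records_in_partition


def cores_are_sufficient(num_executors, cores_per_executor, parallelism):
    """num of executors * num of cores per executor >= Npr."""
    return num_executors * cores_per_executor >= parallelism


def min_executor_memory_mb(task_memory_mb, tasks_per_executor, internal_mb, overhead_mb):
    """Executor memory >= memory per task * tasks per executor + internal + overhead."""
    return task_memory_mb * tasks_per_executor + internal_mb + overhead_mb


# Dump scenario: 100K records per 30 seconds, 5 ms and 12 KB per record.
npr = required_parallelism(5, 100_000, 30_000)      # 17
per_task = records_per_task(100_000, npr)           # 5883 (rounded up)
size_kb = partition_size_kb(12, per_task)           # 70596 KB, about 70 MB
enough = cores_are_sufficient(4, 5, npr)            # 4 * 5 >= 17

# Update scenario: 1000 records per second with the same 5 ms per record.
npr_update = required_parallelism(5, 1_000, 1_000)  # 5

# Placeholder values (assumptions): 300 MB internal, 384 MB overhead, 5 tasks.
mem_mb = min_executor_memory_mb(70, 5, 300, 384)    # 1034 MB

print(npr, per_task, size_kb, enough, npr_update, mem_mb)
```

Applied to the update scenario, the same formula gives a much smaller parallelism than the dump scenario, which is the sense in which the cluster can be shrunk. As with the rest of the heuristic, every figure is a lower bound.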

Estimation

Using a prototype of our final application (see Demo/EstimatorStreaming) we have measured (i) the average processing time of a marc21 to BibFrame record conversion, and (ii) the average maximum size that a record takes in memory.

Processing time: 5 ms

  • Spark provides some built-in metrics in its GUI. Using these metrics we determined that the average time to process one record is 5 ms.

Memory size: 12 KB

  • To determine the size Spark needs to represent our records in memory as an RDD, we use the caching operation, which caches the data in memory. The Spark user interface shows the amount of memory used by the cache, so we can determine the size of our data in memory.

    • The heaviest representation of our record is a marc4j object. Hence we cache that representation for our measurement (see Demo/EstimatorStreamApp for more details).
    • 70.3 MB for 6000 marc4j records (note that this does not account for objects created by the libraries one uses. For instance, the XSLT transformer creates a lot of objects and requires a lot of GC. So this is your lower bound)
      • This means an average of about 11.7 KB per record in its most expensive representation, which we round to 12 KB.
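
The per-record estimate is a simple division of the cached size by the number of cached records, sketched below. The function name is hypothetical, and whether the "MB" figure read from the UI means 1000 KB or 1024 KB is an assumption exposed as a parameter; both readings round up to 12 KB.

```python
import math


def avg_record_size_kb(cached_size_mb, num_records, kb_per_mb=1000):
    """Average in-memory size of one cached record, in KB."""
    return cached_size_mb * kb_per_mb / num_records


# 70.3 MB cached for 6000 marc4j records (figures from the measurement above).
decimal_kb = avg_record_size_kb(70.3, 6000)                 # roughly 11.7 KB
binary_kb = avg_record_size_kb(70.3, 6000, kb_per_mb=1024)  # roughly 12.0 KB

# Round up to a whole KB for use in the capacity planning formulas.
planning_size_kb = math.ceil(decimal_kb)

print(round(decimal_kb, 2), round(binary_kb, 2), planning_size_kb)
```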