A general-purpose big data framework

Paul Krzyzanowski

November 25, 2015


MapReduce turned out to be an incredibly useful and widely-deployed framework for processing large amounts of data. However, its design forces programs to comply with its computation model, which is:

  1. Map: create a key,value pairs
  2. Shuffle: combine common keys together and partition them to reduce workers
  3. Reduce: process each unique key and all of its associated values

It turned out that many applications had to run MapReduce over multiple passes to process their data. All intermediate data had to be stored back in the file system (GFS at Google, HDFS elsewhere), which tended to be slow since stored data was not just written to disks but also replicated. Moreover, the next MapReduce phase could not start until the previous MapReduce job completed fully.

MapReduce was also designed to read its data from a distributed file system (GFS/HDFS). In many cases, however, data resides within an SQL database or is streaming in (e.g, activity logs, remote monitoring).

Spark is a framework that provides a highly flexible and general-purpose way of dealing with big data processing needs, does not impose a rigid computation model, and supports a variety of input types. This enables Spark to deal with text files, graph data, database queries, and streaming sources and not be confined to a two-stage processing model. Programmers can develop arbitrarily-complex, multi-step data pipelines arranged in an arbitrary directed acyclic graph (DAG) pattern.

Programming in Spark involves defining a sequence of transformations and actions. Spark has support for a map action and a reduce operation, so it can implement traditional MapReduce operations but it also supports SQL queries, graph processing, and machine learning. Unlike MapReduce, Spark stores its intermediate results in memory, providing for dramatically higher performance.

Spark architecture: tasks

An application that uses Spark identifies data sources and the operations on that data. The main application, called the driver program is linked with the Spark API, which creates a SparkContext. This is the heart of the Spark system and coordinates all processing activity. This SparkContext in the driver program connects to a Spark cluster manager. The cluster manager responsible for allocating worker nodes, launching executors on them, and keeping track of their status.

Each worker node runs one or more executors. An executor is a process that runs an instance of a Java Virtual Machine (JVM). When each executor is launched by the manager, it establishes a connection back to the driver program. The executor runs tasks on behalf of a specific SparkContext (application) and keeps related data in memory or disk storage. A task is a transformation or action. The executor remains running for the duration of the application. This provides a performance advantage over the MapReduce approach since new tasks can be started very quickly.

The executor also maintains a cache, which stores frequently-used data in memory instead of having to store it to a disk-based file as the MapReduce framework does.

The driver goes through the user’s program, which consists of actions and transformations on data and converts that into a series of tasks. The driver then sends tasks to the executors that registered with it. A task is application code that runs in the executor on a Java Virtual Machine (JVM) and can be written in languages such as Scala, Java, Python, Clojure, and R. It is transmitted as a jar file to an executor, which then runs it.

Spark architecture: Resilient Distributed Data (RDD)

Data in Spark is a collection of Resilient Distributed Datasets (RDDs). This is often a huge collection of stuff. Think of an individual RDD as a table in a database or a structured file.

Input data is organized into RDDs, which will often be partitioned across many computers. RDDs can be created in three ways:

  1. They can be present as any file stored in HDFS or any other storage system supported in Hadoop. This includes Amazon S3 (a key-value server, similar in design to Dynamo), HBase (Hadoop’s version of Bigtable), and Cassandra (a no-SQL eventually-consistent database). This data is created by other services, such as event streams, text logs, or a database. For instance, the results of a specific query can be treated as an RDD. A list of files in a specific directory can also be an RDD.

  2. RDDs can be streaming sources using the Spark Streaming extension. This could be a stream of events from remote sensors, for example. For fault tolerance, a sliding window is used, where the contents of the stream are buffered in memory for a predefined time interval.

  3. An RDD can be the output of a transformation function. This allows one task to create data that can be consumed by another task and is the way tasks pass data around. For example, one task can filter out unwanted data and generate a set of key-value pairs, writing them to an RDD. This RDD will be cached in memory (overflowing to disk if needed) and will be read by a task that reads the output of the task that created the key-value data.

RDDs have specific properties:

  • They are immutable. That means their contents cannot be changed. A task can read from an RDD and create a new RDD but it cannot modify an RDD. The framework magically garbage collects unneeded intermediate RDDs.

  • They are typed. An RDD will have some kind of structure within in, such as a key-value pair or a set of fields. Tasks need to be able to parse RDD streams.

  • They are ordered. An RDD contains a set of elements that can be sorted. In the case of key-value lists, the elements will be sorted by a key. The sorting function can be defined by the programmer but sorting enables one to implement things like Reduce operations.

  • They are partitioned. Parts of an RDD may be sent to different servers. The default partitioning function is to send a row of data to the server corresponding to hash(key) mod servercount.

RDD operations

Spark allows two types of operations on RDDs: transformations and actions. Transformations read an RDD and return a new RDD. Example transformations are map, filter, groupByKey, and reduceByKey. Transformations are evaluated lazily, which means they are computed only when some task wants their data (the RDD that they generate). At that point, the driver schedules them for execution.

Actions are operations that evaluate and return a new value. When an action is requested on an RDD object, the necessary transformations are computed and the result is returned. Actions tend to be the things that generate the final output needed by a program. Example actions are reduce, grab samples, and write to file.

Sample transformations

Transformation Description
map(func) Pass each element through a function func
filter(func) Select elements of the source on which func returns true
flatmap(func) Each input item can be mapped to 0 or more output items
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed
union(otherdataset) Union of the elements in the source data set and otherdataset
distinct([numtasks]) The distinct elements of the source dataset
groupByKey([numtasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, seq[V]) pairs
reduceByKey(func, [numtasks]) Aggregate the values for each key using the given reduce function
sortByKey([ascending], [numtasks]) Sort keys in ascending or descending order
join(otherDataset, [numtasks]) Combines two datasets, (K, V) and (K, W) into (K, (V, W))
cogroup(otherDataset, [numtasks]) Given (K, V) and (K, W), returns (K, Seq[V], Seq[W])
cartesian(otherDataset) For two datasets of types T and U, returns a dataset of (T, U) pairs

Sample actions

Action Description
reduce(func) Aggregate elements of the dataset using func.
collect(func, [numtasks]) Return all elements of the dataset as an array
count() Return the number of elements in the dataset
first() Return the first element of the dataset
take(n) Return an array with the first n elements of the dataset
takeSample(withReplacement, fraction, seed) Return an array with a random sample of num elements of the dataset

Data storage

Spark does not care how data is stored. The appropriate RDD connector determines how to read data. For example, RDDs can be the result of a query in a Cassandra database and new RDDs can be written to Cassandra tables. Alternatively, RDDs can be read from HDFS files or written to an HBASE table.

Fault tolerance

For each RDD, the driver tracks the sequence of transformations used to create it. That means every RDD knows which task needed to create it. If any RDD is lost (e.g., a task that creates one died), the driver can ask the task that generated it to recreate it. The driver maintains the entire dependency graph, so this recreation may end up being a chain of transformation tasks going back to the original data.

Sample program

Here is a sample of a trivially small program that uses Spark and processes log files.

The transformation (which creates new RDDs) reads ERROR message from a log file. Anything that is not an error is ignored. The assumption is that the log message begins with the text “ERROR”, contains tab-separated fields, and the next field identifies the source of the error. In this case the error may be due to “php” or “mysql”. We split the string by tabs, extract the second element (element 1), and that becomes our output RDD.

The actions are to count messages that contain “mysql” and to count messages that contain “php”.

// base RDD
val lines = sc.textFile("hdfs://...”)
// transformed RDDs
val errors = lines.filter(_.startsWith("ERROR"))
val messages ="\t")).map(r => r(1)) messages.cache()

// action 1 messages.filter(_.contains("mysql")).count()
// action 2 messages.filter(_.contains("php")).count()