Chapter 6: Resource Management: Performance Assuredness in Distributed Cloud Computing via Online Reconfigurations – Assured Cloud Computing

Resource Management: Performance Assuredness in Distributed Cloud Computing via Online Reconfigurations

Mainak Ghosh, Le Xu, Indranil Gupta1

Department of Computer Science, University of Illinois at Urbana-Champaign, Urbana, IL, USA

Cloud computing relies on software for distributed batch and stream processing, as well as distributed storage. This chapter focuses on an oft-ignored angle of assuredness: performance assuredness. A significant pain point today is the inability to support reconfiguration operations, such as changing the shard key in a sharded storage/database system, or scaling up (or down) the number of virtual machines (VMs) being used in a stream or batch processing system. We discuss new techniques to support such reconfiguration operations in an online manner, whereby the system does not need to be shut down and the user/client-perceived behavior is indistinguishable regardless of whether a reconfiguration is occurring in the background; that is, the performance continues to be assured in spite of ongoing background reconfiguration. Next, we describe how to scale-out and scale-in (increase or decrease) the number of machines/VMs in cloud computing frameworks like distributed stream processing and distributed graph processing systems, again while offering assured performance to the customer in spite of the reconfigurations occurring in the background. The ultimate performance assuredness is the ability to support SLAs/SLOs (service-level agreements/objectives) such as deadlines. We present a new real-time scheduler that supports priorities and hard deadlines for Hadoop jobs. We implemented our reconfiguration systems as patches to several popular and open-source cloud computing systems, including MongoDB and Cassandra (storage), Storm (stream processing), LFGraph (graph processing), and Hadoop (batch processing).

6.1 Introduction

Cloud computing relies on distributed systems for storing (and querying) large amounts of data, for batch computation on these data, and for streaming computation on big data arriving at high velocity. Distributed storage systems include NoSQL databases such as MongoDB [1] and key-value stores such as Cassandra [2], which have proliferated for workloads that need fast and weakly consistent access to data. A large amount of batch computation today relies on Apache Hadoop [3], which uses the MapReduce paradigm. Apache Storm [4] is a prominent stream processing system used by a variety of companies; others include Heron [5] (used by Twitter) and Samza [6] (used by LinkedIn).

Design of the first generation of these systems focused largely on scale and optimizing performance, but these systems have reached a level of maturity that now calls for performance assuredness. In other words, in spite of changes made to a distributed cloud service (such as the ones already listed), the service should run uninterrupted and offer performance (latency or throughput) and availability indistinguishable from those in the common-case operations. In general, the requirement is that clients (front-end machines) and eventually user devices should be unaware of any reconfiguration changes taking place in the background.

Examples of such reconfiguration operations include (i) changing the layout of data in a sharded cloud database, for example, by changing the shard key; (ii) scaling-out or scaling-in the number of machines/VMs involved in a distributed computation or database; and (iii) eventually, supporting predictable service level agreements/objectives (SLAs/SLOs) for jobs. Support for those operations will make the performance of each job predictable, push systems toward supporting multitenant scenarios (in which multiple jobs share the same cluster), and allow developers to specify requirements (in the form of SLAs/SLOs); the scheduler would then be able to manage resources automatically to satisfy these SLAs/SLOs without human involvement.

This chapter is not focused on security challenges. However, even meeting performance assuredness goals is a significant undertaking today, and it entails several challenges. First, the sheer scale of the data being stored or processed, and the nontrivial number of machines/VMs involved, means that decisions to migrate data or computation need to be made wisely. Second, the de facto approach used today in industry systems is to shut down services before reconfiguration; this is unacceptable, as it negatively affects availability and latency (in storage/databases), drops throughput to zero (in stream processing systems), and wastes work (in batch processing systems). What is needed is a way to migrate components in the background, without affecting the foreground performance perceived by clients and users. Third, the mechanisms to scale-out and scale-in can be used either manually or in an adaptive fashion, such that they are driven by logic that seeks to satisfy SLAs/SLOs.

In this chapter, we describe multiple new approaches to attacking different pieces of the broad problem already described. We incorporate performance assuredness into cloud storage/database systems, stream processing systems, and batch processing systems. Specifically, we describe the following innovations:

  1. Reconfigurations in NoSQL distributed storage/database systems:
    1. The Morphus system supports reconfiguration operations such as shard key change in (flexibly) sharded databases such as MongoDB (in which we implemented our techniques). Morphus is described in Section 6.4.2.
      • Key idea: Our approach is to make optimal decisions on where to place new chunks based on bipartite graph matching (which minimizes network volume transferred), to support concurrent migration of data and processing of foreground queries in a careful and staged manner, and to replay logged writes received while the reconfiguration was in progress.
    2. The Parqua system extends our Morphus design to ring-based key-value stores. Our implementation is in Apache Cassandra, the most popular key-value store in industry today. Parqua is described in Section 6.4.3.
  2. Scale-out/scale-in in distributed stream and distributed graph processing systems:
    1. The Stela system supports automated scale-out/scale-in in distributed stream processing systems. Our implementation is in Apache Storm, the most popular stream processing system in industry today. Stela is described in Section 6.5.1.
      • Key idea: We identify the congestion level of operators in the stream processing job (which is a DAG of operators). For scale-out, we give more resources to the most congested operators, while for scale-in we take resources away from the least congested operators. We do so in the background without affecting ongoing processing at other operators in the job.
    2. For graph processing elasticity, we support scale-out/scale-in in distributed graph processing systems; our implementation is on our home-grown system called LFGraph, and it is described in Section 6.5.2.
      • Key idea: We optimally decide how to repartition the vertices of the graphs (e.g., a Web graph or Facebook-style social network) among the remaining servers, so as to minimize the amount of data moved and thus the time to reconfigure. We perform the migration carefully and in the background, allowing the iteration-based computation to proceed normally.
  3. Priorities and deadlines in batch processing systems: The Natjam system incorporates support for job priorities and deadlines (i.e., SLAs/SLOs) in Apache YARN. Our implementation is in Apache Hadoop, the most popular distributed batch processing system in industry today. Natjam is described in Section 6.6.
    • Key idea: We quickly checkpoint individual tasks, so that if one is preempted by a task of a higher priority (or lower deadline) job, then it can resume from where it left off (thus avoiding wasted work). We developed policies for both job-level and task-level evictions that allow the scheduler to decide which running components will be victimized when a more important job arrives.
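
To make the congestion idea behind Stela (item 2.1 above) concrete, here is a minimal sketch. The operator records and the simple input-rate/processing-rate congestion ratio are illustrative assumptions; the real Stela uses a richer metric computed over the Storm topology.

```python
# Toy sketch of congestion-driven operator selection (simplified; Stela's
# actual metric also accounts for an operator's downstream impact).

def congestion(op):
    # An operator is congested when tuples arrive faster than it processes them.
    return op["input_rate"] / max(op["process_rate"], 1e-9)

def pick_scale_out_targets(operators, k):
    # Scale-out: give new resources to the k most congested operators.
    return sorted(operators, key=congestion, reverse=True)[:k]

def pick_scale_in_victims(operators, k):
    # Scale-in: reclaim resources from the k least congested operators.
    return sorted(operators, key=congestion)[:k]

ops = [
    {"name": "spout",  "input_rate": 1000, "process_rate": 1000},
    {"name": "parse",  "input_rate": 1000, "process_rate": 400},
    {"name": "filter", "input_rate": 400,  "process_rate": 900},
]
print([o["name"] for o in pick_scale_out_targets(ops, 1)])  # ['parse']
print([o["name"] for o in pick_scale_in_victims(ops, 1)])   # ['filter']
```

Because the selection touches only the chosen operators, the rest of the job's DAG keeps processing undisturbed, which is the property the key idea emphasizes.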

In this chapter, we present the key design techniques and algorithms, outline design and implementation details, and touch on key experimental results. We performed all experiments by deploying both the original system(s) and our modified system on real clusters and subjecting them to real workloads.

6.2 Vision: Using Cloud Technology in Missions

Missions that rely on the cloud, by definition, have strong time criticality. This implies two requirements:

  1. Any changes that the mission team needs to make must be made on an already-running system in the cloud (e.g., a database or a processing system), without shutting it down and without affecting the ongoing foreground performance of the system. This capability will maximize the flexibility with which the team can change configuration settings on the fly, including the data layout (affecting query time in a database) and the number of machines/VMs running a computation.
  2. The team must be able to specify priorities or deadlines for certain jobs. A mission typically consists of a diverse set of jobs running simultaneously on the cloud. For instance, a Hadoop job tracking the flight of a fighter aircraft must be run at a higher priority than a Hadoop job that is computing the overnight logs of yesterday's missions (and perhaps must be run with a deadline).

The research contributions described in this chapter fulfill both of these requirements. First, our Morphus and Parqua systems make it possible to change configuration parameters (such as the shard key) that affect the layout of data in a database, without affecting query latency, and while minimizing total transfer time. That gives the team flexibility to make such configuration changes at any time. Our Stela system allows the team to scale-out or scale-in, at will, the number of machines or VMs associated with a stream processing job. For instance, consider a stream processing job tracking the camera feed from an aircraft fleet (e.g., in order to identify enemy positions). When the volume of such data increases, our Stela system allows the team to scale-out the number of machines quickly to cope with the increased workload, while still continuing to process the incoming stream at a high rate. Similar flexibility extends to our scale-out/scale-in techniques for distributed graph processing.

Second, our Natjam system allows Hadoop jobs (running in a YARN cluster) to be associated with a priority or a deadline. The Natjam scheduler automatically ensures that higher priority (or earlier deadline) jobs preempt other jobs, run sooner, and thus finish sooner – an ability that is much needed in critical missions that rely on the cloud.

6.3 State of the Art

Here, we summarize the state of the art for each of the individual challenges and systems covered in this chapter.

6.3.1 State of the Art: Reconfigurations in Sharded Databases/Storage

Database Reconfigurations

Research in distributed databases has focused on query optimization and load balancing [7], and orthogonally on using group communication for online reconfiguration [8]; however, that work has not solved the core algorithmic problems for efficient reconfiguration. Online schema change was targeted in Google [9], but the resultant availabilities were lower than those provided by Morphus and Parqua. In a parallel work, Elmore et al. [10] looked into the reconfiguration problem for a partitioned main memory database such as H-Store. Data placement in parallel databases has used hash-based and range-based partitioning [11,12], but optimality for reconfiguration has not been a goal.

Live Migration

The problem of live migration has been looked into in the context of databases. Albatross [13], Zephyr [14], and ShuttleDB [15] address live migration in multitenant transactional databases. Albatross and ShuttleDB use iterative operation replay, as Morphus does, while Zephyr routes updates based on current data locations. Data migration in these systems happens between two different sets of servers, whereas Morphus and Parqua achieve data migration within a replica set. Also, these papers do not propose optimal solutions for any reconfiguration operation. Opportunistic lazy migration, explored in the Relational Cloud [16], entails longer completion times. Tuba [17] addressed the problem of migration in a geo-replicated setting. Its authors avoided write throttling by having multiple masters at the same time, which is not supported by MongoDB or Cassandra.

Morphus's techniques naturally bear some similarities to live VM migration. Precopy techniques migrate a VM without stopping the OS, and if that fails, then the OS is stopped [18]. Like precopy, Morphus also replays operations that occurred during the migration. Precopy systems also use write throttling [19], and precopy has been used in database migration [20].

Network Flow Scheduling

For network flow scheduling, Chowdhury et al. [21] proposed a weighted flow-scheduling approach that allocates multiple TCP connections to each flow to minimize migration time. Our WFS approach improves on theirs by also considering network latencies. Morphus's performance would likely improve further if we also considered bandwidth. Hedera [22] also provides a dynamic flow-scheduling algorithm for multirooted network topologies. While these techniques may improve reconfiguration time, Morphus's approach is end-to-end and is less likely to disrupt normal reads and writes that use the same network links.

6.3.2 State of the Art: Scale-Out/Scale-In in Distributed Stream Processing Systems

Real-Time Reconfigurations

Based on its predecessor Aurora [23], Borealis [24] is a stream processing engine that enables modification of queries on the fly. Borealis focuses on load balancing on individual machines and on distributed load shedding in a static environment. Borealis also uses ROD (resilient operator distribution) to determine the best operator distribution plan that is closest to an “ideal” feasible set: a maximum set of machines that are underloaded. Borealis does not explicitly support elasticity. Twitter's Heron [5] improves on Storm's congestion-handling mechanism by using back pressure; however, elasticity is not explicitly addressed.

Live Migration

Stormy [25] uses a logical ring and consistent hashing to place new nodes upon a scale-out. Unlike Stela, it does not take congestion into account. StreamCloud [26] builds elasticity into the Borealis Stream Processing Engine [27]. StreamCloud modifies the parallelism level by splitting queries into subqueries and uses rebalancing to adjust resource usage. Stela does not change running topologies, because we believe doing so would be intrusive to the applications.

Real-Time Elasticity

SEEP [28] uses an approach to elasticity that focuses mainly on an operator's state management. It proposes mechanisms to back up, restore, and partition operators' states in order to achieve short recovery times. Several papers have focused on elasticity for stateful stream processing systems. Work [29,30] from IBM enabled elasticity for both IBM System S [31–33] and SPADE [34] by increasing the parallelism of processing operators. It applied networking concepts such as congestion control to expand and contract the parallelism of a processing operator by constantly monitoring the throughput of its links. These efforts do not assume that a fixed number of machines is provided (or taken away) by the user. In contrast, our system intelligently prioritizes which operators to parallelize onto (or migrate away from) the user-determined number of machines that join (or are taken away from) the cluster, with the goal of optimizing throughput.

Recent work [35] proposes an elasticity model that provides latency guarantees by tuning the task-wise parallelism level in a fixed-size cluster. Meanwhile, another recent effort [36] implemented stream processing system elasticity; however, it focused on both latency (not throughput) and policy (not mechanisms). Nevertheless, Stela's mechanisms can be used as a black box inside this system.

Some of these recent efforts have looked at policies for adaptivity [36], while others [25,26,29,30] focus on the mechanisms for elasticity. These are important building blocks for adaptivity. To the best of our knowledge, Ref. [37] describes the only prior mechanism for elasticity in stream processing systems; in Sections–, we compare it to Stela.

6.3.3 State of the Art: Scale-Out/Scale-In in Distributed Graph Processing Systems

To the best of our knowledge, we are the first to explore elasticity for distributed graph computations. However, elasticity has been explored in many other areas.

Data Centers

AutoScale [38] enables elastic capacity management for data centers. Its goal is to reduce power wastage by maintaining just enough server capacity for the current workload. Zhang et al. [39] propose to solve the same problem using a Model Predictive Control framework.

Cloud and Storage Systems

The creators of CloudScale [40] and Jiang et al. [41] propose mechanisms for scaling up VM resources based on predicted application needs in an Infrastructure as a Service (IaaS) environment. AGILE [42] proposes mechanisms to scale-out VM resources based on predicted application needs.

Pujol et al. [43] propose a social partitioning and replication middleware to enable efficient storage of social network graphs. The technique enables the storage system to scale-out/scale-in based on need. Albatross [13] enables scaling of multitenant databases by using live migration. TIRAMOLA [44] allows NoSQL storage clusters to scale-out or scale-in by using user-provided policies to apply a Markov decision process on the workload. The creators of Transactional Auto Scaler [45] propose another storage resource scaling mechanism, which uses analytical modeling and machine learning to predict workload behavior.

Data Processing Frameworks

Starfish [46] performs tuning of Hadoop parameters at the job, workflow, and workload levels. Herodotou et al. [47] propose Elastisizer, which can predict cluster sizes for Hadoop workloads.

Partitioning in Graph Processing

PowerGraph [48] performs vertex assignment using balanced partitioning of edges. The aim is to limit the number of servers spanned by the vertices. Distributed GraphLab [49] involves a two-phase assignment of vertices to servers; the first phase creates partitions that are more numerous than the servers. In the second phase, the servers load their respective partitions based on a balanced partitioning. That makes it possible to load the graph in a distributed manner, and to change the number of servers, without affecting the initial load phase. Stanton et al. [50] discuss partitioning strategies for streaming graph data. However, they do not explore elasticity. Running a full partitioner during a scale-out/scale-in operation is not viable, as it would repartition the entire graph; instead, we perform incremental repartitioning.

Dynamic Repartitioning in Graph Processing

Vaquero et al. [51] propose a vertex migration heuristic between partitions to maintain balance and reduce communication cost. GPS [52] dynamically repartitions vertices by colocating the vertices that exchange larger numbers of messages with each other. While those efforts did not explicitly explore on-demand elasticity, our techniques are orthogonal and can be applied to such systems. More specifically, one can use our vertex repartitioning technique in such systems by treating each partition as the set of vertices currently assigned to that server.

6.3.4 State of the Art: Priorities and Deadlines in Batch Processing Systems

OS Mechanisms

Sharing finite resources among applications is a fundamental issue in operating systems [53]. Not surprisingly, Natjam's eviction policies are analogous to multiprocessor scheduling techniques (e.g., shortest task first) and to eviction policies for caches and paged OSes. However, our results are different, because a MapReduce job needs all of its tasks to finish. PACMan [54] looks at eviction policies for caches in MapReduce clusters, and it can be used orthogonally with Natjam.

Preemption

Amoeba, a system built in parallel with ours, provides instantaneous fairness with elastic queues and uses a checkpointing mechanism [55]. The main differences between Natjam and Amoeba are that (i) Natjam focuses on job and task eviction policies, (ii) Natjam focuses on jobs with hard deadlines, and (iii) our implementation works directly with Hadoop 0.23, while Amoeba requires the prototype Sailfish system [56]. Further, Sailfish was built on Hadoop 0.20, and Hadoop 0.23 later addressed many relevant bottlenecks, for example, through read-ahead seeks and the use of Netty [57] to speed up the shuffle. Finally, our eviction policies and scheduling can be implemented orthogonally in Amoeba.

Delay scheduling [58] avoids killing map tasks while achieving data locality. In comparison, Natjam focuses on reduce tasks, as they are longer than maps and release resources more slowly, making our problem more challenging. Global preemption [59] selects tasks to kill across all jobs, which is a suboptimal solution.

A recently started Hadoop JIRA issue [60] also looks at checkpointing and preemption of reduce tasks. Such checkpointing can be used orthogonally with our eviction policies, which comprise the primary contribution of this part of our work. Finally, Piccolo [61] is a data-processing framework that uses checkpointing based on consistent global snapshots; in comparison, Natjam's checkpoints are local.

Real-Time Scheduling

ARIA [62] and Conductor [63] estimate how a Hadoop job needs to be scaled up to meet its deadline, for example, based on past execution profiles or a constraint satisfaction problem. They do not target clusters with finite resources. Real-time constraint satisfaction problems have been solved analytically [64], and Jockey [65] addressed DAGs of data-parallel jobs; however, eviction policies and Hadoop integration were not fleshed out in that work. Statistics-driven approaches have been used for cluster management [66] and for Hadoop [67]. Much work has also been done in speeding up MapReduce environments by tackling stragglers, for example [68,69], but those efforts do not support job priorities.

Dynamic proportional share scheduling [70] allows applications to bid for resources but is driven by economic metrics rather than priorities or deadlines. The network can prioritize data for time-sensitive jobs [21], and Natjam can be used orthogonally.

Natjam focuses on batch jobs rather than stream processing or interactive queries. Stream processing in the cloud has been looked at intensively, for example, in the work on Hadoop Online [71], Spark [72], Storm [73], Timestream [74], and Infosphere [75]. BlinkDB [76] and MeT [77] optimize interactive queries for SQL and NoSQL systems.

Finally, classical work on real-time systems has proposed a variety of scheduling approaches, including classical EDF and rate-monotonic scheduling [78,79], priority-based scheduling of periodic tasks [80], laxity-based approaches [81], and handling of task DAGs [82]. Natjam is different in its focus on MapReduce workloads.

Fairness

Providing fairness across jobs has been a recent focus in cloud computing engines. Outcomes of such work include Hadoop's Capacity Scheduler [83] and Fair Scheduler [84], which provide fairness by allowing an administrator to configure queue capacities and job priorities. They do not, however, allow resource preemption [85]. Quincy [86] solves an optimization problem to provide fairness in DryadLINQ [87]. Quincy does consider preemption but proposes neither eviction policies nor checkpointing mechanisms. Finally, there has been a recent focus on satisfying SLAs [88] and real-time QoS [89], but such efforts have not targeted MapReduce clusters.

Cluster Management with SLOs

Recent cluster management systems have targeted SLOs, for example, Omega [90], Cake [91], Azure [92], Centrifuge [93], and Albatross [13]. Mesos [94] uses dominant resource fairness across applications that share a cluster, and Pisces [95] looks at multitenant fairness in key-value stores.

6.4 Reconfigurations in NoSQL and Key-Value Storage/Databases

In this section, we describe how to perform reconfiguration operations in both sharded NoSQL databases (e.g., MongoDB) and key-value stores that use hash-based partitioning (e.g., Cassandra). We first discuss motivations in Section 6.4.1, then our Morphus system (integrated into MongoDB) in Section 6.4.2, and finally our Parqua system (integrated into Cassandra) in Section 6.4.3. (We discussed related work in Section 6.3.1.)

6.4.1 Motivation

Distributed NoSQL storage systems comprise one of the core technologies in today's cloud computing revolution. These systems are attractive because they offer high availability and fast read/write operations for data. They are used in production deployments for content management, archiving, e-commerce, education, finance, gaming, e-mail, and health care. The NoSQL market is expected to earn $14 billion in revenue during 2015–2020 and become a $3.4 billion market by 2020 [96].

In today's NoSQL deployments [1,2,97,98], data-centric2 global reconfiguration operations are quite inefficient. The reason is that executing them relies on ad hoc mechanisms rather than solution of the core underlying algorithmic and system design problems. The most common solution involves first saving a table or the entire database, and then reimporting all the data into the new configuration [99]. This approach leads to a significant period of unavailability. A second option may be to create a new cluster of servers with the new database configuration and then populate it with data from the old cluster [13–15]. This approach does not support concurrent reads and writes during migration, a feature we would like to provide.

Consider an admin who wishes to change the shard key inside a sharded NoSQL store such as MongoDB [99]. The shard key is used to split the database into blocks, where each block stores values for a contiguous range of shard keys. Queries are answered much faster if they include the shard key as a query parameter (because otherwise the query needs to be multicast). For today's systems, it is strongly recommended that the admin choose the shard key at database creation time and not change it afterward. However, that approach is challenging because it is hard to guess how the workload will evolve in the future. In reality, there are many reasons why admins might need to change the shard key, such as changes in the nature of the data being received, evolving business logic, the need to perform operations such as joins with other tables, and the discovery in hindsight that prior design choices were suboptimal. As a result, the reconfiguration problem has been a fervent point of discussion in the community for many years [100,101].

In this work, we present two systems that support automated reconfiguration. Our systems, called Morphus and Parqua, allow reconfiguration changes to happen in an online manner, that is, by concurrently supporting reads and writes on the database table while its data are being reconfigured.

6.4.2 Morphus: Reconfigurations in Sharded Databases/Storage

This section is based on Ref. [102], and we encourage the reader to refer to it for further details on design, implementation, and experiments.

Assumptions

Morphus assumes that the NoSQL system features master–slave replication, range-based (as opposed to hash-based) sharding,3 and flexibility in data assignment.4 Several databases satisfy these assumptions, for example, MongoDB [1], RethinkDB [103], and CouchDB [104]. To integrate our Morphus system, we chose MongoDB because of its clean documentation, strong user base, and significant development activity. To simplify discussion, we assume a single data center, but our paper [105] presents results for geo-distributed experiments. Finally, we focus on NoSQL rather than ACID databases because the simplified CRUD (Create, Read, Update, Delete) operations allow us to focus on the reconfiguration problem. Addressing ACID transactions is an exciting avenue that our work opens up.

Morphus solves three major challenges: (i) In order to be fast, data migration across servers must incur the least traffic volume. (ii) Degradation of read and write latencies during reconfiguration must be small compared to operation latencies when there is no reconfiguration. (iii) Data migration traffic must adapt itself to the data center's network topology.

MongoDB System Model

We have chosen to incorporate Morphus into a popular sharded key-value store, MongoDB v2.4 [1]. As noted earlier, our choice of MongoDB was driven not just by its popularity but also by its clean documentation, its strong user base, and the significant development and discussion around it.

A MongoDB deployment consists of three types of servers. The mongod servers store the data chunks themselves; these servers are typically grouped into disjoint replica sets. Each replica set contains the same number of servers (typically 3), which are exact replicas of each other, with one of the servers marked as a primary (master) and the others acting as secondaries (slaves). The configuration parameters of the database are stored at the config servers. Clients send CRUD queries to a set of front-end servers, also called mongos. The mongos servers also cache some of the configuration information from the config servers; for example, in order to route queries, they cache mappings from each chunk range to a replica set.
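
As an illustration of the routing role that mongos plays, the following sketch (hypothetical structures for illustration, not MongoDB's internals) routes a query by finding the chunk range that contains its shard key:

```python
import bisect

# Minimal mongos-style router: each chunk covers a contiguous shard-key
# range and maps to one replica set (illustrative names and values).

class Router:
    def __init__(self, chunk_map):
        # chunk_map: sorted list of (range_start, replica_set) pairs;
        # chunk i covers keys in [start_i, start_{i+1}).
        self.starts = [start for start, _ in chunk_map]
        self.replica_sets = [rs for _, rs in chunk_map]

    def route(self, shard_key):
        # Binary-search for the chunk range containing the key, then
        # forward to the replica set that owns that chunk.
        i = bisect.bisect_right(self.starts, shard_key) - 1
        return self.replica_sets[i]

router = Router([(0, "rs0"), (100, "rs1"), (200, "rs2")])
print(router.route(42))    # rs0
print(router.route(150))   # rs1
```

A query that includes the shard key takes this fast single-target path; a query without it must be multicast to all replica sets, which is why shard-key queries are answered much faster.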

A single database table in MongoDB is called a collection. Thus, a single MongoDB deployment consists of several collections.

Reconfiguration Phases in Morphus

Morphus allows a reconfiguration operation to be initiated by a system administrator on any collection. Morphus executes the reconfiguration via five sequential phases, as shown in Figure 6.1.

Figure 6.1 Morphus phases. Arrows represent RPCs. M stands for Master and S for Slave.

First, in the prepare phase, Morphus prepares for the reconfiguration by creating partitions (with empty new chunks) using the new shard key. Second, in the isolation phase, Morphus isolates one secondary server from each replica set. Third, in the execution phase, these secondaries exchange data based on the placement plan chosen by the mongos. In the meantime, further operations may have arrived at the primary servers; they are replayed at the secondaries in the fourth phase, the recovery phase. When the reconfigured secondaries have caught up, they swap places with their primaries in the fifth phase, the commit phase.

At that point, the database has been reconfigured and can start serving queries with the new configuration. However, other secondaries in all replica sets need to reconfigure as well. This slave catchup is done in multiple rounds, with the number of rounds equal to the size of the replica set.

We discuss the individual phases in detail in our paper [105].
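
The overall flow can be summarized as a schematic driver. The phase order and the round count follow the chapter; the logging structure is a placeholder, not the real system:

```python
# Schematic driver for Morphus's reconfiguration. Round 1 runs the five
# phases on one isolated secondary per replica set; the remaining replicas
# then reconfigure in further "slave catchup" rounds, with the total number
# of rounds equal to the replica-set size.

PHASES = ["prepare", "isolation", "execution", "recovery", "commit"]

def reconfigure(replica_set_size):
    log = []
    for phase in PHASES:
        log.append(("round-1", phase))
    for r in range(2, replica_set_size + 1):
        log.append((f"round-{r}", "slave-catchup"))
    return log

print(reconfigure(3))
```

For a typical replica set of size 3, this yields the five phases followed by two catchup rounds, after which every replica holds the new configuration.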

Read–Write Behavior

The end of the commit phase marks the switch to the new shard key. Until this point, all queries with the old shard key were routed to the mapped server and all queries with the new shard key were multicast to all the servers (which is normal MongoDB behavior). After the commit phase, a query with the new shard key is routed to the appropriate server (the new primary). Queries that do not use the new shard key are handled with a multicast, which again is normal MongoDB behavior.

Reads in MongoDB offer per-key sequential consistency. Morphus is designed so that it continues to offer the same consistency model for data undergoing migration.

Algorithms for Efficient Shard Key Reconfigurations

In a reconfiguration operation, the data present in shards across multiple servers are resharded. The new shards need to be placed at the servers in such a way as to reduce the total network transfer volume during reconfiguration and achieve load balance. This section presents optimal algorithms for this planning problem.

We present two algorithms for placement of the new chunks in the cluster. Our first algorithm is greedy and is optimal in the total network transfer volume. However, it may create bottlenecks by clustering many new chunks at a few servers. Our second algorithm, based on bipartite matching, is optimal in network transfer volume among all those strategies that ensure load balance.

Greedy Assignment

The greedy approach considers each new chunk independently. For each new chunk NCi, the approach evaluates all the N servers. For each server Sj, it calculates W(NCi, Sj), the number of data items of chunk NCi that are already present in old chunks at server Sj. The approach then allocates each new chunk NCi to the server Sj with the maximum value of W(NCi, Sj), that is, to argmax_Sj W(NCi, Sj). As chunks are considered independently, the algorithm produces the same output irrespective of the order in which it considers the chunks.
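The greedy rule admits a direct sketch. The names below (locality playing the role of the W values, greedy_place) are illustrative, not Morphus's actual interfaces:

```python
def greedy_place(new_chunks, servers, locality):
    """Greedy placement sketch. locality[(chunk, server)] plays the role of
    W(NCi, Sj): the number of data items of the new chunk already resident at
    that server. Each chunk is placed independently on the server holding the
    most of its data, which minimizes total network transfer volume."""
    return {nc: max(servers, key=lambda s: locality.get((nc, s), 0))
            for nc in new_chunks}
```

On a skewed locality map, every chunk can end up on one server, which is exactly what motivates the load-balanced strategy.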

To illustrate the greedy scheme in action, Figure 6.2 provides two examples of the shard key change operation. In each example, the database has three old chunks, OC1–OC3, each of which contains three data items. For each data item, we show the old shard key Ko and the new shard key Kn (both in the range 1–9). The new configuration splits the new key range evenly across three chunks, shown as NC1–NC3.

Figure 6.2 Greedy and Hungarian strategies for shard key change using (a) balanced and (b) unbalanced old chunk configurations. S1–S3 represent servers. OC1–OC3 and NC1–NC3 are old and new chunks, respectively. Ko and Kn are old and new shard keys, respectively. Edges are annotated with weights.

In Figure 6.2a, the old chunks are spread evenly across servers S1–S3. The edge weights in the bipartite graph show the number of data items of NCi that are local at Sj, that is, the W(NCi, Sj) values. Thick lines show the greedy assignment.

However, the greedy approach may produce an unbalanced chunk assignment for skewed bipartite graphs, as in Figure 6.2b. While the greedy approach minimizes network transfer volume, it assigns new chunks NC2 and NC3 to server S1, while leaving server S3 empty.

Load Balance via Bipartite Matching

Load balancing of chunks across servers is important for several reasons. First, it improves read/write latencies for clients by spreading data and queries across more servers. Second, it reduces read/write bottlenecks. Finally, it reduces the tail of the reconfiguration time by preventing the allocation of too many chunks to any one server.

Our second strategy achieves load balance by capping the number of new chunks allocated to each server. With m new chunks and N servers, this per-server cap is ⌈m/N⌉ chunks. We then create a bipartite graph with two sets of vertices, top and bottom. The top set consists of ⌈m/N⌉ vertices for each of the N servers in the system; the vertices for server Sj are denoted by Sj^1, …, Sj^⌈m/N⌉. The bottom set of vertices consists of the m new chunks. Each edge between a top vertex Sj^k and a bottom vertex NCi has a cost equal to |NCi| − W(NCi, Sj), that is, the number of data items that will move to server Sj if new chunk NCi is allocated to it.

Assigning new chunks to servers so as to minimize data transfer volume now becomes a bipartite matching problem. Thus, we find the minimum-weight matching by using the classical Hungarian algorithm [106]. The complexity of this algorithm is cubic in the number of vertices on each side, that is, O((N · ⌈m/N⌉)^3), which reduces to O(m^3). The greedy strategy becomes a special case of this algorithm with a per-server cap of m chunks instead of ⌈m/N⌉.
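The construction can be sketched as follows. For brevity, this sketch brute-forces the minimum-cost matching over slot permutations (adequate only for tiny examples) instead of implementing the Hungarian algorithm; the cost of placing chunk c on a server is taken as |c| − W(c, s), the number of items that must move:

```python
import itertools
import math

def balanced_place(new_chunks, servers, locality, chunk_size):
    """Load-balanced placement sketch with a per-server cap of ceil(m/N) chunks.
    locality[(chunk, server)] plays the role of W(c, s). A real implementation
    would run the Hungarian algorithm; this exhaustive search is for tiny inputs."""
    m = len(new_chunks)
    cap = math.ceil(m / len(servers))
    slots = [s for s in servers for _ in range(cap)]   # replicated server vertices
    best_cost, best = float("inf"), None
    for perm in itertools.permutations(range(len(slots)), m):
        cost = sum(chunk_size[c] - locality.get((c, slots[i]), 0)
                   for c, i in zip(new_chunks, perm))
        if cost < best_cost:
            best_cost = cost
            best = {c: slots[i] for c, i in zip(new_chunks, perm)}
    return best, best_cost
```

Because each slot can be used at most once, no server ever receives more than ⌈m/N⌉ chunks, guaranteeing the load balance that plain greedy lacks.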

Figure 6.2b shows the outcome of the bipartite matching algorithm with dotted lines in the graph. While it incurs the same overall cost as the greedy approach, it provides the benefit of a load-balanced new configuration, wherein each server is allocated exactly one new chunk.

While we focus on the shard key change, this technique can also be used for other reconfigurations, such as changing shard size, or cluster scale-out and scale-in. The bipartite graph would be drawn appropriately (depending on the reconfiguration operation) and the same matching algorithm used. For the purpose of concreteness, the rest of this section of the chapter focuses on shard key change.

Finally, we have used data size (number of key-value pairs) as the main cost metric. Instead, we could use traffic to key-value pairs as the cost metric and derive edge weights in the bipartite graph (Figure 6.2) from these traffic estimates. The Hungarian approach on this new graph would balance out traffic load, while trading off optimality. Further exploration of this variant is beyond our scope here.

Network Awareness

Data centers use a wide variety of topologies, the most popular being hierarchical; for example, a typical two-level topology consists of a core switch and multiple rack switches. Others that are commonly used in practice include fat-trees [107], CLOS [108], and butterfly topologies [109].

Our first-cut data migration strategy, discussed earlier, was chunk-based: It assigned as many sockets (TCP streams) to a new chunk C at its destination server as there were source servers for C, that is, it assigned one TCP stream per server pair. Using multiple TCP streams per server pair has been shown to better utilize the available network bandwidth [21]. Further, the chunk-based approach results in stragglers in the execution phase. In particular, we observe that 60% of the chunks finish quickly, followed by a 40% cluster of chunks that finish late.

To address these two issues, we propose a weighted fair sharing (WFS) scheme that takes both data transfer size and network latency into account. Consider a pair of servers i and j, where i is sending some data to j during the reconfiguration. Let D_{i,j} denote the total amount of data that i needs to transfer to j, and L_{i,j} denote the latency of the shortest network path from i to j. Then, we set W_{i,j}, the weight for the flow from server i to j, as W_{i,j} = D_{i,j} / L_{i,j}: flows that must move more data, or that traverse lower-latency paths, receive proportionally more weight.

In our implementation, the weights determine the number of sockets that we assign to each flow. We assign each destination server j a total number of sockets K_j = K × (Σ_i W_{i,j}) / (Σ_{i,j} W_{i,j}), where K is the total number of sockets throughout the system. Thereafter, each destination server j assigns each source server i a number of sockets K_{i,j} = K_j × W_{i,j} / (Σ_i W_{i,j}).

However, K_{i,j} may differ from the number of new chunks that j needs to fetch from i. If K_{i,j} is larger, we treat each new chunk as a data slice and iteratively split the largest slice into smaller slices until K_{i,j} equals the total number of slices. Similarly, if K_{i,j} is smaller, we iteratively merge the smallest slices. Finally, each slice is assigned a socket for data transfer. Splitting or merging of slices is done only for socket assignment and to speed up data transfer; it does not affect the final chunk configuration computed in the prepare phase.
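A minimal sketch of the socket-assignment arithmetic, assuming the weight form W_ij = D_ij / L_ij and simple rounding (the real implementation's rounding and slice handling may differ):

```python
def wfs_sockets(D, L, K):
    """Weighted fair sharing sketch. D[(i, j)]: data server i must send to j;
    L[(i, j)]: latency of the path i -> j. Assumed weight form W_ij = D_ij / L_ij,
    so bigger transfers and faster paths get more sockets out of the budget K."""
    W = {flow: D[flow] / L[flow] for flow in D}
    total = sum(W.values())
    sockets = {}
    for j in {dst for (_, dst) in D}:
        flows = [f for f in D if f[1] == j]
        wj = sum(W[f] for f in flows)
        Kj = round(K * wj / total)                      # sockets for destination j
        for f in flows:
            sockets[f] = max(1, round(Kj * W[f] / wj))  # split among its sources
    return sockets
```

With equal data sizes, a flow over a path with one quarter the latency receives roughly four times the sockets, matching the intuition that faster paths can usefully carry more parallel streams.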

Our approach above could have used estimates of available bandwidth instead of latency estimates. We chose the latter because (i) latencies can be measured with lower overhead, (ii) they are more predictable over time, and (iii) they are correlated with the effective bandwidth.

Evaluation


We used the data set of Amazon reviews as our default collection [110]. Each data item had 10 fields. We chose product ID as the old shard key and userID as the new shard key; update operations used these two fields and a price field. Our default database size was 1 GB. (We later show scalability with data size.)

The default Morphus cluster used 10 machines, which included one mongos (front end) and three replica sets, each containing a primary and two secondaries. There were three config servers, each of which was colocated on a physical machine with a replica set primary; this is an allowed MongoDB installation. Each physical machine was a d710 Emulab node [111] with a 2.4 GHz processor, four cores, 12 GB RAM, two hard disks of 250 GB and 500 GB, and 64-bit CentOS 5.5, connected to a 100 Mbps LAN switch.

We implemented a custom workload generator that injects YCSB-like workloads via MongoDB's pymongo interface. Our default injection rate was 100 ops/s with 40% reads, 40% updates, and 20% inserts. To model realistic key access patterns, we selected keys for each operation via one of three YCSB-like [112] distributions: (i) Uniform (default), (ii) Zipf, and (iii) Latest. For the Zipf and Latest distributions, we employed a shape parameter α = 1.5. The workload generator ran on a dedicated Emulab pc3000 node with a 3 GHz processor, 2 GB RAM, two 146 GB SCSI disks, and 64-bit Ubuntu 12.04 LTS.

Morphus was implemented in about 4000 lines of C++ code, which is publicly available. Each plotted data point is an average of at least three experimental trials and is shown with standard deviation bars. An earlier section outlined two algorithms for the shard key change reconfiguration: Hungarian and greedy. We implemented both in Morphus and call the variants Morphus-H and Morphus-G, respectively.

While we present only selected experimental results in this chapter, we refer the reader to Ref. [102] for extensive information on experiments and evaluation of our system.

Effect on Reads and Writes

A key goal of Morphus is to ensure the availability of the database during reconfiguration. To evaluate its success, we generated read and write requests and measured their latency while a reconfiguration was in progress. We used Morphus-G with a chunk-based migration scheme. We ran separate experiments for all the key access distributions and also for a read-only workload.

Table 6.1 lists the percentages of read and write requests that succeeded during reconfiguration. The number of writes that failed is low: For the Uniform and Zipf workloads, fewer than 2% of writes failed. Many of the failed writes occurred during one of the write-throttling periods; recall that the number of write-throttling periods equals the replica set size, with one throttle period at the end of each reconfiguration round. The Latest workload has a slightly higher failure rate, since a key that has just been written is likely to be written or read again in the near future. Still, its write failure rate of 3.2% and read failure rate of 2.8% are reasonably low.

Table 6.1 Percentages of reads and writes that succeeded under reconfiguration.

Workload     Read (%)   Write (%)
Read only    99.9       n/a
Uniform      99.9       98.5
Latest       97.2       96.8
Zipf         99.9       98.3

Overall, the availability numbers are higher, at 99% to 99.9% for Uniform and Zipf workloads, which is comparable to the numbers for a scenario with no insertions. We conclude that unless there is temporal and spatial (key-wise) correlation between writes and reads (i.e., Latest workloads), the read latency is not affected much by concurrent writes. When there is correlation, Morphus mildly reduces the offered availability.

Going further, we plot in Figure 6.3a the CDF of read latencies for the four settings, and for a situation in which there was no reconfiguration (Uniform workload). Note that the horizontal axis is logarithmic scale. We consider latencies only for successful reads. We observe that the 96th percentile latencies for all workloads are within a range of 2 ms. The median (50th percentile) latency for No Reconfiguration is 1.4 ms, and this median holds for both the Read only (No Write) and Uniform workloads. The medians for the Zipf and Latest workloads are lower, at 0.95 ms. This lowered latency has two causes: caching at the mongod servers for the frequently accessed keys, and, in the case of Latest, the lower percentage of successful reads. In Figure 6.3b, we plot the corresponding CDF for write latencies. The median for writes when there is no reconfiguration (Uniform workload) is similar to that of the other distributions.

Figure 6.3 CDF of (a) read and (b) write latency distribution for no reconfiguration (No Reconf) and three under-reconfiguration workloads.

We conclude that under reconfiguration, the read and write availability provided by Morphus is high (close to 99%), while latencies of successful writes degrade only mildly compared to those observed when there is no reconfiguration in progress.

Effect of Network Awareness

First, Figure 6.4a shows the length of the execution phase (for a 500 MB Amazon collection) for two hierarchical topologies and five migration strategies. The topologies were (i) homogeneous, in which nine servers were distributed evenly across three racks; and (ii) heterogeneous, in which three racks contained six, two, and one servers, respectively. The switches were Emulab pc3000 nodes and all links were 100 Mbps. The inter-rack and intra-rack latencies were 2 and 1 ms, respectively. The five strategies were (i) fixed sharing, with one socket assigned to each destination node; (ii) the chunk-based approach; (iii) Orchestra [21] with K = 21; (iv) WFS with K = 21; and (v) WFS with K = 28.

Figure 6.4 Network-aware evaluation.

We observed that in the homogeneous clusters, the WFS strategy with K = 21 was 30% faster than fixed sharing and 20% faster than the chunk-based strategy. Compared to Orchestra, which weights flows only by their data size, WFS with K = 21 does 9% better, because it takes the network into account as well. Increasing K from 21 to 28 improves completion time in the homogeneous cluster, but causes degradation in the heterogeneous cluster. The reason is that a higher K results in more TCP connections, and at K = 28, this begins to cause congestion at the rack switch of the six-server rack.

Second, Figure 6.4b shows that Morphus's network-aware WFS strategy has a shorter tail and finishes earlier. Network awareness lowers the median chunk finish time by around 20% in both the homogeneous and heterogeneous networks.

We conclude that the WFS strategy improves performance compared to existing approaches, and K should be chosen to be as high as possible without leading to congestion.

Large-Scale Experiment

In this experiment, we increased data and cluster size simultaneously such that the amount of data per replica set was constant. We ran this experiment on Google Cloud [113]. We used n1-standard-4 VMs, each with four virtual CPUs and 15 GB of memory. The disk capacity was 1 GB, and the VMs were running Debian 7. We generated a synthetic data set by randomly dispersing data items among new chunks. Morphus-H was used for reconfiguration with the WFS migration scheme, and K = the number of old chunks.

Figure 6.5 shows a sublinear increase in reconfiguration time as data size and cluster size increased. Note that the x-axis uses a log scale. In the execution phase, all replica sets communicated among themselves to migrate data. As the number of replica sets increased with cluster size, the total number of connections increased, leading to network congestion. Thus, the execution phase took longer.

Figure 6.5 Running Morphus-H with WFS (K = number of old chunks) for reconfiguring databases with sizes of 25, 50, and 100 GB running on clusters of 25 machines (8 replica sets * 3 + 1 mongos), 49 machines (16 replica sets), and 100 machines (33 replica sets).

The amount of data per replica set affects reconfiguration time superlinearly. On the other hand, cluster size has a sublinear impact. In this experiment, the latter dominated, as the amount of data per replica set was constant.

6.4.3 Parqua: Reconfigurations in Distributed Key-Value Stores

This section is based on Ref. [114], and we refer the reader to that publication for further details on design, implementation, and experiments.

In this section, we describe how to perform reconfigurations in ring-based key-value/NoSQL stores such as Cassandra [2], Riak [115], Dynamo [116], and Voldemort [117].

The techniques described for Morphus in Section 6.4.2 cannot be applied directly, for two reasons. First, ring-based systems place data strictly in a deterministic fashion around the ring (e.g., using consistent hashing), which determines which keys can be placed where. Thus, our optimal placement strategies from Morphus do not apply to ring-based systems. Second, unlike sharded systems (e.g., MongoDB), ring-based systems do not allow isolation of a set of servers for reconfiguration (a fact that Morphus leveraged). In sharded databases, each participating server exclusively owns a range of data (as master or slave). In ring-based stores, however, ranges of keys overlap across multiple servers in a chained manner (because a node and its successors on the ring are replicas), and this makes full isolation impossible.

That motivated us to build a new reconfiguration system oriented toward ring-based key-value/NoSQL stores. Our system, named Parqua,5 enables online and efficient reconfigurations in virtual ring-based key-value/NoSQL systems. Parqua suffers no overhead when the system is not undergoing reconfiguration. During reconfiguration, Parqua minimizes the impact on read and write latency by performing reconfiguration in the background while responding to reads and writes in the foreground. It keeps the availability of data high during the reconfiguration and migrates to the new reconfiguration at an atomic switch point. Parqua is fault-tolerant, and its performance improves as cluster size increases. We have integrated Parqua into Apache Cassandra.

System Model

Parqua is applicable to any key-value/NoSQL store that satisfies the following assumptions. First, we assume a distributed key-value store that is fully decentralized, without the notion of a single master node or replica. Second, each node in the cluster must be able to deterministically choose the destination of the entries that are being moved because of the reconfiguration. This is necessary because there is no notion of a master in a fully decentralized distributed key-value store, and for each entry, all replicas should be preserved after the reconfiguration is finished. Third, we require the key-value store to utilize SSTables (Sorted String Tables) for persistence. An SSTable is essentially an immutable sorted list of entries stored on disk [98]. Fourth, each write operation accompanies a timestamp or a version number that can be used to resolve a conflict. Finally, we assume that the operations issued are idempotent. Therefore, insert, update, and read operations are supported, while nonidempotent operations such as counter-incrementing are not.

System Design and Implementation

Parqua runs reconfiguration in four phases. A graphical overview of the Parqua phases is given in Figure 6.6. Next, we discuss these individual phases in detail.

Figure 6.6 Overview of Parqua phases. The gray solid lines represent internal entry transfers and the gray dashed lines are client requests. The phases progress from left to right.

Isolate Phase

In this phase, the initiator node, in which the reconfiguration command is run, creates a new (and empty) column family (i.e., database table), denoted by Reconfigured CF (column family). It does so using a schema derived from the Original CF, except it uses the desired key as the new primary key. The Reconfigured CF enables reconfiguration to happen in the background while the Original CF continues to serve reads and writes using the old reconfiguration. We also record the timestamp of the last operation before the Reconfigured CF is created so that all operations that arrive while the execute phase is running can be applied later in the recovery phase.

Execute Phase

The initiator node notifies all other nodes to start copying data from the Original CF to the Reconfigured CF. Read and write requests from clients continue to be served normally during this phase. At each node, Parqua iterates through all entries for which it is responsible and sends them to the appropriate new destination nodes. The destination node for an entry is determined by (i) hashing the new primary key-value on the hash ring, and (ii) using the replica number associated with the entry. Key-value pairs are transferred between corresponding nodes that have matching replica numbers in the old configuration and the new configuration.

For example, in the execute phase of Figure 6.6, the entry with the old primary key “1” and the new primary key “10” has replica number 1 at node A, 2 at B, and 3 at C. After the primary key change, the entry's new position on the ring is between nodes C and D, where nodes D, E, and F hold replica numbers 1, 2, and 3, respectively. Thus, in the execute phase, the entry at node A is sent to node D; similarly, the entry at B is sent to E, and the entry at C to F.
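The replica-number-preserving transfer can be sketched with a toy consistent-hash ring. The hash function and ring layout below are illustrative (Cassandra itself uses Murmur3 partitioning), with a replication factor of 3:

```python
import bisect
import hashlib

RF = 3  # replication factor

def ring_position(key, ring_size=2**32):
    # Illustrative hash; Cassandra actually uses Murmur3 partitioning.
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16) % ring_size

def replica_set(key, ring):
    """ring: sorted list of (token, node). Replica r of a key lives on the
    r-th successor of the key's position on the ring."""
    tokens = [t for t, _ in ring]
    start = bisect.bisect_right(tokens, ring_position(key)) % len(ring)
    return [ring[(start + r) % len(ring)][1] for r in range(RF)]

def parqua_destinations(old_key, new_key, ring):
    """Map each current holder of an entry to the node with the SAME replica
    number in the new configuration (e.g. {A: D, B: E, C: F} in Figure 6.6)."""
    return dict(zip(replica_set(old_key, ring), replica_set(new_key, ring)))
```

Because the mapping is a pure function of the keys and the ring, every node computes the same destinations independently, with no master coordinating the transfer.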

Commit Phase

After the execute phase, the Reconfigured CF has the new configuration, and the entries from the Original CF have been copied to the Reconfigured CF. Now, Parqua atomically swaps both the schema and the SSTables between the Original CF and the Reconfigured CF. The write requests are locked in this phase, while reads continue to be served. To implement the SSTable swap, we leverage the fact that SSTables are maintained as files on disk, stored in a directory named after the column family; we therefore move the SSTable files from one directory to the other. Because moving a file within the same filesystem updates only directory metadata (inodes), the swap incurs no data I/O.

At the end of the commit phase, the write lock is released at each node. At this point, all client-facing requests are processed according to the new configuration. In our case, the new primary key is now in effect, and the read requests must use the new primary key.
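The directory swap described above can be sketched as follows, assuming both column family directories live on the same filesystem (so os.rename rewrites directory entries and moves no data):

```python
import os
import tempfile

def swap_sstables(original_dir, reconfigured_dir):
    """Swap the SSTable files of the two column families. Renames within one
    filesystem rewrite directory entries only, so no table data is copied."""
    staging = tempfile.mkdtemp(dir=os.path.dirname(original_dir))
    for name in os.listdir(original_dir):        # Original CF -> staging
        os.rename(os.path.join(original_dir, name), os.path.join(staging, name))
    for name in os.listdir(reconfigured_dir):    # Reconfigured CF -> Original CF
        os.rename(os.path.join(reconfigured_dir, name),
                  os.path.join(original_dir, name))
    for name in os.listdir(staging):             # staging -> Reconfigured CF
        os.rename(os.path.join(staging, name),
                  os.path.join(reconfigured_dir, name))
    os.rmdir(staging)
```

The brief write lock covers exactly this window, which is why the unavailability observed later in the evaluation lasts only seconds.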

Recovery Phase

During this phase, the system catches up with the recent writes that were not transferred to the Reconfigured CF in the execute phase. Read/write requests are processed normally, with the difference that until the recovery is done, the read requests may return stale results.6 At each node, Parqua iterates through the SSTables of the Original CF to recover the entries that were written during the reconfiguration. We limit the amount of disk accesses required for recovery by iterating only the SSTables that were created after the reconfiguration started. The iterated entries are routed to appropriate destinations in the same way as in the execute phase.
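A toy sketch of this recovery pass and the timestamp rule it relies on; the in-memory representation of SSTables here is purely illustrative:

```python
def recover_entries(sstables, reconfig_start, route):
    """Recovery pass sketch. sstables: list of {'created': ts, 'entries':
    {key: (value, write_ts)}}. Only tables flushed after the reconfiguration
    began are scanned; each entry is routed as in the execute phase."""
    for table in sstables:
        if table["created"] < reconfig_start:
            continue                      # already captured by the execute phase
        for key, (value, ts) in table["entries"].items():
            route(key, value, ts)

def apply_write(store, key, value, ts):
    """Last-write-wins by timestamp: a recovered entry never overshadows a
    newer update, preserving eventual consistency."""
    if key not in store or store[key][1] < ts:
        store[key] = (value, ts)
```

Skipping SSTables created before the reconfiguration started is what bounds the disk accesses of recovery, and the timestamp check at the destination makes replaying an entry harmless even if it races a fresh client write.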

Since all writes in Cassandra carry a timestamp [118], Parqua can ensure that the recovery of an entry does not overshadow newer updates, thus guaranteeing eventual consistency.

Experimental Evaluation


We used the Yahoo! Cloud Service Benchmark (YCSB) [112] to generate the data set and used the Uniform, Zipfian, and Latest key access distributions to generate CRUD workloads. Our default database size was 10 GB in all experiments. The operations consisted of 40% reads, 40% updates, and 20% inserts. Again, we present only selected experimental results in this chapter, and refer the reader to Ref. [114] for extensive information on experiments and evaluation of our system.


In this experiment, we measured the availability of our system during reconfiguration, shown in Table 6.2. The slight degradation in availability was due to the rejection of writes in the commit phase. The total duration of the unavailability was only a few seconds, which is orders of magnitude better than the current state of the art.

Table 6.2 Percentages of reads and writes that succeeded during reconfiguration.

Workload     Read (%)   Write (%)
Read only    99.17      n/a
Uniform      99.27      99.01
Latest       96.07      98.92
Zipfian      99.02      98.92

The lowest availability was observed for the Latest distribution. The reason is that YCSB does not wait for the database to acknowledge an insert of a key. Because of the temporal nature of the distribution, as keys are further read and updated, the operations fail because the inserts are still in progress.

Effect on Read Latency

Figure 6.7a shows the CDF of read latencies under various workloads while reconfiguration is being executed. As a baseline, we also plot the CDF of read latency when no reconfiguration is being run using the Uniform key access distribution. We plot the latencies of successful reads only.

Figure 6.7 Parqua experiments.

The median (50th percentile) latencies for the read-only workload and the baseline are similar because they both use Uniform distribution. Under reconfiguration, 20% of the reads take longer. With writes in the workload, the observed latencies for the Uniform curve are higher overall.

Compared to other workloads, Latest had the smallest median latency. Because of that workload's temporal nature, recently inserted keys were present in Memtables, which is a data structure maintained in memory. As a result, reads were faster than in other distributions that require disk accesses.

Overall, Parqua affects median read latency minimally across all the distributions. Our observations for write latency are similar. We refer the reader to our technical report [119] for more detail.


Next, we measured how well Parqua scales with (i) database size, (ii) cluster size, (iii) operation injection rate, and (iv) replication factor. For lack of space, we omit the plots for the last two experiments and refer the reader to our technical report [119]. To evaluate our system's scalability, we measured the total reconfiguration times along with the time spent in each phase. We did not inject operations for the experiments presented next.

Figure 6.7b depicts the reconfiguration times as the database size is increased up to 30 GB. Since we used a replication factor (number of copies of the same entry across the cluster) of 3 for fault tolerance, 30 GB of data implies 90 GB of total data in the database. In this plot, we observe that the total reconfiguration time scales linearly with database size. The bulk of the reconfiguration time is spent in transferring data in the execute phase.

In Figure 6.7c, we observe that the reconfiguration time decreases as the number of Cassandra peers increases. The decrease occurs because as the number of machines increases, the same amount of data, divided into smaller chunks, gets transferred by a larger number of peers. Again, the execute phase dominated the reconfiguration time.

6.5 Scale-Out and Scale-In Operations

Next, we describe a transparent way to scale-out and scale-in cloud computing applications. Scaling-out means increasing the number of machines (or VMs) running the application, and scaling-in means reducing the number. First, we tackle distributed stream processing systems in Section 6.5.1, where we describe our Stela system, which supports scale-out/scale-in and is implemented in Apache Storm [4]. Then, in Section 6.5.2, we address distributed graph processing systems, describing how to support scale-out/scale-in; our implementation is in LFGraph.

6.5.1 Stela: Scale-Out/Scale-In in Distributed Stream Processing Systems

This section is based on Ref. [120], and we refer the reader to that paper for further details on design, implementation, and experiments.

Motivation

The volume and velocity of real-time data require frameworks that can process large dynamic streams of data on the fly and serve results with high throughput. To meet this demand, several new stream processing engines have recently been developed that are now widely in use in industry, for example, Storm [4], System S [33], and Spark Streaming [121], among others [25,27,31]. Apache Storm is one of the most popular.

Unfortunately, these new stream processing systems used in industry lack the ability to scale the number of servers seamlessly and efficiently in an on-demand manner. On-demand means that the scaling is performed when the user (or some adaptive program) requests an increase or decrease in the number of servers in the application. Today, Storm supports an on-demand scaling request by simply unassigning all processing operators and then reassigning them in a round-robin fashion to the new set of machines. This approach is not seamless, as it interrupts the ongoing computation for a long time. It is not efficient either, as it results in suboptimal throughput after the scaling is completed, as our experiments (discussed later) showed.

Scaling-out and scaling-in are critical tools for customers. For instance, a user might start running a stream processing application with a given number of servers, but if the incoming data rate rises or if there is a need to increase the processing throughput, the user may wish to add a few more servers (scale-out) to the stream processing application. On the other hand, if the application is currently underutilizing servers, then the user may want to remove some servers (scale-in) in order to reduce the dollar cost (e.g., if the servers are VMs in AWS [122]).

On-demand scaling operations should meet two goals: (i) The post-scaling throughput (tuples per second) should be optimized, and (ii) the interruption to the ongoing computation (while the scaling operation is being carried out) should be minimized. We have created a new system, named Stela (STream processing ELAsticity), that meets those two goals. For scale-out, Stela carefully selects which operators (inside the application) are given more resources, and does so with minimal intrusion. Similarly, for scale-in, Stela carefully selects which machine(s) to remove in a way that minimizes the overall detriment to the application's performance.

To select the best operators to give more resources when scaling out, Stela uses a new metric called ETP (effective throughput percentage). The key intuition behind ETP is that it is used to capture those operators (e.g., bolts and spouts in Storm) that both (i) are congested (i.e., are being overburdened with incoming tuples), and (ii) affect throughput the most because they reach a large number of sink operators. For scale-in, we also use an ETP-based approach to decide which machine(s) to remove and where to migrate operator(s).

The ETP metric is both hardware- and application-agnostic. Thus, Stela needs neither hardware profiling (which can be intrusive and inaccurate) nor knowledge of application code.

The design of Stela is generic to any data flow system. For concreteness, we integrated Stela into Apache Storm. We compare Stela against the most closely related elasticity techniques in the literature [37]. We generated experimental results by using microbenchmark Storm applications, as well as production applications from industry (Yahoo! Inc. and IBM [29]). We believe our metric can be applied to other systems as well.

Our main contributions here are (i) development of the novel ETP metric, which captures the “importance” of an operator; (ii) to the best of our knowledge, the first description and implementation of on-demand elasticity within Storm; and (iii) evaluation of our system both on microbenchmark applications and on applications used in production.

Data Stream Processing Model and Assumptions

We target distributed data stream processing systems that represent each application as a directed acyclic graph (DAG) of operators. An operator is a user-defined logical processing unit that receives one or more streams of tuples, processes each tuple, and outputs one or more streams of tuples. We assume operators are stateless, and that tuple sizes and processing rates follow an ergodic distribution. These assumptions hold true for most Storm topologies used in industry. Operators that have no parents are sources of data injection; they may read, for example, from a Web crawler. Operators with no children are sinks; each sink outputs data to a GUI or a database. The intermediate operators perform processing of tuples. The application throughput is the sum of the throughputs of all sinks in the application. An application may have multiple sources and sinks.

An instance of an operator is an instantiation of the operator's processing logic and is the physical entity that executes the operator's logic. The number of instances is correlated with the operator's parallelism level. For example, in Storm, these instances are called “executors” [4].

Stela: Scale-Out Overview

In this section, we give an overview of how Stela supports scale-out. When a user requests a scale-out with a given number of new machines, Stela needs to choose the operators to which it will give more resources by increasing their parallelism.

Stela first identifies operators that are congested based on their input and output rates. It identifies all congested operators in the graph by continuously sampling the input rate and processing rate of each operator. When the ratio of input to processing exceeds a threshold CongestionRate, we consider that operator to be congested. The CongestionRate parameter can be tuned as needed and controls the sensitivity of the algorithm. For our Stela experiments, we set CongestionRate to be 1.2.
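As a minimal sketch (with hypothetical names; Stela's actual implementation samples these rates inside Storm), the congestion test reduces to a ratio check over the sampled rates:

```python
CONGESTION_RATE = 1.2  # threshold used in the Stela experiments

def congested_operators(input_rate, processing_rate, threshold=CONGESTION_RATE):
    """Return operators whose sampled input rate exceeds their processing
    rate by more than the CongestionRate threshold."""
    return {op for op, r_in in input_rate.items()
            if processing_rate[op] > 0 and r_in / processing_rate[op] > threshold}
```

For example, an operator receiving 130 tuples/s while processing 100 tuples/s is flagged (ratio 1.3 > 1.2), while one receiving 110 tuples/s is not.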

After an operator is identified as congested, Stela calculates a per-operator metric called the ETP. ETP takes the topology into account: it captures the percentage of total application throughput (across all sinks) on which the operator has a direct impact, while ignoring all downstream paths in the topology that are already congested. Stela selects the operator with the highest ETP, increases its parallelism, and iterates this process. To ensure load balance, the total number of such iterations equals the number of new machines added times the average number of instances per machine pre-scale. We determine the number of instances to allocate per new machine with Ninstances = (total number of instances)/(number of machines); in other words, Ninstances is the average number of instances per machine prior to scale-out. This ensures load balance post-scale-out. The schedule of operators on existing machines is left unchanged.

Effective Throughput Percentage (ETP)

To estimate the impact of each operator on the application throughput, Stela uses a new metric we developed called ETP. An operator's ETP is defined as the percentage of the final throughput that would be affected if the operator's processing speed were changed.

The ETP of an operator o is computed as

ETPo = ThroughputEffectiveReachableSinks/Throughputworkflow

Here, ThroughputEffectiveReachableSinks denotes the sum of the throughputs of all sinks reachable from o by at least one uncongested path, that is, a path consisting only of operators that are not classified as congested. Throughputworkflow denotes the sum of the throughputs of all sink operators of the entire application. The algorithm that calculates ETPs does a depth-first search throughout the application DAG and calculates ETPs via a postorder traversal. ProcessingRateMap stores the processing rates of all operators. Note that if an operator o has multiple parents, then the effect of o's ETP is the same at each of its parents (i.e., it is replicated, not split).

ETP Calculation Example and Intuition

In Figure 6.8, we illustrate the ETP calculation with an example application. The processing rate of each operator is shown. The congested operators, that is, operators 1, 3, 4, and 6, are shaded. The total throughput of the workflow, Throughputworkflow = 5000 tuples/s, is the sum of the throughputs of sink operators 4, 7, 8, 9, and 10.

Figure 6.8 A sliver of a stream processing application. Each operator is labeled with its input/execution speed. Shaded operators are congested. CongestionRate = 1.

Let us calculate the ETP of operator 3. Its reachable sink operators are 7, 8, 9, and 10. Of these, only 7 and 8 are considered to be the “effectively” reachable sink operators, as they are both reachable via an uncongested path. Thus, increasing the speed of operator 3 will improve the throughput of operators 7 and 8. However, operator 6 is not effectively reachable from operator 3, because operator 6 is already congested; thus, increasing operator 3's resources will only increase operator 6's input rate and make operator 6 further congested, without improving its processing rate. Thus, we ignore the subtree of operator 6 when calculating 3's ETP. The ETP of operator 3 is ETP3 = (1000 + 1000)/5000 = 40%.
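The ETP computation can be sketched as a recursive traversal. The exact edges of Figure 6.8 are not reproduced in the text, so the topology below is a hypothetical one chosen to be consistent with the worked example (congested set {1, 3, 4, 6} and sink throughputs summing to 5000 tuples/s are assumptions):

```python
def etp(op, children, congested, sink_throughput, total_throughput):
    """ETP of `op`: the fraction of total sink throughput reachable from
    `op` along paths on which every operator after `op` is uncongested.
    A congested operator that is itself a sink still counts its own
    throughput (cf. operator 4 in the example)."""
    def reachable(o):
        if not children.get(o):                  # o is a sink
            return sink_throughput.get(o, 0)
        return sum(reachable(c) for c in children[o] if c not in congested)
    return reachable(op) / total_throughput

# Hypothetical DAG consistent with the Figure 6.8 walk-through.
children = {1: [3, 4], 3: [5, 6], 5: [7, 8], 6: [9, 10]}
congested = {1, 3, 4, 6}
sinks = {4: 2000, 7: 1000, 8: 1000, 9: 500, 10: 500}   # sums to 5000 tuples/s
etps = {o: etp(o, children, congested, sinks, 5000) for o in congested}
```

On this topology, `etps` comes out as {1: 0.0, 3: 0.4, 4: 0.4, 6: 0.2}, matching the ETP values derived in the text and the priority order 3, 4, 6, 1.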

Similarly, for operator 1, the sink operators 4, 7, 8, 9, and 10 are reachable, but none of them are reachable via an uncongested path. Thus, the ETP of operator 1 is 0. Likewise, we can calculate the ETP of operator 4 as 40% and the ETP of operator 6 as 20%. Therefore, the priority order in which Stela will assign resources to these operators is 3, 4, 6, 1.

Iterative Assignment and Intuition

During each iteration, Stela calculates the ETP for all congested operators. Stela targets the operator with the highest ETP and increases the parallelism of that operator by assigning a new instance of it at the newly added machine. If multiple machines are being added, then the target machine is chosen in a round-robin manner. Overall, this algorithm runs Ninstances iterations to select Ninstances target operators. (We showed above how to calculate Ninstances.)

In each iteration, Stela constructs a CongestedMap to store all congested operators. If there are no congested operators in the application, Stela chooses a source operator as a target in order to increase the input rate of the entire application. If congested operators do exist, for each one, Stela finds its ETP using the algorithm discussed above; the results are stored, sorted, in ETPMap. Stela chooses the operator that has the highest ETP value from ETPMap as a target for the current iteration. It increases the parallelism of this operator by assigning one additional random instance to it on one of the new machines in a round-robin manner.

For the next iteration, Stela estimates the processing rate of the previously targeted operator o proportionally, that is, if o previously had an output rate E and k instances, then o's new projected processing rate is E × (k + 1)/k. This is a reasonable approach since all machines have the same number of instances and thus proportionality holds. (This approach may not be accurate, but we find that it works in practice.) Then Stela uses the projected processing rate to update the output rate for o, and the input rates for o's children. (The processing rates of o's children, and indeed o's grand-descendants, do not need updates, as their resources remain unchanged.) Stela updates the emit rate of the target operator in the same manner, so that the projected rates can be applied consistently in the next iteration.

Once it has done that, Stela recalculates the ETPs of all operators again using the same algorithm. We call these new ETPs projected ETPs, or PETPs, because they are based on estimates. The PETPs are used as ETPs for the next iteration. Iterations are performed until all available instance slots at the new machines are filled. Once that procedure is complete, the schedule is committed through starting of the appropriate executors on new instances.
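The iterative loop can be sketched as follows. This is a simplification with hypothetical helper callables: `compute_etps` returns the projected ETPs of currently congested operators, and `project_rates` applies the (k + 1)/k processing-rate projection after an operator gains an instance:

```python
def plan_scale_out(n_new, schedule, compute_etps, project_rates):
    """Pick one target operator per available instance slot on the new
    machines, assigning new instances to new machines round-robin."""
    avg_instances = sum(len(v) for v in schedule.values()) // len(schedule)
    new_machines = ["new-%d" % i for i in range(n_new)]
    assignments = []
    for it in range(avg_instances * n_new):   # Ninstances slots per new machine
        etps = compute_etps()                 # projected ETPs (PETPs after iter 0)
        if not etps:                          # nothing congested: stop (a real run
            break                             # would target a source operator)
        target = max(etps, key=etps.get)      # highest projected ETP wins
        assignments.append((target, new_machines[it % n_new]))
        project_rates(target)                 # update estimates for next iteration
    return assignments

# Toy projection state: halve an operator's projected ETP per added instance.
state = {"a": 0.4, "b": 0.2}
plan = plan_scale_out(1, {"m1": ["a0", "b0"]}, lambda: dict(state),
                      lambda op: state.update({op: state[op] / 2}))
```

With two pre-scale instances on one machine and one new machine, two slots are filled, and the highest-ETP operator "a" is targeted first.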

The algorithm involves searching for all reachable sinks for every congested operator; as a result, each iteration of Stela has a running time complexity of O(n2), where n is the number of operators in the workflow. The entire algorithm has a running time complexity of O(m · n2), where m is the number of new instance slots at the new workers.

Stela: Scale-In

For scale-in, we assume that the user specifies only the number of machines to be removed and Stela picks the “best” machines from the cluster to remove. (If the user specifies the exact machines to remove, the problem is no longer challenging.) We describe how techniques used for scale-out, particularly the ETP metric, can also be used for scale-in. For scale-in, we will calculate the ETP not merely per operator but instead per machine in the cluster. That is, we first calculate the ETPSum for each machine, as follows:

ETPSum(machinek) = Σ over all instances τi on machinek of ETP(operator(τi))

The scale-in procedure is called iteratively, as many times as the number of machines the user asked Stela to remove. The procedure calculates the ETPSum for every machine in the cluster and puts the machine and its corresponding ETPSum into the ETPMachineMap. The ETPSum for a machine is the sum of all the ETPs of instances of all operators that currently reside on the machine. Thus, for every instance τi, we first find the operator of which τi is an instance, and then find that operator's ETP. Then, we sum all of these ETPs. The ETPSum of a machine is thus an indication of how much the instances executing on that machine contribute to the overall throughput.

The ETPMachineMap is sorted by increasing order of ETPSum values. The machine with the lowest ETPSum will be the target machine to be removed in this round of scale-in. Operator migration to machines with lower ETPSums will have less of an effect on the overall performance, since machines with lower ETPSums contribute less to the overall performance. This approach also helps shorten the amount of downtime the application experiences because of the rescheduling. Operators from the machine that is chosen to be removed are reassigned to the remaining machines in the cluster in a round-robin fashion in increasing order of their ETPSums.
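A sketch of this machine-selection loop (hypothetical names; per-operator ETPs are assumed to be given):

```python
def plan_scale_in(n_remove, placement, operator_etp):
    """Iteratively remove the machines with the lowest ETPSum, migrating
    their instances round-robin to the remaining machines in increasing
    order of ETPSum."""
    placement = {m: list(ops) for m, ops in placement.items()}  # work on a copy
    removed = []
    for _ in range(n_remove):
        etpsum = {m: sum(operator_etp[o] for o in ops)
                  for m, ops in placement.items()}
        victim = min(etpsum, key=etpsum.get)   # least contribution to throughput
        removed.append(victim)
        migrating = placement.pop(victim)
        targets = sorted(placement, key=etpsum.get)  # low-ETPSum machines first
        for i, op in enumerate(migrating):
            placement[targets[i % len(targets)]].append(op)
    return removed, placement

removed, new_placement = plan_scale_in(
    1,
    {"m1": ["op_a", "op_a"], "m2": ["op_b"], "m3": ["op_c"]},
    {"op_a": 0.4, "op_b": 0.0, "op_c": 0.2})
```

In this toy run, m2 hosts only a zero-ETP operator, so it is removed first and its instance migrates to the remaining machine with the lowest ETPSum (m3).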

After the schedule is created, Stela commits it by migrating operators from the selected machines, and then releases these machines. The scale-in algorithm involves sorting of ETPSum, which results in a running time complexity of O(n log n).

Core Architecture

Stela runs as a custom scheduler in a Java class that implements a predefined IScheduler interface in Storm. A user can specify which scheduler to use in a YAML-formatted configuration file called storm.yaml. Our scheduler runs as part of the Storm Nimbus daemon. The architecture of Stela's implementation in Storm is presented in Figure 6.9. It consists of the following three modules:

  • StatisticServer: This module is responsible for collecting statistics in the Storm cluster, for example, the throughput at each task, at each bolt, and for the topology. These data are used as input to the congestion detection.
  • GlobalState: This module stores important state information regarding the scheduling and performance of a Storm cluster. It holds information about where each task is placed in the cluster. It also stores statistics such as sampled throughputs of incoming and outgoing traffic for each bolt for a specific duration, and these statistics are used to identify congested operators, as mentioned earlier.
  • ElasticityScheduler: This module is the custom scheduler that implements the IScheduler interface. This class starts the StatisticServer and GlobalState modules and invokes the Strategy module when needed.
    • Strategy: Contained inside ElasticityScheduler, this provides an interface for implementation of scale-out strategies so that different strategies can be easily swapped in and out for evaluation purposes. This module calculates a new schedule based on the scale-in or scale-out strategy in use and uses information from the Statistics and GlobalState modules. The core Stela policy and alternative strategies [37] are implemented here.

Figure 6.9 Stela architecture.

When a scale-in or scale-out signal is sent by the user to the ElasticityScheduler, a procedure is invoked that detects newly joined machines based on previous membership. The ElasticityScheduler invokes the Strategy module, which calculates the entire new scheduling; for example, for scale-out, it decides on all the newly created executors that need to be assigned to newly joined machines. The new scheduling is then returned to the ElasticityScheduler, which atomically (at the commit point) changes the current scheduling in the cluster. Computation is thereafter resumed.
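This control flow can be sketched as follows (a Python stand-in for illustration only; the real modules are Java classes behind Storm's IScheduler interface, and the method names here are hypothetical):

```python
class ElasticityScheduler:
    """Sketch of Stela's scheduler control flow."""
    def __init__(self, statistics, global_state, strategy):
        self.statistics = statistics      # StatisticServer: sampled throughputs
        self.global_state = global_state  # task placements + traffic statistics
        self.strategy = strategy          # pluggable scale-out/scale-in policy

    def on_scaling_signal(self, known_machines, current_machines):
        # Detect newly joined machines based on previous membership.
        new_machines = [m for m in current_machines if m not in known_machines]
        schedule = self.strategy.plan(self.statistics, self.global_state,
                                      new_machines)
        self.commit(schedule)             # atomic switch at the commit point

    def commit(self, schedule):
        self.global_state["schedule"] = schedule

class EchoStrategy:                       # trivial stand-in strategy
    def plan(self, stats, state, new_machines):
        return {"targets": new_machines}

gs = {}
ElasticityScheduler(None, gs, EchoStrategy()).on_scaling_signal(
    ["m1"], ["m1", "m2"])
```

The point of the sketch is the separation of concerns: the scheduler only detects membership changes and commits atomically, while all placement logic lives in the swappable Strategy module.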

Fault Tolerance

When no scaling is occurring, failures are handled the same way as in Storm, that is, Stela inherits Storm's fault tolerance. If a failure occurs during a scaling operation, Stela's scaling will need to be aborted and restarted. If the scaling is already committed, failures are handled as in Storm.

Evaluation

Our evaluation is two-pronged, and includes use of both microbenchmark topologies and real topologies (including two from Yahoo!). We adopted this approach because of the absence of standard benchmark suites (e.g., TPC-H or YCSB) for stream processing systems. Our microbenchmarks include small topologies such as star, linear, and diamond, because we believe that most realistic topologies will be a combination of these. We also use two topologies from Yahoo! Inc., which we call the Page Load topology and Processing topology, as well as a Network Monitoring topology [29]. In addition, we present a comparison among Stela, the Link Load Strategy [37], and Storm's default scheduler (which is state of the art). We present only selected experimental results in this chapter and refer the reader to Ref. [120] for extensive information on the experiments and evaluation of our system.

Experimental Setup

For our evaluation, we used two types of machines from the Emulab [111] test bed to perform our experiments. Our typical Emulab setup consisted of a number of machines running Ubuntu 12.04 LTS images, connected via a 100 Mbps VLAN. A type 1 machine had one 3 GHz processor, 2 GB of memory, and 10,000 RPM 146 GB SCSI disks. A type 2 machine had one 2.4 GHz quad-core processor, 12 GB of memory, and 750 GB SATA disks. The settings for all topologies tested are listed in Table 6.3. For each topology, the same scaling operations were applied to all strategies.

Table 6.3 Experiment settings and configurations.

Topology type | Number of tasks per component | Initial number of executors per component | Number of worker processes | Initial cluster size | Cluster size after scaling | Machine type
Page Load | 8 | 4 | 28 | 7 | 8 | 1
Network | 8 | 4 | 32 | 8 | 9 | 2
Page Load Scale-in | 15 | 15 | 32 | 8 | 4 | 1

Yahoo! Storm Topologies and Network Monitoring Topology

We obtained the layout of a topology in use at Yahoo! Inc. We refer to this topology as the Page Load topology (which is not its original name). The layout of the Page Load topology is displayed in Figure 6.10a and the layout of the Network Monitoring topology, which we derived from Ref. [29], is displayed in Figure 6.10b.

Figure 6.10 Yahoo! topology and a Network Monitoring topology derived from Ref. [29].

We examine the performance of three scale-out strategies: default, Link-based [37], and Stela. The throughput results are shown in Figure 6.11. Recall that link-load-based strategies reduce the network latency of the workflow by colocating communicating tasks on the same machine.

Figure 6.11 Scale-out: Throughput behavior for Yahoo! topologies and Network Monitoring topology (Window size of 10 s).

From Figure 6.11, we observe that Stela improves the throughput by 80% after a scale-out of the Page Load topology. In comparison, the Least Link Load strategy barely improves the throughput after a scale-out, because migration of tasks that are not resource-constrained will not significantly improve performance. The default scheduler actually decreases the throughput after the scale-out, since it simply unassigns all executors and reassigns them in a round-robin fashion to all machines, including the new ones. That may cause machines with “heavier” bolts to be overloaded, thus creating newer bottlenecks that damage performance, especially for topologies with a linear structure. In comparison, Stela's postscaling throughput is about 80% better than Storm's for both the Page Load and Network Monitoring topologies, indicating that Stela is able to find the most congested bolts and paths and give them more resources.

In addition to the Page Load and Network Monitoring topologies, we also looked at a published application from IBM [29], and we wrote from scratch a similar Storm topology (shown in Fig. 6.10b). When we increased the cluster size from 8 to 9, our experimental results (Figure 6.11b) show that Stela improves the throughput by 21% by choosing to parallelize the congested operator closest to the sink. The Storm default scheduler does not improve postscale throughput, and the Least Link Load strategy decreases system throughput.

Convergence Time

We measured interruptions to ongoing computation by measuring the convergence time: the duration between the start of a scale-out operation and the stabilization of the overall throughput of the Storm topology. More specifically, the stopping criteria for convergence are that (i) the throughput oscillates twice above and twice below the average post-scale-out throughput, and (ii) the oscillation is within a small standard deviation of 5%. Thus, a lower convergence time means that the system is less intrusive during the scale-out operation, and it can resume meaningful work earlier.
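Such a stopping rule can be sketched as follows (the sample-window length is our assumption; the text specifies only the two criteria):

```python
from statistics import mean, pstdev

def has_converged(samples, window=10):
    """Apply the stopping criteria to the last `window` post-scale-out
    throughput samples: at least two samples above and two below the
    window average, with a standard deviation within 5% of that average."""
    recent = samples[-window:]
    if len(recent) < window:
        return False
    avg = mean(recent)
    above = sum(1 for s in recent if s > avg)
    below = sum(1 for s in recent if s < avg)
    return above >= 2 and below >= 2 and pstdev(recent) <= 0.05 * avg
```

A throughput trace oscillating tightly around a stable level passes the test, while a still-climbing trace fails it (its deviation from the window average is too large).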

Figure 6.12a shows the convergence time for the Yahoo! topology. We observe that Stela is far less intrusive than Storm (with an 88% lower convergence time) when scaling out. The main reason why Stela has a better convergence time than either Storm's default scheduler or the Least Link Load strategy [37] is that Stela (unlike the other two) does not change the current scheduling at existing machines, instead choosing to schedule operators at the new machines only.

Figure 6.12 Scale-out: Convergence time comparison (in seconds).

In the Network Monitoring topology, Stela experiences longer convergence times than Storm's default scheduler and the Least Link Load strategy, because of re-parallelization during the scale-out operation (Figure 6.12b). On the other hand, Stela provides the benefit of higher post-scale throughput, as shown in Figure 6.11b.

Scale-In Experiments

We examined the performance of Stela scale-in by running Yahoo's Page Load topology. The initial cluster size was 8, and Figure 6.13a shows how the throughput changed after the cluster size shrank to four machines. (We initialized the operator allocation so that each machine could be occupied by tasks from fewer than two operators (bolts and spouts).) We compared the performance against that of a round-robin scheduler (the same as Storm's default scheduler), using two alternative groups of randomly selected machines.

Figure 6.13 Scale-in experiments. (Window size is 10 s).

We observe that Stela preserved throughput after scale-in, while the two Storm groups experienced 80% and 40% decreases in throughput, respectively. Thus, Stela's post-scale-in throughput is 25× higher than that obtained when the machines to remove are chosen randomly. Stela also achieved 87.5% and 75% less downtime (time during which the throughput is zero) than group 1 and group 2, respectively (see Fig. 6.13b). The main reason is that Stela's migration of operators with low ETPs will intrude less on the application, which will allow downstream congested components to digest tuples in their queues and continue producing output. In the Page Load topology, the two machines with the lowest ETPs were chosen for redistribution by Stela, and that resulted in less intrusion for the application and, thus, significantly better performance than Storm's default scheduler.

Therefore, Stela is intelligent at picking the best machines to remove (via ETPSum). In comparison, Storm has to be lucky. In the above scenario, two out of the eight machines were the “best.” The probability that Storm would have been lucky enough to pick both (when it picks 4 at random) is (4/8) × (3/7) = 3/14 ≈ 0.21, which is low.
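The arithmetic behind that "lucky pick" probability is a quick check: the two "best" machines are fixed, and Storm removes four of the eight uniformly at random.

```python
from fractions import Fraction
from math import comb

# P(both "best" machines are among the 4 chosen for removal): fix those two
# in the removal set and choose the remaining 2 of the 4 from the other 6.
p = Fraction(comb(6, 2), comb(8, 4))
# Equivalent sequential view: first pick hits a best machine w.p. 4/8... or,
# conditioning on the best machines, P = (4/8) * (3/7).
assert p == Fraction(4, 8) * Fraction(3, 7) == Fraction(3, 14)
```

Both counting arguments agree on 3/14 ≈ 0.21.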

6.5.2 Scale-Out/Scale-In in Distributed Graph Processing Systems

This section is based on Ref. [123], and we refer the reader to that publication for further details on design, implementation, and experiments.

Motivation

Large graphs are increasingly common; examples include online social networks such as Twitter and Facebook, Web graphs, Internet graphs, biological networks, and many others. Processing and storing these graphs in a single machine is not feasible. Google's Pregel [124] and GraphLab [125] were the first attempts at processing these graphs in a distributed way. Subsequently, the research community has developed more efficient engines that adopt the vertex-centric approach for graph processing, such as LFGraph [126], PowerGraph [48], and GPS [52].

Today's graph processing frameworks operate on statically allocated resources; the user must decide on resource requirements before an application starts. However, partway through computation, the user may wish to scale-out (e.g., to speed up computation) or scale-in (e.g., to reduce hourly costs). The capability to scale-out/scale-in when required by the user is called on-demand elasticity. Alternatively, an adaptive policy may request scale-out or scale-in. Such a concept has been explored for data centers [38,39], cloud systems [40–42], storage systems [13,43–45,127], and data processing frameworks such as Hadoop [46,47] and Storm [37]. However, on-demand elasticity remains relatively unexplored in batch-distributed graph processing systems.

Partitioning techniques have been proposed to optimize computation and communication [48], but they partition the entire graph across servers and are thus applicable only at the start of the graph computation. On-demand elasticity requires an incremental approach to (re-)partitioning vertices on demand. Solving the problem of on-demand elasticity is also the first step toward adaptive elasticity (e.g., satisfying an SLA in a graph computation), for which our techniques may be employed as black boxes.

A distributed graph processing system that supports on-demand scale-out/scale-in must overcome three challenges:

  1. The need to perform scale-out/scale-in without interrupting graph computation. A scale-out/scale-in operation requires a reassignment of vertices among servers. During scale-out, new servers must obtain some vertices (and their values) from existing servers. Similarly, during scale-in, vertices from the departing servers must be reassigned to the remaining servers. These transfers must be done while minimally affecting ongoing computation times.
  2. The need to minimize the background network overhead involved in the scale-out/scale-in. To reduce the impact of the vertex transfer on computation time, we wish to minimize the total amount of vertex data transferred during scale-out/scale-in.
  3. The need to mitigate stragglers by maintaining load balance across servers. Graph processing proceeds in iterations, and stragglers will slow an entire iteration down. Thus, while reassigning vertices at the scale-out/scale-in point, we aim to achieve load balance in order to mitigate stragglers and keep computation time low.

Our approach to solving the problem of on-demand elasticity and overcoming the above challenges is motivated by two critical questions:

  1. What should be migrated, and how? Which vertices from which servers should be migrated in order to reduce the network transfer volume and maintain load balance?
  2. When should they be migrated? At what points during computation should migration begin and end?

To answer the first question, we created and analyzed two techniques. The first, called contiguous vertex repartitioning (CVR), achieves load balance across servers. However, it may result in high overhead during the scale-out/scale-in operation. Thus, we developed a second technique, called ring-based vertex repartitioning (RVR), that relies on ring-based hashing to lower the overhead. To address the second question, of when to migrate, we integrated our techniques into the LFGraph graph processing system [126], and used our implementation to carefully decide when to begin and end background migration, and when to migrate static versus dynamic data. We also use our implementation to explore system optimizations that make migration more efficient.

We performed experiments with multiple graph benchmark applications on a real Twitter graph with 41.65 million vertices and 1.47 billion edges. Our results indicate that our techniques are within 9% of an optimal mechanism for scale-out operations and within 21% for scale-in operations.

What to Migrate, and How?

In this section, we address the question of which vertices to migrate when the user requests a scale-out/scale-in operation.

Contiguous Vertex Repartitioning (CVR)

Our first technique assumes that the hashed vertex space is divided into as many partitions as there are servers, and each server is assigned one partition. Partitions are equisized in order to accomplish load balancing. The top of Figure 6.14a shows an example graph containing 100 vertices, split across 4 servers. The vertex sequence (i.e., Vi) is random but consistent due to our use of consistent hashing and is split into four equisized partitions, which are then assigned to servers S1–S4 sequentially.

Figure 6.14 Scale-out from four (top) to five (bottom) servers using CVR. Fewer vertices are transferred in the optimal partition assignment (30 versus 50).

Upon a scale-out/scale-in operation, the key problem we need to solve is, how do we assign the (new) equisized partitions to servers (one partition per server), such that network traffic volume is minimized? For instance, the bottom of Figure 6.14 shows the problem when scaling out from four to five servers. To solve this problem, we now (i) show how to reduce the problem to one of graph matching, and (ii) propose an efficient heuristic.

When we scale-out/scale-in, we repartition the vertex sequence into equisized partitions. Assigning these new partitions in an arbitrary fashion to servers may be suboptimal and involve transfer of large amounts of vertex data across the network. For instance, in the bottom of Figure 6.14a, we scale-out by adding one server, resulting in five new partitions. Merely adding the new server to the end of the server sequence and assigning partitions to servers in that order results in movement of 50 total vertices. On the other hand, Figure 6.14b shows the optimal solution for this example, wherein adding the new server in the middle of the partition sequence results in movement of only 30 vertices.

To achieve the optimal solution, we consider the scale-out problem formally. (The solution for scale-in is analogous and excluded for brevity.) Let the cluster initially have N servers S1,…, SN. With a graph of V vertices, the initial size of each partition Pi is V/N, where 1 ≤ i ≤ N. Each jth vertex ID is hashed, and then the resulting value h(j) is used to assign the vertex to partition Pi, where i = ⌈h(j)/(V/N)⌉. If we add k servers to this cluster, the size of each new partition becomes V/(N + k). We label these new partitions P′i, 1 ≤ i ≤ N + k, and assign each jth vertex, as usual, to a new partition by first hashing the vertex ID and using the resulting hash: the vertex goes to partition P′i, where i = ⌈h(j)/(V/(N + k))⌉.

Next, we create a bipartite graph B, which contains (i) a left set of vertices, with one vertex per new partition P′i, and (ii) a right set of vertices, with one vertex per server Sj (including the k new servers). The left and right sets each contain (N + k) vertices. The result is a complete bipartite graph, with the edge joining a partition P′i and a server Sj associated with a cost. The cost is equal to the number of vertices that must be transferred over the network if partition P′i is assigned to server Sj after scale-out. In other words, the cost is equal to |P′i| − |P′i ∩ Pj|, the number of vertices of P′i not already present at server Sj (for a new server, the cost is simply |P′i|).

The problem of minimizing network transfer volume now reduces to that of finding a minimum-cost perfect matching in B. This is a well-studied problem, and an optimal solution can be obtained by using the Hungarian algorithm [128]. However, the Hungarian algorithm has O(N3) complexity [129], which may be prohibitive for large clusters.

As a result, we propose a greedy algorithm that iterates sequentially through S1,…, SN, in that order. For each server Si, the algorithm considers the new partitions with which it has a nonzero overlap; because of the contiguity of partitions, there are only O(1) such partitions. Among these partitions, the one with the largest number of overlapping vertices with Si is assigned to server Si. Because of the linear order of traversal, when Si is considered, it is guaranteed to have at least one (overlapping) candidate partition. This makes the greedy algorithm run efficiently in O(N). For example, in Figure 6.14b, to determine the new partition for S1, we need to consider only two partitions, P′1 and P′2; next, for S2, we need to consider partitions P′2 and P′3, and so on.

Ring-Based Vertex Repartitioning (RVR)

In this technique, we assume an underlying hash-based partitioning that leverages Chord-style consistent hashing [127]. To maintain load balance, servers are not hashed directly to the ring; instead (as in Cassandra [2] and Riak [115]), we assume that each server is assigned an equisized segment of the ring. Specifically, a server with ID ni is responsible for vertices hashed in the interval (ni−1, ni], where ni−1 is ni's predecessor.

Under that assumption, performing a scale-out/scale-in operation is straightforward: a joining server splits a segment with its successor, while a leaving server gives up its segment to its successor. For instance, in a scale-out operation involving one server, the affected server receives its set of vertices from its successor in the ring, that is, a joining server ni takes the set of vertices in (ni−1, ni] from its successor ni+1. Scale-in operations occur symmetrically: a leaving server ni migrates its vertex set to its successor ni+1, which is then responsible for the set of vertices in (ni−1, ni+1].
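A minimal sketch of this ring bookkeeping (segment endpoints stand in for server IDs; the wrap-around at the start of the sorted list covers the first server's predecessor):

```python
import bisect

class Ring:
    """Each server owns the ring segment (predecessor, own_point]."""
    def __init__(self, points):
        self.points = sorted(points)           # one segment endpoint per server

    def segment(self, point):
        i = self.points.index(point)
        return (self.points[i - 1], point)     # wraps around for the first server

    def join(self, point):
        bisect.insort(self.points, point)      # joiner splits its successor's segment

    def leave(self, point):
        self.points.remove(point)              # segment merges into the successor's

ring = Ring([25, 50, 75, 100])
ring.join(60)                                  # new server takes over (50, 60]
```

After the join, the successor at point 75 keeps only (60, 75]; when the server at 60 later leaves, (50, 75] merges back into its successor, and no other server is touched.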

More generally, we can state that a scale-out/scale-in operation that involves simultaneous addition or removal of k servers affects at most k existing servers. If some of the joining or leaving servers have segments that are adjacent, the number of servers affected would be smaller than k.

While the technique is minimally invasive to existing servers and the ongoing graph computation, it may result in load imbalance. We can mitigate load imbalance for the scale-out case by choosing intelligently the point on the ring to which the new server(s) should be added. For the scale-in case, we can intelligently decide which server(s) to remove.

Consider a cluster with N servers, each with V/N vertices. If we use CVR to add m × N servers or to remove (m/(m + 1)) × N servers (for m ≥ 1), then the optimal position of the servers to be added or removed is the same as their position under RVR.

When to Migrate?

Given the knowledge of which vertices must be migrated and to where, we must now decide when to migrate them in a way that minimizes interference with normal execution. Two types of data need to be migrated between servers: (i) static data, including sets of vertex IDs, neighboring vertex IDs, and edge values to neighbors, and (ii) dynamic data, including the latest values of vertices and latest values of neighbors. Static data correspond to graph partitions, while dynamic data represent computation state. Once this migration is complete, the cluster can switch to the new partition assignment.

Executing Migration

LFGraph uses a publish–subscribe mechanism. Before the iterations start, each server subscribes to in-neighbors of the vertices hosted by the server. Based on these subscriptions, each server builds a publish list for every other server in the cluster. After each iteration, servers send updated values of the vertices present in the publish lists to the respective servers. After a scale-out/scale-in operation, we perform the publish–subscribe phase again to update the publish lists of servers.

First-Cut Migration

A first-cut approach is to perform migration of both static and dynamic data during the next available barrier synchronization interval. However, when we implemented this approach, we found that it added significant overheads by prolonging that iteration. As a result, we introduced two further optimizations as follows:

Static Data Migration

This technique is based on the observation that static data can be migrated in the background while computation is going on. Recall that static data consists of vertex IDs, their neighboring vertex IDs, and edge values to neighbors. Only dynamic data (vertex values and neighboring vertex values) need to wait to be migrated during a barrier synchronization interval (i.e., after such data are last updated). This reduces the overhead on that iteration.

Dynamic Data Migration

LFGraph has two barrier synchronization intervals. One interval is between the gather and scatter phases, and the other is after the scatter phase. That gives us two options for the transfer of dynamic data. We choose to perform dynamic data transfer and cluster reconfiguration in the barrier synchronization interval between the gather and scatter phases. This enables us to leverage the default scatter phase to migrate neighboring vertex values. The scatter phase simply considers the new set of servers in the cluster while distributing updated vertex values. This optimization further reduces the overhead on the iteration.

A scale-out/scale-in operation that starts in iteration i ends in iteration i + 2. Background static data migration occurs in iterations i and i + 1, while vertex value migration occurs after the gather phase of iteration i + 2. At that point, computation continues on the new set of servers. The performance impact due to background data migration is greater in iteration i than in iteration i + 1, that is, iteration times are longer in iteration i. The reason is that a majority of the migration happens in iteration i. In iteration i + 1 servers build their new subscription lists for the publish–subscribe phase.

To explain further, we will describe the steps involved in a scale-out as follows: (i) The joining server sends a Join message containing its IP address and port to the barrier server at the start of iteration i. (ii) The barrier server responds with a Cluster Info message assigning the joining server an ID and the contact information of the servers from which it should request its vertices. (iii) In addition, the barrier server sends an Add Host message to all servers, informing them about the new server in the cluster. (iv) The joining server requests its vertices with a Vertex Request message. (v) After receiving its vertices, it informs the barrier server with a Ready message that it can join the cluster. Concurrently, the servers start updating their subscription lists to reflect the modifications in the cluster servers. (vi) The barrier server sends a Reconfigure message to the servers in the synchronization interval after the gather phase of iteration i + 2. (vii) Upon receiving the Reconfigure message, joining servers request the vertex values with a Vertex Value Request message. In addition, all servers update their vertex-to-server mapping to reflect newly added servers. (viii) The scatter phase of iteration i + 2 executes with this new mapping. From then on, computation proceeds on the new set of servers.
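The handshake above can be summarized as a toy event trace. The message names follow the text; the per-iteration timing model is a simplification for illustration only.

```python
# Toy trace of the scale-out handshake: a Join arriving at iteration i
# completes at iteration i + 2 (a simplified model of the steps above).

def scale_out_trace(i):
    """Events for a scale-out whose Join message arrives at iteration i."""
    return [
        (i,     "joiner -> barrier", "Join"),
        (i,     "barrier -> joiner", "Cluster Info"),
        (i,     "barrier -> all",    "Add Host"),
        (i,     "joiner -> donors",  "Vertex Request"),        # static data, background
        (i + 1, "joiner -> barrier", "Ready"),                 # subscription lists rebuilt
        (i + 2, "barrier -> all",    "Reconfigure"),           # after the gather phase
        (i + 2, "joiner -> donors",  "Vertex Value Request"),  # dynamic data
        (i + 2, "all",               "scatter with new vertex-to-server mapping"),
    ]

for it, channel, msg in scale_out_trace(1):
    print(f"iter {it}: {channel}: {msg}")
```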

Role of Barrier Server

In our repartitioning techniques, the barrier server accepts join and leave requests and determines an optimal partition assignment. We adopted this approach, instead of a fully decentralized reassignment, for two reasons: (i) fully decentralized reassignment may lead to complex race conditions, and (ii) the barrier server, once initialized, can obtain per-server iteration run times via the barrier synchronization messages and assign new servers to alleviate the load on the busiest servers.

Evaluation

In this section, we describe our experimental evaluation of the efficiency and overhead of our elasticity techniques. We present only selected experimental results in this chapter and refer the reader to Ref. [123] for extensive information on experiments and evaluation of our system.

Experimental Setup

We performed our experiments with both our CVR and RVR techniques on virtual instances, each with 16 GB RAM and 8 VCPUs. We used a Twitter graph [130] containing 41.65 million vertices and 1.47 billion edges. (With larger graphs, we expect similar performance improvements.) We evaluated our techniques using five graph benchmarks: PageRank, single-source shortest paths (SSSP), connected components, k-means clustering, and multiple-source shortest paths (MSSP).

Scale-Out and Scale-In

Our first set of experiments measured the overhead experienced by the computation because of a scale-out operation. Figure 6.15 illustrates experiments in which a scale-out from X servers to 2X servers (for X ∈ {5, 10, 15}) was performed, with the scale-out starting at iteration i = 1 and ending at iteration 3. The vertical axis plots the per-iteration run time. For comparison, we plot the per-iteration times for a run with X servers throughout and a run with 2X servers throughout.

Figure 6.15 Per-iteration execution time with scale-out at iterations i = 1 to i = 3, for different repartitioning strategies and cluster sizes.

In Figure 6.15a–c, we can observe that (i) both CVR and RVR perform similarly, and (ii) after the scale-out operation is completed, the performance of the scaled-out system converges to that of a cluster with 2X servers, demonstrating that our approaches converge to the desired throughput after scale-out.

Similarly, Figure 6.16 shows the plots for scale-in from 2X servers to X servers (for X ∈ {5, 10, 15}). Once again, the cluster converges to the performance of X servers.

Figure 6.16 Per-iteration execution time with scale-in at iterations i = 1 to i = 3, for different repartitioning strategies and cluster sizes.

6.6 Priorities and Deadlines in Batch Processing Systems

6.6.1 Natjam: Supporting Priorities and Deadlines in Hadoop

This section is based on Ref. [131], and we encourage the reader to refer to that publication for further details on design, implementation, and experiments.

Motivation

Today, computation clusters running engines such as Apache Hadoop [3,132], DryadLINQ [87], DOT [133], Hive [134], and Pig Latin [135] are used to process a variety of big data sets. The batch MapReduce jobs in these clusters have priority levels or deadlines. For instance, a job with a high priority (or short deadline) may be one that processes click-through logs and differentiates ads that have reached their advertiser targets from ads that it would be good to display. For such jobs, it is critical to produce timely results, since they directly affect revenue. On the other hand, a lower priority (or long-deadline) job may, for instance, identify more lucrative ad placement patterns via a machine learning algorithm on long-term historical click data. Such jobs affect revenue indirectly; hence, while they need to complete eventually, they can be treated as lower priority.

The most common use case is a dual-priority setting, with only two priority levels: high-priority jobs and low-priority jobs. We call the high-priority jobs production jobs and the low-priority ones research jobs.10 A popular approach among organizations is to provision two physically separate clusters: one for production jobs and one for research jobs. Administrators tightly restrict the workloads allowed on the production cluster, perform admission control manually based on deadlines, keep track of deadline violations via alert systems such as pagers, and subsequently readjust job and cluster parameters manually.

In addition to requiring intensive human involvement, the above approach suffers from (i) long job completion times, and (ii) inefficient resource utilization. For instance, jobs in an overloaded production cluster might take longer, even though the research cluster is underutilized (and vice versa). In fact, MapReduce cluster workloads are time-varying and unpredictable, for example, in the Yahoo! Hadoop traces we used in the work described here, hourly job arrival rates exhibited a max–min ratio as high as 30. Thus, there are times when the cluster is resource-constrained, that is, it has insufficient resources to meet incoming demand. Since physically separate clusters cannot reclaim resources from each other, the infrastructure's overall resource utilization stays suboptimal.

The goals of the work described here are (i) to run a consolidated MapReduce cluster that supports all jobs, regardless of their priority or deadline; (ii) to achieve low completion times for higher priority jobs; and (iii) to do so while still optimizing the completion times of lower priority jobs. The benefits are high cluster resource utilization, and, thus, reduced capital and operational expenses.

Natjam11 achieves the above goals, and we have integrated it into the Hadoop YARN scheduler (Hadoop 0.23). Natjam's first challenge is to build a unified scheduler for all job priorities and deadlines in a way that fluidly manages resources among a heterogeneous mix of jobs. When a higher priority job arrives in a full cluster, today's approaches involve either killing lower priority jobs' tasks [59,84] or waiting for them to finish [83]. The former approach prolongs low-priority jobs because they repeat work, while the latter prolongs high-priority jobs. Natjam solves those problems by using an on-demand checkpointing technique that saves the state of a task when it is preempted, so that it can resume where it left off when resources become available. This checkpointing is fast, inexpensive, and automatic in that it requires no programmer involvement.

Natjam's second challenge is to enable quick completion of high-priority jobs, but not at the expense of extending many low-priority jobs' completion times. Natjam addresses this by leveraging smart eviction policies that select which low-priority jobs and their constituent tasks are affected by arriving high-priority jobs. Natjam uses a two-level eviction approach: It first selects a victim job (via a job eviction policy) and then, within that job, one or more victim tasks (via a task eviction policy). For the dual-priority setting with only two priority levels, our eviction policies take into account (i) resources utilized by a job, and (ii) time remaining in a task. We then generalize to arbitrary real-time job deadlines via eviction policies based on both a job's deadline and its resource usage.
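The two-level eviction described above can be sketched as a small selection routine. The policy functions are parameters, and the data model (a job as a dict with a task list) is our illustrative assumption, not Natjam's internal representation.

```python
# Two-level victim selection: a job eviction policy first picks a
# research job, then a task eviction policy picks a task within it.
# (Illustrative sketch; policy functions are pluggable.)

def pick_victim(research_jobs, job_policy, task_policy):
    """research_jobs: list of dicts with 'name', 'containers', 'tasks'
    (tasks given as remaining times in seconds).
    Returns (job, task) to preempt, or None if nothing is running."""
    candidates = [j for j in research_jobs if j["tasks"]]
    if not candidates:
        return None
    job = job_policy(candidates)       # e.g., most resources
    task = task_policy(job["tasks"])   # e.g., shortest remaining time
    return job, task

jobs = [
    {"name": "A", "containers": 8, "tasks": [5, 40]},
    {"name": "B", "containers": 2, "tasks": [30]},
]
most_resources = lambda js: max(js, key=lambda j: j["containers"])
shortest_remaining = lambda ts: min(ts)

job, task = pick_victim(jobs, most_resources, shortest_remaining)
# Job "A" holds the most containers, and its task with 5 s remaining
# is suspended first.
```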

We provide experimental results from deployments on a test cluster, both on Emulab and on a commercial cluster at Yahoo!. Our experiments used both synthetic workloads and Hadoop workloads from Yahoo! Inc. We evaluated various eviction policies and found that compared to their behavior in traditional multiprocessor environments, eviction policies have counterintuitive behavior in MapReduce environments; for example, we discovered that longest-task-first scheduling is optimal for MapReduce environments. For the dual-priority setting, Natjam incurs overheads of under 7% for all jobs. For the real-time setting with arbitrary deadlines, our generalized system, called Natjam-R, meets deadlines with only 20% extra laxity in the deadline compared to the job runtime.

In brief, at a high level, our work is placed within the body of related work as follows (see Section 6.3.4 for more details). Our focus is on batch jobs rather than streaming or interactive workloads [71–74,76,77]. Some systems have looked at preemption in MapReduce [55], with respect to fairness [86], at intelligent killing of tasks [59] (including the Hadoop Fair Scheduler [84]), and in SLOs (service level objectives) in generic cluster management [90,93,94]. In comparison, our work is the first to study the effects of eviction policies and deadline-based scheduling for resource-constrained MapReduce clusters. Our strategies can be applied orthogonally in systems such as Amoeba [55]. We are also the first to incorporate such support directly into Hadoop YARN. Finally, MapReduce deadline scheduling has been studied in infinite clusters [62–65] but not in resource-constrained clusters.

Eviction Policies for a Dual-Priority Setting

This section presents the eviction policies, and the following section describes the system architecture. Section 6.1.4 generalizes the solution to the case where jobs have multiple priorities.

Eviction policies lie at the heart of Natjam. When a production (high-priority) MapReduce job arrives at a resource-constrained cluster and there are insufficient resources to schedule it, some tasks of research (low-priority) MapReduce jobs need to be preempted. Our goals here are to minimize job completion times both for production and for research jobs. This section addresses the twin questions of (i) how to choose a victim (research) job so that some of its tasks can be preempted, and (ii) within a given victim job, how to choose victim task(s) for preemption. We call these job eviction and task eviction policies, respectively.

The job and task eviction policies are applied in tandem, that is, for each required task of the arriving production job, a running research task is evicted through application of the job eviction policy followed by the task eviction policy. A research job chosen as victim may be evicted only partially; in other words, some of its tasks may continue running, for example, if the arriving job is relatively small, or if the eviction policy also picks other victim research jobs.

Job Eviction Policies

The choice of victim job affects the completion time of lower priority research jobs by altering resources already allocated to them. Thus, job eviction policies need to be sensitive to current resource usage of individual research jobs. We discuss three resource-aware job eviction policies:

Most Resources (MR): This policy chooses as victim the research job that is currently using the most resources inside the cluster. In Hadoop YARN, resource usage would be in terms of the number of containers used by the job, while in other versions of MapReduce, it would be determined by the number of cluster slots.12

The MR policy, which is loosely akin to the worst-fit policy in OS segmentation, is motivated by the need to evict as few research jobs as possible; a large research job may contain sufficient resources to accommodate one large production job or multiple small production jobs. Thus, fewer research jobs are deferred, more of them complete earlier, and average research job completion time is minimized.

The downside of the MR policy is that when there is one large research job (as might be the case with heavy tailed distributions), it is always victimized whenever a production job arrives. This may lead to starvation and thus longer completion times for large research jobs.

Least Resources (LR): In order to prevent starving of large research jobs, this policy chooses as victim the research job that is currently using the least resources inside the cluster. The rationale here is that small research jobs that are preempted can always find resources if the cluster frees up even a little in the future. However, the LR policy can cause starvation for small research jobs if the cluster stays overloaded; for example, if a new production job arrives whenever one completes, LR will pick the same smallest jobs for eviction each time.

Probabilistically Weighted on Resources (PR): In order to address the starvation issues of LR and MR, our third policy selects a victim job using a probabilistic metric based on resource usage. In PR, the probability of choosing a job as a victim is directly proportional to the resources it currently holds. In effect, PR treats all tasks identically in choosing ones to evict, that is, if the task eviction policy were random, the chances of eviction for all tasks would be identical and independent of their jobs. The downside of PR is that it spreads out evictions across multiple jobs; in PR, unlike MR, one incoming production job may slow down multiple research jobs.
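The three job eviction policies can be sketched directly. The (name, containers) pairs are an illustrative data model; `prob_weighted` samples a victim with probability proportional to the containers a job holds.

```python
import random

# Sketches of the MR, LR, and PR job eviction policies.
# A job is modeled as a (name, containers) pair, where containers
# approximate the resources it currently holds.

def most_resources(jobs):
    return max(jobs, key=lambda j: j[1])

def least_resources(jobs):
    return min(jobs, key=lambda j: j[1])

def prob_weighted(jobs, rng=random):
    # P(victim = j) is proportional to the containers j holds, so with
    # a random task eviction policy every running task is equally
    # likely to be evicted, regardless of which job owns it.
    total = sum(c for _, c in jobs)
    r = rng.uniform(0, total)
    for job in jobs:
        r -= job[1]
        if r <= 0:
            return job
    return jobs[-1]

jobs = [("small", 2), ("large", 10)]
# MR always picks "large"; LR always picks "small"; PR picks "large"
# about 10/12 of the time.
```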

Task Eviction Policies

Once a victim job has been selected, the task eviction policy is applied within that job to select one task that will be preempted (i.e., suspended).

Our approach makes three assumptions, which are based on use case studies: (i) reduces are long enough that preemption of a task takes less time than the task itself; (ii) only reduce tasks are preempted; and (iii) reduces are stateless in between keys. For instance, in Facebook workloads the median reduce task takes 231 s [58], substantially longer than the time needed to preempt a task (see Section 6.1.5). There are two reasons why we focus on preemption only of reduces. First, the challenge of checkpointing reduce tasks subsumes that of checkpointing map tasks, since a map processes individual key-value pairs, while a reduce processes batches of them. Second, several use case studies have revealed that reduces are substantially longer than maps and thus have a bigger effect on the job tail. In the same Facebook trace already mentioned, the median map task time is only 19 s. While 27.1 map containers are freed per second, only 3 (out of 3100) reduce containers are freed per second. Thus, a small production job with 30 reduces would wait on average 10 s, and a large job with 3000 reduces would wait 1000 s. Finally, the traditional stateless reduce approach is used in many MapReduce programs; however, Natjam could be extended to support stateful reducers.
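The waiting-time figures quoted above follow from a simple back-of-the-envelope calculation, which we can check directly using the container-freeing rate from the cited trace.

```python
# Check of the waiting times quoted above: in the cited Facebook trace,
# reduce containers free up at roughly 3 per second.

reduce_free_rate = 3.0  # reduce containers freed per second

def expected_wait(num_reduces, rate=reduce_free_rate):
    """Average seconds a production job waits for its reduce containers."""
    return num_reduces / rate

print(expected_wait(30))    # small job with 30 reduces
print(expected_wait(3000))  # large job with 3000 reduces
```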

A MapReduce research job's completion time is determined by its last-finishing reduce task. A long tail, or even a single task that finishes late, will extend the research job's completion time. This concern implies that tasks with a shorter remaining time (for execution) must be evicted first. However, in multiprocessors, shortest-task-first scheduling is known to be optimal [53]. In our context, this means that the task with the longest remaining time must be evicted first. That motivated two contrasting task eviction policies, SRT and LRT:

Shortest Remaining Time (SRT): In this policy, tasks that have the shortest remaining time are selected to be suspended. This policy aims to minimize the impact on the tail of a research job. Further, a task suspended by SRT will finish quickly once it has been resumed. Thus, SRT is loosely akin to the longest-task-first strategy in multiprocessor scheduling. Rather counterintuitively, SRT is provably optimal under the assumption that all suspended tasks of the victim job are later resumed simultaneously.

We note that the assumption – that tasks of the victim job will be resumed simultaneously – is reasonable in real-life scenarios in which production job submission times and sizes are unpredictable. Our experiments also validated this result.

Longest Remaining Time (LRT): In this policy, the task with the longest remaining time is chosen to be suspended first. This policy is loosely akin to shortest-task-first scheduling in multiprocessors. Its main advantage over SRT is that it is less selfish and frees up more resources earlier. LRT might thus be useful in scenarios in which production job arrivals are bursty. Consider a victim job containing two tasks: one short and one with a long remaining time. SRT evicts the shorter task, freeing up resources for one production task. LRT evicts the longer task, but the shorter unevicted task will finish soon anyway, thus releasing resources for two production tasks, while incurring the overhead for only one task suspension. However, LRT can lengthen the tail of the research job, increasing its completion time.

Natjam Architecture

In order to understand the design decisions required to build eviction policies into a MapReduce cluster management system, we incorporated Natjam into the popular Hadoop YARN framework in Hadoop 0.23. We now describe Natjam's architecture, focusing on the dual-priority setting (for production and research jobs).

Preemption in Hadoop YARN

Background: Hadoop YARN Architecture: In the Hadoop YARN architecture, a single cluster-wide Resource Manager (RM) performs resource management. It is assisted by one Node Manager (NM) per node (server). The RM receives periodic heartbeats from each NM containing status updates about resource usage and availability at that node.

The RM runs the Hadoop Capacity Scheduler. The Capacity Scheduler maintains multiple queues that contain jobs. An incoming job is submitted to one of these queues. An administrator can configure two capacities per queue: a minimum (guaranteed) capacity and a maximum capacity. The scheduler varies each queue's capacity between these two limits based on the jobs that have arrived in the queues.

The basic unit of resource allocation for a task is called a container. A container is effectively a resource slot that contains sufficient resources (primarily memory) to run one task: a map, a reduce, or a master task. An example master task is the Application Master (AM), which is allocated one container. One AM is assigned to each MapReduce job and performs job management functions.

An AM requests and receives, from the RM, container allocations for its tasks. The AM assigns a task to each container it receives and sends launch requests to the container's NM. It also performs speculative execution when needed.

An AM sends heartbeats to the RM. The AM also receives periodic heartbeats from its tasks. For efficiency, YARN piggybacks control traffic (e.g., container requests and task assignments) atop heartbeat messages.

Natjam Components: Natjam entails changes to the Hadoop Capacity Scheduler (at the RM) and the AM, while the NM stays unchanged. Specifically, Natjam adds the following new components to Hadoop YARN:

  1. Preemptor: The preemptor is a part of the RM. We configured the Capacity Scheduler to contain two queues: one for production jobs and one for research jobs. The preemptor makes preemption decisions by using job eviction policies.
  2. Releaser: This component is a part of each AM and is responsible for running the task eviction policies.

In modifying Hadoop, rather than adding new messages that would incur overhead, Natjam leverages and piggybacks atop YARN's existing heartbeats for efficiency. The trade-off is a small scheduling delay, but our experiments show that such delays are small.

We will detail those two components in Section 6.1.3. Now we show how preemption and checkpointing work.

Natjam Preemption Mechanism Example: We illustrate how Natjam's preemption works in YARN. Figure 6.17 depicts an example in which a research job (Job 2) is initially executing in a full cluster when a production job (Job 1) requires a single container.13 The steps in the figure are as follows:

  1. Step 1: On AM1's heartbeat, Natjam asks the RM to allocate one container.
  2. Steps 2 and 3: The cluster is full, so the RM applies the job eviction policies and selects Job 2 as victim.
  3. Step 4: The Preemptor waits for AM2's next heartbeat, and in response to the heartbeat sends AM2 the number and type of containers to be released.
  4. Step 5: The Releaser at AM2 uses the task eviction policy to select a victim task.
  5. Step 6: When the victim task (still running) sends its next usual heartbeat to AM2, it is asked to suspend.
  6. Step 7: The victim task suspends and saves a checkpoint.
  7. Step 8: The victim task sends the checkpoint to AM2.
  8. Step 9: The task indicates to NM-A that it has completed and it exits, freeing the container.

    With that, the Natjam-specific steps are done. For completeness, we list below the remaining steps, which are taken by YARN by default to give AM1 the new container.

  9. Step 10: NM-A's heartbeat sends the container to the RM.
  10. Step 11: AM1's next RM heartbeat gets the container.
  11. Step 12: AM1 sends NM-A the task request.
  12. Step 13: NM-A launches the task on the container.

Figure 6.17 Example: Container suspend in Natjam. New components are shown in bold font; others are from YARN. AM1 is a production job, and AM2 is a research job.

Checkpoint Saved and Used by Natjam: When Natjam suspends a research job's reduce task, an on-demand checkpoint is saved automatically. It contains the following items: (i) an ordered list of past suspended container IDs, one for each attempt, that is, each time this task was suspended in the past; (ii) a key counter, that is, the number of keys that have been processed so far; (iii) reduce input paths, that is, local file paths; and (iv) the hostname associated with the last suspended attempt, which is useful for preferably resuming the research task on the same server. Natjam also leverages intermediate task data already available via Hadoop [136], including (v) reduce inputs, which are stored at a local host, and (vi) reduce outputs, which are stored on HDFS.
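The checkpoint's contents can be summarized as a small record. The field names below are ours, not Hadoop's; they mirror items (i)–(iv) in the text.

```python
from dataclasses import dataclass, field
from typing import List

# Illustrative layout of Natjam's on-demand reduce checkpoint,
# mirroring items (i)-(iv) above (field names are hypothetical).

@dataclass
class ReduceCheckpoint:
    past_container_ids: List[str] = field(default_factory=list)  # one per past suspension, in order
    key_counter: int = 0                                         # keys fully processed so far
    input_paths: List[str] = field(default_factory=list)         # local reduce-input file paths
    last_host: str = ""                                          # preferred host for resume

cp = ReduceCheckpoint(
    past_container_ids=["container_01", "container_07"],
    key_counter=1842,
    input_paths=["/tmp/reduce/in.0"],
    last_host="node-a",
)
```

The reduce inputs and partial outputs themselves are not part of this record; as noted above, they are already stored on the local host and in HDFS, respectively.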

Task Suspend: We modified YARN so that the reduce task keeps track of two pieces of state: paths to files in the local file system that hold reduce input, and the key counter, that is, the number of keys that have been processed by the reduce function so far. When a reduce task receives a suspend request from its AM, it first checks whether it is in the middle of processing a particular key and, if so, finishes that key. Second, it writes the input file paths to a local log file. Third, Hadoop maintains a partial output file per reduce attempt in HDFS; it holds the output generated so far by the current attempt. The partial output file is given a name that includes the container ID, and when a task suspends, this partial output file is closed. Finally, the reduce task compiles its checkpoint, sends it to its AM, and exits.

Task Resume: On a resume, the task's AM sends the saved checkpoint state as launch parameters to the chosen NM. The Preemptor is in charge of scheduling the resuming reduce on a node. The Preemptor prefers the old node on which the last attempt ran (available from the hostname field in the checkpoint). If the resumed task is assigned to its old node, the reduce input can be read without network overhead, that is, from local disk. If it is resumed on a different node, the reduce input is assembled from map task outputs, much like a new task.

Next, the reduce task creates a new partial output file in HDFS. It skips over the input keys that the checkpoint's key counter field indicates have already been processed. It then starts execution as a normal reduce task.

Commit after Resume: When a previously suspended reduce task finishes, it needs to assemble its partial output. It starts that by finding, in HDFS, all its past partial output files; it does so by using the ordered list of past suspended container IDs from its checkpoint. It then accumulates their data into output HDFS files that are named in that order. This order is critical so that the output is indistinguishable from that of a reduce task that was never suspended.
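The commit-time assembly can be sketched as follows. The names and paths are illustrative; the key point is that partial outputs are concatenated in the order their attempts executed, recovered from the checkpoint's ordered ID list.

```python
# Sketch of commit-after-resume: partial output files are named after
# the container IDs of past attempts and must be assembled in the
# order those attempts ran (paths here are illustrative).

def final_output_order(past_container_ids, current_id, partial_files):
    """past_container_ids: ordered IDs from the checkpoint.
    partial_files: container ID -> partial output path in HDFS.
    Returns the paths in attempt-execution order."""
    ordered_ids = list(past_container_ids) + [current_id]
    return [partial_files[cid] for cid in ordered_ids]

files = {"c2": "/out/part-c2", "c9": "/out/part-c9", "c5": "/out/part-c5"}
paths = final_output_order(["c2", "c5"], "c9", files)
# Assembled as part-c2, part-c5, part-c9, so the result is
# indistinguishable from a never-suspended reduce's output.
```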

Implementation Issues

This section first explains how we modify the AM state machines in Hadoop YARN, and then describes the Preemptor and Releaser. As mentioned earlier, we leverage existing Hadoop mechanisms such as heartbeats.

Application Master's State Machines: For job and task management, Hadoop YARN's AM maintains separate state machines per job, per task, and per task attempt. Natjam leaves the job state machine essentially unchanged; we extended it only to handle the checkpoint. Thus, both the suspend and the resume occur during the Running state in this state machine.

We modify the task state machine very little. When the AM learns that a task attempt has been suspended (from step 8 in Figure 6.17), the task state machine goes ahead and creates a new task attempt to resume the task. However, this does not mean that the task is scheduled immediately; the transitions of the task attempt state machine determine when it is.

The task attempt state machine is used by YARN to assign the container, set up execution parameters, monitor progress, and commit output. Natjam adds two states to the task attempt state machine, as shown in Figure 6.18: Suspend-Pending and Suspended. The task attempt has a state of Suspend-Pending when it wishes to suspend a task but has not received suspension confirmation from the local task (steps 6b–7 from Figure 6.17). The state becomes Suspended when the saved checkpoint is received (step 8), and this is a terminal state for that task attempt.

Figure 6.18 Modified task attempt state machine: At application master. Failure states are omitted.

The new transitions for suspension in Figure 6.18 are as follows:

  • S1: AM asks the task to suspend and requests a checkpoint.
  • S2: AM receives the task checkpoint and saves it in the task attempt state machine.

A resuming reduce task starts from the New state in the task attempt state machine. However, we modify some transitions to distinguish a resuming task from a new (nonresuming) task attempt as follows:

  • R1: Just as for any reduce task attempt, every heartbeat from the AM to the RM requests a container for the resuming reduce. If the RM cannot satisfy the request, it ignores it (since the next heartbeat will resend the request anyway). Suppose a container frees up (e.g., as production jobs complete), so that the RM can now schedule a research task. In doing so, the RM prefers to respond to a resuming reduce's request, rather than one from a non-resuming research reduce. The AM-to-RM requests also carry the hostname field from the task checkpoint; the RM uses it to declare a preference for allocating that container at that hostname.
  • R2: Once the AM has received a container from the RM, it launches a task attempt on the allocated container. For resuming reduces, the AM also sends the saved checkpoint to the container.
  • C1: On commit, the AM accumulates partial output files into the final task output in HDFS (see Section 6.1.3).

Preemptor: Recall that Natjam sets up the RM's Capacity Scheduler with two queues: one for production jobs and one for research jobs. The Preemptor is implemented as a thread within the Capacity Scheduler. In order to reclaim resources from the research queue for use by the production queue, the Preemptor periodically runs a reclaim algorithm, with sleeps of 1 s between runs. A run may generate reclaim requests, each of which is sent to some research job's AM to reclaim a container (which is step 4 in Figure 6.17). In a sense, a reclaim request is a statement of a production job's intention to acquire a container.

We keep track of a per-production job reclaim list. When the RM sends a reclaim request on behalf of a job, an entry is added to the job's reclaim list. When a container is allocated to that job, that reclaim list entry is removed. The reclaim list is needed to prevent the Preemptor from generating duplicate reclaim requests, which might occur because our reliance on heartbeats entails a delay between a container suspension and its subsequent allocation to a new task. Thus, we generate a reclaim request whenever (i) the cluster is full, and (ii) the number of pending container requests from a job is greater than the number of requests in its reclaim list.
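The reclaim rule just described can be stated compactly. This is a simplified single-job sketch with illustrative names; the real Preemptor runs this check once a second across all production jobs.

```python
# The Preemptor's reclaim rule: issue new reclaim requests only when
# (i) the cluster is full, and (ii) a production job has more pending
# container requests than entries in its reclaim list. This prevents
# duplicate reclaims during the heartbeat-induced delay between a
# container's suspension and its reallocation.

def reclaims_to_send(cluster_full, pending_containers, reclaim_list):
    """Number of new reclaim requests to generate for one job."""
    if not cluster_full:
        return 0
    deficit = pending_containers - len(reclaim_list)
    return max(0, deficit)

reclaim_list = ["req-1", "req-2"]  # already sent, not yet satisfied
print(reclaims_to_send(True, 5, reclaim_list))   # 3 more needed
print(reclaims_to_send(True, 2, reclaim_list))   # covered; no duplicates
print(reclaims_to_send(False, 5, reclaim_list))  # cluster not full
```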

In extreme cases, the Preemptor may need to kill a container, for example, if the AM has remained unresponsive for too long. Our threshold to kill a container is reached when a reclaim request has remained in the reclaim list for longer than a killing timeout (12 s). A kill request is sent directly to the NM to kill the container. This bypasses the AM, ensuring that the container will indeed be killed. When a kill request is sent, the reclaim request is added to an expired list. It remains there for an additional time interval (2 s), after which it is assumed the container is dead, and the reclaim request is thus removed from the expired list. With those timeouts, we have never observed killings of any tasks in any of our cluster runs.

Releaser: The Releaser runs at each job's AM and decides which tasks to suspend. Since the task eviction policies discussed in Section 6.1.2 (i.e., SRT and LRT) use the time remaining at the task, the Releaser needs to estimate it. We use Hadoop's default exponentially smoothed task runtime estimator, which relies on the task's observed progress [69]. However, calculating this estimate on demand can be expensive due to the large numbers of tasks. Thus, we have the AM only periodically estimate the progress of all tasks in the job (once a second), and use the latest complete set of estimates for task selection. While the estimates might be stale, our experiments show that this approach works well in practice.
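The Releaser's use of a periodically refreshed, possibly stale, snapshot can be sketched as follows. The class and its interface are our illustrative assumptions; `estimator` stands in for Hadoop's progress-based remaining-time estimator.

```python
# Sketch of the Releaser: remaining-time estimates for all of a job's
# tasks are refreshed periodically (once a second in Natjam), and the
# eviction policy runs on the latest complete snapshot, which may be
# slightly stale.

class Releaser:
    def __init__(self, estimator, policy="SRT", refresh_s=1.0):
        self.estimator = estimator   # callable: () -> {task_id: remaining seconds}
        self.policy = policy         # "SRT" or "LRT"
        self.refresh_s = refresh_s
        self.snapshot = {}
        self.last_refresh = float("-inf")

    def maybe_refresh(self, now):
        if now - self.last_refresh >= self.refresh_s:
            self.snapshot = dict(self.estimator())  # cache a full snapshot
            self.last_refresh = now

    def pick_victim(self, now):
        self.maybe_refresh(now)
        if not self.snapshot:
            return None
        pick = min if self.policy == "SRT" else max
        return pick(self.snapshot, key=self.snapshot.get)

est = lambda: {"t1": 12.0, "t2": 3.0, "t3": 95.0}
# SRT suspends the task closest to finishing; LRT the furthest.
```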

Interaction with Speculative Execution: Our discussion so far has ignored speculative execution, which Hadoop uses to replicate straggler task attempts. Natjam does not change speculative execution and works orthogonally to it; that is, speculative task attempts are candidates for eviction. When all attempts of a task are evicted, the progress rate calculation of the task is not skewed, because speculative execution tracks the progress of task attempts rather than the tasks themselves. While this interaction could be optimized further, it works well in practice. Natjam can be further optimized to support user-defined (i.e., per-job) task eviction policies that would prioritize the eviction of speculative tasks, but discussion of that is beyond the scope of this chapter.

Natjam-R: Deadline-Based Eviction

We have created Natjam-R, a generalization of Natjam that targets environments in which each job has a hard and fixed real-time deadline. Unlike Natjam, which allowed only two priority levels, Natjam-R supports multiple priorities; in the real-time case, a job's priority is derived from its deadline. While Natjam supported inter-queue preemption (with two queues), Natjam-R uses only intra-queue preemption. Thus, all jobs can be put into one queue; there is no need for two queues. Jobs in the queue are sorted based on priority.

Eviction Policies

First, for job eviction, we explored two deadline-based policies inspired by the classical real-time literature [78,81]: Maximum Deadline First (MDF) and Maximum Laxity First (MLF). MDF chooses as victim the running job that has the longest deadline. On the other hand, MLF evicts the job with the highest laxity, where laxity = the deadline minus the job's projected completion time. For MLF, we extrapolate Hadoop's reported job progress rate to calculate a job's projected completion time.

While MDF is a static scheduling policy that accounts only for deadlines, MLF is a dynamic policy that also accounts for a job's resource needs. If a job has an unsatisfactory progress rate, MLF may give it more resources closer to its deadline. It may do so by evicting small jobs with long deadlines. In essence, while MLF may run some long-deadline, high-resource jobs, MDF might starve all long-deadline jobs equally. Further, MLF is fair in that it allows many jobs with similar laxities to make simultaneous progress. However, this fairness can be a shortcoming in scenarios with many short deadlines; MLF results in many deadline misses, while MDF would meet at least some deadlines. Section 6.1.6 describes our experimental evaluation of this issue.
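The two job eviction policies can be captured in a few lines. This is a minimal sketch under assumed names; the `Job` fields (in particular the projected completion time extrapolated from Hadoop's progress rate) are simplified stand-ins.

```python
from dataclasses import dataclass

@dataclass
class Job:
    name: str
    deadline_s: float
    # Projected completion time, extrapolated from the reported progress
    # rate (hypothetical field name).
    projected_completion_s: float

def evict_victim(running, policy="MDF"):
    if policy == "MDF":
        # Maximum Deadline First: victim is the running job with the
        # longest deadline, regardless of its resource needs.
        return max(running, key=lambda j: j.deadline_s)
    # Maximum Laxity First: laxity = deadline - projected completion time;
    # the job that can most afford to wait is evicted.
    return max(running, key=lambda j: j.deadline_s - j.projected_completion_s)
```

Note how the two policies can disagree: a job with a long deadline but a long projected runtime may have less laxity than a short job with a nearer deadline, so MLF may spare it while MDF evicts it.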

Second, our task eviction policies remain the same as before (SRT, LRT) because the deadline is for the job, not for individual tasks.

In addition to the job and task eviction policies, we need to have a job selection policy. When resources free up, this policy selects a job from among the suspended ones and gives it containers. Possible job selection policies are earliest deadline first (EDF) and least laxity first (LLF). In fact, we implemented these but observed thrashing-like scheduling behavior if the job eviction policy was inconsistent with the job selection policy. For instance, if we used MDF job eviction and LLF job selection, a job selected for eviction by MDF would soon after be selected for resumption by LLF, and thus enter a suspend-resume loop. We concluded that the job selection policy needed to be dictated by the job eviction policy, that is, MDF job eviction implies EDF job selection, while MLF implies LLF job selection.
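The consistency rule above (MDF eviction pairs with EDF selection, MLF with LLF) amounts to resuming jobs by the same criterion used to evict them, just in the opposite direction. A minimal sketch, with hypothetical job dictionaries:

```python
def select_to_resume(suspended, eviction_policy):
    """Pick the suspended job to resume, consistently with the eviction
    policy, so an evicted job is not immediately re-selected (which would
    cause the suspend-resume loop described in the text)."""
    if eviction_policy == "MDF":
        # MDF eviction implies EDF selection: earliest deadline first.
        return min(suspended, key=lambda j: j["deadline_s"])
    # MLF eviction implies LLF selection: least laxity first.
    return min(suspended, key=lambda j: j["laxity_s"])
```

With an inconsistent pairing (say, MDF eviction with LLF selection), the job just evicted for having the longest deadline could still have the least laxity, so it would be resumed at once and thrash.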


The main changes that differentiate Natjam-R from Natjam are in the RM. In Natjam-R, the RM keeps one Capacity Scheduler queue sorted by decreasing priority. A priority is inversely proportional to the deadline for MDF, and to the laxity for MLF. The Preemptor periodically (once a second) examines the queue and selects the first job (say Ji) that still has tasks waiting to be scheduled. Then it considers job eviction candidates from the queue, starting with the lowest priority (i.e., later deadlines or larger laxities) up to Ji's priority. If it encounters a job that still has allocated resources, that job is picked as the victim; otherwise, no further action is taken. To evict the job, the Releaser from Natjam uses the task eviction policy to free a container. Checkpointing, suspend, and resume work in Natjam-R as described earlier for Natjam (see Section 6.1.3).

Microbenchmarks

Experimental Plan

We present two sets of experiments, increasing in complexity and scale. This section presents microbenchmarking results for a small Natjam cluster. Section 6.1.6 evaluates Natjam-R. While we present only selected experimental results in this chapter, we refer the reader to Ref. [131] for extensive information on experiments and evaluation of our system.

Microbenchmark Setup

We first evaluated the core Natjam system that supports a dual-priority workload, that is, research and production jobs. We addressed the following questions: (i) How beneficial is Natjam relative to existing techniques? (ii) What is the overhead of the Natjam suspend mechanism? (iii) What are the best job eviction and task eviction policies?

We used a small-scale test bed and a representative workload because this first experimental stage involved exploration of different parameter settings and study of many fine-grained aspects of system performance. A small test bed gave us flexibility.

Our test cluster had seven servers running on a 1 GigE network. Each server had two quad-core processors and 16 GB of RAM, of which 8 GB were configured to run 1 GB-sized Hadoop containers. (Thus, 48 containers were available in the cluster.) One server acted as the Resource Manager, while the other six were workers. Each entity (AM, map task, and reduce task) used one container.

In our experiments, we injected a mix of research and production jobs, as shown in Table 6.4. To reflect job size variation, the job sizes ranged from XL (filling the entire cluster) to S (filling a fourth of the cluster). To mimic use case studies [58], each job had a small map execution time, and was dominated by the reduce execution time. To model variance in task running times, we selected reduce task lengths uniformly from the interval (0.5, 1.0], where 1.0 is the normalized largest reduce task. To emulate computations, we used SWIM [137] to create random keys and values, with thread sleeps called between keys. Shuffle and HDFS traffic were incurred as usual.

Table 6.4 Microbenchmark settings.

Job Number of reduces Average time (s)
Research-XL 47 192.3
Research-L 35 193.8
Research-M 23 195.6
Research-S 11 202.6
Production-XL 47 67.2
Production-L 35 67.0
Production-M 23 67.6
Production-S 11 70.4

The primary metric was job completion time. Each of our data points shows an average and standard deviation over five runs. Unless otherwise noted, Natjam used MR job eviction and SRT task eviction policies.

Natjam versus Existing Techniques

Figure 6.19 compares Natjam against three alternatives: an ideal setting, two existing mechanisms in the Hadoop Capacity Scheduler, and pessimistic killing of tasks (instead of saving the cheap checkpoint). The ideal setting measures each job's completion time when it is executed on an otherwise empty cluster; thus, it ignores resource sharing and context switch overheads. For the second setting, we chose the Hadoop Capacity Scheduler because it represents approaches that we might take with two physically separate clusters sharing the same scheduler. Finally, killing of tasks is akin to approaches such as those described in Ref. [59] and for the Hadoop Fair Scheduler [84].

Figure 6.19 Natjam versus existing techniques. At t = 0 s, the Research-XL job was submitted, and at t = 50 s, the Production-S job was submitted.

In this experiment, a Research-XL job was submitted initially to occupy the entire cluster. Then, 50 s later, a Production-S job was submitted. Figure 6.19 shows that killing of tasks (the fourth pair of bars) finished production jobs fast, but prolonged research jobs by 23% compared to the ideal case (the first pair of bars). Thus, saving the overhead of checkpoints is not worth the repeated work due to task restarts.

We next examined two popular Hadoop Capacity Scheduler approaches called Hard cap and Soft cap. Recall that the Capacity Scheduler allows the administrator to set a maximum cap on the capacity allocated to each of the two queues (research and production). In Hard cap, that cap is used as a hard limit for each queue. In the Soft cap approach, each queue is allowed to expand to the full cluster if there are unused resources, but it cannot scale down without waiting for its scheduled tasks to finish (e.g., if the production queue needs resources from the research queue). We configured these two approaches with the research queue set to 75% capacity (36 containers) and production queue to 25% capacity (12 containers), as these settings performed well.

Figure 6.19 shows that in Hard cap (the second pair of bars), the research job took 52% longer than ideal, while the production job was unaffected. Under Soft cap (the third pair of bars), the production job could obtain containers only when the research job freed them; this resulted in an 85% increase in production job completion time, while the research job was unaffected.

The last pair of bars shows that when Natjam was used, the production job's completion time was 7% worse (5.4 s longer) than ideal and 77% better than the result for the Hadoop Capacity Scheduler's Soft cap. The research job's completion time was only 2% worse (4.7 s longer) than ideal, 20% better than that of killing, and 49% better than that of Hadoop Hard cap. One of the reasons the research job was close to ideal is that it was able to make progress in parallel with the production job. There are other internal reasons for the performance benefit, which we explore next.

Suspend Overhead

We measured Natjam's suspend overhead on a fully loaded cluster. We observed that it took an average of 1.35 s to suspend a task and 3.88 s to resume a task. Standard deviations were low. In comparison, default Hadoop took an average of 2.63 s to schedule a task on an empty cluster. From this it might appear that Natjam incurs a higher total overhead of 5.23 s per task suspend-resume. However, in practice the effective overhead is lower; for instance, Figure 6.19 showed only a 4.7 s increase in research job completion time. The reason is that task suspends typically occur in parallel, and in some cases, task resumes do too. Thus, the time overheads are parallelized rather than aggregated.

Task Eviction Policies

We now compare the two task eviction policies (SRT and LRT) from Section 6.1.2 against each other, and against a random eviction strategy that we also implemented. We performed two sets of experiments: one with Production-S and another with Production-L. The production job was injected 50 s after a Research-XL job.

Table 6.5 tabulates the results. In all cases the production job incurred overhead similar to that for an empty cluster. Thus, we discuss only research job completion time (last column). As shown in the top half of the table, a random task eviction strategy resulted in a 45 s increase in completion time compared to the ideal; we observed that a fourth of the tasks were suspended, leading to a long job tail. Evicting the LRT incurred a higher increase of 55 s because LRT prolongs the tail. Evicting the SRT emerged as the best policy and was only 4.7 s worse than the ideal because it respects the job tail.

Table 6.5 Task eviction policies.

Task eviction policy Production job Mean (S.D.) runtime (s) Research job Mean (S.D.) runtime (s)
Random Production-S 76.6 (3.0) Research-XL 237.6 (7.8)
LRT Production-S 78.8 (1.8) Research-XL 247.2 (6.3)
SRT Production-S 75.6 (1.5) Research-XL 197.0 (5.1)
Random Production-L 75.0 (1.9) Research-XL 244.2 (5.6)
LRT Production-L 75.8 (0.4) Research-XL 246.6 (6.8)
SRT Production-L 74.2 (1.9) Research-XL 234.6 (3.4)
At t = 0 s, a Research-XL job was submitted; at t = 50 s, the production job was submitted. Job completion times are shown. The ideal job completion times are shown in Table 6.4.

In the lower half of Table 6.5, it can be seen that a larger production job caused more suspensions. The research job completion times for the random and LRT eviction policies are similar to those in the top half because the job's tail was already long for the small production job, and was not much longer for the larger job. SRT is worse than it was for a small production job, yet it outperformed the other two eviction strategies.

We conclude that SRT is the best task eviction policy, especially when production jobs are smaller than research jobs. We believe this is a significant use case since research jobs run longer and process more data, while production jobs are typically small due to the need for faster results.

Job Eviction Policies

We next compare the three job eviction policies discussed in Section 6.1.2. Based on the previous results, we always used SRT task eviction.14 We initially submitted two research jobs and followed 50 s later with a small production job. We examined two settings: one in which the initial research jobs were comparable in size and another in which they were different. We observed that the production job completion time was close to ideal; hence Table 6.6 shows only research job completion times.

Table 6.6 Job eviction policies.

Job eviction policy Research job Mean (S.D.) runtime (s) Research job Mean (S.D.) runtime (s)
PR Research-M 195.8 (1.3) Research-M 201.2 (0.8)
MR Research-M 196.2 (1.3) Research-M 200.6 (2.1)
LR Research-M 200.6 (1.3) Research-M 228.8 (12.7)
PR Research-L 201.6 (8.3) Research-S 213.8 (18.8)
MR Research-L 195.8 (1.1) Research-S 204.8 (2.2)
LR Research-L 195.8 (0.4) Research-S 252.4 (9.3)
At t = 0 s, two research jobs were submitted (either two Research-M's or a Research-S and a Research-L); at t = 50 s, a Production-S job was submitted. Only the research job completion times are shown. The ideal job completion times are shown in Table 6.4.

The top half of Table 6.6 shows that when research job sizes were comparable, probabilistic weighting of job evictions by resources (PR) and eviction of the job with the most resources (MR) performed comparably: research job completion times for the two policies were within 2 s (0.5%) of each other. This is desirable given the matching job sizes. On the other hand, eviction of the job with the least resources (LR) performed the worst, because it starved one of the jobs. Once tasks start getting evicted from a research job (which may at first have been picked randomly by LR if all jobs had the same resource usage), LR will subsequently always pick the same job (until it is fully suspended).

That behavior of LR is even more pronounced on small research jobs in a heterogeneous mix, as can be seen in the bottom half of Table 6.6. The Research-S job is picked as a victim by PR less often than by LR, and thus PR outperforms LR. PR penalizes the Research-L job slightly more than LR does, since PR evicts more tasks from a larger job. Even so, PR and MR are within 10 s (5%) of each other; any differences are due to the variable task lengths, and the effectiveness of the SRT task eviction policy. We observed that MR evicted no tasks at all from the Research-S job.

We conclude that when the best task eviction policy (SRT) is used, the PR and MR job eviction policies are preferable to LR, and MR is especially good under heterogeneous mixes of research job sizes.

Natjam-R Evaluation

We evaluated the real-time support of our Natjam-R system (described in Section 6.1.4). Our experiments addressed the following questions: (i) How do MDF and MLF job eviction strategies compare? (ii) How good is Natjam-R at meeting deadlines? And (iii) Do Natjam-R's benefits hold under realistic workloads?

We used eight Emulab servers [111,138], each with eight-core Xeon processors and 250 GB disk space. One server was the Resource Manager, and each of the other seven servers ran three containers of 1 GB each. (Thus, there were 21 containers in total.)

MDF versus MLF

We injected three identical jobs, Job 1 to Job 3, each with 8 maps and 50 reduces. (Each job took 87 s on an empty cluster.) They were submitted in numerical order starting at t = 0 s and 5 s apart, thus overloading the cluster. Since MDF and MLF will both meet long deadlines, we chose shorter deadlines. To force preemption, the deadlines of job 1, job 2, and job 3 were set 10 s apart: 200 s, 190 s, and 180 s, respectively.

Figure 6.20 depicts the progress rate for the MDF cluster and the MLF cluster. Our first observation is that while MDF allowed the short-deadline jobs to run earlier and thus satisfied all deadlines, MLF missed all deadlines (see Figure 6.20b). Under MLF, after a while the jobs proceeded in lockstep in the reduce phase, because when a lower-laxity job (e.g., job 3) has run for a while in lieu of a higher-laxity job (e.g., job 1), their laxities become comparable. Thereafter, the two jobs take turns preempting each other. Breaking ties, for example, by using the deadline, does not eliminate this behavior. In a sense, MLF tries to be fair to all jobs by allowing them all to make progress simultaneously, but this fairness is in fact a drawback.

Figure 6.20 Natjam-R: (a) MDF versus (b) MLF. Lower index jobs have shorter deadlines but arrive later.

MLF also takes longer to finish all jobs, that is, 239 s compared to MDF's 175 s. MLF's lockstep behavior incurs a high context switch overhead. We conclude that MDF is preferable to MLF, especially under short deadlines.

Varying the Deadline

We submitted a job (job 1) just as described previously, and 5 s later submitted an identical job (job 2) whose deadline was 1 s earlier than job 1's. We measured job 1's clean compute time as the time to run the job in an empty cluster. Then, we set its deadline = submission time + (clean compute time × (1 + ε)). Figure 6.21 shows the effect of ε on a metric called margin. We define a job's margin = (deadline) minus (job completion time). A negative margin implies a deadline miss. We observe that an ε as low as 0.8 still meets both deadlines, while an ε as low as 0.2 meets at least the shorter deadline. This means that given one critical job with a very short deadline, Natjam-R can satisfy it if it has at least 20% more time than the job's clean compute time. This percentage is thus an estimate of Natjam-R's overhead. We also performed experiments that varied the second job's size as a fraction of the first job from 0.4 to 2.0, but we saw little effect on margin.
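The deadline and margin definitions above work out as in the following worked sketch (the numbers are illustrative, not taken from the experiment):

```python
def deadline(submission_s, clean_compute_s, epsilon):
    # deadline = submission time + clean compute time * (1 + epsilon)
    return submission_s + clean_compute_s * (1.0 + epsilon)

def margin(deadline_s, completion_s):
    # margin = deadline - job completion time; negative means a miss.
    return deadline_s - completion_s

d = deadline(0.0, 100.0, 0.2)    # 20% slack over the clean compute time
assert d == 120.0
assert margin(d, 115.0) == 5.0   # deadline met with 5 s to spare
assert margin(d, 125.0) == -5.0  # deadline missed by 5 s
```

With ε = 0.2 and a 100 s clean compute time, the job must finish within 120 s of submission; the 20% slack is the estimate of Natjam-R's overhead mentioned in the text.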

Figure 6.21 Natjam-R: Effect of deadlines: Margin = Deadline – Job completion time; thus, a negative margin implies a deadline miss. Job 2 has a deadline 1 s earlier but is submitted 5 s after Job 1.

Trace-Driven Experiments

We used the Yahoo! Hadoop traces to evaluate Natjam-R's deadline satisfaction. We used only the production cluster trace, scaled so as to overload the target cluster. Since the original system did not support deadline scheduling, no deadlines were available from the traces. Thus, we chose ε randomly for each job from the interval [0, 2.0], and used it to set the job's deadline forward from its submission time (as described earlier). A given job's deadline was selected to be the same in all runs.

Figure 6.22 compares Natjam-R against Hadoop Soft cap. It shows the CDF of the difference between the margins of the two approaches; a negative difference implies that Natjam-R is better. Natjam-R's margin is better than Soft cap's for 69% of jobs. The largest improvement in margin was 366 s. The plot is biased by one outlier job that took 1000 s longer in Natjam-R; the next greatest outlier is only –287 s. The first outlier job suffered in Natjam-R because the four jobs submitted just before it and one job right after had much shorter deadlines. Yet the conclusion is positive: among the 400 jobs with variable deadlines, there was only one such outlier. We conclude that Natjam-R satisfies deadlines well under a realistic workload.

Figure 6.22 Natjam-R: Effect of real Yahoo! Hadoop trace: Margin = Deadline – Job completion time. Negative values imply that Natjam-R is better.

6.7 Summary

In this chapter, we have given an overview of five systems we created that are oriented toward offering performance assuredness in cloud computing frameworks, even while the system is under change.

  1. Morphus, which supports reconfigurations in sharded distributed NoSQL databases/storage systems.
  2. Parqua, which supports reconfigurations in distributed ring-based key-value stores.
  3. Stela, which supports scale-out/scale-in in distributed stream processing systems.
  4. A system to support scale-out/scale-in in distributed graph processing systems.
  5. Natjam, which supports priorities and deadlines for jobs in batch processing systems.

For each system, we described its motivations, design, and implementation, and presented experimental results. Our systems are implemented in popular open-source cloud computing frameworks, including MongoDB (Morphus), Cassandra (Parqua), Storm (Stela), LFGraph, and Hadoop (Natjam). Readers who are interested in more detailed design and implementation and more extensive experimental findings are advised to see our original papers that introduced these systems [102,114,120,123,131].

6.8 The Future

Overall, building systems that perform predictably in the cloud remains one of the biggest challenges today, both in mission-critical scenarios and in non-real-time scenarios. The work outlined in this chapter has made deep inroads toward solving key issues in this area.

More specifically, the work described in this chapter constitutes the starting steps toward realization of a truly autonomous and self-aware cloud system for which the mission team merely needs to specify SLAs/SLOs (service level agreements and objectives), and the system will reconfigure itself automatically and continuously over the lifetime of the mission to ensure that these requirements are always met. For instance, as of this writing, we are currently building on our scale-out/scale-in work in the areas of distributed stream processing and distributed graph processing, by adding in an extra layer of adaptive scale-out/scale-in that seeks to meet SLA/SLO requirements such as latency or throughput (for stream processing), and completion time deadlines or throughput (for graph processing). These adaptive techniques will automatically give resources to a job that is facing a higher workload or more stringent deadlines and take away resources from a job that has more relaxed needs. Adaptivity implies that there is no human involvement in making decisions on, for example, the number of machines to give or take away from a job, or when to do so; such decisions will be made automatically by the system. Such adaptive logic may be able to leverage machine learning techniques that will learn the system's performance characteristics and adjust the resource allocation changes over time to ensure that the best possible performance is gained, given the cloud resources at hand. It is also potentially possible to add an adaptive layer atop our database reconfiguration systems (Morphus and Parqua); however, that would need to be done wisely and relatively rarely because of the enormous cost of each reconfiguration operation for large databases.



  1. 1 MongoDB. Available at (accessed Jan. 1, 2015).
  2. 2 Lakshman, A. and Malik, P. (2010) Cassandra: a decentralized structured storage system. ACM SIGOPS Operating Systems Review, 44 (2), 35–40.
  3. 3 Apache Hadoop. Available at
  4. 4 Apache Storm. The Apache Software Foundation. Available at (accessed 2016).
  5. 5 Kulkarni, S., Bhagat, N., Fu, M., Kedigehalli, V., Kellogg, C., Mittal, S., Patel, J.M., Ramasamy, K., and Taneja, S. (2015) Twitter Heron: stream processing at scale, in Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, pp. 239–250.
  6. 6 Samza. Available at (accessed Nov. 14, 2016).
  7. 7 Carey, M.J. and Lu, H., Load balancing in a locally distributed DB system, in Proceedings of the 1986 ACM SIGMOD International Conference on Management of Data, pp. 108–119.
  8. 8 Kemme, B., Bartoli, A., and Babaoglu, O., Online reconfiguration in replicated databases based on group communication, in Proceedings of the 2001 International Conference on Dependable Systems and Networks, pp. 117–126.
  9. 9 Rae, I., Rollins, E., Shute, J., Sodhi, S., and Vingralek, R. (2013) Online, asynchronous schema change in F1. Proceedings of the VLDB Endowment, 6 (11), 1045–1056.
  10. 10 Elmore, A.J., Arora, V., Taft, R., Pavlo, A., Agrawal, D., and El Abbadi, A. (2015) Squall: fine-grained live reconfiguration for partitioned main memory databases, in Proceedings of the ACM SIGMOD International Conference on Management of Data, pp. 299–313.
  11. 11 Copeland, G., Alexander, W., Boughter, E., and Keller, T. (1988) Data placement in Bubba, in Proceedings of the ACM SIGMOD International Conference on Management of Data, pp. 99–108.
  12. 12 Mehta, M. and DeWitt, D.J. (1997) Data placement in shared-nothing parallel database systems. Proceedings of the VLDB Endowment, 6 (1), 53–72.
  13. 13 Das, S., Nishimura, S., Agrawal, D., and El Abbadi, A. (2011) Albatross: lightweight elasticity in shared storage databases for the cloud using live data migration. Proceedings of the VLDB Endowment, 4 (8), 494–505.
  14. 14 Elmore, A.J., Das, S., Agrawal, D., and El Abbadi, A. (2011) Zephyr: live migration in shared nothing databases for elastic cloud platforms, in Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data, pp. 301–312.
  15. 15 Barker, S., Chi, Y., Hacıgümüş, H., Shenoy, P., and Cecchet, E. (2014) ShuttleDB: database-aware elasticity in the cloud, in Proceedings of the 11th International Conference on Autonomic Computing, pp. 33–43. Available at
  16. 16 Curino, C., Jones, E.P.C., Popa, R.A., Malviya, N., Wu, E., Madden, S., Balakrishnan, H., and Zeldovich, N. (2011) Relational cloud: a database-as-a-service for the cloud, in Proceedings of the 5th Biennial Conference on Innovative Data Systems Research, pp. 235–240. Available at
  17. 17 Ardekani, M.S. and Terry, D.B. (2014) A self-configurable geo-replicated cloud storage system, in Proceedings of the 11th USENIX Conference on Operating Systems Design and Implementation, pp. 367–381. Available at
  18. 18 Clark, C., Fraser, K., Hand, S., Hansen, J.G., Jul, E., Limpach, C., Pratt, I., and Warfield, A. (2005) Live migration of virtual machines, in Proceedings of the 2nd Symposium on Networked Systems Design and Implementation, vol. 2, pp. 273–286.
  19. 19 Bradford, R., Kotsovinos, E., Feldmann, A., and Schiöberg, H. (2007) Live wide-area migration of virtual machines including local persistent state, in Proceedings of the 3rd International Conference on Virtual Execution Environments, pp. 169–179.
  20. 20 Barker, S., Chi, Y., Moon, H.J., Hacıgümüş, H., and Shenoy, P. (2012) ‘Cut me some slack’: latency-aware live migration for databases, in Proceedings of the 15th International Conference on Extending Database Technology, pp. 432–443.
  21. 21 Chowdhury, M., Zaharia, M., Ma, J., Jordan, M.I., and Stoica, I., Managing data transfers in computer clusters with Orchestra, in Proceedings of the ACM SIGCOMM 2011 Conference, pp. 98–109.
  22. 22 Al-Fares, M., Radhakrishnan, S., Raghavan, B., Huang, N., and Vahdat, A. (2010) Hedera: dynamic flow scheduling for data center networks, in Proceedings of the 7th USENIX Conference on Networked Systems Design and Implementation, p. 19.
  23. 23 Abadi, D.J., Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., Stonebraker, M., Tatbul, N., and Zdonik, S. (2003) Aurora: a new model and architecture for data stream management. VLDB J., 12 (2), 120–139.
  24. 24 Tatbul, N., Ahmad, Y., Çetintemel, U., Hwang, J.-H., Xing, Y., and Zdonik, S. (2008) Load management and high availability in the Borealis distributed stream processing engine, in GeoSensor Networks: 2nd International Conference, GSN 2006, Boston, MA, USA, October 1–3, 2006: Revised Selected and Invited Papers, Lecture Notes in Computer Science, vol. 4540 (eds. S. Nittel, A. Labrinidis, and A. Stefanidis), Springer, Berlin, Germany, pp. 66–85.
  25. 25 Loesing, S., Hentschel, M., Kraska, T., and Kossmann, D., Stormy: an elastic and highly available streaming service in the cloud, in Proceedings of the 2012 Joint EDBT/ICDT Workshops, pp. 55–60.
  26. 26 Gulisano, V., Jimenez-Peris, R., Patino-Martinez, M., Soriente, C., and Valduriez, P. (2012) StreamCloud: an elastic and scalable data streaming system. IEEE Transactions on Parallel and Distributed Systems, 23 (12), 2351–2365.
  27. 27 Abadi, D.J., Ahmad, Y., Balazinska, M., Çetintemel, U., Cherniack, M., Hwang, J.-H., Lindner, W., Maskey, A.S., Rasin, A., Ryvkina, E., Tatbul, N., Xing, Y., and Zdonik, S. (2005) The design of the Borealis stream processing engine, in Proceedings of the 2nd Biennial Conference on Innovative Data Systems Research, pp. 277–289.
  28. 28 Castro Fernandez, R., Migliavacca, M., Kalyvianaki, E., and Pietzuch, P., Integrating scale out and fault tolerance in stream processing using operator state management, in Proceedings of the 2013 ACM SIGMOD International Conference on Management of Data, pp. 725–736.
  29. 29 Gedik, B., Schneider, S., Hirzel, M., and Wu, K.-L. (2014) Elastic scaling for data stream processing. IEEE Transactions on Parallel and Distributed Systems, 25 (6), 1447–1463.
  30. 30 Schneider, S., Andrade, H., Gedik, B., Biem, A., and Wu, K.-L. (2009) Elastic scaling of data parallel operators in stream processing, in Proceedings of the IEEE International Symposium on Parallel and Distributed Processing, pp. 1–12.
  31. Amini, L., Andrade, H., Bhagwan, R., Eskesen, F., King, R., Selo, P., Park, Y., and Venkatramani, C. (2006) SPC: a distributed, scalable platform for data mining, in Proceedings of the 4th International Workshop on Data Mining Standards, Services and Platforms, pp. 27–37.
  32. Jain, N., Amini, L., Andrade, H., King, R., Park, Y., Selo, P., and Venkatramani, C. (2006) Design, implementation, and evaluation of the linear road benchmark on the stream processing core, in Proceedings of the 2006 ACM SIGMOD International Conference on Management of Data, pp. 431–442.
  33. Wu, K.-L., Yu, P.S., Gedik, B., Hildrum, K.W., Aggarwal, C.C., Bouillet, E., Fan, W., George, D.A., Gu, X., Luo, G., and Wang, H. (2007) Challenges and experience in prototyping a multi-modal stream analytic and monitoring application on System S, in Proceedings of the 33rd International Conference on Very Large Data Bases, pp. 1185–1196.
  34. Gedik, B., Andrade, H., Wu, K.-L., Yu, P.S., and Doo, M. (2008) SPADE: the System S declarative stream processing engine, in Proceedings of the ACM SIGMOD International Conference on Management of Data, pp. 1123–1134.
  35. Lohrmann, B., Janacik, P., and Kao, O. (2015) Elastic stream processing with latency guarantees, in Proceedings of the IEEE 35th International Conference on Distributed Computing Systems, pp. 399–410.
  36. Heinze, T., Roediger, L., Meister, A., Ji, Y., Jerzak, Z., and Fetzer, C. (2015) Online parameter optimization for elastic data stream processing, in Proceedings of the 6th ACM Symposium on Cloud Computing, pp. 276–287.
  37. Aniello, L., Baldoni, R., and Querzoni, L. (2013) Adaptive online scheduling in Storm, in Proceedings of the 7th ACM International Conference on Distributed Event-Based Systems, pp. 207–218.
  38. Gandhi, A., Harchol-Balter, M., Raghunathan, R., and Kozuch, M.A. (2012) AutoScale: dynamic, robust capacity management for multi-tier data centers. ACM Transactions on Computer Systems, 30 (4), 14:1–14:26.
  39. Zhang, Q., Zhani, M.F., Zhang, S., Zhu, Q., Boutaba, R., and Hellerstein, J.L. (2012) Dynamic energy-aware capacity provisioning for cloud computing environments, in Proceedings of the 9th International Conference on Autonomic Computing, pp. 145–154.
  40. Shen, Z., Subbiah, S., Gu, X., and Wilkes, J. (2011) CloudScale: elastic resource scaling for multi-tenant cloud systems, in Proceedings of the 2nd ACM Symposium on Cloud Computing, pp. 5:1–5:14.
  41. Jiang, J., Lu, J., Zhang, G., and Long, G. (2013) Optimal cloud resource auto-scaling for web applications, in Proceedings of the 13th IEEE/ACM International Symposium on Cluster, Cloud, and Grid Computing, pp. 58–65.
  42. Nguyen, H., Shen, Z., Gu, X., Subbiah, S., and Wilkes, J. (2013) AGILE: elastic distributed resource scaling for infrastructure-as-a-service, in Proceedings of the 10th International Conference on Autonomic Computing, pp. 69–82.
  43. Pujol, J.M., Erramilli, V., Siganos, G., Yang, X., Laoutaris, N., Chhabra, P., and Rodriguez, P. (2010) The little engine(s) that could: scaling online social networks, in Proceedings of the ACM SIGCOMM Conference, pp. 375–386.
  44. Tsoumakos, D., Konstantinou, I., Boumpouka, C., Sioutas, S., and Koziris, N. (2013) Automated, elastic resource provisioning for NoSQL clusters using TIRAMOLA, in Proceedings of the 13th IEEE/ACM International Symposium on Cluster, Cloud, and Grid Computing, pp. 34–41.
  45. Didona, D., Romano, P., Peluso, S., and Quaglia, F. (2012) Transactional Auto Scaler: elastic scaling of in-memory transactional data grids, in Proceedings of the 9th International Conference on Autonomic Computing, pp. 125–134.
  46. Herodotou, H., Lim, H., Luo, G., Borisov, N., Dong, L., Cetin, F.B., and Babu, S. (2011) Starfish: a self-tuning system for big data analytics, in Proceedings of the 5th Biennial Conference on Innovative Data Systems Research, pp. 261–272. Available at
  47. Herodotou, H., Dong, F., and Babu, S. (2011) No one (cluster) size fits all: automatic cluster sizing for data-intensive analytics, in Proceedings of the 2nd ACM Symposium on Cloud Computing, pp. 18:1–18:14.
  48. Gonzalez, J.E., Low, Y., Gu, H., Bickson, D., and Guestrin, C. (2012) PowerGraph: distributed graph-parallel computation on natural graphs, in Proceedings of the 10th USENIX Symposium on Operating Systems Design and Implementation, pp. 17–30. Available at
  49. Low, Y., Bickson, D., Gonzalez, J., Guestrin, C., Kyrola, A., and Hellerstein, J.M. (2012) Distributed GraphLab: a framework for machine learning and data mining in the cloud. Proceedings of the VLDB Endowment, 5 (8), 716–727.
  50. Stanton, I. and Kliot, G. (2012) Streaming graph partitioning for large distributed graphs, in Proceedings of the 18th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining, pp. 1222–1230.
  51. Vaquero, L., Cuadrado, F., Logothetis, D., and Martella, C. (2013) Adaptive partitioning for large-scale dynamic graphs, in Proceedings of the 4th Annual Symposium on Cloud Computing, pp. 35:1–35:2.
  52. Salihoglu, S. and Widom, J. (2013) GPS: a graph processing system, in Proceedings of the 25th International Conference on Scientific and Statistical Database Management, pp. 22:1–22:12.
  53. Tanenbaum, A.S. (2008) The operating system as a resource manager, in Modern Operating Systems, 3rd edn, Pearson Prentice Hall, Upper Saddle River, NJ, Chapter 1, Section 1.1.2, pp. 6–7.
  54. Ananthanarayanan, G., Ghodsi, A., Wang, A., Borthakur, D., Kandula, S., Shenker, S., and Stoica, I. (2012) PACMan: coordinated memory caching for parallel jobs, in Proceedings of the 9th USENIX Symposium on Networked Systems Design and Implementation. Available at
  55. Ananthanarayanan, G., Douglas, C., Ramakrishnan, R., Rao, S., and Stoica, I. (2012) True elasticity in multi-tenant data-intensive compute clusters, in Proceedings of the 3rd ACM Symposium on Cloud Computing, Article No. 24.
  56. Rao, S., Ramakrishnan, R., Silberstein, A., Ovsiannikov, M., and Reeves, D. (2012) Sailfish: a framework for large scale data processing, in Proceedings of the 3rd ACM Symposium on Cloud Computing, Article No. 4.
  57. Netty (2013) Available at
  58. Zaharia, M., Borthakur, D., Sarma, J.S., Elmeleegy, K., Shenker, S., and Stoica, I. (2010) Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling, in Proceedings of the 5th European Conference on Computer Systems, pp. 265–278.
  59. Cheng, L., Zhang, Q., and Boutaba, R. (2011) Mitigating the negative impact of preemption on heterogeneous MapReduce workloads, in Proceedings of the 7th International Conference on Network and Service Management.
  60. Preemption of Reducer (and Shuffle) via checkpointing. Hadoop MapReduce, MAPREDUCE-5269, the Apache Software Foundation, 2013. Available at
  61. Power, R. and Li, J. (2010) Piccolo: building fast, distributed programs with partitioned tables, in Proceedings of the 9th USENIX Symposium on Operating Systems Design and Implementation. Available at
  62. Verma, A., Cherkasova, L., and Campbell, R.H. (2011) ARIA: automatic resource inference and allocation for MapReduce environments, in Proceedings of the 8th ACM International Conference on Autonomic Computing, pp. 235–244.
  63. Wieder, A., Bhatotia, P., Post, A., and Rodrigues, R. (2012) Orchestrating the deployment of computations in the cloud with Conductor, in Proceedings of the 9th USENIX Symposium on Networked Systems Design and Implementation. Available at
  64. Phan, L.T.X., Zhang, Z., Loo, B.T., and Lee, I. (2010) Real-time MapReduce scheduling, University of Pennsylvania Department of Computer and Information Science Technical Report MS-CIS-10-32, Jan. 1. Available at
  65. Ferguson, A.D., Bodik, P., Kandula, S., Boutin, E., and Fonseca, R. (2012) Jockey: guaranteed job latency in data parallel clusters, in Proceedings of the 7th ACM European Conference on Computer Systems, pp. 99–112.
  66. Ganapathi, A., Chen, Y., Fox, A., Katz, R., and Patterson, D. (2010) Statistics-driven workload modeling for the cloud, in Proceedings of the IEEE 26th International Conference on Data Engineering Workshops, pp. 87–92.
  67. Kambatla, K., Pathak, A., and Pucha, H. (2009) Towards optimizing Hadoop provisioning in the cloud, in Proceedings of the USENIX Workshop on Hot Topics in Cloud Computing. Available at
  68. Ananthanarayanan, G., Kandula, S., Greenberg, A., Stoica, I., Lu, Y., Saha, B., and Harris, E. (2010) Reining in the outliers in Map-Reduce clusters using Mantri, in Proceedings of the 9th USENIX Symposium on Operating Systems Design and Implementation. Available at
  69. Zaharia, M., Konwinski, A., Joseph, A.D., Katz, R., and Stoica, I. (2008) Improving MapReduce performance in heterogeneous environments, in Proceedings of the 8th USENIX Symposium on Operating Systems Design and Implementation, pp. 29–42. Available at
  70. Sandholm, T. and Lai, K. (2010) Dynamic proportional share scheduling in Hadoop, in Proceedings of the 15th International Conference on Job Scheduling Strategies for Parallel Processing, pp. 110–131.
  71. Condie, T., Conway, N., Alvaro, P., Hellerstein, J.M., Elmeleegy, K., and Sears, R. (2010) MapReduce online, in Proceedings of the 7th USENIX Conference on Networked Systems Design and Implementation.
  72. Zaharia, M., Chowdhury, M., Franklin, M.J., Shenker, S., and Stoica, I. (2010) Spark: cluster computing with working sets, in Proceedings of the 2nd USENIX Workshop on Hot Topics in Cloud Computing. Available at
  73. Storm. Available at
  74. Qian, Z., He, Y., Su, C., Wu, Z., Zhu, H., Zhang, T., Zhou, L., Yu, Y., and Zhang, Z. (2013) TimeStream: reliable stream computation in the cloud, in Proceedings of the 8th ACM European Conference on Computer Systems.
  75. IBM InfoSphere Platform. Available at
  76. Agarwal, S., Mozafari, B., Panda, A., Milner, H., Madden, S., and Stoica, I. (2013) BlinkDB: queries with bounded errors and bounded response times on very large data, in Proceedings of the 8th ACM European Conference on Computer Systems, pp. 29–42.
  77. Cruz, F., Maia, F., Matos, M., Oliveira, R., Paulo, J., Pereira, J., and Vilaça, R. (2013) MeT: workload aware elasticity for NoSQL, in Proceedings of the 8th ACM European Conference on Computer Systems, pp. 183–196.
  78. Liu, C.L. and Layland, J.W. (1973) Scheduling algorithms for multiprogramming in a hard-real-time environment. Journal of the ACM, 20 (1), 46–61.
  79. Liu, J.W.S. (2000) Real-Time Systems, Prentice Hall.
  80. Goossens, J., Funk, S., and Baruah, S. (2003) Priority-driven scheduling of periodic task systems on multiprocessors. Real-Time Systems, 25 (2), 187–205.
  81. Dertouzos, M.L. and Mok, A.K.-L. (1989) Multiprocessor on-line scheduling of hard-real-time tasks. IEEE Transactions on Software Engineering, 15 (12), 1497–1506.
  82. Santhoshkumar, I., Manimaran, G., and Murthy, C.S.R. (1999) A pre-run-time scheduling algorithm for object-based distributed real-time systems. Journal of Systems Architecture, 45 (14), 1169–1188.
  83. Hadoop Capacity Scheduler. Available at
  84. Fair Scheduler, Hadoop, Apache Software Foundation. Available at (accessed Aug. 4, 2013).
  85. Remove pre-emption from the capacity scheduler code base. Hadoop Common, HADOOP-5726, the Apache Software Foundation. Available at (updated June 22, 2012).
  86. Isard, M., Prabhakaran, V., Currey, J., Wieder, U., Talwar, K., and Goldberg, A. (2009) Quincy: fair scheduling for distributed computing clusters, in Proceedings of the ACM SIGOPS 22nd Symposium on Operating Systems Principles, pp. 261–276.
  87. Yu, Y., Isard, M., Fetterly, D., Budiu, M., Erlingsson, Ú., Gunda, P.K., and Currey, J. (2008) DryadLINQ: a system for general-purpose distributed data-parallel computing using a high-level language, in Proceedings of the 8th USENIX Symposium on Operating Systems Design and Implementation, pp. 1–14. Available at
  88. Andrzejak, A., Kondo, D., and Yi, S. (2010) Decision model for cloud computing under SLA constraints, in Proceedings of the 2010 IEEE International Symposium on Modeling, Analysis and Simulation of Computer and Telecommunication Systems, pp. 257–266.
  89. Amirijoo, M., Hansson, J., and Son, S.H. (2004) Algorithms for managing QoS for real-time data services using imprecise computation, in Real-Time and Embedded Computing Systems and Applications: 9th International Conference, RTCSA 2003, Tainan, Taiwan, February 18–20, 2003, Revised Papers, Lecture Notes in Computer Science, vol. 2968 (eds J. Chen and S. Hong), Springer, Berlin, Germany, pp. 136–157.
  90. Schwarzkopf, M., Konwinski, A., Abd-El-Malek, M., and Wilkes, J. (2013) Omega: flexible, scalable schedulers for large compute clusters, in Proceedings of the 8th ACM European Conference on Computer Systems, pp. 351–364.
  91. Wang, A., Venkataraman, S., Alspaugh, S., Katz, R., and Stoica, I. (2012) Cake: enabling high-level SLOs on shared storage systems, in Proceedings of the 3rd ACM Symposium on Cloud Computing, Article No. 14.
  92. Calder, B., Wang, J., Ogus, A., Nilakantan, N., Skjolsvold, A., McKelvie, S., Xu, Y., Srivastav, S., Wu, J., Simitci, H., Haridas, J., Uddaraju, C., Khatri, H., Edwards, A., Bedekar, V., Mainali, S., Abbasi, R., Agarwal, A., ul Haq, M.F., ul Haq, M.I., Bhardwaj, D., Dayanand, S., Adusumilli, A., McNett, M., Sankaran, S., Manivannan, K., and Rigas, L. (2011) Windows Azure Storage: a highly available cloud storage service with strong consistency, in Proceedings of the 23rd ACM Symposium on Operating Systems Principles, pp. 143–157.
  93. Adya, A., Dunagan, J., and Wolman, A. (2010) Centrifuge: integrated lease management and partitioning for cloud services, in Proceedings of the 7th USENIX Conference on Networked Systems Design and Implementation.
  94. Hindman, B., Konwinski, A., Zaharia, M., Ghodsi, A., Joseph, A.D., Katz, R., Shenker, S., and Stoica, I. (2011) Mesos: a platform for fine-grained resource sharing in the data center, in Proceedings of the 8th USENIX Symposium on Networked Systems Design and Implementation. Available at
  95. Shue, D., Freedman, M.J., and Shaikh, A. (2012) Performance isolation and fairness for multi-tenant cloud storage, in Proceedings of the 10th USENIX Symposium on Operating Systems Design and Implementation, pp. 349–362. Available at
  96. NoSQL market forecast 2015–2020. Market Research Media, 2012. Available at (accessed Jan. 1, 2015).
  97. Apache HBase. The Apache Software Foundation. Available at (accessed Jan. 5, 2015).
  98. Chang, F., Dean, J., Ghemawat, S., Hsieh, W.C., Wallach, D.A., Burrows, M., Chandra, T., Fikes, A., and Gruber, R.E. (2006) Bigtable: a distributed storage system for structured data, in Proceedings of the 7th USENIX Symposium on Operating Systems Design and Implementation, pp. 205–218. Available at
  99. Can I change the shard key after sharding a collection? in FAQ: Sharding with MongoDB. Available at (accessed Jan. 5, 2015).
  100. Alter Cassandra column family primary key using cassandra-cli or CQL, Stack Overflow. Available at (accessed Jan. 5, 2015).
  101. The great primary-key debate. TechRepublic, Mar. 22, 2012. Available at (accessed Jan. 5, 2015).
  102. Ghosh, M., Wang, W., Holla, G., and Gupta, I. (2015) Morphus: supporting online reconfigurations in sharded NoSQL systems, in Proceedings of the IEEE International Conference on Autonomic Computing, pp. 1–10.
  103. RethinkDB. Available at (accessed Jan. 1, 2015).
  104. CouchDB. The Apache Software Foundation. Available at (accessed Jan. 5, 2015).
  105. Ghosh, M., Wang, W., Holla, G., and Gupta, I. (2015) Morphus: supporting online reconfigurations in sharded NoSQL systems. IEEE Transactions on Emerging Topics in Computing.
  106. Hungarian algorithm. Wikipedia. Available at (accessed Jan. 1, 2015).
  107. Al-Fares, M., Loukissas, A., and Vahdat, A. (2008) A scalable, commodity data center network architecture, in Proceedings of the ACM SIGCOMM 2008 Conference on Data Communication, pp. 63–74.
  108. Kim, J., Dally, W.J., and Abts, D. (2010) Efficient topologies for large-scale cluster networks, in Proceedings of the 2010 Conference on Optical Fiber Communication, collocated National Fiber Optic Engineers Conference, pp. 1–3.
  109. Kim, J., Dally, W.J., and Abts, D. (2007) Flattened butterfly: a cost-efficient topology for high-radix networks, in Proceedings of the 34th Annual International Symposium on Computer Architecture, pp. 126–137.
  110. McAuley, J. and Leskovec, J. (2013) Hidden factors and hidden topics: understanding rating dimensions with review text, in Proceedings of the 7th ACM Conference on Recommender Systems, pp. 165–172.
  111. Emulab. Available at (accessed 2016).
  112. Cooper, B.F., Silberstein, A., Tam, E., Ramakrishnan, R., and Sears, R. (2010) Benchmarking cloud serving systems with YCSB, in Proceedings of the 1st ACM Symposium on Cloud Computing, pp. 143–154.
  113. Google Cloud Platform. Available at (accessed Jan. 5, 2015).
  114. Shin, Y., Ghosh, M., and Gupta, I. (2015) Parqua: online reconfigurations in virtual ring-based NoSQL systems, in Proceedings of the 2015 International Conference on Cloud and Autonomic Computing, pp. 220–223.
  115. Riak. Available at (accessed Jan. 1, 2015).
  116. Amazon DynamoDB. Amazon Web Services, Inc. Available at (accessed May 5, 2015).
  117. Voldemort. Available at (accessed May 12, 2014).
  118. An introduction to using custom timestamps in CQL3. Available at (accessed Apr. 25, 2015).
  119. Shin, Y., Ghosh, M., and Gupta, I. (2015) Parqua: Online Reconfigurations in Virtual Ring-Based NoSQL Systems, Technical Report, University of Illinois at Urbana-Champaign. Available at
  120. Xu, L., Peng, B., and Gupta, I. (2016) Stela: enabling stream processing systems to scale-in and scale-out on-demand, in Proceedings of the IEEE International Conference on Cloud Engineering, pp. 22–31.
  121. Zaharia, M., Das, T., Li, H., Hunter, T., Shenker, S., and Stoica, I. (2013) Discretized streams: fault-tolerant streaming computation at scale, in Proceedings of the 24th ACM Symposium on Operating Systems Principles, pp. 423–438.
  122. Armbrust, M., Fox, A., Griffith, R., Joseph, A.D., Katz, R., Konwinski, A., Lee, G., Patterson, D., Rabkin, A., Stoica, I., and Zaharia, M. (2010) A view of cloud computing. Communications of the ACM, 53 (4), 50–58.
  123. Pundir, M., Kumar, M., Leslie, L.M., Gupta, I., and Campbell, R.H. (2016) Supporting on-demand elasticity in distributed graph processing, in Proceedings of the IEEE International Conference on Cloud Engineering, pp. 12–21.
  124. Malewicz, G., Austern, M.H., Bik, A.J.C., Dehnert, J.C., Horn, I., Leiser, N., and Czajkowski, G. (2010) Pregel: a system for large-scale graph processing, in Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data, pp. 135–146.
  125. Low, Y., Gonzalez, J., Kyrola, A., Bickson, D., Guestrin, C., and Hellerstein, J. (2010) GraphLab: a new framework for parallel machine learning, in Proceedings of the 26th Conference on Uncertainty in Artificial Intelligence. Available at
  126. Hoque, I. and Gupta, I. (2013) LFGraph: simple and fast distributed graph analytics, in Proceedings of the 1st ACM SIGOPS Conference on Timely Results in Operating Systems, pp. 9:1–9:17.
  127. Stoica, I., Morris, R., Karger, D., Kaashoek, M.F., and Balakrishnan, H. (2001) Chord: a scalable peer-to-peer lookup service for Internet applications, in Proceedings of the Conference on Applications, Technologies, Architectures, and Protocols for Computer Communications, pp. 149–160.
  128. Kuhn, H.W. (1955) The Hungarian method for the assignment problem. Naval Research Logistics Quarterly, 2 (1–2), 83–97.
  129. Jonker, R. and Volgenant, T. (1986) Improving the Hungarian assignment algorithm. Operations Research Letters, 5 (4), 171–175.
  130. Kwak, H., Lee, C., Park, H., and Moon, S. (2010) What is Twitter, a social network or a news media? in Proceedings of the 19th International Conference on World Wide Web, pp. 591–600.
  131. Cho, B., Rahman, M., Chajed, T., Gupta, I., Abad, C., Roberts, N., and Lin, P. (2013) Natjam: design and evaluation of eviction policies for supporting priorities and deadlines in Mapreduce clusters, in Proceedings of the 4th Annual Symposium on Cloud Computing, pp. 6:1–6:17.
  132. Dean, J. and Ghemawat, S. (2008) MapReduce: simplified data processing on large clusters. Communications of the ACM, 51 (1), 107–113.
  133. Huai, Y., Lee, R., Zhang, S., Xia, C.H., and Zhang, X. (2011) DOT: a matrix model for analyzing, optimizing and deploying software for big data analytics in distributed systems, in Proceedings of the 2nd ACM Symposium on Cloud Computing, Article No. 4.
  134. Thusoo, A., Sarma, J.S., Jain, N., Shao, Z., Chakka, P., Anthony, S., Liu, H., Wyckoff, P., and Murthy, R. (2009) Hive: a warehousing solution over a map-reduce framework. Proceedings of the VLDB Endowment, 2 (2), 1626–1629.
  135. Olston, C., Reed, B., Srivastava, U., Kumar, R., and Tomkins, A. (2008) Pig Latin: a not-so-foreign language for data processing, in Proceedings of the ACM SIGMOD International Conference on Management of Data, pp. 1099–1110.
  136. Ko, S.Y., Hoque, I., Cho, B., and Gupta, I. (2010) Making cloud intermediate data fault-tolerant, in Proceedings of the 1st ACM Symposium on Cloud Computing, pp. 181–192.
  137. Chen, Y., Ganapathi, A., Griffith, R., and Katz, R. (2011) The case for evaluating MapReduce performance using workload suites, in Proceedings of the IEEE 19th Annual International Symposium on Modeling, Analysis and Simulation of Computer and Telecommunication Systems, pp. 390–399.
  138. White, B., Lepreau, J., Stoller, L., Ricci, R., Guruprasad, S., Newbold, M., Hibler, M., Barb, C., and Joglekar, A. (2002) An integrated experimental environment for distributed systems and networks, in ACM SIGOPS Operating Systems Review – OSDI '02: Proceedings of the 5th Symposium on Operating Systems Design and Implementation, vol. 36, no. SI, pp. 255–270.