Tuesday, September 19, 2017

Paper summary. Gaia: Geo-Distributed Machine Learning Approaching LAN Speeds

This paper appeared in NSDI'17 and is authored by Kevin Hsieh, Aaron Harlap, Nandita Vijaykumar, Dimitris Konomis, Gregory R. Ganger,  Phillip B. Gibbons, and Onur Mutlu. 

Motivation

This paper proposes a framework to distribute an ML system across multiple datacenters, and train models at the same datacenter where the data is generated. This is useful because it avoids the need to move big data over wide-area networks (WANs), which can be slow (WAN bandwidth is about 15x less than LAN bandwidth), costly (AWS does not charge for inside datacenter communication but charges for WAN communication), and also prone to privacy or ownership concerns.

Google's Federated Learning also considered similar motivations, and set out to reduce WAN communication. It worked as follows: 1) smartphones are sent the model by the master/datacenter parameter-server,  2) smartphones compute an updated model based on their local data over some number of iterations, 3) the updated models are sent from the smartphones to the datacenter parameter-server, 4) the datacenter parameter-server aggregates these models (by averaging) to construct the new global model, and 5) Repeat.
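
To make the round structure concrete, here is a minimal Python sketch of one such round (my own illustration, not Google's code; the `local_update` function stands in for whatever local training step the smartphones run):

```python
import numpy as np

def federated_round(global_model, client_datasets, local_update, local_iters=5):
    # One Federated-Learning-style round: each client starts from the global
    # model, refines it on its own data, and the server averages the results.
    client_models = []
    for data in client_datasets:
        local = np.copy(global_model)
        for _ in range(local_iters):
            local = local_update(local, data)  # e.g., one SGD step on local data
        client_models.append(local)
    return np.mean(client_models, axis=0)      # step 4: average into the new global model
```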

The Gaia paper does not cite the Federated Learning paper, likely because the two were submitted around the same time. There are many parallels between Gaia's approach and that of Federated Learning. Both are based on the parameter-server model, and both prescribe updating the model parameters in a relaxed/stale/approximate synchronous parallel fashion: several iterations are run in-situ before updating the "master" parameter-server. The difference is that Federated Learning keeps a "master" parameter-server in the datacenter, whereas Gaia takes a peer-to-peer approach where each datacenter has its own parameter-server, and updating the "master" means synchronizing the parameter-servers across the datacenters.

Approximate Synchronous Parallelism (ASP) idea

Gaia's Approximate Synchronous Parallelism (ASP) idea tries to eliminate insignificant communication between data centers while still guaranteeing the correctness of ML algorithms. ASP is motivated by the observation that the vast majority of updates to the global ML model parameters from each worker are insignificant, e.g., more than 95% of the updates may produce less than a 1% change to the parameter value. With ASP, these insignificant updates to the same parameter within a data center are aggregated (and thus not communicated to other data centers) until the aggregated updates are significant enough.

ASP builds heavily on the Stale Synchronous Parallelism (SSP) idea for parameter-server ML systems. While SSP bounds how stale (i.e., old) a parameter can be, ASP bounds how inaccurate a parameter can be, in comparison to the most up-to-date value.

ASP allows the ML programmer to specify the function and the threshold that determine the significance of updates for each ML algorithm. The significance threshold has 2 parts: a hard threshold and a soft threshold. The purpose of the hard threshold is to guarantee ML algorithm convergence, while the soft threshold exploits underutilized WAN bandwidth to speed up convergence. In other words, the soft threshold provides an opportunistic synchronization threshold.

Architecture

The Gaia architecture is simple: it adds a layer of indirection to the parameter-server model to account for multi-datacenter deployments.

Figure 4 shows the overview of Gaia. In Gaia, each data center has some worker machines and parameter servers. Each worker machine works on a shard of the input data stored in its data center to achieve data parallelism. The parameter servers in each data center collectively maintain a version of the global model copy, and each parameter server handles a shard of this global model copy. A worker machine only READs and UPDATEs the global model copy in its data center. To reduce the communication overhead over WANs, ASP is used between parameter-servers across different data centers. Below are the 3 components of ASP.

The significance filter. ASP takes 2 inputs from the user: (1) a significance function and (2) an initial significance threshold. A parameter server aggregates updates from the local worker machines and shares the aggregated updates with other datacenters only when the aggregate becomes significant. To facilitate convergence to the optimal point, ASP automatically reduces the significance threshold over time: if the original threshold is $v$, then the threshold at iteration $t$ of the ML algorithm is $\frac{v}{\sqrt{t}}$.
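
Here is a minimal Python sketch of the significance filter (my own simplification, not the paper's code; using relative change as the significance function is an assumption):

```python
import math

class SignificanceFilter:
    def __init__(self, initial_threshold):
        self.v = initial_threshold   # e.g., 0.01 for a 1% significance threshold
        self.pending = {}            # parameter key -> aggregated unsent update

    def threshold(self, t):
        # Threshold shrinks over iterations to help convergence: v / sqrt(t)
        return self.v / math.sqrt(t)

    def on_local_update(self, key, delta, current_value, t):
        # Aggregate the update locally; release it to other datacenters only
        # when the aggregated relative change becomes significant.
        agg = self.pending.get(key, 0.0) + delta
        if current_value != 0 and abs(agg / current_value) >= self.threshold(t):
            self.pending[key] = 0.0
            return agg               # significant: send to remote parameter servers
        self.pending[key] = agg
        return None                  # insignificant: keep aggregating
```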

ASP selective barrier. When a parameter-server receives the significant updates at a rate that is higher than the WAN bandwidth can support, instead of sending updates (which will take a long time), it first sends a short control message to other datacenters. The receiver of this ASP selective barrier message blocks its local workers from reading the specified parameters until it receives the significant updates from the sender of the barrier.

Mirror clock. This provides a final safety net implementing SSP across datacenters. When each parameter server receives all the updates from its local worker machines at the end of a clock (e.g., an iteration), it reports its clock to the servers that are in charge of the same parameters in the other data centers. When a server detects its clock is ahead of the slowest server, it blocks until the slowest mirror server catches up.


Evaluation

The paper evaluates Gaia with 3 popular ML applications. Matrix Factorization (MF) is a technique commonly used in recommender systems. Topic Modeling (TM) is an unsupervised method for discovering hidden semantic structures (topics) in an unstructured collection of documents, each consisting of a bag (multi-set) of words. Image Classification (IC) is a task to classify images into categories, and uses deep learning and convolutional neural networks (CNNs). All applications use SGD-based optimization.


The experiments, running across 11 Amazon EC2 global regions and on a cluster that emulates EC2 WAN bandwidth, compare Gaia against a baseline that runs BSP (Bulk Synchronous Parallelism) across all datacenters, and against the same setup running entirely inside a LAN.

Questions

Is this general enough? The introduction says this should apply for SGD based ML algorithms. But are there hidden/implicit assumptions?

What are some examples of advanced significance functions? ML users can define advanced significance functions to be used with Gaia, but this is not explored/explained much in the paper. This may be a hard thing to do even for advanced users.

Even though it is easier to improve bandwidth than latency, the paper focuses on the challenge imposed by the limited WAN bandwidth rather than the WAN latency. While the end metric for evaluation is training completion time, the paper does not investigate the effect of network latency. How would the evaluations look if the improvements were investigated in relation to latency rather than throughput limitations? (We could get a rough idea of this if we knew how often the barrier control message was used.)

Saturday, September 16, 2017

Paper summary. OpenCL Caffe: Accelerating and enabling a cross platform machine learning framework

This 2016 paper presents an OpenCL branch/port of the deep learning framework Caffe. More specifically, this branch replaces the CUDA-based backend of Caffe with an open-standard OpenCL backend. The software was first located at https://github.com/amd/OpenCL-caffe, then graduated to https://github.com/BVLC/caffe/tree/opencl.

Once we develop a DNN model, we would ideally like to be able to deploy it for different applications across multiple platforms (servers, NVIDIA GPUs, AMD GPUs, ARM GPUs, or even smartphones and tablets) with minimal development effort. Unfortunately, most deep learning frameworks (including Caffe) are integrated with CUDA libraries for running on NVIDIA GPUs, and that limits portability across multiple platforms.

OpenCL helps with portability of heterogeneous computing across platforms since it is supported by a variety of commercial chip manufacturers: Altera, AMD, Apple, ARM Holdings, Creative Technology, IBM, Imagination Technologies, Intel, Nvidia, Qualcomm, Samsung, Vivante, Xilinx, and ZiiLABS. To enable compatibility between different platforms, an OpenCL program detects the specific devices and compiles its kernels at runtime.

OpenCL was originally developed by Apple Inc. and later submitted to the Khronos Group. It is supported by many operating systems, including Android, FreeBSD, Linux, macOS, and Windows.

OpenCL Backend Porting and Optimization

The Caffe framework is originally written in C++ and CUDA. The CUDA layer of Caffe handles optimization of hardware resource allocation and utilization, e.g., CPU-GPU task assignment, memory management, and data transfer. Since CUDA and OpenCL differ in hardware device abstraction, memory buffer management, synchronization, and data transfers, the OpenCL backend porting is not a straightforward process.

The paper breaks the OpenCL porting process into two phases. Phase 1 achieves a layerwise porting of 3 layers, namely C++ machine learning interfaces, OpenCL wrappers, and GPU kernels. Layerwise porting means the layers are ported one by one and unit tested by using the originals of the other layers to guarantee correctness and convergence of the DNN algorithm.

After all layers are ported to OpenCL in Phase 1, Phase 2 focuses on performance optimization. Profiling the OpenCL port from Phase 1 (via the AMD profiling tool CodeXL, assisted with OpenCL events and printf) reveals some big bottlenecks. OpenCL's online compilation frequently calls clBuildProgram to create each GPU kernel: for 100 iterations of Cifar training, there were 63 clBuildProgram calls that took about 68% of the time. Another bottleneck was that the convolutional layers take up most of the computation time, and BLAS performance suffered from the irregular tall-and-skinny matrix sizes coming from different layers.

To handle these, the paper proposes three key optimization techniques: kernel caching to avoid OpenCL online compilation overheads, a batched data layout scheme to boost data parallelism, and multiple command queues to boost task parallelism. These optimizations effectively map the DNN problem sizes onto existing OpenCL math libraries, improve hardware resource utilization, and boost performance by 4.5x.
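
As an illustration of the kernel-caching idea, here is a hedged Python sketch using pyopencl (not the actual OpenCL Caffe code, which is C++): compiled programs are memoized by source, so the expensive clBuildProgram step happens once per kernel rather than on every call.

```python
import pyopencl as cl

class KernelCache:
    def __init__(self, ctx):
        self.ctx = ctx
        self._programs = {}   # kernel source -> built cl.Program

    def get_kernel(self, source, kernel_name):
        prog = self._programs.get(source)
        if prog is None:
            # Expensive online compilation (clBuildProgram) happens only once.
            prog = cl.Program(self.ctx, source).build()
            self._programs[source] = prog
        return getattr(prog, kernel_name)   # pyopencl exposes kernels as attributes
```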

Evaluation

The evaluation uses the AlexNet DNN model for ImageNet. It compares the performance of Caffe with CUDA (both cuBLAS and cuDNN v2) versus OpenCL (both the original clBLAS and the batched optimization) on an NVIDIA TitanX and an AMD R9 Fury, with mini-batch size 100. As shown in Figure 3, OpenCL Caffe with the batched optimization over clBLAS matches the performance of cuBLAS Caffe.


Compared to the highly optimized cuDNN machine learning library, OpenCL Caffe still has a 2x performance gap, as it lacks those optimizations. The authors argue that, given the current performance, OpenCL Caffe is still competitive in terms of performance per dollar, considering the market price difference between the AMD R9 Fury (about 560 dollars) and the NVIDIA TitanX (about 1000 dollars).

Cross Platform Capability Analysis

A natural question is whether their OpenCL port of Caffe, tested on AMD GPUs, would automatically work with ARM Mali GPUs as well. That would be a good test of the portability of the OpenCL port of Caffe, but it is not answered in the paper.

However, the authors caution about minor problems in compatibility. "There are some differences in specific manufacturers' extension and keywords. For example, caffe uses a lot of template in GPU kernels to support different floating point precision. But it turns out the template keywords for different manufactures are different, which adds more difficulty for the same code to run on different platform without modifications."

OpenCL support in deep learning frameworks is still not great, but hopefully it is getting better every day, as this paper shows.

The slides presentation for the paper is also available here.

Tuesday, September 12, 2017

Retroscope: Retrospective Monitoring of Distributed Systems (part 2)

This post, part 2, focuses on monitoring distributed systems using Retroscope. This is a joint post with Aleksey Charapko. If you are unfamiliar with hybrid logical clocks and distributed snapshots, give part 1 a read first.


Monitoring and debugging distributed applications is a grueling task. When you need to debug a distributed application, you will often be required to carefully examine the logs from different components of the application and try to figure out how these components interact with each other. Our monitoring solution, Retroscope, can help you with aligning/sorting these logs and searching/focusing on the interesting parts. In particular, Retroscope captures a progression of globally consistent distributed states of a system and allows you to examine these states and search for global predicates.

Let’s say you are working on debugging ZooKeeper, a popular coordination service. Using Retroscope you can easily add instrumentation to the ZooKeeper nodes to log and stream events to the Retroscope Storage and Compute module. After that, you can use RQL, Retroscope's simple SQL-like query language, to process your queries on those logs to detect the predicates/conditions that hold on the globally consistent cuts sorted in Retroscope's compute module.


Let's say you want to examine the staleness of the data in the ZooKeeper cluster (since ZooKeeper allows local reads from a single node, a client can read a stale value that has already been updated). By adding instrumentation to track the versions of znodes (the objects that keep data) at each ZooKeeper node, you can monitor how a particular znode changes across the entire cluster in a progression of globally consistent cuts with a very short query: "SELECT vr1 FROM setd;", where setd is the name of the log keeping track of changes and vr1 is the version of znode r1.

The above example still requires you to sift through all the changes in znode r1 across a progression of cuts to see whether there are stale values of r1 in some of the cuts. Instead, you can modify the query to return only globally consistent cuts that contain stale versions of the znode: "SELECT vr1 FROM setd WHEN vr1 - vr1 >= 2". The expression "vr1 - vr1 >= 2" is acceptable in Retroscope, since multiple copies of variable vr1 exist on different nodes (one at each node). With this expression, Retroscope computes the difference between znode r1's versions across any two nodes for all possible combinations of nodes, and if the difference is 2 or more, Retroscope emits the cut. This query gives you precise knowledge of the global states in which znode r1 differs by at least 2 versions between nodes in the system.

Let's say you got curious about the cause of the staleness in some of the ZooKeeper nodes. By including more information to be returned in each cut, you can test various hypotheses about the causes of staleness. For instance, if you suspect that some ZooKeeper nodes are disproportionately loaded by clients, you can test this with a query that returns the number of active client connections: "SELECT vr1, client_connections FROM setd WHEN vr1 - vr1 >= 2". And you can further refine your query to filter staleness cases to those where the imbalance in client connections is high: "SELECT vr1, client_connections FROM setd WHEN vr1 - vr1 >= 2 AND client_connections - client_connections > 50".

Architecture

OK, let's look at how Retroscope works under the hood.

To keep the overhead as low as possible for the system being monitored, we augment the target system nodes with only a lightweight RetroLog server module to extract/forward logs to a separate Retroscope storage/compute cluster for monitoring. RetroLogs employ Apache Ignite’s streaming capabilities to deliver the logs to the Retroscope storage/compute cluster. RetroLogs also provide Hybrid Logical Clock (HLC) capabilities to the target system being monitored. HLC is an essential component of Retroscope, as it allows the system to identify consistent cuts. The logging part is similar to how regular logging tools work, except the values being logged are not simply messages, but key-value pairs with the key being the variable being monitored and the value being, well, the current value of the variable.



The architecture figure above shows the insides of the Retroscope storage/compute cluster implemented using Apache Ignite. As the RetroLog servers stream logs to the Ignite cluster for aggregation and storage, Ignite streaming breaks messages into blocks based on their HLC time. For instance, all messages between timestamps 10 and 20 will be in the same block. The Ignite Data Grid then stores the blocks until they are needed for computing past consistent cuts.

The RQLServer acts as the query parser and task scheduler for our monitoring tool. It breaks down the duration of state-changes over which a query needs to be evaluated into non-overlapping time-shards and assigns these time-shards to worker nodes for evaluation. As workers process through their assignments, they emit the results back to the RQLServer which then presents the complete output to you.


Let's now zoom in to the Retroscope workers. These workers are designed to search for predicates through a sequence of global distributed states. The search always starts at some known state and progresses backwards one state-change at a time. (Each time-shard has its ending state computed before being assigned to the workers. This is done only once, as time-shards are reused between different queries. When workers perform the search, they always start at the time-shard's ending state and progress backwards one state-change at a time.) As the worker moves backwards and undoes the changes, it arrives at new consistent cuts and evaluates the query predicate to decide whether the cut must be emitted to the user. The figure illustrates running the "SELECT vr1 FROM setd WHEN vr1 - vr1 >= 2" query on a ZooKeeper cluster of 3 nodes. Each time a variable's value is changed, it is recorded into a Retroscope log. As the system starts to search for the global states satisfying the query condition, it first looks at cut c6. At this cut the difference between versions on Node 3 and Node 1 is 2, so the worker emits the cut. The system then moves to the previous cut c5; at this point the staleness is only 1 version, so the cut is skipped. The search ends when all consistent cuts have been evaluated this way. In the example, you will see cuts c6, c3, c2, c1, and c0 being emitted, while the other cuts have small staleness and do not satisfy the condition of the query.
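
A minimal sketch of this backward search in Python (the data layout and function names are mine, not Retroscope's API):

```python
def search_shard(ending_cut, changes_newest_first, predicate):
    # ending_cut: {node: {variable: value}} at the time-shard's ending state.
    # changes_newest_first: list of (node, variable, previous_value) to undo.
    cut = {node: dict(vals) for node, vals in ending_cut.items()}
    emitted = []
    if predicate(cut):
        emitted.append({n: dict(v) for n, v in cut.items()})
    for node, var, previous_value in changes_newest_first:
        cut[node][var] = previous_value      # undo one change => previous consistent cut
        if predicate(cut):
            emitted.append({n: dict(v) for n, v in cut.items()})
    return emitted

# Predicate corresponding to "SELECT vr1 FROM setd WHEN vr1 - vr1 >= 2":
# the versions of r1 on some two nodes differ by at least 2.
def stale_by_two(cut):
    versions = [vals["vr1"] for vals in cut.values()]
    return max(versions) - min(versions) >= 2
```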

Preliminary Evaluation

We performed a preliminary evaluation of Retroscope monitoring on ZooKeeper to study znode staleness from the server perspective. To the best of our knowledge, no prior studies have been conducted to see how stale ZooKeeper values can get from the servers' point of view.

We started by looking at a normally running ZooKeeper cluster on AWS and ran a test workload of 10,000 updates to either a single znode or 10 znodes. All the updates were done by a single synchronous client to make sure that the ZooKeeper cluster is not saturated under a heavy load. After the updates were over, we ran a simple retrospective RQL query to retrieve consistent states at which values have been changing in znode r1: "SELECT vr1 FROM setd;", where setd is the name of the log keeping track of changes and vr1 is the version of znode r1. This query produces lots of cuts that must be examined manually, but a quick glance over them shows that a staleness of 1 version is common. This is normal behavior, as Retroscope was capturing a staleness of 1 version right after a value had been committed, while the commit messages may still have been in flight to the follower nodes.

To search for cases where staleness is 2 versions or greater, we then evaluated the following query: "SELECT vr1 FROM setd WHEN vr1 - vr1 >= 2". With this query we learned that by targeting a single znode, we were able to observe a staleness of 2 or more versions on about 0.40% of the consistent cuts examined. Surprisingly, we also observed a spike of 9 versions of staleness at one cut; however, the stale node was then able to quickly catch up. (That spike was probably due to garbage collection.) We observed that spreading the load across 10 znodes reduced the staleness observed for a single znode.


We performed the same staleness experiment on the cluster with one straggler ZooKeeper node that was artificially made to introduce a slight delay of 2ms in the processing of incoming messages. With the straggler, ZooKeeper staleness hit extremes: the slow node was able to keep up with the rest of the cluster only for the first 7 seconds of the test run but then fell further and further behind.


We measured the query processing performance on a single AWS EC2 t2.micro worker node with  1 vCPU and 1 GB of RAM. With such limited resources, a Retroscope worker was still able to process up to 14,000 consistent cuts per second in our ZooKeeper experiments.


Future Plans

We are working on enhancing the capabilities of Retroscope to provide a richer and more universal querying language for a variety of monitoring and debugging situations. Our plans include adding more complex searching capabilities with conditions that can span multiple consistent cuts. We are also expanding the logging and processing capabilities to allow keeping track of arrays and sets instead of just single-value variables.

Related Links
Here are my other posts on "monitoring" systems:
http://muratbuffalo.blogspot.com/search?q=monitoring

Monday, September 11, 2017

Web Data Management in RDF Age

This was the Day 2 keynote at ICDCS'17, by Tamer Ozsu. Below are my notes from his talk. The slides for his presentation are available here.

Querying web data presents challenges due to the lack of a schema, its volatility, and its sheer scale. There have been several recent approaches to querying web data, including XML, JSON, and fusion tables. This talk is about another approach to maintaining and querying web data: RDF and SPARQL. This last one is the approach recommended by the W3C (World Wide Web Consortium) and is a building block for the semantic web and Linked Open Data (LOD). Here is a diagram denoting the LOD datasets and the links between them as of 2014.

Resource Description Framework (RDF)

In RDF, everything is a uniquely named resource (URI). The ID for Jack Nicholson's resource is JN29704. Resources have defined attributes: y:JN29704 hasName = "Jack Nicholson", y:JN29704 BornOnDate = "1937-04-22". Relationships with other resources are defined via predicates (here "predicate" is used in the grammar sense, not the logic sense). Predicates form triples of the form "Subject Predicate Object": the subjects are always URIs, and the objects are either literals or URIs.

It turns out biologists are heavy users of RDF with the UniProt dataset.

RDF query model: SPARQL

SPARQL provides an SQL-like flavor. It operates on RDF triples, and _variables_ can be used in place of the subject, predicate, or object. Thus it is also possible to represent SPARQL queries as graphs. Once you represent a SPARQL query as a graph, answering it reduces to a subgraph matching problem between the RDF data-graph and the query-graph.
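
To make the triple and query model concrete, here is a small Python sketch using rdflib (the namespace URI is made up for illustration; the triples follow the Jack Nicholson example above):

```python
from rdflib import Graph, Literal, Namespace

y = Namespace("http://example.org/yago/")   # hypothetical namespace
g = Graph()
g.add((y.JN29704, y.hasName, Literal("Jack Nicholson")))
g.add((y.JN29704, y.BornOnDate, Literal("1937-04-22")))

# The SPARQL query is itself a graph pattern; answering it is subgraph
# matching of this pattern against the RDF data-graph.
q = """
SELECT ?name WHERE {
    ?actor <http://example.org/yago/hasName> ?name .
    ?actor <http://example.org/yago/BornOnDate> "1937-04-22" .
}
"""
for row in g.query(q):
    print(row.name)   # -> Jack Nicholson
```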

As in querying with SQL, too many joins are bad for performance, and a lot of work has gone into optimizing this.

Distributed SPARQL processing

If it is possible to gather all the RDF data in one place, then querying can be done easily using common cloud computing tools. You can partition and store the RDF on HDFS, and then run SPARQL queries on it as mapreduce jobs (say using Spark).

If it is not possible to gather all the RDF data, you need distributed querying, and for this, methods similar to those of distributed RDBMSs can be used.

To complicate this further, not all RDF data sites can process SPARQL queries. Several approaches have been formulated to deal with the problem this poses for distributed SPARQL processing, such as writing wrappers that execute SPARQL queries on these sites.

Finally, live querying of the web of RDF linked data is also possible, by sending bots to traverse/browse this graph at runtime.

Thursday, September 7, 2017

Gratitude

The other day I had taken my little daughter to the library. There was a lady tutoring her son. The boy was slightly over-active and argumentative. He was wearing a wool Buffalo hat. It was warm in the library, so that seemed out of place. It was also strange that the boy, who was around 7-8 years old, was not in school at that time of the day. But I couldn't put 2 and 2 together.

I assumed that the lady was home-schooling the boy. Since I was interested in how home-schooling works (what works and what doesn't), I asked her about it. (Although I am a shy introvert, I am not shy about asking people questions when I am curious about something. That is a little oddity I have.) She said she was not homeschooling, but her son had missed a lot of classes, so she was trying to help him catch up. At that moment, I noticed the thick book she was reading: "CANCER"!

My heart sank. I tried to seem upbeat though. I asked the boy his name, Nicholas, and wished him the best of luck with his studies.

---

Life is that way. We get selfish and self-centered when left to our own devices. We think we have it bad, that we have big challenges. We become drama queens. We are stressed about work, we complain about other small stuff. And occasionally we are hit with an experience like that, where a real challenge presents itself to us or to others, and it reminds us how grateful we should have been.

We should not give in to entropy and get too self-centered. Everybody has their own problems, challenges, and burdens. And sometimes those are huge challenges. The world would be a better place if we were more grateful for ourselves and for the moment, and tried to sympathize with and help others with their challenges, rather than being self-absorbed with our petty problems.

Here is a related video I came across recently on gratitude.

This one is for kids, but hey it is simple. 

Wednesday, September 6, 2017

Paper summary. Untangling Blockchain: A Data Processing View of Blockchain Systems

This is a recent paper submitted to arxiv (Aug 17, 2017). The paper presents a hype-free, very accessible, yet technical survey of blockchain. It takes a data-processing-centric view of blockchain technology, treating concurrency issues and efficiency of consensus as first-class citizens in blockchain design. Approaching from that perspective, the paper tries to forge a connection between blockchain technologies and distributed transaction processing systems. I think this perspective is promising for grounding blockchain research better. When we connect a new area to an area with a large literature, we have opportunities to compare/contrast and perform efficient OODA loops.

Thanks to this paper, I am finally getting excited about blockchains. I had a huge prejudice against blockchains based on the Proof-of-Work approach used in Bitcoin. Nick Szabo tried to defend this huge inefficiency, citing that it is necessary for attestation/decentralization/etc., but that is a false dichotomy. There are many efficient ways to design blockchains and still provide decentralization and attestation guarantees. And the paper provides an exhaustive survey of blockchain design approaches.

In my summary below, I use many sentences (sometimes paragraphs) directly lifted off from the paper. The paper is easily accessible and well written, and worth reading if you like to learn about blockchains.

Blockchain basics

Blockchain is a log of ordered blocks, each containing multiple transactions. In the database context, blockchain can be viewed as a solution to distributed transaction management: nodes keep replicas of the data and agree on an execution order of transactions. Distributed transaction processing systems often deal with concurrency control via 2-phase commit. However, in order to tolerate Byzantine behavior, the overhead of concurrency control becomes higher in blockchains.

Blockchains have many applications in business, such as security trading and settlement, asset and finance management, banking, and insurance. Currently these applications are handled by systems built on MySQL and Oracle and by trusted companies/parties with a whole bunch of lawyers/clerks/etc. Blockchain can disrupt this status quo because it reduces infrastructure and human costs. Blockchain's immutability, transparency, and inherent security help reduce human errors/malice and the need for manual intervention due to conflicting data. The paper says: "Goldman Sachs estimated 6 billion saving in current capital market, and J.P. Morgan forecast that blockchains will start to replace currently redundant infrastructure by 2020."

Oops, I forgot to mention Bitcoin. Bitcoin is also one of the applications of blockchains, namely a digital/crypto-currency application. Cryptocurrency is currently the most famous application of blockchains, but as the paper covers smartcontracts, you can see that those applications will dwarf anything we have seen so far.

Private vs Public blockchains

In public blockchains, any node can join/leave the system, thus the blockchain is fully decentralized, resembling a peer-to-peer system. In private blockchains, the blockchain enforces strict membership via access-control and authentication.

Public blockchain: Bitcoin uses proof-of-work (PoW) for consensus: only a miner which has successfully solved a computationally hard puzzle (finding the right nonce for the block header) can append to the blockchain. It is still possible that two blocks are appended at the same time, creating a fork in the blockchain. Bitcoin resolves this by only considering a block as confirmed after it is followed by a number of blocks (typically six blocks).  PoW works well in the public settings because it guards against Sybil attacks, however, since it is non-deterministic and computationally expensive, it is unsuitable for applications such as banking and finance which must handle large volumes of transactions in a deterministic manner. (Bitcoin supports 7 transactions per second!)
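
Here is a toy Python sketch of the PoW puzzle (illustrative only; real Bitcoin hashes a binary block header and compares against a numeric difficulty target rather than a string prefix):

```python
import hashlib

def mine(block_header, difficulty_prefix="0000"):
    # Search for a nonce whose hash of (header, nonce) meets the difficulty.
    nonce = 0
    while True:
        digest = hashlib.sha256(f"{block_header}{nonce}".encode()).hexdigest()
        if digest.startswith(difficulty_prefix):
            return nonce, digest
        nonce += 1

# The winning nonce is expensive to find but cheap for everyone else to verify.
nonce, digest = mine("block-42|prev=00ab...|txs=...")
print(nonce, digest)
```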


Private blockchain: Since node identities are known in the private setting, most blockchains adopt one of the protocols from the vast literature on distributed consensus. Hyperledger v0.6.0 uses PBFT, a Byzantine-tolerant Paxos variant, while some other private chains even use plain Paxos. (Hyperledger v1.0.0-rc1 adopts a non-Byzantine consensus protocol based on Kafka.)

However, even with PBFT (or XFT), scalability is a problem. Those Paxos protocols will not be able to scale to tens of nodes, let alone hundreds. An open research question is how to build a big-Paxos. (The federated consensus approach mentioned below is a promising approach to this problem.)

What makes Blockchains tick?

The paper studies blockchain technology under four dimensions: distributed ledger, cryptography, consensus protocol and smartcontracts.

Distributed ledger: In blockchains, the ledger is replicated over all the nodes as an append-only data structure. A blockchain starts with some initial states, and the ledger records entire history of update operations made to the states. (This reminds me of Tango and particularly the followup vCorfu paper. The ideas explored there can be used for efficiently maintaining multiple streams in the same log so that reading the whole chain is not needed for materializing the updates at the nodes.)

Cryptography: This is used for ensuring the integrity of the ledgers, for making sure there is no tampering of the blockchain data. The global states are protected by a hash (Merkle) tree whose root hash is stored in a block.  The block history is also protected by linking the blocks through a chain of cryptographic hash pointers: the content of block number n+1 contains the hash of block number n. This way, any modification in block n immediately invalidates all the subsequent blocks.
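
A minimal Python sketch of the hash-pointer chaining (illustrative only, not any real chain's block format; the Merkle tree over the global states is omitted):

```python
import hashlib, json

def block_hash(block):
    return hashlib.sha256(json.dumps(block, sort_keys=True).encode()).hexdigest()

def append_block(chain, transactions):
    prev_hash = block_hash(chain[-1]) if chain else "0" * 64
    chain.append({"prev_hash": prev_hash, "transactions": transactions})

def verify_chain(chain):
    # Tampering with block n changes its hash, which no longer matches the
    # prev_hash stored in block n+1, invalidating all subsequent blocks.
    return all(chain[i]["prev_hash"] == block_hash(chain[i - 1])
               for i in range(1, len(chain)))
```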

Smart Contracts: A smart contract refers to the computation executed when a transaction is performed. At the trivial end of the spectrum, Bitcoin nodes implement a simple replicated state machine model which moves coins from one address to another. At the other extreme, Ethereum smart contracts can specify arbitrary computations and come with their own virtual machine for executing Ethereum bytecode. Similarly, Hyperledger employs Docker containers to execute its contracts written in any language. Smart contracts have many emerging business applications, but this makes software bugs very critical. In the DAO attack, the attacker stole $50M worth of assets by exploiting a bug.


Consensus:

Since I am partial to distributed consensus, I am reserving a separate section to the use of consensus in blockchains.

Proof of work: All PoW protocols require miners to find solutions to cryptographic puzzles based on cryptographic hashes. How fast a block is created depends on how hard the puzzle is. The puzzle cannot be arbitrarily easy, because that leads to unnecessary forks in the blockchain. Forks not only lead to wasted resources but also have security implications, since they make it possible to double spend. Ethereum, another PoW-based blockchain, adopts the GHOST protocol, which helps bring block generation time down to tens of seconds without compromising much security. In GHOST, the blockchain is allowed to have branches as long as the branches do not contain conflicting transactions.

Proof of Stake: To alleviate huge cost of PoW, PoS maintains a single branch but changes the puzzle's difficulty to be inversely proportional to the miner's stake in the network. A stake is essentially a locked account with a certain balance representing the miner's commitment to keep the network healthy.

Paxos protocols: PoW suffers from non-finality, that is, a block appended to a blockchain is not confirmed until it is extended by many other blocks. The PBFT protocol, in contrast, is deterministic. Implemented in the earlier version of Hyperledger (v0.6), the protocol ensures that once a block is appended, it is final and cannot be replaced or modified. It incurs O(N^2) network messages for each round of agreement, where N is the number of nodes in the network. (This is because, in contrast to classical Paxos, where every participant talks only with the leader, in PBFT every participant talks to every other participant as well.) The paper reports that PBFT scales poorly and collapses even before reaching the network limit. Zyzzyva optimizes for normal cases (when there are no failures) via speculative execution. XFT assumes a network less hostile than purely Byzantine, and demonstrates better performance by reducing the number of network messages.

Federated: To overcome the scalability woes with PBFT, Ripple adopts an approach that partitions the network into smaller groups called federates. Each federate runs a local consensus protocol among its members, and the local agreements are then propagated to the entire network via nodes lying in the intersections of the federates. Ripple’s safety conditions are that there is a large majority of honest nodes in every federate, and that the intersection of any two federates contain at least one honest node. Ripple assumes federates are pre-defined and their safety conditions can be enforced by a network administrator. In a decentralized environment where node identities are unknown, such assumptions do not hold. Byzcoin and Elastico propose novel, two-phase protocols that combine PoW and PBFT. In the first phase, PoW is used to form a consensus group. Byzcoin implements this by having a sliding window over the blockchain and selecting the miners of the blocks within the window. Elastico groups nodes by their identities that change every epoch. In particular, a node identity is its solution to a cryptographic puzzle. In the second phase, the selected nodes perform PBFT to determine the next block. The end result is faster block confirmation time at a scale much greater than traditional PBFT (over 1000 nodes).

Nonbyzantine: The latest release of Hyperledger (v1.0) outsources the consensus component to Kafka. More specifically, transactions are sent to a centralized Kafka service which orders them into a stream of events. Every node subscribes to the same Kafka stream and therefore is notified of new transactions in the same order as they are published.


I love that blockchains bring new constraints and requirements to the consensus problem. In blockchains, the participants can now be Byzantine, motivated by financial gains. And it is not sufficient to limit the consensus participants to 3 or 5 nodes, which was enough for tolerating crashes and ensuring persistence of the data. In blockchains, for reasons of attestability and tolerating colluding groups of Byzantine participants, it is preferred to keep the number of participants in the 100s. Thus the problem becomes: how do you design a Byzantine-tolerant consensus algorithm that scales to 100s or 1000s of participants?

I love when applications push us to invent new distributed algorithms.

Blockbench and evaluation

The paper introduces a benchmarking framework, BLOCKBENCH, that is designed for understanding performance of private blockchains against data processing workloads. BLOCKBENCH is open source and contains YCSB and SmallBank data processing workloads as well as some popular Ethereum contracts.

The paper then presents a comprehensive evaluation of Ethereum, Parity, and Hyperledger. The comparison against H-Store demonstrates that blockchains perform poorly at data processing tasks currently being handled by database systems. Although databases are designed without security and tolerance to Byzantine failures, the gap remains too big for blockchains to be disruptive to incumbent database systems.


The main findings are:

  • Hyperledger performs consistently better than Ethereum and Parity across the benchmarks. But it fails to scale up to more than 16 nodes.
  • Ethereum and Parity are more resilient to node failures, but they are vulnerable to security attacks that fork the blockchain.
  • The main bottlenecks in Hyperledger and Ethereum are the consensus protocols, but for Parity the bottleneck is caused by transaction signing.
  • Ethereum and Parity incur large overheads in terms of memory and disk usage. Their execution engine is also less efficient than that of Hyperledger.  
  • Hyperledger’s data model is low level, but its flexibility enables customized optimization for analytical queries.

Wednesday, August 30, 2017

Retrospective Lightweight Distributed Snapshots Using Loosely Synchronized Clocks

This is a summary of our recent work that appeared at ICDCS 2017. The tool we developed, Retroscope (available on GitHub), enables unplanned retrospective consistent global snapshots for distributed systems.

Many distributed systems would benefit from the ability to take unplanned snapshots of the past. Let's say Alice notices alarms going off for her distributed system deployment at 4:05pm. If she could roll-back to the state of the distributed system at 4:00pm, and roll forward step by step to figure out what caused the problems, she may be able to remedy the problem.

The ability to take retrospective snapshots requires each node to maintain a log of state changes and then to collate/align these logs to construct a consistent cut at a given time. However, clock uncertainty/skew among nodes is dangerous and can lead to taking an inconsistent snapshot. For example, the cut at 4:00pm in this figure using NTP is inconsistent, because event F is included in the cut, but causally preceding event E is not included.

To avoid this problem with physical clock synchronization, our snapshotting service Retroscope employs Hybrid Logical Clocks (HLC), that combine the benefits of physical time along with causality tracking of logical clocks (LC). By leveraging HLC, our system can take non-blocking unplanned/retrospective consistent snapshots of distributed systems with affinity to physical time.

Hybrid Logical Clocks

Each HLC timestamp is a two-part tuple: the first part is a shadow-copy of NTP time, and the second part is an overflow buffer used when multiple events have identical first parts in their HLC timestamp. The first part of HLC at a node is maintained as the highest NTP time that the node is aware of. (This comes from the node's own physical clock, or comes from a remote node from which a message was received recently.) The second part acts as the logical clock for events with identical first parts, and it is being reset every time the first part is updated.

The figure shows HLC in operation, with HLC timestamps in green, and the machine’s physical time in red. Note how message exchange between P1 and P2 bumps P2’s first HLC component to be higher than its own physical clock.

HLC not only keeps track of physical time, but also provides the same guarantees as LC. Namely, HLC satisfies the LC condition: if e happened-before f, then HLC.e < HLC.f. These HLC properties allow us to identify consistent cuts the same way we identify them with LC: a cut is consistent when all events on it have the same timestamp. In situations where we have no event with the desired timestamp, we can create a phantom, non-mutating event at the desired timestamp and use it to identify the consistent cut.
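
Here is a compact Python sketch of the HLC update rules (simplified from the HLC algorithm; the variable names are mine): l is the shadow copy of physical/NTP time and c is the overflow counter.

```python
class HLC:
    def __init__(self, physical_clock):
        self.pt = physical_clock   # callable returning the node's current NTP time
        self.l = 0                 # highest physical time seen so far
        self.c = 0                 # logical counter for events with equal l

    def local_or_send_event(self):
        l_old = self.l
        self.l = max(l_old, self.pt())
        self.c = self.c + 1 if self.l == l_old else 0
        return (self.l, self.c)

    def receive_event(self, m_l, m_c):
        l_old = self.l
        self.l = max(l_old, m_l, self.pt())   # a message can bump l past the local clock
        if self.l == l_old and self.l == m_l:
            self.c = max(self.c, m_c) + 1
        elif self.l == l_old:
            self.c += 1
        elif self.l == m_l:
            self.c = m_c + 1
        else:
            self.c = 0
        return (self.l, self.c)
```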


Taking a Snapshot

An initiating agent can start the snapshot procedure by sending a snapshot request to every node. Each node, upon receiving the request, performs a local snapshot of its current state and uses the window-logs of state changes to undo the changes until the state reaches the requested HLC time. Each node performs its snapshot independently, and there is no need for inter-node coordination on taking the snapshot. Once all machines have arrived at local snapshots at the same HLC time, we have obtained a global distributed snapshot (by virtue of the HLC property stated above).

Our tool Retroscope also supports incremental snapshots to take an inexpensive snapshot in the vicinity of another snapshot by undoing (or redoing) events to reach the new one. These incremental snapshots are very useful for monitoring tasks, in which we need to explore many past system states in a step-through manner while searching for some invariant violation or fault causes.

We have implemented Retroscope as a set of libraries to be added to existing Java projects to enable retrospective snapshot capabilities. Our Retroscope library provides tools to log and keep track of state changes and the HLC time of such changes independently at each node. The current implementation uses an in-memory sliding-window log for state history with configurable capacity. The Retroscope server library provides the tools to aggregate multiple such event logs and identify consistent cuts across those logs with affinity to physical time. The API also allows querying for consistent cuts that satisfy certain predicates using a SQL-like querying language.

We will talk about Retroscope's querying and monitoring features in a later blog post. 

Evaluation

We have added Retroscope capabilities to several Java applications, such as Voldemort key-value database, Hazelcast in memory data-grid, and ZooKeeper.

Retroscoping Voldemort took less than 1000 lines of code for adding HLC to the network protocol, recording changes in the Retroscope window-log, and performing snapshots on Voldemort's storage. To quantify the overhead caused by Retroscope, we compared the performance of our modified Voldemort with the unmodified version on a 10-node cluster. The figure below illustrates the throughput degradation resulting from adding Retroscope to the system. We observed that in the worst case the degradation was around 10%, while in some cases we observed no degradation at all. Latency overheads were similar to the throughput overheads.


Taking a snapshot brings more stress to Voldemort, a disk-based system, as each node now needs to get a copy of its local state and undo changes from that copy. Retroscope allows the entire snapshot routine to run in a non-blocking manner, but with some performance degradation. The figure below illustrates the throughput and latency of the client requests on the 10-node cluster while performing a snapshot on a database of 1,000,000 items. The average throughput degradation of the system while taking a snapshot was 18%, although the performance can be improved by using a separate disk to make the database copy.


We also ran a similar experiment on the Hazelcast in-memory data grid, and observed very little performance degradation associated with the snapshot, since Hazelcast is an in-memory system.


(This was a joint post with Aleksey Charapko.)

Sunday, August 27, 2017

Paper summary: Performance clarity as a first-class design principle (HOTOS'17)

This paper appeared in HOTOS'17 and is by Kay Ousterhout, Christopher Canel, Max Wolffe, Sylvia Ratnasamy, and Scott Shenker. (The team is at UC Berkeley; Christopher is now at CMU.)

The paper argues that performance clarity is as important a design goal as performance or scalability. Performance clarity means ease of understanding where bottlenecks lie in a deployment and the performance implications of various system changes.

The motivation for giving priority to performance clarity arises from the performance opaqueness of the distributed systems we have, and how hard it is to configure/tune them to optimize performance. In current distributed data processing systems, a small change in hardware or software configuration may have a disproportionate impact on performance. An earlier paper, Ernest, showed that selecting an appropriate cloud instance type could improve performance by 1.9× without increasing cost. And when things run slowly in a deployment, it is hard to determine where the bottlenecks are.

This discussion on performance clarity reminds me of a Tony Hoare quote on software design:
There are two ways of constructing a software design: One way is to make it so simple that there are obviously no deficiencies, and the other way is to make it so complicated that there are no obvious deficiencies. The first method is far more difficult.
This paper advocates to design the system so clear that if there are deficiencies in the deployment/configuration, they become obvious and can be accounted for.

Monotasks

Concretely, the paper proposes an architecture for data analytics frameworks (Spark in particular) in which jobs are decomposed into small schedulable units called "monotasks". Each monotask uses exactly one of CPU, disk, and network.

This is an unconventional take. Today's frameworks break jobs into tasks that time-share the use of CPU, disk, and network. Concurrent tasks may contend for resources, even when their aggregate resource use does not exceed the capacity of the machine. In contrast, a single monotask has exclusive access to a resource. The benefit this provides is guaranteed resource isolation without any cross-contention from other tasks.



Using monotasks to explicitly separate resources makes the bottleneck visible: the bottleneck is simply the resource (disk, CPU, or network) with the longest queue.
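
A tiny sketch of that observation (my own illustration, not MonoSpark's scheduler): with per-resource queues, finding the bottleneck reduces to finding the longest queue.

```python
from collections import deque

class ResourceScheduler:
    def __init__(self):
        # Each monotask uses exactly one resource, so each resource gets its own queue.
        self.queues = {r: deque() for r in ("cpu", "disk", "network")}

    def submit(self, monotask, resource):
        self.queues[resource].append(monotask)

    def bottleneck(self):
        # The bottleneck is the resource with the longest queue of pending monotasks.
        return max(self.queues, key=lambda r: len(self.queues[r]))
```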


Implementation

The hypothesis tested in the evaluation is that "using monotasks improves performance clarity without sacrificing high performance".

They give a prototype implementation of monotasks for Apache Spark, called MonoSpark, that  achieves performance parity with Spark for benchmark workloads. Their monotask implementation also yields a simple performance model that can predict the effects of future hardware and software changes.

So do monotasks hurt performance? I am quoting from the paper:
"Because using monotasks serializes the resource use of a multitask, the work done by one of today’s multitasks will take longer to complete using monotasks. To compensate, MonoSpark relies on the fact that today’s jobs are typically broken into many more multitasks than there are slots to run those tasks, and MonoSpark can run more concurrent tasks than today’s frameworks. For example, on a worker machine with four CPU cores, Apache Spark would typically run four concurrent tasks: one on each core, as shown in Figure 1. With MonoSpark, on the other hand, four CPU monotasks can run concurrently with a network monotask (to read input data for future CPU monotasks) and with two disk monotasks (to write output), so runtime of a job as a whole is similar using MonoSpark and Spark. In short, using monotasks replaces pipelining with statistical multiplexing, which is robust without requiring careful tuning."

Conclusions

The monotasks idea is very simple. But simple ideas have a chance to win. Performance predictability becomes a snap with this, because it is so easy to see/understand what will happen in a slightly different configuration. Monotasks provide very nice observability and resource isolation features.

I suspect this is somewhat Spark specific. It looks like this works best with predictable and CPU/disk/network balanced tasks/applications. These conditions are satisfied for Spark and Spark workloads.

Finally, the monotasks idea will not work in current virtual machine, container, or cloud environments. Monotasks require direct access to machines to implement per-disk, per-CPU, and network schedulers.

Dhalion: self-regulating stream processing in Heron (VLDB'17)

This paper appeared in VLDB'17, and is by Avrilia Floratou (Microsoft), Ashvin Agrawal (Microsoft), Bill Graham (Twitter), Sriram Rao (Microsoft), and Karthik Ramasamy (Streamlio).
 

Dhalion aims to further reduce the complexity of configuring and managing Heron streaming applications. It provides a self-tuning/self-correcting extension to Heron to monitor, diagnose, and correct problems with the deployment. (Dhalion is named after a mythical bird with interesting healing capabilities.)


It turns out there are only 4-5 categories of things that can (or are likely to) go wrong in a Heron deployment. (I had summarized Heron earlier here.) The detectors and diagnosers in Dhalion check for them. The resolvers then try the corresponding 4-5 correction actions. And the effectiveness of the correctors is then rated.

Yes, you heard it right, instead of guaranteed-to-work correctors, Dhalion rates the correctors' effectiveness and decides what to try next. "After every action is performed, Dhalion evaluates whether the action was able to resolve the problem or brought the system to a healthier state. If an action does not produce the expected outcome then it is blacklisted and it is not repeated again."
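
A rough Python sketch of that policy loop (my own pseudocode-style illustration, not the Dhalion API):

```python
def heal(symptom, diagnosers, resolvers, blacklist, got_healthier):
    # Map the symptom to a diagnosis, then try resolver actions for it.
    diagnosis = None
    for diagnose in diagnosers:
        diagnosis = diagnose(symptom)
        if diagnosis is not None:
            break
    for action in resolvers.get(diagnosis, []):
        if action in blacklist:
            continue                  # never repeat an action rated ineffective
        action()                      # e.g., scale up parallelism, restart a container
        if got_healthier():
            return action             # action rated effective
        blacklist.add(action)         # rated ineffective: blacklist it
    return None
```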

This is not a fool-proof method: what if the corrector is ineffective because another fault/condition started developing in the meantime? Another problem I can see is interference among correctors. But I think Dhalion tries/evaluates correctors one by one; otherwise the interference issues would obfuscate the evaluation of correctors.

So Dhalion takes a probabilistic approach to self-tuning and recovery. But maybe that should not be a cause for alarm.  The system is already faulty, why should the correctors be guaranteed to work? If the corrector framework can try/evaluate them quickly, and converge the system back, that is all that matters. (I am not sure if Dhalion can manage to do this fast enough for most scenarios.)

Saturday, August 19, 2017

Cloud fault-tolerance

I had submitted this paper to SSS'17. But it got rejected. So I made it a technical report and am sharing it here. While the paper is titled "Does the cloud need stabilizing?", it is relevant to the more general cloud fault-tolerance topic.

I think we wrote the paper clearly. Maybe too clearly.


Here is the link to our paper: https://www.cse.buffalo.edu//tech-reports/2017-02.pdf
(Ideally I would have liked to expand on Section 4. I think that is what we will work on now.)

Below is an excerpt from the introduction of our paper, if you want to skim that before downloading the pdf.
-------------------------------------------------------------------------
The last decade has witnessed a rapid proliferation of cloud computing. Internet-scale webservices have been developed providing search services over billions of webpages (such as Google and Bing), and providing social network applications to billions of users (such as Facebook and Twitter). While even the smallest distributed programs (with 3-5 actions) can produce many unanticipated error cases due to the concurrency involved, it seems little short of a miracle that these web-services are able to operate at those vast scales. These services have their share of occasional mishaps and downtimes, but overall they hold up really well.

In this paper, we try to answer what factors contribute most to the high-availability of cloud computing services, what type of fault-tolerance and recovery mechanisms are employed by the cloud computing systems, and whether self-stabilization fits anywhere in that picture.
(Stabilization is a type of fault tolerance that advocates dealing with faults in a principled, unified manner instead of on a case-by-case basis: Instead of trying to figure out how much faults can disrupt the system's operation, stabilization assumes arbitrary state corruption, which covers all possible worst-case collusions of faults and program actions. Stabilization then advocates designing recovery actions that take the program back to invariant states starting from any arbitrary state.)

Self-stabilization had shown a lot of promise early on for being applicable in the cloud computing domain. The CAP theorem seemed to motivate the need for designing eventually-consistent systems for the cloud, and self-stabilization has been pointed out by experts as a promising direction towards developing a cloud computing research agenda. On the other hand, there have not been many examples of stabilization in the clouds. For the last 6 years, the first author has been thinking about writing a "Stabilization in the Clouds" position paper, or even a survey, when he thought there would surely be plenty of stabilizing design examples in the cloud. However, this proved to be a tricky undertaking. Discounting the design of eventually-consistent key-value stores and the application of Conflict-free Replicated Data Types (CRDTs) for replication within key-value stores, the examples of self-stabilization in the cloud computing domain have been overwhelmingly trivial.

We ascribe the reason self-stabilization has not been prominent in the cloud  to our observation that cloud computing systems use infrastructure support to keep things simple and reduce the need for sophisticated design of fault-tolerance mechanisms. In particular, we identify the following cloud design principles to be the most important factors contributing to the high-availability of cloud services.


  • Keep the services "stateless" to avoid state corruption. By leveraging on distributed stores for maintaining application data and on ZooKeeper for distributed coordination, the cloud computing systems keep the computing nodes almost stateless. Due to the abundance of storage nodes, the key-value stores and databases replicate the data multiple times and achieve high-availability and fault-tolerance.
  • Design loosely coupled distributed services where nodes are dispensable/substitutable. The service-oriented architecture, and the RESTful APIs for composing microservices are very prevalent design patterns for cloud computing systems, and they help facilitate the design of loosely-coupled distributed services. This minimizes the footprint and complexity of the global invariants maintained across nodes in the cloud computing systems. Finally, the virtual computing abstractions, such as virtual machines, containers, and lambda computing servers help make computing nodes easily restartable and substitutable for each other.
  • Leverage on low level infrastructure and sharding when building applications. The low-level cloud computing infrastructure often contain more interesting/critical invariants and thus they are  designed by experienced engineers, tested rigorously, and sometimes even formally verified. Higher-level applications leverage on the low-level infrastructure, and avoid complicated invariants as they resort to sharding at the object-level and user-level. Sharding reduces the atomicity of updates, but this level of atomicity has been adequate for most webservices, such as social networks.

A common theme among these principles is that they keep the services simple, and trivially "stabilizing", in the informal sense of the term. Does this mean that self-stabilization research is unwarranted for cloud computing systems? To answer this, we point to some silver lining in the clouds for stabilization research. We notice a trend that, even at the application level, distributed systems software is getting more complicated/convoluted as services with more ambitious coordination needs are being built.

In particular, we explore the opportunity of applying self-stabilization to tame the complications that arise when composing multiple microservices to provide higher-level services. This is getting more common with the increased consumer demand for higher-level and more sophisticated web services. The higher-level services are in effect implementing distributed transactions over federated microservices from multiple geodistributed vendors/parties, which makes them prone to state desynchronization and corruption due to incomplete/failed requests at some microservices. At the data processing systems level, we also highlight the need for self-regulating and self-stabilizing design for realtime stream processing systems, as these systems are getting more ambitious and complicated as well.
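
As a toy illustration of the failure mode (the service names and invariant are invented for this sketch, not taken from the paper): a higher-level "book trip" operation spans two microservices, a partial failure strands the global invariant, and a periodic corrector pass restores it.

```python
# Toy sketch: a composed service over two microservices, with a corrector pass.
# Global invariant: a trip has a flight booking iff it has a hotel booking.
flight_db, hotel_db = {}, {}

def book_flight(trip_id):
    flight_db[trip_id] = "booked"

def book_hotel(trip_id, fail=False):
    if fail:
        raise RuntimeError("hotel service timeout")
    hotel_db[trip_id] = "booked"

def book_trip(trip_id, hotel_fails=False):
    book_flight(trip_id)
    book_hotel(trip_id, fail=hotel_fails)   # a failure here strands the flight booking

def stabilize():
    # Periodic corrector: starting from whatever state we are in,
    # re-establish the invariant (here by compensating; retrying also works).
    for trip_id in list(flight_db):
        if trip_id not in hotel_db:
            del flight_db[trip_id]

try:
    book_trip("t1", hotel_fails=True)
except RuntimeError:
    pass
assert "t1" in flight_db and "t1" not in hotel_db   # invariant violated
stabilize()
assert "t1" not in flight_db                        # invariant restored
```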

Finally, we point out a rift between the cloud computing fault model and its recovery techniques, which motivates the need for more sophisticated recovery. Traditionally the cloud computing model adopted the crash failure model, and managed to confine faults within this model. In the cloud, it was feasible to use multiple nodes to redundantly store state, and to easily substitute a stateless worker with another one, since nodes are abundant and dispensable. However, recent surveys on the topic remark that more complex faults are starting to prevail in the clouds, and the recovery techniques of restart, checkpoint-reset, and devops-driven rollback and recovery are becoming inadequate.

Friday, August 18, 2017

Paper Summary: Twitter Heron

This summary combines material from "Twitter Heron: Stream Processing at Scale", which appeared at SIGMOD'15, and "Twitter Heron: Towards Extensible Streaming Engines", which appeared at ICDE'17.

Heron is Twitter's stream processing engine. It replaced Apache Storm at Twitter, and all production topologies inside Twitter now run on Heron. Heron is API-compatible with Storm, which made it easy for Storm users to migrate. Reading the two papers, I got the sense that Heron was developed to improve on the debuggability, scalability, and manageability of Storm. While a lot of importance is attributed to performance when comparing systems, these features (debuggability, scalability, and manageability) are often more important in real-world use.

The gripes with Storm

Hard to debug. In Storm, each worker can run disparate tasks. Since logs from multiple tasks are written into a single file, it is hard to identify any errors or exceptions associated with a particular task. Moreover, a single host may run multiple worker processes, each of which could belong to a different topology. Thus, when a topology misbehaves (due to load, faulty code, or hardware), it is hard to determine the root causes of the performance degradation.

Wasteful for scheduling resources at the datacenter. Storm assumes that every worker is homogeneous. This results in inefficient utilization of allocated resources, and often leads to overprovisioning all workers to match the memory needs of the worker with the highest requirements.

Nimbus-related problems. The Storm Nimbus scheduler does not support resource reservation and isolation at a granular level for Storm workers. Moreover, the Nimbus component is a single point of failure.

ZooKeeper-related gripes. Storm uses ZooKeeper extensively to manage heartbeats from the workers and the supervisors. This use of ZooKeeper limits the number of workers per topology, and the total number of topologies in a cluster, since ZooKeeper becomes the bottleneck at higher numbers.

Lack of backpressure. In Storm, the sender does not adapt to the speed/capacity of the receiver; instead, messages are dropped if the receiver cannot handle them.

Heron's modular architecture


As in Storm, a Heron topology is a directed graph of spouts and bolts. (The spouts are sources of streaming input data, whereas the bolts perform computations on the streams they receive from spouts or other bolts.)
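
As a rough mental model, a topology is just a DAG of tuple producers and transformers. The classes below are hypothetical and only illustrate the concept; they are not the actual Heron or Storm API.

```python
from collections import Counter

# Toy in-process model of a spout/bolt topology: a spout emits tuples,
# bolts transform them, and the topology is the wiring between them.

class SentenceSpout:
    def emit(self):
        for line in ["to be or not to be", "that is the question"]:
            yield line

class SplitBolt:
    def process(self, sentence):
        for word in sentence.split():
            yield word

class CountBolt:
    def __init__(self):
        self.counts = Counter()
    def process(self, word):
        self.counts[word] += 1
        yield (word, self.counts[word])

# Wire spout -> split -> count and push all tuples through.
spout, split, count = SentenceSpout(), SplitBolt(), CountBolt()
for sentence in spout.emit():
    for word in split.process(sentence):
        for _ in count.process(word):
            pass            # downstream bolts or sinks would consume these tuples
print(count.counts.most_common(3))
```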

When a topology is submitted to Heron, the Resource Manager first determines how many containers should be allocated for the topology. The first container runs the Topology Master, the process responsible for managing the topology throughout its existence. Each of the remaining containers runs a Stream Manager, a Metrics Manager, and a set of Heron Instances, which are essentially spouts or bolts that each run in their own JVM. The Stream Manager is the process responsible for routing tuples among Heron Instances. The Metrics Manager collects several metrics about the status of the processes in a container.

The Resource Manager then passes this allocation information to the Scheduler which is responsible for actually allocating the required resources from the underlying scheduling framework such as YARN or Aurora. The Scheduler is also responsible for starting all the Heron processes assigned to the container.

Heron is designed to be compositional, so it is possible to plug extensions into it and customize it. Heron allows the developer to create a new implementation of a specific Heron module (such as the scheduler or the resource manager) and plug it into the system without disrupting the remaining modules or the communication mechanisms between them. The papers make a big deal of this modularity/compositionality, and the ICDE'17 paper is dedicated entirely to this aspect of Heron's design.

Note that Heron's modular architecture also plays nice with the shared infrastructure at the datacenter. The provisioning of resources (e.g., for containers and even the Topology Master) is cleanly abstracted from the duties of the cluster manager.

Heron Topology Architecture


Having a Topology Master per topology allows each topology to be managed independently of the others (and of other systems in the underlying cluster). Also, the failure of one topology (which can happen, as user-defined code often gets run in the bolts) does not impact the other topologies.

The Stream Manager is another critical component of the system, as it manages the routing of tuples among Heron Instances. Each Heron Instance connects to its local Stream Manager to send and receive tuples. All the Stream Managers in a topology connect to one another, forming n*n connections, where n is the number of containers in the topology. (I wonder if this quadratically growing number of connections would cause problems for very large n.)


As you can see in Figure 5, each Heron Instance executes only a single task (e.g., running a spout or bolt), so it is easy to debug that instance simply by using tools like jstack and heap dumps on that process. Since metrics collection is granular, it is also transparent which component of the topology is failing or slowing down. This fine-grained design also provides resource isolation. Incidentally, this reminds me of the ideas argued in the "Performance-clarity as a first-class design principle" paper.

Saturday, August 12, 2017

On presenting well

I had written about "how to present your work" earlier. If you haven't read that, that is a good place to start.

I started thinking about this topic recently as my two PhD students, Ailidani Ailijiang and Aleksey Charapko, had to make their first presentations at ICDCS 2017. I worked with them to prepare their presentations. I listened to 3 drafts of their talks and helped them revise. Since the topic of presenting your work was rekindled, I thought I should do another post on it.

Most talks suck

I told my students that most conference presentations are dreadful. The good news is that the bar is not high. If you prepare sufficiently and practice, you will already be ahead of the curve.

But why do most talks suck? Mainly because the presenters get the purpose of the talk wrong and try to cover everything in their paper. As a result, their slides are not well thought out; the presentation follows the paper outline and tries to cover the paper. The presenters then go through these slides quickly, as if covering more ground means the audience will learn more. And the audience gets dazed by PowerPoint bullets.

This is a rookie mistake. My students also made that mistake in the first draft of their talks. I told them that if they just fixed this problem, they would be almost there. Below I will give you a step-by-step walkthrough of how to fix this problem.

Know the game

Covering all things in the paper is a losing battle. The audience has seen a dozen presentations that day, things are getting blurry for them, and their brain is getting mushy. Nowadays you are also competing with Twitter and Facebook for the attention of your audience. (This is one of my biggest pet peeves. What is the deal with people who fly over half the world to get to a conference but then keep checking Twitter and Facebook on their laptops? Seriously?)

Moreover, 20 minutes is a shorter time than you think. You don't stand a chance explaining all your results and the subtleties in your work in 20 minutes. Your best chance is to engage them long enough to communicate the heart of your paper.

So the goal of the presentation is to communicate the essence of your paper, instead of covering everything in it. Rather than presenting too many slides poorly, it is best to present fewer slides in a memorable way and get people to understand those. If you manage to do this, the audience can read your paper for an in-depth description of your work.

One of the best ways to communicate the idea of a paper is to tell it as a story. A story engages the audience better, and will make the lessons stick longer.

Find your story

This is the most challenging part of your preparation. You need to frame your talk well. You should find the right place to begin, and develop the right story and context to communicate your work. This requires good hard thinking and several iterations.

I helped my students frame their talks. We found stories for their presentations, and restructured their first drafts to follow this storyline rather than the paper outline. Then we did a couple more drafts where we revised and refined the flow. Maybe I should do a future post giving a concrete example of finding a story for a presentation. For now, I will just point to this tangentially related post.

The most important thing for framing is to motivate the problem well. Giving concrete examples, albeit simplified ones, helps here. 100% of the audience should understand the problem, and at least 50% of the audience should understand the solution you offer. (This is not to say you should make the later part of the presentation hard to understand. You will be considered successful if 50% of the audience understands despite your best efforts to simplify your explanation.)

When you are trying to communicate your work in simple terms, using analogies helps.

Cull your slides

Recall that the purpose of the presentation is to communicate the essence of the result and get people interested to dig in more. How do you do it? You iterate over your slides and remove the slides that don't cut right into the matter. You should have as few slides as you can cut down to. When was the last presentation where you wished you had more slides to present?

I always wished I had fewer slides to present. For a 20-minute presentation I sometimes get lazy and go with 20 slides. Big mistake! With so many slides, I can't pace myself well and start rushing to finish on time. But if I do my preparation well, I go with 10 slides (with ~3 sentences on each) for a 20-minute presentation. When I had fewer things to say (because I took the time and prioritized what I wanted to say), I was able to communicate those things better and actually managed to teach them to the audience. Yes, although I know the basics about presentations, I botch some presentations up because I am lazy and don't prepare sufficiently.

What is that? Are you telling me it is impossible to present your paper in 10 slides? If you are at least a half-competent researcher/writer, you will know that you can and you should explain things at multiple granularities! You can and should have a 1 sentence summary, 5 sentence summary, and 5 paragraph summary of your work. In your presentation, you should refer to your 1 sentence summary multiple times. The audience should really understand that 1 sentence summary.

Calm your nerves

If you are well prepared with your slides, and practice your talk out loud at least a couple times, that should help calm your nerves, because you know you are prepared and up to the task.

But I won't lie, it is unnerving to be in the spotlight, and have too many eyeballs on you. (Some people argue this fear of public speaking has evolutionary roots. When the early human was standing in the savannah, and felt eyeballs on him, that was bad news: predators are eyeing him and he should be on the run. I don't know how credible this is. Evolutionary psychology has been used for arguing all sorts of things.)

Here are some things that can help calm your nerves.

If you focus on the message, you will not focus on yourself and your ego. The message is what matters to the audience, so nobody cares if you occasionally stutter and mess up. You are doing a service to the audience by delivering a message. There is no point in agonizing over whether you had a flawless delivery or whether you looked good while presenting. If you deliver your message, your content will speak for itself.

Another thing that can help is to reframe the situation. Your heart is beating hard not because you are scared, but because you are excited that you got the opportunity to talk about your work. And it is perfectly fine being excited about your talk. If you reframe it this way, this can even give you an energy boost for your talk.

You also need some alone time, a couple hours before you give the talk. Visualizing the talk will help a lot. Visualize yourself standing there, arguing the important points of the paper, slowly and confidently. Simulate your talk giving experience in your brain.

And don't worry if you get nervous and botch up some presentations. I did my fair share of botching up presentations, and found that people don't notice since most conference presentations are bad anyways. Chalk those as experience points. It will get better with time and you will slowly build immunity to stage fright. When I first started presenting, I was very nervous about standing in front of large crowds. It felt terrifying. But now it is just Tuesday afternoon, teaching Distributed Systems class. So start getting yourself exposed to crowds to build immunity.

A job well done

If you do all that hard work in advance, you will get to enjoy yourself at the presentation. You will speak slowly and confidently. You will make eye contact with the audience, and get visual confirmation that people are following you, which will also energize you. You will get questions after the talk, and that is always a good sign people were engaged.

Both my students nailed their talks. Congratulations to them for a job well done. They got several good questions after the talk, and well-earned applause at the end. I know they will do even better in future talks if they don't forget the lessons from this first presentation: find a good story, and explain the heart of your work in as few slides as possible.

Related Posts

Nobody wants to read your shit
How I write
How to write your research paper
How to present your work
Book review: "Good prose" and "How to write a lot"
Writing advice

Wednesday, August 9, 2017

Paper summary " Encoding, Fast and Slow: Low-Latency Video Processing Using Thousands of Tiny Threads"

This paper was written earlier than the "PyWren: Occupy the Cloud" paper, and appeared in NSDI'17. In fact this paper has influenced PyWren. The related work says "After the submission of this paper, we sent a preprint to a colleague who then developed PyWren, a framework that executes thousands of Python threads on AWS Lambda. ExCamera’s mu framework differs from PyWren in its focus on heavyweight computation with C++-implemented Linux threads and inter-thread communication."

I had written about AWS Lambda earlier. AWS Lambda is a serverless computing framework (aka a cloud-function framework) designed to execute user-supplied Lambda functions in response to asynchronous events, e.g., message arrivals, file uploads, or API calls made via HTTP requests.

While AWS Lambda is designed for executing asynchronous lightweight tasks for web applications, this paper introduced the "mu" framework to run general-purpose, massively parallel, heavyweight computations on it. This is done by tricking Lambda workers into executing arbitrary Linux executables (like LinPack written in C++). The mu framework is showcased by deploying an ExCamera application for compute-heavy video encoding. The ExCamera application starts 5000-way parallel jobs with IPC on AWS Lambda.

The paper provides the mu framework as open-source software.

Going from Lambda to Mu

The Lambda idea is that the user offloads her code to the cloud provider, and the provider provisions the servers and runs her code. It is easy to start 1000s of threads in parallel with sub-second startup latency (assuming warm Lambda instances). Upon receiving an event, AWS Lambda spawns a worker, which executes in a Linux container with up to two 2.8 GHz virtual CPUs, 1,536 MiB RAM, and about 500 MB of disk space.

So if Lambda uses container technology, what is its difference from just using containers? The difference is that, once invoked, the container stays warm at some server waiting for another invocation, and this idle time does not get charged to you since you are not actively using it. Lambda bills in 100ms increments, and only when you call the Lambda instance with a function invocation. In contrast, EC2 deploys in minutes and bills in hours, and Google Compute Engine bills in 10-minute increments.

I think another significant difference is that Lambda forces you to use the container via function calls and in stateless mode. This is a limitation, but maybe this forced constraint accounts for much of the success of, and interest in, this platform. A stateless and disaggregated way of doing things may be more amenable to scalability.

The mu platform has two important components external to the Lambda platform: the coordinator and the rendezvous center. And of course it has Lambda workers, thousands of them available on command.

The mu coordinator

The mu platform has an explicit coordinator. (Recall that the PyWren platform design had a thin shim driver at the laptop.) The coordinator is a long-lived server (e.g., an EC2 VM) that launches jobs and controls their execution. The coordinator is truly the puppeteer: it manipulates the actions of each Lambda worker individually, and the workers comply as drones. The coordinator contains all of the logic associated with a given computation in the form of per-worker finite-state-machine (FSM) descriptions. For each worker, the coordinator maintains an open TLS connection, the worker’s current state, and its state-transition logic. When the coordinator receives a message from a worker, it applies the state-transition logic to that message, producing a new state and sending the next RPC request to the worker using the AWS Lambda API calls. (These HTTP requests are a bottleneck when launching thousands of workers, so the coordinator uses many parallel TCP connections to the HTTP server, one per worker, and submits all events in parallel.) For computations in which workers depend on outputs from other workers, mu’s coordinator uses dependency-aware scheduling: the coordinator first assigns tasks whose outputs are consumed, then assigns tasks that consume those outputs.
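
Here is a minimal sketch of the per-worker FSM bookkeeping such a coordinator might do. The state names and RPC payloads are invented for illustration; the real mu coordinator drives workers over TLS connections and the AWS Lambda API, which this sketch omits.

```python
# Hypothetical sketch of per-worker FSM logic in a mu-style coordinator.
# Each (state, incoming message) pair maps to (next state, next RPC to send).
TRANSITIONS = {
    ("start",     "hello"): ("fetching",  {"cmd": "download", "from": "s3://bucket/chunk0"}),
    ("fetching",  "ok"):    ("encoding",  {"cmd": "run", "exe": "./encoder"}),
    ("encoding",  "ok"):    ("uploading", {"cmd": "upload", "to": "s3://bucket/out0"}),
    ("uploading", "ok"):    ("done",      None),
}

class WorkerFSM:
    def __init__(self):
        self.state = "start"

    def on_message(self, msg):
        # Apply the state-transition logic and return the next RPC (None = finished).
        self.state, rpc = TRANSITIONS[(self.state, msg)]
        return rpc

# The coordinator keeps one FSM per worker and feeds each incoming message
# to the right FSM, sending back whatever RPC the transition produces.
fsms = {worker_id: WorkerFSM() for worker_id in range(3)}
for worker_id, msg in [(0, "hello"), (1, "hello"), (0, "ok")]:
    rpc = fsms[worker_id].on_message(msg)
    print(f"worker {worker_id} -> {fsms[worker_id].state}, send {rpc}")
```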

The Lambda workers

The workers are short-lived Lambda function invocations. All workers in mu use the same generic Lambda function. The user only installs one Lambda function and workers spawn quickly because the function remains warm. The user can include additional executables in the mu worker Lambda function package. The worker can then execute these in response to RPCs from the coordinator. The coordinator can instruct the worker to retrieve from or upload to AWS S3, establish connections to other workers via a rendezvous server, send data to workers over such connections, or run an executable.

The mu rendezvous server

The mu platform employs a rendezvous server that helps each worker communicate with other workers. Like the coordinator, the rendezvous server is long lived. It stores messages from workers and relays them to their destination. This means that the rendezvous server’s connection to the workers can be a bottleneck, and thus fast network connectivity between workers and the rendezvous is required.

Using the mu platform

To design a computation, a user specifies each worker’s sequence of RPC requests and responses in the form of an FSM, which the coordinator executes. The simplest of mu’s state-machine components represents a single exchange of messages: the coordinator waits for a message from the worker, sends an RPC request, and transitions unconditionally to a new state. To encode control-flow constructs like if-then-else and looping, the mu Python library includes state combinator components. An if-then-else combinator might check whether a previous RPC succeeded, only uploading a result to S3 upon success.
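
Here is a rough sketch of what such state combinators could look like; the function names and message formats are invented for illustration and are not mu's actual API.

```python
# Hypothetical sketch of FSM "state combinators" (invented names, not mu's API).
# A state is a function: given the last worker reply, it returns
# (rpc_to_send, next_state); a next_state of None halts the machine.

def message_exchange(rpc, next_state):
    # Simplest combinator: send one RPC, then transition unconditionally.
    def state(_reply):
        return rpc, next_state
    return state

def if_then_else(predicate, then_state, else_state):
    # Branch on the previous reply, e.g. only upload to S3 if the RPC succeeded.
    def state(reply):
        chosen = then_state if predicate(reply) else else_state
        return chosen(reply)
    return state

# Compose: run the encoder, upload on success, otherwise log a failure.
report_failure = message_exchange({"cmd": "log", "msg": "encode failed"}, None)
upload         = message_exchange({"cmd": "upload", "to": "s3://bucket/out"}, None)
branch         = if_then_else(lambda r: r.get("status") == "ok", upload, report_failure)
run_encoder    = message_exchange({"cmd": "run", "exe": "./encoder"}, branch)

# Drive the FSM with canned replies standing in for real worker messages.
state, reply = run_encoder, {}
while state is not None:
    rpc, state = state(reply)
    print("send:", rpc)
    reply = {"status": "ok"}    # pretend every RPC succeeds
```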

ExCamera video encoding application

The paper implements a video encoder intended for fine-grained parallelism, called ExCamera. ExCamera encodes tiny chunks of the video in independent threads (doing most of the “slow” work in parallel), then stitches those chunks together in a “fast” serial pass, using an encoder written in explicit state-passing style with named intermediate states.
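
The parallel-then-serial pattern itself is easy to sketch locally. In the toy below, a thread pool stands in for the Lambda workers, and the placeholder functions are mine, not ExCamera's actual encoder or its state-passing stitch step.

```python
from concurrent.futures import ThreadPoolExecutor

# Toy sketch of the ExCamera pattern: do the "slow" per-chunk work in parallel,
# then stitch the results together in a fast serial pass.

def encode_chunk(chunk):
    return f"<encoded:{chunk}>"          # placeholder for the heavyweight encoder

def stitch(encoded_chunks):
    return "".join(encoded_chunks)       # placeholder for the serial fix-up pass

chunks = [f"frames_{i:03d}" for i in range(16)]
with ThreadPoolExecutor(max_workers=8) as pool:
    encoded = list(pool.map(encode_chunk, chunks))   # parallel "slow" phase
video = stitch(encoded)                              # serial "fast" phase
print(video[:60], "...")
```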

In formats like 4K or virtual reality, an hour of video may take many hours to process, as video compression relies on temporal correlations among nearby frames. However, ExCamera cuts this time by an order of magnitude by deploying thousands of Lambda workers in parallel and coordinating them via the mu platform.

As another application of the mu platform, the authors did a live demo for their NSDI'17 conference talk. They had recorded 6 hours of video at the conference, and using OpenFace and the mu platform on AWS Lambda they selected the frames where George Porter, a coauthor, appeared in the feed. This was done in minutes, and it was indeed a pretty bold thing to do at a presentation.

Limitations

The paper has a limitations section. (I always wanted to include such a section in my papers. Maybe by including a limitations section you can preemptively disarm straightforward criticisms.) The listed limitations are:
+ The system is evaluated on only two videos.
+ If everybody used Lambda this way, would this still work?
+ The pipeline specification that the user should provide is complex.
+ A worker failure kills the entire job.

The paper mentions an experiment that ran 640 jobs, using 520,000 workers in total, each running for about a minute on average. In that experiment, three jobs failed. This is a low failure rate, but this doesn't scale well with increasing job sizes.