Exam 2 Study Guide

The one-hour study guide for exam 2

Paul Krzyzanowski

November 2023

Disclaimer: This study guide attempts to touch upon the most important topics that may be covered on the exam but does not claim to necessarily cover everything that one needs to know for the exam. Finally, don't take the one hour time window in the title literally.

Last update: Fri Nov 3 21:15:10 EDT 2023

Mutual exclusion

Goal: Allow processes to request and be granted exclusive access to named resources. The algorithms should be designed to avoid starvation and ensure that exactly one requesting process gets the resource even if multiple processes make requests concurrently.

Mutual exclusion is responsible for making sure that only one process or thread accesses a resource at a time. Processes do this by requesting a lock on a resource. In operating systems, it is implemented through mechanisms such as semaphores and monitors and, at a low level, through instructions such as test-and-set locks. None of these mechanisms work across networks of computers. We need an algorithm that will allow processes to compete for access to a resource and ensure that at most one process will be granted access. The algorithm must be fair: it must ensure that all processes that want a resource will eventually get a chance to get it. If this is not the case, starvation occurs, where a process will have to wait indefinitely for a resource.

The “resource” is an arbitrary name that all processes agree to use. It may refer to a critical section of code, a file, a field in a database, a physical device, or some network service. A mutual exclusion algorithm allows a process to request access to this resource and be granted exclusive access to it. This is known as a lock. If the lock has a timeout (an expiration) associated with it, then it is known as a lease.

We expect three basic properties from a mutual exclusion algorithm:

  1. Safety: This simply means the algorithm works and only one process can hold a resource at a time.

  2. Liveness: The algorithm should make progress and not wait forever for a message that will never arrive.

  3. Fairness: Every process that wants a resource should get a fair chance to get it. For example, two processes cannot continuously hand the resource back and forth to each other, disallowing a third from being able to get it.

Distributed algorithms fall into three categories. A centralized approach uses a central coordinator that is responsible for granting access permissions.

A token-based approach allows a process to access a resource if it is holding a “token”; that is, it received an unsolicited message where ownership of the message allows the process to access the resource. Sending the message (token) means forfeiting access.

A contention-based algorithm is one where all processes coordinate together on deciding who gets access to the resource.

The centralized algorithm runs on a single server that accepts REQUEST messages for a resource. If nobody is using the resource, it responds with a GRANT message and places the request on a FIFO (first-in, first-out) queue of requests for that resource. If somebody is using the resource, the server simply does not respond but also places the request on the queue of requests. When a client is done with a resource, it sends the server a RELEASE message. The server removes the process’s ID from the queue and sends a GRANT message to the next client in the queue (if there is one).
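The REQUEST/GRANT/RELEASE exchange can be sketched in a few lines of Python. This is a minimal, single-resource illustration with no networking; the class and method names are assumptions made for the example, not part of any real system.

```python
from collections import deque


class Coordinator:
    """Central lock server for a single resource (names are illustrative)."""

    def __init__(self):
        self.queue = deque()  # FIFO of process IDs; the head holds the resource

    def request(self, pid):
        """Handle REQUEST: queue the caller and GRANT only if nobody is ahead."""
        self.queue.append(pid)
        return "GRANT" if len(self.queue) == 1 else None  # None = no reply yet

    def release(self, pid):
        """Handle RELEASE: drop the holder and return who gets the next GRANT."""
        if not self.queue or self.queue[0] != pid:
            raise ValueError("only the current holder may release")
        self.queue.popleft()
        return self.queue[0] if self.queue else None


server = Coordinator()
replies = [server.request(p) for p in ("A", "B", "C")]  # A is granted; B, C wait
next_holder = server.release("A")                        # B is granted next
```

Because the queue is FIFO, requests are served in arrival order, which is what gives the centralized approach its fairness.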

The token ring algorithm creates a logical communication ring among the processes in the group, where each process communicates with a single neighbor. A message, called a token, is created for each resource. This token is passed from process to process along the ring. If a process receives a token and does not need to access that resource, it simply passes the token to the next process. Otherwise, it will hold on to the token until it is done with the resource.

Lamport’s mutual exclusion algorithm requires each process that wants a resource to send a timestamped request for the resource to all processes in the group, including itself. Receipt of each message is immediately acknowledged by every process. Each process places the received message in a local priority queue that is sorted by the Lamport timestamp that is in each message (or any totally unique timestamp). A process decides whether it can access the resource by checking whether its own request is the earliest (first) one in the queue of all requests that it has received. If so, it accesses the resource and, when done, sends a release message to all members (including itself). The receipt of a release message causes each process to remove that process ID from the ordered queue. If a process now finds itself as the earliest process in the queue, it – and everyone else – knows that it is now its turn to access the resource.

Lamport’s mutual exclusion algorithm summary: send a request message to everyone in the group, including yourself. Get an acknowledgement from everyone. Each process stores request messages in a priority queue sorted by timestamp. The process ID at the head of the queue can access the resource. When it is done, it sends a release message to everyone.
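The queue handling in Lamport’s algorithm might look like the following sketch. It assumes that every request has already been reliably delivered to and acknowledged by all processes, so acknowledgements and the network are left out; the class and method names are hypothetical.

```python
import heapq


class LamportProcess:
    def __init__(self, pid):
        self.pid = pid
        self.queue = []  # min-heap of (timestamp, pid); pid breaks timestamp ties

    def on_request(self, timestamp, pid):
        heapq.heappush(self.queue, (timestamp, pid))

    def on_release(self, pid):
        # Remove the releasing process's request and restore heap order.
        self.queue = [entry for entry in self.queue if entry[1] != pid]
        heapq.heapify(self.queue)

    def may_enter(self):
        # A process may use the resource only if its request is the earliest.
        return bool(self.queue) and self.queue[0][1] == self.pid


group = [LamportProcess(pid) for pid in (1, 2, 3)]
for timestamp, pid in [(5, 2), (5, 1), (7, 3)]:   # three concurrent requests
    for process in group:                          # multicast, including to self
        process.on_request(timestamp, pid)
before_release = [p.may_enter() for p in group]
for process in group:                              # process 1 sends RELEASE
    process.on_release(1)
after_release = [p.may_enter() for p in group]
```

Every process holds the same ordered queue, so all of them reach the same conclusion about whose turn it is without any further coordination.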

The Ricart & Agrawala algorithm, like Lamport’s, is also based on using reliable multicasts. A process that wants to access a resource sends a request to all other processes in the group and waits for all of them to respond. As with Lamport’s mutual exclusion algorithm, the request message contains a unique Lamport timestamp. If another process is currently using the resource, that process delays its response until it is done. If process A and process B sent out requests concurrently (i.e., two systems want the same resource at approximately the same time), each of those systems compares the timestamp of the incoming request with that of its own request. If process A received a request from process B that is older (a lower timestamp) than the one it sent, then process A will give process B priority to access the resource by sending a response to process B. Otherwise, if process A has the earlier timestamp, it will queue the request from B and continue to wait for all acknowledgements, sending a response to B (and any other processes that wanted the resource) only when it is done using the resource. The key point is that processes A and B will make the same comparison and exactly one will hold back on sending the response.

Ricart & Agrawala’s algorithm summary: send a request message to everyone in the group and wait for acknowledgements from everyone. Any process using the resource will delay sending an acknowledgement.
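The decision each process makes when a request arrives can be expressed as one small function. This is a sketch only: the state names and the `(timestamp, pid)` tuple representation are assumptions chosen for the example.

```python
def should_defer(state, my_request, incoming_request):
    """Decide whether to delay the reply to an incoming request.

    state is "idle", "wanting", or "using"; requests are (timestamp, pid)
    tuples, so the process ID breaks ties between equal timestamps.
    """
    if state == "using":
        return True                            # reply only after we are done
    if state == "wanting":
        return my_request < incoming_request   # our request is older: we go first
    return False                               # not interested: reply immediately


request_a = (10, "A")
request_b = (8, "B")
a_defers = should_defer("wanting", request_a, request_b)  # A sees B's request
b_defers = should_defer("wanting", request_b, request_a)  # B sees A's request
```

Here B holds the older request, so A replies at once while B withholds its reply until it has finished with the resource.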

The Lamport and Ricart & Agrawala algorithms are similar in that they are both contention based and truly distributed. Ricart & Agrawala’s requires fewer messages since there is no explicit release message that needs to be sent; acknowledgements are simply delayed. Neither is efficient compared with the centralized algorithm.

Election algorithms

Goal: Allow all processes in a group to elect exactly one of the processes in the group. All processes should agree to elect the same process.

Election algorithms are designed to allow a collection of processes to agree upon a single coordinator. All candidate processes are considered to be equally capable of acting as a coordinator. The task is to select exactly one of these processes and make sure that every process is aware of the selection. In designing these algorithms, we assume that every process has a unique ID (for example, a process ID combined with the machine’s address). The algorithms we examine in this section will choose a winner (a coordinator) by selecting either the highest available process ID or the lowest one. It is important that whatever criterion is used never results in one process choosing a different winner than another, even if multiple concurrent elections are run.

The bully algorithm selects the largest process ID as the coordinator. If a process detects a dead coordinator, it sends an ELECTION message to all processes with a higher ID number and waits for any replies. If it gets none within a certain time, it announces itself as a coordinator. When a process receives an ELECTION message it immediately sends a response back to the requestor (so the requestor won’t become the coordinator) and holds an election to see if there are any higher-numbered processes that will respond. If not, then it declares itself as coordinator.
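The outcome of a bully election can be modeled without any networking. In this sketch, `alive` stands in for "the processes that answer within the timeout"; the function name and the recursive structure are illustrative simplifications.

```python
def bully_election(initiator, alive):
    """Return the coordinator chosen when `initiator` starts an election.

    `alive` is the set of process IDs that currently respond to messages.
    """
    responders = sorted(pid for pid in alive if pid > initiator)
    if not responders:
        return initiator  # no higher-numbered process answered: initiator wins
    # Every responder holds its own election; following any one of them
    # leads to the same result, so this sketch follows the lowest responder.
    return bully_election(responders[0], alive)


# Process 6 was the coordinator and has died; process 2 notices.
winner = bully_election(2, {1, 2, 3, 5})
```

The highest-numbered live process always ends up as coordinator, regardless of which process starts the election.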

The ring election algorithm requires a logical communication ring among the processes in the group (as we had in the token ring mutual exclusion algorithm). When a process detects a non-responding coordinator, it creates an ELECTION message containing its own process ID and sends it to the next process in the ring. If the process is dead, then it sends the message to the following process. This sequence continues, with each process sending an ELECTION message to the next process in the ring, skipping dead processes until it finds a process that can receive the message. When a process receives an ELECTION message, it adds its own process ID to the body and sends that message out to its neighboring process. When the election message circulates back to the original sender (the sender detects this by seeing its process ID at the head of the list, indicating that it started the election), the sender looks at the list of active processes in the message body and chooses a coordinator from among them. Since multiple elections may have been started concurrently, the algorithm for choosing the leader must yield the same result among the different lists. Hence, selecting the highest or the lowest process ID will work. Selecting the first or last process in the list will not.
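A small simulation shows why the choice must depend on the set of IDs rather than on list position. The ring layout and the helper name below are made up for illustration.

```python
def ring_election(ring, alive, starter):
    """Circulate one ELECTION message and return the list of IDs it collects."""
    n = len(ring)
    collected = [starter]
    i = (ring.index(starter) + 1) % n
    while ring[i] != starter:
        if ring[i] in alive:          # dead processes are skipped
            collected.append(ring[i])
        i = (i + 1) % n
    return collected


ring = [3, 7, 1, 9, 4]        # logical ring order
alive = {1, 3, 4, 7}          # process 9, the old coordinator, is dead
list_from_7 = ring_election(ring, alive, starter=7)
list_from_4 = ring_election(ring, alive, starter=4)
```

The two concurrent elections collect the same IDs in different orders, so `max()` agrees on both lists while the first or last element does not.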

The Chang and Roberts ring algorithm improves on the ring algorithm in two ways. First, there is no need to keep the entire list of live processes in the election messages; we can perform the election as we go along. If the goal is to vote for the highest-numbered live process ID and a process receives an election message, it does one of two things: (1) passes it untouched if its process ID is smaller than the one in the message, or (2) replaces the process ID in the message with its own if the process ID is greater than the one in the message. When a process receives a message that contains its own process ID, it knows that the message has come full circle and it is the winner. It will then notify each process of the outcome.

The second optimization is to avoid having multiple election messages circulate if multiple processes have initiated elections concurrently. To do this, a process keeps track of whether it has participated in an election (i.e., has initiated or forwarded an election message). If a process receives a message with a smaller process ID than its own and it is a participant in an election, it simply discards the message. If, on the other hand, the received message contains a greater process ID than its own, it will forward the message even if it is a participant in an election since the message contains a candidate that takes priority over the previous one that was forwarded. Once election results are announced, each process clears its status of participating in an election. There is still the opportunity to have multiple elections circulate (no harm done) but they are aborted when it makes sense to do so.
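Both optimizations can be seen in a short simulation. This is a sketch under simplifying assumptions: all processes in `ring` are alive, messages are delivered one at a time from a single queue, and the final announcement of the winner is omitted. The function name is hypothetical.

```python
def chang_roberts(ring, starters):
    """Simulate concurrent elections; return the elected (highest) process ID."""
    n = len(ring)
    participant = {pid: False for pid in ring}
    in_flight = []                       # (index of receiver, candidate ID)
    for pid in starters:                 # each starter nominates itself
        participant[pid] = True
        in_flight.append(((ring.index(pid) + 1) % n, pid))
    while in_flight:
        idx, candidate = in_flight.pop(0)
        me = ring[idx]
        if candidate == me:
            return me                    # own ID came full circle: winner
        if candidate > me:
            forward = candidate          # pass the message along untouched
        elif not participant[me]:
            forward = me                 # replace the smaller ID with our own
        else:
            continue                     # already a participant: discard
        participant[me] = True
        in_flight.append(((idx + 1) % n, forward))
    return None


ring = [3, 7, 1, 9, 4]
leader_two_starters = chang_roberts(ring, starters=[3, 1])
leader_one_starter = chang_roberts(ring, starters=[4])
```

With two concurrent starters, the message carrying the lower candidate is dropped as soon as it reaches a process that has already forwarded a higher one.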

One problem with election algorithms is when a network gets partitioned (also known as segmented): one set of processes is separated from another and cannot communicate with the other. In this case, each segment may elect its own coordinator and problems can arise. This is known as a split brain situation. To combat this, a redundant network is needed or some alternate communication mechanism to enquire about the state of remote processes.

Another approach that is often taken is to require a majority of prospective coordinators to be available. A quorum is the minimum number of processes that need to be available to participate in an operation. In this case, the operation is an election and we need a quorum of over 50% of the processes. If a network is partitioned into two or more segments, at most one of the partitions will have over 50% of the processes. Those processes that do not will refuse to elect a coordinator.
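The quorum test itself is a one-line comparison. The function name below is an assumption for illustration.

```python
def has_quorum(reachable, total):
    """True if `reachable` processes form a strict majority of `total`."""
    return reachable > total // 2


# A five-process group partitioned into segments of three and two.
majority_side = has_quorum(3, 5)
minority_side = has_quorum(2, 5)
# A four-process group split evenly: neither side may elect a coordinator.
even_split = (has_quorum(2, 4), has_quorum(2, 4))
```

Requiring a strict majority is what guarantees that two segments can never both elect a coordinator, at the cost of no segment making progress in an even split.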

Raft Consensus

Goal: Create a fault-tolerant distributed algorithm that enables a group of processes to agree on a single value.

The goal of a consensus algorithm is to have a group of processes agree on a common value. The value must be one of the values that was proposed by at least one of the processes. Once the processes agree on the value, it is considered final.

Replicated state machines

The most common use of consensus is to create fault-tolerant systems by creating replicated state machines. Processes send messages (commands, such as data updates) to a group of replicated servers. These messages are received by each of the servers and added to a log on that server. If one process wants to set name=value_1 and, concurrently, another process tries to set name=value_2, we would like every instance of our database to be consistent. Every copy should have either name=value_1 or name=value_2. It is not acceptable to have some instances have name=value_1 while others have name=value_2.

This log must be consistent across all servers so that when the commands in the log are applied to the state machine (i.e., the processes on the servers read their logs and apply the commands) they are all processed in the same sequence on each server. Consensus enables all processes to agree on the order in which commands are added to the replicated logs.
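To see why the order matters, consider a toy state machine that is just a dictionary. The log format here, a list of `(key, value)` pairs, is an assumption made for the example.

```python
def apply_log(log):
    """Apply a sequence of (key, value) set commands to an empty state."""
    state = {}
    for key, value in log:
        state[key] = value
    return state


agreed_order = [("name", "value_1"), ("name", "value_2")]
replicas = [apply_log(agreed_order) for _ in range(3)]   # same log everywhere
out_of_order = apply_log(list(reversed(agreed_order)))   # a replica that differs
```

Replicas that apply the same log end in the same state, while a replica that applies the same commands in a different order ends up with a different value.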

Another, simpler, use of consensus is running an election: each process may propose itself as a contender and the consensus algorithm will pick one winner. Yet another example is mutual exclusion: agreement on who gets a lock on a resource, although software must be built around this to manage a queue of outstanding requests.

Achieving consensus is easy if processes don’t fail or we are willing to wait for all processes to recover if they die (as with the two-phase commit protocol that we will examine later). The challenge with a consensus algorithm is to achieve consensus in the presence of faults.

A consensus algorithm must satisfy four goals:

  1. Validity: The outcome must be one of the proposed values.

  2. Uniform agreement: No two processes may ultimately agree on different values.

  3. Integrity: A process must ultimately agree on a single value. It cannot change its mind later.

  4. Progress (termination): The algorithm must eventually terminate such that every process will decide on the same value.


Raft is a fault-tolerant distributed consensus algorithm. It was created as an easier-to-understand alternative to Paxos (the first provably correct distributed consensus protocol) and was designed specifically to make it easy to implement replicated logs, something that was not directly achievable with Paxos. It can be easily used to create agreement among participants on a globally consistent (total) order for client messages. These messages can be commands to update the state of a system or data in a file. By creating a consistent order of these messages among all servers, we ensure that each of them applies changes in the same order and hence each server is an identical replica of the others.

Raft achieves consensus as long as a majority of servers running the protocol are functioning and accessible. It also requires a single elected leader to receive all client requests. Any server can act as a leader and an election takes place to select a leader.

A server can be in a follower, leader, or candidate state. If it is the leader then it receives all client requests. If it is the follower then it is a non-leader and the system has a leader. Followers only accept messages from leaders and not from clients. If it is a candidate then we are holding an election to pick a leader and the server may become a leader.

Raft elections

Leaders send periodic heartbeats to all followers. Servers start in a follower state. A server remains in a follower state as long as it receives valid heartbeat messages from a leader or a candidate. If it receives no heartbeat within a specific time interval, the follower may need to become a leader, so it becomes a candidate and runs an election by requesting votes from other servers. If a candidate receives a majority of votes from the other servers, it becomes the leader.

To start an election, a follower waits a random time interval before it makes itself a candidate. It then votes for itself and sends a request vote RPC message to all other servers. If a server receives a request vote message and has not voted yet, it then votes for that candidate. When a candidate gets a majority of votes, it becomes the leader.

Each election marks a new term, identified by an incrementing number.

If a candidate receives a heartbeat from another server and that leader’s term number is at least as large as the candidate’s current term, then the candidate recognizes the leader as legitimate and becomes a follower.

It’s possible that multiple candidates receive votes but none receives a majority of votes. This indicates a split vote situation. On the chance that a candidate does not win or lose an election, it simply times out and starts a new election. Randomized timeouts ensure that split votes rarely occur since it’s unlikely that multiple candidates will start their elections at the same time: one candidate will usually have the advantage of requesting votes before others get a chance to do so.
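The voting rules can be sketched as follows. This is a simplified model, not Raft's full RequestVote logic: it ignores the log comparison that real Raft performs before granting a vote, and the 150 to 300 ms timeout range is an assumed example value. All names are hypothetical.

```python
import random


class Server:
    def __init__(self, name):
        self.name = name
        self.term = 0
        self.voted_for = None

    def election_timeout(self, low=0.150, high=0.300):
        """Random wait (seconds) before a follower becomes a candidate."""
        return random.uniform(low, high)

    def on_request_vote(self, candidate, term):
        if term > self.term:              # newer term: forget the old vote
            self.term = term
            self.voted_for = None
        if term == self.term and self.voted_for in (None, candidate):
            self.voted_for = candidate    # at most one vote per term
            return True
        return False


def run_election(candidate, servers):
    """Candidate starts a new term, votes for itself, and asks everyone else."""
    candidate.term += 1
    candidate.voted_for = candidate.name
    votes = 1 + sum(s.on_request_vote(candidate.name, candidate.term)
                    for s in servers if s is not candidate)
    return votes > len(servers) // 2


cluster = [Server(name) for name in "ABCDE"]
a_wins = run_election(cluster[0], cluster)
```

Since each server grants only one vote per term, two candidates in the same term cannot both collect a majority.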

Raft log replication

The elected leader accepts client requests. Each message sent by clients becomes a log entry that will be replicated among the servers. The leader first adds the message to its own log. At this point, the entry is considered to be uncommitted. The leader then sends the message to the other servers (the followers). Each follower acknowledges the message. When a majority of followers have acknowledged the message, the entry is applied to the state machine and considered to be committed. The leader then notifies the followers that the entry has been committed and they apply the message to their state machines. Consensus has now been achieved.
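The leader's bookkeeping for commitment might look like the sketch below. It is an assumption of this example that the leader's own copy counts toward the majority, so an entry commits once more than half of the whole cluster stores it (the rule used in the Raft paper). The class and field names are hypothetical, and retries, terms, and follower log repair are omitted.

```python
class Leader:
    def __init__(self, cluster_size):
        self.cluster_size = cluster_size
        self.log = []            # commands in the order the leader received them
        self.acks = []           # acks[i] = servers known to store entry i
        self.commit_index = -1   # index of the highest committed entry

    def client_request(self, command):
        """Append an uncommitted entry and return its index."""
        self.log.append(command)
        self.acks.append({"leader"})
        return len(self.log) - 1

    def on_ack(self, index, follower):
        """Record an acknowledgement; return True once the entry is committed."""
        self.acks[index].add(follower)
        if len(self.acks[index]) > self.cluster_size // 2:
            self.commit_index = max(self.commit_index, index)
        return self.commit_index >= index


leader = Leader(cluster_size=5)
entry = leader.client_request("x=1")
after_first_ack = leader.on_ack(entry, "follower-1")    # 2 of 5 copies
after_second_ack = leader.on_ack(entry, "follower-2")   # 3 of 5 copies
```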


Chubby

Goal: Create a highly-available centralized lease manager and file system for small files that can serve as a name server and configuration repository.

Chubby is a highly available and persistent distributed lock service and file system created by Google and used in various projects and frameworks, including Bigtable and MapReduce. Its purpose is to manage locks for resources and store configuration information for various distributed services throughout the Google cluster environment. The key design goals of Chubby were high availability and high reliability.

By default, the service runs on five computers as five active replicas. This grouping is called a Chubby cell. One of these replicas is elected as the master to serve client requests. Only the master will serve client requests; the other machines in the cell are for fault tolerance in case the master dies. The only request they will answer from clients is to tell them who the master is.

A majority of the replicas in the cell must be running for the service to work. The Paxos distributed consensus algorithm is used to agree on a leader and is used for state machine replication. That is, it ensures that all updates take place in the same order on all replicas to keep the replicas consistent. Typically, there will be one Chubby cell per data center.

In addition to providing locks, Chubby is designed to manage relatively small amounts of data: items such as system configuration and state information for various services. Chubby provides clients with a namespace of files & directories. Every file or directory can be treated as a lock file and every file may, of course, be used to store data. The name of a lock is the name of its file: its hierarchical pathname. The interface to Chubby is not that of a native file system. There is no kernel module and client software communicates with Chubby via an API that sends remote procedure calls to the Chubby master.

File operations are somewhat different from those offered by conventional file systems. Files can be read and written only in their entirety: there are no seek or byte-range read and write operations. When a file is opened by the client, it is downloaded and a lease for that file is established. In this way, Chubby keeps track of which clients have cached copies of a file. All writes from a client must be sent to the Chubby master (we have a write-through cache). Chubby then sends invalidations to all clients that have cached copies.

Locks are advisory and can be either exclusive (one writer) or not (multiple readers). A client can send a request to acquire a lock for a file, release it, and also assign and check sequence numbers for a lock. Clients can also subscribe to receive events for an open file. These events include notification of file modification, the creation of directories or files under an open directory, and lock acquisition.

Chubby is designed primarily for managing coarse-grained locks. Fine-grained locks are locks that are generally used for a small object, such as a row in a table of a database. They are generally held for a short duration, seconds or less. Coarse-grained locks typically control larger structures, such as an entire table or an entire database. They might be held for hours or days. They are acquired rarely compared to fine-grained locks. Hence, a server that is designed for coarse-grained locks can handle more clients.

Even though Chubby uses the term “coarse-grained”, it doesn’t exactly fit a “pure” coarse/fine grained lock model, but that model does not necessarily work well in real systems. In theory, the concept of a coarse-grained lock is to grant a process a lock for a large pool of objects of a particular class and then have that process be the lock manager for those objects. This is essentially what Chubby does but Chubby doesn’t keep track of all the objects. Instead, the top (coarse) level is a set of services, such as Bigtable tables, a GFS file system, or Pregel frameworks. Chubby allows them to ensure there is a single master and to lock any critical resources that might be shared among these applications. Those applications then handle the fine-grained aspect of giving out locks for data blocks, table cells, or synchronizing communication barriers.

Because Chubby does not hold huge amounts of data but may serve thousands of clients, all Chubby data is cached in memory. For fault tolerance, all data is written through to the disk when modified, and is propagated to replicas via the Paxos consensus algorithm to ensure consistent ordering. The entire database is backed up to the Google File System (GFS) every few hours to ensure that critical configuration information is saved even if all replicas die.

Chubby’s fault-tolerant locking makes it a good service for leader election. If a group of processes wants to elect a leader, they can each request a lock on the same Chubby file. Whoever gets the lock is the leader (for the lease period of the lock).
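Lock-based leader election can be illustrated with an in-memory stand-in for a lock service. This is not Chubby's actual API: the class, the method, the 10-second lease, and the file path are all hypothetical, and time is passed in explicitly so the example is deterministic.

```python
class LockService:
    """In-memory stand-in for a lock service with leases (not Chubby's API)."""

    def __init__(self):
        self.locks = {}  # path -> (owner, lease expiry time)

    def try_acquire(self, path, owner, now, lease_seconds=10):
        holder = self.locks.get(path)
        # Grant if the lock is free, its lease expired, or the owner is renewing.
        if holder is None or holder[1] <= now or holder[0] == owner:
            self.locks[path] = (owner, now + lease_seconds)
            return True
        return False


service = LockService()
path = "/ls/cell/service/leader"            # illustrative lock file name
attempts = {p: service.try_acquire(path, p, now=0) for p in ("p1", "p2", "p3")}
retry_during_lease = service.try_acquire(path, "p2", now=5)
retry_after_expiry = service.try_acquire(path, "p2", now=10)  # p1 never renewed
```

The lease is what keeps a crashed leader from holding the role forever: if it stops renewing, another candidate acquires the lock once the lease runs out.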

Network Attached Storage

Goal: Allow multiple clients to access files from file servers on a network.

To provide the same system call interface for supporting different local file systems as well as remote files, operating systems generally rely on a layer of abstraction that allows different file system-specific interfaces to coexist underneath the common system calls. On most Unix-derived systems (e.g., Linux, BSD, OS X, SunOS), this is known as the VFS layer (Virtual File System).

There are a couple of models for implementing distributed file systems: the download/upload model or the remote procedure call model. In a stateful file system, the server maintains varying amounts of state about client access to files (e.g., whether a file is open, whether a file has been downloaded, cached blocks, modes of access). In a stateless file system, the server maintains no state about a client’s access to files. The design of a file system will influence the access semantics to files. Sequential semantics are what we commonly expect to see in file systems, where reads return the results of previous writes. Session semantics occur when an application owns the file for the entire access session, writing the contents of the file only upon close, thereby making the updates visible to others after the file is closed, and overwriting any modifications made by others prior to that.


NFS

NFS was designed as a stateless, RPC-based model implementing commands such as read bytes, write bytes, link files, create a directory, and remove a file. Since the server does not maintain any state, there is no need for remote open or close procedures: these only establish state on the client. NFS works well in faulty environments: there’s no state to restore if a client or server crashes. To improve performance, a client reads data a block (8 KB by default) at a time and performs read-ahead (fetching future blocks before they are needed). NFS suffers from ambiguous semantics because the server (or other clients) has no idea what blocks the client has cached and the client does not know whether its cached blocks are still valid. NFS uses validation, where the client compares modification times from server requests to the times of data that it cached. However, it does this only if there are file operations to the server. Otherwise, the client simply invalidates the blocks after a few seconds. In NFS’s original stateless design, file locking could not be supported since that would require the server to keep state. It was later added through a separate lock manager that maintained the state of locks.
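The client-side validation rule can be written as a single predicate. This is an illustrative sketch: the function name, its parameters, and the 3-second maximum age are assumptions, not values taken from any NFS implementation.

```python
def cache_entry_valid(cached_mtime, fetched_at, now, server_mtime=None, max_age=3.0):
    """Decide whether cached file data may still be used.

    server_mtime is the modification time carried in a recent server reply,
    or None if there has been no traffic to the server for this file.
    """
    if server_mtime is not None:
        return server_mtime == cached_mtime   # validate against the server
    return (now - fetched_at) < max_age        # otherwise fall back to a timeout


fresh = cache_entry_valid(cached_mtime=100, fetched_at=0.0, now=1.0)
expired = cache_entry_valid(cached_mtime=100, fetched_at=0.0, now=5.0)
unchanged = cache_entry_valid(100, 0.0, 5.0, server_mtime=100)
modified = cache_entry_valid(100, 0.0, 5.0, server_mtime=101)
```

Between validations a client can still read stale data for up to `max_age` seconds, which is the source of the ambiguous semantics described above.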

To facilitate larger deployments, NFS introduced the automounter. It was common to have an environment with many clients, each mounting many remote file systems. In such an environment, if all clients start up at approximately the same time, they can flood the server with mount requests. The automounter mounts remote directories only when they are first accessed. To make keeping track of mount points easier across many machines, automounter maps are configuration files that define what remote directories get mounted. These can be distributed across a set of clients.


AFS

AFS was designed as an improvement over NFS to support file sharing on a massive scale. NFS suffered because clients would never cache data for a long time (not knowing if it would become obsolete) and had to frequently contact the server. AFS introduced the use of a partition on a client’s disk to cache large amounts of data for a long time: whole file caching and long-term caching. It supports a file download-upload model. The entire file is downloaded on first access (whole file download) and uploaded back to the server after a close only if it was modified. Because of this behavior, AFS provides session semantics: the last one to close a modified file wins and other changes (earlier closes) are lost.

During file access, the client need never bother the server: it already has the file. When a client first downloads a file, the server makes a callback promise: it maintains a list of each client that has downloaded a copy of a certain file. Whenever it gets an update for that file, the server goes through the list and sends a callback to each client that may have a cached copy so that it can be invalidated on the client. The next time the client opens that file, it will download it from the server. Files under AFS are shared in units called volumes. A volume is just a directory (with its subdirectories and files) on a file server that is assigned a unique ID among the cell of machines (remember cells from DCE RPC?). If an administrator decides to move the volume to another server, the old server can issue a referral to the new server. This allows the client to remain unaware of resource movement.
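The callback mechanism can be modeled with two small classes. This is a toy, in-process sketch with hypothetical names; it ignores volumes, failures, and the network.

```python
class AFSServer:
    def __init__(self):
        self.files = {}      # file name -> contents
        self.promises = {}   # file name -> clients holding a callback promise

    def fetch(self, client, name):
        self.promises.setdefault(name, set()).add(client)
        return self.files[name]

    def store(self, client, name, data):
        self.files[name] = data
        for other in self.promises.get(name, set()) - {client}:
            other.invalidate(name)        # callback: cached copy is now stale
        self.promises[name] = {client}


class AFSClient:
    def __init__(self, server):
        self.server = server
        self.cache = {}

    def open(self, name):
        if name not in self.cache:        # whole-file download on first access
            self.cache[name] = self.server.fetch(self, name)
        return self.cache[name]

    def close(self, name, new_data=None):
        if new_data is not None:          # upload only if the file was modified
            self.cache[name] = new_data
            self.server.store(self, name, new_data)

    def invalidate(self, name):
        self.cache.pop(name, None)


server = AFSServer()
server.files["notes.txt"] = "v1"
alice, bob = AFSClient(server), AFSClient(server)
alice.open("notes.txt")
bob.open("notes.txt")
alice.close("notes.txt", new_data="v2")
bob_still_cached = "notes.txt" in bob.cache
bob_reads = bob.open("notes.txt")
```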


Coda

Coda was built on top of AFS and focused on two things: supporting replicated servers and disconnected operation. To support replicated storage, AFS’s concept of a volume was expanded into that of a Volume Storage Group (VSG). Given that some volumes may be inaccessible at a particular point in time, the Accessible Volume Storage Group (AVSG) refers to the subset of the VSG that the client can currently access. Before a client accesses a file, it first looks up the replicated volume ID of the file to get the list of servers containing replicated volumes and the respective local volume IDs. While it can read files from any available server, it first checks the versions from all of them to ensure that one or more servers don’t have out-of-date files. If the client discovers that a server has an old version of a file, it initiates a resolution process by sending a message to that server, telling it to update its old versions. When a client closes a file, if there were any modifications, the changes are written out to all available replicated volumes.

If no servers are available for access, the client goes into disconnected operation mode. In this mode, no attempt is made to contact the server and any file updates are logged instead in a client modification log (CML). Upon connection, the client plays back the log to send updated files to the servers and receive invalidations. If conflicts arise (e.g., the file may have been modified on the server while the client was disconnected) user intervention may be required.

Because there’s a chance that users may need to access files that are not yet in the local cache, Coda supports hoarding, which is a term for user-directed caching. It provides a user interface that allows a user to look over what is already in the cache and bring additional files into the cache if needed.

DFS (AFS version 3)

AFS evolved over the years. The most significant evolutionary version is version 3, which was adopted as the recommended distributed file system in the Distributed Computing Environment (DCE) where it is named DFS (Distributed File System).

The primary design goal of this system was to avoid the unpredictable lost data problems of session semantics if multiple clients are modifying the same file. The concept of tokens was introduced. A token is permission given by the server to the client to perform certain operations on a file and cache a file’s data. The system has four classes of tokens: open, data, status, and lock tokens. An open token must be obtained to have permission to open a file. A read data token must be obtained for a byte range of a file to have permission to access that part of the file. Similarly, a write data token is needed to write the file. Status tokens tell the client that it may be able to cache file attributes. These tokens give the server control over who is doing what to a file. Tokens are granted and revoked by the server. For example, if one client needs to write to a file then any outstanding read and write data tokens that were issued to any clients for that byte range get revoked: those clients are now denied access until they get new tokens.
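Byte-range data tokens and their revocation can be sketched as follows. This is a simplified model covering data tokens only; the names are hypothetical, and it is an assumption of the sketch that a new read token also revokes a conflicting write token held by another client.

```python
def overlaps(a, b):
    """True if half-open byte ranges a=(start, end) and b=(start, end) intersect."""
    return a[0] < b[1] and b[0] < a[1]


class TokenServer:
    def __init__(self):
        self.tokens = []  # granted tokens as (client, kind, byte_range)

    def grant(self, client, kind, byte_range):
        """Grant a "read" or "write" data token; return the tokens revoked."""
        revoked, kept = [], []
        for token in self.tokens:
            holder, held_kind, held_range = token
            conflict = (holder != client
                        and overlaps(held_range, byte_range)
                        and (kind == "write" or held_kind == "write"))
            (revoked if conflict else kept).append(token)
        self.tokens = kept + [(client, kind, byte_range)]
        return revoked


server = TokenServer()
r1 = server.grant("A", "read", (0, 100))
r2 = server.grant("B", "read", (50, 150))     # readers may overlap
r3 = server.grant("C", "write", (80, 120))    # revokes both read tokens
r4 = server.grant("D", "write", (200, 300))   # disjoint range: no conflict
```

Because tokens cover byte ranges rather than whole files, clients working on different parts of the same file do not interfere with each other.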


Server Message Block (SMB)

Microsoft’s Server Message Block protocol was designed as a connection-oriented, stateful file system with a priority on file consistency and support of locking rather than client caching and performance. While it does not use remote procedure calls, its access principle is the same: requests (message blocks) are functional messages, providing file access commands such as open, create, rename, read, write, and close.

With the advent of Windows NT 4.0 and an increasing need to provide improved performance via caching, Microsoft introduced the concept of opportunistic locks (oplocks) into the operating system. This is a modified form of DFS’s tokens. An oplock tells the client how it may cache data. It allows clients to cache information without worrying about changes to the file at the server. At any time, a client’s oplock may be revoked or changed by the server. The mechanism has been extended since Windows 7 and is generally referred to as leases. There are currently eight oplocks (do not memorize these but have an understanding of what they do):

  1. A level 1 oplock (exclusive oplock) provides the client with exclusive access to the file (nobody else is reading or writing it), so it can cache lock information, file attributes, and perform read-aheads and write-behinds.

  2. A level 2 oplock (shared oplock) is granted in cases where multiple processes may read a file and no processes are writing to it.

  3. A batch oplock is also exclusive and allows a client to keep a file open on the server even if the local process using it closed the file. This optimizes cases where a process repeatedly opens and closes the same files (e.g., batch script execution).

  4. A filter oplock is exclusive and allows applications that hold the oplock to be preempted whenever other processes or clients need to access the file.

  5. A read oplock (R) is a shared oplock that is essentially the same as a level 2 oplock. It supports read caching.

  6. A read-handle oplock (RH) allows multiple readers and keeps the file open on the server even if the client process closes it. It is similar to the batch oplock but is shared and does not support file modifications. It supports read caching and handle caching.

  7. A read-write oplock (RW) gives a client exclusive access to the file and supports read and write caching. It is essentially the same as the level 1, or exclusive, oplock.

  8. A read-write-handle oplock (RWH) enables a client to keep a file open on the server even if the client process closes it. It is exclusive and similar to the batch oplock.

The last four oplocks have been added since Windows 7 and are somewhat redundant with earlier mechanisms.

Oplocks may be revoked (broken) by the server. For example, if Process A has a read-write oplock, it can cache all read and write operations, knowing that no other client is modifying or reading that file. If another client, Process B, opens the file for reading, a conflict is detected. Process B is suspended and Process A is informed of the oplock break. This gives Process A a chance to send any cached changes to the server before Process B resumes execution.
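The oplock break described above can be sketched in a few lines. This is a toy model, not the SMB protocol: the class and method names (FileServer, Client, oplock_break) are made up for illustration, only a single read-write oplock per file is tracked, and after a break nobody holds an oplock (a real server might instead downgrade the holder to a shared oplock).

```python
class FileServer:
    """Toy server that tracks at most one read-write oplock per file."""

    def __init__(self):
        self.data = {}        # file name -> contents stored on the server
        self.rw_holder = {}   # file name -> client holding the RW oplock

    def open(self, client, name):
        holder = self.rw_holder.get(name)
        if holder is not None and holder is not client:
            # Conflict: break the oplock so the holder can flush its cached
            # writes before the new opener is allowed to proceed.
            holder.oplock_break(self, name)
            del self.rw_holder[name]
        else:
            self.rw_holder[name] = client   # uncontended: grant the oplock
        return self.data.get(name, "")


class Client:
    def __init__(self):
        self.cache = {}       # file name -> cached contents not yet sent

    def write(self, server, name, contents):
        if server.rw_holder.get(name) is self:
            self.cache[name] = contents      # write-behind: cache locally
        else:
            server.data[name] = contents     # no oplock: write through

    def oplock_break(self, server, name):
        # Send any cached changes to the server before losing the oplock.
        if name in self.cache:
            server.data[name] = self.cache.pop(name)
```

In this sketch, Process A's cached write reaches the server only when Process B's open triggers the break, which is why B sees the up-to-date contents.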

SMB 2 and beyond

The SMB protocol was known to be chatty. Common tasks often required several round-trip messages. It was originally designed for LANs and did not perform optimally either on wide area networks (WANs) or on today’s high-speed LANs (1–100 Gbps). The SMB protocol was dramatically cleaned up with the introduction of the Windows Vista operating system (SMB 2), with minor changes added in Windows 7 (SMB 2.1) and even more in Windows 8 (SMB 3). Apple has dropped its proprietary AFP protocol in favor of SMB 2 in OS X 10.9 (Mavericks). We will focus our discussion on the significant changes introduced in SMB 2.

SMB 2 added six key changes to its design:

Reduced complexity
The set of SMB commands went from over 100 commands down to 19.
Pipelining
Pipelining is the ability to send additional commands before the response to a prior command is received. Traditionally, SMB (and any RPC-based system) would have to wait for one command to complete before sending an additional command. The goal of pipelining is to keep more data in flight and use more of the available network bandwidth.
To protect the server from an overly high rate of client requests, credit-based flow control is used. With credit-based flow control, the server grants each client a small number of credits and later increases this number as needed. A client needs credits to send a message to the server. Each time it sends a message, it decrements its credit balance. This allows the server to control the rate of messages from any client and avoid buffer overflow.
Note that TCP's congestion control, by contrast, relies on data loss and produces wild oscillations in traffic intensity (TCP keeps increasing its transmission window size until packet loss occurs; then it cuts the window size in half and starts increasing again).
Compounding
Compounding is similar to pipelining but now allows multiple commands to be sent in one message. It avoids the need to optimize the system by creating commands that combine common sets of operations. Instead, one can send an arbitrary set of commands in one request. For instance, instead of the old SMB RENAME command, the following set of commands is sent: CREATE (create new file or open existing); SET_INFO; CLOSE. Compounding reduces network time because multiple requests can be placed within one message.
Larger read/write sizes
Fast networks can handle larger packet sizes and hence transfer larger read and write buffers more efficiently.
Improved caching
SMB 2 improved its ability to cache folder and file properties. This avoids messages to the server to retrieve these properties.
Durable handles
Previously, if there was a temporary network disconnection, an SMB client would lose its connection to the server and have to reestablish all connections, remount all file systems, and reopen all files. With SMB 2, the connection has to be reestablished but all handles to open files will remain valid.
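The credit-based flow control described above can be illustrated from the client's side. This is a minimal sketch under assumed names (CreditedClient, send, on_response); it does not reproduce SMB 2 message formats and assumes that new credits arrive piggybacked on responses.

```python
class CreditedClient:
    """Client side of a credit scheme: one credit is spent per request."""

    def __init__(self, initial_credits):
        self.credits = initial_credits
        self.in_flight = []      # pipelined requests awaiting a response

    def send(self, request):
        if self.credits == 0:
            return False         # must wait until the server grants more
        self.credits -= 1
        self.in_flight.append(request)
        return True

    def on_response(self, granted):
        # A response completes the oldest request and may carry new credits.
        self.in_flight.pop(0)
        self.credits += granted
```

With two initial credits, a client can pipeline two requests and is then blocked until a response replenishes its balance; the server raises or lowers the rate simply by choosing how many credits to grant.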

NFS version 4

While NFS version 3 is still widely used, NFS version 4 introduced significant changes and is a departure from the classic stateless design of NFS. The server is now stateful and is able to control client-side cache coherence better, allowing clients to cache data for a longer time. Servers can grant clients the ability to do specific actions on a file to enable more aggressive client caching. This is similar to SMB’s oplocks. Because of the stateful server, NFS now acquired open and close operations.

The addition of callbacks will notify a client when file or directory contents have changed.

Like SMB 2, NFS now supports compound RPC, where multiple operations can be grouped together into a single request. Sending one message instead of a series of messages reduces overall round-trip time significantly.

NFSv4 also added strong authentication and encryption and supports file system replication and migration. This includes a mechanism of sending referrals similar to that used by AFS.

Parallel file systems

Goal: Store huge files (terabyte and larger) in a file system that is distributed across many (thousands of) machines.

Conventional file systems lay out an entire file system on one logical storage device (a disk or NAND flash memory). This means that the metadata and data for all files and directories are located in this one device. In some cases, the device might be a collection of disks (e.g., a RAID drive or several disks connected by a volume manager) but, to the operating system, it appears as one device – one collection of blocks. This places a restriction on each file system that it cannot span more than one machine. Conventional distributed file systems, used for network attached storage (NAS), basically use a file system access protocol but interact with one or more servers running these conventional file systems.

With parallel file systems, the metadata and data of files are separated onto different entities. The metadata of a file is the set of attributes about a file: its name, size, access permissions, time stamps, etc. The data of a file is, well, the file’s data. Metadata does not take up much space. It’s a small set of attributes and each attribute is generally only a few bytes in size. Data, however, can be huge: terabytes or more for large files.

The idea of a parallel file system is to have a system – or systems – manage the metadata. As part of the metadata for each file, the system will identify the systems and logical blocks holding the file’s data. By doing this, we gain several benefits:

  1. A file’s data is no longer limited to the capacity of any one system and can be spread out across many systems.

  2. Because data blocks are distributed among multiple systems, access is effectively load balanced across them. Moreover, multiple systems can process different files or different parts of a file with read operations taking place on different systems. This provides scalable bandwidth, comparable to striping in storage arrays.

  3. Data blocks can be replicated on multiple systems, providing fault tolerance and the potential for greater read bandwidth since processes can read from any of the replicated data blocks.

This design approach is the basis for the Google File System (GFS), the Hadoop Distributed File System (HDFS, essentially a clone of GFS), and distributed file systems used in supercomputing clusters, such as Lustre and GlusterFS. We will not look at these last two but the concepts are similar.

Google File System (GFS)

The Google File System is designed to support large data-intensive applications (the kind of algorithms Google uses for search and other services). The system is designed for an environment where there are many thousands of storage servers, some of which are expected to be down at any given point in time. The data that is managed comprises billions of objects and many petabytes of data. GFS is not designed to be a general-purpose file system and is optimized for large files (rather than lots of small files), streaming reads (more common than writes), and atomic appends (which may be done concurrently by multiple clients).

Each GFS cluster contains one master file server. This is a faster and more reliable machine that manages file system metadata, such as names and attributes of files and directories. None of the actual file data resides on this system. Instead, it contains a mapping of the file contents to the chunks that hold the data. Chunks are fixed-size, 64 MB blocks and are stored in chunkservers. The chunkservers are less reliable than the master and are replicated so that each chunk is typically stored on three separate chunkservers.

Clients contact the master to look up files. The master gives them a chunkserver name and chunk handles for the files requested.

To write to a file, the master grants a chunk lease to one of the replicated chunks. The chunkserver holding this is the primary replica chunkserver for that chunk. Writing is a two-stage process. First, the client writes to one replica. That replica then forwards the data to another replica chunk server, and so on until all replicas for that chunk receive the data. This relieves the load and network demands on the client in that the client has to only write one copy. It also doesn’t put any one computer within GFS in the position of having to send out N copies of the data.

Note that the data flow (data transfer) from a client is chained: it goes to the primary replica, then to secondary replica, then another secondary replica, etc. This pipelining optimizes the bandwidth on each link. Writes are pipelined; a chunkserver sends data to the next replica at the same time as it is receiving it. This minimizes latency. To further reduce latency, each system will try to pick the nearest system that has not yet been added to the chain to send data. The client has the choice of all the replicas and can pick the closest one.

Once all replicas acknowledge receipt of the data, the second stage, writing, takes place. The client sends a write request to the primary chunkserver, identifying the data that was sent. The primary assigns a sequence to the write operations that take place on the data and sends write requests to all the replicas so that they will write data in the same sequence-number order.

The control flow (write commands) is sent from the client to the primary and then goes from the primary directly to all of the secondaries (but is a much, much smaller message).
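The separation of data flow and control flow can be sketched as follows. This is a simplified, single-threaded model with hypothetical names (ChunkReplica, Primary, push, apply); it ignores leases, failures, and the master, and only shows that data is chained from replica to replica while the primary alone picks the order in which buffered data is written.

```python
class ChunkReplica:
    def __init__(self, name):
        self.name = name
        self.buffer = {}      # data_id -> bytes received but not yet written
        self.chunk = []       # (serial, bytes) in the order they were applied

    def push(self, data_id, data, chain):
        """Stage 1 (data flow): buffer the data, then forward down the chain."""
        self.buffer[data_id] = data
        if chain:
            chain[0].push(data_id, data, chain[1:])

    def apply(self, data_id, serial):
        """Stage 2 (control flow): write buffered data with the given serial."""
        self.chunk.append((serial, self.buffer.pop(data_id)))


class Primary(ChunkReplica):
    def __init__(self, name, secondaries):
        super().__init__(name)
        self.secondaries = secondaries
        self.next_serial = 0

    def write(self, data_id):
        serial = self.next_serial          # the primary picks the order
        self.next_serial += 1
        self.apply(data_id, serial)
        for s in self.secondaries:         # small control message to each
            s.apply(data_id, serial)
        return serial
```

Even if two pieces of data arrive in one order, every replica ends up with them in the order in which the primary processed the write requests.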

Hadoop Distributed File System

Apache’s Hadoop Distributed File System (HDFS) is incredibly similar to GFS and is designed with essentially the same goals. The key distinction is that it does not support modifying a file once it is created, but it does support appends. This avoids the need to manage leases or locks for the file.

Like GFS’s master, HDFS uses a separate server, called a NameNode, that is responsible for keeping track of the filesystem namespace and the location of replicated file blocks. File data is broken into 128 MB blocks (default, but configurable per file) and is stored on DataNodes (GFS’s chunkservers). Each block (GFS chunk) is replicated on multiple DataNodes (the default is three, as with GFS). As with GFS, all file system metadata, names and block maps, is stored in memory on the NameNode for performance. File writes from clients are pipelined through each of the replica DataNodes that will hold the data.

HDFS uses rack-aware logic for determining which computers should host replicas for a block. The first replica is targeted to a computer in the same rack as the requesting client (to minimize latency). Second and additional replicas come from other racks in the data center for fault tolerance in case the first rack fails. The process for writing a block is essentially the same as in GFS. Data writes are pipelined to get data onto the replicas and then an update command is sent by the primary replica to all the others to write the block (this is essentially a commit operation, as it makes the data permanent).

Distributed Lookup Services (Distributed Hash Tables)

Goal: Create a highly-scalable, completely decentralized key-value store

The purpose of a distributed lookup service is to find the computer that has data that corresponds to a key that you have. For example, the key can be the name of the song and the computer is one that is hosting the MP3 file, or the key can be your customer ID and the corresponding data is the contents of your shopping cart.

The most straightforward approach is to use a central coordinator to store all the keys and their mapping to computers. You ask the server to look up a key and it gives you the computer containing the content. This is the Napster implementation, which was one of the first peer-to-peer file sharing services. A coordinator served as the web service that kept track of who was hosting which music files for download but the files themselves were hosted by the community of users. The Google File System, GFS, also uses a central coordinator although the corresponding data for a key (file) is distributed among multiple chunkservers.

Another approach is to “flood” the network with queries. What happens here is that your computer knows of a few peer computers, not the entire collection. It sends the request to these computers. Those computers, in turn, know of other computers. When a computer needs to look up a key, it forwards the request to all of its peer nodes. If a peer does not have the content, it repeats the process and forwards the request to its peers (and they forward the request to their peers …). A time to live (TTL) value in the request is decremented with each hop and the request is no longer forwarded if the hop count drops below zero. If one of the peers has the content, then it responds to whoever sent it the request. That node, in turn, sends the response back to its requestor and the process continues back until the originator of the query gets the response. This is called back propagation. This approach was used by the Gnutella file sharing system.
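Query flooding with a TTL and back propagation can be sketched on an in-memory overlay. The function name and data layout are assumptions made for illustration. The sketch explores peers breadth-first to imitate simultaneous forwarding, treats the TTL as the number of hops a query may still travel (so forwarding stops when it reaches zero), and records who first forwarded the query to each node so the reply can retrace the path.

```python
from collections import deque

def flood_lookup(peers, content, start, key, ttl):
    """Return the path from `start` to a node holding `key`, or None.

    peers:   dict mapping node -> list of neighbor nodes (the overlay)
    content: dict mapping node -> set of keys stored on that node
    """
    parent = {start: None}            # who first forwarded the query to a node
    queue = deque([(start, ttl)])
    while queue:
        node, hops_left = queue.popleft()
        if key in content.get(node, ()):
            path = []
            while node is not None:   # back propagation along the query path
                path.append(node)
                node = parent[node]
            return path[::-1]
        if hops_left == 0:
            continue                  # TTL expired: do not forward further
        for peer in peers.get(node, ()):
            if peer not in parent:    # each node handles a query only once
                parent[peer] = node
                queue.append((peer, hops_left - 1))
    return None
```

A query from A for content held three hops away succeeds with a TTL of 3 and fails with a TTL of 2, which shows how the TTL bounds both the traffic and the reach of a search.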

Flooding uses what is known as an overlay network. An overlay network is a logical network that is built on top of another network. Typically, computers on the network have knowledge of only some of the other computers on the network and will have to use these “neighbor” nodes to route traffic to more distant nodes.

Finally, the distributed hash table (DHT) approach is a set of solutions that is based on hashing the search key to a number that is then used to find the node responsible for items that hash to that number. A key difference between the DHT approach and the centralized or flooding approaches is that the hash of the key determines which node is responsible for holding information relating to the key.

Consistent hashing

Conventional hash functions require most keys to be remapped if the size of the hash table changes: that is, keys will usually hash to a different value when the size of the table changes. For a distributed hash, this will be particularly problematic. It would mean that if we remove or add a node to a group of computers managing a DHT, a very large percentage of data will have to migrate onto different systems. A consistent hash is a hashing technique where most keys will not need to be remapped if the number of slots in the table changes. On average, only k/n keys will need to be remapped for a system where k is the number of keys and n is the number of slots in the table.
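The difference between the two approaches can be measured with a small experiment. This is a sketch under assumptions: it uses a ring-based consistent hash (one of several possible constructions), a truncated SHA-1 as the hash function, and made-up node and key names.

```python
import bisect
import hashlib

def h(value):
    """Stable 32-bit hash of a string (SHA-1, truncated)."""
    return int.from_bytes(hashlib.sha1(value.encode()).digest()[:4], "big")

def modulo_owner(key, n):
    """Conventional hashing: slot = hash mod table size."""
    return h(key) % n

def make_ring(nodes):
    """Place each node on the ring at the hash of its name."""
    return sorted((h(node), node) for node in nodes)

def ring_owner(key, ring):
    """Consistent hashing: first node at or after the key's position."""
    positions = [pos for pos, _ in ring]
    i = bisect.bisect_left(positions, h(key)) % len(ring)
    return ring[i][1]

def moved_fraction(keys, before, after):
    """Fraction of keys whose owner differs between two mappings."""
    return sum(before(k) != after(k) for k in keys) / len(keys)
```

Growing from 10 to 11 slots, the modulo scheme moves roughly ten out of every eleven keys, while on the ring the only keys that move are those that now belong to the new node.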

CAN: Content-Addressable Network

CAN implements a logical x-y grid (although it can be applied to an arbitrary number of dimensions). A key is hashed by two hashing functions, one for each dimension (e.g., an x-hash and a y-hash). The result of these hashes identifies an x, y point on the grid. Each node is responsible for managing all keys that are located within a rectangle on the grid; that is, a node will be responsible for all keys whose x-hash falls in some range between xa and xb and whose y-hash falls in some range between ya and yb. This rectangle is known as a zone. If a node is contacted with a query for a key, it will hash the key and determine if it is present within its zone. If it falls within the node’s zone, the node can satisfy the request. If the hashed values are out of range, the node will forward the request to one of four neighbors (north, east, south, or west), which will then invoke the same logic to either process the query or forward it onto other nodes. The average route for a system with n nodes is O(sqrt(n)) hops.

CAN scales because each zone can be split into two, either horizontally or vertically, and the process can be repeated over and over as needed. For each split, some of the keys will have to move to the new node. For fault tolerance, key, value data can be replicated on one or more neighboring nodes and each node will know not just its neighbors but its neighbors' neighbors. If a node is not reachable, the request will be sent to a neighboring node. Even though we discussed CAN in two dimensions, it can be implemented in an arbitrary number of dimensions. For d dimensions, each node needs to keep track of 2d neighbors.
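Neighbor-to-neighbor routing on the grid can be illustrated with a deliberately simplified model. The sketch assumes the space is split into a regular k x k grid of equal zones (real CAN zones are uneven because they are created by repeated splits), ignores any wrap-around at the edges, and uses made-up function names.

```python
def zone_of(x, y, k, size):
    """Zone (col, row) owning point (x, y) when a size x size space is
    divided into k x k equal zones."""
    return (x * k // size, y * k // size)

def route(start, target):
    """Greedy forwarding: each hop goes to the east/west or north/south
    neighbor that is one step closer to the target zone."""
    path = [start]
    col, row = start
    while (col, row) != target:
        if col != target[0]:
            col += 1 if target[0] > col else -1
        else:
            row += 1 if target[1] > row else -1
        path.append((col, row))
    return path
```

For n = k * k zones, a route never needs more than 2(k - 1) hops in this model, which is where the O(sqrt(n)) behavior comes from.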


Chord constructs a logical ring representing all possible hash values (bucket positions). Note that for hash functions such as a 160-bit SHA-1 hash, this is an insanely huge value of approximately 1.46 × 10^48 slots. Each node in the system is assigned a position in the huge logical ring by hashing its IP address. Because the vast majority of bucket positions will be empty, (key, value) data is stored either at the node to which the key hashes (if, by some rare chance, the key hashes to the same value that the node’s IP address hashed) or on a successor node, the next node that would be encountered as the ring is traversed clockwise. For a simple example, let us suppose that we have a 4-bit hash (0..15) and nodes occupying positions 2 and 7. If a key hashes to 4, the successor node is 7 and hence the computer at node 7 will be responsible for storing all data for keys that hash to 4. It is also responsible for storing all data for keys that hash to 3, 5, 6, and 7.

If a node only knows of its clockwise neighbor node, then any query that a node cannot handle will be forwarded to a neighboring node. This results in an unremarkable O(n) lookup time for a system with n nodes. An alternate approach is to have each node keep a list of all the other nodes in the group. This way, any node will be able to find out which node is responsible for the data on a key simply by hashing the key and traversing the list to find the first node ≥ the hash of the key. This gives us an impressive O(1) performance at the cost of having to maintain a full table of all the nodes in the system on each node. A compromise approach that keeps the table size bounded is to use finger tables. A finger table is a partial list of nodes with each element in the table identifying a node that is a power of two away from the current node. Element 0 of the table is the next node (2^0 = 1 away), element 1 of the table is the node after that (2^1 = 2 away), element 2 of the table is four nodes removed (2^2), element 3 of the table is eight nodes removed (2^3), and so on. With finger tables, O(log n) nodes need to be contacted to find the owner of a key.
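The successor rule and a finger table can be worked out for the 4-bit example with nodes at positions 2 and 7. This sketch assumes the usual Chord formulation in which entry i of node n's finger table is the successor of position (n + 2^i) mod 2^m; the function names are illustrative.

```python
def successor(ident, nodes, m):
    """First node at or clockwise after `ident` on a ring of 2**m positions."""
    ident %= 2 ** m
    ring = sorted(nodes)
    for n in ring:
        if n >= ident:
            return n
    return ring[0]            # wrap around past the top of the ring

def finger_table(n, nodes, m):
    """Entry i is the node responsible for position n + 2**i."""
    return [successor(n + 2 ** i, nodes, m) for i in range(m)]
```

With nodes [2, 7] and m = 4, keys hashing to 3 through 7 map to node 7 and every other key wraps around to node 2. Node 2's finger table covers positions 3, 4, 6, and 10, so its entries are nodes 7, 7, 7, and 2.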

Distributed hash table case study: Amazon Dynamo

Goal: Create a practical implementation of a key-value store that is highly scalable, completely decentralized, replicated, and efficient.

Dynamo is a highly available key-value store created internally within Amazon. Many services within Amazon, such as best seller lists, shopping carts, user preferences, session management, etc., need only primary key access to data. A multi-table relational database system would be overkill and would limit scalability and availability. With Dynamo, an application can configure its instance of Dynamo for desired availability (# replicas) and consistency.

Dynamo supports two operations: get(key) and put(key, data, context). The data is an arbitrary bunch of bytes (typically less than 1 MB) and is always identified by a unique key. The (key, data) values are distributed over a large set of servers and replicated for availability. The context is a value that is returned with a get operation and then sent back with a future put operation. The context is meaningless to the user program and is used like a cookie. Internally, it contains versioning information.

The system is decentralized: there is no need for a central coordinator to oversee operations. Every node has equivalent responsibilities and the system can grow dynamically by adding new nodes.

Partitioning: scalability

Since massive scalability is a key design aspect of Dynamo, data storage is distributed among many nodes. Dynamo relies on consistent hashing to identify which node will be responsible for a specific key. With normal hashing, a change in the number of slots in the table (e.g., number of nodes since we are hashing to find a node) will result in having to remap all keys. With consistent hashing, this is not the case and only K/n keys need to be remapped, where K is the number of keys and n is the number of slots (nodes).

Nodes are arranged in a logical ring. Each node is assigned a random “token” (a number) that represents its position in the logical ring. It is responsible for all keys that hash to any number between the node’s number and its predecessor’s number. When a key is hashed, a node will traverse a logical ring of node values to find the first node with a position greater than or equal to that hashed result. This tells it which node is responsible for handling that key. Since nodes are not added that frequently, every node can store a cached copy of this ring (the size is the list of N hash values and server addresses, where N is the number of nodes). Moreover, to avoid the extra hop resulting because a client sent a request to the wrong Dynamo node, a client may also get a copy of this table. The system has been described as a zero-hop distributed hash table (DHT).

Adding or removing nodes affects only the node’s immediate neighbor in the ring. If a node is added, the successor node will give up some range of values to the new node. If a node is removed, the key, value data it manages will have to be moved over to the successor. In reality, a physical node is assigned to manage multiple points in the ring (multiple ranges of hash values). The nodes in the algorithm that we just discussed are really virtual nodes. A real machine, or physical node, will be responsible for managing multiple virtual nodes. The advantage of doing this is that we can attain balanced load distribution. If a machine becomes unavailable, the load is evenly distributed among the available nodes. If a new machine is added, each virtual node within it will take a set of values from other virtual nodes, effectively taking on a small amount of load from a set of other machines instead of from just the neighboring system (successor node). Moreover, if a more powerful machine is added, it can be assigned a larger number of virtual nodes so that it bears a greater percentage of the query workload.
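The effect of virtual nodes can be sketched by giving each machine many positions on the ring. The details here are assumptions for illustration: tokens are derived by hashing "machine#index" rather than being chosen at random, the hash is a truncated SHA-1, and the machine and key names are invented.

```python
import bisect
import hashlib

def h(value):
    """Stable 32-bit hash of a string (SHA-1, truncated)."""
    return int.from_bytes(hashlib.sha1(value.encode()).digest()[:4], "big")

def build_ring(machines):
    """machines: dict of machine name -> number of virtual nodes it hosts.
    Returns a sorted list of (token, machine) pairs."""
    return sorted((h(f"{name}#{i}"), name)
                  for name, count in machines.items()
                  for i in range(count))

def owner(ring, key):
    """Machine hosting the first virtual node at or after the key's hash."""
    tokens = [token for token, _ in ring]
    i = bisect.bisect_left(tokens, h(key)) % len(ring)
    return ring[i][1]
```

Rebuilding the ring without one machine leaves every key owned by the remaining machines where it was, and the removed machine's keys are taken over by several machines rather than by a single successor. A more powerful machine can simply be given a larger count.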

Replication: availability

Dynamo is designed as an always writable data store. Data can be replicated on N hosts, where the value N is configurable per instance of Dynamo. The node that is responsible for storing the key-value is assigned the role of coordinator for that key and is in charge of replication for that key. The coordinator copies (key, value) data onto N-1 successor nodes clockwise in the ring. Copying is asynchronous (in the background) and Dynamo provides an eventually consistent model. This replication technique is called optimistic replication, which means that replicas are not guaranteed to be identical at all times. If a client cannot contact the coordinator, it sends the request to a node holding a replica, which will then periodically try to contact the coordinator.
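Choosing the coordinator and its N-1 clockwise successors can be shown with a tiny ring. This sketch uses small hand-picked integer tokens and assumes one token per node (no virtual nodes, so every successor is a distinct machine); the function name is illustrative.

```python
import bisect

def preference_list(ring, key_hash, n):
    """ring: sorted list of (token, node) pairs.
    Returns the coordinator for key_hash followed by its N-1 successors."""
    tokens = [token for token, _ in ring]
    start = bisect.bisect_left(tokens, key_hash) % len(ring)
    return [ring[(start + i) % len(ring)][1]
            for i in range(min(n, len(ring)))]
```

For a ring with tokens 10 (A), 20 (B), 30 (C), and 40 (D), a key that hashes to 25 is coordinated by C and, with N = 3, replicated on D and A.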

Because data is replicated and the system does not provide ACID guarantees, it is possible that replicas may end up holding different versions. Dynamo favors application-based conflict resolution since the application is aware of the meaning of the data associated with the key and can act upon it intelligently. For example, the application may choose to merge two versions of a shopping cart. If the application does not want to bother with this, the system falls back on a last write wins strategy. Dynamo uses vector clocks to identify versions of stored data and to distinguish versions that have been modified concurrently from versions that are causally related (a later one is just an update of an earlier one). For example, one can identify whether there are two versions of a shopping cart that have been updated independently or whether one version is just a newer modification of an older shopping cart. A vector clock value is a set of (node, counter) pairs. This set of values constitutes the version of the data. It is returned as the “context” with a get operation and updated and stored when that context is passed as a parameter to a put operation.
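Comparing two vector clocks to decide whether versions are causally related or concurrent can be sketched as follows. The clock is represented as a dictionary of node name to counter; the function names and the node names used in the example are assumptions for illustration.

```python
def increment(clock, node):
    """Return a copy of `clock` with `node`'s counter advanced by one."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated

def descends(a, b):
    """True if version `a` has seen every update recorded in `b`."""
    return all(a.get(node, 0) >= count for node, count in b.items())

def compare(a, b):
    """Classify two versions: equal, one newer, or concurrent."""
    if descends(a, b) and descends(b, a):
        return "equal"
    if descends(a, b):
        return "a newer"
    if descends(b, a):
        return "b newer"
    return "concurrent"
```

If a cart at version {Sx: 2} is updated independently through nodes Sy and Sz, the results {Sx: 2, Sy: 1} and {Sx: 2, Sz: 1} compare as concurrent, so both versions are kept for the application to reconcile. A version that merely extends another compares as newer and can replace it.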

Because Dynamo is completely decentralized and does not rely on a central coordinator (unlike GFS, for example), each node serves three functions:

  1. Managing get and put requests. A node may act as a coordinator responsible for a particular key or may forward the request to the node that has that responsibility.

  2. Keeping track of membership and detecting failures. A node will detect if other nodes are out of service and keep track of the list of nodes in the system and their associated hash ranges.

  3. Local persistent storage. Each node is responsible for being either the primary or replica store for keys that hash to a certain range of values. These (key, value) pairs are stored within that node using a variety of mechanisms depending on application needs. These include the Berkeley Database Transactional Data Store, MySQL (for large objects), or an in-memory buffer with persistent backing store (for highest performance).

Domain Name System

Goal: Build a wide-area distributed name service for resolving information about Internet domain names

The Internet Domain Name System (DNS) is the naming system for nodes on the Internet that associates human-friendly compound names with numeric IP addresses and other information about that node. IP domain names (e.g., cs.rutgers.edu) are hierarchical and assigned entirely independently from their corresponding addresses. Any name can be associated with any address.

IP addresses are managed globally by the IANA, the Internet Assigned Numbers Authority, which hands off management to Regional Internet Registries that, in turn assign blocks of IP addresses to ISPs. Internet domain names are also assigned hierarchically, with hundreds of top-level domains (.com, .org, .ca, .es, .yoga, .wedding, …) under a single root. The IANA delegates the management of individual top-level domain names to various companies. These are called domain name registry operators, each of which operates a NIC, or Network Information Center. Domain name registrars are companies that allow end users to register domain names. They update the registry database at the NIC. For example, the .yoga top-level domain is operated by a company in Ireland called Top Level Domain Holdings Ltd. When you register a domain with a domain name registrar such as Namecheap or GoDaddy, the company contacts Top Level Domain Holdings to update the master database of information about domains registered under .yoga.

The Domain Name System is implemented through a collection of DNS servers. It is an example of a distributed name server for resolving Internet domain names into IP addresses. Each server is responsible for answering questions about machines within its zone. A zone is a subtree of the Internet Domain name space that is managed by that server. A registered domain name must include the address of one or more name servers that are responsible for answering queries about hosts within that domain.

Each DNS server will do one of several things: (1) answer a request if it already knows the answer, (2) contact another name server(s) to search for the answer, (3) return an error if the domain name does not exist, or (4) return a referral: the address of another name server that may know more of the answer.

For example, a search for mail.pk.org (with no cached information) begins by querying one of several replicated root name servers. These keep track of the name servers responsible for top-level domains. This query will give you a referral to a name server that is responsible for the .org domain. Querying that name server will give you a name server responsible for pk.org. Finally, the name server responsible for pk.org will provide the IP address or an authoritative “this host does not exist” response. Along the way, referrals may be cached so that a name server need not go through the entire process again.

An iterative name resolution process is one where you go through the name hierarchy to find lower-level name servers that will eventually take you to the final component of the name. This is the way DNS servers operate when they return a referral to another name server if they are not responsible for an address. It is the common way for DNS servers to work since a server does not have to maintain any state. A recursive name resolution process is one where the name server itself will invoke other name servers as needed to get the answer to the query. DNS supports both forms. The local DNS server, called a DNS resolver (for example, at your ISP), will generally support recursive requests and perform the iteration itself rather than passing referrals back to the client.
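Iterative resolution by following referrals can be simulated with an in-memory table. Everything in the table is invented for illustration: the server labels (root, org-ns, pk-ns) are placeholders, and the address comes from a range reserved for documentation. No real DNS messages are sent.

```python
# Hypothetical zone data: server label -> answers it holds and referrals it gives
SERVERS = {
    "root":   {"referrals": {"org": "org-ns"}},
    "org-ns": {"referrals": {"pk.org": "pk-ns"}},
    "pk-ns":  {"answers": {"mail.pk.org": "192.0.2.25"}},
}

def query(server, name):
    """One request to one server: ('answer', ip), ('referral', server),
    or ('nxdomain', None)."""
    zone = SERVERS[server]
    if name in zone.get("answers", {}):
        return ("answer", zone["answers"][name])
    for suffix, next_server in zone.get("referrals", {}).items():
        if name == suffix or name.endswith("." + suffix):
            return ("referral", next_server)
    return ("nxdomain", None)

def resolve_iteratively(name, server="root"):
    """Follow referrals starting at the root.
    Returns (address or None, list of servers contacted)."""
    contacted = []
    while True:
        contacted.append(server)
        kind, value = query(server, name)
        if kind == "referral":
            server = value            # ask the server we were referred to
        else:
            return value, contacted
```

Resolving mail.pk.org contacts the root, then the .org server, then the pk.org server. A resolver that handles recursive requests would run this loop on the client's behalf and could cache the referrals to skip the first steps next time.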

Distributed transactions

Goal: Create a fault tolerant algorithm that ensures that all group members agree to commit (make their actions permanent). If agreement is not reached, then all group members must agree to abort (undo any changes).

A transaction is a set of operations that operates on, and typically modifies, data. A key facet of a transaction is that it has to be atomic — all results have to be made permanent (commit) and appear to anyone outside the transaction as an indivisible action. If a transaction cannot complete, it must abort, reverting the state of the system to that before it ran. If several transactions run concurrently, the end result must be the same as if those transactions ran in some (any) serial order. The specific order in which transactions execute is known as a schedule.

A transaction-based model guarantees a set of properties known as ACID:

Atomic
The transaction happens as a single indivisible action. Everything succeeds (and the transaction commits) or else the entire transaction is rolled back (the transaction aborts). Other transactions do not see intermediate results.
Consistent
A transaction cannot leave the data in an inconsistent state. If the system has invariants, they must hold after the transaction. For example, the total amount of money in all accounts must be the same before and after a “transfer funds” transaction.
Isolated (Serializable)
Transactions cannot interfere with each other. If transactions run at the same time, the final result must be the same as if they executed in some serial order.
Durable
Once a transaction commits, the results are made permanent. No failures after a commit will cause the results to revert.

A write-ahead log (or transaction log) is used to enable rollback: reverting to the previous state when aborting a transaction. It is also crucial in maintaining the state of the transaction in a stable place in case the process or computer should die. The system will be able to read the log and recover to where it was in the transaction commit protocol.
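
A minimal sketch of how a log enables rollback is shown below. It assumes a dictionary as the data store and a Python list standing in for the log; a real write-ahead log lives on stable storage and is flushed before the data is changed. The class and method names are hypothetical.

```python
_MISSING = object()  # marks "this key did not exist before the write"


class LoggedTransaction:
    def __init__(self, store):
        self.store = store
        self.log = []  # stand-in for a log kept on stable storage

    def write(self, key, value):
        # Record the old value BEFORE modifying the data (write-ahead).
        self.log.append((key, self.store.get(key, _MISSING)))
        self.store[key] = value

    def commit(self):
        # The changes stay; the undo records are no longer needed.
        self.log.clear()

    def abort(self):
        # Undo the writes in reverse order to restore the prior state.
        for key, old in reversed(self.log):
            if old is _MISSING:
                self.store.pop(key, None)
            else:
                self.store[key] = old
        self.log.clear()
```

Aborting after several writes leaves the store exactly as it was before the transaction started, which is the rollback behavior described above.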

In a distributed transaction environment, multiple processes participate in a transaction, each executing its own sub-transaction that can commit only if there is unanimous consensus by all participants to do so. Each system runs a transaction manager, a process that is responsible for participating in the commit algorithm to decide whether to commit or abort its sub-transaction. One of these transaction managers may be elected as the coordinator, which initiates and runs the commit algorithm. Alternatively, the coordinator could be a separate process from any of the transaction participants.

Two-phase commit protocol

The two-phase commit protocol uses atomic multicasts to reach a consensus among the group on whether to commit or abort. It uses a coordinator to send a request (“can you commit?”) to every member of the group (reliably, retransmitting as often as needed until all replies are received). Phase 1 is complete when every member of the group (each participant) responds. If the coordinator gets even a single abort response from a participant, it must tell all participants to abort the entire transaction. Otherwise, it will tell all participants to commit it. In phase 2, the coordinator sends the commit or abort order and waits for a response from everyone. In summary, in phase 1, the coordinator gets everyone’s agreement and in phase 2, the coordinator sends the directive to commit or abort.
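
The two phases can be sketched as follows. This is a simplified, single-process illustration: the Participant class and its can_commit flag are hypothetical, and the sketch leaves out the logging, retransmission, and failure handling that the real protocol depends on.

```python
class Participant:
    """Hypothetical participant; can_commit stands in for its local decision."""

    def __init__(self, can_commit):
        self.can_commit = can_commit
        self.state = "working"

    def vote(self):
        # Phase 1 reply. A real participant logs its vote before replying.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def finish(self, decision):
        # Phase 2: obey the coordinator's directive.
        self.state = "committed" if decision == "commit" else "aborted"


def two_phase_commit(participants):
    # Phase 1: ask everyone "can you commit?" and collect every reply.
    votes = [p.vote() for p in participants]
    # A single "no" forces an abort for the whole group.
    decision = "commit" if all(votes) else "abort"
    # Phase 2: send the same directive to every participant.
    for p in participants:
        p.finish(decision)
    return decision
```

With three participants where one votes no, two_phase_commit returns "abort" and every participant ends in the aborted state.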

The write-ahead log in stable storage is crucial for ensuring atomic multicasts (the write-ahead log is also important for transaction rollback, which is used for aborts!). For example, if a participant sent the coordinator a commit response for phase 1 and then died, it must be able to reboot and reconstruct the transaction state from the log; it cannot change its mind after rebooting. The two-phase commit protocol cannot proceed until every participant acknowledges every message.

The two-phase commit protocol stalls if any member, whether the coordinator or a participant, dies. It has to wait for the recovery of that member before proceeding with the protocol. A recovery coordinator can step in under certain circumstances. If the coordinator died and a recovery coordinator took over, it queries the participants. If at least one participant has received a commit message then the new coordinator knows that the vote to commit must have been unanimous and it can tell the others to commit. If no participants received a commit message then the new coordinator can restart the protocol. However, if one of the participants died along with the coordinator, confusion may arise. If all the live participants state that they have not received a commit message, the coordinator does not know whether there was a consensus and the dead participant may have been the only one to receive the commit message (which it will process when it recovers). As such, the coordinator cannot tell the other participants to make any progress; it must wait for the dead participant to come back.

Three-phase commit protocol

The two-phase commit protocol is a blocking protocol. If the coordinator or any participant stops running, the entire protocol stalls until the failed process is restarted. The three-phase commit protocol is similar to the two-phase commit protocol but allows entities to time out in certain cases to avoid indefinite waits.

The protocol also supports the use of a recovery coordinator by introducing an extra phase where participants are told what the consensus was so that any participant that received this information before a coordinator died could inform a standby coordinator whether there was a unanimous decision to commit or abort. The three-phase commit protocol propagates the knowledge of the outcome of the election to all participants before starting the commit phase.

In phase 1 of the three-phase commit protocol, the coordinator sends a query-to-commit request (“can you commit?”) to every member of the group (reliably, retransmitting as often as needed until all replies are received over a period of time). If any of the participants responds with a no, or if any fails to respond within a defined time, then the coordinator sends an abort to every participant.

In phase 2, the coordinator sends an agreed-to-commit message (also referred to as a prepare-to-commit message) to all participants and gets acknowledgements from everyone. When this message is received, a participant knows that the unanimous decision was to commit. If a participant fails to receive this message in time, then it aborts. At this point, the participants do not commit. However, if a participant receives an abort message then it can immediately abort the transaction.

In phase 3, the coordinator sends a commit message to all participants telling them to commit. If a participant fails to receive this message, it commits anyway since it knows from phase 2 that there was a unanimous decision to commit. If a coordinator crashes during this protocol, another one can step in and query the participants for the commit decision. If every participant received the prepare-to-commit message then the coordinator can issue the commit directives. If some participants received the message, the coordinator now knows that the unanimous decision was to commit and can re-issue the prepare-to-commit request followed by a commit. If no participant received the message, the coordinator can restart the protocol or, if necessary, restart the transaction.
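
The choices open to a replacement coordinator can be written as a small decision function. This is only a sketch of the three cases in the paragraph above; the function name and the list-of-booleans input are assumptions made for illustration, and the list is assumed to be non-empty.

```python
def recovery_action(received_prepare):
    """Decide what a new coordinator sends next.

    received_prepare holds one boolean per participant: did that
    participant receive the prepare-to-commit message?
    """
    if all(received_prepare):
        # Everyone knows the decision: go straight to the commit phase.
        return ["commit"]
    if any(received_prepare):
        # The vote must have been unanimous; repeat phase 2, then commit.
        return ["prepare-to-commit", "commit"]
    # Nobody got past phase 1: start the protocol over.
    return ["restart"]
```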

The three-phase commit protocol accomplishes two things:

  1. Enables use of a recovery coordinator. If a coordinator died, a recovery coordinator can query a participant.

    • If the participant is found to be in phase 2, that means that every participant has completed phase 1 and voted on the outcome. The completion of phase 1 is guaranteed. It is possible that some participants may have received commit requests (phase 3). The recovery coordinator can safely resume at phase 2.

    • If the participant was in phase 1, that means NO participant has started commits or aborts. The protocol can start at the beginning.

    • If the participant was in phase 3, the coordinator can continue in phase 3 and make sure everyone gets the commit/abort request.

  2. Every phase can now time out – there is no indefinite wait like in the two-phase commit protocol.

    Phase 1:
    Participant aborts if it doesn’t hear from a coordinator in time
    Coordinator sends aborts to all if any participant fails to respond in time
    Phase 2:
    If coordinator times out waiting for a participant – assume it crashed, tell everyone to abort
    If participant times out waiting for a coordinator, elect a new coordinator
    Phase 3:
    If a participant fails to hear from a coordinator, it can contact any other participant for results

The three-phase commit protocol suffers from two problems. First, a partitioned network may cause a subset of participants to elect a new coordinator and vote on a different transaction outcome. Secondly, it does not handle fail-recover well. If a coordinator that died recovers, it may read its write-ahead log and resume the protocol at what is now an obsolete state, possibly issuing directives that conflict with what already took place. Beyond these correctness problems, and perhaps most importantly, the three-phase commit protocol is less efficient than the two-phase commit protocol. We expect the vast bulk of transactions to commit without aborts or failed coordinators. With the three-phase commit protocol, we incur an extra set of messages (which are logged onto stable storage) to enable systems to time out and abort or to enable a recovery coordinator to take over. In other words, we pay a 50% communication penalty (three rounds of messages instead of two) to handle an event that we rarely expect to occur.

Brewer’s CAP Theorem

Ideally, we’d like three properties in a replicated distributed system:

Consistency
The data retrieved from the system should be the same regardless of which server is contacted.
Availability
The system should always be available to handle requests.
Partition tolerance
The system should continue to function even if some network links do not work and one group of computers cannot talk to another. For example, a link between two data centers may go down.

Eric Brewer’s CAP Theorem states that you can have either consistency or availability in the presence of partitions but not both. Quite often, the theorem is summarized as: if you want consistency, availability, and partition tolerance, you have to settle for at most two out of three of these. However, we expect real-world distributed systems to be at risk of network partitions, so the choice is really between consistency and availability.

With distributed architectures, we generally strive for high availability and the ability to withstand partitions (occasional breaks in the ability of nodes to communicate). Hence, we will have to give up on consistency and break the guarantees of ACID. An alternative to the requirements of ACID is BASE.

BASE stands for Basic Availability, Soft-state, Eventual consistency. Instead of requiring consistency after every transaction, it is enough for the data to eventually get to a consistent state. The downside is that some processes may access stale data which has not yet been brought into a consistent state.

Concurrency control

Goal: Allow multiple transactions to run concurrently but ensure that resource access is controlled to give results that are identical to those of running the transactions in some serial sequence. This preserves the “Isolated” guarantee of ACID transaction semantics.

A schedule is a sequence of transactions. A serial schedule is one where transactions are executed sequentially: a new transaction starts when a previous one commits (or aborts). This is inefficient, since we are forcing transactions to run one at a time. However, it ensures isolation (the I in ACID).

The goal of concurrency control is to allow multiple transactions to run concurrently (which is great for performance) while ensuring that data access is controlled such that the net effect is the same as if the transactions all ran in some serial order. That is, we cannot have the net result be one where transactions read an interim state of data from another transaction.

Two-phase locking (2PL)

Exclusive locks, via a lock manager, help us accomplish this. A transaction can grab locks for the resources it needs. That ensures mutual exclusion. To ensure serializability, it is important that a transaction does not acquire any new locks after it has released a lock. If we do not do this, transaction A may create a result based on the completion of transaction B while transaction B might have used data that A wrote and unlocked. This technique is known as two-phase locking (2PL). The first phase is the growing phase, in which locks are acquired. The second phase is the shrinking phase, in which locks are released.
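
The two-phase rule itself is easy to express in code. The sketch below only tracks which phase a transaction is in and refuses any lock request made after the first release; it does not implement a lock manager or waiting, and the class name is hypothetical.

```python
class TwoPhaseTransaction:
    def __init__(self):
        self.held = set()
        self.shrinking = False  # becomes True at the first release

    def acquire(self, resource):
        # Growing phase only: no new locks once any lock has been released.
        if self.shrinking:
            return False
        self.held.add(resource)
        return True

    def release(self, resource):
        # The first release moves the transaction into the shrinking phase.
        self.shrinking = True
        self.held.discard(resource)
```

A transaction that locks x and y, releases x, and then asks for z is refused, since granting z would break the growing-then-shrinking order.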

Strong strict two-phase locking (SS2PL)

The problem with two-phase locking is that, if a transaction that has released some locks now aborts, there is a chance that other transactions have used data that was modified by the transaction. In that case, those transactions (and all transactions that depend on them) have to abort as well. This condition is called cascading aborts. Strong strict two-phase locking (SS2PL) avoids this problem by requiring all locks to be held until the end. The shrinking phase, in effect, is an atomic operation that occurs at the very end of the transaction. You lose concurrency this way but avoid having to process cascading aborts.

Exclusive and shared locks

Exclusive locking for every resource access is a bit aggressive. Consider a transaction that just reads data. It needs to lock that data to ensure that no other transaction modifies it but nothing would go wrong if another transaction also wanted to read that same data. We can achieve greater concurrency by distinguishing read locks from write locks. Read locks (called shared locks) need not be exclusive: any number of them can be granted. However, if there are any read locks on an object, a write lock cannot be granted and the transaction must wait. Similarly, if there is a write lock (called an exclusive lock) on an object then any read lock requests or write lock requests must wait.
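
The compatibility rules for shared and exclusive locks can be sketched as a small non-blocking lock object. Where the text says a transaction must wait, this sketch simply returns False; the class name and the use of strings as transaction identifiers are assumptions, and lock upgrades are not handled.

```python
class ReadWriteLock:
    def __init__(self):
        self.readers = set()  # transactions holding a shared (read) lock
        self.writer = None    # transaction holding the exclusive (write) lock

    def try_read(self, txn):
        # Any number of readers may share the object, but not with a writer.
        if self.writer is not None:
            return False
        self.readers.add(txn)
        return True

    def try_write(self, txn):
        # A writer needs the object to itself: no readers, no other writer.
        if self.writer is not None or self.readers:
            return False
        self.writer = txn
        return True

    def release(self, txn):
        self.readers.discard(txn)
        if self.writer == txn:
            self.writer = None
```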

Two-version-based concurrency control

A way of increasing concurrency even beyond read/write locks is two-version-based concurrency control. In this case, one transaction writes tentative versions (private versions) while other transactions read existing, previously committed versions of the data. This allows transactions to request read locks even if another transaction has a write lock on the object. When the transaction that has a write lock is ready to commit, it converts its write locks to commit locks, waits until any transactions that have a read lock for that object complete, and then makes its modified version permanent. During a commit lock, no other transaction can grab any lock on that object. This allows increased concurrency but transactions that write data risk waiting when they attempt to commit and make their data permanent.

Optimistic concurrency control

Concurrency control techniques that rely on locking force locks to be held while the transaction is using the resource, or, in some cases, until the end of the transaction. This restricts the maximum amount of concurrency that may be achieved in the system. Optimistic concurrency control techniques assume that transactions are more likely to complete than not. As such, it is better to put more effort into rectifying errors from having used data from aborted transactions than to lock access to the data. A fully optimistic system uses no locks and checks for conflicts at commit time. Optimistic concurrency control has three phases of operation:

  1. Working phase. The transaction reads and writes data. A private workspace is often, but not always, used to keep non-committed results isolated.

  2. Validation phase. When the transaction is ready to commit, a check is made to see if there is any conflict with other transactions that took place during this time. For example, if some transaction A modified data and committed but transaction B read data before A modified that data, then transaction B cannot be allowed to commit because that would violate serializability.

  3. Update phase. Tentative changes are made permanent and the transaction commits.
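
The three phases can be sketched in a few lines. This is a single-threaded illustration under several assumptions: the Database and OptimisticTxn classes are hypothetical, each transaction keeps a private workspace, and validation checks a transaction's read set against the write sets of transactions that committed after it started.

```python
class Database:
    def __init__(self):
        self.data = {}
        self.committed = []  # write sets of committed transactions, in order


class OptimisticTxn:
    def __init__(self, db):
        self.db = db
        self.start = len(db.committed)  # commits that preceded this transaction
        self.read_set = set()
        self.writes = {}                # private workspace

    # Working phase: read shared data, buffer writes privately.
    def read(self, key):
        self.read_set.add(key)
        return self.writes.get(key, self.db.data.get(key))

    def write(self, key, value):
        self.writes[key] = value

    def commit(self):
        # Validation phase: did anyone who committed since we started
        # write something that we read?
        for write_set in self.db.committed[self.start:]:
            if write_set & self.read_set:
                return False  # conflict: abort (the caller may restart)
        # Update phase: make the tentative changes permanent.
        self.db.data.update(self.writes)
        self.db.committed.append(set(self.writes))
        return True
```

If B reads x, and A then writes x and commits, B fails validation when it tries to commit, which matches the example in the validation step.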

Timestamp ordering

Timestamp ordering allows lock-free concurrency control by keeping track of two timestamps per object: the timestamp of the last committed transaction that read the object and the timestamp of the last committed transaction that wrote the object. If a transaction wants to write an object, it compares its own timestamp with the object’s write timestamp. If the object’s timestamp is older, then we have good ordering and the transaction can proceed. Otherwise, the transaction aborts and is restarted.
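
A rough sketch of these checks follows. It goes slightly beyond the paragraph above by also applying the usual basic timestamp ordering rules for reads and for the read timestamp on writes, and it simplifies by updating timestamps on access instead of tracking commits. The class name and integer timestamps are assumptions.

```python
class TimestampedObject:
    def __init__(self, value):
        self.value = value
        self.read_ts = 0   # largest timestamp of a transaction that read it
        self.write_ts = 0  # timestamp of the transaction that last wrote it

    def read(self, txn_ts):
        # Reading a value written by a younger transaction breaks the order.
        if txn_ts < self.write_ts:
            return False, None
        self.read_ts = max(self.read_ts, txn_ts)
        return True, self.value

    def write(self, txn_ts, value):
        # The object's timestamps must be older than the writer's.
        if txn_ts < self.write_ts or txn_ts < self.read_ts:
            return False  # out of order: abort and restart the transaction
        self.value = value
        self.write_ts = txn_ts
        return True
```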

Multiversion concurrency control

We can extend two-version-based concurrency control to support multiple versions of an object and apply the concepts of timestamp ordering to create multiversion concurrency control (MVCC).

In multiversion concurrency control, no locks are used. The system can maintain multiple versions of an object (e.g., a field in a database table). Each transaction has a timestamp associated with it that marks the start of the transaction. Each version of an object has two timestamps: the read timestamp, which records the time the object was read, and the write timestamp, which records the time the object was modified.

With MVCC, a transaction will never have to wait to read an object; it simply reads the latest version of the object that has a write timestamp earlier than that of the transaction. A write cannot take place if there are any other uncommitted transactions that read the object and have a timestamp earlier than the read timestamp of our transaction. This is because an earlier transaction depends on a previous value of the object and we need to consider the possibility that the earlier transaction may modify that object.
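
The read rule can be sketched as a lookup over the stored versions. Only reads are modeled here; the list of (write timestamp, value) pairs and the function name are assumptions made for illustration.

```python
def read_version(versions, txn_ts):
    """Return the value of the newest version written before txn_ts.

    versions is a list of (write_ts, value) pairs; None is returned if
    no version is old enough for this transaction to see.
    """
    visible = [v for v in versions if v[0] < txn_ts]
    if not visible:
        return None
    return max(visible, key=lambda v: v[0])[1]
```

For versions written at times 1, 5, and 9, a transaction with timestamp 6 reads the version written at time 5, no matter how many newer versions exist.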

Locks vs. leases

When processes rely on locks, there is an implicit assumption that fault-tolerance is lost. The process holding a lock may forget to release a lock or may die. A way to avoid this problem is to use leases instead of locks. A lease is a lock with an expiration time. The downside with a leasing approach is that the resource is unavailable to others until the lease expires. Now we have a trade-off: have long leases with a possibly long wait after a failure or have short leases that need to be renewed frequently.
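
A lease can be sketched as a lock that records when it expires. This is a single-process illustration with a hypothetical Lease class; the clock is passed in so that expiration can be demonstrated without sleeping, and a distributed implementation would also have to deal with clock differences between machines.

```python
import time


class Lease:
    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.holder = None
        self.expires = 0.0

    def acquire(self, who, duration):
        now = self.clock()
        held_by_other = self.holder not in (None, who) and now < self.expires
        if held_by_other:
            return False  # still leased to someone else
        # Free, expired, or a renewal by the current holder.
        self.holder, self.expires = who, now + duration
        return True
```

If holder A dies without releasing, B's requests fail only until the lease duration passes, which is the trade-off between long and short leases described above.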

Fault-tolerant consensus protocols, such as Raft or Paxos, can be used for granting leases to resources. However, the overhead of the algorithm isn’t attractive for all situations, especially when short-term leases are needed. A compromise is to use hierarchical leases, or sub-leases. The top-level lease is a coarse-grained lease while sub-leases are typically fine-grained leases. A consensus algorithm is used to elect a coordinator. This coordinator is granted a lease on a set of resources (or all resources) in the system. Now the coordinator hands out leases to processes that need those resources. When the coordinator’s main lease expires, a consensus algorithm has to be run again to grant a new lease and possibly elect a new coordinator but it does not have to be run for every client’s lease request; that is simply handled by the coordinator.

Distributed Deadlock

Goal: Determine if a transaction will wait for resources in such a manner as to create an indefinite wait, called deadlock. Find ways to ensure that this will not happen.

To ensure consistency, a transaction gets locks for resources (e.g., objects, database rows, files) it needs to access. This ensures that other transactions will not modify the data it needs during its execution or read intermediate results. A deadlock occurs when there is a circular dependency on processes holding and requesting locks on resources.

Note that we will use processes and transactions interchangeably. A transaction is simply a process that can be aborted. That is, it can revert any modifications that it made before exiting.

The four conditions that must hold for deadlock to occur are:

  1. mutual exclusion: A resource can be held by at most one process.
  2. hold and wait: Processes that already hold resources can wait for another resource.
  3. non-preemption: A resource, once granted, cannot be taken away.
  4. circular wait: Two or more processes are waiting for resources held by one of the other processes.

Three approaches can be used for managing deadlock in distributed systems.

Detection
A centralized deadlock detection approach uses a central coordinator to manage a resource graph of processes and the resources they are using. Each time a process gets a lock or releases a lock on a resource, it sends a message to this coordinator (waiting-for or releasing). The coordinator builds a graph of all the processes and the resources they are holding or waiting for. This is called a wait-for graph. If a cycle is detected in the graph, then the coordinator knows a deadlock exists. In some cases, if release and waiting-for messages are received out of order, they can lead the coordinator to believe that there is a deadlock cycle when none really exists. In reality, the release message should have been processed first and would cause the deadlock to not happen. This condition is known as phantom deadlock.
The Chandy-Misra-Haas distributed deadlock detection algorithm has a process send a probe message to a process that is holding a resource prior to waiting for the resource. The receiving process forwards the probe to every process that holds resources it is waiting for. This is called edge chasing. If the original process receives its own probe message then it knows that a dependency cycle, and hence deadlock, will exist if it waits for the resource it wants.
Prevention
Deadlock prevention requires the system to ensure that at least one of the conditions that is necessary for deadlock cannot occur. This can be implemented by making decisions based on the timestamps of each transaction competing for resources.
The wait-die algorithm states that if a younger process is using the resource, then the older process (that wants the resource) waits. If an older process is holding a resource, then the younger process that wants the resource kills itself (that’s ok; transactions are designed to be restartable). This forces the resource utilization graph to be directed from older to younger processes, making cycles impossible.
The wound-wait algorithm ensures that the graph flows from young to old and cycles are again impossible. An old process will preempt (kill) the younger process that holds a resource. If a younger process wants a resource that an older one is using, then it waits until the old process is done.
Avoidance
Deadlock avoidance requires knowing which locks will be needed by transactions and then managing resource allocation or transaction scheduling to ensure that deadlock will not happen. This requires a priori knowledge of which resources each transaction will need and when each transaction is scheduled to execute. As such, it is usually not a practical approach in distributed environments (it can be used in operating systems; the banker’s algorithm tries to find an execution sequence that will avoid deadlock … but we will not cover it here). One way in which this can be implemented is that a transaction will try to get all the locks it needs before it starts to execute. If it cannot get every one of those locks, it will abort rather than wait.
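
Two of these ideas are short enough to sketch: the cycle check a coordinator might run on a wait-for graph, and the timestamp comparisons used by wait-die and wound-wait. The function names, the dictionary form of the graph, and the convention that a smaller timestamp means an older process are all assumptions made for illustration.

```python
def has_cycle(wait_for):
    """wait_for maps each process to the set of processes it is waiting on."""
    visiting, done = set(), set()

    def visit(p):
        if p in done:
            return False
        if p in visiting:
            return True  # reached a process already on the current path
        visiting.add(p)
        found = any(visit(q) for q in wait_for.get(p, ()))
        visiting.discard(p)
        done.add(p)
        return found

    return any(visit(p) for p in wait_for)


def wait_die(requester_ts, holder_ts):
    # An older requester waits; a younger requester kills itself.
    return "wait" if requester_ts < holder_ts else "die"


def wound_wait(requester_ts, holder_ts):
    # An older requester preempts the holder; a younger requester waits.
    return "wound holder" if requester_ts < holder_ts else "wait"
```

In both prevention schemes a process only ever waits in one direction of age, so a chain of waiting processes can never loop back on itself.
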
Last modified November 3, 2023.