Exam 2 study guide

The one-hour study guide for exam 2

Paul Krzyzanowski

Latest update: Tue Dec 12 15:04:43 PST 2017

Disclaimer: This study guide attempts to touch upon the most important topics that may be covered on the exam but does not claim to necessarily cover everything that one needs to know for the exam. Finally, don't take the one hour time window in the title literally.

Mutual exclusion

Go here for lecture notes

Goal: Allow processes to request and be granted exclusive access to named resources. The algorithms should be designed to avoid starvation and ensure that exactly one requesting process gets the resource even if multiple processes make requests concurrently.

Mutual exclusion is responsible for making sure that only one process or thread accesses a resource at a time. In non-distributed systems, it is accomplished through mechanisms such as semaphores and monitors and, at a low level, through instructions such as test-and-set locks. None of these mechanisms work across networks of computers. We need an algorithm that will allow processes to compete for access to a resource and ensure that at most one process will be granted that access. The algorithm must be fair: it has to ensure that all processes that want a resource will eventually get a chance to get it. If this is not the case, starvation occurs, where a process will have to wait indefinitely for a resource.

The “resource” is an arbitrary name that all processes agree to use. It may refer to a critical section of code, a physical device, or some network service. A mutual exclusion algorithm allows a process to request access to this resource and be granted exclusive access to it. This is known as a lock. If the lock has a timeout associated with it, then it is known as a lease.

Distributed mutual exclusion algorithms fall into three categories. A centralized approach uses a central coordinator that is responsible for granting access permissions.

A token-based approach allows a process to access a resource if it is holding a “token”; that is, it received an unsolicited message where ownership of the message allows the process to access the resource. Sending the message (token) means forfeiting access.

A contention-based algorithm is one where all processes coordinate together on deciding who gets access to the resource.

The centralized algorithm uses a server that accepts REQUEST messages for a resource. If nobody is using the resource, it responds with a GRANT message. If somebody is using the resource, the server simply does not respond but places the request on a queue. When a client is done with a resource, it sends the server a RELEASE message. The server then sends a GRANT message to the next client in the queue (if there is one).
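
As a rough illustration, here is a minimal sketch of such a coordinator in Python, assuming an in-memory queue and an abstract send(client, message) transport (both illustrative, not part of the lecture notes):

```python
from collections import deque

class CentralCoordinator:
    """Grants exclusive access to one named resource at a time."""
    def __init__(self, send):
        self.send = send          # illustrative transport: send(client, message)
        self.holder = None        # client currently holding the resource
        self.queue = deque()      # clients waiting for a GRANT

    def on_request(self, client):
        if self.holder is None:
            self.holder = client
            self.send(client, "GRANT")
        else:
            self.queue.append(client)   # no reply yet: the client just waits

    def on_release(self, client):
        assert client == self.holder
        self.holder = None
        if self.queue:
            self.holder = self.queue.popleft()
            self.send(self.holder, "GRANT")
```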

The token ring algorithm creates a logical communication ring among the processes in the group. A message, called a token, is created for each resource. This token is passed from process to process along the ring. If a process receives a token and does not need to access that resource, it simply passes the token to the next process. Otherwise, it will hold on to the token until it is done with the resource.

Lamport’s mutual exclusion algorithm requires each process that wants a resource to send a timestamped request for the resource to all processes in the group, including itself. Receipt of each message is immediately acknowledged by every process. Each process places the received message in a local priority queue that is sorted by the Lamport timestamp in each message (or any totally-ordered, unique timestamp). A process decides whether it can access the resource by checking whether its own request is the earliest (first) one in the queue of all requests that it has received. If so, it accesses the resource and, when done, sends a release to all members (including itself). The receipt of a release message causes each process to remove the corresponding request from its ordered queue. If a process now finds its own request at the head of the queue, it knows that it is now its turn to access the resource.

Lamport’s mutual exclusion algorithm summary: send a request message to everyone in the group, including yourself. Get an acknowledgement from everyone. Each process stores request messages in a priority queue sorted by timestamp. The process whose request is at the head of the queue can access the resource. When it is done, it sends a release message to everyone.
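
A minimal sketch of the per-process decision logic, assuming a reliable transport delivers requests, acknowledgements, and releases to these handlers (names are illustrative):

```python
import heapq

class LamportMutex:
    """Decision logic only; message delivery and acknowledgement counting are
    assumed to be handled by a reliable transport (illustrative)."""
    def __init__(self, my_id):
        self.my_id = my_id
        self.queue = []                     # heap of (timestamp, process_id)

    def on_request(self, timestamp, pid):
        # Every process, including the requester itself, queues each request.
        heapq.heappush(self.queue, (timestamp, pid))

    def on_release(self, timestamp, pid):
        # A release removes that request from the queue.
        self.queue.remove((timestamp, pid))
        heapq.heapify(self.queue)

    def may_enter(self, acks_received, group_size):
        # Enter only after everyone acknowledged and our request is at the head.
        return (acks_received == group_size and
                self.queue and self.queue[0][1] == self.my_id)
```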

The Ricart & Agrawala algorithm, like Lamport’s, is also based on using reliable multicasts. A process that wants to access a resource sends a request to all other processes in the group and waits for all of them to respond. If another process is currently using the resource, that process delays its response until it is done. If process A and process B sent out requests concurrently (i.e., two systems want the same resource concurrently), each system compares the timestamp of that request with that of its own request. If process A received a request from process B that is older (a lower timestamp) than the one it sent, then process A will give process B priority to access the resource by sending a response to process B. Otherwise, if process A has the earlier timestamp, it will queue the request from B and continue to wait for all acknowledgements, sending a response to B (and any other processes that wanted the resource) only when it is done using the resource. The key point is that processes A and B will make the same comparison and exactly one will hold back on sending the response.

Ricart & Agrawala’s algorithm summary: send a request message to everyone in the group and wait for acknowledgements from everyone. Any process using the resource will delay sending an acknowledgement.
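
The heart of the algorithm is the reply-or-defer decision. A hedged sketch, assuming timestamps are Lamport clocks with the process ID as a tiebreaker and that reply is a callable supplied by an illustrative transport layer:

```python
class RicartAgrawala:
    """Core reply-or-defer rule only; multicasting the request and counting
    replies are left to an illustrative transport."""
    def __init__(self, my_id):
        self.my_id = my_id
        self.requesting = False
        self.my_timestamp = None
        self.deferred = []            # replies to send after we release

    def request(self, timestamp):
        self.requesting = True
        self.my_timestamp = timestamp
        # ...the request is then multicast to the group (not shown)

    def on_request(self, their_timestamp, their_id, reply):
        mine = (self.my_timestamp, self.my_id)
        theirs = (their_timestamp, their_id)
        if self.requesting and mine < theirs:
            self.deferred.append(reply)   # we have priority: hold the reply
        else:
            reply()                       # they have priority (or we don't care)

    def on_release(self):
        self.requesting = False
        for reply in self.deferred:       # answer everyone we held back
            reply()
        self.deferred.clear()
```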

The Lamport and Ricart & Agrawala algorithms are similar in that they are both contention-based and truly distributed. Ricart & Agrawala’s requires fewer messages since there is no explicit release message that needs to be sent; acknowledgements are simply delayed. Neither is efficient compared with the centralized algorithm.

Election algorithms

Go here for lecture notes

Goal: Allow all processes in a group to elect exactly one of the processes in the group. All processes should agree to elect the same process.

Election algorithms are designed to allow a collection of processes to agree upon a single coordinator. All candidate processes are considered to be equally capable of acting as a coordinator. The task is to select exactly one of these processes — and make sure that every process is aware of the selection. In designing these algorithms, we assume that every process has a unique ID (for example, a process ID combined with the machine’s address). The algorithms we examine in this section will choose a winner (a coordinator) by selecting either the highest available process ID or the lowest one. It is important that whatever criterion is used will never result in one process choosing a different winner than another.

The bully algorithm selects the largest process ID as the coordinator. If a process detects a dead coordinator, it sends an ELECTION message to all processes with a higher ID number and waits for any replies. If it gets none within a certain time, it announces itself as a coordinator. When a process receives an ELECTION message it immediately sends a response back to the requestor (so the requestor won’t become the coordinator) and holds an election to see if there are any higher-numbered processes that will respond. If not, then it declares itself as coordinator.
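
A compact sketch of the bully election, assuming send_election(pid) is an illustrative call that returns True if the higher-numbered process replied within the timeout:

```python
def start_election(my_id, process_ids, send_election, announce):
    """Bully election sketch: send ELECTION to all higher IDs; if nobody
    answers within the timeout, announce ourselves as coordinator."""
    higher = [pid for pid in process_ids if pid > my_id]
    got_reply = any(send_election(pid) for pid in higher)
    if not got_reply:
        announce(my_id)   # no higher-numbered process responded: we win
    # otherwise a higher-numbered process takes over and runs its own election
```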

The ring election algorithm requires a logical communication ring among the processes in the group (as we had in the token ring mutual exclusion algorithm). When a process detects a non-responding coordinator, it creates an ELECTION message containing its own process ID and sends it to the next process in the ring. If that process is dead, then it sends the message to the following process. This sequence continues, with each process sending an ELECTION message to the next process in the ring, skipping dead processes until it finds a process that can receive the message. When a process receives an ELECTION message, it adds its own process ID to the body and sends that message out to its neighboring process. When the election message circulates back to the original sender (the sender detects this by seeing its process ID at the head of the list, indicating that it started the election), the sender looks at the list of active processes in the message body and chooses a coordinator from among them. Since multiple elections may have been started concurrently, the algorithm for choosing the leader must yield the same result across the different lists. Hence, selecting the highest or the lowest process ID will work. Selecting the first or last process in the list will not.

The Chang and Roberts ring algorithm optimizes the ring algorithm in two ways. First, there is no need to keep the entire list of live processes in the election messages; we can perform the election as we go along. If the goal is to vote for the highest-numbered live process ID and a process receives an election message, it does one of two things: (1) pass it untouched if its process ID is smaller than the one in the message, or (2) replace the process ID in the message with its own if its process ID is greater than the one in the message. When a process receives a message that contains its own process ID, it knows that the message has come full circle and it is the winner. It will then notify each process of the outcome.

The second optimization is to avoid having multiple election messages circulate if multiple processes have initiated elections concurrently. To do this, a process keeps track of whether it has participated in an election (i.e., has initiated or forwarded an election message). If a process receives a message with a smaller process ID than its own and it is a participant in an election, it simply discards the message. If, on the other hand, the received message contains a greater process ID than its own, it will forward the message even if it is a participant in an election, since the message contains a candidate that takes priority over the previous one that was forwarded. Once election results are announced, each process clears its status of participating in an election.
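
A sketch of the per-message handling in Chang and Roberts’ algorithm, assuming forward() and announce() are supplied by an illustrative ring transport; the return value is the process’s updated participant flag:

```python
def on_election_message(my_id, candidate_id, participant, forward, announce):
    """Chang & Roberts message handling (sketch)."""
    if candidate_id == my_id:
        announce(my_id)           # message came full circle: we are the winner
        return False              # no longer a participant
    if candidate_id > my_id:
        forward(candidate_id)     # better candidate: pass it along unchanged
        return True
    if not participant:
        forward(my_id)            # replace the smaller ID with our own
        return True
    return participant            # smaller ID and already participating: discard
```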

One problem with election algorithms is when a network gets partitioned (also known as segmented): one set of processes is separated from another and cannot communicate with it. In this case, each segment may elect its own coordinator and problems can arise. This is known as a split-brain situation. To combat this, a redundant network or some alternate communication mechanism is needed to enquire about the state of remote processes.

Another approach that is often taken is to require a majority of prospective coordinators to be available. A quorum is the minimum number of processes that need to be available to participate in an operation. In this case, the operation is an election and we need a quorum of over 50% of the processes. If a network is partitioned into two or more segments, at most one of the partitions will have over 50% of the processes. Those processes that do not will refuse to elect a coordinator.

Consensus

Go here for lecture notes

Goal: Create a fault-tolerant distributed algorithm that enables a bunch of processes to agree on a single value.

The goal of a consensus algorithm is to have a group of processes agree on a common value. The value must be one of the values that was proposed by at least one of the processes. Once the processes agree on the value, it is considered final.

A common use of consensus is in replicated state machines to build fault-tolerant systems. Processes send messages (commands, such as data updates) to the group of replicated servers. These messages are received by each of the servers and added to a log on the server. If one process wants to set “name=value_1” and, concurrently, another process tries to set “name=value_2”, we’d like every instance of our database to be consistent. Every copy should have either “name=value_1” or “name=value_2”. It is not acceptable to have some instances hold “name=value_1” while others hold “name=value_2”. This log must be consistent across all servers so that when the commands in the log are applied to the state machine (i.e., the processes on the servers read their logs and apply the commands) they are all processed in the same sequence on each server. Consensus enables all processes to agree on the order in which commands are added to the replicated logs. Another example of choosing a value is running an election: each process may propose itself as a contender and the consensus algorithm will pick one winner. Yet another example is mutual exclusion: agreement on who gets a lock on a resource.

Achieving consensus is easy if processes don’t fail or we are willing to wait for all processes to recover if they die (as with the two-phase commit protocol). The challenge with a consensus algorithm is to achieve consensus in the presence of faults.

A consensus algorithm must satisfy four goals:

Validity
The outcome must be one of the proposed values.
Uniform agreement
No two processes may ultimately agree on different values.
Integrity
A process must ultimately agree on a single value. It cannot change its mind later.
Progress
The algorithm must eventually terminate such that every process will decide on the same value.

Paxos

Paxos is a fault-tolerant distributed consensus algorithm. It enables participants to agree on a proposed value (arbitrary data) as long as a majority of processes running the Paxos algorithm are alive (more details on this in a little while). Paxos is a general-purpose consensus algorithm. Among other things, it can be used to create agreement among participants on a globally consistent (total) order for client messages. The agreed-upon data, for example, can be commands to update the state of a system or data in a file.

Paxos is made up of three groups of agents: proposers, acceptors, and learners. These need not be separate systems and one system may, and usually does, take on multiple roles. Proposers receive requests from clients. They are responsible for running the protocol by sending messages to and receiving messages from acceptors. Acceptors provide the basis of fault tolerance in the system. Once acceptors agree to a value, they propagate the result to learners, which simply forward the results to the processes that need to act on them.

For now, let’s assume that there is only a single proposer to which all clients send requests. Also, let’s assume there is a single learner.

Without contention from other processes or any failure, what Paxos does is trivial. A client gives the proposer the message it wants to add to the servers. The proposer picks the next highest sequence number that it plans to associate with the requested event and asks all the acceptors to reserve that number. The acceptors agree (there is no reason to disagree!) and the proposer sends the client’s requested message associated with that sequence number. This sequenced message then gets propagated to wherever it has to go by the learner.

Let us now consider the case of several processes making concurrent requests. A proposer picks a sequence number and asks the acceptors to reserve that number; each acceptor makes a promise to not accept any proposals with lower sequence numbers. If any acceptor already accepted a higher sequence number from someone else, it will reject this lower-numbered proposal and the client will have to try again and have the proposer issue a request with another, higher, sequence number. Lower-numbered proposals are rejected to handle the case of possibly old messages reaching acceptors. A rejection includes the highest accepted sequence number and its corresponding value from that acceptor. The proposer is obligated to continue the algorithm using that data instead of its original proposal, which has been rejected. If needed, the client will have to retry the original proposal later by re-running the algorithm. For now, the proposer has to finish this instance of the algorithm to push for group consensus.

Now let’s consider failure. A majority of the acceptors must be alive for the algorithm to function. This majority of live acceptors is called a quorum. Suppose the quorum of acceptors makes a promise to accept a particular sequence number. If some of those acceptors die but others (with old state) come alive, there is guaranteed to be at least one acceptor that remembers the promise and will therefore be able to reject any requests with lower numbers. The quorum is important for two reasons:

  1. It ensures continuity of proposal number information.
  2. It ensures that split-brain problems don’t occur if the network becomes partitioned. It is not possible for a minority of acceptors to continue running the algorithm.

When the entire quorum of acceptors accepts the proposal, the accepted message is sent to the learners, which can act on the request (e.g., update system state on a set of machines).

Paxos is a two-phase protocol:

Phase 1: Prepare
The proposer sends the client’s message with a unique proposal number to as many acceptors as it can (at least a majority). Each successive proposal must have a higher number.

When an acceptor receives a proposal, it checks to see if this is the highest proposal number that it has seen. If so, it promises to ignore any smaller proposal numbers it gets. It responds back with either the current proposal (if that’s the highest one it has seen) or the highest proposal that it has received so far (along with the corresponding message).

Phase 2: Accept
After sending proposals to the group of acceptors, the proposer must select the highest numbered proposal that it received and send an accept message to the acceptors. This allows information about the current highest proposal number to propagate to acceptors that may not have received it (for example, because they just came up).

As long as no acceptor receives a higher proposal number during this time, it accepts the proposal, sends an acknowledgement to the proposer, and sends the sequenced message to the learner. The learner discards duplicate messages (that came from multiple acceptors) and sends them to the servers that act on these messages.
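
A minimal sketch of the acceptor side of these two phases, assuming proposal numbers are unique integers; proposer and learner logic are omitted:

```python
class Acceptor:
    """Paxos acceptor sketch: promise in phase 1, accept in phase 2."""
    def __init__(self):
        self.promised = 0          # highest proposal number promised so far
        self.accepted = None       # (number, value) of the last accepted proposal

    def on_prepare(self, number):
        if number > self.promised:
            self.promised = number
            # Promise, and report any previously accepted proposal so the
            # proposer must carry that value forward instead of its own.
            return ("PROMISE", self.accepted)
        return ("REJECT", self.promised)

    def on_accept(self, number, value):
        if number >= self.promised:
            self.promised = number
            self.accepted = (number, value)
            return ("ACCEPTED", number)    # also forwarded to the learners
        return ("REJECT", self.promised)
```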

One proposer or many?

We temporarily assumed there was only one proposer. For fault tolerance, there could be any number of proposers with which clients communicate. However, that increases the chance that one proposer will pick a proposal number that is less than one that another proposer chose, resulting in rejection of the proposal. To minimize message rejection, Paxos chooses one of the proposers as the leader, which receives all client requests. Should it die, any other proposer can step in as the leader. A single leader just makes it easier to ensure that we have unique, incrementing sequence numbers. While the algorithm works without a leader, the protocol guarantees progress only if there is a leader. If two proposers act as leaders, it is possible to have a situation where each proposer proposes conflicting updates, leading to no progress in the algorithm.

Raft

Raft is another consensus protocol, created as an easier-to-understand alternative to Paxos. Like Paxos, Raft requires a majority of servers to function. Unlike Paxos, Raft requires a single leader to receive all client requests. Any server can act as a leader and an election takes place to select a leader.

A server can be in a follower, leader, or candidate state. If it is the leader then it receives all client requests. If it is the follower then it is a non-leader and the system has a leader. If it is a candidate then we are holding an election to pick a leader and it may become a leader.

Raft elections

Leaders send periodic heartbeats to all followers. Servers start in a follower state. A server remains in a follower state as long as it receives valid heartbeat messages from a leader or a candidate. If it receives no heartbeat within a specific time interval, the follower may need to become a leader, so it becomes a candidate and runs an election by requesting votes from other servers. If a candidate receives a majority of votes from the other servers, it becomes the leader.

To start an election, each follower picks a random election timeout. If it receives no heartbeat during that time, it assumes there is no leader, makes itself a candidate, votes for itself, and sends a request-vote message to all other servers. If a server receives a request-vote message and has not voted yet, it votes for that candidate. When a candidate gets a majority of votes, it becomes the leader.

Each election marks a new term, identified by an incrementing number.

If a candidate receives a heartbeat from another server and that leader’s term number is at least as large as the candidate’s current term, then the candidate recognizes the leader as legitimate and becomes a follower. If the candidate neither wins nor loses an election (for example, a split vote), it simply times out and starts a new election. Randomized timeouts ensure that split votes rarely occur.
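
A sketch of the follower/candidate timing logic, assuming request_votes() is an illustrative call that returns the number of votes granted by the other servers; the log comparisons Raft performs during voting are omitted:

```python
import random
import time

class RaftServer:
    """Election-timeout logic only (sketch); RPCs and log handling omitted."""
    def __init__(self, my_id, group_size, request_votes):
        self.my_id = my_id
        self.group_size = group_size
        self.request_votes = request_votes   # illustrative: returns votes granted
        self.term = 0
        self.reset_timeout()

    def reset_timeout(self):
        # Randomized timeout (e.g., 150-300 ms) makes split votes unlikely.
        self.deadline = time.monotonic() + random.uniform(0.150, 0.300)

    def on_heartbeat(self, leader_term):
        if leader_term >= self.term:
            self.term = leader_term
            self.reset_timeout()             # a legitimate leader exists

    def tick(self):
        """Call periodically; returns 'leader' if this server just won."""
        if time.monotonic() > self.deadline:           # no heartbeat: candidate
            self.term += 1
            votes = 1 + self.request_votes(self.term)  # vote for ourselves first
            if votes > self.group_size // 2:
                return "leader"
            self.reset_timeout()             # split vote or loss: try again later
        return None
```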

Raft consensus

The elected leader takes client requests. Each message sent by clients is a log entry that will be replicated among the servers. The leader first adds the message to its own log. This entry is considered to be uncommitted. The leader then sends the message to the other servers (the followers). Each follower acknowledges the message. When a majority of followers have acknowledged the message, the entry is applied to the state machine and considered to be committed. The leader then notifies the followers that the entry has been committed and they apply the message to their state machines. Consensus has now been achieved.
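
A sketch of the leader’s commit rule, assuming replicate() is an illustrative call that sends the entry to the followers and returns the number of acknowledgements:

```python
class RaftLeader:
    """Commit rule sketch: an entry commits once a majority has acknowledged it."""
    def __init__(self, group_size, replicate):
        self.group_size = group_size
        self.replicate = replicate     # illustrative: send entry, return ack count
        self.log = []                  # list of [entry, committed] pairs

    def on_client_request(self, entry):
        self.log.append([entry, False])        # uncommitted
        acks = 1 + self.replicate(entry)       # the leader counts itself
        if acks > self.group_size // 2:
            self.log[-1][1] = True             # committed: apply to state machine
            return "committed"
        return "pending"
```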

Locks vs. leases

When processes rely on locks, there is an implicit assumption that failures do not occur: the process holding a lock may forget to release it or may die. A way to avoid this problem is to use leases instead of locks. A lease is a lock with an expiration time. The downside with a leasing approach is that the resource is unavailable to others until the lease expires. Now we have a trade-off: have long leases with a possibly long wait after a failure or have short leases that need to be renewed frequently.
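
A lease is easy to picture as a lock with a deadline; a minimal sketch:

```python
import time

class Lease:
    """A lock with an expiration time (sketch)."""
    def __init__(self, holder, duration):
        self.holder = holder
        self.expires = time.monotonic() + duration

    def valid(self):
        # Once the lease expires, the resource is free even if the holder died.
        return time.monotonic() < self.expires

    def renew(self, duration):
        self.expires = time.monotonic() + duration
```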

Paxos is a fault-tolerant way to achieve consensus and can be used for granting leases to resources. However, the overhead of the algorithm isn’t attractive for all situations. A compromise is to use hierarchical leases, or sub-leases. The top-level lease is a coarse-grained lease while sub-leases are typically fine-grained leases. A consensus algorithm such as Paxos is used to elect a coordinator. This coordinator is granted a lease on a set of resources (or all resources) in the system. Now the coordinator hands out leases to processes that need those resources. When the coordinator’s main lease expires, a consensus algorithm has to be run again to grant a new lease and possibly elect a new coordinator but it does not have to be run for every client’s lease request; that is simply handled by the coordinator.

Distributed transactions

Go here for lecture notes

Goal: Create a fault tolerant algorithm that ensures that all group members agree to commit (make their actions permanent). If agreement is not reached, then all group members must agree to abort (undo any changes).

A transaction is a set of operations that operates on, and typically modifies, data. A key facet of a transaction is that it has to be atomic — all results have to be made permanent (commit) and appear to anyone outside the transaction as an indivisible action. If a transaction cannot complete, it must abort, reverting the state of the system to that before it ran. If several transactions run concurrently, the end result must be the same as if those transactions ran in some (any) serial order. The specific order in which transactions execute is known as a schedule.

A transaction-based model guarantees a set of properties known as ACID:

Atomic : The transaction happens as a single indivisible action. Everything succeeds (and the transaction commits) or else the entire transaction is rolled back (the transaction aborts). Other transactions do not see intermediate results.

Consistent : A transaction cannot leave the data in an inconsistent state. If the system has invariants, they must hold after the transaction. For example, the total amount of money in all accounts must be the same before and after a “transfer funds” transaction.

Isolated (Serializable) : Transactions cannot interfere with each other. If transactions run at the same time, the final result must be the same as if they executed in some serial order.

Durable : Once a transaction commits, the results are made permanent. No failures after a commit will cause the results to revert.

A write-ahead log (or transaction log) is used to enable rollback: reverting to the previous state when aborting a transaction. It is also crucial in maintaining the state of the transaction in a stable place in case the process or computer should die. The system will be able to read the log and recover to where it was in the transaction commit protocol.

In a distributed transaction environment, multiple processes participate in a transaction, each executing its own sub-transaction that can commit only if there is unanimous consensus by all participants to do so. Each system runs a transaction manager, a process that is responsible for participating in the commit algorithm to decide whether to commit or abort its sub-transaction. One of these transaction managers may be elected as the coordinator, which initiates and runs the commit algorithm. Alternatively, the coordinator could be a separate process from any of the transaction participants.

Two-phase commit protocol

The two-phase commit protocol uses atomic multicasts to reach a consensus among the group on whether to commit or abort. It uses a coordinator to send a request (“can you commit?”) to every member of the group (reliably, retransmitting as often as needed until all replies are received). Phase 1 is complete when every member of the group (each participant) responds. If the coordinator gets even a single abort response from a participant, it must tell all participants to abort the entire transaction. Otherwise, it will tell all participants to commit it. In phase 2, the coordinator sends the commit or abort order and waits for a response from everyone. In summary, in phase 1, the coordinator gets everyone’s agreement and in phase 2, the coordinator sends the directive to commit or abort.
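
A sketch of the coordinator’s two phases, assuming each participant object exposes prepare(), commit(), and abort() calls (illustrative names); retransmission and write-ahead logging are omitted:

```python
def two_phase_commit(participants):
    """Two-phase commit coordinator sketch."""
    # Phase 1: collect votes ("can you commit?")
    votes = [p.prepare() for p in participants]
    decision = "commit" if all(votes) else "abort"
    # Phase 2: send the decision; a single no vote aborts the transaction
    for p in participants:
        if decision == "commit":
            p.commit()
        else:
            p.abort()
    return decision
```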

The write-ahead log in stable storage is crucial for ensuring atomic multicasts (the write-ahead log is also important for transaction rollback, which is used for aborts!). For example, if a participant sent the coordinator a commit response for phase 1 and then died, it must be able to reboot and reconstruct the transaction state from the log; it cannot change its mind after rebooting. The two-phase commit protocol cannot proceed until every participant acknowledges every message.

The two-phase commit protocol stalls if any member, coordinator or participant, dies. It has to wait for the recovery of that member before proceeding with the protocol. A recovery coordinator can step in in certain circumstances. If the coordinator died and a recovery coordinator took over, it queries the participants. If at least one participant has received a commit message, then the new coordinator knows that the vote to commit must have been unanimous and it can tell the others to commit. If no participants received a commit message, then the new coordinator can restart the protocol. However, if one of the participants died along with the coordinator, confusion may arise. If all the live participants state that they have not received a commit message, the coordinator does not know whether there was a consensus: the dead participant may have been the only one to receive the commit message (which it will process when it recovers). As such, the coordinator cannot tell the other participants to make any progress; it must wait for the dead participant to come back.

Three-phase commit protocol

The two-phase commit protocol is a blocking protocol. If the coordinator or any participant stops running, the entire protocol stalls until the failed process is restarted. The three-phase commit protocol is similar to the two-phase commit protocol but allows entities to time out in certain cases to avoid indefinite waits.

The protocol also supports the use of a recovery coordinator by introducing an extra phase where participants are told what the consensus was so that any participant that received this information before a coordinator died could inform a standby coordinator whether there was a unanimous decision to commit or abort. The three-phase commit protocol propagates the knowledge of the outcome of the election to all participants before starting the commit phase.

In phase 1 of the three-phase commit protocol, the coordinator sends a query-to-commit request (“can you commit?”) to every member of the group (reliably, retransmitting as often as needed until all replies are received over a period of time). If any of the participants responds with a no or if any failed to respond within a defined time, the coordinator sends an abort to every participant.

In phase 2, the coordinator sends an agreed-to-commit (prepare-to-commit) message to all participants and gets acknowledgements from everyone. When this message is received, a participant knows that the unanimous decision was to commit. If a participant fails to receive this message in time, then it aborts. At this point, the participants do not commit. However, if a participant receives an abort message, then it can immediately abort the transaction.

In phase 3, the coordinator sends a commit message to all participants telling them to commit. If a participant fails to receive this message, it commits anyway since it knows from phase 2 that there was a unanimous decision to commit. If a coordinator crashes during this protocol, another one can step in and query the participants for the commit decision. If every participant received the prepare-to-commit message, then the coordinator can issue the commit directives. If only some participants received the message, the coordinator now knows that the unanimous decision was to commit and can re-issue the prepare-to-commit request followed by a commit. If no participant received the message, the coordinator can restart the protocol or, if necessary, restart the transaction.
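
A sketch of the coordinator’s three phases under the same assumptions as the two-phase sketch above, with illustrative can_commit(), prepare_to_commit(), and do_commit() participant calls; timeouts and recovery coordination are simplified away:

```python
def three_phase_commit(participants):
    """Three-phase commit coordinator sketch with the extra pre-commit phase."""
    # Phase 1: query to commit
    if not all(p.can_commit() for p in participants):
        for p in participants:
            p.abort()
        return "abort"
    # Phase 2: tell everyone the vote was unanimous (prepare to commit)
    for p in participants:
        p.prepare_to_commit()
    # Phase 3: commit
    for p in participants:
        p.do_commit()
    return "commit"
```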

The three-phase commit protocol accomplishes two things:

  1. Enables use of a recovery coordinator. If a coordinator died, a recovery coordinator can query a participant.

    • If the participant is found to be in phase 2, that means that every participant has completed phase 1 and voted on the outcome. The completion of phase 1 is guaranteed. It is possible that some participants may have received commit requests (phase 3). The recovery coordinator can safely resume at phase 2.

    • If the participant was in phase 1, that means NO participant has started commits or aborts. The protocol can start at the beginning.

    • If the participant was in phase 3, the coordinator can continue in phase 3 – and make sure everyone gets the commit/abort request.

  2. Every phase can now time out – there is no indefinite wait like in the two-phase commit protocol.

    Phase 1:
    Participant aborts if it doesn’t hear from a coordinator in time
    Coordinator sends aborts to all if it doesn’t hear from any participant
    Phase 2:
    If coordinator times out waiting for a participant – assume it crashed, tell everyone to abort
    If participant times out waiting for a coordinator, elect a new coordinator
    Phase 3:
    If a participant fails to hear from a coordinator, it can contact any other participant for results

The three-phase commit protocol suffers from two problems. First, a partitioned network may cause a subset of participants to elect a new coordinator and vote on a different transaction outcome. Second, it does not handle fail-recover well: if a coordinator that died recovers, it may read its write-ahead log and resume the protocol at what is now an obsolete state, possibly issuing directives that conflict with what already took place.

Paxos commit

The two-phase commit protocol requires all processes in the group to be available to complete the transaction. The three-phase commit introduces timeouts but does not work correctly in all situations. If the two-phase or three-phase protocols are enhanced for fault tolerance to elect an alternate coordinator, one runs into problems where different participants may have received messages from different coordinators.

Using Paxos for distributed commit enables us to have a fault-tolerant infrastructure that reaches agreement on commit or abort decisions. Multiple systems run the Paxos algorithm for fault tolerance. Each participant contacts its own instance of the Paxos algorithm on these systems to ensure that Paxos agrees on that participant’s commit or abort decision. Paxos then forwards that decision to learner processes, which decide on the overall commit or abort based on whether all participants sent a commit message (each learner gets the same data and hence reaches the same decision; the multiple learners are there for fault tolerance). They then notify each of the participants with the commit/abort decision.

Brewer’s CAP Theorem

Ideally, we’d like three properties in a replicated distributed system:

Consistency
The data retrieved from the system should be the same regardless of which server is contacted.
Availability
The system should always be available to handle requests.
Partition tolerance
The system should continue to function even if some network links do not work and one group of computers cannot talk to another. For example, a link between two data centers may go down.

Eric Brewer’s CAP Theorem states that you can have either consistency or availability in the presence of partitions but not both. Quite often, the theorem is summarized as: if you want consistency, availability, and partition tolerance, you have to settle for at most two out of three of these.

With distributed architectures, we generally strive for high availability and the ability to withstand partitions (occasional breaks in the ability of nodes to communicate). Hence, we will have to give up on consistency and break the guarantees of ACID. An alternative to the requirements of ACID is BASE.

BASE stands for Basic Availability, Soft-state, Eventual consistency. Instead of requiring consistency after every transaction, it is enough for the data to eventually get to a consistent state. The downside is that some processes may access stale data which has not yet been brought into a consistent state.

Concurrency control

Go here for lecture notes

Goal: Allow multiple transactions to run concurrently but ensure that resource access is controlled to give results that are identical if they ran in some serial sequence. This preserves the “Isolated” guarantee of ACID transaction semantics.

The goal of concurrency control is to allow multiple transactions to run concurrently (that’s great for performance!) but to ensure that data access is controlled such that the net effect is the same as if the transactions all ran in some serialized order. That is, we cannot have the net result be one where transactions read an interim state of data from another transaction.

Exclusive locks, via a lock manager, are an easy way to accomplish this. A transaction can grab locks for the resources it needs. That ensures mutual exclusion. To ensure serializability, it is important that a transaction does not acquire any new locks after it has released a lock. If we do not do this, transaction A may create a result based on the completion of transaction B while transaction B might have used data that A wrote and unlocked. This technique is known as two-phase locking. The first phase is the growing phase, in which locks are acquired. The second phase is the shrinking phase, in which locks are released.

The problem with two-phase locking is that, if a transaction that has released some locks now aborts, there is a chance that other transactions have used data that was modified by the transaction. In that case, those transactions (and all transactions that depend on them) have to abort as well. This condition is called cascading aborts. Strict two-phase locking avoids this problem by requiring all locks to be held until the end. The shrinking phase, in effect, is an atomic operation that occurs at the very end of the transaction. You lose concurrency this way but avoid having to process cascading aborts.
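
A sketch of strict two-phase locking, assuming an illustrative lock manager with acquire() and release() calls; all locks are released together when the transaction ends:

```python
class StrictTwoPhaseLocking:
    """Sketch: locks are only acquired while the transaction runs (growing
    phase) and are all released together at commit or abort (shrinking phase)."""
    def __init__(self, lock_manager):
        self.lock_manager = lock_manager   # illustrative: acquire(txn, obj), release(txn, obj)
        self.held = []

    def acquire(self, txn, obj):
        self.lock_manager.acquire(txn, obj)   # growing phase only
        self.held.append(obj)

    def finish(self, txn):
        # Called at commit or abort: the shrinking phase happens all at once,
        # so no other transaction ever sees data from an uncommitted transaction.
        for obj in self.held:
            self.lock_manager.release(txn, obj)
        self.held.clear()
```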

Exclusive locking for every resource access is a bit aggressive. Consider a transaction that just reads data. It needs to lock that data to ensure that no other transaction modifies it but nothing would go wrong if another transaction also wanted to read that same data. We can achieve greater concurrency by distinguishing read locks from write locks. Read locks need not be exclusive: any number of them can be granted. However, if there are any read locks on an object, a write lock cannot be granted and the transaction must wait. Similarly, if there is a write lock on an object then any read lock requests or write lock requests must wait.

A way of increasing concurrency even beyond read/write locks is two-version-based concurrency control, also known as multiversion concurrency control. In this case, one transaction writes tentative versions (private versions) while other transactions read existing, previously committed versions of the data. This allows transactions to request read locks even if another transaction has a write lock on the object. When the transaction that has a write lock is ready to commit, it converts its write lock to a commit lock, waits until all transactions that have a read lock for that object complete, and then makes its modified version permanent. During a commit lock, no other transaction can grab any lock on that object. This allows increased concurrency but transactions that write data risk waiting when they attempt to commit and make their data permanent.

Concurrency control techniques that rely on locking force locks to be held while the transaction is using the resource, or, in some cases, until the end of the transaction. This restricts the maximum amount of concurrency that may be achieved in the system. Optimistic concurrency control techniques assume that transactions are more likely to complete than not. As such, it is better to put more effort on rectifying errors from having used data from aborted transactions than to lock access to the data. A fully optimistic system uses no locks and checks for conflicts at commit time. Optimistic concurrency control has three phases of operation:

  1. Working phase. The transaction reads and writes data. A private workspace is often, but not always, used to keep non-committed results isolated.

  2. Validation phase. When the transaction is ready to commit, a check is made to see if there is any conflict with other transactions that took place during this time. For example, if some transaction A modified data and committed but transaction B read data before A modified that data, then transaction B cannot be allowed to commit because that would violate serializability.

  3. Update phase. Tentative changes are made permanent and the transaction commits.

Timestamp ordering allows concurrency by keeping track of two timestamps per object: the timestamp of the last committed transaction that read the object and the timestamp of the last committed transaction that wrote the object. If a transaction wants to write an object, it compares its own timestamp with the object’s write timestamp. If the object’s timestamp is older, then we have good ordering and the transaction can proceed. Otherwise the transaction aborts and is restarted.
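
A sketch of the write check, assuming each object carries the read and write timestamps of the last committed transactions that touched it (the read-timestamp check is included for completeness):

```python
class VersionedObject:
    def __init__(self):
        self.read_timestamp = 0    # timestamp of the last committed reader
        self.write_timestamp = 0   # timestamp of the last committed writer

def try_write(txn_timestamp, obj):
    # Writing is safe only if no younger committed transaction has already
    # read or written the object; otherwise the transaction aborts and restarts.
    if txn_timestamp < obj.read_timestamp or txn_timestamp < obj.write_timestamp:
        return "abort"
    obj.write_timestamp = txn_timestamp
    return "proceed"
```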

Distributed Deadlock

Go here for lecture notes

Goal: Determine whether transactions will wait for resources in such a manner as to create an indefinite wait, called deadlock. Find ways to ensure that this will not happen.

A deadlock occurs when there is a circular dependency on processes holding and requesting resources. The four conditions that must hold are:

  1. mutual exclusion: A resource can be held by at most one process.
  2. hold and wait: Processes that already hold resources can wait for another resource.
  3. non-preemption: A resource, once granted, cannot be taken away.
  4. circular wait: Two or more processes are waiting for resources held by one of the other processes.

Three approaches can be used for managing deadlock in distributed systems.

Detection
A centralized deadlock detection approach uses a central coordinator to manage a resource graph of processes and the resources they are using. Each time a process wants to grab or release a resource, it sends a message to this coordinator (waiting-for or releasing). The coordinator builds a graph of all the processes and the resources they are holding or waiting for. This is called a wait-for graph. If a cycle is detected in the graph, then the coordinator knows a deadlock exists. In some cases, if release and waiting-for messages are received out of order, they can lead the coordinator to believe that there is a deadlock cycle when none really exists. In reality the release message should have been processed first and would have prevented the deadlock from forming. This condition is known as false deadlock.

The Chandy-Misra-Haas distributed deadlock detection algorithm has a process send a probe message to a process that is holding a resource prior to waiting for the resource. The receiving process forwards the probe to every process that contains resources it is waiting for. This is called edge chasing. If the original process receives its own probe message then it knows that a dependency cycle, and hence deadlock, will exist if it waits for the resource it wants.
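
A sketch of the probe handling, assuming forward() and declare_deadlock() are illustrative calls and waiting_for lists the processes holding resources this process is waiting on:

```python
def on_probe(probe_origin, my_id, waiting_for, forward, declare_deadlock):
    """Edge-chasing sketch: each probe carries the ID of the process that
    started it; it is forwarded along the wait-for edges."""
    if probe_origin == my_id:
        declare_deadlock()                 # our own probe came back: a cycle exists
        return
    for holder in waiting_for:
        forward(holder, probe_origin)      # chase the edge to each resource holder
```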

Prevention
Deadlock prevention approaches require processes to access resources in restricted ways to ensure that a deadlock cannot occur. The approach to implementing this is to make decisions based on the timestamps of each transaction competing for resources.

The wait-die algorithm states that if a younger process is using the resource, then the older process (that wants the resource) waits. If an older process is holding a resource, then the younger process that wants the resource kills itself (that’s ok; transactions are designed to be restartable). This forces the resource utilization graph to be directed from older to younger processes, making cycles impossible.

The wound-wait algorithm ensures that the graph flows from young to old and cycles are again impossible. An old process will preempt (kill) the younger process that holds a resource. If a younger process wants a resource that an older one is using, then it waits until the old process is done.
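
Both rules reduce to a comparison of transaction timestamps (older transactions have smaller timestamps); a minimal sketch:

```python
def wait_die(requester_ts, holder_ts):
    """The requester either waits or dies (aborts and restarts); never the holder."""
    return "wait" if requester_ts < holder_ts else "die"

def wound_wait(requester_ts, holder_ts):
    """An older requester wounds (preempts) a younger holder; a younger
    requester waits for the older holder to finish."""
    return "wound_holder" if requester_ts < holder_ts else "wait"
```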

Avoidance
This requires predicting the precise resources that will be needed, the times they will be needed, and which processes will need them, and then managing resource allocation or transaction scheduling to ensure that deadlock will not occur. This is generally impossible to predict and hence is not a practical approach.

Network Attached Storage

Go here for lecture notes

Goal: Allow multiple clients to access files from file servers on a network.

To provide the same system call interface for supporting different local file systems as well as remote files, operating systems generally rely on a layer of abstraction that allows different file system-specific interfaces to coexist underneath the common system calls. On most Unix-derived systems (e.g., Linux, BSD, OS X, SunOS), this is known as the VFS layer (Virtual File System).

There are a couple of models for implementing distributed file systems: the download/upload model or the remote procedure call model. In a stateful file system, the server maintains varying amounts of state about client access to files (e.g., whether a file is open, whether a file has been downloaded, cached blocks, modes of access). In a stateless file system, the server maintains no state about a client’s access to files. The design of a file system will influence the access semantics to files. Sequential semantics are what we commonly expect to see in file systems, where reads return the results of previous writes. Session semantics occur when an application owns the file for the entire access session, writing the contents of the file only upon close, thereby making the updates visible to others after the file is closed, and overwriting any modifications made by others prior to that.

NFS

NFS was designed as a stateless, RPC-based model implementing commands such as read bytes, write bytes, link files, create a directory, and remove a file. Since the server does not maintain any state, there is no need for remote open or close procedures: these only establish state on the client. NFS works well in faulty environments: there’s no state to restore if a client or server crashes. To improve performance, a client reads data a block (8 KB by default) at a time and performs read-ahead (fetching future blocks before they are needed). NFS suffers from ambiguous semantics because the server (or other clients) has no idea what blocks the client has cached and the client does not know whether its cached blocks are still valid. NFS uses validation, where the client compares modification times from server requests to the times of data that it cached. However, it does this only if there are file operations to the server. Otherwise, the client simply invalidates the blocks after a few seconds. In NFS’s original stateless design, file locking could not be supported since that would require the server to keep state. It was later added through a separate lock manager that maintained the state of locks.

To facilitate larger deployments, NFS introduced the automounter. It was common to have an environment with many clients, each mounting many remote file systems. In such an environment, if all clients start up at approximately the same time, they can flood the server with mount requests. The automounter mounts remote directories only when they are first accessed. To make keeping track of mount points easier across many machines, automounter maps are configuration files that define what remote directories get mounted. These can be distributed across a set of clients.

AFS

AFS was designed as an improvement over NFS to support file sharing on a massive scale. NFS suffered because clients would never cache data for a long time (not knowing if it would become obsolete) and had to frequently contact the server. AFS introduced the use of a partition on a client’s disk to cache large amounts of data for a long time: whole file caching and long-term caching. It supports a file download-upload model. The entire file is downloaded on first access (whole file download) and uploaded back to the server after a close only if it was modified. Because of this behavior, AFS provides session semantics: the last one to close a modified file wins and other changes (earlier closes) are lost.

During file access, the client need never bother the server: it already has the file. When a client first downloads a file, the server makes a callback promise: it maintains a list of each client that has downloaded a copy of a certain file. Whenever it gets an update for that file, the server goes through the list and sends a callback to each client that may have a cached copy so that it can be invalidated on the client. The next time the client opens that file, it will download it from the server. Files under AFS are shared in units called volumes. A volume is just a directory (with its subdirectories and files) on a file server that is assigned a unique ID among the cell of machines (remember cells from DCE RPC?). If an administrator decides to move the volume to another server, the old server can issue a referral to the new server. This allows the client to remain unaware of resource movement.

Coda

Coda was built on top of AFS and focused on two things: supporting replicated servers and disconnected operation. To support replicated storage, AFS’s concept of a volume was expanded into that of a Volume Storage Group (VSG). Given that some volumes may be inaccessible at a particular point in time, the Accessible Volume Storage Group (AVSG) refers to the subset of the VSG that the client can currently access. Before a client accesses a file, it first looks up the replicated volume ID of the file to get the list of servers containing replicated volumes and the respective local volume IDs. While it can read files from any available server, it first checks the versions from all of them to ensure that one or more servers don’t have out-of-date files. If the client discovers that a server has an old version of a file, it initiates a resolution process by sending a message to that server, telling it to update its old versions. When a client closes a file, if there were any modifications, the changes are written out to all available replicated volumes.

If no servers are available for access, the client goes into disconnected operation mode. In this mode, no attempt is made to contact the server and any file updates are logged instead in a client modification log (CML). Upon connection, the client plays back the log to send updated files to the servers and receive invalidations. If conflicts arise (e.g., the file may have been modified on the server while the client was disconnected) user intervention may be required.

Because there’s a chance that users may need to access files that are not yet in the local cache, Coda supports hoarding, which is a term for user-directed caching. It provides a user interface that allows a user to look over what is already in the cache and bring additional files into the cache if needed.

DFS (AFS version 3)

AFS evolved over the years. The most significant evolutionary version is version 3, which was adopted as the recommended distributed file system in the Distributed Computing Environment (DCE) where it is named DFS (Distributed File System).

The primary design goal of this system was to avoid the unpredictable lost data problems of session semantics if multiple clients are modifying the same file. The concept of tokens was introduced. A token is permission given by the server to the client to perform certain operations on a file and cache a file’s data. The system has four classes of tokens: open, data, status, and lock tokens. An open token must be obtained to have permission to open a file. A read data token must be obtained for a byte range of a file to have permission to access that part of the file. Similarly, a write data token is needed to write the file. Status tokens tell the client that it may be able to cache file attributes. These tokens give the server control over who is doing what to a file. Tokens are granted and revoked by the server. For example, if one client needs to write to a file then any outstanding read and write data tokens that were issued to any clients for that byte range get revoked: those clients are now denied access until they get new tokens.

SMB/CIFS

Microsoft’s Server Message Block protocol was designed as a connection-oriented, stateful file system with a priority on file consistency and support of locking rather than client caching and performance. While it does not use remote procedure calls, its access principle is the same: requests (message blocks) are functional messages, providing file access commands such as open, create, rename, read, write, and close.

With the advent of Windows NT 4.0 and an increasing need to provide improved performance via caching, Microsoft introduced the concept of opportunistic locks (oplocks) into the operating system. This is a modified form of DFS’s tokens. An oplock tells the client how it may cache data. It allows clients to cache information without worrying about changes to the file at the server. At any time, a client’s oplock may be revoked or changed by the server. The mechanism has been extended since Windows 7. There are currently eight oplocks (do not memorize these but have an understanding of what they do):

  1. A level 1 oplock (exclusive oplock) provides the client with exclusive access to the file (nobody else is reading or writing it), so it can cache lock information, file attributes, and perform read-aheads and write-behinds.

  2. A level 2 oplock (shared oplock) is granted in cases where multiple processes may read a file and no processes are writing to it.

  3. A batch oplock is also exclusive and allows a client to keep a file open on the server even if the local process using it closed the file. This optimizes cases where a process repeatedly opens and closes the same files (e.g., batch script execution).

  4. A filter oplock is exclusive and allows applications that hold the oplock to be preempted whenever other processes or clients need to access the file.

  5. A read oplock (R) is a shared oplock that is essentially the same as a level 2 oplock. It supports read caching.

  6. A read-handle oplock (RH) allows multiple readers and keeps the file open on the server even if the client process closes it. It is similar to the batch oplock but is shared and does not support file modifications. It supports read caching and handle caching.

  7. A read-write oplock (RW) gives a client exclusive access to the file and supports read and write caching. It is essentially the same as the level 1, or exclusive, oplock.

  8. A read-write-handle oplock (RWH) enables a client to keep a file open on the server even if the client process closes it. It is exclusive and similar to the batch oplock.

The last four oplocks have been added since Windows 7 and are somewhat redundant with earlier mechanisms.

Oplocks may be revoked (broken) by the server. For example, if Process A has a read-write oplock, it can cache all read and write operations, knowing that no other client is modifying or reading that file. If another client, Process B, opens the file for reading, a conflict is detected. Process B is suspended and Process A is informed of the oplock break. This gives Process A a chance to send any cached changes to the server before Process B resumes execution.

SMB 2 and beyond

The SMB protocol was known to be chatty. Common tasks often required several round-trip messages. It was originally designed for LANs and did not perform optimally either on wide area networks (WANs) or on today’s high-speed LANs (1–100 Gbps). The SMB protocol was dramatically cleaned up with the introduction of the Microsoft Vista operating system (SMB 2), with minor changes added in Windows 7 (SMB 2.1) and even more in Windows 8 (SMB 3). Apple has dropped its proprietary AFP protocol in favor of SMB 2 in OS X 10.9 (Mavericks). We will focus our discussion on the significant changes introduced in SMB 2.

SMB 2 added six key changes to its design:

Reduced complexity
The set of SMB commands went from over 100 commands down to 19.
Pipelining
Pipelining is the ability to send additional commands before the response to a prior command is received. Traditionally, SMB (and any RPC-based system) would have to wait for one command to complete before sending an additional command. The goal of pipelining is to keep more data in flight and use more of the available network bandwidth.

To give the server control over an overly high rate of client requests, credit-based flow control is used. With credit-based flow control, the server creates a small number of credits and later increases this number as needed. The server sends these credits to each client. The client needs credits to send a message to the server. Each time a message is sent, it decrements its credit balance. This allows the server to control the rate of messages from any client and avoid buffer overflow.
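
A sketch of the client-side bookkeeping for credit-based flow control (illustrative; not the actual SMB 2 message format):

```python
class CreditFlowControl:
    """Client-side credit accounting sketch: a request may only be sent while
    the client holds at least one credit granted by the server."""
    def __init__(self, initial_credits):
        self.credits = initial_credits

    def can_send(self):
        return self.credits > 0

    def on_send(self):
        assert self.can_send()
        self.credits -= 1            # every request consumes one credit

    def on_response(self, granted):
        self.credits += granted      # the server grants more credits in its responses
```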

Note that TCP uses congestion control, which results in data loss and wild oscillations in traffic intensity (TCP keeps increasing its transmission window size until packet loss occurs; then it cuts the window size in half and starts increasing again).

Compounding
Compounding is similar to pipelining but now allows multiple commands to be sent in one message. It avoids the need to optimize the system by creating commands that combine common sets of operations. Instead, one can send an arbitrary set of commands in one request. For instance, instead of the old SMB RENAME command, the following set of commands are sent: CREATE (create new file or open existing); SET_INFO; CLOSE. Compounding reduces network time because multiple requests can be placed within one message.
Larger read/write sizes
Fast networks can handle larger packet sizes and hence transfer larger read and write buffers more efficiently.
Improved caching
SMB 2 improved its ability to cache folder and file properties. This avoids messages to the server to retrieve these properties.
Durable handles
With earlier versions of SMB, a temporary network disconnection meant that a client lost its connection to the server and had to reestablish all connections, remount all file systems, and reopen all files. With SMB 2, the connection still has to be reestablished, but all handles to open files remain valid.
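
The credit mechanism described under Pipelining above can be sketched as follows. This is a minimal sketch using an invented in-process API, not the SMB 2 wire protocol: the server hands out credits, a client spends one credit per request, and the server replenishes credits with each response.

    class Server:
        def __init__(self, initial_credits=4):
            self.initial_credits = initial_credits

        def handle(self, request):
            # A real server would size the grant based on free buffer space;
            # granting fewer credits back slows an aggressive client down.
            granted = 1
            return "response to " + request, granted

    class Client:
        def __init__(self, server):
            self.server = server
            self.credits = server.initial_credits

        def send(self, request):
            if self.credits == 0:
                raise RuntimeError("out of credits: must wait for responses")
            self.credits -= 1                        # spend a credit to send
            response, granted = self.server.handle(request)
            self.credits += granted                  # server replenishes credits
            return response

    client = Client(Server())
    for i in range(5):
        print(client.send("read block %d" % i), "| credits left:", client.credits)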

NFS version 4

While NFS version 3 is still widely used, NFS version 4 introduced significant changes and is a departure from the classic stateless design of NFS. The server is now stateful and is able to control client-side cache coherence better, allowing clients to cache data for a longer time. Servers can grant clients the ability to do specific actions on a file to enable more aggressive client caching. This is similar to SMB’s oplocks. Callbacks will notify a client when file or directory contents have changed. Because the server is now stateful, NFS acquired open and close operations.

Like SMB 2, NFS now supports compound RPC, where multiple operations can be grouped together into a single request. Sending one message instead of a series of messages reduces overall round-trip time significantly.
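
Both SMB 2 compounding and NFSv4’s compound RPC pack several operations into a single request message. A toy sketch of the idea, reusing the RENAME example from the SMB discussion above (the message format here is invented purely for illustration):

    def compound_rename(old_name, new_name):
        # Three commands travel in one network message instead of three round trips.
        return [
            {"op": "CREATE",   "name": old_name},        # open the existing file
            {"op": "SET_INFO", "rename_to": new_name},   # change its name
            {"op": "CLOSE"},                             # release the handle
        ]

    def send_one_message(commands):
        # Stand-in for a single request/response exchange with the server.
        print("one message carrying", len(commands), "commands:", commands)

    send_one_message(compound_rename("report.doc", "report-final.doc"))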

NFSv4 also added strong authentication and encryption as well as support for file system replication and migration. This includes a mechanism for sending referrals similar to that used by AFS.

Chubby

Go here for lecture notes

Goal: Create a highly-available centralized lease manager and file system for small files that can serve as a name server and configuration repository.

Chubby is a highly available and persistent distributed lock service and file system created by Google and used in various projects and frameworks, including Bigtable and MapReduce. Its purpose is to manage locks for resources and store configuration information for various distributed services throughout the Google cluster environment. The key design goal of Chubby was high availability and high reliability.

By default, the service runs on five computers as five active replicas. This grouping is called a Chubby cell. One of these replicas is elected as the master to serve client requests. Only the master will serve client requests; the other machines in the cell are for fault tolerance in case the master dies. The only request they will answer from clients is to tell them who the master is.

A majority of the replicas in the cell must be running for the service to work. Paxos is used as a leader election algorithm and is used to keep the replicas consistent (i.e., make sure that all updates take place in the same order on all replicas). Typically, there will be one Chubby cell per datacenter.

In addition to providing locks, Chubby is designed for managing relatively small amounts of data: items such as system configuration and state information for various services. Chubby provides clients with a namespace of files & directories. Every file or directory can be treated as a lock file and every file may, of course, be used to store data. The name of a lock is the name of its file: its hierarchical pathname. The interface to Chubby is not that of a native file system. There is no kernel module and client software communicates with Chubby via an API that sends remote procedure calls to the Chubby master.

File operations are somewhat different from those offered by conventional file systems. Files can be read and written only in their entirety: there are no seek or byte-range read and write operations. When a file is opened by the client, it is downloaded and a lease for that file is established. In this way, Chubby keeps track of which clients have cached copies of a file. All writes from a client must be sent to the Chubby master (we have a write-through cache). Chubby then sends invalidations to all clients that have cached copies.

Locks are advisory and can be either exclusive (one writer) or not (multiple readers). A client can send a request to acquire a lock for a file, release it, and also assign and check sequence numbers for a lock. Clients can also subscribe to receive events for an open file. These events include notification of file modification, the creation of directories or files under an open directory, and lock acquisition.

Chubby is designed primarily for managing coarse-grained locks. Fine-grained locks are locks that are generally used for a small object, such as a row in a table of a database. They are generally held for a short duration: seconds or less. Coarse-grained locks typically control larger structures, such as an entire table or an entire database. They might be held for hours or days. They are acquired rarely compared to fine-grained locks. Hence, a server that is designed for coarse-grained locks can handle more clients.

Even though Chubby uses the term “coarse-grained”, it doesn’t exactly fit a “pure” coarse/fine grained lock model, but that model does not necessarily work well in real systems. In theory, the concept of a coarse-grained lock is to grant a process a lock for a large pool of objects of a particular class and then have that process be the lock manager for those objects. This is essentially what Chubby does but Chubby doesn’t keep track of all the objects. Instead, the top (coarse) level is a set of services, such as Bigtable tables, a GFS file system, or Pregel frameworks. Chubby allows them to ensure there is a single master and to lock any critical resources that might be shared among these applications. Those applications then handle the fine-grained aspect of giving out locks for data blocks, table cells, or synchronizing communication barriers.

Because Chubby does not hold huge amounts of data but may serve thousands of clients, all Chubby data is cached in memory. For fault tolerance, all data is written through to the disk when modified, and is propagated to replicas via the Paxos consensus algorithm to ensure consistent ordering. The entire database is backed up to the Google File System (GFS) every few hours to ensure that critical configuration information is saved even if all replicas die.

Chubby’s fault-tolerant locking makes it a good service for leader election. If a group of processes wants to elect a leader, they can each request a lock on the same Chubby file. Whoever gets the lock is the leader (for the lease period of the lock).
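
As a sketch of how this works in practice (the lock-service API below is a hypothetical stand-in, not Chubby’s actual client library; the file path and lease length are invented):

    import time

    class FakeLockService:
        """Stands in for a Chubby cell: at most one holder per lock file at a time."""
        def __init__(self):
            self.holders = {}

        def try_acquire(self, path, client_id, lease_seconds):
            holder = self.holders.get(path)
            if holder is None or holder["expires"] < time.time():
                self.holders[path] = {"id": client_id,
                                      "expires": time.time() + lease_seconds}
                return True
            return False

    def elect(service, client_id):
        # Every candidate tries to grab the same lock file; whoever succeeds
        # acts as leader until its lease expires.
        if service.try_acquire("/ls/cell/service/master", client_id, lease_seconds=12):
            print(client_id, "is the leader")
        else:
            print(client_id, "lost the election; will retry after the lease expires")

    cell = FakeLockService()
    for cid in ("node-a", "node-b", "node-c"):
        elect(cell, cid)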

Parallel file systems

Go here for lecture notes

Goal: Store huge files (terabyte and larger) in a file system that is distributed across many (thousands of) machines.

Conventional file systems lay out an entire file system on one logical storage device (a disk or NAND flash memory). This means that the metadata and data for all files and directories is located in this one device. In some cases, the device might be a collection of disks (e.g., a RAID drive or several disks connected by a volume manager) but, to the operating system, it appears as one device – one collection of blocks. This places a restriction on each file system: it cannot span more than one machine. Conventional distributed file systems, used for network attached storage (NAS), basically use a file system access protocol but interact with one or more servers running these conventional file systems.

With parallel file systems, the metadata and data of files are separated onto different entities. The metadata of a file is the set of attributes about a file: its name, size, access permissions, time stamps, etc. The data of a file is, well, the file’s data. Metadata does not take up much space. It’s a small set of attributes and each attribute is generally only a few bytes in size. Data, however, can be huge: terabytes or more for large files.

The idea of a parallel file system is to have a system – or systems – manage the metadata. As part of the metadata for each file, the system will identify the systems and logical blocks holding the file’s data (a sketch of such metadata appears after the list below). By doing this, we gain several benefits:

  1. A file’s data is no longer limited to the capacity of any one system and can be spread out across many systems.

  2. Because data blocks are distributed among multiple systems, access is effectively load balanced across them. Moreover, multiple systems can process different files, or different parts of a file, with read operations taking place on different systems. This provides scalable bandwidth, comparable to striping in storage arrays.

  3. Data blocks can be replicated on multiple systems, providing fault tolerance and the potential for greater read bandwidth since processes can read from any of the replicated data blocks.
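
A minimal sketch of the kind of per-file metadata such a system keeps (the field names, block size, and server names are invented for illustration):

    file_metadata = {
        "name": "/logs/clickstream.dat",
        "size": 3 * 64 * 1024 * 1024,       # three 64 MB blocks
        "permissions": 0o640,
        "blocks": [
            {"index": 0, "replicas": ["server-12", "server-40", "server-71"]},
            {"index": 1, "replicas": ["server-03", "server-40", "server-88"]},
            {"index": 2, "replicas": ["server-12", "server-55", "server-88"]},
        ],
    }

    def servers_for_read(meta, block_index):
        # Any replica can satisfy a read, which is what spreads load across machines.
        return meta["blocks"][block_index]["replicas"]

    print(servers_for_read(file_metadata, 1))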

This design approach is the basis for the Google File System (GFS), the Hadoop Distributed File System (HDFS, essentially a clone of GFS), and distributed file systems used in supercomputing clusters, such as Lustre and GlusterFS. We will not look at these last two, but the concepts are similar.

Google File System (GFS)

The Google File System is designed to support large data-intensive applications (the kind of algorithms Google uses for search and other services). The system is designed for an environment where there are many thousands of storage servers, some of which are expected to be down at any given point in time. The data that is managed comprises billions of objects and many petabytes of data. GFS is not designed to be a general-purpose file system and is optimized for large files (rather than lots of small files), streaming reads (more common than writes), and atomic appends (which may be done concurrently by multiple clients).

Each GFS cluster contains one master file server. This is a faster and more reliable machine that manages file system metadata, such as names and attributes of files and directories. None of the actual file data resides on this system. Instead, it contains a mapping of the file contents to the chunks that hold the data. Chunks are fixed-size, 64 MB blocks and are stored in chunkservers. The chunkservers are less reliable than the master, so each chunk is replicated and typically stored on three separate chunkservers.

Clients contact the master to look up files. The master gives them a chunkserver name and chunk handles for the files requested.

To write to a file, the master grants a chunk lease to one of the replicated chunks. The chunkserver holding this lease is the primary replica chunkserver for that chunk. Writing is a two-stage process. First, the client writes to one replica. That replica then forwards the data to another replica chunkserver, and so on until all replicas for that chunk receive the data. This relieves load on the client in that it only has to write one copy. It also doesn’t put any one computer within GFS in the position of having to send out N copies of the data.

Once all replicas acknowledge receipt of the data, the second stage, writing, takes place. The client sends a write request to the primary chunkserver, identifying the data that was sent. The primary assigns a sequence number to the write operations that take place on the data and sends write requests to all the replicas so that they will write the data in the same sequence-number order. Note that the data flow (data transfer) from the client is chained: it goes to the primary replica, then to a secondary replica, then to another secondary replica, and so on. The control flow (write commands) goes from the primary directly to all of the secondaries (but these are much, much smaller messages).
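
A rough sketch of this two-stage write (real GFS messages carry chunk handles, offsets, and checksums; this only shows the chained data flow and the separate control flow):

    class Chunkserver:
        def __init__(self, name):
            self.name = name
            self.staged = {}       # data received but not yet committed
            self.applied = []      # (sequence number, data) in commit order

        def push_data(self, data_id, data, chain):
            self.staged[data_id] = data
            if chain:                                   # forward along the chain
                chain[0].push_data(data_id, data, chain[1:])

        def apply(self, data_id, sequence):
            self.applied.append((sequence, self.staged.pop(data_id)))

    primary = Chunkserver("primary")
    secondaries = [Chunkserver("replica-1"), Chunkserver("replica-2")]

    # Stage 1: the client pushes the data once; replicas forward it down the chain.
    primary.push_data("d1", b"new record", secondaries)

    # Stage 2: the client asks the primary to commit; the primary chooses the
    # order and sends small control messages directly to each secondary.
    sequence = 1
    primary.apply("d1", sequence)
    for s in secondaries:
        s.apply("d1", sequence)

    print([(cs.name, cs.applied) for cs in [primary] + secondaries])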

Hadoop Distributed File System

Apache’s Hadoop Distributed File System (HDFS) is incredibly similar to GFS and is designed with essentially the same goals. The key distinction is that it does not support modifying a file once it is created, but it does support appends. This avoids the need to manage leases or locks for the file.

Like GFS’s master, HDFS uses a separate server, called a NameNode, that is responsible for keeping track of the filesystem namespace and the location of replicated file blocks. File data is broken into 128 MB blocks (default, but configurable per file) and is stored on DataNodes (GFS’s chunkservers). Each block (GFS chunk) is replicated on multiple DataNodes (the default is three, as with GFS). As with GFS, all file system metadata, names and block maps, is stored in memory on the NameNode for performance. File writes from clients are pipelined through each of the replica DataNodes that will hold the data.

HDFS uses rack-aware logic for determining which computers should host replicas for a block. The first replica is targeted to a computer in the same rack as the requesting client (to minimize latency). Second and additional replicas come from other racks in the data center for fault tolerance in case the first rack fails. The process for writing a block is essentially the same as in GFS. Data writes are pipelined to get data onto the replicas and then an update command is sent by the primary replica to all the others to write the block (this is essentially a commit operation, as it makes the data permanent).
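
A simplified sketch of rack-aware placement (the rack layout and node names are invented; HDFS’s actual placement policy has additional rules beyond what is shown here):

    import random

    # Assumed cluster layout.
    datanodes = {
        "rack-1": ["dn-1a", "dn-1b", "dn-1c"],
        "rack-2": ["dn-2a", "dn-2b"],
        "rack-3": ["dn-3a", "dn-3b"],
    }

    def place_replicas(client_rack, replication=3):
        # First replica on the writer's rack for low latency; the rest on other
        # racks so the block survives the loss of an entire rack.
        targets = [random.choice(datanodes[client_rack])]
        other_racks = [r for r in datanodes if r != client_rack]
        while len(targets) < replication:
            candidate = random.choice(datanodes[random.choice(other_racks)])
            if candidate not in targets:
                targets.append(candidate)
        return targets

    print(place_replicas("rack-1"))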

Dropbox: Cloud-based file synchronization

Go here for lecture notes

Goal: Provide an Internet service that synchronizes part of a user’s file system to remote servers. Any changes are propagated back to any devices that a user uses for synchronization. In this manner, multiple user computers can keep their data synchronized.

Dropbox is one of the first of a set of popular services that provide “cloud-based file storage” – a service that allows you to store data on remote servers and access it from anywhere. Dropbox does this in a manner that is largely transparent to the user. The user designates a directory (aka folder) that will remain in sync with the server. Any local changes will be propagated to the server and any changes from the server will be propagated to the local computer. Multiple computers can thus keep data in sync, with changes on one computer being sent to Dropbox servers and then synchronized on other computers.

The Dropbox service started in 2008 and grew rapidly in popularity and, hence, in the number of users and the amount of data. It serves as a good case study in scaling a system. As of 2012, Dropbox handled over 100 million users synchronizing a billion files each day. One way that Dropbox differs from other data access services such as Twitter, Facebook, Reddit, and others is that those services have an extremely high read-to-write ratio. Any content that is created is usually read many, many times. With Dropbox, in contrast, the read-to-write ratio is close to 1. Because the primary use case is file synchronization, remote data is rarely accessed except to synchronize it with other machines. As such, Dropbox has to deal with an inordinately high number of uploads.

Dropbox began life as one server, using a MySQL database to keep track of users and their files. As the server ran out of disk space, all file data was moved over to Amazon’s S3 service (Simple Storage Service; a web services interface to store and retrieve objects, where each object is identified with a unique key). Dropbox ran a database server that stored all the metadata (information about the files) and a server that provided a web portal and interacted with client software. At this time, each client ran a process that would look through a directory to identify changed files and send them to Dropbox. In addition, the program polled the server periodically to see if there were any changes that needed to be downloaded.

Tens of thousands of clients all polling a server to see if anything changed generates considerable load, so the next design change was to add a notification server. Instead of having a client ask periodically, the notification server sends a message telling the client that there are changes. Since clients may be behind firewalls and it may not be possible for a notification server to connect to them, the notification server relies on having the client establish a TCP connection to it. The single server that handled all requests was also split in two: a metadata server handled everything related to information about users and files while a block server, hosted at Amazon, handled everything having to do with reading and writing file data.

As the number of users grew even larger, a two-level hierarchy of notification servers was added since each server could manage only around one million connections. The metadata server, block server, and database were also replicated and load balanced.

Distributed Lookup Services (Distributed Hash Tables)

Go here for lecture notes

Goal: Create a highly-scalable, completely decentralized key-value store

The purpose of a distributed lookup service is to find the computer that has data corresponding to a key that you have. For example, the key can be the name of a song and the computer is the one hosting the MP3 file, or the key can be your customer ID and the corresponding data is the contents of your shopping cart.

The most straightforward approach is to use a central coordinator to store all the keys and their mapping to computers. You ask the server to look up a key and it gives you the computer containing the content. This is the Napster implementation, which was one of the first peer-to-peer file sharing services. A coordinator served as the web service that kept track of who was hosting which music files for download but the files themselves were hosted by the community of users. The Google File System, GFS, also uses a central coordinator although the corresponding data for a key (file) is distributed among multiple chunkservers.

Another approach is to “flood” the network with queries. Here, your computer knows of only a few peer computers, not the entire collection, and those computers, in turn, know of other peers. When a computer needs to look up a key, it forwards the request to all of its peer nodes. If a peer does not have the content, it repeats the process and forwards the request to its peers (and they forward the request to their peers …). A time-to-live (TTL) value in the request is decremented with each hop and the request is no longer forwarded once the TTL reaches zero. If one of the peers has the content, it responds to whoever sent it the request. That node, in turn, sends the response back to its requestor and the process continues back until the originator of the query gets the response. This is called back propagation. This approach was used by the Gnutella file sharing system.
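
A toy sketch of flooding with a TTL and back propagation (synchronous and greatly simplified; a real implementation would also suppress duplicate queries; node names and content are invented):

    class Node:
        def __init__(self, name, content=None):
            self.name = name
            self.content = content or {}
            self.neighbors = []

        def query(self, key, ttl):
            if key in self.content:
                return self.name, self.content[key]   # found it: answer the requester
            if ttl == 0:
                return None                           # stop flooding
            for peer in self.neighbors:
                result = peer.query(key, ttl - 1)     # forward to peers
                if result is not None:
                    return result                     # back-propagate the answer
            return None

    a, b, c, d = Node("a"), Node("b"), Node("c"), Node("d", {"song.mp3": "hosted on d"})
    a.neighbors, b.neighbors, c.neighbors = [b], [c], [d]
    print(a.query("song.mp3", ttl=3))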

Flooding uses what is known as an overlay network. An overlay network is a logical network that is built on top of another network. Typically, computers on the network have knowledge of only some of the other computers on the network and will have to use these “neighbor” nodes to route traffic to more distant nodes.

Finally, the distributed hash table (DHT) approach is a set of solutions that is based on hashing the search key to a number that is then used to find the node responsible for items that hash to that number. A key difference between the DHT approach and the centralized or flooding approaches is that the hash of the key determines which node is responsible for holding information relating to the key.

Consistent hashing

Conventional hash functions require most keys to be remapped if the size of the hash table changes: that is, keys will usually hash to a different value when the size of the table changes. For a distributed hash table, this is particularly problematic: it would mean that if we remove or add a node to a group of computers managing a DHT, a very large percentage of the data would have to migrate to different systems. A consistent hash is a hashing technique where most keys will not need to be remapped if the number of slots in the table changes. On average, only k/n keys need to be remapped, where k is the number of keys and n is the number of slots in the table.
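
A minimal sketch of the idea (SHA-1 is used here only as a convenient hash; node and key names are invented): nodes and keys hash onto the same ring, each key belongs to the first node clockwise from it, and adding a node only remaps the keys that land on the new node.

    import bisect
    import hashlib

    def h(value):
        return int(hashlib.sha1(value.encode()).hexdigest(), 16) % 2**32

    class Ring:
        def __init__(self, nodes):
            self.ring = sorted((h(n), n) for n in nodes)
            self.positions = [p for p, _ in self.ring]

        def node_for(self, key):
            # First node clockwise from the key's hash, wrapping around the ring.
            i = bisect.bisect_right(self.positions, h(key)) % len(self.ring)
            return self.ring[i][1]

    keys = ["key-%d" % i for i in range(12)]
    before = Ring(["node-a", "node-b", "node-c"])
    after = Ring(["node-a", "node-b", "node-c", "node-d"])   # one node added

    moved = [k for k in keys if before.node_for(k) != after.node_for(k)]
    print("keys that moved:", moved)    # roughly k/n of the keys, not all of them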

CAN: Content-Addressable Network

CAN implements a logical x-y grid (although it can be applied to an arbitrary number of dimensions). A key is hashed by two hashing functions, one for each dimension (e.g., an x-hash and a y-hash). The result of these hashes identifies an x, y point on the grid. Each node is responsible for managing all keys that are located within a rectangle on the grid; that is, a node will be responsible for all keys whose x-hash falls in some range between x_a and x_b and whose y-hash falls in some range between y_a and y_b. This rectangle is known as a zone. If a node is contacted with a query for a key, it will hash the key and determine if it is present within its zone. If it falls within the node’s zone, the node can satisfy the request. If the hashed values are out of range, the node will forward the request to one of four neighbors (north, east, south, or west), which will then invoke the same logic to either process the query or forward it on to other nodes. The average route for a system with n nodes takes O(sqrt(n)) hops.

CAN scales because each zone can be split into two, either horizontally or vertically, and the process can be repeated over and over as needed. For each split, some of the keys will have to move to the new node. For fault tolerance, (key, value) data can be replicated on one or more neighboring nodes, and each node will know not just its neighbors but its neighbors’ neighbors. If a node is not reachable, the request will be sent to a neighboring node. Even though we discussed CAN in two dimensions, it can be implemented in an arbitrary number of dimensions. For d dimensions, each node needs to keep track of 2d neighbors (two per dimension).

Chord

Chord constructs a logical ring representing all possible hash values (bucket positions). Note that for hash functions such as a 160-bit SHA-1 hash, this is an insanely huge value of approximately 1.46 x 10^48 slots. Each node in the system is assigned a position in the huge logical ring by hashing its IP address. Because the vast majority of bucket positions will be empty, (key, value) data is stored either at the node to which the key hashes (if, by some rare chance, the key hashes to the same value that the node’s IP address hashed) or on a successor node, the next node that would be encountered as the ring is traversed clockwise. For a simple example, let us suppose that we have a 4-bit hash (0..15) and nodes occupying positions 2 and 7. If a key hashes to 4, the successor node is 7 and hence the computer at node 7 will be responsible for storing all data for keys that hash to 4. It is also responsible for storing all data for keys that hash to 3, 5, 6, and 7.

If a node only knows of its clockwise neighbor node, then any query that a node cannot handle will be forwarded to that neighboring node. This results in an unremarkable O(n) lookup time for a system with n nodes. An alternate approach is to have each node keep a list of all the other nodes in the group. This way, any node will be able to find out which node is responsible for the data on a key simply by hashing the key and traversing the list to find the first node ≥ the hash of the key. This gives us an impressive O(1) performance at the cost of having to maintain a full table of all the nodes in the system on each node. A compromise approach that bounds the table size is to use finger tables. A finger table is a partial list of nodes, with each element in the table identifying a node that is a power of two away from the current node. Element 0 of the table is the next node (2^0 = 1 away), element 1 of the table is the node after that (2^1 = 2 away), element 2 of the table is the node four away (2^2), element 3 of the table is the node eight away (2^3), and so on. With finger tables, O(log n) nodes need to be contacted to find the owner of a key.
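
A sketch of finger-table lookup on a tiny 4-bit ring (positions 0-15), using the standard Chord rule that finger[i] of a node points to the successor of (node + 2^i). The node positions are assumed for illustration.

    M = 4
    RING = 2 ** M
    NODES = sorted([1, 4, 9, 11, 14])

    def successor(ident):
        """First node at or after ident, wrapping around the ring."""
        for n in NODES:
            if n >= ident:
                return n
        return NODES[0]

    def successor_of(node):
        return successor((node + 1) % RING)

    def finger_table(node):
        return [successor((node + 2 ** i) % RING) for i in range(M)]

    def in_interval(x, a, b):
        """True if x lies in the open ring interval (a, b)."""
        return a < x < b if a < b else (x > a or x < b)

    def find_successor(node, key_hash, hops=0):
        succ = successor_of(node)
        if key_hash == succ or in_interval(key_hash, node, succ):
            return succ, hops                      # succ stores data for this key
        for f in reversed(finger_table(node)):     # closest preceding finger
            if in_interval(f, node, key_hash):
                return find_successor(f, key_hash, hops + 1)
        return find_successor(succ, key_hash, hops + 1)

    print(find_successor(14, 7))   # a key hashing to 7 is owned by node 9 (one hop)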

Distributed hash table case study: Amazon Dynamo

Go here for lecture notes

Goal: Create a practical implementation of a key-value store that is highly scalable, completely decentralized, replicated, and efficient.

Dynamo is a highly available key-value store created internally within Amazon. Many services within Amazon, such as best seller lists, shopping carts, user preferences, session management, etc., need only primary key access to data. A multi-table relational database system would be overkill and would limit scalability and availability. With Dynamo, an application can configure its instance of Dynamo for desired availability (number of replicas) and consistency.

Dynamo supports two operations: get(key) and put(key, data, context). The data is an arbitrary bunch of bytes (typically less than 1 MB) and is always identified by a unique key. The (key, data) values are distributed over a large set of servers and replicated for availability. The context is a value that is returned with a get operation and then sent back with a future put operation. The context is meaningless to the user program and is used like a cookie. Internally, it contains versioning information.

The system is decentralized: there is no need for a central coordinator to oversee operations. Every node has equivalent responsibilities and the system can grow dynamically by adding new nodes.

Partitioning: scalability

Since massive scalability is a key design aspect of Dynamo, data storage is distributed among many nodes. Dynamo relies on consistent hashing to identify which node will be responsible for a specific key. With normal hashing, a change in the number of slots in the table (e.g., the number of nodes, since we are hashing to find a node) will result in having to remap nearly all keys. With consistent hashing, this is not the case and only K/n keys need to be remapped, where K is the number of keys and n is the number of slots (nodes).

Nodes are arranged in a logical ring. Each node is assigned a random “token” (a number) that represents its position in the logical ring. It is responsible for all keys that hash to any number between the node’s number and its predecessor’s number. When a key is hashed, a node will traverse a logical ring of node values to find the first node with a position greater than or equal to that hashed result. This tells it which node is responsible for handling that key. Since nodes are not added that frequently, every node can store a cached copy of this ring (its size is just the list of N hash values, where N is the number of nodes). Moreover, to avoid the extra hop that results when a client sends a request to the wrong Dynamo node, a client may also get a copy of this table. The system has been described as a zero-hop distributed hash table (DHT).

Adding or removing nodes affects only the node’s immediate neighbor in the ring. If a node is added, the successor node will give up some range of values to the new node. If a node is removed, the (key, value) data it manages will have to be moved over to the successor. In reality, a physical node is assigned to manage multiple points in the ring (multiple ranges of hash values). The nodes in the algorithm that we just discussed are really virtual nodes. A real machine, or physical node, will be responsible for managing multiple virtual nodes. The advantage of doing this is that we can attain balanced load distribution. If a machine becomes unavailable, the load is evenly distributed among the available nodes. If a new machine is added, each virtual node within it will take a set of values from other virtual nodes, effectively taking on a small amount of load from a set of other machines instead of from just the neighboring system (successor node). Moreover, if a more powerful machine is added, it can be assigned a larger number of virtual nodes so that it bears a greater percentage of the query workload.
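
A minimal sketch of virtual nodes (the token counts, machine names, and keys are invented): each physical machine is given several tokens on the ring, so its key ranges are spread around the ring rather than concentrated in one arc.

    import bisect
    import hashlib

    def h(value):
        return int(hashlib.sha1(value.encode()).hexdigest(), 16) % 2**32

    def build_ring(machines, vnodes_per_machine=4):
        # Each physical machine gets several positions (tokens) on the ring.
        ring = []
        for machine in machines:
            for v in range(vnodes_per_machine):
                ring.append((h("%s#%d" % (machine, v)), machine))
        return sorted(ring)

    def machine_for(ring, key):
        positions = [p for p, _ in ring]
        i = bisect.bisect_right(positions, h(key)) % len(ring)
        return ring[i][1]

    ring = build_ring(["machine-1", "machine-2", "machine-3"])
    for key in ("cart:alice", "cart:bob", "session:42"):
        print(key, "->", machine_for(ring, key))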

Replication: availability

Dynamo is designed as an always writable data store. Data can be replicated on N hosts, where the value N is configurable per instance of Dynamo. The node that is responsible for storing the key-value is assigned the role of coordinator for that key and is in charge of replication for that key. The coordinator copies (key, value) data onto N–1 successor nodes clockwise in the ring. Copying is asynchronous (in the background) and Dynamo provides an eventually consistent model. This replication technique is called optimistic replication, which means that replicas are not guaranteed to be identical at all times. If a client cannot contact the coordinator, it sends the request to a node holding a replica, which will then periodically try to contact the coordinator.

Because data is replicated and the system does not provide ACID guarantees, it is possible that replicas may end up holding different versions. Dynamo favors application-based conflict resolution since the application is aware of the meaning of the data associated with the key and can act upon it intelligently. For example, the application may choose to merge two versions of a shopping cart. If the application does not want to bother with this, the system falls back on a last write wins strategy. Dynamo uses vector clocks to identify versions of stored data and to distinguish versions that have been modified concurrently from versions that are causally related (a later one is just an update of an earlier one). For example, one can identify whether there are two versions of a shopping cart that have been updated independently or whether one version is just a newer modification of an older shopping cart. A vector clock value is a set of (node, counter) pairs. This set of values constitutes the version of the data. It is returned as the “context” with a get operation and updated and stored when that context is passed as a parameter to a put operation.
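
A small sketch of how vector clock comparison distinguishes a causally newer version from two concurrent ones (the clock values and node names are invented for illustration):

    def descends(a, b):
        """True if version a is equal to or causally newer than version b."""
        return all(a.get(node, 0) >= counter for node, counter in b.items())

    def compare(a, b):
        if descends(a, b) and descends(b, a):
            return "identical versions"
        if descends(a, b):
            return "a supersedes b (b can be discarded)"
        if descends(b, a):
            return "b supersedes a (a can be discarded)"
        return "concurrent: the application must reconcile (e.g., merge the carts)"

    v1 = {"node-x": 2, "node-y": 1}
    v2 = {"node-x": 3, "node-y": 1}      # an update built on top of v1
    v3 = {"node-x": 2, "node-y": 2}      # an independent update of v1
    print(compare(v2, v1))               # v2 supersedes v1
    print(compare(v2, v3))               # concurrent: must be reconciled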

Because Dynamo is completely decentralized and does not rely on a coordinator (unlike GFS, for example), each node serves three functions:

  1. Managing get and put requests. A node may act as a coordinator responsible for a particular key or may forward the request to the node that has that responsibility.

  2. Keeping track of membership and detecting failures. A node will detect if other nodes are out of service and keep track of the list of nodes in the system and their associated hash ranges.

  3. Local persistent storage. Each node is responsible for being either the primary or replica store for keys that hash to a certain range of values. These (key, value) pairs are stored within that node using a variety of mechanisms depending on application needs. These include the Berkeley Database Transactional Data Store, MySQL (for large objects), or an in-memory buffer with persistent backing store (for highest performance).