Distributed Computing Fundamentals Guide
COMPUTING FROM
FIRST PRINCIPLES
By
Kenneth Emeka Odoh
[Link]
May 2025
Preface
“I would maintain that thanks are the highest form of thought; and that
gratitude is happiness doubled by wonder.” – G.K. Chesterton
Acknowledgement
“When it comes to life the critical thing is whether you take things for granted
or take them with gratitude.” – G.K. Chesterton
Archit Goyal reached out and provided source code on CRDT among other major
reviewing and coding contributions. Archit is a Senior Software Engineer at LinkedIn,
deeply involved in building and productionizing large-scale distributed data systems.
He is one of the early engineers behind LinkedIn's Flink Batch platform, powering
Ads AI workloads and high-throughput, low-latency pipelines. Much of his work
focuses on scaling stateful computations, disaggregated shuffle, checkpointing trade-
offs, failure recovery, resource fairness, and multi-cluster observability.
I want to thank Anselm Eickhoff and Charles Krempeaux, who were responsive to
my questions about CRDT. I would also like to thank Harrington Joseph, who
suggested that I focus on gaining a low-level understanding of parallel programming.
We have credited both anonymous reviewers, who provided tangible support in
preparing this manuscript.
Kindly make a free-will donation to support my work, including open-source de-
velopment, blogging, research papers, or textbooks. Thank you so much for your
financial support.
Donate: [Link]
Contents
Chapter 1 Introduction
1.1 Algorithm Implementations
1.2 Overview
4.4 Stabilization Algorithm
4.5 Byzantine Protocol (BFT)
4.6 Distributed Commit and Transaction
4.6.1 Atomic Commit Protocols
4.7 Routing Algorithm
Chapter 8 Miscellaneous
8.1 Accelerated Computing
8.2 Storage Systems
8.3 Coding Philosophy
8.3.1 Review of Selected Source Codes
8.4 Case Studies
8.4.1 Implementation of AutoScaling Framework (Serverless)
8.4.2 Distributed Computing Patterns
8.5 Practical Considerations
8.5.1 Tips for Testing
8.5.2 Evaluation Metrics
8.6 Exercises for the Readers
References
Chapter 1
Introduction
Have you ever wondered how a typical distributed system works under the hood?
Are you looking for a pedagogical guide with complete implementations and tricks of
the trade? Look no further and read my writing on the topic. We have implemented
several foundational algorithms in Distributed Computing. This paradigm has be-
come ubiquitous in the industry where multiple systems interact to solve compu-
tational tasks, with applications including distributed stores (databases), IoT sensor
networks, and the Internet. The exponential growth in deploying Distributed systems
to solve real-world problems has led to a resurgence of interest in understanding
Distributed Computing from scratch.¹ Consensus is a foundational requirement for
distributed systems, allowing disparate nodes to synthesize a consistent global
perspective from their local operations. This collaborative agreement forms the very
objective of distributed algorithms.
Distributed systems operate in inherently uncertain and chaotic environments,
facing network unreliability through intermittent connectivity, message loss, and
out-of-order delivery. Therefore, fault tolerance must be a fundamental architectural
requirement, not an afterthought. Managing these complexities demands rigorous
correctness guarantees to prevent unintended behaviors at scale across multiple nodes
and unreliable channels. In production-grade software, formal verification is crucial
for reducing bugs and for guaranteeing that safety and liveness properties are never
violated, especially in mission-critical applications.
¹ My research in Distributed Systems is a two-phased process: the first phase was
covered in the [Link] [Link]. The second phase is the current book that you are
reading. A multi-phased approach to research has helped to minimize risk.
A few years ago, my deficiency in the low-level knowledge required to build a
large-scale distributed system without utilizing third-party packages motivated me
to research this topic from first principles. I subsequently became a better Software
Engineer in the process. My initial foray into distributed systems began by partici-
pating in a Distributed System meetup in Vancouver, where I eventually became the
organizer. Our study group utilized materials from the Distributed Systems course
at KTH Sweden. Our goal was mainly to gain intuition and theoretical knowledge
on the subject. Fortunately, I took a further step by completing the Parallel, Con-
current, and Distributed Programming in Java Specialization on Coursera, where
OpenMPI was mentioned. I recognized this as the missing link in my exploration of
implementing low-level distributed algorithms.
As I started researching, I noticed a troubling trend in Distributed Systems re-
search: the most significant work in the field tends to be done in top research labs,
advanced technical schools, and by seasoned open-source contributors. In my view,
this status quo is unacceptable. Hence, I am motivated to invest in this research to
distill this knowledge for a diverse audience. Our focus is on delivering scientific
content without diluting its quality, while maintaining world-class rigor. We aim to
create disruptive change by sharing knowledge hidden in plain sight. One axiom from
the Zen of Python posits that “practicality beats purity,” so rather than pontificate
on state-of-the-art distributed algorithms, we have adopted the approach of solidifying
the fundamentals.
Notes:
1. This work is based on non-proprietary content unrelated to my employer. The
blog summarizes the first phase of my two-phased research on low-level Dis-
tributed systems. The second phase is completed in this textbook.
3. Unlike other intellectual works, some of our references will be secondary sources.
This decision is taken to keep our references within a reasonable scope.
Full disclosure: I implemented portions of the KTH Distributed systems course that
ran on edX, taught by Professor Haridi. I am grateful, as it was my first exposure to
Distributed computing. I also include some of his slides (Paxos, Sequence Paxos)
in this book with full attribution.
Terminologies
2. Nodes, replicas, and processes will be used interchangeably and have the same
meaning within the context of this book.
12. Volume: This refers to the available storage space accessible by the operating
system.
1.1 Algorithm Implementations
Distributed algorithms. Hence, all implementations will strictly use OpenMPI in C.
Event-based programming based on message passing serves as a powerful abstraction
for building Distributed systems. Therefore, we are using this approach for all of our
implementations. For the sake of pedagogy, we have omitted tracing and invariant
proofs. While these are important in Distributed systems scholarship, we anticipate
that users will learn by doing and not be hindered by excessive mathematics. However,
for those who wish to focus more on writing proofs, we recommend this resource [33].
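As an illustration of the event-based style, the sketch below dispatches each incoming message to a handler chosen by its tag. It deliberately runs without MPI: the in-memory queue stands in for messages that an OpenMPI program would obtain through receive calls, and the tags (TAG_PING, TAG_ACK), the structs, and the function names are hypothetical.

```c
#include <stddef.h>

/* Hypothetical message tags; in an MPI program these would be MPI tags. */
enum msg_tag { TAG_PING = 0, TAG_ACK = 1, TAG_COUNT = 2 };

typedef struct {
    int source;         /* rank of the sender */
    enum msg_tag tag;   /* selects the event handler */
    int payload;
} message;

typedef struct {
    int pings_seen;
    int acks_seen;
    int payload_sum;
} node_state;

typedef void (*handler_fn)(node_state *st, const message *m);

static void on_ping(node_state *st, const message *m) {
    st->pings_seen++;
    st->payload_sum += m->payload;
}

static void on_ack(node_state *st, const message *m) {
    (void)m;            /* an ACK carries no data we need */
    st->acks_seen++;
}

/* Table mapping each tag to its event handler. */
static const handler_fn handlers[TAG_COUNT] = { on_ping, on_ack };

/* Event loop: deliver each queued message to the handler for its tag.
   Messages with unknown tags are skipped. Returns the number handled. */
size_t run_event_loop(node_state *st, const message *queue, size_t n) {
    size_t handled = 0;
    for (size_t i = 0; i < n; i++) {
        unsigned t = (unsigned)queue[i].tag;
        if (t < (unsigned)TAG_COUNT) {
            handlers[t](st, &queue[i]);
            handled++;
        }
    }
    return handled;
}
```

In an MPI program, the loop body would typically follow a probe or receive, using `status.MPI_TAG` to pick the handler.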
1.2 Overview
systems that do not depend on a single server, as a central server is a single point
of failure. Building peer-to-peer networks can help produce the redundancy
needed to achieve a system where a single server is not a bottleneck.
Chapter 7 describes the formal verification of Distributed systems. In mission-critical
systems, it is imperative that safety and liveness properties are never violated.
Chapter 8 describes topics such as distributed commit, coding philosophy, case
studies, practical considerations for deploying distributed systems in the wild, testing
strategies, evaluation metrics, and exercises.
Chapter 2
“He who knows all the answers has not been asked all the questions.” – Confucius
The socket is the fundamental file descriptor for networking. Recalling the Unix
philosophy that everything is a file, the ‘socket()‘ function returns a descriptor. Send-
ing a message over a network becomes analogous to writing to a file stream, while
receiving a message is comparable to reading a file stream. However, in this context,
the network acts as the channel for this communication. There are two basic socket
types, as outlined by [27]:
Most local networks utilize internal IP addresses, whereas Internet-facing gateways
employ external IP addresses with Network Address Translation (NAT) performing
the conversion from the Internet IP to the internal IP.
A server typically operates in a listening mode. Clients send messages to the
server to trigger actions, resulting in a response to the caller. Socket programming
is a method for machines to connect over a network. This communication occurs via
a protocol using a set of predefined conventions that enable parties to understand
message structures, codebooks, and other auxiliary information necessary for the
synthesis of exchanged data.
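The file-descriptor view of sockets can be seen within a single process. The sketch below, assuming a POSIX system, opens a TCP listening socket on the loopback interface, connects to it, and then uses plain write() and read() on the resulting descriptors. The function name loopback_roundtrip is hypothetical, and error handling is reduced to returning -1.

```c
#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Send msg over a loopback TCP connection and read it back into out.
   Returns the number of bytes read, or -1 on error. */
ssize_t loopback_roundtrip(const char *msg, char *out, size_t outlen) {
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);
    size_t n = strlen(msg), total = 0;
    ssize_t result = -1;
    int server = -1, client = -1, conn = -1;

    if (outlen == 0) return -1;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;                  /* let the kernel choose a free port */

    /* Server side: socket() returns a descriptor, then bind and listen. */
    if ((server = socket(AF_INET, SOCK_STREAM, 0)) < 0) goto done;
    if (bind(server, (struct sockaddr *)&addr, sizeof(addr)) < 0) goto done;
    if (listen(server, 1) < 0) goto done;
    if (getsockname(server, (struct sockaddr *)&addr, &len) < 0) goto done;

    /* Client side: the kernel completes the TCP handshake and queues the
       connection until accept() picks it up. */
    if ((client = socket(AF_INET, SOCK_STREAM, 0)) < 0) goto done;
    if (connect(client, (struct sockaddr *)&addr, sizeof(addr)) < 0) goto done;
    if ((conn = accept(server, NULL, NULL)) < 0) goto done;

    /* Sending is a write() to a descriptor ... */
    if (write(client, msg, n) != (ssize_t)n) goto done;
    /* ... and receiving is a read(); TCP is a byte stream, so loop. */
    while (total < n && total < outlen - 1) {
        ssize_t r = read(conn, out + total, outlen - 1 - total);
        if (r <= 0) break;
        total += (size_t)r;
    }
    out[total] = '\0';
    result = (ssize_t)total;
done:
    if (conn >= 0) close(conn);
    if (client >= 0) close(client);
    if (server >= 0) close(server);
    return result;
}
```

Calling loopback_roundtrip("hello", buf, sizeof buf) should return 5 and leave "hello" in buf. In a real deployment, the server and client would of course run in separate processes or machines.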
Network Protocol
Network protocols support varying degrees of error recovery, fault tolerance, and
overhead rates for message transfer. The design requirements should inform the choice
of network protocol, be it Transmission Control Protocol (TCP)¹, User Datagram
Protocol (UDP)², Quick UDP Internet Connections (QUIC)³, or Licklider Transmission
Protocol (LTP)⁴.
Transmission Control Protocol (TCP) is a connection-oriented protocol operating
at the transport layer. This protocol requires that a dedicated connection be
established between the sender and receiver before data can be exchanged. A key
characteristic of TCP is its provision of congestion control mechanisms, which dy-
namically adjust the transmission rate to avoid network overload. Furthermore, TCP
guarantees reliable, order-preserving delivery, ensuring that all data arrives at the
destination correctly and in the order in which it was sent. However, these features
contribute to TCP being considered a “heavyweight” protocol due to the overhead
associated with connection management and reliability mechanisms. Error recovery is
handled by the protocol itself: lost segments are retransmitted. The TCP protocol is
described in Figure 2.1.
¹ [Link]
² [Link]
³ [Link]
⁴ [Link]
Figure 2.1: Sequence diagram for TCP.
Figure 2.2: Sequence diagram for UDP.
illustration in this blog.
When communicating between nodes in Distributed systems, a channel is nec-
essary to provide a medium for message transfer. The types of channels in use in
Distributed systems [37] include:
• Stubborn link
• Perfect link
Each of these links has specific and unique characteristics. For more information
on this subject, please consult the following resources:
In Unix, ‘fork()‘ creates a new process: the calling (parent) process creates a new
child process. The parent should wait for the child to exit, using ‘wait()‘ or ‘waitpid()‘,
so that the child's exit status is collected and its process-table entry is released. If
the parent never waits, a child that has exited remains defunct (a zombie). When the
parent itself exits, its orphaned children are re-parented to the ‘init‘ process, which
reaps (removes) them. A child process remains a zombie until the parent waits for it,
unless the parent ignores the ‘SIGCHLD‘ signal, in which case terminated children are
reaped automatically [27, 28]. The following program ignores ‘SIGCHLD‘ so that the
parent never has to wait for its child:
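#include <signal.h>
#include <unistd.h>
int main(void)
{
    signal(SIGCHLD, SIG_IGN); // don't wait: the kernel reaps exited children
    if (fork() == 0)
        _exit(0);             // child: exit immediately, leaving no zombie
    return 0;
}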
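Instead of ignoring SIGCHLD, the parent can reap the child explicitly. The sketch below, assuming a POSIX system, forks a child that exits with a given status and has the parent collect that status with waitpid(), so no zombie remains. The function name spawn_and_reap is hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that exits with `code`; the parent waits for it and
   returns the child's exit status, or -1 on failure. */
int spawn_and_reap(int code) {
    pid_t pid = fork();
    if (pid < 0)
        return -1;              /* fork failed */
    if (pid == 0)
        _exit(code);            /* child: terminate immediately */

    int status = 0;
    /* Parent: waitpid() collects the exit status and releases the child's
       process-table entry, so the child does not linger as a zombie. */
    if (waitpid(pid, &status, 0) != pid)
        return -1;
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Note that this only works while SIGCHLD keeps its default disposition; if the parent has set it to SIG_IGN, waitpid() fails because the children have already been reaped.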
Processes and threads are core foundations for building Distributed computing
systems [9, 12]. Processes are units of work distribution. Threads are units of concur-
rency. Parallel programming presents significant challenges, including data sharing,
coordination, deadlock, lock granularity, and others [9, 12]. It is possible to have
multiple threads within a single process. Some benefits of this include [9, 12]:
• Performance (increased throughput); note that if the whole process blocks (for
example, on a blocking system call when threads are implemented purely at user
level), every thread within it blocks as well.
• Scalability.
There are different forms of parallelism (task, functional, loop, data-flow) [9].
Java's popular fork-join framework is based on the divide-and-conquer paradigm,
which is useful when a problem can be decomposed into independent sub-problems [9].
When using threads or processes, it is best to use an appropriate number of them:
too many degrade performance through increased load (each process is isolated and
carries its own overhead) and through the communication cost of sharing information
between processes.
Promises and futures are a popular form of parallelism in which a deferred call is
launched asynchronously, the caller performs another, independent task in parallel,
and the main thread then joins on (waits for) the result.
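C has no built-in futures, but the pattern can be approximated with POSIX threads: pthread_create() launches the deferred computation, the caller does independent work, and pthread_join() plays the role of waiting on the future. The future_long struct and the helper names are hypothetical; compile with -pthread.

```c
#include <pthread.h>

/* A minimal "future": the input, the eventual result, and the worker thread. */
typedef struct {
    pthread_t thread;
    long input;
    long result;
} future_long;

/* Deferred computation: sum of 1..input. */
static void *sum_task(void *arg) {
    future_long *f = arg;
    long s = 0;
    for (long i = 1; i <= f->input; i++)
        s += i;
    f->result = s;
    return NULL;
}

/* Start the computation asynchronously; returns 0 on success. */
int future_start(future_long *f, long input) {
    f->input = input;
    f->result = 0;
    return pthread_create(&f->thread, NULL, sum_task, f);
}

/* Block until the computation finishes, then return its result. */
long future_get(future_long *f) {
    pthread_join(f->thread, NULL);   /* join = wait on the future */
    return f->result;
}
```

A caller would run future_start(&f, 100), perform other work, and then call future_get(&f), which returns 5050.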
Guaranteeing reproducibility can be a desirable goal. Quasi-randomness may be
acceptable in some contexts. A parallel program can exhibit the following character-
istics to address this behavior [9]:
• Functional determinism: the same input to the same function always yields the
same output.
• Structural determinism: any repetition of the parallel program yields the same
computational graph.
Suppose your program processes permutation-invariant data (e.g., unsorted data
whose order does not affect the result); in this case, only functional determinism is
needed, not structural determinism. This property can provide clever shortcuts to
optimize your program.
Without a fork or join operation, the computational graph shows a straight line
from the start to the finish nodes. In our example, S2 and S3 run in parallel, as
shown in Figure 2.3.
Performance metrics:
• Fork edge: connects a fork operation to the first step of a child task.
Ideal parallelism (ip) is given by ip = work/span, where work is the total execution
time of all nodes in the computation graph and span is the length of its longest path.
For a sequential program, work equals span, so the ideal parallelism is 1. Computing
it requires a computation graph [9], which, unfortunately, is not always available. In
such cases, we can use Amdahl's law, which states that the speedup of a parallel
program is limited by the fraction of the computation that must run sequentially [9, 12].
There are also alternatives such as the shared-nothing architecture (using the message-
passing paradigm), e.g., the actor model and distributed actors. Non-blocking approaches
using ‘getAndAdd()‘ and ‘compareAndSet()‘ are also available [9].
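Both quantities are simple to compute. The sketch below assumes the standard statement of Amdahl's law, speedup(P) = 1 / ((1 - f) + f / P), where f is the fraction of the work that can run in parallel and P is the number of processors; the function names are hypothetical.

```c
/* Ideal parallelism of a computation graph: total work divided by the
   span (the length of the longest dependency path). */
double ideal_parallelism(double work, double span) {
    return work / span;
}

/* Amdahl's law: upper bound on the speedup with p processors when a
   fraction f of the computation is parallelizable and (1 - f) is not. */
double amdahl_speedup(double f, int p) {
    return 1.0 / ((1.0 - f) + f / (double)p);
}
```

For example, with f = 0.9 and four processors the bound is 1 / (0.1 + 0.225), roughly 3.08, and no number of processors can push it past 1 / (1 - 0.9) = 10.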
• Liveness: even with traffic, every vehicle is guaranteed to eventually leave the
junction.
problems posed in the literature with wide applicability. These include the producer-
consumer problem (with multiple variants), the reader-writer problem, the starvation
problem (bounded wait on a semaphore), and the dining philosophers problem [9, 23,
28].
A barrier is a construct that blocks multiple threads and releases every blocked
thread upon the arrival of the last one [23]. In Java, this construct is available as
CyclicBarrier and, in a more general form, as Phaser; the same idea is widely employed
in MPI, where MPI_Barrier is itself a collective operation. Barriers can also be used
for pipelining [9, 23, 28]. A good book on interprocess communication is Beej's Guide
to Unix IPC by Brian Hall. There are multiple mechanisms for sharing information
between processes in Unix.
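Returning to the barrier: POSIX threads expose it as pthread_barrier_t (an optional POSIX feature that Linux provides). In the sketch below, each hypothetical worker records one arrival, waits at the barrier, and only then reads the shared count, so every worker is guaranteed to observe all arrivals. The names and the fixed worker count are illustrative; compile with -pthread.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdatomic.h>

#define NUM_WORKERS 4

static pthread_barrier_t barrier;
static atomic_int arrived;            /* phase-1 arrivals */
static int seen[NUM_WORKERS];         /* count observed by each worker in phase 2 */

static void *worker(void *arg) {
    int id = *(int *)arg;
    atomic_fetch_add(&arrived, 1);    /* phase 1 */
    pthread_barrier_wait(&barrier);   /* blocks until the last worker arrives */
    seen[id] = atomic_load(&arrived); /* phase 2: every arrival is visible */
    return NULL;
}

/* Runs NUM_WORKERS threads through one barrier; returns 0 if every worker
   saw all NUM_WORKERS arrivals after the barrier, -1 otherwise. */
int run_barrier_demo(void) {
    pthread_t threads[NUM_WORKERS];
    int ids[NUM_WORKERS];
    atomic_store(&arrived, 0);
    if (pthread_barrier_init(&barrier, NULL, NUM_WORKERS) != 0)
        return -1;
    for (int i = 0; i < NUM_WORKERS; i++) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, worker, &ids[i]);
    }
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(threads[i], NULL);
    pthread_barrier_destroy(&barrier);
    for (int i = 0; i < NUM_WORKERS; i++)
        if (seen[i] != NUM_WORKERS)
            return -1;
    return 0;
}
```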
2.2.2 OpenMPI
OpenMPI [6, 7, 8, 10] is an open-source implementation of the Message Passing
Interface (MPI) standard for exchanging information between processes, widely used in
rendering farms, high-performance computing, and scientific computing. For example,
Curie, a French supercomputer, makes extensive use of OpenMPI for processing work-
flows. One of the best resources for learning MPI on the Internet is the OpenMPI website.
The number of processes (size) is determined when the OpenMPI communicator is
set up, although it can be adjusted through more sophisticated process-group manage-
ment. Every process is assigned a rank, ranging from 0 to size-1.
• MPI_Bcast: the root process sends the same data to every other process in the
communicator; as with every collective, all processes in the communicator must make
the matching call.
Point-to-Point
Collective Operation
• reduction: one (root) process receives data from every process and combines it
with an operation such as sum, minimum, or maximum (e.g., MPI_Reduce).
• scatter: a single process partitions the data and sends one chunk to each process
(e.g., MPI_Scatter).
• gather: a single process assembles data from all processes into one buffer (e.g.,
MPI_Gather). MPI_Allgather gathers the data from every process and delivers the
combined result to every process. There are also synchronization primitives such as
locks and barriers.
One-Sided Communication
MPI allows for remote memory access (RMA). Its operations include:
• MPI_Get()
• MPI_Put()
• MPI_Accumulate()
Unlike the two-sided communication model, which requires both a sender and a
receiver for data transfer, the one-sided communication model allows a process to
directly access another process's memory space. Consequently, only one process (the
origin) initiates and drives each transfer, which reduces synchronization and CPU
involvement at the target. However, there is a caveat: the ordering of RMA operations
is not guaranteed in general [6, 10]. In our implementation of Distributed shared
primitives, we made extensive use of one-sided communication. While RMA is inher-
ently non-blocking, we currently employ it in a blocking mode using locks. Implement-
ing an epoch with MPI_Win_fence might be a more suitable approach in our logic to
fully leverage the non-blocking features. MPI_Compare_and_swap could also present
a better alternative [6, 10]. If one-sided communication is used in place of the default
two-sided communication, the concept of explicit acknowledgments (ACKs or receipt
confirmations) as in two-sided communication does not directly apply. The commu-
nication is more akin to direct memory manipulation. Here is an example of an epoch:
Is using a fence better than using locks in terms of concurrency granularity?
Hint: MPI_Win_fence is a collective call built on the idea of a barrier.
Best Practices
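• Avoid mixing MPI_Get, MPI_Put, and MPI_Accumulate on the same memory location
within a single epoch: such conflicting accesses are erroneous, except for concurrent
accumulate operations.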
Benefits
Chapter 3
Distributed Systems [9, 12] are sets of nodes (devices) connected by communica-
tion links that operate as a single, cohesive system. Although each device functions
locally and independently, the overall system appears as a unified global entity,
providing the “single view illusion.” Examples include the Internet, edge computing
environments, mobile networks, and sensor networks, as illustrated in Figure 3.1.
Figure 3.1: Network of nodes in a cluster.
• Scalability
• Load distribution
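latency. The CAP theorem, named for Consistency, Availability, and Partition tolerance,
states that a distributed system cannot simultaneously guarantee all three properties
(in practice, when a network partition occurs, the system must trade consistency
against availability). It is used for reasoning about the fundamental trade-offs in the
design of Distributed systems.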
• The network is reliable (networks experience failures; always plan for retries
and acknowledgments).
• There is zero latency (account for the non-negligible transmission time for pack-
ets across the network).
• Transport cost is zero (moving data across the network has real costs, such as
serialization, bandwidth, and infrastructure).
• The network is homogeneous (expect heterogeneous hardware, operating systems,
and protocols across nodes).
In our work, we have adopted the crash-stop model due to its relative simplicity.
To achieve fault tolerance, Distributed systems must be designed to cope with failures.
This is typically accomplished through a combination of techniques, including retrying
lost packets, replicating data across nodes for enhanced availability, and employing
mechanisms for replacing failed components (e.g., electing a new leader if the current
one fails). Fault tolerance is the system’s ability to maintain operation in the presence
of faults.
One common use case for Distributed systems is Sharding, as depicted in Fig-
ure 3.2. In the context of databases, sharding is the process of horizontally parti-
tioning a database table into multiple independent subsets. Sharding (also known
as partitioning) is often combined with replication (duplicating data across multiple
nodes) to enhance data availability. These techniques involve organizing data into
chunks that may or may not overlap.
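A common way to assign records to shards is to hash the partitioning key and take the result modulo the number of shards. The sketch below uses the 32-bit FNV-1a hash purely as an example; real systems often prefer consistent hashing or range partitioning so that adding a shard moves fewer keys. The function names are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>

/* 32-bit FNV-1a hash of a NUL-terminated key. */
uint32_t fnv1a32(const char *key) {
    uint32_t h = 2166136261u;                 /* FNV offset basis */
    for (const unsigned char *p = (const unsigned char *)key; *p; p++) {
        h ^= *p;
        h *= 16777619u;                       /* FNV prime */
    }
    return h;
}

/* Map a key to one of num_shards horizontal partitions. */
size_t shard_for_key(const char *key, size_t num_shards) {
    return (size_t)(fnv1a32(key) % num_shards);
}
```

Because the mapping depends only on the key, every node computes the same shard for "user:42" without coordination; replication would then copy each shard to several nodes.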
• The system starts in the same state across all replicas, and if all operations are
deterministic, it ends in the same final state across all correct replicas.
• Coordination: Managing other nodes, such as the leader in the Raft consensus
algorithm.
3.1.1 FLP Impossibility of Consensus
The Fischer-Lynch-Paterson (FLP) impossibility result describes the inherent lim-
itations of achieving consensus in asynchronous distributed systems. It states that in
an asynchronous system subject to even a single crash-stop failure, it is impossible to
guarantee that a consensus algorithm will always satisfy the following three crucial
properties [4, 12, 37]:
• Agreement: No two correct nodes decide on different values.
• Validity: If all initially proposed values are the same, then any decided value
must be that value. (A weaker form states that the decided value must be one
of the initially proposed values.)
• Termination: Every correct node eventually decides on some value.
The FLP result highlights the fundamental challenges of achieving reliable consen-
sus in systems with unpredictable timing. It is therefore useful to distinguish between
asynchronous, synchronous, and partially synchronous systems [4, 12, 37]:
• Two nodes need to agree on a specific value before a defined time limit.
clock [4]. Two primary types of logical clocks are commonly used:
• Lamport clock
• Vector clock
To show the differences between a Lamport clock and a vector clock, we describe
a distributed system comprising three processes: P1, P2, and P3. A Vector Clock is
a logical time mechanism where each process maintains a vector of timestamps. The
i-th element of the vector in process Pj represents Pj's knowledge of the number of
events that have occurred in process Pi. This allows for the determination of causal
relationships between events.
Figure 3.3: Lamport Clock Description.
Figure 3.3 shows an example of a Lamport clock in use.
Process P1
Process P2
Process P2 starts, receives a message, executes internal events, and sends a mes-
sage:
• Send(n) (LP=5): Before sending message n to Process P3, the Lamport clock
is incremented to 5. The timestamp of the send event is 5.
Process P3
Communication
The interaction between processes through message passing is crucial for the Lam-
port clock’s mechanism:
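In summary, the Lamport clock in this code snippet assigns logical timestamps
to events in each process based on local increments and the timestamps of received
messages, ensuring an ordering of events consistent with the causal “happens-before”
relationship: if event a happens before event b, then the timestamp of a is smaller
than the timestamp of b. The converse does not hold, so Lamport timestamps alone
cannot reveal whether two events are concurrent.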
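The Lamport clock rules can be condensed into three functions: increment on a local or send event, and on receipt take the maximum of the local and received timestamps before incrementing. This is a single-process sketch with a hypothetical lamport_clock type, not the book's MPI implementation.

```c
/* Lamport logical clock for one process. */
typedef struct {
    int time;
} lamport_clock;

/* Local (internal) event: advance the clock. */
int lamport_local(lamport_clock *c) {
    return ++c->time;
}

/* Send event: advance the clock and return the timestamp to attach. */
int lamport_send(lamport_clock *c) {
    return ++c->time;
}

/* Receive event: jump to the sender's timestamp if it is ahead, then advance. */
int lamport_receive(lamport_clock *c, int msg_time) {
    if (msg_time > c->time)
        c->time = msg_time;
    return ++c->time;
}
```

For instance, if P1 performs two local events and then sends (timestamps 1, 2, 3), a fresh P2 that receives the message moves to 4 and its next send carries 5.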
3.2.2 Vector clock
The Vector clock, in contrast, is based on partial order and effectively captures
both causality and non-causality [13]. The ability to detect non-causality is crucial
for identifying events that occur concurrently.
Figure 3.4: Vector Clock Description.
Figure 3.4 shows an example of a Vector clock in use.
Process P1
• Event A (VC=[1, 0, 0]): Upon the occurrence of the first event in P1, the
first element of its vector clock (representing its event count) is incremented to
1.
• Event B (VC=[2, 0, 0]): The second event in P1 increments its local event
count in the vector clock to 2.
• Send(m) (VC=[3, 0, 0]): Before sending message m to Process P2, P1’s local
event count is incremented to 3. The message m is sent carrying this vector
timestamp.
Process P2
The Vector Clock in Process P2 evolves as it receives a message and executes local
events:
• Start (VC=[0, 0, 0]): Process P2 initializes its vector clock with all compo-
nents set to 0.
• Send(n) (VC=[4, 2, 0]): Before sending message n to Process P3, P2’s
local event count is incremented to 2. The message n is sent with this vector
timestamp.
Process P3
The Vector Clock in Process P3 evolves upon receiving a message and executing
local events:
• Start (VC=[0, 0, 0]): Process P3 initializes its vector clock with all compo-
nents set to 0.
Communication
• Message m is sent from P1 with the vector timestamp [3, 0, 0] to P2, which
influences P2’s vector clock upon reception.
• Message n is sent from P2 with the vector timestamp [4, 2, 0] to P3, which
influences P3’s vector clock upon reception.
In summary, the Vector Clock mechanism illustrated in the code assigns a vector
of timestamps to each event, allowing for the determination of causal relationships
between events across the distributed system. The vector timestamps are updated
based on local event occurrences and the vector timestamps received with messages,
ensuring that each process maintains a view of the progress of all other processes that
is consistent with causality.
A Vector clock can accurately determine whether two operations are concurrent
or causally dependent on each other, a distinction that the Lamport clock cannot
reliably make.
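The vector clock update and comparison rules can be sketched for a fixed three-process system like the one above. The names vc_*, the constant N_PROCS, and the vc_order enum are hypothetical. vc_compare() reports whether one timestamp happened before the other or whether the two are concurrent, which is exactly the distinction a Lamport clock cannot make.

```c
#define N_PROCS 3

typedef struct {
    int v[N_PROCS];
} vclock;

typedef enum { VC_EQUAL, VC_BEFORE, VC_AFTER, VC_CONCURRENT } vc_order;

/* Local or send event at process `self`: increment its own entry. */
void vc_tick(vclock *c, int self) {
    c->v[self]++;
}

/* Receive at process `self`: element-wise maximum, then increment own entry. */
void vc_receive(vclock *c, const vclock *msg, int self) {
    for (int i = 0; i < N_PROCS; i++)
        if (msg->v[i] > c->v[i])
            c->v[i] = msg->v[i];
    c->v[self]++;
}

/* Compare two vector timestamps under the happens-before partial order. */
vc_order vc_compare(const vclock *a, const vclock *b) {
    int a_le_b = 1, b_le_a = 1;
    for (int i = 0; i < N_PROCS; i++) {
        if (a->v[i] > b->v[i]) a_le_b = 0;
        if (b->v[i] > a->v[i]) b_le_a = 0;
    }
    if (a_le_b && b_le_a) return VC_EQUAL;
    if (a_le_b) return VC_BEFORE;      /* a happened before b */
    if (b_le_a) return VC_AFTER;       /* b happened before a */
    return VC_CONCURRENT;              /* neither dominates the other */
}
```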
• Nodes periodically exchange heartbeat messages with all other processes they
believe to be alive.
• If a node does not receive a heartbeat response from another process within a
predefined timeout period, it begins to suspect that process.
• If a message is subsequently received from a suspected node, the suspicion is
typically revised (the node is no longer suspected), and the timeout period for
that node might be increased.
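The suspect-and-revise behaviour above can be sketched as a per-node record of the last heartbeat time and a timeout that grows after each false suspicion. Times are plain doubles here (in an MPI program they could come from MPI_Wtime()); the struct, the function names, and the policy of adding a fixed increment are assumptions for illustration.

```c
#include <stdbool.h>

#define MAX_NODES 8

typedef struct {
    double last_heard[MAX_NODES];   /* time of the last heartbeat from each node */
    double timeout[MAX_NODES];      /* current per-node timeout */
    bool suspected[MAX_NODES];
    double increment;               /* added to a timeout after a false suspicion */
} failure_detector;

void fd_init(failure_detector *fd, int n, double now, double timeout, double inc) {
    fd->increment = inc;
    for (int i = 0; i < n && i < MAX_NODES; i++) {
        fd->last_heard[i] = now;
        fd->timeout[i] = timeout;
        fd->suspected[i] = false;
    }
}

/* A heartbeat (or any message) arrived from `node` at time `now`. */
void fd_heartbeat(failure_detector *fd, int node, double now) {
    fd->last_heard[node] = now;
    if (fd->suspected[node]) {
        /* The suspicion was wrong: revise it and be more patient next time. */
        fd->suspected[node] = false;
        fd->timeout[node] += fd->increment;
    }
}

/* Periodic check: suspect every node that has been silent past its timeout. */
void fd_check(failure_detector *fd, int n, double now) {
    for (int i = 0; i < n && i < MAX_NODES; i++)
        if (!fd->suspected[i] && now - fd->last_heard[i] > fd->timeout[i])
            fd->suspected[i] = true;
}
```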
a process crash. By maintaining per-node timeout values and dynamically adjust-
ing them based on communication history, the failure detector can learn the typical
behavior of the network and become more resilient to transient network faults. Fail-
ure detectors can be further enhanced by expanding the types of failures they can
identify. Our implementation targets a minimal subset of potential failures. Ideally,
the design of a failure detector should leverage domain-specific knowledge about the
types of failures that are most likely to occur in a given system, allowing for tailored
detection mechanisms.
• A circuit breaker makes use of a failure detector that scans for breakdowns
in your cluster. A failure is determined based on a set threshold (consider
using the accuracy and completeness properties). Once a failure-rate threshold
is exceeded, the breaker opens: the resource is treated as unavailable and cannot
accept new requests until a set time has elapsed. After that, a limited number of
trial requests are let through to check whether the resource has recovered; if they
succeed, it begins to process requests normally again. The recovery process is then
re-triggered by a secondary resource, e.g., placing the circuit breaker behind a load
balancer. When you visit a website and see a banner indicating that the site is under
maintenance, you may have been redirected by a circuit breaker in action.
• The retry pattern would suffice for transient failures, which are short-lived er-
rors. However, for prolonged failures, the circuit breaker is ideal. Care must
be taken when using the circuit breaker to handle cascading failures, which
occur when downstream systems become non-functional due to dependency on
an unavailable parent system. Correlation vectors can help with distributed
debugging of such failures.
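A circuit breaker is essentially a three-state machine. The sketch below follows the common closed / open / half-open formulation: consecutive failures beyond a threshold open the breaker, requests are rejected until a cool-down elapses, and requests after that are treated as trials until one outcome decides whether to close the breaker again. The names, the consecutive-failure counter (rather than a failure rate), and the time values are simplifying assumptions.

```c
#include <stdbool.h>

typedef enum { CB_CLOSED, CB_OPEN, CB_HALF_OPEN } cb_state;

typedef struct {
    cb_state state;
    int failures;          /* consecutive failures while closed */
    int threshold;         /* failures that trip the breaker */
    double opened_at;      /* time the breaker last opened */
    double cooldown;       /* how long to reject requests once open */
} circuit_breaker;

void cb_init(circuit_breaker *cb, int threshold, double cooldown) {
    cb->state = CB_CLOSED;
    cb->failures = 0;
    cb->threshold = threshold;
    cb->opened_at = 0.0;
    cb->cooldown = cooldown;
}

/* Should a request be sent at time `now`? */
bool cb_allow(circuit_breaker *cb, double now) {
    if (cb->state == CB_OPEN && now - cb->opened_at >= cb->cooldown)
        cb->state = CB_HALF_OPEN;          /* let trial requests through */
    return cb->state != CB_OPEN;
}

/* Report the outcome of a request that cb_allow() permitted. */
void cb_record(circuit_breaker *cb, bool success, double now) {
    if (success) {
        cb->state = CB_CLOSED;             /* a success closes the breaker */
        cb->failures = 0;
        return;
    }
    if (cb->state == CB_HALF_OPEN || ++cb->failures >= cb->threshold) {
        cb->state = CB_OPEN;               /* trip (or re-trip) the breaker */
        cb->opened_at = now;
        cb->failures = 0;
    }
}
```

A failed trial in the half-open state reopens the breaker immediately, which keeps a still-broken dependency from being flooded with requests.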
Chapter 4
• Each node in the system is assumed to have a network channel enabling com-
munication with every other node.
• Upon reaching consensus, every node in the system will have decided on the
same value.
The following concurrency-related properties [4, 12, 37] are generally applicable
in parallel and concurrent programming paradigms:
• Safety: Only values proposed by proposers can be chosen as the consensus value.
The main characteristics of the Paxos algorithm are [4, 37, 12]:
The Paxos algorithm defines several roles that nodes in the system can assume:
• Client: The entity that initiates the consensus process by proposing a value.
• Proposer: A node that receives a request from a client and attempts to get its
value accepted by the acceptors.
• Acceptor: A node that receives proposals from proposers and votes on whether to
accept a proposed value.
• Learner: A node that learns the decided value once a quorum of acceptors has
accepted it.
We will now discuss Single Value Paxos and Sequence Paxos in the subsequent
subsections.
Single Value Paxos
• Proposers send accept requests (containing the proposal number and a pro-
posed value) to the same quorum of acceptors. The value must be the value of the
highest-numbered proposal already accepted by any of the acceptors, as reported in
their responses to the prepare request, or the proposer's own value if none of them
has accepted a value.
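The acceptor's rules for the two phases, together with the proposer's value-selection rule, can be sketched as follows. This is a minimal single-value illustration, not the book's implementation: the names are hypothetical, ballots and values are plain integers, and networking, persistence, quorum counting, and learners are omitted.

```c
#include <stdbool.h>

typedef struct {
    int promised;        /* highest ballot promised so far (-1 = none) */
    int accepted_ballot; /* ballot of the accepted value (-1 = none) */
    int accepted_value;
} acceptor;

void acceptor_init(acceptor *a) {
    a->promised = -1;
    a->accepted_ballot = -1;
    a->accepted_value = 0;
}

/* Phase 1 (prepare): promise to ignore lower ballots and report any
   previously accepted ballot/value so the proposer can adopt it. */
bool on_prepare(acceptor *a, int ballot, int *prev_ballot, int *prev_value) {
    if (ballot <= a->promised)
        return false;                  /* already promised an equal or higher ballot */
    a->promised = ballot;
    *prev_ballot = a->accepted_ballot;
    *prev_value = a->accepted_value;
    return true;
}

/* Phase 2 (accept): accept unless a higher ballot has been promised. */
bool on_accept(acceptor *a, int ballot, int value) {
    if (ballot < a->promised)
        return false;
    a->promised = ballot;
    a->accepted_ballot = ballot;
    a->accepted_value = value;
    return true;
}

/* Proposer: pick the value of the highest-numbered accepted proposal among
   n promises, or its own value if no acceptor has accepted anything. */
int choose_value(const int *prev_ballots, const int *prev_values, int n, int own) {
    int best = -1, value = own;
    for (int i = 0; i < n; i++)
        if (prev_ballots[i] > best) {
            best = prev_ballots[i];
            value = prev_values[i];
        }
    return value;
}
```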
The implementation of the Single Value Paxos algorithm discussed in this book
is visually represented in Figure 4.1 and Figure 4.2. Figure 4.1 illustrates the initial
states of each node at the internal start of the Single Value Paxos process.
Figure 4.1: Initialization of states at the internal start of Paxos Algorithm [4].
Figure 4.2 presents a sequence (interaction) diagram depicting the messages ex-
changed between the different roles (Client, Proposer, Acceptor, Learner) in the net-
work during the Single Value Paxos algorithm.
Figure 4.2: Sequence of message flow in a Paxos Algorithm [4].
It’s worth noting that there are several variations of Single Value Paxos, includ-
ing Fast Paxos, Egalitarian Paxos, and Flexible Paxos [37], each offering different
performance characteristics or fault-tolerance trade-offs.
Sequence Paxos
The Sequence Paxos algorithm extends the Single Value Paxos to achieve agree-
ment on a sequence of values (often representing a log of events) in a consistent order
across all nodes [4, 37, 12]. The general approach involves:
• Repetitively performing the Single Value Paxos algorithm for each position
(slot) in the sequence, ensuring that the decisions for each slot are made in
order.
• Maintaining a strict prefix invariant: If all nodes have decided on the values for
the first i slots, then these decided prefixes must be identical across all nodes.
In more detail, the fundamental principles of the Sequence Paxos algorithm in-
clude:
Figure 4.4 presents a sequence (interaction) diagram illustrating the message flow
between the different roles in the network during the Sequence Paxos algorithm.
Figure 4.4: Message flow in a Sequence Paxos Algorithm [4].
Figure 4.5: Maintaining prefix of log invariants in a Sequence Paxos Algorithm [4].
• A server becomes a candidate if it waits for a set time without receiving a ping
from the others.
Note: the timeout must be longer than the duration of the leader election.
The choice of delay has a significant impact on leader election.
Rules of thumb for deciding the duration:
– If the value is set too low, a second candidate begins an election before the
election triggered by the first candidate has finished.
– If it is set too high, it takes too long for a new candidate to start an election
after the old leader has died.
A variation that uses ballots to elect a leader from a set of candidates is shown
below. The following code snippet presents a ballot-based approach for electing a
leader from a set of candidate servers within an MPI (Message Passing Interface)
environment, as seen in leader-election3.c:
// Function responsible for initiating and managing the leader election process.
void leaderElection(int my_rank, int num_procs, int *leader,
MPI_Datatype mpi_data_type) {
process_cnts = create_shared_var(MPI_COMM_WORLD, 0);
int cnt = 0; // Local counter for the current election attempt
int run_loop = 1;
// Example timeout duration for detecting leader failure
double duration = 2.0;
double beg = MPI_Wtime();
double end = beg;
int msg_flag;
MPI_Status status;
int is_leader_dead = (*leader == MPI_PROC_NULL);
// Initially true if no leader is known
MPI_Request requests[num_procs];
while (run_loop) {
is_leader_dead = isLeaderFailed((*leader));
if (is_leader_dead) {
beg = MPI_Wtime();
while ((end - beg) <= duration) {
end = MPI_Wtime();
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG,
MPI_COMM_WORLD, &msg_flag, &status);
// Break the timeout loop if any message is received
if (msg_flag) break;
}
// Check again if another process has already started an election
// and potentially updated the leader.
is_leader_dead = isLeaderFailed((*leader));
if (is_leader_dead) {
printf("Process %d: A leader process (pid=%d) "
"has failed or no initial leader\n", my_rank, *leader);
data package;
memset(&package, 0, sizeof(package));
// Generate a unique ballot based on time
[Link] = (int)time(NULL);
[Link] = my_rank;
// ... (excerpt continues inside checkLeader(); intervening code omitted)
// Static local counter for the number of promises received
static int cnt = 0;
enum msgTag ctag;
data accept_value;
memset(&accept_value, 0, sizeof(accept_value));
// If the received ballot is higher than the current maximum, update it.
if ([Link] > max_ballot) {
max_ballot = [Link];
// Store the data associated with the highest ballot.
accept_value = recv;
}
Note
Remember to reset both counters for both functions at the end of the leader
election.
It is crucial to reset the shared counters (`process_cnts`, `proc_cnts`, and `process_max_ballot`) and to release their associated MPI window objects using `MPI_Win_free()` at the end of the leader election process. Failure to do so can interfere with subsequent MPI operations. The provided `resetElectionCounters()` function attempts to perform this cleanup. Additionally, you will need to free the MPI datatype `mpi_data_type` using `MPI_Type_free()`.
The `MPI_Barrier(MPI_COMM_WORLD)` calls are essential for ensuring that all processes synchronize at specific points within the algorithm. In the `leaderElection()` function, the barrier ensures that all processes have completed their initial leader failure detection and potential election triggering before proceeding to the next iteration of the loop. In the `main()` function, barriers are used to synchronize the printing of leader information and to ensure that all processes participate in the `checkLeader()` phase after receiving potential leader announcements.
The provided code snippet includes placeholders and assumes the existence of atomic primitives (`create_shared_var`, `increment_counter`, `reset_var`) that would typically be implemented using MPI windows for shared memory access and synchronization, as shown in my Paxos implementations. You will need to ensure that these primitives are correctly implemented and accessible within your election algorithm code. The `isLeaderFailed()` function is a simplified placeholder for failure detection and should be replaced with a more robust mechanism (e.g., monitoring heartbeats) in a production-ready system. The `checkLeader()` function uses static local variables (`max_ballot` and `cnt`) to maintain state across multiple invocations within a single election cycle. The `initializeElectionCounters()` function initializes the shared counters and variables before the election begins. The `SAFE_MPI_CALL` macro is included as a good practice for handling potential errors in MPI function calls. The `main()` function provides a basic example of how the `leaderElection` and `checkLeader` functions might be used within an MPI program.
Add the required barriers to make the example work as expected. See my implementations of Paxos for how to create proper atomic primitives.
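Independently of the MPI plumbing, the decision made in `checkLeader()` reduces to comparing ballots and waiting for a majority of replies. The sequential sketch below is an illustration under stated assumptions, not the code in leader-election3.c: the `Ballot` struct, the rank tie-break, and the helper names are hypothetical; only the "highest ballot wins once a majority has responded" rule comes from the description above.

```c
#include <assert.h>

/* Hypothetical ballot: a time-based number plus the candidate's rank. */
typedef struct {
    int number; /* e.g. (int)time(NULL) when the candidacy started */
    int rank;   /* MPI rank of the candidate, used as a tie-breaker */
} Ballot;

/* 1 if a is strictly higher than b; equal numbers are ordered by rank. */
int ballot_greater(Ballot a, Ballot b)
{
    if (a.number != b.number)
        return a.number > b.number;
    return a.rank > b.rank;
}

/* Given the candidate ballots reported by the processes that replied,
 * return the winning rank, or -1 if fewer than a majority of the
 * num_procs processes have replied. */
int elect_leader(const Ballot *replies, int num_replies, int num_procs)
{
    assert(num_procs > 0 && num_replies >= 0);
    if (num_replies <= num_procs / 2)
        return -1; /* no majority yet: the election cannot conclude */
    Ballot best = replies[0];
    for (int i = 1; i < num_replies; i++)
        if (ballot_greater(replies[i], best))
            best = replies[i];
    return best.rank;
}
```

The rank tie-break matters because two candidates calling `time(NULL)` in the same second would otherwise produce equal ballots.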
• Candidate: A node that initiates an election with the goal of becoming the
leader.
• Leader: The candidate that successfully wins the election and assumes the
responsibility of coordinating the consensus process.
Although Raft is designed for simplicity, it can still encounter the following issues:
• Missing log entries: Unexpected errors, such as node crashes or network disrup-
tions, could potentially cause some decided log entries to be lost or not fully
replicated across the cluster.
• Divergent logs: If the cluster experiences issues like multiple leaders (even transiently) or failures during log replication, different nodes might end up with logs containing conflicting entries at the same index. If such divergence affected decided (committed) entries, it would violate the fundamental consistency property that Raft aims to maintain, which is why a Raft leader forces followers to overwrite conflicting uncommitted entries with its own.
• Can an election choose multiple leaders [12]? Not within a single term: each server votes at most once per term and a leader needs a majority of votes, so at most one leader can be elected per term. However, a leader from an older term that has been cut off by a network partition may continue to believe it is the leader until it learns of the newer term, so multiple nodes can briefly believe they are leaders.
• Can an election fail to choose a leader [12]? Yes, scenarios such as all candidates
failing or a persistent split vote across the cluster could result in no single
candidate obtaining a majority, thus leading to a failed election.
if (rank == leader)
{
// This node acts as the proposer,
// initiating and coordinating log replication.
}
else
{
// Example: Assign nodes with even rank as acceptors.
int isAcceptor = (rank % 2 == 0);
if (isAcceptor)
{
// This node acts as an acceptor, voting on proposed log entries.
}
else
{
// This node acts as a learner, receiving
// and storing committed log entries.
}
}
if (rank == leader)
{
// This node acts as the proposer.
}
else
{
// Example: The node with rank immediately
// following the leader is the learner.
int isLearner = (rank == (leader + 1) % n);
if (isLearner)
{
// This node acts as the learner.
}
else
{
// This node acts as an acceptor. In this example, every remaining
// node is an acceptor (exactly one when n = 3).
}
}
Snapshotting involves saving the configuration states of a distributed computation for later retrieval [43, 45]. When an error is detected, the system can revert to a previously known good configuration saved in the snapshot (a change point). Snapshotting is desirable when manual intervention to resolve errors is costly. Retrieving the saved change point when an illegal state is reached enables the distributed computing system to heal itself. For this recovery to be reliable, the saved change point must not have been compromised by a malicious node; otherwise, reverting to it would not restore correct behavior.
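The revert-to-change-point idea can be shown without MPI. In the hypothetical sketch below, the `State` fields, the legality check, and the function names are illustrative assumptions: a node saves a snapshot only while its state is legal, and when it detects an illegal state it restores the last good snapshot.

```c
#include <assert.h>

/* Hypothetical node state: the current round and the value decided so far. */
typedef struct {
    int round;
    int value;
} State;

typedef struct {
    State saved;      /* last known-good configuration (the change point) */
    int has_snapshot;
} Snapshotter;

/* Illustrative legality check: a round number must be non-negative. */
static int is_legal(const State *s) { return s->round >= 0; }

/* Save a change point, but never checkpoint an illegal state. */
void take_snapshot(Snapshotter *snap, const State *s)
{
    if (is_legal(s)) {
        snap->saved = *s;
        snap->has_snapshot = 1;
    }
}

/* If the state is illegal, revert to the change point.
 * Returns 1 if a rollback happened. */
int self_heal(Snapshotter *snap, State *s)
{
    assert(snap != 0 && s != 0);
    if (is_legal(s) || !snap->has_snapshot)
        return 0;
    *s = snap->saved;
    return 1;
}
```

Refusing to snapshot an illegal state is what keeps the change point trustworthy; in a Byzantine setting, the snapshot would additionally need protection (for example, signatures) against tampering by a malicious node.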
We have implemented a mechanism for saving the decided value in a single-valued
Paxos algorithm. The following outline is a snippet from the file named single-paxos3-
snapshot.c. This implementation includes some custom details that are not part of
the general Paxos specification. Specifically, I created a new role, ‘telemetry,’ in
addition to the standard proposer, acceptor, decider, and learner roles. I also created
a dedicated communicator for the telemetry role to simulate logging the distributed
system’s state as quickly as possible. This is a simplification, as we have omitted the
data storage logic.
int cnt =0;
int ret;
data recv;
memset(&recv, 0, sizeof(recv));
while (1)
{
/* Receive message from any process */
if(flag != 0)
{
ret = MPI_Irecv(&recv, 1, mpi_data_type,
MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &requests[my_rank]);
flag = 0;
}
MPI_Test(&requests[my_rank], &flag, &status[my_rank]);
if (flag != 0)
{
if (ret == MPI_SUCCESS )
{
enum manageTag tag = status[my_rank].MPI_TAG;
int source = status[my_rank].MPI_SOURCE;
if (tag == mSNAPSHOT)
{
printf("################ SAVING SNAPSHOT ###############\n");
printf("recv.custom_round_number: %d, "
"recv.round_number: %d, [Link]: %d\n", recv.custom_round_number,
recv.round_number, [Link]);
printf("################# END SNAPSHOT ################\n");
}
else if (tag == mREVERT)
{
printf("################ DELETE SNAPSHOT ###############\n");
printf("PERFORM CUSTOM LOGIC FOR MANAGING SNAPSHOT\n");
printf("################# END SNAPSHOT ################\n");
}
cnt++; // count the handled message so that the loop can exit
if (cnt>0)
{
if (!flag)
MPI_Cancel( &requests[my_rank] );
break;
}
}
}
} // end while (1)
// retrieve snapshot
handle_snapshot_messages(row_comm[TELEMETRY], my_rank,
requests, status, mpi_data_type, telemetry_msg_cnts);
4.5 Byzantine Protocol (BFT)
The resiliency discussed in SubSections 4.1 and 4.3 relies on a strict majority (more than $\frac{1}{2}$) of the processes being live; with fewer live processes, those protocols can no longer make progress. It also assumes that faulty processes simply stop. More robust algorithms can handle failure modes beyond those tolerated by traditional Paxos and Raft, in which a faulty process takes arbitrary actions, such as sending bad messages, triggering unnecessary state transitions, or mounting a denial-of-service attack against the distributed system. These algorithms do not prevent every interruption, but they can mitigate such attacks and ensure that agreement can still be reached.
Consensus algorithms with this kind of robust error handling built in are known as Byzantine protocols. They allow distributed systems to function over chaotic channels with significant disruption, because a quorum can still be formed as long as fewer than $\frac{1}{3}$ of the processes are faulty (that is, $n \ge 3f + 1$ processes are needed to tolerate $f$ Byzantine faults). We consider scenarios with parallel leaders and also scenarios with a single leader serving as the proposer. Several papers describe BFT protocols [18, 41] with varying tolerance guarantees.
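The two fault models lead to different quorum arithmetic, which can be computed directly. Crash-tolerant protocols such as Paxos and Raft tolerate $f$ crashed processes out of $n \ge 2f + 1$ using majority quorums. Byzantine protocols such as PBFT need $n \ge 3f + 1$, and a quorum must be large enough that any two quorums share at least $f + 1$ processes, i.e. $q = \lceil (n + f + 1)/2 \rceil$, which equals $2f + 1$ when $n = 3f + 1$. The helper names below are hypothetical.

```c
#include <assert.h>

/* Crash-fault tolerance: largest f with n >= 2f + 1, and a majority quorum. */
int crash_max_faults(int n) { assert(n > 0); return (n - 1) / 2; }
int crash_quorum(int n)     { assert(n > 0); return n / 2 + 1; }

/* Byzantine fault tolerance: largest f with n >= 3f + 1. */
int bft_max_faults(int n)   { assert(n > 0); return (n - 1) / 3; }

/* Smallest q such that two quorums overlap in at least f + 1 processes
 * (so at least one correct process): q = ceil((n + f + 1) / 2). */
int bft_quorum(int n)
{
    int f = bft_max_faults(n);
    return (n + f + 2) / 2;
}
```

For example, with n = 4 one Byzantine process is tolerated, and any two quorums of 3 share at least 2 processes, at least one of which is correct.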
Commit is the immutable operation that finalizes a transaction, making its mod-
ifications permanent and visible throughout the system. In contrast, a transaction
represents a logical and atomic unit of work, potentially comprising multiple indi-
vidual operations (reads, writes, updates, deletions). A transaction is the combined
procedure for performing a task, consisting of a sequence of steps. Commit serves as
the concluding step of a transaction, confirming its successful execution and guaran-
teeing the durability of all associated changes. Transactions prevent race conditions,
inconsistent updates, and conflicting operations, and can provide strict guarantees on
the execution plan.
Atomic commit requires that a collection of processes jointly decide whether a
transaction is committed or aborted. The properties of a transaction include:
• Durability: committed transactions are saved for later access.
• Nested transaction: a transaction with a hierarchical structure, in which the transaction can trigger a set of sub-transactions.
Let’s set up a simple, single-system banking scenario with two accounts (A and
B) to prepare for our discussion of distributed protocols in SubSection 4.6.1. We will
illustrate a basic transfer using a flat transaction (Figure 4.7) and a more complex
breakdown using a nested transaction (Figure 4.8).
Figure 4.7 depicts a basic banking transaction, such as a fund transfer. It outlines the sequential operations involved and the two potential outcomes: successful completion (Commit) or failure necessitating a rollback (Abort) to maintain account integrity. For instance, insufficient funds in Account A during the debit would likely lead to an Abort, a scenario covered by the “Failure” path originating from Credit Account B.
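A single-system version of the flat transaction in Figure 4.7 can be sketched as follows. This is an illustrative assumption, not code from the accompanying repository: balances are integers in cents, the `Account` type and `transfer` function are hypothetical, and the undo log is simply a copy of both balances taken before the first write.

```c
#include <assert.h>
#include <limits.h>

typedef struct { long balance; } Account;   /* balance in cents (assumed) */

/* Flat transaction: debit A, then credit B; either both happen or neither.
 * Returns 1 on Commit, 0 on Abort. */
int transfer(Account *a, Account *b, long amount)
{
    assert(a != 0 && b != 0);
    Account undo_a = *a, undo_b = *b;  /* state before the transaction */

    /* Debit Account A */
    if (amount <= 0 || a->balance < amount)
        goto rollback;                 /* insufficient funds or bad amount */
    a->balance -= amount;

    /* Credit Account B (an overflow stands in for a failed credit) */
    if (b->balance > LONG_MAX - amount)
        goto rollback;
    b->balance += amount;
    return 1;                          /* Commit */

rollback:
    *a = undo_a;                       /* undo any partial update */
    *b = undo_b;
    return 0;                          /* Abort */
}
```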
adequate funds through a “CheckBalance” operation. Based on the outcome of this check, the system either proceeds to a “SufficientFunds” state, culminating in a successful “Commit Sub-Transaction: Verify Funds,” or encounters “InsufficientFunds,” leading to an “Abort Sub-Transaction: Verify Funds.” The subsequent step, “Start Sub-Transaction: Transfer Funds,” is initiated only upon the successful completion (commit) of the “Verify Funds” sub-transaction. This second sub-process involves the core actions of “Debit Account A” and “Credit Account B.” A successful execution of both actions results in a “Commit Sub-Transaction: Transfer Funds.” Conversely, any failure during either the debit or the credit operation leads to an “Abort Sub-Transaction: Transfer Funds.” The success of the entire transaction, spanning from the initial “Start Transaction” to the final “Commit Transaction,” is contingent upon the successful commitment of both constituent sub-transactions. If either the “Abort Sub-Transaction: Verify Funds” or the “Abort Sub-Transaction: Transfer Funds” occurs, the entire operation is rolled back, resulting in an “Abort Transaction.” This mechanism ensures the integrity of the data by reverting any partial modifications. Consequently, the final status of the primary transaction is directly determined by the combined outcomes of its sub-transactions.
Figure 4.8: Nested Transaction containing sub-transactions depicting a bank
transaction.
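The nested structure of Figure 4.8 can be mirrored with one function per sub-transaction, where the parent commits only if every sub-transaction commits. The sketch below is hypothetical and single-process: the `Accounts` type and function names are assumptions, and the parent's rollback is simply a restore of the balances saved at “Start Transaction.”

```c
#include <assert.h>
#include <limits.h>

typedef struct { long a; long b; } Accounts;  /* balances of A and B (cents) */

/* Sub-transaction: Verify Funds (read-only CheckBalance). */
static int verify_funds(const Accounts *acc, long amount)
{
    return amount > 0 && acc->a >= amount;
}

/* Sub-transaction: Transfer Funds (Debit Account A, Credit Account B). */
static int transfer_funds(Accounts *acc, long amount)
{
    if (acc->b > LONG_MAX - amount)
        return 0;            /* credit would overflow: abort this step */
    acc->a -= amount;        /* Debit Account A */
    acc->b += amount;        /* Credit Account B */
    return 1;
}

/* Parent transaction: Commit Transaction only if both sub-transactions
 * commit; otherwise restore the state saved at Start Transaction. */
int nested_transfer(Accounts *acc, long amount)
{
    assert(acc != 0);
    Accounts saved = *acc;                    /* Start Transaction */
    if (!verify_funds(acc, amount) || !transfer_funds(acc, amount)) {
        *acc = saved;                         /* Abort Transaction */
        return 0;
    }
    return 1;                                 /* Commit Transaction */
}
```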
We aim to perform transactions between a client and a server. The client begins a transaction that must be executed on the server. In a one-phase commit, which is unidirectional, the server cannot unilaterally abort a transaction during a client request without leaving the client uninformed: because there is no concurrency control, if the server aborts the client's request, the client is unaware of the server's action and has to make another call to learn the state of the server. An improvement is to introduce a coordinator, assumed to be always alive, that helps serialize the actions between the client and the server. This improvement has led to the development of several atomic commit protocols in SubSection 4.6.1.
The Two-Phase Commit (2PC) protocol permits any party (participant) to decide to roll back its part of the transaction (for example, due to a failure), in which case the entire transaction is globally reverted [20, 43]. This requires a coordinator to facilitate concurrency control. The algorithm has two phases [20]: in the first phase, each party votes to commit or abort the transaction. Once a party has voted to commit, the vote is binding and cannot be changed, and the party enters the “prepared” state. In the second phase, the coordinator announces the outcome and every party in the transaction executes that same decision. If any party votes to abort or fails, the overall transaction is aborted. The two phases ensure that all parties reach the same decision on committing or rolling back the transaction.
We describe the Two-Phase Commit Protocol depicted in Figure 4.9 as follows:
• Upon receipt of the votes from the participants, if every vote is a ’yes’, then
the ’doCommit’ request is sent to every participant. However, if any vote is a
’no’, then the current transaction on the coordinator is aborted, and then the
’doAbort’ command is sent to every participant.
Figure 4.9: Two-Phase Commit [20].
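The coordinator's decision rule from the bullet above is small enough to state as code. The sketch below is a hypothetical, single-process illustration: the `Vote` and `Decision` enums are assumptions, and a participant that does not answer before a timeout is recorded as `VOTE_TIMEOUT` and treated like a 'no'.

```c
#include <assert.h>

typedef enum { VOTE_YES, VOTE_NO, VOTE_TIMEOUT } Vote;
typedef enum { DO_COMMIT, DO_ABORT } Decision;

/* Second phase of 2PC: commit only if every participant voted yes. */
Decision decide(const Vote *votes, int n)
{
    assert(n > 0);
    for (int i = 0; i < n; i++)
        if (votes[i] != VOTE_YES)
            return DO_ABORT;   /* any 'no' or missing vote aborts globally */
    return DO_COMMIT;          /* send doCommit to every participant */
}
```

Treating a missing vote as 'no' is what lets the coordinator make progress when a participant crashes before voting; the blocking problem arises only after participants have voted yes and the coordinator itself fails.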
Three-Phase Commit
Two-Phase Commit blocks when the coordinator fails: participants that have voted to commit cannot determine the outcome of the transaction on their own, so they must keep polling, potentially for a long time, as they try to reach the coordinator and obtain the decision. Three-Phase Commit reduces this blocking by adding a round [20, 43]: the coordinator performs a pre-commit round and collects ACKs before proceeding to commit.
We describe the Three-Phase Commit Protocol depicted in Figure 4.10 as follows:
• Upon receipt of the votes from the participants, if every vote is a ’yes’, then the
’doPreCommit’ request is sent to every participant. However, if any vote is a
’no’, then the current transaction on the coordinator is aborted, and then the
’doAbort’ command is sent to every participant.
Figure 4.10: Three-Phase Commit [20].
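The extra pre-commit round is what allows a replacement coordinator to decide without waiting for the failed one. The example below is a hypothetical single-process model (the `PState` and `Msg` enums and the function names are assumptions): `on_message` walks a participant through the normal transitions, and `termination_decision` is the rule commonly described for a replacement coordinator, assuming every participant can report its state (no network partition).

```c
#include <assert.h>

/* Hypothetical participant states in Three-Phase Commit. */
typedef enum {
    P_INIT,       /* has not voted yet */
    P_READY,      /* voted yes, waiting for doPreCommit */
    P_PRECOMMIT,  /* acknowledged doPreCommit, waiting for doCommit */
    P_COMMITTED,
    P_ABORTED
} PState;

typedef enum { MSG_CAN_COMMIT, MSG_PRECOMMIT, MSG_DO_COMMIT, MSG_DO_ABORT } Msg;

/* Normal-case transitions for a participant that is willing to commit. */
PState on_message(PState s, Msg m)
{
    if (m == MSG_DO_ABORT && s != P_COMMITTED)  return P_ABORTED;
    if (s == P_INIT && m == MSG_CAN_COMMIT)     return P_READY;     /* vote yes */
    if (s == P_READY && m == MSG_PRECOMMIT)     return P_PRECOMMIT; /* send ACK */
    if (s == P_PRECOMMIT && m == MSG_DO_COMMIT) return P_COMMITTED;
    return s;                                   /* ignore anything else */
}

/* Decision of a replacement coordinator, given every participant's state. */
PState termination_decision(const PState *states, int n)
{
    assert(n > 0);
    int any_precommit = 0;
    for (int i = 0; i < n; i++) {
        if (states[i] == P_COMMITTED) return P_COMMITTED;
        if (states[i] == P_ABORTED)   return P_ABORTED;
        if (states[i] == P_PRECOMMIT) any_precommit = 1;
    }
    /* doPreCommit is sent only after every participant voted yes, so the
     * transaction can still commit (after moving READY participants to
     * PRECOMMIT). Otherwise nobody can have committed, so abort is safe. */
    return any_precommit ? P_COMMITTED : P_ABORTED;
}
```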
• Layer 2 such as 802.1d [Link] spanning tree protocol, 802.1AB link aggregation
control protocol, link layer discovery protocol, RFC 1812 IP routing.
Chapter 5
Anti-Entropy Techniques
“Nothing is more important than the state of the soul.” – G.K. Chesterton
be directly attributed to how well it proactively handles these inherent inconsisten-
cies. While the specific implementation details and proprietary techniques might vary
significantly between different systems, the general frameworks for reasoning about
and systematically addressing entropy are based on well-established principles and
algorithmic patterns.
One fundamental approach to mitigating these issues is to deploy background processes that periodically check for discrepancies in the state held by different nodes. Upon detecting an out-of-sync condition, these processes can automatically trigger retry mechanisms to re-request missing or potentially corrupted data, or employ error correction techniques to repair incomplete or inconsistent data, thereby resolving conflicts and restoring a consistent state. A straightforward yet often effective conflict resolution strategy is for a node that detects an inconsistency to request the correct or missing data from its neighboring nodes and then update its local state once the retrieval succeeds. Well-known examples of such proactive synchronization techniques include read repair, where inconsistencies are detected and fixed during read operations initiated by clients, and hinted handoff, where an available neighbor node accepts and temporarily stores writes destined for a failed node, delivering them once the intended recipient recovers and rejoins the system.
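Read repair can be sketched in a few lines. In this hypothetical single-process model, each replica stores a value with a version number (an assumption; real systems may use timestamps or vector clocks). A read returns the value with the highest version and writes it back to every replica that returned an older version.

```c
#include <assert.h>

typedef struct {
    int value;
    long version;   /* higher version = more recent write */
} Replica;

/* Read from n replicas, return the newest value, and repair stale copies.
 * The number of repaired replicas is stored in *repaired. */
int read_with_repair(Replica *replicas, int n, int *repaired)
{
    assert(n > 0 && repaired != 0);
    int newest = 0;
    for (int i = 1; i < n; i++)
        if (replicas[i].version > replicas[newest].version)
            newest = i;

    *repaired = 0;
    for (int i = 0; i < n; i++) {
        if (replicas[i].version < replicas[newest].version) {
            replicas[i] = replicas[newest];   /* push the fresh copy */
            (*repaired)++;
        }
    }
    return replicas[newest].value;
}
```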
The Gossip protocol [25, 29, 30, 32] presents a robust and widely applicable anti-
entropy mechanism that can be effectively employed in distributed systems to promote
data consistency [37]. This protocol leverages a probabilistic approach to disseminate
information (messages) across a network, drawing a compelling analogy to the way
rumors or information propagate through a social network. A significant advantage of
the Gossip algorithm is its inherent fault tolerance and resilience to failures, even in
the face of substantial and unpredictable network disruptions or node outages. While
the epidemic approach, characterized by the rapid and potentially ubiquitous spread
of information to all reachable nodes, is a common and intuitive implementation
strategy, Gossip protocols can also be implemented using non-epidemic strategies
that strategically leverage overlay network structures or maintain partial views of the
system’s membership to optimize message dissemination and resource utilization.
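The epidemic variant can be simulated in a few lines. The sketch below is a hypothetical synchronous model of push gossip, not taken from the author's code: in every round, each node that already knows the rumor pushes it to one peer chosen pseudo-randomly, and the function reports how many rounds pass before all `n` nodes know it. A small linear congruential generator (Numerical Recipes constants) keeps the run reproducible.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define MAX_NODES 64
#define MAX_ROUNDS 1000

/* Tiny LCG; the high bits are returned because the low bits cycle quickly. */
static uint32_t next_rand(uint32_t *state)
{
    *state = *state * 1664525u + 1013904223u;
    return *state >> 16;
}

/* Returns the number of rounds until every node is informed,
 * or -1 if that does not happen within MAX_ROUNDS. */
int gossip_rounds(int n, uint32_t seed)
{
    assert(n >= 1 && n <= MAX_NODES);
    int informed[MAX_NODES] = {0}, snapshot[MAX_NODES];
    informed[0] = 1;                 /* node 0 starts the rumor */
    int count = 1;

    for (int round = 0; round < MAX_ROUNDS; round++) {
        if (count == n)
            return round;
        /* Only nodes informed before this round push during this round. */
        memcpy(snapshot, informed, sizeof informed);
        for (int i = 0; i < n; i++) {
            if (!snapshot[i])
                continue;
            /* Pick a random peer other than i. */
            int peer = (i + 1 + (int)(next_rand(&seed) % (uint32_t)(n - 1))) % n;
            if (!informed[peer]) {
                informed[peer] = 1;
                count++;
            }
        }
    }
    return count == n ? MAX_ROUNDS : -1;
}
```

Because each informed node reaches at most one new node per round, the informed set at most doubles, so push gossip needs at least ⌈log2 n⌉ rounds; in practice it finishes in a number of rounds that grows logarithmically with n.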
5.2 CRDT
Conflict-free Replicated Data Types (CRDTs) represent another powerful and in-
creasingly popular class of anti-entropy mechanisms in distributed systems. Riak, a
distributed NoSQL database, was a notable pioneer in the widespread adoption and
practical application of CRDTs [5, 37]. A CRDT is a specialized type of data struc-
ture meticulously designed to maintain a consistent state across multiple independent
replicas, regardless of the order in which concurrent operations are executed on those
individual replicas. This inherent order-independence property makes CRDTs excep-
tionally well-suited for scenarios involving collaborative applications and seamless re-
mote synchronization (replication) of data across numerous geographically distributed
devices, a task that can be inherently complex to manage for data consistency using
traditional concurrency control mechanisms. Mathematically, state-based CRDTs can be formally characterized as join-semilattices: replica states are partially ordered, and merging two states computes their least upper bound. To ensure their conflict-free nature, the merge must satisfy the algebraic properties of commutativity (the order in which states are merged does not affect the final state), associativity (the grouping of merges does not matter), and idempotence (applying the same merge multiple times has the same effect as applying it only once).
CRDTs offer several compelling advantages for building highly available and even-
tually consistent distributed applications:
• Local-first application: Operations performed on a CRDT can be applied
locally on a replica immediately without the need to wait for synchronization
with other replicas. This ”local-first” characteristic significantly improves the
responsiveness and perceived performance of applications, leading to a better
user experience, especially in high-latency network environments.
A diverse range of CRDTs has been developed, each tailored to manage different
types of data and support specific sets of operations. Common examples include map
CRDTs (such as the Last-Write-Wins Map - LWW-Map), set CRDTs (such as the
Grow-Only Set - G-Set), and many other specialized data structures designed for
various consistency and concurrency control requirements [5, 37].
We have implemented a version of LWW-Map, shown in lamport1-majority-voting8.c. Each update carries a timestamp, and the update with the largest timestamp across processes determines the final value of each key in the map.
The provided C source code in crdt-gcounter.c implements a Grow-only Counter (GCounter), a type of Conflict-free Replicated Data Type (CRDT), using the Message Passing Interface (MPI) for parallel operation. Each parallel process maintains a local state stored in the `GCounter` structure, where the `counts` array tracks the contribution of every process, one index per process. The core CRDT operation is the merge, which is performed using the collective primitive `MPI_Allreduce` with the reduction operator `MPI_MAX`. This operation ensures that every process receives the global state in which each element of the merged array is the maximum of the corresponding local elements from all contributing arrays, correctly satisfying the GCounter's merge property: $\mathrm{merge}(A, B)_i = \max(A_i, B_i)$. Finally, the `gcounter_value` function calculates the counter's total by summing over all indices of the merged array. While the merge itself uses `MPI_MAX`, the summation is how a GCounter's value is defined; computing it on every process also provides a quick check that all processes hold the same merged state and a consistent global value, for demonstration purposes.
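The GCounter's operations do not depend on MPI. The sketch below is a sequential restatement, not the contents of crdt-gcounter.c: `NPROCS`, the struct layout, and the function names are assumptions. In the MPI version described above, the pairwise `gcounter_merge` is replaced by one `MPI_Allreduce` with `MPI_MAX` over the `counts` array.

```c
#include <assert.h>

#define NPROCS 4   /* assumed number of replicas */

typedef struct {
    long counts[NPROCS]; /* counts[i] = increments performed by replica i */
} GCounter;

/* A replica only ever increments its own slot. */
void gcounter_increment(GCounter *c, int self, long by)
{
    assert(self >= 0 && self < NPROCS && by >= 0);
    c->counts[self] += by;
}

/* merge(A, B)_i = max(A_i, B_i): commutative, associative, idempotent. */
void gcounter_merge(GCounter *into, const GCounter *other)
{
    for (int i = 0; i < NPROCS; i++)
        if (other->counts[i] > into->counts[i])
            into->counts[i] = other->counts[i];
}

/* The counter's value is the sum of all slots. */
long gcounter_value(const GCounter *c)
{
    long sum = 0;
    for (int i = 0; i < NPROCS; i++)
        sum += c->counts[i];
    return sum;
}
```

Taking the element-wise maximum rather than adding the arrays is what makes repeated merges harmless: merging the same state twice does not double-count increments.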
lww-map.c implements a Last-Write-Wins Map (LWW-Map), a type of Conflict-free Replicated Data Type (CRDT), using the Message Passing Interface (MPI) to handle state replication and global merging across multiple parallel processes. The code allows independent, concurrent updates (puts and deletes) to a key-value store on different nodes, with a deterministic merge function that resolves conflicts using a logical timestamp and a node ID tie-breaker. The core data structure, `LWWMap`, holds an array of `Entry` structs, where each entry records the key, the value, a logical timestamp (`ts`), the ID of the node that made the update (`node`), and a tombstone flag (`tomb`) that marks deletions. The `put` and `del` functions increment the map's local clock (`m->clock`) before writing the entry, ensuring that newer operations have a higher timestamp. The critical functionality is handled by the `global_merge` function: Rank 0 first uses `MPI_Gather` to collect the size of every rank's local state, and then gathers all individual entries into a single large buffer using `MPI_Recv` (or a local `memcpy` for its own entries). Rank 0 then performs the actual merge: it iterates through the collected entries and, for each unique key, uses the `newer` function to compare the timestamp and node ID of the incoming entry against the currently merged entry, keeping the entry with the later timestamp and breaking ties in favor of the higher node ID. Finally, Rank 0 broadcasts the fully merged `LWWMap` state back to all other processes using `MPI_Bcast`, ensuring that every node is synchronized. The `main` function demonstrates a conflict scenario in which different ranks update or delete the same keys ('a', 'b', 'c'), followed by a global merge and a call to `print_visible`, which displays the final, consistent state across all ranks while omitting any entries marked with a tombstone.
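The merge rule at the heart of `global_merge` can be isolated from the MPI communication. The sketch below is a hypothetical sequential version: the `Entry` fields mirror those described above (`key`, `value`, `ts`, `node`, `tomb`), but their types, the fixed capacity, and the function names `lww_merge_entry` and `lww_get` are assumptions. Because `newer` is a total order on (timestamp, node ID), merging the same entries in any order yields the same map.

```c
#include <assert.h>
#include <string.h>

#define CAP 16

typedef struct {
    char key[16];
    int  value;
    long ts;     /* logical timestamp */
    int  node;   /* node ID, used to break timestamp ties */
    int  tomb;   /* 1 if this entry records a deletion */
} Entry;

typedef struct {
    Entry e[CAP];
    int   n;
} LWWMap;

/* a wins over b if it has a later timestamp, or the same one and a higher node ID. */
static int newer(const Entry *a, const Entry *b)
{
    return a->ts > b->ts || (a->ts == b->ts && a->node > b->node);
}

/* Merge one incoming entry, keeping the newer version for each key. */
void lww_merge_entry(LWWMap *m, const Entry *in)
{
    for (int i = 0; i < m->n; i++) {
        if (strcmp(m->e[i].key, in->key) == 0) {
            if (newer(in, &m->e[i]))
                m->e[i] = *in;
            return;
        }
    }
    assert(m->n < CAP);
    m->e[m->n++] = *in;   /* first time this key is seen */
}

/* Look up a visible (non-tombstoned) value; returns 1 if found. */
int lww_get(const LWWMap *m, const char *key, int *out)
{
    for (int i = 0; i < m->n; i++)
        if (strcmp(m->e[i].key, key) == 0 && !m->e[i].tomb) {
            *out = m->e[i].value;
            return 1;
        }
    return 0;
}
```

Deletions are kept as tombstones rather than removed, so a stale put arriving later cannot resurrect a deleted key.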
5.4 Ancillary Structures
Ancillary data structures, while not directly mechanisms for achieving consensus
or propagating updates, can play a vital role in efficiently verifying whether the
state held by different nodes in a distributed system is in disagreement or has become
inconsistent. Structures like the Merkle tree provide an efficient method for identifying
and potentially facilitating the resolution of such conflicts. A Merkle tree is a tree-like data structure in which each non-leaf node holds a cryptographic hash computed over the hashes of its child nodes, and each leaf node holds the cryptographic hash of an individual data chunk or block.
This hierarchical hashing mechanism allows for the efficient and secure verification of
data integrity and consistency across distributed systems. We can effectively identify
disagreements among nodes if they fail to produce matching results during audit or
consistency proofs based on their respective Merkle trees. Let us delve into the details
of these proofs:
• Consistency proof: A consistency proof allows a node that holds one version
of a Merkle tree (representing the state of the data at a particular point in
time) to efficiently and cryptographically verify that another node holding a
potentially different version of the same Merkle tree represents a consistent
historical version of its own tree. This enables efficient comparison of different
states of the same dataset over time, identifying points of divergence or ensuring
that one version is indeed an ancestor of another.
• Data synchronization: Efficiently identifying the specific data chunks or sub-
trees that differ between nodes by comparing their Merkle tree structures, al-
lowing for targeted synchronization of only the differing data and minimizing
the overhead of transferring large, identical datasets.
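The comparison step can be sketched with a tiny Merkle tree over a power-of-two number of chunks. To stay self-contained, the sketch substitutes 64-bit FNV-1a for a cryptographic hash such as SHA-256; that substitution is for illustration only and provides no security. The array layout, the fixed leaf count, and the function names are assumptions.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define LEAVES 4                          /* must be a power of two */
#define FNV_OFFSET 14695981039346656037ULL
#define FNV_PRIME  1099511628211ULL

/* 64-bit FNV-1a: a stand-in for a cryptographic hash, NOT secure. */
static uint64_t fnv1a(const void *data, size_t len, uint64_t h)
{
    const unsigned char *p = data;
    for (size_t i = 0; i < len; i++) {
        h ^= p[i];
        h *= FNV_PRIME;
    }
    return h;
}

/* tree[1] is the root; node i has children 2i and 2i+1; leaves start at LEAVES. */
void merkle_build(const char *chunks[LEAVES], uint64_t tree[2 * LEAVES])
{
    for (int i = 0; i < LEAVES; i++) {
        assert(chunks[i] != 0);
        tree[LEAVES + i] = fnv1a(chunks[i], strlen(chunks[i]), FNV_OFFSET);
    }
    for (int i = LEAVES - 1; i >= 1; i--) {
        uint64_t pair[2] = { tree[2 * i], tree[2 * i + 1] };
        tree[i] = fnv1a(pair, sizeof pair, FNV_OFFSET);  /* hash of child hashes */
    }
}

/* Descend from the root, following a mismatching child at each level.
 * Returns the index of a differing chunk, or -1 if the roots match. */
int merkle_first_diff(const uint64_t a[2 * LEAVES], const uint64_t b[2 * LEAVES])
{
    if (a[1] == b[1])
        return -1;
    int i = 1;
    while (i < LEAVES)
        i = (a[2 * i] != b[2 * i]) ? 2 * i : 2 * i + 1;
    return i - LEAVES;
}
```

Two replicas exchange only the root first; when the roots differ, walking down the mismatching branches locates a divergent chunk after comparing about log2(LEAVES) pairs of hashes instead of every chunk.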
maintaining high availability.
• Strong consistency: This stringent approach ensures that no node in the distributed system serves potentially stale data: replicas are effectively locked, or reads are held back, until the update to a new value has been acknowledged by the replicas the protocol requires (all of them, or a quorum). This guarantees that every read operation reflects the most recent write operation, providing a linearizable view of the data. However, achieving strong consistency can significantly impact the system's availability and performance, especially in the presence of network partitions or transient node failures, as the system might become unavailable if the required quorum of nodes cannot be reached.
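One common building block for this model is quorum replication over N replicas. With a read quorum R and a write quorum W, the condition R + W > N makes every read quorum overlap every write quorum, and W > N/2 prevents two disjoint write quorums. The helpers below are hypothetical; on its own, the overlap condition is not a complete linearizable design (protocols such as ABD also write back the value they read), so treat it as a necessary ingredient rather than the whole mechanism.

```c
#include <assert.h>

/* 1 if every read quorum intersects every write quorum and
 * any two write quorums intersect, for n replicas. */
int quorums_overlap(int n, int r, int w)
{
    assert(n > 0 && r >= 1 && r <= n && w >= 1 && w <= n);
    return (r + w > n) && (2 * w > n);
}

/* Replica failures that still leave both a read and a write quorum reachable. */
int tolerated_failures(int n, int r, int w)
{
    int largest = r > w ? r : w;
    return n - largest;
}
```

For example, N = 3 with R = W = 2 satisfies the overlap condition and tolerates one unavailable replica; losing a second replica makes the system unavailable, which is exactly the availability cost described above.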
For readers interested in a more in-depth exploration of the theoretical underpin-
nings and practical applications of various error recovery codes, the following highly
regarded texts are recommended as valuable resources:
erasure coding configuration to function effectively in a real-world storage system, a
total of n + m storage drives or nodes are required. The key advantage is that the
storage overhead is determined by the ratio of m to n, which can be significantly lower than the 1 : 1 overhead of keeping even one extra full copy of the data (full replication), while still offering substantial fault tolerance (the ability to recover from the loss of any m drives).
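The simplest erasure code has m = 1: a single parity block equal to the XOR of the n data blocks, as in RAID 5. With n = 4 and m = 1, the storage overhead is m/n = 25%, compared with 100% for a second full copy. The sketch below is illustrative only; production systems usually use Reed-Solomon codes to obtain m > 1, and the block size and function names here are assumptions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BLOCK 8   /* bytes per block (illustrative) */

/* parity = data[0] ^ data[1] ^ ... ^ data[n-1], byte by byte. */
void xor_parity(unsigned char data[][BLOCK], size_t n, unsigned char parity[BLOCK])
{
    memset(parity, 0, BLOCK);
    for (size_t i = 0; i < n; i++)
        for (size_t j = 0; j < BLOCK; j++)
            parity[j] ^= data[i][j];
}

/* Rebuild the single lost block from the surviving blocks and the parity. */
void xor_recover(unsigned char data[][BLOCK], size_t n, size_t lost,
                 const unsigned char parity[BLOCK])
{
    assert(lost < n);
    memcpy(data[lost], parity, BLOCK);
    for (size_t i = 0; i < n; i++)
        if (i != lost)
            for (size_t j = 0; j < BLOCK; j++)
                data[lost][j] ^= data[i][j];
}
```

Recovery works because XOR is its own inverse: XOR-ing the parity with every surviving block cancels their contributions and leaves exactly the missing block.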
1
See an illustrative example of optimizing a distributed data store for operation at significant
scale in the Uber blog
Chapter 6
Peer-to-Peer Computing
“Life is a wheel of fortune and it’s my turn to spin it.” – Tupac Amaru Shakur
In the peer-to-peer (P2P) paradigm, each network node functions as both a client
and a server simultaneously. This allows any node to communicate directly with
its neighbors without the necessity of a dedicated central server. This decentralized
arrangement inherently avoids a single point of failure, enhancing the system’s ro-
bustness. However, some hybrid peer-to-peer systems exist where a central server is
utilized for initial orchestration and connection establishment. These servers store
information about the peers present in the network, facilitating the discovery of other
nodes.
In contrast, the traditional client-server paradigm restricts direct communication
between clients. For one client to interact with another, it typically needs to update
the state on the central server, which then allows other clients to connect to the server
and retrieve the shared information. In this model, the server becomes a critical
component for all communication, making it a significant single point of failure for
the entire system.
Peer-to-peer computing offers notable benefits, including reduced storage and com-
putational costs, achieved by distributing the burden of computation and data storage
across several participating nodes. The applications of peer-to-peer computing span
a wide range of domains, including distributed file systems (e.g., OceanStore [31],
PAST [24], CFS [21]), file sharing networks (e.g., BitTorrent [1], FastTrack [2]), and
large-scale Grid computing platforms [3].
We present a peer-to-peer network of 4 nodes (peers) named ”A”, ”B”, ”C”, and
”D” as shown in Figure 6.1. Each node can connect with any other node in the
network. No central server or intermediary is required for one peer to interact with another, resulting in a decentralized network.
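The four-peer network of Figure 6.1 can be modeled as a full mesh in which every peer keeps a direct link to every other peer. The sketch below is hypothetical (only the peer count comes from the figure; the adjacency matrix and helpers are assumptions): it counts the n(n - 1)/2 direct links and checks that removing one peer leaves the remaining peers directly connected, illustrating the absence of a single point of failure.

```c
#include <assert.h>

#define PEERS 4   /* A, B, C, D */

static int link_up[PEERS][PEERS];
static int alive[PEERS];

/* Every peer is alive and linked directly to every other peer. */
void build_mesh(void)
{
    for (int i = 0; i < PEERS; i++) {
        alive[i] = 1;
        for (int j = 0; j < PEERS; j++)
            link_up[i][j] = (i != j);
    }
}

/* Number of undirected links: n(n - 1)/2 for a full mesh. */
int count_links(void)
{
    int links = 0;
    for (int i = 0; i < PEERS; i++)
        for (int j = i + 1; j < PEERS; j++)
            links += link_up[i][j];
    return links;
}

/* Peer 'from' can reach 'to' directly if both are alive and linked. */
int can_send(int from, int to)
{
    assert(from >= 0 && from < PEERS && to >= 0 && to < PEERS);
    return alive[from] && alive[to] && link_up[from][to];
}

/* 1 if every pair of surviving peers still has a direct link. */
int survivors_connected(void)
{
    for (int i = 0; i < PEERS; i++)
        for (int j = 0; j < PEERS; j++)
            if (i != j && alive[i] && alive[j] && !can_send(i, j))
                return 0;
    return 1;
}
```

Usage: after `build_mesh()`, marking peer B as failed (`alive[1] = 0`) still leaves A, C, and D able to exchange messages directly, whereas in the client-server model the loss of the server would cut off every client.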
We will now discuss the categorization of peer-to-peer systems to address core
concepts such as overlay networks, unstructured P2P systems, and structured P2P
systems, as covered in Sections 6.1, 6.2, and 6.3 respectively.
6.1 Resilient Overlay Networks
6.2 Unstructured P2P Systems
Chapter 7
“It is better to be roughly right than precisely wrong.” – John Maynard Keynes
through less formal but still valuable testing methodologies such as unit tests, inte-
gration tests, and fault-injection tests. These testing approaches can help to uncover
subtle bugs and significantly improve the overall quality of the distributed software.
Software testing, in general, is less formal than the rigorous mathematical
proofs employed in formal verification. Unit tests should be strategically designed
to identify potential failure modes within individual components and to achieve high
coverage of the problem space related to these failures. Integration tests play a cru-
cial role in verifying the behavior of the system as different nodes communicate with
each other over a simulated or real network channel within a cluster. A particularly
insightful type of integration test is the fault-injection test, where engineers intention-
ally introduce specific failure scenarios into the running system and carefully observe
how the collaborating components behave under these adverse conditions. For in-
stance, in a distributed data store, a fault-injection test might involve simulating a
node failure to observe if there is any data loss or corruption. Metamorphic testing
is another valuable technique that can enhance test coverage by leveraging known
relationships between different inputs and outputs and checking for violations of in-
variants that should hold even in the presence of failures. Several notable examples
of robust testing practices in the industry include Netflix Simian Army, a suite of
tools designed to randomly introduce failures into Netflix’s cloud infrastructure to
ensure resilience; Jepsen, a rigorous testing framework for distributed databases that
focuses on uncovering consistency violations under various fault conditions; and Verdi,
a research project focused on the formal verification of distributed systems implemen-
tations.
Formal verification serves as an overarching term for a collection of techniques
aimed at mathematically proving whether a system’s design or implementation ad-
heres to a precisely defined set of specifications. A specification is a written document
that describes the intended behavior and properties of a system in an unambiguous
manner. While human language is often verbose and can include more information
than is strictly necessary to convey the core requirements, mathematics offers a more
concise and precise language for describing most phenomena in the physical and
computational worlds. Unfortunately, even mathematical proofs can sometimes rely on
informal arguments expressed in human language. Mathematical logic, particularly
propositional and temporal logic, can help eliminate this redundancy and imprecision,
leading to more rigorous and unambiguous specifications and proofs, which form the
basis of formal methods.
The primary goal of formal verification is to identify potential errors and flaws in
the system’s design early in the development lifecycle, based on the formal specifica-
tions. Therefore, the specification should be carefully derived from the core require-
ments of the system to be truly useful in detecting critical issues. The specification
must be kept as simple and focused as possible, so that the constraints on the system's
behavior can be clearly and precisely defined, thereby mitigating the risk of the
"second-system effect" (where an over-engineered second attempt at a system
introduces more problems than it solves). A useful rule of thumb is to exclude from
the specification any information that does not directly contribute to understanding
the fundamental behavior and properties of the system, as such extraneous information
is unlikely to reveal critical violations. Creating a precise and comprehensive
specification is a significant challenge in software development. Careful consideration
must be given to composing a realistic specification that accurately reflects the
system's requirements and, subsequently, to writing source code that provably
satisfies this specification. Ideally, writing the formal specification should take
noticeably less effort than writing and verifying the source code that implements it;
otherwise, the programmer's workload roughly doubles.
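Temporal logic is a type of formal logic for reasoning about how a system's state
evolves over time. It is particularly well suited to stating and proving properties of
concurrent and distributed systems: safety properties (nothing bad ever happens) and
liveness properties, which ensure that the system will eventually reach desirable states.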
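communication protocols to concurrency control algorithms. Tools such as the TLA+
Toolbox, which works with the TLC model checker and the TLAPS proof system, support
the specification and verification of systems written in TLA+. General-purpose
interactive proof assistants such as Coq, on which Verdi is built, serve a similar
purpose for machine-checked proofs.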
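For example, using the temporal operators □ ("always") and ◇ ("eventually"), the agreement (safety) and termination (liveness) properties of a consensus protocol can be written as follows. Here decided_i(v) is an illustrative predicate meaning that process i has decided value v, and correct(i) means that process i never crashes:

```latex
\begin{aligned}
\text{Agreement (safety):}\quad & \Box\,\big(\forall i, j, v, w:\ \mathit{decided}_i(v) \land \mathit{decided}_j(w) \Rightarrow v = w\big) \\
\text{Termination (liveness):}\quad & \forall i:\ \mathit{correct}(i) \Rightarrow \Diamond\,\exists v:\ \mathit{decided}_i(v)
\end{aligned}
```

A safety property can be violated by a finite execution (two processes decide different values), so a model checker can exhibit a finite counterexample; a liveness property can only be violated by an infinite execution in which the good state is never reached.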
One such tool is Valgrind. We show how to use three of its tools: Memcheck, Helgrind,
and DRD. The following description assumes that Valgrind is already installed on the
system.
memcheck
We can analyze the memory usage and health of any program, such as test5.c in our
example, which uses the OpenMPI library.
1. in use at exit: The amount of heap memory that was still allocated when the
program terminated. A non-zero value suggests memory leaks.
2. total heap usage: The total number of allocation and deallocation operations
and the total bytes allocated.
LEAK SUMMARY: Categorizes unfreed memory blocks:
1. definitely lost: Memory that your program allocated and then lost all pointers
to. This is a clear memory leak that you should fix.
2. indirectly lost: Memory that is pointed to only by definitely lost memory. Fixing
the "definitely lost" blocks will usually also resolve these.
3. possibly lost: Memory that might still be reachable, but only through pointers
into the middle of a block (interior pointers), so Valgrind cannot be sure. Investigate
these, as they often indicate leaks.
4. still reachable: Memory that was allocated but never freed, although the program
still holds pointers to it at exit. While this is not a leak in the strictest sense, it is
often good practice to free this memory as well. You can suppress these reports with
--show-reachable=no.
Reducing the number and size of blocks reported as "definitely lost" or "indirectly
lost" should be a priority when improving your source code.
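The leak categories are easiest to see on a small linked structure, similar to the linked list that our sequence Paxos uses as a log. In the hypothetical sketch below (list_push and list_free are illustrative names), every node is freed before exit. If the program instead dropped its only pointer to the head without freeing anything, Memcheck would typically report the head as definitely lost and the nodes reachable only through it as indirectly lost; keeping the head in a global variable without freeing it would be reported as still reachable.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct node {
    int value;
    struct node *next;
} node_t;

/* Prepend a value; returns the new head, or NULL if allocation fails
 * (in which case the old head is left untouched). */
node_t *list_push(node_t *head, int value) {
    node_t *n = malloc(sizeof *n);
    if (n == NULL)
        return NULL;
    n->value = value;
    n->next = head;
    return n;
}

/* Free every node and return how many were released.
 * Freeing only the head would leave the second node "definitely lost"
 * and every node reachable only from it "indirectly lost". */
size_t list_free(node_t *head) {
    size_t freed = 0;
    while (head != NULL) {
        node_t *next = head->next;   /* save the link before freeing the node */
        free(head);
        head = next;
        freed++;
    }
    return freed;
}
```

Running a program built this way under valgrind --leak-check=full should show 0 bytes in use at exit in the heap summary.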
Helgrind and DRD are primarily designed to detect concurrency errors in programs that
use POSIX threads (pthreads). They can provide a sanity check for an OpenMPI
program, but they were not designed for the message-passing paradigm of our
underlying runtime.
To get better output, write a separate log for each running process; Valgrind's
--log-file option accepts a %p placeholder that expands to the process ID.
Interpreting the Output from Helgrind and DRD:
1. Valgrind (Helgrind or DRD) will output any detected threading errors or po-
tential deadlocks to the standard error (stderr).
2. Carefully examine the output. Look for reports of data races (both tools),
lock-order violations that indicate potential deadlocks (Helgrind), or lock contention (DRD).
3. Remember that these tools are interpreting the MPI processes as independent
programs that might be using internal threading within the MPI library itself.
The deadlocks they detect might be related to these internal threads or might
be misinterpretations of MPI communication.
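For comparison, the kind of program Helgrind and DRD were designed for is a pthreads program with shared state. In the minimal, hypothetical sketch below (run_counter is an illustrative name), several threads increment a shared counter under a mutex, so the count is exact. Removing the pthread_mutex_lock/pthread_mutex_unlock pair would turn the increment into a data race, which is exactly the kind of error these tools report. Compile with -pthread.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAX_THREADS 16

static long counter = 0;
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

typedef struct { long iterations; } work_t;

static void *worker(void *arg) {
    const work_t *w = arg;
    for (long i = 0; i < w->iterations; i++) {
        pthread_mutex_lock(&counter_lock);    /* protects the critical section */
        counter++;
        pthread_mutex_unlock(&counter_lock);
    }
    return NULL;
}

/* Run n_threads workers, each incrementing the shared counter `iterations` times. */
long run_counter(int n_threads, long iterations) {
    assert(n_threads > 0 && n_threads <= MAX_THREADS);
    pthread_t threads[MAX_THREADS];
    work_t w = { iterations };
    counter = 0;
    for (int t = 0; t < n_threads; t++) {
        int rc = pthread_create(&threads[t], NULL, worker, &w);
        assert(rc == 0);
        (void)rc;
    }
    for (int t = 0; t < n_threads; t++)
        pthread_join(threads[t], NULL);
    return counter;
}
```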
Chapter 8
Miscellaneous
”Any subject can be made interesting, and therefore any subject can be made
boring.” – Hilaire Belloc
Several computing architectures include specialized processors designed to relieve the
main processor of computationally intensive tasks. Custom accelerators offload such
tasks onto dedicated hardware that performs them efficiently, and they are seeing
increasing adoption. For example, the Graphics Processing Unit (GPU) is optimized for
massively parallel operations such as matrix arithmetic and can function as a
co-processor alongside the system's main processor. Hardware support for
cryptography is also common, such as Intel's Advanced Encryption Standard New
Instructions (AES-NI), a CPU instruction-set extension used by cryptographic libraries
like OpenSSL and LibreSSL. Offloading specific tasks to a separate accelerator such
as a GPU can be viewed as a form of distributed system operating between the main
processor and the specialized hardware.
The Unix file system provides a convenient hierarchical structure for storing data in
files and directories, but it was not designed to be fault-tolerant. Using RAID, where
several disks work together and store redundant data (mirrored copies or parity), can
help achieve resilience in the face of failure. Another problem is the cost of metadata:
per-file and per-directory metadata becomes significant at petabyte scale and beyond.
Because this metadata usually has to be read from disk before the file itself can be
accessed, a single file read may require several disk operations, which is a significant
bottleneck for scalability.
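Parity-based RAID levels (such as RAID 5) avoid storing full copies by keeping one parity block, the bitwise XOR of the data blocks in a stripe; any single lost block equals the XOR of the parity block and all surviving data blocks. The sketch below is a simplified, hypothetical illustration with tiny fixed-size blocks stored contiguously; real arrays rotate parity across disks and handle partial-stripe writes.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BLOCK_SIZE 8

/* Block i occupies blocks[i * BLOCK_SIZE .. (i + 1) * BLOCK_SIZE - 1].
 * parity = block0 ^ block1 ^ ... ^ block(n-1), byte by byte. */
void compute_parity(const unsigned char *blocks, size_t n, unsigned char *parity) {
    memset(parity, 0, BLOCK_SIZE);
    for (size_t i = 0; i < n; i++)
        for (size_t b = 0; b < BLOCK_SIZE; b++)
            parity[b] ^= blocks[i * BLOCK_SIZE + b];
}

/* Rebuild block `lost` using only the parity and the other (surviving) blocks. */
void rebuild_block(const unsigned char *blocks, size_t n, size_t lost,
                   const unsigned char *parity, unsigned char *out) {
    assert(lost < n);
    memcpy(out, parity, BLOCK_SIZE);
    for (size_t i = 0; i < n; i++)
        if (i != lost)                       /* never reads the lost block */
            for (size_t b = 0; b < BLOCK_SIZE; b++)
                out[b] ^= blocks[i * BLOCK_SIZE + b];
}
```

A single parity block tolerates one failure per stripe; codes such as Reed-Solomon generalize the idea to several parity blocks.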
Most distributed file systems, however sophisticated, are built from the same core
components: a data file (the actual stored data), an index file (a searchable data
structure), a journal file (which allows state to persist across restarts), a checkpoint
(a snapshot that provides safe points for recovery after failures), and a client (an
application that has access to the file system). These components have different
names across file systems, such as HDFS [40], Google File System [26], and
Tectonic [36]. Some file systems, such as log-structured file systems [38], write all
modifications to disk sequentially in a log-like manner.
Another way to categorize storage systems is as single-tenant (Haystack [15], F4 [35],
HDFS [40]) or multi-tenant (Tectonic [36]). A number of these storage systems utilize
variants of Paxos and Raft to achieve distributed consensus across volumes. For more
information on distributed consensus algorithms, please refer to Chapter 4, Sections
4.1 and 4.3.
These limitations motivated several distributed storage systems. Haystack [15]
improves upon the Unix file system by keeping metadata in memory: lookups need no
disk access, and reading an object from its volume typically takes a single disk
operation, which increases throughput. Many objects are packed into one large volume
file, and the per-object metadata is reduced to a compact representation that fits in
memory, rather than each file carrying its own file system metadata. However, each
storage machine needs enough memory to hold the metadata for the volumes it serves.
When an object is saved in a Haystack store, it is replicated to several other volumes
for redundancy. The Haystack Directory maintains the mapping from logical volumes
to the physical volumes that store them, along with metadata such as which logical
volume holds each object. The Haystack Cache speeds up access to popular objects.
The design suits immutable blobs, following the philosophy that data is written once,
read often, and rarely updated.
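The key idea, a compact in-memory index so that a read costs at most one disk operation, can be sketched as follows. This is a simplified, hypothetical model (needle_loc_t, index_put, and index_get are illustrative names): the volume is an append-only file, and the in-memory index maps an object id to its offset and size in that file. Haystack's actual index and on-disk format contain more fields [15].

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define INDEX_CAPACITY 1024   /* power of two, so we can mask instead of mod */

typedef struct {
    uint64_t key;      /* object id; 0 marks an empty slot */
    uint64_t offset;   /* byte offset of the object in the volume file */
    uint32_t size;     /* object length in bytes */
} needle_loc_t;

typedef struct {
    needle_loc_t slots[INDEX_CAPACITY];   /* open addressing, linear probing */
    uint64_t volume_end;                  /* next append position in the volume */
} volume_index_t;

/* Record an object appended to the volume; returns 0 on success, -1 if full. */
int index_put(volume_index_t *ix, uint64_t key, uint32_t size) {
    assert(key != 0);
    for (size_t probe = 0; probe < INDEX_CAPACITY; probe++) {
        needle_loc_t *s = &ix->slots[(key + probe) & (INDEX_CAPACITY - 1)];
        if (s->key == 0 || s->key == key) {
            s->key = key;
            s->offset = ix->volume_end;   /* data is always appended at the end */
            s->size = size;
            ix->volume_end += size;
            return 0;
        }
    }
    return -1;
}

/* Memory-only lookup; the caller then issues one read at (offset, size). */
const needle_loc_t *index_get(const volume_index_t *ix, uint64_t key) {
    assert(key != 0);
    for (size_t probe = 0; probe < INDEX_CAPACITY; probe++) {
        const needle_loc_t *s = &ix->slots[(key + probe) & (INDEX_CAPACITY - 1)];
        if (s->key == key) return s;
        if (s->key == 0) return NULL;     /* empty slot ends the probe chain */
    }
    return NULL;
}
```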
Haystack does not exploit the access patterns of requests for objects, and even the
Haystack Cache offers little granular control over them. Hence, F4 [35] was developed
to incorporate access patterns into the design of the store. Rather than relying on
replication alone, F4 uses distributed erasure coding with Reed-Solomon codes, which
provides fault tolerance at a lower storage cost than keeping full copies on several
volumes. Objects saved in the store begin as "hot" and become "warm" over time,
based on the observation that newer objects are more likely to be accessed than older
ones. This concept is known as temperature zones. Once a volume reaches its size
limit, it is closed for writing and becomes read-only, and volumes are organized into
temperature zones to take advantage of access patterns. Several other systems, such
as Pelican [14] and RADOS [48], also make intelligent use of access patterns, while
CRUSH [47] places replicated data according to a weighted hierarchy of storage
devices. Tectonic [36] can be seen as an evolution of the Haystack storage system that
introduced multi-tenant features. Each tenant has its own namespace and delegates
metadata lookup to a dedicated key store.
• We use MPI so that we do not have to deal with the low-level socket networking
stack and its quirks, such as the differences between IPv4 and IPv6.
• Our philosophy has been to think locally and act globally. We do computations
on the node and communicate with other nodes by messaging.
• In our Paxos implementations, we use Unix timestamps as round numbers (ballots)
because they increase monotonically. A necessary prerequisite is to synchronize the
clocks of at least the set of proposers; synchronizing the whole cluster may be
unnecessary. Note that a wall clock can be stepped backwards by time
synchronization, so monotonicity is an assumption rather than a guarantee. This
paradigm was heavily used in our implementations.
• We organize processes into groups with custom communicators. This allows
targeted synchronization of grouped processes without involving every process in
the application, giving precise control over a group of processes.
• We can cancel pending requests and tune the quorum criteria based on business
needs. These choices affect the resilience of the distributed system.
• We use polling when receiving messages and check each tag, rather than waiting
on specific tags, to keep the code modular. In my quest to go low level, rather than
using C++, I tried object-oriented design in C to enhance modularity, but abandoned
the idea due to the loss of type safety.
• Make use of simple structures. Even our log for sequence Paxos is not a true log,
but an abused linked list with some atomic primitives.
• Our sequence Paxos runs a separate single-decree Paxos instance for each item
that the proposer sends. Unfortunately, our logic is restricted to a single proposer.
• It is good to take steps to avoid both deadlocks and livelocks. Deadlock can
happen when the order of sends and receives is mismatched, especially in blocking
mode. In non-blocking mode, it is also important to consider how request objects
are owned between the receiving and sending nodes.
• Livelock is also possible in the non-blocking case when we poll in a busy-wait
manner. We exit the loop once we have received the expected number of messages.
It can be sensible to keep track of the number of exchanged messages to force an
exit from an otherwise endless loop.
• Passing pointers across nodes does not work, because there is no shared address
space across nodes. Always keep pointers local, since a pointer is meaningless in
another process's memory.
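When wall-clock timestamps serve as ballot numbers, two proposers can read the same second, and a clock can be stepped backwards. A common remedy, shown here as a hypothetical sketch rather than the book's implementation, is to pair the timestamp with the proposer's rank (for example, its MPI rank), compare the pairs lexicographically, and never issue a ballot whose round does not exceed the highest round already issued or seen.

```c
#include <assert.h>
#include <stdint.h>

typedef struct {
    int64_t round;   /* e.g. a Unix timestamp */
    int32_t rank;    /* proposer id, breaks ties between proposers */
} ballot_t;

/* Lexicographic comparison; returns <0, 0, or >0 like strcmp. */
int ballot_cmp(ballot_t a, ballot_t b) {
    if (a.round != b.round) return a.round < b.round ? -1 : 1;
    if (a.rank != b.rank) return a.rank < b.rank ? -1 : 1;
    return 0;
}

/* Next ballot for `rank`, where `last` is the highest ballot issued or seen.
 * Uses the clock when it has advanced, otherwise bumps the round by one. */
ballot_t next_ballot(ballot_t last, int64_t now, int32_t rank) {
    assert(rank >= 0);
    ballot_t b = { now, rank };
    if (b.round <= last.round)
        b.round = last.round + 1;   /* clock stalled or went backwards */
    return b;
}
```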
• two-phase-commit.c
The execution of the transactional functions happens at slightly different stages
for the coordinator and the participants. The coordinator acts as the trigger and
executes the ”commit” action locally based on the votes, while the participants
execute it upon the coordinator’s command.
Potential Issues:
– Error Handling After Decision: If the coordinator successfully decides to
commit and sends COMMIT messages, but one or more participants fail
to execute the commit, the coordinator isn’t aware of this failure in the
current implementation.
• sequence-paxos4.c
The C code implements the Sequence Paxos consensus algorithm using MPI to
coordinate actions between processes acting as Clients, Proposers, Acceptors,
and Learners. It’s designed to agree on a sequence of values in a distributed
system, maintaining order even with potential failures. The Client proposes a
sequence of values, and the Proposers, Acceptors, and Learners work through
the Paxos protocol for each value to achieve agreement. Learners inform the
Client of decided sequence positions, ensuring the Client proposes the next value
in the sequence only after the previous one is confirmed. This process continues
until all values are agreed upon, demonstrating distributed consensus on an
ordered sequence.
• lamport1-majority-voting8.c
The C code simulates a distributed key-value store where processes use Lamport
clocks and majority voting to handle concurrent updates. MPI is used for
communication, and each process maintains a local copy of the key-value store.
When a process wants to update a value, it sends messages to other processes
containing the key, the value, and its Lamport clock. Processes update their
local Lamport clocks based on received messages to track the order of events.
Each process gathers messages and uses the Lamport timestamps to determine
the most recent (agreed-upon) value based on a majority, effectively resolving
conflicts in a distributed setting.
Defensive Programming
One good practice is to wrap MPI function calls in an error-checking macro to
facilitate debugging and improve the resilience of OpenMPI programs.
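#include <mpi.h>
#include <stdio.h>

/* Wrap an MPI call: on failure, print the MPI error text with the call site
   and abort every process in MPI_COMM_WORLD. Error codes are only returned
   to the caller when MPI_ERRORS_RETURN is set on the communicator. */
#define SAFE_MPI_CALL(call) \
  do { \
    int ret = (call); \
    if (ret != MPI_SUCCESS) { \
      char error_string[MPI_MAX_ERROR_STRING]; \
      int length_of_error_string; \
      MPI_Error_string(ret, error_string, &length_of_error_string); \
      fprintf(stderr, "MPI Error (%s:%d): %s\n", __FILE__, __LINE__, error_string); \
      MPI_Abort(MPI_COMM_WORLD, ret); \
    } \
  } while (0)

/* Usage: SAFE_MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank)); */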
This code introduces a safety mechanism for MPI function calls through a macro
named SAFE_MPI_CALL. When you wrap an MPI function call with this macro, it
automatically checks whether the function executed successfully. If an error occurred,
the macro fetches the specific error message from the MPI library, reports it to
standard error along with the exact location in your code where the failure happened,
and then immediately stops the entire MPI program. This ensures that errors in MPI
operations are clearly identified and prevents the program from continuing in a
potentially unstable or incorrect state. Note that the default error handler on
MPI_COMM_WORLD is MPI_ERRORS_ARE_FATAL, which aborts before the failing call
returns; for the macro to receive error codes, first call
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN) after MPI_Init.
Using this macro promotes more robust and easier-to-debug MPI applications by
standardizing error checking. The macro adds a comparison and a branch to every call,
but this small overhead is outweighed by the benefit of consistent error handling.
We have saved the best for last. You may have noticed a recurring theme in this book:
we build abstractions. For example, our leader election uses a failure detector.
Decomposing systems into components allows one to focus on one part of a problem
at a time. This is often described as adding one more layer of indirection; it helps in
reasoning about systems, but avoid leaky abstractions at all costs. As the aphorism
attributed to David Wheeler puts it, "All problems in computer science can be solved
by another level of indirection."
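A heartbeat-based failure detector illustrates such an abstraction: leader election only needs to ask whether a peer is suspected, not how suspicion is computed. The sketch below is hypothetical (fd_t, fd_heartbeat, fd_suspects, and fd_leader are illustrative names). It suspects a peer once no heartbeat has arrived within a timeout plus an allowance for clock drift and jitter, and it elects the lowest-ranked unsuspected peer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_PEERS 64

typedef struct {
    int n_peers;
    int64_t last_heartbeat_ms[MAX_PEERS];  /* local receive time of last heartbeat */
    int64_t timeout_ms;                    /* expected heartbeat interval plus slack */
    int64_t drift_ms;                      /* allowance for clock drift and jitter */
} fd_t;

void fd_init(fd_t *fd, int n_peers, int64_t now_ms, int64_t timeout_ms, int64_t drift_ms) {
    assert(n_peers > 0 && n_peers <= MAX_PEERS);
    fd->n_peers = n_peers;
    fd->timeout_ms = timeout_ms;
    fd->drift_ms = drift_ms;
    for (int i = 0; i < n_peers; i++)
        fd->last_heartbeat_ms[i] = now_ms;   /* every peer starts with a full timeout */
}

void fd_heartbeat(fd_t *fd, int peer, int64_t now_ms) {
    assert(peer >= 0 && peer < fd->n_peers);
    fd->last_heartbeat_ms[peer] = now_ms;
}

/* May wrongly suspect a slow peer; a later heartbeat clears the suspicion. */
bool fd_suspects(const fd_t *fd, int peer, int64_t now_ms) {
    assert(peer >= 0 && peer < fd->n_peers);
    return now_ms - fd->last_heartbeat_ms[peer] > fd->timeout_ms + fd->drift_ms;
}

/* Leader election on top of the detector: lowest-ranked unsuspected peer, or -1. */
int fd_leader(const fd_t *fd, int64_t now_ms) {
    for (int i = 0; i < fd->n_peers; i++)
        if (!fd_suspects(fd, i, now_ms))
            return i;
    return -1;
}
```

Because fd_leader depends only on fd_suspects, the suspicion rule (fixed or adaptive timeouts, for instance) can change without touching the election logic.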
• MIN_THRESHOLD = 25: The lower CPU utilization threshold. If the average
CPU load falls below this, the script will potentially reduce the number of MPI
processes (though the current logic only increases).
• MAX_THRESHOLD = 75: The upper CPU utilization threshold. If the average
CPU load exceeds this, the script will increase the number of MPI processes.
• PULSE_TIME = 300: The interval (in seconds) at which the script checks the
CPU utilization and potentially scales the number of processes. This determines
how frequently the autoscaling logic is evaluated (currently set to 5 minutes).
Helper Functions
• get_process_output(process): Takes a [Link] object, waits for the process to
finish, and returns its standard output (stdout), standard error (stderr), and return
code.
• run_process(process): Takes a [Link] object and prints its PID. It includes
commented-out example code for potentially doing other work while the process runs
or checking its status. It also includes a try...except...finally block to get the
process output and ensure the process is killed in case of an error or on completion.
• It initializes variables: command, process, and pid to track the currently running
MPI process; num_of_processes to the initial number of MPI processes (set to 8);
and previous_cpu_avg to 0 to ensure the first iteration triggers an action.
• It enters an infinite while True loop, which represents the continuous monitoring
and scaling process.
• Scaling Logic:
• Throttling: After the scaling decision (or if no scaling was needed), the script
pauses for PULSE_TIME (300 seconds) before checking the CPU load again.
• Updating History: Finally, it updates previous_cpu_avg with the current cpu_avg
for the next iteration's change detection.
Remarks
However, when scaling down, we would need to explicitly terminate processes and
maintain a record of the active process count to ensure accurate resource management.
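The scaling decision itself is a small pure function of the CPU average and the two thresholds. The C sketch below is hypothetical: next_process_count, its lower and upper bounds, and the step of one process per period are illustrative assumptions. Unlike the script described above, it also scales down, which is exactly where the explicit termination and process-count bookkeeping from the remark become necessary.

```c
#include <assert.h>

#define MIN_THRESHOLD 25.0   /* percent CPU, matching the script's description */
#define MAX_THRESHOLD 75.0

/* Process count for the next period, clamped to [lo, hi]. */
int next_process_count(int current, double cpu_avg, int lo, int hi) {
    assert(lo > 0 && lo <= hi);
    int next = current;
    if (cpu_avg > MAX_THRESHOLD)
        next = current + 1;   /* overloaded: add a process */
    else if (cpu_avg < MIN_THRESHOLD)
        next = current - 1;   /* underused: remove one (needs explicit termination) */
    if (next < lo) next = lo;
    if (next > hi) next = hi;
    return next;
}
```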
Figure 8.1: Map Reduce.
• Use MPI_Scatter to send data to every process (or even MPI_Send). An advanced
user may send to a group of processes acting as intermediate nodes, where a
secondary reduction is performed on a subset of nodes.
• Perform the computation on a subset of the nodes and send the results to a
non-overlapping set of nodes that aggregates them.
• Regroup appropriately.
The author encourages the reader to implement map-reduce using the description
provided in this section.
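As a starting point for that exercise, the data flow of the steps above (partition the data, map and reduce locally, then combine the partial results) can be simulated in plain C without MPI. In the hypothetical sketch below, each iteration over p stands in for one process. In an MPI version, the chunks would typically be distributed with MPI_Scatter (or MPI_Scatterv when chunk sizes differ) and the partial results combined with MPI_Reduce using MPI_SUM.

```c
#include <assert.h>
#include <stddef.h>

/* Map step: applied to every element on the "process" that owns it. */
static long map_square(long x) { return x * x; }

/* Sum of squares computed as n_procs independent partial reductions,
 * followed by a final reduction of the partial results. */
long map_reduce_sum_squares(const long *data, size_t n, size_t n_procs) {
    assert(n_procs > 0);
    size_t base = n / n_procs, extra = n % n_procs;
    long total = 0;
    for (size_t p = 0; p < n_procs; p++) {
        /* Block partition: process p owns [begin, end); the first `extra`
         * processes receive one additional element each. */
        size_t begin = p * base + (p < extra ? p : extra);
        size_t end = begin + base + (p < extra ? 1 : 0);
        long partial = 0;                  /* local reduce on process p */
        for (size_t i = begin; i < end; i++)
            partial += map_square(data[i]);
        total += partial;                  /* global reduce of partial results */
    }
    return total;
}
```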
A shared variable can be protected with a lock around the critical section, or it can
be updated in a non-blocking manner with compare-and-swap. MPI-3 provides an
atomic MPI_Compare_and_swap operation for one-sided (RMA) communication; using
it would let our distributed shared primitive avoid blocking, which is more in line with
the non-blocking approach of our core philosophy. If you adjust the resilience of your
algorithm to enhance fault tolerance within the limits provided by the quorum,
remember to cancel every outstanding request to prevent livelocks. All of our
approaches to building a distributed shared primitive are influenced by the work. One
of our variations of a shared variable across processes uses one-sided communication
to pass messages between processes and always selects the maximum. This works as
a form of quasi-atomics, and it only works because our application relies on the
monotonically increasing property as an invariant. See an example of commented use
in single-paxos3-snapshot.c.
{
val = MAX(val, vals[i]);
}
free(vals);
return val;
}
Another variation simply uses the latest written value as the shared value across every
process. This logic makes it easier to implement compare-and-swap in the future if we
decide to go fully non-blocking. See an example of use in single-paxos3-snapshot.c.
Distributed Hashmap
We have used the Lamport clock to build a distributed hashmap in which every node has
a local map of key-value pairs and, based on quorum agreement, every node ends up
with the same value for each key.
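One way to read a key from such a hashmap is to collect replies from the replicas and answer only when a strict majority has replied, choosing the reply with the highest Lamport timestamp (the writer's process id breaks ties). This is a hypothetical sketch of that rule (reply_t and quorum_read are illustrative names), not the code of lamport1-majority-voting8.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct {
    int value;
    uint64_t lamport;   /* timestamp of the write this replica holds */
    int pid;            /* writer id, breaks timestamp ties deterministically */
} reply_t;

/* Returns true and sets *out only if a strict majority of n_replicas replied. */
bool quorum_read(const reply_t *replies, size_t n_replies, size_t n_replicas, int *out) {
    assert(n_replies <= n_replicas);
    if (n_replies == 0 || 2 * n_replies <= n_replicas)
        return false;                        /* no majority: refuse to answer */
    const reply_t *best = &replies[0];
    for (size_t i = 1; i < n_replies; i++) {
        const reply_t *r = &replies[i];
        if (r->lamport > best->lamport ||
            (r->lamport == best->lamport && r->pid > best->pid))
            best = r;
    }
    *out = best->value;
    return true;
}
```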
• Set an appropriate tolerance for clock drift in your failure detectors.
• Determine how many processes must participate in a quorum, given the limits of
the synchronous, asynchronous, and partially synchronous models (for example,
tolerating f crash failures with majority quorums requires at least 2f + 1 processes).
• Identify invariants as relations and use metamorphic testing to test with higher
coverage.
• Add assertions for invariant violations in the source code, and guard them with a
flag that can be passed as a command-line argument to toggle the checks when needed.
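The last tip can be implemented with a macro whose checks are switched on at run time. In the hypothetical sketch below, the flag name --check-invariants, the global g_check_invariants, the CHECK_INVARIANT macro, and the promise function are illustrative choices. When checks are enabled, a violated condition is reported with its source location and the process aborts; in an MPI program, MPI_Abort would be the natural replacement for abort.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static int g_check_invariants = 0;   /* off unless requested on the command line */

#define CHECK_INVARIANT(cond)                                          \
    do {                                                               \
        if (g_check_invariants && !(cond)) {                           \
            fprintf(stderr, "Invariant violated (%s:%d): %s\n",        \
                    __FILE__, __LINE__, #cond);                        \
            abort();                                                   \
        }                                                              \
    } while (0)

/* Enable checks if the hypothetical flag appears among the arguments. */
void parse_check_flag(int argc, char **argv) {
    for (int i = 1; i < argc; i++)
        if (strcmp(argv[i], "--check-invariants") == 0)
            g_check_invariants = 1;
}

/* Example invariant: an acceptor's promised ballot never decreases. */
long promise(long promised, long incoming) {
    long next = incoming > promised ? incoming : promised;
    CHECK_INVARIANT(next >= promised);
    return next;
}
```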
• Time complexity
– communication steps
– All communication steps take one unit. The time between send(m) and
deliver(m) is at most one unit.
3. Write tests for the distributed algorithms discussed in the book; implementing
these tests is left to the reader as an exercise.
5. Set up a testbed that simulates a LAN using Vagrant VMs for running an MPI
cluster.
7. Improve the [Link] using our OpenMPI framework. See our implementation in
Subsection 8.4.1, then roll out your own and enjoy the challenge. Hint: use
incremental spawning of processes, rather than shutting every process down and
restarting with the expected number of nodes.
8. This project involves training a fundamental linear regression model. The opti-
mization method to be used is Stochastic Gradient Descent (SGD), a common
algorithm detailed at Stochastic Gradient Descent. The overarching goal is to
create the initial building blocks for a distributed learning system, specifically
focusing on Federated Learning. Federated Learning is a technique that allows
multiple devices or organizations to train a machine learning model collabora-
tively without sharing their data.
To simulate this distributed environment, we will use the OpenMPI framework.
Within this framework, each running process is an independent ’node’ or par-
ticipant in the federated learning system.
For the data used in training, there are two possible approaches:
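Whichever way the training data is partitioned, the core loop can be sketched before introducing MPI. In the hypothetical C sketch below, each simulated node runs a few epochs of SGD for the model y = w x + b on its own samples, and the new global model is the average of the local models weighted by sample count, in the style of federated averaging (FedAvg). The learning rate, epoch count, and data layout are illustrative assumptions; in an MPI version, the averaging step would map naturally onto MPI_Allreduce.

```c
#include <assert.h>
#include <stddef.h>

typedef struct { double w, b; } model_t;

/* Plain SGD on squared error, one sample at a time, on one node's data. */
model_t local_sgd(model_t m, const double *x, const double *y, size_t n,
                  double lr, int epochs) {
    for (int e = 0; e < epochs; e++)
        for (size_t i = 0; i < n; i++) {
            double err = (m.w * x[i] + m.b) - y[i];   /* prediction error */
            m.w -= lr * err * x[i];                   /* gradient of err^2/2 w.r.t. w */
            m.b -= lr * err;                          /* gradient of err^2/2 w.r.t. b */
        }
    return m;
}

/* One federated round: every node starts from the same global model, trains
 * locally, and the results are averaged weighted by each node's sample count.
 * Raw data never leaves its node; only model parameters are combined. */
model_t federated_round(model_t global, const double *const *xs, const double *const *ys,
                        const size_t *counts, size_t n_nodes, double lr, int epochs) {
    model_t sum = { 0.0, 0.0 };
    size_t total = 0;
    for (size_t k = 0; k < n_nodes; k++) {
        model_t local = local_sgd(global, xs[k], ys[k], counts[k], lr, epochs);
        sum.w += local.w * (double)counts[k];
        sum.b += local.b * (double)counts[k];
        total += counts[k];
    }
    assert(total > 0);
    model_t avg = { sum.w / (double)total, sum.b / (double)total };
    return avg;
}
```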
References
[9] Parallel, Concurrent, and Distributed Programming in Java Specialization.
[Link] Date accessed: Septem-
ber 5, 2022.
[10] Parallel Programming for Science and Engineering Using MPI, OpenMP, and the
PETSc library. [Link]
[Link]. Date accessed: September 5, 2022.
[13] Hagit Attiya and Jennifer Welch. Distributed Computing: Fundamentals, Simulations
and Advanced Topics. Wiley & Sons Inc., 2004.
[14] Shobana Balakrishnan, Richard Black, Austin Donnelly, Paul England, Adam
Glass, Dave Harper, Sergey Legtchenko, Aaron Ogus, Eric Peterson, and Antony
Rowstron. Pelican: A Building Block for Exascale Cold Data Storage. In Pro-
ceedings of the 11th USENIX Symposium on Operating Systems Design and Im-
plementation, pages 351–365, 2014.
[15] Doug Beaver, Sanjeev Kumar, Harry C. Li, Jason Sobel, and Peter Vajgel. Finding
a Needle in Haystack: Facebook's Photo Storage. In Proceedings of the 9th USENIX
Symposium on Operating Systems Design and Implementation, 2010.
[16] Matthew Caesar, Miguel Castro, Edmund B. Nightingale, Greg O'Shea, and
Antony Rowstron. Virtual ring routing: Network routing inspired by DHTs. SIGCOMM
Computer Communication Review, 36(4):351–362, 2006.
[17] Miguel Castro, Manuel Costa, and Antony Rowstron. Debunking Some Myths
about Structured and Unstructured Overlays. In Proceedings of the 2nd Conference
on Symposium on Networked Systems Design & Implementation, pages 85–98, 2005.
[18] Miguel Castro and Barbara Liskov. Practical byzantine fault tolerance and proac-
tive recovery. ACM Transactions on Computer Systems, 20(4):398–461, 2002.
[19] Ian Clarke, Oskar Sandberg, Brandon Wiley, and Theodore W. Hong. Freenet: A
Distributed Anonymous Information Storage and Retrieval System. In Designing
Privacy Enhancing Technologies, number 2009 in Lecture Notes in Computer
Science, pages 46–66, 2001.
[20] George Coulouris, Jean Dollimore, Tim Kindberg, and Gordon Blair. Distributed
Systems: Concepts and Design. Addison-Wesley, 2012.
[21] Frank Dabek, M. Frans Kaashoek, David R. Karger, Robert Tappan Morris, and
Ion Stoica. Wide-Area Cooperative Storage with CFS. In Proceedings of the
Symposium on Operating Systems Principles, SOSP, volume 35, pages 202–215,
2001. Operating System Review.
[22] David G. Andersen, Hari Balakrishnan, M. Frans Kaashoek, and Robert Tappan
Morris. The Case for Resilient Overlay Networks. In Proceedings of HotOS-VIII: 8th
Workshop on Hot Topics in Operating Systems, pages 152–157. IEEE Computer
Society, 2001.
[26] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file sys-
tem. In Proceedings of 19th ACM symposium on Operating systems principles,
pages 29–43, 2003.
[27] Brian Hall. Beej’s Guide to Network Programming Using Internet Sockets. 2020.
[29] Márk Jelasity, Spyros Voulgaris, Rachid Guerraoui, Anne-Marie Kermarrec, and
Maarten van Steen. Gossip-based peer sampling. ACM Transactions on Com-
puter Systems, 25(3):8–45, 2007.
[30] Anne-Marie Kermarrec and Maarten van Steen. Gossiping in distributed systems.
ACM SIGOPS Operating Systems Review, 41(5):2–7, October 2007.
[31] John Kubiatowicz, David Bindel, Yan Chen, Steven E. Czerwinski, Patrick R.
Eaton, Dennis Geels, Ramakrishna Gummadi, Sean C. Rhea, Hakim Weatherspoon,
Westley Weimer, Chris Wells, and Ben Y. Zhao. OceanStore: An Architecture for
Global-Scale Persistent Storage. In Proceedings of the International Conference on
Architectural Support for Programming Languages and Operating Systems, pages
190–201, 2000.
[32] João Leitão, José Pereira, and Luís Rodrigues. Gossip-Based Broadcast, pages
831–860. Springer, Boston, MA, 2010.
[33] Nancy A. Lynch and Boaz Patt-Shamir. Distributed Algorithms: Lecture Notes for
6.852. Course notes for the graduate course in Distributed Algorithms, 1993.
[35] Subramanian Muralidhar, Wyatt Lloyd, Sabyasachi Roy, Cory Hill, Ernest Lin,
Weiwen Liu, Satadru Pan, Shiva Shankar, Viswanath Sivakumar, Linpeng Tang, and
Sanjeev Kumar. F4: Facebook's Warm BLOB Storage System. In Proceedings of the
11th USENIX Conference on Operating Systems Design and Implementation, pages
383–398, 2014.
[36] Satadru Pan, Theano Stavrinos, Yunqiao Zhang, Atul Sikaria, Pavel Zakharov,
Abhinav Sharma, Shiva P. Shankar, Mike Shuey, Richard Wareing, Monika Gan-
gapuram, Guanglei Cao, Christian Preseau, Pratap Singh, Kestutis Patieju-
nas, J. R. Tipton, Ethan Katz-Bassett, and Wyatt Lloyd. Facebook’s Tectonic
Filesystem: Efficiency from Exascale. In Proceedings of the 19th USENIX Con-
ference on File and Storage Technologies, pages 217–231, 2021.
[37] Alex Petrov. Database Internals: A Deep Dive into How Distributed Data Systems
Work. O'Reilly Media, Inc., 2019.
[38] Mendel Rosenblum and John K. Ousterhout. The Design and Implementation
of a Log-Structured File System. ACM Transactions on Computer Systems,
10(1):26–52, 1992.
[39] Antony Rowstron and Peter Druschel. Pastry: Scalable, Decentralized Object
Location, and Routing for Large-Scale Peer-to-Peer Systems. pages 329–350,
Berlin, Heidelberg, 2001. Springer.
[40] Konstantin Shvachko, Hairong Kuang, Sanjay Radia, and Robert Chansler. The
Hadoop Distributed File System. In Proceedings of the 26th IEEE Symposium on
Mass Storage Systems and Technologies (MSST), pages 1–10, 2010.
[41] Chrysoula Stathakopoulou, Tudor David, Matej Pavlovic, and Marko Vukolic.
Solution: Mir-BFT: Scalable and robust BFT for decentralized networks. Journal
of Systems Research (JSys), 2(1):1–34, 2022.
[42] Ion Stoica, Robert Morris, David Liben-Nowell, David R. Karger, M. Frans
Kaashoek, Frank Dabek, and Hari Balakrishnan. Chord: A Scalable Peer-to-
Peer Lookup Protocol for Internet Applications. IEEE/ACM Transactions on
Networking, 11(1):17–32, 2003.
[44] Ian J. Taylor and Andrew B. Harrison. Gnutella, pages 181–196. Springer,
London, 2009.
Programs. In Proceedings of the 14th ACM SIGPLAN Symposium on Principles
and Practice of Parallel Programming, pages 261–270, 2009.
[47] Sage A. Weil, Scott A. Brandt, Ethan L. Miller, and Carlos Maltzahn. CRUSH:
Controlled, Scalable, Decentralized Placement of Replicated Data. In Proceedings
of the ACM/IEEE Conference on Supercomputing, 2006.
[48] Sage A. Weil, Andrew W. Leung, Scott A. Brandt, and Carlos Maltzahn. RA-
DOS: A Scalable, Reliable Storage Service for Petabyte-Scale Storage Clusters.
In Proceedings of the 2nd International Workshop on Petascale Data Storage:
Held in Conjunction with Supercomputing ’07, pages 35–44, 2007.
[49] B.Y. Zhao, Ling Huang, J. Stribling, S.C. Rhea, A.D. Joseph, and J.D. Kubia-
towicz. Tapestry: a resilient global-scale overlay for service deployment. IEEE
Journal on Selected Areas in Communications, 22(1):41–53, 2004.
Formal verification provides a methodical approach for detecting potential errors in a distributed system's design by mathematically proving whether the system adheres to its specifications. This increases confidence in the absence of critical bugs by ensuring that the system's implementation matches its formal specification, thus preventing design-level discrepancies. However, formal verification can be resource-intensive, requiring substantial effort to create precise specifications and perform rigorous proofs. This makes it an expensive and time-consuming endeavor, which can limit its practicality in some scenarios.
Defensive programming enhances MPI application robustness by implementing error-handling mechanisms that preemptively address failures. Wrapping MPI function calls in macros helps identify and address errors during execution. If a function fails, the error is reported with a specific message and its location, which aids debugging by providing direct feedback. This approach prevents the program from continuing in an unstable state and ensures that errors are managed consistently, ultimately leading to more reliable and easier-to-debug applications.
Metamorphic testing enhances quality assurance by using known relationships between inputs and outputs to evaluate the correctness of a distributed system. By checking for violations of these relationships, testers can identify inconsistencies and potential bugs that traditional testing methods might miss, even when the exact expected output is unknown. This strategy improves test coverage and helps ensure the system behaves correctly even under failure conditions, making it an effective tool for identifying subtle bugs in complex distributed environments.
In distributed systems, strong consistency guarantees that every read reflects the most recent write, maintaining a linearizable view of data. This approach prioritizes data accuracy and eliminates the risk of serving stale data, but it can compromise availability: during network partitions or node failures, a system requiring strong consistency may become unavailable if a quorum of nodes is unreachable. Conversely, eventual consistency favors high availability and partition tolerance, allowing the system to remain operational during failures. However, it may temporarily serve outdated data until the replicas converge to a consistent state, trading immediate consistency for availability.
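The quorum condition behind this trade-off can be checked with simple arithmetic. With N replicas, a write quorum of size W, and a read quorum of size R, every read quorum contains at least one replica that acknowledged the latest completed write whenever R + W > N, and two write quorums always overlap when 2W > N. The hypothetical helpers below encode these conditions and the number of unreachable replicas each quorum size tolerates.

```c
#include <assert.h>
#include <stdbool.h>

/* Every read quorum intersects every write quorum. */
bool reads_see_latest_write(int n, int r, int w) {
    assert(n > 0 && r > 0 && w > 0 && r <= n && w <= n);
    return r + w > n;
}

/* Any two write quorums intersect, so conflicting writes meet at some replica. */
bool writes_intersect(int n, int w) {
    assert(n > 0 && w > 0 && w <= n);
    return 2 * w > n;
}

/* Replicas that may be unreachable while a quorum of size q is still available. */
int tolerated_failures(int n, int q) {
    assert(q > 0 && q <= n);
    return n - q;
}
```

Lowering R or W keeps operations available during more failures, but once R + W ≤ N a read may miss the latest write, which is the eventual-consistency side of the trade-off.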
To prevent deadlocks in distributed systems, it is crucial to ensure proper sequencing of send and receive operations, especially in blocking mode, and to manage the ownership of request objects between sending and receiving nodes. Timeouts can also prevent processes from waiting indefinitely. Livelocks can be managed by avoiding unbounded busy-wait loops and by ensuring that the system exits a loop once the expected number of messages has arrived, or after a bounded number of attempts. These strategies require careful design of communication protocols to avoid cyclic waiting and endless state changes without progress.
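The exit condition described above can be made explicit by bounding the polling loop. The sketch below is hypothetical and MPI-free: the try_receive callback stands in for a non-blocking check such as MPI_Test or MPI_Iprobe, and every_third_poll is a simulated channel for demonstration. The loop returns once the expected number of messages has arrived or the poll budget is exhausted, so the caller can cancel outstanding requests instead of spinning forever.

```c
#include <assert.h>
#include <stdbool.h>

/* Returns true when one more message has been received. */
typedef bool (*poll_fn)(void *ctx);

/* Poll until `expected` messages arrive or `max_polls` attempts are used.
 * Returns the number received; fewer than `expected` means the budget ran out. */
int poll_until(poll_fn try_receive, void *ctx, int expected, long max_polls) {
    assert(expected >= 0 && max_polls >= 0);
    int received = 0;
    for (long attempt = 0; attempt < max_polls && received < expected; attempt++)
        if (try_receive(ctx))
            received++;
    return received;
}

/* Simulated channel for demonstration: delivers a message on every third poll. */
bool every_third_poll(void *ctx) {
    int *calls = ctx;
    return ++*calls % 3 == 0;
}
```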
Message complexity and time complexity are crucial metrics for evaluating the efficiency of distributed algorithms. Message complexity is the number of messages required to complete an operation, which determines network and processing load. Time complexity measures how long an operation takes, often counted in communication steps under the assumption that each message is delivered within one time unit, and reflects the system's speed and responsiveness. Evaluating these metrics helps identify bottlenecks and shows how message exchanges and processing times affect scalability and load handling in distributed environments.
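For example, under the unit-delay assumption (every message is delivered within one time unit), two common exchange patterns among n processes compare as follows, where M is the message complexity and T the time complexity:

```latex
\begin{aligned}
\text{Leader-based round trip (leader sends to all, all reply):}\quad & M = 2(n-1), \quad T = 2 \\
\text{All-to-all exchange (every process sends to every other):}\quad & M = n(n-1), \quad T = 1
\end{aligned}
```

For n = 5, this is 8 messages versus 20 messages, which is why leader-based protocols scale better in message complexity at the cost of an extra communication step.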
Experimenting with distributed algorithms requires telemetry that accurately captures communication and processing metrics. The telemetry should aggregate data efficiently across processes while keeping its own overhead small, so that measuring the system does not distort its performance. Scalable telemetry should capture data in real time, provide insight into algorithm efficiency, and support post-experiment analysis for optimizing distributed systems. Shared distributed primitives can help coordinate telemetry across nodes, so that the experimental environment accurately reflects operational scenarios.
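One low-overhead design, sketched under assumptions: each node records counters and phase timings in local memory (no cross-node coordination on the hot path) and ships a snapshot to an aggregator only at the end of the run. The `NodeTelemetry` class, its metric names, and `aggregate` are hypothetical, and the "nodes" are simulated in one process.

```python
import time
from collections import Counter
from contextlib import contextmanager

class NodeTelemetry:
    """Per-node recorder: updates are cheap local operations; data leaves the
    node only when snapshot() is called."""
    def __init__(self, node_id):
        self.node_id = node_id
        self.counters = Counter()
        self.timings = {}                   # phase name -> total seconds

    def on_send(self, nbytes):
        self.counters["msgs_sent"] += 1
        self.counters["bytes_sent"] += nbytes

    def on_receive(self, nbytes):
        self.counters["msgs_recv"] += 1
        self.counters["bytes_recv"] += nbytes

    @contextmanager
    def timed(self, phase):
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed = time.perf_counter() - start
            self.timings[phase] = self.timings.get(phase, 0.0) + elapsed

    def snapshot(self):
        return {"node": self.node_id, "counters": dict(self.counters),
                "timings": dict(self.timings)}

def aggregate(snapshots):
    """Merge per-node snapshots into cluster-wide totals."""
    total, per_node = Counter(), {}
    for s in snapshots:
        total.update(s["counters"])         # Counter.update adds counts
        per_node[s["node"]] = s["counters"]
    return {"total": dict(total), "per_node": per_node}

nodes = [NodeTelemetry(i) for i in range(3)]
for sender in nodes:                        # simulated all-to-all exchange
    with sender.timed("broadcast"):
        for receiver in nodes:
            if receiver is not sender:
                sender.on_send(10)
                receiver.on_receive(10)
report = aggregate(n.snapshot() for n in nodes)
print(report["total"]["msgs_sent"] == report["total"]["msgs_recv"])  # True: nothing lost
```

Checking invariants such as "messages sent equals messages received" in the aggregated report is a cheap way to detect lost messages or instrumentation gaps after an experiment.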
Structured P2P systems make query matching efficient by imposing a well-defined structure on the network topology. This structure is often a Distributed Hash Table (DHT): each node is assigned a unique identifier in the same key space as the data, and each node is responsible for a defined range of keys, so every data item can be indexed and located precisely. A query for a key is routed through the network toward the node responsible for that key, rather than to nodes that merely might hold the data. Unstructured systems, by contrast, broadcast queries more indiscriminately, which can lead to inefficiencies.
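The key-to-node mapping behind a DHT can be sketched with consistent hashing: node names and data keys are hashed into one identifier space, and a key belongs to its successor node on the ring. For brevity this sketch assumes every peer knows the full membership; DHTs such as Chord instead reach the owner in O(log n) hops using per-node routing tables. The `ident` and `Ring` names are illustrative.

```python
import bisect
import hashlib

def ident(name):
    """Map a node name or data key into one identifier space (SHA-1, 160 bits)."""
    return int.from_bytes(hashlib.sha1(name.encode()).digest(), "big")

class Ring:
    """Consistent-hashing ring with a full view of membership (simplification)."""
    def __init__(self, nodes):
        self.owner = {ident(n): n for n in nodes}
        self.ids = sorted(self.owner)

    def lookup(self, key):
        """Return the node responsible for `key`: its successor on the ring."""
        i = bisect.bisect_left(self.ids, ident(key))
        return self.owner[self.ids[i % len(self.ids)]]  # wrap around past the top

nodes = [f"node-{i}" for i in range(5)]
ring = Ring(nodes)
smaller = Ring([n for n in nodes if n != "node-2"])
keys = [f"file-{i}" for i in range(100)]
# Removing a node only reassigns the keys that node was responsible for.
moved = [k for k in keys if ring.lookup(k) != smaller.lookup(k)]
print(all(ring.lookup(k) == "node-2" for k in moved))  # True
```

Because the mapping is deterministic, every lookup for the same key ends at the same node, and membership changes move only a small fraction of the keys.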
Erasure coding in distributed storage systems is more storage-efficient than full replication. Full replication stores several complete copies of the data, whereas erasure coding splits the data into fragments and adds a smaller number of redundant (parity) fragments, which significantly reduces storage overhead. It still provides robust fault tolerance: the system can reconstruct lost data even when several drives or nodes fail concurrently, up to a limit set by the number of parity fragments. This efficiency makes erasure coding particularly valuable in large-scale systems.
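The simplest erasure code, a single XOR parity fragment (the scheme behind RAID-5), illustrates the storage saving. Note the assumption: this code tolerates only one lost fragment; surviving several concurrent failures requires codes such as Reed-Solomon. The `encode`, `reconstruct` and `decode` helpers are a sketch, not a production codec.

```python
def xor_bytes(a, b):
    return bytes(x ^ y for x, y in zip(a, b))

def encode(data, k):
    """Split data into k equal chunks (zero-padded) plus one XOR parity chunk."""
    size = -(-len(data) // k)               # ceiling division
    padded = data.ljust(size * k, b"\0")
    chunks = [padded[i * size:(i + 1) * size] for i in range(k)]
    parity = bytes(size)
    for c in chunks:
        parity = xor_bytes(parity, c)
    return chunks + [parity]

def reconstruct(shards):
    """Rebuild a single missing shard (None) by XOR-ing all the survivors."""
    missing = [i for i, s in enumerate(shards) if s is None]
    if len(missing) > 1:
        raise ValueError("single parity tolerates only one lost shard")
    if not missing:
        return shards
    size = len(next(s for s in shards if s is not None))
    rebuilt = bytes(size)
    for s in shards:
        if s is not None:
            rebuilt = xor_bytes(rebuilt, s)
    return shards[:missing[0]] + [rebuilt] + shards[missing[0] + 1:]

def decode(shards, k, length):
    return b"".join(reconstruct(shards)[:k])[:length]

data = b"distributed storage!"                         # 20 bytes
shards = encode(data, k=4)                             # 4 data + 1 parity, 5 bytes each
shards[2] = None                                       # one drive or node is lost
print(decode(shards, k=4, length=len(data)) == data)   # True
print(len(shards) / 4)                                 # 1.25
```

Here the overhead is 1.25x the original size, against 2x for two full copies, which also tolerate one loss; the same ratio argument drives the choice of wider codes in large systems.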
The two-phase commit protocol keeps a transaction consistent by coordinating its commit across participating nodes. In the first (voting) phase, the coordinator asks every participant whether it can commit; in the second (decision) phase, the coordinator commits the transaction if all participants voted yes and aborts it otherwise. The protocol has shortcomings, however. In the basic form described here, participants send no confirmation after committing, so a participant may fail to apply the commit without the coordinator knowing. This lack of feedback can leave the system inconsistent if an error occurs after the decision.
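An in-process sketch of the two phases, assuming synchronous calls in place of network messages. The `crash_before_commit` flag is a hypothetical fault-injection knob that simulates a participant failing after voting yes. Without acknowledgments the coordinator reports success while that participant is still only prepared; with `wait_for_acks=True` the missing confirmation becomes visible.

```python
class Participant:
    def __init__(self, name, vote_yes=True, crash_before_commit=False):
        self.name = name
        self.vote_yes = vote_yes
        self.crash_before_commit = crash_before_commit  # simulated fault
        self.state = "init"

    def prepare(self):
        """Phase 1: vote yes (and become prepared) or vote no."""
        self.state = "prepared" if self.vote_yes else "aborted"
        return self.vote_yes

    def commit(self):
        """Phase 2: apply the decision; returns True as an acknowledgment."""
        if self.crash_before_commit:
            return False                    # crashed: decision never applied
        self.state = "committed"
        return True

    def abort(self):
        self.state = "aborted"

def two_phase_commit(participants, wait_for_acks=False):
    # Phase 1 (voting): collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    if not all(votes):
        for p in participants:
            p.abort()
        return "aborted", []
    # Phase 2 (decision): everyone voted yes, so tell everyone to commit.
    unconfirmed = []
    for p in participants:
        ok = p.commit()
        if wait_for_acks and not ok:        # only noticed if acks are awaited
            unconfirmed.append(p.name)
    return "committed", unconfirmed

ps = [Participant("A"), Participant("B", crash_before_commit=True)]
print(two_phase_commit(ps), ps[1].state)    # ('committed', []) prepared
```

The last line shows the inconsistency: the coordinator believes the transaction is committed, yet participant B never applied it.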