\documentclass[12pt,titlepage]{article}
\usepackage[margin=1in]{geometry}
\usepackage{parskip}
\let\stdsection\section
\renewcommand\section{\clearpage\stdsection}
\usepackage{hyperref}
\hypersetup{
linktoc=all
}
\begin{document}
\begin{titlepage}
\vspace*{\fill}
\centering
\textbf{\Huge ECE 454 Course Notes} \\ [0.4em]
\textbf{\Large Distributed Computing} \\ [1em]
\textbf{\Large Parsa Torabian} \\ [1em]
\Large Forked from Michael Socha \\
\textbf{\large University of Waterloo} \\
\textbf{\large Spring 2019} \\
\vspace*{\fill}
\end{titlepage}
\newpage
\pagenumbering{roman}
\tableofcontents
\newpage
\pagenumbering{arabic}
\section{Course Overview}
This course is intended to provide an introduction to distributed systems. \\ \\ Topics covered include:
\begin{itemize}
\item Some of the general architectures, protocols, and algorithms underlying modern distributed systems
\item Techniques for making distributed systems scalable and dependable, as well as the relevant trade-offs
\item Remote procedure call (RPC) frameworks for implementing distributed services
\item Scalable data processing frameworks for solving analytics problems
\item Fault-tolerant coordination services for configuration management, synchronization, and failure-detection
in distributed software systems
\item Stream-processing engines for real-time analytics
\item Relative merits of distributed versus centralized systems \\
\end{itemize}
Open-source software covered:
\begin{itemize}
\item Memcached (client-server paradigm)
\item Apache Thrift (Remote Procedure Calls)
\item Apache Hadoop (Big Data Processing)
\item Apache Spark (Big Data Processing)
\item Apache Zookeeper (Distributed Coordination)
\item Apache Kafka (Data Streaming)
\end{itemize}
\section{Introduction}
\subsection{What is a Distributed System?}
A distributed system is a collection of autonomous computing elements that appear to its users to be a single coherent system.
\subsection{Rationale Behind Distributed Systems}
Distributed systems are often used for the following reasons:
\begin{itemize}
\item Resource sharing
\item Integrating multiple systems
\item Centralized systems may not be as effective as many smaller systems working together
\item Users themselves may be mobile and distributed physically from one another
\end{itemize}
\subsection{Middleware}
Middleware is a layer of software that separates applications from their underlying platform. Middleware is often used by distributed
systems to support heterogeneous computers and networks while offering a single-system view. Common middleware services include:
\begin{itemize}
\item Communication (e.g. add job to remote queue)
\item Transactions (e.g. access multiple independent services atomically)
\item Service composition (e.g. Independent map system enhanced with independent weather forecast system)
\item Reliability (e.g. replicated state machine)
\end{itemize}
\subsection{Goals of Distributed Systems}
\subsubsection{Supporting Resource Sharing}
Resources can include:
\begin{itemize}
\item Peripheral devices (e.g. printers, video cameras)
\item Storage facilities (e.g. file servers)
\item Enterprise data (e.g. payroll)
\item Web page (e.g. Web search)
\item CPUs (e.g. supercomputers)
\end{itemize}
\subsubsection{Making Distribution Transparent}
Distribution transparency attempts to hide that processes and resources are physically distributed. Types of transparency include:
\begin{itemize}
\item \textbf{Access:} Hide differences in data representation and how data is accessed
\item \textbf{Location:} Hide where resource is located
\item \textbf{Migration:} Hide that a resource may move to another location
\item \textbf{Relocation:} Hide that a resource may move to another location while in use
\item \textbf{Replication:} Hide that a resource is replicated
\item \textbf{Concurrency:} Hide that a resource may be shared by users competing for that resource
\item \textbf{Failure:} Hide the failure and recovery of a resource
\end{itemize}
\subsubsection{Being Open}
An open distributed system offers components that can be easily used by or integrated into other systems. Key properties of openness
include:
\begin{itemize}
\item Interoperability
\item Composability
\item Extensibility
\item Separation of policy from mechanism
\end{itemize}
\subsubsection{Being Scalable}
Scalability covers a system's ability to expand along three axes:
\begin{itemize}
\item Size (e.g. adding users and resources)
\item Geography (e.g. users across large distances)
\item Administration (e.g. multiple independent admins)
\end{itemize}
Centralized systems tend to have limited scalability.
A few common scaling techniques include:
\begin{itemize}
\item Hiding communication latencies (i.e. trying to avoid waiting for responses from remote-server machines)
\item Partitioning (i.e. taking a component, splitting it into smaller parts, and spreading those parts across the system)
\item Replication (i.e. adding multiple copies of a component to increase availability, balance load, support caching, etc.)
\end{itemize}
\subsection{Common Fallacies of Distributed Computing}
The following are common beginner assumptions that can lead to major trouble:
\begin{itemize}
\item The network is reliable
\item The network is secure
\item The network is homogeneous
\item The network topology is static
\item Latency is 0
\item Bandwidth is unlimited
\item Transport cost is 0
\item There is one administrator
\end{itemize}
\subsection{Types of Distributed Systems}
\begin{itemize}
\item \textbf{Web sites and Web services}
\item \textbf{High-Performance Computing (HPC):} Computing in distributed-memory settings, where message passing is used instead of shared memory. Unlike cluster computing, HPC workloads are typically tightly coupled parallel computations.
\item \textbf{Cluster Computing:} Distributing CPU- or I/O-intensive tasks over multiple servers (e.g. Hadoop).
\item \textbf{Cloud and Grid Computing:} Grid computing focuses on combining resources from different institutions, while cloud computing provides access to shared pools of configurable system resources (e.g. IaaS, PaaS, SaaS).
\item \textbf{Transaction Processing:} Distributed transactions coordinated by a transaction processing (TP) monitor.
\item \textbf{Enterprise Application Integration (EAI):} Middleware is often used as a communication facilitator in such systems (e.g. client applications communicating with enterprise applications through EAI middleware).
\item \textbf{Sensor Networks:} Each sensor in a network can process and store data, which can be queried by some operator. Sensors often rely on in-network data
processing to reduce communication costs (e.g. irrigation).
\end{itemize}
\section{Architectures}
\subsection{Definitions}
\begin{itemize}
\item \textbf{Component:} A modular unit with a well-defined interface
\item \textbf{Connector:} Mechanism that mediates communication, coordination, or cooperation among components
\item \textbf{Software architecture:} Organization of software components
\item \textbf{System architecture:} Instantiation of software architecture in which software components are placed on real machines
\item \textbf{Autonomic System:} System that adapts to its environment by monitoring its own behavior and reacting accordingly
\end{itemize}
\subsection{Architectural Styles}
\subsubsection{Layered}
Control flows from layer-to-layer; requests flow down the hierarchy and responses flow up, with each layer only interacting with its two neighboring layers.
Layered architectures are often used to support client-server interactions, where a client component requests a service from a server component and waits for
a response. Many enterprise systems are organized into three layers, namely a user interface, application layer, and database. Note that middle layers may
act as a client to the layer below and a server to the layer above.
Vertical distribution describes the logical layers of a system being organized as separate physical tiers. Horizontal distribution describes a single logical
layer being split across multiple machines. Horizontal distribution is the main tool for scaling a layer out and balancing its load, while vertical distribution
mainly separates concerns across tiers; both introduce extra network hops that affect latency and add components that can fail independently.
\subsubsection{Object-Based Architecture}
Components are much more loosely organized than in a layered architecture, with components being able to interact freely with one another and no strict concept
of layer.
\subsubsection{Data-Centered Architecture}
Components communicate by using a shared data repository, such as a database or file system.
\subsubsection{Event-Based Architecture}
Components communicate by sharing events. Publish/subscribe systems can be used for sharing news, balancing workloads, asynchronous workflows, etc.\\
In practice, many systems are hybrids of the architectures listed above.
\subsubsection{P2P Systems}
Peer-to-peer systems rely on horizontal distribution and are designed to deal with churn (machines joining and leaving). They organize processes in an overlay network that defines a set of communication channels. For example, Chord is a P2P distributed hash table (DHT) that uses a ring overlay.
\subsubsection{Hybrid Architectures}
BitTorrent combines client-server and P2P architectures. Client nodes obtain tracker information from a server and then exchange data with peer nodes.
\subsection{Self-Management}
Self-managing systems can be constructed using a feedback loop that monitors system behaviors and adjusts the system's internal operation.
\section{Processes}
\subsection{Controlling Processes in Linux}
Below are some useful process-related commands:
\begin{itemize}
\item ps - lists running processes
\item top - lists processes with top resource usage
\item kill / pkill / killall - used to terminate processes
\item jobs - lists currently running jobs
\item bg - backgrounds a job
\item fg - foregrounds a job
\item nice / renice - sets priority of processes
\item CTRL-C - stops job in terminal
\item CTRL-Z - suspends job in terminal (use fg or bg to resume)
\end{itemize}
\subsection{Context Switching During IPC}
Inter-process communication can be used to help coordinate processes. However, IPC can be costly, since it requires a context switch from user to
kernel space and back.
\subsection{Threads}
Threads running in the same process can communicate through shared memory. Threads are sometimes referred to as lightweight processes (LWPs).
Multithreading applications often follow a dispatcher/worker design, where a dispatcher thread receives requests and feeds them to a pool
of worker threads.
\subsection{Hardware and Software Interfaces}
Processes and threads may interact with underlying hardware either directly through processor instructions or indirectly through library functions
or operating system calls. The general layers of interaction (from more abstract to less) are applications, libraries, operating system, and hardware.
These layers may interact with any layer below. Distributed systems often run in virtual environments that manage interactions with lower layers.
A major benefit of such virtualization is improved portability. Virtual machine monitors can offer additional benefits, including server consolidation,
live migration of VMs to support load balancing and proactive maintenance, and VM replication. The main drawbacks of virtualization are the performance and resource overhead introduced by the extra layer.
\subsection{Interfaces in Network Systems}
Networked applications communicate by exchanging messages. The format of these messages is determined by a message-passing protocol.
\subsection{Server Clusters}
Servers in a cluster are often organized in three physical tiers. The first is a load balancer, followed by application servers followed by a database
or file system.
\section{Communication}
\subsection{Access Transparency}
Middleware can be used to provide access transparency for a distributed system, meaning that differences in data representation and how data is accessed
can be hidden. This is typically done by middleware isolating the application layer from the transport layer.
\subsection{Remote Procedure Calls (RPCs)}
RPCs serve as the equivalent of conventional procedure calls, but for distributed systems. RPCs are implemented using a client-server protocol, where
a client interacts with a network using a piece of software known as a client stub, and a server interacts with a network using a server stub. The
steps of a typical RPC are detailed below:
\begin{enumerate}
\item Client process invokes client stub using ordinary procedure call.
\item Client stub builds message, passes it to client OS.
\item Client OS sends message to server OS.
\item Server OS delivers message to server stub.
\item Server stub unpacks parameters, invokes appropriate handler in server process.
\item Handler processes message, returns result to server stub.
\item Server stub packs result into message, passes it to server OS.
\item Server OS sends message to client OS.
\item Client OS delivers message to client stub.
\item Client stub unpacks result, delivers it to client process.
\end{enumerate}
\subsubsection{Representation of Values}
Packing parameters into a message is known as parameter marshalling (with unpacking known as demarshalling). Although data representations can be
different across multiple machines (e.g. big-endian vs little-endian representation), the purpose of marshalling is to represent data in a
machine-independent and network-independent format that all communicating parties expect to receive. The signatures of RPC calls can be defined
using what is known as an interface definition language (IDL).
\subsubsection{Synchronous vs Asynchronous}
In a synchronous RPC, the client waits for a return value from the server before resuming execution. In an asynchronous RPC, the client may resume
execution as soon as the server acknowledges receipt of the request (client does not need to wait for a return value). A variation of an asynchronous
request where a client does not wait to receive any acknowledgment from the server is known as a one-way RPC.
\subsection{Message Queuing Model}
As an alternative to RPCs, components in a distributed system may also communicate using a message queue that persists messages until they are consumed
by a receiver. This technique allows for persistent communication that does not need to be tightly coupled in time. The primitive actions of a simple
message queue are:
\begin{itemize}
\item \textbf{Put:} Append message to queue.
\item \textbf{Get:} Block until queue not empty, then remove first message.
\item \textbf{Poll:} Check specified queue for messages, remove the first message. Never block.
\item \textbf{Notify:} Install handler to be called when message put into queue.
\end{itemize}
A key disadvantage of message queuing is that the delivery of the message ultimately rests with the receiver, and often cannot be guaranteed. Message
queuing follows a design similar to publish-subscribe architectures, and is an example of message-oriented middleware (MOM).
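As a rough in-process analogy (not a distributed message queue), Java's BlockingQueue exposes operations that correspond closely to the Put, Get, and Poll primitives above; Notify has no direct equivalent in this sketch.
\begin{verbatim}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueuePrimitives {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    queue.put("job-1");           // Put: append a message to the queue
    String first = queue.take();  // Get: block until non-empty, remove first
    String next = queue.poll();   // Poll: remove first message or return null
                                  //       immediately; never blocks
    System.out.println(first + " " + next);  // prints "job-1 null"
  }
}
\end{verbatim}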
\subsection{Coupling Between Communicating Processes}
\subsubsection{Referential Coupling}
Referential coupling means that one process has to explicitly reference another one for them to communicate (e.g. connecting to web server using IP
address and port number).
\subsubsection{Temporal Coupling}
Temporal coupling means that processes must both be running for them to communicate (e.g. client cannot execute RPC if server is down).
\section{Apache Thrift}
\subsection{Overview}
Apache Thrift is an IDL first developed by Facebook, and now managed as an open-source project in the Apache Software Foundation. Thrift provides
a software stack and code generation engine to support RPCs between applications written in a wide variety of common languages, including C++, Java,
Python, and PHP.
\subsection{Thrift Software Stack}
The elements in the Thrift software stack, from top to bottom, are as follows:
\begin{itemize}
\item \textbf{Server:} Sets up the lower layers of the stack, and then awaits incoming connections, which it hands off to the processor. Servers
can be single or multi-threaded.
\item \textbf{Processor:} Handles reading and writing IO streams, and is generated by the Thrift compiler.
\item \textbf{Protocol:} Defines mechanism to map in-memory data structures to wire format (e.g. JSON, XML, compact binary)
\item \textbf{Transport:} Handles reading to and writing from network (e.g. using HTTP, raw TCP, etc)
\end{itemize}
\subsection{Distribution Transparency}
If Thrift clients must know the hostname and port for a given service, location transparency is violated. Also, although IDLs seek to provide access
transparency, that too may be violated in certain conditions (e.g. when certain protocol or transport exceptions are thrown).
\subsection{Application Protocol Versioning}
Thrift fields can be modified and remain compatible with old versions, provided that the rules below are followed:
\begin{itemize}
\item Fields are associated with tag numbers, which should never be changed.
\item New fields must be optional and provide default values.
\item Fields no longer needed can be removed, but their tag numbers cannot be reused.
\end{itemize}
\subsection{IDL Syntax}
\begin{itemize}
\item \textbf{Base Types:} bool, byte, i16, i32, i64, double, binary, string, void
\item \textbf{Containers:} list, set, map (of types from above)
\item \textbf{Other types:} const, typedef, enum, struct, exception
\item \textbf{Field modifiers:} required, optional, default
\item \textbf{Services and procedures:} service, extends, oneway
\item \textbf{namespace:} determines java package
\item \textbf{File inclusion:} Similar to C/C++ preprocessor
\end{itemize}
\subsection{Java Server Implementations}
\begin{itemize}
\item \textbf{TSimpleServer:} uses a single thread and blocking I/O
\item \textbf{TNonblockingServer:} uses a single thread and non-blocking I/O. It can handle parallel connections but executes requests serially just like TSimpleServer
\item \textbf{THsHaServer:} uses one thread for network I/O and a pool of worker threads. It can process multiple requests in parallel
\item \textbf{TThreadPoolServer:} uses one thread to accept connections and then handles each connection using a dedicated thread drawn from a pool of worker threads.
\item \textbf{TThreadedSelectorServer:} uses a pool of threads for network I/O and a pool of worker threads for request processing.
\end{itemize}
\subsection{Some Programming Tips}
\begin{itemize}
\item To separate policy from mechanism, it is generally a bad idea to hardcode hostnames and port numbers; it is usually preferable to accept command line
arguments or use a property file.
\item Objects built on top of TCP/IP connections (e.g. TSocket, TServerSocket) should be reused if possible to avoid the overhead of establishing and tearing
down connections.
\item By default Thrift transports, protocols and client stubs are not thread safe; different threads should share these items carefully, or not share them
at all.
\end{itemize}
\section{Distributed File Systems}
\subsection{Modes of Access}
\begin{itemize}
\item \textbf{Remote access model:} Client sends requests to access file stored on remote server.
\item \textbf{Upload/download model:} File is downloaded from remote server, processed on client, and uploaded back to the server.
\end{itemize}
\subsection{Network File System (NFS)}
NFS is a DFS first developed by Sun Microsystems in 1984, and remains heavily used in Unix-like systems. Features include:
\begin{itemize}
\item Client-side caching to reduce client-server communication
\item Delegation of files from a server to a client, where a client can temporarily store and access a file, after which the delegation is recalled and
the file returned.
\item Compound procedures (e.g. lookup file, open file, and read data all in one call instead of 3). Note that NFS uses RPCs internally.
\item Exporting different parts of a file system to different remote servers.
\end{itemize}
\subsection{Google File System (GFS)}
GFS is a DFS that stripes files across multiple commodity servers, and is layered on top of ordinary Linux file systems. Although GFS is proprietary, the
Hadoop Distributed File System (HDFS) is a simplified open-source implementation of GFS. Below are a few key properties of GFS:
\begin{itemize}
\item \textbf{Synchronization:} A single GFS master stores metadata about files and chunks, and keeps this metadata in its main memory. The master periodically
polls the chunk servers so that its view of which chunks each server holds stays consistent.
\item \textbf{Reads:} For reads, a client sends a file name and chunk index to the master, which responds with an address for the server storing that chunk.
\item \textbf{Writes:} Updates are written directly to chunk servers, after which the changes are propagated through all primary and then secondary replicas.
\end{itemize}
\subsection{File Sharing Semantics}
In centralized file systems, reads and writes are strictly ordered in time. This ensures that an application can always read its own writes. This behavior is
often described as UNIX semantics, and can be attained in a DFS so long as there is a single file server and files are not cached.
When a cached file in a DFS is modified, it is impractical to immediately propagate the changes back to the remote server (this would largely defeat the purpose
of caching in the first place). Instead, a widely implemented rule is that changes are only visible to the process or machine that modified a file, and only made
visible to other processes or machines when the file is closed. This rule is known as session semantics.
Semantics of DFS can be defined using combinations of various techniques. For example, NFSv4 supports session semantics and byte range file locking, and HDFS
provides immutable files along with an append function (e.g. for storing log data).
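A minimal sketch of HDFS's append-only model using the Hadoop FileSystem API is shown below; the path is illustrative, and it assumes the default file system is configured to point at an HDFS namenode with append support enabled.
\begin{verbatim}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsAppendExample {
  public static void main(String[] args) throws Exception {
    // Uses the default file system from the Hadoop configuration.
    FileSystem fs = FileSystem.get(new Configuration());
    Path log = new Path("/logs/events.log");

    // Files are written once; later changes are appends, never in-place edits.
    if (!fs.exists(log)) {
      try (FSDataOutputStream out = fs.create(log)) {
        out.writeBytes("first event\n");
      }
    }
    try (FSDataOutputStream out = fs.append(log)) {
      out.writeBytes("another event\n");
    }
    fs.close();
  }
}
\end{verbatim}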
\section{Hadoop MapReduce}
\subsection{Background}
MapReduce frameworks address the problem of parallelizing computations on big data sets across many machines. Google did much of the initial work on a generic
MapReduce framework intended to phase out the need for special-purpose frameworks for different kinds of computations. Hadoop MapReduce is the most prominent
open-source implementation of Google's MapReduce framework, and was created by Doug Cutting and Mike Cafarella, with early development driven largely by Yahoo.
\subsection{High-level Architecture}
Hadoop consists of a MapReduce engine and an HDFS system. The MapReduce layer contains a JobTracker, to which clients can submit MapReduce jobs. This JobTracker
pushes work to available TaskTracker nodes, with the goal of picking a node close to the data that job requires.
\subsection{MapReduce Basics}
A few key guiding principles of MapReduce are:
\begin{itemize}
\item Components do not share data arbitrarily, as the communication overhead for keeping data synchronized across nodes can be very high.
\item Data elements are immutable. Computation occurs by generating new outputs, which can then be forwarded into the next computation phase.
\item MapReduce transforms lists of input data elements into lists of output data elements. A single MapReduce program typically does so twice, once for a
Map phase and once for a Reduce phase.
\end{itemize}
\subsubsection{Mappers}
A list of data elements is provided to a mapper, which transforms each element to some output element.
\subsubsection{Reducers}
A reducer receives an iterator of input values, and combines these values together to produce a single value.
\subsection{Technical Definitions}
\begin{itemize}
\item \textbf{InputSplit:} A unit of work assigned to a single map task.
\item \textbf{InputFormat:} Determines how input files are parsed, which also determines the InputSplit.
\item \textbf{RecordReader:} Loads data from input split, creating key-value pairs used by the mapper.
\item \textbf{Partitioner:} Determines which partition key-value pairs should go to.
\item \textbf{OutputFormat:} Determines how output files are formatted.
\item \textbf{RecordWriter:} Writes records to output files.
\end{itemize}
\subsection{Fault Tolerance}
The main way Hadoop achieves fault tolerance is by restarting failed tasks. TaskTracker nodes are in constant communication with the JobTracker node, so should
a TaskTracker fail to respond in a given period of time, the JobTracker will assume it has crashed. If the job that crashed was in a mapping phase, then other
TaskTracker nodes will receive requests to re-run all map tasks previously run by the failed node. If the job that crashed was in a reduce phase, then other
TaskTracker nodes will receive requests to re-run all reduce tasks that were in progress on the failed node.
Such a simple fault tolerance mechanism is possible because mappers and reducers limit their communication with one another and the outside world. A potential
drawback with such an approach is that a few slow nodes (called stragglers) can create bottlenecks that hold up the rest of the program. One strategy to remedy
this problem is known as speculative execution, where the same task is assigned to multiple nodes to decrease the expected time in which it will be finished.
\subsection{Common Programming Patterns}
\subsubsection{Counting and Summing}
The simplest mapper for a counting problem would output a (key, 1) tuple for each instance of a given key. Alternatively, the mapper can aggregate counts for an
entire document, or a combiner can be run just after the mapper to aggregate data across the documents processed by a map task, in much the same way a reducer
would.
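A minimal word-count mapper and reducer along these lines might look as follows (class names are illustrative); the reducer can also be registered as a combiner, since summation is commutative and associative.
\begin{verbatim}
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

  // Mapper: emits (word, 1) for every token in a line of input.
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokens = new StringTokenizer(value.toString());
      while (tokens.hasMoreTokens()) {
        word.set(tokens.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reducer (also usable as a combiner): sums the counts for each word.
  public static class SumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }
}
\end{verbatim}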
\subsubsection{Selection}
Selection returns a subset of input elements that meet a certain requirement. Selection can be handled entirely in the map task; a reducer need not be
specified, in which case one output file will be generated per map task.
\subsubsection{Projection}
Projection returns a subset of fields of each input element (e.g. $a,b,c$ becomes $a,b$). A mapper can handle projection on individual elements, while a
reducer is only needed to eliminate duplicates.
\subsubsection{Index Inversion}
Index inversion produces a mapping of terms to document ids. Mappers emit (term, id) tuples, while reducers combine these tuples to generate lists of ids
for each term.
\subsubsection{Cross-Correlation}
Cross-correlation problems provide as input a set of tuples of items, and for each possible pair of items, the number of tuples in which that pair co-occurs is
measured. If $N$ distinct items are provided, then up to $N^2$ values should be reported. This problem can be represented as a matrix if $N^2$ is small enough.
For larger values of $N^2$, MapReduce can be effective.
A pairing implementation where mappers return tuples with a pair and a 1 count is simple but often not performant. A ``stripes'' approach where mappers
return tuples with an item and a list of items it appears with tends to perform better despite requiring more memory for map-side aggregation, since there
end up being roughly $N$ keys (one for each element) instead of $N^2$ (one for each pair).
\section{Apache Spark}
\subsection{Background}
Cluster computing frameworks (e.g. Hadoop) allow for large-scale data computations that automatically handle work distribution and fault tolerance. However,
many of these frameworks do not leverage distributed memory efficiently, making them ineffective for computations with intermediate results. Apache Spark
introduces the concept of resilient distributed datasets (RDDs), which are parallel, fault-tolerant data structures that persist intermediary results in memory.
\subsection{Lineage}
A lineage is a model of the flow of data between RDDs. Spark designs a lineage to perform efficient computations, and also uses the lineage to determine which
RDDs to rebuild to recover from failures.
\subsection{Transformations and Actions}
Transformations are data operations that convert an RDD or a pair of RDDs into another RDD. Actions are data operations that convert an RDD into an output. When
an action is invoked on an RDD, the Spark scheduler puts together a DAG of transformations.
\subsubsection{Narrow vs Wide Dependencies}
The transformations in the DAG are grouped into stages. A single stage is a collection of transformations with narrow dependencies, meaning that one partition of
the output depends only on a single partition of the input (e.g. map, filter). The boundaries between stages feature wide dependencies, meaning that a single
partition of output may correspond to multiple partitions of input (e.g. group by key), so the transformations may require a shuffle.
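The following sketch, using Spark's Java API with a local master for illustration, shows a job whose flatMap and mapToPair steps have narrow dependencies and whose reduceByKey step introduces a wide dependency, and hence a stage boundary.
\begin{verbatim}
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class NarrowVsWide {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("narrow-vs-wide")
                                    .setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> lines = sc.parallelize(
        Arrays.asList("to be or not to be", "that is the question"));

    // Narrow dependencies: each output partition depends on a single
    // input partition, so these stay within one stage.
    JavaRDD<String> words =
        lines.flatMap(l -> Arrays.asList(l.split(" ")).iterator());
    JavaPairRDD<String, Integer> ones =
        words.mapToPair(w -> new Tuple2<>(w, 1));

    // Wide dependency: reduceByKey shuffles data across partitions,
    // so it marks a stage boundary in the DAG.
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(Integer::sum);

    counts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
    sc.stop();
  }
}
\end{verbatim}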
\subsubsection{PageRank}
PageRank is a canonical iterative Spark workload. Every page starts with an equal rank; in each iteration, a page distributes its current rank evenly among its
outgoing links, and the ranks are recomputed as
\[
r(p) = \frac{1 - d}{N} + d \sum_{q \rightarrow p} \frac{r(q)}{|\mathrm{out}(q)|},
\]
where $d$ is a damping factor (commonly 0.85) and $N$ is the number of pages. Because the rank RDD produced by one iteration feeds directly into the next,
keeping it in memory avoids writing and rereading intermediate results between iterations, which is where Spark has a large advantage over a chain of
MapReduce jobs.
\subsection{Spark vs Hadoop MapReduce}
Two common differences between Spark and MapReduce are:
\begin{itemize}
\item Spark stores intermediary results in memory, while MapReduce dumps results to HDFS between each job. The MapReduce approach can lead to unnecessary I/O
when running multiple jobs in sequence.
\item Spark can support some more complicated workflows within a single job rather than running several new ones.
\end{itemize}
\section{Distributed Graph Processing}
Many data sets (e.g. web hyperlinks, social networks, transportation routes) can be modeled using graphs. Computations concerning very large graphs can benefit
from distributed architectures. However, previously examined cluster computing frameworks (e.g. Hadoop MapReduce, Apache Spark) are typically not good fits for
graph processing, which motivates the creation of separate distributed graph processing frameworks.
\subsection{Google's Solution: Pregel}
Pregel is a proprietary framework developed by Google to perform computations on large distributed graphs with performance and ease of programming in mind.
A master/worker model similar to those of MapReduce and Spark is used, where each worker is responsible for a particular vertex partition. The framework
maintains some state information for each vertex, including:
\begin{itemize}
\item Problem-specific values
\item List of messages sent to vertex
\item List of outgoing edges from a vertex
\item A binary active/inactive state
\end{itemize}
\subsubsection{Supersteps}
Pregel applies a bulk synchronous parallel (BSP) model of computation, in which a computation is divided into iterative rounds called supersteps. Workers
perform their computations asynchronously within each superstep, and only exchange data between supersteps. Specific actions that can be taken by a worker
within a superstep include:
\begin{itemize}
\item Receiving messages sent in the previous superstep.
\item Sending messages to be received in the next superstep.
\item Modifying vertex values or edges.
\item Deactivating a vertex, which is reactivated when it receives a message.
\end{itemize}
This distributed execution stops when all vertices are inactive and there are no more messages to process.
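The sketch below illustrates the vertex-centric style of computation using a simplified, hypothetical interface (it is not the actual Pregel or Giraph API); each superstep, the vertex folds incoming messages into its value and forwards the maximum to its neighbors.
\begin{verbatim}
// Hypothetical, simplified vertex-centric interface; not the actual
// Pregel or Giraph API.
interface VertexContext {
  void sendMessageToAllNeighbors(long value); // delivered next superstep
  void voteToHalt();                 // deactivate until a message arrives
  int superstep();
}

class MaxValueVertex {
  long value;

  // Propagates the maximum value seen so far through the graph.
  void compute(Iterable<Long> messages, VertexContext ctx) {
    long max = value;
    for (long m : messages) {
      max = Math.max(max, m);
    }
    if (max > value || ctx.superstep() == 0) {
      value = max;
      ctx.sendMessageToAllNeighbors(value);
    }
    ctx.voteToHalt();
  }
}
\end{verbatim}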
\subsubsection{Initialization}
The master is responsible for assigning a section of the vertices to each worker. The default partitioner uses a simple hash function over vertices,
which generates a fairly even distribution of vertices. To take advantage of other properties of a graph (e.g. exploiting locality), the default
partitioning scheme can be overridden.
\subsubsection{Combiners and Aggregators}
Pregel supports combiners, which serve to reduce the number of messages and the amount of data exchanged over the network. Combiners are often applicable
when the function applied at each vertex is commutative and associative (e.g. sum, max).
Pregel also supports aggregators, which are used to compute aggregate statistics from vertex-reported values.
\subsubsection{Fault Tolerance}
At the end of each superstep, the master instructs workers to save their state (i.e. vertex values, edge values, incoming messages, aggregator values, etc.)
to persistent storage. When the master detects the failure of a worker node, it rolls back all workers to the last successful superstep, and the computations
resume. If deterministic replay of sent messages is possible, then there are some more efficient recovery mechanisms that only require that the failed worker
is rolled back to the last successful superstep.
\subsection{Open-source Implementations}
Pregel-like APIs are supported by various open-source frameworks such as Apache Giraph. Other APIs support centralized graph processing (e.g. GraphChi), which
tends to perform better for fairly small datasets that can fit in the main memory of a single machine.
\section{Consistency and Replication}
\subsection{Replication Overview}
\subsubsection{Purposes of Replication}
The main purposes behind data replication include:
\begin{itemize}
\item \textbf{Increasing reliability}, since data is more likely to be accessible if it exists on more replicas.
\item \textbf{Increasing throughput}, since replicas can serve read operations in parallel.
\item \textbf{Lowering latency}, since replicas physically close to clients can avoid costly round trips.
\end{itemize}
\subsubsection{Replicated Data Stores}
In replicated data stores, each data object is replicated across multiple hosts. Replicas may reside on the same host
as processes that interact with them, or may be remote.
\subsection{Consistency Models}
Consistency models describe the extent to which replicas can disagree on the current state of data. Managing consistency tends to
be straightforward for read-only data, but the need for sophisticated models arises once replicas hold mutable states, especially
ones shared among multiple processes. Selecting a good consistency model is often challenging, as application requirements
rarely map neatly to a specific model.
\subsubsection{Sequential Consistency}
Sequential consistency requires that:
\begin{enumerate}
\item The result of any execution is the same as if the operations by all processes on the data store were performed in some
sequential order.
\item The operations of each individual process appear in the order specified in its program.
\end{enumerate}
The notion of this consistency model is borrowed from shared memory multiprocessing.
\subsubsection{Causal Consistency}
The concept of causal consistency is also borrowed from shared memory multiprocessing, and is based on a notion of causality similar
to Lamport's ``happens before'' relation. A data store is causally consistent if writes related by the ``causally precedes'' relation
are seen by all processes in the same order. The ``causally precedes'' relation is defined as follows:
\begin{enumerate}
\item Op1 causally precedes Op2 if Op1 occurs before Op2 in the same process.
\item Op1 causally precedes Op2 if Op2 reads a value written by Op1.
\end{enumerate}
\subsubsection{Linearizability}
A data store is linearizable when the result of any execution is the same as if all operations on the data store were executed in some
sequential order that extends Lamport's ``happens before'' model. This means that if Op1 finishes before Op2 begins, then Op1 must precede
Op2 in sequential order.
\subsubsection{Eventual Consistency}
The idea behind eventual consistency is that if no updates take place for some time, all replicas will gradually become consistent.
Eventual consistency allows for different processes to observe write operations taking effect in different orders, even when the operations
are related by ``causally precedes'' or ``happens before''.
Eventual consistency is a very weak property on its own, and appears to guarantee very little if updates are applied continuously. To strengthen
these guarantees, session guarantees can be applied, which restrict the behavior of a single process in a single session. Examples of session
guarantees include:
\begin{itemize}
\item \textbf{Monotonic reads}, which means that if a process reads a particular value of a data item, then successive reads in that process will
return that value or a newer one.
\item \textbf{Read your own writes}, which means the effect of a write operation will always be seen in successive read operations by the same process.
\end{itemize}
\subsection{Replication Protocols}
\subsubsection{Primary-Based Replication Protocols}
Primary-based replication protocols involve updates being performed on a single primary replica, with updates then pushed to any backup replicas.
If the primary replica is stationary, so that other servers perform updates on it remotely, the protocol is classified as remote-write. Alternatively,
local updates may be enabled by allowing the primary replica to migrate from server to server, in which case the protocol is classified as local-write.
Primary-based replication protocols allow for some strong consistency models. However, a single primary replica can lead to performance bottlenecks and
availability problems.
\subsubsection{Quorum-Based Replication Protocols}
Quorum-based replication protocols allow all replicas to perform updates and to service reads. However, each update and read must be performed on a
sufficiently large subset of replicas, known as the write and read quorum respectively.
Let $N$ be the total number of replicas, $N_R$ be the read quorum, and $N_W$ be the write quorum. In a distributed database, the following two rules must
be satisfied:
\begin{enumerate}
\item $N_R + N_W > N$, which enables the detection of read-write conflicts.
\item $N_W + N_W > N$, which enables the detection of write-write conflicts.
\end{enumerate}
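As a concrete example, with $N = 3$ replicas the assignment $N_R = 2$ and $N_W = 2$ satisfies both rules, since any read quorum overlaps any write quorum and any two write quorums overlap. Choosing $N_R = 1$ and $N_W = 3$ also satisfies the rules and makes reads cheap, but every write must then reach all replicas.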
Subsets of replicas that do not satisfy the rules above are referred to as partial quorums. Alternate means must be found for resolving conflicts, such
as tagging updates with timestamps.
\subsubsection{Eventually-Consistent Replication Protocols}
In simple cases of eventual consistency, reads and updates are serviced by the closest replica. This replica lazily (i.e. asynchronously with respect to the original update) propagates
updates to other replicas. Replicas can go out of sync for various reasons, such as one being unreachable due to network failure. This is handled by what is
known as an anti-entropy mechanism. One sample anti-entropy mechanism is different replicas exchanging hash trees (also known as Merkle trees), which contain
hashes of blocks of data that can be used to find where two data sets differ.
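The following is a minimal sketch of building such a hash tree over a list of data blocks; two replicas that compare root hashes and find them equal can skip synchronization, while differing roots lead them to recurse into subtrees to locate the divergent blocks.
\begin{verbatim}
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of computing the root hash of a Merkle tree over data
// blocks (assumes at least one block).
public class MerkleTree {
  public static byte[] rootHash(List<byte[]> blocks) throws Exception {
    MessageDigest sha = MessageDigest.getInstance("SHA-256");
    List<byte[]> level = new ArrayList<>();
    for (byte[] block : blocks) {
      level.add(sha.digest(block));        // leaf hashes
    }
    while (level.size() > 1) {
      List<byte[]> next = new ArrayList<>();
      for (int i = 0; i < level.size(); i += 2) {
        sha.reset();
        sha.update(level.get(i));
        if (i + 1 < level.size()) {
          sha.update(level.get(i + 1));    // hash sibling pairs together;
        }                                  // an odd last node is re-hashed alone
        next.add(sha.digest());
      }
      level = next;
    }
    return level.get(0);
  }
}
\end{verbatim}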
\section{Fault Tolerance}
\subsection{Dependability}
Fault tolerance is closely related to the concept of dependability, which implies the following requirements:
\begin{itemize}
\item \textbf{Availability:} The system should operate correctly at a given instant in time (e.g. 99\% availability means a system operates
correctly 99\% of the time).
\item \textbf{Reliability:} The system should run continuously without interruption (e.g. mean time between failures (MTBF) of one month).
\item \textbf{Safety:} System failures should not have catastrophic consequences.
\item \textbf{Maintainability:} A failed system should be easy to repair.
\end{itemize}
\subsection{Failures}
A failure occurs when a system cannot fulfill its promises. Five major types of server failures in distributed systems are:
\begin{itemize}
\item \textbf{Crash failure:} A server is working correctly and then halts.
\item \textbf{Omission failure:} A server fails to respond to requests or fails to receive or send messages.
\item \textbf{Timing failure:} A server's response falls outside of some specified time window.
\item \textbf{Response failure:} A server's response is incorrect.
\item \textbf{Arbitrary failure:} A server produces arbitrary responses at arbitrary times (also known as Byzantine failure).
\end{itemize}
\subsection{Errors and Faults}
An error is a part of a system's state that may lead to failure (e.g. corrupt data read from hard disk). The fault is the underlying cause
of the error (e.g. hard disk head crashes). The main types of faults are:
\begin{itemize}
\item \textbf{Transient faults,} which emerge once and then disappear (e.g. birds flying in front of a microwave receiver)
\item \textbf{Intermittent faults,} which tend to reappear (e.g. loose contact on electrical connector)
\item \textbf{Permanent faults,} which continue to exist until some faulty component is replaced (e.g. burned out power supply)
\end{itemize}
\subsection{Masking Failure by Redundancy}
Triple-modular redundancy (or more generally, N-modular redundancy) is an approach to fault tolerance where multiple (3 or N) subsystems perform the same process,
after which the result is determined by a majority voting system. Thus, the system can continue to function if some of its subsystems fail so long as the
remaining subsystems can win a majority vote. To support voting, processes must be able to communicate with one another. This communication can be set up as
a flat group (i.e. complete graph between all processes) or in some sort of hierarchical structure with a coordinator process and multiple worker processes.
\subsection{Consensus Problem}
The consensus problem in distributed computing can be defined as follows:
\begin{itemize}
\item Each process has procedures propose(val) and decide()
\item Each process first proposes a value by calling propose(val) once
\item Each process learns the value agreed upon by calling decide()
\end{itemize}
The following properties must hold:
\begin{itemize}
\item \textbf{Agreement:} Two calls to decide() cannot return different values.
\item \textbf{Validity:} If decide() returns a value val, then some process called propose(val).
\item \textbf{Liveness:} Calls to propose(val) and decide() made by a process that does not fail must eventually terminate.
\end{itemize}
\subsubsection{Variations}
The solvability of the consensus problem depends on several factors of a distributed environment, including:
\begin{itemize}
\item Whether processes are synchronous or asynchronous to one another
\item Communication delays between processes
\item Message delivery order
\item Whether messages are unicast (one-to-one) or multicast (one-to-many)
\end{itemize}
\subsection{RPC Semantics Under Failures}
RPC systems may exhibit the following failure scenarios, which depend on when a failure occurs in an RPC call:
\begin{itemize}
\item Client unable to locate server.
\item Request message from client to server is lost.
\item Server crashes after receiving request.
\item Reply message from server to client is lost.
\item Client crashes after sending request.
\end{itemize}
\subsubsection{Dealing with RPC Server Crashes}
When the server crashes after receiving a request, the client may not know whether the server crashed
before or after executing the request. Techniques for dealing with such RPC server crashes include:
\begin{itemize}
\item Reissuing the request, leading to at-least-once semantics, which are safe as long as the request is idempotent
(i.e. repeated executions have the same effect as one execution).
\item Not reissuing the request and reporting a failure, leading to at-most-once semantics, with no guarantee that the
request was processed.
\item Determining whether the request was processed, and if it was not, reissuing the request. This leads to exactly-once
semantics, which may be difficult to implement as there may be no clear way of knowing what action was performed.
\end{itemize}
\section{Apache ZooKeeper}
ZooKeeper is a centralized coordination service for distributed systems, organized as a hierarchical key-value store. Common uses of ZooKeeper
include maintaining configuration information, providing distributed synchronization, and managing naming registries for distributed
systems. ZooKeeper emphasizes good performance (particularly for read-dominant workloads), being general enough to be used for many
different purposes, reliability, and ease of use.
\subsection{Data Model}
ZooKeeper's data model is a hierarchical key-value store similar to a file system. Nodes in this store are called znodes, and may
contain data and children. Reads and writes to a single node are considered to be atomic, with values read or written fully or not
at all.
\subsection{Node Flags}
Two important flags nodes can carry are:
\begin{itemize}
\item \textbf{Ephemeral flags}, which cause nodes to exist only as long as the session that created them is active, unless they are explicitly deleted earlier.
\item \textbf{Sequence flags}, which make nodes append a monotonically increasing counter to the end of their path.
\end{itemize}
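The sketch below creates an ephemeral, sequential znode using the ZooKeeper Java client; the connect string and paths are illustrative, and the parent znode /election is assumed to already exist. This combination of flags is a common building block for leader election.
\begin{verbatim}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class EphemeralSequentialExample {
  public static void main(String[] args) throws Exception {
    // Connect string and paths are illustrative; /election must exist.
    ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> { });

    // The znode disappears when this session ends, and ZooKeeper appends
    // a monotonically increasing counter to the requested path.
    String path = zk.create("/election/candidate-", new byte[0],
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("Created " + path); // e.g. /election/candidate-0000000003

    zk.close();
  }
}
\end{verbatim}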
\subsection{Consistency Model}
ZooKeeper ensures that writes are linearizable and that reads are serializable (a similar property to sequential consistency).
ZooKeeper also guarantees per-client FIFO servicing of requests.
\subsection{Servers}
When running in replicated mode, all servers have a copy of the state in memory. A leader is elected at startup, and all updates go
through this leader. Update responses are sent once a majority of servers have persisted the change. Thus, in order to tolerate $n$
failures, $2n + 1$ replicated servers are required.
\section{Distributed Commits and Checkpoints}
\subsection{Transactions}
Transactions are indivisible operations with the following properties:
\begin{itemize}
\item \textbf{Atomic:} An operation occurs fully or not at all. This can be hard to achieve in distributed environments.
\item \textbf{Consistent:} A transaction is a valid transformation of the state.
\item \textbf{Isolated:} A transaction is not aware of other concurrent transactions.
\item \textbf{Durable:} Once a transaction completes, its updates persist, even in the event of failure.
\end{itemize}
\subsection{Two-Phase Commits}
The two-phase commit (2PC) is a distributed transaction commitment protocol. Systems running this protocol consist of a coordinator
and participants.
In the first phase, the coordinator asks participants whether they are ready to commit. These participants respond by voting. In the
second phase, the coordinator analyzes the participant votes. If all participants voted to commit, the commit proceeds. Otherwise, the
transaction is aborted.
The 2PC procedure described above makes a few key assumptions, including that processes are synchronous and that communication delays are bounded.
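A simplified coordinator for this protocol is sketched below over a hypothetical in-memory Participant interface; a real implementation would issue these calls as RPCs and write decisions to a durable log before announcing them.
\begin{verbatim}
import java.util.List;

// Simplified two-phase commit coordinator over in-memory participants.
public class TwoPhaseCommit {

  // Hypothetical participant interface; in a real system these calls would
  // be RPCs, and votes/decisions would be written to durable logs.
  public interface Participant {
    boolean prepare(String txId);  // vote: true means "ready to commit"
    void commit(String txId);
    void abort(String txId);
  }

  // Phase 1: collect votes. Phase 2: commit only if every vote was "yes".
  public static boolean run(String txId, List<Participant> participants) {
    boolean allYes = true;
    for (Participant p : participants) {
      if (!p.prepare(txId)) {
        allYes = false;
        break;
      }
    }
    for (Participant p : participants) {
      if (allYes) {
        p.commit(txId);
      } else {
        p.abort(txId);
      }
    }
    return allYes;
  }
}
\end{verbatim}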
\subsubsection{Recovery from Failures}
Participants are able to make progress so long as they have received a decision. This decision is typically received from the coordinator, though
in the case of a coordinator crash, may be received from another participant.
The protocol described above blocks when all participants are waiting for an answer and the coordinator crashes, though some more complicated
implementations can overcome this. However, even in these more complicated implementations, the simultaneous failure of the coordinator and
some participants can make it difficult to detect whether all participants are ready for an answer.
\subsubsection{Distributed Checkpoints}
Recovery after failure is only possible if the collection of checkpoints by individual processes forms what is called a distributed snapshot. A
distributed snapshot requires that process checkpoints (i.e. representations of state at a certain point) contain a corresponding send event for
each message received. The most recent distributed snapshot is called the recovery line.
Coordinated checkpointing algorithms can be applied to create recovery lines. A sample two phase checkpointing algorithm works as follows:
\begin{enumerate}
\item The coordinator sends a checkpoint request message to all participant processes. When a participant receives the message, it pauses processing
of incoming messages, forms a checkpoint, and returns an acknowledgment (ACK) to the coordinator.
\item Once all ACKs are received by the coordinator, it sends a message to the participants that they can resume processing incoming messages.
\end{enumerate}
\section{Raft}
Raft is a consensus algorithm designed to be easier to understand than Paxos while providing equivalent fault tolerance. Raft decomposes consensus into leader
election, log replication, and safety. Time is divided into terms; in each term, a single leader is elected by majority vote, and all updates flow through the
leader, which replicates log entries to followers and commits an entry once a majority of servers have stored it. If followers stop receiving heartbeats from
the leader, they time out and start a new election for the next term. As with ZooKeeper's replication scheme, a Raft cluster of $2n + 1$ servers can tolerate
$n$ failures.
\section{Apache Kafka}
Apache Kafka is an open-source distributed stream-processing platform. Key features of Kafka include publish-subscribe, message-oriented communication;
real-time stream processing; and distributed, replicated storage of messages and streams. Common use cases for Kafka include:
\begin{itemize}
\item High-throughput messaging
\item Activity tracking
\item Metric collection
\item Log aggregation
\end{itemize}
\subsection{Topics and Logs}
A topic is a stream of records. Each topic can have multiple partitions, which allow a topic to be parallelized by splitting its data among multiple
Kafka brokers (i.e. servers), which in turn can have multiple replicas. Each record in a partition has a unique offset.
\subsection{Producers and Consumers}
Producers push records to Kafka brokers for a specific partition. These pushes can be batched (i.e. multiple records delivered in one operation), and
idempotent delivery is supported (i.e. duplicate commits can be prevented).
Consumers pull batches of records from Kafka brokers. Each consumer has a position within a partition, which can be updated once the records
are processed.
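The sketch below uses the Kafka Java client to push one record and pull one batch from an illustrative purchases topic; the broker address, topic, and group id are assumptions for the example.
\begin{verbatim}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaExample {

  // Push a single record to the "purchases" topic.
  static void produce() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // The record key determines which partition the record is appended to.
      producer.send(new ProducerRecord<>("purchases", "user-42", "item-7"));
    }
  }

  // Pull a batch of records; each consumer tracks its own position (offset).
  static void consume() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "demo-group");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("purchases"));
      ConsumerRecords<String, String> records =
          consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, String> r : records) {
        System.out.println(r.offset() + " " + r.key() + " " + r.value());
      }
    }
  }
}
\end{verbatim}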
\subsection{Stream Semantics}
The main two ways of interpreting streams are:
\begin{itemize}
\item \textbf{Record streams}, where each record represents a single event (e.g. a single purchase).
\item \textbf{Changelog streams}, where each record represents an update to a state (e.g. a bank account balance change).
\end{itemize}
Kafka provides different APIs for interacting with the different types of streams. The KStream API is used to interact with record streams, while
the KTable API is used to interact with changelog streams. The KGroupedStream API can be used when converting from a KStream to a KTable.
\subsection{Windowed Streams}
Windowing allows for grouping records close in time. A few common types of windows include:
\begin{itemize}
\item \textbf{Hopping time windows}, which are defined by a size and advance interval (hop). An example is to compute something every 10 seconds (hop)
for the last 60 seconds of data (size).
\item \textbf{Tumbling time windows}, which are special non-overlapping versions of hopping time windows (i.e. the hop and size are equal).
\item \textbf{Sliding windows}, which slide continuously over the time axis.
\item \textbf{Session windows}, which aggregate data by activity periods (i.e. new windows are created once a period of inactivity exceeds a certain length).
\end{itemize}
\section{Clock Synchronization}
Clocks are instruments used to measure time. A lack of synchronization among clocks in a distributed system can result in unclear ordering of events.
\subsection{Background on Clocks}
\subsubsection{Calendars}
A few significant calendars used historically include:
\begin{itemize}
\item \textbf{Roman Calendar:} The Roman calendar is a lunar calendar (i.e. based on moon phases) with months of 29 or 30 days. The original Roman
calendar had only 10 months, with winter days unallocated. Reforms such as adding two extra months were later introduced, but the calendar remained
difficult to align with seasons, since it is a lunar calendar.
\item \textbf{Julian Calendar:} The Julian calendar was the first solar calendar (i.e. based on the Earth's movement around the sun). Introduced in
45 BC, the calendar improved on the Roman calendar, such as by adding the concept of a leap year (but not very accurately).
\item \textbf{Gregorian Calendar:} The Gregorian calendar is the most widely used calendar in the world. Introduced in 1582, its main improvement
over the Julian calendar is measuring leap years far more accurately (accurate to 1 day in 7700 years).
\end{itemize}
\subsubsection{Timekeeping Standards}
A few standards used for timekeeping include:
\begin{itemize}
\item \textbf{Solar day:} The interval between two consecutive transits of the sun (i.e. sun reaching highest point of the day). This is not constant.
\item \textbf{TAI (Temps Atomique International):} International time scale based on the average of multiple Cesium 133 atomic clocks.
\item \textbf{UTC (Coordinated Universal Time):} The world's primary time standard. Based on TAI, UTC uses leap seconds at irregular intervals to
compensate for the Earth's slowing rotation. As a result, UTC is currently tens of seconds behind TAI.
\end{itemize}
\subsubsection{Limitations of Atomic Clocks}
Atomic clocks are the most accurate timekeeping devices known, but are limited by relativistic time dilation. This was confirmed by the Hafele-Keating
experiment, in which atomic clocks were flown around the world and compared to stationary ones.
\subsection{Terminology}
Let $C(t)$ denote the value of a clock $C$ at a reference time (e.g. UTC) $t$.
\begin{itemize}
\item \textbf{Clock skew} of $C$ relative to $t$ is $\frac{dC}{dt} - 1$.
\item \textbf{Offset} of $C$ relative to $t$ is $C(t) - t$.
\item \textbf{Maximum drift rate} of $C$ is a constant $\rho$ such that $1 - \rho \leq \frac{dC}{dt} \leq 1 + \rho$.
\end{itemize}
\subsection{Network Time Protocol (NTP)}
NTP is a common time synchronization protocol for computers over variable-latency data networks.
Consider two hosts (A and B). Host A sends a request at $T_1$ to host B. Host B receives this request at $T_2$. Host B replies at $T_3$, and host $A$
receives this reply at $T_4$. The offset of B relative to A is $\theta = \frac{(T_2 - T_1) + (T_3 - T_4)}{2}$. The estimated one-way network delay is
$\delta = \frac{(T_4 - T_1) - (T_3 - T_2)}{2}$. NTP typically collects multiple $(\theta, \delta)$ pairs and uses the minimum value of $\delta$ as the
best estimation of the delay. The corresponding $\theta$ is considered to be the most reliable estimate of the offset.
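As a worked example with illustrative numbers, suppose host A records $T_1 = 0$ ms and $T_4 = 20$ ms on its own clock, while host B records $T_2 = 62$ ms and $T_3 = 64$ ms on its clock. Then
\[
\theta = \frac{(62 - 0) + (64 - 20)}{2} = 53\;\mathrm{ms}, \qquad
\delta = \frac{(20 - 0) - (64 - 62)}{2} = 9\;\mathrm{ms},
\]
so B's clock is estimated to be about 53 ms ahead of A's, with a one-way delay of roughly 9 ms.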
\subsubsection{Clock Strata}
A reference clock, such as an atomic clock, is said to operate at stratum 0. A computer at stratum 1 has its time controlled by a stratum 0 device, a
computer at stratum 2 has its time controlled by a stratum 1 device, and so forth. Computers may only adjust the time of computers at greater strata, and
when adjusting, must make sure time does not appear to flow backwards.
\subsubsection{Precision}
NTP accuracy is typically measured in 10s of milliseconds. Some other protocols, such as Precision Time Protocol (PTP), can be used to achieve
sub-millisecond accuracy where NTP does not suffice.
\subsection{Logical Clocks}
Logical clocks are not concerned with the precise timing of events, but instead make sure that the time at each process follows a ``happens before''
relation (e.g. a message cannot be received before it is sent).
\subsubsection{Lamport Timestamping}
Lamport timestamping is a technique to enforce the ``happens before'' relation described above. The algorithm is:
\begin{itemize}
\item Before executing an event (send or receive), a process increments a counter.
\item Sent messages are tagged with the sender's timestamp.
\item Upon receiving a message, a process sets its counter to be the max of its current counter and the received counter (and increments the result).
\end{itemize}
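A minimal sketch of this algorithm as a thread-safe counter (the class name is illustrative):
\begin{verbatim}
// Minimal sketch of a Lamport logical clock; class name is illustrative.
public class LamportClock {
  private long counter = 0;

  // Called before executing a local or send event.
  public synchronized long tick() {
    return ++counter;
  }

  // Called when a message tagged with the sender's timestamp is received.
  public synchronized long onReceive(long receivedTimestamp) {
    counter = Math.max(counter, receivedTimestamp) + 1;
    return counter;
  }
}
\end{verbatim}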
\subsubsection{Vector Clocks}
Vector clocks represent logical time similarly to Lamport timestamping. However, instead of a single counter, each clock stores a vector with one counter per
process. Updates are similar to those of Lamport timestamping, but are applied to individual entries of the vector instead of a single process-wide counter.
Lamport timestamping guarantees that if an event $a$ happened before an event $b$, then the timestamp assigned to $a$ is smaller than the timestamp assigned to $b$.
However, the reverse of the relationship does not hold for Lamport timestamping (i.e. a lesser timestamp at $a$ than at $b$ does not imply that $a$ happened
before $b$). Vector clocks guarantee the reverse as well, so they are said to provide a complete characterization of causality.
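A minimal sketch of a vector clock for a fixed set of processes, including the element-wise comparison that characterizes causality:
\begin{verbatim}
// Minimal sketch of a vector clock for a fixed number of processes.
public class VectorClock {
  private final long[] counters;
  private final int myId;

  public VectorClock(int numProcesses, int myId) {
    this.counters = new long[numProcesses];
    this.myId = myId;
  }

  // Local or send event: increment only this process's own entry.
  public synchronized long[] tick() {
    counters[myId]++;
    return counters.clone();
  }

  // Receive event: element-wise max with the received vector, then increment.
  public synchronized long[] onReceive(long[] received) {
    for (int i = 0; i < counters.length; i++) {
      counters[i] = Math.max(counters[i], received[i]);
    }
    counters[myId]++;
    return counters.clone();
  }

  // a happened before b iff a <= b element-wise and a != b.
  public static boolean happenedBefore(long[] a, long[] b) {
    boolean strictlyLess = false;
    for (int i = 0; i < a.length; i++) {
      if (a[i] > b[i]) return false;
      if (a[i] < b[i]) strictlyLess = true;
    }
    return strictlyLess;
  }
}
\end{verbatim}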
\section{CAP Principle}
The CAP theorem, also known as Brewer's theorem, states that it is impossible for a distributed system to provide all three of:
\begin{itemize}
\item \textbf{Consistency}, which means that nodes in a system agree on the most recent state of data.
\item \textbf{Availability}, which means that nodes are able to execute read-only queries and updates.
\item \textbf{Partition tolerance}, which means that the system continues to function if its servers are separated into disjoint sets (e.g. because of a network
failure).
\end{itemize}
\subsection{CP vs AP Systems}
CAP can be interpreted as that when a network partition occurs, systems need to choose between consistency (C) and availability (A). CP systems select
consistency over availability, while AP systems select availability over consistency. AP systems are appropriate for many applications that are
latency-sensitive, inconsistency-tolerant, and do not rely on transactions.
CP systems are able to enforce stronger consistency models (e.g. serializability, linearizability) than AP systems, which may strive for eventual or
causal consistency.
\subsection{PACELC}
PACELC is an extension of the CAP theorem that states that:
\begin{itemize}
\item If there is a \textbf{P}artition, a choice must be made between \textbf{A}vailability and \textbf{C}onsistency.
\item \textbf{E}lse, a choice must be made between (lower) \textbf{L}atency and \textbf{C}onsistency.
\end{itemize}
\subsection{Quorum-Based Consistency and CAP}
Systems that ensure every read observes the effects of all writes that completed before the read started, such as through the use of quorums, are said to
have strong consistency. This type of consistency is considered to be ``C'' in the context of the CAP theorem.
Some systems allow consistency settings to be determined on a per-request basis (e.g. one, quorum, all). These systems are said to have tunable consistency.
In general, systems in which a read or write must occur at specific machines are not available when those machines cannot be reached. One solution is to use
sloppy quorums, in which the set of replicas can change dynamically, such as to adjust to network partitions. This is used in database systems with hinted
handoff, where an arbitrary node can accept an update; if that update should ideally go to a different node, it is transferred to that node once the network
partition heals.
\end{document}