CS 3700 - Networks and Distributed Systems

Project 6: Distributed, Replicated Key-Value Store

The milestone for this project is due at 11:59pm on Friday, December 4, 2020.
The final project is due at 11:59pm on Friday, December 18, 2020.


In this project, you will build a (relatively) simple, distributed, replicated key-value datastore. A key-value datastore is a very simple type of database that supports two API calls from clients: put(key, value) and get(key). The former API allows a client application to store a key-value pair in the database, while the latter API allows a client to retrieve a previously stored value by supplying its key. Real-world examples of distributed key-value datastores include memcached, Redis, DynamoDB, etc.
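As a minimal sketch of the two API calls described above, a single-process store can be little more than a dictionary. The class name LocalKVStore and the choice to return an empty string for an unknown key are assumptions made for illustration; they are not part of the assignment.

```python
class LocalKVStore:
    """Single-process key-value store with no replication or consensus."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        # Store (or overwrite) the value associated with key.
        self._data[key] = value

    def get(self, key):
        # Return the stored value, or "" if the key was never stored
        # (this missing-key behavior is an assumption).
        return self._data.get(key, "")


store = LocalKVStore()
store.put("name", "Christo Wilson")
print(store.get("name"))
```

The rest of the project is about keeping several copies of such a dictionary consistent across processes.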

Of course, it would be simple to build a key-value store if it was a single process. However, your system must be replicated and support strong consistency guarantees. Thus, you will be implementing a simplified version of the Raft consensus protocol. Your datastore will be run multiple times, in parallel, and it will use the Raft protocol to maintain consensus among the replicas.

Your datastore will be tested for both correctness and performance. We will provide a testbed for your datastore that will simulate clients who execute put() and get() commands, as well as an unreliable network that can drop packets or make hosts unavailable. Part of your grade will come from the overhead your system has (i.e., fewer packets will result in a higher score), while another part will depend on the speed at which your datastore answers client queries (i.e. what is the query latency). Your results will be compared to your classmates' via a leaderboard.

Language and Libraries

You may write your code in whatever language you choose, as long as your code compiles and runs on unmodified Khoury College Linux machines on the command line. Do not use libraries that are not installed by default on the Khoury College Linux machines. You may use IDEs (e.g. Eclipse) during development, but do not turn in your IDE project without a Makefile. Make sure your code has no dependencies on your IDE.

You may not use libraries or modules that implement consensus protocols. This includes any library that implements Raft, Paxos, Viewstamped Replication, or similar protocols. Likewise, you cannot use any libraries or software packages that implement a replicated key-value datastore. For example, your program cannot be a thin wrapper around memcached, etc. You may use libraries or modules that implement local database storage (e.g. SQLite, BerkeleyDB, LevelDB) if you want to use them for persistent storage within each replica. If you have any questions about whether a particular library or module is allowed, post on Piazza.

Your Program

For this project, you will submit one program named 3700kvstore that implements the replicated datastore. You may use any language of your choice, and we will give you basic starter code in Python. Keep in mind that you are writing a program that will be run multiple times, in parallel, to form a distributed system.

If you use C or any other compiled language, your executable should be named 3700kvstore. If you use an interpreted language, your script must be called 3700kvstore, must include a shebang, and be marked as executable. If you use a virtual machine-based language (like Java or C#), you must write a brief Bash shell script, named 3700kvstore, that conforms to the input syntax given below and then launches your program using whatever incantations are necessary. For example, if you write your solution in Java, your Bash script might resemble

#!/bin/bash
# Forward all command line arguments to the Java program.
exec java -jar 3700kvstore.jar "$@"
Or, if you use Python, your script might start with
#!/usr/bin/env python3
from bar import foo
and should be marked executable.

Starter Code

Very basic starter code for the assignment in Python, as well as the testbed scripts, is available in /course/cs3700f20/code/project6. To get started, you should copy this directory into your own local directory (i.e., cp -r /course/cs3700f20/code/project6 ~/), since you will need the run.py and test.py scripts, as well as the example configuration files.

The starter code provides a bare-bones implementation of a datastore that simply connects to the LAN and broadcasts a "no-op" message once every second. You may use this code as a basis for your project if you wish, but it is strongly recommended that you do not do so unless you are comfortable with Python.

Testing Your Code

In order to evaluate your replicated datastore, we have provided a simulated test environment. The simulator will create an emulated network and all necessary sockets, execute several copies of your datastore with the appropriate command line arguments, route messages between the datastore replicas, and generate requests from clients. This script is included in the starter code, and you can run it by executing
./run.py <config-file>
where <config-file> is the configuration file that describes the test configuration you would like to use. Note that you will not need to modify the run script, or parse the config files (the run script parses the config files). You may create custom configs to test your code under different scenarios if you want to.
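A custom config can be generated with a short script. The sketch below only reuses keys that appear in the example configurations under Config File Format; the specific values, the file name my-test.json, and the assumption that the simulator accepts this particular combination of keys are all illustrative and unverified.

```python
import json

# Every key below is copied from the example configs in this handout;
# the values are arbitrary choices for a moderately lossy test.
custom_config = {
    "lifetime": 30,
    "replicas": 5,
    "requests": 200,
    "mix": 0.5,
    "drops": 0.1,
    "end_wait": 5,
    "events": [{"type": "kill_leader", "time": 10}],
    "tests": {
        "benchmarks": {
            "total_msgs": [1000, 3000, 4000],
            "failures": [1, 10, 100],
            "duplicates": [0, 2, 10],
            "median_latency": [0.00015, 0.005, 0.05],
        }
    },
}


def write_config(path, config):
    # Serialize the config as indented JSON so it stays easy to edit by hand.
    with open(path, "w") as handle:
        json.dump(config, handle, indent=4)


# Example (writes a file): write_config("my-test.json", custom_config)
```

The resulting file would then be passed to ./run.py in place of <config-file>.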

Config File Format

The configuration file that you pass to ./run.py contains a number of parameters that control the simulation. The file is formatted in JSON. For example, a simple configuration with no events and a read-heavy workload might look like the following:
{
    "lifetime": 30,
    "replicas": 5,
    "requests": 500,
    "mix": 0.9,
    "tests" : {
        "benchmarks" : {
            "total_msgs"     : [1200, 3000, 5000],
            "failures"       : [0, 1, 2],
            "duplicates"     : [0, 2, 5],
            "median_latency" : [0.0004, 0.002, 0.05]
        }
    }
}
and a more complex configuration with events and a lossy network might be:
{
    "lifetime": 30,
    "replicas": 5,
    "requests": 300,
    "mix" : 0.2,
    "drops" : 0.15,
    "end_wait" : 5,
    "events" : [{"type": "kill_leader", "time": 8},
                {"type": "kill_leader", "time": 16}],
    "tests" : {
        "benchmarks" : {
            "total_msgs"     : [1000, 3000, 4000],
            "failures"       : [1, 10, 100],
            "duplicates"     : [0, 2, 10],
            "median_latency" : [0.00015, 0.005, 0.05]
        }
    }
}

./run.py Output

The ./run.py script will output any errors it encounters during the simulation, including malformed messages, messages to unknown destinations, replicas that unexpectedly quit, etc. Once the simulation completes, ./run.py prints (1) some statistics about your datastore's performance and behavior, (2) whether your datastore passed the correctness checks, and if not, why not, and (3) how your datastore fared on the performance benchmarks. Note that performance is assessed only if the datastore passes the correctness checks.

Here is an example of the simulator's output when a datastore fails the correctness checks:

bash$ ./run.py config.json
# Simulation Finished

## Useful Information and Statistics
Leaders: FFFF 0001 FFFF 0003
Replicas that died/were killed: 0/2
Total messages sent: 6370
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 3
Total unanswered get()/put() requests: 33/3
Total redirects: 19
Total get()/put() failures: 15/31
Total get() with incorrect response: 7

## Correctness Checks
    Error: >0 incorrect responses to get()
    Error: insufficient get() requests answered (33 > 60 * 0.50)

## Correctness Checks Failed, Skipping Performance Tests
Ideally, you would like all get() and put() requests to succeed without failing and for them to have low latency. Obviously, if your system is returning incorrect values to get() requests then your datastore has consistency issues. Furthermore, you would like the total number of packets to be as low as possible, i.e. the overhead of your datastore on the network should be low.

Here is another example when the correctness checks pass; notice the performance results are now printed:

bash$ ./run.py config.json
# Simulation Finished
## Useful Information and Statistics
Leaders: FFFF 0001 FFFF 0003
Replicas that died/were killed: 0/2
Total messages sent: 6370
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 3
Total unanswered get()/put() requests: 0/3
Total redirects: 19
Total get()/put() failures: 15/31
Total get() with incorrect response: 0
## Correctness Checks
    All correctness tests passed

## Performance Tests
## <test metric>: <your score> <benchmark score>, <test result>
    Total Messages Between Replicas: 6370 >= 1000, Failed
    Total Failures and Unanswered Requests: 49 < 60, Passed
    Duplicate Responses to Clients: 3 < 4, Partial credit, needs improvement
    Median Response Latency to Clients: 0.0001 < 0.0002, Bonus!
In this case, the performance results of the datastore are mixed. This implementation has extremely low median latency and is earning bonus points, and the number of failures/unanswered requests is acceptable, but the datastore could be improved by sending fewer duplicate responses and many fewer messages overall.

Testing Script

Additionally, we have included a basic test script that runs your code under a variety of different configurations and also checks your code's compatibility with the grading script. If your code fails in the test script we provide, you can be assured that it will fare poorly when run under the grading script. To run the test script, simply type
bash$ ./test.py
Basic tests (5 replicas, 30 seconds, 100 requests):
	No drops, no failures, 80% read		[PASS]     Performance Tiers: 3 1 2 0
	No drops, no failures, 60% read		[PASS]     Performance Tiers: 2 1 2 0
	No drops, no failures, 40% read		[PASS]     Performance Tiers: 2 1 1 0
	No drops, no failures, 20% read		[PASS]     Performance Tiers: 3 2 2 1
Unreliable network tests (5 replicas, 30 seconds, 150 requests):
	10% drops, no failures, 80% read	[FAIL]
This will run your datastore on a number of test configurations, and will output whether your program performs sufficiently in each case. Note that the performance information is only printed if the datastore passes the correctness checks for a given test. The performance tier numbers correspond to Bonus (0), Passed (1), Needs Improvement (2), and Failed (3) with respect to total messages, failures/unanswered, duplicates, and median latency, respectively. If you wish to run one of the tests manually, you can do so with
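To make the tier numbers easier to read, the mapping described above can be written down directly. This is only a convenience sketch; the function name describe_tiers and the metric labels are assumptions, and the script simply restates the ordering given in the text.

```python
# Tier meanings and metric order as described in the text above.
TIER_LABELS = {0: "Bonus", 1: "Passed", 2: "Needs Improvement", 3: "Failed"}
METRICS = ["total messages", "failures/unanswered", "duplicates", "median latency"]


def describe_tiers(tier_string):
    # "3 1 2 0" -> one label per metric, in the order test.py prints them.
    tiers = [int(t) for t in tier_string.split()]
    return {metric: TIER_LABELS[t] for metric, t in zip(METRICS, tiers)}


print(describe_tiers("3 1 2 0"))
```

For the first line of the sample output, this reports a failed message count, passing failures, duplicates that need improvement, and bonus-level latency.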
bash$ ./run.py test-whatever.json

Message Format

To simplify this project, instead of using real packet formats, we will be sending our data across the wire in JSON (many languages have utilities to encode and decode JSON, and you are welcome to use these libraries). All messages must be encoded as a dictionary, and they must include the following four keys (at a minimum): src, dst, leader, and type. The simulator uses src and dst instead of IP addresses in order to route and deliver messages. Furthermore, the simulator supports multicast: if dst is set to "FFFF", the message will be delivered to all replicas (use multicast sparingly, since it is expensive). leader is the ID of the replica that the sender of the message believes is the leader. All messages must include the leader so that the simulator can learn which replica is the leader (otherwise, the simulator would have no way of determining this information).
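A small helper can guarantee that every outgoing message carries the four required fields. This is a sketch under assumptions: the helper names are hypothetical, and the "heartbeat" type and "term" key are examples of custom additions, not message types defined by the simulator.

```python
import json

BROADCAST = "FFFF"  # dst value that the simulator delivers to all replicas


def make_message(src, dst, leader, msg_type, **extra):
    # The four keys below are the minimum the simulator needs for routing.
    msg = {"src": src, "dst": dst, "leader": leader, "type": msg_type}
    msg.update(extra)  # custom keys for your own protocol messages
    return msg


def encode(msg):
    # Dictionary -> JSON bytes ready to be written to the socket.
    return json.dumps(msg).encode("utf-8")


def decode(raw):
    # JSON bytes read from the socket -> dictionary.
    return json.loads(raw.decode("utf-8"))


# Hypothetical custom message: "heartbeat" and "term" are illustrative only.
heartbeat = make_message("0000", BROADCAST, "0000", "heartbeat", term=3)
print(decode(encode(heartbeat)))
```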

type describes the type of the message. You may define custom types in order to implement your datastore (and you may add custom keys to the message dictionary in these cases). However, there are several message types that your replicas must support in order to handle requests from clients.

Note that in all of the above cases, the MID in a request must match the MID in the response. For example, the following would be a legal series of requests and responses, where 001A is a client and 0000 and 0001 are replicas:
Request 1     {"src": "001A", "dst": "0001", "leader": "FFFF",
                "type": "get", "MID": "4D61ACF83027", "key": "name"}
Response 1    {"src": "0001", "dst": "001A", "leader": "0000",
               "type": "redirect", "MID": "4D61ACF83027"}
Request 2     {"src": "001A", "dst": "0000", "leader": "0000",
               "type": "get", "MID": "9AB4CE50023", "key": "name"}
Response 2    {"src": "0000", "dst": "001A", "leader": "0000", "type": "ok",
               "MID": "9AB4CE50023", "value": "Christo Wilson"}
Again, you will need to develop additional, custom message types in order to implement the Raft consensus protocol. As long as your messages include the four minimum required fields (src, dst, leader, type), the simulator will ensure that your messages are delivered.
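The exchange above can be sketched as a function that builds a reply for a client request. This is an illustration only: the function name and arguments are hypothetical, reads are served from a plain dictionary with no replication, and put() handling is deliberately left as a "fail" placeholder because committing a write requires the Raft machinery described later.

```python
def respond_to_client(request, my_id, leader_id, store):
    # Every reply echoes the request's MID so the client can match them up.
    reply = {
        "src": my_id,
        "dst": request["src"],
        "leader": leader_id,
        "MID": request["MID"],
    }
    if leader_id != my_id:
        # Not the leader: point the client at the replica we believe leads.
        reply["type"] = "redirect"
    elif request["type"] == "get":
        reply["type"] = "ok"
        reply["value"] = store.get(request["key"], "")
    else:
        # Placeholder: a real put() must be replicated before answering "ok".
        reply["type"] = "fail"
    return reply


request = {"src": "001A", "dst": "0001", "leader": "FFFF",
           "type": "get", "MID": "4D61ACF83027", "key": "name"}
print(respond_to_client(request, "0001", "0000", {}))
```

Called on replica 0001 while 0000 is the leader, this produces the redirect shown in Response 1.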

Command Line Specification

The command line syntax for your 3700kvstore program is given below. The simulator will pass parameters to each replica representing (1) the ID of the replica, and (2) the IDs of all other replicas in the system. The syntax for launching your datastore is therefore:
./3700kvstore <your ID> <ID of second replica> [ID3 [ID4 ...]]
For simplicity, all replica IDs are unique four-digit hexadecimal numbers (e.g., 0AA1 or F29A). You will use these IDs as the src and dst in your messages. Clients will also be assigned unique IDs by the simulator.
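In Python, the arguments described above could be split apart as follows. The function name parse_ids is hypothetical, and the example passes an explicit list so that it runs on its own; a real replica would pass sys.argv instead.

```python
def parse_ids(argv):
    # argv[0] is the program name, argv[1] is this replica's ID,
    # and everything after that is the ID of another replica.
    if len(argv) < 3:
        raise SystemExit(
            "usage: ./3700kvstore <your ID> <ID of second replica> [ID3 [ID4 ...]]")
    return argv[1], argv[2:]


# In the real program this would be: my_id, peer_ids = parse_ids(sys.argv)
my_id, peer_ids = parse_ids(["./3700kvstore", "0000", "0001", "0002"])
print(my_id, peer_ids)
```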

Connecting to the LAN

We will be using UNIX domain sockets to emulate a LAN. Each of your replicas will connect to a single domain socket (the way a server would connect to a single Ethernet cable). A replica will send and receive all messages over this socket (i.e. messages to/from other replicas, as well as messages to/from clients). Your program should be constantly reading from the socket to make sure it receives all messages (they will be buffered if you don't read immediately). The simulator will take care of routing all sent messages to the appropriate destinations; thus, it's okay if you're not intimately familiar with how Domain Sockets work, or with how the simulator works.

Each replica should connect to a Domain Socket named "ID" (no-quotes), where ID is the replica's ID (i.e. the first ID it receives on the command line). We will be using the SOCK_SEQPACKET socket type, which provides a reliable message-oriented stream. Note that unlike Project 2, you do not need to pad the name of the domain sockets with \0.

Exactly how to connect to a UNIX domain socket depends on your programming language. For example, if you were using perl to complete the project, your code for connecting would look like:

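use IO::Socket::UNIX;
# Connect to the domain socket named after this replica's ID.
my $lan = IO::Socket::UNIX->new(
    Type => SOCK_SEQPACKET,
    Peer => "<lan>"
) or die "Cannot connect to <lan>: $!";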
You can then read and write from the $lan variable. In python, your code would look like
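from socket import socket, SOCK_SEQPACKET, AF_UNIX
# Create a sequenced-packet UNIX domain socket and connect to the LAN.
s = socket(AF_UNIX, SOCK_SEQPACKET)
s.connect('<lan>')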
with similar results.

We encourage you to write your code in an event-driven style using select() or poll(). This will keep your code single-threaded and will make debugging your code significantly easier. Alternatively, you can implement your datastore in a threaded model, but expect it to be significantly more difficult to debug.
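One possible shape for such an event-driven loop is a function that waits on the socket with a timeout and returns at most one decoded message. This is a sketch, not the starter code: the name poll_once and the 32768-byte receive size are assumptions, and the demonstration uses a local socket pair in place of the simulator's domain socket.

```python
import json
import select
import socket


def poll_once(sock, timeout):
    """Wait up to `timeout` seconds; return one decoded message or None."""
    ready, _, _ = select.select([sock], [], [], timeout)
    if not ready:
        return None  # timed out: a good moment to check election timers
    raw = sock.recv(32768)  # buffer size is an assumption
    if not raw:
        return None  # peer closed the connection
    return json.loads(raw.decode("utf-8"))


# Demonstration: a connected socket pair stands in for the simulated LAN.
ours, theirs = socket.socketpair()
theirs.send(json.dumps({"src": "0001", "dst": "0000",
                        "leader": "FFFF", "type": "noop"}).encode("utf-8"))
print(poll_once(ours, 0.1))
ours.close()
theirs.close()
```

Calling this in a loop with a short timeout keeps the replica single-threaded while still letting it react to both incoming messages and expired timers.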

Datastore Requirements and Assumptions

The goal of your system is to accept put()s from clients and retrieve the corresponding data when a get() is issued. To ensure that data is not lost when a process crashes, all data from clients must be replicated, which then raises the dueling issues of how to maintain consistency and achieve high-availability. To meet these goals, your datastore will implement the Raft consensus protocol. Ultimately, your datastore should achieve the following two goals:
  1. Consistency - clients should always receive correct answers to get() requests.
  2. Availability - clients should be able to execute put() and get() requests at any time with low latency (i.e. your system should execute requests quickly).
Raft is a complicated protocol, and real-world datastores are extremely complicated artifacts. To simplify this project, there are several things you do not need to implement:

Implementing Raft

The Raft paper is specifically designed to be easy to read. To implement the protocol you should definitely start by reading the paper. Additional papers and resources are available on the Raft Github. I would suggest the following series of steps to begin working on your datastore implementation:
  1. Add basic support for responding to client get() and put() requests. At this point, you can respond to all requests with a "type": "fail" message.
  2. Implement the Raft election protocol (section 5.2 of the Raft paper); add the ability to respond to get() and put() requests with "type": "redirect" messages.
  3. Add a timeout to detect leader failures (i.e. if you don't hear from the leader in X milliseconds...) and make sure that the new election proceeds correctly.
  4. Implement a basic, empty version of the AppendEntries RPC call that doesn't replicate any data, but acts as a keepalive message from the leader to other replicas to prevent unnecessary elections.
  5. Implement the transaction log and the "state machine" (i.e. a dictionary containing the key/value pairs from clients, Section 5.3). Don't bother replicating the transactions, just ensure that the leader is able to correctly answer get() and put() requests.
  6. Improve your AppendEntries RPC call to actually send data to replicas. Ensure that updates are only committed when a quorum is in agreement.
  7. Add support for retrying failed commits and test it by experimenting with lossy network simulations.
  8. If you haven't already, modify the leader election to support the additional restrictions in Section 5.4.1; test your implementation on lossy networks with failed leaders.
  9. Implement the subtle commit restriction given in Section 5.4.2.
  10. Improve your AppendEntries RPC call to implement batching, i.e. a single AppendEntries may send multiple outstanding log entries to a given replica.
  11. Test, test, test, and test some more ;)
Step 6 will probably require the most time in terms of writing code and debugging, since it is the crux of the algorithm. Implementing steps 7-10 is necessary to ensure correctness of the protocol, but shouldn't be too difficult.
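A few of the pieces from steps 3, 6, and 9 can be sketched as small pure functions. Everything here is an illustration under assumptions: the function names, the 0-based log indexing with -1 meaning "nothing committed", and the 150-300 ms timeout range (the example interval given in the Raft paper) are choices made for this sketch, and your own data structures may differ.

```python
import random


def election_timeout(low=0.15, high=0.30):
    # Randomized timeout in seconds, so replicas rarely time out together.
    return random.uniform(low, high)


def quorum(cluster_size):
    # Smallest number of replicas that forms a majority.
    return cluster_size // 2 + 1


def advance_commit_index(commit_index, match_index, log_terms,
                         current_term, cluster_size):
    """Return the highest log index the leader may mark as committed.

    commit_index: highest committed index so far (-1 if none)
    match_index:  follower ID -> highest index known replicated there
    log_terms:    log_terms[i] is the term of the leader's entry i
    """
    for n in range(len(log_terms) - 1, commit_index, -1):
        if log_terms[n] != current_term:
            # Section 5.4.2: entries from earlier terms are never committed
            # by counting replicas; they commit indirectly with a later entry.
            break
        copies = 1 + sum(1 for m in match_index.values() if m >= n)  # 1 = leader
        if copies >= quorum(cluster_size):
            return n
    return commit_index


followers = {"0001": 3, "0002": 2, "0003": -1, "0004": -1}
print(advance_commit_index(-1, followers, [1, 1, 2, 2], 2, 5))  # prints 2
```

In the example, entry 3 is stored on only two of five replicas, while entry 2 is stored on three, so the commit index advances to 2 and no further.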

Performance Leaderboard

Part of your grade will be based on the performance of your datastore implementation, in terms of total messages sent, median latency, etc. To help you know how you're doing, the ./test.py script will report performance data from several of the harder tests to a central database. Note that your performance metrics are only reported if you are passing the essential correctness checks; if your scores aren't showing up, that means the correctness checks are not passing, or you need to improve your performance.

In order to see how your project ranks, you can run

bash$ /course/cs3700f20/bin/project6/printstats
----- TEST: 20% drops, 2 replica failure, 20% read -----
Least overhead:
1: cbw                         200 packets
2: foo                         220 packets

Lowest query latency:
1: foo                         1.00000
2: cbw                         1.15000
which will print out the rank of each group for each performance test. In this particular example, cbw's project has lower overhead but answers queries more slowly. Ideally, you would like to have both lower latency and fewer total packets sent.

Submitting Your Milestone

This is a very challenging project. To make sure that students start early, we require that students turn in a milestone. To complete the milestone, you must turn in a 3700kvstore program that is able to pass three of the test cases: simple-1, simple-2, and crash-1. The milestone is worth 1% of your final grade.

To submit the milestone, follow the turn-in instructions below, but substitute project6-milestone for project6 in the register and turnin commands.

Submitting Your Final Project

If you have not done so already, register yourself for our grading system using the following command:
$ /course/cs3700f20/bin/register-student [NUID]
NUID is your Northeastern ID number, including any leading zeroes.

Before turning in your project, you and your partner(s) must register your group. To register yourself in a group, execute the following script:

$ /course/cs3700f20/bin/register project6 [team name]
This command registers you for the milestone submission and the final submission. This will either report back success or will give you an error message. If you have trouble registering, please contact the course staff. You and your partner(s) must all run this script with the same [team name]. This is how we know you are part of the same group.

To turn in your project, you should submit your (thoroughly documented) code along with two other files: a README.md and a Makefile.

Your README.md, Makefile, source code, etc. should all be placed in a directory. You submit your project by running the turn-in script as follows:
$ /course/cs3700f20/bin/turnin <project6-milestone|project6> [project directory]
The first parameter determines if you are turning in the milestone or the final submission. [project directory] is the name of the directory with your submission. The script will print out every file that you are submitting, so make sure that it prints out all of the files you wish to submit! The turn-in script will not accept submissions that are missing a README.md or a Makefile. Only one group member needs to submit your project. Your group may submit as many times as you wish; only the last submission will be graded, and the time of the last submission will determine whether your assignment is late.

Double Checking Your Submission

To try and make sure that your submission is (1) complete and (2) will work with our grading scripts, we provide a simple script that checks the formatting of your submission. This script is available on the Khoury College Linux machines and can be executed using the following command:
/course/cs3700f20/code/check/raft_fmt_chk.py [path to your project directory]
This script will attempt to make sure that the correct files (e.g. README.md and Makefile) are available in the given directory, that your Makefile will run without errors (or is empty), and that after running the Makefile a program named 3700kvstore exists in the directory. The script will also try to determine if your files use Windows-style line endings (\r\n) as opposed to Unix-style line endings (\n). If your files are Windows-encoded, you should convert them to Unix-encoding using the dos2unix utility before turning in.
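If you want to check for Windows-style line endings yourself before running the format checker, a byte-level scan is enough. This is a hypothetical helper, not the logic used by the course script; it only detects \r\n and leaves the conversion to dos2unix.

```python
def has_windows_line_endings(path):
    # Read in binary mode so Python does not translate line endings.
    with open(path, "rb") as handle:
        return b"\r\n" in handle.read()
```

For example, has_windows_line_endings("README.md") returning True would mean the file should be converted before turning in.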


This project is worth 15% of your final grade in total: 1% comes from the milestone and 14% comes from the rest of the project. At a minimum, your code must pass the test suite without errors or crashes, and it must obey the requirements specified above. Note that you will not be graded based on your performance in the leaderboards; they are simply for fun and edification. All student code will be scanned by plagiarism detection software to ensure that students are not copying code from the internet or each other.

You can see your grades for this course at any time by using the gradesheet program that is available on the Khoury College machines.

$ /course/cs3700f20/bin/gradesheet