On my most recent coding escapades, I had to design & implement a webserver capable of handling a staggering number of requests concurrently. Now, I could have gone with an abstracted high level multi-threading framework like Akka or Vert.x but the challenge was to only use the libraries provided by core Java.
The entire source code for this project’s implementation can be found at my github repo –> https://github.com/kitark06/king
The below jumplist provides a quick access to specific parts of the article.- Lets get familiar with the Parlance…
- Parallelize operations based on Data [Object Value]
- When using synchronized collections, operations are thread safe, initialization is not…
- Synchronization works on object instances, not object equality.
- Designing a Cached Object Distributor.
- Adding all the pieces together…
- Generating a random session id when user logs in with his ID
- Allowing user to submit scores for a level with this session id
- Retrieving a list of high scores for a specific level.
With these goals in mind, I based my plan of attack around the Data-parallel approach for this task. A while back I wrote an article where I shed some light on 2 means of making your code run in parallel. You can check that by clicking –> “MultiThreading in Applications featuring JDBC“
Lets get familiar with the Parlance…
In all multi-threaded programs, there are parts of code called the Critical-Section where if more than one threads are allowed to work in tandem, they might end up modifying each other’s data & corrupt the outcome. This is called a Race-Condition because multiple threads race against each other and end up sabotaging each other’s data. We avoid race conditions by Synchronizing these parts of the code to ensure only one thread can execute the code at any given time. This makes the code Thread-Safe.
You can think of the synchronized sections of the code as being guarded by a gate. Any thread which wants to run the code, must first acquire a Lock, which it promptly uses to lock other threads out to gain exclusive access. Once it has executed the code, it releases the lock & makes the code available to be executed by other threads. This process of acquiring a lock to gain exclusive access is called Mutual-Exclusion & the lock object is called a Monitor or more colloquially a Mutex Object.
Our goal in designing would be to use clever ways to try and reduce the critical section as much as possible by minimizing the synchronized blocks or by modifying their mutex lock condition so as to decrease locking & increase the parallelism of our code. Phew!! With all the esoteric words aside, lets dive straight into the meat of the article.
Parallelize operations based on Data [Object Value]
This is most profound choice to make as it will affect how the solution is designed. The key deciding factor (no pun intended) is the presence of primary key or an identifier associated with your data. If the operations with your data have little coupling with other keyed data, then its an ideal candidate for Data parallel approach.
For this style to work, we ideally need a data structure with minimal sync/locking when doing operations over decoupled data [different keys] so that multiple threads can do their I/O parallelly. Naturally, this works well with a key-value based data structure like an implementation of Map which is Thread-safe.
ConcurrentHashMap is once such key-value based data structure which guarantees us correct results under multithreaded workloads. Operations happening on the same bucket [same key] are synchronized, preventing multiple threads to work parallelly which does reduce the speed by a little bit. But operations involving different keys target different buckets and they work at almost full speeds parallelly.
In my implementation, all the operations were happening on the game’s level_Id which served as the spiritual primary key. I/O for different levels would happen without any locking or slowdowns and thus the implementation provided a high level of throughput.
An even more granular approach would be using both the level_Id & userID to lock certain operations which happen for the same user and level. Since the probability of this happening parallelly by two threads is low, this will lead to minimal locking. As a rule of thumb, the more specific the locking condition, the less are the chances of it happening simultaneously. Which leads to fewer non-blocking code, increasing throughput !!
This way, our code can operate on different sections of the concurrent HashMap all while being thread-safe and also attaining breath-taking speeds thanks to leveraging all those CPU cores via threads.
When using synchronized collections, operations are thread safe, initialization is not…
Since we are working with a thread safe synchronized collection, we can be sure that the collection operations [api calls] won’t collide with each other. However, operations which happen external to the collection need to be synched.
Eg,
A simple overlooked initialization operation could wreak havoc if not properly handled. Let me show how. To maintain a list of high-scores sorted in descending order, I used the following arrangement.
highScoresPerlevel = ConcurrentHashMap <String level_Id , SortedSet<UserScore>>
Level initialization happen adhoc [lazy evaluation] when a request for that level is encountered for the first time. Thus, before making I/O over the level, I first need to verify if the level’s backing data structure is initialized. Something like,
levelSortedSet = highScoresPerlevel.get(level_Id);
if (levelSortedSet == null)
levelSortedSet = new SortedSet<UserScore>();
This operation is NOT thread safe because the check ifnull and create are not atomic. There are myriad ways in which this operation can lead to corruption, one of which is shown below
- Thread A & B try to use the collection
- Thread A checks that the collection isnull & proceeds to create it BUT
- Before Thread A can create the collection, the context switches to Thread B
- Thread B also checks & realizes that the collection isnull and actually succeeds in creating it.
- It adds a score to it and context switches to Thread A
- A is unaware of the initialization done by B.
- It picks up from its last known state, which is point 2 (collection isnull, need to create)
- It creates a collection and overwrites the datastructure created by Thread B which had a value
- Thus, the entry added by thread B is lost.
We remedy this problem by synchronizing this part of the code. But look closely & we realize that this section only poses a problem if multiple threads try to create the datastructure for the same value/level. If they operate on different levels, they can execute in parallel.
So rather than blanket-banning & synchronizing the entire operation, we will reduce the scope of this critical section by only making it applicable for threads which work on the same level. But there is a problem..
Synchronization works on object instances, not object equality.
A very important realization to have is that synchronization always works on the same instance. It does not depend on the values of the object. If you want to sync more than one threads on a particular value, they must lock over the same Mutex instance. For example, consider the following 2 request which arrive simultaneously.
Post score 1000, level 1 , user “abc”.
Post score 2000, level 1 , user “pqr”.
I want to sync these operations as they belong to the same level so that they don’t happen in parallel. But the below mentioned code wont give the intended result as the synchronization is happening on the method argument which basically are 2 different Mutex object instances, albeit with same value.
updateScore(ScoreObject score)
{
synchronized(score.getLevel_Id)
{
levelSortedSet = highScoresPerlevel.get(level_Id);
if ( levelSortedSet == null )
levelSortedSet = new SortedSet<UserScore>();
}
}
Designing a Cached Object Distributor.
To get the intended result, we need to use the same object as Mutex. To do this, we can create a custom Class which would serve as our Mutex provider. This class will have a method to which we will pass a String, which will work as our value. This class will create & cache Mutex objects against String values and distribute the same object to all the threads who request a mutex using the same key.
This way, as same instances are distributed to multiple threads requesting the lock, we can achieve synchronization for threads using the same values.
public class Mutex
{
private Map<String, WeakReference<Mutex>> locker =
Collections.synchronizedMap(new WeakHashMap<String, WeakReference<Mutex>>());
/**
* Gets lock which is level specific.
*
* @param levelId the level id
* @return the lock
*/
public synchronized Mutex getLock(String levelId)
{
WeakReference<Mutex> mutex = locker.get(levelId);
if (mutex == null || mutex.get() == null)
{
mutex = new WeakReference<Mutex>(new Mutex());
locker.put(levelId, mutex);
}
return mutex.get();
}
}
WeakReferences are used to help with Garbage collection of the Mutex objects. Ideally, we don’t want the mutex to be GCed as long as its being referenced & used as a lock for our synchronized block. However, as soon as the thread releases the lock, the mutex should be removed from the cache so that it doesn’t persist in our map & cause a memory leak.
Unlike Strong reference, a WeakReference wont stop an object from getting GCed, but at the same time provides us with a pointer to that object. The object is not marked for GC till it has a strong reference to it, which in our case, is as long as the thread is using the mutex as a lock. As soon as the thread exits the sync block, it releases the lock and hence our mutex has no strong reference pointing to it & would be eligible for garbage collection. At the next GC event, the mutex will be garbage collected. As we make sure to never hold a reference to the Mutex Object in our code, we can safely bank on the JVM to prevent a memory leak scenario & at the same time not worry about clearing the objects ourselves.
There is one caveat though. If you look closely, you will see that our cache is basically a Map<String, WeakReference> where the weak references are pointers to our mutex object. While this makes sure that our mutexes are GCed, our weak references are in turned being strongly referenced by the Strings and thus will not be GCed. So over time, we will end up with a map having a lot of weak references which point to null because their mutexes have been reclaimed by the CG. This does not pose a huge problem, as the WeakReferences are merely pointers and wont consume a whole lot of memory.
A simple cleanup thread can be written & scheduled to mitigate this issue which would periodically run & iterate all the values in our cache, find WeakReferences which point to null, and then remove the subsequent entry.
Adding all the pieces together..
The below mentioned code is executed by a thread & has an instance of Mutex provided to it. This instance shared amongst all the threads which makes sure the sync happens correctly. They use the getLock method to pass a String , the level_Id and get a mutex.
Threads which want to lock on the same value, will be given the same instance of the mutex and will lock each other out, thereby attaining the desired synchronized behavior.
// Global/Field variable which is set via the constructor so as to ensure all threads use the same instance of the Mutex distributor.
Mutex mutex;
updateScore(ScoreObject score)
{
synchronized(mutex.getLock(score.getLevel_Id))
{
levelSortedSet = highScoresPerlevel.get(level_Id);
if ( levelSortedSet == null )
levelSortedSet = new SortedSet<UserScore>();
}
}