The most commonly taught method of ensuring data integrity during concurrent operations is pessimistic locking: placing a lock around the entire read-and-update operation.
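As a point of comparison, a minimal sketch of pessimistic locking might look like the following. The Counter class and its increment method are hypothetical names chosen for illustration; the point is only that the lock is held for the whole read-and-update step.

```python
from threading import Lock, Thread


class Counter:
    """Hypothetical shared counter protected by a pessimistic lock."""

    def __init__(self):
        self.value = 0
        self.lock = Lock()

    def increment(self):
        # The lock covers the read, the computation, and the write,
        # so no other thread can interleave with this update.
        with self.lock:
            current = self.value
            self.value = current + 1


counter = Counter()
workers = [Thread(target=counter.increment) for _ in range(10)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
assert counter.value == 10
```

This is correct, but every thread that wants to update the counter has to wait for the full duration of someone else's update.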
What we hope to achieve with optimistic locking is to minimise the time we spend holding a lock and thereby increase our throughput, because while we hold a lock we stop every other thread that needs it from making progress. To do this we need a system that satisfies the following assumptions:
The process of applying all changes or no changes at all is known as a transaction. Transactions guarantee (or they should) atomicity, which basically means all or nothing: you either get all changes or no changes. This helps maintain the integrity of our data as well.
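To make "all or nothing" concrete, here is a rough sketch that applies a list of changes to a private copy and only hands the copy back if every change succeeded. The apply_transaction helper and the account data are assumptions made up for this example, not part of any library.

```python
import copy


def apply_transaction(state, changes):
    """Return (new_state, True) if every change succeeds, else (state, False)."""
    working = copy.deepcopy(state)  # work on a private copy
    try:
        for change in changes:
            change(working)
    except Exception:
        # Any failure aborts the whole transaction; the original is untouched.
        return state, False
    return working, True


def debit(accounts):
    accounts["alice"] -= 5


def credit(accounts):
    accounts["bob"] += 5


def fail(accounts):
    raise ValueError("simulated failure")


accounts = {"alice": 10, "bob": 0}

committed, ok = apply_transaction(accounts, [debit, credit])
assert ok and committed == {"alice": 5, "bob": 5}

# The debit ran on the copy, but the failure discards it entirely.
aborted, ok = apply_transaction(accounts, [debit, fail])
assert not ok and aborted == {"alice": 10, "bob": 0}
```

Because the changes only ever touch the copy, a failure halfway through leaves no partial update behind.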
Although in simple systems we can assume that there are only two options, abort or apply the changes, more complex systems can offer more. Take the version control tool Git, for example. If we commit to a local copy of the shared data (the repository) and someone else commits as well, we can either:
Having side-effect-free transformations is also important. For example, if we had stored all the transformations performed on the data and wanted an arbitrary version, we could simply apply the transformations, in order, to the initial version. A concrete example:
changes = [
    lambda x: x.append(1),
    lambda x: x.append(2),
]
data = []
for item in changes:
    item(data)
assert data == [1, 2]
No matter how many times we replay the changes, as long as we start from the same initial data we will always get the same final result. This is especially useful if, say, we want to retrieve an older version of our code: we wouldn't want the result to differ because of some external condition.
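The same idea can be sketched with pure functions that return a new list instead of mutating their argument, which makes it easy to rebuild any intermediate version. The version_at helper and the history list below are illustrative names, not something from a library.

```python
from functools import reduce

# Each transformation returns a new list and leaves its input unchanged.
history = [
    lambda xs: xs + [1],
    lambda xs: xs + [2],
    lambda xs: xs + [3],
]


def version_at(initial, history, n):
    """Rebuild version n by applying the first n transformations to initial."""
    return reduce(lambda state, change: change(state), history[:n], initial)


initial = []
assert version_at(initial, history, 0) == []
assert version_at(initial, history, 2) == [1, 2]
assert version_at(initial, history, 3) == [1, 2, 3]
# Replaying gives the same answer again, and the initial data is untouched.
assert version_at(initial, history, 2) == [1, 2]
assert initial == []
```

Since nothing outside the function arguments is read or written, asking for version 2 today and tomorrow yields the same list.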
Let's see what a trivial optimistic locking implementation would look like in Python. First we will need some machinery for the versioned objects:
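from threading import RLock, Thread


class Versioned:
    def __init__(self, data):
        # Keep the value and its version together in one tuple.
        self.data = (data, 0)
        self.lock = RLock()

    def get(self):
        # Readers take a snapshot of (data, version) without locking.
        return self.data

    def commit(self, new_data, copied_version):
        with self.lock:
            # Reject the commit if someone else committed since our copy.
            if copied_version != self.data[1]:
                raise Exception("stale version")
            self.data = (new_data, copied_version + 1)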
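If you look at the Versioned.get method you will notice that there is no locking. There needn't be a lock, because the data is only ever modified by the commit method while the lock is held, and commit replaces the whole (data, version) tuple in a single assignment. A reader therefore sees either the old pair or the new pair, never a mixture of the two (at least in CPython, where rebinding an attribute is atomic).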
Then we need the threads to update their copies of the data and automatically retry failed transactions:
def task():
    while True:
        try:
            data, version = obj.get()
            obj.commit(data + 1, version)
            break
        except Exception:
            # Another thread committed first; re-read and try again.
            pass


obj = Versioned(0)
threads = [Thread(target=task) for _ in range(10)]
for thr in threads:
    thr.start()
for thr in threads:
    thr.join()
assert obj.get() == (10, 10)
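One possible refinement, sketched here under some assumptions, is to raise a dedicated exception for version conflicts and to bound the number of retries, so that unrelated errors are not silently swallowed and a thread cannot spin forever. ConflictError, VersionedValue, update_with_retry and max_attempts are all hypothetical names introduced for this sketch.

```python
from threading import Lock, Thread


class ConflictError(Exception):
    """Hypothetical error: the commit was based on a stale version."""


class VersionedValue:
    """Self-contained variant of a versioned object for this sketch."""

    def __init__(self, value):
        self._state = (value, 0)
        self._lock = Lock()

    def get(self):
        return self._state

    def commit(self, new_value, seen_version):
        with self._lock:
            if seen_version != self._state[1]:
                raise ConflictError(seen_version)
            self._state = (new_value, seen_version + 1)


def update_with_retry(cell, transform, max_attempts=1000):
    """Apply transform optimistically; return how many attempts it took."""
    for attempt in range(1, max_attempts + 1):
        value, version = cell.get()
        try:
            cell.commit(transform(value), version)
            return attempt
        except ConflictError:
            continue  # someone else won the race; re-read and retry
    raise RuntimeError("gave up after too many conflicts")


cell = VersionedValue(0)
attempts = []


def worker():
    attempts.append(update_with_retry(cell, lambda v: v + 1))


pool = [Thread(target=worker) for _ in range(10)]
for thr in pool:
    thr.start()
for thr in pool:
    thr.join()
assert cell.get() == (10, 10)
assert len(attempts) == 10
```

The attempts list records how often each thread had to retry, which gives a rough feel for how contended the value is.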
Optimistic locking won't solve all your concurrency issues, but when you meet a situation that fits the four assumptions, please consider using it.