Skip to content
Snippets Groups Projects
Commit 57c22b34 authored by Alan De Smet's avatar Alan De Smet
Browse files

Working on overhauling exclusivefilelock

The existing exclusivefilelock.ExclusiveFileLock only worked safely if
you were doing VERY specific things, making it easy to mess up and
invalidate the exclusive lock. Trying to make robust.
parent 17ecd0c8
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,270 @@ import os
import fcntl
import logging
class ExclusiveLockFile:
""" Lock a file by name or handle, returning an open file handle
Intended to be used in a "with" block as a context manager.
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as dir:
... lockfile = dir+"/lock"
... open(lockfile,"w").close()
... with ExclusiveLockFile(lockfile) as f:
... # Can assume exclusive control of lockfile
... # Can use f as a file object
... pass
... f = open(lockfile,"w")
... with ExclusiveLockFile(f):
... # Can assume exclusive control of lockfile
... pass
"""
def __init__(self, filename_or_file):
""" prepare to lock filename_or_file
filename_or_file can be str or bytes path to a file,
a Path-like object, or a file object.
file-like objects will only work to the extent that
fcntl.lockf works on them.
>>> from exclusivelockfile import ExclusiveLockFile
>>> from tempfile import mkdtemp
>>> tmpdir = mkdtemp()
>>> filename = tmpdir+"/lock"
>>>
>>> # You can't lock a file until it exists
>>>
>>> ExclusiveLockFile(filename) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
FileNotFoundError: [Errno 2] No such file or directory: '.../lock'
>>>
>>> # You can lock a file-like object that support f.name and fcntl.lockf(f)
>>>
>>> f = open(filename,"w")
>>> with ExclusiveLockFile(f):
... # Can assume exclusive control of filename
... pass
>>> f.close()
>>>
>>> # You can lock an existing file by name (str, bytes, or Path-like object).
>>>
>>> with ExclusiveLockFile(filename) as f:
... # Can assume exclusive control of filename
... # Can use f as a file object
... pass
>>>
>>> # Also supports manual use; you MUST call unlock in this case!
>>>
>>> l = ExclusiveLockFile(filename)
>>> l.lock()
>>> # Can assume exclusive control of filename
>>> l.unlock()
>>>
>>> # Error cases
>>> l.unlock() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
RuntimeError: Attempting to unlock .../lock, but I don't have that lock
>>> l.lock()
>>> l.lock() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
RuntimeError: Attempting to lock .../lock, but I already have that lock
>>>
>>> import os
>>> os.unlink(filename)
>>> os.rmdir(tmpdir)
"""
try:
# Ensure it's Path-like
os.fspath(filename_or_file)
self.file = None
self.filename = filename_or_file
except:
# Hopefully it's file-like
self.file = filename_or_file
self.filename = None
if self.file is None:
# r+ so we don't create the file if it doesn't alreasy exist.
self.file = open(filename_or_file, "r+")
else:
try:
self.filename = self.file.name
except AttributeError:
raise TypeError(f"filename_or_file passed in does not appear to a a str, bytes, Path-like object, or a file object. It is a {type(filename_or_file)}")
self.locked = False
def lock(self):
if self.locked:
raise RuntimeError(f"Attempting to lock {self.filename}, but I already have that lock")
fcntl.lockf(self.file, fcntl.LOCK_EX)
self.locked = True
logging.debug("Locked {0}".format(self.filename))
def unlock(self):
if not self.locked:
raise RuntimeError(f"Attempting to unlock {self.filename}, but I don't have that lock")
fcntl.lockf(self.file, fcntl.LOCK_UN)
self.locked = False
logging.debug("Unlocked {0}".format(self.filename))
def __enter__(self):
self.lock()
def __exit__(self, type, value, traceback):
self.unlock()
def samefile(f1, f2):
""" Do do file objects correspond to the same file on disk?
All exceptions are eaten and the answer assumed to be False
>>> from tempfile import mkdtemp
>>> tmpdir = mkdtemp()
>>> a = open(tmpdir+"/a","w")
>>> b = open(tmpdir+"/b","w")
>>> a2 = open(tmpdir+"/a","w")
>>> samefile(a,b)
False
>>> samefile(a,a2)
True
>>> os.rename(tmpdir+"/a", tmpdir+"/c")
>>> a3 = open(tmpdir+"/c","w")
>>> samefile(a,a3)
True
>>> c = open(tmpdir+"/a","w")
>>> samefile(a,c)
False
>>> import io
>>> x = io.StringIO()
>>> samefile(a,x)
False
>>> samefile(a,88)
False
>>> import os
>>> os.unlink(tmpdir+"/a")
>>> os.unlink(tmpdir+"/b")
>>> os.unlink(tmpdir+"/c")
>>> os.rmdir(tmpdir)
"""
try:
f1_stat = os.stat(f1.fileno())
f2_stat = os.stat(f2.fileno())
r = ((f1_stat.st_ino == f2_stat.st_ino) and
(f1_stat.st_dev == f2_stat.st_dev))
return r
except Exception as e:
return False
def file_matches_filename(file, filename):
""" Does file object file correspond to what is now at filename?
This is a simple wrapper around samefile.
All exceptions are eaten and the answer assumed to be False
>>> from tempfile import mkdtemp
>>> tmpdir = mkdtemp()
>>> a = open(tmpdir+"/a","w")
>>> b = open(tmpdir+"/b","w")
>>> file_matches_filename(a,tmpdir+"/a")
True
>>> file_matches_filename(a,tmpdir+"/b")
False
>>> file_matches_filename(a,tmpdir+"/missingfile")
False
>>> os.rename(tmpdir+"/a", tmpdir+"/c")
>>> c = open(tmpdir+"/c","w")
>>> newa = open(tmpdir+"/a","w")
>>> file_matches_filename(a,tmpdir+"/a")
False
>>> file_matches_filename(a,tmpdir+"/c")
True
>>> import os
>>> os.unlink(tmpdir+"/a")
>>> os.unlink(tmpdir+"/b")
>>> os.unlink(tmpdir+"/c")
>>> os.rmdir(tmpdir)
"""
try:
r = samefile(file, open(filename))
return r
except Exception as e:
return False
class AtomicCreateIfMissing:
""" Context manager to atomically crate a file
>>> from exclusivelockfile import AtomicCreateIfMissing
>>> from tempfile import mkdtemp
>>> tmpdir = mkdtemp()
>>> filename = tmpdir+"/result"
>>> #
>>> with AtomicCreateIfMissing(filename) as f:
... if f is None:
... raise Exception("This won't happen, as filename doens't exist yet")
"""
"""
... f.write("this is the body of a test file\n")
... with open(filename) as input:
... print(input.read()) # prints nothing; file is still empty
>>> with open(filename) as f: print(f.read())
this is the body of a test file
>>> with AtomicCreateIfMissing(filename) as f:
... if f is None:
... return
... raise Exception("This won't happen, as the file already exists and AtomicCreateIfMissing returned None above")
"""
def __init__(self, filename, partial_suffix=".part"):
self.filename = filename
self.suffix = partial_suffix
self.lockfilename = filename + partial_suffix
self.lock = None
def __enter__(self):
if os.path.exists(self.filename):
# While redundant, this short circuit succeeds in an expected
# common case
logging.debug(f"AtomicCreateIfMissing({self.filename}): File already exists; returning None")
return None
f1 = open(self.lockfilename, "w")
lock = ExclusiveLockFile(f1)
lock.lock()
if file_matches_filename(f1, self.filename):
logging.debug(f"AtomicCreateIfMissing({self.filename}): File created while I was trying to; returning None")
lock.unlock()
return None
if not file_matches_filename(f1, self.lockfilename):
lock.unlock()
import subprocess
subprocess.run(["ls","-l",os.path.dirname(self.filename)])
import time
print("ohno")
#time.sleep(10j
raise Exception(f"Someting unexpected happened. I opened {self.lockfilename}, but it's not there anymore, and it didn't get moved to {self.filename}.")
logging.debug(f"AtomicCreateIfMissing({self.filename}): I am handling creation")
self.lock = lock
return f1
def __exit__(self, type, value, traceback):
if self.lock is not None:
os.rename(self.lockfilename, self.filename)
self.lock.unlock()
if False:
class ExclusiveLockFileOLD:
""" Lock a temporary file created for the purpose.
Intended to be used in a "with" block as a context manager.
......
......@@ -58,6 +58,162 @@ class DTestCase(unittest.TestCase):
#with tempfile.NamedTemporaryFile() as f:
ELFT_MSG_WAITING = "waiting for lock"
ELFT_MSG_LOCKED = "claimed lock, waiting on pipe"
ELFT_MSG_UNLOCKED = "released lock"
ELFT_MSG_DONE = "exiting"
def ELFT_lock_sleep_exit(title, file, pipe, queue):
queue.put(f"{title} {ELFT_MSG_WAITING}")
from csppfetch.exclusivelockfile import ExclusiveLockFile
import time
with ExclusiveLockFile(file):
queue.put(f"{title} {ELFT_MSG_LOCKED}")
pipe.recv()
queue.put(f"{title} {ELFT_MSG_UNLOCKED}")
def ELFT_lock_proc(title, file, qresults, qcommands):
qresults.put(f"{title} {ELFT_MSG_WAITING}")
from csppfetch.exclusivelockfile import ExclusiveLockFile
import time
with ExclusiveLockFile(file):
qresults.put(f"{title} {ELFT_MSG_LOCKED}")
cmd = qcommands.get()
if cmd != "unlock":
qresults.put(f"{title} received {cmd} expected unlock")
return 1
qresults.put(f"{title} {ELFT_MSG_UNLOCKED}")
cmd = qcommands.get()
if cmd != "done":
qresults.put(f"{title} received {cmd} expected done")
return 1
qresults.put(f"{title} {ELFT_MSG_DONE}")
class _ExclusiveLockFileTests(DTestCase):
def Xcreate_lock_process(self, name, lockfile):
from multiprocessing import Process,Pipe
parent_pipe,child_pipe = Pipe()
p = Process(target=ELFT_lock_sleep_exit,
args=[name, lockfile, child_pipe])
return(p,parent_pipe)
def create_lock_process(self, name, lockfile):
from multiprocessing import Process, Queue
qresults = Queue()
qcommands = Queue()
p = Process(target=ELFT_lock_proc,
args=[name, lockfile, qresults, qcommands])
return(p,qresults,qcommands)
def Xtest_exclusivelockfile(self):
from tempfile import TemporaryDirectory
from multiprocessing import Queue
# Testing multi-process locking code is awful.
with TemporaryDirectory() as dir:
p1,p2,p3 = [None]*3
try:
queue = Queue()
lockfile = dir+"/lock"
p1,p1_pipe = self.create_lock_process("p1", lockfile, queue)
p2,p2_pipe = self.create_lock_process("p2", lockfile, queue)
p3,p3_pipe = self.create_lock_process("p3", lockfile, queue)
p1.start()
self.assertEqual(queue.get(), f"p1 {ELFT_MSG_WAITING}")
self.assertEqual(queue.get(), f"p1 {ELFT_MSG_LOCKED}")
self.assertTrue(queue.empty())
p2.start()
self.assertEqual(queue.get(), f"p2 {ELFT_MSG_WAITING}")
self.assertTrue(queue.empty())
p1_pipe.send("go")
more = sorted([queue.get(),queue.get()])
self.assertEqual(more, [f"p1 {ELFT_MSG_UNLOCKED}", f"p2 {ELFT_MSG_LOCKED}"])
p1.join()
p1 = None
# When we get "p1 {ELFT_MSG_UNLOCKED}", the lockfile should be unlinked
# What happened with p3 shows up?
p3.start()
self.assertEqual(queue.get(), f"p3 {ELFT_MSG_WAITING}")
#print("1",queue.get())
#print("2",queue.get())
#print("3",queue.get())
self.assertTrue(queue.empty())
p2_pipe.send("go")
more = sorted([queue.get(),queue.get()])
self.assertEqual(more, [f"p2 {ELFT_MSG_UNLOCKED}", f"p3 {ELFT_MSG_LOCKED}"])
p2.join()
p2 = None
p3_pipe.send("go")
self.assertEqual(queue.get(), f"p3 {ELFT_MSG_UNLOCKED}")
p3.join()
p3 = None
self.assertTrue(queue.empty())
finally:
if p1 is not None: p1.terminate()
if p2 is not None: p2.terminate()
if p3 is not None: p3.terminate()
def test_exclusivelockfile(self):
from tempfile import TemporaryDirectory
# Trying to deterministically test non-deterministic code is awful
with TemporaryDirectory() as dir:
p1,p2,p3 = [None]*3
try:
lockfile = dir+"/lock"
p1,p1_res,p1_cmd = self.create_lock_process("p1", lockfile)
p2,p2_res,p2_cmd = self.create_lock_process("p2", lockfile)
p3,p3_res,p3_cmd = self.create_lock_process("p3", lockfile)
p1.start()
self.assertEqual(p1_res.get(), f"p1 {ELFT_MSG_WAITING}")
self.assertEqual(p1_res.get(), f"p1 {ELFT_MSG_LOCKED}")
self.assertTrue(p1_res.empty())
p2.start()
self.assertEqual(p2_res.get(), f"p2 {ELFT_MSG_WAITING}")
self.assertTrue(p2_res.empty())
self.assertTrue(p1_res.empty())
p1_cmd.put("unlock")
self.assertEqual(p1_res.get(), f"p1 {ELFT_MSG_UNLOCKED}")
self.assertEqual(p2_res.get(), f"p2 {ELFT_MSG_LOCKED}")
self.assertTrue(p2_res.empty())
self.assertTrue(p1_res.empty())
p1_cmd.put("done")
self.assertEqual(p1_res.get(), f"p1 {ELFT_MSG_DONE}")
self.assertTrue(p2_res.empty())
self.assertTrue(p1_res.empty())
self.assertTrue(False)
more = sorted([queue.get(),queue.get()])
self.assertEqual(more, [f"p1 {ELFT_MSG_UNLOCKED}", f"p2 {ELFT_MSG_LOCKED}"])
p1.join()
p1 = None
# When we get "p1 {ELFT_MSG_UNLOCKED}", the lockfile should be unlinked
# What happened with p3 shows up?
p3.start()
self.assertEqual(queue.get(), f"p3 {ELFT_MSG_WAITING}")
#print("1",queue.get())
#print("2",queue.get())
#print("3",queue.get())
self.assertTrue(queue.empty())
p2_pipe.send("go")
more = sorted([queue.get(),queue.get()])
self.assertEqual(more, [f"p2 {ELFT_MSG_UNLOCKED}", f"p3 {ELFT_MSG_LOCKED}"])
p2.join()
p2 = None
p3_pipe.send("go")
self.assertEqual(queue.get(), f"p3 {ELFT_MSG_UNLOCKED}")
p3.join()
p3 = None
self.assertTrue(queue.empty())
finally:
if p1 is not None: p1.terminate()
if p2 is not None: p2.terminate()
if p3 is not None: p3.terminate()
class _DownloadFunctionsTests(DTestCase):
def test_download_already_present_writable(self):
......@@ -479,13 +635,25 @@ def load_tests(loader, tests, ignore):
'd': sst,
}
print("start", tests.countTestCases())
tests.addTests(doctest.DocTestSuite(globs=context))
print("self",tests.countTestCases())
tests.addTests(doctest.DocTestSuite(csppfetch, globs=context))
print("csppfetch", tests.countTestCases())
import csppfetch.exclusivelockfile as exclusivelockfile
import csppfetch.daterange as daterange
tests.addTests(doctest.DocTestSuite(exclusivelockfile, globs=context))
print("exclusivelockfile", tests.countTestCases())
tests.addTests(doctest.DocTestSuite(daterange, globs=context))
print("daterange", tests.countTestCases())
tests.addTests(doctest.DocTestSuite(csppfetch.timelimit, globs=context))
print("timelimit", tests.countTestCases())
return tests
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment