[Zope3-checkins] SVN: Zope3/trunk/src/zope/app/locking/ Fixed a timeout bug when locking an object.

Michael Kerrin michael.kerrin at openapp.biz
Tue Jan 3 14:38:32 EST 2006


Log message for revision 41113:
  Fixed a timeout bug when locking an object.
  

Changed:
  U   Zope3/trunk/src/zope/app/locking/storage.py
  U   Zope3/trunk/src/zope/app/locking/tests.py

-=-
Modified: Zope3/trunk/src/zope/app/locking/storage.py
===================================================================
--- Zope3/trunk/src/zope/app/locking/storage.py	2006-01-03 19:15:42 UTC (rev 41112)
+++ Zope3/trunk/src/zope/app/locking/storage.py	2006-01-03 19:38:32 UTC (rev 41113)
@@ -111,6 +111,9 @@
         """
         Set the current lock for an object.
         """
+        ## call cleanup first: if the object already has a lock that has
+        ## timed out, a later cleanup would delete the new lock set below.
+        self.cleanup()
         keyref = IKeyReference(object)
         self.locks[keyref] = lock
         pid = lock.principal_id
@@ -119,7 +122,6 @@
             value = self.timeouts.get(ts, [])
             value.append(keyref)
             self.timeouts[ts] = value
-        self.cleanup()
 
     def delLock(self, object):
         """
@@ -130,7 +132,7 @@
 
     def cleanup(self):
         # We occasionally want to clean up expired locks to keep them
-        # from accumulating over time and slowing things down. 
+        # from accumulating over time and slowing things down.
         for key in self.timeouts.keys(max=int(timefunc())):
             for keyref in self.timeouts[key]:
                 if self.locks.get(keyref, None) is not None:

Modified: Zope3/trunk/src/zope/app/locking/tests.py
===================================================================
--- Zope3/trunk/src/zope/app/locking/tests.py	2006-01-03 19:15:42 UTC (rev 41112)
+++ Zope3/trunk/src/zope/app/locking/tests.py	2006-01-03 19:38:32 UTC (rev 41113)
@@ -17,13 +17,25 @@
 $Id:$
 """
 
-import sys, unittest
+import sys, unittest, time
 from zope.component.tests.placelesssetup import PlacelessSetup
 import zope.event
 from zope.testing import doctest
 from transaction import abort
 
+from zope.interface import Interface
+from zope.app.testing import ztapi
+from zope.app.file.file import File
+from zope.app.folder.folder import Folder
+from zope.app.locking.interfaces import ILockable, ILockTracker
+from zope.app.locking.adapter import LockingAdapterFactory
+from zope.app.locking.adapter import LockingPathAdapter
+from zope.app.locking.storage import ILockStorage, PersistentLockStorage
+from zope.app.traversing.interfaces import IPathAdapter
+from zope.app.keyreference.interfaces import IKeyReference
+from zope.security.testing import Principal, Participation
 
+
 class FakeModule:
     def __init__(self, dict):
         self.__dict = dict
@@ -55,22 +67,75 @@
         return cmp(id(self.object), id(other.object))
 
 
+class TestLockStorage(unittest.TestCase):
 
+    def setUp(self):
+        super(TestLockStorage, self).setUp()
+
+        ztapi.provideAdapter(Interface, IKeyReference, FakeKeyReference)
+        ztapi.provideAdapter(Interface, ILockable, LockingAdapterFactory)
+        ztapi.provideAdapter(None, IPathAdapter, LockingPathAdapter,
+                             "locking")
+
+        self.storage = storage = PersistentLockStorage()
+        ztapi.provideUtility(ILockStorage, storage)
+        ztapi.provideUtility(ILockTracker, storage)
+
+    def tearDown(self):
+        super(TestLockStorage, self).tearDown()
+        del self.storage
+
+    def test_timeout(self):
+        # fake time function to avoid a time.sleep in tests
+        def faketime(t):
+            zope.app.locking.storage.timefunc = lambda : t
+
+        ## test the cleanup of timedout locks.
+        content = File('some content', 'text/plain')
+        lockable = ILockable(content)
+        participation = Participation(Principal('michael'))
+        zope.security.management.newInteraction(participation)
+        now = time.time()
+        faketime(now)
+        lockinfo = lockable.lock(timeout = 1)
+        lockinfo.created = now
+        # two seconds pass
+        faketime(now + 2.0)
+        ## now lockable.locked is False since the lock has timed out
+        self.assertEqual(lockable.locked(), False)
+        ## since lockable.locked is False lockable.lock should succeed
+        ## assume this is done 3 seconds after the first lock
+        lockinfo = lockable.lock(timeout = 20)
+        lockinfo.created = now + 3.0
+        faketime(now + 4.0) # let time pass
+        self.assertEqual(lockable.locked(), True)
+        zope.security.management.endInteraction()
+        # reset the time function
+        zope.app.locking.storage.timefunc = time.time
+
+    def test_folder_lock(self):
+        folder = Folder()
+        file = File('some content', 'text/plain')
+        folder['file'] = file
+
+        participation = Participation(Principal('michael'))
+        zope.security.management.newInteraction(participation)
+
+        lockablefolder = ILockable(folder)
+        lockablefolder.lock()
+        self.assertEqual(lockablefolder.locked(), True)
+        lockablefile = ILockable(folder['file'])
+        self.assertEqual(lockablefile.locked(), False)
+
+        zope.security.management.endInteraction()
+
 def setUp(test):
     ps.setUp()
     dict = test.globs
     dict.clear()
-    dict['__name__'] = name    
+    dict['__name__'] = name
     sys.modules[name] = FakeModule(dict)
 
-    from zope.app.testing import ztapi
-    from zope.interface import Interface
-    from zope.app.locking.interfaces import ILockable, ILockTracker
-    from zope.app.locking.adapter import LockingAdapterFactory
-    from zope.app.locking.adapter import LockingPathAdapter
-    from zope.app.locking.storage import ILockStorage, PersistentLockStorage
-    from zope.app.traversing.interfaces import IPathAdapter
-
     ztapi.provideAdapter(Interface, IKeyReference, FakeKeyReference)
     ztapi.provideAdapter(Interface, ILockable, LockingAdapterFactory)
     ztapi.provideAdapter(None, IPathAdapter, LockingPathAdapter,
@@ -94,10 +159,15 @@
     zope.event.subscribers.pop()
     zope.app.locking.storage.timefunc = time.time
 
+
 def test_suite():
-    return doctest.DocFileSuite('README.txt', setUp=setUp, tearDown=tearDown,
-                                optionflags=(doctest.ELLIPSIS)
-                                )
+    suite = unittest.TestSuite()
+    suite.addTest(doctest.DocFileSuite('README.txt', setUp=setUp,
+                                       tearDown=tearDown,
+                                       optionflags=(doctest.ELLIPSIS)
+                                       ))
+    suite.addTest(unittest.makeSuite(TestLockStorage))
+    return suite
 
 
 if __name__ == '__main__':



More information about the Zope3-Checkins mailing list