[Zodb-checkins] SVN: ZODB/branches/jim-thready/src/Z rebranch

Jim Fulton jim at zope.com
Tue Dec 22 16:31:06 EST 2009


Log message for revision 106933:
  rebranch

Changed:
  U   ZODB/branches/jim-thready/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-thready/src/ZEO/tests/InvalidationTests.py
  U   ZODB/branches/jim-thready/src/ZEO/tests/servertesting.py
  U   ZODB/branches/jim-thready/src/ZEO/tests/testConversionSupport.py
  U   ZODB/branches/jim-thready/src/ZEO/tests/testZEO.py
  U   ZODB/branches/jim-thready/src/ZEO/tests/testZEO2.py
  U   ZODB/branches/jim-thready/src/ZEO/zrpc/client.py
  U   ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py
  U   ZODB/branches/jim-thready/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/branches/jim-thready/src/ZODB/FileStorage/format.py
  U   ZODB/branches/jim-thready/src/ZODB/tests/testFileStorage.py

-=-
Modified: ZODB/branches/jim-thready/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/StorageServer.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/StorageServer.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -32,6 +32,7 @@
 
 import transaction
 
+import ZODB.blob
 import ZODB.serialize
 import ZODB.TimeStamp
 import ZEO.zrpc.error
@@ -48,7 +49,7 @@
 from ZODB.POSException import StorageError, StorageTransactionError
 from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
 from ZODB.serialize import referencesf
-from ZODB.utils import u64, p64, oid_repr, mktemp
+from ZODB.utils import u64, p64, oid_repr
 from ZODB.loglevels import BLATHER
 
 
@@ -87,7 +88,6 @@
     def __init__(self, server, read_only=0, auth_realm=None):
         self.server = server
         # timeout and stats will be initialized in register()
-        self.timeout = None
         self.stats = None
         self.connection = None
         self.client = None
@@ -95,14 +95,14 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
-        self.locked = 0
+        self.locked = False             # Don't have storage lock
+        self.locked_lock = threading.Lock() # mediate locked access
         self.verifying = 0
         self.store_failed = 0
         self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
         self.blob_tempfile = None
-        self.blob_log = []
         # The authentication protocol may define extra methods.
         self._extensions = {}
         for func in self.extensions:
@@ -138,25 +138,37 @@
             label = str(host) + ":" + str(port)
         self.log_label = _label + "/" + label
 
+    def notifyLocked(self):
+        # We don't want to give a lock to a disconnected client and, we
+        # need to avoid a race of giving a lock to a client while it's
+        # disconnecting. We check self.connection and set self.locked while
+        # the locked_lock is held, preventing self.connection from being
+        # set to None between the check and setting self.locked.
+        self.locked_lock.acquire()
+        try:
+            if self.connection is None:
+                return False # We've been disconnected. Don't take the lock
+            self.locked = True
+            # What happens if, before processing the trigger, we disconnect,
+            # reconnect, and start a new transaction?
+            # This isn't possible because we never reconnect!
+            self.connection.trigger.pull_trigger(self._restart)
+            return True
+        finally:
+            self.locked_lock.release()
+
     def notifyDisconnected(self):
+        self.locked_lock.acquire()
+        try:
+            self.connection = None
+        finally:
+            self.locked_lock.release()
+
         # When this storage closes, we must ensure that it aborts
         # any pending transaction.
         if self.transaction is not None:
             self.log("disconnected during transaction %s" % self.transaction)
-            if not self.locked:
-                # Delete (d, zeo_storage) from the _waiting list, if found.
-                waiting = self.storage._waiting
-                for i in range(len(waiting)):
-                    d, z = waiting[i]
-                    if z is self:
-                        del waiting[i]
-                        self.log("Closed connection removed from waiting list."
-                                 " Clients waiting: %d." % len(waiting))
-                        break
-
-            if self.transaction:
-                self.tpc_abort(self.transaction.id)
-
+            self.tpc_abort(self.transaction.id)
         else:
             self.log("disconnected")
 
@@ -265,9 +277,12 @@
         if self.auth_realm and not self.authenticated:
             raise AuthError("Client was never authenticated with server!")
 
+        self.connection.auth_done()
+
         if self.storage is not None:
             self.log("duplicate register() call")
             raise ValueError("duplicate register() call")
+
         storage = self.server.storages.get(storage_id)
         if storage is None:
             self.log("unknown storage_id: %s" % storage_id)
@@ -280,8 +295,7 @@
         self.storage_id = storage_id
         self.storage = storage
         self.setup_delegation()
-        self.timeout, self.stats = self.server.register_connection(storage_id,
-                                                                   self)
+        self.stats = self.server.register_connection(storage_id, self)
 
     def get_info(self):
         storage = self.storage
@@ -419,6 +433,7 @@
         self.serials = []
         self.invalidated = []
         self.txnlog = CommitLog()
+        self.blob_log = []
         self.tid = tid
         self.status = status
         self.store_failed = 0
@@ -438,109 +453,99 @@
         if not self._check_tid(id):
             return
         assert self.locked
+
         self.stats.commits += 1
-        self.storage.tpc_finish(self.transaction)
+        self.storage.tpc_finish(self.transaction, self._invalidate)
+        # Note that the tid is still current because we still hold the
+        # commit lock. We'll relinquish it in _clear_transaction.
         tid = self.storage.lastTransaction()
-        if self.invalidated:
-            self.server.invalidate(self, self.storage_id, tid,
-                                   self.invalidated, self.get_size_info())
         self._clear_transaction()
         # Return the tid, for cache invalidation optimization
         return tid
 
-    def tpc_abort(self, id):
-        if not self._check_tid(id):
+    def _invalidate(self, tid):
+        if self.invalidated:
+            self.server.invalidate(self, self.storage_id, tid,
+                                   self.invalidated, self.get_size_info())
+
+    def tpc_abort(self, tid):
+        if not self._check_tid(tid):
             return
         self.stats.aborts += 1
+
+        # Is there a race here?  What if notifyLocked is called after
+        # the check?  Well, we still won't have started committing the actual
+        # storage. That wouldn't happen until _restart is called and that
+        # can't happen while this method is executing, as both are only
+        # run by the client thread. So no race.
         if self.locked:
             self.storage.tpc_abort(self.transaction)
         self._clear_transaction()
 
     def _clear_transaction(self):
         # Common code at end of tpc_finish() and tpc_abort()
+        self.server.unlock_storage(self)
+        self.locked = 0
+        self.transaction = None
         self.stats.active_txns -= 1
-        self.transaction = None
-        self.txnlog.close()
-        if self.locked:
-            self.locked = 0
-            self.timeout.end(self)
-            self.stats.lock_time = None
-            self.log("Transaction released storage lock", BLATHER)
+        if self.txnlog is not None:
+            self.txnlog.close()
+            self.txnlog = None
+            for oid, oldserial, data, blobfilename in self.blob_log:
+                ZODB.blob.remove_committed(blobfilename)
+            del self.blob_log
 
-            # Restart any client waiting for the storage lock.
-            while self.storage._waiting:
-                delay, zeo_storage = self.storage._waiting.pop(0)
-                try:
-                    zeo_storage._restart(delay)
-                except:
-                    self.log("Unexpected error handling waiting transaction",
-                             level=logging.WARNING, exc_info=True)
-                    zeo_storage.connection.close()
-                    continue
-
-                if self.storage._waiting:
-                    n = len(self.storage._waiting)
-                    self.log("Blocked transaction restarted.  "
-                             "Clients waiting: %d" % n)
-                else:
-                    self.log("Blocked transaction restarted.")
-
-                break
-
     # The following two methods return values, so they must acquire
     # the storage lock and begin the transaction before returning.
 
     # It's a bit vile that undo can cause us to get the lock before vote.
 
-    def undo(self, trans_id, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        if self.locked:
+    def undo(self, trans_id, tid):
+        self._check_tid(tid, exc=StorageTransactionError)
+
+        if self.txnlog is not None:
+            return self._wait(lambda: self._undo(trans_id))
+        else:
             return self._undo(trans_id)
+
+    def vote(self, tid):
+        self._check_tid(tid, exc=StorageTransactionError)
+
+        if self.txnlog is not None:
+            return self._wait(lambda: self._vote())
         else:
-            return self._wait(lambda: self._undo(trans_id))
-
-    def vote(self, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        if self.locked:
             return self._vote()
-        else:
-            return self._wait(lambda: self._vote())
 
-    # When a delayed transaction is restarted, the dance is
-    # complicated.  The restart occurs when one ZEOStorage instance
-    # finishes as a transaction and finds another instance is in the
-    # _waiting list.
 
-    # It might be better to have a mechanism to explicitly send
-    # the finishing transaction's reply before restarting the waiting
-    # transaction.  If the restart takes a long time, the previous
-    # client will be blocked until it finishes.
-
+    _thunk = _delay = None
     def _wait(self, thunk):
         # Wait for the storage lock to be acquired.
+        assert self._thunk == self._delay == None
         self._thunk = thunk
-        if self.tpc_transaction():
-            d = Delay()
-            self.storage._waiting.append((d, self))
-            self.log("Transaction blocked waiting for storage. "
-                     "Clients waiting: %d." % len(self.storage._waiting))
-            return d
-        else:
+        if self.server.lock_storage(self):
+            assert not self.tpc_transaction()
             self.log("Transaction acquired storage lock.", BLATHER)
+            self.locked = True
             return self._restart()
 
-    def _restart(self, delay=None):
+        self._delay = d = Delay()
+        return d
+
+    def _restart(self):
+        if not self.locked:
+            # Must have been disconnected after locking
+            assert self.connection is None
+            return
+
         # Restart when the storage lock is available.
         if self.txnlog.stores == 1:
             template = "Preparing to commit transaction: %d object, %d bytes"
         else:
             template = "Preparing to commit transaction: %d objects, %d bytes"
+
         self.log(template % (self.txnlog.stores, self.txnlog.size()),
                  level=BLATHER)
 
-        self.locked = 1
-        self.timeout.begin(self)
-        self.stats.lock_time = time.time()
         if (self.tid is not None) or (self.status != ' '):
             self.storage.tpc_begin(self.transaction, self.tid, self.status)
         else:
@@ -575,12 +580,19 @@
             self._clear_transaction()
             raise
 
-        resp = self._thunk()
+        thunk = self._thunk
+        delay = self._delay
+        self._thunk = self._delay = None
+
+        resp = thunk()
         if delay is not None:
             delay.reply(resp)
-        else:
-            return resp
 
+        self.txnlog.close()
+        self.txnlog = None
+        del self.blob_log
+        return resp
+
     # The public methods of the ZEO client API do not do the real work.
     # They defer work until after the storage lock has been acquired.
     # Most of the real implementations are in methods beginning with
@@ -610,14 +622,18 @@
         os.write(self.blob_tempfile[0], chunk)
 
     def storeBlobEnd(self, oid, serial, data, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        assert self.txnlog is not None # effectively not allowed after undo
         fd, tempname = self.blob_tempfile
         self.blob_tempfile = None
         os.close(fd)
         self.blob_log.append((oid, serial, data, tempname))
 
     def storeBlobShared(self, oid, serial, data, filename, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        assert self.txnlog is not None # effectively not allowed after undo
+
         # Reconstruct the full path from the filename in the OID directory
-
         if (os.path.sep in filename
             or not (filename.endswith('.tmp')
                     or filename[:-1].endswith('.tmp')
@@ -735,6 +751,7 @@
         return error
 
     def _vote(self):
+        assert self.locked
         if not self.store_failed:
             # Only call tpc_vote of no store call failed, otherwise
             # the serialnos() call will deliver an exception that will be
@@ -929,8 +946,10 @@
              for name, storage in storages.items()])
         log("%s created %s with storages: %s" %
             (self.__class__.__name__, read_only and "RO" or "RW", msg))
-        for s in storages.values():
-            s._waiting = []
+
+        self.lockers = dict((name, []) for name in storages)
+        self.lockers_lock = threading.Lock()
+
         self.read_only = read_only
         self.auth_protocol = auth_protocol
         self.auth_database = auth_database
@@ -1044,7 +1063,7 @@
         Returns the timeout and stats objects for the appropriate storage.
         """
         self.connections[storage_id].append(conn)
-        return self.timeouts[storage_id], self.stats[storage_id]
+        return self.stats[storage_id]
 
     def _invalidateCache(self, storage_id):
         """We need to invalidate any caches we have.
@@ -1216,7 +1235,51 @@
             if conn.obj in cl:
                 cl.remove(conn.obj)
 
+    def lock_storage(self, zeostore):
+        self.lockers_lock.acquire()
+        try:
+            storage_id = zeostore.storage_id
+            lockers = self.lockers[storage_id]
+            lockers.append(zeostore)
+            if len(lockers) == 1:
+                self.timeouts[storage_id].begin(zeostore)
+                self.stats[storage_id].lock_time = time.time()
+                return True
+            else:
+                zeostore.log("(%r) queue lock: transactions waiting: %s"
+                             % (storage_id, len(lockers)-1))
+        finally:
+            self.lockers_lock.release()
 
+    def unlock_storage(self, zeostore):
+        self.lockers_lock.acquire()
+        try:
+            storage_id = zeostore.storage_id
+            lockers = self.lockers[storage_id]
+            if zeostore in lockers:
+                if lockers[0] == zeostore:
+                    self.timeouts[storage_id].end(zeostore)
+                    self.stats[storage_id].lock_time = None
+                    lockers.pop(0)
+                    while lockers:
+                        zeostore.log("(%r) unlock: transactions waiting: %s"
+                                     % (storage_id, len(lockers)-1))
+                        zeostore = lockers[0]
+                        if zeostore.notifyLocked():
+                            self.timeouts[storage_id].begin(zeostore)
+                            self.stats[storage_id].lock_time = time.time()
+                            break
+                        else:
+                            # The queued client was closed, so dequeue it
+                            lockers.pop(0)
+                else:
+                    lockers.remove(zeostore)
+                    if lockers:
+                        zeostore.log("(%r) dequeue: transactions waiting: %s"
+                                     % (storage_id, len(lockers)-1))
+        finally:
+            self.lockers_lock.release()
+
 class StubTimeoutThread:
 
     def begin(self, client):
@@ -1238,7 +1301,6 @@
         self._client = None
         self._deadline = None
         self._cond = threading.Condition() # Protects _client and _deadline
-        self._trigger = trigger()
 
     def begin(self, client):
         # Called from the restart code the "main" thread, whenever the
@@ -1281,7 +1343,8 @@
             if howlong <= 0:
                 client.log("Transaction timeout after %s seconds" %
                            self._timeout)
-                self._trigger.pull_trigger(lambda: client.connection.close())
+                client.connection.trigger.pull_trigger(
+                    lambda: client.connection.close())
             else:
                 time.sleep(howlong)
 
@@ -1337,7 +1400,7 @@
         self.rpc.callAsync('endVerify')
 
     def invalidateTransaction(self, tid, args):
-        self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
+        self.rpc.callAsync('invalidateTransaction', tid, args)
 
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)
@@ -1363,7 +1426,7 @@
 class ClientStub308(ClientStub):
 
     def invalidateTransaction(self, tid, args):
-        self.rpc.callAsyncNoPoll(
+        self.rpc.callAsync(
             'invalidateTransaction', tid, [(arg, '') for arg in args])
 
     def invalidateVerify(self, oid):

Modified: ZODB/branches/jim-thready/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/InvalidationTests.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/InvalidationTests.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -318,9 +318,9 @@
         # tearDown then immediately, but if other threads are still
         # running that can lead to a cascade of spurious exceptions.
         for t in threads:
-            t.join(10)
+            t.join(30)
         for t in threads:
-            t.cleanup()
+            t.cleanup(10)
 
     def checkConcurrentUpdates2Storages_emulated(self):
         self._storage = storage1 = self.openClientStorage()
@@ -378,6 +378,34 @@
         db1.close()
         db2.close()
 
+    def checkConcurrentUpdates19Storages(self):
+        n = 19
+        dbs = [DB(self.openClientStorage()) for i in range(n)]
+        self._storage = dbs[0].storage
+        stop = threading.Event()
+
+        cn = dbs[0].open()
+        tree = cn.root()["tree"] = OOBTree()
+        transaction.commit()
+        cn.close()
+
+        # Run threads that update the BTree
+        cd = {}
+        threads = [self.StressThread(dbs[i], stop, i, cd, i, n)
+                   for i in range(n)]
+        self.go(stop, cd, *threads)
+
+        while len(set(db.lastTransaction() for db in dbs)) > 1:
+            _ = [db._storage.sync() for db in dbs]
+
+        cn = dbs[0].open()
+        tree = cn.root()["tree"]
+        self._check_tree(cn, tree)
+        self._check_threads(tree, *threads)
+
+        cn.close()
+        _ = [db.close() for db in dbs]
+
     def checkConcurrentUpdates1Storage(self):
         self._storage = storage1 = self.openClientStorage()
         db1 = DB(storage1)

Modified: ZODB/branches/jim-thready/src/ZEO/tests/servertesting.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/servertesting.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/servertesting.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -56,3 +56,13 @@
 
     def callAsync(self, meth, *args):
         print self.name, 'callAsync', meth, repr(args)
+
+    @property
+    def trigger(self):
+        return self
+
+    def pull_trigger(self, func):
+        func()
+
+    def auth_done(self):
+        pass

Modified: ZODB/branches/jim-thready/src/ZEO/tests/testConversionSupport.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/testConversionSupport.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/testConversionSupport.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -13,6 +13,8 @@
 ##############################################################################
 import unittest
 from zope.testing import doctest
+import ZEO.zrpc.connection
+import ZEO.tests.servertesting
 
 class FakeStorageBase:
 
@@ -29,6 +31,9 @@
     def __len__(self):
         return 4
 
+    def registerDB(self, *args):
+        pass
+
 class FakeStorage(FakeStorageBase):
 
     def record_iternext(self, next=None):
@@ -50,15 +55,25 @@
     def register_connection(*args):
         return None, None
 
+class FauxConn:
+    addr = 'x'
+    thread_ident = unregistered_thread_ident = None
+    peer_protocol_version = (
+        ZEO.zrpc.connection.Connection.current_protocol)
+
+    def auth_done(self):
+        pass
+
 def test_server_record_iternext():
     """
-    
+
 On the server, record_iternext calls are simply delegated to the
 underlying storage.
 
     >>> import ZEO.StorageServer
 
     >>> zeo = ZEO.StorageServer.ZEOStorage(FakeServer(), False)
+    >>> zeo.notifyConnected(FauxConn())
     >>> zeo.register('1', False)
 
     >>> next = None
@@ -71,13 +86,14 @@
     2
     3
     4
-    
+
 The storage info also reflects the fact that record_iternext is supported.
 
     >>> zeo.get_info()['supports_record_iternext']
     True
 
     >>> zeo = ZEO.StorageServer.ZEOStorage(FakeServer(), False)
+    >>> zeo.notifyConnected(FauxConn())
     >>> zeo.register('2', False)
 
     >>> zeo.get_info()['supports_record_iternext']
@@ -152,7 +168,7 @@
     4
 
 """
-    
+
 def history_to_version_compatible_storage():
     """
     Some storages work under ZODB <= 3.8 and ZODB >= 3.9.
@@ -163,15 +179,19 @@
     ...     return oid,version,size
 
     A ZEOStorage such as the following should support this type of storage:
-    
+
     >>> class OurFakeServer(FakeServer):
     ...   storages = {'1':VersionCompatibleStorage()}
     >>> import ZEO.StorageServer
-    >>> zeo = ZEO.StorageServer.ZEOStorage(OurFakeServer(), False)
+    >>> zeo = ZEO.StorageServer.ZEOStorage(
+    ...     ZEO.tests.servertesting.StorageServer(
+    ...        'test', {'1':VersionCompatibleStorage()}))
+    >>> zeo.notifyConnected(ZEO.tests.servertesting.Connection())
     >>> zeo.register('1', False)
 
-    The ZEOStorage should sort out the following call such that the storage gets
-    the correct parameters and so should return the parameters it was called with:
+    The ZEOStorage should sort out the following call such that the
+    storage gets the correct parameters and so should return the
+    parameters it was called with:
 
     >>> zeo.history('oid',99)
     ('oid', '', 99)
@@ -181,7 +201,7 @@
 
     >>> from ZEO.StorageServer import ZEOStorage308Adapter
     >>> zeo = ZEOStorage308Adapter(VersionCompatibleStorage())
-    
+
     The history method should still return the parameters it was called with:
 
     >>> zeo.history('oid','',99)

Modified: ZODB/branches/jim-thready/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/testZEO.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/testZEO.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -25,7 +25,6 @@
 from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_unpickle
 
-import asyncore
 import doctest
 import logging
 import os
@@ -464,7 +463,11 @@
             pass
 
         time.sleep(.1)
-        self.failIf(self._storage.is_connected())
+        try:
+            self.failIf(self._storage.is_connected())
+        except:
+            print log
+            raise
         self.assertEqual(len(ZEO.zrpc.connection.client_map), 1)
         del ZEO.zrpc.connection.client_logger.critical
         self.assertEqual(log[0][0], 'The ZEO client loop failed.')
@@ -738,6 +741,14 @@
     blob_cache_dir = 'blobs'
     shared_blob_dir = True
 
+class FauxConn:
+    addr = 'x'
+    peer_protocol_version = (
+        ZEO.zrpc.connection.Connection.current_protocol)
+
+    def auth_done(self):
+        pass
+
 class StorageServerClientWrapper:
 
     def __init__(self):
@@ -754,8 +765,8 @@
     def __init__(self, server, storage_id):
         self.storage_id = storage_id
         self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
+        self.server.notifyConnected(FauxConn())
         self.server.register(storage_id, False)
-        self.server._thunk = lambda : None
         self.server.client = StorageServerClientWrapper()
 
     def sortKey(self):
@@ -777,7 +788,6 @@
         self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
 
     def tpc_vote(self, transaction):
-        self.server._restart()
         self.server.vote(id(transaction))
         result = self.server.client.serials[:]
         del self.server.client.serials[:]
@@ -860,6 +870,8 @@
     >>> fs = FileStorage('t.fs')
     >>> sv = StorageServer(('', get_port()), dict(fs=fs))
     >>> s = ZEOStorage(sv, sv.read_only)
+
+    >>> s.notifyConnected(FauxConn())
     >>> s.register('fs', False)
 
 If we ask for the last transaction, we should get the last transaction

Modified: ZODB/branches/jim-thready/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/testZEO2.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/tests/testZEO2.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -92,9 +92,9 @@
 handled correctly:
 
     >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
+    (511/test-addr) ('1') unlock: transactions waiting: 0
     2 callAsync serialnos ...
     reply 1 None
-    (511/test-addr) Blocked transaction restarted.
 
     >>> fs.tpc_transaction() is not None
     True

Modified: ZODB/branches/jim-thready/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/zrpc/client.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/zrpc/client.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -84,7 +84,6 @@
         try:
             t = self.thread
             self.thread = None
-            conn = self.connection
         finally:
             self.cond.release()
         if t is not None:
@@ -94,9 +93,6 @@
             if t.isAlive():
                 log("CM.close(): self.thread.join() timed out",
                     level=logging.WARNING)
-        if conn is not None:
-            # This will call close_conn() below which clears self.connection
-            conn.close()
 
     def attempt_connect(self):
         """Attempt a connection to the server without blocking too long.

Modified: ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -16,17 +16,16 @@
 import errno
 import select
 import sys
+import thread
 import threading
 import logging
 
-import traceback, time
-
 from ZEO.zrpc import smac
 from ZEO.zrpc.error import ZRPCError, DisconnectedError
 from ZEO.zrpc.marshal import Marshaller, ServerMarshaller
 from ZEO.zrpc.trigger import trigger
 from ZEO.zrpc.log import short_repr, log
-from ZODB.loglevels import BLATHER, TRACE
+from ZODB.loglevels import BLATHER
 import ZODB.POSException
 
 REPLY = ".reply" # message name used for replies
@@ -144,7 +143,7 @@
                     if obj is client_trigger:
                         continue
                     try:
-                        obj.mgr.client.close()
+                        obj.mgr.client.close(True)
                     except:
                         map.pop(fd, None)
                         try:
@@ -759,11 +758,6 @@
             self.log("wait(%d)" % msgid, level=TRACE)
 
         self.trigger.pull_trigger()
-
-        # Delay used when we call asyncore.poll() directly.
-        # Start with a 1 msec delay, double until 1 sec.
-        delay = 0.001
-
         self.replies_cond.acquire()
         try:
             while 1:
@@ -794,7 +788,6 @@
         self.trigger.pull_trigger()
 
 
-
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
 
@@ -802,13 +795,19 @@
     unlogged_exception_types = (ZODB.POSException.POSKeyError, )
 
     # Servers use a shared server trigger that uses the asyncore socket map
-    trigger = trigger()
+    #trigger = trigger()
 
     def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
-        Connection.__init__(self, sock, addr, obj, 'S')
+        map={}
+        Connection.__init__(self, sock, addr, obj, 'S', map=map)
         self.marshal = ServerMarshaller()
+        self.trigger = trigger(map)
 
+        t = threading.Thread(target=server_loop, args=(map, self))
+        t.setDaemon(True)
+        t.start()
+
     def handshake(self):
         # Send the server's preferred protocol to the client.
         self.message_output(self.current_protocol)
@@ -821,6 +820,27 @@
         self.obj.notifyDisconnected()
         Connection.close(self)
 
+    thread_ident = unregistered_thread_ident = None
+    def poll(self):
+        "Invoke asyncore mainloop to get pending message out."
+        ident = self.thread_ident
+        if ident is not None and thread.get_ident() == ident:
+            self.handle_write()
+        else:
+            self.trigger.pull_trigger()
+
+    def auth_done(self):
+        # We're done with the auth dance. We can be fast now.
+        self.thread_ident = self.unregistered_thread_ident
+
+def server_loop(map, conn):
+    conn.unregistered_thread_ident = thread.get_ident()
+
+    while len(map) > 1:
+        asyncore.poll(30.0, map)
+    for o in map.values():
+        o.close()
+
 class ManagedClientConnection(Connection):
     """Client-side Connection subclass."""
     __super_init = Connection.__init__

Modified: ZODB/branches/jim-thready/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/FileStorage/FileStorage.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZODB/FileStorage/FileStorage.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -36,6 +36,7 @@
 import logging
 import os
 import sys
+import threading
 import time
 import ZODB.blob
 import ZODB.interfaces
@@ -128,7 +129,7 @@
         else:
             self._tfile = None
 
-        self._file_name = file_name
+        self._file_name = os.path.abspath(file_name)
 
         self._pack_gc = pack_gc
         self.pack_keep_old = pack_keep_old
@@ -167,6 +168,7 @@
             self._file = open(file_name, 'w+b')
             self._file.write(packed_version)
 
+        self._files = FilePool(self._file_name)
         r = self._restore_index()
         if r is not None:
             self._used_index = 1 # Marker for testing
@@ -401,6 +403,7 @@
 
     def close(self):
         self._file.close()
+        self._files.close()
         if hasattr(self,'_lock_file'):
             self._lock_file.close()
         if self._tfile:
@@ -426,22 +429,25 @@
         """Return pickle data and serial number."""
         assert not version
 
-        self._lock_acquire()
+        _file = self._files.get()
         try:
+
             pos = self._lookup_pos(oid)
-            h = self._read_data_header(pos, oid)
+
+            h = self._read_data_header(pos, oid, _file)
             if h.plen:
-                data = self._file.read(h.plen)
+                data = _file.read(h.plen)
                 return data, h.tid
             elif h.back:
                 # Get the data from the backpointer, but tid from
                 # current txn.
-                data = self._loadBack_impl(oid, h.back)[0]
+                data = self._loadBack_impl(oid, h.back, _file=_file)[0]
                 return data, h.tid
             else:
                 raise POSKeyError(oid)
+
         finally:
-            self._lock_release()
+            self._files.put(_file)
 
     def loadSerial(self, oid, serial):
         self._lock_acquire()
@@ -462,12 +468,13 @@
             self._lock_release()
 
     def loadBefore(self, oid, tid):
-        self._lock_acquire()
+        _file = self._files.get()
         try:
             pos = self._lookup_pos(oid)
+
             end_tid = None
             while True:
-                h = self._read_data_header(pos, oid)
+                h = self._read_data_header(pos, oid, _file)
                 if h.tid < tid:
                     break
 
@@ -477,14 +484,14 @@
                     return None
 
             if h.back:
-                data, _, _, _ = self._loadBack_impl(oid, h.back)
+                data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
                 return data, h.tid, end_tid
             else:
-                return self._file.read(h.plen), h.tid, end_tid
-
+                return _file.read(h.plen), h.tid, end_tid
         finally:
-            self._lock_release()
+            self._files.put(_file)
 
+
     def store(self, oid, oldserial, data, version, transaction):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
@@ -734,6 +741,31 @@
         finally:
             self._lock_release()
 
+    def tpc_finish(self, transaction, f=None):
+
+        # Get write lock
+        self._files.write_lock()
+        try:
+            self._lock_acquire()
+            try:
+                if transaction is not self._transaction:
+                    return
+                try:
+                    if f is not None:
+                        f(self._tid)
+                    u, d, e = self._ude
+                    self._finish(self._tid, u, d, e)
+                    self._clear_temp()
+                finally:
+                    self._ude = None
+                    self._transaction = None
+                    self._commit_lock_release()
+            finally:
+                self._lock_release()
+
+        finally:
+            self._files.write_unlock()
+
     def _finish(self, tid, u, d, e):
         # If self._nextpos is 0, then the transaction didn't write any
         # data, so we don't bother writing anything to the file.
@@ -1130,8 +1162,10 @@
                 return
             have_commit_lock = True
             opos, index = pack_result
+            self._files.write_lock()
             self._lock_acquire()
             try:
+                self._files.empty()
                 self._file.close()
                 try:
                     os.rename(self._file_name, oldpath)
@@ -1145,6 +1179,7 @@
                 self._initIndex(index, self._tindex)
                 self._pos = opos
             finally:
+                self._files.write_unlock()
                 self._lock_release()
 
             # We're basically done.  Now we need to deal with removed
@@ -2036,3 +2071,72 @@
              'description': d}
         d.update(e)
         return d
+
+class FilePool:
+
+    closed = False
+    writing = False
+
+    def __init__(self, file_name):
+        self.name = file_name
+        self._files = []
+        self._out = []
+        self._cond = threading.Condition()
+
+    def write_lock(self):
+        self._cond.acquire()
+        try:
+            self.writing = True
+            while self._out:
+                self._cond.wait()
+        finally:
+            self._cond.release()
+
+    def write_unlock(self):
+        self._cond.acquire()
+        self.writing = False
+        self._cond.notifyAll()
+        self._cond.release()
+
+    def get(self):
+        self._cond.acquire()
+        try:
+            while self.writing:
+                self._cond.wait()
+            if self.closed:
+                raise ValueError('closed')
+
+            try:
+                f = self._files.pop()
+            except IndexError:
+                f = open(self.name, 'rb')
+            self._out.append(f)
+            return f
+        finally:
+            self._cond.release()
+
+    def put(self, f):
+        self._out.remove(f)
+        self._files.append(f)
+        if not self._out:
+            self._cond.acquire()
+            try:
+                if self.writing and not self._out:
+                    self._cond.notifyAll()
+            finally:
+                self._cond.release()
+
+    def empty(self):
+        while self._files:
+            self._files.pop().close()
+
+    def close(self):
+        self._cond.acquire()
+        self.closed = True
+        self._cond.release()
+
+        self.write_lock()
+        try:
+            self.empty()
+        finally:
+            self.write_unlock()

Modified: ZODB/branches/jim-thready/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/FileStorage/format.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZODB/FileStorage/format.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -134,21 +134,24 @@
         self._file.seek(pos)
         return u64(self._file.read(8))
 
-    def _read_data_header(self, pos, oid=None):
+    def _read_data_header(self, pos, oid=None, _file=None):
         """Return a DataHeader object for data record at pos.
 
         If oid is not None, raise CorruptedDataError if oid passed
         does not match oid in file.
         """
-        self._file.seek(pos)
-        s = self._file.read(DATA_HDR_LEN)
+        if _file is None:
+            _file = self._file
+
+        _file.seek(pos)
+        s = _file.read(DATA_HDR_LEN)
         if len(s) != DATA_HDR_LEN:
             raise CorruptedDataError(oid, s, pos)
         h = DataHeaderFromString(s)
         if oid is not None and oid != h.oid:
             raise CorruptedDataError(oid, s, pos)
         if not h.plen:
-            h.back = u64(self._file.read(8))
+            h.back = u64(_file.read(8))
         return h
 
     def _read_txn_header(self, pos, tid=None):
@@ -164,20 +167,22 @@
         h.ext = self._file.read(h.elen)
         return h
 
-    def _loadBack_impl(self, oid, back, fail=True):
+    def _loadBack_impl(self, oid, back, fail=True, _file=None):
         # shared implementation used by various _loadBack methods
         #
         # If the backpointer ultimately resolves to 0:
         # If fail is True, raise KeyError for zero backpointer.
         # If fail is False, return the empty data from the record
         # with no backpointer.
+        if _file is None:
+            _file = self._file
         while 1:
             if not back:
                 # If backpointer is 0, object does not currently exist.
                 raise POSKeyError(oid)
-            h = self._read_data_header(back)
+            h = self._read_data_header(back, _file=_file)
             if h.plen:
-                return self._file.read(h.plen), h.tid, back, h.tloc
+                return _file.read(h.plen), h.tid, back, h.tloc
             if h.back == 0 and not fail:
                 return None, h.tid, back, h.tloc
             back = h.back

Modified: ZODB/branches/jim-thready/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/tests/testFileStorage.py	2009-12-22 21:15:21 UTC (rev 106932)
+++ ZODB/branches/jim-thready/src/ZODB/tests/testFileStorage.py	2009-12-22 21:31:06 UTC (rev 106933)
@@ -587,10 +587,10 @@
 
     >>> handler.uninstall()
 
-    >>> fs.load('\0'*8, '')
+    >>> fs.load('\0'*8, '') # doctest: +ELLIPSIS
     Traceback (most recent call last):
     ...
-    ValueError: I/O operation on closed file
+    ValueError: ...
 
     >>> db.close()
     >>> fs = ZODB.FileStorage.FileStorage('data.fs')



More information about the Zodb-checkins mailing list