[Zodb-checkins] SVN: ZODB/trunk/src/ Added IExternalGC support to ClientStorage.

Jim Fulton jim at zope.com
Thu Dec 18 19:41:55 EST 2008


Log message for revision 94181:
  Added IExternalGC support to ClientStorage.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/CommitLog.py
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/TransactionBuffer.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2008-12-19 00:41:53 UTC (rev 94180)
+++ ZODB/trunk/src/CHANGES.txt	2008-12-19 00:41:55 UTC (rev 94181)
@@ -37,7 +37,7 @@
 
 - A new storage interface, IExternalGC, to support external garbage
   collection, http://wiki.zope.org/ZODB/ExternalGC, has been defined
-  and implemented for FileStorage.
+  and implemented for FileStorage and ClientStorage.
 
 - As a small convenience (mainly for tests), you can now specify
   initial data as a string argument to the Blob constructor.

Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2008-12-19 00:41:53 UTC (rev 94180)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2008-12-19 00:41:55 UTC (rev 94181)
@@ -634,6 +634,7 @@
             ZODB.interfaces.IStorageUndoable,
             ZODB.interfaces.IStorageCurrentRecordIteration,
             ZODB.interfaces.IBlobStorage,
+            ZODB.interfaces.IExternalGC,
             ):
             if (iface.__module__, iface.__name__) in self._info.get(
                 'interfaces', ()):
@@ -969,6 +970,11 @@
         os.rename(blob_filename+'.dl', blob_filename)
         os.chmod(blob_filename, stat.S_IREAD)
 
+    def deleteObject(self, oid, serial, txn):
+        self._check_trans(txn)
+        self._server.deleteObject(oid, serial, id(txn))
+        self._tbuf.store(oid, None)
+
     def loadBlob(self, oid, serial):
         # Load a blob.  If it isn't present and we have a shared blob
         # directory, then assume that it doesn't exist on the server

Modified: ZODB/trunk/src/ZEO/CommitLog.py
===================================================================
--- ZODB/trunk/src/ZEO/CommitLog.py	2008-12-19 00:41:53 UTC (rev 94180)
+++ ZODB/trunk/src/ZEO/CommitLog.py	2008-12-19 00:41:55 UTC (rev 94181)
@@ -37,6 +37,10 @@
     def size(self):
         return self.file.tell()
 
+    def delete(self, oid, serial):
+        self.pickler.dump(('d', oid, serial))
+        self.stores += 1
+
     def store(self, oid, serial, data):
         self.pickler.dump(('s', oid, serial, data))
         self.stores += 1

Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py	2008-12-19 00:41:53 UTC (rev 94180)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2008-12-19 00:41:55 UTC (rev 94181)
@@ -225,6 +225,9 @@
     def storeBlobShared(self, oid, serial, data, filename, id):
         self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, id)
 
+    def deleteObject(self, oid, serial, id):
+        self.rpc.callAsync('deleteObject', oid, serial, id)
+
     ##
     # Start two-phase commit for a transaction
     # @param id id used by client to identify current transaction.  The

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2008-12-19 00:41:53 UTC (rev 94180)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2008-12-19 00:41:55 UTC (rev 94181)
@@ -468,6 +468,11 @@
     # Most of the real implementations are in methods beginning with
     # an _.
 
+    def deleteObject(self, oid, serial, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        self.stats.stores += 1
+        self.txnlog.delete(oid, serial)
+
     def storea(self, oid, serial, data, id):
         self._check_tid(id, exc=StorageTransactionError)
         self.stats.stores += 1
@@ -518,6 +523,30 @@
         else:
             return self._wait(lambda: self._undo(trans_id))
 
+    def _delete(self, oid, serial):
+        err = None
+        try:
+            self.storage.deleteObject(oid, serial, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self.store_failed = 1
+            if isinstance(err, ConflictError):
+                self.stats.conflicts += 1
+                self.log("conflict error oid=%s msg=%s" %
+                         (oid_repr(oid), str(err)), BLATHER)
+            if not isinstance(err, TransactionError):
+                # Unexpected errors are logged and passed to the client
+                self.log("store error: %s, %s" % sys.exc_info()[:2],
+                         logging.ERROR, exc_info=True)
+            err = self._marshal_error(err)
+            # The exception is reported back as newserial for this oid
+            self.serials.append((oid, err))
+        else:
+            self.invalidated.append(oid)
+
+        return err is None
+
     def _store(self, oid, serial, data):
         err = None
         try:
@@ -652,7 +681,9 @@
             store_type = store[0]
             store_args = store[1:]
 
-            if store_type == 's':
+            if store_type == 'd':
+                do_store = self._delete
+            elif store_type == 's':
                 do_store = self._store
             elif store_type == 'r':
                 do_store = self._restore

Modified: ZODB/trunk/src/ZEO/TransactionBuffer.py
===================================================================
--- ZODB/trunk/src/ZEO/TransactionBuffer.py	2008-12-19 00:41:53 UTC (rev 94180)
+++ ZODB/trunk/src/ZEO/TransactionBuffer.py	2008-12-19 00:41:55 UTC (rev 94181)
@@ -87,7 +87,7 @@
             self.pickler.dump((oid, data))
             self.count += 1
             # Estimate per-record cache size
-            self.size = self.size + len(data) + 31
+            self.size = self.size + (data and len(data) or 0) + 31
         finally:
             self.lock.release()
 

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2008-12-19 00:41:53 UTC (rev 94180)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2008-12-19 00:41:55 UTC (rev 94181)
@@ -342,6 +342,7 @@
              ('ZODB.interfaces', 'IStorageIteration'),
              ('ZODB.interfaces', 'IStorageUndoable'),
              ('ZODB.interfaces', 'IStorageCurrentRecordIteration'),
+             ('ZODB.interfaces', 'IExternalGC'),
              ('ZODB.interfaces', 'IStorage'),
              ('zope.interface', 'Interface'),
              ),
@@ -1128,7 +1129,6 @@
 def history_over_zeo():
     """
     >>> addr, _ = start_server()
-    >>> import ZEO, ZODB.blob, transaction
     >>> db = ZEO.DB(addr)
     >>> wait_connected(db.storage)
     >>> conn = db.open()
@@ -1143,8 +1143,7 @@
 def dont_log_poskeyerrors_on_server():
     """
     >>> addr, admin = start_server()
-    >>> import ZEO.ClientStorage
-    >>> cs = ZEO.ClientStorage.ClientStorage(addr)
+    >>> cs = ClientStorage(addr)
     >>> cs.load(ZODB.utils.p64(1))
     Traceback (most recent call last):
     ...
@@ -1156,7 +1155,53 @@
     False
     """
 
+def delete_object_multiple_clients():
+    """If we delete on one client, the delete should be reflected on the other.
 
+    First, we'll create an object:
+    
+    >>> addr, _ = start_server()
+    >>> db = ZEO.DB(addr)
+    >>> conn = db.open()
+    >>> conn.root()[0] = conn.root().__class__()
+    >>> transaction.commit()
+    >>> oid = conn.root()[0]._p_oid
+
+    We verify that we can read it in another client, which also loads
+    it into the client cache.
+    
+    >>> cs = ClientStorage(addr)
+    >>> p, s = cs.load(oid)
+    
+    Now, we'll remove the object:
+
+    >>> txn = transaction.begin()
+    >>> db.storage.tpc_begin(txn)
+    >>> db.storage.deleteObject(oid, s, txn)
+    >>> db.storage.tpc_vote(txn)
+    >>> db.storage.tpc_finish(txn)
+
+    And we'll get a POSKeyError if we try to access it:
+
+    >>> db.storage.load(oid) # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    POSKeyError: ...
+
+    We'll wait for our other storage to get the invalidation and then
+    try to access the object. We'll get a POSKeyError there too:
+    
+    >>> tid = db.storage.lastTransaction()
+    >>> forker.wait_until(lambda : cs.lastTransaction() == tid)
+    >>> cs.load(oid) # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    POSKeyError: ...
+
+    >>> db.close()
+    >>> cs.close()
+    """
+
 slow_test_classes = [
     BlobAdaptedFileStorageTests, BlobWritableCacheTests,
     DemoStorageTests, FileStorageTests, MappingStorageTests,
@@ -1207,6 +1252,16 @@
 def create_storage_shared(name, blob_dir):
     return ServerManagingClientStorage(name, blob_dir, True)
 
+class ServerManagingClientStorageForIExternalGCTest(
+    ServerManagingClientStorage):
+
+    def pack(self, t=None, referencesf=None):
+        ServerManagingClientStorage.pack(self, t, referencesf, wait=True)
+        # Packing doesn't clear old versions out of zeo client caches,
+        # so we'll clear the caches.
+        self._cache.clear()
+        ZEO.ClientStorage._check_blob_cache_size(self.blob_dir, 0)
+
 def test_suite():
     suite = unittest.TestSuite()
 
@@ -1227,6 +1282,10 @@
             setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
             ),
         )
+    zeo.addTest(PackableStorage.IExternalGC_suite(
+        lambda :
+        ServerManagingClientStorageForIExternalGCTest('data.fs', 'blobs')
+        ))
     for klass in quick_test_classes:
         zeo.addTest(unittest.makeSuite(klass, "check"))
     zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-misc')



More information about the Zodb-checkins mailing list