[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Implementated checkCurrentSerialInTransaction for ZEO.

Jim Fulton jim at zope.com
Thu Sep 2 09:55:29 EDT 2010


Log message for revision 116133:
  Implementated checkCurrentSerialInTransaction for ZEO.
  
  Also did some storage server implementation cleanup, removing some
  dups and folding a small module into StorageServer.py.
  

Changed:
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  D   ZODB/trunk/src/ZEO/CommitLog.py
  D   ZODB/trunk/src/ZEO/README.txt
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/StorageServer.py

-=-
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2010-09-02 13:55:29 UTC (rev 116133)
@@ -931,6 +931,11 @@
         self._tbuf.store(oid, data)
         return self._check_serials()
 
+    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
+        self._check_trans(transaction)
+        self._server.checkCurrentSerialInTransaction(oid, serial,
+                                                     id(transaction))
+
     def storeBlob(self, oid, serial, data, blobfilename, version, txn):
         """Storage API: store a blob object."""
         assert not version

Deleted: ZODB/trunk/src/ZEO/CommitLog.py
===================================================================
--- ZODB/trunk/src/ZEO/CommitLog.py	2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/CommitLog.py	2010-09-02 13:55:29 UTC (rev 116133)
@@ -1,64 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE
-#
-##############################################################################
-"""Log a transaction's commit info during two-phase commit.
-
-A storage server allows multiple clients to commit transactions, but
-must serialize them as the actually execute at the server.  The
-concurrent commits are achieved by logging actions up until the
-tpc_vote().  At that point, the entire transaction is committed on the
-real storage.
-
-"""
-
-import cPickle
-import tempfile
-
-
-class CommitLog:
-
-    def __init__(self):
-        self.file = tempfile.TemporaryFile(suffix=".comit-log")
-        self.pickler = cPickle.Pickler(self.file, 1)
-        self.pickler.fast = 1
-        self.stores = 0
-
-    def size(self):
-        return self.file.tell()
-
-    def delete(self, oid, serial):
-        self.pickler.dump(('_delete', (oid, serial)))
-        self.stores += 1
-
-    def store(self, oid, serial, data):
-        self.pickler.dump(('_store', (oid, serial, data)))
-        self.stores += 1
-
-    def restore(self, oid, serial, data, prev_txn):
-        self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
-        self.stores += 1
-
-    def undo(self, transaction_id):
-        self.pickler.dump(('_undo', (transaction_id, )))
-        self.stores += 1
-
-    def __iter__(self):
-        self.file.seek(0)
-        unpickler = cPickle.Unpickler(self.file)
-        for i in range(self.stores):
-            yield unpickler.load()
-
-    def close(self):
-        if self.file:
-            self.file.close()
-            self.file = None

Deleted: ZODB/trunk/src/ZEO/README.txt
===================================================================
--- ZODB/trunk/src/ZEO/README.txt	2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/README.txt	2010-09-02 13:55:29 UTC (rev 116133)
@@ -1,44 +0,0 @@
-=======
-ZEO 2.0
-=======
-
-What's ZEO?
------------
-
-ZEO stands for Zope Enterprise Objects.  ZEO is an add-on for Zope
-that allows multiple processes to connect to a single ZODB storage.
-Those processes can live on different machines, but don't need to.
-ZEO 2 has many improvements over ZEO 1, and is incompatible with ZEO 1;
-if you upgrade an existing ZEO 1 installation, you must upgrade the
-server and all clients simultaneous.  If you received ZEO 2 as part of
-the ZODB 3 distribution, the ZEO 1 sources are provided in a separate
-directory (ZEO1).  Some documentation for ZEO is available in the ZODB 3
-package in the Doc subdirectory.  ZEO depends on the ZODB software; it
-can be used with the version of ZODB distributed with Zope 2.5.1 or
-later.  More information about ZEO can be found in the ZODB Wiki:
-
-    http://www.zope.org/Wikis/ZODB
-
-What's here?
-------------
-
-This list of filenames is mostly for ZEO developers::
-
- ClientCache.py          client-side cache implementation
- ClientStorage.py        client-side storage implementation
- ClientStub.py           RPC stubs for callbacks from server to client
- CommitLog.py            buffer used during two-phase commit on the server
- Exceptions.py           definitions of exceptions
- ICache.py               interface definition for the client-side cache
- ServerStub.py           RPC stubs for the server
- StorageServer.py        server-side storage implementation
- TransactionBuffer.py    buffer used for transaction data in the client
- __init__.py             near-empty file to make this directory a package
- simul.py                command-line tool to simulate cache behavior
- start.py                command-line tool to start the storage server
- stats.py                command-line tool to process client cache traces
- tests/                  unit tests and other test utilities
- util.py                 utilities used by the server startup tool
- version.txt             text file indicating the ZEO version
- zrpc/                   subpackage implementing Remote Procedure Call (RPC)
-

Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py	2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2010-09-02 13:55:29 UTC (rev 116133)
@@ -199,6 +199,9 @@
     def storea(self, oid, serial, data, id):
         self.rpc.callAsync('storea', oid, serial, data, id)
 
+    def checkCurrentSerialInTransaction(self, oid, serial, id):
+        self.rpc.callAsync('checkCurrentSerialInTransaction', oid, serial, id)
+
     def restorea(self, oid, serial, data, prev_txn, id):
         self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
 

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2010-09-02 13:55:26 UTC (rev 116132)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2010-09-02 13:55:29 UTC (rev 116133)
@@ -22,7 +22,6 @@
 
 from __future__ import with_statement
 
-from ZEO.CommitLog import CommitLog
 from ZEO.Exceptions import AuthError
 from ZEO.monitor import StorageStats, StatsServer
 from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result
@@ -477,6 +476,7 @@
                     if not getattr(self, op)(*args):
                         break
 
+
                 # Blob support
                 while self.blob_log and not self.store_failed:
                     oid, oldserial, data, blobfilename = self.blob_log.pop()
@@ -531,6 +531,10 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data)
 
+    def checkCurrentSerialInTransaction(self, oid, serial, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        self.txnlog.checkread(oid, serial)
+
     def restorea(self, oid, serial, data, prev_txn, id):
         self._check_tid(id, exc=StorageTransactionError)
         self.stats.stores += 1
@@ -581,6 +585,20 @@
         self._check_tid(tid, exc=StorageTransactionError)
         self.txnlog.undo(trans_id)
 
+    def _op_error(self, oid, err, op):
+        self.store_failed = 1
+        if isinstance(err, ConflictError):
+            self.stats.conflicts += 1
+            self.log("conflict error oid=%s msg=%s" %
+                     (oid_repr(oid), str(err)), BLATHER)
+        if not isinstance(err, TransactionError):
+            # Unexpected errors are logged and passed to the client
+            self.log("%s error: %s, %s" % ((op,)+ sys.exc_info()[:2]),
+                     logging.ERROR, exc_info=True)
+        err = self._marshal_error(err)
+        # The exception is reported back as newserial for this oid
+        self.serials.append((oid, err))
+
     def _delete(self, oid, serial):
         err = None
         try:
@@ -588,23 +606,24 @@
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, err:
-            self.store_failed = 1
-            if isinstance(err, ConflictError):
-                self.stats.conflicts += 1
-                self.log("conflict error oid=%s msg=%s" %
-                         (oid_repr(oid), str(err)), BLATHER)
-            if not isinstance(err, TransactionError):
-                # Unexpected errors are logged and passed to the client
-                self.log("store error: %s, %s" % sys.exc_info()[:2],
-                         logging.ERROR, exc_info=True)
-            err = self._marshal_error(err)
-            # The exception is reported back as newserial for this oid
-            self.serials.append((oid, err))
+            self._op_error(oid, err, 'delete')
         else:
             self.invalidated.append(oid)
 
         return err is None
 
+    def _checkread(self, oid, serial):
+        err = None
+        try:
+            self.storage.checkCurrentSerialInTransaction(
+                oid, serial, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self._op_error(oid, err, 'checkCurrentSerialInTransaction')
+
+        return err is None
+
     def _store(self, oid, serial, data, blobfile=None):
         err = None
         try:
@@ -617,18 +636,7 @@
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, err:
-            self.store_failed = 1
-            if isinstance(err, ConflictError):
-                self.stats.conflicts += 1
-                self.log("conflict error oid=%s msg=%s" %
-                         (oid_repr(oid), str(err)), BLATHER)
-            if not isinstance(err, TransactionError):
-                # Unexpected errors are logged and passed to the client
-                self.log("store error: %s, %s" % sys.exc_info()[:2],
-                         logging.ERROR, exc_info=True)
-            err = self._marshal_error(err)
-            # The exception is reported back as newserial for this oid
-            newserial = [(oid, err)]
+            self._op_error(oid, err, 'store')
         else:
             if serial != "\0\0\0\0\0\0\0\0":
                 self.invalidated.append(oid)
@@ -636,8 +644,7 @@
             if isinstance(newserial, str):
                 newserial = [(oid, newserial)]
 
-        if newserial:
-            for oid, s in newserial:
+            for oid, s in newserial or ():
 
                 if s == ResolvedSerial:
                     self.stats.conflicts_resolved += 1
@@ -656,14 +663,7 @@
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, err:
-            self.store_failed = 1
-            if not isinstance(err, TransactionError):
-                # Unexpected errors are logged and passed to the client
-                self.log("store error: %s, %s" % sys.exc_info()[:2],
-                         logging.ERROR, exc_info=True)
-            err = self._marshal_error(err)
-            # The exception is reported back as newserial for this oid
-            self.serials.append((oid, err))
+            self._op_error(oid, err, 'restore')
 
         return err is None
 
@@ -674,14 +674,7 @@
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, err:
-            self.store_failed = 1
-            if not isinstance(err, TransactionError):
-                # Unexpected errors are logged and passed to the client
-                self.log("store error: %s, %s" % sys.exc_info()[:2],
-                         logging.ERROR, exc_info=True)
-            err = self._marshal_error(err)
-            # The exception is reported back as newserial for this oid
-            self.serials.append((z64, err))
+            self._op_error(z64, err, 'undo')
         else:
             self.invalidated.extend(oids)
             self.serials.extend((oid, ResolvedSerial) for oid in oids)
@@ -1535,3 +1528,44 @@
         host, port = addr
         return str(host) + ":" + str(port)
 
+class CommitLog:
+
+    def __init__(self):
+        self.file = tempfile.TemporaryFile(suffix=".comit-log")
+        self.pickler = cPickle.Pickler(self.file, 1)
+        self.pickler.fast = 1
+        self.stores = 0
+
+    def size(self):
+        return self.file.tell()
+
+    def delete(self, oid, serial):
+        self.pickler.dump(('_delete', (oid, serial)))
+        self.stores += 1
+
+    def checkread(self, oid, serial):
+        self.pickler.dump(('_checkread', (oid, serial)))
+        self.stores += 1
+
+    def store(self, oid, serial, data):
+        self.pickler.dump(('_store', (oid, serial, data)))
+        self.stores += 1
+
+    def restore(self, oid, serial, data, prev_txn):
+        self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
+        self.stores += 1
+
+    def undo(self, transaction_id):
+        self.pickler.dump(('_undo', (transaction_id, )))
+        self.stores += 1
+
+    def __iter__(self):
+        self.file.seek(0)
+        unpickler = cPickle.Unpickler(self.file)
+        for i in range(self.stores):
+            yield unpickler.load()
+
+    def close(self):
+        if self.file:
+            self.file.close()
+            self.file = None



More information about the Zodb-checkins mailing list