[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Multi-threaded IO support.

Jim Fulton jim at zope.com
Mon Feb 1 14:12:20 EST 2010


Log message for revision 108696:
  Multi-threaded IO support.
  

Changed:
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/trunk/src/ZODB/FileStorage/format.py
  U   ZODB/trunk/src/ZODB/tests/testFileStorage.py

-=-
Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2010-02-01 19:12:17 UTC (rev 108695)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2010-02-01 19:12:19 UTC (rev 108696)
@@ -36,6 +36,7 @@
 import logging
 import os
 import sys
+import threading
 import time
 import ZODB.blob
 import ZODB.interfaces
@@ -128,7 +129,7 @@
         else:
             self._tfile = None
 
-        self._file_name = file_name
+        self._file_name = os.path.abspath(file_name)
 
         self._pack_gc = pack_gc
         self.pack_keep_old = pack_keep_old
@@ -167,6 +168,7 @@
             self._file = open(file_name, 'w+b')
             self._file.write(packed_version)
 
+        self._files = FilePool(self._file_name)
         r = self._restore_index()
         if r is not None:
             self._used_index = 1 # Marker for testing
@@ -401,6 +403,7 @@
 
     def close(self):
         self._file.close()
+        self._files.close()
         if hasattr(self,'_lock_file'):
             self._lock_file.close()
         if self._tfile:
@@ -426,22 +429,22 @@
         """Return pickle data and serial number."""
         assert not version
 
-        self._lock_acquire()
+        _file = self._files.get()
         try:
             pos = self._lookup_pos(oid)
-            h = self._read_data_header(pos, oid)
+            h = self._read_data_header(pos, oid, _file)
             if h.plen:
-                data = self._file.read(h.plen)
+                data = _file.read(h.plen)
                 return data, h.tid
             elif h.back:
                 # Get the data from the backpointer, but tid from
                 # current txn.
-                data = self._loadBack_impl(oid, h.back)[0]
+                data = self._loadBack_impl(oid, h.back, _file=_file)[0]
                 return data, h.tid
             else:
                 raise POSKeyError(oid)
         finally:
-            self._lock_release()
+            self._files.put(_file)
 
     def loadSerial(self, oid, serial):
         self._lock_acquire()
@@ -462,12 +465,12 @@
             self._lock_release()
 
     def loadBefore(self, oid, tid):
-        self._lock_acquire()
+        _file = self._files.get()
         try:
             pos = self._lookup_pos(oid)
             end_tid = None
             while True:
-                h = self._read_data_header(pos, oid)
+                h = self._read_data_header(pos, oid, _file)
                 if h.tid < tid:
                     break
 
@@ -477,13 +480,12 @@
                     return None
 
             if h.back:
-                data, _, _, _ = self._loadBack_impl(oid, h.back)
+                data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
                 return data, h.tid, end_tid
             else:
-                return self._file.read(h.plen), h.tid, end_tid
-
+                return _file.read(h.plen), h.tid, end_tid
         finally:
-            self._lock_release()
+            self._files.put(_file)
 
     def store(self, oid, oldserial, data, version, transaction):
         if self._is_read_only:
@@ -735,6 +737,32 @@
         finally:
             self._lock_release()
 
+    def tpc_finish(self, transaction, f=None):
+        """Finish `transaction`; call f(tid) first when f is given."""
+        # Get write lock: blocks until no pooled read handles are checked out
+        self._files.write_lock()
+        try:
+            self._lock_acquire()
+            try:
+                if transaction is not self._transaction:
+                    raise POSException.StorageTransactionError(
+                        "tpc_finish called with wrong transaction")
+                try:
+                    if f is not None:
+                        f(self._tid)
+                    u, d, e = self._ude
+                    self._finish(self._tid, u, d, e)
+                    self._clear_temp()
+                finally:  # reset state and drop the commit lock even on error
+                    self._ude = None
+                    self._transaction = None
+                    self._commit_lock_release()
+            finally:
+                self._lock_release()
+
+        finally:  # always let blocked readers back in
+            self._files.write_unlock()
+
     def _finish(self, tid, u, d, e):
         # If self._nextpos is 0, then the transaction didn't write any
         # data, so we don't bother writing anything to the file.
@@ -1131,8 +1159,10 @@
                 return
             have_commit_lock = True
             opos, index = pack_result
+            self._files.write_lock()
             self._lock_acquire()
             try:
+                self._files.empty()
                 self._file.close()
                 try:
                     os.rename(self._file_name, oldpath)
@@ -1146,6 +1176,7 @@
                 self._initIndex(index, self._tindex)
                 self._pos = opos
             finally:
+                self._files.write_unlock()
                 self._lock_release()
 
             # We're basically done.  Now we need to deal with removed
@@ -2037,3 +2068,106 @@
              'description': d}
         d.update(e)
         return d
+
+class FilePool:
+    """Pool of read-only handles on one file, with a writer barrier.
+
+    Readers check handles out with get() and return them with put().
+    A writer brackets its work with write_lock()/write_unlock(): while
+    the ``writing`` flag is set, get() blocks, and write_lock() itself
+    blocks until every checked-out handle has been returned.
+
+    NOTE(review): the pool does not exclude one writer from another;
+    callers are presumably serialized by a storage-level lock -- confirm.
+    """
+
+    closed = False   # set by close(); get() then raises ValueError
+    writing = False  # True between write_lock() and write_unlock()
+
+    def __init__(self, file_name):
+        self.name = file_name
+        self._files = []   # idle handles, ready for reuse
+        self._out = []     # handles currently checked out by readers
+        # Guards the two lists and the flags above.
+        self._cond = threading.Condition()
+
+    def write_lock(self):
+        """Block new readers, then wait for outstanding handles to return."""
+        self._cond.acquire()
+        try:
+            # Set the flag first so no new reader can slip in while we wait.
+            self.writing = True
+            while self._out:
+                self._cond.wait()
+        finally:
+            self._cond.release()
+
+    def write_unlock(self):
+        """Clear the writer flag and wake readers blocked in get()."""
+        self._cond.acquire()
+        try:
+            self.writing = False
+            self._cond.notifyAll()
+        finally:
+            self._cond.release()
+
+    def get(self):
+        """Check out a handle opened 'rb', reusing an idle one if possible.
+
+        Blocks while a writer holds the pool.  Raises ValueError('closed')
+        once close() has been called.
+        """
+        self._cond.acquire()
+        try:
+            while self.writing:
+                self._cond.wait()
+            if self.closed:
+                raise ValueError('closed')
+
+            try:
+                f = self._files.pop()
+            except IndexError:
+                f = open(self.name, 'rb')
+            self._out.append(f)
+            return f
+        finally:
+            self._cond.release()
+
+    def put(self, f):
+        """Return a handle obtained from get() to the pool."""
+        # Move the handle between lists while holding the condition.  If
+        # this ran unlocked, a writer could observe ``_out`` empty after
+        # the remove() but before the append(), run empty(), and leave
+        # this handle -- possibly on a file since replaced -- in the pool.
+        self._cond.acquire()
+        try:
+            self._out.remove(f)
+            self._files.append(f)
+            if self.writing and not self._out:
+                # Last reader is done; wake the writer in write_lock().
+                self._cond.notifyAll()
+        finally:
+            self._cond.release()
+
+    def empty(self):
+        """Close and discard every idle handle.
+
+        Checked-out handles are not touched.  Meant to be called while
+        holding the write lock, as close() does.
+        """
+        while self._files:
+            self._files.pop().close()
+
+    def close(self):
+        """Mark the pool closed, wait for readers, and close idle handles."""
+        self._cond.acquire()
+        try:
+            self.closed = True
+        finally:
+            self._cond.release()
+
+        self.write_lock()
+        try:
+            self.empty()
+        finally:
+            self.write_unlock()

Modified: ZODB/trunk/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/format.py	2010-02-01 19:12:17 UTC (rev 108695)
+++ ZODB/trunk/src/ZODB/FileStorage/format.py	2010-02-01 19:12:19 UTC (rev 108696)
@@ -134,21 +134,24 @@
         self._file.seek(pos)
         return u64(self._file.read(8))
 
-    def _read_data_header(self, pos, oid=None):
+    def _read_data_header(self, pos, oid=None, _file=None):
         """Return a DataHeader object for data record at pos.
 
         If ois is not None, raise CorruptedDataError if oid passed
         does not match oid in file.
         """
-        self._file.seek(pos)
-        s = self._file.read(DATA_HDR_LEN)
+        if _file is None:
+            _file = self._file
+
+        _file.seek(pos)
+        s = _file.read(DATA_HDR_LEN)
         if len(s) != DATA_HDR_LEN:
             raise CorruptedDataError(oid, s, pos)
         h = DataHeaderFromString(s)
         if oid is not None and oid != h.oid:
             raise CorruptedDataError(oid, s, pos)
         if not h.plen:
-            h.back = u64(self._file.read(8))
+            h.back = u64(_file.read(8))
         return h
 
     def _read_txn_header(self, pos, tid=None):
@@ -164,20 +167,22 @@
         h.ext = self._file.read(h.elen)
         return h
 
-    def _loadBack_impl(self, oid, back, fail=True):
+    def _loadBack_impl(self, oid, back, fail=True, _file=None):
         # shared implementation used by various _loadBack methods
         #
         # If the backpointer ultimately resolves to 0:
         # If fail is True, raise KeyError for zero backpointer.
         # If fail is False, return the empty data from the record
         # with no backpointer.
+        if _file is None:
+            _file = self._file
         while 1:
             if not back:
                 # If backpointer is 0, object does not currently exist.
                 raise POSKeyError(oid)
-            h = self._read_data_header(back)
+            h = self._read_data_header(back, _file=_file)
             if h.plen:
-                return self._file.read(h.plen), h.tid, back, h.tloc
+                return _file.read(h.plen), h.tid, back, h.tloc
             if h.back == 0 and not fail:
                 return None, h.tid, back, h.tloc
             back = h.back

Modified: ZODB/trunk/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testFileStorage.py	2010-02-01 19:12:17 UTC (rev 108695)
+++ ZODB/trunk/src/ZODB/tests/testFileStorage.py	2010-02-01 19:12:19 UTC (rev 108696)
@@ -587,10 +587,10 @@
 
     >>> handler.uninstall()
 
-    >>> fs.load('\0'*8, '')
+    >>> fs.load('\0'*8, '') # doctest: +ELLIPSIS
     Traceback (most recent call last):
     ...
-    ValueError: I/O operation on closed file
+    ValueError: ...
 
     >>> db.close()
     >>> fs = ZODB.FileStorage.FileStorage('data.fs')



More information about the Zodb-checkins mailing list