[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo/src/ZODB/ Allow multiple file-storage loads to happen at once.

Jim Fulton jim at zope.com
Wed Sep 16 16:10:08 EDT 2009


Log message for revision 104153:
  Allow multiple file-storage loads to happen at once.
  

Changed:
  U   ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/format.py
  U   ZODB/branches/jim-thready-zeo/src/ZODB/tests/testFileStorage.py

-=-
Modified: ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/FileStorage.py	2009-09-16 15:38:14 UTC (rev 104152)
+++ ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/FileStorage.py	2009-09-16 20:10:08 UTC (rev 104153)
@@ -38,6 +38,7 @@
 import logging
 import os
 import sys
+import threading
 import time
 import ZODB.blob
 import ZODB.interfaces
@@ -130,7 +131,7 @@
         else:
             self._tfile = None
 
-        self._file_name = file_name
+        self._file_name = os.path.abspath(file_name)
 
         self._pack_gc = pack_gc
         self.pack_keep_old = pack_keep_old
@@ -139,6 +140,9 @@
 
         BaseStorage.BaseStorage.__init__(self, file_name)
 
+        self._oid_locks = {}
+        self._oid_lock_condition = threading.Condition()
+
         index, tindex = self._newIndexes()
         self._initIndex(index, tindex)
 
@@ -147,7 +151,7 @@
         self._file = None
         if not create:
             try:
-                self._file = open(file_name, read_only and 'rb' or 'r+b')
+                self._file = open(file_name, read_only and 'rb' or 'r+b', 0)
             except IOError, exc:
                 if exc.errno == errno.EFBIG:
                     # The file is too big to open.  Fail visibly.
@@ -166,9 +170,10 @@
         if self._file is None and create:
             if os.path.exists(file_name):
                 os.remove(file_name)
-            self._file = open(file_name, 'w+b')
+            self._file = open(file_name, 'w+b', 0)
             self._file.write(packed_version)
 
+        self._files = FilePool(self._file_name)
         r = self._restore_index()
         if r is not None:
             self._used_index = 1 # Marker for testing
@@ -218,6 +223,39 @@
             self.blob_dir = None
             self._blob_init_no_blobs()
 
+    def _lock_oid(self, oid, read=True):
+        """Take a shared (read) or exclusive (write) lock on oid."""
+        self._oid_lock_condition.acquire()
+        try:
+            while 1:
+                lock = self._oid_locks.get(oid, 0)
+                if read and lock >= 0:
+                    lock += 1   # count one more reader
+                elif not read and not lock:
+                    lock = -1   # -1 marks an exclusive writer
+                else:
+                    self._oid_lock_condition.wait()
+                    continue
+                break
+            self._oid_locks[oid] = lock
+        finally:
+            self._oid_lock_condition.release()
+
+    def _unlock_oid(self, oid, read=True):
+        """Release a lock taken by _lock_oid and wake all waiters."""
+        self._oid_lock_condition.acquire()
+        try:
+            lock = self._oid_locks[oid]
+            if lock > 1:
+                assert read
+                self._oid_locks[oid] = lock - 1
+            else:
+                assert lock == (read and 1 or -1)
+                del self._oid_locks[oid]
+            self._oid_lock_condition.notifyAll()
+        finally:
+            self._oid_lock_condition.release()
+
     def copyTransactionsFrom(self, other):
         if self.blob_dir:
             return ZODB.blob.BlobStorageMixin.copyTransactionsFrom(self, other)
@@ -403,6 +441,7 @@
 
     def close(self):
         self._file.close()
+        self._files.close()
         if hasattr(self,'_lock_file'):
             self._lock_file.close()
         if self._tfile:
@@ -428,22 +467,32 @@
         """Return pickle data and serial number."""
         assert not version
 
-        self._lock_acquire()
+        self._lock_oid(oid)
         try:
-            pos = self._lookup_pos(oid)
-            h = self._read_data_header(pos, oid)
-            if h.plen:
-                data = self._file.read(h.plen)
-                return data, h.tid
-            elif h.back:
-                # Get the data from the backpointer, but tid from
-                # current txn.
-                data = self._loadBack_impl(oid, h.back)[0]
-                return data, h.tid
-            else:
-                raise POSKeyError(oid)
+
+            self._lock_acquire()
+            try:
+                pos = self._lookup_pos(oid)
+                _file = self._files.get()
+            finally:
+                self._lock_release()
+
+            try:
+                h = self._read_data_header(pos, oid, _file)
+                if h.plen:
+                    data = _file.read(h.plen)
+                    return data, h.tid
+                elif h.back:
+                    # Get the data from the backpointer, but tid from
+                    # current txn.
+                    data = self._loadBack_impl(oid, h.back, _file=_file)[0]
+                    return data, h.tid
+                else:
+                    raise POSKeyError(oid)
+            finally:
+                self._files.put(_file)
         finally:
-            self._lock_release()
+            self._unlock_oid(oid)
 
     def loadSerial(self, oid, serial):
         self._lock_acquire()
@@ -467,9 +516,14 @@
         self._lock_acquire()
         try:
             pos = self._lookup_pos(oid)
+            _file = self._files.get()
+        finally:
+            self._lock_release()
+
+        try:
             end_tid = None
             while True:
-                h = self._read_data_header(pos, oid)
+                h = self._read_data_header(pos, oid, _file)
                 if h.tid < tid:
                     break
 
@@ -479,14 +533,14 @@
                     return None
 
             if h.back:
-                data, _, _, _ = self._loadBack_impl(oid, h.back)
+                data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
                 return data, h.tid, end_tid
             else:
-                return self._file.read(h.plen), h.tid, end_tid
-
+                return _file.read(h.plen), h.tid, end_tid
         finally:
-            self._lock_release()
+            self._files.put(_file)
 
+
     def store(self, oid, oldserial, data, version, transaction):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
@@ -736,6 +790,36 @@
         finally:
             self._lock_release()
 
+    def tpc_finish(self, transaction, f=None):
+        """Finish the commit with write locks held on every oid in _tindex."""
+        try:
+            # Take an exclusive (write) oid lock for each oid in _tindex.
+            locked = []
+            for oid in self._tindex:
+                self._lock_oid(oid, False)
+                locked.append(oid)
+            # Storage lock is taken after the oid locks, as load() does.
+            self._lock_acquire()
+            try:
+                if transaction is not self._transaction:
+                    return  # not the current transaction; nothing to finish
+                try:
+                    if f is not None:
+                        f(self._tid)
+                    u, d, e = self._ude
+                    self._finish(self._tid, u, d, e)
+                    self._clear_temp()
+                finally:
+                    self._ude = None
+                    self._transaction = None
+                    self._commit_lock_release()
+            finally:
+                self._lock_release()
+        # Always drop the oid locks, even after an error or early return.
+        finally:
+            for oid in locked:
+                self._unlock_oid(oid, False)
+
     def _finish(self, tid, u, d, e):
         # If self._nextpos is 0, then the transaction didn't write any
         # data, so we don't bother writing anything to the file.
@@ -1131,18 +1215,23 @@
             opos, index = pack_result
             self._lock_acquire()
             try:
+                self._files.close()
                 self._file.close()
                 try:
                     os.rename(self._file_name, oldpath)
                 except Exception:
-                    self._file = open(self._file_name, 'r+b')
+                    self._file = open(self._file_name, 'r+b', 0)
+                    # Reopen the read pool too; otherwise every later load
+                    # fails because FilePool.get() raises once closed.
+                    self._files.open()
                     raise

                 # OK, we're beyond the point of no return
                 os.rename(self._file_name + '.pack', self._file_name)
                 if not self.pack_keep_old:
                     os.remove(oldpath)
-                self._file = open(self._file_name, 'r+b')
+                self._file = open(self._file_name, 'r+b', 0)
+                self._files.open()
                 self._initIndex(index, self._tindex)
                 self._pos = opos
                 self._save_index()
@@ -2001,3 +2087,84 @@
              'description': d}
         d.update(e)
         return d
+
+
+class FilePool:
+    """Pool of read-only, unbuffered handles on a single file.
+
+    get() checks a handle out, opening a new one when none is idle, and
+    put() checks it back in.  close() refuses further checkouts, waits
+    until every checked-out handle is back, then closes them all;
+    open() makes a closed, drained pool usable again.
+    """
+
+    closed = False
+
+    def __init__(self, file_name):
+        self.name = file_name
+        self._files = []  # idle handles, ready for reuse
+        self._out = []    # handles currently checked out
+        self._cond = threading.Condition()
+
+    def get(self):
+        """Check out a handle; raise ValueError if the pool is closed."""
+        cond = self._cond
+        cond.acquire()
+        try:
+            if self.closed:
+                raise ValueError('closed')
+            if self._files:
+                handle = self._files.pop()
+            else:
+                handle = open(self.name, 'rb', 0)
+            self._out.append(handle)
+            return handle
+        finally:
+            cond.release()
+
+    def put(self, f):
+        """Check in a handle previously returned by get().
+
+        Once the pool is closed, handles are closed instead of kept, and
+        the last one back wakes any thread blocked in close().
+        """
+        cond = self._cond
+        cond.acquire()
+        try:
+            self._out.remove(f)
+            if not self.closed:
+                self._files.append(f)
+                return
+            f.close()
+            if not self._out:
+                cond.notifyAll()
+        finally:
+            cond.release()
+
+    def close(self):
+        """Refuse new checkouts and close every handle.
+
+        Blocks until all checked-out handles have been put back.
+        """
+        cond = self._cond
+        cond.acquire()
+        try:
+            self.closed = True
+            while self._out:
+                cond.wait()
+            idle = self._files
+            while idle:
+                idle.pop().close()
+        finally:
+            cond.release()
+
+    def open(self):
+        """Make a closed and fully drained pool usable again."""
+        cond = self._cond
+        cond.acquire()
+        try:
+            assert self.closed
+            assert not (self._files or self._out)
+            self.closed = False
+        finally:
+            cond.release()

Modified: ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/format.py	2009-09-16 15:38:14 UTC (rev 104152)
+++ ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/format.py	2009-09-16 20:10:08 UTC (rev 104153)
@@ -134,21 +134,24 @@
         self._file.seek(pos)
         return u64(self._file.read(8))
 
-    def _read_data_header(self, pos, oid=None):
+    def _read_data_header(self, pos, oid=None, _file=None):
         """Return a DataHeader object for data record at pos.

-        If ois is not None, raise CorruptedDataError if oid passed
-        does not match oid in file.
+        If oid is not None, raise CorruptedDataError if it does not match
+        the oid in the file.  Reads via _file, defaulting to self._file.
         """
-        self._file.seek(pos)
-        s = self._file.read(DATA_HDR_LEN)
+        if _file is None:
+            _file = self._file
+        # Leaves _file just past the header (and backpointer, if any).
+        _file.seek(pos)
+        s = _file.read(DATA_HDR_LEN)
         if len(s) != DATA_HDR_LEN:
             raise CorruptedDataError(oid, s, pos)
         h = DataHeaderFromString(s)
         if oid is not None and oid != h.oid:
             raise CorruptedDataError(oid, s, pos)
         if not h.plen:
-            h.back = u64(self._file.read(8))
+            h.back = u64(_file.read(8))
         return h
 
     def _read_txn_header(self, pos, tid=None):
@@ -164,20 +167,22 @@
         h.ext = self._file.read(h.elen)
         return h
 
-    def _loadBack_impl(self, oid, back, fail=True):
+    def _loadBack_impl(self, oid, back, fail=True, _file=None):
         # shared implementation used by various _loadBack methods
         #
-        # If the backpointer ultimately resolves to 0:
-        # If fail is True, raise KeyError for zero backpointer.
-        # If fail is False, return the empty data from the record
-        # with no backpointer.
+        # Follow backpointers from `back`, reading via _file (default
+        # self._file); return (data, tid, pos, tloc) of the first record
+        # with data.  A zero `back` raises POSKeyError; a data-less record
+        # with a zero backpointer does too, unless fail is false (data=None).
+        if _file is None:
+            _file = self._file
         while 1:
             if not back:
                 # If backpointer is 0, object does not currently exist.
                 raise POSKeyError(oid)
-            h = self._read_data_header(back)
+            h = self._read_data_header(back, _file=_file)
             if h.plen:
-                return self._file.read(h.plen), h.tid, back, h.tloc
+                return _file.read(h.plen), h.tid, back, h.tloc
             if h.back == 0 and not fail:
                 return None, h.tid, back, h.tloc
             back = h.back

Modified: ZODB/branches/jim-thready-zeo/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZODB/tests/testFileStorage.py	2009-09-16 15:38:14 UTC (rev 104152)
+++ ZODB/branches/jim-thready-zeo/src/ZODB/tests/testFileStorage.py	2009-09-16 20:10:08 UTC (rev 104153)
@@ -583,10 +583,10 @@
 
     >>> handler.uninstall()
 
-    >>> fs.load('\0'*8, '')
+    >>> fs.load('\0'*8, '') # doctest: +ELLIPSIS
     Traceback (most recent call last):
     ...
-    ValueError: I/O operation on closed file
+    ValueError: ...
 
     >>> db.close()
     >>> fs = ZODB.FileStorage.FileStorage('data.fs')



More information about the Zodb-checkins mailing list