[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py Queue message processing for main thread so we can support sane file

Jim Fulton jim at zope.com
Tue May 15 15:49:01 EDT 2007


Log message for revision 75778:
  Queue message processing for main thread so we can support sane file
  iteration.
  

Changed:
  U   ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py

-=-
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py	2007-05-15 19:31:56 UTC (rev 75777)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py	2007-05-15 19:49:01 UTC (rev 75778)
@@ -101,7 +101,7 @@
         self.__state = 0
         self.__has_mac = 0
         self.__msg_size = 4
-        self.__output_lock = threading.Lock() # Protects __output
+        self.__output_messages = []
         self.__output = []
         self.__closed = False
         # Each side of the connection sends and receives messages.  A
@@ -129,9 +129,14 @@
 
     def setSessionKey(self, sesskey):
         log("set session key %r" % sesskey)
-        self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
-        self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
+        def hack():
+            self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
+            self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
+            if False:
+                yield ''
 
+        self.message_output(hack())
+        
     def get_addr(self):
         return self.addr
 
@@ -232,87 +237,86 @@
         return True
 
     def writable(self):
-        if len(self.__output) == 0:
-            return False
-        else:
-            return True
+        return bool(self.__output_messages or self.__output)
 
     def should_close(self):
-        self.__output.append(_close_marker)
+        self.__output_messages.append(_close_marker)
 
     def handle_write(self):
-        self.__output_lock.acquire()
-        try:
-            output = self.__output
-            while output:
-                # Accumulate output into a single string so that we avoid
-                # multiple send() calls, but avoid accumulating too much
-                # data.  If we send a very small string and have more data
-                # to send, we will likely incur delays caused by the
-                # unfortunate interaction between the Nagle algorithm and
-                # delayed acks.  If we send a very large string, only a
-                # portion of it will actually be delivered at a time.
+        output = self.__output
+        messages = self.__output_messages
 
-                l = 0
-                for i in range(len(output)):
-                    try:
-                        l += len(output[i])
-                    except TypeError:
-                        # We had an output marker, close the connection
-                        assert output[i] is _close_marker
-                        return self.close()
-                    
-                    if l > SEND_SIZE:
-                        break
+        while output or messages:
+            # Accumulate output into a single string so that we avoid
+            # multiple send() calls, but avoid accumulating too much
+            # data.  If we send a very small string and have more data
+            # to send, we will likely incur delays caused by the
+            # unfortunate interaction between the Nagle algorithm and
+            # delayed acks.  If we send a very large string, only a
+            # portion of it will actually be delivered at a time.
 
-                i += 1
-                # It is very unlikely that i will be 1.
-                v = "".join(output[:i])
-                del output[:i]
+            while messages:
+                message = messages.pop(0)
+                if message.__class__ is str:
+                    self.__message_output(message)
+                elif message is _close_marker:
+                    output.append(message)
+                else:
+                    for m in message:
+                        if m:
+                            self.__message_output(m)
 
+
+            l = 0
+            for i in range(len(output)):
                 try:
-                    n = self.send(v)
-                except socket.error, err:
-                    if err[0] in expected_socket_write_errors:
-                        break # we couldn't write anything
-                    raise
-                if n < len(v):
-                    output.insert(0, v[n:])
-                    break # we can't write any more
-        finally:
-            self.__output_lock.release()
+                    l += len(output[i])
+                except TypeError:
+                    # We had an output marker, close the connection
+                    assert output[i] is _close_marker
+                    return self.close()
 
+                if l > SEND_SIZE:
+                    break
+
+            i += 1
+            # It is very unlikely that i will be 1.
+            v = "".join(output[:i])
+            del output[:i]
+
+            try:
+                n = self.send(v)
+            except socket.error, err:
+                if err[0] in expected_socket_write_errors:
+                    break # we couldn't write anything
+                raise
+            if n < len(v):
+                output.insert(0, v[n:])
+                break # we can't write any more
+
     def handle_close(self):
         self.close()
 
     def message_output(self, message):
-        if __debug__:
-            if self._debug:
-                log("message_output %d bytes: %s hmac=%d" %
-                    (len(message), short_repr(message),
-                    self.__hmac_send and 1 or 0),
-                    level=TRACE)
-
         if self.__closed:
             raise DisconnectedError(
                 "This action is temporarily unavailable.<p>")
-        self.__output_lock.acquire()
-        try:
-            # do two separate appends to avoid copying the message string
-            if self.__hmac_send:
-                self.__output.append(struct.pack(">I", len(message) | MAC_BIT))
-                self.__hmac_send.update(message)
-                self.__output.append(self.__hmac_send.digest())
-            else:
-                self.__output.append(struct.pack(">I", len(message)))
-            if len(message) <= SEND_SIZE:
-                self.__output.append(message)
-            else:
-                for i in range(0, len(message), SEND_SIZE):
-                    self.__output.append(message[i:i+SEND_SIZE])
-        finally:
-            self.__output_lock.release()
+        self.__output_messages.append(message)
 
+    def __message_output(self, message):
+        # do two separate appends to avoid copying the message string
+        if self.__hmac_send:
+            self.__output.append(struct.pack(">I", len(message) | MAC_BIT))
+            self.__hmac_send.update(message)
+            self.__output.append(self.__hmac_send.digest())
+        else:
+            self.__output.append(struct.pack(">I", len(message)))
+        if len(message) <= SEND_SIZE:
+            self.__output.append(message)
+        else:
+            for i in range(0, len(message), SEND_SIZE):
+                self.__output.append(message[i:i+SEND_SIZE])
+
     def close(self):
         if not self.__closed:
             self.__closed = True



More information about the Zodb-checkins mailing list