[Zope-CVS] CVS: Packages/tcpwatch - tcpwatch:1.4

Shane Hathaway shane@cvs.zope.org
Wed, 20 Feb 2002 18:48:07 -0500


Update of /cvs-repository/Packages/tcpwatch
In directory cvs.zope.org:/tmp/cvs-serv20461

Modified Files:
	tcpwatch 
Log Message:
Support for running as a real HTTP proxy server.  (Untested)


=== Packages/tcpwatch/tcpwatch 1.3 => 1.4 ===
     _dests = ()
 
-    def __init__(self):
+    def __init__(self, conn=None):
         self._outbuf = []
-        asyncore.dispatcher.__init__(self)
+        asyncore.dispatcher.__init__(self, conn)
 
     def set_dests(self, dests):
         """Sets the destination streams.
@@ -119,11 +119,14 @@
         for d in self._dests:
             d.write('')  # A blank string means the socket just connected.
 
+    def received(self, data):
+        for d in self._dests:
+            d.write(data)
+
     def handle_read(self):
         data = self.recv(RECV_BUFFER_SIZE)
         if data:
-            for d in self._dests:
-                d.write(data)
+            self.received(data)
 
     def handle_write(self):
         if not self.connected:
@@ -180,7 +183,7 @@
 
     transaction = 1
 
-    def __init__(self, connection_number, client_addr, server_addr):
+    def __init__(self, connection_number, client_addr, server_addr=None):
         self.opened = time()
         self.connection_number = connection_number
         self.client_addr = client_addr
@@ -212,7 +215,7 @@
         if info:
             # Got a connection.
             conn, addr = info
-            conn.setblocking (0)
+            conn.setblocking(0)
 
             ep1 = ForwardingEndpoint()  # connects client to self
             ep2 = ForwardingEndpoint()  # connects self to server
@@ -299,9 +302,15 @@
         self.flush()
 
     def connection_from(self, fci):
-        self._output_message(
-            '%s:%s forwarded to %s:%s' %
-            (tuple(fci.client_addr) + tuple(fci.server_addr)), 1)
+        if fci.server_addr is not None:
+            self._output_message(
+                '%s:%s forwarded to %s:%s' %
+                (tuple(fci.client_addr) + tuple(fci.server_addr)), 1)
+        else:
+            self._output_message(
+                'connection from %s:%s' %
+                (tuple(fci.client_addr)), 1)
+
         if fci.transaction > 1:
             self._output_message(
                 ('HTTP transaction #%d' % fci.transaction), 1)
@@ -885,6 +894,178 @@
                     self.resp_index = new_index
 
 
+#############################################################################
+#
+# HTTP proxy
+#
+#############################################################################
+
+
+class HTTPProxyResponseBuffer:
+    """Ensures that responses on a persistent HTTP connection occur
+    in the correct order."""
+
+    finished = 0
+
+    def __init__(self, proxy_conn, observer_stream=None):
+        self.response_parser = HTTPStreamParser(0)
+        self.proxy_conn = proxy_conn
+        self.observer_stream = observer_stream
+        self.held = []
+
+    def _isMyTurn(self):
+        bufs = self.proxy_conn._response_buffers
+        if bufs:
+            return (bufs[0] is self)
+        return 1
+
+    def write(self, data):
+        # Received data from the server.
+        parser = self.response_parser
+        if not parser.completed:
+            parser.received(data)
+        if parser.completed:
+            self.finished = 1
+            self.flush()
+            return
+        observer_stream = self.observer_stream
+        if observer_stream is not None:
+            observer_stream.write(data)
+        if not self._isMyTurn():
+            # Wait for earlier responses to finish.
+            self.held.append(data)
+            return
+        self.flush()
+        self.proxy_conn.write(data)
+
+    def flush(self):
+        if self.held and self._isMyTurn():
+            data = ''.join(self.held) + data
+            del self.held[:]
+            self.proxy_conn.write(data)
+        if self.finished:
+            bufs = self.proxy_conn._response_buffers
+            if bufs and bufs[0] is self:
+                del bufs[0]
+            if bufs:
+                bufs[0].flush()  # kick.
+
+    def close(self):
+        observer_stream = self.observer_stream
+        if observer_stream is not None:
+            observer_stream.close()
+        self.finished = 1
+        self.flush()
+
+
+
+class HTTPProxyConnection (ForwardingEndpoint):
+
+    _req_parser = None
+    _obs = None
+    _transaction = 0
+
+    def __init__(self, conn, factory, counter, addr):
+        asyncore.dispatcher.__init__(self, conn)
+        self._obs_factory = factory
+        self._counter = counter
+        self._client_addr = addr
+        self._response_buffers = []
+        self._newRequest()
+
+    def _newRequest(self):
+        if self._req_parser is None:
+            self._req_parser = HTTPStreamParser(1)
+        factory = self._obs_factory
+        if factory is not None:
+            fci = ForwardedConnectionInfo(self._counter, self._client_addr)
+            obs = factory(fci)
+            self._transaction = self._transaction + 1
+            obs.transaction = self._transaction
+            self._obs = obs
+
+    def received(self, data):
+        while data:
+            parser = self._req_parser
+            if parser is None:
+                # Begin another request.
+                self._newRequest()
+                parser = self._req_parser
+            obs = self._obs
+            if not parser.completed:
+                # Not yet connected to a server.
+                consumed = parser.received(data)
+                if obs is not None:
+                    obs.received(data[:consumed], 1)
+                data = data[:consumed]
+            if parser.completed:
+                # Connected to a server.
+                self.openProxyConnection(parser)
+                # Expect a new request or a closed connection.
+                self._obs = None
+                self._req_parser = None
+
+    def openProxyConnection(self, request):
+        url = request.first_line.strip().split(' ', 1)[-1]
+        pos = url.rfind(' HTTP/')
+        if pos >= 0:
+            url = url[:pos].rstrip()
+        if url.startswith('http://'):
+            host = url[7:].split('/', 1)[0]
+        else:
+            host = request.headers.get('HOST')
+        if not host:
+            raise NotImplementedError, 'Not a full HTTP proxy'
+
+        if ':' in host:
+            host, port = host.split(':')
+            port = int(port)
+        else:
+            port = 80
+
+        obs = self._obs
+
+        if obs is not None:
+            eo = EndpointObserver(obs, 0)
+            buf = HTTPProxyResponseBuffer(self, eo)
+        else:
+            buf = HTTPProxyResponseBuffer(self)
+        self._response_buffers.append(buf)
+
+        ep = ForwardingEndpoint()  # connects server to buf (to self)
+        ep.set_dests((buf,))
+        ep.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        ep.connect((host, port))
+
+
+
+class HTTPProxyService (asyncore.dispatcher):
+
+    _counter = 0
+
+    def __init__(self, listen_host, listen_port, observer_factory=None):
+        self._obs_factory = observer_factory
+        asyncore.dispatcher.__init__(self)
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.set_reuse_addr()
+        self.bind((listen_host, listen_port))
+        self.listen(5)
+
+    def handle_accept(self):
+        info = self.accept()
+        if info:
+            # Got a connection.
+            conn, addr = info
+            conn.setblocking(0)
+            counter = self._counter + 1
+            self._counter = counter
+            HTTPProxyConnection(conn, self._obs_factory, counter, addr)
+
+    def handle_error(self):
+        # Don't stop the server.
+        import traceback
+        traceback.print_exc()
+
 
 #############################################################################
 #
@@ -904,6 +1085,7 @@
      Set up a forwarded connection to a specified host, bound to an interface
 
   -h (or --http) Parse as HTTP, splitting up multi-transaction connections
+  -p [<listen_host>:]<listen_port> Run an HTTP proxy (implies -h)
   -s Output to stdout instead of a Tkinter window
   -n No color in GUI (consumes less memory)
 """