Wsgiserver 0.2 High Quality Access
# NOTE(review): the defs below take `self`, but no enclosing class header is
# visible in this chunk -- presumably they are methods of the server class that
# the test snippet constructs as WSGIServer(host, port, app). Confirm and
# re-indent under that class.
def __init__(self, host='127.0.0.1', port=8000, application=None):
    """Store the listen address and the application; no socket is opened here.

    Args:
        host: Address later passed to ``bind`` by ``bind_and_listen``.
        port: Port later passed to ``bind`` by ``bind_and_listen``.
        application: Object handed to ``WSGIHandler`` for each accepted client.
    """
    self.host = host
    self.port = port
    self.application = application
    self.server_socket = None
    self.running = False
    self.connections = []  # initialised here; unused by the methods shown


def bind_and_listen(self):
    """Create and bind server socket"""
    self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_REUSEADDR is set before bind() so the option applies to this bind.
    self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.server_socket.bind((self.host, self.port))
    self.server_socket.listen(5)
    # Non-blocking: serve_forever() polls with select() instead of blocking
    # in accept().
    self.server_socket.setblocking(False)


def serve_forever(self):
    """Main server loop

    Binds the listening socket, then polls it with a one-second ``select``
    timeout until ``self.running`` becomes false. Each accepted client is
    passed to ``handle_client`` synchronously. Any exception raised inside
    one iteration is printed and the loop continues.
    """
    self.bind_and_listen()
    self.running = True
    # The placeholders need braces; without them the literal text
    # "self.host:self.port" is printed instead of the address.
    print(f"Serving on http://{self.host}:{self.port}")

    while self.running:
        try:
            # The 1 s timeout bounds how long a cleared `running` flag can go
            # unnoticed.
            readable, _, _ = select.select([self.server_socket], [], [], 1)
            if readable:
                client_socket, addr = self.server_socket.accept()
                self.handle_client(client_socket, addr)
        except Exception as e:
            # Top-level loop boundary: report the error and keep serving.
            print(f"Error: {e}")


def handle_client(self, client_socket, addr):
    """Handle incoming client connection"""
    handler = WSGIHandler(client_socket, addr, self.application)
    handler.handle_request()


def shutdown(self):
    """Gracefully shutdown server

    Clears ``self.running`` so the serve loop exits, then closes the
    listening socket if one was created.
    """
    self.running = False
    if self.server_socket:
        self.server_socket.close()


import os
import sys
from io import BytesIO


class WSGIHandler:
    """Handle individual HTTP requests"""
COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt wsgiserver 0.2
time.sleep(0.5)
server = WSGIServer('127.0.0.1', 8889, app) thread = threading.Thread(target=server.serve_forever) thread.daemon = True thread.start() def __init__(self, host='127
server.shutdown() # Run all tests pytest With coverage pytest --cov=wsgiserver --cov-report=html Run specific test pytest tests/test_server.py::test_basic_request -v 5. Performance Optimization 5.1 Connection Pooling class ConnectionPool: """Manage persistent connections""" def __init__(self, max_connections=100): self.pool = [] self.max_connections = max_connections def acquire(self): if self.pool: return self.pool.pop() return None def release(self, connection): if len(self.pool) < self.max_connections: self.pool.append(connection) 5.2 Asynchronous Support import asyncio class AsyncWSGIServer: """Asynchronous version using asyncio""" _ = select.select([self.server_socket]
[Install] WantedBy=multi-user.target # Dockerfile FROM python:3.11-slim WORKDIR /app