Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "fix: add monitoring for idle connections and close them" #521

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 0 additions & 49 deletions src/codegate/providers/copilot/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,6 @@ def __init__(self, loop: asyncio.AbstractEventLoop):
self._closing = False
self.pipeline_factory = PipelineFactory(SecretsManager())
self.context_tracking: Optional[PipelineContext] = None
self.idle_timeout = 10
self.idle_timer = None

def _reset_idle_timer(self) -> None:
if self.idle_timer:
self.idle_timer.cancel()
self.idle_timer = asyncio.get_event_loop().call_later(
self.idle_timeout, self._handle_idle_timeout
)

def _handle_idle_timeout(self) -> None:
logger.warning("Idle timeout reached, closing connection")
if self.transport and not self.transport.is_closing():
self.transport.close()

def _select_pipeline(self, method: str, path: str) -> Optional[CopilotPipeline]:
if method == "POST" and path == "v1/engines/copilot-codex/completions":
Expand Down Expand Up @@ -229,7 +215,6 @@ def connection_made(self, transport: asyncio.Transport) -> None:
self.transport = transport
self.peername = transport.get_extra_info("peername")
logger.debug(f"Client connected from {self.peername}")
self._reset_idle_timer()

def get_headers_dict(self) -> Dict[str, str]:
"""Convert raw headers to dictionary format"""
Expand Down Expand Up @@ -365,10 +350,8 @@ async def _forward_data_to_target(self, data: bytes) -> None:
pipeline_output = pipeline_output.reconstruct()
self.target_transport.write(pipeline_output)


def data_received(self, data: bytes) -> None:
"""Handle received data from client"""
self._reset_idle_timer()
try:
if not self._check_buffer_size(data):
self.send_error_response(413, b"Request body too large")
Expand Down Expand Up @@ -573,7 +556,6 @@ async def connect_to_target(self) -> None:
logger.error(f"Error during TLS handshake: {e}")
self.send_error_response(502, b"TLS handshake failed")


def send_error_response(self, status: int, message: bytes) -> None:
"""Send error response to client"""
if self._closing:
Expand Down Expand Up @@ -611,37 +593,6 @@ def connection_lost(self, exc: Optional[Exception]) -> None:
self.buffer.clear()
self.ssl_context = None

if self.idle_timer:
self.idle_timer.cancel()

def eof_received(self) -> None:
print("in eof received")
"""Handle connection loss"""
if self._closing:
return

self._closing = True
logger.debug(f"EOF received from {self.peername}")

# Close target transport if it exists and isn't already closing
if self.target_transport and not self.target_transport.is_closing():
try:
self.target_transport.close()
except Exception as e:
logger.error(f"Error closing target transport when EOF: {e}")

# Clear references to help with cleanup
self.transport = None
self.target_transport = None
self.buffer.clear()
self.ssl_context = None

def pause_writing(self) -> None:
print("Transport buffer full, pausing writing")

def resume_writing(self) -> None:
print("Transport buffer ready, resuming writing")

@classmethod
async def create_proxy_server(
cls, host: str, port: int, ssl_context: Optional[ssl.SSLContext] = None
Expand Down
Loading