Skip to content

Commit

Permalink
Merge pull request #12 from lsst-dm/tickets/DM-46311
Browse files Browse the repository at this point in the history
DM-46311: Explicitly control accepts to avoid maxing out open files.
  • Loading branch information
ktlim authored Oct 9, 2024
2 parents e31ecc2 + 8ae035b commit e117c22
Showing 1 changed file with 56 additions and 20 deletions.
76 changes: 56 additions & 20 deletions python/s3daemon/s3daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import asyncio
import logging
import os
import socket
import time

import aiobotocore.session
Expand All @@ -30,6 +31,7 @@
# Tunables read from the environment once at import time; each one falls back
# to the default given here when its variable is unset.

# Size of the connection pool handed to the botocore client configuration.
max_connections = int(os.getenv("S3DAEMON_MAX_CONNECTIONS", 25))
# Connection timeout (presumably in seconds -- confirm where it is consumed).
connect_timeout = float(os.getenv("S3DAEMON_CONNECT_TIMEOUT", 5.0))
# Retry limit (its consumer is outside this view -- confirm semantics).
max_retries = int(os.getenv("S3DAEMON_MAX_RETRIES", 2))
# Upper bound on the number of client connections handled at the same time.
max_clients = int(os.getenv("S3DAEMON_MAX_CLIENTS", 25))

config = botocore.config.Config(
max_pool_connections=max_connections,
Expand Down Expand Up @@ -61,30 +63,48 @@
log.setLevel(logging.INFO)


async def handle_client(client, conn):
    """Handle a client connection to the server socket.

    The request is a single line of the form ``<filename> <dest>``, where
    ``dest`` is ``<alias>/<bucket>/<key>``.  The named file is uploaded to
    the bucket under the key, and a short status message (``Success`` or the
    ``repr`` of the exception) is written back to the client.

    Parameters
    ----------
    client : `aiobotocore.client.S3`
        The S3 client to use to talk to the server.
    conn : `socket.socket`
        The socket connected to the client.  This function closes the socket.

    Notes
    -----
    ``recv``/``send`` are called directly on the socket.
    NOTE(review): if ``conn`` is in blocking mode these calls stall the event
    loop while waiting for the peer -- confirm this is acceptable.
    """
    start = time.time()
    # Stays None until the request has been parsed, so failures that happen
    # before parsing are still logged with a defined value.
    key = None
    try:
        buffer = conn.recv(4096)
        # recv() returns bytes; an empty result means the peer closed (or
        # half-closed) the connection without sending anything.
        if not buffer:
            log.error("Empty request")
            try:
                conn.send(b"Empty request")
            except OSError:
                # Best effort only: the peer may already be gone.
                pass
            return
        while (pos := buffer.rfind(b"\n")) == -1:
            newbuf = conn.recv(4096)
            if not newbuf:
                # Socket closed; handle as if it ended in a newline.
                pos = len(buffer)
                break
            buffer += newbuf
        buffer = buffer[:pos]
        filename, dest = buffer.decode("UTF-8").rstrip().split(" ")
        # Ignore the alias.
        _, bucket, key = dest.split("/", maxsplit=2)
        with open(filename, "rb") as f:
            await client.put_object(Body=f, Bucket=bucket, Key=key)
        conn.send(b"Success")
        log.info("%f %f sec - %s", start, time.time() - start, key)
    except Exception as e:
        log.exception("%f %f sec - %s", start, time.time() - start, key)
        try:
            conn.send(repr(e).encode("UTF-8"))
        except Exception:
            # Reporting the failure to the client is best effort; the error
            # has already been logged above.
            pass
    finally:
        # Single point of cleanup for every exit path.
        conn.close()


async def go():
Expand All @@ -97,14 +117,30 @@ async def go():
endpoint_url=endpoint_url,
config=config,
) as client:

async def client_cb(reader, writer):
await handle_client(client, reader, writer)

server = await asyncio.start_server(client_cb, host, port)
sem = asyncio.Semaphore(max_clients)
background_tasks = set()
log.info("Starting server")
async with server:
await server.serve_forever()
with socket.create_server((host, port)) as s:
# We don't want to block in accept(); we need to run other tasks.
s.setblocking(False)
while True:
# Do not allow more accepts if we're already handling the
# maximum number of clients.
await sem.acquire()
while True:
try:
conn, _ = s.accept()
break
except (TimeoutError, BlockingIOError):
# Allow other tasks to run.
await asyncio.sleep(0)
task = asyncio.create_task(handle_client(client, conn))
# Add to set to avoid premature cleanup.
background_tasks.add(task)
# Release semaphore when task is handled.
task.add_done_callback(lambda _: sem.release())
# Remove from set when finished.
task.add_done_callback(background_tasks.discard)


def main():
Expand Down

0 comments on commit e117c22

Please sign in to comment.