Skip to content

Commit

Permalink
more cache
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <dentinyhao@gmail.com>
  • Loading branch information
dentiny committed Jan 11, 2025
1 parent b99bf1e commit 1b8f3b1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 185 deletions.
202 changes: 27 additions & 175 deletions python/ray/_private/runtime_env/agent/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@
import logging
import os
import time
import traceback
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Set, Tuple
from ray._private.ray_constants import (
DEFAULT_RUNTIME_ENV_TIMEOUT_SECONDS,
)

import ray._private.runtime_env.agent.runtime_env_consts as runtime_env_consts
from ray._private.ray_logging import setup_component_logger
Expand Down Expand Up @@ -144,7 +140,7 @@ def decrease_reference(
self, runtime_env: RuntimeEnv, serialized_env: str, source_process: str
) -> None:
if source_process in self._reference_exclude_sources:
return list()
return
self._decrease_reference_for_runtime_env(serialized_env)
uris = self._uris_parser(runtime_env)
self._decrease_reference_for_uris(uris)
Expand Down Expand Up @@ -314,21 +310,6 @@ async def _setup_runtime_env(
per_job_logger = self.get_or_create_logger(request.job_id, log_files)
context = RuntimeEnvContext(env_vars=runtime_env.env_vars())

# Warn about unrecognized fields in the runtime env.
for name, _ in runtime_env.plugins():
if name not in self._plugin_manager.plugins:
per_job_logger.warning(
f"runtime_env field {name} is not recognized by "
"Ray and will be ignored. In the future, unrecognized "
"fields in the runtime_env will raise an exception."
)

# Creates each runtime env URI by their priority. `working_dir` is special
# because it needs to be created before other plugins. All other plugins are
# created in the priority order (smaller priority value -> earlier to
# create), with a special environment variable being set to the working dir.
# ${RAY_RUNTIME_ENV_CREATE_WORKING_DIR}

# First create working dir...
working_dir_ctx = self._plugin_manager.plugins[WorkingDirPlugin.name]
await create_for_plugin_if_needed(
Expand Down Expand Up @@ -359,94 +340,23 @@ async def _create_runtime_env_with_retry(
setup_timeout_seconds,
runtime_env_config: RuntimeEnvConfig,
) -> Tuple[bool, str, str]:
"""
Create runtime env with retry times. This function won't raise exceptions.
Args:
runtime_env: The instance of RuntimeEnv class.
setup_timeout_seconds: The timeout of runtime environment creation for
each attempt.
Returns:
a tuple which contains result (bool), runtime env context (str), error
message(str).
"""
self._logger.info(
f"Creating runtime env: {serialized_env} with timeout "
f"{setup_timeout_seconds} seconds."
runtime_env_setup_task = _setup_runtime_env(runtime_env, runtime_env_config)
runtime_env_context = await asyncio.wait_for(
runtime_env_setup_task, timeout=setup_timeout_seconds
)
serialized_context = None
error_message = None
for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES):
try:
runtime_env_setup_task = _setup_runtime_env(
runtime_env, runtime_env_config
)
runtime_env_context = await asyncio.wait_for(
runtime_env_setup_task, timeout=setup_timeout_seconds
)
serialized_context = runtime_env_context.serialize()
error_message = None
break
except Exception as e:
err_msg = f"Failed to create runtime env {serialized_env}."
self._logger.exception(err_msg)
error_message = "".join(
traceback.format_exception(type(e), e, e.__traceback__)
)
if isinstance(e, asyncio.TimeoutError):
hint = (
f"Failed to install runtime_env within the "
f"timeout of {setup_timeout_seconds} seconds. Consider "
"increasing the timeout in the runtime_env config. "
"For example: \n"
' runtime_env={"config": {"setup_timeout_seconds":'
" 1800}, ...}\n"
"If not provided, the default timeout is "
f"{DEFAULT_RUNTIME_ENV_TIMEOUT_SECONDS} seconds. "
)
error_message = hint + error_message
await asyncio.sleep(
runtime_env_consts.RUNTIME_ENV_RETRY_INTERVAL_MS / 1000
)
if error_message:
self._logger.error(
"Runtime env creation failed for %d times, "
"don't retry any more.",
runtime_env_consts.RUNTIME_ENV_RETRY_TIMES,
)
return False, None, error_message
serialized_context = runtime_env_context.serialize()
return True, serialized_context, None

serialized_env = request.serialized_runtime_env
if serialized_env not in self._env_locks:
# async lock to prevent the same env being concurrently installed
self._env_locks[serialized_env] = asyncio.Lock()
async with self._env_locks[serialized_env]:
if serialized_env in self._runtime_env_cache:
runtime_env = self._runtime_env_cache[serialized_env]
else:
self._logger.info(
"Successfully created runtime env: %s, the context: %s",
serialized_env,
serialized_context,
)
return True, serialized_context, None

try:
serialized_env = request.serialized_runtime_env
if serialized_env not in self._env_locks:
# async lock to prevent the same env being concurrently installed
self._env_locks[serialized_env] = asyncio.Lock()
async with self._env_locks[serialized_env]:
if serialized_env in self._runtime_env_cache:
runtime_env = self._runtime_env_cache[serialized_env]
else:
runtime_env = RuntimeEnv.deserialize(serialized_env)
self._runtime_env_cache[serialized_env] = runtime_env

except Exception as e:
self._logger.exception(
"[Increase] Failed to parse runtime env: " f"{serialized_env}"
)
return runtime_env_agent_pb2.GetOrCreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message="".join(
traceback.format_exception(type(e), e, e.__traceback__)
),
)
runtime_env = RuntimeEnv.deserialize(serialized_env)
self._runtime_env_cache[serialized_env] = runtime_env

# Increase reference
self._reference_table.increase_reference(
Expand All @@ -461,46 +371,14 @@ async def _create_runtime_env_with_retry(
if serialized_env in self._env_cache:
serialized_context = self._env_cache[serialized_env]
result = self._env_cache[serialized_env]
if result.success:
context = result.result
self._logger.info(
"Runtime env already created "
f"successfully. Env: {serialized_env}, "
f"context: {context}"
)
return runtime_env_agent_pb2.GetOrCreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=context,
)
else:
error_message = result.result
self._logger.info(
"Runtime env already failed. "
f"Env: {serialized_env}, "
f"err: {error_message}"
)
# Recover the reference.
self._reference_table.decrease_reference(
runtime_env, serialized_env, request.source_process
)
return runtime_env_agent_pb2.GetOrCreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=error_message,
)

if SLEEP_FOR_TESTING_S:
self._logger.info(f"Sleeping for {SLEEP_FOR_TESTING_S}s.")
time.sleep(int(SLEEP_FOR_TESTING_S))
context = result.result
return runtime_env_agent_pb2.GetOrCreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=context,
)

runtime_env_config = RuntimeEnvConfig.from_proto(request.runtime_env_config)

# accroding to the document of `asyncio.wait_for`,
# None means disable timeout logic
setup_timeout_seconds = (
None
if runtime_env_config["setup_timeout_seconds"] == -1
else runtime_env_config["setup_timeout_seconds"]
)
setup_timeout_seconds = None

start = time.perf_counter()
(
Expand All @@ -513,51 +391,25 @@ async def _create_runtime_env_with_retry(
runtime_env_config,
)
creation_time_ms = int(round((time.perf_counter() - start) * 1000, 0))
if not successful:
# Recover the reference.
self._reference_table.decrease_reference(
runtime_env, serialized_env, request.source_process
)
# Add the result to env cache.
self._env_cache[serialized_env] = CreatedEnvResult(
successful,
serialized_context if successful else error_message,
creation_time_ms,
)
# Reply the RPC
return runtime_env_agent_pb2.GetOrCreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK
if successful
else agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=serialized_context,
error_message=error_message,
)

async def DeleteRuntimeEnvIfPossible(self, request):
self._logger.info(
f"Got request from {request.source_process} to decrease "
"reference for runtime env: "
f"{request.serialized_runtime_env}."
)

try:
runtime_env = RuntimeEnv.deserialize(request.serialized_runtime_env)
except Exception as e:
self._logger.exception(
"[Decrease] Failed to parse runtime env: "
f"{request.serialized_runtime_env}"
)
return runtime_env_agent_pb2.GetOrCreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message="".join(
traceback.format_exception(type(e), e, e.__traceback__)
),
)

runtime_env = None
serialized_env = request.serialized_runtime_env
async with self._env_locks[serialized_env]:
runtime_env = self._runtime_env_cache[serialized_env]
self._reference_table.decrease_reference(
runtime_env, request.serialized_runtime_env, request.source_process
)

return runtime_env_agent_pb2.DeleteRuntimeEnvIfPossibleReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK
)
Expand Down
10 changes: 0 additions & 10 deletions python/ray/_private/runtime_env/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,23 +242,13 @@ async def create_for_plugin_if_needed(
uris = plugin.get_uris(runtime_env)

if not uris:
logger.debug(
f"No URIs for runtime env plugin {plugin.name}; "
"create always without checking the cache."
)
await plugin.create(None, runtime_env, context, logger=logger)

for uri in uris:
if uri not in uri_cache:
logger.debug(f"Cache miss for URI {uri}.")
size_bytes = await plugin.create(uri, runtime_env, context, logger=logger)
uri_cache.add(uri, size_bytes, logger=logger)
else:
logger.info(
f"Runtime env {plugin.name} {uri} is already installed "
"and will be reused. Search "
"all runtime_env_setup-*.log to find the corresponding setup log."
)
uri_cache.mark_used(uri, logger=logger)

plugin.modify_context(uris, runtime_env, context, logger)

0 comments on commit 1b8f3b1

Please sign in to comment.