diff --git a/sepal_ui/scripts/gee.py b/sepal_ui/scripts/gee.py
index b3807e2f..4c7adb63 100644
--- a/sepal_ui/scripts/gee.py
+++ b/sepal_ui/scripts/gee.py
@@ -1,15 +1,24 @@
 """All the heleper methods to interface Google Earthengine with sepal-ui."""
 
+import asyncio
+import os
 import time
+from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 from typing import List, Union
 
 import ee
 import ipyvuetify as v
+import nest_asyncio
+import psutil
 
 from sepal_ui.message import ms
 from sepal_ui.scripts import decorator as sd
 
+# Jupyter already runs an event loop, so a plain ``asyncio.run`` call raises
+# "RuntimeError: This event loop is already running"; nest_asyncio allows the nested call.
+nest_asyncio.apply()
+
 
 @sd.need_ee
 def wait_for_completion(task_descripsion: str, widget_alert: v.Alert = None) -> str:
@@ -83,8 +92,95 @@ def is_running(task_descripsion: str) -> ee.batch.Task:
     return current_task
 
 
+async def list_assets_concurrent(folders: list, semaphore: asyncio.Semaphore) -> list:
+    """List the content of several GEE folders concurrently.
+
+    Each blocking ``ee.data.listAssets`` call is run in a worker thread. The semaphore is
+    acquired once per request so that it bounds the number of requests in flight.
+
+    Args:
+        folders: list of folders to list assets from
+        semaphore: the semaphore limiting the number of simultaneous requests
+
+    Returns:
+        the ``ee.data.listAssets`` response of each folder, in the same order as ``folders``
+    """
+    loop = asyncio.get_running_loop()
+
+    async def _list_folder(executor: ThreadPoolExecutor, folder: str) -> dict:
+        async with semaphore:
+            return await loop.run_in_executor(executor, ee.data.listAssets, {"parent": folder})
+
+    with ThreadPoolExecutor() as executor:
+        return await asyncio.gather(*(_list_folder(executor, folder) for folder in folders))
+
+
+async def get_assets_async_concurrent(folder: str) -> List[dict]:
+    """Get all the assets from the parameter folder. every nested asset will be displayed.
+
+    The tree is explored level by level: all the folders found at one depth are listed
+    concurrently before moving on to the next depth.
+
+    Args:
+        folder: the initial GEE folder
+
+    Returns:
+        the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'
+
+    """
+    # os.cpu_count() returns None when the number of CPUs cannot be determined
+    cpu_count = os.cpu_count() or 1
+    available_memory = psutil.virtual_memory().available
+
+    # budget about 50 MB per task and always allow at least one: a semaphore
+    # initialised to 0 would make every request wait forever
+    memory_slots = available_memory // (50 * 1024 * 1024)
+    max_concurrent_tasks = max(1, min(30, cpu_count, memory_slots))
+    semaphore = asyncio.Semaphore(max_concurrent_tasks)
+
+    asset_list = []
+    current_folders = [folder]
+    while current_folders:
+        assets_groups = await list_assets_concurrent(current_folders, semaphore)
+
+        # the folders discovered at this depth are the ones to list at the next iteration
+        current_folders = []
+        for assets in assets_groups:
+            for asset in assets.get("assets", []):
+                asset_list.append({"type": asset["type"], "name": asset["name"], "id": asset["id"]})
+                if asset["type"] == "FOLDER":
+                    current_folders.append(asset["name"])
+
+    return asset_list
+
+
+@sd.need_ee
+def get_assets(folder: Union[str, Path] = "", async_: bool = True) -> List[dict]:
+    """Get all the assets from the parameter folder. every nested asset will be displayed.
+
+    Args:
+        folder: the initial GEE folder. Default to the assets folder of the current user project
+        async_: whether or not the folders should be listed concurrently. When the
+            concurrent listing fails, the synchronous method is used instead
+
+    Returns:
+        the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'
+
+    """
+    folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/"
+
+    if async_:
+        try:
+            return asyncio.run(get_assets_async_concurrent(folder))
+        except Exception as e:
+            # deliberate best effort: report the failure and fall back to the synchronous method
+            print(f"Error occurred in get_assets_async_concurrent: {e}")
+
+    return get_assets_sync(folder)
+
+
 @sd.need_ee
-def get_assets(folder: Union[str, Path] = "") -> List[dict]:
+def get_assets_sync(folder: Union[str, Path] = "") -> List[dict]:
     """Get all the assets from the parameter folder. every nested asset will be displayed.
 
     Args:
@@ -95,7 +191,6 @@ def get_assets(folder: Union[str, Path] = "") -> List[dict]:
     """
     # set the folder and init the list
     asset_list = []
-    folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/"
 
     def _recursive_get(folder, asset_list):