diff --git a/pages/data.py b/pages/data.py index 25b7c12..f42926b 100644 --- a/pages/data.py +++ b/pages/data.py @@ -30,6 +30,7 @@ ]), html.Div(id='tabs-content'), dcc.Store(id='selected-tab', data='tab-uuids-datatable'), # Store to hold selected tab + dcc.Interval(id='interval-load-more', interval=20000, n_intervals=0), # default loading at 10s, can be lowered or hightened based on perf (usual process local is 3s) dcc.Store(id='store-uuids', data=[]), # Store to hold the original UUIDs data dcc.Store(id='store-loaded-uuids', data={'data': [], 'loaded': False}), # Store to track loaded data # RadioItems for key list switch, wrapped in a div that can hide/show @@ -173,9 +174,11 @@ def show_keylist_switch(tab): return style + @callback( Output('tabs-content', 'children'), Output('store-loaded-uuids', 'data'), + Output('interval-load-more', 'disabled'), # Disable interval when all data is loaded Input('tabs-datatable', 'value'), Input('store-uuids', 'data'), Input('store-excluded-uuids', 'data'), @@ -185,215 +188,144 @@ def show_keylist_switch(tab): Input('date-picker', 'start_date'), Input('date-picker', 'end_date'), Input('date-picker-timezone', 'value'), - Input('keylist-switch', 'value'), # For trajectories or other dynamic data - State('store-loaded-uuids', 'data'), # State for loaded UUIDs - State('store-loaded-uuids', 'loaded') # State indicating if all UUIDs are loaded + Input('interval-load-more', 'n_intervals'), # Interval to trigger the loading of more data + Input('keylist-switch', 'value'), # Add keylist-switch to trigger data refresh on change + State('store-loaded-uuids', 'data'), # Use State to track already loaded data + State('store-loaded-uuids', 'loaded') # Keep track if we have finished loading all data ) -def render_content( - tab, - store_uuids, - store_excluded_uuids, - store_trips, - store_demographics, - store_trajectories, - start_date, - end_date, - timezone, - key_list, - loaded_uuids_store, - all_data_loaded -): - with ect.Timer(verbose=False) as total_timer: - # Initialize default return values - tabs_content = None - updated_loaded_uuids_store = loaded_uuids_store - - if tab == 'tab-uuids-datatable': - # **Reverted UUIDs Handling: Simple, Non-Batch Loading** - with ect.Timer(verbose=False) as stage_timer: - try: - # Stage 1: Validate store_uuids structure - if not isinstance(store_uuids, dict) or 'data' not in store_uuids: - logging.error(f"Expected store_uuids to be a dict with a 'data' key, but got {type(store_uuids)}") - tabs_content = html.Div([html.P("Data structure error.")]) - return tabs_content, loaded_uuids_store, True - - # Stage 2: Extract and validate UUIDs list - uuids_list = store_uuids['data'] - if not isinstance(uuids_list, list): - logging.error(f"Expected store_uuids['data'] to be a list but got {type(uuids_list)}") - tabs_content = html.Div([html.P("Data structure error.")]) - return tabs_content, loaded_uuids_store, True - - # Stage 3: Process UUIDs data - processed_data = db_utils.add_user_stats(uuids_list) - logging.debug(f"Processed {len(processed_data)} UUIDs.") - - # Stage 4: Check user permissions - has_perm = perm_utils.has_permission('data_uuids') - if not has_perm: - logging.warning("User does not have permission to view UUIDs.") - tabs_content = html.Div([html.P("You do not have permission to view UUIDs.")]) - return tabs_content, loaded_uuids_store, True - - # Stage 5: Get relevant columns and create DataFrame - columns = perm_utils.get_uuids_columns() - df = pd.DataFrame(processed_data) - - if df.empty: - logging.info("No UUID data available to display.") - tabs_content = html.Div([html.P("No UUID data available.")]) - return tabs_content, loaded_uuids_store, True - - # Stage 6: Drop unauthorized columns - df = df.drop(columns=[col for col in df.columns if col not in columns], errors='ignore') - logging.debug(f"Columns after filtering: {df.columns.tolist()}") - - # Stage 7: Populate the DataTable - datatable = populate_datatable(df) - - tabs_content = html.Div([ - datatable, - html.P(f"Total UUIDs: {len(df)}", style={'margin': '15px 5px'}) - ]) - - # Stage 8: Disable the interval as batch loading is not used - - except Exception as e: - logging.exception("An error occurred while processing UUIDs tab.") - tabs_content = html.Div([html.P("An error occurred while loading UUIDs data.")]) - - # Store timing for 'tab-uuids-datatable' - esdsq.store_dashboard_time( - "admin/data/render_content/tab_uuids_datatable", - stage_timer # Pass the Timer object - ) - - elif tab == 'tab-trips-datatable': - # **Handle Trips Tab with Batch Loading (New Implementation)** - with ect.Timer(verbose=False) as stage_timer: - data = store_trips.get("data", []) - columns = perm_utils.get_allowed_trip_columns() - columns.update(col['label'] for col in perm_utils.get_allowed_named_trip_columns()) - columns.update(store_trips.get("userinputcols", [])) - has_perm = perm_utils.has_permission('data_trips') - - df = pd.DataFrame(data) - if df.empty or not has_perm: - tabs_content = None - else: - df = df.drop(columns=[col for col in df.columns if col not in columns], errors='ignore') - df = clean_location_data(df) - - trips_table = populate_datatable(df, 'trips-table') - tabs_content = html.Div([ - html.Button( - 'Display columns with raw units', - id='button-clicked', - n_clicks=0, - style={'marginLeft': '5px'} - ), - trips_table - ]) - # No changes to UUIDs store or interval - updated_loaded_uuids_store = loaded_uuids_store - - # Store timing for 'tab-trips-datatable' - esdsq.store_dashboard_time( - "admin/data/render_content/tab_trips_datatable", - stage_timer # Pass the Timer object - ) - - elif tab == 'tab-demographics-datatable': - # **Handle Demographics Tab** - with ect.Timer(verbose=False) as stage_timer: - data = store_demographics.get("data", {}) - has_perm = perm_utils.has_permission('data_demographics') - - if len(data) == 1: - # Single survey available - single_survey_data = list(data.values())[0] - df = pd.DataFrame(single_survey_data) - columns = list(df.columns) - if not df.empty and has_perm: - tabs_content = populate_datatable(df) - else: - tabs_content = None - elif len(data) > 1: - if not has_perm: - tabs_content = None - else: - tabs_content = html.Div([ - dcc.Tabs(id='subtabs-demographics', value=list(data.keys())[0], children=[ - dcc.Tab(label=key, value=key) for key in data - ]), - html.Div(id='subtabs-demographics-content') - ]) - else: - tabs_content = None - # No changes to UUIDs store or interval - updated_loaded_uuids_store = loaded_uuids_store - - # Store timing for 'tab-demographics-datatable' - esdsq.store_dashboard_time( - "admin/data/render_content/tab_demographics_datatable", - stage_timer # Pass the Timer object +def render_content(tab, store_uuids, store_excluded_uuids, store_trips, store_demographics, store_trajectories, + start_date, end_date, timezone, n_intervals, key_list, loaded_uuids_store, all_data_loaded): + initial_batch_size = 10 # Define the batch size for loading UUIDs + + # Update selected tab + selected_tab = tab + logging.debug(f"Selected tab: {selected_tab}") + # Handle the UUIDs tab without fullscreen loading spinner + if tab == 'tab-uuids-datatable': + # Ensure store_uuids contains the key 'data' which is a list of dictionaries + if not isinstance(store_uuids, dict) or 'data' not in store_uuids: + logging.error(f"Expected store_uuids to be a dict with a 'data' key, but got {type(store_uuids)}") + return html.Div([html.P("Data structure error.")]), loaded_uuids_store, True + + # Extract the list of UUIDs from the dict + uuids_list = store_uuids['data'] + + # Ensure uuids_list is a list for slicing + if not isinstance(uuids_list, list): + logging.error(f"Expected store_uuids['data'] to be a list but got {type(uuids_list)}") + return html.Div([html.P("Data structure error.")]), loaded_uuids_store, True + + # Retrieve already loaded data from the store + loaded_data = loaded_uuids_store.get('data', []) + total_loaded = len(loaded_data) + + # Handle lazy loading + if not loaded_uuids_store.get('loaded', False): + total_to_load = total_loaded + initial_batch_size + total_to_load = min(total_to_load, len(uuids_list)) # Avoid loading more than available + + logging.debug(f"Loading next batch of UUIDs: {total_loaded} to {total_to_load}") + + # Slice the list of UUIDs from the dict + new_data = uuids_list[total_loaded:total_to_load] + + if new_data: + # Process and append the new data to the loaded store + processed_data = db_utils.add_user_stats(new_data, initial_batch_size) + loaded_data.extend(processed_data) + + # Update the store with the new data + loaded_uuids_store['data'] = loaded_data + loaded_uuids_store['loaded'] = len(loaded_data) >= len(uuids_list) # Mark all data as loaded if done + + logging.debug(f"New batch loaded. Total loaded: {len(loaded_data)}") + + # Prepare the data to be displayed + columns = perm_utils.get_uuids_columns() # Get the relevant columns + df = pd.DataFrame(loaded_data) + + if df.empty or not perm_utils.has_permission('data_uuids'): + logging.debug("No data or permission issues.") + return html.Div([html.P("No data available or you don't have permission.")]), loaded_uuids_store, True + + df = df.drop(columns=[col for col in df.columns if col not in columns]) + + logging.debug("Returning appended data to update the UI.") + content = html.Div([ + populate_datatable(df), + html.P( + f"Showing {len(loaded_data)} of {len(uuids_list)} UUIDs." + + (f" Loading 10 more..." if not loaded_uuids_store.get('loaded', False) else ""), + style={'margin': '15px 5px'} ) - - elif tab == 'tab-trajectories-datatable': - # **Handle Trajectories Tab with Batch Loading (New Implementation)** - with ect.Timer(verbose=False) as stage_timer: - start_date_only, end_date_only = iso_to_date_only(start_date, end_date) - - # Fetch new data based on the selected key_list from the keylist-switch - if not store_trajectories or key_list: - store_trajectories = update_store_trajectories( - start_date_only, end_date_only, timezone, store_excluded_uuids, key_list - ) - - data = store_trajectories.get("data", []) - if data: - columns = list(data[0].keys()) - columns = perm_utils.get_trajectories_columns(columns) - has_perm = perm_utils.has_permission('data_trajectories') - - df = pd.DataFrame(data) - if df.empty or not has_perm: - tabs_content = None - else: - df = df.drop(columns=[col for col in df.columns if col not in columns], errors='ignore') - tabs_content = populate_datatable(df) - else: - tabs_content = None - # No changes to UUIDs store or interval - updated_loaded_uuids_store = loaded_uuids_store - - # Store timing for 'tab-trajectories-datatable' - esdsq.store_dashboard_time( - "admin/data/render_content/tab_trajectories_datatable", - stage_timer # Pass the Timer object - ) - - else: - # **Handle Any Other Tabs (if applicable)** - with ect.Timer(verbose=False) as stage_timer: - tabs_content = None - updated_loaded_uuids_store = loaded_uuids_store - # Store timing for 'other_tabs' - esdsq.store_dashboard_time( - "admin/data/render_content/other_tabs", - stage_timer # Pass the Timer object - ) - - logging.info(f"Rendered content for tab: {tab}") - - # Store total timing after all stages - esdsq.store_dashboard_time( - "admin/data/render_content/total_time", - total_timer # Pass the Timer object - ) - logging.info(f"Total time taken to render content for tab '{tab}': {total_timer.elapsed}") - return tabs_content, updated_loaded_uuids_store + ]) + return content, loaded_uuids_store, False if not loaded_uuids_store['loaded'] else True + + # Handle other tabs normally + elif tab == 'tab-trips-datatable': + data = store_trips["data"] + columns = perm_utils.get_allowed_trip_columns() + columns.update(col['label'] for col in perm_utils.get_allowed_named_trip_columns()) + columns.update(store_trips["userinputcols"]) + has_perm = perm_utils.has_permission('data_trips') + + df = pd.DataFrame(data) + if df.empty or not has_perm: + return None, loaded_uuids_store, True + + df = df.drop(columns=[col for col in df.columns if col not in columns]) + df = clean_location_data(df) + + trips_table = populate_datatable(df, 'trips-table') + logging.debug(f"Returning 3 values: {trips_table}, {loaded_uuids_store}, True") + return html.Div([ + html.Button('Display columns with raw units', id='button-clicked', n_clicks=0, style={'marginLeft': '5px'}), + trips_table + ]), loaded_uuids_store, True + + elif tab == 'tab-demographics-datatable': + data = store_demographics["data"] + has_perm = perm_utils.has_permission('data_demographics') + + if len(data) == 1: + data = list(data.values())[0] + columns = list(data[0].keys()) + elif len(data) > 1: + if not has_perm: + return None, loaded_uuids_store, True + return html.Div([ + dcc.Tabs(id='subtabs-demographics', value=list(data.keys())[0], children=[ + dcc.Tab(label=key, value=key) for key in data + ]), + html.Div(id='subtabs-demographics-content') + ]), loaded_uuids_store, True + + elif tab == 'tab-trajectories-datatable': + (start_date, end_date) = iso_to_date_only(start_date, end_date) + + # Fetch new data based on the selected key_list from the keylist-switch + if store_trajectories == {} or key_list: # Ensure data is refreshed when key_list changes + store_trajectories = update_store_trajectories(start_date, end_date, timezone, store_excluded_uuids, key_list) + + data = store_trajectories.get("data", []) + if data: + columns = list(data[0].keys()) + columns = perm_utils.get_trajectories_columns(columns) + has_perm = perm_utils.has_permission('data_trajectories') + + df = pd.DataFrame(data) + if df.empty or not has_perm: + # If no permission or data, disable interval and return empty content + return None, loaded_uuids_store, True + + # Filter the columns based on permissions + df = df.drop(columns=[col for col in df.columns if col not in columns]) + + # Return the populated DataTable + return populate_datatable(df), loaded_uuids_store, True + + # Default case: if no data is loaded or the tab is not handled + return None, loaded_uuids_store, True @callback( diff --git a/utils/db_utils.py b/utils/db_utils.py index ad67c7a..69f2f8f 100644 --- a/utils/db_utils.py +++ b/utils/db_utils.py @@ -472,98 +472,88 @@ def query_trajectories(start_date: str, end_date: str, tz: str, key_list: list[s ) return df -def add_user_stats(user_data): - """ - Adds statistical data to each user in the provided user_data list. +# unchanged for now -- since reverting +def add_user_stats(user_data, batch_size=5): + start_time = time.time() + time_format = 'YYYY-MM-DD HH:mm:ss' - For each user, it calculates total trips, labeled trips, and retrieves profile information. - Additionally, it records the timestamps of the first trip, last trip, and the last API call. + def process_user(user): + user_uuid = UUID(user['user_id']) + + # Fetch aggregated data for all users once and cache it + ts_aggregate = esta.TimeSeries.get_aggregate_time_series() - :param user_data (list[dict]): List of user dictionaries to be enriched with stats. - :return: The list of user dictionaries with added statistical data. - """ - with ect.Timer(verbose=False) as total_timer: - logging.info("Adding user stats") + # Fetch data for the user, cached for repeated queries + profile_data = edb.get_profile_db().find_one({'user_id': user_uuid}) - for user in user_data: - with ect.Timer(verbose=False) as stage_timer: - try: - logging.debug(f"Processing user {user['user_id']}") - user_uuid = UUID(user['user_id']) - - # Stage 1: Calculate Total Trips - total_trips = esta.TimeSeries.get_aggregate_time_series().find_entries_count( - key_list=["analysis/confirmed_trip"], - extra_query_list=[{'user_id': user_uuid}] - ) - user['total_trips'] = total_trips + total_trips = ts_aggregate.find_entries_count( + key_list=["analysis/confirmed_trip"], + extra_query_list=[{'user_id': user_uuid}] + ) + labeled_trips = ts_aggregate.find_entries_count( + key_list=["analysis/confirmed_trip"], + extra_query_list=[{'user_id': user_uuid}, {'data.user_input': {'$ne': {}}}] + ) + + user['total_trips'] = total_trips + user['labeled_trips'] = labeled_trips + + if profile_data: + user['platform'] = profile_data.get('curr_platform') + user['manufacturer'] = profile_data.get('manufacturer') + user['app_version'] = profile_data.get('client_app_version') + user['os_version'] = profile_data.get('client_os_version') + user['phone_lang'] = profile_data.get('phone_lang') + + if total_trips > 0: + ts = esta.TimeSeries.get_time_series(user_uuid) + first_trip_ts = ts.get_first_value_for_field( + key='analysis/confirmed_trip', + field='data.end_ts', + sort_order=pymongo.ASCENDING + ) + if first_trip_ts != -1: + user['first_trip'] = arrow.get(first_trip_ts).format(time_format) - # Stage 2: Calculate Labeled Trips - labeled_trips = esta.TimeSeries.get_aggregate_time_series().find_entries_count( - key_list=["analysis/confirmed_trip"], - extra_query_list=[{'user_id': user_uuid}, {'data.user_input': {'$ne': {}}}] - ) - user['labeled_trips'] = labeled_trips - - # Stage 3: Retrieve Profile Data - profile_data = edb.get_profile_db().find_one({'user_id': user_uuid}) - user['platform'] = profile_data.get('curr_platform') - user['manufacturer'] = profile_data.get('manufacturer') - user['app_version'] = profile_data.get('client_app_version') - user['os_version'] = profile_data.get('client_os_version') - user['phone_lang'] = profile_data.get('phone_lang') - - if total_trips > 0: - time_format = 'YYYY-MM-DD HH:mm:ss' - ts = esta.TimeSeries.get_time_series(user_uuid) - - # Stage 4: Get First Trip Timestamp - start_ts = ts.get_first_value_for_field( - key='analysis/confirmed_trip', - field='data.end_ts', - sort_order=pymongo.ASCENDING - ) - if start_ts != -1: - user['first_trip'] = arrow.get(start_ts).format(time_format) - - # Stage 5: Get Last Trip Timestamp - end_ts = ts.get_first_value_for_field( - key='analysis/confirmed_trip', - field='data.end_ts', - sort_order=pymongo.DESCENDING - ) - if end_ts != -1: - user['last_trip'] = arrow.get(end_ts).format(time_format) - - # Stage 6: Get Last API Call Timestamp - last_call = ts.get_first_value_for_field( - key='stats/server_api_time', - field='data.ts', - sort_order=pymongo.DESCENDING - ) - if last_call != -1: - user['last_call'] = arrow.get(last_call).format(time_format) - - except Exception as e: - logging.exception(f"An error occurred while processing user {user.get('user_id', 'Unknown')}: {e}") - finally: - # Store timing for processing each user - # I'm hesistant to store this because it will be a lot of data - # esdsq.store_dashboard_time( - # f"admin/db_utils/add_user_stats/process_user_{user['user_id']}", - # stage_timer # Pass the Timer object - # ) - pass + last_trip_ts = ts.get_first_value_for_field( + key='analysis/confirmed_trip', + field='data.end_ts', + sort_order=pymongo.DESCENDING + ) + if last_trip_ts != -1: + user['last_trip'] = arrow.get(last_trip_ts).format(time_format) + + last_call_ts = ts.get_first_value_for_field( + key='stats/server_api_time', + field='data.ts', + sort_order=pymongo.DESCENDING + ) + if last_call_ts != -1: + user['last_call'] = arrow.get(last_call_ts).format(time_format) - logging.info("Finished adding user stats") - - # Store total timing for the entire function - esdsq.store_dashboard_time( - "admin/db_utils/add_user_stats/total_time", - total_timer # Pass the Timer object - ) - - return user_data + return user + + def batch_process(users_batch): + with ThreadPoolExecutor() as executor: # Adjust max_workers based on CPU cores + futures = [executor.submit(process_user, user) for user in users_batch] + processed_batch = [future.result() for future in as_completed(futures)] + return processed_batch + + total_users = len(user_data) + processed_data = [] + + for i in range(0, total_users, batch_size): + batch = user_data[i:i + batch_size] + processed_batch = batch_process(batch) + processed_data.extend(processed_batch) + + logging.debug(f'Processed {len(processed_data)} users out of {total_users}') + + end_time = time.time() # End timing + execution_time = end_time - start_time + logging.debug(f'Time taken to add_user_stats: {execution_time:.4f} seconds') + + return processed_data def query_segments_crossing_endpoints( poly_region_start,