diff --git a/pages/data.py b/pages/data.py index f42926b..25b7c12 100644 --- a/pages/data.py +++ b/pages/data.py @@ -30,7 +30,6 @@ ]), html.Div(id='tabs-content'), dcc.Store(id='selected-tab', data='tab-uuids-datatable'), # Store to hold selected tab - dcc.Interval(id='interval-load-more', interval=20000, n_intervals=0), # default loading at 10s, can be lowered or hightened based on perf (usual process local is 3s) dcc.Store(id='store-uuids', data=[]), # Store to hold the original UUIDs data dcc.Store(id='store-loaded-uuids', data={'data': [], 'loaded': False}), # Store to track loaded data # RadioItems for key list switch, wrapped in a div that can hide/show @@ -174,11 +173,9 @@ def show_keylist_switch(tab): return style - @callback( Output('tabs-content', 'children'), Output('store-loaded-uuids', 'data'), - Output('interval-load-more', 'disabled'), # Disable interval when all data is loaded Input('tabs-datatable', 'value'), Input('store-uuids', 'data'), Input('store-excluded-uuids', 'data'), @@ -188,144 +185,215 @@ def show_keylist_switch(tab): Input('date-picker', 'start_date'), Input('date-picker', 'end_date'), Input('date-picker-timezone', 'value'), - Input('interval-load-more', 'n_intervals'), # Interval to trigger the loading of more data - Input('keylist-switch', 'value'), # Add keylist-switch to trigger data refresh on change - State('store-loaded-uuids', 'data'), # Use State to track already loaded data - State('store-loaded-uuids', 'loaded') # Keep track if we have finished loading all data + Input('keylist-switch', 'value'), # For trajectories or other dynamic data + State('store-loaded-uuids', 'data'), # State for loaded UUIDs + State('store-loaded-uuids', 'loaded') # State indicating if all UUIDs are loaded ) -def render_content(tab, store_uuids, store_excluded_uuids, store_trips, store_demographics, store_trajectories, - start_date, end_date, timezone, n_intervals, key_list, loaded_uuids_store, all_data_loaded): - initial_batch_size = 10 # Define the 
batch size for loading UUIDs - - # Update selected tab - selected_tab = tab - logging.debug(f"Selected tab: {selected_tab}") - # Handle the UUIDs tab without fullscreen loading spinner - if tab == 'tab-uuids-datatable': - # Ensure store_uuids contains the key 'data' which is a list of dictionaries - if not isinstance(store_uuids, dict) or 'data' not in store_uuids: - logging.error(f"Expected store_uuids to be a dict with a 'data' key, but got {type(store_uuids)}") - return html.Div([html.P("Data structure error.")]), loaded_uuids_store, True - - # Extract the list of UUIDs from the dict - uuids_list = store_uuids['data'] - - # Ensure uuids_list is a list for slicing - if not isinstance(uuids_list, list): - logging.error(f"Expected store_uuids['data'] to be a list but got {type(uuids_list)}") - return html.Div([html.P("Data structure error.")]), loaded_uuids_store, True - - # Retrieve already loaded data from the store - loaded_data = loaded_uuids_store.get('data', []) - total_loaded = len(loaded_data) - - # Handle lazy loading - if not loaded_uuids_store.get('loaded', False): - total_to_load = total_loaded + initial_batch_size - total_to_load = min(total_to_load, len(uuids_list)) # Avoid loading more than available - - logging.debug(f"Loading next batch of UUIDs: {total_loaded} to {total_to_load}") - - # Slice the list of UUIDs from the dict - new_data = uuids_list[total_loaded:total_to_load] - - if new_data: - # Process and append the new data to the loaded store - processed_data = db_utils.add_user_stats(new_data, initial_batch_size) - loaded_data.extend(processed_data) - - # Update the store with the new data - loaded_uuids_store['data'] = loaded_data - loaded_uuids_store['loaded'] = len(loaded_data) >= len(uuids_list) # Mark all data as loaded if done - - logging.debug(f"New batch loaded. 
def render_content(
    tab,
    store_uuids,
    store_excluded_uuids,
    store_trips,
    store_demographics,
    store_trajectories,
    start_date,
    end_date,
    timezone,
    key_list,
    loaded_uuids_store,
    all_data_loaded
):
    """
    Render the content for the currently selected data tab.

    Builds the DataTable (or informational message) for the UUIDs, trips,
    demographics, or trajectories tab, timing each stage with ect.Timer and
    recording the timings via esdsq.store_dashboard_time.

    NOTE: the callback now declares exactly TWO Outputs
    ('tabs-content.children' and 'store-loaded-uuids.data'), so every code
    path must return exactly two values. The previous early returns of three
    values (a leftover of the removed 'interval-load-more' Output) would make
    Dash raise "callback returned more than expected number of outputs".

    :param tab: value of the 'tabs-datatable' tab selector
    :param store_uuids: dict with a 'data' key holding the list of UUID rows
    :param store_excluded_uuids: excluded-UUIDs store payload
    :param store_trips: trips store payload ('data' + 'userinputcols')
    :param store_demographics: demographics store payload ('data': survey dict)
    :param store_trajectories: trajectories store payload (may be refreshed here)
    :param start_date: ISO start date from the date picker
    :param end_date: ISO end date from the date picker
    :param timezone: timezone value from the date picker
    :param key_list: keylist-switch value (triggers trajectory refresh)
    :param loaded_uuids_store: State: previously loaded UUIDs store
    :param all_data_loaded: State: whether all UUIDs were loaded (unused here)
    :return: (tabs_content, updated_loaded_uuids_store)
    """
    with ect.Timer(verbose=False) as total_timer:
        # Defaults; every branch below only overwrites these, so the single
        # return at the bottom always matches the two declared Outputs and
        # the stage/total timers are always stored (no early returns).
        tabs_content = None
        updated_loaded_uuids_store = loaded_uuids_store

        if tab == 'tab-uuids-datatable':
            # Simple, non-batch UUID loading (batch/interval logic reverted).
            with ect.Timer(verbose=False) as stage_timer:
                try:
                    if not isinstance(store_uuids, dict) or 'data' not in store_uuids:
                        logging.error(f"Expected store_uuids to be a dict with a 'data' key, but got {type(store_uuids)}")
                        tabs_content = html.Div([html.P("Data structure error.")])
                    elif not isinstance(store_uuids['data'], list):
                        logging.error(f"Expected store_uuids['data'] to be a list but got {type(store_uuids['data'])}")
                        tabs_content = html.Div([html.P("Data structure error.")])
                    elif not perm_utils.has_permission('data_uuids'):
                        # Check permission BEFORE the expensive add_user_stats call.
                        logging.warning("User does not have permission to view UUIDs.")
                        tabs_content = html.Div([html.P("You do not have permission to view UUIDs.")])
                    else:
                        uuids_list = store_uuids['data']
                        # Enrich every UUID row with trip/profile statistics.
                        processed_data = db_utils.add_user_stats(uuids_list)
                        logging.debug(f"Processed {len(processed_data)} UUIDs.")

                        columns = perm_utils.get_uuids_columns()
                        df = pd.DataFrame(processed_data)

                        if df.empty:
                            logging.info("No UUID data available to display.")
                            tabs_content = html.Div([html.P("No UUID data available.")])
                        else:
                            # Keep only columns the user is allowed to see.
                            df = df.drop(columns=[col for col in df.columns if col not in columns], errors='ignore')
                            logging.debug(f"Columns after filtering: {df.columns.tolist()}")
                            tabs_content = html.Div([
                                populate_datatable(df),
                                html.P(f"Total UUIDs: {len(df)}", style={'margin': '15px 5px'})
                            ])
                except Exception:
                    logging.exception("An error occurred while processing UUIDs tab.")
                    tabs_content = html.Div([html.P("An error occurred while loading UUIDs data.")])

            # Store timing for 'tab-uuids-datatable'
            esdsq.store_dashboard_time(
                "admin/data/render_content/tab_uuids_datatable",
                stage_timer
            )

        elif tab == 'tab-trips-datatable':
            with ect.Timer(verbose=False) as stage_timer:
                data = store_trips.get("data", [])
                columns = perm_utils.get_allowed_trip_columns()
                columns.update(col['label'] for col in perm_utils.get_allowed_named_trip_columns())
                columns.update(store_trips.get("userinputcols", []))
                has_perm = perm_utils.has_permission('data_trips')

                df = pd.DataFrame(data)
                if df.empty or not has_perm:
                    tabs_content = None
                else:
                    df = df.drop(columns=[col for col in df.columns if col not in columns], errors='ignore')
                    df = clean_location_data(df)

                    trips_table = populate_datatable(df, 'trips-table')
                    tabs_content = html.Div([
                        html.Button(
                            'Display columns with raw units',
                            id='button-clicked',
                            n_clicks=0,
                            style={'marginLeft': '5px'}
                        ),
                        trips_table
                    ])

            # Store timing for 'tab-trips-datatable'
            esdsq.store_dashboard_time(
                "admin/data/render_content/tab_trips_datatable",
                stage_timer
            )

        elif tab == 'tab-demographics-datatable':
            with ect.Timer(verbose=False) as stage_timer:
                data = store_demographics.get("data", {})
                has_perm = perm_utils.has_permission('data_demographics')

                if len(data) == 1:
                    # Single survey available: show it directly.
                    single_survey_data = list(data.values())[0]
                    df = pd.DataFrame(single_survey_data)
                    if not df.empty and has_perm:
                        tabs_content = populate_datatable(df)
                    else:
                        tabs_content = None
                elif len(data) > 1:
                    # Multiple surveys: one sub-tab per survey.
                    if not has_perm:
                        tabs_content = None
                    else:
                        tabs_content = html.Div([
                            dcc.Tabs(id='subtabs-demographics', value=list(data.keys())[0], children=[
                                dcc.Tab(label=key, value=key) for key in data
                            ]),
                            html.Div(id='subtabs-demographics-content')
                        ])
                else:
                    tabs_content = None

            # Store timing for 'tab-demographics-datatable'
            esdsq.store_dashboard_time(
                "admin/data/render_content/tab_demographics_datatable",
                stage_timer
            )

        elif tab == 'tab-trajectories-datatable':
            with ect.Timer(verbose=False) as stage_timer:
                start_date_only, end_date_only = iso_to_date_only(start_date, end_date)

                # Refresh trajectories when none are cached or key_list changed.
                if not store_trajectories or key_list:
                    store_trajectories = update_store_trajectories(
                        start_date_only, end_date_only, timezone, store_excluded_uuids, key_list
                    )

                data = store_trajectories.get("data", [])
                if data:
                    columns = perm_utils.get_trajectories_columns(list(data[0].keys()))
                    has_perm = perm_utils.has_permission('data_trajectories')

                    df = pd.DataFrame(data)
                    if df.empty or not has_perm:
                        tabs_content = None
                    else:
                        df = df.drop(columns=[col for col in df.columns if col not in columns], errors='ignore')
                        tabs_content = populate_datatable(df)
                else:
                    tabs_content = None

            # Store timing for 'tab-trajectories-datatable'
            esdsq.store_dashboard_time(
                "admin/data/render_content/tab_trajectories_datatable",
                stage_timer
            )

        else:
            # Any other / unknown tab: render nothing, but still record timing.
            with ect.Timer(verbose=False) as stage_timer:
                tabs_content = None
            esdsq.store_dashboard_time(
                "admin/data/render_content/other_tabs",
                stage_timer
            )

        logging.info(f"Rendered content for tab: {tab}")

    # Store total timing after all stages (timer is closed here).
    esdsq.store_dashboard_time(
        "admin/data/render_content/total_time",
        total_timer
    )
    logging.info(f"Total time taken to render content for tab '{tab}': {total_timer.elapsed}")
    return tabs_content, updated_loaded_uuids_store
def add_user_stats(user_data):
    """
    Adds statistical data to each user in the provided user_data list.

    For each user, it calculates total trips, labeled trips, and retrieves
    profile information. Additionally, it records the timestamps of the first
    trip, last trip, and the last API call. Users are mutated in place; the
    same list is returned for convenience. A failure for one user is logged
    and does not abort processing of the remaining users.

    :param user_data (list[dict]): List of user dictionaries to be enriched with stats.
    :return: The list of user dictionaries with added statistical data.
    """
    with ect.Timer(verbose=False) as total_timer:
        logging.info("Adding user stats")
        time_format = 'YYYY-MM-DD HH:mm:ss'  # loop-invariant; hoisted

        for user in user_data:
            try:
                logging.debug(f"Processing user {user['user_id']}")
                user_uuid = UUID(user['user_id'])

                # Fetch the aggregate time series once and reuse it for both
                # counts (previously fetched twice per user).
                ts_aggregate = esta.TimeSeries.get_aggregate_time_series()

                # Total confirmed trips for this user
                total_trips = ts_aggregate.find_entries_count(
                    key_list=["analysis/confirmed_trip"],
                    extra_query_list=[{'user_id': user_uuid}]
                )
                user['total_trips'] = total_trips

                # Trips that have at least one user label attached
                labeled_trips = ts_aggregate.find_entries_count(
                    key_list=["analysis/confirmed_trip"],
                    extra_query_list=[{'user_id': user_uuid}, {'data.user_input': {'$ne': {}}}]
                )
                user['labeled_trips'] = labeled_trips

                # find_one returns None when the user has no profile document;
                # guard before calling .get() to avoid an AttributeError that
                # the broad except below would mis-report as a processing error.
                profile_data = edb.get_profile_db().find_one({'user_id': user_uuid})
                if profile_data:
                    user['platform'] = profile_data.get('curr_platform')
                    user['manufacturer'] = profile_data.get('manufacturer')
                    user['app_version'] = profile_data.get('client_app_version')
                    user['os_version'] = profile_data.get('client_os_version')
                    user['phone_lang'] = profile_data.get('phone_lang')

                if total_trips > 0:
                    ts = esta.TimeSeries.get_time_series(user_uuid)

                    # First trip timestamp
                    start_ts = ts.get_first_value_for_field(
                        key='analysis/confirmed_trip',
                        field='data.end_ts',
                        sort_order=pymongo.ASCENDING
                    )
                    if start_ts != -1:
                        user['first_trip'] = arrow.get(start_ts).format(time_format)

                    # Last trip timestamp
                    end_ts = ts.get_first_value_for_field(
                        key='analysis/confirmed_trip',
                        field='data.end_ts',
                        sort_order=pymongo.DESCENDING
                    )
                    if end_ts != -1:
                        user['last_trip'] = arrow.get(end_ts).format(time_format)

                    # Last API call timestamp
                    last_call = ts.get_first_value_for_field(
                        key='stats/server_api_time',
                        field='data.ts',
                        sort_order=pymongo.DESCENDING
                    )
                    if last_call != -1:
                        user['last_call'] = arrow.get(last_call).format(time_format)

            except Exception as e:
                logging.exception(f"An error occurred while processing user {user.get('user_id', 'Unknown')}: {e}")
            # NOTE: per-user timing was intentionally dropped — a stage timer
            # per user would produce a lot of data, and its store call was
            # already commented out.

        logging.info("Finished adding user stats")

    # Store total timing for the entire function (timer is closed here).
    esdsq.store_dashboard_time(
        "admin/db_utils/add_user_stats/total_time",
        total_timer
    )

    return user_data