From fe21f7b4def305816c5e1dfe839080e8a7a2fa08 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Fri, 1 Nov 2024 15:30:58 -0700 Subject: [PATCH] Add per-operation timing to segment_current_trips using ect.Timer - Wrapped each significant operation within the `segment_current_trips` function with `ect.Timer` context managers. - Named each timer using the pattern `ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/operation"` for consistent identification. - After each timed block, recorded the elapsed time by calling `esds.store_pipeline_time` with the appropriate parameters. - Ensured that only timing-related code was added without altering existing logic, error handling, or formatting. This enhancement enables granular performance monitoring of the trip segmentation process, allowing for better identification of potential bottlenecks and optimization opportunities. Added more in depth timings for create_places_and_trips and segment_current_trips --- .../intake/segmentation/trip_segmentation.py | 375 +++++++++++++----- 1 file changed, 281 insertions(+), 94 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index d6828af77..e233183a8 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -7,6 +7,7 @@ from builtins import * from builtins import object import logging +import time import emission.storage.timeseries.abstract_timeseries as esta import emission.storage.decorations.place_queries as esdp @@ -23,6 +24,9 @@ import emission.analysis.intake.segmentation.restart_checking as eaisr import emission.core.common as ecc +import emission.storage.decorations.stats_queries as esds +import emission.core.timer as ect +import emission.core.wrapper.pipelinestate as ecwp class TripSegmentationMethod(object): def segment_into_trips(self, timeseries, time_query): @@ -47,68 +51,172 @@ def 
segment_into_trips(self, timeseries, time_query): pass def segment_current_trips(user_id): - ts = esta.TimeSeries.get_time_series(user_id) - time_query = epq.get_time_range_for_segmentation(user_id) + with ect.Timer() as timer_get_time_series: + ts = esta.TimeSeries.get_time_series(user_id) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/get_time_series", + time.time(), + timer_get_time_series.elapsed + ) + + with ect.Timer() as timer_get_time_range: + time_query = epq.get_time_range_for_segmentation(user_id) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/get_time_range", + time.time(), + timer_get_time_range.elapsed + ) import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf - dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins - point_threshold = 9, - distance_threshold = 100) # 100 m - - dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 10 mins - point_threshold = 9, - distance_threshold = 50) # 50 m - - filter_methods = {"time": dstfsm, "distance": dsdfsm} - filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"} - # We need to use the appropriate filter based on the incoming data - # So let's read in the location points for the specified query - loc_df = ts.get_data_df("background/filtered_location", time_query) + + + with ect.Timer() as timer_initialize_filters: + dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins + point_threshold=9, + distance_threshold=100) # 100 m + + dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins + point_threshold=9, + distance_threshold=50) # 50 m + esds.store_pipeline_time( + user_id, + 
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/initialize_filters", + time.time(), + timer_initialize_filters.elapsed + ) + + with ect.Timer() as timer_setup_filter_methods: + filter_methods = {"time": dstfsm, "distance": dsdfsm} + filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"} + # We need to use the appropriate filter based on the incoming data + # So let's read in the location points for the specified query + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/setup_filter_methods", + time.time(), + timer_setup_filter_methods.elapsed + ) + + with ect.Timer() as timer_fetch_location_data: + loc_df = ts.get_data_df("background/filtered_location", time_query) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/fetch_location_data", + time.time(), + timer_fetch_location_data.elapsed + ) + if len(loc_df) == 0: # no new segments, no need to keep looking at these again logging.debug("len(loc_df) == 0, early return") epq.mark_segmentation_done(user_id, None) return - out_of_order_points = loc_df[loc_df.ts.diff() < 0] + with ect.Timer() as timer_check_out_of_order: + out_of_order_points = loc_df[loc_df.ts.diff() < 0] + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/check_out_of_order_points", + time.time(), + timer_check_out_of_order.elapsed + ) + if len(out_of_order_points) > 0: - logging.info("Found out of order points!") - logging.info("%s" % out_of_order_points) - # drop from the table - loc_df = loc_df.drop(out_of_order_points.index.tolist()) - loc_df.reset_index(inplace=True) - # invalidate in the database. 
- out_of_order_id_list = out_of_order_points["_id"].tolist() - logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) - for ooid in out_of_order_id_list: - ts.invalidate_raw_entry(ooid) - - filters_in_df = loc_df["filter"].dropna().unique() - logging.debug("Filters in the dataframe = %s" % filters_in_df) + with ect.Timer() as timer_handle_out_of_order: + logging.info("Found out of order points!") + logging.info("%s" % out_of_order_points) + # drop from the table + loc_df = loc_df.drop(out_of_order_points.index.tolist()) + loc_df.reset_index(inplace=True) + # invalidate in the database. + out_of_order_id_list = out_of_order_points["_id"].tolist() + logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) + for ooid in out_of_order_id_list: + ts.invalidate_raw_entry(ooid) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/handle_out_of_order_points", + time.time(), + timer_handle_out_of_order.elapsed + ) + + with ect.Timer() as timer_identify_filters: + filters_in_df = loc_df["filter"].dropna().unique() + logging.debug("Filters in the dataframe = %s" % filters_in_df) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/identify_active_filters", + time.time(), + timer_identify_filters.elapsed + ) + if len(filters_in_df) == 1: - # Common case - let's make it easy - - segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, - time_query) + with ect.Timer() as timer_segment_single_filter: + segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, time_query) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/segment_single_filter", + time.time(), + timer_segment_single_filter.elapsed + ) else: - segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, - filters_in_df, - filter_methods) - # Create and store trips and 
places based on the segmentation points - if segmentation_points is None: - epq.mark_segmentation_failed(user_id) - elif len(segmentation_points) == 0: - # no new segments, no need to keep looking at these again - logging.debug("len(segmentation_points) == 0, early return") - epq.mark_segmentation_done(user_id, None) - else: - try: - create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) - epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) - except: - logging.exception("Trip generation failed for user %s" % user_id) + with ect.Timer() as timer_segment_combined_filters: + segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, + filters_in_df, + filter_methods) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/segment_combined_filters", + time.time(), + timer_segment_combined_filters.elapsed + ) + + with ect.Timer() as timer_process_segmentation: + if segmentation_points is None: epq.mark_segmentation_failed(user_id) + elif len(segmentation_points) == 0: + # no new segments, no need to keep looking at these again + logging.debug("len(segmentation_points) == 0, early return") + epq.mark_segmentation_done(user_id, None) + else: + try: + with ect.Timer() as timer_create_places_trips: + create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/create_places_and_trips", + time.time(), + timer_create_places_trips.elapsed + ) + + with ect.Timer() as timer_mark_done: + epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/mark_segmentation_done", + time.time(), + timer_mark_done.elapsed + ) + except: + with ect.Timer() as timer_handle_failure: + 
logging.exception("Trip generation failed for user %s" % user_id) + epq.mark_segmentation_failed(user_id) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/handle_segmentation_failure", + time.time(), + timer_handle_failure.elapsed + ) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/process_segmentation_points", + time.time(), + timer_process_segmentation.elapsed + ) + def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods): """ @@ -181,13 +289,40 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na # description of dealing with gaps in tracking can be found in the wiki. # Let us first deal with the easy case. # restart_events_df = get_restart_events(ts, time_query) - ts = esta.TimeSeries.get_time_series(user_id) - last_place_entry = esdp.get_last_place_entry(esda.RAW_PLACE_KEY, user_id) + with ect.Timer() as timer_retrieve_ts: + ts = esta.TimeSeries.get_time_series(user_id) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/retrieve_time_series", + time.time(), + timer_retrieve_ts.elapsed + ) + + with ect.Timer() as timer_get_last_place: + last_place_entry = esdp.get_last_place_entry(esda.RAW_PLACE_KEY, user_id) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/get_last_place_entry", + time.time(), + timer_get_last_place.elapsed + ) + if last_place_entry is None: - last_place = start_new_chain(user_id) - last_place.source = segmentation_method_name - last_place_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_place", last_place, create_id = True) + with ect.Timer() as timer_start_new_chain: + last_place = start_new_chain(user_id) + last_place.source = segmentation_method_name + last_place_entry = ecwe.Entry.create_entry( + user_id, + "segmentation/raw_place", + last_place, + create_id=True 
+ ) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/start_new_chain", + time.time(), + timer_start_new_chain.elapsed + ) else: last_place = last_place_entry.data @@ -204,45 +339,97 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na logging.debug("start_loc = %s, end_loc = %s" % (start_loc, end_loc)) # Stitch together the last place and the current trip - curr_trip = ecwrt.Rawtrip() - curr_trip.source = segmentation_method_name - curr_trip_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_trip", curr_trip, create_id = True) - - new_place = ecwrp.Rawplace() - new_place.source = segmentation_method_name - new_place_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_place", new_place, create_id = True) - - if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name): - # Fill in the gap in the chain with an untracked period - curr_untracked = ecwut.Untrackedtime() - curr_untracked.source = segmentation_method_name - curr_untracked_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_untracked", curr_untracked, create_id=True) - - restarted_place = ecwrp.Rawplace() - restarted_place.source = segmentation_method_name - restarted_place_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_place", restarted_place, create_id=True) - - untracked_start_loc = ecwe.Entry(ts.get_entry_at_ts("background/filtered_location", - "data.ts", last_place_entry.data.enter_ts)).data - untracked_start_loc["ts"] = untracked_start_loc.ts + epq.END_FUZZ_AVOID_LTE - _link_and_save(ts, last_place_entry, curr_untracked_entry, restarted_place_entry, - untracked_start_loc, start_loc) - logging.debug("Created untracked period %s from %s to %s" % - (curr_untracked_entry.get_id(), curr_untracked_entry.data.start_ts, curr_untracked_entry.data.end_ts)) - logging.debug("Resetting last_place_entry from %s to %s" % - (last_place_entry, restarted_place_entry)) - 
last_place_entry = restarted_place_entry - - _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start_loc, end_loc) - last_place_entry = new_place_entry - - # The last last_place hasn't been stitched together yet, but we - # need to save it so that it can be the last_place for the next run - ts.insert(last_place_entry) + with ect.Timer() as timer_stitch: + curr_trip = ecwrt.Rawtrip() + curr_trip.source = segmentation_method_name + curr_trip_entry = ecwe.Entry.create_entry( + user_id, + "segmentation/raw_trip", + curr_trip, + create_id=True + ) + + new_place = ecwrp.Rawplace() + new_place.source = segmentation_method_name + new_place_entry = ecwe.Entry.create_entry( + user_id, + "segmentation/raw_place", + new_place, + create_id=True + ) + + if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name): + # Fill in the gap in the chain with an untracked period + curr_untracked = ecwut.Untrackedtime() + curr_untracked.source = segmentation_method_name + curr_untracked_entry = ecwe.Entry.create_entry( + user_id, + "segmentation/raw_untracked", + curr_untracked, + create_id=True + ) + + restarted_place = ecwrp.Rawplace() + restarted_place.source = segmentation_method_name + restarted_place_entry = ecwe.Entry.create_entry( + user_id, + "segmentation/raw_place", + restarted_place, + create_id=True + ) + + untracked_start_loc = ecwe.Entry( + ts.get_entry_at_ts( + "background/filtered_location", + "data.ts", + last_place_entry.data.enter_ts + ) + ).data + untracked_start_loc["ts"] = untracked_start_loc.ts + epq.END_FUZZ_AVOID_LTE + _link_and_save( + ts, + last_place_entry, + curr_untracked_entry, + restarted_place_entry, + untracked_start_loc, + start_loc + ) + logging.debug( + "Created untracked period %s from %s to %s" % + (curr_untracked_entry.get_id(), curr_untracked_entry.data.start_ts, curr_untracked_entry.data.end_ts) + ) + logging.debug( + "Resetting last_place_entry from %s to %s" % + (last_place_entry, 
restarted_place_entry) + ) + last_place_entry = restarted_place_entry + + _link_and_save( + ts, + last_place_entry, + curr_trip_entry, + new_place_entry, + start_loc, + end_loc + ) + last_place_entry = new_place_entry + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/stitch_trip_place", + time.time(), + timer_stitch.elapsed + ) + + with ect.Timer() as timer_insert_last_place: + # The last last_place hasn't been stitched together yet, but we + # need to save it so that it can be the last_place for the next run + ts.insert(last_place_entry) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/insert_last_place", + time.time(), + timer_insert_last_place.elapsed + ) def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start_loc, end_loc): stitch_together_start(last_place_entry, curr_trip_entry, start_loc)