diff --git a/activitysim/abm/models/util/cdap.py b/activitysim/abm/models/util/cdap.py index ac614a768..6ed14d492 100644 --- a/activitysim/abm/models/util/cdap.py +++ b/activitysim/abm/models/util/cdap.py @@ -313,6 +313,20 @@ def get_cached_spec(state: workflow.State, hhsize): return None +def get_cached_joint_spec(hhsize): + + spec_name = cached_joint_spec_name(hhsize) + + spec = inject.get_injectable(spec_name, None) + if spec is not None: + logger.debug( + "build_cdap_joint_spec returning cached injectable spec %s", spec_name + ) + return spec + + return None + + def get_cached_joint_spec(state: workflow.State, hhsize): spec_name = cached_joint_spec_name(hhsize) @@ -345,6 +359,12 @@ def cache_joint_spec(hhsize, spec): inject.add_injectable(spec_name, spec) +def cache_joint_spec(hhsize, spec): + spec_name = cached_joint_spec_name(hhsize) + # cache as injectable + inject.add_injectable(spec_name, spec) + + def build_cdap_spec( state: workflow.State, interaction_coefficients, @@ -550,6 +570,207 @@ def build_cdap_spec( return spec +def build_cdap_joint_spec( + joint_tour_coefficients, hhsize, trace_spec=False, trace_label=None, cache=True +): + """ + Build a spec file for computing joint tour utilities of alternative household member for households of specified size. + We generate this spec automatically from a table of rules and coefficients because the + interaction rules are fairly simple and can be expressed compactly whereas + there is a lot of redundancy between the spec files for different household sizes, as well as + in the vectorized expression of the interaction alternatives within the spec file itself + joint_tour_coefficients has five columns: + label + label of the expression + description + description of the expression + dependency + if the expression is dependent on alternative, and which alternative is it dependent on + (e.g. 
M_px, N_px, H_px) + expression + expression of the utility term + coefficient + The coefficient to apply for the alternative + The generated spec will have the eval expression in the index, and a utility column for each + alternative (e.g. ['HH', 'HM', 'HN', 'MH', 'MM', 'MN', 'NH', 'NM', 'NN', 'MMJ', 'MNJ', 'NMJ', 'NNJ'] for hhsize 2 with joint alts) + Parameters + ---------- + joint_tour_coefficients : pandas.DataFrame + Rules and coefficients for generating joint tour specs for different household sizes + hhsize : int + household size for which the spec should be built. + Returns + ------- + spec: pandas.DataFrame + """ + + t0 = tracing.print_elapsed_time() + + # cdap joint spec is same for all households of MAX_HHSIZE and greater + hhsize = min(hhsize, MAX_HHSIZE) + + if cache: + spec = get_cached_joint_spec(hhsize) + if spec is not None: + return spec + + expression_name = "Expression" + + # generate a list of activity pattern alternatives for this hhsize + # e.g. ['HH', 'HM', 'HN', 'MH', 'MM', 'MN', 'NH', 'NM', 'NN'] for hhsize=2 + alternatives = ["".join(tup) for tup in itertools.product("HMN", repeat=hhsize)] + + joint_alternatives = [ + "".join(tup) + "J" + for tup in itertools.product("HMN", repeat=hhsize) + if tup.count("M") + tup.count("N") >= 2 + ] + alternatives = alternatives + joint_alternatives + + # spec df has expression column plus a column for each alternative + spec = pd.DataFrame(columns=[expression_name] + alternatives) + + # Before processing the joint_tour_coefficients, we add rows to the spec to carry + # the alternative utilities previously computed for each individual into all hh alternative + # columns in which the individual is assigned that alternative. The Expression column contains + # the name of the choosers column with that individual's utility for the individual alternative + # and the hh alternative columns that should receive that utility are given a value of 1 + # e.g. 
M_p1 is a column in choosers with the individual utility to person p1 of alternative M + # Expression MM MN MH NM NN NH HM HN HH + # M_p1 1.0 1.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 + # N_p1 0.0 0.0 0.0 1.0 1.0 1.0 0.0 0.0 0.0 + for pnum in range(1, hhsize + 1): + for activity in ["M", "N", "H"]: + + new_row_index = len(spec) + spec.loc[new_row_index, expression_name] = add_pn(activity, pnum) + + # list of alternative columns where person pnum has expression activity + # e.g. for M_p1 we want the columns where activity M is in position p1 + alternative_columns = [ + alt for alt in alternatives if alt[pnum - 1] == activity + ] + spec.loc[new_row_index, alternative_columns] = 1 + + # for each row in the joint util table + for row in joint_tour_coefficients.itertuples(): + + # if there is no dependency + if row.dependency is np.nan: + expression = row.Expression + # add a new row to spec + new_row_index = len(spec) + spec.loc[new_row_index, expression_name] = expression + spec.loc[new_row_index, alternatives] = row.coefficient + # if there is a dependency + else: + dependency_name = row.dependency + expression = row.Expression + coefficient = row.coefficient + if dependency_name in ["M_px", "N_px", "H_px"]: + if "_pxprod" in expression: + prod_conds = row.Expression.split("|") + expanded_expressions = [ + tup + for tup in itertools.product( + range(len(prod_conds)), repeat=hhsize + ) + ] + for expression_tup in expanded_expressions: + expression_list = [] + dependency_list = [] + for counter in range(len(expression_tup)): + expression_list.append( + prod_conds[expression_tup[counter]].replace( + "xprod", str(counter + 1) + ) + ) + if expression_tup[counter] == 0: + dependency_list.append( + dependency_name.replace("x", str(counter + 1)) + ) + + expression_value = "&".join(expression_list) + dependency_value = pd.Series( + np.ones(len(alternatives)), index=alternatives + ) + if len(dependency_list) > 0: + for dependency in dependency_list: + # temp = 
spec.loc[spec[expression_name]==dependency, alternatives].squeeze().fillna(0) + dependency_value *= ( + spec.loc[ + spec[expression_name] == dependency, + alternatives, + ] + .squeeze() + .fillna(0) + ) + + # add a new row to spec + new_row_index = len(spec) + spec.loc[new_row_index] = dependency_value + spec.loc[new_row_index, expression_name] = expression_value + spec.loc[new_row_index, alternatives] = ( + spec.loc[new_row_index, alternatives] * coefficient + ) + + elif "_px" in expression: + for pnum in range(1, hhsize + 1): + dependency_name = row.dependency.replace("x", str(pnum)) + expression = row.Expression.replace("x", str(pnum)) + + # add a new row to spec + new_row_index = len(spec) + spec.loc[new_row_index] = spec.loc[ + spec[expression_name] == dependency_name + ].squeeze() + spec.loc[new_row_index, expression_name] = expression + spec.loc[new_row_index, alternatives] = ( + spec.loc[new_row_index, alternatives] * coefficient + ) + + # drop dependency rows + spec = spec[~spec[expression_name].str.startswith(("M_p", "N_p", "H_p"))] + + # eval expression goes in the index + spec.set_index(expression_name, inplace=True) + + for c in spec.columns: + spec[c] = spec[c].fillna(0) + + simulate.uniquify_spec_index(spec) + + # make non-joint alts 0 + for c in alternatives: + if c.endswith("J"): + continue + else: + spec[c] = 0 + + if trace_spec: + tracing.trace_df( + spec, + "%s.hhsize%d_joint_spec" % (trace_label, hhsize), + transpose=False, + slicer="NONE", + ) + + if trace_spec: + tracing.trace_df( + spec, + "%s.hhsize%d_joint_spec_patched" % (trace_label, hhsize), + transpose=False, + slicer="NONE", + ) + + if cache: + cache_joint_spec(hhsize, spec) + + t0 = tracing.print_elapsed_time("build_cdap_joint_spec hh_size %s" % hhsize, t0) + + return spec + + def build_cdap_joint_spec( state: workflow.State, joint_tour_coefficients, @@ -992,8 +1213,6 @@ def household_activity_choices( # add joint util to util utils = utils.add(joint_tour_utils) - probs = 
logit.utils_to_probs(state, utils, trace_label=trace_label) - # select an activity pattern alternative for each household based on probability # result is a series indexed on _hh_index_ with the (0 based) index of the column from probs idx_choices, rands = logit.make_choices(state, probs, trace_label=trace_label) @@ -1287,6 +1506,13 @@ def _run_cdap( lambda x: 1 if "J" in x else 0 ) + # return household joint tour flag + if add_joint_tour_utility: + hh_activity_choices = hh_activity_choices.to_frame(name="hh_choices") + hh_activity_choices["has_joint_tour"] = hh_activity_choices["hh_choices"].apply( + lambda x: 1 if "J" in x else 0 + ) + # if DUMP: # state.tracing.trace_df(hh_activity_choices, '%s.DUMP.hh_activity_choices' % trace_label, # transpose=False, slicer='NONE') diff --git a/activitysim/core/configuration.py b/activitysim/core/configuration.py new file mode 100644 index 000000000..3174653b2 --- /dev/null +++ b/activitysim/core/configuration.py @@ -0,0 +1,291 @@ +from typing import Union + +try: + from pydantic import BaseModel as PydanticBase +except ModuleNotFoundError: + + class PydanticBase: + pass + + +class InputTable(PydanticBase): + """ + The features that define an input table to be read by ActivitySim. + """ + + tablename: str + """Name of the injected table""" + + filename: str = None + """ + Name of the CSV or HDF5 file to read. + + If not provided, defaults to `input_store` + """ + + index_col: str = None + """table column to use for the index""" + + rename_columns: dict[str, str] = None + """dictionary of column name mappings""" + + keep_columns: list[str] = None + """ + columns to keep once read in to memory. + + Save only the columns needed for modeling or analysis to save on memory + and file I/O + """ + + h5_tablename: str = None + """table name if reading from HDF5 and different from `tablename`""" + + +class Settings(PydanticBase): + """ + The overall settings for the ActivitySim model system. 
+ + The input for these settings is typically stored in one main YAML file, + usually called ``settings.yaml``. + + Note that this implementation is presently used only for generating + documentation, but future work may migrate the settings implementation to + actually use this pydantic code to validate the settings before running + the model. + """ + + models: list[str] + """ + list of model steps to run - auto ownership, tour frequency, etc. + + See :ref:`model_steps` for more details about each step. + """ + + resume_after: str = None + """to resume running the data pipeline after the last successful checkpoint""" + + input_table_list: list[InputTable] + """list of table names, indices, and column re-maps for each table in `input_store`""" + + input_store: str = None + """HDF5 inputs file""" + + create_input_store: bool = False + """ + Write the inputs as read in back to an HDF5 store. + + If enabled, this writes the store to the outputs folder to use for subsequent + model runs, as reading HDF5 can be faster than reading CSV files.""" + + households_sample_size: int = None + """ + Number of households to sample and simulate + + If omitted or set to 0, ActivitySim will simulate all households. + """ + trace_hh_id: Union[int, list] = None + """ + Trace household id(s) + + If omitted, no tracing is written out + """ + + trace_od: list[int] = None + """ + Trace origin, destination pair in accessibility calculation + + If omitted, no tracing is written out. + """ + + chunk_training_mode: str = None + """ + The method to use for chunk training. + + Valid values include {disabled, training, production, adaptive}. + See :ref:`chunk_size` for more details. + """ + + chunk_size: int = None + """ + Approximate amount of RAM to allocate to ActivitySim for batch processing. + + See :ref:`chunk_size` for more details. + """ + + chunk_method: str = None + """ + Memory use measure to use for chunking. + + See :ref:`chunk_size`. 
+ """ + + checkpoints: Union[bool, list] = True + """ + When to write checkpoint (intermediate table states) to disk. + + If True, checkpoints are written at each step. If False, no intermediate + checkpoints will be written before the end of run. Or, provide an explicit + list of models to checkpoint. + """ + + check_for_variability: bool = False + """ + Debugging feature to find broken model specifications. + + Enabling this check does not alter valid results but slows down model runs. + """ + + log_alt_losers: bool = False + """ + Write out expressions when all alternatives are unavailable. + + This can be useful for model development to catch errors in specifications. + Enabling this check does not alter valid results but slows down model runs. + """ + + use_shadow_pricing: bool = False + """turn shadow_pricing on and off for work and school location""" + + output_tables: list[str] = None + """list of output tables to write to CSV or HDF5""" + + want_dest_choice_sample_tables: bool = False + """turn writing of sample_tables on and off for all models""" + + cleanup_pipeline_after_run: bool = False + """ + Cleans up pipeline after successful run. + + This will clean up pipeline only after successful runs, by creating a + single-checkpoint pipeline file, and deleting any subprocess pipelines. + """ + + sharrow: Union[bool, str] = False + """ + Set the sharrow operating mode. + + .. versionadded:: 1.2 + + * `false` - Do not use sharrow. This is the default if no value is given. + * `true` - Use sharrow optimizations when possible, but fall back to + legacy `pandas.eval` systems when any error is encountered. This is the + preferred mode for running with sharrow if reliability is more important + than performance. + * `require` - Use sharrow optimizations, and raise an error if they fail + unexpectedly. This is the preferred mode for running with sharrow + if performance is a concern. 
+ * `test` - Run every relevant calculation using both sharrow and legacy + systems, and compare them to ensure the results match. This is the slowest + mode of operation, but useful for development and debugging. + """ + + +class ZarrDigitalEncoding(PydanticBase): + """Digital encoding instructions for skim tables. + + .. versionadded:: 1.2 + """ + + regex: str + """A regular expression for matching skim matrix names. + + All skims with names that match under typical regular expression rules + for Python will be processed together. + """ + + joint_dict: str + """The name of the joint dictionary for this group. + + This must be a unique name for this set of skims, and a new array + will be added to the Dataset with this name. It will be an integer- + type array indicating the position of each element in the jointly + encoded dictionary.""" + + +class TAZ_Settings(PydanticBase): + """ + Complex settings for TAZ skims that are not just OMX file(s). + + .. versionadded:: 1.2 + """ + + omx: str = None + """The filename of the data stored in OMX format. + + This is treated as a fallback for the raw input data, if ZARR format data + is not available. + """ + + zarr: str = None + """The filename of the data stored in ZARR format. + + Reading ZARR data can be much faster than reading OMX format data, so if + this filename is given, the ZARR file format is preferred if it exists. If + it does not exist, then OMX data is read in and then ZARR data is written + out for future usage. + + .. versionadded:: 1.2 + """ + + zarr_digital_encoding: list[ZarrDigitalEncoding] = None + """ + A list of encodings to apply before saving skims in ZARR format. + + .. versionadded:: 1.2 + """ + + +class NetworkSettings(PydanticBase): + """ + Network level of service and skims settings + + The input for these settings is typically stored in one YAML file, + usually called ``network_los.yaml``. + """ + + zone_system: int + """Which zone system type is used. + + * 1 - TAZ only. 
+ * 2 - MAZ and TAZ. + * 3 - MAZ, TAZ, and TAP + """ + + taz_skims: Union[str, TAZ_Settings] = None + """Instructions for how to load and pre-process skim matrices. + + If given as a string, it is interpreted as the location for OMX file(s), + either as a single file or as a glob-matching pattern for multiple files. + The time period for the matrix must be represented at the end of the matrix + name and be separated by a double underscore (e.g. `BUS_IVT__AM` indicates base + skim BUS_IVT with a time period of AM). + + Alternatively, this can be given as a nested dictionary defined via the + TAZ_Settings class, which allows for ZARR transformation and pre-processing. + """ + + skim_time_periods: dict + """time period upper bound values and labels + + * ``time_window`` - total duration (in minutes) of the modeled time span (Default: 1440 minutes (24 hours)) + * ``period_minutes`` - length of time (in minutes) each model time period represents. Must be whole factor of ``time_window``. (Default: 60 minutes) + * ``periods`` - Breakpoints that define the aggregate periods for skims and assignment + * ``labels`` - Labels to define names for aggregate periods for skims and assignment + """ + + read_skim_cache: bool = False + """Read cached skims (using numpy memmap) from output directory. + + Reading from memmap is much faster than omx, but the memmap is a huge + uncompressed file. + """ + + write_skim_cache: bool = False + """Write memmapped cached skims to output directory. + + This is needed if you want to use the cached skims to speed up subsequent + runs. + """ + + cache_dir: str = None + """alternate dir to read/write cache files (defaults to output_dir)"""