From 5e49e4ad39e57c40aa3509997de6e920fcfd4b12 Mon Sep 17 00:00:00 2001 From: Corvince Date: Tue, 27 Feb 2024 13:39:04 +0100 Subject: [PATCH] Add cell-centric discrete spaces (experimental) (#1994) ## Summary This PR introduces an alteranative implementation for discrete spaces. This implementation centers on the explicit inclusion of a Cell class. Agents can occupy cells. Cells have connections, specifying their neighbors. The resulting classes offer a cell centric API where agents interact with a cell, and query the cell for its neighbors. To capture a collection of cells, and their content (_i.e._, Agents), this PR adds a new CellCollection class. This is an immutable collection of cells with convenient attribute accessors to the cells, or their agents. This PR also includes a CellAgent class which extends the default Agent class by adding a `move_to` method that works in conjunction with the new discrete spaces. From a performance point of view, the current code is a bit slower in building the grid and cell data structure, but in most use cases this increase in time for model initialization will be more than offset by the faster retrieval of neighboring cells and the agents that occupy them. ## Motive The PR emerged out of various experiments aimed at improving the performance of the current discrete space code. Moreover, it turned out that a cell centric API resolved various open issues (_e.g._, #1900, #1903, #1953). ## Implementation The key idea is to have Cells with connections, and using this to generate neighborhoods for a given radius. So all discrete space classes are in essence a [linked data structure](https://en.wikipedia.org/wiki/Linked_data_structure). The cell centric API idea is used to implement 4 key discrete space classes: OrthogonalMooreGrid, OrthogonalVonNeumannGrid (alternative for SingleGrid and MultiGrid, and moore and von Neumann neighborhood) , HexGrid (alternative for SingleHexGrid and MultiHexGrid), and Network (alternative for NetworkGrid). Cells have a capacity, so there is no longer a need for seperating Single and Multi grids. Moore and von Neumann reflect different neighborhood connections and so are now implemented as seperate classes. --------- Co-authored-by: Jan Kwakkel --- benchmarks/Schelling/schelling.py | 51 +- benchmarks/WolfSheep/wolf_sheep.py | 68 +-- mesa/experimental/__init__.py | 4 + mesa/experimental/cell_space/__init__.py | 23 + mesa/experimental/cell_space/cell.py | 152 ++++++ mesa/experimental/cell_space/cell_agent.py | 37 ++ .../cell_space/cell_collection.py | 81 +++ .../experimental/cell_space/discrete_space.py | 65 +++ mesa/experimental/cell_space/grid.py | 206 ++++++++ mesa/experimental/cell_space/network.py | 40 ++ tests/test_cell_space.py | 465 ++++++++++++++++++ 11 files changed, 1136 insertions(+), 56 deletions(-) create mode 100644 mesa/experimental/cell_space/__init__.py create mode 100644 mesa/experimental/cell_space/cell.py create mode 100644 mesa/experimental/cell_space/cell_agent.py create mode 100644 mesa/experimental/cell_space/cell_collection.py create mode 100644 mesa/experimental/cell_space/discrete_space.py create mode 100644 mesa/experimental/cell_space/grid.py create mode 100644 mesa/experimental/cell_space/network.py create mode 100644 tests/test_cell_space.py diff --git a/benchmarks/Schelling/schelling.py b/benchmarks/Schelling/schelling.py index c4b7b01b520..c7dd3bf1deb 100644 --- a/benchmarks/Schelling/schelling.py +++ b/benchmarks/Schelling/schelling.py @@ -1,12 +1,14 @@ -import mesa +from mesa import Model +from mesa.experimental.cell_space import CellAgent, OrthogonalMooreGrid +from mesa.time import RandomActivation -class SchellingAgent(mesa.Agent): +class SchellingAgent(CellAgent): """ Schelling segregation agent """ - def __init__(self, unique_id, model, agent_type): + def __init__(self, unique_id, model, agent_type, radius, homophily): """ Create a new Schelling agent. Args: @@ -16,31 +18,32 @@ def __init__(self, unique_id, model, agent_type): """ super().__init__(unique_id, model) self.type = agent_type + self.radius = radius + self.homophily = homophily def step(self): similar = 0 - for neighbor in self.model.grid.iter_neighbors( - self.pos, moore=True, radius=self.model.radius - ): + neighborhood = self.cell.neighborhood(radius=self.radius) + for neighbor in neighborhood.agents: if neighbor.type == self.type: similar += 1 # If unhappy, move: - if similar < self.model.homophily: - self.model.grid.move_to_empty(self) + if similar < self.homophily: + self.move_to(self.model.grid.select_random_empty_cell()) else: self.model.happy += 1 -class Schelling(mesa.Model): +class Schelling(Model): """ Model class for the Schelling segregation model. """ def __init__( self, - width=40, height=40, + width=40, homophily=3, radius=1, density=0.8, @@ -51,7 +54,7 @@ def __init__( Create a new Schelling model. Args: - width, height: Size of the space. + height, width: Size of the space. density: Initial Chance for a cell to populated minority_pc: Chances for an agent to be in minority class homophily: Minimum number of agents of same class needed to be happy @@ -59,15 +62,18 @@ def __init__( seed: Seed for Reproducibility """ super().__init__(seed=seed) - self.width = width self.height = height + self.width = width self.density = density self.minority_pc = minority_pc - self.homophily = homophily - self.radius = radius - self.schedule = mesa.time.RandomActivation(self) - self.grid = mesa.space.SingleGrid(width, height, torus=True) + self.schedule = RandomActivation(self) + self.grid = OrthogonalMooreGrid( + [height, width], + torus=True, + capacity=1, + random=self.random, + ) self.happy = 0 @@ -75,11 +81,13 @@ def __init__( # We use a grid iterator that returns # the coordinates of a cell as well as # its contents. (coord_iter) - for _, pos in self.grid.coord_iter(): + for cell in self.grid: if self.random.random() < self.density: agent_type = 1 if self.random.random() < self.minority_pc else 0 - agent = SchellingAgent(self.next_id(), self, agent_type) - self.grid.place_agent(agent, pos) + agent = SchellingAgent( + self.next_id(), self, agent_type, radius, homophily + ) + agent.move_to(cell) self.schedule.add(agent) def step(self): @@ -93,13 +101,12 @@ def step(self): if __name__ == "__main__": import time - # model = Schelling(seed=15, width=40, height=40, homophily=3, radius=1, density=0.625) + # model = Schelling(seed=15, height=40, width=40, homophily=3, radius=1, density=0.625) model = Schelling( - seed=15, width=100, height=100, homophily=8, radius=2, density=0.8 + seed=15, height=100, width=100, homophily=8, radius=2, density=0.8 ) start_time = time.perf_counter() for _ in range(100): model.step() - print(time.perf_counter() - start_time) diff --git a/benchmarks/WolfSheep/wolf_sheep.py b/benchmarks/WolfSheep/wolf_sheep.py index 5d4d9fef5ff..11e07a9e954 100644 --- a/benchmarks/WolfSheep/wolf_sheep.py +++ b/benchmarks/WolfSheep/wolf_sheep.py @@ -9,41 +9,40 @@ Northwestern University, Evanston, IL. """ -import mesa +import math +from mesa import Model +from mesa.experimental.cell_space import CellAgent, OrthogonalVonNeumannGrid +from mesa.time import RandomActivationByType -class Animal(mesa.Agent): - def __init__(self, unique_id, model, moore, energy, p_reproduce, energy_from_food): + +class Animal(CellAgent): + def __init__(self, unique_id, model, energy, p_reproduce, energy_from_food): super().__init__(unique_id, model) self.energy = energy self.p_reproduce = p_reproduce self.energy_from_food = energy_from_food - self.moore = moore def random_move(self): - next_moves = self.model.grid.get_neighborhood(self.pos, self.moore, True) - next_move = self.random.choice(next_moves) - # Now move: - self.model.grid.move_agent(self, next_move) + self.move_to(self.cell.neighborhood().select_random_cell()) def spawn_offspring(self): self.energy /= 2 offspring = self.__class__( self.model.next_id(), self.model, - self.moore, self.energy, self.p_reproduce, self.energy_from_food, ) - self.model.grid.place_agent(offspring, self.pos) + offspring.move_to(self.cell) self.model.schedule.add(offspring) def feed(self): ... def die(self): - self.model.grid.remove_agent(self) + self.cell.remove_agent(self) self.remove() def step(self): @@ -67,8 +66,9 @@ class Sheep(Animal): def feed(self): # If there is grass available, eat it - agents = self.model.grid.get_cell_list_contents(self.pos) - grass_patch = next(obj for obj in agents if isinstance(obj, GrassPatch)) + grass_patch = next( + obj for obj in self.cell.agents if isinstance(obj, GrassPatch) + ) if grass_patch.fully_grown: self.energy += self.energy_from_food grass_patch.fully_grown = False @@ -80,8 +80,7 @@ class Wolf(Animal): """ def feed(self): - agents = self.model.grid.get_cell_list_contents(self.pos) - sheep = [obj for obj in agents if isinstance(obj, Sheep)] + sheep = [obj for obj in self.cell.agents if isinstance(obj, Sheep)] if len(sheep) > 0: sheep_to_eat = self.random.choice(sheep) self.energy += self.energy_from_food @@ -90,7 +89,7 @@ def feed(self): sheep_to_eat.die() -class GrassPatch(mesa.Agent): +class GrassPatch(CellAgent): """ A patch of grass that grows at a fixed rate and it is eaten by sheep """ @@ -100,7 +99,7 @@ def __init__(self, unique_id, model, fully_grown, countdown): Creates a new patch of grass Args: - grown: (boolean) Whether the patch of grass is fully grown or not + fully_grown: (boolean) Whether the patch of grass is fully grown or not countdown: Time for the patch of grass to be fully grown again """ super().__init__(unique_id, model) @@ -117,7 +116,7 @@ def step(self): self.countdown -= 1 -class WolfSheep(mesa.Model): +class WolfSheep(Model): """ Wolf-Sheep Predation Model @@ -126,7 +125,6 @@ class WolfSheep(mesa.Model): def __init__( self, - seed, height, width, initial_sheep, @@ -136,7 +134,7 @@ def __init__( grass_regrowth_time, wolf_gain_from_food=13, sheep_gain_from_food=5, - moore=False, + seed=None, ): """ Create a new Wolf-Sheep model with the given parameters. @@ -152,6 +150,7 @@ def __init__( once it is eaten sheep_gain_from_food: Energy sheep gain from grass, if enabled. moore: + seed """ super().__init__(seed=seed) # Set parameters @@ -161,9 +160,15 @@ def __init__( self.initial_wolves = initial_wolves self.grass_regrowth_time = grass_regrowth_time - self.schedule = mesa.time.RandomActivationByType(self) - self.grid = mesa.space.MultiGrid(self.height, self.width, torus=False) + self.schedule = RandomActivationByType(self) + self.grid = OrthogonalVonNeumannGrid( + [self.height, self.width], + torus=False, + capacity=math.inf, + random=self.random, + ) + # Create sheep: for _ in range(self.initial_sheep): pos = ( self.random.randrange(self.width), @@ -171,14 +176,9 @@ def __init__( ) energy = self.random.randrange(2 * sheep_gain_from_food) sheep = Sheep( - self.next_id(), - self, - moore, - energy, - sheep_reproduce, - sheep_gain_from_food, + self.next_id(), self, energy, sheep_reproduce, sheep_gain_from_food ) - self.grid.place_agent(sheep, pos) + sheep.move_to(self.grid[pos]) self.schedule.add(sheep) # Create wolves @@ -189,21 +189,21 @@ def __init__( ) energy = self.random.randrange(2 * wolf_gain_from_food) wolf = Wolf( - self.next_id(), self, moore, energy, wolf_reproduce, wolf_gain_from_food + self.next_id(), self, energy, wolf_reproduce, wolf_gain_from_food ) - self.grid.place_agent(wolf, pos) + wolf.move_to(self.grid[pos]) self.schedule.add(wolf) # Create grass patches possibly_fully_grown = [True, False] - for _agent, pos in self.grid.coord_iter(): + for cell in self.grid: fully_grown = self.random.choice(possibly_fully_grown) if fully_grown: countdown = self.grass_regrowth_time else: countdown = self.random.randrange(self.grass_regrowth_time) patch = GrassPatch(self.next_id(), self, fully_grown, countdown) - self.grid.place_agent(patch, pos) + patch.move_to(cell) self.schedule.add(patch) def step(self): @@ -213,7 +213,7 @@ def step(self): if __name__ == "__main__": import time - model = WolfSheep(15, 25, 25, 60, 40, 0.2, 0.1, 20) + model = WolfSheep(25, 25, 60, 40, 0.2, 0.1, 20, seed=15) start_time = time.perf_counter() for _ in range(100): diff --git a/mesa/experimental/__init__.py b/mesa/experimental/__init__.py index c04f22589b3..961b8762791 100644 --- a/mesa/experimental/__init__.py +++ b/mesa/experimental/__init__.py @@ -1 +1,5 @@ from .jupyter_viz import JupyterViz, make_text, Slider # noqa +from mesa.experimental import cell_space + + +__all__ = ["JupyterViz", "make_text", "Slider", "cell_space"] diff --git a/mesa/experimental/cell_space/__init__.py b/mesa/experimental/cell_space/__init__.py new file mode 100644 index 00000000000..dce296aebce --- /dev/null +++ b/mesa/experimental/cell_space/__init__.py @@ -0,0 +1,23 @@ +from mesa.experimental.cell_space.cell import Cell +from mesa.experimental.cell_space.cell_agent import CellAgent +from mesa.experimental.cell_space.cell_collection import CellCollection +from mesa.experimental.cell_space.discrete_space import DiscreteSpace +from mesa.experimental.cell_space.grid import ( + Grid, + HexGrid, + OrthogonalMooreGrid, + OrthogonalVonNeumannGrid, +) +from mesa.experimental.cell_space.network import Network + +__all__ = [ + "CellCollection", + "Cell", + "CellAgent", + "DiscreteSpace", + "Grid", + "HexGrid", + "OrthogonalMooreGrid", + "OrthogonalVonNeumannGrid", + "Network", +] diff --git a/mesa/experimental/cell_space/cell.py b/mesa/experimental/cell_space/cell.py new file mode 100644 index 00000000000..55264f68daa --- /dev/null +++ b/mesa/experimental/cell_space/cell.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from functools import cache +from random import Random +from typing import TYPE_CHECKING + +from mesa.experimental.cell_space.cell_collection import CellCollection + +if TYPE_CHECKING: + from mesa.experimental.cell_space.cell_agent import CellAgent + + +class Cell: + """The cell represents a position in a discrete space. + + Attributes: + coordinate (Tuple[int, int]) : the position of the cell in the discrete space + agents (List[Agent]): the agents occupying the cell + capacity (int): the maximum number of agents that can simultaneously occupy the cell + properties (dict[str, Any]): the properties of the cell + random (Random): the random number generator + + """ + + __slots__ = [ + "coordinate", + "_connections", + "agents", + "capacity", + "properties", + "random", + ] + + # def __new__(cls, + # coordinate: tuple[int, ...], + # capacity: float | None = None, + # random: Random | None = None,): + # if capacity != 1: + # return object.__new__(cls) + # else: + # return object.__new__(SingleAgentCell) + + def __init__( + self, + coordinate: tuple[int, ...], + capacity: float | None = None, + random: Random | None = None, + ) -> None: + """ " + + Args: + coordinate: + capacity (int) : the capacity of the cell. If None, the capacity is infinite + random (Random) : the random number generator to use + + """ + super().__init__() + self.coordinate = coordinate + self._connections: list[Cell] = [] # TODO: change to CellCollection? + self.agents = [] # TODO:: change to AgentSet or weakrefs? (neither is very performant, ) + self.capacity = capacity + self.properties: dict[str, object] = {} + self.random = random + + def connect(self, other: Cell) -> None: + """Connects this cell to another cell. + + Args: + other (Cell): other cell to connect to + + """ + self._connections.append(other) + + def disconnect(self, other: Cell) -> None: + """Disconnects this cell from another cell. + + Args: + other (Cell): other cell to remove from connections + + """ + self._connections.remove(other) + + def add_agent(self, agent: CellAgent) -> None: + """Adds an agent to the cell. + + Args: + agent (CellAgent): agent to add to this Cell + + """ + n = len(self.agents) + + if self.capacity and n >= self.capacity: + raise Exception( + "ERROR: Cell is full" + ) # FIXME we need MESA errors or a proper error + + self.agents.append(agent) + + def remove_agent(self, agent: CellAgent) -> None: + """Removes an agent from the cell. + + Args: + agent (CellAgent): agent to remove from this cell + + """ + self.agents.remove(agent) + agent.cell = None + + @property + def is_empty(self) -> bool: + """Returns a bool of the contents of a cell.""" + return len(self.agents) == 0 + + @property + def is_full(self) -> bool: + """Returns a bool of the contents of a cell.""" + return len(self.agents) == self.capacity + + def __repr__(self): + return f"Cell({self.coordinate}, {self.agents})" + + # FIXME: Revisit caching strategy on methods + @cache # noqa: B019 + def neighborhood(self, radius=1, include_center=False): + return CellCollection( + self._neighborhood(radius=radius, include_center=include_center), + random=self.random, + ) + + # FIXME: Revisit caching strategy on methods + @cache # noqa: B019 + def _neighborhood(self, radius=1, include_center=False): + # if radius == 0: + # return {self: self.agents} + if radius < 1: + raise ValueError("radius must be larger than one") + if radius == 1: + neighborhood = {neighbor: neighbor.agents for neighbor in self._connections} + if not include_center: + return neighborhood + else: + neighborhood[self] = self.agents + return neighborhood + else: + neighborhood = {} + for neighbor in self._connections: + neighborhood.update( + neighbor._neighborhood(radius - 1, include_center=True) + ) + if not include_center: + neighborhood.pop(self, None) + return neighborhood diff --git a/mesa/experimental/cell_space/cell_agent.py b/mesa/experimental/cell_space/cell_agent.py new file mode 100644 index 00000000000..abc5155a670 --- /dev/null +++ b/mesa/experimental/cell_space/cell_agent.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from mesa import Agent, Model + +if TYPE_CHECKING: + from mesa.experimental.cell_space.cell import Cell + + +class CellAgent(Agent): + """Cell Agent is an extension of the Agent class and adds behavior for moving in discrete spaces + + + Attributes: + unique_id (int): A unique identifier for this agent. + model (Model): The model instance to which the agent belongs + pos: (Position | None): The position of the agent in the space + cell: (Cell | None): the cell which the agent occupies + """ + + def __init__(self, unique_id: int, model: Model) -> None: + """ + Create a new agent. + + Args: + unique_id (int): A unique identifier for this agent. + model (Model): The model instance in which the agent exists. + """ + super().__init__(unique_id, model) + self.cell: Cell | None = None + + def move_to(self, cell) -> None: + if self.cell is not None: + self.cell.remove_agent(self) + self.cell = cell + cell.add_agent(self) diff --git a/mesa/experimental/cell_space/cell_collection.py b/mesa/experimental/cell_space/cell_collection.py new file mode 100644 index 00000000000..9ca36589849 --- /dev/null +++ b/mesa/experimental/cell_space/cell_collection.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import itertools +from collections.abc import Iterable, Mapping +from functools import cached_property +from random import Random +from typing import TYPE_CHECKING, Callable, Generic, TypeVar + +if TYPE_CHECKING: + from mesa.experimental.cell_space.cell import Cell + from mesa.experimental.cell_space.cell_agent import CellAgent + +T = TypeVar("T", bound="Cell") + + +class CellCollection(Generic[T]): + """An immutable collection of cells + + Attributes: + cells (List[Cell]): The list of cells this collection represents + agents (List[CellAgent]) : List of agents occupying the cells in this collection + random (Random) : The random number generator + + """ + + def __init__( + self, + cells: Mapping[T, list[CellAgent]] | Iterable[T], + random: Random | None = None, + ) -> None: + if isinstance(cells, dict): + self._cells = cells + else: + self._cells = {cell: cell.agents for cell in cells} + + # + self._capacity: int = next(iter(self._cells.keys())).capacity + + if random is None: + random = Random() # FIXME + self.random = random + + def __iter__(self): + return iter(self._cells) + + def __getitem__(self, key: T) -> Iterable[CellAgent]: + return self._cells[key] + + # @cached_property + def __len__(self) -> int: + return len(self._cells) + + def __repr__(self): + return f"CellCollection({self._cells})" + + @cached_property + def cells(self) -> list[T]: + return list(self._cells.keys()) + + @property + def agents(self) -> Iterable[CellAgent]: + return itertools.chain.from_iterable(self._cells.values()) + + def select_random_cell(self) -> T: + return self.random.choice(self.cells) + + def select_random_agent(self) -> CellAgent: + return self.random.choice(list(self.agents)) + + def select(self, filter_func: Callable[[T], bool] | None = None, n=0): + # FIXME: n is not considered + if filter_func is None and n == 0: + return self + + return CellCollection( + { + cell: agents + for cell, agents in self._cells.items() + if filter_func is None or filter_func(cell) + } + ) diff --git a/mesa/experimental/cell_space/discrete_space.py b/mesa/experimental/cell_space/discrete_space.py new file mode 100644 index 00000000000..d2161c5b46a --- /dev/null +++ b/mesa/experimental/cell_space/discrete_space.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from functools import cached_property +from random import Random +from typing import Generic, TypeVar + +from mesa.experimental.cell_space.cell import Cell +from mesa.experimental.cell_space.cell_collection import CellCollection + +T = TypeVar("T", bound=Cell) + + +class DiscreteSpace(Generic[T]): + """Base class for all discrete spaces. + + Attributes: + capacity (int): The capacity of the cells in the discrete space + all_cells (CellCollection): The cells composing the discrete space + random (Random): The random number generator + cell_klass (Type) : the type of cell class + empties (CellCollection) : collecction of all cells that are empty + + """ + + def __init__( + self, + capacity: int | None = None, + cell_klass: type[T] = Cell, + random: Random | None = None, + ): + super().__init__() + self.capacity = capacity + self._cells: dict[tuple[int, ...], T] = {} + if random is None: + random = Random() # FIXME should default to default rng from model + self.random = random + self.cell_klass = cell_klass + + self._empties: dict[tuple[int, ...], None] = {} + self._empties_initialized = False + + @property + def cutoff_empties(self): + return 7.953 * len(self._cells) ** 0.384 + + def _connect_single_cell(self, cell: T): + ... + + @cached_property + def all_cells(self): + return CellCollection({cell: cell.agents for cell in self._cells.values()}) + + def __iter__(self): + return iter(self._cells.values()) + + def __getitem__(self, key): + return self._cells[key] + + @property + def empties(self) -> CellCollection: + return self.all_cells.select(lambda cell: cell.is_empty) + + def select_random_empty_cell(self) -> T: + """select random empty cell""" + return self.random.choice(list(self.empties)) diff --git a/mesa/experimental/cell_space/grid.py b/mesa/experimental/cell_space/grid.py new file mode 100644 index 00000000000..cc4b4b9e489 --- /dev/null +++ b/mesa/experimental/cell_space/grid.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +from collections.abc import Sequence +from itertools import product +from random import Random +from typing import Generic, TypeVar + +from mesa.experimental.cell_space import Cell, DiscreteSpace + +T = TypeVar("T", bound=Cell) + + +class Grid(DiscreteSpace, Generic[T]): + """Base class for all grid classes + + Attributes: + dimensions (Sequence[int]): the dimensions of the grid + torus (bool): whether the grid is a torus + capacity (int): the capacity of a grid cell + random (Random): the random number generator + _try_random (bool): whether to get empty cell be repeatedly trying random cell + + """ + + def __init__( + self, + dimensions: Sequence[int], + torus: bool = False, + capacity: float | None = None, + random: Random | None = None, + cell_klass: type[T] = Cell, + ) -> None: + super().__init__(capacity=capacity, random=random, cell_klass=cell_klass) + self.torus = torus + self.dimensions = dimensions + self._try_random = True + self._ndims = len(dimensions) + self._validate_parameters() + + coordinates = product(*(range(dim) for dim in self.dimensions)) + + self._cells = { + coord: cell_klass(coord, capacity, random=self.random) + for coord in coordinates + } + self._connect_cells() + + def _connect_cells(self) -> None: + if self._ndims == 2: + self._connect_cells_2d() + else: + self._connect_cells_nd() + + def _connect_cells_2d(self) -> None: + ... + + def _connect_cells_nd(self) -> None: + ... + + def _validate_parameters(self): + if not all(isinstance(dim, int) and dim > 0 for dim in self.dimensions): + raise ValueError("Dimensions must be a list of positive integers.") + if not isinstance(self.torus, bool): + raise ValueError("Torus must be a boolean.") + if self.capacity is not None and not isinstance(self.capacity, (float, int)): + raise ValueError("Capacity must be a number or None.") + + def select_random_empty_cell(self) -> T: + # FIXME:: currently just a simple boolean to control behavior + # FIXME:: basically if grid is close to 99% full, creating empty list can be faster + # FIXME:: note however that the old results don't apply because in this implementation + # FIXME:: because empties list needs to be rebuild each time + # This method is based on Agents.jl's random_empty() implementation. See + # https://github.com/JuliaDynamics/Agents.jl/pull/541. For the discussion, see + # https://github.com/projectmesa/mesa/issues/1052 and + # https://github.com/projectmesa/mesa/pull/1565. The cutoff value provided + # is the break-even comparison with the time taken in the else branching point. + if self._try_random: + while True: + cell = self.all_cells.select_random_cell() + if cell.is_empty: + return cell + else: + return super().select_random_empty_cell() + + def _connect_single_cell_nd(self, cell: T, offsets: list[tuple[int, ...]]) -> None: + coord = cell.coordinate + + for d_coord in offsets: + n_coord = tuple(c + dc for c, dc in zip(coord, d_coord)) + if self.torus: + n_coord = tuple(nc % d for nc, d in zip(n_coord, self.dimensions)) + if all(0 <= nc < d for nc, d in zip(n_coord, self.dimensions)): + cell.connect(self._cells[n_coord]) + + def _connect_single_cell_2d(self, cell: T, offsets: list[tuple[int, int]]) -> None: + i, j = cell.coordinate + height, width = self.dimensions + + for di, dj in offsets: + ni, nj = (i + di, j + dj) + if self.torus: + ni, nj = ni % height, nj % width + if 0 <= ni < height and 0 <= nj < width: + cell.connect(self._cells[ni, nj]) + + +class OrthogonalMooreGrid(Grid[T]): + """Grid where cells are connected to their 8 neighbors. + + Example for two dimensions: + directions = [ + (-1, -1), (-1, 0), (-1, 1), + ( 0, -1), ( 0, 1), + ( 1, -1), ( 1, 0), ( 1, 1), + ] + """ + + def _connect_cells_2d(self) -> None: + # fmt: off + offsets = [ + (-1, -1), (-1, 0), (-1, 1), + ( 0, -1), ( 0, 1), + ( 1, -1), ( 1, 0), ( 1, 1), + ] + # fmt: on + height, width = self.dimensions + + for cell in self.all_cells: + self._connect_single_cell_2d(cell, offsets) + + def _connect_cells_nd(self) -> None: + offsets = list(product([-1, 0, 1], repeat=len(self.dimensions))) + offsets.remove((0,) * len(self.dimensions)) # Remove the central cell + + for cell in self.all_cells: + self._connect_single_cell_nd(cell, offsets) + + +class OrthogonalVonNeumannGrid(Grid[T]): + """Grid where cells are connected to their 4 neighbors. + + Example for two dimensions: + directions = [ + (0, -1), + (-1, 0), ( 1, 0), + (0, 1), + ] + """ + + def _connect_cells_2d(self) -> None: + # fmt: off + offsets = [ + (-1, 0), + (0, -1), (0, 1), + ( 1, 0), + ] + # fmt: on + height, width = self.dimensions + + for cell in self.all_cells: + self._connect_single_cell_2d(cell, offsets) + + def _connect_cells_nd(self) -> None: + offsets = [] + dimensions = len(self.dimensions) + for dim in range(dimensions): + for delta in [ + -1, + 1, + ]: # Move one step in each direction for the current dimension + offset = [0] * dimensions + offset[dim] = delta + offsets.append(tuple(offset)) + + for cell in self.all_cells: + self._connect_single_cell_nd(cell, offsets) + + +class HexGrid(Grid[T]): + def _connect_cells_2d(self) -> None: + # fmt: off + even_offsets = [ + (-1, -1), (-1, 0), + ( 0, -1), ( 0, 1), + ( 1, -1), ( 1, 0), + ] + odd_offsets = [ + (-1, 0), (-1, 1), + ( 0, -1), ( 0, 1), + ( 1, 0), ( 1, 1), + ] + # fmt: on + + for cell in self.all_cells: + i = cell.coordinate[0] + offsets = even_offsets if i % 2 == 0 else odd_offsets + self._connect_single_cell_2d(cell, offsets=offsets) + + def _connect_cells_nd(self) -> None: + raise NotImplementedError("HexGrids are only defined for 2 dimensions") + + def _validate_parameters(self): + super()._validate_parameters() + if len(self.dimensions) != 2: + raise ValueError("HexGrid must have exactly 2 dimensions.") diff --git a/mesa/experimental/cell_space/network.py b/mesa/experimental/cell_space/network.py new file mode 100644 index 00000000000..57c4d492bb0 --- /dev/null +++ b/mesa/experimental/cell_space/network.py @@ -0,0 +1,40 @@ +from random import Random +from typing import Any, Optional + +from mesa.experimental.cell_space.cell import Cell +from mesa.experimental.cell_space.discrete_space import DiscreteSpace + + +class Network(DiscreteSpace): + """A networked discrete space""" + + def __init__( + self, + G: Any, # noqa: N803 + capacity: Optional[int] = None, + random: Optional[Random] = None, + cell_klass: type[Cell] = Cell, + ) -> None: + """A Networked grid + + Args: + G: a NetworkX Graph instance. + capacity (int) : the capacity of the cell + random (Random): + CellKlass (type[Cell]): The base Cell class to use in the Network + + """ + super().__init__(capacity=capacity, random=random, cell_klass=cell_klass) + self.G = G + + for node_id in self.G.nodes: + self._cells[node_id] = self.cell_klass( + node_id, capacity, random=self.random + ) + + for cell in self.all_cells: + self._connect_single_cell(cell) + + def _connect_single_cell(self, cell): + for node_id in self.G.neighbors(cell.coordinate): + cell.connect(self._cells[node_id]) diff --git a/tests/test_cell_space.py b/tests/test_cell_space.py new file mode 100644 index 00000000000..050e39a09a6 --- /dev/null +++ b/tests/test_cell_space.py @@ -0,0 +1,465 @@ +import random + +import pytest + +from mesa import Model +from mesa.experimental.cell_space import ( + Cell, + CellAgent, + CellCollection, + HexGrid, + Network, + OrthogonalMooreGrid, + OrthogonalVonNeumannGrid, +) + + +def test_orthogonal_grid_neumann(): + width = 10 + height = 10 + grid = OrthogonalVonNeumannGrid((width, height), torus=False, capacity=None) + + assert len(grid._cells) == width * height + + # von neumann neighborhood, torus false, top left corner + assert len(grid._cells[(0, 0)]._connections) == 2 + for connection in grid._cells[(0, 0)]._connections: + assert connection.coordinate in {(0, 1), (1, 0)} + + # von neumann neighborhood, torus false, top right corner + for connection in grid._cells[(0, width - 1)]._connections: + assert connection.coordinate in {(0, width - 2), (1, width - 1)} + + # von neumann neighborhood, torus false, bottom left corner + for connection in grid._cells[(height - 1, 0)]._connections: + assert connection.coordinate in {(height - 1, 1), (height - 2, 0)} + + # von neumann neighborhood, torus false, bottom right corner + for connection in grid._cells[(height - 1, width - 1)]._connections: + assert connection.coordinate in { + (height - 1, width - 2), + (height - 2, width - 1), + } + + # von neumann neighborhood middle of grid + assert len(grid._cells[(5, 5)]._connections) == 4 + for connection in grid._cells[(5, 5)]._connections: + assert connection.coordinate in {(4, 5), (5, 4), (5, 6), (6, 5)} + + # von neumann neighborhood, torus True, top corner + grid = OrthogonalVonNeumannGrid((width, height), torus=True, capacity=None) + assert len(grid._cells[(0, 0)]._connections) == 4 + for connection in grid._cells[(0, 0)]._connections: + assert connection.coordinate in {(0, 1), (1, 0), (0, 9), (9, 0)} + + # von neumann neighborhood, torus True, top right corner + for connection in grid._cells[(0, width - 1)]._connections: + assert connection.coordinate in {(0, 8), (0, 0), (1, 9), (9, 9)} + + # von neumann neighborhood, torus True, bottom left corner + for connection in grid._cells[(9, 0)]._connections: + assert connection.coordinate in {(9, 1), (9, 9), (0, 0), (8, 0)} + + # von neumann neighborhood, torus True, bottom right corner + for connection in grid._cells[(9, 9)]._connections: + assert connection.coordinate in {(9, 0), (9, 8), (8, 9), (0, 9)} + + +def test_orthogonal_grid_neumann_3d(): + width = 10 + height = 10 + depth = 10 + grid = OrthogonalVonNeumannGrid((width, height, depth), torus=False, capacity=None) + + assert len(grid._cells) == width * height * depth + + # von neumann neighborhood, torus false, top left corner + assert len(grid._cells[(0, 0, 0)]._connections) == 3 + for connection in grid._cells[(0, 0, 0)]._connections: + assert connection.coordinate in {(0, 0, 1), (0, 1, 0), (1, 0, 0)} + + # von neumann neighborhood, torus false, top right corner + for connection in grid._cells[(0, width - 1, 0)]._connections: + assert connection.coordinate in { + (0, width - 1, 1), + (0, width - 2, 0), + (1, width - 1, 0), + } + + # von neumann neighborhood, torus false, bottom left corner + for connection in grid._cells[(height - 1, 0, 0)]._connections: + assert connection.coordinate in { + (height - 1, 0, 1), + (height - 1, 1, 0), + (height - 2, 0, 0), + } + + # von neumann neighborhood, torus false, bottom right corner + for connection in grid._cells[(height - 1, width - 1, 0)]._connections: + assert connection.coordinate in { + (height - 1, width - 1, 1), + (height - 1, width - 2, 0), + (height - 2, width - 1, 0), + } + + # von neumann neighborhood middle of grid + assert len(grid._cells[(5, 5, 5)]._connections) == 6 + for connection in grid._cells[(5, 5, 5)]._connections: + assert connection.coordinate in { + (4, 5, 5), + (5, 4, 5), + (5, 5, 4), + (5, 5, 6), + (5, 6, 5), + (6, 5, 5), + } + + # von neumann neighborhood, torus True, top corner + grid = OrthogonalVonNeumannGrid((width, height, depth), torus=True, capacity=None) + assert len(grid._cells[(0, 0, 0)]._connections) == 6 + for connection in grid._cells[(0, 0, 0)]._connections: + assert connection.coordinate in { + (0, 0, 1), + (0, 1, 0), + (1, 0, 0), + (0, 0, 9), + (0, 9, 0), + (9, 0, 0), + } + + +def test_orthogonal_grid_moore(): + width = 10 + height = 10 + + # Moore neighborhood, torus false, top corner + grid = OrthogonalMooreGrid((width, height), torus=False, capacity=None) + assert len(grid._cells[(0, 0)]._connections) == 3 + for connection in grid._cells[(0, 0)]._connections: + assert connection.coordinate in {(0, 1), (1, 0), (1, 1)} + + # Moore neighborhood middle of grid + assert len(grid._cells[(5, 5)]._connections) == 8 + for connection in grid._cells[(5, 5)]._connections: + # fmt: off + assert connection.coordinate in {(4, 4), (4, 5), (4, 6), + (5, 4), (5, 6), + (6, 4), (6, 5), (6, 6)} + # fmt: on + + # Moore neighborhood, torus True, top corner + grid = OrthogonalMooreGrid([10, 10], torus=True, capacity=None) + assert len(grid._cells[(0, 0)]._connections) == 8 + for connection in grid._cells[(0, 0)]._connections: + # fmt: off + assert connection.coordinate in {(9, 9), (9, 0), (9, 1), + (0, 9), (0, 1), + (1, 9), (1, 0), (1, 1)} + # fmt: on + + +def test_orthogonal_grid_moore_3d(): + width = 10 + height = 10 + depth = 10 + + # Moore neighborhood, torus false, top corner + grid = OrthogonalMooreGrid((width, height, depth), torus=False, capacity=None) + assert len(grid._cells[(0, 0, 0)]._connections) == 7 + for connection in grid._cells[(0, 0, 0)]._connections: + assert connection.coordinate in { + (0, 0, 1), + (0, 1, 0), + (0, 1, 1), + (1, 0, 0), + (1, 0, 1), + (1, 1, 0), + (1, 1, 1), + } + + # Moore neighborhood middle of grid + assert len(grid._cells[(5, 5, 5)]._connections) == 26 + for connection in grid._cells[(5, 5, 5)]._connections: + # fmt: off + assert connection.coordinate in {(4, 4, 4), (4, 4, 5), (4, 4, 6), (4, 5, 4), (4, 5, 5), (4, 5, 6), (4, 6, 4), (4, 6, 5), (4, 6, 6), + (5, 4, 4), (5, 4, 5), (5, 4, 6), (5, 5, 4), (5, 5, 6), (5, 6, 4), (5, 6, 5), (5, 6, 6), + (6, 4, 4), (6, 4, 5), (6, 4, 6), (6, 5, 4), (6, 5, 5), (6, 5, 6), (6, 6, 4), (6, 6, 5), (6, 6, 6)} + # fmt: on + + # Moore neighborhood, torus True, top corner + grid = OrthogonalMooreGrid((width, height, depth), torus=True, capacity=None) + assert len(grid._cells[(0, 0, 0)]._connections) == 26 + for connection in grid._cells[(0, 0, 0)]._connections: + # fmt: off + assert connection.coordinate in {(9, 9, 9), (9, 9, 0), (9, 9, 1), (9, 0, 9), (9, 0, 0), (9, 0, 1), (9, 1, 9), (9, 1, 0), (9, 1, 1), + (0, 9, 9), (0, 9, 0), (0, 9, 1), (0, 0, 9), (0, 0, 1), (0, 1, 9), (0, 1, 0), (0, 1, 1), + (1, 9, 9), (1, 9, 0), (1, 9, 1), (1, 0, 9), (1, 0, 0), (1, 0, 1), (1, 1, 9), (1, 1, 0), (1, 1, 1)} + # fmt: on + + +def test_orthogonal_grid_moore_4d(): + width = 10 + height = 10 + depth = 10 + time = 10 + + # Moore neighborhood, torus false, top corner + grid = OrthogonalMooreGrid((width, height, depth, time), torus=False, capacity=None) + assert len(grid._cells[(0, 0, 0, 0)]._connections) == 15 + for connection in grid._cells[(0, 0, 0, 0)]._connections: + assert connection.coordinate in { + (0, 0, 0, 1), + (0, 0, 1, 0), + (0, 0, 1, 1), + (0, 1, 0, 0), + (0, 1, 0, 1), + (0, 1, 1, 0), + (0, 1, 1, 1), + (1, 0, 0, 0), + (1, 0, 0, 1), + (1, 0, 1, 0), + (1, 0, 1, 1), + (1, 1, 0, 0), + (1, 1, 0, 1), + (1, 1, 1, 0), + (1, 1, 1, 1), + } + + # Moore neighborhood middle of grid + assert len(grid._cells[(5, 5, 5, 5)]._connections) == 80 + for connection in grid._cells[(5, 5, 5, 5)]._connections: + # fmt: off + assert connection.coordinate in {(4, 4, 4, 4), (4, 4, 4, 5), (4, 4, 4, 6), (4, 4, 5, 4), (4, 4, 5, 5), (4, 4, 5, 6), (4, 4, 6, 4), (4, 4, 6, 5), (4, 4, 6, 6), + (4, 5, 4, 4), (4, 5, 4, 5), (4, 5, 4, 6), (4, 5, 5, 4), (4, 5, 5, 5), (4, 5, 5, 6), (4, 5, 6, 4), (4, 5, 6, 5), (4, 5, 6, 6), + (4, 6, 4, 4), (4, 6, 4, 5), (4, 6, 4, 6), (4, 6, 5, 4), (4, 6, 5, 5), (4, 6, 5, 6), (4, 6, 6, 4), (4, 6, 6, 5), (4, 6, 6, 6), + (5, 4, 4, 4), (5, 4, 4, 5), (5, 4, 4, 6), (5, 4, 5, 4), (5, 4, 5, 5), (5, 4, 5, 6), (5, 4, 6, 4), (5, 4, 6, 5), (5, 4, 6, 6), + (5, 5, 4, 4), (5, 5, 4, 5), (5, 5, 4, 6), (5, 5, 5, 4), (5, 5, 5, 6), (5, 5, 6, 4), (5, 5, 6, 5), (5, 5, 6, 6), + (5, 6, 4, 4), (5, 6, 4, 5), (5, 6, 4, 6), (5, 6, 5, 4), (5, 6, 5, 5), (5, 6, 5, 6), (5, 6, 6, 4), (5, 6, 6, 5), (5, 6, 6, 6), + (6, 4, 4, 4), (6, 4, 4, 5), (6, 4, 4, 6), (6, 4, 5, 4), (6, 4, 5, 5), (6, 4, 5, 6), (6, 4, 6, 4), (6, 4, 6, 5), (6, 4, 6, 6), + (6, 5, 4, 4), (6, 5, 4, 5), (6, 5, 4, 6), (6, 5, 5, 4), (6, 5, 5, 5), (6, 5, 5, 6), (6, 5, 6, 4), (6, 5, 6, 5), (6, 5, 6, 6), + (6, 6, 4, 4), (6, 6, 4, 5), (6, 6, 4, 6), (6, 6, 5, 4), (6, 6, 5, 5), (6, 6, 5, 6), (6, 6, 6, 4), (6, 6, 6, 5), (6, 6, 6, 6)} + # fmt: on + + +def test_orthogonal_grid_moore_1d(): + width = 10 + + # Moore neighborhood, torus false, left edge + grid = OrthogonalMooreGrid((width,), torus=False, capacity=None) + assert len(grid._cells[(0,)]._connections) == 1 + for connection in grid._cells[(0,)]._connections: + assert connection.coordinate in {(1,)} + + # Moore neighborhood middle of grid + assert len(grid._cells[(5,)]._connections) == 2 + for connection in grid._cells[(5,)]._connections: + assert connection.coordinate in {(4,), (6,)} + + # Moore neighborhood, torus True, left edge + grid = OrthogonalMooreGrid((width,), torus=True, capacity=None) + assert len(grid._cells[(0,)]._connections) == 2 + for connection in grid._cells[(0,)]._connections: + assert connection.coordinate in {(1,), (9,)} + + +def test_cell_neighborhood(): + # orthogonal grid + + ## von Neumann + width = 10 + height = 10 + grid = OrthogonalVonNeumannGrid((width, height), torus=False, capacity=None) + for radius, n in zip(range(1, 4), [2, 5, 9]): + neighborhood = grid._cells[(0, 0)].neighborhood(radius=radius) + assert len(neighborhood) == n + + ## Moore + width = 10 + height = 10 + grid = OrthogonalMooreGrid((width, height), torus=False, capacity=None) + for radius, n in zip(range(1, 4), [3, 8, 15]): + neighborhood = grid._cells[(0, 0)].neighborhood(radius=radius) + assert len(neighborhood) == n + + with pytest.raises(ValueError): + grid._cells[(0, 0)].neighborhood(radius=0) + + # hexgrid + width = 10 + height = 10 + grid = HexGrid((width, height), torus=False, capacity=None) + for radius, n in zip(range(1, 4), [2, 6, 11]): + neighborhood = grid._cells[(0, 0)].neighborhood(radius=radius) + assert len(neighborhood) == n + + width = 10 + height = 10 + grid = HexGrid((width, height), torus=False, capacity=None) + for radius, n in zip(range(1, 4), [5, 10, 17]): + neighborhood = grid._cells[(1, 0)].neighborhood(radius=radius) + assert len(neighborhood) == n + + # networkgrid + + +def test_hexgrid(): + width = 10 + height = 10 + + grid = HexGrid((width, height), torus=False) + assert len(grid._cells) == width * height + + # first row + assert len(grid._cells[(0, 0)]._connections) == 2 + for connection in grid._cells[(0, 0)]._connections: + assert connection.coordinate in {(0, 1), (1, 0)} + + # second row + assert len(grid._cells[(1, 0)]._connections) == 5 + for connection in grid._cells[(1, 0)]._connections: + # fmt: off + assert connection.coordinate in {(0, 0), (0, 1), + (1, 1), + (2, 0), (2, 1)} + + # middle odd row + assert len(grid._cells[(5, 5)]._connections) == 6 + for connection in grid._cells[(5, 5)]._connections: + # fmt: off + assert connection.coordinate in {(4, 5), (4, 6), + (5, 4), (5, 6), + (6, 5), (6, 6)} + + # fmt: on + + # middle even row + assert len(grid._cells[(4, 4)]._connections) == 6 + for connection in grid._cells[(4, 4)]._connections: + # fmt: off + assert connection.coordinate in {(3, 3), (3, 4), + (4, 3), (4, 5), + (5, 3), (5, 4)} + + # fmt: on + + grid = HexGrid((width, height), torus=True) + assert len(grid._cells) == width * height + + # first row + assert len(grid._cells[(0, 0)]._connections) == 6 + for connection in grid._cells[(0, 0)]._connections: + # fmt: off + assert connection.coordinate in {(9, 9), (9, 0), + (0, 9), (0, 1), + (1, 9), (1, 0)} + + # fmt: on + + +def test_networkgrid(): + import networkx as nx + + n = 10 + m = 20 + seed = 42 + G = nx.gnm_random_graph(n, m, seed=seed) # noqa: N806 + grid = Network(G) + + assert len(grid._cells) == n + + for i, cell in grid._cells.items(): + for connection in cell._connections: + assert connection.coordinate in G.neighbors(i) + + +def test_empties_space(): + import networkx as nx + + n = 10 + m = 20 + seed = 42 + G = nx.gnm_random_graph(n, m, seed=seed) # noqa: N806 + grid = Network(G) + + assert len(grid.empties) == n + + model = Model() + for i in range(8): + grid._cells[i].add_agent(CellAgent(i, model)) + + cell = grid.select_random_empty_cell() + assert cell.coordinate in {8, 9} + + +def test_cell(): + cell1 = Cell((1,), capacity=None, random=random.Random()) + cell2 = Cell((2,), capacity=None, random=random.Random()) + + # connect + cell1.connect(cell2) + assert cell2 in cell1._connections + + # disconnect + cell1.disconnect(cell2) + assert cell2 not in cell1._connections + + # remove cell not in connections + with pytest.raises(ValueError): + cell1.disconnect(cell2) + + # add_agent + model = Model() + agent = CellAgent(1, model) + + cell1.add_agent(agent) + assert agent in cell1.agents + + # remove_agent + cell1.remove_agent(agent) + assert agent not in cell1.agents + + with pytest.raises(ValueError): + cell1.remove_agent(agent) + + cell1 = Cell((1,), capacity=1, random=random.Random()) + cell1.add_agent(CellAgent(1, model)) + assert cell1.is_full + + with pytest.raises(Exception): + cell1.add_agent(CellAgent(2, model)) + + +def test_cell_collection(): + cell1 = Cell((1,), capacity=None, random=random.Random()) + + collection = CellCollection({cell1: cell1.agents}, random=random.Random()) + assert len(collection) == 1 + assert cell1 in collection + + rng = random.Random() + n = 10 + collection = CellCollection([Cell((i,), random=rng) for i in range(n)], random=rng) + assert len(collection) == n + + cell = collection.select_random_cell() + assert cell in collection + + cells = collection.cells + assert len(cells) == n + + agents = collection.agents + assert len(list(agents)) == 0 + + cells = collection.cells + model = Model() + cells[0].add_agent(CellAgent(1, model)) + cells[3].add_agent(CellAgent(2, model)) + cells[7].add_agent(CellAgent(3, model)) + agents = collection.agents + assert len(list(agents)) == 3 + + agent = collection.select_random_agent() + assert agent in set(collection.agents) + + agents = collection[cells[0]] + assert agents == cells[0].agents