diff --git a/docs/system-design/raw.drawio b/docs/system-design/raw.drawio index a048d94..0c53a79 100644 --- a/docs/system-design/raw.drawio +++ b/docs/system-design/raw.drawio @@ -1,4 +1,4 @@ - + @@ -39,7 +39,7 @@ - + @@ -76,7 +76,7 @@ - + @@ -129,7 +129,7 @@ - + @@ -140,16 +140,16 @@ - + - + - + - + diff --git a/src/pixel_battle/deployment/chunk_stream_compression_coordination_service/__init__.py b/src/pixel_battle/deployment/chunk_image_refresh_orchestrator/__init__.py similarity index 100% rename from src/pixel_battle/deployment/chunk_stream_compression_coordination_service/__init__.py rename to src/pixel_battle/deployment/chunk_image_refresh_orchestrator/__init__.py diff --git a/src/pixel_battle/deployment/chunk_image_refresh_orchestrator/__main__.py b/src/pixel_battle/deployment/chunk_image_refresh_orchestrator/__main__.py new file mode 100644 index 0000000..f35062a --- /dev/null +++ b/src/pixel_battle/deployment/chunk_image_refresh_orchestrator/__main__.py @@ -0,0 +1,21 @@ +import asyncio + +from pixel_battle.deployment.chunk_image_refresh_orchestrator.di import ( + container, +) +from pixel_battle.presentation.distributed_tasks.refresh_chunk_view import ( + RefreshChunkViewTask, +) + + +async def main() -> None: + task = await container.get(RefreshChunkViewTask) + + try: + await task.start_pushing() + finally: + await container.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pixel_battle/deployment/chunk_stream_compression_service/di.py b/src/pixel_battle/deployment/chunk_image_refresh_orchestrator/di.py similarity index 82% rename from src/pixel_battle/deployment/chunk_stream_compression_service/di.py rename to src/pixel_battle/deployment/chunk_image_refresh_orchestrator/di.py index 7fba146..0be12e4 100644 --- a/src/pixel_battle/deployment/chunk_stream_compression_service/di.py +++ b/src/pixel_battle/deployment/chunk_image_refresh_orchestrator/di.py @@ -8,7 +8,7 @@ ) -chunk_stream_compression_service_container = make_async_container( +container = make_async_container( OutOfProcessInfrastructureProvider(), OutOfProcessInfrastructureAdapterProvider(), InteractorProvider(), diff --git a/src/pixel_battle/deployment/chunk_stream_compression_service/__init__.py b/src/pixel_battle/deployment/chunk_image_refresh_script/__init__.py similarity index 100% rename from src/pixel_battle/deployment/chunk_stream_compression_service/__init__.py rename to src/pixel_battle/deployment/chunk_image_refresh_script/__init__.py diff --git a/src/pixel_battle/deployment/chunk_image_refresh_script/__main__.py b/src/pixel_battle/deployment/chunk_image_refresh_script/__main__.py new file mode 100644 index 0000000..445929c --- /dev/null +++ b/src/pixel_battle/deployment/chunk_image_refresh_script/__main__.py @@ -0,0 +1,19 @@ +import asyncio + +from pixel_battle.deployment.chunk_image_refresh_script.di import container +from pixel_battle.presentation.scripts.refresh_chunk_image import ( + RefreshChunkImageScript, +) + + +async def main() -> None: + script = await container.get(RefreshChunkImageScript) + + try: + await script() + finally: + await container.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pixel_battle/deployment/compression_script/di.py b/src/pixel_battle/deployment/chunk_image_refresh_script/di.py similarity index 84% rename from src/pixel_battle/deployment/compression_script/di.py rename to src/pixel_battle/deployment/chunk_image_refresh_script/di.py index 0495d3a..6e3699f 100644 --- a/src/pixel_battle/deployment/compression_script/di.py +++ b/src/pixel_battle/deployment/chunk_image_refresh_script/di.py @@ -8,7 +8,7 @@ ) -compression_script_container = make_async_container( +container = make_async_container( OutOfProcessInfrastructureProvider(), OutOfProcessInfrastructureAdapterProvider(), InteractorProvider(), diff --git a/src/pixel_battle/deployment/compression_script/__init__.py b/src/pixel_battle/deployment/chunk_image_refresh_worker/__init__.py similarity index 100% rename from src/pixel_battle/deployment/compression_script/__init__.py rename to src/pixel_battle/deployment/chunk_image_refresh_worker/__init__.py diff --git a/src/pixel_battle/deployment/chunk_image_refresh_worker/__main__.py b/src/pixel_battle/deployment/chunk_image_refresh_worker/__main__.py new file mode 100644 index 0000000..0756e58 --- /dev/null +++ b/src/pixel_battle/deployment/chunk_image_refresh_worker/__main__.py @@ -0,0 +1,19 @@ +import asyncio + +from pixel_battle.deployment.chunk_image_refresh_worker.di import container +from pixel_battle.presentation.distributed_tasks.refresh_chunk_view import ( + RefreshChunkViewTask, +) + + +async def main() -> None: + task = await container.get(RefreshChunkViewTask) + + try: + await task.start_pulling() + finally: + await container.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pixel_battle/deployment/chunk_stream_compression_coordination_service/di.py b/src/pixel_battle/deployment/chunk_image_refresh_worker/di.py similarity index 79% rename from src/pixel_battle/deployment/chunk_stream_compression_coordination_service/di.py rename to src/pixel_battle/deployment/chunk_image_refresh_worker/di.py index 4f96a16..0be12e4 100644 --- a/src/pixel_battle/deployment/chunk_stream_compression_coordination_service/di.py +++ b/src/pixel_battle/deployment/chunk_image_refresh_worker/di.py @@ -8,7 +8,7 @@ ) -chunk_stream_compression_coordination_service_container = make_async_container( +container = make_async_container( OutOfProcessInfrastructureProvider(), OutOfProcessInfrastructureAdapterProvider(), InteractorProvider(), diff --git a/src/pixel_battle/deployment/chunk_reading_service/asgi.py b/src/pixel_battle/deployment/chunk_reading_service/asgi.py index 51be6a1..a685a0d 100644 --- a/src/pixel_battle/deployment/chunk_reading_service/asgi.py +++ b/src/pixel_battle/deployment/chunk_reading_service/asgi.py @@ -1,12 +1,10 @@ -from pixel_battle.deployment.chunk_reading_service.di import ( - chunk_reading_service_container, -) +from pixel_battle.deployment.chunk_reading_service.di import container from pixel_battle.deployment.common.asgi import ASGIApp, LazyASGIApp from pixel_battle.presentation.web.app import app_from async def app_factory() -> ASGIApp: - return await app_from(chunk_reading_service_container) + return await app_from(container) app = LazyASGIApp(factory=app_factory) diff --git a/src/pixel_battle/deployment/chunk_reading_service/di.py b/src/pixel_battle/deployment/chunk_reading_service/di.py index 5481909..3926e9f 100644 --- a/src/pixel_battle/deployment/chunk_reading_service/di.py +++ b/src/pixel_battle/deployment/chunk_reading_service/di.py @@ -26,7 +26,7 @@ def provide_coroutines(self) -> AppCoroutines: return [] -chunk_reading_service_container = make_async_container( +container = make_async_container( OutOfProcessInfrastructureProvider(), OutOfProcessInfrastructureAdapterProvider(), InteractorProvider(), diff --git a/src/pixel_battle/deployment/chunk_stream_compression_coordination_service/__main__.py b/src/pixel_battle/deployment/chunk_stream_compression_coordination_service/__main__.py deleted file mode 100644 index 6270954..0000000 --- a/src/pixel_battle/deployment/chunk_stream_compression_coordination_service/__main__.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio - -from pixel_battle.deployment.chunk_stream_compression_coordination_service.di import ( # noqa: E501 - chunk_stream_compression_coordination_service_container, -) -from pixel_battle.presentation.distributed_tasks.update_chunk_view import ( - UpdateChunkViewTask, -) - - -async def main() -> None: - task = await chunk_stream_compression_coordination_service_container.get( - UpdateChunkViewTask - ) - - try: - await task.start_pushing() - finally: - await chunk_stream_compression_coordination_service_container.close() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/src/pixel_battle/deployment/chunk_stream_compression_service/__main__.py b/src/pixel_battle/deployment/chunk_stream_compression_service/__main__.py deleted file mode 100644 index ea03f05..0000000 --- a/src/pixel_battle/deployment/chunk_stream_compression_service/__main__.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio - -from pixel_battle.deployment.chunk_stream_compression_service.di import ( - chunk_stream_compression_service_container, -) -from pixel_battle.presentation.distributed_tasks.update_chunk_view import ( - UpdateChunkViewTask, -) - - -async def main() -> None: - task = await chunk_stream_compression_service_container.get( - UpdateChunkViewTask - ) - - try: - await task.start_pulling() - finally: - await chunk_stream_compression_service_container.close() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/src/pixel_battle/deployment/chunk_streaming_service/asgi.py b/src/pixel_battle/deployment/chunk_streaming_service/asgi.py index 9303126..7f66159 100644 --- a/src/pixel_battle/deployment/chunk_streaming_service/asgi.py +++ b/src/pixel_battle/deployment/chunk_streaming_service/asgi.py @@ -1,12 +1,10 @@ -from pixel_battle.deployment.chunk_streaming_service.di import ( - chunk_streaming_service_container, -) +from pixel_battle.deployment.chunk_streaming_service.di import container from pixel_battle.deployment.common.asgi import ASGIApp, LazyASGIApp from pixel_battle.presentation.web.app import app_from async def app_factory() -> ASGIApp: - return await app_from(chunk_streaming_service_container) + return await app_from(container) app = LazyASGIApp(factory=app_factory) diff --git a/src/pixel_battle/deployment/chunk_streaming_service/di.py b/src/pixel_battle/deployment/chunk_streaming_service/di.py index 8b4c1a8..3afc753 100644 --- a/src/pixel_battle/deployment/chunk_streaming_service/di.py +++ b/src/pixel_battle/deployment/chunk_streaming_service/di.py @@ -29,7 +29,7 @@ def provide_coroutines(self, streaming: Streaming) -> AppCoroutines: return [streaming.start()] -chunk_streaming_service_container = make_async_container( +container = make_async_container( OutOfProcessInfrastructureProvider(), OutOfProcessInfrastructureAdapterProvider(), InteractorProvider(), diff --git a/src/pixel_battle/deployment/chunk_writing_service/asgi.py b/src/pixel_battle/deployment/chunk_writing_service/asgi.py index bcf9fce..a94984b 100644 --- a/src/pixel_battle/deployment/chunk_writing_service/asgi.py +++ b/src/pixel_battle/deployment/chunk_writing_service/asgi.py @@ -1,12 +1,10 @@ -from pixel_battle.deployment.chunk_writing_service.di import ( - chunk_writing_service_container, -) +from pixel_battle.deployment.chunk_writing_service.di import container from pixel_battle.deployment.common.asgi import ASGIApp, LazyASGIApp from pixel_battle.presentation.web.app import app_from async def app_factory() -> ASGIApp: - return await app_from(chunk_writing_service_container) + return await app_from(container) app = LazyASGIApp(factory=app_factory) diff --git a/src/pixel_battle/deployment/chunk_writing_service/di.py b/src/pixel_battle/deployment/chunk_writing_service/di.py index 88de7da..9a65e15 100644 --- a/src/pixel_battle/deployment/chunk_writing_service/di.py +++ b/src/pixel_battle/deployment/chunk_writing_service/di.py @@ -27,7 +27,7 @@ def provide_coroutines(self) -> AppCoroutines: return [] -chunk_writing_service_container = make_async_container( +container = make_async_container( OutOfProcessInfrastructureProvider(), OutOfProcessInfrastructureAdapterProvider(), InteractorProvider(), diff --git a/src/pixel_battle/deployment/common/di.py b/src/pixel_battle/deployment/common/di.py index 39053ff..b14a8c6 100644 --- a/src/pixel_battle/deployment/common/di.py +++ b/src/pixel_battle/deployment/common/di.py @@ -1,4 +1,4 @@ -from typing import Any, AsyncIterator +from typing import AsyncIterator from dishka import Provider, Scope, alias, provide from redis.asyncio import RedisCluster @@ -6,8 +6,8 @@ from pixel_battle.application.interactors.recolor_pixel import ( RecolorPixel, ) -from pixel_battle.application.interactors.update_chunk_view import ( - UpdateChunkView, +from pixel_battle.application.interactors.refresh_chunk_view import ( + RefreshChunkView, ) from pixel_battle.application.interactors.view_chunk import ( ViewChunk, @@ -15,20 +15,14 @@ from pixel_battle.application.interactors.view_chunk_stream import ( ViewChunkStream, ) -from pixel_battle.application.ports.broker import Broker from pixel_battle.application.ports.chunk_view import ( ChunkView, DefaultChunkViewWhen, ) from pixel_battle.application.ports.chunk_views import ChunkViews from pixel_battle.application.ports.clock import Clock -from pixel_battle.application.ports.lock import Lock -from pixel_battle.application.ports.offsets import Offsets +from pixel_battle.application.ports.pixel_queue import PixelQueue from pixel_battle.application.ports.user_data_signing import UserDataSigning -from pixel_battle.infrastructure.adapters.broker import ( - InMemoryBroker, - RedisClusterStreamBroker, -) from pixel_battle.infrastructure.adapters.chunk_view import ( DefaultPNGImageChunkViewWhen, PNGImageChunkView, @@ -41,23 +35,19 @@ LocalClock, RedisClusterRandomNodeClock, ) -from pixel_battle.infrastructure.adapters.lock import ( - FakeLock, - InRedisClusterLock, -) -from pixel_battle.infrastructure.adapters.offsets import ( - InMemoryOffsets, - InRedisClusterRedisStreamOffsets, +from pixel_battle.infrastructure.adapters.pixel_queue import ( + InMemoryPixelQueue, + RedisClusterStreamPixelQueue, ) from pixel_battle.infrastructure.adapters.user_data_signing import ( UserDataSigningToHS256JWT, ) from pixel_battle.infrastructure.envs import Envs -from pixel_battle.presentation.distributed_tasks.update_chunk_view import ( - UpdateChunkViewTask, +from pixel_battle.presentation.distributed_tasks.refresh_chunk_view import ( + RefreshChunkViewTask, ) -from pixel_battle.presentation.scripts.update_chunk_view import ( - UpdateChunkViewScript, +from pixel_battle.presentation.scripts.refresh_chunk_image import ( + RefreshChunkImageScript, ) from pixel_battle.presentation.web.streaming import Streaming @@ -107,22 +97,10 @@ def provide_chunk_views( ) @provide - def provide_broker( + def provide_pixel_queue( self, canvas_redis_cluster: CanvasRedisCluster - ) -> Broker[Any]: - return RedisClusterStreamBroker(redis_cluster=canvas_redis_cluster) - - @provide - def provide_lock(self, canvas_redis_cluster: CanvasRedisCluster) -> Lock: - return InRedisClusterLock(redis_cluster=canvas_redis_cluster) - - @provide - def provide_offsets( - self, canvas_redis_cluster: CanvasRedisCluster - ) -> Offsets[Any]: - return InRedisClusterRedisStreamOffsets( - redis_cluster=canvas_redis_cluster - ) + ) -> PixelQueue: + return RedisClusterStreamPixelQueue(redis_cluster=canvas_redis_cluster) @provide def provide_clock(self, canvas_redis_cluster: CanvasRedisCluster) -> Clock: @@ -146,11 +124,9 @@ def provide_user_data_signing(self) -> UserDataSigning[str]: provide_chunk_views = provide( lambda _: InMemoryChunkViews(), provides=ChunkViews[PNGImageChunkView] ) - provide_broker = provide( - lambda _: InMemoryBroker(), provides=Broker[Any] + provide_pixel_queue = provide( + lambda _: InMemoryPixelQueue(), provides=PixelQueue ) - provide_offsets = provide(lambda _: InMemoryOffsets, provides=Offsets[Any]) - provide_lock = provide(FakeLock, provides=Lock) provide_clock = provide(LocalClock, provides=Clock) provide_default_png_image_chunk_view_when = provide( @@ -165,16 +141,16 @@ class InteractorProvider(Provider): provide_recolor_pixel = provide(RecolorPixel[str]) provide_view_chunk_stream = provide(ViewChunkStream) - provide_update_chunk_view = provide(UpdateChunkView[PNGImageChunkView, Any]) - provide_any_update_chunk_view = alias( - source=UpdateChunkView[PNGImageChunkView, Any], - provides=UpdateChunkView[ChunkView, Any], + provide_refresh_chunk_view = provide(RefreshChunkView[PNGImageChunkView]) + provide_any_refresh_chunk_view = alias( + source=RefreshChunkView[PNGImageChunkView], + provides=RefreshChunkView[ChunkView], ) - provide_view_chunk = provide(ViewChunk[PNGImageChunkView, Any]) + provide_view_chunk = provide(ViewChunk[PNGImageChunkView]) provide_any_view_chunk = alias( - source=ViewChunk[PNGImageChunkView, Any], - provides=ViewChunk[ChunkView, Any], + source=ViewChunk[PNGImageChunkView], + provides=ViewChunk[ChunkView], ) @@ -194,9 +170,9 @@ class ScriptProvider(Provider): @provide def provide_update_chunk_view_script( - self, update_chunk_view: UpdateChunkView[ChunkView, Any], - ) -> UpdateChunkViewScript: - return UpdateChunkViewScript(update_chunk_view=update_chunk_view) + self, refresh_chunk_view: RefreshChunkView[PNGImageChunkView], + ) -> RefreshChunkImageScript: + return RefreshChunkImageScript(refresh_chunk_view=refresh_chunk_view) class DistributedTaskProvider(Provider): @@ -206,9 +182,9 @@ class DistributedTaskProvider(Provider): def provide_update_chunk_view_task( self, canvas_metadata_redis_cluster: CanvasMetadataRedisCluster, - update_chunk_view: UpdateChunkView[PNGImageChunkView, Any], - ) -> UpdateChunkViewTask: - return UpdateChunkViewTask( - update_chunk_view=update_chunk_view, + refresh_chunk_view: RefreshChunkView[ChunkView], + ) -> RefreshChunkViewTask: + return RefreshChunkViewTask( + refresh_chunk_view=refresh_chunk_view, redis_cluster=canvas_metadata_redis_cluster, ) diff --git a/src/pixel_battle/deployment/compression_script/__main__.py b/src/pixel_battle/deployment/compression_script/__main__.py deleted file mode 100644 index c7e0c3b..0000000 --- a/src/pixel_battle/deployment/compression_script/__main__.py +++ /dev/null @@ -1,21 +0,0 @@ -import asyncio - -from pixel_battle.deployment.compression_script.di import ( - compression_script_container, -) -from pixel_battle.presentation.scripts.update_chunk_view import ( - UpdateChunkViewScript, -) - - -async def main() -> None: - script = await compression_script_container.get(UpdateChunkViewScript) - - try: - await script() - finally: - await compression_script_container.close() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/src/pixel_battle/deployment/god_service/asgi.py b/src/pixel_battle/deployment/god_service/asgi.py index 4081f91..9c3bf64 100644 --- a/src/pixel_battle/deployment/god_service/asgi.py +++ b/src/pixel_battle/deployment/god_service/asgi.py @@ -1,10 +1,10 @@ from pixel_battle.deployment.common.asgi import ASGIApp, LazyASGIApp -from pixel_battle.deployment.god_service.di import god_service_container +from pixel_battle.deployment.god_service.di import container from pixel_battle.presentation.web.app import app_from async def app_factory() -> ASGIApp: - return await app_from(god_service_container) + return await app_from(container) app = LazyASGIApp(factory=app_factory) diff --git a/src/pixel_battle/deployment/god_service/di.py b/src/pixel_battle/deployment/god_service/di.py index abe901d..ac2ab7a 100644 --- a/src/pixel_battle/deployment/god_service/di.py +++ b/src/pixel_battle/deployment/god_service/di.py @@ -7,8 +7,8 @@ OutOfProcessInfrastructureProvider, StreamingProvider, ) -from pixel_battle.presentation.distributed_tasks.update_chunk_view import ( - UpdateChunkViewTask, +from pixel_battle.presentation.distributed_tasks.refresh_chunk_view import ( + RefreshChunkViewTask, ) from pixel_battle.presentation.web.app import AppCoroutines, AppRouters from pixel_battle.presentation.web.routes.healthchek import ( @@ -42,16 +42,16 @@ def provide_routers(self) -> AppRouters: def provide_coroutines( self, streaming: Streaming, - update_chunk_view_task: UpdateChunkViewTask, + refresh_chunk_view_task: RefreshChunkViewTask, ) -> AppCoroutines: return [ - update_chunk_view_task.start_pulling(), - update_chunk_view_task.start_pushing(), + refresh_chunk_view_task.start_pulling(), + refresh_chunk_view_task.start_pushing(), streaming.start(), ] -god_service_container = make_async_container( +container = make_async_container( OutOfProcessInfrastructureProvider(), OutOfProcessInfrastructureAdapterProvider(), InteractorProvider(), diff --git a/src/pixel_battle/deployment/showcase_service/asgi.py b/src/pixel_battle/deployment/showcase_service/asgi.py index 51f88c5..c874165 100644 --- a/src/pixel_battle/deployment/showcase_service/asgi.py +++ b/src/pixel_battle/deployment/showcase_service/asgi.py @@ -1,12 +1,10 @@ from pixel_battle.deployment.common.asgi import ASGIApp, LazyASGIApp -from pixel_battle.deployment.showcase_service.di import ( - showcase_service_container, -) +from pixel_battle.deployment.showcase_service.di import container from pixel_battle.presentation.web.app import app_from async def app_factory() -> ASGIApp: - return await app_from(showcase_service_container) + return await app_from(container) app = LazyASGIApp(factory=app_factory) diff --git a/src/pixel_battle/deployment/showcase_service/di.py b/src/pixel_battle/deployment/showcase_service/di.py index 0ce5c0a..7b51ef7 100644 --- a/src/pixel_battle/deployment/showcase_service/di.py +++ b/src/pixel_battle/deployment/showcase_service/di.py @@ -37,7 +37,7 @@ def provide_coroutines(self) -> AppCoroutines: return [] -showcase_service_container = make_async_container( +container = make_async_container( ProcessInfrastructureAdapterProvider(), InteractorProvider(), ShowcaseServiceProvider(), diff --git a/src/pixel_battle/infrastructure/adapters/pixel_queue.py b/src/pixel_battle/infrastructure/adapters/pixel_queue.py index 57e5845..32aac85 100644 --- a/src/pixel_battle/infrastructure/adapters/pixel_queue.py +++ b/src/pixel_battle/infrastructure/adapters/pixel_queue.py @@ -41,7 +41,7 @@ def __init__( self, pixels: Iterable[Pixel[RGBColor]] = tuple(), *, - pulling_timeout_seconds: int | float, + pulling_timeout_seconds: int | float = 0, ) -> None: self.__pixels_by_chunk = defaultdict(list) self.__pulling_timeout_seconds = pulling_timeout_seconds @@ -250,6 +250,9 @@ async def __commit_offset( chunk: Chunk, results: RedisStreamResults, ) -> None: + if not results: + return + last_readed_event_offset = results[0][1][-1][0] if process is None: diff --git a/src/pixel_battle/presentation/distributed_tasks/update_chunk_view.py b/src/pixel_battle/presentation/distributed_tasks/refresh_chunk_view.py similarity index 68% rename from src/pixel_battle/presentation/distributed_tasks/update_chunk_view.py rename to src/pixel_battle/presentation/distributed_tasks/refresh_chunk_view.py index e3e4dbe..ff12908 100644 --- a/src/pixel_battle/presentation/distributed_tasks/update_chunk_view.py +++ b/src/pixel_battle/presentation/distributed_tasks/refresh_chunk_view.py @@ -2,24 +2,20 @@ from dataclasses import dataclass from functools import cached_property from itertools import product -from typing import ( - Any, - ClassVar, - NoReturn, -) +from typing import Any, ClassVar, NoReturn from redis.asyncio import RedisCluster -from pixel_battle.application.interactors.update_chunk_view import ( - UpdateChunkView, +from pixel_battle.application.interactors.refresh_chunk_view import ( + RefreshChunkView, ) -class UpdateChunkViewCommandError(Exception): ... +class RefreshChunkViewCommandError(Exception): ... @dataclass(kw_only=True, frozen=True, slots=True) -class UpdateChunkViewCommand: +class RefreshChunkViewCommand: chunk_number_x: int chunk_number_y: int @@ -28,26 +24,26 @@ def __post_init__(self) -> None: is_chunk_number_y_valid = self.chunk_number_y in range(10) if not is_chunk_number_x_valid or not is_chunk_number_y_valid: - raise UpdateChunkViewCommandError(str(self)) + raise RefreshChunkViewCommandError(str(self)) def to_bytes(self) -> bytes: return bytes([self.chunk_number_x * 10 + self.chunk_number_y]) @classmethod - def from_bytes(cls, bytes_: bytes) -> "UpdateChunkViewCommand": + def from_bytes(cls, bytes_: bytes) -> "RefreshChunkViewCommand": chunk_number_x = bytes_[0] // 10 chunk_number_y = bytes_[0] - chunk_number_x * 10 - return UpdateChunkViewCommand( + return RefreshChunkViewCommand( chunk_number_x=chunk_number_x, chunk_number_y=chunk_number_y ) @dataclass(kw_only=True, frozen=True) -class UpdateChunkViewTask: - update_chunk_view: UpdateChunkView[Any, Any] +class RefreshChunkViewTask: + refresh_chunk_view: RefreshChunkView[Any] redis_cluster: RedisCluster - __queue_key: ClassVar = b"update_chunk_view" + __queue_key: ClassVar = b"refresh_chunk_view" async def start_pushing(self) -> NoReturn: while True: @@ -59,26 +55,26 @@ async def start_pulling(self) -> NoReturn: command = await self.__pull_one_command() await self.__execute(command) - async def __pull_one_command(self) -> UpdateChunkViewCommand: + async def __pull_one_command(self) -> RefreshChunkViewCommand: result = await self.redis_cluster.bzmpop( # type: ignore[misc] 0, 1, [self.__queue_key], min=True ) command_bytes: bytes = result[1][0][0] - return UpdateChunkViewCommand.from_bytes(command_bytes) + return RefreshChunkViewCommand.from_bytes(command_bytes) async def __push_commands(self) -> None: await self.redis_cluster.zadd(self.__queue_key, self.__mapping_to_push) - async def __execute(self, command: UpdateChunkViewCommand) -> None: - await self.update_chunk_view( + async def __execute(self, command: RefreshChunkViewCommand) -> None: + await self.refresh_chunk_view( command.chunk_number_x, command.chunk_number_y ) @cached_property def __mapping_to_push(self) -> dict[bytes, int]: commands = ( - UpdateChunkViewCommand( + RefreshChunkViewCommand( chunk_number_x=chunk_number_x, chunk_number_y=chunk_number_y ) for chunk_number_x, chunk_number_y in product(range(10), repeat=2) diff --git a/src/pixel_battle/presentation/scripts/update_chunk_view.py b/src/pixel_battle/presentation/scripts/refresh_chunk_image.py similarity index 77% rename from src/pixel_battle/presentation/scripts/update_chunk_view.py rename to src/pixel_battle/presentation/scripts/refresh_chunk_image.py index c097436..6f52a35 100644 --- a/src/pixel_battle/presentation/scripts/update_chunk_view.py +++ b/src/pixel_battle/presentation/scripts/refresh_chunk_image.py @@ -2,19 +2,17 @@ from dataclasses import dataclass from datetime import datetime from io import TextIOBase -from typing import Any, cast +from typing import cast -from pixel_battle.application.interactors.update_chunk_view import ( - UpdateChunkView, -) -from pixel_battle.application.ports.chunk_view import ( - ChunkView, +from pixel_battle.application.interactors.refresh_chunk_view import ( + RefreshChunkView, ) +from pixel_battle.infrastructure.adapters.chunk_view import PNGImageChunkView @dataclass(kw_only=True, frozen=True, slots=True) -class UpdateChunkViewScript: - update_chunk_view: UpdateChunkView[ChunkView, Any] +class RefreshChunkImageScript: + refresh_chunk_view: RefreshChunkView[PNGImageChunkView] ok_file: TextIOBase = cast(TextIOBase, sys.stdout) error_file: TextIOBase = cast(TextIOBase, sys.stderr) @@ -32,14 +30,14 @@ async def __call__(self) -> None: start_time = datetime.now() - await self.update_chunk_view(chunk_number_x, chunk_number_y) + await self.refresh_chunk_view(chunk_number_x, chunk_number_y) end_time = datetime.now() time_delta = end_time - start_time delta_seconds = time_delta.total_seconds() - time_message = f"(Update lasted {delta_seconds} seconds)" - self.__print(f"The chunk view was updated. {time_message}") + time_message = f"(Lasted {delta_seconds} seconds)" + self.__print(f"The chunk image was refreshed. {time_message}") def __is_help(self) -> bool: has_flag = len(sys.argv) == 2 and sys.argv[1] == "--help" # noqa: PLR2004 diff --git a/src/pixel_battle/presentation/web/routes/view_chunk.py b/src/pixel_battle/presentation/web/routes/view_chunk.py index 8580aa7..a4297a6 100644 --- a/src/pixel_battle/presentation/web/routes/view_chunk.py +++ b/src/pixel_battle/presentation/web/routes/view_chunk.py @@ -1,5 +1,3 @@ -from typing import Any - from dishka.integrations.fastapi import FromDishka, inject from fastapi import APIRouter from fastapi.responses import Response @@ -16,7 +14,7 @@ @router.get("/canvas/chunk/{chunk_number_x}/{chunk_number_y}") @inject async def view_chunk( - view_chunk: FromDishka[ViewChunk[PNGImageChunkView, Any]], + view_chunk: FromDishka[ViewChunk[PNGImageChunkView]], chunk_number_x: ChunkNumberX, chunk_number_y: ChunkNumberY, ) -> Response: diff --git a/tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_update_chunk_view/__init__.py b/tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_refresh_chunk_view/__init__.py similarity index 100% rename from tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_update_chunk_view/__init__.py rename to tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_refresh_chunk_view/__init__.py diff --git a/tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_refresh_chunk_view/test_command.py b/tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_refresh_chunk_view/test_command.py new file mode 100644 index 0000000..91ea3b9 --- /dev/null +++ b/tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_refresh_chunk_view/test_command.py @@ -0,0 +1,23 @@ +from pytest import mark + +from pixel_battle.presentation.distributed_tasks.refresh_chunk_view import ( + RefreshChunkViewCommand, +) + + +@mark.parametrize( + "command", + [ + RefreshChunkViewCommand(chunk_number_x=0, chunk_number_y=0), + RefreshChunkViewCommand(chunk_number_x=9, chunk_number_y=9), + RefreshChunkViewCommand(chunk_number_x=0, chunk_number_y=9), + RefreshChunkViewCommand(chunk_number_x=9, chunk_number_y=0), + RefreshChunkViewCommand(chunk_number_x=1, chunk_number_y=9), + RefreshChunkViewCommand(chunk_number_x=9, chunk_number_y=1), + RefreshChunkViewCommand(chunk_number_x=5, chunk_number_y=5), + ], +) +def test_isomorphism(command: RefreshChunkViewCommand) -> None: + next_command = RefreshChunkViewCommand.from_bytes(command.to_bytes()) + + assert next_command == command diff --git a/tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_update_chunk_view/test_command.py b/tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_update_chunk_view/test_command.py deleted file mode 100644 index 8ce442d..0000000 --- a/tests/test_pixel_battle/test_presentation/test_distributed_tasks/test_update_chunk_view/test_command.py +++ /dev/null @@ -1,23 +0,0 @@ -from pytest import mark - -from pixel_battle.presentation.distributed_tasks.update_chunk_view import ( - UpdateChunkViewCommand, -) - - -@mark.parametrize( - "command", - [ - UpdateChunkViewCommand(chunk_number_x=0, chunk_number_y=0), - UpdateChunkViewCommand(chunk_number_x=9, chunk_number_y=9), - UpdateChunkViewCommand(chunk_number_x=0, chunk_number_y=9), - UpdateChunkViewCommand(chunk_number_x=9, chunk_number_y=0), - UpdateChunkViewCommand(chunk_number_x=1, chunk_number_y=9), - UpdateChunkViewCommand(chunk_number_x=9, chunk_number_y=1), - UpdateChunkViewCommand(chunk_number_x=5, chunk_number_y=5), - ], -) -def test_isomorphism(command: UpdateChunkViewCommand) -> None: - next_command = UpdateChunkViewCommand.from_bytes(command.to_bytes()) - - assert next_command == command diff --git a/tests/test_pixel_battle/test_presentation/test_scripts/test_update_chunk_view.py b/tests/test_pixel_battle/test_presentation/test_scripts/test_refresh_chunk_image.py similarity index 66% rename from tests/test_pixel_battle/test_presentation/test_scripts/test_update_chunk_view.py rename to tests/test_pixel_battle/test_presentation/test_scripts/test_refresh_chunk_image.py index a0f5087..82b3e4c 100644 --- a/tests/test_pixel_battle/test_presentation/test_scripts/test_update_chunk_view.py +++ b/tests/test_pixel_battle/test_presentation/test_scripts/test_refresh_chunk_image.py @@ -3,22 +3,20 @@ from pytest import fixture, mark -from pixel_battle.application.interactors.update_chunk_view import ( - UpdateChunkView, +from pixel_battle.application.interactors.refresh_chunk_view import ( + RefreshChunkView, ) -from pixel_battle.infrastructure.adapters.broker import InMemoryBroker from pixel_battle.infrastructure.adapters.chunk_view import ( DefaultCollectionChunkViewWhen, ) from pixel_battle.infrastructure.adapters.chunk_views import InMemoryChunkViews -from pixel_battle.infrastructure.adapters.lock import FakeLock -from pixel_battle.infrastructure.adapters.offsets import InMemoryOffsets -from pixel_battle.presentation.scripts.update_chunk_view import ( - UpdateChunkViewScript, +from pixel_battle.infrastructure.adapters.pixel_queue import InMemoryPixelQueue +from pixel_battle.presentation.scripts.refresh_chunk_image import ( + RefreshChunkImageScript, ) -type Script = UpdateChunkViewScript +type Script = RefreshChunkImageScript @fixture @@ -26,18 +24,16 @@ def script() -> Script: ok_file = StringIO() error_file = StringIO() - update_chunk_view = UpdateChunkView( - broker=InMemoryBroker(), - lock=FakeLock(), + refresh_chunk_view = RefreshChunkView( + pixel_queue=InMemoryPixelQueue(), default_chunk_view_when=DefaultCollectionChunkViewWhen(), chunk_views=InMemoryChunkViews(), - offsets_of_latest_compressed_events=InMemoryOffsets(), ) - return UpdateChunkViewScript( + return RefreshChunkImageScript( ok_file=ok_file, error_file=error_file, - update_chunk_view=update_chunk_view, + refresh_chunk_view=refresh_chunk_view, ) @@ -48,8 +44,8 @@ async def test_ok(script: Script) -> None: output = script.ok_file.getvalue().split("\n") - assert output[0].startswith("The chunk view was updated. (") - assert output[0].endswith(")") + assert output[0].startswith("The chunk image was refreshed. (Lasted ") + assert output[0].endswith(" seconds)") @mark.parametrize( diff --git a/tests/test_pixel_battle/test_presentation/test_web/conftest.py b/tests/test_pixel_battle/test_presentation/test_web/conftest.py index ab2746b..0d870b5 100644 --- a/tests/test_pixel_battle/test_presentation/test_web/conftest.py +++ b/tests/test_pixel_battle/test_presentation/test_web/conftest.py @@ -1,6 +1,6 @@ from datetime import UTC, datetime from functools import partial -from typing import Any, AsyncIterator +from typing import AsyncIterator from dishka import AsyncContainer, Provider, Scope, make_async_container from dishka.integrations.fastapi import setup_dishka @@ -18,15 +18,12 @@ from pixel_battle.application.interactors.view_chunk_stream import ( ViewChunkStream, ) -from pixel_battle.application.ports.broker import Broker from pixel_battle.application.ports.chunk_view import DefaultChunkViewWhen from pixel_battle.application.ports.chunk_views import ChunkViews from pixel_battle.application.ports.clock import Clock -from pixel_battle.application.ports.lock import Lock -from pixel_battle.application.ports.offsets import Offsets +from pixel_battle.application.ports.pixel_queue import PixelQueue from pixel_battle.application.ports.user_data_signing import UserDataSigning from pixel_battle.entities.space.time import Time -from pixel_battle.infrastructure.adapters.broker import InMemoryBroker from pixel_battle.infrastructure.adapters.chunk_view import ( DefaultPNGImageChunkViewWhen, PNGImageChunkView, @@ -35,12 +32,7 @@ InMemoryChunkViews, ) from pixel_battle.infrastructure.adapters.clock import StoppedClock -from pixel_battle.infrastructure.adapters.lock import ( - FakeLock, -) -from pixel_battle.infrastructure.adapters.offsets import ( - InMemoryOffsets, -) +from pixel_battle.infrastructure.adapters.pixel_queue import InMemoryPixelQueue from pixel_battle.infrastructure.adapters.user_data_signing import ( UserDataSigningToHS256JWT, ) @@ -75,15 +67,10 @@ def container() -> AsyncContainer: provider.provide( lambda: InMemoryChunkViews(), provides=ChunkViews[PNGImageChunkView] ) - provider.provide(lambda: InMemoryOffsets(), provides=Offsets[int]) - provider.provide(lambda: InMemoryBroker(), provides=Broker) + provider.provide(lambda: InMemoryPixelQueue(), provides=PixelQueue) provider.provide(StoppedClock, provides=Clock) - provider.provide(FakeLock, provides=Lock) provider.provide(RecolorPixel[str]) - provider.provide( - ViewChunk[PNGImageChunkView, int], - provides=ViewChunk[PNGImageChunkView, Any] - ) + provider.provide(ViewChunk[PNGImageChunkView]) provider.provide(ViewChunkStream) @partial(provider.provide, provides=Streaming) diff --git a/tests/test_pixel_battle/test_presentation/test_web/test_routes/test_view_chunk.py b/tests/test_pixel_battle/test_presentation/test_web/test_routes/test_view_chunk.py index a90f080..ca14b1f 100644 --- a/tests/test_pixel_battle/test_presentation/test_web/test_routes/test_view_chunk.py +++ b/tests/test_pixel_battle/test_presentation/test_web/test_routes/test_view_chunk.py @@ -6,7 +6,7 @@ from PIL.Image import open from pytest import mark -from pixel_battle.application.ports.broker import Broker +from pixel_battle.application.ports.pixel_queue import PixelQueue from pixel_battle.entities.core.pixel import Pixel from pixel_battle.entities.geometry.vector import Vector from pixel_battle.entities.space.color import black, red @@ -18,10 +18,10 @@ async def test_ok( client: AsyncClient, stage: str, container: AsyncContainer ) -> None: async with container() as request_container: - broker = await request_container.get(Broker[int]) + queue = await request_container.get(PixelQueue) - await broker.push_event_with(pixel=Pixel(position=Vector(x=5), color=black)) - await broker.push_event_with(pixel=Pixel(position=Vector(y=5), color=red)) + await queue.push(Pixel(position=Vector(x=5), color=black)) + await queue.push(Pixel(position=Vector(y=5), color=red)) response = await client.get("/canvas/chunk/0/0")