diff --git a/src/daq/store/csv.py b/src/daq/store/csv.py index f26d2fa..c4b9c93 100644 --- a/src/daq/store/csv.py +++ b/src/daq/store/csv.py @@ -1,10 +1,10 @@ import csv import os +from collections import deque from dataclasses import dataclass from datetime import datetime from io import TextIOWrapper from pathlib import Path -from queue import Empty, Queue from typing import Any, cast from daq.models import DAQJobConfig @@ -31,7 +31,7 @@ class DAQJobStoreCSVConfig(DAQJobConfig): class CSVFile: file: TextIOWrapper last_flush_date: datetime - write_queue: Queue[list[Any]] + write_queue: deque[list[Any]] class DAQJobStoreCSV(DAQJobStore): @@ -55,11 +55,11 @@ def handle_message(self, message: DAQJobMessageStore) -> bool: # Write headers if the file is new if new_file: - file.write_queue.put(message.keys) + file.write_queue.append(message.keys) # Append rows to write_queue for row in message.data: - file.write_queue.put(row) + file.write_queue.append(row) return True @@ -74,7 +74,7 @@ def _open_csv_file(self, file_path: str) -> tuple[CSVFile, bool]: Path(file_path).touch() # Open file - file = CSVFile(open(file_path, "a"), datetime.now(), Queue()) + file = CSVFile(open(file_path, "a"), datetime.now(), deque()) self._open_csv_files[file_path] = file else: file_exists = True @@ -104,8 +104,8 @@ def store_loop(self): rows_to_write.clear() for _ in range(DAQ_JOB_STORE_CSV_WRITE_BATCH_SIZE): try: - rows_to_write.append(file.write_queue.get_nowait()) - except Empty: + rows_to_write.append(file.write_queue.pop()) + except IndexError: break if len(rows_to_write) > 0: writer.writerows(rows_to_write) diff --git a/src/tests/test_csv.py b/src/tests/test_csv.py index 27f830f..6c5843b 100644 --- a/src/tests/test_csv.py +++ b/src/tests/test_csv.py @@ -1,6 +1,6 @@ import unittest +from collections import deque from datetime import datetime, timedelta -from queue import Queue from unittest.mock import MagicMock, mock_open, patch from daq.store.csv import ( @@ -38,7 +38,7 @@ def test_handle_message_new_file( mock_open.assert_called_once_with("test.csv", "a") self.assertIn("test.csv", self.store._open_csv_files) file = self.store._open_csv_files["test.csv"] - self.assertEqual(file.write_queue.qsize(), 3) # 1 header + 2 rows + self.assertEqual(len(file.write_queue), 3) # 1 header + 2 rows @patch("daq.store.csv.modify_file_path", return_value="test.csv") @patch("builtins.open", new_callable=mock_open) @@ -58,14 +58,14 @@ def test_handle_message_existing_file(self, mock_exists, mock_open, mock_add_dat mock_open.assert_called_once_with("test.csv", "a") self.assertIn("test.csv", self.store._open_csv_files) file = self.store._open_csv_files["test.csv"] - self.assertEqual(file.write_queue.qsize(), 2) # 2 rows only, no header + self.assertEqual(len(file.write_queue), 2) # 2 rows only, no header def test_flush(self): file = CSVFile( file=MagicMock(), last_flush_date=datetime.now() - timedelta(seconds=DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS + 1), - write_queue=Queue(), + write_queue=deque(), ) result = self.store._flush(file) self.assertTrue(result) @@ -80,10 +80,10 @@ def test_store_loop(self, mock_csv_writer): file=MagicMock(closed=False), last_flush_date=datetime.now() - timedelta(seconds=DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS + 1), - write_queue=Queue(), + write_queue=deque(), ) - file.write_queue.put(["row1_col1", "row1_col2"]) - file.write_queue.put(["row2_col1", "row2_col2"]) + file.write_queue.append(["row1_col1", "row1_col2"]) + file.write_queue.append(["row2_col1", "row2_col2"]) self.store._open_csv_files["test.csv"] = file mock_writer_instance = mock_csv_writer.return_value @@ -99,10 +99,10 @@ def test_store_loop_writerows(self, mock_csv_writer): file=MagicMock(closed=False), last_flush_date=datetime.now() - timedelta(seconds=DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS + 1), - write_queue=Queue(), + write_queue=deque(), ) - file.write_queue.put(["row1_col1", "row1_col2"]) - file.write_queue.put(["row2_col1", "row2_col2"]) + file.write_queue.append(["row1_col1", "row1_col2"]) + file.write_queue.append(["row2_col1", "row2_col2"]) self.store._open_csv_files["test.csv"] = file mock_writer_instance = mock_csv_writer.return_value @@ -110,7 +110,7 @@ def test_store_loop_writerows(self, mock_csv_writer): self.store.store_loop() mock_writer_instance.writerows.assert_called_with( - [["row1_col1", "row1_col2"], ["row2_col1", "row2_col2"]] + [["row2_col1", "row2_col2"], ["row1_col1", "row1_col2"]] ) self.assertTrue(file.file.flush.called) @@ -118,7 +118,7 @@ def test_del(self): file = CSVFile( file=MagicMock(closed=False), last_flush_date=datetime.now(), - write_queue=Queue(), + write_queue=deque(), ) self.store._open_csv_files["test.csv"] = file