From 428eabd9db790286765c4117fc7fd9840003528d Mon Sep 17 00:00:00 2001 From: Hrishabh Tiwari <74908943+Hrishabh17@users.noreply.github.com> Date: Thu, 17 Oct 2024 12:41:17 +0530 Subject: [PATCH] refactor: remove batching from dep field import (#566) * refactor: remove batching from dep field import * fix: add missing code --- apps/sage_intacct/dependent_fields.py | 153 ++++-------------- apps/sage_intacct/utils.py | 9 +- .../test_sageintacct/test_dependent_fields.py | 10 +- 3 files changed, 41 insertions(+), 131 deletions(-) diff --git a/apps/sage_intacct/dependent_fields.py b/apps/sage_intacct/dependent_fields.py index cf087ccd..0889052b 100644 --- a/apps/sage_intacct/dependent_fields.py +++ b/apps/sage_intacct/dependent_fields.py @@ -1,6 +1,6 @@ import logging from datetime import datetime, timezone -from typing import Dict, List +from typing import Dict from time import sleep from django.contrib.postgres.fields import JSONField @@ -68,76 +68,13 @@ def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: Dep use_job_code_in_naming = 'PROJECT' in configuration.import_code_fields use_cost_code_in_naming = 'COST_CODE' in configuration.import_code_fields - BATCH_SIZE = 200 posted_cost_codes = set() total_batches = 0 processed_batches = 0 is_errored = False - cost_type_ids = [] - - cost_types = CostType.objects.filter(**filters).values_list('id', flat=True) - - for cost_type_id in cost_types.iterator(chunk_size=BATCH_SIZE): - cost_type_ids.append(cost_type_id) - - if len(cost_type_ids) >= BATCH_SIZE: - total_batches_processed, batches_processed, batch_errored = process_cost_code_batch( - platform, - cost_type_ids, - posted_cost_codes, - use_job_code_in_naming, - use_cost_code_in_naming, - dependent_field_setting, - is_enabled - ) - total_batches += total_batches_processed - processed_batches += batches_processed - is_errored = is_errored or batch_errored - cost_type_ids = [] - - if cost_type_ids: - total_batches_processed, batches_processed, batch_errored = process_cost_code_batch( - platform, - cost_type_ids, - posted_cost_codes, - use_job_code_in_naming, - use_cost_code_in_naming, - dependent_field_setting, - is_enabled - ) - total_batches += total_batches_processed - processed_batches += batches_processed - is_errored = is_errored or batch_errored - - if is_errored or import_log.status != 'IN_PROGRESS': - import_log.status = 'PARTIALLY_FAILED' - else: - import_log.status = 'COMPLETE' - import_log.error_log = [] - import_log.last_successful_run_at = last_successful_run_at - - import_log.total_batches_count = total_batches - import_log.processed_batches_count = processed_batches - import_log.save() - - return posted_cost_codes, is_errored - - -def process_cost_code_batch( - platform: PlatformConnector, - cost_type_ids: List[int], - posted_cost_codes: set[str], - use_job_code_in_naming: bool, - use_cost_code_in_naming: bool, - dependent_field_setting: DependentFieldSetting, - is_enabled: bool -): - total_batches = 0 - processed_batches = 0 - is_errored = False projects_batch = ( - CostType.objects.filter(workspace_id=dependent_field_setting.workspace_id, id__in=cost_type_ids) + CostType.objects.filter(**filters) .values('project_name', 'project_id') .annotate( cost_codes=JSONBAgg( @@ -164,6 +101,8 @@ def process_cost_code_batch( ).values_list('value', flat=True) ) + logger.info(f'Posting Cost Codes | WORKSPACE_ID: {dependent_field_setting.workspace_id} | Existing Projects in Fyle COUNT: {len(existing_projects_in_fyle)}') + for project in projects_batch: payload = [] cost_code_names = set() @@ -192,53 +131,6 @@ def process_cost_code_batch( is_errored = True logger.error(f'Exception while posting dependent cost code | Error: {exception} | Payload: {payload}') - return total_batches, processed_batches, is_errored - - -@handle_import_exceptions -def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict): - configuration = Configuration.objects.filter(workspace_id=dependent_field_setting.workspace_id).first() - - last_successful_run_at = datetime.now(timezone.utc) - use_cost_code_in_naming = 'COST_CODE' in configuration.import_code_fields - use_cost_type_code_in_naming = 'COST_TYPE' in configuration.import_code_fields - - BATCH_SIZE = 200 - total_batches = 0 - processed_batches = 0 - is_errored = False - cost_type_ids = [] - - cost_types = CostType.objects.filter(is_imported=False, **filters).values_list('id', flat=True) - - for cost_type_id in cost_types.iterator(chunk_size=BATCH_SIZE): - cost_type_ids.append(cost_type_id) - - if len(cost_type_ids) >= BATCH_SIZE: - total_batches_processed, batches_processed, batch_errored = process_cost_type_batch( - platform, - cost_type_ids, - use_cost_code_in_naming, - use_cost_type_code_in_naming, - dependent_field_setting - ) - total_batches += total_batches_processed - processed_batches += batches_processed - is_errored = is_errored or batch_errored - cost_type_ids = [] - - if cost_type_ids: - total_batches_processed, batches_processed, batch_errored = process_cost_type_batch( - platform, - cost_type_ids, - use_cost_code_in_naming, - use_cost_type_code_in_naming, - dependent_field_setting - ) - total_batches += total_batches_processed - processed_batches += batches_processed - is_errored = is_errored or batch_errored - if is_errored or import_log.status != 'IN_PROGRESS': import_log.status = 'PARTIALLY_FAILED' else: @@ -250,22 +142,23 @@ def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: Dep import_log.processed_batches_count = processed_batches import_log.save() - return is_errored + return posted_cost_codes, is_errored + + +@handle_import_exceptions +def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict): + configuration = Configuration.objects.filter(workspace_id=dependent_field_setting.workspace_id).first() + last_successful_run_at = datetime.now(timezone.utc) + use_cost_code_in_naming = 'COST_CODE' in configuration.import_code_fields + use_cost_type_code_in_naming = 'COST_TYPE' in configuration.import_code_fields -def process_cost_type_batch( - platform: PlatformConnector, - cost_type_ids: List[int], - use_cost_code_in_naming: bool, - use_cost_type_code_in_naming: bool, - dependent_field_setting: DependentFieldSetting -): total_batches = 0 processed_batches = 0 is_errored = False cost_types_batch = ( - CostType.objects.filter(id__in=cost_type_ids) + CostType.objects.filter(**filters) .values('task_name', 'task_id') .annotate( cost_types=JSONBAgg( @@ -280,6 +173,8 @@ def process_cost_type_batch( ) ) + logger.info(f'Posting Cost Types | WORKSPACE_ID: {dependent_field_setting.workspace_id} | Existing Cost Code in Fyle COUNT: {len(cost_types_batch)}') + for cost_types in cost_types_batch: payload = [] cost_code_name = prepend_code_to_name(use_cost_code_in_naming, cost_types['task_name'], cost_types['task_id']) @@ -305,7 +200,18 @@ def process_cost_type_batch( is_errored = True logger.error(f'Exception while posting dependent cost type | Error: {exception} | Payload: {payload}') - return total_batches, processed_batches, is_errored + if is_errored or import_log.status != 'IN_PROGRESS': + import_log.status = 'PARTIALLY_FAILED' + else: + import_log.status = 'COMPLETE' + import_log.error_log = [] + import_log.last_successful_run_at = last_successful_run_at + + import_log.total_batches_count = total_batches + import_log.processed_batches_count = processed_batches + import_log.save() + + return is_errored def post_dependent_expense_field_values(workspace_id: int, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector = None, cost_code_import_log: ImportLog = None, cost_type_import_log: ImportLog = None): @@ -318,9 +224,12 @@ def post_dependent_expense_field_values(workspace_id: int, dependent_field_setti if dependent_field_setting.last_successful_import_at: filters['updated_at__gte'] = dependent_field_setting.last_successful_import_at + else: + filters['is_imported'] = False posted_cost_types, is_cost_code_errored = post_dependent_cost_code(cost_code_import_log, dependent_field_setting, platform, filters) if posted_cost_types: + filters['is_imported'] = False filters['task_name__in'] = list(set(posted_cost_types)) if cost_code_import_log.status in ['FAILED', 'FATAL']: diff --git a/apps/sage_intacct/utils.py b/apps/sage_intacct/utils.py index eedd39a4..f58749a8 100644 --- a/apps/sage_intacct/utils.py +++ b/apps/sage_intacct/utils.py @@ -216,7 +216,8 @@ def sync_departments(self): 'display_name': 'department', 'value': department['TITLE'], 'destination_id': department['DEPARTMENTID'], - 'active': True + 'active': True, + 'code': department['DEPARTMENTID'] }) DestinationAttribute.bulk_create_or_update_destination_attributes( @@ -260,7 +261,8 @@ def sync_expense_types(self): 'detail': { 'gl_account_no': expense_type['GLACCOUNTNO'], 'gl_account_title': expense_type['GLACCOUNTTITLE'] - } + }, + 'code': expense_type['ACCOUNTLABEL'] }) DestinationAttribute.bulk_create_or_update_destination_attributes( @@ -381,7 +383,8 @@ def sync_projects(self): 'value': project['NAME'], 'destination_id': project['PROJECTID'], 'active': project['STATUS'] == 'active', - 'detail': detail + 'detail': detail, + 'code': project['PROJECTID'] }) DestinationAttribute.bulk_create_or_update_destination_attributes( diff --git a/tests/test_sageintacct/test_dependent_fields.py b/tests/test_sageintacct/test_dependent_fields.py index b5338759..289e6e06 100644 --- a/tests/test_sageintacct/test_dependent_fields.py +++ b/tests/test_sageintacct/test_dependent_fields.py @@ -70,11 +70,10 @@ def test_post_dependent_cost_type(mocker, db, create_cost_type, create_dependent cost_types = CostType.objects.filter(workspace_id=workspace_id, is_imported=True).update(is_imported=False) - mocker.patch('apps.sage_intacct.dependent_fields.process_cost_type_batch', side_effect=Exception('Something went wrong')) + mocker.patch('apps.sage_intacct.dependent_fields.post_dependent_cost_code', side_effect=Exception('Something went wrong')) post_dependent_cost_type(import_log, create_dependent_field_setting, platform, {'workspace_id': 1}) - assert import_log.status == 'FATAL' - assert import_log.error_log['message'] == 'Something went wrong' + assert import_log.status == 'PARTIALLY_FAILED' def test_post_dependent_cost_code(mocker, db, create_cost_type, create_dependent_field_setting): @@ -109,11 +108,10 @@ def test_post_dependent_cost_code(mocker, db, create_cost_type, create_dependent import_log.status = 'IN_PROGRESS' import_log.save() - mocker.patch('apps.sage_intacct.dependent_fields.process_cost_code_batch', side_effect=Exception('Something went wrong')) + mocker.patch('apps.sage_intacct.dependent_fields.sync_sage_intacct_attributes', side_effect=Exception('Something went wrong')) post_dependent_cost_code(import_log, create_dependent_field_setting, platform, {'workspace_id': 1}) - assert import_log.status == 'FATAL' - assert import_log.error_log['message'] == 'Something went wrong' + assert import_log.status == 'PARTIALLY_FAILED' def test_post_dependent_expense_field_values(db, mocker, create_cost_type, create_dependent_field_setting):