Skip to content

Commit

Permalink
refactor: remove batching from dep field import (#566)
Browse files Browse the repository at this point in the history
* refactor: remove batching from dep field import

* fix: add missing code
  • Loading branch information
Hrishabh17 authored Oct 17, 2024
1 parent 3c44418 commit 428eabd
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 131 deletions.
153 changes: 31 additions & 122 deletions apps/sage_intacct/dependent_fields.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime, timezone
from typing import Dict, List
from typing import Dict
from time import sleep

from django.contrib.postgres.fields import JSONField
Expand Down Expand Up @@ -68,76 +68,13 @@ def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: Dep
use_job_code_in_naming = 'PROJECT' in configuration.import_code_fields
use_cost_code_in_naming = 'COST_CODE' in configuration.import_code_fields

BATCH_SIZE = 200
posted_cost_codes = set()
total_batches = 0
processed_batches = 0
is_errored = False
cost_type_ids = []

cost_types = CostType.objects.filter(**filters).values_list('id', flat=True)

for cost_type_id in cost_types.iterator(chunk_size=BATCH_SIZE):
cost_type_ids.append(cost_type_id)

if len(cost_type_ids) >= BATCH_SIZE:
total_batches_processed, batches_processed, batch_errored = process_cost_code_batch(
platform,
cost_type_ids,
posted_cost_codes,
use_job_code_in_naming,
use_cost_code_in_naming,
dependent_field_setting,
is_enabled
)
total_batches += total_batches_processed
processed_batches += batches_processed
is_errored = is_errored or batch_errored
cost_type_ids = []

if cost_type_ids:
total_batches_processed, batches_processed, batch_errored = process_cost_code_batch(
platform,
cost_type_ids,
posted_cost_codes,
use_job_code_in_naming,
use_cost_code_in_naming,
dependent_field_setting,
is_enabled
)
total_batches += total_batches_processed
processed_batches += batches_processed
is_errored = is_errored or batch_errored

if is_errored or import_log.status != 'IN_PROGRESS':
import_log.status = 'PARTIALLY_FAILED'
else:
import_log.status = 'COMPLETE'
import_log.error_log = []
import_log.last_successful_run_at = last_successful_run_at

import_log.total_batches_count = total_batches
import_log.processed_batches_count = processed_batches
import_log.save()

return posted_cost_codes, is_errored


def process_cost_code_batch(
platform: PlatformConnector,
cost_type_ids: List[int],
posted_cost_codes: set[str],
use_job_code_in_naming: bool,
use_cost_code_in_naming: bool,
dependent_field_setting: DependentFieldSetting,
is_enabled: bool
):
total_batches = 0
processed_batches = 0
is_errored = False

projects_batch = (
CostType.objects.filter(workspace_id=dependent_field_setting.workspace_id, id__in=cost_type_ids)
CostType.objects.filter(**filters)
.values('project_name', 'project_id')
.annotate(
cost_codes=JSONBAgg(
Expand All @@ -164,6 +101,8 @@ def process_cost_code_batch(
).values_list('value', flat=True)
)

logger.info(f'Posting Cost Codes | WORKSPACE_ID: {dependent_field_setting.workspace_id} | Existing Projects in Fyle COUNT: {len(existing_projects_in_fyle)}')

for project in projects_batch:
payload = []
cost_code_names = set()
Expand Down Expand Up @@ -192,53 +131,6 @@ def process_cost_code_batch(
is_errored = True
logger.error(f'Exception while posting dependent cost code | Error: {exception} | Payload: {payload}')

return total_batches, processed_batches, is_errored


@handle_import_exceptions
def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict):
configuration = Configuration.objects.filter(workspace_id=dependent_field_setting.workspace_id).first()

last_successful_run_at = datetime.now(timezone.utc)
use_cost_code_in_naming = 'COST_CODE' in configuration.import_code_fields
use_cost_type_code_in_naming = 'COST_TYPE' in configuration.import_code_fields

BATCH_SIZE = 200
total_batches = 0
processed_batches = 0
is_errored = False
cost_type_ids = []

cost_types = CostType.objects.filter(is_imported=False, **filters).values_list('id', flat=True)

for cost_type_id in cost_types.iterator(chunk_size=BATCH_SIZE):
cost_type_ids.append(cost_type_id)

if len(cost_type_ids) >= BATCH_SIZE:
total_batches_processed, batches_processed, batch_errored = process_cost_type_batch(
platform,
cost_type_ids,
use_cost_code_in_naming,
use_cost_type_code_in_naming,
dependent_field_setting
)
total_batches += total_batches_processed
processed_batches += batches_processed
is_errored = is_errored or batch_errored
cost_type_ids = []

if cost_type_ids:
total_batches_processed, batches_processed, batch_errored = process_cost_type_batch(
platform,
cost_type_ids,
use_cost_code_in_naming,
use_cost_type_code_in_naming,
dependent_field_setting
)
total_batches += total_batches_processed
processed_batches += batches_processed
is_errored = is_errored or batch_errored

if is_errored or import_log.status != 'IN_PROGRESS':
import_log.status = 'PARTIALLY_FAILED'
else:
Expand All @@ -250,22 +142,23 @@ def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: Dep
import_log.processed_batches_count = processed_batches
import_log.save()

return is_errored
return posted_cost_codes, is_errored


@handle_import_exceptions
def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict):
configuration = Configuration.objects.filter(workspace_id=dependent_field_setting.workspace_id).first()

last_successful_run_at = datetime.now(timezone.utc)
use_cost_code_in_naming = 'COST_CODE' in configuration.import_code_fields
use_cost_type_code_in_naming = 'COST_TYPE' in configuration.import_code_fields

def process_cost_type_batch(
platform: PlatformConnector,
cost_type_ids: List[int],
use_cost_code_in_naming: bool,
use_cost_type_code_in_naming: bool,
dependent_field_setting: DependentFieldSetting
):
total_batches = 0
processed_batches = 0
is_errored = False

cost_types_batch = (
CostType.objects.filter(id__in=cost_type_ids)
CostType.objects.filter(**filters)
.values('task_name', 'task_id')
.annotate(
cost_types=JSONBAgg(
Expand All @@ -280,6 +173,8 @@ def process_cost_type_batch(
)
)

logger.info(f'Posting Cost Types | WORKSPACE_ID: {dependent_field_setting.workspace_id} | Existing Cost Code in Fyle COUNT: {len(cost_types_batch)}')

for cost_types in cost_types_batch:
payload = []
cost_code_name = prepend_code_to_name(use_cost_code_in_naming, cost_types['task_name'], cost_types['task_id'])
Expand All @@ -305,7 +200,18 @@ def process_cost_type_batch(
is_errored = True
logger.error(f'Exception while posting dependent cost type | Error: {exception} | Payload: {payload}')

return total_batches, processed_batches, is_errored
if is_errored or import_log.status != 'IN_PROGRESS':
import_log.status = 'PARTIALLY_FAILED'
else:
import_log.status = 'COMPLETE'
import_log.error_log = []
import_log.last_successful_run_at = last_successful_run_at

import_log.total_batches_count = total_batches
import_log.processed_batches_count = processed_batches
import_log.save()

return is_errored


def post_dependent_expense_field_values(workspace_id: int, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector = None, cost_code_import_log: ImportLog = None, cost_type_import_log: ImportLog = None):
Expand All @@ -318,9 +224,12 @@ def post_dependent_expense_field_values(workspace_id: int, dependent_field_setti

if dependent_field_setting.last_successful_import_at:
filters['updated_at__gte'] = dependent_field_setting.last_successful_import_at
else:
filters['is_imported'] = False

posted_cost_types, is_cost_code_errored = post_dependent_cost_code(cost_code_import_log, dependent_field_setting, platform, filters)
if posted_cost_types:
filters['is_imported'] = False
filters['task_name__in'] = list(set(posted_cost_types))

if cost_code_import_log.status in ['FAILED', 'FATAL']:
Expand Down
9 changes: 6 additions & 3 deletions apps/sage_intacct/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ def sync_departments(self):
'display_name': 'department',
'value': department['TITLE'],
'destination_id': department['DEPARTMENTID'],
'active': True
'active': True,
'code': department['DEPARTMENTID']
})

DestinationAttribute.bulk_create_or_update_destination_attributes(
Expand Down Expand Up @@ -260,7 +261,8 @@ def sync_expense_types(self):
'detail': {
'gl_account_no': expense_type['GLACCOUNTNO'],
'gl_account_title': expense_type['GLACCOUNTTITLE']
}
},
'code': expense_type['ACCOUNTLABEL']
})

DestinationAttribute.bulk_create_or_update_destination_attributes(
Expand Down Expand Up @@ -381,7 +383,8 @@ def sync_projects(self):
'value': project['NAME'],
'destination_id': project['PROJECTID'],
'active': project['STATUS'] == 'active',
'detail': detail
'detail': detail,
'code': project['PROJECTID']
})

DestinationAttribute.bulk_create_or_update_destination_attributes(
Expand Down
10 changes: 4 additions & 6 deletions tests/test_sageintacct/test_dependent_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ def test_post_dependent_cost_type(mocker, db, create_cost_type, create_dependent

cost_types = CostType.objects.filter(workspace_id=workspace_id, is_imported=True).update(is_imported=False)

mocker.patch('apps.sage_intacct.dependent_fields.process_cost_type_batch', side_effect=Exception('Something went wrong'))
mocker.patch('apps.sage_intacct.dependent_fields.post_dependent_cost_code', side_effect=Exception('Something went wrong'))
post_dependent_cost_type(import_log, create_dependent_field_setting, platform, {'workspace_id': 1})

assert import_log.status == 'FATAL'
assert import_log.error_log['message'] == 'Something went wrong'
assert import_log.status == 'PARTIALLY_FAILED'


def test_post_dependent_cost_code(mocker, db, create_cost_type, create_dependent_field_setting):
Expand Down Expand Up @@ -109,11 +108,10 @@ def test_post_dependent_cost_code(mocker, db, create_cost_type, create_dependent
import_log.status = 'IN_PROGRESS'
import_log.save()

mocker.patch('apps.sage_intacct.dependent_fields.process_cost_code_batch', side_effect=Exception('Something went wrong'))
mocker.patch('apps.sage_intacct.dependent_fields.sync_sage_intacct_attributes', side_effect=Exception('Something went wrong'))
post_dependent_cost_code(import_log, create_dependent_field_setting, platform, {'workspace_id': 1})

assert import_log.status == 'FATAL'
assert import_log.error_log['message'] == 'Something went wrong'
assert import_log.status == 'PARTIALLY_FAILED'


def test_post_dependent_expense_field_values(db, mocker, create_cost_type, create_dependent_field_setting):
Expand Down

0 comments on commit 428eabd

Please sign in to comment.