Resolve Errors Caused by Default Record Lookups in Extraction #3813

Closed · wants to merge 4 commits
8 changes: 5 additions & 3 deletions cumulusci/core/datasets.py
@@ -132,6 +132,7 @@ def temp_extract_mapping(
schema,
extraction_definition: T.Optional[Path],
opt_in_only: T.Sequence[str],
loading_rules: T.Sequence[SObjectRuleDeclaration] = (),
):
with TemporaryDirectory() as t:
t = Path(t)
@@ -149,9 +150,10 @@
with extract_mapping.open("w") as f:
yaml.safe_dump(
create_extract_mapping_file_from_declarations(
list(decls.values()), schema, opt_in_only
list(decls.values()), schema, opt_in_only, loading_rules
),
f,
sort_keys=False,
)
yield extract_mapping, decls
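
Because `loading_rules` defaults to an empty tuple, existing callers are unaffected. A minimal usage sketch (the `dataset` variable, `rules`, and the file path are illustrative, not taken from this diff):

from pathlib import Path

# Sketch only: `dataset` stands in for a Dataset instance; `rules` for a
# previously parsed sequence of SObjectRuleDeclaration objects.
with dataset.temp_extract_mapping(
    dataset.schema,
    extraction_definition=Path("datasets/extract.yml"),  # illustrative path
    opt_in_only=(),
    loading_rules=rules,
) as (extract_mapping, decls):
    print(extract_mapping)  # temporary YAML mapping file handed to ExtractData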

@@ -165,8 +167,9 @@ def extract(
):
options = options or {}
logger = logger or DEFAULT_LOGGER
loading_rules = self._parse_loading_rules_file(loading_rules_file)
with self.temp_extract_mapping(
self.schema, extraction_definition, opt_in_only
self.schema, extraction_definition, opt_in_only, loading_rules
) as (
extract_mapping,
decls,
@@ -179,7 +182,6 @@
mapping=str(extract_mapping),
)
task()
loading_rules = self._parse_loading_rules_file(loading_rules_file)

self._save_load_mapping(list(decls.values()), opt_in_only, loading_rules)
return task.return_values
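
The second change in this file is ordering: `_parse_loading_rules_file` used to run after `task()`, too late to influence the generated extract mapping. Condensed, the new flow reads as follows (a sketch assembled from the hunks above; `_make_extract_task` is a hypothetical stand-in for the elided task construction):

def extract(self, options=None, logger=None, opt_in_only=(),
            extraction_definition=None, loading_rules_file=None):
    # Loading rules are parsed *before* the temp mapping is generated,
    # so the extract mapping can honor load-order declarations.
    loading_rules = self._parse_loading_rules_file(loading_rules_file)
    with self.temp_extract_mapping(
        self.schema, extraction_definition, opt_in_only, loading_rules
    ) as (extract_mapping, decls):
        task = self._make_extract_task(mapping=str(extract_mapping))  # hypothetical helper
        task()
    self._save_load_mapping(list(decls.values()), opt_in_only, loading_rules)
    return task.return_values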
@@ -36,7 +36,7 @@ interactions:
\n } ]\n}"
- request:
method: GET
uri: https://orgname.my.salesforce.com/services/data/vxx.0/query/?q=SELECT+Id%2C+AccountNumber%2C+AccountSource%2C+AnnualRevenue%2C+BillingCity%2C+BillingCountry%2C+BillingGeocodeAccuracy%2C+BillingLatitude%2C+BillingLongitude%2C+BillingPostalCode%2C+BillingState%2C+BillingStreet%2C+CleanStatus%2C+DunsNumber%2C+Fax%2C+Industry%2C+Jigsaw%2C+NaicsCode%2C+NaicsDesc%2C+Name%2C+NumberOfEmployees%2C+Ownership%2C+Phone%2C+Rating%2C+RecordTypeId%2C+ShippingCity%2C+ShippingCountry%2C+ShippingGeocodeAccuracy%2C+ShippingLatitude%2C+ShippingLongitude%2C+ShippingPostalCode%2C+ShippingState%2C+ShippingStreet%2C+Sic%2C+SicDesc%2C+Site%2C+TickerSymbol%2C+Tradestyle%2C+Type%2C+Website%2C+YearStarted%2C+ParentId+FROM+Account+WHERE+Name+%21%3D+%27Sample+Account+for+Entitlements%27
uri: https://orgname.my.salesforce.com/services/data/vxx.0/query/?q=SELECT+Id%2C+AccountNumber%2C+AccountSource%2C+AnnualRevenue%2C+BillingCity%2C+BillingCountry%2C+BillingGeocodeAccuracy%2C+BillingLatitude%2C+BillingLongitude%2C+BillingPostalCode%2C+BillingState%2C+BillingStreet%2C+CleanStatus%2C+DunsNumber%2C+Fax%2C+Industry%2C+Jigsaw%2C+NaicsCode%2C+NaicsDesc%2C+Name%2C+NumberOfEmployees%2C+Ownership%2C+Phone%2C+Rating%2C+ShippingCity%2C+ShippingCountry%2C+ShippingGeocodeAccuracy%2C+ShippingLatitude%2C+ShippingLongitude%2C+ShippingPostalCode%2C+ShippingState%2C+ShippingStreet%2C+Sic%2C+SicDesc%2C+Site%2C+TickerSymbol%2C+Tradestyle%2C+Type%2C+Website%2C+YearStarted%2C+RecordTypeId%2C+ParentId+FROM+Account+WHERE+Name+%21%3D+%27Sample+Account+for+Entitlements%27
body: null
headers: *id001
response:
@@ -29,7 +29,7 @@ interactions:
\n } ]\n}"
- request:
method: GET
uri: https://orgname.my.salesforce.com/services/data/vxx.0/query/?q=SELECT+Id%2C+AccountNumber%2C+AccountSource%2C+AnnualRevenue%2C+BillingCity%2C+BillingCountry%2C+BillingGeocodeAccuracy%2C+BillingLatitude%2C+BillingLongitude%2C+BillingPostalCode%2C+BillingState%2C+BillingStreet%2C+CleanStatus%2C+Description%2C+DunsNumber%2C+Fax%2C+Industry%2C+Jigsaw%2C+NaicsCode%2C+NaicsDesc%2C+Name%2C+NumberOfEmployees%2C+Ownership%2C+Phone%2C+Rating%2C+RecordTypeId%2C+ShippingCity%2C+ShippingCountry%2C+ShippingGeocodeAccuracy%2C+ShippingLatitude%2C+ShippingLongitude%2C+ShippingPostalCode%2C+ShippingState%2C+ShippingStreet%2C+Sic%2C+SicDesc%2C+Site%2C+TickerSymbol%2C+Tradestyle%2C+Type%2C+Website%2C+YearStarted%2C+ParentId+FROM+Account+WHERE+Name+%21%3D+%27Sample+Account+for+Entitlements%27
uri: https://orgname.my.salesforce.com/services/data/vxx.0/query/?q=SELECT+Id%2C+AccountNumber%2C+AccountSource%2C+AnnualRevenue%2C+BillingCity%2C+BillingCountry%2C+BillingGeocodeAccuracy%2C+BillingLatitude%2C+BillingLongitude%2C+BillingPostalCode%2C+BillingState%2C+BillingStreet%2C+CleanStatus%2C+Description%2C+DunsNumber%2C+Fax%2C+Industry%2C+Jigsaw%2C+NaicsCode%2C+NaicsDesc%2C+Name%2C+NumberOfEmployees%2C+Ownership%2C+Phone%2C+Rating%2C+ShippingCity%2C+ShippingCountry%2C+ShippingGeocodeAccuracy%2C+ShippingLatitude%2C+ShippingLongitude%2C+ShippingPostalCode%2C+ShippingState%2C+ShippingStreet%2C+Sic%2C+SicDesc%2C+Site%2C+TickerSymbol%2C+Tradestyle%2C+Type%2C+Website%2C+YearStarted%2C+RecordTypeId%2C+ParentId+FROM+Account+WHERE+Name+%21%3D+%27Sample+Account+for+Entitlements%27
body: null
headers: *id002
response:
70 changes: 54 additions & 16 deletions cumulusci/tasks/bulkdata/extract.py
@@ -7,7 +7,6 @@

from cumulusci.core.exceptions import (
BulkDataException,
ConfigError,
CumulusCIException,
TaskOptionsError,
)
@@ -76,6 +75,7 @@ def _init_options(self, kwargs):
self.options.get("drop_missing_schema") or False
)
self._id_generators = {}
self.has_logged_missing_lookups_warning = False

def _run_task(self):
self._init_mapping()
@@ -336,42 +336,80 @@ def throw(string): # pragma: no cover
key_attr = getattr(model, key_field, None) or throw(
f"key_field {key_field} not found in table {mapping.table}"
)

# Keep track of total mapping operations
total_mapping_operations = 0

valid_sf_ids = set() # Initialize an empty set to store valid SF IDs
valid_local_ids = set() # Initialize an empty set to store local IDs
for lookup_mapping in lookup_mappings:
lookup_model = self.models.get(lookup_mapping.get_sf_id_table())
# Get the sf ids corresponding to the lookup
sf_ids_tuples = (
self.session.query(lookup_model.sf_id)
.filter(lookup_model.sf_id.isnot(None))
.distinct()
) # Get unique SF IDs
valid_sf_ids.update(sf_id[0] for sf_id in sf_ids_tuples)
# Get the local ids corresponding to Salesforce Ids
local_ids_tuples = (
self.session.query(lookup_model.id)
.filter(lookup_model.sf_id.in_(valid_sf_ids))
.distinct()
) # Get unique local IDs
valid_local_ids.update(local_id[0] for local_id in local_ids_tuples)
try:
update_query = (
(
self.session.query(model)
.filter(key_attr.isnot(None), key_attr == lookup_model.sf_id)
.update({key_attr: lookup_model.id}, synchronize_session=False)
)
total_mapping_operations += update_query.rowcount
except NotImplementedError:
# Some databases such as sqlite don't support multitable update
mappings = []
for row, lookup_id in self.session.query(
model, lookup_model.id
).join(lookup_model, key_attr == lookup_model.sf_id):
mappings.append({"id": row.id, key_field: lookup_id})
total_mapping_operations += len(mappings)
self.session.bulk_update_mappings(model, mappings)
# Count the total number of rows excluding those with no entry for that field
total_rows = (
# Get the rows which will be deleted
rows_to_delete = (
self.session.query(model)
.filter(
key_attr.isnot(None), # Ensure key_attr is not None
key_attr.isnot(""), # Ensure key_attr is not an empty string
key_attr.isnot(None),
key_attr.isnot(""),
~key_attr.in_(valid_sf_ids),
~key_attr.in_(valid_local_ids),
)
.count()
.all()
)

if total_mapping_operations != total_rows:
raise ConfigError(
f"Total mapping operations ({total_mapping_operations}) do not match total non-empty rows ({total_rows}) for lookup_key: {lookup_key}. Mention all related tables for lookup: {lookup_key}"
# Log a warning before deleting rows, but only once
if rows_to_delete and not self.has_logged_missing_lookups_warning:
self.logger.warning(
"Some rows were skipped due to referencing default records or missing lookup tables. "
"Ensure data completeness and include all relevant tables for extraction."
)
self.has_logged_missing_lookups_warning = (
True # Set the flag so the warning isn't logged again
)

# Log details of rows to be deleted
if rows_to_delete:
self.logger.warning(
f"{len(rows_to_delete)} rows from '{mapping.table}' where lookup '{lookup_key}' was not found were not extracted."
)
# Delete the rows from get_sf_id_table()
delete_ids = [row.id for row in rows_to_delete]
sf_id_table_model = self.models.get(mapping.get_sf_id_table())
self.session.query(sf_id_table_model).filter(
sf_id_table_model.id.in_(delete_ids)
).delete(synchronize_session=False)

# Delete the rows
(
self.session.query(model)
.filter(
model.id.in_([row.id for row in rows_to_delete])
) # Filter by ID
.delete(synchronize_session=False)
)
self.session.commit()

def _create_tables(self):
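
The net effect of this hunk: rather than raising `ConfigError` when mapping-operation counts disagree with row counts, the task gathers every Salesforce Id and local id known to the referenced lookup tables, warns once, and deletes rows whose lookup value matches neither set. A self-contained sketch of that filtering rule using toy SQLAlchemy models (the table and column names are stand-ins, not CumulusCI's generated models):

from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class AccountSfIds(Base):
    # Stand-in for a lookup table's "sf id table" (local id -> Salesforce Id)
    __tablename__ = "account_sf_ids"
    id = Column(String, primary_key=True)
    sf_id = Column(String)

class Contact(Base):
    # Stand-in for an extracted table carrying an AccountId lookup
    __tablename__ = "contacts"
    id = Column(String, primary_key=True)
    AccountId = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        AccountSfIds(id="1", sf_id="001A000001"),
        Contact(id="c1", AccountId="001A000001"),  # resolvable: kept
        Contact(id="c2", AccountId="001B000002"),  # unknown lookup: deleted
    ])
    valid_sf_ids = {row[0] for row in session.query(AccountSfIds.sf_id)}
    valid_local_ids = {row[0] for row in session.query(AccountSfIds.id)}
    key_attr = Contact.AccountId
    rows_to_delete = (
        session.query(Contact)
        .filter(
            key_attr.isnot(None),
            key_attr.isnot(""),
            ~key_attr.in_(list(valid_sf_ids)),
            ~key_attr.in_(list(valid_local_ids)),
        )
        .all()
    )
    assert [row.id for row in rows_to_delete] == ["c2"]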
@@ -10,7 +10,7 @@


class SObjDependency(T.NamedTuple):
table_name_from: str
table_name_from: T.Union[str, None]
table_names_to: T.Union[str, T.Tuple[str, ...]]
field_name: str
priority: bool = False
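
Widening `table_name_from` to `T.Union[str, None]` lets a dependency edge be recorded before its source table is known. Illustrative values only:

dep = SObjDependency("Contact", ("Account",), "AccountId")
pending = SObjDependency(None, ("Account",), "AccountId")  # source table not yet known
assert dep.priority is False  # default applies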
@@ -1,45 +1,52 @@
# pyright: strict

import typing as T
from itertools import chain

from snowfakery.cci_mapping_files.declaration_parser import SObjectRuleDeclaration

from cumulusci.salesforce_api.org_schema import Schema
from cumulusci.tasks.bulkdata.generate_mapping_utils.generate_mapping_from_declarations import (
SimplifiedExtractDeclarationWithLookups,
classify_and_filter_lookups,
discover_dependendencies,
)
from cumulusci.tasks.bulkdata.mapping_parser import MappingStep

from ..extract_dataset_utils.synthesize_extract_declarations import (
ExtractDeclaration,
flatten_declarations,
)


def _mapping_decl_for_extract_decl(
decl: SimplifiedExtractDeclarationWithLookups,
):
"""Make a CCI extract mapping step from a SimplifiedExtractDeclarationWithLookups"""
lookups = {lookup: {"table": tables} for lookup, tables in decl.lookups.items()}
mapping_dict: dict[str, T.Any] = {
"sf_object": decl.sf_object,
}
if decl.where:
mapping_dict["soql_filter"] = decl.where
if decl.api:
mapping_dict["api"] = decl.api.value
mapping_dict["fields"] = decl.fields
if lookups:
mapping_dict["lookups"] = lookups

return (f"Extract {decl.sf_object}", mapping_dict)
from .load_mapping_file_generator import generate_load_mapping_file


def create_extract_mapping_file_from_declarations(
decls: T.List[ExtractDeclaration], schema: Schema, opt_in_only: T.Sequence[str]
decls: T.List[ExtractDeclaration],
schema: Schema,
opt_in_only: T.Sequence[str],
loading_rules: T.Sequence[SObjectRuleDeclaration] = (),
):
"""Create a mapping file sufficient for driving an extract process
from an extract declarations file."""
assert decls is not None
simplified_decls = flatten_declarations(decls, schema, opt_in_only)
simplified_decls = classify_and_filter_lookups(simplified_decls, schema)
mappings = [_mapping_decl_for_extract_decl(decl) for decl in simplified_decls]
return dict(pair for pair in mappings if pair)
simplified_decls_w_lookups = classify_and_filter_lookups(simplified_decls, schema)
intertable_dependencies = discover_dependendencies(simplified_decls_w_lookups)

def _mapping_step(decl: SimplifiedExtractDeclarationWithLookups):
fields = tuple(chain(decl.fields, decl.lookups.keys()))
return MappingStep(
sf_object=decl.sf_object,
fields=dict(zip(fields, fields)),
soql_filter=decl.where if decl.where else None,
api=decl.api
# lookups=lookups, # lookups can be re-created later, for simplicity
)

mapping_steps = [_mapping_step(decl) for decl in simplified_decls_w_lookups]

# To generate mapping file with the correct order
mappings = generate_load_mapping_file(
mapping_steps, intertable_dependencies, loading_rules
)
return mappings
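
As the updated tests later in this PR show, the returned mapping is now keyed `Insert <SObject>` and ordered so that lookup targets precede their referrers. Sketched for an Account/Contact pair (`decls` and `schema` are assumed to be prepared as in those tests):

mapping = create_extract_mapping_file_from_declarations(
    decls, schema, opt_in_only=(), loading_rules=()
)
assert list(mapping) == ["Insert Account", "Insert Contact"]  # Account loads first
assert mapping["Insert Contact"]["lookups"] == {
    "AccountId": {"table": ["Account"], "key_field": "AccountId"}
}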
@@ -30,7 +30,7 @@ def create_load_mapping_file_from_extract_declarations(
"""Create a mapping file from Extract declarations"""
simplified_decls = flatten_declarations(decls, schema, opt_in_only) # FIXME
simplified_decls_w_lookups = classify_and_filter_lookups(simplified_decls, schema)
intertable_dependencies = _discover_dependendencies(simplified_decls_w_lookups)
intertable_dependencies = discover_dependendencies(simplified_decls_w_lookups)

def _mapping_step(decl):
fields = tuple(chain(decl.fields, decl.lookups.keys()))
@@ -48,7 +48,7 @@ def _mapping_step(decl):
return mappings


def _discover_dependendencies(simplified_decls: T.Sequence):
def discover_dependendencies(
    simplified_decls: T.Sequence[SimplifiedExtractDeclarationWithLookups],
) -> OrderedSet:
"""Look at all of the lookups in a set of declarations to determine
what depends on what"""
intertable_dependencies = OrderedSet()
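
Dropping the leading underscore makes `discover_dependendencies` (the doubled "den" is the identifier's existing spelling) importable by the extract-mapping generator above. A hypothetical call, assuming `decl` is a `SimplifiedExtractDeclarationWithLookups` with `sf_object == "Contact"` and `lookups == {"AccountId": ["Account"]}`:

deps = discover_dependendencies([decl])
# One SObjDependency per lookup edge; the exact tuple shape may differ:
# OrderedSet([SObjDependency("Contact", ["Account"], "AccountId")])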
@@ -30,9 +30,9 @@ def test_simple_generate_mapping_from_declarations(self, org_config):
) as schema:
mf = create_extract_mapping_file_from_declarations(declarations, schema, ())
assert mf == {
"Extract Account": {
"api": "smart",
"Insert Account": {
"sf_object": "Account",
"table": "Account",
"fields": ["Name", "Description"],
"soql_filter": "Name != 'Sample Account for " "Entitlements'",
}
@@ -57,19 +57,20 @@ def test_generate_mapping_from_declarations__lookups(self, org_config):
include_counts=True,
) as schema:
mf = create_extract_mapping_file_from_declarations(declarations, schema, ())
print(mf)
assert mf == {
"Extract Account": {
"api": "smart",
"Insert Account": {
"sf_object": "Account",
"table": "Account",
"soql_filter": "Name != 'Sample Account for Entitlements'",
"fields": ["Name", "Description"],
},
"Extract Contact": {
"api": "smart",
"Insert Contact": {
"sf_object": "Contact",
"table": "Contact",
"fields": ["LastName"],
"lookups": {"AccountId": {"table": ("Account",)}},
"lookups": {
"AccountId": {"table": ["Account"], "key_field": "AccountId"}
},
},
}

@@ -99,33 +100,36 @@ def test_generate_mapping_from_declarations__polymorphic_lookups(self, org_config):
mf = create_extract_mapping_file_from_declarations(declarations, schema, ())
print(mf)
assert mf == {
"Extract Account": {
"api": "smart",
"Insert Account": {
"sf_object": "Account",
"table": "Account",
"soql_filter": "Name != 'Sample Account for Entitlements'",
"fields": ["Name", "Description"],
},
"Extract Contact": {
"api": "smart",
"Insert Contact": {
"sf_object": "Contact",
"table": "Contact",
"fields": ["LastName"],
"lookups": {"AccountId": {"table": ("Account",)}},
"lookups": {
"AccountId": {"table": ["Account"], "key_field": "AccountId"}
},
},
"Extract Lead": {
"api": "smart",
"Insert Lead": {
"sf_object": "Lead",
"table": "Lead",
"fields": ["LastName", "Company"],
},
"Extract Event": {
"api": "smart",
"Insert Event": {
"sf_object": "Event",
"table": "Event",
"fields": ["Subject"],
"lookups": {
"WhoId": {
"table": (
"table": [
"Contact",
"Lead",
)
],
"key_field": "WhoId",
}
},
},
2 changes: 1 addition & 1 deletion cumulusci/tasks/bulkdata/mapping_parser.py
@@ -91,7 +91,7 @@ class BulkMode(StrEnum):

class MappingStep(CCIDictModel):
"Step in a load or extract process"
sf_object: str
sf_object: T.Union[str, None]
table: Optional[str] = None
fields_: Dict[str, str] = Field({}, alias="fields")
lookups: Dict[str, MappingLookup] = {}
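
Under pydantic v1 semantics, annotating `sf_object` as `T.Union[str, None]` gives it an implicit default of `None`, so a step can be constructed before its target object is resolved. A minimal sketch:

step = MappingStep(table="Account")  # sf_object defaults to None
assert step.sf_object is None
step.sf_object = "Account"  # assigned once the object is known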