Skip to content

Commit

Permalink
Add async method for external column names scanner (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
fvaleye committed Nov 8, 2021
1 parent 3d50744 commit c5f7e32
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 229 deletions.
29 changes: 26 additions & 3 deletions python/docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Usage
Metadata Guardian
-----------------

Local Source, one result:
Scan the column names of a local source:

>>> from metadata_guardian import DataRules, ColumnScanner, AvailableCategory
>>> from metadata_guardian.source import ParquetSource
Expand All @@ -15,7 +15,7 @@ Local Source, one result:
>>> report = column_scanner.scan_local(source)
>>> report.to_console()

Scan column names of an external Source, one result:
Scan the column names of a external source on a table:

>>> from metadata_guardian import DataRules, ColumnScanner, AvailableCategory
>>> from metadata_guardian.source.external.snowflake_source import SnowflakeSource
Expand All @@ -26,7 +26,30 @@ Scan column names of an external Source, one result:
>>> report = column_scanner.scan_external(source, database_name="database_name", table_name="table_name", include_comment=True)
>>> report.to_console()

Scan column names of a local source, multiple results:
Scan the column names of a external source on database:

>>> from metadata_guardian import DataRules, ColumnScanner, AvailableCategory
>>> from metadata_guardian.source.external.snowflake_source import SnowflakeSource
>>>
>>> data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
>>> source = SnowflakeSource(sf_account="account", sf_user="sf_user", sf_password="sf_password", warehouse="warehouse", schema_name="schema_name")
>>> column_scanner = ColumnScanner(data_rules=data_rules)
>>> report = column_scanner.scan_external(source, database_name="database_name", include_comment=True)
>>> report.to_console()

Scan the column names of an external source for a database asynchronously with asyncio:

>>> import asyncio
>>> from metadata_guardian import DataRules, ColumnScanner, AvailableCategory
>>> from metadata_guardian.source.external.snowflake_source import SnowflakeSource
>>>
>>> data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
>>> source = SnowflakeSource(sf_account="account", sf_user="sf_user", sf_password="sf_password", warehouse="warehouse", schema_name="schema_name")
>>> column_scanner = ColumnScanner(data_rules=data_rules)
>>> report = asyncio.run(column_scanner.scan_external(source, database_name="database_name", include_comment=True))
>>> report.to_console()

Scan the column names of a local source:

>>> from metadata_guardian import DataRules, ColumnScanner, AvailableCategory, MetadataGuardianReport
>>> from metadata_guardian.source import ParquetSource
Expand Down
100 changes: 91 additions & 9 deletions python/metadata_guardian/scanner.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional

from loguru import logger
from pyarrow import cpu_count

from .data_rules import DataRules
from .report import MetadataGuardianReport, ReportResults
Expand All @@ -15,9 +17,9 @@ class Scanner(ABC):
@abstractmethod
def scan_local(self, source: LocalMetadataSource) -> MetadataGuardianReport:
"""
Scan the column names from local source.
Scan the column names from the local source.
:param source: the LocalMetadataSource to scan
:return: Metadata Guardian report
:return: a Metadata Guardian report
"""
pass

Expand All @@ -30,12 +32,32 @@ def scan_external(
include_comment: bool = False,
) -> MetadataGuardianReport:
"""
Scan the column names from external source.
Scan the column names from the external source.
:param source: the ExternalMetadataSource to scan
:param database_name: the name of the database
:param table_name: the name of the table
:param include_comment: the scan include the comment section
:return: Metadata Guardian report
:return: a Metadata Guardian report
"""
pass

@abstractmethod
async def scan_external_async(
self,
source: ExternalMetadataSource,
database_name: str,
tasks_limit: int,
table_name: Optional[str] = None,
include_comment: bool = False,
) -> MetadataGuardianReport:
"""
Scan the column names from the external source asynchronously.
:param source: the ExternalMetadataSource to scan
:param database_name: the name of the database
:param tasks_limit: the limit of the tasks to run in parallel
:param table_name: the name of the table
:param include_comment: the scan include the comment section
:return: a Metadata Guardian report
"""
pass

Expand All @@ -48,9 +70,9 @@ class ColumnScanner(Scanner):

def scan_local(self, source: LocalMetadataSource) -> MetadataGuardianReport:
"""
Scan the column names from source.
Scan the column names from the local source.
:param source: the MetadataSource to scan
:return: Metadata Guardian report
:return: a Metadata Guardian report
"""
return MetadataGuardianReport(
report_results=[
Expand All @@ -76,9 +98,12 @@ def scan_external(
:param database_name: the name of the database
:param table_name: the name of the table
:param include_comment: the scan include the comment section
:return: Metadata Guardian report
:return: a Metadata Guardian report
"""
if table_name:
logger.debug(
f"Get the column names list from the table {database_name}.{table_name}"
)
report = MetadataGuardianReport(
report_results=[
ReportResults(
Expand All @@ -95,12 +120,14 @@ def scan_external(
)
else:
report = MetadataGuardianReport()
table_names_list = source.get_table_names_list(database_name=database_name)
logger.debug(
f"Get the table names list from the database {database_name} for {source.type}"
)
table_names_list = source.get_table_names_list(database_name=database_name)
for table_name in table_names_list:
logger.debug(f"Get the column names list from the table {table_name}")
logger.debug(
f"Get the column names list from the table {database_name}.{table_name}"
)
report.append(
MetadataGuardianReport(
report_results=[
Expand All @@ -119,6 +146,61 @@ def scan_external(
)
return report

async def scan_external_async(
self,
source: ExternalMetadataSource,
database_name: str,
tasks_limit: int = cpu_count(),
table_name: Optional[str] = None,
include_comment: bool = False,
) -> MetadataGuardianReport:
"""
Scan the column names from the external source using a table name or a database name.
Note that it can generate multiple concurrent calls to your metadata source.
:param source: the ExternalMetadataSource to scan
:param database_name: the name of the database
:param tasks_limit: the limit of the tasks to run in parallel
:param table_name: the name of the table
:param include_comment: the scan include the comment section
:return: a Metadata Guardian report
"""
semaphore = asyncio.Semaphore(tasks_limit)

async def async_validate_words(table_name: str) -> ReportResults:
async with semaphore:
logger.debug(
f"Get the column names list from the table {database_name}.{table_name}"
)
loop = asyncio.get_event_loop()
words = await loop.run_in_executor(
None,
source.get_column_names,
database_name,
table_name,
include_comment,
)
return ReportResults(
source=f"{database_name}.{table_name}",
results=self.data_rules.validate_words(words=words),
)

if table_name:
tasks = [async_validate_words(table_name=table_name)]
else:
table_names_list = source.get_table_names_list(database_name=database_name)
logger.debug(
f"Get the table names list from the database {database_name} for {source.type}"
)

tasks = [
async_validate_words(table_name=table_name)
for table_name in table_names_list
]
report_results = await asyncio.gather(*tasks)
report = MetadataGuardianReport(report_results=report_results)
return report


@dataclass
class ContentFileScanner:
Expand Down
Loading

0 comments on commit c5f7e32

Please sign in to comment.