-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Sid Mohan
authored and
Sid Mohan
committed
Feb 23, 2024
1 parent
3dea5e1
commit f5dfaa4
Showing
21 changed files
with
1,914 additions
and
721 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,8 @@ | ||
# datafog-python/src/datafog/__init__.py | ||
from posthog import Posthog | ||
|
||
from .__about__ import __version__ | ||
from .pii_tools import presidio | ||
|
||
posthog = Posthog( | ||
"phc_v6vMICyVCGoYZ2s2xUWB4qoTPoMNFGv2u1q0KnBpaIb", host="https://app.posthog.com" | ||
) | ||
from .pii_tools import PresidioEngine | ||
|
||
__all__ = [ | ||
"__version__", | ||
"presidio", | ||
"PresidioEngine", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry | ||
from presidio_analyzer.nlp_engine import NlpEngineProvider | ||
|
||
from .analyzer import CustomSpacyRecognizer | ||
|
||
|
||
# Helper methods | ||
def analyzer_engine():
    """Build and return a Presidio ``AnalyzerEngine`` for English PII detection.

    The engine is backed by the ``en_spacy_pii_fast`` spaCy model and uses
    :class:`CustomSpacyRecognizer` in place of the stock ``SpacyRecognizer``
    so that the custom entity-label mappings apply.

    :return: A configured ``AnalyzerEngine`` instance.
    """
    nlp_configuration = {
        "nlp_engine_name": "spacy",
        "models": [{"lang_code": "en", "model_name": "en_spacy_pii_fast"}],
    }
    # Build the NLP engine from the declarative configuration above.
    nlp_engine = NlpEngineProvider(
        nlp_configuration=nlp_configuration
    ).create_engine()

    recognizer_registry = RecognizerRegistry()
    # Load the rule-based recognizers, then register the model-backed one.
    recognizer_registry.load_predefined_recognizers(nlp_engine=nlp_engine)
    recognizer_registry.add_recognizer(CustomSpacyRecognizer())
    # Drop the built-in spaCy recognizer so only the custom label mappings
    # from CustomSpacyRecognizer are used for model entities.
    recognizer_registry.remove_recognizer("SpacyRecognizer")

    return AnalyzerEngine(
        nlp_engine=nlp_engine,
        registry=recognizer_registry,
        supported_languages=["en"],
    )
|
||
|
||
def annotate(text, analysis_results):
    """Split *text* into plain substrings and ``(substring, entity_type)`` tuples.

    :param text: The original analyzed string.
    :param analysis_results: Iterable of result objects exposing ``start``,
        ``end`` and ``entity_type`` attributes (e.g. Presidio
        ``RecognizerResult``). Spans are assumed non-overlapping.
    :return: A list alternating plain text and ``(entity_text, entity_type)``
        tuples, in document order. If there are no results, the whole text is
        returned as a single token (previously an empty result set returned
        ``[]`` and silently dropped the text).
    """
    # Process entities in document order regardless of input order.
    results = sorted(analysis_results, key=lambda res: res.start)
    if not results:
        return [text]

    # Text before the first entity (may be the empty string).
    tokens = [text[: results[0].start]]
    for i, res in enumerate(results):
        # The detected span, tagged with its entity type.
        tokens.append((text[res.start : res.end], res.entity_type))
        if i + 1 < len(results):
            # Plain text between this entity and the next one.
            tokens.append(text[res.end : results[i + 1].start])
        else:
            # Remaining text after the last entity.
            tokens.append(text[res.end :])
    return tokens
|
||
|
||
def scan(text, **kwargs):
    """Analyze *text* for PII entities and return the raw analyzer results.

    Keyword arguments are forwarded to ``AnalyzerEngine.analyze``. Defaults
    are filled in for ``language`` (``"en"``), ``score_threshold`` (``0.35``)
    and ``nlp_artifacts`` (``None``) when the caller omits them.

    :param text: The string to scan.
    :return: Whatever ``AnalyzerEngine.analyze`` returns (a list of results).
    """
    defaults = {
        "language": "en",
        "score_threshold": 0.35,
        "nlp_artifacts": None,
    }
    for key, value in defaults.items():
        kwargs.setdefault(key, value)

    # A fresh engine is built per call; the helper keeps no state.
    return analyzer_engine().analyze(text, **kwargs)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import logging | ||
from typing import List, Optional, Set, Tuple | ||
|
||
from presidio_analyzer import AnalysisExplanation, LocalRecognizer, RecognizerResult | ||
|
||
logger = logging.getLogger("presidio-module") | ||
|
||
|
||
class CustomSpacyRecognizer(LocalRecognizer):
    """Presidio recognizer that surfaces entities from a custom spaCy PII model.

    Maps the model's NER labels onto Presidio entity types via
    ``CHECK_LABEL_GROUPS`` and reports each match with a fixed confidence
    (``ner_strength``).
    """

    # Presidio entity types this recognizer can report.
    ENTITIES = [
        "LOCATION",
        "PERSON",
        "NRP",
        "ORGANIZATION",
        "DATE_TIME",
    ]

    # Template for the textual explanation attached to each result.
    DEFAULT_EXPLANATION = "Identified as {} by the PII Detection Model"

    # (presidio entity types, model NER labels) pairs; a result is emitted
    # when the requested entity and the model label fall in the same pair.
    CHECK_LABEL_GROUPS = [
        ({"LOCATION"}, {"LOC", "LOCATION", "STREET_ADDRESS", "COORDINATE"}),
        ({"PERSON"}, {"PER", "PERSON"}),
        ({"NRP"}, {"NORP", "NRP"}),
        ({"ORGANIZATION"}, {"ORG"}),
        ({"DATE_TIME"}, {"DATE_TIME"}),
    ]

    # Language code -> spaCy model identifier.
    MODEL_LANGUAGES = {
        "en": "beki/en_spacy_pii_fast",
    }

    # Model label -> Presidio entity type.
    # NOTE(review): "NROP" looks like a typo for "NORP" — confirm against the
    # model's actual label set before changing it.
    PRESIDIO_EQUIVALENCES = {
        "PER": "PERSON",
        "LOC": "LOCATION",
        "ORG": "ORGANIZATION",
        "NROP": "NRP",
        "DATE_TIME": "DATE_TIME",
    }

    def __init__(
        self,
        supported_language: str = "en",
        supported_entities: Optional[List[str]] = None,
        check_label_groups: Optional[Tuple[Set, Set]] = None,
        context: Optional[List[str]] = None,
        ner_strength: float = 0.85,
    ):
        """Create the recognizer.

        :param supported_language: Language code handled by this recognizer.
        :param supported_entities: Entities to report; defaults to ``ENTITIES``.
        :param check_label_groups: Entity/label pairing; defaults to
            ``CHECK_LABEL_GROUPS``.
        :param context: Accepted for interface compatibility; currently unused.
        :param ner_strength: Confidence score assigned to every match.
        """
        self.ner_strength = ner_strength
        # Falsy (None/empty) arguments fall back to the class defaults,
        # matching the original truthiness-based behavior.
        self.check_label_groups = check_label_groups or self.CHECK_LABEL_GROUPS
        super().__init__(
            supported_entities=supported_entities or self.ENTITIES,
            supported_language=supported_language,
        )

    def load(self) -> None:
        """No-op: the model is loaded during initialization, not here."""
        pass

    def get_supported_entities(self) -> List[str]:
        """Return the list of entity types this recognizer supports."""
        return self.supported_entities

    def build_spacy_explanation(
        self, original_score: float, explanation: str
    ) -> AnalysisExplanation:
        """Wrap a score and explanation string in an ``AnalysisExplanation``.

        :param original_score: Score given by this recognizer.
        :param explanation: Human-readable explanation text.
        :return: The populated ``AnalysisExplanation``.
        """
        return AnalysisExplanation(
            recognizer=self.__class__.__name__,
            original_score=original_score,
            textual_explanation=explanation,
        )

    def analyze(self, text, entities, nlp_artifacts=None):  # noqa D102
        """Return ``RecognizerResult``s for model entities matching *entities*."""
        recognized = []
        if not nlp_artifacts:
            logger.warning("No NLP artifacts provided for analysis")
            return recognized

        detected = nlp_artifacts.entities

        for requested_entity in entities:
            if requested_entity not in self.supported_entities:
                continue
            for ner_ent in detected:
                # Skip model entities whose label does not map to the
                # requested Presidio entity type.
                if not self.__check_label(
                    requested_entity, ner_ent.label_, self.check_label_groups
                ):
                    continue

                explanation = self.build_spacy_explanation(
                    self.ner_strength,
                    self.DEFAULT_EXPLANATION.format(ner_ent.label_),
                )
                recognized.append(
                    RecognizerResult(
                        entity_type=requested_entity,
                        start=ner_ent.start_char,
                        end=ner_ent.end_char,
                        score=self.ner_strength,
                        analysis_explanation=explanation,
                        recognition_metadata={
                            RecognizerResult.RECOGNIZER_NAME_KEY: self.name
                        },
                    )
                )

        return recognized

    @staticmethod
    def __check_label(
        entity: str, label: str, check_label_groups: Tuple[Set, Set]
    ) -> bool:
        """True when *entity* and *label* appear together in any label group."""
        return any(
            entity in entity_group and label in label_group
            for entity_group, label_group in check_label_groups
        )
Oops, something went wrong.