Moved PySpark -> Optional Dependency
Sid Mohan authored and committed on May 18, 2024
1 parent af9eb85 commit 155234f
Showing 6 changed files with 56 additions and 38 deletions.
2 changes: 1 addition & 1 deletion datafog/__about__.py
@@ -1 +1 @@
-__version__ = "3.2.0"
+__version__ = "3.2.1b1"
28 changes: 24 additions & 4 deletions datafog/processing/spark_processing/pyspark_udfs.py
@@ -1,8 +1,8 @@
 import requests
 import spacy
-from pyspark.sql import SparkSession
-from pyspark.sql.functions import udf
-from pyspark.sql.types import ArrayType, StringType, StructField, StructType
+import importlib
+import subprocess
+import sys
 
 PII_ANNOTATION_LABELS = ["DATE_TIME", "LOC", "NRP", "ORG", "PER"]
 MAXIMAL_STRING_SIZE = 1000000
@@ -14,6 +14,11 @@ def pii_annotator(text: str, broadcasted_nlp) -> list[list[str]]:
     Returns:
         list[list[str]]: Values as arrays in order defined in the PII_ANNOTATION_LABELS.
     """
+    ensure_installed("pyspark")
+    from pyspark.sql import SparkSession
+    from pyspark.sql.functions import udf
+    from pyspark.sql.types import ArrayType, StringType, StructField, StructType
+
     if text:
         if len(text) > MAXIMAL_STRING_SIZE:
             # Cut the strings for required sizes
@@ -35,13 +40,28 @@ def pii_annotator(text: str, broadcasted_nlp) -> list[list[str]]:
 
 
 def broadcast_pii_annotator_udf(
-    spark_session: SparkSession, spacy_model: str = "en_spacy_pii_fast"
+    spark_session=None, spacy_model: str = "en_spacy_pii_fast"
 ):
     """Broadcast PII annotator across Spark cluster and create UDF"""
+    ensure_installed("pyspark")
+    from pyspark.sql import SparkSession
+    from pyspark.sql.functions import udf
+    from pyspark.sql.types import ArrayType, StringType, StructField, StructType
+    if not spark_session:
+        spark_session = SparkSession.builder.getOrCreate()
     broadcasted_nlp = spark_session.sparkContext.broadcast(spacy.load(spacy_model))
 
     pii_annotation_udf = udf(
         lambda text: pii_annotator(text, broadcasted_nlp),
         ArrayType(ArrayType(StringType())),
     )
     return pii_annotation_udf
+
+
+def ensure_installed(package_name):
+    try:
+        importlib.import_module(package_name)
+    except ImportError:
+        subprocess.check_call(
+            [sys.executable, "-m", "pip", "install", package_name]
+        )
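
With pyspark now pulled in lazily, the UDF factory behaves as before once Spark is available. A minimal usage sketch, assuming a local Spark session, an installed en_spacy_pii_fast model, and a hypothetical DataFrame with a "text" column (none of which are part of this commit):

# Usage sketch only; the sample row and "text" column are hypothetical.
from pyspark.sql import SparkSession

from datafog.processing.spark_processing.pyspark_udfs import (
    broadcast_pii_annotator_udf,
)

spark = SparkSession.builder.appName("datafog-example").getOrCreate()
df = spark.createDataFrame([("John Doe flew to Paris on May 18.",)], ["text"])

# spark_session may now be omitted; the function falls back to the active session.
annotate = broadcast_pii_annotator_udf(spark)
df.withColumn("pii_annotations", annotate(df["text"])).show(truncate=False)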
56 changes: 27 additions & 29 deletions datafog/services/spark_service.py
@@ -1,38 +1,36 @@
 import json
 from typing import Any, List
+import importlib
+import subprocess
+import sys
 
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql.functions import udf
-from pyspark.sql.types import ArrayType, StringType
-
 
 class SparkService:
     def __init__(self):
-        self.spark = self.create_spark_session()
+        self.ensure_installed("pyspark")
+
+        from pyspark.sql import DataFrame, SparkSession
+        from pyspark.sql.functions import udf
+        from pyspark.sql.types import ArrayType, StringType
+        self.SparkSession = SparkSession
+        self.DataFrame = DataFrame
+        self.udf = udf
+        self.ArrayType = ArrayType
+        self.StringType = StringType
 
-    @staticmethod
-    def create_spark_session() -> SparkSession:
-        """Create and configure a Spark session."""
-        return (
-            SparkSession.builder.appName("DataFog")
-            .config("spark.driver.memory", "8g")
-            .config("spark.executor.memory", "8g")
-            .getOrCreate()
-        )
+    def create_spark_session(self):
+        return self.SparkSession.builder.appName("datafog").getOrCreate()
 
-    def create_dataframe(self, data: List[tuple], schema: List[str]) -> DataFrame:
-        """Convert a list of tuples to a Spark DataFrame with the specified schema."""
-        return self.spark.createDataFrame(data, schema)
+    def read_json(self, path: str) -> List[dict]:
+        return self.spark.read.json(path).collect()
 
-    def add_pii_annotations(self, df: DataFrame, annotation_udf: udf) -> DataFrame:
-        """Add a new column to DataFrame with PII annotations."""
-        return df.withColumn("pii_annotations", annotation_udf(df["text"]))
-
-    def write_to_json(self, data: Any, output_path: str):
-        """Write data to a JSON file."""
-        with open(output_path, "w") as f:
-            json.dump(data, f)
-
-    def stop(self):
-        """Stop the Spark session."""
-        self.spark.stop()
+    def ensure_installed(self, package_name):
+        try:
+            importlib.import_module(package_name)
+        except ImportError:
+            subprocess.check_call(
+                [sys.executable, "-m", "pip", "install", package_name]
+            )
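
As diffed above, __init__ now binds the pyspark symbols but no longer assigns self.spark, while read_json still reads from it. A hedged usage sketch that bridges the two (the input path is hypothetical):

# Usage sketch only; "data/sample.json" is a hypothetical path.
from datafog.services.spark_service import SparkService

service = SparkService()  # pip-installs and imports pyspark if missing
service.spark = service.create_spark_session()  # bind the session read_json expects
rows = service.read_json("data/sample.json")  # collect() returns pyspark Row objects
print(rows)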
2 changes: 1 addition & 1 deletion requirements.txt
@@ -2,7 +2,7 @@ en_spacy_pii_fast==0.0.0
 # transformers==4.40.1
 spacy==3.4.4
 # torch==2.2.2
-pyspark==3.4.1
+# pyspark==3.4.1
 pytest==8.0.2
 Requests==2.31.0
 setuptools==58.1.0
4 changes: 2 additions & 2 deletions setup.py
@@ -6,7 +6,7 @@
 
 
 def __version__():
-    return "3.2.0"
+    return "3.2.1b1"
 
 
 project_urls = {
@@ -34,7 +34,7 @@ def __version__():
         "en_spacy_pii_fast==0.0.0",
         # "transformers==4.40.1",
         # "torch==2.2.2",
-        "pyspark==3.4.1",
+        # "pyspark==3.4.1",
         "pydantic==1.10.8",
         "Pillow",
         "sentencepiece",
2 changes: 1 addition & 1 deletion tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist = py310, py311
+envlist = py310
 isolated_build = True
 
 [testenv]
