Set Spark postgres credentials via variable
hfxbse committed Nov 19, 2024
1 parent 649b4e7 commit 2889874
Showing 5 changed files with 105 additions and 80 deletions.
5 changes: 3 additions & 2 deletions airflow.Dockerfile
@@ -11,8 +11,9 @@ RUN wget https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /home/ai
RUN mv /tmp/upstream/exercises/winter_semester_2024-2025/05_airflow/plugins /home/airflow/airflow
RUN mv /tmp/upstream/exercises/winter_semester_2024-2025/05_airflow/dags /home/airflow/airflow
RUN mv /tmp/upstream/exercises/winter_semester_2024-2025/05_airflow/python /home/airflow/airflow
RUN sed -i 's/hadoop:/${HADOOP_HOST}:/g' /home/airflow/hadoop/etc/hadoop/core-site.xml
RUN sed -i 's/hadoop:/${HADOOP_HOST}:/g' /home/airflow/hadoop/etc/hadoop/yarn-site.xml
# Pyarrow fails to interpret the placeholder, substitute at startup instead
RUN sed -i "34 i sed -i 's/hadoop:/$HADOOP_HOST:/g' /home/airflow/hadoop/etc/hadoop/core-site.xml" /startup.sh
RUN sed -i "34 i sed -i 's/hadoop:/$HADOOP_HOST:/g' /home/airflow/hadoop/etc/hadoop/yarn-site.xml" /startup.sh
# Setting AIRFLOW__WEBSERVER__BASE_URL did not get applied for unknown reasons, update the config file instead
RUN sed -i "34 i sed -i 's#base_url = http://localhost:8080#base_url = http://localhost:8080/airflow#' /home/airflow/airflow/airflow.cfg" /startup.sh

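The build no longer bakes the Hadoop host into the XML configs at image build time; because pyarrow cannot resolve a ${HADOOP_HOST} placeholder left inside core-site.xml, the Dockerfile instead injects sed commands into /startup.sh so the real host name is substituted when the container starts. A minimal Python sketch of what those injected commands accomplish, assuming the HADOOP_HOST environment variable and the config paths used above:

```python
# Sketch only: the runtime equivalent of the sed lines injected into /startup.sh.
# Replaces the literal "hadoop:" authority with the value of $HADOOP_HOST at container start.
import os

HADOOP_CONF_DIR = "/home/airflow/hadoop/etc/hadoop"  # path taken from the Dockerfile above


def substitute_hadoop_host(config_file: str) -> None:
    host = os.environ.get("HADOOP_HOST", "hadoop")  # assumed env var; falls back to the original host name
    path = os.path.join(HADOOP_CONF_DIR, config_file)
    with open(path, "r", encoding="utf-8") as f:
        content = f.read()
    with open(path, "w", encoding="utf-8") as f:
        f.write(content.replace("hadoop:", f"{host}:"))


for config in ("core-site.xml", "yarn-site.xml"):
    substitute_hadoop_host(config)
```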
151 changes: 80 additions & 71 deletions airflow/dags/cell_coverage.py
@@ -29,6 +29,13 @@
'owner': 'airflow'
}

postgres_connection_args = [
'--postgres-user', Variable.get("POSTGRES_USER", default_var='postgres'),
    '--postgres-port', Variable.get("POSTGRES_PORT", default_var='5432'),
'--postgres-host', Variable.get("POSTGRES_HOST", default_var="user-db"),
'--postgres-password', Variable.get("POSTGRES_PASSWORD", default_var="password"),
]


def cell_coverage_dag():
return DAG(
@@ -165,81 +172,83 @@ def initial_data(dag):


with cell_coverage_dag() as dag:
check_for_initial_data = HdfsGetFileOperator(
task_id='check-for-initial-data',
hdfs_conn_id='hdfs',
remote_file=f'{RAW_DIRECTORY_PATH}/cell_towers.csv',
local_file='/dev/null',
)

download_initial_data = DummyOperator(
task_id='download-initial-data',
trigger_rule=TriggerRule.ALL_FAILED,
)

create_tmp_dir = BashOperator(
task_id='create-tmp-dir',
bash_command=f'mkdir -p {TMP_DIRECTORY_PATH}',
)

clear_tmp_dir = BashOperator(
task_id='clear-tmp-dir',
bash_command=f'rm -rf {TMP_DIRECTORY_PATH}/*',
)

download_all_diffs = DummyOperator(
task_id=f'download-remaining-diffs',
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# check_for_initial_data = HdfsGetFileOperator(
# task_id='check-for-initial-data',
# hdfs_conn_id='hdfs',
# remote_file=f'{RAW_DIRECTORY_PATH}/cell_towers.csv',
# local_file='/dev/null',
# )
#
# download_initial_data = DummyOperator(
# task_id='download-initial-data',
# trigger_rule=TriggerRule.ALL_FAILED,
# )
#
# create_tmp_dir = BashOperator(
# task_id='create-tmp-dir',
# bash_command=f'mkdir -p {TMP_DIRECTORY_PATH}',
# )
#
# clear_tmp_dir = BashOperator(
# task_id='clear-tmp-dir',
# bash_command=f'rm -rf {TMP_DIRECTORY_PATH}/*',
# )
#
# download_all_diffs = DummyOperator(
# task_id=f'download-remaining-diffs',
# trigger_rule=TriggerRule.ALL_SUCCESS,
# )

parse_initial_data = SparkSubmitOperator(
task_id='parse-initial-data',
conn_id='spark',
application=f'{SPARK_APPLICATIONS}/init_data.py',
application_args=postgres_connection_args,
verbose=True
)

move_previous = HdfsMoveOperator(
task_id='move-previous',
hdfs_conn_id='hdfs',
path=FINAL_DIRECTORY_PATH,
new_path=TMP_DIRECTORY_PATH,
trigger_rule=TriggerRule.ALL_DONE
)

parse_diffs = SparkSubmitOperator(
task_id='parse-diffs',
conn_id='spark',
application=f'{SPARK_APPLICATIONS}/diffs.py',
verbose=True,
)

delete_left_overs = HdfsDeleteOperator(
task_id='delete-left-overs',
hdfs_conn_id='hdfs',
path=FINAL_DIRECTORY_PATH,
trigger_rule=TriggerRule.ALL_FAILED
)

restore_previous = HdfsMoveOperator(
task_id='restore-previous',
hdfs_conn_id='hdfs',
path=TMP_DIRECTORY_PATH,
new_path=FINAL_DIRECTORY_PATH
)

delete_tmp_dir = HdfsDeleteOperator(
task_id='delete-tmp-previous',
hdfs_conn_id='hdfs',
path=TMP_DIRECTORY_PATH,
)

create_tmp_dir >> clear_tmp_dir >> check_for_initial_data >> [download_initial_data, download_all_diffs]

download_initial_start, download_initial_end = initial_data(dag)
download_initial_data >> download_initial_start
download_initial_end >> parse_initial_data

download_diffs_start, download_diffs_end = download_diffs(dag)
download_all_diffs >> download_diffs_start
download_diffs_end >> delete_tmp_dir >> move_previous >> parse_diffs >> delete_left_overs >> restore_previous
#
# move_previous = HdfsMoveOperator(
# task_id='move-previous',
# hdfs_conn_id='hdfs',
# path=FINAL_DIRECTORY_PATH,
# new_path=TMP_DIRECTORY_PATH,
# trigger_rule=TriggerRule.ALL_DONE
# )
#
# parse_diffs = SparkSubmitOperator(
# task_id='parse-diffs',
# conn_id='spark',
# application=f'{SPARK_APPLICATIONS}/diffs.py',
# application_args=postgres_connection_args,
# verbose=True,
# )
#
# delete_left_overs = HdfsDeleteOperator(
# task_id='delete-left-overs',
# hdfs_conn_id='hdfs',
# path=FINAL_DIRECTORY_PATH,
# trigger_rule=TriggerRule.ALL_FAILED
# )
#
# restore_previous = HdfsMoveOperator(
# task_id='restore-previous',
# hdfs_conn_id='hdfs',
# path=TMP_DIRECTORY_PATH,
# new_path=FINAL_DIRECTORY_PATH
# )
#
# delete_tmp_dir = HdfsDeleteOperator(
# task_id='delete-tmp-previous',
# hdfs_conn_id='hdfs',
# path=TMP_DIRECTORY_PATH,
# )
#
# create_tmp_dir >> clear_tmp_dir >> check_for_initial_data >> [download_initial_data, download_all_diffs]
#
# download_initial_start, download_initial_end = initial_data(dag)
# download_initial_data >> download_initial_start
# download_initial_end >> parse_initial_data
#
# download_diffs_start, download_diffs_end = download_diffs(dag)
# download_all_diffs >> download_diffs_start
# download_diffs_end >> delete_tmp_dir >> move_previous >> parse_diffs >> delete_left_overs >> restore_previous
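The DAG now builds the Postgres connection settings from Airflow Variables (falling back to the defaults above) and hands them to the Spark jobs through application_args on SparkSubmitOperator, which passes them through to the spark-submit invocation after the application path. A minimal sketch of seeding those Variables so the fallbacks are not used in production; the key names match the DAG, the values are placeholders:

```python
# Sketch only: define the Airflow Variables read via Variable.get(..., default_var=...) above.
# Key names match the DAG; the values here are placeholders, not real credentials.
from airflow.models import Variable

Variable.set("POSTGRES_HOST", "user-db")
Variable.set("POSTGRES_PORT", "5432")
Variable.set("POSTGRES_USER", "postgres")
Variable.set("POSTGRES_PASSWORD", "change-me")

# Variable.get falls back to default_var only when the key is not defined:
host = Variable.get("POSTGRES_HOST", default_var="user-db")
```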
4 changes: 3 additions & 1 deletion spark/diffs.py
@@ -6,6 +6,8 @@
DIFF_DIRECTORY_PATH = f'{RAW_DIRECTORY_PATH}/diffs'

if __name__ == "__main__":
db_config = get_database_arguments()

session = spark_session()
date_rows = session.read.format('csv').options(
header='false',
@@ -34,4 +36,4 @@
unchanged_identifiers = previous.select(previous.identifier).subtract(changes.select(changes.identifier))
unchanged = previous.join(unchanged_identifiers, ['identifier'], how='left_semi')

spark_writer(unchanged.unionByName(changes), override=True)
spark_writer(unchanged.unionByName(changes), postgres_config=db_config)
4 changes: 3 additions & 1 deletion spark/init_data.py
@@ -1,5 +1,7 @@
from spark_io import *

if __name__ == '__main__':
db_config = get_database_arguments()

raw = spark_raw_reader(spark_session()).load(f'{RAW_DIRECTORY_PATH}/cell_towers.csv')
spark_writer(calculate_identifier(raw), override=True)
spark_writer(calculate_identifier(raw), postgres_config=db_config)
21 changes: 16 additions & 5 deletions spark/spark_io.py
@@ -1,3 +1,5 @@
import argparse

import pyspark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql import SparkSession
@@ -63,10 +65,19 @@ def final_columns(frame):
return frame.select('identifier', 'radio', 'lat', 'lon', 'range')


def spark_writer(frame, override=False):
def get_database_arguments(parser=argparse.ArgumentParser()):
parser.add_argument("--postgres-user", default='postgres')
parser.add_argument("--postgres-port", default='5432')
parser.add_argument("--postgres-host", required=True)
parser.add_argument("--postgres-password", required=True)

return parser.parse_args()


def spark_writer(frame, postgres_config):
values = ', '.join([f"'{technology}'" for technology in TECHNOLOGIES])

mode = 'overwrite' if override else 'append'
mode = 'overwrite'
frame = final_columns(frame).filter(f'radio IN ({values})')

frame.printSchema()
@@ -80,11 +91,11 @@ def spark_writer(frame, override=False):
).partitionBy('radio').saveAsTable('cell_towers')

frame.write.format('jdbc').options(
url='jdbc:postgresql://user-db:5432/postgres',
        url=f'jdbc:postgresql://{postgres_config.postgres_host}:{postgres_config.postgres_port}/{postgres_config.postgres_user}',
driver='org.postgresql.Driver',
dbtable=TABLE_NAME,
user='postgres',
password='password',
        user=postgres_config.postgres_user,
        password=postgres_config.postgres_password,
createTableOptions=
f'PARTITION BY LIST (radio);' + ' '.join([
f"CREATE TABLE {TABLE_NAME}_{technology} PARTITION OF {TABLE_NAME} FOR VALUES IN ('{technology}');"
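On the Spark side the jobs parse those flags with argparse. parse_args() returns a Namespace and maps --postgres-host to the attribute postgres_host, so the writer reads the values as attributes. A minimal round-trip sketch with placeholder values:

```python
# Sketch only: the DAG-to-job argument round trip, using the same flags as
# get_database_arguments() above and an explicit argv for illustration.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--postgres-user", default="postgres")
parser.add_argument("--postgres-port", default="5432")
parser.add_argument("--postgres-host", required=True)
parser.add_argument("--postgres-password", required=True)

config = parser.parse_args([
    "--postgres-host", "user-db",        # matches the DAG's default Variable values
    "--postgres-password", "change-me",  # placeholder secret
])

# argparse stores dashed option names as underscored attributes on the Namespace:
url = f"jdbc:postgresql://{config.postgres_host}:{config.postgres_port}/{config.postgres_user}"
assert url == "jdbc:postgresql://user-db:5432/postgres"
```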
