Before starting the migration, a set of guidelines needed to be defined to keep the code simple and readable.
First, the ETL should load only the columns you actually need. This prevents errors during development and keeps the pipeline stable in operation.
Qlik code snippet
table_name:
LOAD id,
name,
value
RESIDENT master_data;
Python code snippet
from pyspark.sql import functions as F
table_name = master_data.select(F.col("id"), F.col("name"), F.col("value"))
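If no column expression is needed, select also accepts the plain column names; a shorter, equivalent form of the snippet above:
table_name = master_data.select("id", "name", "value")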
Qlik code snippet
table_name:
LOAD
id as id_core,
name as name_core,
value as value_core
RESIDENT master_data;
Python code snippet
table_name = table_name.withColumnRenamed("ID", "ID_CORE")
table_name = table_name.withColumnRenamed("NAME", "NAME_CORE")
table_name = table_name.withColumnRenamed("VALUE", "VALUE_CORE")
# alternative: chain the renames in one statement
table_name = (table_name
    .withColumnRenamed("ID", "ID_CORE")
    .withColumnRenamed("NAME", "NAME_CORE")
    .withColumnRenamed("VALUE", "VALUE_CORE"))
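If the renaming happens together with the column selection, both steps can be combined with select and alias; a minimal sketch using the columns from the snippet above:
from pyspark.sql import functions as F
table_name = master_data.select(
    F.col("ID").alias("ID_CORE"),
    F.col("NAME").alias("NAME_CORE"),
    F.col("VALUE").alias("VALUE_CORE"),
)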
It is important to specify what kind of join it is. In the example it is a left join: how="left".
Qlik code snippet
left join(table_name)
LOAD id,
name,
value
RESIDENT master_data;
Python code snippet
table_name = table_name.join(master_data, "ID", how="left")
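Unlike Qlik, which joins on every field the two tables have in common, PySpark joins only on the keys passed explicitly. A small sketch of two common variants; the column name MASTER_ID is hypothetical:
# join on several keys at once
table_name = table_name.join(master_data, ["ID", "NAME"], how="left")
# join on columns that have different names in the two tables
table_name = table_name.join(master_data, table_name.ID == master_data.MASTER_ID, how="left")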
Qlik code snippet
table_name:
LOAD
ID,
NAME,
VALUE
RESIDENT master_data
WHERE VALUE = '1';
Python code snippet
table_name = table_name.filter(table_name.VALUE == '1')
Other filter operators
# filter <, >, >=, <=
table_name = table_name.filter(table_name.VALUE >= '1')
# combined
table_name = table_name.filter((table_name.VALUE > '1') & (table_name.VALUE < '10'))
# is in, list filter
table_name = table_name.filter(F.col('VALUE').isin([1, 2, 3]))
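Null checks and negated conditions follow the same pattern; a short sketch with the column names from the examples above:
from pyspark.sql import functions as F
# keep only rows where VALUE is set
table_name = table_name.filter(F.col('VALUE').isNotNull())
# negate a condition with ~
table_name = table_name.filter(~F.col('VALUE').isin([1, 2, 3]))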
Qlik code snippet
table_name:
LOAD
ID,
NAME,
if(VALUE = TRUE(), 1, 0) as VALUE
RESIDENT master_data;
Python code snippet
from pyspark.sql import functions as F
from pyspark.sql import types as T
# the return type is declared so the new column is numeric instead of string
true_false_numeric = F.udf(lambda x: 1 if x else 0, T.IntegerType())
table_name = master_data.withColumn('VALUE', true_false_numeric(master_data.VALUE))
For custom transformations of field values a user-defined function (UDF) is written. In the example the function maps true or false values to 1 or 0. To use functions in PySpark the package "pyspark.sql" is required: "from pyspark.sql import functions as F"
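Where a built-in expression exists it is usually preferable to a UDF, because Spark can optimize it. The same mapping can be sketched with F.when, assuming VALUE holds boolean values:
from pyspark.sql import functions as F
# built-in alternative to the UDF above, assuming VALUE holds boolean values
table_name = master_data.withColumn('VALUE', F.when(F.col('VALUE') == True, 1).otherwise(0))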
The pyspark.sql functions module also includes basic SQL aggregate functions such as max and min on a field.
Qlik code snippet
table_name:
LOAD
ID,
NAME,
max(VALUE) as max_value
RESIDENT master_data
GROUP BY ID, NAME;
Python code snippet
from pyspark.sql import functions as F
table_name = table_name.groupBy(
'ID', 'NAME'
).agg(F.max('VALUE').alias('max_value'))
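Several aggregations can be calculated in one pass by listing them in the same agg call; a sketch following the same pattern (and the same functions import), with min and count as additional, hypothetical outputs:
table_name = table_name.groupBy('ID', 'NAME').agg(
    F.max('VALUE').alias('max_value'),
    F.min('VALUE').alias('min_value'),
    F.count('VALUE').alias('value_count'),
)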
Qlik code snippet
table_name:
LOAD
id,
name,
'something text' as value
RESIDENT master_data;
Python code snippet
from pyspark.sql import functions as F
table_name = table_name.withColumn('value', F.lit('something text'))
Qlik code snippet
table_name:
LOAD
name & '-' & value as id,
name,
value
RESIDENT master_data;
Python code snippet
from pyspark.sql import functions as F
table_name = table_name.withColumn('id', F.concat_ws('-', 'name', 'value'))
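F.concat works as well, but it returns NULL as soon as one of the inputs is NULL, while concat_ws simply skips NULL inputs; a sketch of the concat variant:
from pyspark.sql import functions as F
# returns NULL for rows where name or value is NULL
table_name = table_name.withColumn('id', F.concat(F.col('name'), F.lit('-'), F.col('value')))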
To order and aggregate at the same time, it is better to use a Window function in PySpark.
Qlik code snippet
table_name:
LOAD
NAME,
FirstValue(VALUE) as max_value
RESIDENT master_data
GROUP BY NAME
ORDER BY DATE desc;
Python code snippet
from pyspark.sql import functions as F
from pyspark.sql.window import Window
window_get_last_event = Window.partitionBy('NAME').orderBy(F.col('DATE').desc())
table_name = table_name.withColumn('ROW_NUMBER', F.row_number().over(window_get_last_event))
table_name = table_name.filter(F.col('ROW_NUMBER') == 1)
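An alternative that stays closer to the Qlik result, without the helper ROW_NUMBER column, is to take the first VALUE per NAME over the same window; a sketch assuming the same window definition as above:
table_name = (
    table_name
    .withColumn('max_value', F.first('VALUE').over(window_get_last_event))
    .select('NAME', 'max_value')
    .distinct()
)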
Qlik code snippet
STORE table_name into 'path/to/folder/name.csv'(txt);
Python code snippet
table_name.write.mode('overwrite').option('header', True).csv('s3://bucket_name/folder/name')
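Unlike Qlik's STORE, Spark writes a directory containing one part file per partition. If a single CSV file is required, the DataFrame can be coalesced into one partition first; a sketch with the same target path:
table_name.coalesce(1).write.mode('overwrite').option('header', True).csv('s3://bucket_name/folder/name')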