brumbrumm/qlik_pyspark_migration

How to migrate ETL scripts from QlikView / Qlik Sense to PySpark: Cheatsheet

QlikView / Qlik Sense to PySpark commands mapping

Before starting the migration, agree on coding guidelines so that the migrated code stays simple and readable.

SELECT

The first step in the ETL should be to select only the columns you need. This prevents errors during development and keeps the job stable in operation.

Qlik code snippet

table_name:
LOAD id,
     name, 
     value 
RESIDENT master_data;

Python code snippet

from pyspark.sql import functions as F

table_name = master_data.select(F.col("id"), F.col("name"), F.col("value"))
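One way to make the "only the columns you need" rule fail early is to check the column list before selecting. The sketch below is an illustration, not part of PySpark: the helper name select_required is hypothetical, and it relies only on DataFrame.columns and DataFrame.select. It assumes Spark's default case-insensitive column resolution.

```python
def select_required(df, required):
    """Return df reduced to `required`, raising early if a column is missing.

    `df` is expected to be a pyspark.sql.DataFrame. The helper name is
    illustrative and not part of PySpark.
    """
    # Assumption: Spark resolves column names case-insensitively (its default),
    # so the check compares lowercased names.
    available = {c.lower() for c in df.columns}
    missing = [c for c in required if c.lower() not in available]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return df.select(*required)
```

A call such as table_name = select_required(master_data, ["id", "name", "value"]) then behaves like the select above, but reports a missing source column by name instead of failing later in the pipeline.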

RENAME COLUMN

Qlik code snippet

table_name:
LOAD 
     id as id_core,
     name as name_core,
     value as value_core
RESIDENT master_data;

Python code snippet

# Option 1: rename one column per statement
table_name = table_name.withColumnRenamed("id", "id_core")
table_name = table_name.withColumnRenamed("name", "name_core")
table_name = table_name.withColumnRenamed("value", "value_core")

# Option 2 (alternative to option 1): the same renames as one chained expression
table_name = (
    table_name
    .withColumnRenamed("id", "id_core")
    .withColumnRenamed("name", "name_core")
    .withColumnRenamed("value", "value_core")
)
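When every column gets the same suffix, as in the Qlik snippet above, the renames can be generated instead of typed out. This is a minimal sketch under that assumption; add_suffix and its keep parameter are hypothetical names, and the only PySpark calls used are DataFrame.columns and DataFrame.toDF.

```python
def add_suffix(df, suffix, keep=()):
    """Rename every column of df to <name><suffix>, except those listed in `keep`.

    `df` is expected to be a pyspark.sql.DataFrame. The helper is illustrative.
    """
    new_names = [c if c in keep else f"{c}{suffix}" for c in df.columns]
    # toDF assigns the new names positionally to all columns in one call
    return df.toDF(*new_names)
```

For example, table_name = add_suffix(master_data, "_core") would produce id_core, name_core and value_core, and keep=("id",) would leave a join key untouched.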

JOIN

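It is important to state the join type explicitly. The example uses a left join (how="left"). Note that a Qlik join matches on every field name the two tables share, whereas the PySpark call names the join key explicitly.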

Qlik code snippet

left join(table_name)
LOAD id,
     name, 
     value 
RESIDENT master_data;

Python code snippet

table_name = table_name.join(master_data, "ID", how="left")
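Because Qlik joins on all field names that the two tables have in common, a migrated join on a single key can behave differently from the original script. The sketch below derives the key list from the shared column names. The helper name qlik_style_join is hypothetical, and raising an error when no columns are shared is a design choice of this sketch (Qlik itself would build a cross product in that case).

```python
def qlik_style_join(left, right, how="left"):
    """Join two DataFrames on every column name they share.

    `left` and `right` are expected to be pyspark.sql.DataFrame objects.
    The helper is illustrative and not part of PySpark.
    """
    right_cols = set(right.columns)
    # Keep the left table's column order so the key list is deterministic
    keys = [c for c in left.columns if c in right_cols]
    if not keys:
        # Assumption of this sketch: an accidental cross product is unwanted
        raise ValueError("No common columns to join on")
    return left.join(right, on=keys, how=how)
```

Passing the keys as a list of names keeps a single copy of each key column in the result, so no duplicate columns need to be dropped afterwards.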

WHERE

Qlik code snippet

table_name:
LOAD
  ID,
  NAME,
  VALUE
RESIDENT master_data
WHERE VALUE = '1';

Python code snippet

table_name = table_name.filter(table_name.VALUE == '1')

Other filter operators

from pyspark.sql import functions as F

# filter <, >, >=, <= (numeric literals, so the comparison is numeric rather than lexicographic)
table_name = table_name.filter(table_name.VALUE >= 1)
# combined
table_name = table_name.filter((table_name.VALUE > 1) & (table_name.VALUE < 10))
# is in, list filter
table_name = table_name.filter(F.col('VALUE').isin([1, 2, 3]))

In field changes

Qlik code snippet

table_name:
LOAD
  ID,
  NAME,
  if(VALUE = TRUE(), 1, 0) as VALUE
RESIDENT master_data;

Python code snippet

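from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Map a truthy value to 1 and anything else (including null) to 0.
# Declare the return type: without it, F.udf defaults to StringType.
true_false_numeric = F.udf(lambda x: 1 if x else 0, IntegerType())

table_name = master_data.withColumn('VALUE', true_false_numeric(master_data.VALUE))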

A field change like this can be written as a user-defined function (UDF); in the example the function maps true to 1 and false to 0. A UDF is not mandatory, though: built-in expressions such as F.when(...).otherwise(...) cover most conditional logic and usually run faster than a Python UDF. The functions are imported with "from pyspark.sql import functions as F".
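The same true/false mapping can also be expressed without a Python UDF, for instance as a Spark SQL CASE expression passed to DataFrame.selectExpr. The following is a sketch under the assumption that the column is boolean; bool_to_int_expr and flag_to_int are hypothetical helper names.

```python
def bool_to_int_expr(column):
    """Build a Spark SQL expression mapping true to 1 and false/null to 0."""
    # Backticks quote the column name in case it collides with a SQL keyword
    return f"CASE WHEN `{column}` THEN 1 ELSE 0 END AS `{column}`"


def flag_to_int(df, column):
    """Replace a boolean column with 1/0 while keeping the column order.

    `df` is expected to be a pyspark.sql.DataFrame. The helper is illustrative.
    """
    exprs = [bool_to_int_expr(c) if c == column else f"`{c}`" for c in df.columns]
    return df.selectExpr(*exprs)
```

With this, table_name = flag_to_int(master_data, "VALUE") mirrors the Qlik if(VALUE = TRUE(), 1, 0) expression and lets Spark evaluate the logic without calling into Python for each row.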

pyspark.sql.functions also includes basic SQL aggregate functions such as max and min, which are applied per group.

Qlik code snippet

table_name:
LOAD
  ID,
  NAME,
  max(VALUE) as max_value
RESIDENT master_data
GROUP BY ID, NAME;

Python code snippet

from pyspark.sql import functions as F

table_name = table_name.groupBy(
    'ID', 'NAME'
).agg(F.max('VALUE').alias('max_value'))

Create new columns

Qlik code snippet

table_name:
LOAD
  id,
  name,
  'something text' as value
RESIDENT master_data;

Python code snippet

from pyspark.sql import functions as F


table_name = table_name.withColumn('value', F.lit('something text'))

Combine columns

Qlik code snippet

table_name:
LOAD
  name & '-' & value as id,
  name,
  value
RESIDENT master_data;

Python code snippet

from pyspark.sql import functions as F

table_name = table_name.withColumn('id', F.concat_ws('-', 'name', 'value'))

Order by and Group By combined

To order and aggregate at the same time, it is better to use a window function in PySpark: number the rows within each group in the desired order, then keep the first row of each group.

Qlik code snippet

table_name:
LOAD
  NAME,
  FirstValue(VALUE) as max_value
RESIDENT master_data
GROUP BY NAME
ORDER BY DATE desc;

Python code snippet

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number the rows of each NAME from newest to oldest DATE
window_get_last_event = Window.partitionBy('NAME').orderBy(F.col('DATE').desc())
table_name = table_name.withColumn('ROW_NUMBER', F.row_number().over(window_get_last_event))
# Keep only the newest row per NAME
table_name = table_name.filter(F.col('ROW_NUMBER') == 1)

Save / Store as csv

Qlik code snippet

STORE table_name into 'path/to/folder/name.csv'(txt);

Python code snippet

table_name.write.mode('overwrite').option('header', True).csv('s3://bucket_name/folder/name')
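Unlike Qlik's STORE, which writes one file, Spark's CSV writer creates a folder at the given path containing one part file per partition. If a single file is needed, the data can be reduced to one partition first. This is a minimal sketch; the helper name write_single_csv is hypothetical, and coalesce(1) funnels all rows through one task, so it is only reasonable for small outputs.

```python
def write_single_csv(df, path):
    """Write df as CSV under `path` with a header row and exactly one part file.

    `df` is expected to be a pyspark.sql.DataFrame. The helper is illustrative.
    """
    (
        df.coalesce(1)  # one partition, so Spark emits a single part-*.csv file
        .write.mode("overwrite")
        .option("header", True)
        .csv(path)
    )
```

The result is still a folder (for example s3://bucket_name/folder/name/) holding a single part-*.csv file, not a file with the exact name given in the path.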
