forked from ifpb/Integracao-dados-overview
-
Notifications
You must be signed in to change notification settings - Fork 0
/
deduplicar.py
36 lines (25 loc) · 1.15 KB
/
deduplicar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import os
import pyspark.sql.functions as func
import findspark
from config import CONFIG
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName(CONFIG['APP_NAME']).getOrCreate()
"""
Recupera a lista de usuários construída no arquivo (converter.py) e realiza a remoção das instâncias duplicadas
"""
def deduplicate():
## Carrega lista de usuários persistida no formato Parquet
users = spark.read.parquet(CONFIG['OUTPUT_PATH'])
## Cria um grupo contendo um id único e a data de última atualização das instâncias vinculadas ao id corrente
cluster = users.groupBy('id').agg(func.max("update_date").alias('update_date'))
## Faz o join do dataframe completo com o grupo, removendo as duplicatas
users_deduplicated = users.join(cluster, ['id', 'update_date'])\
.sort(users.id.asc())
## Exibe o resultado
users_deduplicated.show()
## Persiste o novo dataframe em um novo Parquet
if not os.path.isdir(CONFIG['OUTPUT_PATH_DEDUPLICATED']):
users_deduplicated.write.parquet(CONFIG['OUTPUT_PATH_DEDUPLICATED'])
return users_deduplicated
deduplicate()