Durante esta practica la idea es emular un ambiente de trabajo, desde un área de innovación solicitan construir un MVP(Producto viable mínimo) de un ambiente de Big Data donde se deban cargar unos archivos CSV que anteriormente se utilizaban en un datawarehouse en MySQl, pero ahora en un entorno de Hadoop.
Desde la gerencia de Infraestructura no están muy convencidos de utilizar esta tecnología por lo que no se asigno presupuesto alguna para esta iniciativa, de forma tal que por el momento no es posible utilizar un Vendor(Azure, AWS, Google) para implementar dicho entorno, es por esto que todo el MVP se deberá implementar utilizando Docker de forma tal que se pueda hacer una demo al sector de infraestructura mostrando las ventajas de utilizar tecnologías de Big Data.
Se pesenta un entorno Docker con Hadoop (HDFS) y la implementación de:
- Spark
- Hive
- HBase
- MongoDB
- Neo4J
- Zeppelin
- Kafka
Es importante mencionar que el entorno completo consume muchos recursos de su equipo, motivo por el cuál, se propondrán ejercicios pero con ambientes reducidos, en función de las herramientas utilizadas.
Ejecute docker network inspect
en la red (por ejemplo, docker-hadoop-spark-hive_default
) para encontrar la IP en la que se publican las interfaces de hadoop. Acceda a estas interfaces con las siguientes URL:
Namenode: http://192.168.1.107:9870/dfshealth.html#tab-overview
Datanode: http://192.168.1.107:9864/
Spark master: http://192.168.1.107:8080/
Spark worker: http://192.168.1.107:8081/
HBase Master-Status: http://192.168.1.107:16010
HBase Zookeeper_Dump: http://192.168.1.107:16010/zk.jsp
HBase Region_Server: http://192.168.1.107:16030
Zeppelin: http://192.168.1.107:8888
Neo4j: http://192.168.1.107:7474
Para implementar se ejecuto
git clone https://github.com/lopezdar222/herramientas_big_data
cd herramientas_big_data
sudo docker-compose -f docker-compose-vX.yml up -d
Se utilizo el entorno docker-compose-v1.yml
Copie los archivos ubicados en la carpeta Datasets, dentro del contenedor "namenode"
sudo docker exec -it namenode bash
cd home
mkdir Datasets
exit
sudo docker cp <path><archivo> namenode:/home/Datasets/<archivo>
Ubicarse en el contenedor "namenode"
sudo docker exec -it namenode bash
Crear un directorio en HDFS llamado "/data".
hdfs dfs -mkdir -p /data
Copiar los archivos csv provistos a HDFS:
hdfs dfs -put /home/Datasets/* /data
Este proceso de creación de la carpeta data y copiado de los arhivos, debe poder ejecutarse desde un shell script.
Nota: Busque dfs.blocksize y dfs.replication en http://<IP_Anfitrion>:9870/conf para encontrar los valores de tamaño de bloque y factor de réplica respectivamente entre otras configuraciones del sistema Hadoop.
Se utiliza el entorno docker-compose-v2.yml
Crear tablas en Hive, a partir de los csv ingestados en HDFS.
Para esto, se puede ubicar dentro del contenedor correspondiente al servidor de Hive, y ejecutar desdea allí los scripts necesarios
sudo docker exec -it hive-server bash
hive
Este proceso de creación las tablas debe poder ejecutarse desde un shell script.
Nota: Para ejecutar un script de Hive, requiere el comando:
hive -f <script.hql>
Las tablas creadas en el punto 2 a partir de archivos en formato csv, deben ser almacenadas en formato Parquet + Snappy. Tener en cuenta además de aplicar particiones para alguna de las tablas.
La mejora en la velocidad de consulta que puede proporcionar un índice tiene el costo del procesamiento adicional para crear el índice y el espacio en disco para almacenar las referencias del índice. Se recomienda que los índices se basen en las columnas que utiliza en las condiciones de filtrado. El índice en la tabla puede degradar su rendimiento en caso de que no los esté utilizando. Crear índices en alguna de las tablas cargadas y probar los resultados:
CREATE INDEX index_name
ON TABLE base_table_name (col_name, ...)
AS index_type
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name=property_value, ...)]
[IN TABLE index_table_name]
[ [ ROW FORMAT ...] STORED AS ...
| STORED BY ... ]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]
[COMMENT "index comment"];
Ejemplo:
hive> CREATE INDEX index_students ON TABLE students(id)
> AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
> WITH DEFERRED REBUILD ;
ALTER INDEX index_name ON table_name [PARTITION partition_spec] REBUILD;
Ejemplo:
hive> ALTER INDEX index_students ON students REBUILD;
DROP INDEX [IF EXISTS] index_name ON table_name;
hive> DROP INDEX IF EXISTS index_students ON students;
Se puede utilizar el entorno docker-compose-v3.yml
Instrucciones:
1- sudo docker exec -it hbase-master hbase shell
create 'personal','personal_data'
list 'personal'
put 'personal',1,'personal_data:name','Juan'
put 'personal',1,'personal_data:city','Córdoba'
put 'personal',1,'personal_data:age','25'
put 'personal',2,'personal_data:name','Franco'
put 'personal',2,'personal_data:city','Lima'
put 'personal',2,'personal_data:age','32'
put 'personal',3,'personal_data:name','Ivan'
put 'personal',3,'personal_data:age','34'
put 'personal',4,'personal_data:name','Eliecer'
put 'personal',4,'personal_data:city','Caracas'
get 'personal','4'
2-En el namenode del cluster:
hdfs dfs -put personal.csv /hbase/data/personal.csv
3-sudo docker exec -it hbase-master bash
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=',' -Dimporttsv.columns=HBASE_ROW_KEY,personal_data:name,personal_data:city,personal_data:age personal hdfs://namenode:9000/hbase/data/personal.csv
hbase shell
scan 'personal'
create 'album','label','image'
put 'album','label1','label:size','10'
put 'album','label1','label:color','255:255:255'
put 'album','label1','label:text','Family album'
put 'album','label1','image:name','holiday'
put 'album','label1','image:source','/tmp/pic1.jpg'
get 'album','label1'
Instrucciones:
1) sudo docker cp iris.csv mongodb:/data/iris.csv
sudo docker cp iris.json mongodb:/data/iris.json
2) sudo docker exec -it mongodb bash
3) mongoimport /data/iris.csv --type csv --headerline -d dataprueba -c iris_csv
mongoimport --db dataprueba --collection iris_json --file /data/iris.json --jsonArray
4) mongosh
use dataprueba
show collections
db.iris_csv.find()
db.iris_json.find()
5) mongoexport --db dataprueba --collection iris_csv --fields sepal_length,sepal_width,petal_length,petal_width,species --type=csv --out /data/iris_export.csv
mongoexport --db dataprueba --collection iris_json --fields sepal_length,sepal_width,petal_length,petal_width,species --type=json --out /data/iris_export.json
6) Descargar desde https://search.maven.org/search?q=g:org.mongodb.mongo-hadoop los jar:
https://search.maven.org/search?q=a:mongo-hadoop-hive
https://search.maven.org/search?q=a:mongo-hadoop-spark
sudo docker cp mongo-hadoop-hive-2.0.2.jar hive-server:/opt/hive/lib/mongo-hadoop-hive-2.0.2.jar
sudo docker cp mongo-hadoop-core-2.0.2.jar hive-server:/opt/hive/lib/mongo-hadoop-core-2.0.2.jar
sudo docker cp mongo-hadoop-spark-2.0.2.jar hive-server:/opt/hive/lib/mongo-hadoop-spark-2.0.2.jar
sudo docker cp mongo-java-driver-3.12.11.jar hive-server:/opt/hive/lib/mongo-java-driver-3.12.11.jar
7) sudo docker cp iris.hql hive-server:/opt/iris.hql
sudo docker exec -it hive-server bash
8) hiveserver2
chmod 777 iris.hql
hive -f iris.hql
Ejemplo de búsqueda del camino más corto:
https://neo4j.com/docs/graph-data-science/current/algorithms/dijkstra-source-target/
CREATE (a:Location {name: 'A'}),
(b:Location {name: 'B'}),
(c:Location {name: 'C'}),
(d:Location {name: 'D'}),
(e:Location {name: 'E'}),
(f:Location {name: 'F'}),
(a)-[:ROAD {cost: 50}]->(b),
(b)-[:ROAD {cost: 50}]->(a),
(a)-[:ROAD {cost: 50}]->(c),
(c)-[:ROAD {cost: 50}]->(a),
(a)-[:ROAD {cost: 100}]->(d),
(d)-[:ROAD {cost: 100}]->(a),
(b)-[:ROAD {cost: 40}]->(d),
(d)-[:ROAD {cost: 40}]->(b),
(c)-[:ROAD {cost: 40}]->(d),
(d)-[:ROAD {cost: 40}]->(c),
(c)-[:ROAD {cost: 80}]->(e),
(e)-[:ROAD {cost: 80}]->(c),
(d)-[:ROAD {cost: 30}]->(e),
(e)-[:ROAD {cost: 30}]->(d),
(d)-[:ROAD {cost: 80}]->(f),
(f)-[:ROAD {cost: 80}]->(d),
(e)-[:ROAD {cost: 40}]->(f),
(f)-[:ROAD {cost: 40}]->(e);
CALL gds.graph.project(
'miGrafo',
'Location',
'ROAD',
{
relationshipProperties: 'cost'
}
)
MATCH (l:Location) RETURN l
MATCH (source:Location {name: 'A'}), (target:Location {name: 'E'})
CALL gds.shortestPath.dijkstra.write.estimate('miGrafo', {
sourceNode: source,
targetNode: target,
relationshipWeightProperty: 'cost',
writeRelationshipType: 'PATH'
})
YIELD nodeCount, relationshipCount, bytesMin, bytesMax, requiredMemory
RETURN nodeCount, relationshipCount, bytesMin, bytesMax, requiredMemory
MATCH (source:Location {name: 'A'}), (target:Location {name: 'E'})
CALL gds.shortestPath.dijkstra.stream('miGrafo', {
sourceNode: source,
targetNode: target,
relationshipWeightProperty: 'cost'
})
YIELD index, sourceNode, targetNode, totalCost, nodeIds, costs, path
RETURN
index,
gds.util.asNode(sourceNode).name AS sourceNodeName,
gds.util.asNode(targetNode).name AS targetNodeName,
totalCost,
[nodeId IN nodeIds | gds.util.asNode(nodeId).name] AS nodeNames,
costs,
nodes(path) as path
ORDER BY index
Ejemplo de logística: https://neo4j.com/docs/graph-data-science/current/alpha-algorithms/minimum-weight-spanning-tree/
MATCH (n:Location {name: 'A'})
CALL gds.alpha.spanningTree.minimum.write('miGrafo', {
startNodeId: id(n),
relationshipWeightProperty: 'cost',
writeProperty: 'MINST',
weightWriteProperty: 'writeCost'
})
YIELD preProcessingMillis, computeMillis, writeMillis, effectiveNodeCount
RETURN preProcessingMillis, computeMillis, writeMillis, effectiveNodeCount;
MATCH path = (n:Location {name: 'A'})-[:MINST*]-()
WITH relationships(path) AS rels
UNWIND rels AS rel
WITH DISTINCT rel AS rel
RETURN startNode(rel).name AS source, endNode(rel).name AS destination, rel.writeCost AS cost
MATCH (n) DETACH DELETE n
sudo docker cp producto.csv neo4j:/var/lib/neo4j/import/producto.csv
sudo docker cp tipo_producto.csv neo4j:/var/lib/neo4j/import/tipo_producto.csv
sudo docker cp cliente.csv neo4j:/var/lib/neo4j/import/cliente.csv
sudo docker cp venta.csv neo4j:/var/lib/neo4j/import/venta.csv
Ver Archivo "ejemploNeo4J.txt"
HDFS:
En la máquina anfitrión probar WebHDFS:
curl "http://<IP_Anfitrion>:9870/webhdfs/v1/?op=LISTSTATUS"
En el interpreter:
En la parte de "file"
Variable hdfs.url = http://<IP_Anfitrion>:9870/webhdfs/v1/
En nuevo notebook / nueva nota:
%file
ls /
Neo4J:
En el interpreter
En la parte de "neo4J"
Variables
neo4J.url = http://<IP_Anfitrion>:7687
neo4j.auth.user = neo4j
neo4j.auth.password = zeppelin
Se pueden utilizar los entornos docker-compose-v4.yml y docker-compose-kafka.yml
Ubicarse en la línea de comandos del Spark master y comenzar PySpark.
docker exec -it spark-master bash
/spark/bin/pyspark --master spark://spark-master:7077
Cargar raw-flight-data.csv desde HDFS.
from pyspark.sql.types import *
flightSchema = StructType([
StructField("DayofMonth", IntegerType(), False),
StructField("DayOfWeek", IntegerType(), False),
StructField("Carrier", StringType(), False),
StructField("OriginAirportID", IntegerType(), False),
StructField("DestAirportID", IntegerType(), False),
StructField("DepDelay", IntegerType(), False),
StructField("ArrDelay", IntegerType(), False),
]);
flights = spark.read.csv('hdfs://namenode:9000/data/flights/raw-flight-data.csv', schema=flightSchema, header=True)
flights.show()
+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
| 19| 5| DL| 11433| 13303| -3| 1|
| 19| 5| DL| 14869| 12478| 0| -8|
| 19| 5| DL| 14057| 14869| -4| -15|
| 19| 5| DL| 15016| 11433| 28| 24|
| 19| 5| DL| 11193| 12892| -6| -11|
| 19| 5| DL| 10397| 15016| -1| -19|
| 19| 5| DL| 15016| 10397| 0| -1|
| 19| 5| DL| 10397| 14869| 15| 24|
| 19| 5| DL| 10397| 10423| 33| 34|
| 19| 5| DL| 11278| 10397| 323| 322|
| 19| 5| DL| 14107| 13487| -7| -13|
| 19| 5| DL| 11433| 11298| 22| 41|
| 19| 5| DL| 11298| 11433| 40| 20|
| 19| 5| DL| 11433| 12892| -2| -7|
| 19| 5| DL| 10397| 12451| 71| 75|
| 19| 5| DL| 12451| 10397| 75| 57|
| 19| 5| DL| 12953| 10397| -1| 10|
| 19| 5| DL| 11433| 12953| -3| -10|
| 19| 5| DL| 10397| 14771| 31| 38|
| 19| 5| DL| 13204| 10397| 8| 25|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 20 rows
flights.describe()
Ubicarse en la línea de comandos del Spark master y comenzar Scala.
docker exec -it spark-master bash
spark/bin/spark-shell --master spark://spark-master:7077
Cargar raw-flight-data.csv desde HDFS.
case class flightSchema(DayofMonth:String, DayOfWeek:String, Carrier:String, OriginAirportID:String, DestAirportID:String, DepDelay:String, ArrDelay:String)
val flights = spark.read.format("csv").option("sep", ",").option("header", "true").load("hdfs://namenode:9000/data/flights/raw-flight-data.csv").as[flightSchema]
flights.show()
+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
| 19| 5| DL| 11433| 13303| -3| 1|
| 19| 5| DL| 14869| 12478| 0| -8|
| 19| 5| DL| 14057| 14869| -4| -15|
| 19| 5| DL| 15016| 11433| 28| 24|
| 19| 5| DL| 11193| 12892| -6| -11|
| 19| 5| DL| 10397| 15016| -1| -19|
| 19| 5| DL| 15016| 10397| 0| -1|
| 19| 5| DL| 10397| 14869| 15| 24|
| 19| 5| DL| 10397| 10423| 33| 34|
| 19| 5| DL| 11278| 10397| 323| 322|
| 19| 5| DL| 14107| 13487| -7| -13|
| 19| 5| DL| 11433| 11298| 22| 41|
| 19| 5| DL| 11298| 11433| 40| 20|
| 19| 5| DL| 11433| 12892| -2| -7|
| 19| 5| DL| 10397| 12451| 71| 75|
| 19| 5| DL| 12451| 10397| 75| 57|
| 19| 5| DL| 12953| 10397| -1| 10|
| 19| 5| DL| 11433| 12953| -3| -10|
| 19| 5| DL| 10397| 14771| 31| 38|
| 19| 5| DL| 13204| 10397| 8| 25|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 20 rows