Spark is a distributed big data processing framework. Hadoop is the pioneer of big data processing, but Spark is a more optimized and flexible way of doing it. To understand the need for Spark, let's start with Hadoop's core components:
- HDFS (Hadoop Distributed File System): It acts as a distributed storage layer. HDFS is great for storing large-scale data but does not have querying capabilities.
- MapReduce: A distributed data processing framework that operates over the data stored in HDFS only.
- YARN (Yet Another Resource Negotiator): YARN, as the cluster manager, allocates resources (CPU, memory) to worker nodes and schedules jobs.
- Spark is a distributed big data processing framework like Hadoop's MapReduce, and its API provides a robust way to handle large-scale data efficiently. Key features are listed below.
- It's built on the Scala programming language.
- Batch/streaming data processing using Spark Streaming
- SQL analytics using RDDs and Spark SQL (querying distributed data)
- Machine Learning.
- Graph processing.
Although both Spark and MapReduce do data processing, the way it's done is the chief difference.
- Speed: Spark can perform operations up to 100X faster than MapReduce, because MapReduce writes most of its data to disk after each map and reduce operation, whereas Spark keeps most of the data in memory after each transformation and writes to disk only when memory is full.
- Flexible storage: MapReduce requires files to be stored in HDFS, while Spark can work on data stored in a variety of systems such as HDFS, AWS S3, Cassandra, HBase, etc.
- Querying capabilities: Hadoop doesn't natively support querying data, hence tools such as Hive, HBase, Pig, and Spark SQL are built on top of HDFS to provide querying features.
- Spark SQL - DataFrame API: a distributed collection of data organized into named columns, resembling a table in a relational database (see the sketch after this list).
- Real-time analytics: Spark is far more effective here than Hadoop, because Hadoop persists the intermediate results of its map and reduce operations to disk, which results in a lot of I/O; this makes Hadoop a poor fit for real-time and iterative processing.
- Detailed explanation of Hadoop MapReduce vs Spark (link to integrate.io): Spark vs Hadoop
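As a quick illustration of the DataFrame API and Spark SQL mentioned above, here is a minimal sketch (not part of the original setup) that reads a CSV file into a DataFrame and queries it with SQL. The file path, column names, and the `people` view name are assumptions for the example.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataFrameSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("DataFrameSketch")
                .getOrCreate();

        // Load a CSV file into a DataFrame; header and schema inference are enabled.
        Dataset<Row> people = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("data/people.csv"); // placeholder path

        // Register the DataFrame as a temporary view and query it with SQL.
        people.createOrReplaceTempView("people");
        Dataset<Row> adults = spark.sql("SELECT name, age FROM people WHERE age >= 18");

        adults.show();
        spark.stop();
    }
}
```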
- Although Hadoop plays an optional role in the Apache Spark ecosystem, it is primarily used as a resource provider (YARN) for Spark jobs.
- Step 1: Download Spark - Hadoop binaries.
- Step 2: For the Hadoop binaries to work on Windows, download the winutils.exe and hadoop.dll files from the repo.
- Step 3: After downloading, place winutils.exe and hadoop.dll in the spark-3.5.3-bin-hadoop3\bin path (from Step 1); an optional code-based alternative follows this list.
- For detailed installation and explanation of Spark in Java, check out this repo
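Optionally, if winutils.exe is still not picked up, one common workaround is to point the hadoop.home.dir system property at the folder from Step 1 (the one whose bin directory now contains winutils.exe and hadoop.dll) before the SparkSession is created. A minimal sketch, assuming the default extract location:

```java
import org.apache.spark.sql.SparkSession;

public class WindowsSetupSketch {
    public static void main(String[] args) {
        // Point Hadoop at the folder whose bin directory holds winutils.exe and hadoop.dll.
        // "C:\\spark-3.5.3-bin-hadoop3" is just the assumed extract location from Step 1.
        System.setProperty("hadoop.home.dir", "C:\\spark-3.5.3-bin-hadoop3");

        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("WindowsSetupSketch")
                .getOrCreate();

        System.out.println("Spark version: " + spark.version());
        spark.stop();
    }
}
```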
- It is the entry point for a Spark application.
Key Responsibilities:
- Cluster Communication: Connects with the cluster manager (YARN, Mesos, Kubernetes or Spark Standalone) to request resources for executing tasks.
- Job Scheduling: Breaks down a Spark application into stages and tasks, then schedules their execution across the cluster.
- RDD Creation: Creates RDDs from external data sources (e.g., HDFS, S3, local file systems).
- Broadcast Variables and Accumulators: Manages shared variables used across nodes (see the sketch below).
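To make the last two responsibilities concrete, the sketch below (class and variable names are illustrative, not from the original code) uses a broadcast variable as a read-only lookup shared by all executors and a LongAccumulator as a counter updated from tasks and read back on the driver.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class SharedVariablesSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("SharedVariablesSketch")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Broadcast a small lookup table once to every executor instead of shipping it per task.
        Map<String, String> countryCodes = new HashMap<>();
        countryCodes.put("IN", "India");
        countryCodes.put("US", "United States");
        Broadcast<Map<String, String>> lookup = jsc.broadcast(countryCodes);

        // Accumulator: a counter that tasks add to and the driver reads after an action.
        LongAccumulator unknownCodes = spark.sparkContext().longAccumulator("unknownCodes");

        JavaRDD<String> codes = jsc.parallelize(Arrays.asList("IN", "US", "XX"));
        JavaRDD<String> countries = codes.map(code -> {
            String name = lookup.value().get(code);
            if (name == null) {
                unknownCodes.add(1);
                return "Unknown";
            }
            return name;
        });

        countries.collect().forEach(System.out::println);
        System.out.println("Unknown codes seen: " + unknownCodes.value());
        spark.stop();
    }
}
```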
- RDD (Resilient Distributed Dataset) is the fundamental data abstraction in Spark.
- It represents an immutable distributed collection of objects that can be processed in parallel across the cluster.
- Fault-tolerant: meaning they can automatically recover lost partitions due to node failures.
- Lazy Evaluation: Transformations on RDDs are not executed immediately. They are only computed when an action is triggered.
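A small sketch of the lazy-evaluation behavior described above: the map and filter transformations only build up the RDD lineage, and nothing runs until count() (an action) is called.

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class LazyEvaluationSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("LazyEvaluationSketch")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<Integer> numbers = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

        // Transformations: only recorded in the RDD lineage, nothing is executed yet.
        JavaRDD<Integer> doubled = numbers.map(n -> n * 2);
        JavaRDD<Integer> bigOnes = doubled.filter(n -> n > 6);

        // Action: triggers the actual computation across partitions.
        long count = bigOnes.count();
        System.out.println("Values greater than 6 after doubling: " + count);

        spark.stop();
    }
}
```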
Add the JVM option below to let Spark access the java.base/sun.nio.ch package; otherwise the application fails with an inaccessible-class error on newer Java versions.
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
SparkSession.builder().master("local[*]").appName("SparkBasics").getOrCreate();
- local --> processes the data in a single thread.
- local[*] --> * represents the number of logical cores (threads), equivalent to Runtime.getRuntime().availableProcessors(); Spark uses this option to partition the data and execute it in parallel across the logical cores.
- local[k] --> k is a hardcoded thread count, e.g. local[4].
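To verify the claim about local[*], the following sketch (class and variable names are mine) prints the logical core count, the default parallelism Spark derives from it, and the partition count of a small RDD:

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ParallelismCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("ParallelismCheck")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // With local[*], default parallelism should match the number of logical cores.
        System.out.println("Logical cores       : " + Runtime.getRuntime().availableProcessors());
        System.out.println("Default parallelism : " + spark.sparkContext().defaultParallelism());
        System.out.println("RDD partitions      : "
                + jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)).getNumPartitions());

        spark.stop();
    }
}
```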
SparkSession.builder().appName("SparkBasics").getOrCreate();
When no master is set in code, the master URL is supplied at deploy time. A Spark application can be packaged as a jar and deployed to a clustered environment such as AWS EMR, or run standalone as desired.
spark-submit --master yarn application.jar
Even though a Spark job is packaged as a jar, real-time data processing is possible through the Spark Streaming libraries, with Kafka used as the source of streaming real-time data (a minimal sketch follows the options below).
Note: With Spark jobs being executed as jars and processing real-time data, the scenarios below should be handled:
- Logs: If the cluster manager is YARN or Kubernetes, logs can be retrieved through the cluster manager (e.g., kubectl logs on Kubernetes); otherwise another logging mechanism such as Kibana (ELK stack) is needed.
- Deployment: Gracefully shut down the existing job, and handle checkpoints or a reprocessing mechanism when deploying a new jar.
.option("startingOffsets", "latest") // Or "earliest" for replay -- for kafka
.option("checkpointLocation", "/path/to/checkpoint") -- to restart from last checkpoint
Accessing an external file system (AWS S3):
clusteredSparkContext.hadoopConfiguration().set("fs.s3a.access.key", "access-key"); // provide your own AWS S3 bucket access key
clusteredSparkContext.hadoopConfiguration().set("fs.s3a.secret.key", "secret-key"); // provide your own AWS S3 bucket secret key
clusteredSparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
JavaRDD<String> rdd = clusteredSparkContext.textFile("s3a://mybucket/fake_data.txt"); // s3a://{yourbucketname}/{filename}
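Continuing the snippet above, a quick sanity check on the S3-backed RDD (illustrative only):

```java
// Actions on the RDD created above; count() triggers the actual read from S3.
long lineCount = rdd.count();
System.out.println("Lines in fake_data.txt: " + lineCount);
rdd.take(5).forEach(System.out::println); // preview the first few records
```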
Steps to create S3 storage:
- Create an AWS free tier account.
- Search for S3 and create a bucket (the bucket name must be globally unique).
- Create a user group (link to amazon website) in AWS.
- Create an access key (link to amazon website) in AWS.
- At Step 5 of the access key creation flow, the access key and secret key are downloaded by your browser in CSV format.
Steps to create a cluster:
- Search for EMR (Elastic MapReduce) and select the Spark and Hadoop versions you desire.
- Provide the necessary details, such as instance count and instance type/memory (ideally go for c4.large or m5.xlarge).
- Cluster termination and node replacement: select auto-terminate on idle and provide the timings you desire.
- Leave the rest as defaults and create the cluster.
- If the cluster started successfully, it should show a Waiting status as below, and the number of instances running in the cluster appears at the bottom of the page under Instance Groups.
- Connect to the EMR EC2 instance from Windows (using PuTTY and a .ppk file) or Linux (Ubuntu or any Linux CLI and a .pem file).
- For PuTTY: follow the steps below.
- To connect to the AWS instance from a Linux CLI, you need the security/credentials '.pem' file.
Once the '.pem' file is generated it will sit on the Windows side; to copy it from Windows to Linux, use the commands below:
Change directory to the .pem file location: cd /mnt/c/Users/{username}/git/spark
Check the current directory: pwd
Copy the file from Windows to Linux: cp {yourpemsecurityfile}.pem /home/{username}
For the Ubuntu CLI: get the master node's public DNS from the screenshot above and execute the command below
ssh -i yoursecuritykey.pem hadoop@ec2-65-1-93-113.ap-south-1.compute.amazonaws.com
Network error: connection timeout on port 22 via SSH:
- Go to Network & Security --> Security Groups --> select your Security Group ID (the one used while creating the cluster) --> open Inbound Rules as below.
- Add a rule to allow SSH traffic through port 22 as below and update the security group.
Set read/write permissions: The first time, you'll see a "permissions are too open" error; set the permissions on your .pem file so that only you can read or write it.
chmod 600 yoursecuritykey.pem
The connection to the EMR cluster from the CLI succeeds, as shown below.
Upload your executable jar file to the S3 bucket, then copy it to your EMR cluster using the command below
aws s3 cp s3://mybucket/yourjar.jar .
Note: "." at the end indicates current working directory of the cluster.
After copying the jar, execute the Spark job using the command below
spark-submit yourjarname.jar
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of
type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
In a clustered environment, Spark throws the above exception because the driver program (application code) must also be shipped to the worker nodes. Explained (link to stackoverflow). To fix it, setJars should be populated as (link to file) below.
SparkSession ss = SparkSession.builder().appName("SparkBasics")
        .config(new SparkConf()
                // Ship the application jar to the executors so the driver's lambda classes can be deserialized there.
                .setJars(new String[] {"apache-spark-with-java8-1.0-SNAPSHOT.jar"}))
        .getOrCreate();
Using the Spark UI, metrics such as time, memory, partitions, and executors can be examined.
Since we used two worker nodes (two instances), the tasks are shared between the two executors, as shown in the screenshot above.