Why NoSQL?
- Random Access to Planet-Size Data
- Scales horizontally (add more machines)
Scaling MySQL requires extreme measures
- Denormalization: store redundant data
- Caching: store data in memory
- Master/Slave: read from slaves, write to master
- Sharding: split data across multiple machines
- Materialized Views: pre-compute results
- Removing stored procedures
Do you really need SQL?
- Your high-transaction queries are probably simple once denormalized
- A simple get/put API is often sufficient
- Looking up values by key is simple, fast, and scalable
Use the right tool for the job
- For analytics: Hive, Pig, Spark, etc.
- Export data to a relational database for OLTP
- If you have giant-scale data, export it to a non-relational database instead
Sample architecture
Customer -> Internet -> Web Servers -> MongoDB <- Spark Streaming | Hadoop YARN / HDFS <- Data source(s)
HBase
- A non-relational, scalable database built on top of HDFS
- Based on Google's BigTable
- CRUD
- Create
- Read
- Update
- Delete
- There is no query language, only CRUD APIs
- Fast access to any given ROW
- A ROW is referenced by a unique KEY
- Each ROW has a small number of COLUMN FAMILIES
- A COLUMN FAMILY may contain an arbitrary number of COLUMNS
- You can have a very large number of COLUMNS per COLUMN FAMILY
- Each CELL can have many VERSIONS with given timestamps
- Sparse data is OK - missing columns in a row take no space
Example: track all of the links that connect to a given URL
Key: com.cnn.www (reversed domain name - keys are stored lexicographically, so all cnn.com pages end up stored together)
- COLUMN FAMILY Contents: with the versioning feature, three timestamped copies of the page contents can be stored:
  - Contents: <html>...</html>, <html>...</html>, <html>...</html>
- COLUMN FAMILY Anchor: one column per site that links to this page:
  - Anchor:cnnsi.com
  - Anchor:my.look.ca
(A conceptual sketch of this row follows below.)
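To make the layout concrete, here is a rough sketch of how that single row might be organized. This is purely conceptual (a Python dict, not any HBase API), and the timestamps and anchor values are made up for illustration.

```python
# Conceptual only - not an HBase API call. One row, keyed by the reversed URL,
# with two column families; each cell can hold multiple timestamped versions.
row = {
    "com.cnn.www": {
        "Contents": {
            # three versions of the page contents, newest first (timestamps are invented)
            "Contents:": [("t3", "<html>...</html>"),
                          ("t2", "<html>...</html>"),
                          ("t1", "<html>...</html>")],
        },
        "Anchor": {
            # one column per referring site; the values here are placeholders
            "Anchor:cnnsi.com": "...",
            "Anchor:my.look.ca": "...",
        },
    }
}
```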
Ways to access HBase:
- HBase shell
- Java API
- Wrappers for Python, Scala, etc.
- Spark, Hive, Pig, etc.
- REST service
- Thrift service -> Represents the data more compactly -- maximum performance
- Avro service -> Represents the data more compactly -- maximum performance
- Create an HBase table for movie ratings by user
- Show we can quickly query it for individual users
- Good example of sparse data
- Column family: rating
- The row ID will be the user ID.
- Each individual column will represent a movie ID, and its value will be the rating (see the sketch below)
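As a sketch of that design (the user ID, movie IDs, and ratings below are made up), a single row holds one column per rated movie, so the data stays sparse:

```python
# Conceptual sketch of one row in the 'ratings' table - not actual HBase output.
# Row key = user ID; each column is rating:<movieID>, and the cell value is the rating.
ratings_row = {
    "1": {
        "rating:50":  "5",
        "rating:172": "5",
        "rating:133": "1",
        # only the movies this user actually rated appear; missing movies take no space
    }
}
```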
- Open ports in the VirtualBox VM:
  - Settings -> Network -> Advanced -> Port Forwarding -> Add a new rule
    - Name: HBase REST
    - Protocol: TCP
    - Host IP: 127.0.0.1
    - Host Port: 6666
    - Guest Port: 6666
  - Add a second rule the same way for the REST info port, forwarding host port 6667 to guest port 6667
- Start HBase
Ambari -> HBase -> Service Actions -> Start
- Log into the VM and start the REST server:
sudo su
/usr/hdp/current/hbase-master/bin/hbase-daemon.sh start rest -p 6666 --infoport 6667
Python Client -> REST API -> HBase | HDFS
- Install starbase:
sudo pip install starbase
- Write the script:
from starbase import Connection

c = Connection("127.0.0.1", "6666")

ratings = c.table('ratings')  # handle to the 'ratings' table via the REST service

if ratings.exists():
    print("Dropping existing ratings table\n")
    ratings.drop()

ratings.create('rating')  # single column family named 'rating'

print("Parsing the ml-100k ratings data...\n")
ratingFile = open("/home/maria_dev/ml-100k/u.data", "r")

batch = ratings.batch()

for line in ratingFile:
    (userID, movieID, rating, timestamp) = line.split()
    batch.update(userID, {'rating': {movieID: rating}})  # row key = userID, column = rating:movieID

ratingFile.close()

print("Committing ratings data to HBase via REST service\n")
batch.commit(finalize=True)

print("Get back ratings for some users...\n")
print("Ratings for user ID 1:\n")
print(ratings.fetch("1"))
print("Ratings for user ID 33:\n")
print(ratings.fetch("33"))
Using Pig to import data into HBase:
- You must create the HBase table first
- The Pig relation must have a unique key as its first field, followed by the subsequent columns in the order you want them saved in HBase
- The USING clause allows you to STORE into an HBase table
- Can work at scale
- Upload the users file to HDFS:
Ambari -> Files View -> 'user/maria_dev/ml-100k' -> Upload -> Select the file (`u.user`) -> Upload
- SSH into the VM and run the following command to open the interactive HBase shell:
hbase shell
- Create the table:
create 'users', 'userinfo'
- Exit the shell:
exit
- Download the following file (this is a Pig script that will import the users table into HBase):
wget http://media.sundog-soft.com/hadoop/hbase.pig
- The script's contents:
users = LOAD '/user/maria_dev/ml-100k/u.user'
        USING PigStorage('|')
        AS (userID:int, age:int, gender:chararray, occupation:chararray, zip:int);

STORE users INTO 'hbase://users'
      USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
            'userinfo:age,userinfo:gender,userinfo:occupation,userinfo:zip');
- Run the script:
pig hbase.pig
- Check the results:
hbase shell
scan 'users'
- Delete the table:
disable 'users'
drop 'users'
- Exit the shell:
exit
- Stop the HBase service:
Ambari -> HBase -> Service Actions -> Stop
Cassandra
- Unlike HBase, there is no master node - all nodes are the same
- Data model is similar to HBase
- It's non-relational, but it has a query language (CQL) as its interface
The CAP theorem trade-offs:
- consistency: every read receives the most recent write or an error
- availability: every request receives a response about whether it succeeded or failed
- partition tolerance: the system continues to work even if the network is partitioned
You can only have two of these at the same time. Cassandra is AP.
- It is "eventually consistent"
- It offers "tunable consistency": you can specify your consistency as part of your requests
Cassandra favors availability over consistency by default, but that trade-off can be adjusted per request.
Other databases:
- MongoDB: CP
- HBase: CP
- MySQL: CA
- Every node is identical
- The client connects to some node (usually the closest one), and that node knows where the requested data lives in the ring
- Specifying the consistency level: how many replica nodes must respond before a request is considered successful (see the driver sketch below)
- Cassandra is a "ring" of nodes
- Data is partitioned across the nodes using consistent hashing
- Data is replicated across multiple nodes for fault tolerance
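As an illustration of tunable consistency, here is a minimal sketch using the DataStax Python driver; this is not part of the course scripts, and the contact point, keyspace, and table names are assumptions borrowed from the exercise further below.

```python
# Minimal sketch with the DataStax Python driver (pip install cassandra-driver).
# Host, keyspace, and table names are assumptions; adjust for your cluster.
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

cluster = Cluster(["127.0.0.1"])          # one contact point; the driver discovers the ring
session = cluster.connect("movielens")    # keyspace

# Tunable consistency: require a quorum of replicas to acknowledge this read
query = SimpleStatement(
    "SELECT * FROM users WHERE user_id = 1",
    consistency_level=ConsistencyLevel.QUORUM,
)
for row in session.execute(query):
    print(row)

cluster.shutdown()
```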
- Cassandra's great for fast access to rows of information
- An API to do READs and WRITEs
- No JOINS
- All queries must be on primary key
- CQLSH can be used on the command line to interact with Cassandra
- All tables must be in a keyspace - keyspaces are like databases
- DataStax offers a Spark-Cassandra connector
- Allows you to read and write Cassandra tables as DataFrames
- It is smart about pushing queries on those DataFrames back down to Cassandra where it can (see the PySpark sketch below)
- Use cases:
- Use Spark for analytics on Cassandra data
- Use Spark to transform data and store it into Cassandra for transactional use
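The CassandraSpark.py script downloaded below presumably does something along these lines; this is only a hedged sketch of the connector's DataFrame API, not the exact script. The table and keyspace names match the exercise; the connection host is an assumption for the sandbox.

```python
# Sketch of the Spark-Cassandra connector's DataFrame API (run via spark-submit with
# the spark-cassandra-connector package).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("CassandraIntegration")
         .config("spark.cassandra.connection.host", "127.0.0.1")
         .getOrCreate())

# Read a Cassandra table as a DataFrame
users = (spark.read
         .format("org.apache.spark.sql.cassandra")
         .options(table="users", keyspace="movielens")
         .load())

# Filters and aggregations on this DataFrame can be pushed down to Cassandra where possible
users.filter(users.age < 20).show()

# Write a DataFrame back into a (pre-created) Cassandra table
(users.write
 .format("org.apache.spark.sql.cassandra")
 .mode("append")
 .options(table="users", keyspace="movielens")
 .save())

spark.stop()
```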
- Log into the VM
ssh maria_dev@localhost -p 2222
sudo su
- Create the Cassandra repository file:
cd /etc/yum.repos.d/
nano datastax.repo
- Paste the following into the file:
[datastax]
name = DataStax Repo for Apache Cassandra
baseurl = http://rpm.datastax.com/community
enabled = 1
gpgcheck = 0
- Install Cassandra
yum install dsc30
- Start Cassandra
service cassandra start
- Start cqlsh
cqlsh --cqlversion=3.4.0
- Create a keyspace (a keyspace in Cassandra is like a database in MySQL)
CREATE KEYSPACE movielens WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
- Use the keyspace
USE movielens;
- Create a table
CREATE TABLE users (user_id int, age int, gender text, occupation text, zip text, PRIMARY KEY (user_id));
DESCRIBE TABLE users;
SELECT * FROM users;
exit;
- Get a script to import the users table and run it with Spark - make sure the u.user file is in HDFS under user/maria_dev/ml-100k/u.user
wget http://media.sundog-soft.com/hadoop/CassandraSpark.py
spark-submit --packages datastax:spark-cassandra-connector:2.6.5-M2-s_2.11 CassandraSpark.py
- Check the results
cqlsh --cqlversion=3.4.0
USE movielens;
SELECT * FROM users;
exit;
MongoDB: a huMONGOus document database.
- You can store any kind of data in it
- MongoDB stores data in JSON-like documents
- An automatic _id field is added if you don't specify one
- You can enforce schemas if you want, but you don't have to
- You can have different fields in different documents in the same collection
- No single "key"
- You can index any field or combination of fields
- You can "shard" across indexes
- Lots of flexibility, but you have to be careful about consistency
- Aimed at enterprise use
The CAP theorem trade-offs:
- consistency: every read receives the most recent write or an error
- availability: every request receives a response about whether it succeeded or failed
- partition tolerance: the system continues to work even if the network is partitioned
You can only have two of these at the same time. MongoDB is CP.
- Database: a group of collections
- Collection: a group of documents
- Document: a set of key-value pairs
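A small pymongo sketch (not part of the course material) shows the database/collection/document hierarchy, the automatic _id, and the schema flexibility described above; the database, collection, and field values are illustrative.

```python
# Illustrative pymongo sketch (pip install pymongo); names and fields are made up.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client["movielens"]     # database: a group of collections
users = db["users"]          # collection: a group of documents

# Documents are just key-value pairs; documents in the same collection can differ,
# and MongoDB adds an _id field automatically if you don't supply one.
users.insert_one({"user_id": 1, "age": 24, "occupation": "technician"})
users.insert_one({"user_id": 2, "age": 53, "gender": "F", "zip": "94043"})

print(users.find_one({"user_id": 1}))
```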
Replica sets:
primary -> secondary, secondary, secondary, secondary
- Single master/primary node - all writes go to it
- The secondaries maintain backup copies of the primary's data
- The secondary nodes can elect a new primary if the primary goes down
- The operation log (oplog) must be long enough to let the old primary catch back up once it recovers
- A majority of the nodes must agree on the primary
- An even number of nodes is not recommended
- If you can't have an odd number of nodes, you can have an "arbiter" node
- You can only have 1 "arbiter" node
- Apps must know enough servers in the replica set to be able to reach one to learn who's the primary
- Fixed in MongoDB 3.6
- Replicas only address durability, not availability
- If the primary goes down, you can't write to the database
- You can still read from the secondaries, but they might be out of date
- Delayed secondaries can be used to recover from human error
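To illustrate the point above about apps needing to know several members of the replica set, here is a hedged pymongo sketch; the hostnames and replica-set name are hypothetical.

```python
# Hypothetical hosts and replica-set name - the client lists several members so it can
# still discover the current primary even if one listed host is down.
from pymongo import MongoClient

client = MongoClient(
    "mongodb://node1:27017,node2:27017,node3:27017/?replicaSet=rs0"
)
# Writes are routed to whichever node is currently the primary.
client.movielens.users.insert_one({"user_id": 99, "age": 30})
```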
- Sharding is a way to partition data across multiple machines
- Multiple replica sets, where each replica set is responsible for a range of values of the shard key (an indexed field)
- Auto-sharding (automatically balancing data between shards) sometimes doesn't work smoothly
- If your config servers go down, things can get into a bad state
- You must have exactly 3 config servers (prior to MongoDB 3.2)
- In current versions the config servers are part of a replica set, which just needs to have a primary
- Not just a NoSQL database -- very flexible
- Shell is a full JavaScript interpreter
- Supports many indices
  - But only one index can be used for sharding (the shard key)
  - Having more than a few indices per collection is discouraged
- Text indices for text searches
- Geospatial indices
- Built-in aggregation capabilities, MapReduce, and GridFS
  - For some applications, these may be all you need - you might not need Hadoop at all
- MongoDB still integrates with Hadoop, Spark, etc.
- A SQL connector is available
- But MongoDB still isn't designed for joins and normalized data
- Log into the VM
ssh maria_dev@localhost -p 2222
sudo su
- Get the Ambari MongoDB connector
cd /var/lib/ambari-server/resources/stacks/HDP/2.6/services
git clone https://github.com/nikunjness/mongo-ambari.git
- Restart Ambari
service ambari restart
- Log into Ambari to finish the setup: http://localhost:8080
- Go to Actions -> Add Service, enable MongoDB, accept all the defaults, and deploy
- Make sure the users file is in HDFS under user/maria_dev/ml-100k/u.user
- Import the users table into MongoDB using Spark
wget http://media.sundog-soft.com/hadoop/MongoSpark.py
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.6.5 MongoSpark.py
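MongoSpark.py isn't reproduced here; as a rough, hedged sketch, a script like it would parse u.user into a DataFrame and write it out through the mongo-spark connector along these lines (the column names follow the exercise; the connector URI is an assumption for the sandbox):

```python
# Rough sketch of a Spark-to-MongoDB import (not the exact MongoSpark.py).
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("MongoDBIntegration").getOrCreate()

def parse_user(line):
    fields = line.split('|')
    return Row(user_id=int(fields[0]), age=int(fields[1]),
               gender=fields[2], occupation=fields[3], zip=fields[4])

lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.user")
users = spark.createDataFrame(lines.map(parse_user))

# Write the DataFrame into MongoDB via the mongo-spark connector
(users.write
 .format("com.mongodb.spark.sql.DefaultSource")
 .option("uri", "mongodb://127.0.0.1/movielens.users")
 .mode("append")
 .save())

spark.stop()
```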
- Log into the VM and start the MongoDB shell
ssh maria_dev@localhost -p 2222
mongo
- Query the database without an index
use movielens
db.users.find( { "user_id": 100 } )
db.users.explain().find( { "user_id": 100 } )
- Create an index and query the database with it (this will make the query much faster)
db.users.createIndex( { "user_id": 1 } )
db.users.explain().find( { "user_id": 100 } )
- Aggregate all the users by occupation and get the average age for each occupation
db.users.aggregate( [ { $group: { _id: "$occupation", avgAge: { $avg: "$age" } } } ] )
- How many users are in the database?
db.users.count()
- How many users are in the database that are older than 25?
db.users.find( { "age": { $gt: 25 } } ).count()