- 1. 모던 데이터 엔지니어링
- 2. Batch & Stream Processing
- 3. Dataflow Orchestration
- 4. Apache Spark 환경 설정
- 5. spark 기본
- 6. 병럴처리와 분산처리
- 7. RDD
- Key-Value RDD 이란
- Single-Value RDD vs Key-Value RDD
- Key-Value RDD 개념
- Key-Value RDD - Reduction
- Key-Value RDD - Join
- Key-Value RDD - Mapping values
- Key-Value RDD - 예시
- RDD Transformations vs Actions
- Transformations
- Narrow Transformations
- Wide Transformations
- Lazy 연산의 장점
- Storage Level
- Cache & Persist
- Master Worker Topology
- Spark 동작 과정
- Reduction Operations
- Parallel Reduction
- Reduction Actions
- Reduce
- Partition
- Fold
- Fold & Partition
- GroupBy
- Aggregate
- Key-Value RDD Transformations & Actions
- Key-Value RDD - GroupByKey
- Key-Value RDD - ReduceByKey
- Key-Value RDD - mapValues
- Key-Value RDD - countByKey
- Key-Value RDD - keys()
- Key-Value RDD - Joins
- Shuffling
- Partitioner를 이용한 성능 최적화 (Shuffle 최소화)
- Partition의 목적
- Partition 특징
- Partition의 종류
- Hash Partitioning
- Range Partitioning
- Memory & Disk Partition
- Disk Partition
- Repartition & Coalesce
- 연산 중에 파티션을 만드는 작업들
- map vs mapValues
- 8. Spark SQL
- Structured Data vs Unstructured Data
- Structured Data vs RDDs
- Spark SQL
- Spark SQL의 목적
- Spark SQL 소개
- DataFrame
- SparkSession
- DataFrame 만드는 법
- RDD로부터 DataFrame 만들기
- 파일로부터 DataFrame 만들기
- DataFrame을 데이터베이스 테이블처럼 사용하기
- Spark에서 사용할 수 있는 SQL문
- Python에서 Spark SQL 사용하기
- RDD를 사용안하고 DataFrame을 사용했을 때의 장점
- Datasets
- SQL 실습
- DataFrame 특징
- DataFrame의 스키마를 확인하는 법
- DataFrame Operations
- DataFrame Select
- DataFrame Agg
- DataFrame GroupBy
- DataFrame Join
- Spark SQL로 트립 수 세기
- Spark SQL로 뉴욕의 각 행정구 별 데이터 추출하기
- Spark의 두개의 엔진
- Logical Plan이란
- Physical Plan이란
- Catalyst 란
- Catalyst Logical Plan -> Physical Plan 동작 순서
- Catalyst Pipeline
- Logical Planning 최적화
- Explain
- Tungsten
- UDF
- 뉴욕 택시 데이터 분석
- 9. MLlib
- 10. Spark Streaming
- 11. Apache Airflow
- Apache Airflow란
- 워크플로우 관리 문제
- cron script와 같은 기존 방식의 문제점
- AirFlow란
- Workflow란
- Airflow의 구성요소
- Operator
- 작업(Task)
- Airflow의 유용성
- Airflow의 One Node Architecture
- Airflow의 Multi Node Architecture
- Airflow 동작 방식
- DAG의 생성과 실행
- Airflow 설치
- Airflow CLI command
- Airflow DAGs 대시보드
- Airflow DAG View
- NFT 파이프라인 프로젝트 소개
- NFT 파이프라인 - DAG Skeleton
- Airflow - 내장 Operators
- Airflow - Action Operator
- NFT 파이프 라인 - create table task 추가
- NFT 파이프 라인 - Sensor 로 API 확인하기
- NFT 파이프 라인 - OpenSea API 오류 대처법
- NFT 파이프 라인 - HttpOperator로 데이터 불러오기
- NFT 파이프 라인 - process
- NFT 파이프 라인 - store
- NFT 파이프 라인 - 테스크간 의존성 만들기
- Backfill
- Airflow로 Spark 파이프라인 관리하기 - Airflow와 Spark 환경세팅 및 사용하기
- 택시비 예측 파이프라인 만들기
- 12. Kafka
- 전통적인 아키텍쳐
- 전통적인 아키텍처의 문제점
- Kafka 소개 1
- Kafka 소개 2
- Kafka를 이용한 아키텍쳐
- Kafka의 장점들
- Kafka 사용 예
- Kafka 구성
- Kafka를 이용한 아키텍처 - 상세
- Kafka Topic
- Kafka Partition
- Kafka Message
- Kafka Offset
- Kafka Cluster
- Kafka Broker
- Kafka Producer & Consumer
- Kafka Consumer Group
- Rebalancing
- Zookeeper
- Key 에 따른 Message 전송
- Replication Factor
- 파티션 리더
- Consumer Group & Partition & Producer
- Kafka python 설치
- Kafka pyhton Consumer Producer 간단예제
- zookeeper, kafka, kafdrop 를 docker-compose로 실행하기
- kafka topic 생성
- CSV를 스트림으로 바꿔주는 Producer
- 비정상 데이터 탐지
- 13. Flink와 스트리밍 프로세싱
- Apache Flink 란
- Flink 소개
- Stream Processing은 언제 쓰일까
- Batch Processing vs Stream Processing
- Flink의 기본적인 처리 구조
- Hadoop vs Spark vs Flink 특징 비교
- Hadoop vs Spark vs Flink 데이터 처리 방식 비교
- Hadoop vs Spark vs Flink 개발 편의성 비교
- Spark vs Flink 비교
- 마이크로 배치 vs Window
- Spark vs Flink 개발 비교
- Flink의 대단한 점
- Flink 구성
- Flink Storage Streaming
- Flink Deployment
- Flink 내부 구조
- Flink의 Connectors
- Flink의 써드파티 프로젝트
- Flink 프로그램의 일반적인 플로우
- State
- State Backend
- Keyed State
- State 저장
- 체크포인팅
- Barriers
- Snapshotting
- 체크포인트 정렬
- Recovery
- Savepoints
- Exactly once vs At least once
- 데이터 처리시 시간 개념이 들어갈 때
- Time의 종류
- Processing Time
- Event Time
- Evemt Time과 Processing Time이 안 맞을 떄
- Watermark
- 병렬 환경에서의 Watermark
- Flink의 클러스터 매니저
- Flink의 아키텍처 - Job Manager
- Flink의 아키텍처 - Task Manager
- Flink의 아키텍처 - Task Slots
- Pyflink 역사
- Pyflink 란
- Pyflink의 퍼포먼스 최적화
- flink 설치
- flink 클러스터 실행 및 종료
- ETL
- E: 추출 Extract
- T: 스미카에 맞게 변환 Transform
- L: 디비에 저장 Load
- 데이터로 할 수 있는 일이 다양해지고 형태를 예측하기 불가능해지면서 스키마를 정의하기 힘들어 졌다.
- 실시간성을 요구하는 기능들
- 빨라지는 기능 추가
- 실시간 로그
- 비정형 데이터
- 서드 파티 데이터
- 최대한 많은 데이터를 미리 저장해두고 많은 양의 프로세싱을 할 수 있게 됐다.
- 컴퓨팅 파워에 대한 비용 최적화보다 비즈니스와 속도를 최적화하는 쪽이 이득이 크게 됐다.
- ELT
- E: 데이터 추출 Extract
- L: 일단 저장
- T: 쓰임새에 따라 변환
- 예
- 데이터의 로그를 Spark나 FLink를 통해 어느정도 정리 후 저장 (E&L)
- 어플리케이션 혹은 분석 툴에서 이용 가능하도록 변환 (T)
- 시스템의 복잡도에 따라 데이터 추출과 적재를 한번에 하기도 한다.
- 클라우드 웨어하우스 - Snowflake, Google Big Query
- Hadoop -> Databricks, Presto
- 실시간 빅데이터 처리 (Stream Processing)
- ETL -> ELT
- Dataflow 자동화 (Airflow)
- 데이터 분석 팀을 두기 보단 누구나 분석할 수 있도록
- 중앙화 되는 데이터 플랫폼 관리 (access control, data book)
- 소스: 비즈니스와 운영 데이터 생성
- 수집 및 변환: 운영 시스템에서 데이터 추출 -> 추출된 데이터를 저장하고 스키마 관리 -> 데이터를 분석할 수 있도록 변환
- 저장: 데이터를 쿼리와 처리 시스템이 쓸 수 있도록 저장, 비용과 확장성면으로 최적화
- 과거&예측: 데이터 분석을 위한 인사이트 만들기(Query), 저장된 데이터를 이용해 쿼리를 실행하고 필요시 분산처리(Processing), 과거에 무슨 일이 일어났는지 혹은 미래에 무슨일이 일어날지(ML)
- 출력: 데이터 분석을 내부와 외부 유저에게 제공, 데이터 모델을 운영 시스템에 적용
- Sources, Storage, Query: 서비스 레벨 보다는 로우 레벨 문제들을 푸는 분야
- Ingestion & Transformation, Processing: 일반적인 엔지니어링이 집중하는 분야
- 배치(Batch) == 일괄
- 배치 프로세싱(Batch Processing) == 일괄처리
- 많은 양의 데이터를 정해진 시간에 한꺼번에 처리하는 것
- 한정된 대량의 데이터
- 특정 시간
- 일괄 처리
- 전통적으로 쓰이는 데이터 처리 방법
- 실시간성을 보장하지 않아도 될 때
- 데이터를 한꺼번에 처리할 수 있을 때
- 무거운 처리를 할 때 ex) ML 학습
- 예시
- 매일 다음 14일의 수요와 공급을 예측
- 매주 사이트에서 관심을 보인 유저들에게 마케팅 이메일 전송
- 매주 발행하는 뉴스레터
- 매주 새로운 데이터로 머신러닝 알고리즘 학습
- 매일 아침 웹 스크래핑/크롤링
- 매달 월급 지급
- 실시간으로 쏟아지는 데이터를 계속 처리하는 것
- 이벤트가 생길 때 마다, 데이터가 들어올 때 마다 처리
- 뷸규칙적으로 데이터가 들어오는 환경일 때
- 여러개의 이벤트가 한꺼번에 들어올 때
- 오랜 시간 동안 이벤트가 하나도 들어오지 않을 떄
불규칙적으로 데이터가 들어올 떄를 가정
- 배치 프로세싱
- 배치당 처리하는 데이터 수가 달라지면서 리소스를 비효율적으로 사용하게 된다.
- 스트림 프로세싱
- 데이터가 생성되어 요청이 들어로 때 마다 처리할 수 있다.
- 실시간성을 보장해야 될 때
- 데이터가 여러 소스로부터 들어올 때
- 데이터가 가끔 들어오거나 지속적으로 들어올 때
- 가벼운 처리를 할 때 (Rule-based)
- 예시
- 사기 거래 탐지 (Fraud Detection)
- 이상 탐지 (Anomaly Detection)
- 실시간 알림
- 비즈니스 모니터링
- 실시간 수요/공급 측정 및 가격 책정
- 실시간 기능이 들어가는 애플리케이션
- 일반적인 배치 플로우
- 데이터를 모아서
- 데이터베이스에서 읽어서 처리
- 다시 데이터베이스에 담기
- 일반적인 스트림 처리 플로우
- 데이터가 들어올 때 마다(ingest)
- 쿼리/처리 후 State를 업데이트
- DB에 담기
- 데이터를 조금씩 모아서 프로세싱하는 방식
- Batch 프로세싱을 잘게 쪼개서 스트리밍을 흉내내는 방식
- 테스크 스케줄링
- 분산 실행
- 테스크간 의존성 관리
- 서비스가 커지면서 데이터 플랫폼의 복잡도가 커짐
- 데이터가 사용자와 직접 연관되는 경우가 늘어남 (워크플로우가 망가지면 서비스도 망가짐)
- 테스크 하나하나가 중요해짐
- 테스크간 의존성도 생김
- Apache Airflow
- python
- 주피터 노트북
- java
- spark
- pyspark
- https://www.anaconda.com/products/distribution
- 아나콘다 설치하면 python과 python의 기본 패키지들은 자동으로 설치되고, python과 주피터 노트북을 동시에 쉽게 설치가 가능하다.
brew install --cask adoptopenjdk8
brew install scala
brew install apache-spark
pip --version # 경로가 anaconda 인것을 확인
pip install pyspark
pyspark # spark 터미널이 뜨는지 확인
git clone https://github.com/keon/data-engineering.git
- https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
- 2020 March - High Volume For-Hire Vehicle Trip Records 다운로드
- clone 한 리포에 넣기
- data-engineering/01-spark/data/fhvhv_tripdata_2020-03.parquet
필드 이름 | 설명 |
---|---|
hvfhs_license_num | 회사 면허 번호 |
dispatching_base_num | 지역 라이센스 번호 |
pickup_datetime | 승차 시간 |
dropoff_datetime | 하차 시간 |
PULocationID | 승차 지역 ID |
DOLocationID | 하차 지역 ID |
SR_Flag | 합승 여부 Flag |
- HDFS 파일 시스템
- Yarn 리소스 관리
- Map Reduce 연산 엔진 -> Spark가 이것을 대체한다.
- 빠르다 = 빅데이터의 In-Memory 연산
- 노드는 필요에 따라 계속 늘릴 수 있다.
- 수평적 확장이 가능하다.
- Hadoop MapReduce 보다 빠르다
- 메모리 상에선 100배
- 디스크 상에선 10배
- Lazy Evaluation
- 태스크를 정의할 때는 연산을 하지 않다가 결과가 필요할 때 연산한다.
- 기다리면서 연산 과정을 최적화 할 수 있다.
- Driver Program, Cluster Manager, Worker Node 로 이루어져 있다.
- Driver Program: 우리가 사용하는 컴퓨터, python | java | scala 와 같은 script로 task을 정의한다.
- Cluster Manager: 정의된 task 즉 일거리를 분배 한다.
- hadoop에서는 yarn cluster manager을 사용할 수 있다.
- aws에서는 elastic mapreduce manager을 사용할 수 있다.
- Worker Node
- 1CPU코어 당 1Node 배치
- 인메모리 연산을 진행한다.
- spark는 확장성을 고려해서 설계 했기 때문
- Resilient Distributed Dataset (RDD)
- 여러 분산 노드에 걸쳐서 저장
- 변경이 불가능
- 여러개의 파티션으로 분리
Pandas | Spark |
---|---|
1개의 노드 | 여러개의 노드 |
Eager Execution - 코드가 바로 실행 | Lazy Execution - 실행이 필요할 때 까지 기다림 |
컴퓨터 하드웨어에 제한을 받음 | 수평적 확장이 가능 |
In-Memory 연산 | In-Memory 연산 |
Mutable Data | Immutable Data |
- Spark 1.0
- 2014 년 정식 발표
- RDD를 이용한 인메모리 처리 방식
- DataFrame (V1.3)
- Project Tungsten - 엔진 업그레이드로 메모리와 CPU 효율 회적화
- Spark 2.0
- 2016 년 발표
- 단순화 되고 성능 개선
- Structed Streaming
- Dataset 이라는 DataFrame의 확장형 자료구조 등장
- Catalyst Optimizer 프로젝트 - 언어에 상관없이 동일한 성능을 보장 - Scala, Java, Python, R
- Spark 3.0
- 2020 년 발표
- Mlib 기능 추가
- Spark SQL 기능 추가
- Spark 2.4보다 약 2배 빨라짐 - Adaptive execution, Dynamic partition pruning
- PySpark 사용성 개선
- 딥러닝 지원 강화 - GPU노드 지원, 머신러닝 프레임워크와 연계 가능
- GraphX - 분산 그래프 연산
- Python2 지원이 끊김
- 쿠버네티스 지원 강화
- 새 기능이 추가되고 성능이 좋아지고 있지만, 근본은 바뀌지 않는다.
- Spark Core
- Spark SQL
- Spark Streaming
- MLlib
- GraphX
lines = sc.textFile("") # lines == RDD
- Resilient Distributed Dataset
- 데이터는 클러스터에 흩어져있지만 하나의 파일인것 처럼 사용 가능
- 탄력적이고 불변하는 성질이 있다 (Resilient & Immutable)
- 데이터가 여러군데서 연산을 하다가 여러 노드 중 하나가 망가진다면? (네트워크 장애 | 하드웨어 / 메모리 문제 | 알수없는 갖가지 이유 떄문에)
- 데이터가 불변(Imuutable) 하면 문제가 일어날 때 복원이 가능해진다.
- RDD1이 변환을 거치면, RDD1이 바뀌는게 아니라 새로운 RDD2가 만들어진다. (Imuutable)
- 변환을 거칠 때 마다 연산의 기록이 남는다.
- RDD의 변환 과정은 하나의 비순환 그래프(Acyclic Graph)로 그릴 수 있는데, 이 특징 덕분에 문제가 생길 경우에 쉽게 전 RDD로 돌아갈 수 있다.
- Node 1이 연산 중 문제가 생기면 다시 복원 후 Node2 에서 연산하면 된다. (Resillient)
- 컴파일시 Type을 판별할 수 있어 문제를 일찍 발견할 수 있다.
- Structured / Unstructured 둘다 담을 수 있다.
- Unstructured Data - 로그 or 자연어
- Structured Data - RDB or DataFrame
- 결과가 필요할 떄 까지 연산을 하지 않는다
- 두가지 연산이 있는데, T = 변환 A = 액션, 예) RDD1 -> T -> RDD2 -> T -> RDD3 -> A --> RDD4
- 액션을 할 때 까지 변환은 실행되지 않는다.
- Action을 만나면 그때 변환(T) 연산을 진행한다.
- Spark Operation = Transform + Action
- 유연하다
- 짧은 코드로 할 수 있는게 많다
- 개발할 때 무엇보다는 어떻게에 대해 더 생각하게 한다 (how-to)
- 게으른 연산 덕분에 데이터가 어떻게 변환될지 생각하게 된다
- 데이터가 지나갈 길을 닦는 느낌
spark-submit count_trips.py # 트립 수 세기
python3 visualiza_trips_date.py # 차트로 그리기
jupyter notebook . # 주피터로 count_trips.ipynb 열기 -> 코드에 대한 자세한 설명들
RDD.map(<task>)
- 데이터를 여러개로 쪼개고
- 여러 쓰레드에서 각자 task를 적용
- 각자 만든 결과값을 합치는 과정
- 데이터를 여러개로 쪼개서 여러 노드로 보낸다.
- 여러 노드에서 각자 독립적으로 task를 적용
- 각자 만든 결과값을 합치는 과정
- 노드간 통신 같이 신경써야될 것이 늘어난다
- Spark를 이용하면 분산된 환경에서도 일반적인 병렬처리를 하듯이 코드를 짜는게 가능하다.
- Spark는 분산된 환경에서 데이터 병렬 모델을 구현해서 추상화 시켜주기 때문에 가능한것이다. (RDD)
- 그렇다고 생각 없이 spark 코딩을 하면 성능을 끌어내기는 힘들다. (노드간 통신 속도를 신경써야 함)
- 분산처리로 넘어가면서 신경써야될 문제가 많아졌다.
- 부분 실패 - 노드 몇개가 프로그램과 상관 없는 이유로 인해 실패
- 속도 - 많은 네트워크 통신을 필요로 하는 작업은 속도가 저하
RDD.map(A).filter(B).reduceByKey(C).take(100) # 1
RDD.map(A).reduceByKey(C).filter(B).take(100) # 2
"""
1번이 더 좋은 성능의 코드이다.
reduceByKey는 여러노드에서 데이터를 가져오기 떄문에 통신을 필요로 하는데,
filter를 통해서 데이터양을 줄이고 처리하는것이 효율적이기 때문
메모리 > 디스크 > 네트워크 순으로 빠르기떄문에 메모리에서 최대한 많이 처리하는 것이 좋다.
네트워크는 메모리 연산에 비해 100만배 정도 느리다
"""
- Structured Data를 Spark와 연계해서 쓸수 있게 해주는 도구 중 하나이다.
- Key와 Value 쌍을 갖는 Key-Value RDD
- (Key, Value) 쌍을 갖기 때문에 Pairs RDD라도고 불림
- 간단한 데이터베이스처럼 다룰 수 있다.
- Single-Value RDD: 테스트에 등장하는 단어 수 세기 (날짜) -> 1차원 적인 연산
- Key-Value RDD: 넷플릭스 드라마가 받은 평균 별점 (날짜, 승객수) -> 고차원 적인 연산
- Key와 Value 쌍을 가진다
- 예) 지역 ID 별로 택시 운행수는 어떻게 될까?
- Key: 지역 ID
- Value: 운행 수
- 다른예) 드라마 별로 별점 수 모아보기, 평균 구하기
- 다른예) 이커머스 사이트에서 상품당 별 평점 구하기
- 예) 지역 ID 별로 택시 운행수는 어떻게 될까?
- 코드상으로는 많이 다르지 않다
pairs = rdd.map(lambda x: (x,1))
"""
[
지역
지역
]
[
(지역, 1)
(지역, 1)
]
"""
- reduceByKey() - 키 값을 기준으로 테스크 처리
- groupByKey() - 키 값을 기준으로 벨류를 묶는다
- sortByKey() - 키 값을 기준으로 정렬
- keys() - 키 값 추출
- values() - 벨류값 추출
pairs = rdd.map(lambda x: (x,1))
count = pairs.reduceByKey(lambda a, b,: a+b)
"""
짜장면
짜장면
짬뽕
짬뽕
(짜장면, 1)
(짜장면, 1)
(짬뽕, 1)
(짬뽕, 1)
(짜장면, 2)
(짬뽕, 2)
"""
- join
- rightOuterJoin
- leftOuterJoin
- subtractByKey
- key를 바꾸지 않는경우 map()대신 value만 다루는 mapValues() 함수를 쓰는게 좋다
- spark 내부에서 파티션을 유지할 수 있어서 더욱 효율적이다.
- mapValues(), flatMapValues() 두개다 Value만 다루는 연산들이고 RDD에서 key는 유지됨
1-spark/category-review-average.ipynb
- Transformation
- 결과값으로 새로운 RDD를 반환
- 지연 실행 - Lazy Execution
- map()
- flatMap()
- filter()
- distinct()
- reduceByKey()
- groupByKey()
- mapValues()
- flatMapValues()
- sortByKey()
- Actions
- 결과값을 연산하여 출력하거나 저장 (python object 반환 )
- 즉시 실행 - Eager Execution
- collect()
- count()
- countByValues()
- take()
- top()
- reduce()
- fold()
- foreach()
1-spark/rdd_transformations_actions.ipynb
- transformations = Narrow + Wide
- 1:1 변환
- filter(), map(), flatMap(), sample(), union()
- 1열을 조작하기 위해 다른 열/파티션의 데이터를 쓸 필요가 없음.
- Shuffling
- Intersection and join, distinct, cartesian, reduceByKey(), groupByKey()
- 아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있음
- 성능상 많은 리소스를 요구하게 되고, 최소화하고 최적화가 필요하다.
- 메모리를 최대한 활용할 수 있다. (디스크, 네트워크 연산을 최소화 할 수 있다.)
- 데이터를 다루는 task는 반복되는 경우가 많아서(ex 머신러닝 학습), Lazy로 처리하면 비효율적인부분을 효율적으로 처리할 수 있다.
- Task -> Disk -> Task -> Disk 로 작업을 하면 Disk에 자주들르게 되어서 비효율적이다.
- Task -> Task 로 넘어갈 때 in-memory로 주고받으면 효율적이다.
- in-memory로 주고 받으려면 어떤 데이터를 메모리에 남겨야 할 지 알아야 가능하다.
- Transformations는 지연 실행 되기 때문에 메모리에 저장해둘 수 있다.
- MEMORY_ONLY: 메모리에만 저장
- MEMORY_AND_DISK: 메모리와 디스크 모두 저장, 메모리에 없을경우 디스크까지 보겠다.
- MEMORY_ONLY_SER: 메모리를 아끼기 위해서 serialize (꺼내올 때 deserialize 과정이 추가됨)
- MEMORY_AND_DISK_SER: 메모리와 디스크에 serialize
- DISK_ONLY: 디스크에만
- 데이터를 메모리에 남겨두고 싶을 때 사용할 수 있는 함수
categoryReviews = filtered_lines.map(parse)
result1 = categoryReviews.take(10)
result2 = categoryReviews.mapValues(lambda x: (x,1)).collect()
# categoryReviews는 result1과 result2를 만들면서 2번 만들어짐.
# .persist()를 추가하면 메모리에 저장해두고 쓸 수 있음
# categoryReviews = filtered_lines.map(parse).cache()
- Cache
- 디폴트 Storage Level 사용
- RDD: MEMORY_ONLY
- DF: MEMORY_AND_DISK
- Persist
- Storage Level을 사용자가 원하는대로 지정 가능
- spark는 Master Worker Topology로 구성 되어 있다.
- 스파크를 쓰면서 잊지 말아야 할 점
- 항상 데이터가 여러 곳에 분산되어 있다는 것
- 같은 연산이어도 여러 노드에 걸쳐서 실행 된다는 점
- Driver Program이 Spark Context를 생성해서 어플리케이션을 만든다.
- Spark Context가 Cluster Manager에 연결을 한다.
- Cluster Manager가 자원들을 할당한다.
- Cluster Manager가 클러스터에 있는 노드들의 Executor를 수집한다.
- Executor들은 연산을 수행하고 데이터를 저장한다.
- Spark Context가 Executor 들에게 실행할 Task를 전송한다음에
- 실행된 Task들이 결과값들을 내뱉는데, 이것을 Driver Program에 보내게 된다.
RDD.foreach(lambda x: print(x))
"""
Driver Program에서 위 코드를 실행하면 실행결과가 아무것도 나오지 않는다.
왜냐하면 foreach가 액션이기 때문에, Driver가 아닌 Executor에서 바로 실행 되기 떄문이다.
"""
foods = sc.parallelize(["짜장면","마라탕", ...])
three = foods.take(3)
"""
three 결과값은 Driver Program에 저장 된다.
일반적으로 액션은 Driver Pgogram이 Worker Node로부터 데이터를 받는 것 까지 포함 한다.
결국, Executor에게 take 연산을 시행하라고 명령하고, 그결과를 driver node에게 돌려달라고 요청하는 것이다.
"""
- Reduction: 요소들을 모아서 하나로 합치는 작업, 많은 Spark의 연산들이 reduction이다.
- 대부분의 Action은 Reduction이다.
- Reduction: 근접하는 요소들을 모아서 하나의 결과로 만드는 일
- 파일 저장, collect() 등과 같이 Reduction이 아닌 액션도 있다.
- 파티션 마다 독립적으로 작업을 처리할 수 있어야 분산된 병렬 처리가 가능하다.
- 파티션이 다른 파티션의 결과에 의존하게 되면, 한 테스크가 전 테스크를 기다려야 되기 때문에 작업을 동시에 처리할 수 없게 되고 병렬 처리가 불가능해지므로 분산에 의미가 없어진다.
- 대표적인 Reduction Actions: Reduce, Fold, GroupBy, Aggregate
from operator import add
sc.parallelize([1,2,3,4,5]).reduce(add)
# 15
- 파티션이 어떻게 나뉠지 프로그래머가 정확히 알기 어렵다.
- 연산의 순서와 상관 없이 결과 값을 보장하려면
- 교환 법칙 (ab = ba)
- 결합 법칙 (ab)c = a(bc)
# 파티션에 따라 결과 값이 달라지게 된다.
# 분산된 파티션들의 연산과 합치는 부분을 나눠서 생각해야 한다.
>>> sc.parallelize([1,2,3,4]).reduce(lambda x,y: (x*2)+y) # 파티션 지정 X
26
>>> sc.parallelize([1,2,3,4],1).reduce(lambda x,y: (x*2)+y) # 파티션 1개로 지정
26
>>> sc.parallelize([1,2,3,4],2).reduce(lambda x,y: (x*2)+y) # 파티션 2개로 지정
18
>>> sc.parallelize([1,2,3,4],3).reduce(lambda x,y: (x*2)+y) # 파티션 3개로 지정
18
>>> sc.parallelize([1,2,3,4],4).reduce(lambda x,y: (x*2)+y) # 파티션 4개로 지정
26
"""
(1,2,3,4) -> ((1*2+2)*2+3)*2+4=26 # 파티션 1
(1,2)(3,4) -> ((1*2+2)*2 + (3*2)+4) = 18 # 파티션 2
"""
- Reduce와 유사하지만, Fold는 시작값을 설정해준다 라는 부분만 다름.
from operator import add
sc.parallelize([1,2,3,4,5]).fold(0, add)
# 15
rdd = sc.parallelize([2,3,4],4)
rdd.reduce(lambda x, y: x*y) # 24
rdd.fold(1, lambda x, y: x*y) # 24
rdd.reduce(lambda x, y: x+y) # 9 (0+2+3+4 =9)
rdd.fold(1, lambda x, y: x+y) # 14 (1+1) + (1+2) + (1+3) + (1+4) = 14 , 각 파티션의 시작값이 1
rdd = sc.parallelize([1,1,2,3,5,8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x,y) in result])
# [(0, [2,8]), (1, [1,1,3,5])]
- RDD 데이터 타입과 Action 결과 타입이 다를 경우 사용
- 파티션 단위의 연산 결과를 합치는 과정을 거친다
- RDD.aggregate(zeroValue, seqOp, combOp)
- zeroValue: 각 파티션에서 누적할 시작 값
- seqOp: 타입 변경 함수
- combOp: 합치는 함수
- 많이 쓰이는 reduction action
- 대부분의 데이터 작업은 크고 복잡한 데이터 타입 -> 정제된 데이터
seqOp = (lambda x,y: (x[0] + y, x[1] + 1))
combOp = (lambda x,y: (x[0] + y[0], x[1] + y[1]))
sc.parallelize([1,2,3,4]).aggregate((0,0), seqOp, combOp) # (10,4)
sc.parallelize([]).aggregate((0,0), seqOp, combOp) # (0,0)
- Transformations
- groupByKey
- reduceByKey
- mapValues
- keys
- join (+leftOuterJoin, rightOuterJoin)
- Actions
- countByKey
- Key-Value RDD에서 Tranformations가 많은 이유: 처리과정에서 나온 결과값이 파티션이 유지가 안되더라도 값이 굉장히 크기 때문에
- groupBy: 함수를 기준으로 Group
rdd = parallelize([1,1,2,3,5,8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x,y) in result()])
# [(0, [2,8]), (1, [1,1,3,5])]
- groupByKey: Key를 기준으로 Group
rdd = parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
# [('a',2), ('b',1)]
sorted(rdd.groupByKey().mapValues(list).collect())
# [('a', [1,1]), ('b', [1])]
- reduce: 함수를 기준으로 요소들을 합침 (action)
sc.parallelize([1,2,3,4,5]).reduce(add)
# 15
- reduceBykey: key를 기준으로 그룹을 만들고 합침 (trans)
rdd = sc.parallelize([("a",1), ("b",1), ("a",1)])
sorted(rdd.reduceByKey(add).collect())
# [('a',2), ('b',1)]
- 개념적으로는 groupByKey + reduction
- 하지만, groupByKey보다 훨씬 빠르다
- 함수를 밸류에게만 적용한다
- 파티션과 키는 그대로 납둔다. (파티션과 키를 왔다갔다 하지않아서 네트워크 비용을 줄일 수 있다)
x = sc.parallelize([("a", ["apple","banana","lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
# [('a',3), ('b',1)]
- 각 키가 가진 요소들을 센다
rdd = sc.parallelize([("a",1), ("b",1), ("a",1)])
sorted(rdd.countByKey().items())
# [('a',2), ('b',1)]
- Transformation
- 모든 Key를 가진 RDD를 생성
m = sc.parallelize([(1,2), (3,4)]).keys()
m.collect()
# [1,3]
- Transformation
- 여러개의 RDD를 합치는데 사용
- 대표적으로 두가지의 join 방식이 존재한다.
- Inner Join (join)
- Outer join (left outer, right outer)
rdd1 = sc.parallelize([("foo",1), ("bar",2), ("baz",3)])
rdd2 = sc.parallelize([("foo",4), ("bar",5), ("bar", 6), ("zoo", 1)])
rdd1.join(rdd2).collect()
# [('bar',(2,5)), ('bar', (2,6)), ('foo', (1,4))]
rdd1.leftOuterJoin(rdd2).collect()
# [('baz', (3, None)), ('bar', (2,5)), ('bar', (2,6)), ('foo', (1,4))]
rdd1.rightOuterJoin(rdd2).collect()
# [('bar', (2,5)), ('bar', (2,6)), ('zoo', (None,1)), ('foo', (1,4))]
- 그룹핑시 데이터를 한 노드에서 다른노드로 옮길 때 사용
- 성능을 (많이) 저하시킨다
- groupByKey를 할 때도 발생한다.
- 여러 노드에서 데이터를 주고 받게 되서 네트워크 연산의 비용이 높다
- Shuffle을 일으킬 수 있는 작업들
- Join, leftOuterJoin, rightOuterJoin
- GroupByKey
- ReduceByKey
- ComebineByKey
- Distinct
- Intersection
- Repartition
- Coalesce
- Shuffle이 발생하는 시점
- 결과로 나오는 RDD가 원본 RDD의 다른 요소를 참조하거나 다른 RDD를 참조할 때
- 미리 파티션을 만들어 두고 캐싱 후 reduceByKey 실행
- 미리 파티션을 만들어 두고 캐싱 후 join 실행
- 둘다 파티션과 캐싱을 조합해서 최대한 로컬 환경에서 연산이 실행되도록 하는 방식
- 셔플을 최소화하면 10배의 성능 향상이 가능하다.
예시 groupByKey vs reduceByKey
# reduceByKey
(textRDD
.flatMap(lambda lin: line.split()) # 동일한 노드에서 실행
.map(lambda work: (word, 1)) # 동일한 노드에서 실행
.reduceByKey(lambda a, b: a+b)) # 셔플 발생
# groupByKey
(textRDD
.flatMap(lambda line: line.split())
.map(lambda word: (word,1))
.groupByKey() # 셔플 발생
.map(lambda (w, counts): (w, sum(counts))))
# 가급적이면, groupByKey대신에 reduceByKey로 대체 가능하니까 reduceByKey를 사용하자.
- 데이터를 최대한 균일하게 퍼트리고, 쿼리가 같이 되는 데이터를 최대한 옆에 두어 검색 성능을 향상
- RDD는 쪼개져서 여러 파티션에 저장됨
- 하나의 파티션은 하나의 노드(서버)에
- 하나의 노드는 여러개의 파티션을 가질 수 있음
- 파티션의 크기와 배치는 자유롭게 설정 가능하며 성능에 큰 영향을 미침
- Key-Value RDD를 사용할 때만 의미가 있다.
- 스파크의 파티셔닝 == 일반 프로그래밍에서 자료구조를 선택하는 것
- Hash Partitioning
- Range Partitioning
- 데이터를 여러 파티션에 균일하게 분배하는 방식
- 딕셔너리와 비슷한 방식으로 분배
- 잘못된 사용
- 데이터를 여러 파티션에 균일하게 분배하는 방식인데,
- [극단적인 예] 2개의 파티션이 있는 상황에서 짝수의 Key만 있는 데이터셋에 Hash 함수가 (x%2)인 경우 (한쪽 파티션만 사용.)
- 순서가 있는, 정렬된 파티셔닝
- 키의 순서에 따라 정렬
- 키의 집합의 순서에 따라 정렬
- 서비스의 쿼리 패턴이 날짜 위주면 일별 Range Partition 고려
- Disk에서: partitionBy() (보통 이것을 많이 사용)
- 메모리에서: repartition(), coalesce()
- 사용자가 지정한 파티션을 가지는 RDD를 생성하는 함수: partitionBy()
- 파티션을 만든 후엔 persist()를 해야 한다
- 하지않으면, 다음 연산에 불릴떄 마다 반복하게 된다 (셔플링이 반복적으로 일어난다)
pairs = sc.parallelize([1,2,3,4,2,4,1]).map(lambda x: (x,x))
pairs.collect()
# [(1,1),(2,2),(3,3),(4,4),(2,2),(4,4),(1,1)]
pairs.partitionBy(2).glom().collect()
# [[(2,2), (4,4), (2,2), (4,4)], [(1,1), (3,3), (1,1)]]
pairs.partitionBy(2, lambda x: x%2).glom().collect()
# [[(2,2), (4,4), (2,2), (4,4)], [(1,1), (3,3), (1,1)]]
# glom은 파티션정보까지 같이 보는 함수
- 둘다 파티션의 갯수를 조절하는데 사용
- 둘다 shuffling을 동반하여 매우 비싼 작업
- Repartition: 파티션의 크기를 줄이거나 늘리는데 사용
- Coalesce: 파티션의 크기를 줄이는데 사용 (줄일땐 Repartition보다 성능이 좋음 )
- Join (+ Outer join)
- groupByKey
- reduceByKey
- foldByKey
- partitionBy
- Sort
- mapValues (parent)
- flatMapValues (parent)
- filter (parent)
- 등
- mapValues, flatMapValues, filter는 parent RDD에서 파티션이 정의되어 있으면 그걸 그대로 사용
- map, flatMap은 왜 파티션을 안만들까? => map, flatMap은 key값이 바뀔 수 있기 때문에 파티션을 해놓은게 의미가 없어질 수 있기 때문
- 그래서 파티션이 잘 정의되어 있다면 mapValues, flatMapValues를 쓰는것이 좋다.
join().filter() vs filter().join() 을 비교하면 당연히 filter().join()이 성능이 더 빠르다.
위와 같은 고민을 스파크가 알아서 해주면 좋겠는데, 어떻게 가능할까?
데이터가 구조화 되어 있다면 자동으로 최적화가 가능하다.
- Unstructured: Free Form
- 로그 파일
- 이미지
- Semi Structured: 행과 열
- CSV
- JSON
- XML
- Structured: 행과 열 + 데이터 타입 (스키마)
- 데이터베이스
- RDD에서는
- 데이터의 구조를 모르기 떄문에 데이터를 다루는 것을 개발자에게 의존한다.
- map, flatMap, filter 등을 통해 유저가 만든 function을 수행
- Structured Data에서는
- 데이터의 구조를 이미 알고 있으므로 어떤 테스크를 수행할 것인지 정의만 하면 됨
- 최적화도 자동으로 할 수 있음
- 구조화된 데이터를 다룰 수 있게 해준다.
- 유저가 일일이 function을 정의하는 일 없이 작업을 수행 할 수 있다.
- 자동으로 연산이 최적화 된다
- 스파크 프로그래밍 내부에서 관계형 처리를 하기 위해 사용
- 스키마의 정보를 이용해 자동으로 최적화를 하기 위해 사용
- 외부 데이터셋을 사용하기 쉽게 하기 위해 사용
- 스파크 위에 구현된 하나의 패키지
- 3개의 주요 API
- SQL
- DataFrame
- Datasets
- 2개의 백엔드 컴포넌트
- Catalyst - 쿼리 최적화 엔진
- Tungsten - 시리얼라이저(용량 최적화)
- Spark Core에 RDD가 있다면, Spark SQL에는 DataFrame이 있다.
- DataFrame은 테이블 데이터셋이라고 보면 됨
- 개념적으로는 RDD에 스키마가 적용된 것이라고 보면 됨
- Spark Core에 SparkContext가 있다면 Spark SQL에는 SparkSession이 있다.
spark = SparkSession.builder.appName("test-app").getOrCreate()
- RDD에서 스키마를 정의한다음 변형을 하거나
- CSV, JSON등의 데이터를 받아오면 된다
- Schema를 자동으로 유추해서 DataFrame 만들기
- Schema를 사용자가 정의하기
# RDD 만들기
lines = sc.textfile("example.csv")
data = lines.map(lambda x: x.split(","))
preprocessed = data.map(lambda x: Row(name=x[0], price=int(x[1])))
# Infer (Schema를 유추해서 만들기)
df = spark.createDataFrame(preprocessed)
# Specify (Schema를 사용자가 정의하기)
schema = StructType(
StructField("name", StringType(), True),
StructField("price", StringType(), True)
)
spark.createDataFrame(preprocessed, schema).show()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test-app").getOrCreate()
# JSON
dataframe = spark.read.json('dataset/nyt2.json')
# TXT FILE
dataframe_txt = spark.read.text('text_data.txt')
# CSV FILE
dataframe_csv = spark.read.csv('csv_data.csv')
# PARQUET FILE
dataframe_parquet = spark.read.load('parquet.data.parquet')
- createOrReplaceTempView() 함수로 temporary view를 만들어 줘야 함.
data.createOrReplaceTempView("mobility_data")
spark.sql("SELECT pickup_datetime FROM mobility_data LIMIT 5").show()
- Hive Query Language와 거의 동일
- Select
- From
- Where
- Count
- Having
- Group By
- Order By
- Sort By
- Distinct
- Join
- Spark SQL을 사용하기 위해 사용하는 SparkSession
- SparkSession 으로 불러오는 데이터는 DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test-app").getOrCreate()
# JSON
dataframe = spark.read.json('dataset/nyt2.json')
# TXT FILE
dataframe_txt = spark.read.text('text_data.txt')
# CSV FILE
dataframe_csv = spark.read.csv('csv_data.csv')
# PARQUET FILE
dataframe_parquet = spark.read.load('parquet.data.parquet')
- SQL문을 사용해서 쿼리가 가능하다.
data.createOrReplaceTempView("mobility_data")
spark.sql("SELECT pickup_datetime FROM mobility_data LIMIT 5").show()
- 함수를 사용해서 쿼리도 가능하다.
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()
- DataFrame을 RDD로 변환해 사용할 수도 있다.
rdd = df.rdd.map(tuple)
- (하지만, RDD를 덜 사용하는 쪽이 좋다)
- MLLib이나 Spark Streaming 같은 다른 스파크 모듈들과 사용하기 편하다.
- 개발하기 편하다.
- 최적화도 알아서 된다.
- Type이 있는 DataFrame
- PySpark에선 크게 신경쓰지 않아도 된다.
./1-spark/learn-sql.ipynb
- 관계형 데이터이다.
- 한마디로 관계형 데이터셋 = RDD + Relation
- RDD가 함수형 API를 가졌다면 DataFrame은 선언형 API
- 자동으로 최적화가 가능
- 타입이 없다
- RDD의 확장판
- 지연 실행 (Lazy Execution)
- 분산 저장
- Immutable
- 열(Row) 객체가 있다
- SQL 쿼리를 실행할 수 있다.
- 스키마를 가질 수 있고 이를 통해 성능을 더욱 최적화 할 수 있다
- CSV, JSON, Hive 등으로 읽어오거나 변환이 가능
- dtypes
- show()
- 테이블 형태로 데이터를 출력
- 첫 20개의 열만 보여준다
- printSchema()
- 스키마를 트리 형태로 볼 수 있다.
- SQL 과 비슷한 작업이 가능하다.
- Select
- Where
- Limit
- OrderBy
- GroupBy
- Join
- 사용자가 원하는 Column이나 데이터를 추출 하는데 사용
df.select('*').collect()
df.select('name','age').collect()
df.select(df.name, (df.age+10).alias('age')).collect()
- Aggregate의 약자로, 그룹핑 후 데이터를 하나로 합치는 작업
df.agg({"age",: "max"}).collect()
# [Row(max(age)=5)]
from pyspark.sql improt functions as F
df.agg(F.min(df.age)).collect()
# [Row(min(age)=2)]
- 사용자가 지정한 column을 기준으로 데이터를 Grouping하는 작업
df.groupBy().avg().collect()
# [Row(avg(age)=3.5)]
sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
# [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
sorted(df.groupBy(df.name).avg().collect())
# [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
sorted(df.groupBy(['name', df.age]).count().collect())
# [Row(name='Alice', age=2, count=1), Row(name='Bob', age=5, count=1)]
- 다른 DataFrame과 사용자가 지정한 Column을 기준으로 합치는 작업
df.join(df2, 'name').select(df.name, df2.height).collect()
# [Row(name='Bob', height=85)]
- 이전에 RDD로 실습해보았는데, 이번엔 Spark SQL로 해보자.
./1-spark/trip_count_sql.ipynb
- 두 테이블의 JOIN 실습
./1-spark/trip_count_sql_by_zone-Copy1.ipynb
- 스파크는 쿼리를 돌리기 위해 두가지 엔진을 사용한다.
- Catalyst, Tungsten
- 수행 해야 하는 모든 transformation 단계에 대한 추상화
- 데이터가 어떻게 변해야 하는지 정의하지만,
- 실제 어디서 어떻게 동작 하는지는 정의하지 않음
- Logical Plan이 어떻게 클러스터 위에서 실행 될지 정의
- 실행 전략을 만들고 Cost Model에 따라 최적화
- SQL과 DataFrame이 구조가 있는 데이터를 다룰 수 있게 해주는 모듈
- Logical Plan을 Physical Plan으로 바꾸는 일을 한다.
- 분석: DataFrame 객체의 relation을 계산, 칼럼의 타입과 이름 확인
- Logical Plan 최적화
- 상수로 표현된 표현식을 Compile Time에 계산 (x runtime)
- Predicate Pushdown: join & filter -> filter & join
- Projection Pruning: 연산에 필요한 칼럼만 가져오기
- Physical Plan 만들기: Spark에서 실행 가능한 Plan으로 변환
- 코드 제네레이션: 최적화된 Physical Plan을 Java Bytecode로
SELECT zone_data.Zone, count(*) AS trips \
FROM trip_data JOIN zone_data \
ON trip_data.PULocationID = zone_data.LocationID \
WHERE trip_data.hvfhs_license_num = 'HV0003' \
GROUP BY zone_data.Zone order by trips desc
기본 순서
- Scan: 두개의 테이블에서 데이터 추출
- Join:
join
- Filter:
trip_data.hvfhs_license_num = 'HV0003
- Project:
count(*) AS trips
- Aggregate:
group by
최적화
- Scan: 두개의 테이블에서 데이터 추출
- Filter:
trip_data.hvfhs_license_num = 'HV0003
- Join:
join
- Project:
count(*) AS trips
- Aggregate:
group by
spark.sql(query).explain(True)
- explain(True) 명령어를 입력하면 아래의 정보들을 보여준다
- Parsed Logical Plan: 사용자가 쓴 코드 그대로
- Analyzed Logical Plan: 사용자가 지정한 테이블의 무슨 컬럼이 있는지 확인한다.
- Optimized Logical Plan: Filtering코드를 더 빨리 하는 등 최적화된 코드를 보여준다
- Physical Plan: 디테일한 Plan을 보여줌
- explain(True 없이) 명령어를 입력하면 아래 정보만 나온다.
- Physical Plan
- Physical Plan이 선택되고 나면 분산 환경에서 실행될 Bytecode가 만들어진다. (Code Generation)
- 스파크 엔진의 성능 향상이 목적
- 메모리 관리 최적화
- 캐시 활용 연산
- 코드 생성
- user-defined-functions
- sql 문안에서 쓸 수 있는 function을 만드는것
실습
./1-spark/user-defined-functions.ipynb
./1-spark/taxi-analysis.ipynb
- Machine Learning Library
- ML을 쉽고 확장성 있게 적용하기 위해 사용
- 머신러닝 파이프라인 개발을 쉽게 하기 위해
- 데이터를 이용해 코딩을 하는 일
- 최적화와 같은 방법을 통해 패턴을 찾는일
- 알고리즘
- Classification
- Regression
- Clustering
- Recommendation
- 파이프라인
- Training
- Evaluating
- Tuning
- Persistence
- Feature Engineering
- Extraction
- Transformation
- Utils
- Linear algebra
- Statistics
- 데이터 로딩 -> 전처리 -> 학습 -> 모델 평가
- 파라미터 튜닝 후 위 과정을 다시 시도
- 피쳐 엔지니어링
- 통계적 연산
- 흔히 쓰이는 ML알고리즘들
- Regression (Linea, Logistic)
- Support Vector Machines
- Naive Bayes
- Decision Tree
- K-Means clustering
- 추천 (Alternating Least Squares)
- 아직 RDD API가 있지만, "maintenance mode"
- 새로운 API는 개발이 끊김
- DataFrame을 쓰는 MLlib API를 Spark ML이라고도 부름
- DataFrame
- Transformer
- Estimator
- Evaluator
- Pipeline
- Parameter
- 피쳐 변환과 학습된 모델을 추상화
- 모든 Transformer는 transform() 함수를 갖고 있다
- 데이터를 학습이 가능한 포멧으로 바꾼다
- DF를 받아 새로운 DF를 만드는데, 보통 하나 이상의 column을 더하게 된다
- 예)
- Data Normalization
- Tokenization
- 카테고리컬 데이터를 숫자로 (one-hot encoding)
- 모델의 학습 과정을 추상화
- 모든 Estimator는 fit() 함수를 갖고 있다
- fit()은 DataFrame을 받아 Model을 반환
- 모델을 하나의 Transformer
- 예)
- lr = LinearRegression()
- model = lr.fit(data)
- metric을 기반으로 모델의 성능을 평가
- 예) Root mean squared error (RMSE)
- 모델을 여러개 만들어서, 성능을 평가 후 가장 좋은 모델을 뽑는 방식으로 모델 튜닝을 자동화 할 수 있다.
- 예)
- BinarClassificationEvaluator
- CrossValidator
- ML의 워크플로우를 정의할 때 사용
- 여러 stage를 담고 있다
- 저장될 수 있다. (persist)
- 파이프라인 예: 데이터로딩 -> 전처리 -> 학습 -> 모델평가
- Transformer -> Tranformer -> Estimator -> Evaluator -> Model
./1-spark/logistic-regression.ipynb
./1-spark/pipeline.ipynb
- Alternating Least Squares
- 아직 못본 영화들의 평점을 예하고,
- 값을 정렬해서 제일 위에서 부터 유저에게 전달하는 것이 추천이다.
./1-spark/movie-recommendation.ipynb
- 지도 학습
- Regression, Classification 둘다 지도학습이다.
- Regression: 예측된 값이 실수
- Classification: 예측된 값이 클래스(카테고리)
./1-spark/taxi-fare-prediction.ipynb
./1-spark/taxi-fare-prediction-2.ipynb
./1-spark/taxi-fare-prediction-hyper.ipynb
./1-spark/taxi-fare-prediction-hyper.ipynb
- SQL 엔진 위에 만들어진 분산 스트림 처리 프로세싱
- 데이터 스트림을 처리할 때 사용
- 시간대 별로 데이터를 합쳐(aggregate) 분석 할 수 있음
- kafka, Amazon Kinesis, HDFS 등과 연결 가능
- 체크포인트를 만들어서 부분적인 결함이 발생해도 다시 돌아가서 데이터를 처리할 수 있다.
- 데이터 스트림은 무한한 테이블이다.
- input Data Stream --SparkStreaming--> batches of input data --SparkEngine--> batches of processed data
- Spark Stream의 기본적인 추상화
- 내부적으론 RDD의 연속이고 RDD의 속성을 이어받음
- 지금의 데이터를 처리하기 위해 이전 데이터에 대한 정보가 필요할 때
- 데이터를 어디에서 읽어올 지 명시
- 여러 데이터 소스를 사용해 join()이나 union()으로 합쳐 쓸 수 있다
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe","topic")
.load()
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe","topic")
.load()
.selectExpr("cast(value as string) as json")
.select(from_json("json", schema).as("data"))
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe","topic")
.load()
.selectExpr("cast(value as string) as json")
.select(from_json("json", schema).as("data"))
.writeStream.format("parquet")
.trigger("1 minute") # <-- micro-batch 실행 간격
.option("checkpointLocation", "...")
.start()
- Map
- FlatMap
- Filter
- ReduceByKey
- 이전 데이터에 대한 정보를 State로 주고 받을 수 있다.
- 예) 카테고리별 (키값 별) 총합
terminal1) nc -lk 9999 # 소켓 열기
terminal2) python3 ./1-spark/streaming.py
terminal1) test testa testb
terminal1) test test testa
- 에어비앤비에서 개발한 워크플로우 스케줄링, 모니터링 플랫폼
- 실제 데이터의 처리가 이루어지는 곳은 아니다.
- 2016년 아파치 재단 incubator program
- 현재 아파치 탑레벨 프로젝트
- Airbnb, Yahoo, Paypal, Intel, Stripe
- 매일 10시에 주기적으로 돌아가는 데이터 파이프라인을 만들려면?
- 기존 방식: cron script로 사용
- 매일 10시에 주기적으로 돌아가는 데이터 파이프라인 (외부 api로 download -> process(Spark Job) -> store(DB))들을 수십개 만들어야 한다면?
- 실패 복구: 언제 어떻게 다시 실행할 것인가? Backfill
- 모니터링: 잘 돌아가고 있는지 확인하기 힘들다
- 의존성 관리: 데이터 파이프라인간 의존성이 있는 경우 상위 데이터 파이프라인이 잘 돌아가고 있는지 파악이 힘들다
- 확장성: 중앙화 해서 관리하는 툴이 없기 떄문에 분산된 환경에서 파이프라인들을 관리하기 어렵다
- 배포: 새로운 워크플로우를 배포하기 힘들다
- 워크플로우를 작성하고 스케줄링하고 모니터링 하는 작업을 프로그래밍 할 수 있게 해주는 플랫폼
- 파이썬으로 쉬운 프로그래밍이 가능
- 분산된 환경에서 확장성이 있음
- 웹 대시보드 (UI)
- 커스터마이징이 가능
- 의존성으로 연결된 작업(task)들의 집합 == DAG == Directed Acyclic Graph
- 웹 서버 - 웹 대시보드 UI
- 스케줄러 - 워크플로우가 언제 실행되는지 관리
- Metastore - 메타데이터 관리
- Executor - 테스크가 어떻게 실행되는지 정의
- Worker - 테스크를 실행하는 프로세스
- 작업을 정의하는데 사용
- Action Operators: 실제 연산을 수행
- Transfer Operators: 데이터를 옮김
- Sensor Operators: 테스크를 언제 실행시킬 트리거를 기다림
- Operator를 실행시키면 Task가 된다
- Task = Operator Instance
- 여러 데이터 엔지니어링 환경에서 유용하게 쓰일 수 있다
- 데이터 웨어하우스
- 머신러닝
- 분석
- 실험
- 데이터 인프라 관리
- WebServer, Metastore, Scheduler, Executor가 존재
- 동작 과정
- Metastore에서 dag에 대한 정보를 담고 있어서, Web server와 Scheduler가 그 정보를 읽어 오고 Executor로 이 정보를 보내서 실행을 한다.
- 이렇게 실행된 Task Instance는 metastore로 보내져서 상태를 업데이트 한다.
- 이렇게 업데이트된 상태를 다시 Web Server와 Scheduler가 읽어와서 Task가 잘 완료가 되었는지 확인을 한다.
- Executor에 Queue가 존재해서 순서를 정할 수 있게 된다.
- Queue가 Executor 바깥에 존재 한다 (One Node Architecture와의 큰 차이점)
- Celery Broker가 Queue이다.
- 동작 과정
- MetaStore에서 dag정보를 webserver와 scheduler가 정보를 읽고, celery executor를 통해서 celery broker에 task 순서대로 담는다.
- 순서대로 담긴 task를 worker들이 하나씨 가져가서 순서대로 실행된다.
- 이렇게 실행된 dag들은 완료되면 celery executor 그리고 metastore에 보고가 된다.
- 이렇게 완료된 상태를 UI와 Scheduler가 다시읽어와서 완료되는 것을 확인한다.
- DAG를 작성하여 Workflow를 만든다. DAG는 Task로 구성되어 있다
- Task는 Operator가 인스턴스화 된 것
- DAG를 실행시킬 때 Scheduler가 DagRun 오브젝트를 만든다
- DagRun 오브젝트는 Task Instance를 만든다
- Worker가 Task를 수행 후 DagRun 의 상태를 "완료"로 바꿔놓는다.
- 유저가 새로운 DAG를 작성 후 Folder DAGs 안에 배치
- Web Server와 Scheduler가 DAG를 파싱
- Scheduler가 Metastore를 통해 DagRun 오브젝트를 생성
- DagRun은 사용자가 작성한 DAG의 인스턴스
- DagRun status: Running
- Scheduler가 Task Instance 오브젝트 (Dag run 오브젝트의 인스턴스 == Task Instance) 를 스케줄링
- Trigger가 상황에 맞으면 Scheduler가 Task Instance를 Executor로 보냄
- Executor가 그 Task를 실행시킨 다음, 완료후 Metastore에 완료했다고 보고한다. (완료된 Task Instance는 Dag Run을 업데이트 한다)
- Scheduler가 Metastore를 통해서 DAG 실행이 완료됐나 확인을 하고 DagRun Status를 Completed로 변경한다.
- Web Server가 Metastore를 통해서 DAG 실행이 완료됐나 확인을 하고 UI 업데이트를 한다.
# m1 에서는 이 방법으로 설치 안됨.
pip --version # anaconda 로 설치된지 확인
pip install apache-airflow
airflow db init
airflow werbserver -p 8080
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
# m1
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.1/docker-compose.yaml'
docker-compose up airflow-init
docker-compose up -d
docker exec -it 64bb1d858ab5ad7babfad795a6e3dc60121e27b15a83c37bda4f54a6a /bin/sh # webserver container 접속
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
airflow -h
: 각종 명령어 설명 보기airflow webserver
: webserver 시작airflow users create ~~
: user 추가airflow scheduler
: scheduler 시작airflow db init
: db에 기본적인 파이프라인 생성 및 기본 설정airflow dags list
: 현재 돌아가는 dag들 출력airflow tasks list example_xcom
: example_xcom 안에 존재하는 task들 조회airflow dgas trigger -e 2022-01-01 example_xcom
: 특정 dag를 트리거
Owner
: Dag 관리자Runs
: 실행 중인 DAG의 상태Schedule
: 주기를 나타내는 설정Last Run
: 최근 실행 날짜Next Run
: 다음 실행이 언제될지 나타냄Recent Tasks
: 방금 실행된 Task들을 보여줌Actions
: DAG를 지우거나 실행Links
: 마우스 갖다대면 여러가지 Link들이 보임
Tree
: Task들의 상태를 보기 편함Graph
: Task들의 의존성을 확인할 때 좋음, 각 Task들의 Log 정보 등을 확인하기에도 좋음Calendar
: 날짜별로 실패 없이 잘 돌아갔나 확인 가능Task Duration
,Task Tries
,Landing Times
: 날짜기반으로 뭔가확인인데 설치 직후엔 볼게 없음Gantt
: 각각의 task가 실행하면서 얼만큼의 시간을 소비했나 볼 수 있다.Details
: 여러가지 Metadata 확인Code
: DAG 코드 확인
- OpenSea 사이트의 NFT데이터를 추출해 테이블에 저장하기
- 테이블 생성 -> API 확인 -> NFT 정보 추출 -> NFT 정보 가공 -> NFT 정보 저장
./2-airflow/01-sqlite.py # 기본 dag 구성
# 생성 후 dag 대시보드에 등장하는지 확인
- BashOperator
- PythonOperator
- EmailOperator
- Action Operator는 액션을 실행한다 (데이터를 추출, 데이터 프로세싱 등)
- Transfer Operator는 데이터를 옮길 때 사용
- Sensors: 조건이 맞을 때 까지 기다린다
Airflow 대시보드 -> Admin -> Connections -> 추가 -> connection id =db_sqlite, conneciton Type = Sqlite 로 Save
./2-airflow/02-create-table.py
airflow tasks test nft-pipeline creating_table 2021-01-01 # task 실행
Airflow 대시보드 -> Admin -> Connections -> 추가 -> connection id = opensea_api, conneciton Type = http, host: https://api.opensea.io/ 로 Save
./2-airflow/03-sensor.py
airflow tasks test nft-pipeline is_api_available 2021-01-01 # task 실행
출처: https://github.com/keon/data-engineering/tree/main/02-airflow
Airflow 대시보드 -> Admin -> Connections -> 추가 -> connection id = githubcontent_api, conneciton Type = http, host: https://raw.githubusercontent.com/ 로 Save
./2-airflow/03-sensor.py
airflow tasks test nft-pipeline is_api_available 2021-01-01 # task 실행
./2-airflow/04-extract-data.py
airflow tasks test nft-pipeline extract_nft 2021-01-01 # task 실행
./2-airflow/05-process.py
airflow tasks test nft-pipeline process_nft 2021-01-01 # task 실행
cat /tmp/processed_nft.csv # 결과 확인
./2-airflow/06-store.py
airflow tasks test nft-pipeline store_nft 2021-01-01 # task 실행
# docker에서는 'airflow.db' 가 따로 없는듯. 그래서 해결은 못했음.
./2-airflow/07-dependency.py
airflow에서 DAG 활성화 해서 순차적으로 실행되는지 확인.
- 매일 주기적으로 돌아가는 파이프라인을 멈췄다가 몇일 뒤 실행시키면 어떻게 될까?
- 예를 들어, 하루에 한번씩 돌아가는 DAG가 1월1일에 실행됐다가, 1월2일에 멈췄었고 1월4일에 다시 시작하면 어떻게 될까?
- DAG 설정 코드에
catchup(False)
이면 1월 4일에 다시 시작하면 1월 4일기준으로 돌아간다. - DAG 설정 코드에
catchup(True)
이면 1월 4일에 다시 시작하면 1월 2일기준으로 돌아간다.
- DAG 시작 날짜를
2021-01-01
로 해두고, 현재2022-08-06
에catcup(True)
로하면 어떻게 될까?- 기존에 이미 실행된게 있으면 돌아가지 않는다. -> 기존 DAG를 지우고, Browse -> DAG Run -> nft-pipeline 제거
- 제거하고나면 바로 1월1일부터 거의 1년치가 동시에 돌아가게 된다.
1. webserver docker 접속
2. pip install apache-airflow-providers-apache-spark
3. fhvhv_tripdata_2020-03.csv 파일 webserver로 전송
# webserver docker에서 count_trips.py 작성
# 패키지를 가져오고
from pyspark import SparkConf, SparkContext
import pandas as pd
# Spark 설정
conf = SparkConf().setMaster("local").setAppName("uber-date-trips")
sc = SparkContext(conf=conf)
# 우리가 가져올 데이터가 있는 파일
directory = "/home/airflow/data"
filename = "fhvhv_tripdata_2020-03.csv"
# 데이터 파싱
lines = sc.textFile(f"file:///{directory}/{filename}")
header = lines.first()
filtered_lines = lines.filter(lambda row:row != header)
# 필요한 부분만 골라내서 세는 부분
# countByValue로 같은 날짜등장하는 부분을 센다
dates = filtered_lines.map(lambda x: x.split(",")[2].split(" ")[0])
result = dates.countByValue()
# 아래는 Spark코드가 아닌 일반적인 파이썬 코드
# CSV로 결과값 저장
pd.Series(result, name="trips").to_csv("trips_date.csv")
Admin -> Connectors -> 추가
Connect id: spark_local
Connection Type: Spark
Host: local
Save
airflow tasks test spark-example submit_job 2021-01-01
./2-airflow/dags/spark-example.py # 코드 위치
./2-airflow/taxi-price.py
- SystemA, SystemB 각각 데이터 쌓인 것을 Data Lake로 보내는 파이프라인을 각각 만들어줘야 함.
- 시스템을 더할수록 기하급수적으로 복잡해진다.
- 여러가지 통신 프로토콜을 지원해야 한다 (HTTP, GRPC, TCP, MQ)
- 데이터 포멧도 다르다 (CSV, JSON, XML)
- Point-of-failure 가 많다
- 시스템 A,B,C,D,E,F 각각의 신뢰도가 99% 라고 했을 때
- 시스템 A,B,C,D,E,F를 묶었을 때의 신뢰도 = 99% ^6 = 94.1%
- 각각의 연결고리 어디서 에러가 나고 있는지 모니터링 하기도 힘들다
- LinkedIn에서 개발
- Apache Software로 넘어가 2011년 오픈소스화
- Apple, eBay, Uber, ArBnB, Netflix 등에서 사용중
- 분산 스트리밍 플랫폼
- Source 시스템은 Kafka로 메시지를 보내고
- Destination 시스템은 Kafka로 부터 메시지를 받는다
- 확장성이 있고, 장애 허용 (fault tolerant)을 하며, 성능이 좋다.
- 시스템간 의존성을 간접적으로 만든다
- 확장성: 새 시스템을 더할 때 마다 복잡도가 선형적으로 올라간다
- Kafka를 이용해 통신 프로토콜을 통합하기 쉽다
- 확장성: 하루에 1조개의 메시지를 처리할 수 있고, Petabyte의 데이터를 처리 가능
- 메시지 처리 속도: 2MS
- 가용성(availability): 클러스터 환경에서 작동
- 데이터 저장 성능: 분산 처리, 내구성, 장애 허용 (fault tolerant)
- 시스템간 메시지 큐
- 로그 수집
- 스트림 프로세싱
- 이벤트 드리븐 기능들
- Netflix: 실시간 모니터링
- Expedia: 이벤트 드리븐 아키텍처
- Uber: 실시간 가격 조정, 실시간 수요 예측
- Topic
- Kafka Broker
- Kafka Producer
- Kafka Consumer
- Kafka Partition
- Kafka Message
- Kafka Offset
- Kafka Consumer Group
- Kafka Cluster
- Zookeeper
- Producer 와 Consumer가 소통을 하는 하나의 채널
- 데이터 스트림이 어디에 Publish 될지 정하는데 쓰임
- 토픽은 파일 시스템의 폴더의 개념과 유사하다.
- Producer는 토픽을 지정하고 메시지를 게시 (Post)
- Consumer는 토픽으로부터 메시지를 받아옴
- 카프카의 메시지는 디스크에 정렬되어 저장 되며, 새로운 메시지가 도착하면 지속적으로 로그에 기록
- Kafka Topic이 Partition으로 나뉜다.
- Partition은 디스크에 어떻게 저장이 되는지 가르는 기준이 된다.
- 카프카의 토픽은 파티션의 그룹이라고 할 수 있다.
- 디스크에는 파티션 단위로 저장
- 파티션마다 commit Log 가 쌓이게 된다
- 파티션에 쌓이는 기록들은 정렬이 되어 있고 불변(immutable)하다
- 파티션의 모든 기록들은 Offset이라는 ID를 부여받는다.
- 카프카의 메시지는 Byte의 배열
- 흔히 단순 String, JSON이나 Avro 사용
- 크기에는 제한이 없지만, 성능을 위해서는 작게 유지하는것이 좋다
- 데이터는 사용자가 지정한 시간만큼 저장한다 (Retention Period), topic 별로 지정도 가능
- Consumer가 데이터를 받아가고 나서도 데이터는 저장된다
- Retention Period가 지나면 데이터는 자동으로 삭제
- 장애가 있을 경우, Retention Period 기간 안에 해결을 해야 한다.
- Retention Period 지난 후에 문제가 생겼을 경우, Data Lake 까지 내려가서 데이터를 읽어와서 프로세싱 해야 한다
- 보내는 메시지는 Offset을 가지게된다.
- Offset은 Partition안에 메시지가 순서대로 정렬되는데, 정렬된 순서 및 값을 의미한다.
- 카프카 클러스터는 여러개의 카프카 브로커(서버)를 가질 수 있따
- 카프카 토픽을 생성하면 모든 카프카 브로커에 생성된다
- 카프카 파티션은 여러 브로커에 걸쳐서 생성된다
- 카프카의 서버로도 불린다.
- Topic을 전달하는 역할을 한다.
- Producer: 메시지를 전달하는 주체
- 카프카 토픽으로 메시지를 게시(post)하는 클라어인트 애플리케이션
- 메시지를 어느 파티션에 넣을지 결정 (key)
- Consumer: 메시지를 전달받는 주체
- Consumer를 묶어서 Consumer Group이라고 한다.
- Consumer 1개가 Consumer Group이 될 수 있고, 여러개가 될 수 도 있다.
- Consumer Group을 별도로 지정안하면, Consumer 1개당 Group1개씩 지정된다.
- 각 Consumer Group은 모든 파티션으로부터 데이터를 받을 수 있다.
- Consumer는 지정된 파티션으로부터 데이터를 받을 수 있다.
- Consumer1,2가 Consumer Group으로 이루어져 있는 경우, 각 Consumer마다 특정 지정된 파티션에 대해서만 데이털르 전달 받게 된다.
- Partition 4개, Consumer Group안에 Consumer가 3개 있는경우, 3개 각 파티션마다 Consumer에 할당되고 남은 1개의 파티션은 Consumer중에 랜덤으로 배정된다.
- 근데 여기에서 Consumer Group안에 Consumer가 1개가 추가되는 경우 Rebalancing이 일어난다.
- 남은 1개의 파티션이 새로 추가된 Consumer로 전달되도록 Rebalancing이 일어난다.
- Consumer가 제거되거나 추가될 때 rebalancing이 이루어 진다.
- 카프카 클러스터의 여러 요소들을 설정하는데 사용됨
- 메타데이터 설정, 토픽 설정, Replication Factor 등을 조절하는데 사용
- 분산 시스템간의 정보 공유, 상태 체크, 서버들 간의 동기화
- 분산 시스템의 일부이기 때문에 동작을 멈춘다면 분산 시스템에 영향
- 주키퍼 역시 클러스터로 구성
- 클러스터는 홀수로 구성되어 문제가 생겼을 경우 과반수가 가진 데이터를 기준으로 일관성 유지
- 하는일
- 클러스터관리: 클러스터에 존재하는 브로커를 관리하고 모니터링
- Topic 관리: 토픽 리스트를 관리하고 토픽에 할당된 파티션과 Replication관리
- 파티션 리더 관리: 파티션의 리더가 될 브로커를 선택하고, 리더가 다운될 경우 다음 리더를 선택
- 브로커들끼리 서로를 발견할 수 있도록 정보 전달
- Key 없이 전송: Producer가 메시지를 게시하면 Round-Robin 방식으로 파티션에 분배한다.
- Key 와함께 전송: 같은 Key를 가진 메시지들은 같은 파티션에게 보내진다
- 각 브로커는 복제된 파티션중 대표를 하는 파티션 리더를 가지게 된다.
- 모든 Read/Write는 파티션 리더를 통해서 이루어지게 됨
- 다른 파티션들은 파티션 리더를 복제
- Partition을 1개로 만들어놓고 Consumer Group안에 Consumer를 2개로 만든다면
- Producer에서 데이터를 아무리 보내도 Consumer1 로만 데이터를 보내게된다.
- Partition을 2개로 만들어놓고 Consumer Group안에 Consumer를 2개로 만든다면
- Producer에서 데이터를 보내면, Consumer1,2 각각에 균등하게 보내게 된다.
pip install kafka-python
./3-kafka/consumer.py
./3-kafka/producer.py
# m1 에서도 잘 작동함.
./3-kafka/docker-compose.yml
docker exec -it 03-kafka_kafka1_1 kafka-topics --bootstrap-server=localhost:19091 --create --topic first-cluster-topic --partitions 3 --replication-factor 1
# kafdrop 에서도 ui로 topic 생성 가능
./3-kafka/trips_producer.py
./3-kafka/trips_consumer.py
- payment_producer에서 랜덤 payment 데이터들을 kafka payment 토픽으로 전송 한다. (producer)
- fraud_detector에서 payment 토픽에서 데이터를 전달받아 비트코인 데이터면 fraud_payments(사기) 토픽으로 전송하고 정상 데이터면 legit_payments 토픽으로 데이터를 전송한다. 즉, consumer와 producer가 둘다 공존하고 있다.
- legit_processor는 정상 데이터들을 전달받아 처리하는 consumer이다.
- fraud_processor는 비정상 데이터들을 전달받아 처리하는 consumer이다.
./3-kafka/fraud_detection/*
- Spark: 배치 프로세싱을 위한 프레임워크
- Flink: 스트림 프로세싱을 위한 프레임워크
- 2009년 개발 시작 ~ 2016년 첫 stable 버전 공개
- 오픈소스 스트림 프로세싱 프레임워크
- 분산처리 / 고성능 / 고가용성
- 배치 프로세싱 또한 지원한다
- Spark보다 빠른 속도
- Fault-tolerance: 시스템 장애시 장애 직전으로 돌아가서 다시 시작할 수 있다
- 활발한 개발 - 그래프 프로세싱, 머신러닝, 텍스트 처리, 등 라이브러리와 여러가지 라이브러리 / 프레임워크와 연동
- Rescalability: 실행 도중 리소스 추가 가능
- 배치 프로세싱은 한정된 데이터를 가지고 다뤘다면, 스트림 프로세싱은 무한하게 데이터가 들어올 수 있을 때 다룬다.
- 주식 거래소
- 웹 서버
- 센서 데이터 처리
- 이벤트 드리븐 어플리케이션
- 비정상 거래 탐지
- Batch
- 한정된 데이터를 다룰 때 사용
- 모든 데이터셋을 읽은 후 처리 가능
- 주기적으로 실행되는 작업
- 처리속도보다는 처리량에 포커스
- Stream
- 데이터가 무한이라고 가정
- 데이터가 도착할 때 마다 처리
- 실시간으로 실행되는 작업
- 처리량보다 처리속도에 포커스
- Streaming Dataflow:
- Sources: 한개 혹은 여러개의 데이터 소스가 있을 수 있다
- Operators: 데이터를 변환 (transformation)
- Sink: 데이터플로우의 마지막 부분
- 여러 데이터 소스로 부터 읽어와서, Sink를 통해 여러 데이터 소스로 보낼 수 있다.
- Hadoop
- Batch Processing
- Disk에서 데이터를 읽고 처리
- Spark
- Hadoop에서 개선해서 만든 프로젝트
- Hadoop에 비해 속도가 빠르다
- Batch Processing
- (Batch based Streaming) -> micro batch로 streaming을 할 수 있는 라이브러리가 있다
- In-Memory 데이터 처리
- Flink
- Stream Processing
- In-Memory 데이터 처리
- Hadoop
- Input ---Mapper--> 상태1 ---Mapper(Disk)--> Reducer --> Output
- Mapper를 통해서 Reducer에 전달이 될 때 Disk를 거치기 때문에 고성능을 내기 힘들다 (Disk를 거치는게 시간 소요가 많이 된다)
- Spark
- Input --> 상태1 --Transformation(in-memory)--> 상태2 --> Output
- in-memory tranformation을 통해서 Hadoop에 비해 훨씬 성능이 빠르다.
- Flink
- Input --> 상태1 --Transformation(in-memory)--> 상태2 --> Output
- Flow자체는 Spark와 매우 유사한데, Batch Processing이냐 Stream Processing이냐 차이가 있다.
- Hadoop
- 데이터 처리 방법을 손수 코딩해줘야 한다
- 낮은 단계의 추상화
- Spark
- 높은 단계의 추상화
- 쉬운 프로그래밍
- RDD
- Flink
- 높은 단계의 추상화
- 쉬운 프로그래밍
- Dataflows
- Spark & Flink 모두 개발 커뮤니티가 활성화 되어 있고, API 라이브러리가 개발이 잘 되어 있다
- 예) Spark - MLlib, Flink - FlinkML
- Spark
- Spark는 진정한 실시간 데이터 처리가 아니다
- 스파크의 엔진은 배치 프로세싱 기준
- 마이크로 배칭
- Flink
- 실시간 데이터 처리
- 플링크의 엔진은 스트림 프로세싱 기준
- 마이크로 배치: 데이터 중 일부분 떼와서 배치 프로세싱
- Window: 시간을 정한 후, 그 시간부터 10초 사이의 데이터를 window로 묶어 사용
- Spark
- Scala로 개발되어 있음
- 효율적인 메모리 관리가 어렵다
- Out of Memory 에러가 자주 발생
- 의존성 관리로 DAG 사용
- Flink
- Java로 개발되어 있음
- 내장 메모리 매니저
- Out of Memory 에러가 자주 안난다
- Controlled cyclic dependency graph (ML 같이 반복적인 작업에 최적화)
- Flink는 아래 스펙들을 갖고 있는 첫번째 오픈소스 프레임워크
- 클러스터를 이루고 100만 단위의 이벤트를 처리
- Latency 가 1초 이하(sub-second)
- Exactly-once: 1번 이상의 처리를 보장 -> 보통 다른 시스템들은 at least once가 대부분이다 (한 번 이상의 처리를 하거나 보장을 못하고 중복으로 처리할 수 도 있고 데이터를 잃어버릴수도 있음)
- 정확한 결과를 보장
- Storage
- Deployment/Environment
- Engine
- Flink는 Spark와 마찬가지로 데이터를 처리만 하는 시스템이다.
- 따라서, 각종 저장 시스템들과 연동이 가능하도록 설계
- HDFS
- Local File System
- Mongo DB
- RDBMS (MySQL, Postgres)
- S3
- Rabbit MQ
- 리소스 관리도 여러 시스템과 연동하여 이용 가능하다.
- Local
- Standalone 클러스터
- YARN
- Mesos
- AWS / GCP
- SQL: High-level Language
- Table API: Declarative DSL
- Data Stream / DataSet API: Core APIs
- Stateful Stream Processing: Low-level building block (streams, state, [event] time)
- 4번을 그대로 쓸수도 있지만, 보통은 이 파트를 쓰지 않는다.
- 실제로는 3번을 사용하게 되는데, Data Stream은 스트림 프로세싱할 때 사용하고, Dataset API는 배치 프로세싱할 때 사용하는데
- Data Set API는 점점 안쓰는 추세이며 곧 Deprecated 될 수 있다.
- 2번: SparkSQL과 비슷하게 프로그래밍을 선언적으로 할 수 있도록 해줌. Spark와는 다르게 Table이 Dynamic하게 변경되는점이 다르다.
- 1번: 가장 높은 단계의 추상화, SQL로 프로그래밍을 할 수 있다.
- Flink는 여러 Connector 들과 연결 가능
- sink는 데이터를 저장하는 곳, source는 데이터를 입력을 받을 수 있는 곳
- Apache Kafka (sink / source)
- Elastic Search (sink)
- HDFS (sink)
- RabbitMQ (sink, source)
- Amazon Kinesis (sink, source)
- Twitter Streaming API (source)
- Apache Cassandra (sink)
- Redis (sink)
- Apache Zepplin - 웹 베이스 노트북
- Apache Mahout - 머신러닝 라이브러리
- Cascading - Workflows 매니지먼트
- Apache Beam - Data pipeline 생성 / 관리 툴
- Source -> Operations,Transformations -> Sink
- Source: RDB, Kafka, Local file
- Sink: Kafka, HDFS, RDB 등
- event 각각을 독립적으로 처리하면 state가 필요 없다
- 여러 event를 한꺼번에 보려고 할 대 state가 필요하다 - stateful
- 예
- 패턴을 찾는 일
- 데이터를 시간별로 합치는 일
- 머신러닝 트레이닝
- 과거의 데이터를 참고해야하는 일
- flink는 따라서 state를 갖고 있다
- checkpoints와 savepoints로 state를 저장해서 내결함성을 갖도록 설계
- queryable state를 이용해서 밖에서 state를 관찰할 수도 있다
- Data Stream API를 사용할 때 여러가지 경우로 state를 사용하게 된다
- Window로 데이터 모아보기
- Transformations (key-value state)
- CheckpointedFunction으로 로컬 변수를 fault tolerant하게 만들기
- HashMapStateBackend
- Java Heap에 저장
- Hash Table에 변수와 Trigger를 저장
- 큰 state, 긴 windows, 큰 key/value 쌍을 저장할 때 권장
- 고가용성 환경
- 메모리 사용으로 빠른 처리
- EmbeddedRocksDBStateBackend
- RocksDB에 저장
- 데이터는 byte array로 시리얼라이즈 되어 저장
- 매우 큰 state, 긴 window, 큰 key/value state 저장
- 고가용성 환경
- Disk와 Serialize 사용으로 성능은 뒤떨어지고 / 처리량이 늘어난다 (tradeoff)
- Key-Value store
- Keyed stream에서만 이용 가능
- 예를들어 각 이벤트는 id, value 스키마를 갖고
- 각 id마다 value를 더하고 싶을 때 keyed state를 이용
- 장애 허용을 가능하게 해주는 기능들
- Stream replay
- Checkpointing
- checkpoint를 얼마나 자주 저장해야 하나.
- Trade off 존재
- 가벼운 state를 가진 프로그램은 자주 저장해주어도 된다
- checkpoint를 한 이후에 시스템이 망가질 경우
- 플링크는 작동을 멈추고
- 체크포인트로 리셋
- 분산된 데이터 스트림에서 어떻게 snapshot을 만들까
- Chandy-Lamport 알고리즘.
- 비동기적으로 실행
- 데이터를 시간별로 나누는 barrier를 삽입해 snapshot이 가능하다
- barrier는 가벼워서 스트림에 방해되지 않도록 설계
- Sink operator가 barrier를 받아서 새로운 checkpoint를 만든다
- 사이사이에 끼어놓은 barrier를 기록을 하는 과정이다.
- barrier와 state를 기록한다.
- 데이터가 오는대로 받아들여 체크포인트 만들기
- 빠른 속도를 위한 프로그램을 만들 때 사용
- Exactly-once 보다는 at-least-once를 보장한다.
- 장애가 나면 마지막 체크포인트를 불러온다
- 시스템은 dataflow 전체를 re-deploy 한다
- 각 operator에게 체크포인트의 state 정보를 주입한다
- 입력 stream도 체크포인트일 때로 돌려놓는다 - 입력 스트림 자체가 체크포인트로 돌려놓는 작업을 지원해야 한다(이런점에서 kafka랑 궁합이 잘 맞는다)
- 재시작
- 사용자가 지정한 체크포인트
- 다른 체크포인트처럼 자동으로 없어지지 않는다
- 분산 환경에서 체크포인트 정렬 여부
- 속도가 중요할 경우 at least once 사용
- Time Series Analysis할 때
- Windows쓸 때
- Event time이 중요 할 때
- Event Time
- Processing Time
- 데이터를 처리하는 시스템의 시간
- Hourly time window
- 9:15분 시스템 시작
- 9:15 - 10:00
- 10:00 - 11:00
- 가장 빠른 성능과 Low Latency
- 하지만 분산되고 비동기적인 환경에서는 결정적(deterministic) 이지 못한다
- 이벤트가 시스템에 도달하는 속도에 달렸기 때문에
- Event가 생성된 곳에서 만들어진 시간
- Flink에 도달하기 전 이벤트 자체에 기록 보관
- 시간은 시스템이 아니라 data 자체에 의존
- 이벤트 타임 프로그램은 Event Time Watermark를 생성해야 된다
- Evnet Time에 의존하는 시스템은 시간의 흐름을 재는 방법이 따로 필요하다
- 예) 1시간 짜리 window operation이면 1시간이 흘렀다는 것을 알아야 한다
- Event Time과 Processing Time은 싱크가 안맞을 수 있다
- 예) 1주 짜리 데이터를 몇초 만에 계산할 수 있다
- 그래서 나온것이 watermark
- Flink가 event time의 흐름을 재는 방법
- Watermark(t): timestamp t <= t (적어도 t까진 왔다)
- 여러 input stream을 받는 operator의 경우 가장 낮은 event time을 사용
- 분산 시스템으로서 컴퓨팅 리소스 분배가 효율적이어야 한다
- 리소스 매니저의 종류
- YARN
- Kubernetes
- Task 스케줄링 (다음 Task가 언제 실행 될 지)
- 실패/완료된 Tasks 관리
- 체크포인트 관리
- 실패시 Recovery
- 3가지의 컴포넌트
- Resource Manager - task solt 관리
- Dispatcher - Flink app을 등록 하는 REST API & web UI
- JobMaster - 1개의 JobGraph 관리
- aka workers
- Dataflow의 task를 실행하는 주체
- Task slot - 테스크 매니저를 스케줄링하는 가장 작은 단위
- Task slot으로 동시에 실행될 수 있는 tasks 설정
- Task Worker (TaskManager)는 JVM 프로세스
- 여러 쓰레드에서 하나 혹은 여러개의 sub task를 실행 가능
- 하나의 TaskManager가 가질 수 있는 Task 수는 Task Slot으로 조절
- 2019년 8월 Pyflink (Table API) 베타 버전
- 2020년 2월 Apache-flink가 pypi에서 다운로드 가능 해짐
- 2020년 7월 Python UDF, SQL, UDF metrics, Pandas UDFs
- 파이썬인 이유
- Data Sciencedhk 가장 가까운 언어
- 머신러닝 프레이무어크, 판다스 등의 라이브러리
- Apache Flink위에 올려진 Python API
- 파이썬으로 스트림 프로세싱을 할 수 있다
- Data를 주고 받는 것을 최소화
- Serialization / Deserialization
- 빠른 Python UDF
- CPU utilization
https://www.apache.org/dyn/closer.lua/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz # 접속 후 다운로드
pyflink 설치
pip install apache-flink
WordCount java 실행
./bin/flink run examples/streaming/WordCount.jar
tail log/flink-*-taskexecutor-*.out
WordCount python 실행
./bin/flink run --python examples/python/datastream/word_count.py
# Job has been submitted with JobID cc2541ee4ded7e34d7b12d812134fd96
- 실행
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
- 실행 확인
$ ps aux | grep flink
- 종료
$ ./bin/stop-cluster.sh
- 웹 ui 확인
http://localhost:8081/#/overview