Skip to content

Commit

Permalink
[KAN-79] MYSQL -> ES 동기화 배치 작업
Browse files Browse the repository at this point in the history
  • Loading branch information
sinkyoungdeok committed May 21, 2024
1 parent 48b74d8 commit 9bbc969
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 13 deletions.
17 changes: 10 additions & 7 deletions csv-to-es.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
index_name = f"restaurant_{now.strftime('%Y_%m_%d_%H-%M')}"

# Elasticsearch 클라이언트 설정
es = Elasticsearch("http://es-singlenode:9200")
es = Elasticsearch("http://localhost:9200")

# 새 인덱스 생성 및 매핑 설정
if not es.indices.exists(index=index_name):
Expand All @@ -31,9 +31,12 @@
"id": {"type": "long"},
"name": {"type": "text", "analyzer": "korean"},
"original_category": {"type": "text", "analyzer": "korean"},
"naver_review_count": {"type": "long"},
"address": {"type": "text", "analyzer": "korean"},
"naver_rating": {"type": "float"},
"naver_review_count": {"type": "long"},
"naver_rating_avg": {"type": "float"},
"review_count": {"type": "long"},
"rating_avg": {"type": "float"},
"like_count": {"type": "long"},
"number": {"type": "text"},
"image_url": {"type": "text"},
"category": {"type": "text", "analyzer": "korean"},
Expand Down Expand Up @@ -91,7 +94,7 @@
"original_category": row['category'],
"naver_review_count": row['review_count'].replace('+', ''),
"address": row['address'],
"naver_rating": rating,
"naver_rating_avg": rating,
"number": number,
"image_url": restaurant_image_url,
"category": row['custom_category'],
Expand All @@ -102,14 +105,14 @@
data.pop("discount_content")
if data.get("naver_review_count") is None:
data.pop("naver_review_count")
if data.get("naver_rating") is None:
data.pop("naver_rating")
if data.get("naver_rating_avg") is None:
data.pop("naver_rating_avg")
if data.get("number") is None:
data.pop("number")
if data.get("image_url") is None:
data.pop("image_url")

response = es.index(index=index_name, id=row['name'], document=data)
response = es.index(index=index_name, id=row['id'], document=data)
print(f"Indexed document ID: {response['_id']}, Result: {response['result']}")

# 앨리어스 확인 및 설정
Expand Down
2 changes: 2 additions & 0 deletions mysql-to-es-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mysql-connector-python
elasticsearch
8 changes: 8 additions & 0 deletions mysql-to-es.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM python:3.8-slim

# Install dependencies first, from the requirements file alone, so that edits
# to the script do not invalidate the cached dependency layer.
COPY mysql-to-es-requirements.txt mysql-to-es-requirements.txt
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r mysql-to-es-requirements.txt

# Copy the batch script last; it is the file that changes most often.
COPY mysql-to-es.py mysql-to-es.py

ENTRYPOINT ["python3", "mysql-to-es.py"]
56 changes: 56 additions & 0 deletions mysql-to-es.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import os

import mysql.connector
from elasticsearch import Elasticsearch, helpers

# MySQL database connection parameters.
# Each value can be overridden through an environment variable so that
# credentials do not have to be baked into the source or the image; the
# fallbacks are the values this script originally hard-coded.
db_config = {
    'user': os.environ.get('MYSQL_USER', 'skku-user'),
    'password': os.environ.get('MYSQL_PASSWORD', 'skku-pw'),
    'host': os.environ.get('MYSQL_HOST', 'skku-db'),
    'database': os.environ.get('MYSQL_DATABASE', 'skku'),
    # Environment values are strings; the connector expects an integer port.
    'port': int(os.environ.get('MYSQL_PORT', '3306')),
}

# Elasticsearch connection parameters.
# ES_URL overrides the endpoint (useful when Elasticsearch is not reachable on
# localhost from where this script runs); the fallback is the original value.
es = Elasticsearch([os.environ.get('ES_URL', 'http://localhost:9200')])


def fetch_restaurant_data():
    """Fetch the rating and counter columns of every row in ``restaurants``.

    Opens a new MySQL connection using the module-level ``db_config``, runs a
    single SELECT and returns all rows at once.

    Returns:
        A list with one dict per row (the cursor is created with
        ``dictionary=True``), keyed by ``id``, ``name``, ``rating_avg``,
        ``review_count`` and ``like_count``.

    Raises:
        Any exception raised by ``mysql.connector`` while connecting or
        querying is propagated; the cursor and connection are closed first.
    """
    query = """
    SELECT id, name, rating_avg, review_count, like_count
    FROM restaurants
    """

    connection = mysql.connector.connect(**db_config)
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute(query)
            return cursor.fetchall()
        finally:
            # Release the cursor even when execute()/fetchall() fails.
            cursor.close()
    finally:
        # Always return the connection, otherwise a failed query leaks it.
        connection.close()


def update_elasticsearch(data, index_name="restaurant"):
    """Bulk-update restaurant statistics in Elasticsearch.

    For every row a partial ``update`` action is sent that sets
    ``rating_avg``, ``review_count`` and ``like_count`` on the document whose
    ``_id`` is the row's ``id``. ``doc_as_upsert`` is enabled, so a document
    that does not exist yet is created from those three fields alone.

    Args:
        data: Iterable of mappings providing the keys ``id``, ``rating_avg``,
            ``review_count`` and ``like_count``.
        index_name: Index (or alias) to write to. Defaults to
            ``"restaurant"``, the name this function previously hard-coded.

    Raises:
        KeyError: If a row lacks one of the required keys.
        Errors raised by ``helpers.bulk`` are propagated unchanged.
    """
    # A generator lets helpers.bulk consume the actions chunk by chunk instead
    # of building a second full-size list next to ``data``.
    actions = (
        {
            "_op_type": "update",
            "_index": index_name,
            "_id": restaurant['id'],
            "doc": {
                "rating_avg": restaurant['rating_avg'],
                "review_count": restaurant['review_count'],
                "like_count": restaurant['like_count'],
            },
            "doc_as_upsert": True,
        }
        for restaurant in data
    )

    helpers.bulk(es, actions)


if __name__ == "__main__":
    # Run the sync only when executed as a script (as the Docker ENTRYPOINT
    # does), so that loading this module has no database side effects.
    restaurant_data = fetch_restaurant_data()
    update_elasticsearch(restaurant_data)
    print("Elasticsearch update complete")
7 changes: 1 addition & 6 deletions mysql_batch/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ def insert_into_restaurants(cursor, restaurant):
);
"""

try:
rating = float(restaurant['rating'])
except ValueError:
rating = 0.0

cursor.execute(insert_query, (
restaurant['id'],
restaurant['name'],
Expand All @@ -31,7 +26,7 @@ def insert_into_restaurants(cursor, restaurant):
0,
restaurant['address'],
restaurant['number'],
rating,
0,
restaurant['image_url'],
0,
restaurant['discount_content']
Expand Down

0 comments on commit 9bbc969

Please sign in to comment.