-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkinopoisk.py
164 lines (133 loc) · 6.91 KB
/
kinopoisk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import asyncio
import configparser
import logging
import os
import sys

import aiohttp
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from lxml import html as lxml
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

import Extentions.ini_deserializer as ini
import Extentions.json_serializer as jsonext
from Parser.parser import IParser
# Configuration: settings are loaded from "<launch script path without its
# extension>_settings.ini".
CONFIG = configparser.ConfigParser()
# os.path.splitext strips only the final extension. Splitting on the first "."
# broke launch paths such as "./kinopoisk.py" (which produced "_settings.ini")
# and any path with a dot in a directory name.
F_NAME = os.path.splitext(str(sys.argv[0]))[0] + '_settings.ini'
ini.IniDeserializer.deserialize(CONFIG, F_NAME)

# [GENERAL]: API and URL are prefixes to which a movie id is appended;
# KEY is sent as the X-API-KEY request header.
API = CONFIG['GENERAL']['api']
URL = CONFIG['GENERAL']['url']
KEY = CONFIG['GENERAL']['key']

# [PARSER]: suffixes appended to the "styles_root", "styles_rootLink" and
# "styles_valueSection" CSS class names when searching the rendered page.
MAIN = CONFIG['PARSER']['mainpage']
KINORATE = CONFIG['PARSER']['kinoRating']
IMDBRATE = CONFIG['PARSER']['IMDbRating']

# Headers sent with every API request; the user agent is picked once at import.
HEADERS = {
    'user-agent': UserAgent().random,
    'X-API-KEY': KEY,
    'Accept': '*/*',
}
class KinopoiskParser(IParser):
    """Combine JSON from the configured API with data scraped from the film page.

    ``parse`` fetches ``{API}{id}`` as JSON, opens ``{URL}{id}`` in headless
    Chrome, extracts ratings, actors and distributors from the rendered page,
    merges them into ``response['data']`` and passes the result to
    ``JsonSerializer.serialize``.
    """

    # Absolute XPaths into the rendered page. The page appears to use more
    # than one layout, so the workers fall back from the primary expression
    # to the alternatives when the primary one matches nothing.
    _ACTORS_XPATH_SERIES = '//*[@id="__next"]/div/div[2]/div[2]/div[2]/div/div[3]/div/div/div[2]/div[2]/div[2]/div[1]/ul/li'
    _ACTORS_XPATH_FILM_COUNT = '//*[@id="__next"]/div/div[2]/div[2]/div[2]/div/div[3]/div/div/div[2]/div[2]/div/div/ul/li'
    _ACTORS_XPATH_FILM = '//*[@id="__next"]/div/div[2]/div[2]/div[2]/div/div[3]/div/div/div[2]/div[2]/div/div[1]/ul/li'
    _ACTORS_XPATH_ALT = '//*[@id="__next"]/div/div[2]/div[1]/div[2]/div/div[3]/div/div/div[2]/div[2]/div[1]/div[1]/ul/li'
    _DISTRIBUTORS_XPATH = '//*[@id="__next"]/div/div[2]/div[2]/div[2]/div/div[3]/div/div/div[2]/div[1]/div/div[5]/div[2]/a'
    _DISTRIBUTORS_XPATH_ALT = '//*[@id="__next"]/div/div[2]/div[1]/div[2]/div/div[3]/div/div/div[2]/div[1]/div/div[5]/div[2]/a'

    def __init__(self):
        """Configure logging to ``parser.log`` and prepare headless Chrome options."""
        logging.basicConfig(filename='parser.log', encoding='utf-8', level=logging.DEBUG,
                            format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p',
                            filemode='w')
        self.__chrome_options = Options()
        self.__chrome_options.add_argument("--headless")

    async def get_html(self, url):
        """Request ``url`` with the module-level ``HEADERS`` and decode the JSON body.

        Returns:
            The decoded JSON on HTTP 200; ``None`` (after logging the status)
            for any other status code.
        """
        async with aiohttp.ClientSession() as session:
            # The context manager releases the response, so no explicit close
            # is needed on the error path.
            async with session.get(url, headers=HEADERS) as resp:
                if resp.status != 200:
                    logging.error('Connection error, status: %s', resp.status)
                    return None
                logging.info('Connect to api status: %s', resp.status)
                return await resp.json()

    # The Selenium driver API is synchronous, so this helper is a plain method.
    def __get_html_api(self, url) -> webdriver.Chrome:
        """Start headless Chrome, load ``url`` and return the live driver.

        The caller owns the returned driver and must release it (see
        ``dispose``). If loading the page fails, the driver is shut down here
        before the exception propagates so no browser process is left behind.
        """
        # NOTE(review): newer Selenium 4 releases expect the driver path to be
        # wrapped in a Service object instead of being passed positionally;
        # confirm against the installed version.
        driver = webdriver.Chrome(ChromeDriverManager().install(), options=self.__chrome_options)
        try:
            driver.implicitly_wait(5)
            driver.get(url)
        except Exception:
            driver.quit()
            raise
        return driver

    async def parse(self, id: int):
        """Fetch, scrape, merge and serialize the data for movie ``id``.

        Any exception is logged and swallowed; nothing is returned.
        """
        try:
            response = await self.get_html(f'{API}{id}')
            if response is None:
                # get_html has already logged the failing status; do not start
                # a browser for data that cannot be merged anyway.
                return
            driver = self.__get_html_api(f'{URL}{id}')
            movie = self.__get_content(driver)
            response['data'].update(movie)
            # Already inside a running event loop: awaiting is sufficient.
            # Calling run_until_complete()/close() on the running loop raises
            # RuntimeError, which used to mark every successful run as failed.
            await jsonext.JsonSerializer.serialize(response, id)
            print("Parsing successfully")
        except Exception as exc:
            logging.error(exc)

    def __get_content(self, html: webdriver.Chrome) -> dict:
        """Extract ratings, actors and distributors from the page held by ``html``.

        The driver is always disposed before returning.

        Returns:
            A dict with keys ``ratingKinopoisk``, ``ratingIMDb``,
            ``distributors`` and ``actors`` (each ``None`` when not found),
            or an empty dict when no main container matched or scraping failed.
        """
        # Bound up front so a scraping failure yields an empty result instead
        # of an UnboundLocalError at the return statement.
        movie = {}
        try:
            soup = BeautifulSoup(html.page_source, 'lxml')
            items = soup.find_all('div', class_=f"styles_root{MAIN}")
            dom = lxml.fromstring(soup.renderContents())
            actors = distributors = None
            for item in items:
                kinotype = item.find('h3', class_='film-page-section-title').get_text()
                if kinotype[2:-1] == 'сериал':
                    # Series layout.
                    count_xpath = actors_xpath = self._ACTORS_XPATH_SERIES
                else:
                    # Film layout: counting and extraction use different expressions.
                    count_xpath = self._ACTORS_XPATH_FILM_COUNT
                    actors_xpath = self._ACTORS_XPATH_FILM
                actors = self.__actors_worker(
                    len(dom.xpath(count_xpath)) + 1, dom, actors_xpath)
                distributors = self.__distributor_worker(
                    len(dom.xpath(self._DISTRIBUTORS_XPATH)) + 1, dom, self._DISTRIBUTORS_XPATH)
            if items:
                # Each matched container used to overwrite the result of the
                # previous one, so only the last container is relevant.
                last = items[-1]
                kinopoisk_link = last.find('a', class_=f'styles_rootLink{KINORATE}')
                imdb_span = last.find('span', class_=f'styles_valueSection{IMDBRATE}')
                movie = {
                    'ratingKinopoisk': kinopoisk_link.get_text() if kinopoisk_link is not None else None,
                    'ratingIMDb': imdb_span.get_text() if imdb_span is not None else None,
                    'distributors': distributors or None,
                    'actors': actors or None,
                }
        except Exception as ex:
            logging.error('Error: %s', ex)
        finally:
            self.dispose(html)
        return movie

    def __actors_worker(self, act_lenght, dom, path):
        """Collect actor names from the ``li`` elements selected by ``path``.

        Args:
            act_lenght: number of matched elements plus one (exclusive upper
                bound of the 1-based XPath positions to visit).
            dom: parsed lxml document.
            path: XPath selecting the actor ``li`` elements.

        Returns:
            A list of ``{'actor': name}`` dicts without empty names.
        """
        # A bound of 1 means the caller's expression matched nothing; retry
        # the count with the alternative expressions.
        if act_lenght == 1:
            act_lenght = len(dom.xpath(self._ACTORS_XPATH_FILM)) + 1
        if act_lenght == 1:
            act_lenght = len(dom.xpath(self._ACTORS_XPATH_ALT)) + 1
        # Likewise switch the extraction path when it selects nothing.
        if not dom.xpath(path):
            path = self._ACTORS_XPATH_FILM
        if not dom.xpath(path):
            path = self._ACTORS_XPATH_ALT
        names = (
            ''.join(map(str, dom.xpath(f'{path}[{j}]/a/text()')))
            for j in range(1, act_lenght)
        )
        # Filter while building: removing from the list during iteration
        # skipped the entry after each removal and left empty names behind.
        return [{'actor': name} for name in names if name != '']

    def __distributor_worker(self, dist_length, dom, path):
        """Collect distributor names from the ``a`` elements selected by ``path``.

        Args:
            dist_length: number of matched elements plus one (exclusive upper
                bound of the 1-based XPath positions to visit).
            dom: parsed lxml document.
            path: XPath selecting the distributor links.

        Returns:
            A list of ``{'distributor': name}`` dicts without the ``'...'``
            placeholder entries.
        """
        if dist_length == 1:
            dist_length = len(dom.xpath(self._DISTRIBUTORS_XPATH_ALT)) + 1
        if not dom.xpath(path):
            path = self._DISTRIBUTORS_XPATH_ALT
        names = (
            ''.join(map(str, dom.xpath(f'{path}[{i}]/text()')))
            for i in range(1, dist_length)
        )
        # Filter while building; see __actors_worker for why in-place removal
        # during iteration was unreliable.
        return [{'distributor': name} for name in names if name != '...']

    def dispose(self, driver: webdriver.Chrome):
        """Close the browser window and shut the driver down."""
        try:
            driver.close()
        finally:
            # Always terminate the driver, even if the window was already
            # gone and close() raised.
            driver.quit()
            driver.stop_client()
        logging.info('Chrome driver is closed, memory is cleared')