-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
427 lines (334 loc) · 13.7 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# -*- coding: utf-8 -*-
import requests
import json
import datetime
import math
import sys
import os
import tweepy
from pyexcel_ods import get_data
from apscheduler.schedulers.blocking import BlockingScheduler
from sklearn import linear_model
import numpy as np
# VARIABLES GLOBALES
URL_PREFIJO = 'https://www.mscbs.gob.es/profesionales/saludPublica/ccayes/alertasActual/nCov/documentos/Informe_Comunicacion_'
URL_SUFIJO = '.ods'
FECHA_INICIO_VACUNACION = datetime.date(2021, 1, 18)
DIAS_ENTRE_DOSIS = 28
TAMANO_BARRA_PROGRESO = 8
RESTDB_URL 'xxx/rest'
RESTDB_TOKEN 'xxx'
TW_CONSUMER_KEY = 'xxx'
TW_CONSUMER_SECRET = 'xxx'
TW_ACCESS_TOKEN = 'xxx'
TW_ACCESS_TOKEN_SECRET = 'xxx'
TELEGRAM_TOKEN = 'xxx:xxx'
TELEGRAM_CHAT_ID = 'xxx'
POBLACION_ESP = 46940000
POBLACION_ANDALUCIA = 8464411
POBLACION_ARAGON = 1329391
POBLACION_ASTURIAS = 1018784
POBLACION_BALEARES = 1171543
POBLACION_CANARIAS = 2175952
POBLACION_CANTABRIA = 582905
POBLACION_CASTILLA_LEON = 2394918
POBLACION_CASTILLA_MANCHA = 2045221
POBLACION_CATALUNA = 7780479
POBLACION_CEUTA = 84202
POBLACION_C_VALENCIANA = 5057353
POBLACION_EXTREMADURA = 1063987
POBLACION_GALICIA = 2701819
POBLACION_LA_RIOJA = 319914
POBLACION_MADRID = 6779888
POBLACION_MELILLA = 87076
POBLACION_MURCIA = 1511251
POBLACION_NAVARRA = 661197
POBLACION_PAIS_VASCO = 2220504
# FUNCIONES AUXILIARES
def procesar_json_array(array_bruto, tipo):
# Procesamos un array de str para convertirlo a su formato (float o int)
store_list = []
for item in array_bruto:
if tipo == 'float':
store_list.append(float(item))
else:
store_list.append(int(item))
return store_list
def obtener_dias_porcentajes():
# Formulamos la URL
url = RESTDB_URL+'/data'
# Realizamos la petición
response = requests.get(url=url, headers={'x-apikey': RESTDB_TOKEN})
# Si hay datos, se devuelve, si no, devolvemos arrays vacíos
try:
js = json.loads(response.content)
id = js[0][u'_id']
porcentajes_bruto = js[0][u'porcentajes']
dias_bruto = js[0][u'dias']
# Procesamos los arrays
porcentajes = procesar_json_array(porcentajes_bruto, 'float')
dias = procesar_json_array(dias_bruto, 'int')
return {'id': id, 'porcentajes': porcentajes, 'dias': dias}
except:
return {'id': 0, 'porcentajes': [], 'dias': []}
def actualizar_dias_porcentajes(dia, porcentaje):
# Obtenemos los arrays que ya hay guardados
datos = obtener_dias_porcentajes()
# Extraemos los arrays del diccionario
id = datos['id']
porcentajes = datos['porcentajes']
dias = datos['dias']
# Añadimos los nuevos valores a los arrays
porcentajes.append(porcentaje)
dias.append(dia)
# Preparamos el cuerpo de la petición
data_post = {'porcentajes': porcentajes, 'dias': dias}
# Formulamos la URL
url_post = RESTDB_URL + '/data' + id
# Realizamos la petición
response = requests.put(url=url_post, headers={'x-apikey': RESTDB_TOKEN}, data=data_post)
def obtener_ultima_fecha():
# Formulamos la URL
url = RESTDB_URL + '/fecha'
# Realizamos la petición
response = requests.get(url=url, headers={'x-apikey': RESTDB_TOKEN})
# Si hay una fecha la devolvemos, sino, devolvemos un str vacío
try:
return json.loads(response.content)[0][u'fecha']
except:
return ''
def guardar_ultima_fecha(fecha):
# Obtener la ultima fecha
ultima_fecha = obtener_ultima_fecha()
# Si hay alguna guardada, la eliminamos
if ultima_fecha != '':
url_delete = RESTDB_URL + '/fecha/*?q={"fecha": "'+ultima_fecha+'"}'
response = requests.delete(url=url_delete, headers={'x-apikey': RESTDB_TOKEN})
# Preparamos el cuerpo de la petición
data_post = {'fecha': fecha}
# Formulamos la URL
url_post = RESTDB_URL + '/fecha'
# Realizamos la petición
response = requests.post(url=url_post, headers={'x-apikey': RESTDB_TOKEN}, data=data_post)
def obtener_mes_esp(month):
# Devuelve el equivalente del str en español del mes que se pase
switcher = {
1: 'enero',
2: 'febrero',
3: 'marzo',
4: 'abril',
5: 'mayo',
6: 'junio',
7: 'julio',
8: 'agosto',
9: 'septiembre',
10: 'octubre',
11: 'noviembre',
12: 'diciembre'
}
return switcher.get(month)
def convertidor_fecha(o):
if isinstance(o, datetime.datetime):
return o.__str__()
def avanzar_fecha_dias(fecha, dias):
# Avanza la fecha el número de días que se pase por parámetros
return datetime.date.fromordinal(fecha.toordinal()+dias)
def sendNotification(notification, emoji):
# Formulamos el string de la noticicación
msg = emoji+' *Vacunas COVID Twitter bot*\n\n'+notification
# Formulamos la URL de la petición
send_text = 'https://api.telegram.org/bot' + TELEGRAM_TOKEN + '/sendMessage?chat_id=' + TELEGRAM_CHAT_ID + '&parse_mode=Markdown&text=' + msg
# Realizamos la petición
response = requests.get(send_text)
return response.json()
# FUNCIONES DEL BOT
def obtener_datos(today):
# Obtenemos la fecha del día de ayer para obtener sus datos
fecha_url = avanzar_fecha_dias(today, 0).strftime('%Y%m%d')
# Obtenemos la fecha del último tweet para evitar repeticiones
fecha_ultimo = obtener_ultima_fecha()
# Si ya hemos publicado, devolvemos el 2
if (fecha_url <= fecha_ultimo):
return 2
# Conformamos la URL de la petición
url = URL_PREFIJO + fecha_url + URL_SUFIJO
# Probamos a obtener el fichero, si no puede, devolvemos el error 3
try:
r = requests.get(url, allow_redirects=True)
except:
return 3
# Probamos a abrir el fichero, si no se puede, quiere decir que no están los datos todavía, devolvemos 1
try:
open('data.ods', 'wb').write(r.content)
except:
return 1
# En el caso de que si escriba el fichero, obtenemos los datos en ODS
try:
ods = get_data("data.ods")
except:
return 1
# Obtenemos un diccionario del JSON
datos = json.loads(json.dumps(ods, default=convertidor_fecha, indent=4))
return datos
def obtener_fecha_estimada(porc_buscado, today):
# Obtenemos los datos
datos = obtener_dias_porcentajes()
# Declaramos los arrays
x = np.array(datos['porcentajes']).astype(np.float64).reshape((-1, 1))
y = np.array(datos['dias']).astype(np.float64)
# Inicializamos el modelo lineal y añadimos los datos
model = linear_model.LinearRegression()
model.fit(x, y)
# Realizamos la predicción de dias
x_predict = [[porc_buscado]]
y_predict = int(model.predict(x_predict)[0])
# Calculamos la fecha estimada
fecha_est = FECHA_INICIO_VACUNACION + datetime.timedelta(y_predict)
# Formulamos el strig con la fecha estimada
fecha_est_str = str(fecha_est.day) + ' de ' + obtener_mes_esp(fecha_est.month) + ' del ' + str(fecha_est.year)
return fecha_est_str
def obtener_fecha_actual(today):
return str(today.day) + ' de ' + obtener_mes_esp(today.month)
def obtener_barra_progreso(porc_vac):
simbolo_vacio = '○'
simbolo_lleno = '●'
return (int(porc_vac/TAMANO_BARRA_PROGRESO)*simbolo_lleno) + ( int((100/TAMANO_BARRA_PROGRESO) - int(porc_vac/TAMANO_BARRA_PROGRESO)) *simbolo_vacio)
def obtener_str_comunidad(comunidad, fila, poblacion):
personas_vacunadas = fila[-2]
porc_vac = round((personas_vacunadas/float(poblacion)*100), 2)
barra = obtener_barra_progreso(porc_vac)
return '{} → {}%\n{}\n\n'.format(comunidad, str(porc_vac).replace('.', ','), barra)
def obtener_tweets_comunidades(datos):
comunidades = []
# Andalucia
comunidades.append(obtener_str_comunidad('Andalucía', datos[1], POBLACION_ANDALUCIA))
# Aragon
comunidades.append(obtener_str_comunidad('Aragón', datos[2], POBLACION_ARAGON))
# Asturias
comunidades.append(obtener_str_comunidad('Asturias', datos[3], POBLACION_ASTURIAS))
# Baleares
comunidades.append(obtener_str_comunidad('Baleares', datos[4], POBLACION_BALEARES))
# Canarias
comunidades.append(obtener_str_comunidad('Canarias', datos[5], POBLACION_CANARIAS))
# Cantabria
comunidades.append(obtener_str_comunidad('Cantabria', datos[6], POBLACION_CANTABRIA))
# Castilla y León
comunidades.append(obtener_str_comunidad('Castilla y león', datos[7], POBLACION_CASTILLA_LEON))
# Castilla-La Mancha
comunidades.append(obtener_str_comunidad('Castilla-La Mancha', datos[8], POBLACION_CASTILLA_MANCHA))
# Cataluña
comunidades.append(obtener_str_comunidad('Cataluña', datos[9], POBLACION_CATALUNA))
# C. Valenciana
comunidades.append(obtener_str_comunidad('C. Valenciana', datos[10], POBLACION_C_VALENCIANA))
# Extremadura
comunidades.append(obtener_str_comunidad('Extremadura', datos[11], POBLACION_EXTREMADURA))
# Galicia
comunidades.append(obtener_str_comunidad('Galicia', datos[12], POBLACION_GALICIA))
# La rioja
comunidades.append(obtener_str_comunidad('La Rioja', datos[13], POBLACION_LA_RIOJA))
# Madrid
comunidades.append(obtener_str_comunidad('Madrid', datos[14], POBLACION_MADRID))
# Murcia
comunidades.append(obtener_str_comunidad('Murcia', datos[15], POBLACION_MURCIA))
# Navarra
comunidades.append(obtener_str_comunidad('Navarra', datos[16], POBLACION_NAVARRA))
# País Vasco
comunidades.append(obtener_str_comunidad('País Vasco', datos[17], POBLACION_PAIS_VASCO))
# Ceuta
comunidades.append(obtener_str_comunidad('Ceuta', datos[18], POBLACION_CEUTA))
# Melilla
comunidades.append(obtener_str_comunidad('Melilla', datos[19], POBLACION_MELILLA))
return comunidades
def publicar_tweet(tweet, comunidades, today):
notificacion = tweet + '\n-------\n'
# Configuramos el bbot
auth = tweepy.OAuthHandler(TW_CONSUMER_KEY, TW_CONSUMER_SECRET)
auth.set_access_token(TW_ACCESS_TOKEN, TW_ACCESS_TOKEN_SECRET)
# Iniciamos el cliente
api = tweepy.API(auth)
# Publicamos el tweet
hilo = api.update_status(tweet)
# Publicamos el hilo de las comunidades
aux = 0
respuesta = ''
for c in comunidades:
if (aux <= 4):
aux += 1
respuesta = respuesta + c
else:
hilo = api.update_status(status=respuesta, in_reply_to_status_id=hilo.id, auto_populate_reply_metadata=True)
notificacion = notificacion+respuesta+'-------\n'
aux = 1
respuesta = c
hilo = api.update_status(status=respuesta, in_reply_to_status_id=hilo.id, auto_populate_reply_metadata=True)
notificacion = notificacion+respuesta+'-------\n'
# Aunque se haya publicado hoy, los datos que se muestran son los del día anterior (por eso el -1)
guardar_ultima_fecha(avanzar_fecha_dias(today, 0).strftime('%Y%m%d'))
# Mandamos notificación informando del Tweet
sendNotification(notificacion, 'ℹ️')
return 0
def main(today):
# Obtenemos los datos y si hay fallo lo devolvemos
datos = obtener_datos(today)
if (isinstance(datos, int)):
return datos
# Extraemos los datos que nos pueden interesar
try:
array_totales = datos[u'Comunicación'][21]
dosis_administradas = array_totales[-4]
porc_admin = round((dosis_administradas/float(POBLACION_ESP)*100), 2)
personas_una_dosis = array_totales[-2]
personas_una_dosis_porc = round((personas_una_dosis/float(POBLACION_ESP)*100), 2)
personas_vacunadas = array_totales[-1]
porc_vac = round((personas_vacunadas/float(POBLACION_ESP)*100), 2)
except:
return 4
# Actualizamos los arrays de días y porcentajes
actualizar_dias_porcentajes((today-FECHA_INICIO_VACUNACION).days, porc_vac)
# Obtenemos el string de la fecha de hoy
fecha_actual_str = obtener_fecha_actual(today)
# Obtenemos el string de la fecha estimada
fecha_estimada_str = obtener_fecha_estimada(50, today)
# Obtenemos la barra de progreso
barra_progreso = obtener_barra_progreso(porc_vac)
# Formulamos el tweet
tweet = """{} \n\n💉 Dosis administradas: {} \n👤 Personas 1 dosis: {} ({}%) \n\n💃 Personas vacunadas: {} \nPOBLACIÓN VACUNADA: {}% ✌️\n\n{}\n\n📆 Est. 50%: {}""".format(fecha_actual_str, "{:,}".format(dosis_administradas).replace(',','.'), "{:,}".format(personas_una_dosis).replace(',','.'), str(personas_una_dosis_porc).replace('.', ','), "{:,}".format(personas_vacunadas).replace(',','.'), str(porc_vac).replace('.', ','), barra_progreso, fecha_estimada_str)
# Ontenemos los tweets de las comunidades
comunidades = obtener_tweets_comunidades(datos[u'Comunicación'])
# Publicamos el tweet
publicar_tweet(tweet, comunidades, today)
# Devolvemos 0 sino hay problemas
return 0
def init():
today = datetime.date.today()
# Tratar caso en el que se pide un día anterior
try:
dias = int(sys.argv[1])*-1
except:
dias = 0
today = avanzar_fecha_dias (today, dias)
# Lanzar el programa principal
resultado = main(today)
# Tratamiento de errores
if (resultado == 0):
msg = 'El tweet se ha publicado correctamente'
print(msg)
sendNotification(msg, '✅')
elif (resultado == 1):
msg = 'Todavía no hay datos para mostrar'
print(msg)
sendNotification(msg, '⚠️')
elif (resultado == 2):
msg = 'Ya se han publicado los datos de {}'.format(avanzar_fecha_dias(today, 0).strftime('%d-%m-%Y'))
print(msg)
sendNotification(msg, '⚠️')
elif (resultado == 3):
msg = 'Error al obtener los datos'
print(msg)
sendNotification(msg, '❌')
elif (resultado == 4):
msg = 'El Excel ha cambiado de formato'
print(msg)
sendNotification(msg, '❌')
if __name__ == "__main__":
init()