From b25d3b77e582b1420fe02abbf57634f45a79699a Mon Sep 17 00:00:00 2001 From: Joshua Date: Wed, 2 Oct 2024 19:11:03 +0200 Subject: [PATCH] Add manual reference price matching and marking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implemented manual reference prices for specific energy drink products based on size patterns in descriptions. - Overrode API-provided reference prices with manual prices when a match is found. - Added visual indicators (🛠️) to Telegram messages to denote manually set reference prices. - Enhanced blacklist filtering to be case-insensitive for more effective exclusion of unwanted offers. - Improved logging to track whether reference prices are sourced manually or from the API. - Fixed indentation errors to ensure proper script execution." --- energy_drink_offers.py | 334 +++++++++++++++++++++++++++++------------ 1 file changed, 237 insertions(+), 97 deletions(-) diff --git a/energy_drink_offers.py b/energy_drink_offers.py index 12e2af0..e42b646 100644 --- a/energy_drink_offers.py +++ b/energy_drink_offers.py @@ -1,11 +1,10 @@ -""" +"""" Energy Drink Offers Bot This script fetches energy drink offers and sends them to a Telegram bot. """ import os import re -import datetime import json import random import html @@ -13,6 +12,12 @@ import requests from bs4 import BeautifulSoup +import configparser +import logging + +# ================================ +# Configuration and Logging Setup +# ================================ # File paths for config and facts CONFIG_FILE_PATH = 'config.ini' @@ -35,13 +40,51 @@ 'Leipzig': '04109', } +# List of most popular supermarkets in Germany +MOST_POPULAR_SUPERMARKETS = [ + 'EDEKA', 'REWE', 'Lidl', 'ALDI NORD', 'PENNY', 'Kaufland', + 'Netto Marken-Discount', 'Rossmann', 'dm', 'Real', 'ALDI SÜD', 'tegut' +] + +# Manual reference prices based on product name and size patterns +MANUAL_REFERENCE_PRICES = [ + { + 'product_name': 'red bull energy drink', + 'size_patterns': [r'0[.,]25\s*l', r'025\s*l'], + 'reference_price': 1.39 + }, + { + 'product_name': 'effect energy drink', + 'size_patterns': [r'1[-\s]?l[-\s]?fl', r'1\s*l\s*fl'], + 'reference_price': 2.49 + }, + { + 'product_name': 'effect energy drink', + 'size_patterns': [r'0[.,]33\s*liter', r'0.33\s*liter'], + 'reference_price': 1.09 + } +] + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler("energy_drink_bot.log"), + logging.StreamHandler() + ] +) + +# ============================= +# Configuration File Handling +# ============================= + # Read configuration from config.ini config = configparser.ConfigParser() - def update_api_keys(x_apikey, x_clientkey): """Updates the API keys in the config.ini file.""" - print("Updating API keys in config.ini...") + logging.info("Updating API keys in config.ini...") # Ensure the API section exists if not config.has_section('API'): @@ -55,69 +98,74 @@ def update_api_keys(x_apikey, x_clientkey): try: with open(CONFIG_FILE_PATH, 'w', encoding='utf-8') as config_file: config.write(config_file) - print("API keys successfully written to config.ini.") + logging.info("API keys successfully written to config.ini.") except Exception as error: - print(f"Error writing to config.ini: {error}") - + logging.error(f"Error writing to config.ini: {error}") def retrieve_api_keys(): """Retrieves API keys from the Marktguru website.""" - print("Retrieving API keys from Marktguru...") - - response = requests.get("https://www.marktguru.de/") - if response.status_code == 200: - soup = BeautifulSoup(response.content, 'html.parser') - - # Search for API keys in the HTML or scripts using regular expressions - scripts = soup.find_all('script') - x_apikey = None - x_clientkey = None - - for script in scripts: - script_content = script.string - if script_content: - apikey_match = re.search(r'"x_apikey":"(.*?)"', script_content) - clientkey_match = re.search(r'"x_clientkey":"(.*?)"', script_content) - - if apikey_match: - x_apikey = apikey_match.group(1) - if clientkey_match: - x_clientkey = clientkey_match.group(1) - - if x_apikey and x_clientkey: - break - - if x_apikey and x_clientkey: - print(f"Retrieved x_apikey: {x_apikey}, x_clientkey: {x_clientkey}") - update_api_keys(x_apikey, x_clientkey) - else: - print("Failed to find API keys in the response.") - else: - print(f"Failed to retrieve API keys. Status code: {response.status_code}") + logging.info("Retrieving API keys from Marktguru...") + try: + response = requests.get("https://www.marktguru.de/", timeout=10) + if response.status_code == 200: + soup = BeautifulSoup(response.content, 'html.parser') + + # Search for API keys in the HTML or scripts using regular expressions + scripts = soup.find_all('script') + x_apikey = None + x_clientkey = None + + for script in scripts: + script_content = script.string + if script_content: + apikey_match = re.search(r'"x_apikey":"(.*?)"', script_content) + clientkey_match = re.search(r'"x_clientkey":"(.*?)"', script_content) + + if apikey_match: + x_apikey = apikey_match.group(1) + if clientkey_match: + x_clientkey = clientkey_match.group(1) + + if x_apikey and x_clientkey: + break + + if x_apikey and x_clientkey: + logging.info(f"Retrieved x_apikey: {x_apikey}, x_clientkey: {x_clientkey}") + update_api_keys(x_apikey, x_clientkey) + else: + logging.error("Failed to find API keys in the response.") + else: + logging.error(f"Failed to retrieve API keys. Status code: {response.status_code}") + except requests.RequestException as e: + logging.error(f"Exception occurred while retrieving API keys: {e}") +# Initialize config if os.path.exists(CONFIG_FILE_PATH): - print("Loading config.ini...") + logging.info("Loading config.ini...") config.read(CONFIG_FILE_PATH) else: - print("Creating new config.ini...") + logging.info("Creating new config.ini...") config['Telegram'] = {'bot_token': '', 'chat_id': ''} config['API'] = {'x_apikey': '', 'x_clientkey': ''} - with open(CONFIG_FILE_PATH, 'w', encoding='utf-8') as config_file: - config.write(config_file) + try: + with open(CONFIG_FILE_PATH, 'w', encoding='utf-8') as config_file: + config.write(config_file) + logging.info("config.ini created successfully.") + except Exception as error: + logging.error(f"Error creating config.ini: {error}") +# Retrieve API keys if not present if not config.get('API', 'x_apikey') or not config.get('API', 'x_clientkey'): retrieve_api_keys() +# Telegram credentials TELEGRAM_BOT_TOKEN = config.get('Telegram', 'bot_token') CHAT_ID = config.get('Telegram', 'chat_id') -# List of most popular supermarkets in Germany -MOST_POPULAR_SUPERMARKETS = [ - 'EDEKA', 'REWE', 'Lidl', 'ALDI NORD', 'PENNY', 'Kaufland', - 'Netto Marken-Discount', 'Rossmann', 'dm', 'Real', 'ALDI SÜD', 'tegut' -] - +# ============================== +# Utility Functions +# ============================== def load_energy_drink_facts(): """Loads random energy drink facts from facts.json.""" @@ -130,11 +178,10 @@ def load_energy_drink_facts(): facts = json.loads(data).get('facts', []) return [fact for fact in facts if fact.strip()] except json.JSONDecodeError: - print(f"Error decoding '{FACTS_FILE_PATH}'. Please check the JSON format.") + logging.error(f"Error decoding '{FACTS_FILE_PATH}'. Please check the JSON format.") return [] return [] - def fetch_offers(zip_code): """Fetches energy drink offers for a given city.""" api_url = "https://api.marktguru.de/api/v1/offers/search" @@ -153,16 +200,21 @@ def fetch_offers(zip_code): 'Content-Type': 'application/json', } - response = requests.get(api_url, headers=headers, params=params) - if response.status_code == 200: - offers = response.json().get('results', []) - if USE_SUPERMARKET_FILTER: - return [offer for offer in offers if offer.get('advertisers', [{}])[0].get('name', '') in MOST_POPULAR_SUPERMARKETS] - return offers - print(f"Error {response.status_code}: {response.text}") + try: + response = requests.get(api_url, headers=headers, params=params, timeout=10) + if response.status_code == 200: + offers = response.json().get('results', []) + if USE_SUPERMARKET_FILTER: + return [ + offer for offer in offers + if offer.get('advertisers', [{}])[0].get('name', '') in MOST_POPULAR_SUPERMARKETS + ] + return offers + logging.error(f"Error {response.status_code}: {response.text}") + except requests.RequestException as e: + logging.error(f"Exception occurred while fetching offers: {e}") return [] - def load_blacklist(blacklist_file='blacklist.json'): """Loads blacklisted terms from blacklist.json.""" try: @@ -170,14 +222,26 @@ def load_blacklist(blacklist_file='blacklist.json'): data = file.read().strip() if not data: return [] - return json.loads(data).get('blacklisted_terms', []) + # Convert all blacklisted terms to lowercase for case-insensitive matching + return [term.lower() for term in json.loads(data).get('blacklisted_terms', [])] except FileNotFoundError: - print(f"Blacklist file '{blacklist_file}' not found.") + logging.error(f"Blacklist file '{blacklist_file}' not found.") return [] except json.JSONDecodeError: - print(f"Error decoding '{blacklist_file}'. Please check the JSON format.") + logging.error(f"Error decoding '{blacklist_file}'. Please check the JSON format.") return [] +def get_manual_reference_price(product_name, description): + """ + Returns the manual reference price for a given product based on its name and description. + If no manual reference price is found, returns None. + """ + for entry in MANUAL_REFERENCE_PRICES: + if entry['product_name'] in product_name: + for pattern in entry['size_patterns']: + if re.search(pattern, description, re.IGNORECASE): + return entry['reference_price'] + return None def split_message(message, max_length=4000): """Splits long messages into chunks.""" @@ -195,13 +259,16 @@ def split_message(message, max_length=4000): return parts return [message] +# ====================================== +# Offer Formatting and Message Generation +# ====================================== def format_offers_for_all_cities(all_offers): """Formats the offers and generates the final message.""" today = datetime.today() week_number = today.isocalendar()[1] - message = f"🥤 Wöchentliche Energy Drink Angebote 🥤\n\n" - message += f"Woche {week_number} ({today.strftime('%d.%m.%Y')})\n\n" + message = f"🥤 Weekly Energy Drink Offers 🥤\n\n" + message += f"Week {week_number} ({today.strftime('%d.%m.%Y')})\n\n" blacklist = load_blacklist() @@ -210,12 +277,22 @@ def format_offers_for_all_cities(all_offers): for city, offers in all_offers.items(): for offer in offers: - store = html.escape(offer.get('advertisers', [{}])[0].get('name', 'Unbekannter Laden')) + store = html.escape(offer.get('advertisers', [{}])[0].get('name', 'Unknown Store')) price = offer.get('price') - product_name = html.escape(offer.get('product', {}).get('name', 'Kein Titel').strip().lower()) - brand_name = html.escape(offer.get('brand', {}).get('name', 'Unbekannte Marke')) + if price is not None: + try: + price = float(price) + except ValueError: + price = 0.0 + else: + price = 0.0 # Fallback if price is not available + + product_name = html.escape(offer.get('product', {}).get('name', 'No Title').strip().lower()) + brand_name = html.escape(offer.get('brand', {}).get('name', 'Unknown Brand')) + # Case-insensitive blacklist filtering if any(blacklisted_term in product_name for blacklisted_term in blacklist): + logging.info(f"Excluded offer '{product_name}' due to blacklist.") continue unique_identifier = (product_name, brand_name, price, store) @@ -228,27 +305,76 @@ def format_offers_for_all_cities(all_offers): if store not in unique_offers: unique_offers[store] = [] - description = html.escape(offer.get('description', '')) + description = html.escape(offer.get('description', '').lower()) # Convert description to lowercase for matching validity = '' if 'validityDates' in offer and offer['validityDates']: - valid_from = datetime.strptime(offer['validityDates'][0].get('from', '')[:10], '%Y-%m-%d').strftime('%d.%m.%Y') - valid_to = datetime.strptime(offer['validityDates'][0].get('to', '')[:10], '%Y-%m-%d').strftime('%d.%m.%Y') - validity = f"{valid_from} bis {valid_to}" + try: + valid_from = datetime.strptime(offer['validityDates'][0].get('from', '')[:10], '%Y-%m-%d').strftime('%d.%m.%Y') + valid_to = datetime.strptime(offer['validityDates'][0].get('to', '')[:10], '%Y-%m-%d').strftime('%d.%m.%Y') + validity = f"{valid_from} to {valid_to}" + except (ValueError, TypeError): + validity = "Validity date not available" else: - validity = "Gültigkeitsdatum nicht verfügbar" - - offer_text = ( - f"• {brand_name.title()} {product_name.title()}\n" - f" 💰 Preis: €{price:.2f}\n" - f" 📄 {description}\n" - f" 📅 Gültig: {validity}\n\n" - ) + validity = "Validity date not available" + + # Retrieve manual reference price if applicable + manual_reference_price = get_manual_reference_price(product_name, description) + if manual_reference_price is not None: + reference_price = manual_reference_price + reference_source = "manual" # Flag to indicate manual reference price + logging.info(f"Using manual reference price for '{product_name}': €{reference_price:.2f}") + else: + # Extract referencePrice as UVP, if available + reference_price = offer.get('referencePrice') + if reference_price: + try: + reference_price = float(reference_price) + reference_source = "api" # Flag to indicate API-provided reference price + logging.info(f"Using API reference price for '{product_name}': €{reference_price:.2f}") + except ValueError: + reference_price = None + reference_source = "none" + logging.warning(f"Invalid referencePrice format for '{product_name}'.") + else: + reference_price = None + reference_source = "none" + + # Calculate savings if reference_price is available + if reference_price is not None: + try: + savings = reference_price - price + except TypeError: + savings = 0.0 + else: + savings = 0.0 + + # Updated Price Formatting with Manual Reference Price Indicator + if reference_price and savings > 0: + # Determine the indicator based on the reference source + if reference_source == "manual": + manual_indicator = " 🛠️" # You can choose any suitable emoji or text + else: + manual_indicator = "" + offer_text = ( + f"• {brand_name.title()} {product_name.title()}\n" + f" 💶Price:€{reference_price:.2f} €{price:.2f} (You save €{savings:.2f}!){manual_indicator}\n" + f" 📄 {description}\n" + f" 📅 Valid: {validity}\n\n" + ) + else: + # Fallback if referencePrice is not available or no savings + offer_text = ( + f"• {brand_name.title()} {product_name.title()}\n" + f" 💶Price: €{price:.2f}\n" + f" 📄 {description}\n" + f" 📅 Valid: {validity}\n\n" + ) unique_offers[store].append(offer_text) for store, offers in unique_offers.items(): if offers: - message += f"Angebote bei {store}:\n" + message += f"Offers at {store}:\n" message += ''.join(offers) message += '\n' @@ -257,20 +383,23 @@ def format_offers_for_all_cities(all_offers): if energy_drink_facts: fact = random.choice(energy_drink_facts) if energy_drink_facts else None if fact: - message += "🔍 Energy Drink Fakt 🔍\n\n" + message += "🔍 Energy Drink Fact 🔍\n\n" message += f"{html.escape(fact)}\n" else: - message += "Keine Energy Drink Fakten verfügbar." + message += "No Energy Drink facts available." return split_message(message) +# ====================================== +# Telegram Messaging Functions +# ====================================== def send_telegram_message(message_parts): """Sends multiple messages if needed and pins the last one.""" if config.has_option('Telegram', 'chat_id'): chat_ids = config.get('Telegram', 'chat_id').split(',') else: - print("No 'chat_id' found in config.ini. Please add chat IDs.") + logging.error("No 'chat_id' found in config.ini. Please add chat IDs.") return for chat_id in chat_ids: @@ -285,19 +414,21 @@ def send_telegram_message(message_parts): 'parse_mode': 'HTML' } - response = requests.post(telegram_url, data=payload) - - if response.status_code == 200: - print(f"Message part sent successfully to chat ID {chat_id}!") - message_id_to_pin = response.json().get('result', {}).get('message_id') - else: - print(f"Failed to send message to chat ID {chat_id}. Error {response.status_code}: {response.text}") + try: + response = requests.post(telegram_url, data=payload, timeout=10) + if response.status_code == 200: + logging.info(f"Message part sent successfully to chat ID {chat_id}!") + message_id_to_pin = response.json().get('result', {}).get('message_id') + else: + logging.error(f"Failed to send message to chat ID {chat_id}. Error {response.status_code}: {response.text}") + return + except requests.RequestException as e: + logging.error(f"Exception occurred while sending message to chat ID {chat_id}: {e}") return if message_id_to_pin: pin_message_in_chat(chat_id, message_id_to_pin) - def pin_message_in_chat(chat_id, message_id): """Pins a message in a Telegram chat.""" pin_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/pinChatMessage" @@ -306,24 +437,33 @@ def pin_message_in_chat(chat_id, message_id): 'message_id': message_id } - response = requests.post(pin_url, data=payload) - - if response.status_code == 200: - print(f"Message pinned successfully in chat ID {chat_id}!") - else: - print(f"Failed to pin message in chat ID {chat_id}. Error {response.status_code}: {response.text}") + try: + response = requests.post(pin_url, data=payload, timeout=10) + if response.status_code == 200: + logging.info(f"Message pinned successfully in chat ID {chat_id}!") + else: + logging.error(f"Failed to pin message in chat ID {chat_id}. Error {response.status_code}: {response.text}") + except requests.RequestException as e: + logging.error(f"Exception occurred while pinning message in chat ID {chat_id}: {e}") +# ============================ +# Main Execution Function +# ============================ def main(): """Main function to fetch offers and send them to Telegram.""" all_offers = {} for city, zip_code in BIG_CITIES.items(): + logging.info(f"Fetching offers for {city} (ZIP: {zip_code})...") offers = fetch_offers(zip_code) all_offers[city] = offers message_parts = format_offers_for_all_cities(all_offers) send_telegram_message(message_parts) +# ====================== +# Script Entry Point +# ====================== if __name__ == '__main__': main()