From c58b237f628846af77979f0d1ca58eebb6e8549f Mon Sep 17 00:00:00 2001 From: senshi-x <6715079+senshi-x@users.noreply.github.com> Date: Mon, 18 Mar 2024 17:12:50 +0100 Subject: [PATCH] Rewrite - Proper separation of backend and frontend. Fresh rich frontend (#31) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Typisierung von Variablen hinzugefügt, typing * Bessere Trennung von Front- (main.py) und Backend (Connection.py) * Deutlichere Empfehlungen für bessere Sicherheit im settings example * Verbessertes Handling von doppelten Dateinamen. Zuerst wird Zeitstempel angehängt, anschließend hochzählende Zahl. * GUI auf rich menu umgestellt, dadurch etwas hübscher, lesbarer und mehr nützliche Infos. * Es wird nun mindestens Python-Version 3.8 benötigt! --------- Co-authored-by: Senshi <6715079+WGPSenshi@users.noreply.github.com> --- .editorconfig | 12 + ComdirectConnection.py | 326 +++++++++++------------ README.md | 26 +- main.py | 585 +++++++++++++++++++++-------------------- requirements.txt | 9 +- settings.ini.example | 40 +-- settings.py | 110 +++----- setup.py | 17 -- 8 files changed, 534 insertions(+), 591 deletions(-) create mode 100644 .editorconfig delete mode 100644 setup.py diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..a1027e5 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,12 @@ +# EditorConfig is awesome: https://EditorConfig.org + +# top-most EditorConfig file +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = false \ No newline at end of file diff --git a/ComdirectConnection.py b/ComdirectConnection.py index 2d22c13..a7ef6a9 100644 --- a/ComdirectConnection.py +++ b/ComdirectConnection.py @@ -1,58 +1,122 @@ +from typing import Any import requests -import random -import string import json +import secrets from datetime import datetime baseUrl = "https://api.comdirect.de/" +class XOnceAuthenticationInfo: + id: str + typ: str + challenge: str + availableTypes: list[str] + + def __init__(self, data: dict[str, str]): + self.id = data["id"] + self.typ = data["typ"] + if hasattr(data, "challenge"): + self.challenge = data["challenge"] + self.availableTypes = [] + for x in data["availableTypes"]: + self.availableTypes.append(x) + + +class DocumentMeta: + archived: bool + dateRead: datetime | None + alreadyRead: bool + predocumentExists: bool + + def __init__(self, data: dict[str, Any]): + # print(json.dumps(data, indent=4)) + self.archived = data["archived"] + if "dateRead" in data: + self.dateRead = datetime.strptime(data["dateRead"], "%Y-%m-%d") + self.alreadyRead = data["alreadyRead"] + self.predocumentExists = data["predocumentExists"] + + +class Document: + documentId: str + name: str + dateCreation: datetime + mimeType: str + deleteable: bool + advertisement: bool + documentMetadata: DocumentMeta + + def __init__(self, data: dict[str, Any]): + self.documentId = data["documentId"] + self.name = data["name"] + self.dateCreation = datetime.strptime(data["dateCreation"], "%Y-%m-%d") + self.mimeType = data["mimeType"] + self.deletable = data["deletable"] + self.advertisement = data["advertisement"] + self.documentMetadata = DocumentMeta(data["documentMetaData"]) + + +class DocumentList: + index: int + matches: int + unreadMessages: int + dateOldestEntry: datetime + matchesInThisResponse: int + allowedToSeeAllDocuments: bool + documents: list[Document] + + def __init__(self, data: dict[str, Any]): + self.index = data["paging"]["index"] + self.matches = data["paging"]["matches"] + self.unreadMessages = data["aggregated"]["unreadMessages"] + self.dateOldestEntry = datetime.strptime(data["aggregated"]["dateOldestEntry"], "%Y-%m-%d") + self.matchesInThisResponse = data["aggregated"]["matchesInThisResponse"] + self.allowedToSeeAllDocuments = data["aggregated"]["allowedToSeeAllDocuments"] + self.documents = [] + for x in data["values"]: + self.documents.append(Document(x)) + + class Connection: - def __init__(self, client_id, client_secret, username, password): + attempts: int = 0 + client_id: str + client_secret: str + username: str + password: str + sessionId: str = secrets.token_urlsafe(32) # length must be <= 32 + requestId: str = datetime.now().strftime("%H%M%S%f")[:-3] # length must be == 9 + + def __init__(self, client_id: str, client_secret: str, username: str, password: str): self.client_id = client_id self.client_secret = client_secret self.username = username self.password = password - self.sessionId = "" - self.requestId = "" - for _ in range(12): - self.sessionId += random.choice(string.ascii_lowercase + string.digits) - self.requestId = datetime.now().strftime("%Y%m%d%H%M%S") + # self.sessionId = "" + # for _ in range(12): + # self.sessionId += random.choice(string.ascii_lowercase + string.digits) + # self.requestId = datetime.now().strftime("%Y%m%d%H%M%S") - def login(self): + def initSession(self): self.__getOAuth() self.__getSession() - self.__getTANChallenge() - self.__getCDSecondary() - - def __getHeaders(self, contentType="application/json", requestId=""): - - if not requestId: - self.requestId = datetime.now().strftime("%Y%m%d%H%M%S") + return self.__getTANChallenge() + def __getHeaders(self, contentType: str = "application/json", requestId: str = ""): headers = {"Accept": "application/json", "Content-Type": contentType} - try: - - if self.access_token: - headers["Authorization"] = "Bearer " + self.access_token - except Exception as err: - pass + if hasattr(self, "access_token"): + headers["Authorization"] = "Bearer " + self.access_token - try: - if self.sessionId: - headers["x-http-request-info"] = str( - { - "clientRequestId": { - "sessionId": self.sessionId, - "requestId": self.requestId, - } + if hasattr(self, "sessionId"): + headers["x-http-request-info"] = str( + { + "clientRequestId": { + "sessionId": self.sessionId, + "requestId": self.requestId, } - ) - except Exception as err: - print(err) - print("no self.sessionId set") - pass + } + ) return headers @@ -69,37 +133,31 @@ def __getOAuth(self): headers=self.__getHeaders("application/x-www-form-urlencoded"), ) - try: - if r.status_code == 200: - rjson = r.json() - self.access_token = rjson["access_token"] - self.refresh_token = rjson["refresh_token"] - else: - status = r.status_code - reason = "" - if status == 401: - reason = "This usually means wrong clientID/clientSecret" - elif status == 400: - reason = "This usually means wrong username/pwd" - print(f"HTTP Status: {r.status_code} | {r.json()['error_description']} | {reason}") - except Exception as err: - raise err + if r.status_code == 200: + rjson = r.json() + self.access_token = rjson["access_token"] + self.refresh_token = rjson["refresh_token"] + else: + status = r.status_code + reason = "" + if status == 401: + reason = "This usually means wrong clientID/clientSecret" + elif status == 400: + reason = "This usually means wrong username/pwd" + print(f"HTTP Status: {r.status_code} | {r.json()['error_description']} | {reason}") + r.raise_for_status() + return r def __getSession(self): """ Retrieve the current session, initializes if not existing. """ headers = self.__getHeaders("application/x-www-form-urlencoded") - r = requests.get( - baseUrl + "api/session/clients/user/v1/sessions", headers=headers - ) + r = requests.get(baseUrl + "api/session/clients/user/v1/sessions", headers=headers) if r.status_code == 200: - try: - self.sessionApiId = r.json()[0]["identifier"] - except Exception as err: - raise err - else: - print(f"HTTP Status: {r.status_code} | {r.json()}") + self.sessionApiId = r.json()[0]["identifier"] + r.raise_for_status() + return r def __getTANChallenge(self): """ @@ -107,10 +165,7 @@ def __getTANChallenge(self): WARNING: More than 5 failed/unverified attempts will lead the banking access to be locked and requires unlocking by customer support!!! """ r = requests.post( - baseUrl - + "api/session/clients/user/v1/sessions/" - + self.sessionApiId - + "/validate", + baseUrl + "api/session/clients/user/v1/sessions/" + self.sessionApiId + "/validate", json={ "identifier": self.sessionApiId, "sessionTanActive": True, @@ -118,48 +173,18 @@ def __getTANChallenge(self): }, headers=self.__getHeaders("application/json"), ) + r.raise_for_status() + return r - if r.status_code == 201: - self.__getSessionTAN(r.headers) - else: - print(f"HTTP Status: {r.status_code} | {r.json()}") - - def __getSessionTAN(self, validationHeaders): + def getSessionTAN(self, challenge_id: str, challenge_tan: str): """ Retrieves a valid TAN after the user has solved the challenge. """ - xauthinfoheaders = json.loads(validationHeaders["x-once-authentication-info"]) headers = self.__getHeaders("application/json") - headers["x-once-authentication-info"] = json.dumps( - {"id": xauthinfoheaders["id"]} - ) - if xauthinfoheaders["typ"] == "P_TAN_PUSH": - # If Push-TAN, user needs to approve the TAN in app, that's it. - print( - "You are using PushTAN. Please use your smartphone's Comdirect photoTAN app to validate the access request to your 'personal area'." - ) - print( - "Please only continue once you have done so! Failure to validate this request for 5 consecutive times will result in your access being blocked." - ) - input( - "Press ENTER after you have cleared the PushTAN challenge on your phone." - ) - elif xauthinfoheaders["typ"] == "P_TAN": - # If photoTAN, user needs to solve the challenge and provide the tan manually. - tan = self.__challenge_ptan(xauthinfoheaders["challenge"]) - headers["x-once-authentication"] = tan - elif xauthinfoheaders["typ"] == "M_TAN": - # If mobile TAN, user gets TAN via mobile. - tan = self.__challenge_mtan(xauthinfoheaders["challenge"]) - headers["x-once-authentication"] = tan - else: - print( - "Sorry, the TAN type " - + xauthinfoheaders["typ"] - + " is not yet supported" - ) - exit(1) + headers["x-once-authentication-info"] = json.dumps({"id": challenge_id}) + if challenge_tan != "": + headers["x-once-authentication"] = challenge_tan r = requests.patch( baseUrl + "api/session/clients/user/v1/sessions/" + self.sessionApiId, @@ -170,33 +195,9 @@ def __getSessionTAN(self, validationHeaders): }, headers=headers, ) - if r.status_code != 200: - print(f"HTTP Status: {r.status_code} | {r.json()}") - - def __challenge_ptan(self, challenge): - """ - Challenge to solve for photo TAN - challenge : Base64 encoded image data - """ - from PIL import Image - import base64 - import io + return r - Image.open(io.BytesIO(base64.b64decode(challenge))).show() - print(" Please follow the usual photo TAN challenge process.") - tan = input("Enter the TAN code: ") - return tan - - def __challenge_mtan(self, challenge): - """ - Challenge to get the mobile TAN - """ - - print(" Please follow the usual mobile TAN challenge process.") - tan = input("Enter the TAN code: ") - return tan - - def __getCDSecondary(self): + def getCDSecondary(self): r = requests.post( baseUrl + "oauth/token", headers={ @@ -225,14 +226,8 @@ def __getCDSecondary(self): # The following are provided, but serve no actual use. # self.bpid = rjson["bpid"] # self.kontaktId = rjson["kontaktId"] - else: - raise RuntimeWarning( - r.request.url - + " exited with " - + str(r.status_code) - + ": " - + json.dumps(r.json()) - ) + r.raise_for_status() + return r def refresh(self): r = requests.post( @@ -253,6 +248,7 @@ def refresh(self): self.access_token = rjson["access_token"] self.refresh_token = rjson["refresh_token"] self.scope = rjson["scope"] # Currently always "full access" + r.raise_for_status() def revoke(self): r = requests.delete( @@ -263,45 +259,34 @@ def revoke(self): "Authorization": "Bearer " + self.access_token, }, ) - if r.status_code != 204: - print("Something went wrong trying to revoke your access token.") + r.raise_for_status() + return r - def getMessagesList(self, start=0, count=1000): + def getMessagesList(self, start: int = 0, count: int = 1000): + headers = { + "Accept": "application/json", + "Authorization": "Bearer " + self.access_token, + "x-http-request-info": str( + { + "clientRequestId": { + "sessionId": self.sessionId, + "requestId": self.requestId, + }, + } + ), + } r = requests.get( - baseUrl - + "api/messages/clients/user/v2/documents?paging-first=" - + str(start) - + "&paging-count=" - + str(count), - headers={ - "Accept": "application/json", - "Authorization": "Bearer " + self.access_token, - "x-http-request-info": str( - { - "clientRequestId": { - "sessionId": self.sessionId, - "requestId": self.requestId, - }, - } - ), - }, + baseUrl + "api/messages/clients/user/v2/documents?paging-first=" + str(start) + "&paging-count=" + str(count), + headers=headers, ) - if r.status_code != 200: - raise RuntimeWarning( - r.request.url - + " exited with " - + str(r.status_code) - + ": " - + json.dumps(r.json()) - ) - # print(json.dumps(r.json(), indent=4)) - return r.json() + r.raise_for_status() + return DocumentList(r.json()) - def downloadMessage(self, document): + def downloadDocument(self, document: Document): r = requests.get( - baseUrl + "api/messages/v2/documents/" + document["documentId"], + f"{baseUrl}api/messages/v2/documents/{document.documentId}", headers={ - "Accept": document["mimeType"], + "Accept": document.mimeType, "Content-Type": "application/x-www-form-urlencoded", "Authorization": "Bearer " + self.access_token, "x-http-request-info": str( @@ -314,10 +299,5 @@ def downloadMessage(self, document): ), }, ) - if r.status_code == 200: - return r.content - else: - print(r.status_code) - # print(json.dumps(r.json(), indent=4)) - print(r.json()) - raise RuntimeWarning("Document could not be retrieved!") + r.raise_for_status() + return r.content diff --git a/README.md b/README.md index 0b62e3c..cd3b423 100644 --- a/README.md +++ b/README.md @@ -2,19 +2,19 @@ Lädt alle PDF-Dokumente aus einer beliebigen Zeitspanne herunter. -Benötigt wird Python 3.x +Benötigt wird mindestens **Python 3.8** -Das Tool muss mittels Kommandozeile gestartet und bedient werden. +Das Tool muss mittels Kommandozeile gestartet und bedient werden. Es ist zwingend erforderlich, die `settings.ini` anzulegen. -Es werden sowohl das Photo-PushTAN- als auch Mobile-TAN-Verfahren unterstützt. +Es werden sowohl das Photo-PushTAN- als auch Mobile-TAN-Verfahren unterstützt. Das klassische PhotoTAN-Verfahren ist implementiert, aber noch nicht getestet. # Setup Im Verzeichnis einmalig ausführen: -> pip install -r requirements.txt - +> pip install -Ur requirements.txt + oder -> python -m pip install -r requirements.txt +> python -m pip install -Ur requirements.txt Die `settings.ini` konfigurieren und bereitstellen (siehe Kapitel unten). @@ -38,17 +38,19 @@ Die folgenden Einstellungen erlauben es, das Verhalten des Downloads zu konfigur - **downloadSource** = Auswahl der Datenherkunft. -Siehe settings.ini.example als Beispieldatei. +Siehe **settings.ini.example** als Beispieldatei. ### Outputdir Es können relative Pfade angegeben werden. Diese werden ausgehend vom Skriptverzeichnis aufgelöst. Z.b. Dokumente als Unterverzeichnis. -Wird ein absoluter Pfad angegeben (z.b. *C:\\Benutzer\\Annonymus\\Dokumente\\Bank\\Comdirect\\PDFs* ), so wird dieser auch korrekt verwendet. +Wird ein absoluter Pfad angegeben (z.b. *C:\\\\Benutzer\\\\Annonymus\\\\Dokumente\\\\Bank\\\\Comdirect\\\\PDFs* ), so wird dieser auch korrekt verwendet. -Wichtig: "\" als Pfad-Trenner muss immer doppelt angegeben werden wie in obigem Beispiel! +Wichtig: "\\" als Pfad-Trenner muss immer doppelt angegeben werden wie in obigem Beispiel! ## Verwendet: -- Python 3.x +- Python 3.8+ - Python-Bibliotheken: - - Pathvalidate (für Validierung der Ausgabedateinamen) - - Requests (für REST-Anfragen) + - pathvalidate (für Validierung der Ausgabedateinamen) + - pillow (für PhotoTAN-Verfahren) + - requests (für REST-Anfragen) + - rich (für hübsches Terminal-UI) diff --git a/main.py b/main.py index 1f8ad77..4ba9153 100755 --- a/main.py +++ b/main.py @@ -1,22 +1,52 @@ #!/usr/bin/env python3 -from ComdirectConnection import Connection +import json +from ComdirectConnection import Connection, Document, XOnceAuthenticationInfo from settings import Settings -from pathvalidate import sanitize_filename +from pathvalidate._filename import sanitize_filename +from typing import Any from enum import Enum +from rich.console import Console +from rich.table import Table +from rich.prompt import IntPrompt +from rich.progress import ( + BarColumn, + Progress, + TextColumn, + TimeRemainingColumn, + TaskProgressColumn +) import os -import time -import datetime + +ui_width= 200 +console = Console(width=ui_width) + +class IntPromptDeutsch(IntPrompt): + validate_error_message = "[prompt.invalid]Bitte einen gültigen Wert eingeben" + illegal_choice_message = "[prompt.invalid.choice]Bitte eine der gültigen Optionen auswählen" + + +def print(string: Any, highlight : bool| None= None): + console.print(string, highlight=highlight) class DownloadSource(Enum): - archivedOnly = 'archivedOnly' - notArchivedOnly = 'notArchivedOnly' - all = 'all' + archivedOnly = "archivedOnly" + notArchivedOnly = "notArchivedOnly" + all = "all" class Main: - def __init__(self, dirname): + conn: Connection + onlineDocumentsDict: dict[int, Document] = {} + onlineAdvertismentIndicesList: list[int] = [] + onlineArchivedIndicesList: list[int] = [] + onlineUnreadIndicesList: list[int] = [] + onlineFileNameMatchingIndicesList: list[int] = [] + onlineNotYetDownloadedIndicesList: list[int] = [] + onlineAlreadyDownloadedIndicesList: list[int] = [] + + def __init__(self, dirname: str): self.dirname = dirname try: self.settings = Settings(dirname) @@ -25,157 +55,157 @@ def __init__(self, dirname): input("Press ENTER to close. Create settings.ini from the example before trying again.") exit(0) - self.conn = False - - self.onlineDocumentsDict = {} - self.onlineAdvertismentIndicesList = [] - self.onlineArchivedIndicesList = [] - self.onlineUnreadIndicesList = [] - self.onlineFileNameMatchingIndicesList = [] - self.onlineNotYetDownloadedIndicesList = [] - self.onlineAlreadyDownloadedIndicesList = [] - self.showMenu() - def __printFullWidth(self, printString, align="center", filler="-", width=74): - printString = str(printString) - spaces = width - len(printString) - if spaces % 2 == 1: - printString += " " - spaces = width - len(printString) - - if align == "center": - filler = int(spaces / 2) - print(filler * "-" + printString + filler * "-") - elif align == "left": - filler = int(spaces) - print(printString + filler * "-") - elif align == "right": - filler = int(spaces) - print(filler * "-" + printString) - - def __printLeftRight(self, printLeftString, printRightString, filler=".", width=74): - printLeftString = str(printLeftString) - printRightString = str(printRightString) - spaces = width - len(printLeftString) - len(printRightString) - print(printLeftString + (spaces * filler) + printRightString) - def showMenu(self): def __print_menu(): - self.__printFullWidth("--") - self.__printFullWidth(" Comdirect Documents Downloader ") - self.__printFullWidth(" by WGPSenshi & retiredHero ") - self.__printFullWidth("--") - onlineStatus = " online " - if not self.conn: - onlineStatus = " not online " - self.__printFullWidth("Current Status: " + onlineStatus, "left") - self.__printFullWidth("--") - print("1. Show Current Settings ") - print("2. Reload Settings From File settings.ini ") - print("3. Show Status Local Files (WIP) ") - print("4. Show Status Online Files ") - # print("5. (WIP) ") - print("6. Start Download / Update Local Files ") - print("0. Exit ") - self.__printFullWidth("--") + onlineStatus = "[green]ONLINE[/green]" + if not hasattr(self, "conn"): + onlineStatus = "[red]OFFLINE[/red]" + + console.clear() + header = Table(box=None, width= int(ui_width / 2)) + header.add_column(justify="left", width=5) + header.add_column(justify="center") + header.add_row("", "[b]Comdirect Documents Downloader", "") + header.add_row("", "[dim]by [cyan]Senshi_x[/cyan] and [cyan]retiredHero[/cyan]", "") + header.add_row("", f"{onlineStatus}", "") + table = Table(width= int(ui_width / 2)) + table.add_column("", no_wrap=True, width=3, style="blue b") + table.add_column("Aktion", style="cyan", ratio=999) + table.add_row("(1)", "Einstellungen anzeigen") + table.add_row("(2)", "Einstellungen neu aus Datei laden") + table.add_row("(3)", "Status verfügbarer Dateien anzeigen (online)") + table.add_row("(4)", "Verfügbare Dateien herunterladen (online)") + table.add_row("(0)", "Beenden") + + print(header) + print(table) loop = True + val = 0 + __print_menu() + # user_input = Prompt.ask("Wählen Sie eine Aktion", choices=["1", "2", "3", "4", "0"]) while loop: __print_menu() - user_input = input("Choose an option [1-6] / [0]: ") - val = 0 - - try: - val = int(user_input) - except ValueError: - print( - "No.. input is not a valid integer number! Press Enter to continue!" - ) - continue + val = IntPromptDeutsch.ask("Wählen Sie eine Aktion", choices=["1", "2", "3", "4", "0"]) if val == 1: # Show Current Settings - self.__printFullWidth("--") - self.__printFullWidth(" Current Settings ") - self.settings.showSettings() - self.__printFullWidth("--") + tSettings = Table() + tSettings.add_column("Schlüssel") + tSettings.add_column("Wert") + settings = self.settings.getSettings() + for key in settings: + value = settings[key] + if key in ["clientsecret", "pwd"]: + value = "******" + tSettings.add_row(key, value) + console.print(tSettings) elif val == 2: # Reload Settings from file - self.__printFullWidth("--") - self.__printFullWidth(" Reload Settings From File settings.ini ") + print("[i][cyan]Einstellungen wurden neu aus der settings.ini eingelesen.") self.settings.readSettings() - self.__printFullWidth("--") elif val == 3: - # show status local files - print("-3-") - elif val == 4: # show status online files - print("-4-") self.__startConnection() self.__loadDocuments() self.__showStatusOnlineDocuments() - elif val == 5: - # - - print("-5-") - elif val == 6: + elif val == 4: # start download of files - print("-6-") self.__startConnection() self.__loadDocuments() self.__processOnlineDocuments() elif val == 0: loop = False - else: - print("not a valid input") if not val == 0: - input("Press Enter to Return to Menu!") + console.input("[b][blue]Enter[/blue][/b] drücken, um ins Menü zurückzukehren!") return val def __startConnection(self): """ - ToDo: Check if all settings are set for connection! + ToDo: Check if all settings are set for connection! """ - if self.settings and not self.conn: - try: - self.conn = Connection( - username=self.settings.getValueForKey("user"), - password=self.settings.getValueForKey("pwd"), - client_id=self.settings.getValueForKey("clientId"), - client_secret=self.settings.getValueForKey("clientSecret"), - ) - self.conn.login() - except Exception as err: - print(err) - else: - print("You are already online!") + if not self.settings or hasattr(self, "conn"): + print("Sie sind bereits angemeldet!") + return + self.conn = Connection( + username=self.settings.getValueForKey("user"), + password=self.settings.getValueForKey("pwd"), + client_id=self.settings.getValueForKey("clientId"), + client_secret=self.settings.getValueForKey("clientSecret"), + ) + + attempts = 0 + while attempts < 3: + xauthinfoheaders: XOnceAuthenticationInfo = XOnceAuthenticationInfo(json.loads(self.conn.initSession().headers["x-once-authentication-info"])) + attempts += 1 + tan = "" + if xauthinfoheaders.typ == "P_TAN_PUSH": + tan = "" + print("Sie verwenden PushTAN. Bitte nutzen Sie nun die comdirect photoTAN app auf Ihrem Smartphone, um die Zugriffsanfrage namens 'Login persönlicher Bereich' zu genehmigen.") + print("Bitte fahren Sie erst fort, wenn Sie dies getan haben! Nach dem fünften aufeinanderfolgenden Fehlversuch sperrt Comdirect den Zugang aus Sicherheitsgründen.") + console.input("Drücken Sie ENTER, nachdem Sie die PushTAN Anfrage auf Ihrem Gerät genehmigt haben.") + elif xauthinfoheaders.typ == "P_TAN" and hasattr(xauthinfoheaders, "challenge"): + from PIL import Image + import base64 + import io + Image.open(io.BytesIO(base64.b64decode(xauthinfoheaders.challenge))).show() + print("Bitte führen Sie die PhotoTAN Freigabe wie gewohnt mit ihrem Lesegerät oder App durch.") + tan = input("Geben Sie die TAN ein: ") + elif xauthinfoheaders.typ == "M_TAN" and hasattr(xauthinfoheaders, "challenge"): + print(f"Bitte prüfen Sie Ihr Smartphone mit der Nummer {xauthinfoheaders.challenge} auf die erhaltene M-TAN") + tan = input("Geben Sie die TAN ein: ") + else: + print(f"Tut mir Leid, das TAN-Verfahren {xauthinfoheaders.typ} wird (noch?) nicht unterstützt.") + exit(1) + r = self.conn.getSessionTAN(xauthinfoheaders.id, tan) + rjson = r.json() + if r.status_code == 422 and rjson["code"] == "expired": + print("Der Zeitraum für die TAN-Freigabeanforderung ist abgelaufen. Bitte erneut versuchen.") + elif r.status_code == 400 and rjson["code"] == "TAN_UNGUELTIG": + print(rjson["messages"][0]["message"]) + elif r.status_code != 200: + print(f"HTTP Status: {r.status_code} | {r.json()}") + if attempts > 2: + print("---") + print( + "Es sind drei Freigabeversuche in Folge fehlgeschlagen. Bitte vergewissern Sie sich, dass Sie korrekt arbeiten. " + "Sollten Sie unsicher sein, melden Sie sich einmal regulär auf der Comdirect-Webseite an, um eine Sperrung nach fünf aufeinanderfolgenden Fehlversuchen zu vermeiden." + ) + print("---") + exit(1) + # If successful, we trigger the secondary workflow to finish login + self.conn.getCDSecondary() + break + print("Login erfolgreich!") def __loadDocuments(self): - if not self.conn: + if not hasattr(self, "conn"): raise NameError("conn not set!") if self.onlineDocumentsDict: return + # Getting a single value is needed to grab pagination info messagesMeta = self.conn.getMessagesList(0, 1) x = 0 # Process batches of 1000. Max batchsize is 1000 (API restriction) batchSize = 1000 self.onlineDocumentsDict = {} - while x < messagesMeta["paging"]["matches"]: + while x < messagesMeta.matches: messagesMeta = self.conn.getMessagesList(x, batchSize) - for idx, documentMeta in enumerate(messagesMeta["values"]): - self.onlineDocumentsDict[x + idx] = messagesMeta["values"][idx] + for idx, document in enumerate(messagesMeta.documents): + self.onlineDocumentsDict[x + idx] = document x += batchSize def __showStatusOnlineDocuments(self): - self.onlineAdvertismentIndicesList = [] self.onlineArchivedIndicesList = [] self.onlineFileNameMatchingIndicesList = [] @@ -192,203 +222,178 @@ def __showStatusOnlineDocuments(self): self.__processOnlineDocuments(True) # show result: - self.__printFullWidth("--") - self.__printFullWidth(" Status Online Files ") - self.__printFullWidth("--") - onlineStatus = " online " - if not self.conn: - onlineStatus = " not online " - self.__printFullWidth("Current Status: " + onlineStatus, "left") - self.__printFullWidth("--") - self.__printLeftRight( - "Files online all count: ", str(self.countOnlineAll), ".", 20 - ) - self.__printLeftRight( - "Files online advertisment count: ", - str(len(self.onlineAdvertismentIndicesList)), - ".", - 20, - ) - self.__printLeftRight( - "Files online not yet read count: ", - str(len(self.onlineUnreadIndicesList)), - ".", - 20, - ) - self.__printLeftRight( - "Files online in archive count: ", - str(len(self.onlineArchivedIndicesList)), - ".", - 20, - ) - self.__printLeftRight( - "Files online filename matches count: ", - str(len(self.onlineFileNameMatchingIndicesList)), - ".", - 20, - ) - self.__printLeftRight( - "Files online already downloaded count: ", - str(len(self.onlineAlreadyDownloadedIndicesList)), - ".", - 20, - ) - self.__printLeftRight( - "Files online not yet downloaded count: ", - str(len(self.onlineNotYetDownloadedIndicesList)), - ".", - 20, - ) - self.__printFullWidth("--") - - def __processOnlineDocuments(self, isCountRun=False): - + table = Table(width= int(ui_width / 2)) + table.add_column("", no_wrap=True, ratio = 999) + table.add_column("Anzahl", style="blue b", width = 10, justify="right") + table.add_row("Online-Dokumente gesamt", str(self.countOnlineAll)) + table.add_section() + table.add_row("Davon ungelesen", str(len(self.onlineUnreadIndicesList))) + table.add_row("Davon bereits heruntergeladen", str(len(self.onlineAlreadyDownloadedIndicesList)), style="dim") + table.add_row("Davon noch nicht heruntergeladen", str(len(self.onlineNotYetDownloadedIndicesList)), style="dim") + table.add_row("Davon Werbung", str(len(self.onlineAdvertismentIndicesList)), style="dim") + table.add_row("Davon archiviert", str(len(self.onlineArchivedIndicesList)), style="dim") + if self.settings.getBoolValueForKey("downloadOnlyFilenames"): + table.add_row("Davon in der Liste gewünschter Dateinamen", str(len(self.onlineFileNameMatchingIndicesList)), style="dim") + print(table) + + def __processOnlineDocuments(self, isCountRun: bool = False): if not self.onlineDocumentsDict: return - menuWidth = 200 - - def __printStatus(idx, document, status=""): + def __printStatus(idx: int, document: Document, status: str = ""): # fill idx to 5 chars - idx = str(idx) - idx = idx.zfill(5) - if not isCountRun: - self.__printLeftRight( - idx - + " - " - + document["dateCreation"] - + " - " - + document["name"] - + " - " - + document["mimeType"], - status, - ".", - menuWidth, - ) - - overwrite = False # Only download new files - useSubFolders = self.settings.getBoolValueForKey("useSubFolders") - outputDir = self.settings.getValueForKey("outputDir") - isDownloadOnlyFilename = self.settings.getBoolValueForKey( - "downloadOnlyFilenames" - ) - downloadFilenameList = self.settings.getValueForKey( - "downloadOnlyFilenamesArray" - ) - downloadSource = self.settings.getValueForKey( - "downloadSource" + if isCountRun: + return + printLeftString = f"{str(idx):>5} | [cyan]{document.dateCreation.strftime("%Y-%m-%d")}[/cyan] | {sanitize_filename(document.name)}" + printRightString = status + filler: str = " " + spaces = ui_width - len(printLeftString) - len(printRightString) + if console is Console: + progress.console.print(printLeftString + (spaces * filler) + printRightString, highlight=False) + else: + print(printLeftString + (spaces * filler) + printRightString, highlight=False) + + def __isFileEqual(filepath : str, newdata : bytes): + with open(filepath, 'rb') as f: + data = f.read() + return data == newdata + progress = Progress( + TextColumn("[progress.description]{task.description}"), + BarColumn( bar_width= 150 ), + TaskProgressColumn(), + TimeRemainingColumn(), + console = console, + transient=isCountRun ) + with progress: + overwrite = False # Only download new files + useSubFolders = self.settings.getBoolValueForKey("useSubFolders") + outputDir = self.settings.getValueForKey("outputDir") + downloadFilenameList = self.settings.getValueForKey("downloadOnlyFilenamesArray") + downloadSource = self.settings.getValueForKey("downloadSource") + + countAll = len(self.onlineDocumentsDict) + countProcessed = 0 + countSkipped = 0 + countDownloaded = 0 + + task =progress.add_task("Downloading...",total=countAll) + # for idx in range(len(self.onlineDocumentsDict)): # documentMeta in enumerate(self.onlineDocumentsDict): + for idx in self.onlineDocumentsDict: + progress.advance(task) + document = self.onlineDocumentsDict[idx] + firstFilename = document.name.split(" ", 1)[0] + subFolder = "" + myOutputDir = outputDir + countProcessed += 1 + + # counting + if document.advertisement: + self.onlineAdvertismentIndicesList.append(idx) + if document.documentMetadata.archived: + self.onlineArchivedIndicesList.append(idx) + if firstFilename in downloadFilenameList: + self.onlineFileNameMatchingIndicesList.append(idx) + if not document.documentMetadata.alreadyRead: + self.onlineUnreadIndicesList.append(idx) + + # check for setting "download source" + if downloadSource == DownloadSource.archivedOnly.value and not document.documentMetadata.archived or downloadSource == DownloadSource.notArchivedOnly.value and document.documentMetadata.archived: + __printStatus(idx, document, "SKIPPED - not in selected download source") + countSkipped += 1 + continue - countAll = len(self.onlineDocumentsDict) - countProcessed = 0 - countSkipped = 0 - countDownloaded = 0 - - # for idx in range(len(self.onlineDocumentsDict)): # documentMeta in enumerate(self.onlineDocumentsDict): - for idx in self.onlineDocumentsDict: - documentMeta = self.onlineDocumentsDict[idx] - docName = documentMeta["name"] - firstFilename = docName.split(" ", 1)[0] - docMimeType = documentMeta["mimeType"] - docCreateDate = documentMeta["dateCreation"] - isDocAdvertisement = ( - True if str(documentMeta["advertisement"]).lower() == "true" else False - ) - isDocArchived = ( - True - if str(documentMeta["documentMetaData"]["archived"]).lower() == "true" - else False - ) - isAlreadyRead = ( - True - if str(documentMeta["documentMetaData"]["alreadyRead"]).lower() - == "true" - else False - ) - - subFolder = "" - myOutputDir = outputDir - countProcessed += 1 - - # counting - if isDocAdvertisement: - self.onlineAdvertismentIndicesList.append(idx) - - if isDocArchived: - self.onlineArchivedIndicesList.append(idx) - - if firstFilename in downloadFilenameList: - self.onlineFileNameMatchingIndicesList.append(idx) - - if not isAlreadyRead: - self.onlineUnreadIndicesList.append(idx) - - # check for setting "download source" - if downloadSource == DownloadSource.archivedOnly.value and not isDocArchived or\ - downloadSource == DownloadSource.notArchivedOnly.value and isDocArchived: - __printStatus(idx, documentMeta, "SKIPPED - not in selected download source") - countSkipped += 1 - continue - - # check for setting "only download if filename is in filename list" - if isDownloadOnlyFilename and not firstFilename in downloadFilenameList: - __printStatus( - idx, documentMeta, "SKIPPED - filename not in filename list" - ) - countSkipped += 1 - continue - - if docMimeType == "application/pdf": - subFolder = firstFilename - docName += ".pdf" - elif docMimeType == "text/html": - docName += ".html" - subFolder = "html" - - if useSubFolders: - myOutputDir = os.path.join(outputDir, sanitize_filename(subFolder)) - if not os.path.exists(myOutputDir): - os.makedirs(myOutputDir) - - filepath = os.path.join(myOutputDir, sanitize_filename(docName)) - - # check if already downloaded - if os.path.exists(filepath): - self.onlineAlreadyDownloadedIndicesList.append(idx) - if not overwrite: - __printStatus(idx, documentMeta, "SKIPPED - no overwrite") + # check for setting "only download if filename is in filename list" + if self.settings.getBoolValueForKey("downloadOnlyFilenames") and not firstFilename in downloadFilenameList: + __printStatus(idx, document, "SKIPPED - filename not in filename list") countSkipped += 1 continue - else: - self.onlineNotYetDownloadedIndicesList.append(idx) - - # do the download - if not bool(self.settings.getBoolValueForKey("dryRun")) and not isCountRun: - docContent = self.conn.downloadMessage(documentMeta) - moddate = time.mktime( - datetime.datetime.strptime(docCreateDate, "%Y-%m-%d").timetuple() - ) + filename = document.name + if document.mimeType == "application/pdf": + subFolder = "pdf" + filename += ".pdf" + elif document.mimeType == "text/html": + subFolder = "html" + filename += ".html" + else: + __printStatus(idx, document, f"Unknown mimeType {document.mimeType}") + + if useSubFolders: + myOutputDir : str = os.path.join(outputDir, sanitize_filename(subFolder)) + if not os.path.exists(myOutputDir): + os.makedirs(myOutputDir) + + filepath = os.path.join(myOutputDir, sanitize_filename(filename)) + + # do the download + if bool(self.settings.getBoolValueForKey("dryRun")) or isCountRun: + __printStatus(idx, document, "HERUNTERGELADEN - Testlauf, kein tatsächlicher Download") + countDownloaded += 1 + continue + + docDate = document.dateCreation.timestamp() + docContent = None + + # check if already downloaded + if os.path.exists(filepath): + if (self.settings.getBoolValueForKey("appendIfNameExists")): + if (docDate!= os.path.getmtime(filepath)): # If not the same, we simply append the date + # print(document.name) + # print(f"{docDate} {document.dateCreation.strftime("%Y-%m-%d")}") + # print(filepath) + # print(f"{os.path.getmtime(filepath)} {datetime.fromtimestamp(os.path.getmtime(filepath)).strftime("%Y-%m-%d")}") + path, suffix = filepath.rsplit(".",1) + filepath = f"{path}_{document.dateCreation.strftime("%Y-%m-%d")}.{suffix}" + # print("New filepath" + filepath) + if os.path.exists(filepath): # If there's multiple per same day, we append a counter + docContent = self.conn.downloadDocument(document) # Gotta load early to check if content is same + if __isFileEqual(filepath, docContent): + __printStatus(idx, document, "ÜBERSPRUNGEN - Datei bereits heruntergeladen") + countSkipped += 1 + self.onlineAlreadyDownloadedIndicesList.append(idx) + continue + path, suffix = filepath.rsplit(".",1) + if path[-3] == "-" and path[-1].isdigit(): # We assume this is the day split YYYY-mm-dd, so no duplicate existed yet + path += "_1" + else: + counter = int(path[-1]) + 1 # We increase the counter by 1 + path = path[:-1] + str(counter) + filepath = f"{path}.{suffix}" + # print("New filepath" + filepath) + if os.path.exists(filepath): # Enough is enough... + __printStatus(idx, document, "ÜBERSPRUNGEN - Datei bereits heruntergeladen") + self.onlineNotYetDownloadedIndicesList.append(idx) + continue + elif not overwrite: + __printStatus(idx, document, "ÜBERSPRUNGEN - Datei bereits heruntergeladen") + countSkipped += 1 + self.onlineAlreadyDownloadedIndicesList.append(idx) + continue + elif not overwrite: + __printStatus(idx, document, "ÜBERSPRUNGEN - appendIfNameExists ist FALSE") + countSkipped += 1 + self.onlineAlreadyDownloadedIndicesList.append(idx) + continue + if not docContent: # Ensure data is loaded + docContent = self.conn.downloadDocument(document) with open(filepath, "wb") as f: f.write(docContent) # shutil.copyfileobj(docContent, f) - os.utime(filepath, (moddate, moddate)) - __printStatus(idx, documentMeta, "DOWNLOADED") - countDownloaded += 1 - else: - __printStatus( - idx, documentMeta, "DOWNLOADED - dry run, so not really downloaded" - ) + os.utime(filepath, (docDate, docDate)) + __printStatus(idx, document, "HERUNTERGELADEN") countDownloaded += 1 - # last line, summary status: - if not isCountRun: - menuWidth = 74 - self.__printFullWidth("--", "center", "-", menuWidth) - self.__printFullWidth("Status Files Downloading", "left", "-", menuWidth) - print("All: " + str(countAll) + " files") - print("Processed: " + str(countProcessed) + " files") - print("Downloaded: " + str(countDownloaded) + " files") - print("Skipped: " + str(countSkipped) + " files") + # last line, summary status: + if not isCountRun: + table = Table(width= int(ui_width / 2)) + table.add_column("Zusammenfassung", no_wrap=True, ratio = 999) + table.add_column("Anzahl", style="blue b", width = 10, justify="right") + table.add_row("Dokumente gesamt", str(countAll)) + table.add_section() + table.add_row("Davon verarbeitet", str(countProcessed)) + table.add_row("Davon heruntergeladen", str(countDownloaded)) + table.add_row("Davon übersprungen", str(countSkipped), style="dim") + print(table) dirname = os.path.dirname(__file__) diff --git a/requirements.txt b/requirements.txt index 5038fd9..0c5fc60 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ -pathvalidate==2.4.1 -Pillow==10.2.0 -requests==2.31.0 \ No newline at end of file +# Automatically generated by https://github.com/damnever/pigar. + +pathvalidate~=3.2.0 +pillow~=10.2.0 +requests~=2.31.0 +rich~=13.7.0 diff --git a/settings.ini.example b/settings.ini.example index dc6d4b5..96c6cb4 100644 --- a/settings.ini.example +++ b/settings.ini.example @@ -1,16 +1,17 @@ [DEFAULT] -# NOTE: You may NOT use inline comments! Inline comments will be recognized as part of the value! -#comdirect login data -# PLEASE NOTE: -# The values for user, pw, clientID and clientSecret are PRIVATE and SENSITIVE. NEVER share them with ANYONE! +# NOTE: Keine Inline-Kommentare nutzen! Diese werden als Teil des Wertes interpretiert! +# Comdirect login data + +# BITTE BEACHTEN: # Die Werte für user, pw, clientID und clientSecret sind GEHEIM und SCHÜTZENSWERT. Teile sie NIEMALS mit irgendjemanden! +# Wird für user, pwd, clientId oder clientSecret hier kein Wert hinterlegt, so fordert das Programm bei Bedarf diese eine (sichere) Eingabe dieser. +# Für erhöhte Sicherheit wird empfohlen, zumindest pwd und clientSecret hier NICHT zu hinterlegen und nur im Programm einzugeben. user=Zugangsnummer -pwd=PIN/Passwort +#pwd=PIN/Passwort -#comdirect API access data (https://kunde.comdirect.de/itx/oauth/privatkunden) -#ClientID+Secret, die via API Access abrufbar ist (https://kunde.comdirect.de/itx/oauth/privatkunden) -clientId=**** -clientSecret=**** +# ClientID+Secret, die via API Access abrufbar ist (https://kunde.comdirect.de/itx/oauth/privatkunden) +clientId=**** +#clientSecret=**** #output directory outputDir=Dokumente @@ -18,19 +19,18 @@ outputDir=Dokumente #dryRun [True/False], bei dryRun=True wird nichts heruntergeladen; nur simuliert dryRun=False -#useSubFolders[True/False], bei True werden Dokumente in Unterverzeichnisse sortiert () +# Wenn mehrere Dateien den gleichen Namen haben, wird zuerst versucht, das Datum an den Dateinamen anzuhängen (YYYY-MM-DD). +# Gibt es mehrere Dokumente gleichen Namens am gleichen Tag, so wird ein Zähler angehängt (_x) und ggf. hochgezählt. +# Wenn auch das aus irgendeinem Grund nicht ausreichen sollte, hat man Pech gehabt. Irgendwo ist's genug mit den Ausnahmen. +appendIfNameExists=True + +# Bei True werden Dokumente in Unterverzeichnisse sortiert () useSubFolders=False -#downloadOnlyFilenames [True/False], bei True werden nur Dateien heruntergeladen, die mit einem bestimmten Namen anfangen. Alle anderen werden übersprungen. +# Bei True werden nur Dokumente heruntergeladen, deren erstes Wort (Text bis zum ersten Leerzeichen) in folgender Liste steht. Alle anderen werden übersprungen. downloadOnlyFilenames=True -# gehört zu downloadOnlyFilenames: hier werden die Dateinamen angegeben, welche heruntergeladen werden sollen als Dictionary: +# gehört zu downloadOnlyFilenames: hier werden die Dateinamen angegeben, welche heruntergeladen werden sollen: downloadOnlyFilenamesArray={"Finanzreport", "Jahressteuerbescheinigung", "Wertpapierabrechnung", "Steuermitteilung", "Gutschrift", "Dividendengutschrift", "Ertragsgutschrift"} -#[archivedOnly/notArchivedOnly/all] selection for document source -downloadSource=archivedOnly - -#not yet used WIP -range= -#useLocalArchive= -#localArchiveDir= -#moveToLocalArchiveBeforeDate= +#[archivedOnly/notArchivedOnly/all] Auswahl der Quelle. Hier kann eingestellt werden, ob nur im Postfach als "archiviert" markierte Dokumente heruntergeladen werden sollen. +downloadSource=all diff --git a/settings.py b/settings.py index cdd61c8..3ea823c 100644 --- a/settings.py +++ b/settings.py @@ -1,13 +1,10 @@ -from ComdirectConnection import Connection import os import configparser -from pathvalidate import sanitize_filename -import datetime import getpass class Settings: - def __init__(self, dirname): + def __init__(self, dirname: str): self.dirname = dirname self.settingsFileName = "settings.ini" self.readSettings() @@ -16,7 +13,6 @@ def readSettings(self): # If you want comfort, put your data into the settings.ini # If you understandably don't want to leave all credentials in a clear textfile, # just leave them out and you will be prompted for them. - print("loading settings...") absSettingsDirName = os.path.join(self.dirname, self.settingsFileName) if os.path.isfile(absSettingsDirName): @@ -24,55 +20,38 @@ def readSettings(self): self.__config.read(absSettingsDirName) try: if not self.__isSettingNameFilledInConfig("user"): - self.__config["DEFAULT"]["user"] = self.__getInputForString( - "Please enter your user number / Kundennummer: " - ) + self.__config["DEFAULT"]["user"] = self.__getInputForString("Bitte geben Sie Ihre Kundennummer ein: ") if not self.__isSettingNameFilledInConfig("pwd"): - self.__config["DEFAULT"]["pwd"] = getpass.getpass( - prompt="Please enter your password: ", stream=None - ) + self.__config["DEFAULT"]["pwd"] = getpass.getpass(prompt="Bitte geben Sie das dazugehörige Passwort ein: ", stream=None) if not self.__isSettingNameFilledInConfig("clientId"): - self.__config["DEFAULT"]["clientId"] = self.__getInputForString( - "Please enter your clientId for API access: " - ) + self.__config["DEFAULT"]["clientId"] = self.__getInputForString("Bitte geben Sie die oAuth clientId für den API-Zugang ein: ") if not self.__isSettingNameFilledInConfig("clientSecret"): - self.__config["DEFAULT"]["clientSecret"] = self.__getInputForString( - "Please enter your clientSecret for API access: " - ) + self.__config["DEFAULT"]["clientSecret"] = self.__getInputForString("Bitte geben Sie Ihr oAuth clientSecret für den API Zugang ein: ") if not self.__isSettingNameFilledInConfig("outputDir"): - self.__config["DEFAULT"]["outputDir"] = self.__getInputForString( - "Please enter the path to the folder you want reports to be downloaded to: " - ) - - if not self.__config.has_option(None, "dryRun"): - self.__config["DEFAULT"][ - "dryRun" - ] = self.__checkInputForBoolTrueString( - self.__getInputForString( - "Should the run a test run? (no files get downloaded) [yes/no]: " - ) - ) + self.__config["DEFAULT"]["outputDir"] = self.__getInputForString("Bitte geben Sie das Zielverzeichnis an, in welches die Dokumente heruntergeladen werden sollen: ") + + if not self.__config.has_option("", "dryRun"): + self.__config["DEFAULT"]["dryRun"] = str(self.__isTruthy(self.__getInputForString("Soll dies ein Testlauf sein (keine Dateien werden heruntergeladen)? [ja/nein]: "))) except Exception as error: print("ERROR", error) exit(-1) # check out dir right away.. - self.outputDir = self.__createIfNotExistDir( - self.__config["DEFAULT"]["outputDir"] - ) - - print("settings set.") + self.outputDir = self.__createIfNotExistDir(self.__config["DEFAULT"]["outputDir"]) else: raise NameError("please provide settings.ini to start program.") + def getSettings(self): + return self.__config["DEFAULT"] + def showSettings(self): for key in self.__config["DEFAULT"]: output = key + ": " - if key == "pwd": + if key in ["pwd", "clientsecret"]: pwOut = "" for _ in range(len(self.__config["DEFAULT"][key])): pwOut += "*" @@ -81,69 +60,48 @@ def showSettings(self): output += self.__config["DEFAULT"][key] print(output) - def getValueForKey(self, settingName, section="DEFAULT"): + def getValueForKey(self, settingName: str, section: str = "DEFAULT"): if self.__isSettingNameFilledInConfig(settingName, section): return self.__config[section][settingName] else: raise NameError("SettingName not set") - def getBoolValueForKey(self, settingName, section="DEFAULT"): + def getBoolValueForKey(self, settingName: str, section: str = "DEFAULT"): if self.__isSettingNameFilledInConfig(settingName, section): - return self.__checkInputForBoolTrue(self.__config[section][settingName]) + return self.__isTruthy(self.__config[section][settingName]) else: raise NameError("SettingName not set") - def __isSettingNameFilledInConfig(self, settingName, section="DEFAULT"): - isAvailAndFilled = True - + def __isSettingNameFilledInConfig(self, settingName: str, section: str = "DEFAULT"): if settingName not in self.__config[section]: - isAvailAndFilled = False - - if not self.__config.has_option(None, settingName): - isAvailAndFilled = False - - if not self.__config[section][settingName]: - isAvailAndFilled = False - - return isAvailAndFilled - - def __getInputForString(self, printString): + return False + elif not self.__config.has_option("", settingName): + return False + elif not self.__config[section][settingName]: + return False + return True + + def __getInputForString(self, printString: str): # print("----------------------------------------------------------------") inp = input(printString) # print("----------------------------------------------------------------") return inp - def __printMessage(self, message): + def __printMessage(self, message: str): print(message) - def __checkInputForBoolTrue(self, inputString): - retValue = False - if inputString.lower() in ["true", "yes", "y", "1"]: - retValue = True - return retValue - - def __checkInputForBoolTrueString(self, inputString): - retValue = "False" - if inputString.lower() in ["true", "yes", "y", "1"]: - retValue = "True" - return retValue - - def __createIfNotExistDir(self, dir): - self.__printMessage("Checking if given outputDir exists...") + def __isTruthy(self, inputString: str): + return inputString.lower() in ["ja", "j", "true", "yes", "y", "1"] + def __createIfNotExistDir(self, dir: str): if not os.path.isabs(dir): dir = os.path.join(self.dirname, dir) if not os.path.exists(dir): - shouldCreateDir = self.__getInputForString( - "Path not found. Should I create it? (yes/no):" - ) - if shouldCreateDir.lower() in ["true", "yes", "y", "1"]: + shouldCreateDir = self.__getInputForString("Zielverzeichnis nicht gefunden. Soll es erstell werden? (ja/nein): ") + if self.__isTruthy(shouldCreateDir): os.makedirs(dir) - self.__printMessage("Path created: " + dir) else: - self.__printMessage("Path not created, script exited 0") - exit - else: - self.__printMessage("Moving on. Path exists: " + dir) + self.__printMessage("Zielverzeichnis wurde nicht erstellt. Bis zum nächsten Mal!") + exit(0) return dir diff --git a/setup.py b/setup.py deleted file mode 100644 index 4b46d55..0000000 --- a/setup.py +++ /dev/null @@ -1,17 +0,0 @@ -from cx_Freeze import setup, Executable - -# Dependencies are automatically detected, but it might need -# fine tuning. -buildOptions = dict(packages = [], excludes = []) - -base = 'Console' - -executables = [ - Executable('main.py', base=base, targetName = 'cdapiHelper') -] - -setup(name='Comdirect API Helper', - version = '1.0', - description = '', - options = dict(build_exe = buildOptions), - executables = executables)