Skip to content
This repository has been archived by the owner on Sep 7, 2023. It is now read-only.

Commit

Permalink
Fix some linter errors and format with Black
Browse files Browse the repository at this point in the history
  • Loading branch information
AsgerPetersen committed Aug 23, 2019
1 parent ac8cb71 commit e02fabd
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 233 deletions.
21 changes: 11 additions & 10 deletions Kortforsyningen/config.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from qgis.PyQt import QtCore
from .kf_config import KfConfig
from .local_config import LocalConfig
from qgis.PyQt import (
QtCore
)


class Config(QtCore.QObject):

kf_con_error = QtCore.pyqtSignal()
kf_settings_warning = QtCore.pyqtSignal()
loaded = QtCore.pyqtSignal()

def __init__(self, settings):
super(Config, self).__init__()
self.settings = settings
Expand All @@ -26,17 +25,19 @@ def __init__(self, settings):

def propagate_kf_settings_warning(self):
self.kf_settings_warning.emit()

def propagate_kf_con_error(self):
self.kf_con_error.emit()

def begin_load(self):
self.kf_config.begin_load()

def _handle_kf_config_loaded(self):
self.categories = []
self.categories_list = []
if self.settings.value('use_custom_file') and self.settings.value('only_background'):
if self.settings.value("use_custom_file") and self.settings.value(
"only_background"
):
self.kf_categories = []
background_category = self.kf_config.get_background_category()
if background_category:
Expand All @@ -47,18 +48,18 @@ def _handle_kf_config_loaded(self):
self.categories = self.kf_categories + self.local_categories
self.categories_list.append(self.kf_categories)
self.categories_list.append(self.local_categories)

# Tell the world
self.loaded.emit()

def get_category_lists(self):
return self.categories_list

def get_categories(self):
return self.categories

def get_kf_maplayer_node(self, id):
return self.kf_config.get_maplayer_node(id)

def get_local_maplayer_node(self, id):
return self.local_config.get_maplayer_node(id)
135 changes: 84 additions & 51 deletions Kortforsyningen/kf_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,40 @@
import codecs
import os.path
import datetime
import traceback
import json
import hashlib
import glob
from qgis.gui import QgsMessageBar
from qgis.core import *
from qgis.PyQt.QtCore import QCoreApplication, QFileInfo, QFile, QUrl, QSettings, QTranslator, qVersion, QIODevice
from qgis.PyQt.QtCore import (
QCoreApplication,
QFileInfo,
QFile,
QUrl,
QSettings,
QTranslator,
qVersion,
QIODevice,
)
from qgis.PyQt.QtNetwork import QNetworkAccessManager, QNetworkRequest
from qgis.PyQt.QtWidgets import QAction, QMenu, QPushButton
from qgis.PyQt.QtGui import QIcon

from qgis.PyQt import QtCore, QtXml
import traceback
import json

import hashlib
import glob

from .qlr_file import QlrFile

FILE_MAX_AGE = datetime.timedelta(hours=12)
KF_SERVICES_URL = 'http://services.kortforsyningen.dk/service?request=GetServices&token={{kf_token}}'
KF_SERVICES_URL = (
"http://services.kortforsyningen.dk/service?request=GetServices&token={{kf_token}}"
)


def log_message(message):
QgsMessageLog.logMessage(message, 'Kortforsyningen plugin')
QgsMessageLog.logMessage(message, "Kortforsyningen plugin")


class KfConfig(QtCore.QObject):

kf_con_error = QtCore.pyqtSignal()
kf_settings_warning = QtCore.pyqtSignal()
loaded = QtCore.pyqtSignal()
Expand All @@ -46,7 +56,11 @@ def __init__(self, settings):
self._qlr_network_manager.finished.connect(self._handle_qlr_response)

def begin_load(self):
self.cached_kf_qlr_filename = self.settings.value('cache_path') + hashlib.md5(self.settings.value('token').encode()).hexdigest() +'_kortforsyning_data.qlr'
self.cached_kf_qlr_filename = (
self.settings.value("cache_path")
+ hashlib.md5(self.settings.value("token").encode()).hexdigest()
+ "_kortforsyning_data.qlr"
)
self.allowed_kf_services = {}
if self.settings.is_set():
try:
Expand All @@ -71,32 +85,37 @@ def _handle_services_response(self, network_reply):
self.background_category = None
self.categories = []
self.kf_con_error.emit()
log_message(f'Network error getting services from kf. Error code : ' + str(network_reply.error()))
log_message(
f"Network error getting services from kf. Error code : "
+ str(network_reply.error())
)
return
response = str(network_reply.readAll(), 'utf-8')
response = str(network_reply.readAll(), "utf-8")
doc = QtXml.QDomDocument()
doc.setContent(response)
service_types = doc.documentElement().childNodes()
i = 0
allowed = {}
allowed['any_type'] = {'services': []}
allowed["any_type"] = {"services": []}
while i < service_types.count():
service_type = service_types.at(i)
service_type_name = service_type.nodeName()
allowed[service_type_name] = {'services': []}
allowed[service_type_name] = {"services": []}
services = service_type.childNodes()
j = 0
while j < services.count():
service = services.at(j)
service_name = service.nodeName()
allowed[service_type_name]['services'].append(service_name)
allowed['any_type']['services'].append(service_name)
allowed[service_type_name]["services"].append(service_name)
allowed["any_type"]["services"].append(service_name)
j = j + 1
i = i + 1
self.allowed_kf_services = allowed
if not allowed['any_type']['services']:
if not allowed["any_type"]["services"]:
self.kf_con_error.emit()
log_message(f"Kortforsyningen returned an empty list of allowed services for token: {self.settings.value('token')}")
log_message(
f"Kortforsyningen returned an empty list of allowed services for token: {self.settings.value('token')}"
)
# Go on and get QLR
self._get_qlr_file()

Expand All @@ -115,15 +134,19 @@ def _get_qlr_file(self):
self._request_kf_qlr_file()

def _request_kf_qlr_file(self):
url_to_get = self.settings.value('kf_qlr_url')
url_to_get = self.settings.value("kf_qlr_url")
self._qlr_network_manager.get(QNetworkRequest(QUrl(url_to_get)))


def _handle_qlr_response(self, network_reply):
if network_reply.error():
log_message(u'No contact to the configuration at ' + self.settings.value('kf_qlr_url') + '. Error code : ' + str(network_reply.error()))
log_message(
"No contact to the configuration at "
+ self.settings.value("kf_qlr_url")
+ ". Error code : "
+ str(network_reply.error())
)
else:
response = str(network_reply.readAll(), 'utf-8')
response = str(network_reply.readAll(), "utf-8")
response = self.insert_token(response)
self.write_cached_kf_qlr(response)
# Now load and use it
Expand All @@ -135,74 +158,84 @@ def _load_config_from_cached_kf_qlr(self):
self.loaded.emit()

def get_categories(self):
return self.categories
return self.categories

def get_background_category(self):
return self.background_category
return self.background_category

def get_maplayer_node(self, id):
return self.kf_qlr_file.get_maplayer_node(id)
return self.kf_qlr_file.get_maplayer_node(id)

def get_kf_categories(self):
kf_categories = []
kf_background_category = None
groups_with_layers = self.kf_qlr_file.get_groups_with_layers()
for group in groups_with_layers:
kf_category = {
'name': group['name'],
'selectables': []
}
for layer in group['layers']:
if self.user_has_access(layer['service']):
kf_category['selectables'].append({
'type': 'layer',
'source': 'kf',
'name': layer['name'],
'id': layer['id']
kf_category = {"name": group["name"], "selectables": []}
for layer in group["layers"]:
if self.user_has_access(layer["service"]):
kf_category["selectables"].append(
{
"type": "layer",
"source": "kf",
"name": layer["name"],
"id": layer["id"],
}
)
if len(kf_category['selectables']) > 0:
if len(kf_category["selectables"]) > 0:
kf_categories.append(kf_category)
if group['name'] == 'Baggrundskort':
if group["name"] == "Baggrundskort":
kf_background_category = kf_category
return kf_background_category, kf_categories

def user_has_access(self, service_name):
return service_name in self.allowed_kf_services['any_type']['services']
return service_name in self.allowed_kf_services["any_type"]["services"]

def get_custom_categories(self):
return []

def _read_cached_kf_qlr(self):
#return file(unicode(self.cached_kf_qlr_filename)).read()
# return file(unicode(self.cached_kf_qlr_filename)).read()
f = QFile(self.cached_kf_qlr_filename)
f.open(QIODevice.ReadOnly)
return f.readAll()

def write_cached_kf_qlr(self, contents):
"""We only call this function IF we have a new version downloaded"""
# Remove old versions file
for filename in glob.glob(self.settings.value('cache_path') +'*_kortforsyning_data.qlr'):
for filename in glob.glob(
self.settings.value("cache_path") + "*_kortforsyning_data.qlr"
):
os.remove(filename)

# Write new version
with codecs.open(self.cached_kf_qlr_filename, 'w', 'utf-8') as f:
with codecs.open(self.cached_kf_qlr_filename, "w", "utf-8") as f:
f.write(contents)

def debug_write_allowed_services(self):
try:
debug_filename = self.settings.value('cache_path') + self.settings.value('username') + '.txt'
debug_filename = (
self.settings.value("cache_path")
+ self.settings.value("username")
+ ".txt"
)
if os.path.exists(debug_filename):
os.remove(debug_filename)
with codecs.open(debug_filename, 'w', 'utf-8') as f:
f.write(json.dumps(self.allowed_kf_services['any_type']['services'], indent=2).replace('[', '').replace(']', ''))
except Exception as e:
with codecs.open(debug_filename, "w", "utf-8") as f:
f.write(
json.dumps(
self.allowed_kf_services["any_type"]["services"], indent=2
)
.replace("[", "")
.replace("]", "")
)
except Exception:
pass

def insert_token(self, text):
result = text
replace_vars = {}
replace_vars["kf_token"] = self.settings.value('token')
replace_vars["kf_token"] = self.settings.value("token")
for i, j in replace_vars.items():
result = result.replace("{{" + str(i) + "}}", str(j))
return result
Loading

0 comments on commit e02fabd

Please sign in to comment.