-
Notifications
You must be signed in to change notification settings - Fork 1
/
process.py
208 lines (173 loc) · 6.62 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import glob
import os
import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
import json
from shapely.geometry import Point
import zipfile
# Locations of the input archive, the unpack directory and the two outputs.
zip_file_path = "data.zip"
extracted_folder_path = "extracted/"
output_csv_path = "output/pcn_system_supplier_counts_with_icb.csv"
output_map_path = "output/pcn_map.png"

# Unpack the top-level archive; archives nested inside it are unpacked in a
# separate pass further down.
# A missing archive is not fatal: the loaders below fall back to dummy data
# when an expected input file is absent, so only skip the extraction here.
# A file that exists but is corrupt still raises, since that is a real error.
if os.path.isfile(zip_file_path):
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(extracted_folder_path)
else:
    print(f"Archive {zip_file_path} not found. Skipping extraction.")
# Helper for unpacking an archive that was found inside the main archive
def extract_nested_zip(file_path, extract_to):
    """Extract every member of a zip archive into a directory.

    Args:
        file_path: Path of the zip archive to read.
        extract_to: Directory that receives the extracted members.
    """
    with zipfile.ZipFile(file_path, mode="r") as nested_archive:
        nested_archive.extractall(path=extract_to)
# Second pass over the unpack directory: any ".zip" file found there is
# extracted into the directory it sits in.
for current_dir, _subdirs, filenames in os.walk(extracted_folder_path):
    for nested_name in (name for name in filenames if name.endswith(".zip")):
        extract_nested_zip(os.path.join(current_dir, nested_name), current_dir)

# Expected locations of the three inputs inside the unpack directory.
excel_file_path = os.path.join(extracted_folder_path, "ePCN.xlsx")
json_file_path = os.path.join(extracted_folder_path, "pcn_map.json")

# The CSV name carries a variable suffix, so it is located by pattern;
# the first match is used, or None when nothing matches.
csv_matches = glob.glob(extracted_folder_path + "gp-reg-pat-prac-map*.csv")
if csv_matches:
    csv_file_path = csv_matches[0]
else:
    csv_file_path = None
# Define dummy data functions for fallback
def generate_dummy_excel():
    """Build a stand-in for the 'PCN Core Partner Details' sheet of 'ePCN.xlsx'.

    The frame carries the raw, multi-line column headers that the script
    filters on and later renames, so the fallback flows through the same
    processing steps as the real sheet.

    Returns:
        pandas.DataFrame: five practices (``P0001``..``P0005``), each in its
        own PCN (``N001``..``N005``) with a sub-ICB code (``B01``..``B05``)
        and no relationship end date.
    """
    indices = range(1, 6)
    return pd.DataFrame(
        {
            "Partner\nOrganisation\nCode": [f"P{i:04d}" for i in indices],
            "PCN Code": [f"N{i:03d}" for i in indices],
            "Practice\nParent\nSub ICB Loc Code": [f"B{i:02d}" for i in indices],
            # Every relationship is still active, so the end-date filter
            # keeps all rows.
            "Practice to PCN\nRelationship\nEnd Date": [pd.NaT] * len(indices),
        }
    )
def generate_dummy_csv():
    """Build a small stand-in for the practice / system-supplier CSV.

    Returns:
        pandas.DataFrame: columns ``practice_code`` (``P0001``..``P0005``)
        and ``system_supplier`` (``TPP`` for odd-numbered practices,
        ``EMIS`` for even-numbered ones).
    """
    rows = [
        (f"P{number:04d}", "TPP" if number % 2 else "EMIS")
        for number in range(1, 6)
    ]
    return pd.DataFrame(rows, columns=["practice_code", "system_supplier"])
def generate_dummy_json():
    """Build a GeoJSON-style FeatureCollection standing in for 'pcn_map.json'.

    Returns:
        dict: a ``FeatureCollection`` with five ``Point`` features. Each
        feature has ``code`` (``N001``..``N005``) and ``name``
        (``PCN001``..``PCN005``) properties, and coordinates stepped by 0.1
        per feature from a fixed base pair.
    """

    def point_feature(i):
        # One feature per dummy PCN, offset from the base coordinates.
        return {
            "type": "Feature",
            "properties": {"code": f"N{i:03d}", "name": f"PCN{i:03d}"},
            "geometry": {
                "type": "Point",
                "coordinates": [-0.1278 + i * 0.1, 51.5074 + i * 0.1],
            },
        }

    return {
        "type": "FeatureCollection",
        "features": [point_feature(i) for i in range(1, 6)],
    }
# Load each input if it was extracted; otherwise fall back to generated dummy
# data and say so, so a run on dummy data is never mistaken for a real one.
if os.path.isfile(excel_file_path):
    pcn_core_partner_details = pd.read_excel(
        excel_file_path, sheet_name="PCN Core Partner Details"
    )
else:
    print(f"Excel file not found at {excel_file_path}. Using dummy data.")
    pcn_core_partner_details = generate_dummy_excel()

if csv_file_path and os.path.isfile(csv_file_path):
    csv_data = pd.read_csv(csv_file_path)
    # Normalise the CSV headers to the names used in the rest of the script.
    csv_data = csv_data.rename(
        columns={"PRACTICE_CODE": "practice_code", "SUPPLIER_NAME": "system_supplier"}
    )
else:
    print(f"CSV file not found in {extracted_folder_path}. Using dummy data.")
    csv_data = generate_dummy_csv()

if os.path.isfile(json_file_path):
    # Read as UTF-8 explicitly rather than relying on the platform default.
    with open(json_file_path, "r", encoding="utf-8") as json_file:
        pcn_geo_data = json.load(json_file)
else:
    print(f"JSON file not found at {json_file_path}. Using dummy data.")
    pcn_geo_data = generate_dummy_json()

# Some PCN-practice relationships have ended. This is indicated by the
# 'Practice to PCN\nRelationship\nEnd Date' column being non-null. We want to
# ignore these practices when counting the number of TPP and EMIS practices.
# NOTE: this step and the rename below need the frame to carry the raw,
# multi-line Excel headers.
pcn_core_partner_details = pcn_core_partner_details[
    pcn_core_partner_details["Practice to PCN\nRelationship\nEnd Date"].isna()
]

# After the filter each practice must appear once only. This is raised
# explicitly rather than asserted so the check survives `python -O`.
duplicated_codes = pcn_core_partner_details.duplicated(
    subset=["Partner\nOrganisation\nCode"]
)
if duplicated_codes.any():
    offending = sorted(
        pcn_core_partner_details.loc[
            duplicated_codes, "Partner\nOrganisation\nCode"
        ]
        .astype(str)
        .unique()
    )
    raise ValueError(
        f"Duplicate entries in 'Partner Organisation Code': {offending}"
    )

# Process the Excel data: switch to short column names.
pcn_core_partner_details_renamed = pcn_core_partner_details.rename(
    columns={
        "Partner\nOrganisation\nCode": "practice_code",
        "PCN Code": "pcn_code",
        "Practice\nParent\nSub ICB Loc Code": "sub_ICB",
    }
)
# Process the CSV data
def last_value_ordered(group):
    """Return the final row of ``group`` in its current row order.

    No sorting is applied, so "last" means the last position as the rows
    arrive. The script passes this to ``groupby(...).apply``.

    Args:
        group: A pandas object supporting positional ``.iloc`` access.

    Returns:
        The element at the last position of ``group``.
    """
    return group.iloc[-1]
# Keep one row per practice: the last one in file order. Rows without a
# practice code are dropped, matching groupby's handling of missing keys.
# drop_duplicates is used instead of groupby(...).apply(...) because selecting
# the grouping column from the result of apply relies on behaviour that
# pandas 2.2 deprecated.
csv_data_relevant = (
    csv_data.dropna(subset=["practice_code"])
    .drop_duplicates(subset=["practice_code"], keep="last")[
        ["practice_code", "system_supplier"]
    ]
    .reset_index(drop=True)
)

# Merge and count system suppliers per PCN
merged_data = pd.merge(
    pcn_core_partner_details_renamed[["practice_code", "pcn_code"]],
    csv_data_relevant,
    on="practice_code",
    how="left",
)
pcn_system_supplier_counts = (
    merged_data.groupby(["pcn_code", "system_supplier"]).size().unstack(fill_value=0)
)
# The totals and the map both read the EMIS and TPP columns, so make sure
# they exist even when a supplier has no practices in the data.
for supplier in ("EMIS", "TPP"):
    if supplier not in pcn_system_supplier_counts.columns:
        pcn_system_supplier_counts[supplier] = 0
pcn_system_supplier_counts = pcn_system_supplier_counts.reset_index()

# Add ICB values to the CSV (one sub-ICB code per PCN; first occurrence wins)
unique_icb_data = pcn_core_partner_details_renamed[
    ["pcn_code", "sub_ICB"]
].drop_duplicates(subset=["pcn_code"])
updated_pcn_counts_with_icb = pd.merge(
    pcn_system_supplier_counts, unique_icb_data, on="pcn_code", how="left"
)

# Make sure the output directories exist before anything is written.
for out_path in (output_csv_path, output_map_path):
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

# Save to CSV
updated_pcn_counts_with_icb.to_csv(output_csv_path, index=False)
tpp_total = updated_pcn_counts_with_icb["TPP"].sum()
emis_total = updated_pcn_counts_with_icb["EMIS"].sum()

# Create GeoDataFrame for the map. The input coordinates are declared as
# EPSG:4326 and then reprojected to EPSG:27700 for plotting.
gdf = gpd.GeoDataFrame.from_features(pcn_geo_data["features"], crs="EPSG:4326")
gdf = gdf.to_crs("EPSG:27700")

# Merge with color data for the map
pcn_color_data = updated_pcn_counts_with_icb[["pcn_code", "EMIS", "TPP"]].copy()
# NOTE: a PCN with neither EMIS nor TPP practices gets a NaN proportion here,
# which the colormap renders with its "bad value" colour.
pcn_color_data["emis_proportion"] = pcn_color_data["EMIS"] / (
    pcn_color_data["EMIS"] + pcn_color_data["TPP"]
)
# plt.get_cmap is used because matplotlib.cm.get_cmap was removed in
# Matplotlib 3.9.
coolwarm_cmap = plt.get_cmap("coolwarm")
pcn_color_data["gradient_color"] = pcn_color_data["emis_proportion"].apply(
    coolwarm_cmap
)
gdf_colored = gdf.merge(pcn_color_data, left_on="code", right_on="pcn_code")

# Plot and save map
fig, ax = plt.subplots(1, 1, figsize=(15, 15))
gdf_colored.plot(ax=ax, color=gdf_colored["gradient_color"])
ax.axis("off")
sm = plt.cm.ScalarMappable(cmap=coolwarm_cmap, norm=plt.Normalize(vmin=0, vmax=1))
# Give the mappable an (empty) array through the public API so it can back
# a colorbar without any plotted data of its own.
sm.set_array([])
cbar = fig.colorbar(sm, ax=ax)
# coolwarm runs from blue at 0 to red at 1, and the plotted value is the EMIS
# share, so blue means all TPP and red means all EMIS.
cbar.set_label("Proportion of EMIS practices (0 = all TPP, blue; 1 = all EMIS, red)")
ax.text(
    0.01, 0.95, f"Total TPP Practices: {tpp_total}", transform=ax.transAxes, fontsize=16
)
ax.text(
    0.01,
    0.90,
    f"Total EMIS Practices: {emis_total}",
    transform=ax.transAxes,
    fontsize=16,
)
fig.savefig(output_map_path)

# Report the paths of the generated files
print(f"Wrote {output_csv_path} and {output_map_path}")