forked from LukeEcomod/SpaFHy_v1
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtopmodel.py
305 lines (273 loc) · 10.6 KB
/
topmodel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# -*- coding: utf-8 -*-
"""
Created on Fri Jun 02 15:14:11 2017
@author: slauniai
******************************************************************************
TopModel (Beven & Kirkby) -implementation for SpatHy -integration
Topmodel() allows spatially varying soil depths and transmissivity
Topmodel_Homogenous() assumes constant properties and hydrologic similarity \n
retermined from TWI = log (a / tan(b))
(C) Samuli Launiainen, 2016-
Last edit: 7.2.2018 / Samuli Launiainen
******************************************************************************
"""
import numpy as np
# import matplotlib.pyplot as plt
eps = np.finfo(float).eps # machine epsilon
class Topmodel_Homogenous():
def __init__(self, pp, cellarea, cmask, flowacc, slope, S_initial=None,
outputs=False):
"""
sets up Topmodel for the catchment assuming homogenous
effective soil depth 'm' and sat. hydr. conductivity 'ko'.
This is the 'classic' version of Topmodel where hydrologic similarity\
index is TWI = log(a / tan(b)).
Args:
pp - parameter dict with keys:
dt - timestep [s]
ko - soil transmissivity at saturation [m/s]
m - effective soil depth (m), i.e. decay factor of Ksat with depth
twi_cutoff - max allowed twi -index
so - initial catchment average saturation deficit (m)
cmask - catchment mask, 1 = catchment_cell
cellarea - gridcell area [m2]
flowacc - flow accumulation per unit contour length (m)
slope - local slope (deg)
S_initial - initial storage deficit, overrides that in 'pp'
outputs - True stores outputs after each timestep into dictionary
"""
if not S_initial:
S_initial = pp['so']
self.dt = float(pp['dt'])
self.cmask = cmask
self.CellArea = cellarea
dx = cellarea**0.5
self.CatchmentArea = np.size(cmask[cmask == 1])*self.CellArea
# topography
self.a = flowacc*cmask # flow accumulation grid
self.slope = slope*cmask # slope (deg) grid
# effective soil depth [m]
self.M = pp['m']
# lat. hydr. conductivity at surface [m2/timestep]
# self.To = pp['ko']*pp['m']*self.dt
self.To = pp['ko']*self.dt
"""
local and catchment average hydrologic similarity indices (xi, X).
Set xi > twi_cutoff equal to cutoff value to remove tail of twi-distribution.
This concerns mainly the stream network cells. 'Outliers' in twi-distribution are
problem for streamflow prediction
"""
slope_rad = np.radians(self.slope) # deg to rad
xi = np.log(self.a / dx / (np.tan(slope_rad) + eps))
# apply cutoff
clim = np.percentile(xi[xi > 0], pp['twi_cutoff'])
xi[xi > clim] = clim
self.xi = xi
self.X = 1.0 / self.CatchmentArea*np.nansum(self.xi*self.CellArea)
# baseflow rate when catchment Smean=0.0
self.Qo = self.To*np.exp(-self.X)
# catchment average saturation deficit S [m] is the only state variable
s = self.local_s(S_initial)
s[s < 0] = 0.0
self.S = np.nanmean(s)
# create dictionary of empty lists for saving results
if outputs:
self.results = {'S': [], 'Qb': [], 'Qr': [], 'Qt': [], 'qr': [],
'fsat': [], 'Mbe': [], 'R': []
}
def local_s(self, Smean):
"""
computes local storage deficit s [m] from catchment average
"""
s = Smean + self.M*(self.X - self.xi)
return s
def subsurfaceflow(self):
"""subsurface flow to stream network (per unit catchment area)"""
Qb = self.Qo*np.exp(-self.S / (self.M + eps))
return Qb
def run_timestep(self, R):
"""
runs a timestep, updates saturation deficit and returns fluxes
Args:
R - recharge [m per unit catchment area] during timestep
OUT:
Qb - baseflow [m per unit area]
Qr - returnflow [m per unit area]
qr - distributed returnflow [m]
fsat - saturated area fraction [-]
Note:
R is the mean drainage [m] from bucketgrid.
"""
# initial conditions
So = self.S
s = self.local_s(So)
# subsurface flow, based on initial state
Qb = self.subsurfaceflow()
# update storage deficit and check where we have returnflow
S = So + Qb - R
s = self.local_s(S)
# returnflow grid
qr = -s
qr[qr < 0] = 0.0 # returnflow grid, m
# average returnflow per unit area
Qr = np.nansum(qr)*self.CellArea / self.CatchmentArea
# now all saturation excess is in Qr so update s and S.
# Deficit increases when Qr is removed
S = S + Qr
self.S = S
# saturated area fraction
ix = np.where(s <= 0)
# fsat = np.max(np.shape(ix))*self.CellArea / self.CatchmentArea
fsat = len(ix[0])*self.CellArea / self.CatchmentArea
del ix
# check mass balance
dS = (So - self.S)
dF = R - Qb - Qr
mbe = dS - dF
# append results
if hasattr(self, 'results'):
self.results['R'].append(R)
self.results['S'].append(self.S)
self.results['Qb'].append(Qb)
self.results['Qr'].append(Qr)
self.results['qr'].append(qr)
self.results['fsat'].append(fsat)
self.results['Mbe'].append(mbe)
return Qb, Qr, qr, fsat
#class Topmodel():
# def __init__(self, pp, ksat=None, cellarea=None, cmask=None, flowacc=None,
# slope=None, S_initial=None, outputs=False):
# """
# Topmodel, allows for spatially varying 'effective soil depth m' and
# transmissivity 'to'.
# Based on Saulnier et al. 1997. J.Hydrol. 202, 158-172.
# Args:
# pp - parameter dict with keys {'dt','to','m', 'So'}
# ksat - saturated hydr. conductivity (ms-1), array or grid
# cmask - catchment mask, 1 = catchment_cell
# cellarea - area of each cell (or index class)
# flowacc - flow accumulation per unit contour length (m)
# slope - local slope (rad)
# outputs - True stores outputs after each timestep
#
# in pp: keys
# dt - timestep [s]
# to - soil transmissivity at saturation [m/s]
# m - effective soil depth (m), i.e. decay factor of Ksat with depth
# so - initial catchment average saturation deficit (m)
# """
# if not S_initial:
# S_initial = pp['so']
#
# self.dt = pp['dt']
# self.cmask = cmask
# self.CellArea = cellarea
# self.CatchmentArea = np.size(cmask[cmask == 1])*self.CellArea
#
# # topography
# self.a = flowacc*cmask # flow accumulation grid
# self.slope = slope*cmask # slope (deg) grid
#
# # local m and average M effective soil depth [m]
# self.m = pp['m']*cmask
# self.M = 1.0 / self.CatchmentArea*np.nansum(self.m*self.CellArea)
#
# self.To = pp['to']*self.dt
#
# # local xi and catchment average X hydrologic similarity indices
# rad = np.radians(self.slope) # deg to rad
#
# self.xi = self.m*np.log(self.a / (self.To*(np.tan(rad) + eps)))
# self.X = 1.0 / self.CatchmentArea*np.nansum(self.xi*self.CellArea)
# # print('X', self.X)
#
# # baseflow rate when catchment Smean=0.0
# self.Qo = np.exp(-self.X / self.M)
#
# # state vars.: local s and catchment average S saturation deficit [m]
# s = self.local_s(S_initial)
# s[s < 0] = 0.0
# self.S = np.nanmean(s)
#
# # create dictionary of empty lists for saving results
# if outputs:
# self.results = {'S': [], 'Qt': [], 'Qb': [], 'Qf': [], 'Roff': []}
#
# def local_s(self, Smean):
# """computes local storage deficit s [m] """
# s = Smean + self.M*(self.X - self.xi)
# return s
#
# def subsurfaceflow(self):
# """subsurface flow to stream network (per unit catchment area)"""
# Qb = self.Qo*np.exp(-self.S / (self.M + eps))
# return Qb
#
# def run_timestep(self, R):
# """
# runs a timestep, updates saturation deficit and returns fluxes
# IN: R - recharge [m per unit catchment area] during timestep
# OUT:
# Qb - baseflow [m per unit area]
# Qf - returnflow [m per unit area]
# Roff - runoff [m per unit area], recharge to initially sat. area
# fsat - saturated area fraction [-]
# qf - returnflow [m per unit area] in matrix
# """
#
# # initial conditions
# So = self.S
# s = self.local_s(So)
#
# # subsurface flow, based on initial state
# Qb = self.subsurfaceflow()
#
# # for outputs & use of outcommented lines
# Rec = R
# Roff = R - Rec
#
## use this snippet if input R includes drainage also from sat. cells.
## input to ground water storage is from unsaturated cells only
## ix = np.where(s > 0) # unsaturated cells
## uf = np.max(np.shape(ix))*self.CellArea/self.CatchmentArea
## Rec = R*uf
## del ix
##
## #and remaining forms runoff
## Roff = R - Rec
## roff = np.zeros(np.shape(s))
## roff[np.where(s <= 0)] = R
#
# # update catchment water balance & comp. returnflow to surface
# S = So + Qb - Rec
# s = self.local_s(S)
#
# # return flow (saturation excess) occurs from 'new' saturated cells
# ix = np.where(s <= 0)
# fsat = np.max(np.shape(ix))*self.CellArea / self.CatchmentArea
# del ix
#
# # returnflow grid
# qf = -s
# qf[qf < 0] = 0.0 # returnflow grid, m
# # average returnflow per unit area
# Qf = np.nansum(qf)*self.CellArea/self.CatchmentArea
#
# # now all saturation excess is in Qf so update s and S
# S = S + Qf
# self.S = S
#
# # check mass balance
# # dS = (self.S - So)
# # dF = R - Roff - Qb - Qf
# # print ('topmbe [mm]', 1000*dS + 1000*dF)
#
# # append results
# if hasattr(self, 'results'):
# self.results['S'].append(self.S)
# self.results['Qt'].append(Qb + Qf + Roff)
# self.results['Qb'].append(Qb)
# self.results['Qf'].append(Qf)
# self.results['Roff'].append(Roff)
# self.results['fsat'].append(fsat)
# return Qb, Qf, Roff, fsat, qf