###############################################################
# vegetation.py
###############################################################
from pathlib import Path
import numpy as np
import json
from dycove.sim.vegetation_data import VegetationAttributes, VegCohort
from dycove.utils.simulation_reporting import Reporter
from dycove.utils.array_math import cell_averaging, sum_product, sum_elementwise
from dycove.constants import MIN_FRACTION, BURIAL_FRACTION, SCOUR_FRACTION, INIT_FRAC_MORT
r = Reporter()
##### ---------- Shared methods ---------- #####
[docs]
class SharedVegMethods:
[docs]
def compute_veg_model_quantities(self):
"""
Function to compute weighted area averages of model vegetation variables.
Handles the various fractions that are tracked within each grid cell.
This function works the same no matter the number of species, and so it is shared
by both classes.
"""
# For single species, self.cohorts is a list of VegCohort objects
# For multiple species, self.cohorts is a flattened list of VegCohort objects for all species
cohorts = self.cohorts
if not self.cohorts:
self.stemdensity = None
self.stemdiameter = None
self.stemheight = None
return
# Get list of cohort fractions and other attributes
fractions = [c.fraction for c in cohorts]
densities = [c.density for c in cohorts]
diameters = [c.diameter for c in cohorts]
heights = [c.height for c in cohorts]
# Stem density in a single cell: sum of n_i*frac_i over all fractions frac_i in cell
self.stemdensity = sum_product(fractions, densities)
# For veg height/diameter we just need a weighted average value in each cell based on cell fractions.
# Cells may have a sum of fractions less than 1, so we need to safely divide by the sum of fractions
self.stemdiameter = cell_averaging(fractions, diameters)
self.stemheight = cell_averaging(fractions, heights)
[docs]
class VegetationSpecies(SharedVegMethods):
"""
A single vegetation species. Can (and will) handle multiple cohorts of the species.
Parameters
----------
input_veg_filename : str or Path
Path to JSON file containing vegetation attributes.
mor : int
1 if we want to consider morphology (burial/scour) for this vegetation unit, else 0
rand_seed_frac : float
Fraction of potentially colonized cells that will actually colonize, distributed randomly.
rand_seed_method : str
Options are 'random' or 'deterministic'. If rand_seed_frac is between 0 and 1,
'deterministic' will use a seed to evaluate the SAME set of cells for each colonization,
whereas 'random' will be truly random each time.
"""
[docs]
def __init__(self,
input_veg_filename,
species_name=None,
mor=0,
rand_seed_frac=1.0,
rand_seed_method="random",
):
self.attrs = self.load_vegetation_attributes(Path(input_veg_filename))
self.mor = mor
self.seed_frac = rand_seed_frac
self.seed_method = rand_seed_method
if species_name is not None:
self.name = species_name
else:
self.name = str(Path(input_veg_filename).stem)
# Will become list of VegCohort objects, one for each cohort/colonization that occurs
self.cohorts: list[VegCohort] = []
[docs]
@staticmethod
def load_vegetation_attributes(filename: Path) -> VegetationAttributes:
""" Parse vegetation input json file and store attributes in a dataclass. """
with open(filename, "r") as f:
data = json.load(f)
ls_data = data.pop("life_stage_attr")
for key in ls_data[0].keys():
data[key] = [stage[key] for stage in ls_data]
return VegetationAttributes(**data)
[docs]
def colonization(self, ets, min_depths, max_depths, fl_dr, combined_cohorts=None):
"""
Colonize a :class:`~dycove.sim.vegetation_data.VegCohort` by adding a new
fraction to cells.
This method only adds new fractions. Effective stem diameters and heights are
computed elsewhere.
Parameters
----------
ets : int
Current ecological timestep within current ecological year.
min_depths : numpy.ndarray
Array of minimum water depths [m] at each cell over the previous period.
max_depths : numpy.ndarray
Array of maximum water depths [m] at each cell over the previous period.
fl_dr : float
Wet/dry threshold [m]; passed from constants.py).
combined_cohorts : list of VegetationSpecies or None, optional
Relevant only if multiple species are present. Provides information about
other species occupying space in cells. See colonization method of
:class:`~dycove.sim.vegetation.MultipleVegetationSpecies` for the use-case.
"""
if self.attrs.start_col_ets <= ets < self.attrs.end_col_ets:
r.report(f"Doing colonization for species \"{self.name}\", at ETS {ets}")
# Get conditions for colonization
dry_cond = (min_depths < fl_dr)
fld_cond = (max_depths >= fl_dr)
# Get indices for potential colonization
potential_inds = self.find_potential_inds(dry_cond, fld_cond)
# Create mask for potential colonization based on input fraction and method
rand_mask = self.create_seed_fraction_mask(len(min_depths))
# Apply potential inds to mask
potential_inds_mask = np.zeros_like(rand_mask, dtype=bool)
potential_inds_mask[potential_inds] = True
selected_inds = potential_inds_mask & rand_mask
existing_cohorts = combined_cohorts if combined_cohorts else self.cohorts
new_fraction = self.compute_new_fraction(existing_cohorts, selected_inds)
# Create new cohort for latest colonization
# new_fraction is an array, other quantities are scalars
new_cohort = VegCohort(
name=self.name,
fraction=new_fraction,
density=self.attrs.stemdens[0],
diameter=self.attrs.stemdiam_0,
height=self.attrs.stemht_0,
rootlength=self.attrs.rootlength_0,
lifestage=1,
lifestage_year=1,
)
self.cohorts.append(new_cohort)
def find_potential_inds(self, dry_cell_arr: np.ndarray, fld_cell_arr: np.ndarray):
# Cells must experience both wet AND dry conditions during previous ETS
if self.attrs.col_method == 1:
return np.where(dry_cell_arr & fld_cell_arr)[0]
# Cells must be wet at some point during previous ETS
elif self.attrs.col_method == 2:
return np.where(fld_cell_arr)[0]
# Cells must be dry at some point during previous ETS
elif self.attrs.col_method == 3:
return np.where(dry_cell_arr)[0]
# All cells can be colonized regardless of prior conditions
elif self.attrs.col_method == 4:
return np.arange(len(dry_cell_arr))
[docs]
def create_seed_fraction_mask(self, array_len: int):
""" Create a mask based on random seeding """
# Number of indices where we will allow colonization
n_inds = int(np.floor(self.seed_frac*array_len))
# Initialize mask with all False
rand_mask = np.zeros(array_len, dtype=bool)
# Set RNG depending on seeding style choice
if self.seed_method == "deterministic":
rng = np.random.default_rng(0)
else:
rng = np.random.default_rng()
inds = rng.choice(array_len, n_inds, replace=False)
rand_mask[inds] = True
return rand_mask
[docs]
def compute_new_fraction(self, existing_cohorts: list, inds2colonize: np.ndarray):
""" Compute a new cohort’s fractional cover based on available space. """
new_fraction = np.zeros(len(inds2colonize))
existing_fractions = [c.fraction for c in existing_cohorts]
if not existing_fractions:
# Create vegetation fraction array based on the hydro conditions and random filtering
new_fraction[inds2colonize] = self.attrs.fraction_0
else:
# TODO: Maybe find a better way of avoiding continuous appends with each colonization.
# E.g., avoid adding a new fraction if no space anywhere, etc. But that would mean
# that ZERO cells anywhere have favorable hydro conditions AND space available
# (not many cases).
# Determine available space based on sum of fractions in each cell
fraction_uncovered = 1 - sum_elementwise(existing_fractions)
# Compute new fraction to be colonized; min: space available, max: initial veg fraction
candidate_fraction = np.minimum(fraction_uncovered, self.attrs.fraction_0)
# We apply new fractions where partial flooding condition is met, and within cells selected via seed_frac
new_fraction[inds2colonize] = candidate_fraction[inds2colonize]
return new_fraction
[docs]
def mortality(self, hydro_vars, morpho_vars):
""" Delegate to internal methods. """
self.mortality_hydrodynamic(**hydro_vars)
self.mortality_morphodynamic(**morpho_vars)
self.apply_mortality()
#self.apply_mortality_using_initial_fractions()
[docs]
def mortality_hydrodynamic(self, fld_frac, dry_frac, vel_max):
"""
Compute linear mortality functions for each hydrodynamic stressor (if activated).
Hydrodynamic mortality is computed independently from vegetation characteristics
like stem height, root length, etc. Mortality parameters can be dependent on life
stage though, so we compute mortality for each life stage and apply to the
fractions later (self.apply_mortality).
Parameters
----------
fld_frac : numpy.ndarray
Array containing fractions of time that each cell was flooded during the
previous period.
dry_frac : numpy.ndarray
Array containing fractions of time that each cell was dry during the
previous period.
vel_max : numpy.ndarray
Array of maximum water velocities [m/s] at each cell over the previous period.
"""
# These arrays of potential mortality get saved as cohort attributes, along with applied mortality below
self.potential_mort_flood = [None]*self.attrs.nls
self.potential_mort_desic = [None]*self.attrs.nls
self.potential_mort_uproot = [None]*self.attrs.nls
# Loop over life stages present in vegetation file, thresholds depend on lifestage
for n in range(self.attrs.nls):
if self.attrs.flood_no_mort[n] == 0: # if flood mortality is turned off, flag = 0 (TODO: create a better flag)
self.potential_mort_flood[n] = np.zeros_like(fld_frac)
else:
self.potential_mort_flood[n] = self.linear_mortality_func(fld_frac,
self.attrs.flood_no_mort[n],
self.attrs.flood_all_mort[n])
if self.attrs.desic_no_mort[n] == 0: # if dessication is turned off, flag = 0 (TODO: create a better flag)
self.potential_mort_desic[n] = np.zeros_like(dry_frac)
else:
self.potential_mort_desic[n] = self.linear_mortality_func(dry_frac,
self.attrs.desic_no_mort[n],
self.attrs.desic_all_mort[n])
if self.attrs.uproot_no_mort[n] == 0: # if uprooting is turned off, flag = 0 (TODO: create a better flag)
self.potential_mort_uproot[n] = np.zeros_like(vel_max)
else:
self.potential_mort_uproot[n] = self.linear_mortality_func(vel_max,
self.attrs.uproot_no_mort[n],
self.attrs.uproot_all_mort[n])
# TODO: Verify that we want the default scour_frac to be 10%, previous codes have just used 100% same as stem burial
[docs]
def mortality_morphodynamic(self, bl_diff, burial_frac=BURIAL_FRACTION, scour_frac=SCOUR_FRACTION):
"""
Compute linear mortality functions for each morphodynamic stressor (if activated).
Morphodynamic stressors (burial, scour) are not currently functions of life stage,
and they are binary (e.g., stem is either buried or not).
Creates arrays of zeros if using a non-morphology model or with morphology turned
off.
Parameters
----------
bl_diff : numpy.ndarray
Array of cell-wise differences in bed level [m], from beginning to end of the
previous period, where positive values signify burial.
burial_frac : float
Fraction of stem height above which vegetation is considered buried.
scour_frac : float
Fraction of root length above which vegetation is considered scoured.
Notes
-----
- If morphology is turned on (mor=1), but morphology is not active in the model (e.g., ANUGA)
bl_diff will be passed as zero-difference arrays, and scour/burial will be set to zero.
"""
# Loop over fractions and their current shoot/root lengths and compare to erosion/sedimentation
for c in self.cohorts:
# If morphology is off, we still need these zero arrays for calculations in apply_mortality()
if self.mor == 0:
c.potential_mort_burial = np.zeros_like(c.fraction)
c.potential_mort_scour = np.zeros_like(c.fraction)
else:
c.potential_mort_burial = np.where(bl_diff >= burial_frac*c.height, 1, 0)
c.potential_mort_scour = np.where(bl_diff <= -scour_frac*c.rootlength, 1, 0)
[docs]
def apply_mortality(self):
"""
Multiply potential mortality by vegetation fractions to determine actual mortality.
Populate mortality-related fields of each active VegCohort object.
"""
# TODO: We track the fractions lost via each cause, but if all causes yield mortality fractions of 1,
# how should we track causes when, for instance, a cell only had a single fraction covering 40%?
# Is there an order of operations?
for c in self.cohorts:
# Reference fraction (initial or existing fraction) used in applied mortality calculations
ref_fraction = self.attrs.fraction_0 if INIT_FRAC_MORT else c.fraction
# Vegetation fractions lost to flooding
c.potential_mort_flood = self.potential_mort_flood[c.lifestage-1]
c.applied_mort_flood = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_flood, 0.)
# Vegetation fractions lost to dessication
c.potential_mort_desic = self.potential_mort_desic[c.lifestage-1]
c.applied_mort_desic = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_desic, 0.)
# Vegetation fractions lost to uprooting
c.potential_mort_uproot = self.potential_mort_uproot[c.lifestage-1]
c.applied_mort_uproot = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_uproot, 0.)
# Vegetation fractions lost to deposition
c.applied_mort_burial = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_burial, 0.)
# Vegetation fractions lost to erosion
c.applied_mort_scour = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_scour, 0.)
# Subtract all mortality fractions from actual fractions, but maintain minimum fraction of zero
c.applied_mort_total = c.applied_mort_flood + c.applied_mort_desic + c.applied_mort_uproot + \
c.applied_mort_burial + c.applied_mort_scour
fractions_left = c.fraction - c.applied_mort_total
fractions_left = np.maximum(fractions_left, 0) # no negative fractions
# Update fractions in cohort
# For fractions that decay slowly over time, round down to zero when they get small enough
c.fraction = np.where(fractions_left > MIN_FRACTION, fractions_left, 0.)
# def apply_mortality_using_initial_fractions(self):
# """
# Replacing `c.fraction` with `self.attrs.fraction_0`, so mortality is
# always a function of initial colonization fraction.
# ! ---------- NOT CURRENTLY IN USE OR TESTED ----------- !
# """
# for c in self.cohorts:
# # Vegetation fractions lost to flooding
# c.potential_mort_flood = self.potential_mort_flood[c.lifestage-1]
# # For accounting purposes, no mortality if there is no fraction
# c.applied_mort_flood = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_flood, 0.)
# # Vegetation fractions lost to dessication
# c.potential_mort_desic = self.potential_mort_desic[c.lifestage-1]
# c.applied_mort_desic = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_desic, 0.)
# # Vegetation fractions lost to uprooting
# c.potential_mort_uproot = self.potential_mort_uproot[c.lifestage-1]
# c.applied_mort_uproot = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_uproot, 0.)
# # Vegetation fractions lost to deposition
# c.applied_mort_burial = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_burial, 0.)
# # Vegetation fractions lost to erosion
# c.applied_mort_scour = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_scour, 0.)
# # Subtract all mortality fractions from actual fractions, but maintain minimum fraction of zero
# c.applied_mort_total = c.applied_mort_flood + c.applied_mort_desic + c.applied_mort_uproot + \
# c.applied_mort_burial + c.applied_mort_scour
# fractions_left = c.fraction - c.applied_mort_total
# fractions_left = np.maximum(fractions_left, 0) # no negative fractions
# # Update fractions in cohort
# # For fractions that decay slowly over time, round down to zero when they get small enough
# c.fraction = np.where(fractions_left > MIN_FRACTION, fractions_left, 0.)
[docs]
@staticmethod
def linear_mortality_func(stressor, th_min, th_max):
"""
Generic, hydrodynamic stressor mortality function (linear).
Parameters
----------
stressor : numpy.ndarray
Array of relevant stressor magnitude for each cell. For flooding/dessication,
it is an array of fraction of time where cells are wet/dry. For uprooting,
it is an array of maximum velocities.
th_min : float
Stressor value below which there is no mortality. Comes from JSON input file.
th_max : float
Stressor value above which there is total mortality. Comes from JSON input file.
"""
# Compute fractional mortality based on linear interpolation
mort_frac = (stressor - th_min)/(th_max - th_min)
# Updated, cleaner method to just bring all fractions outside the 0-1 range to 0 and 1
mort_frac[mort_frac > 1] = 1
mort_frac[mort_frac < 0] = 0
return mort_frac
# # TODO: Re-implement this, removed once the multi-species capability was added
# # The likelihood of this being called is fairly low
# def clean_out_old_fractions(self):
# # If all fractions within a given cohort are zero, remove it from the list
# cohorts_to_remove = []
# for i, c in enumerate(self.cohorts):
# if np.sum(c.fraction) < 0.001:
# cohorts_to_remove.append(i)
# r.report("Removed a cohort that has disappeared.")
# if cohorts_to_remove:
# self.remove_old_cohorts(cohorts_to_remove)
[docs]
def stemheight_growth(self, ets):
""" Computes stem height based on growth functions in VegAttributes. """
# During first growth ets, height starts at previous winter value, so no need to loop
if ets > self.attrs.start_growth_ets:
for c in self.cohorts:
# If none of these conditions are met, height stays the same
if ets <= self.attrs.end_growth_ets and c.height < self.attrs.stemht_max[c.lifestage-1]:
c.height += self.attrs.ht_growth_rates[c.lifestage-1]
elif ets >= self.attrs.winter_ets:
# Drop down to winter height, but by some chance if we are already below winter max, do nothing
c.height = min(c.height, self.attrs.stemht_winter_max[c.lifestage-1])
[docs]
def stemdiam_growth(self, ets):
""" Computes stem diameter based on growth functions in VegAttributes. """
# During first growth ets, height starts at previous winter value, so no need to loop
# (growth realized by next ETS)
if self.attrs.start_growth_ets < ets <= self.attrs.end_growth_ets:
for c in self.cohorts:
if c.diameter < self.attrs.stemdiam_max[c.lifestage-1] and ets < self.attrs.winter_ets:
c.diameter += self.attrs.diam_growth_rates[c.lifestage-1] # else, remains constant
[docs]
def root_growth(self, ets):
""" Computes root length based on growth functions in VegAttributes. """
# During first growth ETS, height starts at previous winter value, so no need to loop
# (growth realized by next ETS)
if self.attrs.start_growth_ets < ets <= self.attrs.end_growth_ets:
for c in self.cohorts:
if c.rootlength < self.attrs.rootlength_max[c.lifestage-1] and ets < self.attrs.winter_ets:
c.rootlength += self.attrs.root_growth_rates[c.lifestage-1] # else, remain constant
[docs]
def update_lifestage_and_stemdensity(self):
"""
Function to be called at the end of every eco year.
Updates the life stage and the year within the life stage for each cohort.
Stem density update is done here because it only depends on eco year and does not
change during the year.
"""
cohorts_to_remove = []
for i, c in enumerate(self.cohorts):
# If we have reached final year in the life stage
if c.lifestage_year == self.attrs.years_max[c.lifestage-1]:
# If unit has not yet reached final life stage
if c.lifestage != self.attrs.nls:
c.lifestage += 1 # Move this fraction to next life stage
c.lifestage_year = 1 # Restart counter for years within life stage
# TODO: Consider moving this elsewhere?
c.density = self.attrs.stemdens[c.lifestage-1] # Apply stemdens from the NEXT lifestage (lifestage was just increased by 1)
else:
# REMOVE FRACTION FROM LIST OF FRACTIONS, need to be careful with this
cohorts_to_remove.append(i)
else:
c.lifestage_year += 1
if cohorts_to_remove:
self.remove_old_cohorts(cohorts_to_remove)
def remove_old_cohorts(self, list_of_inds):
self.cohorts = [x for i, x in enumerate(self.cohorts) if i not in list_of_inds]
def get_drag(self):
# Return single drag coefficient for this species
return self.attrs.drag
[docs]
class MultipleVegetationSpecies(SharedVegMethods):
"""
A class for handling multiple vegetation species.
We want to keep the hydrodynamics code clean, avoiding extra if statements regarding
the number of species. This class handles the extra steps required, mostly containing
wrappers of functions from VegetationSpecies that distribute the tasks to each species
present.
Parameters
----------
species_list : list[VegetationSpecies]
A list of VegetationSpecies objects.
"""
[docs]
def __init__(self, species_list: list[VegetationSpecies]):
# List of individual vegetation species objects
self.species_list = species_list
self.check_species_consistency()
@property
def cohorts(self) -> list[VegCohort]:
# Flatten vegetation cohorts across species into single list
return [c for sp in self.species_list for c in sp.cohorts]
def colonization(self, ets, min_depths, max_depths, fl_dr):
for sp in self.species_list:
# Passing flattened list of all cohorts for all species for colonization calculation
sp.colonization(ets, min_depths, max_depths, fl_dr, combined_cohorts=self.cohorts)
r.report(f"Number of veg fractions total: {len(self.cohorts)}")
def stemheight_growth(self, ets):
for sp in self.species_list:
sp.stemheight_growth(ets)
def stemdiam_growth(self, ets):
for sp in self.species_list:
sp.stemdiam_growth(ets)
def root_growth(self, ets):
for sp in self.species_list:
sp.root_growth(ets)
def mortality(self, hydro_vars, morpho_vars):
for sp in self.species_list:
sp.mortality(hydro_vars, morpho_vars)
def update_lifestage_and_stemdensity(self):
for sp in self.species_list:
sp.update_lifestage_and_stemdensity()
def check_species_consistency(self):
# Inherit the "mor" input parameter from the individual species object
# We cannot have some species with mor=1 and others with mor=0
if len(set([sp.mor for sp in self.species_list])) != 1:
msg = "All vegetation species inputs must have the same value for 'mor'"
r.report(msg, level="ERROR")
raise ValueError(msg)
self.mor = self.species_list[0].mor
[docs]
def get_drag(self):
"""
Return average of drag coefficients for all species.
Theoretically, we could take the weighted average of drag attributes based on species and
lifestage, but DFM can only accept a single value. So for consistency we use a constant
value throughout the simulation.
"""
return np.mean([sp.get_drag() for sp in self.species_list])