Source code for dycove.sim.vegetation

###############################################################
#  vegetation.py
###############################################################

from pathlib import Path
import numpy as np
import json

from dycove.sim.vegetation_data import VegetationAttributes, VegCohort
from dycove.utils.simulation_reporting import Reporter
from dycove.utils.array_math import cell_averaging, sum_product, sum_elementwise
from dycove.constants import MIN_FRACTION, BURIAL_FRACTION, SCOUR_FRACTION, INIT_FRAC_MORT

r = Reporter()


##### ---------- Shared methods ---------- #####
[docs] class SharedVegMethods:
[docs] def compute_veg_model_quantities(self): """ Function to compute weighted area averages of model vegetation variables. Handles the various fractions that are tracked within each grid cell. This function works the same no matter the number of species, and so it is shared by both classes. """ # For single species, self.cohorts is a list of VegCohort objects # For multiple species, self.cohorts is a flattened list of VegCohort objects for all species cohorts = self.cohorts if not self.cohorts: self.stemdensity = None self.stemdiameter = None self.stemheight = None return # Get list of cohort fractions and other attributes fractions = [c.fraction for c in cohorts] densities = [c.density for c in cohorts] diameters = [c.diameter for c in cohorts] heights = [c.height for c in cohorts] # Stem density in a single cell: sum of n_i*frac_i over all fractions frac_i in cell self.stemdensity = sum_product(fractions, densities) # For veg height/diameter we just need a weighted average value in each cell based on cell fractions. # Cells may have a sum of fractions less than 1, so we need to safely divide by the sum of fractions self.stemdiameter = cell_averaging(fractions, diameters) self.stemheight = cell_averaging(fractions, heights)
[docs] class VegetationSpecies(SharedVegMethods): """ A single vegetation species. Can (and will) handle multiple cohorts of the species. Parameters ---------- input_veg_filename : str or Path Path to JSON file containing vegetation attributes. mor : int 1 if we want to consider morphology (burial/scour) for this vegetation unit, else 0 rand_seed_frac : float Fraction of potentially colonized cells that will actually colonize, distributed randomly. rand_seed_method : str Options are 'random' or 'deterministic'. If rand_seed_frac is between 0 and 1, 'deterministic' will use a seed to evaluate the SAME set of cells for each colonization, whereas 'random' will be truly random each time. """
[docs] def __init__(self, input_veg_filename, species_name=None, mor=0, rand_seed_frac=1.0, rand_seed_method="random", ): self.attrs = self.load_vegetation_attributes(Path(input_veg_filename)) self.mor = mor self.seed_frac = rand_seed_frac self.seed_method = rand_seed_method if species_name is not None: self.name = species_name else: self.name = str(Path(input_veg_filename).stem) # Will become list of VegCohort objects, one for each cohort/colonization that occurs self.cohorts: list[VegCohort] = []
[docs] @staticmethod def load_vegetation_attributes(filename: Path) -> VegetationAttributes: """ Parse vegetation input json file and store attributes in a dataclass. """ with open(filename, "r") as f: data = json.load(f) ls_data = data.pop("life_stage_attr") for key in ls_data[0].keys(): data[key] = [stage[key] for stage in ls_data] return VegetationAttributes(**data)
[docs] def colonization(self, ets, min_depths, max_depths, fl_dr, combined_cohorts=None): """ Colonize a :class:`~dycove.sim.vegetation_data.VegCohort` by adding a new fraction to cells. This method only adds new fractions. Effective stem diameters and heights are computed elsewhere. Parameters ---------- ets : int Current ecological timestep within current ecological year. min_depths : numpy.ndarray Array of minimum water depths [m] at each cell over the previous period. max_depths : numpy.ndarray Array of maximum water depths [m] at each cell over the previous period. fl_dr : float Wet/dry threshold [m]; passed from constants.py). combined_cohorts : list of VegetationSpecies or None, optional Relevant only if multiple species are present. Provides information about other species occupying space in cells. See colonization method of :class:`~dycove.sim.vegetation.MultipleVegetationSpecies` for the use-case. """ if self.attrs.start_col_ets <= ets < self.attrs.end_col_ets: r.report(f"Doing colonization for species \"{self.name}\", at ETS {ets}") # Get conditions for colonization dry_cond = (min_depths < fl_dr) fld_cond = (max_depths >= fl_dr) # Get indices for potential colonization potential_inds = self.find_potential_inds(dry_cond, fld_cond) # Create mask for potential colonization based on input fraction and method rand_mask = self.create_seed_fraction_mask(len(min_depths)) # Apply potential inds to mask potential_inds_mask = np.zeros_like(rand_mask, dtype=bool) potential_inds_mask[potential_inds] = True selected_inds = potential_inds_mask & rand_mask existing_cohorts = combined_cohorts if combined_cohorts else self.cohorts new_fraction = self.compute_new_fraction(existing_cohorts, selected_inds) # Create new cohort for latest colonization # new_fraction is an array, other quantities are scalars new_cohort = VegCohort( name=self.name, fraction=new_fraction, density=self.attrs.stemdens[0], diameter=self.attrs.stemdiam_0, height=self.attrs.stemht_0, rootlength=self.attrs.rootlength_0, lifestage=1, lifestage_year=1, ) self.cohorts.append(new_cohort)
def find_potential_inds(self, dry_cell_arr: np.ndarray, fld_cell_arr: np.ndarray): # Cells must experience both wet AND dry conditions during previous ETS if self.attrs.col_method == 1: return np.where(dry_cell_arr & fld_cell_arr)[0] # Cells must be wet at some point during previous ETS elif self.attrs.col_method == 2: return np.where(fld_cell_arr)[0] # Cells must be dry at some point during previous ETS elif self.attrs.col_method == 3: return np.where(dry_cell_arr)[0] # All cells can be colonized regardless of prior conditions elif self.attrs.col_method == 4: return np.arange(len(dry_cell_arr))
[docs] def create_seed_fraction_mask(self, array_len: int): """ Create a mask based on random seeding """ # Number of indices where we will allow colonization n_inds = int(np.floor(self.seed_frac*array_len)) # Initialize mask with all False rand_mask = np.zeros(array_len, dtype=bool) # Set RNG depending on seeding style choice if self.seed_method == "deterministic": rng = np.random.default_rng(0) else: rng = np.random.default_rng() inds = rng.choice(array_len, n_inds, replace=False) rand_mask[inds] = True return rand_mask
[docs] def compute_new_fraction(self, existing_cohorts: list, inds2colonize: np.ndarray): """ Compute a new cohort’s fractional cover based on available space. """ new_fraction = np.zeros(len(inds2colonize)) existing_fractions = [c.fraction for c in existing_cohorts] if not existing_fractions: # Create vegetation fraction array based on the hydro conditions and random filtering new_fraction[inds2colonize] = self.attrs.fraction_0 else: # TODO: Maybe find a better way of avoiding continuous appends with each colonization. # E.g., avoid adding a new fraction if no space anywhere, etc. But that would mean # that ZERO cells anywhere have favorable hydro conditions AND space available # (not many cases). # Determine available space based on sum of fractions in each cell fraction_uncovered = 1 - sum_elementwise(existing_fractions) # Compute new fraction to be colonized; min: space available, max: initial veg fraction candidate_fraction = np.minimum(fraction_uncovered, self.attrs.fraction_0) # We apply new fractions where partial flooding condition is met, and within cells selected via seed_frac new_fraction[inds2colonize] = candidate_fraction[inds2colonize] return new_fraction
[docs] def mortality(self, hydro_vars, morpho_vars): """ Delegate to internal methods. """ self.mortality_hydrodynamic(**hydro_vars) self.mortality_morphodynamic(**morpho_vars) self.apply_mortality()
#self.apply_mortality_using_initial_fractions()
[docs] def mortality_hydrodynamic(self, fld_frac, dry_frac, vel_max): """ Compute linear mortality functions for each hydrodynamic stressor (if activated). Hydrodynamic mortality is computed independently from vegetation characteristics like stem height, root length, etc. Mortality parameters can be dependent on life stage though, so we compute mortality for each life stage and apply to the fractions later (self.apply_mortality). Parameters ---------- fld_frac : numpy.ndarray Array containing fractions of time that each cell was flooded during the previous period. dry_frac : numpy.ndarray Array containing fractions of time that each cell was dry during the previous period. vel_max : numpy.ndarray Array of maximum water velocities [m/s] at each cell over the previous period. """ # These arrays of potential mortality get saved as cohort attributes, along with applied mortality below self.potential_mort_flood = [None]*self.attrs.nls self.potential_mort_desic = [None]*self.attrs.nls self.potential_mort_uproot = [None]*self.attrs.nls # Loop over life stages present in vegetation file, thresholds depend on lifestage for n in range(self.attrs.nls): if self.attrs.flood_no_mort[n] == 0: # if flood mortality is turned off, flag = 0 (TODO: create a better flag) self.potential_mort_flood[n] = np.zeros_like(fld_frac) else: self.potential_mort_flood[n] = self.linear_mortality_func(fld_frac, self.attrs.flood_no_mort[n], self.attrs.flood_all_mort[n]) if self.attrs.desic_no_mort[n] == 0: # if dessication is turned off, flag = 0 (TODO: create a better flag) self.potential_mort_desic[n] = np.zeros_like(dry_frac) else: self.potential_mort_desic[n] = self.linear_mortality_func(dry_frac, self.attrs.desic_no_mort[n], self.attrs.desic_all_mort[n]) if self.attrs.uproot_no_mort[n] == 0: # if uprooting is turned off, flag = 0 (TODO: create a better flag) self.potential_mort_uproot[n] = np.zeros_like(vel_max) else: self.potential_mort_uproot[n] = self.linear_mortality_func(vel_max, self.attrs.uproot_no_mort[n], self.attrs.uproot_all_mort[n])
# TODO: Verify that we want the default scour_frac to be 10%, previous codes have just used 100% same as stem burial
[docs] def mortality_morphodynamic(self, bl_diff, burial_frac=BURIAL_FRACTION, scour_frac=SCOUR_FRACTION): """ Compute linear mortality functions for each morphodynamic stressor (if activated). Morphodynamic stressors (burial, scour) are not currently functions of life stage, and they are binary (e.g., stem is either buried or not). Creates arrays of zeros if using a non-morphology model or with morphology turned off. Parameters ---------- bl_diff : numpy.ndarray Array of cell-wise differences in bed level [m], from beginning to end of the previous period, where positive values signify burial. burial_frac : float Fraction of stem height above which vegetation is considered buried. scour_frac : float Fraction of root length above which vegetation is considered scoured. Notes ----- - If morphology is turned on (mor=1), but morphology is not active in the model (e.g., ANUGA) bl_diff will be passed as zero-difference arrays, and scour/burial will be set to zero. """ # Loop over fractions and their current shoot/root lengths and compare to erosion/sedimentation for c in self.cohorts: # If morphology is off, we still need these zero arrays for calculations in apply_mortality() if self.mor == 0: c.potential_mort_burial = np.zeros_like(c.fraction) c.potential_mort_scour = np.zeros_like(c.fraction) else: c.potential_mort_burial = np.where(bl_diff >= burial_frac*c.height, 1, 0) c.potential_mort_scour = np.where(bl_diff <= -scour_frac*c.rootlength, 1, 0)
[docs] def apply_mortality(self): """ Multiply potential mortality by vegetation fractions to determine actual mortality. Populate mortality-related fields of each active VegCohort object. """ # TODO: We track the fractions lost via each cause, but if all causes yield mortality fractions of 1, # how should we track causes when, for instance, a cell only had a single fraction covering 40%? # Is there an order of operations? for c in self.cohorts: # Reference fraction (initial or existing fraction) used in applied mortality calculations ref_fraction = self.attrs.fraction_0 if INIT_FRAC_MORT else c.fraction # Vegetation fractions lost to flooding c.potential_mort_flood = self.potential_mort_flood[c.lifestage-1] c.applied_mort_flood = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_flood, 0.) # Vegetation fractions lost to dessication c.potential_mort_desic = self.potential_mort_desic[c.lifestage-1] c.applied_mort_desic = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_desic, 0.) # Vegetation fractions lost to uprooting c.potential_mort_uproot = self.potential_mort_uproot[c.lifestage-1] c.applied_mort_uproot = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_uproot, 0.) # Vegetation fractions lost to deposition c.applied_mort_burial = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_burial, 0.) # Vegetation fractions lost to erosion c.applied_mort_scour = np.where(c.fraction > MIN_FRACTION, ref_fraction * c.potential_mort_scour, 0.) # Subtract all mortality fractions from actual fractions, but maintain minimum fraction of zero c.applied_mort_total = c.applied_mort_flood + c.applied_mort_desic + c.applied_mort_uproot + \ c.applied_mort_burial + c.applied_mort_scour fractions_left = c.fraction - c.applied_mort_total fractions_left = np.maximum(fractions_left, 0) # no negative fractions # Update fractions in cohort # For fractions that decay slowly over time, round down to zero when they get small enough c.fraction = np.where(fractions_left > MIN_FRACTION, fractions_left, 0.)
# def apply_mortality_using_initial_fractions(self): # """ # Replacing `c.fraction` with `self.attrs.fraction_0`, so mortality is # always a function of initial colonization fraction. # ! ---------- NOT CURRENTLY IN USE OR TESTED ----------- ! # """ # for c in self.cohorts: # # Vegetation fractions lost to flooding # c.potential_mort_flood = self.potential_mort_flood[c.lifestage-1] # # For accounting purposes, no mortality if there is no fraction # c.applied_mort_flood = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_flood, 0.) # # Vegetation fractions lost to dessication # c.potential_mort_desic = self.potential_mort_desic[c.lifestage-1] # c.applied_mort_desic = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_desic, 0.) # # Vegetation fractions lost to uprooting # c.potential_mort_uproot = self.potential_mort_uproot[c.lifestage-1] # c.applied_mort_uproot = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_uproot, 0.) # # Vegetation fractions lost to deposition # c.applied_mort_burial = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_burial, 0.) # # Vegetation fractions lost to erosion # c.applied_mort_scour = np.where(c.fraction > 0.01, self.attrs.fraction_0*c.potential_mort_scour, 0.) # # Subtract all mortality fractions from actual fractions, but maintain minimum fraction of zero # c.applied_mort_total = c.applied_mort_flood + c.applied_mort_desic + c.applied_mort_uproot + \ # c.applied_mort_burial + c.applied_mort_scour # fractions_left = c.fraction - c.applied_mort_total # fractions_left = np.maximum(fractions_left, 0) # no negative fractions # # Update fractions in cohort # # For fractions that decay slowly over time, round down to zero when they get small enough # c.fraction = np.where(fractions_left > MIN_FRACTION, fractions_left, 0.)
[docs] @staticmethod def linear_mortality_func(stressor, th_min, th_max): """ Generic, hydrodynamic stressor mortality function (linear). Parameters ---------- stressor : numpy.ndarray Array of relevant stressor magnitude for each cell. For flooding/dessication, it is an array of fraction of time where cells are wet/dry. For uprooting, it is an array of maximum velocities. th_min : float Stressor value below which there is no mortality. Comes from JSON input file. th_max : float Stressor value above which there is total mortality. Comes from JSON input file. """ # Compute fractional mortality based on linear interpolation mort_frac = (stressor - th_min)/(th_max - th_min) # Updated, cleaner method to just bring all fractions outside the 0-1 range to 0 and 1 mort_frac[mort_frac > 1] = 1 mort_frac[mort_frac < 0] = 0 return mort_frac
# # TODO: Re-implement this, removed once the multi-species capability was added # # The likelihood of this being called is fairly low # def clean_out_old_fractions(self): # # If all fractions within a given cohort are zero, remove it from the list # cohorts_to_remove = [] # for i, c in enumerate(self.cohorts): # if np.sum(c.fraction) < 0.001: # cohorts_to_remove.append(i) # r.report("Removed a cohort that has disappeared.") # if cohorts_to_remove: # self.remove_old_cohorts(cohorts_to_remove)
[docs] def stemheight_growth(self, ets): """ Computes stem height based on growth functions in VegAttributes. """ # During first growth ets, height starts at previous winter value, so no need to loop if ets > self.attrs.start_growth_ets: for c in self.cohorts: # If none of these conditions are met, height stays the same if ets <= self.attrs.end_growth_ets and c.height < self.attrs.stemht_max[c.lifestage-1]: c.height += self.attrs.ht_growth_rates[c.lifestage-1] elif ets >= self.attrs.winter_ets: # Drop down to winter height, but by some chance if we are already below winter max, do nothing c.height = min(c.height, self.attrs.stemht_winter_max[c.lifestage-1])
[docs] def stemdiam_growth(self, ets): """ Computes stem diameter based on growth functions in VegAttributes. """ # During first growth ets, height starts at previous winter value, so no need to loop # (growth realized by next ETS) if self.attrs.start_growth_ets < ets <= self.attrs.end_growth_ets: for c in self.cohorts: if c.diameter < self.attrs.stemdiam_max[c.lifestage-1] and ets < self.attrs.winter_ets: c.diameter += self.attrs.diam_growth_rates[c.lifestage-1] # else, remains constant
[docs] def root_growth(self, ets): """ Computes root length based on growth functions in VegAttributes. """ # During first growth ETS, height starts at previous winter value, so no need to loop # (growth realized by next ETS) if self.attrs.start_growth_ets < ets <= self.attrs.end_growth_ets: for c in self.cohorts: if c.rootlength < self.attrs.rootlength_max[c.lifestage-1] and ets < self.attrs.winter_ets: c.rootlength += self.attrs.root_growth_rates[c.lifestage-1] # else, remain constant
[docs] def update_lifestage_and_stemdensity(self): """ Function to be called at the end of every eco year. Updates the life stage and the year within the life stage for each cohort. Stem density update is done here because it only depends on eco year and does not change during the year. """ cohorts_to_remove = [] for i, c in enumerate(self.cohorts): # If we have reached final year in the life stage if c.lifestage_year == self.attrs.years_max[c.lifestage-1]: # If unit has not yet reached final life stage if c.lifestage != self.attrs.nls: c.lifestage += 1 # Move this fraction to next life stage c.lifestage_year = 1 # Restart counter for years within life stage # TODO: Consider moving this elsewhere? c.density = self.attrs.stemdens[c.lifestage-1] # Apply stemdens from the NEXT lifestage (lifestage was just increased by 1) else: # REMOVE FRACTION FROM LIST OF FRACTIONS, need to be careful with this cohorts_to_remove.append(i) else: c.lifestage_year += 1 if cohorts_to_remove: self.remove_old_cohorts(cohorts_to_remove)
def remove_old_cohorts(self, list_of_inds): self.cohorts = [x for i, x in enumerate(self.cohorts) if i not in list_of_inds] def get_drag(self): # Return single drag coefficient for this species return self.attrs.drag
[docs] class MultipleVegetationSpecies(SharedVegMethods): """ A class for handling multiple vegetation species. We want to keep the hydrodynamics code clean, avoiding extra if statements regarding the number of species. This class handles the extra steps required, mostly containing wrappers of functions from VegetationSpecies that distribute the tasks to each species present. Parameters ---------- species_list : list[VegetationSpecies] A list of VegetationSpecies objects. """
[docs] def __init__(self, species_list: list[VegetationSpecies]): # List of individual vegetation species objects self.species_list = species_list self.check_species_consistency()
@property def cohorts(self) -> list[VegCohort]: # Flatten vegetation cohorts across species into single list return [c for sp in self.species_list for c in sp.cohorts] def colonization(self, ets, min_depths, max_depths, fl_dr): for sp in self.species_list: # Passing flattened list of all cohorts for all species for colonization calculation sp.colonization(ets, min_depths, max_depths, fl_dr, combined_cohorts=self.cohorts) r.report(f"Number of veg fractions total: {len(self.cohorts)}") def stemheight_growth(self, ets): for sp in self.species_list: sp.stemheight_growth(ets) def stemdiam_growth(self, ets): for sp in self.species_list: sp.stemdiam_growth(ets) def root_growth(self, ets): for sp in self.species_list: sp.root_growth(ets) def mortality(self, hydro_vars, morpho_vars): for sp in self.species_list: sp.mortality(hydro_vars, morpho_vars) def update_lifestage_and_stemdensity(self): for sp in self.species_list: sp.update_lifestage_and_stemdensity() def check_species_consistency(self): # Inherit the "mor" input parameter from the individual species object # We cannot have some species with mor=1 and others with mor=0 if len(set([sp.mor for sp in self.species_list])) != 1: msg = "All vegetation species inputs must have the same value for 'mor'" r.report(msg, level="ERROR") raise ValueError(msg) self.mor = self.species_list[0].mor
[docs] def get_drag(self): """ Return average of drag coefficients for all species. Theoretically, we could take the weighted average of drag attributes based on species and lifestage, but DFM can only accept a single value. So for consistency we use a constant value throughout the simulation. """ return np.mean([sp.get_drag() for sp in self.species_list])