from unicodedata import name
import bw2data as bd
import bw2calc as bc
from abc import ABC, abstractmethod
from pydantic import BaseModel, ConfigDict, Field, JsonValue
from typing import Dict, Union, Optional,Callable, Literal
import datetime
import pint
import warnings
from enum import StrEnum
from functools import wraps
import functools
import pandas as pd
[docs]
def update_params(func):
"""Decorator for overwrite params before call of concrete methods.
"""
@functools.wraps(func)
def wrapper(self, **model_params):
self.params =self.params | model_params
return func(self, **model_params)
return wrapper
[docs]
def wrap_init(func):
"""Decorator executed before __init__ to load default parameters as instance attributes.
"""
@functools.wraps(func)
def wrapper(self, name, init_arg=None, **model_params):
self.name = name
self.ureg=pint.UnitRegistry()
# check and create parameter dict:
self.params = model_params
for _, p in self.__class__.parameters.items():
if p.name not in model_params and p.default == None:
raise Exception(
f'''The parameter {p.name} is not defined.
It mus be passed as parameter in the __init__ method or be defined as default parameter in the class definition.'''
)
elif p.name not in model_params and p.default != None:
self.params[p.name]= p.default
undefined_params= [p for p in model_params if p not in self.__class__.parameters]
if len(undefined_params) >0:
raise Warning(
f'''The parameters {list(undefined_params)} are not defined for this model. No validy check possible for this parameter.
Please check if the parameter name is correct and if it is defined in the model class definition.'''
)
if 'location' in self.__class__.__dict__:
self.location = self.__class__.location
else:
self.location = 'GLO'
#self.init_model(init_arg, **model_params)
#self.define_flows()
self.converges= False
return func(self)
return wrapper
[docs]
def check_params(func):
"""Decorater for check if all parameter are defined."""
@functools.wraps(func)
def wrapper(self, **model_params):
for _, p in self.parameters.items():
if p.name not in self.params:
raise Exception(f'The parameter {p.name} is not defined. It mus be passed as parameter in the init_model, calculate_model methods or be defined somewhere else.')
elif p.min:
if p.min > self.params[p.name]:
raise Exception(f'The parameter {p.name} is smaler than the defined minimum value of {p.min}. Choose a larger value.')
elif p.max:
if p.max < self.params[p.name]:
raise Exception(f'The parameter {p.name} is larger than the defined maximum value of {p.max}. Choose a smaler value.')
return func(self, **model_params)
return wrapper
[docs]
class parameter(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str
val: Union[float, list, int, str, pint.Quantity, None] = None
default: Union[float, list, int, str, pint.Quantity, None]
min: Union[float, int, str, pint.Quantity, None] = None
max: Union[float, int, str, pint.Quantity, None] = None
description: Union[str, dict[str, str], None] = None
reference: Union[str, dict[str, str], None] = None
[docs]
class SimModel(ABC):
"""Class containing a simulation model.
Args:
name: model name.
init_arg: Arguments needed for initialising the model.
**model_params: Parameters for the simulation Model.
"""
reference={
'type': 'misc',
'key': '',
'author' :'',
'title' : '',
'license': '',
'url': ''
}
# Description of the model:
description=''
# needed parameters for the model:
parameters={}
#def __init__(self, name, init_arg=None, **model_params):
# super().__init__()
'''self.name = name
self.ureg=pint.UnitRegistry()
# check and create parameter dict:
self.params = model_params
for _, p in self.__class__.parameters.items():
if p.name not in model_params and p.default == None:
raise Exception(
f\'''The parameter {p.name} is not defined.
it mus be passed as parameter in the __init__ method or be defined as default parameter in the class definition.\
'''
)
elif p.name not in model_params and p.default != None:
self.params[p.name]= p.default
undefined_params= [p for p in model_params if p not in self.__class__.parameters]
if len(undefined_params) >0:
raise Warning(
f\'''The parameters {list(undefined_params)} are not defined for this model. No validy check possible for this parameter.
Please check if the parameter name is correct and if it is defined in the model class definition.\'''
)
self.location = 'GLO'
#self.init_model(init_arg, **model_params)
#self.define_flows()
self.converges= False
'''
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls.__init__ = wrap_init(cls.__init__)
if 'init_model' in cls.__dict__:
cls.init_model = update_params(cls.init_model)
if 'calculate_model' in cls.__dict__:
cls.calculate_model = update_params(check_params(cls.calculate_model))
if 'recalculate_model' in cls.__dict__:
cls.recalculate_model = update_params(check_params(cls.recalculate_model))
[docs]
@abstractmethod
def init_model(self, **model_params):
'''Abstract method to initiate the model.
Args:
**model_params: Parameters for the simulation Model.
'''
self.params= self.params|model_params
[docs]
@abstractmethod
def calculate_model(self, **model_params):
'''Abstract method to calculate the model based on the parameters provided.
'''
pass
[docs]
def recalculate_model(self, **model_params):
'''Method to recalculate the model based on the parameters provided.
Executes a new initialising and calculation of the model. Might be
overwritten if a better and faster way to recalculate the model is possible.
'''
self.init_model(**model_params)
self.calculate_model(**model_params)
@property
def technosphere(self):
'''Property of the model technosphere flows.
A dict of technosphere flows, wich needs to be filled by the modelInterface class with brightway datasets.
Dict of the schema:
{'model_flow name': simodin.interface.technosphere_edge }
'''
return self._technosphere
@technosphere.setter
def technosphere(self, technosphere_dict):
'''
Setter to define the model technosphere flows.
'''
self._technosphere= technosphere_dict
@technosphere.getter
def technosphere(self):
data={
"description":[],
"amount":[],
"functional":[],
"dataset_correction":[],
"reference":[],
"allocationfactor":[],
"model_unit":[],
"impact":[],
"source":[],
"target":[],
}
for k, v in self._technosphere.items():
for key, val in data.items():
if key =='amount':
val.append(getattr(v, key)())
else:
val.append(getattr(v, key))
df = pd.DataFrame(data, index = self._technosphere.keys())
return(df)
[docs]
def print_technosphere(self, prop_list):
data= {arg:[] for arg in prop_list }
for k, v in self._technosphere.items():
for key, val in data.items():
if key =='amount':
val.append(getattr(v, key)())
else:
val.append(getattr(v, key))
df = pd.DataFrame(data, index = self._technosphere.keys())
return(df)
@property
def biosphere(self) -> dict:
'''
Property of the model biosphere flows.
Dict of the schema:
{'model_flow name': simodin.interface.biosphere_edge }
'''
return self._biosphere
@biosphere.setter
def biosphere(self, biosphere_dict):
'''
Setter to define the model biosphere flows.
Dict of the schema:
{'model_flow name': simodin.interface.biosphere_edge }
'''
self._biosphere= biosphere_dict
@biosphere.getter
def biosphere(self):
data={
"description":[],
"amount":[],
"source":[],
"target":[],
"dataset_correction":[],
}
for k, v in self._biosphere.items():
for key, val in data.items():
if key =='amount':
val.append(getattr(v, key)())
else:
val.append(getattr(v, key))
df = pd.DataFrame(data, index = self._biosphere.keys())
return(df)
[docs]
@abstractmethod
def define_flows(self):
'''Abstract method to define the model flows.
'''
pass
[docs]
def set_flow_attr(self, flow_name, flow_property, value):
'''Set a property of a flow.
Args:
flow_name: Name of the flow to be set.
flow_property: Property of the flow to be set.
value: Value to be set.
'''
if flow_name in self._technosphere:
if hasattr(self._technosphere[flow_name], flow_property):
setattr(self._technosphere[flow_name], flow_property, value)
else:
raise ValueError(f'Flow property {flow_property} not found in technosphere flow {flow_name}.')
elif flow_name in self._biosphere:
if hasattr(self._biosphere[flow_name], flow_property):
setattr(self._biosphere[flow_name], flow_property, value)
else:
raise ValueError(f'Flow property {flow_property} not found in biosphere flow {flow_name}.')
else:
raise ValueError(f'Flow {flow_name} not found in technosphere or biosphere.')
[docs]
def add_flow(self, flow: Union['technosphere_edge', 'biosphere_edge']):
'''Add a flow to the flow dicts.
Args:
flow: Flow to be added.
'''
if isinstance(flow, technosphere_edge):
if not hasattr(self, '_technosphere'):
self._technosphere= {}
self._technosphere[flow.name]= flow
elif isinstance(flow, biosphere_edge):
if not hasattr(self, '_biosphere'):
self._biosphere= {}
self._biosphere[flow.name]= flow
else:
raise ValueError(f'Flow {flow} is not a valid technosphere or biosphere flow.')
@property
def citation(self):
'''Citation of the model:
'''
# pydantic schema adapted from bw_interface_schemas:
# https://github.com/brightway-lca/bw_interface_schemas
[docs]
class QuantitativeEdgeTypes(StrEnum):
technosphere = "technosphere"
biosphere = "biosphere"
characterization = "characterization"
weighting = "weighting"
normalization = "normalization"
[docs]
class technosphereTypes(StrEnum):
product= "product"
substitution= "substitution"
input= "input"
output= "output"
[docs]
class Edge(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
edge_type: str
source: bd.backends.proxies.Activity | SimModel|None = None
target: bd.backends.proxies.Activity | SimModel|None = None
comment: Union[str, dict[str, str], None] = None
tags: dict[str, JsonValue] | None = None
properties: dict[str, JsonValue] | None = None
name: str
[docs]
class QuantitativeEdge(Edge):
"""An quantitative edge linking two nodes in the graph."""
edge_type: QuantitativeEdgeTypes
amount: Callable # Union[pint.Quantity, float,Callable]
uncertainty_type: int | None = None
loc: float | None = None
scale: float | None = None
shape: float | None = None
minimum: float | None = None
maximum: float | None = None
negative: bool | None = None
description: Union[str, dict[str, str], None] = None
default_name: str = ''
default_code: str = ''
dataset_correction: float | None = None
[docs]
class technosphere_edge(QuantitativeEdge):
"""A technosphere flow."""
functional: bool = False
reference: bool = False
edge_type: Literal[QuantitativeEdgeTypes.technosphere] = (
QuantitativeEdgeTypes.technosphere
)
model_unit: Union[pint.Unit, str, None] =None
dataset_unit: Union[pint.Unit, str, None] =None
allocationfactor: Union[float, Callable] =1.0
type: technosphereTypes
database: Union[str, None]=None
dataset: Union[str, None]=None
impact: Union[dict[str,Union[pint.Quantity, float]], None]=None
[docs]
class biosphere_edge(QuantitativeEdge):
"""A biosphere flow."""
edge_type: Literal[QuantitativeEdgeTypes.biosphere] = (
QuantitativeEdgeTypes.biosphere
)
model_unit: Union[pint.Unit, str, None] =None
dataset_unit: Union[pint.Unit, str, None] =None
[docs]
class modelInterface(BaseModel):
'''Class for interface external simulation models with brightway25.
Attributes:
-----------
name: Name of the model.
model: The Simulation model as SimModel class.
'''
model_config = ConfigDict(arbitrary_types_allowed=True)
model: SimModel
name: str
params: Optional[Dict[str, Union[float, int, bool, str]]]=None
methods: list=[]
converged: bool= False
ureg: pint.UnitRegistry=pint.UnitRegistry()
method_config: Dict={}
impact_allocated: Dict={}
impact_dissag: Dict={}
impact: dict={}
lca: Optional[bc.MultiLCA]=None
_reference_flow: str=''
def __init__(self, name, model):
super().__init__(name=name, model=model)
self.params = self.model.params
self.ureg = self.model.ureg
[docs]
def add_dataset(self, flow_name, dataset):
'''Link a brightway25 dataset to a model flow.
Args:
flow_name: Name of the flow to be linked.
dataset: Brightway25 dataset to be linked.
'''
if flow_name in self.model._technosphere:
if not isinstance(dataset, bd.backends.proxies.Activity):
raise ValueError(f'Dataset {dataset} is not a valid brightway25 activity.')
if self.model._technosphere[flow_name].target == self.model:
self.model._technosphere[flow_name].source= dataset
elif self.model._technosphere[flow_name].source == self.model:
self.model._technosphere[flow_name].target= dataset
elif flow_name in self.model._biosphere:
if not isinstance(dataset, bd.backends.proxies.Activity):
raise ValueError(f'Dataset {dataset} is not a valid brightway25 biosphere exchange.')
if self.model._biosphere[flow_name].target == self.model:
self.model._biosphere[flow_name].source= dataset
elif self.model._biosphere[flow_name].source == self.model:
self.model._biosphere[flow_name].target= dataset
else:
raise ValueError(f'Flow {flow_name} not found in technosphere or biosphere.')
[docs]
def remove_dataset(self, flow_name):
if flow_name in self.model._technosphere:
if self.model._technosphere[flow_name].target == self.model:
self.model._technosphere[flow_name].source= None
elif self.model._technosphere[flow_name].source == self.model:
self.model._technosphere[flow_name].target= None
elif flow_name in self.model._biosphere:
if self.model._biosphere[flow_name].target == self.model:
self.model._biosphere[flow_name].source= None
elif self.model._biosphere[flow_name].source == self.model:
self.model._biosphere[flow_name].target= None
else:
raise ValueError(f'Flow {flow_name} not found in technosphere or biosphere.')
[docs]
def calculate_background_impact(self):
'''
Calculate the background impact based on the parameters provided.
'''
if self.model._technosphere is None:
raise ValueError("technosphere dict not created. Define and call 'link_technosphere' first.")
background_flows={}
for name, ex in self.model._technosphere.items():
if ex.functional:
continue
if ex.source == self.model:
if isinstance(ex.target, bd.backends.proxies.Activity):
background_flows[name]= {ex.target.id:1}
else:
if isinstance(ex.source, bd.backends.proxies.Activity):
background_flows[name]= {ex.source.id:1}
self.method_config= {'impact_categories':self.methods}
if len(background_flows)==0:
raise ValueError("Technosphere dict got no technosphere flows with an assigned brightway25 activity. LCA calculation abborted.")
data_objs = bd.get_multilca_data_objs(background_flows, self.method_config)
self.lca = bc.MultiLCA(demands=background_flows,
method_config=self.method_config,
data_objs=data_objs
)
self.lca.lci()
self.lca.lcia()
[docs]
def calculate_impact(self):
'''Calculate the impact and returns the allocated impact.
'''
if not hasattr(self, 'lca'):
self.calculate_background_impact()
self._get_reference()
self.impact_allocated = {}
self.impact = {}
self.impact_dissag = {}
for cat in self.method_config['impact_categories']:
self.impact[cat] = 0
self.impact_dissag[cat]={}
#iterate over technosphere flows:
for name, ex in self.model._technosphere.items():
if ex.functional:
continue
#check if technosphere is linked to a bw activity
if not isinstance(ex.target, bd.backends.proxies.Activity) and ex.source == self.model:
continue
if not isinstance(ex.source, bd.backends.proxies.Activity) and ex.target == self.model:
continue
score=self.lca.scores[(cat, name)]*self._get_flow_value(ex)
if ex.dataset_correction != None:
score*= ex.dataset_correction
self.impact[cat] += score
self.impact_dissag[cat][ex.name]=score
# iterate over biosphere flows:
for name, ex in self.model._biosphere.items():
cf_list=bd.Method(cat).load()
if ex.source == self.model:
factor= [flow for flow in cf_list if flow[0]== ex.target.id]
if ex.target== self.model:
factor= [flow for flow in cf_list if flow[0]== ex.source.id]
if len(factor)!=0:
self.impact[cat] += self._get_flow_value(ex)*factor[0][1]
self.impact_allocated[cat]={}
# iterate over functional units:
#if isinstance(self.functional_unit, dict):
for name, ex in self.model._technosphere.items():
if not ex.functional:
continue
else:
if callable(ex.allocationfactor):
allocationfactor= ex.allocationfactor()
else:
allocationfactor= ex.allocationfactor
self.impact_allocated[cat][name] =(
self.impact[cat] *
allocationfactor/self._get_flow_value(self.model._technosphere[self._reference_flow])
)
if ex.impact is None:
ex.impact={}
ex.impact[cat]=self.impact_allocated[cat][name]
return self.impact_allocated
def _get_reference(self):
'''Sets the reference flow according to technosphere definition.
Iterates through the technosphere flows and check for reference flows.
Raises error if more than one reference flows are defined.
'''
ref_list=[name for name, ex in self.model._technosphere.items()
if ex.reference]
if len(ref_list)>1:
raise ValueError(f'More than one reference flows. You have to define only one reference flow with `model.set_flow_attr()`. '
'The list of flows with reference flows is: {ref_list}.')
elif len(ref_list)==0:
raise ValueError(f'No reference flow defined. Use `model.set_flow_attr()` to define exactly one reference flow.')
else:
self._reference_flow= ref_list[0]
def _get_flow_value(self, ex):
'''Get the correct amount value and transform to the correct unit if possible.
Args:
ex: Exchange flow
Returns:
Amount: Amount as float.
'''
if callable(ex.amount):
amount=ex.amount()
else:
amount= ex.amount
warnings.warn(f"No unit check possible for functional flow {ex.name}. Provide the desired output unit in 'technosphere_edge.model_unit' property.",UserWarning)
# check for unit and transform it in the correct unit if possible.
# get dataset unit:
if ex.target!= None and ex.source!= None:
if isinstance(ex.dataset_unit, str):
dataset_unit= ex.dataset_unit
elif ex.target == self.model and 'unit' in ex.source:
dataset_unit=ex.source.get('unit')
elif ex.source == self.model and 'unit' in ex.target:
dataset_unit=ex.target.get('unit')
else:
raise ValueError(f'No dataset unit available for {ex.name}.')
else:
if not ex.functional:
dataset_unit= 'NaU'
raise ValueError(f'No dataset available for {ex.name}.')
else:
if hasattr(ex, 'model_unit') and ex.model_unit!=None and isinstance(ex.model_unit, pint.Unit):
dataset_unit= ex.model_unit
elif hasattr(ex, 'model_unit') and ex.model_unit!=None:
dataset_unit= ex.model_unit
else:
dataset_unit='NaU'
warnings.warn(f"No unit check possible for functional flow {ex.name}. Provide the desired output unit in 'technosphere_edge.model_unit' property.",UserWarning)
# get model flow unit:
# if pint quantity:
if isinstance(amount, pint.Quantity):
if isinstance(dataset_unit, pint.Unit) or dataset_unit in self.model.ureg :
return amount.m_as(dataset_unit)
elif dataset_unit not in self.model.ureg:
#if dataset_unit != ' ':
warnings.warn(f"The model_unit of {ex.name} got no valid Pint Unit. Ignore unit transformation and internal model unit is choosen.", UserWarning)
ex.model_unit = amount.u
return amount.m
# if no pint quantity
elif ex.model_unit!=None and ex.model_unit in self.model.ureg:
if ex.target!= None:
if ex.target == self.model:
return self.model.ureg.Quantity(amount, ex.model_unit).m_as(ex.source.get('unit'))
elif ex.source ==self.model:
return self.model.ureg.Quantity(amount, ex.model_unit).m_as(ex.target.get('unit'))
elif ex.type =='product':
return amount
else:
return amount
elif ex.model_unit!=None and ex.model_unit not in self.model.ureg:
warnings.warn(f"The model flow of {ex.name} got no valid Pint Unit. Ignore unit transformation.", UserWarning)
return amount
else:
warnings.warn(f'No unit check possible for {ex.name}. Use pint units if possible or provide pint compatible model unit name.',UserWarning)
return amount
[docs]
def export_to_bw(self, database=None, identifier=None):
'''Export the model to a brightway dataset.
Creates the database simulation_model_db if no database is passed. Creates a identifier by the model name,
functional unit flow name, and a time stamp if none is passed.
Args:
database: Database in which the model activity should be exported. Default is "simodin_db"
identifier: code for the brightway activity. If empty, the activity code will be created
by the name of the mode, the name of the functional flow, and a timestamp.
If provided but multifunctional, it will create a dataset for each with a code consisting
of the name of the functional flow and the provided identifier.
'''
if not hasattr(self, 'impact_allocated'):
self.calculate_impact()
if database== None:
database = f"simodin_db"
if database not in bd.databases:
bd.Database(database).register()
# iterate over functional flows and create a dataset for each:
code_list=[]
for fun_name, fun_ex in self.model._technosphere.items():
if not fun_ex.functional:
continue
now= datetime.datetime.now()
if identifier==None:
code= f'{self.name}_{fun_name}_{now}'
else:
code= f'{fun_name}_{identifier}'
code_list.append(code)
#create a new node in brightway:
node = bd.Database(database).new_node(
name= fun_name,
unit= fun_ex.model_unit,
code= code,
**self.model.params
)
node.save()
if callable(fun_ex.allocationfactor):
allocationfactor= fun_ex.allocationfactor()
else:
allocationfactor= fun_ex.allocationfactor
#iterate over the technosphere flows and create exchanges to the brightway node for each flow:
for name, ex in self.model._technosphere.items():
if ex.functional: # only handle not functional flows
continue
#check if technosphere is linked to a bw activity
if not isinstance(ex.target, bd.backends.proxies.Activity) and ex.source == self.model:
continue
if not isinstance(ex.source, bd.backends.proxies.Activity) and ex.target == self.model:
continue
allocated_amount= (self._get_flow_value(ex)*allocationfactor /
self._get_flow_value(fun_ex))
#dataset correction for original linked dataset.
if ex.dataset_correction != None:
allocated_amount= allocated_amount*ex.dataset_correction
if ex.target == self.model:
node.new_exchange(
input= ex.source,
amount=allocated_amount,
type = 'technosphere',
).save()
elif ex.source == self.model:
node.new_exchange(
input= ex.target,
amount=allocated_amount,
type = 'technosphere',
).save()
for name, ex in self.model._biosphere.items():
allocated_amount= (self._get_flow_value(ex)*allocationfactor /
self._get_flow_value(fun_ex))
if ex.target == self.model:
node.new_exchange(
input= ex.source,
amount=allocated_amount,
type = 'biosphere',
).save()
elif ex.source == self.model:
node.new_exchange(
input= ex.target,
amount=allocated_amount,
type = 'biosphere',
).save()
node.new_exchange(
input=node,
amount= 1,
type = 'production',
).save()
return code_list