"""IDF class module.
Various functions for processing EnergyPlus models and retrieving results in
different forms.
"""
import io
import itertools
import logging as lg
import math
import os
import re
import shutil
import sqlite3
import subprocess
import time
import uuid
import warnings
from collections import defaultdict
from io import IOBase, StringIO
from itertools import chain
from math import isclose
from typing import IO, Iterable, Optional, Union
import eppy
import pandas as pd
from energy_pandas import EnergySeries
from eppy.bunch_subclass import BadEPFieldError
from eppy.EPlusInterfaceFunctions.eplusdata import Eplusdata
from eppy.idf_msequence import Idf_MSequence
from eppy.modeleditor import IDDNotSetError, namebunch, newrawobject
from geomeppy import IDF as GeomIDF
from geomeppy.geom.polygons import Polygon3D
from geomeppy.patches import EpBunch, idfreader1, obj2bunch
from geomeppy.recipes import (
_has_correct_orientation,
_is_window,
window_vertices_given_wall,
)
from pandas import DataFrame, Series
from pandas.errors import ParserError
from path import Path
from tabulate import tabulate
from tqdm import tqdm
from archetypal.eplus_interface.basement import BasementThread
from archetypal.eplus_interface.energy_plus import EnergyPlusThread
from archetypal.eplus_interface.exceptions import (
EnergyPlusProcessError,
EnergyPlusVersionError,
EnergyPlusWeatherError,
)
from archetypal.eplus_interface.expand_objects import ExpandObjectsThread
from archetypal.eplus_interface.slab import SlabThread
from archetypal.eplus_interface.transition import TransitionThread
from archetypal.eplus_interface.version import EnergyPlusVersion
from archetypal.idfclass.meters import Meters
from archetypal.idfclass.outputs import Outputs
from archetypal.idfclass.reports import get_report
from archetypal.idfclass.util import get_idf_version, hash_model
from archetypal.idfclass.variables import Variables
from archetypal.reportdata import ReportData
from archetypal.utils import log, settings
def find_and_launch(app_name, app_path_guess, file_path):
app_path = shutil.which(app_name, path=app_path_guess)
assert app_path is not None, f"Could not find {app_name} at '{app_path_guess}'"
assert file_path.exists(), f"{file_path} does not exist."
subprocess.Popen(
(
app_path,
file_path,
)
)
[docs]class IDF(GeomIDF):
"""Class for loading and parsing idf models.
This is the starting point to run simulations and retrieving results.
Wrapper over the geomeppy.IDF class and subsequently the
eppy.modeleditor.IDF class.
"""
IDD = {}
IDDINDEX = {}
BLOCK = {}
OUTPUTTYPES = ("standard", "nocomment1", "nocomment2", "compressed")
# dependencies: dict of <dependant value: independent value>
_dependencies = {
"idd_info": ["iddname", "idfname"],
"idd_index": ["iddname", "idfname"],
"idd_version": ["iddname", "idfname"],
"idfobjects": ["iddname", "idfname"],
"block": ["iddname", "idfname"],
"model": ["iddname", "idfname"],
"sql": [
"as_version",
"annual",
"design_day",
"epw",
"idfname",
"tmp_dir",
],
"htm": [
"as_version",
"annual",
"design_day",
"epw",
"idfname",
"tmp_dir",
],
"meters": [
"idfobjects",
"epw",
"annual",
"design_day",
"readvars",
"as_version",
],
"variables": [
"idfobjects",
"epw",
"annual",
"design_day",
"readvars",
"as_version",
],
"sim_id": [
"idfobjects",
"epw",
"annual",
"design_day",
"readvars",
"as_version",
],
"schedules_dict": ["idfobjects"],
"partition_ratio": ["idfobjects"],
"net_conditioned_building_area": ["idfobjects"],
"energyplus_its": ["annual", "design_day"],
"tmp_dir": ["idfobjects"],
}
_independant_vars = set(chain(*list(_dependencies.values())))
_dependant_vars = set(_dependencies.keys())
_initial_postition = itertools.count(start=1)
def _reset_dependant_vars(self, name):
_reverse_dependencies = {}
for k, v in self._dependencies.items():
for x in v:
_reverse_dependencies.setdefault(x, []).append(k)
for var in _reverse_dependencies[name]:
super().__setattr__(f"_{var}", None)
def __setattr__(self, key, value):
"""Set attribute."""
propobj = getattr(self.__class__, key, None)
if isinstance(propobj, property):
if propobj.fset is None:
raise AttributeError("Cannot set attribute")
# self.__set_on_dependencies(key.strip("_"), value)
else:
propobj.fset(self, value)
self.__set_on_dependencies(key, value)
else:
self.__set_on_dependencies(key, value)
def __set_on_dependencies(self, key, value):
if key in self._dependant_vars:
raise AttributeError("Cannot set this value.")
if key in self._independant_vars:
self._reset_dependant_vars(key)
key = f"_{key}"
super(IDF, self).__setattr__(key, value)
def __init__(
self,
idfname: Optional[Union[str, IO, Path]] = None,
epw=None,
as_version: Union[str, EnergyPlusVersion] = settings.ep_version,
annual=False,
design_day=False,
expandobjects=False,
convert=False,
verbose=settings.log_console,
readvars=True,
prep_outputs=True,
include=None,
custom_processes=None,
output_suffix="L",
epmacro=False,
keep_data=True,
keep_data_err=True,
position=0,
name=None,
output_directory=None,
outputtype="standard",
iddname: Optional[Union[str, IO, Path]] = None,
**kwargs,
):
"""Initialize an IDF object.
Args:
output_directory:
idfname (str or os.PathLike): The path of the idf file to read. If none,
an in-memory IDF object is generated.
epw (str or os.PathLike): The weather-file. If None, epw can be specified in
IDF.simulate().
as_version (str): Specify the target EnergyPlus version for the IDF model.
If as_version is higher than the file version, the model will be
transitioned to the target version. This will not overwrite the file
unless IDF.save() is invoked. See :meth:`IDF.save`,
:meth:`IDF.saveas` and :meth:`IDF.savecopy` for other IO operations
on IDF objects.
annual (bool): If True then force annual simulation (default: False).
design_day (bool): Force design-day-only simulation (default: False).
expandobjects (bool): Run ExpandObjects prior to simulation (default: True).
convert (bool): If True, only convert IDF->epJSON or epJSON->IDF.
outputtype (str): Specifies the idf string representation of the model.
Choices are: "standard", "nocomment1", "nocomment2", "compressed".
EnergyPlus args:
tmp_dir=None,
as_version=None,
prep_outputs=True,
include=None,
keep_original=True,
"""
# Set independents to there original values
if include is None:
include = []
self.idfname = idfname
self.epw = epw
self.file_version = kwargs.get("file_version", None)
self.as_version = as_version if as_version else settings.ep_version
self._custom_processes = custom_processes
self._include = include
self.keep_data_err = keep_data_err
self._keep_data = keep_data
self.output_suffix = output_suffix
self.verbose = verbose
self.readvars = readvars
self.expandobjects = expandobjects
self.convert = convert
self.epmacro = epmacro
self.design_day = design_day
self.annual = annual
self.prep_outputs = prep_outputs
self._position = position
self.output_prefix = None
self.name = (
name
if name is not None
else self.idfname.basename()
if isinstance(self.idfname, Path)
else None
)
self.output_directory = output_directory
# Set dependants to None
self._iddname = Path(iddname) if iddname is not None else None
self._idd_info = None
self._idd_index = None
self._idd_version = None
self._idfobjects = None
self._block = None
self._model = None
self._sql = None
self._sql_file = None
self._htm = None
self._original_ep_version = None
self._schedules_dict = None
self._outputs = None
self._partition_ratio = None
self._area_conditioned = None
self._area_unconditioned = None
self._area_total = None
self._schedules = None
self._meters = None
self._variables = None
self._energyplus_its = 0
self._sim_id = None
self._sim_timestamp = None
self.outputtype = outputtype
self.original_idfname = self.idfname # Save original
try:
# load the idf object by asserting self.idd_info
assert self.idd_info
except Exception as e:
raise e
else:
self._original_cache = hash_model(self)
if self.file_version < self.as_version:
self.upgrade(to_version=self.as_version, overwrite=False)
finally:
# Set model outputs
self._outputs = Outputs(idf=self)
if self.prep_outputs:
(
self._outputs.add_basics()
.add_umi_template_outputs()
.add_custom(outputs=self.prep_outputs)
.add_profile_gas_elect_ouputs()
.apply()
)
@property
def outputtype(self):
return self._outputtype
@outputtype.setter
def outputtype(self, value):
"""Get or set the outputtype for the idf string representation of self."""
assert value in self.OUTPUTTYPES, (
f'Invalid input "{value}" for output_type.'
f"\nOutput type must be one of the following: {self.OUTPUTTYPES}"
)
self._outputtype = value
def __str__(self):
"""Return the name of IDF model."""
return self.name
def __repr__(self):
"""Return a representation of self."""
if self.sim_info is not None:
sim_info = tabulate(self.sim_info.T, tablefmt="orgtbl")
sim_info += f"\n\tFiles at '{self.simulation_dir}'"
else:
sim_info = "\tNot yet simulated"
body = "\n".join([f"IDF object {self.name}", f"at {self.idfname}"])
body += f"\n\tVersion {self.file_version}\nSimulation Info:\n"
body += sim_info
return f"<{body}>"
def __copy__(self):
"""Get a copy of self."""
return self.copy()
[docs] @classmethod
def from_example_files(cls, example_name, epw=None, **kwargs):
"""Load an IDF model from the ExampleFiles folder by name.
Examples:
idf = IDF.from_example_files(
"minimal",
"USA_IL_Chicago-OHare.Intl.AP.725300_TMY3.epw"
)
Args:
example_name (str): The name of the example file to load.
epw (str): The name of the weather file contained the WeatherData folder
or the path to a specific weather file.
**kwargs: keyword arguments passed to the IDF constructor.
Returns:
(IDF): An IDF model.
"""
from pathlib import Path as Pathlib
example_name = Path(example_name)
file = next(
iter(
Pathlib(
EnergyPlusVersion.current().current_install_dir / "ExampleFiles"
).rglob(f"{example_name.stem}.idf")
)
)
if epw is not None:
epw = Path(epw)
if not epw.exists():
epw = next(
iter(
Pathlib(
EnergyPlusVersion.current().current_install_dir
/ "WeatherData"
).rglob(f"{epw.stem}.epw")
),
epw,
)
return cls(file, epw=epw, **kwargs)
[docs] def setiddname(self, iddname, testing=False):
"""Set EnergyPlus IDD path for model.
Sets the version of EnergyPlus which is to be used by eppy.
Args:
iddname (str): Path to the IDD file.
testing (bool):
"""
self.iddname = iddname
self.idd_info = None
self.block = None
[docs] def read(self):
"""Read the IDF file and the IDD file.
If the IDD file had already been read, it will not be read again.
Read populates the following data structures:
- idfobjects : list
- model : list
- idd_info : list
- idd_index : dict
"""
if self.getiddname() is None:
errortxt = (
"IDD file needed to read the idf file. "
"Set it using IDF.setiddname(iddfile)"
)
raise IDDNotSetError(errortxt)
readout = idfreader1(
self.idfname, self.iddname, self, commdct=self.idd_info, block=self.block
)
(self.idfobjects, block, self.model, idd_info, idd_index, idd_version) = readout
self.setidd(idd_info, idd_index, block, idd_version)
[docs] def getiddname(self):
"""Get the name of the current IDD used by eppy."""
return self.iddname
[docs] def setidd(self, iddinfo, iddindex, block, idd_version):
"""Set the IDD to be used by eppy.
Args:
iddindex:
iddinfo (list): Comments and metadata about fields in the IDD.
block (list): Field names in the IDD.
"""
self.idd_info = iddinfo
self.block = block
self.idd_index = iddindex
self.idd_version = idd_version
def _read_idf(self):
"""Read idf file and return bunches."""
self._idd_info = IDF.IDD.get(str(self.as_version), None)
self._idd_index = IDF.IDDINDEX.get(str(self.as_version), None)
self._block = IDF.BLOCK.get(str(self.as_version), None)
bunchdt, block, data, commdct, idd_index, versiontuple = idfreader1(
self.idfname, self.iddname, self, commdct=self._idd_info, block=self._block
)
self._block = IDF.BLOCK[str(self.as_version)] = block
self._idd_info = IDF.IDD[str(self.as_version)] = commdct
self._idd_index = IDF.IDDINDEX[str(self.as_version)] = idd_index
self._idfobjects = bunchdt
self._model = data
self._idd_version = versiontuple
@property
def block(self) -> list:
"""Set EnergyPlus field ID names of the IDF from the IDD."""
if self._block is None:
self._read_idf()
return self._block
@property
def idd_info(self) -> list:
"""Set comments and metadata about fields in the IDD."""
if self._idd_info is None:
self._read_idf()
return self._idd_info
@property
def idd_index(self) -> dict:
"""Set pair of dicts used for fast lookups of names of groups of objects."""
if self._idd_index is None:
self._read_idf()
return self._idd_index
@property
def idfobjects(self) -> dict:
"""Set dict of lists of idf_MSequence objects in the IDF."""
if self._idfobjects is None:
self._read_idf()
return self._idfobjects
@property
def model(self) -> Eplusdata:
"""Set Eplusdata object containing representations of IDF objects."""
if self._model is None:
self._read_idf()
return self._model
@property
def idd_version(self) -> tuple:
"""Return the version of the iddname as a tuple."""
if self._idd_version is None:
self._read_idf()
return self._idd_version
@property
def iddname(self) -> Path:
"""Get or set the iddname path used to parse the idf model."""
if self._iddname is None:
if self.file_version > self.as_version:
raise EnergyPlusVersionError(
f"{self.as_version} cannot be lower then "
f"the version number set in the file: {self.file_version}"
)
self._iddname = self.file_version.current_idd_path
return self._iddname
@iddname.setter
def iddname(self, value):
if value is not None:
value = Path(value)
self._iddname = value
@property
def file_version(self) -> EnergyPlusVersion:
"""Return the :class:`EnergyPlusVersion` of the idf text file."""
if self._file_version is None:
return EnergyPlusVersion(get_idf_version(self.idfname))
else:
return self._file_version
@file_version.setter
def file_version(self, value):
if value is None:
self._file_version = None
else:
self._file_version = EnergyPlusVersion(value)
@property
def custom_processes(self) -> list:
"""Return list of callables. Called on the output files."""
return self._custom_processes
@property
def include(self) -> list:
"""Return list of external files attached to model."""
return self._include
@property
def keep_data_err(self) -> bool:
"""bool: If True, error files are copied back into self.output_folder."""
return self._keep_data_err
@keep_data_err.setter
def keep_data_err(self, value):
if not isinstance(value, bool):
raise TypeError("'keep_data_err' needs to be a bool")
self._keep_data_err = value
@property
def keep_data(self) -> bool:
"""bool: If True, keep data folder."""
return self._keep_data
# region User-Defined Properties (have setter)
@property
def output_suffix(self) -> str:
"""str: The suffix style for output file names (default: L).
- L: Legacy (e.g., eplustbl.csv)
- C: Capital (e.g., eplusTable.csv)
- D: Dash (e.g., eplus-table.csv)
"""
return self._output_suffix
@output_suffix.setter
def output_suffix(self, value):
choices = ["L", "C", "D"]
if value not in choices:
raise ValueError(f"Choices of 'output_suffix' are {choices}")
self._output_suffix = value
@property
def idfname(self) -> Union[Path, StringIO]:
"""Path: The path of the active (parsed) idf model."""
if self._idfname is None:
idfname = StringIO(f"VERSION, {self.as_version};")
self._idfname = idfname
self._reset_dependant_vars("idfname")
else:
if isinstance(self._idfname, StringIO):
pass
else:
self._idfname = Path(self._idfname).expand()
return self._idfname
@idfname.setter
def idfname(self, value):
if not value:
self._idfname = None
elif not isinstance(value, (str, os.PathLike, StringIO, IOBase)):
raise ValueError(f"IDF path must be Path-Like, not {type(value)}")
elif isinstance(value, (str, os.PathLike)):
self._idfname = Path(value).expand()
else:
self._idfname = value
self._reset_dependant_vars("idfname")
@property
def epw(self) -> Path:
"""Path: The weather file path."""
if self._epw is not None:
return Path(self._epw).expand()
@epw.setter
def epw(self, value):
"""Set the epw file path."""
if value:
self._epw = Path(value).expand()
else:
self._epw = None
@property
def verbose(self):
"""bool: If True, print outputs to logging module.
See Also:
:ref:`archetypal.utils.config`
"""
return self._verbose
@verbose.setter
def verbose(self, value):
"""Set verbose status for simulation."""
if not isinstance(value, bool):
raise TypeError("'verbose' needs to be a bool")
self._verbose = value
@property
def expandobjects(self) -> bool:
"""bool: If True, run ExpandObjects prior to simulation."""
return self._expandobjects
@expandobjects.setter
def expandobjects(self, value):
"""Set expandobjects."""
if not isinstance(value, bool):
raise TypeError("'expandobjects' needs to be a bool")
self._expandobjects = value
@property
def readvars(self) -> bool:
"""bool: If True, run ReadVarsESO after simulation."""
return self._readvars
@readvars.setter
def readvars(self, value):
"""Set readvars."""
if not isinstance(value, bool):
raise TypeError("'readvars' needs to be a bool")
self._readvars = value
@property
def epmacro(self) -> bool:
"""bool: If True, run EPMacro prior to simulation."""
return self._epmacro
@epmacro.setter
def epmacro(self, value):
"""Set epmacro."""
if not isinstance(value, bool):
raise TypeError("'epmacro' needs to be a bool")
self._epmacro = value
@property
def design_day(self) -> bool:
"""bool: If True, force design-day-only simulation."""
return self._design_day
@design_day.setter
def design_day(self, value):
"""Set design_day."""
if not isinstance(value, bool):
raise TypeError("'design_day' needs to be a bool")
self._design_day = value
@property
def annual(self) -> bool:
"""bool: If True, force annual simulation."""
return self._annual
@annual.setter
def annual(self, value):
"""Set annual."""
if not isinstance(value, bool):
raise TypeError("'annual' needs to be a bool")
self._annual = value
@property
def convert(self) -> bool:
"""bool: If True, only convert IDF->epJSON or epJSON->IDF.
Dependent on input file type. No simulation.
"""
return self._convert
@convert.setter
def convert(self, value):
if not isinstance(value, bool):
raise TypeError("'convert' needs to be a bool")
self._convert = value
@property
def prep_outputs(self):
"""Bool or set list of custom outputs."""
return self._prep_outputs
@prep_outputs.setter
def prep_outputs(self, value):
self._prep_outputs = value
@property
def as_version(self):
"""Specify the desired :class:`EnergyPlusVersion` for the IDF model."""
if self._as_version is None:
self._as_version = EnergyPlusVersion.current()
return EnergyPlusVersion(self._as_version)
@as_version.setter
def as_version(self, value):
# Parse value and check if above or bellow
self._as_version = EnergyPlusVersion(value)
@property
def output_directory(self) -> Path:
"""Path: The output directory based on the hashing of the original file.
Notes:
The hashing is performed before transitions or modifications. The directory
is not created! Use `self.output_directory.makedir_p()` to create it
without an error if it exists.
"""
if self._output_directory is None:
cache_filename = self._original_cache
output_directory = settings.cache_folder / cache_filename
self._output_directory = output_directory.expand()
return Path(self._output_directory)
@output_directory.setter
def output_directory(self, value):
if value:
value = Path(value)
self._output_directory = value
@property
def output_prefix(self) -> str:
"""str: Prefix for output file names (default: eplus)."""
if self._output_prefix is None:
self._output_prefix = "eplus"
return self._output_prefix
@output_prefix.setter
def output_prefix(self, value):
if value and not isinstance(value, str):
raise TypeError("'output_prefix' needs to be a string")
self._output_prefix = value
@property
def sim_id(self) -> str:
"""str: The unique Id of the simulation.
Based on a subset of hashed variables:
- The idf model itself.
- epw
- annual
- design_day
- readvars
- as_version
"""
self._sim_id = hash_model(
self,
epw=self.epw,
annual=self.annual,
design_day=self.design_day,
readvars=self.readvars,
ep_version=self.as_version,
include=self.include,
)
return self._sim_id
# endregion
@property
def sim_info(self) -> Optional[DataFrame]:
"""DataFrame: Unique number generated for a simulation."""
if self.sql_file is not None:
with sqlite3.connect(self.sql_file) as conn:
sql_query = """select * from Simulations"""
sim_info = pd.read_sql_query(sql_query, con=conn)
return sim_info
else:
return None
@property
def sim_timestamp(self) -> Union[str, Series]:
"""Return the simulation timestamp or "Never" if not ran yet."""
if self.sim_info is None:
return "Never"
else:
return self.sim_info.TimeStamp
@property
def position(self) -> int:
"""int: Position for the progress bar."""
return self._position
@property
def idfversionupdater_dir(self) -> Path:
"""Path: The path of the IDFVersionUpdater folder.
Uses the current module's ep_version.
"""
return (
EnergyPlusVersion.current().current_install_dir
/ "PreProcess"
/ "IDFVersionUpdater"
).expand()
@property
def name(self) -> str:
"""str: Name of the idf model.
Can include the extension (.idf).
"""
return self._name
@name.setter
def name(self, value):
if value is None:
value = f"{uuid.uuid1()}.idf"
elif ".idf" not in value:
value = Path(value).stem + ".idf"
self._name = value
[docs] def sql(self) -> dict:
"""Get the sql table report."""
if self._sql is None:
try:
sql_dict = get_report(
self.idfname,
self.simulation_dir,
output_report="sql",
output_prefix=self.output_prefix,
)
except FileNotFoundError:
# check if htm output is in file
sql_object = self.anidfobject(
key="Output:SQLite".upper(), Option_Type="SimpleAndTabular"
)
if sql_object not in self.idfobjects["Output:SQLite".upper()]:
self.addidfobject(sql_object)
return self.simulate().sql()
except Exception as e:
raise e
else:
self._sql = sql_dict
return self._sql
[docs] def htm(self) -> dict:
"""Get the htm table report."""
if self._htm is None:
try:
htm_dict = get_report(
self.idfname,
self.simulation_dir,
output_report="htm",
output_prefix=self.output_prefix,
)
except FileNotFoundError:
return self.simulate().htm()
else:
self._htm = htm_dict
return self._htm
@property
def energyplus_its(self) -> int:
"""Return number of iterations needed to complete simulation."""
if self._energyplus_its is None:
self._energyplus_its = 0
return self._energyplus_its
[docs] def open_htm(self):
"""Open .htm file in browser."""
import webbrowser
html, *_ = self.simulation_dir.files("*.htm")
webbrowser.open(html.abspath())
[docs] def open_idf(self):
"""Open file in correct version of Ep-Launch."""
if isinstance(self.idfname, StringIO):
# make a temporary file if inmemery IDF.
filepath = self.savecopy(self.output_directory.makedirs_p() / self.name)
else:
filepath = self.idfname
app_path_guess = (
self.file_version.current_install_dir / "PreProcess" / "IDFEditor"
)
find_and_launch("IDFEditor", app_path_guess, filepath.abspath())
[docs] def open_last_simulation(self):
"""Open last simulation in Ep-Launch."""
filepath, *_ = self.simulation_dir.files("*.idf")
app_path_guess = self.file_version.current_install_dir
find_and_launch("EP-Launch", app_path_guess, filepath.abspath())
[docs] def open_err(self):
"""Open last simulation err file in texteditor."""
import webbrowser
filepath, *_ = self.simulation_dir.files("*.err")
webbrowser.open(filepath.abspath())
[docs] def open_mdd(self):
"""Open .mdd file in browser.
This file shows all the report meters along with their “availability” for the
current input file.
"""
import webbrowser
mdd, *_ = self.simulation_dir.files("*.mdd")
webbrowser.open(mdd.abspath())
[docs] def open_mtd(self):
"""Open .mtd file in browser.
This file contains the “meter details” for the run. This shows what report
variables are on which meters and vice versa – which meters contain what
report variables.
"""
import webbrowser
mtd, *_ = self.simulation_dir.files("*.mtd")
webbrowser.open(mtd.abspath())
@property
def sql_file(self):
"""Get the sql file path."""
try:
file, *_ = self.simulation_dir.files("*out.sql")
except (FileNotFoundError, ValueError):
return None
return file.expand()
@property
def mtd_file(self) -> Path:
"""Get the mtd file path."""
try:
file, *_ = self.simulation_dir.files("*.mtd")
except (FileNotFoundError, ValueError):
return self.simulate().mtd_file
return file.expand()
@property
def net_conditioned_building_area(self) -> float:
"""Return the total conditioned area of a building.
Takes into account zone multipliers.
"""
if self._area_conditioned is None:
if self.simulation_dir.exists():
with sqlite3.connect(self.sql_file) as conn:
sql_query = """
SELECT t.Value
FROM TabularDataWithStrings t
WHERE TableName == 'Building Area'
and ColumnName == 'Area'
and RowName == 'Net Conditioned Building Area';
"""
(res,) = conn.execute(sql_query).fetchone()
self._area_conditioned = float(res)
else:
area = 0
zones = self.idfobjects["ZONE"]
zone: EpBunch
for zone in zones:
for surface in zone.zonesurfaces:
if hasattr(surface, "tilt"):
if surface.tilt == 180.0:
part_of = int(
zone.Part_of_Total_Floor_Area.upper() != "NO"
)
multiplier = float(
zone.Multiplier if zone.Multiplier != "" else 1
)
area += surface.area * multiplier * part_of
self._area_conditioned = area
return self._area_conditioned
@property
def unconditioned_building_area(self) -> float:
"""Return the Unconditioned Building Area."""
if self._area_unconditioned is None:
if self.simulation_dir.exists():
with sqlite3.connect(self.sql_file) as conn:
sql_query = """
SELECT t.Value
FROM TabularDataWithStrings t
WHERE TableName == 'Building Area'
and ColumnName == 'Area'
and RowName == 'Unconditioned Building Area';
"""
(res,) = conn.execute(sql_query).fetchone()
self._area_unconditioned = float(res)
else:
area = 0
zones = self.idfobjects["ZONE"]
zone: EpBunch
for zone in zones:
for surface in zone.zonesurfaces:
if hasattr(surface, "tilt"):
if surface.tilt == 180.0:
part_of = int(
zone.Part_of_Total_Floor_Area.upper() == "NO"
)
multiplier = float(
zone.Multiplier if zone.Multiplier != "" else 1
)
area += surface.area * multiplier * part_of
self._area_unconditioned = area
return self._area_unconditioned
@property
def total_building_area(self) -> float:
"""Return the Total Building Area."""
if self._area_total is None:
if self.simulation_dir.exists():
with sqlite3.connect(self.sql_file) as conn:
sql_query = """
SELECT t.Value
FROM TabularDataWithStrings t
WHERE TableName == 'Building Area'
and ColumnName == 'Area' and RowName == 'Total Building Area';
"""
(res,) = conn.execute(sql_query).fetchone()
self._area_total = float(res)
else:
area = 0
zones = self.idfobjects["ZONE"]
zone: EpBunch
for zone in zones:
for surface in zone.zonesurfaces:
if hasattr(surface, "tilt"):
if surface.tilt == 180.0:
multiplier = float(
zone.Multiplier if zone.Multiplier != "" else 1
)
area += surface.area * multiplier
self._area_total = area
return self._area_total
@property
def total_building_volume(self):
return NotImplemented()
@staticmethod
def _get_volume_from_surfs(zone_surfs):
"""Calculate the volume of a zone only and only if the surfaces are such
that you can find a point inside so that you can connect every vertex to
the point without crossing a face.
Adapted from: https://stackoverflow.com/a/19125446
Args:
zone_surfs (list): List of zone surfaces (EpBunch)
"""
vol = 0
for surf in zone_surfs:
polygon_d = Polygon3D(surf.coords) # create Polygon3D from surf
n = len(polygon_d.vertices_list)
v2 = polygon_d[0]
x2 = v2.x
y2 = v2.y
z2 = v2.z
for i in range(1, n - 1):
v0 = polygon_d[i]
x0 = v0.x
y0 = v0.y
z0 = v0.z
v1 = polygon_d[i + 1]
x1 = v1.x
y1 = v1.y
z1 = v1.z
# Add volume of tetrahedron formed by triangle and origin
vol += math.fabs(
x0 * y1 * z2
+ x1 * y2 * z0
+ x2 * y0 * z1
- x0 * y2 * z1
- x1 * y0 * z2
- x2 * y1 * z0
)
return vol / 6.0
@property
def partition_ratio(self) -> float:
"""float: Lineal meters of partitions per m2 of floor area."""
if self._partition_ratio is None:
partition_lineal = 0
zones = self.idfobjects["ZONE"]
zone: EpBunch
for zone in zones:
for surface in [
surf
for surf in zone.zonesurfaces
if surf.key.upper() not in ["INTERNALMASS", "WINDOWSHADINGCONTROL"]
]:
if hasattr(surface, "tilt"):
if (
surface.tilt == 90.0
and surface.Outside_Boundary_Condition != "Outdoors"
):
multiplier = float(
zone.Multiplier if zone.Multiplier != "" else 1
)
partition_lineal += surface.width * multiplier
self._partition_ratio = partition_lineal / max(
self.net_conditioned_building_area, self.unconditioned_building_area
)
return self._partition_ratio
@property
def simulation_files(self) -> list:
"""list: The list of files generated by the simulation."""
try:
return self.simulation_dir.files()
except FileNotFoundError:
return []
@property
def simulation_dir(self) -> Path:
"""Path: The path where simulation results are stored."""
try:
return (self.output_directory / self.sim_id).expand()
except AttributeError:
return Path()
@property
def schedules_dict(self) -> dict:
"""Return the dict of {NAME: schedule} in the model."""
if self._schedules_dict is None:
self._schedules_dict = self._get_all_schedules()
return self._schedules_dict
@property
def outputs(self) -> Outputs:
"""Return the Outputs class associated with the model."""
return self._outputs
@property
def day_of_week_for_start_day(self):
"""Get day of week for start day for the first found RUNPERIOD.
Monday = 0 .. Sunday = 6
"""
import calendar
run_period = next(iter(self.idfobjects["RUNPERIOD"]), None)
if run_period:
day = run_period["Day_of_Week_for_Start_Day"]
else:
log("model does not contain a 'RunPeriod'. Defaulting to Sunday.")
day = "Sunday"
if day.lower() == "sunday":
return calendar.SUNDAY
elif day.lower() == "monday":
return calendar.MONDAY
elif day.lower() == "tuesday":
return calendar.TUESDAY
elif day.lower() == "wednesday":
return calendar.WEDNESDAY
elif day.lower() == "thursday":
return calendar.THURSDAY
elif day.lower() == "friday":
return calendar.FRIDAY
elif day.lower() == "saturday":
return calendar.SATURDAY
else:
# field is null
return 6 # E+ default is Sunday
@property
def meters(self) -> Meters:
"""List of available meters for the :class:`IDF` model.
The :class:`IDF` model must be simulated once (to retrieve the .mdd file).
The listed meters may or may not be included in the idf file. If they are
not, the output is added to the file and the model is simulated again. The
output is appended to the :attr:`IDF.idfobjects` list, but will not overwrite
the original idf file, unless :meth:`IDF.save` is called.
Hint:
Call `idf.meters.<output_group>.<meter_name>.values()` to retreive a
time-series based on the :class:`pandas.Series` class which can be plotted.
See :class:`Meter` and :class:`EnergySeries` for more information.
Example:
The IDF.meters attribute is populated with meters categories
(`Output:Meter` or `Output:Meter:Cumulative`) and each category is
populated with all the available meters.
.. code-block::
>>> IDF.meters.OutputMeter.WaterSystems__MainsWater
>>> IDF.meters.OutputMeterCumulative.WaterSystems__MainsWater
"""
if self._meters is None:
try:
self.simulation_dir.files("*.mdd")
except FileNotFoundError:
raise Exception(
"call IDF.simulate() at least once to get a list of "
"possible meters"
)
else:
self._meters = Meters(self)
return self._meters
@property
def variables(self):
"""List of available meters for the :class:`IDF` model.
The :class:`IDF` model must be simulated once (to retrieve the .mdd file).
The listed meters may or may not be included in the idf file. If they are
not, the output is added to the file and the model is simulated again. The
output is appended to the :attr:`IDF.idfobjects` list, but will not overwrite
the
original idf file, unless :meth:`IDF.save` is called.
Hint:
Call `idf.meters.<output_group>.<meter_name>.values()` to retreive a
time-series based on the :class:`pandas.Series` class which can be plotted.
See :class:`Meter` and :class:`EnergySeries` for more information.
Example:
The IDF.meters attribute is populated with meters categories
(`Output:Meter` or `Output:Meter:Cumulative`) and each category is
populated with all the available meters.
.. code-block::
>>> IDF.variables.OutputVariable
>>> IDF.variables.OutputVariable
"""
if self._variables is None:
try:
self.simulation_dir.files("*.rdd")
except FileNotFoundError:
return "call IDF.simulate() to get a list of possible variables"
else:
self._variables = Variables(self)
return self._variables
[docs] def simulate(self, force=False, **kwargs):
"""Execute EnergyPlus.
Specified kwargs overwrite IDF parameters. ExpandObjects, Basement and Slab
preprocessors are ran before EnergyPlus.
Does not return anything.
Args:
force (bool): Force simulation even though results exist in
`self.simulation_dir`.
Keyword Args:
eplus_file (str): path to the idf file.
weather_file (str): path to the EPW weather file.
output_directory (str, optional): path to the output folder.
ep_version (str, optional): EnergyPlus executable version to use, eg: 9-2-0
output_report: 'sql' or 'htm'
prep_outputs (bool or list, optional): if True, meters and variable
outputs will be appended to the idf files. Can also specify custom
outputs as list of ep-object outputs.
keep_data (bool): If True, files created by EnergyPlus are saved to the
tmp_dir.
annual (bool): If True then force annual simulation (default: False)
design_day (bool): Force design-day-only simulation (default: False)
epmacro (bool): Run EPMacro prior to simulation (default: False)
expandobjects (bool): Run ExpandObjects prior to simulation (default:
True)
readvars (bool): Run ReadVarsESO after simulation (default: False)
output_prefix (str, optional): Prefix for output file names.
output_suffix (str, optional): Suffix style for output file names
(default: L) Choices are:
- L: Legacy (e.g., eplustbl.csv)
- C: Capital (e.g., eplusTable.csv)
- D: Dash (e.g., eplus-table.csv)
version (bool, optional): Display version information (default: False)
verbose (str): Set verbosity of runtime messages (default: v) v: verbose
q: quiet
keep_data_err (bool): If True, errored directory where simulation
occurred is
kept.
include (str, optional): List input files that need to be copied to the
simulation directory. If a string is provided, it should be in a glob
form (see :meth:`pathlib.Path.glob`).
process_files (bool): If True, process the output files and load to a
:class:`~pandas.DataFrame`. Custom processes can be passed using the
:attr:`custom_processes` attribute.
custom_processes (dict(Callback)): if provided, it has to be a
dictionary with the keys being a glob (see
:meth:`pathlib.Path.glob`), and
the value a Callback taking as signature `callback(file: str,
working_dir, simulname) -> Any` All the file matching this glob will
be processed by this callback. Note: they will still be processed by
pandas.read_csv (if they are csv files), resulting in duplicate. The
only way to bypass this behavior is to add the key "*.csv" to that
dictionary.
return_idf (bool): If True, returns the :class:`IDF` object part of the
return tuple.
return_files (bool): It True, all files paths created by the EnergyPlus
run are returned.
Raises:
EnergyPlusProcessError: If an issue occurs with the execution of the
energyplus command.
See Also:
:attr:`IDF.simulation_files`, :attr:`IDF.processed_results` for simulation
outputs.
"""
if (
self.simulation_dir.exists() and not force
): # don't simulate if results exists
return self
# First, update keys with new values
for key, value in kwargs.items():
if f"_{key}" in self.__dict__.keys():
setattr(self, key, value)
if self.as_version != EnergyPlusVersion(self.idd_version):
raise EnergyPlusVersionError(
None, self.idfname, EnergyPlusVersion(self.idd_version), self.as_version
)
include = self.include
if isinstance(include, str):
include = Path().abspath().glob(include)
elif include is not None:
include = [Path(file) for file in include]
# check if a weather file is defined
if not getattr(self, "epw", None):
raise EnergyPlusWeatherError(
f"No weather file specified with {self}. Set 'epw' in IDF("
f"filename, epw='weather.epw').simulate() or in IDF.simulate("
f"epw='weather.epw')"
)
# Todo: Add EpMacro Thread -> if exist in.imf "%program_path%EPMacro"
# Run the expandobjects program if necessary
tmp = (
self.output_directory.makedirs_p() / "expandobjects_run_"
+ str(uuid.uuid1())[0:8]
).mkdir()
# Run the ExpandObjects preprocessor program
expandobjects_thread = ExpandObjectsThread(self, tmp)
try:
expandobjects_thread.start()
expandobjects_thread.join()
# Give time to the subprocess to finish completely
while expandobjects_thread.is_alive():
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
expandobjects_thread.stop()
finally:
tmp.rmtree(ignore_errors=True)
e = expandobjects_thread.exception
if e is not None:
raise e
# Run the Basement preprocessor program if necessary
tmp = (
self.output_directory.makedirs_p() / "runBasement_run_"
+ str(uuid.uuid1())[0:8]
).mkdir()
basement_thread = BasementThread(self, tmp)
try:
basement_thread.start()
basement_thread.join()
# Give time to the subprocess to finish completely
while basement_thread.is_alive():
time.sleep(1)
except KeyboardInterrupt:
basement_thread.stop()
finally:
tmp.rmtree(ignore_errors=True)
e = basement_thread.exception
if e is not None:
raise e
# Run the Slab preprocessor program if necessary
tmp = (
self.output_directory.makedirs_p() / "runSlab_run_" + str(uuid.uuid1())[0:8]
).mkdir()
slab_thread = SlabThread(self, tmp)
try:
slab_thread.start()
slab_thread.join()
# Give time to the subprocess to finish completely
while slab_thread.is_alive():
time.sleep(1)
except KeyboardInterrupt:
slab_thread.stop()
finally:
tmp.rmtree(ignore_errors=True)
e = slab_thread.exception
if e is not None:
raise e
# Run the energyplus program
tmp = (
self.output_directory.makedirs_p() / "eplus_run_" + str(uuid.uuid1())[0:8]
).mkdir()
running_simulation_thread = EnergyPlusThread(self, tmp)
try:
running_simulation_thread.start()
running_simulation_thread.join()
# Give time to the subprocess to finish completely
while running_simulation_thread.is_alive():
time.sleep(1)
except KeyboardInterrupt:
running_simulation_thread.stop()
finally:
tmp.rmtree(ignore_errors=True)
e = running_simulation_thread.exception
if e is not None:
raise e
return self
[docs] def savecopy(self, filename, lineendings="default", encoding="latin-1"):
"""Save a copy of the file with the filename passed.
Args:
filename (str): Filepath to save the file.
lineendings (str): Line endings to use in the saved file. Options are
'default', 'windows' and 'unix' the default is 'default' which uses
the line endings for the current system.
encoding (str): Encoding to use for the saved file. The default is
'latin-1' which is compatible with the EnergyPlus IDFEditor.
Returns:
Path: The new file path.
"""
super(IDF, self).save(filename, lineendings, encoding)
return Path(filename)
[docs] def copy(self):
"""Return a copy of self as an in memory IDF.
The copy is a new IDF object with the same parameters and arguments as self
but is not attached to an file. Use IDF.saveas("idfname.idf", inplace=True)
to save the copy to a file inplace. self.idfname will now be idfname.idf
"""
return self.saveas(io.StringIO(""))
[docs] def save(self, lineendings="default", encoding="latin-1", **kwargs):
"""Write the IDF model to the text file.
Uses :meth:`~eppy.modeleditor.IDF.save` but also brings over existing
simulation results.
Args:
lineendings (str) : Line endings to use in the saved file. Options are
'default', 'windows' and 'unix' the default is 'default' which uses
the line endings for the current system.
encoding (str): Encoding to use for the saved file. The default is
'latin-1' which is compatible with the EnergyPlus IDFEditor.
Returns:
IDF: The IDF model
"""
super(IDF, self).save(
filename=self.idfname, lineendings=lineendings, encoding=encoding
)
log(f"saved '{self.name}' at '{self.idfname}'")
return self
[docs] def saveas(
self, filename, lineendings="default", encoding="latin-1", inplace=False
):
"""Save the IDF model as.
Writes a new text file and load a new instance of the IDF class (new object).
Args:
filename (str): Filepath to save the file. If None then use the IDF.idfname
parameter. Also accepts a file handle.
lineendings (str) : Line endings to use in the saved file. Options are
'default', 'windows' and 'unix' the default is 'default' which uses
the line endings for the current system.
encoding (str): Encoding to use for the saved file. The default is
'latin-1' which is compatible with the EnergyPlus IDFEditor.
inplace (bool): If True, applies the new filename to self directly,
else a new object is returned with the new filename.
Returns:
IDF: A new IDF object based on the new location file.
"""
super(IDF, self).save(
filename=filename, lineendings=lineendings, encoding=encoding
)
import inspect
sig = inspect.signature(IDF.__init__)
kwargs = {
key: getattr(self, key)
for key in [a for a in sig.parameters]
if key not in ["self", "idfname", "kwargs"]
}
as_idf = IDF(filename, **kwargs)
# copy simulation_dir over to new location
file: Path
if self.simulation_files:
# If simulation files exist in cache, copy to new cache location
as_idf.simulation_dir.makedirs_p()
for file in self.simulation_files:
if self.output_prefix in file:
name = file.replace(self.output_prefix, as_idf.output_prefix)
name = Path(name).basename()
else:
name = file.basename()
try:
file.copy(as_idf.simulation_dir / name)
except shutil.SameFileError:
# A copy of self would have the same files in the simdir and
# throw an error.
pass
if inplace:
# If inplace, replace content of self with content of as_idf.
self.__dict__.update(as_idf.__dict__)
else:
# return the new object.
return as_idf
[docs] def process_results(self):
"""Return the list of processed results.
Processes are defined by :attr:`custom_processes` as a list of tuple(file,
result). A default process looks for csv files and tries to parse them into
:class:`~pandas.DataFrame` objects.
Returns:
list: List of two-tuples.
Info:
For processed_results to work more consistently, it may be necessary to
add the "readvars=True" parameter to :func:`IDF.simulate` as this one is
set to false by default.
"""
processes = {"*.csv": _process_csv}
custom_processes = self.custom_processes
if custom_processes:
processes.update(custom_processes)
try:
results = []
for glob, process in processes.items():
results.extend(
[
(
file.basename(),
process(
file,
working_dir=os.getcwd(),
simulname=self.output_prefix,
),
)
for file in self.simulation_dir.files(glob)
]
)
except FileNotFoundError:
raise ValueError("No results to process. Have you called IDF.simulate()?")
else:
return results
[docs] def add_idf_object_from_idf_string(self, idf_string):
"""Add an IDF object (or more than one) from an EnergyPlus text string.
Args:
idf_string (str): A text string fully describing an EnergyPlus object.
"""
# Load the idf_string as a new model.
loaded_string = IDF(
StringIO(idf_string),
file_version=self.file_version,
as_version=self.as_version,
prep_outputs=False,
)
# Loop on all objects and using self.newidfobject
added_objects = []
for sequence in loaded_string.idfobjects.values():
if sequence:
for obj in sequence:
data = obj.to_dict()
key = data.pop("key")
added_objects.append(self.newidfobject(key=key.upper(), **data))
del loaded_string # remove loaded_string model
return added_objects
[docs] def upgrade(self, to_version=None, overwrite=True):
"""`EnergyPlus` idf version updater using local transition program.
Update the EnergyPlus simulation file (.idf) to the latest available
EnergyPlus version installed on this machine. Optionally specify a version
(eg.: "9-2-0") to aim for a specific version. The output will be the path of
the updated file. The run is multiprocessing_safe.
Hint:
If attempting to upgrade an earlier version of EnergyPlus (pre-v7.2.0),
specific binaries need to be downloaded and copied to the
EnergyPlus*/PreProcess/IDFVersionUpdater folder. More info at
`Converting older version files
<http://energyplus.helpserve.com/Knowledgebase/List/Index/46
/converting-older-version-files>`_.
Args:
to_version (str, optional): EnergyPlus version in the form "X-X-X".
overwrite (bool): If True, original idf file is overwritten with new
transitioned file.
Raises:
EnergyPlusProcessError: If version updater fails.
EnergyPlusVersionError:
CalledProcessError:
"""
# First, set versions
if to_version is None:
to_version = EnergyPlusVersion.latest()
elif isinstance(to_version, (str, tuple)):
to_version = EnergyPlusVersion(to_version)
# second check if upgrade needed
if self.file_version == to_version:
return
elif self.file_version > to_version:
raise EnergyPlusVersionError(self.name, self.file_version, to_version)
else:
self.as_version = to_version # set version number
# execute transitions
tmp = (
self.output_directory / "Transition_run_" + str(uuid.uuid1())[0:8]
).makedirs_p()
slab_thread = TransitionThread(self, tmp, overwrite=overwrite)
slab_thread.start()
slab_thread.join()
while slab_thread.is_alive():
time.sleep(1)
tmp.rmtree(ignore_errors=True)
e = slab_thread.exception
if e is not None:
raise e
[docs] def wwr(self, azimuth_threshold=10, round_to=10):
"""Return the Window-to-Wall Ratio by major orientation.
Optionally round up the WWR value to nearest value (eg.: nearest 10 %).
Args:
azimuth_threshold (int): Defines the incremental major orientation
azimuth angle. Due to possible rounding errors, some surface
azimuth can be rounded to values different than the main
directions (eg.: 89 degrees instead of 90 degrees). Defaults to
increments of 10 degrees.
round_to (float): Optionally round the WWR value to nearest value
(eg.: nearest 10). If None, this is ignored and the float is
returned.
Returns:
(pd.DataFrame): A DataFrame with the total wall area, total window
area and WWR for each main orientation of the building.
"""
import math
def roundto(x, to=10.0):
"""Round up to closest `to` number."""
from builtins import round
if to and not math.isnan(x):
return int(round(x / to)) * to
else:
return x
total_surface_area = defaultdict(int)
total_window_area = defaultdict(int)
zones = self.idfobjects["ZONE"]
zone: EpBunch
for zone in zones:
multiplier = float(zone.Multiplier if zone.Multiplier != "" else 1)
for surface in [
surf
for surf in zone.zonesurfaces
if surf.key.upper() not in ["INTERNALMASS", "WINDOWSHADINGCONTROL"]
]:
if isclose(surface.tilt, 90, abs_tol=10):
if surface.Outside_Boundary_Condition.lower() == "outdoors":
surf_azim = roundto(surface.azimuth, to=azimuth_threshold)
total_surface_area[surf_azim] += surface.area * multiplier
for subsurface in surface.subsurfaces:
if hasattr(subsurface, "tilt"):
if isclose(subsurface.tilt, 90, abs_tol=10):
if subsurface.Surface_Type.lower() == "window":
surf_azim = roundto(
subsurface.azimuth, to=azimuth_threshold
)
total_window_area[surf_azim] += (
subsurface.area * multiplier
)
if isclose(subsurface.tilt, 180, abs_tol=80):
total_window_area["sky"] += subsurface.area * multiplier
# Fix azimuth = 360 which is the same as azimuth 0
total_surface_area[0] += total_surface_area.pop(360, 0)
total_window_area[0] += total_window_area.pop(360, 0)
# Create dataframe with wall_area, window_area and wwr as columns and azimuth
# as indexes
from sigfig import round
df = (
pd.DataFrame(
{"wall_area": total_surface_area, "window_area": total_window_area}
)
.rename_axis("Azimuth")
.fillna(0)
)
df.wall_area = df.wall_area.apply(round, decimals=1)
df.window_area = df.window_area.apply(round, decimals=1)
df["wwr"] = (df.window_area / df.wall_area).fillna(0).apply(round, 2)
df["wwr_rounded_%"] = (
(df.window_area / df.wall_area * 100)
.fillna(0)
.apply(lambda x: roundto(x, to=round_to))
)
return df
[docs] def space_heating_profile(
self,
units="kWh",
energy_out_variable_name=None,
name="Space Heating",
EnergySeries_kwds=None,
):
"""Return space-heating time series.
Args:
units (str): Units to convert the energy profile to. Will detect the
units of the EnergyPlus results.
energy_out_variable_name (list-like): a list of EnergyPlus Variable
names.
name (str): Name given to the EnergySeries.
EnergySeries_kwds (dict, optional): keywords passed to
:func:`EnergySeries.from_sqlite`
Returns:
EnergySeries
"""
if EnergySeries_kwds is None:
EnergySeries_kwds = {}
start_time = time.time()
if energy_out_variable_name is None:
energy_out_variable_name = (
"Air System Total Heating Energy",
"Zone Ideal Loads Zone Total Heating Energy",
)
series = self._energy_series(
energy_out_variable_name, units, name, EnergySeries_kwds=EnergySeries_kwds
)
log(
"Retrieved Space Heating Profile in {:,.2f} seconds".format(
time.time() - start_time
)
)
return series
[docs] def space_cooling_profile(
self,
units="kWh",
energy_out_variable_name=None,
name="Space Cooling",
EnergySeries_kwds=None,
):
"""Return space-cooling time series.
Args:
units (str): Units to convert the energy profile to. Will detect the
units of the EnergyPlus results.
energy_out_variable_name (list-like): a list of EnergyPlus
name (str): Name given to the EnergySeries.
EnergySeries_kwds (dict, optional): keywords passed to
:func:`EnergySeries.from_sqlite`
Returns:
EnergySeries
"""
if EnergySeries_kwds is None:
EnergySeries_kwds = {}
start_time = time.time()
if energy_out_variable_name is None:
energy_out_variable_name = (
"Air System Total Cooling Energy",
"Zone Ideal Loads Zone Total Cooling Energy",
)
series = self._energy_series(
energy_out_variable_name, units, name, EnergySeries_kwds=EnergySeries_kwds
)
log(
"Retrieved Space Cooling Profile in {:,.2f} seconds".format(
time.time() - start_time
)
)
return series
[docs] def service_water_heating_profile(
self,
units="kWh",
energy_out_variable_name=None,
name="Space Heating",
EnergySeries_kwds=None,
):
"""Return service water heating (domestic hot water) time series.
Args:
units (str): Units to convert the energy profile to. Will detect the
units of the EnergyPlus results.
energy_out_variable_name (list-like): a list of EnergyPlus Variable
names.
name (str): Name given to the EnergySeries.
EnergySeries_kwds (dict, optional): keywords passed to
:func:`EnergySeries.from_sqlite`
Returns:
EnergySeries
"""
if EnergySeries_kwds is None:
EnergySeries_kwds = {}
start_time = time.time()
if energy_out_variable_name is None:
energy_out_variable_name = ("WaterSystems:EnergyTransfer",)
series = self._energy_series(
energy_out_variable_name, units, name, EnergySeries_kwds=EnergySeries_kwds
)
log(
"Retrieved Service Water Heating Profile in {:,.2f} seconds".format(
time.time() - start_time
)
)
return series
[docs] def custom_profile(
self,
energy_out_variable_name,
name,
units="kWh",
prep_outputs=None,
EnergySeries_kwds=None,
):
"""Return user-defined time series.
Args:
energy_out_variable_name (list-like): a list of EnergyPlus
name (str): Name given to the EnergySeries.
units (str): Units to convert the energy profile to. Will detect the
units of the EnergyPlus results.
prep_outputs:
EnergySeries_kwds (dict, optional): keywords passed to
:func:`EnergySeries.from_sqlite`
Returns:
EnergySeries
"""
if EnergySeries_kwds is None:
EnergySeries_kwds = {}
start_time = time.time()
series = self._energy_series(
energy_out_variable_name,
units,
name,
prep_outputs,
EnergySeries_kwds=EnergySeries_kwds,
)
log("Retrieved {} in {:,.2f} seconds".format(name, time.time() - start_time))
return series
[docs] def newidfobject(self, key, **kwargs) -> Optional[EpBunch]:
"""Define EpBunch object and add to model.
The function will test if the object exists to prevent duplicates.
Args:
key (str): The type of IDF object. This must be in ALL_CAPS.
**kwargs: Keyword arguments in the format `field=value` used to set
fields in the EnergyPlus object.
Example:
>>> from archetypal import IDF
>>> IDF.newidfobject(
>>> key="Schedule:Constant".upper(),
>>> Name="AlwaysOn",
>>> Schedule_Type_Limits_Name="",
>>> Hourly_Value=1,
>>> )
Returns:
EpBunch: the object, if successful
None: If an error occured.
"""
# get list of objects
existing_objs = self.idfobjects[key] # a list
# create new object
try:
new_object = self.anidfobject(key, **kwargs)
except BadEPFieldError as e:
raise e
else:
# If object is supposed to be 'unique-object', delete all objects to be
# sure there is only one of them when creating new object
# (see following line)
if "unique-object" in set().union(
*(d.objidd[0].keys() for d in existing_objs)
):
for obj in existing_objs:
self.removeidfobject(obj)
log(
f"{obj} is a 'unique-object'; Removed and replaced with"
f" {new_object}",
lg.DEBUG,
)
self.addidfobject(new_object)
return new_object
if new_object in existing_objs:
# If obj already exists, simply return the existing one.
log(
f"object '{new_object}' already exists in {self.name}. "
f"Skipping.",
lg.DEBUG,
)
return next(x for x in existing_objs if x == new_object)
elif new_object not in existing_objs and new_object.nameexists():
# Object does not exist (because not equal) but Name exists.
obj = self.getobject(
key=new_object.key.upper(), name=new_object.Name.upper()
)
self.removeidfobject(obj)
self.addidfobject(new_object)
log(
f"{obj} exists but has different attributes; Removed and replaced "
f"with {new_object}",
lg.DEBUG,
)
return new_object
else:
# add to model and return
self.addidfobject(new_object)
log(f"object '{new_object}' added to '{self.name}'", lg.DEBUG)
return new_object
[docs] def addidfobject(self, new_object) -> EpBunch:
"""Add an IDF object to the model.
Args:
new_object (EpBunch): The IDF object to add.
Returns:
EpBunch: object.
"""
key = new_object.key.upper()
self.idfobjects[key].append(new_object)
self._reset_dependant_vars("idfobjects")
return new_object
[docs] def removeidfobject(self, idfobject):
"""Remove an IDF object from the model.
Args:
idfobject (EpBunch): The object to remove from the model.
"""
key = idfobject.key.upper()
self.idfobjects[key].remove(idfobject)
self._reset_dependant_vars("idfobjects")
[docs] def removeidfobjects(self, idfobjects: Iterable[EpBunch]):
"""Remove an IDF object from the model.
Args:
idfobjects: The object to remove from the model.
"""
for idfobject in idfobjects:
key = idfobject.key.upper()
self.idfobjects[key].remove(idfobject)
self._reset_dependant_vars("idfobjects")
[docs] def anidfobject(self, key: str, aname: str = "", **kwargs) -> EpBunch:
"""Define and create an object, but don't add it to the model.
See :func:`~archetypal.idfclass.idf.IDF.newidfobject`). If you don't specify
a value for a field, the default value will be set.
Example:
>>> from archetypal import IDF
>>> IDF.anidfobject("CONSTRUCTION")
>>> IDF.anidfobject(
>>> key="CONSTRUCTION",
>>> Name='Interior Ceiling_class',
>>> Outside_Layer='LW Concrete',
>>> Layer_2='soundmat'
>>> )
Args:
key (str): The type of IDF object. This must be in ALL_CAPS.
aname (str): This parameter is not used. It is left there for backward
compatibility.
kwargs: Keyword arguments in the format `field=value` used to set
fields in the EnergyPlus object.
Returns:
EpBunch: object.
"""
obj = newrawobject(self.model, self.idd_info, key)
abunch = obj2bunch(self.model, self.idd_info, obj)
if aname:
warnings.warn(
"The aname parameter should no longer be used (%s)." % aname,
UserWarning,
)
namebunch(abunch, aname)
for k, v in kwargs.items():
try:
abunch[k] = v
except BadEPFieldError as e:
# Backwards compatibility
if str(e) == "unknown field Key_Name":
abunch["Name"] = v
else:
raise e
abunch.theidf = self
return abunch
[docs] def get_schedule_type_limits_data_by_name(self, schedule_limit_name):
"""Return the data for a particular 'ScheduleTypeLimits' object.
Args:
schedule_limit_name:
"""
schedule = self.getobject("ScheduleTypeLimits".upper(), schedule_limit_name)
if schedule is not None:
lower_limit = schedule["Lower_Limit_Value"]
upper_limit = schedule["Upper_Limit_Value"]
numeric_type = schedule["Numeric_Type"]
unit_type = schedule["Unit_Type"]
if schedule["Unit_Type"] == "":
unit_type = numeric_type
return lower_limit, upper_limit, numeric_type, unit_type
else:
return "", "", "", ""
[docs] def get_schedule_epbunch(self, name, sch_type=None):
"""Return the epbunch of a particular schedule name.
If the schedule type is known, retrievess it quicker.
Args:
name (str): The name of the schedule to retreive in the IDF file.
sch_type (str): The schedule type, e.g.: "SCHEDULE:YEAR".
"""
if sch_type is None:
try:
return self.schedules_dict[name.upper()]
except KeyError:
raise KeyError(
'Unable to find schedule "{}" of type "{}" '
'in idf file "{}"'.format(name, sch_type, self.name)
)
else:
return self.getobject(sch_type.upper(), name)
def _get_all_schedules(self, yearly_only=False):
"""Return all schedule ep_objects in a dict with their name as a key.
Args:
yearly_only (bool): If True, return only yearly schedules
Returns:
(dict of eppy.bunch_subclass.EpBunch): the schedules with their
name as a key
"""
schedule_types = list(map(str.upper, self.getiddgroupdict()["Schedules"]))
if yearly_only:
schedule_types = [
"Schedule:Year".upper(),
"Schedule:Compact".upper(),
"Schedule:Constant".upper(),
"Schedule:File".upper(),
]
scheds = {}
for sched_type in schedule_types:
for sched in self.idfobjects[sched_type]:
try:
if sched.key.upper() in schedule_types:
scheds[sched.Name.upper()] = sched
except KeyError:
pass
return scheds
def _get_used_schedules(self, yearly_only=False):
"""Return all used schedules.
Args:
yearly_only (bool): If True, return only yearly schedules
Returns:
(list): the schedules names
"""
schedule_types = [
"Schedule:Day:Hourly".upper(),
"Schedule:Day:Interval".upper(),
"Schedule:Day:List".upper(),
"Schedule:Week:Daily".upper(),
"Schedule:Year".upper(),
"Schedule:Week:Compact".upper(),
"Schedule:Compact".upper(),
"Schedule:Constant".upper(),
"Schedule:File".upper(),
]
used_schedules = []
all_schedules = self._get_all_schedules(yearly_only=yearly_only)
for object_name in self.idfobjects:
for obj in self.idfobjects[object_name]:
if obj.key.upper() not in schedule_types:
for fieldvalue in obj.fieldvalues:
try:
if (
fieldvalue.upper() in all_schedules.keys()
and fieldvalue not in used_schedules
):
used_schedules.append(fieldvalue)
except (KeyError, AttributeError):
pass
return used_schedules
@property
def width(self):
"""Get the width of the building [m]."""
bbox = self.bounding_box()
return max(bbox.xs) - min(bbox.xs)
@property
def length(self):
"""Get the length of the building [m]."""
bbox = self.bounding_box()
return max(bbox.ys) - min(bbox.ys)
[docs] def resize(self, width, length):
"""Resize the floor plate to x and y [meters].
Args:
width (float): The new width [m] of the building (x axis).
length (float): The new length [m] of the building (y axis).
"""
x_scale = width / self.width
y_scale = length / self.length
self.scale(x_scale, axes="x")
self.scale(y_scale, axes="y")
[docs] def rename(self, objkey, objname, newname):
"""Rename all the references to this objname.
Function comes from eppy.modeleditor and was modified to compare the
name to rename with a lower string (see
idfobject[idfobject.objls[findex]].lower() == objname.lower()).
Args:
objkey (str): EpBunch we want to rename and rename all the
occurrences where this object is in the IDF file
objname (str): The name of the EpBunch to rename
newname (str): New name used to rename the EpBunch
Returns:
theobject (EpBunch): The renamed idf object
"""
refnames = eppy.modeleditor.getrefnames(self, objkey)
for refname in refnames:
objlists = eppy.modeleditor.getallobjlists(self, refname)
# [('OBJKEY', refname, fieldindexlist), ...]
for robjkey, refname, fieldindexlist in objlists:
idfobjects = self.idfobjects[robjkey]
for idfobject in idfobjects:
for findex in fieldindexlist: # for each field
if (
idfobject[idfobject.objls[findex]].lower()
== objname.lower()
):
idfobject[idfobject.objls[findex]] = newname
theobject = self.getobject(objkey, objname)
fieldname = [item for item in theobject.objls if item.endswith("Name")][0]
theobject[fieldname] = newname
return theobject
[docs] def set_wwr(
self,
wwr: float = None,
construction: Optional[str] = None,
force: bool = False,
wwr_map: Optional[dict] = None,
orientation: Optional[str] = None,
surfaces: Optional[Iterable] = None,
):
"""Set Window-to-Wall Ratio of external walls.
Different WWR can be applied to specific wall orientations using the
`wwr_map` dictionary. This map is a dict of wwr values, keyed by
`wall.azimuth`, which overrides the default passed as `wwr`. Note that
`wall.azimuth` is rounded to the closest integer.
They can also be applied to walls oriented to a compass point, e.g. north,
which will apply to walls which have an azimuth within 45 degrees of due north.
Args:
wwr: The window to wall ratio to apply to all orientations. If wwr_map is
specified, `wwr` is the default wwr for oreintations not defined in
the mapping.
construction: Name of a window construction to apply.
force: True to create windows on walls that don't have any.
wwr_map: Mapping from wall orientation (azimuth) to WWR, e.g.
{180: 0.25, 90: 0.2}.
orientation: One of "north", "east", "south", "west". Walls within 45
degrees will be affected.
surfaces: Iterable of surfaces to set the window to wall ratio of.
"""
# Taken from `geomeppy.idf.IDF.set_wwr` since pull request is not being
# reviewed as of 2021-11-10.
try:
ggr: Optional[Idf_MSequence] = self.idfobjects["GLOBALGEOMETRYRULES"][0]
except IndexError:
ggr = None
# check orientation
orientations = {
"north": 0.0,
"east": 90.0,
"south": 180.0,
"west": 270.0,
None: None,
}
degrees = orientations.get(orientation, None)
external_walls = filter(
lambda x: x.Outside_Boundary_Condition.lower() == "outdoors",
surfaces or self.getsurfaces("wall"),
)
external_walls = filter(
lambda x: _has_correct_orientation(x, degrees), external_walls
)
subsurfaces = self.getsubsurfaces()
base_wwr = wwr
for wall in external_walls:
# get any subsurfaces on the wall
wall_subsurfaces = list(
filter(lambda x: x.Building_Surface_Name == wall.Name, subsurfaces)
)
if wall_subsurfaces and not construction:
constructions = list(
{
wss.Construction_Name
for wss in wall_subsurfaces
if _is_window(wss)
}
)
if len(constructions) > 1:
raise ValueError(
'Not all subsurfaces on wall "{name}" have the same construction'.format(
name=wall.Name
)
)
construction = constructions[0]
wwr = (wwr_map or {}).get(round(wall.azimuth), base_wwr)
if not wwr:
continue
if len(wall_subsurfaces) == 0 and not force:
# Don't create windows on walls that don't have any windows already.
continue
# remove all subsurfaces
for ss in wall_subsurfaces:
self.removeidfobject(ss)
coords = window_vertices_given_wall(wall, wwr)
window = self.newidfobject(
"FENESTRATIONSURFACE:DETAILED",
Name="%s window" % wall.Name,
Surface_Type="Window",
Construction_Name=construction or "",
Building_Surface_Name=wall.Name,
View_Factor_to_Ground="autocalculate", # from the surface angle
)
window.setcoords(coords, ggr)
def _energy_series(
self,
energy_out_variable_name,
units,
name,
prep_outputs=None,
EnergySeries_kwds=None,
) -> EnergySeries:
"""Query the report data and return an EnergySeries.
Args:
energy_out_variable_name:
units:
name:
prep_outputs (list):
EnergySeries_kwds:
"""
if prep_outputs:
self.outputs.add_custom(prep_outputs).apply()
self.simulate()
rd = ReportData.from_sqlite(self.sql_file, table_name=energy_out_variable_name)
profile = EnergySeries.from_reportdata(
rd, to_units=units, name=name, **EnergySeries_kwds
)
return profile
def _execute_transitions(self, idf_file, to_version, **kwargs):
trans_exec = {
EnergyPlusVersion(
re.search(r"to-V(([\d])-([\d])-([\d]))", executable).group(1)
): executable
for executable in self.idfversionupdater_dir.files("Transition-V*")
}
transitions = [
key for key in trans_exec if to_version >= key > self.file_version
]
transitions.sort()
for trans in tqdm(
transitions,
position=self.position,
desc=f"transition file #{self.position}-{self.name}",
):
if not trans_exec[trans].exists():
raise EnergyPlusProcessError(
cmd=trans_exec[trans],
stderr="The specified EnergyPlus version (v{}) does not have"
" the required transition program '{}' in the "
"PreProcess folder. See the documentation "
"(archetypal.readthedocs.io/troubleshooting.html#missing"
"-transition-programs) "
"to solve this issue".format(to_version, trans_exec[trans]),
idf=self,
)
else:
cmd = [trans_exec[trans], idf_file]
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=self.idfversionupdater_dir,
) as process:
process_output, error_output = process.communicate()
log(
process_output.decode("utf-8"),
level=lg.DEBUG,
name="transition_" + self.name,
filename="transition_" + self.name,
log_dir=self.idfversionupdater_dir,
)
if error_output:
log(
error_output.decode("utf-8"),
level=lg.DEBUG,
name="transition_" + self.name,
filename="transition_" + self.name,
log_dir=self.idfversionupdater_dir,
)
def _process_csv(file, working_dir, simulname):
"""Process csv file.
Args:
file:
working_dir:
simulname:
"""
log("looking for csv output, return the csv files in DataFrames if any")
if "table" in file.basename():
tables_out = working_dir.abspath() / "tables"
tables_out.makedirs_p()
file.copy(tables_out / "%s_%s.csv" % (file.basename().stripext(), simulname))
return
log("try to store file %s in DataFrame" % file)
try:
df = pd.read_csv(file, sep=",", encoding="us-ascii")
except ParserError:
pass
else:
log("file %s stored" % file)
return df