################################################################################
# Module: utils.py
# Description: Utility functions for configuration, logging
# License: MIT, see full license in LICENSE.txt
# Web: https://github.com/samuelduchesne/archetypal
################################################################################
# OSMnx
#
# Copyright (c) 2019 Geoff Boeing https://geoffboeing.com/
#
# Part of the following code is a derivative work of the code from the OSMnx
# project, which is licensed MIT License. This code therefore is also
# licensed under the terms of the The MIT License (MIT).
################################################################################
import contextlib
import datetime as dt
import json
import logging as lg
import multiprocessing
import os
import sys
import time
import unicodedata
import warnings
from collections import OrderedDict
from concurrent.futures._base import as_completed
import numpy as np
import pandas as pd
from pandas.io.json import json_normalize
from path import Path
from tqdm import tqdm
from archetypal import __version__, settings
from archetypal.eplus_interface.version import EnergyPlusVersion
[docs]def config(
data_folder=settings.data_folder,
logs_folder=settings.logs_folder,
imgs_folder=settings.imgs_folder,
cache_folder=settings.cache_folder,
use_cache=settings.use_cache,
log_file=settings.log_file,
log_console=settings.log_console,
log_level=settings.log_level,
log_name=settings.log_name,
log_filename=settings.log_filename,
useful_idf_objects=settings.useful_idf_objects,
umitemplate=settings.umitemplate,
default_weight_factor="area",
ep_version=settings.ep_version,
debug=settings.debug,
):
"""Package configurations. Call this method at the beginning of script or at the
top of an interactive python environment to set package-wide settings.
Args:
data_folder (str): where to save and load data files.
logs_folder (str): where to write the log files.
imgs_folder (str): where to save figures.
cache_folder (str): where to save the simulation results.
use_cache (bool): if True, use a local cache to save/retrieve DataPortal API
calls for the same requests.
log_file (bool): if true, save log output to a log file in logs_folder.
log_console (bool): if true, print log output to the console.
log_level (int): one of the logger.level constants.
log_name (str): name of the logger.
log_filename (str): name of the log file.
useful_idf_objects (list): a list of useful idf objects.
umitemplate (str): where the umitemplate is located.
default_weight_factor:
ep_version (str): EnergyPlus version to use. eg. "9-2-0".
debug (bool): Use debug behavior in various part of code base.
Returns:
None
"""
# set each global variable to the passed-in parameter value
settings.use_cache = use_cache
settings.cache_folder = Path(cache_folder).expand().makedirs_p()
settings.data_folder = Path(data_folder).expand().makedirs_p()
settings.imgs_folder = Path(imgs_folder).expand().makedirs_p()
settings.logs_folder = Path(logs_folder).expand().makedirs_p()
settings.log_console = log_console
settings.log_file = log_file
settings.log_level = log_level
settings.log_name = log_name
settings.log_filename = log_filename
settings.useful_idf_objects = useful_idf_objects
settings.umitemplate = umitemplate
settings.zone_weight.set_weigth_attr(default_weight_factor)
settings.ep_version = ep_version
settings.debug = debug
# if logging is turned on, log that we are configured
if settings.log_file or settings.log_console:
log("Configured archetypal")
[docs]def log(
message,
level=None,
name=None,
filename=None,
avoid_console=False,
log_dir=None,
verbose=False,
):
"""Write a message to the log file and/or print to the the console.
Args:
message (str): the content of the message to log
level (int): one of the logger.level constants
name (str): name of the logger
filename (str): name of the log file
avoid_console (bool): If True, don't print to console for this message
only
log_dir (str, optional): directory of log file. Defaults to
settings.log_folder
verbose: If True, settings.log_console is overridden.
"""
if level is None:
level = settings.log_level
if name is None:
name = settings.log_name
if filename is None:
filename = settings.log_filename
logger = None
# if logging to file is turned on
if settings.log_file:
# get the current logger (or create a new one, if none), then log
# message at requested level
logger = get_logger(level=level, name=name, filename=filename, log_dir=log_dir)
if level == lg.DEBUG:
logger.debug(message)
elif level == lg.INFO:
logger.info(message)
elif level == lg.WARNING:
logger.warning(message)
elif level == lg.ERROR:
logger.error(message)
# if logging to console is turned on, convert message to ascii and print to
# the console
if settings.log_console or verbose or level == lg.ERROR and not avoid_console:
# capture current stdout, then switch it to the console, print the
# message, then switch back to what had been the stdout. this prevents
# logging to notebook - instead, it goes to console
standard_out = sys.stdout
sys.stdout = sys.__stdout__
# convert message to ascii for console display so it doesn't break
# windows terminals
message = (
unicodedata.normalize("NFKD", str(message))
.encode("ascii", errors="replace")
.decode()
)
tqdm.write(message)
sys.stdout = standard_out
if level == lg.WARNING:
warnings.warn(message)
return logger
def get_logger(level=None, name=None, filename=None, log_dir=None):
"""Create a logger or return the current one if already instantiated.
Args:
level (int): one of the logger.level constants.
name (str): name of the logger.
filename (str): name of the log file.
log_dir (str, optional): directory of the log file. Defaults to
settings.log_folder.
Returns:
logging.Logger: a Logger
"""
if isinstance(log_dir, str):
log_dir = Path(log_dir)
if level is None:
level = settings.log_level
if name is None:
name = settings.log_name
if filename is None:
filename = settings.log_filename
logger = lg.getLogger(name)
# if a logger with this name is not already set up
if not getattr(logger, "handler_set", None):
# get today's date and construct a log filename
todays_date = dt.datetime.today().strftime("%Y_%m_%d")
if not log_dir:
log_dir = settings.logs_folder
log_filename = log_dir / "{}_{}.log".format(filename, todays_date)
# if the logs folder does not already exist, create it
if not log_dir.exists():
log_dir.makedirs_p()
# create file handler and log formatter and set them up
try:
handler = lg.FileHandler(log_filename, encoding="utf-8")
except:
handler = lg.StreamHandler()
formatter = lg.Formatter(
"%(asctime)s [%(process)d] %(levelname)s - %(name)s - %(" "message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(level)
logger.handler_set = True
return logger
def close_logger(logger=None, level=None, name=None, filename=None, log_dir=None):
"""
Args:
logger:
level:
name:
filename:
log_dir:
"""
if not logger:
# try get logger by name
logger = get_logger(level=level, name=name, filename=filename, log_dir=log_dir)
handlers = logger.handlers[:]
for handler in handlers:
handler.close()
logger.removeHandler(handler)
[docs]def weighted_mean(series, df, weighting_variable):
"""Compute the weighted average while ignoring NaNs. Implements
:func:`numpy.average`.
Args:
series (pandas.Series): the *series* on which to compute the mean.
df (pandas.DataFrame): the *df* containing weighting variables.
weighting_variable (str or list or tuple): Name of weights to use in
*df*. If multiple values given, the values are multiplied together.
Returns:
numpy.ndarray: the weighted average
"""
# get non-nan values
index = ~np.isnan(series.values.astype("float"))
# Returns weights. If multiple `weighting_variable`, df.prod will take care
# of multipling them together.
if not isinstance(weighting_variable, list):
weighting_variable = [weighting_variable]
try:
weights = df.loc[series.index, weighting_variable].astype("float").prod(axis=1)
except Exception:
raise
# Try to average
try:
wa = np.average(series[index].astype("float"), weights=weights[index])
except ZeroDivisionError:
log("Cannot aggregate empty series {}".format(series.name), lg.WARNING)
return np.NaN
except Exception:
raise
else:
return wa
[docs]def top(series, df, weighting_variable):
"""Compute the highest ranked value weighted by some other variable.
Implements
:func:`pandas.DataFrame.nlargest`.
Args:
series (pandas.Series): the *series* on which to compute the ranking.
df (pandas.DataFrame): the *df* containing weighting variables.
weighting_variable (str or list or tuple): Name of weights to use in
*df*. If multiple values given, the values are multiplied together.
Returns:
numpy.ndarray: the weighted top ranked variable
"""
# Returns weights. If multiple `weighting_variable`, df.prod will take care
# of multipling them together.
if not isinstance(series, pd.Series):
raise TypeError(
'"top()" only works on Series, ' "not DataFrames\n{}".format(series)
)
if not isinstance(weighting_variable, list):
weighting_variable = [weighting_variable]
try:
idx_ = (
df.loc[series.index]
.groupby(series.name)
.apply(lambda x: safe_prod(x, df, weighting_variable))
)
if not idx_.empty:
idx = idx_.nlargest(1).index
else:
log('No such names "{}"'.format(series.name))
return np.NaN
except KeyError:
log("Cannot aggregate empty series {}".format(series.name), lg.WARNING)
return np.NaN
except Exception:
raise
else:
if idx.isnull().any():
return np.NaN
else:
return pd.to_numeric(idx, errors="ignore").values[0]
def safe_prod(x, df, weighting_variable):
"""
Args:
x:
df:
weighting_variable:
"""
df_ = df.loc[x.index, weighting_variable]
if not df_.empty:
return df_.astype("float").prod(axis=1).sum()
else:
return 0
[docs]def copy_file(files, where=None):
"""Handles a copy of test idf files
Args:
files (str or list): path(s) of the file(s) to copy
where (str): path where to save the copy(ies)
"""
import os
import shutil
if isinstance(files, str):
files = [files]
files = {os.path.basename(k): k for k in files}
# defaults to cache folder
if where is None:
where = settings.cache_folder
for file in files:
dst = os.path.join(where, file)
output_folder = where
if not os.path.isdir(output_folder):
os.makedirs(output_folder)
shutil.copyfile(files[file], dst)
files[file] = dst
return _unpack_tuple(list(files.values()))
@contextlib.contextmanager
def cd(path):
"""
Args:
path:
"""
log("initially inside {0}".format(os.getcwd()))
CWD = os.getcwd()
os.chdir(path)
log("inside {0}".format(os.getcwd()))
try:
yield
finally:
os.chdir(CWD)
log("finally inside {0}".format(os.getcwd()))
[docs]def load_umi_template(json_template):
"""Load umi template file to list of dict.
Args:
json_template (str): filepath to an umi json_template.
Returns:
list: list of dict.
"""
if os.path.isfile(json_template):
with open(json_template) as f:
dicts = json.load(f, object_pairs_hook=OrderedDict)
return [{key: json_normalize(value)} for key, value in dicts.items()]
else:
raise ValueError("File {} does not exist".format(json_template))
[docs]def check_unique_name(first_letters, count, name, unique_list, suffix=False):
"""Making sure new_name does not already exist
Args:
first_letters (str): string at the beginning of the name, giving a hint
on what the variable is.
count (int): increment to create a unique id in the name.
name (str): name that was just created. To be verified that it is unique
in this function.
unique_list (list): list where unique names are stored.
suffix (bool):
Returns:
new_name (str): name that is unique
"""
if suffix:
while name in unique_list:
count += 1
end_count = "%03d" % count
name = name[:-3] + end_count
else:
while name in unique_list:
count += 1
end_count = "%06d" % count
name = first_letters + "_" + end_count
return name, count
[docs]def angle(v1, v2, acute=True):
"""Calculate the angle between 2 vectors
Args:
v1 (Vector3D): vector 1
v2 (Vector3D): vector 2
acute (bool): If True, give the acute angle, else gives the obtuse one.
Returns:
angle (float): angle between the 2 vectors in degree
"""
angle = np.arccos(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))
if acute == True:
return angle
else:
return 2 * np.pi - angle
[docs]def float_round(num, n):
"""Makes sure a variable is a float and round it at "n" decimals
Args:
num (str, int, float): number we want to make sure is a float
n (int): number of decimals
Returns:
num (float): a float rounded number
"""
num = float(num)
num = round(num, n)
return num
[docs]def timeit(method):
"""Use this method as a decorator on a function to calculate the time it
take to complete. Uses the :func:`log` method.
Examples:
>>> @timeit
>>> def myfunc():
>>> return 'is a function'
Args:
method (function): A function.
"""
def timed(*args, **kwargs):
ts = time.time()
log("Executing %r..." % method.__qualname__)
result = method(*args, **kwargs)
te = time.time()
tt = te - ts
try:
try:
name = result.Name
except:
name = result.__qualname__
except:
name = str(result)
if tt > 0.001:
log("Completed %r for %r in %.3f s" % (method.__qualname__, name, tt))
else:
log(
"Completed %r for %r in %.3f ms"
% (method.__qualname__, name, tt * 1000)
)
return result
return timed
[docs]def lcm(x, y):
"""This function takes two integers and returns the least common multiple."""
# choose the greater number
if x > y:
greater = x
else:
greater = y
while True:
if (greater % x == 0) and (greater % y == 0):
lcm = greater
break
greater += 1
return lcm
def reduce(function, iterable, **attr):
"""
Args:
function:
iterable:
**attr:
"""
if iterable:
it = iter(iterable)
value = next(it)
for element in it:
value = function(value, element, **attr)
return value
else:
return None
def _unpack_tuple(x):
"""Unpacks one-element tuples for use as return values
Args:
x:
"""
if len(x) == 1:
return x[0]
else:
return x
[docs]def recursive_len(item):
"""Calculate the number of elements in nested list
Args:
item (list): list of lists (i.e. nested list)
Returns:
Total number of elements in nested list
"""
if type(item) == list:
return sum(recursive_len(subitem) for subitem in item)
else:
return 1
[docs]def rotate(l, n):
"""Shift list elements to the left
Args:
l (list): list to rotate
n (int): number to shift list to the left
Returns:
list: shifted list.
"""
return l[n:] + l[:n]
[docs]def parallel_process(
in_dict,
function,
processors=-1,
use_kwargs=True,
show_progress=True,
position=0,
debug=False,
executor=None,
):
"""A parallel version of the map function with a progress b
Examples:
>>> from archetypal import IDF
>>> files = ['tests/input_data/problematic/nat_ventilation_SAMPLE0.idf',
>>> 'tests/input_data/regular/5ZoneNightVent1.idf']
>>> wf = 'tests/input_data/CAN_PQ_Montreal.Intl.AP.716270_CWEC.epw'
>>> rundict = {file: dict(idfname=file, epw=wf,
>>> as_version="9-2-0", annual=True,
>>> prep_outputs=True, expandobjects=True,
>>> verbose='q')
>>> for file in files}
>>> result = parallel_process(rundict, IDF, use_kwargs=True)
Args:
in_dict (dict): A dictionary to iterate over. `function` is applied to value
and key is used as an identifier.
function (callable): A python function to apply to the elements of
in_dict
processors (int): The number of cores to use.
use_kwargs (bool): If True, pass the kwargs as arguments to `function`.
debug (bool): If True, will raise any error on any process.
position: Specify the line offset to print the tqdm bar (starting from 0)
Automatic if unspecified. Useful to manage multiple bars at once
(eg, from threads).
executor (Executor)
Returns:
[function(array[0]), function(array[1]), ...]
"""
if executor is None:
from concurrent.futures import ThreadPoolExecutor
_executor_factory = ThreadPoolExecutor
else:
_executor_factory = executor
from tqdm import tqdm
if processors == -1:
processors = min(len(in_dict), multiprocessing.cpu_count())
kwargs = {
"desc": function.__name__,
"total": len(in_dict),
"unit": "runs",
"unit_scale": True,
"position": position,
"disable": not show_progress,
}
if processors == 1:
futures = []
out = []
for a in tqdm(in_dict, **kwargs):
if use_kwargs:
futures.append(submit(function, **in_dict[a]))
else:
futures.append(submit(function, in_dict[a]))
for job in futures:
out.append(job)
else:
with _executor_factory(
max_workers=processors,
initializer=config,
initargs=(
settings.data_folder,
settings.logs_folder,
settings.imgs_folder,
settings.cache_folder,
settings.use_cache,
settings.log_file,
settings.log_console,
settings.log_level,
settings.log_name,
settings.log_filename,
settings.useful_idf_objects,
settings.umitemplate,
"area",
settings.ep_version,
settings.debug,
),
) as executor:
out = []
futures = []
if use_kwargs:
for a in in_dict:
future = executor.submit(function, **in_dict[a])
futures.append(future)
else:
for a in in_dict:
future = executor.submit(function, in_dict[a])
futures.append(future)
# Print out the progress as tasks complete
for job in tqdm(as_completed(futures), **kwargs):
# Read result from future
try:
result_done = job.result()
except Exception as e:
if debug:
lg.warning(str(e))
raise e
result_done = e
# Append to the list of results
out.append(result_done)
return out
def submit(fn, *args, **kwargs):
"""return fn or Exception"""
try:
return fn(*args, **kwargs)
except Exception as e:
return e
def is_referenced(name, epbunch, fieldname="Zone_or_ZoneList_Name"):
"""bool: Returns True if name is in referenced object fieldname"""
refobj = epbunch.get_referenced_object(fieldname)
if not refobj:
refobj = epbunch.get_referenced_object("Zone_Name") # Backwards Compatibility
if not refobj:
pass
elif refobj.key.upper() == "ZONE":
return name in refobj.Name
elif refobj.key.upper() == "ZONELIST":
raise NotImplementedError(
f"Checking against a ZoneList is "
f"not yet supported in archetypal "
f"v{__version__}"
)
raise ValueError(
f"Invalid referring object returned while "
f"referencing object name: Looking for '{name}' in "
f"object {refobj}"
)
def docstring_parameter(*args, **kwargs):
"""Replaces variables in foo.__doc__ by calling obj.__doc__ =
obj.__doc__.format(* args, ** kwargs)
"""
def dec(obj):
obj.__doc__ = obj.__doc__.format(*args, **kwargs)
return obj
return dec
def extend_class(cls):
"""Given class cls, apply decorator @extend_class to function f so
that f becomes a regular method of cls:
Example:
>>> class cls: pass
>>> @extend_class(cls)
... def f(self):
... pass
Extending class has several usages:
1. There are classes A, B, ... Z, all defining methods foo and b
Though the usual approach is to group the code around class
definitions in files A.py, B.py, ..., Z.py, it is sometimes more
convenient to group all definitions of A.foo(), B.foo(), ... up
to Z.foo(), in one file "foo.py", and all definitions of bar in
file "bpy".
2. Another usage of @extend_class is building a class step-by-step
--- first creating an empty class, and later populating it with
methods.
3. Finally, it is possible to @extend several classes
simultaneously with the same method, as in the example below,
where classes A and B share method foo.
Example:
>>> class A: pass # empty class
...
>>> class B: pass # empty class
...
>>> @extend_class(A)
... @extend_class(B)
... def foo(self,s):
... print s
...
>>> a = A()
>>> a.foo('hello')
hello
>>> b = B()
>>> b.foo('world')
world
Limitations:
1. @extend_class won't work on builtin classes, such as int.
2. Not tested on python 3.
Author:
victorlei@gmail.com
"""
return lambda f: (setattr(cls, f.__name__, f) or f)
class CustomJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.bool_):
return bool(obj)
return obj