# Source code for archetypal.schedule

"""archetypal Schedule module."""

import functools
import io
import logging as lg
from datetime import datetime, timedelta
from itertools import groupby
from typing import FrozenSet, Union, List

import numpy as np
import pandas as pd
from energy_pandas import EnergySeries
from eppy.bunch_subclass import BadEPFieldError
from typing_extensions import Literal
from validator_collection import checkers, validators

from archetypal.utils import log


class ScheduleTypeLimits:
    """Validated value object mirroring an EnergyPlus ``ScheduleTypeLimits``.

    Stores a name, optional lower/upper limits, a numeric type (one of
    ``_NUMERIC_TYPES``, compared case-insensitively, or ``None``) and a unit
    type (one of ``_UNIT_TYPES``, compared case-insensitively). Every
    attribute is validated by its property setter.
    """

    __slots__ = ("_name", "_lower_limit", "_upper_limit", "_numeric_type", "_unit_type")

    _NUMERIC_TYPES = ("continuous", "discrete")
    _UNIT_TYPES = (
        "Dimensionless",
        "Temperature",
        "DeltaTemperature",
        "PrecipitationRate",
        "Angle",
        "ConvectionCoefficient",
        "ActivityLevel",
        "Velocity",
        "Capacity",
        "Power",
        "Availability",
        "Percent",
        "Control",
        "Mode",
    )

    def __init__(
        self,
        Name,
        LowerLimit,
        UpperLimit,
        NumericType="Continuous",
        UnitType="Dimensionless",
    ):
        """Initialize object.

        Args:
            Name (str): Name of the type limits.
            LowerLimit (float, optional): Lower bound; may be None.
            UpperLimit (float, optional): Upper bound; may be None.
            NumericType (str, optional): "Continuous" or "Discrete"
                (case-insensitive); may be None.
            UnitType (str): One of ``_UNIT_TYPES`` (case-insensitive).
        """
        self.Name = Name
        self.LowerLimit = LowerLimit
        self.UpperLimit = UpperLimit
        self.NumericType = NumericType
        self.UnitType = UnitType

    @property
    def Name(self):
        """Get or set the name of the ScheduleTypeLimits."""
        return self._name

    @Name.setter
    def Name(self, value):
        self._name = validators.string(value)

    @property
    def LowerLimit(self):
        """Get or set the LowerLimit."""
        return self._lower_limit

    @LowerLimit.setter
    def LowerLimit(self, value):
        self._lower_limit = validators.float(value, allow_empty=True)

    @property
    def UpperLimit(self):
        """Get or set the UpperLimit."""
        return self._upper_limit

    @UpperLimit.setter
    def UpperLimit(self, value):
        self._upper_limit = validators.float(value, allow_empty=True)

    @property
    def NumericType(self):
        """Get or set numeric type. Can be null."""
        return self._numeric_type

    @NumericType.setter
    def NumericType(self, value):
        # Keep the validator's result: with ``allow_empty=True`` an empty
        # string is normalised to None, so a blank field is treated as "null"
        # instead of failing the membership check below.
        value = validators.string(value, allow_empty=True)
        if value is not None:
            assert value.lower() in self._NUMERIC_TYPES, (
                f"Input error for value '{value}'. NumericType must "
                f"be one of '{self._NUMERIC_TYPES}'"
            )
        self._numeric_type = value

    @property
    def UnitType(self):
        """Get or set the unit type. Can be null."""
        return self._unit_type

    @UnitType.setter
    def UnitType(self, value):
        value = validators.string(value)
        assert value.lower() in map(str.lower, self._UNIT_TYPES), (
            f"Input error for value '{value}'. UnitType must "
            f"be one of '{self._UNIT_TYPES}'"
        )
        self._unit_type = value

    @classmethod
    def from_dict(cls, data):
        """Create a ScheduleTypeLimit from a dictionary.

        Args:
            data: ScheduleTypeLimit dictionary following the format below.
                The keys are passed to the constructor as keyword arguments.

        .. code-block:: python

            {
            "Name": 'Fractional',
            "LowerLimit": 0,
            "UpperLimit": 1,
            "NumericType": None,
            "UnitType": "Dimensionless"
            }
        """
        return cls(**data)

    @classmethod
    def from_epbunch(cls, epbunch):
        """Create a ScheduleTypeLimits from an epbunch.

        Non-numeric limits become None; a blank numeric type falls back to
        "Continuous" and a blank unit type to "Dimensionless".

        Args:
            epbunch (EpBunch): The epbunch of key "SCHEDULETYPELIMITS".
        """
        assert (
            epbunch.key.upper() == "SCHEDULETYPELIMITS"
        ), f"Expected 'SCHEDULETYPELIMITS' epbunch. Got {epbunch.key}."
        name = epbunch.Name
        lower_limit = epbunch.Lower_Limit_Value
        upper_limit = epbunch.Upper_Limit_Value
        numeric_type = epbunch.Numeric_Type
        unit_type = epbunch.Unit_Type
        return cls(
            Name=name,
            LowerLimit=lower_limit if checkers.is_numeric(lower_limit) else None,
            UpperLimit=upper_limit if checkers.is_numeric(upper_limit) else None,
            NumericType=numeric_type
            if checkers.is_string(numeric_type, minimum_length=1)
            else "Continuous",
            UnitType=unit_type
            if checkers.is_string(unit_type, minimum_length=1)
            else "Dimensionless",
        )

    def to_dict(self):
        """Return ScheduleTypeLimits dictionary representation."""
        return {
            "Name": self.Name,
            "LowerLimit": self.LowerLimit,
            "UpperLimit": self.UpperLimit,
            "NumericType": self.NumericType,
            "UnitType": self.UnitType,
        }

    def to_epbunch(self, idf):
        """Convert self to an epbunch given an idf model.

        Notes:
            The object is added to the idf model.

        Args:
            idf (IDF): An IDF model.

        .. code-block:: python

            SCHEDULETYPELIMITS,
                ,                         !- Name
                ,                         !- Lower Limit Value
                ,                         !- Upper Limit Value
                ,                         !- Numeric Type
                Dimensionless;            !- Unit Type

        Returns:
            EpBunch: The EpBunch object added to the idf model.
        """
        return idf.newidfobject(
            key="SCHEDULETYPELIMITS",
            Name=self.Name,
            Lower_Limit_Value=self.LowerLimit,
            Upper_Limit_Value=self.UpperLimit,
            Numeric_Type=self.NumericType,
            Unit_Type=self.UnitType,
        )

    def duplicate(self):
        """Get copy of self."""
        return self.__copy__()

    def __copy__(self):
        """Get copy of self."""
        return self.__class__(
            self.Name, self.LowerLimit, self.UpperLimit, self.NumericType, self.UnitType
        )

    def __repr__(self):
        """Return the string representation of self."""
        return (
            f"{self.Name} {self.LowerLimit} < values < {self.UpperLimit} "
            f"Units: {self.UnitType}"
        )

    def __keys__(self):
        """Get keys of self. Useful for hashing."""
        return (
            self.Name,
            self.LowerLimit,
            self.UpperLimit,
            self.NumericType,
            self.UnitType,
        )

    def __eq__(self, other):
        """Assert self is equal to other."""
        if not isinstance(other, ScheduleTypeLimits):
            return NotImplemented
        return self.__keys__() == other.__keys__()

    def __hash__(self):
        """Return a hash consistent with ``__eq__``.

        Defining ``__eq__`` alone makes instances unhashable. The hash is
        derived from the current attribute values, so an instance should not
        be mutated while it is stored in a set or used as a dict key.
        """
        return hash(self.__keys__())


class _ScheduleParser:
    """Class used to parse schedules in IDF files."""

    @staticmethod
    def get_interval_day_ep_schedule_values(epbunch) -> np.ndarray:
        """Expand a Schedule:Day:Interval object into 24 hourly values.

        Each ``Time_i`` / ``Value_Until_Time_i`` pair fills the hours from the
        end of the previous interval up to (excluding) the whole hour derived
        from ``Time_i``. Hours never reached by an interval keep the initial
        ``np.arange`` value.

        Args:
            epbunch (EpBunch): The schedule EpBunch object.

        Returns:
            np.ndarray: 24 values, cast to int when the numeric type of the
            schedule type limits is "discrete".
        """
        pair_count = int((len(epbunch.fieldvalues) - 3) / 2)

        day_values = np.arange(24, dtype=float)
        interval_start = 0
        for position in range(1, pair_count + 1):
            interval_value = float(epbunch[f"Value_Until_Time_{position}"])

            # keep only the purely numeric pieces of the "HH:MM" text
            clock = []
            for token in epbunch[f"Time_{position}"].split(":"):
                token = token.strip()
                if token.isdigit():
                    clock.append(int(token))

            # fractional hours are truncated by the integer cast
            interval_end = int(clock[0] + clock[1] / 60)
            for hour in range(interval_start, interval_end):
                day_values[hour] = interval_value

            interval_start = interval_end

        limits = _ScheduleParser.get_schedule_type_limits_data(epbunch)
        _, _, numeric_type, _ = limits
        is_discrete = numeric_type.strip().lower() == "discrete"
        return day_values.astype(int) if is_discrete else day_values

    @staticmethod
    def get_hourly_day_ep_schedule_values(epbunch):
        """Return the values of a Schedule:Day:Hourly object as an array.

        Args:
            epbunch (EpBunch): The schedule EpBunch object.

        Returns:
            np.ndarray: Every field value from index 3 onward.
        """
        hour_fields = epbunch.fieldvalues[3:]
        return np.array(hour_fields)

    @staticmethod
    def get_compact_weekly_ep_schedule_values(
        epbunch, start_date, index=None, strict=False
    ) -> np.ndarray:
        """Get values for schedule:week:compact.

        Args:
            epbunch (EpBunch): The Schedule:Week:Compact epbunch. Fields from
                index 2 onward are read as (DayType_List_i,
                ScheduleDay_Name_i) pairs.
            start_date: First timestamp of the generated index; also forwarded
                to the day-schedule parser and to ``_field_set``.
            index: Optional index to fill. When None, a 168-hour index
                starting at ``start_date`` is created.
            strict: Forwarded to ``_field_set`` and ``get_schedule_values``.

        Returns:
            np.ndarray: One value per entry of the index. If a day type
            selects no timestamp, the values gathered so far are returned
            immediately.
        """
        if index is None:
            idx = pd.date_range(start=start_date, periods=168, freq="1H")
            slicer_ = pd.Series([False] * (len(idx)), index=idx)
        else:
            slicer_ = pd.Series([False] * (len(index)), index=index)

        weekly_schedules = pd.Series([0] * len(slicer_), index=slicer_.index)

        num_of_daily_schedules = int(len(epbunch.fieldvalues[2:]) / 2)

        for i in range(num_of_daily_schedules):
            day_type = epbunch["DayType_List_{}".format(i + 1)].lower().strip()
            # This field can optionally contain the prefix "For". Remove it as
            # a prefix: ``str.strip("for: ")`` treats its argument as a set of
            # characters and would also eat the leading "fr" of "friday".
            if day_type.startswith("for"):
                day_type = day_type[len("for") :]
            day_type = day_type.strip(": ")
            how = _ScheduleParser._field_set(
                epbunch, day_type, start_date, slicer_, strict
            )
            if not weekly_schedules.loc[how].empty:
                # Loop through days and replace with day:schedule values
                days = []
                for name, day in weekly_schedules.loc[how].groupby(
                    pd.Grouper(freq="D")
                ):
                    if not day.empty:
                        ref = epbunch.get_referenced_object(
                            "ScheduleDay_Name_{}".format(i + 1)
                        )
                        day.loc[:] = _ScheduleParser.get_schedule_values(
                            sched_epbunch=ref, start_date=start_date, strict=strict
                        )
                        days.append(day)
                new = pd.concat(days)
                # remember which timestamps are now assigned (used by
                # "allotherdays" in later iterations)
                slicer_.update(pd.Series([True] * len(new.index), index=new.index))
                slicer_ = slicer_.apply(lambda x: x is True)
                weekly_schedules.update(new)
            else:
                return weekly_schedules.values

        return weekly_schedules.values

    @staticmethod
    def get_daily_weekly_ep_schedule_values(epbunch, start_date, strict) -> np.ndarray:
        """Build one week of values from a Schedule:Week:Daily object.

        The day schedule referenced for each weekday (Monday first) is parsed,
        then the rows are rotated so that the first row matches the weekday
        of ``start_date``.

        Args:
            strict: Forwarded to ``get_schedule_values``.
            epbunch (EpBunch): The schedule EpBunch object.
            start_date: Forwarded to ``get_schedule_values``; its weekday
                sets the rotation.

        Returns:
            np.ndarray: The flattened day values.
        """
        weekday_names = (
            "Monday",
            "Tuesday",
            "Wednesday",
            "Thursday",
            "Friday",
            "Saturday",
            "Sunday",
        )
        # one row of values per weekday
        per_day = [
            _ScheduleParser.get_schedule_values(
                sched_epbunch=epbunch.get_referenced_object(
                    f"{weekday}_ScheduleDay_Name"
                ),
                start_date=start_date,
                strict=strict,
            )
            for weekday in weekday_names
        ]
        # shift days earlier by the weekday of the start date
        week_matrix = np.roll(np.array(per_day), -start_date.weekday(), axis=0)
        return week_matrix.ravel()

    @staticmethod
    def get_list_day_ep_schedule_values(epbunch, start_date) -> np.ndarray:
        """Get values for schedule:day:list.

        The listed values (one per ``Minutes_per_Item``) are placed on a
        one-day index starting at ``start_date`` and resampled to hourly
        values with the function returned by ``_how`` for the
        ``Interpolate_to_Timestep`` field.

        Args:
            start_date: Start of the helper index used for resampling.
            epbunch (EpBunch): The schedule epbunch object.

        Returns:
            np.ndarray: The hourly values produced by the resampling.
        """
        freq = int(epbunch["Minutes_per_Item"])  # Frequency of the values
        raw_values = epbunch.fieldvalues[5:]  # List of values
        method = epbunch["Interpolate_to_Timestep"]  # How to resample

        # A float buffer keeps fractional values; an integer buffer would
        # silently truncate them (0.5 -> 0). Missing or unparseable items stay
        # at zero (this is safer but should not occur).
        n_items = int(24 * 60 / freq)
        all_values = np.zeros(n_items, dtype=float)
        for i, raw in enumerate(raw_values[:n_items]):
            try:
                all_values[i] = float(raw)
            except (TypeError, ValueError):
                all_values[i] = 0

        # create a fake index to help us with the resampling; `periods` must
        # be an integer
        index = pd.date_range(
            start=start_date, periods=n_items, freq="{}min".format(freq)
        )
        series = pd.Series(all_values, index=index)

        # resample series to hourly values and apply resampler function
        series = series.resample("1H").apply(_how(method))

        return series.values

    @staticmethod
    def get_constant_ep_schedule_values(epbunch) -> np.ndarray:
        """Get values for schedule:constant.

        Args:
            epbunch (EpBunch): The schedule epbunch object.

        Returns:
            np.ndarray: 8760 copies of ``Hourly_Value`` as floats, cast to int
            only when the numeric type of the schedule type limits is
            "discrete".
        """
        value = float(epbunch["Hourly_Value"])
        # Build the array as float: filling an integer ``np.arange`` buffer
        # truncated fractional constants (e.g. 0.5 became 0).
        hourly_values = np.full(8760, value, dtype=float)
        _, _, numeric_type, _ = _ScheduleParser.get_schedule_type_limits_data(epbunch)
        if numeric_type.strip().lower() == "discrete":
            hourly_values = hourly_values.astype(int)

        return hourly_values

    @staticmethod
    def get_file_ep_schedule_values(epbunch) -> np.ndarray:
        """Get values for schedule:file.

        Reads a single column of the file named by ``File_Name``, looked up
        through ``epbunch.theidf.simulation_dir``.

        Args:
            epbunch (EpBunch): The schedule epbunch object.

        Returns:
            np.ndarray: The values of the selected column.
        """
        filename = epbunch["File_Name"]
        column = epbunch["Column_Number"]
        rows = epbunch["Rows_to_Skip_at_Top"]
        # hours = epbunch["Number_of_Hours_of_Data"]
        sep = epbunch["Column_Separator"]
        # interp = epbunch["Interpolate_to_Timestep"]

        file = epbunch.theidf.simulation_dir.files(filename)[0]

        delimiter = _separator(sep)
        rows_to_skip = int(rows)
        if rows_to_skip > 0:
            # Skip all but the last of the top rows: pandas consumes that one
            # as the header, so every data row is kept.
            read_options = {"skiprows": rows_to_skip - 1}
        else:
            # Nothing to skip means there is no header row; without
            # ``header=None`` the first data row would be lost as a header.
            read_options = {"skiprows": 0, "header": None}
        col = [int(column) - 1]  # zero-based
        data = pd.read_csv(file, delimiter=delimiter, usecols=col, **read_options)

        return data.iloc[:, 0].values

    @staticmethod
    def get_compact_ep_schedule_values(epbunch, start_date, strict) -> np.ndarray:
        """Get values for schedule:compact.

        Walks the fields of the object in order and maintains three boolean
        masks over an 8760-hour index starting at ``start_date``: one for the
        current `Through` date range, one for the current `For` day types and
        one for the current `Until` time window. Each value field is written
        to the timestamps selected by the combination of those masks.

        Args:
            strict: Forwarded to ``_field_set`` when resolving `For` day types.
            start_date: First timestamp of the generated index; also passed to
                ``_date_field_interpretation`` and ``_field_set``.
            epbunch (EpBunch): The schedule epbunch object.

        Returns:
            np.ndarray: The series values. When an `Interpolate` field was
            seen, the series is upsampled to minutes during parsing and the
            hourly mean is returned.
        """
        # keywords that mark a field as a Field-Set rather than a bare value
        field_sets = ["through", "for", "interpolate", "until", "value"]
        # presumably skips the object key, Name and Schedule Type Limits Name
        # — TODO confirm
        fields = epbunch.fieldvalues[3:]

        index = pd.date_range(start=start_date, periods=8760, freq="H")
        zeros = np.zeros(len(index))

        # slicer_ records every timestamp that has already received a value;
        # it is handed to `_field_set` (used there for "allotherdays")
        slicer_ = pd.Series([False] * len(index), index=index)
        series = pd.Series(zeros, index=index)

        # from_day walks the generated index; ep_from_day walks the dates
        # returned by `_date_field_interpretation`
        from_day = start_date
        ep_from_day = datetime(start_date.year, 1, 1)
        from_time = "00:00"
        how_interpolate = None
        # NOTE(review): through_conditions, for_condition and all_conditions
        # are only bound once the matching field has been seen; the loop
        # relies on the fields arriving in Through / For / Until / value order.
        for field in fields:
            if any([spe in field.lower() for spe in field_sets]):
                f_set, hour, minute, value = _ScheduleParser._field_interpreter(
                    field, epbunch.Name
                )

                if f_set.lower() == "through":
                    # main condition. All sub-conditions must obey a
                    # `Through` condition

                    # First, initialize the slice (all False for now)
                    through_conditions = _ScheduleParser._invalidate_condition(series)

                    # reset from_time
                    from_time = "00:00"

                    # Prepare ep_to_day variable (day after the `Through` date)
                    ep_to_day = _ScheduleParser._date_field_interpretation(
                        value, start_date
                    ) + timedelta(days=1)

                    # Calculate Timedelta in days
                    days = (ep_to_day - ep_from_day).days
                    # Add timedelta to startDate; step back one hour so that
                    # to_day is the last hour inside the range
                    to_day = from_day + timedelta(days=days) + timedelta(hours=-1)

                    # slice the conditions with the range and apply True
                    through_conditions.loc[from_day:to_day] = True

                    # the next `Through` block starts right after this one
                    from_day = to_day + timedelta(hours=1)
                    ep_from_day = ep_to_day
                elif f_set.lower() == "for":
                    # slice specific days
                    # reset from_time
                    from_time = "00:00"

                    for_condition = _ScheduleParser._invalidate_condition(series)
                    fors = value.split()
                    if len(fors) > 1:
                        # if multiple `For`. eg.: For: Weekends Holidays,
                        # Combine all conditions
                        # (note: the loop variable rebinds `value`)
                        for value in fors:
                            if value.lower() == "allotherdays":
                                # Apply condition to slice
                                how = _ScheduleParser._field_set(
                                    epbunch, value, start_date, slicer_, strict
                                )
                                # Reset for condition: the returned indexer
                                # replaces the mask instead of being OR-ed in
                                for_condition = how
                            else:
                                how = _ScheduleParser._field_set(
                                    epbunch, value, start_date, slicer_, strict
                                )
                                # `_field_set` returns None for design days;
                                # those are skipped
                                if how is not None:
                                    for_condition.loc[how] = True
                    elif value.lower() == "allotherdays":
                        # Apply condition to slice
                        how = _ScheduleParser._field_set(
                            epbunch, value, start_date, slicer_, strict
                        )
                        # Reset for condition
                        for_condition = how
                    else:
                        # Apply condition to slice
                        how = _ScheduleParser._field_set(
                            epbunch, value, start_date, slicer_, strict
                        )
                        for_condition.loc[how] = True

                    # Combine the for_condition with all_conditions
                    all_conditions = through_conditions & for_condition

                    # update in memory slice
                    # self.sliced_day_.loc[all_conditions] = True
                elif "interpolate" in f_set.lower():
                    # we need to upsample to series to 8760 * 60 values
                    new_idx = pd.date_range(
                        start=start_date, periods=525600, closed="left", freq="T"
                    )
                    # the series and both masks are forward-filled onto the
                    # minute index so they stay aligned
                    series = series.resample("T").pad()
                    series = series.reindex(new_idx)
                    series.fillna(method="pad", inplace=True)
                    through_conditions = through_conditions.resample("T").pad()
                    through_conditions = through_conditions.reindex(new_idx)
                    through_conditions.fillna(method="pad", inplace=True)
                    for_condition = for_condition.resample("T").pad()
                    for_condition = for_condition.reindex(new_idx)
                    for_condition.fillna(method="pad", inplace=True)
                    how_interpolate = value.lower()
                elif f_set.lower() == "until":
                    until_condition = _ScheduleParser._invalidate_condition(series)
                    # The end bound is pulled back by one step (one minute or
                    # one hour) — presumably because `between_time` includes
                    # both end points; confirm against pandas docs.
                    if series.index.freq.name == "T":
                        # until_time = str(int(hour) - 1) + ':' + minute
                        until_time = timedelta(
                            hours=int(hour), minutes=int(minute)
                        ) - timedelta(minutes=1)

                    else:
                        until_time = str(int(hour) - 1) + ":" + minute
                    until_condition.loc[
                        until_condition.between_time(from_time, str(until_time)).index
                    ] = True
                    all_conditions = (
                        for_condition & through_conditions & until_condition
                    )

                    # the next `Until` window starts where this one ends
                    from_time = str(int(hour)) + ":" + minute
                elif f_set.lower() == "value":
                    # If the term `Value: ` field is used, we will catch it
                    # here.
                    # update in memory slice
                    slicer_.loc[all_conditions] = True
                    series[all_conditions] = value
                else:
                    # Do something here before looping to the next Field
                    pass
            else:
                # If the term `Value: ` is not used; the variable is simply
                # passed in the Field
                value = float(field)
                series[all_conditions] = value

                # update in memory slice
                slicer_.loc[all_conditions] = True
        if how_interpolate:
            # back to hourly resolution: average the minute values
            return series.resample("H").mean().values
        else:
            return series.values

    @classmethod
    def get_yearly_ep_schedule_values(cls, epbunch, start_date, strict) -> np.ndarray:
        """Expand a Schedule:Year object into 8760 hourly values.

        Fields from index 3 onward are read in groups of five (week schedule
        name, start month/day, end month/day). For every group the referenced
        week schedule is evaluated in 168-hour chunks over the group's span
        and written into the yearly series.

        Args:
            strict: Forwarded to ``get_schedule_values``.
            start_date (datetime): First timestamp of the yearly index; its
                year is used to parse the start/end dates.
            epbunch (EpBunch): the schedule epbunch.

        Returns:
            np.ndarray: The 8760 hourly values.
        """
        reference_year = start_date.year
        hourly_index = pd.date_range(start=start_date, periods=8760, freq="1H")
        hourly_values = pd.Series([0] * 8760, index=hourly_index)

        entry_count = int(len(epbunch.fieldvalues[3:]) / 5)

        period_start = start_date
        for number in range(1, entry_count + 1):
            week_epbunch = epbunch.get_referenced_object(
                f"ScheduleWeek_Name_{number}"
            )

            start_month = getattr(epbunch, f"Start_Month_{number}")
            end_month = getattr(epbunch, f"End_Month_{number}")
            start_day = getattr(epbunch, f"Start_Day_{number}")
            end_day = getattr(epbunch, f"End_Day_{number}")

            first_day = datetime.strptime(
                f"{reference_year}/{start_month}/{start_day}", "%Y/%m/%d"
            )
            last_day = datetime.strptime(
                f"{reference_year}/{end_month}/{end_day}", "%Y/%m/%d"
            )
            # inclusive number of days covered by this entry
            span = timedelta(days=(last_day - first_day).days + 1)

            period_end = period_start + span + timedelta(hours=23)

            filled_weeks = []
            chunks = hourly_values.loc[period_start:period_end].groupby(
                pd.Grouper(freq="168H")
            )
            for _, week in chunks:
                if week.empty:
                    continue
                try:
                    week.loc[:] = cls.get_schedule_values(
                        sched_epbunch=week_epbunch,
                        start_date=week.index[0],
                        index=week.index,
                        strict=strict,
                    )
                except ValueError:
                    # retry without the explicit index and trim the result to
                    # the length of this chunk
                    week.loc[:] = cls.get_schedule_values(
                        sched_epbunch=week_epbunch,
                        start_date=week.index[0],
                        strict=strict,
                    )[0 : len(week)]
                finally:
                    filled_weeks.append(week)

            hourly_values.update(pd.concat(filled_weeks))
            period_start += span

        return hourly_values.values

    @staticmethod
    def get_schedule_values(
        sched_epbunch, start_date, index=None, strict=False
    ) -> list:
        """Dispatch a schedule epbunch to the parser matching its key.

        Args:
            strict: Forwarded to the parsers that accept it.
            sched_epbunch (EpBunch): the schedule epbunch object
            start_date: Forwarded to the parsers that accept it.
            index: Only forwarded to the schedule:week:compact parser.

        Returns:
            list: The parsed values, or an empty list (after logging a
            warning) when the schedule type is not supported.
        """
        parser = _ScheduleParser
        sch_type = sched_epbunch.key.upper()

        # each entry defers the call so only the matching parser runs
        handlers = (
            (
                "schedule:year",
                lambda: parser.get_yearly_ep_schedule_values(
                    sched_epbunch, start_date, strict
                ),
            ),
            (
                "schedule:day:interval",
                lambda: parser.get_interval_day_ep_schedule_values(sched_epbunch),
            ),
            (
                "schedule:day:hourly",
                lambda: parser.get_hourly_day_ep_schedule_values(sched_epbunch),
            ),
            (
                "schedule:day:list",
                lambda: parser.get_list_day_ep_schedule_values(
                    sched_epbunch, start_date
                ),
            ),
            (
                "schedule:week:compact",
                lambda: parser.get_compact_weekly_ep_schedule_values(
                    sched_epbunch, start_date, index, strict
                ),
            ),
            (
                "schedule:week:daily",
                lambda: parser.get_daily_weekly_ep_schedule_values(
                    sched_epbunch, start_date, strict
                ),
            ),
            (
                "schedule:constant",
                lambda: parser.get_constant_ep_schedule_values(sched_epbunch),
            ),
            (
                "schedule:compact",
                lambda: parser.get_compact_ep_schedule_values(
                    sched_epbunch, start_date, strict
                ),
            ),
            (
                "schedule:file",
                lambda: parser.get_file_ep_schedule_values(sched_epbunch),
            ),
        )
        by_key = {kind.upper(): handler for kind, handler in handlers}

        handler = by_key.get(sch_type.upper())
        if handler is None:
            log(
                "Archetypal does not currently support schedules of type "
                '"{}"'.format(sch_type),
                lg.WARNING,
            )
            return []

        return list(handler())

    @staticmethod
    def _field_interpreter(field, name):
        """Split one Schedule:Compact field into its keyword and payload.

        The keyword is detected by substring, in this order: through, for,
        interpolate, until, value. Anything else is treated as a bare field.

        Args:
            name: Name of the schedule; only used in messages.
            field: Raw text of the field.

        Returns:
            tuple: ``(f_set, hour, minute, value)``. ``hour`` and ``minute``
            are strings for an `Until` field and None otherwise; ``value`` is
            None for an `Until` field.

        Raises:
            NotImplementedError: If a through/for/until/value field cannot be
                split as expected.
        """
        known_days = [
            "weekdays",
            "weekends",
            "alldays",
            "allotherdays",
            "sunday",
            "monday",
            "tuesday",
            "wednesday",
            "thursday",
            "friday",
            "saturday",
            "summerdesignday",
            "winterdesignday",
            "holiday",
        ]
        lowered = field.lower()
        has_colon = ":" in lowered

        def not_understood():
            # shared error for every field that cannot be split
            return NotImplementedError(
                'The schedule "{sch}" contains a Field '
                'that is not understood: "{field}"'.format(sch=name, field=field)
            )

        hour = None
        minute = None

        if "through" in lowered:
            if not has_colon:
                raise not_understood()
            f_set, payload = field.split(":")
            value = payload.strip()
        elif "for" in lowered:
            day_words = [word for word in known_days if word in lowered]
            if has_colon:
                f_set, payload = field.split(":")
                value = payload.strip()
            elif day_words:
                # no colon: rebuild the value from the recognised day words
                f_set = [token for token in field.split() if "for" in token.lower()][0]
                value = " ".join(day_words).strip()
            else:
                raise not_understood()
        elif "interpolate" in lowered:
            msg = (
                'The schedule "{sch}" contains sub-hourly values ('
                'Field-Set="{field}"). The average over the hour is '
                "taken".format(sch=name, field=field)
            )
            log(msg, lg.WARNING)
            f_set, value = field.split(":")
        elif "until" in lowered:
            if not has_colon:
                raise not_understood()
            try:
                # "Until: HH:MM"
                f_set, hour, minute = field.split(":")
            except ValueError:
                # "Until HH:MM": the hour is the tail of the first piece
                f_set = "until"
                hour, minute = field.split(":")
                hour = hour[-2:]
            hour = hour.strip()
            minute = minute.strip()
            value = None
        elif "value" in lowered:
            if not has_colon:
                raise not_understood()
            f_set, payload = field.split(":")
            value = payload.strip()
        else:
            # bare field: returned as the field set with an empty value
            f_set = field
            value = field[len(field) + 1 :].strip()

        return f_set, hour, minute, value

    @staticmethod
    def _invalidate_condition(series):
        """Return an all-False mask sharing the index of ``series``."""
        blank = [False] * len(series)
        return pd.Series(blank, index=series.index)

    @staticmethod
    def get_schedule_type_limits_data(epbunch):
        """Return schedule type limits info for epbunch.

        Returns:
            tuple: ``(lower_limit, upper_limit, numeric_type, unit_type)`` as
            returned by the idf model for the referenced limits, or four
            empty strings when the epbunch has no readable
            ``Schedule_Type_Limits_Name``.
        """
        try:
            limits_name = epbunch.Schedule_Type_Limits_Name
        except Exception:
            # this schedule is probably a 'Schedule:Week:Daily' which does
            # not have a Schedule_Type_Limits_Name field
            return "", "", "", ""

        lookup = epbunch.theidf.get_schedule_type_limits_data_by_name
        lower, upper, numeric, unit = lookup(limits_name)
        return lower, upper, numeric, unit

    @staticmethod
    def _field_set(schedule_epbunch, field, start_date, slicer_=None, strict=False):
        """Translate a day-type keyword into an indexer for a time series.

        The keyword is matched case-insensitively. Handled keywords are:
        Weekdays, Weekends, Alldays, AllOtherDays, the seven weekday names,
        SummerDesignDay, WinterDesignDay and Holiday/Holidays.

        Args:
            start_date: Forwarded to ``special_day``.
            strict: When True, an unknown keyword raises instead of selecting
                nothing.
            schedule_epbunch: Forwarded to ``special_day``.
            field (str): The EnergyPlus field set value.
            slicer_: Mask of already assigned timestamps; required for
                "AllOtherDays".

        Returns:
            (indexer-like): A callable, a slice or whatever ``special_day`` /
            ``_conjunction`` return. None for the two design-day keywords.

        Raises:
            NotImplementedError: For "AllOtherDays" without ``slicer_``, or
                for an unknown keyword when ``strict`` is True.
        """
        keyword = field.lower()

        # pandas numbering: Monday is 0, Sunday is 6
        weekday_numbers = {
            "monday": 0,
            "tuesday": 1,
            "wednesday": 2,
            "thursday": 3,
            "friday": 4,
            "saturday": 5,
            "sunday": 6,
        }
        if keyword in weekday_numbers:
            day_number = weekday_numbers[keyword]
            return lambda x: x.index.dayofweek == day_number

        if keyword == "weekdays":
            return lambda x: x.index.dayofweek < 5
        if keyword == "weekends":
            return lambda x: x.index.dayofweek >= 5
        if keyword == "alldays":
            # equivalent to .loc[:]
            return pd.IndexSlice[:]

        if keyword == "allotherdays":
            # unused days (including special days), based on `slicer_`
            import operator

            if slicer_ is None:
                raise NotImplementedError
            special = _ScheduleParser.special_day(
                schedule_epbunch, field, slicer_, strict, start_date
            )
            return _conjunction(special, ~slicer_, logical=operator.or_)

        if keyword in ("summerdesignday", "winterdesignday"):
            # design days are not resolved here
            return None

        if keyword in ("holiday", "holidays"):
            return _ScheduleParser.special_day(
                schedule_epbunch, "holiday", slicer_, strict, start_date
            )

        if strict:
            raise NotImplementedError(
                f"Archetypal does not yet support The Field_set '{field}'"
            )
        # If not strict, ignore missing field-sets such as CustomDay1
        return lambda x: x < 0

    @staticmethod
    def _date_field_interpretation(field, start_date):
        """Interpret an EnergyPlus date field as a date in the schedule year.

        Info:
            See EnergyPlus documentation for more details: 1.6.8.1.2 Field:
            Start Date (Table 1.4: Date Field Interpretation)

        The plain formats ``%m/%d``, ``%d %B``, ``%B %d``, ``%d %b`` and
        ``%b %d`` are tried first; the first one that parses wins. Otherwise
        the field is handed to :meth:`_parse_fancy_string` (e.g.
        "3rd Monday in February").

        Args:
            field (str): The EnergyPlus field contents.
            start_date (datetime): Only its ``year`` is used; the returned
                date is placed in that year.

        Returns:
            (datetime): The interpreted date, at midnight.

        Raises:
            ValueError: If the field matches none of the supported forms. The
                underlying parsing error is attached as ``__cause__``.
        """
        formats = ("%m/%d", "%d %B", "%B %d", "%d %b", "%b %d")
        for format_str in formats:
            try:
                parsed = datetime.strptime(field, format_str)
            except (TypeError, ValueError):
                # This format does not apply; try the next one.
                continue
            # Stop at the first successful parse; strptime has no year in
            # these formats, so transplant month/day into the schedule year.
            return datetime(start_date.year, parsed.month, parsed.day)

        # None of the plain formats worked: try the "<nth> <day> in <month>" form.
        try:
            return _ScheduleParser._parse_fancy_string(field, start_date)
        except Exception as e:
            msg = f"the schedule contains a Field that is not understood: '{field}'"
            raise ValueError(msg) from e

    @staticmethod
    def _parse_fancy_string(field, start_date):
        """Resolve fields of the form ``<nth> <day name> in <month name>``.

        Example: ``3rd Monday in February`` or ``Last Friday in December``.
        The month must be a full month name. A day name that is not one of
        the seven week days is treated as Sunday.

        Args:
            field (str): The EnergyPlus field contents (case-insensitive).
            start_date (datetime): Supplies the year in which the date is
                looked up.

        Returns:
            (datetime): The matching date, at midnight.
        """
        import calendar
        import re

        # "<nth> <day>" on the left of ' in ', the month name on the right.
        occurrence_part, month_name = field.lower().split(" in ")
        month_number = datetime.strptime(month_name, "%B").month

        ordinal, day_name = occurrence_part.split(" ")
        if "last" in ordinal:
            position = -1  # last occurrence in the month
        else:
            # "3rd" -> 3 -> zero-based list position 2
            position = int(re.findall(r"\d+", ordinal)[0]) - 1

        day_names = (
            "monday",
            "tuesday",
            "wednesday",
            "thursday",
            "friday",
            "saturday",
            "sunday",
        )
        day_lookup = {name: number for number, name in enumerate(day_names)}
        target_weekday = day_lookup.get(day_name, 6)

        # All dates of the requested month falling on the requested week day.
        month_calendar = calendar.Calendar(firstweekday=start_date.weekday())
        candidates = [
            day
            for day in month_calendar.itermonthdates(start_date.year, month_number)
            if day.month == month_number and day.weekday() == target_weekday
        ]
        chosen = candidates[position]
        return datetime(chosen.year, chosen.month, chosen.day)

    @staticmethod
    def special_day(schedule_epbunch, field, slicer_, strict, start_date):
        """Build a mask of the RunPeriodControl:SpecialDays periods.

        Args:
            schedule_epbunch: Object whose ``theidf.idfobjects`` holds the
                ``RUNPERIODCONTROL:SPECIALDAYS`` objects.
            field (str): Day type requested by the schedule (e.g. "holiday").
                It is compared with the lower-cased ``Special_Day_Type``.
            slicer_: Series used as a template; a copy of it, reset to False,
                is filled in and returned. ``slicer_`` itself is not modified.
            strict (bool): If True, raise when no special day is defined.
            start_date (datetime): Forwarded to
                :meth:`_date_field_interpretation` to place each special day
                in the schedule year.

        Returns:
            A copy of ``slicer_`` that is True during the special days and
            False elsewhere.

        Raises:
            ValueError: If ``strict`` is True and no special day is found.
        """
        sp_slicer_ = slicer_.copy()
        sp_slicer_.loc[:] = False
        special_day_types = ("holiday", "customday1", "customday2")

        candidates = schedule_epbunch.theidf.idfobjects[
            "RunPeriodControl:SpecialDays".upper()
        ]
        # NOTE(review): because of the `or`, every Holiday/CustomDay1/CustomDay2
        # object is selected whatever `field` is. Kept as-is; confirm whether a
        # "holiday" request should really include the custom days.
        special_days = [
            special
            for special in candidates
            if special.Special_Day_Type.lower() == field
            or special.Special_Day_Type.lower() in special_day_types
        ]

        if special_days:
            # There can be more than one special day object.
            for special in special_days:
                special_day_start_date = _ScheduleParser._date_field_interpretation(
                    special.Start_Date, start_date
                )
                duration = int(special.Duration)
                # Label-based slicing is inclusive: stop one hour before the
                # first day that follows the period.
                to_date = (
                    special_day_start_date
                    + timedelta(days=duration)
                    + timedelta(hours=-1)
                )
                sp_slicer_.loc[special_day_start_date:to_date] = True
            return sp_slicer_

        if not strict:
            return sp_slicer_

        msg = (
            'Could not find a "RunPeriodControl:SpecialDays" object '
            'needed for schedule with Day Type "{}"'.format(field.capitalize())
        )
        raise ValueError(msg)

    @staticmethod
    def design_day(schedule_epbunch, field, slicer_, start_date, strict):
        """Build a mask of the SizingPeriod:DesignDay matching a day type.

        Args:
            schedule_epbunch: Object whose ``theidf.idfobjects`` holds the
                ``SIZINGPERIOD:DESIGNDAY`` objects.
            field (str): Day type to look for. It is compared with the
                lower-cased ``Day_Type`` of each design day, so it is expected
                in lower case.
            slicer_: Series used as a template; a copy of it, reset to False,
                is filled in and returned. ``slicer_`` itself is not modified.
            start_date (datetime): First time step of the schedule; the design
                day is positioned relative to it.
            strict (bool): If True, raise when no matching design day exists.

        Returns:
            A copy of ``slicer_`` that is True during the design day(s) and
            False elsewhere.

        Raises:
            ValueError: If ``strict`` is True and no design day matches.
        """
        sp_slicer_ = slicer_.copy()
        sp_slicer_.loc[:] = False
        candidates = schedule_epbunch.theidf.idfobjects[
            "SizingPeriod:DesignDay".upper()
        ]
        matches = [
            candidate for candidate in candidates if candidate.Day_Type.lower() == field
        ]

        if matches:
            ep_orig = datetime(start_date.year, 1, 1)
            duration = 1  # a design day lasts one day
            # Normally a single design day matches the Day Type.
            for match in matches:
                data = str(match.Month) + "/" + str(match.Day_of_Month)
                ep_start_date = _ScheduleParser._date_field_interpretation(
                    data, start_date
                )
                # Offset of the design day from January 1st, applied to the
                # actual start of the schedule.
                days_to_speciald = (ep_start_date - ep_orig).days
                from_date = start_date + timedelta(days=days_to_speciald)
                # Label-based slicing is inclusive: stop one hour before the
                # next day.
                to_date = from_date + timedelta(days=duration) + timedelta(hours=-1)

                sp_slicer_.loc[from_date:to_date] = True
            return sp_slicer_

        if not strict:
            return sp_slicer_

        msg = (
            f"Could not find a 'SizingPeriod:DesignDay' object "
            f"needed for schedule with Day Type '{field.capitalize()}'"
        )
        raise ValueError(msg)


class Schedule:
    """Class handling any EnergyPlus schedule object."""

    def __init__(
        self,
        Name: str,
        start_day_of_the_week: FrozenSet[Literal[0, 1, 2, 3, 4, 5, 6]] = 0,
        strict: bool = False,
        Type: Union[str, ScheduleTypeLimits] = None,
        Values: Union[List[Union[int, float]], np.ndarray] = None,
        **kwargs,
    ):
        """Initialize object.

        Args:
            Name (str): The schedule name in the idf model.
            start_day_of_the_week (int): 0-based day of week (Monday=0) of the
                first day of the schedule. Defaults to 0.
            strict (bool): if True, schedules that have the Field-Sets such as
                Holidays and CustomDay will raise an error if they are absent
                from the IDF file. If False, any missing qualifiers will be
                ignored.
            Type (str, ScheduleTypeLimits): This field contains a reference to
                the Schedule Type Limits object. A string is converted to one
                of the built-in limits (see the ``Type`` property). Can be
                None.
            Values (ndarray): A 24 or 8760 list of schedule values.
            **kwargs: Forwarded to the next ``__init__`` in the MRO, if any.
        """
        # Cooperative initialisation for subclasses with additional bases. A
        # plain `Schedule` has `object` as parent, which rejects the
        # arguments; that failure is deliberately ignored.
        try:
            super(Schedule, self).__init__(Name, **kwargs)
        except Exception:
            pass  # todo: make this more robust
        self.Name = Name
        self.strict = strict
        self.startDayOfTheWeek = start_day_of_the_week
        self.year = get_year_for_first_weekday(self.startDayOfTheWeek)
        self.Values = Values
        self.Type = Type

    @property
    def Type(self):
        """Get or set the schedule type limits object. Can be None.

        Setting a string selects built-in limits: any name containing
        "fraction", "temperature" or "on/off" (case-insensitive). Any other
        string falls back to "Fraction" limits and logs a warning.
        """
        return self._schedule_type_limits

    @Type.setter
    def Type(self, value):
        if isinstance(value, str):
            type_name = value.lower()
            if "fraction" in type_name:
                value = ScheduleTypeLimits("Fraction", 0, 1)
            elif type_name == "temperature":
                value = ScheduleTypeLimits("Temperature", -100, 100)
            elif type_name == "on/off":
                value = ScheduleTypeLimits("On/Off", 0, 1, "Discrete", "availability")
            else:
                # Log while `value` still holds the unknown name, then fall
                # back to fraction limits.
                log(
                    f"'{value}' is not a known ScheduleTypeLimits for "
                    f"{self.Name}. Please instantiate the object before "
                    f"passing as the 'Type' parameter.",
                    level=lg.WARNING,
                )
                value = ScheduleTypeLimits("Fraction", 0, 1)
            assert isinstance(value, ScheduleTypeLimits), value
        # None is stored as-is: the schedule then has no type limits.
        self._schedule_type_limits = value

    @property
    def Values(self):
        """Get or set the list of schedule values.

        A 1-D numpy array is converted to a list when set.
        """
        return self._values

    @Values.setter
    def Values(self, value):
        if isinstance(value, np.ndarray):
            assert value.ndim == 1, value.ndim
            value = value.tolist()
        self._values = validators.iterable(value, allow_empty=True)

    @property
    def Name(self):
        """Get or set the name of the schedule."""
        return self._name

    @Name.setter
    def Name(self, value):
        self._name = value

    @classmethod
    def from_values(
        cls,
        Name: str,
        Values: List[Union[float, int]],
        Type: str = "Fraction",
        **kwargs,
    ):
        """Create a Schedule from a list of Values.

        Args:
            Name (str): The schedule name.
            Values (list): The schedule values.
            Type (str): The schedule type limits. Defaults to "Fraction".
            **kwargs: Other keywords passed to the constructor.
        """
        return cls(Name=Name, Values=Values, Type=Type, **kwargs)

    @classmethod
    def from_epbunch(cls, epbunch, strict=False, Type=None, **kwargs):
        """Create a Schedule from an epbunch.

        Args:
            epbunch: The EnergyPlus schedule object to convert.
            strict (bool): Passed to the parser that computes the values.
            Type: The schedule type limits. If None, the limits referenced by
                the epbunch's "Schedule_Type_Limits_Name" field are used when
                they can be resolved; otherwise it stays None.
            **kwargs: Other keywords passed to the constructor. "Name",
                "start_day_of_the_week" and "DataSource" override the values
                taken from the epbunch.
        """
        if Type is None:
            try:
                type_limit_ep = epbunch.get_referenced_object(
                    "Schedule_Type_Limits_Name"
                )
                Type = ScheduleTypeLimits.from_epbunch(type_limit_ep)
            except (BadEPFieldError, AttributeError):
                pass
        name = epbunch.Name
        start_day_of_the_week = epbunch.theidf.day_of_week_for_start_day
        start_date = datetime(get_year_for_first_weekday(start_day_of_the_week), 1, 1)

        schedule = cls(
            Name=kwargs.pop("Name", name),
            epbunch=epbunch,
            start_day_of_the_week=kwargs.pop(
                "start_day_of_the_week", start_day_of_the_week
            ),
            Type=Type,
            DataSource=kwargs.pop("DataSource", epbunch.theidf.name),
            Values=np.array(
                _ScheduleParser.get_schedule_values(
                    epbunch, start_date=start_date, strict=strict
                )
            ),
            **kwargs,
        )
        return schedule

    @classmethod
    def constant_schedule(cls, value=1, Name="AlwaysOn", Type="Fraction", **kwargs):
        """Initialize a schedule with a constant value for the whole year.

        Defaults to a schedule with a value of 1, named 'AlwaysOn'.

        Args:
            value (float, optional): The value for the constant schedule.
                Defaults to 1.
            Name (str, optional): The name of the schedule. Defaults to
                "AlwaysOn".
            Type (str, optional): The schedule type limits. Defaults to
                "Fraction".
            **kwargs: Other keywords passed to :meth:`from_values`.
        """
        return cls.from_values(
            Name=Name,
            Values=np.ones((8760,)) * value,
            Type=Type,
            **kwargs,
        )

    @property
    def all_values(self) -> np.ndarray:
        """Return numpy array of schedule Values."""
        return np.array(self._values)

    @all_values.setter
    def all_values(self, value):
        self._values = validators.iterable(value, maximum_length=8760)

    @property
    def max(self):
        """Get the maximum value of the schedule."""
        return max(self.all_values)

    @property
    def min(self):
        """Get the minimum value of the schedule."""
        return min(self.all_values)

    @property
    def mean(self):
        """Get the mean value of the schedule."""
        return np.average(self.all_values)

    @property
    def series(self):
        """Return an :class:`EnergySeries`.

        The series is indexed hourly, starting at :attr:`startDate`. Its
        units are the ``UnitType`` of :attr:`Type`, or None without a Type.
        """
        # A timedelta is used instead of the "1H" alias, which recent pandas
        # versions deprecate.
        index = pd.date_range(
            start=self.startDate,
            periods=self.all_values.size,
            freq=timedelta(hours=1),
        )
        units = self.Type.UnitType if self.Type is not None else None
        return EnergySeries(self.all_values, index=index, name=self.Name, units=units)

    @staticmethod
    def get_schedule_type_limits_name(epbunch):
        """Return the Schedule Type Limits name associated to this schedule.

        Returns "unknown" when the epbunch has no such field.
        """
        try:
            schedule_limit_name = epbunch.Schedule_Type_Limits_Name
        except Exception:
            return "unknown"
        else:
            return schedule_limit_name

    @property
    def startDate(self):
        """Get the start date of the schedule. Satisfies `startDayOfTheWeek`."""
        year = get_year_for_first_weekday(self.startDayOfTheWeek)
        return datetime(year, 1, 1)

    def scale(self, diversity=0.1):
        """Scale the schedule values by a diversity factor around the average.

        Every value is moved towards the average by the fraction
        ``diversity`` of its distance to it. The schedule is modified in
        place and returned.
        """
        values = self.all_values
        average = np.average(values)
        self.Values = ((average - values) * diversity) + values
        return self

    def replace(self, new_values: Union[pd.Series]):
        """Replace values with new values while keeping the full load hours constant.

        Time steps that are not specified in `new_values` will be adjusted to keep
        the full load hours of the schedule constant. No check whether the new
        schedule stays between the bounds set by self.Type is done. Be aware.

        Args:
            new_values (pd.Series): The replacement values, indexed by a
                `pandas.DatetimeIndex` whose labels exist in `self.series`.
        """
        assert isinstance(new_values.index, pd.DatetimeIndex), (
            "The index of `new_values` must be a `pandas.DatetimeIndex`. Instead, "
            f"`{type(new_values.index)}` was provided."
        )
        # NOTE(review): this only checks that some time steps of the schedule
        # are NOT covered by `new_values` (they absorb the difference below);
        # it does not actually verify that the two indexes overlap.
        assert not self.series.index.difference(new_values.index).empty, (
            "There is no overlap between self.index and new_values.index. Please "
            "check your dates."
        )

        # create a copy of self.series as orig.
        orig = self.series.copy()

        new_data = new_values.values

        # get the new_values index
        idx = new_values.index

        # compute the difference in values with the original data and the new data.
        diff = orig.loc[idx] - new_data.reshape(-1)

        # replace the original data with new values at their location.
        orig.loc[idx] = new_values

        # adjust remaining time steps with the average difference. Inplace.
        orig.loc[orig.index.difference(idx)] += diff.sum() / len(
            orig.index.difference(idx)
        )

        new = orig

        # assert the sum has not changed as a sanity check.
        np.testing.assert_array_almost_equal(self.series.sum(), new.sum())

        # replace values of self with new values.
        self.Values = new.tolist()

    def plot(self, **kwargs):
        """Plot the schedule.

        Notes:
            Plotting can also be achieved through the series property:
            `Schedule.series.plot()`.

        Examples:
            >>> from archetypal import IDF
            >>> idf = IDF()
            >>> epbunch = idf.schedules_dict["NECB-A-Thermostat Setpoint-Heating"]
            >>> s = Schedule.from_epbunch(epbunch)
            >>> s.plot(drawstyle="steps-post")

        Args:
            **kwargs (dict): keyword arguments passed to
                :meth:`EnergySeries.plot`.
        """
        return self.series.plot(**kwargs)

    def plot2d(self, **kwargs):
        """Plot the carpet plot of the schedule."""
        if self.Type is not None:
            vmin = self.Type.LowerLimit
            vmax = self.Type.UpperLimit
        else:
            vmin, vmax = (None, None)

        pretty_plot_kwargs = {
            "cmap": "Greys",
            "show": True,
            "figsize": (7, 2),
            "dpi": 72,
            "vmin": vmin,
            "vmax": vmax,
        }
        # user-supplied keywords take precedence over the defaults
        pretty_plot_kwargs.update(kwargs)
        return self.series.plot2d(**pretty_plot_kwargs)

    # Append the documentation of the wrapped method. Docstrings are None when
    # they are stripped (python -OO), hence the fallbacks.
    plot2d.__doc__ = (plot2d.__doc__ or "") + (EnergySeries.plot2d.__doc__ or "")

    def to_year_week_day(self):
        """Convert to three-tuple epbunch given an idf model.

        Returns 'Schedule:Year', 'Schedule:Week:Daily' and 'Schedule:Day:Hourly'
        representations.

        Returns:
            3-element tuple containing

            - **yearly** (*Schedule*): The yearly schedule object
            - **weekly** (*list of Schedule*): The list of weekly schedule
              objects
            - **daily** (*list of Schedule*):The list of daily schedule objects

        Raises:
            ValueError: If the values cannot be arranged in whole days and
                weeks.
        """
        from archetypal.template.schedule import (
            DaySchedule,
            WeekSchedule,
            YearSchedule,
            YearSchedulePart,
        )

        full_year = np.array(self.all_values)  # array of shape (8760,)

        # reshape to (365, 24)
        Values = full_year.reshape(-1, 24)  # shape (365, 24)

        # create unique days
        unique_days = np.unique(Values, axis=0)

        ep_days = []
        dict_day = {}
        for count_day, unique_day in enumerate(unique_days):
            name = f"d_{self.Name}_{count_day:02d}"
            dict_day[name] = unique_day

            # Create idf_objects for schedule:day:hourly
            ep_day = DaySchedule(
                Name=name, Type=self.Type, Values=[unique_day[i] for i in range(24)]
            )
            ep_days.append(ep_day)

        # create unique weeks from unique days; `nws` holds, for every week of
        # the year, the position of its pattern in `unique_weeks`.
        try:
            unique_weeks, nws = np.unique(
                full_year[: 364 * 24, ...].reshape(-1, 168),
                axis=0,
                return_inverse=True,
            )
        except ValueError as err:
            raise ValueError(
                "Looks like the idf model needs to be rerun with 'annual=True'"
            ) from err

        # We use the calendar module to set the week days order
        import calendar

        # initialize the calendar object
        c = calendar.Calendar(firstweekday=self.startDayOfTheWeek)

        # Appending unique weeks in dictionary with name and values of weeks as
        # keys
        # {'name_week': {'dayName':[]}}
        dict_week = {}
        for count_week, unique_week in enumerate(unique_weeks):
            week_id = f"w_{self.Name}_{count_week:02d}"
            dict_week[week_id] = {}
            for i, day in enumerate(c.iterweekdays()):
                day_of_week = unique_week[..., i * 24 : (i + 1) * 24]
                for key, ep_day in zip(dict_day, ep_days):
                    if (day_of_week == dict_day[key]).all():
                        dict_week[week_id]["day_{}".format(day)] = ep_day

        # Create idf_objects for schedule:week:daily

        # Create ep_weeks list and iterate over dict_week
        ep_weeks = {}
        for week_id in dict_week:
            ep_week = WeekSchedule(
                Name=week_id,
                Days=[
                    dict_week[week_id]["day_{}".format(day_num)]
                    for day_num in c.iterweekdays()
                ],
            )
            ep_weeks[week_id] = ep_week

        blocks = {}
        from_date = datetime(self.year, 1, 1)
        # Consecutive identical weeks are merged: `bincount` is the length of
        # each run and `week_order` the week pattern of each run. One is added
        # so that every group key is truthy.
        bincount = [sum(1 for _ in group) for key, group in groupby(nws + 1) if key]
        week_order = {
            i: v
            for i, v in enumerate(
                np.array([key for key, group in groupby(nws + 1) if key]) - 1
            )
        }
        for i, (week_n, count) in enumerate(zip(week_order, bincount)):
            week_id = list(dict_week)[week_order[i]]
            to_date = from_date + timedelta(days=int(count * 7), hours=-1)
            blocks[i] = YearSchedulePart(
                FromDay=from_date.day,
                FromMonth=from_date.month,
                ToDay=to_date.day,
                ToMonth=to_date.month,
                Schedule=ep_weeks[week_id],
            )
            from_date = to_date + timedelta(hours=1)

            # If this is the last block, force end of year
            if i == len(bincount) - 1:
                blocks[i].ToDay = 31
                blocks[i].ToMonth = 12

        ep_year = YearSchedule(self.Name, Type=self.Type, Parts=list(blocks.values()))

        return ep_year, list(ep_weeks.values()), ep_days

    def __len__(self):
        """Get the length of all values of the schedule."""
        return len(self.all_values)

    def __add__(self, other):
        """Add self and other. Returns an array, not a Schedule."""
        if isinstance(other, Schedule):
            return self.all_values + other.all_values
        elif isinstance(other, list):
            return self.all_values + other
        else:
            raise NotImplementedError

    def __sub__(self, other):
        """Subtract self and other. Returns an array, not a Schedule."""
        if isinstance(other, Schedule):
            return self.all_values - other.all_values
        elif isinstance(other, list):
            return self.all_values - other
        else:
            raise NotImplementedError

    def __mul__(self, other):
        """Multiply self with other. Returns an array, not a Schedule."""
        if isinstance(other, Schedule):
            return self.all_values * other.all_values
        elif isinstance(other, list):
            return self.all_values * other
        else:
            raise NotImplementedError

    def _repr_svg_(self):
        """SVG representation for iPython notebook.

        Returns the content of the rendered carpet plot as read from an
        in-memory bytes buffer.
        """
        if self.Type is not None:
            vmin = self.Type.LowerLimit
            vmax = self.Type.UpperLimit
        else:
            vmin, vmax = (None, None)
        fig, ax = self.series.plot2d(
            cmap="Greys", show=False, figsize=(7, 2), dpi=72, vmin=vmin, vmax=vmax
        )
        f = io.BytesIO()
        fig.savefig(f, format="svg")
        return f.getvalue()

    def combine(self, other, weights=None, quantity=None):
        """Combine two schedule objects together.

        Args:
            other (Schedule): the other Schedule object to combine with.
            weights (list-like, optional): A list-like object of len 2. If None,
                equal weights are used.
            quantity: Currently not used by this implementation.

        Returns:
            (Schedule): the combined Schedule object. `self` (or `other`) is
            returned unchanged when the other schedule is missing or empty, or
            when both hold the same values.

        Raises:
            NotImplementedError: If `other` is not of the same class as self.
        """
        # Check if other is None. Simply return self
        if not other:
            return self

        if not self:
            return other

        # Check if other is the same type as self
        if not isinstance(other, self.__class__):
            msg = "Cannot combine %s with %s" % (
                self.__class__.__name__,
                other.__class__.__name__,
            )
            raise NotImplementedError(msg)

        # check if the schedule is the same
        if np.array_equal(self.all_values, other.all_values):
            return self

        # `len` rather than truthiness so that array weights are accepted.
        if weights is None or len(weights) == 0:
            weights = [1, 1]
        new_values = np.average(
            [self.all_values, other.all_values], axis=0, weights=weights
        )

        # the new object's name
        name = "+".join([self.Name, other.Name])

        # The averaged values must go through the `Values` argument, otherwise
        # the combined schedule is created empty. The type limits of self are
        # kept.
        new_obj = self.__class__(name, Values=new_values, Type=self.Type)

        return new_obj
def _conjunction(*conditions, logical=np.logical_and):
    """Fold all ``conditions`` into one using the binary operator ``logical``."""
    return functools.reduce(logical, conditions)


def _separator(sep):
    """Return the delimiter character for an EnergyPlus column separator name.

    "Fixed" maps to None; unknown names fall back to a comma.
    """
    if sep == "Comma":
        return ","
    elif sep == "Tab":
        return "\t"
    elif sep == "Fixed":
        return None
    elif sep == "Semicolon":
        return ";"
    else:
        return ","


def _how(how):
    """Return the aggregation method name for an interpolation keyword.

    "average" maps to "mean" and "linear" to "interpolate"
    (case-insensitive); anything else, including "no", maps to "max".
    """
    if how.lower() == "average":
        return "mean"
    elif how.lower() == "linear":
        return "interpolate"
    elif how.lower() == "no":
        return "max"
    else:
        return "max"


def get_year_for_first_weekday(weekday: FrozenSet[Literal[0, 1, 2, 3, 4, 5, 6]] = 0):
    """Get the year that starts on 'weekday', eg. Monday=0.

    The search starts at 2020 and goes backwards; leap years are skipped.

    Args:
        weekday (int): 0-based day of week (Monday=0). Defaults to 0.

    Returns:
        (int): The most recent non-leap year, not later than 2020, whose
        first day falls on :attr:`weekday`.

    Raises:
        ValueError: If ``weekday`` is not an integer between 0 and 6.
    """
    import calendar

    # Anything outside 0..6 can never match and would make the search below
    # run forever.
    if weekday not in range(7):
        raise ValueError("weekday must be between 0 and 6")

    year = 2020
    while calendar.weekday(year, 1, 1) != weekday or calendar.isleap(year):
        year -= 1
    return year