# Source code for hfai.datasets.ltsf

from typing import List

import numpy as np
import pandas as pd
from pandas.tseries import offsets
from pandas.tseries.frequencies import to_offset
import torch

from .base import BaseDataset, register_dataset, get_data_dir

"""
Expected file organization:

    [data_dir]
        ETTh1.csv
        ETTh2.csv
        ETTm1.csv
        ETTm2.csv
        exchange_rate.csv
        electricity.csv
        national_illness.csv
        traffic.csv
"""


class TimeFeature:
    """Abstract base for callable time-feature extractors.

    Subclasses map a ``pd.DatetimeIndex`` to a numeric encoding,
    typically scaled into ``[-0.5, 0.5]``.
    """

    def __init__(self):
        pass

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        # No-op in the base class; concrete features override this.
        pass

    def __repr__(self):
        return f"{self.__class__.__name__}()"


class SecondOfMinute(TimeFeature):
    """Second of minute encoded as value between [-0.5, 0.5]"""

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        # seconds run 0..59, so /59 maps onto [0, 1] before centering
        return index.second / 59.0 - 0.5


class MinuteOfHour(TimeFeature):
    """Minute of hour encoded as a value in [-0.5, 0.5]."""

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        minutes = index.minute
        return minutes / 59.0 - 0.5


class HourOfDay(TimeFeature):
    """Hour of day encoded as a value in [-0.5, 0.5]."""

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        hours = index.hour
        return hours / 23.0 - 0.5


class DayOfWeek(TimeFeature):
    """Day of week encoded as value between [-0.5, 0.5]"""

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        # dayofweek: Monday=0 .. Sunday=6
        return index.dayofweek / 6.0 - 0.5


class DayOfMonth(TimeFeature):
    """Day of month encoded as a value in [-0.5, 0.5]."""

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        zero_based = index.day - 1  # day runs 1..31
        return zero_based / 30.0 - 0.5


class DayOfYear(TimeFeature):
    """Day of year encoded as a value in [-0.5, 0.5]."""

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        zero_based = index.dayofyear - 1  # dayofyear runs 1..366
        return zero_based / 365.0 - 0.5


class MonthOfYear(TimeFeature):
    """Month of year encoded as a value in [-0.5, 0.5]."""

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        zero_based = index.month - 1  # month runs 1..12
        return zero_based / 11.0 - 0.5


class WeekOfYear(TimeFeature):
    """Week of year encoded as a value in [-0.5, 0.5]."""

    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        week = index.isocalendar().week  # ISO week runs 1..53
        return (week - 1) / 52.0 - 0.5


def time_features_from_frequency_str(freq_str: str) -> List[TimeFeature]:
    """Return the list of time features appropriate for a frequency string.

    Args:
        freq_str (str): Frequency string of the form [multiple][granularity]
            such as "12H", "5min", "1D" etc.

    Raises:
        RuntimeError: if the frequency is not one of the supported kinds.
    """
    # Each pandas offset type maps to the set of calendar features that are
    # informative at that granularity (coarser frequencies need fewer features).
    features_by_offsets = {
        offsets.YearEnd: [],
        offsets.QuarterEnd: [MonthOfYear],
        offsets.MonthEnd: [MonthOfYear],
        offsets.Week: [DayOfMonth, WeekOfYear],
        offsets.Day: [DayOfWeek, DayOfMonth, DayOfYear],
        offsets.BusinessDay: [DayOfWeek, DayOfMonth, DayOfYear],
        offsets.Hour: [HourOfDay, DayOfWeek, DayOfMonth, DayOfYear],
        offsets.Minute: [MinuteOfHour, HourOfDay, DayOfWeek, DayOfMonth, DayOfYear],
        offsets.Second: [SecondOfMinute, MinuteOfHour, HourOfDay, DayOfWeek, DayOfMonth, DayOfYear],
    }

    offset = to_offset(freq_str)

    # Find the first offset family the parsed offset belongs to.
    matched = next(
        (classes for offset_type, classes in features_by_offsets.items() if isinstance(offset, offset_type)),
        None,
    )
    if matched is not None:
        return [feature_cls() for feature_cls in matched]

    supported_freq_msg = f"""
    Unsupported frequency {freq_str}
    The following frequencies are supported:
        Y   - yearly
            alias: A
        M   - monthly
        W   - weekly
        D   - daily
        B   - business days
        H   - hourly
        T   - minutely
            alias: min
        S   - secondly
    """
    raise RuntimeError(supported_freq_msg)


def time_features(dates, timeenc=1, freq="h"):
    """Extract time features from a ``dates`` dataframe with a ``date`` column.

    When ``timeenc`` is 0, integer calendar fields are extracted down to
    ``freq``, which can be any of:

    .. code-block:: python

        m - [month]
        w - [month]
        d - [month, day, weekday]
        b - [month, day, weekday]
        h - [month, day, weekday, hour]
        t - [month, day, weekday, hour, *minute]

    When ``timeenc`` is 1, a similar but different set of ``freq`` values is
    supported (all encoded between [-0.5 and 0.5]):

    .. code-block:: python

        Q - [month]
        M - [month]
        W - [Day of month, week of year]
        D - [Day of week, day of month, day of year]
        B - [Day of week, day of month, day of year]
        H - [Hour of day, day of week, day of month, day of year]
        T - [Minute of hour*, hour of day, day of week, day of month, day of year]
        S - [Second of minute, minute of hour, hour of day, day of week, day of month, day of year]

    ``*minute`` is a number from 0-3 corresponding to the 15 minute period it falls into.
    """
    if timeenc == 0:
        # Raw integer calendar fields; note this mutates ``dates`` in place,
        # matching the established behavior callers may rely on.
        dates["month"] = dates.date.apply(lambda ts: ts.month)
        dates["day"] = dates.date.apply(lambda ts: ts.day)
        dates["weekday"] = dates.date.apply(lambda ts: ts.weekday())
        dates["hour"] = dates.date.apply(lambda ts: ts.hour)
        dates["minute"] = dates.date.apply(lambda ts: ts.minute // 15)  # 15-minute bucket, 0..3
        freq_map = {
            "y": [],
            "m": ["month"],
            "w": ["month"],
            "d": ["month", "day", "weekday"],
            "b": ["month", "day", "weekday"],
            "h": ["month", "day", "weekday", "hour"],
            "t": ["month", "day", "weekday", "hour", "minute"],
        }
        return dates[freq_map[freq.lower()]].values
    if timeenc == 1:
        index = pd.to_datetime(dates.date.values)
        columns = [feat(index) for feat in time_features_from_frequency_str(freq)]
        # Stack feature rows, then transpose to (num_timestamps, num_features).
        return np.vstack(columns).transpose(1, 0)


class StandardScaler:
    """Holds per-feature mean/std statistics and undoes normalization."""

    def __init__(self, mean, std):
        # mean/std are expected to be numpy arrays (one entry per feature).
        self.mean = mean
        self.std = std

    def inverse_transform(self, data):
        """Map normalized ``data`` back to the original scale.

        Accepts either a numpy array or a torch tensor; for tensors the
        statistics are converted to the tensor's dtype and device first.
        """
        if torch.is_tensor(data):
            mean = torch.from_numpy(self.mean).type_as(data).to(data.device)
            std = torch.from_numpy(self.std).type_as(data).to(data.device)
        else:
            mean, std = self.mean, self.std
        return data * std + mean


@register_dataset
class LTSF(BaseDataset):
    """
    Long-term time series forecasting (LTSF) dataset.

    Contains long-horizon time series from several domains (electricity,
    economics, health care, traffic, ...).  For more background see:
    https://github.com/thuml/Autoformer

    Args:
        data_name (str): dataset name, one of: ``ETTh1``, ``ETTh2``, ``ETTm1``,
            ``ETTm2``, ``exchange_rate``, ``electricity``, ``national_illness``, ``traffic``
        split (str): dataset split, the training set (``train``) or the validation set (``val``)
        seq_len (int): length of the history (input) window
        label_len (int): length of the label (decoder warm-up) window
        pred_len (int): length of the prediction window
        features (str): forecasting mode: multivariate-to-multivariate (``M``),
            univariate-to-univariate (``S``) or multivariate-to-univariate (``MS``)
        target (str): the target column to predict; effective when ``features`` is ``S`` or ``MS``
        timeenc (int): time-feature encoding scheme, defaults to ``0``
        freq (str): frequency for the time-feature encoding: secondly (``s``),
            minutely (``t``), hourly (``h``), daily (``d``), business days (``b``),
            weekly (``w``), monthly (``m``).  More fine-grained values such as
            ``15min`` or ``3h`` are also accepted.

    Returns:
        seq_x, seq_y, seq_x_mask, seq_y_mask (np.ndarray, np.ndarray, np.ndarray, np.ndarray):
            each sample is a 4-tuple of history values, future values,
            history time encodings and future time encodings.

    Examples:

    .. code-block:: python

        from hfai.datasets import LTSF
        dataset = LTSF(data_name, split, seq_len, label_len, pred_len, features, target, timeenc, freq)
        loader = dataset.loader(batch_size=64, num_workers=4)

        for seq_x, seq_y, seq_x_mask, seq_y_mask in loader:
            # training model

    """

    def __init__(
        self,
        data_name: str,
        split: str,
        seq_len=96,
        label_len=48,
        pred_len=24,
        features="S",
        target="OT",
        timeenc=0,
        freq="h",
    ) -> None:
        super(LTSF, self).__init__()
        assert data_name in [
            "ETTh1",
            "ETTh2",
            "ETTm1",
            "ETTm2",
            "exchange_rate",
            "electricity",
            "national_illness",
            "traffic",
        ]
        self.data_path = str(get_data_dir() / "LongTermSeriesData" / f"{data_name}.csv")
        assert split in ["train", "val"]
        self.split = split
        self.seq_len = seq_len
        self.label_len = label_len
        self.pred_len = pred_len
        self.features = features
        self.target = target
        self.timeenc = timeenc
        self.freq = freq
        # Samples are fully materialized up front as a list of tuples.
        self.dataset = self.__load_dataset__()

    def __len__(self):
        return len(self.dataset)

    def __load_dataset__(self):
        """Read the CSV, normalize, and slice it into (x, y, x_mark, y_mark) windows."""
        df_raw = pd.read_csv(self.data_path)
        num_train = int(len(df_raw) * 0.8)
        num_val = len(df_raw) - num_train
        if self.split == "train":
            split_start, split_end = 0, num_train
        elif self.split == "val":
            # Back up by seq_len so the first val sample has a full history window.
            split_start, split_end = num_train - self.seq_len, num_train + num_val
        else:
            raise ValueError("invalid parameters: split: {}".format(self.split))

        if self.features == "M" or self.features == "MS":
            cols_data = df_raw.columns[1:]  # drop the leading "date" column
            df_data = df_raw[cols_data]
        elif self.features == "S":
            df_data = df_raw[[self.target]]
        else:
            raise ValueError("invalid parameters: features: {}".format(self.features))

        # NOTE(review): statistics are computed over the full file (train + val),
        # and std is not guarded against zero — confirm this matches upstream.
        mean, std = df_data.values.mean(0), df_data.values.std(0)
        data = (df_data.values - mean) / std
        self.scaler = StandardScaler(mean, std)

        data_x = data[split_start:split_end]          # normalized values
        data_y = df_data.values[split_start:split_end]  # raw (unnormalized) values
        df_stamp = df_raw[["date"]][split_start:split_end]
        df_stamp["date"] = pd.to_datetime(df_stamp.date)
        data_stamp = time_features(df_stamp, timeenc=self.timeenc, freq=self.freq)

        dataset = []
        for i in range(len(data_x) - self.seq_len - self.pred_len + 1):
            s_begin = i
            s_end = s_begin + self.seq_len
            r_begin = s_end - self.label_len
            r_end = r_begin + self.label_len + self.pred_len
            seq_x = data_x[s_begin:s_end]
            # seq_y mixes a normalized label segment with a raw prediction segment.
            # NOTE(review): this differs from the Autoformer reference implementation,
            # which keeps both segments on the same scale — verify intent.
            seq_y = np.concatenate(
                [data_x[r_begin : r_begin + self.label_len], data_y[r_begin + self.label_len : r_end]], 0
            )
            seq_x_mark = data_stamp[s_begin:s_end]
            seq_y_mark = data_stamp[r_begin:r_end]
            dataset.append((seq_x, seq_y, seq_x_mark, seq_y_mark))
        return dataset

    def __getitem__(self, indices):
        # Batch-style access: ``indices`` is an iterable of sample indices.
        samples = []
        for i in indices:
            item = self.dataset[i]
            samples.append(item)
        return samples

    def get_scaler(self):
        """Return the statistics of the LTSF data.

        Returns:
            The fitted scaler object holding the data's mean (``mean``)
            and standard deviation (``std``).
        """
        return self.scaler