{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "lRddWmT6YRuA" }, "source": [ "# Time Window Data Loader" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "NN11LrySYRuE" }, "source": [ "以上我們學了一個時間序列資料基本的性值,並且也介紹最天真的幾種預測方式。\n", "\n", "而訓練模型時,常會需要將資料作隨機抽取,以便讓模型更新時增加隨機性也可以稍微避免over fitting。\n", "\n", "但是時間序列資料具局部關聯性,所以不能隨便抽樣離散的時間點,必須以time window為單位抽樣資料\n", "\n", "這邊我們介紹如何使用tf.data.Dataset來組成這樣抽樣的Dataloader:\n", "- tf.data.Dataset.window\n", "- Window-wise tf.data.Dataset\n", "- Complete Data Loader" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "3EwBJlTJYRuF" }, "source": [ "**開始前先import必要套件**" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "B8dAR6IsYRuG" }, "outputs": [], "source": [ "%matplotlib notebook\n", "\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "\n", "from plotly import express as px" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "Ns66GRyGYRuH" }, "source": [ "**畫圖的功能以及toy data產生器**" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "k13ZaduTYRuH" }, "outputs": [], "source": [ "def plot_series(time, series, start=0, end=None, labels=None, title=None):\n", " # Visualizes time series data\n", " # Args:\n", " # time (array of int) - 時間點, 長度為T\n", " # series (list of array of int) - 時間點對應的資料列表,列表內時間序列數量為D,\n", " # 每筆資料長度為T,若非為列表則轉為列表\n", " # start (int) - 開始的資料序(第幾筆)\n", " # end (int) - 結束繪製的資料序(第幾筆)\n", " # labels (list of strings)- 對於多時間序列或多維度的標註\n", " # title (string)- 圖片標題\n", "\n", " # 若資料只有一筆,則轉為list\n", " if type(series) != list:\n", " series = [series]\n", "\n", " if not end:\n", " end = len(series[0])\n", "\n", " if labels:\n", " # 設立dictionary, 讓plotly畫訊號線時可以標註label\n", " dictionary = {\"time\": time}\n", " for idx, l in enumerate(labels):\n", " # 截斷資料,保留想看的部分,並分段紀錄於dictionary中\n", " dictionary.update({l: series[idx][start:end]})\n", " # 畫訊號線\n", " fig = px.line(dictionary,\n", " x=\"time\",\n", " y=list(dictionary.keys())[1:],\n", " width=1000,\n", " height=400,\n", " title=title)\n", " else:\n", " # 畫訊號線\n", " fig = px.line(x=time, y=series, width=1000, height=400, title=title)\n", " fig.show()\n", "\n", "\n", "def trend(time, slope=0):\n", " # 產生合成水平直線資料,其長度與時間等長,直線趨勢與設定slope相同\n", " # Args:\n", " # time (array of int) - 時間點, 長度為T\n", " # slope (float) - 設定資料的傾斜程度與正負\n", " # Returns:\n", " # series (array of float) - 產出slope 與設定相同的一條線\n", "\n", " series = slope * time\n", "\n", " return series\n", "\n", "\n", "def seasonal_pattern(season_time, pattern_type='triangle'):\n", " # 產生某個特定pattern,\n", " # Args:\n", " # season_time (array of float) - 周期內的時間點, 長度為T\n", " # pattern_type (str) - 這邊提供triangle與cosine\n", " # Returns:\n", " # data_pattern (array of float) - 根據自訂函式產出特定的pattern\n", "\n", " # 用特定function生成pattern\n", " if pattern_type == 'triangle':\n", " data_pattern = np.where(season_time < 0.5,\n", " season_time*2,\n", " 2-season_time*2)\n", " if pattern_type == 'cosine':\n", " data_pattern = np.cos(season_time*np.pi*2)\n", "\n", " return data_pattern\n", "\n", "\n", "def seasonality(time, period, amplitude=1, phase=30, pattern_type='triangle'):\n", " # Repeats the same pattern at each period\n", " # Args:\n", " # time (array of int) - 時間點, 長度為T\n", " # period (int) - 週期長度,必小於T\n", " # amplitude (float) - 序列幅度大小\n", " # phase (int) - 相位,為遞移量,正的向左(提前)、負的向右(延後)\n", " # pattern_type (str) - 這邊提供triangle與cosine\n", " # Returns:\n", " # data_pattern (array of float) - 有指定周期、振幅、相位、pattern後的time series\n", "\n", " # 將時間依週期重置為0\n", " season_time = ((time + phase) % period) / period\n", "\n", " # 產生週期性訊號並乘上幅度\n", " data_pattern = amplitude * seasonal_pattern(season_time, pattern_type)\n", "\n", " return data_pattern\n", "\n", "\n", "def noise(time, noise_level=1, seed=None):\n", " # 合成雜訊,這邊用高斯雜訊,機率密度為常態分布\n", " # Args:\n", " # time (array of int) - 時間點, 長度為T\n", " # noise_level (float) - 雜訊大小\n", " # seed (int) - 同樣的seed可以重現同樣的雜訊\n", " # Returns:\n", " # noise (array of float) - 雜訊時間序列\n", "\n", " # 做一個基於某個seed的雜訊生成器\n", " rnd = np.random.RandomState(seed)\n", "\n", " # 生與time同長度的雜訊,並且乘上雜訊大小 (不乘的話,標準差是1)\n", " noise = rnd.randn(len(time)) * noise_level\n", "\n", " return noise\n", "\n", "\n", "def toy_generation(time=np.arange(4 * 365),\n", " bias=500.,\n", " slope=0.1,\n", " period=180,\n", " amplitude=40.,\n", " phase=30,\n", " pattern_type='triangle',\n", " noise_level=5.,\n", " seed=2022):\n", " signal_series = bias\\\n", " + trend(time, slope)\\\n", " + seasonality(time,\n", " period,\n", " amplitude,\n", " phase,\n", " pattern_type)\n", " noise_series = noise(time, noise_level, seed)\n", "\n", " series = signal_series+noise_series\n", " return series" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YHTIPK-NYRuJ" }, "outputs": [], "source": [ "def split(x, train_size):\n", " # 最簡單直接取前後,並且時間點也記得要切,我們直接立個function\n", " return x[..., :train_size], x[..., train_size:]\n", "\n", "\n", "# 先合成資料\n", "time = np.arange(4 * 365) # 定義時間點\n", "series_sample = toy_generation(time) # 這就是我們合成出來的資料\n", "\n", "time_train, time_test = split(time, 365*3)\n", "series_train, series_test = split(series_sample, 365*3)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "NEqK4_zrYRuK" }, "source": [ "## tf.data.Dataset.window" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "cTrF1VIMYRuK" }, "source": [ "這邊我們會使用tf.data API,\n", "\n", "裡面用tf.data.Dataset這個套件,將資料作成一個生成器:每次丟出特定處理過的部分資料,並且轉為tf.Tensor型態。\n", "\n", "詳細使用方法請參考我們Deep Learning章節。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8JsP3YPLYRuL" }, "outputs": [], "source": [ "import tensorflow.data as tfd" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NwIxv3IfYRuL" }, "outputs": [], "source": [ "dataset = tfd.Dataset.range(6)\n", "print('Original Dataset')\n", "for d in dataset:\n", " print(d, d.numpy()) # 可以用.numpy()轉成numpy格式,方便印出來看" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "SlEWv7GIYRuM" }, "source": [ "使用```.window(size=W,shift=B)```功能可以將原本data以```W```為單位輸出,並每次往右位移```B```個單位找第一個資料,所以每個window的起始點間距為B\n", "\n", "$w[k,\\tau]=y[B*k+\\tau]$, for $\\tau\\in\\{0,1,2,...,W\\}$" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "y_4JDlNWYRuM" }, "source": [ "" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "2WaFdE8UYRuM" }, "source": [ "切成好幾個dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "EvyRgROoYRuM" }, "outputs": [], "source": [ "print('Windowed Datasets')\n", "for ds in dataset.window(size=3, shift=2):\n", " print(ds)\n", " print([d.numpy() for d in ds])\n", " print('------')" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "I-9qh5RRYRuN" }, "source": [ "用```drop_remainder=True```可以把不滿足window size的dataset丟掉" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Cz6sUGsqYRuN", "scrolled": true }, "outputs": [], "source": [ "print('Windowed Datasets')\n", "for ds in dataset.window(size=3, shift=1, drop_remainder=True):\n", " print(ds)\n", " print([d.numpy() for d in ds])\n", " print('------')" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "BEnJJ_NSYRuN" }, "source": [ "## Window-wise tf.Dataset" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "uqDpApRPYRuN" }, "source": [ "但我們希望一次餵給模型的不是一個dataset還要迴圈,而是一串資料。\n", "\n", "所以我們用```flat_map(mapfun)```處理剛剛的結果,它會將每個資料夾先經過指定處理```mapfun```後,輸出成一個tf.Tensor而不是資料集" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wrwBhcluYRuO" }, "outputs": [], "source": [ "W = 4\n", "dataset = tfd.Dataset.range(10)\n", "win_ds = dataset.window(size=W, shift=1, drop_remainder=True)\n", "win_ds = win_ds.flat_map(lambda ds: ds.batch(W))\n", "for data in win_ds:\n", " print(data.numpy())" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "FhUacT83YRuO" }, "source": [ "最後我們將指定windo-wize的input資料與forcast目標\n", "\n", "**預測下一個內容**\n", "* 輸入序列: window中的前W-1個資料\n", "* forcast目標: window中最後1個資料" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0lfd9RjxYRuO" }, "outputs": [], "source": [ "ds = win_ds.map(lambda x: (x[:-1], x[-1:]))\n", "for x, y in ds:\n", " print(\"x = \", x.numpy(), \"y = \", y.numpy())" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "PYBNB7-SYRuO" }, "source": [ "**預測下K個內容**\n", "* 輸入序列: window中的前W-K個資料\n", "* forcast目標: window中最後K個資料" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pYSzfz2gYRuP" }, "outputs": [], "source": [ "K = 2\n", "ds = win_ds.map(lambda x: (x[:-K], x[-K:]))\n", "for x, y in ds:\n", " print(\"x = \", x.numpy(), \"y = \", y.numpy())" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "N71B-7llYRuP" }, "source": [ "## Complete Data Loader" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "0Y98B8aoYRuP" }, "source": [ "我們的目標是要產生一個可以作各種資料分配操控的data loader,我們現在已經可以一次生成一個x,y\n", "\n", "而訓練目標通常以預測下一個資料為主,我們可以先把剛剛的dataset產生的包成function:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "nys8r4VvYRuP" }, "outputs": [], "source": [ "def win_ar_ds(series, size, shift=1):\n", " # 輸出Window-wise Forcasting Dataset\n", " # Args:\n", " # series (array of float) - 時序資料, 長度為T\n", " # size (int) - Window大小\n", " # shift (int) - 每個window起始點間距\n", " # Returns:\n", " # (tf.data.Dataset(母類名稱,切確type為MapDataset)) - 一個一次生一個window的生成器\n", " ds = tfd.Dataset.from_tensor_slices(series)\n", " ds = ds.window(size=size+1, shift=1, drop_remainder=True)\n", " ds = ds.flat_map(lambda ds: ds.batch(size+1))\n", " return ds.map(lambda x: (x[:-1], x[-1:]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "HPxuraoWYRuP" }, "outputs": [], "source": [ "train_ds = win_ar_ds(series_train, size=10) # 切time series\n", "time_ds = win_ar_ds(time_train, size=10) # time那邊也可以切出來對照一下" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "UKAhEck0YRuQ" }, "outputs": [], "source": [ "# 我們吐前三個資料出來看\n", "xx = []\n", "yy = []\n", "time_xx = []\n", "time_yy = []\n", "for (time_x, time_y), (x, y) in zip(time_ds.take(3), train_ds.take(3)):\n", " time_xx.append(time_x.numpy())\n", " time_yy.append(time_y[0].numpy())\n", " xx.append(x.numpy())\n", " yy.append(y[0].numpy())" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "jWRZJ6CZYRuQ" }, "outputs": [], "source": [ "for i in range(3):\n", " plot_series(time_xx[i], xx[i], title=f'label {time_yy[i]} ={yy[i]}')" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "kaStNh_tZMkG" }, "source": [ "可看出三串 windowed series具時間推移的關係" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "g8watwQTYRuQ" }, "source": [ "後面可以用tf.data.Dataset常用的一些 cache, prefetch等增加效率技巧\n", "\n", "並且對dataset作shuffle以及batch" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "apo-Oh6rYRuQ" }, "outputs": [], "source": [ "train_ds = win_ar_ds(series_train, size=10) # 切time series\n", "train_loader = train_ds.cache().prefetch(-1).shuffle(1000).batch(16)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "ANLPj9GpYRuQ" }, "source": [ "所以訓練的framework大概就長這樣:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YmCtUUAJYRuQ" }, "outputs": [], "source": [ "# run loader\n", "for x, y in train_loader:\n", " # 1. 跑model\n", " # 2. 計算loss\n", " # 3. 更新模型\n", " break\n", "print(\"x shape:\", x.shape, x.dtype)\n", "print(\"y shape:\", y.shape, y.dtype)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "d_KGjSwFYRuR" }, "source": [ "或一些處理time series的tf.keras.Model 可以使用model.fit:\n", "\n", "```model.fit(train_loader)```\n", "\n", "後面我們就來train一些可以train的model" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "eMW3A0MOYRuR" }, "source": [ "## Compare to original Regression Data Loader" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "25TImRjGYRuR" }, "source": [ "時間序列在預測時也可以混入各種不同序列,例如時間本身,或者一些sine,cosine訊號,\n", "\n", "只要認為具有提供資訊的價值都可以放進來,在迴歸時稱為regressor(迴歸因子)。\n", "\n", "相較於window-wise prediction, 最簡單的時間序列regression就沒有window,而是靠著已研究過的regressor來擬和時間序列" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "phRfULOYYRuR" }, "outputs": [], "source": [ "def regressor_ds(*regressors, series):\n", " # 輸出Window-wise Regressor Forcasting Dataset\n", " # Args:\n", " # regressors (arguments of array of float) - 多個迴歸因子,每個長度為T\n", " # series (array of float) - 預測對象,長度\n", " # Returns:\n", " # (tf.data.Dataset(母類名稱,切確type為TensorSliceDataset))\n", " # - 一次生regressors和time series的dataset\n", "\n", " ds = tfd.Dataset.from_tensor_slices((np.stack(regressors, -1), series))\n", " return ds" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "mfabzpTgYRuR" }, "outputs": [], "source": [ "cos_train = seasonality(time_train, 180, 40., 30, 'cosine')\n", "triag_train = seasonality(time_train, 180, 40., 30, 'triangle')\n", "\n", "train_ds_t = regressor_ds(time_train.astype(\"float64\"),\n", " cos_train, triag_train, series=series_train)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ckEiyQTeYRuS" }, "outputs": [], "source": [ "for mix, y in train_ds_t.batch(360).shuffle(100):\n", " break\n", "plot_series(mix[:, 0],\n", " [mix[:, 0], mix[:, 1]+500, mix[:, 2]+500, y],\n", " labels=['t', 'cosine', 'triangle', 'series'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "colab": { "provenance": [] }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.12" } }, "nbformat": 4, "nbformat_minor": 4 }