<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # 如何對單變量時間序列預測的網格搜索樸素方法 > 原文: [https://machinelearningmastery.com/how-to-grid-search-naive-methods-for-univariate-time-series-forecasting/](https://machinelearningmastery.com/how-to-grid-search-naive-methods-for-univariate-time-series-forecasting/) 簡單的預測方法包括樸素地使用最后一個觀測值作為預測或先前觀測值的平均值。 在使用更復雜的方法之前評估簡單預測方法對單變量時間序列預測問題的表現非常重要,因為它們的表現提供了一個下限和比較點,可用于確定模型是否具有給定技能的技能問題。 盡管簡單,諸如幼稚和平均預測策略之類的方法可以根據選擇哪個先前觀察持續存在或者先前觀察到多少平均值來調整到特定問題。通常,調整這些簡單策略的超參數可以為模型表現提供更強大和可防御的下限,以及可以為更復雜方法的選擇和配置提供信息的令人驚訝的結果。 在本教程中,您將了解如何從頭開始構建一個框架,用于網格搜索簡單樸素和平均策略,用于使用單變量數據進行時間序列預測。 完成本教程后,您將了解: * 如何使用前向驗證從頭開始開發網格搜索簡單模型的框架。 * 如何為出生日常時間序列數據網格搜索簡單模型超參數。 * 如何為洗發水銷售,汽車銷售和溫度的月度時間序列數據網格搜索簡單模型超參數。 讓我們開始吧。 ![How to Grid Search Naive Methods for Univariate Time Series Forecasting](https://img.kancloud.cn/ff/4a/ff4a36bbc32812ca203c28e1af3a5501_640x480.jpg) 如何網格搜索單變量時間序列預測的樸素方法 照片由 [Rob 和 Stephanie Levy](https://www.flickr.com/photos/robandstephanielevy/526862866/) ,保留一些權利。 ## 教程概述 本教程分為六個部分;他們是: 1. 簡單的預測策略 2. 開發網格搜索框架 3. 案例研究 1:沒有趨勢或季節性 4. 案例研究 2:趨勢 5. 案例研究 3:季節性 6. 案例研究 4:趨勢和季節性 ## 簡單的預測策略 在測試更復雜的模型之前測試簡單的預測策略是非常重要和有用的。 簡單的預測策略是那些對預測問題的性質很少或根本沒有假設的策略,并且可以快速實現和計算。 結果可用作表現的基線,并用作比較點。如果模型的表現優于簡單預測策略的表現,則可以說它具有技巧性。 簡單預測策略有兩個主題;他們是: * **樸素**,或直接使用觀察值。 * **平均**,或使用先前觀察計算的統計量。 讓我們仔細看看這兩種策略。 ### 樸素的預測策略 樸素的預測涉及直接使用先前的觀察作為預測而沒有任何改變。 它通常被稱為持久性預測,因為之前的觀察是持久的。 對于季節性數據,可以稍微調整這種簡單的方法。在這種情況下,可以保持前一周期中的同時觀察。 這可以進一步推廣到將歷史數據中的每個可能偏移量測試,以用于保持預測值。 例如,給定系列: ```py [1, 2, 3, 4, 5, 6, 7, 8, 9] ``` 我們可以將最后一次觀察(相對指數-1)保持為值 9,或者將第二次最后一次觀察(相對指數-2)保持為 8,依此類推。 ### 平均預測策略 樸素預測之上的一步是平均先前值的策略。 所有先前的觀察結果均使用均值或中位數進行收集和平均,而不對數據進行其他處理。 在某些情況下,我們可能希望將平均計算中使用的歷史縮短到最后幾個觀察結果。 我們可以將這一點推廣到測試每個可能的 n 個先驗觀察的集合以包括在平均計算中的情況。 例如,給定系列: ```py [1, 2, 3, 4, 5, 6, 7, 8, 9] ``` 我們可以平均最后一個觀察(9),最后兩個觀察(8,9),依此類推。 在季節性數據的情況下,我們可能希望將周期中的最后 n 次先前觀測值與預測時間進行平均。 例如,給定具有 3 步循環的系列: ```py [1, 2, 3, 1, 2, 3, 1, 2, 3] ``` 我們可以使用窗口大小為 3 并平均最后一個觀察值(-3 或 1),最后兩個觀察值(-3 或 1 和 - (3 * 2)或 1),依此類推。 ## 開發網格搜索框架 在本節中,我們將開發一個網格搜索框架,用于搜索前一節中描述的兩個簡單預測策略,即樸素和平均策略。 我們可以從實施一個樸素的預測策略開始。 對于給定的歷史觀測數據集,我們可以在該歷史中保留任何值,即從索引-1 處的先前觀察到歷史上的第一次觀察 - (len(data))。 下面的 _naive_forecast()_ 函數實現了從 1 到數據集長度的給定偏移的樸素預測策略。 ```py # one-step naive forecast def naive_forecast(history, n): return history[-n] ``` 我們可以在一個小的設計數據集上測試這個功能。 ```py # one-step naive forecast def naive_forecast(history, n): return history[-n] # define dataset data = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] print(data) # test naive forecast for i in range(1, len(data)+1): print(naive_forecast(data, i)) ``` 首先運行示例打印設計的數據集,然后打印歷史數據集中每個偏移的樸素預測。 ```py [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] 100.0 90.0 80.0 70.0 60.0 50.0 40.0 30.0 20.0 10.0 ``` 我們現在可以考慮為平均預測策略開發一個函數。 平均最后 n 次觀測是直截了當的;例如: ```py from numpy import mean result = mean(history[-n:]) ``` 我們可能還想測試觀察分布是非高斯分布的情況下的中位數。 ```py from numpy import median result = median(history[-n:]) ``` 下面的 _average_forecast()_ 函數實現了這一過程,它將歷史數據和一個配置數組或元組指定為平均值作為整數的先前值的數量,以及一個描述計算平均值的方法的字符串(' _ 表示 _'或'_ 中值 _')。 ```py # one-step average forecast def average_forecast(history, config): n, avg_type = config # mean of last n values if avg_type is 'mean': return mean(history[-n:]) # median of last n values return median(history[-n:]) ``` 下面列出了一個小型人為數據集的完整示例。 ```py from numpy import mean from numpy import median # one-step average forecast def average_forecast(history, config): n, avg_type = config # mean of last n values if avg_type is 'mean': return mean(history[-n:]) # median of last n values return median(history[-n:]) # define dataset data = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] print(data) # test naive forecast for i in range(1, len(data)+1): print(average_forecast(data, (i, 'mean'))) ``` 運行該示例將系列中的下一個值預測為先前觀察的連續子集的平均值,從-1 到-10,包括在內。 ```py [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] 100.0 95.0 90.0 85.0 80.0 75.0 70.0 65.0 60.0 55.0 ``` 我們可以更新函數以支持季節性數據的平均值,并考慮季節性偏移。 可以向函數添加偏移量參數,當未設置為 1 時,將在收集要包括在平均值中的值之前確定先前觀察的數量向后計數。 例如,如果 n = 1 且 offset = 3,則從 n * offset 或 1 * 3 = -3 處的單個值計算平均值。如果 n = 2 且 offset = 3,則從 1 * 3 或-3 和 2 * 3 或-6 的值計算平均值。 當季節性配置(n *偏移)超出歷史觀察結束時,我們還可以添加一些保護來引發異常。 更新的功能如下所示。 ```py # one-step average forecast def average_forecast(history, config): n, offset, avg_type = config values = list() if offset == 1: values = history[-n:] else: # skip bad configs if n*offset > len(history): raise Exception('Config beyond end of data: %d %d' % (n,offset)) # try and collect n values using offset for i in range(1, n+1): ix = i * offset values.append(history[-ix]) # mean of last n values if avg_type is 'mean': return mean(values) # median of last n values return median(values) ``` 我們可以在季節性循環的小型人為數據集上測試這個函數。 下面列出了完整的示例。 ```py from numpy import mean from numpy import median # one-step average forecast def average_forecast(history, config): n, offset, avg_type = config values = list() if offset == 1: values = history[-n:] else: # skip bad configs if n*offset > len(history): raise Exception('Config beyond end of data: %d %d' % (n,offset)) # try and collect n values using offset for i in range(1, n+1): ix = i * offset values.append(history[-ix]) # mean of last n values if avg_type is 'mean': return mean(values) # median of last n values return median(values) # define dataset data = [10.0, 20.0, 30.0, 10.0, 20.0, 30.0, 10.0, 20.0, 30.0] print(data) # test naive forecast for i in [1, 2, 3]: print(average_forecast(data, (i, 3, 'mean'))) ``` 運行該示例計算[10],[10,10]和[10,10,10]的平均值。 ```py [10.0, 20.0, 30.0, 10.0, 20.0, 30.0, 10.0, 20.0, 30.0] 10.0 10.0 10.0 ``` 可以將樸素預測策略和平均預測策略結合在一起,形成相同的功能。 這些方法之間存在一些重疊,特別是 _n-_ 偏移到歷史記錄中,用于持久化值或確定要平均的值的數量。 讓一個函數支持這兩種策略是有幫助的,這樣我們就可以同時測試這兩種策略的一套配置,作為簡單模型的更廣泛網格搜索的一部分。 下面的 _simple_forecast()_ 函數將兩種策略組合成一個函數。 ```py # one-step simple forecast def simple_forecast(history, config): n, offset, avg_type = config # persist value, ignore other config if avg_type == 'persist': return history[-n] # collect values to average values = list() if offset == 1: values = history[-n:] else: # skip bad configs if n*offset > len(history): raise Exception('Config beyond end of data: %d %d' % (n,offset)) # try and collect n values using offset for i in range(1, n+1): ix = i * offset values.append(history[-ix]) # check if we can average if len(values) < 2: raise Exception('Cannot calculate average') # mean of last n values if avg_type == 'mean': return mean(values) # median of last n values return median(values) ``` 接下來,我們需要建立一些函數,通過前向驗證重復擬合和評估模型,包括將數據集拆分為訓練集和測試集以及評估一步預測。 我們可以使用給定指定大小的分割的切片來分割列表或 NumPy 數據數組,例如,從測試集中的數據中使用的時間步數。 下面的 _train_test_split()_ 函數為提供的數據集和要在測試集中使用的指定數量的時間步驟實現此功能。 ```py # split a univariate dataset into train/test sets def train_test_split(data, n_test): return data[:-n_test], data[-n_test:] ``` 在對測試數據集中的每個步驟進行預測之后,需要將它們與測試集進行比較以計算錯誤分數。 時間序列預測有許多流行的錯誤分數。在這種情況下,我們將使用均方根誤差(RMSE),但您可以將其更改為您的首選度量,例如 MAPE,MAE 等 下面的 _measure_rmse()_ 函數將根據實際(測試集)和預測值列表計算 RMSE。 ```py # root mean squared error or rmse def measure_rmse(actual, predicted): return sqrt(mean_squared_error(actual, predicted)) ``` 我們現在可以實現[前進驗證方案](https://machinelearningmastery.com/backtest-machine-learning-models-time-series-forecasting/)。這是評估尊重觀測時間順序的時間序列預測模型的標準方法。 首先,使用 _train_test_split(_ _)_ 函數將提供的單變量時間序列數據集分成訓練集和測試集。然后枚舉測試集中的觀察數。對于每一個我們都適合所有歷史的模型,并進行一步預測。然后將對時間步驟的真實觀察添加到歷史中,并重復該過程。調用 _simple_forecast_ _()_ 函數以適合模型并進行預測。最后,通過調用 _measure_rmse()_ 函數,將所有一步預測與實際測試集進行比較,計算錯誤分數。 下面的 _walk_forward_validation()_ 函數實現了這一點,采用單變量時間序列,在測試集中使用的一些時間步驟,以及模型配置數組。 ```py # walk-forward validation for univariate data def walk_forward_validation(data, n_test, cfg): predictions = list() # split dataset train, test = train_test_split(data, n_test) # seed history with training dataset history = [x for x in train] # step over each time-step in the test set for i in range(len(test)): # fit model and make forecast for history yhat = simple_forecast(history, cfg) # store forecast in list of predictions predictions.append(yhat) # add actual observation to history for the next loop history.append(test[i]) # estimate prediction error error = measure_rmse(test, predictions) return error ``` 如果您對進行多步預測感興趣,可以在 _simple_forecast_ _()_ 函數中更改 _ 預測(_ _)_ 的調用并且還改變 _measure_rmse()_ 函數中的誤差計算。 我們可以使用不同的模型配置列表重復調用 _walk_forward_validation()_。 一個可能的問題是,可能不會為模型調用某些模型配置組合,并會拋出異常。 我們可以在網格搜索期間捕獲異常并忽略警告,方法是將所有調用包含在 _walk_forward_validation(_ _)_ 中,并使用 try-except 和 block 來忽略警告。我們還可以添加調試支持來禁用這些保護,以防我們想要查看實際情況。最后,如果確實發生錯誤,我們可以返回 _ 無 _ 結果;否則,我們可以打印一些關于評估的每個模型的技能的信息。當評估大量模型時,這很有用。 下面的 _score_model()_ 函數實現了這個并返回(鍵和結果)的元組,其中鍵是測試模型配置的字符串版本。 ```py # score a model, return None on failure def score_model(data, n_test, cfg, debug=False): result = None # convert config to a key key = str(cfg) # show all warnings and fail on exception if debugging if debug: result = walk_forward_validation(data, n_test, cfg) else: # one failure during model validation suggests an unstable config try: # never show warnings when grid searching, too noisy with catch_warnings(): filterwarnings("ignore") result = walk_forward_validation(data, n_test, cfg) except: error = None # check for an interesting result if result is not None: print(' > Model[%s] %.3f' % (key, result)) return (key, result) ``` 接下來,我們需要一個循環來測試不同模型配置的列表。 這是驅動網格搜索過程的主要功能,并將為每個模型配置調用 _score_model()_ 函數。 通過并行評估模型配置,我們可以大大加快網格搜索過程。一種方法是使用 [Joblib 庫](https://pythonhosted.org/joblib/)。 我們可以定義一個 Parallel 對象,其中包含要使用的核心數,并將其設置為硬件中檢測到的分數。 ```py executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing') ``` 然后我們可以創建一個并行執行的任務列表,這將是我們每個模型配置對 score_model()函數的一次調用。 ```py tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list) ``` 最后,我們可以使用 _Parallel_ 對象并行執行任務列表。 ```py scores = executor(tasks) ``` 而已。 我們還可以提供評估所有模型配置的非并行版本,以防我們想要調試某些內容。 ```py scores = [score_model(data, n_test, cfg) for cfg in cfg_list] ``` 評估配置列表的結果將是元組列表,每個元組的名稱總結了特定的模型配置,并且使用該配置評估的模型的錯誤為 RMSE 或 _ 無 _(如果有)一個錯誤。 我們可以過濾掉所有設置為 _ 無 _ 的分數。 ```py scores = [r for r in scores if r[1] != None] ``` 然后我們可以按照升序排列列表中的所有元組(最好是第一個),然后返回此分數列表以供審閱。 給定單變量時間序列數據集,模型配置列表(列表列表)以及在測試集中使用的時間步數,下面的 _grid_search()_ 函數實現此行為。可選的并行參數允許對所有內核的模型進行開啟或關閉調整,默認情況下處于打開狀態。 ```py # grid search configs def grid_search(data, cfg_list, n_test, parallel=True): scores = None if parallel: # execute configs in parallel executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing') tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list) scores = executor(tasks) else: scores = [score_model(data, n_test, cfg) for cfg in cfg_list] # remove empty results scores = [r for r in scores if r[1] != None] # sort configs by error, asc scores.sort(key=lambda tup: tup[1]) return scores ``` 我們差不多完成了。 剩下要做的唯一事情是定義模型配置列表以嘗試數據集。 我們可以一般地定義它。我們可能想要指定的唯一參數是系列中季節性組件的周期性(偏移量)(如果存在)。默認情況下,我們假設沒有季節性組件。 下面的 _simple_configs()_ 函數將創建要評估的模型配置列表。 該函數僅需要歷史數據的最大長度作為參數,并且可選地需要任何季節性組件的周期性,其默認為 1(無季節性組件)。 ```py # create a set of simple configs to try def simple_configs(max_length, offsets=[1]): configs = list() for i in range(1, max_length+1): for o in offsets: for t in ['persist', 'mean', 'median']: cfg = [i, o, t] configs.append(cfg) return configs ``` 我們現在有一個網格搜索簡單模型超參數的框架,通過一步前進驗證。 它是通用的,適用于作為列表或 NumPy 數組提供的任何內存中單變量時間序列。 我們可以通過在人為設計的 10 步數據集上進行測試來確保所有部分協同工作。 下面列出了完整的示例。 ```py # grid search simple forecasts from math import sqrt from numpy import mean from numpy import median from multiprocessing import cpu_count from joblib import Parallel from joblib import delayed from warnings import catch_warnings from warnings import filterwarnings from sklearn.metrics import mean_squared_error # one-step simple forecast def simple_forecast(history, config): n, offset, avg_type = config # persist value, ignore other config if avg_type == 'persist': return history[-n] # collect values to average values = list() if offset == 1: values = history[-n:] else: # skip bad configs if n*offset > len(history): raise Exception('Config beyond end of data: %d %d' % (n,offset)) # try and collect n values using offset for i in range(1, n+1): ix = i * offset values.append(history[-ix]) # check if we can average if len(values) < 2: raise Exception('Cannot calculate average') # mean of last n values if avg_type == 'mean': return mean(values) # median of last n values return median(values) # root mean squared error or rmse def measure_rmse(actual, predicted): return sqrt(mean_squared_error(actual, predicted)) # split a univariate dataset into train/test sets def train_test_split(data, n_test): return data[:-n_test], data[-n_test:] # walk-forward validation for univariate data def walk_forward_validation(data, n_test, cfg): predictions = list() # split dataset train, test = train_test_split(data, n_test) # seed history with training dataset history = [x for x in train] # step over each time-step in the test set for i in range(len(test)): # fit model and make forecast for history yhat = simple_forecast(history, cfg) # store forecast in list of predictions predictions.append(yhat) # add actual observation to history for the next loop history.append(test[i]) # estimate prediction error error = measure_rmse(test, predictions) return error # score a model, return None on failure def score_model(data, n_test, cfg, debug=False): result = None # convert config to a key key = str(cfg) # show all warnings and fail on exception if debugging if debug: result = walk_forward_validation(data, n_test, cfg) else: # one failure during model validation suggests an unstable config try: # never show warnings when grid searching, too noisy with catch_warnings(): filterwarnings("ignore") result = walk_forward_validation(data, n_test, cfg) except: error = None # check for an interesting result if result is not None: print(' > Model[%s] %.3f' % (key, result)) return (key, result) # grid search configs def grid_search(data, cfg_list, n_test, parallel=True): scores = None if parallel: # execute configs in parallel executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing') tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list) scores = executor(tasks) else: scores = [score_model(data, n_test, cfg) for cfg in cfg_list] # remove empty results scores = [r for r in scores if r[1] != None] # sort configs by error, asc scores.sort(key=lambda tup: tup[1]) return scores # create a set of simple configs to try def simple_configs(max_length, offsets=[1]): configs = list() for i in range(1, max_length+1): for o in offsets: for t in ['persist', 'mean', 'median']: cfg = [i, o, t] configs.append(cfg) return configs if __name__ == '__main__': # define dataset data = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] print(data) # data split n_test = 4 # model configs max_length = len(data) - n_test cfg_list = simple_configs(max_length) # grid search scores = grid_search(data, cfg_list, n_test) print('done') # list top 3 configs for cfg, error in scores[:3]: print(cfg, error) ``` 首先運行該示例打印設計的時間序列數據集。 接下來,在評估模型配置及其錯誤時報告它們。 最后,報告前三種配置的配置和錯誤。 我們可以看到,配置為 1 的持久性模型(例如,持續最后一次觀察)實現了所測試的簡單模型的最佳表現,如預期的那樣。 ```py [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] > Model[[1, 1, 'persist']] 10.000 > Model[[2, 1, 'persist']] 20.000 > Model[[2, 1, 'mean']] 15.000 > Model[[2, 1, 'median']] 15.000 > Model[[3, 1, 'persist']] 30.000 > Model[[4, 1, 'persist']] 40.000 > Model[[5, 1, 'persist']] 50.000 > Model[[5, 1, 'mean']] 30.000 > Model[[3, 1, 'mean']] 20.000 > Model[[4, 1, 'median']] 25.000 > Model[[6, 1, 'persist']] 60.000 > Model[[4, 1, 'mean']] 25.000 > Model[[3, 1, 'median']] 20.000 > Model[[6, 1, 'mean']] 35.000 > Model[[5, 1, 'median']] 30.000 > Model[[6, 1, 'median']] 35.000 done [1, 1, 'persist'] 10.0 [2, 1, 'mean'] 15.0 [2, 1, 'median'] 15.0 ``` 現在我們有一個強大的網格搜索簡單模型超參數框架,讓我們在一套標準的單變量時間序列數據集上進行測試。 每個數據集上顯示的結果提供了一個表現基準,可用于比較更復雜的方法,如 SARIMA,ETS,甚至機器學習方法。 ## 案例研究 1:沒有趨勢或季節性 “每日女性分娩”數據集總結了 1959 年美國加利福尼亞州每日女性總分娩數。 數據集沒有明顯的趨勢或季節性成分。 ![Line Plot of the Daily Female Births Dataset](https://img.kancloud.cn/82/c2/82c2332333012a46b0561998c9b6224b_1440x780.jpg) 每日女性出生數據集的線圖 您可以從 [DataMarket](https://datamarket.com/data/set/235k/daily-total-female-births-in-california-1959#!ds=235k&display=line) 了解有關數據集的更多信息。 直接從這里下載數據集: * [每日總數 - 女性分娩.sv](https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-total-female-births.csv) 在當前工作目錄中使用文件名“ _daily-total-female-births.csv_ ”保存文件。 我們可以使用函數 _read_csv()_ 將此數據集作為 Pandas 系列加載。 ```py series = read_csv('daily-total-female-births.csv', header=0, index_col=0) ``` 數據集有一年或 365 個觀測值。我們將使用前 200 個進行訓練,將剩余的 165 個作為測試集。 下面列出了搜索每日女性單變量時間序列預測問題的完整示例網格。 ```py # grid search simple forecast for daily female births from math import sqrt from numpy import mean from numpy import median from multiprocessing import cpu_count from joblib import Parallel from joblib import delayed from warnings import catch_warnings from warnings import filterwarnings from sklearn.metrics import mean_squared_error from pandas import read_csv # one-step simple forecast def simple_forecast(history, config): n, offset, avg_type = config # persist value, ignore other config if avg_type == 'persist': return history[-n] # collect values to average values = list() if offset == 1: values = history[-n:] else: # skip bad configs if n*offset > len(history): raise Exception('Config beyond end of data: %d %d' % (n,offset)) # try and collect n values using offset for i in range(1, n+1): ix = i * offset values.append(history[-ix]) # check if we can average if len(values) < 2: raise Exception('Cannot calculate average') # mean of last n values if avg_type == 'mean': return mean(values) # median of last n values return median(values) # root mean squared error or rmse def measure_rmse(actual, predicted): return sqrt(mean_squared_error(actual, predicted)) # split a univariate dataset into train/test sets def train_test_split(data, n_test): return data[:-n_test], data[-n_test:] # walk-forward validation for univariate data def walk_forward_validation(data, n_test, cfg): predictions = list() # split dataset train, test = train_test_split(data, n_test) # seed history with training dataset history = [x for x in train] # step over each time-step in the test set for i in range(len(test)): # fit model and make forecast for history yhat = simple_forecast(history, cfg) # store forecast in list of predictions predictions.append(yhat) # add actual observation to history for the next loop history.append(test[i]) # estimate prediction error error = measure_rmse(test, predictions) return error # score a model, return None on failure def score_model(data, n_test, cfg, debug=False): result = None # convert config to a key key = str(cfg) # show all warnings and fail on exception if debugging if debug: result = walk_forward_validation(data, n_test, cfg) else: # one failure during model validation suggests an unstable config try: # never show warnings when grid searching, too noisy with catch_warnings(): filterwarnings("ignore") result = walk_forward_validation(data, n_test, cfg) except: error = None # check for an interesting result if result is not None: print(' > Model[%s] %.3f' % (key, result)) return (key, result) # grid search configs def grid_search(data, cfg_list, n_test, parallel=True): scores = None if parallel: # execute configs in parallel executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing') tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list) scores = executor(tasks) else: scores = [score_model(data, n_test, cfg) for cfg in cfg_list] # remove empty results scores = [r for r in scores if r[1] != None] # sort configs by error, asc scores.sort(key=lambda tup: tup[1]) return scores # create a set of simple configs to try def simple_configs(max_length, offsets=[1]): configs = list() for i in range(1, max_length+1): for o in offsets: for t in ['persist', 'mean', 'median']: cfg = [i, o, t] configs.append(cfg) return configs if __name__ == '__main__': # define dataset series = read_csv('daily-total-female-births.csv', header=0, index_col=0) data = series.values print(data) # data split n_test = 165 # model configs max_length = len(data) - n_test cfg_list = simple_configs(max_length) # grid search scores = grid_search(data, cfg_list, n_test) print('done') # list top 3 configs for cfg, error in scores[:3]: print(cfg, error) ``` 運行該示例將打印模型配置,并在評估模型時打印 RMSE。 在運行結束時報告前三個模型配置及其錯誤。 我們可以看到最好的結果是出生率約為 6.93 的 RMSE,具有以下配置: * **策略**:平均值 * **n** :22 * **函數**:mean() 這是令人驚訝的,因為缺乏趨勢或季節性,我會預期持續-1 或整個歷史數據集的平均值,以產生最佳表現。 ```py ... > Model[[186, 1, 'mean']] 7.523 > Model[[200, 1, 'median']] 7.681 > Model[[186, 1, 'median']] 7.691 > Model[[187, 1, 'persist']] 11.137 > Model[[187, 1, 'mean']] 7.527 done [22, 1, 'mean'] 6.930411499775709 [23, 1, 'mean'] 6.932293117115201 [21, 1, 'mean'] 6.951918385845375 ``` ## 案例研究 2:趨勢 “洗發水”數據集總結了三年內洗發水的月銷售額。 數據集包含明顯的趨勢,但沒有明顯的季節性成分。 ![Line Plot of the Monthly Shampoo Sales Dataset](https://img.kancloud.cn/ae/a5/aea5992c9bbc15a4ef6046500013d962_1438x776.jpg) 月度洗發水銷售數據集的線圖 您可以從 [DataMarket](https://datamarket.com/data/set/22r0/sales-of-shampoo-over-a-three-year-period#!ds=22r0&display=line) 了解有關數據集的更多信息。 直接從這里下載數據集: * [shampoo.csv](https://raw.githubusercontent.com/jbrownlee/Datasets/master/shampoo.csv) 在當前工作目錄中使用文件名“ _shampoo.csv_ ”保存文件。 我們可以使用函數 _read_csv()_ 將此數據集作為 Pandas 系列加載。 ```py # parse dates def custom_parser(x): return datetime.strptime('195'+x, '%Y-%m') # load dataset series = read_csv('shampoo.csv', header=0, index_col=0, date_parser=custom_parser) ``` 數據集有三年,或 36 個觀測值。我們將使用前 24 個用于訓練,其余 12 個用作測試集。 下面列出了搜索洗發水銷售單變量時間序列預測問題的完整示例網格。 ```py # grid search simple forecast for monthly shampoo sales from math import sqrt from numpy import mean from numpy import median from multiprocessing import cpu_count from joblib import Parallel from joblib import delayed from warnings import catch_warnings from warnings import filterwarnings from sklearn.metrics import mean_squared_error from pandas import read_csv from pandas import datetime # one-step simple forecast def simple_forecast(history, config): n, offset, avg_type = config # persist value, ignore other config if avg_type == 'persist': return history[-n] # collect values to average values = list() if offset == 1: values = history[-n:] else: # skip bad configs if n*offset > len(history): raise Exception('Config beyond end of data: %d %d' % (n,offset)) # try and collect n values using offset for i in range(1, n+1): ix = i * offset values.append(history[-ix]) # check if we can average if len(values) < 2: raise Exception('Cannot calculate average') # mean of last n values if avg_type == 'mean': return mean(values) # median of last n values return median(values) # root mean squared error or rmse def measure_rmse(actual, predicted): return sqrt(mean_squared_error(actual, predicted)) # split a univariate dataset into train/test sets def train_test_split(data, n_test): return data[:-n_test], data[-n_test:] # walk-forward validation for univariate data def walk_forward_validation(data, n_test, cfg): predictions = list() # split dataset train, test = train_test_split(data, n_test) # seed history with training dataset history = [x for x in train] # step over each time-step in the test set for i in range(len(test)): # fit model and make forecast for history yhat = simple_forecast(history, cfg) # store forecast in list of predictions predictions.append(yhat) # add actual observation to history for the next loop history.append(test[i]) # estimate prediction error error = measure_rmse(test, predictions) return error # score a model, return None on failure def score_model(data, n_test, cfg, debug=False): result = None # convert config to a key key = str(cfg) # show all warnings and fail on exception if debugging if debug: result = walk_forward_validation(data, n_test, cfg) else: # one failure during model validation suggests an unstable config try: # never show warnings when grid searching, too noisy with catch_warnings(): filterwarnings("ignore") result = walk_forward_validation(data, n_test, cfg) except: error = None # check for an interesting result if result is not None: print(' > Model[%s] %.3f' % (key, result)) return (key, result) # grid search configs def grid_search(data, cfg_list, n_test, parallel=True): scores = None if parallel: # execute configs in parallel executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing') tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list) scores = executor(tasks) else: scores = [score_model(data, n_test, cfg) for cfg in cfg_list] # remove empty results scores = [r for r in scores if r[1] != None] # sort configs by error, asc scores.sort(key=lambda tup: tup[1]) return scores # create a set of simple configs to try def simple_configs(max_length, offsets=[1]): configs = list() for i in range(1, max_length+1): for o in offsets: for t in ['persist', 'mean', 'median']: cfg = [i, o, t] configs.append(cfg) return configs # parse dates def custom_parser(x): return datetime.strptime('195'+x, '%Y-%m') if __name__ == '__main__': # load dataset series = read_csv('shampoo.csv', header=0, index_col=0, date_parser=custom_parser) data = series.values print(data.shape) # data split n_test = 12 # model configs max_length = len(data) - n_test cfg_list = simple_configs(max_length) # grid search scores = grid_search(data, cfg_list, n_test) print('done') # list top 3 configs for cfg, error in scores[:3]: print(cfg, error) ``` 運行該示例將打印配置,并在評估模型時打印 RMSE。 在運行結束時報告前三個模型配置及其錯誤。 我們可以看到最好的結果是 RMSE 約 95.69 銷售,具有以下配置: * **策略**:堅持 * **n** :2 這是令人驚訝的,因為數據的趨勢結構表明持久的前一個值(-1)將是最好的方法,而不是持久的倒數第二個值。 ```py ... > Model[[23, 1, 'mean']] 209.782 > Model[[23, 1, 'median']] 221.863 > Model[[24, 1, 'persist']] 305.635 > Model[[24, 1, 'mean']] 213.466 > Model[[24, 1, 'median']] 226.061 done [2, 1, 'persist'] 95.69454007413378 [2, 1, 'mean'] 96.01140340258198 [2, 1, 'median'] 96.01140340258198 ``` ## 案例研究 3:季節性 “月平均溫度”數據集總結了 1920 至 1939 年華氏諾丁漢城堡的月平均氣溫,以華氏度為單位。 數據集具有明顯的季節性成分,沒有明顯的趨勢。 ![Line Plot of the Monthly Mean Temperatures Dataset](https://img.kancloud.cn/24/3c/243cfe0fd0e8ab5923b76dcc30ca7a95_1454x766.jpg) 月平均氣溫數據集的線圖 您可以從 [DataMarket](https://datamarket.com/data/set/22li/mean-monthly-air-temperature-deg-f-nottingham-castle-1920-1939#!ds=22li&display=line) 了解有關數據集的更多信息。 直接從這里下載數據集: * [monthly-mean-temp.csv](https://raw.githubusercontent.com/jbrownlee/Datasets/master/monthly-mean-temp.csv) 在當前工作目錄中使用文件名“ _monthly-mean-temp.csv_ ”保存文件。 我們可以使用函數 _read_csv()_ 將此數據集作為 Pandas 系列加載。 ```py series = read_csv('monthly-mean-temp.csv', header=0, index_col=0) ``` 數據集有 20 年,或 240 個觀測值。我們將數據集修剪為過去五年的數據(60 個觀測值),以加快模型評估過程并使用去年或 12 個觀測值進行測試集。 ```py # trim dataset to 5 years data = data[-(5*12):] ``` 季節性成分的周期約為一年,或 12 個觀測值。在準備模型配置時,我們將此作為調用 _simple_configs()_ 函數的季節性時段。 ```py # model configs cfg_list = simple_configs(seasonal=[0, 12]) ``` 下面列出了搜索月平均溫度時間序列預測問題的完整示例網格。 ```py # grid search simple forecast for monthly mean temperature from math import sqrt from numpy import mean from numpy import median from multiprocessing import cpu_count from joblib import Parallel from joblib import delayed from warnings import catch_warnings from warnings import filterwarnings from sklearn.metrics import mean_squared_error from pandas import read_csv # one-step simple forecast def simple_forecast(history, config): n, offset, avg_type = config # persist value, ignore other config if avg_type == 'persist': return history[-n] # collect values to average values = list() if offset == 1: values = history[-n:] else: # skip bad configs if n*offset > len(history): raise Exception('Config beyond end of data: %d %d' % (n,offset)) # try and collect n values using offset for i in range(1, n+1): ix = i * offset values.append(history[-ix]) # check if we can average if len(values) < 2: raise Exception('Cannot calculate average') # mean of last n values if avg_type == 'mean': return mean(values) # median of last n values return median(values) # root mean squared error or rmse def measure_rmse(actual, predicted): return sqrt(mean_squared_error(actual, predicted)) # split a univariate dataset into train/test sets def train_test_split(data, n_test): return data[:-n_test], data[-n_test:] # walk-forward validation for univariate data def walk_forward_validation(data, n_test, cfg): predictions = list() # split dataset train, test = train_test_split(data, n_test) # seed history with training dataset history = [x for x in train] # step over each time-step in the test set for i in range(len(test)): # fit model and make forecast for history yhat = simple_forecast(history, cfg) # store forecast in list of predictions predictions.append(yhat) # add actual observation to history for the next loop history.append(test[i]) # estimate prediction error error = measure_rmse(test, predictions) return error # score a model, return None on failure def score_model(data, n_test, cfg, debug=False): result = None # convert config to a key key = str(cfg) # show all warnings and fail on exception if debugging if debug: result = walk_forward_validation(data, n_test, cfg) else: # one failure during model validation suggests an unstable config try: # never show warnings when grid searching, too noisy with catch_warnings(): filterwarnings("ignore") result = walk_forward_validation(data, n_test, cfg) except: error = None # check for an interesting result if result is not None: print(' > Model[%s] %.3f' % (key, result)) return (key, result) # grid search configs def grid_search(data, cfg_list, n_test, parallel=True): scores = None if parallel: # execute configs in parallel executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing') tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list) scores = executor(tasks) else: scores = [score_model(data, n_test, cfg) for cfg in cfg_list] # remove empty results scores = [r for r in scores if r[1] != None] # sort configs by error, asc scores.sort(key=lambda tup: tup[1]) return scores # create a set of simple configs to try def simple_configs(max_length, offsets=[1]): configs = list() for i in range(1, max_length+1): for o in offsets: for t in ['persist', 'mean', 'median']: cfg = [i, o, t] configs.append(cfg) return configs if __name__ == '__main__': # define dataset series = read_csv('monthly-mean-temp.csv', header=0, index_col=0) data = series.values print(data) # data split n_test = 12 # model configs max_length = len(data) - n_test cfg_list = simple_configs(max_length, offsets=[1,12]) # grid search scores = grid_search(data, cfg_list, n_test) print('done') # list top 3 configs for cfg, error in scores[:3]: print(cfg, error) ``` 運行該示例將打印模型配置,并在評估模型時打印 RMSE。 在運行結束時報告前三個模型配置及其錯誤。 我們可以看到最好的結果是大約 1.501 度的 RMSE,具有以下配置: * **策略**:平均值 * **n** :4 * **偏移**:12 * **函數**:mean() 這個發現并不太令人驚訝。鑒于數據的季節性結構,我們預計年度周期中先前點的最后幾個觀測值的函數是有效的。 ```py ... > Model[[227, 12, 'persist']] 5.365 > Model[[228, 1, 'persist']] 2.818 > Model[[228, 1, 'mean']] 8.258 > Model[[228, 1, 'median']] 8.361 > Model[[228, 12, 'persist']] 2.818 done [4, 12, 'mean'] 1.5015616870445234 [8, 12, 'mean'] 1.5794579766489512 [13, 12, 'mean'] 1.586186052546763 ``` ## 案例研究 4:趨勢和季節性 “月度汽車銷售”數據集總結了 1960 年至 1968 年間加拿大魁北克省的月度汽車銷量。 數據集具有明顯的趨勢和季節性成分。 ![Line Plot of the Monthly Car Sales Dataset](https://img.kancloud.cn/04/5f/045f949f08b91dfff5ec9152a3aaca14_1462x768.jpg) 月度汽車銷售數據集的線圖 您可以從 [DataMarket](https://datamarket.com/data/set/22n4/monthly-car-sales-in-quebec-1960-1968#!ds=22n4&display=line) 了解有關數據集的更多信息。 直接從這里下載數據集: * [month-car-sales.csv](https://raw.githubusercontent.com/jbrownlee/Datasets/master/monthly-car-sales.csv) 在當前工作目錄中使用文件名“ _monthly-car-sales.csv_ ”保存文件。 我們可以使用函數 _read_csv()_ 將此數據集作為 Pandas 系列加載。 ```py series = read_csv('monthly-car-sales.csv', header=0, index_col=0) ``` 數據集有 9 年或 108 個觀測值。我們將使用去年或 12 個觀測值作為測試集。 季節性成分的期限可能是六個月或 12 個月。在準備模型配置時,我們將嘗試將兩者作為調用 _simple_configs()_ 函數的季節性時段。 ```py # model configs cfg_list = simple_configs(seasonal=[0,6,12]) ``` 下面列出了搜索月度汽車銷售時間序列預測問題的完整示例網格。 ```py # grid search simple forecast for monthly car sales from math import sqrt from numpy import mean from numpy import median from multiprocessing import cpu_count from joblib import Parallel from joblib import delayed from warnings import catch_warnings from warnings import filterwarnings from sklearn.metrics import mean_squared_error from pandas import read_csv # one-step simple forecast def simple_forecast(history, config): n, offset, avg_type = config # persist value, ignore other config if avg_type == 'persist': return history[-n] # collect values to average values = list() if offset == 1: values = history[-n:] else: # skip bad configs if n*offset > len(history): raise Exception('Config beyond end of data: %d %d' % (n,offset)) # try and collect n values using offset for i in range(1, n+1): ix = i * offset values.append(history[-ix]) # check if we can average if len(values) < 2: raise Exception('Cannot calculate average') # mean of last n values if avg_type == 'mean': return mean(values) # median of last n values return median(values) # root mean squared error or rmse def measure_rmse(actual, predicted): return sqrt(mean_squared_error(actual, predicted)) # split a univariate dataset into train/test sets def train_test_split(data, n_test): return data[:-n_test], data[-n_test:] # walk-forward validation for univariate data def walk_forward_validation(data, n_test, cfg): predictions = list() # split dataset train, test = train_test_split(data, n_test) # seed history with training dataset history = [x for x in train] # step over each time-step in the test set for i in range(len(test)): # fit model and make forecast for history yhat = simple_forecast(history, cfg) # store forecast in list of predictions predictions.append(yhat) # add actual observation to history for the next loop history.append(test[i]) # estimate prediction error error = measure_rmse(test, predictions) return error # score a model, return None on failure def score_model(data, n_test, cfg, debug=False): result = None # convert config to a key key = str(cfg) # show all warnings and fail on exception if debugging if debug: result = walk_forward_validation(data, n_test, cfg) else: # one failure during model validation suggests an unstable config try: # never show warnings when grid searching, too noisy with catch_warnings(): filterwarnings("ignore") result = walk_forward_validation(data, n_test, cfg) except: error = None # check for an interesting result if result is not None: print(' > Model[%s] %.3f' % (key, result)) return (key, result) # grid search configs def grid_search(data, cfg_list, n_test, parallel=True): scores = None if parallel: # execute configs in parallel executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing') tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list) scores = executor(tasks) else: scores = [score_model(data, n_test, cfg) for cfg in cfg_list] # remove empty results scores = [r for r in scores if r[1] != None] # sort configs by error, asc scores.sort(key=lambda tup: tup[1]) return scores # create a set of simple configs to try def simple_configs(max_length, offsets=[1]): configs = list() for i in range(1, max_length+1): for o in offsets: for t in ['persist', 'mean', 'median']: cfg = [i, o, t] configs.append(cfg) return configs if __name__ == '__main__': # define dataset series = read_csv('monthly-car-sales.csv', header=0, index_col=0) data = series.values print(data) # data split n_test = 12 # model configs max_length = len(data) - n_test cfg_list = simple_configs(max_length, offsets=[1,12]) # grid search scores = grid_search(data, cfg_list, n_test) print('done') # list top 3 configs for cfg, error in scores[:3]: print(cfg, error) ``` 運行該示例將打印模型配置,并在評估模型時打印 RMSE。 在運行結束時報告前三個模型配置及其錯誤。 我們可以看到最好的結果是大約 1841.155 銷售的 RMSE,具有以下配置: * **策略**:平均值 * **n** :3 * **偏移**:12 * **功能**:中位數() 所選模型是先前周期中同一點的最后幾次觀察的函數并不奇怪,盡管使用中位數而不是均值可能不會立即明顯,結果比平均值好得多。 ```py ... > Model[[79, 1, 'median']] 5124.113 > Model[[91, 12, 'persist']] 9580.149 > Model[[79, 12, 'persist']] 8641.529 > Model[[92, 1, 'persist']] 9830.921 > Model[[92, 1, 'mean']] 5148.126 done [3, 12, 'median'] 1841.1559321976688 [3, 12, 'mean'] 2115.198495632485 [4, 12, 'median'] 2184.37708988932 ``` ## 擴展 本節列出了一些擴展您可能希望探索的教程的想法。 * **地塊預測**。更新框架以重新擬合具有最佳配置的模型并預測整個測試數據集,然后將預測與測試集中的實際觀察值進行比較。 * **漂移方法**。實施簡單預測的漂移方法,并將結果與??平均和樸素的方法進行比較。 * **另一個數據集**。將開發的框架應用于另外的單變量時間序列問題(例如,來自[時間序列數據集庫](https://datamarket.com/data/list/?q=provider:tsdl))。 如果你探索任何這些擴展,我很想知道。 ## 進一步閱讀 如果您希望深入了解,本節將提供有關該主題的更多資源。 * [預測,維基百科](https://en.wikipedia.org/wiki/Forecasting) * [Joblib:運行 Python 函數作為管道作業](https://pythonhosted.org/joblib/) * [時間序列數據集庫](https://datamarket.com/data/list/?q=provider:tsdl),DataMarket。 ## 摘要 在本教程中,您了解了如何從頭開始構建一個框架,用于網格搜索簡單的樸素和平均策略,用于使用單變量數據進行時間序列預測。 具體來說,你學到了: * 如何使用前向驗證從頭開始開發網格搜索簡單模型的框架。 * 如何為出生日常時間序列數據網格搜索簡單模型超參數。 * 如何為洗發水銷售,汽車銷售和溫度的月度時間序列數據網格搜索簡單模型超參數。 你有任何問題嗎? 在下面的評論中提出您的問題,我會盡力回答。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看