# 十三、使用 TensorFlow 加載和預處理數據
> 譯者:[@SeanCheney](https://www.jianshu.com/u/130f76596b02)
目前為止,我們只是使用了存放在內存中的數據集,但深度學習系統經常需要在大數據集上訓練,而內存放不下大數據集。其它的深度學習庫通過對大數據集做預處理,繞過了內存限制,但 TensorFlow 通過 Data API,使一切都容易了:只需要創建一個數據集對象,告訴它去哪里拿數據,以及如何做轉換就行。TensorFlow 負責所有的實現細節,比如多線程、隊列、批次和預提取。另外,Data API 和 tf.keras 可以無縫配合!
Data API 還可以從現成的文件(比如 CSV 文件)、固定大小的二進制文件、使用 TensorFlow 的 TFRecord 格式的文件(支持大小可變的記錄)讀取數據。TFRecord 是一個靈活高效的二進制格式,基于 Protocol Buffers(一個開源二進制格式)。Data API 還支持從 SQL 數據庫讀取數據。另外,許多開源插件也可以用來從各種數據源讀取數據,包括谷歌的 BigQuery。
高效讀取大數據集不是唯一的難點:數據還需要進行預處理,通常是歸一化。另外,數據集中并不是只有數值字段:可能還有文本特征、類型特征,等等。這些特征需要編碼,比如使用獨熱編碼或嵌入(后面會看到,嵌入嵌入是用來標識類型或 token 的緊密矢量)。預處理的一種方式是寫自己的自定義預處理層,另一種是使用 Kera 的標準預處理層。
本章中,我們會介紹 Data API,TFRecord 格式,以及如何創建自定義預處理層,和使用 Keras 的預處理層。還會快速學習 TensorFlow 生態的一些項目:
* TF Transform (tf.Transform):可以用來編寫單獨的預處理函數,它可以在真正訓練前,運行在完整訓練集的批模式中,然后輸出到 TF Function,插入到訓練好的模型中。只要模型在生產環境中部署好了,就能隨時預處理新的實例。
* TF Datasets (TFDS)。提供了下載許多常見數據集的函數,包括 ImageNet,和數據集對象(可用 Data API 操作)。
## Data API
整個 Data API 都是圍繞數據集`dataset`的概念展開的:可以猜得到,數據集表示一連串數據項。通常你是用的數據集是從硬盤里逐次讀取數據的,簡單起見,我們是用`tf.data.Dataset.from_tensor_slices()`創建一個存儲于內存中的數據集:
```py
>>> X = tf.range(10) # any data tensor
>>> dataset = tf.data.Dataset.from_tensor_slices(X)
>>> dataset
<TensorSliceDataset shapes: (), types: tf.int32>
```
函數`from_tensor_slices()`取出一個張量,創建了一個`tf.data.Dataset`,它的元素是`X`的全部切片,因此這個數據集包括 10 項:張量 0、1、2、。。。、9。在這個例子中,使用`tf.data.Dataset.range(10)`也能達到同樣的效果。
可以像下面這樣對這個數據集迭代:
```py
>>> for item in dataset:
... print(item)
...
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
[...]
tf.Tensor(9, shape=(), dtype=int32)
```
### 鏈式轉換
有了數據集之后,通過調用轉換方法,可以對數據集做各種轉換。每個方法會返回一個新的數據集,因此可以將轉換像下面這樣鏈接起來(見圖 13-1):
```py
>>> dataset = dataset.repeat(3).batch(7)
>>> for item in dataset:
... print(item)
...
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)
```
圖 13-1 鏈接數據集轉換
在這個例子中,我們先在原始數據集上調用了`repeat()`方法,返回了一個重復了原始數據集 3 次的新數據集。當然,這步不會復制數據集中的數據三次(如果調用這個方法時沒有加參數,新數據集會一直重復源數據集,必須讓迭代代碼決定何時退出)。然后我們在新數據集上調用了`batch()`方法,這步又產生了一個新數據集。這一步會將上一個數據集的分成 7 個一批次。最后,做一下迭代。可以看到,最后的批次只有兩個元素,可以設置`drop_remainder=True`,丟棄最后的兩項,將數據對齊。
> 警告:數據集方法不修改數據集,只是生成新的數據集而已,所以要做新數據集的賦值(即使用`dataset = ...`)。
還可以通過`map()`方法轉換元素。比如,下面的代碼創建了一個每個元素都翻倍的新數據集:
```py
>>> dataset = dataset.map(lambda x: x * 2) # Items: [0,2,4,6,8,10,12]
```
這個函數可以用來對數據做預處理。有時可能會涉及復雜的計算,比如改變形狀或旋轉圖片,所以通常需要多線程來加速:只需設置參數`num_parallel_calls`就行。注意,傳遞給`map()`方法的函數必須是可以轉換為 TF Function。
`map()`方法是對每個元素做轉換的,`apply()`方法是對數據整體做轉換的。例如,下面的代碼對數據集應用了`unbatch()`函數(這個函數目前是試驗性的,但很有可能加入到以后的版本中)。新數據集中的每個元素都是一個單整數張量,而不是批次大小為 7 的整數。
```py
>>> dataset = dataset.apply(tf.data.experimental.unbatch()) # Items: 0,2,4,...
```
還可以用`filter()`方法做過濾:
```py
>>> dataset = dataset.filter(lambda x: x < 10) # Items: 0 2 4 6 8 0 2 4 6...
```
`take()`方法可以用來查看數據:
```py
>>> for item in dataset.take(3):
... print(item)
...
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
```
### 打散數據
當訓練集中的實例是獨立同分布時,梯度下降的效果最好(見第 4 章)。實現獨立同分布的一個簡單方法是使用`shuffle()`方法。它能創建一個新數據集,新數據集的前面是一個緩存,緩存中是源數據集的開頭元素。然后,無論什么時候取元素,就會從緩存中隨便隨機取出一個元素,從源數據集中取一個新元素替換。從緩沖器取元素,直到緩存為空。必須要指定緩存的大小,最好大一點,否則隨機效果不明顯。不要查出內存大小,即使內存夠用,緩存超過數據集也是沒有意義的。可以提供一個隨機種子,如果希望隨機的順序是固定的。例如,下面的代碼創建并顯示了一個包括 0 到 9 的數據集,重復 3 次,用大小為 5 的緩存做隨機,隨機種子是 42,批次大小是 7:
```py
>>> dataset = tf.data.Dataset.range(10).repeat(3) # 0 to 9, three times
>>> dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
>>> for item in dataset:
... print(item)
...
tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64)
tf.Tensor([3 6], shape=(2,), dtype=int64)
```
> 提示:如果在隨機數據集上調用`repeat()`方法,默認下,每次迭代的順序都是新的。通常這樣沒有問題,但如果你想讓每次迭代的順序一樣(比如,測試或調試),可以設置`reshuffle_each_iteration=False`。
對于內存放不下的大數據集,這個簡單的隨機緩存方法就不成了,因為緩存相比于數據集就小太多了。一個解決方法是將源數據本身打亂(例如,Linux 可以用`shuf`命令打散文本文件)。這樣肯定能提高打散的效果!即使源數據打散了,你可能還想再打散一點,否則每個周期可能還會出現同樣的順序,模型最后可能是偏的(比如,源數據順序偶然導致的假模式)。為了將實例進一步打散,一個常用的方法是將源數據分成多個文件,訓練時隨機順序讀取。但是,相同文件中的實例仍然靠的太近。為了避免這點,可以同時隨機讀取多個文件,做交叉。在最頂層,可以用`shuffle()`加一個隨機緩存。如果這聽起來很麻煩,不用擔心:Data API 都為你實現了,幾行代碼就行。
#### 多行數據交叉
首先,假設加載了加州房價數據集,打散它(除非已經打散了),分成訓練集、驗證集、測試集。然后將每個數據集分成多個 csv 文件,每個如下所示(每行包含 8 個輸入特征加上目標中位房價):
```py
MedInc,HouseAge,AveRooms,AveBedrms,Popul,AveOccup,Lat,Long,MedianHouseValue
3.5214,15.0,3.0499,1.1065,1447.0,1.6059,37.63,-122.43,1.442
5.3275,5.0,6.4900,0.9910,3464.0,3.4433,33.69,-117.39,1.687
3.1,29.0,7.5423,1.5915,1328.0,2.2508,38.44,-122.98,1.621
[...]
```
再假設`train_filepaths`包括了訓練文件路徑的列表(還要`valid_filepaths`和`test_filepaths`):
```py
>>> train_filepaths
['datasets/housing/my_train_00.csv', 'datasets/housing/my_train_01.csv',...]
```
另外,可以使用文件模板,比如`train_filepaths = "datasets/housing/my_train_*.csv"`。現在,創建一個數據集,包括這些文件路徑:
```py
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
```
默認,`list_files()`函數返回一個文件路徑打散的數據集。也可以設置`shuffle=False`,文件路徑就不打散了。
然后,可以調用`leave()`方法,一次讀取 5 個文件,做交叉操作(跳過第一行表頭,使用`skip()`方法):
```py
n_readers = 5
dataset = filepath_dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers)
```
`interleave()`方法會創建一個數據集,它從`filepath_dataset`讀 5 條文件路徑,對每條路徑調用函數(例子中是用的匿名函數)來創建數據集(例子中是`TextLineDataset`)。為了更清楚點,這一步總歐諾個由七個數據集:文件路徑數據集,交叉數據集,和五個`TextLineDatasets`數據集。當迭代交叉數據集時,會循環`TextLineDatasets`,每次讀取一行,知道數據集為空。然后會從`filepath_dataset`再獲取五個文件路徑,做同樣的交叉,直到文件路徑為空。
> 提示:為了交叉得更好,最好讓文件有相同的長度,否則長文件的尾部不會交叉。
默認情況下,`interleave()`不是并行的,只是順序從每個文件讀取一行。如果想變成并行讀取文件,可以設定參數`num_parallel_calls`為想要的線程數(`map()`方法也有這個參數)。還可以將其設置為`tf.data.experimental.AUTOTUNE`,讓 TensorFlow 根據 CPU 自己找到合適的線程數(目前這是個試驗性的功能)。看看目前數據集包含什么:
```py
>>> for line in dataset.take(5):
... print(line.numpy())
...
b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782'
b'4.1812,52.0,5.7013,0.9965,692.0,2.4027,33.73,-118.31,3.215'
b'3.6875,44.0,4.5244,0.9930,457.0,3.1958,34.04,-118.15,1.625'
b'3.3456,37.0,4.5140,0.9084,458.0,3.2253,36.67,-121.7,2.526'
b'3.5214,15.0,3.0499,1.1065,1447.0,1.6059,37.63,-122.43,1.442'
```
忽略表頭行,這是五個 csv 文件的第一行,隨機選取的。看起來不錯。但是也看到了,都是字節串,需要解析數據,縮放數據。
### 預處理數據
實現一個小函數來做預處理:
```py
X_mean, X_std = [...] # mean and scale of each feature in the training set
n_inputs = 8
def preprocess(line):
defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
fields = tf.io.decode_csv(line, record_defaults=defs)
x = tf.stack(fields[:-1])
y = tf.stack(fields[-1:])
return (x - X_mean) / X_std, y
```
逐行看下代碼:
* 首先,代碼假定已經算好了訓練集中每個特征的平均值和標準差。`X_mean`和`X_std`是 1D 張量(或 NumPy 數組),包含八個浮點數,每個都是特征。
* `preprocess()`函數從 csv 取一行,開始解析。使用`tf.io.decode_csv()`函數,接收兩個參數,第一個是要解析的行,第二個是一個數組,包含 csv 文件每列的默認值。這個數組不僅告訴 TensorFlow 每列的默認值,還有總列數和數據類型。在這個例子中,是告訴 TensorFlow,所有特征列都是浮點數,缺失值默認為,但提供了一個類型是`tf.float32`的空數組,作為最后一列(目標)的默認值:數組告訴 TensorFlow 這一列包含浮點數,但沒有默認值,所以碰到空值時會報異常。
* `decode_csv()`函數返回一個標量張量(每列一個)的列表,但應該返回 1D 張量數組。所以在所有張量上調用了`tf.stack()`,除了最后一個。然后對目標值做同樣的操作(讓其成為只包含一個值,而不是標量張量的 1D 張量數組)。
* 最后,對特征做縮放,減去平均值,除以標準差,然后返回包含縮放特征和目標值的元組。
測試這個預處理函數:
```py
>>> preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')
(<tf.Tensor: id=6227, shape=(8,), dtype=float32, numpy=
array([ 0.16579159, 1.216324 , -0.05204564, -0.39215982, -0.5277444 ,
-0.2633488 , 0.8543046 , -1.3072058 ], dtype=float32)>,
<tf.Tensor: [...], numpy=array([2.782], dtype=float32)>)
```
很好,接下來將函數應用到數據集上。
### 整合
為了讓代碼可復用,將前面所有討論過的東西編程一個小函數:創建并返回一個數據集,可以高效從多個 csv 文件加載加州房價數據集,做預處理、打散、選擇性重復,做批次(見圖 3-2):
```py
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
n_read_threads=None, shuffle_buffer_size=10000,
n_parse_threads=5, batch_size=32):
dataset = tf.data.Dataset.list_files(filepaths)
dataset = dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers, num_parallel_calls=n_read_threads)
dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)
return dataset.batch(batch_size).prefetch(1)
```
代碼條理很清晰,除了最后一行的`prefetch(1)`,對于提升性能很關鍵。
### 預提取
通過調用`prefetch(1)`,創建了一個高效的數據集,總能提前一個批次。換句話說,當訓練算法在一個批次上工作時,數據集已經準備好下一個批次了(從硬盤讀取數據并做預處理)。這樣可以極大提升性能,解釋見圖 13-3。如果加載和預處理還要是多線程的(通過設置`interleave()`和`map()`的`num_parallel_calls`),可以利用多 CPU,準備批次數據可以比在 GPU 上訓練還快:這樣 GPU 就可以 100%利用起來了(排除數據從 CPU 傳輸到 GPU 的時間),訓練可以快很多。
圖 13-3 通過預提取,讓 CPU 和 GPU 并行工作:GPU 在一個批次上工作時,CPU 準備下一個批次
> 提示:如果想買一塊 GPU 顯卡的話,它的處理能力和顯存都是非常重要的。另一個同樣重要的,是顯存帶寬,即每秒可以進入或流出內存的 GB 數。
如果數據集不大,內存放得下,可以使用數據集的`cache()`方法將數據集存入內存。通常這步是在加載和預處理數據之后,在打散、重復、分批次之前。這樣做的話,每個實例只需做一次讀取和處理,下一個批次仍能提前準備。
你現在知道如何搭建高效輸入管道,從多個文件加載和預處理數據了。我們討論了最常用的數據集方法,但還有一些你可能感興趣:`concatenate()`、`zip()`、`window()`、`reduce()`、`shard()`、`flat_map()`、和`padded_batch()`。還有兩個類方法:`from_generator()`和`from_tensors()`,它們能從 Python 生成器或張量列表創建數據集。更多細節請查看 API 文檔。`tf.data.experimental`中還有試驗性功能,其中許多功能可能會添加到未來版本中。
### tf.keras 使用數據集
現在可以使用`csv_reader_dataset()`函數為訓練集創建數據集了。注意,不需要將數據重復,tf.keras 會做重復。還為驗證集和測試集創建了數據集:
```py
train_set = csv_reader_dataset(train_filepaths)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)
```
現在就可以利用這些數據集來搭建和訓練 Keras 模型了。我們要做的就是將訓練和驗證集傳遞給`fit()`方法,而不是`X_train`、`y_train`、`X_valid`、`y_valid`:
```py
model = keras.models.Sequential([...])
model.compile([...])
model.fit(train_set, epochs=10, validation_data=valid_set)
```
相似的,可以將數據集傳遞給`evaluate()`和`predict()`方法:
```py
model.evaluate(test_set)
new_set = test_set.take(3).map(lambda X, y: X) # pretend we have 3 new instances
model.predict(new_set) # a dataset containing new instances
```
跟其它集合不同,`new_set`通常不包含標簽(如果包含標簽,也會被 Keras 忽略)。注意,在所有這些情況下,還可以使用 NumPy 數組(但仍需要加載和預處理)。
如果你想創建自定義訓練循環(就像 12 章那樣),你可以在訓練集上迭代:
```py
for X_batch, y_batch in train_set:
[...] # perform one Gradient Descent step
```
事實上,還可以創建一個 TF 函數(見第 12 章)來完成整個訓練循環:
```py
@tf.function
def train(model, optimizer, loss_fn, n_epochs, [...]):
train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, [...])
for X_batch, y_batch in train_set:
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
```
祝賀,你現在知道如何使用 Data API 創建強大的輸入管道了!但是,目前為止我們使用的 CSV 文件,雖然常見又簡單方便,但不夠高效,不支持大或復雜的數據結構(比如圖片或音頻)。這就是 TFRecord 要解決的。
> 提示:如果你對 csv 文件感到滿意(或其它任意格式),就不必使用 TFRecord。就像老話說的,只要沒壞就別修!TFRecord 是為解決訓練過程中加載和解析數據時碰到的瓶頸。
## TFRecord 格式
TFRecord 格式是 TensorFlow 偏愛的存儲大量數據并高效讀取的數據。它是非常簡單的二進制格式,只包含不同大小的二進制記錄的數據(每個記錄包括一個長度、一個 CRC 校驗和,校驗和用于檢查長度是否正確,真是的數據,和一個數據的 CRC 校驗和,用于檢查數據是否正確)。可以使用`tf.io.TFRecordWriter`類輕松創建 TFRecord 文件:
```py
with tf.io.TFRecordWriter("my_data.tfrecord") as f:
f.write(b"This is the first record")
f.write(b"And this is the second record")
```
然后可以使用`tf.data.TFRecordDataset`來讀取一個或多個 TFRecord 文件:
```py
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
print(item)
```
輸出是:
```py
tf.Tensor(b'This is the first record', shape=(), dtype=string)
tf.Tensor(b'And this is the second record', shape=(), dtype=string)
```
> 提示:默認情況下,`TFRecordDataset`會逐一讀取數據,但通過設定`num_parallel_reads`可以并行讀取并交叉數據。另外,你可以使用`list_files()`和`interleave()`獲得同樣的結果。
### 壓縮 TFRecord 文件
有的時候壓縮 TFRecord 文件很有必要,特別是當需要網絡傳輸的時候。你可以通過設定`options`參數,創建壓縮的 TFRecord 文件:
```py
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f:
[...]
```
當讀取壓縮 TFRecord 文件時,需要指定壓縮類型:
```py
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"],
compression_type="GZIP")
```
### 簡要介紹協議緩存
即便每條記錄可以使用任何二進制格式,TFRecord 文件通常包括序列化的協議緩存(也稱為 protobuf)。這是一種可移植、可擴展的高效二進制格式,是谷歌在 2001 年開發,并在 2008 年開源的;協議緩存現在使用廣泛,特別是在 gRPC,谷歌的遠程調用系統中。定義語言如下:
```py
syntax = "proto3";
message Person {
string name = 1;
int32 id = 2;
repeated string email = 3;
}
```
定義寫道,使用的是協議緩存的版本 3,指定每個`Person`對象可以有一個`name`,類型是字符串,類型是 int32 的`id`,0 個或多個`email`字段,每個都是字符串。數字 1、2、3 是字段標識符:用于每條數據的二進制表示。當你在`.proto`文件中有了一個定義,就可以編譯了。這就需要`protoc`,協議緩存編譯器,來生成 Python(或其它語言)的訪問類。注意,要使用的緩存協議的定義已經編譯好了,它們的 Python 類是 TensorFlow 的一部分,所以就不必使用`protoc`了。你需要知道的知識如何使用 Python 的緩存協議訪問類。為了講解,看一個簡單的例子,使用訪問類來生成`Person`緩存協議:
```py
>>> from person_pb2 import Person # 引入生成的訪問類
>>> person = Person(name="Al", id=123, email=["a@b.com"]) # 創建一個 Person
>>> print(person) # 展示 Person
name: "Al"
id: 123
email: "a@b.com"
>>> person.name # 讀取一個字段
"Al"
>>> person.name = "Alice" # 修改一個字段
>>> person.email[0] # 重復的字段可以像數組一樣訪問
"a@b.com"
>>> person.email.append("c@d.com") # 添加 email 地址
>>> s = person.SerializeToString() # 將對象序列化為字節串
>>> s
b'\n\x05Alice\x10{\x1a\x07a@b.com\x1a\x07c@d.com'
>>> person2 = Person() # 創建一個新 Person
>>> person2.ParseFromString(s) #解析字節串(字節長度 27)
27
>>> person == person2 # 現在相等
True
```
簡而言之,我們引入了`protoc`生成的類`Person`,創建了一個實例,展示、讀取、并寫入新字段,然后使用`SerializeToString()`將其序列化。序列化的數據就可以保存或通過網絡傳輸了。當讀取或接收二進制數據時,可以使用`ParseFromString()`方法來解析,就得到了序列化對象的復制。
可以將序列化的`Person`對象存儲為 TFRecord 文件,然后可以加載和解析。但是`SerializeToString()`和`ParseFromString()`不是 TensorFlow 運算(這段代碼中的其它代碼也不是 TensorFlow 運算),因此 TensorFlow 函數中不能含有這兩個方法(除非將其包裝進`tf.py_function()`運算,但會使代碼速度變慢,移植性變差)。幸好,TensorFlow 還有提供了解析運算的特殊協議緩存。
### TensorFlow 協議緩存
TFRecord 文件主要使用的協議緩存是`Example`,它表示數據集中的一個實例,包括命名特征的列表,每個特征可以是字節串列表、或浮點列表、或整數列表。下面是一個協議緩存的定義:
```py
syntax = "proto3";
message BytesList { repeated bytes value = 1; }
message FloatList { repeated float value = 1 [packed = true]; }
message Int64List { repeated int64 value = 1 [packed = true]; }
message Feature {
oneof kind {
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
};
message Features { map<string, Feature> feature = 1; };
message Example { Features features = 1; };
```
`BytesList`、`FloatList`、`Int64List`的定義都很清楚。注意,重復的數值字段使用了`[packed = true]`,目的是高效編碼。`Feature`包含的是`BytesList`、`FloatList`、`Int64List`三者之一。`Features`(帶 s)是包含特征名和對應特征值的字典。最后,一個`Example`值包含一個`Features`對象。下面是一個如何創建`tf.train.Example`的例子,表示的是之前同樣的人,并存儲為 TFRecord 文件:
```py
from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Feature, Features, Example
person_example = Example(
features=Features(
feature={
"name": Feature(bytes_list=BytesList(value=[b"Alice"])),
"id": Feature(int64_list=Int64List(value=[123])),
"emails": Feature(bytes_list=BytesList(value=[b"a@b.com",
b"c@d.com"]))
}))
```
這段代碼有點冗長和重復,但很清晰(可以很容易將其包裝起來)。現在有了`Example`協議緩存,可以調用`SerializeToString()`方法將其序列化,然后將結果數據存入 TFRecord 文件:
```py
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f:
f.write(person_example.SerializeToString())
```
通常需要寫不止一個`Example`!一般來說,你需要寫一個轉換腳本,讀取當前格式(例如 csv),為每個實例創建`Example`協議緩存,序列化并存儲到若干 TFRecord 文件中,最好再打散。這些需要花費不少時間,如有必要再這么做(也許 CSV 文件就足夠了)。
有了序列化好的`Example`TFRecord 文件之后,就可以加載了。
### 加載和解析 Example
要加載序列化的`Example`協議緩存,需要再次使用`tf.data.TFRecordDataset`,使用`tf.io.parse_single_example()`解析每個`Example`。這是一個 TensorFlow 運算,所以可以包裝進 TF 函數。它至少需要兩個參數:一個包含序列化數據的字符串標量張量,和每個特征的描述。描述是一個字典,將每個特征名映射到`tf.io.FixedLenFeature`描述符,描述符指明特征的形狀、類型和默認值,或(當特征列表長度可能變化時,比如`"email"特征`)映射到`tf.io.VarLenFeature`描述符,它只指向類型。
下面的代碼定義了描述字典,然后迭代`TFRecordDataset`,解析序列化的`Example`協議緩存:
```py
feature_description = {
"name": tf.io.FixedLenFeature([], tf.string, default_value=""),
"id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
"emails": tf.io.VarLenFeature(tf.string),
}
for serialized_example in tf.data.TFRecordDataset(["my_contacts.tfrecord"]):
parsed_example = tf.io.parse_single_example(serialized_example,
feature_description)
```
長度固定的特征會像常規張量那樣解析,而長度可變的特征會作為稀疏張量解析。可以使用`tf.sparse.to_dense()`將稀疏張量轉變為緊密張量,但只是簡化了值的訪問:
```py
>>> tf.sparse.to_dense(parsed_example["emails"], default_value=b"")
<tf.Tensor: [...] dtype=string, numpy=array([b'a@b.com', b'c@d.com'], [...])>
>>> parsed_example["emails"].values
<tf.Tensor: [...] dtype=string, numpy=array([b'a@b.com', b'c@d.com'], [...])>
```
`BytesList`可以包含任意二進制數據,序列化對象也成。例如,可以使用`tf.io.encode_jpeg()`將圖片編碼為 JPEG 格式,然后將二進制數據放入`BytesList`。然后,當代碼讀取`TFRecord`時,會從解析`Example`開始,再調用`tf.io.decode_jpeg()`解析數據,得到原始圖片(或者可以使用`tf.io.decode_image()`,它能解析任意`BMP`、`GIF`、`JPEG`、`PNG`格式)。你還可以通過`tf.io.serialize_tensor()`序列化張量,將結果字節串放入`BytesList`特征,將任意張量存儲在`BytesList`中。之后,當解析`TFRecord`時,可以使用`tf.io.parse_tensor()`解析數據。
除了使用`tf.io.parse_single_example()`逐一解析`Example`,你還可以通過`tf.io.parse_example()`逐批次解析:
```py
dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).batch(10)
for serialized_examples in dataset:
parsed_examples = tf.io.parse_example(serialized_examples,
feature_description)
```
可以看到`Example`協議緩存對大多數情況就足夠了。但是,如果處理的是嵌套列表,就會比較麻煩。比如,假設你想分類文本文檔。每個文檔可能都是句子的列表,而每個句子又是詞的列表。每個文檔可能還有評論列表,評論又是詞的列表。可能還有上下文數據,比如文檔的作者、標題和出版日期。TensorFlow 的`SequenceExample`協議緩存就是為了處理這種情況的。
### 使用`SequenceExample`協議緩存處理嵌套列表
下面是`SequenceExample`協議緩存的定義:
```py
message FeatureList { repeated Feature feature = 1; };
message FeatureLists { map<string, FeatureList> feature_list = 1; };
message SequenceExample {
Features context = 1;
FeatureLists feature_lists = 2;
};
```
`SequenceExample`包括一個上下文數據的`Features`對象,和一個包括一個或多個命名`FeatureList`對象(比如,一個`FeatureList`命名為`"content"`,另一個命名為`"comments"`)的`FeatureLists`對象。每個`FeatureList`包含`Feature`對象的列表,每個`Feature`對象可能是字節串、64 位整數或浮點數的列表(這個例子中,每個`Feature`表示的是一個句子或一條評論,格式或許是詞的列表)。創建`SequenceExample`,將其序列化、解析,和創建、序列化、解析`Example`很像,但必須要使用`tf.io.parse_single_sequence_example()`來解析單個的`SequenceExample`或用`tf.io.parse_sequence_example()`解析一個批次。兩個函數都是返回一個包含上下文特征(字典)和特征列表(也是字典)的元組。如果特征列表包含大小可變的序列(就像前面的例子),可以將其轉化為嵌套張量,使用`tf.RaggedTensor.from_sparse()`:
```py
parsed_context, parsed_feature_lists = tf.io.parse_single_sequence_example(
serialized_sequence_example, context_feature_descriptions,
sequence_feature_descriptions)
parsed_content = tf.RaggedTensor.from_sparse(parsed_feature_lists["content"])
```
現在你就知道如何高效存儲、加載和解析數據了,下一步是準備數據。
## 預處理輸入特征
為神經網絡準備數據需要將所有特征轉變為數值特征,做一些歸一化工作等等。特別的,如果數據包括類型特征或文本特征,也需要轉變為數字。這些工作可以在準備數據文件的時候做,使用 NumPy、Pandas、Scikit-Learn 這樣的工作。或者,可以在用 Data API 加載數據時,實時預處理數據(比如,使用數據集的`map()`方法,就像前面的例子),或者可以給模型加一個預處理層。接下來,來看最后一種方法。
例如,這個例子是使用`Lambda`層實現標準化層。對于每個特征,減去其平均值,再除以標準差(再加上一個平滑項,避免 0 除):
```py
means = np.mean(X_train, axis=0, keepdims=True)
stds = np.std(X_train, axis=0, keepdims=True)
eps = keras.backend.epsilon()
model = keras.models.Sequential([
keras.layers.Lambda(lambda inputs: (inputs - means) / (stds + eps)),
[...] # 其它層
])
```
并不難。但是,你也許更想要一個獨立的自定義層(就像 Scikit-Learn 的`StandardScaler`),而不是像`means`和`stds`這樣的全局變量:
```py
class Standardization(keras.layers.Layer):
def adapt(self, data_sample):
self.means_ = np.mean(data_sample, axis=0, keepdims=True)
self.stds_ = np.std(data_sample, axis=0, keepdims=True)
def call(self, inputs):
return (inputs - self.means_) / (self.stds_ + keras.backend.epsilon())
```
使用這個標準化層之前,你需要使用`adapt()`方法將其適配到數據集樣本。這么做就能使用每個特征的平均值和標準差:
```py
std_layer = Standardization()
std_layer.adapt(data_sample)
```
這個樣本必須足夠大,可以代表數據集,但不必是完整的訓練集:通常幾百個隨機實例就夠了(但還是要取決于任務)。然后,就可以像普通層一樣使用這個預處理層了:
```py
model = keras.Sequential()
model.add(std_layer)
[...] # create the rest of the model
model.compile([...])
model.fit([...])
```
可能以后還會有`keras.layers.Normalization`層,和這個自定義`Standardization`層差不多:先創建層,然后對數據集做適配(向`adapt()`方法傳遞樣本),最后像普通層一樣使用。
接下來看看類型特征。先將其編碼為獨熱矢量。
### 使用獨熱矢量編碼類型特征
考慮下第 2 章中的加州房價數據集的`ocean_proximity`特征:這是一個類型特征,有五個值:`"<1H OCEAN"`、`"INLAND"`、`"NEAR OCEAN"`、`"NEAR BAY"`、`"ISLAND"`。輸入給神經網絡之前,需要對其進行編碼。因為類型不多,可以使用獨熱編碼。先將每個類型映射為索引(0 到 4),使用一張查詢表:
```py
vocab = ["<1H OCEAN", "INLAND", "NEAR OCEAN", "NEAR BAY", "ISLAND"]
indices = tf.range(len(vocab), dtype=tf.int64)
table_init = tf.lookup.KeyValueTensorInitializer(vocab, indices)
num_oov_buckets = 2
table = tf.lookup.StaticVocabularyTable(table_init, num_oov_buckets)
```
逐行看下代碼:
* 先定義詞典:也就是所有類型的列表。
* 然后創建張量,具有索引 0 到 4。
* 接著,創建查找表的初始化器,傳入類型列表和對應索引。在這個例子中,因為已經有了數據,所以直接用`KeyValueTensorInitializer`就成了;但如果類型是在文本中(一行一個類型),就要使用`TextFileInitializer`。
* 最后兩行創建了查找表,傳入初始化器并指明未登錄詞(oov)桶的數量。如果查找的類型不在詞典中,查找表會計算這個類型的哈希,使用哈希分配一個未知的類型給未登錄詞桶。索引序號接著現有序號,所以這個例子中的兩個未登錄詞的索引是 5 和 6。
為什么使用桶呢?如果類型數足夠大(例如,郵編、城市、詞、產品、或用戶),數據集也足夠大,或者數據集持續變化,這樣的話,獲取類型的完整列表就不容易了。一個解決方法是根據數據樣本定義(而不是整個訓練集),為其它不在樣本中的類型加上一些未登錄詞桶。訓練中碰到的未知類型越多,要使用的未登錄詞桶就要越多。事實上,如果未登錄詞桶的數量不夠,就會發生碰撞:不同的類型會出現在同一個桶中,所以神經網絡就無法區分了。
現在用查找表將小批次的類型特征編碼為獨熱矢量:
```py
>>> categories = tf.constant(["NEAR BAY", "DESERT", "INLAND", "INLAND"])
>>> cat_indices = table.lookup(categories)
>>> cat_indices
<tf.Tensor: id=514, shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>
>>> cat_one_hot = tf.one_hot(cat_indices, depth=len(vocab) + num_oov_buckets)
>>> cat_one_hot
<tf.Tensor: id=524, shape=(4, 7), dtype=float32, numpy=
array([[0., 0., 0., 1., 0., 0., 0.],
[0., 0., 0., 0., 0., 1., 0.],
[0., 1., 0., 0., 0., 0., 0.],
[0., 1., 0., 0., 0., 0., 0.]], dtype=float32)>
```
可以看到,`"NEAR BAY"`映射到了索引 3,未知類型`"DESERT"`映射到了兩個未登錄詞桶之一(索引 5),`"INLAND"`映射到了索引 1 兩次。然后使用`tf.one_hot()`來做獨熱編碼。注意,需要告訴該函數索引的總數量,索引總數等于詞典大小加上未登錄詞桶的數量。現在你就知道如何用 TensorFlow 將類型特征編碼為獨熱矢量了。
和之前一樣,將這些操作寫成一個獨立的類并不難。`adapt()`方法接收一個數據樣本,提取其中的所有類型。創建一張查找表,將類型和索引映射起來。`call()`方法會使用查找表將輸入類型和索引建立映射。目前,Keras 已經有了一個名為`keras.layers.TextVectorization`的層,它的功能就是上面這樣:`adapt()`從樣本中提取詞表,`call()`將每個類型映射到詞表的索引。如果要將索引變為獨熱矢量的話,可以將這個層添加到模型開始的地方,后面根生一個可以用`tf.one_hot()`的`Lambda`層。
這可能不是最佳解決方法。每個獨熱矢量的大小是詞表長度加上未登錄詞桶的大小。當類型不多時,這么做可以,但如果詞表很大,最好使用“嵌入“來做。
> 提示:一個重要的原則,如果類型數小于 10,可以使用獨熱編碼。如果類型超過 50 個(使用哈希桶時通常如此),最好使用嵌入。類型數在 10 和 50 之間時,最好對兩種方法做個試驗,看哪個更合適。
### 使用嵌入編碼類型特征
嵌入是一個可訓練的表示類型的緊密矢量。默認時,嵌入是隨機初始化的,`"NEAR BAY"`可能初始化為`[0.131, 0.890]`,`"NEAR OCEAN"`可能初始化為`[0.631, 0.791]`。
這個例子中,使用的是 2D 嵌入,維度是一個可調節的超參數。因為嵌入是可以訓練的,它能在訓練中提高性能;當嵌入表示相似的類時,梯度下降會使相似的嵌入靠的更近,而`"INLAND"`會偏的更遠(見圖 13-4)。事實上,表征的越好,越利于神經網絡做出準確的預測,而訓練會讓嵌入更好的表征類型,這被稱為表征學習(第 17 章會介紹其它類型的表征學習)。
圖 13-4 嵌入的表征會在訓練中提高
> 詞嵌入
>
> 嵌入不僅可以實現當前任務的表征,同樣的嵌入也可以用于其它的任務。最常見的例子是詞嵌入(即,單個詞的嵌入):對于自然語言處理任務,最好使用預訓練的詞嵌入,而不是使用自己訓練的。
>
> 使用矢量表征詞可以追溯到 1960 年代,許多復雜的技術用于生成向量,包括使用神經網絡。進步發生在 2013 年,Tomá? Mikolov 和谷歌其它的研究院發表了一篇論文《Distributed Representations of Words and Phrases and their Compositionality》([https://arxiv.org/abs/1310.4546](https://links.jianshu.com/go?to=https%3A%2F%2Farxiv.org%2Fabs%2F1310.4546)),介紹了一種用神經網絡學習詞嵌入的技術,效果遠超以前的技術。可以實現在大文本語料上學習嵌入:用神經網絡預測給定詞附近的詞,得到了非常好的詞嵌入。例如,同義詞有非常相近的詞嵌入,語義相近的詞,比如法國、西班牙和意大利靠的也很近。
>
> 不止是相近:詞嵌入在嵌入空間的軸上的分布也是有意義的。下面是一個著名的例子:如果計算 King – Man + Woman,結果與 Queen 非常相近(見圖 13-5)。換句話,詞嵌入編碼了性別。相似的,可以計算 Madrid – Spain + France,結果和 Paris 很近。
>
> 圖 13-5 相似詞的詞嵌入也相近,一些軸編碼了概念
>
> 但是,詞嵌入有時偏差很大。例如,盡管詞嵌入學習到了男人是國王,女人是王后,詞嵌入還學到了男人是醫生、女人是護士。這是非常大的性別偏差。
來看下如何手動實現嵌入。首先,需要創建一個包含每個類型嵌入(隨機初始化)的嵌入矩陣。每個類型就有一行,每個未登錄詞桶就有一行,每個嵌入維度就有一列:
```py
embedding_dim = 2
embed_init = tf.random.uniform([len(vocab) + num_oov_buckets, embedding_dim])
embedding_matrix = tf.Variable(embed_init)
```
這個例子用的是 2D 嵌入,通常的嵌入是 10 到 300 維,取決于任務和詞表大小(需要調節詞表大小超參數)。
嵌入矩陣是一個隨機的 6 × 2 矩陣,存入一個變量(因此可以在訓練中被梯度下降調節):
```py
>>> embedding_matrix
<tf.Variable 'Variable:0' shape=(6, 2) dtype=float32, numpy=
array([[0.6645621 , 0.44100678],
[0.3528825 , 0.46448255],
[0.03366041, 0.68467236],
[0.74011743, 0.8724445 ],
[0.22632635, 0.22319686],
[0.3103881 , 0.7223358 ]], dtype=float32)>
```
使用嵌入編碼之前的類型特征:
```py
>>> categories = tf.constant(["NEAR BAY", "DESERT", "INLAND", "INLAND"])
>>> cat_indices = table.lookup(categories)
>>> cat_indices
<tf.Tensor: id=741, shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>
>>> tf.nn.embedding_lookup(embedding_matrix, cat_indices)
<tf.Tensor: id=864, shape=(4, 2), dtype=float32, numpy=
array([[0.74011743, 0.8724445 ],
[0.3103881 , 0.7223358 ],
[0.3528825 , 0.46448255],
[0.3528825 , 0.46448255]], dtype=float32)>
```
`tf.nn.embedding_lookup()`函數根據給定的索引在嵌入矩陣中查找行。例如,查找表說`"INLAND"`類型位于索引 1,`tf.nn.embedding_lookup()`就返回嵌入矩陣的行 1:`[0.3528825, 0.46448255]`。
Keras 提供了`keras.layers.Embedding`層來處理嵌入矩陣(默認可訓練);當這個層初始化時,會隨機初始化嵌入矩陣,當被調用時,就返回索引所在的嵌入矩陣的那行:
```py
>>> embedding = keras.layers.Embedding(input_dim=len(vocab) + num_oov_buckets,
... output_dim=embedding_dim)
...
>>> embedding(cat_indices)
<tf.Tensor: id=814, shape=(4, 2), dtype=float32, numpy=
array([[ 0.02401174, 0.03724445],
[-0.01896119, 0.02223358],
[-0.01471175, -0.00355174],
[-0.01471175, -0.00355174]], dtype=float32)>
```
將這些內容放到一起,創建一個 Keras 模型,可以處理類型特征(和數值特征),學習每個類型(和未登錄詞)的嵌入:
```py
regular_inputs = keras.layers.Input(shape=[8])
categories = keras.layers.Input(shape=[], dtype=tf.string)
cat_indices = keras.layers.Lambda(lambda cats: table.lookup(cats))(categories)
cat_embed = keras.layers.Embedding(input_dim=6, output_dim=2)(cat_indices)
encoded_inputs = keras.layers.concatenate([regular_inputs, cat_embed])
outputs = keras.layers.Dense(1)(encoded_inputs)
model = keras.models.Model(inputs=[regular_inputs, categories],
outputs=[outputs])
```
這個模型有兩個輸入:一個常規輸入,每個實例包括 8 個數值特征,機上一個類型特征。使用`Lambda`層查找每個類型的索引,然后用索引查找嵌入。接著,將嵌入和常規輸入連起來,作為編碼輸入進神經網絡。此時可以加入任意種類的神經網絡,但只是添加了一個緊密輸出層。
當`keras.layers.TextVectorization`準備好之后,可以調用它的`adapt()`方法,從數據樣本提取詞表(會自動創建查找表)。然后加入到模型中,就可以執行索引查找了(替換前面代碼的`Lambda`層)。
> 筆記:獨熱編碼加緊密層(沒有激活函數和偏差項),等價于嵌入層。但是,嵌入層用的計算更少(嵌入矩陣越大,性能差距越明顯)。緊密層的權重矩陣扮演的是嵌入矩陣的角色。例如,大小為 20 的獨熱矢量和 10 個單元的緊密層加起來,等價于`input_dim=20`、`output_dim=10`的嵌入層。作為結果,嵌入的維度超過后面的層的神經元數是浪費的。
再進一步看看 Keras 的預處理層。
### Keras 預處理層
Keras 團隊打算提供一套標準的 Keras 預處理層,現在已經可用了,[鏈接](https://links.jianshu.com/go?to=https%3A%2F%2Fgithub.com%2Fkeras-team%2Fgovernance%2Fblob%2Fmaster%2Frfcs%2F20190502-preprocessing-layers.md)。新的 API 可能會覆蓋舊的 Feature Columns API。
我們已經討論了其中的兩個:`keras.layers.Normalization`用來做特征標準化,`TextVectorization`層用于將文本中的詞編碼為詞典的索引。對于這兩個層,都是用數據樣本調用它的`adapt()`方法,然后如常使用。其它的預處理層也是這么使用的。
API 中還提供了`keras.layers.Discretization`層,它能將連續數據切成不同的組,將每個組斌嗎為獨熱矢量。例如,可以用它將價格分成是三類,低、中、高,編碼為[1, 0, 0]、[0, 1, 0]、[0, 0, 1]。當然,這么做會損失很多信息,但有時,相對于連續數據,這么做可以發現不那么明顯的規律。
> 警告:`Discretization`層是不可微的,只能在模型一開始使用。事實上,模型的預處理層會在訓練時凍結,因此預處理層的參數不會被梯度下降影響,所以可以是不可微的。這還意味著,如果想讓預處理層可訓練的話,不能在自定義預處理層上直接使用嵌入層,而是應該像前民的例子那樣分開來做。
還可以用類`PreprocessingStage`將多個預處理層鏈接起來。例如,下面的代碼創建了一個預處理管道,先將輸入歸一化,然后離散(有點類似 Scikit-Learn 的管道)。當將這個管道應用到數據樣本時,可以作為常規層使用(還得是在模型的前部,因為包含不可微分的預處理層):
```py
normalization = keras.layers.Normalization()
discretization = keras.layers.Discretization([...])
pipeline = keras.layers.PreprocessingStage([normalization, discretization])
pipeline.adapt(data_sample)
```
`TextVectorization`層也有一個選項用于輸出詞頻向量,而不是詞索引。例如,如果詞典包括三個詞,比如`["and", "basketball", "more"]`,則`"more and more"`會映射為`[1, 0, 2]`:`"and"`出現了一次,`"basketball"`沒有出現,`"more"`出現了兩次。這種詞表征稱為詞袋,因為它完全失去了詞的順序。常見詞,比如`"and"`,會在文本中有更高的值,盡管沒什么實際意義。因此,詞頻向量中應該降低常見詞的影響。一個常見的方法是將詞頻除以出現該詞的文檔數的對數。這種方法稱為詞頻-逆文檔頻率(TF-IDF)。例如,假設`"and"`、`"basketball"`、`"more"`分別出現在了 200、10、100 個文檔中:最終的矢量應該是`[1/log(200), 0/log(10), 2/log(100)]`,大約是`[0.19, 0., 0.43]`。`TextVectorization`層會有 TF-IDF 的選項。
> 筆記:如果標準預處理層不能滿足你的任務,你還可以選擇創建自定義預處理層,就像前面的`Standardization`。創建一個`keras.layers.PreprocessingLayer`子類,`adapt()`方法用于接收一個`data_sample`參數,或者再有一個`reset_state`參數:如果是`True`,則`adapt()`方法在計算新狀態之前重置現有的狀態;如果是`False`,會更新現有的狀態。
可以看到,這些 Keras 預處理層可以使預處理更容易!現在,無論是自定義預處理層,還是使用 Keras 的,預處理都可以實時進行了。但在訓練中,最好再提前進行預處理。下面來看看為什么,以及怎么做。
## TF Transform
預處理非常消耗算力,訓練前做預處理相對于實時處理,可以極大的提高速度:數據在訓練前,每個實例就處理一次,而不是在訓練中每個實例在每個周期就處理一次。前面提到過,如果數據集小到可以存入內存,可以使用`cache()`方法。但如果太大,可以使用 Apache Beam 或 Spark。它們可以在大數據上做高效的數據預處理,還可以分布進行,使用它們就能在訓練前處理所有訓練數據了。
雖然訓練加速了,但帶來一個問題:一旦模型訓練好了,假如想部署到移動 app 上,還是需要寫一些預處理數據的代碼。假如想部署到 TensorFlow.js,還是需要預處理代碼。這是一個維護難題:無論何時想改變預處理邏輯,都需要更新 Apache Beam 的代碼、移動端代碼、JavaScript 代碼。不僅耗時,也容易出錯:不同端的可能有細微的差別。訓練/實際產品表現之間的偏差會導致 bug 或使效果大打折扣。
一種解決辦法是在部署到 app 或瀏覽器之前,給訓練好的模型加上額外的預處理層,來做實時的預處理。這樣好多了,只有兩套代碼 Apache Beam 或 Spark 代碼,和預處理層代碼。
如果只需定義一次預處理操作呢?這就是 TF Transform 要做的。TF Transform 是[TensorFlow Extended (TFX)](https://links.jianshu.com/go?to=https%3A%2F%2Ftensorflow.org%2Ftfx)的一部分,這是一個端到端的 TensorFlow 模型生產化平臺。首先,需要安裝(TensorFlow 沒有捆綁)。然后通過 TF Transform 函數來做縮放、分桶等操作,一次性定義預處理函數。你還可以使用任意需要的 TensorFlow 運算。如果只有兩個特征,預處理函數可能如下:
```py
import tensorflow_transform as tft
def preprocess(inputs): # inputs = 輸入特征批次
median_age = inputs["housing_median_age"]
ocean_proximity = inputs["ocean_proximity"]
standardized_age = tft.scale_to_z_score(median_age)
ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)
return {
"standardized_median_age": standardized_age,
"ocean_proximity_id": ocean_proximity_id
}
```
然后,TF Transform 可以使用 Apache Beam(可以使用其`AnalyzeAndTransformDataset`類)在整個訓練集上應用這個`preprocess()`函數。在使用過程中,還會計算整個訓練集上的必要統計數據:這個例子中,是`housing_median_age`和`the ocean_proximity`的平均值和標準差。計算這些數據的組件稱為分析器。
更重要的,TF Transform 還會生成一個等價的 TensorFlow 函數,可以放入部署的模型中。這個 TF 函數包括一些常量,對應于 Apache Beam 的統計值(平均值、標準差和詞典)。
有了 Data API、TFRecord,Keras 預處理層和 TF Transform,可以為訓練搭建高度伸縮的輸入管道,可以是生產又快,遷移性又好。
但是,如果只想使用標準數據集呢?只要使用 TFDS 就成了。
## TensorFlow Datasets(TFDS)項目
從[TensorFlow Datasets](https://links.jianshu.com/go?to=https%3A%2F%2Ftensorflow.org%2Fdatasets)項目,可以非常方便的下載一些常見的數據集,從小數據集,比如 MNIST 或 Fashion MNIST,到大數據集,比如 ImageNet(需要大硬盤)。包括了圖片數據集、文本數據集(包括翻譯數據集)、和音頻視頻數據集。可以訪問[https://www.tensorflow.org/datasets/datasets](https://links.jianshu.com/go?to=https%3A%2F%2Fwww.tensorflow.org%2Fdatasets%2Fdatasets),查看完整列表,每個數據集都有介紹。
TensorFlow 沒有捆綁 TFDS,所以需要使用 pip 安裝庫`tensorflow-datasets`。然后調用函數`tfds.load()`,就能下載數據集了(除非之前下載過),返回的數據是數據集的字典(通常是一個是訓練集,一個是測試集)。例如,下載 MNIST:
```py
import tensorflow_datasets as tfds
dataset = tfds.load(name="mnist")
mnist_train, mnist_test = dataset["train"], dataset["test"]
```
然后可以對其應用任意轉換(打散、批次、預提取),然后就可以訓練模型了。下面是一個簡單的例子:
```py
mnist_train = mnist_train.shuffle(10000).batch(32).prefetch(1)
for item in mnist_train:
images = item["image"]
labels = item["label"]
[...]
```
> 提示:`load()`函數打散了每個下載的數據分片(只是對于訓練集)。但還不夠,最好再自己做打散。
注意,數據集中的每一項都是一個字典,包含特征和標簽。但 Keras 期望每項都是一個包含兩個元素(特征和標簽)的元組。可以使用`map()`對數據集做轉換,如下:
```py
mnist_train = mnist_train.shuffle(10000).batch(32)
mnist_train = mnist_train.map(lambda items: (items["image"], items["label"]))
mnist_train = mnist_train.prefetch(1)
```
更簡單的方式是讓`load()`函數來做這個工作,只要設定`as_supervised=True`(顯然這只適用于有標簽的數據集)。你還可以將數據集直接傳給 tf.keras 模型:
```py
dataset = tfds.load(name="mnist", batch_size=32, as_supervised=True)
mnist_train = dataset["train"].prefetch(1)
model = keras.models.Sequential([...])
model.compile(loss="sparse_categorical_crossentropy", optimizer="sgd")
model.fit(mnist_train, epochs=5)
```
這一章很技術,你可能覺得沒有神經網絡的抽象美,但事實是深度學習經常要涉及大數據集,知道如果高效加載、解析和預處理,是一個非常重要的技能。下一章會學習卷積神經網絡,它是一種用于圖像處理和其它應用的、非常成功的神經網絡。
## 練習
1. 為什么要使用 Data API ?
2. 將大數據分成多個文件有什么好處?
3. 訓練中,如何斷定輸入管道是瓶頸?如何處理瓶頸?
4. 可以將任何二進制數據存入 TFRecord 文件嗎,還是只能存序列化的協議緩存?
5. 為什么要將數據轉換為 Example 協議緩存?為什么不使用自己的協議緩存?
6. 使用 TFRecord 時,什么時候要壓縮?為什么不系統化的做?
7. 數據預處理可以在寫入數據文件時,或在 tf.data 管道中,或在預處理層中,或使用 TF Transform。這幾種方法各有什么優缺點?
8. 說出幾種常見的編碼類型特征的方法。文本如何編碼?
9.加載 Fashion MNIST 數據集;將其分成訓練集、驗證集和測試集;打散訓練集;將每個數據及村委多個 TFRecord 文件。每條記錄應該是有兩個特征的序列化的 Example 協議緩存:序列化的圖片(使用`tf.io.serialize_tensor()`序列化每張圖片),和標簽。然后使用 tf.data 為每個集合創建一個高效數據集。最后,使用 Keras 模型訓練這些數據集,用預處理層標準化每個特征。讓輸入管道越高效越好,使用 TensorBoard 可視化地分析數據。
10. 在這道題中,你要下載一個數據集,分割它,創建一個 tf.data.Dataset,用于高效加載和預處理,然后搭建一個包含嵌入層的二分類模型:
a. 下載[Large Movie Review Dataset](https://links.jianshu.com/go?to=https%3A%2F%2Fhoml.info%2Fimdb),它包含 50000 條 IMDB 的影評。數據分為兩個目錄,train 和 test,每個包含 12500 條正面評價和 12500 條負面評價。每條評價都存在獨立的文本文件中。還有其他文件和文件夾(包括預處理的詞袋),但這個練習中用不到。
b. 將測試集分給成驗證集(15000)和測試集(10000)。
c. 使用 tf.data,為每個集合創建高效數據集。
d.創建一個二分類模型,使用`TextVectorization`層來預處理每條影評。如果`TextVectorization`層用不了(或者你想挑戰下),則創建自定義的預處理層:使用`tf.strings`包中的函數,比如`lower()`來做小寫,`regex_replace()`來替換帶有空格的標點,`split()`來分割詞。用查找表輸出詞索引,`adapt()`方法中要準備好。
e. 加入嵌入層,計算每條評論的平均嵌入,乘以詞數的平方根。這個縮放過的平均嵌入可以傳入剩余的模型中。
f. 訓練模型,看看準確率能達到多少。嘗試優化管道,讓訓練越快越好。
g. 施一公 TFDS 加載同樣的數據集:`tfds.load("imdb_reviews")`。
參考答案見附錄 A。