Map 是一種很常見的數據結構,用于存儲一些無序的鍵值對。在主流的編程語言中,默認就自帶它的實現。C、C++ 中的 STL 就實現了 Map,JavaScript 中也有 Map,Java 中有 HashMap,Swift 和 Python 中有 Dictionary,Go 中有 Map,Objective-C 中有 NSDictionary、NSMutableDictionary。
上面這些 Map 都是線程安全的么?答案是否定的,并非全是線程安全的。那如何能實現一個線程安全的 Map 呢?想回答這個問題,需要先從如何實現一個 Map 說起。
一. 選用什么數據結構實現 Map ?
Map 是一個非常常用的數據結構,一個無序的 key/value 對的集合,其中 Map 所有的 key 都是不同的,然后通過給定的 key 可以在常數時間 O(1) 復雜度內查找、更新或刪除對應的 value。
要想實現常數級的查找,應該用什么來實現呢?讀者應該很快會想到哈希表。確實,Map 底層一般都是使用數組來實現,會借用哈希算法輔助。對于給定的 key,一般先進行 hash 操作,然后相對哈希表的長度取模,將 key 映射到指定的地方。
哈希算法有很多種,選哪一種更加高效呢?
1. 哈希函數
MD5 和 SHA1 可以說是目前應用最廣泛的 Hash 算法,而它們都是以 MD4 為基礎設計的。
MD4(RFC 1320) 是 MIT 的Ronald L. Rivest 在 1990 年設計的,MD 是 Message Digest(消息摘要) 的縮寫。它適用在32位字長的處理器上用高速軟件實現——它是基于 32位操作數的位操作來實現的。
MD5(RFC 1321) 是 Rivest 于1991年對 MD4 的改進版本。它對輸入仍以512位分組,其輸出是4個32位字的級聯,與 MD4 相同。MD5 比 MD4 來得復雜,并且速度較之要慢一點,但更安全,在抗分析和抗差分方面表現更好。
SHA1 是由 NIST NSA 設計為同 DSA 一起使用的,它對長度小于264的輸入,產生長度為160bit 的散列值,因此抗窮舉 (brute-force)
性更好。SHA-1 設計時基于和 MD4 相同原理,并且模仿了該算法。
常用的 hash 函數有 SHA-1,SHA-256,SHA-512,MD5 。這些都是經典的 hash 算法。在現代化生產中,還會用到現代的 hash 算法。下面列舉幾個,進行性能對比,最后再選其中一個源碼分析一下實現過程。
(1) Jenkins Hash 和 SpookyHash
1997年 Bob Jenkins 在《 Dr. Dobbs Journal》雜志上發表了一片關于散列函數的文章《A hash function for hash Table lookup》。這篇文章中,Bob 廣泛收錄了很多已有的散列函數,這其中也包括了他自己所謂的“lookup2”。隨后在2006年,Bob 發布了 lookup3。lookup3 即為 Jenkins Hash。更多有關 Bob’s 散列函數的內容請參閱維基百科:Jenkins hash function。memcached的 hash 算法,支持兩種算法:jenkins, murmur3,默認是 jenkins。
2011年 Bob Jenkins 發布了他自己的一個新散列函數
SpookyHash(這樣命名是因為它是在萬圣節發布的)。它們都擁有2倍于 MurmurHash 的速度,但他們都只使用了64位數學函數而沒有32位版本,SpookyHash 給出128位輸出。
(2) MurmurHash
MurmurHash 是一種非加密型哈希函數,適用于一般的哈希檢索操作。
Austin Appleby 在2008年發布了一個新的散列函數——MurmurHash。其最新版本大約是 lookup3 速度的2倍(大約為1 byte/cycle),它有32位和64位兩個版本。32位版本只使用32位數學函數并給出一個32位的哈希值,而64位版本使用了64位的數學函數,并給出64位哈希值。根據Austin的分析,MurmurHash具有優異的性能,雖然 Bob Jenkins 在《Dr. Dobbs article》雜志上聲稱“我預測 MurmurHash 比起lookup3要弱,但是我不知道具體值,因為我還沒測試過它”。MurmurHash能夠迅速走紅得益于其出色的速度和統計特性。當前的版本是MurmurHash3,Redis、Memcached、Cassandra、HBase、Lucene都在使用它。
下面是用 C 實現 MurmurHash 的版本:
C
uint32_t murmur3_32(const char *key, uint32_t len, uint32_t seed) {
static const uint32_t c1 = 0xcc9e2d51;
static const uint32_t c2 = 0x1b873593;
static const uint32_t r1 = 15;
static const uint32_t r2 = 13;
static const uint32_t m = 5;
static const uint32_t n = 0xe6546b64;
uint32_t hash = seed;
const int nblocks = len / 4;
const uint32_t *blocks = (const uint32_t *) key;
int i;
for (i = 0; i < nblocks; i++) {
uint32_t k = blocks[i];
k *= c1;
k = (k << r1) | (k >> (32 - r1));
k *= c2;
hash ^= k;
hash = ((hash << r2) | (hash >> (32 - r2))) * m + n;
}
const uint8_t *tail = (const uint8_t *) (key + nblocks * 4);
uint32_t k1 = 0;
switch (len & 3) {
case 3:
k1 ^= tail[2] << 16;
case 2:
k1 ^= tail[1] << 8;
case 1:
k1 ^= tail[0];
k1 *= c1;
k1 = (k1 << r1) | (k1 >> (32 - r1));
k1 *= c2;
hash ^= k1;
}
hash ^= len;
hash ^= (hash >> 16);
hash *= 0x85ebca6b;
hash ^= (hash >> 13);
hash *= 0xc2b2ae35;
hash ^= (hash >> 16);
return hash;
}
(3) CityHash 和 FramHash
這兩種算法都是 Google 發布的字符串算法。
CityHash 是2011年 Google 發布的字符串散列算法,和 murmurhash 一樣,屬于非加密型 hash 算法。CityHash 算法的開發是受到了 MurmurHash 的啟發。其主要優點是大部分步驟包含了至少兩步獨立的數學運算。現代 CPU 通常能從這種代碼獲得最佳性能。CityHash 也有其缺點:代碼較同類流行算法復雜。Google 希望為速度而不是為了簡單而優化,因此沒有照顧較短輸入的特例。Google發布的有兩種算法:cityhash64 與 cityhash128。它們分別根據字串計算 64 和 128 位的散列值。這些算法不適用于加密,但適合用在散列表等處。CityHash 的速度取決于CRC32 指令,目前為SSE 4.2(Intel Nehalem及以后版本)。
相比 Murmurhash 支持32、64、128bit, Cityhash 支持64、128、256bit 。
2014年 Google 又發布了 FarmHash,一個新的用于字符串的哈希函數系列。FarmHash 從 CityHash 繼承了許多技巧和技術,是它的后繼。FarmHash 有多個目標,聲稱從多個方面改進了 CityHash。與 CityHash 相比,FarmHash 的另一項改進是在多個特定于平臺的實現之上提供了一個接口。這樣,當開發人員只是想要一個用于哈希表的、快速健壯的哈希函數,而不需要在每個平臺上都一樣時,FarmHash 也能滿足要求。目前,FarmHash 只包含在32、64和128位平臺上用于字節數組的哈希函數。未來開發計劃包含了對整數、元組和其它數據的支持。
(4) xxHash
xxHash 是由 Yann Collet 創建的非加密哈希函數。它最初用于 LZ4 壓縮算法,作為最終的錯誤檢查簽名的。該 hash 算法的速度接近于 RAM 的極限。并給出了32位和64位的兩個版本。現在它被廣泛使用在PrestoDB、RocksDB、MySQL、ArangoDB、PGroonga、Spark 這些數據庫中,還用在了 Cocos2D、Dolphin、Cxbx-reloaded 這些游戲框架中,
下面這有一個性能對比的實驗。測試環境是 Open-Source SMHasher program by Austin Appleby ,它是在 Windows 7 上通過 Visual C 編譯出來的,并且它只有唯一一個線程。CPU 內核是 Core 2 Duo @3.0GHz。
上表里面的 hash 函數并不是所有的 hash 函數,只列舉了一些常見的算法。第二欄是速度的對比,可以看出來速度最快的是 xxHash 。第三欄是哈希的質量,哈希質量最高的有5個,全是5星,xxHash、MurmurHash 3a、CityHash64、MD5-32、SHA1-32 。從表里的數據看,哈希質量最高,速度最快的還是 xxHash。
(4) memhash
這個哈希算法筆者沒有在網上找到很明確的作者信息。只在 Google 的 Go 的文檔上有這么幾行注釋,說明了它的靈感來源:
Go
// Hashing algorithm inspired by
// xxhash: https://code.google.com/p/xxhash/
// cityhash: https://code.google.com/p/cityhash/
它說 memhash 的靈感來源于 xxhash 和 cityhash。那么接下來就來看看 memhash 是怎么對字符串進行哈希的。
Go
const (
// Constants for multiplication: four random odd 32-bit numbers.
m1 = 3168982561
m2 = 3339683297
m3 = 832293441
m4 = 2336365089
)
func memhash(p unsafe.Pointer, seed, s uintptr) uintptr {
if GOARCH == "386" && GOOS != "nacl" && useAeshash {
return aeshash(p, seed, s)
}
h := uint32(seed + s*hashkey[0])
tail:
switch {
case s == 0:
case s < 4:
h ^= uint32(*(*byte)(p))
h ^= uint32(*(*byte)(add(p, s>>1))) << 8
h ^= uint32(*(*byte)(add(p, s-1))) << 16
h = rotl_15(h*m1) * m2
case s == 4:
h ^= readUnaligned32(p)
h = rotl_15(h*m1) * m2
case s <= 8:
h ^= readUnaligned32(p)
h = rotl_15(h*m1) * m2
h ^= readUnaligned32(add(p, s-4))
h = rotl_15(h*m1) * m2
case s <= 16:
h ^= readUnaligned32(p)
h = rotl_15(h*m1) * m2
h ^= readUnaligned32(add(p, 4))
h = rotl_15(h*m1) * m2
h ^= readUnaligned32(add(p, s-8))
h = rotl_15(h*m1) * m2
h ^= readUnaligned32(add(p, s-4))
h = rotl_15(h*m1) * m2
default:
v1 := h
v2 := uint32(seed * hashkey[1])
v3 := uint32(seed * hashkey[2])
v4 := uint32(seed * hashkey[3])
for s >= 16 {
v1 ^= readUnaligned32(p)
v1 = rotl_15(v1*m1) * m2
p = add(p, 4)
v2 ^= readUnaligned32(p)
v2 = rotl_15(v2*m2) * m3
p = add(p, 4)
v3 ^= readUnaligned32(p)
v3 = rotl_15(v3*m3) * m4
p = add(p, 4)
v4 ^= readUnaligned32(p)
v4 = rotl_15(v4*m4) * m1
p = add(p, 4)
s -= 16
}
h = v1 ^ v2 ^ v3 ^ v4
goto tail
}
h ^= h >> 17
h *= m3
h ^= h >> 13
h *= m4
h ^= h >> 16
return uintptr(h)
}
// Note: in order to get the compiler to issue rotl instructions, we
// need to constant fold the shift amount by hand.
// TODO: convince the compiler to issue rotl instructions after inlining.
func rotl_15(x uint32) uint32 {
return (x << 15) | (x >> (32 - 15))
}
m1、m2、m3、m4 是4個隨機選的奇數,作為哈希的乘法因子。
Go
// used in hash{32,64}.go to seed the hash function
var hashkey [4]uintptr
func alginit() {
// Install aes hash algorithm if we have the instructions we need
if (GOARCH == "386" || GOARCH == "amd64") &&
GOOS != "nacl" &&
cpuid_ecx&(1<<25) != 0 && // aes (aesenc)
cpuid_ecx&(1<<9) != 0 && // sse3 (pshufb)
cpuid_ecx&(1<<19) != 0 { // sse4.1 (pinsr{d,q})
useAeshash = true
algarray[alg_MEM32].hash = aeshash32
algarray[alg_MEM64].hash = aeshash64
algarray[alg_STRING].hash = aeshashstr
// Initialize with random data so hash collisions will be hard to engineer.
getRandomData(aeskeysched[:])
return
}
getRandomData((*[len(hashkey) * sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])
hashkey[0] |= 1 // make sure these numbers are odd
hashkey[1] |= 1
hashkey[2] |= 1
hashkey[3] |= 1
}
在這個初始化的函數中,初始化了2個數組,數組里面裝的都是隨機的 hashkey。在 386、 amd64、非 nacl 的平臺上,會用 aeshash 。這里會把隨機的 key 生成好,存入到 aeskeysched 數組中。同理,hashkey 數組里面也會隨機好4個數字。最后都按位與了一個1,就是為了保證生成出來的隨機數都是奇數。
接下來舉個例子,來看看 memhash 究竟是如何計算哈希值的。
Go
func main() {
r := [8]byte{'h', 'a', 'l', 'f', 'r', 'o', 's', 't'}
pp := memhashpp(unsafe.Pointer(&r), 3, 7)
fmt.Println(pp)
}
為了簡單起見,這里用筆者的名字為例算出哈希值,種子簡單一點設置成3。
第一步計算 h 的值。
Go
h := uint32(seed + s*hashkey[0])
這里假設 hashkey[0] = 1,那么 h 的值為 3 + 7 * 1 = 10 。由于 s < 8,那么就會進行以下的處理:
Go
case s <= 8:
h ^= readUnaligned32(p)
h = rotl_15(h*m1) * m2
h ^= readUnaligned32(add(p, s-4))
h = rotl_15(h*m1) * m2
readUnaligned32()函數會把傳入的 unsafe.Pointer 指針進行2次轉換,先轉成 *uint32 類型,然后再轉成 *(*uint32) 類型。
接著進行異或操作:
接著第二步 h * m1 = 1718378850 * 3168982561 = 3185867170
由于是32位的乘法,最終結果是64位的,高32位溢出,直接舍棄。
乘出來的結果當做 rotl_15() 入參。
Go
func rotl_15(x uint32) uint32 {
return (x << 15) | (x >> (32 - 15))
}
這個函數里面對入參進行了兩次位移操作。
最后將兩次位移的結果進行邏輯或運算:
接著再進行一次 readUnaligned32() 轉換:
轉換完再進行一次異或。此時 h = 2615762644。
然后還要再進行一次 rotl_15() 變換。這里就不畫圖演示了。變換完成以后 h = 2932930721。
最后執行 hash 的最后一步:
Go
h ^= h >> 17
h *= m3
h ^= h >> 13
h *= m4
h ^= h >> 16
先右移17位,然后異或,再乘以m3,再右移13位,再異或,再乘以m4,再右移16位,最后再異或。
通過這樣一系列的操作,最后就能生成出 hash 值了。最后 h = 1870717864。感興趣的同學可以算一算。
(5)AES Hash
在上面分析 Go 的 hash 算法的時候,我們可以看到它對 CPU 是否支持 AES 指令集進行了判斷,當 CPU 支持 AES 指令集的時候,它會選用 AES Hash 算法,當 CPU 不支持 AES 指令集的時候,換成 memhash 算法。
AES 指令集全稱是高級加密標準指令集(或稱英特爾高級加密標準新指令,簡稱AES-NI)是一個 x86指令集架構 的擴展,用于 Intel 和 AMD微處理器 。
利用 AES 實現 Hash 算法性能會很優秀,因為它能提供硬件加速。
具體代碼實現如下,匯編程序,注釋見下面程序中:
C
// aes hash 算法通過 AES 硬件指令集實現
TEXT runtime·aeshash(SB),NOSPLIT,$0-32
MOVQ p+0(FP), AX // 把ptr移動到data數據段中
MOVQ s+16(FP), CX // 長度
LEAQ ret+24(FP), DX
JMP runtime·aeshashbody(SB)
TEXT runtime·aeshashstr(SB),NOSPLIT,$0-24
MOVQ p+0(FP), AX // 把ptr移動到字符串的結構體中
MOVQ 8(AX), CX // 字符串長度
MOVQ (AX), AX // 字符串的數據
LEAQ ret+16(FP), DX
JMP runtime·aeshashbody(SB)
最終的 hash 的實現都在 aeshashbody 中:
C
// AX: 數據
// CX: 長度
// DX: 返回的地址
TEXT runtime·aeshashbody(SB),NOSPLIT,$0-0
// SSE 寄存器中裝填入我們的隨機數種子
MOVQ h+8(FP), X0 // 每個table中hash種子有64 位
PINSRW $4, CX, X0 // 長度占16位
PSHUFHW $0, X0, X0 // 壓縮高位字亂序,重復長度4次
MOVO X0, X1 // 保存加密前的種子
PXOR runtime·aeskeysched(SB), X0 // 對每一個處理中的種子進行邏輯異或
AESENC X0, X0 // 加密種子
CMPQ CX, $16
JB aes0to15
JE aes16
CMPQ CX, $32
JBE aes17to32
CMPQ CX, $64
JBE aes33to64
CMPQ CX, $128
JBE aes65to128
JMP aes129plus
// aes 從 0 - 15
aes0to15:
TESTQ CX, CX
JE aes0
ADDQ $16, AX
TESTW $0xff0, AX
JE endofpage
//當前加載的16位字節的地址不會越過一個頁面邊界,所以我們可以直接加載它。
MOVOU -16(AX), X1
ADDQ CX, CX
MOVQ $masks<>(SB), AX
PAND (AX)(CX*8), X1
final1:
PXOR X0, X1 // 異或數據和種子
AESENC X1, X1 // 連續加密3次
AESENC X1, X1
AESENC X1, X1
MOVQ X1, (DX)
RET
endofpage:
// 地址結尾是1111xxxx。 這樣就可能超過一個頁面邊界,所以在加載完最后一個字節后停止加載。然后使用pshufb將字節向下移動。
MOVOU -32(AX)(CX*1), X1
ADDQ CX, CX
MOVQ $shifts<>(SB), AX
PSHUFB (AX)(CX*8), X1
JMP final1
aes0:
// 返回輸入的并且已經加密過的種子
AESENC X0, X0
MOVQ X0, (DX)
RET
aes16:
MOVOU (AX), X1
JMP final1
aes17to32:
// 開始處理第二個起始種子
PXOR runtime·aeskeysched+16(SB), X1
AESENC X1, X1
// 加載要被哈希算法處理的數據
MOVOU (AX), X2
MOVOU -16(AX)(CX*1), X3
// 異或種子
PXOR X0, X2
PXOR X1, X3
// 連續加密3次
AESENC X2, X2
AESENC X3, X3
AESENC X2, X2
AESENC X3, X3
AESENC X2, X2
AESENC X3, X3
// 拼接并生成結果
PXOR X3, X2
MOVQ X2, (DX)
RET
aes33to64:
// 處理第三個以上的起始種子
MOVO X1, X2
MOVO X1, X3
PXOR runtime·aeskeysched+16(SB), X1
PXOR runtime·aeskeysched+32(SB), X2
PXOR runtime·aeskeysched+48(SB), X3
AESENC X1, X1
AESENC X2, X2
AESENC X3, X3
MOVOU (AX), X4
MOVOU 16(AX), X5
MOVOU -32(AX)(CX*1), X6
MOVOU -16(AX)(CX*1), X7
PXOR X0, X4
PXOR X1, X5
PXOR X2, X6
PXOR X3, X7
AESENC X4, X4
AESENC X5, X5
AESENC X6, X6
AESENC X7, X7
AESENC X4, X4
AESENC X5, X5
AESENC X6, X6
AESENC X7, X7
AESENC X4, X4
AESENC X5, X5
AESENC X6, X6
AESENC X7, X7
PXOR X6, X4
PXOR X7, X5
PXOR X5, X4
MOVQ X4, (DX)
RET
aes65to128:
// 處理第七個以上的起始種子
MOVO X1, X2
MOVO X1, X3
MOVO X1, X4
MOVO X1, X5
MOVO X1, X6
MOVO X1, X7
PXOR runtime·aeskeysched+16(SB), X1
PXOR runtime·aeskeysched+32(SB), X2
PXOR runtime·aeskeysched+48(SB), X3
PXOR runtime·aeskeysched+64(SB), X4
PXOR runtime·aeskeysched+80(SB), X5
PXOR runtime·aeskeysched+96(SB), X6
PXOR runtime·aeskeysched+112(SB), X7
AESENC X1, X1
AESENC X2, X2
AESENC X3, X3
AESENC X4, X4
AESENC X5, X5
AESENC X6, X6
AESENC X7, X7
// 加載數據
MOVOU (AX), X8
MOVOU 16(AX), X9
MOVOU 32(AX), X10
MOVOU 48(AX), X11
MOVOU -64(AX)(CX*1), X12
MOVOU -48(AX)(CX*1), X13
MOVOU -32(AX)(CX*1), X14
MOVOU -16(AX)(CX*1), X15
// 異或種子
PXOR X0, X8
PXOR X1, X9
PXOR X2, X10
PXOR X3, X11
PXOR X4, X12
PXOR X5, X13
PXOR X6, X14
PXOR X7, X15
// 連續加密3次
AESENC X8, X8
AESENC X9, X9
AESENC X10, X10
AESENC X11, X11
AESENC X12, X12
AESENC X13, X13
AESENC X14, X14
AESENC X15, X15
AESENC X8, X8
AESENC X9, X9
AESENC X10, X10
AESENC X11, X11
AESENC X12, X12
AESENC X13, X13
AESENC X14, X14
AESENC X15, X15
AESENC X8, X8
AESENC X9, X9
AESENC X10, X10
AESENC X11, X11
AESENC X12, X12
AESENC X13, X13
AESENC X14, X14
AESENC X15, X15
// 拼裝結果
PXOR X12, X8
PXOR X13, X9
PXOR X14, X10
PXOR X15, X11
PXOR X10, X8
PXOR X11, X9
PXOR X9, X8
MOVQ X8, (DX)
RET
aes129plus:
// 處理第七個以上的起始種子
MOVO X1, X2
MOVO X1, X3
MOVO X1, X4
MOVO X1, X5
MOVO X1, X6
MOVO X1, X7
PXOR runtime·aeskeysched+16(SB), X1
PXOR runtime·aeskeysched+32(SB), X2
PXOR runtime·aeskeysched+48(SB), X3
PXOR runtime·aeskeysched+64(SB), X4
PXOR runtime·aeskeysched+80(SB), X5
PXOR runtime·aeskeysched+96(SB), X6
PXOR runtime·aeskeysched+112(SB), X7
AESENC X1, X1
AESENC X2, X2
AESENC X3, X3
AESENC X4, X4
AESENC X5, X5
AESENC X6, X6
AESENC X7, X7
// 逆序開始,從最后的block開始處理,因為可能會出現重疊的情況
MOVOU -128(AX)(CX*1), X8
MOVOU -112(AX)(CX*1), X9
MOVOU -96(AX)(CX*1), X10
MOVOU -80(AX)(CX*1), X11
MOVOU -64(AX)(CX*1), X12
MOVOU -48(AX)(CX*1), X13
MOVOU -32(AX)(CX*1), X14
MOVOU -16(AX)(CX*1), X15
// 異或種子
PXOR X0, X8
PXOR X1, X9
PXOR X2, X10
PXOR X3, X11
PXOR X4, X12
PXOR X5, X13
PXOR X6, X14
PXOR X7, X15
// 計算剩余128字節塊的數量
DECQ CX
SHRQ $7, CX
aesloop:
// 加密狀態
AESENC X8, X8
AESENC X9, X9
AESENC X10, X10
AESENC X11, X11
AESENC X12, X12
AESENC X13, X13
AESENC X14, X14
AESENC X15, X15
// 在同一個block塊中加密狀態,進行異或運算
MOVOU (AX), X0
MOVOU 16(AX), X1
MOVOU 32(AX), X2
MOVOU 48(AX), X3
AESENC X0, X8
AESENC X1, X9
AESENC X2, X10
AESENC X3, X11
MOVOU 64(AX), X4
MOVOU 80(AX), X5
MOVOU 96(AX), X6
MOVOU 112(AX), X7
AESENC X4, X12
AESENC X5, X13
AESENC X6, X14
AESENC X7, X15
ADDQ $128, AX
DECQ CX
JNE aesloop
// 最后一步,進行3次以上的加密
AESENC X8, X8
AESENC X9, X9
AESENC X10, X10
AESENC X11, X11
AESENC X12, X12
AESENC X13, X13
AESENC X14, X14
AESENC X15, X15
AESENC X8, X8
AESENC X9, X9
AESENC X10, X10
AESENC X11, X11
AESENC X12, X12
AESENC X13, X13
AESENC X14, X14
AESENC X15, X15
AESENC X8, X8
AESENC X9, X9
AESENC X10, X10
AESENC X11, X11
AESENC X12, X12
AESENC X13, X13
AESENC X14, X14
AESENC X15, X15
PXOR X12, X8
PXOR X13, X9
PXOR X14, X10
PXOR X15, X11
PXOR X10, X8
PXOR X11, X9
PXOR X9, X8
MOVQ X8, (DX)
RET
2. 哈希沖突處理
(1)鏈表數組法
鏈表數組法比較簡單,每個鍵值對表長取模,如果結果相同,用鏈表的方式依次往后插入。
假設待插入的鍵值集合是{ 2,3,5,7,11,13,19},表長 MOD 8。假設哈希函數在[0,9)上均勻分布。如上圖。
接下來重點進行性能分析:
查找鍵值 k,假設鍵值 k 不在哈希表中,h(k) 在 [0,M) 中均勻分布,即 P(h(k) = i) = 1/M 。令 Xi 為哈希表 ht[ i ] 中包含鍵值的個數。如果 h(k) = i ,則不成功查找 k 的鍵值比較次數是 Xi,于是:
成功查找的分析稍微復雜一點。要考慮添加哈希表的次序,不考慮有相同鍵值的情況,假設 K = {k1,k2,……kn},并且假設從空哈希表開始按照這個次序添加到哈希表中。引入隨機變量,如果 h(ki) = h(kj),那么 Xij = 1;如果 h(ki) != h(kj),那么 Xij = 0 。
由于之前的假設哈希表是均勻分布的,所以 P(Xij = i) = E(Xij) = 1/M ,這里的 E(X) 表示隨機變量 X 的數學期望。再假設每次添加鍵值的時候都是把添加在鏈表末端。令 Ci 為查找 Ki 所需的鍵值比較次數,由于不能事先確定查找 Ki 的概率,所以假定查找不同鍵值的概率都是相同的,都是 1/n ,則有:
由此我們可以看出,哈希表的性能和表中元素的多少關系不大,而和填充因子 α 有關。如果哈希表長和哈希表中元素個數成正比,則哈希表查找的復雜度為 O(1) 。
綜上所述,鏈表數組的成功與不成功的平均鍵值比較次數如下:
(2)開放地址法 —— 線性探測
線性探測的規則是 hi = ( h(k) + i ) MOD M。舉個例子,i = 1,M = 9。
這種處理沖突的方法,一旦發生沖突,就把位置往后加1,直到找到一個空的位置。
舉例如下,假設待插入的鍵值集合是{2,3,5,7,11,13,19},線性探測的發生沖突以后添加的值為1,那么最終結果如下:
線性探測哈希表的性能分析比較復雜,這里就僅給出結果。
(3)開放地址法 —— 平方探測
線性探測的規則是 h0 = h(k) ,hi = ( h0 + i * i ) MOD M。
舉例如下,假設待插入的鍵值集合是{2,3,5,7,11,13,20},平方探測的發生沖突以后添加的值為查找次數的平方,那么最終結果如下:
平方探測在線性探測的基礎上,加了一個二次曲線。當發生沖突以后,不再是加一個線性的參數,而是加上探測次數的平方。
平方探測有一個需要注意的是,M的大小有講究。如果M不是奇素數,那么就可能出現下面這樣的問題,即使哈希表里面還有空的位置,但是卻有元素找不到要插入的位置。
舉例,假設 M = 10,待插入的鍵值集合是{0,1,4,5,6,9,10},當前面6個鍵值插入哈希表中以后,10就再也無法插入了。
所以在平方探測中,存在下面這則規律:
如果 M 為奇素數,則下面的 ?M / 2? 位置 h0,h1,h2 …… h?M/2? 互不相同。其中,hi = (h0 + i * i ) MOD M。
這面這則規律可以用反證法證明。假設 hi = hj,i > j;0<=i,j<= ?M/2?,則 h0 + i* i = ( h0 + j * j ) MOD M,從而 M 可以整除 ( i + j )( i - j )。由于 M 為素數,并且 0 < i + j,i - j < M,當且僅當 i = j 的時候才能滿足。
上述規則也就說明了一點,只要 M 為奇素數,平方探測至少可以遍歷哈希表一般的位置。所以只要哈希表的填充因子 α <= 1 / 2 ,平方探測總能找到可插入的位置。
上述舉的例子,之所以鍵值10無法插入,原因也因為 α > 1 / 2了,所以不能保證有可插入的位置了。
(4)開放地址法 —— 雙哈希探測
雙哈希探測是為了解決聚集的現象。無論是線性探測還是平方探測,如果 h(k1) 和 h(k2) 相鄰,則它們的探測序列也都是相鄰的,這就是所謂的聚集現象。為了避免這種現象,所以引入了雙哈希函數 h2,使得兩次探測之間的距離為 h2(k)。所以探測序列為 h0 = h1(k),hi = ( h0 + i * h2(k) ) MOD M 。實驗表明,雙哈希探測的性能類似于隨機探測。
關于雙哈希探測和平方探測的平均查找長度比線性探測更加困難。所以引入隨機探測的概念來近似這兩種探測。隨機探測是指探測序列 { hi } 在區間 [0,M]中等概率獨立隨機選取,這樣 P(hi = j) = 1/M 。
假設探測序列為 h0,h1,……,hi。在哈希表的 hi 位置為空,在 h0,h1,……,hi-1 的位置上哈希表不是空,此次查找的鍵值比較次數為 i。令隨機變量 X 為一次不成功查找所需的鍵值比較次數。由于哈希表的填充因子為 α,所以在一個位置上哈希表為空值的概率為 1 - α ,為非空值的概率為 α,所以 P( X = i ) = α^i * ( 1 - α ) 。
在概率論中,上述的分布叫幾何分布。
假定哈希表元素的添加順序為 {k1,k2,…… ,kn},令 Xi 為當哈希表只包含 {k1,k2,…… ,ki} 時候一次不成功查找的鍵值比較次數,注意,這個時候哈希表的填充因子為 i/M ,則查找 k(i+1) 的鍵值次數為 Yi = 1 + Xi。假定查找任意一個鍵值的概率為 1/n,則一次成功查找的平均鍵值比較次數為:
綜上所述,平方探測和雙哈希探測的成功與不成功的平均鍵值比較次數如下:
總的來說,在數據量非常大的情況下,簡單的 hash 函數不可避免不產生碰撞,即使采用了合適的處理碰撞的方法,依舊有一定時間復雜度。所以想盡可能的避免碰撞,還是要選擇高性能的 hash 函數,或者增加 hash 的位數,比如64位,128位,256位,這樣碰撞的幾率會小很多。
3. 哈希表的擴容策略
隨著哈希表裝載因子的變大,發生碰撞的次數變得越來也多,哈希表的性能變得越來越差。對于單獨鏈表法實現的哈希表,尚可以容忍,但是對于開放尋址法,這種性能的下降是不能接受的,因此對于開放尋址法需要尋找一種方法解決這個問題。
在實際應用中,解決這個問題的辦法是動態的增大哈希表的長度,當裝載因子超過某個閾值時增加哈希表的長度,自動擴容。每當哈希表的長度發生變化之后,所有 key 在哈希表中對應的下標索引需要全部重新計算,不能直接從原來的哈希表中拷貝到新的哈希表中。必須一個一個計算原來哈希表中的 key 的哈希值并插入到新的哈希表中。這種方式肯定是達不到生產環境的要求的,因為時間復雜度太高了,O(n),數據量一旦大了,性能就會很差。Redis 想了一種方法,就算是觸發增長時也只需要常數時間 O(1) 即可完成插入操作。解決辦法是分多次、漸進式地完成的舊哈希表到新哈希表的拷貝而不是一次拷貝完成。
接下來以 Redis 為例,來談談它是哈希表是如何進行擴容并且不太影響性能的。
Redis 對字典的定義如下:
C
/*
* 字典
*
* 每個字典使用兩個哈希表,用于實現漸進式 rehash
*/
typedef struct dict {
// 特定于類型的處理函數
dictType *type;
// 類型處理函數的私有數據
void *privdata;
// 哈希表(2 個)
dictht ht[2];
// 記錄 rehash 進度的標志,值為 -1 表示 rehash 未進行
int rehashidx;
// 當前正在運作的安全迭代器數量
int iterators;
} dict;
從定義上我們可以看到,Redis 字典保存了2個哈希表,哈希表ht[1]就是用來 rehash 的。
在 Redis 中定義了如下的哈希表的數據結構:
C
/*
* 哈希表
*/
typedef struct dictht {
// 哈希表節點指針數組(俗稱桶,bucket)
dictEntry **table;
// 指針數組的大小
unsigned long size;
// 指針數組的長度掩碼,用于計算索引值
unsigned long sizemask;
// 哈希表現有的節點數量
unsigned long used;
} dictht;
table 屬性是個數組, 數組的每個元素都是個指向 dictEntry 結構的指針。
每個 dictEntry 都保存著一個鍵值對, 以及一個指向另一個 dictEntry 結構的指針:
C
/*
* 哈希表節點
*/
typedef struct dictEntry {
// 鍵
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
} v;
// 鏈往后繼節點
struct dictEntry *next;
} dictEntry;
next 屬性指向另一個 dictEntry 結構, 多個 dictEntry 可以通過 next 指針串連成鏈表, 從這里可以看出, dictht 使用鏈地址法來處理鍵碰撞問題的。
dictAdd 在每次向字典添加新鍵值對之前, 都會對哈希表 ht[0] 進行檢查, 對于 ht[0] 的 size 和 used 屬性, 如果它們之間的比率 ratio = used / size 滿足以下任何一個條件的話,rehash 過程就會被激活:
自然 rehash : ratio >= 1 ,且變量 dict_can_resize 為真。
強制 rehash : ratio 大于變量 dict_force_resize_ratio (目前版本中, dict_force_resize_ratio 的值為 5 )。
假設當前的字典需要擴容 rehash,那么 Redis 會先設置字典的 rehashidx 為 0 ,標識著 rehash 的開始;再為 ht[1]->table 分配空間,大小至少為 ht[0]->used 的兩倍。
如上圖, ht[1]->table 已經分配空間了8個空間了。
接著,開始 rehash 。將 ht[0]->table 內的鍵值移動到 ht[1]->table 中,鍵值的移動不是一次完成的,分多次進行。
上圖可以看出來, ht[0] 中的一部分鍵值已經遷移到 ht[1] 中了,并且此時還有新的鍵值插入進來,是直接插入到 ht[1] 中的,不會再插入到 ht[0] 中了。保證了 ht[0] 只減不增。
在 rehash 進行的過程中,不斷的有新的鍵值插入進來,也不斷的把 ht[0] 中的鍵值都遷移過來,直到 ht[0] 中的鍵值都遷移過來為止。注意 Redis 用的是頭插法,新值永遠都插在鏈表的第一個位置,這樣也不用遍歷到鏈表的最后,省去了 O(n) 的時間復雜度。進行到上圖這種情況,所有的節點也就遷移完畢了。
rehash 在結束之前會進行清理工作,釋放 ht[0] 的空間;用 ht[1] 來代替 ht[0] ,使原來的 ht[1] 成為新的 ht[0] ;創建一個新的空哈希表,并將它設置為 ht[1] ;將字典的 rehashidx 屬性設置為 -1 ,標識 rehash 已停止;
最終 rehash 結束以后情況如上圖。如果還下次還需要 rehash ,重復上述過程即可。這種分多次,漸進式 rehash 的方式也成就了 Redis 的高性能。
值得一提的是,Redis 是支持字典的 reshrink 操作的。操作步驟就是 rehash 的逆過程。
二. 紅黑樹優化
讀到這里,讀者應該已經明白了到底用什么方式來控制 map 使得 Hash 碰撞的概率又小,哈希桶數組占用空間又少了吧,答案就是選擇好的 Hash 算法和增加擴容機制。
Java 在 JDK1.8 對 HashMap 底層的實現再次進行了優化。
上圖是來自美團博客總結的。從這里我們可以發現:
Java 底層初始桶的個數是16個,負載因子默認是0.75,也就是說當鍵值第一次達到12個的時候就會進行擴容 resize。擴容的臨界值在64,當超過了64以后,并且沖突節點為8或者大于8,這個時候就會觸發紅黑樹轉換。為了防止底層鏈表過長,鏈表就轉換為紅黑樹。
換句話說,當桶的總個數沒有到64個的時候,即使鏈表長為8,也不會進行紅黑樹轉換。
如果節點小于6個,紅黑樹又會重新退化成鏈表。
當然這里之所以選擇用紅黑樹來進行優化,保證最壞情況不會退化成 O(n),紅黑樹能保證最壞時間復雜度也為 O(log n)。
在美團博客中也提到了,Java 在 JDK1.8 中還有一個值得學習的優化。Java 在 rehash 的鍵值節點遷移過程中,不需要再次計算一次 hash 計算!
由于使用了2次冪的擴展(指長度擴為原來2倍),所以,元素的位置要么是在原位置,要么是在原位置再移動2次冪的位置。看下圖可以明白這句話的意思,n 為 table 的長度,圖(a)表示擴容前的 key1 和 key2 兩種 key 確定索引位置的示例,圖(b)表示擴容后 key1 和
key2 兩種 key 確定索引位置的示例,其中 hash1 是 key1 對應的哈希與高位運算結果。
元素在重新計算 hash 之后,因為 n 變為2倍,那么 n-1 的 mask 范圍在高位多1bit(紅色),因此新的 index 就會發生這樣的變化:
所以在擴容以后,就只需要看擴容容量以后那個位上的值為0,還是為1,如果是0,代表索引不變,如果是1,代表的是新的索引值等于原來的索引值加上 oldCap 即可,這樣就不需要再次計算一次 hash 了。
上圖是把16擴容到32的情況。
三. Go 中 Map 的具體實現舉例
讀到這里,讀者對如何設計一個 Map 應該有一些自己的想法了。選擇一個優秀的哈希算法,用鏈表 + 數組 作為底層數據結構,如何擴容和優化,這些應該都有了解了。讀到這里也許讀者認為本篇文章內容已經過半了,不過前面這些都是偏理論,接下來也許才到了本文的重點部分 —— 從零開始分析一下完整的 Map 實現。
接下來筆者對 Go 中的 Map 的底層實現進行分析,也算是對一個 Map 的具體實現和重要的幾個操作,添加鍵值,刪除鍵值,擴容策略進行舉例。
Go 的 map 實現在 /src/runtime/hashmap.go 這個文件中。
map 底層實質還是一個 hash table。
先來看看 Go 定義了一些常量。
Go
const (
// 一個桶里面最多可以裝的鍵值對的個數,8對。
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits
// 觸發擴容操作的最大裝載因子的臨界值
loadFactor = 6.5
// 為了保持內聯,鍵 和 值 的最大長度都是128字節,如果超過了128個字節,就存儲它的指針
maxKeySize = 128
maxValueSize = 128
// 數據偏移應該是 bmap 的整數倍,但是需要正確的對齊。
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
// tophash 的一些值
empty = 0 // 沒有鍵值對
evacuatedEmpty = 1 // 沒有鍵值對,并且桶內的鍵值被遷移走了。
evacuatedX = 2 // 鍵值對有效,并且已經遷移了一個表的前半段
evacuatedY = 3 // 鍵值對有效,并且已經遷移了一個表的后半段
minTopHash = 4 // 最小的 tophash
// 標記
iterator = 1 // 當前桶的迭代子
oldIterator = 2 // 舊桶的迭代子
hashWriting = 4 // 一個goroutine正在寫入map
sameSizeGrow = 8 // 當前字典增長到新字典并且保持相同的大小
// 迭代子檢查桶ID的哨兵
noCheck = 1<<(8*sys.PtrSize) - 1
)
這里值得說明的一點是觸發擴容操作的臨界值6.5是怎么得來的。這個值太大會導致overflow buckets過多,查找效率降低,過小會浪費存儲空間。
據 Google 開發人員稱,這個值是一個測試的程序,測量出來選擇的一個經驗值。
%overflow : 溢出率,平均一個 bucket 有多少個 鍵值kv 的時候會溢出。
bytes/entry :
平均存一個 鍵值kv 需要額外存儲多少字節的數據。
hitprobe :
查找一個存在的 key 平均查找次數。
missprobe :
查找一個不存在的 key 平均查找次數。
經過這幾組測試數據,最終選定 6.5 作為臨界的裝載因子。
接著看看 Go 中 map header 的定義:
Go
type hmap struct {
count int // map 的長度
flags uint8
B uint8 // log以2為底,桶個數的對數 (總共能存 6.5 * 2^B 個元素)
noverflow uint16 // 近似溢出桶的個數
hash0 uint32 // 哈希種子
buckets unsafe.Pointer // 有 2^B 個桶的數組. count==0 的時候,這個數組為 nil.
oldbuckets unsafe.Pointer // 舊的桶數組一半的元素
nevacuate uintptr // 擴容增長過程中的計數器
extra *mapextra // 可選字段
}
在 Go 的 map header 結構中,也包含了2個指向桶數組的指針,buckets 指向新的桶數組,oldbuckets 指向舊的桶數組。這點和 Redis 字典中也有兩個 dictht 數組類似。
hmap 的最后一個字段是一個指向 mapextra 結構的指針,它的定義如下:
Go
type mapextra struct {
overflow [2]*[]*bmap
nextOverflow *bmap
}
如果一個鍵值對沒有找到對應的指針,那么就會把它們先存到溢出桶 overflow 里面。在 mapextra 中還有一個指向下一個可用的溢出桶的指針。
溢出桶 overflow 是一個數組,里面存了2個指向 *bmap 數組的指針。overflow[0] 里面裝的是 hmap.buckets 。overflow[1] 里面裝的是 hmap.oldbuckets。
再看看桶的數據結構的定義,bmap 就是 Go 中 map 里面桶對應的結構體類型。
Go
type bmap struct {
tophash [bucketCnt]uint8
}
桶的定義比較簡單,里面就只是包含了一個 uint8 類型的數組,里面包含8個元素。這8個元素存儲的是 hash 值的高8位。
在 tophash 之后的內存布局里還有2塊內容。緊接著 tophash 之后的是8對 鍵值 key- value 對。并且排列方式是 8個 key 和 8個 value 放在一起。
8對 鍵值 key- value 對結束以后緊接著一個 overflow 指針,指向下一個 bmap。從此也可以看出 Go 中 map是用鏈表的方式處理 hash 沖突的。
為何 Go 存儲鍵值對的方式不是普通的 key/value、key/value、key/value……這樣存儲的呢?它是鍵 key 都存儲在一起,然后緊接著是 值value 都存儲在一起,為什么會這樣呢?
在 Redis 中,當使用 REDIS_ENCODING_ZIPLIST 編碼哈希表時, 程序通過將鍵和值一同推入壓縮列表, 從而形成保存哈希表所需的鍵-值對結構,如上圖。新添加的 key-value 對會被添加到壓縮列表的表尾。
這種結構有一個弊端,如果存儲的鍵和值的類型不同,在內存中布局中所占字節不同的話,就需要對齊。比如說存儲一個 map[int64]int8 類型的字典。
Go 為了節約內存對齊的內存消耗,于是把它設計成上圖所示那樣。
如果 map 里面存儲了上萬億的大數據,這里節約出來的內存空間還是比較可觀的。
1. 新建 Map
makemap 新建了一個 Map,如果入參 h 不為空,那么 map 的 hmap 就是入參的這個 hmap,如果入參 bucket 不為空,那么這個 bucket 桶就作為第一個桶。
Go
func makemap(t *maptype, hint int64, h *hmap, bucket unsafe.Pointer) *hmap {
// hmap 的 size 大小的值非法
if sz := unsafe.Sizeof(hmap{}); sz > 48 || sz != t.hmap.size {
println("runtime: sizeof(hmap) =", sz, ", t.hmap.size =", t.hmap.size)
throw("bad hmap size")
}
// 超過范圍的 hint 值都為0
if hint < 0 || hint > int64(maxSliceCap(t.bucket.size)) {
hint = 0
}
// key 值的類型不是 Go 所支持的
if !ismapkey(t.key) {
throw("runtime.makemap: unsupported map key type")
}
// 通過編譯器和反射檢車 key 值的 size 是否合法
if t.key.size > maxKeySize && (!t.indirectkey || t.keysize != uint8(sys.PtrSize)) ||
t.key.size <= maxKeySize && (t.indirectkey || t.keysize != uint8(t.key.size)) {
throw("key size wrong")
}
// 通過編譯器和反射檢車 value 值的 size 是否合法
if t.elem.size > maxValueSize && (!t.indirectvalue || t.valuesize != uint8(sys.PtrSize)) ||
t.elem.size <= maxValueSize && (t.indirectvalue || t.valuesize != uint8(t.elem.size)) {
throw("value size wrong")
}
// 雖然以下的變量我們不依賴,而且可以在編譯階段檢查下面這些值的合法性,
// 但是我們還是在這里檢測。
// key 值對齊超過桶的個數
if t.key.align > bucketCnt {
throw("key align too big")
}
// value 值對齊超過桶的個數
if t.elem.align > bucketCnt {
throw("value align too big")
}
// key 值的 size 不是 key 值對齊的倍數
if t.key.size%uintptr(t.key.align) != 0 {
throw("key size not a multiple of key align")
}
// value 值的 size 不是 value 值對齊的倍數
if t.elem.size%uintptr(t.elem.align) != 0 {
throw("value size not a multiple of value align")
}
// 桶個數太小,無法正確對齊
if bucketCnt < 8 {
throw("bucketsize too small for proper alignment")
}
// 數據偏移量不是 key 值對齊的整數倍,說明需要在桶中填充 key
if dataOffset%uintptr(t.key.align) != 0 {
throw("need padding in bucket (key)")
}
// 數據偏移量不是 value 值對齊的整數倍,說明需要在桶中填充 value
if dataOffset%uintptr(t.elem.align) != 0 {
throw("need padding in bucket (value)")
}
B := uint8(0)
for ; overLoadFactor(hint, B); B++ {
}
// 分配內存并初始化哈希表
// 如果此時 B = 0,那么 hmap 中的 buckets 字段稍后分配
// 如果 hint 值很大,初始化這塊內存需要一段時間。
buckets := bucket
var extra *mapextra
if B != 0 {
var nextOverflow *bmap
// 初始化 bucket 和 nextOverflow
buckets, nextOverflow = makeBucketArray(t, B)
if nextOverflow != nil {
extra = new(mapextra)
extra.nextOverflow = nextOverflow
}
}
// 初始化 hmap
if h == nil {
h = (*hmap)(newobject(t.hmap))
}
h.count = 0
h.B = B
h.extra = extra
h.flags = 0
h.hash0 = fastrand()
h.buckets = buckets
h.oldbuckets = nil
h.nevacuate = 0
h.noverflow = 0
return h
}
新建一個 map 最重要的就是分配內存并初始化哈希表,在 B 不為0的情況下,還會初始化 mapextra 并且會 buckets 會被重新生成。
Go
func makeBucketArray(t *maptype, b uint8) (buckets unsafe.Pointer, nextOverflow *bmap) {
base := uintptr(1 << b)
nbuckets := base
if b >= 4 {
nbuckets += 1 << (b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
// 如果申請 sz 大小的桶,系統只能返回 up 大小的內存空間,那么桶的個數為 up / t.bucket.size
if up != sz {
nbuckets = up / t.bucket.size
}
}
buckets = newarray(t.bucket, int(nbuckets))
// 當 b > 4 并且計算出來桶的個數與 1 << b 個數不等的時候,
if base != nbuckets {
// 此時 nbuckets 比 base 大,那么會預先分配 nbuckets - base 個 nextOverflow 桶
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
這里的 newarray 就已經是 mallocgc 了。
從上述代碼里面可以看出,只有當 B >=4 的時候,makeBucketArray 才會生成 nextOverflow 指針指向 bmap,從而在 Map 生成 hmap 的時候才會生成 mapextra 。
當 B = 3 ( B < 4 ) 的時候,初始化 hmap 只會生成8個桶。
當 B = 4 ( B >= 4 ) 的時候,初始化 hmap 的時候還會額外生成 mapextra ,并初始化 nextOverflow。mapextra 的 nextOverflow 指針會指向第16個桶結束,第17個桶的首地址。第17個桶(從0開始,也就是下標為16的桶)的 bucketsize - sys.PtrSize 地址開始存一個指針,這個指針指向當前整個桶的首地址。這個指針就是 bmap 的 overflow 指針。
當 B = 5 ( B >= 4 ) 的時候,初始化 hmap 的時候還會額外生成 mapextra ,并初始化 nextOverflow。這個時候就會生成總共34個桶了。同理,最后一個桶大小減去一個指針的大小的地址開始存儲一個 overflow 指針。
2. 查找 Key
在 Go 中,如果字典里面查找一個不存在的 key ,查找不到并不會返回一個 nil ,而是返回當前類型的零值。比如,字符串就返回空字符串,int 類型就返回 0 。
Go
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if raceenabled && h != nil {
// 獲取 caller 的 程序計數器 program counter
callerpc := getcallerpc(unsafe.Pointer(&t))
// 獲取 mapaccess1 的程序計數器 program counter
pc := funcPC(mapaccess1)
racereadpc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled && h != nil {
msanread(key, t.key.size)
}
if h == nil || h.count == 0 {
return unsafe.Pointer(&zeroVal[0])
}
// 如果多線程讀寫,直接拋出異常
// 并發檢查 go hashmap 不支持并發訪問
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
alg := t.key.alg
// 計算 key 的 hash 值
hash := alg.hash(key, uintptr(h.hash0))
m := uintptr(1)<<h.B - 1
// hash % (1<<B - 1) 求出 key 在哪個桶
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// 如果當前還存在 oldbuckets 桶
if c := h.oldbuckets; c != nil {
// 當前擴容不是等量擴容
if !h.sameSizeGrow() {
// 如果 oldbuckets 未遷移完成 則找找 oldbuckets 中對應的 bucket(低 B-1 位)
// 否則為 buckets 中的 bucket(低 B 位)
// 把 mask 縮小 1 倍
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
// 如果 oldbuckets 桶存在,并且還沒有擴容遷移,就在老的桶里面查找 key
b = oldb
}
}
// 取出 hash 值的高 8 位
top := uint8(hash >> (sys.PtrSize*8 - 8))
// 如果 top 小于 minTopHash,就讓它加上 minTopHash 的偏移。
// 因為 0 - minTopHash 這區間的數都已經用來作為標記位了
if top < minTopHash {
top += minTopHash
}
for {
for i := uintptr(0); i < bucketCnt; i++ {
// 如果 hash 的高8位和當前 key 記錄的不一樣,就找下一個
// 這樣比較很高效,因為只用比較高8位,不用比較所有的 hash 值
// 如果高8位都不相同,hash 值肯定不同,但是高8位如果相同,那么就要比較整個 hash 值了
if b.tophash[i] != top {
continue
}
// 取出 key 值的方式是用偏移量,bmap 首地址 + i 個 key 值大小的偏移量
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey {
k = *((*unsafe.Pointer)(k))
}
// 比較 key 值是否相等
if alg.equal(key, k) {
// 如果找到了 key,那么取出 value 值
// 取出 value 值的方式是用偏移量,bmap 首地址 + 8 個 key 值大小的偏移量 + i 個 value 值大小的偏移量
v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
if t.indirectvalue {
v = *((*unsafe.Pointer)(v))
}
return v
}
}
// 如果當前桶里面沒有找到相應的 key ,那么就去下一個桶去找
b = b.overflow(t)
// 如果 b == nil,說明桶已經都找完了,返回對應type的零值
if b == nil {
return unsafe.Pointer(&zeroVal[0])
}
}
}
具體實現代碼如上,詳細解釋見代碼。
如上圖,這是一個查找 key 的全過程。
首先計算出 key 對應的 hash 值,hash 值對 B 取余。
這里有一個優化點。m % n 這步計算,如果 n 是2的倍數,那么可以省去這一步取余操作。
Go
m % n = m & ( n - 1 )
這樣優化就可以省去耗時的取余操作了。這里例子中計算完取出來是 0010 ,也就是2,于是對應的是桶數組里面的第3個桶。為什么是第3個桶呢?首地址指向第0個桶,往下偏移2個桶的大小,于是偏移到了第3個桶的首地址了,具體實現可以看上述代碼。
hash 的低 B 位決定了桶數組里面的第幾個桶,hash 值的高8位決定了這個桶數組 bmap 里面 key 存在 tophash 數組的第幾位了。如上圖,hash 的高8位用來和 tophash 數組里面的每個值進行對比,如果高8位和 tophash[i] 不等,就直接比下一個。如果相等,則取出 bmap 里面對應完整的 key,再比較一次,看是否完全一致。
整個查找過程優先在 oldbucket 里面找(如果存在 lodbucket 的話),找完再去新 bmap 里面找。
有人可能會有疑問,為何這里要加入 tophash 多一次比較呢?
tophash 的引入是為了加速查找的。由于它只存了 hash 值的高8位,比查找完整的64位要快很多。通過比較高8位,迅速找到高8位一致hash 值的索引,接下來再進行一次完整的比較,如果還一致,那么就判定找到該 key 了。
如果找到了 key 就返回對應的 value。如果沒有找到,還會繼續去 overflow 桶繼續尋找,直到找到最后一個桶,如果還沒有找到就返回對應類型的零值。
3. 插入 Key
插入 key 的過程和查找 key 的過程大體一致。
Go
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
if raceenabled {
// 獲取 caller 的 程序計數器 program counter
callerpc := getcallerpc(unsafe.Pointer(&t))
// 獲取 mapassign 的程序計數器 program counter
pc := funcPC(mapassign)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled {
msanread(key, t.key.size)
}
// 如果多線程讀寫,直接拋出異常
// 并發檢查 go hashmap 不支持并發訪問
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
alg := t.key.alg
// 計算 key 值的 hash 值
hash := alg.hash(key, uintptr(h.hash0))
// 在計算完 hash 值以后立即設置 hashWriting 變量的值,如果在計算 hash 值的過程中沒有完全寫完,可能會導致 panic
h.flags |= hashWriting
// 如果 hmap 的桶的個數為0,那么就新建一個桶
if h.buckets == nil {
h.buckets = newarray(t.bucket, 1)
}
again:
// hash 值對 B 取余,求得所在哪個桶
bucket := hash & (uintptr(1)<<h.B - 1)
// 如果還在擴容中,繼續擴容
if h.growing() {
growWork(t, h, bucket)
}
// 根據 hash 值的低 B 位找到位于哪個桶
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
// 計算 hash 值的高 8 位
top := uint8(hash >> (sys.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
var inserti *uint8
var insertk unsafe.Pointer
var val unsafe.Pointer
for {
// 遍歷當前桶所有鍵值,查找 key 對應的 value
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == empty && inserti == nil {
// 如果往后找都沒有找到,這里先記錄一個標記,方便找不到以后插入到這里
inserti = &b.tophash[i]
// 計算出偏移 i 個 key 值的位置
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 計算出 val 所在的位置,當前桶的首地址 + 8個 key 值所占的大小 + i 個 value 值所占的大小
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
}
continue
}
// 依次取出 key 值
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 如果 key 值是一個指針,那么就取出改指針對應的 key 值
if t.indirectkey {
k = *((*unsafe.Pointer)(k))
}
// 比較 key 值是否相等
if !alg.equal(key, k) {
continue
}
// 如果需要更新,那么就把 t.key 拷貝從 k 拷貝到 key
if t.needkeyupdate {
typedmemmove(t.key, k, key)
}
// 計算出 val 所在的位置,當前桶的首地址 + 8個 key 值所占的大小 + i 個 value 值所占的大小
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
goto done
}
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// 沒有找到當前的 key 值,并且檢查最大負載因子,如果達到了最大負載因子,或者存在很多溢出的桶
if !h.growing() && (overLoadFactor(int64(h.count), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
// 開始擴容
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
// 如果找不到一個空的位置可以插入 key 值
if inserti == nil {
// all current buckets are full, allocate a new one.
// 意味著當前桶已經全部滿了,那么就生成一個新的
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
val = add(insertk, bucketCnt*uintptr(t.keysize))
}
// store new key/value at insert position
if t.indirectkey {
// 如果是存儲 key 值的指針,這里就用 insertk 存儲 key 值的地址
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectvalue {
// 如果是存儲 value 值的指針,這里就用 val 存儲 key 值的地址
vmem := newobject(t.elem)
*(*unsafe.Pointer)(val) = vmem
}
// 將 t.key 從 insertk 拷貝到 key 的位置
typedmemmove(t.key, insertk, key)
*inserti = top
// hmap 中保存的總 key 值的數量 + 1
h.count++
done:
// 禁止并發寫
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting
if t.indirectvalue {
// 如果 value 里面存儲的是指針,那么取值該指針指向的 value 值
val = *((*unsafe.Pointer)(val))
}
return val
}
插入 key 的過程中和查找 key 有幾點不同,需要注意:
1.如果找到要插入的 key ,只需要直接更新對應的 value 值就好了。
2.如果沒有在 bmap 中沒有找到待插入的 key ,這么這時分幾種情況。 情況一: bmap 中還有空位,在遍歷 bmap 的時候預先標記空位,一旦查找結束也沒有找到 key,就把 key 放到預先遍歷時候標記的空位上。 情況二:bmap中已經沒有空位了。這個時候 bmap 裝的很滿了。此時需要檢查一次最大負載因子是否已經達到了。如果達到了,立即進行擴容操作。擴容以后在新桶里面插入 key,流程和上述的一致。如果沒有達到最大負載因子,那么就在新生成一個 bmap,并把前一個 bmap 的 overflow 指針指向新的 bmap。
3.在擴容過程中,oldbucke t是被凍結的,查找 key 時會在 oldbucket 中查找,但不會在 oldbucket 中插入數據。如果在 oldbucket 是找到了相應的key,做法是將它遷移到新 bmap 后加入 evalucated 標記。
其他流程和查找 key 基本一致,這里就不再贅述了。
3. 刪除 Key
Go
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
if raceenabled && h != nil {
// 獲取 caller 的 程序計數器 program counter
callerpc := getcallerpc(unsafe.Pointer(&t))
// 獲取 mapdelete 的程序計數器 program counter
pc := funcPC(mapdelete)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled && h != nil {
msanread(key, t.key.size)
}
if h == nil || h.count == 0 {
return
}
// 如果多線程讀寫,直接拋出異常
// 并發檢查 go hashmap 不支持并發訪問
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
alg := t.key.alg
// 計算 key 值的 hash 值
hash := alg.hash(key, uintptr(h.hash0))
// 在計算完 hash 值以后立即設置 hashWriting 變量的值,如果在計算 hash 值的過程中沒有完全寫完,可能會導致 panic
h.flags |= hashWriting
bucket := hash & (uintptr(1)<<h.B - 1)
// 如果還在擴容中,繼續擴容
if h.growing() {
growWork(t, h, bucket)
}
// 根據 hash 值的低 B 位找到位于哪個桶
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
// 計算 hash 值的高 8 位
top := uint8(hash >> (sys.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
for {
// 遍歷當前桶所有鍵值,查找 key 對應的 value
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 如果 k 是指向 key 的指針,那么這里需要取出 key 的值
k2 := k
if t.indirectkey {
k2 = *((*unsafe.Pointer)(k2))
}
if !alg.equal(key, k2) {
continue
}
// key 的指針置空
if t.indirectkey {
*(*unsafe.Pointer)(k) = nil
} else {
// 清除 key 的內存
typedmemclr(t.key, k)
}
v := unsafe.Pointer(uintptr(unsafe.Pointer(b)) + dataOffset + bucketCnt*uintptr(t.keysize) + i*uintptr(t.valuesize))
// value 的指針置空
if t.indirectvalue {
*(*unsafe.Pointer)(v) = nil
} else {
// 清除 value 的內存
typedmemclr(t.elem, v)
}
// 清空 tophash 里面的值
b.tophash[i] = empty
// map 里面 key 的總個數減1
h.count--
goto done
}
// 如果沒有找到,那么就繼續查找 overflow 桶,一直遍歷到最后一個
b = b.overflow(t)
if b == nil {
goto done
}
}
done:
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting
}
刪除操作主要流程和查找 key 流程也差不多,找到對應的 key 以后,如果是指針指向原來的 key,就把指針置為 nil。如果是值就清空它所在的內存。還要清理 tophash 里面的值最后把 map 的 key 總個數計數器減1 。
如果在擴容過程中,刪除操作會在擴容以后在新的 bmap 里面刪除。
查找的過程依舊會一直遍歷到鏈表的最后一個 bmap 桶。
4. 增量翻倍擴容
這部分算是整個 Map 實現比較核心的部分了。我們都知道 Map 在不斷的裝載 Key 值的時候,查找效率會變的越來越低,如果此時不進行擴容操作的話,哈希沖突使得鏈表變得越來越長,性能也就越來越差。擴容勢在必行。
但是擴容過程中如果阻斷了 Key 值的寫入,在處理大數據的時候會導致有一段不響應的時間,如果用在高實時的系統中,那么每次擴容都會卡幾秒,這段時間都不能相應任何請求。這種性能明顯是不能接受的。所以要既不影響寫入,也同時要進行擴容。這個時候就應該增量擴容了。
這里增量擴容其實用途已經很廣泛了,之前舉例的 Redis 就采用的增量擴容策略。
接下來看看 Go 是怎么進行增量擴容的。
在 Go 的 mapassign 插入 Key 值、mapdelete 刪除 key 值的時候都會檢查當前是否在擴容中。
Go
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 確保我們遷移了所有 oldbucket
evacuate(t, h, bucket&h.oldbucketmask())
// 再遷移一個標記過的桶
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
從這里我們可以看到,每次執行一次 growWork 會遷移2個桶。一個是當前的桶,這算是局部遷移,另外一個是 hmap 里面指向的 nevacuate 的桶,這算是增量遷移。
在插入 Key 值的時候,如果當前在擴容過程中,oldbucket 是被凍結的,查找時會先在 oldbucket 中查找,但不會在oldbucket中插入數據。只有在 oldbucket 找到了相應的 key,那么將它遷移到新 bucket 后加入 evalucated 標記。
在刪除 Key 值的時候,如果當前在擴容過程中,優先查找 bucket,即新桶,找到一個以后把它對應的 Key、Value 都置空。如果 bucket 里面找不到,才會去 oldbucket 中去查找。
每次插入 Key 值的時候,都會判斷一下當前裝載因子是否超過了 6.5,如果達到了這個極限,就立即執行擴容操作 hashGrow。這是擴容之前的準備工作。
Go
func hashGrow(t *maptype, h *hmap) {
// 如果達到了最大裝載因子,就需要擴容。
// 不然的話,一個桶后面鏈表跟著一大堆的 overflow 桶
bigger := uint8(1)
if !overLoadFactor(int64(h.count), h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
// 把 hmap 的舊桶的指針指向當前桶
oldbuckets := h.buckets
// 生成新的擴容以后的桶,hmap 的 buckets 指針指向擴容以后的桶。
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger)
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// B 加上新的值
h.B += bigger
h.flags = flags
// 舊桶指針指向當前桶
h.oldbuckets = oldbuckets
// 新桶指針指向擴容以后的桶
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
if h.extra != nil && h.extra.overflow[0] != nil {
if h.extra.overflow[1] != nil {
throw("overflow is not nil")
}
// 交換 overflow[0] 和 overflow[1] 的指向
h.extra.overflow[1] = h.extra.overflow[0]
h.extra.overflow[0] = nil
}
if nextOverflow != nil {
if h.extra == nil {
// 生成 mapextra
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
// 實際拷貝鍵值對的過程在 evacuate() 中
}
用圖表示出它的流程:
hashGrow 操作算是擴容之前的準備工作,實際拷貝的過程在 evacuate 中。
hashGrow 操作會先生成擴容以后的新的桶數組。新的桶數組的大小是之前的2倍。然后 hmap 的 buckets 會指向這個新的擴容以后的桶,而 oldbuckets 會指向當前的桶數組。
處理完 hmap 以后,再處理 mapextra,nextOverflow 的指向原來的 overflow 指針,overflow 指針置為 null。
到此就做好擴容之前的準備工作了。
Go
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 在準備擴容之前桶的個數
newbit := h.noldbuckets()
alg := t.key.alg
if !evacuated(b) {
// TODO: reuse overflow buckets instead of using new ones, if there
// is no iterator using the old buckets. (If !oldIterator.)
var (
x, y *bmap // 在新桶里面 低位桶和高位桶
xi, yi int // key 和 value 值的索引值分別為 xi , yi
xk, yk unsafe.Pointer // 指向 x 和 y 的 key 值的指針
xv, yv unsafe.Pointer // 指向 x 和 y 的 value 值的指針
)
// 新桶中低位的一些桶
x = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
xi = 0
// 擴容以后的新桶中低位的第一個 key 值
xk = add(unsafe.Pointer(x), dataOffset)
// 擴容以后的新桶中低位的第一個 key 值對應的 value 值
xv = add(xk, bucketCnt*uintptr(t.keysize))
// 如果不是等量擴容
if !h.sameSizeGrow() {
y = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
yi = 0
yk = add(unsafe.Pointer(y), dataOffset)
yv = add(yk, bucketCnt*uintptr(t.keysize))
}
// 依次遍歷溢出桶
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
v := add(k, bucketCnt*uintptr(t.keysize))
// 遍歷 key - value 鍵值對
for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
top := b.tophash[i]
if top == empty {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
k2 := k
// key 值如果是指針,則取出指針里面的值
if t.indirectkey {
k2 = *((*unsafe.Pointer)(k2))
}
useX := true
if !h.sameSizeGrow() {
// 如果不是等量擴容,則需要重新計算 hash 值,不管是高位桶 x 中,還是低位桶 y 中
hash := alg.hash(k2, uintptr(h.hash0))
if h.flags&iterator != 0 {
if !t.reflexivekey && !alg.equal(k2, k2) {
// 如果兩個 key 不相等,那么他們倆極大可能舊的 hash 值也不相等。
// tophash 對要遷移的 key 值也是沒有多大意義的,所以我們用低位的 tophash 輔助擴容,標記一些狀態。
// 為下一個級 level 重新計算一些新的隨機的 hash 值。以至于這些 key 值在多次擴容以后依舊可以均勻分布在所有桶中
// 判斷 top 的最低位是否為1
if top&1 != 0 {
hash |= newbit
} else {
hash &^= newbit
}
top = uint8(hash >> (sys.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
}
}
useX = hash&newbit == 0
}
if useX {
// 標記低位桶存在 tophash 中
b.tophash[i] = evacuatedX
// 如果 key 的索引值到了桶最后一個,就新建一個 overflow
if xi == bucketCnt {
newx := h.newoverflow(t, x)
x = newx
xi = 0
xk = add(unsafe.Pointer(x), dataOffset)
xv = add(xk, bucketCnt*uintptr(t.keysize))
}
// 把 hash 的高8位再次存在 tophash 中
x.tophash[xi] = top
if t.indirectkey {
// 如果是指針指向 key ,那么拷貝指針指向
*(*unsafe.Pointer)(xk) = k2 // copy pointer
} else {
// 如果是指針指向 key ,那么進行值拷貝
typedmemmove(t.key, xk, k) // copy value
}
// 同理拷貝 value
if t.indirectvalue {
*(*unsafe.Pointer)(xv) = *(*unsafe.Pointer)(v)
} else {
typedmemmove(t.elem, xv, v)
}
// 繼續遷移下一個
xi++
xk = add(xk, uintptr(t.keysize))
xv = add(xv, uintptr(t.valuesize))
} else {
// 這里是高位桶 y,遷移過程和上述低位桶 x 一致,下面就不再贅述了
b.tophash[i] = evacuatedY
if yi == bucketCnt {
newy := h.newoverflow(t, y)
y = newy
yi = 0
yk = add(unsafe.Pointer(y), dataOffset)
yv = add(yk, bucketCnt*uintptr(t.keysize))
}
y.tophash[yi] = top
if t.indirectkey {
*(*unsafe.Pointer)(yk) = k2
} else {
typedmemmove(t.key, yk, k)
}
if t.indirectvalue {
*(*unsafe.Pointer)(yv) = *(*unsafe.Pointer)(v)
} else {
typedmemmove(t.elem, yv, v)
}
yi++
yk = add(yk, uintptr(t.keysize))
yv = add(yv, uintptr(t.valuesize))
}
}
}
// Unlink the overflow buckets & clear key/value to help GC.
if h.flags&oldIterator == 0 {
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// Preserve b.tophash because the evacuation
// state is maintained there.
if t.bucket.kind&kindNoPointers == 0 {
memclrHasPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
} else {
memclrNoHeapPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
}
}
}
// Advance evacuation mark
if oldbucket == h.nevacuate {
h.nevacuate = oldbucket + 1
// Experiments suggest that 1024 is overkill by at least an order of magnitude.
// Put it in there as a safeguard anyway, to ensure O(1) behavior.
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
if h.nevacuate == newbit { // newbit == # of oldbuckets
// Growing is all done. Free old main bucket array.
h.oldbuckets = nil
// Can discard old overflow buckets as well.
// If they are still referenced by an iterator,
// then the iterator holds a pointers to the slice.
if h.extra != nil {
h.extra.overflow[1] = nil
}
h.flags &^= sameSizeGrow
}
}
}
上述函數就是遷移過程最核心的拷貝工作了。
整個遷移過程并不難。這里需要說明的是 x ,y 代表的意義。由于擴容以后,新的桶數組是原來桶數組的2倍。用 x 代表新的桶數組里面低位的那一半,用 y 代表高位的那一半。其他的變量就是一些標記了,游標和標記 key - value 原來所在的位置。詳細的見代碼注釋。
上圖中表示了遷移開始之后的過程。可以看到舊的桶數組里面的桶在遷移到新的桶中,并且新的桶也在不斷的寫入新的 key 值。
一直拷貝鍵值對,直到舊桶中所有的鍵值都拷貝到了新的桶中。
最后一步就是釋放舊桶,oldbuckets 的指針置為 null。到此,一次遷移過程就完全結束了。
5. 等量擴容
嚴格意義上這種方式并不能算是擴容。但是函數名是 Grow,姑且暫時就這么叫吧。
在 go1.8 的版本開始,添加了 sameSizeGrow,當 overflow buckets 的數量超過一定數量 (2^B) 但裝載因子又未達到 6.5 的時候,此時可能存在部分空的bucket,即 bucket 的使用率低,這時會觸發sameSizeGrow,即 B 不變,但走數據遷移流程,將 oldbuckets 的數據重新緊湊排列提高 bucket 的利用率。當然在 sameSizeGrow 過程中,不會觸發 loadFactorGrow。
四. Map 實現中的一些優化
讀到這里,相信讀者心里應該很清楚如何設計并實現一個 Map 了吧。包括 Map 中的各種操作的實現。在探究如何實現一個線程安全的 Map 之前,先把之前說到個一些亮點優化點,小結一下。
在 Redis 中,采用增量式擴容的方式處理哈希沖突。當平均查找長度超過 5 的時候就會觸發增量擴容操作,保證 hash 表的高性能。
同時 Redis 采用頭插法,保證插入 key 值時候的性能。
在 Java 中,當桶的個數超過了64個以后,并且沖突節點為8或者大于8,這個時候就會觸發紅黑樹轉換。這樣能保證鏈表在很長的情況下,查找長度依舊不會太長,并且紅黑樹保證最差情況下也支持 O(log n) 的時間復雜度。
Java 在遷移之后有一個非常好的設計,只需要比較遷移之后桶個數的最高位是否為0,如果是0,key 在新桶內的相對位置不變,如果是1,則加上桶的舊的桶的個數 oldCap 就可以得到新的位置。
在 Go 中優化的點比較多:
哈希算法選用高效的 memhash 算法 和 CPU AES指令集。AES 指令集充分利用 CPU 硬件特性,計算哈希值的效率超高。
key - value 的排列設計成 key 放在一起,value 放在一起,而不是key,value成對排列。這樣方便內存對齊,數據量大了以后節約內存對齊造成的一些浪費。
key,value 的內存大小超過128字節以后自動轉成存儲一個指針。
tophash 數組的設計加速了 key 的查找過程。tophash 也被復用,用來標記擴容操作時候的狀態。
用位運算轉換求余操作,m % n ,當 n = 1 << B 的時候,可以轉換成 m & (1 << B - 1) 。
增量式擴容。
等量擴容,緊湊操作。
Go 1.9 版本以后,Map 原生就已經支持線程安全。(在下一章中重點討論這個問題)
當然 Go 中還有一些需要再優化的地方:
在遷移的過程中,當前版本不會重用 overflow 桶,而是直接重新申請一個新的桶。這里可以優化成優先重用沒有指針指向的 overflow 桶,當沒有可用的了,再去申請一個新的。這一點作者已經寫在了 TODO 里面了。
動態合并多個 empty 的桶。
當前版本中沒有 shrink 操作,Map 只能增長而不能收縮。這塊 Redis 有相關的實現。
(鑒于單篇文章的長度,線程安全部分全部放到下篇去講,稍后更新下篇)
- Go語言基礎篇
- Go語言簡介
- Go語言教程
- Go語言環境安裝
- Go語言結構
- Go語言基礎語法
- Go語言數據類型
- Go語言變量
- Go語言提高篇
- Go語言實現貪吃蛇
- Go 諺語
- 解決連通性問題的四種算法
- golang 幾種字符串的連接方式
- Go JSON 技巧
- Go += 包版本
- Golang 編譯成 DLL 文件
- Go指南:牛頓法開方
- Go語言異步服務器框架原理和實現
- Golang適合高并發場景的原因分析
- 如何設計并實現一個線程安全的 Map ?(上篇)
- go語言執行cmd命令關機、重啟等
- IT雜項
- IT 工程師的自我管理
- IT界不為人知的14個狗血故事
- Go語言版本說明
- Go 1.10中值得關注的幾個變化
- Golang面試題解析
- Golang面試題
- Golang語言web開發
- golang 模板(template)的常用基本語法
- go語言快速入門:template模板
- Go Template學習筆記
- LollipopGo框架
- 框架簡介
- Golang語言版本設計模式
- 設計模式-單例模式
- Golang語言資源下載
- 公眾賬號
- leaf
- 合作講師
- 公開課目錄