<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 簡介 Lettuce 是一個可伸縮的線程安全的 Redis 客戶端,支持同步、異步和響應式模式。多個線程可以共享一個連接實例,而不必擔心多線程并發問題。它基于優秀 netty NIO 框架構建,支持 Redis 的高級功能,如 Sentinel,集群,流水線,自動重新連接和 Redis 數據模型。 # redis單機情況 目前,Lettuce 官方發布的最新的版本為[5.0.4](https://lettuce.io/core/5.0.4.RELEASE/api/),自 5.X 開始,Lettuce 進行了全面重構,與之前的版本相差較大,甚至連包名都全然不同(點擊可查看[5.0.4](https://lettuce.io/core/5.0.4.RELEASE/api/)和[4.4.5](https://lettuce.io/lettuce-4/4.4.5.Final/api/)版本),本文基于最新的版本 5.0.4 介紹 Lettuce 的用法,pom 文件中添加 Lettuce 依賴如下: ~~~ <dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>5.0.4.RELEASE</version> </dependency> ~~~ ~~~ import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; public class Single { public static void main(String[] args) { // 利用redis-server所綁定的IP和Port創建URI, RedisURI redisURI = RedisURI.create("127.0.0.1", 6379); // 創建集Redis單機模式客戶端 RedisClient redisClient = RedisClient.create(redisURI); // 開啟連接 StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommands<String, String> cmd = connect.sync(); // set操作,成功則返回OK cmd.set("key", "value-test"); // get操作,成功命中則返回對應的value,否則返回null cmd.get("key"); // 刪除指定的key cmd.del("key"); // 獲取redis-server信息,內容極為豐富 cmd.info(); // 列表操作 String[] valuelist = {"China","Americal","England"}; // 將一個或多個值插入到列表頭部,此處插入多個 cmd.lpush("listName", valuelist); // 移出并獲取列表的第一個元素 System.out.println(cmd.lpop("listName")); // 獲取列表長度 System.out.println(cmd.llen("listName")); // 通過索引獲取列表中的元素 System.out.println(cmd.lindex("listName", 1)); } } ~~~ 注意 如果 redis-server 設置了訪問密碼,在進行緩存讀寫操作之前需要進行鑒權,代碼片段如下: ~~~ // 開啟連接 StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommands<String, String> cmd = connect.sync(); // 如果redis-server設置了訪問密碼,則需鑒權,否則不可訪問 cmd.auth("my-password"); // set操作,成功則返回OK cmd.set("key", "value-test"); ~~~ # redis集群模式 首先介紹一個集群模式下的實例,對比單機模式,讀者不難發現,除了創建客戶端差別明顯外,其它部分幾無差別 ~~~ import java.util.ArrayList; import java.util.List; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; public class Cluster { public static void main(String[] args) { // 利用redis-server所綁定的IP和Port創建URI, List<RedisURI> redisURIList = new ArrayList<RedisURI>(); String[] ipSet = {"100.x.x.152","100.x.x.153","100.x.x.154"}; int port = 6379; for (int i=0; i<3; i++) { RedisURI temp = RedisURI.create(ipSet[i], port); redisURIList.add(temp); } // 創建集Redis集群模式客戶端 RedisClusterClient redisClusterClient = RedisClusterClient.create(redisURIList); // 連接到Redis集群 StatefulRedisClusterConnection<String, String> clusterCon = redisClusterClient.connect(); // 獲取集群同步命令對象 RedisClusterCommands<String, String> commands = clusterCon.sync(); // set操作,成功則返回OK commands.set("key", "value-test"); // get操作,成功命中則返回對應的value,否則返回null commands.get("key"); // 刪除指定的key commands.del("key"); // 獲取redis-server信息,內容極為豐富 commands.info(); // 列表操作 String[] valuelist = {"China","Americal","England"}; // 將一個或多個值插入到列表頭部,此處插入多個 commands.lpush("listName", valuelist); // 移出并獲取列表的第一個元素 commands.lpop("listName"); // 獲取列表長度 commands.llen("listName"); // 通過索引獲取列表中的元素 commands.lindex("listName", 1); } } ~~~ ## 重要接口說明 與單機模式相比,集群模式下命令集要豐富得多,如下圖所示, Lettuce 提供的方法可支持集群模式下的所有命令。其中,有幾個重要的方法讀者需要掌握:clusterAddSlots, clusterFailover, clusterForget, clusterInfo, clusterMeet, clusterNodes 及clusterReplicate ![](https://img.kancloud.cn/c6/8f/c68fa70fc529cffdc25920f5230760a4_614x564.png) 1. clusterMeet(String ip, int port):以當前節點為基準,將 ip 和 port 所對應的節點納入集群; 2. clusterAddSlots(int ...slots):為當前節點指派 slot,只有被指派 slot 的節點才是真正意義上的 master; 3. clusterReplicate(String nodeId):將當前節點設置為 nodeId 所對應的主節點的從; 4. clusterFailover(boolean force):發起故障倒換,將當前節點升為主節點,當前節點原本對應的主節點則降為從節點; 5. clusterForget(String nodeId):將 nodeId 所對應的節點從集群中刪除; 6. clusterInfo():獲取集群運行狀態信息; 7. clusterNodes():獲取集群節點的詳細信息; ## 小技巧 使用 Lettuce時,創建客戶端之后還需連接到集群方可,分別調用了 create() 方法和 connect() 方法,如下代碼片段所示: ~~~ // 創建集Redis集群模式客戶端 RedisClusterClient redisClusterClient = RedisClusterClient.create(redisURIList); // 連接到Redis集群 StatefulRedisClusterConnection<String, String> clusterCon = redisClusterClient.connect(); ~~~ 不知讀者是否思考過一個問題:集群連接和單機連接到底有什么區別?為什么一個集群連接就可以操作集群?事實上,所謂集群連接本質上就是一個單機連接的集合,即集群連接包含了到集群中所有節點的連接(單機連接)。既然如此,在集群模式下,當我們需要用到單機連接時,就不必再創建連接了,而是直接從集群連接中“取”出需要的單機連接,這是非常有益的,可以極大的減少資源的消耗,提升性能。如下實例: ~~~ // 創建集Redis集群模式客戶端 RedisClusterClient redisClusterClient = RedisClusterClient.create(redisURIList); // 連接到Redis集群 StatefulRedisClusterConnection<String, String> clusterCon = redisClusterClient.connect(); // 從集群連接中取出單機連接 // 方式1:根據ip和端口獲取單機連接 StatefulRedisConnection<String, String> conn1 = clusterCon.getConnection(host, port); // 方式2:根據nodeId獲取單機連接 StatefulRedisConnection<String, String> conn2 = clusterCon.getConnection(nodeId); ~~~ # Lettuce 創建 Redis 集群 Redis 集群模式至少需要 3 個主節點,作為舉例,本文搭建一個 3 主 3 從的精簡集群,麻雀雖小,五臟俱全。主從關系如下圖所示,其中 M 代碼 Master 節點,S 代表 Slave 節點,A-M 和 A-S 為一對主從節點。 ![](https://img.kancloud.cn/5a/fb/5afb0cd7cebb8bc40ae6ae4aeaca0cb0_601x357.png) 由于筆者只有一臺物理機,因此在同一臺機器上分別啟動 6 個 redis-server 進程以創建 3 主 3 從Redis集群,6 個 redis-server 進程分別綁定端口號為 6379,6380,6381,6382,6383,6384 ## Redis 集群創建的步驟 **(1)相互感知,初步形成集群** 在上文中,我們已經成功拉起了 6 個 redis-server 進程,每個進程視為一個節點,這些節點仍處于孤立狀態,它們相互之間無法感知對方的存在,既然要創建集群,首先需要讓這些孤立的節點相互感知,形成一個集群; **(2)分配 Slot 給期望的主節點** 形成集群之后,仍然無法提供服務,Redis 集群模式下,數據存儲于 16384 個 Slot 中,我們需要將這些 Slot 指派給期望的主節點。何為期望呢?我們有 6 個節點,3 主 3 備,我們只能將 Slot 指派給 3 個主節點,至于哪些節點為主節點,我們可以自定義。 **(3)設置從節點** Slot 分配完成后,被分配 Slot 的節點將成為真正可用的主節點,剩下的沒有分到 Slot 的節點,即便狀態標志為 Master,實際上也不能提供服務。接下來,出于可靠性的考量,我們需要將這些沒有被指派 Slot 的節點指定為可用主節點的從節點(Slave)。 經過上述三個步驟,一個精簡的 3 主 3 從 Redis 集群就搭建完成了。 ## 基于 Lettuce 的創建集群代碼 根據上述步驟,基于 Lettuce 創建集群的代碼如下(僅供入門參考): ~~~ import java.util.ArrayList; import java.util.List; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisConnectionException; import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; public class CreateCluster { public static void main(String[] args) throws InterruptedException { createCluster(); } private static void createCluster() throws InterruptedException { // 初始化集群節點列表,并指定主節點列表和從節點列表 List<ClusterNode> clusterNodeList = new ArrayList<ClusterNode>(); List<ClusterNode> masterNodeList = new ArrayList<ClusterNode>(); List<ClusterNode> slaveNodeList = new ArrayList<ClusterNode>(); String[] endpoints = {"127.0.0.1:6379", "127.0.0.1:6380", "127.0.0.1:6381" , "127.0.0.1:6382", "127.0.0.1:6383", "127.0.0.1:6384"}; int index = 0; for (String endpoint : endpoints) { String[] ipAndPort = endpoint.split(":"); ClusterNode node = new ClusterNode(ipAndPort[0], Integer.parseInt(ipAndPort[1])); clusterNodeList.add(node); // 將6379,6380,6381設置為主節點,其余為從節點 if (index < 3) { masterNodeList.add(node); } else { slaveNodeList.add(node); } index++; } // 分別與各個Redis節點建立通信連接 for (ClusterNode node : clusterNodeList) { RedisURI redisUri = RedisURI.Builder.redis(node.getHost(), node.getPort()).build(); RedisClient redisClient = RedisClient.create(redisUri); try { StatefulRedisConnection<String, String> connection = redisClient.connect(); node.setConnection(connection); } catch (RedisException e) { System.out.println("connection failed-->" + node.getHost() + ":" + node.getPort()); } } // 執行cluster meet命令是各個孤立的節點相互感知,初步形成集群。 // 只需以一個節點為基準,讓所有節點與之meet即可 ClusterNode firstNode = null; for (ClusterNode node : clusterNodeList) { if (firstNode == null) { firstNode = node; } else { try { node.getConnection().sync().clusterMeet(firstNode.getHost(), firstNode.getPort()); } catch (RedisCommandTimeoutException | RedisConnectionException e) { System.out.println("meet failed-->" + node.getHost() + ":" + node.getPort()); } } } // 為主節點指派slot,將16384個slot分成三份:5461,5461,5462 int[] slots = {0, 5460, 5461, 10921, 10922, 16383}; index = 0; for (ClusterNode node : masterNodeList) { node.setSlotsBegin(slots[index]); index++; node.setSlotsEnd(slots[index]); index++; } // 通過與各個主節點的連接,執行addSlots命令為主節點指派slot System.out.println("Start to set slots..."); for (ClusterNode node : masterNodeList) { try { node.getConnection().sync().clusterAddSlots(createSlots(node.getSlotsBegin(), node.getSlotsEnd())); } catch (RedisCommandTimeoutException | RedisConnectionException e) { System.out.println("add slots failed-->" + node.getHost() + ":" + node.getPort()); } } // 延時5s,等待slot指派完成 sleep(5000); // 為已經指派slot的主節點設置從節點,6379,6380,6381分別對應6382,6383,6384 index = 0; for (ClusterNode node : slaveNodeList) { try { node.getConnection().sync().clusterReplicate(masterNodeList.get(index).getMyId()); } catch (RedisCommandTimeoutException | RedisConnectionException e) { System.out.println("replicate failed-->" + node.getHost() + ":" + node.getPort()); } } // 關閉連接,銷毀客戶端,釋放資源 for (ClusterNode node : clusterNodeList) { node.getConnection().close(); node.getClient().shutdown(); } } public static int[] createSlots(int from, int to) { int[] result = new int[to - from + 1]; int counter = 0; for (int i = from; i <= to; i++) { result[counter++] = i; } return result; } } ~~~ ~~~ //定義集群節點描述類 class ClusterNode { private String host; private int port; private int slotsBegin; private int slotsEnd; private String myId; private String masterId; private StatefulRedisConnection<String, String> connection; private RedisClient redisClient; public ClusterNode(String host, int port) { this.host = host; this.port = port; this.slotsBegin = 0; this.slotsEnd = 0; this.myId = null; this.masterId = null; } public String getHost() { return host; } public int getPort() { return port; } public void setMaster(String masterId) { this.masterId = masterId; } public String getMaster() { return masterId; } public void setMyId(String myId) { this.myId = myId; } public String getMyId() { return myId; } public void setSlotsBegin(int first) { this.slotsBegin = first; } public void setSlotsEnd(int last) { this.slotsEnd = last; } public int getSlotsBegin() { return slotsBegin; } public int getSlotsEnd() { return slotsEnd; } public void setConnection(StatefulRedisConnection<String, String> connection) { this.connection = connection; } public void setClient(RedisClient client) { this.redisClient = client; } public StatefulRedisConnection<String, String> getConnection() { return connection; } public RedisClient getClient() { return redisClient; } } ~~~ ## Lettuce方法獲取并解析集群狀態信息 Lettuce 提供了與 Redis 的 cluster info,cluster nodes 及 info 命令對應的方法,分別為:clusterInfo(), clusterNodes()和info(),是不是覺得很親切? 僅僅只是獲取信息還不夠,如此復雜的信息,雖然人可以一眼看出要點,但用程序來解析卻是一件很麻煩的事情。Lettuce 已經考慮到了這一點,為此提供了專門的方法來解析獲取到的集群節點信息,以 clusterNodes() 為例: ![](https://img.kancloud.cn/d8/fe/d8fe5c8807af858710d27c01e45d763b_643x156.png) **例子** 一個可用的 Redis 集群,其 16384 個 slot 必須全部處于正常工作狀態,換句話說,這些 slots 對應的 master 必須是正常的。以下我們通過解析 clusterNodes() 方法獲取的信息來判斷集群狀態是否正常,如果不正常,還可以進一步識別出不正常的節點。 **注意** 下面的程序僅僅是舉例,事實上,通過解析 clusterNodes() 方法獲取的信息可以獲取集群節點的運行狀態,主從關系,slot 分布等重要信息。 ![](https://img.kancloud.cn/bc/70/bc706124c35f5a7e67b10d0d5893616a_642x393.png) **例子的完整程序** ~~~ import java.util.ArrayList; import java.util.List; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser; import io.lettuce.core.cluster.models.partitions.Partitions; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; public class ClusterState { public static void main(String[] args) { // 利用redis-server所綁定的IP和Port創建URI, List<RedisURI> redisURIList = new ArrayList<RedisURI>(); // 筆者在一臺物理機上啟動6個redis-server進程,ip均為127.X,端口為6379~6384 String ip = "127.0.0.1"; int port = 6379; for (int i = 0; i < 6; i++) { RedisURI temp = RedisURI.create(ip, port + i); redisURIList.add(temp); } // 創建集Redis集群模式客戶端 RedisClusterClient redisClusterClient = RedisClusterClient.create(redisURIList); // 連接到Redis集群 StatefulRedisClusterConnection<String, String> clusterCon = redisClusterClient.connect(); // 獲取集群同步命令對象 RedisClusterCommands<String, String> commands = clusterCon.sync(); // 獲取集群節點信息并解析 Partitions partitions = ClusterPartitionParser.parse(commands.clusterNodes()); int slotsSize = 0; for (RedisClusterNode partition : partitions) { if (partition.getFlags().contains(RedisClusterNode.NodeFlag.FAIL) || partition.getFlags().contains(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) || partition.getFlags().contains(RedisClusterNode.NodeFlag.NOADDR)) { System.out.println("The node's state is not normal:" + partition.getUri()); continue; } slotsSize += partition.getSlots().size(); } if (slotsSize < 16384) { System.out.println("Cluster_slots_assigned is:" + slotsSize); } else { System.out.println("Cluster_state is OK."); } } } ~~~ # 遇到的問題 **堆內存溢出事件** 在實際應用場景下,Redis 集群可能出現節點故障下線、新節點加入、主從倒換等事件,這些事件都會導致 Redis 集群拓撲結構改變,作為客戶端的 Lettuce 自然也需要刷新保存的拓撲結構甚至重新建立連接,否則,客戶端與服務端之間的通道可能無法工作。 出于對上述原因考慮,為提高可用性,筆者曾經主導過的一個項目通過一個線程來定時檢測連接是否可用,如果不可用便重建連接。但是,當時犯了一個錯誤:重建連接時,僅僅關閉了舊的連接,卻沒有銷毀客戶端,而客戶端是極為占用資源的。 由于連接不可用的場景并不多,上述問題一直處于潛伏狀態,直到有一天網絡出現問題,因連接不可用而一次次重建連接,同時重建了客戶端。一段時間后,Lettuce 相關的線程竟堆積了近 300 個,而相關進程預設的內存不過 2G,進而出現了內存溢出。 **規避方法:** 簡而言之,對于不再使用的客戶端和連接一定要顯示的關閉,如下代碼所示: ![](https://img.kancloud.cn/62/c1/62c1a04d1e2f8eb94fa8b28b1ceb4924_643x158.png)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看