雖然我們在上一節中描述的AMQP模型是通用的并且適用于所有實現,但當我們進入資源管理時,細節特定于代理實現。 因此,在本節中,我們將重點關注僅存在于“spring-rabbit”模塊中的代碼,因為此時,RabbitMQ是唯一受支持的實現。
管理與RabbitMQ代理的連接的核心組件是`ConnectionFactory`接口。 `ConnectionFactory`實現的職責是提供`org.springframework.amqp.rabbit.connection.Connection`的實例,它是`com.rabbitmq.client.Connection`的包裝器。我們提供的唯一具體實現是`CachingConnectionFactory`,默認情況下,它建立一個可由應用程序共享的單個連接代理。共享連接是可能的,因為與AMQP進行消息傳遞的“工作單元”實際上是一個“通道”(在某些方面,這類似于JMS中Connection和Session之間的關系)。可以想象,連接實例提供了`createChannel`方法。 `CachingConnectionFactory`實現支持對這些通道進行緩存,并根據通道是否為事務性為通道維護單獨的緩存。在創建`CachingConnectionFactory`的實例時,可以通過構造函數提供主機名。還應提供用戶名和密碼屬性。如果要配置通道緩存的大小(默認值為25),也可以在此處調用`setChannelCacheSize()`方法。
從版本1.3開始,`CachingConnectionFactory`可以配置為緩存連接以及通道。 在這種情況下,每次調用`createConnection()`都會創建一個新連接(或從緩存中檢索一個空閑連接)。 關閉連接會將其返回到緩存(如果尚未達到緩存大小)。 在此類連接上創建的通道也會被緩存。 在某些環境中使用單獨的連接可能很有用,例如從HA集群中使用,與負載均衡器一起使用,以連接到不同的集群成員。 將`cacheMode`設置為`CacheMode.CONNECTION`。
>這不限制連接數,它指定允許的空閑打開連接數。
>
從1.5.5版開始,提供了一個新屬性`connectionLimit`。 設置此選項后,它將限制允許的連接總數。 設置后,如果達到限制,則使用`channelCheckoutTimeLimit`等待連接變為空閑。 如果超過時間,則拋出`AmqpTimeoutException`。
>當緩存模式為CONNECTION時,不支持自動聲明隊列等(請參閱“自動聲明交換,隊列和綁定”一節)。
>此外,在編寫本文時,rabbitmq-client庫默認為每個連接(5個線程)創建一個固定的線程池。 使用大量連接時,應考慮在CachingConnectionFactory上設置自定義執行程序。 然后,所有連接都將使用相同的執行程序,并且可以共享其線程。 執行程序的線程池應該是無限制的,或者為預期的利用率設置適當的(通常,每個連接至少有一個線程)。 如果在每個連接上創建多個通道,則池大小將影響并發性,因此變量(或簡單緩存)線程池執行程序將是最合適的。
>
重要的是要理解緩存大小(默認情況下)不是限制,而只是可以緩存的通道數。 如果緩存大小為10,則實際上可以使用任意數量的通道。 如果正在使用10個以上的通道并將它們全部返回到緩存中,則10將進入緩存; 其余的將在物理上關閉。
從版本1.6開始,默認通道緩存大小從1增加到25.在大容量,多線程環境中,小緩存意味著以高速率創建和關閉通道。 增加默認緩存大小將避免此開銷。 您應該通過RabbitMQ管理UI監視正在使用的通道,如果您看到許多通道正在創建和關閉,請考慮進一步增加緩存大小。 緩存只會按需增長(以滿足應用程序的并發要求),因此此更改不會影響現有的低容量應用程序。
從版本1.4.2開始,`CachingConnectionFactory`具有屬性`channelCheckoutTimeout`。 當此屬性大于零時,`channelCacheSize`將成為可以在連接上創建的通道數的限制。 如果達到限制,則調用線程將阻塞,直到通道可用或達到此超時,在這種情況下拋出`AmqpTimeoutException`。
>框架內使用的通道(例如`RabbitTemplate`)將可靠地返回到緩存。 如果在框架之外創建通道(例如,通過直接訪問連接并調用`createChannel()`),則必須可靠地(通過關閉)返回它們(可能在`finally`塊中),以避免耗盡通道。
>
~~~java
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
~~~
使用xml配置大概如下:
~~~xml
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
~~~
>還有一個SingleConnectionFactory實現,它只在框架的單元測試代碼中可用。 它比CachingConnectionFactory更簡單,因為它不緩存通道,但由于缺乏性能和彈性,它不適用于簡單測試之外的實際使用。 如果由于某種原因發現需要實現自己的ConnectionFactory,則AbstractConnectionFactory基類可以提供一個很好的起點。
>
可以使用rabbit命名空間快速方便地創建ConnectionFactory:
~~~xml
<rabbit:connection-factory id="connectionFactory"/>
~~~
在大多數情況下,這將是更好的選擇,因為框架可以為您選擇最佳默認值。 創建的實例將是CachingConnectionFactory。 請記住,通道的默認緩存大小為25.如果要緩存更多通道,請通過channelCacheSize屬性設置更大的值。 在XML中它看起來像這樣:
~~~xml
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
~~~
使用命名空間,您只需添加channel-cache-size屬性:
~~~
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
~~~
默認緩存模式是CHANNEL,但您可以將其配置為緩存連接; 在這種情況下,我們使用`connection-cache-size`:
~~~
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
~~~
可以使用命名空間提供主機和端口屬性
~~~
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
~~~
或者,如果在集群環境中運行,請使用addresses屬性。
~~~
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672"/>
~~~
這是一個自定義線程工廠的示例,它使用rabbitmq-作為線程名稱的前綴。
~~~xml
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
~~~
>當應用程序配置單個CachingConnectionFactory時,默認情況下使用Spring Boot自動配置,當Broker阻止連接時,應用程序將停止工作。 當它被經紀人阻止時,其任何客戶都會停止工作。 如果我們在同一個應用程序中有生產者和消費者,那么當生產者阻止連接時,我們可能會遇到死鎖,因為Broker上沒有資源,消費者因為連接被阻止而無法釋放它們。 為了緩解這個問題,只需要一個具有相同選項的單獨的CachingConnectionFactory實例 - 一個用于生產者,一個用于消費者。 不建議事務生產者使用單獨的CachingConnectionFactory,因為它們應該重用與消費者事務關聯的
>
從版本2.0.2開始,`RabbitTemplate`具有自動使用第二個連接工廠的配置選項,除非正在使用事務。 有關詳細信息,請參閱“使用單獨連接”一節。 發送者連接的`ConnectionNameStrategy`與調用該方法的結果附加`.publisher`的主策略相同。
從版本1.7.7開始,提供`AmqpResourceNotAvailableException`,當`SimpleConnection.createChannel()`無法創建`Channel`時,會拋出AmqpResourceNotAvailableException,例如,因為達到了`channelMax`限制且緩存中沒有可用的通道。 可以在RetryPolicy中使用此異常來在某些退避后恢復操作。
- 1.前言
- 2.介紹
- 2.1 快速瀏覽
- 3.參考
- 3.1 使用spring amqp
- 3.1.1 AMQP抽象
- 3.1.2 資源的連接和管理
- 介紹
- 配置底層客戶端連接工廠
- RabbitConnectionFactoryBean和配置SSL
- 路由連接工廠
- 隊列親和力和LocalizedQueueConnectionFactory
- 發送確認和返回
- 3.1.3 添加自定義客戶端連接屬性
- 3.1.4 AmqpTemplate
- 介紹
- 添加重試功能
- 發送消息是異步的 - 如何檢測成功和失敗
- 發布的確認和返回
- 3.1.5 發送消息
- 介紹
- 消息構建 API
- 發布的返回
- 3.1.6 接收消息
- 介紹
- 輪詢消費者
- 異步消費者