<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 依賴 ~~~ <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> ~~~ # 配置文件 ~~~ # rabbitmq的ip和端口 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 # 賬號密碼 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 開啟發送確認 #spring.rabbitmq.publisher-confirms=true # 開啟發送失敗退回 #spring.rabbitmq.publisher-returns=true #spring.rabbitmq.dynamic=true # 設置手動應答 #spring.rabbitmq.listener.simple.acknowledge-mode=manual # spring.rabbitmq.listener.direct.acknowledge-mode=manual #spring.rabbitmq.cache.connection.mode=channel # 虛擬主機 #spring.rabbitmq.virtual-host=/ ~~~ # 參數配置詳解 ## 基礎信息 ~~~ spring.rabbitmq.host: 默認localhost spring.rabbitmq.port: 默認5672 spring.rabbitmq.username: 用戶名 spring.rabbitmq.password: 密碼 spring.rabbitmq.virtual-host: 連接到代理時用的虛擬主機 spring.rabbitmq.addresses: 連接到server的地址列表(以逗號分隔),先addresses后host spring.rabbitmq.requested-heartbeat: 請求心跳超時時間,0為不指定,如果不指定時間單位默認為妙 spring.rabbitmq.publisher-confirms: 是否啟用【發布確認】,默認false spring.rabbitmq.publisher-returns: 是否啟用【發布返回】,默認false spring.rabbitmq.connection-timeout: 連接超時時間,單位毫秒,0表示永不超時 ~~~ ## SSL ~~~ spring.rabbitmq.ssl.enabled: 是否支持ssl,默認false spring.rabbitmq.ssl.key-store: 持有SSL certificate的key store的路徑 spring.rabbitmq.ssl.key-store-password: 訪問key store的密碼 spring.rabbitmq.ssl.trust-store: 持有SSL certificates的Trust store spring.rabbitmq.ssl.trust-store-password: 訪問trust store的密碼 spring.rabbitmq.ssl.trust-store-type=JKS:Trust store 類型. spring.rabbitmq.ssl.algorithm: ssl使用的算法,默認由rabiitClient配置 spring.rabbitmq.ssl.validate-server-certificate=true:是否啟用服務端證書驗證 spring.rabbitmq.ssl.verify-hostname=true 是否啟用主機驗證 ~~~ ## 緩存cache ~~~ spring.rabbitmq.cache.channel.size: 緩存中保持的channel數量 spring.rabbitmq.cache.channel.checkout-timeout: 當緩存數量被設置時,從緩存中獲取一個channel的超時時間,單位毫秒;如果為0,則總是創建一個新channel spring.rabbitmq.cache.connection.size: 緩存的channel數,只有是CONNECTION模式時生效 spring.rabbitmq.cache.connection.mode=channel: 連接工廠緩存模式:channel 和 connection ~~~ ## Listener ~~~ spring.rabbitmq.listener.type=simple: 容器類型.simple或direct spring.rabbitmq.listener.simple.auto-startup=true: 是否啟動時自動啟動容器 spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto spring.rabbitmq.listener.simple.concurrency: 最小的消費者數量 spring.rabbitmq.listener.simple.max-concurrency: 最大的消費者數量 spring.rabbitmq.listener.simple.prefetch: 一個消費者最多可處理的nack消息數量,如果有事務的話,必須大于等于transaction數量. spring.rabbitmq.listener.simple.transaction-size: 當ack模式為auto時,一個事務(ack間)處理的消息數量,最好是小于等于prefetch的數量.若大于prefetch, 則prefetch將增加到這個值 spring.rabbitmq.listener.simple.default-requeue-rejected: 決定被拒絕的消息是否重新入隊;默認是true(與參數acknowledge-mode有關系) spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器聲明的隊列在代理上不可用,是否失敗; 或者運行時一個多多個隊列被刪除,是否停止容器 spring.rabbitmq.listener.simple.idle-event-interval: 發布空閑容器的時間間隔,單位毫秒 spring.rabbitmq.listener.simple.retry.enabled=false: 監聽重試是否可用 spring.rabbitmq.listener.simple.retry.max-attempts=3: 最大重試次數 spring.rabbitmq.listener.simple.retry.max-interval=10000ms: 最大重試時間間隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000ms:第一次和第二次嘗試傳遞消息的時間間隔 spring.rabbitmq.listener.simple.retry.multiplier=1: 應用于上一重試間隔的乘數 spring.rabbitmq.listener.simple.retry.stateless=true: 重試時有狀態or無狀態 spring.rabbitmq.listener.direct.acknowledge-mode= ack模式 spring.rabbitmq.listener.direct.auto-startup=true 是否在啟動時自動啟動容器 spring.rabbitmq.listener.direct.consumers-per-queue= 每個隊列消費者數量. spring.rabbitmq.listener.direct.default-requeue-rejected= 默認是否將拒絕傳送的消息重新入隊. spring.rabbitmq.listener.direct.idle-event-interval= 空閑容器事件發布時間間隔. spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器聲明的隊列在代理上不可用,是否失敗. spring.rabbitmq.listener.direct.prefetch= 每個消費者可最大處理的nack消息數量. spring.rabbitmq.listener.direct.retry.enabled=false 是否啟用發布重試機制. spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message. spring.rabbitmq.listener.direct.retry.max-attempts=3 # Maximum number of attempts to deliver a message. spring.rabbitmq.listener.direct.retry.max-interval=10000ms # Maximum duration between attempts. spring.rabbitmq.listener.direct.retry.multiplier=1 # Multiplier to apply to the previous retry interval. spring.rabbitmq.listener.direct.retry.stateless=true # Whether retries are stateless or stateful. ~~~ ## Template ~~~ spring.rabbitmq.template.mandatory: 啟用強制信息;默認false spring.rabbitmq.template.receive-timeout: receive() 操作的超時時間 spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超時時間 spring.rabbitmq.template.retry.enabled=false: 發送重試是否可用 spring.rabbitmq.template.retry.max-attempts=3: 最大重試次數 spring.rabbitmq.template.retry.initial-interva=1000msl: 第一次和第二次嘗試發布或傳遞消息之間的間隔 spring.rabbitmq.template.retry.multiplier=1: 應用于上一重試間隔的乘數 spring.rabbitmq.template.retry.max-interval=10000: 最大重試時間間隔 ~~~ # 多個數據源 但是很多業務場景下可能監聽不同的MQ服務,而每個MQ服務是不同的業務線自己搭建的服務,需要配置多個MQ源。 在springboot 中配置單個RabbitMQ是極其簡單的,我們只需要使用Springboot為我們自動裝配的RabbitMQ相關的配置就可以了。但是需要配置多個源時,第二個及其以上的就需要單獨配置了,這里我使用的都是單獨配置的。 **抽象配置** ~~~ @Data public abstract class AbstractRabbitConfiguration { protected String host; protected int port; protected String username; protected String password; protected ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } } ~~~ **第一個源的配置代碼** ~~~ import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @Configuration @ConfigurationProperties("spring.rabbitmq.first") public class FirstRabbitConfiguration extends AbstractRabbitConfiguration { @Bean(name = "firstConnectionFactory") @Primary public ConnectionFactory firstConnectionFactory() { return super.connectionFactory(); } @Bean(name = "firstRabbitTemplate") @Primary public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean(name = "firstFactory") public SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(value = "firstRabbitAdmin") public RabbitAdmin firstRabbitAdmin(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } ~~~ **第二個源的配置代碼** ~~~ import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties("spring.rabbitmq.second") public class SecondRabbitConfiguration extends AbstractRabbitConfiguration { @Bean(name = "secondConnectionFactory") public ConnectionFactory secondConnectionFactory() { return super.connectionFactory(); } @Bean(name = "secondRabbitTemplate") public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean(name = "secondFactory") public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(value = "secondRabbitAdmin") public RabbitAdmin secondRabbitAdmin(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } ~~~ **配置信息** ~~~ spring: application: name: multi-rabbitmq rabbitmq: first: host: 192.168.10.76 port: 30509 username: admin password: 123456 second: host: 192.168.10.76 port: 31938 username: admin password: 123456 ~~~ **測試** 這樣我們的兩個RabbitMQ源就配置好了,接下來我們進行測試使用,為了方便使用,我寫了一個MultiRabbitTemplate.class 方便我們使用不同的源。 ~~~ @Component public abstract class MultiRabbitTemplate { @Autowired @Qualifier(value = "firstRabbitTemplate") public AmqpTemplate firstRabbitTemplate; @Autowired @Qualifier(value = "secondRabbitTemplate") public AmqpTemplate secondRabbitTemplate; } ~~~ 第一個消息發送者類 TestFirstSender.class ~~~ @Component @Slf4j public class TestFirstSender extends MultiRabbitTemplate implements MessageSender { @Override public void send(Object msg) { log.info("rabbitmq1 , msg: {}", msg); firstRabbitTemplate.convertAndSend("rabbitmq1", msg); } public void rabbitmq1sender() { this.send("innerpeacez1"); } } ~~~ 第二個消息發送者類 TestSecondSender.class ~~~ @Component @Slf4j public class TestSecondSender extends MultiRabbitTemplate implements MessageSender { @Override public void send(Object msg) { log.info("rabbitmq2 , msg: {}", msg); secondRabbitTemplate.convertAndSend("rabbitmq2", msg); } public void rabbitmq2sender() { this.send("innerpeacez2"); } } ~~~ **動態創建Queue的消費者** ~~~ @Slf4j @Component public class TestFirstConsumer implements MessageConsumer { @Override @RabbitListener(bindings = @QueueBinding(value = @Queue("rabbitmq1") , exchange = @Exchange("rabbitmq1") , key = "rabbitmq1") , containerFactory = "firstFactory") public void receive(Object obj) { log.info("rabbitmq1 , {}", obj); } } ~~~ ~~~ @Slf4j @Component public class TestSecondConsumer implements MessageConsumer { @Override @RabbitListener(bindings = @QueueBinding(value = @Queue("rabbitmq2") , exchange = @Exchange("rabbitmq2") , key = "rabbitmq2") , containerFactory = "secondFactory") public void receive(Object obj) { log.info("rabbitmq2 , {}", obj); } } ~~~ **測試類** ~~~ @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class SpringBootMultiRabbitmqApplicationTests extends MultiRabbitTemplate { @Autowired private TestFirstSender firstSender; @Autowired private TestSecondSender secondSender; /** * 一百個線程向 First Rabbitmq 的 rabbitmq1 queue中發送一百條消息 */ @Test public void testFirstSender() { for (int i = 0; i < 100; i++) { new Thread(() -> firstSender.rabbitmq1sender() ).start(); } try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 一百個線程向 Second Rabbitmq 的 rabbitmq2 queue中發送一百條消息 */ @Test public void testSecondSender() { for (int i = 0; i < 100; i++) { new Thread(() -> secondSender.rabbitmq2sender() ).start(); } try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } } ~~~ **也可以這樣使用** 生產者 ~~~ //注入template @Autowired private AmqpTemplate firstRabbitTemplate; //發送消息 public void Sender(){ firstRabbitTemplate.convertAndSend("exchange","tdemo.user.update","user updated "); } ~~~ 消費者 ~~~ // 監聽器監聽指定的queue,這里也可以指定多個隊列 @RabbitListener(queues = {"messages"}, containerFactory="firstListenerContainerFactory") public void processMessage(String message) { System.out.println("received messages is : "+message); } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看