<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # RabbitmqAdmin使用 ~~~xml <dependencies> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.3.RELEASE</version> </dependency> </dependencies> ~~~ 容器中納入ConnectionFactory和RabbitAdmin管理 ~~~java @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } } ~~~ 應用類,使用RabbitAdmin進行Exchange,Queue,Binding操作 ~~~java import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; import java.util.HashMap; import java.util.Map; @ComponentScan public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class); System.out.println(rabbitAdmin); //創建四種類型的Exchange,可重復執行 rabbitAdmin.declareExchange(new DirectExchange("zhihao.direct.exchange",true,false)); rabbitAdmin.declareExchange(new TopicExchange("zhihao.topic.exchange",true,false)); rabbitAdmin.declareExchange(new FanoutExchange("zhihao.fanout.exchange",true,false)); rabbitAdmin.declareExchange(new HeadersExchange("zhihao.header.exchange",true,false)); //刪除Exchange //rabbitAdmin.deleteExchange("zhihao.header.exchange"); //定義隊列 rabbitAdmin.declareQueue(new Queue("zhihao.debug",true)); rabbitAdmin.declareQueue(new Queue("zhihao.info",true)); rabbitAdmin.declareQueue(new Queue("zhihao.error",true)); //刪除隊列 //rabbitAdmin.deleteQueue("zhihao.debug"); //將隊列中的消息全消費掉 rabbitAdmin.purgeQueue("zhihao.info",false); 
//綁定,指定要綁定的Exchange和Route key rabbitAdmin.declareBinding(new Binding("zhihao.debug",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.hehe",new HashMap())); rabbitAdmin.declareBinding(new Binding("zhihao.info",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.haha",new HashMap())); rabbitAdmin.declareBinding(new Binding("zhihao.error",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.welcome",new HashMap())); //綁定header exchange Map<String,Object> headerValues = new HashMap<>(); headerValues.put("type",1); headerValues.put("size",10); //whereAll指定了x-match: all參數 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")). to(new HeadersExchange("zhihao.header.exchange")).whereAll(headerValues).match()); //whereAny指定了x-match: any參數 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.info")). to(new HeadersExchange("zhihao.header.exchange")).whereAny(headerValues).match()); //進行解綁 rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("zhihao.info")). to(new TopicExchange("zhihao.direct.exchange")).with("zhihao.info")); //聲明topic類型的exchange rabbitAdmin.declareExchange(new TopicExchange("zhihao.hehe.exchange",true,false)); rabbitAdmin.declareExchange(new TopicExchange("zhihao.miao.exchange",true,false)); //exchange與exchange綁定 rabbitAdmin.declareBinding(new Binding("zhihao.hehe.exchange",Binding.DestinationType.EXCHANGE, "zhihao.miao.exchange","zhihao",new HashMap())); //使用BindingBuilder進行綁定 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")). to(new TopicExchange("zhihao.topic.exchange")).with("zhihao.miao")); //rabbitAdmin.declareBinding(new Binding("amq.rabbitmq.trace",Binding.DestinationType.EXCHANGE, //"amq.rabbitmq.log","zhihao",new HashMap())); context.close(); } } ~~~ **Exchange ,Queue,Binding的自動聲明** 1. 直接把要自動聲明的組件Bean納入到spring容器中管理即可。 自動聲明發生在rabbitmq第一次連接創建的時候。如果系統從啟動到停止沒有創建任何連接,則不會自動創建。 2. 
自動聲明支持單個和多個。 **自動聲明Exchange**: ~~~java import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DeclareConfig { //聲明direct類型的Exchange @Bean public Exchange directExchange(){ return new DirectExchange("zhihao.direct.exchange",true,false); } //聲明topic類型的Exchange @Bean public Exchange topicExchange(){ return new TopicExchange("zhihao.topic.exchange",true,false); } //聲明fanout類型的Exchange @Bean public Exchange fanoutExchange(){ return new FanoutExchange("zhihao.fanout.exchange",true,false); } //聲明headers類型的Exchange @Bean public Exchange headersExchange(){ return new HeadersExchange("zhihao.header.exchange",true,false); } } ~~~ 配置類,在spring容器中納入ConnectionFactory實例和RabbitAdmin實例 ~~~java import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } } ~~~ 啟動應用類,自動聲明發生在rabbitmq第一次連接創建的時候。如果系統從啟動到停止沒有創建任何連接,則不會自動創建。 ~~~ import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; @ComponentScan public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); //使得客戶端第一次連接rabbitmq 
context.getBean(RabbitAdmin.class).getQueueProperties("**"); context.close(); } } ~~~ **隊列的自動聲明** ~~~java import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DeclareConfig { @Bean public Queue debugQueue(){ return new Queue("zhihao.debug",true); } @Bean public Queue infoQueue(){ return new Queue("zhihao.info",true); } @Bean public Queue errorQueue(){ return new Queue("zhihao.error",true); } } ~~~ 上面的Application和DeclareConfig不列舉出來了,執行Application應用啟動類,查看web管控臺的隊列生成 **綁定的自動生成** DeclareConfig類中, ~~~ import org.springframework.amqp.core.Binding; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class DeclareConfig { @Bean public Binding binding(){ return new Binding("zhihao.debug",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.debug",new HashMap()); } @Bean public Binding binding2(){ return new Binding("zhihao.info",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.info",new HashMap()); } @Bean public Binding binding3(){ return new Binding("zhihao.error",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.error",new HashMap()); } } ~~~ **一次性生成多個queue,exchange,binding** ~~~ import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @Configuration public class DeclareConfig { @Bean public List<Queue> queues(){ List<Queue> queueList = new ArrayList<>(); queueList.add(new Queue("chao.wang.debug",true)); queueList.add(new Queue("chao.wang.info",true)); queueList.add(new Queue("chao.wang.error",true)); return queueList; } @Bean public List<Exchange> exchanges(){ List<Exchange> exchangeList = new ArrayList<>(); exchangeList.add(new 
TopicExchange("chao.wang.debug.topic.exchange",true,false)); exchangeList.add(new TopicExchange("chao.wang.info.topic.exchange",true,false)); exchangeList.add(new TopicExchange("chao.wang.error.topic.exchange",true,false)); return exchangeList; } @Bean public List<Binding> bindings(){ List<Binding> bindingList = new ArrayList<>(); bindingList.add(BindingBuilder.bind(new Queue("chao.wang.debug")). to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.#")); bindingList.add(BindingBuilder.bind(new Queue("chao.wang.info")). to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.*")); bindingList.add(BindingBuilder.bind(new Queue("chao.wang.error")). to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.error.*")); return bindingList; } } ~~~ 上面的Application和DeclareConfig不列舉出來了,執行Application應用啟動類,查看web管控臺Exchange,Queue,Binding都已經生成。 **注意** 當聲明隊列是以amq.開頭的時候,隊列是不能創建聲明的。 ~~~java @Bean public Queue amqQueue(){ return new Queue("amq.log",true); } ~~~ # 總結 **自動聲明的一些條件** > * 要有連接(對rabbitmq的連接) > * 容器中要有`org.springframework.amqp.rabbit.core.RabbitAdmin`的實例 > * `RabbitAdmin`的`autoStartup`屬性必須為true。 > * 如果`ConnectionFactory`使用的是`CachingConnectionFactory`,則`cacheMode`必須是`CachingConnectionFactory.CacheMode.CHANNEL`(默認)。 > * 所要聲明的組件(`Queue`,`Exchange`和`Binding`)的`shouldDeclare`必須是`true`(默認就是`true`) > * `Queue`隊列的名字不能以`amq.`開頭。 注意:`Queue`,`Exchange`和`Binding`都直接或者間接的繼承`Declarable`,而`Declarable`中定義了`shouldDeclare`的方法。 # 自動聲明源碼分析 `org.springframework.amqp.rabbit.core.RabbitAdmin`實現`InitializingBean`接口,在`BeanFactory`設置完所有屬性之后執行特定初始化(`afterPropertiesSet`方法) `RabbitAdmin`的`afterPropertiesSet`方法, ~~~ @Override public void afterPropertiesSet() { synchronized (this.lifecycleMonitor) { //autoStartup屬性的值為false的時候,直接return if (this.running || !this.autoStartup) { return; } //connectionFactory實例如果是CachingConnectionFactory,并且CacheMode是CacheMode.CONNECTION也會return下面不執行了。 if (this.connectionFactory instanceof CachingConnectionFactory && 
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) { this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION"); return; } //連接的監聽器 this.connectionFactory.addConnectionListener(new ConnectionListener() { // Prevent stack overflow... private final AtomicBoolean initializing = new AtomicBoolean(false); @Override public void onCreate(Connection connection) { if (!initializing.compareAndSet(false, true)) { // If we are already initializing, we don't need to do it again... return; } try { //執行這個方法 initialize(); } finally { initializing.compareAndSet(true, false); } } @Override public void onClose(Connection connection) { } }); this.running = true; } } ~~~ `RabbitAdmin`的`initialize`方法,聲明所有`exchanges`,`queues`和`bindings` ~~~ /** * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe * (but unnecessary) to call this method more than once. */ public void initialize() { if (this.applicationContext == null) { this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings"); return; } this.logger.debug("Initializing declarations"); //得到容器中所有的Exchange Collection<Exchange> contextExchanges = new LinkedList<Exchange>( this.applicationContext.getBeansOfType(Exchange.class).values()); //得到容器中所有的Queue Collection<Queue> contextQueues = new LinkedList<Queue>( this.applicationContext.getBeansOfType(Queue.class).values()); //得到容器中所有的Binding Collection<Binding> contextBindings = new LinkedList<Binding>( this.applicationContext.getBeansOfType(Binding.class).values()); //獲取容器中所有的Collection,如果容器中所有元素是Exchange,Queue或者Binding的時候將這些實例也加入到spring容器中。 @SuppressWarnings("rawtypes") Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false) .values(); for (Collection<?> collection : collections) { if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) 
{ for (Object declarable : collection) { if (declarable instanceof Exchange) { contextExchanges.add((Exchange) declarable); } else if (declarable instanceof Queue) { contextQueues.add((Queue) declarable); } else if (declarable instanceof Binding) { contextBindings.add((Binding) declarable); } } } } //進行了filter過濾, final Collection<Exchange> exchanges = filterDeclarables(contextExchanges); final Collection<Queue> queues = filterDeclarables(contextQueues); final Collection<Binding> bindings = filterDeclarables(contextBindings); for (Exchange exchange : exchanges) { if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable or auto-delete Exchange (" + exchange.getName() + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". " + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and " + "reopening the connection."); } } for (Queue queue : queues) { if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue (" + queue.getName() + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:" + queue.isExclusive() + ". 
" + "It will be redeclared if the broker stops and is restarted while the connection factory is " + "alive, but all messages will be lost."); } } this.rabbitTemplate.execute(new ChannelCallback<Object>() { @Override public Object doInRabbit(Channel channel) throws Exception { //聲明exchange,如果exchange是默認的exchange那么也不會聲明。 declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()])); //聲明隊列,如果隊列名以amq.開頭的也不會進行聲明 declareQueues(channel, queues.toArray(new Queue[queues.size()])); declareBindings(channel, bindings.toArray(new Binding[bindings.size()])); return null; } }); this.logger.debug("Declarations finished"); } ~~~ `filterDeclarables`方法過濾一些`Exchange`,`Queue`,`Binding`,因為這三個類都是繼承`Declarable這個類`, ~~~ private <T extends Declarable> Collection<T> filterDeclarables(Collection<T> declarables) { Collection<T> filtered = new ArrayList<T>(); for (T declarable : declarables) { Collection<?> adminsWithWhichToDeclare = declarable.getDeclaringAdmins(); //shouldDeclare屬性必須是true,否則就會被過濾掉了 if (declarable.shouldDeclare() && (adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) { filtered.add(declarable); } } return filtered; } ~~~ 聲明Exchanges ~~~ private void declareExchanges(final Channel channel, final Exchange... 
exchanges) throws IOException { for (final Exchange exchange : exchanges) { if (this.logger.isDebugEnabled()) { this.logger.debug("declaring Exchange '" + exchange.getName() + "'"); } //不是默認的Exchange if (!isDeclaringDefaultExchange(exchange)) { try { //是否是delayed類型的Exchange if (exchange.isDelayed()) { Map<String, Object> arguments = exchange.getArguments(); if (arguments == null) { arguments = new HashMap<String, Object>(); } else { arguments = new HashMap<String, Object>(arguments); } arguments.put("x-delayed-type", exchange.getType()); //調用exchangeDeclare進行聲明 channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), arguments); } else { //調用exchangeDeclare進行聲明 channel.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments()); } } catch (IOException e) { logOrRethrowDeclarationException(exchange, "exchange", e); } } } } ~~~ 聲明Queue隊列 ~~~ private DeclareOk[] declareQueues(final Channel channel, final Queue... 
queues) throws IOException { List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length); for (int i = 0; i < queues.length; i++) { Queue queue = queues[i]; //隊列不以amq.開頭的隊列才能進行聲明 if (!queue.getName().startsWith("amq.")) { if (this.logger.isDebugEnabled()) { this.logger.debug("declaring Queue '" + queue.getName() + "'"); } try { try { //進行隊列聲明 DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), queue.getArguments()); declareOks.add(declareOk); } catch (IllegalArgumentException e) { if (this.logger.isDebugEnabled()) { this.logger.error("Exception while declaring queue: '" + queue.getName() + "'"); } try { if (channel instanceof ChannelProxy) { ((ChannelProxy) channel).getTargetChannel().close(); } } catch (TimeoutException e1) { } throw new IOException(e); } } catch (IOException e) { logOrRethrowDeclarationException(queue, "queue", e); } } this.logger.debug("Queue with name that starts with 'amq.' cannot be declared."); } return declareOks.toArray(new DeclareOk[declareOks.size()]); } ~~~ binding聲明: ~~~java private void declareBindings(final Channel channel, final Binding... bindings) throws IOException { for (Binding binding : bindings) { if (this.logger.isDebugEnabled()) { this.logger.debug("Binding destination [" + binding.getDestination() + " (" + binding.getDestinationType() + ")] to exchange [" + binding.getExchange() + "] with routing key [" + binding.getRoutingKey() + "]"); } try { //QUEUE類型的綁定 if (binding.isDestinationQueue()) { //并且不是綁定到默認的Default Exchange if (!isDeclaringImplicitQueueBinding(binding)) { //綁定隊列 channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); } } else { //Exchange類型的綁定 channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); } } catch (IOException e) { logOrRethrowDeclarationException(binding, "binding", e); } } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看