<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                這篇文章,我們開始 Spring AMQP 項目實戰旅程。 ## 介紹 通過這個項目實戰旅程,你會學習到如何使用 Spring Boot 整合 Spring AMQP,并且使用 RabbitMQ 的消息隊列機制發送郵件。其中,消息生產者負責將用戶的郵件消息發送至消息隊列,而消息消費者從消息隊列中獲取郵件消息進行發送。這個過程,你可以理解成郵局:當你將要發布的郵件放在郵箱中時,您可以確信郵差最終會將郵件發送給收件人。 ## 準備 本教程假定 RabbitMQ 已在標準端口(5672) 的 localhost 上安裝并運行。如果使用不同的主機,端口,連接設置將需要調整。 ~~~null host = localhost · password = guest port = 5672 vhost = / ~~~ ## 實戰旅程 ### 準備工作 這個實戰教程會構建兩個工程項目:email-server-producer 與 email-server-consumer。其中,email-server-producer 是消息生產者工程,email-server-consumer 是消息消費者工程。 **在教程的最后,我會將完整的代碼提交至 github 上面,你可以結合源碼來閱讀這個教程,會有更好的效果。** 現在開始旅程吧。我們使用 Spring Boot 整合 Spring AMQP,并通過 Maven 構建依賴關系。(由于篇幅的問題,我并不會粘貼完整的 pom.xml 配置信息,你可以在 github 源碼中查看完整的配置文件) ~~~xml <dependencies> <!-- spring boot--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>${javax.mail.version}</version> </dependency> </dependencies> ~~~ ### 構建消息生產者 [![](https://gitee.com/chenssy/blog-home/raw/master/image/201810/spring-amqp-email-list1.PNG)](https://gitee.com/chenssy/blog-home/raw/master/image/201810/spring-amqp-email-list1.PNG) 我們使用 Java Config 的方式配置消息生產者。 ~~~java @Configuration @ComponentScan(basePackages = {"com.lianggzone.rabbitmq"}) @PropertySource(value = {"classpath:application.properties"}) public class RabbitMQConfig { @Autowired private Environment env; @Bean public ConnectionFactory connectionFactory() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(env.getProperty("mq.host").trim()); connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim())); connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim()); connectionFactory.setUsername(env.getProperty("mq.username").trim()); connectionFactory.setPassword(env.getProperty("mq.password").trim()); return connectionFactory; } @Bean public CachingConnectionFactory cachingConnectionFactory() throws Exception { return new CachingConnectionFactory(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() throws Exception { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory()); rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; } @Bean public AmqpAdmin amqpAdmin() throws Exception { return new RabbitAdmin(cachingConnectionFactory()); } @Bean Queue queue() { String name = env.getProperty("mq.queue").trim(); // 是否持久化 boolean durable = StringUtils.isNotBlank(env.getProperty("mq.queue.durable").trim())? Boolean.valueOf(env.getProperty("mq.queue.durable").trim()) : true; // 僅創建者可以使用的私有隊列,斷開后自動刪除 boolean exclusive = StringUtils.isNotBlank(env.getProperty("mq.queue.exclusive").trim())? Boolean.valueOf(env.getProperty("mq.queue.exclusive").trim()) : false; // 當所有消費客戶端連接斷開后,是否自動刪除隊列 boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.queue.autoDelete").trim())? Boolean.valueOf(env.getProperty("mq.queue.autoDelete").trim()) : false; return new Queue(name, durable, exclusive, autoDelete); } @Bean TopicExchange exchange() { String name = env.getProperty("mq.exchange").trim(); // 是否持久化 boolean durable = StringUtils.isNotBlank(env.getProperty("mq.exchange.durable").trim())? Boolean.valueOf(env.getProperty("mq.exchange.durable").trim()) : true; // 當所有消費客戶端連接斷開后,是否自動刪除隊列 boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.exchange.autoDelete").trim())? Boolean.valueOf(env.getProperty("mq.exchange.autoDelete").trim()) : false; return new TopicExchange(name, durable, autoDelete); } @Bean Binding binding() { String routekey = env.getProperty("mq.routekey").trim(); return BindingBuilder.bind(queue()).to(exchange()).with(routekey); } } ~~~ 其中,定義了隊列、交換器,以及綁定。事實上,通過這種方式當隊列或交換器不存在的時候,Spring AMQP 會自動創建它。(如果你不希望自動創建,可以在 RabbitMQ 的管理后臺開通隊列和交換器,并注釋掉 queue() 方法和 exchange() 方法)。此外,我們為了更好地擴展,將創建隊列或交換器的配置信息抽離到了配置文件 application.properties。其中,還包括 RabbitMQ 的配置信息。 ~~~null mq.host=localhost mq.username=guest mq.password=guest mq.port=5672 mq.vhost=/ mq.exchange=email_exchange mq.exchange.durable=true mq.exchange.autoDelete=false mq.queue=email_queue mq.queue.durable=true mq.queue.exclusive=false mq.queue.autoDelete=false mq.routekey=email_routekey ~~~ 此外,假設一個生產者發送到一個交換器,而一個消費者從一個隊列接收消息。此時,將隊列綁定到交換器對于連接這些生產者和消費者至關重要。在 Spring AMQP 中,我們定義一個 Binding 類來表示這些連接。我們使用 BindingBuilder 來構建 “流式的 API” 風格。 ~~~java BindingBuilder.bind(queue()).to(exchange()).with(routekey); ~~~ 現在,我們離大功告成已經很近了,需要再定義一個發送郵件任務存入消息隊列的方法。此時,為了更好地擴展,我們定義一個接口和一個實現類,基于接口編程嘛。 ~~~java public interface EmailService { /** * 發送郵件任務存入消息隊列 * @param message * @throws Exception */ void sendEmail(String message) throws Exception; } ~~~ 它的實現類中重寫 sendEmail() 方法,將消息轉碼并寫入到消息隊列中。 ~~~java @Service public class EmailServiceImpl implements EmailService{ private static Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class); @Resource( name = "rabbitTemplate" ) private RabbitTemplate rabbitTemplate; @Value("${mq.exchange}") private String exchange; @Value("${mq.routekey}") private String routeKey; @Override public void sendEmail(String message) throws Exception { try { rabbitTemplate.convertAndSend(exchange, routeKey, message); }catch (Exception e){ logger.error("EmailServiceImpl.sendEmail", ExceptionUtils.getMessage(e)); } } } ~~~ 那么,我們再模擬一個 RESTful API 接口調用的場景,來模擬真實的場景。 ~~~java @RestController() @RequestMapping(value = "/v1/emails") public class EmailController { @Resource private EmailService emailService; @RequestMapping(method = RequestMethod.POST) public JSONObject add(@RequestBody JSONObject jsonObject) throws Exception { emailService.sendEmail(jsonObject.toJSONString()); return jsonObject; } } ~~~ 最后,再寫一個 main 方法,將 Spring Boot 服務運行起來吧。 ~~~java @RestController @EnableAutoConfiguration @ComponentScan(basePackages = {"com.lianggzone.rabbitmq"}) public class WebMain { public static void main(String[] args) throws Exception { SpringApplication.run(WebMain.class, args); } } ~~~ 至此,已經大功告成了。我們可以通過 Postman 發送一個 HTTP 請求。(Postman是一款功能強大的網頁調試與發送網頁HTTP請求的Chrome插件。) ~~~null { "to":"lianggzone@163.com", "subject":"email-server-producer", "text":"<html><head></head><body><h1>郵件測試</h1><p>hello!this is mail test。</p></body></html>" } ~~~ 請參見圖示。 [![](https://gitee.com/chenssy/blog-home/raw/master/image/201810/spring-amqp-email.PNG)](https://gitee.com/chenssy/blog-home/raw/master/image/201810/spring-amqp-email.PNG) 來看看 RabbitMQ 的管理后臺吧,它會出現一個未處理的消息。(地址:http://localhost:15672/#/queues) [![](https://gitee.com/chenssy/blog-home/raw/master/image/201810/spring-amqp-email-admin.PNG)](https://gitee.com/chenssy/blog-home/raw/master/image/201810/spring-amqp-email-admin.PNG) 注意的是,千萬別向我的郵箱發測試消息喲,不然我的郵箱會郵件爆炸的/(ㄒoㄒ)/~。~ ### 構建消息消費者 [![](https://gitee.com/chenssy/blog-home/raw/master/image/201810/spring-amqp-email-list2.PNG)](https://gitee.com/chenssy/blog-home/raw/master/image/201810/spring-amqp-email-list2.PNG) 完成消息生產者之后,我們再來構建一個消息消費者的工程。同樣地,我們使用 Java Config 的方式配置消息消費者。 ~~~java @Configuration @ComponentScan(basePackages = {"com.lianggzone.rabbitmq"}) @PropertySource(value = {"classpath:application.properties"}) public class RabbitMQConfig { @Autowired private Environment env; @Bean public ConnectionFactory connectionFactory() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(env.getProperty("mq.host").trim()); connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim())); connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim()); connectionFactory.setUsername(env.getProperty("mq.username").trim()); connectionFactory.setPassword(env.getProperty("mq.password").trim()); return connectionFactory; } @Bean public CachingConnectionFactory cachingConnectionFactory() throws Exception { return new CachingConnectionFactory(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() throws Exception { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory()); rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; } @Bean public AmqpAdmin amqpAdmin() throws Exception { return new RabbitAdmin(cachingConnectionFactory()); } @Bean public SimpleMessageListenerContainer listenerContainer( @Qualifier("mailMessageListenerAdapter") MailMessageListenerAdapter mailMessageListenerAdapter) throws Exception { String queueName = env.getProperty("mq.queue").trim(); SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(cachingConnectionFactory()); simpleMessageListenerContainer.setQueueNames(queueName); simpleMessageListenerContainer.setMessageListener(mailMessageListenerAdapter); // 設置手動 ACK simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); return simpleMessageListenerContainer; } } ~~~ 聰明的你,應該發現了其中的不同。這個代碼中多了一個 listenerContainer() 方法。是的,它是一個監聽器容器,用來監聽消息隊列進行消息處理的。注意的是,我們這里設置手動 ACK 的方式。默認的情況下,它采用自動應答,這種方式中消息隊列會發送消息后立即從消息隊列中刪除該消息。此時,我們通過手動 ACK 方式,如果消費者因宕機或鏈接失敗等原因沒有發送 ACK,RabbitMQ 會將消息重新發送給其他監聽在隊列的下一個消費者,保證消息的可靠性。 當然,我們也定義 application.properties 配置文件。 ~~~null mq.host=localhost mq.username=guest mq.password=guest mq.port=5672 mq.vhost=/ mq.queue=email_queue ~~~ 此外,我們創建了一個 MailMessageListenerAdapter 類來消費消息。 ~~~java @Component("mailMessageListenerAdapter") public class MailMessageListenerAdapter extends MessageListenerAdapter { @Resource private JavaMailSender mailSender; @Value("${mail.username}") private String mailUsername; @Override public void onMessage(Message message, Channel channel) throws Exception { try { // 解析RabbitMQ消息體 String messageBody = new String(message.getBody()); MailMessageModel mailMessageModel = JSONObject.toJavaObject(JSONObject.parseObject(messageBody), MailMessageModel.class); // 發送郵件 String to = mailMessageModel.getTo(); String subject = mailMessageModel.getSubject(); String text = mailMessageModel.getText(); sendHtmlMail(to, subject, text); // 手動ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ e.printStackTrace(); } } /** * 發送郵件 * @param to * @param subject * @param text * @throws Exception */ private void sendHtmlMail(String to, String subject, String text) throws Exception { MimeMessage mimeMessage = mailSender.createMimeMessage(); MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage); mimeMessageHelper.setFrom(mailUsername); mimeMessageHelper.setTo(to); mimeMessageHelper.setSubject(subject); mimeMessageHelper.setText(text, true); // 發送郵件 mailSender.send(mimeMessage); } } ~~~ 在 onMessage() 方法中,我們完成了三件事情: 1\. 從 RabbitMQ 的消息隊列中解析消息體。 1\. 根據消息體的內容,發送郵件給目標的郵箱。 1\. 手動應答 ACK,讓消息隊列刪除該消息。 這里,JSONObject.toJavaObject() 方法使用 fastjson 將 json 字符串轉換成實體對象 MailMessageModel。注意的是,@Data 是 lombok 類庫的一個注解。 ~~~java @Data public class MailMessageModel { @JSONField(name = "from") private String from; @JSONField(name = "to") private String to; @JSONField(name = "subject") private String subject; @JSONField(name = "text") private String text; @Override public String toString() { StringBuffer sb = new StringBuffer(); sb.append("Email{from:").append(this.from).append(", "); sb.append("to:").append(this.to).append(", "); sb.append("subject:").append(this.subject).append(", "); sb.append("text:").append(this.text).append("}"); return sb.toString(); } } ~~~ Spring 對 Java Mail 有很好的支持。其中,郵件包括幾種類型:簡單文本的郵件、 HTML 文本的郵件、 內嵌圖片的郵件、 包含附件的郵件。這里,我們封裝了一個簡單的 sendHtmlMail() 進行郵件發送。 對了,我們還少了一個郵件的配置類。 ~~~java @Configuration @PropertySource(value = {"classpath:mail.properties"}) @ComponentScan(basePackages = {"com.lianggzone.rabbitmq"}) public class EmailConfig { @Autowired private Environment env; @Bean(name = "mailSender") public JavaMailSender mailSender() { // 創建郵件發送器, 主要提供了郵件發送接口、透明創建Java Mail的MimeMessage、及郵件發送的配置 JavaMailSenderImpl mailSender = new JavaMailSenderImpl(); // 如果為普通郵箱, 非ssl認證等 mailSender.setHost(env.getProperty("mail.host").trim()); mailSender.setPort(Integer.parseInt(env.getProperty("mail.port").trim())); mailSender.setUsername(env.getProperty("mail.username").trim()); mailSender.setPassword(env.getProperty("mail.password").trim()); mailSender.setDefaultEncoding("utf-8"); // 配置郵件服務器 Properties props = new Properties(); // 讓服務器進行認證,認證用戶名和密碼是否正確 props.put("mail.smtp.auth", "true"); props.put("mail.smtp.timeout", "25000"); mailSender.setJavaMailProperties(props); return mailSender; } } ~~~ 這些配置信息,我們在配置文件 mail.properties 中維護。 ~~~null mail.host=smtp.163.com mail.port=25 mail.username=用戶名 mail.password=密碼 ~~~ 最后,我們寫一個 main 方法,將 Spring Boot 服務運行起來吧。 至此,我們也完成了一個消息消費者的工程,它將不斷地從消息隊列中處理郵件消息。 ## 源代碼 > 相關示例完整代碼:[https://github.com/lianggzone/rabbitmq-server](https://github.com/lianggzone/rabbitmq-server)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看