<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                作者 Boris Lublinsky ,譯者 王麗娟 AWS提供兩種服務——Amazon簡單通知服務(Simple Notification Service)和Amazon簡單隊列服務(Simple Queue Service),兩者結合起來可以為完整的發布/訂閱服務提供支撐。 ## 現有的AWS功能 Amazon[簡單通知服務](http://aws.amazon.com/sns)(Amazon SNS)是一個Web服務,能讓應用、最終用戶和設備立即從云端發送和接收通知。簡化的SNS架構如下圖所示(圖1): ![](https://box.kancloud.cn/2015-08-01_55bc61f0980a6.jpeg) 圖1:Amazon SNS的基礎架構 多個發布應用和多個訂閱應用可以將SNS主題作為中介互相通訊。這樣實現的優點是發布者和訂閱者不需要知道對方,因此,應用可以完全動態地進行集成。SNS支持用多種傳輸協議傳遞通知,包括HTTP、HTTPS、Email、SMS和Amazon簡單隊列(Simple Queue)。 Amazon[簡單隊列服務](http://aws.amazon.com/sqs)(Amazon SQS)提供可靠、可伸縮的托管隊列,用來存儲計算機之間傳輸的消息。使用Amazon SQS,你可以在執行不同任務的應用分布式組件之間移動數據,而不會丟失消息,也不必要求每個組件始終都是可用的。SQS和SNS結合起來會帶來兩個額外的優勢——解除時間上的耦合度,根據消費應用特定的情況提供負載均衡——這是SNS無法單獨提供的。要做到第二個附加優勢,需要同一個應用的多個實例從同一個隊列里讀取消息。下圖展示了SNS和SQS結合的總體架構(圖2)。其中的一個訂閱應用顯示為負載均衡的。 ![](https://box.kancloud.cn/2015-08-01_55bc61f0a4e9e.jpeg) 圖2:結合SNS和SQS 這個實現的主要缺點是,發布者和訂閱者需要明確統一SNS主題的名稱。此外,如果一個特定的消費者想從多個主題獲取信息,那他需要把隊列注冊到多個主題上。 ## 期望中的發布/訂閱實現 這個問題的典型解決方案是采用基于樹的主題組織,大部分發布/訂閱引擎都是這樣實現的。OASIS規范的[Web Services Topics 1.3](http://docs.oasis-open.org/wsn/wsn-ws_topics-1.3-spec-os.pdf)概述了這種組織的主要原則。 這個規范將主題定義為: “……主題是一組通知的組織和分類方式。主題機制為訂閱者推斷出感興趣的通知提供了便捷的方式……發布者可以將通知發布和一或多個主題關聯起來。當訂閱者創建訂閱的時候,可以提供一個主題的過濾器表達式,將訂閱和一或多個主題關聯起來……每個主題都可以有零或多個**子主題**,子主題本身也可以進一步包含子主題。沒有‘父親’的主題叫**根主題**。特定的根主題和它所有的后代會形成一個層次結構(稱為**主題樹**)。” 下面是手機銷售的一個主題樹例子(圖3)。 ![](https://box.kancloud.cn/2015-08-01_55bc61f0b4226.jpeg) 圖3:主題樹示例 主題樹的根表示銷售。銷售可以按區域細分(在我們的例子中有北美、歐洲和亞太地區)。特定區域的銷售還可以按照手機類型進一步細分,依此類推。 在發布/訂閱系統中,這樣的結構之所以重要是因為樹反映了數據的組織。如果消費者對北美的智能手機銷售感興趣,他可以監聽這個特定的主題。如果他對北美所有的銷售都感興趣,那他就可以監聽北美的主題,從子主題獲取所有的通知。 當然,這種方法并不能解決所有的問題。比如說,如果消費者想監聽所有智能手機銷售的事件,他就需要明確訂閱所有地區的智能手機銷售事件。這種情況通常是主題樹設計的問題。樹的設計基于信息的組織和典型的使用模式。在某些情況下,會設計多個主題來滿足不同的內部需求(參見[Web Services Topics 1.3](http://docs.oasis-open.org/wsn/wsn-ws_topics-1.3-spec-os.pdf)里的主題命名空間)。發布/訂閱架構的另一個重要特性就是[基于內容的消息過濾](http://en.wikipedia.org/wiki/Publish–subscribe_pattern): “在基于內容的系統中,如果消息的屬性或內容與訂閱者定義的約束相匹配,消息就只會傳遞給這個訂閱者。訂閱者負責消息的分類。” 換句話說,訂閱者在這種情況下可以使用正則表達式列表,明確指定他們感興趣的消息內容。 把這種過濾和結構化的主題結構結合起來,可以創建出非常靈活和強大的發布/訂閱實現。 我們將在本文中展示如何用AWS組件輕松構建這類系統。 ## 發布/訂閱架構建議 建議給大家的架構如下圖所示(圖4)。在這個架構中,發布/訂閱服務器的實現是一個Tomcat容器里運行的Web應用。我們還充分利用了AWS的[彈性負載均衡器(Elastic Load Balancer)](http://aws.amazon.com/elasticloadbalancing),它可以根據當前的負載動態擴展或縮減發布/訂閱服務器集群的大小。此外,架構還用[關系型數據服務(Relational Data Service)](http://aws.amazon.com/rds)存儲當前的配置,以便動態新增發布/訂閱實例。為了提高整體性能,我們在內存里保留了當前的拓撲結構,盡量減少數據庫訪問的次數。這樣的話,實際的消息路由會非常迅速。這個解決方案需要一種機制,能在拓撲結構發生變化的時候去通知所有的服務器(因為任何服務器都能處理負載均衡器)。Amazon SNS能輕而易舉地做到這一點。最后,我們用Amazon SQS將通知分發給消費者。需要注意的是,一個消費者可以監聽多個隊列。 ![](https://box.kancloud.cn/2015-08-01_55bc61f0be67b.jpeg) 圖4:整體架構建議 ## 發布/訂閱服務器 這個實現的核心是一個自定義的發布/訂閱服務器。服務器實現包括三個主要的層——持久化、域和服務。 ### 持久化 服務器持久化層采用[JPA 2.0](http://jcp.org/en/jsr/detail?id=317)實現,定義了三個主要的實體——主題、訂閱和語義過濾器。 主題實體(清單1)描述了特定主題要存儲的相關信息,包括主題ID(數據庫的內部ID)、主題名稱(標識主題的字符串)、一個布爾變量(定義該主題是否是個根主題)、到父主題和孩子主題的引用(以便對主題層次結構進行遍歷),以及與給定主題關聯的訂閱列表。 ~~~ @Entity @NamedQueries({ ??? @NamedQuery(name="Topic.RootTopics", ??????????????????? query="SELECT t FROM Topic t where t.root='true'"), ??? @NamedQuery(name="Topic.AllTopics",   ???????????????????? query="SELECT t FROM Topic t") }) @Table(name = "Topic") public class Topic { ?@Id @GeneratedValue(strategy=GenerationType.IDENTITY) ?private long id;?? ?// 自動生成的ID ?@Column(name = "name",nullable = false, length = 32) ?private String name;?? ??? ??? ??? ??????// 主題名稱 ?? ?@Column(name = "root",nullable = false) ?private Boolean root = false;?? ??? ???// 根主題標識?? ? ?@ManyToOne(fetch=FetchType.LAZY) ?@JoinColumn(name="TOPIC_ID") ?private Topic parent; ?@OneToMany(mappedBy="parent",cascade=CascadeType.ALL,orphanRemoval=true) ?private List<Topic> children; ? ?@OneToMany(mappedBy="topic",cascade=CascadeType.ALL,orphanRemoval=true) ?private List<Subscription> subscriptions; ~~~ 清單1:主題實體 我們定義了兩個命名的查詢,用來訪問主題:RootTopics獲取從根開始的主題結構,AllTopics獲取所有現有的主題。 這個實體提供了一個完整的主題定義,也可以支持多個主題樹(而不是實現示例的一部分)。 訂閱實體(清單2)描述了訂閱相關的信息,包括訂閱ID(數據庫的內部ID)、隊列名稱(SQS隊列的ARN,ARN即Amazon Resource Name)、對訂閱關聯主題的引用,還有一個語義過濾器列表。只有所有的過濾器都接受消息(見下文),通知才會分發給給定的隊列(客戶端)。如果通知不包含語義過濾器,那來自于關聯主題的所有消息都會直接傳遞給隊列。 ~~~ @Entity @NamedQueries({ ??@NamedQuery(name="Subscription.AllSubscriptions",   ??????????????????? query="SELECT s FROM Subscription s") }) @Table(name = "Subscription") public class Subscription { ????@Id @GeneratedValue(strategy=GenerationType.IDENTITY) ????private long id;?? ?// 自動生成的ID ????@Column(name = "queue",nullable = false, length = 128) ????private String queue; ? ????@ManyToOne(fetch=FetchType.LAZY) ????@JoinColumn(name="TOPIC_ID") ????private Topic topic; ?? ? ????@OneToMany(mappedBy="subscription", ?????  ????????????cascade=CascadeType.ALL,orphanRemoval=true) ????private List<SemanticFilter> filters;? ????…………………………………………………………… ~~~ 清單2:訂閱實體 我們還定義了一個命名的查詢,獲得所有存在的訂閱。 最后,語義過濾器實體(清單3)描述了特定語義過濾器的信息,包括語義過濾器ID(數據庫的內部ID)、該語義過濾器測試的屬性名稱、使用的正則表達式,以及對語義過濾器關聯訂閱的引用。 ~~~ @Entity @NamedQueries({ ??@NamedQuery(name="SemanticFilter.AllSemanticFilters",   ??????????????????? query="SELECT sf FROM SemanticFilter sf") }) @Table(name = "Filter") public class SemanticFilter { ????@Id @GeneratedValue(strategy=GenerationType.IDENTITY) ????private long id;?? ?// 自動生成的ID ?? ? ????@Column(name = "attribute",nullable = false, length = 32) ????private String attribute;?? ??? ??? ??? ????// 屬性名稱 ????@Column(name = "filter",nullable = false, length = 128) ????private String filter;?? ??? ??? ??? ?????// 正則表達式過濾器 ????@ManyToOne(fetch=FetchType.LAZY) ????@JoinColumn(name="SUBSCRIPTION_ID") ????private Subscription subscription; ????…………………………………………………………… ~~~ 清單3:語義過濾器實體 我們一樣定義一個命名的查詢,用來獲取所有現有的語義過濾器。 除了實體,持久化層還包含一個持久化管理類,負責: 管理數據庫訪問和事務 從數據庫讀取、寫入對象 對域對象(見下文)和持久化實體進行相互轉換 發送拓撲結構變化的通知 ### 域模型 域模型對象的主要職責是支持服務操作,包括數據的訂閱和發布,并把通知真正發布到訂閱的隊列上。在這個簡單的實現里,域模型和持久化模型是合在一起的,但為了闡述得更清楚,我們分開介紹。這兩層的數據模型是一樣的,但域對象會多一些明確支持發布/訂閱實現的方法。 過濾器處理的實現(清單4)利用了Java String里對[正則表達式](http://en.wikipedia.org/wiki/Regex)處理的[內置支持](http://www.vogella.com/articles/JavaRegularExpressions/article.html)。 ~~~ ?public boolean accept(String value){   ?? ??? if(value == null)   ?? ??? ??? ??return false;   ?? ??? return value.matches(_pattern); ?} ~~~ 清單4:過濾器處理方法 發布實現(清單5)是訂閱類的一個方法。請注意,這個方法對語義過濾器進行了或操作。如果給定的客戶端能有多個訂閱,或者對訂閱實現進行擴展、讓它支持Boolean函數,那就可以突破這個限制了。 ~~~ public void publish(Map<String, String> attributes, String message){ ?? ????if((_filters != null) && (_filters.size() > 0)){ ????????for(DomainSemanticFilter f : _filters){ ????????????String av = attributes.get(f.getField()); ????????????if(av == null) ????????????????return; ????????????if(!f.accept(av)) ????????????????return; ????????} ????} ????SQSPublisher.getPublisher().sendMessage(_queue, message); } ~~~ 清單5:發布實現 這個實現利用了基于現有AWS Java API的SQSPublisher類(清單6)。 ~~~ import java.io.IOException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.PropertiesCredentials; import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.DeleteQueueRequest; import com.amazonaws.services.sqs.model.SendMessageRequest; public class SQSPublisher {   ?private static SQSPublisher _publisher;   ??   ?private AmazonSQSClient _sqs;?? ?   ?? ?   ?private SQSPublisher()throws IOException {   ?? ??? ???AWSCredentials credentials = new PropertiesCredentials(   ?? ??? ??? ??? ?this.getClass().getClassLoader(). getResourceAsStream("AwsCredentials.properties"));   ?? ??? ???_sqs = new AmazonSQSClient(credentials);    }   ?public String createQueue(String name){   ?? ??? ??CreateQueueRequest request = new CreateQueueRequest(name);   ????? ??? return _sqs.createQueue(request).getQueueUrl();   ?}   ?public void sendMessage(String queueURL, String message){   ?? ??? ??SendMessageRequest request = new SendMessageRequest(queueURL, message);   ?????? ?? _sqs.sendMessage(request);   ?} ?????public void deleteQueue(String queueURL){   ?? ??? ??DeleteQueueRequest request = new DeleteQueueRequest(queueURL);   ?????? ?? _sqs.deleteQueue(request);   ?}   ?public static synchronized SQSPublisher getPublisher(){   ?? ?????if(_publisher == null)   ?? ??? ??? ?????try {   ?? ??? ??? ??? ????????_publisher = new SQSPublisher();   ?? ??? ??? ?????}catch (IOException e) {   ?? ??? ??? ??? ???????e.printStackTrace();   ?? ??? ??? ?????}   ?? ??? ??return _publisher;   ?} } ~~~ 清單6:SQS發布者 訂閱者可以利用這個類的其他方法創建/銷毀SQS隊列。 除了SQS隊列,我們的實現還利用SNS進行數據庫變化的同步。與SNS的交互由SNSPubSub類實現(清單7),這個實現也利用了AWS SNS Java API。 ~~~ import java.io.IOException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.PropertiesCredentials; import com.amazonaws.services.sns.AmazonSNSClient; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.SubscribeRequest; import com.amazonaws.services.sns.model.SubscribeResult; import com.amazonaws.services.sns.model.UnsubscribeRequest; public class SNSPubSub {   ? private static SNSPubSub _topicPublisher;   ? private static String _topicARN;   ? private static String _endpoint;   ?? ?   ? private AmazonSNSClient _sns;   ? private String _protocol = "http";   ? private String _subscriptionARN;   ?? ?   ? private SNSPubSub()throws IOException {   ?? ??? ???AWSCredentials credentials = new PropertiesCredentials(   ?? ??? ??? ??? ?this.getClass().getClassLoader(). getResourceAsStream("AwsCredentials.properties"));   ?? ??? ???_sns = new AmazonSNSClient(credentials);   ? }   ? public void publish(String message){   ?? ??? ???PublishRequest request = new PublishRequest(_topicARN, message);   ?? ??? ???_sns.publish(request);   ? }   ??   ? public void subscribe(){   ?? ??? ???SubscribeRequest request = new SubscribeRequest (_topicARN, _protocol, _endpoint);   ?? ??? ???_sns.subscribe(request);   ? }   ?? ?   ? public void confirmSubscription(String token){   ?? ??? ???ConfirmSubscriptionRequest request = new ?ConfirmSubscriptionRequest(_topicARN, token);   ?? ??? ???ConfirmSubscriptionResult result = _sns .confirmSubscription(request);   ?? ??? ???_subscriptionARN = result.getSubscriptionArn();   ? } ?? ?   ? public void unSubscribe(){   ?? ??? ???if(_subscribed){   ?? ??? ??? ??????UnsubscribeRequest request = new UnsubscribeRequest(_subscriptionARN);   ?? ??? ??????????_sns .unsubscribe(request);   ?? ??? ???}   ? }   ?? ?   ? public static void configureSNS(String topicARN, String endpoint){   ?? ??? ????_topicARN = topicARN;   ?? ???????_endpoint = endpoint;   ? }   ?? ???   ? public static synchronized SNSPubSub getSNS(){   ?? ??? ????if(_topicPublisher == null){   ?? ??? ??? ???????try{   ?? ??? ??? ?????????????_topicPublisher = new SNSPubSub();   ?? ??? ??? ???????}   ?? ??? ??? ???????catch(Exception e){   ?? ??? ??? ?????????????e.printStackTrace();   ?? ??? ??? ???????}   ?? ??? ????}   ?? ??? ????return _topicPublisher;   ? } } ~~~ 清單7:SNS Pub/Sub 使用SNS 使用SNS的時候要謹記:訂閱主題并不意味著你已經準備好監聽主題。SNS訂閱的過程包含兩個步驟。向SNS發送訂閱請求時,SNS返回的響應表明確認訂閱的必要性。這正是清單8既有subscribe方法又有confirmSubscription方法的原因。 ~~~ <xsd:complextype name="NotificationType"> ??<xsd:sequence> ????<xsd:element name="Type" type="xsd:string" /> ????<xsd:element name="MessageId" type="xsd:string" /> ????<xsd:element name="Token" type="xsd:string" minoccurs="0" /> ????<xsd:element name="TopicArn" type="xsd:string" /> ????<xsd:element name="Message" type="xsd:string" /> ????<xsd:element name="SubscribeURL" type="xsd:string" minoccurs="0" /> ????<xsd:element name="Timestamp" type="xsd:string" /> ????<xsd:element name="SignatureVersion" type="xsd:string" /> ????<xsd:element name="Signature" type="xsd:string" /> ????<xsd:element name="SigningCertURL" type="xsd:string" /> ????<xsd:element name="UnsubscribeURL" type="xsd:string" minoccurs="0" /> ??</xsd:sequence> </xsd:complextype> ~~~ 上面的Schema描述了兩種消息類型——確認請求和實際的通知。兩種類型通過Type元素進行區分。如果元素值是“SubscriptionConfirmation”,那它就是訂閱確認的請求,如果是“Notification”,就表明是個真正的通知。 主題類實現了兩個方法(清單8),以便支持發布。 ~~~ public void publish(Map<String, String> attributes, String message){ ???? ????if(_subscriptions == null) ????????return; ????for(DomainSubscription ds : _subscriptions) ????????ds.publish(attributes, message); } public void processPublications(List<DomainTopic> tList, StringTokenizer st) throws PublicationException{ ???? ????tList.add(this); ????if(!st.hasMoreTokens()) ????????return; ????String topic = st.nextToken(); ????for(DomainTopic dt : _children){ ????????if(topic.equalsIgnoreCase(dt.getName())){ ????????????dt.processPublications(tList, st); ????????????return; ????????} ????} ????throw new PublicationException("Subtopic " + topic + " is not found in topic " + _name); } ~~~ 清單8:主題對發布的支持 processPublications方法創建了一個主題列表,這些主題與給定的消息相關聯。這個方法有一個標記過的主題樹字符串,如果標記和主題名稱相對應,就會把當前的主題添加到列表中。主題的publish方法維護一個消息屬性的映射,對主題相關的每個訂閱來說,publish方法還會嘗試著去發布一條消息。 上面的方法都由Domain管理器類的publish方法調用(清單9)。這個方法首先標記主題字符串,然后用processPublications方法創建一個訂閱者感興趣的主題列表。列表一旦被創建好,就會構建一個消息屬性的映射(我們假設是一個XML消息),并把這個映射發布給列表里的所有主題。 ~~~ ???public void publish (String topic, String message){   ?? ?? StringTokenizer st = new StringTokenizer(topic, ".");   ?? ?? List<DomainTopic> topics = new LinkedList<Domaintopic>();   ?? ?? DomainTopic root = PersistenceManager.getPersistenceManager().getRoot();   ?? ?? try {      ?? ??? ??? ???if(!st.hasMoreTokens())   ?? ??? ??? ??? ????return;   ?? ??? ??? ???String t = st.nextToken();      ?? ??? ??? ???if(!t.equalsIgnoreCase(root.getName()))   ?? ??? ??? ??? ????throw new PublicationException("Unrecognized subtopic name " + topic);      ?? ??? ??? ???root.processPublications(topics, st);      ?? ?? }catch (PublicationException e) {      ?? ??? ??? ???e.printStackTrace();   ?? ??? ??? ???return;   ?? ?? }      ?? ?? MessageType msg = null;      ?? ?? try {      ?? ??? ??? ???JAXBElement<MessageType> msgEl = (JAXBElement<MessageType>)   ?? ??? ??? ??? ????_unmarshaller.unmarshal(new ByteArrayInputStream(message.getBytes()));     ?? ??? ??? ???msg = msgEl.getValue();      ?? ?? } catch (JAXBException e) {      ?? ??? ??? ???e.printStackTrace();   ?? ??? ??? ???return;   ?? ?? }      ?? ?? Map<String, String> attributes = new HashMap<String, String>();   ?? ?? MessageEnvelopeType envelope = msg.getEnvelope();   ?? ?? if(envelope != null){   ?? ??? ??? ???for(MessageAttributeType attribute : envelope.getAttribute()){   ?? ??? ??? ??? ????attributes.put(attribute.getName(), attribute.getValue());   ?? ??? ??? ???}   ?? ?? }   ?? ?? for(DomainTopic t : topics)   ?? ??? ??? ???t.publish(attributes, message); } ~~~ 清單9:發布方法實現 ### 服務模型 我們用一組REST服務對發布/訂閱功能進行訪問(清單10)。 ~~~ @Path("/") public class PubSubServiceImplementation {   ? // 功能方法   ? @POST   ? @Path("publish")   ? @Consumes("application/text")   ? public void publish (@QueryParam("topic")String topic, String message) throws PublicationException{   ?? ??? ???DomainManager.getDomainManager().publish(topic, message);   ? }   ? @GET   ? @Path("publish")   ? public void publishGet (@QueryParam("topic")String topic, @QueryParam("message")String message)? throws publicationException{   ?? ??? ???DomainManager.getDomainManager().publish(topic, message);   ? }   ? @POST   ? @Path("synch")   ? @Consumes("text/plain")   ? public void getSynchNotification (Object message){   ?? ??? ???PersistenceManager.setUpdated();   ? }   ? // 配置方法   ? @GET   ? @Path("root")   ? @Produces("application/json")   ? public TopicType getRoot()throws PublicationException {   ?? ??? ???return DomainManager.getDomainManager().getRoot();   ? }   ? @GET   ? @Path("filters")   ? @Produces("application/json")   ? public FiltersType getFilters() throws PublicationException {   ?? ??? ???return DomainManager.getDomainManager().getFilters();   ? }   ? @POST   ? @Path("filter")   ? @Consumes("application/json")   ? public long addFilter(FilterType filter) throws PublicationException {   ?? ??? ???return DomainManager.getDomainManager().addFilter(filter);   ? }   ? @DELETE   ? @Path("filter/{id}")   ? public void deleteFilter(@PathParam("id")long id) throws PublicationException {   ?? ??? ???DomainManager.getDomainManager().removeFilter(id);   ? }   ? @GET   ? @Path("subscriptions")   ? @Produces("application/json")   ? public SubscriptionsType getSubscriptions() throws PublicationException {   ?? ??? ???return DomainManager.getDomainManager().getSubscriptions();   ? }   ? @POST   ? @Path("subscription")   ? @Consumes("application/json")   ? public long addSubscription(SubscriptionType s) throws PublicationException {   ?? ??? ???return DomainManager.getDomainManager().addSubscription(s, null);   ? }   ? @DELETE   ? @Path("subscription/{id}")   ? public void deleteSubscription(@PathParam("id")long id) throws PublicationException {   ?? ??? ???DomainManager.getDomainManager().removeSubscription(id);   ? }   ? @POST   ? @Path("subscriptionFilters/{sid}")   ? @Consumes("application/json")   ? public long assignFilersToSubscription(@PathParam("sid")long sid, IDsType ids)throws PublicationException{   ?? ??? ???return DomainManager.getDomainManager().assignFilersToSubscription(sid, ids);   ? }?? ?   ? @POST   ? @Path("topic")   ? @Consumes("application/json")   ? public long addTopic(TopicType t) throws PublicationException {   ?? ??? ???return DomainManager.getDomainManager().addTopic(t, null);   ? }   ? @DELETE   ? @Path("topic/{id}")   ? public void deleteTopic(@PathParam("id")long id) throws PublicationException {   ?? ??? ???DomainManager.getDomainManager().removeTopic(id);   ? }   ? @POST   ? @Path("topicsubscription/{tid}")   ? @Consumes("application/json")   ? public void assignTopicHierarchy(@PathParam("tid")long tid, IDsType ids) throws PublicationException{   ?? ??? ???DomainManager.getDomainManager().assignTopicHierarchy(tid, ids);   ? }   ? @POST   ? @Path("topicsubscription/{tid}")   ? @Consumes("application/json")   ? public long assignTopicSubscriptions(@PathParam("tid")long tid, IDsType ids)throws PublicationException{   ?? ??? ???return DomainManager.getDomainManager().assignTopicSubscriptions(tid, ids);   ? } ~~~ 清單10:發布/訂閱服務 這些服務的使用者有消息發布者(publish方法)、服務訂閱者(創建/刪除語義過濾器,訂閱,還有訂閱和主題訂閱相關的過濾器)、內部的發布/訂閱實現(獲取同步的服務)和管理應用。 ## 結論 這個實現雖然簡單,但創建了一個非常強大、可擴展的發布/訂閱實現,同時利用了很多現有的AWS功能和少量的Java定制代碼。另外它還充分利用了現有AWS部署功能對負載均衡和容錯的支持。 ## 作者簡介 **Boris Lublinsky博士**是Nokia的主要架構師,參與大數據、SOA、BPM和中間件實現的相關工作。Boris去Nokia前是Herzum軟件的主要架構師,負責為客戶設計大型、可伸縮的SOA系統;在此之前,他是CNA保險的企業架構師,參與CNA集成和SOA策略的設計及實現,構建應用框架,實現面向服務的架構。Boris在企業技術架構和軟件工程方面有二十五年多的經驗。他是OASIS SOA RM委員會的活躍成員,和他人一起編著了《Applied SOA: Service-Oriented Architecture and Design Strategies》一書,另外他還寫了很多關于架構、編程、大數據、SOA和BPM的文章。 查看英文原文:[基于AWS技術實現發布/訂閱服務](http://www.infoq.com/articles/AmazonPubSub) 查看原文:[基于AWS技術實現發布/訂閱服務](http://www.infoq.com/cn/articles/AmazonPubSub)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看