A. rocketmq 發送失敗一般怎麼處理
一:RocketMQ簡介
RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:
1.能夠保證嚴格的消息順序
2.提供豐富的消息拉取模式
3.高效的訂閱者水平擴展能力
4.實時的消息訂閱機制
5.億級消息堆積能力
二:安裝RocketMQ
下載源碼
首先我們從githup上獲取RocketMQ的源碼,目前最新的版本為3.5.8,下載地址為: 或者 wget /alibaba/RocketMQ/archive/v3.5.8.tar.gz。請注意:此時我們下載的是源碼,直接解壓時不能用的,所以我們需要編譯之後才能使用。
編譯源碼
在進行編譯源碼之前我們需要安裝JDK。如果你已經安裝過了,請跳過這里。如果你還沒有安裝過JDK,請參考這篇文章(Linux環境下安裝JDK)。然後我們還需要安裝一下Maven。Maven的安裝還是比較簡單,只需要去官方上下載的安裝吧,然後直接解壓,再配置一下環境變數就OK。接下來我們把剛才下載來的RockeMQ的源碼解壓到/usr/local/rockemq-source文件夾中。在源碼中有一個Install.sh。如圖所示:
。運行sh install.sh。在編譯完成之後,我們只要target目錄下的alibaba-rocketmq這個文件夾中內容,把alibaba-rocketmq文件夾中的內容移動到/usr/local/rocketmq中。如果你不想編譯的話,可以從這里下載編譯之後的rocketmq。(rocketmq3.5.8)。
配置環境變數
接下來我們需要配置一下環境變數。在終端中輸入以下命令:vi /etc/profile ,在文件的末尾中添加如下兩句話:export rocketmq=/usr/local/rocketmq export PATH=$PATH:$rocketmq/bin。接下來我們使配置的換將變數生效:source /etc/profile.
三:啟動RocketMQ
接下來我們啟動一下剛才編譯的RocketMQ.在啟動之前我們需要修改一下RocketMQ啟動的內存大小(如果你的系統內存比較大的話,請忽略)。我們進入到/usr/local/rocketmq/bin中,在終端中輸入以下命令修改mqnamesrv的內存大小:vi runserver.sh.修改為如圖的內容:
,接下來修改broker的內存大小:vi runbroker.sh:
啟動mqnameserver
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最後的這個 & 不要少。
啟動mqbroker
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以換成你剛才啟動mqnamesrv的IP。autoCreateTopicEnable=true
這句話不要少了。最後的 & 也不要少了。
我們可以通過 ps aux | grep java命令來查看啟動的情況。
到此,rocketmq的安裝完畢。
四:RocketMQ的小例子
procer:
[java] view plain
package com.zkn.newlearn.rocketmq;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* Created by zkn on 2016/10/27.
*/
public class ProcerTest01 {
public static void main(String[] args) {
/**
* 一個應用創建一個Procer,由應用來維護此對象,可以設置為全局對象或者單例<br>
* 注意:ProcerGroupName需要由應用來保證唯一<br>
* ProcerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
* 因為伺服器會回查這個Group下的任意一個Procer
*/
DefaultMQProcer procer = new DefaultMQProcer("ProcerGroupName");
//procer.setNamesrvAddr("192.168.180.1:9876");
procer.setNamesrvAddr("192.168.180.133:9876");
procer.setInstanceName("Procer");
/**
* Procer對象在使用之前必須要調用start初始化,初始化一次即可<br>
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
try {
procer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i = 0; i < 100; i++) {
try {
/**
* 下面這段代碼表明一個Procer對象可以發送多個topic,多個tag的消息。
* 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,<br>
* 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,<br>
* 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。
*/
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2",
"TagB",
"OrderID001",
("Hello MetaQ TagB".getBytes()));
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3",
"TagC",
"OrderID001",
("Hello MetaQ TagC").getBytes());
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
TimeUnit.MILLISECONDS.sleep(1000);
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
/**
* 應用退出時,要調用shutdown來清理資源,關閉網路連接,從MetaQ伺服器上注銷自己
* 注意:我們建議應用在JBOSS、Tomcat等容器的退出銷毀方法里調用shutdown方法
*/
procer.shutdown();
}
}
B. rocketmq的push模式會無限拉取消息嗎
DefaultMQPushConsumerImpl中各個對象的主要功能如下:
RebalancePushImpl:主要負責決定,當前的consumer應該從哪些Queue中消費消息;
1)PullAPIWrapper:長連接,負責從broker處拉取消息,然後利用ConsumeMessageService回調用戶的Listener執行消息消費邏輯;
2)ConsumeMessageService:實現所謂的"Push-被動"消費機制;從Broker拉取的消息後,封裝成ConsumeRequest提交給ConsumeMessageSerivce,此service負責回調用戶的Listener消費消息;
3)OffsetStore:維護當前consumer的消費記錄(offset);有兩種實現,Local和Rmote,Local存儲在本地磁碟上,適用於BROADCASTING廣播消費模式;而Remote則將消費進度存儲在Broker上,適用於CLUSTERING集群消費模式;
4)MQClientFactory:大雜燴,負責管理client(consumer、procer),並提供多中功能介面供各個Service(Rebalance、PullMessage等)調用;大部分邏輯均在這個類中完成;
1 消息拉取入口
調用DefaultMQPushConsumerImpl.pullMessage(PullRequest pullRequest)方法進行消息的拉取。該方法的大致邏輯如下:
1、檢查PullRequest對象中的ProcessQueue對象的dropped是否為true(在RebalanceService線程中為topic下的MessageQueue創建拉取消息請求時要維護對應的ProcessQueue對象,若Consumer不再訂閱該topic則會將該對象的dropped置為true);若是則認為該請求是已經取消的,則直接跳出該方法;
2、更新PullRequest對象中的ProcessQueue對象的時間戳(ProcessQueue.lastPullTimestamp)為當前時間戳;
3、檢查該Consumer是否運行中,即DefaultMQPushConsumerImpl.serviceState是否為RUNNING;若不是運行狀態或者是暫停狀態(DefaultMQPushConsumerImpl.pause=true),則調用PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)方法延遲再拉取消息,其中timeDelay=3000;該方法的目的是在3秒之後再次將該PullRequest對象放入PullMessageService. pullRequestQueue隊列中;並跳出該方法;
4、進行流控。若ProcessQueue對象的msgCount大於了消費端的流控閾值(DefaultMQPushConsumer.pullThresholdForQueue,默認值為1000),則調用PullMessageService.executePullRequestLater方法,在50毫秒之後重新該PullRequest請求放入PullMessageService.pullRequestQueue隊列中;並跳出該方法;
5、若不是順序消費(即DefaultMQPushConsumerImpl.consumeOrderly等於false),則檢查ProcessQueue對象的msgTreeMap:TreeMap<Long,MessageExt>變數的第一個key值與最後一個key值之間的差額,該key值表示查詢的隊列偏移量queueoffset;若差額大於閾值(由DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默認是2000),則調用PullMessageService.executePullRequestLater方法,在50毫秒之後重新將該PullRequest請求放入PullMessageService.pullRequestQueue隊列中;並跳出該方法;
6、以PullRequest.messageQueue對象的topic值為參數從RebalanceImpl.subscriptionInner: ConcurrentHashMap<String /* topic */, SubscriptionData>中獲取對應的SubscriptionData對象,若該對象為null,考慮到並發的關系,調用executePullRequestLater方法,稍後重試;並跳出該方法;
7、若消息模型為集群模式(RebalanceImpl.messageModel等於CLUSTERING),則以PullRequest對象的MessageQueue變數值、type =READ_FROM_MEMORY(從內存中獲取消費進度offset值)為參數調用DefaultMQPushConsumerImpl. offsetStore對象(初始化為RemoteBrokerOffsetStore對象)的readOffset(MessageQueue mq, ReadOffsetType type)方法從本地內存中獲取消費進度offset值。若該offset值大於0 則置臨時變數commitOffsetEnable等於true否則為false;該offset值作為pullKernelImpl方法中的commitOffset參數,在Broker端拉取消息之後根據commitOffsetEnable參數值決定是否用該offset更新消息進度。該readOffset方法的邏輯是:以入參MessageQueue對象從RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap <MessageQueue,AtomicLong>變數中獲取消費進度偏移量;若該偏移量不為null則返回該值,否則返回-1;
8、當每次拉取消息之後需要更新訂閱關系(由DefaultMQPushConsumer. postSubscriptionWhenPull參數表示,默認為false)並且以topic值參數從RebalanceImpl.subscriptionInner獲取的SubscriptionData對象的classFilterMode等於false(默認為false),則將sysFlag標記的第3個位元組置為1,否則該位元組置為0;
9、該sysFlag標記的第1個位元組置為commitOffsetEnable的值;第2個位元組(suspend標記)置為1;第4個位元組置為classFilterMode的值;
10、 初始化匿名內部類PullCallback,實現了onSucess/onException方法; 該方法只有在非同步請求的情況下才會回調;
11、調用底層的拉取消息API介面:PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法進行消息拉取操作(詳見5.9小節)。將回調類PullCallback傳入該方法中,當採用非同步方式拉取消息(詳見5.10.2小節)時,在收到響應之後會回調該回調類的方法。
2 拉取消息的底層API介面(PullAPIWrapper.pullKernelImpl)
PUSH模式和PULL模式下的拉取消息的操作均調用PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法完成。該方法的大致邏輯如下:
1、獲取Broker的ID。以入參MessageQueue對象為參數調用PullAPIWrapper.recalculatePullFromWhichNode(MessageQueue mq)方法,在該方法中,先判斷PullAPIWrapper.connectBrokerByUser變數是否為true(在FiltersrvController中啟動時會設置為true,默認為false),若是則直接返回0(主用Broker的brokerId);否則以MessageQueue對象為參數從PullAPIWrapper.pullFromWhichNodeTable:ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */>獲取brokerId,若該值不為null則返回該值,否則返回0(主用Broker的brokerId);
2、調用MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker) 方法查找Broker地址,其中onlyThisBroker=false,表示若指定的brokerId未獲取到地址則獲取其他BrokerId的地址也行。在該方法中根據brokerName和brokerId參數從MQClientInstance.brokerAddrTable: ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>>變數中獲取對應的Broker地址,若獲取不到則從brokerName下面的Map列表中找其他地址返回即可;
3、若在上一步未獲取到Broker地址,則以topic參數調用MQClientInstance.(String topic)方法,然後在執行第2步的操作,直到獲取到Broker地址為止;
4、若獲取的Broker地址是備用Broker,則將標記位sysFlag的第1個位元組置為0,即在消費完之後不提交消費進度;
5、檢查標記位sysFlag的第4個位元組(即SubscriptionData. classFilterMode)是否為1;若等於1,則調用PullAPIWrapper.(String topic, String brokerAddr)方法獲取Filter伺服器地址。大致邏輯如下:
5.1)根據topic參數值從MQClientInstance.topicRouteTable: ConcurrentHashMap<String/*Topic*/,TopicRouteData>變數中獲取TopicRouteData對象,
5.2)以Broker地址為參數從該TopicRouteData對象的filterServerTable:HashMap<String/* brokerAddr*/,List<String>/* FilterServer*/>變數中獲取該Broker下面的所有Filter伺服器地址列表;
5.3)若該地址列表不為空,則隨機選擇一個Filter伺服器地址返回;否則向調用層拋出異常,該pullKernelImpl方法結束;
6、構建PullMessageRequestHeader對象,其中queueOffset變數值等於入參offset;
7、若執行了第5步則向獲取的Filter伺服器發送PULL_MESSAGE請求信息,否則向Broker發送PULL_MESSAGE請求信息。初始化PullMessageRequestHeader對象,然後調用MQClientAPIImpl.pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法向Broker地址或者Filter地址發送PULL_MESSAGE請求信息(詳見5.10小節);
3 發送遠程請求拉取消息的邏輯(PULL_MESSAGE)
在MQClientAPIImpl.pullMessage方法中,根據入參communicationMode的值分為非同步拉取和同步拉取方式兩種。
無論是非同步方式拉取還是同步方式拉取,在發送拉取請求之前都會構造一個ResponseFuture對象,以請求消息的序列號為key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap<Integer /* opaque */, ResponseFuture>變數中,對該變數有幾種情況會處理:
1、發送失敗後直接刪掉responseTable變數中的相應記錄;
2、收到響應消息之後,會以響應消息中的序列號(由服務端根據請求消息的序列號原樣返回)從responseTable中查找ResponseFuture對象,並設置該對象的responseCommand變數。若是同步發送會喚醒等待響應的ResponseFuture.waitResponse方法;若是非同步發送會調用ResponseFuture.executeInvokeCallback()方法完成回調邏輯處理;
3、在NettyRemotingClient.start()啟動時,也會初始化定時任務,該定時任務每隔1秒定期掃描responseTable列表,遍歷該列表中的ResponseFuture對象,檢查等待響應是否超時,若超時,則調用ResponseFuture. executeInvokeCallback()方法,並將該對象從responseTable列表中刪除;
3.1 同步方式拉取消息
對於同步發送方式,調用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法。大致步驟如下:
1、調用RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法:
1.1)獲取Broker地址的Channel信息。根據broker地址從RemotingClient.channelTables:ConcurrentHashMap<String /* addr */, ChannelWrapper>變數中獲取ChannelWrapper對象並返回該對象的Channel變數;若沒有ChannelWrapper對象則與broker地址建立新的連接並將連接信息存入channelTables變數中,便於下次使用;
1.2)若NettyRemotingClient.rpcHook:RPCHook變數不為空(該變數在應用層初始化DefaultMQPushConsumer或者DefaultMQPullConsumer對象傳入該值),則調用RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
1.3)調用NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,該方法的邏輯如下:
A)使用請求的序列號(opaue)、超時時間初始化ResponseFuture對象;並將該ResponseFuture對象存入NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer /* opaque */, ResponseFuture>變數中;
B)調用Channel.writeAndFlush(Object msg)方法將請求對象RemotingCommand發送給Broker;然後調用addListener(GenericFutureListener<? extends Future<? super Void>> listener)方法添加內部匿名類:該內部匿名類實現了ChannelFutureListener介面的operationComplete方法,在發送完成之後回調該監聽類的operationComplete方法,在該方法中,首先調用ChannelFuture. isSuccess()方法檢查是否發送成功,若成功則置ResponseFuture對象的sendRequestOK等於true並退出此回調方法等待響應結果;若不成功則置ResponseFuture對象的sendRequestOK等於false,然後從NettyRemotingAbstract.responseTable中刪除此請求序列號(opaue)的記錄,置ResponseFuture對象的responseCommand等於null,並喚醒ResponseFuture.waitResponse(long timeoutMillis)方法的等待;
C)調用ResponseFuture.waitResponse(long timeoutMillis)方法等待響應結果;在發送失敗或者收到響應消息(詳見5.10.3小節)或者超時的情況下會喚醒該方法返回ResponseFuture.responseCommand變數值;
D)若上一步返回的responseCommand值為null,則拋出異常:若ResponseFuture.sendRequestOK為true,則拋出RemotingTimeoutException異常,否則拋出RemotingSendRequestException異常;
E)若上一步返回的responseCommand值不為null,則返回responseCommand變數值;
1.4)若NettyRemotingClient.rpcHook: RPCHook變數不為空,則調用RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法;
2、以上一步的返回值RemotingCommand對象為參數調用MQClientAPIImpl. processPullResponse (RemotingCommand response)方法將返回對象解析並封裝成PullResultExt對象然後返回給調用者,響應消息的結果狀態轉換如下:
2.1)若RemotingCommand對象的Code等於SUCCESS,則PullResultExt.pullStatus=FOUND;
2.2)若RemotingCommand對象的Code等於PULL_NOT_FOUND,則PullResultExt.pullStatus= NO_NEW_MSG;
2.3)若RemotingCommand對象的Code等於PULL_RETRY_IMMEDIATELY,則PullResultExt.pullStatus= NO_MATCHED_MSG;
2.3)若RemotingCommand對象的Code等於PULL_OFFSET_MOVED,則PullResultExt.pullStatus= OFFSET_ILLEGAL;
3.2 非同步方式拉取消息
對於非同步方式拉取消息,調用MQClientAPIImpl.pullMessageAsync(String addr, RemotingCommand request, long timeoutMillis, PullCallback pullCallback)方法。大致邏輯如下:
1、定義了一個內部匿名InvokeCallback類並實現operationComplete (ResponseFuture responseFuture)方法;該方法的大致邏輯如下:
1.1)從入參ResponseFuture對象中獲取傳輸的響應對象RemotingCommand;
1.2)若該響應對象RemotingCommand不為空;則首先調用MQClientAPIImpl. processPullResponse (RemotingCommand response)方法對返回對象解析並封裝成PullResultExt對象,其中PullResultExt.messageBinary等於響應消息的body;然後以PullResultExt對象為參數調用回調類PullCallback對象的onSuccess方法(調用應用層定義的回調方法,詳見5.5.2小節),若在此過程中出現異常則調用PullCallback對象的onException方法(調用應用層定義的回調方法);
1.3)若該返回對象RemotingCommand為空;則檢查ResponseFuture.sendRequestOK是否為true,若不是則發送請求失敗;若發生成功再檢查是否等待超時;對於每種異常情況均調用PullCallback對象的onException方法由應用層來處理異常情況;
2、以匿名類InvokeCallback為參數調用NettyRemotingClient.invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)方法,大致邏輯如下:
2.1)獲取Broker地址的Channel信息。根據broker地址從RemotingClient.channelTables: ConcurrentHashMap<String /* addr */, ChannelWrapper>變數中獲取ChannelWrapper對象並返回該對象的Channel變數;若沒有ChannelWrapper對象則與broker地址建立新的連接並將連接信息存入channelTables變數中,便於下次使用;
2.2)若NettyRemotingClient.rpcHook: RPCHook變數不為空(該變數在應用層初始化DefaultMQPushConsumer或者DefaultMQPullConsumer對象傳入該值),則調用RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
2.3)調用NettyRemotingAbstract.invokeAsyncImpl(Channel channel, RemotingCommand request,long timeoutMillis,InvokeCallback invokeCallback)方法,該方法的大致邏輯如下:
A)利用java.util.concurrent.Semaphore.tryAcquire(long timeout,TimeUnitunit)獲取信號量,保證該方法的業務邏輯同時執行的線程個數;
B)使用請求的序列號(opaue)、超時時間、InvokeCallback對象、 用Semaphore初始化的SemaphoreReleaseOnlyOnce對象(該對象是確保在釋放信號量是只釋放一次)初始化ResponseFuture對象,並根據請求的序列號(opaue)作為key值,將該ResponseFuture對象存入NettyRemotingAbstract. responseTable對象中;
C)調用Channel.writeAndFlush(Object msg)方法將請求對象發送給Broker,並且添加監聽器,再消息發送完成之後回調該監聽器,該監聽器是內部匿名類,該類實現了ChannelFutureListener介面的operationComplete(ChannelFuture f)方法,該方法的邏輯如下:
C.1)首先調用ChannelFuture.isSuccess()方法檢查是否發送成功,若成功則置ResponseFuture對象的sendRequestOK等於true並退出此回調方法等待對方的響應消息;若不成功則置ResponseFuture對象的sendRequestOK等於false,然後繼續執行下面的邏輯,主要目的是立即向應用層返回發送失敗的響應消息,無需再等待對方的響應結果;
C.2)根據請求的序列號(opaue)從responseTable中刪除相應的ResponseFuture對象記錄;
C.3)將ResponseFuture.responseCommand變數置為null;
C.4)調用ResponseFuture.executeInvokeCallback()方法,在該方法中執行InvokeCallback.OperationComplete(ResponseFuture)方法完成回調工作,在executeInvokeCallback方法之前,確保ResponseFuture. executeCallbackOnlyOnce的值為false並且成功更新為true,由於executeCallbackOnlyOnce在初始化時為false,若更新失敗說明該回調方法已經執行過了,故不在執行;
C.5)最後調用SemaphoreReleaseOnlyOnce對象的realse,釋放信號量;
————————————————
版權聲明:本文為CSDN博主「meilong_whpu」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/meilong_whpu/java/article/details/77076298
C. java的動態代理為什麼要用介面
代理模式的定義:為其他對象提供一種代理以控制對這個對象的訪問。在某內些情況下,容一個對象不適合或者不能直接引用另一個對象,而代理對象可以在客戶端和目標對象之間起到中介的作用。
介面是一種規范,定義了一組相似的行為。
通俗一些就是,當調用代理類的方法時與調用被代理類的方法時在寫法上是沒有任何區別的,只有介面才能保證這種一致性。