FrameBuffer是ThriftNIO服務器端的一個核心組件,它一方面承擔了NIO編程中的緩沖區的功能,另一方面還承擔了RPC方法調用的職責。

FrameBufferState定義了FrameBuffer作為緩沖區的讀寫狀態
~~~
private enum FrameBufferState {
// in the midst of reading the frame size off the wire
// 讀Frame消息頭,實際是4字節表示Frame長度
?READING_FRAME_SIZE,
// reading the actual frame data now, but not all the way done yet
// 讀Frame消息體
?READING_FRAME,
// completely read the frame, so an invocation can now happen
// 讀滿包
?READ_FRAME_COMPLETE,
// waiting to get switched to listening for write events
// 等待注冊寫
?AWAITING_REGISTER_WRITE,
// started writing response data, not fully complete yet
// 寫半包
?WRITING,
// another thread wants this framebuffer to go back to reading
// 等待注冊讀
?AWAITING_REGISTER_READ,
// we want our transport and selection key invalidated in the selector
// thread
// 等待關閉
?AWAITING_CLOSE
}
~~~
值得注意的是,FrameBuffer讀數據時,
1. 先讀4字節的Frame消息頭,
2. 然后改變FrameBufferState,從READING_FRMAE_SIZE到READING_FRAME,并根據讀到的Frame長度修改Buffer的長度
3. 再次讀Frame消息體,如果讀完就修改狀態到READ_FRAME_COMPLETE,否則還是把FrameBuffer綁定到SelectionKey,下次繼續讀
~~~
public boolean read() {
if (state_ == FrameBufferState.READING_FRAME_SIZE) {
// try to read the frame size completely
if (!internalRead()) {
return false;
}
// if the frame size has been read completely, then prepare to read the
// actual frame.
if (buffer_.remaining() == 0) {
// pull out the frame size as an integer.
int frameSize = buffer_.getInt(0);
if (frameSize <= 0) {
LOGGER.error("Read an invalid frame size of " + frameSize
+ ". Are you using TFramedTransport on the client side?");
return false;
}
// if this frame will always be too large for this server, log the
// error and close the connection.
if (frameSize > MAX_READ_BUFFER_BYTES) {
LOGGER.error("Read a frame size of " + frameSize
+ ", which is bigger than the maximum allowable buffer size for ALL connections.");
return false;
}
// if this frame will push us over the memory limit, then return.
// with luck, more memory will free up the next time around.
if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
return true;
}
// increment the amount of memory allocated to read buffers
readBufferBytesAllocated.addAndGet(frameSize);
// reallocate the readbuffer as a frame-sized buffer
buffer_ = ByteBuffer.allocate(frameSize);
state_ = FrameBufferState.READING_FRAME;
} else {
// this skips the check of READING_FRAME state below, since we can't
// possibly go on to that state if there's data left to be read at
// this one.
return true;
}
}
// it is possible to fall through from the READING_FRAME_SIZE section
// to READING_FRAME if there's already some frame data available once
// READING_FRAME_SIZE is complete.
if (state_ == FrameBufferState.READING_FRAME) {
if (!internalRead()) {
return false;
}
// since we're already in the select loop here for sure, we can just
// modify our selection key directly.
if (buffer_.remaining() == 0) {
// get rid of the read select interests
selectionKey_.interestOps(0);
state_ = FrameBufferState.READ_FRAME_COMPLETE;
}
return true;
}
// if we fall through to this point, then the state must be invalid.
LOGGER.error("Read was called but state is invalid (" + state_ + ")");
return false;
}
~~~
internalRead方法實際調用了SocketChannel來讀數據。注意SocketChannel返回值小于0的情況:
n 有數據的時候返回讀取到的字節數。
0 沒有數據并且沒有達到流的末端時返回0。
-1 當達到流末端的時候返回-1。
當Channel有數據時并且是最后的數據 時,實際會讀兩次,第一次返回字節數,第二次返回-1。這個是底層Selector實現的。
~~~
private boolean internalRead() {
try {
if (trans_.read(buffer_) < 0) {
return false;
}
return true;
} catch (IOException e) {
LOGGER.warn("Got an IOException in internalRead!", e);
return false;
}
}
~~~
在看寫緩沖時的情況
1. 寫之前必須把FrameBuffer的狀態改成WRITING,后面會有具體例子
2. 如果沒寫任何數據,就返回false
3. 如果寫完了,就需要把SelectionKey注冊的寫事件取消。Thrift是直接把SelectionKey注冊事件改成讀了,而常用的做法一般是把寫事件取消就行了。關于更多NIO寫事件的注冊問題,看這篇:[http://blog.csdn.net/iter_zc/article/details/39291129](http://blog.csdn.net/iter_zc/article/details/39291129)
~~~
public boolean write() {
if (state_ == FrameBufferState.WRITING) {
try {
if (trans_.write(buffer_) < 0) {
return false;
}
} catch (IOException e) {
LOGGER.warn("Got an IOException during write!", e);
return false;
}
// we're done writing. now we need to switch back to reading.
if (buffer_.remaining() == 0) {
prepareRead();
}
return true;
}
LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
return false;
}
~~~
FrameBuffer可以根據SelectionKey的狀態來切換自身狀態,也可以根據自身狀態來選擇注冊的Channel事件
~~~
public void changeSelectInterests() {
if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
// set the OP_WRITE interest
selectionKey_.interestOps(SelectionKey.OP_WRITE);
state_ = FrameBufferState.WRITING;
} else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
prepareRead();
} else if (state_ == FrameBufferState.AWAITING_CLOSE) {
close();
selectionKey_.cancel();
} else {
LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
}
}
~~~
說完了FrameBuffer作為NIO緩沖區的功能,再看看它作為RPC方法調用模型的重要組件的功能。
FrameBuffer提供了invoker方法,當讀滿包時,從消息頭拿到要調用的方法,然后通過它管理的Processor來完成實際方法調用。然后切換到寫模式來寫消息體
具體的調用模型看這篇: [http://blog.csdn.net/iter_zc/article/details/39692951](http://blog.csdn.net/iter_zc/article/details/39692951)
~~~
public void invoke() {
TTransport inTrans = getInputTransport();
TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
try {
processorFactory_.getProcessor(inTrans).process(inProt, outProt);
responseReady();
return;
} catch (TException te) {
LOGGER.warn("Exception while invoking!", te);
} catch (Throwable t) {
LOGGER.error("Unexpected throwable while invoking!", t);
}
// This will only be reached when there is a throwable.
state_ = FrameBufferState.AWAITING_CLOSE;
requestSelectInterestChange();
}
public void responseReady() {
???? // the read buffer is definitely no longer in use, so we will decrement
???? // our read buffer count. we do this here as well as in close because
???? // we'd like to free this read memory up as quickly as possible for other
???? // clients.
???? readBufferBytesAllocated.addAndGet(-buffer_.array().length);
???? if (response_.len() == 0) {
?????? // go straight to reading again. this was probably an oneway method
?????? state_ = FrameBufferState.AWAITING_REGISTER_READ;
?????? buffer_ = null;
???? } else {
?????? buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
?????? // set state that we're waiting to be switched to write. we do this
?????? // asynchronously through requestSelectInterestChange() because there is
?????? // a possibility that we're not in the main thread, and thus currently
?????? // blocked in select(). (this functionality is in place for the sake of
?????? // the HsHa server.)
?????? state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
???? }
???? requestSelectInterestChange();
?? }
~~~
寫消息體responseReday()方法時,我們看到Thrift是如何處理寫的
1. 創建ByteBuffer
2. 修改狀態到AWAITING_REGISTER_WRITE
3. 調用requestSelecInteresetChange()方法來注冊Channel的寫事件
4. 當Selector根據isWriteable狀態來調用要寫的Channel時,會調用FrameBuffer的write方法,上面說了write方法寫滿包后,會取消注冊的寫事件。