<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                FrameBuffer是ThriftNIO服務器端的一個核心組件,它一方面承擔了NIO編程中的緩沖區的功能,另一方面還承擔了RPC方法調用的職責。 ![](https://box.kancloud.cn/2016-02-19_56c6c62b9330f.jpg) FrameBufferState定義了FrameBuffer作為緩沖區的讀寫狀態 ~~~ private enum FrameBufferState { // in the midst of reading the frame size off the wire // 讀Frame消息頭,實際是4字節表示Frame長度 ?READING_FRAME_SIZE, // reading the actual frame data now, but not all the way done yet // 讀Frame消息體 ?READING_FRAME, // completely read the frame, so an invocation can now happen // 讀滿包 ?READ_FRAME_COMPLETE, // waiting to get switched to listening for write events // 等待注冊寫 ?AWAITING_REGISTER_WRITE, // started writing response data, not fully complete yet // 寫半包 ?WRITING, // another thread wants this framebuffer to go back to reading // 等待注冊讀 ?AWAITING_REGISTER_READ, // we want our transport and selection key invalidated in the selector // thread // 等待關閉 ?AWAITING_CLOSE } ~~~ 值得注意的是,FrameBuffer讀數據時, 1. 先讀4字節的Frame消息頭, 2. 然后改變FrameBufferState,從READING_FRMAE_SIZE到READING_FRAME,并根據讀到的Frame長度修改Buffer的長度 3. 再次讀Frame消息體,如果讀完就修改狀態到READ_FRAME_COMPLETE,否則還是把FrameBuffer綁定到SelectionKey,下次繼續讀 ~~~ public boolean read() { if (state_ == FrameBufferState.READING_FRAME_SIZE) { // try to read the frame size completely if (!internalRead()) { return false; } // if the frame size has been read completely, then prepare to read the // actual frame. if (buffer_.remaining() == 0) { // pull out the frame size as an integer. int frameSize = buffer_.getInt(0); if (frameSize <= 0) { LOGGER.error("Read an invalid frame size of " + frameSize + ". Are you using TFramedTransport on the client side?"); return false; } // if this frame will always be too large for this server, log the // error and close the connection. if (frameSize > MAX_READ_BUFFER_BYTES) { LOGGER.error("Read a frame size of " + frameSize + ", which is bigger than the maximum allowable buffer size for ALL connections."); return false; } // if this frame will push us over the memory limit, then return. // with luck, more memory will free up the next time around. if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) { return true; } // increment the amount of memory allocated to read buffers readBufferBytesAllocated.addAndGet(frameSize); // reallocate the readbuffer as a frame-sized buffer buffer_ = ByteBuffer.allocate(frameSize); state_ = FrameBufferState.READING_FRAME; } else { // this skips the check of READING_FRAME state below, since we can't // possibly go on to that state if there's data left to be read at // this one. return true; } } // it is possible to fall through from the READING_FRAME_SIZE section // to READING_FRAME if there's already some frame data available once // READING_FRAME_SIZE is complete. if (state_ == FrameBufferState.READING_FRAME) { if (!internalRead()) { return false; } // since we're already in the select loop here for sure, we can just // modify our selection key directly. if (buffer_.remaining() == 0) { // get rid of the read select interests selectionKey_.interestOps(0); state_ = FrameBufferState.READ_FRAME_COMPLETE; } return true; } // if we fall through to this point, then the state must be invalid. LOGGER.error("Read was called but state is invalid (" + state_ + ")"); return false; } ~~~ internalRead方法實際調用了SocketChannel來讀數據。注意SocketChannel返回值小于0的情況: n 有數據的時候返回讀取到的字節數。 0 沒有數據并且沒有達到流的末端時返回0。 -1 當達到流末端的時候返回-1。 當Channel有數據時并且是最后的數據 時,實際會讀兩次,第一次返回字節數,第二次返回-1。這個是底層Selector實現的。 ~~~ private boolean internalRead() { try { if (trans_.read(buffer_) < 0) { return false; } return true; } catch (IOException e) { LOGGER.warn("Got an IOException in internalRead!", e); return false; } } ~~~ 在看寫緩沖時的情況 1. 寫之前必須把FrameBuffer的狀態改成WRITING,后面會有具體例子 2. 如果沒寫任何數據,就返回false 3. 如果寫完了,就需要把SelectionKey注冊的寫事件取消。Thrift是直接把SelectionKey注冊事件改成讀了,而常用的做法一般是把寫事件取消就行了。關于更多NIO寫事件的注冊問題,看這篇:[http://blog.csdn.net/iter_zc/article/details/39291129](http://blog.csdn.net/iter_zc/article/details/39291129) ~~~ public boolean write() { if (state_ == FrameBufferState.WRITING) { try { if (trans_.write(buffer_) < 0) { return false; } } catch (IOException e) { LOGGER.warn("Got an IOException during write!", e); return false; } // we're done writing. now we need to switch back to reading. if (buffer_.remaining() == 0) { prepareRead(); } return true; } LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); return false; } ~~~ FrameBuffer可以根據SelectionKey的狀態來切換自身狀態,也可以根據自身狀態來選擇注冊的Channel事件 ~~~ public void changeSelectInterests() { if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) { // set the OP_WRITE interest selectionKey_.interestOps(SelectionKey.OP_WRITE); state_ = FrameBufferState.WRITING; } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) { prepareRead(); } else if (state_ == FrameBufferState.AWAITING_CLOSE) { close(); selectionKey_.cancel(); } else { LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")"); } } ~~~ 說完了FrameBuffer作為NIO緩沖區的功能,再看看它作為RPC方法調用模型的重要組件的功能。 FrameBuffer提供了invoker方法,當讀滿包時,從消息頭拿到要調用的方法,然后通過它管理的Processor來完成實際方法調用。然后切換到寫模式來寫消息體 具體的調用模型看這篇: [http://blog.csdn.net/iter_zc/article/details/39692951](http://blog.csdn.net/iter_zc/article/details/39692951) ~~~ public void invoke() { TTransport inTrans = getInputTransport(); TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); try { processorFactory_.getProcessor(inTrans).process(inProt, outProt); responseReady(); return; } catch (TException te) { LOGGER.warn("Exception while invoking!", te); } catch (Throwable t) { LOGGER.error("Unexpected throwable while invoking!", t); } // This will only be reached when there is a throwable. state_ = FrameBufferState.AWAITING_CLOSE; requestSelectInterestChange(); } public void responseReady() { ???? // the read buffer is definitely no longer in use, so we will decrement ???? // our read buffer count. we do this here as well as in close because ???? // we'd like to free this read memory up as quickly as possible for other ???? // clients. ???? readBufferBytesAllocated.addAndGet(-buffer_.array().length); ???? if (response_.len() == 0) { ?????? // go straight to reading again. this was probably an oneway method ?????? state_ = FrameBufferState.AWAITING_REGISTER_READ; ?????? buffer_ = null; ???? } else { ?????? buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); ?????? // set state that we're waiting to be switched to write. we do this ?????? // asynchronously through requestSelectInterestChange() because there is ?????? // a possibility that we're not in the main thread, and thus currently ?????? // blocked in select(). (this functionality is in place for the sake of ?????? // the HsHa server.) ?????? state_ = FrameBufferState.AWAITING_REGISTER_WRITE; ???? } ???? requestSelectInterestChange(); ?? } ~~~ 寫消息體responseReday()方法時,我們看到Thrift是如何處理寫的 1. 創建ByteBuffer 2. 修改狀態到AWAITING_REGISTER_WRITE 3. 調用requestSelecInteresetChange()方法來注冊Channel的寫事件 4. 當Selector根據isWriteable狀態來調用要寫的Channel時,會調用FrameBuffer的write方法,上面說了write方法寫滿包后,會取消注冊的寫事件。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看