五、RabbitMQ-客户端源码之AMQChannel
作者:朱小厮 | 出自:https://hiddenpps.blog.csdn.net/column/info/14800
AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法:
/**
* Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method
* returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as
* usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.
* @param command the command to handle asynchronously
* @return true if we handled the command; otherwise the caller should consider it "unhandled"
*/
public abstract boolean processAsync(Command command) throws IOException;
有关processAsync()这个方法的会在介绍ChannelN类的时候详细阐述([[八]RabbitMQ-客户端源码之ChannelN][RabbitMQ-_ChannelN])。
首先来说下AMQChannel的成员变量:
protected final Object _channelMutex = new Object();
/** The connection this channel is associated with. */
private final AMQConnection _connection;
/** This channel's channel number. */
private final int _channelNumber;
/** Command being assembled */
private AMQCommand _command = new AMQCommand();
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcContinuation _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
- _channelMutex这个是内部用来当对象锁的,没有实际的意义,可忽略
- _connection是指AMQConnection这个对象。
- _channelNumber是指channel number, 这个应该不用多解释了吧。通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
- _command是内部处理使用的对象,调用AMQCommand的方法来处理一些东西。
- _activeRpc是指当前未处理完的rpc请求(the current outstanding rpc request)。
- _blockContent 是在Channel.Flow里用到的,其余情况都是false
在AMQChannel的构造函数中,只有两个参数:AMQConnection connection以及int channelNumber.
AMQChannel中有个handleFrame方法:
/**
* Private API - When the Connection receives a Frame for this
* channel, it passes it to this method.
* @param frame the incoming frame
* @throws IOException if an error is encountered
*/
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);
}
/**
* Private API - handle a command which has been assembled
* @throws IOException if there's any problem
*
* @param command the incoming command
* @throws IOException
*/
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
// First, offer the command to the asynchronous-command
// handling mechanism, which gets to act as a filter on the
// incoming command stream. If processAsync() returns true,
// the command has been dealt with by the filter and so should
// not be processed further. It will return true for
// asynchronous commands (deliveries/returns/other events),
// and false for commands that should be passed on to some
// waiting RPC continuation.
if (!processAsync(command)) {
// The filter decided not to handle/consume the command,
// so it must be some reply to an earlier RPC.
nextOutstandingRpc().handleCommand(command);
markRpcFinished();
}
}
这个在[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]有所介绍,主要是用来处理Frame帧的,当调用AMQCommand的handleFrame处理之后返回为true是,即处理完毕时继续调用handleCompleteInboundCommand方法。这其中也牵涉到AMQConnection的MainLoop内部类,具体可以看看:[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]。
AMQChannel中有很多方法带有rpc的字样,这来做一个整理。
首先是:
public void enqueueRpc(RpcContinuation k)
synchronized (_channelMutex) {
boolean waitClearedInterruptStatus = false;
while (_activeRpc != null) {
try {
_channelMutex.wait();
} catch (InterruptedException e) {
waitClearedInterruptStatus = true;
}
}
if (waitClearedInterruptStatus) {
Thread.currentThread().interrupt();
}
_activeRpc = k;
}
}
这个方法在AMQConnection.start()方法中有过使用:_channel0.enqueueRpc(conStartBroker)。这个方法就是将参数付给成员变量_activeRpc,至于这个RpcContinuation到底是个什么gui,我们下面再讲。
继续下一个方法:
public boolean isOutstandingRpc()
synchronized (_channelMutex) {
return (_activeRpc != null);
}
}
这个方法是判断一下当前的_activeRpc是否为null,为null则为false,否则为true。看方法的名字应该猜出大半。
下面一个方法:
public RpcContinuation nextOutstandingRpc()
synchronized (_channelMutex) {
RpcContinuation result = _activeRpc;
_activeRpc = null;
_channelMutex.notifyAll();
return result;
}
}
方法将当前的_activeRpc返回,并置AQMChannel的_activeRpc为null。
接下来几个方法联系性很强:
/**
* Protected API - sends a {@link Method} to the broker and waits for the
* next in-bound Command from the broker: only for use from
* non-connection-MainLoop threads!
*/
public AMQCommand rpc(Method m)
throws IOException, ShutdownSignalException
return privateRpc(m);
public AMQCommand rpc(Method m, int timeout)
throws IOException, ShutdownSignalException, TimeoutException {
return privateRpc(m, timeout);
private AMQCommand privateRpc(Method m)
throws IOException, ShutdownSignalException
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
rpc(m, k);
// At this point, the request method has been sent, and we
// should wait for the reply to arrive.
//
// Calling getReply() on the continuation puts us to sleep
// until the connection's reader-thread throws the reply over
// the fence.
return k.getReply();
private AMQCommand privateRpc(Method m, int timeout)
throws IOException, ShutdownSignalException, TimeoutException {
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
rpc(m, k);
return k.getReply(timeout);
public void rpc(Method m, RpcContinuation k)
throws IOException
synchronized (_channelMutex) {
ensureIsOpen();
quiescingRpc(m, k);
}
public void quiescingRpc(Method m, RpcContinuation k)
throws IOException
synchronized (_channelMutex) {
enqueueRpc(k);
quiescingTransmit(m);
}
}
主要是看最后一个方法——quiescingRpc.这个方法说白就两行代码:
enqueueRpc(k);是将由privateRpc等方法内部创建的SimpleBlockingRpcContinuation对象附给当前的AQMChannel对象的成员变量_activeRpc
关于quiescingTransmit(m)就要接下去看了:
public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
quiescingTransmit(new AMQCommand(m));
}
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
} catch (InterruptedException e) {}
// This is to catch a situation when the thread wakes up during
// shutdown. Currently, no command that has content is allowed
// to send anything in a closing state.
ensureIsOpen();
}
}
c.transmit(this);
}
}
上面代码只需要看: c.transmit(this);这一句,其余的都是摆设。看到这里,就调用了AMQCommand的transmit方法,这个transmit方法就是讲AMQChannel中封装的内容发给broker,然后等待broker返回,进而通过之前附值的_activeRpc来处理回传的帧。
虽然之前在AMQConnection([[二]RabbitMQ-客户端源码之AMQConnection][RabbitMQ-_AMQConnection])中详细讲述了start()方法,但是这里还是要来拿这个来举例这个AMQChannel中的rpc怎么使用
在AMQConnection中有这么一段代码:
Method method = (challenge == null)
? new AMQP.Connection.StartOk.Builder()
.clientProperties(_clientProperties)
.mechanism(sm.getName())
.response(response)
.build()
: new AMQP.Connection.SecureOk.Builder().response(response).build();
try {
Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();
if (serverResponse instanceof AMQP.Connection.Tune) {
connTune = (AMQP.Connection.Tune) serverResponse;
} else {
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
response = sm.handleChallenge(challenge, this.username, this.password);
}
客户端将Method封装成Connection.StartOk帧之后等待broker返回Connection.Tune帧。
此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。
注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切的来说是MainLoop线程处理之后返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。
AQMChannel还有些其他的内容,都是边缘性的东西,这里还剩下个RpcContinuation要着重阐述下的:
public interface RpcContinuation {
void handleCommand(AMQCommand command);
void handleShutdownSignal(ShutdownSignalException signal);
public static abstract class BlockingRpcContinuation
implements RpcContinuation {
public final BlockingValueOrException
_blocker =
new BlockingValueOrException
();
public void handleCommand(AMQCommand command) {
_blocker.setValue(transformReply(command));
}
public void handleShutdownSignal(ShutdownSignalException signal) {
_blocker.setException(signal);
}
public T getReply() throws ShutdownSignalException
{
return _blocker.uninterruptibleGetValue();
}
public T getReply(int timeout)
throws ShutdownSignalException, TimeoutException
{
return _blocker.uninterruptibleGetValue(timeout);
}
public abstract T transformReply(AMQCommand command);
public static class SimpleBlockingRpcContinuation
extends BlockingRpcContinuation
public AMQCommand transformReply(AMQCommand command) {
return command;
}
RPCContinuation只是一个接口,而BlockingRpcContinuation这个抽象类缺似乎略有门道。而SimpleBlockingRpcContinuation只是将BlockingRpcContinuation中的handleCommand方法便成为:
_blocker.setValue(command);
BlockingRpcContinuation类主要操纵了BlockingValueOrException _blocker这个成员变量。再接下深究BlockingValueOrException其实是继承了BlockingCell,对其做了一下简单的封装。最后来看下BlockingCell是个什么鬼, 截取部分代码如下:
public class BlockingCell
{
private boolean _filled = false;
private T _value;
public synchronized T get() throws InterruptedException {
while (!_filled) {
wait();
}
return _value;
}
其实这个就是capacity为1的BlockingQueue,顾美其名曰BlockingCell,绕了大半圈,原来AMQChannel中的_activeRpc就是个这么玩意儿~