
7. RabbitMQ Client Source Code: AMQImpl + Method

安图新 · about 5 minutes · Message Queue

Author: 朱小厮 | Source: https://hiddenpps.blog.csdn.net/column/info/14800

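The AMQImpl class implements the AMQP interface (public class AMQImpl implements AMQP). Its main job is to hold the classes that represent the different kinds of communication frames in the AMQP protocol.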

Let's take the Connection.Start frame as an example.

public static class Connection {
    public static final int INDEX = 10;
    public static class Start
        extends Method
        implements com.rabbitmq.client.AMQP.Connection.Start
    {
        public static final int INDEX = 10;
        private final int versionMajor;
        private final int versionMinor;
        private final Map<String, Object> serverProperties;
        private final LongString mechanisms;
        private final LongString locales;
        // ... a lot more code is omitted below ...

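As you can see, Start is a static nested class of Connection, which is what makes it Connection.Start. Start extends the Method class, and so does every other AMQP protocol frame class that follows. Method can be seen as the common base type used to tell the kinds of AMQP protocol frames apart.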
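The nesting described above can be sketched in a few lines. This is a simplified illustration, not the generated client code: NestingSketch and the trimmed-down Method base class are hypothetical names, and the sketch assumes that the two INDEX constants correspond to the class id and the method id, which matches the values 10 and 10 listed for Connection.Start in the table at the end of this article.

```java
/** Hypothetical container so the sketch compiles on its own. */
final class NestingSketch {

    /** Trimmed-down stand-in for the abstract Method base class. */
    static abstract class Method {
        abstract int protocolClassId();
        abstract int protocolMethodId();
    }

    /** Outer class groups all methods of one AMQP class (here: connection). */
    static class Connection {
        static final int INDEX = 10; // assumed to be the class id

        /** Nested class, so its full name reads Connection.Start. */
        static class Start extends Method {
            static final int INDEX = 10; // assumed to be the method id

            @Override int protocolClassId() { return Connection.INDEX; }
            @Override int protocolMethodId() { return Start.INDEX; }
        }
    }

    public static void main(String[] args) {
        Method m = new Connection.Start();
        System.out.println(m.protocolClassId() + "/" + m.protocolMethodId());
    }
}
```

Because every frame class shares the Method base type, calling code can treat a Connection.Start and, say, a Basic.Publish uniformly and only ask for the ids when it needs to tell them apart.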

Method is an abstract class (Base class for AMQP method objects, specialized by autogenerated code in AMQP.java). Let's look at its code:

public abstract class Method implements com.rabbitmq.client.Method {
    /** {@inheritDoc} */
    public abstract int protocolClassId(); /* properly an unsigned short */
    /** {@inheritDoc} */
    public abstract int protocolMethodId(); /* properly an unsigned short */
    /** {@inheritDoc} */
    public abstract String protocolMethodName();
    /**
     * Tell if content is present.
     * @return true if the wire-protocol for this method should involve a content header and body,
     * or false if it should just involve a single method frame.
     */
    public abstract boolean hasContent();
    /**
     * Visitor support (double-dispatch mechanism).
     * @param visitor the visitor object
     * @return the result of the "visit" operation
     * @throws IOException if an error is encountered
     */
    public abstract Object visit(MethodVisitor visitor) throws IOException;
    /**
     * Private API - Autogenerated writer for this method.
     * @param writer interface to an object to write the method arguments
     * @throws IOException if an error is encountered
     */
    public abstract void writeArgumentsTo(MethodArgumentWriter writer) throws IOException;
    /**
     * Public API - debugging utility
     * @param buffer the buffer to append debug data to
     */
    public void appendArgumentDebugStringTo(StringBuilder buffer) {
        buffer.append("(?)");
    }
    @Override public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("#method<").append(protocolMethodName()).append(">");
        this.appendArgumentDebugStringTo(sb);
        return sb.toString();
    }
    public Frame toFrame(int channelNumber) throws IOException {
        Frame frame = new Frame(AMQP.FRAME_METHOD, channelNumber);
        DataOutputStream bodyOut = frame.getOutputStream();
        bodyOut.writeShort(protocolClassId());
        bodyOut.writeShort(protocolMethodId());
        MethodArgumentWriter argWriter = new MethodArgumentWriter(new ValueWriter(bodyOut));
        writeArgumentsTo(argWriter);
        argWriter.flush();
        return frame;
    }
}

The code is not long. Let's go through a few of its methods.

protocolClassId() and protocolMethodId(): every Method (Connection.Start/.StartOk, Connection.Tune/.TuneOk, and so on) carries a classId and a methodId (see the figure below and the table at the end of this article).
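To make the role of the two ids concrete, here is a minimal sketch of the first two writes in toFrame: the classId and the methodId each go out as a 16-bit value through DataOutputStream.writeShort, which writes the high byte first. MethodHeaderSketch, encodeHeader and readUnsignedShort are hypothetical names for illustration; the real client writes into the Frame's output stream and then appends the method arguments.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper, not part of the RabbitMQ client. */
final class MethodHeaderSketch {

    /** Writes classId and methodId as two big-endian 16-bit values. */
    static byte[] encodeHeader(int classId, int methodId) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(classId);  // same call as bodyOut.writeShort(protocolClassId())
            out.writeShort(methodId); // same call as bodyOut.writeShort(protocolMethodId())
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads an unsigned 16-bit big-endian value back from the payload. */
    static int readUnsignedShort(byte[] payload, int offset) {
        return ((payload[offset] & 0xFF) << 8) | (payload[offset + 1] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] header = encodeHeader(10, 10); // ids of Connection.Start
        System.out.printf("classId=%d methodId=%d bytes=%d%n",
                readUnsignedShort(header, 0),
                readUnsignedShort(header, 2),
                header.length);
    }
}
```

A receiver can read the same four bytes back to decide which Method subclass the rest of the payload belongs to.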

 

protocolMethodName() returns the name of this Method. For Connection.Start, for example, it is:

public String protocolMethodName() { return "connection.start";}
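The name is also what toString() in Method prints. The sketch below copies only the naming and toString logic from the listing above into a simplified stand-in; SketchMethod, SketchConnectionStart and MethodNameDemo are hypothetical names, and the subclass keeps the default "(?)" argument placeholder rather than printing real arguments.

```java
/** Simplified stand-in for the abstract Method class: naming and toString only. */
abstract class SketchMethod {
    abstract String protocolMethodName();

    /** Default debug output for the arguments, as in the original listing. */
    void appendArgumentDebugStringTo(StringBuilder buffer) {
        buffer.append("(?)");
    }

    @Override public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("#method<").append(protocolMethodName()).append(">");
        appendArgumentDebugStringTo(sb);
        return sb.toString();
    }
}

/** Hypothetical subclass that only supplies the protocol name. */
final class SketchConnectionStart extends SketchMethod {
    @Override String protocolMethodName() { return "connection.start"; }
}

final class MethodNameDemo {
    public static void main(String[] args) {
        // prints #method<connection.start>(?)
        System.out.println(new SketchConnectionStart());
    }
}
```

A subclass that overrides appendArgumentDebugStringTo would replace the "(?)" part with its own argument dump.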

boolean hasContent() tells whether this Method is followed by content (a content header and a content body). For Connection.Start, for example:

public boolean hasContent() { return false; }

And for Basic.Publish:

public boolean hasContent() { return true; }
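The effect of hasContent() on the wire can be sketched as follows: a method without content is a single method frame, while a method with content is followed by a content header and body frames. ContentFramesSketch, FrameType and framesFor are hypothetical names, and the bodyFrameCount parameter is an assumption for illustration; how a body is split into frames is outside the scope of this article.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the frame sequence implied by hasContent(). */
final class ContentFramesSketch {

    enum FrameType { METHOD, HEADER, BODY }

    /**
     * Lists the frames sent for one method.
     * bodyFrameCount is only used when hasContent is true.
     */
    static List<FrameType> framesFor(boolean hasContent, int bodyFrameCount) {
        List<FrameType> frames = new ArrayList<>();
        frames.add(FrameType.METHOD);          // every method starts with a method frame
        if (hasContent) {
            frames.add(FrameType.HEADER);      // content header follows the method frame
            for (int i = 0; i < bodyFrameCount; i++) {
                frames.add(FrameType.BODY);    // then the content body frames
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        System.out.println(framesFor(false, 0)); // e.g. Connection.Start
        System.out.println(framesFor(true, 1));  // e.g. Basic.Publish with a small body
    }
}
```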


With that covered, we can return to AMQImpl.

The table below covers every kind of Method in the AMQP protocol together with some of its attributes. Once you have read it, you have seen all of AMQImpl.

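| Method name | classId | methodId | hasContent |
| --- | --- | --- | --- |
| Connection.Start | 10 | 10 | false |
| Connection.StartOk | 10 | 11 | false |
| Connection.Secure | 10 | 20 | false |
| Connection.SecureOk | 10 | 21 | false |
| Connection.Tune | 10 | 30 | false |
| Connection.TuneOk | 10 | 31 | false |
| Connection.Open | 10 | 40 | false |
| Connection.OpenOk | 10 | 41 | false |
| Connection.Close | 10 | 50 | false |
| Connection.CloseOk | 10 | 51 | false |
| Connection.Blocked | 10 | 60 | false |
| Connection.Unblocked | 10 | 61 | false |
| Channel.Open | 20 | 10 | false |
| Channel.OpenOk | 20 | 11 | false |
| Channel.Flow | 20 | 20 | false |
| Channel.FlowOk | 20 | 21 | false |
| Channel.Close | 20 | 40 | false |
| Channel.CloseOk | 20 | 41 | false |
| Access.Request | 30 | 10 | false |
| Access.RequestOk | 30 | 11 | false |
| Exchange.Declare | 40 | 10 | false |
| Exchange.DeclareOk | 40 | 11 | false |
| Exchange.Delete | 40 | 20 | false |
| Exchange.DeleteOk | 40 | 21 | false |
| Exchange.Bind | 40 | 30 | false |
| Exchange.BindOk | 40 | 31 | false |
| Exchange.Unbind | 40 | 40 | false |
| Exchange.UnbindOk | 40 | 51 | false |
| Queue.Declare | 50 | 10 | false |
| Queue.DeclareOk | 50 | 11 | false |
| Queue.Bind | 50 | 20 | false |
| Queue.BindOk | 50 | 21 | false |
| Queue.Purge | 50 | 30 | false |
| Queue.PurgeOk | 50 | 31 | false |
| Queue.Delete | 50 | 40 | false |
| Queue.DeleteOk | 50 | 41 | false |
| Queue.Unbind | 50 | 50 | false |
| Queue.UnbindOk | 50 | 51 | false |
| Basic.Qos | 60 | 10 | false |
| Basic.QosOk | 60 | 11 | false |
| Basic.Consume | 60 | 20 | false |
| Basic.ConsumeOk | 60 | 21 | false |
| Basic.Cancel | 60 | 30 | false |
| Basic.CancelOk | 60 | 31 | false |
| Basic.Publish | 60 | 40 | true |
| Basic.Return | 60 | 50 | true |
| Basic.Deliver | 60 | 60 | true |
| Basic.Get | 60 | 70 | false |
| Basic.GetOk | 60 | 71 | true |
| Basic.GetEmpty | 60 | 72 | false |
| Basic.Ack | 60 | 80 | false |
| Basic.Reject | 60 | 90 | false |
| Basic.RecoverAsync | 60 | 100 | false |
| Basic.Recover | 60 | 110 | false |
| Basic.RecoverOk | 60 | 111 | false |
| Basic.Nack | 60 | 120 | false |
| Tx.Select | 90 | 10 | false |
| Tx.SelectOk | 90 | 11 | false |
| Tx.Commit | 90 | 20 | false |
| Tx.CommitOk | 90 | 21 | false |
| Tx.Rollback | 90 | 30 | false |
| Tx.RollbackOk | 90 | 31 | false |
| Confirm.Select | 85 | 10 | false |
| Confirm.SelectOk | 85 | 11 | false |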
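As a rough illustration of how such a table can be used, the sketch below maps a (classId, methodId) pair back to a method name, for example when reading a frame dump. MethodTableSketch, register and nameOf are hypothetical names, only a handful of rows from the table are registered, and this is not how the client itself dispatches incoming methods.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup built from a few rows of the table above. */
final class MethodTableSketch {

    private static final Map<Integer, String> NAMES = new HashMap<>();

    /** Packs both 16-bit ids into one int key. */
    private static int key(int classId, int methodId) {
        return (classId << 16) | methodId;
    }

    private static void register(int classId, int methodId, String name) {
        NAMES.put(key(classId, methodId), name);
    }

    static {
        register(10, 10, "Connection.Start");
        register(10, 11, "Connection.StartOk");
        register(60, 40, "Basic.Publish");
        register(60, 120, "Basic.Nack");
        register(85, 10, "Confirm.Select");
    }

    /** Returns the method name, or "unknown" for ids that were not registered. */
    static String nameOf(int classId, int methodId) {
        return NAMES.getOrDefault(key(classId, methodId), "unknown");
    }

    public static void main(String[] args) {
        System.out.println(nameOf(60, 40));
        System.out.println(nameOf(1, 1));
    }
}
```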
Contributors: Andy