001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020 021import javax.jms.JMSException; 022import org.apache.activemq.transport.tcp.TcpTransport; 023import org.fusesource.hawtbuf.DataByteArrayInputStream; 024import org.fusesource.hawtbuf.DataByteArrayOutputStream; 025import org.fusesource.mqtt.codec.*; 026 027public class MQTTCodec { 028 029 TcpTransport transport; 030 031 DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream(); 032 boolean processedHeader = false; 033 String action; 034 byte header; 035 int contentLength = -1; 036 int previousByte = -1; 037 int payLoadRead = 0; 038 039 public MQTTCodec(TcpTransport transport) { 040 this.transport = transport; 041 } 042 043 public void parse(DataByteArrayInputStream input, int readSize) throws Exception { 044 int i = 0; 045 byte b; 046 while (i++ < readSize) { 047 b = input.readByte(); 048 // skip repeating nulls 049 if (!processedHeader && b == 0) { 050 previousByte = 0; 051 continue; 052 } 053 054 if (!processedHeader) { 055 i += processHeader(b, input); 056 if (contentLength == 0) { 057 processCommand(); 058 } 059 060 } else { 061 062 if (contentLength == -1) { 063 // end of command reached, unmarshal 064 if (b == 0) { 065 processCommand(); 066 } else { 067 currentCommand.write(b); 068 } 069 } else { 070 // read desired content length 071 if (payLoadRead == contentLength) { 072 processCommand(); 073 i += processHeader(b, input); 074 } else { 075 currentCommand.write(b); 076 payLoadRead++; 077 } 078 } 079 } 080 081 previousByte = b; 082 } 083 if (processedHeader && payLoadRead == contentLength) { 084 processCommand(); 085 } 086 } 087 088 /** 089 * sets the content length 090 * 091 * @return number of bytes read 092 */ 093 private int processHeader(byte header, DataByteArrayInputStream input) { 094 this.header = header; 095 byte digit; 096 int multiplier = 1; 097 int read = 0; 098 int length = 0; 099 do { 100 digit = input.readByte(); 101 length += (digit & 0x7F) * multiplier; 102 multiplier <<= 7; 103 read++; 104 } while ((digit & 0x80) != 0); 105 106 contentLength = length; 107 processedHeader = true; 108 return read; 109 } 110 111 112 private void processCommand() throws Exception { 113 MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header); 114 transport.doConsume(frame); 115 processedHeader = false; 116 currentCommand.reset(); 117 contentLength = -1; 118 payLoadRead = 0; 119 } 120 121 public static String commandType(byte header) throws IOException, JMSException { 122 123 byte messageType = (byte) ((header & 0xF0) >>> 4); 124 switch (messageType) { 125 case PINGREQ.TYPE: { 126 return "PINGREQ"; 127 } 128 case CONNECT.TYPE: { 129 return "CONNECT"; 130 } 131 case DISCONNECT.TYPE: { 132 return "DISCONNECT"; 133 } 134 case SUBSCRIBE.TYPE: { 135 return "SUBSCRIBE"; 136 } 137 case UNSUBSCRIBE.TYPE: { 138 return "UNSUBSCRIBE"; 139 } 140 case PUBLISH.TYPE: { 141 return "PUBLISH"; 142 } 143 case PUBACK.TYPE: { 144 return "PUBACK"; 145 } 146 case PUBREC.TYPE: { 147 return "PUBREC"; 148 } 149 case PUBREL.TYPE: { 150 return "PUBREL"; 151 } 152 case PUBCOMP.TYPE: { 153 return "PUBCOMP"; 154 } 155 default: 156 return "UNKNOWN"; 157 } 158 159 } 160 161}