/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.mqtt;

import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

import javax.net.SocketFactory;
import javax.net.ssl.SSLEngine;

import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.DataByteArrayInputStream;

/**
 * An {@link NIOSSLTransport} specialisation that forwards the bytes it is
 * handed in {@link #processCommand(ByteBuffer)} to an {@link MQTTCodec} for
 * parsing.
 */
public class MQTTNIOSSLTransport extends NIOSSLTransport {

    /** Codec that parses incoming bytes; assigned in {@link #initializeStreams()}. */
    private MQTTCodec codec;

    /**
     * Creates a transport by delegating all arguments to the matching
     * superclass constructor.
     *
     * @param wireFormat     wire format handed to the superclass
     * @param socketFactory  socket factory handed to the superclass
     * @param remoteLocation remote URI handed to the superclass
     * @param localLocation  local URI handed to the superclass
     * @throws UnknownHostException propagated from the superclass constructor
     * @throws IOException          propagated from the superclass constructor
     */
    public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation)
            throws UnknownHostException, IOException {
        super(wireFormat, socketFactory, remoteLocation, localLocation);
    }

    /**
     * Creates a transport around an existing socket, passing {@code null} for
     * the three remaining superclass constructor arguments.
     *
     * @param wireFormat wire format handed to the superclass
     * @param socket     socket handed to the superclass
     * @throws IOException propagated from the superclass constructor
     */
    public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
        super(wireFormat, socket, null, null, null);
    }

    /**
     * Creates a transport around an existing socket, forwarding the supplied
     * engine and buffers unchanged to the superclass constructor.
     *
     * @param wireFormat  wire format handed to the superclass
     * @param socket      socket handed to the superclass
     * @param engine      SSL engine handed to the superclass
     * @param initBuffer  initial buffer handed to the superclass; consumed by
     *                    {@link #doInit()} when non-null
     * @param inputBuffer input buffer handed to the superclass
     * @throws IOException propagated from the superclass constructor
     */
    public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket, SSLEngine engine, InitBuffer initBuffer,
            ByteBuffer inputBuffer) throws IOException {
        super(wireFormat, socket, engine, initBuffer, inputBuffer);
    }

    /**
     * Builds the codec, lets the superclass initialise its streams, and then
     * calls {@code serviceRead()} when the input buffer has a non-zero
     * position and still has remaining elements.
     *
     * @throws IOException propagated from the superclass initialisation
     */
    @Override
    protected void initializeStreams() throws IOException {
        // The codec must exist before the superclass runs its own initialisation.
        final MQTTWireFormat mqttFormat = (MQTTWireFormat) getWireFormat();
        codec = new MQTTCodec(this, mqttFormat);

        super.initializeStreams();

        final boolean skipInitialRead = inputBuffer.position() == 0 || !inputBuffer.hasRemaining();
        if (skipInitialRead) {
            return;
        }
        serviceRead();
    }

    /**
     * Drains every remaining byte of {@code plain} into a fresh array and
     * passes it, wrapped in a {@link DataByteArrayInputStream}, to the codec
     * together with the number of bytes copied.
     *
     * @param plain buffer whose remaining bytes are consumed
     * @throws Exception propagated from the codec
     */
    @Override
    protected void processCommand(ByteBuffer plain) throws Exception {
        final int length = plain.remaining();
        final byte[] payload = new byte[length];
        plain.get(payload);
        codec.parse(new DataByteArrayInputStream(payload), length);
    }

    /**
     * When an initial buffer was supplied: sets {@code nextFrameSize} to
     * {@code -1}, adds the buffer's {@code readSize} to {@code receiveCounter},
     * flips the buffer and feeds it to {@link #processCommand(ByteBuffer)}.
     * Does nothing when no initial buffer is present.
     *
     * @throws Exception propagated from {@link #processCommand(ByteBuffer)}
     * @see org.apache.activemq.transport.nio.NIOSSLTransport#doInit()
     */
    @Override
    protected void doInit() throws Exception {
        if (initBuffer == null) {
            return;
        }

        nextFrameSize = -1;
        receiveCounter += initBuffer.readSize;

        // Flip so the bytes already written into the buffer can be read back out.
        final ByteBuffer initial = initBuffer.buffer;
        initial.flip();
        processCommand(initial);
    }
}