001package org.apache.activemq.transport.auto.nio; 002 003import java.io.IOException; 004import java.net.Socket; 005import java.net.URI; 006import java.net.URISyntaxException; 007import java.nio.ByteBuffer; 008import java.util.Set; 009import java.util.concurrent.ExecutorService; 010import java.util.concurrent.Executors; 011import java.util.concurrent.Future; 012import java.util.concurrent.atomic.AtomicInteger; 013 014import javax.net.ServerSocketFactory; 015import javax.net.ssl.SSLContext; 016import javax.net.ssl.SSLEngine; 017 018import org.apache.activemq.broker.BrokerService; 019import org.apache.activemq.broker.BrokerServiceAware; 020import org.apache.activemq.transport.Transport; 021import org.apache.activemq.transport.auto.AutoTcpTransportServer; 022import org.apache.activemq.transport.nio.AutoInitNioSSLTransport; 023import org.apache.activemq.transport.nio.NIOSSLTransport; 024import org.apache.activemq.transport.tcp.TcpTransport; 025import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; 026import org.apache.activemq.transport.tcp.TcpTransportFactory; 027import org.apache.activemq.transport.tcp.TcpTransportServer; 028import org.apache.activemq.wireformat.WireFormat; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Licensed to the Apache Software Foundation (ASF) under one or more 034 * contributor license agreements. See the NOTICE file distributed with 035 * this work for additional information regarding copyright ownership. 036 * The ASF licenses this file to You under the Apache License, Version 2.0 037 * (the "License"); you may not use this file except in compliance with 038 * the License. You may obtain a copy of the License at 039 * 040 * http://www.apache.org/licenses/LICENSE-2.0 041 * 042 * Unless required by applicable law or agreed to in writing, software 043 * distributed under the License is distributed on an "AS IS" BASIS, 044 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 045 * See the License for the specific language governing permissions and 046 * limitations under the License. 047 */ 048public class AutoNIOSSLTransportServer extends AutoTcpTransportServer { 049 050 private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class); 051 052 private SSLContext context; 053 054 public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory, 055 BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException { 056 super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols); 057 058 this.context = context; 059 } 060 061 private boolean needClientAuth; 062 private boolean wantClientAuth; 063 064 protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, 065 InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException { 066 NIOSSLTransport transport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer); 067 if (context != null) { 068 transport.setSslContext(context); 069 } 070 071 transport.setNeedClientAuth(needClientAuth); 072 transport.setWantClientAuth(wantClientAuth); 073 074 075 return transport; 076 } 077 078 @Override 079 protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { 080 throw new UnsupportedOperationException("method not supported"); 081 } 082 083 @Override 084 public boolean isSslServer() { 085 return true; 086 } 087 088 public boolean isNeedClientAuth() { 089 return this.needClientAuth; 090 } 091 092 public void setNeedClientAuth(boolean value) { 093 this.needClientAuth = value; 094 } 095 096 public boolean isWantClientAuth() { 097 return this.wantClientAuth; 098 } 099 100 public void setWantClientAuth(boolean value) { 101 this.wantClientAuth = value; 102 } 103 104 105 @Override 106 protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { 107 ExecutorService executor = Executors.newSingleThreadExecutor(); 108 109 //The SSLEngine needs to be initialized and handshake done to get the first command and detect the format 110 final AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket); 111 if (context != null) { 112 in.setSslContext(context); 113 } 114 in.start(); 115 SSLEngine engine = in.getSslSession(); 116 117 Future<?> future = executor.submit(new Runnable() { 118 @Override 119 public void run() { 120 //Wait for handshake to finish initializing 121 do { 122 in.serviceRead(); 123 } while(in.getReadSize().get() < 8); 124 } 125 }); 126 127 waitForProtocolDetectionFinish(future, in.getReadSize()); 128 in.stop(); 129 130 initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length)); 131 initBuffer.buffer.put(in.getReadData()); 132 133 ProtocolInfo protocolInfo = detectProtocol(in.getReadData()); 134 135 if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { 136 ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); 137 } 138 139 WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); 140 Transport transport = createTransport(socket, format, engine, initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory); 141 142 return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); 143 } 144 145 146} 147 148