001package org.apache.activemq.transport.auto.nio;
002
003import java.io.IOException;
004import java.net.Socket;
005import java.net.URI;
006import java.net.URISyntaxException;
007import java.nio.ByteBuffer;
008import java.util.Set;
009import java.util.concurrent.ExecutorService;
010import java.util.concurrent.Executors;
011import java.util.concurrent.Future;
012import java.util.concurrent.atomic.AtomicInteger;
013
014import javax.net.ServerSocketFactory;
015import javax.net.ssl.SSLContext;
016import javax.net.ssl.SSLEngine;
017
018import org.apache.activemq.broker.BrokerService;
019import org.apache.activemq.broker.BrokerServiceAware;
020import org.apache.activemq.transport.Transport;
021import org.apache.activemq.transport.auto.AutoTcpTransportServer;
022import org.apache.activemq.transport.nio.AutoInitNioSSLTransport;
023import org.apache.activemq.transport.nio.NIOSSLTransport;
024import org.apache.activemq.transport.tcp.TcpTransport;
025import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
026import org.apache.activemq.transport.tcp.TcpTransportFactory;
027import org.apache.activemq.transport.tcp.TcpTransportServer;
028import org.apache.activemq.wireformat.WireFormat;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Licensed to the Apache Software Foundation (ASF) under one or more
034 * contributor license agreements.  See the NOTICE file distributed with
035 * this work for additional information regarding copyright ownership.
036 * The ASF licenses this file to You under the Apache License, Version 2.0
037 * (the "License"); you may not use this file except in compliance with
038 * the License.  You may obtain a copy of the License at
039 *
040 *      http://www.apache.org/licenses/LICENSE-2.0
041 *
042 * Unless required by applicable law or agreed to in writing, software
043 * distributed under the License is distributed on an "AS IS" BASIS,
044 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
045 * See the License for the specific language governing permissions and
046 * limitations under the License.
047 */
048public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
049
050    private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class);
051
052    private SSLContext context;
053
054    public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory,
055            BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException {
056        super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols);
057
058        this.context = context;
059    }
060
061    private boolean needClientAuth;
062    private boolean wantClientAuth;
063
064    protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine,
065            InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException {
066        NIOSSLTransport transport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer);
067        if (context != null) {
068            transport.setSslContext(context);
069        }
070
071        transport.setNeedClientAuth(needClientAuth);
072        transport.setWantClientAuth(wantClientAuth);
073
074
075        return transport;
076    }
077
078    @Override
079    protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException {
080        throw new UnsupportedOperationException("method not supported");
081    }
082
083    @Override
084    public boolean isSslServer() {
085        return true;
086    }
087
088    public boolean isNeedClientAuth() {
089        return this.needClientAuth;
090    }
091
092    public void setNeedClientAuth(boolean value) {
093        this.needClientAuth = value;
094    }
095
096    public boolean isWantClientAuth() {
097        return this.wantClientAuth;
098    }
099
100    public void setWantClientAuth(boolean value) {
101        this.wantClientAuth = value;
102    }
103
104
105    @Override
106    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
107        ExecutorService executor = Executors.newSingleThreadExecutor();
108
109        //The SSLEngine needs to be initialized and handshake done to get the first command and detect the format
110        final AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket);
111        if (context != null) {
112            in.setSslContext(context);
113        }
114        in.start();
115        SSLEngine engine = in.getSslSession();
116
117        Future<?> future = executor.submit(new Runnable() {
118            @Override
119            public void run() {
120                //Wait for handshake to finish initializing
121                do {
122                    in.serviceRead();
123                } while(in.getReadSize().get() < 8);
124            }
125        });
126
127        waitForProtocolDetectionFinish(future, in.getReadSize());
128        in.stop();
129
130        initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));
131        initBuffer.buffer.put(in.getReadData());
132
133        ProtocolInfo protocolInfo = detectProtocol(in.getReadData());
134
135        if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) {
136            ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService);
137        }
138
139        WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat();
140        Transport transport = createTransport(socket, format, engine, initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory);
141
142        return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory);
143    }
144
145
146}
147
148