001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.SelectionKey;
030import java.nio.channels.ServerSocketChannel;
031import java.nio.channels.SocketChannel;
032import java.util.HashMap;
033import java.util.concurrent.BlockingQueue;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicInteger;
037
038import javax.net.ServerSocketFactory;
039import javax.net.ssl.SSLServerSocket;
040
041import org.apache.activemq.Service;
042import org.apache.activemq.ThreadPriorities;
043import org.apache.activemq.TransportLoggerSupport;
044import org.apache.activemq.command.BrokerInfo;
045import org.apache.activemq.openwire.OpenWireFormatFactory;
046import org.apache.activemq.transport.Transport;
047import org.apache.activemq.transport.TransportFactory;
048import org.apache.activemq.transport.TransportServer;
049import org.apache.activemq.transport.TransportServerThreadSupport;
050import org.apache.activemq.transport.nio.SelectorManager;
051import org.apache.activemq.transport.nio.SelectorSelection;
052import org.apache.activemq.util.IOExceptionSupport;
053import org.apache.activemq.util.InetAddressUtil;
054import org.apache.activemq.util.IntrospectionSupport;
055import org.apache.activemq.util.ServiceListener;
056import org.apache.activemq.util.ServiceStopper;
057import org.apache.activemq.util.ServiceSupport;
058import org.apache.activemq.wireformat.WireFormat;
059import org.apache.activemq.wireformat.WireFormatFactory;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * A TCP based implementation of {@link TransportServer}
065 */
066public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
067
068    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
069    protected ServerSocket serverSocket;
070    protected SelectorSelection selector;
071    protected int backlog = 5000;
072    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
073    protected final TcpTransportFactory transportFactory;
074    protected long maxInactivityDuration = 30000;
075    protected long maxInactivityDurationInitalDelay = 10000;
076    protected int minmumWireFormatVersion;
077    protected boolean useQueueForAccept = true;
078    protected boolean allowLinkStealing;
079
080    /**
081     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
082     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
083     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
084     * TransportConnector URIs.
085     */
086    protected boolean trace = false;
087
088    protected int soTimeout = 0;
089    protected int socketBufferSize = 64 * 1024;
090    protected int connectionTimeout = 30000;
091
092    /**
093     * Name of the LogWriter implementation to use. Names are mapped to classes in the
094     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
095     * set in Connection or TransportConnector URIs.
096     */
097    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
098
099    /**
100     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
101     * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
102     */
103    protected boolean dynamicManagement = false;
104
105    /**
106     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
107     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
108     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
109     * TransportConnector URIs.
110     */
111    protected boolean startLogging = true;
112    protected final ServerSocketFactory serverSocketFactory;
113    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
114    protected Thread socketHandlerThread;
115
116    /**
117     * The maximum number of sockets allowed for this server
118     */
119    protected int maximumConnections = Integer.MAX_VALUE;
120    protected AtomicInteger currentTransportCount = new AtomicInteger();
121
122    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
123        URISyntaxException {
124        super(location);
125        this.transportFactory = transportFactory;
126        this.serverSocketFactory = serverSocketFactory;
127    }
128
129    public void bind() throws IOException {
130        URI bind = getBindLocation();
131
132        String host = bind.getHost();
133        host = (host == null || host.length() == 0) ? "localhost" : host;
134        InetAddress addr = InetAddress.getByName(host);
135
136        try {
137            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
138            configureServerSocket(this.serverSocket);
139        } catch (IOException e) {
140            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
141        }
142        try {
143            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
144                bind.getQuery(), bind.getFragment()));
145        } catch (URISyntaxException e) {
146
147            // it could be that the host name contains invalid characters such
148            // as _ on unix platforms so lets try use the IP address instead
149            try {
150                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
151                    bind.getQuery(), bind.getFragment()));
152            } catch (URISyntaxException e2) {
153                throw IOExceptionSupport.create(e2);
154            }
155        }
156    }
157
158    private void configureServerSocket(ServerSocket socket) throws SocketException {
159        socket.setSoTimeout(2000);
160        if (transportOptions != null) {
161
162            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
163            // to SSLServerSocket to configure it has a side effect on the socket rendering it
164            // useless as all suites are enabled many of which are considered as insecure.  We
165            // instead trap that option here and throw an exception.  We should really consider
166            // all invalid options as breaking and not start the transport but the current design
167            // doesn't really allow for this.
168            //
169            //  see: https://issues.apache.org/jira/browse/AMQ-4582
170            //
171            if (socket instanceof SSLServerSocket) {
172                if (transportOptions.containsKey("enabledCipherSuites")) {
173                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
174
175                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
176                        throw new SocketException(String.format(
177                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
178                    }
179                }
180            }
181
182            IntrospectionSupport.setProperties(socket, transportOptions);
183        }
184    }
185
186    /**
187     * @return Returns the wireFormatFactory.
188     */
189    public WireFormatFactory getWireFormatFactory() {
190        return wireFormatFactory;
191    }
192
193    /**
194     * @param wireFormatFactory
195     *            The wireFormatFactory to set.
196     */
197    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
198        this.wireFormatFactory = wireFormatFactory;
199    }
200
201    /**
202     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
203     * broker.
204     *
205     * @param brokerInfo
206     */
207    @Override
208    public void setBrokerInfo(BrokerInfo brokerInfo) {
209    }
210
211    public long getMaxInactivityDuration() {
212        return maxInactivityDuration;
213    }
214
215    public void setMaxInactivityDuration(long maxInactivityDuration) {
216        this.maxInactivityDuration = maxInactivityDuration;
217    }
218
219    public long getMaxInactivityDurationInitalDelay() {
220        return this.maxInactivityDurationInitalDelay;
221    }
222
223    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
224        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
225    }
226
227    public int getMinmumWireFormatVersion() {
228        return minmumWireFormatVersion;
229    }
230
231    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
232        this.minmumWireFormatVersion = minmumWireFormatVersion;
233    }
234
235    public boolean isTrace() {
236        return trace;
237    }
238
239    public void setTrace(boolean trace) {
240        this.trace = trace;
241    }
242
243    public String getLogWriterName() {
244        return logWriterName;
245    }
246
247    public void setLogWriterName(String logFormat) {
248        this.logWriterName = logFormat;
249    }
250
251    public boolean isDynamicManagement() {
252        return dynamicManagement;
253    }
254
255    public void setDynamicManagement(boolean useJmx) {
256        this.dynamicManagement = useJmx;
257    }
258
259    public boolean isStartLogging() {
260        return startLogging;
261    }
262
263    public void setStartLogging(boolean startLogging) {
264        this.startLogging = startLogging;
265    }
266
267    /**
268     * @return the backlog
269     */
270    public int getBacklog() {
271        return backlog;
272    }
273
274    /**
275     * @param backlog
276     *            the backlog to set
277     */
278    public void setBacklog(int backlog) {
279        this.backlog = backlog;
280    }
281
282    /**
283     * @return the useQueueForAccept
284     */
285    public boolean isUseQueueForAccept() {
286        return useQueueForAccept;
287    }
288
289    /**
290     * @param useQueueForAccept
291     *            the useQueueForAccept to set
292     */
293    public void setUseQueueForAccept(boolean useQueueForAccept) {
294        this.useQueueForAccept = useQueueForAccept;
295    }
296
297    /**
298     * pull Sockets from the ServerSocket
299     */
300    @Override
301    public void run() {
302        final ServerSocketChannel chan = serverSocket.getChannel();
303        if (chan != null) {
304            try {
305                chan.configureBlocking(false);
306                selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() {
307                    @Override
308                    public void onSelect(SelectorSelection sel) {
309                        try {
310                            SocketChannel sc = chan.accept();
311                            if (sc != null) {
312                                if (isStopped() || getAcceptListener() == null) {
313                                    sc.close();
314                                } else {
315                                    if (useQueueForAccept) {
316                                        socketQueue.put(sc.socket());
317                                    } else {
318                                        handleSocket(sc.socket());
319                                    }
320                                }
321                            }
322                        } catch (Exception e) {
323                            onError(sel, e);
324                        }
325                    }
326                    @Override
327                    public void onError(SelectorSelection sel, Throwable error) {
328                        Exception e = null;
329                        if (error instanceof Exception) {
330                            e = (Exception)error;
331                        } else {
332                            e = new Exception(error);
333                        }
334                        if (!isStopping()) {
335                            onAcceptError(e);
336                        } else if (!isStopped()) {
337                            LOG.warn("run()", e);
338                            onAcceptError(e);
339                        }
340                    }
341                });
342                selector.setInterestOps(SelectionKey.OP_ACCEPT);
343                selector.enable();
344            } catch (IOException ex) {
345                selector = null;
346            }
347        } else {
348            while (!isStopped()) {
349                Socket socket = null;
350                try {
351                    socket = serverSocket.accept();
352                    if (socket != null) {
353                        if (isStopped() || getAcceptListener() == null) {
354                            socket.close();
355                        } else {
356                            if (useQueueForAccept) {
357                                socketQueue.put(socket);
358                            } else {
359                                handleSocket(socket);
360                            }
361                        }
362                    }
363                } catch (SocketTimeoutException ste) {
364                    // expect this to happen
365                } catch (Exception e) {
366                    if (!isStopping()) {
367                        onAcceptError(e);
368                    } else if (!isStopped()) {
369                        LOG.warn("run()", e);
370                        onAcceptError(e);
371                    }
372                }
373            }
374        }
375    }
376
377    /**
378     * Allow derived classes to override the Transport implementation that this transport server creates.
379     *
380     * @param socket
381     * @param format
382     *
383     * @return a new Transport instance.
384     *
385     * @throws IOException
386     */
387    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
388        return new TcpTransport(format, socket);
389    }
390
391    /**
392     * @return pretty print of this
393     */
394    @Override
395    public String toString() {
396        return "" + getBindLocation();
397    }
398
399    /**
400     * @param socket
401     * @param bindAddress
402     * @return real hostName
403     * @throws UnknownHostException
404     */
405    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
406        String result = null;
407        if (socket.isBound()) {
408            if (socket.getInetAddress().isAnyLocalAddress()) {
409                // make it more human readable and useful, an alternative to 0.0.0.0
410                result = InetAddressUtil.getLocalHostName();
411            } else {
412                result = socket.getInetAddress().getCanonicalHostName();
413            }
414        } else {
415            result = bindAddress.getCanonicalHostName();
416        }
417        return result;
418    }
419
420    @Override
421    protected void doStart() throws Exception {
422        if (useQueueForAccept) {
423            Runnable run = new Runnable() {
424                @Override
425                public void run() {
426                    try {
427                        while (!isStopped() && !isStopping()) {
428                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
429                            if (sock != null) {
430                                try {
431                                    handleSocket(sock);
432                                } catch (Throwable thrown) {
433                                    if (!isStopping()) {
434                                        onAcceptError(new Exception(thrown));
435                                    } else if (!isStopped()) {
436                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
437                                        onAcceptError(new Exception(thrown));
438                                    }
439                                }
440                            }
441                        }
442
443                    } catch (InterruptedException e) {
444                        if (!isStopped() || !isStopping()) {
445                            LOG.info("socketQueue interrupted - stopping");
446                            onAcceptError(e);
447                        }
448                    }
449                }
450            };
451            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
452            socketHandlerThread.setDaemon(true);
453            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
454            socketHandlerThread.start();
455        }
456        super.doStart();
457    }
458
459    @Override
460    protected void doStop(ServiceStopper stopper) throws Exception {
461        if (selector != null) {
462            selector.disable();
463            selector.close();
464            selector = null;
465        }
466        if (serverSocket != null) {
467            serverSocket.close();
468            serverSocket = null;
469        }
470        if (socketHandlerThread != null) {
471            socketHandlerThread.interrupt();
472            socketHandlerThread = null;
473        }
474        super.doStop(stopper);
475    }
476
477    @Override
478    public InetSocketAddress getSocketAddress() {
479        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
480    }
481
482    protected void handleSocket(Socket socket) {
483        doHandleSocket(socket);
484    }
485
486    final protected void doHandleSocket(Socket socket) {
487        boolean closeSocket = true;
488        boolean countIncremented = false;
489        try {
490            int currentCount;
491            do {
492                currentCount = currentTransportCount.get();
493                if (currentCount >= this.maximumConnections) {
494                     throw new ExceededMaximumConnectionsException(
495                         "Exceeded the maximum number of allowed client connections. See the '" +
496                         "maximumConnections' property on the TCP transport configuration URI " +
497                         "in the ActiveMQ configuration file (e.g., activemq.xml)");
498                 }
499
500            //Increment this value before configuring the transport
501            //This is necessary because some of the transport servers must read from the
502            //socket during configureTransport() so we want to make sure this value is
503            //accurate as the transport server could pause here waiting for data to be sent from a client
504            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
505            countIncremented = true;
506
507            HashMap<String, Object> options = new HashMap<String, Object>();
508            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
509            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
510            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
511            options.put("trace", Boolean.valueOf(trace));
512            options.put("soTimeout", Integer.valueOf(soTimeout));
513            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
514            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
515            options.put("logWriterName", logWriterName);
516            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
517            options.put("startLogging", Boolean.valueOf(startLogging));
518            options.putAll(transportOptions);
519
520            TransportInfo transportInfo = configureTransport(this, socket);
521            closeSocket = false;
522
523            if (transportInfo.transport instanceof ServiceSupport) {
524                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
525            }
526
527            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
528                    transportInfo.transport, transportInfo.format, options);
529
530            getAcceptListener().onAccept(configuredTransport);
531
532        } catch (SocketTimeoutException ste) {
533            // expect this to happen
534        } catch (Exception e) {
535            if (closeSocket) {
536                try {
537                    //if closing the socket, only decrement the count it was actually incremented
538                    //where it was incremented
539                    if (countIncremented) {
540                        currentTransportCount.decrementAndGet();
541                    }
542                    socket.close();
543                } catch (Exception ignore) {
544                }
545            }
546
547            if (!isStopping()) {
548                onAcceptError(e);
549            } else if (!isStopped()) {
550                LOG.warn("run()", e);
551                onAcceptError(e);
552            }
553        }
554    }
555
556    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
557        WireFormat format = wireFormatFactory.createWireFormat();
558        Transport transport = createTransport(socket, format);
559        return new TransportInfo(format, transport, transportFactory);
560    }
561
562    protected class TransportInfo {
563        final WireFormat format;
564        final Transport transport;
565        final TransportFactory transportFactory;
566
567        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
568            this.format = format;
569            this.transport = transport;
570            this.transportFactory = transportFactory;
571        }
572    }
573
574    public int getSoTimeout() {
575        return soTimeout;
576    }
577
578    public void setSoTimeout(int soTimeout) {
579        this.soTimeout = soTimeout;
580    }
581
582    public int getSocketBufferSize() {
583        return socketBufferSize;
584    }
585
586    public void setSocketBufferSize(int socketBufferSize) {
587        this.socketBufferSize = socketBufferSize;
588    }
589
590    public int getConnectionTimeout() {
591        return connectionTimeout;
592    }
593
594    public void setConnectionTimeout(int connectionTimeout) {
595        this.connectionTimeout = connectionTimeout;
596    }
597
598    /**
599     * @return the maximumConnections
600     */
601    public int getMaximumConnections() {
602        return maximumConnections;
603    }
604
605    /**
606     * @param maximumConnections
607     *            the maximumConnections to set
608     */
609    public void setMaximumConnections(int maximumConnections) {
610        this.maximumConnections = maximumConnections;
611    }
612
613    public AtomicInteger getCurrentTransportCount() {
614        return currentTransportCount;
615    }
616
617    @Override
618    public void started(Service service) {
619    }
620
621    @Override
622    public void stopped(Service service) {
623        this.currentTransportCount.decrementAndGet();
624    }
625
626    @Override
627    public boolean isSslServer() {
628        return false;
629    }
630
631    @Override
632    public boolean isAllowLinkStealing() {
633        return allowLinkStealing;
634    }
635
636    @Override
637    public void setAllowLinkStealing(boolean allowLinkStealing) {
638        this.allowLinkStealing = allowLinkStealing;
639    }
640}