/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.tcp;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.SocketFactory;

import org.apache.activemq.Service;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of the {@link Transport} interface using raw tcp/ip.
 * <p>
 * An instance either connects out to a remote location (socket obtained from a
 * {@link SocketFactory}) or wraps a socket handed to it by the caller. Once
 * started, {@link #run()} reads commands from the socket until the transport
 * is stopped or an error occurs.
 *
 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
 *
 */
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
    protected final URI remoteLocation;
    protected final URI localLocation;
    protected final WireFormat wireFormat;

    protected int connectionTimeout = 30000;
    protected int soTimeout;
    protected int socketBufferSize = 64 * 1024;
    protected int ioBufferSize = 8 * 1024;
    protected boolean closeAsync = true;
    protected Socket socket;
    protected DataOutputStream dataOut;
    protected DataInputStream dataIn;
    protected TimeStampStream buffOut = null;

    protected final InitBuffer initBuffer;

    /**
     * The Traffic Class to be set on the socket.
     */
    protected int trafficClass = 0;
    /**
     * Keeps track of attempts to set the Traffic Class on the socket.
     */
    private boolean trafficClassSet = false;
    /**
     * Prevents setting both the Differentiated Services and Type of Service
     * transport options at the same time, since they share the same spot in
     * the TCP/IP packet headers.
     */
    protected boolean diffServChosen = false;
    protected boolean typeOfServiceChosen = false;
    /**
     * trace=true -> the Transport stack where this TcpTransport
     * object will be, will have a TransportLogger layer
     * trace=false -> the Transport stack where this TcpTransport
     * object will be, will NOT have a TransportLogger layer, and therefore
     * will never be able to print logging messages.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected boolean trace = false;
    /**
     * Name of the LogWriter implementation to use.
     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
    /**
     * Specifies if the TransportLogger will be manageable by JMX or not.
     * Also, as long as there is at least 1 TransportLogger which is manageable,
     * a TransportLoggerControl MBean will be created.
     */
    protected boolean dynamicManagement = false;
    /**
     * startLogging=true -> the TransportLogger object of the Transport stack
     * will initially write messages to the log.
     * startLogging=false -> the TransportLogger object of the Transport stack
     * will initially NOT write messages to the log.
     * This parameter only has an effect if trace == true.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected boolean startLogging = true;
    /**
     * Specifies the port that will be used by the JMX server to manage
     * the TransportLoggers.
     * This should only be set in an URI by a client (producer or consumer) since
     * a broker will already create a JMX server.
     * It is useful for people who test a broker and clients in the same machine
     * and want to control both via JMX; a different port will be needed.
     */
    protected int jmxPort = 1099;
    protected boolean useLocalHost = false;
    protected int minmumWireFormatVersion;
    protected SocketFactory socketFactory;
    /**
     * Latch released by {@link #run()} when the reader loop ends; installed by
     * {@link #doStart()} and awaited (with a timeout) by {@link #stop()}.
     */
    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
    /**
     * Incremented on every read/skip/fill call of the input stream created in
     * {@link #initializeStreams()}.
     */
    protected volatile int receiveCounter;

    private Map<String, Object> socketOptions;
    /** {@code Integer.MIN_VALUE} means "leave the system default" (see {@link #initialiseSocket(Socket)}). */
    private int soLinger = Integer.MIN_VALUE;
    private Boolean keepAlive;
    private Boolean tcpNoDelay;
    /** The thread currently (or most recently) executing {@link #run()}. */
    private Thread runnerThread;

    /**
     * Connect to a remote Node - e.g. a Broker
     *
     * @param wireFormat the wire format used to marshal and unmarshal commands
     * @param socketFactory the factory used to create the socket
     * @param remoteLocation the location to connect to
     * @param localLocation - e.g. local InetAddress and local port
     * @throws IOException if the factory fails to create a socket for a reason
     *         other than a {@link SocketException}
     * @throws UnknownHostException declared for callers; not thrown directly here
     */
    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
                        URI localLocation) throws UnknownHostException, IOException {
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;
        try {
            this.socket = socketFactory.createSocket();
        } catch (SocketException e) {
            // The factory cannot create an unconnected socket; connect() will
            // create an already-connected one from the factory instead.
            this.socket = null;
        }
        this.remoteLocation = remoteLocation;
        this.localLocation = localLocation;
        this.initBuffer = null;
        setDaemon(false);
    }

    /**
     * Initialize from a server Socket
     *
     * @param wireFormat the wire format used to marshal and unmarshal commands
     * @param socket the already accepted socket
     * @throws IOException declared for subclasses and callers
     */
    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
        this(wireFormat, socket, null);
    }

    /**
     * Initialize from a server Socket whose first bytes were already consumed.
     *
     * @param wireFormat the wire format used to marshal and unmarshal commands
     * @param socket the already accepted socket
     * @param initBuffer bytes already read from the socket that are pushed back
     *        onto the input stream in {@link #initializeStreams()}; may be null
     * @throws IOException declared for subclasses and callers
     */
    public TcpTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
        this.wireFormat = wireFormat;
        this.socket = socket;
        this.remoteLocation = null;
        this.localLocation = null;
        this.initBuffer = initBuffer;
        setDaemon(true);
    }

    /**
     * A one way asynchronous send
     */
    @Override
    public void oneway(Object command) throws IOException {
        checkStarted();
        wireFormat.marshal(command, dataOut);
        dataOut.flush();
    }

    /**
     * @return pretty print of 'this'; falls back to the configured local or
     *         remote location when there is no connected socket
     */
    @Override
    public String toString() {
        // The socket may legitimately be null before connect() has run (see the
        // factory constructor), so it must not be dereferenced unconditionally.
        if (socket != null && socket.isConnected()) {
            return "tcp://" + socket.getInetAddress() + ":" + socket.getPort() + "@" + socket.getLocalPort();
        }
        return "" + (localLocation != null ? localLocation : remoteLocation);
    }

    /**
     * reads packets from a Socket
     */
    @Override
    public void run() {
        LOG.trace("TCP consumer thread for {} starting", this);
        this.runnerThread = Thread.currentThread();
        try {
            while (!isStopped()) {
                doRun();
            }
        } catch (IOException e) {
            // Release the latch before notifying the listener.
            // NOTE(review): presumably so a stop() triggered by the listener
            // does not wait on this thread - confirm.
            releaseStoppedLatch();
            onException(e);
        } catch (Throwable e) {
            releaseStoppedLatch();
            IOException ioe = new IOException("Unexpected error occurred: " + e);
            ioe.initCause(e);
            onException(ioe);
        } finally {
            releaseStoppedLatch();
        }
    }

    /**
     * Counts down the current stopped latch, if one has been installed.
     * Null-safe so that a missing latch cannot mask the original failure.
     */
    private void releaseStoppedLatch() {
        CountDownLatch latch = stoppedLatch.get();
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * Reads a single command and hands it to the consumer. Read timeouts and
     * interrupted I/O are swallowed so that the loop in {@link #run()} can
     * re-check whether the transport was stopped.
     *
     * @throws IOException on any other I/O failure
     */
    protected void doRun() throws IOException {
        try {
            Object command = readCommand();
            doConsume(command);
        } catch (SocketTimeoutException ignored) {
            // deliberately ignored: the caller loops and tries again
        } catch (InterruptedIOException ignored) {
            // deliberately ignored: the caller loops and tries again
        }
    }

    /**
     * @return the next command unmarshalled from the input stream
     * @throws IOException if unmarshalling fails
     */
    protected Object readCommand() throws IOException {
        return wireFormat.unmarshal(dataIn);
    }

    // Properties
    // -------------------------------------------------------------------------
    public String getDiffServ() {
        // This is the value requested by the user by setting the Tcp Transport
        // options. If the socket hasn't been created, then this value may not
        // reflect the value returned by Socket.getTrafficClass().
        return Integer.toString(this.trafficClass);
    }

    public void setDiffServ(String diffServ) throws IllegalArgumentException {
        this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
        this.diffServChosen = true;
    }

    public int getTypeOfService() {
        // This is the value requested by the user by setting the Tcp Transport
        // options. If the socket hasn't been created, then this value may not
        // reflect the value returned by Socket.getTrafficClass().
        return this.trafficClass;
    }

    public void setTypeOfService(int typeOfService) {
        this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
        this.typeOfServiceChosen = true;
    }

    public boolean isTrace() {
        return trace;
    }

    public void setTrace(boolean trace) {
        this.trace = trace;
    }

    public String getLogWriterName() {
        return logWriterName;
    }

    public void setLogWriterName(String logFormat) {
        this.logWriterName = logFormat;
    }

    public boolean isDynamicManagement() {
        return dynamicManagement;
    }

    public void setDynamicManagement(boolean useJmx) {
        this.dynamicManagement = useJmx;
    }

    public boolean isStartLogging() {
        return startLogging;
    }

    public void setStartLogging(boolean startLogging) {
        this.startLogging = startLogging;
    }

    public int getJmxPort() {
        return jmxPort;
    }

    public void setJmxPort(int jmxPort) {
        this.jmxPort = jmxPort;
    }

    public int getMinmumWireFormatVersion() {
        return minmumWireFormatVersion;
    }

    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
        this.minmumWireFormatVersion = minmumWireFormatVersion;
    }

    public boolean isUseLocalHost() {
        return useLocalHost;
    }

    /**
     * Sets whether 'localhost' or the actual local host name should be used to
     * make local connections. On some operating systems such as Macs it is not
     * possible to connect as the local host name so localhost is better.
     */
    public void setUseLocalHost(boolean useLocalHost) {
        this.useLocalHost = useLocalHost;
    }

    public int getSocketBufferSize() {
        return socketBufferSize;
    }

    /**
     * Sets the buffer size to use on the socket
     */
    public void setSocketBufferSize(int socketBufferSize) {
        this.socketBufferSize = socketBufferSize;
    }

    public int getSoTimeout() {
        return soTimeout;
    }

    /**
     * Sets the socket timeout
     */
    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * Sets the timeout used to connect to the socket
     */
    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public Boolean getKeepAlive() {
        return keepAlive;
    }

    /**
     * Enable/disable TCP KEEP_ALIVE mode
     */
    public void setKeepAlive(Boolean keepAlive) {
        this.keepAlive = keepAlive;
    }

    /**
     * Enable/disable soLinger
     * @param soLinger enabled if > -1, disabled if == -1, system default otherwise
     */
    public void setSoLinger(int soLinger) {
        this.soLinger = soLinger;
    }

    public int getSoLinger() {
        return soLinger;
    }

    public Boolean getTcpNoDelay() {
        return tcpNoDelay;
    }

    /**
     * Enable/disable the TCP_NODELAY option on the socket
     */
    public void setTcpNoDelay(Boolean tcpNoDelay) {
        this.tcpNoDelay = tcpNoDelay;
    }

    /**
     * @return the ioBufferSize
     */
    public int getIoBufferSize() {
        return this.ioBufferSize;
    }

    /**
     * @param ioBufferSize the ioBufferSize to set
     */
    public void setIoBufferSize(int ioBufferSize) {
        this.ioBufferSize = ioBufferSize;
    }

    /**
     * @return the closeAsync
     */
    public boolean isCloseAsync() {
        return closeAsync;
    }

    /**
     * @param closeAsync the closeAsync to set
     */
    public void setCloseAsync(boolean closeAsync) {
        this.closeAsync = closeAsync;
    }

    // Implementation methods
    // -------------------------------------------------------------------------
    /**
     * Maps the local host name to "localhost" when {@link #isUseLocalHost()}
     * is enabled; any other host is returned unchanged.
     *
     * @param host the host name to resolve
     * @return "localhost" or the given host
     * @throws UnknownHostException if the local host name cannot be determined
     */
    protected String resolveHostName(String host) throws UnknownHostException {
        if (isUseLocalHost()) {
            String localName = InetAddressUtil.getLocalHostName();
            if (localName != null && localName.equals(host)) {
                return "localhost";
            }
        }
        return host;
    }

    /**
     * Configures the socket for use
     *
     * @param sock the socket
     * @throws SocketException, IllegalArgumentException if setting the options
     *         on the socket failed.
     */
    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
        if (socketOptions != null) {
            // copy the map as its used values are removed when calling setProperties
            // and we need to be able to set the options again in case the socket is re-initialized
            Map<String, Object> copy = new HashMap<String, Object>(socketOptions);
            // Apply the options to the socket being initialised (the parameter),
            // not to whatever the 'socket' field currently refers to.
            IntrospectionSupport.setProperties(sock, copy);
            if (!copy.isEmpty()) {
                throw new IllegalArgumentException("Invalid socket parameters: " + copy);
            }
        }

        try {
            //only positive values are legal
            if (socketBufferSize > 0) {
                sock.setReceiveBufferSize(socketBufferSize);
                sock.setSendBufferSize(socketBufferSize);
            } else {
                LOG.warn("Socket buffer size was set to {}; Skipping this setting as the size must be a positive number.", socketBufferSize);
            }
        } catch (SocketException se) {
            // Best effort: failing to size the buffers does not abort the setup.
            LOG.warn("Cannot set socket buffer size = {}", socketBufferSize);
            LOG.debug("Cannot set socket buffer size. Reason: " + se.getMessage() + ". This exception is ignored.", se);
        }
        sock.setSoTimeout(soTimeout);

        if (keepAlive != null) {
            sock.setKeepAlive(keepAlive.booleanValue());
        }

        if (soLinger > -1) {
            sock.setSoLinger(true, soLinger);
        } else if (soLinger == -1) {
            sock.setSoLinger(false, 0);
        }
        if (tcpNoDelay != null) {
            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
        }
        if (!this.trafficClassSet) {
            this.trafficClassSet = setTrafficClass(sock);
        }
    }

    /**
     * Connects the socket, installs a fresh stopped latch and then delegates to
     * the superclass start logic.
     */
    @Override
    protected void doStart() throws Exception {
        connect();
        stoppedLatch.set(new CountDownLatch(1));
        super.doStart();
    }

    /**
     * Binds and/or connects the socket (creating it from the socket factory if
     * none exists yet), then configures it and sets up the data streams.
     *
     * @throws IllegalStateException if neither a socket nor a socket factory is
     *         available, or if a socket must be created from the factory but no
     *         remote location was configured
     * @throws Exception if resolving, binding, connecting or configuring fails
     */
    protected void connect() throws Exception {

        if (socket == null && socketFactory == null) {
            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
        }

        InetSocketAddress localAddress = null;
        InetSocketAddress remoteAddress = null;

        if (localLocation != null) {
            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
                                                 localLocation.getPort());
        }

        if (remoteLocation != null) {
            String host = resolveHostName(remoteLocation.getHost());
            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
        }
        // Set the traffic class before the socket is connected when possible so
        // that the connection packets are given the correct traffic class.
        this.trafficClassSet = setTrafficClass(socket);

        if (socket != null) {

            if (localAddress != null) {
                socket.bind(localAddress);
            }

            // If it's a server accepted socket.. we don't need to connect it
            // to a remote address.
            if (remoteAddress != null) {
                if (connectionTimeout >= 0) {
                    socket.connect(remoteAddress, connectionTimeout);
                } else {
                    socket.connect(remoteAddress);
                }
            }

        } else {
            // For SSL sockets.. you can't create an unconnected socket :(
            // This means the timeout options are not supported either.
            if (remoteAddress == null) {
                // Fail with a clear message instead of a NullPointerException below.
                throw new IllegalStateException("Cannot create a socket from the socketFactory without a remote location");
            }
            if (localAddress != null) {
                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
                                                    localAddress.getAddress(), localAddress.getPort());
            } else {
                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
            }
        }

        initialiseSocket(socket);
        initializeStreams();
    }

    /**
     * Closes the socket, either on a helper task (waiting at most one second
     * for it to finish) when {@link #isCloseAsync()} is set, or directly on the
     * calling thread. I/O errors raised while closing are logged and ignored.
     */
    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        LOG.debug("Stopping transport {}", this);

        // Closing the streams flush the sockets before closing.. if the socket
        // is hung.. then this hangs the close.
        // closeStreams();
        if (socket != null) {
            if (closeAsync) {
                //closing the socket can hang also
                final CountDownLatch latch = new CountDownLatch(1);

                // need a async task for this
                final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
                taskRunnerFactory.execute(new Runnable() {
                    @Override
                    public void run() {
                        LOG.trace("Closing socket {}", socket);
                        try {
                            socket.close();
                            LOG.debug("Closed socket {}", socket);
                        } catch (IOException e) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
                            }
                        } finally {
                            latch.countDown();
                        }
                    }
                });

                try {
                    latch.await(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // preserve the interrupt status for the caller
                    Thread.currentThread().interrupt();
                } finally {
                    taskRunnerFactory.shutdownNow();
                }

            } else {
                // close synchronously
                LOG.trace("Closing socket {}", socket);
                try {
                    socket.close();
                    LOG.debug("Closed socket {}", socket);
                } catch (IOException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
                    }
                }
            }
        }
    }

    /**
     * Override so that stop() blocks (for at most one second) until the run
     * thread is no longer running. The wait is skipped when stop() is invoked
     * from the run thread itself.
     */
    @Override
    public void stop() throws Exception {
        super.stop();
        CountDownLatch countDownLatch = stoppedLatch.get();
        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
            countDownLatch.await(1, TimeUnit.SECONDS);
        }
    }

    /**
     * Wraps the socket streams in buffered data streams. Every read, skip and
     * fill on the input side increments {@link #receiveCounter}.
     *
     * @throws Exception if the socket streams cannot be obtained
     */
    protected void initializeStreams() throws Exception {
        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
            @Override
            public int read() throws IOException {
                receiveCounter++;
                return super.read();
            }
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                receiveCounter++;
                return super.read(b, off, len);
            }
            @Override
            public long skip(long n) throws IOException {
                receiveCounter++;
                return super.skip(n);
            }
            @Override
            protected void fill() throws IOException {
                receiveCounter++;
                super.fill();
            }
        };
        //Unread the initBuffer that was used for protocol detection if it exists
        //so the stream can start over
        if (initBuffer != null) {
            buffIn.unread(initBuffer.buffer.array());
        }
        this.dataIn = new DataInputStream(buffIn);
        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
        this.dataOut = new DataOutputStream(outputStream);
        this.buffOut = outputStream;

    }

    /**
     * Closes the output stream and then the input stream, if they exist.
     *
     * @throws IOException if closing either stream fails
     */
    protected void closeStreams() throws IOException {
        if (dataOut != null) {
            dataOut.close();
        }
        if (dataIn != null) {
            dataIn.close();
        }
    }

    /**
     * Stores a private copy of the given socket options; they are applied to
     * the socket in {@link #initialiseSocket(Socket)}.
     *
     * @param socketOptions the options to apply, or null to clear any
     *        previously configured options
     */
    public void setSocketOptions(Map<String, Object> socketOptions) {
        this.socketOptions = socketOptions == null ? null : new HashMap<String, Object>(socketOptions);
    }

    /**
     * @return the remote address of the socket as a "tcp://host:port" string
     *         when it is an {@link InetSocketAddress}, its string form
     *         otherwise, or null when there is no socket
     */
    @Override
    public String getRemoteAddress() {
        if (socket != null) {
            SocketAddress address = socket.getRemoteSocketAddress();
            if (address instanceof InetSocketAddress) {
                return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort();
            } else {
                return "" + socket.getRemoteSocketAddress();
            }
        }
        return null;
    }

    /**
     * Exposes the underlying {@link Socket} or the {@link TimeStampStream}
     * output buffer when asked for those types; defers to the superclass for
     * anything else.
     */
    @Override
    public <T> T narrow(Class<T> target) {
        if (target == Socket.class) {
            return target.cast(socket);
        } else if (target == TimeStampStream.class) {
            return target.cast(buffOut);
        }
        return super.narrow(target);
    }

    @Override
    public int getReceiveCounter() {
        return receiveCounter;
    }

    /**
     * Holds bytes that were read from a socket before the transport was
     * created, so that they can be pushed back onto the input stream.
     */
    public static class InitBuffer {
        public final int readSize;
        public final ByteBuffer buffer;

        /**
         * @param readSize the read size recorded alongside the buffer
         * @param buffer the bytes already read; must not be null
         * @throws IllegalArgumentException if buffer is null
         */
        public InitBuffer(int readSize, ByteBuffer buffer) {
            if (buffer == null) {
                throw new IllegalArgumentException("Null buffer not allowed.");
            }
            this.readSize = readSize;
            this.buffer = buffer;
        }
    }

    /**
     * @param sock The socket on which to set the Traffic Class.
     * @return Whether or not the Traffic Class was set on the given socket.
     * @throws SocketException if the system does not support setting the
     *         Traffic Class.
     * @throws IllegalArgumentException if both the Differentiated Services and
     *         Type of Services transport options have been set on the same
     *         connection.
     */
    private boolean setTrafficClass(Socket sock) throws SocketException,
            IllegalArgumentException {
        if (sock == null
            || (!this.diffServChosen && !this.typeOfServiceChosen)) {
            return false;
        }
        if (this.diffServChosen && this.typeOfServiceChosen) {
            throw new IllegalArgumentException("Cannot set both the "
                + " Differentiated Services and Type of Services transport "
                + " options on the same connection.");
        }

        sock.setTrafficClass(this.trafficClass);

        int resultTrafficClass = sock.getTrafficClass();
        if (this.trafficClass != resultTrafficClass) {
            // In the case where the user has specified the ECN bits (e.g. in
            // Type of Service) but the system won't allow the ECN bits to be
            // set or in the case where setting the traffic class failed for
            // other reasons, emit a warning.
            if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
                    && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
                LOG.warn("Attempted to set the Traffic Class to "
                    + this.trafficClass + " but the result Traffic Class was "
                    + resultTrafficClass + ". Please check that your system "
                    + "allows you to set the ECN bits (the first two bits).");
            } else {
                LOG.warn("Attempted to set the Traffic Class to "
                    + this.trafficClass + " but the result Traffic Class was "
                    + resultTrafficClass + ". Please check that your system "
                    + "supports java.net.setTrafficClass.");
            }
            return false;
        }
        // Reset the guards that prevent both the Differentiated Services
        // option and the Type of Service option from being set on the same
        // connection.
        this.diffServChosen = false;
        this.typeOfServiceChosen = false;
        return true;
    }

    public WireFormat getWireFormat() {
        return wireFormat;
    }
}