001/** 002gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.net.Socket; 026import java.net.SocketAddress; 027import java.net.SocketException; 028import java.net.SocketTimeoutException; 029import java.net.URI; 030import java.net.UnknownHostException; 031import java.util.HashMap; 032import java.util.Map; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicReference; 036 037import javax.net.SocketFactory; 038 039import org.apache.activemq.Service; 040import org.apache.activemq.thread.DefaultThreadPools; 041import org.apache.activemq.transport.Transport; 042import org.apache.activemq.transport.TransportLoggerFactory; 043import org.apache.activemq.transport.TransportThreadSupport; 044import org.apache.activemq.util.InetAddressUtil; 045import org.apache.activemq.util.IntrospectionSupport; 046import org.apache.activemq.util.ServiceStopper; 047import org.apache.activemq.wireformat.WireFormat; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * An implementation of the {@link Transport} interface using raw tcp/ip 053 * 054 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) 055 * 056 */ 057public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { 058 private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class); 059 protected final URI remoteLocation; 060 protected final URI localLocation; 061 protected final WireFormat wireFormat; 062 063 protected int connectionTimeout = 30000; 064 protected int soTimeout; 065 protected int socketBufferSize = 64 * 1024; 066 protected int ioBufferSize = 8 * 1024; 067 protected boolean closeAsync=true; 068 protected Socket socket; 069 protected DataOutputStream dataOut; 070 protected DataInputStream dataIn; 071 protected TimeStampStream buffOut = null; 072 073 /** 074 * The Traffic Class to be set on the socket. 075 */ 076 protected int trafficClass = 0; 077 /** 078 * Keeps track of attempts to set the Traffic Class on the socket. 079 */ 080 private boolean trafficClassSet = false; 081 /** 082 * Prevents setting both the Differentiated Services and Type of Service 083 * transport options at the same time, since they share the same spot in 084 * the TCP/IP packet headers. 085 */ 086 protected boolean diffServChosen = false; 087 protected boolean typeOfServiceChosen = false; 088 /** 089 * trace=true -> the Transport stack where this TcpTransport 090 * object will be, will have a TransportLogger layer 091 * trace=false -> the Transport stack where this TcpTransport 092 * object will be, will NOT have a TransportLogger layer, and therefore 093 * will never be able to print logging messages. 094 * This parameter is most probably set in Connection or TransportConnector URIs. 095 */ 096 protected boolean trace = false; 097 /** 098 * Name of the LogWriter implementation to use. 099 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory. 100 * This parameter is most probably set in Connection or TransportConnector URIs. 101 */ 102 protected String logWriterName = TransportLoggerFactory.defaultLogWriterName; 103 /** 104 * Specifies if the TransportLogger will be manageable by JMX or not. 105 * Also, as long as there is at least 1 TransportLogger which is manageable, 106 * a TransportLoggerControl MBean will me created. 107 */ 108 protected boolean dynamicManagement = false; 109 /** 110 * startLogging=true -> the TransportLogger object of the Transport stack 111 * will initially write messages to the log. 112 * startLogging=false -> the TransportLogger object of the Transport stack 113 * will initially NOT write messages to the log. 114 * This parameter only has an effect if trace == true. 115 * This parameter is most probably set in Connection or TransportConnector URIs. 116 */ 117 protected boolean startLogging = true; 118 /** 119 * Specifies the port that will be used by the JMX server to manage 120 * the TransportLoggers. 121 * This should only be set in an URI by a client (producer or consumer) since 122 * a broker will already create a JMX server. 123 * It is useful for people who test a broker and clients in the same machine 124 * and want to control both via JMX; a different port will be needed. 125 */ 126 protected int jmxPort = 1099; 127 protected boolean useLocalHost = false; 128 protected int minmumWireFormatVersion; 129 protected SocketFactory socketFactory; 130 protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>(); 131 132 private Map<String, Object> socketOptions; 133 private int soLinger = Integer.MIN_VALUE; 134 private Boolean keepAlive; 135 private Boolean tcpNoDelay; 136 private Thread runnerThread; 137 private volatile int receiveCounter; 138 139 /** 140 * Connect to a remote Node - e.g. a Broker 141 * 142 * @param wireFormat 143 * @param socketFactory 144 * @param remoteLocation 145 * @param localLocation - e.g. local InetAddress and local port 146 * @throws IOException 147 * @throws UnknownHostException 148 */ 149 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, 150 URI localLocation) throws UnknownHostException, IOException { 151 this.wireFormat = wireFormat; 152 this.socketFactory = socketFactory; 153 try { 154 this.socket = socketFactory.createSocket(); 155 } catch (SocketException e) { 156 this.socket = null; 157 } 158 this.remoteLocation = remoteLocation; 159 this.localLocation = localLocation; 160 setDaemon(false); 161 } 162 163 /** 164 * Initialize from a server Socket 165 * 166 * @param wireFormat 167 * @param socket 168 * @throws IOException 169 */ 170 public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException { 171 this.wireFormat = wireFormat; 172 this.socket = socket; 173 this.remoteLocation = null; 174 this.localLocation = null; 175 setDaemon(true); 176 } 177 178 /** 179 * A one way asynchronous send 180 */ 181 public void oneway(Object command) throws IOException { 182 checkStarted(); 183 wireFormat.marshal(command, dataOut); 184 dataOut.flush(); 185 } 186 187 /** 188 * @return pretty print of 'this' 189 */ 190 @Override 191 public String toString() { 192 return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() 193 : (localLocation != null ? localLocation : remoteLocation)) ; 194 } 195 196 /** 197 * reads packets from a Socket 198 */ 199 public void run() { 200 LOG.trace("TCP consumer thread for " + this + " starting"); 201 this.runnerThread=Thread.currentThread(); 202 try { 203 while (!isStopped()) { 204 doRun(); 205 } 206 } catch (IOException e) { 207 stoppedLatch.get().countDown(); 208 onException(e); 209 } catch (Throwable e){ 210 stoppedLatch.get().countDown(); 211 IOException ioe=new IOException("Unexpected error occured: " + e); 212 ioe.initCause(e); 213 onException(ioe); 214 }finally { 215 stoppedLatch.get().countDown(); 216 } 217 } 218 219 protected void doRun() throws IOException { 220 try { 221 Object command = readCommand(); 222 doConsume(command); 223 } catch (SocketTimeoutException e) { 224 } catch (InterruptedIOException e) { 225 } 226 } 227 228 protected Object readCommand() throws IOException { 229 return wireFormat.unmarshal(dataIn); 230 } 231 232 // Properties 233 // ------------------------------------------------------------------------- 234 public String getDiffServ() { 235 // This is the value requested by the user by setting the Tcp Transport 236 // options. If the socket hasn't been created, then this value may not 237 // reflect the value returned by Socket.getTrafficClass(). 238 return Integer.toString(this.trafficClass); 239 } 240 241 public void setDiffServ(String diffServ) throws IllegalArgumentException { 242 this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ); 243 this.diffServChosen = true; 244 } 245 246 public int getTypeOfService() { 247 // This is the value requested by the user by setting the Tcp Transport 248 // options. If the socket hasn't been created, then this value may not 249 // reflect the value returned by Socket.getTrafficClass(). 250 return this.trafficClass; 251 } 252 253 public void setTypeOfService(int typeOfService) { 254 this.trafficClass = QualityOfServiceUtils.getToS(typeOfService); 255 this.typeOfServiceChosen = true; 256 } 257 258 public boolean isTrace() { 259 return trace; 260 } 261 262 public void setTrace(boolean trace) { 263 this.trace = trace; 264 } 265 266 public String getLogWriterName() { 267 return logWriterName; 268 } 269 270 public void setLogWriterName(String logFormat) { 271 this.logWriterName = logFormat; 272 } 273 274 public boolean isDynamicManagement() { 275 return dynamicManagement; 276 } 277 278 public void setDynamicManagement(boolean useJmx) { 279 this.dynamicManagement = useJmx; 280 } 281 282 public boolean isStartLogging() { 283 return startLogging; 284 } 285 286 public void setStartLogging(boolean startLogging) { 287 this.startLogging = startLogging; 288 } 289 290 public int getJmxPort() { 291 return jmxPort; 292 } 293 294 public void setJmxPort(int jmxPort) { 295 this.jmxPort = jmxPort; 296 } 297 298 public int getMinmumWireFormatVersion() { 299 return minmumWireFormatVersion; 300 } 301 302 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 303 this.minmumWireFormatVersion = minmumWireFormatVersion; 304 } 305 306 public boolean isUseLocalHost() { 307 return useLocalHost; 308 } 309 310 /** 311 * Sets whether 'localhost' or the actual local host name should be used to 312 * make local connections. On some operating systems such as Macs its not 313 * possible to connect as the local host name so localhost is better. 314 */ 315 public void setUseLocalHost(boolean useLocalHost) { 316 this.useLocalHost = useLocalHost; 317 } 318 319 public int getSocketBufferSize() { 320 return socketBufferSize; 321 } 322 323 /** 324 * Sets the buffer size to use on the socket 325 */ 326 public void setSocketBufferSize(int socketBufferSize) { 327 this.socketBufferSize = socketBufferSize; 328 } 329 330 public int getSoTimeout() { 331 return soTimeout; 332 } 333 334 /** 335 * Sets the socket timeout 336 */ 337 public void setSoTimeout(int soTimeout) { 338 this.soTimeout = soTimeout; 339 } 340 341 public int getConnectionTimeout() { 342 return connectionTimeout; 343 } 344 345 /** 346 * Sets the timeout used to connect to the socket 347 */ 348 public void setConnectionTimeout(int connectionTimeout) { 349 this.connectionTimeout = connectionTimeout; 350 } 351 352 public Boolean getKeepAlive() { 353 return keepAlive; 354 } 355 356 /** 357 * Enable/disable TCP KEEP_ALIVE mode 358 */ 359 public void setKeepAlive(Boolean keepAlive) { 360 this.keepAlive = keepAlive; 361 } 362 363 /** 364 * Enable/disable soLinger 365 * @param soLinger enabled if > -1, disabled if == -1, system default otherwise 366 */ 367 public void setSoLinger(int soLinger) { 368 this.soLinger = soLinger; 369 } 370 371 public int getSoLinger() { 372 return soLinger; 373 } 374 375 public Boolean getTcpNoDelay() { 376 return tcpNoDelay; 377 } 378 379 /** 380 * Enable/disable the TCP_NODELAY option on the socket 381 */ 382 public void setTcpNoDelay(Boolean tcpNoDelay) { 383 this.tcpNoDelay = tcpNoDelay; 384 } 385 386 /** 387 * @return the ioBufferSize 388 */ 389 public int getIoBufferSize() { 390 return this.ioBufferSize; 391 } 392 393 /** 394 * @param ioBufferSize the ioBufferSize to set 395 */ 396 public void setIoBufferSize(int ioBufferSize) { 397 this.ioBufferSize = ioBufferSize; 398 } 399 400 /** 401 * @return the closeAsync 402 */ 403 public boolean isCloseAsync() { 404 return closeAsync; 405 } 406 407 /** 408 * @param closeAsync the closeAsync to set 409 */ 410 public void setCloseAsync(boolean closeAsync) { 411 this.closeAsync = closeAsync; 412 } 413 414 // Implementation methods 415 // ------------------------------------------------------------------------- 416 protected String resolveHostName(String host) throws UnknownHostException { 417 if (isUseLocalHost()) { 418 String localName = InetAddressUtil.getLocalHostName(); 419 if (localName != null && localName.equals(host)) { 420 return "localhost"; 421 } 422 } 423 return host; 424 } 425 426 /** 427 * Configures the socket for use 428 * 429 * @param sock 430 * @throws SocketException, IllegalArgumentException if setting the options 431 * on the socket failed. 432 */ 433 protected void initialiseSocket(Socket sock) throws SocketException, 434 IllegalArgumentException { 435 if (socketOptions != null) { 436 IntrospectionSupport.setProperties(socket, socketOptions); 437 } 438 439 try { 440 sock.setReceiveBufferSize(socketBufferSize); 441 sock.setSendBufferSize(socketBufferSize); 442 } catch (SocketException se) { 443 LOG.warn("Cannot set socket buffer size = " + socketBufferSize); 444 LOG.debug("Cannot set socket buffer size. Reason: " + se, se); 445 } 446 sock.setSoTimeout(soTimeout); 447 448 if (keepAlive != null) { 449 sock.setKeepAlive(keepAlive.booleanValue()); 450 } 451 452 if (soLinger > -1) { 453 sock.setSoLinger(true, soLinger); 454 } else if (soLinger == -1) { 455 sock.setSoLinger(false, 0); 456 } 457 if (tcpNoDelay != null) { 458 sock.setTcpNoDelay(tcpNoDelay.booleanValue()); 459 } 460 if (!this.trafficClassSet) { 461 this.trafficClassSet = setTrafficClass(sock); 462 } 463 } 464 465 @Override 466 protected void doStart() throws Exception { 467 connect(); 468 stoppedLatch.set(new CountDownLatch(1)); 469 super.doStart(); 470 } 471 472 protected void connect() throws Exception { 473 474 if (socket == null && socketFactory == null) { 475 throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); 476 } 477 478 InetSocketAddress localAddress = null; 479 InetSocketAddress remoteAddress = null; 480 481 if (localLocation != null) { 482 localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), 483 localLocation.getPort()); 484 } 485 486 if (remoteLocation != null) { 487 String host = resolveHostName(remoteLocation.getHost()); 488 remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); 489 } 490 // Set the traffic class before the socket is connected when possible so 491 // that the connection packets are given the correct traffic class. 492 this.trafficClassSet = setTrafficClass(socket); 493 494 if (socket != null) { 495 496 if (localAddress != null) { 497 socket.bind(localAddress); 498 } 499 500 // If it's a server accepted socket.. we don't need to connect it 501 // to a remote address. 502 if (remoteAddress != null) { 503 if (connectionTimeout >= 0) { 504 socket.connect(remoteAddress, connectionTimeout); 505 } else { 506 socket.connect(remoteAddress); 507 } 508 } 509 510 } else { 511 // For SSL sockets.. you can't create an unconnected socket :( 512 // This means the timout option are not supported either. 513 if (localAddress != null) { 514 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), 515 localAddress.getAddress(), localAddress.getPort()); 516 } else { 517 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort()); 518 } 519 } 520 521 initialiseSocket(socket); 522 initializeStreams(); 523 } 524 525 @Override 526 protected void doStop(ServiceStopper stopper) throws Exception { 527 if (LOG.isDebugEnabled()) { 528 LOG.debug("Stopping transport " + this); 529 } 530 531 // Closing the streams flush the sockets before closing.. if the socket 532 // is hung.. then this hangs the close. 533 // closeStreams(); 534 if (socket != null) { 535 if (closeAsync) { 536 //closing the socket can hang also 537 final CountDownLatch latch = new CountDownLatch(1); 538 539 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 540 public void run() { 541 try { 542 socket.close(); 543 } catch (IOException e) { 544 if (LOG.isDebugEnabled()) { 545 LOG.debug("Caught exception closing socket", e); 546 } 547 } finally { 548 latch.countDown(); 549 } 550 } 551 }); 552 553 try { 554 latch.await(1,TimeUnit.SECONDS); 555 } catch (InterruptedException e) { 556 Thread.currentThread().interrupt(); 557 } 558 559 } else { 560 561 try { 562 socket.close(); 563 } catch (IOException e) { 564 LOG.debug("Caught exception closing socket",e); 565 } 566 } 567 } 568 } 569 570 /** 571 * Override so that stop() blocks until the run thread is no longer running. 572 */ 573 @Override 574 public void stop() throws Exception { 575 super.stop(); 576 CountDownLatch countDownLatch = stoppedLatch.get(); 577 if (countDownLatch != null && Thread.currentThread() != this.runnerThread) { 578 countDownLatch.await(1,TimeUnit.SECONDS); 579 } 580 } 581 582 protected void initializeStreams() throws Exception { 583 TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) { 584 @Override 585 public int read() throws IOException { 586 receiveCounter++; 587 return super.read(); 588 } 589 @Override 590 public int read(byte[] b, int off, int len) throws IOException { 591 receiveCounter++; 592 return super.read(b, off, len); 593 } 594 @Override 595 public long skip(long n) throws IOException { 596 receiveCounter++; 597 return super.skip(n); 598 } 599 @Override 600 protected void fill() throws IOException { 601 receiveCounter++; 602 super.fill(); 603 } 604 }; 605 this.dataIn = new DataInputStream(buffIn); 606 TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); 607 this.dataOut = new DataOutputStream(outputStream); 608 this.buffOut = outputStream; 609 } 610 611 protected void closeStreams() throws IOException { 612 if (dataOut != null) { 613 dataOut.close(); 614 } 615 if (dataIn != null) { 616 dataIn.close(); 617 } 618 } 619 620 public void setSocketOptions(Map<String, Object> socketOptions) { 621 this.socketOptions = new HashMap<String, Object>(socketOptions); 622 } 623 624 public String getRemoteAddress() { 625 if (socket != null) { 626 SocketAddress address = socket.getRemoteSocketAddress(); 627 if (address instanceof InetSocketAddress) { 628 return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort(); 629 } else { 630 return "" + socket.getRemoteSocketAddress(); 631 } 632 } 633 return null; 634 } 635 636 @Override 637 public <T> T narrow(Class<T> target) { 638 if (target == Socket.class) { 639 return target.cast(socket); 640 } else if ( target == TimeStampStream.class) { 641 return target.cast(buffOut); 642 } 643 return super.narrow(target); 644 } 645 646 public int getReceiveCounter() { 647 return receiveCounter; 648 } 649 650 /** 651 * @param sock The socket on which to set the Traffic Class. 652 * @return Whether or not the Traffic Class was set on the given socket. 653 * @throws SocketException if the system does not support setting the 654 * Traffic Class. 655 * @throws IllegalArgumentException if both the Differentiated Services and 656 * Type of Services transport options have been set on the same 657 * connection. 658 */ 659 private boolean setTrafficClass(Socket sock) throws SocketException, 660 IllegalArgumentException { 661 if (sock == null 662 || (!this.diffServChosen && !this.typeOfServiceChosen)) { 663 return false; 664 } 665 if (this.diffServChosen && this.typeOfServiceChosen) { 666 throw new IllegalArgumentException("Cannot set both the " 667 + " Differentiated Services and Type of Services transport " 668 + " options on the same connection."); 669 } 670 671 sock.setTrafficClass(this.trafficClass); 672 673 int resultTrafficClass = sock.getTrafficClass(); 674 if (this.trafficClass != resultTrafficClass) { 675 // In the case where the user has specified the ECN bits (e.g. in 676 // Type of Service) but the system won't allow the ECN bits to be 677 // set or in the case where setting the traffic class failed for 678 // other reasons, emit a warning. 679 if ((this.trafficClass >> 2) == (resultTrafficClass >> 2) 680 && (this.trafficClass & 3) != (resultTrafficClass & 3)) { 681 LOG.warn("Attempted to set the Traffic Class to " 682 + this.trafficClass + " but the result Traffic Class was " 683 + resultTrafficClass + ". Please check that your system " 684 + "allows you to set the ECN bits (the first two bits)."); 685 } else { 686 LOG.warn("Attempted to set the Traffic Class to " 687 + this.trafficClass + " but the result Traffic Class was " 688 + resultTrafficClass + ". Please check that your system " 689 + "supports java.net.setTrafficClass."); 690 } 691 return false; 692 } 693 // Reset the guards that prevent both the Differentiated Services 694 // option and the Type of Service option from being set on the same 695 // connection. 696 this.diffServChosen = false; 697 this.typeOfServiceChosen = false; 698 return true; 699 } 700 701 public WireFormat getWireFormat() { 702 return wireFormat; 703 } 704}