001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.concurrent.*;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import javax.jms.Connection;
032import javax.jms.ConnectionConsumer;
033import javax.jms.ConnectionMetaData;
034import javax.jms.DeliveryMode;
035import javax.jms.Destination;
036import javax.jms.ExceptionListener;
037import javax.jms.IllegalStateException;
038import javax.jms.InvalidDestinationException;
039import javax.jms.JMSException;
040import javax.jms.Queue;
041import javax.jms.QueueConnection;
042import javax.jms.QueueSession;
043import javax.jms.ServerSessionPool;
044import javax.jms.Session;
045import javax.jms.Topic;
046import javax.jms.TopicConnection;
047import javax.jms.TopicSession;
048import javax.jms.XAConnection;
049
050import org.apache.activemq.advisory.DestinationSource;
051import org.apache.activemq.blob.BlobTransferPolicy;
052import org.apache.activemq.command.ActiveMQDestination;
053import org.apache.activemq.command.ActiveMQMessage;
054import org.apache.activemq.command.ActiveMQTempDestination;
055import org.apache.activemq.command.ActiveMQTempQueue;
056import org.apache.activemq.command.ActiveMQTempTopic;
057import org.apache.activemq.command.BrokerInfo;
058import org.apache.activemq.command.Command;
059import org.apache.activemq.command.CommandTypes;
060import org.apache.activemq.command.ConnectionControl;
061import org.apache.activemq.command.ConnectionError;
062import org.apache.activemq.command.ConnectionId;
063import org.apache.activemq.command.ConnectionInfo;
064import org.apache.activemq.command.ConsumerControl;
065import org.apache.activemq.command.ConsumerId;
066import org.apache.activemq.command.ConsumerInfo;
067import org.apache.activemq.command.ControlCommand;
068import org.apache.activemq.command.DestinationInfo;
069import org.apache.activemq.command.ExceptionResponse;
070import org.apache.activemq.command.Message;
071import org.apache.activemq.command.MessageDispatch;
072import org.apache.activemq.command.MessageId;
073import org.apache.activemq.command.ProducerAck;
074import org.apache.activemq.command.ProducerId;
075import org.apache.activemq.command.RemoveInfo;
076import org.apache.activemq.command.RemoveSubscriptionInfo;
077import org.apache.activemq.command.Response;
078import org.apache.activemq.command.SessionId;
079import org.apache.activemq.command.ShutdownInfo;
080import org.apache.activemq.command.WireFormatInfo;
081import org.apache.activemq.management.JMSConnectionStatsImpl;
082import org.apache.activemq.management.JMSStatsImpl;
083import org.apache.activemq.management.StatsCapable;
084import org.apache.activemq.management.StatsImpl;
085import org.apache.activemq.state.CommandVisitorAdapter;
086import org.apache.activemq.thread.Scheduler;
087import org.apache.activemq.thread.TaskRunnerFactory;
088import org.apache.activemq.transport.FutureResponse;
089import org.apache.activemq.transport.ResponseCallback;
090import org.apache.activemq.transport.Transport;
091import org.apache.activemq.transport.TransportListener;
092import org.apache.activemq.transport.failover.FailoverTransport;
093import org.apache.activemq.util.IdGenerator;
094import org.apache.activemq.util.IntrospectionSupport;
095import org.apache.activemq.util.JMSExceptionSupport;
096import org.apache.activemq.util.LongSequenceGenerator;
097import org.apache.activemq.util.ServiceSupport;
098import org.slf4j.Logger;
099import org.slf4j.LoggerFactory;
100
101public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
102
103    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; // the three defaults below alias the ActiveMQConnectionFactory constants
104    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
105    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
106
107    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
108
109    public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>(); // every value is delete()d by close()
110
111    protected boolean dispatchAsync=true;
112    protected boolean alwaysSessionAsync = true;
113
114    private TaskRunnerFactory sessionTaskRunner; // shut down by close() when non-null
115    private final ThreadPoolExecutor executor; // built in the constructor: one thread, unbounded LinkedBlockingQueue
116
117    // Connection state variables
118    private final ConnectionInfo info; // created in the constructor from a generated connection id; carries user name, password and client id
119    private ExceptionListener exceptionListener;
120    private ClientInternalExceptionListener clientInternalExceptionListener;
121    private boolean clientIDSet; // read by setClientID(); NOTE(review): no assignment is visible in this part of the file
122    private boolean isConnectionInfoSentToBroker; // when true, setClientID() is rejected and close() sends the remove/shutdown commands
123    private boolean userSpecifiedClientID; // set by setClientID() and setDefaultClientID()
124
125    // Configuration options variables
126    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
127    private BlobTransferPolicy blobTransferPolicy;
128    private RedeliveryPolicy redeliveryPolicy;
129    private MessageTransformer transformer;
130
131    private boolean disableTimeStampsByDefault;
132    private boolean optimizedMessageDispatch = true;
133    private boolean copyMessageOnSend = true;
134    private boolean useCompression;
135    private boolean objectMessageSerializationDefered;
136    private boolean useAsyncSend;
137    private boolean optimizeAcknowledge;
138    private long optimizeAcknowledgeTimeOut = 0;
139    private boolean nestedMapAndListEnabled = true;
140    private boolean useRetroactiveConsumer;
141    private boolean exclusiveConsumer;
142    private boolean alwaysSyncSend;
143    private int closeTimeout = 15000; // passed to doSyncSendPacket() in close(); presumably milliseconds - TODO confirm
144    private boolean watchTopicAdvisories = true;
145    private long warnAboutUnstartedConnectionTimeout = 500L;
146    private int sendTimeout =0;
147    private boolean sendAcksAsync=true;
148    private boolean checkForDuplicates = true;
149
150    private final Transport transport; // this connection registers itself as the transport's listener in the constructor
151    private final IdGenerator clientIdGenerator;
152    private final JMSStatsImpl factoryStats; // the constructor adds this connection to it
153    private final JMSConnectionStatsImpl stats; // built in the constructor over the sessions list
154
155    private final AtomicBoolean started = new AtomicBoolean(false); // flipped by start()/stop(), cleared by close()
156    private final AtomicBoolean closing = new AtomicBoolean(false); // true only while close() is tearing the connection down
157    private final AtomicBoolean closed = new AtomicBoolean(false); // set at the end of a successful close()
158    private final AtomicBoolean transportFailed = new AtomicBoolean(false); // when set, close() skips the initial stop()
159    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); // iterated by start(), stop() and close()
160    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); // disposed by close()
161    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>(); // disposed by close()
162    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>(); // disposed by close()
163    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
164
165    // Maps ConsumerIds to their ActiveMQDispatcher objects
166    private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
167    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
168    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); // source of the sequence used by getNextSessionId()
169    private final SessionId connectionSessionId; // SessionId built in the constructor from the connection id and the value -1
170    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
171    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
172    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
173    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
174
175    private AdvisoryConsumer advisoryConsumer; // disposed and nulled by close()
176    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
177    private BrokerInfo brokerInfo;
178    private IOException firstFailureError;
179    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
180
181    // Assume that protocol is the latest. Change to the actual protocol
182    // version when a WireFormatInfo is received.
183    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
184    private final long timeCreated; // System.currentTimeMillis() captured in the constructor
185    private final ConnectionAudit connectionAudit = new ConnectionAudit(); // the constructor enables duplicate checking only when the transport is fault tolerant
186    private DestinationSource destinationSource; // stopped and nulled by close()
187    private final Object ensureConnectionInfoSentMutex = new Object();
188    private boolean useDedicatedTaskRunner;
189    protected volatile CountDownLatch transportInterruptionProcessingComplete;
190    private long consumerFailoverRedeliveryWaitPeriod;
191    private Scheduler scheduler; // stopped by close() when non-null
192    private boolean messagePrioritySupported = true;
193    private boolean transactedIndividualAck = false;
194    private boolean nonBlockingRedelivery = false;
195
196    /**
197     * Construct an <code>ActiveMQConnection</code>
198     *
199     * @param transport
200     * @param factoryStats
201     * @throws Exception
202     */
203    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
204
205        this.transport = transport;
206        this.clientIdGenerator = clientIdGenerator;
207        this.factoryStats = factoryStats;
208
209        // Configure a single threaded executor who's core thread can timeout if
210        // idle
211        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
212            public Thread newThread(Runnable r) {
213                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
214                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
215                //thread.setDaemon(true);
216                return thread;
217            }
218        });
219        // asyncConnectionThread.allowCoreThreadTimeOut(true);
220        String uniqueId = connectionIdGenerator.generateId();
221        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
222        this.info.setManageable(true);
223        this.info.setFaultTolerant(transport.isFaultTolerant());
224        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
225
226        this.transport.setTransportListener(this);
227
228        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
229        this.factoryStats.addConnection(this);
230        this.timeCreated = System.currentTimeMillis();
231        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
232    }
233
234    protected void setUserName(String userName) {
235        this.info.setUserName(userName);
236    }
237
238    protected void setPassword(String password) {
239        this.info.setPassword(password);
240    }
241
242    /**
243     * A static helper method to create a new connection
244     *
245     * @return an ActiveMQConnection
246     * @throws JMSException
247     */
248    public static ActiveMQConnection makeConnection() throws JMSException {
249        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
250        return (ActiveMQConnection)factory.createConnection();
251    }
252
253    /**
254     * A static helper method to create a new connection
255     *
256     * @param uri
257     * @return and ActiveMQConnection
258     * @throws JMSException
259     */
260    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
261        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
262        return (ActiveMQConnection)factory.createConnection();
263    }
264
265    /**
266     * A static helper method to create a new connection
267     *
268     * @param user
269     * @param password
270     * @param uri
271     * @return an ActiveMQConnection
272     * @throws JMSException
273     */
274    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
275        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
276        return (ActiveMQConnection)factory.createConnection();
277    }
278
279    /**
280     * @return a number unique for this connection
281     */
282    public JMSConnectionStatsImpl getConnectionStats() {
283        return stats;
284    }
285
286    /**
287     * Creates a <CODE>Session</CODE> object.
288     *
289     * @param transacted indicates whether the session is transacted
290     * @param acknowledgeMode indicates whether the consumer or the client will
291     *                acknowledge any messages it receives; ignored if the
292     *                session is transacted. Legal values are
293     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
294     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
295     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
296     * @return a newly created session
297     * @throws JMSException if the <CODE>Connection</CODE> object fails to
298     *                 create a session due to some internal error or lack of
299     *                 support for the specific transaction and acknowledgement
300     *                 mode.
301     * @see Session#AUTO_ACKNOWLEDGE
302     * @see Session#CLIENT_ACKNOWLEDGE
303     * @see Session#DUPS_OK_ACKNOWLEDGE
304     * @since 1.1
305     */
306    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
307        checkClosedOrFailed();
308        ensureConnectionInfoSent();
309        if(!transacted) {
310            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
311                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
312            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
313                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
314                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
315            }
316        }
317        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
318            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
319    }
320
321    /**
322     * @return sessionId
323     */
324    protected SessionId getNextSessionId() {
325        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
326    }
327
328    /**
329     * Gets the client identifier for this connection.
330     * <P>
331     * This value is specific to the JMS provider. It is either preconfigured by
332     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
333     * dynamically by the application by calling the <code>setClientID</code>
334     * method.
335     *
336     * @return the unique client identifier
337     * @throws JMSException if the JMS provider fails to return the client ID
338     *                 for this connection due to some internal error.
339     */
340    public String getClientID() throws JMSException {
341        checkClosedOrFailed();
342        return this.info.getClientId();
343    }
344
345    /**
346     * Sets the client identifier for this connection.
347     * <P>
348     * The preferred way to assign a JMS client's client identifier is for it to
349     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
350     * object and transparently assigned to the <CODE>Connection</CODE> object
351     * it creates.
352     * <P>
353     * Alternatively, a client can set a connection's client identifier using a
354     * provider-specific value. The facility to set a connection's client
355     * identifier explicitly is not a mechanism for overriding the identifier
356     * that has been administratively configured. It is provided for the case
357     * where no administratively specified identifier exists. If one does exist,
358     * an attempt to change it by setting it must throw an
359     * <CODE>IllegalStateException</CODE>. If a client sets the client
360     * identifier explicitly, it must do so immediately after it creates the
361     * connection and before any other action on the connection is taken. After
362     * this point, setting the client identifier is a programming error that
363     * should throw an <CODE>IllegalStateException</CODE>.
364     * <P>
365     * The purpose of the client identifier is to associate a connection and its
366     * objects with a state maintained on behalf of the client by a provider.
367     * The only such state identified by the JMS API is that required to support
368     * durable subscriptions.
369     * <P>
370     * If another connection with the same <code>clientID</code> is already
371     * running when this method is called, the JMS provider should detect the
372     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
373     *
374     * @param newClientID the unique client identifier
375     * @throws JMSException if the JMS provider fails to set the client ID for
376     *                 this connection due to some internal error.
377     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
378     *                 invalid or duplicate client ID.
379     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
380     *                 a connection's client ID at the wrong time or when it has
381     *                 been administratively configured.
382     */
383    public void setClientID(String newClientID) throws JMSException {
384        checkClosedOrFailed();
385
386        if (this.clientIDSet) {
387            throw new IllegalStateException("The clientID has already been set");
388        }
389
390        if (this.isConnectionInfoSentToBroker) {
391            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
392        }
393
394        this.info.setClientId(newClientID);
395        this.userSpecifiedClientID = true;
396        ensureConnectionInfoSent();
397    }
398
399    /**
400     * Sets the default client id that the connection will use if explicitly not
401     * set with the setClientId() call.
402     */
403    public void setDefaultClientID(String clientID) throws JMSException {
404        this.info.setClientId(clientID);
405        this.userSpecifiedClientID = true;
406    }
407
408    /**
409     * Gets the metadata for this connection.
410     *
411     * @return the connection metadata
412     * @throws JMSException if the JMS provider fails to get the connection
413     *                 metadata for this connection.
414     * @see javax.jms.ConnectionMetaData
415     */
416    public ConnectionMetaData getMetaData() throws JMSException {
417        checkClosedOrFailed();
418        return ActiveMQConnectionMetaData.INSTANCE;
419    }
420
421    /**
422     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
423     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
424     * associated with it.
425     *
426     * @return the <CODE>ExceptionListener</CODE> for this connection, or
427     *         null, if no <CODE>ExceptionListener</CODE> is associated with
428     *         this connection.
429     * @throws JMSException if the JMS provider fails to get the
430     *                 <CODE>ExceptionListener</CODE> for this connection.
431     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
432     */
433    public ExceptionListener getExceptionListener() throws JMSException {
434        checkClosedOrFailed();
435        return this.exceptionListener;
436    }
437
438    /**
439     * Sets an exception listener for this connection.
440     * <P>
441     * If a JMS provider detects a serious problem with a connection, it informs
442     * the connection's <CODE> ExceptionListener</CODE>, if one has been
443     * registered. It does this by calling the listener's <CODE>onException
444     * </CODE>
445     * method, passing it a <CODE>JMSException</CODE> object describing the
446     * problem.
447     * <P>
448     * An exception listener allows a client to be notified of a problem
449     * asynchronously. Some connections only consume messages, so they would
450     * have no other way to learn their connection has failed.
451     * <P>
452     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
453     * <P>
454     * A JMS provider should attempt to resolve connection problems itself
455     * before it notifies the client of them.
456     *
457     * @param listener the exception listener
458     * @throws JMSException if the JMS provider fails to set the exception
459     *                 listener for this connection.
460     */
461    public void setExceptionListener(ExceptionListener listener) throws JMSException {
462        checkClosedOrFailed();
463        this.exceptionListener = listener;
464    }
465
466    /**
467     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
468     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
469     * associated with it.
470     *
471     * @return the listener or <code>null</code> if no listener is registered with the connection.
472     */
473    public ClientInternalExceptionListener getClientInternalExceptionListener()
474    {
475        return clientInternalExceptionListener;
476    }
477
478    /**
479     * Sets a client internal exception listener for this connection.
480     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
481     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
482     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
483     * describing the problem.
484     *
485     * @param listener the exception listener
486     */
487    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
488    {
489        this.clientInternalExceptionListener = listener;
490    }
491
492    /**
493     * Starts (or restarts) a connection's delivery of incoming messages. A call
494     * to <CODE>start</CODE> on a connection that has already been started is
495     * ignored.
496     *
497     * @throws JMSException if the JMS provider fails to start message delivery
498     *                 due to some internal error.
499     * @see javax.jms.Connection#stop()
500     */
501    public void start() throws JMSException {
502        checkClosedOrFailed();
503        ensureConnectionInfoSent();
504        if (started.compareAndSet(false, true)) {
505            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
506                ActiveMQSession session = i.next();
507                session.start();
508            }
509        }
510    }
511
512    /**
513     * Temporarily stops a connection's delivery of incoming messages. Delivery
514     * can be restarted using the connection's <CODE>start</CODE> method. When
515     * the connection is stopped, delivery to all the connection's message
516     * consumers is inhibited: synchronous receives block, and messages are not
517     * delivered to message listeners.
518     * <P>
519     * This call blocks until receives and/or message listeners in progress have
520     * completed.
521     * <P>
522     * Stopping a connection has no effect on its ability to send messages. A
523     * call to <CODE>stop</CODE> on a connection that has already been stopped
524     * is ignored.
525     * <P>
526     * A call to <CODE>stop</CODE> must not return until delivery of messages
527     * has paused. This means that a client can rely on the fact that none of
528     * its message listeners will be called and that all threads of control
529     * waiting for <CODE>receive</CODE> calls to return will not return with a
530     * message until the connection is restarted. The receive timers for a
531     * stopped connection continue to advance, so receives may time out while
532     * the connection is stopped.
533     * <P>
534     * If message listeners are running when <CODE>stop</CODE> is invoked, the
535     * <CODE>stop</CODE> call must wait until all of them have returned before
536     * it may return. While these message listeners are completing, they must
537     * have the full services of the connection available to them.
538     *
539     * @throws JMSException if the JMS provider fails to stop message delivery
540     *                 due to some internal error.
541     * @see javax.jms.Connection#start()
542     */
543    public void stop() throws JMSException {
544        checkClosedOrFailed();
545        if (started.compareAndSet(true, false)) {
546            synchronized(sessions) {
547                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
548                    ActiveMQSession s = i.next();
549                    s.stop();
550                }
551            }
552        }
553    }
554
555    /**
556     * Closes the connection.
557     * <P>
558     * Since a provider typically allocates significant resources outside the
559     * JVM on behalf of a connection, clients should close these resources when
560     * they are not needed. Relying on garbage collection to eventually reclaim
561     * these resources may not be timely enough.
562     * <P>
563     * There is no need to close the sessions, producers, and consumers of a
564     * closed connection.
565     * <P>
566     * Closing a connection causes all temporary destinations to be deleted.
567     * <P>
568     * When this method is invoked, it should not return until message
569     * processing has been shut down in an orderly fashion. This means that all
570     * message listeners that may have been running have returned, and that all
571     * pending receives have returned. A close terminates all pending message
572     * receives on the connection's sessions' consumers. The receives may return
573     * with a message or with null, depending on whether there was a message
574     * available at the time of the close. If one or more of the connection's
575     * sessions' message listeners is processing a message at the time when
576     * connection <CODE>close</CODE> is invoked, all the facilities of the
577     * connection and its sessions must remain available to those listeners
578     * until they return control to the JMS provider.
579     * <P>
580     * Closing a connection causes any of its sessions' transactions in progress
581     * to be rolled back. In the case where a session's work is coordinated by
582     * an external transaction manager, a session's <CODE>commit</CODE> and
583     * <CODE> rollback</CODE> methods are not used and the result of a closed
584     * session's work is determined later by the transaction manager. Closing a
585     * connection does NOT force an acknowledgment of client-acknowledged
586     * sessions.
587     * <P>
588     * Invoking the <CODE>acknowledge</CODE> method of a received message from
589     * a closed connection's session must throw an
590     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
591     * NOT throw an exception.
592     *
593     * @throws JMSException if the JMS provider fails to close the connection
594     *                 due to some internal error. For example, a failure to
595     *                 release resources or to close a socket connection can
596     *                 cause this exception to be thrown.
597     */
598    public void close() throws JMSException {
599        // Store the interrupted state and clear so that cleanup happens without
600        // leaking connection resources.  Reset in finally to preserve state.
601        boolean interrupted = Thread.interrupted();
602
603        try {
604
605            // If we were running, lets stop first.
606            if (!closed.get() && !transportFailed.get()) {
607                stop();
608            }
609
610            synchronized (this) {
611                if (!closed.get()) {
612                    closing.set(true);
613
614                    if (destinationSource != null) {
615                        destinationSource.stop();
616                        destinationSource = null;
617                    }
618                    if (advisoryConsumer != null) {
619                        advisoryConsumer.dispose();
620                        advisoryConsumer = null;
621                    }
622
623                    Scheduler scheduler = this.scheduler;
624                    if (scheduler != null) {
625                        try {
626                            scheduler.stop();
627                        } catch (Exception e) {
628                            JMSException ex =  JMSExceptionSupport.create(e);
629                            throw ex;
630                        }
631                    }
632
633                    long lastDeliveredSequenceId = 0;
634                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
635                        ActiveMQSession s = i.next();
636                        s.dispose();
637                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
638                    }
639                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
640                        ActiveMQConnectionConsumer c = i.next();
641                        c.dispose();
642                    }
643                    for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
644                        ActiveMQInputStream c = i.next();
645                        c.dispose();
646                    }
647                    for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
648                        ActiveMQOutputStream c = i.next();
649                        c.dispose();
650                    }
651
652                    // As TemporaryQueue and TemporaryTopic instances are bound
653                    // to a connection we should just delete them after the connection
654                    // is closed to free up memory
655                    for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
656                        ActiveMQTempDestination c = i.next();
657                        c.delete();
658                    }
659
660                    if (isConnectionInfoSentToBroker) {
661                        // If we announced ourselfs to the broker.. Try to let
662                        // the broker
663                        // know that the connection is being shutdown.
664                        RemoveInfo removeCommand = info.createRemoveCommand();
665                        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
666                        doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
667                        doAsyncSendPacket(new ShutdownInfo());
668                    }
669
670                    started.set(false);
671
672                    // TODO if we move the TaskRunnerFactory to the connection
673                    // factory
674                    // then we may need to call
675                    // factory.onConnectionClose(this);
676                    if (sessionTaskRunner != null) {
677                        sessionTaskRunner.shutdown();
678                    }
679                    closed.set(true);
680                    closing.set(false);
681                }
682            }
683        } finally {
684            try {
685                if (executor != null) {
686                    executor.shutdown();
687                }
688            } catch (Throwable e) {
689                LOG.error("Error shutting down thread pool " + e, e);
690            }
691
692            ServiceSupport.dispose(this.transport);
693
694            factoryStats.removeConnection(this);
695            if (interrupted) {
696                Thread.currentThread().interrupt();
697            }
698        }
699    }
700
701    /**
702     * Tells the broker to terminate its VM. This can be used to cleanly
703     * terminate a broker running in a standalone java process. Server must have
704     * property enable.vm.shutdown=true defined to allow this to work.
705     */
706    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
707    // implemented.
708    /*
709     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
710     * command = new BrokerAdminCommand();
711     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
712     * asyncSendPacket(command); }
713     */
714
715    /**
716     * Create a durable connection consumer for this connection (optional
717     * operation). This is an expert facility not used by regular JMS clients.
718     *
719     * @param topic topic to access
720     * @param subscriptionName durable subscription name
721     * @param messageSelector only messages with properties matching the message
722     *                selector expression are delivered. A value of null or an
723     *                empty string indicates that there is no message selector
724     *                for the message consumer.
725     * @param sessionPool the server session pool to associate with this durable
726     *                connection consumer
727     * @param maxMessages the maximum number of messages that can be assigned to
728     *                a server session at one time
729     * @return the durable connection consumer
730     * @throws JMSException if the <CODE>Connection</CODE> object fails to
731     *                 create a connection consumer due to some internal error
732     *                 or invalid arguments for <CODE>sessionPool</CODE> and
733     *                 <CODE>messageSelector</CODE>.
734     * @throws javax.jms.InvalidDestinationException if an invalid destination
735     *                 is specified.
736     * @throws javax.jms.InvalidSelectorException if the message selector is
737     *                 invalid.
738     * @see javax.jms.ConnectionConsumer
739     * @since 1.1
740     */
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
        throws JMSException {
        // Delegate to the six-argument overload, passing false for noLocal.
        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
    }
745
746    /**
747     * Create a durable connection consumer for this connection (optional
748     * operation). This is an expert facility not used by regular JMS clients.
749     *
750     * @param topic topic to access
751     * @param subscriptionName durable subscription name
752     * @param messageSelector only messages with properties matching the message
753     *                selector expression are delivered. A value of null or an
754     *                empty string indicates that there is no message selector
755     *                for the message consumer.
756     * @param sessionPool the server session pool to associate with this durable
757     *                connection consumer
758     * @param maxMessages the maximum number of messages that can be assigned to
759     *                a server session at one time
760     * @param noLocal set true if you want to filter out messages published
761     *                locally
762     * @return the durable connection consumer
763     * @throws JMSException if the <CODE>Connection</CODE> object fails to
764     *                 create a connection consumer due to some internal error
765     *                 or invalid arguments for <CODE>sessionPool</CODE> and
766     *                 <CODE>messageSelector</CODE>.
767     * @throws javax.jms.InvalidDestinationException if an invalid destination
768     *                 is specified.
769     * @throws javax.jms.InvalidSelectorException if the message selector is
770     *                 invalid.
771     * @see javax.jms.ConnectionConsumer
772     * @since 1.1
773     */
774    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
775                                                              boolean noLocal) throws JMSException {
776        checkClosedOrFailed();
777        ensureConnectionInfoSent();
778        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
779        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
780        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
781        info.setSubscriptionName(subscriptionName);
782        info.setSelector(messageSelector);
783        info.setPrefetchSize(maxMessages);
784        info.setDispatchAsync(isDispatchAsync());
785
786        // Allows the options on the destination to configure the consumerInfo
787        if (info.getDestination().getOptions() != null) {
788            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
789            IntrospectionSupport.setProperties(this.info, options, "consumer.");
790        }
791
792        return new ActiveMQConnectionConsumer(this, sessionPool, info);
793    }
794
795    // Properties
796    // -------------------------------------------------------------------------
797
798    /**
799     * Returns true if this connection has been started
800     *
801     * @return true if this Connection is started
802     */
803    public boolean isStarted() {
804        return started.get();
805    }
806
807    /**
808     * Returns true if the connection is closed
809     */
810    public boolean isClosed() {
811        return closed.get();
812    }
813
814    /**
815     * Returns true if the connection is in the process of being closed
816     */
817    public boolean isClosing() {
818        return closing.get();
819    }
820
821    /**
822     * Returns true if the underlying transport has failed
823     */
824    public boolean isTransportFailed() {
825        return transportFailed.get();
826    }
827
828    /**
829     * @return Returns the prefetchPolicy.
830     */
831    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
832        return prefetchPolicy;
833    }
834
835    /**
836     * Sets the <a
837     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
838     * policy</a> for consumers created by this connection.
839     */
840    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
841        this.prefetchPolicy = prefetchPolicy;
842    }
843
844    /**
845     */
846    public Transport getTransportChannel() {
847        return transport;
848    }
849
850    /**
851     * @return Returns the clientID of the connection, forcing one to be
852     *         generated if one has not yet been configured.
853     */
854    public String getInitializedClientID() throws JMSException {
855        ensureConnectionInfoSent();
856        return info.getClientId();
857    }
858
859    /**
860     * @return Returns the timeStampsDisableByDefault.
861     */
862    public boolean isDisableTimeStampsByDefault() {
863        return disableTimeStampsByDefault;
864    }
865
866    /**
867     * Sets whether or not timestamps on messages should be disabled or not. If
868     * you disable them it adds a small performance boost.
869     */
870    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
871        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
872    }
873
874    /**
875     * @return Returns the dispatchOptimizedMessage.
876     */
877    public boolean isOptimizedMessageDispatch() {
878        return optimizedMessageDispatch;
879    }
880
881    /**
882     * If this flag is set then an larger prefetch limit is used - only
883     * applicable for durable topic subscribers.
884     */
885    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
886        this.optimizedMessageDispatch = dispatchOptimizedMessage;
887    }
888
889    /**
890     * @return Returns the closeTimeout.
891     */
892    public int getCloseTimeout() {
893        return closeTimeout;
894    }
895
896    /**
897     * Sets the timeout before a close is considered complete. Normally a
898     * close() on a connection waits for confirmation from the broker; this
899     * allows that operation to timeout to save the client hanging if there is
900     * no broker
901     */
902    public void setCloseTimeout(int closeTimeout) {
903        this.closeTimeout = closeTimeout;
904    }
905
906    /**
907     * @return ConnectionInfo
908     */
909    public ConnectionInfo getConnectionInfo() {
910        return this.info;
911    }
912
    /**
     * @return the current value of the useRetroactiveConsumer flag
     */
    public boolean isUseRetroactiveConsumer() {
        return useRetroactiveConsumer;
    }
916
917    /**
918     * Sets whether or not retroactive consumers are enabled. Retroactive
919     * consumers allow non-durable topic subscribers to receive old messages
920     * that were published before the non-durable subscriber started.
921     */
922    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
923        this.useRetroactiveConsumer = useRetroactiveConsumer;
924    }
925
    /**
     * @return the current value of the nestedMapAndListEnabled flag
     */
    public boolean isNestedMapAndListEnabled() {
        return nestedMapAndListEnabled;
    }
929
930    /**
931     * Enables/disables whether or not Message properties and MapMessage entries
932     * support <a
933     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
934     * Structures</a> of Map and List objects
935     */
936    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
937        this.nestedMapAndListEnabled = structuredMapsEnabled;
938    }
939
    /**
     * @return the current value of the exclusiveConsumer flag
     */
    public boolean isExclusiveConsumer() {
        return exclusiveConsumer;
    }
943
944    /**
945     * Enables or disables whether or not queue consumers should be exclusive or
946     * not for example to preserve ordering when not using <a
947     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
948     *
949     * @param exclusiveConsumer
950     */
951    public void setExclusiveConsumer(boolean exclusiveConsumer) {
952        this.exclusiveConsumer = exclusiveConsumer;
953    }
954
955    /**
956     * Adds a transport listener so that a client can be notified of events in
957     * the underlying transport
958     */
959    public void addTransportListener(TransportListener transportListener) {
960        transportListeners.add(transportListener);
961    }
962
    /**
     * Removes a transport listener from this connection's listener collection.
     *
     * @param transportListener the listener to remove
     */
    public void removeTransportListener(TransportListener transportListener) {
        transportListeners.remove(transportListener);
    }
966
    /**
     * @return the current value of the useDedicatedTaskRunner flag, which is
     *         passed to the session task runner factory when that factory is
     *         lazily created
     */
    public boolean isUseDedicatedTaskRunner() {
        return useDedicatedTaskRunner;
    }
970
    /**
     * Sets the useDedicatedTaskRunner flag.
     *
     * @param useDedicatedTaskRunner the new value of the flag
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }
974
975    public TaskRunnerFactory getSessionTaskRunner() {
976        synchronized (this) {
977            if (sessionTaskRunner == null) {
978                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
979            }
980        }
981        return sessionTaskRunner;
982    }
983
    /**
     * Replaces the task runner factory used for session tasks.
     *
     * @param sessionTaskRunner the factory to store; the assignment is made
     *                without holding this connection's monitor
     */
    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
        this.sessionTaskRunner = sessionTaskRunner;
    }
987
    /**
     * @return the message transformer currently set on this connection
     */
    public MessageTransformer getTransformer() {
        return transformer;
    }
991
992    /**
993     * Sets the transformer used to transform messages before they are sent on
994     * to the JMS bus or when they are received from the bus but before they are
995     * delivered to the JMS client
996     */
997    public void setTransformer(MessageTransformer transformer) {
998        this.transformer = transformer;
999    }
1000
1001    /**
1002     * @return the statsEnabled
1003     */
1004    public boolean isStatsEnabled() {
1005        return this.stats.isEnabled();
1006    }
1007
1008    /**
1009     * @param statsEnabled the statsEnabled to set
1010     */
1011    public void setStatsEnabled(boolean statsEnabled) {
1012        this.stats.setEnabled(statsEnabled);
1013    }
1014
1015    /**
1016     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1017     * being created or destroyed or to enquire about the current destinations available on the broker
1018     *
1019     * @return a lazily created destination source
1020     * @throws JMSException
1021     */
1022    public DestinationSource getDestinationSource() throws JMSException {
1023        if (destinationSource == null) {
1024            destinationSource = new DestinationSource(this);
1025            destinationSource.start();
1026        }
1027        return destinationSource;
1028    }
1029
1030    // Implementation methods
1031    // -------------------------------------------------------------------------
1032
1033    /**
1034     * Used internally for adding Sessions to the Connection
1035     *
1036     * @param session
1037     * @throws JMSException
1038     * @throws JMSException
1039     */
1040    protected void addSession(ActiveMQSession session) throws JMSException {
1041        this.sessions.add(session);
1042        if (sessions.size() > 1 || session.isTransacted()) {
1043            optimizedMessageDispatch = false;
1044        }
1045    }
1046
1047    /**
1048     * Used interanlly for removing Sessions from a Connection
1049     *
1050     * @param session
1051     */
1052    protected void removeSession(ActiveMQSession session) {
1053        this.sessions.remove(session);
1054        this.removeDispatcher(session);
1055    }
1056
1057    /**
1058     * Add a ConnectionConsumer
1059     *
1060     * @param connectionConsumer
1061     * @throws JMSException
1062     */
1063    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1064        this.connectionConsumers.add(connectionConsumer);
1065    }
1066
1067    /**
1068     * Remove a ConnectionConsumer
1069     *
1070     * @param connectionConsumer
1071     */
1072    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1073        this.connectionConsumers.remove(connectionConsumer);
1074        this.removeDispatcher(connectionConsumer);
1075    }
1076
1077    /**
1078     * Creates a <CODE>TopicSession</CODE> object.
1079     *
1080     * @param transacted indicates whether the session is transacted
1081     * @param acknowledgeMode indicates whether the consumer or the client will
1082     *                acknowledge any messages it receives; ignored if the
1083     *                session is transacted. Legal values are
1084     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1085     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1086     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1087     * @return a newly created topic session
1088     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1089     *                 to create a session due to some internal error or lack of
1090     *                 support for the specific transaction and acknowledgement
1091     *                 mode.
1092     * @see Session#AUTO_ACKNOWLEDGE
1093     * @see Session#CLIENT_ACKNOWLEDGE
1094     * @see Session#DUPS_OK_ACKNOWLEDGE
1095     */
    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
        // Create a regular session and wrap it in an ActiveMQTopicSession.
        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
    }
1099
1100    /**
1101     * Creates a connection consumer for this connection (optional operation).
1102     * This is an expert facility not used by regular JMS clients.
1103     *
1104     * @param topic the topic to access
1105     * @param messageSelector only messages with properties matching the message
1106     *                selector expression are delivered. A value of null or an
1107     *                empty string indicates that there is no message selector
1108     *                for the message consumer.
1109     * @param sessionPool the server session pool to associate with this
1110     *                connection consumer
1111     * @param maxMessages the maximum number of messages that can be assigned to
1112     *                a server session at one time
1113     * @return the connection consumer
1114     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1115     *                 to create a connection consumer due to some internal
1116     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1117     *                 and <CODE>messageSelector</CODE>.
1118     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1119     *                 specified.
1120     * @throws javax.jms.InvalidSelectorException if the message selector is
1121     *                 invalid.
1122     * @see javax.jms.ConnectionConsumer
1123     */
    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        // Delegate to the five-argument overload, passing false for noLocal.
        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
    }
1127
1128    /**
1129     * Creates a connection consumer for this connection (optional operation).
1130     * This is an expert facility not used by regular JMS clients.
1131     *
1132     * @param queue the queue to access
1133     * @param messageSelector only messages with properties matching the message
1134     *                selector expression are delivered. A value of null or an
1135     *                empty string indicates that there is no message selector
1136     *                for the message consumer.
1137     * @param sessionPool the server session pool to associate with this
1138     *                connection consumer
1139     * @param maxMessages the maximum number of messages that can be assigned to
1140     *                a server session at one time
1141     * @return the connection consumer
1142     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1143     *                 to create a connection consumer due to some internal
1144     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1145     *                 and <CODE>messageSelector</CODE>.
1146     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1147     *                 specified.
1148     * @throws javax.jms.InvalidSelectorException if the message selector is
1149     *                 invalid.
1150     * @see javax.jms.ConnectionConsumer
1151     */
    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        // Delegate to the five-argument overload, passing false for noLocal.
        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
    }
1155
1156    /**
1157     * Creates a connection consumer for this connection (optional operation).
1158     * This is an expert facility not used by regular JMS clients.
1159     *
1160     * @param destination the destination to access
1161     * @param messageSelector only messages with properties matching the message
1162     *                selector expression are delivered. A value of null or an
1163     *                empty string indicates that there is no message selector
1164     *                for the message consumer.
1165     * @param sessionPool the server session pool to associate with this
1166     *                connection consumer
1167     * @param maxMessages the maximum number of messages that can be assigned to
1168     *                a server session at one time
1169     * @return the connection consumer
1170     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1171     *                 create a connection consumer due to some internal error
1172     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1173     *                 <CODE>messageSelector</CODE>.
1174     * @throws javax.jms.InvalidDestinationException if an invalid destination
1175     *                 is specified.
1176     * @throws javax.jms.InvalidSelectorException if the message selector is
1177     *                 invalid.
1178     * @see javax.jms.ConnectionConsumer
1179     * @since 1.1
1180     */
    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        // Delegate to the five-argument overload, passing false for noLocal.
        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
    }
1184
1185    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1186        throws JMSException {
1187
1188        checkClosedOrFailed();
1189        ensureConnectionInfoSent();
1190
1191        ConsumerId consumerId = createConsumerId();
1192        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1193        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1194        consumerInfo.setSelector(messageSelector);
1195        consumerInfo.setPrefetchSize(maxMessages);
1196        consumerInfo.setNoLocal(noLocal);
1197        consumerInfo.setDispatchAsync(isDispatchAsync());
1198
1199        // Allows the options on the destination to configure the consumerInfo
1200        if (consumerInfo.getDestination().getOptions() != null) {
1201            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1202            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1203        }
1204
1205        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1206    }
1207
1208    /**
1209     * @return
1210     */
1211    private ConsumerId createConsumerId() {
1212        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1213    }
1214
1215    /**
1216     * @return
1217     */
1218    private ProducerId createProducerId() {
1219        return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1220    }
1221
1222    /**
1223     * Creates a <CODE>QueueSession</CODE> object.
1224     *
1225     * @param transacted indicates whether the session is transacted
1226     * @param acknowledgeMode indicates whether the consumer or the client will
1227     *                acknowledge any messages it receives; ignored if the
1228     *                session is transacted. Legal values are
1229     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1230     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1231     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1232     * @return a newly created queue session
1233     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1234     *                 to create a session due to some internal error or lack of
1235     *                 support for the specific transaction and acknowledgement
1236     *                 mode.
1237     * @see Session#AUTO_ACKNOWLEDGE
1238     * @see Session#CLIENT_ACKNOWLEDGE
1239     * @see Session#DUPS_OK_ACKNOWLEDGE
1240     */
    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
        // Create a regular session and wrap it in an ActiveMQQueueSession.
        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
    }
1244
1245    /**
1246     * Ensures that the clientID was manually specified and not auto-generated.
1247     * If the clientID was not specified this method will throw an exception.
1248     * This method is used to ensure that the clientID + durableSubscriber name
1249     * are used correctly.
1250     *
1251     * @throws JMSException
1252     */
1253    public void checkClientIDWasManuallySpecified() throws JMSException {
1254        if (!userSpecifiedClientID) {
1255            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1256        }
1257    }
1258
1259    /**
1260     * send a Packet through the Connection - for internal use only
1261     *
1262     * @param command
1263     * @throws JMSException
1264     */
1265    public void asyncSendPacket(Command command) throws JMSException {
1266        if (isClosed()) {
1267            throw new ConnectionClosedException();
1268        } else {
1269            doAsyncSendPacket(command);
1270        }
1271    }
1272
    /**
     * Hands the command to the transport as a one-way send, without checking
     * whether the connection is closed.
     *
     * @param command the command to send
     * @throws JMSException wrapping any IOException raised by the transport
     */
    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            // Surface transport I/O failures as JMS exceptions.
            throw JMSExceptionSupport.create(e);
        }
    }
1280
1281    /**
1282     * Send a packet through a Connection - for internal use only
1283     *
1284     * @param command
1285     * @return
1286     * @throws JMSException
1287     */
1288    public void syncSendPacket(Command command, final AsyncCallback onComplete) throws JMSException {
1289        if(onComplete==null) {
1290            syncSendPacket(command);
1291        } else {
1292            if (isClosed()) {
1293                throw new ConnectionClosedException();
1294            }
1295            try {
1296                this.transport.asyncRequest(command, new ResponseCallback() {
1297                    @Override
1298                    public void onCompletion(FutureResponse resp) {
1299                        Response response;
1300                        Throwable exception = null;
1301                        try {
1302                            response = resp.getResult();
1303                            if (response.isException()) {
1304                                ExceptionResponse er = (ExceptionResponse)response;
1305                                exception = er.getException();
1306                            }
1307                        } catch (Exception e) {
1308                            exception = e;
1309                        }
1310                        if(exception!=null) {
1311                            if ( exception instanceof JMSException) {
1312                                onComplete.onException((JMSException) exception);
1313                            } else {
1314                                if (isClosed()||closing.get()) {
1315                                    LOG.debug("Received an exception but connection is closing");
1316                                }
1317                                JMSException jmsEx = null;
1318                                try {
1319                                    jmsEx = JMSExceptionSupport.create(exception);
1320                                } catch(Throwable e) {
1321                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1322                                }
1323                                //dispose of transport for security exceptions
1324                                if (exception instanceof SecurityException){
1325                                    Transport t = transport;
1326                                    if (null != t){
1327                                        ServiceSupport.dispose(t);
1328                                    }
1329                                }
1330                                if (jmsEx !=null) {
1331                                    onComplete.onException(jmsEx);
1332                                }
1333                            }
1334                        } else {
1335                            onComplete.onSuccess();
1336                        }
1337                    }
1338                });
1339            } catch (IOException e) {
1340                throw JMSExceptionSupport.create(e);
1341            }
1342        }
1343    }
1344
1345    public Response syncSendPacket(Command command) throws JMSException {
1346        if (isClosed()) {
1347            throw new ConnectionClosedException();
1348        } else {
1349
1350            try {
1351                Response response = (Response)this.transport.request(command);
1352                if (response.isException()) {
1353                    ExceptionResponse er = (ExceptionResponse)response;
1354                    if (er.getException() instanceof JMSException) {
1355                        throw (JMSException)er.getException();
1356                    } else {
1357                        if (isClosed()||closing.get()) {
1358                            LOG.debug("Received an exception but connection is closing");
1359                        }
1360                        JMSException jmsEx = null;
1361                        try {
1362                            jmsEx = JMSExceptionSupport.create(er.getException());
1363                        } catch(Throwable e) {
1364                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1365                        }
1366                        //dispose of transport for security exceptions
1367                        if (er.getException() instanceof SecurityException){
1368                            Transport t = this.transport;
1369                            if (null != t){
1370                                ServiceSupport.dispose(t);
1371                            }
1372                        }
1373                        if (jmsEx !=null) {
1374                            throw jmsEx;
1375                        }
1376                    }
1377                }
1378                return response;
1379            } catch (IOException e) {
1380                throw JMSExceptionSupport.create(e);
1381            }
1382        }
1383    }
1384
1385    /**
1386     * Send a packet through a Connection - for internal use only
1387     *
1388     * @param command
1389     * @return
1390     * @throws JMSException
1391     */
1392    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1393        if (isClosed() || closing.get()) {
1394            throw new ConnectionClosedException();
1395        } else {
1396            return doSyncSendPacket(command, timeout);
1397        }
1398    }
1399
1400    private Response doSyncSendPacket(Command command, int timeout)
1401            throws JMSException {
1402        try {
1403            Response response = (Response) (timeout > 0
1404                    ? this.transport.request(command, timeout)
1405                    : this.transport.request(command));
1406            if (response != null && response.isException()) {
1407                ExceptionResponse er = (ExceptionResponse)response;
1408                if (er.getException() instanceof JMSException) {
1409                    throw (JMSException)er.getException();
1410                } else {
1411                    throw JMSExceptionSupport.create(er.getException());
1412                }
1413            }
1414            return response;
1415        } catch (IOException e) {
1416            throw JMSExceptionSupport.create(e);
1417        }
1418    }
1419
1420    /**
1421     * @return statistics for this Connection
1422     */
1423    public StatsImpl getStats() {
1424        return stats;
1425    }
1426
1427    /**
1428     * simply throws an exception if the Connection is already closed or the
1429     * Transport has failed
1430     *
1431     * @throws JMSException
1432     */
1433    protected synchronized void checkClosedOrFailed() throws JMSException {
1434        checkClosed();
1435        if (transportFailed.get()) {
1436            throw new ConnectionFailedException(firstFailureError);
1437        }
1438    }
1439
1440    /**
1441     * simply throws an exception if the Connection is already closed
1442     *
1443     * @throws JMSException
1444     */
1445    protected synchronized void checkClosed() throws JMSException {
1446        if (closed.get()) {
1447            throw new ConnectionClosedException();
1448        }
1449    }
1450
1451    /**
1452     * Send the ConnectionInfo to the Broker
1453     *
1454     * @throws JMSException
1455     */
1456    protected void ensureConnectionInfoSent() throws JMSException {
1457        synchronized(this.ensureConnectionInfoSentMutex) {
1458            // Can we skip sending the ConnectionInfo packet??
1459            if (isConnectionInfoSentToBroker || closed.get()) {
1460                return;
1461            }
1462            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1463            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1464                info.setClientId(clientIdGenerator.generateId());
1465            }
1466            syncSendPacket(info.copy());
1467
1468            this.isConnectionInfoSentToBroker = true;
1469            // Add a temp destination advisory consumer so that
1470            // We know what the valid temporary destinations are on the
1471            // broker without having to do an RPC to the broker.
1472
1473            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1474            if (watchTopicAdvisories) {
1475                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1476            }
1477        }
1478    }
1479
1480    public synchronized boolean isWatchTopicAdvisories() {
1481        return watchTopicAdvisories;
1482    }
1483
1484    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1485        this.watchTopicAdvisories = watchTopicAdvisories;
1486    }
1487
1488    /**
1489     * @return Returns the useAsyncSend.
1490     */
1491    public boolean isUseAsyncSend() {
1492        return useAsyncSend;
1493    }
1494
1495    /**
1496     * Forces the use of <a
1497     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1498     * adds a massive performance boost; but means that the send() method will
1499     * return immediately whether the message has been sent or not which could
1500     * lead to message loss.
1501     */
1502    public void setUseAsyncSend(boolean useAsyncSend) {
1503        this.useAsyncSend = useAsyncSend;
1504    }
1505
1506    /**
1507     * @return true if always sync send messages
1508     */
1509    public boolean isAlwaysSyncSend() {
1510        return this.alwaysSyncSend;
1511    }
1512
1513    /**
1514     * Set true if always require messages to be sync sent
1515     *
1516     * @param alwaysSyncSend
1517     */
1518    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1519        this.alwaysSyncSend = alwaysSyncSend;
1520    }
1521
1522    /**
1523     * @return the messagePrioritySupported
1524     */
1525    public boolean isMessagePrioritySupported() {
1526        return this.messagePrioritySupported;
1527    }
1528
1529    /**
1530     * @param messagePrioritySupported the messagePrioritySupported to set
1531     */
1532    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1533        this.messagePrioritySupported = messagePrioritySupported;
1534    }
1535
1536    /**
1537     * Cleans up this connection so that it's state is as if the connection was
1538     * just created. This allows the Resource Adapter to clean up a connection
1539     * so that it can be reused without having to close and recreate the
1540     * connection.
1541     */
1542    public void cleanup() throws JMSException {
1543
1544        if (advisoryConsumer != null && !isTransportFailed()) {
1545            advisoryConsumer.dispose();
1546            advisoryConsumer = null;
1547        }
1548
1549        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1550            ActiveMQSession s = i.next();
1551            s.dispose();
1552        }
1553        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1554            ActiveMQConnectionConsumer c = i.next();
1555            c.dispose();
1556        }
1557        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1558            ActiveMQInputStream c = i.next();
1559            c.dispose();
1560        }
1561        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1562            ActiveMQOutputStream c = i.next();
1563            c.dispose();
1564        }
1565
1566        if (isConnectionInfoSentToBroker) {
1567            if (!transportFailed.get() && !closing.get()) {
1568                syncSendPacket(info.createRemoveCommand());
1569            }
1570            isConnectionInfoSentToBroker = false;
1571        }
1572        if (userSpecifiedClientID) {
1573            info.setClientId(null);
1574            userSpecifiedClientID = false;
1575        }
1576        clientIDSet = false;
1577
1578        started.set(false);
1579    }
1580
1581    public void finalize() throws Throwable{
1582        Scheduler s = this.scheduler;
1583        if (s != null){
1584            s.stop();
1585        }
1586    }
1587
1588    /**
1589     * Changes the associated username/password that is associated with this
1590     * connection. If the connection has been used, you must called cleanup()
1591     * before calling this method.
1592     *
1593     * @throws IllegalStateException if the connection is in used.
1594     */
1595    public void changeUserInfo(String userName, String password) throws JMSException {
1596        if (isConnectionInfoSentToBroker) {
1597            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1598        }
1599        this.info.setUserName(userName);
1600        this.info.setPassword(password);
1601    }
1602
1603    /**
1604     * @return Returns the resourceManagerId.
1605     * @throws JMSException
1606     */
1607    public String getResourceManagerId() throws JMSException {
1608        waitForBrokerInfo();
1609        if (brokerInfo == null) {
1610            throw new JMSException("Connection failed before Broker info was received.");
1611        }
1612        return brokerInfo.getBrokerId().getValue();
1613    }
1614
1615    /**
1616     * Returns the broker name if one is available or null if one is not
1617     * available yet.
1618     */
1619    public String getBrokerName() {
1620        try {
1621            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1622            if (brokerInfo == null) {
1623                return null;
1624            }
1625            return brokerInfo.getBrokerName();
1626        } catch (InterruptedException e) {
1627            Thread.currentThread().interrupt();
1628            return null;
1629        }
1630    }
1631
1632    /**
1633     * Returns the broker information if it is available or null if it is not
1634     * available yet.
1635     */
1636    public BrokerInfo getBrokerInfo() {
1637        return brokerInfo;
1638    }
1639
1640    /**
1641     * @return Returns the RedeliveryPolicy.
1642     * @throws JMSException
1643     */
1644    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1645        return redeliveryPolicy;
1646    }
1647
1648    /**
1649     * Sets the redelivery policy to be used when messages are rolled back
1650     */
1651    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1652        this.redeliveryPolicy = redeliveryPolicy;
1653    }
1654
1655    public BlobTransferPolicy getBlobTransferPolicy() {
1656        if (blobTransferPolicy == null) {
1657            blobTransferPolicy = createBlobTransferPolicy();
1658        }
1659        return blobTransferPolicy;
1660    }
1661
1662    /**
1663     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1664     * OBjects) are transferred from producers to brokers to consumers
1665     */
1666    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1667        this.blobTransferPolicy = blobTransferPolicy;
1668    }
1669
1670    /**
1671     * @return Returns the alwaysSessionAsync.
1672     */
1673    public boolean isAlwaysSessionAsync() {
1674        return alwaysSessionAsync;
1675    }
1676
1677    /**
1678     * If this flag is set then a separate thread is not used for dispatching
1679     * messages for each Session in the Connection. However, a separate thread
1680     * is always used if there is more than one session, or the session isn't in
1681     * auto acknowledge or duplicates ok mode
1682     */
1683    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1684        this.alwaysSessionAsync = alwaysSessionAsync;
1685    }
1686
1687    /**
1688     * @return Returns the optimizeAcknowledge.
1689     */
1690    public boolean isOptimizeAcknowledge() {
1691        return optimizeAcknowledge;
1692    }
1693
1694    /**
1695     * Enables an optimised acknowledgement mode where messages are acknowledged
1696     * in batches rather than individually
1697     *
1698     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1699     */
1700    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1701        this.optimizeAcknowledge = optimizeAcknowledge;
1702    }
1703
1704    /**
1705     * The max time in milliseconds between optimized ack batches
1706     * @param optimizeAcknowledgeTimeOut
1707     */
1708    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1709        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1710    }
1711
1712    public long getOptimizeAcknowledgeTimeOut() {
1713        return optimizeAcknowledgeTimeOut;
1714    }
1715
1716    public long getWarnAboutUnstartedConnectionTimeout() {
1717        return warnAboutUnstartedConnectionTimeout;
1718    }
1719
1720    /**
1721     * Enables the timeout from a connection creation to when a warning is
1722     * generated if the connection is not properly started via {@link #start()}
1723     * and a message is received by a consumer. It is a very common gotcha to
1724     * forget to <a
1725     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1726     * the connection</a> so this option makes the default case to create a
1727     * warning if the user forgets. To disable the warning just set the value to <
1728     * 0 (say -1).
1729     */
1730    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1731        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1732    }
1733
1734    /**
1735     * @return the sendTimeout
1736     */
1737    public int getSendTimeout() {
1738        return sendTimeout;
1739    }
1740
1741    /**
1742     * @param sendTimeout the sendTimeout to set
1743     */
1744    public void setSendTimeout(int sendTimeout) {
1745        this.sendTimeout = sendTimeout;
1746    }
1747
1748    /**
1749     * @return the sendAcksAsync
1750     */
1751    public boolean isSendAcksAsync() {
1752        return sendAcksAsync;
1753    }
1754
1755    /**
1756     * @param sendAcksAsync the sendAcksAsync to set
1757     */
1758    public void setSendAcksAsync(boolean sendAcksAsync) {
1759        this.sendAcksAsync = sendAcksAsync;
1760    }
1761
1762
1763    /**
1764     * Returns the time this connection was created
1765     */
1766    public long getTimeCreated() {
1767        return timeCreated;
1768    }
1769
1770    private void waitForBrokerInfo() throws JMSException {
1771        try {
1772            brokerInfoReceived.await();
1773        } catch (InterruptedException e) {
1774            Thread.currentThread().interrupt();
1775            throw JMSExceptionSupport.create(e);
1776        }
1777    }
1778
1779    // Package protected so that it can be used in unit tests
1780    public Transport getTransport() {
1781        return transport;
1782    }
1783
1784    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1785        producers.put(producerId, producer);
1786    }
1787
1788    public void removeProducer(ProducerId producerId) {
1789        producers.remove(producerId);
1790    }
1791
1792    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1793        dispatchers.put(consumerId, dispatcher);
1794    }
1795
1796    public void removeDispatcher(ConsumerId consumerId) {
1797        dispatchers.remove(consumerId);
1798    }
1799
1800    /**
1801     * @param o - the command to consume
1802     */
1803    public void onCommand(final Object o) {
1804        final Command command = (Command)o;
1805        if (!closed.get() && command != null) {
1806            try {
1807                command.visit(new CommandVisitorAdapter() {
1808                    @Override
1809                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1810                        waitForTransportInterruptionProcessingToComplete();
1811                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1812                        if (dispatcher != null) {
1813                            // Copy in case a embedded broker is dispatching via
1814                            // vm://
1815                            // md.getMessage() == null to signal end of queue
1816                            // browse.
1817                            Message msg = md.getMessage();
1818                            if (msg != null) {
1819                                msg = msg.copy();
1820                                msg.setReadOnlyBody(true);
1821                                msg.setReadOnlyProperties(true);
1822                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1823                                msg.setConnection(ActiveMQConnection.this);
1824                                md.setMessage(msg);
1825                            }
1826                            dispatcher.dispatch(md);
1827                        }
1828                        return null;
1829                    }
1830
1831                    @Override
1832                    public Response processProducerAck(ProducerAck pa) throws Exception {
1833                        if (pa != null && pa.getProducerId() != null) {
1834                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1835                            if (producer != null) {
1836                                producer.onProducerAck(pa);
1837                            }
1838                        }
1839                        return null;
1840                    }
1841
1842                    @Override
1843                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1844                        brokerInfo = info;
1845                        brokerInfoReceived.countDown();
1846                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1847                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1848                        return null;
1849                    }
1850
1851                    @Override
1852                    public Response processConnectionError(final ConnectionError error) throws Exception {
1853                        executor.execute(new Runnable() {
1854                            public void run() {
1855                                onAsyncException(error.getException());
1856                            }
1857                        });
1858                        return null;
1859                    }
1860
1861                    @Override
1862                    public Response processControlCommand(ControlCommand command) throws Exception {
1863                        return null;
1864                    }
1865
1866                    @Override
1867                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1868                        onConnectionControl((ConnectionControl)command);
1869                        return null;
1870                    }
1871
1872                    @Override
1873                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1874                        onConsumerControl((ConsumerControl)command);
1875                        return null;
1876                    }
1877
1878                    @Override
1879                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1880                        onWireFormatInfo((WireFormatInfo)command);
1881                        return null;
1882                    }
1883                });
1884            } catch (Exception e) {
1885                onClientInternalException(e);
1886            }
1887
1888        }
1889        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1890            TransportListener listener = iter.next();
1891            listener.onCommand(command);
1892        }
1893    }
1894
1895    protected void onWireFormatInfo(WireFormatInfo info) {
1896        protocolVersion.set(info.getVersion());
1897    }
1898
1899    /**
1900     * Handles async client internal exceptions.
1901     * A client internal exception is usually one that has been thrown
1902     * by a container runtime component during asynchronous processing of a
1903     * message that does not affect the connection itself.
1904     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1905     * its <code>onException</code> method, if one has been registered with this connection.
1906     *
1907     * @param error the exception that the problem
1908     */
1909    public void onClientInternalException(final Throwable error) {
1910        if ( !closed.get() && !closing.get() ) {
1911            if ( this.clientInternalExceptionListener != null ) {
1912                executor.execute(new Runnable() {
1913                    public void run() {
1914                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1915                    }
1916                });
1917            } else {
1918                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1919                        + error, error);
1920            }
1921        }
1922    }
1923    /**
1924     * Used for handling async exceptions
1925     *
1926     * @param error
1927     */
1928    public void onAsyncException(Throwable error) {
1929        if (!closed.get() && !closing.get()) {
1930            if (this.exceptionListener != null) {
1931
1932                if (!(error instanceof JMSException)) {
1933                    error = JMSExceptionSupport.create(error);
1934                }
1935                final JMSException e = (JMSException)error;
1936
1937                executor.execute(new Runnable() {
1938                    public void run() {
1939                        ActiveMQConnection.this.exceptionListener.onException(e);
1940                    }
1941                });
1942
1943            } else {
1944                LOG.debug("Async exception with no exception listener: " + error, error);
1945            }
1946        }
1947    }
1948
1949    public void onException(final IOException error) {
1950        onAsyncException(error);
1951        if (!closing.get() && !closed.get()) {
1952            executor.execute(new Runnable() {
1953                public void run() {
1954                    transportFailed(error);
1955                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
1956                    brokerInfoReceived.countDown();
1957                    try {
1958                        cleanup();
1959                    } catch (JMSException e) {
1960                        LOG.warn("Exception during connection cleanup, " + e, e);
1961                    }
1962                    for (Iterator<TransportListener> iter = transportListeners
1963                            .iterator(); iter.hasNext();) {
1964                        TransportListener listener = iter.next();
1965                        listener.onException(error);
1966                    }
1967                }
1968            });
1969        }
1970    }
1971
1972    public void transportInterupted() {
1973        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1974        if (LOG.isDebugEnabled()) {
1975            LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1976        }
1977        signalInterruptionProcessingNeeded();
1978
1979        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1980            ActiveMQSession s = i.next();
1981            s.clearMessagesInProgress();
1982        }
1983
1984        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1985            connectionConsumer.clearMessagesInProgress();
1986        }
1987
1988        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1989            TransportListener listener = iter.next();
1990            listener.transportInterupted();
1991        }
1992    }
1993
1994    public void transportResumed() {
1995        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1996            TransportListener listener = iter.next();
1997            listener.transportResumed();
1998        }
1999    }
2000
2001    /**
2002     * Create the DestinationInfo object for the temporary destination.
2003     *
2004     * @param topic - if its true topic, else queue.
2005     * @return DestinationInfo
2006     * @throws JMSException
2007     */
2008    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2009
2010        // Check if Destination info is of temporary type.
2011        ActiveMQTempDestination dest;
2012        if (topic) {
2013            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2014        } else {
2015            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2016        }
2017
2018        DestinationInfo info = new DestinationInfo();
2019        info.setConnectionId(this.info.getConnectionId());
2020        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2021        info.setDestination(dest);
2022        syncSendPacket(info);
2023
2024        dest.setConnection(this);
2025        activeTempDestinations.put(dest, dest);
2026        return dest;
2027    }
2028
2029    /**
2030     * @param destination
2031     * @throws JMSException
2032     */
2033    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2034
2035        checkClosedOrFailed();
2036
2037        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2038            ActiveMQSession s = i.next();
2039            if (s.isInUse(destination)) {
2040                throw new JMSException("A consumer is consuming from the temporary destination");
2041            }
2042        }
2043
2044        activeTempDestinations.remove(destination);
2045
2046        DestinationInfo destInfo = new DestinationInfo();
2047        destInfo.setConnectionId(this.info.getConnectionId());
2048        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2049        destInfo.setDestination(destination);
2050        destInfo.setTimeout(0);
2051        syncSendPacket(destInfo);
2052    }
2053
2054    public boolean isDeleted(ActiveMQDestination dest) {
2055
2056        // If we are not watching the advisories.. then
2057        // we will assume that the temp destination does exist.
2058        if (advisoryConsumer == null) {
2059            return false;
2060        }
2061
2062        return !activeTempDestinations.contains(dest);
2063    }
2064
2065    public boolean isCopyMessageOnSend() {
2066        return copyMessageOnSend;
2067    }
2068
2069    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2070        return localTransactionIdGenerator;
2071    }
2072
2073    public boolean isUseCompression() {
2074        return useCompression;
2075    }
2076
2077    /**
2078     * Enables the use of compression of the message bodies
2079     */
2080    public void setUseCompression(boolean useCompression) {
2081        this.useCompression = useCompression;
2082    }
2083
2084    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2085
2086        checkClosedOrFailed();
2087        ensureConnectionInfoSent();
2088
2089        DestinationInfo info = new DestinationInfo();
2090        info.setConnectionId(this.info.getConnectionId());
2091        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2092        info.setDestination(destination);
2093        info.setTimeout(0);
2094        syncSendPacket(info);
2095
2096    }
2097
2098    public boolean isDispatchAsync() {
2099        return dispatchAsync;
2100    }
2101
2102    /**
2103     * Enables or disables the default setting of whether or not consumers have
2104     * their messages <a
2105     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2106     * synchronously or asynchronously by the broker</a>. For non-durable
2107     * topics for example we typically dispatch synchronously by default to
2108     * minimize context switches which boost performance. However sometimes its
2109     * better to go slower to ensure that a single blocked consumer socket does
2110     * not block delivery to other consumers.
2111     *
2112     * @param asyncDispatch If true then consumers created on this connection
2113     *                will default to having their messages dispatched
2114     *                asynchronously. The default value is true.
2115     */
2116    public void setDispatchAsync(boolean asyncDispatch) {
2117        this.dispatchAsync = asyncDispatch;
2118    }
2119
2120    public boolean isObjectMessageSerializationDefered() {
2121        return objectMessageSerializationDefered;
2122    }
2123
2124    /**
2125     * When an object is set on an ObjectMessage, the JMS spec requires the
2126     * object to be serialized by that set method. Enabling this flag causes the
2127     * object to not get serialized. The object may subsequently get serialized
2128     * if the message needs to be sent over a socket or stored to disk.
2129     */
2130    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2131        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2132    }
2133
2134    public InputStream createInputStream(Destination dest) throws JMSException {
2135        return createInputStream(dest, null);
2136    }
2137
2138    public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2139        return createInputStream(dest, messageSelector, false);
2140    }
2141
2142    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2143        return createInputStream(dest, messageSelector, noLocal,  -1);
2144    }
2145
2146
2147
2148    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2149        return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2150    }
2151
2152    public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2153        return createInputStream(dest, null, false);
2154    }
2155
2156    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2157        return createDurableInputStream(dest, name, messageSelector, false);
2158    }
2159
2160    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2161        return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2162    }
2163
2164    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2165        return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2166    }
2167
2168    private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2169        checkClosedOrFailed();
2170        ensureConnectionInfoSent();
2171        return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2172    }
2173
2174    /**
2175     * Creates a persistent output stream; individual messages will be written
2176     * to disk/database by the broker
2177     */
2178    public OutputStream createOutputStream(Destination dest) throws JMSException {
2179        return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2180    }
2181
2182    /**
2183     * Creates a non persistent output stream; messages will not be written to
2184     * disk
2185     */
2186    public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2187        return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2188    }
2189
2190    /**
2191     * Creates an output stream allowing full control over the delivery mode,
2192     * the priority and time to live of the messages and the properties added to
2193     * messages on the stream.
2194     *
2195     * @param streamProperties defines a map of key-value pairs where the keys
2196     *                are strings and the values are primitive values (numbers
2197     *                and strings) which are appended to the messages similarly
2198     *                to using the
2199     *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2200     *                method
2201     */
2202    public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2203        checkClosedOrFailed();
2204        ensureConnectionInfoSent();
2205        return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2206    }
2207
2208    /**
2209     * Unsubscribes a durable subscription that has been created by a client.
2210     * <P>
2211     * This method deletes the state being maintained on behalf of the
2212     * subscriber by its provider.
2213     * <P>
2214     * It is erroneous for a client to delete a durable subscription while there
2215     * is an active <CODE>MessageConsumer </CODE> or
2216     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2217     * message is part of a pending transaction or has not been acknowledged in
2218     * the session.
2219     *
2220     * @param name the name used to identify this subscription
2221     * @throws JMSException if the session fails to unsubscribe to the durable
2222     *                 subscription due to some internal error.
2223     * @throws InvalidDestinationException if an invalid subscription name is
2224     *                 specified.
2225     * @since 1.1
2226     */
2227    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2228        checkClosedOrFailed();
2229        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2230        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2231        rsi.setSubscriptionName(name);
2232        rsi.setClientId(getConnectionInfo().getClientId());
2233        syncSendPacket(rsi);
2234    }
2235
    /**
     * Internal send method, optimized:
     * <ul>
     * <li>It does not copy the message.</li>
     * <li>It can only handle ActiveMQ messages.</li>
     * <li>You can specify if the send is async or sync.</li>
     * <li>Does not allow you to send with a transaction.</li>
     * </ul>
     * The JMS headers of {@code msg} (destination, delivery mode, timestamp,
     * expiration, priority, redelivered flag, message id and producer id) are
     * overwritten in place before the message is handed to the transport.
     *
     * @param destination where the message is sent; stamped onto the message
     * @param msg the message to send; mutated by this method
     * @param messageId id assigned to the message; its producer id is also
     *                copied onto the message
     * @param deliveryMode JMS delivery mode stamped onto the message
     * @param priority JMS priority stamped onto the message
     * @param timeToLive used to compute the expiration, and only when it is
     *                positive and timestamps are not disabled
     * @param async {@code true} to use {@code asyncSendPacket}, {@code false}
     *                to use {@code syncSendPacket}
     * @throws JMSException if the connection check fails, the destination is a
     *                 deleted temporary destination, or sending fails
     */
    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
        checkClosedOrFailed();

        // Refuse to publish to a temporary destination that is already known to be deleted.
        if (destination.isTemporary() && isDeleted(destination)) {
            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
        }

        msg.setJMSDestination(destination);
        msg.setJMSDeliveryMode(deliveryMode);
        long expiration = 0L;

        // The timestamp and the expiration are only computed when timestamps
        // are enabled; with timestamps disabled the expiration stays 0 even
        // if a positive timeToLive was requested.
        if (!isDisableTimeStampsByDefault()) {
            long timeStamp = System.currentTimeMillis();
            msg.setJMSTimestamp(timeStamp);
            if (timeToLive > 0) {
                expiration = timeToLive + timeStamp;
            }
        }

        msg.setJMSExpiration(expiration);
        msg.setJMSPriority(priority);

        msg.setJMSRedelivered(false);
        msg.setMessageId(messageId);

        // NOTE(review): onSend() runs after all headers above are set; presumably
        // it finalizes the message for sending — confirm in ActiveMQMessage.
        msg.onSend();

        // The producer id is read back from the message id that was just assigned.
        msg.setProducerId(msg.getMessageId().getProducerId());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending message: " + msg);
        }

        if (async) {
            asyncSendPacket(msg);
        } else {
            syncSendPacket(msg);
        }

    }
2281
2282    public void addOutputStream(ActiveMQOutputStream stream) {
2283        outputStreams.add(stream);
2284    }
2285
2286    public void removeOutputStream(ActiveMQOutputStream stream) {
2287        outputStreams.remove(stream);
2288    }
2289
2290    public void addInputStream(ActiveMQInputStream stream) {
2291        inputStreams.add(stream);
2292    }
2293
2294    public void removeInputStream(ActiveMQInputStream stream) {
2295        inputStreams.remove(stream);
2296    }
2297
2298    protected void onConnectionControl(ConnectionControl command) {
2299        if (command.isFaultTolerant()) {
2300            this.optimizeAcknowledge = false;
2301            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2302                ActiveMQSession s = i.next();
2303                s.setOptimizeAcknowledge(false);
2304            }
2305        }
2306    }
2307
2308    protected void onConsumerControl(ConsumerControl command) {
2309        if (command.isClose()) {
2310            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2311                ActiveMQSession s = i.next();
2312                s.close(command.getConsumerId());
2313            }
2314        } else {
2315            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2316                ActiveMQSession s = i.next();
2317                s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2318            }
2319        }
2320    }
2321
2322    protected void transportFailed(IOException error) {
2323        transportFailed.set(true);
2324        if (firstFailureError == null) {
2325            firstFailureError = error;
2326        }
2327    }
2328
2329    /**
2330     * Should a JMS message be copied to a new JMS Message object as part of the
2331     * send() method in JMS. This is enabled by default to be compliant with the
2332     * JMS specification. You can disable it if you do not mutate JMS messages
2333     * after they are sent for a performance boost
2334     */
2335    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2336        this.copyMessageOnSend = copyMessageOnSend;
2337    }
2338
2339    @Override
2340    public String toString() {
2341        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2342    }
2343
2344    protected BlobTransferPolicy createBlobTransferPolicy() {
2345        return new BlobTransferPolicy();
2346    }
2347
2348    public int getProtocolVersion() {
2349        return protocolVersion.get();
2350    }
2351
2352    public int getProducerWindowSize() {
2353        return producerWindowSize;
2354    }
2355
2356    public void setProducerWindowSize(int producerWindowSize) {
2357        this.producerWindowSize = producerWindowSize;
2358    }
2359
2360    public void setAuditDepth(int auditDepth) {
2361        connectionAudit.setAuditDepth(auditDepth);
2362    }
2363
2364    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2365        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2366    }
2367
2368    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2369        connectionAudit.removeDispatcher(dispatcher);
2370    }
2371
2372    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2373        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2374    }
2375
2376    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2377        connectionAudit.rollbackDuplicate(dispatcher, message);
2378    }
2379
2380    public IOException getFirstFailureError() {
2381        return firstFailureError;
2382    }
2383
2384    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2385        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2386        if (cdl != null) {
2387            if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2388                LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2389                cdl.await(10, TimeUnit.SECONDS);
2390            }
2391            signalInterruptionProcessingComplete();
2392        }
2393    }
2394
2395    protected void transportInterruptionProcessingComplete() {
2396        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2397        if (cdl != null) {
2398            cdl.countDown();
2399            try {
2400                signalInterruptionProcessingComplete();
2401            } catch (InterruptedException ignored) {}
2402        }
2403    }
2404
2405    private void signalInterruptionProcessingComplete() throws InterruptedException {
2406        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2407        if (cdl.getCount()==0) {
2408            if (LOG.isDebugEnabled()) {
2409                LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2410            }
2411            this.transportInterruptionProcessingComplete = null;
2412
2413            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2414            if (failoverTransport != null) {
2415                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2416                if (LOG.isDebugEnabled()) {
2417                    LOG.debug("notified failover transport (" + failoverTransport
2418                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2419                }
2420            }
2421
2422        }
2423    }
2424
2425    private void signalInterruptionProcessingNeeded() {
2426        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2427        if (failoverTransport != null) {
2428            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2429            if (LOG.isDebugEnabled()) {
2430                LOG.debug("notified failover transport (" + failoverTransport
2431                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2432            }
2433        }
2434    }
2435
2436    /*
2437     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2438     * will wait to receive re dispatched messages.
2439     * default value is 0 so there is no wait by default.
2440     */
2441    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2442        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2443    }
2444
2445    public long getConsumerFailoverRedeliveryWaitPeriod() {
2446        return consumerFailoverRedeliveryWaitPeriod;
2447    }
2448
2449    protected Scheduler getScheduler() throws JMSException {
2450        Scheduler result = scheduler;
2451        if (result == null) {
2452            synchronized (this) {
2453                result = scheduler;
2454                if (result == null) {
2455                    checkClosed();
2456                    try {
2457                        result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2458                        scheduler.start();
2459                    } catch(Exception e) {
2460                        throw JMSExceptionSupport.create(e);
2461                    }
2462                }
2463            }
2464        }
2465        return result;
2466    }
2467
2468    protected ThreadPoolExecutor getExecutor() {
2469        return this.executor;
2470    }
2471
2472    /**
2473     * @return the checkForDuplicates
2474     */
2475    public boolean isCheckForDuplicates() {
2476        return this.checkForDuplicates;
2477    }
2478
2479    /**
2480     * @param checkForDuplicates the checkForDuplicates to set
2481     */
2482    public void setCheckForDuplicates(boolean checkForDuplicates) {
2483        this.checkForDuplicates = checkForDuplicates;
2484    }
2485
2486
2487    public boolean isTransactedIndividualAck() {
2488        return transactedIndividualAck;
2489    }
2490
2491    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2492        this.transactedIndividualAck = transactedIndividualAck;
2493    }
2494
2495    public boolean isNonBlockingRedelivery() {
2496        return nonBlockingRedelivery;
2497    }
2498
2499    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2500        this.nonBlockingRedelivery = nonBlockingRedelivery;
2501    }
2502
2503    /**
2504     * Removes any TempDestinations that this connection has cached, ignoring
2505     * any exceptions generated because the destination is in use as they should
2506     * not be removed.
2507     */
2508    public void cleanUpTempDestinations() {
2509
2510        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2511            return;
2512        }
2513
2514        Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2515            = this.activeTempDestinations.entrySet().iterator();
2516        while(entries.hasNext()) {
2517            ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2518            try {
2519                // Only delete this temp destination if it was created from this connection. The connection used
2520                // for the advisory consumer may also have a reference to this temp destination.
2521                ActiveMQTempDestination dest = entry.getValue();
2522                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2523                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2524                    this.deleteTempDestination(entry.getValue());
2525                }
2526            } catch (Exception ex) {
2527                // the temp dest is in use so it can not be deleted.
2528                // it is ok to leave it to connection tear down phase
2529            }
2530        }
2531    }
2532}