001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.Serializable;
022import java.net.URL;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029
030import javax.jms.BytesMessage;
031import javax.jms.Destination;
032import javax.jms.IllegalStateException;
033import javax.jms.InvalidDestinationException;
034import javax.jms.InvalidSelectorException;
035import javax.jms.JMSException;
036import javax.jms.MapMessage;
037import javax.jms.Message;
038import javax.jms.MessageConsumer;
039import javax.jms.MessageListener;
040import javax.jms.MessageProducer;
041import javax.jms.ObjectMessage;
042import javax.jms.Queue;
043import javax.jms.QueueBrowser;
044import javax.jms.QueueReceiver;
045import javax.jms.QueueSender;
046import javax.jms.QueueSession;
047import javax.jms.Session;
048import javax.jms.StreamMessage;
049import javax.jms.TemporaryQueue;
050import javax.jms.TemporaryTopic;
051import javax.jms.TextMessage;
052import javax.jms.Topic;
053import javax.jms.TopicPublisher;
054import javax.jms.TopicSession;
055import javax.jms.TopicSubscriber;
056import javax.jms.TransactionRolledBackException;
057
058import org.apache.activemq.blob.BlobDownloader;
059import org.apache.activemq.blob.BlobTransferPolicy;
060import org.apache.activemq.blob.BlobUploader;
061import org.apache.activemq.command.ActiveMQBlobMessage;
062import org.apache.activemq.command.ActiveMQBytesMessage;
063import org.apache.activemq.command.ActiveMQDestination;
064import org.apache.activemq.command.ActiveMQMapMessage;
065import org.apache.activemq.command.ActiveMQMessage;
066import org.apache.activemq.command.ActiveMQObjectMessage;
067import org.apache.activemq.command.ActiveMQQueue;
068import org.apache.activemq.command.ActiveMQStreamMessage;
069import org.apache.activemq.command.ActiveMQTempDestination;
070import org.apache.activemq.command.ActiveMQTempQueue;
071import org.apache.activemq.command.ActiveMQTempTopic;
072import org.apache.activemq.command.ActiveMQTextMessage;
073import org.apache.activemq.command.ActiveMQTopic;
074import org.apache.activemq.command.Command;
075import org.apache.activemq.command.ConsumerId;
076import org.apache.activemq.command.MessageAck;
077import org.apache.activemq.command.MessageDispatch;
078import org.apache.activemq.command.MessageId;
079import org.apache.activemq.command.ProducerId;
080import org.apache.activemq.command.RemoveInfo;
081import org.apache.activemq.command.Response;
082import org.apache.activemq.command.SessionId;
083import org.apache.activemq.command.SessionInfo;
084import org.apache.activemq.command.TransactionId;
085import org.apache.activemq.management.JMSSessionStatsImpl;
086import org.apache.activemq.management.StatsCapable;
087import org.apache.activemq.management.StatsImpl;
088import org.apache.activemq.thread.Scheduler;
089import org.apache.activemq.transaction.Synchronization;
090import org.apache.activemq.usage.MemoryUsage;
091import org.apache.activemq.util.Callback;
092import org.apache.activemq.util.JMSExceptionSupport;
093import org.apache.activemq.util.LongSequenceGenerator;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097/**
098 * <P>
099 * A <CODE>Session</CODE> object is a single-threaded context for producing
100 * and consuming messages. Although it may allocate provider resources outside
101 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102 * <P>
103 * A session serves several purposes:
104 * <UL>
105 * <LI>It is a factory for its message producers and consumers.
106 * <LI>It supplies provider-optimized message factories.
107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108 * <CODE>TemporaryQueues</CODE>.
109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110 * objects for those clients that need to dynamically manipulate
111 * provider-specific destination names.
112 * <LI>It supports a single series of transactions that combine work spanning
113 * its producers and consumers into atomic units.
114 * <LI>It defines a serial order for the messages it consumes and the messages
115 * it produces.
116 * <LI>It retains messages it consumes until they have been acknowledged.
117 * <LI>It serializes execution of message listeners registered with its message
118 * consumers.
119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120 * </UL>
121 * <P>
122 * A session can create and service multiple message producers and consumers.
123 * <P>
124 * One typical use is to have a thread block on a synchronous
125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
127 * <P>
128 * If a client desires to have one thread produce messages while others consume
129 * them, the client should use a separate session for its producing thread.
130 * <P>
131 * Once a connection has been started, any session with one or more registered
132 * message listeners is dedicated to the thread of control that delivers
133 * messages to it. It is erroneous for client code to use this session or any of
134 * its constituent objects from another thread of control. The only exception to
135 * this rule is the use of the session or connection <CODE>close</CODE>
136 * method.
137 * <P>
138 * It should be easy for most clients to partition their work naturally into
139 * sessions. This model allows clients to start simply and incrementally add
140 * message processing complexity as their need for concurrency grows.
141 * <P>
142 * The <CODE>close</CODE> method is the only session method that can be called
143 * while some other session method is being executed in another thread.
144 * <P>
145 * A session may be specified as transacted. Each transacted session supports a
146 * single series of transactions. Each transaction groups a set of message sends
147 * and a set of message receives into an atomic unit of work. In effect,
148 * transactions organize a session's input message stream and output message
149 * stream into series of atomic units. When a transaction commits, its atomic
150 * unit of input is acknowledged and its associated atomic unit of output is
151 * sent. If a transaction rollback is done, the transaction's sent messages are
152 * destroyed and the session's input is automatically recovered.
153 * <P>
154 * The content of a transaction's input and output units is simply those
155 * messages that have been produced and consumed within the session's current
156 * transaction.
157 * <P>
158 * A transaction is completed using either its session's <CODE>commit</CODE>
159 * method or its session's <CODE>rollback </CODE> method. The completion of a
160 * session's current transaction automatically begins the next. The result is
161 * that a transacted session always has a current transaction within which its
162 * work is done.
163 * <P>
164 * The Java Transaction Service (JTS) or some other transaction monitor may be
165 * used to combine a session's transaction with transactions on other resources
166 * (databases, other JMS sessions, etc.). Since Java distributed transactions
167 * are controlled via the Java Transaction API (JTA), use of the session's
168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169 * prohibited.
170 * <P>
171 * The JMS API does not require support for JTA; however, it does define how a
172 * provider supplies this support.
173 * <P>
174 * Although it is also possible for a JMS client to handle distributed
175 * transactions directly, it is unlikely that many JMS clients will do this.
176 * Support for JTA in the JMS API is targeted at systems vendors who will be
177 * integrating the JMS API into their application server products.
178 *
179 *
180 * @see javax.jms.Session
181 * @see javax.jms.QueueSession
182 * @see javax.jms.TopicSession
183 * @see javax.jms.XASession
184 */
185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186
187    /**
188     * Only acknowledge an individual message - using message.acknowledge()
189     * as opposed to CLIENT_ACKNOWLEDGE which
190     * acknowledges all messages consumed by a session at when acknowledge()
191     * is called
192     */
193    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
194    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195
196    public static interface DeliveryListener {
197        void beforeDelivery(ActiveMQSession session, Message msg);
198
199        void afterDelivery(ActiveMQSession session, Message msg);
200    }
201
202    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
203    private final ThreadPoolExecutor connectionExecutor;
204
205    protected int acknowledgementMode;
206    protected final ActiveMQConnection connection;
207    protected final SessionInfo info;
208    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
209    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
210    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
211    protected final ActiveMQSessionExecutor executor;
212    protected final AtomicBoolean started = new AtomicBoolean(false);
213
214    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
215    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
216
217    protected boolean closed;
218    private volatile boolean synchronizationRegistered;
219    protected boolean asyncDispatch;
220    protected boolean sessionAsyncDispatch;
221    protected final boolean debug;
222    protected Object sendMutex = new Object();
223
224    private MessageListener messageListener;
225    private final JMSSessionStatsImpl stats;
226    private TransactionContext transactionContext;
227    private DeliveryListener deliveryListener;
228    private MessageTransformer transformer;
229    private BlobTransferPolicy blobTransferPolicy;
230    private long lastDeliveredSequenceId;
231
232    /**
233     * Construct the Session
234     *
235     * @param connection
236     * @param sessionId
237     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
238     *                Session.SESSION_TRANSACTED
239     * @param asyncDispatch
240     * @param sessionAsyncDispatch
241     * @throws JMSException on internal error
242     */
243    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
244        this.debug = LOG.isDebugEnabled();
245        this.connection = connection;
246        this.acknowledgementMode = acknowledgeMode;
247        this.asyncDispatch = asyncDispatch;
248        this.sessionAsyncDispatch = sessionAsyncDispatch;
249        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
250        setTransactionContext(new TransactionContext(connection));
251        stats = new JMSSessionStatsImpl(producers, consumers);
252        this.connection.asyncSendPacket(info);
253        setTransformer(connection.getTransformer());
254        setBlobTransferPolicy(connection.getBlobTransferPolicy());
255        this.connectionExecutor=connection.getExecutor();
256        this.executor = new ActiveMQSessionExecutor(this);
257        connection.addSession(this);
258        if (connection.isStarted()) {
259            start();
260        }
261
262    }
263
264    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
265        this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
266    }
267
268    /**
269     * Sets the transaction context of the session.
270     *
271     * @param transactionContext - provides the means to control a JMS
272     *                transaction.
273     */
274    public void setTransactionContext(TransactionContext transactionContext) {
275        this.transactionContext = transactionContext;
276    }
277
278    /**
279     * Returns the transaction context of the session.
280     *
281     * @return transactionContext - session's transaction context.
282     */
283    public TransactionContext getTransactionContext() {
284        return transactionContext;
285    }
286
287    /*
288     * (non-Javadoc)
289     *
290     * @see org.apache.activemq.management.StatsCapable#getStats()
291     */
292    public StatsImpl getStats() {
293        return stats;
294    }
295
296    /**
297     * Returns the session's statistics.
298     *
299     * @return stats - session's statistics.
300     */
301    public JMSSessionStatsImpl getSessionStats() {
302        return stats;
303    }
304
305    /**
306     * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
307     * object is used to send a message containing a stream of uninterpreted
308     * bytes.
309     *
310     * @return the an ActiveMQBytesMessage
311     * @throws JMSException if the JMS provider fails to create this message due
312     *                 to some internal error.
313     */
314    public BytesMessage createBytesMessage() throws JMSException {
315        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
316        configureMessage(message);
317        return message;
318    }
319
320    /**
321     * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
322     * object is used to send a self-defining set of name-value pairs, where
323     * names are <CODE>String</CODE> objects and values are primitive values
324     * in the Java programming language.
325     *
326     * @return an ActiveMQMapMessage
327     * @throws JMSException if the JMS provider fails to create this message due
328     *                 to some internal error.
329     */
330    public MapMessage createMapMessage() throws JMSException {
331        ActiveMQMapMessage message = new ActiveMQMapMessage();
332        configureMessage(message);
333        return message;
334    }
335
336    /**
337     * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
338     * interface is the root interface of all JMS messages. A
339     * <CODE>Message</CODE> object holds all the standard message header
340     * information. It can be sent when a message containing only header
341     * information is sufficient.
342     *
343     * @return an ActiveMQMessage
344     * @throws JMSException if the JMS provider fails to create this message due
345     *                 to some internal error.
346     */
347    public Message createMessage() throws JMSException {
348        ActiveMQMessage message = new ActiveMQMessage();
349        configureMessage(message);
350        return message;
351    }
352
353    /**
354     * Creates an <CODE>ObjectMessage</CODE> object. An
355     * <CODE>ObjectMessage</CODE> object is used to send a message that
356     * contains a serializable Java object.
357     *
358     * @return an ActiveMQObjectMessage
359     * @throws JMSException if the JMS provider fails to create this message due
360     *                 to some internal error.
361     */
362    public ObjectMessage createObjectMessage() throws JMSException {
363        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
364        configureMessage(message);
365        return message;
366    }
367
368    /**
369     * Creates an initialized <CODE>ObjectMessage</CODE> object. An
370     * <CODE>ObjectMessage</CODE> object is used to send a message that
371     * contains a serializable Java object.
372     *
373     * @param object the object to use to initialize this message
374     * @return an ActiveMQObjectMessage
375     * @throws JMSException if the JMS provider fails to create this message due
376     *                 to some internal error.
377     */
378    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
379        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
380        configureMessage(message);
381        message.setObject(object);
382        return message;
383    }
384
385    /**
386     * Creates a <CODE>StreamMessage</CODE> object. A
387     * <CODE>StreamMessage</CODE> object is used to send a self-defining
388     * stream of primitive values in the Java programming language.
389     *
390     * @return an ActiveMQStreamMessage
391     * @throws JMSException if the JMS provider fails to create this message due
392     *                 to some internal error.
393     */
394    public StreamMessage createStreamMessage() throws JMSException {
395        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
396        configureMessage(message);
397        return message;
398    }
399
400    /**
401     * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
402     * object is used to send a message containing a <CODE>String</CODE>
403     * object.
404     *
405     * @return an ActiveMQTextMessage
406     * @throws JMSException if the JMS provider fails to create this message due
407     *                 to some internal error.
408     */
409    public TextMessage createTextMessage() throws JMSException {
410        ActiveMQTextMessage message = new ActiveMQTextMessage();
411        configureMessage(message);
412        return message;
413    }
414
415    /**
416     * Creates an initialized <CODE>TextMessage</CODE> object. A
417     * <CODE>TextMessage</CODE> object is used to send a message containing a
418     * <CODE>String</CODE>.
419     *
420     * @param text the string used to initialize this message
421     * @return an ActiveMQTextMessage
422     * @throws JMSException if the JMS provider fails to create this message due
423     *                 to some internal error.
424     */
425    public TextMessage createTextMessage(String text) throws JMSException {
426        ActiveMQTextMessage message = new ActiveMQTextMessage();
427        message.setText(text);
428        configureMessage(message);
429        return message;
430    }
431
432    /**
433     * Creates an initialized <CODE>BlobMessage</CODE> object. A
434     * <CODE>BlobMessage</CODE> object is used to send a message containing a
435     * <CODE>URL</CODE> which points to some network addressible BLOB.
436     *
437     * @param url the network addressable URL used to pass directly to the
438     *                consumer
439     * @return a BlobMessage
440     * @throws JMSException if the JMS provider fails to create this message due
441     *                 to some internal error.
442     */
443    public BlobMessage createBlobMessage(URL url) throws JMSException {
444        return createBlobMessage(url, false);
445    }
446
447    /**
448     * Creates an initialized <CODE>BlobMessage</CODE> object. A
449     * <CODE>BlobMessage</CODE> object is used to send a message containing a
450     * <CODE>URL</CODE> which points to some network addressible BLOB.
451     *
452     * @param url the network addressable URL used to pass directly to the
453     *                consumer
454     * @param deletedByBroker indicates whether or not the resource is deleted
455     *                by the broker when the message is acknowledged
456     * @return a BlobMessage
457     * @throws JMSException if the JMS provider fails to create this message due
458     *                 to some internal error.
459     */
460    public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
461        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
462        configureMessage(message);
463        message.setURL(url);
464        message.setDeletedByBroker(deletedByBroker);
465        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
466        return message;
467    }
468
469    /**
470     * Creates an initialized <CODE>BlobMessage</CODE> object. A
471     * <CODE>BlobMessage</CODE> object is used to send a message containing
472     * the <CODE>File</CODE> content. Before the message is sent the file
473     * conent will be uploaded to the broker or some other remote repository
474     * depending on the {@link #getBlobTransferPolicy()}.
475     *
476     * @param file the file to be uploaded to some remote repo (or the broker)
477     *                depending on the strategy
478     * @return a BlobMessage
479     * @throws JMSException if the JMS provider fails to create this message due
480     *                 to some internal error.
481     */
482    public BlobMessage createBlobMessage(File file) throws JMSException {
483        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
484        configureMessage(message);
485        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
486        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
487        message.setDeletedByBroker(true);
488        message.setName(file.getName());
489        return message;
490    }
491
492    /**
493     * Creates an initialized <CODE>BlobMessage</CODE> object. A
494     * <CODE>BlobMessage</CODE> object is used to send a message containing
495     * the <CODE>File</CODE> content. Before the message is sent the file
496     * conent will be uploaded to the broker or some other remote repository
497     * depending on the {@link #getBlobTransferPolicy()}.
498     *
499     * @param in the stream to be uploaded to some remote repo (or the broker)
500     *                depending on the strategy
501     * @return a BlobMessage
502     * @throws JMSException if the JMS provider fails to create this message due
503     *                 to some internal error.
504     */
505    public BlobMessage createBlobMessage(InputStream in) throws JMSException {
506        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
507        configureMessage(message);
508        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
509        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
510        message.setDeletedByBroker(true);
511        return message;
512    }
513
514    /**
515     * Indicates whether the session is in transacted mode.
516     *
517     * @return true if the session is in transacted mode
518     * @throws JMSException if there is some internal error.
519     */
520    public boolean getTransacted() throws JMSException {
521        checkClosed();
522        return isTransacted();
523    }
524
525    /**
526     * Returns the acknowledgement mode of the session. The acknowledgement mode
527     * is set at the time that the session is created. If the session is
528     * transacted, the acknowledgement mode is ignored.
529     *
530     * @return If the session is not transacted, returns the current
531     *         acknowledgement mode for the session. If the session is
532     *         transacted, returns SESSION_TRANSACTED.
533     * @throws JMSException
534     * @see javax.jms.Connection#createSession(boolean,int)
535     * @since 1.1 exception JMSException if there is some internal error.
536     */
537    public int getAcknowledgeMode() throws JMSException {
538        checkClosed();
539        return this.acknowledgementMode;
540    }
541
542    /**
543     * Commits all messages done in this transaction and releases any locks
544     * currently held.
545     *
546     * @throws JMSException if the JMS provider fails to commit the transaction
547     *                 due to some internal error.
548     * @throws TransactionRolledBackException if the transaction is rolled back
549     *                 due to some internal error during commit.
550     * @throws javax.jms.IllegalStateException if the method is not called by a
551     *                 transacted session.
552     */
553    public void commit() throws JMSException {
554        checkClosed();
555        if (!getTransacted()) {
556            throw new javax.jms.IllegalStateException("Not a transacted session");
557        }
558        if (LOG.isDebugEnabled()) {
559            LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
560        }
561        transactionContext.commit();
562    }
563
564    /**
565     * Rolls back any messages done in this transaction and releases any locks
566     * currently held.
567     *
568     * @throws JMSException if the JMS provider fails to roll back the
569     *                 transaction due to some internal error.
570     * @throws javax.jms.IllegalStateException if the method is not called by a
571     *                 transacted session.
572     */
573    public void rollback() throws JMSException {
574        checkClosed();
575        if (!getTransacted()) {
576            throw new javax.jms.IllegalStateException("Not a transacted session");
577        }
578        if (LOG.isDebugEnabled()) {
579            LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
580        }
581        transactionContext.rollback();
582    }
583
584    /**
585     * Closes the session.
586     * <P>
587     * Since a provider may allocate some resources on behalf of a session
588     * outside the JVM, clients should close the resources when they are not
589     * needed. Relying on garbage collection to eventually reclaim these
590     * resources may not be timely enough.
591     * <P>
592     * There is no need to close the producers and consumers of a closed
593     * session.
594     * <P>
595     * This call will block until a <CODE>receive</CODE> call or message
596     * listener in progress has completed. A blocked message consumer
597     * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
598     * is closed.
599     * <P>
600     * Closing a transacted session must roll back the transaction in progress.
601     * <P>
602     * This method is the only <CODE>Session</CODE> method that can be called
603     * concurrently.
604     * <P>
605     * Invoking any other <CODE>Session</CODE> method on a closed session must
606     * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
607     * closed session must <I>not </I> throw an exception.
608     *
609     * @throws JMSException if the JMS provider fails to close the session due
610     *                 to some internal error.
611     */
612    public void close() throws JMSException {
613        if (!closed) {
614            if (getTransactionContext().isInXATransaction()) {
615                if (!synchronizationRegistered) {
616                    synchronizationRegistered = true;
617                    getTransactionContext().addSynchronization(new Synchronization() {
618
619                                        @Override
620                                        public void afterCommit() throws Exception {
621                                            doClose();
622                                            synchronizationRegistered = false;
623                                        }
624
625                                        @Override
626                                        public void afterRollback() throws Exception {
627                                            doClose();
628                                            synchronizationRegistered = false;
629                                        }
630                                    });
631                }
632
633            } else {
634                doClose();
635            }
636        }
637    }
638
639    private void doClose() throws JMSException {
640        boolean interrupted = Thread.interrupted();
641        dispose();
642        RemoveInfo removeCommand = info.createRemoveCommand();
643        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
644        connection.asyncSendPacket(removeCommand);
645        if (interrupted) {
646            Thread.currentThread().interrupt();
647        }
648    }
649
650    void clearMessagesInProgress() {
651        executor.clearMessagesInProgress();
652        // we are called from inside the transport reconnection logic
653        // which involves us clearing all the connections' consumers
654        // dispatch and delivered lists. So rather than trying to
655        // grab a mutex (which could be already owned by the message
656        // listener calling the send or an ack) we allow it to complete in
657        // a separate thread via the scheduler and notify us via
658        // connection.transportInterruptionProcessingComplete()
659        //
660        for (final ActiveMQMessageConsumer consumer : consumers) {
661            consumer.inProgressClearRequired();
662            try {
663                connection.getScheduler().executeAfterDelay(new Runnable() {
664                    public void run() {
665                        consumer.clearMessagesInProgress();
666                    }}, 0l);
667            } catch (JMSException e) {
668                connection.onClientInternalException(e);
669            }
670        }
671    }
672
673    void deliverAcks() {
674        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
675            ActiveMQMessageConsumer consumer = iter.next();
676            consumer.deliverAcks();
677        }
678    }
679
680    public synchronized void dispose() throws JMSException {
681        if (!closed) {
682
683            try {
684                executor.stop();
685
686                for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
687                    ActiveMQMessageConsumer consumer = iter.next();
688                    consumer.setFailureError(connection.getFirstFailureError());
689                    consumer.dispose();
690                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
691                }
692                consumers.clear();
693
694                for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
695                    ActiveMQMessageProducer producer = iter.next();
696                    producer.dispose();
697                }
698                producers.clear();
699
700                try {
701                    if (getTransactionContext().isInLocalTransaction()) {
702                        rollback();
703                    }
704                } catch (JMSException e) {
705                }
706
707            } finally {
708                connection.removeSession(this);
709                this.transactionContext = null;
710                closed = true;
711            }
712        }
713    }
714
715    /**
716     * Checks that the session is not closed then configures the message
717     */
718    protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
719        checkClosed();
720        message.setConnection(connection);
721    }
722
723    /**
724     * Check if the session is closed. It is used for ensuring that the session
725     * is open before performing various operations.
726     *
727     * @throws IllegalStateException if the Session is closed
728     */
729    protected void checkClosed() throws IllegalStateException {
730        if (closed) {
731            throw new IllegalStateException("The Session is closed");
732        }
733    }
734
735    /**
736     * Checks if the session is closed.
737     *
738     * @return true if the session is closed, false otherwise.
739     */
740    public boolean isClosed() {
741        return closed;
742    }
743
744    /**
745     * Stops message delivery in this session, and restarts message delivery
746     * with the oldest unacknowledged message.
747     * <P>
748     * All consumers deliver messages in a serial order. Acknowledging a
749     * received message automatically acknowledges all messages that have been
750     * delivered to the client.
751     * <P>
752     * Restarting a session causes it to take the following actions:
753     * <UL>
754     * <LI>Stop message delivery
755     * <LI>Mark all messages that might have been delivered but not
756     * acknowledged as "redelivered"
757     * <LI>Restart the delivery sequence including all unacknowledged messages
758     * that had been previously delivered. Redelivered messages do not have to
759     * be delivered in exactly their original delivery order.
760     * </UL>
761     *
762     * @throws JMSException if the JMS provider fails to stop and restart
763     *                 message delivery due to some internal error.
764     * @throws IllegalStateException if the method is called by a transacted
765     *                 session.
766     */
767    public void recover() throws JMSException {
768
769        checkClosed();
770        if (getTransacted()) {
771            throw new IllegalStateException("This session is transacted");
772        }
773
774        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
775            ActiveMQMessageConsumer c = iter.next();
776            c.rollback();
777        }
778
779    }
780
781    /**
782     * Returns the session's distinguished message listener (optional).
783     *
784     * @return the message listener associated with this session
785     * @throws JMSException if the JMS provider fails to get the message
786     *                 listener due to an internal error.
787     * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
788     * @see javax.jms.ServerSessionPool
789     * @see javax.jms.ServerSession
790     */
791    public MessageListener getMessageListener() throws JMSException {
792        checkClosed();
793        return this.messageListener;
794    }
795
796    /**
797     * Sets the session's distinguished message listener (optional).
798     * <P>
799     * When the distinguished message listener is set, no other form of message
800     * receipt in the session can be used; however, all forms of sending
801     * messages are still supported.
802     * <P>
803     * This is an expert facility not used by regular JMS clients.
804     *
805     * @param listener the message listener to associate with this session
806     * @throws JMSException if the JMS provider fails to set the message
807     *                 listener due to an internal error.
808     * @see javax.jms.Session#getMessageListener()
809     * @see javax.jms.ServerSessionPool
810     * @see javax.jms.ServerSession
811     */
812    public void setMessageListener(MessageListener listener) throws JMSException {
813        checkClosed();
814        this.messageListener = listener;
815
816        if (listener != null) {
817            executor.setDispatchedBySessionPool(true);
818        }
819    }
820
821    /**
822     * Optional operation, intended to be used only by Application Servers, not
823     * by ordinary JMS clients.
824     *
825     * @see javax.jms.ServerSession
826     */
827    public void run() {
828        MessageDispatch messageDispatch;
829        while ((messageDispatch = executor.dequeueNoWait()) != null) {
830            final MessageDispatch md = messageDispatch;
831            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
832            if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
833                // TODO: Ack it without delivery to client
834                continue;
835            }
836
837            if (isClientAcknowledge()||isIndividualAcknowledge()) {
838                message.setAcknowledgeCallback(new Callback() {
839                    public void execute() throws Exception {
840                    }
841                });
842            }
843
844            if (deliveryListener != null) {
845                deliveryListener.beforeDelivery(this, message);
846            }
847
848            md.setDeliverySequenceId(getNextDeliveryId());
849
850            try {
851                messageListener.onMessage(message);
852            } catch (RuntimeException e) {
853                LOG.error("error dispatching message: ", e);
854                // A problem while invoking the MessageListener does not
855                // in general indicate a problem with the connection to the broker, i.e.
856                // it will usually be sufficient to let the afterDelivery() method either
857                // commit or roll back in order to deal with the exception.
858                // However, we notify any registered client internal exception listener
859                // of the problem.
860                connection.onClientInternalException(e);
861            }
862
863            try {
864                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
865                ack.setFirstMessageId(md.getMessage().getMessageId());
866                doStartTransaction();
867                ack.setTransactionId(getTransactionContext().getTransactionId());
868                if (ack.getTransactionId() != null) {
869                    getTransactionContext().addSynchronization(new Synchronization() {
870
871                        @Override
872                        public void afterRollback() throws Exception {
873                            md.getMessage().onMessageRolledBack();
874                            // ensure we don't filter this as a duplicate
875                            connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
876                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
877                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
878                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
879                                && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
880                                // We need to NACK the messages so that they get
881                                // sent to the
882                                // DLQ.
883                                // Acknowledge the last message.
884                                MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
885                                ack.setFirstMessageId(md.getMessage().getMessageId());
886                                asyncSendPacket(ack);
887                            } else {
888
889                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
890                                ack.setFirstMessageId(md.getMessage().getMessageId());
891                                asyncSendPacket(ack);
892
893                                // Figure out how long we should wait to resend
894                                // this message.
895                                long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
896                                for (int i = 0; i < redeliveryCounter; i++) {
897                                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
898                                }
899                                connection.getScheduler().executeAfterDelay(new Runnable() {
900
901                                    public void run() {
902                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
903                                    }
904                                }, redeliveryDelay);
905                            }
906                        }
907                    });
908                }
909                asyncSendPacket(ack);
910            } catch (Throwable e) {
911                connection.onClientInternalException(e);
912            }
913
914            if (deliveryListener != null) {
915                deliveryListener.afterDelivery(this, message);
916            }
917        }
918    }
919
920    /**
921     * Creates a <CODE>MessageProducer</CODE> to send messages to the
922     * specified destination.
923     * <P>
924     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
925     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
926     * inherit from <CODE>Destination</CODE>, they can be used in the
927     * destination parameter to create a <CODE>MessageProducer</CODE> object.
928     *
929     * @param destination the <CODE>Destination</CODE> to send to, or null if
930     *                this is a producer which does not have a specified
931     *                destination.
932     * @return the MessageProducer
933     * @throws JMSException if the session fails to create a MessageProducer due
934     *                 to some internal error.
935     * @throws InvalidDestinationException if an invalid destination is
936     *                 specified.
937     * @since 1.1
938     */
939    public MessageProducer createProducer(Destination destination) throws JMSException {
940        checkClosed();
941        if (destination instanceof CustomDestination) {
942            CustomDestination customDestination = (CustomDestination)destination;
943            return customDestination.createProducer(this);
944        }
945        int timeSendOut = connection.getSendTimeout();
946        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
947    }
948
949    /**
950     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
951     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
952     * <CODE>Destination</CODE>, they can be used in the destination
953     * parameter to create a <CODE>MessageConsumer</CODE>.
954     *
955     * @param destination the <CODE>Destination</CODE> to access.
956     * @return the MessageConsumer
957     * @throws JMSException if the session fails to create a consumer due to
958     *                 some internal error.
959     * @throws InvalidDestinationException if an invalid destination is
960     *                 specified.
961     * @since 1.1
962     */
963    public MessageConsumer createConsumer(Destination destination) throws JMSException {
964        return createConsumer(destination, (String) null);
965    }
966
967    /**
968     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
969     * using a message selector. Since <CODE> Queue</CODE> and
970     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
971     * can be used in the destination parameter to create a
972     * <CODE>MessageConsumer</CODE>.
973     * <P>
974     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
975     * that have been sent to a destination.
976     *
977     * @param destination the <CODE>Destination</CODE> to access
978     * @param messageSelector only messages with properties matching the message
979     *                selector expression are delivered. A value of null or an
980     *                empty string indicates that there is no message selector
981     *                for the message consumer.
982     * @return the MessageConsumer
983     * @throws JMSException if the session fails to create a MessageConsumer due
984     *                 to some internal error.
985     * @throws InvalidDestinationException if an invalid destination is
986     *                 specified.
987     * @throws InvalidSelectorException if the message selector is invalid.
988     * @since 1.1
989     */
990    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
991        return createConsumer(destination, messageSelector, false);
992    }
993
994    /**
995     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
996     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
997     * <CODE>Destination</CODE>, they can be used in the destination
998     * parameter to create a <CODE>MessageConsumer</CODE>.
999     *
1000     * @param destination the <CODE>Destination</CODE> to access.
1001     * @param messageListener the listener to use for async consumption of messages
1002     * @return the MessageConsumer
1003     * @throws JMSException if the session fails to create a consumer due to
1004     *                 some internal error.
1005     * @throws InvalidDestinationException if an invalid destination is
1006     *                 specified.
1007     * @since 1.1
1008     */
1009    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1010        return createConsumer(destination, null, messageListener);
1011    }
1012
1013    /**
1014     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1015     * using a message selector. Since <CODE> Queue</CODE> and
1016     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1017     * can be used in the destination parameter to create a
1018     * <CODE>MessageConsumer</CODE>.
1019     * <P>
1020     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1021     * that have been sent to a destination.
1022     *
1023     * @param destination the <CODE>Destination</CODE> to access
1024     * @param messageSelector only messages with properties matching the message
1025     *                selector expression are delivered. A value of null or an
1026     *                empty string indicates that there is no message selector
1027     *                for the message consumer.
1028     * @param messageListener the listener to use for async consumption of messages
1029     * @return the MessageConsumer
1030     * @throws JMSException if the session fails to create a MessageConsumer due
1031     *                 to some internal error.
1032     * @throws InvalidDestinationException if an invalid destination is
1033     *                 specified.
1034     * @throws InvalidSelectorException if the message selector is invalid.
1035     * @since 1.1
1036     */
1037    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1038        return createConsumer(destination, messageSelector, false, messageListener);
1039    }
1040
1041    /**
1042     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1043     * using a message selector. This method can specify whether messages
1044     * published by its own connection should be delivered to it, if the
1045     * destination is a topic.
1046     * <P>
1047     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1048     * <CODE>Destination</CODE>, they can be used in the destination
1049     * parameter to create a <CODE>MessageConsumer</CODE>.
1050     * <P>
1051     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1052     * that have been published to a destination.
1053     * <P>
1054     * In some cases, a connection may both publish and subscribe to a topic.
1055     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1056     * inhibit the delivery of messages published by its own connection. The
1057     * default value for this attribute is False. The <CODE>noLocal</CODE>
1058     * value must be supported by destinations that are topics.
1059     *
1060     * @param destination the <CODE>Destination</CODE> to access
1061     * @param messageSelector only messages with properties matching the message
1062     *                selector expression are delivered. A value of null or an
1063     *                empty string indicates that there is no message selector
1064     *                for the message consumer.
1065     * @param noLocal - if true, and the destination is a topic, inhibits the
1066     *                delivery of messages published by its own connection. The
1067     *                behavior for <CODE>NoLocal</CODE> is not specified if
1068     *                the destination is a queue.
1069     * @return the MessageConsumer
1070     * @throws JMSException if the session fails to create a MessageConsumer due
1071     *                 to some internal error.
1072     * @throws InvalidDestinationException if an invalid destination is
1073     *                 specified.
1074     * @throws InvalidSelectorException if the message selector is invalid.
1075     * @since 1.1
1076     */
1077    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1078        return createConsumer(destination, messageSelector, noLocal, null);
1079    }
1080
1081    /**
1082     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1083     * using a message selector. This method can specify whether messages
1084     * published by its own connection should be delivered to it, if the
1085     * destination is a topic.
1086     * <P>
1087     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1088     * <CODE>Destination</CODE>, they can be used in the destination
1089     * parameter to create a <CODE>MessageConsumer</CODE>.
1090     * <P>
1091     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1092     * that have been published to a destination.
1093     * <P>
1094     * In some cases, a connection may both publish and subscribe to a topic.
1095     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1096     * inhibit the delivery of messages published by its own connection. The
1097     * default value for this attribute is False. The <CODE>noLocal</CODE>
1098     * value must be supported by destinations that are topics.
1099     *
1100     * @param destination the <CODE>Destination</CODE> to access
1101     * @param messageSelector only messages with properties matching the message
1102     *                selector expression are delivered. A value of null or an
1103     *                empty string indicates that there is no message selector
1104     *                for the message consumer.
1105     * @param noLocal - if true, and the destination is a topic, inhibits the
1106     *                delivery of messages published by its own connection. The
1107     *                behavior for <CODE>NoLocal</CODE> is not specified if
1108     *                the destination is a queue.
1109     * @param messageListener the listener to use for async consumption of messages
1110     * @return the MessageConsumer
1111     * @throws JMSException if the session fails to create a MessageConsumer due
1112     *                 to some internal error.
1113     * @throws InvalidDestinationException if an invalid destination is
1114     *                 specified.
1115     * @throws InvalidSelectorException if the message selector is invalid.
1116     * @since 1.1
1117     */
1118    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1119        checkClosed();
1120
1121        if (destination instanceof CustomDestination) {
1122            CustomDestination customDestination = (CustomDestination)destination;
1123            return customDestination.createConsumer(this, messageSelector, noLocal);
1124        }
1125
1126        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1127        int prefetch = 0;
1128        if (destination instanceof Topic) {
1129            prefetch = prefetchPolicy.getTopicPrefetch();
1130        } else {
1131            prefetch = prefetchPolicy.getQueuePrefetch();
1132        }
1133        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1134        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1135                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1136    }
1137
1138    /**
1139     * Creates a queue identity given a <CODE>Queue</CODE> name.
1140     * <P>
1141     * This facility is provided for the rare cases where clients need to
1142     * dynamically manipulate queue identity. It allows the creation of a queue
1143     * identity with a provider-specific name. Clients that depend on this
1144     * ability are not portable.
1145     * <P>
1146     * Note that this method is not for creating the physical queue. The
1147     * physical creation of queues is an administrative task and is not to be
1148     * initiated by the JMS API. The one exception is the creation of temporary
1149     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1150     * method.
1151     *
1152     * @param queueName the name of this <CODE>Queue</CODE>
1153     * @return a <CODE>Queue</CODE> with the given name
1154     * @throws JMSException if the session fails to create a queue due to some
1155     *                 internal error.
1156     * @since 1.1
1157     */
1158    public Queue createQueue(String queueName) throws JMSException {
1159        checkClosed();
1160        if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1161            return new ActiveMQTempQueue(queueName);
1162        }
1163        return new ActiveMQQueue(queueName);
1164    }
1165
1166    /**
1167     * Creates a topic identity given a <CODE>Topic</CODE> name.
1168     * <P>
1169     * This facility is provided for the rare cases where clients need to
1170     * dynamically manipulate topic identity. This allows the creation of a
1171     * topic identity with a provider-specific name. Clients that depend on this
1172     * ability are not portable.
1173     * <P>
1174     * Note that this method is not for creating the physical topic. The
1175     * physical creation of topics is an administrative task and is not to be
1176     * initiated by the JMS API. The one exception is the creation of temporary
1177     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1178     * method.
1179     *
1180     * @param topicName the name of this <CODE>Topic</CODE>
1181     * @return a <CODE>Topic</CODE> with the given name
1182     * @throws JMSException if the session fails to create a topic due to some
1183     *                 internal error.
1184     * @since 1.1
1185     */
1186    public Topic createTopic(String topicName) throws JMSException {
1187        checkClosed();
1188        if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1189            return new ActiveMQTempTopic(topicName);
1190        }
1191        return new ActiveMQTopic(topicName);
1192    }
1193
1194    /**
1195     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1196     * the specified queue.
1197     *
1198     * @param queue the <CODE>queue</CODE> to access
1199     * @exception InvalidDestinationException if an invalid destination is
1200     *                    specified
1201     * @since 1.1
1202     */
1203    /**
1204     * Creates a durable subscriber to the specified topic.
1205     * <P>
1206     * If a client needs to receive all the messages published on a topic,
1207     * including the ones published while the subscriber is inactive, it uses a
1208     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1209     * record of this durable subscription and insures that all messages from
1210     * the topic's publishers are retained until they are acknowledged by this
1211     * durable subscriber or they have expired.
1212     * <P>
1213     * Sessions with durable subscribers must always provide the same client
1214     * identifier. In addition, each client must specify a name that uniquely
1215     * identifies (within client identifier) each durable subscription it
1216     * creates. Only one session at a time can have a
1217     * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1218     * <P>
1219     * A client can change an existing durable subscription by creating a
1220     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1221     * and/or message selector. Changing a durable subscriber is equivalent to
1222     * unsubscribing (deleting) the old one and creating a new one.
1223     * <P>
1224     * In some cases, a connection may both publish and subscribe to a topic.
1225     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1226     * inhibit the delivery of messages published by its own connection. The
1227     * default value for this attribute is false.
1228     *
1229     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1230     * @param name the name used to identify this subscription
1231     * @return the TopicSubscriber
1232     * @throws JMSException if the session fails to create a subscriber due to
1233     *                 some internal error.
1234     * @throws InvalidDestinationException if an invalid topic is specified.
1235     * @since 1.1
1236     */
1237    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1238        checkClosed();
1239        return createDurableSubscriber(topic, name, null, false);
1240    }
1241
1242    /**
1243     * Creates a durable subscriber to the specified topic, using a message
1244     * selector and specifying whether messages published by its own connection
1245     * should be delivered to it.
1246     * <P>
1247     * If a client needs to receive all the messages published on a topic,
1248     * including the ones published while the subscriber is inactive, it uses a
1249     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1250     * record of this durable subscription and insures that all messages from
1251     * the topic's publishers are retained until they are acknowledged by this
1252     * durable subscriber or they have expired.
1253     * <P>
1254     * Sessions with durable subscribers must always provide the same client
1255     * identifier. In addition, each client must specify a name which uniquely
1256     * identifies (within client identifier) each durable subscription it
1257     * creates. Only one session at a time can have a
1258     * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1259     * inactive durable subscriber is one that exists but does not currently
1260     * have a message consumer associated with it.
1261     * <P>
1262     * A client can change an existing durable subscription by creating a
1263     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1264     * and/or message selector. Changing a durable subscriber is equivalent to
1265     * unsubscribing (deleting) the old one and creating a new one.
1266     *
1267     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1268     * @param name the name used to identify this subscription
1269     * @param messageSelector only messages with properties matching the message
1270     *                selector expression are delivered. A value of null or an
1271     *                empty string indicates that there is no message selector
1272     *                for the message consumer.
1273     * @param noLocal if set, inhibits the delivery of messages published by its
1274     *                own connection
1275     * @return the Queue Browser
1276     * @throws JMSException if the session fails to create a subscriber due to
1277     *                 some internal error.
1278     * @throws InvalidDestinationException if an invalid topic is specified.
1279     * @throws InvalidSelectorException if the message selector is invalid.
1280     * @since 1.1
1281     */
1282    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1283        checkClosed();
1284
1285        if (isIndividualAcknowledge()) {
1286            throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
1287                                             "INDIVIDUAL_ACKNOWLEDGE mode.", null);
1288        }
1289
1290        if (topic instanceof CustomDestination) {
1291            CustomDestination customDestination = (CustomDestination)topic;
1292            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1293        }
1294
1295        connection.checkClientIDWasManuallySpecified();
1296        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1297        int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1298        int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1299        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1300                                           noLocal, false, asyncDispatch);
1301    }
1302
1303    /**
1304     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1305     * the specified queue.
1306     *
1307     * @param queue the <CODE>queue</CODE> to access
1308     * @return the Queue Browser
1309     * @throws JMSException if the session fails to create a browser due to some
1310     *                 internal error.
1311     * @throws InvalidDestinationException if an invalid destination is
1312     *                 specified
1313     * @since 1.1
1314     */
1315    public QueueBrowser createBrowser(Queue queue) throws JMSException {
1316        checkClosed();
1317        return createBrowser(queue, null);
1318    }
1319
1320    /**
1321     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1322     * the specified queue using a message selector.
1323     *
1324     * @param queue the <CODE>queue</CODE> to access
1325     * @param messageSelector only messages with properties matching the message
1326     *                selector expression are delivered. A value of null or an
1327     *                empty string indicates that there is no message selector
1328     *                for the message consumer.
1329     * @return the Queue Browser
1330     * @throws JMSException if the session fails to create a browser due to some
1331     *                 internal error.
1332     * @throws InvalidDestinationException if an invalid destination is
1333     *                 specified
1334     * @throws InvalidSelectorException if the message selector is invalid.
1335     * @since 1.1
1336     */
1337    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1338        checkClosed();
1339        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1340    }
1341
1342    /**
1343     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1344     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1345     *
1346     * @return a temporary queue identity
1347     * @throws JMSException if the session fails to create a temporary queue due
1348     *                 to some internal error.
1349     * @since 1.1
1350     */
1351    public TemporaryQueue createTemporaryQueue() throws JMSException {
1352        checkClosed();
1353        return (TemporaryQueue)connection.createTempDestination(false);
1354    }
1355
1356    /**
1357     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1358     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1359     *
1360     * @return a temporary topic identity
1361     * @throws JMSException if the session fails to create a temporary topic due
1362     *                 to some internal error.
1363     * @since 1.1
1364     */
1365    public TemporaryTopic createTemporaryTopic() throws JMSException {
1366        checkClosed();
1367        return (TemporaryTopic)connection.createTempDestination(true);
1368    }
1369
1370    /**
1371     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1372     * the specified queue.
1373     *
1374     * @param queue the <CODE>Queue</CODE> to access
1375     * @return
1376     * @throws JMSException if the session fails to create a receiver due to
1377     *                 some internal error.
1378     * @throws JMSException
1379     * @throws InvalidDestinationException if an invalid queue is specified.
1380     */
1381    public QueueReceiver createReceiver(Queue queue) throws JMSException {
1382        checkClosed();
1383        return createReceiver(queue, null);
1384    }
1385
1386    /**
1387     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1388     * the specified queue using a message selector.
1389     *
1390     * @param queue the <CODE>Queue</CODE> to access
1391     * @param messageSelector only messages with properties matching the message
1392     *                selector expression are delivered. A value of null or an
1393     *                empty string indicates that there is no message selector
1394     *                for the message consumer.
1395     * @return QueueReceiver
1396     * @throws JMSException if the session fails to create a receiver due to
1397     *                 some internal error.
1398     * @throws InvalidDestinationException if an invalid queue is specified.
1399     * @throws InvalidSelectorException if the message selector is invalid.
1400     */
1401    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1402        checkClosed();
1403
1404        if (queue instanceof CustomDestination) {
1405            CustomDestination customDestination = (CustomDestination)queue;
1406            return customDestination.createReceiver(this, messageSelector);
1407        }
1408
1409        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1410        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1411                                         prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1412    }
1413
1414    /**
1415     * Creates a <CODE>QueueSender</CODE> object to send messages to the
1416     * specified queue.
1417     *
1418     * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1419     *                unidentified producer
1420     * @return QueueSender
1421     * @throws JMSException if the session fails to create a sender due to some
1422     *                 internal error.
1423     * @throws InvalidDestinationException if an invalid queue is specified.
1424     */
1425    public QueueSender createSender(Queue queue) throws JMSException {
1426        checkClosed();
1427        if (queue instanceof CustomDestination) {
1428            CustomDestination customDestination = (CustomDestination)queue;
1429            return customDestination.createSender(this);
1430        }
1431        int timeSendOut = connection.getSendTimeout();
1432        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1433    }
1434
1435    /**
1436     * Creates a nondurable subscriber to the specified topic. <p/>
1437     * <P>
1438     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1439     * that have been published to a topic. <p/>
1440     * <P>
1441     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1442     * receive only messages that are published while they are active. <p/>
1443     * <P>
1444     * In some cases, a connection may both publish and subscribe to a topic.
1445     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1446     * inhibit the delivery of messages published by its own connection. The
1447     * default value for this attribute is false.
1448     *
1449     * @param topic the <CODE>Topic</CODE> to subscribe to
1450     * @return TopicSubscriber
1451     * @throws JMSException if the session fails to create a subscriber due to
1452     *                 some internal error.
1453     * @throws InvalidDestinationException if an invalid topic is specified.
1454     */
1455    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1456        checkClosed();
1457        return createSubscriber(topic, null, false);
1458    }
1459
1460    /**
1461     * Creates a nondurable subscriber to the specified topic, using a message
1462     * selector or specifying whether messages published by its own connection
1463     * should be delivered to it. <p/>
1464     * <P>
1465     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1466     * that have been published to a topic. <p/>
1467     * <P>
1468     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1469     * receive only messages that are published while they are active. <p/>
1470     * <P>
1471     * Messages filtered out by a subscriber's message selector will never be
1472     * delivered to the subscriber. From the subscriber's perspective, they do
1473     * not exist. <p/>
1474     * <P>
1475     * In some cases, a connection may both publish and subscribe to a topic.
1476     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1477     * inhibit the delivery of messages published by its own connection. The
1478     * default value for this attribute is false.
1479     *
1480     * @param topic the <CODE>Topic</CODE> to subscribe to
1481     * @param messageSelector only messages with properties matching the message
1482     *                selector expression are delivered. A value of null or an
1483     *                empty string indicates that there is no message selector
1484     *                for the message consumer.
1485     * @param noLocal if set, inhibits the delivery of messages published by its
1486     *                own connection
1487     * @return TopicSubscriber
1488     * @throws JMSException if the session fails to create a subscriber due to
1489     *                 some internal error.
1490     * @throws InvalidDestinationException if an invalid topic is specified.
1491     * @throws InvalidSelectorException if the message selector is invalid.
1492     */
1493    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1494        checkClosed();
1495
1496        if (topic instanceof CustomDestination) {
1497            CustomDestination customDestination = (CustomDestination)topic;
1498            return customDestination.createSubscriber(this, messageSelector, noLocal);
1499        }
1500
1501        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1502        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1503            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1504    }
1505
1506    /**
1507     * Creates a publisher for the specified topic. <p/>
1508     * <P>
1509     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1510     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1511     * a topic, it defines a new sequence of messages that have no ordering
1512     * relationship with the messages it has previously sent.
1513     *
1514     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1515     *                an unidentified producer
1516     * @return TopicPublisher
1517     * @throws JMSException if the session fails to create a publisher due to
1518     *                 some internal error.
1519     * @throws InvalidDestinationException if an invalid topic is specified.
1520     */
1521    public TopicPublisher createPublisher(Topic topic) throws JMSException {
1522        checkClosed();
1523
1524        if (topic instanceof CustomDestination) {
1525            CustomDestination customDestination = (CustomDestination)topic;
1526            return customDestination.createPublisher(this);
1527        }
1528        int timeSendOut = connection.getSendTimeout();
1529        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1530    }
1531
1532    /**
1533     * Unsubscribes a durable subscription that has been created by a client.
1534     * <P>
1535     * This method deletes the state being maintained on behalf of the
1536     * subscriber by its provider.
1537     * <P>
1538     * It is erroneous for a client to delete a durable subscription while there
1539     * is an active <CODE>MessageConsumer </CODE> or
1540     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1541     * message is part of a pending transaction or has not been acknowledged in
1542     * the session.
1543     *
1544     * @param name the name used to identify this subscription
1545     * @throws JMSException if the session fails to unsubscribe to the durable
1546     *                 subscription due to some internal error.
1547     * @throws InvalidDestinationException if an invalid subscription name is
1548     *                 specified.
1549     * @since 1.1
1550     */
1551    public void unsubscribe(String name) throws JMSException {
1552        checkClosed();
1553        connection.unsubscribe(name);
1554    }
1555
1556    public void dispatch(MessageDispatch messageDispatch) {
1557        try {
1558            executor.execute(messageDispatch);
1559        } catch (InterruptedException e) {
1560            Thread.currentThread().interrupt();
1561            connection.onClientInternalException(e);
1562        }
1563    }
1564
1565    /**
1566     * Acknowledges all consumed messages of the session of this consumed
1567     * message.
1568     * <P>
1569     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1570     * for use when a client has specified that its JMS session's consumed
1571     * messages are to be explicitly acknowledged. By invoking
1572     * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1573     * all messages consumed by the session that the message was delivered to.
1574     * <P>
1575     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1576     * sessions and sessions specified to use implicit acknowledgement modes.
1577     * <P>
1578     * A client may individually acknowledge each message as it is consumed, or
1579     * it may choose to acknowledge messages as an application-defined group
1580     * (which is done by calling acknowledge on the last received message of the
1581     * group, thereby acknowledging all messages consumed by the session.)
1582     * <P>
1583     * Messages that have been received but not acknowledged may be redelivered.
1584     *
1585     * @throws JMSException if the JMS provider fails to acknowledge the
1586     *                 messages due to some internal error.
1587     * @throws javax.jms.IllegalStateException if this method is called on a
1588     *                 closed session.
1589     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1590     */
1591    public void acknowledge() throws JMSException {
1592        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1593            ActiveMQMessageConsumer c = iter.next();
1594            c.acknowledge();
1595        }
1596    }
1597
1598    /**
1599     * Add a message consumer.
1600     *
1601     * @param consumer - message consumer.
1602     * @throws JMSException
1603     */
1604    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1605        this.consumers.add(consumer);
1606        if (consumer.isDurableSubscriber()) {
1607            stats.onCreateDurableSubscriber();
1608        }
1609        this.connection.addDispatcher(consumer.getConsumerId(), this);
1610    }
1611
1612    /**
1613     * Remove the message consumer.
1614     *
1615     * @param consumer - consumer to be removed.
1616     * @throws JMSException
1617     */
1618    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1619        this.connection.removeDispatcher(consumer.getConsumerId());
1620        if (consumer.isDurableSubscriber()) {
1621            stats.onRemoveDurableSubscriber();
1622        }
1623        this.consumers.remove(consumer);
1624        this.connection.removeDispatcher(consumer);
1625    }
1626
1627    /**
1628     * Adds a message producer.
1629     *
1630     * @param producer - message producer to be added.
1631     * @throws JMSException
1632     */
1633    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1634        this.producers.add(producer);
1635        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1636    }
1637
1638    /**
1639     * Removes a message producer.
1640     *
1641     * @param producer - message producer to be removed.
1642     * @throws JMSException
1643     */
1644    protected void removeProducer(ActiveMQMessageProducer producer) {
1645        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1646        this.producers.remove(producer);
1647    }
1648
1649    /**
1650     * Start this Session.
1651     *
1652     * @throws JMSException
1653     */
1654    protected void start() throws JMSException {
1655        started.set(true);
1656        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1657            ActiveMQMessageConsumer c = iter.next();
1658            c.start();
1659        }
1660        executor.start();
1661    }
1662
1663    /**
1664     * Stops this session.
1665     *
1666     * @throws JMSException
1667     */
1668    protected void stop() throws JMSException {
1669
1670        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1671            ActiveMQMessageConsumer c = iter.next();
1672            c.stop();
1673        }
1674
1675        started.set(false);
1676        executor.stop();
1677    }
1678
1679    /**
1680     * Returns the session id.
1681     *
1682     * @return value - session id.
1683     */
1684    protected SessionId getSessionId() {
1685        return info.getSessionId();
1686    }
1687
1688    /**
1689     * @return
1690     */
1691    protected ConsumerId getNextConsumerId() {
1692        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1693    }
1694
1695    /**
1696     * @return
1697     */
1698    protected ProducerId getNextProducerId() {
1699        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1700    }
1701
1702    /**
1703     * Sends the message for dispatch by the broker.
1704     *
1705     *
1706     * @param producer - message producer.
1707     * @param destination - message destination.
1708     * @param message - message to be sent.
1709     * @param deliveryMode - JMS messsage delivery mode.
1710     * @param priority - message priority.
1711     * @param timeToLive - message expiration.
1712     * @param producerWindow
1713     * @param onComplete
1714     * @throws JMSException
1715     */
1716    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1717                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1718
1719        checkClosed();
1720        if (destination.isTemporary() && connection.isDeleted(destination)) {
1721            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1722        }
1723        synchronized (sendMutex) {
1724            // tell the Broker we are about to start a new transaction
1725            doStartTransaction();
1726            TransactionId txid = transactionContext.getTransactionId();
1727            long sequenceNumber = producer.getMessageSequence();
1728
1729            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1730            message.setJMSDeliveryMode(deliveryMode);
1731            long expiration = 0L;
1732            if (!producer.getDisableMessageTimestamp()) {
1733                long timeStamp = System.currentTimeMillis();
1734                message.setJMSTimestamp(timeStamp);
1735                if (timeToLive > 0) {
1736                    expiration = timeToLive + timeStamp;
1737                }
1738            }
1739            message.setJMSExpiration(expiration);
1740            message.setJMSPriority(priority);
1741            message.setJMSRedelivered(false);
1742
1743            // transform to our own message format here
1744            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1745
1746            // Set the message id.
1747            if (msg == message) {
1748                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1749            } else {
1750                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1751                message.setJMSMessageID(msg.getMessageId().toString());
1752            }
1753            //clear the brokerPath in case we are re-sending this message
1754            msg.setBrokerPath(null);
1755            // destination format is provider specific so only set on transformed message
1756            msg.setJMSDestination(destination);
1757
1758            msg.setTransactionId(txid);
1759            if (connection.isCopyMessageOnSend()) {
1760                msg = (ActiveMQMessage)msg.copy();
1761            }
1762            msg.setConnection(connection);
1763            msg.onSend();
1764            msg.setProducerId(msg.getMessageId().getProducerId());
1765            if (LOG.isTraceEnabled()) {
1766                LOG.trace(getSessionId() + " sending message: " + msg);
1767            }
1768            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1769                this.connection.asyncSendPacket(msg);
1770                if (producerWindow != null) {
1771                    // Since we defer lots of the marshaling till we hit the
1772                    // wire, this might not
1773                    // provide and accurate size. We may change over to doing
1774                    // more aggressive marshaling,
1775                    // to get more accurate sizes.. this is more important once
1776                    // users start using producer window
1777                    // flow control.
1778                    int size = msg.getSize();
1779                    producerWindow.increaseUsage(size);
1780                }
1781            } else {
1782                if (sendTimeout > 0 && onComplete==null) {
1783                    this.connection.syncSendPacket(msg,sendTimeout);
1784                }else {
1785                    this.connection.syncSendPacket(msg, onComplete);
1786                }
1787            }
1788
1789        }
1790    }
1791
1792    /**
1793     * Send TransactionInfo to indicate transaction has started
1794     *
1795     * @throws JMSException if some internal error occurs
1796     */
1797    protected void doStartTransaction() throws JMSException {
1798        if (getTransacted() && !transactionContext.isInXATransaction()) {
1799            transactionContext.begin();
1800        }
1801    }
1802
1803    /**
1804     * Checks whether the session has unconsumed messages.
1805     *
1806     * @return true - if there are unconsumed messages.
1807     */
1808    public boolean hasUncomsumedMessages() {
1809        return executor.hasUncomsumedMessages();
1810    }
1811
1812    /**
1813     * Checks whether the session uses transactions.
1814     *
1815     * @return true - if the session uses transactions.
1816     */
1817    public boolean isTransacted() {
1818        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1819    }
1820
1821    /**
1822     * Checks whether the session used client acknowledgment.
1823     *
1824     * @return true - if the session uses client acknowledgment.
1825     */
1826    protected boolean isClientAcknowledge() {
1827        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1828    }
1829
1830    /**
1831     * Checks whether the session used auto acknowledgment.
1832     *
1833     * @return true - if the session uses client acknowledgment.
1834     */
1835    public boolean isAutoAcknowledge() {
1836        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1837    }
1838
1839    /**
1840     * Checks whether the session used dup ok acknowledgment.
1841     *
1842     * @return true - if the session uses client acknowledgment.
1843     */
1844    public boolean isDupsOkAcknowledge() {
1845        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1846    }
1847
1848    public boolean isIndividualAcknowledge(){
1849        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1850    }
1851
1852    /**
1853     * Returns the message delivery listener.
1854     *
1855     * @return deliveryListener - message delivery listener.
1856     */
1857    public DeliveryListener getDeliveryListener() {
1858        return deliveryListener;
1859    }
1860
1861    /**
1862     * Sets the message delivery listener.
1863     *
1864     * @param deliveryListener - message delivery listener.
1865     */
1866    public void setDeliveryListener(DeliveryListener deliveryListener) {
1867        this.deliveryListener = deliveryListener;
1868    }
1869
1870    /**
1871     * Returns the SessionInfo bean.
1872     *
1873     * @return info - SessionInfo bean.
1874     * @throws JMSException
1875     */
1876    protected SessionInfo getSessionInfo() throws JMSException {
1877        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1878        return info;
1879    }
1880
1881    /**
1882     * Send the asynchronus command.
1883     *
1884     * @param command - command to be executed.
1885     * @throws JMSException
1886     */
1887    public void asyncSendPacket(Command command) throws JMSException {
1888        connection.asyncSendPacket(command);
1889    }
1890
1891    /**
1892     * Send the synchronus command.
1893     *
1894     * @param command - command to be executed.
1895     * @return Response
1896     * @throws JMSException
1897     */
1898    public Response syncSendPacket(Command command) throws JMSException {
1899        return connection.syncSendPacket(command);
1900    }
1901
1902    public long getNextDeliveryId() {
1903        return deliveryIdGenerator.getNextSequenceId();
1904    }
1905
1906    public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1907
1908        List<MessageDispatch> c = unconsumedMessages.removeAll();
1909        for (MessageDispatch md : c) {
1910            this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1911        }
1912        Collections.reverse(c);
1913
1914        for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1915            MessageDispatch md = iter.next();
1916            executor.executeFirst(md);
1917        }
1918
1919    }
1920
1921    public boolean isRunning() {
1922        return started.get();
1923    }
1924
1925    public boolean isAsyncDispatch() {
1926        return asyncDispatch;
1927    }
1928
1929    public void setAsyncDispatch(boolean asyncDispatch) {
1930        this.asyncDispatch = asyncDispatch;
1931    }
1932
1933    /**
1934     * @return Returns the sessionAsyncDispatch.
1935     */
1936    public boolean isSessionAsyncDispatch() {
1937        return sessionAsyncDispatch;
1938    }
1939
1940    /**
1941     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1942     */
1943    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1944        this.sessionAsyncDispatch = sessionAsyncDispatch;
1945    }
1946
1947    public MessageTransformer getTransformer() {
1948        return transformer;
1949    }
1950
1951    public ActiveMQConnection getConnection() {
1952        return connection;
1953    }
1954
1955    /**
1956     * Sets the transformer used to transform messages before they are sent on
1957     * to the JMS bus or when they are received from the bus but before they are
1958     * delivered to the JMS client
1959     */
1960    public void setTransformer(MessageTransformer transformer) {
1961        this.transformer = transformer;
1962    }
1963
1964    public BlobTransferPolicy getBlobTransferPolicy() {
1965        return blobTransferPolicy;
1966    }
1967
1968    /**
1969     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1970     * OBjects) are transferred from producers to brokers to consumers
1971     */
1972    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1973        this.blobTransferPolicy = blobTransferPolicy;
1974    }
1975
1976    public List<MessageDispatch> getUnconsumedMessages() {
1977        return executor.getUnconsumedMessages();
1978    }
1979
1980    @Override
1981    public String toString() {
1982        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1983    }
1984
1985    public void checkMessageListener() throws JMSException {
1986        if (messageListener != null) {
1987            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1988        }
1989        for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1990            ActiveMQMessageConsumer consumer = i.next();
1991            if (consumer.getMessageListener() != null) {
1992                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1993            }
1994        }
1995    }
1996
1997    protected void setOptimizeAcknowledge(boolean value) {
1998        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1999            ActiveMQMessageConsumer c = iter.next();
2000            c.setOptimizeAcknowledge(value);
2001        }
2002    }
2003
2004    protected void setPrefetchSize(ConsumerId id, int prefetch) {
2005        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2006            ActiveMQMessageConsumer c = iter.next();
2007            if (c.getConsumerId().equals(id)) {
2008                c.setPrefetchSize(prefetch);
2009                break;
2010            }
2011        }
2012    }
2013
2014    protected void close(ConsumerId id) {
2015        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2016            ActiveMQMessageConsumer c = iter.next();
2017            if (c.getConsumerId().equals(id)) {
2018                try {
2019                    c.close();
2020                } catch (JMSException e) {
2021                    LOG.warn("Exception closing consumer", e);
2022                }
2023                LOG.warn("Closed consumer on Command");
2024                break;
2025            }
2026        }
2027    }
2028
2029    public boolean isInUse(ActiveMQTempDestination destination) {
2030        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2031            ActiveMQMessageConsumer c = iter.next();
2032            if (c.isInUse(destination)) {
2033                return true;
2034            }
2035        }
2036        return false;
2037    }
2038
2039    /**
2040     * highest sequence id of the last message delivered by this session.
2041     * Passed to the broker in the close command, maintained by dispose()
2042     * @return lastDeliveredSequenceId
2043     */
2044    public long getLastDeliveredSequenceId() {
2045        return lastDeliveredSequenceId;
2046    }
2047
2048    protected void sendAck(MessageAck ack) throws JMSException {
2049        sendAck(ack,false);
2050    }
2051
2052    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2053        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2054            asyncSendPacket(ack);
2055        } else {
2056            syncSendPacket(ack);
2057        }
2058    }
2059
2060    protected Scheduler getScheduler() throws JMSException {
2061        return this.connection.getScheduler();
2062    }
2063
2064    protected ThreadPoolExecutor getConnectionExecutor() {
2065        return this.connectionExecutor;
2066    }
2067}