001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.net.SocketException;
022import java.net.URI;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.Properties;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicReference;
036import java.util.concurrent.locks.ReentrantReadWriteLock;
037
038import javax.transaction.xa.XAResource;
039import org.apache.activemq.advisory.AdvisorySupport;
040import org.apache.activemq.broker.ft.MasterBroker;
041import org.apache.activemq.broker.region.ConnectionStatistics;
042import org.apache.activemq.broker.region.RegionBroker;
043import org.apache.activemq.command.*;
044import org.apache.activemq.network.DemandForwardingBridge;
045import org.apache.activemq.network.MBeanNetworkListener;
046import org.apache.activemq.network.NetworkBridgeConfiguration;
047import org.apache.activemq.network.NetworkBridgeFactory;
048import org.apache.activemq.security.MessageAuthorizationPolicy;
049import org.apache.activemq.state.CommandVisitor;
050import org.apache.activemq.state.ConnectionState;
051import org.apache.activemq.state.ConsumerState;
052import org.apache.activemq.state.ProducerState;
053import org.apache.activemq.state.SessionState;
054import org.apache.activemq.state.TransactionState;
055import org.apache.activemq.thread.DefaultThreadPools;
056import org.apache.activemq.thread.Task;
057import org.apache.activemq.thread.TaskRunner;
058import org.apache.activemq.thread.TaskRunnerFactory;
059import org.apache.activemq.transaction.Transaction;
060import org.apache.activemq.transport.DefaultTransportListener;
061import org.apache.activemq.transport.ResponseCorrelator;
062import org.apache.activemq.transport.Transport;
063import org.apache.activemq.transport.TransportDisposedIOException;
064import org.apache.activemq.transport.TransportFactory;
065import org.apache.activemq.util.IntrospectionSupport;
066import org.apache.activemq.util.MarshallingSupport;
067import org.apache.activemq.util.ServiceSupport;
068import org.apache.activemq.util.URISupport;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071import org.slf4j.MDC;
072
073public class TransportConnection implements Connection, Task, CommandVisitor {
074    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
075    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
076    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
077    // Keeps track of the broker and connector that created this connection.
078    protected final Broker broker;
079    protected final TransportConnector connector;
080    // Keeps track of the state of the connections.
081    // protected final ConcurrentHashMap localConnectionStates=new
082    // ConcurrentHashMap();
083    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
084    // The broker and wireformat info that was exchanged.
085    protected BrokerInfo brokerInfo;
086    protected final List<Command> dispatchQueue = new LinkedList<Command>();
087    protected TaskRunner taskRunner;
088    protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
089    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
090    private MasterBroker masterBroker;
091    private final Transport transport;
092    private MessageAuthorizationPolicy messageAuthorizationPolicy;
093    private WireFormatInfo wireFormatInfo;
094    // Used to do async dispatch.. this should perhaps be pushed down into the
095    // transport layer..
096    private boolean inServiceException;
097    private final ConnectionStatistics statistics = new ConnectionStatistics();
098    private boolean manageable;
099    private boolean slow;
100    private boolean markedCandidate;
101    private boolean blockedCandidate;
102    private boolean blocked;
103    private boolean connected;
104    private boolean active;
105    private boolean starting;
106    private boolean pendingStop;
107    private long timeStamp;
108    private final AtomicBoolean stopping = new AtomicBoolean(false);
109    private final CountDownLatch stopped = new CountDownLatch(1);
110    private final AtomicBoolean asyncException = new AtomicBoolean(false);
111    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
112    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
113    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
114    private ConnectionContext context;
115    private boolean networkConnection;
116    private boolean faultTolerantConnection;
117    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
118    private DemandForwardingBridge duplexBridge;
119    private final TaskRunnerFactory taskRunnerFactory;
120    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
121    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
122    private String duplexNetworkConnectorId;
123    private Throwable stopError = null;
124
125    /**
126     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
127     *                          else commands are sent async.
128     */
129    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
130                               TaskRunnerFactory taskRunnerFactory) {
131        this.connector = connector;
132        this.broker = broker;
133        this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
134        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
135        brokerConnectionStates = rb.getConnectionStates();
136        if (connector != null) {
137            this.statistics.setParent(connector.getStatistics());
138        }
139        this.taskRunnerFactory = taskRunnerFactory;
140        this.transport = transport;
141        this.transport.setTransportListener(new DefaultTransportListener() {
142            @Override
143            public void onCommand(Object o) {
144                serviceLock.readLock().lock();
145                try {
146                    if (!(o instanceof Command)) {
147                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
148                    }
149                    Command command = (Command) o;
150                    Response response = service(command);
151                    if (response != null) {
152                        dispatchSync(response);
153                    }
154                } finally {
155                    serviceLock.readLock().unlock();
156                }
157            }
158
159            @Override
160            public void onException(IOException exception) {
161                serviceLock.readLock().lock();
162                try {
163                    serviceTransportException(exception);
164                } finally {
165                    serviceLock.readLock().unlock();
166                }
167            }
168        });
169        connected = true;
170    }
171
172    /**
173     * Returns the number of messages to be dispatched to this connection
174     *
175     * @return size of dispatch queue
176     */
177    public int getDispatchQueueSize() {
178        synchronized (dispatchQueue) {
179            return dispatchQueue.size();
180        }
181    }
182
183    public void serviceTransportException(IOException e) {
184        BrokerService bService = connector.getBrokerService();
185        if (bService.isShutdownOnSlaveFailure()) {
186            if (brokerInfo != null) {
187                if (brokerInfo.isSlaveBroker()) {
188                    LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
189                    try {
190                        doStop();
191                        bService.stop();
192                    } catch (Exception ex) {
193                        LOG.warn("Failed to stop the master", ex);
194                    }
195                }
196            }
197        }
198        if (!stopping.get()) {
199            transportException.set(e);
200            if (TRANSPORTLOG.isDebugEnabled()) {
201                TRANSPORTLOG.debug(this + " failed: " + e, e);
202            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
203                TRANSPORTLOG.warn(this + " failed: " + e);
204            }
205            stopAsync();
206        }
207    }
208
209    private boolean expected(IOException e) {
210        return isStomp() &&
211                ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
212    }
213
214    private boolean isStomp() {
215        URI uri = connector.getUri();
216        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
217    }
218
219    /**
220     * Calls the serviceException method in an async thread. Since handling a
221     * service exception closes a socket, we should not tie up broker threads
222     * since client sockets may hang or cause deadlocks.
223     */
224    public void serviceExceptionAsync(final IOException e) {
225        if (asyncException.compareAndSet(false, true)) {
226            new Thread("Async Exception Handler") {
227                @Override
228                public void run() {
229                    serviceException(e);
230                }
231            }.start();
232        }
233    }
234
235    /**
236     * Closes a clients connection due to a detected error. Errors are ignored
237     * if: the client is closing or broker is closing. Otherwise, the connection
238     * error transmitted to the client before stopping it's transport.
239     */
240    public void serviceException(Throwable e) {
241        // are we a transport exception such as not being able to dispatch
242        // synchronously to a transport
243        if (e instanceof IOException) {
244            serviceTransportException((IOException) e);
245        } else if (e.getClass() == BrokerStoppedException.class) {
246            // Handle the case where the broker is stopped
247            // But the client is still connected.
248            if (!stopping.get()) {
249                if (SERVICELOG.isDebugEnabled()) {
250                    SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
251                }
252                ConnectionError ce = new ConnectionError();
253                ce.setException(e);
254                dispatchSync(ce);
255                // Record the error that caused the transport to stop
256                this.stopError = e;
257                // Wait a little bit to try to get the output buffer to flush
258                // the exption notification to the client.
259                try {
260                    Thread.sleep(500);
261                } catch (InterruptedException ie) {
262                    Thread.currentThread().interrupt();
263                }
264                // Worst case is we just kill the connection before the
265                // notification gets to him.
266                stopAsync();
267            }
268        } else if (!stopping.get() && !inServiceException) {
269            inServiceException = true;
270            try {
271                SERVICELOG.warn("Async error occurred: " + e, e);
272                ConnectionError ce = new ConnectionError();
273                ce.setException(e);
274                if (pendingStop) {
275                    dispatchSync(ce);
276                } else {
277                    dispatchAsync(ce);
278                }
279            } finally {
280                inServiceException = false;
281            }
282        }
283    }
284
285    public Response service(Command command) {
286        MDC.put("activemq.connector", connector.getUri().toString());
287        Response response = null;
288        boolean responseRequired = command.isResponseRequired();
289        int commandId = command.getCommandId();
290        try {
291            if (!pendingStop) {
292                response = command.visit(this);
293            } else {
294                response = new ExceptionResponse(this.stopError);
295            }
296        } catch (Throwable e) {
297            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
298                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
299                        + " command: " + command + ", exception: " + e, e);
300            }
301
302            if (e instanceof java.lang.SecurityException) {
303                // still need to close this down - in case the peer of this transport doesn't play nice
304                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
305            }
306
307            if (responseRequired) {
308                response = new ExceptionResponse(e);
309            } else {
310                serviceException(e);
311            }
312        }
313        if (responseRequired) {
314            if (response == null) {
315                response = new Response();
316            }
317            response.setCorrelationId(commandId);
318        }
319        // The context may have been flagged so that the response is not
320        // sent.
321        if (context != null) {
322            if (context.isDontSendReponse()) {
323                context.setDontSendReponse(false);
324                response = null;
325            }
326            context = null;
327        }
328        MDC.remove("activemq.connector");
329        return response;
330    }
331
332    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
333        return null;
334    }
335
336    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
337        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
338        return null;
339    }
340
341    public Response processWireFormat(WireFormatInfo info) throws Exception {
342        wireFormatInfo = info;
343        protocolVersion.set(info.getVersion());
344        return null;
345    }
346
347    public Response processShutdown(ShutdownInfo info) throws Exception {
348        stopAsync();
349        return null;
350    }
351
352    public Response processFlush(FlushCommand command) throws Exception {
353        return null;
354    }
355
356    public Response processBeginTransaction(TransactionInfo info) throws Exception {
357        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
358        context = null;
359        if (cs != null) {
360            context = cs.getContext();
361        }
362        if (cs == null) {
363            throw new NullPointerException("Context is null");
364        }
365        // Avoid replaying dup commands
366        if (cs.getTransactionState(info.getTransactionId()) == null) {
367            cs.addTransactionState(info.getTransactionId());
368            broker.beginTransaction(context, info.getTransactionId());
369        }
370        return null;
371    }
372
373    public Response processEndTransaction(TransactionInfo info) throws Exception {
374        // No need to do anything. This packet is just sent by the client
375        // make sure he is synced with the server as commit command could
376        // come from a different connection.
377        return null;
378    }
379
380    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
381        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
382        context = null;
383        if (cs != null) {
384            context = cs.getContext();
385        }
386        if (cs == null) {
387            throw new NullPointerException("Context is null");
388        }
389        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
390        if (transactionState == null) {
391            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
392                    + info.getTransactionId());
393        }
394        // Avoid dups.
395        if (!transactionState.isPrepared()) {
396            transactionState.setPrepared(true);
397            int result = broker.prepareTransaction(context, info.getTransactionId());
398            transactionState.setPreparedResult(result);
399            if (result == XAResource.XA_RDONLY) {
400                // we are done, no further rollback or commit from TM
401                cs.removeTransactionState(info.getTransactionId());
402            }
403            IntegerResponse response = new IntegerResponse(result);
404            return response;
405        } else {
406            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
407            return response;
408        }
409    }
410
411    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
412        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
413        context = cs.getContext();
414        cs.removeTransactionState(info.getTransactionId());
415        broker.commitTransaction(context, info.getTransactionId(), true);
416        return null;
417    }
418
419    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
420        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
421        context = cs.getContext();
422        cs.removeTransactionState(info.getTransactionId());
423        broker.commitTransaction(context, info.getTransactionId(), false);
424        return null;
425    }
426
427    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
428        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
429        context = cs.getContext();
430        cs.removeTransactionState(info.getTransactionId());
431        broker.rollbackTransaction(context, info.getTransactionId());
432        return null;
433    }
434
435    public Response processForgetTransaction(TransactionInfo info) throws Exception {
436        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
437        context = cs.getContext();
438        broker.forgetTransaction(context, info.getTransactionId());
439        return null;
440    }
441
442    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
443        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
444        context = cs.getContext();
445        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
446        return new DataArrayResponse(preparedTransactions);
447    }
448
449    public Response processMessage(Message messageSend) throws Exception {
450        ProducerId producerId = messageSend.getProducerId();
451        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
452        if (producerExchange.canDispatch(messageSend)) {
453            broker.send(producerExchange, messageSend);
454        }
455        return null;
456    }
457
458    public Response processMessageAck(MessageAck ack) throws Exception {
459        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
460        if (consumerExchange != null) {
461            broker.acknowledge(consumerExchange, ack);
462        }
463        return null;
464    }
465
466    public Response processMessagePull(MessagePull pull) throws Exception {
467        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
468    }
469
470    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
471        broker.processDispatchNotification(notification);
472        return null;
473    }
474
475    public Response processAddDestination(DestinationInfo info) throws Exception {
476        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
477        broker.addDestinationInfo(cs.getContext(), info);
478        if (info.getDestination().isTemporary()) {
479            cs.addTempDestination(info);
480        }
481        return null;
482    }
483
484    public Response processRemoveDestination(DestinationInfo info) throws Exception {
485        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
486        broker.removeDestinationInfo(cs.getContext(), info);
487        if (info.getDestination().isTemporary()) {
488            cs.removeTempDestination(info.getDestination());
489        }
490        return null;
491    }
492
493    public Response processAddProducer(ProducerInfo info) throws Exception {
494        SessionId sessionId = info.getProducerId().getParentId();
495        ConnectionId connectionId = sessionId.getParentId();
496        TransportConnectionState cs = lookupConnectionState(connectionId);
497        if (cs == null) {
498            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
499                    + connectionId);
500        }
501        SessionState ss = cs.getSessionState(sessionId);
502        if (ss == null) {
503            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
504                    + sessionId);
505        }
506        // Avoid replaying dup commands
507        if (!ss.getProducerIds().contains(info.getProducerId())) {
508            ActiveMQDestination destination = info.getDestination();
509            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
510                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
511                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
512                }
513            }
514            broker.addProducer(cs.getContext(), info);
515            try {
516                ss.addProducer(info);
517            } catch (IllegalStateException e) {
518                broker.removeProducer(cs.getContext(), info);
519            }
520
521        }
522        return null;
523    }
524
525    public Response processRemoveProducer(ProducerId id) throws Exception {
526        SessionId sessionId = id.getParentId();
527        ConnectionId connectionId = sessionId.getParentId();
528        TransportConnectionState cs = lookupConnectionState(connectionId);
529        SessionState ss = cs.getSessionState(sessionId);
530        if (ss == null) {
531            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
532                    + sessionId);
533        }
534        ProducerState ps = ss.removeProducer(id);
535        if (ps == null) {
536            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
537        }
538        removeProducerBrokerExchange(id);
539        broker.removeProducer(cs.getContext(), ps.getInfo());
540        return null;
541    }
542
543    public Response processAddConsumer(ConsumerInfo info) throws Exception {
544        SessionId sessionId = info.getConsumerId().getParentId();
545        ConnectionId connectionId = sessionId.getParentId();
546        TransportConnectionState cs = lookupConnectionState(connectionId);
547        if (cs == null) {
548            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
549                    + connectionId);
550        }
551        SessionState ss = cs.getSessionState(sessionId);
552        if (ss == null) {
553            throw new IllegalStateException(broker.getBrokerName()
554                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
555        }
556        // Avoid replaying dup commands
557        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
558            ActiveMQDestination destination = info.getDestination();
559            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
560                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
561                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
562                }
563            }
564
565            broker.addConsumer(cs.getContext(), info);
566            try {
567                ss.addConsumer(info);
568                addConsumerBrokerExchange(info.getConsumerId());
569            } catch (IllegalStateException e) {
570                broker.removeConsumer(cs.getContext(), info);
571            }
572
573        }
574        return null;
575    }
576
577    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
578        SessionId sessionId = id.getParentId();
579        ConnectionId connectionId = sessionId.getParentId();
580        TransportConnectionState cs = lookupConnectionState(connectionId);
581        if (cs == null) {
582            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
583                    + connectionId);
584        }
585        SessionState ss = cs.getSessionState(sessionId);
586        if (ss == null) {
587            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
588                    + sessionId);
589        }
590        ConsumerState consumerState = ss.removeConsumer(id);
591        if (consumerState == null) {
592            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
593        }
594        ConsumerInfo info = consumerState.getInfo();
595        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
596        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
597        removeConsumerBrokerExchange(id);
598        return null;
599    }
600
601    public Response processAddSession(SessionInfo info) throws Exception {
602        ConnectionId connectionId = info.getSessionId().getParentId();
603        TransportConnectionState cs = lookupConnectionState(connectionId);
604        // Avoid replaying dup commands
605        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
606            broker.addSession(cs.getContext(), info);
607            try {
608                cs.addSession(info);
609            } catch (IllegalStateException e) {
610                e.printStackTrace();
611                broker.removeSession(cs.getContext(), info);
612            }
613        }
614        return null;
615    }
616
617    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
618        ConnectionId connectionId = id.getParentId();
619        TransportConnectionState cs = lookupConnectionState(connectionId);
620        if (cs == null) {
621            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
622        }
623        SessionState session = cs.getSessionState(id);
624        if (session == null) {
625            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
626        }
627        // Don't let new consumers or producers get added while we are closing
628        // this down.
629        session.shutdown();
630        // Cascade the connection stop to the consumers and producers.
631        for (ConsumerId consumerId : session.getConsumerIds()) {
632            try {
633                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
634            } catch (Throwable e) {
635                LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
636            }
637        }
638        for (ProducerId producerId : session.getProducerIds()) {
639            try {
640                processRemoveProducer(producerId);
641            } catch (Throwable e) {
642                LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
643            }
644        }
645        cs.removeSession(id);
646        broker.removeSession(cs.getContext(), session.getInfo());
647        return null;
648    }
649
    /**
     * Handles a {@link ConnectionInfo} command by attaching the described logical
     * connection to this transport connection and registering it with the broker.
     * <p>
     * The request is rejected (the transport is disposed and an
     * {@link ExceptionResponse} is returned) when the connection is not a broker
     * master connector, the broker service is waiting for a slave, and the slave
     * start signal still has a count of 1.
     * <p>
     * Side effects on {@code info}: {@code clientMaster} is forced to {@code true}
     * when the negotiated wire format version is 2 or lower, and the client IP is
     * filled in from the remote address when it is {@code null}.
     *
     * @param info the connection being added
     * @return {@code null} on success, or an {@link ExceptionResponse} when the
     *         master is still waiting for its slave
     * @throws Exception rethrown from {@code broker.addConnection} after the state
     *         registered for this connection id has been removed again
     */
    public Response processAddConnection(ConnectionInfo info) throws Exception {
        // if the broker service has slave attached, wait for the slave to be
        // attached to allow client connection. slave connection is fine
        if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
                && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
            ServiceSupport.dispose(transport);
            return new ExceptionResponse(new Exception("Master's slave not attached yet."));
        }
        // Older clients should have been defaulting this field to true.. but
        // they were not.
        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
            info.setClientMaster(true);
        }
        TransportConnectionState state;
        // Make sure 2 concurrent connections by the same ID only generate 1
        // TransportConnectionState object.
        synchronized (brokerConnectionStates) {
            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
            if (state == null) {
                state = new TransportConnectionState(info, this);
                brokerConnectionStates.put(info.getConnectionId(), state);
            }
            // Every transport connection that shares the state holds one reference;
            // processRemoveConnection drops it again.
            state.incrementReference();
        }
        // If there are 2 concurrent connections for the same connection id,
        // then last one in wins, we need to sync here
        // to figure out the winner.
        synchronized (state.getConnectionMutex()) {
            if (state.getConnection() != this) {
                LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
                state.getConnection().stop();
                LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
                        + state.getConnection().getRemoteAddress());
                state.setConnection(this);
                state.reset(info);
            }
        }
        // Make the state visible to this transport connection's own register.
        registerConnectionState(info.getConnectionId(), state);
        LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
        this.faultTolerantConnection = info.isFaultTolerant();
        // Setup the context.
        String clientId = info.getClientId();
        context = new ConnectionContext();
        context.setBroker(broker);
        context.setClientId(clientId);
        context.setClientMaster(info.isClientMaster());
        context.setConnection(this);
        context.setConnectionId(info.getConnectionId());
        context.setConnector(connector);
        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
        context.setNetworkConnection(networkConnection);
        context.setFaultTolerant(faultTolerantConnection);
        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
        context.setUserName(info.getUserName());
        context.setWireFormatInfo(wireFormatInfo);
        context.setReconnect(info.isFailoverReconnect());
        this.manageable = info.isManageable();
        context.setConnectionState(state);
        state.setContext(context);
        state.setConnection(this);
        if (info.getClientIp() == null) {
            info.setClientIp(getRemoteAddress());
        }

        try {
            broker.addConnection(context, info);
        } catch (Exception e) {
            // Undo both registrations so a rejected connection leaves no state behind.
            synchronized (brokerConnectionStates) {
                brokerConnectionStates.remove(info.getConnectionId());
            }
            unregisterConnectionState(info.getConnectionId());
            // The stack trace is only logged at debug level; the warning carries the message.
            LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Exception detail:", e);
            }
            throw e;
        }
        if (info.isManageable()) {
            // send ConnectionCommand
            ConnectionControl command = this.connector.getConnectionControl();
            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
            if (info.isFailoverReconnect()) {
                // A client that is reconnecting via failover is not asked to rebalance.
                command.setRebalanceConnection(false);
            }
            dispatchAsync(command);
        }
        return null;
    }
738
739    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
740            throws InterruptedException {
741        LOG.debug("remove connection id: " + id);
742        TransportConnectionState cs = lookupConnectionState(id);
743        if (cs != null) {
744            // Don't allow things to be added to the connection state while we
745            // are shutting down.
746            cs.shutdown();
747            // Cascade the connection stop to the sessions.
748            for (SessionId sessionId : cs.getSessionIds()) {
749                try {
750                    processRemoveSession(sessionId, lastDeliveredSequenceId);
751                } catch (Throwable e) {
752                    SERVICELOG.warn("Failed to remove session " + sessionId, e);
753                }
754            }
755            // Cascade the connection stop to temp destinations.
756            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
757                DestinationInfo di = iter.next();
758                try {
759                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
760                } catch (Throwable e) {
761                    SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
762                }
763                iter.remove();
764            }
765            try {
766                broker.removeConnection(cs.getContext(), cs.getInfo(), null);
767            } catch (Throwable e) {
768                SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
769                if (LOG.isDebugEnabled()) {
770                    SERVICELOG.debug("Exception detail:", e);
771                }
772            }
773            TransportConnectionState state = unregisterConnectionState(id);
774            if (state != null) {
775                synchronized (brokerConnectionStates) {
776                    // If we are the last reference, we should remove the state
777                    // from the broker.
778                    if (state.decrementReference() == 0) {
779                        brokerConnectionStates.remove(id);
780                    }
781                }
782            }
783        }
784        return null;
785    }
786
    /**
     * Ignores the given {@link ProducerAck}.
     *
     * @param ack the acknowledgement received; not inspected
     * @return always {@code null}
     */
    public Response processProducerAck(ProducerAck ack) throws Exception {
        // A broker should not get ProducerAck messages.
        return null;
    }
791
792    public Connector getConnector() {
793        return connector;
794    }
795
796    public void dispatchSync(Command message) {
797        try {
798            processDispatch(message);
799        } catch (IOException e) {
800            serviceExceptionAsync(e);
801        }
802    }
803
804    public void dispatchAsync(Command message) {
805        if (!stopping.get()) {
806            if (taskRunner == null) {
807                dispatchSync(message);
808            } else {
809                synchronized (dispatchQueue) {
810                    dispatchQueue.add(message);
811                }
812                try {
813                    taskRunner.wakeup();
814                } catch (InterruptedException e) {
815                    Thread.currentThread().interrupt();
816                }
817            }
818        } else {
819            if (message.isMessageDispatch()) {
820                MessageDispatch md = (MessageDispatch) message;
821                Runnable sub = md.getTransmitCallback();
822                broker.postProcessDispatch(md);
823                if (sub != null) {
824                    sub.run();
825                }
826            }
827        }
828    }
829
830    protected void processDispatch(Command command) throws IOException {
831        final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
832        try {
833            if (!stopping.get()) {
834                if (messageDispatch != null) {
835                    broker.preProcessDispatch(messageDispatch);
836                }
837                dispatch(command);
838            }
839        } finally {
840            if (messageDispatch != null) {
841                Runnable sub = messageDispatch.getTransmitCallback();
842                broker.postProcessDispatch(messageDispatch);
843                if (sub != null) {
844                    sub.run();
845                }
846            }
847        }
848    }
849
850    public boolean iterate() {
851        try {
852            if (pendingStop || stopping.get()) {
853                if (dispatchStopped.compareAndSet(false, true)) {
854                    if (transportException.get() == null) {
855                        try {
856                            dispatch(new ShutdownInfo());
857                        } catch (Throwable ignore) {
858                        }
859                    }
860                    dispatchStoppedLatch.countDown();
861                }
862                return false;
863            }
864            if (!dispatchStopped.get()) {
865                Command command = null;
866                synchronized (dispatchQueue) {
867                    if (dispatchQueue.isEmpty()) {
868                        return false;
869                    }
870                    command = dispatchQueue.remove(0);
871                }
872                processDispatch(command);
873                return true;
874            }
875            return false;
876        } catch (IOException e) {
877            if (dispatchStopped.compareAndSet(false, true)) {
878                dispatchStoppedLatch.countDown();
879            }
880            serviceExceptionAsync(e);
881            return false;
882        }
883    }
884
885    /**
886     * Returns the statistics for this connection
887     */
888    public ConnectionStatistics getStatistics() {
889        return statistics;
890    }
891
892    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
893        return messageAuthorizationPolicy;
894    }
895
896    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
897        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
898    }
899
900    public boolean isManageable() {
901        return manageable;
902    }
903
904    public void start() throws Exception {
905        try {
906            synchronized (this) {
907                starting = true;
908                if (taskRunnerFactory != null) {
909                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
910                            + getRemoteAddress());
911                } else {
912                    taskRunner = null;
913                }
914                transport.start();
915                active = true;
916                BrokerInfo info = connector.getBrokerInfo().copy();
917                if (connector.isUpdateClusterClients()) {
918                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
919                } else {
920                    info.setPeerBrokerInfos(null);
921                }
922                dispatchAsync(info);
923
924                connector.onStarted(this);
925            }
926        } catch (Exception e) {
927            // Force clean up on an error starting up.
928            pendingStop = true;
929            throw e;
930        } finally {
931            // stop() can be called from within the above block,
932            // but we want to be sure start() completes before
933            // stop() runs, so queue the stop until right now:
934            setStarting(false);
935            if (isPendingStop()) {
936                LOG.debug("Calling the delayed stop() after start() " + this);
937                stop();
938            }
939        }
940    }
941
942    public void stop() throws Exception {
943        stopAsync();
944        while (!stopped.await(5, TimeUnit.SECONDS)) {
945            LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
946        }
947    }
948
949    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
950        if (waitTime > 0) {
951            synchronized (this) {
952                pendingStop = true;
953                stopError = cause;
954            }
955            try {
956                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
957                    public void run() {
958                        try {
959                            Thread.sleep(waitTime);
960                            stopAsync();
961                            LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
962                        } catch (InterruptedException e) {
963                        }
964                    }
965                }, "delayedStop:" + transport.getRemoteAddress());
966            } catch (Throwable t) {
967                LOG.warn("cannot create stopAsync :", t);
968            }
969        }
970    }
971
972    public void stopAsync() {
973        // If we're in the middle of starting then go no further... for now.
974        synchronized (this) {
975            pendingStop = true;
976            if (starting) {
977                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
978                return;
979            }
980        }
981        if (stopping.compareAndSet(false, true)) {
982            // Let all the connection contexts know we are shutting down
983            // so that in progress operations can notice and unblock.
984            List<TransportConnectionState> connectionStates = listConnectionStates();
985            for (TransportConnectionState cs : connectionStates) {
986                ConnectionContext connectionContext = cs.getContext();
987                if (connectionContext != null) {
988                    connectionContext.getStopping().set(true);
989                }
990            }
991            try {
992                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
993                    public void run() {
994                        serviceLock.writeLock().lock();
995                        try {
996                            doStop();
997                        } catch (Throwable e) {
998                            LOG.debug("Error occurred while shutting down a connection " + this, e);
999                        } finally {
1000                            stopped.countDown();
1001                            serviceLock.writeLock().unlock();
1002                        }
1003                    }
1004                }, "StopAsync:" + transport.getRemoteAddress());
1005            } catch (Throwable t) {
1006                LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
1007                stopped.countDown();
1008            }
1009        }
1010    }
1011
1012    @Override
1013    public String toString() {
1014        return "Transport Connection to: " + transport.getRemoteAddress();
1015    }
1016
1017    protected void doStop() throws Exception, InterruptedException {
1018        LOG.debug("Stopping connection: " + transport.getRemoteAddress());
1019        connector.onStopped(this);
1020        try {
1021            synchronized (this) {
1022                if (masterBroker != null) {
1023                    masterBroker.stop();
1024                }
1025                if (duplexBridge != null) {
1026                    duplexBridge.stop();
1027                }
1028            }
1029        } catch (Exception ignore) {
1030            LOG.trace("Exception caught stopping", ignore);
1031        }
1032        try {
1033            transport.stop();
1034            LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1035        } catch (Exception e) {
1036            LOG.debug("Could not stop transport: " + e, e);
1037        }
1038        if (taskRunner != null) {
1039            taskRunner.shutdown(1);
1040        }
1041        active = false;
1042        // Run the MessageDispatch callbacks so that message references get
1043        // cleaned up.
1044        synchronized (dispatchQueue) {
1045            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1046                Command command = iter.next();
1047                if (command.isMessageDispatch()) {
1048                    MessageDispatch md = (MessageDispatch) command;
1049                    Runnable sub = md.getTransmitCallback();
1050                    broker.postProcessDispatch(md);
1051                    if (sub != null) {
1052                        sub.run();
1053                    }
1054                }
1055            }
1056            dispatchQueue.clear();
1057        }
1058        //
1059        // Remove all logical connection associated with this connection
1060        // from the broker.
1061        if (!broker.isStopped()) {
1062            List<TransportConnectionState> connectionStates = listConnectionStates();
1063            connectionStates = listConnectionStates();
1064            for (TransportConnectionState cs : connectionStates) {
1065                cs.getContext().getStopping().set(true);
1066                try {
1067                    LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
1068                    processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1069                } catch (Throwable ignore) {
1070                    ignore.printStackTrace();
1071                }
1072            }
1073        }
1074        LOG.debug("Connection Stopped: " + getRemoteAddress());
1075    }
1076
1077    /**
1078     * @return Returns the blockedCandidate.
1079     */
1080    public boolean isBlockedCandidate() {
1081        return blockedCandidate;
1082    }
1083
1084    /**
1085     * @param blockedCandidate The blockedCandidate to set.
1086     */
1087    public void setBlockedCandidate(boolean blockedCandidate) {
1088        this.blockedCandidate = blockedCandidate;
1089    }
1090
1091    /**
1092     * @return Returns the markedCandidate.
1093     */
1094    public boolean isMarkedCandidate() {
1095        return markedCandidate;
1096    }
1097
1098    /**
1099     * @param markedCandidate The markedCandidate to set.
1100     */
1101    public void setMarkedCandidate(boolean markedCandidate) {
1102        this.markedCandidate = markedCandidate;
1103        if (!markedCandidate) {
1104            timeStamp = 0;
1105            blockedCandidate = false;
1106        }
1107    }
1108
1109    /**
1110     * @param slow The slow to set.
1111     */
1112    public void setSlow(boolean slow) {
1113        this.slow = slow;
1114    }
1115
1116    /**
1117     * @return true if the Connection is slow
1118     */
1119    public boolean isSlow() {
1120        return slow;
1121    }
1122
1123    /**
1124     * @return true if the Connection is potentially blocked
1125     */
1126    public boolean isMarkedBlockedCandidate() {
1127        return markedCandidate;
1128    }
1129
1130    /**
1131     * Mark the Connection, so we can deem if it's collectable on the next sweep
1132     */
1133    public void doMark() {
1134        if (timeStamp == 0) {
1135            timeStamp = System.currentTimeMillis();
1136        }
1137    }
1138
1139    /**
1140     * @return if after being marked, the Connection is still writing
1141     */
1142    public boolean isBlocked() {
1143        return blocked;
1144    }
1145
1146    /**
1147     * @return true if the Connection is connected
1148     */
1149    public boolean isConnected() {
1150        return connected;
1151    }
1152
1153    /**
1154     * @param blocked The blocked to set.
1155     */
1156    public void setBlocked(boolean blocked) {
1157        this.blocked = blocked;
1158    }
1159
1160    /**
1161     * @param connected The connected to set.
1162     */
1163    public void setConnected(boolean connected) {
1164        this.connected = connected;
1165    }
1166
1167    /**
1168     * @return true if the Connection is active
1169     */
1170    public boolean isActive() {
1171        return active;
1172    }
1173
1174    /**
1175     * @param active The active to set.
1176     */
1177    public void setActive(boolean active) {
1178        this.active = active;
1179    }
1180
1181    /**
1182     * @return true if the Connection is starting
1183     */
1184    public synchronized boolean isStarting() {
1185        return starting;
1186    }
1187
1188    public synchronized boolean isNetworkConnection() {
1189        return networkConnection;
1190    }
1191
1192    public boolean isFaultTolerantConnection() {
1193        return this.faultTolerantConnection;
1194    }
1195
1196    protected synchronized void setStarting(boolean starting) {
1197        this.starting = starting;
1198    }
1199
1200    /**
1201     * @return true if the Connection needs to stop
1202     */
1203    public synchronized boolean isPendingStop() {
1204        return pendingStop;
1205    }
1206
1207    protected synchronized void setPendingStop(boolean pendingStop) {
1208        this.pendingStop = pendingStop;
1209    }
1210
    /**
     * Handles a {@link BrokerInfo} command sent by a peer broker.
     * <p>
     * Three cases are distinguished:
     * <ul>
     * <li>The peer is a slave broker: unless either side asks for a passive
     * slave, a {@link MasterBroker} is created on this transport and started;
     * the broker service is then told the slave connection is established.
     * Processing continues with the bookkeeping described in the last item.</li>
     * <li>The peer requests a duplex network connection: any other connection
     * registered for the same duplex connector id is stopped, and the responder
     * end of the bridge is created and started. The method returns from this
     * branch in every outcome, including logged failures, without recording the
     * broker info.</li>
     * <li>Otherwise (and after the slave case): the broker info is stored, this
     * connection is flagged as a network connection and so is the context of
     * every registered connection state.</li>
     * </ul>
     *
     * @param info the broker info received; its duplex flag is cleared when a
     *        duplex bridge is started
     * @return always {@code null}
     */
    public Response processBrokerInfo(BrokerInfo info) {
        if (info.isSlaveBroker()) {
            BrokerService bService = connector.getBrokerService();
            // Do we only support passive slaves - or does the slave want to be
            // passive ?
            boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
            if (passive == false) {

                // stream messages from this broker (the master) to
                // the slave
                MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
                masterBroker = new MasterBroker(parent, transport);
                masterBroker.startProcessing();
            }
            LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
            bService.slaveConnectionEstablished();
        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
            // so this TransportConnection is the rear end of a network bridge
            // We have been requested to create a two way pipe ...
            try {
                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
                Map<String, String> props = createMap(properties);
                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
                IntrospectionSupport.setProperties(config, props, "");
                config.setBrokerName(broker.getBrokerName());

                // check for existing duplex connection hanging about

                // We first look if existing network connection already exists for the same broker Id and network connector name
                // It's possible in case of brief network fault to have this transport connector side of the connection always active
                // and the duplex network connector side wanting to open a new one
                // In this case, the old connection must be broken
                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
                synchronized (connections) {
                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
                        TransportConnection c = iter.next();
                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
                            LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
                            c.stopAsync();
                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
                            c.getStopped().await(1, TimeUnit.SECONDS);
                        }
                    }
                    // Claim the id for this connection while still holding the lock.
                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                }
                // Connect to the local broker through its VM connector, flagged as a
                // network connection with async turned off.
                URI uri = broker.getVmConnectorURI();
                HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
                map.put("network", "true");
                map.put("async", "false");
                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
                Transport localTransport = TransportFactory.connect(uri);
                Transport remoteBridgeTransport = new ResponseCorrelator(transport);
                // Keep only the part of the transport description from the last '#' on.
                String duplexName = localTransport.toString();
                if (duplexName.contains("#")) {
                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
                }
                MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
                listener.setCreatedByDuplex(true);
                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
                duplexBridge.setBrokerService(broker.getBrokerService());
                // now turn duplex off this side
                info.setDuplexConnection(false);
                duplexBridge.setCreatedByDuplex(true);
                duplexBridge.duplexStart(this, brokerInfo, info);
                LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
                return null;
            } catch (TransportDisposedIOException e) {
                // NOTE(review): the local duplexNetworkConnectorId declared in the try
                // block is out of scope here, so this name presumably resolves to the
                // member set via setDuplexNetworkConnectorId, which may still be unset
                // when the failure happened early - confirm.
                LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
                return null;
            } catch (Exception e) {
                LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
                return null;
            }
        }
        // We only expect to get one broker info command per connection
        if (this.brokerInfo != null) {
            LOG.warn("Unexpected extra broker info command received: " + info);
        }
        this.brokerInfo = info;
        networkConnection = true;
        // Propagate the network flag to every logical connection already registered.
        List<TransportConnectionState> connectionStates = listConnectionStates();
        for (TransportConnectionState cs : connectionStates) {
            cs.getContext().setNetworkConnection(true);
        }
        return null;
    }
1298
1299    @SuppressWarnings({"unchecked", "rawtypes"})
1300    private HashMap<String, String> createMap(Properties properties) {
1301        return new HashMap(properties);
1302    }
1303
1304    protected void dispatch(Command command) throws IOException {
1305        try {
1306            setMarkedCandidate(true);
1307            transport.oneway(command);
1308        } finally {
1309            setMarkedCandidate(false);
1310        }
1311    }
1312
1313    public String getRemoteAddress() {
1314        return transport.getRemoteAddress();
1315    }
1316
1317    public String getConnectionId() {
1318        List<TransportConnectionState> connectionStates = listConnectionStates();
1319        for (TransportConnectionState cs : connectionStates) {
1320            if (cs.getInfo().getClientId() != null) {
1321                return cs.getInfo().getClientId();
1322            }
1323            return cs.getInfo().getConnectionId().toString();
1324        }
1325        return null;
1326    }
1327
1328    public void updateClient(ConnectionControl control) {
1329        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1330                && this.wireFormatInfo.getVersion() >= 6) {
1331            dispatchAsync(control);
1332        }
1333    }
1334
1335    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1336        ProducerBrokerExchange result = producerExchanges.get(id);
1337        if (result == null) {
1338            synchronized (producerExchanges) {
1339                result = new ProducerBrokerExchange();
1340                TransportConnectionState state = lookupConnectionState(id);
1341                context = state.getContext();
1342                result.setConnectionContext(context);
1343                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1344                    result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1345                }
1346                SessionState ss = state.getSessionState(id.getParentId());
1347                if (ss != null) {
1348                    result.setProducerState(ss.getProducerState(id));
1349                    ProducerState producerState = ss.getProducerState(id);
1350                    if (producerState != null && producerState.getInfo() != null) {
1351                        ProducerInfo info = producerState.getInfo();
1352                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1353                    }
1354                }
1355                producerExchanges.put(id, result);
1356            }
1357        } else {
1358            context = result.getConnectionContext();
1359        }
1360        return result;
1361    }
1362
1363    private void removeProducerBrokerExchange(ProducerId id) {
1364        synchronized (producerExchanges) {
1365            producerExchanges.remove(id);
1366        }
1367    }
1368
1369    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1370        ConsumerBrokerExchange result = consumerExchanges.get(id);
1371        return result;
1372    }
1373
1374    private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
1375        ConsumerBrokerExchange result = consumerExchanges.get(id);
1376        if (result == null) {
1377            synchronized (consumerExchanges) {
1378                result = new ConsumerBrokerExchange();
1379                TransportConnectionState state = lookupConnectionState(id);
1380                context = state.getContext();
1381                result.setConnectionContext(context);
1382                SessionState ss = state.getSessionState(id.getParentId());
1383                if (ss != null) {
1384                    ConsumerState cs = ss.getConsumerState(id);
1385                    if (cs != null) {
1386                        ConsumerInfo info = cs.getInfo();
1387                        if (info != null) {
1388                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1389                                result.setWildcard(true);
1390                            }
1391                        }
1392                    }
1393                }
1394                consumerExchanges.put(id, result);
1395            }
1396        }
1397        return result;
1398    }
1399
1400    private void removeConsumerBrokerExchange(ConsumerId id) {
1401        synchronized (consumerExchanges) {
1402            consumerExchanges.remove(id);
1403        }
1404    }
1405
1406    public int getProtocolVersion() {
1407        return protocolVersion.get();
1408    }
1409
    /**
     * Ignores the given {@link ControlCommand}; no action is taken.
     *
     * @param command the command received; not inspected
     * @return always {@code null}
     */
    public Response processControlCommand(ControlCommand command) throws Exception {
        return null;
    }
1413
    /**
     * Ignores the given {@link MessageDispatch}; no action is taken.
     *
     * @param dispatch the dispatch received; not inspected
     * @return always {@code null}
     */
    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
        return null;
    }
1417
1418    public Response processConnectionControl(ConnectionControl control) throws Exception {
1419        if (control != null) {
1420            faultTolerantConnection = control.isFaultTolerant();
1421        }
1422        return null;
1423    }
1424
    /**
     * Ignores the given {@link ConnectionError}; no action is taken.
     *
     * @param error the error received; not inspected
     * @return always {@code null}
     */
    public Response processConnectionError(ConnectionError error) throws Exception {
        return null;
    }
1428
1429    public Response processConsumerControl(ConsumerControl control) throws Exception {
1430        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1431        broker.processConsumerControl(consumerExchange, control);
1432        return null;
1433    }
1434
1435    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1436                                                                            TransportConnectionState state) {
1437        TransportConnectionState cs = null;
1438        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1439            // swap implementations
1440            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1441            newRegister.intialize(connectionStateRegister);
1442            connectionStateRegister = newRegister;
1443        }
1444        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1445        return cs;
1446    }
1447
1448    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1449        return connectionStateRegister.unregisterConnectionState(connectionId);
1450    }
1451
1452    protected synchronized List<TransportConnectionState> listConnectionStates() {
1453        return connectionStateRegister.listConnectionStates();
1454    }
1455
1456    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1457        return connectionStateRegister.lookupConnectionState(connectionId);
1458    }
1459
1460    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1461        return connectionStateRegister.lookupConnectionState(id);
1462    }
1463
1464    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1465        return connectionStateRegister.lookupConnectionState(id);
1466    }
1467
1468    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1469        return connectionStateRegister.lookupConnectionState(id);
1470    }
1471
1472    protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1473        return connectionStateRegister.lookupConnectionState(connectionId);
1474    }
1475
1476    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1477        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1478    }
1479
1480    protected synchronized String getDuplexNetworkConnectorId() {
1481        return this.duplexNetworkConnectorId;
1482    }
1483
1484    public boolean isStopping() {
1485        return stopping.get();
1486    }
1487
1488    protected CountDownLatch getStopped() {
1489        return stopped;
1490    }
1491
1492    private int getProducerCount(ConnectionId connectionId) {
1493        int result = 0;
1494        TransportConnectionState cs = lookupConnectionState(connectionId);
1495        if (cs != null) {
1496            for (SessionId sessionId : cs.getSessionIds()) {
1497                SessionState sessionState = cs.getSessionState(sessionId);
1498                if (sessionState != null) {
1499                    result += sessionState.getProducerIds().size();
1500                }
1501            }
1502        }
1503        return result;
1504    }
1505
1506    private int getConsumerCount(ConnectionId connectionId) {
1507        int result = 0;
1508        TransportConnectionState cs = lookupConnectionState(connectionId);
1509        if (cs != null) {
1510            for (SessionId sessionId : cs.getSessionIds()) {
1511                SessionState sessionState = cs.getSessionState(sessionId);
1512                if (sessionState != null) {
1513                    result += sessionState.getConsumerIds().size();
1514                }
1515            }
1516        }
1517        return result;
1518    }
1519}