001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.List;
025import java.util.Properties;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.CountDownLatch;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.atomic.AtomicLong;
031
032import javax.management.ObjectName;
033import org.apache.activemq.Service;
034import org.apache.activemq.advisory.AdvisorySupport;
035import org.apache.activemq.broker.BrokerService;
036import org.apache.activemq.broker.BrokerServiceAware;
037import org.apache.activemq.broker.TransportConnection;
038import org.apache.activemq.broker.region.AbstractRegion;
039import org.apache.activemq.broker.region.DurableTopicSubscription;
040import org.apache.activemq.broker.region.Region;
041import org.apache.activemq.broker.region.RegionBroker;
042import org.apache.activemq.broker.region.Subscription;
043import org.apache.activemq.broker.region.policy.PolicyEntry;
044import org.apache.activemq.command.*;
045import org.apache.activemq.filter.DestinationFilter;
046import org.apache.activemq.filter.MessageEvaluationContext;
047import org.apache.activemq.thread.DefaultThreadPools;
048import org.apache.activemq.thread.TaskRunnerFactory;
049import org.apache.activemq.transport.DefaultTransportListener;
050import org.apache.activemq.transport.FutureResponse;
051import org.apache.activemq.transport.ResponseCallback;
052import org.apache.activemq.transport.Transport;
053import org.apache.activemq.transport.TransportDisposedIOException;
054import org.apache.activemq.transport.TransportFilter;
055import org.apache.activemq.transport.tcp.SslTransport;
056import org.apache.activemq.util.IdGenerator;
057import org.apache.activemq.util.IntrospectionSupport;
058import org.apache.activemq.util.LongSequenceGenerator;
059import org.apache.activemq.util.MarshallingSupport;
060import org.apache.activemq.util.ServiceStopper;
061import org.apache.activemq.util.ServiceSupport;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A useful base class for implementing demand forwarding bridges.
067 */
068public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
069    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
070    private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
071    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
072    protected final Transport localBroker;
073    protected final Transport remoteBroker;
074    protected final IdGenerator idGenerator = new IdGenerator();
075    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
076    protected ConnectionInfo localConnectionInfo;
077    protected ConnectionInfo remoteConnectionInfo;
078    protected SessionInfo localSessionInfo;
079    protected ProducerInfo producerInfo;
080    protected String remoteBrokerName = "Unknown";
081    protected String localClientId;
082    protected ConsumerInfo demandConsumerInfo;
083    protected int demandConsumerDispatched;
084    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
085    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
086    protected AtomicBoolean disposed = new AtomicBoolean();
087    protected BrokerId localBrokerId;
088    protected ActiveMQDestination[] excludedDestinations;
089    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
090    protected ActiveMQDestination[] staticallyIncludedDestinations;
091    protected ActiveMQDestination[] durableDestinations;
092    protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
093    protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
094    protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
095    protected CountDownLatch startedLatch = new CountDownLatch(2);
096    protected CountDownLatch localStartedLatch = new CountDownLatch(1);
097    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
098    protected NetworkBridgeConfiguration configuration;
099    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
100
101    protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
102    protected Object brokerInfoMutex = new Object();
103    protected BrokerId remoteBrokerId;
104
105    final AtomicLong enqueueCounter = new AtomicLong();
106    final AtomicLong dequeueCounter = new AtomicLong();
107
108    private NetworkBridgeListener networkBridgeListener;
109    private boolean createdByDuplex;
110    private BrokerInfo localBrokerInfo;
111    private BrokerInfo remoteBrokerInfo;
112
113    private final AtomicBoolean started = new AtomicBoolean();
114    private TransportConnection duplexInitiatingConnection;
115    private BrokerService brokerService = null;
116    private ObjectName mbeanObjectName;
117
118    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
119        this.configuration = configuration;
120        this.localBroker = localBroker;
121        this.remoteBroker = remoteBroker;
122    }
123
124    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
125        this.localBrokerInfo = localBrokerInfo;
126        this.remoteBrokerInfo = remoteBrokerInfo;
127        this.duplexInitiatingConnection = connection;
128        start();
129        serviceRemoteCommand(remoteBrokerInfo);
130    }
131
132    public void start() throws Exception {
133        if (started.compareAndSet(false, true)) {
134            localBroker.setTransportListener(new DefaultTransportListener() {
135
136                @Override
137                public void onCommand(Object o) {
138                    Command command = (Command) o;
139                    serviceLocalCommand(command);
140                }
141
142                @Override
143                public void onException(IOException error) {
144                    serviceLocalException(error);
145                }
146            });
147            remoteBroker.setTransportListener(new DefaultTransportListener() {
148
149                public void onCommand(Object o) {
150                    Command command = (Command) o;
151                    serviceRemoteCommand(command);
152                }
153
154                public void onException(IOException error) {
155                    serviceRemoteException(error);
156                }
157
158            });
159
160            localBroker.start();
161            remoteBroker.start();
162            if (!disposed.get()) {
163                try {
164                    triggerRemoteStartBridge();
165                } catch (IOException e) {
166                    LOG.warn("Caught exception from remote start", e);
167                }
168            } else {
169                LOG.warn ("Bridge was disposed before the start() method was fully executed.");
170                throw new TransportDisposedIOException();
171            }
172        }
173    }
174
175    protected void triggerLocalStartBridge() throws IOException {
176        asyncTaskRunner.execute(new Runnable() {
177            public void run() {
178                final String originalName = Thread.currentThread().getName();
179                Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
180                try {
181                    startLocalBridge();
182                } catch (Throwable e) {
183                    serviceLocalException(e);
184                } finally {
185                    Thread.currentThread().setName(originalName);
186                }
187            }
188        });
189    }
190
191    protected void triggerRemoteStartBridge() throws IOException {
192        asyncTaskRunner.execute(new Runnable() {
193            public void run() {
194                final String originalName = Thread.currentThread().getName();
195                Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
196                try {
197                    startRemoteBridge();
198                } catch (Exception e) {
199                    serviceRemoteException(e);
200                } finally {
201                    Thread.currentThread().setName(originalName);
202                }
203            }
204        });
205    }
206
207    private void startLocalBridge() throws Throwable {
208        if (localBridgeStarted.compareAndSet(false, true)) {
209            synchronized (this) {
210                if (LOG.isTraceEnabled()) {
211                    LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
212                }
213                if (!disposed.get()) {
214                    localConnectionInfo = new ConnectionInfo();
215                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
216                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
217                    localConnectionInfo.setClientId(localClientId);
218                    localConnectionInfo.setUserName(configuration.getUserName());
219                    localConnectionInfo.setPassword(configuration.getPassword());
220                    Transport originalTransport = remoteBroker;
221                    while (originalTransport instanceof TransportFilter) {
222                        originalTransport = ((TransportFilter) originalTransport).getNext();
223                    }
224                    if (originalTransport instanceof SslTransport) {
225                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
226                        localConnectionInfo.setTransportContext(peerCerts);
227                    }
228                    // sync requests that may fail
229                    Object resp = localBroker.request(localConnectionInfo);
230                    if (resp instanceof ExceptionResponse) {
231                        throw ((ExceptionResponse)resp).getException();
232                    }
233                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
234                    localBroker.oneway(localSessionInfo);
235
236                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
237                    NetworkBridgeListener l = this.networkBridgeListener;
238                    if (l != null) {
239                        l.onStart(this);
240                    }
241                    LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
242
243                } else {
244                    LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
245                }
246                startedLatch.countDown();
247                localStartedLatch.countDown();
248                if (!disposed.get()) {
249                    setupStaticDestinations();
250                } else {
251                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
252                }
253            }
254        }
255    }
256
257    protected void startRemoteBridge() throws Exception {
258        if (remoteBridgeStarted.compareAndSet(false, true)) {
259            if (LOG.isTraceEnabled()) {
260                LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker);
261            }
262            synchronized (this) {
263                if (!isCreatedByDuplex()) {
264                    BrokerInfo brokerInfo = new BrokerInfo();
265                    brokerInfo.setBrokerName(configuration.getBrokerName());
266                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
267                    brokerInfo.setNetworkConnection(true);
268                    brokerInfo.setDuplexConnection(configuration.isDuplex());
269                    // set our properties
270                    Properties props = new Properties();
271                    IntrospectionSupport.getProperties(configuration, props, null);
272                    String str = MarshallingSupport.propertiesToString(props);
273                    brokerInfo.setNetworkProperties(str);
274                    brokerInfo.setBrokerId(this.localBrokerId);
275                    remoteBroker.oneway(brokerInfo);
276                }
277                if (remoteConnectionInfo != null) {
278                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
279                }
280                remoteConnectionInfo = new ConnectionInfo();
281                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
282                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
283                remoteConnectionInfo.setUserName(configuration.getUserName());
284                remoteConnectionInfo.setPassword(configuration.getPassword());
285                remoteBroker.oneway(remoteConnectionInfo);
286
287                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
288                remoteBroker.oneway(remoteSessionInfo);
289                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
290                producerInfo.setResponseRequired(false);
291                remoteBroker.oneway(producerInfo);
292                // Listen to consumer advisory messages on the remote broker to
293                // determine demand.
294                if (!configuration.isStaticBridge()) {
295                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
296                    demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
297                    String advisoryTopic = configuration.getDestinationFilter();
298                    if (configuration.isBridgeTempDestinations()) {
299                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
300                    }
301                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
302                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
303                    remoteBroker.oneway(demandConsumerInfo);
304                }
305                startedLatch.countDown();
306            }
307        }
308    }
309
310    public void stop() throws Exception {
311        if (started.compareAndSet(true, false)) {
312            if (disposed.compareAndSet(false, true)) {
313                LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
314                NetworkBridgeListener l = this.networkBridgeListener;
315                if (l != null) {
316                    l.onStop(this);
317                }
318                try {
319                    remoteBridgeStarted.set(false);
320                    final CountDownLatch sendShutdown = new CountDownLatch(1);
321                    asyncTaskRunner.execute(new Runnable() {
322                        public void run() {
323                            try {
324                                localBroker.oneway(new ShutdownInfo());
325                                sendShutdown.countDown();
326                                remoteBroker.oneway(new ShutdownInfo());
327                            } catch (Throwable e) {
328                                LOG.debug("Caught exception sending shutdown", e);
329                            } finally {
330                                sendShutdown.countDown();
331                            }
332
333                        }
334                    });
335                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
336                        LOG.info("Network Could not shutdown in a timely manner");
337                    }
338                } finally {
339                    ServiceStopper ss = new ServiceStopper();
340                    ss.stop(remoteBroker);
341                    ss.stop(localBroker);
342                    // Release the started Latch since another thread could be
343                    // stuck waiting for it to start up.
344                    startedLatch.countDown();
345                    startedLatch.countDown();
346                    localStartedLatch.countDown();
347                    ss.throwFirstException();
348                }
349            }
350            if (remoteBrokerInfo != null) {
351                brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
352                brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
353                LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
354            }
355        }
356    }
357
358    public void serviceRemoteException(Throwable error) {
359        if (!disposed.get()) {
360            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
361                LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
362            } else {
363                LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
364            }
365            LOG.debug("The remote Exception was: " + error, error);
366            asyncTaskRunner.execute(new Runnable() {
367                public void run() {
368                    ServiceSupport.dispose(getControllingService());
369                }
370            });
371            fireBridgeFailed();
372        }
373    }
374
375    protected void serviceRemoteCommand(Command command) {
376        if (!disposed.get()) {
377            try {
378                if (command.isMessageDispatch()) {
379                    waitStarted();
380                    MessageDispatch md = (MessageDispatch) command;
381                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
382                    ackAdvisory(md.getMessage());
383                } else if (command.isBrokerInfo()) {
384                    lastConnectSucceeded.set(true);
385                    remoteBrokerInfo = (BrokerInfo) command;
386                    Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
387                    try {
388                        IntrospectionSupport.getProperties(configuration, props, null);
389                        if (configuration.getExcludedDestinations() != null) {
390                            excludedDestinations = configuration.getExcludedDestinations().toArray(
391                                    new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
392                        }
393                        if (configuration.getStaticallyIncludedDestinations() != null) {
394                            staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
395                                    new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
396                        }
397                        if (configuration.getDynamicallyIncludedDestinations() != null) {
398                            dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
399                                    .toArray(
400                                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
401                                                    .size()]);
402                        }
403                    } catch (Throwable t) {
404                        LOG.error("Error mapping remote destinations", t);
405                    }
406                    serviceRemoteBrokerInfo(command);
407                    // Let the local broker know the remote broker's ID.
408                    localBroker.oneway(command);
409                    // new peer broker (a consumer can work with remote broker also)
410                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
411                } else if (command.getClass() == ConnectionError.class) {
412                    ConnectionError ce = (ConnectionError) command;
413                    serviceRemoteException(ce.getException());
414                } else {
415                    if (isDuplex()) {
416                        if (command.isMessage()) {
417                            ActiveMQMessage message = (ActiveMQMessage) command;
418                            if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
419                                || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
420                                serviceRemoteConsumerAdvisory(message.getDataStructure());
421                                ackAdvisory(message);
422                            } else {
423                                if (!isPermissableDestination(message.getDestination(), true)) {
424                                    return;
425                                }
426                                if (message.isResponseRequired()) {
427                                    Response reply = new Response();
428                                    reply.setCorrelationId(message.getCommandId());
429                                    localBroker.oneway(message);
430                                    remoteBroker.oneway(reply);
431                                } else {
432                                    localBroker.oneway(message);
433                                }
434                            }
435                        } else {
436                            switch (command.getDataStructureType()) {
437                            case ConnectionInfo.DATA_STRUCTURE_TYPE:
438                            case SessionInfo.DATA_STRUCTURE_TYPE:
439                            case ProducerInfo.DATA_STRUCTURE_TYPE:
440                                localBroker.oneway(command);
441                                break;
442                            case MessageAck.DATA_STRUCTURE_TYPE:
443                                MessageAck ack = (MessageAck) command;
444                                DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
445                                if (localSub != null) {
446                                    ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
447                                    localBroker.oneway(ack);
448                                } else {
449                                    LOG.warn("Matching local subscription not found for ack: " + ack);
450                                }
451                                break;
452                            case ConsumerInfo.DATA_STRUCTURE_TYPE:
453                                localStartedLatch.await();
454                                if (started.get()) {
455                                    if (!addConsumerInfo((ConsumerInfo) command)) {
456                                        if (LOG.isDebugEnabled()) {
457                                            LOG.debug("Ignoring ConsumerInfo: " + command);
458                                        }
459                                    } else {
460                                        if (LOG.isTraceEnabled()) {
461                                            LOG.trace("Adding ConsumerInfo: " + command);
462                                        }
463                                    }
464                                } else {
465                                    // received a subscription whilst stopping
466                                    LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
467                                }
468                                break;
469                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
470                                // initiator is shutting down, controlled case
471                                // abortive close dealt with by inactivity monitor
472                                LOG.info("Stopping network bridge on shutdown of remote broker");
473                                serviceRemoteException(new IOException(command.toString()));
474                                break;
475                            default:
476                                if (LOG.isDebugEnabled()) {
477                                    LOG.debug("Ignoring remote command: " + command);
478                                }
479                            }
480                        }
481                    } else {
482                        switch (command.getDataStructureType()) {
483                        case KeepAliveInfo.DATA_STRUCTURE_TYPE:
484                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
485                        case ShutdownInfo.DATA_STRUCTURE_TYPE:
486                            break;
487                        default:
488                            LOG.warn("Unexpected remote command: " + command);
489                        }
490                    }
491                }
492            } catch (Throwable e) {
493                if (LOG.isDebugEnabled()) {
494                    LOG.debug("Exception processing remote command: " + command, e);
495                }
496                serviceRemoteException(e);
497            }
498        }
499    }
500
501    private void ackAdvisory(Message message) throws IOException {
502        demandConsumerDispatched++;
503        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
504            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
505            ack.setConsumerId(demandConsumerInfo.getConsumerId());
506            remoteBroker.oneway(ack);
507            demandConsumerDispatched = 0;
508        }
509    }
510
511    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
512        final int networkTTL = configuration.getNetworkTTL();
513        if (data.getClass() == ConsumerInfo.class) {
514            // Create a new local subscription
515            ConsumerInfo info = (ConsumerInfo) data;
516            BrokerId[] path = info.getBrokerPath();
517
518            if (info.isBrowser()) {
519                if (LOG.isDebugEnabled()) {
520                    LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
521                }
522                return;
523            }
524
525            if (path != null && path.length >= networkTTL) {
526                if (LOG.isDebugEnabled()) {
527                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
528                }
529                return;
530            }
531            if (contains(path, localBrokerPath[0])) {
532                // Ignore this consumer as it's a consumer we locally sent to the broker.
533                if (LOG.isDebugEnabled()) {
534                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
535                }
536                return;
537            }
538            if (!isPermissableDestination(info.getDestination())) {
539                // ignore if not in the permitted or in the excluded list
540                if (LOG.isDebugEnabled()) {
541                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
542                }
543                return;
544            }
545
546            // in a cyclic network there can be multiple bridges per broker that can propagate
547            // a network subscription so there is a need to synchronise on a shared entity
548            synchronized (brokerService.getVmConnectorURI()) {
549                if (addConsumerInfo(info)) {
550                    if (LOG.isDebugEnabled()) {
551                        LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
552                    }
553                } else {
554                    if (LOG.isDebugEnabled()) {
555                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
556                    }
557                }
558            }
559        } else if (data.getClass() == DestinationInfo.class) {
560            // It's a destination info - we want to pass up
561            // information about temporary destinations
562            DestinationInfo destInfo = (DestinationInfo) data;
563            BrokerId[] path = destInfo.getBrokerPath();
564            if (path != null && path.length >= networkTTL) {
565                if (LOG.isDebugEnabled()) {
566                    LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
567                }
568                return;
569            }
570            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
571                // Ignore this consumer as it's a consumer we locally sent to
572                // the broker.
573                if (LOG.isDebugEnabled()) {
574                    LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
575                }
576                return;
577            }
578            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
579            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
580                // re-set connection id so comes from here
581                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
582                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
583            }
584            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
585            if (LOG.isTraceEnabled()) {
586                LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
587            }
588            localBroker.oneway(destInfo);
589        } else if (data.getClass() == RemoveInfo.class) {
590            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
591            removeDemandSubscription(id);
592        }
593    }
594
595    public void serviceLocalException(Throwable error) {
596        if (!disposed.get()) {
597            LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
598            LOG.debug("The local Exception was:" + error, error);
599            asyncTaskRunner.execute(new Runnable() {
600                public void run() {
601                    ServiceSupport.dispose(getControllingService());
602                }
603            });
604            fireBridgeFailed();
605        }
606    }
607
608    protected Service getControllingService() {
609        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
610    }
611
612    protected void addSubscription(DemandSubscription sub) throws IOException {
613        if (sub != null) {
614            localBroker.oneway(sub.getLocalInfo());
615        }
616    }
617
618    protected void removeSubscription(final DemandSubscription sub) throws IOException {
619        if (sub != null) {
620            if (LOG.isDebugEnabled()) {
621                LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
622            }
623            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
624            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
625
626            // continue removal in separate thread to free up this thread for outstanding responses
627            asyncTaskRunner.execute(new Runnable() {
628                public void run() {
629                    sub.waitForCompletion();
630                    try {
631                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
632                    } catch (IOException e) {
633                        LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
634                    }
635                }
636            });
637        }
638    }
639
640    protected Message configureMessage(MessageDispatch md) {
641        Message message = md.getMessage().copy();
642        // Update the packet to show where it came from.
643        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
644        message.setProducerId(producerInfo.getProducerId());
645        message.setDestination(md.getDestination());
646        if (message.getOriginalTransactionId() == null) {
647            message.setOriginalTransactionId(message.getTransactionId());
648        }
649        message.setTransactionId(null);
650        return message;
651    }
652
653    protected void serviceLocalCommand(Command command) {
654        if (!disposed.get()) {
655            try {
656                if (command.isMessageDispatch()) {
657                    enqueueCounter.incrementAndGet();
658                    final MessageDispatch md = (MessageDispatch) command;
659                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
660                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
661
662                        if (suppressMessageDispatch(md, sub)) {
663                            if (LOG.isDebugEnabled()) {
664                                LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
665                            }
666                            // still ack as it may be durable
667                            try {
668                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
669                            } finally {
670                                sub.decrementOutstandingResponses();
671                            }
672                            return;
673                        }
674
675                        Message message = configureMessage(md);
676                        if (LOG.isDebugEnabled()) {
677                            LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
678                        }
679
680                        if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {
681
682                            // If the message was originally sent using async
683                            // send, we will preserve that QOS
684                            // by bridging it using an async send (small chance
685                            // of message loss).
686                            try {
687                                remoteBroker.oneway(message);
688                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
689                                dequeueCounter.incrementAndGet();
690                            } finally {
691                                sub.decrementOutstandingResponses();
692                            }
693
694                        } else {
695
696                            // The message was not sent using async send, so we
697                            // should only ack the local
698                            // broker when we get confirmation that the remote
699                            // broker has received the message.
700                            ResponseCallback callback = new ResponseCallback() {
701                                public void onCompletion(FutureResponse future) {
702                                    try {
703                                        Response response = future.getResult();
704                                        if (response.isException()) {
705                                            ExceptionResponse er = (ExceptionResponse) response;
706                                            serviceLocalException(er.getException());
707                                        } else {
708                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
709                                            dequeueCounter.incrementAndGet();
710                                        }
711                                    } catch (IOException e) {
712                                        serviceLocalException(e);
713                                    } finally {
714                                        sub.decrementOutstandingResponses();
715                                    }
716                                }
717                            };
718
719                            remoteBroker.asyncRequest(message, callback);
720
721                        }
722                    } else {
723                        if (LOG.isDebugEnabled()) {
724                            LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
725                        }
726                    }
727                } else if (command.isBrokerInfo()) {
728                    localBrokerInfo = (BrokerInfo) command;
729                    serviceLocalBrokerInfo(command);
730                } else if (command.isShutdownInfo()) {
731                    LOG.info(configuration.getBrokerName() + " Shutting down");
732                    stop();
733                } else if (command.getClass() == ConnectionError.class) {
734                    ConnectionError ce = (ConnectionError) command;
735                    serviceLocalException(ce.getException());
736                } else {
737                    switch (command.getDataStructureType()) {
738                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
739                        break;
740                    default:
741                        LOG.warn("Unexpected local command: " + command);
742                    }
743                }
744            } catch (Throwable e) {
745                LOG.warn("Caught an exception processing local command", e);
746                serviceLocalException(e);
747            }
748        }
749    }
750
751    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
752        boolean suppress = false;
753        // for durable subs, suppression via filter leaves dangling acks so we need to
754        // check here and allow the ack irrespective
755        if (sub.getLocalInfo().isDurable()) {
756            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
757            messageEvalContext.setMessageReference(md.getMessage());
758            messageEvalContext.setDestination(md.getDestination());
759            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
760        }
761        return suppress;
762    }
763
764    /**
765     * @return Returns the dynamicallyIncludedDestinations.
766     */
767    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
768        return dynamicallyIncludedDestinations;
769    }
770
771    /**
772     * @param dynamicallyIncludedDestinations The
773     *            dynamicallyIncludedDestinations to set.
774     */
775    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
776        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
777    }
778
779    /**
780     * @return Returns the excludedDestinations.
781     */
782    public ActiveMQDestination[] getExcludedDestinations() {
783        return excludedDestinations;
784    }
785
786    /**
787     * @param excludedDestinations The excludedDestinations to set.
788     */
789    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
790        this.excludedDestinations = excludedDestinations;
791    }
792
793    /**
794     * @return Returns the staticallyIncludedDestinations.
795     */
796    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
797        return staticallyIncludedDestinations;
798    }
799
800    /**
801     * @param staticallyIncludedDestinations The staticallyIncludedDestinations
802     *            to set.
803     */
804    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
805        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
806    }
807
808    /**
809     * @return Returns the durableDestinations.
810     */
811    public ActiveMQDestination[] getDurableDestinations() {
812        return durableDestinations;
813    }
814
815    /**
816     * @param durableDestinations The durableDestinations to set.
817     */
818    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
819        this.durableDestinations = durableDestinations;
820    }
821
822    /**
823     * @return Returns the localBroker.
824     */
825    public Transport getLocalBroker() {
826        return localBroker;
827    }
828
829    /**
830     * @return Returns the remoteBroker.
831     */
832    public Transport getRemoteBroker() {
833        return remoteBroker;
834    }
835
836    /**
837     * @return the createdByDuplex
838     */
839    public boolean isCreatedByDuplex() {
840        return this.createdByDuplex;
841    }
842
843    /**
844     * @param createdByDuplex the createdByDuplex to set
845     */
846    public void setCreatedByDuplex(boolean createdByDuplex) {
847        this.createdByDuplex = createdByDuplex;
848    }
849
850    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
851        if (brokerPath != null) {
852            for (int i = 0; i < brokerPath.length; i++) {
853                if (brokerId.equals(brokerPath[i])) {
854                    return true;
855                }
856            }
857        }
858        return false;
859    }
860
861    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
862        if (brokerPath == null || brokerPath.length == 0) {
863            return pathsToAppend;
864        }
865        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
866        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
867        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
868        return rc;
869    }
870
871    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
872        if (brokerPath == null || brokerPath.length == 0) {
873            return new BrokerId[] { idToAppend };
874        }
875        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
876        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
877        rc[brokerPath.length] = idToAppend;
878        return rc;
879    }
880
881    protected boolean isPermissableDestination(ActiveMQDestination destination) {
882        return isPermissableDestination(destination, false);
883    }
884
885    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
886        // Are we not bridging temp destinations?
887        if (destination.isTemporary()) {
888            if (allowTemporary) {
889                return true;
890            } else {
891                return configuration.isBridgeTempDestinations();
892            }
893        }
894
895        ActiveMQDestination[] dests = staticallyIncludedDestinations;
896        if (dests != null && dests.length > 0) {
897            for (int i = 0; i < dests.length; i++) {
898                ActiveMQDestination match = dests[i];
899                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
900                if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
901                    return true;
902                }
903            }
904        }
905
906        dests = excludedDestinations;
907        if (dests != null && dests.length > 0) {
908            for (int i = 0; i < dests.length; i++) {
909                ActiveMQDestination match = dests[i];
910                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
911                if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
912                    return false;
913                }
914            }
915        }
916
917        dests = dynamicallyIncludedDestinations;
918        if (dests != null && dests.length > 0) {
919            for (int i = 0; i < dests.length; i++) {
920                ActiveMQDestination match = dests[i];
921                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
922                if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
923                    return true;
924                }
925            }
926
927            return false;
928        }
929        return true;
930    }
931
932    /**
933     * Subscriptions for these destinations are always created
934     */
935    protected void setupStaticDestinations() {
936        ActiveMQDestination[] dests = staticallyIncludedDestinations;
937        if (dests != null) {
938            for (int i = 0; i < dests.length; i++) {
939                ActiveMQDestination dest = dests[i];
940                DemandSubscription sub = createDemandSubscription(dest);
941                try {
942                    addSubscription(sub);
943                } catch (IOException e) {
944                    LOG.error("Failed to add static destination " + dest, e);
945                }
946                if (LOG.isTraceEnabled()) {
947                    LOG.trace("bridging messages for static destination: " + dest);
948                }
949            }
950        }
951    }
952
953    protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
954        boolean consumerAdded = false;
955        ConsumerInfo info = consumerInfo.copy();
956        addRemoteBrokerToBrokerPath(info);
957        DemandSubscription sub = createDemandSubscription(info);
958        if (sub != null) {
959            if (duplicateSuppressionIsRequired(sub)) {
960                undoMapRegistration(sub);
961            } else {
962                addSubscription(sub);
963                consumerAdded = true;
964            }
965        }
966        return consumerAdded;
967    }
968
969    private void undoMapRegistration(DemandSubscription sub) {
970        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
971        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
972    }
973
974    /*
975     * check our existing subs networkConsumerIds against the list of network ids in this subscription
976     * A match means a duplicate which we suppress for topics and maybe for queues
977     */
978    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
979        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
980        boolean suppress = false;
981
982        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
983                consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
984            return suppress;
985        }
986
987        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
988        Collection<Subscription> currentSubs =
989            getRegionSubscriptions(consumerInfo.getDestination());
990        for (Subscription sub : currentSubs) {
991            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
992            if (!networkConsumers.isEmpty()) {
993                if (matchFound(candidateConsumers, networkConsumers)) {
994                    if (isInActiveDurableSub(sub)) {
995                        suppress = false;
996                    } else {
997                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
998                    }
999                    break;
1000                }
1001            }
1002        }
1003        return suppress;
1004    }
1005
1006    private boolean isInActiveDurableSub(Subscription sub) {
1007        return  (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive());
1008    }
1009
1010    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1011        boolean suppress = false;
1012
1013        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1014            if (LOG.isDebugEnabled()) {
1015                LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1016                        + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
1017                        + existingSub  + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1018            }
1019            suppress = true;
1020        } else {
1021            // remove the existing lower priority duplicate and allow this candidate
1022            try {
1023                removeDuplicateSubscription(existingSub);
1024
1025                if (LOG.isDebugEnabled()) {
1026                    LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1027                            + " with sub from " + remoteBrokerName
1028                            + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
1029                            + candidateInfo.getNetworkConsumerIds());
1030                }
1031            } catch (IOException e) {
1032                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1033            }
1034        }
1035        return suppress;
1036    }
1037
1038    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1039        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1040            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1041                break;
1042            }
1043        }
1044    }
1045
1046    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1047        boolean found = false;
1048        for (ConsumerId aliasConsumer : networkConsumers) {
1049            if (candidateConsumers.contains(aliasConsumer)) {
1050                found = true;
1051                break;
1052            }
1053        }
1054        return found;
1055    }
1056
1057    private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1058        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1059        Region region;
1060        Collection<Subscription> subs;
1061
1062        region = null;
1063        switch ( dest.getDestinationType() )
1064        {
1065            case ActiveMQDestination.QUEUE_TYPE:
1066                region = region_broker.getQueueRegion();
1067                break;
1068
1069            case ActiveMQDestination.TOPIC_TYPE:
1070                region = region_broker.getTopicRegion();
1071                break;
1072
1073            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1074                region = region_broker.getTempQueueRegion();
1075                break;
1076
1077            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1078                region = region_broker.getTempTopicRegion();
1079                break;
1080        }
1081
1082        if ( region instanceof AbstractRegion )
1083            subs = ((AbstractRegion) region).getSubscriptions().values();
1084        else
1085            subs = null;
1086
1087        return subs;
1088    }
1089
1090    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1091        //add our original id to ourselves
1092        info.addNetworkConsumerId(info.getConsumerId());
1093        return doCreateDemandSubscription(info);
1094    }
1095
1096    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1097        DemandSubscription result = new DemandSubscription(info);
1098        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1099        if (info.getDestination().isTemporary()) {
1100            // reset the local connection Id
1101
1102            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1103            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1104        }
1105
1106        if (configuration.isDecreaseNetworkConsumerPriority()) {
1107            byte priority = (byte) configuration.getConsumerPriorityBase();
1108            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1109                // The longer the path to the consumer, the less it's consumer priority.
1110                priority -= info.getBrokerPath().length + 1;
1111            }
1112            result.getLocalInfo().setPriority(priority);
1113            if (LOG.isDebugEnabled()) {
1114                LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1115            }
1116        }
1117        configureDemandSubscription(info, result);
1118        return result;
1119    }
1120
1121    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1122        ConsumerInfo info = new ConsumerInfo();
1123        info.setDestination(destination);
1124        // the remote info held by the DemandSubscription holds the original
1125        // consumerId,
1126        // the local info get's overwritten
1127
1128        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1129        DemandSubscription result = null;
1130        try {
1131            result = createDemandSubscription(info);
1132        } catch (IOException e) {
1133            LOG.error("Failed to create DemandSubscription ", e);
1134        }
1135        return result;
1136    }
1137
1138    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1139        sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1140        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1141        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1142        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1143
1144        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1145        if (!info.isDurable()) {
1146            // This works for now since we use a VM connection to the local broker.
1147            // may need to change if we ever subscribe to a remote broker.
1148            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1149        } else  {
1150            // need to ack this message if it is ignored as it is durable so
1151            // we check before we send. see: suppressMessageDispatch()
1152        }
1153    }
1154
1155    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1156        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1157        if (LOG.isDebugEnabled()) {
1158            LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1159        }
1160        if (sub != null) {
1161            removeSubscription(sub);
1162            if (LOG.isDebugEnabled()) {
1163                LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
1164            }
1165        }
1166    }
1167
1168    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1169        boolean removeDone = false;
1170        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1171        if (sub != null) {
1172            try {
1173                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1174                removeDone = true;
1175            } catch (IOException e) {
1176                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1177            }
1178        }
1179        return removeDone;
1180    }
1181
1182    protected void waitStarted() throws InterruptedException {
1183        startedLatch.await();
1184    }
1185
1186    protected void clearDownSubscriptions() {
1187        subscriptionMapByLocalId.clear();
1188        subscriptionMapByRemoteId.clear();
1189    }
1190
1191    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1192        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1193        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1194            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1195            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1196                filterFactory = entry.getNetworkBridgeFilterFactory();
1197            }
1198        }
1199        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
1200    }
1201
1202    protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
1203        synchronized (brokerInfoMutex) {
1204            if (remoteBrokerId != null) {
1205                if (remoteBrokerId.equals(localBrokerId)) {
1206                    if (LOG.isTraceEnabled()) {
1207                        LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1208                    }
1209                    waitStarted();
1210                    ServiceSupport.dispose(this);
1211                }
1212            }
1213        }
1214    }
1215
1216    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1217        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1218    }
1219
1220    protected void serviceRemoteBrokerInfo(Command command) throws IOException {
1221        synchronized (brokerInfoMutex) {
1222            BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
1223            remoteBrokerId = remoteBrokerInfo.getBrokerId();
1224            remoteBrokerPath[0] = remoteBrokerId;
1225            remoteBrokerName = remoteBrokerInfo.getBrokerName();
1226            if (localBrokerId != null) {
1227                if (localBrokerId.equals(remoteBrokerId)) {
1228                    if (LOG.isTraceEnabled()) {
1229                        LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1230                    }
1231                    ServiceSupport.dispose(this);
1232                }
1233            }
1234            if (!disposed.get()) {
1235                triggerLocalStartBridge();
1236            }
1237        }
1238    }
1239
1240    protected  BrokerId[] getRemoteBrokerPath() {
1241        return remoteBrokerPath;
1242    }
1243
1244    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1245        this.networkBridgeListener = listener;
1246    }
1247
1248    private void fireBridgeFailed() {
1249        NetworkBridgeListener l = this.networkBridgeListener;
1250        if (l != null) {
1251            l.bridgeFailed();
1252        }
1253    }
1254
1255    public String getRemoteAddress() {
1256        return remoteBroker.getRemoteAddress();
1257    }
1258
1259    public String getLocalAddress() {
1260        return localBroker.getRemoteAddress();
1261    }
1262
1263    public String getRemoteBrokerName() {
1264        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1265    }
1266
1267    public String getLocalBrokerName() {
1268        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1269    }
1270
1271    public long getDequeueCounter() {
1272        return dequeueCounter.get();
1273    }
1274
1275    public long getEnqueueCounter() {
1276        return enqueueCounter.get();
1277    }
1278
1279    protected boolean isDuplex() {
1280        return configuration.isDuplex() || createdByDuplex;
1281    }
1282
1283    public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1284        return subscriptionMapByRemoteId;
1285    }
1286
1287    public void setBrokerService(BrokerService brokerService) {
1288        this.brokerService = brokerService;
1289        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1290        localBrokerPath[0] = localBrokerId;
1291    }
1292
1293    public void setMbeanObjectName(ObjectName objectName) {
1294        this.mbeanObjectName = objectName;
1295    }
1296
1297    public ObjectName getMbeanObjectName() {
1298        return mbeanObjectName;
1299    }
1300}