001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import java.util.Iterator;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.CopyOnWriteArrayList;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.ThreadFactory;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicReference;
029
030import javax.jms.Connection;
031import javax.jms.Destination;
032import javax.jms.QueueConnection;
033
034import org.apache.activemq.ActiveMQConnectionFactory;
035import org.apache.activemq.Service;
036import org.apache.activemq.broker.BrokerService;
037import org.apache.activemq.util.LRUCache;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.springframework.jndi.JndiTemplate;
041
042/**
043 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
044 * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
045 * aimed to be in compliance with the JMS 1.0.2 specification.
046 */
047public abstract class JmsConnector implements Service {
048
049    private static int nextId;
050    private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
051
052    protected JndiTemplate jndiLocalTemplate;
053    protected JndiTemplate jndiOutboundTemplate;
054    protected JmsMesageConvertor inboundMessageConvertor;
055    protected JmsMesageConvertor outboundMessageConvertor;
056    protected AtomicBoolean initialized = new AtomicBoolean(false);
057    protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
058    protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
059    protected AtomicBoolean started = new AtomicBoolean(false);
060    protected AtomicBoolean failed = new AtomicBoolean();
061    protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
062    protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
063    protected ActiveMQConnectionFactory embeddedConnectionFactory;
064    protected int replyToDestinationCacheSize = 10000;
065    protected String outboundUsername;
066    protected String outboundPassword;
067    protected String localUsername;
068    protected String localPassword;
069    protected String outboundClientId;
070    protected String localClientId;
071    protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
072
073    private ReconnectionPolicy policy = new ReconnectionPolicy();
074    protected ThreadPoolExecutor connectionSerivce;
075    private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
076    private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
077    private String name;
078
079    private static LRUCache<Destination, DestinationBridge> createLRUCache() {
080        return new LRUCache<Destination, DestinationBridge>() {
081            private static final long serialVersionUID = -7446792754185879286L;
082
083            protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
084                if (size() > maxCacheSize) {
085                    Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
086                    Map.Entry<Destination, DestinationBridge> lru = iter.next();
087                    remove(lru.getKey());
088                    DestinationBridge bridge = (DestinationBridge)lru.getValue();
089                    try {
090                        bridge.stop();
091                        LOG.info("Expired bridge: " + bridge);
092                    } catch (Exception e) {
093                        LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
094                    }
095                }
096                return false;
097            }
098        };
099    }
100
101    public boolean init() {
102        boolean result = initialized.compareAndSet(false, true);
103        if (result) {
104            if (jndiLocalTemplate == null) {
105                jndiLocalTemplate = new JndiTemplate();
106            }
107            if (jndiOutboundTemplate == null) {
108                jndiOutboundTemplate = new JndiTemplate();
109            }
110            if (inboundMessageConvertor == null) {
111                inboundMessageConvertor = new SimpleJmsMessageConvertor();
112            }
113            if (outboundMessageConvertor == null) {
114                outboundMessageConvertor = new SimpleJmsMessageConvertor();
115            }
116            replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
117
118            connectionSerivce = createExecutor();
119
120            // Subclasses can override this to customize their own it.
121            result = doConnectorInit();
122        }
123        return result;
124    }
125
126    protected boolean doConnectorInit() {
127
128        // We try to make a connection via a sync call first so that the
129        // JmsConnector is fully initialized before the start call returns
130        // in order to avoid missing any messages that are dispatched
131        // immediately after startup.  If either side fails we queue an
132        // asynchronous task to manage the reconnect attempts.
133
134        try {
135            initializeLocalConnection();
136            localSideInitialized.set(true);
137        } catch(Exception e) {
138            // Queue up the task to attempt the local connection.
139            scheduleAsyncLocalConnectionReconnect();
140        }
141
142        try {
143            initializeForeignConnection();
144            foreignSideInitialized.set(true);
145        } catch(Exception e) {
146            // Queue up the task for the foreign connection now.
147            scheduleAsyncForeignConnectionReconnect();
148        }
149
150        return true;
151    }
152
153    public void start() throws Exception {
154        if (started.compareAndSet(false, true)) {
155            init();
156            for (DestinationBridge bridge : inboundBridges) {
157                bridge.start();
158            }
159            for (DestinationBridge bridge : outboundBridges) {
160                bridge.start();
161            }
162            LOG.info("JMS Connector " + getName() + " Started");
163        }
164    }
165
166    public void stop() throws Exception {
167        if (started.compareAndSet(true, false)) {
168
169            this.connectionSerivce.shutdown();
170
171            for (DestinationBridge bridge : inboundBridges) {
172                bridge.stop();
173            }
174            for (DestinationBridge bridge : outboundBridges) {
175                bridge.stop();
176            }
177            LOG.info("JMS Connector " + getName() + " Stopped");
178        }
179    }
180
181    public void clearBridges() {
182        inboundBridges.clear();
183        outboundBridges.clear();
184        replyToBridges.clear();
185    }
186
187    protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
188
189    /**
190     * One way to configure the local connection - this is called by The
191     * BrokerService when the Connector is embedded
192     *
193     * @param service
194     */
195    public void setBrokerService(BrokerService service) {
196        embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
197    }
198
199    public Connection getLocalConnection() {
200        return this.localConnection.get();
201    }
202
203    public Connection getForeignConnection() {
204        return this.foreignConnection.get();
205    }
206
207    /**
208     * @return Returns the jndiTemplate.
209     */
210    public JndiTemplate getJndiLocalTemplate() {
211        return jndiLocalTemplate;
212    }
213
214    /**
215     * @param jndiTemplate The jndiTemplate to set.
216     */
217    public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
218        this.jndiLocalTemplate = jndiTemplate;
219    }
220
221    /**
222     * @return Returns the jndiOutboundTemplate.
223     */
224    public JndiTemplate getJndiOutboundTemplate() {
225        return jndiOutboundTemplate;
226    }
227
228    /**
229     * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
230     */
231    public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
232        this.jndiOutboundTemplate = jndiOutboundTemplate;
233    }
234
235    /**
236     * @return Returns the inboundMessageConvertor.
237     */
238    public JmsMesageConvertor getInboundMessageConvertor() {
239        return inboundMessageConvertor;
240    }
241
242    /**
243     * @param inboundMessageConvertor The inboundMessageConvertor to set.
244     */
245    public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
246        this.inboundMessageConvertor = jmsMessageConvertor;
247    }
248
249    /**
250     * @return Returns the outboundMessageConvertor.
251     */
252    public JmsMesageConvertor getOutboundMessageConvertor() {
253        return outboundMessageConvertor;
254    }
255
256    /**
257     * @param outboundMessageConvertor The outboundMessageConvertor to set.
258     */
259    public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
260        this.outboundMessageConvertor = outboundMessageConvertor;
261    }
262
263    /**
264     * @return Returns the replyToDestinationCacheSize.
265     */
266    public int getReplyToDestinationCacheSize() {
267        return replyToDestinationCacheSize;
268    }
269
270    /**
271     * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
272     */
273    public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
274        this.replyToDestinationCacheSize = replyToDestinationCacheSize;
275    }
276
277    /**
278     * @return Returns the localPassword.
279     */
280    public String getLocalPassword() {
281        return localPassword;
282    }
283
284    /**
285     * @param localPassword The localPassword to set.
286     */
287    public void setLocalPassword(String localPassword) {
288        this.localPassword = localPassword;
289    }
290
291    /**
292     * @return Returns the localUsername.
293     */
294    public String getLocalUsername() {
295        return localUsername;
296    }
297
298    /**
299     * @param localUsername The localUsername to set.
300     */
301    public void setLocalUsername(String localUsername) {
302        this.localUsername = localUsername;
303    }
304
305    /**
306     * @return Returns the outboundPassword.
307     */
308    public String getOutboundPassword() {
309        return outboundPassword;
310    }
311
312    /**
313     * @param outboundPassword The outboundPassword to set.
314     */
315    public void setOutboundPassword(String outboundPassword) {
316        this.outboundPassword = outboundPassword;
317    }
318
319    /**
320     * @return Returns the outboundUsername.
321     */
322    public String getOutboundUsername() {
323        return outboundUsername;
324    }
325
326    /**
327     * @param outboundUsername The outboundUsername to set.
328     */
329    public void setOutboundUsername(String outboundUsername) {
330        this.outboundUsername = outboundUsername;
331    }
332
333    /**
334     * @return the outboundClientId
335     */
336    public String getOutboundClientId() {
337        return outboundClientId;
338    }
339
340    /**
341     * @param outboundClientId the outboundClientId to set
342     */
343    public void setOutboundClientId(String outboundClientId) {
344        this.outboundClientId = outboundClientId;
345    }
346
347    /**
348     * @return the localClientId
349     */
350    public String getLocalClientId() {
351        return localClientId;
352    }
353
354    /**
355     * @param localClientId the localClientId to set
356     */
357    public void setLocalClientId(String localClientId) {
358        this.localClientId = localClientId;
359    }
360
361    /**
362     * @return the currently configured reconnection policy.
363     */
364    public ReconnectionPolicy getReconnectionPolicy() {
365        return this.policy;
366    }
367
368    /**
369     * @param policy The new reconnection policy this {@link JmsConnector} should use.
370     */
371    public void setReconnectionPolicy(ReconnectionPolicy policy) {
372        this.policy = policy;
373    }
374
375    /**
376     * @return returns true if the {@link JmsConnector} is connected to both brokers.
377     */
378    public boolean isConnected() {
379        return localConnection.get() != null && foreignConnection.get() != null;
380    }
381
382    protected void addInboundBridge(DestinationBridge bridge) {
383        if (!inboundBridges.contains(bridge)) {
384            inboundBridges.add(bridge);
385        }
386    }
387
388    protected void addOutboundBridge(DestinationBridge bridge) {
389        if (!outboundBridges.contains(bridge)) {
390            outboundBridges.add(bridge);
391        }
392    }
393
394    protected void removeInboundBridge(DestinationBridge bridge) {
395        inboundBridges.remove(bridge);
396    }
397
398    protected void removeOutboundBridge(DestinationBridge bridge) {
399        outboundBridges.remove(bridge);
400    }
401
402    public String getName() {
403        if (name == null) {
404            name = "Connector:" + getNextId();
405        }
406        return name;
407    }
408
409    public void setName(String name) {
410        this.name = name;
411    }
412
413    private static synchronized int getNextId() {
414        return nextId++;
415    }
416
417    public boolean isFailed() {
418        return this.failed.get();
419    }
420
421    /**
422     * Performs the work of connection to the local side of the Connection.
423     * <p>
424     * This creates the initial connection to the local end of the {@link JmsConnector}
425     * and then sets up all the destination bridges with the information needed to bridge
426     * on the local side of the connection.
427     *
428     * @throws Exception if the connection cannot be established for any reason.
429     */
430    protected abstract void initializeLocalConnection() throws Exception;
431
432    /**
433     * Performs the work of connection to the foreign side of the Connection.
434     * <p>
435     * This creates the initial connection to the foreign end of the {@link JmsConnector}
436     * and then sets up all the destination bridges with the information needed to bridge
437     * on the foreign side of the connection.
438     *
439     * @throws Exception if the connection cannot be established for any reason.
440     */
441    protected abstract void initializeForeignConnection() throws Exception;
442
443    /**
444     * Callback method that the Destination bridges can use to report an exception to occurs
445     * during normal bridging operations.
446     *
447     * @param connection
448     *          The connection that was in use when the failure occured.
449     */
450    void handleConnectionFailure(Connection connection) {
451
452        // Can happen if async exception listener kicks in at the same time.
453        if (connection == null || !this.started.get()) {
454            return;
455        }
456
457        LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]");
458
459        // TODO - How do we handle the re-wiring of replyToBridges in this case.
460        replyToBridges.clear();
461
462        if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
463
464            // Stop the inbound bridges when the foreign connection is dropped since
465            // the bridge has no consumer and needs to be restarted once a new connection
466            // to the foreign side is made.
467            for (DestinationBridge bridge : inboundBridges) {
468                try {
469                    bridge.stop();
470                } catch(Exception e) {
471                }
472            }
473
474            // We got here first and cleared the connection, now we queue a reconnect.
475            this.connectionSerivce.execute(new Runnable() {
476
477                @Override
478                public void run() {
479                    try {
480                        doInitializeConnection(false);
481                    } catch (Exception e) {
482                        LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
483                    }
484                }
485            });
486
487        } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
488
489            // Stop the outbound bridges when the local connection is dropped since
490            // the bridge has no consumer and needs to be restarted once a new connection
491            // to the local side is made.
492            for (DestinationBridge bridge : outboundBridges) {
493                try {
494                    bridge.stop();
495                } catch(Exception e) {
496                }
497            }
498
499            // We got here first and cleared the connection, now we queue a reconnect.
500            this.connectionSerivce.execute(new Runnable() {
501
502                @Override
503                public void run() {
504                    try {
505                        doInitializeConnection(true);
506                    } catch (Exception e) {
507                        LOG.error("Failed to initialize local connection for the JMSConnector", e);
508                    }
509                }
510            });
511        }
512    }
513
514    private void scheduleAsyncLocalConnectionReconnect() {
515        this.connectionSerivce.execute(new Runnable() {
516            @Override
517            public void run() {
518                try {
519                    doInitializeConnection(true);
520                } catch (Exception e) {
521                    LOG.error("Failed to initialize local connection for the JMSConnector", e);
522                }
523            }
524        });
525    }
526
527    private void scheduleAsyncForeignConnectionReconnect() {
528        this.connectionSerivce.execute(new Runnable() {
529            @Override
530            public void run() {
531                try {
532                    doInitializeConnection(false);
533                } catch (Exception e) {
534                    LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
535                }
536            }
537        });
538    }
539
540    private void doInitializeConnection(boolean local) throws Exception {
541
542        int attempt = 0;
543
544        final int maxRetries;
545        if (local) {
546            maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
547                                                       policy.getMaxReconnectAttempts();
548        } else {
549            maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
550                                                         policy.getMaxReconnectAttempts();
551        }
552
553        do
554        {
555            if (attempt > 0) {
556                try {
557                    Thread.sleep(policy.getNextDelay(attempt));
558                } catch(InterruptedException e) {
559                }
560            }
561
562            if (connectionSerivce.isTerminating()) {
563                return;
564            }
565
566            try {
567
568                if (local) {
569                    initializeLocalConnection();
570                    localSideInitialized.set(true);
571                } else {
572                    initializeForeignConnection();
573                    foreignSideInitialized.set(true);
574                }
575
576                // Once we are connected we ensure all the bridges are started.
577                if (localConnection.get() != null && foreignConnection.get() != null) {
578                    for (DestinationBridge bridge : inboundBridges) {
579                        bridge.start();
580                    }
581                    for (DestinationBridge bridge : outboundBridges) {
582                        bridge.start();
583                    }
584                }
585
586                return;
587            } catch(Exception e) {
588                LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") +
589                          " connection for JmsConnector [" + attempt + "]: " + e.getMessage());
590            }
591        }
592        while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
593
594        this.failed.set(true);
595    }
596
597    private ThreadFactory factory = new ThreadFactory() {
598        public Thread newThread(Runnable runnable) {
599            Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
600            thread.setDaemon(true);
601            return thread;
602        }
603    };
604
605    private ThreadPoolExecutor createExecutor() {
606        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
607        exec.allowCoreThreadTimeOut(true);
608        return exec;
609    }
610}