001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.Iterator;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028import javax.jms.JMSException;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.ConsumerBrokerExchange;
031import org.apache.activemq.broker.DestinationAlreadyExistsException;
032import org.apache.activemq.broker.ProducerBrokerExchange;
033import org.apache.activemq.broker.TransportConnection;
034import org.apache.activemq.command.ActiveMQDestination;
035import org.apache.activemq.command.ConsumerControl;
036import org.apache.activemq.command.ConsumerId;
037import org.apache.activemq.command.ConsumerInfo;
038import org.apache.activemq.command.DestinationInfo;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessageAck;
041import org.apache.activemq.command.MessageDispatchNotification;
042import org.apache.activemq.command.MessagePull;
043import org.apache.activemq.command.ProducerInfo;
044import org.apache.activemq.command.RemoveSubscriptionInfo;
045import org.apache.activemq.command.Response;
046import org.apache.activemq.filter.DestinationFilter;
047import org.apache.activemq.filter.DestinationMap;
048import org.apache.activemq.security.SecurityContext;
049import org.apache.activemq.thread.TaskRunnerFactory;
050import org.apache.activemq.usage.SystemUsage;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
/**
 * Base implementation of {@link Region} that tracks the destinations and the
 * consumer subscriptions of a region and routes broker operations to them.
 * Subclasses supply the subscription type via {@code createSubscription}.
 */
public abstract class AbstractRegion implements Region {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);

    // Exact-match index of the destinations currently registered in this region.
    protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
    // Secondary index used for exact or wildcard lookups (see getDestinations).
    protected final DestinationMap destinationMap = new DestinationMap();
    // Subscriptions registered through addConsumer, keyed by consumer id.
    protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
    // Stored from the constructor's memoryManager argument; not used within this class.
    protected final SystemUsage usageManager;
    // Creates, lists and removes the destinations handled by this region.
    protected final DestinationFactory destinationFactory;
    // Passed to the destination factory whenever a destination is created.
    protected final DestinationStatistics destinationStatistics;
    protected final RegionBroker broker;
    // When true, lookup() tries to create a destination that is not registered yet.
    protected boolean autoCreateDestinations = true;
    protected final TaskRunnerFactory taskRunnerFactory;
    // Taken around reads (read lock) and add/remove (write lock) of destinations and destinationMap.
    protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
    // Per-consumer guard objects; the map itself is only touched while synchronized on it.
    protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
    // Set to true by start() and to false by stop().
    protected boolean started;
073
074    public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
075            TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
076        if (broker == null) {
077            throw new IllegalArgumentException("null broker");
078        }
079        this.broker = broker;
080        this.destinationStatistics = destinationStatistics;
081        this.usageManager = memoryManager;
082        this.taskRunnerFactory = taskRunnerFactory;
083        if (destinationFactory == null) {
084            throw new IllegalArgumentException("null destinationFactory");
085        }
086        this.destinationFactory = destinationFactory;
087    }
088
089    public final void start() throws Exception {
090        started = true;
091
092        Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
093        for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
094            ActiveMQDestination dest = iter.next();
095
096            ConnectionContext context = new ConnectionContext();
097            context.setBroker(broker.getBrokerService().getBroker());
098            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
099            context.getBroker().addDestination(context, dest, false);
100        }
101        destinationsLock.readLock().lock();
102        try{
103            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
104                Destination dest = i.next();
105                dest.start();
106            }
107        } finally {
108            destinationsLock.readLock().unlock();
109        }
110    }
111
112    public void stop() throws Exception {
113        started = false;
114        destinationsLock.readLock().lock();
115        try{
116            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
117                Destination dest = i.next();
118                dest.stop();
119            }
120        } finally {
121            destinationsLock.readLock().unlock();
122        }
123        destinations.clear();
124    }
125
126    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
127            boolean createIfTemporary) throws Exception {
128
129        destinationsLock.writeLock().lock();
130        try {
131            Destination dest = destinations.get(destination);
132            if (dest == null) {
133                if (destination.isTemporary() == false || createIfTemporary) {
134                    if (LOG.isDebugEnabled()) {
135                        LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
136                    }
137                    dest = createDestination(context, destination);
138                    // intercept if there is a valid interceptor defined
139                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
140                    if (destinationInterceptor != null) {
141                        dest = destinationInterceptor.intercept(dest);
142                    }
143                    dest.start();
144                    destinations.put(destination, dest);
145                    destinationMap.put(destination, dest);
146                    addSubscriptionsForDestination(context, dest);
147                }
148                if (dest == null) {
149                    throw new JMSException("The destination " + destination + " does not exist.");
150                }
151            }
152            return dest;
153        } finally {
154            destinationsLock.writeLock().unlock();
155        }
156    }
157
158    public Map<ConsumerId, Subscription> getSubscriptions() {
159        return subscriptions;
160    }
161
162    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
163            throws Exception {
164
165        List<Subscription> rc = new ArrayList<Subscription>();
166        // Add all consumers that are interested in the destination.
167        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
168            Subscription sub = iter.next();
169            if (sub.matches(dest.getActiveMQDestination())) {
170                dest.addSubscription(context, sub);
171                rc.add(sub);
172            }
173        }
174        return rc;
175
176    }
177
178    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
179            throws Exception {
180
181        // No timeout.. then try to shut down right way, fails if there are
182        // current subscribers.
183        if (timeout == 0) {
184            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
185                Subscription sub = iter.next();
186                if (sub.matches(destination)) {
187                    throw new JMSException("Destination still has an active subscription: " + destination);
188                }
189            }
190        }
191
192        if (timeout > 0) {
193            // TODO: implement a way to notify the subscribers that we want to
194            // take the down
195            // the destination and that they should un-subscribe.. Then wait up
196            // to timeout time before
197            // dropping the subscription.
198        }
199
200        if (LOG.isDebugEnabled()) {
201            LOG.debug(broker.getBrokerName() + " removing destination: " + destination);
202        }
203
204        destinationsLock.writeLock().lock();
205        try {
206            Destination dest = destinations.remove(destination);
207            if (dest != null) {
208                // timeout<0 or we timed out, we now force any remaining
209                // subscriptions to un-subscribe.
210                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
211                    Subscription sub = iter.next();
212                    if (sub.matches(destination)) {
213                        dest.removeSubscription(context, sub, 0l);
214                    }
215                }
216                destinationMap.removeAll(destination);
217                dispose(context, dest);
218                DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
219                if (destinationInterceptor != null) {
220                    destinationInterceptor.remove(dest);
221                }
222
223            } else {
224                if (LOG.isDebugEnabled()) {
225                    LOG.debug("Cannot remove a destination that doesn't exist: " + destination);
226                }
227            }
228        } finally {
229            destinationsLock.writeLock().unlock();
230        }
231    }
232
233    /**
234     * Provide an exact or wildcard lookup of destinations in the region
235     *
236     * @return a set of matching destination objects.
237     */
238    @SuppressWarnings("unchecked")
239    public Set<Destination> getDestinations(ActiveMQDestination destination) {
240        destinationsLock.readLock().lock();
241        try{
242            return destinationMap.get(destination);
243        } finally {
244            destinationsLock.readLock().unlock();
245        }
246    }
247
248    public Map<ActiveMQDestination, Destination> getDestinationMap() {
249        destinationsLock.readLock().lock();
250        try{
251            return destinations;
252        } finally {
253            destinationsLock.readLock().unlock();
254        }
255    }
256
257    @SuppressWarnings("unchecked")
258    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
259        if (LOG.isDebugEnabled()) {
260            LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
261                    + info.getDestination());
262        }
263        ActiveMQDestination destination = info.getDestination();
264        if (destination != null && !destination.isPattern() && !destination.isComposite()) {
265            // lets auto-create the destination
266            lookup(context, destination,true);
267        }
268
269        Object addGuard;
270        synchronized (consumerChangeMutexMap) {
271            addGuard = consumerChangeMutexMap.get(info.getConsumerId());
272            if (addGuard == null) {
273                addGuard = new Object();
274                consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
275            }
276        }
277        synchronized (addGuard) {
278            Subscription o = subscriptions.get(info.getConsumerId());
279            if (o != null) {
280                LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
281                return o;
282            }
283
284            // We may need to add some destinations that are in persistent store
285            // but not active
286            // in the broker.
287            //
288            // TODO: think about this a little more. This is good cause
289            // destinations are not loaded into
290            // memory until a client needs to use the queue, but a management
291            // agent viewing the
292            // broker will not see a destination that exists in persistent
293            // store. We may want to
294            // eagerly load all destinations into the broker but have an
295            // inactive state for the
296            // destination which has reduced memory usage.
297            //
298            DestinationFilter.parseFilter(info.getDestination());
299
300            Subscription sub = createSubscription(context, info);
301
302            subscriptions.put(info.getConsumerId(), sub);
303
304            // At this point we're done directly manipulating subscriptions,
305            // but we need to retain the synchronized block here. Consider
306            // otherwise what would happen if at this point a second
307            // thread added, then removed, as would be allowed with
308            // no mutex held. Remove is only essentially run once
309            // so everything after this point would be leaked.
310
311            // Add the subscription to all the matching queues.
312            // But copy the matches first - to prevent deadlocks
313            List<Destination> addList = new ArrayList<Destination>();
314            destinationsLock.readLock().lock();
315            try {
316                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
317                    addList.add(dest);
318                }
319            } finally {
320                destinationsLock.readLock().unlock();
321            }
322
323            for (Destination dest : addList) {
324                dest.addSubscription(context, sub);
325            }
326
327            if (info.isBrowser()) {
328                ((QueueBrowserSubscription) sub).destinationsAdded();
329            }
330
331            return sub;
332        }
333    }
334
335    /**
336     * Get all the Destinations that are in storage
337     *
338     * @return Set of all stored destinations
339     */
340    @SuppressWarnings("rawtypes")
341    public Set getDurableDestinations() {
342        return destinationFactory.getDestinations();
343    }
344
345    /**
346     * @return all Destinations that don't have active consumers
347     */
348    protected Set<ActiveMQDestination> getInactiveDestinations() {
349        Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
350        destinationsLock.readLock().lock();
351        try {
352            inactiveDests.removeAll(destinations.keySet());
353        } finally {
354            destinationsLock.readLock().unlock();
355        }
356        return inactiveDests;
357    }
358
359    @SuppressWarnings("unchecked")
360    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
361        if (LOG.isDebugEnabled()) {
362            LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
363                    + info.getDestination());
364        }
365
366        Subscription sub = subscriptions.remove(info.getConsumerId());
367        // The sub could be removed elsewhere - see ConnectionSplitBroker
368        if (sub != null) {
369
370            // remove the subscription from all the matching queues.
371            List<Destination> removeList = new ArrayList<Destination>();
372            destinationsLock.readLock().lock();
373            try {
374                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
375                    removeList.add(dest);
376                }
377            } finally {
378                destinationsLock.readLock().unlock();
379            }
380            for (Destination dest : removeList) {
381                dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
382            }
383
384            destroySubscription(sub);
385        }
386        synchronized (consumerChangeMutexMap) {
387            consumerChangeMutexMap.remove(info.getConsumerId());
388        }
389    }
390
391    protected void destroySubscription(Subscription sub) {
392        sub.destroy();
393    }
394
395    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
396        throw new JMSException("Invalid operation.");
397    }
398
399    public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
400        final ConnectionContext context = producerExchange.getConnectionContext();
401
402        if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
403            final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
404            producerExchange.setRegionDestination(regionDestination);
405        }
406
407        producerExchange.getRegionDestination().send(producerExchange, messageSend);
408    }
409
410    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
411        Subscription sub = consumerExchange.getSubscription();
412        if (sub == null) {
413            sub = subscriptions.get(ack.getConsumerId());
414            if (sub == null) {
415                if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
416                    LOG.warn("Ack for non existent subscription, ack:" + ack);
417                    throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
418                } else {
419                    if (LOG.isDebugEnabled()) {
420                        LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
421                    }
422                    return;
423                }
424            }
425            consumerExchange.setSubscription(sub);
426        }
427        sub.acknowledge(consumerExchange.getConnectionContext(), ack);
428    }
429
430    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
431        Subscription sub = subscriptions.get(pull.getConsumerId());
432        if (sub == null) {
433            throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
434        }
435        return sub.pullMessage(context, pull);
436    }
437
438    protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
439        Destination dest = null;
440
441        destinationsLock.readLock().lock();
442        try {
443            dest = destinations.get(destination);
444        } finally {
445            destinationsLock.readLock().unlock();
446        }
447
448        if (dest == null) {
449            if (isAutoCreateDestinations()) {
450                // Try to auto create the destination... re-invoke broker
451                // from the
452                // top so that the proper security checks are performed.
453                try {
454                    context.getBroker().addDestination(context, destination, createTemporary);
455                    dest = addDestination(context, destination, false);
456                } catch (DestinationAlreadyExistsException e) {
457                    // if the destination already exists then lets ignore
458                    // this error
459                }
460                // We should now have the dest created.
461                destinationsLock.readLock().lock();
462                try {
463                    dest = destinations.get(destination);
464                } finally {
465                    destinationsLock.readLock().unlock();
466                }
467            }
468
469            if (dest == null) {
470                throw new JMSException("The destination " + destination + " does not exist.");
471            }
472        }
473        return dest;
474    }
475
476    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
477        Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
478        if (sub != null) {
479            sub.processMessageDispatchNotification(messageDispatchNotification);
480        } else {
481            throw new JMSException("Slave broker out of sync with master - Subscription: "
482                    + messageDispatchNotification.getConsumerId() + " on "
483                    + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
484                    + messageDispatchNotification.getMessageId());
485        }
486    }
487
488    /*
489     * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
490     * dispatch is deferred till the notification to ensure that the
491     * subscription chosen by the master is used. AMQ-2102
492     */
493    protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
494            throws Exception {
495        Destination dest = null;
496        destinationsLock.readLock().lock();
497        try {
498            dest = destinations.get(messageDispatchNotification.getDestination());
499        } finally {
500            destinationsLock.readLock().unlock();
501        }
502
503        if (dest != null) {
504            dest.processDispatchNotification(messageDispatchNotification);
505        } else {
506            throw new JMSException("Slave broker out of sync with master - Destination: "
507                    + messageDispatchNotification.getDestination() + " does not exist for consumer "
508                    + messageDispatchNotification.getConsumerId() + " with message: "
509                    + messageDispatchNotification.getMessageId());
510        }
511    }
512
513    public void gc() {
514        for (Subscription sub : subscriptions.values()) {
515            sub.gc();
516        }
517
518        destinationsLock.readLock().lock();
519        try {
520            for (Destination dest : destinations.values()) {
521                dest.gc();
522            }
523        } finally {
524            destinationsLock.readLock().unlock();
525        }
526    }
527
    /**
     * Builds the subscription for a consumer being added. Called by
     * {@code addConsumer} once per new consumer id, while that consumer's
     * guard object is held.
     *
     * @param context the connection context the consumer is added under
     * @param info describes the consumer
     * @return the subscription to register for the consumer
     * @throws Exception if the subscription cannot be created
     */
    protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
529
530    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
531            throws Exception {
532        return destinationFactory.createDestination(context, destination, destinationStatistics);
533    }
534
535    public boolean isAutoCreateDestinations() {
536        return autoCreateDestinations;
537    }
538
539    public void setAutoCreateDestinations(boolean autoCreateDestinations) {
540        this.autoCreateDestinations = autoCreateDestinations;
541    }
542
543    @SuppressWarnings("unchecked")
544    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
545        destinationsLock.readLock().lock();
546        try {
547            for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
548                dest.addProducer(context, info);
549            }
550        } finally {
551            destinationsLock.readLock().unlock();
552        }
553    }
554
555    /**
556     * Removes a Producer.
557     *
558     * @param context
559     *            the environment the operation is being executed under.
560     * @throws Exception
561     *             TODO
562     */
563    @SuppressWarnings("unchecked")
564    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
565        destinationsLock.readLock().lock();
566        try {
567            for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
568                dest.removeProducer(context, info);
569            }
570        } finally {
571            destinationsLock.readLock().unlock();
572        }
573    }
574
575    protected void dispose(ConnectionContext context, Destination dest) throws Exception {
576        dest.dispose(context);
577        dest.stop();
578        destinationFactory.removeDestination(dest);
579    }
580
581    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
582        Subscription sub = subscriptions.get(control.getConsumerId());
583        if (sub != null && sub instanceof AbstractSubscription) {
584            ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
585            if (LOG.isDebugEnabled()) {
586                LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: "
587                        + control.getConsumerId());
588            }
589            try {
590                lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
591            } catch (Exception e) {
592                LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
593            }
594        }
595    }
596}