001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.Destination;
027import org.apache.activemq.broker.region.IndirectMessageReference;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.QueueMessageReference;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032import org.apache.activemq.openwire.OpenWireFormat;
033import org.apache.activemq.store.kahadb.plist.PList;
034import org.apache.activemq.store.kahadb.plist.PListEntry;
035import org.apache.activemq.store.kahadb.plist.PListStore;
036import org.apache.activemq.usage.SystemUsage;
037import org.apache.activemq.usage.Usage;
038import org.apache.activemq.usage.UsageListener;
039import org.apache.activemq.wireformat.WireFormat;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.kahadb.util.ByteSequence;
043
044/**
045 * persist pending messages pending message (messages awaiting dispatch to a
046 * consumer) cursor
047 * 
048 * 
049 */
050public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
051    static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
052    private static final AtomicLong NAME_COUNT = new AtomicLong();
053    protected Broker broker;
054    private final PListStore store;
055    private final String name;
056    private PendingList memoryList;
057    private PList diskList;
058    private Iterator<MessageReference> iter;
059    private Destination regionDestination;
060    private boolean iterating;
061    private boolean flushRequired;
062    private final AtomicBoolean started = new AtomicBoolean();
063    private final WireFormat wireFormat = new OpenWireFormat();
064    /**
065     * @param broker
066     * @param name
067     * @param prioritizedMessages
068     */
069    public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
070        super(prioritizedMessages);
071        if (this.prioritizedMessages) {
072            this.memoryList = new PrioritizedPendingList();
073        } else {
074            this.memoryList = new OrderedPendingList();
075        }
076        this.broker = broker;
077        // the store can be null if the BrokerService has persistence
078        // turned off
079        this.store = broker.getTempDataStore();
080        this.name = NAME_COUNT.incrementAndGet() + "_" + name;
081    }
082
083    @Override
084    public void start() throws Exception {
085        if (started.compareAndSet(false, true)) {
086            super.start();
087            if (systemUsage != null) {
088                systemUsage.getMemoryUsage().addUsageListener(this);
089            }
090        }
091    }
092
093    @Override
094    public void stop() throws Exception {
095        if (started.compareAndSet(true, false)) {
096            super.stop();
097            if (systemUsage != null) {
098                systemUsage.getMemoryUsage().removeUsageListener(this);
099            }
100        }
101    }
102
103    /**
104     * @return true if there are no pending messages
105     */
106    @Override
107    public synchronized boolean isEmpty() {
108        if (memoryList.isEmpty() && isDiskListEmpty()) {
109            return true;
110        }
111        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
112            MessageReference node = iterator.next();
113            if (node == QueueMessageReference.NULL_MESSAGE) {
114                continue;
115            }
116            if (!node.isDropped()) {
117                return false;
118            }
119            // We can remove dropped references.
120            iterator.remove();
121        }
122        return isDiskListEmpty();
123    }
124
125    /**
126     * reset the cursor
127     */
128    @Override
129    public synchronized void reset() {
130        iterating = true;
131        last = null;
132        if (isDiskListEmpty()) {
133            this.iter = this.memoryList.iterator();
134        } else {
135            this.iter = new DiskIterator();
136        }
137    }
138
139    @Override
140    public synchronized void release() {
141        iterating = false;
142        if (iter instanceof DiskIterator) {
143           ((DiskIterator)iter).release();
144        };
145        if (flushRequired) {
146            flushRequired = false;
147            if (!hasSpace()) {
148                flushToDisk();
149            }
150        }
151    }
152
153    @Override
154    public synchronized void destroy() throws Exception {
155        stop();
156        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
157            Message node = (Message) i.next();
158            node.decrementReferenceCount();
159        }
160        memoryList.clear();
161        destroyDiskList();
162    }
163
164    private void destroyDiskList() throws Exception {
165        if (diskList != null) {
166            store.removePList(name);
167            diskList = null;
168        }
169    }
170
171    @Override
172    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
173        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
174        int count = 0;
175        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
176            MessageReference ref = i.next();
177            ref.incrementReferenceCount();
178            result.add(ref);
179            count++;
180        }
181        if (count < maxItems && !isDiskListEmpty()) {
182            for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
183                Message message = (Message) i.next();
184                message.setRegionDestination(regionDestination);
185                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
186                message.incrementReferenceCount();
187                result.add(message);
188                count++;
189            }
190        }
191        return result;
192    }
193
194    /**
195     * add message to await dispatch
196     * 
197     * @param node
198     * @throws Exception 
199     */
200    @Override
201    public synchronized void addMessageLast(MessageReference node) throws Exception {
202        tryAddMessageLast(node, 0);
203    }
204    
205    @Override
206    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
207        if (!node.isExpired()) {
208            try {
209                regionDestination = node.getMessage().getRegionDestination();
210                if (isDiskListEmpty()) {
211                    if (hasSpace() || this.store == null) {
212                        memoryList.addMessageLast(node);
213                        node.incrementReferenceCount();
214                        setCacheEnabled(true);
215                        return true;
216                    }
217                }
218                if (!hasSpace()) {
219                    if (isDiskListEmpty()) {
220                        expireOldMessages();
221                        if (hasSpace()) {
222                            memoryList.addMessageLast(node);
223                            node.incrementReferenceCount();
224                            return true;
225                        } else {
226                            flushToDisk();
227                        }
228                    }
229                }
230                if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
231                    ByteSequence bs = getByteSequence(node.getMessage());
232                    getDiskList().addLast(node.getMessageId().toString(), bs);
233                    return true;
234                }
235                return false;
236
237            } catch (Exception e) {
238                LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
239                throw new RuntimeException(e);
240            }
241        } else {
242            discardExpiredMessage(node);
243        }
244        //message expired
245        return true;
246    }
247
248    /**
249     * add message to await dispatch
250     * 
251     * @param node
252     */
253    @Override
254    public synchronized void addMessageFirst(MessageReference node) {
255        if (!node.isExpired()) {
256            try {
257                regionDestination = node.getMessage().getRegionDestination();
258                if (isDiskListEmpty()) {
259                    if (hasSpace()) {
260                        memoryList.addMessageFirst(node);
261                        node.incrementReferenceCount();
262                        setCacheEnabled(true);
263                        return;
264                    }
265                }
266                if (!hasSpace()) {
267                    if (isDiskListEmpty()) {
268                        expireOldMessages();
269                        if (hasSpace()) {
270                            memoryList.addMessageFirst(node);
271                            node.incrementReferenceCount();
272                            return;
273                        } else {
274                            flushToDisk();
275                        }
276                    }
277                }
278                systemUsage.getTempUsage().waitForSpace();
279                node.decrementReferenceCount();
280                ByteSequence bs = getByteSequence(node.getMessage());
281                getDiskList().addFirst(node.getMessageId().toString(), bs);
282
283            } catch (Exception e) {
284                LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
285                throw new RuntimeException(e);
286            }
287        } else {
288            discardExpiredMessage(node);
289        }
290    }
291
292    /**
293     * @return true if there pending messages to dispatch
294     */
295    @Override
296    public synchronized boolean hasNext() {
297        return iter.hasNext();
298    }
299
300    /**
301     * @return the next pending message
302     */
303    @Override
304    public synchronized MessageReference next() {
305        MessageReference reference = iter.next();
306        last = reference;
307        if (!isDiskListEmpty()) {
308            // got from disk
309            reference.getMessage().setRegionDestination(regionDestination);
310            reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
311        }
312        reference.incrementReferenceCount();
313        return reference;
314    }
315
316    /**
317     * remove the message at the cursor position
318     */
319    @Override
320    public synchronized void remove() {
321        iter.remove();
322        if (last != null) {
323            last.decrementReferenceCount();
324        }
325    }
326
327    /**
328     * @param node
329     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
330     */
331    @Override
332    public synchronized void remove(MessageReference node) {
333        if (memoryList.remove(node) != null) {
334            node.decrementReferenceCount();
335        }
336        if (!isDiskListEmpty()) {
337            try {
338                getDiskList().remove(node.getMessageId().toString());
339            } catch (IOException e) {
340                throw new RuntimeException(e);
341            }
342        }
343    }
344
345    /**
346     * @return the number of pending messages
347     */
348    @Override
349    public synchronized int size() {
350        return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
351    }
352
353    /**
354     * clear all pending messages
355     */
356    @Override
357    public synchronized void clear() {
358        memoryList.clear();
359        if (!isDiskListEmpty()) {
360            try {
361                getDiskList().destroy();
362            } catch (IOException e) {
363                throw new RuntimeException(e);
364            }
365        }
366        last = null;
367    }
368
369    @Override
370    public synchronized boolean isFull() {
371
372        return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
373
374    }
375
376    @Override
377    public boolean hasMessagesBufferedToDeliver() {
378        return !isEmpty();
379    }
380
381    @Override
382    public void setSystemUsage(SystemUsage usageManager) {
383        super.setSystemUsage(usageManager);
384    }
385
386    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
387        if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
388            synchronized (this) {
389                if (!flushRequired && size() != 0) {
390                    flushRequired =true;
391                    if (!iterating) {
392                        expireOldMessages();
393                        if (!hasSpace()) {
394                            flushToDisk();
395                            flushRequired = false;
396                        }
397                    }
398                }
399            }
400        }
401    }
402
403    @Override
404    public boolean isTransient() {
405        return true;
406    }
407
408    protected boolean isSpaceInMemoryList() {
409        return hasSpace() && isDiskListEmpty();
410    }
411
412    protected synchronized void expireOldMessages() {
413        if (!memoryList.isEmpty()) {
414            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
415                MessageReference node = iterator.next();
416                if (node.isExpired()) {
417                    node.decrementReferenceCount();
418                    discardExpiredMessage(node);
419                    iterator.remove();
420                }
421            }
422        }
423    }
424
425    protected synchronized void flushToDisk() {
426        if (!memoryList.isEmpty() && store != null) {
427            long start = 0;
428             if (LOG.isTraceEnabled()) {
429                start = System.currentTimeMillis();
430                LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size()  + " " +  (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
431             }
432            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
433                MessageReference node = iterator.next();
434                node.decrementReferenceCount();
435                ByteSequence bs;
436                try {
437                    bs = getByteSequence(node.getMessage());
438                    getDiskList().addLast(node.getMessageId().toString(), bs);
439                } catch (IOException e) {
440                    LOG.error("Failed to write to disk list", e);
441                    throw new RuntimeException(e);
442                }
443
444            }
445            memoryList.clear();
446            setCacheEnabled(false);
447             if (LOG.isTraceEnabled()) {
448                LOG.trace("" + name + ", flushToDisk() done - " + (System.currentTimeMillis() - start) + "ms " + (systemUsage != null ? systemUsage.getMemoryUsage() : ""));
449             }
450        }
451    }
452
453    protected boolean isDiskListEmpty() {
454        return diskList == null || diskList.isEmpty();
455    }
456
457    protected PList getDiskList() {
458        if (diskList == null) {
459            try {
460                diskList = store.getPList(name);
461            } catch (Exception e) {
462                LOG.error("Caught an IO Exception getting the DiskList " + name, e);
463                throw new RuntimeException(e);
464            }
465        }
466        return diskList;
467    }
468
469    private void discardExpiredMessage(MessageReference reference) {
470        if (LOG.isDebugEnabled()) {
471            LOG.debug("Discarding expired message " + reference);
472        }
473        if (broker.isExpired(reference)) {
474            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
475            context.setBroker(broker);
476            reference.getRegionDestination().messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
477        }
478    }
479
480    protected ByteSequence getByteSequence(Message message) throws IOException {
481        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
482        return new ByteSequence(packet.data, packet.offset, packet.length);
483    }
484
485    protected Message getMessage(ByteSequence bs) throws IOException {
486        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
487                .getOffset(), bs.getLength());
488        return (Message) this.wireFormat.unmarshal(packet);
489
490    }
491
492    final class DiskIterator implements Iterator<MessageReference> {
493        private final PList.PListIterator iterator;
494        DiskIterator() {
495            try {
496                iterator = getDiskList().iterator();
497            } catch (Exception e) {
498                throw new RuntimeException(e);
499            }
500        }
501
502        public boolean hasNext() {
503            return iterator.hasNext();
504        }
505
506        public MessageReference next() {
507            try {
508                PListEntry entry = iterator.next();
509                return getMessage(entry.getByteSequence());
510            } catch (IOException e) {
511                LOG.error("I/O error", e);
512                throw new RuntimeException(e);
513            }
514        }
515
516        public void remove() {
517            iterator.remove();
518        }
519
520        public void release() {
521            iterator.release();
522        }
523    }
524}