001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.kahadb.util;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.List;
025import java.util.NoSuchElementException;
026
027/**
 * Keeps track of added long values. Collapses ranges of numbers using a
 * Sequence representation. Used to keep track of received message ids to find
 * out if a message is a duplicate or if there are any missing messages.
031 *
032 * @author chirino
033 */
034public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> {
035
036    public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> {
037
038        public static final Marshaller INSTANCE = new Marshaller();
039
040        public SequenceSet readPayload(DataInput in) throws IOException {
041            SequenceSet value = new SequenceSet();
042            int count = in.readInt();
043            for (int i = 0; i < count; i++) {
044                if( in.readBoolean() ) {
045                    Sequence sequence = new Sequence(in.readLong(), in.readLong());
046                    value.addLast(sequence);
047                } else {
048                    Sequence sequence = new Sequence(in.readLong());
049                    value.addLast(sequence);
050                }
051            }
052            return value;
053        }
054
055        public void writePayload(SequenceSet value, DataOutput out) throws IOException {
056            out.writeInt(value.size());
057            Sequence sequence = value.getHead();
058            while (sequence != null ) {
059                if( sequence.range() > 1 ) {
060                    out.writeBoolean(true);
061                    out.writeLong(sequence.first);
062                    out.writeLong(sequence.last);
063                } else {
064                    out.writeBoolean(false);
065                    out.writeLong(sequence.first);
066                }
067                sequence = sequence.getNext();
068            }
069        }
070
071        public int getFixedSize() {
072            return -1;
073        }
074
075        public SequenceSet deepCopy(SequenceSet value) {
076            SequenceSet rc = new SequenceSet();
077            Sequence sequence = value.getHead();
078            while (sequence != null ) {
079                rc.add(new Sequence(sequence.first, sequence.last));
080                sequence = sequence.getNext();
081            }
082            return rc;
083        }
084
085        public boolean isDeepCopySupported() {
086            return true;
087        }
088    }
089
090    public void add(Sequence value) {
091        // TODO we can probably optimize this a bit
092        for(long i=value.first; i<value.last+1; i++) {
093            add(i);
094        }
095    }
096
097    /**
098     *
099     * @param value
100     *            the value to add to the list
101     * @return false if the value was a duplicate.
102     */
103    public boolean add(long value) {
104
105        if (isEmpty()) {
106            addFirst(new Sequence(value));
107            return true;
108        }
109
110        Sequence sequence = getHead();
111        while (sequence != null) {
112
113            if (sequence.isAdjacentToLast(value)) {
114                // grow the sequence...
115                sequence.last = value;
116                // it might connect us to the next sequence..
117                if (sequence.getNext() != null) {
118                    Sequence next = sequence.getNext();
119                    if (next.isAdjacentToFirst(value)) {
120                        // Yep the sequence connected.. so join them.
121                        sequence.last = next.last;
122                        next.unlink();
123                    }
124                }
125                return true;
126            }
127
128            if (sequence.isAdjacentToFirst(value)) {
129                // grow the sequence...
130                sequence.first = value;
131
132                // it might connect us to the previous
133                if (sequence.getPrevious() != null) {
134                    Sequence prev = sequence.getPrevious();
135                    if (prev.isAdjacentToLast(value)) {
136                        // Yep the sequence connected.. so join them.
137                        sequence.first = prev.first;
138                        prev.unlink();
139                    }
140                }
141                return true;
142            }
143
144            // Did that value land before this sequence?
145            if (value < sequence.first) {
146                // Then insert a new entry before this sequence item.
147                sequence.linkBefore(new Sequence(value));
148                return true;
149            }
150
151            // Did that value land within the sequence? The it's a duplicate.
152            if (sequence.contains(value)) {
153                return false;
154            }
155
156            sequence = sequence.getNext();
157        }
158
159        // Then the value is getting appended to the tail of the sequence.
160        addLast(new Sequence(value));
161        return true;
162    }
163
164    /**
165     * Removes the given value from the Sequence set, splitting a
166     * contained sequence if necessary.
167     *
168     * @param value
169     *          The value that should be removed from the SequenceSet.
170     *
171     * @return true if the value was removed from the set, false if there
172     *         was no sequence in the set that contained the given value.
173     */
174    public boolean remove(long value) {
175        Sequence sequence = getHead();
176        while (sequence != null ) {
177            if(sequence.contains(value)) {
178                if (sequence.range() == 1) {
179                    sequence.unlink();
180                    return true;
181                } else if (sequence.getFirst() == value) {
182                    sequence.setFirst(value+1);
183                    return true;
184                } else if (sequence.getLast() == value) {
185                    sequence.setLast(value-1);
186                    return true;
187                } else {
188                    sequence.linkBefore(new Sequence(sequence.first, value-1));
189                    sequence.linkAfter(new Sequence(value+1, sequence.last));
190                    sequence.unlink();
191                    return true;
192                }
193            }
194
195            sequence = sequence.getNext();
196        }
197
198        return false;
199    }
200
201    /**
202     * Removes and returns the first element from this list.
203     *
204     * @return the first element from this list.
205     * @throws NoSuchElementException if this list is empty.
206     */
207    public long removeFirst() {
208        if (isEmpty()) {
209            throw new NoSuchElementException();
210        }
211
212        Sequence rc = removeFirstSequence(1);
213        return rc.first;
214    }
215
216    /**
217     * Removes and returns the last sequence from this list.
218     *
219     * @return the last sequence from this list or null if the list is empty.
220     */
221    public Sequence removeLastSequence() {
222        if (isEmpty()) {
223            return null;
224        }
225
226        Sequence rc = getTail();
227        rc.unlink();
228        return rc;
229    }
230
231    /**
232     * Removes and returns the first sequence that is count range large.
233     *
234     * @return a sequence that is count range large, or null if no sequence is that large in the list.
235     */
236    public Sequence removeFirstSequence(long count) {
237        if (isEmpty()) {
238            return null;
239        }
240
241        Sequence sequence = getHead();
242        while (sequence != null ) {
243            if (sequence.range() == count ) {
244                sequence.unlink();
245                return sequence;
246            }
247            if (sequence.range() > count ) {
248                Sequence rc = new Sequence(sequence.first, sequence.first+count-1);
249                sequence.first+=count;
250                return rc;
251            }
252            sequence = sequence.getNext();
253        }
254        return null;
255    }
256
257    /**
258     * @return all the id Sequences that are missing from this set that are not
259     *         in between the range provided.
260     */
261    public List<Sequence> getMissing(long first, long last) {
262        ArrayList<Sequence> rc = new ArrayList<Sequence>();
263        if (first > last) {
264            throw new IllegalArgumentException("First cannot be more than last");
265        }
266        if (isEmpty()) {
267            // We are missing all the messages.
268            rc.add(new Sequence(first, last));
269            return rc;
270        }
271
272        Sequence sequence = getHead();
273        while (sequence != null && first <= last) {
274            if (sequence.contains(first)) {
275                first = sequence.last + 1;
276            } else {
277                if (first < sequence.first) {
278                    if (last < sequence.first) {
279                        rc.add(new Sequence(first, last));
280                        return rc;
281                    } else {
282                        rc.add(new Sequence(first, sequence.first - 1));
283                        first = sequence.last + 1;
284                    }
285                }
286            }
287            sequence = sequence.getNext();
288        }
289
290        if (first <= last) {
291            rc.add(new Sequence(first, last));
292        }
293        return rc;
294    }
295
296    /**
297     * @return all the Sequence that are in this list
298     */
299    public List<Sequence> getReceived() {
300        ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
301        Sequence sequence = getHead();
302        while (sequence != null) {
303            rc.add(new Sequence(sequence.first, sequence.last));
304            sequence = sequence.getNext();
305        }
306        return rc;
307    }
308
309    /**
310     * Returns true if the value given is contained within one of the
311     * sequences held in this set.
312     *
313     * @param value
314     *      The value to search for in the set.
315     *
316     * @return true if the value is contained in the set.
317     */
318    public boolean contains(long value) {
319        if (isEmpty()) {
320            return false;
321        }
322
323        Sequence sequence = getHead();
324        while (sequence != null) {
325            if (sequence.contains(value)) {
326                return true;
327            }
328            sequence = sequence.getNext();
329        }
330
331        return false;
332    }
333
334    public boolean contains(int first, int last) {
335        if (isEmpty()) {
336            return false;
337        }
338        Sequence sequence = getHead();
339        while (sequence != null) {
340            if (sequence.first <= first && first <= sequence.last ) {
341                return last <= sequence.last;
342            }
343            sequence = sequence.getNext();
344        }
345        return false;
346    }
347
348    /**
349     * Computes the size of this Sequence by summing the values of all
350     * the contained sequences.
351     *
352     * @return the total number of values contained in this set if it
353     *         were to be iterated over like an array.
354     */
355    public long rangeSize() {
356        long result = 0;
357        Sequence sequence = getHead();
358        while (sequence != null) {
359            result += sequence.range();
360            sequence = sequence.getNext();
361        }
362        return result;
363    }
364
365    public Iterator<Long> iterator() {
366        return new SequenceIterator();
367    }
368
369    private class SequenceIterator implements Iterator<Long> {
370
371        private Sequence currentEntry;
372        private long lastReturned = -1;
373
374        public SequenceIterator() {
375            currentEntry = getHead();
376            if (currentEntry != null) {
377                lastReturned = currentEntry.first - 1;
378            }
379        }
380
381        public boolean hasNext() {
382            return currentEntry != null;
383        }
384
385        public Long next() {
386            if (currentEntry == null) {
387                throw new NoSuchElementException();
388            }
389
390            if(lastReturned < currentEntry.first) {
391                lastReturned = currentEntry.first;
392                if (currentEntry.range() == 1) {
393                    currentEntry = currentEntry.getNext();
394                }
395            } else {
396                lastReturned++;
397                if (lastReturned == currentEntry.last) {
398                    currentEntry = currentEntry.getNext();
399                }
400            }
401
402            return lastReturned;
403        }
404
405        public void remove() {
406            throw new UnsupportedOperationException();
407        }
408
409    }
410
411}