001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.util;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.List;
025import java.util.NoSuchElementException;
026
027/**
028 * Keeps track of a added long values. Collapses ranges of numbers using a
029 * Sequence representation. Use to keep track of received message ids to find
030 * out if a message is duplicate or if there are any missing messages.
031 *
032 * @author chirino
033 */
034public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> {
035
036    public static class Marshaller implements org.apache.activemq.store.kahadb.disk.util.Marshaller<SequenceSet> {
037
038        public static final Marshaller INSTANCE = new Marshaller();
039
040        public SequenceSet readPayload(DataInput in) throws IOException {
041            SequenceSet value = new SequenceSet();
042            int count = in.readInt();
043            for (int i = 0; i < count; i++) {
044                if( in.readBoolean() ) {
045                    Sequence sequence = new Sequence(in.readLong(), in.readLong());
046                    value.addLast(sequence);
047                } else {
048                    Sequence sequence = new Sequence(in.readLong());
049                    value.addLast(sequence);
050                }
051            }
052            return value;
053        }
054
055        public void writePayload(SequenceSet value, DataOutput out) throws IOException {
056            out.writeInt(value.size());
057            Sequence sequence = value.getHead();
058            while (sequence != null ) {
059                if( sequence.range() > 1 ) {
060                    out.writeBoolean(true);
061                    out.writeLong(sequence.first);
062                    out.writeLong(sequence.last);
063                } else {
064                    out.writeBoolean(false);
065                    out.writeLong(sequence.first);
066                }
067                sequence = sequence.getNext();
068            }
069        }
070
071        public int getFixedSize() {
072            return -1;
073        }
074
075        public SequenceSet deepCopy(SequenceSet value) {
076            SequenceSet rc = new SequenceSet();
077            Sequence sequence = value.getHead();
078            while (sequence != null ) {
079                rc.add(new Sequence(sequence.first, sequence.last));
080                sequence = sequence.getNext();
081            }
082            return rc;
083        }
084
085        public boolean isDeepCopySupported() {
086            return true;
087        }
088    }
089
090    public void add(Sequence value) {
091        // TODO we can probably optimize this a bit
092        for(long i=value.first; i<value.last+1; i++) {
093            add(i);
094        }
095    }
096
097    /**
098     *
099     * @param value
100     *            the value to add to the list
101     * @return false if the value was a duplicate.
102     */
103    public boolean add(long value) {
104
105        if (isEmpty()) {
106            addFirst(new Sequence(value));
107            return true;
108        }
109
110        // check for append
111        Sequence sequence = getTail();
112        if (sequence.isAdjacentToLast(value)) {
113            sequence.last = value;
114            return true;
115        }
116
117        sequence = getHead();
118        while (sequence != null) {
119
120            if (sequence.isAdjacentToLast(value)) {
121                // grow the sequence...
122                sequence.last = value;
123                // it might connect us to the next sequence..
124                if (sequence.getNext() != null) {
125                    Sequence next = sequence.getNext();
126                    if (next.isAdjacentToFirst(value)) {
127                        // Yep the sequence connected.. so join them.
128                        sequence.last = next.last;
129                        next.unlink();
130                    }
131                }
132                return true;
133            }
134
135            if (sequence.isAdjacentToFirst(value)) {
136                // grow the sequence...
137                sequence.first = value;
138
139                // it might connect us to the previous
140                if (sequence.getPrevious() != null) {
141                    Sequence prev = sequence.getPrevious();
142                    if (prev.isAdjacentToLast(value)) {
143                        // Yep the sequence connected.. so join them.
144                        sequence.first = prev.first;
145                        prev.unlink();
146                    }
147                }
148                return true;
149            }
150
151            // Did that value land before this sequence?
152            if (value < sequence.first) {
153                // Then insert a new entry before this sequence item.
154                sequence.linkBefore(new Sequence(value));
155                return true;
156            }
157
158            // Did that value land within the sequence? The it's a duplicate.
159            if (sequence.contains(value)) {
160                return false;
161            }
162
163            sequence = sequence.getNext();
164        }
165
166        // Then the value is getting appended to the tail of the sequence.
167        addLast(new Sequence(value));
168        return true;
169    }
170
171    /**
172     * Removes the given value from the Sequence set, splitting a
173     * contained sequence if necessary.
174     *
175     * @param value
176     *          The value that should be removed from the SequenceSet.
177     *
178     * @return true if the value was removed from the set, false if there
179     *         was no sequence in the set that contained the given value.
180     */
181    public boolean remove(long value) {
182        Sequence sequence = getHead();
183        while (sequence != null ) {
184            if(sequence.contains(value)) {
185                if (sequence.range() == 1) {
186                    sequence.unlink();
187                    return true;
188                } else if (sequence.getFirst() == value) {
189                    sequence.setFirst(value+1);
190                    return true;
191                } else if (sequence.getLast() == value) {
192                    sequence.setLast(value-1);
193                    return true;
194                } else {
195                    sequence.linkBefore(new Sequence(sequence.first, value-1));
196                    sequence.linkAfter(new Sequence(value+1, sequence.last));
197                    sequence.unlink();
198                    return true;
199                }
200            }
201
202            sequence = sequence.getNext();
203        }
204
205        return false;
206    }
207
208    /**
209     * Removes and returns the first element from this list.
210     *
211     * @return the first element from this list.
212     * @throws NoSuchElementException if this list is empty.
213     */
214    public long removeFirst() {
215        if (isEmpty()) {
216            throw new NoSuchElementException();
217        }
218
219        Sequence rc = removeFirstSequence(1);
220        return rc.first;
221    }
222
223    /**
224     * Removes and returns the last sequence from this list.
225     *
226     * @return the last sequence from this list or null if the list is empty.
227     */
228    public Sequence removeLastSequence() {
229        if (isEmpty()) {
230            return null;
231        }
232
233        Sequence rc = getTail();
234        rc.unlink();
235        return rc;
236    }
237
238    /**
239     * Removes and returns the first sequence that is count range large.
240     *
241     * @return a sequence that is count range large, or null if no sequence is that large in the list.
242     */
243    public Sequence removeFirstSequence(long count) {
244        if (isEmpty()) {
245            return null;
246        }
247
248        Sequence sequence = getHead();
249        while (sequence != null ) {
250            if (sequence.range() == count ) {
251                sequence.unlink();
252                return sequence;
253            }
254            if (sequence.range() > count ) {
255                Sequence rc = new Sequence(sequence.first, sequence.first+count-1);
256                sequence.first+=count;
257                return rc;
258            }
259            sequence = sequence.getNext();
260        }
261        return null;
262    }
263
264    /**
265     * @return all the id Sequences that are missing from this set that are not
266     *         in between the range provided.
267     */
268    public List<Sequence> getMissing(long first, long last) {
269        ArrayList<Sequence> rc = new ArrayList<Sequence>();
270        if (first > last) {
271            throw new IllegalArgumentException("First cannot be more than last");
272        }
273        if (isEmpty()) {
274            // We are missing all the messages.
275            rc.add(new Sequence(first, last));
276            return rc;
277        }
278
279        Sequence sequence = getHead();
280        while (sequence != null && first <= last) {
281            if (sequence.contains(first)) {
282                first = sequence.last + 1;
283            } else {
284                if (first < sequence.first) {
285                    if (last < sequence.first) {
286                        rc.add(new Sequence(first, last));
287                        return rc;
288                    } else {
289                        rc.add(new Sequence(first, sequence.first - 1));
290                        first = sequence.last + 1;
291                    }
292                }
293            }
294            sequence = sequence.getNext();
295        }
296
297        if (first <= last) {
298            rc.add(new Sequence(first, last));
299        }
300        return rc;
301    }
302
303    /**
304     * @return all the Sequence that are in this list
305     */
306    public List<Sequence> getReceived() {
307        ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
308        Sequence sequence = getHead();
309        while (sequence != null) {
310            rc.add(new Sequence(sequence.first, sequence.last));
311            sequence = sequence.getNext();
312        }
313        return rc;
314    }
315
316    /**
317     * Returns true if the value given is contained within one of the
318     * sequences held in this set.
319     *
320     * @param value
321     *      The value to search for in the set.
322     *
323     * @return true if the value is contained in the set.
324     */
325    public boolean contains(long value) {
326        if (isEmpty()) {
327            return false;
328        }
329
330        Sequence sequence = getHead();
331        while (sequence != null) {
332            if (sequence.contains(value)) {
333                return true;
334            }
335            sequence = sequence.getNext();
336        }
337
338        return false;
339    }
340
341    public boolean contains(int first, int last) {
342        if (isEmpty()) {
343            return false;
344        }
345        Sequence sequence = getHead();
346        while (sequence != null) {
347            if (sequence.first <= first && first <= sequence.last ) {
348                return last <= sequence.last;
349            }
350            sequence = sequence.getNext();
351        }
352        return false;
353    }
354
355    public Sequence get(int value) {
356        if (!isEmpty()) {
357            Sequence sequence = getHead();
358            while (sequence != null) {
359                if (sequence.contains(value)) {
360                    return sequence;
361                }
362                sequence = sequence.getNext();
363            }
364        }
365        return null;
366    }
367
368
369    /**
370     * Computes the size of this Sequence by summing the values of all
371     * the contained sequences.
372     *
373     * @return the total number of values contained in this set if it
374     *         were to be iterated over like an array.
375     */
376    public long rangeSize() {
377        long result = 0;
378        Sequence sequence = getHead();
379        while (sequence != null) {
380            result += sequence.range();
381            sequence = sequence.getNext();
382        }
383        return result;
384    }
385
386    public Iterator<Long> iterator() {
387        return new SequenceIterator();
388    }
389
390    private class SequenceIterator implements Iterator<Long> {
391
392        private Sequence currentEntry;
393        private long lastReturned = -1;
394
395        public SequenceIterator() {
396            currentEntry = getHead();
397            if (currentEntry != null) {
398                lastReturned = currentEntry.first - 1;
399            }
400        }
401
402        public boolean hasNext() {
403            return currentEntry != null;
404        }
405
406        public Long next() {
407            if (currentEntry == null) {
408                throw new NoSuchElementException();
409            }
410
411            if(lastReturned < currentEntry.first) {
412                lastReturned = currentEntry.first;
413                if (currentEntry.range() == 1) {
414                    currentEntry = currentEntry.getNext();
415                }
416            } else {
417                lastReturned++;
418                if (lastReturned == currentEntry.last) {
419                    currentEntry = currentEntry.getNext();
420                }
421            }
422
423            return lastReturned;
424        }
425
426        public void remove() {
427            throw new UnsupportedOperationException();
428        }
429
430    }
431
432}