001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.Destination;
027import org.apache.activemq.broker.region.IndirectMessageReference;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.QueueMessageReference;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032import org.apache.activemq.openwire.OpenWireFormat;
033import org.apache.activemq.store.PList;
034import org.apache.activemq.store.PListStore;
035import org.apache.activemq.store.PListEntry;
036import org.apache.activemq.usage.SystemUsage;
037import org.apache.activemq.usage.Usage;
038import org.apache.activemq.usage.UsageListener;
039import org.apache.activemq.wireformat.WireFormat;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.activemq.util.ByteSequence;
043
044/**
045 * persist pending messages pending message (messages awaiting dispatch to a
046 * consumer) cursor
047 *
048 *
049 */
050public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
051    static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
052    private static final AtomicLong NAME_COUNT = new AtomicLong();
053    protected Broker broker;
054    private final PListStore store;
055    private final String name;
056    private PendingList memoryList;
057    private PList diskList;
058    private Iterator<MessageReference> iter;
059    private Destination regionDestination;
060    private boolean iterating;
061    private boolean flushRequired;
062    private final AtomicBoolean started = new AtomicBoolean();
063    private final WireFormat wireFormat = new OpenWireFormat();
064    /**
065     * @param broker
066     * @param name
067     * @param prioritizedMessages
068     */
069    public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
070        super(prioritizedMessages);
071        if (this.prioritizedMessages) {
072            this.memoryList = new PrioritizedPendingList();
073        } else {
074            this.memoryList = new OrderedPendingList();
075        }
076        this.broker = broker;
077        // the store can be null if the BrokerService has persistence
078        // turned off
079        this.store = broker.getTempDataStore();
080        this.name = NAME_COUNT.incrementAndGet() + "_" + name;
081    }
082
083    @Override
084    public void start() throws Exception {
085        if (started.compareAndSet(false, true)) {
086            if( this.broker != null) {
087                wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion());
088            }
089            super.start();
090            if (systemUsage != null) {
091                systemUsage.getMemoryUsage().addUsageListener(this);
092            }
093        }
094    }
095
096    @Override
097    public void stop() throws Exception {
098        if (started.compareAndSet(true, false)) {
099            super.stop();
100            if (systemUsage != null) {
101                systemUsage.getMemoryUsage().removeUsageListener(this);
102            }
103        }
104    }
105
106    /**
107     * @return true if there are no pending messages
108     */
109    @Override
110    public synchronized boolean isEmpty() {
111        if (memoryList.isEmpty() && isDiskListEmpty()) {
112            return true;
113        }
114        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
115            MessageReference node = iterator.next();
116            if (node == QueueMessageReference.NULL_MESSAGE) {
117                continue;
118            }
119            if (!node.isDropped()) {
120                return false;
121            }
122            // We can remove dropped references.
123            iterator.remove();
124        }
125        return isDiskListEmpty();
126    }
127
128    /**
129     * reset the cursor
130     */
131    @Override
132    public synchronized void reset() {
133        iterating = true;
134        last = null;
135        if (isDiskListEmpty()) {
136            this.iter = this.memoryList.iterator();
137        } else {
138            this.iter = new DiskIterator();
139        }
140    }
141
142    @Override
143    public synchronized void release() {
144        iterating = false;
145        if (iter instanceof DiskIterator) {
146           ((DiskIterator)iter).release();
147        };
148        if (flushRequired) {
149            flushRequired = false;
150            if (!hasSpace()) {
151                flushToDisk();
152            }
153        }
154        // ensure any memory ref is released
155        iter = null;
156    }
157
158    @Override
159    public synchronized void destroy() throws Exception {
160        stop();
161        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
162            MessageReference node = i.next();
163            node.decrementReferenceCount();
164        }
165        memoryList.clear();
166        destroyDiskList();
167    }
168
169    private void destroyDiskList() throws Exception {
170        if (diskList != null) {
171            store.removePList(name);
172            diskList = null;
173        }
174    }
175
176    @Override
177    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
178        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
179        int count = 0;
180        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
181            MessageReference ref = i.next();
182            ref.incrementReferenceCount();
183            result.add(ref);
184            count++;
185        }
186        if (count < maxItems && !isDiskListEmpty()) {
187            for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
188                Message message = (Message) i.next();
189                message.setRegionDestination(regionDestination);
190                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
191                message.incrementReferenceCount();
192                result.add(message);
193                count++;
194            }
195        }
196        return result;
197    }
198
199    /**
200     * add message to await dispatch
201     *
202     * @param node
203     * @throws Exception
204     */
205    @Override
206    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
207        if (!node.isExpired()) {
208            try {
209                regionDestination = (Destination) node.getMessage().getRegionDestination();
210                if (isDiskListEmpty()) {
211                    if (hasSpace() || this.store == null) {
212                        memoryList.addMessageLast(node);
213                        node.incrementReferenceCount();
214                        setCacheEnabled(true);
215                        return true;
216                    }
217                }
218                if (!hasSpace()) {
219                    if (isDiskListEmpty()) {
220                        expireOldMessages();
221                        if (hasSpace()) {
222                            memoryList.addMessageLast(node);
223                            node.incrementReferenceCount();
224                            return true;
225                        } else {
226                            flushToDisk();
227                        }
228                    }
229                }
230                if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
231                    ByteSequence bs = getByteSequence(node.getMessage());
232                    getDiskList().addLast(node.getMessageId().toString(), bs);
233                    return true;
234                }
235                return false;
236
237            } catch (Exception e) {
238                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
239                throw new RuntimeException(e);
240            }
241        } else {
242            discardExpiredMessage(node);
243        }
244        //message expired
245        return true;
246    }
247
248    /**
249     * add message to await dispatch
250     *
251     * @param node
252     */
253    @Override
254    public synchronized void addMessageFirst(MessageReference node) {
255        if (!node.isExpired()) {
256            try {
257                regionDestination = (Destination) node.getMessage().getRegionDestination();
258                if (isDiskListEmpty()) {
259                    if (hasSpace()) {
260                        memoryList.addMessageFirst(node);
261                        node.incrementReferenceCount();
262                        setCacheEnabled(true);
263                        return;
264                    }
265                }
266                if (!hasSpace()) {
267                    if (isDiskListEmpty()) {
268                        expireOldMessages();
269                        if (hasSpace()) {
270                            memoryList.addMessageFirst(node);
271                            node.incrementReferenceCount();
272                            return;
273                        } else {
274                            flushToDisk();
275                        }
276                    }
277                }
278                systemUsage.getTempUsage().waitForSpace();
279                node.decrementReferenceCount();
280                ByteSequence bs = getByteSequence(node.getMessage());
281                Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs);
282                node.getMessageId().setPlistLocator(locator);
283
284            } catch (Exception e) {
285                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
286                throw new RuntimeException(e);
287            }
288        } else {
289            discardExpiredMessage(node);
290        }
291    }
292
293    /**
294     * @return true if there pending messages to dispatch
295     */
296    @Override
297    public synchronized boolean hasNext() {
298        return iter.hasNext();
299    }
300
301    /**
302     * @return the next pending message
303     */
304    @Override
305    public synchronized MessageReference next() {
306        MessageReference reference = iter.next();
307        last = reference;
308        if (!isDiskListEmpty()) {
309            // got from disk
310            reference.getMessage().setRegionDestination(regionDestination);
311            reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
312        }
313        reference.incrementReferenceCount();
314        return reference;
315    }
316
317    /**
318     * remove the message at the cursor position
319     */
320    @Override
321    public synchronized void remove() {
322        iter.remove();
323        if (last != null) {
324            last.decrementReferenceCount();
325        }
326    }
327
328    /**
329     * @param node
330     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
331     */
332    @Override
333    public synchronized void remove(MessageReference node) {
334        if (memoryList.remove(node) != null) {
335            node.decrementReferenceCount();
336        }
337        if (!isDiskListEmpty()) {
338            try {
339                getDiskList().remove(node.getMessageId().getPlistLocator());
340            } catch (IOException e) {
341                throw new RuntimeException(e);
342            }
343        }
344    }
345
346    /**
347     * @return the number of pending messages
348     */
349    @Override
350    public synchronized int size() {
351        return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
352    }
353
354    @Override
355    public synchronized long messageSize() {
356        return memoryList.messageSize() + (isDiskListEmpty() ? 0 : (int)getDiskList().messageSize());
357    }
358
359    /**
360     * clear all pending messages
361     */
362    @Override
363    public synchronized void clear() {
364        memoryList.clear();
365        if (!isDiskListEmpty()) {
366            try {
367                getDiskList().destroy();
368            } catch (IOException e) {
369                throw new RuntimeException(e);
370            }
371        }
372        last = null;
373    }
374
375    @Override
376    public synchronized boolean isFull() {
377
378        return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
379
380    }
381
382    @Override
383    public boolean hasMessagesBufferedToDeliver() {
384        return !isEmpty();
385    }
386
387    @Override
388    public void setSystemUsage(SystemUsage usageManager) {
389        super.setSystemUsage(usageManager);
390    }
391
392    @Override
393    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
394        if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
395            synchronized (this) {
396                if (!flushRequired && size() != 0) {
397                    flushRequired =true;
398                    if (!iterating) {
399                        expireOldMessages();
400                        if (!hasSpace()) {
401                            flushToDisk();
402                            flushRequired = false;
403                        }
404                    }
405                }
406            }
407        }
408    }
409
410    @Override
411    public boolean isTransient() {
412        return true;
413    }
414
415    protected synchronized void expireOldMessages() {
416        if (!memoryList.isEmpty()) {
417            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
418                MessageReference node = iterator.next();
419                if (node.isExpired()) {
420                    node.decrementReferenceCount();
421                    discardExpiredMessage(node);
422                    iterator.remove();
423                }
424            }
425        }
426    }
427
428    protected synchronized void flushToDisk() {
429        if (!memoryList.isEmpty() && store != null) {
430            long start = 0;
431             if (LOG.isTraceEnabled()) {
432                start = System.currentTimeMillis();
433                LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
434             }
435            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
436                MessageReference node = iterator.next();
437                node.decrementReferenceCount();
438                ByteSequence bs;
439                try {
440                    bs = getByteSequence(node.getMessage());
441                    getDiskList().addLast(node.getMessageId().toString(), bs);
442                } catch (IOException e) {
443                    LOG.error("Failed to write to disk list", e);
444                    throw new RuntimeException(e);
445                }
446
447            }
448            memoryList.clear();
449            setCacheEnabled(false);
450            LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
451        }
452    }
453
454    protected boolean isDiskListEmpty() {
455        return diskList == null || diskList.isEmpty();
456    }
457
458    public PList getDiskList() {
459        if (diskList == null) {
460            try {
461                diskList = store.getPList(name);
462            } catch (Exception e) {
463                LOG.error("Caught an IO Exception getting the DiskList {}", name, e);
464                throw new RuntimeException(e);
465            }
466        }
467        return diskList;
468    }
469
470    private void discardExpiredMessage(MessageReference reference) {
471        LOG.debug("Discarding expired message {}", reference);
472        if (broker.isExpired(reference)) {
473            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
474            context.setBroker(broker);
475            ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
476        }
477    }
478
479    protected ByteSequence getByteSequence(Message message) throws IOException {
480        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
481        return new ByteSequence(packet.data, packet.offset, packet.length);
482    }
483
484    protected Message getMessage(ByteSequence bs) throws IOException {
485        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
486                .getOffset(), bs.getLength());
487        return (Message) this.wireFormat.unmarshal(packet);
488
489    }
490
491    final class DiskIterator implements Iterator<MessageReference> {
492        private final PList.PListIterator iterator;
493        DiskIterator() {
494            try {
495                iterator = getDiskList().iterator();
496            } catch (Exception e) {
497                throw new RuntimeException(e);
498            }
499        }
500
501        @Override
502        public boolean hasNext() {
503            return iterator.hasNext();
504        }
505
506        @Override
507        public MessageReference next() {
508            try {
509                PListEntry entry = iterator.next();
510                Message message = getMessage(entry.getByteSequence());
511                message.getMessageId().setPlistLocator(entry.getLocator());
512                return message;
513            } catch (IOException e) {
514                LOG.error("I/O error", e);
515                throw new RuntimeException(e);
516            }
517        }
518
519        @Override
520        public void remove() {
521            iterator.remove();
522        }
523
524        public void release() {
525            iterator.release();
526        }
527    }
528}