001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.IOException;
020import java.nio.channels.spi.AbstractSelectableChannel;
021import java.util.LinkedList;
022import java.util.concurrent.Executor;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.SynchronousQueue;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029/**
030 * The SelectorManager will manage one Selector and the thread that checks the
031 * selector.
032 *
033 * We may need to consider running more than one thread to check the selector if
034 * servicing the selector takes too long.
035 */
036public final class SelectorManager {
037
038    public static final SelectorManager SINGLETON = new SelectorManager();
039
040    private Executor selectorExecutor = createDefaultExecutor();
041    private Executor channelExecutor = selectorExecutor;
042    private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
043    private int maxChannelsPerWorker = 1024;
044
045    protected ExecutorService createDefaultExecutor() {
046        ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
047            new ThreadFactory() {
048
049                private long i = 0;
050
051                @Override
052                public Thread newThread(Runnable runnable) {
053                    Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + (i++));
054                    t.setDaemon(false);
055                    return t;
056                }
057            });
058
059        return rc;
060    }
061
062    private static int getDefaultCorePoolSize() {
063        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 0);
064    }
065
066    private static int getDefaultMaximumPoolSize() {
067        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", Integer.MAX_VALUE);
068    }
069
070    private static int getDefaultKeepAliveTime() {
071        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30);
072    }
073
074    public static SelectorManager getInstance() {
075        return SINGLETON;
076    }
077
078    public interface Listener {
079        void onSelect(SelectorSelection selector);
080
081        void onError(SelectorSelection selection, Throwable error);
082    }
083
084    public synchronized SelectorSelection register(AbstractSelectableChannel selectableChannel, Listener listener) throws IOException {
085        SelectorSelection selection = null;
086        while (selection == null) {
087            if (freeWorkers.size() > 0) {
088                SelectorWorker worker = freeWorkers.getFirst();
089                if (worker.isReleased()) {
090                    freeWorkers.remove(worker);
091                } else {
092                    worker.retain();
093                    selection = new SelectorSelection(worker, selectableChannel, listener);
094                }
095            } else {
096                // Worker starts /w retain count of 1
097                SelectorWorker worker = new SelectorWorker(this);
098                freeWorkers.addFirst(worker);
099                selection = new SelectorSelection(worker, selectableChannel, listener);
100            }
101        }
102
103        return selection;
104    }
105
106    synchronized void onWorkerFullEvent(SelectorWorker worker) {
107        freeWorkers.remove(worker);
108    }
109
110    public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
111        freeWorkers.remove(worker);
112    }
113
114    public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
115        freeWorkers.addFirst(worker);
116    }
117
118    public Executor getChannelExecutor() {
119        return channelExecutor;
120    }
121
122    public void setChannelExecutor(Executor channelExecutor) {
123        this.channelExecutor = channelExecutor;
124    }
125
126    public int getMaxChannelsPerWorker() {
127        return maxChannelsPerWorker;
128    }
129
130    public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
131        this.maxChannelsPerWorker = maxChannelsPerWorker;
132    }
133
134    public Executor getSelectorExecutor() {
135        return selectorExecutor;
136    }
137
138    public void setSelectorExecutor(Executor selectorExecutor) {
139        this.selectorExecutor = selectorExecutor;
140    }
141}