001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command;
018
019import org.apache.activemq.ActiveMQConnectionFactory;
020import org.apache.activemq.command.ActiveMQDestination;
021import org.apache.activemq.util.ProducerThread;
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import javax.jms.Connection;
026import javax.jms.Session;
027import java.util.List;
028import java.util.concurrent.CountDownLatch;
029
030public class ProducerCommand extends AbstractCommand {
031    private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class);
032
033    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
034    String user = ActiveMQConnectionFactory.DEFAULT_USER;
035    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
036    String destination = "queue://TEST";
037    int messageCount = 1000;
038    int sleep = 0;
039    boolean persistent = true;
040    String message = null;
041    String payloadUrl = null;
042    int messageSize = 0;
043    int textMessageSize;
044    long msgTTL = 0L;
045    String msgGroupID=null;
046    int transactionBatchSize;
047    private int parallelThreads = 1;
048
049    @Override
050    protected void runTask(List<String> tokens) throws Exception {
051        LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")");
052        LOG.info("Producing messages to " + destination);
053        LOG.info("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
054        LOG.info("Sleeping between sends " + sleep + " ms");
055        LOG.info("Running " + parallelThreads + " parallel threads");
056
057        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
058        Connection conn = null;
059        try {
060            conn = factory.createConnection(user, password);
061            conn.start();
062
063            CountDownLatch active = new CountDownLatch(parallelThreads);
064
065            for (int i = 1; i <= parallelThreads; i++) {
066                Session sess;
067                if (transactionBatchSize != 0) {
068                    sess = conn.createSession(true, Session.SESSION_TRANSACTED);
069                } else {
070                    sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
071                }
072                ProducerThread producer = new ProducerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
073                producer.setName("producer-" + i);
074                producer.setMessageCount(messageCount);
075                producer.setSleep(sleep);
076                producer.setMsgTTL(msgTTL);
077                producer.setPersistent(persistent);
078                producer.setTransactionBatchSize(transactionBatchSize);
079                producer.setMessage(message);
080                producer.setPayloadUrl(payloadUrl);
081                producer.setMessageSize(messageSize);
082                producer.setMsgGroupID(msgGroupID);
083                producer.setTextMessageSize(textMessageSize);
084                producer.setFinished(active);
085                producer.start();
086            }
087
088            active.await();
089        } finally {
090            if (conn != null) {
091                conn.close();
092            }
093        }
094    }
095
096    public String getBrokerUrl() {
097        return brokerUrl;
098    }
099
100    public void setBrokerUrl(String brokerUrl) {
101        this.brokerUrl = brokerUrl;
102    }
103
104    public String getDestination() {
105        return destination;
106    }
107
108    public void setDestination(String destination) {
109        this.destination = destination;
110    }
111
112    public int getMessageCount() {
113        return messageCount;
114    }
115
116    public void setMessageCount(int messageCount) {
117        this.messageCount = messageCount;
118    }
119
120    public int getSleep() {
121        return sleep;
122    }
123
124    public void setSleep(int sleep) {
125        this.sleep = sleep;
126    }
127
128    public boolean isPersistent() {
129        return persistent;
130    }
131
132    public void setPersistent(boolean persistent) {
133        this.persistent = persistent;
134    }
135
136    public int getMessageSize() {
137        return messageSize;
138    }
139
140    public void setMessageSize(int messageSize) {
141        this.messageSize = messageSize;
142    }
143
144    public int getTextMessageSize() {
145        return textMessageSize;
146    }
147
148    public void setTextMessageSize(int textMessageSize) {
149        this.textMessageSize = textMessageSize;
150    }
151
152    public long getMsgTTL() {
153        return msgTTL;
154    }
155
156    public void setMsgTTL(long msgTTL) {
157        this.msgTTL = msgTTL;
158    }
159
160    public String getMsgGroupID() {
161        return msgGroupID;
162    }
163
164    public void setMsgGroupID(String msgGroupID) {
165        this.msgGroupID = msgGroupID;
166    }
167
168    public int getTransactionBatchSize() {
169        return transactionBatchSize;
170    }
171
172    public void setTransactionBatchSize(int transactionBatchSize) {
173        this.transactionBatchSize = transactionBatchSize;
174    }
175
176    public String getUser() {
177        return user;
178    }
179
180    public void setUser(String user) {
181        this.user = user;
182    }
183
184    public String getPassword() {
185        return password;
186    }
187
188    public void setPassword(String password) {
189        this.password = password;
190    }
191
192    public int getParallelThreads() {
193        return parallelThreads;
194    }
195
196    public void setParallelThreads(int parallelThreads) {
197        this.parallelThreads = parallelThreads;
198    }
199
200    public String getPayloadUrl() {
201        return payloadUrl;
202    }
203
204    public void setPayloadUrl(String payloadUrl) {
205        this.payloadUrl = payloadUrl;
206    }
207
208    public String getMessage() {
209        return message;
210    }
211
212    public void setMessage(String message) {
213        this.message = message;
214    }
215
216    @Override
217    protected void printHelp() {
218        printHelpFromFile();
219    }
220
221    @Override
222    public String getName() {
223        return "producer";
224    }
225
226    @Override
227    public String getOneLineDescription() {
228        return "Sends messages to the broker";
229    }
230}