1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.backsource.adaptor.jms;
21 import javax.jms.QueueConnection;
22 import javax.jms.TopicConnection;
23 import javax.jms.Session;
24 import javax.jms.QueueSession;
25 import javax.jms.TopicSession;
26 import javax.jms.QueueSender;
27 import javax.jms.TopicPublisher;
28 import javax.jms.Message;
29 import javax.jms.TextMessage;
30 import javax.jms.Destination;
31 import javax.jms.Queue;
32 import javax.jms.Topic;
33 import javax.jms.JMSException;
34
35 import org.backsource.jmx.MBeanProxyFactory;
36 /***
37 * A JMS producer.
38 *
39 * <p>This class holds a connection and specific destination. Users get a MessageProducer from it. Currently this will create a session each time the getProducer is called, but it should always be left back when used, since failover handling will not work correct othervise! if given a ProducerJNDI an Mbean proxy will be bound at that name that implements {@link ProducerFactory}.</p>
40 *
41 * @author <a href="mailto:pra@tim.se">Peter Antman</a>
42 * @version $Revision: 1.1.1.1 $
43 * @jmx:mbean name="jms:service=JmsProducer,name=default" extends="org.backsource.adaptor.jms.JmsBaseMBean,org.backsource.adaptor.jms.ProducerFactory"
44 */
45
46 public class Producer extends JmsBase implements org.backsource.adaptor.jms.ProducerMBean {
47 protected String producerJNDI;
48 protected ProducerFactory prodProxy;
49
50 public Producer (){
51
52 }
53 /***
54 * Set a JNDI name to bind an MBean proxy to this MBean that implements {@link ProducerFactory}.
55 *<p>if not set a startup, binding will not be done.</p>
56 * @jmx:managed-attribute
57 */
58 public void setProducerJNDI(String producerJNDI) {
59 this.producerJNDI = producerJNDI;
60 }
61 /***
62 * @jmx:managed-attribute
63 */
64 public String getProducerJNDI() {
65 return producerJNDI;
66 }
67 /***
68 * Get a MessageProducer.
69 *
70 * <p>Currently a new session will be created each time this is called. Even if this is the case; the recommended usage pattern is to get it at each invokation and close it; since pooling may be introduced.</p>
71 *
72 * @jmx:managed-attribute
73 */
74 public MessageProducer getProducer() throws JMSException {
75 if ( connectionException) {
76 throw new JMSException("Connection exception: no connection to provider");
77 }
78 if ( connection != null) {
79 Session sess = null;
80 if ( isTopic) {
81 sess = ((TopicConnection)connection).createTopicSession(transacted,
82 ackMode);
83 } else {
84 sess = ((QueueConnection)connection).createQueueSession(transacted,
85 ackMode);
86 }
87 return new MessageProducerImpl(this,sess,destination,isTopic);
88 } else {
89 throw new JMSException("No connection");
90 }
91
92
93
94
95 }
96 /***
97 * Used by MessageProcucer to leave itself back to the factory.
98 *
99 * @jmx:managed-operation
100 */
101 public void leaveProducer(MessageProducer prod) {
102 try {
103
104 if ( prod != null) {
105 Session s = prod.getSession();
106 if ( s != null) {
107 s.close();
108 }
109
110 }
111
112 } catch ( Throwable e) {
113 log.error("Could not take back producer: " +e,e);
114 }
115
116 }
117 /***
118 * Publish message as a text message.
119 *
120 * @jmx:managed-operation
121 */
122 public void publish(String message) throws JMSException {
123 MessageProducer p = getProducer();
124 Session s = p.getSession();
125 TextMessage m = s.createTextMessage(message);
126 log.debug("Sending message: " + message);
127 p.send(m);
128 p.close();
129 }
130
131
132
133 protected void startService() throws Exception {
134 super.startService();
135 if (producerJNDI != null ) {
136 log.info("Binding producer proxy into jndi: " + producerJNDI);
137 prodProxy = (ProducerFactory)MBeanProxyFactory.create(ProducerFactory.class,
138 name.toString());
139 MBeanProxyFactory.rebind(producerJNDI, prodProxy);
140 }
141
142 }
143
144 protected void stopService() throws Exception {
145 if ( prodProxy != null) {
146 MBeanProxyFactory.unbind(producerJNDI);
147 prodProxy = null;
148 }
149 super.stopService();
150 }
151
152 /***
153 * An extended message procuder.
154 *
155 * <p>This class implements the jms 1.1 interface for 1.0.2 providers. It also ads the possibility to get the session; to have something to create messages from.</p>
156 */
157 static class MessageProducerImpl implements MessageProducer {
158 boolean isTopic = true;
159 Session session;
160 Producer factory;
161 javax.jms.MessageProducer producer;
162 Destination dest;
163
164 MessageProducerImpl(Producer factory,Session session, Destination dest,boolean isTopic) throws JMSException{
165 this.factory = factory;
166 this.session = session;
167 this.isTopic = isTopic;
168 this.dest = dest;
169
170 if ( isTopic) {
171 producer = ((TopicSession)session).createPublisher((Topic)dest);
172 } else {
173 producer = ((QueueSession)session).createSender((Queue)dest);
174 }
175
176 }
177
178
179 public void setDisableMessageID(boolean value) throws JMSException {
180 producer.setDisableMessageID(value);
181 }
182
183 public boolean getDisableMessageID() throws JMSException {
184 return producer.getDisableMessageID();
185 }
186
187 public void setDisableMessageTimestamp(boolean value) throws JMSException {
188 producer.setDisableMessageTimestamp(value);
189 }
190
191 public boolean getDisableMessageTimestamp() throws JMSException {
192 return producer.getDisableMessageTimestamp();
193 }
194
195 public void setDeliveryMode(int deliveryMode) throws JMSException {
196 producer.setDeliveryMode(deliveryMode);
197 }
198
199 public int getDeliveryMode() throws JMSException {
200 return producer.getDeliveryMode();
201 }
202
203 public void setPriority(int defaultPriorityFromLow0High9) throws JMSException {
204 producer.setPriority(defaultPriorityFromLow0High9);
205 }
206
207 public int getPriority() throws JMSException {
208 return producer.getPriority();
209 }
210
211 public void setTimeToLive(long timeToLiveInMilliseconds) throws JMSException {
212 producer.setTimeToLive(timeToLiveInMilliseconds);
213 }
214
215 public long getTimeToLive() throws JMSException {
216 return producer.getTimeToLive();
217 }
218
219 public void close() throws JMSException {
220 producer.close();
221 factory.leaveProducer(this);
222 }
223
224 public void send(Message message) throws JMSException {
225 if ( isTopic) {
226 ((TopicPublisher)producer).publish(message);
227 } else {
228 ((QueueSender)producer).send(message);
229 }
230
231
232 }
233
234 public void send(
235 Message message,
236 int deliveryMode,
237 int priority,
238 long timeToLive
239 )
240 throws JMSException {
241 if ( isTopic) {
242 ((TopicPublisher)producer).publish(message,deliveryMode,priority,timeToLive);
243 } else {
244 ((QueueSender)producer).send(message,deliveryMode,priority,timeToLive);
245 }
246
247 }
248
249 public void send(Destination destination, Message message) throws JMSException {
250 if ( isTopic) {
251 ((TopicPublisher)producer).publish((Topic)destination,message);
252 } else {
253 ((QueueSender)producer).send((Queue)destination,message);
254 }
255
256
257 }
258 public void send(
259 Destination destination,
260 Message message,
261 int deliveryMode,
262 int priority,
263 long timeToLive
264 )
265 throws JMSException {
266 if ( isTopic) {
267 ((TopicPublisher)producer).publish((Topic)destination,message,deliveryMode,priority,timeToLive);
268 } else {
269 ((QueueSender)producer).send((Queue)destination,message,deliveryMode,priority,timeToLive);
270 }
271 }
272
273 /***
274 * Get the underlying session; please:only use it to create new messages!
275 */
276 public Session getSession() {
277 return session;
278 }
279 }
280 }