1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.backsource.adaptor.jms;
21 import java.util.Date;
22 import javax.jms.*;
23
24 import javax.naming.Context;
25 import javax.naming.InitialContext;
26 import javax.naming.Name;
27 import javax.naming.NamingException;
28
29 import javax.management.ObjectName;
30 import javax.management.MBeanServer;
31 import javax.management.Notification;
32 import javax.management.NotificationBroadcaster;
33 import javax.management.NotificationFilter;
34 import javax.management.NotificationFilterSupport;
35 import javax.management.MalformedObjectNameException;
36
37 import org.jboss.jms.jndi.JMSProviderAdapter;
38
39 import org.backsource.jmx.ServiceMBeanSupport;
40 /***
41 *
42 * @author <a href="mailto:pra@tim.se">Peter Antman</a>
43 * @version $Revision: 1.1.1.1 $
44 * @jmx:mbean name="jms:service=MessageDriven,name=default" extends="org.backsource.adaptor.jms.JmsBaseMBean"
45 */
46
47 public class MessageDriven extends JmsBase
48 implements org.backsource.adaptor.jms.MessageDrivenMBean,MessageListener {
49
50
51
52 private boolean subscriptionDurability = false;
53 private String messageSelector;
54 private String subscriptionId;
55 private boolean pullSubscriber= false;
56 private long pullTimeput = 1000*2;
57
58
59 MessageConsumer subscriber;
60 ReceiverImpl receiver;
61
62 public MessageDriven (){
63
64 }
65
66 /***
67 * @jmx:managed-attribute
68 */
69 public boolean getDurable() {
70 return subscriptionDurability;
71 }
72 /***
73 * @jmx:managed-attribute
74 */
75 public void setDurable(boolean subscriptionDurability) {
76 this.subscriptionDurability = subscriptionDurability;
77 }
78 /***
79 * @jmx:managed-attribute
80 */
81 public String getMessageSelector()
82 {
83 return messageSelector;
84 }
85 /***
86 * @jmx:managed-attribute
87 */
88 public void setMessageSelector(String messageSelector) {
89 if ( messageSelector != null) {
90
91 int i = -1;
92
93 while ((i = messageSelector.indexOf("\r\n")) >= 0)
94 {
95
96 messageSelector = (i == 0 ? "" : messageSelector.substring(0, i)) + " " +
97 (i >= messageSelector.length() - 2 ? "" : messageSelector.substring(i + 2));
98 }
99 i = -1;
100
101 while (( i = messageSelector.indexOf("\r")) >= 0)
102 {
103
104 messageSelector = (i == 0 ? "" : messageSelector.substring(0, i)) + " " +
105 (i >= messageSelector.length() - 1 ? "" : messageSelector.substring(i + 1));
106 }
107 i = -1;
108
109 while ((i = messageSelector.indexOf("\n")) >= 0)
110 {
111
112 messageSelector = (i == 0 ? "" : messageSelector.substring(0, i)) + " " +
113 (i >= messageSelector.length() - 1 ? "" : messageSelector.substring(i + 1));
114 }
115
116
117
118 messageSelector = messageSelector.trim();
119 if ("".equals(messageSelector))
120 {
121 messageSelector = null;
122 }
123 }
124 this.messageSelector = messageSelector;
125
126 }
127
128 /***
129 * @jmx:managed-attribute
130 */
131 public String getSubscriptionId()
132 {
133 return subscriptionId;
134 }
135 /***
136 * @jmx:managed-attribute
137 */
138 public void setSubscriptionId(String subscriptionId) {
139 this.subscriptionId = subscriptionId;
140 }
141
142 /***
143 * @jmx:managed-attribute
144 */
145 public boolean getPullSubscription() {
146 return pullSubscriber;
147 }
148 /***
149 * @jmx:managed-attribute
150 */
151 public void setPullSubscription(boolean pullSubscriber) {
152 this.pullSubscriber = pullSubscriber;
153 }
154 /***
155 * @jmx:managed-attribute
156 */
157 public long getPullTimeout() {
158 return pullTimeput;
159 }
160 /***
161 * @jmx:managed-attribute
162 */
163 public void setPullTimeout(long pullTimeput) {
164 this.pullTimeput = pullTimeput;
165 }
166
167
168 protected void destroyService() throws Exception
169 {
170 if ( receiver !=null) {
171 receiver.stop();
172 receiver = null;
173 }
174
175
176 try
177 {
178 if (subscriber != null)
179 {
180 if ( stopOnFail) {
181 subscriber.close();
182 }
183
184 subscriber= null;
185 }
186 }
187 catch (Exception e)
188 {
189 log.error("Failed to close connection consumer: "+e);
190 }
191 super.destroyService();
192 }
193
194 public void onMessage(final Message message){
195 log.debug("Got message " + message);
196 Notification n = new Notification(null,
197 name,
198 new Date().getTime()
199 );
200 n.setUserData(message);
201 sendNotification(n);
202 }
203
204
205
206
207
208 protected void startTopic() throws JMSException {
209 super.startTopic();
210 TopicSession session = ((TopicConnection)connection).createTopicSession(
211
212 false,
213
214 Session.AUTO_ACKNOWLEDGE);
215
216 if (subscriptionDurability ) {
217 subscriber = session.createDurableSubscriber((Topic)destination, subscriptionId,messageSelector, false);
218 } else {
219 subscriber = session.createSubscriber((Topic)destination, messageSelector, false);
220 }
221
222 if ( !pullSubscriber) {
223 subscriber.setMessageListener(this);
224 } else {
225 receive(subscriber);
226 }
227
228 }
229
230 protected void startQueue() throws JMSException {
231 super.startQueue();
232
233 QueueSession session = ((QueueConnection)connection).createQueueSession(
234
235 false,
236
237 Session.AUTO_ACKNOWLEDGE);
238
239
240 subscriber = session.createReceiver((Queue)destination, messageSelector);
241
242
243 if ( !pullSubscriber) {
244 subscriber.setMessageListener(this);
245 } else {
246 receive(subscriber);
247 }
248 }
249
250
251 protected void receive( MessageConsumer consumer) throws JMSException {
252
253 receiver = new ReceiverImpl(consumer);
254 new Thread("JMSContainerInvoker Create Recovery Thread") {
255 public void run()
256 {
257
258 receiver.receive();
259 }
260 }.start();
261 }
262
263
264 class ReceiverImpl {
265 boolean running = true;
266 Thread currentThread;
267 MessageConsumer consumer;
268 public ReceiverImpl(MessageConsumer consumer) {
269 this.consumer = consumer;
270 }
271 public void receive() {
272 currentThread = Thread.currentThread();
273
274 log.debug("Starting receiving");
275 while ( running) {
276 try {
277 Message msg = consumer.receive(pullTimeput);
278
279 if ( msg != null) {
280 onMessage(msg);
281 }
282 } catch (JMSException e) {
283
284
285 if ( running ) {
286 log.error("Could no receive message: "+e,e);
287
288 running = false;
289 if (!failsafe) {
290
291 exListener.onException( new JMSException("Could not receive: " + e) );
292 }
293 }
294
295 }
296 }
297 currentThread = null;
298 }
299 void stop()
300 {
301 log.debug("Stop requested");
302
303 running = false;
304 if (currentThread != null)
305 {
306 currentThread.interrupt();
307 log.debug("Current thread interrupted");
308 }
309 }
310 }
311 }