1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.backsource.adaptor.jms;
21 import javax.jms.*;
22
23 import javax.naming.Context;
24 import javax.naming.InitialContext;
25 import javax.naming.Name;
26 import javax.naming.NamingException;
27
28 import javax.management.ObjectName;
29 import javax.management.MBeanServer;
30 import javax.management.MalformedObjectNameException;
31
32 import org.jboss.jms.jndi.JMSProviderAdapter;
33
34 import org.backsource.jmx.ServiceMBeanSupport;
35 import org.backsource.jmx.ObjectNameFactory;
36 /***
37 * Base MBean for JMS MBeans.
38 *
39 * <p>This base class handles the common attributes and the setup of the connection and destination. One instance of an MBean of this type handles only one connection and one destination.</p>
40 *
41 * @author <a href="mailto:pra@tim.se">Peter Antman</a>
42 * @version $Revision: 1.2 $
43 * @jmx:mbean name="jms:service=Jms,name=default" extends="org.backsource.jmx.ServiceMBean"
44 */
45
46 public class JmsBase extends ServiceMBeanSupport
47 implements org.backsource.adaptor.jms.JmsBaseMBean{
48 public static final ObjectName OBJECT_NAME = ObjectNameFactory.create("jms:service=Jms,name=default");
49
50 protected ObjectName name = OBJECT_NAME;
51 protected MBeanServer server;
52
53
54 private String providerAdapterJNDI;
55 private String destinationFactoryJNDI;
56 private String destinationType;
57 private String destinationJndiName;
58 private String user;
59 private String passwd;
60 private String clientId;
61
62 private long reconnectInterval=1000*5;
63
64 protected boolean failsafe = true;
65 protected boolean stopOnFail = true;
66 protected boolean isTopic = true;
67 protected boolean transacted = false;
68 protected int ackMode = Session.AUTO_ACKNOWLEDGE;
69
70 protected Connection connection;
71 protected Destination destination;
72 protected ExceptionListenerImpl exListener;
73 volatile protected boolean connectionException = false;
74
75 public JmsBase (){
76
77 }
78 protected ObjectName getObjectName(MBeanServer server, ObjectName name)
79 throws MalformedObjectNameException
80 {
81 this.server = server;
82 if ( name != null) {
83 this.name = name;
84 }
85
86 return this.name;
87 }
88
89 /***
90 * @jmx:managed-attribute
91 */
92 public String getJMSProviderAdapterJNDI() {
93 return providerAdapterJNDI;
94 }
95 /***
96 * @jmx:managed-attribute
97 */
98 public void setJMSProviderAdapterJNDI(String providerAdapterJNDI) {
99 if ( providerAdapterJNDI != null) {
100 if (!providerAdapterJNDI.startsWith("java:/"))
101 {
102 providerAdapterJNDI = "java:/" + providerAdapterJNDI;
103 }
104 }
105 this.providerAdapterJNDI = providerAdapterJNDI;
106 }
107 /***
108 * @jmx:managed-attribute
109 */
110 public String getDestinationFactory() {
111 return destinationFactoryJNDI;
112 }
113 /***
114 * @jmx:managed-attribute
115 */
116 public void setDestinationFactory(String destinationFactoryJNDI) {
117 this.destinationFactoryJNDI = destinationFactoryJNDI;
118 }
119 /***
120 * @jmx:managed-attribute
121 */
122 public String getDestinationType()
123 {
124 return destinationType;
125 }
126 /***
127 * @jmx:managed-attribute
128 */
129 public void setDestinationType(String destinationType) {
130 if ( destinationType != null) {
131 isTopic = "javax.jms.topic".equals(destinationType.toLowerCase());
132 }
133
134 this.destinationType = destinationType;
135 }
136 /***
137 * @jmx:managed-attribute
138 */
139 public String getDestinationJndiName()
140 {
141 return destinationJndiName;
142 }
143 /***
144 * @jmx:managed-attribute
145 */
146 public void setDestinationJndiName(String destinationJndiName) {
147 this.destinationJndiName = destinationJndiName;
148 }
149 /***
150 * @jmx:managed-attribute
151 */
152 public String getUsername()
153 {
154 return user;
155 }
156 /***
157 * @jmx:managed-attribute
158 */
159 public void setUsername(String user) {
160 this.user = user;
161 }
162 /***
163 * @jmx:managed-attribute
164 */
165 public String getPassword()
166 {
167 return passwd;
168 }
169 /***
170 * @jmx:managed-attribute
171 */
172 public void setPassword(String passwd) {
173 this.passwd = passwd;
174 }
175 /***
176 * @jmx:managed-attribute
177 */
178 public String getClientId()
179 {
180 return clientId;
181 }
182 /***
183 * @jmx:managed-attribute
184 */
185 public void setClientId(String clientId) {
186 this.clientId = clientId;
187 }
188 /***
189 * @jmx:managed-attribute
190 */
191 public boolean getFailsafe() {
192 return failsafe;
193 }
194 /***
195 * @jmx:managed-attribute
196 */
197 public void setFailsafe(boolean failsafe) {
198 this.failsafe = failsafe;
199 }
200 /***
201 * @jmx:managed-attribute
202 */
203 public boolean getStopOnFail() {
204 return stopOnFail;
205 }
206 /***
207 * @jmx:managed-attribute
208 */
209 public void setStopOnFail(boolean stopOnFail) {
210 this.stopOnFail = stopOnFail;
211 }
212
213 /***
214 * @jmx:managed-attribute
215 */
216 public long getReconnectInterval() {
217 return reconnectInterval;
218 }
219 /***
220 * @jmx:managed-attribute
221 */
222 public void setReconnectInterval(long reconnectInterval) {
223 this.reconnectInterval = reconnectInterval;
224 }
225
226
227 protected void createService() throws Exception
228 {
229 exListener = new ExceptionListenerImpl(this);
230
231 }
232
233 protected void startService() throws Exception {
234
235 innerCreate();
236 if (connection != null)
237 {
238 if ( failsafe) {
239 connection.setExceptionListener(exListener);
240 }
241
242
243 connection.start();
244 }
245 }
246
247 protected void stopService() throws Exception
248 {
249 log.info("Stopping JMS service");
250
251 if (exListener != null)
252 {
253 exListener.stop();
254 }
255
256 innerStop();
257
258 }
259
260 protected void destroyService() throws Exception
261 {
262
263 if (connection != null)
264 {
265 try
266 {
267 log.info("Closing down connection");
268
269 if ( connectionException && !stopOnFail) {
270 log.warn("ConnectionException and stop on fail is false; skip closing connection");
271 }else {
272 connection.close();
273 }
274
275 connection = null;
276
277 }
278 catch (Exception e)
279 {
280 log.error("Failed to close connection: "+e);
281 }
282 }
283 }
284
285
286 protected void innerCreate() throws JMSException {
287 if ( isTopic) {
288 startTopic();
289 } else {
290 startQueue();
291 }
292 connectionException = false;
293 }
294 /***
295 * Stop done from inside, we should not stop the
296 * exceptionListener in inner stop.
297 */
298 protected void innerStop() throws JMSException
299 {
300 if (connection != null)
301 {
302 connection.setExceptionListener(null);
303 log.debug("Unset exception listener");
304 if ( connectionException && !stopOnFail) {
305 log.warn("ConnectionException and stop on fail is false; skip closing connection");
306 }else {
307 try {
308 connection.stop();
309 } catch (Exception e) {
310 log.error("Could not stop connection: " +e);
311 }
312
313 }
314
315 log.debug("Connection stopped");
316 }
317 }
318 protected void startTopic() throws JMSException {
319 TopicConnectionFactory factory = (TopicConnectionFactory)getFactory();
320 TopicConnection myconnection ;
321 if (user == null) {
322 myconnection =
323 factory.createTopicConnection();
324 }
325 else {
326 myconnection =factory.createTopicConnection(user, passwd);
327 }
328 connection = myconnection;
329 log.debug("Using connection: " + connection);
330
331 log.debug("Using client id: " + clientId);
332 if (clientId != null && clientId.length() > 0)
333 connection.setClientID(clientId);
334
335 destination = getDestination();
336
337 }
338
339 protected void startQueue() throws JMSException {
340 QueueConnectionFactory factory = (QueueConnectionFactory)getFactory();
341 QueueConnection myconnection;
342 if (user == null) {
343 myconnection =
344 factory.createQueueConnection();
345 }
346 else {
347 myconnection =factory.createQueueConnection(user, passwd);
348 }
349 connection = myconnection;
350 log.debug("Using connection: " + connection);
351
352 log.debug("Using client id: " + clientId);
353 if (clientId != null && clientId.length() > 0)
354 connection.setClientID(clientId);
355
356 destination = (Queue)getDestination();
357
358 }
359
360
361 protected ConnectionFactory getFactory() throws JMSException {
362 try {
363
364 String factoryRef = null;
365 if ( providerAdapterJNDI != null) {
366 JMSProviderAdapter adapter = getJMSProviderAdapter();
367 if ( isTopic) {
368 factoryRef = adapter.getTopicFactoryRef();
369 } else {
370 factoryRef = adapter.getQueueFactoryRef();
371 }
372
373
374 } else if (destinationFactoryJNDI != null ) {
375 factoryRef = destinationFactoryJNDI;
376 } else {
377
378 throw new JMSException("No factory name specifyed");
379 }
380
381 return (ConnectionFactory)getContext().lookup(factoryRef);
382
383 } catch (NamingException e) {
384 throw new JMSException("Could no lookup connection factory: " +e);
385 }
386
387 }
388
389 protected Destination getDestination() throws JMSException {
390
391 try {
392
393 return (Destination)getContext().lookup(destinationJndiName);
394 } catch (NamingException e) {
395 throw new JMSException("Could no lookup connection destination: " +e);
396 }
397
398
399 }
400
401 protected Context getContext() throws JMSException {
402 try {
403 if ( providerAdapterJNDI != null) {
404 return getJMSProviderAdapter().getInitialContext();
405 } else {
406 return new InitialContext();
407 }
408 } catch ( NamingException e) {
409 throw new JMSException("Could get an initial context: " + e);
410 }
411
412 }
413
414 /***
415 * Return the JMSProviderAdapter that should be used.
416 *
417 * @return The JMSProviderAdapter to use.
418 */
419 protected JMSProviderAdapter getJMSProviderAdapter()
420 throws JMSException
421 {
422 Context context = null;
423 try
424 {
425 context = new InitialContext();
426 log.debug("Looking up provider adapter: " + providerAdapterJNDI);
427 return (JMSProviderAdapter)context.lookup(providerAdapterJNDI);
428 }
429 catch ( NamingException e) {
430 throw new JMSException("Could no lookup provider adapeter: " +e);
431 }
432
433 finally
434 {
435 if ( context != null) {
436 try {
437 context.close();
438 } catch (NamingException e) {
439
440 }
441 }
442
443 }
444 }
445
446 /***
447 * ExceptionListener for failover handling.
448 */
449 class ExceptionListenerImpl
450 implements ExceptionListener
451 {
452 JmsBase invoker;
453 Thread currentThread;
454 volatile boolean running = true;
455
456 ExceptionListenerImpl(final JmsBase invoker)
457 {
458 this.invoker = invoker;
459 }
460
461 public void onException(JMSException ex)
462 {
463 connectionException = true;
464 currentThread = Thread.currentThread();
465
466 log.warn("JMS provider failure detected: ", ex);
467 boolean restartInvoker = true;
468
469 while (restartInvoker && running)
470 {
471 log.info("Trying to reconnect to JMS provider");
472 try
473 {
474 try
475 {
476 Thread.sleep(reconnectInterval);
477 }
478 catch (InterruptedException ie)
479 {
480 return;
481 }
482
483
484 invoker.innerStop();
485 invoker.destroyService();
486 invoker.innerCreate();
487 invoker.startService();
488
489
490 restartInvoker = false;
491 connectionException = false;
492 log.info("Reconnected to JMS provider");
493 }
494 catch (Exception e)
495 {
496 log.error("Reconnect failed: JMS provider failure detected", e);
497 }
498 }
499
500 currentThread = null;
501 }
502
503 void stop()
504 {
505 log.debug("Stop requested");
506
507 running = false;
508 if (currentThread != null)
509 {
510 currentThread.interrupt();
511 log.debug("Current thread interrupted");
512 }
513 }
514 }
515
516 }