View Javadoc

1   /*
2    * Copyright (c) 2002 Peter Antman, Teknik i Media  <peter.antman@tim.se>
3    *
4    * $Id: Bridge.java,v 1.1.1.1 2004/05/19 12:26:42 pra Exp $
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version
10   * 
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   * 
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, write to the Free Software
18   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19   */
20  package org.backsource.adaptor.jms;
21  import java.util.Enumeration;
22  import javax.jms.*;
23  import javax.management.ObjectName;
24  import org.backsource.adaptor.jms.MessageProducer;
25  import org.backsource.adaptor.jms.Listener;
26  /***
27   * Simple first cut on non transactional bride.
28   *
29   * @author <a href="mailto:pra@tim.se">Peter Antman</a>
30   * @version $Revision: 1.1.1.1 $
31   * @jmx:mbean name="jms:service=JmsBridge,name=MyBridge" extends="org.backsource.adaptor.jms.ListenerMBean"
32   */
33  
34  public class Bridge extends Listener implements org.backsource.adaptor.jms.BridgeMBean {
35     private ObjectName publisherMBean;
36     private int bufferSize = 32 * 1024;
37     public Bridge(){
38        
39     }
40     /***
41      * @jmx:managed-attribute
42      */
43     public void setPublisherMBean(ObjectName publisherMBean) {
44        this.publisherMBean = publisherMBean;
45     }
46     /***
47      * @jmx:managed-attribute
48      */
49     public ObjectName getPublisherMBean() {
50        return publisherMBean;
51     }
52     
53     protected  MessageProducer getPublisher() throws Exception {
54        // Do we have a publisher MBean?
55        if ( publisherMBean != null) {
56           return (MessageProducer)server.getAttribute(publisherMBean, "Producer");
57        } else {
58           throw new Exception("No publisherMBean to invoke");
59        } // end of else
60     }
61  
62        
63        
64     public void onMessage(Message message) {
65        // Resend a message back.
66         MessageProducer publisher = null;
67        try {
68           publisher = getPublisher();
69           Message m = createOutputMessage(publisher.getSession(),message);
70           publisher.send(m);
71           
72        } catch (Throwable e) {
73           log.error("Could no handle message "+ message+ ": " +e,e);
74        } finally {
75           if ( publisher != null) {
76              try {
77                 publisher.close();
78              } catch (Exception e) {
79                 //Nothing to do
80              } // end of try-catch
81  
82           } 
83           // end of if ()
84           
85        } // end of finally
86        
87        //
88     }
89  
90  
91     /***
92      * Factory method to create an output message given an input message.
93      * Derived classes could override this method to perform any kind of 
94      * Message transformation.
95      */
96     protected Message createOutputMessage(Session session,Message inputMessage) throws JMSException {
97        Message outputMessage = null;
98          
99        if ( inputMessage instanceof TextMessage ) {
100          outputMessage = createOutputTextMessage(session, (TextMessage) inputMessage );
101       }
102       else if ( inputMessage instanceof ObjectMessage ) {
103          outputMessage = createOutputObjectMessage(session, (ObjectMessage) inputMessage );
104       }
105       else if ( inputMessage instanceof MapMessage ) {
106          outputMessage = createOutputMapMessage(session, (MapMessage) inputMessage );
107       }
108       else if ( inputMessage instanceof BytesMessage ) {
109          outputMessage = createOutputBytesMessage(session, (BytesMessage) inputMessage );
110       }
111       else if ( inputMessage instanceof StreamMessage ) {
112          outputMessage = createOutputStreamMessage(session, (StreamMessage) inputMessage );
113       }
114       else {
115          outputMessage = session.createMessage();
116       }
117         
118       processMessageHeaders(inputMessage, outputMessage);
119         
120       return outputMessage;
121    }
122         
123    /***
124     * Factory method to create ObjectMessage 
125     * Derived classes could override this method to perform any kind of 
126     * Message transformation.
127     */
128    protected ObjectMessage createOutputObjectMessage(Session session,ObjectMessage inputMessage) throws JMSException {
129       return session.createObjectMessage( inputMessage.getObject() );
130    }
131     
132    /***
133     * Factory method to create TextMessage 
134     * Derived classes could override this method to perform any kind of 
135     * Message transformation.
136     */
137    protected TextMessage createOutputTextMessage(Session session,TextMessage inputMessage) throws JMSException {
138       return session.createTextMessage( inputMessage.getText() );
139    }
140     
141    /***
142     * Factory method to create MapMessage 
143     * Derived classes could override this method to perform any kind of 
144     * Message transformation.
145     */
146    protected MapMessage createOutputMapMessage(Session session,MapMessage inputMessage) throws JMSException {
147       MapMessage answer = session.createMapMessage();
148         
149       // copy across all values
150       for ( Enumeration enum = inputMessage.getMapNames(); enum.hasMoreElements(); ) {
151          String name = (String) enum.nextElement();
152          Object value = inputMessage.getObject( name );
153          answer.setObject( name, value );
154       }
155       return answer;
156    }
157     
158    /***
159     * Factory method to create BytesMessage 
160     * Derived classes could override this method to perform any kind of 
161     * Message transformation.
162     */
163    protected BytesMessage createOutputBytesMessage(Session session,BytesMessage inputMessage) throws JMSException {
164       BytesMessage answer = session.createBytesMessage();
165         
166       // copy across all data
167       byte[] buffer = new byte[bufferSize];
168       while (true ) {
169          int size = inputMessage.readBytes( buffer );
170          if ( size <= 0 ) {
171             break;
172          }
173          answer.writeBytes( buffer, 0, size );
174          if ( size < bufferSize ) {
175             break;
176          }
177       }
178       return answer;
179    }
180     
181    /***
182     * Factory method to create StreamMessage 
183     * Derived classes could override this method to perform any kind of 
184     * Message transformation.
185     */
186    protected StreamMessage createOutputStreamMessage(Session session,StreamMessage inputMessage) throws JMSException {
187       StreamMessage answer = session.createStreamMessage();
188         
189       // copy across all data
190       byte[] buffer = new byte[bufferSize];
191       while (true ) {
192          int size = inputMessage.readBytes( buffer );
193          if ( size <= 0 ) {
194             break;
195          }
196          answer.writeBytes( buffer, 0, size );
197          if ( size < bufferSize ) {
198             break;
199          }
200       }
201       return answer;
202    }
203     
204     
205     
206    /***
207     * Strategy method to add any headers required on the output message.
208     * Derived classes could override this method to perform any kind of 
209     * header processing, such as copying the correlation ID, copying all
210     * headers or adding some new custom headers etc.
211     */
212    protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
213       // Set headers
214       for(Enumeration en = inputMessage.getPropertyNames();en.hasMoreElements();)
215          {
216             String key = (String) en.nextElement();
217             outputMessage.setStringProperty(key,inputMessage.getStringProperty(key));
218             
219          }
220       // set jms stuff
221       
222       outputMessage.setJMSCorrelationID(inputMessage.getJMSCorrelationID() );
223       outputMessage.setJMSDeliveryMode( inputMessage.getJMSDeliveryMode() );
224       outputMessage.setJMSExpiration(inputMessage.getJMSExpiration() );
225       outputMessage.setJMSPriority( inputMessage.getJMSPriority() );
226       outputMessage.setJMSReplyTo( inputMessage.getJMSReplyTo() );
227       
228    }
229 }// Bridge