1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.backsource.adaptor.jms;
21 import java.util.Enumeration;
22 import javax.jms.*;
23 import javax.management.ObjectName;
24 import org.backsource.adaptor.jms.MessageProducer;
25 import org.backsource.adaptor.jms.Listener;
26 /***
27 * Simple first cut on non transactional bride.
28 *
29 * @author <a href="mailto:pra@tim.se">Peter Antman</a>
30 * @version $Revision: 1.1.1.1 $
31 * @jmx:mbean name="jms:service=JmsBridge,name=MyBridge" extends="org.backsource.adaptor.jms.ListenerMBean"
32 */
33
34 public class Bridge extends Listener implements org.backsource.adaptor.jms.BridgeMBean {
35 private ObjectName publisherMBean;
36 private int bufferSize = 32 * 1024;
37 public Bridge(){
38
39 }
40 /***
41 * @jmx:managed-attribute
42 */
43 public void setPublisherMBean(ObjectName publisherMBean) {
44 this.publisherMBean = publisherMBean;
45 }
46 /***
47 * @jmx:managed-attribute
48 */
49 public ObjectName getPublisherMBean() {
50 return publisherMBean;
51 }
52
53 protected MessageProducer getPublisher() throws Exception {
54
55 if ( publisherMBean != null) {
56 return (MessageProducer)server.getAttribute(publisherMBean, "Producer");
57 } else {
58 throw new Exception("No publisherMBean to invoke");
59 }
60 }
61
62
63
64 public void onMessage(Message message) {
65
66 MessageProducer publisher = null;
67 try {
68 publisher = getPublisher();
69 Message m = createOutputMessage(publisher.getSession(),message);
70 publisher.send(m);
71
72 } catch (Throwable e) {
73 log.error("Could no handle message "+ message+ ": " +e,e);
74 } finally {
75 if ( publisher != null) {
76 try {
77 publisher.close();
78 } catch (Exception e) {
79
80 }
81
82 }
83
84
85 }
86
87
88 }
89
90
91 /***
92 * Factory method to create an output message given an input message.
93 * Derived classes could override this method to perform any kind of
94 * Message transformation.
95 */
96 protected Message createOutputMessage(Session session,Message inputMessage) throws JMSException {
97 Message outputMessage = null;
98
99 if ( inputMessage instanceof TextMessage ) {
100 outputMessage = createOutputTextMessage(session, (TextMessage) inputMessage );
101 }
102 else if ( inputMessage instanceof ObjectMessage ) {
103 outputMessage = createOutputObjectMessage(session, (ObjectMessage) inputMessage );
104 }
105 else if ( inputMessage instanceof MapMessage ) {
106 outputMessage = createOutputMapMessage(session, (MapMessage) inputMessage );
107 }
108 else if ( inputMessage instanceof BytesMessage ) {
109 outputMessage = createOutputBytesMessage(session, (BytesMessage) inputMessage );
110 }
111 else if ( inputMessage instanceof StreamMessage ) {
112 outputMessage = createOutputStreamMessage(session, (StreamMessage) inputMessage );
113 }
114 else {
115 outputMessage = session.createMessage();
116 }
117
118 processMessageHeaders(inputMessage, outputMessage);
119
120 return outputMessage;
121 }
122
123 /***
124 * Factory method to create ObjectMessage
125 * Derived classes could override this method to perform any kind of
126 * Message transformation.
127 */
128 protected ObjectMessage createOutputObjectMessage(Session session,ObjectMessage inputMessage) throws JMSException {
129 return session.createObjectMessage( inputMessage.getObject() );
130 }
131
132 /***
133 * Factory method to create TextMessage
134 * Derived classes could override this method to perform any kind of
135 * Message transformation.
136 */
137 protected TextMessage createOutputTextMessage(Session session,TextMessage inputMessage) throws JMSException {
138 return session.createTextMessage( inputMessage.getText() );
139 }
140
141 /***
142 * Factory method to create MapMessage
143 * Derived classes could override this method to perform any kind of
144 * Message transformation.
145 */
146 protected MapMessage createOutputMapMessage(Session session,MapMessage inputMessage) throws JMSException {
147 MapMessage answer = session.createMapMessage();
148
149
150 for ( Enumeration enum = inputMessage.getMapNames(); enum.hasMoreElements(); ) {
151 String name = (String) enum.nextElement();
152 Object value = inputMessage.getObject( name );
153 answer.setObject( name, value );
154 }
155 return answer;
156 }
157
158 /***
159 * Factory method to create BytesMessage
160 * Derived classes could override this method to perform any kind of
161 * Message transformation.
162 */
163 protected BytesMessage createOutputBytesMessage(Session session,BytesMessage inputMessage) throws JMSException {
164 BytesMessage answer = session.createBytesMessage();
165
166
167 byte[] buffer = new byte[bufferSize];
168 while (true ) {
169 int size = inputMessage.readBytes( buffer );
170 if ( size <= 0 ) {
171 break;
172 }
173 answer.writeBytes( buffer, 0, size );
174 if ( size < bufferSize ) {
175 break;
176 }
177 }
178 return answer;
179 }
180
181 /***
182 * Factory method to create StreamMessage
183 * Derived classes could override this method to perform any kind of
184 * Message transformation.
185 */
186 protected StreamMessage createOutputStreamMessage(Session session,StreamMessage inputMessage) throws JMSException {
187 StreamMessage answer = session.createStreamMessage();
188
189
190 byte[] buffer = new byte[bufferSize];
191 while (true ) {
192 int size = inputMessage.readBytes( buffer );
193 if ( size <= 0 ) {
194 break;
195 }
196 answer.writeBytes( buffer, 0, size );
197 if ( size < bufferSize ) {
198 break;
199 }
200 }
201 return answer;
202 }
203
204
205
206 /***
207 * Strategy method to add any headers required on the output message.
208 * Derived classes could override this method to perform any kind of
209 * header processing, such as copying the correlation ID, copying all
210 * headers or adding some new custom headers etc.
211 */
212 protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
213
214 for(Enumeration en = inputMessage.getPropertyNames();en.hasMoreElements();)
215 {
216 String key = (String) en.nextElement();
217 outputMessage.setStringProperty(key,inputMessage.getStringProperty(key));
218
219 }
220
221
222 outputMessage.setJMSCorrelationID(inputMessage.getJMSCorrelationID() );
223 outputMessage.setJMSDeliveryMode( inputMessage.getJMSDeliveryMode() );
224 outputMessage.setJMSExpiration(inputMessage.getJMSExpiration() );
225 outputMessage.setJMSPriority( inputMessage.getJMSPriority() );
226 outputMessage.setJMSReplyTo( inputMessage.getJMSReplyTo() );
227
228 }
229 }