与spring结合

在Spring中嵌入ActiveMQ有四种方式:纯Spring配置文件、在Spring的配置文件中引入ActiveMQ的配置文件、使用硬代码及ActiveMQ配置文件和在Spring配置文件中使用特定的schema。

[b]纯Spring配置文件[/b]

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<bean id="admins" class="org.apache.activemq.security.AuthenticationUser">
<constructor-arg index="0" value="admin" />
<constructor-arg index="1" value="password" />
<constructor-arg index="2" value="admins,publishers,consumers" />
</bean>
<bean id="publishers" class="org.apache.activemq.security.AuthenticationUser">
<constructor-arg index="0" value="publisher" />
<constructor-arg index="1" value="password" />
<constructor-arg index="2" value="publishers,consumers" />
</bean>
<bean id="consumers" class="org.apache.activemq.security.AuthenticationUser">
<constructor-arg index="0" value="consumer" />
<constructor-arg index="1" value="password" />
<constructor-arg index="2" value="consumers" />
</bean>
<bean id="guests" class="org.apache.activemq.security.AuthenticationUser">
<constructor-arg index="0" value="guest" />
<constructor-arg index="1" value="password" />
<constructor-arg index="2" value="guests" />
</bean>
<bean id="simpleAuthPlugin" class="org.apache.activemq.security.SimpleAuthenticationPlugin">
<property name="users">
<list>
<ref bean="admins" />
<ref bean="publishers" />
<ref bean="consumers" />
<ref bean="guests" />
</list>
</property>
</bean>
<bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
<property name="brokerName" value="myBroker" />
<property name="persistent" value="false" />
<property name="transportConnectorURIs">
<list>
<value>tcp://localhost:61616</value>
</list>
</property>
<property name="plugins">
<list>
<ref bean="simpleAuthPlugin" />
</list>
</property>
</bean>
</beans>

[b]在Spring的配置文件中引入ActiveMQ的配置文件[/b]

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<!-- 其他Spring配置 -->

<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="activemq.xml" />
<property name="start" value="true"></property>
</bean>
</beans>

[b]硬编码+activemq配置文件方式[/b]

public static void main(String[] args) throws Exception {  
String configUri = MyConfiguration3.class.getResource("/")
+ "activemq.xml";
System.setProperty("activemq.base", System.getProperty("user.dir"));
new FileSystemXmlApplicationContext(configUri);
}

[b]使用特定的Schema[/b]

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"
<span style="color:#ff0000;">xmlns:amq="http://activemq.apache.org/schema/core"</span>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
<span style="color:#ff0000;">http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd</span>
">

<!-- 其他Spring配置 -->

<amq:broker brokerName="myBroker" dataDirectory="${activemq.base}/data">
<amq:transportConnectors>
<amq:transportConnector name="openwire" uri="tcp://localhost:61616" />
</amq:transportConnectors>
<amq:plugins>
<amq:simpleAuthenticationPlugin>
<amq:users>
<amq:authenticationUser username="admin" password="password" groups="admins,publishers,consumers" />
<amq:authenticationUser username="publisher" password="password" groups="publishers,consumers" />
<amq:authenticationUser username="consumer" password="password" groups="consumers" />
<amq:authenticationUser username="guest" password="password" groups="guests" />
</amq:users>
</amq:simpleAuthenticationPlugin>
</amq:plugins>
</amq:broker>
</beans>

[b]客户端配置[/b]

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="userName" value="admin" />
<property name="password" value="password" />
</bean>

可以使用连接池

<bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

[b]配置JMS消息目的地[/b]

<bean id="cscoDest" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="STOCKS.CSCO" />
</bean>

<bean id="orclDest" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="STOCKS.ORCL" />
</bean>

[b]消息消费者和消息生产者[/b]
Spring中接收消息的基本抽象机制是使用消息监听器容器(MLC: 参阅 http://mng.bz/LJti).
MLC是在消息监听器和代理之间设计的中间件,用来处理连接,线程等等,让开发者只需要关注
消息监听器中具体的业务逻辑.

<!-- 这个监听器实现需要自己编写 -->
<bean id="portfolioListener" class="org.apache.activemq.book.ch3.portfolio.Listener">
</bean>

<!-- Spring DMLC -->
<bean id="cscoConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="cscoDest" />
<property name="messageListener" ref="portfolioListener" />
</bean>

<!-- Spring DMLC -->
<bean id="orclConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="orclDest" />
<property name="messageListener" ref="portfolioListener" />
</bean>

同接收消息一样,Spring同样为发送消息提供了便利.Spring中发送消息的基本机制是使用JmsTemplate类.
JmsTemplate使用标准的模板模式为发送消息提供一个很方便的类.

使用Spring发送消息的一个常用方法是实现Spring的MessageCreator接口然后使用JmsTemplate类中合适
的send()方法

public class StockMessageCreator implements MessageCreator 
{
private int MAX_DELTA_PERCENT = 1;
private Map<Destination, Double> LAST_PRICES = new Hashtable<Destination, Double>();
Destination stock;

public StockMessageCreator(Destination stock)
{
this.stock = stock;
}

public Message createMessage(Session session) throws JMSException
{
Double value = LAST_PRICES.get(stock);
if (value == null)
{
value = new Double(Math.random() * 100);
}

// lets mutate the value by some percentage
double oldPrice = value.doubleValue();
value = new Double(mutatePrice(oldPrice));
LAST_PRICES.put(stock, value);
double price = value.doubleValue();
double offer = price * 1.001;
boolean up = (price > oldPrice);
MapMessage message = session.createMapMessage();
message.setString("stock", stock.toString());
message.setDouble("price", price);
message.setDouble("offer", offer);
message.setBoolean("up", up);
System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + stock);
return message;
}

protected double mutatePrice(double price)
{
double percentChange = (2 * Math.random() * MAX_DELTA_PERCENT) - MAX_DELTA_PERCENT;
return price * (100 + percentChange) / 100;
}
}
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledJmsConnectionFactory" />
</bean>

注意JmsTemplate使用了一个池化的连接工厂.这点很重要,因为JmsTemplate是专门设计成使用
Java EE容器的,而Java EE规范要求提供池化连接.每次调用JmsTemplate的send()方法都会创建
和销毁JMS资源(连接,消费者和生产者).因此,如果你没有使用Java EE容器,请确保在利用
JmsTemplate发送消息时使用的是池化的连接工厂.

发表评论

电子邮件地址不会被公开。 必填项已用*标注

昵称 *