热卖商品
新闻详情
Spring与ActiveMQ的集成详解二 - Donald_Draper - ITeye博客
来自 : www.iteye.com/blog/2348...
发布时间:2021-03-24
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException Connection conToClose; Session sessionToClose; MessageConsumer consumerToClose; conToClose = null; sessionToClose = null; consumerToClose = null; Session sessionToUse; boolean transactional; MessageConsumer consumerToUse; Message message; boolean exposeResource; sessionToUse = session; transactional = false; if(sessionToUse == null) //获取事务会话 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(getConnectionFactory(), transactionalResourceFactory, true); transactional = sessionToUse != null; //如果事务会话为空,则 if(sessionToUse == null) Connection conToUse; if(sharedConnectionEnabled()) //获取共享连接 conToUse = getSharedConnection(); } else //创建连接 conToUse = createConnection(); conToClose = conToUse; conToUse.start(); //创建会话 sessionToUse = createSession(conToUse); sessionToClose = sessionToUse; consumerToUse = consumer; if(consumerToUse == null) //创建监听器消费者 consumerToUse = createListenerConsumer(sessionToUse); consumerToClose = consumerToUse; //通知消费者从未消费消息通道获取分发消息 message = receiveMessage(consumerToUse); if(message == null) break MISSING_BLOCK_LABEL_434; messageReceived(invoker, sessionToUse); //如果事务同步管理有对应的连接工厂资源 exposeResource = !transactional isExposeListenerSession() !TransactionSynchronizationManager.hasResource(getConnectionFactory()); if(exposeResource) //则绑定连接工厂,和资源Holder TransactionSynchronizationManager.bindResource(getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); //通知会话,消费消息 doExecuteListener(sessionToUse, message); Throwable ex; if(status != null) if(logger.isDebugEnabled()) logger.debug((new StringBuilder()).append(\"Rolling back transaction because of listener exception thrown: \").append(ex).toString()); status.setRollbackOnly();
创建监听器消费者
p
通知消费者从未消费消息通道获取分发消息
message = receiveMessage(consumerToUse);
//ActiveMQMessageConsumer
发送拉消息命令
从未消费消息通道分发消息
调用Invoker,实际为空,待拓展
通知会话,消费消息
总结:
DefaultMessageListenerContainer构造,主要初始化允许并发消费者的数量,每个任务最大消费消息等;然后初始化任务执行器,创建消息监听器Invoker,交由任务线程去执行;消息监听器Invoker主要做的工作,就是获取消息监听器,通知消费者从未消费消息通道获取分发消息,然后遍历消息监听器,有消监听器的onMessage方法消费消息。
附:Spring事务管理,这个以后有时间,研究一下,猜测思想就是事务同步管理器,根据ThreadLocal管理每个线程的事务管理和连接工厂及连接资源Holder映射关系,每一次连接事务的执行,先从事务同步管理器,获取连接资源Holder,如果有,获取事务的状态,没有则看一下需要事务管理,如果需要,则将连接工厂及连接资源Holder映射关系绑定到事务同步管理器;每一次事务提交都是从事务同步管理器获取连接工厂对应的连接资源Holder,从连接资源Holder获取连接,有连接提交事务或回滚。
创建监听器消费者
p
rotected MessageConsumer createListenerConsumer(Session session) throws JMSException Destination destination = getDestination(); if(destination == null) destination = resolveDestinationName(session, getDestinationName()); return createConsumer(session, destination);
通知消费者从未消费消息通道获取分发消息
message = receiveMessage(consumerToUse);
protected Message receiveMessage(MessageConsumer consumer) throws JMSException return receiveTimeout = 0L ? consumer.receive(receiveTimeout) : consumer.receive(); }
//ActiveMQMessageConsumer
public javax.jms.Message receive(long timeout) throws JMSException checkClosed(); checkMessageListener(); if(timeout == 0L) return receive(); //发送拉消息命令 sendPullCommand(timeout); if(timeout 0L) MessageDispatch md; if(info.getPrefetchSize() == 0) md = dequeue(-1L); else //从未消费消息通道分发消息 md = dequeue(timeout); if(md == null) return null; } else beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); } else return null; }
发送拉消息命令
protected void sendPullCommand(long timeout) throws JMSException clearDeliveredList(); if(info.getCurrentPrefetchSize() == 0 unconsumedMessages.isEmpty()) MessagePull messagePull = new MessagePull(); messagePull.configure(info); messagePull.setTimeout(timeout); session.asyncSendPacket(messagePull); }
从未消费消息通道分发消息
private MessageDispatch dequeue(long timeout) throws JMSException long deadline; deadline = 0L; if(timeout 0L) deadline = System.currentTimeMillis() + timeout; MessageDispatch md = unconsumedMessages.dequeue(timeout);}
调用Invoker,实际为空,待拓展
messageReceived(invoker, sessionToUse);
通知会话,消费消息
doExecuteListener(sessionToUse, message);
protected void doExecuteListener(Session session, Message message) throws JMSException //通知会话,消费消息 invokeListener(session, message);
protected void invokeListener(Session session, Message message) throws JMSException Object listener = getMessageListener(); if(listener instanceof MessageListener) //通知监听器,消费消息 doInvokeListener((MessageListener)listener, message); protected void doInvokeListener(MessageListener listener, Message message) throws JMSException //监听器消费消息 listener.onMessage(message); }
总结:
DefaultMessageListenerContainer构造,主要初始化允许并发消费者的数量,每个任务最大消费消息等;然后初始化任务执行器,创建消息监听器Invoker,交由任务线程去执行;消息监听器Invoker主要做的工作,就是获取消息监听器,通知消费者从未消费消息通道获取分发消息,然后遍历消息监听器,有消监听器的onMessage方法消费消息。
附:Spring事务管理,这个以后有时间,研究一下,猜测思想就是事务同步管理器,根据ThreadLocal管理每个线程的事务管理和连接工厂及连接资源Holder映射关系,每一次连接事务的执行,先从事务同步管理器,获取连接资源Holder,如果有,获取事务的状态,没有则看一下需要事务管理,如果需要,则将连接工厂及连接资源Holder映射关系绑定到事务同步管理器;每一次事务提交都是从事务同步管理器获取连接工厂对应的连接资源Holder,从连接资源Holder获取连接,有连接提交事务或回滚。
public interface TransactionStatus extends SavepointManager public abstract boolean isNewTransaction(); public abstract boolean hasSavepoint(); public abstract void setRollbackOnly(); public abstract boolean isRollbackOnly(); public abstract boolean isCompleted();}
public interface PlatformTransactionManager public abstract TransactionStatus getTransaction(TransactionDefinition transactiondefinition) throws TransactionException; public abstract void commit(TransactionStatus transactionstatus) throws TransactionException; public abstract void rollback(TransactionStatus transactionstatus) throws TransactionException;}
public class JmsTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean public JmsTransactionManager(ConnectionFactory connectionFactory) this(); setConnectionFactory(connectionFactory); afterPropertiesSet(); public void afterPropertiesSet() if(getConnectionFactory() == null) throw new IllegalArgumentException(\"Property \'connectionFactory\' is required\"); else return; protected Object doGetTransaction() JmsTransactionObject txObject = new JmsTransactionObject(); txObject.setResourceHolder((JmsResourceHolder)TransactionSynchronizationManager.getResource(getConnectionFactory())); return txObject; protected boolean isExistingTransaction(Object transaction) JmsTransactionObject txObject = (JmsTransactionObject)transaction; return txObject.getResourceHolder() != null; protected void doBegin(Object transaction, TransactionDefinition definition) if(definition.getIsolationLevel() != -1) throw new InvalidIsolationLevelException(\"JMS does not support an isolation level concept\"); JmsTransactionObject txObject = (JmsTransactionObject)transaction; Connection con = null; Session session = null; con = createConnection(); session = createSession(con); if(logger.isDebugEnabled()) logger.debug((new StringBuilder()).append(\"Created JMS transaction on Session [\").append(session).append(\"] from Connection [\").append(con).append(\"]\").toString()); txObject.setResourceHolder(new JmsResourceHolder(getConnectionFactory(), con, session)); txObject.getResourceHolder().setSynchronizedWithTransaction(true); int timeout = determineTimeout(definition); if(timeout != -1) txObject.getResourceHolder().setTimeoutInSeconds(timeout); TransactionSynchronizationManager.bindResource(getConnectionFactory(), txObject.getResourceHolder()); catch(Throwable ex) if(con != null) con.close(); catch(Throwable ex2) { } throw new CannotCreateTransactionException(\"Could not create JMS transaction\", ex); protected Object doSuspend(Object transaction) JmsTransactionObject txObject = (JmsTransactionObject)transaction; txObject.setResourceHolder(null); return TransactionSynchronizationManager.unbindResource(getConnectionFactory()); protected void doResume(Object transaction, Object suspendedResources) JmsResourceHolder conHolder = (JmsResourceHolder)suspendedResources; TransactionSynchronizationManager.bindResource(getConnectionFactory(), conHolder); protected void doCommit(DefaultTransactionStatus status) JmsTransactionObject txObject = (JmsTransactionObject)status.getTransaction(); Session session = txObject.getResourceHolder().getSession(); if(status.isDebug()) logger.debug((new StringBuilder()).append(\"Committing JMS transaction on Session [\").append(session).append(\"]\").toString()); session.commit(); catch(TransactionRolledBackException ex) throw new UnexpectedRollbackException(\"JMS transaction rolled back\", ex); catch(JMSException ex) throw new TransactionSystemException(\"Could not commit JMS transaction\", ex); protected void doRollback(DefaultTransactionStatus status) JmsTransactionObject txObject = (JmsTransactionObject)status.getTransaction(); Session session = txObject.getResourceHolder().getSession(); if(status.isDebug()) logger.debug((new StringBuilder()).append(\"Rolling back JMS transaction on Session [\").append(session).append(\"]\").toString()); session.rollback(); catch(JMSException ex) throw new TransactionSystemException(\"Could not roll back JMS transaction\", ex);}
本文链接: http://activeint.immuno-online.com/view-711486.html
发布于 : 2021-03-24
阅读(0)
最新动态
2021-03-24
2021-03-24
2021-03-24
2021-03-24
2021-03-24
2021-03-24
2021-03-24
2021-03-24
2021-03-24
2021-03-24
2021-03-24
2021-03-24
公司介绍
品牌分类
联络我们
服务热线:4000-520-616
(限工作日9:00-18:00)
QQ :1570468124
手机:18915418616