IBM WebSphere MQ Java Base编程模块。
TjuAILab Windshow
使用MQ的java编程比较简单。基本编程思想比较简单,大体如下:
(1) 初始化MQEnvironment。
(2) 打开队列管理器和队列。
(3) 使用MQPutMessageOptions和MQGetMessageOptions将MQMessage放入队列或者从队列中取出。
(4) 关闭队列和队列管理器。
此处我总结了一下MQ Java编程的基本模块。为了模块的独立性,我将很多共同的部分并没有拆分开,有很多重复的部分,而是直接将其封装到每一个函数中,使用时将每个函数拷贝到你的应用程序中就可以使用了。
在此程序中只对同一台机器上的队列进行了测试。
程序主要提供的函数如下:(如果功能尚不够,可以加入参数进行扩展)
/*向指定的主机指定的队列发送消息。*/
public void sender (String givenHostName, String givenChannel,
String givenQueueName, String givenQueueManagerName,
int givenPort)
/*从指定的主机指定的队列中取出消息*/
public void receiver (String givenHostName, String givenChannel,
String givenQueueName, String givenQueueManagerName,
int givenPort)
/*将请求信息加入到指定主机的指定队列,并将对此消息的应答消息发送到应答队列上*/
public void requester (String givenHostName, String givenChannel,
String givenRequestQueueName,
String givenRequestQueueManagerName,
String givenReplyQueueName,
String givenReplyQueueManagerName,
int givenPort)
/*对请求队列的消息进行处理*/
public void responder (String givenHostName, String givenChannel,
String givenRequestQueueName,
String givenRequestQueueManagerName,
int givenPort)
/*向指定的主机指定的队列发送组消息。*/
public void groupSender (String givenHostName, String givenChannel,
String givenQueueName, String givenQueueManagerName,
int givenPort)
/*从指定的主机指定的队列中取出组消息*/
public void groupReceiver (String givenHostName, String givenChannel,
String givenQueueName,
String givenQueueManagerName,
int givenPort)
源代码如下:
package mqjava ;
import com.ibm.mq.* ;
import java.io.IOException ;
/*
RC 2059 indicates that your Queue Manager is not available.
在cmd使用mqrc 2059 查看原因
This Damo:
crtmqm QM
strmqm QM
runmqsc QM(注意通道类型定义)
DEFINE CHANNEL(JAVA.CHANNEL) CHLTYPE(SVRCONN) TRPTYPE(TCP) REPLACE
DEFINE QLOCAL(QUEUELOCAL) REPLACE
DEFINE QLOCAL(QUEUEREQUEST) REPLACE
DEFINE QLOCAL(QUEUEREPLY) REPLACE
启动侦听器
*/
public class MQJavaApplication
{
/**
*
* @param givenHostName String
* @param givenChannel String
* @param givenQueueName String
* @param givenQueueManagerName String
* @param givenPort int
* 用于向指定主机的指定队列发送一条消息(String)
*/
public void sender (String givenHostName, String givenChannel,
String givenQueueName, String givenQueueManagerName,
int givenPort)
{
String hostName = givenHostName ;
String channel = givenChannel ;
String queueManagerName = givenQueueManagerName ;
String queueName = givenQueueName ;
int port = givenPort ;
MQQueueManager queueManager = null ;
MQQueue queue = null ;
try
{
/*MQEnvironment初始化*/
MQEnvironment.hostname = hostName ;
MQEnvironment.channel = channel ;
MQEnvironment.port = port ;
MQEnvironment.CCSID = 1381 ;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES) ;
/*连接到队列管理器*/
queueManager = new MQQueueManager(queueManagerName) ;
System.out.println("Access the queue manager [" + queueManagerName +
"] successfully") ;
/*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,我们也设置了选项去应对不成功的情况*/
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING ;
/*打开队列*/
queue = queueManager.accessQueue(queueName, openOptions) ;
/*设置放置消息选项,使用默认设置*/
MQPutMessageOptions pmo = new MQPutMessageOptions() ;
/*创建消息,MQMessage类包含实际消息数据的数据缓冲区和描述消息的所有MQMD参数*/
MQMessage outMsg = new MQMessage() ;
/*设置MQMD(Manager Queue Message Description)格式字段*/
outMsg.format = MQC.MQFMT_STRING ;
/*准备用户数据消息*/
String msgString = "Test Message from sender function" ;
outMsg.writeString(msgString) ;
/*队列上放置消息*/
queue.put(outMsg, pmo) ;
/*提交事务处理*/
queueManager.commit() ;
System.out.println("The message has been successfully put!") ;
}
catch (MQException ex)
{
System.out.println("An MQ Error Occured:Completion Code is :\t" +
ex.completionCode + "\n\n The reason code is:\t" +
ex.reasonCode) ;
ex.printStackTrace() ;
}
catch (IOException ioe)
{
ioe.printStackTrace() ;
}
finally
{
try
{
if (queue != null)
{
queue.close() ;
System.out.println("Close the queue:[" + queueName + "] successfully") ;
}
if (queueManager != null)
{
queueManager.close() ;
queueManager.disconnect() ;
System.out.println("Disconnect the queue manager:[" +
queueManagerName +
"] successfully") ;
}
}
catch (MQException mqe)
{
mqe.printStackTrace() ;
}
}
}
/**
*
* @param givenHostName String
* @param givenChannel String
* @param givenQueueName String
* @param givenQueueManagerName String
* @param givenPort int
* 用于从指定计算机的指定队列收取消息
*/
public void receiver (String givenHostName, String givenChannel,
String givenQueueName, String givenQueueManagerName,
int givenPort)
{
String hostName = givenHostName ;
String channel = givenChannel ;
String queueManagerName = givenQueueManagerName ;
String queueName = givenQueueName ;
int port = givenPort ;
MQQueueManager queueManager = null ;
MQQueue queue = null ;
try
{
/*MQEnvironment初始化*/
MQEnvironment.hostname = hostName ;
MQEnvironment.channel = channel ;
MQEnvironment.port = port ;
MQEnvironment.CCSID = 1381 ;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES) ;
/*连接到队列管理器*/
queueManager = new MQQueueManager(queueManagerName) ;
System.out.println("Access the queue manager:[" + queueManagerName +
"] successfully") ;
/*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,设置应对不成功的情况*/
int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING ;
/*打开队列*/
queue = queueManager.accessQueue(queueName, openOptions, null, null, null) ;
/*设置放置消息选项*/
MQGetMessageOptions gmo = new MQGetMessageOptions() ;
/*在同步点控制下获取消息*/
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT ;
/*如果在队列上没有消息则等待*/
gmo.options = gmo.options + MQC.MQGMO_WAIT ;
/*如果队列管理器停顿则失败*/
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING ;
/*设置等待时间间隔*/
gmo.waitInterval = 3000 ;
/*创建MQMessage类*/
MQMessage inMsg = new MQMessage() ;
/*从队列到消息缓冲区获取消息*/
queue.get(inMsg, gmo) ;
/*从消息读取用户数据*/
String msgString = inMsg.readString(inMsg.getMessageLength()) ;
System.out.println("The message from the Queue is :" + msgString) ;
/*提交事务*/
queueManager.commit() ;
}
catch (MQException ex)
{
System.out.println("An MQ Error Occured:Completion Code is :\t" +
ex.completionCode + "\n\n The reason code is:\t" +
ex.reasonCode) ;
ex.printStackTrace() ;
}
catch (IOException ioe)
{
ioe.printStackTrace() ;
}
finally
{
try
{
if (queue != null)
{
queue.close() ;
System.out.println("Close the queue:[" + queueName + "] successfully") ;
}
if (queueManager != null)
{
queueManager.close() ;
queueManager.disconnect() ;
System.out.println("Disconnect the queue manager:[" +
queueManagerName +
"] successfully") ;
}
}
catch (MQException mqe)
{
mqe.printStackTrace() ;
}
}
}
/**
*
* @param givenHostName String
* @param givenChannel String
* @param givenRequestQueueName String
* @param givenRequestQueueManagerName String
* @param givenReplyQueueName String
* @param givenReplyQueueManagerName String
* @param givenPort int
* 将请求信息加入到指定主机的指定队列,并将对此消息的应答消息发送到应答队列上
*/
public void requester (String givenHostName, String givenChannel,
String givenRequestQueueName,
String givenRequestQueueManagerName,
String givenReplyQueueName,
String givenReplyQueueManagerName,
int givenPort)
{
String hostName = givenHostName ;
String channel = givenChannel ;
String requestQueueName = givenRequestQueueName ;
String requestQueueManagerName = givenRequestQueueManagerName ;
String replyQueueName = givenReplyQueueName ;
String replyQueueManagerName = givenReplyQueueManagerName ;
int port = givenPort ;
MQQueueManager requestQueueManager = null ;
MQQueue requestQueue = null ;
MQQueueManager replyQueueManager = null ;
MQQueue replyQueue = null ;
try
{
/*设置MQEnvironment属性以便客户机连接*/
MQEnvironment.hostname = hostName ;
MQEnvironment.channel = channel ;
MQEnvironment.CCSID = 1381 ;
MQEnvironment.port = 1414 ;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES) ;
/*连接到队列管理器*/
requestQueueManager = new MQQueueManager(requestQueueManagerName) ;
System.out.println("Access the queue manager:[" + requestQueueManagerName +
"] successfully") ;
/*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,设置应对不成功的情况*/
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING ;
// 第三个参数default q manager
// 第四个参数no dynamic q name
// 第五个参数no alternate user id
requestQueue = requestQueueManager.accessQueue(requestQueueName,
openOptions, null, null, null) ;
/*设置放置消息选项,使用默认设置*/
MQPutMessageOptions pmo = new MQPutMessageOptions() ;
pmo.options = pmo.options + MQC.MQPMO_NEW_MSG_ID ;
pmo.options = pmo.options + MQC.MQPMO_SYNCPOINT ;
/*创建消息缓冲区*/
MQMessage outMsg = new MQMessage() ;
/*设置MQMD格式字段*/
outMsg.format = MQC.MQFMT_STRING ;
outMsg.messageFlags = MQC.MQMT_REQUEST ;
outMsg.replyToQueueName = replyQueueName ;
outMsg.replyToQueueManagerName = replyQueueManagerName ;
/*准备用户数据消息*/
String msgString = "Test Request Message from Requester program" ;
outMsg.writeString(msgString) ;
/*在队列上放置消息*/
requestQueue.put(outMsg, pmo) ;
/*提交事务*/
requestQueueManager.commit() ;
System.out.println("The Message has been successfully put") ;
}
catch (MQException ex)
{
System.out.println("An MQ Error Occured:Completion Code is :\t" +
ex.completionCode + "\n\n The reason code is:\t" +
ex.reasonCode) ;
ex.printStackTrace() ;
}
catch (IOException ioe)
{
ioe.printStackTrace() ;
}
finally
{
try
{
if (requestQueue != null)
{
requestQueue.close() ;
System.out.println("Close the queue:[" + requestQueueName +
"] successfully") ;
}
if (requestQueueManager != null)
{
requestQueueManager.close() ;
requestQueueManager.disconnect() ;
System.out.println("Disconnect the queue manager:[" +
requestQueueManagerName + "] successfully") ;
}
}
catch (MQException mqe)
{
mqe.printStackTrace() ;
}
}
}
/**
*
* @param givenHostName String
* @param givenChannel String
* @param givenRequestQueueName String
* @param givenRequestQueueManagerName String
* @param givenPort int
* 对于请求队列的请求消息进行处理
*/
public void responder (String givenHostName, String givenChannel,
String givenRequestQueueName,
String givenRequestQueueManagerName,
int givenPort)
{
String hostName = givenHostName ;
String channel = givenChannel ;
String requestQueueName = givenRequestQueueName ;
String requestQueueManagerName = givenRequestQueueManagerName ;
int port = givenPort ;
MQQueueManager requestQueueManager = null ;
MQQueue requestQueue = null ;
MQQueueManager replyQueueManager = null ;
MQQueue replyQueue = null ;
try
{
/*设置MQEnvironment属性以便客户机连接*/
MQEnvironment.hostname = hostName ;
MQEnvironment.channel = channel ;
MQEnvironment.CCSID = 1381 ;
MQEnvironment.port = 1414 ;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES) ;
/*连接到队列管理器*/
requestQueueManager = new MQQueueManager(requestQueueManagerName) ;
System.out.println("Access the queue manager:[" + requestQueueManagerName +
"] successfully") ;
/*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,设置应对不成功的情况*/
int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING ;
/*打开队列*/
requestQueue = requestQueueManager.accessQueue(request