小小笔记

  DonewsBlog  |  Donews首页  |  Donews社区  |  Donews邮箱  |  我的首页  |  联系作者  |  聚合   |  登录
  129篇文章 :: 0篇收藏:: 123篇评论:: 1个Trackbacks

公告

hoho

文章

收藏

相册

c/c++链接

e书下载

存档


正在读取评论……


IBM WebSphere MQ Java Base编程模块。

TjuAILab  Windshow

使用MQjava编程比较简单。基本编程思想比较简单,大体如下:

(1)       初始化MQEnvironment

(2)       打开队列管理器和队列。

(3)       使用MQPutMessageOptionsMQGetMessageOptionsMQMessage放入队列或者从队列中取出。

(4)       关闭队列和队列管理器。

此处我总结了一下MQ Java编程的基本模块。为了模块的独立性,我将很多共同的部分并没有拆分开,有很多重复的部分,而是直接将其封装到每一个函数中,使用时将每个函数拷贝到你的应用程序中就可以使用了。

在此程序中只对同一台机器上的队列进行了测试。

程序主要提供的函数如下:(如果功能尚不够,可以加入参数进行扩展)

/*向指定的主机指定的队列发送消息。*/

public void sender (String givenHostName, String givenChannel,

                      String givenQueueName, String givenQueueManagerName,

int givenPort)

/*从指定的主机指定的队列中取出消息*/

public void receiver (String givenHostName, String givenChannel,

                        String givenQueueName, String givenQueueManagerName,

                        int givenPort)

/*将请求信息加入到指定主机的指定队列,并将对此消息的应答消息发送到应答队列上*/

public void requester (String givenHostName, String givenChannel,

                         String givenRequestQueueName,

                         String givenRequestQueueManagerName,

                         String givenReplyQueueName,

                         String givenReplyQueueManagerName,

                         int givenPort)

/*对请求队列的消息进行处理*/

public void responder (String givenHostName, String givenChannel,

                         String givenRequestQueueName,

                         String givenRequestQueueManagerName,

                         int givenPort)

/*向指定的主机指定的队列发送组消息。*/

public void groupSender (String givenHostName, String givenChannel,

                           String givenQueueName, String givenQueueManagerName,

                           int givenPort)

 

/*从指定的主机指定的队列中取出组消息*/

public void groupReceiver (String givenHostName, String givenChannel,

                             String givenQueueName,

                             String givenQueueManagerName,

int givenPort)

源代码如下:

package mqjava ;

 

import com.ibm.mq.* ;

import java.io.IOException ;

 

/*

 RC 2059 indicates that your Queue Manager is not available.

 cmd使用mqrc 2059 查看原因

 This Damo:

 crtmqm QM

 strmqm QM

 runmqsc QM(注意通道类型定义)

       DEFINE CHANNEL(JAVA.CHANNEL) CHLTYPE(SVRCONN) TRPTYPE(TCP) REPLACE

       DEFINE QLOCAL(QUEUELOCAL) REPLACE

       DEFINE QLOCAL(QUEUEREQUEST) REPLACE

       DEFINE QLOCAL(QUEUEREPLY)   REPLACE

 启动侦听器

 */

public class MQJavaApplication

{

 

  /**

   *

   * @param givenHostName String

   * @param givenChannel String

   * @param givenQueueName String

   * @param givenQueueManagerName String

   * @param givenPort int

   * 用于向指定主机的指定队列发送一条消息(String

   */

  public void sender (String givenHostName, String givenChannel,

                      String givenQueueName, String givenQueueManagerName,

                      int givenPort)

  {

    String hostName = givenHostName ;

    String channel = givenChannel ;

    String queueManagerName = givenQueueManagerName ;

    String queueName = givenQueueName ;

    int port = givenPort ;

    MQQueueManager queueManager = null ;

    MQQueue queue = null ;

 

    try

    {

      /*MQEnvironment初始化*/

      MQEnvironment.hostname = hostName ;

      MQEnvironment.channel = channel ;

      MQEnvironment.port = port ;

      MQEnvironment.CCSID = 1381 ;

      MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,

                                   MQC.TRANSPORT_MQSERIES) ;

 

      /*连接到队列管理器*/

      queueManager = new MQQueueManager(queueManagerName) ;

      System.out.println("Access the queue manager [" + queueManagerName +

                         "] successfully") ;

 

      /*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,我们也设置了选项去应对不成功的情况*/

      int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING ;

      /*打开队列*/

      queue = queueManager.accessQueue(queueName, openOptions) ;

      /*设置放置消息选项,使用默认设置*/

      MQPutMessageOptions pmo = new MQPutMessageOptions() ;

      /*创建消息,MQMessage类包含实际消息数据的数据缓冲区和描述消息的所有MQMD参数*/

      MQMessage outMsg = new MQMessage() ;

      /*设置MQMDManager Queue Message Description)格式字段*/

      outMsg.format = MQC.MQFMT_STRING ;

      /*准备用户数据消息*/

      String msgString = "Test Message from sender function" ;

      outMsg.writeString(msgString) ;

      /*队列上放置消息*/

      queue.put(outMsg, pmo) ;

 

      /*提交事务处理*/

      queueManager.commit() ;

 

      System.out.println("The message has been successfully put!") ;

    }

    catch (MQException ex)

    {

      System.out.println("An MQ Error Occured:Completion Code is :\t" +

                         ex.completionCode + "\n\n The reason code is:\t" +

                         ex.reasonCode) ;

      ex.printStackTrace() ;

    }

    catch (IOException ioe)

    {

      ioe.printStackTrace() ;

    }

    finally

    {

      try

      {

        if (queue != null)

        {

          queue.close() ;

          System.out.println("Close the queue:[" + queueName + "] successfully") ;

        }

        if (queueManager != null)

        {

          queueManager.close() ;

          queueManager.disconnect() ;

          System.out.println("Disconnect the queue manager:[" +

                             queueManagerName +

                             "] successfully") ;

        }

      }

      catch (MQException mqe)

      {

        mqe.printStackTrace() ;

      }

 

    }

 

  }

 

  /**

   *

   * @param givenHostName String

   * @param givenChannel String

   * @param givenQueueName String

   * @param givenQueueManagerName String

   * @param givenPort int

   * 用于从指定计算机的指定队列收取消息

   */

  public void receiver (String givenHostName, String givenChannel,

                        String givenQueueName, String givenQueueManagerName,

                        int givenPort)

  {

    String hostName = givenHostName ;

    String channel = givenChannel ;

    String queueManagerName = givenQueueManagerName ;

    String queueName = givenQueueName ;

    int port = givenPort ;

    MQQueueManager queueManager = null ;

    MQQueue queue = null ;

 

    try

    {

      /*MQEnvironment初始化*/

      MQEnvironment.hostname = hostName ;

      MQEnvironment.channel = channel ;

      MQEnvironment.port = port ;

      MQEnvironment.CCSID = 1381 ;

      MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,

                                   MQC.TRANSPORT_MQSERIES) ;

 

      /*连接到队列管理器*/

      queueManager = new MQQueueManager(queueManagerName) ;

      System.out.println("Access the queue manager:[" + queueManagerName +

                         "] successfully") ;

 

      /*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,设置应对不成功的情况*/

      int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING ;

 

      /*打开队列*/

      queue = queueManager.accessQueue(queueName, openOptions, null, null, null) ;

      /*设置放置消息选项*/

      MQGetMessageOptions gmo = new MQGetMessageOptions() ;

 

      /*在同步点控制下获取消息*/

      gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT ;

      /*如果在队列上没有消息则等待*/

      gmo.options = gmo.options + MQC.MQGMO_WAIT ;

      /*如果队列管理器停顿则失败*/

      gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING ;

      /*设置等待时间间隔*/

      gmo.waitInterval = 3000 ;

 

      /*创建MQMessage*/

      MQMessage inMsg = new MQMessage() ;

      /*从队列到消息缓冲区获取消息*/

      queue.get(inMsg, gmo) ;

 

      /*从消息读取用户数据*/

      String msgString = inMsg.readString(inMsg.getMessageLength()) ;

      System.out.println("The message from the Queue is :" + msgString) ;

 

      /*提交事务*/

      queueManager.commit() ;

    }

    catch (MQException ex)

    {

      System.out.println("An MQ Error Occured:Completion Code is :\t" +

                         ex.completionCode + "\n\n The reason code is:\t" +

                         ex.reasonCode) ;

      ex.printStackTrace() ;

    }

    catch (IOException ioe)

    {

      ioe.printStackTrace() ;

    }

    finally

    {

      try

      {

        if (queue != null)

        {

          queue.close() ;

          System.out.println("Close the queue:[" + queueName + "] successfully") ;

        }

        if (queueManager != null)

        {

          queueManager.close() ;

          queueManager.disconnect() ;

          System.out.println("Disconnect the queue manager:[" +

                             queueManagerName +

                             "] successfully") ;

        }

      }

      catch (MQException mqe)

      {

        mqe.printStackTrace() ;

      }

 

    }

 

  }

 

  /**

   *

   * @param givenHostName String

   * @param givenChannel String

   * @param givenRequestQueueName String

   * @param givenRequestQueueManagerName String

   * @param givenReplyQueueName String

   * @param givenReplyQueueManagerName String

   * @param givenPort int

   * 将请求信息加入到指定主机的指定队列,并将对此消息的应答消息发送到应答队列上

   */

  public void requester (String givenHostName, String givenChannel,

                         String givenRequestQueueName,

                         String givenRequestQueueManagerName,

                         String givenReplyQueueName,

                         String givenReplyQueueManagerName,

                         int givenPort)

  {

    String hostName = givenHostName ;

    String channel = givenChannel ;

    String requestQueueName = givenRequestQueueName ;

    String requestQueueManagerName = givenRequestQueueManagerName ;

    String replyQueueName = givenReplyQueueName ;

    String replyQueueManagerName = givenReplyQueueManagerName ;

    int port = givenPort ;

    MQQueueManager requestQueueManager = null ;

    MQQueue requestQueue = null ;

 

    MQQueueManager replyQueueManager = null ;

    MQQueue replyQueue = null ;

 

    try

    {

      /*设置MQEnvironment属性以便客户机连接*/

      MQEnvironment.hostname = hostName ;

      MQEnvironment.channel = channel ;

      MQEnvironment.CCSID = 1381 ;

      MQEnvironment.port = 1414 ;

      MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,

                                   MQC.TRANSPORT_MQSERIES) ;

 

      /*连接到队列管理器*/

      requestQueueManager = new MQQueueManager(requestQueueManagerName) ;

      System.out.println("Access the queue manager:[" + requestQueueManagerName +

                         "] successfully") ;

 

      /*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,设置应对不成功的情况*/

      int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING ;

 

      // 第三个参数default q manager

      // 第四个参数no dynamic q name

      // 第五个参数no alternate user id

      requestQueue = requestQueueManager.accessQueue(requestQueueName,

             openOptions, null, null, null) ;

      /*设置放置消息选项,使用默认设置*/

      MQPutMessageOptions pmo = new MQPutMessageOptions() ;

      pmo.options = pmo.options + MQC.MQPMO_NEW_MSG_ID ;

      pmo.options = pmo.options + MQC.MQPMO_SYNCPOINT ;

 

      /*创建消息缓冲区*/

      MQMessage outMsg = new MQMessage() ;

      /*设置MQMD格式字段*/

      outMsg.format = MQC.MQFMT_STRING ;

      outMsg.messageFlags = MQC.MQMT_REQUEST ;

      outMsg.replyToQueueName = replyQueueName ;

      outMsg.replyToQueueManagerName = replyQueueManagerName ;

 

      /*准备用户数据消息*/

      String msgString = "Test Request Message from Requester program" ;

      outMsg.writeString(msgString) ;

 

      /*在队列上放置消息*/

      requestQueue.put(outMsg, pmo) ;

 

      /*提交事务*/

      requestQueueManager.commit() ;

      System.out.println("The Message has been successfully put") ;

    }

    catch (MQException ex)

    {

      System.out.println("An MQ Error Occured:Completion Code is :\t" +

                         ex.completionCode + "\n\n The reason code is:\t" +

                         ex.reasonCode) ;

      ex.printStackTrace() ;

    }

    catch (IOException ioe)

    {

      ioe.printStackTrace() ;

    }

    finally

    {

      try

      {

        if (requestQueue != null)

        {

          requestQueue.close() ;

          System.out.println("Close the queue:[" + requestQueueName +

                             "] successfully") ;

        }

        if (requestQueueManager != null)

        {

          requestQueueManager.close() ;

          requestQueueManager.disconnect() ;

          System.out.println("Disconnect the queue manager:[" +

                             requestQueueManagerName + "] successfully") ;

        }

      }

      catch (MQException mqe)

      {

        mqe.printStackTrace() ;

      }

 

    }

  }

 

  /**

   *

   * @param givenHostName String

   * @param givenChannel String

   * @param givenRequestQueueName String

   * @param givenRequestQueueManagerName String

   * @param givenPort int

   * 对于请求队列的请求消息进行处理

   */

  public void responder (String givenHostName, String givenChannel,

                         String givenRequestQueueName,

                         String givenRequestQueueManagerName,

                         int givenPort)

  {

    String hostName = givenHostName ;

    String channel = givenChannel ;

    String requestQueueName = givenRequestQueueName ;

    String requestQueueManagerName = givenRequestQueueManagerName ;

    int port = givenPort ;

    MQQueueManager requestQueueManager = null ;

    MQQueue requestQueue = null ;

 

    MQQueueManager replyQueueManager = null ;

    MQQueue replyQueue = null ;

    try

    {

      /*设置MQEnvironment属性以便客户机连接*/

      MQEnvironment.hostname = hostName ;

      MQEnvironment.channel = channel ;

      MQEnvironment.CCSID = 1381 ;

      MQEnvironment.port = 1414 ;

      MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,

                                   MQC.TRANSPORT_MQSERIES) ;

 

      /*连接到队列管理器*/

      requestQueueManager = new MQQueueManager(requestQueueManagerName) ;

      System.out.println("Access the queue manager:[" + requestQueueManagerName +

                         "] successfully") ;

 

      /*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,设置应对不成功的情况*/

      int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING ;

 

      /*打开队列*/

      requestQueue = requestQueueManager.accessQueue(request