.Net下的MSMQ的同步异步调用

一、MSMQ简介
MSMQ(微软消息队列)是Windows操作系统中消息应用程序的基础,是用于创建分布式、松散连接的消息通讯应用程序的开发工具。消息队列
和电子邮件有着很多相似处,他们都包含多个属性,用于保存消息,消息类型中都指出发送者和接收者的地址;然而他们的用处却有着很大的
区别:消息队列的发送者和接收者是应用程序,而电子邮件的发送者和接收者通常是人。如同电子邮件一样,消息队列的发送和接收也不需要

发送者和接收者同时在场,可以存储在消息队列或是邮件服务器中。

二、消息队列的安装
默认情况下安装操作系统是不安装消息队列的,你可以在控制面板中找到添加/删除程序,然后选择添加/删除Windows组件一项,然后选择应

用程序服务器,双击它进入详细资料中选择消息队列一项进行安装,如图:



三、消息队列类型

消息对列分为3类:

公共队列

MachineName\QueueName

能被别的机器所访问,如果你的多个项目中用到消息队列,那么你可以把队列定义为公共队列

专用队列

MachineName\Private$\QueueName

只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。

日志队列

MachineName\QueueName\Journal$

四、消息队列的创建

MessageQueue Mq=new MessageQueue(“.\\private$\\Mymq”);

通过Path属性引用消息队列的代码也十分简单:

MessageQueue Mq=new MessageQueue();

Mq.Path=”.\\private$\\Mymq”;

使用 Create 方法可以在计算机上创建队列:

System.Messaging.MessageQueue.Create(@".\private$\Mymq");

这里注意由于在C#中要记住用反斜杠将“\”转义。

由于消息对列所放置的地方经常改变,所以建议消息队列路径不要写死,建议放在配置文件中。

五、消息的发送

消息的发送可以分为简单消息和复杂消息,简单消息类型就是常用的数据类型,例如整型、字符串等数据;复杂消息的数据类型通常对应于系统中的复杂数据类型,例如结构,对象等等。

Mq.Send("Hello!");

在这里建议你可以事先定义一个对象类,然后发送这个对象类的实例对象,这样以后无论在增加什么发送信息,只需在对象类中增加相应的属性即可。

六、消息的接收和阅读

(1)同步接收消息

  接收消息的代码很简单:

Mq.Receive();
        Mq.Receive(TimeSpan timeout); //设定超时时间
Mq.ReceiveById(ID);
        Mq.Peek();

通过Receive方法接收消息同时永久性地从队列中删除消息;

通过Peek方法从队列中取出消息而不从队列中移除该消息。

如果知道消息的标识符(ID),还可以通过ReceiveById方法和PeekById方法完成相应的操作。

(2)异步接受消息
 
利用委托机制:MessQueue.ReceiveCompleted +=new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

(3)消息阅读

在应用程序能够阅读的消息和消息队列中的消息格式不同,应用程序发送出去的消息经过序列化以后才发送给了消息队列
而在接受端必须反序列化,利用下面的代码可以实现:

public void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
  {
  System.Messaging.Message m = MessQueue.EndReceive(e.AsyncResult);
  m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
  Console.WriteLine("Message: " + (string)m.Body);
  MessQueue.BeginReceive() ;

  }

反序列化还有另一种写法:m.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );

七、由于消息队列的代码有些是固定不便的,所以把这些代码封装成一个类方便以后使用:


1using System;
  2using System.Messaging;
  3using System.Threading;
  5
  6namespace LoveStatusService
  7{
  8    /**//// <summary>
  9    /// Summary description for Msmq.
10    /// </summary>
11    public class Msmq
12    {
13        public Msmq()
14        {
15            //
16            // TODO: Add constructor logic here
17            //
18        }
19
20       
21        private MessageQueue _messageQueue=null;
22        //最大并发线程数
23        private static int MAX_WORKER_THREADS=Convert.ToInt32( System.Configuration.ConfigurationSettings.AppSettings["MAX_WORKER_THREADS"].ToString());
24        //Msmq路径
25        private static string MsmqPath=System.Configuration.ConfigurationSettings.AppSettings["LoveStatusMQPath"];
26        //等待句柄
27        private WaitHandle[] waitHandleArray = new WaitHandle[MAX_WORKER_THREADS];
28        //任务类型
29        //1. Send Email 2. Send Message  3. Send Email and Message
30        private string TaskType=System.Configuration.ConfigurationSettings.AppSettings["TaskType"];

31        public MessageQueue MessQueue
32        {
33            get
34            {
35           
36                if (_messageQueue==null)
37                {
38                    if(MessageQueue.Exists(MsmqPath))
39                    {
40                        _messageQueue = new MessageQueue(MsmqPath);   
41                    }
42                    else
43                    {
44                        _messageQueue = MessageQueue.Create(MsmqPath);   
45                    }   
46                }
47               
48
49                return _messageQueue;
50            }
51        }
52       
53
54    Private Method#region Private Method
55
56        private void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
57        {
58            MessageQueue mqq = (MessageQueue)sender;
59            System.Messaging.Message m = mqq.EndReceive(e.AsyncResult);
60            //m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
61            m.Formatter =new System.Messaging.XmlMessageFormatter(new Type[] {typeof(UserObject)}) ;
62            //log.Info("Receive UserID: " + (string)m.Body) ;
63            UserObject obj=(UserObject)m.Body ;
64            long curUserId=obj.curUserID ;
65            long oppUserId=obj.oppUserID;
66            string curUserName=obj.curUserName;
67            string oppUserName=obj.oppUserName;
68            string curEmail=obj.curEmail ;
69            string oppEmail=obj.oppEmail;
70            string subject =obj.subject ;
71            string body=obj.body ;
72            //AppLog.log.Info("curUserId:"+curUserId) ;
73            //AppLog.log.Info("oppUserId:"+oppUserId) ;
74            AppLog.log.Info("==type="+TaskType) ;
75            switch(TaskType)
76            {
77                //Email
78                case "1":
79                    EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
80                    AppLog.log.Info("==Send to=="+oppEmail) ;
81                    break;
82                //Message
83                case "2":
84                    MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
85                    AppLog.log.Info("==Send Msg to=="+oppUserId) ;
86                    break;
87                //Email and Message       
88                case "3":
89                    EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
90                    AppLog.log.Info("==Send to=="+oppEmail) ;
91                    MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
92                    AppLog.log.Info("==Send Msg to=="+oppUserId) ;
93                    break;
94                default:
95                    break;
96
97            }
98            mqq.BeginReceive() ;
99
100        }
101
102    #endregion
103
104    Public Method#region Public Method
105
106        //一个将对象发送到队列的方法,这里发送的是对象
107        public void SendUserIDToMQ(object arr)
108        {
109            MessQueue.Send(arr) ;

110            Console.WriteLine("Ok") ;
111            Console.Read() ;
112        }
113
114        //同步接受队列内容的方法
115        public void ReceiveFromMQ()
116        {
117            Message ms=new Message() ;
118           
119            //ms=MessQueue.Peek();
120            try
121            {
122                ms=MessQueue.Receive(new TimeSpan(0,0,5));

123                if(ms!=null)
124                {
125                    ms.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );
126                    AppLog.log.Info((string)ms.Body)  ;
127                }
128            }
129            catch(Exception ex)
130            {
131               
132            }
133           
134       
135        }
136
137        //开始监听工作线程
138        public  void startListen()
139        {
140            AppLog.log.Info("--Thread--"+MAX_WORKER_THREADS) ;
141            MessQueue.ReceiveCompleted +=new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
142           
143            //异步方式,并发
144           
145            for(int i=0; i<MAX_WORKER_THREADS; i++)
146            {
147                // Begin asynchronous operations.
148                waitHandleArray = MessQueue.BeginReceive().AsyncWaitHandle;
149            }
150
151            AppLog.log.Info("------Start Listen--------") ;
152
153            return;
154
155        }
156
157
158        //停止监听工作线程
159        public void stopListen()
160        {
161
162            for(int i=0;i<waitHandleArray.Length;i++)
163            {
164
165                try
166                {
167                    waitHandleArray.Close();
168                }
169                catch
170                {
171                    AppLog.log.Info("---waitHandleArray.Close() Error!-----") ;
172                }
173
174            }
175
176            try
177            {
178                // Specify to wait for all operations to return.
179                WaitHandle.WaitAll(waitHandleArray,1000,false);
180            }
181            catch
182            {
183                AppLog.log.Info("---WaitHandle.WaitAll Error!-----") ;
184            }
185            AppLog.log.Info("------Stop Listen--------") ;

186
187        }
188
189    #endregion
190   
191   
192
193   
194    }
195}
196


UserObject的代码


1using System;
  2
  3namespace Goody9807
  4{
  5    /**//// <summary>
  6    /// 用与在MQ上传输数据的对象
  7    /// </summary>
  8    public class UserObject
  9    {
10        public UserObject()
11        {
12            //
13            // TODO: Add constructor logic here
14            //
15        }
16
17        private long _curUserID;
18        public long curUserID
19        {
20            get
21            {
22                return _curUserID;
23            }
24            set
25            {
26                _curUserID=value;
27            }
28        }
29
30        private  string _curUserName="";
31        public string curUserName
32        {
33            get
34            {
35                return _curUserName;
36            }
37            set
38            {
39                _curUserName=value;
40            }
41        }
42
43        private string _curEmail="";
44        public string curEmail
45        {
46            get
47            {
48                return _curEmail;
49            }
50            set
51            {
52                _curEmail=value;
53            }
54        }
55
56
57        private long _oppUserID;
58        public long oppUserID
59        {
60            get
61            {
62                return _oppUserID;
63            }
64            set
65            {
66                _oppUserID=value;
67            }
68        }
69
70        private  string _oppUserName="";
71        public string oppUserName
72        {
73            get
74            {
75                return _oppUserName;
76            }
77            set
78            {

79                _oppUserName=value;
80            }
81        }
82
83        private string _oppEmail="";
84        public string oppEmail
85        {
86            get
87            {
88                return _oppEmail;
89            }
90            set
91            {
92                _oppEmail=value;
93            }
94        }
95
96        private string _subject ="";
97        public string subject
98        {
99            get
100            {
101                return _subject;
102            }
103            set
104            {
105                _subject=value;
106            }
107        }
108
109        private string _body="";
110        public string body
111        {
112            get
113            {
114                return _body;
115            }
116            set
117            {
118                _body=value;
119            }

120        }
121    }
122}
123

另一个同事写的封装类


1using System;
  2
  3using System.Threading;
  4
  5using System.Messaging;
  6
  7
  8
  9namespace Wapdm.SmsApp
10
11{
12
13    /**//// <summary>
14
15    /// <para>
16
17    /// A Logger implementation that writes messages to a message queue.
18
19    /// The default event formatter used is an instance of XMLEventFormatter
20
21    /// </para>
22
23    /// </summary>
24
25    public sealed class MsgQueue
26
27    {
28
29
30
31        private const string BLANK_STRING                  = "";
32
33        private const string PERIOD                        = @".\private$";  //".";
34
35        private const string ELLIPSIS                      = "";   
36
37   
38
39        private string serverAddress;
40
41        private string queueName;
42
43        private string queuePath;
44
45       
46
47        private bool IsContextEnabled; 
48
49   
50
51        private MessageQueue queue;
52
53   
54
55        private object queueMonitor                        = new object();
56
57   
58
59        private MsgQueue() {}
60
61
62
63        public static MsgQueue mq = null;
64
65        public static WaitHandle[] waitHandleArray = new WaitHandle[Util.MAX_WORKER_THREADS];
66
67   
68
69        public MsgQueue(string _serverAddress, string _queueName, string _summaryPattern)
70
71        {
72
73              if ((_serverAddress == null) || (_queueName == null) || (_summaryPattern == null))
74
75              {
76
77                  throw new ArgumentNullException();
78
79              }
80
81              ServerAddress = _serverAddress;
82
83              QueueName = _queueName;
84
85              IsContextEnabled = true;           
86
87        }
88
89   
90
91        public MsgQueue(string _serverAddress, string _queueName)
92
93        {
94
95              if ((_serverAddress == null) || (_queueName == null))

96
97              {
98
99                  throw new ArgumentNullException();
100
101              }
102
103              ServerAddress = _serverAddress;
104
105              QueueName = _queueName;
106
107              IsContextEnabled = true;
108
109        }
110
111   
112
113        public MsgQueue(string _queueName)
114
115        {
116
117              if (_queueName == null)
118
119              {
120
121                  throw new ArgumentNullException();
122
123              }
124
125              serverAddress = PERIOD;
126
127              QueueName = _queueName;
128
129              IsContextEnabled = true;           
130
131              if ( IsContextEnabled == false )
132
133                  throw new ArgumentNullException();
134
135        }
136
137   
138
139        public string ServerAddress
140
141        {
142
143              get
144
145              {
146
147                  return serverAddress;
148
149              }
150
151              set
152
153              {
154
155                  if (value == null)
156
157                  {
158
159                      value = PERIOD;
160
161                  }
162
163                  value = value.Trim();
164
165                  if (value.Equals(BLANK_STRING))
166
167                  {
168
169                      throw new ArgumentException("Invalid value (must contain non-whitespace characters)");

170
171                  }
172
173                  lock (queueMonitor)
174
175                  {
176
177                      serverAddress = value;
178
179                      queuePath = serverAddress + '\\' + queueName;
180
181                      InitializeQueue();
182
183                  }
184
185              }
186
187        }
188
189
190
191        public string QueueName
192
193        {
194
195              get
196
197              {
198
199                  return queueName;
200
201              }
202
203              set
204
205              {
206
207                  if (value == null)
208
209                  {
210
211                      throw new ArgumentNullException();
212
213                  }
214
215                  value = value.Trim();
216
217                  if (value.Equals(BLANK_STRING))
218
219                  {
220
221                      throw new ArgumentException("Invalid value (must contain non-whitespace characters)");

222
223                  }
224
225                  lock (queueMonitor)
226
227                  {
228
229                      queueName = value;
230
231                      queuePath = serverAddress + '\\' + queueName;
232
233                      InitializeQueue();
234
235                  }
236
237              }
238
239        }
240
241   
242
243        private void InitializeQueue()
244
245        {
246
247              lock (queueMonitor)
248
249              {           
250
251                  if (queue != null)
252
253                  {
254
255                      try { queue.Close(); }
256
257                      catch {}
258
259                      queue = null;
260
261                  }
262
263
264
265                  try
266
267                  {
268
269                      if(!MessageQueue.Exists(queuePath))
270
271                            MessageQueue.Create(queuePath);
272
273                  }
274
275                  catch {}
276
277                  try
278
279                  {
280
281                      queue = new MessageQueue(queuePath);
282
283                      queue.SetPermissions("EveryOne",MessageQueueAccessRights.FullControl);
284
285                      queue.Formatter = new XmlMessageFormatter(new Type[] {typeof(MoMsg)});

286
287                  }
288
289                  catch (Exception e)
290
291                  {
292
293                      try { queue.Close(); }
294
295                      catch {}
296
297                      queue = null;
298
299                      throw new ApplicationException("Couldn't open queue at '" + queuePath + "': " + e.GetType().FullName + ": " + e.Message);

300
301                  }
302
303
304
305              }
306
307        }
308
309   
310
311        private  void AcquireResources()
312
313        {
314
315              InitializeQueue();
316
317        }
318
319   
320
321        public  void ReleaseResources()
322
323        {
324
325              lock (queueMonitor)
326
327              {
328
329                  if (queue != null)
330
331                  {
332
333                      try
334
335                      {
336
337                            queue.Close();
338
339                      }
340
341                      catch {}
342
343                      queue = null;
344
345                  }
346
347              }   
348
349        }
350
351   
352
353        //阻塞方式
354
355        public MoMsg Read( )
356
357        {
358
359              MoMsg _event = null;           
360
361              lock (queueMonitor)
362
363              {
364
365                  if (queue == null)
366
367                  {
368
369                      InitializeQueue();
370
371                  }
372
373                  try
374
375                  {
376
377                      Message message = queue.Receive( new TimeSpan(0,0,1) );//等待10秒
378
379                      _event = (MoMsg) (message.Body);
380
381                      return _event;
382
383                  }
384
385                  catch (Exception )
386
387                  {
388
389                      try { queue.Close(); }
390
391                      catch {}
392
393                      queue = null;
394
395                  }           
396
397              }
398
399              return null;
400
401        }
402
403
404
405        public void Write(MoMsg _event)
406
407        {
408
409              if (_event == null)
410
411              {
412
413                  return;
414
415              }
416
417              lock (queueMonitor)
418
419              {
420
421                  try
422
423                  {
424
425                      if (queue == null)
426
427                      {
428
429                            InitializeQueue();
430
431                      }
432
433                 
434
435                      Message message = new Message();
436
437                      message.Priority = _event.Priority;
438
439                      message.Recoverable = true;

440
441                      message.Body = _event; //eventFormatter.Format(_event);
442
443
444
445                      queue.Send(message);
446
447                  }
448
449                  catch (Exception e)
450
451                  {
452
453                      try { queue.Close(); }
454
455                      catch {}

456
457                      queue = null;
458
459                      Util.Log.log("Couldn't write Message (" + e.GetType().FullName + ": " + e.Message + ")");
460
461                  }           
462
463              }
464
465        }
466
467
468
469        public static bool statusTest()
470
471        {
472
473              bool reValue = false;
474
475              try
476
477              {
478
479                  MessageEnumerator re = mq.queue.GetMessageEnumerator();
480
481                  bool rev = re.MoveNext();
482
483                  reValue = true;
484
485              }
486
487              catch
488
489              {
490
491                  reValue = false;
492
493              }
494
495
496
497              return reValue;
498
499        }
500
501
502
503        public static void startListen()
504
505        {
506
507              mq = new MsgQueue(Util.MqName);
508
509
510
511              mq.queue.ReceiveCompleted +=new ReceiveCompletedEventHandler(queue_ReceiveCompleted);
512
513             
514
515              //异步方式,并发
516

517              for(int i=0; i<Util.MAX_WORKER_THREADS; i++)
518
519              {
520
521                  // Begin asynchronous operations.
522
523                  waitHandleArray =
524
525                      mq.queue.BeginReceive().AsyncWaitHandle;
526
527              }
528
529
530
531              return;
532
533        }
534

535
536
537        public static void stopListen()
538
539        {
540
541
542
543              for(int i=0;i<waitHandleArray.Length;i++)
544
545              {
546
547                  try
548
549                  {
550
551                      waitHandleArray.Close();
552
553                  }
554
555                  catch

556
557                  {
558
559                      //忽略错误
560
561                  }
562
563              }
564
565
566
567              try
568
569              {
570
571                  // Specify to wait for all operations to return.
572
573                  WaitHandle.WaitAll(waitHandleArray,1000,false);
574
575              }
576
577              catch
578
579              {
580
581                  //忽略错误
582
583              }
584
585        }
586
587
588
589        private static void queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
590
591        {
592
593              // Connect to the queue.
594
595              MessageQueue mqq = (MessageQueue)sender;

596
597
598
599              // End the asynchronous Receive operation.
600
601              Message m = mqq.EndReceive(e.AsyncResult);
602
603
604
605              Util.ProcessMo((MoMsg)(m.Body));
606
607
608
609              if(Util.isRunning)
610
611              {
612
613                  // Restart the asynchronous Receive operation.
614
615                  mqq.BeginReceive();
616
617              }
618
619           
620
621              return;
622
623        }
624
625    }
626
627}

、哭┈゛.並不代表Wo屈服х. 退一步...並不象徵Wo認輸..→.放手.ǐ.o.並不表示Wo放棄.正如Wo微笑.並不意味Wo快樂┈┊