forked from Raiffeisen-DGTL/ViennaNET
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMqSeriesQueueMessageConverter.cs
107 lines (96 loc) · 3.66 KB
/
MqSeriesQueueMessageConverter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
using System;
using System.Text;
using IBM.XMS;
using ViennaNET.Messaging.Messages;
namespace ViennaNET.Messaging.MQSeriesQueue
{
internal static class MqSeriesQueueMessageConverter
{
public static IMessage ConvertToMqMessage(this BaseMessage message, ISession session)
{
var mes = ConvertToInternalMessage(message, session);
mes.JMSMessageID = string.IsNullOrWhiteSpace(message.MessageId)
? Guid.NewGuid()
.ToString()
.ToUpper()
: message.MessageId;
mes.JMSCorrelationID = string.IsNullOrWhiteSpace(message.CorrelationId)
? mes.JMSMessageID
: message.CorrelationId;
mes.JMSExpiration = (long)message.LifeTime.TotalMilliseconds;
if (!string.IsNullOrWhiteSpace(message.ReplyQueue))
{
var replyQueue = session.CreateQueue(message.ReplyQueue.Trim());
mes.JMSReplyTo = replyQueue;
}
foreach (var kv in message.Properties)
{
mes.SetObjectProperty(kv.Key, kv.Value);
}
return mes;
}
private static IMessage ConvertToInternalMessage(BaseMessage message, ISession session)
{
switch (message)
{
case TextMessage textMessage:
return session.CreateTextMessage(textMessage.Body);
case BytesMessage bytesMessage:
var result = session.CreateBytesMessage();
result.WriteBytes(bytesMessage.Body);
return result;
default:
throw new ArgumentException($"Unknown inherited type of BaseMessage ({message.GetType()}) while converting to IMessage");
}
}
public static BaseMessage ConvertToBaseMessage(this IMessage receivedMessage)
{
var sendDate = ConvertJavaTimestampToDateTime(receivedMessage.JMSTimestamp);
var expirationDate = ConvertJavaTimestampToDateTime(receivedMessage.JMSExpiration);
var message = CreateBaseMessage(receivedMessage);
message.MessageId = receivedMessage.JMSMessageID;
message.CorrelationId = receivedMessage.JMSCorrelationID;
message.SendDateTime = sendDate;
message.ReceiveDate = DateTime.Now;
message.ReplyQueue = receivedMessage.JMSReplyTo?.Name;
message.LifeTime = expirationDate > sendDate
? expirationDate - sendDate
: new TimeSpan();
// чтение properties сообщения web sphere.
var propertyNames = receivedMessage.PropertyNames;
var messageProps = new StringBuilder();
while (propertyNames.MoveNext())
{
var name = (string)propertyNames.Current ?? string.Empty;
var prop = receivedMessage.GetObjectProperty(name);
message.Properties.Add(name, prop);
messageProps = messageProps.Append(name)
.Append(" = ")
.Append(prop)
.Append(";\n");
}
return message;
}
private static DateTime ConvertJavaTimestampToDateTime(long jmsTimestamp)
{
// Example: Converting Java millis time to .NET time
var baseTime = new DateTime(1970, 1, 1, 0, 0, 0);
var utcTimeTicks = jmsTimestamp * 10000 + baseTime.Ticks;
return new DateTime(utcTimeTicks, DateTimeKind.Utc);
}
private static BaseMessage CreateBaseMessage(IMessage message)
{
switch (message)
{
case ITextMessage textMessage:
return new TextMessage { Body = textMessage.Text };
case IBytesMessage bytesMessage:
var buffer = new byte[bytesMessage.BodyLength];
bytesMessage.ReadBytes(buffer);
return new BytesMessage { Body = buffer };
default:
return new TextMessage { Body = string.Empty };
}
}
}
}