forked from Raiffeisen-DGTL/ViennaNET
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRabbitMqQueueMessageConverter.cs
121 lines (103 loc) · 3.33 KB
/
RabbitMqQueueMessageConverter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
using System;
using System.Globalization;
using System.Text;
using EasyNetQ;
using ViennaNET.Messaging.Messages;
namespace ViennaNET.Messaging.RabbitMQQueue
{
internal static class RabbitMqQueueMessageConverter
{
private const string DefaultLifeTime = "3600000";
public static MessageProperties ConvertToProperties(this BaseMessage message, RabbitMqQueueConfiguration configuration)
{
var expiration = GetExpiration(message, configuration);
var properties = new MessageProperties
{
MessageId = string.IsNullOrEmpty(message.MessageId)
? Guid.NewGuid()
.ToString()
.ToUpper()
: message.MessageId,
Headers = message.Properties,
DeliveryMode = 2,
Expiration = expiration,
Timestamp = DateTime.Now.ToFileTimeUtc(),
ContentType = GetContentType(message)
};
if (!string.IsNullOrWhiteSpace(message.ReplyQueue))
{
properties.ReplyTo = message.ReplyQueue;
}
if (!string.IsNullOrEmpty(message.CorrelationId))
{
properties.CorrelationId = message.CorrelationId;
}
return properties;
}
private static string GetExpiration(BaseMessage message, RabbitMqQueueConfiguration configuration)
{
if (message.LifeTime.TotalMilliseconds > 0)
{
return message.LifeTime.TotalMilliseconds.ToString(CultureInfo.InvariantCulture);
}
if (configuration.Lifetime.HasValue)
{
return ((int)configuration.Lifetime.Value.TotalMilliseconds).ToString(CultureInfo.InvariantCulture);
}
return DefaultLifeTime;
}
private static string GetContentType(BaseMessage message)
{
switch (message)
{
case TextMessage mes:
return mes.ContentType ?? ContentType.Text.ToString("G");
case BytesMessage _:
return ContentType.Bytes.ToString("G");
default:
throw new
ArgumentException($"Unknown inherited type of BaseMessage ({message.GetType()}) while get content type of Rabbit message");
}
}
public static BaseMessage ConvertToBaseMessage(this byte[] body, MessageProperties messageProperties)
{
BaseMessage message;
if (messageProperties.ContentType == ContentType.Bytes.ToString("G"))
{
message = new BytesMessage { Body = body };
}
else
{
message = new TextMessage { Body = Encoding.UTF8.GetString(body) };
}
message.MessageId = messageProperties.MessageId;
message.CorrelationId = messageProperties.CorrelationId;
message.ReplyQueue = messageProperties.ReplyTo;
message.SendDateTime = DateTime.FromFileTimeUtc(messageProperties.Timestamp);
message.ReceiveDate = DateTime.Now;
if (TimeSpan.TryParse(messageProperties.Expiration, out var lifetime))
{
message.LifeTime = lifetime;
}
foreach (var header in messageProperties.Headers)
{
var value = string.Empty;
if (header.Value is byte[] bytes)
{
value = Encoding.UTF8.GetString(bytes);
}
if (header.Value is string stringHeader)
{
value = stringHeader;
}
message.Properties.Add(header.Key, value);
}
return message;
}
internal enum ContentType
{
Bytes,
Text
}
}
}