Scenario: Rabbit MQ as Message Broker
Solution:
Per RabbitMQ:
RabbitMQ is an open-source message-broker software that originally implemented the Advanced Message Queuing Protocol and has since been extended with a plug-in architecture to support Streaming Text Oriented Messaging Protocol, MQ Telemetry Transport, and other protocols.
Below are the components of RabbitMQ. Ex could be a Post Office
- Producer - Which produces a message [Dropping a letter].
- Producer sends message to one or more exchanges.
- Message Broker [Post Office - They know how to deliver the posted message to the receiver]
- Exchange [Postal Department]
- There could be multiple exchanges.
- Exchange push messages to one or more queue.
- Types of exchanges
- Direct
- Topic
- Header
- Fanout - Publish same message to multiple queues (each message has its own reference, exchange stores it only once).
- Queues [Letter box, Person check time to time or one time of day etc.]
- They are tied to exchange through binding
- Consumer [Letter receiver]
- Consumer listening to messages pushed to no, one or more queues.
- Connections
- Producer & Consumer opens one connection with Message Broker over tcp.
- Channel
- Connection can have multiple Channels. So one connection but messages pushed over channels (threads).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 | //Producer
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// queue: The name of the queue. Pass an empty string to make the server generate a name.
// durable: Should this queue will survive a broker restart?
// exclusive: Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.
// autoDelete:Should this queue be auto-deleted when its last consumer (if any) unsubscribes?
channel.QueueDeclare(queue: "messages", durable: false, exclusive: false, autoDelete: false,
arguments: null);
var data = Encoding.UTF8.GetBytes("Hi There");
//Default Exchange
channel.BasicPublish("", "messages", null, data);
}
//Consumer
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// queue: The name of the queue. Pass an empty string to make the server generate a name.
// durable: Should this queue will survive a broker restart?
// exclusive: Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.
// autoDelete:Should this queue be auto-deleted when its last consumer (if any) unsubscribes?
channel.QueueDeclare(queue: "messages", durable: false, exclusive: false, autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (obj, evn) =>
{
var message = Encoding.UTF8.GetString(evn.Body.ToArray());
Console.WriteLine($"Received message: {message}");
};
channel.BasicConsume(queue: "messages", autoAck: true, consumer: consumer);
Console.ReadLine();
} |
Competing Consumers:
- The competing consumers or work queue pattern is a way to spread the consumption of messages across a number of different consumers. This is used to process message in scale-able and reliable manner.
- We can add multiple consumers to scale the system as if the consumers is slow to process message the exchange will get backlogged with memory issues.
- It uses round robin method, so broker will try C1 then C2 etc. The issue is C1 might already have a message in flight and C2 might be idle. To over come unless the C1 acknowledges the processing of message new message is not given to it.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 | static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// queue: The name of the queue. Pass an empty string to make the server generate a name.
// durable: Should this queue will survive a broker restart?
// exclusive: Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.
// autoDelete:Should this queue be auto-deleted when its last consumer (if any) unsubscribes?
channel.QueueDeclare(queue: "messages", durable: false, exclusive: false, autoDelete: false,
arguments: null);
//prefetchSize: The server will send a message in advance if it is equal to or smaller
//in size than the available prefetch size. 0 means "no specific limit.
//The prefetch-size is ignored if the no-ack option is set.
//prefetchCount: Max unack messages before broker sends new message to consumer
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (obj, evn) =>
{
var message = Encoding.UTF8.GetString(evn.Body.ToArray());
Console.WriteLine($"Received message: {message}");
channel.BasicAck(deliveryTag: evn.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "messages", autoAck: false, consumer: consumer);
Console.ReadLine();
} |
Pub/Sub:
- Delivering same message (duplicatng) to multiple consumers.
- In Competing pattern, one message share across multiple consumers.
- Decoupling Producer from Consumer.
- Temporary queues, Consumer creates temporary queues and then it is destroyed.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 | //Producer
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel(); channel.ExchangeDeclare(exchange: "pubsub", type: ExchangeType.FanOut);
var data = Encoding.UTF8.GetBytes("Hi There");
//Default Exchange
channel.BasicPublish("pubsub", "", null, data); }
//Consumer
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "pubsub", type: ExchangeType.FanOut);
// queue: The name of the queue. Pass an empty string to make the server generate a name.
// durable: Should this queue will survive a broker restart?
// exclusive: Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.
// autoDelete:Should this queue be auto-deleted when its last consumer (if any) unsubscribes?
var queueName = channel.QueueDeclare.QueueName;
var consumer = new EventingBasicConsumer(channel);
channel.QueueBind(queue: queueName , exchange: "pubsub", routingKey:"");
consumer.Received += (obj, evn) =>
{
var message = Encoding.UTF8.GetString(evn.Body.ToArray());
Console.WriteLine($"Received message: {message}");
};
channel.BasicConsume(queue: queueName , autoAck: true, consumer: consumer);
Console.ReadLine();
} |
| |
No comments:
Post a Comment