RabbitMQ - Overview

                   

Scenario: RabbitMQ as a Message Broker

Solution:

Per RabbitMQ:

RabbitMQ is an open-source message-broker software that originally implemented the Advanced Message Queuing Protocol and has since been extended with a plug-in architecture to support Streaming Text Oriented Messaging Protocol, MQ Telemetry Transport, and other protocols.

Below are the components of RabbitMQ, using a post office as an analogy.

  1. Producer - Produces a message [dropping a letter in the mail].
    1. A producer sends messages to one or more exchanges.
  2. Message Broker [Post office - it knows how to deliver a posted message to the receiver]
    1. Exchange [Postal department]
      1. There can be multiple exchanges.
      2. An exchange routes messages to zero or more queues, depending on its type and bindings.
      3. Types of exchanges
        1. Direct - Routes to queues whose binding key exactly matches the message's routing key.
        2. Topic - Routes to queues whose binding pattern matches the message's routing key.
        3. Headers - Routes on message header values instead of the routing key.
        4. Fanout - Publishes the same message to every bound queue (each queue gets its own reference to the message; the broker stores the message itself only once).
    2. Queues [Letter box, which a person checks from time to time, once a day, etc.]
      1. Queues are tied to an exchange through bindings.
  3. Consumer [Letter receiver]
    1. A consumer listens for messages delivered to one or more queues.
  4. Connections
    1. Each producer and consumer opens one connection to the message broker over TCP.
  5. Channel
    1. A connection can carry multiple channels. Channels are lightweight virtual connections multiplexed over the single TCP connection; they are not threads, although applications commonly use one channel per thread.

    // Producer (a separate console application).
    // Requires: using System.Text; using RabbitMQ.Client;
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        // Dispose the channel and connection on exit so the connection closes cleanly.
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // queue: The name of the queue. Pass an empty string to make the server generate a name.
        // durable: Should this queue survive a broker restart?
        // exclusive: Should use of this queue be limited to its declaring connection? Such a queue is deleted when its declaring connection closes.
        // autoDelete: Should this queue be auto-deleted when its last consumer (if any) unsubscribes?
        channel.QueueDeclare(queue: "messages", durable: false, exclusive: false, autoDelete: false,
            arguments: null);

        var data = Encoding.UTF8.GetBytes("Hi There");

        // Publish to the default exchange (""), which routes to the queue named by the routing key.
        channel.BasicPublish("", "messages", null, data);
    }

    // Consumer (a separate console application).
    // Requires: using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Declare the same queue as the producer, with the same parameters.
        // Declaring a queue that already exists with identical settings is a no-op.
        channel.QueueDeclare(queue: "messages", durable: false, exclusive: false, autoDelete: false,
            arguments: null);

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (obj, evn) =>
        {
            var message = Encoding.UTF8.GetString(evn.Body.ToArray());
            Console.WriteLine($"Received message: {message}");
        };

        // autoAck: true means the broker treats a message as acknowledged as soon as it is delivered.
        channel.BasicConsume(queue: "messages", autoAck: true, consumer: consumer);

        // Keep the process alive so the consumer can keep receiving messages.
        Console.ReadLine();
    }
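
The example above publishes through the default exchange. As a rough sketch of how a named direct exchange routes by binding key, the following assumes the same synchronous RabbitMQ.Client API used above and a broker running on localhost. The exchange name "orders", the queue name "orders.eu", and the routing keys are hypothetical, chosen only for illustration.

```csharp
using System.Text;
using RabbitMQ.Client;

class DirectExchangeSketch
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Hypothetical names: a direct exchange "orders" and a queue "orders.eu".
        channel.ExchangeDeclare(exchange: "orders", type: ExchangeType.Direct);
        channel.QueueDeclare(queue: "orders.eu", durable: false, exclusive: false,
            autoDelete: false, arguments: null);

        // The binding key "eu" ties the queue to the exchange.
        channel.QueueBind(queue: "orders.eu", exchange: "orders", routingKey: "eu");

        var body = Encoding.UTF8.GetBytes("order for Europe");

        // Routing key "eu" matches the binding key, so this lands in "orders.eu".
        channel.BasicPublish(exchange: "orders", routingKey: "eu",
            basicProperties: null, body: body);

        // Routing key "us" matches no binding on this exchange.
        channel.BasicPublish(exchange: "orders", routingKey: "us",
            basicProperties: null, body: body);
    }
}
```

The second publish has no matching binding, so the broker discards the message, because messages are published as non-mandatory by default.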

     
Competing Consumers:
  1. The competing consumers (or work queue) pattern spreads the consumption of messages across a number of different consumers. It is used to process messages in a scalable and reliable manner.
  2. We can add more consumers to scale the system. If consumers are slow to process messages, the queue backs up and the broker can run into memory problems.
  3. The broker dispatches messages round-robin, so it will try C1, then C2, and so on. The issue is that C1 might already have a message in flight while C2 is idle. To overcome this, set a prefetch count of 1 and use manual acknowledgements: a consumer is not given a new message until it acknowledges the one it is processing.

    // Consumer (run several instances to compete for messages).
    // Requires: using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // queue: The name of the queue. Pass an empty string to make the server generate a name.
        // durable: Should this queue survive a broker restart?
        // exclusive: Should use of this queue be limited to its declaring connection? Such a queue is deleted when its declaring connection closes.
        // autoDelete: Should this queue be auto-deleted when its last consumer (if any) unsubscribes?
        channel.QueueDeclare(queue: "messages", durable: false, exclusive: false, autoDelete: false,
            arguments: null);

        // prefetchSize: The server will send a message in advance if it is equal to or smaller
        // in size than the available prefetch size. 0 means "no specific limit".
        // The prefetch size is ignored if the no-ack option is set.

        // prefetchCount: Maximum number of unacknowledged messages the broker delivers
        // to this consumer before it waits for an acknowledgement.
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (obj, evn) =>
        {
            var message = Encoding.UTF8.GetString(evn.Body.ToArray());
            Console.WriteLine($"Received message: {message}");

            // Acknowledge this single message so the broker can send the next one.
            channel.BasicAck(deliveryTag: evn.DeliveryTag, multiple: false);
        };

        // autoAck: false switches to manual acknowledgements, which prefetchCount relies on.
        channel.BasicConsume(queue: "messages", autoAck: false, consumer: consumer);

        Console.ReadLine();
    }
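
The consumer above acknowledges every message unconditionally. One possible way to handle a processing failure, sketched here under the same assumptions (the synchronous RabbitMQ.Client API and a broker on localhost), is to negatively acknowledge the message so that the broker requeues it for another delivery. `Process` is a hypothetical placeholder for the real work and is not part of the original example.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class NackOnFailureSketch
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: "messages", durable: false, exclusive: false,
            autoDelete: false, arguments: null);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (obj, evn) =>
        {
            try
            {
                var message = Encoding.UTF8.GetString(evn.Body.ToArray());
                Process(message);

                // Success: acknowledge so the broker removes the message.
                channel.BasicAck(deliveryTag: evn.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Processing failed: {ex.Message}");

                // Failure: reject this message and ask the broker to requeue it.
                channel.BasicNack(deliveryTag: evn.DeliveryTag, multiple: false, requeue: true);
            }
        };

        channel.BasicConsume(queue: "messages", autoAck: false, consumer: consumer);
        Console.ReadLine();
    }

    // Hypothetical placeholder for application-specific processing.
    static void Process(string message)
    {
        Console.WriteLine($"Processing: {message}");
    }
}
```

Requeueing with `requeue: true` can redeliver a message that always fails over and over, so a real system would likely cap retries or route such messages elsewhere.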

Pub/Sub:
  1. Delivers the same message (duplicated) to multiple consumers.
  2. In contrast, in the competing consumers pattern each message goes to only one of the consumers sharing the queue.
  3. Decouples the producer from the consumers.
  4. Temporary queues: each consumer creates its own temporary queue, which is destroyed when the consumer disconnects.

    // Producer (a separate console application).
    // Requires: using System.Text; using RabbitMQ.Client;
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        channel.ExchangeDeclare(exchange: "pubsub", type: ExchangeType.Fanout);

        var data = Encoding.UTF8.GetBytes("Hi There");

        // Publish to the "pubsub" fanout exchange; a fanout exchange ignores the routing key.
        channel.BasicPublish("pubsub", "", null, data);
    }

    // Consumer (a separate console application).
    // Requires: using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        channel.ExchangeDeclare(exchange: "pubsub", type: ExchangeType.Fanout);

        // Called with no arguments, QueueDeclare() creates a temporary queue:
        // server-named, non-durable, exclusive to this connection, and auto-deleted.
        var queueName = channel.QueueDeclare().QueueName;

        var consumer = new EventingBasicConsumer(channel);

        channel.QueueBind(queue: queueName, exchange: "pubsub", routingKey: "");

        consumer.Received += (obj, evn) =>
        {
            var message = Encoding.UTF8.GetString(evn.Body.ToArray());
            Console.WriteLine($"Received message: {message}");
        };

        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

        Console.ReadLine();
    }

     
