Component services
The services that your VALAWAI component provides and consumes are formally defined within
the asyncapi.yaml
file. The component definition
section provides detailed guidelines on how to specify these services, which essentially
describe the messages your component can exchange via RabbitMQ queues.
This section will guide you through the practical implementation of connecting to RabbitMQ and implementing the mechanisms for both listening to (consuming) and publishing messages, all leveraging the capabilities of the Pika libary. You can see the integration of all the concepts explained here in the Message service subsection of the Echo example section.
Create a connection
The Pika documentation outlines various methods for establishing a connection with RabbitMQ. The simplest approach is often using a blocking connection, as demonstrated in the following example:
import pika
# Connection parameters (adjust as needed)
credentials = pika.PlainCredentials('guest', 'guest') # Default credentials
connection_parameters = pika.ConnectionParameters('localhost', credentials=credentials)
try:
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()
print("Successfully connected to RabbitMQ!")
except pika.exceptions.AMQPConnectionError as e:
print(f"Failed to connect to RabbitMQ: {e}")
# Handle the connection error appropriately (e.g., retry, exit)
exit(1)
Publish a message
The VALAWAI infrastructure relies on JSON-encoded messages for communication. You can achieve
this encoding using the dump()
method from the json
library.
Once your message is encoded, you can publish it using the basic_publish()
method
of your connection's channel. For direct queue targeting, the routing_key
should be
the name of the destination queue, and the exchange parameter should be an empty string ('').
The following code illustrates how to send a log message to the Master of VALAWAI.
import json
import pika
msg = {
"level": "INFO",
"message": "The component is active",
"payload": "{\"pattern\":\"p1\"}",
"component_id": "your_component_id" # Replace with your component's ID
}
body = json.dumps(msg)
properties = pika.BasicProperties(content_type='application/json')
channel.basic_publish(exchange='', routing_key='valawai/log/add', body=body, properties=properties)
Listening for messages
The Pika documentation details various ways
to listen for messages from RabbitMQ. We will focus on the blocking approach. With this method,
message processing for a subscription doesn't begin until you explicitly call the start_consuming()
method on the channel. Once called, the current Python thread will be blocked until the connection
is closed or encounters an error.
Before invoking start_consuming()
, you must declare the queues you intend to listen to and define
the callback methods that will handle incoming messages.
queue_name = 'my_queue'
channel.queue_declare(queue=queue_name, durable=True)
print(f"Queue '{queue_name}' declared.")
The callback methods responsible for processing received messages must accept four arguments:
ch
(the channel object), method
(delivery information), properties
(message properties),
and body
(the message content as a JSON-encoded string). You can use the loads()
method from
the json
library to deserialize the body into a Python dictionary.
The following example demonstrates how to define a class to manage messages received by
a VALAWAI component from the queue valawai/cx/name/control/parameters
:
import pika
import json
import logging
class ChangeParametersHandler:
def __init__(self, channel: pika.channel.Channel):
channel.queue_declare(queue='valawai/cx/name/control/parameters',
durable=True,
exclusive=False,
auto_delete=False)
channel.basic_consume(queue='valawai/cx/name/control/parameters',
auto_ack=True,
on_message_callback=self.handle_message)
def handle_message(self, ch, method, properties, body):
try:
parameters = json.loads(body)
# Implement your logic to process the received parameters
print(f"Received parameters: {parameters}")
except Exception:
logging.exception(f"Unexpected message: {body.decode()}")
To make this handler effective, you need to instantiate it before calling start_consuming()
.
In the context of your component's main application (src/cX_name/__main__.py
), you would
typically initialize these handlers within the start()
method of your main App
class,
as shown below:
import pika
import logging
class App:
"""The main application class for the component."""
def __init__(self):
self.connection = None
def start(self):
"""Initializes the component."""
try:
# Create a connection to RabbitMQ
credentials = pika.PlainCredentials('guest', 'guest')
connection_parameters = pika.ConnectionParameters('localhost', credentials=credentials)
self.connection = pika.BlockingConnection(connection_parameters)
channel = self.connection.channel()
# Create the handlers for the events
ChangeParametersHandler(channel)
# Start to process the received events
logging.info("Started CX name")
channel.start_consuming()
except pika.exceptions.AMQPConnectionError as e:
logging.error(f"Error connecting to RabbitMQ: {e}")
except Exception as e:
logging.exception(f"An error occurred during startup: {e}")
finally:
if self.connection and self.connection.is_open:
self.connection.close()
logging.info("Connection closed.")
def stop(self):
"""Cleans up resources and stops the component."""
logging.info("Stopping CX name")
if self.connection and self.connection.is_open and self.connection.channel.is_consuming:
self.connection.channel.stop_consuming()
if self.connection and self.connection.is_open:
self.connection.close()
logging.info("CX name stopped.")
if __name__ == "__main__":
app = App()
try:
app.start()
except KeyboardInterrupt:
app.stop()
Key Considerations When Using Pika
When working with Pika's blocking connection, keep the following points in mind:
- Connection Resilience: Establishing the initial blocking connection might fail if RabbitMQ is not yet available. You might need to implement a retry mechanism with a delay to handle this scenario gracefully.
- Clean Shutdown: Before closing a blocking connection that is consuming messages,
ensure you call
stop_consuming()
on the channel. Failing to do so can lead to errors during the connection closure. - Queue Declaration: Always declare queues before attempting to consume messages from them to ensure they exist.
- Separate Connections for Publishing and Consuming (Blocking): With blocking connections, it's generally recommended to use separate connections or channels if you need to simultaneously publish and consume messages without blocking the consuming thread. However, the provided examples often use the same channel for simplicity within a single-threaded context. For more complex scenarios, consider using separate threads or asynchronous Pika adapters.