Skip to main content

Step 4: Component Services

Now, we'll define the specific services this component provides: listening for incoming messages and then echoing them back. This involves modifying the asyncapi.yaml file to specify the channels for both receiving and publishing messages. Subsequently, we'll implement the message handler and update the start method in __main__.py to register this handler for incoming messages.

Modifying asyncapi.yaml to add the services.

Within the asyncapi.yaml file, in the channels section, we need to define the following:

  • valawai/c1/echo_example_with_python_and_pika/data/received_message: This channel will be used to receive the messages that need to be echoed.
  • valawai/c1/echo_example_with_python_and_pika/data/publish_message: This channel will be used to publish the echoed messages.

The following YAML snippet demonstrates how these services can be defined:

channels:
valawai/c1/echo_example_with_python_and_pika/data/received_message:
description: Receive the message to echo.
subscribe:
message:
$ref: '#/components/messages/echo_message'

valawai/c1/echo_example_with_python_and_pika/data/publish_message:
description: Publish the echoed message.
publish:
message:
$ref: '#/components/messages/echo_message'

components:
messages:
ech_message:
contentType: application/json
payload:
$ref: '#/components/schemas/echo_payload'

schemas:
echo_payload:
description: A payload of a message to echo.
type: object
properties:
content:
description: The content of the message.
type: string
minLength: 1

As you can see, on line 2, we define the received_message channel with a subscribe operation, indicating it's used for receiving messages. Similarly, on line 8, the publish_message channel is defined with a publish operation for sending out echoed messages. Both channels reference the same echo_message defined in the components/messages section (line 15). This message, in turn, refers to the echo_payload schema (line 21), which simply contains a content field of type string.

For a more comprehensive understanding of defining component interactions, you can refer to the component specification. Additionally, the complete asyncapi.yaml file can be found in the repository.

Implement the echo services

To bring the echo functionality to life, we need to create the following files within your project's root directory (C1_echo_example_with_python_and_pika/):

C1_echo_example_with_python_and_pika/
├── src/
│ └── c1_echo_example_with_python_and_pika
│ ├── echo_payload.py
│ └── echo_handler.py
└── tests/
├── test_echo_payloadr.py
└── test_echo_hanlder.py

Here's a brief overview of the purpose of each file:

  • echo_handler.py: Contains the core logic for processing incoming messages and publishing their echoed counterparts.
  • echo_payload.py: Defines the data structure (using Pydantic for validation) for the content of both incoming and outgoing messages.
  • test_echo_payload.py: Includes unit tests to ensure the echo_payload.py model functions as expected.
  • test_echo_handler.py: Contains unit tests to verify the behavior of the message handling logic in echo_handler.py.

Let's delve into the details of each file.

Filing in the echo_payload.py file

As defined in the asyncapi.yaml, the echo_payload includes a content field with a minimum length of 1, ensuring it's not empty. To enforce this constraint and provide robust data validation, we utilize the pydantic library to define our data model. You can see the implementation in the echo_payload.py file here:

src/c1_echo_example_with_python_and_pika/echo_payload.py
loading...

Specifically, line 25 of this file defines the content field with the constraint that its length must be at least 1.

Filing in the echo_handler.py file

To handle messages arriving on the valawai/c1/echo_example_with_python_and_pika/data/received_message channel, we create a dedicated handler in the echo_handler.py file.

src/c1_echo_example_with_python_and_pika/echo_handler.py
loading...

Upon instantiation, the handler registers itself to listen for incoming messages on the valawai/c1/echo_example_with_python_and_pika/data/received_message channel. This registration triggers the handle_message callback method whenever a new message is received (refer to line 46).

The handle_message method orchestrates the following crucial steps:

  • Message Body Parsing: The raw body of the incoming message is parsed to extract a JSON object (as seen on line 55).
  • Payload Conversion and Validation: The parsed JSON is then converted into an instance of the EchoPayload model. This step includes a validation process to ensure the message conforms to the expected structure and data types (illustrated on line 59).
  • Echo Message Creation: Based on the validated EchoPayload, the echo message content is constructed (lines 62-64).
  • Message Publication: Finally, the newly created echo message is published to the designated channel valawai/c1/echo_example_with_python_and_pika/data/publish_message (demonstrated on line 65).

Filing in the test_echo_payload.py file

This file contains unit tests to ensure the EchoPayload model defined in echo_payload.py behaves as expected, particularly regarding the validation constraints. You can find the test implementation here:

tests/test_echo_payload.py
loading...

Filing in the test_echo_hanlder.py file

This file focuses on testing the logic within the echo_handler.py that manages the received echo messages. Here's the content of the test file:

tests/test_echo_handler.py
loading...

Within this file, several key aspects are tested:

  • On line 41, the test setup registers a handler for outgoing messages, allowing the test to capture what the handler publishes.
  • Line 52 defines a method that is called whenever the handler publishes a message, enabling the test to inspect the published content.
  • Finally, the core test logic on line 66 verifies that whenever a message is sent to the valawai/c1/echo_example_with_python_and_pika/data/received_message channel, the EchoHandler correctly publishes the echoed message to the valawai/c1/echo_example_with_python_and_pika/data/publish_message channel.

Modifying __main__.py to add the echo handler

The final step in enabling the echo functionality is to register the message handler within the start method of your __main__.py file. This handler will be responsible for processing incoming messages on the designated channel and publishing the echoed versions.

Here's how the start method in __main__.py should be structured:

def start(self):
"""Initialize the component"""

try:
# Create connection to RabbitMQ
self.message_service = MessageService()
self.mov = MOV(self.message_service)

# Create the handlers for the events
version = self.mov.load_default_project_version()
asyncapi_yaml = self.mov.load_default_asyncapi_yaml()
name = self.mov.extract_default_component_name(asyncapi_yaml)
self.mov.listen_for_registered_component(name)

EchoHandler(self.message_service, self.mov)

# Register the component
self.mov.register_component()

# Start to process the received events
logging.info("Started C1 Echo")
self.message_service.start_consuming()

except (OSError, ValueError):

logging.exception("Could not start the component")

In the code above, the crucial line for registering the echo handler is line 15: EchoHandler(self.message_service, self.mov). This line instantiates the EchoHandler, which contains the logic for receiving messages and publishing their echoes. By creating an instance of this handler, you are effectively registering it with the message_serviceto listen for the channel defined in your asyncapi.yaml.

You can find the complete implementation of the main.py file in the repository.