I'm getting the error below when I run my code. The code is meant to requeue dead-letter messages.
Error:
Exception has occurred: ServiceBusError
Handler failed: 'tuple' object has no attribute 'get_token'.
AttributeError: 'tuple' object has no attribute 'get_token'
During handling of the above exception, another exception occurred:
File "C:\Users\asisaacr\Downloads\project work\DeadletterRequeue\requeue.py", line 39, in <module>
    with from_subscription_receiver:
azure.servicebus.exceptions.ServiceBusError: Handler failed: 'tuple' object has no attribute 'get_token'.
Code :
# Script from the question: tries to copy messages from a topic subscription
# back onto the topic. It is reproduced with its original behaviour (line
# breaks restored) because it is the code that produces the error quoted above.
from azure.servicebus import ServiceBusClient, ServiceBusMessage
import json
import configparser

# Read configuration from config.conf file
config = configparser.ConfigParser()
config.read('config.conf')
service_namespace = config.get('svcbus', 'service_namespace')
shared_access_key_name = config.get('svcbus', 'shared_access_key_name')
shared_access_key_value = config.get('svcbus', 'shared_access_key')
from_subscription_name = config.get('svcbus', 'from_subscription_name')
to_topic_name = config.get('svcbus', 'to_topic_name')
batch_size = int(config.get('svcbus', 'Batch'))

# Create a ServiceBusClient with the shared access key credential for the source subscription
# NOTE(review): `credential` is a plain (key name, key value) tuple. The error
# reported for this script is "'tuple' object has no attribute 'get_token'",
# so this argument is the likely source of the failure.
bus_service_source = ServiceBusClient(fully_qualified_namespace=service_namespace,credential=(shared_access_key_name, shared_access_key_value)
)

# Create a ServiceBusClient with the shared access key credential for the destination topic
# NOTE(review): same tuple credential as above.
bus_service_destination = ServiceBusClient(fully_qualified_namespace=service_namespace,credential=(shared_access_key_name, shared_access_key_value)
)

# Get a sender for the destination topic
to_topic_sender = bus_service_destination.get_topic_sender(to_topic_name)

# Get a receiver for the source subscription's dead letter queue
# NOTE(review): only the topic and subscription names are passed here; nothing
# in this call selects a dead-letter sub-queue - confirm against the SDK docs.
from_subscription_receiver = bus_service_source.get_subscription_receiver(topic_name=to_topic_name,subscription_name=from_subscription_name
)

counter = 0
# The quoted traceback points at this `with` statement.
with from_subscription_receiver:
    for message in from_subscription_receiver.peek_messages(max_message_count=batch_size):
        print(f"Reading dead letter from {from_subscription_name}:")
        request = json.loads(str(message))
        props = message.user_properties
        # Drop the two dead-letter properties before re-sending.
        if b"DeadLetterReason" in props:
            del props[b"DeadLetterReason"]
        if b"DeadLetterErrorDescription" in props:
            del props[b"DeadLetterErrorDescription"]
        msg = ServiceBusMessage(json.dumps(request))
        msg.user_properties = props
        to_topic_sender.send_messages(msg)
        print(f"...message removed after being added to topic: {to_topic_name} {counter}")
        # NOTE(review): these messages come from peek_messages(); verify that a
        # peeked message can be completed and that `complete()` exists on it.
        message.complete()
        counter += 1
        if counter == batch_size:
            break

# Close the ServiceBusClients
bus_service_source.close()
bus_service_destination.close()
I haven't tried anything yet; I need help fixing this.
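The error occurs because of how the `ServiceBusClient` is created: a plain `(key name, key value)` tuple is passed as `credential`, but the client expects a credential object and calls `get_token` on it, which a tuple does not have.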
The code below reads messages from a dead letter queue (DLQ) for a subscription to a topic in Azure Service Bus. It removes the DLQ-related properties from the messages, sends them to the topic, and handles invalid JSON format messages.
I used `ServiceBusClient.from_connection_string` to create a `ServiceBusClient` instance from a connection string.
I decode the message body explicitly with `message_body.decode('utf-8')` before loading it as JSON, and wrap the parsing in a `try`/`except` block that catches `JSONDecodeError`.
I configured basic logging with `logging.basicConfig` and used `logging.info` and `logging.error` to log informational and error messages.
I referred to the Microsoft documentation for Azure Service Bus with Python.
Code:
# Requeue dead-lettered messages from a topic subscription back onto the topic.
#
# Reads up to `Batch` messages from the subscription's dead-letter sub-queue,
# strips the dead-letter bookkeeping properties, re-sends each JSON message to
# the topic and only then completes (removes) the dead-lettered original.
import configparser
import json
import logging

from azure.servicebus import (
    ServiceBusClient,
    ServiceBusMessage,
    ServiceBusReceiveMode,
    ServiceBusSubQueue,
)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# Application properties that describe why a message was dead-lettered; they
# must not travel with the requeued copy.
DEAD_LETTER_PROPERTY_NAMES = frozenset({"DeadLetterReason", "DeadLetterErrorDescription"})

# Seconds to wait for dead-lettered messages before giving up. Adjust as needed.
MAX_WAIT_SECONDS = 5


def _without_dead_letter_properties(properties):
    """Return a copy of *properties* without the dead-letter bookkeeping keys.

    Keys are matched whether they arrive as ``bytes`` or ``str``. ``None``
    (a message with no application properties) yields an empty dict. The
    received message's own mapping is left untouched.
    """
    cleaned = {}
    for key, value in (properties or {}).items():
        name = key.decode("utf-8", "replace") if isinstance(key, bytes) else key
        if name not in DEAD_LETTER_PROPERTY_NAMES:
            cleaned[key] = value
    return cleaned


# Read configuration from config.conf file
config = configparser.ConfigParser()
config.read('config.conf')
service_namespace = config.get('svcbus', 'service_namespace')
shared_access_key_name = config.get('svcbus', 'shared_access_key_name')
shared_access_key_value = config.get('svcbus', 'shared_access_key')
from_subscription_name = config.get('svcbus', 'from_subscription_name')
to_topic_name = config.get('svcbus', 'to_topic_name')
batch_size = int(config.get('svcbus', 'Batch'))

# Build the connection string. Namespace names cannot contain dots, so a dot
# means the config already holds the fully qualified host name and the domain
# suffix must not be appended a second time.
namespace_host = (
    service_namespace
    if "." in service_namespace
    else f"{service_namespace}.servicebus.windows.net"
)
connection_string = (
    f"Endpoint=sb://{namespace_host}/;"
    f"SharedAccessKeyName={shared_access_key_name};"
    f"SharedAccessKey={shared_access_key_value}"
)

# Create a ServiceBusClient using the connection string
servicebus_client = ServiceBusClient.from_connection_string(connection_string)

# Get a sender for the destination topic
to_topic_sender = servicebus_client.get_topic_sender(topic_name=to_topic_name)

# Get a receiver for the source subscription's dead letter queue in PEEK_LOCK
# mode. `sub_queue` is what points the receiver at the dead-letter sub-queue;
# without it the receiver reads the subscription's live messages instead.
from_subscription_receiver = servicebus_client.get_subscription_receiver(
    topic_name=to_topic_name,
    subscription_name=from_subscription_name,
    sub_queue=ServiceBusSubQueue.DEAD_LETTER,
    receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
    max_wait_time=MAX_WAIT_SECONDS,
)

counter = 0
# The context managers close receiver, sender and client even if a send or a
# settlement call raises.
with servicebus_client, to_topic_sender, from_subscription_receiver:
    # A single call returns at most `batch_size` messages (possibly fewer).
    dead_letters = from_subscription_receiver.receive_messages(
        max_message_count=batch_size,
        max_wait_time=MAX_WAIT_SECONDS,
    )
    for message in dead_letters:
        logging.info("Reading dead letter from %s:", from_subscription_name)

        if message.content_type != "application/json":
            logging.error(
                "Skipping message with unsupported content type: %r",
                message.content_type,
            )
            # Release the lock so the message stays available in the dead-letter queue.
            from_subscription_receiver.abandon_message(message)
            continue

        message_body = b"".join(message.body)
        try:
            request = json.loads(message_body.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # Handle invalid JSON format
            logging.error("Error parsing message: %s", e)
            from_subscription_receiver.abandon_message(message)
            continue

        logging.info("Message Content: %s", request)
        logging.info("Message Body: %s", message_body)

        msg = ServiceBusMessage(
            json.dumps(request),
            # Keep the content type so a requeued message is still recognised
            # as JSON if it is ever dead-lettered again.
            content_type=message.content_type,
            application_properties=_without_dead_letter_properties(
                message.application_properties
            ),
        )
        # Send first, complete second: if the send fails, the original stays
        # in the dead-letter queue instead of being lost.
        to_topic_sender.send_messages(msg)
        logging.info(
            "...message removed after being added to topic: %s %s",
            to_topic_name,
            counter,
        )
        from_subscription_receiver.complete_message(message)  # Complete the message
        counter += 1

logging.info("Requeued %d dead-lettered message(s) to topic %s", counter, to_topic_name)
Output:
- Refer to this link for using `azure-identity` with Python in Azure Service Bus.