生产消息
import pika
import json
import time
import logging
from typing import Dict, Any
import argparse
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustRabbitMQProducer
:
def __init__(self, host='localhost', port=5672, vhost='vhost_test' ,username='rabbitmq', password='rabbitmq@123',connection_attempts=3,aliyun_object_name="None"):
self.host = host
self.port = port
self.username = username
self.password = password
self.vhost = vhost
self.connection_attempts = connection_attempts
self.connection = None
self.channel = None
self.aliyun_obj = aliyun_object_name
self.connect()
def connect(self):
"""建立连接"""
try:
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host,
port=self.port,
credentials=pika.PlainCredentials(self.username, self.password),
virtual_host=self.vhost,
connection_attempts=self.connection_attempts,
heartbeat=600,
blocked_connection_timeout=300
)
)
self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange='unlock_rclone_topic_exchange', # 交换机名称
exchange_type='topic', # 交换机类型(与路由键匹配)
durable=True, # 持久化(重启后不丢失)
auto_delete=False # 不自动删除
)
logger.info("✅ 成功连接到RabbitMQ")
except Exception as e:
logger.error(f"❌ 连接RabbitMQ失败: {e
}")
msg=f"❌ 连接RabbitMQ失败: {e
}"
self.warning_send_to_wechat(record_msg=msg,obj_name=f"{self.aliyun_obj
}")
raise
def warning_send_to_wechat(self,record_msg=None,obj_name=None):
import requests
import json
url = "《机器人URL》"
# 构建正确的JSON payload
payload = {
"msgtype": "text",
"text": {
"content": f"阿里云OSS解锁记录失败: {record_msg
}\n 对象: {obj_name
}",
"mentioned_list": ["@ user_ID"]
}
}
headers = {
'Content-Type': 'application/json' # 修正Content-Type
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应
if response.status_code == 200:
msg="企业微信通知发送成功"
print(msg)
#return True,msg
else:
msg=f"企业微信通知发送失败: {record_msg,obj_name
} "
print(msg)
def ensure_connection(self):
"""确保连接有效"""
if self.connection is None or self.connection.is_closed:
logger.warning("连接已关闭,尝试重新连接...")
self.connect()
def send_message(self, exchange: str, routing_key: str, message: Dict[str, Any],
persistent: bool = True, retry_count: int = 3):
"""发送消息(带重试机制)"""
for attempt in range(retry_count):
try:
self.ensure_connection()
properties = pika.BasicProperties(
delivery_mode=2 if persistent else 1, # 2=持久化,1=非持久化
content_type='application/json',
timestamp=int(time.time())
)
self.channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=json.dumps(message),
properties=properties,
mandatory=True # 确保消息被路由到队列
)
logger.info(f" 消息发送成功: {exchange
} ->
{routing_key
}")
return True
except pika.exceptions.UnroutableError:
self.warning_send_to_wechat(record_msg=f"⚠️ 消息无法路由: {routing_key
}",obj_name=f"{message
}")
logger.warning(f"⚠️ 消息无法路由: {routing_key
}")
return False
except Exception as e:
self.warning_send_to_wechat(record_msg=f"❌ 发送失败 尝试 {attempt + 1
}/{retry_count
}: {e
}",obj_name=f"{message
}")
logger.error(f"❌ 发送失败 (尝试 {attempt + 1
}/{retry_count
}): {e
}")
if attempt < retry_count - 1:
time.sleep(2) # 等待后重试
else:
raise
def close(self):
"""安全关闭连接"""
if self.connection and not self.connection.is_closed:
self.connection.close()
logger.info(" 连接已关闭")
def parse_args():
"""
解析命令行入口参数,定义用户可传递的参数规则
"""
parser = argparse.ArgumentParser(
description="RabbitMQ消息发送脚本:支持用户传递rclone相关的对象名及消息内容",
formatter_class=argparse.RawTextHelpFormatter # 支持换行显示帮助信息
)
# 1. 必传核心参数:rclone用到的对象名
parser.add_argument(
"--aliyun-obj", # 参数名(长选项)
"-a", # 短选项
required=True, # 必传
type=str,
help="阿里云对象存储的对象名(rclone操作目标,例如:oss://my-bucket/path)"
)
parser.add_argument(
"--minio-obj",
"-m",
required=True,
type=str,
help="MinIO对象存储的对象名(rclone操作目标,例如:minio://my-bucket/path)"
)
# 2. 消息内容参数:支持用户自定义(提供默认值,可选传)
parser.add_argument(
"--message-ali",
"-ma",
type=str,
default='{"service": "aliyun", "action": "copy"}',
help='阿里云相关消息体(JSON格式字符串)\n'
'示例:\'{"service":"aliyun","action":"copy","files":["data.csv"]}\'\n'
'默认值:{"service": "aliyun", "action": "copy"}'
)
parser.add_argument(
"--message-minio",
"-mm",
type=str,
default='{"tool": "rclone", "action": "copy", "source": "/tmp"}',
help='MinIO相关消息体(JSON格式字符串)\n'
'示例:\'{"tool":"rclone","action":"move","source":"/data"}\'\n'
'默认值:{"tool": "rclone", "action": "copy", "source": "/tmp"}'
)
# 3. 可选参数:RabbitMQ交换器和路由键(如需灵活配置可开放,这里用默认值)
parser.add_argument(
"--exchange",
"-e",
type=str,
default="unlock_rclone_topic_exchange",
help="RabbitMQ主题交换器名称(默认:unlock_rclone_topic_exchange)"
)
parser.add_argument(
"--routing-key-ali",
"-rka",
type=str,
default="aliV4.sync",
help="阿里云消息的路由键(默认:aliV4.sync)"
)
parser.add_argument(
"--routing-key-minio",
"-rkm",
type=str,
default="rclone.copy",
help="MinIO消息的路由键(默认:rclone.copy)"
)
return parser.parse_args()
def validate_and_parse_json(json_str: str, param_name: str) ->
dict:
"""
校验并解析JSON格式的字符串为字典
:param json_str: 待解析的JSON字符串
:param param_name: 参数名(用于报错提示)
:return: 解析后的字典
"""
import json # 局部导入(仅用到时加载)
try:
return json.loads(json_str)
except json.JSONDecodeError as e:
raise ValueError(f"参数【{param_name
}】格式错误,必须是合法JSON字符串:{e
}")
def build_messages(args) ->
list:
"""
根据用户传入的参数,构建要发送的RabbitMQ消息列表
:param args: 解析后的命令行参数
:return: 消息列表
"""
# 解析JSON格式的消息体
msg_ali = validate_and_parse_json(args.message_ali, "--message-ali")
msg_minio = validate_and_parse_json(args.message_minio, "--message-minio")
# 构建消息(可根据需求将 aliyun-obj/minio-obj 嵌入消息体)
# 注意:这里默认将对象名加入消息体的"target"字段,如需调整位置可修改
messages = [
{
"exchange": args.exchange,
"routing_key": args.routing_key_ali,
"message": {
**msg_ali, "target": args.aliyun_obj
} # 合并用户消息与对象名
},
{
"exchange": args.exchange,
"routing_key": args.routing_key_minio,
"message": {
**msg_minio, "target": args.minio_obj
} # 合并用户消息与对象名
}
]
return messages
def main():
# 1. 解析命令行参数
args = parse_args()
# 2. 初始化RabbitMQ生产者
producer = RobustRabbitMQProducer(aliyun_object_name=args.aliyun_obj)
try:
# 3. 构建消息列表
messages_to_send = build_messages(args)
print(f" 待发送消息列表:{messages_to_send
}")
# 4. 发送消息
for idx, msg in enumerate(messages_to_send, start=1):
producer.send_message(
exchange=msg["exchange"],
routing_key=msg["routing_key"],
message=msg["message"]
)
print(f"✅ 第{idx
}条消息发送成功")
time.sleep(0.5) # 避免消息发送过快
print("\n 所有消息发送完成!")
except ValueError as ve:
# 捕获参数校验错误(如JSON格式错误)
print(f"❌ 参数错误: {ve
}")
except Exception as e:
# 捕获其他运行时错误(如RabbitMQ连接失败)
print(f" 发送过程中出错: {
str(e)
}")
finally:
# 确保生产者连接关闭
if "producer" in locals(): # 避免未初始化时调用close()
producer.close()
print(" RabbitMQ生产者连接已关闭")
if __name__ == "__main__":
main()
执行
python send_message.py -a <ali-bucket>://<object_all_path>-m <other-App>://<object_all_path>-mm '{"tool": "rclone", "action": "copy", "source": "<object_name_all_path>"}'
customer.py 消费者
#!/usr/bin/env python3
import pika
import json
import argparse
from typing import Optional, Dict, Any
class RabbitMQMessageReader
:
def __init__(self, host: str = 'localhost', port: int = 5672,
username: str = 'Account', password: str = 'Password',
vhost: str = 'vhost_prod'):
self.connection_params = pika.ConnectionParameters(
host=host,
port=port,
virtual_host=vhost,
credentials=pika.PlainCredentials(username, password),
heartbeat=600,
blocked_connection_timeout=300
)
def read_single_message(self, queue_name: str) -> Optional[Dict[str, Any]]:
"""读取单条消息但不删除"""
connection = None
try:
connection = pika.BlockingConnection(self.connection_params)
channel = connection.channel()
# 声明队列(确保队列存在)
channel.queue_declare(queue=queue_name, passive=True)
# 获取消息
method_frame, properties, body = channel.basic_get(
queue=queue_name,
auto_ack=False
)
if method_frame:
message_info = {
'delivery_tag': method_frame.delivery_tag,
'routing_key': method_frame.routing_key,
'exchange': method_frame.exchange,
'redelivered': method_frame.redelivered,
'message_count': method_frame.message_count,
'body': body
}
# 拒绝消息并重新入队
channel.basic_reject(method_frame.delivery_tag, requeue=True)
return message_info
else:
print("队列为空")
return None
except pika.exceptions.ChannelClosedByBroker as e:
print(f"队列不存在或无法访问: {e
}")
except Exception as e:
print(f"错误: {e
}")
finally:
if connection and not connection.is_closed:
connection.close()
return None
def decode_message_body(self, body: bytes) -> Any:
"""解码消息体"""
try:
text = body.decode('utf-8')
# 尝试解析为 JSON
if text.strip().startswith('{') or text.strip().startswith('['):
return json.loads(text)
return text
except UnicodeDecodeError:
return body # 返回原始字节
except json.JSONDecodeError:
return text # 返回文本
def print_message(self, message_info: Dict[str, Any]):
"""美化打印消息"""
print("=" * 60)
print("RabbitMQ 消息详情")
print("=" * 60)
print(f"消息ID: {message_info['delivery_tag']
}")
print(f"路由键: {message_info['routing_key']
}")
print(f"交换器: {message_info['exchange']
}")
print(f"重投递: {message_info['redelivered']
}")
print(f"队列剩余消息: {message_info['message_count']
}")
print("\n消息内容:")
decoded_body = self.decode_message_body(message_info['body'])
if isinstance(decoded_body, dict):
print(json.dumps(decoded_body, ensure_ascii=False, indent=2))
else:
print(decoded_body)
print("\n消息已重新放回队列")
def main():
parser = argparse.ArgumentParser(description='读取 RabbitMQ 消息(不删除)')
parser.add_argument('--queue', required=True, help='队列名称')
parser.add_argument('--host', default='localhost', help='RabbitMQ 主机')
parser.add_argument('--port', type=int, default=5672, help='RabbitMQ 端口')
parser.add_argument('-u', '--username', default='Account', help='用户名')
parser.add_argument('-p', '--password', required=True, help='密码')
parser.add_argument('-V', '--vhost', default='vhost_prod', help='虚拟主机')
args = parser.parse_args()
reader = RabbitMQMessageReader(
host=args.host,
port=args.port,
username=args.username,
password=args.password,
vhost=args.vhost
)
message = reader.read_single_message(args.queue)
if message:
reader.print_message(message)
if __name__ == '__main__':
# 使用示例
# python3 script.py --queue ali_queue -p Password
main()
消费者确认
手动确认的三种方式:
basic_ack(delivery_tag) - 确认并删除消息
basic_reject(delivery_tag, requeue=True) - 拒绝并重新入队
basic_reject(delivery_tag, requeue=False) - 拒绝并丢弃
例如:
def manual_ack_example():
credentials = pika.PlainCredentials('Account', 'Password')
parameters = pika.ConnectionParameters(
host='localhost',
virtual_host='vhost_prod',
credentials=credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 获取消息,不自动确认
method_frame, header_frame, body = channel.basic_get(
queue='ali_queue',
auto_ack=False # 重要:手动确认模式
)
if method_frame:
print(f"收到消息: {body.decode()
}")
# 手动确认消息(从队列中删除)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
print("消息已确认并删除")
# 或者拒绝消息并重新入队
# channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)
# print("消息已拒绝并重新入队")
else:
print("没有消息")
connection.close()