当前位置: 首页 > news >正文

DeepResearch代码浅析

DeepResearch代码浅析

概述

代码:DeepResearch

主要看一下inference下面的ReAct推理流程。

inference
├── eval_data
│   ├── example_with_file.jsonl
│   ├── example.jsonl
│   └── file_corpus
│       └── hello.txt
├── file_tools
│   ├── __pycache__
│   │   └── file_parser.cpython-313.pyc
│   ├── file_parser.py
│   ├── idp.py
│   ├── utils.py
│   ├── video_agent.py
│   └── video_analysis.py
├── prompt.py
├── react_agent.py
├── run_multi_react.py
├── run_react_infer.sh
├── tool_file.py
├── tool_python.py
├── tool_scholar.py
├── tool_search.py
└── tool_visit.py

代码的入口是run_react_infer.sh中的run_multi_react.py文件

run_multi_react.py负责初始化节点环境,加载数据集,加载模型配置,进行多次rollout采样。

react_agent是ReAct 架构的Agent,负责迭代输出,调用工具。

from react_agent import MultiTurnReactAgent      test_agent = MultiTurnReactAgent(llm=llm_cfg,function_list=["search", "visit", "google_scholar", "PythonInterpreter"]
)

react_agent

主体的ReAct agent,统一调度处理模型的输出,进行tool extract and execute和tool response的拼接

执行ReAct的全部流程,给出最后的执行状态,处理运行中的异常现象

  • 定义工具

    from tool_file import *
    from tool_scholar import *
    from tool_python import *
    from tool_search import *
    from tool_visit import *OBS_START = '<tool_response>'
    OBS_END = '\n</tool_response>'# 定义工具,放在TOOL_MAP中
    TOOL_CLASS = [FileParser(),Scholar(),Visit(),Search(),PythonInterpreter(),
    ]
    TOOL_MAP = {tool.name: tool for tool in TOOL_CLASS}
    
  • MultiTurnReactAgent类中使用def call_server() 调用llm api

    def call_server(self, msgs, planning_port, max_tries=10):openai_api_key = "EMPTY"openai_api_base = f"http://127.0.0.1:{planning_port}/v1"client = OpenAI(api_key=openai_api_key,base_url=openai_api_base,timeout=600.0,)
    
  • 执行ReAct流程

    可能出现的情况

    • 返回answer (出现<answer> </answer>
      • 未达到轮次限制
      • 达到/未达到上下文token数量限制
    • 未返回answer
      • 超出轮次限制后
      • 达到上下问token数量限制后,返回答案没有<answer>
      • 超时
      • 工具调用错误:tool_call 的json格式错误
    def _run(self, data: str, model: str, **kwargs) -> List[List[Message]]:############################################################## 初始化question和最多调用轮次num_llm_calls_available,# 记录start_time,拼接最开始的message#############################################################self.model=modeltry:question = data['item']['question']except: raw_msg = data['item']['messages'][1]["content"] question = raw_msg.split("User:")[1].strip() if "User:" in raw_msg else raw_msg start_time = time.time()planning_port = data['planning_port']answer = data['item']['answer']self.user_prompt = questionsystem_prompt = SYSTEM_PROMPTcur_date = today_date()system_prompt = system_prompt + str(cur_date)messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": question}]num_llm_calls_available = MAX_LLM_CALL_PER_RUNround = 0############################################################## 开始迭代每一个iter,生成<tool_call> 或是<answer>#############################################################while num_llm_calls_available > 0:# Check whether time is reached############################################################## 检查是否超时(2.5小时)#############################################################if time.time() - start_time > 150 * 60:  # 150 minutes in secondsprediction = 'No answer found after 2h30mins'termination = 'No answer found after 2h30mins'result = {"question": question,"answer": answer,"messages": messages,"prediction": prediction,"termination": termination}return result############################################################## 更新调用llm次数 num_llm_calls_available# 获取llm的返回值 content#############################################################round += 1num_llm_calls_available -= 1content = self.call_server(messages, planning_port)print(f'Round {round}: {content}')############################################################## 进行content中关键tool的提取############################################################## 舍弃content中<tool_response>的部分,应为obs应该是user输入的,而不是llm生成的if '<tool_response>' in content:pos = content.find('<tool_response>')content = content[:pos]messages.append({"role": "assistant", "content": content.strip()})# 查看content中是否有工具调用 <tool_call>if '<tool_call>' in content and '</tool_call>' in content:tool_call = content.split('<tool_call>')[1].split('</tool_call>')[0]try:# 使用python解释器运行code_rawif "python" in tool_call.lower():try:code_raw=content.split('<tool_call>')[1].split('</tool_call>')[0].split('<code>')[1].split('</code>')[0].strip()result = TOOL_MAP['PythonInterpreter'].call(code_raw)except:result = "[Python Interpreter Error]: Formatting error."# 调用其他的工具else:tool_call = json5.loads(tool_call)tool_name = tool_call.get('name', '')tool_args = tool_call.get('arguments', {})result = self.custom_call_tool(tool_name, tool_args)# 如果llm生成的tool formart错误,则将错误信息写入messages中(可以使用约束采样避免格式错误)except:result = 'Error: Tool call is not a valid JSON. Tool call must contain a valid "name" and "arguments" field.'result = "<tool_response>\n" + result + "\n</tool_response>"# print(result)# 把tool response写入到user中messages.append({"role": "user", "content": result})# 如果模型生成的content中有<answer> </answer>,则已经输出答案if '<answer>' in content and '</answer>' in content:termination = 'answer'break# 如果没有可用轮次,记录失败信息if num_llm_calls_available <= 0 and '<answer>' not in content:messages[-1]['content'] = 'Sorry, the number of llm calls exceeds the limit.'max_tokens = 110 * 1024token_count = self.count_tokens(messages)print(f"round: {round}, token count: {token_count}")############################################################## ReAct的累积上下文token长度达到阈值,强制给出回答#############################################################if token_count > max_tokens:print(f"Token quantity exceeds the limit: {token_count} > {max_tokens}")messages[-1]['content'] = "You have now reached the maximum context length you can handle. You should stop making tool calls and, based on all the information above, think again and provide what you consider the most likely answer in the following format:<think>your final thinking</think>\n<answer>your answer</answer>"content = self.call_server(messages, planning_port)messages.append({"role": "assistant", "content": content.strip()})# token数达到阈值后,成功返回结果if '<answer>' in content and '</answer>' in content:prediction = messages[-1]['content'].split('<answer>')[1].split('</answer>')[0]termination = 'generate an answer as token limit reached'# 未返回结果else:prediction = messages[-1]['content']termination = 'format error: generate an answer as token limit reached'result = {"question": question,"answer": answer,"messages": messages,"prediction": prediction,"termination": termination}return result# 这里termination忽略了token超限制后是否给出answer的情况if '<answer>' in messages[-1]['content']:prediction = messages[-1]['content'].split('<answer>')[1].split('</answer>')[0]termination = 'answer'else:prediction = 'No answer found.'termination = 'answer not found'if num_llm_calls_available == 0:termination = 'exceed available llm calls'result = {"question": question,"answer": answer,"messages": messages,"prediction": prediction,"termination": termination}return result

工具调用

  • tool_python

    执行python代码。\((code;Interpreter)\rightarrow (stdout, stderr)\)

    def call(self, params, files= None, timeout = 50, **kwargs) -> str:try:# params 即为要执行的code代码code=paramslast_error = None# 尝试多次for attempt in range(8):try:# Randomly sample an endpoint for each attemptendpoint = random.choice(SANDBOX_FUSION_ENDPOINTS)print(f"Attempt {attempt + 1}/5 using endpoint: {endpoint}")# 执行codecode_result = run_code(RunCodeRequest(code=code, language='python', run_timeout=timeout), max_attempts=1, client_timeout=timeout, endpoint=endpoint)print("[Python] Code Result", code_result)result = []# 记录code 的标准输出和错误if code_result.run_result.stdout:result.append(f"stdout:\n{code_result.run_result.stdout}")if code_result.run_result.stderr:result.append(f"stderr:\n{code_result.run_result.stderr}")if code_result.run_result.execution_time >= timeout-1:result.append(f"[PythonInterpreter Error] TimeoutError: Execution timed out.")result = '\n'.join(result)print('SUCCESS RUNNING TOOL')return result if result.strip() else 'Finished execution.'# code执行超时except Timeout as e:last_error = f'[Python Interpreter Error] TimeoutError: Execution timed out on endpoint {endpoint}.'print(f"Timeout on attempt {attempt + 1}: {last_error}")if attempt == 4:  # Last attemptreturn last_errorcontinue# code执行错误except Exception as e:last_error = f'[Python Interpreter Error]: {str(e)} on endpoint {endpoint}'print(f"Error on attempt {attempt + 1}: {last_error}")if attempt == 4:  # Last attemptreturn last_errorcontinuereturn last_error if last_error else '[Python Interpreter Error]: All attempts failed.'except Exception as e:return f"[Python Interpreter Error]: {str(e)}"
    
  • tool_visit

搜索具体的url,并根据goal总结返回。\((url, goal;\pi)\rightarrow summary\)

JINA_API_KEYS = os.getenv("JINA_API_KEYS", "")def readpage_jina(self, url: str, goal: str) -> str:"""Attempt to read webpage content by alternating between jina and aidata services.Args:url: The URL to readgoal: The goal/purpose of reading the pageReturns:str: The webpage content or error message"""# def call_server用于根据goal总结网页的内容summary_page_func = self.call_servermax_retries = int(os.getenv('VISIT_SERVER_MAX_RETRIES', 1))# 使用jina将url的网页信息转化为 markdown格式content = self.html_readpage_jina(url)############################################################## 处理markdown的网页信息 content############################################################## 如果网页信息可以被jina提取if content and not content.startswith("[visit] Failed to read page.") and content != "[visit] Empty content." and not content.startswith("[document_parser]"):# pre-process 先处理content的token长度,避免llm的上下文超长content = truncate_to_tokens(content, max_tokens=95000)# 总结promoptmessages = [{"role":"user","content": EXTRACTOR_PROMPT.format(webpage_content=content, goal=goal)}]parse_retry_times = 0# 得到网页总结后的信息 rawraw = summary_page_func(messages, max_retries=max_retries)summary_retries = 3# 如果raw少于10个字符,那么认为总结失败,因为raw是json格式,```json {"rational":..., "evidence":..., "summary":...}```while len(raw) < 10 and summary_retries >= 0:# 尝试截断30%的长度truncate_length = int(0.7 * len(content)) if summary_retries > 0 else 25000status_msg = (f"[visit] Summary url[{url}] " f"attempt {3 - summary_retries + 1}/3, "f"content length: {len(content)}, "f"truncating to {truncate_length} chars") if summary_retries > 0 else (f"[visit] Summary url[{url}] failed after 3 attempts, "f"final truncation to 25000 chars") # 截断30%不行,尝试只留下25000字符print(status_msg)content = content[:truncate_length]extraction_prompt = EXTRACTOR_PROMPT.format(webpage_content=content,goal=goal)messages = [{"role": "user", "content": extraction_prompt}]raw = summary_page_func(messages, max_retries=max_retries)summary_retries -= 1# 解析总结的格式parse_retry_times = 2if isinstance(raw, str):raw = raw.replace("```json", "").replace("```", "").strip()while parse_retry_times < 3:try:raw = json.loads(raw)breakexcept:# 解析失败的话,就重新生成总结raw = summary_page_func(messages, max_retries=max_retries)parse_retry_times += 1# 解析失败if parse_retry_times >= 3:useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)useful_information += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"useful_information += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"# 解析成功,把evidence和summary一并返回else:useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)useful_information += "Evidence in page: \n" + str(raw["evidence"]) + "\n\n"useful_information += "Summary: \n" + str(raw["summary"]) + "\n\n"if len(useful_information) < 10 and summary_retries < 0:print("[visit] Could not generate valid summary after maximum retries")useful_information = "[visit] Failed to read page"return useful_information# If no valid content was obtained after all retries# 如果网页的原始信息就不合理,jina无法提取,返回失败信息else:useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)useful_information += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"useful_information += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"return useful_information

jina举例

输入https://r.jina.ai/+{url(https://www.axtonliu.ai/newsletters/ai-2/posts/jina-reader-api-four-usage-methods-guide)}

原始网页:

image-20251017120415289

jina由三部分组成:

  • title
  • url
  • markdown content(图片的url信息,超链接等)
Title: Jina Reader API完全指南:4种实用集成方案详解 | AI开发教程URL Source: https://www.axtonliu.ai/newsletters/ai-2/posts/jina-reader-api-four-usage-methods-guideMarkdown Content:
构建知识库,或者分析各种文章数据,是大家使用 AI 很重要的一个应用场景,
  • tool_file

    根据url的文件,和goal,返回总结信息,类似于tool_visit。但是要借助于file_tools进行指定url文件的读取(visit是借用jina进行指定url网页信息的读取)。

    """
    input:- query/goal: str- Docs: List[file]/List[url]- file type: 'pdf', 'docx', 'pptx', 'txt', 'html', 'csv', 'tsv', 'xlsx', 'xls', 'doc', 'zip', '.mp4', '.mov', '.avi', '.mkv', '.webm', '.mp3', '.wav', '.aac', '.ogg', '.flac'
    output:- answer: str- useful_information: str
    """
    
  • tool_search

    调用google 进行search。\((q;Enginer)\rightarrow docs\)

  • tool_scholar

    类似于tool_search,区别在于 tool_scholar在goole scholar上进行文章的搜索

Prompt

分为react的system prompt,以及visit 总结的extract prompt

SYSTEM_PROMPT = """You are a deep research assistant. Your core function is to conduct thorough, multi-source investigations into any topic. You must handle both broad, open-domain inquiries and queries within specialized academic fields. For every request, synthesize information from credible, diverse sources to deliver a comprehensive, accurate, and objective response. When you have gathered sufficient information and are ready to provide the definitive response, you must enclose the entire final answer within <answer></answer> tags.# ToolsYou may call one or more functions to assist with the user query.You are provided with function signatures within <tools></tools> XML tags:
<tools>
{"type": "function", "function": {"name": "search", "description": "Perform Google web searches then returns a string of the top search results. Accepts multiple queries.", "parameters": {"type": "object", "properties": {"query": {"type": "array", "items": {"type": "string", "description": "The search query."}, "minItems": 1, "description": "The list of search queries."}}, "required": ["query"]}}}
{"type": "function", "function": {"name": "visit", "description": "Visit webpage(s) and return the summary of the content.", "parameters": {"type": "object", "properties": {"url": {"type": "array", "items": {"type": "string"}, "description": "The URL(s) of the webpage(s) to visit. Can be a single URL or an array of URLs."}, "goal": {"type": "string", "description": "The specific information goal for visiting webpage(s)."}}, "required": ["url", "goal"]}}}
{"type": "function", "function": {"name": "PythonInterpreter", "description": "Executes Python code in a sandboxed environment. To use this tool, you must follow this format:
1. The 'arguments' JSON object must be empty: {}.
2. The Python code to be executed must be placed immediately after the JSON block, enclosed within <code> and </code> tags.IMPORTANT: Any output you want to see MUST be printed to standard output using the print() function.Example of a correct call:
<tool_call>
{"name": "PythonInterpreter", "arguments": {}}
<code>
import numpy as np
# Your code here
print(f"The result is: {np.mean([1,2,3])}")
</code>
</tool_call>", "parameters": {"type": "object", "properties": {}, "required": []}}}
{"type": "function", "function": {"name": "google_scholar", "description": "Leverage Google Scholar to retrieve relevant information from academic publications. Accepts multiple queries. This tool will also return results from google search", "parameters": {"type": "object", "properties": {"query": {"type": "array", "items": {"type": "string", "description": "The search query."}, "minItems": 1, "description": "The list of search queries for Google Scholar."}}, "required": ["query"]}}}
{"type": "function", "function": {"name": "parse_file", "description": "This is a tool that can be used to parse multiple user uploaded local files such as PDF, DOCX, PPTX, TXT, CSV, XLSX, DOC, ZIP, MP4, MP3.", "parameters": {"type": "object", "properties": {"files": {"type": "array", "items": {"type": "string"}, "description": "The file name of the user uploaded local files to be parsed."}}, "required": ["files"]}}}
</tools>For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{"name": <function-name>, "arguments": <args-json-object>}
</tool_call>Current date: """EXTRACTOR_PROMPT = """Please process the following webpage content and user goal to extract relevant information:## **Webpage Content** 
{webpage_content}## **User Goal**
{goal}## **Task Guidelines**
1. **Content Scanning for Rational**: Locate the **specific sections/data** directly related to the user's goal within the webpage content
2. **Key Extraction for Evidence**: Identify and extract the **most relevant information** from the content, you never miss any important information, output the **full original context** of the content as far as possible, it can be more than three paragraphs.
3. **Summary Output for Summary**: Organize into a concise paragraph with logical flow, prioritizing clarity and judge the contribution of the information to the goal.**Final Output Format using JSON format has "rational", "evidence", "summary" feilds**
"""
http://www.hskmm.com/?act=detail&tid=32904

相关文章:

  • 2025年连铸机厂家权威推荐榜单:扇形段/大包回转台/钢包中间罐/结晶器总成/拉矫机/引锭杆/输送辊道/液压剪等核心部件专业供应商
  • 拉格朗日插值
  • 2025年10月中国专精特新申报服务机构推荐榜:五强实测
  • 2025年轧钢设备厂家权威推荐榜:冷轧机、热轧机源头生产厂家,技术实力与市场口碑深度解析
  • 正态总体中标准化单样本残差的分布推导
  • 10.16 CSP-S 模拟赛总结
  • 远程无钥匙进入(PKE)技术:便利与安全的完美融合
  • 203. 移除链表元素
  • 灵动岛iPhone状态栏获得高度不对 iOS iPhone14pro iPhone14pro max状态栏获得高度不对
  • string略解
  • 《程序员修炼之道》 阅读笔记二
  • 是时候告别向日葵、Todesk、TeamViewer了,快速搭建自托管服务器RustDesk
  • 史馆
  • firecrawl 私有部署(test)
  • $\text{Catalan}$ 数 卡特兰数
  • java作业3
  • 大模型 | VLM 初识及在自动驾驶场景中的应用
  • CF1977 Codeforces Round 948 (Div. 2) 游记(VP)
  • 别被波形“骗” 了!差分探头与无源探头测量不一致的 5 大关键因素
  • 2025 年展览会服务商最新推荐榜权威发布:22 年经验甄选十强品牌,助力企业参展高效决策
  • 2025年信息流代运营服务商权威推荐榜单:专业投放策略与效果优化服务口碑之选
  • 2025 年焊把线厂家最新推荐榜:国标欧标铜芯软焊把线优质企业排行,优质品牌助力选购欧标/铜芯/软/耐高温焊把线厂家推荐
  • 基于MATLAB的倒立摆控制实现方案
  • 2025 年展会服务商最新推荐排行榜:聚焦一站式服务与高效执行能力的优质企业榜单瓷砖/暖通/照明/门窗/玻璃/厨卫/卫浴/灯饰展会厂家推荐
  • 数据迁移mysql--sr
  • iOS 26 App 开发阶段性能优化全流程,从监控到调优的多工具协作实践
  • MATLAB实现语音去混响与去噪
  • 风险评估的流程和各阶段的工作内容
  • 无穷小和无穷大
  • Adobe Media Encoder 2025 免费版一键安装包完整安装教程(含下载安装包)