浅读deer-flow源代码,梳理流程
浅读deer-flow源代码,梳理流程
只是浅显的debug,读了一下,文内可能有不对的地方,请多包涵。
调试时候注意几点:
- token消耗量巨大,一下午消耗1m的token。迭代次数调低,完整跑一次估计要400k~500k。
- 已经试过qwen-max不行,deepseek-chat不行。只有doubao可以。
basic
构建的图位置
1 | def _build_base_graph(): |
下面逐个分析
coordinator_node
提交问题之后,就会到这个节点。
1 | def coordinator_node( |
background_investigation
1 | def background_investigation_node(state: State, config: RunnableConfig): |
在构建图的时候,有一段代码
1 | builder.add_edge("background_investigator", "planner") |
所以下一个是planner节点
planner_node
这个planner的markdown写的是真全,太长了就不贴上来了。链接地址
1 | def planner_node( |
这是response的输出结果
1 | { |
上面最后的goto到human_feedback
先看human_feedback
human_feedback
1 | def human_feedback_node( |
代码比较简单,如果不需要人工干预,就是修复一下json然后返回。
当前goto是research_team
research_team
1 | def research_team_node(state: State): |
在构建图的时候有一步,决定research_team下一步去哪
1 | builder.add_conditional_edges( |
下一步会进入researcher
researcher_node
调用搜索函数,搜索,我这里会调用Tavily
1 | async def researcher_node( |
_setup_and_execute_agent_step 会判断有没有mcp,如果没有mcp,会调用search函数,然后传入researcher的prompt
1 | if mcp_servers: |
_execute_agent_step函数里面,这里面就会让agent自动调用对应的函数执行,核心代码就是最后的ainvoke
1 | async def _execute_agent_step( |
然后回到research_team,然后会判断下一个step是什么类型,如果还是research,就会再次循环,直到完成。
1 | def continue_to_running_research_team(state: State): |
所有结果都执行完之后会回到planner
搜索完结果之后回到planner
由于max_plan_iterations为1,所以只迭代一次就返回了,尽管实际结果中has_enough_context还是false。
1 | def planner_node( |
reporter
这一步就是生成结果了,代码很简单,精华都在reporter.md里面,链接
1 | def reporter_node(state: State, config: RunnableConfig): |
总结
coordinator接受用户输入,让llm判断需不需要分发请求,还是直接返回。如果要分发请求,下一步会到background_investigator。
background_investigator 会调用搜索引擎完成基本搜索。下一步会交给planer。
planer会对上面的基本搜索结果评估,然后生成计划。生成什么样的计划,有哪些关键点,全都写在prompt里面。
planner生成多个step之后,调用human_feedback,当前是不反馈,所以继续下一步。
research_team,负责执行上面planner的计划。当前的计划是搜索,所以research_team就会调用researcher开始搜索,搜索每一步step。
搜索完之后会回到planner,planner检查结果,如果ok,就会交给reporter生成结果。
reporter生成结果也是采用prompt控制的,交给llm生成。
这是langgraph生成的流程图,可以帮助理解: