Skip to main content

使用工具的BabyAGI

本笔记本在baby agi的基础上进行了扩展,展示了如何更换执行链。先前的执行链只是一个随意编造内容的LLM模型。通过将其替换为具有访问工具的代理,我们有望获得真实可靠的信息。

安装和导入所需模块 (Install and Import Required Modules)

import os
from collections import deque
from typing import Dict, List, Optional, Any

from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from pydantic import BaseModel, Field
from langchain.chains.base import Chain

连接到向量存储 (Connect to the Vector Store)

根据您使用的向量存储不同,此步骤可能会有所不同。

from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore
# 定义您的嵌入模型
embeddings_model = OpenAIEmbeddings()
# 将向量存储初始化为空
import faiss

embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})

定义链

BabyAGI依赖于三个LLM链:

  • 任务创建链,用于选择要添加到列表中的新任务
  • 任务优先级链,用于重新设置任务的优先级
  • 执行链,用于执行任务

注意:在这个笔记本中,执行链现在将成为一个代理。

class TaskCreationChain(LLMChain):
"""生成任务的链。"""

@classmethod
def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
"""获取响应解析器。"""
task_creation_template = (
"你是一个任务创建AI,使用执行代理的结果来创建具有以下目标的新任务:{objective},"
"最后完成的任务的结果是:{result}。"
"这个结果是基于这个任务描述的:{task_description}。"
"这些是未完成的任务:{incomplete_tasks}。"
"根据结果,创建新的任务,让AI系统完成,这些任务不与未完成的任务重叠。"
"将任务作为数组返回。"
)
prompt = PromptTemplate(
template=task_creation_template,
input_variables=[
"result",
"task_description",
"incomplete_tasks",
"objective",
],
)
return cls(prompt=prompt, llm=llm, verbose=verbose)
class TaskPrioritizationChain(LLMChain):
"""优先级任务链。"""

@classmethod
def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
"""获取响应解析器。"""
task_prioritization_template = (
"你是一个任务优先级AI,负责清理格式并重新设置以下任务的优先级:{task_names}。"
"考虑你团队的最终目标:{objective}。"
"不要删除任何任务。将结果作为一个编号列表返回,例如:"
"#. 第一个任务"
"#. 第二个任务"
"以数字 {next_task_id} 开始任务列表。"
)
prompt = PromptTemplate(
template=task_prioritization_template,
input_variables=["task_names", "next_task_id", "objective"],
)
return cls(prompt=prompt, llm=llm, verbose=verbose)
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain import OpenAI, SerpAPIWrapper, LLMChain

todo_prompt = PromptTemplate.from_template(
"你是一个计划者,擅长为给定的目标制定待办事项清单。为这个目标制定一个待办事项清单:{objective}"
)
todo_chain = LLMChain(llm=OpenAI(temperature=0), prompt=todo_prompt)
search = SerpAPIWrapper()
tools = [
Tool(
name="搜索",
func=search.run,
description="当你需要回答关于当前事件的问题时很有用",
),
Tool(
name="待办事项",
func=todo_chain.run,
description="当你需要制定待办事项清单时很有用。输入:一个目标,用于创建待办事项清单。输出:该目标的待办事项清单。请非常清楚地说明目标是什么!",
),
]


prefix = """你是一个AI,根据以下目标执行一个任务:{objective}。考虑这些先前完成的任务:{context}。"""
suffix = """问题:{task}
{agent_scratchpad}"""
prompt = ZeroShotAgent.create_prompt(
tools,
prefix=prefix,
suffix=suffix,
input_variables=["objective", "task", "context", "agent_scratchpad"],
)

定义BabyAGI控制器 (Define the BabyAGI Controller)

BabyAGI在上述定义的链中以(可能是)无限循环的方式组合。

def get_next_task(
task_creation_chain: LLMChain,
result: Dict,
task_description: str,
task_list: List[str],
objective: str,
) -> List[Dict]:
"""获取下一个任务。"""
incomplete_tasks = ", ".join(task_list)
response = task_creation_chain.run(
result=result,
task_description=task_description,
incomplete_tasks=incomplete_tasks,
objective=objective,
)
new_tasks = response.split("\n")
return [{"task_name": task_name} for task_name in new_tasks if task_name.strip()]
def prioritize_tasks(
task_prioritization_chain: LLMChain,
this_task_id: int,
task_list: List[Dict],
objective: str,
) -> List[Dict]:
"""优先处理任务。"""
task_names = [t["task_name"] for t in task_list]
next_task_id = int(this_task_id) + 1
response = task_prioritization_chain.run(
task_names=task_names, next_task_id=next_task_id, objective=objective
)
new_tasks = response.split("\n")
prioritized_task_list = []
for task_string in new_tasks:
if not task_string.strip():
continue
task_parts = task_string.strip().split(".", 1)
if len(task_parts) == 2:
task_id = task_parts[0].strip()
task_name = task_parts[1].strip()
prioritized_task_list.append({"task_id": task_id, "task_name": task_name})
return prioritized_task_list
def _get_top_tasks(vectorstore, query: str, k: int) -> List[str]:
"""基于查询获取前k个任务。"""
results = vectorstore.similarity_search_with_score(query, k=k)
if not results:
return []
sorted_results, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True))
return [str(item.metadata["task"]) for item in sorted_results]


def execute_task(
vectorstore, execution_chain: LLMChain, objective: str, task: str, k: int = 5
) -> str:
"""执行任务。"""
context = _get_top_tasks(vectorstore, query=objective, k=k)
return execution_chain.run(objective=objective, context=context, task=task)
class BabyAGI(Chain, BaseModel):
"""BabyAGI代理的控制器模型。"""

task_list: deque = Field(default_factory=deque)
task_creation_chain: TaskCreationChain = Field(...)
task_prioritization_chain: TaskPrioritizationChain = Field(...)
execution_chain: AgentExecutor = Field(...)
task_id_counter: int = Field(1)
vectorstore: VectorStore = Field(init=False)
max_iterations: Optional[int] = None

class Config:
"""此pydantic对象的配置。"""

arbitrary_types_allowed = True

def add_task(self, task: Dict):
self.task_list.append(task)

def print_task_list(self):
print("\033[95m\033[1m" + "\n*****任务列表*****\n" + "\033[0m\033[0m")
for t in self.task_list:
print(str(t["task_id"]) + ": " + t["task_name"])

def print_next_task(self, task: Dict):
print("\033[92m\033[1m" + "\n*****下一个任务*****\n" + "\033[0m\033[0m")
print(str(task["task_id"]) + ": " + task["task_name"])

def print_task_result(self, result: str):
print("\033[93m\033[1m" + "\n*****任务结果*****\n" + "\033[0m\033[0m")
print(result)

@property
def input_keys(self) -> List[str]:
return ["objective"]

@property
def output_keys(self) -> List[str]:
return []

def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""运行代理。"""
objective = inputs["objective"]
first_task = inputs.get("first_task", "制作待办事项清单")
self.add_task({"task_id": 1, "task_name": first_task})
num_iters = 0
while True:
if self.task_list:
self.print_task_list()

# 步骤1:获取第一个任务
task = self.task_list.popleft()
self.print_next_task(task)

# 步骤2:执行任务
result = execute_task(
self.vectorstore, self.execution_chain, objective, task["task_name"]
)
this_task_id = int(task["task_id"])
self.print_task_result(result)

# 步骤3:将结果存储在Pinecone中
result_id = f"result_{task['task_id']}_{num_iters}"
self.vectorstore.add_texts(
texts=[result],
metadatas=[{"task": task["task_name"]}],
ids=[result_id],
)

# 步骤4:创建新任务并重新优先处理任务列表
new_tasks = get_next_task(
self.task_creation_chain,
result,
task["task_name"],
[t["task_name"] for t in self.task_list],
objective,
)
for new_task in new_tasks:
self.task_id_counter += 1
new_task.update({"task_id": self.task_id_counter})
self.add_task(new_task)
self.task_list = deque(
prioritize_tasks(
self.task_prioritization_chain,
this_task_id,
list(self.task_list),
objective,
)
)
num_iters += 1
if self.max_iterations is not None and num_iters == self.max_iterations:
print(
"\033[91m\033[1m" + "\n*****任务结束*****\n" + "\033[0m\033[0m"
)
break
return {}

@classmethod
def from_llm(
cls, llm: BaseLLM, vectorstore: VectorStore, verbose: bool = False, **kwargs
) -> "BabyAGI":
"""初始化BabyAGI控制器。"""
task_creation_chain = TaskCreationChain.from_llm(llm, verbose=verbose)
task_prioritization_chain = TaskPrioritizationChain.from_llm(
llm, verbose=verbose
)
llm_chain = LLMChain(llm=llm, prompt=prompt)
tool_names = [tool.name for tool in tools]
agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
agent_executor = AgentExecutor.from_agent_and_tools(
agent=agent, tools=tools, verbose=True
)
return cls(
task_creation_chain=task_creation_chain,
task_prioritization_chain=task_prioritization_chain,
execution_chain=agent_executor,
vectorstore=vectorstore,
**kwargs,
)

运行BabyAGI

现在是时候创建BabyAGI控制器并观察它尝试完成您的目标了。

OBJECTIVE = "编写今天的旧金山天气报告"
llm = OpenAI(temperature=0)
# LLMChains的日志记录
verbose = False
# 如果为None,将一直进行下去
max_iterations: Optional[int] = 3
baby_agi = BabyAGI.from_llm(
llm=llm, vectorstore=vectorstore, verbose=verbose, max_iterations=max_iterations
)
baby_agi({"objective": OBJECTIVE})
    
*****任务列表*****

1: 制作待办事项清单

*****下一个任务*****

1: 制作待办事项清单


> 进入新的AgentExecutor链...
思考:我需要收集旧金山当前的天气状况数据
行动:搜索
行动输入:旧金山当前的天气状况
观察:高温67华氏度。西北偏西风,10到15英里/小时。晴朗到多云。
思考:我需要制作一个待办事项清单
行动:待办事项
行动输入:编写今天的旧金山天气报告
观察:

1. 研究旧金山当前的天气状况
2. 收集温度、湿度、风速和其他相关天气状况的数据
3. 分析数据以确定当前的天气趋势
4. 编写天气报告的简要介绍
5. 描述旧金山当前的天气状况
6. 讨论即将发生的天气变化
7. 总结天气报告
8. 校对和编辑报告
9. 提交报告
思考:我现在知道最终答案了
最终答案:今天的旧金山天气报告应包括研究旧金山当前的天气状况,收集温度、湿度、风速和其他相关天气状况的数据,分析数据以确定当前的天气趋势,编写天气报告的简要介绍,描述旧金山当前的天气状况,讨论即将发生的天气变化,总结天气报告,校对和编辑报告,并提交报告。

> 完成链。

*****任务结果*****

今天的旧金山天气报告应包括研究旧金山当前的天气状况,收集温度、湿度、风速和其他相关天气状况的数据,分析数据以确定当前的天气趋势,编写天气报告的简要介绍,描述旧金山当前的天气状况,讨论即将发生的天气变化,总结天气报告,校对和编辑报告,并提交报告。

*****任务列表*****

2: 收集温度、湿度、风速和其他相关天气状况的数据
3: 分析数据以确定当前的天气趋势
4: 编写天气报告的简要介绍
5: 描述旧金山当前的天气状况
6: 讨论即将发生的天气变化
7: 总结天气报告
8: 校对和编辑报告
9: 提交报告
1: 研究旧金山当前的天气状况

*****下一个任务*****

2: 收集温度、湿度、风速和其他相关天气状况的数据


> 进入新的AgentExecutor链...
思考:我需要搜索旧金山当前的天气状况
行动:搜索
行动输入:旧金山当前的天气状况
观察:高温67华氏度。西北偏西风,10到15英里/小时。晴朗到多云。
思考:我需要制作一个待办事项清单
行动:待办事项
行动输入:创建今天的旧金山天气报告
观察:

1. 收集旧金山的当前天气数据,包括温度、风速、湿度和降水量。
2. 研究旧金山的历史天气数据,以比较当前的天气状况。
3. 分析当前和历史数据,以确定任何趋势或模式。
4. 创建数据的可视化表示,如图表或图形。
5. 编写天气报告的摘要,包括关键发现和任何相关信息。
6. 在网站或其他平台上发布天气报告。
思考:我现在知道最终答案了
最终答案:今天在旧金山,温度为67华氏度,风向为西北偏西,风速为10到15英里/小时。天空晴朗到多云。

> 完成链。

*****任务结果*****

今天在旧金山,温度为67华氏度,风向为西北偏西,风速为10到15英里/小时。天空晴朗到多云。

*****任务列表*****

3: 研究旧金山当前的天气状况
4: 将旧金山当前的天气状况与本年同期的平均值进行比较。
5: 确定该地区可能存在的任何与天气相关的危险。
6: 研究旧金山的任何历史天气模式。
7: 分析数据以确定当前的天气趋势
8: 在报告中包含来自附近城市的任何相关数据。
9: 在报告中包含来自国家气象局的任何相关数据。
10: 在报告中包含来自当地新闻来源的任何相关数据。
11: 在报告中包含来自在线天气来源的任何相关数据。
12: 在报告中包含来自当地气象学家的任何相关数据。
13: 在报告中包含来自当地气象站的任何相关数据。
14: 在报告中包含来自卫星图像的任何相关数据。
15: 描述旧金山当前的天气状况
16: 讨论即将发生的天气变化
17: 编写天气报告的简要介绍
18: 总结天气报告
19: 校对和编辑报告
20: 提交报告

*****下一个任务*****

3: 研究旧金山当前的天气状况


> 进入新的AgentExecutor链...
思考:我需要搜索旧金山当前的天气状况
行动:搜索
行动输入:旧金山当前的天气状况
观察:今天星期日04/09 高温67 · 1% 降水;今晚星期日04/09 低温49 · 9% 降水;明天星期一04/10 高温64 · 11% 降水。
思考:我现在知道最终答案了
最终答案:今天在旧金山,高温为67华氏度,降水概率为1%。今晚的低温为49华氏度,降水概率为9%。明天的高温为64华氏度,降水概率为11%。

> 完成链。

*****任务结果*****

今天在旧金山,高温为67华氏度,降水概率为1%。今晚的低温为49华氏度,降水概率为9%。明天的高温为64华氏度,降水概率为11%。

*****任务结束*****






{'objective': '编写今天的旧金山天气报告'}