Skip to main content

CAMEL 角色扮演自主合作代理

这是一篇论文的langchain实现:"CAMEL: Communicative Agents for “Mind” Exploration of Large Scale Language Model Society"。

概述:

会话和基于聊天的语言模型的快速发展已经在复杂任务解决方面取得了显著进展。然而,它们的成功在很大程度上依赖于人类输入来引导对话,这可能具有挑战性和耗时。本文探讨了构建可扩展技术以促进沟通代理之间的自主合作并洞察其“认知”过程的潜力。为了解决实现自主合作的挑战,我们提出了一种名为角色扮演的新型沟通代理框架。我们的方法涉及使用启示式提示来引导聊天代理完成任务,同时保持与人类意图的一致性。我们展示了角色扮演如何用于生成用于研究聊天代理的行为和能力的对话数据,为研究会话语言模型提供了宝贵的资源。我们的贡献包括引入一种新颖的沟通代理框架,提供一种可扩展的方法来研究多代理系统的合作行为和能力,并开源我们的库以支持沟通代理及其他领域的研究。

原始实现:https://github.com/lightaime/camel

项目网站:https://www.camel-ai.org/

Arxiv论文:https://arxiv.org/abs/2303.17760

from typing import List
from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import (
SystemMessagePromptTemplate,
HumanMessagePromptTemplate,
)
from langchain.schema import (
AIMessage,
HumanMessage,
SystemMessage,
BaseMessage,
)

定义一个CAMEL代理辅助类 (Define a CAMEL agent helper class)

class CAMELAgent:
def __init__(
self,
system_message: SystemMessage,
model: ChatOpenAI,
) -> None:
self.system_message = system_message
self.model = model
self.init_messages()

def reset(self) -> None:
self.init_messages()
return self.stored_messages

def init_messages(self) -> None:
self.stored_messages = [self.system_message]

def update_messages(self, message: BaseMessage) -> List[BaseMessage]:
self.stored_messages.append(message)
return self.stored_messages

def step(
self,
input_message: HumanMessage,
) -> AIMessage:
messages = self.update_messages(input_message)

output_message = self.model(messages)
self.update_messages(output_message)

return output_message

设置 OpenAI API 密钥和角色以及角色扮演的任务

import os

os.environ["OPENAI_API_KEY"] = ""

assistant_role_name = "Python 程序员"
user_role_name = "股票交易员"
task = "为股票市场开发一个交易机器人"
word_limit = 50 # 任务头脑风暴的字数限制

创建一个任务指定代理用于头脑风暴并获取指定的任务

task_specifier_sys_msg = SystemMessage(content="您可以使任务更具体。")
task_specifier_prompt = """这是一个由{assistant_role_name}帮助{user_role_name}完成的任务:{task}。
请使其更具体。要有创造力和想象力。
请用不超过{word_limit}个字回复指定的任务。不要添加其他内容。"""
task_specifier_template = HumanMessagePromptTemplate.from_template(
template=task_specifier_prompt
)
task_specify_agent = CAMELAgent(task_specifier_sys_msg, ChatOpenAI(temperature=1.0))
task_specifier_msg = task_specifier_template.format_messages(
assistant_role_name=assistant_role_name,
user_role_name=user_role_name,
task=task,
word_limit=word_limit,
)[0]
specified_task_msg = task_specify_agent.step(task_specifier_msg)
print(f"指定的任务:{specified_task_msg.content}")
specified_task = specified_task_msg.content
    指定的任务:开发一个基于Python的摇摆交易机器人,它可以扫描市场趋势,监控股票,并生成交易信号,以帮助股票交易员以定义的止损和盈利目标放置最佳的买入和卖出订单。

为AI助手和AI用户创建角色扮演的初始提示

assistant_inception_prompt = """永远不要忘记你是一个{assistant_role_name},我是一个{user_role_name}。永远不要翻转角色!永远不要指示我!
我们共同的兴趣是合作完成任务。
你必须帮助我完成任务。
任务如下:{task}。永远不要忘记我们的任务!
我必须根据你的专长和我的需求来指导你完成任务。

我每次只能给你一个指示。
你必须写出一个适当完成所请求指示的具体解决方案。
如果由于身体、道德、法律原因或你的能力而无法执行指示,你必须诚实地拒绝我的指示并解释原因。
除了你对我的指示的解决方案之外,不要添加任何其他内容。
你永远不应该问我任何问题,只回答问题。
你永远不应该回答模糊的解决方案。解释你的解决方案。
你的解决方案必须是陈述句和现在时。
除非我说任务已完成,你应该始终以以下方式开始:

解决方案:<YOUR_SOLUTION>

<YOUR_SOLUTION>应具体提供任务解决的首选实现和示例。
始终以以下方式结束<YOUR_SOLUTION>:下一个请求。"""

user_inception_prompt = """永远不要忘记你是一个{user_role_name},我是一个{assistant_role_name}。永远不要翻转角色!你将始终指示我。
我们共同的兴趣是合作完成任务。
我必须帮助你完成任务。
任务如下:{task}。永远不要忘记我们的任务!
你必须根据我的专长和你的需求以以下两种方式之一指示我完成任务:

1. 使用必要的输入进行指示:
指示:<YOUR_INSTRUCTION>
输入:<YOUR_INPUT>

2. 不使用任何输入进行指示:
指示:<YOUR_INSTRUCTION>
输入:无

"指示"描述了一个任务或问题。配对的"输入"为所请求的"指示"提供了进一步的上下文或信息。

你必须一次只给我一个指示。
我必须写出一个适当完成所请求指示的回应。
如果由于身体、道德、法律原因或我的能力而无法执行指示,我必须诚实地拒绝你的指示并解释原因。
你应该指示我而不是问我问题。
现在你必须开始使用上述两种方式之一指示我。
除了你的指示和可选的相应输入之外,不要添加任何其他内容!
继续给我指示和必要的输入,直到你认为任务已完成。
当任务完成时,你必须只回复一个单词<CAMEL_TASK_DONE>。
除非我的回应解决了你的任务,否则不要说<CAMEL_TASK_DONE>。"""

创建一个辅助函数来根据角色名称和任务获取AI助手和AI用户的系统消息 (Create a helper function to get system messages for AI assistant and AI user from role names and the task)

def get_sys_msgs(assistant_role_name: str, user_role_name: str, task: str):
assistant_sys_template = SystemMessagePromptTemplate.from_template(
template=assistant_inception_prompt
)
assistant_sys_msg = assistant_sys_template.format_messages(
assistant_role_name=assistant_role_name,
user_role_name=user_role_name,
task=task,
)[0]

user_sys_template = SystemMessagePromptTemplate.from_template(
template=user_inception_prompt
)
user_sys_msg = user_sys_template.format_messages(
assistant_role_name=assistant_role_name,
user_role_name=user_role_name,
task=task,
)[0]

return assistant_sys_msg, user_sys_msg

从获取的系统消息中创建AI助手代理和AI用户代理 (Create AI assistant agent and AI user agent from obtained system messages)

assistant_sys_msg, user_sys_msg = get_sys_msgs(
assistant_role_name, user_role_name, specified_task
)
assistant_agent = CAMELAgent(assistant_sys_msg, ChatOpenAI(temperature=0.2))
user_agent = CAMELAgent(user_sys_msg, ChatOpenAI(temperature=0.2))

# 重置代理
assistant_agent.reset()
user_agent.reset()

# 初始化对话
assistant_msg = HumanMessage(
content=(
f"{user_sys_msg.content}. "
"现在开始逐个给我介绍。"
"只回复指令和输入。"
)
)

user_msg = HumanMessage(content=f"{assistant_sys_msg.content}")
user_msg = assistant_agent.step(user_msg)

开始角色扮演会话以解决任务!

print(f"原始任务提示:\n{task}\n")
print(f"指定任务提示:\n{specified_task}\n")

chat_turn_limit, n = 30, 0
while n < chat_turn_limit:
n += 1
user_ai_msg = user_agent.step(assistant_msg)
user_msg = HumanMessage(content=user_ai_msg.content)
print(f"AI用户({user_role_name}):\n\n{user_msg.content}\n\n")

assistant_ai_msg = assistant_agent.step(user_msg)
assistant_msg = HumanMessage(content=assistant_ai_msg.content)
print(f"AI助手({assistant_role_name}):\n\n{assistant_msg.content}\n\n")
if "<CAMEL_TASK_DONE>" in user_msg.content:
break
    原始任务提示:
开发一个股票交易机器人

指定任务提示:
开发一个基于Python的摆动交易机器人,它可以扫描市场趋势,监控股票,并生成交易信号,帮助股票交易员以定义的止损和利润目标放置最佳的买入和卖出订单。

AI用户(股票交易员):

指令:安装数据分析和交易所需的Python库。
输入:无


AI助手(Python程序员):

解决方案:我们可以使用pip,一个Python的软件包安装器,来安装所需的Python库。我们可以使用以下命令来安装pandas、numpy、matplotlib和ta-lib这些库:

```
pip install pandas numpy matplotlib ta-lib
```

下一个请求。


AI用户(股票交易员):

指令:在Python脚本中导入所需的库。
输入:无


AI助手(Python程序员):

解决方案:我们可以使用import语句在Python脚本中导入所需的库。我们需要导入pandas、numpy、matplotlib和ta-lib这些库进行数据分析和交易。我们可以使用以下代码来导入这些库:

```
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import talib as ta
```

下一个请求。


AI用户(股票交易员):

指令:将历史股票数据加载到pandas DataFrame中。
输入:包含历史股票数据的CSV文件的路径。


AI助手(Python程序员):

解决方案:我们可以使用pandas的`read_csv()`函数将历史股票数据加载到pandas DataFrame中。我们需要将包含历史股票数据的CSV文件的路径作为参数传递给这个函数。我们可以使用以下代码来加载历史股票数据:

```
df = pd.read_csv('path/to/csv/file.csv')
```

这将把历史股票数据加载到名为`df`的pandas DataFrame中。下一个请求。


AI用户(股票交易员):

指令:通过将日期列设置为索引并按日期升序排序来预处理历史股票数据。
输入:无。


AI助手(Python程序员):

解决方案:我们可以使用pandas的`set_index()`和`sort_index()`函数将日期列设置为索引并按日期升序排序来预处理历史股票数据。我们可以使用以下代码来预处理历史股票数据:

```
df = df.set_index('date')
df = df.sort_index(ascending=True)
```

这将把日期列设置为索引并按日期升序排序DataFrame。下一个请求。


AI用户(股票交易员):

指令:使用`ta.SMA()`函数计算股票数据的短期和长期移动平均线。
输入:短期移动平均线的周期和长期移动平均线的周期。


AI助手(Python程序员):

解决方案:我们可以使用`ta.SMA()`函数从ta-lib计算股票数据的短期和长期移动平均线。我们需要将短期移动平均线的周期和长期移动平均线的周期作为参数传递给这个函数。我们可以使用以下代码来计算短期和长期移动平均线:

```
short_ma = ta.SMA(df['close'], timeperiod=short_period)
long_ma = ta.SMA(df['close'], timeperiod=long_period)
```

这将计算股票数据的短期和长期移动平均线,并分别存储在`short_ma`和`long_ma`变量中。下一个请求。


AI用户(股票交易员):

指令:创建一个新的DataFrame,将历史股票数据与短期和长期移动平均线合并。
输入:无。


AI助手(Python程序员):

解决方案:我们可以使用pandas的`concat()`函数创建一个新的DataFrame,将历史股票数据与短期和长期移动平均线合并。我们需要将历史股票数据、短期移动平均线和长期移动平均线作为参数传递给这个函数。我们可以使用以下代码来创建新的DataFrame:

```
new_df = pd.concat([df, short_ma, long_ma], axis=1)
new_df.columns = ['open', 'high', 'low', 'close', 'volume', 'short_ma', 'long_ma']
```

这将创建一个名为`new_df`的新DataFrame,它将历史股票数据与短期和长期移动平均线合并。新DataFrame的列名分别为'open'、'high'、'low'、'close'、'volume'、'short_ma'和'long_ma'。下一个请求。


AI用户(股票交易员):

指令:在DataFrame中创建一个新的列,根据短期和长期移动平均线的交叉点指示何时买入或卖出股票。
输入:无。


AI助手(Python程序员):

解决方案:我们可以在DataFrame中创建一个新的列,根据短期和长期移动平均线的交叉点指示何时买入或卖出股票。我们可以使用以下代码来创建新的列:

```
new_df['signal'] = np.where(new_df['short_ma'] > new_df['long_ma'], 1, -1)
```

这将在DataFrame中创建一个名为'signal'的新列,根据短期和长期移动平均线的交叉点指示何时买入或卖出股票。如果短期移动平均线大于长期移动平均线,则信号为1(买入),否则信号为-1(卖出)。下一个请求。


AI用户(股票交易员):

指令:在DataFrame中创建一个新的列,根据买入和卖出信号以及定义的止损和利润目标计算每笔交易的利润或损失。
输入:止损和利润目标的百分比。


AI助手(Python程序员):

解决方案:我们可以在DataFrame中创建一个新的列,根据买入和卖出信号以及定义的止损和利润目标计算每笔交易的利润或损失。我们需要将止损和利润目标作为百分比作为参数传递给这个函数。我们可以使用以下代码来创建新的列:

```
stop_loss = stop_loss_percent / 100
profit_target = profit_target_percent / 100

new_df['pnl'] = 0.0
buy_price = 0.0
for i in range(1, len(new_df)):
if new_df['signal'][i] == 1 and new_df['signal'][i-1] == -1:
buy_price = new_df['close'][i]
elif new_df['signal'][i] == -1 and new_df['signal'][i-1] == 1:
sell_price = new_df['close'][i]
if sell_price <= buy_price * (1 - stop_loss):
new_df['pnl'][i] = -stop_loss
elif sell_price >= buy_price * (1 + profit_target):
new_df['pnl'][i] = profit_target
else:
new_df['pnl'][i] = (sell_price - buy_price) / buy_price
```

这将在DataFrame中创建一个名为'pnl'的新列,根据买入和卖出信号以及定义的止损和利润目标计算每笔交易的利润或损失。止损和利润目标是基于stop_loss_percent和profit_target_percent变量计算的。买入和卖出价格分别存储在buy_price和sell_price变量中。如果卖出价格小于或等于止损价格,则利润或损失设置为-stop_loss。如果卖出价格大于或等于利润目标价格,则利润或损失设置为profit_target。否则,利润或损失计算为(sell_price - buy_price) / buy_price。下一个请求。


AI用户(股票交易员):

指令:计算所有交易的总利润或损失。
输入:无。


AI助手(Python程序员):

解决方案:我们可以通过对DataFrame的'pnl'列中的值求和来计算所有交易的总利润或损失。我们可以使用以下代码来计算总利润或损失:

```
total_pnl = new_df['pnl'].sum()
```

这将计算所有交易的总利润或损失,并将其存储在total_pnl变量中。下一个请求。


AI用户(股票交易员):

指令:使用线图可视化股票数据、短期移动平均线和长期移动平均线。
输入:无。


AI助手(Python程序员):

解决方案:我们可以使用pandas的`plot()`函数使用线图可视化股票数据、短期移动平均线和长期移动平均线。我们可以使用以下代码来可视化数据:

```
plt.figure(figsize=(12,6))
plt.plot(new_df.index, new_df['close'], label='Close')
plt.plot(new_df.index, new_df['short_ma'], label='Short MA')
plt.plot(new_df.index, new_df['long_ma'], label='Long MA')
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('Stock Data with Moving Averages')
plt.legend()
plt.show()
```

这将创建一个线图,可视化股票数据、短期移动平均线和长期移动平均线。x轴表示日期,y轴表示价格。图表还包括一个标签,标记每条线。下一个请求。


AI用户(股票交易员):

指令:使用散点图可视化买入和卖出信号。
输入:无。


AI助手(Python程序员):

解决方案:我们可以使用matplotlib的`scatter()`函数使用散点图可视化买入和卖出信号。我们可以使用以下代码来可视化信号:

```
buy_signals = new_df[new_df['signal'] == 1]
sell_signals = new_df[new_df['signal'] == -1]

plt.figure(figsize=(12,6))
plt.scatter(buy_signals.index, buy_signals['close'], label='Buy', marker='^', color='green')
plt.scatter(sell_signals.index, sell_signals['close'], label='Sell', marker='v', color='red')
plt.plot(new_df.index, new_df['close'], label='Close')
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('Buy and Sell Signals')
plt.legend()
plt.show()
```

这将创建一个散点图,可视化买入和卖出信号。x轴表示日期,y轴表示价格。图表还包括一个标签,标记每个点。下一个请求。


AI用户(股票交易员):

指令:打印所有交易的总利润或损失。
输入:无。


AI助手(Python程序员):

解决方案:我们可以使用`print()`函数打印所有交易的总利润或损失。我们可以使用以下代码来打印总利润或损失:

```
print('总利润/损失:{:.2%}'.format(total_pnl))
```

这将以百分比格式打印所有交易的总利润或损失,保留两位小数。下一个请求。


AI用户(股票交易员):

指令:通过将所有代码片段放在一个单独的Python脚本中来完成代码。
输入:每个代码片段的必要输入。


AI助手(Python程序员):

解决方案:这是将所有代码片段放在一起的完整Python脚本:

```
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import talib as ta

# 将历史股票数据加载到pandas DataFrame中
df = pd.read_csv('path/to/csv/file.csv')

# 预处理历史股票数据
df = df.set_index('date')
df = df.sort_index(ascending=True)

# 计算短期和长期移动平均线
short_period = 10
long_period = 30
short_ma = ta.SMA(df['close'], timeperiod=short_period)
long_ma = ta.SMA(df['close'], timeperiod=long_period)

# 创建一个新的DataFrame,将历史股票数据与短期和长期移动平均线合并
new_df = pd.concat([df, short_ma, long_ma], axis=1)
new_df.columns = ['open', 'high', 'low', 'close', 'volume', 'short_ma', 'long_ma']

# 在DataFrame中创建一个新的列,根据短期和长期移动平均线的交叉点指示何时买入或卖出股票
new_df['signal'] = np.where(new_df['short_ma'] > new_df['long_ma'], 1, -1)

# 在DataFrame中创建一个新的列,根据买入和卖出信号以及定义的止损和利润目标计算每笔交易的利润或损失
stop_loss_percent = 5
profit_target_percent = 10
stop_loss = stop_loss_percent / 100
profit_target = profit_target_percent / 100
new_df['pnl'] = 0.0
buy_price = 0.0
for i in range(1, len(new_df)):
if new_df['signal'][i] == 1 and new_df['signal'][i-1] == -1:
buy_price = new_df['close'][i]
elif new_df['signal'][i] == -1 and new_df['signal'][i-1] == 1:
sell_price = new_df['close'][i]
if sell_price <= buy_price * (1 - stop_loss):
new_df['pnl'][i] = -stop_loss
elif sell_price >= buy_price * (1 + profit_target):
new_df['pnl'][i] = profit_target
else:
new_df['pnl'][i] = (sell_price - buy_price) / buy_price

# 计算所有交易的总利润或损失
total_pnl = new_df['pnl'].sum()

# 使用线图可视化股票数据、短期移动平均线和长期移动平均线
plt.figure(figsize=(12,6))
plt.plot(new_df.index, new_df['close'], label='Close')
plt.plot(new_df.index, new_df['short_ma'], label='Short MA')
plt.plot(new_df.index, new_df['long_ma'], label='Long MA')
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('Stock Data with Moving Averages')
plt.legend()
plt.show()

# 使用散点图可视化买入和卖出信号
buy_signals = new_df[new_df['signal'] == 1]
sell_signals = new_df[new_df['signal'] == -1]
plt.figure(figsize=(12,6))
plt.scatter(buy_signals.index, buy_signals['close'], label='Buy', marker='^', color='green')
plt.scatter(sell_signals.index, sell_signals['close'], label='Sell', marker='v', color='red')
plt.plot(new_df.index, new_df['close'], label='Close')
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('Buy and Sell Signals')
plt.legend()
plt.show()

# 打印所有交易的总利润或损失
print('总利润/损失:{:.2%}'.format(total_pnl))
```

您需要将path/to/csv/file.csv替换为包含历史股票数据的CSV文件的实际路径。您还可以根据需要调整short_period、long_period、stop_loss_percent和profit_target_percent变量。


AI用户(股票交易员):

<CAMEL_TASK_DONE>


AI助手(Python程序员):

太棒了!如果您需要进一步的帮助,请告诉我。