Skip to main content

Airbyte CDK

Airbyte是一个用于从API、数据库和文件到数据仓库和数据湖的ELT管道的数据集成平台。它拥有最大的ELT连接器目录,可连接到数据仓库和数据库。

许多源连接器都是使用Airbyte CDK实现的。该加载器允许运行任何这些连接器并将数据返回为文档。

安装 (Installation)

首先,您需要安装airbyte-cdk Python包。

#!pip install airbyte-cdk

然后,要么从Airbyte Github存储库安装现有连接器,要么使用Airbyte CDK创建自己的连接器。

例如,要安装Github连接器,请运行:

#!pip install "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"

某些源还以常规包的形式发布在PyPI上。

示例 (Example)

现在,您可以基于导入的源创建一个AirbyteCDKLoader。它接受一个传递给连接器的config对象。您还必须通过名称选择要从中检索记录的流(stream_name)。有关配置对象和可用流的更多信息,请查看连接器的文档页面和规范定义。对于Github连接器,它们分别是:

from langchain.document_loaders.airbyte import AirbyteCDKLoader
from source_github.source import SourceGithub # 在此处插入您自己的源

config = {
# 您的Github配置
"credentials": {
"api_url": "api.github.com",
"personal_access_token": "<token>"
},
"repository": "<repo>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>"
}

issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name="issues")

现在,您可以以通常的方式加载文档。

docs = issues_loader.load()

由于load返回一个列表,它将阻塞直到所有文档加载完成。为了更好地控制这个过程,您还可以使用lazy_load方法,它返回一个迭代器。

docs_iterator = issues_loader.lazy_load()

请记住,默认情况下,页面内容为空,元数据对象包含记录的所有信息。要以不同的方式创建文档,请在创建加载器时传入record_handler函数。

from langchain.docstore.document import Document

def handle_record(record, id):
return Document(page_content=record.data["title"] + "\n" + (record.data["body"] or ""), metadata=record.data)

issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name="issues", record_handler=handle_record)

docs = issues_loader.load()

增量加载 (Incremental loads)

某些流允许增量加载,这意味着源会跟踪已同步的记录,并且不会再次加载它们。这对于具有大量数据且经常更新的源非常有用。

要利用此功能,请存储加载器的last_state属性,并在再次创建加载器时传递它。这将确保只加载新记录。

last_state = issues_loader.last_state # 安全存储

incremental_issue_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name="issues", state=last_state)

new_docs = incremental_issue_loader.load()