Airbyte Stripe
Airbyte 是一个用于从API、数据库和文件到数据仓库和数据湖的ELT管道的数据集成平台。它拥有最大的ELT连接器目录,可连接到数据仓库和数据库。
此加载器将Stripe连接器公开为文档加载器,允许您将各种Stripe对象加载为文档。
安装 (Installation)
首先,您需要安装 airbyte-source-stripe
Python 包。
#!pip install airbyte-source-stripe
示例 (Example)
请查看Airbyte文档页面以获取有关如何配置读取器的详细信息。 配置对象应遵循的JSON模式可以在Github上找到:https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml。
一般的形状如下所示:
{
"client_secret": "<secret key>",
"account_id": "<account id>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}
默认情况下,所有字段都存储为文档中的元数据,并且文本设置为空字符串。通过转换读取器返回的文档来构建文档的文本。
from langchain.document_loaders.airbyte import AirbyteStripeLoader
config = {
# your stripe configuration
}
loader = AirbyteStripeLoader(config=config, stream_name="invoices") # check the documentation linked above for a list of all streams
现在,您可以按照通常的方式加载文档。
docs = loader.load()
由于load
返回一个列表,它将阻塞直到所有文档加载完成。为了更好地控制此过程,您还可以使用lazy_load
方法,它返回一个迭代器:
docs_iterator = loader.lazy_load()
请记住,默认情况下,页面内容为空,元数据对象包含记录的所有信息。要以不同的方式创建文档,请在创建加载器时传入record_handler
函数:
from langchain.docstore.document import Document
def handle_record(record, id):
return Document(page_content=record.data["title"], metadata=record.data)
loader = AirbyteStripeLoader(config=config, record_handler=handle_record, stream_name="invoices")
docs = loader.load()
增量加载 (Incremental loads)
某些流允许增量加载,这意味着源会跟踪已同步的记录,并且不会再次加载它们。这对于具有大量数据且经常更新的源非常有用。
要利用此功能,请存储加载器的last_state
属性,并在再次创建加载器时传入它。这将确保仅加载新记录。
last_state = loader.last_state # store safely
incremental_loader = AirbyteStripeLoader(config=config, record_handler=handle_record, stream_name="invoices", state=last_state)
new_docs = incremental_loader.load()