Upsertion
更新插入
Upsert指的是将文档上传并处理到向量存储中的过程,这是检索增强生成(RAG)系统的基础。
将数据更新到向量存储中有两种基本方法:
Chatflow 更新
我们强烈推荐使用文档存储,因为它提供了一个统一的接口,有助于RAG(检索增强生成)流程——从不同来源检索数据、分块策略、向向量数据库更新插入(upserting)、与更新数据同步。
在本指南中,我们将介绍另一种方法——Chatflow Upsert。这是文档存储出现之前的一种较旧的方法。
有关详细信息,请参阅向量更新端点API参考。
了解更新插入(upserting)过程
Chatflow允许您创建一个流程,该流程既可以执行更新操作,也可以执行RAG查询过程,两者都可以独立运行。

设置
为了使更新插入(upsert)流程正常工作,我们需要创建一个包含5个不同节点的更新插入流:
文档加载器
文本分割器
嵌入模型
矢量存储
记录管理器(可选)
所有元素已在文档存储中介绍,更多详情请参阅该文档。
一旦流程设置正确,右上角就会出现一个绿色按钮,用户可以通过该按钮启动更新流程。


更新插入(upsert)过程也可以通过API执行:

基本URL和认证
基础URL:http://localhost:3000(或您的Flowise实例URL)
端点:POST /api/v1/vector/upsert/:id
身份验证:请参阅流程的身份验证
请求方法
根据您的聊天流程配置,API支持两种不同的请求方法:
1. 表单数据(文件上传)
当您的聊天流程包含具有文件上传功能的文档加载器时使用。
2. JSON 正文(无文件上传)
当您的聊天流程使用不需要文件上传的文档加载器(例如网页抓取器、数据库连接器)时使用。
{% 提示样式="警告" %}
要覆盖任何节点配置(如文件、元数据等),您必须明确启用该选项。

带有文件上传功能的文档加载器
支持的文档类型
CSV文件
.csv
Docx/Word文件
.docx
JSON文件
.json
JSON Lines文件
.jsonl
PDF文件
.pdf
文本文件
.txt
Excel文件
.xlsx
Powerpoint文件
.pptx
文件加载器
多个
非结构化文件
多个
提示框(信息提示) 重要提示:请确保文件类型与您的文档加载器配置相匹配。为了获得最大的灵活性,请考虑使用支持多种文件类型的文件加载器。
请求格式(表单数据)
上传文件时,请使用multipart/form-data而不是JSON:
示例
标签页
python 导入requests库 导入操作系统
def upsert_document(chatflow_id, file_path, config=None): """ 将单个文档更新到向量存储中。
参数:
chatflow_id (str): 为向量更新配置的聊天流程ID
file_path (str): 要上传文件的路径
return_source_docs (bool): 是否在响应中返回源文档
config (dict): 可选的配置覆盖项
返回:
dict:包含更新结果的API响应
"""
url = f"http://localhost:3000/api/v1/vector/upsert/{chatflow_id}"
# 准备文件数据
文件 = {
'files': (os.path.basename(file_path), open(file_path, 'rb'))
}
# 准备表单数据
数据 = {}
# 如果提供了配置覆盖,请添加
如果配置(config)存在:
data['overrideConfig'] = str(config).replace("'", '"') # 转换为JSON字符串
尝试:
响应 = requests.post(url, files=files, data=data)
响应.raise_for_status()
返回响应的JSON格式数据
除了requests.exceptions.RequestException异常(用e表示)之外:
打印(f"上传失败:{e}")返回空值 最后: # 始终关闭文件 files['files'][1].close()
使用示例
结果 = 更新文档( chatflow_id="你的聊天流程ID",, 文件路径="documents/knowledge_base.pdf", 配置 = { "chunkSize": 1000,, "chunkOverlap": 200 } )
如果结果为: print(f"已成功更新插入 {result.get('numAdded', 0)} 个数据块") 如果 result.get('sourceDocuments'): 打印(f"源文档数量:{len(result['sourceDocuments'])}") 否则: 打印("上传失败")
{% endtab %}
{% tab title="Javascript(浏览器)" %}
```javascript
/// 译文内容:
---
根据上面的信息,执行如下指令:
缺失译文,请检查输入
类 VectorUploader {
构造函数(baseUrl = 'http://localhost:3000') {
this.baseUrl = baseUrl;;
}
async upsertDocument(chatflowId, file, config = {}) {
/**
* 从浏览器上传文件到矢量存储
* @param {string} chatflowId - 聊天流程ID
* @param {File} file - 来自输入元素的File对象
* @param {Object} config - 可选配置
*/
const formData = new FormData();;
formData.append('files', file);;
如果 (config.overrideConfig) {
formData.append('overrideConfig', JSON.stringify(config.overrideConfig));;
}
尝试 {
const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${chatflowId}`, {
方法:'POST',
正文:表单数据
});
如果 (!response.ok) {
抛出新的错误(`HTTP 错误!状态:${response.status}`);;
}
const result = await response.json();;
返回结果;
} catch (error) {
console.error('上传失败:', error);;
抛出错误;
}
}
}
// 浏览器中的使用示例
const 上传器 = new VectorUploader();;
// 单文件上传
document.getElementById('fileInput').addEventListener('change', async function(e) {
const file = e.target.files[0];;
如果 (file) {
尝试 {
const result = await uploader.upsertDocument(
'your-chatflow-id',
文件,
{
overrideConfig: {
数据块大小:1000,
数据块重叠:200
}
}
);
console.log('上传成功:', result);;
alert(`已成功处理 ${result.numAdded || 0} 个数据块`);;
} catch (error) {
console.error('上传失败:', error);;
alert('上传失败:' + error.message);;
}
}
});{% endtab %}
{% tab title="Javascript (Node.js)" %}
/// 译文内容:
---
根据上面的信息,执行如下指令:
你是个专业的翻译,负责把英语内容翻译成中文内容,请帮我翻译一下原文内容
const fs = require('fs');;
const path = require('path');;
const FormData = require('form-data');;
const fetch = require('node-fetch');;
类 NodeVectorUploader {
构造函数(baseUrl = 'http://localhost:3000') {
this.baseUrl = baseUrl;;
}
async upsertDocument(chatflowId, filePath, config = {}) {
/**
* 从Node.js向矢量存储上传文件
* @param {string} chatflowId - 聊天流程ID
* @param {string} filePath - 文件路径
* @param {Object} config - 可选配置
*/
如果 (!fs.existsSync(filePath)) {
抛出新的错误(`文件未找到:${filePath}`);
}
const formData = new FormData();;
const fileStream = fs.createReadStream(filePath);;
formData.append('files', fileStream, {
文件名:path.basename(filePath),
contentType:this.getMimeType(filePath)
});
如果 (config.overrideConfig) {
formData.append('overrideConfig', JSON.stringify(config.overrideConfig));;
}
尝试 {
const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${chatflowId}`, {
方法:'POST',
正文:formData,
头信息:formData.getHeaders()
});
如果 (!response.ok) {
const errorText = await response.text();;
抛出新的错误(`HTTP ${response.status}: ${errorText}`);;
}
返回 await response.json();;
} catch (error) {
console.error('上传失败:', error);;
抛出错误;
}
}
getMimeType(filePath) {
const ext = path.extname(filePath).toLowerCase();;
const mimeTypes = {
'.pdf': 'application/pdf',,
'.txt':'text/plain',
'.docx':'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.csv': 'text/csv',,
'.json':'application/json'
};
返回 mimeTypes[扩展名] 或者 'application/octet-stream';
}
}
// 使用示例
异步函数 main() {
const 上传器 = new NodeVectorUploader();;
尝试 {
// 单文件上传
const result = await uploader.upsertDocument(
'your-chatflow-id',
'./documents/manual.pdf',,
{
overrideConfig: {
数据块大小:1200,
数据块重叠率:100
}
}
);
console.log('单文件上传结果:', result);;
} catch (error) {
console.error('进程失败:', error);;
}
}
// 如果直接执行此文件,则运行
如果 (require.main === module) {
main();;
}
module.exports = { NodeVectorUploader };;{% endtab %}
{% tab title="cURL" %}
# 使用cURL进行基本文件上传
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
-F "files=@documents/knowledge_base.pdf"
# 配置覆盖的文件上传
使用curl命令,通过POST方式,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
-F "files=@documents/manual.pdf" \
-F 'overrideConfig={"chunkSize": 1000, "chunkOverlap": 200}'
# 使用自定义头部信息进行身份验证上传(如已配置)
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
-H "Authorization: Bearer your-api-token" \
-F "files=@documents/faq.txt" \
-F 'overrideConfig={"chunkSize": 800, "chunkOverlap": 150}'{% endtab %} {% endtabs %}
无文件上传功能的文档加载器
对于不需要文件上传的文档加载器(例如,网页抓取器、数据库连接器、API集成),请使用类似于预测API的JSON格式。
示例
标签页 {% tab title="Python" %} python 导入requests库 from typing import Dict, Any, Optional
def upsert(chatflow_id: str, config: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: """ 为不需要文件上传的聊天流程触发向量更新。
参数:
chatflow_id:为向量更新配置的聊天流程ID
config: 可选的配置覆盖项
返回:
包含更新结果的API响应
"""
url = f"http://localhost:3000/api/v1/vector/upsert/{chatflow_id}"
载荷 = {
"overrideConfig": 配置
}
头部信息 = {
"Content-Type": "application/json"
}
尝试:
响应 = requests.post(url, json=payload, headers=headers, timeout=300)
响应.raise_for_status()
返回响应的JSON格式数据
除了 requests.exceptions.RequestException 异常(用 e 表示):
打印(f"更新失败:{e}")返回空值
结果 = upsert( chatflow_id="聊天流程ID",, 配置 = { "chunkSize": 800,, "chunkOverlap": 100,, } )
如果结果为: print(f"更新完成:已添加{result.get('numAdded', 0)}个块")
</div>
<div data-gb-custom-block data-tag="tab" data-title='JavaScript'>
```javascript
/// 译文内容:
---
根据上面的信息,执行如下指令:
缺失译文,请检查输入
class NoFileUploader {
构造函数(baseUrl = 'http://localhost:3000') {
this.baseUrl = baseUrl;;
}
async upsertWithoutFiles(chatflowId, config = {}) {
/**
* 为不需要文件上传的流程触发向量更新
* @param {string} chatflowId - 聊天流程ID
* @param {Object} config - 配置覆盖项
*/
const payload = {
overrideConfig: 配置
};
尝试 {
const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${chatflowId}`, {
方法:'POST',
头信息:{
'Content-Type': 'application/json',,
},
正文:JSON.stringify(payload)
});
如果 (!response.ok) {
抛出新的错误(`HTTP 错误!状态:${response.status}`);;
}
返回 await response.json();;
} catch (error) {
console.error('更新失败:', error);;
抛出错误;
}
}
异步的定时更新操作(chatflowId, 间隔时间 = 3600000) {
/**
* 为动态内容源设置定时更新
* @param {string} chatflowId - 聊天流程ID
* @param {number} interval - 时间间隔(以毫秒为单位)(默认值:1小时)
*/
控制台输出:`开始每${interval/1000}秒执行一次预定的更新操作`;
const performUpsert = async () => {
尝试 {
控制台输出:“正在执行预定的更新操作...”;
const result = await this.upsertWithoutFiles(chatflowId, {
addMetadata: {
scheduledUpdate: true,,
时间戳:new Date().toISOString()
}
});
控制台输出:`已完成的计划更新操作:已处理 ${result.numAdded || 0} 个块`;
} catch (error) {
console.error('计划更新失败:', error);;
}
};
// 执行初始更新插入操作
等待执行更新操作;
// 设置定期更新
返回 setInterval(performUpsert, interval);;
}
}
// 使用示例
const uploader = new NoFileUploader();;
异步函数 performUpsert() {
尝试 {
const result = await uploader.upsertWithoutFiles(
'chatflow-id',
{
数据块大小:800,
数据块重叠:100
}
);
console.log('更新结果:', result);;
} catch (error) {
console.error('更新失败:', error);;
}
}
// 一次性更新
等待执行更新操作();;
// 设置定时更新(每30分钟一次)
const schedulerHandle = await uploader.scheduledUpsert(
'dynamic-content-chatflow-id',
30 * 60 * 1000
);
// 稍后停止计划更新:
// 清除调度器句柄的间隔时间;# 使用cURL进行基本更新
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
-H "Content-Type: application/json"
# 使用配置覆盖进行更新
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
-H "Content-Type: application/json" \
d
"overrideConfig": {
"returnSourceDocuments": true
}
}'
# 使用自定义头进行身份验证的更新(如果已配置)
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
-H "Authorization: Bearer your-api-token" \
-H "Content-Type: application/json"响应字段
numAdded
number
向向量存储中添加的新块数量
numDeleted
number
删除的块数(如果使用记录管理器)
numSkipped
number
跳过的块数(如果使用记录管理器)
numUpdated
number
已更新的现有块数(如果使用记录管理器)
优化策略
1. 批处理策略
python def intelligent_batch_processing(files: List[str], chatflow_id: str) -> Dict[str, Any]: “根据文件大小和类型,以优化的批量方式处理文件。”
# 按大小和类型对文件进行分组
小文件列表 = []
large_files = []
对于文件列表中的每个文件路径:
文件大小 = os.path.getsize(文件路径)
如果文件大小 > 5_000_000:# 5MB
large_files.append(file_path)
否则:
将文件路径添加到small_files列表中
结果 = {'成功': [], '失败': [], '总块数': 0}
# 单独处理大文件
对于large_files中的每个文件路径:
打印(f"正在处理大文件:{file_path}")
# 使用自定义配置进行单独处理
# ...实现
# 分批处理小文件
批量大小 = 5
对于i在0到小文件列表长度之间,以批量大小为步长进行遍历:
批量 = 小文件[i:i + 批量大小]
打印(f"正在处理一批{batch长度}个小文件")
# 批处理
# ...实现返回结果
### 2. 元数据优化
python
导入 requests 库
导入操作系统
从datetime模块导入datetime类
from typing import Dict, Any
使用优化元数据进行更新(upsert)操作(def upsert_with_optimized_metadata(chatflow_id: str, file_path: str,,
def get_dict(department: str = None, category: str = None) -> Dict[str, Any]:
"""
使用自动优化的元数据更新文档。
"""
url = f"http://localhost:3000/api/v1/vector/upsert/{chatflow_id}"
# 生成优化的元数据
自定义元数据 = {
'department': 部门或'general',
'category': 类别或'documentation',
'indexed_date': datetime.now().strftime('%Y-%m-%d'),,
'version': '1.0'
}
优化后的元数据 = 优化元数据(文件路径, 自定义元数据)
# 准备请求
files = {'files': (os.path.basename(file_path), open(file_path, 'rb'))}
数据 = {
'overrideConfig': str({
'metadata': 优化后的元数据
```注:原文似乎是一个代码片段,其中包含了一些占位符和特殊字符。在中文中,我们通常不会直接翻译这些占位符或特殊字符,而是保留原样。因此,在中文翻译中,我保留了原文的格式。如果需要翻译的是具体的文本内容,请提供具体的文本,我会尽力帮助您进行翻译 }
尝试:
响应 = requests.post(url, files=files, data=data)
响应.raise_for_status()
返回响应的JSON格式数据
最后:
files['files'][1].close()不同文档类型的示例用法
结果 = []
技术文档
技术结果 = 使用优化元数据进行更新插入( chatflow_id="技术文档聊天流程",, 文件路径为“docs/api_reference.pdf”, 部门="工程部", 类别="技术文档" ) 结果.append(技术结果)
人力资源政策
hr_result = 使用优化元数据进行更新插入(upsert_with_optimized_metadata) chatflow_id="hr-docs-chatflow",, 文件路径="policies/employee_handbook.pdf", 部门="人力资源部", 类别="政策" ) 结果.append(hr_result)
营销材料
营销结果 = 使用优化元数据进行更新插入(upsert_with_optimized_metadata) chatflow_id="营销聊天流程",, 文件路径="marketing/product_brochure.pdf", 部门="市场营销", 类别="促销" ) 结果.append(营销结果)
在枚举结果时,对每个结果进行循环: print(f"上传{i+1}:已添加{result.get('numAdded', 0)}个块")
## 故障排除
1. **文件上传失败**
* 检查文件格式兼容性
* 验证文件大小限制
2. **处理超时**
* 增加请求超时时间
* 将大文件拆分成更小的部分
* 优化数据块大小
3. **矢量存储错误**
* 检查向量存储连接
* 验证嵌入模型的维度兼容性Last updated