Upsertion

更新插入

Upsert指的是将文档上传并处理到向量存储中的过程,这是检索增强生成(RAG)系统的基础。

将数据更新到向量存储中有两种基本方法:

我们强烈推荐使用文档存储,因为它提供了一个统一的接口,有助于RAG(检索增强生成)流程——从不同来源检索数据、分块策略、向向量数据库更新插入(upserting)、与更新数据同步。

在本指南中,我们将介绍另一种方法——Chatflow Upsert。这是文档存储出现之前的一种较旧的方法。

有关详细信息,请参阅向量更新端点API参考

了解更新插入(upserting)过程

Chatflow允许您创建一个流程,该流程既可以执行更新操作,也可以执行RAG查询过程,两者都可以独立运行。

更新插入与RAG

设置

为了使更新插入(upsert)流程正常工作,我们需要创建一个包含5个不同节点的更新插入流

  1. 文档加载器

  2. 文本分割器

  3. 嵌入模型

  4. 矢量存储

  5. 记录管理器(可选)

所有元素已在文档存储中介绍,更多详情请参阅该文档。

一旦流程设置正确,右上角就会出现一个绿色按钮,用户可以通过该按钮启动更新流程。

更新插入(upsert)过程也可以通过API执行:

基本URL和认证

基础URLhttp://localhost:3000(或您的Flowise实例URL)

端点POST /api/v1/vector/upsert/:id

身份验证:请参阅流程的身份验证

请求方法

根据您的聊天流程配置,API支持两种不同的请求方法:

1. 表单数据(文件上传)

当您的聊天流程包含具有文件上传功能的文档加载器时使用。

2. JSON 正文(无文件上传)

当您的聊天流程使用不需要文件上传的文档加载器(例如网页抓取器、数据库连接器)时使用。

{% 提示样式="警告" %}

要覆盖任何节点配置(如文件、元数据等),您必须明确启用该选项。

带有文件上传功能的文档加载器

支持的文档类型

文档加载器
文件类型

CSV文件

.csv

Docx/Word文件

.docx

JSON文件

.json

JSON Lines文件

.jsonl

PDF文件

.pdf

文本文件

.txt

Excel文件

.xlsx

Powerpoint文件

.pptx

文件加载器

多个

非结构化文件

多个

提示框(信息提示) 重要提示:请确保文件类型与您的文档加载器配置相匹配。为了获得最大的灵活性,请考虑使用支持多种文件类型的文件加载器。

请求格式(表单数据)

上传文件时,请使用multipart/form-data而不是JSON:

示例

标签页

python 导入requests库 导入操作系统

def upsert_document(chatflow_id, file_path, config=None): """ 将单个文档更新到向量存储中。

参数:
    chatflow_id (str): 为向量更新配置的聊天流程ID
    file_path (str): 要上传文件的路径
    return_source_docs (bool): 是否在响应中返回源文档
    config (dict): 可选的配置覆盖项

返回:
    dict:包含更新结果的API响应
"""
url = f"http://localhost:3000/api/v1/vector/upsert/{chatflow_id}"

# 准备文件数据
文件 = {
    'files': (os.path.basename(file_path), open(file_path, 'rb'))
}

# 准备表单数据
数据 = {}

# 如果提供了配置覆盖,请添加
如果配置(config)存在:
    data['overrideConfig'] = str(config).replace("'", '"')  # 转换为JSON字符串

尝试:
    响应 = requests.post(url, files=files, data=data)
    响应.raise_for_status()
    
    返回响应的JSON格式数据
    
除了requests.exceptions.RequestException异常(用e表示)之外:
    打印(f"上传失败:{e}")

返回空值 最后: # 始终关闭文件 files['files'][1].close()

使用示例

结果 = 更新文档( chatflow_id="你的聊天流程ID",, 文件路径="documents/knowledge_base.pdf", 配置 = { "chunkSize": 1000,, "chunkOverlap": 200 } )

如果结果为: print(f"已成功更新插入 {result.get('numAdded', 0)} 个数据块") 如果 result.get('sourceDocuments'): 打印(f"源文档数量:{len(result['sourceDocuments'])}") 否则: 打印("上传失败")

{% endtab %}

{% tab title="Javascript(浏览器)" %}
```javascript

/// 译文内容:
---
根据上面的信息,执行如下指令:
缺失译文,请检查输入
类 VectorUploader {
    构造函数(baseUrl = 'http://localhost:3000') {
        this.baseUrl = baseUrl;;
    }
    
    async upsertDocument(chatflowId, file, config = {}) {
        /**
         * 从浏览器上传文件到矢量存储
         * @param {string} chatflowId - 聊天流程ID
         * @param {File} file - 来自输入元素的File对象
         * @param {Object} config - 可选配置
         */
        
        const formData = new FormData();;
        formData.append('files', file);;
        
        如果 (config.overrideConfig) {
            formData.append('overrideConfig', JSON.stringify(config.overrideConfig));;
        }
        
        尝试 {
            const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${chatflowId}`, {
                方法:'POST',
                正文:表单数据
            });
            
            如果 (!response.ok) {
                抛出新的错误(`HTTP 错误!状态:${response.status}`);;
            }
            
            const result = await response.json();;
            返回结果;
            
        } catch (error) {
            console.error('上传失败:', error);;
            抛出错误;
        }
    }
    
  
}

// 浏览器中的使用示例
const 上传器 = new VectorUploader();;

// 单文件上传
document.getElementById('fileInput').addEventListener('change', async function(e) {
    const file = e.target.files[0];;
    如果 (file) {
        尝试 {
            const result = await uploader.upsertDocument(
                'your-chatflow-id',
                文件,
                {
                    overrideConfig: {
                        数据块大小:1000,
                        数据块重叠:200
                    }
                }
            );
            
            console.log('上传成功:', result);;
            alert(`已成功处理 ${result.numAdded || 0} 个数据块`);;
            
        } catch (error) {
            console.error('上传失败:', error);;
            alert('上传失败:' + error.message);;
        }
    }
});

{% endtab %}

{% tab title="Javascript (Node.js)" %}


/// 译文内容:
---
根据上面的信息,执行如下指令:
你是个专业的翻译,负责把英语内容翻译成中文内容,请帮我翻译一下原文内容
const fs = require('fs');;
const path = require('path');;
const FormData = require('form-data');;
const fetch = require('node-fetch');;

类 NodeVectorUploader {
    构造函数(baseUrl = 'http://localhost:3000') {
        this.baseUrl = baseUrl;;
    }
    
    async upsertDocument(chatflowId, filePath, config = {}) {
        /**
         * 从Node.js向矢量存储上传文件
         * @param {string} chatflowId - 聊天流程ID
         * @param {string} filePath - 文件路径
         * @param {Object} config - 可选配置
         */
        
        如果 (!fs.existsSync(filePath)) {
            抛出新的错误(`文件未找到:${filePath}`);
        }
        
        const formData = new FormData();;
        const fileStream = fs.createReadStream(filePath);;
        
        formData.append('files', fileStream, {
            文件名:path.basename(filePath),
            contentType:this.getMimeType(filePath)
        });
        
        如果 (config.overrideConfig) {
            formData.append('overrideConfig', JSON.stringify(config.overrideConfig));;
        }
        
        尝试 {
            const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${chatflowId}`, {
                方法:'POST',
                正文:formData,
                头信息:formData.getHeaders()
            });
            
            如果 (!response.ok) {
                const errorText = await response.text();;
                抛出新的错误(`HTTP ${response.status}: ${errorText}`);;
            }
            
            返回 await response.json();;
            
        } catch (error) {
            console.error('上传失败:', error);;
            抛出错误;
        }
    }

    getMimeType(filePath) {
        const ext = path.extname(filePath).toLowerCase();;
        const mimeTypes = {
            '.pdf': 'application/pdf',,
            '.txt':'text/plain',
            '.docx':'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.csv': 'text/csv',,
            '.json':'application/json'
        };
        返回 mimeTypes[扩展名] 或者 'application/octet-stream';
    }
}

// 使用示例
异步函数 main() {
    const 上传器 = new NodeVectorUploader();;
    
    尝试 {
        // 单文件上传
        const result = await uploader.upsertDocument(
            'your-chatflow-id',
            './documents/manual.pdf',,
            {
                overrideConfig: {
                    数据块大小:1200,
                    数据块重叠率:100
                }
            }
        );
        
        console.log('单文件上传结果:', result);;
    } catch (error) {
        console.error('进程失败:', error);;
    }
}

// 如果直接执行此文件,则运行
如果 (require.main === module) {
    main();;
}

module.exports = { NodeVectorUploader };;

{% endtab %}

{% tab title="cURL" %}

# 使用cURL进行基本文件上传
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
  -F "files=@documents/knowledge_base.pdf"

# 配置覆盖的文件上传
使用curl命令,通过POST方式,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
  -F "files=@documents/manual.pdf" \
  -F 'overrideConfig={"chunkSize": 1000, "chunkOverlap": 200}'

# 使用自定义头部信息进行身份验证上传(如已配置)
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
  -H "Authorization: Bearer your-api-token" \
  -F "files=@documents/faq.txt" \
  -F 'overrideConfig={"chunkSize": 800, "chunkOverlap": 150}'

{% endtab %} {% endtabs %}

无文件上传功能的文档加载器

对于不需要文件上传的文档加载器(例如,网页抓取器、数据库连接器、API集成),请使用类似于预测API的JSON格式。

示例

标签页 {% tab title="Python" %} python 导入requests库 from typing import Dict, Any, Optional

def upsert(chatflow_id: str, config: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: """ 为不需要文件上传的聊天流程触发向量更新。

参数:
    chatflow_id:为向量更新配置的聊天流程ID
    config: 可选的配置覆盖项

返回:
    包含更新结果的API响应
"""
url = f"http://localhost:3000/api/v1/vector/upsert/{chatflow_id}"

载荷 = {
    "overrideConfig": 配置
}

头部信息 = {
    "Content-Type": "application/json"
}

尝试:
    响应 = requests.post(url, json=payload, headers=headers, timeout=300)
    响应.raise_for_status()
    
    返回响应的JSON格式数据
    
除了 requests.exceptions.RequestException 异常(用 e 表示):
    打印(f"更新失败:{e}")

返回空值

结果 = upsert( chatflow_id="聊天流程ID",, 配置 = { "chunkSize": 800,, "chunkOverlap": 100,, } )

如果结果为: print(f"更新完成:已添加{result.get('numAdded', 0)}个块")


</div>

<div data-gb-custom-block data-tag="tab" data-title='JavaScript'>

```javascript

/// 译文内容:
---
根据上面的信息,执行如下指令:
缺失译文,请检查输入
class NoFileUploader {
    构造函数(baseUrl = 'http://localhost:3000') {
        this.baseUrl = baseUrl;;
    }
    
    async upsertWithoutFiles(chatflowId, config = {}) {
        /**
         * 为不需要文件上传的流程触发向量更新
         * @param {string} chatflowId - 聊天流程ID
         * @param {Object} config - 配置覆盖项
         */
        
        const payload = {
            overrideConfig: 配置
        };
        
        尝试 {
            const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${chatflowId}`, {
                方法:'POST',
                头信息:{
                    'Content-Type': 'application/json',,
                },
                正文:JSON.stringify(payload)
            });
            
            如果 (!response.ok) {
                抛出新的错误(`HTTP 错误!状态:${response.status}`);;
            }
            
            返回 await response.json();;
            
        } catch (error) {
            console.error('更新失败:', error);;
            抛出错误;
        }
    }
    
    异步的定时更新操作(chatflowId, 间隔时间 = 3600000) {
        /**
         * 为动态内容源设置定时更新
         * @param {string} chatflowId - 聊天流程ID
         * @param {number} interval - 时间间隔(以毫秒为单位)(默认值:1小时)
         */
        
        控制台输出:`开始每${interval/1000}秒执行一次预定的更新操作`;
        
        const performUpsert = async () => {
            尝试 {
                控制台输出:“正在执行预定的更新操作...”;
                
                const result = await this.upsertWithoutFiles(chatflowId, {
                    addMetadata: {
                        scheduledUpdate: true,,
                        时间戳:new Date().toISOString()
                    }
                });
                
                控制台输出:`已完成的计划更新操作:已处理 ${result.numAdded || 0} 个块`;
                
            } catch (error) {
                console.error('计划更新失败:', error);;
            }
        };
        
        // 执行初始更新插入操作
        等待执行更新操作;
        
        // 设置定期更新
        返回 setInterval(performUpsert, interval);;
    }
}

// 使用示例
const uploader = new NoFileUploader();;

异步函数 performUpsert() {
    尝试 {
        const result = await uploader.upsertWithoutFiles(
            'chatflow-id',
            {
                数据块大小:800,
                数据块重叠:100
            }
        );
        
        console.log('更新结果:', result);;
        
    } catch (error) {
        console.error('更新失败:', error);;
    }
}

// 一次性更新
等待执行更新操作();;

// 设置定时更新(每30分钟一次)
const schedulerHandle = await uploader.scheduledUpsert(
    'dynamic-content-chatflow-id',
    30 * 60 * 1000
);

// 稍后停止计划更新:
// 清除调度器句柄的间隔时间;
# 使用cURL进行基本更新
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
  -H "Content-Type: application/json"

# 使用配置覆盖进行更新
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
  -H "Content-Type: application/json" \
d
    "overrideConfig": {
      "returnSourceDocuments": true
    }
  }'
  
# 使用自定义头进行身份验证的更新(如果已配置)
使用curl命令,通过POST方法,向http://localhost:3000/api/v1/vector/upsert/your-chatflow-id发送请求
  -H "Authorization: Bearer your-api-token" \
  -H "Content-Type: application/json"

响应字段

字段
类型
描述

numAdded

number

向向量存储中添加的新块数量

numDeleted

number

删除的块数(如果使用记录管理器)

numSkipped

number

跳过的块数(如果使用记录管理器)

numUpdated

number

已更新的现有块数(如果使用记录管理器)

优化策略

1. 批处理策略

python def intelligent_batch_processing(files: List[str], chatflow_id: str) -> Dict[str, Any]: “根据文件大小和类型,以优化的批量方式处理文件。”

# 按大小和类型对文件进行分组
小文件列表 = []
large_files = []

对于文件列表中的每个文件路径:
    文件大小 = os.path.getsize(文件路径)
    如果文件大小 > 5_000_000:# 5MB
        large_files.append(file_path)
    否则:
        将文件路径添加到small_files列表中

结果 = {'成功': [], '失败': [], '总块数': 0}

# 单独处理大文件
对于large_files中的每个文件路径:
    打印(f"正在处理大文件:{file_path}")
    # 使用自定义配置进行单独处理
    # ...实现

# 分批处理小文件
批量大小 = 5
对于i在0到小文件列表长度之间,以批量大小为步长进行遍历:
    批量 = 小文件[i:i + 批量大小]
    打印(f"正在处理一批{batch长度}个小文件")
    # 批处理
    # ...实现

返回结果


### 2. 元数据优化

python
导入 requests 库
导入操作系统
从datetime模块导入datetime类
from typing import Dict, Any

使用优化元数据进行更新(upsert)操作(def upsert_with_optimized_metadata(chatflow_id: str, file_path: str,,
                                 def get_dict(department: str = None, category: str = None) -> Dict[str, Any]:
    """
    使用自动优化的元数据更新文档。
    """
    url = f"http://localhost:3000/api/v1/vector/upsert/{chatflow_id}"
    
    # 生成优化的元数据
    自定义元数据 = {
        'department': 部门或'general',
        'category': 类别或'documentation',
        'indexed_date': datetime.now().strftime('%Y-%m-%d'),,
        'version': '1.0'
    }
    
    优化后的元数据 = 优化元数据(文件路径, 自定义元数据)
    
    # 准备请求
    files = {'files': (os.path.basename(file_path), open(file_path, 'rb'))}
    数据 = {
        'overrideConfig': str({
            'metadata': 优化后的元数据
        ```

注:原文似乎是一个代码片段,其中包含了一些占位符和特殊字符。在中文中,我们通常不会直接翻译这些占位符或特殊字符,而是保留原样。因此,在中文翻译中,我保留了原文的格式。如果需要翻译的是具体的文本内容,请提供具体的文本,我会尽力帮助您进行翻译 }

尝试:
    响应 = requests.post(url, files=files, data=data)
    响应.raise_for_status()
    返回响应的JSON格式数据
最后:
    files['files'][1].close()

不同文档类型的示例用法

结果 = []

技术文档

技术结果 = 使用优化元数据进行更新插入( chatflow_id="技术文档聊天流程",, 文件路径为“docs/api_reference.pdf”, 部门="工程部", 类别="技术文档" ) 结果.append(技术结果)

人力资源政策

hr_result = 使用优化元数据进行更新插入(upsert_with_optimized_metadata) chatflow_id="hr-docs-chatflow",, 文件路径="policies/employee_handbook.pdf", 部门="人力资源部", 类别="政策" ) 结果.append(hr_result)

营销材料

营销结果 = 使用优化元数据进行更新插入(upsert_with_optimized_metadata) chatflow_id="营销聊天流程",, 文件路径="marketing/product_brochure.pdf", 部门="市场营销", 类别="促销" ) 结果.append(营销结果)

在枚举结果时,对每个结果进行循环: print(f"上传{i+1}:已添加{result.get('numAdded', 0)}个块")


## 故障排除

1. **文件上传失败**
   * 检查文件格式兼容性
   * 验证文件大小限制
2. **处理超时**
   * 增加请求超时时间
   * 将大文件拆分成更小的部分
   * 优化数据块大小
3. **矢量存储错误**
   * 检查向量存储连接
   * 验证嵌入模型的维度兼容性

Last updated