Running Flowise using Queue

默认情况下,Flowise在NodeJS主线程中运行。然而,当预测任务数量较多时,这种方式并不具备很好的扩展性。因此,你可以配置两种模式:main(默认)和queue

队列模式

通过设置以下环境变量,您可以在“队列”模式下运行Flowise。

变量
描述
类型
默认值

MODE

运行Flowise的模式

枚举字符串:mainqueue

main

WORKER_CONCURRENCY

一个工作进程允许并行处理多少个作业。如果你有一个工作进程,这意味着它可以处理多少个并发预测任务。更多信息

数字

10000

QUEUE_NAME

消息队列的名称

字符串

flowise-queue

QUEUE_REDIS_EVENT_STREAM_MAX_LEN

事件流会自动修剪,以使其大小不会增长太多。更多信息

数字

10000

REDIS_URL

Redis URL

字符串

REDIS_HOST

Redis主机

字符串

localhost

REDIS_PORT

Redis端口

数字

6379

REDIS_USERNAME

Redis用户名(可选)

字符串

REDIS_PASSWORD

Redis密码(可选)

字符串

REDIS_TLS

Redis TLS连接(可选)更多信息

布尔值

false

REDIS_CERT

Redis自签名证书

字符串

REDIS_KEY

Redis自签名证书密钥文件

字符串

REDIS_CA

Redis自签名证书CA文件

字符串

在“队列”模式下,主服务器将负责处理请求,并将作业发送到消息队列。主服务器不会执行作业。一个或多个工作进程会从队列中接收作业,执行作业并将结果发送回。

这允许进行动态扩展:你可以增加工作人员来处理增加的工作量,或在工作量较轻时减少工作人员。

它的工作原理如下:

  1. 主服务器从网络接收预测或其他请求,并将它们作为作业添加到队列中。

  2. 这些作业队列是等待处理的任务的基本列表。工作进程(本质上是独立的进程或线程)会获取这些作业并执行它们。

  3. 工作完成后,worker:

    • 将结果写入数据库。

    • 发送一个事件来指示作业的完成。

  4. 主服务器接收事件,并将结果发送回用户界面(UI)。

  5. Redis的发布/订阅功能也被用于将数据流式传输回用户界面(UI)。

流程图

1. 请求入口点

一个预测请求到达Express服务器后,会立即检查MODE是否为QUEUE。如果是,则该请求将从直接执行切换到异步队列处理。

2. 创造就业机会与双渠道

该系统创建了两条并行路径:

  • 作业通道:请求数据通过BullMQ转换为Redis作业,HTTP线程等待其完成

  • 流通道:通过Redis 发布/订阅建立 SSE 连接以进行实时更新

3. worker处理

独立的worker进程会轮询Redis以获取任务。当分配到任务时:

  • 重建完整的执行上下文(数据库、组件、异常控制器)

  • 以逐节点处理的方式执行工作流

  • 向Redis通道发布实时事件(令牌、工具、进度)

4. 实时通信

执行期间:

5. 完成与响应

作业完成,结果存储在Redis中:

  • HTTP线程解除阻塞,接收结果

  • SSE连接正常关闭

  • 清理资源(中止控制器、连接)

本地设置

启动Redis

在启动主服务器和辅助进程之前,需要先运行Redis。你可以在单独的机器上运行Redis,但要确保服务器和辅助进程实例能够访问它。

例如,你可以按照以下指南在Docker上运行Redis。

启动主服务器

这与默认情况下运行Flowise的方式相同,只是需要配置上述环境变量。

pnpm 启动

启动工作进程

与主服务器相同,上述环境变量必须进行配置。我们建议主实例和辅助实例都使用同一个.env文件。唯一的区别在于如何运行辅助实例。请打开另一个终端并运行:

运行`start-worker`命令

{% 提示 样式="警告" %}

主服务器和辅助服务器需要共享相同的密钥。请参阅[#for-credentials](environment-variables.md#for-credentials "mention)。对于生产环境,我们建议使用Postgres作为数据库以提高性能。

Docker 设置

方法1:预构建镜像(推荐)

此方法使用来自Docker Hub的预构建Docker镜像,使其成为最快且最可靠的部署选项。

步骤1:设置环境

docker目录中创建一个.env文件:

# 基本配置
端口=3000
WORKER_PORT=5566

# 队列配置(必填)
模式=队列
队列名称=flowise-queue
REDIS_URL=redis://redis:6379

# 可选队列设置
WORKER_CONCURRENCY=5
达到24岁即移除
当计数达到1000时移除
QUEUE_REDIS_EVENT_STREAM_MAX_LEN=1000
ENABLE_BULLMQ_DASHBOARD=false

# 数据库(可选 - 默认为SQLite)
数据库路径=/root/.flowise

# 存储
BLOB_STORAGE_PATH=/root/.flowise/storage

# 秘密密钥
SECRETKEY_PATH=/root/.flowise

# 日志记录
日志路径=/root/.flowise/logs

步骤2:部署

进入docker目录
使用docker compose根据docker-compose-queue-prebuilt.yml文件启动并后台运行容器

步骤3:验证部署

# 检查容器状态
docker compose -f docker-compose-queue-prebuilt.yml ps

# 查看日志
使用docker compose根据docker-compose-queue-prebuilt.yml文件配置启动flowise日志服务,并使用-f参数指定日志文件路径
使用docker compose根据docker-compose-queue-prebuilt.yml文件启动flowise-worker的日志输出

方法2:从源代码构建

此方法从源代码构建Flowise,对于开发或自定义修改非常有用。

步骤1:设置环境

按照方法1中的步骤,创建一个相同的.env文件。

步骤2:部署

进入docker目录
使用docker compose根据docker-compose-queue-source.yml文件启动并后台运行容器

第三步:构建流程

源代码构建将:

  • 从源代码构建主要的Flowise应用程序

  • 从源代码构建工作镜像

  • 设置Redis和联网

步骤4:监控构建

# 监控构建进度
docker compose -f docker-compose-queue-source.yml logs -f

# 检查最终状态
docker compose -f docker-compose-queue-source.yml ps

健康检查

所有组合文件都包含健康检查:

# 检查主实例运行状况
使用curl命令向http://localhost:3000/api/v1/ping发出请求

# 检查员工健康状况
使用curl命令访问http://localhost:5566/healthz

队列总结性表格

ENABLE_BULLMQ_DASHBOARD设置为true,用户即可通过访问<your-flowise-url.com>/admin/queues来查看所有作业、状态、结果和数据

Last updated