多Agent场景,子agent 之间数据读写不同步,如何解决?

AITNT-国内领先的一站式人工智能新闻资讯网站
# 热门搜索 #
多Agent场景,子agent 之间数据读写不同步,如何解决?
9065点击    2026-05-12 08:53

多Agent 系统里,经常会出现一个单 Agent 里从来不会出现的问题:一个子 Agent 刚写完数据,另一个子 Agent 立刻去读,结果是空的。


根本问题出在 Agent 的写-读模式撞上了很多数据库为单 Agent 场景设计的默认一致性配置。


接下来,这篇文章将说清楚这个矛盾从哪来,以及怎么用一行参数解决它。


01 

单 Agent 与多 Agent 的读写设计有何异同?


单 Agent RAG 的工作方式是这样的:用户提出一个问题,Agent 把问题向量化,去 Milvus 检索 Top-K 文档片段,拼成 prompt 喂给模型,模型输出答案。整条链路里,向量数据库是默认只读的——数据在应用启动时、文档更新时已经写好了,推理过程中没有人再继续往里写东西。


但多 Agent 系统里有两类角色:Writer Agent负责执行任务、调用外部工具、发现新信息,把结果 embedding 后写入 Milvus 作为共享记忆;Reader Agent收到协调信号后,从 Milvus 检索最新记忆,基于这些上下文生成下一步行动。


两者是独立的进程或线程,通过消息、回调或事件协调。Writer 写完,立刻通知 Reader,这个间隔是毫秒级的。


多Agent场景,子agent 之间数据读写不同步,如何解决?


在这种情况下,Writer 写完、信号发出、Reader 立刻查,这种模式会导致Reader的查询动作,恰好落在“数据已写入但未对Query Node可见”的时间窗口内,最终返回空结果。


那么,这个时间窗口是怎么产生的,又要如何解决?


02 

Milvus如何用四档一致性控制数据对外可见的时机


出现“写后读空”的关键,在于我们对Milvus的insert()操作存在一个认知误区——insert()返回成功,不代表数据已经可以被查询。


具体来说,Milvus 的写入流程分两段,insert()操作在第一阶段完成后就会返回“成功”,但数据此时只是被写入了消息队列(类似Kafka producer ack的语义)安全落盘,但消费者(Query Node)尚未处理,此时读取自然无法看到新数据。


如图所示,这个“写入成功到数据进入Growing Segment、查询可见”的几十毫秒到几秒的时间差,就是多Agent场景下读空问题的核心诱因。


多Agent场景,子agent 之间数据读写不同步,如何解决?


要想解决这个问题,在Milvus中,我们可以通过guarantee_timestamp来控制数据的可见性:每次search()调用都携带上这个时间戳,Query Node执行查询前会先检查自己使用的数据版本是否追上了这个时间戳?没追上就等待,追上了再执行查询。


而我们在代码中设置的consistency_level(一致性级别),本质上就是在控制guarantee_timestamp的设定逻辑。


Milvus提供四档一致性选项,可在创建Collection时设置默认值,也可在每次search()调用时单独覆盖,不同级别对应不同的可见性、性能代价,具体如下:


多Agent场景,子agent 之间数据读写不同步,如何解决?


这里需要重点说明:Milvus创建Collection的默认一致性级别是Bounded,这对单Agent RAG场景是完全合理的——因为单Agent场景没有推理过程中的写入操作,Bounded的5秒窗口不会被触发,既能保证检索性能,又能满足需求,是性能与体验的双赢选择。


但对于Writer写完数据后Reader立即查询的多Agent事件驱动场景,此时查询的guarantee_timestamp如果仍落在Bounded的5秒窗口内,新写入的数据就会不可见,返回空结果。


而解决这个问题的关键,就是将consistency_level从默认的Bounded,切换到适配多Agent场景的strong级别。


03

实验:Bounded 查不到,Strong 查得到


为了直观验证上述结论,我们设计了一组实验:通过模拟生产环境的高写压,让Query Node始终处于数据追赶状态,再执行“写入一条数据后立即查询”的操作,对比Bounded和Strong两种一致性级别的查询结果。


实验设计思路


通过两个机制模拟生产环境的写压,确保Query Node始终处于忙碌的追赶状态:


  • preload预写:提前写入大批量数据,制造WAL(Write-Ahead Log)历史积压;
  • storm writers后台写入:用多个后台线程持续高速写入数据,维持Query Node的追赶压力。


每轮实验中,先写入一条带唯一标记(marker)的记录,然后立即分别用Bounded和Strong级别查询该记录——一旦出现Bounded=0、Strong=1,即判定问题复现成功。


运行前提:pymilvus >= 2.6.0 已安装,Milvus 服务可访问。


#!/usr/bin/env python3

import argparse

import itertools

import random

import threading

import time

import uuid

from contextlib import suppress

from pymilvus import DataType, MilvusClient

defmake_vector(seed, dim):

    rng = random.Random(seed)

    vec = [rng.uniform(-1.0, 1.0) for _ inrange(dim)]

    norm = sum(x * x for x in vec) ** 0.5or1.0

return [x / norm for x in vec]

defmake_records(start_id, count, dim, marker, round_no):

return [

        {

"id": start_id + i,

"vector": make_vector(start_id + i, dim),

"marker": marker,

"round": round_no,

        }

for i inrange(count)

    ]

defcreate_collection(client, name, dim):

if client.has_collection(name):

        client.drop_collection(name)

    schema = client.create_schema(auto_id=False, enable_dynamic_field=False)

    schema.add_field("id", DataType.INT64, is_primary=True)

    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=dim)

    schema.add_field("marker", DataType.VARCHAR, max_length=128)

    schema.add_field("round", DataType.INT64)

    index_params = client.prepare_index_params()

    index_params.add_index(

        field_name="vector",

        index_type="AUTOINDEX",

        metric_type="COSINE",

    )

    client.create_collection(

        collection_name=name,

        schema=schema,

        index_params=index_params,

        consistency_level="Bounded",

    )

    client.load_collection(name)

defsearch_marker(client, name, vector, marker, consistency, timeout):

    result = client.search(

        collection_name=name,

        data=[vector],

        anns_field="vector",

        search_params={"metric_type": "COSINE"},

filter=f'marker == "{marker}"',

        limit=1,

        output_fields=["id", "marker", "round"],

        consistency_level=consistency,

        timeout=timeout,

    )

    hits = result[0] if result else []

returnlen(hits), hits

defwriter_storm(uri, name, dim, stop_event, id_counter, batch_size, sleep_seconds):

    client = MilvusClient(uri=uri)

whilenot stop_event.is_set():

        start_id = next(id_counter)

        records = make_records(start_id, batch_size, dim, "storm", -1)

with suppress(Exception):

            client.insert(collection_name=name, data=records)

if sleep_seconds > 0:

            time.sleep(sleep_seconds)

defmain():

    parser = argparse.ArgumentParser()

    parser.add_argument("--uri", default="http://localhost:19530")

    parser.add_argument("--collection", default="")

    parser.add_argument("--dim", type=int, default=16)

    parser.add_argument("--attempts", type=int, default=200)

    parser.add_argument("--bounded-timeout", type=float, default=2.0)

    parser.add_argument("--strong-timeout", type=float, default=30.0)

    parser.add_argument("--storm-writers", type=int, default=2)

    parser.add_argument("--storm-batch-size", type=int, default=2000)

    parser.add_argument("--storm-sleep", type=float, default=0.0)

    parser.add_argument("--preload", type=int, default=5000)

    parser.add_argument("--keep", action="store_true")

    args = parser.parse_args()

    collection = args.collection orf"consistency_probe_{int(time.time())}_{uuid.uuid4().hex[:8]}"

    writer = MilvusClient(uri=args.uri)

    bounded_reader = MilvusClient(uri=args.uri)

    strong_reader = MilvusClient(uri=args.uri)

    stop_event = threading.Event()

    storm_threads = []

    storm_id_counter = itertools.count(10_000_000, args.storm_batch_size)

print(f"uri={args.uri}")

print(f"collection={collection}")

try:

        create_collection(writer, collection, args.dim)

if args.preload > 0:

print(f"preload {args.preload} rows")

            writer.insert(

                collection_name=collection,

                data=make_records(1_000_000, args.preload, args.dim, "preload", -2),

            )

            _, _ = search_marker(

                strong_reader,

                collection,

                make_vector(1_000_000, args.dim),

"preload",

"Strong",

                args.strong_timeout,

            )

for _ inrange(args.storm_writers):

            thread = threading.Thread(

                target=writer_storm,

                args=(

                    args.uri,

                    collection,

                    args.dim,

                    stop_event,

                    storm_id_counter,

                    args.storm_batch_size,

                    args.storm_sleep,

                ),

                daemon=True,

            )

            thread.start()

            storm_threads.append(thread)

for attempt inrange(args.attempts):

            marker = f"probe_{attempt}_{uuid.uuid4().hex[:12]}"

            record_id = attempt + 1

            vector = make_vector(record_id, args.dim)

            record = {

"id": record_id,

"vector": vector,

"marker": marker,

"round": attempt,

            }

            insert_start = time.perf_counter()

            writer.insert(collection_name=collection, data=[record])

            insert_ms = (time.perf_counter() - insert_start) * 1000

            bounded_start = time.perf_counter()

            bounded_count, bounded_hits = search_marker(

                bounded_reader, collection, vector, marker,

"Bounded", args.bounded_timeout,

            )

            bounded_ms = (time.perf_counter() - bounded_start) * 1000

            strong_start = time.perf_counter()

            strong_count, strong_hits = search_marker(

                strong_reader, collection, vector, marker,

"Strong", args.strong_timeout,

            )

            strong_ms = (time.perf_counter() - strong_start) * 1000

print(

f"attempt={attempt:03d} insert={insert_ms:.1f}ms "

f"bounded={bounded_count}({bounded_ms:.1f}ms) "

f"strong={strong_count}({strong_ms:.1f}ms)"

            )

if bounded_count == 0and strong_count > 0:

print("\nREPRODUCED: Bounded missed the just-inserted row, Strong found it.")

print(f"marker={marker}")

print(f"strong_hit={strong_hits[0] if strong_hits elseNone}")

return

if bounded_hits andnot strong_hits:

print("Unexpected: Bounded found the row but Strong did not; check service config.")

print("\nNot reproduced. QueryNode likely consumed the insert before each Bounded search.")

print("Try increasing --storm-writers/--storm-batch-size/--attempts, or run against a cluster under write load.")

finally:

        stop_event.set()

for thread in storm_threads:

            thread.join(timeout=1)

if args.keep:

print(f"kept collection={collection}")

else:

with suppress(Exception):

                writer.drop_collection(collection)

print(f"dropped collection={collection}")

if __name__ == "__main__":

    main()


运行命令(替换uri为自身Milvus服务地址):


python probe.py --uri http://localhost:19530 \--storm-writers 2\

                --storm-batch-size 2000\

                --preload 5000


运行结果:(与文档中报错URL对应的服务地址一致):


uri=http://localhost:19530

collection=consistency_probe_1777278755_71fb2959

preload5000 rows

attempt=000 insert=47.7ms bounded=0(100.7ms) strong=1(171.7ms)

REPRODUCED: Bounded missed the just-inserted row, Strong found it.

marker=probe_0_96fadc07d29e

strong_hit={'id': 1, 'distance': 1.0, 'entity': {'marker': 'probe_0_96fadc07d29e', 'round': 0, 'id': 1}}

dropped collection=consistency_probe_1777278755_71fb2959


实验结论


第一次尝试(attempt=000)即复现:bounded=0 说明 Query Node 正忙于消费 storm writers 制造的写入积压,Bounded 的 guarantee_timestamp 落在本次写入之前,新记录对此次查询不可见;strong=1 说明 Strong 强制 Query Node 追赶到全局最新时间戳后再返回,新记录被稳定查到。


其中distance=1.0确认了查询向量与写入向量完全一致,排除了向量不匹配的干扰。这进一步证明:问题的核心不是数据未写入,而是一致性级别导致的数据可见性时序冲突,与原文开头提出的多Agent写后读空问题完全吻合。


04 

不是所有场景都需要 Strong


虽然consistency_level="Strong" 能解决多 Agent 写后立刻读的问题,但它需要等待所有并发写入同步完成,会牺牲一定的性能。


因此,我们无需盲目将所有场景都设置为Strong级别,核心判断标准是:写入和查询之间是否有明确的因果关系,以及查询对数据新鲜度的要求。


结合多Agent常见场景,我们整理了针对性的一致性级别推荐方案,兼顾性能与一致性需求:


有明确因果——Writer 写完触发 Reader 查,流水线上一阶段写完触发下一阶段读,用 Strong。


无固定因果、但必须看到最新——多个 Agent 并发读写共享状态,没有固定上下游,任何人的写入都可能影响其他人的决策。用 Strong,等全局最新。


多Agent场景,子agent 之间数据读写不同步,如何解决?


尾声


在实际开发中,很多人会用time.sleep(N)或无视一致性配置的方式“规避”读空问题,但这两种方式都不可靠:time.sleep靠猜测时间窗口,无法适配不同负载场景;无视一致性则完全靠运气,会导致系统偶发异常,难以排查。


其实,consistency_level参数的作用非常简单:告诉Milvus这次查询需要看到多新的数据。将默认的Bounded改为Strong,就为多Agent的“写后立即读”提供了确定性的可见性保证——这一行参数的差距,就是多Agent系统稳定运行与偶发空结果之间的全部距离。


总结来说:单Agent RAG场景,Milvus的默认配置完全够用;但在多Agent事件驱动场景中,你需要明确告诉Milvus“这次查询要看到最新数据”,通过一行参数调整一致性级别,就能彻底解决写后读空的核心难题。


作者介绍


多Agent场景,子agent 之间数据读写不同步,如何解决?

Zilliz黄金写手:尹珉


文章来自于"Zilliz",作者 "尹珉"。

AITNT-国内领先的一站式人工智能新闻资讯网站
AITNT资源拓展
根据文章内容,系统为您匹配了更有价值的资源信息。内容由AI生成,仅供参考
1
智能体

【开源免费】AutoGPT是一个允许用户创建和运行智能体的(AI Agents)项目。用户创建的智能体能够自动执行各种任务,从而让AI有步骤的去解决实际问题。

项目地址:https://github.com/Significant-Gravitas/AutoGPT


【开源免费】MetaGPT是一个“软件开发公司”的智能体项目,只需要输入一句话的老板需求,MetaGPT即可输出用户故事 / 竞品分析 / 需求 / 数据结构 / APIs / 文件等软件开发的相关内容。MetaGPT内置了各种AI角色,包括产品经理 / 架构师 / 项目经理 / 工程师,MetaGPT提供了一个精心调配的软件公司研发全过程的SOP。

项目地址:https://github.com/geekan/MetaGPT/blob/main/docs/README_CN.md

2
RAG

【开源免费】graphrag是微软推出的RAG项目,与传统的通过 RAG 方法使用向量相似性作为搜索技术不同,GraphRAG是使用知识图谱在推理复杂信息时大幅提高问答性能。

项目地址:https://github.com/microsoft/graphrag

【开源免费】Dify是最早一批实现RAG,Agent,模型管理等一站式AI开发的工具平台,并且项目方一直持续维护。其中在任务编排方面相对领先对手,可以帮助研发实现像字节扣子那样的功能。

项目地址:https://github.com/langgenius/dify


【开源免费】RAGFlow是和Dify类似的开源项目,该项目在大文件解析方面做的更出色,拓展编排方面相对弱一些。

项目地址:https://github.com/infiniflow/ragflow/tree/main


【开源免费】phidata是一个可以实现将数据转化成向量存储,并通过AI实现RAG功能的项目

项目地址:https://github.com/phidatahq/phidata


【开源免费】TaskingAI 是一个提供RAG,Agent,大模型管理等AI项目开发的工具平台,比LangChain更强大的中间件AI平台工具。

项目地址:https://github.com/TaskingAI/TaskingAI

3
prompt

【开源免费】LangGPT 是一个通过结构化和模板化的方法,编写高质量的AI提示词的开源项目。它可以让任何非专业的用户轻松创建高水平的提示词,进而高质量的帮助用户通过AI解决问题。

项目地址:https://github.com/langgptai/LangGPT/blob/main/README_zh.md

在线使用:https://kimi.moonshot.cn/kimiplus/conpg00t7lagbbsfqkq0