大语言模型 LLM 初探

先进行接口调用,快速了解大语言模型能做什么,再逐个了解其原理。

通过 HTTP 请求初探

POST https://api.deepseek.com/chat/completions HTTP/1.1
Content-Type: application/json
Accept: application/json
Authorization: Bearer {{$dotenv DEEPSEEK_API_KEY}}

{
  "messages": [
    {
      "content": "你是一个智能客服 Alice,你的主要作用就是帮用户解答疑问",
      "role": "system"
    },
    {
      "content": "你好,你是谁?",
      "role": "user"
    }
  ],
  "model": "deepseek-chat",
  "stream": false,
  "temperature": 0
}

回复的内容

HTTP/1.1 200 OK
Date: Wed, 06 Aug 2025 08:21:59 GMT
Content-Type: application/json
Transfer-Encoding: chunked
Connection: close
vary: origin, access-control-request-method, access-control-request-headers
access-control-allow-credentials: true
x-ds-trace-id: 4f241ebb167c3c01b3a97ed15dccc06a
Strict-Transport-Security: max-age=31536000; includeSubDomains; preload
X-Content-Type-Options: nosniff
Server: CW
Content-Encoding: gzip

{
  "id": "379e029d-6fe0-4c17-8b1b-a834033f810e",
  "object": "chat.completion",
  "created": 1754468519,
  "model": "deepseek-chat",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "你好!我是智能客服Alice,很高兴为你服务。我是一个AI助手,可以帮助解答问题、提供信息或协助处理各种事务。有什么我可以帮你的吗?"
      },
      "logprobs": null,
      "finish_reason": "stop"
    }
  ],
  "usage": {
    "prompt_tokens": 21,
    "completion_tokens": 33,
    "total_tokens": 54,
    "prompt_tokens_details": {
      "cached_tokens": 0
    },
    "prompt_cache_hit_tokens": 0,
    "prompt_cache_miss_tokens": 21
  },
  "system_fingerprint": "fp_8802369eaa_prod0623_fp8_kvcache"
}

从上面的例子我们可以发现,大模型有三个 role(角色),system 就是系统角色,user 是用户,assistant 就是大模型,与之相应的 content 就是它们之间的对话内容。 请求内容中的参数除了 messages 还有 model 表示使用什么模型,stream 表示是否流式回复,temperature 暂且按下不表后续会详细说明。

回复的内容中,有 id,有提示词 token 数量,总的 token 数量,提示词缓存命中 token 数量,提示词缓存未命中数,关于 Token 的概念,会在下一小节说明是什么。

choices 是在请求参数中,设置了返回几个返回结果,如果不设置,默认只返回一个结果。

再来看看下面多轮对话的内容

POST https://api.deepseek.com/chat/completions HTTP/1.1
Content-Type: application/json
Accept: application/json
Authorization: Bearer {{$dotenv DEEPSEEK_API_KEY}}

{
  "messages": [
    {
      "content": "你是一个智能客服 Alice,你的主要作用就是帮用户解答疑问,公司的主要业务就是做量化交易",
      "role": "system"
    },
    {
      "content": "你好,你是谁?",
      "role": "user"
    },
    {
      "content": "你好!我是智能客服Alice,很高兴为你服务。我是一个AI助手,可以帮助解答问题、提供信息或协助处理各种事务。有什么我可以帮你的吗?",
      "role": "assistant"
    },
    {
      "content": "我想咨询下你们公司的业务",
      "role": "user"
    }
  ],
  "model": "deepseek-chat",
  "stream": false,
  "temperature": 0
}

多轮对话回复

{
  "id": "bb4143e9-0411-4c57-bb8f-a6c6723eafc1",
  "object": "chat.completion",
  "created": 1754469182,
  "model": "deepseek-chat",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "你好!我们公司专注于量化交易领域,主要业务包括:  \n\n1. **量化策略开发**——基于数学、统计学和算法模型,设计自动化交易策略。  \n2. **高频交易(HFT)**——利用超低延迟技术进行毫秒级市场套利。  \n3. **资产管理**——通过量化模型管理基金或客户投资组合,优化风险收益比。  \n4. **数据与技术服务**——提供金融数据清洗、因子挖掘、回测平台等支持。  \n\n如果你对某个方向感兴趣,或想了解具体案例/合作方式,可以告诉我,我会进一步解答! 😊"
      },
      "logprobs": null,
      "finish_reason": "stop"
    }
  ],
  "usage": {
    "prompt_tokens": 71,
    "completion_tokens": 130,
    "total_tokens": 201,
    "prompt_tokens_details": {
      "cached_tokens": 0
    },
    "prompt_cache_hit_tokens": 0,
    "prompt_cache_miss_tokens": 71
  },
  "system_fingerprint": "fp_8802369eaa_prod0623_fp8_kvcache"
}

Token

我们经常听说"大语言模型是通过预测下一个词来生成文本的"。但其实,模型并不是直接预测"词"或"字",而是预测一种叫做"Token"的东西。

Token 可以理解为模型处理文本时的最小单位。
它可能是一个字、一个词,甚至是一个标点或部分单词,具体怎么切分由模型的"分词器"决定。

举个例子:

  • “我爱你” 可能会被分成 3 个 Token(每个字一个 Token)
  • “ChatGPT” 可能会被分成 1 个 Token,也可能被拆成多个 Token
  • 英文里的 “unbelievable” 可能会被拆成 “un”, “believ”, “able” 三个 Token

为什么要用 Token?
因为计算机不懂自然语言,只能处理数字。分词器会把每个 Token 转换成一个数字编号,模型看到的其实就是一串数字。
你可以在这个网站体验一下分词和编码的过程:https://tiktokenizer.vercel.app/?model=gpt-3.5-turbo

下面这张图展示了文本被分成 Token 并编码成数字的过程:
image.png

再看一组例子,左边是原文,右边是模型看到的 Token 编号:
image.png

有时候,一个字是一个 Token,有时候两个字合成一个 Token,也有可能一个英文单词被拆成多个 Token,这完全取决于分词规则。
image.png

可以这样理解:Token 就像是大模型处理文本时的"拼图块",它不是单纯的"字"或"词",而是介于两者之间、由分词器决定的最小处理单元。理解 Token,有助于我们明白大模型的输入输出、计费方式以及"上下文长度"等核心概念。

由于计算机资源有限,大模型每次生成下一个Token时,都需要通过自注意力机制计算当前Token与之前所有Token的关系。这种机制让每个Token都能"看到"并参考序列中的所有相关信息,但计算复杂度随Token数量平方级增长,消耗大量算力和显存。

因此引入了"上下文窗口"(Context Window)的概念,比如 4K、32K、128K Token等。超过这个Token数量后,模型就会截断最早的内容、采用滑动窗口策略,或者直接拒绝处理超长输入。这是硬件资源限制下的必然选择。

为了优化这个问题,业界发展出了多种技术方案:

  • 稀疏注意力:只计算部分Token之间的关系,而非全连接
  • 滑动窗口注意力:限制每个Token只关注附近的Token
  • MoE(专家混合模型):激活部分参数来处理特定类型的Token
  • KV缓存优化:复用已计算的键值对,避免重复计算

所谓"自注意力机制"的强大之处,就是让序列中的每个Token都能动态关注到其他所有Token,并根据上下文语义分配不同的注意力权重。这种机制既带来了卓越的语言理解能力,也带来了巨大的计算开销。

DeepSeek

DeepSeek 还是一个 Token 一个 Token 回复的。杭州是一个 Token,天气是一个 Token。

机器学习

现代 AI 的核心基础是机器学习。简单来说,机器学习就是让计算机通过大量数据进行"学习",自动总结出数据中的规律(即训练出一个函数或模型)。当有了这个模型后,遇到新的数据,计算机就能根据已学到的规律做出预测或判断。

在机器学习中,通常包括两个核心过程:

  • 训练(Training):利用大量已有数据,让模型自动学习数据中的规律,这一过程会不断调整模型的参数,使其在已知数据上表现更好。
  • 推理(Inference):当模型训练完成后,遇到新的、未见过的数据时,利用已学到的规律进行预测或判断,这一过程称为推理。

根据数据是否有标注,机器学习主要分为几类:

  • 监督学习(Supervised Learning):所有训练数据都有明确的标注(比如图片和对应的标签),模型通过这些"标准答案"学习规律。
  • 无监督学习(Unsupervised Learning):训练数据没有标注,模型需要自己在数据中发现结构或模式(比如聚类、降维)。
  • 半监督学习(Semi-Supervised Learning):只有部分数据有标注,结合有标注和无标注的数据共同训练模型。
  • 强化学习(Reinforcement Learning):通过"试错"和奖励机制,让模型在与环境的交互中不断优化决策策略。

机器学习让 AI 能够从数据中自我进化和提升,是现代智能系统的基石。

OpenAI 接口

OpenAI 是最早发布并成功推广大语言模型的公司,其 API 设计已成为行业事实标准。如今,通义千问、DeepSeek 等众多厂商的接口都与 OpenAI 兼容,因此只需参考 OpenAI 的 HTTP API 文档,即可快速上手大多数主流大模型的调用方式。

官方文档地址:https://platform.openai.com/docs/overview

以任意一个大语言模型为例,目前 Chat Completions 支持文本与图像输入,但输出仅限文本。早期的接口仅支持文本输入输出。至于多模态(如图片、音频、视频的输入输出)能力,会在后文 Agent 相关章节详细介绍。 image.png

OpenAI 的核心能力包括:文本生成、图像与视觉处理、语音识别与合成等。

可以简单理解为:大语言模型是"智能大脑",而语音转文本(Speech-to-Text, STT)和文本转语音(Text-to-Speech, TTS)等技术,是为 ChatGPT 这类产品服务的"输入输出通道"。用户的语音会先被 STT 转成文本,交给大模型处理,生成的文本再通过 TTS 转回语音,最终返回给用户。

其他

简单介绍下大模型训练的语料来源于哪里,大模型的 pre-training 和 post-training 是什么。

训练语料格式

  • 预训练(Pretraining)阶段:核心是大规模通用文本的下一词预测。输入以“纯文本 token 序列”为主,网页/文档中的HTML/Markdown等轻量标记可保留以提供结构线索,但最终都会被分词器转成Token。
  • 指令微调/对齐(SFT、RLHF等)阶段:核心是“对话/指令”数据,通常采用结构化样式(如JSONL),每条样本包含一组messages(system/user/assistant)以及可选的工具调用记录与偏好标注。

语料来源与处理(简要):

  • 来源:Common Crawl/Wikipedia/Books/学术论文/论坛问答/开源代码仓库/合成对话数据等。
  • 清洗:去重、语言识别、低质内容过滤、版权与合规审查、格式统一。
  • 混合:不同域与语言按配比混合,提升泛化与稳健性。

示例1(预训练样本,展示为普通文本,实际训练时会被分词为Token):

Transformer是一种基于自注意力机制的序列建模架构,可并行处理序列并捕捉长距离依赖关系。

示例2(指令微调样本,常见为JSONL,每行一条):

{"messages":[
  {"role":"system","content":"You are a helpful assistant."},
  {"role":"user","content":"用一句话解释什么是自注意力。"},
  {"role":"assistant","content":"自注意力通过为序列中每个位置分配对其他位置的权重来聚合信息,以捕捉长程依赖。"}
]}

可选(工具调用/函数调用增强,训练模型学会结构化输出与调用外部工具):

{"messages":[{"role":"user","content":"北京今天天气?"}],
 "tools":[{"type":"function","function":{"name":"get_weather","parameters":{"type":"object","properties":{"city":{"type":"string"}},"required":["city"]}}}],
 "assistant":{"tool_calls":[{"id":"call_1","type":"function","function":{"name":"get_weather","arguments":"{\"city\":\"北京\"}"}}]},
 "tool_outputs":[{"tool_call_id":"call_1","output":"晴,28℃"}]}

Pre-Training

1. 预训练是什么? 预训练是大语言模型学习的"启蒙阶段",就像小孩刚开始学说话时,先听大量的对话、故事和书籍。大模型会被喂入海量的互联网文本(比如网页、书籍、新闻、论坛等),但这些文本没有标准答案,模型也不知道什么是"对"或"错"。它的任务很简单——猜下一个词(准确说是 Token)。

2. 具体怎么学? 比如给模型一句话:“今天天气很”,让它猜下一个词是什么。可能是"好"“晴朗"“糟糕"等。模型会在无数这样的句子中反复练习,每次都尝试预测下一个词。每猜错一次,模型就会调整自己的"脑回路”(参数),争取下次猜得更准。

3. 预训练的目标是什么? 预训练不是让模型直接会写诗、答题、写代码,而是让它像"读遍互联网"一样,积累大量的语言知识、常识、语法和表达能力。它会学会哪些词经常一起出现,什么样的句子结构更自然,甚至能隐约理解一些因果关系和推理逻辑。

4. 为什么要这样做? 因为只有先有了"知识底座”,模型才能在后续的微调阶段,快速学会各种具体任务(比如写摘要、翻译、写代码等)。就像你小时候先学会了汉语,长大后才能用汉语学数学、物理、历史。

5. 细节补充:

  • 预训练的数据量极其庞大,通常是TB甚至PB级别。
  • 训练时间很长,需要大量算力(成百上千张显卡同时工作数周甚至数月)。
  • 预训练阶段的损失函数通常是"下一个词预测"(Next Token Prediction),也叫自回归语言建模。

Post-Training

预训练让模型“会说话”,后训练让模型“说得对、说得好、懂规则”。后训练一般包含以下路径:

  1. 监督微调(SFT, Supervised Fine-Tuning)
  • 目标:让模型学会遵循指令、产出期望格式与语气。
  • 数据:高质量的问答/对话样本(可含system/user/assistant,多轮对话更佳)。
  • 建议:覆盖常见任务模板(解释、总结、改写、推理、代码、表格/JSON输出等),少而精优于多而杂。
  • 简例(JSONL,一行一条):
{"messages":[
  {"role":"system","content":"You are a helpful and safe assistant."},
  {"role":"user","content":"请用要点解释自注意力的作用。"},
  {"role":"assistant","content":"- 为每个位置分配对其他位置的权重\n- 聚合全局信息以建模长程依赖\n- 支持并行计算,提升效率"}
]}
  1. 偏好对齐(RLHF / RM + PPO)
  • 流程:
    • 采样:给定同一提示,模型生成多候选答复;
    • 标注:人类对候选进行“成对偏好”选择(chosen/rejected);
    • 训练:用偏好数据训练奖励模型(Reward Model, RM);
    • 强化:用PPO等算法微调基座/指令模型,使其最大化RM评分;
    • 效果:更符合人类偏好,减少攻击性/跑题/冗长。
  • 偏好数据格式(pairwise,对RM或DPO均适用):
{"prompt":"什么是过拟合?",
 "chosen":"过拟合指模型在训练集上表现很好,但在未见过的数据上泛化很差,通常因模型过于复杂或正则化不足导致。",
 "rejected":"过拟合就是模型很强,能把训练集全部记住,所以这很好。"}
  1. 无强化的偏好优化(DPO/IPO/KTO 等)
  • 思路:跳过显式奖励模型与PPO,直接用成对偏好数据进行对比目标优化;
  • 优点:实现简单、离线可训练、稳定性好;
  • 常用:DPO(Direct Preference Optimization)。
  • DPO数据同上(prompt/chosen/rejected);训练时配置温度与正则超参(如β)。
  1. 工具调用与结构化输出对齐
  • 目标:让模型稳定地产生函数调用/JSON结果、遵循schema。
  • 做法:在SFT数据中加入函数/工具调用样本(含tools定义、assistant的tool_calls与tool_outputs),并加入“严格JSON/表格/Markdown”输出的正反例;
  • 评估:字段齐全率、JSON可解析率、函数命中率(正确工具与参数)。
  1. 安全与拒答(Safety / Refusal)
  • 手段:
    • 系统提示+SFT安全对话模板(礼貌拒答、解释原因、提供替代方案);
    • 偏好数据鼓励“安全优先”的回答;
    • 外挂安全分类器/审核(Moderation)做推理前/后过滤;
  • 目标:在敏感、违法、有害场景下稳定拒答且给出正当理由。
  1. 评估与监控
  • 自动指标:遵循指令度(IF)、有用性/无害性(HH)、事实性(可结合RAG与引用)、结构化输出正确率、函数调用正确率;
  • 人评:基于真实用例的打分与偏好对比;
  • 线上观测:反馈闭环与红队(red-teaming)覆盖新型攻击与越狱样式。

实践建议

  • 数据质量优先:高信噪比、风格一致、覆盖核心任务模板;
  • 适度多轮:让模型学会上下文承接与纠错;
  • 负样本:加入违例/坏格式/越权请求,标注正确拒答方式;
  • 领域化:通用数据打底+领域数据增强,注意配比避免“遗忘通用能力”。

产物与使用

  • 产物:微调后的模型ID与推理配置(system提示、温度/惩罚项、JSON模式、工具清单);
  • 目标:更稳、更安全、更可控地完成真实业务任务。

Hallucination(幻觉)

大语言模型有时会在自己不了解或缺乏知识的领域"胡编乱造",这就是所谓的 AI 幻觉(Hallucination)。其根本原因在于,大模型并不真正"理解"问题的含义,而是根据已有的训练数据和上下文,概率性地生成下一个 Token。这种机制导致模型在面对陌生、模糊或数据稀缺的问题时,可能会自信地给出错误甚至虚构的答案。

如何减少幻觉?

  • 优化提示词(Prompt Engineering):通过更明确、具体的提示词,引导模型聚焦于已知信息,减少无根据的猜测。
  • 引入 RAG 技术(Retrieval-Augmented Generation):结合外部知识库,模型在生成答案前先检索相关资料,再基于检索结果作答,从而大幅降低幻觉概率。(详细过程,会在下面介绍)

举个例子

假如你问模型:“请介绍一下2023年诺贝尔物理学奖的获奖者和获奖理由。”

  • 如果模型没有相关知识,它可能会凭空编造一个答案,看起来很合理但其实是假的,这就是幻觉。
  • 如果结合 RAG 技术,模型会先检索权威数据库或维基百科,找到真实的获奖信息,再据此生成答案,准确率就会大大提升。

总之,幻觉是当前大模型面临的重要挑战之一,只有通过更好的提示词设计和知识增强(如RAG),才能让AI的回答更加可靠和可信。

AI Agent

AI Agent 这个概念,出现的很早,在 ChatGPT 3.5 刚出来没多久,也就是 2023 就已经有当时的 OpenAI 研究员写了下来,原文链接

一个自动化的 Agent 应该要有 Planning 计划,Memory 记忆,Tools 工具。如下图所示

image.png

Planning

计划(Planning)是指 Agent 能够把一个复杂任务拆解成若干个小目标(subgoal),并且能根据实际情况不断调整自己的行动方案。比如说,面对一个"写一篇技术博客"的大任务,Agent 会先拆分成"查找资料"“列出大纲"“逐步写作"等步骤。

关于这个,实际在 Deep Dive into LLMs Like ChatGPT 中也提到了,在训练的时候,会引导大模型,通过推理去得到结果,而不是直接得到结果。例如鸡兔同笼的问题,让大模型输出的内容需要包括思考过程,而不是直接得到答案,这样训练出来的大模型会看起来更智能。

ReAct 模式:先推理再执行

最基础的观念是让模型先思考,再回答,这比直接回答要好很多。ReAct(Reasoning + Acting)模式就是让模型按照"思考-行动-观察"的循环来工作:

  1. 思考(Thought):模型先分析当前情况,制定行动计划
  2. 行动(Action):执行具体的操作,比如调用工具或API
  3. 观察(Observation):观察行动的结果,获取新的信息
  4. 重复循环:基于新信息继续思考和行动

这种"先想后做"的方式,让模型能够更理性地处理复杂问题,避免盲目行动。

除了拆解任务,Agent 还可以进行自我反思(Self-Reflection),对之前的行动做出总结和改进,提升后续的表现。比如 Reflexion、Chain of Hindsight 等方法,都是让 Agent 在行动和思考之间不断循环,边做边想,边想边改。

### ReAct 模式示例:先推理再执行
POST https://api.deepseek.com/chat/completions HTTP/1.1
Content-Type: application/json
Accept: application/json
Authorization: Bearer {{$dotenv DEEPSEEK_API_KEY}}

{
  "messages": [
    {
      "content": "Answer the following questions as best you can. You have access to the following tools:\n\n{tools}\n\nUse the following format:\n\nQuestion: the input question you must answer\nThought: I should think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n... (this Thought/Action/Action Input/Observation can repeat N times)\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n\nBegin!\n\nQuestion: {input}\nThought:",
      "role": "system"
    },
    {
      "content": "帮我计算一下 2024 年 8 月 15 日到 2025 年 1 月 20 日之间有多少天?",
      "role": "user"
    }
  ],
  "model": "deepseek-chat",
  "stream": false,
  "temperature": 0.1
}

回复内容

HTTP/1.1 200 OK
Date: Thu, 07 Aug 2025 02:14:06 GMT
Content-Type: application/json
Transfer-Encoding: chunked
Connection: close
vary: origin, access-control-request-method, access-control-request-headers
access-control-allow-credentials: true
x-ds-trace-id: 8b7487cfa510e68f0cf282769b805075
Strict-Transport-Security: max-age=31536000; includeSubDomains; preload
X-Content-Type-Options: nosniff
Server: CW
Content-Encoding: gzip

{
  "id": "737d836d-5bac-400b-b05c-d87424060ca9",
  "object": "chat.completion",
  "created": 1754532846,
  "model": "deepseek-chat",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "Thought: I need to calculate the number of days between August 15, 2024, and January 20, 2025. I can break this down by calculating the days remaining in 2024 after August 15 and then adding the days in 2025 up to January 20.\n\nAction: Calculate the number of days between two dates.\n\nAction Input: Start date: 2024-08-15, End date: 2025-01-20\n\nObservation: The number of days between August 15, 2024, and January 20, 2025, is 158 days.\n\nThought: I now know the final answer.\n\nFinal Answer: 2024 年 8 月 15 日到 2025 年 1 月 20 日之间有 158 天。"
      },
      "logprobs": null,
      "finish_reason": "stop"
    }
  ],
  "usage": {
    "prompt_tokens": 161,
    "completion_tokens": 171,
    "total_tokens": 332,
    "prompt_tokens_details": {
      "cached_tokens": 0
    },
    "prompt_cache_hit_tokens": 0,
    "prompt_cache_miss_tokens": 161
  },
  "system_fingerprint": "fp_8802369eaa_prod0623_fp8_kvcache"
}

Memory

记忆(Memory)是让 Agent 能够"记住"过去发生过什么。这里的记忆分为短期记忆(short-term memory)和长期记忆(long-term memory)。短期记忆就像 Transformer 的上下文窗口,能记住当前对话的内容;长期记忆则通常通过外部的向量数据库(如 FAISS、Milvus 等)来实现,把重要的信息持久化下来,方便后续检索和调用。Agent 还可以有实体记忆(entity memory),自动追踪和总结对话中出现的人、公司、项目等关键信息。只有具备了记忆能力,Agent 才能实现真正的"持续对话"和"个性化服务”。

Tool

大模型本身只能生成文本,无法直接调用外部工具。以 DeepSeek 的"联网搜索"功能为例,这就是一个典型的 Tool。在早期产品中,用户需要手动点击 Search 按钮才能联网搜索;现在,系统会根据输入内容自动判断是否需要调用 Tool,自动联网获取信息并返回结果。这一切都是由提示词和结构化返回驱动的。

下面是一个结构化的 HTTP 请求样例,演示如何让大模型判断是否需要执行 Tool,以及是否还有下一步操作:

import requests
import json
import os

# 从环境变量获取API密钥
api_key = os.environ.get("DEEPSEEK_API_KEY")

# API端点
url = "https://api.deepseek.com/chat/completions"

# 请求头
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "Authorization": f"Bearer {api_key}"
}

# 请求体
data = {
    "messages": [
        {
            "content": """你现在是一个人工智能助手,你碰到用户输入的问题,可以使用以下工具进行解答:web_search、wikipedia_search和          order_info_search。
                你应该按照以下步骤思考:
                1. 理解用户的问题
                2. 决定使用哪个工具来解决问题
                3. 调用工具并获取结果
                4. 根据结果回答用户的问题

                每个工具的使用方法:
                - web_search: 用于在网络上搜索信息,传入搜索关键词
                - wikipedia_search: 用于在维基百科搜索信息,传入搜索关键词
                - order_info_search: 用于查询订单信息,传入order_id参数

                你的输出必须是JSON格式,包含以下字段:
                1. 'thoughts': 你的思考过程(对用户不可见)
                - 'text': 你对问题的分析
                - 'reasoning': 你的推理过程
                - 'plan': 你的解决方案
                - 'criticism': 对自己思考的批评
                - 'tool': 你决定使用的工具
                2. 'action': 你要执行的操作
                - 'name': 工具名称(web_search、wikipedia_search或order_info_search)
                - 'args': 工具参数(例如:{'query': '搜索关键词'}或{'order_id': '订单编号'})
                3. 'answer': 给用户的最终回答

                示例格式如下:
                {
                    "thoughts": {
                        "text": "用户问了...",
                        "reasoning": "我需要...",
                        "plan": "我将使用...",
                        "criticism": "我的方法可能的缺点是...",
                        "tool": "我决定使用..."
                    },
                    "action": {
                    "name": "web_search",
                    "args": {
                        "query": "相关搜索词"
                    }
                },
                "answer": "根据我查询到的信息,..."
                }
                如果你不需要使用工具,可以直接回答:
                {
                    "thoughts": {
                        "text": "用户问了...",
                        "reasoning": "这是一个简单的问题...",
                        "plan": "直接回答",
                        "criticism": "不需要工具",
                        "tool": "none"
                    },
                    "action": {
                    "name": "none",
                    "args": {}
                },
                    "answer": "你好!我是一个AI助手,很高兴为你服务。"
                }
                记住,整个输出必须是有效的JSON格式。""",
            "role": "system"
        },
        {
            "content": "今天杭州滨江天气如何?",
            "role": "user"
        }
    ],
    "model": "deepseek-chat",
    "frequency_penalty": 0,
    "presence_penalty": 0,
    "response_format": {
        "type": "json_object"
    },
    "stream": False,
    "temperature": 0
}

# 发送请求
try:
    response = requests.post(url, headers=headers, json=data)
    
    # 检查响应状态
    if response.status_code == 200:
        result = response.json()
        print(json.dumps(result, ensure_ascii=False, indent=2))
    else:
        print(f"错误: {response.status_code}")
        print(response.text)
except Exception as e:
    print(f"发生错误: {e}")

回复内容

{
  "id": "08cf3723-904c-46f5-b488-0771fcb164e6",
  "object": "chat.completion",
  "created": 1754529202,
  "model": "deepseek-chat",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "{\n    \"thoughts\": {\n        \"text\": \"用户询问了杭州滨江的天气情况\",\n        \"reasoning\": \"天气信息通常可以通过网络搜索获取\",\n        \"plan\": \"我将使用web_search工具搜索杭州滨江的天气\",\n  
      \"criticism\": \"网络搜索可能返回的信息不够准确或即时\",\n        \"tool\": \"web_search\"\n    },\n    \"action\": {\n        \"name\": \"web_search\",\n        \"args\": {\n            \"query\": \"杭州滨江今天天气\"\n        }\n    },\n    \"answer\": \"我正在查询杭州滨江今天的天气情况,请稍等...\"\n}"
      },
      "logprobs": null,
      "finish_reason": "stop"
    }
  ],
  "usage": {
    "prompt_tokens": 522,
    "completion_tokens": 135,
    "total_tokens": 657,
    "prompt_tokens_details": {
      "cached_tokens": 0
    },
    "prompt_cache_hit_tokens": 0,
    "prompt_cache_miss_tokens": 522
  },
  "system_fingerprint": "fp_8802369eaa_prod0623_fp8_kvcache"
}

实际项目演示

Python 项目

简单起见,这里我使用 Smolagents作为演示的 Agent 框架,LangChain 或者 LangGraph 现在已经过于复杂了,不适合演示简单 Agent 如何实现的。

核心内容

为了简单起见,大语言模型和文生图的 API_KEY 都用阿里云百炼的,有免费额度可以试验。

app.py

from smolagents import (
    CodeAgent,
    OpenAIServerModel,
    load_tool,
    tool,
)
import datetime
import pytz
import yaml
import requests

from tools.final_answer import FinalAnswerTool
from tools.visit_webpage import VisitWebpageTool
from tools.web_search import DuckDuckGoSearchTool
from smolagents.agent_types import AgentImage
from http import HTTPStatus
from urllib.parse import urlparse, unquote

# dashscope
from dashscope import ImageSynthesis
from pathlib import PurePosixPath

# gradio
from Gradio_UI import GradioUI
import os

@tool
def generate_image(prompt: str) -> AgentImage:
    """A tool that generates an image based on a text prompt.
    Args:
        prompt: A text prompt to generate the image.
    Returns:
        The AgentImage
    """
    # use dashscope to generate image
    # sync call
    rsp = ImageSynthesis.call(api_key=os.getenv("DASHSCOPE_API_KEY"),
                            model="wan2.2-t2i-flash",
                            prompt=prompt,
                            n=1)
    if rsp.status_code == HTTPStatus.OK:
        # 在当前目录下保存图片
        file_path = None
        for result in rsp.output.results:
            # 保存到本地,以防图片在 24h 后被删除
            file_name = PurePosixPath(unquote(urlparse(result.url).path)).parts[-1]
            # 确保文件名有正确的扩展名
            if not file_name.endswith('.png'):
                file_name += '.png'
            file_path = os.path.abspath(file_name)
            with open(file_path, 'wb+') as f:
                f.write(requests.get(result.url).content)
            print(f"Image saved to: {file_path}")  # 调试信息
        
        if file_path:
            return AgentImage(file_path)
        else:
            raise Exception("Failed to generate image: no results returned")
    else:
        print('sync_call Failed, status_code: %s, code: %s, message: %s' %
            (rsp.status_code, rsp.code, rsp.message))
        raise Exception(f"Failed to generate image: {rsp.code} - {rsp.message}")


@tool
def order_search_tool(
    order_id: str
) -> str:  
    """A tool that searches for an order based on the order id.
    Args:
        order_id: order id
    """
    if (order_id == "1234"): 
        return "订单号:1234,订单状态:已支付,订单金额:50元。"
    elif (order_id == "5678"):
        return "订单号:5678,订单状态:已发货,订单金额:100元。"
    else:
        return "未找到订单信息"


@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """A tool that fetches the current local time in a specified timezone.
    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York').
    """
    try:
        # Create timezone object
        tz = pytz.timezone(timezone)
        # Get current time in that timezone
        local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {local_time}"
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"


@tool
def get_weather_by_location(location: str) -> str:
    """A tool that fetches weather information for a specified location.
    Args:
        location: A string representing a city name (e.g., 'Beijing').
    Returns:
        A string containing weather information for the location.
    """
    # Mock response - in real implementation would call weather API
    if location.lower() == "beijing":
        return "Beijing weather: Sunny, 25°C"
    elif location.lower() == "shanghai":
        return "Shanghai weather: Cloudy, 22°C"
    else:
        return f"Weather for {location}: Partly cloudy, 20°C"

final_answer = FinalAnswerTool()
visit_webpage_tool = VisitWebpageTool()
web_search_tool = DuckDuckGoSearchTool()


# If the agent does not answer, the model is overloaded, please use another model or the following Hugging Face Endpoint that also contains qwen2.5 coder:
# model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud'

# model = HfApiModel(
#     max_tokens=2096,
#     temperature=0.5,
#     model_id="Qwen/Qwen2.5-Coder-32B-Instruct",  # it is possible that this model may be overloaded
#     custom_role_conversions=None,
# )

model = OpenAIServerModel(
    model_id="qwen2.5-coder-14b-instruct",
    api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key=os.environ["DASHSCOPE_API_KEY"],
    max_tokens=8192,
)

# Import tool from Hub
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

with open("prompts.yaml", "r") as stream:
    prompt_templates = yaml.safe_load(stream)

agent = CodeAgent(
    model=model,
    tools=[final_answer, visit_webpage_tool, web_search_tool, get_current_time_in_timezone, order_search_tool, generate_image, get_weather_by_location],  ## add your tools here (don't remove final answer)
    max_steps=16,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)

if __name__ == "__main__":
    GradioUI(agent).launch()

页面展示

Gradio_UI.py

#!/usr/bin/env python
# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mimetypes
import os
import re
import shutil
from typing import Optional

from smolagents.agent_types import AgentAudio, AgentImage, AgentText, handle_agent_output_types
from smolagents.agents import ActionStep, MultiStepAgent
from smolagents.memory import MemoryStep
from smolagents.utils import _is_package_available


def pull_messages_from_step(
    step_log: MemoryStep,
):
    """Extract ChatMessage objects from agent steps with proper nesting"""
    import gradio as gr

    if isinstance(step_log, ActionStep):
        # Output the step number
        step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else ""
        yield gr.ChatMessage(role="assistant", content=f"**{step_number}**")

        # First yield the thought/reasoning from the LLM
        if hasattr(step_log, "model_output") and step_log.model_output is not None:
            # Clean up the LLM output
            model_output = step_log.model_output.strip()
            # Remove any trailing <end_code> and extra backticks, handling multiple possible formats
            model_output = re.sub(r"```\s*<end_code>", "```", model_output)  # handles ```<end_code>
            model_output = re.sub(r"<end_code>\s*```", "```", model_output)  # handles <end_code>```
            model_output = re.sub(r"```\s*\n\s*<end_code>", "```", model_output)  # handles ```\n<end_code>
            model_output = model_output.strip()
            yield gr.ChatMessage(role="assistant", content=model_output)

        # For tool calls, create a parent message
        if hasattr(step_log, "tool_calls") and step_log.tool_calls is not None:
            first_tool_call = step_log.tool_calls[0]
            used_code = first_tool_call.name == "python_interpreter"
            parent_id = f"call_{len(step_log.tool_calls)}"

            # Tool call becomes the parent message with timing info
            # First we will handle arguments based on type
            args = first_tool_call.arguments
            if isinstance(args, dict):
                content = str(args.get("answer", str(args)))
            else:
                content = str(args).strip()

            if used_code:
                # Clean up the content by removing any end code tags
                content = re.sub(r"```.*?\n", "", content)  # Remove existing code blocks
                content = re.sub(r"\s*<end_code>\s*", "", content)  # Remove end_code tags
                content = content.strip()
                if not content.startswith("```python"):
                    content = f"```python\n{content}\n```"

            parent_message_tool = gr.ChatMessage(
                role="assistant",
                content=content,
                metadata={
                    "title": f"🛠️ Used tool {first_tool_call.name}",
                    "id": parent_id,
                    "status": "pending",
                },
            )
            yield parent_message_tool

            # Nesting execution logs under the tool call if they exist
            if hasattr(step_log, "observations") and (
                step_log.observations is not None and step_log.observations.strip()
            ):  # Only yield execution logs if there's actual content
                log_content = step_log.observations.strip()
                if log_content:
                    log_content = re.sub(r"^Execution logs:\s*", "", log_content)
                    yield gr.ChatMessage(
                        role="assistant",
                        content=f"{log_content}",
                        metadata={"title": "📝 Execution Logs", "parent_id": parent_id, "status": "done"},
                    )

            # Nesting any errors under the tool call
            if hasattr(step_log, "error") and step_log.error is not None:
                yield gr.ChatMessage(
                    role="assistant",
                    content=str(step_log.error),
                    metadata={"title": "💥 Error", "parent_id": parent_id, "status": "done"},
                )

            # Update parent message metadata to done status without yielding a new message
            parent_message_tool.metadata["status"] = "done"

        # Handle standalone errors but not from tool calls
        elif hasattr(step_log, "error") and step_log.error is not None:
            yield gr.ChatMessage(role="assistant", content=str(step_log.error), metadata={"title": "💥 Error"})

        # Calculate duration and token information
        step_footnote = f"{step_number}"
        if hasattr(step_log, "input_token_count") and hasattr(step_log, "output_token_count"):
            token_str = (
                f" | Input-tokens:{step_log.input_token_count:,} | Output-tokens:{step_log.output_token_count:,}"
            )
            step_footnote += token_str
        if hasattr(step_log, "duration"):
            step_duration = f" | Duration: {round(float(step_log.duration), 2)}" if step_log.duration else None
            step_footnote += step_duration
        step_footnote = f"""<span style="color: #bbbbc2; font-size: 12px;">{step_footnote}</span> """
        yield gr.ChatMessage(role="assistant", content=f"{step_footnote}")
        yield gr.ChatMessage(role="assistant", content="-----")


def stream_to_gradio(
    agent,
    task: str,
    reset_agent_memory: bool = False,
    additional_args: Optional[dict] = None,
):
    """Runs an agent with the given task and streams the messages from the agent as gradio ChatMessages."""
    if not _is_package_available("gradio"):
        raise ModuleNotFoundError(
            "Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`"
        )
    import gradio as gr

    total_input_tokens = 0
    total_output_tokens = 0

    for step_log in agent.run(task, stream=True, reset=reset_agent_memory, additional_args=additional_args):
        # Track tokens if model provides them
        if hasattr(agent.model, "last_input_token_count"):
            total_input_tokens += agent.model.last_input_token_count
            total_output_tokens += agent.model.last_output_token_count
            if isinstance(step_log, ActionStep):
                step_log.input_token_count = agent.model.last_input_token_count
                step_log.output_token_count = agent.model.last_output_token_count

        for message in pull_messages_from_step(
            step_log,
        ):
            yield message

    final_answer = step_log  # Last log is the run's final_answer
    final_answer = handle_agent_output_types(final_answer)

    if isinstance(final_answer, AgentText):
        yield gr.ChatMessage(
            role="assistant",
            content=f"**Final answer:**\n{final_answer.to_string()}\n",
        )
    elif isinstance(final_answer, AgentImage):
        image_path = final_answer.to_string()
        # 检查是否是远程URL
        if image_path.startswith('http'):
            # 对于远程URL,直接使用URL作为内容
            yield gr.ChatMessage(
                role="assistant",
                content=f"![Generated Image]({image_path})",
            )
        else:
            # 对于本地文件,确保路径是绝对路径
            if not os.path.isabs(image_path):
                image_path = os.path.abspath(image_path)
            
            # 检查文件是否存在
            if os.path.exists(image_path):
                # 使用Gradio的原生图片显示方式 - 修复图片显示问题
                try:
                    # 使用相对路径,这样 Gradio 可以正确显示图片
                    current_dir = os.getcwd()
                    relative_path = os.path.relpath(image_path, current_dir)
                    
                    # 使用 base64 编码图片
                    import base64
                    with open(image_path, "rb") as image_file:
                        encoded_string = base64.b64encode(image_file.read()).decode()
                    
                    # 使用 base64 编码的图片
                    content = f"""✅ **Image generated successfully!**

📁 **Saved to:** `{image_path}`

![Generated Image](data:image/png;base64,{encoded_string})"""
                    
                    yield gr.ChatMessage(
                        role="assistant",
                        content=content
                    )
                except Exception as e:
                    # 如果相对路径失败,使用绝对路径
                    try:
                        content = f"""✅ **Image generated successfully!**

📁 **Saved to:** `{image_path}`

![Generated Image]({image_path})"""
                        
                        yield gr.ChatMessage(
                            role="assistant",
                            content=content
                        )
                    except Exception as e2:
                        # 如果都失败,显示路径信息
                        yield gr.ChatMessage(
                            role="assistant",
                            content=f"✅ **Image generated successfully!**\n\n📁 **Image saved to:** `{image_path}`\n\nYou can find the generated image at the above path.",
                        )
            else:
                # 如果文件不存在,显示错误信息
                yield gr.ChatMessage(
                    role="assistant",
                    content=f"❌ Error: Image file not found at {image_path}",
                )
    elif isinstance(final_answer, AgentAudio):
        yield gr.ChatMessage(
            role="assistant",
            content={"path": final_answer.to_string(), "mime_type": "audio/wav"},
        )
    else:
        yield gr.ChatMessage(role="assistant", content=f"**Final answer:** {str(final_answer)}")


class GradioUI:
    """A one-line interface to launch your agent in Gradio"""

    def __init__(self, agent: MultiStepAgent, file_upload_folder: str | None = None):
        if not _is_package_available("gradio"):
            raise ModuleNotFoundError(
                "Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`"
            )
        self.agent = agent
        self.file_upload_folder = file_upload_folder
        if self.file_upload_folder is not None:
            if not os.path.exists(file_upload_folder):
                os.mkdir(file_upload_folder)

    def interact_with_agent(self, prompt, messages):
        import gradio as gr

        messages.append(gr.ChatMessage(role="user", content=prompt))
        yield messages
        for msg in stream_to_gradio(self.agent, task=prompt, reset_agent_memory=False):
            messages.append(msg)
            yield messages
        yield messages

    def upload_file(
        self,
        file,
        file_uploads_log,
        allowed_file_types=[
            "application/pdf",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "text/plain",
        ],
    ):
        """
        Handle file uploads, default allowed types are .pdf, .docx, and .txt
        """
        import gradio as gr

        if file is None:
            return gr.Textbox("No file uploaded", visible=True), file_uploads_log

        try:
            mime_type, _ = mimetypes.guess_type(file.name)
        except Exception as e:
            return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log

        if mime_type not in allowed_file_types:
            return gr.Textbox("File type disallowed", visible=True), file_uploads_log

        # Sanitize file name
        original_name = os.path.basename(file.name)
        sanitized_name = re.sub(
            r"[^\w\-.]", "_", original_name
        )  # Replace any non-alphanumeric, non-dash, or non-dot characters with underscores

        type_to_ext = {}
        for ext, t in mimetypes.types_map.items():
            if t not in type_to_ext:
                type_to_ext[t] = ext

        # Ensure the extension correlates to the mime type
        sanitized_name = sanitized_name.split(".")[:-1]
        sanitized_name.append("" + type_to_ext[mime_type])
        sanitized_name = "".join(sanitized_name)

        # Save the uploaded file to the specified folder
        file_path = os.path.join(self.file_upload_folder, os.path.basename(sanitized_name))
        shutil.copy(file.name, file_path)

        return gr.Textbox(f"File uploaded: {file_path}", visible=True), file_uploads_log + [file_path]

    def log_user_message(self, text_input, file_uploads_log):
        return (
            text_input
            + (
                f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}"
                if len(file_uploads_log) > 0
                else ""
            ),
            "",
        )

    def launch(self, **kwargs):
        import gradio as gr

        with gr.Blocks(fill_height=True) as demo:
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])
            chatbot = gr.Chatbot(
                label="Agent",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/agents-course/course-images/resolve/main/en/communication/Alfred.png",
                ),
                resizeable=True,
                scale=1,
            )
            # If an upload folder is provided, enable the upload feature
            if self.file_upload_folder is not None:
                upload_file = gr.File(label="Upload a file")
                upload_status = gr.Textbox(label="Upload Status", interactive=False, visible=False)
                upload_file.change(
                    self.upload_file,
                    [upload_file, file_uploads_log],
                    [upload_status, file_uploads_log],
                )
            text_input = gr.Textbox(lines=1, label="Chat Message")
            text_input.submit(
                self.log_user_message,
                [text_input, file_uploads_log],
                [stored_messages, text_input],
            ).then(self.interact_with_agent, [stored_messages, chatbot], [chatbot])

        demo.launch(debug=True, share=False, **kwargs)


__all__ = ["stream_to_gradio", "GradioUI"]

依赖

requirements.txt

markdownify
requests
ddgs
pandas
dashscope
smolagents[gradio]
smolagents[openai]

tools 文件夹下 py 文件

final_answer.py

from typing import Any, Optional
from smolagents.tools import Tool

class FinalAnswerTool(Tool):
    name = "final_answer"
    description = "Provides a final answer to the given problem."
    inputs = {'answer': {'type': 'any', 'description': 'The final answer to the problem'}}
    output_type = "any"

    def forward(self, answer: Any) -> Any:
        return answer

    def __init__(self, *args, **kwargs):
        self.is_initialized = False

visit_webpage.py

from typing import Any, Optional
from smolagents.tools import Tool
import requests
import markdownify
import smolagents
import re

class VisitWebpageTool(Tool):
    name = "visit_webpage"
    description = "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
    inputs = {'url': {'type': 'string', 'description': 'The url of the webpage to visit.'}}
    output_type = "string"

    def forward(self, url: str) -> str:
        try:
            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException

            from smolagents.utils import truncate_content
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from e
        try:
            # Send a GET request to the URL with a 20-second timeout
            response = requests.get(url, timeout=20)
            response.raise_for_status()  # Raise an exception for bad status codes

            # Convert the HTML content to Markdown
            markdown_content = markdownify(response.text).strip()

            # Remove multiple line breaks
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)

            return truncate_content(markdown_content, 10000)

        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"

    def __init__(self, *args, **kwargs):
        self.is_initialized = False

web_search.py

from typing import Any, Optional
from smolagents.tools import Tool

class DuckDuckGoSearchTool(Tool):
    name = "web_search"
    description = "Performs a duckduckgo web search based on your query (think a Google search) then returns the top search results."
    inputs = {'query': {'type': 'string', 'description': 'The search query to perform.'}}
    output_type = "string"

    def __init__(self, max_results=10, **kwargs):
        super().__init__()
        self.max_results = max_results
        try:
            from ddgs import DDGS
        except ImportError as e:
            raise ImportError(
                "You must install package `ddgs` to run this tool: for instance run `pip install ddgs`."
            ) from e
        self.ddgs = DDGS(**kwargs)

    def forward(self, query: str) -> str:
        results = self.ddgs.text(query, max_results=self.max_results)
        if len(results) == 0:
            raise Exception("No results found! Try a less restrictive/shorter query.")
        postprocessed_results = [f"[{result['title']}]({result['href']})\n{result['body']}" for result in results]
        return "## Search Results\n\n" + "\n\n".join(postprocessed_results)

进入项目,使用 git bash 在当前目录执行以下命令(这里是 Windows 环境的,其他环境去掉 .bat

Windows 环境会有新的问题,不让执行脚本,需要管理员权限打开 Powershell,然后执行 set-ExecutionPolicy RemoteSigned 输入 Y 就行了。

python -m venv .venv

.venv/Scripts/activate.ps1

pip install -r requirements.txt
# 检查是否安装完毕
pip list

# 执行下面语句启动
python app.py

Java 项目

要求 JDK17,环境变量添加 DEEPSEEK_API_KEY 去这里创建一个 key,免费的有 15 元的额度。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.4.8</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.young1lin</groupId>
	<artifactId>agent-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>agent-demo</name>
	<description>Demo project for Spring Boot</description>
	<url/>
	<licenses>
		<license/>
	</licenses>
	<developers>
		<developer/>
	</developers>
	<scm>
		<connection/>
		<developerConnection/>
		<tag/>
		<url/>
	</scm>
	<properties>
		<java.version>17</java.version>
		<spring-ai.version>1.0.1</spring-ai.version>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.ai</groupId>
			<artifactId>spring-ai-starter-model-openai</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.ai</groupId>
			<artifactId>spring-ai-starter-model-deepseek</artifactId>
		</dependency>

		<!-- Observability dependencies -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
		<dependency>
			<groupId>io.micrometer</groupId>
			<artifactId>micrometer-registry-prometheus</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.ai</groupId>
				<artifactId>spring-ai-bom</artifactId>
				<version>${spring-ai.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<annotationProcessorPaths>
						<path>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</path>
					</annotationProcessorPaths>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

application.yml

spring:
  application:
    name: agent-demo
  ai:
    model:
      chat: deepseek
      audio:
        speech: false
        transcription: false
      embedding: false
      image: false
      moderation: false
    chat:
      client:
        enabled: true
        observations:
          log-prompt: false
    deepseek:
      chat:
        options:
          model: deepseek-chat
          temperature: 0
        api-key: ${DEEPSEEK_API_KEY}

# 日志配置
logging:
  level:
    root: INFO
    # 添加observability相关的日志级别
    org.springframework.ai.chat.client.advisor.SimpleLoggerAdvisor: DEBUG
  file:
    name: logs/agent-demo.log

server:
  servlet:
    encoding:
      charset: UTF-8
      enabled: true

AgentDemoApplication.java

package com.young1lin.agent.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Liam
 */
@SpringBootApplication
public class AgentDemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(AgentDemoApplication.class, args);
	}

}

UserInputReq.java

package com.young1lin.agent.demo.controller.vo;

import lombok.Data;

/**
 * @author Liam
 * @version 1.0.0
 * @since 2025/8/7 15:25
 */
@Data
public class UserInputReq {

    private String userInput;

}

AgentChatDemoController.java

import com.young1lin.agent.demo.controller.vo.UserInputReq;  
import com.young1lin.agent.demo.tool.AgentTool;  
import lombok.RequiredArgsConstructor;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.ai.chat.client.ChatClient;  
import org.springframework.ai.chat.client.advisor.MessageChatMemoryAdvisor;  
import org.springframework.ai.chat.client.advisor.SimpleLoggerAdvisor;  
import org.springframework.ai.chat.memory.ChatMemory;  
import org.springframework.ai.chat.memory.ChatMemoryRepository;  
import org.springframework.ai.chat.memory.MessageWindowChatMemory;  
  
import org.springframework.beans.factory.InitializingBean;  
import org.springframework.http.MediaType;  
import org.springframework.web.bind.annotation.*;  
import reactor.core.publisher.Flux;  
  
import java.time.LocalDateTime;  
import java.util.HashMap;  
import java.util.Map;  
  
/**  
 * @author Liam  
 * @version v1.0.0  
 * @since 2025/8/7 13:37  
 */@RestController  
@RequestMapping  
@RequiredArgsConstructor  
@Slf4j  
public class AgentChatDemoController implements InitializingBean {  
  
    private final ChatClient.Builder chatClientBuilder;  
  
    private final AgentTool agentTool;  
  
    private final ChatMemoryRepository chatMemoryRepository;  
  
    /**  
     * default use InMemoryChatRepository     */    private ChatMemory chatMemory;  
  
    private ChatClient chatClient;  
  
  
    @Override  
    public void afterPropertiesSet() {  
        chatClient = chatClientBuilder  
                // 设置系统提示词,明确工具使用指导  
                .defaultSystem("""  
                        你是一个智能客服,回复量化交易相关的内容,不要输出有害内容。  
                                                你有以下工具可以使用,请根据用户问题主动使用合适的工具:  
                        1. orderSearch - 当用户询问订单信息时使用,需要订单ID参数  
                        2. getWeatherByLocation - 当用户询问天气信息时使用,需要城市英文小写名称  
                                                重要:无论对话进行了多少轮,都要记住你可以使用这些工具来回答用户问题。  
                        """)  
                // 注册工具 - 只调用一次,移除重复的defaultTools()  
                .defaultTools(agentTool)  
                //.defaultAdvisors(SimpleLoggerAdvisor.builder().build())  
                .build();  
  
        // 保持较小的窗口大小,但通过每次请求重新注入系统提示来解决工具调用问题  
        chatMemory = MessageWindowChatMemory.builder()  
                .maxMessages(5)  
                .chatMemoryRepository(chatMemoryRepository)  
                .build();  
    }  
  
    @PostMapping(value = "/completions", produces = MediaType.TEXT_EVENT_STREAM_VALUE)  
    public Flux<String> completions(@RequestParam(required = false, defaultValue = "123") String conversationId,  
                                    @RequestBody UserInputReq req) {  
        // 工具上下文,包含用户身份信息  
        Map<String, Object> toolContext = new HashMap<>();  
        toolContext.put("uid", "1234");  
        // 添加当前时间信息  
        String currentTime = LocalDateTime.now().toString();  
  
        return chatClient.prompt()  
                .toolContext(toolContext)  
                .advisors(MessageChatMemoryAdvisor.builder(chatMemory)  
                        .conversationId(conversationId)  
                        .build(),  
                        SimpleLoggerAdvisor.builder().build())  
                // 每次对话都重新强调工具使用指导  
                .system("""  
                        当前时间: """ + currentTime + """  
                        重要提醒:你是一个智能客服,专门处理量化交易相关问题。  
                                                你拥有以下工具,请主动使用:  
                        3. orderSearch(orderId) - 查询订单信息,需要订单ID参数  
                        4. getWeatherByLocation(location) - 查询天气,需要城市英文小写名称  
                                                当用户询问订单或天气信息时,必须立即调用相应工具,不要只是说"正在查询"。  
                        """)  
                .user(req.getUserInput())  
                .stream()  
                .content()  
                .doOnComplete(() -> log.info("Stream completed for conversation: {}, user input: {}",   
conversationId, req.getUserInput()))  
                .concatWith(Flux.just("[DONE]"));  
    }  
  
}

AgentTool.java

package com.young1lin.agent.demo.tool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.model.ToolContext;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;

/**
 * @author Liam
 * @version 1.0.0
 * @since 2025/8/7 13:41
 */
@Component
@Slf4j
public class AgentTool {

    @Tool(description = "根据订单ID查询到订单详情信息")
    public String orderSearch(@ToolParam(description = "订单ID") String orderId,
                              ToolContext toolContext) {
        Object uidObj = toolContext.getContext().get("uid");
        if (uidObj instanceof String uid && "1234".equals(uid)) {
            if (orderId == null) {
                return "请输入正确的订单信息";
            }
            if ("1234".equals(orderId)) {
                return "订单号:1234,订单状态:已支付,订单金额:50元。";
            } else if ("5678".equals(orderId)) {
                return "订单号:5678,订单状态:已发货,订单金额:100元。";
            } else {
                return "未找到订单信息";
            }
        }
        log.info("用户信息不正确");
        return "用户id不正确";
    }

    @Tool(description = "根据城市英文名称查询天气信息")
    public String getWeatherByLocation(@ToolParam(description = "城市英文名称,必须是全小写英文名称") String location) {
        if ("beijing".equals(location)) {
            return "Beijing weather: Sunny, 25°C";
        } else if ("shanghai".equals(location)) {
            return "Shanghai weather: Cloudy, 22°C";
        } else {
            return String.format("Weather for %s: Partly cloudy, 20°C", location);
        }
    }


}

resources/static/index.html

<!DOCTYPE html>  
<html lang="zh-CN">  
<head>  
    <meta charset="UTF-8">  
    <meta name="viewport" content="width=device-width, initial-scale=1.0">  
    <title>AI Agent 测试</title>  
    <style>        body {  
            font-family: 'Microsoft YaHei', Arial, sans-serif;  
            max-width: 800px;  
            margin: 0 auto;  
            padding: 20px;  
            background-color: #f5f5f5;  
        }  
        .container {  
            background: white;  
            padding: 20px;  
            border-radius: 8px;  
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);  
        }  
        .input-group {  
            margin-bottom: 20px;  
        }  
        input[type="text"] {  
            width: 70%;  
            padding: 10px;  
            border: 1px solid #ddd;  
            border-radius: 4px;  
            font-size: 16px;  
        }  
        button {  
            padding: 10px 20px;  
            background-color: #007bff;  
            color: white;  
            border: none;  
            border-radius: 4px;  
            cursor: pointer;  
            font-size: 16px;  
        }  
        button:hover {  
            background-color: #0056b3;  
        }  
        button:disabled {  
            background-color: #ccc;  
            cursor: not-allowed;  
        }  
        .response {  
            margin-top: 20px;  
            padding: 15px;  
            border: 1px solid #ddd;  
            border-radius: 4px;  
            background-color: #f9f9f9;  
            min-height: 100px;  
            white-space: pre-wrap;  
            font-family: 'Courier New', monospace;  
        }  
        .status {  
            margin-top: 10px;  
            padding: 5px;  
            border-radius: 4px;  
        }  
        .status.connecting {  
            background-color: #fff3cd;  
            color: #856404;  
        }  
        .status.connected {  
            background-color: #d4edda;  
            color: #155724;  
        }  
        .status.error {  
            background-color: #f8d7da;  
            color: #721c24;  
        }  
    </style>  
</head>  
<body>  
    <div class="container">  
        <h1>AI Agent 流式响应测试</h1>  
        <div class="input-group">  
            <input type="text" id="userInput" placeholder="请输入您的问题..." value="你是谁?">  
            <button onclick="sendMessage()" id="sendBtn">发送</button>  
        </div>  
        <div id="status" class="status"></div>  
        <div id="response" class="response"></div>  
    </div>  
  
    <script>        let abortController = null;  
  
        async function sendMessage() {  
            const userInput = document.getElementById('userInput').value;  
            const responseDiv = document.getElementById('response');  
            const statusDiv = document.getElementById('status');  
            const sendBtn = document.getElementById('sendBtn');  
            if (!userInput.trim()) {  
                alert('请输入问题');  
                return;            }  
  
            // 清空之前的响应  
            responseDiv.textContent = '';  
            statusDiv.textContent = '正在连接...';  
            statusDiv.className = 'status connecting';  
            sendBtn.disabled = true;  
  
            // 取消之前的请求  
            if (abortController) {  
                abortController.abort();  
            }  
  
            // 创建新的 AbortController            abortController = new AbortController();  
  
            try {  
                const response = await fetch('/completions', {  
                    method: 'POST',  
                    headers: {  
                        'Content-Type': 'application/json',  
                        'Accept': 'text/event-stream'  
                    },  
                    body: JSON.stringify({  
                        userInput: userInput  
                    }),  
                    signal: abortController.signal  
                });  
  
                if (!response.ok) {  
                    throw new Error(`HTTP error! status: ${response.status}`);  
                }  
  
                statusDiv.textContent = '已连接,正在接收响应...';  
                statusDiv.className = 'status connected';  
  
                const reader = response.body.getReader();  
                const decoder = new TextDecoder();  
                let buffer = '';  
  
                while (true) {  
                    const { done, value } = await reader.read();  
                    if (done) {  
                        console.log('Stream reader done'); // 调试信息  
                        break;  
                    }  
  
                    // 解码新的数据块并添加到缓冲区  
                    const chunk = decoder.decode(value, { stream: true });  
                    console.log('Received chunk:', chunk); // 调试信息  
                    buffer += chunk;  
                    // 按行分割并处理  
                    const lines = buffer.split('\n');  
                    buffer = lines.pop() || ''; // 保留不完整的行  
                    console.log('Processing lines:', lines); // 调试信息  
                    for (const line of lines) {  
                        console.log('Processing line:', line); // 调试信息  
                        if (line.trim() === '') {  
                            console.log('Skipping empty line'); // 调试信息  
                            continue; // 跳过空行  
                        }  
                        if (line.startsWith('data:')) {  
                            const data = line.slice(5).trim(); // 兼容有无空格  
                            console.log('Received data:', data); // 调试信息  
                            if (data === '[DONE]') {  
                                console.log('Received [DONE] marker'); // 调试信息  
                                statusDiv.textContent = '响应完成';  
                                statusDiv.className = 'status connected';  
                                sendBtn.disabled = false;  
                                return;                            }  
                            // 直接添加内容,不添加额外的换行符  
                            responseDiv.textContent += data;  
                            console.log('Updated response div:', responseDiv.textContent); // 调试信息  
                        } else {  
                            console.log('Line does not start with data:', line); // 调试信息  
                        }  
                    }  
                }  
  
                // 处理缓冲区中剩余的数据  
                if (buffer) {  
                    const lines = buffer.split('\n');  
                    for (const line of lines) {  
                        if (line.startsWith('data:')) {  
                            const data = line.slice(5).trim();  
                            if (data === '[DONE]') {  
                                statusDiv.textContent = '响应完成';  
                                statusDiv.className = 'status connected';  
                                sendBtn.disabled = false;  
                                return;                            }  
                            responseDiv.textContent += data;  
                        }  
                    }  
                }  
  
                statusDiv.textContent = '响应完成';  
                statusDiv.className = 'status connected';  
                sendBtn.disabled = false;  
  
            } catch (error) {  
                if (error.name === 'AbortError') {  
                    statusDiv.textContent = '请求已取消';  
                } else {  
                    statusDiv.textContent = `连接错误: ${error.message}`;  
                    statusDiv.className = 'status error';  
                    console.error('Error:', error);  
                }  
                sendBtn.disabled = false;  
            }  
        }  
  
        // 支持回车键发送  
        document.getElementById('userInput').addEventListener('keypress', function(e) {  
            if (e.key === 'Enter') {  
                sendMessage();  
            }  
        });  
    </script>  
</body>  
</html>

启动项目后

启动项目后,访问 localhost:8080 即可,控制台会打印请求和响应的内容。

MCP

官网 https://modelcontextprotocol.io/overview

MCP 是 Anthropic 提出的模型上下文协议,目前主流的厂商只是把 MCP 的 Tool 给适配了出来,拿这个来代替本地 Agent 框架的 @Tool。实际上 MCP 还包括了 prompts、resources、image 转 base64。底层的传输协议除了 stdio 和 Streamable HTTP。实际上代码里已经支持了 WebSocket 作为其传输协议,具体代码在这

后面演示 Coze 和百炼平台构建 Agent 的时候会演示,它们只是把 MCP 当做了公共的 Tool。

RAG

RAG 是什么?为什么要有 RAG 技术?

RAG(Retrieval-Augmented Generation,检索增强生成)是一种结合了信息检索和文本生成的技术。简单来说,就是让大模型在回答问题时,先去"查资料”,再基于查到的资料来生成答案。

模型上下文也是有限制的,如果把产品所有的信息放到提示词中,那么回答用户的 Token 就不够。例如一个数仓中,有一万个表,每个表的表定义如果都放入上下文中,回答用户的内容就不够了。可以用 RAG 技术,将每个表的元信息等内容向量化,存入向量数据库,下次用户问的时候,先去检索可能涉及到哪些表,然后带上它们的元信息,再丢给大模型,这样就不会大模型只需要知道相关的表,舍弃掉不需要的表信息,回答结果会更多,更准确。

用一张图就能展现,来自 Wikipedia,先将引用文档向量化,存入向量数据库,下次用户对话的时候,先将用户输入的内容向量化,拿到向量后去向量数据库中找到相近的向量,然后返回对应的上下文文本,再丢给大模型,大模型会回复相应的内容。

下面会演示 Dify、Coze 和 阿里云百炼 平台的向量化执行的内容。

RAG_diagram.svg.png

为什么需要 RAG?

前面我们提到过大模型的"幻觉"问题。当模型遇到训练数据中没有的信息时,可能会编造答案。比如你问"2024年最新的技术趋势是什么?",模型可能基于2023年的训练数据给出过时的答案。

RAG 的核心思想是:让模型在回答前先检索相关信息,确保答案的准确性和时效性

举个例子:

  • 传统方式:用户问"我们公司的产品有哪些?" → 模型直接回答(可能编造)
  • RAG 方式:用户问"我们公司的产品有哪些?" → 检索公司文档 → 基于检索结果回答

文档切块

RAG 的第一步是将文档切分成小块。为什么要切块?

  1. Token 限制:大模型有上下文长度限制,无法一次性处理过长的文档
  2. 精确检索:小块更容易匹配用户的具体问题
  3. 效率考虑:小块检索比全文检索更高效

切块策略

# 简单的按字符数切块
def split_text_by_length(text, chunk_size=1000, overlap=200):
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        chunks.append(chunk)
        start = end - overlap
    return chunks

# 按段落切块
def split_text_by_paragraph(text):
    paragraphs = text.split('\n\n')
    return [p.strip() for p in paragraphs if p.strip()]

# 按句子切块(更智能)
import re
def split_text_by_sentence(text, max_length=1000):
    sentences = re.split(r'[.!?]+', text)
    chunks = []
    current_chunk = ""
    
    for sentence in sentences:
        if len(current_chunk) + len(sentence) < max_length:
            current_chunk += sentence + ". "
        else:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = sentence + ". "
    
    if current_chunk:
        chunks.append(current_chunk.strip())
    
    return chunks

Embedding

Embedding 是将文本转换为数值向量的过程。这些向量能够捕捉文本的语义信息,使得语义相似的文本在向量空间中距离较近。

为什么要将自然语言向量化?

计算机只能理解数字,不能直接理解文本 人类的语言丰富多样,但对于计算机来说,所有的信息最终都要转化为数字才能进行存储、计算和分析。Embedding 就是把"文本"变成"向量",让计算机能够"理解"并处理自然语言。

想象一下,如果我们要在文档库中搜索"人工智能",传统的关键词搜索可能找不到"AI"、“机器学习"等相关内容。但通过 Embedding,这些语义相关的词汇在向量空间中会很接近。

# 使用 OpenAI 的 Embedding API
import openai
import numpy as np

def get_embedding(text, model="text-embedding-ada-002"):
    response = openai.Embedding.create(
        input=text,
        model=model
    )
    return response['data'][0]['embedding']

# 计算两个向量的相似度
def cosine_similarity(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

# 示例
text1 = "人工智能技术"
text2 = "AI 技术"
text3 = "天气预报"

embedding1 = get_embedding(text1)
embedding2 = get_embedding(text2)
embedding3 = get_embedding(text3)

print(f"AI 与 人工智能 相似度: {cosine_similarity(embedding1, embedding2)}")
print(f"AI 与 天气预报 相似度: {cosine_similarity(embedding1, embedding3)}")

语义相似性可度量

传统的关键词检索只能找到完全匹配的词,无法理解"人工智能"和"AI"其实是一个意思。通过向量化,语义相关的词在向量空间中距离更近,便于做语义检索、推荐、聚类等任务。例如,用户搜索"机器学习”,系统可以自动联想到"深度学习"“AI"等相关内容。

支持高效的相似度计算和检索

向量化后,可以用余弦相似度等方法快速计算文本之间的相似性。这对于问答系统、智能客服、知识库检索等场景非常关键。比如,用户提问"怎么重置密码?",系统可以在知识库中找到语义最接近的答案。

降维与稠密表达,提升效率

现代 Embedding 方法(如 Word2Vec、BERT、text-embedding-ada-002 等)通常将高维稀疏的文本信息压缩成低维稠密向量,既减少了存储空间,也提升了计算和检索效率。

为下游任务提供统一的输入格式

无论是文本分类、聚类、推荐还是生成,Embedding 都是深度学习模型的"通用输入”。有了向量化的表达,模型才能更好地"理解"文本,实现更智能的推理和决策。

举个实际例子:

假如你有一堆 FAQ 问答,用户输入"如何修改账户密码?",传统检索只能找"修改账户密码"这几个字完全匹配的条目。但如果用 Embedding,把所有问题和用户输入都转成向量,就能找到"重置密码"“更改登录密码"等语义相近的问题,大大提升了检索的智能性和用户体验。

https://cloud.dify.ai/app/194064ed-ee7d-4775-aa3d-648ca03fc909/workflow

本质

向量本质就是一串数字,有只有 0 和 1 整数表示的稀疏向量,还有浮点数表示的稠密向量。

稀疏向量(Sparse Vector)

  • 大部分元素为0,只有少数非零元素
  • 例如:[0, 0, 1, 0, 0, 0, 1, 0, 0, 0]
  • 存储时可以只记录非零元素的位置和值,节省空间

稠密向量(Dense Vector)

  • 大部分或全部元素都是非零的浮点数
  • 例如:[0.23, -0.15, 0.67, 0.89, -0.34]
  • 信息分布均匀,每个维度都有意义

在机器学习中,这两种向量形式各有用途:

  • 稀疏向量常用于表示分类特征、文本的词频统计等
  • 稠密向量常用于嵌入表示、神经网络的中间层输出等

上面的 OpenAI text-embedding-3-small 返回的是 1536 维度的向量,Milvus 都可以存,并且后续如果更加优化的方式,可以利用稀疏向量先过滤,再进行筛选最符合稀疏向量相近稠密向量的文档。

过程如下 image.png

存储向量

有了 Embedding 后,我们需要将这些向量存储起来,以便快速检索。常用的向量数据库包括:

  • Pinecone:云服务,易于使用
  • Weaviate:开源,功能丰富
  • Qdrant:性能优秀,支持复杂查询
  • Chroma:内存向量数据库,仅用于演示使用
  • Milvus:开源,企业级向量数据库
  • OpenGauss
  • Elasticsearch
  • Redis

后面三个都能用,但是不如 Milvus 好用,其中第一个是为了完全国产化准备的,后面两个是已有一些对应的中间件,不想新增,才会去选择。

Milvus

Milvus 是一个开源的向量数据库,专为 AI 应用设计。它支持多种距离度量方式,能够高效处理大规模向量数据。

为什么选择 Milvus?

  1. 高性能:支持数十亿级向量数据
  2. 易扩展:支持水平扩展
  3. 多距离度量:支持余弦相似度、欧几里得距离等
  4. 实时搜索:毫秒级响应时间
  5. 支持不同索引:FLAT、IVF_FLAT(最推荐)、HNSW 等等,按需选择

还有最重要的一点,Coze Studio 默认选择这个。

基本使用示例

Milvus 有三种部署方式

  1. Milvus-Lite
  2. Milvus-Standalone(适合中等数据量的项目,就适合这里)
  3. Milvus-Cluster

使用 Milvus-Standalone 作为演示

docker-compose.yaml 文件如下

version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - etcd_data:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    healthcheck:
      test: ["CMD", "etcdctl", "endpoint", "health"]
      interval: 30s
      timeout: 20s
      retries: 3

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - minio_data:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  milvus:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.3.4
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - milvus_data:/var/lib/milvus
      - ./milvus-config/user.yaml:/milvus/configs/user.yaml
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
      interval: 30s
      start_period: 90s
      timeout: 20s
      retries: 3
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - "etcd"
      - "minio"

  attu:
    container_name: milvus-attu
    image: zilliz/attu:v2.3.4
    environment:
      MILVUS_URL: milvus:19530
    ports:
      - "3000:3000"
    depends_on:
      - "milvus"

volumes:
  etcd_data:
  minio_data:
  milvus_data:

networks:
  default:
    name: milvus

使用 docker compose up -d 启动项目,比较老的 docker 中间需要加个 - ,也就是 docker-compose up -d 启动项目。

启动成功后,访问 docker 的地址加 3000 端口,即可看到下面的内容

image.png image.png

必须 Python 3.13 版本以上

依赖 requirements.txt

pymilvus>=2.4.0
sentence-transformers>=2.6.0
numpy>=1.26.0
torch>=2.1.0
transformers>=4.35.0

安装依赖 WSL 类 Linux 系统下

python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import numpy as np
from sentence_transformers import SentenceTransformer

# 使用更适合中文的 Embedding 模型
model = SentenceTransformer('shibing624/text2vec-base-chinese')  # 中文模型,768维向量

def get_embedding(text):
    """使用 sentence-transformers 获取文本向量"""
    embedding = model.encode(text)
    return embedding.tolist()

# 连接 Milvus
connections.connect("default", host="172.27.226.5", port="19530")

# 定义集合结构
dim = 768  # text2vec-base-chinese 的向量维度
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields=fields, description="文档向量集合")
collection_name = "documents"

# 创建集合
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)

collection = Collection(name=collection_name, schema=schema)

# 创建索引
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}
collection.create_index(field_name="embedding", index_params=index_params)

# 插入数据
documents = [
    "AI全称是人工智能,是计算机科学的一个分支,能模拟人类智能,也是个笼统的概念,其中机器学习是其实现的一种手段",
    "人工智能是计算机科学的一个分支",
    "机器学习是AI的重要技术",
    "深度学习是机器学习的一个子集,机器学习是AI的重要技术",
    "《黑神话:悟空》的制作人是冯骥(游戏科学联合创始人)",
    "中国电影2024年票房冠军是热辣滚烫"
]

embeddings = [get_embedding(doc) for doc in documents]
texts = documents

# 修复:按照字段定义顺序插入数据 (id是自增的,所以只需要text和embedding)
collection.insert([texts, embeddings])
collection.flush()

# 搜索
collection.load()
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}

query_embedding = get_embedding("AI是什么?")
print(f"查询向量前10个值: {query_embedding[:10]}")
print("=" * 50)
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=6,
    output_fields=["text", "embedding"]
)

for hits in results:
    for hit in hits:
        print(f"相似度: {hit.score}, 文本: {hit.entity.get('text')}")
        # 获取并打印向量的前10个值
        embedding = hit.entity.get('embedding')
        if embedding:
            print(f"向量前10个值: {embedding[:10]}")
        print("-" * 50)

输出内容

查询向量前10个值: [1.1496570110321045, -0.3689393997192383, 0.9862505793571472, 0.9357891082763672, 0.6909440755844116, -0.5275141000747681, 1.376283884048462, 0.8557244539260864, -1.6160293817520142, 0.22704903781414032]
==================================================
相似度: 0.6506596803665161, 文本: AI全称是人工智能,是计算机科学的一个分支,能模拟人类智能,也是个笼统的概念,其中机器学习是其实现的一种手段
向量前10个值: [0.8219285607337952, 0.2677353620529175, 1.1195573806762695, 1.2051314115524292, 0.4739314317703247, -0.6863083839416504, 0.9294951558113098, 0.9983838200569153, -1.1890289783477783, -0.1570766568183899]
--------------------------------------------------
相似度: 0.6420454978942871, 文本: 机器学习是AI的重要技术
向量前10个值: [0.47643378376960754, -0.6565182209014893, 0.4817119836807251, 1.7072904109954834, 0.6420148015022278, -0.5517555475234985, 1.3600889444351196, 1.0196406841278076, -1.330409288406372, -0.6851357817649841]
--------------------------------------------------
相似度: 0.6177574992179871, 文本: 人工智能是计算机科学的一个分支
向量前10个值: [0.4621504247188568, -0.15476857125759125, 0.8900142908096313, 1.2281856536865234, 0.34598737955093384, -0.4854089021682739, 0.9530060887336731, 1.0955334901809692, -0.9843278527259827, 0.3670949339866638]
--------------------------------------------------
相似度: 0.6035197377204895, 文本: 深度学习是机器学习的一个子集,机器学习是AI的重要技术
向量前10个值: [0.6659308671951294, -0.4055631756782532, 0.8532114028930664, 1.709717035293579, 0.7424928545951843, -0.919252872467041, 0.6775144934654236, 0.10599350929260254, -1.2651119232177734, -0.6233137249946594]
--------------------------------------------------
相似度: 0.32994917035102844, 文本: 黑神话悟空的创始人是冯冀
向量前10个值: [-0.13918161392211914, -1.3258603811264038, 0.7236999869346619, 0.12215479463338852, 0.6712068915367126, 0.2972608208656311, 1.4430207014083862, -0.38402822613716125, -1.4714456796646118, 0.08675938844680786]
--------------------------------------------------
相似度: 0.22542385756969452, 文本: 中国电影2024年票房冠军是热辣滚烫
向量前10个值: [0.01335122063755989, 0.918669581413269, -0.899476170539856, 1.026759147644043, 0.9375036954879761, -1.1757415533065796, 0.6524897813796997, 0.8495270609855652, -0.9792198538780212, 0.5388347506523132]
--------------------------------------------------

RAG 完整流程

现在让我们看一个完整的 RAG 实现:

这个只是做个展示,不执行,本地无法执行,需要 OpenAI 的 ApiKey。

import openai
from typing import List, Dict
import numpy as np

class RAGSystem:
    def __init__(self, vector_db):
        self.vector_db = vector_db
        self.client = openai.OpenAI()
    
    def add_documents(self, documents: List[str]):
        """添加文档到知识库"""
        embeddings = []
        for doc in documents:
            embedding = self.get_embedding(doc)
            embeddings.append(embedding)
        
        # 存储到向量数据库
        self.vector_db.insert(documents, embeddings)
    
    def get_embedding(self, text: str):
        """获取文本的 Embedding"""
        response = self.client.embeddings.create(
            input=text,
            model="text-embedding-ada-002"
        )
        return response.data[0].embedding
    
    def retrieve(self, query: str, top_k: int = 3) -> List[str]:
        """检索相关文档"""
        query_embedding = self.get_embedding(query)
        results = self.vector_db.search(query_embedding, top_k)
        return [result['text'] for result in results]
    
    def generate_answer(self, query: str, context: List[str]) -> str:
        """基于检索结果生成答案"""
        context_text = "\n".join(context)
        
        prompt = f"""基于以下信息回答问题:

信息:
{context_text}

问题:{query}

请基于上述信息回答,如果信息中没有相关内容,请说明无法回答。"""

        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        return response.choices[0].message.content
    
    def answer(self, query: str) -> str:
        """完整的 RAG 流程"""
        # 1. 检索相关文档
        relevant_docs = self.retrieve(query)
        
        # 2. 生成答案
        answer = self.generate_answer(query, relevant_docs)
        
        return answer

# 使用示例
rag = RAGSystem(vector_db)  # 假设已经初始化了向量数据库

# 添加知识库
documents = [
    "我们公司主要做量化交易,提供高频交易策略",
    "我们的产品包括股票、期货、期权等多种金融工具",
    "公司成立于2020年,总部位于杭州"
]

rag.add_documents(documents)

# 提问
answer = rag.answer("你们公司是做什么的?")
print(answer)

RAG 的优势与挑战

优势

  1. 准确性:基于真实信息回答,减少幻觉
  2. 时效性:可以随时更新知识库
  3. 可解释性:可以追溯到具体的参考文档
  4. 成本效益:比重新训练模型更经济

挑战

  1. 检索质量:检索不到相关内容时,答案质量下降
  2. 上下文长度:检索到的文档可能超出模型上下文限制
  3. 实时性:知识库更新需要时间
  4. 成本:需要额外的存储和计算资源

RAG 技术正在快速发展,已经成为构建可靠 AI 应用的重要工具。通过合理的设计和优化,RAG 能够显著提升大模型在实际应用中的表现。

Fine-Tuning

市面上大模型一般都是通用的,如果需要在某个行业使用,网上搜不到相关的内容的,需要回复特定内容的,都需要进行微调(Fine-Tuning),通过规范化数据集,进行微调。如果使用的是 OpenAI,它们提供了非常简单的微调方式,只需要提供类似下面的 josnl 文件即可。

{
  "messages": [
    {
      "role": "user",
      "content": "What is the weather in Minneapolis?"
    },
    {
      "role": "assistant",
      "tool_calls": [
        {
          "id": "call_id",
          "type": "function",
          "function": {
            "name": "get_current_weather",
            "arguments": "{\"location\": \"Minneapolis, USA\", \"format\": \"celsius\"}"
          }
        }
      ]
    }
  ],
  "parallel_tool_calls": false,
  "tools": [
    {
      "type": "function",
      "function": {
        "name": "get_current_weather",
        "description": "Get the current weather",
        "parameters": {
          "type": "object",
          "properties": {
            "location": {
              "type": "string",
              "description": "The city and country, eg. Minneapolis, USA"
            },
            "format": {
              "type": "string",
              "enum": [
                "celsius",
                "fahrenheit"
              ]
            }
          },
          "required": [
            "location",
            "format"
          ]
        }
      }
    }
  ]
}

Supervised fine-tuning (SFT)

https://platform.openai.com/docs/guides/supervised-fine-tuning

这个是监督微调,也就是你提供了输入以及输出该是什么。当然这个除了调用 OpenAI 的接口进行微调,然后输出微调后的模型 ID 进行调用,还可以通过编写对应的 Python 代码,进行。

其实就是机器学习,准备好数据集,一部分用来训练一部分用来测试,具体的操作流程在[[#进阶级资源]]中有如何操作。

其他工具展示

大模型知识提供底层的基础能力,通过输入提示词,返回相应的内容。如果想要解决复杂的问题,需要和外部进行交互的,必须要有对应的 Agent 框架或者工具进行支撑,那么下面的一些工具或者说解决方案,都是是目前业界主流选择。

一站式解决方案 - Dify

https://cloud.dify.ai/apps

可以利用 Dify 来构建自己的想要的 Agent,例如下面的,没有 Tool 的,只是加了 RAG 的聊天机器人,会通过已有的知识内容回答,而不是胡编乱造。

image.png

Coze 平台/开源项目

网址,这是国内版,国外版也差不多,国外版能用 Claude 和 ChatGPT 这些大模型。选择开发平台,快速开始,我们可以看到下图所示内容。有自己的工作空间,和 Dify 其实是类似的,都是利用工作流构建 Agent,里面还有各种各样的功能。点击左侧模板还可以参考别人构建的智能体,不仅能在网页上调用,还能通过 API 调用。这种一站式的开发,几乎不需要写代码的方式,是比较好的选择,让非程序员成员都能参与一起构建 Agent。

当然这个也有开源版,在 2025 年 7 月份开源,也就是刚开源不久,项目本身用的是 TypeScript 和 Golang 写的,有 Java SDK 可以调用 Coze Studio

image.png

Coze 官方使用文档在这

Spring-AI-Alibaba 也提供了类似 Coze Studio 的功能,只不过目前(2025 年 8 月8 日)正在完善。

IDE Plugin - Cline

目前主要以 VS Code 扩展的形式使用,同时也可在 Cursor、Windsurf 中使用;需要自备不同大模型的 API Key。支持 DeepSeek、OpenAI、Claude 等等。

与之相对应,如更深度集成 IDE 的 Copilot、类似产品的 Roo Code、Continue 都差不多。值得一提的是,它们都支持 MCP。可以利用这个,来实现自己一些本地的调用,例如下一个 JDBC 的 MCP,去查询某个表的数据,都是可以的。

AI IDE - Cursor

Cursor 是基于 VS Code 二次开发的 AI IDE,它的竞品有 Windsurf、Trae(字节开发)和命令行的 Claude Code、OpenAI Codex 等等。

不仅可以用 Cursor 来写代码,它也是你的文本编辑的最强力的助手。关于如何使用,在它的官方网站有写,这里只会提最简单,最常用的功能。

  1. Tab 自动补全,只要你更改了一部分代码,Cursor 会自动根据上下文自动补全,或者知道你下一步即将要做什么,前提是不要打开太多的标签页干扰上下文。
  2. Ctrl + I 打开对话,将你的问题,以及要涉及到的上下文通过 @file 的形式进行添加,通过前面我们知道大模型上下文是有限制的,如果超过大模型上下文,大模型将会舍弃你最前面的上下文,所以需要你自己控制,一件事情打开一个 tab 而不是所有问题都放到一个 tab 之内。
  3. 选中任意文本,Ctrl + K 小范围文本内容修复,有时候不需要那么大的上下文进行对话,就可以用这个。
  4. 写代码优先推荐 Claude 最新模型,其次是 GPT 系列。

红极一时的多 Agent Manus

https://manus.im/app

这个是多个 Agent,每个 Agent 有着不同的上下文,不同的 system prompt。例如构建任务清单的 Agent,执行不同任务的 Agent,Agent 和 Agent 之间通过特定的文本进行沟通,例如 JSON,或者 Markdown 格式的文本。

类似的,简化版本的多 Agent 样例

复刻版本多 Agent JManus

由 Alibaba 开源,是 Spring-AI-Alibaba 项目中的一个子项目,后端由 Java 实现,前端是 Vue+TypeScript 实现。

其实也是先计划,再执行,将大任务拆解成子任务,逐个执行。

学习资源推荐

除了大语言模型,还有 Diffusion Model 扩散模型,是当前所有流行的文生图,文生视频的基础技术。包括了 Stable Difussion、DALL-E、Midjourney、阿里的通义万相等等。这个在 HuggingFace 上有专门的课程教这个,原理是什么,下面推荐的 AI 大模型之美里也会提到。

下面资源是大语言模型、机器学习、NLP 相关的内容。

入门级资源

  1. 如何写好提示词(吴恩达课程)
  2. AI 大模型之美

进阶级资源

  1. Deep Dive into LLMs like ChatGPT
  2. AI Agent课程
  3. 保姆级大模型微调实战(吴恩达)
  4. 吴恩达-机器学习
  5. 动手学深度学习

吴恩达-机器学习,讲了什么是机器学习,梯度下降是什么,线性回归,反向传播等等,是非常简单易懂的入门课程。

书籍推荐

  1. 这就是 ChatGPT
  2. 《自然语言处理实战》
  3. 《深度学习》

实践平台

  1. Dify - 拖拽式AI应用搭建
  2. Hugging Face - AI模型和工具
  3. OpenAI Playground - 体验各种AI模型