LangGraph 实战:构建一个可控的 ReAct Agent

ReAct(Reasoning + Acting)是让 Agent 先思考再行动的范式。这篇文章用 LangGraph 实现一个完整的 ReAct Agent,并加入一些生产环境需要的工程细节。 ReAct 的执行流程 用户输入 ↓ 思考(Thought):分析问题,决定下一步 ↓ 行动(Action):选择并调用工具 ↓ 观察(Observation):获取工具结果 ↓ 循环,直到能给出最终答案 ↓ 最终答案 用 LangGraph 实现 from typing import TypedDict, Annotated from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage from langchain_openai import ChatOpenAI from langgraph.graph import StateGraph, END import operator class AgentState(TypedDict): messages: Annotated[list[BaseMessage], operator.add] iteration: int # 防止无限循环 # 初始化 LLM(绑定工具) llm = ChatOpenAI(model="gpt-4o", temperature=0) llm_with_tools = llm.bind_tools(tools) def should_continue(state: AgentState) -> str: """决定下一步走哪条边""" last_message = state["messages"][-1] # 超出最大迭代次数,强制结束 if state["iteration"] >= 10: return "end" # 最后一条消息没有工具调用,说明已经有最终答案了 if not hasattr(last_message, "tool_calls") or not last_message.tool_calls: return "end" return "tools" def call_llm(state: AgentState) -> AgentState: """调用 LLM 节点""" response = llm_with_tools.invoke(state["messages"]) return { "messages": [response], "iteration": state["iteration"] + 1 } def call_tools(state: AgentState) -> AgentState: """执行工具调用节点""" last_message = state["messages"][-1] tool_messages = [] for tool_call in last_message.tool_calls: tool_name = tool_call["name"] tool_args = tool_call["args"] try: result = tool_registry[tool_name].invoke(tool_args) content = str(result) except Exception as e: content = f"工具调用失败:{str(e)}" # 错误不崩溃,反馈给模型 tool_messages.append(ToolMessage( content=content, tool_call_id=tool_call["id"] )) return {"messages": tool_messages} # 构建图 graph = StateGraph(AgentState) graph.add_node("llm", call_llm) graph.add_node("tools", call_tools) graph.set_entry_point("llm") graph.add_conditional_edges("llm", should_continue, {"tools": "tools", "end": END}) graph.add_edge("tools", "llm") # 工具执行完,回到 LLM 节点 agent = graph.compile() 加入 Human-in-the-loop LangGraph 支持在某些步骤暂停,等待人工确认: ...

2024-11-18 · 2 min · Kada Liao

Function Calling 实战:让 LLM 学会调用工具

Function Calling(现在 OpenAI 叫 Tool Use)是让 LLM 从"聊天机器人"变成"能干活的 Agent"的关键能力。 核心原理 Function Calling 的本质是:你告诉模型"你可以调用这些函数",模型在需要时会输出一个结构化的"我要调用 X 函数,参数是 Y",然后由你的代码真正去执行这个函数,把结果再传给模型。 模型本身不执行任何代码,它只负责"决策"。 最简示例 from openai import OpenAI import json client = OpenAI() # 定义工具 tools = [ { "type": "function", "function": { "name": "get_weather", "description": "获取指定城市的天气", "parameters": { "type": "object", "properties": { "city": { "type": "string", "description": "城市名称,如:北京、上海" } }, "required": ["city"] } } } ] # 实际的工具函数 def get_weather(city: str) -> str: # 这里接真实天气 API return f"{city}今天晴,25°C" messages = [{"role": "user", "content": "北京今天天气怎么样?"}] response = client.chat.completions.create( model="gpt-4o", messages=messages, tools=tools, tool_choice="auto" ) # 判断是否要调用工具 message = response.choices[0].message if message.tool_calls: tool_call = message.tool_calls[0] args = json.loads(tool_call.function.arguments) # 执行工具 result = get_weather(**args) # 把结果传回给模型 messages.append(message) messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": result }) final_response = client.chat.completions.create( model="gpt-4o", messages=messages ) print(final_response.choices[0].message.content) 多工具调度 模型可以在一次回复里调用多个工具(parallel tool calling): ...

2024-02-20 · 1 min · Kada Liao

LangChain 踩坑合集:那些让我头疼的问题

LangChain 的迭代速度极快,API 经常变,文档跟不上代码。这篇文章记录我踩过的一些有代表性的坑。 坑 1:版本兼容性 LangChain 把包拆分了,以前的 langchain 现在分成了: langchain-core:核心抽象 langchain:主包 langchain-community:第三方集成 langchain-openai、langchain-anthropic 等:各家模型的独立包 很多旧教程的 import 路径在新版本里已经不对了: # 旧写法(可能报 ImportError) from langchain.chat_models import ChatOpenAI # 新写法 from langchain_openai import ChatOpenAI 解决办法:固定版本,或者直接看报错信息里的迁移提示。 坑 2:ConversationBufferMemory 在 LCEL 里不能直接用 从旧版 Chain API 迁移到 LCEL 时,发现旧的 Memory 类不能直接套用: # LCEL 里需要手动管理历史 from langchain_core.messages import HumanMessage, AIMessage history = [] def chat(user_input: str) -> str: history.append(HumanMessage(content=user_input)) response = chain.invoke({"messages": history}) history.append(AIMessage(content=response)) return response LCEL 更偏向函数式,状态管理需要自己来。 坑 3:Chroma 持久化 # 错误:每次都重建 vectorstore,已有的数据被覆盖 vectorstore = Chroma.from_documents(docs, embeddings, persist_directory="./db") # 正确:已有数据库直接加载 import os if os.path.exists("./db"): vectorstore = Chroma(persist_directory="./db", embedding_function=embeddings) else: vectorstore = Chroma.from_documents(docs, embeddings, persist_directory="./db") 坑 4:输出解析器的错误处理 LLM 偶尔会输出格式不符合预期的内容,导致解析器报错: ...

2023-11-08 · 1 min · Kada Liao

RAG 系统从零搭建:检索增强生成的原理与实践

LLM 有两个核心局限:知识有截止日期、无法访问私有数据。RAG(Retrieval-Augmented Generation)是目前解决这两个问题最主流的方案。 RAG 的基本流程 文档 → 切块 → Embedding → 存入向量库 ↓ 用户问题 → Embedding → 向量检索 → 召回相关块 → 组合 Prompt → LLM → 回答 四个核心步骤:文档处理、向量化、检索、生成。 文档切块 文档太长无法直接塞给模型,需要切成小块: from langchain.text_splitter import RecursiveCharacterTextSplitter splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=50, # 相邻块有重叠,避免信息割裂 separators=["\n\n", "\n", "。", ",", " "] ) chunks = splitter.split_documents(docs) chunk_overlap 是个容易忽略的参数——它让相邻的块有内容重叠,避免一句话被硬切断导致语义丢失。 Embedding 与向量存储 from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import Chroma embeddings = OpenAIEmbeddings(model="text-embedding-3-small") # 将文档块向量化并存入 Chroma vectorstore = Chroma.from_documents( documents=chunks, embedding=embeddings, persist_directory="./chroma_db" ) 向量数据库本质是做高维空间的近邻搜索。Chroma 适合本地开发,生产环境可以考虑 Pinecone、Weaviate 或自建 Milvus。 检索与生成 from langchain.chains import RetrievalQA retriever = vectorstore.as_retriever( search_type="similarity", search_kwargs={"k": 4} # 召回最相关的 4 个块 ) qa_chain = RetrievalQA.from_chain_type( llm=ChatOpenAI(model="gpt-3.5-turbo"), retriever=retriever, return_source_documents=True ) result = qa_chain.invoke({"query": "公司的请假政策是什么?"}) print(result["result"]) 几个影响效果的关键点 切块策略:chunk_size 太小,单块缺乏上下文;太大,引入噪声。通常 300-600 tokens 是个比较合适的范围。 ...

2023-08-15 · 1 min · Kada Liao

LangChain 入门:用 Python 构建你的第一个 LLM 应用

2023 年初,LangChain 在 GitHub 上的星数以惊人的速度增长,一夜之间成为 LLM 应用开发的标配。这篇文章梳理一下它的核心设计和基础用法。 为什么需要 LangChain 直接调用 OpenAI API 能做很多事,但当应用变复杂时,你会发现需要反复造轮子: 多轮对话的历史管理 Prompt 模板化 链式调用多个 LLM 步骤 连接外部数据源 LangChain 把这些封装成了可组合的组件。 核心概念 Chain Chain 是 LangChain 的核心抽象,把多个操作串联起来: from langchain.chains import LLMChain from langchain.prompts import PromptTemplate from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-3.5-turbo") prompt = PromptTemplate( input_variables=["topic"], template="用 3 句话解释{topic},面向初学者" ) chain = LLMChain(llm=llm, prompt=prompt) result = chain.invoke({"topic": "向量数据库"}) print(result["text"]) Memory Memory 组件负责维护对话历史: from langchain.memory import ConversationBufferMemory from langchain.chains import ConversationChain memory = ConversationBufferMemory() conversation = ConversationChain(llm=llm, memory=memory) conversation.predict(input="我叫 Kada") conversation.predict(input="你还记得我叫什么吗?") # 能记住 Document Loaders 加载外部文档,是构建 RAG 的基础: from langchain_community.document_loaders import TextLoader, PyPDFLoader # 加载文本文件 loader = TextLoader("./doc.txt", encoding="utf-8") docs = loader.load() # 加载 PDF pdf_loader = PyPDFLoader("./report.pdf") pages = pdf_loader.load_and_split() LCEL:新的链式语法 LangChain 0.1 之后推荐用 LCEL(LangChain Expression Language)写链: ...

2023-03-20 · 1 min · Kada Liao

Python asyncio 实战:从入门到踩坑

Python 3.5 引入 asyncio 之后,异步编程逐渐成为 Python 工程师的必备技能。但从"跑通 demo"到"在生产项目里用好它",中间有一段不短的距离。 event loop 的生命周期 很多人第一次写 asyncio 代码是这样的: import asyncio async def main(): await asyncio.sleep(1) print("done") asyncio.run(main()) asyncio.run() 是 Python 3.7 引入的,它做了三件事: 创建一个新的 event loop 运行传入的协程直到完成 关闭 event loop 并清理资源 注意:不要在已经运行的 event loop 里调用 asyncio.run(),这会抛出 RuntimeError。 常见坑:忘记 await async def fetch_data(): return await some_async_operation() # 错误写法——fetch_data() 返回的是协程对象,不是结果 result = fetch_data() # 正确写法 result = await fetch_data() 协程对象不会自动执行,必须被 await、asyncio.create_task() 或 asyncio.gather() 驱动。 并发执行多个任务 import asyncio async def fetch(url): # 模拟 IO 操作 await asyncio.sleep(0.1) return f"result from {url}" async def main(): urls = ["url1", "url2", "url3"] # 并发执行,总耗时约 0.1s 而不是 0.3s results = await asyncio.gather(*[fetch(url) for url in urls]) print(results) asyncio.run(main()) asyncio.gather() 并发调度多个协程,是最常用的并发原语。 ...

2022-09-15 · 1 min · Kada Liao

Python ORM 选型:SQLAlchemy vs Tortoise-ORM vs Peewee

Python 的 ORM 生态比较分散,没有像 Rails ActiveRecord 那样一统天下的选择。这篇文章对比一下我用过的几个。 SQLAlchemy:工业级首选 SQLAlchemy 是 Python 生态里最成熟、功能最强的 ORM,分两层: Core:SQL 表达式语言,接近裸 SQL ORM:高级对象映射层 from sqlalchemy import create_engine, Column, Integer, String, ForeignKey from sqlalchemy.orm import DeclarativeBase, relationship, Session engine = create_engine("mysql+pymysql://user:pass@localhost/mydb") class Base(DeclarativeBase): pass class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) name = Column(String(50), nullable=False) email = Column(String(100), unique=True) orders = relationship("Order", back_populates="user") class Order(Base): __tablename__ = "orders" id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey("users.id")) total = Column(Integer) user = relationship("User", back_populates="orders") # 查询 with Session(engine) as session: users = session.query(User).filter(User.name.like("K%")).all() # 预加载关联数据(避免 N+1 问题) from sqlalchemy.orm import selectinload users = session.query(User).options(selectinload(User.orders)).all() SQLAlchemy 2.0 新语法(推荐): from sqlalchemy import select with Session(engine) as session: stmt = select(User).where(User.name.like("K%")) users = session.execute(stmt).scalars().all() 优点:功能极强,支持复杂查询,文档完善,生态成熟。 缺点:学习曲线陡,概念多(Session、Unit of Work、lazy loading……)。 Tortoise-ORM:异步场景的选择 专为 asyncio 设计,和 FastAPI 配合很顺手: ...

2022-07-19 · 2 min · Kada Liao

FastAPI 入门:用 Python 快速构建现代 Web API

Django REST Framework 太重,Flask 太裸。FastAPI 在这两者之间找到了一个很好的平衡点。 为什么是 FastAPI 快:基于 Starlette,性能接近 Node.js 自动文档:自动生成 Swagger UI 和 ReDoc 类型驱动:用 Pydantic 做数据校验,写一次模型,校验和文档都有了 原生异步:完整支持 async/await 5 分钟上手 uv pip install fastapi uvicorn # main.py from fastapi import FastAPI app = FastAPI(title="我的 API", version="1.0.0") @app.get("/") def read_root(): return {"message": "Hello, World!"} @app.get("/users/{user_id}") def get_user(user_id: int, include_orders: bool = False): return {"user_id": user_id, "include_orders": include_orders} uvicorn main:app --reload # 访问 http://localhost:8000/docs 看自动生成的文档 Pydantic 模型:请求/响应的核心 from pydantic import BaseModel, EmailStr, Field from datetime import datetime class UserCreate(BaseModel): name: str = Field(..., min_length=2, max_length=50) email: EmailStr age: int = Field(..., ge=0, le=150) class UserResponse(BaseModel): id: int name: str email: str created_at: datetime class Config: from_attributes = True # 支持从 ORM 对象创建 @app.post("/users", response_model=UserResponse, status_code=201) def create_user(user: UserCreate): # FastAPI 自动解析请求体、校验字段、返回时过滤多余字段 db_user = save_to_db(user) return db_user Pydantic 的 Field 支持的校验相当丰富:字符串长度、数字范围、正则表达式……不需要手写校验逻辑。 ...

2022-03-28 · 2 min · Kada Liao

Python 虚拟环境管理:从 venv 到 uv

Python 的包管理历史是一部混乱史。这篇文章梳理一下各种工具的演变,以及我现在的实践选择。 为什么需要虚拟环境 不同项目依赖不同版本的库。不用虚拟环境,所有包都装在系统 Python 里,版本冲突是迟早的事: 项目 A:需要 Django 3.2 项目 B:需要 Django 4.2 → 只能装一个,两个项目不能同时开发 虚拟环境为每个项目创建独立的 Python 环境,依赖互不干扰。 各种工具的演变 venv(Python 3.3+ 内置) python -m venv .venv source .venv/bin/activate # macOS/Linux .venv\Scripts\activate # Windows pip install requests pip freeze > requirements.txt 最基础,不需要额外安装,但功能简单,requirements.txt 不区分开发依赖和生产依赖。 virtualenv + pip-tools pip install pip-tools # requirements.in(只写直接依赖) requests>=2.28 flask>=2.0 # 生成锁定版本的 requirements.txt pip-compile requirements.in # 安装 pip-sync requirements.txt pip-compile 解决了依赖锁定的问题,是很长一段时间内的最佳实践。 Poetry poetry new my-project poetry add requests poetry add pytest --group dev poetry install # pyproject.toml 管理所有配置 Poetry 把依赖管理、打包、发布整合在一起,pyproject.toml 是单一配置文件。问题是速度慢,解析依赖时间长。 ...

2021-10-05 · 1 min · Kada Liao

Python 生成器与迭代器:惰性求值的力量

处理大文件或数据流时,把所有数据加载进内存是不现实的。生成器提供了一种"按需生成"的方式,解决了这个问题。 迭代器协议 Python 的 for 循环本质上是在调用迭代器协议: # for 循环等价于这段代码 it = iter(my_list) # 调用 __iter__,返回迭代器 while True: try: item = next(it) # 调用 __next__,取下一个元素 # 执行循环体 except StopIteration: break 实现了 __iter__ 和 __next__ 的对象就是迭代器。 生成器函数 用 yield 的函数就是生成器函数,调用它返回一个生成器对象: def count_up(start, end): current = start while current <= end: yield current # 暂停,返回 current,等待下次调用 current += 1 for n in count_up(1, 5): print(n) # 1 2 3 4 5 关键在于暂停和恢复:每次 next() 调用,函数从上次 yield 的地方继续执行,直到下一个 yield。 实际用处:处理大文件 # 不好:一次性加载所有行到内存 def read_all_lines(filename): with open(filename) as f: return f.readlines() # 10GB 文件直接 OOM # 好:逐行生成,内存始终只有一行 def read_lines(filename): with open(filename) as f: for line in f: yield line.strip() # 使用 for line in read_lines("huge_file.csv"): process(line) 生成器表达式 列表推导式的"惰性版本": ...

2021-06-30 · 2 min · Kada Liao