Skip to content

RAG Chatbot

RAG Chatbot

Build and Deploy a RAG Chatbot with JavaScript, LangChain.js, Next.js, Vercel, OpenAI

  • datastax 向量数据库
  • pnpm i @datastax/astra-db-ts langchain openai dotenv
  • GitHub 仓库
  • datastax 无法连接,及时止损,后续再研究

RAG Chatbot 2

LangGraph Complete Course for Beginners – Complex AI Agents with Python

python
import os

from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from dotenv import load_dotenv
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.tools import tool
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, SystemMessage, ToolMessage, HumanMessage
from operator import add as add_messages
from langgraph.graph import StateGraph, END
from langchain.callbacks.base import BaseCallbackHandler

load_dotenv()  # Load API credentials (e.g. OPENAI_API_KEY / base URL) from a local .env file

# ===== 回调 Handler =====
class PrintCallbackHandler(BaseCallbackHandler):
    """Echoes LLM lifecycle events and streamed tokens to the terminal."""

    def on_llm_new_token(self, token: str, **kwargs):
        # Continuous output on one line; flush=True pushes each token to the
        # terminal immediately instead of waiting for a newline.
        print(token, end="", flush=True)

    def on_llm_start(self, serialized, prompts, **kwargs):
        print("\n[llm_start]", prompts)

    def on_llm_end(self, response, **kwargs):
        print("\n[llm_end]", response)

# ===== LLM =====
llm = ChatOpenAI(model="deepseek-ai/DeepSeek-V3.1", temperature=0, streaming=True, callbacks=[PrintCallbackHandler()]) # temperature 越小,回答越确定

embeddings = OpenAIEmbeddings(model="Qwen/Qwen3-Embedding-4B")

@tool
def retriever_tool(query: str) -> str:
    """
    This tool searches and returns the information from the Stock Market Performance 2024 document.
    """

    # NOTE: `retriever` is a module-level name created in the __main__ block
    # after the vector store is built/loaded.
    docs = retriever.invoke(query)

    if not docs:
        return "I found no relevant information in the Stock Market Performance 2024 document."

    # Number each retrieved chunk so the model can reference them individually.
    numbered_chunks = [
        f"Document {idx + 1}:\n{doc.page_content}" for idx, doc in enumerate(docs)
    ]
    return "\n\n".join(numbered_chunks)

class AgentState(TypedDict):
    """Shared state passed between graph nodes."""

    # Conversation history. `add_messages` is operator.add, so LangGraph
    # concatenates the messages a node returns onto the existing sequence
    # instead of overwriting it.
    messages: Annotated[Sequence[BaseMessage], add_messages]

def should_continue(state: AgentState):
    """Return True when the latest message carries at least one tool call."""
    last_message = state['messages'][-1]
    if not hasattr(last_message, 'tool_calls'):
        return False
    return len(last_message.tool_calls) > 0


# Instruction prepended to every model call (see call_llm). The original
# wording was garbled ("answers questions about question based on ...").
system_prompt = """
You are an intelligent AI assistant who answers questions based on the PDF document loaded into your knowledge base.
"""

tools = [retriever_tool]
# Rebind llm so each invocation may emit tool calls for the retriever.
llm = llm.bind_tools(tools)

# LLM Agent
def call_llm(state: AgentState) -> AgentState:
    """Invoke the LLM on the conversation, prefixed with the system prompt."""
    prompt_messages = [SystemMessage(content=system_prompt), *state['messages']]
    response = llm.invoke(prompt_messages)
    # Returning a single-element list: the add_messages reducer appends it.
    return {'messages': [response]}

# Name -> tool object lookup table used when dispatching tool calls.
tools_dict = {available_tool.name: available_tool for available_tool in tools}

# Retriever Agent
def take_action(state: AgentState) -> AgentState:
    """Execute tool calls from the LLM's response.

    For each tool call in the last message: look the tool up by name in
    tools_dict, invoke it with the call's `query` argument, and wrap the
    result in a ToolMessage so the graph can feed it back to the model.
    """

    tool_calls = state['messages'][-1].tool_calls
    results = []
    for t in tool_calls:
        print(f"Calling Tool: {t['name']} with query: {t['args'].get('query', 'No query provided')}")

        # Guard against the model hallucinating a tool name.
        if t['name'] not in tools_dict:
            print(f"\nTool: {t['name']} does not exist.")
            result = "Incorrect Tool Name, Please Retry and Select tool from List of Available tools."

        else:
            result = tools_dict[t['name']].invoke(t['args'].get('query', ''))
            print(f"Result length: {len(str(result))}")

        # tool_call_id ties this ToolMessage back to the specific call the
        # model made, so the model can match answers to its requests.
        results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))

    print("Tools Execution Complete. Back to the model!")
    return {'messages': results}

if __name__ == '__main__':
    pdf_path = "/Users/luca/Downloads/test.pdf"

    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"File {pdf_path} not found")

    pdf_loader = PyPDFLoader(pdf_path)

    try:
        pages = pdf_loader.load()
        print(f"Loaded {len(pages)} pages")
    except Exception as e:
        print(f"Error loading PDF: {e}")
        raise

    # Chunking: the overlap preserves context that would otherwise be lost
    # at chunk boundaries.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    pages_split = text_splitter.split_documents(pages)

    persist_directory = "/Users/luca/Downloads/db"
    collection_name = "game"
    # exist_ok avoids the race between a separate exists() check and mkdir.
    os.makedirs(persist_directory, exist_ok=True)

    try:
        if os.listdir(persist_directory):
            # Directory already has data: reuse the persisted vector store
            # instead of re-embedding the PDF. Keyword args make the
            # constructor call robust to parameter-order changes.
            vectorstore = Chroma(
                collection_name=collection_name,
                embedding_function=embeddings,
                persist_directory=persist_directory,
            )
            print(f"Vector store loaded from {persist_directory} with collection {collection_name}")
        else:
            # Empty directory: embed the chunks and build a fresh store.
            vectorstore = Chroma.from_documents(
                documents=pages_split,
                embedding=embeddings,
                persist_directory=persist_directory,
                collection_name=collection_name
            )
            print(f"Vector store created at {persist_directory} with collection {collection_name}")
    except Exception as e:
        print(f"Error creating vector store: {e}")
        raise

    # Module-level name on purpose: retriever_tool() reads this global.
    retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 5})

    graph = StateGraph(AgentState)
    graph.add_node("llm", call_llm)
    graph.add_node("retriever_agent", take_action)

    # Loop llm -> retriever_agent while the model keeps requesting tool
    # calls; end the run otherwise.
    graph.add_conditional_edges(
        "llm",
        should_continue,
        {True: "retriever_agent", False: END}
    )
    graph.add_edge("retriever_agent", "llm")
    graph.set_entry_point("llm")

    rag_agent = graph.compile()

    print("\n=== RAG AGENT===")

    while True:
        user_input = input("\nWhat is your question: ")
        if user_input.lower() in ('exit', 'quit'):
            break

        messages = [HumanMessage(content=user_input)]  # converts back to a HumanMessage type

        result = rag_agent.invoke({"messages": messages})

        print("\n=== ANSWER ===")
        print(result['messages'][-1].content)

推荐系统

LLM Course – Build a Semantic Book Recommender (Python, OpenAI, LangChain, Gradio)