ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [AI Agent] LangChain/LangGraph invoke()와 stream() (+ 그리고 How to stream tool calls)
    AI/NLP 2025. 5. 24. 12:43
    728x90

     

    [AI Agent] LangChain/LangGraph invoke()와 stream() (+ 그리고 How to stream tool calls)

     

    드디어 소기의 목적인 비동기 기반의 LLM 스트리밍 서비스를 위해 LangChain과 FastAPI를 조합하고, 기존의 invoke() 방식에서 벗어나 astream()을 적용!

     

    * 이전 포스트에서 이어집니다 (https://asidefine.tistory.com/332)

     

    LangChain의 invoke, stream, batch 

    invoke, stream, batch 함수

    사용자 정의 체인을 쉽게 만들도록 대부분의 컴포넌트에 Runnable 프로토콜을 구현해 놓았으며, 이 중 invoke, stream, batch는 다음과 같은 역할을 수행하는 표준 인터페이스이다.

    • invoke : 입력에 대한 체인 호출(응답)
    • stream : 응답 청크 스트리밍(실시간 출력)
    • batch : 입력 여러 개(배치)에 대한 체인 호출(배치 처리하는 응답)

     

    from langchain_openai import ChatOpenAI
    from langchain_core.prompts import PromptTemplate
    from langchain_core.output_parsers import StrOutputParser
    
    llm = ChatOpenAI(
    	temperature=0.2,
        model_name="gpt-4-turbo"
    )
    prompt = PromptTemplate.from_template("{location}에 대해 소개하는 글을 2문장으로 작성해줘.")
    chain = prompt | llm | StrOutputParser() # create chain
    
    # invoke
    chain.invoke({"location": "서울"})
    
    # stream
    for token in chain.stream({"location": "서울"}):
    	print(token, end="", flush=True)
    
    # batch 
    chain.batch([{"location" : "서울"}, {"location" : "캘리포니아"}])

     

    LangGraph 스트리밍 (Streaming)

    LangGraph의 그래프 객체는 .stream()(동기) 및 .astream()(비동기) 메서드를 제공합니다

    for chunk in graph.stream(inputs, stream_mode="updates"):
        print(chunk)
    
    async for chunk in graph.astream(inputs, stream_mode="updates"):
        print(chunk)

     

    🔑 LangGraph에서 스트리밍 가능한 주요 데이터는 세 가지

     

    • 워크플로우 진행 상태 (Workflow progress)
      • 각 그래프 노드가 실행된 후의 상태 업데이트를 받아볼 수 있습니다.
    • LLM 토큰 (LLM tokens)
      • 언어 모델이 생성하는 토큰을 실시간으로 스트리밍할 수 있습니다.
    • 사용자 정의 업데이트 (Custom updates)
      • 예: "전체 100개 중 10개 로딩 완료" 같은 사용자 지정 시그널을 도구(tool) 함수에서 직접 보낼 수 있습니다.

     

     

     

    LangGraph 스트리밍으로 가능한 것들

     

    • LLM 토큰 스트리밍
      • 노드 내부, 서브그래프, 툴 등 어디서든 생성되는 토큰을 스트리밍할 수 있습니다.
    • 도구(tool)에서 진행 상태 알림 전송
      • 도구 함수 내부에서 진행 상황이나 사용자 정의 메시지를 직접 전송할 수 있습니다.
    • 서브그래프에서도 스트리밍 지원
      • 부모 그래프뿐 아니라 중첩된 서브그래프의 출력까지 함께 스트리밍할 수 있습니다.
    • 모든 LLM 모델 사용 가능
      • LangChain을 사용하지 않는 언어 모델도 커스텀 스트리밍 모드를 사용해 스트리밍할 수 있습니다.
    • 다양한 스트리밍 모드 지원
      • 원하는 데이터 유형에 따라 아래 스트리밍 모드 중 선택할 수 있습니다:

     

      설명
    values 그래프의 각 단계 이후 전체 상태 값을 스트리밍합니다.
    updates 각 단계 이후의 변경된 상태만 스트리밍합니다. 여러 노드가 동시에 실행될 경우, 각각의 업데이트가 따로 스트리밍됩니다.
    custom 그래프 노드 내부에서 발생한 사용자 정의 데이터를 스트리밍합니다.
    messages LLM 토큰과 메타데이터를 포함한 튜플을 스트리밍합니다.
    debug 실행 전체에 대해 가능한 모든 디버깅 정보를 스트리밍합니다.

     

    🔄 여러 모드 동시에 사용

    stream_mode에 리스트를 넘기면 여러 스트리밍 모드를 동시에 사용할 수 있습니다.

    for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
        print(chunk)

     

     

    +) LangChain Tool Calling Streaming 방법 
    • LangChain에서 툴이 스트리밍 환경에서 호출될 경우, **각 메시지 청크(message chunk)**는 .tool_call_chunks라는 속성을 통해 ToolCallChunk 객체의 리스트를 포함하게 됩니다.

     

    🔹 ToolCallChunk는 다음과 같은 정보를 담습니다:

    • name: 툴 이름 (예: "Add")
    • args: 인자 문자열(조각일 수 있음)
    • id: 호출 식별자
    • index: 툴 호출 순서를 나타내는 정수

    ⚠️ 이 필드들은 조각(streamed chunk) 형태로 나누어져 올 수 있기 때문에, 일부 청크에는 name이나 id가 None일 수도 있습니다.

    또한, AIMessageChunk 클래스는 .tool_calls 및 .invalid_tool_calls라는 필드를 추가로 제공합니다.
    이 필드들은 .tool_call_chunks로부터 LangChain이 최대한 잘 파싱해낸 구조화된 툴 호출 정보입니다.

     

    from langchain_core.tools import tool
    
    @tool
    def add(a: int, b: int) -> int:
        return a + b
    
    @tool
    def multiply(a: int, b: int) -> int:
        return a * b
    
    tools = [add, multiply]
    
    from langchain_openai import ChatOpenAI
    
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    llm_with_tools = llm.bind_tools(tools)
    query = "What is 3 * 12? Also, what is 11 + 49?"
    
    async for chunk in llm_with_tools.astream(query):
        print(chunk.tool_call_chunks)
    
    # []
    # [{'name': 'Multiply', 'args': '', 'id': ..., 'index': 0}]
    # [{'name': None, 'args': '{"a"', 'id': None, 'index': 0}]
    # [{'name': None, 'args': ': 3, ', 'id': None, 'index': 0}]
    # [{'name': None, 'args': '"b": 1', 'id': None, 'index': 0}]
    # [{'name': None, 'args': '2}', 'id': None, 'index': 0}]
    # ...

    🔄 툴 호출 조각 누적 (Chunk Accumulation)

    여러 청크를 누적해서 점점 완성된 툴 호출 정보를 만들어갈 수 있습니다.

     

    first = True
    async for chunk in llm_with_tools.astream(query):
        if first:
            gathered = chunk
            first = False
        else:
            gathered = gathered + chunk
    
        print(gathered.tool_call_chunks)
    # [{'name': 'Multiply', 'args': '{"a": 3, "b": 12}', 'id': ..., 'index': 0}, {'name': 'Add', 'args': '{"a": 11, "b": 49}', 'id': ..., 'index': 1}]

    ✅ 파싱된 최종 Tool Calls 보기

    이제 LangChain이 .tool_call_chunks를 기반으로 파싱한 tool_calls 결과는 다음과 같습니다

    print(gathered.tool_calls)
    # [{'name': 'Multiply', 'args': {'a': 3, 'b': 12}, 'id': ...}, {'name': 'Add', 'args': {'a': 11, 'b': 49}, 'id': ...}]

    .tool_calls[0]["args"]는 딕셔너리(dict) 형태로 파싱됨

    Reference. 

     

     

    728x90
Designed by Tistory.