[AI Agent] LangChain/LangGraph invoke()와 stream() (+ 그리고 How to stream tool calls)
[AI Agent] LangChain/LangGraph invoke()와 stream() (+ 그리고 How to stream tool calls)
드디어 소기의 목적인 비동기 기반의 LLM 스트리밍 서비스를 위해 LangChain과 FastAPI를 조합하고, 기존의 invoke() 방식에서 벗어나 astream()을 적용!
* 이전 포스트에서 이어집니다 (https://asidefine.tistory.com/332)
LangChain의 invoke, stream, batch
invoke, stream, batch 함수
사용자 정의 체인을 쉽게 만들도록 대부분의 컴포넌트에 Runnable 프로토콜을 구현해 놓았으며, 이 중 invoke, stream, batch는 다음과 같은 역할을 수행하는 표준 인터페이스이다.
- invoke : 입력에 대한 체인 호출(응답)
- stream : 응답 청크 스트리밍(실시간 출력)
- batch : 입력 여러 개(배치)에 대한 체인 호출(배치 처리하는 응답)
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(
temperature=0.2,
model_name="gpt-4-turbo"
)
prompt = PromptTemplate.from_template("{location}에 대해 소개하는 글을 2문장으로 작성해줘.")
chain = prompt | llm | StrOutputParser() # create chain
# invoke
chain.invoke({"location": "서울"})
# stream
for token in chain.stream({"location": "서울"}):
print(token, end="", flush=True)
# batch
chain.batch([{"location" : "서울"}, {"location" : "캘리포니아"}])
LangGraph 스트리밍 (Streaming)
LangGraph의 그래프 객체는 .stream()(동기) 및 .astream()(비동기) 메서드를 제공합니다
for chunk in graph.stream(inputs, stream_mode="updates"):
print(chunk)
async for chunk in graph.astream(inputs, stream_mode="updates"):
print(chunk)
🔑 LangGraph에서 스트리밍 가능한 주요 데이터는 세 가지
- 워크플로우 진행 상태 (Workflow progress)
- 각 그래프 노드가 실행된 후의 상태 업데이트를 받아볼 수 있습니다.
- LLM 토큰 (LLM tokens)
- 언어 모델이 생성하는 토큰을 실시간으로 스트리밍할 수 있습니다.
- 사용자 정의 업데이트 (Custom updates)
- 예: "전체 100개 중 10개 로딩 완료" 같은 사용자 지정 시그널을 도구(tool) 함수에서 직접 보낼 수 있습니다.
LangGraph 스트리밍으로 가능한 것들
- LLM 토큰 스트리밍
- 노드 내부, 서브그래프, 툴 등 어디서든 생성되는 토큰을 스트리밍할 수 있습니다.
- 도구(tool)에서 진행 상태 알림 전송
- 도구 함수 내부에서 진행 상황이나 사용자 정의 메시지를 직접 전송할 수 있습니다.
- 서브그래프에서도 스트리밍 지원
- 부모 그래프뿐 아니라 중첩된 서브그래프의 출력까지 함께 스트리밍할 수 있습니다.
- 모든 LLM 모델 사용 가능
- LangChain을 사용하지 않는 언어 모델도 커스텀 스트리밍 모드를 사용해 스트리밍할 수 있습니다.
- 다양한 스트리밍 모드 지원
- 원하는 데이터 유형에 따라 아래 스트리밍 모드 중 선택할 수 있습니다:
설명 | |
values | 그래프의 각 단계 이후 전체 상태 값을 스트리밍합니다. |
updates | 각 단계 이후의 변경된 상태만 스트리밍합니다. 여러 노드가 동시에 실행될 경우, 각각의 업데이트가 따로 스트리밍됩니다. |
custom | 그래프 노드 내부에서 발생한 사용자 정의 데이터를 스트리밍합니다. |
messages | LLM 토큰과 메타데이터를 포함한 튜플을 스트리밍합니다. |
debug | 실행 전체에 대해 가능한 모든 디버깅 정보를 스트리밍합니다. |
🔄 여러 모드 동시에 사용
stream_mode에 리스트를 넘기면 여러 스트리밍 모드를 동시에 사용할 수 있습니다.
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
print(chunk)
+) LangChain Tool Calling Streaming 방법
- LangChain에서 툴이 스트리밍 환경에서 호출될 경우, **각 메시지 청크(message chunk)**는 .tool_call_chunks라는 속성을 통해 ToolCallChunk 객체의 리스트를 포함하게 됩니다.
🔹 ToolCallChunk는 다음과 같은 정보를 담습니다:
- name: 툴 이름 (예: "Add")
- args: 인자 문자열(조각일 수 있음)
- id: 호출 식별자
- index: 툴 호출 순서를 나타내는 정수
⚠️ 이 필드들은 조각(streamed chunk) 형태로 나누어져 올 수 있기 때문에, 일부 청크에는 name이나 id가 None일 수도 있습니다.
또한, AIMessageChunk 클래스는 .tool_calls 및 .invalid_tool_calls라는 필드를 추가로 제공합니다.
이 필드들은 .tool_call_chunks로부터 LangChain이 최대한 잘 파싱해낸 구조화된 툴 호출 정보입니다.
from langchain_core.tools import tool
@tool
def add(a: int, b: int) -> int:
return a + b
@tool
def multiply(a: int, b: int) -> int:
return a * b
tools = [add, multiply]
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)
query = "What is 3 * 12? Also, what is 11 + 49?"
async for chunk in llm_with_tools.astream(query):
print(chunk.tool_call_chunks)
# []
# [{'name': 'Multiply', 'args': '', 'id': ..., 'index': 0}]
# [{'name': None, 'args': '{"a"', 'id': None, 'index': 0}]
# [{'name': None, 'args': ': 3, ', 'id': None, 'index': 0}]
# [{'name': None, 'args': '"b": 1', 'id': None, 'index': 0}]
# [{'name': None, 'args': '2}', 'id': None, 'index': 0}]
# ...
🔄 툴 호출 조각 누적 (Chunk Accumulation)
여러 청크를 누적해서 점점 완성된 툴 호출 정보를 만들어갈 수 있습니다.
first = True
async for chunk in llm_with_tools.astream(query):
if first:
gathered = chunk
first = False
else:
gathered = gathered + chunk
print(gathered.tool_call_chunks)
# [{'name': 'Multiply', 'args': '{"a": 3, "b": 12}', 'id': ..., 'index': 0}, {'name': 'Add', 'args': '{"a": 11, "b": 49}', 'id': ..., 'index': 1}]
✅ 파싱된 최종 Tool Calls 보기
이제 LangChain이 .tool_call_chunks를 기반으로 파싱한 tool_calls 결과는 다음과 같습니다
print(gathered.tool_calls)
# [{'name': 'Multiply', 'args': {'a': 3, 'b': 12}, 'id': ...}, {'name': 'Add', 'args': {'a': 11, 'b': 49}, 'id': ...}]
.tool_calls[0]["args"]는 딕셔너리(dict) 형태로 파싱됨
Reference.
- LangGraph 비동기/동기
- LangGraph Invoke & Stream
- https://rudaks.tistory.com/entry/langgraph-%EC%BB%A4%EC%8A%A4%ED%85%80-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%A5%BC-%EC%8A%A4%ED%8A%B8%EB%A6%AC%EB%B0%8D%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95
- https://langchain-ai.github.io/langgraph/how-tos/streaming/
- https://python.langchain.com/docs/how_to/tool_streaming/
- https://github.com/teddylee777/langchain-teddynote/blob/main/langchain_teddynote/messages.py#L60