LLM

RunnableSequence와 RunnableParallel로 LLM 체인 병렬 처리 및 순차 실행

일찍자기 2025. 11. 21. 13:14

1. LangChain과 Runnable 인터페이스란?

LangChain은 LLM 기반 애플리케이션 개발을 위한 프레임워크입니다. 프롬프트, LLM, 출력 파서 등 다양한 구성 요소를 연결하여 복잡한 로직을 가진 애플리케이션을 손쉽게 구축할 수 있게 해줍니다.

그 중 Runnable 인터페이스는 LangChain의 모든 구성 요소(프롬프트, LLM, 파서, 다른 체인 등)가 따르는 표준 인터페이스입니다. 이를 통해 어떤 Runnable 객체든 | 연산자를 사용하여 파이프라인처럼 연결할 수 있게 됩니다. 마치 리눅스의 파이프(|)처럼 한 요소의 출력을 다음 요소의 입력으로 전달하는 것이죠!

2. 환경 설정 🛠️

먼저 필요한 라이브러리를 설치하고 OpenAI LLM을 설정합니다.

# 필요한 라이브러리 설치 (코랩에서 실행 시)
# !pip install -U langchain langchain_openai langchain_core

import os
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnableSequence
from langchain_openai import ChatOpenAI
from operator import itemgetter # 딕셔너리에서 특정 키의 값을 추출할 때 유용

# OpenAI API 키 설정 (환경 변수에 미리 설정하는 것이 좋습니다)
# os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"

# LLM 모델 초기화
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7) # 예시로 gpt-3.5-turbo 사용
parser = StrOutputParser() # LLM 출력을 문자열로 파싱

3. 기본 체인 맛보기 ⛓️

가장 기본적인 LangChain 체인은 프롬프트, LLM, 출력 파서의 순서로 구성됩니다.

basic_prompt = ChatPromptTemplate.from_template("What is the capital of {country}?")
basic_chain = basic_prompt | llm | parser

print(basic_chain.invoke({"country": "France"}))
# 출력 예시: Paris

4. 순차적 워크플로우: RunnableSequence의 힘 💪

여러 단계를 순서대로 실행해야 할 때 RunnableSequence를 사용합니다. | 연산자를 사용해 여러 Runnable 객체를 연결하면 그것 자체가 RunnableSequence가 됩니다. 각 단계의 출력은 다음 단계의 입력이 됩니다.

# 예시: 특정 주제에 대해 질문을 하고 답변을 요약하는 체인
sequential_chain = RunnableSequence(
    ChatPromptTemplate.from_template("Tell me 3 interesting facts about {topic}.") | llm | parser,
    ChatPromptTemplate.from_template("Summarize the following facts in one sentence:\n\n{input}") | llm | parser
)

# 간결한 표현:
# sequential_chain = (
#     ChatPromptTemplate.from_template("Tell me 3 interesting facts about {topic}.")
#     | llm
#     | parser
#     | ChatPromptTemplate.from_template("Summarize the following facts in one sentence:\n\n{input}")
#     | llm
#     | parser
# )

print(sequential_chain.invoke({"topic": "Mars"}))
# 출력 예시: Mars is known as the Red Planet due to its iron-rich surface, has two small moons named Phobos and Deimos, and is home to the largest volcano in the solar system, Olympus Mons.

이처럼 RunnableSequence는 복잡한 작업을 여러 작은 단계로 나누어 처리하고, 각 단계의 결과를 다음 단계로 넘겨주는 데 아주 효과적입니다.

5. 병렬 처리의 마법: RunnableParallel 활용 ⚡

때로는 여러 작업을 동시에 실행하여 각각의 결과를 얻고 싶을 때가 있습니다. 예를 들어, 하나의 입력으로부터 여러 종류의 정보를 추출하거나, 여러 개의 프롬프트를 동시에 생성해야 할 때 유용합니다. RunnableParallel은 이를 가능하게 합니다.

RunnableParallel은 딕셔너리 형태로 Runnable 객체들을 받으며, 각 객체는 동일한 입력을 받아서 병렬로 실행됩니다. 결과는 딕셔너리 형태로 반환됩니다.

# 예시: 하나의 입력(영화)으로 줄거리와 주요 등장인물을 동시에 추출
parallel_extraction_chain = RunnableParallel(
    plot=ChatPromptTemplate.from_template("Summarize the plot of the movie {movie_title} in 2 sentences.") | llm | parser,
    characters=ChatPromptTemplate.from_template("List the 3 main characters in the movie {movie_title}, comma-separated.") | llm | parser
)

result = parallel_extraction_chain.invoke({"movie_title": "Inception"})
print(result)
# 출력 예시:
# {
#   'plot': 'Dom Cobb is a skilled thief who steals information by entering people\'s dreams. He is offered a chance to have his criminal history erased as payment for planting an idea into a target\'s subconscious.',
#   'characters': 'Dom Cobb, Ariadne, Arthur'
# }

RunnableParallel의 강력함은 단일 입력으로 여러 출력을 동시에 생성하고, 이 출력들을 나중에 조합하여 사용할 수 있다는 점입니다.

6. 복잡한 워크플로우 만들기: RunnableSequence와 RunnableParallel의 조화 🎨

실제 LLM 애플리케이션에서는 순차적인 처리와 병렬 처리가 혼합된 복잡한 워크플로우가 필요합니다. RunnableSequence와 RunnableParallel을 함께 사용하면 이런 복잡한 체인도 유연하게 구성할 수 있습니다.

예를 들어, 먼저 병렬로 여러 정보를 추출한 다음, 이 정보들을 모아서 순차적으로 다음 작업을 수행할 수 있습니다.

✨ 독창적인 사례 연구: AI 고객 피드백 분석기 만들기 ✨

이제 위에서 배운 RunnableSequence와 RunnableParallel을 활용하여 실제 시나리오에 적용해봅시다. 우리의 목표는 고객 피드백 텍스트를 입력받아, 그 피드백의 감성(Sentiment), **핵심 사항(Key Aspects)**을 병렬로 추출하고, 이 정보들을 바탕으로 **요약(Summary)**과 **실행 가능한 개선 사항(Actionable Insights)**을 또 다시 병렬로 생성하는 'AI 고객 피드백 분석기'를 만드는 것입니다.

문제 정의

기업은 매일 수많은 고객 피드백을 받습니다. 이 피드백들을 수동으로 분류하고 분석하는 것은 시간과 비용이 많이 드는 작업입니다. AI를 활용하여 피드백을 자동으로 분석하고, 경영진이 빠르게 이해할 수 있는 요약과 구체적인 개선 방안을 제시해준다면 업무 효율을 크게 높일 수 있을 것입니다.

솔루션 설계

우리의 분석기는 다음 단계를 거쳐 작동합니다:

  1. 초기 병렬 분석 (Initial Parallel Analysis):
    • 입력된 고객 피드백으로부터 감성(긍정/부정/중립)을 파악합니다.
    • 동시에 피드백에서 언급된 핵심 사항이나 주제를 추출합니다.
    • 이 두 작업은 서로 독립적이므로 병렬로 처리합니다.
  2. 결과 통합 및 최종 병렬 생성 (Combine & Final Parallel Generation):
    • 원래의 피드백 텍스트와 1단계에서 얻은 감성, 핵심 사항을 통합하여 새로운 입력으로 만듭니다.
    • 이 통합된 입력을 바탕으로 피드백을 간결하게 요약합니다.
    • 동시에 감성과 핵심 사항을 고려하여 구체적인 개선 사항이나 행동 지침을 제안합니다.
    • 이 두 작업(요약, 개선 사항 제안) 또한 병렬로 처리합니다.

이러한 구조는 RunnableParallel을 여러 번 사용하고, 그 사이에 RunnableSequence (암묵적인 | 연산자)를 통해 데이터 흐름을 제어하는 방식으로 구현할 수 있습니다.

코드 구현

# --- 프롬프트 정의 ---
sentiment_prompt = ChatPromptTemplate.from_template(
    "Analyze the sentiment of the following customer feedback. Respond with 'positive', 'negative', or 'neutral' ONLY.\n\nFeedback: {feedback}"
)

aspects_prompt = ChatPromptTemplate.from_template(
    "Extract up to 3 main key aspects or topics mentioned in the following customer feedback, comma-separated. If none, respond with 'N/A' ONLY.\n\nFeedback: {feedback}"
)

summary_prompt = ChatPromptTemplate.from_template(
    "Given the sentiment '{sentiment}' and key aspects '{key_aspects}' of the customer feedback, provide a concise summary (max 50 words) of the original feedback:\n\nOriginal Feedback: {original_feedback}"
)

insights_prompt = ChatPromptTemplate.from_template(
    "Based on the sentiment '{sentiment}' and key aspects '{key_aspects}' from the customer feedback, suggest 2-3 actionable insights or improvements. If the sentiment is positive, focus on how to leverage it. If negative, suggest fixes. Number each insight.\n\nOriginal Feedback: {original_feedback}"
)

# --- 체인 구성 ---

# 1단계: 초기 병렬 분석 - 감성 및 핵심 사항 추출
# 입력: {"feedback": "..."}
# 출력: {"sentiment": "...", "key_aspects": "..."}
initial_analysis_chain = RunnableParallel(
    sentiment=sentiment_prompt | llm | parser,
    key_aspects=aspects_prompt | llm | parser,
).with_config(run_name="Initial_Feedback_Analysis")

# 2단계: 원래 피드백과 1단계 결과를 결합하여 다음 단계의 입력 준비
# 이 부분은 "병렬적으로" 원래 feedback을 통과시키고,
# 동시에 "병렬적으로" initial_analysis_chain을 실행하여 결과를 추출합니다.
# 입력: {"feedback": "..."}
# 출력: {"original_feedback": "...", "sentiment": "...", "key_aspects": "..."}
combined_input_preparer = RunnableParallel(
    original_feedback=itemgetter("feedback"), # 원본 피드백을 그대로 전달
    sentiment=initial_analysis_chain | itemgetter("sentiment"), # 1단계 분석 결과에서 감성 추출
    key_aspects=initial_analysis_chain | itemgetter("key_aspects"), # 1단계 분석 결과에서 핵심 사항 추출
).with_config(run_name="Prepare_Inputs_for_Final_Generation")

# 3단계: 최종 병렬 생성 - 요약 및 실행 가능한 개선 사항 제안
# 입력: {"original_feedback": "...", "sentiment": "...", "key_aspects": "..."}
# 출력: {"summary": "...", "actionable_insights": "..."}
final_generation_chain = RunnableParallel(
    summary=summary_prompt | llm | parser,
    actionable_insights=insights_prompt | llm | parser,
).with_config(run_name="Final_Output_Generation")

# 전체 고객 피드백 분석기 체인 연결 (순차적 실행)
# combined_input_preparer의 출력이 final_generation_chain의 입력이 됩니다.
full_feedback_analyzer = combined_input_preparer | final_generation_chain

# --- 실행 예시 ---
customer_feedback = {
    "feedback": "The new software update is very buggy, especially with the user interface. Performance has also decreased significantly. I miss the old version and encountered several crashes daily. Please fix this soon!"
}

print("AI 고객 피드백 분석 시작... 🚀")
analysis_result = full_feedback_analyzer.invoke(customer_feedback)
print("\n--- 분석 결과 ---")
print(f"Summary: {analysis_result['summary']}")
print(f"Actionable Insights:\n{analysis_result['actionable_insights']}")

# 다른 예시
customer_feedback_positive = {
    "feedback": "I absolutely love the new dark mode feature! It's so much easier on my eyes, and the overall design feels very sleek and modern. Great job, keep up the excellent work!"
}

print("\nAI 고객 피드백 분석 시작 (긍정 피드백)... 😊")
analysis_result_positive = full_feedback_analyzer.invoke(customer_feedback_positive)
print("\n--- 분석 결과 ---")
print(f"Summary: {analysis_result_positive['summary']}")
print(f"Actionable Insights:\n{analysis_result_positive['actionable_insights']}")

코드 설명

  1. sentiment_prompt & aspects_prompt: 고객 피드백에서 감성과 핵심 사항을 추출하기 위한 프롬프트입니다.
  2. initial_analysis_chain: RunnableParallel을 사용하여 sentiment_prompt와 aspects_prompt를 동시에 실행합니다. 입력으로 {"feedback": "..."}를 받고, 출력으로 {"sentiment": "...", "key_aspects": "..."}를 반환합니다.
  3. combined_input_preparer:
    • 이 단계는 RunnableParallel을 사용하지만, 실제로는 다음 단계에서 필요한 모든 데이터를 한데 모으는 역할을 합니다.
    • itemgetter("feedback")를 통해 원래의 feedback을 original_feedback이라는 이름으로 그대로 전달합니다.
    • initial_analysis_chain | itemgetter("sentiment")는 1단계 분석 체인의 결과에서 sentiment 값을 추출하여 전달합니다. key_aspects도 마찬가지입니다.
    • 결과적으로 {"original_feedback": "...", "sentiment": "...", "key_aspects": "..."} 형태의 딕셔너리를 생성하여 다음 단계에 전달합니다.
  4. summary_prompt & insights_prompt: 2단계에서 준비된 통합된 입력(original_feedback, sentiment, key_aspects)을 바탕으로 요약과 개선 사항을 생성하기 위한 프롬프트입니다.
  5. final_generation_chain: 다시 RunnableParallel을 사용하여 요약과 개선 사항을 동시에 생성합니다.
  6. full_feedback_analyzer: combined_input_preparer | final_generation_chain과 같이 | 연산자를 사용하여 앞서 정의한 두 체인을 순차적으로 연결합니다. combined_input_preparer의 출력이 final_generation_chain의 입력이 되는 것이죠.

이처럼 RunnableParallel을 통해 동시에 여러 작업을 처리하고, | 연산자(RunnableSequence의 암묵적 사용)를 통해 각 단계의 결과를 다음 단계로 넘겨주는 방식으로 복잡하지만 효율적인 LLM 애플리케이션을 구축할 수 있습니다.

 


출처: 이 블로그 게시물은 LangChain Academy의 "Connecting Components" Colab 노트북의 내용을 바탕으로 재구성 및 확장하여 작성되었습니다.