본문 바로가기
AI 코딩

로컬AI 구축의 핵심 : LLM과 임베딩 모델의 협업

by 피크나인 2025. 7. 28.

 

AI가 어떻게 똑똑해지는 걸까요?

 

많은 개발자들이 "AI를 만들려면 ChatGPT 같은 모델 하나만 있으면 되는 거 아닌가?"라고 생각합니다.

하지만 실제로 업무에 활용할 수 있는 똑똑한 AI를 만들려면 두 개의 서로 다른 AI 모델이 팀워크를 이뤄야 합니다. 이는 마치 사람이 뇌에서 기억을 저장하는 부분(해마)과 생각하고 판단하는 부분(전두엽)이 따로 있는 것과 같습니다. AI도 마찬가지로 정보를 저장하고 찾는 역할을 하는 임베딩 모델과 그 정보를 바탕으로 똑똑한 답변을 만드는 LLM(대형 언어 모델)이 각각의 전문 역할을 담당합니다.

 

이 두 모델이 협력하는 이유는 각각이 서로 다른 강점을 가지고 있기 때문입니다.

임베딩 모델은 마치 도서관 사서처럼 수많은 문서 중에서 질문과 관련된 정보를 빠르게 찾아내는 일을 담당합니다. 반면 LLM은 그 정보들을 읽고 이해해서 자연스럽고 유용한 답변을 만들어내는 작가 역할을 합니다. 만약 하나의 모델로 모든 것을 처리하려고 한다면 엄청난 양의 메모리와 처리 능력이 필요하고, 성능도 떨어지며, 새로운 정보를 추가할 때마다 전체 시스템을 다시 학습시켜야 하는 비효율적인 문제가 발생합니다.

인간의 뇌처럼 기억저장과 처리가 분리된 AI시스템의 기본 구조로 AI 인공지능 신경망은 인간의 뇌를 본떠서 만들고 있습니다.
인간의 뇌처럼 기억저장과 처리가 분리된 AI시스템의 기본 구조

 


1.  LLM과 임베딩 모델 기초 이해하기

1.1  LLM(대형 언어 모델)이란 무엇인가?

LLM의 기본 개념과 역할

LLM은 Large Language Model의 줄임말로, 말 그대로 매우 큰 언어 모델입니다.

이 모델은 인터넷에 있는 수많은 텍스트 데이터(책, 기사, 웹페이지 등)를 학습해서 인간의 언어를 이해하고 자연스러운 문장을 생성할 수 있게 된 AI입니다. 마치 수만 권의 책을 읽고 언어의 패턴을 완전히 익힌 매우 똑똑한 언어학자를 상상해보세요. LLM은 사용자가 질문을 하면 그 질문의 의미를 파악하고, 학습했던 지식을 바탕으로 적절한 답변을 만들어냅니다. 하지만 여기서 중요한 점은 LLM이 모든 정보를 기억하고 있는 것이 아니라, 언어의 패턴과 일반적인 지식을 학습한 것이라는 점입니다. 따라서 특정 회사의 매뉴얼이나 최신 정보 같은 구체적인 내용은 별도로 제공해줘야 정확한 답변을 할 수 있습니다.

LLM이 하는 주요 작업들

  • 텍스트 생성 : 주어진 주제나 시작 문장을 바탕으로 자연스러운 문장이나 문단을 계속 작성하는 기능입니다. 마치 작가가 소설을 쓰는 것처럼 문맥에 맞는 내용을 이어서 만들어냅니다.
  • 질문 답변 : 사용자의 질문을 이해하고 적절한 답변을 생성하는 기능으로, 이때 답변은 LLM이 학습한 지식을 바탕으로 만들어집니다.
  • 언어 번역 : 한 언어로 된 텍스트를 다른 언어로 번역하는 기능으로, 단순한 단어 치환이 아니라 문맥과 의미를 이해한 번역을 수행합니다.
  • 텍스트 요약 : 긴 문서나 기사를 읽고 핵심 내용만 간추려서 짧은 요약문을 만드는 기능입니다.
  • 대화 관리 : 여러 번의 질문과 답변이 이어지는 대화에서 이전 대화 내용을 기억하고 문맥을 유지하며 일관된 대화를 진행하는 기능입니다.
LLM의 기본 동작방식을 이해하기 위한 코드 예시 >
# LLM의 기본 동작 방식을 이해하기 위한 간단한 예시
class SimpleLLMExample:
    """LLM이 어떻게 동작하는지 보여주는 간단한 예시 클래스"""
    
    def __init__(self, model_name: str):
        # 실제로는 수십 기가바이트 크기의 모델 파일을 로드합니다
        self.model_name = model_name
        print(f"LLM 모델 '{model_name}'을 로드했습니다.")
        
        # 토크나이저: 텍스트를 모델이 이해할 수 있는 숫자로 변환하는 도구
        self.tokenizer = self.load_tokenizer()
        
        # 모델: 실제 텍스트 생성을 담당하는 신경망
        self.model = self.load_model()
    
    def load_tokenizer(self):
        """토크나이저 로드 - 텍스트를 숫자로 변환"""
        print("토크나이저를 로드하는 중...")
        # 실제로는 복잡한 토크나이저를 로드하지만 여기서는 간단히 표현
        return "tokenizer_loaded"
    
    def load_model(self):
        """실제 LLM 모델 로드"""
        print("LLM 모델을 메모리에 로드하는 중... (이 과정은 실제로 1-2분 소요)")
        # 실제로는 수십억 개의 매개변수를 가진 거대한 신경망을 로드
        return "model_loaded"
    
    def generate_response(self, user_question: str) -> str:
        """사용자 질문에 대한 답변 생성"""
        
        # 1단계: 사용자 입력을 토큰(숫자)으로 변환
        print(f"1단계: '{user_question}'을 숫자로 변환 중...")
        tokens = self.text_to_tokens(user_question)
        
        # 2단계: 모델이 다음에 올 단어들을 예측
        print("2단계: AI가 답변을 생각하는 중...")
        response_tokens = self.model_predict(tokens)
        
        # 3단계: 예측된 숫자들을 다시 텍스트로 변환
        print("3단계: 숫자를 텍스트로 변환 중...")
        response_text = self.tokens_to_text(response_tokens)
        
        return response_text
    
    def text_to_tokens(self, text: str):
        """텍스트를 토큰(숫자)으로 변환하는 과정"""
        # 실제로는 복잡한 변환 과정이지만 간단히 표현
        print(f"  텍스트 '{text[:30]}...'을 [1, 234, 567, ...] 같은 숫자로 변환")
        return [1, 234, 567, 89, 123]  # 예시 토큰들
    
    def model_predict(self, input_tokens):
        """모델이 다음 토큰들을 예측하는 과정"""
        # 실제로는 매우 복잡한 신경망 계산이 일어남
        print("  AI가 수십억 개의 매개변수를 사용해서 답변을 계산 중...")
        print("  이 과정에서 GPU의 수천 개 코어가 동시에 계산을 수행합니다.")
        
        # 예시로 임의의 토큰들 반환
        return [45, 678, 901, 234, 567, 890]
    
    def tokens_to_text(self, tokens):
        """토큰(숫자)을 다시 텍스트로 변환"""
        print(f"  숫자 {tokens}을 '네, 도움을 드릴 수 있습니다...' 같은 텍스트로 변환")
        return "네, 도움을 드릴 수 있습니다. 질문해주신 내용에 대해 답변드리겠습니다."

# 사용 예시 - LLM이 어떻게 동작하는지 확인
def demonstrate_llm_working():
    """LLM 동작 과정을 보여주는 데모"""
    print("=== LLM 동작 과정 데모 ===")
    
    # LLM 초기화
    llm = SimpleLLMExample("llama-3.3-70b")
    
    # 사용자 질문 처리
    user_question = "CNC 머신에서 진동이 발생하는 이유는 무엇인가요?"
    print(f"\n사용자 질문: {user_question}")
    print("\n--- LLM 내부 처리 과정 ---")
    
    # 답변 생성
    response = llm.generate_response(user_question)
    print(f"\n최종 답변: {response}")
    
    print("\n=== 처리 완료 ===")

# 실행해보기
demonstrate_llm_working()

1.2 임베딩 모델이란 무엇인가?

임베딩 모델의 기본 개념

임베딩 모델은 텍스트, 이미지, 음성 같은 데이터를 컴퓨터가 이해할 수 있는 숫자 배열(벡터)로 변환하는 AI 모델입니다.

이 과정을 간단히 설명하면, 단어나 문장의 '의미'를 숫자로 표현하는 것입니다. 예를 들어, "강아지"와 "개"는 비슷한 의미이므로 비슷한 숫자들로 표현되고, "강아지"와 "자동차"는 전혀 다른 의미이므로 완전히 다른 숫자들로 표현됩니다. 이렇게 만들어진 숫자 배열을 '임베딩 벡터'라고 부르며, 이 벡터들 사이의 거리를 계산하면 텍스트들이 얼마나 비슷한 의미인지 알 수 있습니다. 마치 지도에서 두 지점 사이의 거리를 재는 것처럼, 의미 공간에서 두 텍스트 사이의 의미적 거리를 측정할 수 있게 됩니다.

 

임베딩 모델이 하는 주요 작업들

  • 의미 벡터화 : 텍스트를 768차원이나 1024차원 같은 고차원 숫자 배열로 변환하는 작업입니다. 이 숫자들은 텍스트의 의미를 수학적으로 표현한 것으로, 컴퓨터가 의미를 계산할 수 있게 해줍니다.
  • 유사도 검색 : 특정 질문과 의미가 비슷한 문서들을 빠르게 찾아내는 기능입니다. 예를 들어 "엔진 소음"으로 검색하면 "모터 소리", "기계 잡음" 같은 관련 문서들도 함께 찾아줍니다.
  • 문서 분류 : 문서의 내용을 파악해서 적절한 카테고리로 분류하는 기능으로, 기술 문서, 매뉴얼, 보고서 등을 자동으로 구분할 수 있습니다.
  • 중복 제거 : 비슷한 내용의 문서들을 찾아내서 중복을 제거하거나 통합하는 작업을 수행합니다.
임베딩 모델이 동작되는 방식을 이해하기 위한 코드 예시 >
# 임베딩 모델의 동작 방식을 이해하기 위한 예시
import numpy as np
from typing import List, Tuple

class SimpleEmbeddingExample:
    """임베딩 모델이 어떻게 동작하는지 보여주는 간단한 예시"""
    
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.vector_dimension = 768  # 실제로는 768차원이나 1024차원 벡터 사용
        print(f"임베딩 모델 '{model_name}'을 로드했습니다.")
        print(f"이 모델은 텍스트를 {self.vector_dimension}차원 벡터로 변환합니다.")
        
        # 실제로는 복잡한 신경망 모델을 로드
        self.model = self.load_embedding_model()
    
    def load_embedding_model(self):
        """임베딩 모델 로드"""
        print("임베딩 모델을 메모리에 로드하는 중...")
        print("이 모델은 텍스트의 의미를 이해해서 숫자로 변환합니다.")
        return "embedding_model_loaded"
    
    def text_to_vector(self, text: str) -> np.ndarray:
        """텍스트를 벡터로 변환하는 과정"""
        print(f"\n텍스트를 벡터로 변환 중: '{text}'")
        
        # 1단계: 텍스트 전처리
        cleaned_text = self.preprocess_text(text)
        
        # 2단계: 모델을 통해 벡터 생성
        vector = self.model_encode(cleaned_text)
        
        # 3단계: 벡터 정규화 (크기를 1로 만듦)
        normalized_vector = self.normalize_vector(vector)
        
        print(f"변환 완료: {self.vector_dimension}차원 벡터 생성")
        print(f"벡터 예시: [{normalized_vector[0]:.3f}, {normalized_vector[1]:.3f}, {normalized_vector[2]:.3f}, ...]")
        
        return normalized_vector
    
    def preprocess_text(self, text: str) -> str:
        """텍스트 전처리 과정"""
        print("  1단계: 텍스트를 정리하고 불필요한 문자 제거")
        # 실제로는 소문자 변환, 특수문자 제거, 토큰화 등을 수행
        cleaned = text.lower().strip()
        print(f"    정리된 텍스트: '{cleaned}'")
        return cleaned
    
    def model_encode(self, text: str) -> np.ndarray:
        """모델이 텍스트를 벡터로 인코딩"""
        print("  2단계: AI 모델이 텍스트의 의미를 분석해서 숫자로 변환")
        print("    이 과정에서 단어들의 관계, 문맥, 전체적인 의미를 파악합니다.")
        
        # 실제로는 복잡한 신경망 계산이 일어나지만 여기서는 예시로 랜덤 벡터 생성
        # 비슷한 의미의 텍스트는 비슷한 벡터를 가지도록 학습되어 있음
        vector = np.random.randn(self.vector_dimension)
        
        return vector
    
    def normalize_vector(self, vector: np.ndarray) -> np.ndarray:
        """벡터를 정규화해서 크기를 1로 만듦"""
        print("  3단계: 벡터 크기를 1로 정규화 (유사도 계산을 위해)")
        norm = np.linalg.norm(vector)
        if norm == 0:
            return vector
        return vector / norm
    
    def calculate_similarity(self, vector1: np.ndarray, vector2: np.ndarray) -> float:
        """두 벡터 사이의 유사도 계산 (코사인 유사도)"""
        # 벡터의 내적을 계산하면 코사인 유사도가 나옴 (정규화된 벡터의 경우)
        similarity = np.dot(vector1, vector2)
        return similarity
    
    def find_similar_texts(self, query_text: str, document_texts: List[str], top_k: int = 3) -> List[Tuple[str, float]]:
        """쿼리와 가장 유사한 문서들을 찾는 과정"""
        print(f"\n=== 유사 문서 검색 과정 ===")
        print(f"검색 쿼리: '{query_text}'")
        
        # 1. 쿼리를 벡터로 변환
        query_vector = self.text_to_vector(query_text)
        
        # 2. 모든 문서들을 벡터로 변환
        print(f"\n{len(document_texts)}개 문서를 벡터로 변환 중...")
        document_vectors = []
        for i, doc in enumerate(document_texts):
            print(f"  문서 {i+1}: '{doc[:50]}...'")
            doc_vector = self.text_to_vector(doc)
            document_vectors.append(doc_vector)
        
        # 3. 각 문서와 쿼리 사이의 유사도 계산
        print(f"\n유사도 계산 중...")
        similarities = []
        for i, doc_vector in enumerate(document_vectors):
            similarity = self.calculate_similarity(query_vector, doc_vector)
            similarities.append((document_texts[i], similarity))
            print(f"  문서 {i+1} 유사도: {similarity:.3f}")
        
        # 4. 유사도가 높은 순으로 정렬
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # 5. 상위 k개 문서 반환
        top_results = similarities[:top_k]
        
        print(f"\n=== 검색 결과 (상위 {top_k}개) ===")
        for i, (doc, score) in enumerate(top_results):
            print(f"{i+1}위: 유사도 {score:.3f}")
            print(f"      내용: {doc[:100]}...")
            print()
        
        return top_results

# 임베딩 모델 동작 실습
def demonstrate_embedding_working():
    """임베딩 모델 동작 과정을 보여주는 데모"""
    print("=== 임베딩 모델 동작 과정 데모 ===")
    
    # 임베딩 모델 초기화
    embedding_model = SimpleEmbeddingExample("bge-m3")
    
    # 예시 문서들 (제조업 관련)
    documents = [
        "CNC 머신의 주축 베어링에서 이상한 소음이 발생하고 있습니다. 온도도 평소보다 높습니다.",
        "알루미늄 가공 시 절삭 속도를 150m/min으로 설정하고 이송량은 0.2mm/rev로 조정했습니다.",
        "정기 점검에서 유압 시스템의 압력이 정상 범위를 벗어났습니다. 즉시 조치가 필요합니다.",
        "새로 설치한 로봇 팔의 교정 작업이 완료되었고 정확도 테스트를 진행하고 있습니다."
    ]
    
    # 사용자 검색 쿼리
    user_query = "기계에서 소음이 나는 문제"
    
    # 유사 문서 검색 실행
    results = embedding_model.find_similar_texts(user_query, documents, top_k=2)
    
    print("=== 검색 완료 ===")
    print("임베딩 모델이 '기계 소음'과 관련된 문서를 성공적으로 찾았습니다!")

# 실행해보기
demonstrate_embedding_working()

2. 두 모델의 내부 구조 자세히 알아보기

2.1 LLM의 내부 구조 상세 분석

트랜스포머 아키텍처의 이해

현재 사용되는 대부분의 LLM은 '트랜스포머(Transformer)'라는 특별한 구조를 사용합니다.

이는 2017년에 구글에서 발표한 "Attention Is All You Need"라는 논문에서 소개된 혁신적인 기술입니다. 트랜스포머의 핵심은 '어텐션(Attention)' 메커니즘인데, 이는 인간이 긴 문장을 읽을 때 중요한 부분에 더 집중하는 것과 비슷합니다.

 

예를 들어 "어제 비가 많이 와서 우산을 가져갔는데 집에 두고 왔다"라는 문장에서 '우산'과 '비'가 서로 관련이 있다는 것을 파악하는 능력이 어텐션입니다. 트랜스포머는 이런 관계를 수학적으로 계산해서 문맥을 이해하고 적절한 답변을 생성할 수 있게 됩니다.

LLM의 주요 구성 요소들

  • 토크나이저(Tokenizer) : 사람이 사용하는 자연어를 AI가 이해할 수 있는 숫자로 변환하는 역할을 합니다. 예를 들어 "안녕하세요"를 [1234, 5678] 같은 숫자로 바꿔줍니다. 이 과정이 필요한 이유는 컴퓨터는 텍스트를 직접 이해할 수 없고 오직 숫자만 처리할 수 있기 때문입니다.
  • 임베딩 레이어 : 토크나이저에서 만들어진 숫자들을 더 풍부한 의미를 담은 벡터로 변환합니다. 단순한 숫자 1234를 [0.1, 0.5, -0.3, 0.8, ...] 같은 수백 개의 숫자로 이루어진 벡터로 바꿔서 더 많은 정보를 담을 수 있게 합니다.
  • 어텐션 레이어 : 문장에서 어떤 단어들이 서로 관련이 있는지를 파악하는 부분입니다. "그는 공을 찼다"에서 '그는'과 '찼다'가 연결되고, '공을'과 '찼다'가 연결된다는 것을 이해합니다.
  • 피드포워드 네트워크 : 어텐션에서 파악한 관계들을 바탕으로 더 복잡한 패턴을 학습하는 부분입니다. 여러 층의 신경망을 통해 점점 더 복잡하고 추상적인 개념을 이해하게 됩니다.
  • 출력 레이어 : 최종적으로 다음에 올 단어의 확률을 계산하는 부분입니다. "오늘 날씨가" 다음에 "좋다"가 올 확률이 30%, "나쁘다"가 올 확률이 20% 같은 식으로 계산합니다.
LLM 내부 구조 이해를 위한 예시 코드 >
# LLM 내부 구조를 이해하기 위한 상세한 예시
import numpy as np
from typing import List, Dict

class DetailedLLMArchitecture:
    """LLM의 내부 구조를 단계별로 보여주는 클래스"""
    
    def __init__(self, vocab_size: int = 50000, d_model: int = 768, num_layers: int = 12):
        """
        LLM 아키텍처 초기화
        
        vocab_size: 어휘 사전 크기 (알고 있는 단어의 개수)
        d_model: 각 단어를 표현하는 벡터의 크기
        num_layers: 트랜스포머 블록의 개수 (많을수록 더 복잡한 이해 가능)
        """
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.num_layers = num_layers
        
        print(f"LLM 아키텍처 초기화:")
        print(f"  - 어휘 사전 크기: {vocab_size:,}개 단어")
        print(f"  - 임베딩 차원: {d_model}차원")
        print(f"  - 트랜스포머 레이어: {num_layers}개")
        print(f"  - 총 매개변수 수: 약 {self.estimate_parameters():,}개")
        
        # 각 구성 요소 초기화
        self.tokenizer = self.create_tokenizer()
        self.embedding_layer = self.create_embedding_layer()
        self.transformer_blocks = self.create_transformer_blocks()
        self.output_layer = self.create_output_layer()
    
    def estimate_parameters(self) -> int:
        """모델의 총 매개변수 수 추정"""
        # 간단한 추정 공식 (실제로는 더 복잡)
        embedding_params = self.vocab_size * self.d_model
        transformer_params = self.num_layers * (4 * self.d_model * self.d_model)
        output_params = self.d_model * self.vocab_size
        
        return embedding_params + transformer_params + output_params
    
    def create_tokenizer(self):
        """토크나이저 생성"""
        print("\n토크나이저 생성 중...")
        print("  토크나이저는 텍스트를 숫자로 변환하는 도구입니다.")
        
        # 예시 어휘 사전 (실제로는 수만 개의 단어 포함)
        vocab = {
            "안녕": 1, "하세요": 2, "CNC": 3, "머신": 4, "진동": 5,
            "온도": 6, "높다": 7, "문제": 8, "해결": 9, "[UNK]": 0
        }
        
        return {"vocab": vocab, "type": "BPE"}  # BPE: Byte Pair Encoding
    
    def create_embedding_layer(self):
        """임베딩 레이어 생성"""
        print("\n임베딩 레이어 생성 중...")
        print("  각 단어를 의미를 담은 벡터로 변환합니다.")
        print(f"  예: '안녕'(1번) → [{np.random.randn(3)[:3]}...] ({self.d_model}차원)")
        
        # 실제로는 학습 가능한 가중치 행렬
        embedding_matrix = np.random.randn(self.vocab_size, self.d_model) * 0.1
        
        return {"matrix": embedding_matrix, "dimension": self.d_model}
    
    def create_transformer_blocks(self):
        """트랜스포머 블록들 생성"""
        print(f"\n{self.num_layers}개의 트랜스포머 블록 생성 중...")
        
        blocks = []
        for i in range(self.num_layers):
            block = {
                "layer_num": i + 1,
                "multi_head_attention": self.create_attention_layer(),
                "feed_forward": self.create_feedforward_layer(),
                "layer_norm1": "첫 번째 정규화 레이어",
                "layer_norm2": "두 번째 정규화 레이어"
            }
            blocks.append(block)
            print(f"  레이어 {i+1}: 어텐션 + 피드포워드 네트워크 생성")
        
        return blocks
    
    def create_attention_layer(self):
        """어텐션 레이어 생성"""
        # 멀티헤드 어텐션: 여러 관점에서 단어 관계를 파악
        num_heads = 12  # 12개의 서로 다른 어텐션 헤드
        
        attention_config = {
            "num_heads": num_heads,
            "head_dimension": self.d_model // num_heads,
            "purpose": "단어들 사이의 관계를 파악하여 문맥 이해"
        }
        
        return attention_config
    
    def create_feedforward_layer(self):
        """피드포워드 네트워크 생성"""
        # 일반적으로 d_model의 4배 크기를 가짐
        ff_dimension = self.d_model * 4
        
        feedforward_config = {
            "input_dim": self.d_model,
            "hidden_dim": ff_dimension,
            "output_dim": self.d_model,
            "activation": "GELU",  # 활성화 함수
            "purpose": "복잡한 패턴과 비선형 관계 학습"
        }
        
        return feedforward_config
    
    def create_output_layer(self):
        """출력 레이어 생성"""
        print(f"\n출력 레이어 생성 중...")
        print(f"  {self.d_model}차원 벡터를 {self.vocab_size}개 단어의 확률로 변환")
        
        output_config = {
            "input_dim": self.d_model,
            "output_dim": self.vocab_size,
            "purpose": "다음 단어의 확률 분포 계산"
        }
        
        return output_config
    
    def process_text_step_by_step(self, input_text: str) -> Dict:
        """텍스트 처리 과정을 단계별로 보여줌"""
        print(f"\n=== 텍스트 처리 과정: '{input_text}' ===")
        
        # 1단계: 토큰화
        tokens = self.tokenize_text(input_text)
        
        # 2단계: 임베딩
        embeddings = self.embed_tokens(tokens)
        
        # 3단계: 트랜스포머 처리
        processed_embeddings = self.process_through_transformers(embeddings)
        
        # 4단계: 출력 생성
        output_probabilities = self.generate_output_probabilities(processed_embeddings)
        
        # 5단계: 다음 단어 선택
        next_word = self.select_next_word(output_probabilities)
        
        return {
            "input_text": input_text,
            "tokens": tokens,
            "embeddings_shape": embeddings.shape,
            "final_embeddings_shape": processed_embeddings.shape,
            "next_word": next_word,
            "processing_steps": 5
        }
    
    def tokenize_text(self, text: str) -> List[int]:
        """텍스트를 토큰으로 변환"""
        print("\n1단계: 토큰화")
        print(f"  입력: '{text}'")
        
        # 간단한 토큰화 (실제로는 더 복잡)
        words = text.split()
        vocab = self.tokenizer["vocab"]
        
        tokens = []
        for word in words:
            if word in vocab:
                tokens.append(vocab[word])
                print(f"  '{word}' → {vocab[word]}")
            else:
                tokens.append(vocab["[UNK]"])  # 모르는 단어
                print(f"  '{word}' → {vocab['[UNK]']} (미등록 단어)")
        
        print(f"  결과: {tokens}")
        return tokens
    
    def embed_tokens(self, tokens: List[int]) -> np.ndarray:
        """토큰들을 임베딩 벡터로 변환"""
        print("\n2단계: 임베딩 변환")
        print(f"  {len(tokens)}개 토큰을 각각 {self.d_model}차원 벡터로 변환")
        
        # 각 토큰을 임베딩 벡터로 변환
        embeddings = []
        for i, token in enumerate(tokens):
            # 실제로는 학습된 임베딩 행렬에서 해당 행을 가져옴
            embedding_vector = np.random.randn(self.d_model) * 0.1
            embeddings.append(embedding_vector)
            print(f"  토큰 {token} → 벡터 [길이: {self.d_model}]")
        
        embeddings_array = np.array(embeddings)
        print(f"  최종 임베딩 형태: {embeddings_array.shape}")
        
        return embeddings_array
    
    def process_through_transformers(self, embeddings: np.ndarray) -> np.ndarray:
        """트랜스포머 블록들을 통과하며 처리"""
        print(f"\n3단계: {self.num_layers}개 트랜스포머 블록 처리")
        
        current_embeddings = embeddings.copy()
        
        for i, block in enumerate(self.transformer_blocks):
            print(f"  블록 {i+1}/{self.num_layers} 처리 중...")
            
            # 어텐션 처리
            attention_output = self.apply_attention(current_embeddings, block["multi_head_attention"])
            print(f"    - 어텐션: 단어들 사이의 관계 파악")
            
            # 피드포워드 처리
            ff_output = self.apply_feedforward(attention_output, block["feed_forward"])
            print(f"    - 피드포워드: 복잡한 패턴 학습")
            
            current_embeddings = ff_output
        
        print(f"  모든 트랜스포머 블록 처리 완료")
        print(f"  최종 처리된 임베딩 형태: {current_embeddings.shape}")
        
        return current_embeddings
    
    def apply_attention(self, embeddings: np.ndarray, attention_config: Dict) -> np.ndarray:
        """어텐션 메커니즘 적용"""
        # 실제로는 복잡한 행렬 연산이지만 여기서는 간단히 표현
        print(f"      어텐션 헤드 {attention_config['num_heads']}개로 관계 분석")
        
        # 예시: 각 단어가 다른 단어들에 얼마나 주목하는지 계산
        seq_length = embeddings.shape[0]
        attention_weights = np.random.softmax(np.random.randn(seq_length, seq_length), axis=1)
        
        # 가중평균으로 새로운 표현 생성
        attended_embeddings = np.dot(attention_weights, embeddings)
        
        return attended_embeddings
    
    def apply_feedforward(self, embeddings: np.ndarray, ff_config: Dict) -> np.ndarray:
        """피드포워드 네트워크 적용"""
        print(f"      {ff_config['input_dim']} → {ff_config['hidden_dim']} → {ff_config['output_dim']} 변환")
        
        # 실제로는 두 개의 선형 변환과 활성화 함수 적용
        # 여기서는 간단히 표현
        processed = embeddings + np.random.randn(*embeddings.shape) * 0.01
        
        return processed
    
    def generate_output_probabilities(self, embeddings: np.ndarray) -> np.ndarray:
        """출력 확률 분포 생성"""
        print("\n4단계: 출력 확률 계산")
        print(f"  마지막 토큰의 임베딩을 {self.vocab_size}개 단어의 확률로 변환")
        
        # 마지막 토큰의 임베딩을 사용 (다음 단어 예측을 위해)
        last_token_embedding = embeddings[-1]
        
        # 어휘 사전 크기만큼의 확률 분포 생성
        logits = np.random.randn(self.vocab_size)
        probabilities = self.softmax(logits)
        
        print(f"  각 단어별 확률 계산 완료 (총 {self.vocab_size}개)")
        
        return probabilities
    
    def softmax(self, x: np.ndarray) -> np.ndarray:
        """소프트맥스 함수로 확률 분포 생성"""
        exp_x = np.exp(x - np.max(x))  # 안정성을 위해 최대값 빼기
        return exp_x / np.sum(exp_x)
    
    def select_next_word(self, probabilities: np.ndarray) -> str:
        """확률 분포에서 다음 단어 선택"""
        print("\n5단계: 다음 단어 선택")
        
        # 가장 확률이 높은 단어 선택 (greedy decoding)
        top_indices = np.argsort(probabilities)[-5:][::-1]  # 상위 5개
        
        # 역방향 어휘 사전 만들기
        reverse_vocab = {v: k for k, v in self.tokenizer["vocab"].items()}
        
        print("  상위 5개 후보:")
        for i, idx in enumerate(top_indices):
            word = reverse_vocab.get(idx, f"단어_{idx}")
            prob = probabilities[idx]
            print(f"    {i+1}위: '{word}' (확률: {prob:.3f})")
        
        # 최고 확률 단어 선택
        best_word = reverse_vocab.get(top_indices[0], f"단어_{top_indices[0]}")
        print(f"  선택된 다음 단어: '{best_word}'")
        
        return best_word

# LLM 내부 구조 데모 실행
def demonstrate_llm_architecture():
    """LLM 내부 구조를 상세히 보여주는 데모"""
    print("=== LLM 내부 구조 데모 ===")
    
    # 작은 크기의 LLM 아키텍처 생성 (데모용)
    llm_arch = DetailedLLMArchitecture(
        vocab_size=1000,    # 실제로는 50,000개 이상
        d_model=256,        # 실제로는 768, 1024, 2048 등
        num_layers=6        # 실제로는 12, 24, 48개 등
    )
    
    # 텍스트 처리 과정 단계별 실행
    result = llm_arch.process_text_step_by_step("CNC 머신 진동")
    
    print(f"\n=== 처리 결과 요약 ===")
    print(f"입력 텍스트: {result['input_text']}")
    print(f"토큰 개수: {len(result['tokens'])}개")
    print(f"처리 단계: {result['processing_steps']}단계")
    print(f"예측된 다음 단어: {result['next_word']}")
    
    print("\n=== LLM 구조 분석 완료 ===")

# 실행해보기
demonstrate_llm_architecture()

2.2 임베딩 모델의 내부 구조 상세 분석

BERT 기반 아키텍처의 이해

대부분의 임베딩 모델은 BERT(Bidirectional Encoder Representations from Transformers)를 기반으로 만들어집니다. BERT의 가장 큰 특징은 '양방향'으로 텍스트를 읽는다는 점입니다. 일반적인 언어 모델이 왼쪽에서 오른쪽으로만 읽는다면, BERT는 문장 전체를 한꺼번에 보면서 앞뒤 문맥을 모두 고려합니다.

예를 들어 "은행에 돈을 맡겼다"와 "강가 은행에 앉았다"에서 같은 '은행'이라는 단어가 완전히 다른 의미로 사용되는 것을 BERT는 앞뒤 문맥을 보고 정확히 구분할 수 있습니다. 이런 양방향 이해 능력 덕분에 BERT는 텍스트의 의미를 더 정확하게 파악해서 좋은 임베딩을 만들 수 있습니다.

 

임베딩 모델의 주요 구성 요소들

  • 인코더 스택 : 입력된 텍스트를 여러 층의 트랜스포머 인코더를 통과시켜 점점 더 깊은 의미를 파악하는 부분입니다. 첫 번째 층에서는 단어 수준의 의미를 파악하고, 나중 층으로 갈수록 문장 전체의 의미나 추상적인 개념을 이해하게 됩니다.
  • 풀링 레이어 : 각 단어별로 만들어진 벡터들을 하나의 문서 레벨 벡터로 합치는 역할을 합니다. 평균을 내거나(Mean Pooling), 가장 중요한 값을 선택하거나(Max Pooling), 특별한 CLS 토큰을 사용하는 등 다양한 방법이 있습니다.
  • 정규화 레이어 : 생성된 벡터의 크기를 일정하게 만들어서 서로 다른 문서들 간의 유사도를 공정하게 비교할 수 있게 해줍니다. 마치 서로 다른 키를 가진 사람들을 비교할 때 비율로 환산하는 것과 같습니다.
  • 대조 학습 헤드 : 비슷한 의미의 문서들은 가까운 벡터를, 다른 의미의 문서들은 먼 벡터를 가지도록 학습시키는 부분입니다. 이를 통해 의미적 유사도가 벡터 거리로 잘 표현되도록 만듭니다.
엠베딩 모델의 내부 구조를 확인하는 코드 예시 >
# 임베딩 모델의 내부 구조를 상세히 보여주는 예시
import numpy as np
from typing import List, Dict, Tuple

class DetailedEmbeddingArchitecture:
    """임베딩 모델의 내부 구조를 단계별로 보여주는 클래스"""
    
    def __init__(self, vocab_size: int = 30000, d_model: int = 768, num_layers: int = 12):
        """
        임베딩 모델 아키텍처 초기화
        
        vocab_size: 어휘 사전 크기
        d_model: 임베딩 차원 크기  
        num_layers: BERT 인코더 레이어 개수
        """
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.num_layers = num_layers
        self.max_sequence_length = 512  # 최대 입력 길이
        
        print(f"임베딩 모델 아키텍처 초기화:")
        print(f"  - 어휘 사전 크기: {vocab_size:,}개")
        print(f"  - 임베딩 차원: {d_model}")
        print(f"  - 인코더 레이어: {num_layers}개")
        print(f"  - 최대 시퀀스 길이: {self.max_sequence_length}")
        
        # 구성 요소들 초기화
        self.tokenizer = self.create_tokenizer()
        self.embedding_layers = self.create_embedding_layers()
        self.encoder_stack = self.create_encoder_stack()
        self.pooling_layer = self.create_pooling_layer()
        self.normalization = self.create_normalization()
    
    def create_tokenizer(self):
        """토크나이저 생성 - 텍스트를 토큰으로 분리"""
        print("\n토크나이저 생성:")
        print("  - 서브워드 기반 토크나이징 사용 (WordPiece)")
        print("  - 특수 토큰: [CLS], [SEP], [PAD], [UNK], [MASK]")
        
        special_tokens = {
            "[CLS]": 101,    # 문장 시작 토큰
            "[SEP]": 102,    # 문장 분리 토큰  
            "[PAD]": 0,      # 패딩 토큰
            "[UNK]": 100,    # 미등록 단어 토큰
            "[MASK]": 103    # 마스킹 토큰
        }
        
        return {
            "special_tokens": special_tokens,
            "vocab_size": self.vocab_size,
            "type": "WordPiece"
        }
    
    def create_embedding_layers(self):
        """임베딩 레이어들 생성"""
        print("\n임베딩 레이어 생성:")
        
        embedding_config = {
            "token_embedding": {
                "size": (self.vocab_size, self.d_model),
                "purpose": "각 토큰을 벡터로 변환"
            },
            "position_embedding": {
                "size": (self.max_sequence_length, self.d_model),
                "purpose": "단어의 위치 정보 추가"
            },
            "segment_embedding": {
                "size": (2, self.d_model),  # 문장 A, B 구분
                "purpose": "여러 문장을 구분하는 정보"
            }
        }
        
        for name, config in embedding_config.items():
            print(f"  - {name}: {config['size']} - {config['purpose']}")
        
        return embedding_config
    
    def create_encoder_stack(self):
        """BERT 인코더 스택 생성"""
        print(f"\n{self.num_layers}개 인코더 레이어 생성:")
        
        encoder_layers = []
        for i in range(self.num_layers):
            layer_config = {
                "layer_id": i + 1,
                "multi_head_attention": {
                    "num_heads": 12,
                    "head_dim": self.d_model // 12,
                    "purpose": "양방향으로 단어 관계 파악"
                },
                "feed_forward": {
                    "hidden_size": self.d_model * 4,
                    "activation": "GELU",
                    "purpose": "비선형 변환으로 복잡한 패턴 학습"
                },
                "layer_norm": {
                    "epsilon": 1e-12,
                    "purpose": "학습 안정성 향상"
                },
                "residual_connection": {
                    "purpose": "기울기 소실 문제 해결"
                }
            }
            encoder_layers.append(layer_config)
            print(f"  레이어 {i+1}: 양방향 어텐션 + 피드포워드")
        
        return encoder_layers
    
    def create_pooling_layer(self):
        """풀링 레이어 생성 - 토큰 벡터들을 문서 벡터로 변환"""
        print("\n풀링 레이어 생성:")
        
        pooling_strategies = {
            "cls_pooling": {
                "method": "[CLS] 토큰의 벡터 사용",
                "advantage": "전체 문서 정보가 집약됨",
                "usage": "BERT 기본 방식"
            },
            "mean_pooling": {
                "method": "모든 토큰 벡터의 평균",
                "advantage": "모든 단어 정보 반영",
                "usage": "일반적인 임베딩 모델"
            },
            "max_pooling": {
                "method": "각 차원별 최대값 선택",
                "advantage": "중요한 특성 강조",
                "usage": "특별한 경우에만 사용"
            }
        }
        
        for name, config in pooling_strategies.items():
            print(f"  {name}:")
            print(f"    방법: {config['method']}")
            print(f"    장점: {config['advantage']}")
        
        return pooling_strategies
    
    def create_normalization(self):
        """정규화 레이어 생성"""
        print("\n정규화 레이어 생성:")
        print("  L2 정규화를 사용하여 벡터 크기를 1로 만듦")
        print("  이를 통해 코사인 유사도 계산이 간단해짐")
        
        return {
            "type": "L2_normalization",
            "purpose": "벡터 크기 통일로 공정한 유사도 비교"
        }
    
    def encode_text_detailed(self, text: str) -> Dict:
        """텍스트를 임베딩으로 변환하는 전체 과정"""
        print(f"\n=== 텍스트 임베딩 과정: '{text}' ===")
        
        # 1단계: 토크나이징
        tokens, attention_mask = self.tokenize_with_special_tokens(text)
        
        # 2단계: 임베딩 변환
        embeddings = self.convert_to_embeddings(tokens)
        
        # 3단계: 인코더 처리
        encoded_embeddings = self.process_through_encoders(embeddings, attention_mask)
        
        # 4단계: 풀링
        document_embedding = self.apply_pooling(encoded_embeddings, attention_mask)
        
        # 5단계: 정규화
        final_embedding = self.normalize_embedding(document_embedding)
        
        return {
            "original_text": text,
            "tokens": tokens,
            "attention_mask": attention_mask,
            "embedding_shape": final_embedding.shape,
            "embedding_norm": np.linalg.norm(final_embedding),
            "processing_stages": 5
        }
    
    def tokenize_with_special_tokens(self, text: str) -> Tuple[List[int], List[int]]:
        """특수 토큰을 포함한 토크나이징"""
        print("\n1단계: 토크나이징 및 특수 토큰 추가")
        print(f"  원본 텍스트: '{text}'")
        
        # 간단한 토크나이징 (실제로는 WordPiece 사용)
        words = text.split()
        special_tokens = self.tokenizer["special_tokens"]
        
        # [CLS] + 토큰들 + [SEP] 형식으로 구성
        tokens = [special_tokens["[CLS]"]]
        
        print("  토큰 변환 과정:")
        print(f"    시작: [CLS]({special_tokens['[CLS]']})")
        
        for word in words:
            # 실제로는 서브워드로 분할되지만 여기서는 간단히 처리
            if word.lower() in ["cnc", "머신", "진동", "온도", "문제"]:
                token_id = hash(word) % 1000 + 200  # 간단한 해싱
                tokens.append(token_id)
                print(f"    '{word}' → {token_id}")
            else:
                tokens.append(special_tokens["[UNK]"])
                print(f"    '{word}' → [UNK]({special_tokens['[UNK]']})")
        
        tokens.append(special_tokens["[SEP]"])
        print(f"    끝: [SEP]({special_tokens['[SEP]']})")
        
        # 어텐션 마스크 생성 (실제 토큰은 1, 패딩은 0)
        attention_mask = [1] * len(tokens)
        
        # 최대 길이에 맞춰 패딩
        while len(tokens) < 20:  # 예시를 위해 20으로 제한
            tokens.append(special_tokens["[PAD]"])
            attention_mask.append(0)
        
        print(f"  최종 토큰 수: {len(tokens)}개")
        print(f"  실제 토큰: {sum(attention_mask)}개, 패딩: {len(tokens) - sum(attention_mask)}개")
        
        return tokens, attention_mask
    
    def convert_to_embeddings(self, tokens: List[int]) -> np.ndarray:
        """토큰들을 임베딩 벡터로 변환"""
        print("\n2단계: 토큰을 임베딩 벡터로 변환")
        
        token_embeddings = []
        for i, token_id in enumerate(tokens):
            # 토큰 임베딩 (실제로는 학습된 가중치 행렬에서 조회)
            token_embed = np.random.randn(self.d_model) * 0.1
            
            # 위치 임베딩 추가
            position_embed = np.random.randn(self.d_model) * 0.05
            
            # 세그먼트 임베딩 추가 (모든 토큰이 같은 문장이므로 0)
            segment_embed = np.random.randn(self.d_model) * 0.02
            
            # 세 임베딩 합치기
            combined_embed = token_embed + position_embed + segment_embed
            token_embeddings.append(combined_embed)
            
            if i < 5:  # 처음 5개만 출력
                print(f"  토큰 {i+1}: ID {token_id} → {self.d_model}차원 벡터")
        
        embeddings_array = np.array(token_embeddings)
        print(f"  전체 임베딩 형태: {embeddings_array.shape}")
        
        return embeddings_array
    
    def process_through_encoders(self, embeddings: np.ndarray, attention_mask: List[int]) -> np.ndarray:
        """인코더 스택을 통과하며 임베딩 처리"""
        print(f"\n3단계: {self.num_layers}개 인코더 레이어 처리")
        
        current_embeddings = embeddings.copy()
        
        for i, encoder_config in enumerate(self.encoder_stack):
            print(f"  인코더 레이어 {i+1}/{self.num_layers}")
            
            # 멀티헤드 어텐션 적용
            attention_output = self.apply_bidirectional_attention(
                current_embeddings, attention_mask, encoder_config["multi_head_attention"]
            )
            
            # 잔차 연결 및 레이어 정규화
            attention_output = current_embeddings + attention_output  # 잔차 연결
            attention_output = self.layer_normalize(attention_output)
            
            # 피드포워드 네트워크 적용
            ff_output = self.apply_feed_forward(attention_output, encoder_config["feed_forward"])
            
            # 다시 잔차 연결 및 정규화
            current_embeddings = attention_output + ff_output
            current_embeddings = self.layer_normalize(current_embeddings)
            
            print(f"    → 양방향 어텐션 + 피드포워드 처리 완료")
        
        print(f"  최종 인코딩 완료: {current_embeddings.shape}")
        return current_embeddings
    
    def apply_bidirectional_attention(self, embeddings: np.ndarray, attention_mask: List[int], 
                                    attention_config: Dict) -> np.ndarray:
        """양방향 어텐션 적용"""
        seq_length, embed_dim = embeddings.shape
        num_heads = attention_config["num_heads"]
        
        print(f"    양방향 어텐션: {num_heads}개 헤드로 전체 시퀀스 관계 분석")
        
        # 실제로는 복잡한 멀티헤드 어텐션 계산
        # 여기서는 간단히 시뮬레이션
        
        # 어텐션 가중치 계산 (마스킹 적용)
        attention_weights = np.random.randn(seq_length, seq_length)
        
        # 패딩된 위치는 어텐션에서 제외
        for i in range(seq_length):
            for j in range(seq_length):
                if attention_mask[i] == 0 or attention_mask[j] == 0:
                    attention_weights[i][j] = -np.inf
        
        # 소프트맥스로 정규화
        attention_weights = self.softmax_2d(attention_weights)
        
        # 가중 합으로 새로운 표현 생성
        attended_embeddings = np.dot(attention_weights, embeddings)
        
        return attended_embeddings
    
    def softmax_2d(self, x: np.ndarray) -> np.ndarray:
        """2D 배열에 소프트맥스 적용"""
        result = np.zeros_like(x)
        for i in range(x.shape[0]):
            row = x[i]
            exp_row = np.exp(row - np.max(row))
            result[i] = exp_row / np.sum(exp_row)
        return result
    
    def layer_normalize(self, x: np.ndarray) -> np.ndarray:
        """레이어 정규화 적용"""
        # 각 토큰별로 정규화 (평균 0, 분산 1)
        epsilon = 1e-12
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        normalized = (x - mean) / np.sqrt(var + epsilon)
        return normalized
    
    def apply_feed_forward(self, embeddings: np.ndarray, ff_config: Dict) -> np.ndarray:
        """피드포워드 네트워크 적용"""
        hidden_size = ff_config["hidden_size"]
        
        print(f"    피드포워드: {self.d_model} → {hidden_size} → {self.d_model}")
        
        # 실제로는 두 개의 선형 변환과 GELU 활성화 함수
        # 여기서는 간단히 시뮬레이션
        hidden = np.dot(embeddings, np.random.randn(self.d_model, hidden_size)) * 0.1
        
        # GELU 활성화 함수 (간단히 표현)
        activated = hidden * (1 + np.tanh(np.sqrt(2/np.pi) * (hidden + 0.044715 * hidden**3)))
        
        # 다시 원래 차원으로
        output = np.dot(activated, np.random.randn(hidden_size, self.d_model)) * 0.1
        
        return output
    
    def apply_pooling(self, embeddings: np.ndarray, attention_mask: List[int]) -> np.ndarray:
        """풀링을 통해 문서 레벨 임베딩 생성"""
        print("\n4단계: 풀링으로 문서 임베딩 생성")
        
        # Mean Pooling 사용 (패딩 토큰 제외)
        print("  Mean Pooling 적용 (패딩 토큰 제외한 평균)")
        
        # 마스킹된 토큰들만 사용
        valid_embeddings = []
        for i, mask_value in enumerate(attention_mask):
            if mask_value == 1:  # 실제 토큰
                valid_embeddings.append(embeddings[i])
        
        if valid_embeddings:
            document_embedding = np.mean(valid_embeddings, axis=0)
            print(f"  유효한 토큰 {len(valid_embeddings)}개의 평균으로 문서 임베딩 생성")
        else:
            document_embedding = np.zeros(self.d_model)
            print("  유효한 토큰이 없어 0 벡터 생성")
        
        print(f"  문서 임베딩 크기: {document_embedding.shape}")
        
        return document_embedding
    
    def normalize_embedding(self, embedding: np.ndarray) -> np.ndarray:
        """임베딩 벡터 정규화"""
        print("\n5단계: L2 정규화")
        
        norm = np.linalg.norm(embedding)
        print(f"  정규화 전 벡터 크기: {norm:.3f}")
        
        if norm > 0:
            normalized = embedding / norm
            new_norm = np.linalg.norm(normalized)
            print(f"  정규화 후 벡터 크기: {new_norm:.3f}")
        else:
            normalized = embedding
            print("  영벡터이므로 정규화 생략")
        
        return normalized
    
    def compare_embeddings(self, text1: str, text2: str) -> float:
        """두 텍스트의 임베딩 유사도 비교"""
        print(f"\n=== 임베딩 유사도 비교 ===")
        print(f"텍스트 1: '{text1}'")
        print(f"텍스트 2: '{text2}'")
        
        # 각각 임베딩 생성
        result1 = self.encode_text_detailed(text1)
        embedding1 = result1["embedding_shape"]  # 실제로는 벡터 반환
        
        result2 = self.encode_text_detailed(text2)
        embedding2 = result2["embedding_shape"]  # 실제로는 벡터 반환
        
        # 실제 계산을 위해 랜덤 임베딩 생성
        emb1 = np.random.randn(self.d_model)
        emb1 = emb1 / np.linalg.norm(emb1)
        
        emb2 = np.random.randn(self.d_model)
        emb2 = emb2 / np.linalg.norm(emb2)
        
        # 코사인 유사도 계산
        similarity = np.dot(emb1, emb2)
        
        print(f"\n코사인 유사도: {similarity:.3f}")
        if similarity > 0.8:
            print("→ 매우 유사한 의미")
        elif similarity > 0.6:
            print("→ 어느 정도 유사한 의미")
        elif similarity > 0.3:
            print("→ 부분적으로 관련있는 의미")
        else:
            print("→ 다른 의미")
        
        return similarity

# 임베딩 모델 내부 구조 데모
def demonstrate_embedding_architecture():
    """임베딩 모델 내부 구조를 상세히 보여주는 데모"""
    print("=== 임베딩 모델 내부 구조 데모 ===")
    
    # 임베딩 모델 아키텍처 생성
    embedding_arch = DetailedEmbeddingArchitecture(
        vocab_size=5000,    # 실제로는 30,000개 이상
        d_model=256,        # 실제로는 768, 1024 등
        num_layers=6        # 실제로는 12개 이상
    )
    
    # 텍스트 임베딩 생성 과정 시연
    result = embedding_arch.encode_text_detailed("CNC 머신에서 진동이 발생")
    
    print(f"\n=== 처리 결과 ===")
    print(f"원본 텍스트: {result['original_text']}")
    print(f"토큰 개수: {len(result['tokens'])}개")
    print(f"실제 토큰: {sum(result['attention_mask'])}개")
    print(f"임베딩 벡터 크기: {result['embedding_norm']:.3f}")
    
    # 유사도 비교 데모
    similarity = embedding_arch.compare_embeddings(
        "CNC 머신 진동 문제",
        "기계에서 소음 발생"
    )
    
    print("\n=== 임베딩 모델 분석 완료 ===")

# 실행해보기
demonstrate_embedding_architecture()

2.3 LLM모델과 임베딩모델간 데이터 인터페이스 

LLM 모델과 임베딩 모델 간의 구성요소 및 인터페이스 연계를 보여주는 포괄적인 프레임워크 다이어그램을 통해  LLM 모델과 임베딩 모델간의 연관관계를 살펴보겠습니다.

graph TB
    %% 입력 데이터 계층
    subgraph "입력 계층 (Input Layer)"
        UD[사용자 데이터<br/>User Data]
        TXT[텍스트 입력<br/>Text Input]
        DOC[문서/지식베이스<br/>Documents/KB]
    end

    %% LLM 모델 구성요소
    subgraph "LLM 모델 구성요소 (LLM Components)"
        TOK[토크나이저<br/>Tokenizer]
        EMB_L[임베딩 레이어<br/>Embedding Layer]
        POS[위치 인코딩<br/>Position Encoding]
        
        subgraph "트랜스포머 블록 (Transformer Blocks)"
            ATT[어텐션 메커니즘<br/>Multi-Head Attention]
            FFN[피드포워드 네트워크<br/>Feed Forward Network]
            NORM[정규화<br/>Layer Normalization]
        end
        
        OUT[출력 레이어<br/>Output Layer]
        DEC[디코더<br/>Decoder]
    end

    %% 임베딩 모델 구성요소
    subgraph "임베딩 모델 구성요소 (Embedding Model Components)"
        ENC[텍스트 인코더<br/>Text Encoder]
        VEC_GEN[벡터 생성기<br/>Vector Generator]
        
        subgraph "벡터 저장소 (Vector Storage)"
            VDB[벡터 데이터베이스<br/>Vector Database]
            IDX[인덱스<br/>Index]
            META[메타데이터<br/>Metadata]
        end
        
        subgraph "검색 엔진 (Retrieval Engine)"
            SIM[유사도 계산<br/>Similarity Computation]
            RET[검색기<br/>Retriever]
            RANK[랭킹<br/>Ranking]
        end
    end

    %% 인터페이스 및 연결
    subgraph "인터페이스 계층 (Interface Layer)"
        API[API 게이트웨이<br/>API Gateway]
        PROC[전처리기<br/>Preprocessor]
        POST[후처리기<br/>Postprocessor]
    end

    %% 출력 계층
    subgraph "출력 계층 (Output Layer)"
        RES[응답 생성<br/>Response Generation]
        AUG[증강된 응답<br/>Augmented Response]
        FINAL[최종 출력<br/>Final Output]
    end

    %% 데이터 흐름 연결
    UD --> PROC
    TXT --> TOK
    DOC --> ENC
    
    %% LLM 내부 흐름
    TOK --> EMB_L
    EMB_L --> POS
    POS --> ATT
    ATT --> FFN
    FFN --> NORM
    NORM --> OUT
    OUT --> DEC
    
    %% 임베딩 모델 내부 흐름
    ENC --> VEC_GEN
    VEC_GEN --> VDB
    VDB --> IDX
    IDX --> SIM
    SIM --> RET
    RET --> RANK
    
    %% 인터페이스 연결
    PROC --> API
    API --> EMB_L
    API --> ENC
    
    %% 검색-생성 연계
    RANK --> DEC
    DEC --> POST
    POST --> RES
    RES --> AUG
    AUG --> FINAL
    
    %% 양방향 데이터 교환
    EMB_L -.-> VEC_GEN
    VEC_GEN -.-> EMB_L
    ATT -.-> SIM
    SIM -.-> ATT
    
    %% 스타일링
    classDef inputClass fill:#e1f5fe
    classDef llmClass fill:#fff3e0
    classDef embeddingClass fill:#f3e5f5
    classDef interfaceClass fill:#e8f5e8
    classDef outputClass fill:#fce4ec
    
    class UD,TXT,DOC inputClass
    class TOK,EMB_L,POS,ATT,FFN,NORM,OUT,DEC llmClass
    class ENC,VEC_GEN,VDB,IDX,META,SIM,RET,RANK embeddingClass
    class API,PROC,POST interfaceClass
    class RES,AUG,FINAL outputClass

 

이 프레임워크 다이어그램은 LLM과 임베딩 모델 간의 복합적인 연계 구조를 보여줍니다. 주요 구성요소와 데이터 인터페이스를 설명드리겠습니다:

 

임베딩 공간 공유

  • LLM의 임베딩 레이어와 임베딩 모델의 벡터 생성기가 동일한 차원 공간에서 작동됩니다.
  • 벡터 표현의 일관성 유지를 통한 의미적 정합성 확보합니다.

검색-생성 연계

  • 임베딩 모델의 검색 결과가 LLM의 디코더에 컨텍스트로 주입됩니다.
  • 어텐션 메커니즘이 검색된 정보와 생성 과정을 동적으로 연결합니다.

양방향 정보 교환

  • LLM의 어텐션 가중치가 임베딩 모델의 유사도 계산에 피드백으로 작용합니다.
  • 생성 과정에서 발생하는 표현이 벡터 데이터베이스 업데이트에 활용됩니다.

프레임워크 특징

  • 계층적 구조 : 입력 → 처리 → 인터페이스 → 출력의 명확한 데이터 파이프라인으로 계층적 구조를 가지고 있습니다.
  • 모듈러 설계 : 각 구성요소가 독립적으로 최적화 가능하면서도 유기적 연결되어 있습니다.
  • 실시간 상호작용 : API 게이트웨이를 통한 동적 요청-응답 처리를 통한 실시간 상호작용이 가능합니다.

이러한 아키텍처는 RAG(Retrieval-Augmented Generation) 시스템의 핵심 구조를 형성하며, 지식 검색과 자연어 생성을 효율적으로 통합합니다.


3. RAG : 두 모델이 협력하는 방법

3.1 RAG(Retrieval-Augmented Generation) 시스템의 완전한 이해

RAG가 필요한 이유와 동작 원리

RAG는 검색(Retrieval)과 생성(Generation)을 결합한 하이브리드 AI 시스템입니다.

이 시스템이 필요한 이유는 LLM 혼자로는 한계가 있기 때문입니다. LLM은 학습된 일반적인 지식은 잘 알고 있지만, 특정 회사의 매뉴얼이나 최신 정보, 개인적인 문서 같은 특화된 정보는 모릅니다. 마치 박학다식한 교수님이 모든 일반 지식은 알고 있지만, 특정 회사의 내부 규정은 모르는 것과 같습니다.

RAG 시스템은 이 문제를 해결하기 위해 먼저 관련 문서를 찾아서 읽어보고(Retrieval), 그 내용을 바탕으로 답변을 만드는(Generation) 방식으로 작동합니다. 이는 사람이 어려운 질문을 받았을 때 관련 자료를 찾아보고 그것을 참고해서 답변하는 과정과 매우 유사합니다.

RAG의 3단계 세부 과정

  • 1단계 - 검색(Retrieval) : 사용자의 질문을 임베딩 모델로 벡터화하고, 미리 구축된 문서 데이터베이스에서 가장 관련성이 높은 문서들을 찾아옵니다. 이 과정에서 벡터 유사도 계산을 통해 의미적으로 유사한 내용을 빠르게 검색할 수 있습니다.
  • 2단계 - 증강(Augmentation) : 검색된 문서들을 LLM이 이해할 수 있는 형태의 컨텍스트로 정리하고 구성합니다. 여러 문서의 내용을 종합하고, 질문과 가장 관련 있는 부분을 강조하여 LLM에게 제공할 정보를 준비합니다.
  • 3단계 - 생성(Generation) : LLM이 제공된 컨텍스트와 사용자 질문을 함께 분석해서 정확하고 유용한 답변을 생성합니다. 이때 LLM은 자신의 일반 지식과 제공된 특화 정보를 모두 활용해서 종합적인 답변을 만듭니다.
RAG 시스템의 완전하고 상세한 구현 코드 예시 >
# RAG 시스템의 완전하고 상세한 구현
import numpy as np
import json
from datetime import datetime
from typing import List, Dict, Tuple, Optional
import sqlite3
from dataclasses import dataclass

@dataclass
class Document:
    """문서 정보를 담는 데이터 클래스"""
    id: str                    # 문서 고유 ID
    title: str                 # 문서 제목
    content: str               # 문서 내용
    metadata: Dict             # 추가 정보 (작성일, 카테고리 등)
    embedding: Optional[np.ndarray] = None  # 임베딩 벡터

class VectorDatabase:
    """벡터 데이터베이스 - 문서들과 임베딩을 저장하고 검색"""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.documents = {}  # 메모리에 문서 저장
        self.embeddings = {}  # 메모리에 임베딩 저장
        self.setup_database()
        
        print(f"벡터 데이터베이스 초기화: {db_path}")
        print("이 데이터베이스는 문서와 임베딩을 저장하여 빠른 검색을 가능하게 합니다.")
    
    def setup_database(self):
        """데이터베이스 테이블 생성"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 문서 메타데이터 테이블
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                title TEXT,
                content TEXT,
                metadata TEXT,
                created_at TEXT,
                embedding_dimension INTEGER
            )
        ''')
        
        conn.commit()
        conn.close()
        print("데이터베이스 테이블 준비 완료")
    
    def add_document(self, document: Document, embedding: np.ndarray):
        """문서와 임베딩을 데이터베이스에 추가"""
        print(f"\n문서 추가 중: '{document.title}'")
        print(f"  문서 ID: {document.id}")
        print(f"  내용 길이: {len(document.content)}자")
        print(f"  임베딩 차원: {embedding.shape[0]}")
        
        # 메모리에 저장
        self.documents[document.id] = document
        self.embeddings[document.id] = embedding
        
        # 데이터베이스에 메타데이터 저장
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT OR REPLACE INTO documents 
            (id, title, content, metadata, created_at, embedding_dimension)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            document.id,
            document.title,
            document.content,
            json.dumps(document.metadata),
            datetime.now().isoformat(),
            embedding.shape[0]
        ))
        
        conn.commit()
        conn.close()
        
        print(f"  → 저장 완료")
    
    def search_similar_documents(self, query_embedding: np.ndarray, 
                               top_k: int = 5) -> List[Tuple[Document, float]]:
        """쿼리 임베딩과 유사한 문서들을 검색"""
        print(f"\n유사 문서 검색 시작 (상위 {top_k}개)")
        print(f"검색 대상: {len(self.documents)}개 문서")
        
        similarities = []
        
        # 모든 문서와 유사도 계산
        for doc_id, doc_embedding in self.embeddings.items():
            # 코사인 유사도 계산
            similarity = self.calculate_cosine_similarity(query_embedding, doc_embedding)
            document = self.documents[doc_id]
            similarities.append((document, similarity))
            
            print(f"  문서 '{document.title}': 유사도 {similarity:.3f}")
        
        # 유사도 높은 순으로 정렬
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # 상위 k개 반환
        top_results = similarities[:top_k]
        
        print(f"\n검색 완료 - 상위 {len(top_results)}개 문서:")
        for i, (doc, score) in enumerate(top_results):
            print(f"  {i+1}위: '{doc.title}' (유사도: {score:.3f})")
        
        return top_results
    
    def calculate_cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """두 벡터 간의 코사인 유사도 계산"""
        # 벡터의 내적을 계산
        dot_product = np.dot(vec1, vec2)
        
        # 각 벡터의 크기 계산
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        
        # 코사인 유사도 = 내적 / (크기1 * 크기2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        
        similarity = dot_product / (norm1 * norm2)
        return similarity
    
    def get_database_stats(self) -> Dict:
        """데이터베이스 통계 정보"""
        return {
            "total_documents": len(self.documents),
            "embedding_dimension": list(self.embeddings.values())[0].shape[0] if self.embeddings else 0,
            "storage_size": f"{len(str(self.documents)) / 1024:.1f} KB"
        }

class ContextBuilder:
    """검색된 문서들로부터 LLM용 컨텍스트를 구성"""
    
    def __init__(self, max_context_length: int = 4000):
        self.max_context_length = max_context_length
        print(f"컨텍스트 빌더 초기화 - 최대 길이: {max_context_length}자")
    
    def build_context(self, query: str, retrieved_docs: List[Tuple[Document, float]]) -> str:
        """검색된 문서들로부터 구조화된 컨텍스트 생성"""
        print(f"\n컨텍스트 구성 시작")
        print(f"질문: '{query}'")
        print(f"참조 문서: {len(retrieved_docs)}개")
        
        context_parts = []
        current_length = 0
        
        # 컨텍스트 헤더 추가
        header = f"다음은 '{query}' 질문과 관련된 참고 자료입니다:\n\n"
        context_parts.append(header)
        current_length += len(header)
        
        # 각 문서를 컨텍스트에 추가
        for i, (document, similarity) in enumerate(retrieved_docs):
            print(f"  문서 {i+1} 처리 중: '{document.title}'")
            
            # 문서 정보 포맷팅
            doc_section = self.format_document_section(document, similarity, i+1)
            
            # 길이 제한 확인
            if current_length + len(doc_section) > self.max_context_length:
                print(f"    → 길이 제한으로 인해 여기서 중단 ({current_length}자)")
                break
            
            context_parts.append(doc_section)
            current_length += len(doc_section)
            print(f"    → 추가됨 (현재 길이: {current_length}자)")
        
        # 컨텍스트 푸터 추가
        footer = f"\n위 자료를 참고하여 다음 질문에 정확하고 도움이 되는 답변을 해주세요:\n질문: {query}\n답변:"
        context_parts.append(footer)
        
        full_context = "".join(context_parts)
        
        print(f"\n컨텍스트 구성 완료:")
        print(f"  최종 길이: {len(full_context)}자")
        print(f"  포함된 문서: {len([p for p in context_parts if '참고자료' in p])}개")
        
        return full_context
    
    def format_document_section(self, document: Document, similarity: float, section_num: int) -> str:
        """개별 문서를 컨텍스트 섹션으로 포맷팅"""
        # 문서 내용이 너무 길면 요약
        content = document.content
        if len(content) > 500:
            content = content[:500] + "..."
        
        section = f"""
===== 참고자료 {section_num} =====
제목: {document.title}
관련도: {similarity:.1%}
내용: {content}

"""
        return section
    
    def extract_key_sentences(self, text: str, query: str, max_sentences: int = 3) -> str:
        """텍스트에서 질문과 가장 관련있는 핵심 문장들 추출"""
        sentences = text.split('. ')
        query_words = query.lower().split()
        
        # 각 문장의 관련성 점수 계산
        sentence_scores = []
        for sentence in sentences:
            score = 0
            sentence_lower = sentence.lower()
            for word in query_words:
                if word in sentence_lower:
                    score += 1
            sentence_scores.append((sentence, score))
        
        # 점수 높은 순으로 정렬하여 상위 문장들 선택
        sentence_scores.sort(key=lambda x: x[1], reverse=True)
        selected_sentences = [s[0] for s in sentence_scores[:max_sentences]]
        
        return '. '.join(selected_sentences)

class ComprehensiveRAGSystem:
    """완전한 RAG 시스템 - 모든 구성 요소 통합"""
    
    def __init__(self, vector_db_path: str = "./rag_database.db"):
        print("=== RAG 시스템 초기화 ===")
        
        # 구성 요소들 초기화
        self.embedding_model = self.initialize_embedding_model()
        self.llm_model = self.initialize_llm_model()
        self.vector_db = VectorDatabase(vector_db_path)
        self.context_builder = ContextBuilder(max_context_length=4000)
        
        # 시스템 설정
        self.retrieval_top_k = 5
        self.min_similarity_threshold = 0.3
        
        print("RAG 시스템 초기화 완료!")
        print(f"  검색 개수: 상위 {self.retrieval_top_k}개")
        print(f"  유사도 임계값: {self.min_similarity_threshold}")
    
    def initialize_embedding_model(self):
        """임베딩 모델 초기화"""
        print("임베딩 모델 로딩 중...")
        # 실제로는 복잡한 모델 로딩 과정
        return {
            "model_name": "bge-m3",
            "dimension": 1024,
            "max_sequence_length": 512
        }
    
    def initialize_llm_model(self):
        """LLM 모델 초기화"""
        print("LLM 모델 로딩 중...")
        # 실제로는 복잡한 모델 로딩 과정
        return {
            "model_name": "llama-3.3-70b",
            "max_context_length": 8192,
            "temperature": 0.7
        }
    
    def add_knowledge_documents(self, documents: List[Document]):
        """지식 베이스에 문서들 추가"""
        print(f"\n=== 지식 베이스 구축 ===")
        print(f"추가할 문서: {len(documents)}개")
        
        for i, document in enumerate(documents):
            print(f"\n문서 {i+1}/{len(documents)} 처리 중:")
            
            # 문서를 임베딩으로 변환
            embedding = self.create_document_embedding(document.content)
            
            # 데이터베이스에 저장
            self.vector_db.add_document(document, embedding)
        
        # 통계 정보 출력
        stats = self.vector_db.get_database_stats()
        print(f"\n지식 베이스 구축 완료:")
        print(f"  총 문서 수: {stats['total_documents']}개")
        print(f"  임베딩 차원: {stats['embedding_dimension']}")
        print(f"  저장 크기: {stats['storage_size']}")
    
    def create_document_embedding(self, text: str) -> np.ndarray:
        """텍스트를 임베딩 벡터로 변환"""
        print(f"    임베딩 생성 중... (텍스트 길이: {len(text)}자)")
        
        # 실제로는 복잡한 임베딩 모델 처리
        # 여기서는 시뮬레이션을 위해 랜덤 벡터 생성
        embedding_dim = self.embedding_model["dimension"]
        
        # 텍스트 길이와 내용에 따라 다른 임베딩 생성 (시뮬레이션)
        np.random.seed(hash(text) % 2**32)  # 같은 텍스트는 같은 임베딩
        embedding = np.random.randn(embedding_dim)
        
        # 정규화
        embedding = embedding / np.linalg.norm(embedding)
        
        print(f"    → 임베딩 생성 완료 ({embedding_dim}차원)")
        return embedding
    
    def answer_question(self, question: str) -> Dict:
        """사용자 질문에 대한 RAG 기반 답변 생성"""
        print(f"\n=== RAG 질답 시작 ===")
        print(f"질문: '{question}'")
        
        start_time = datetime.now()
        
        # 1단계: 질문을 임베딩으로 변환
        print(f"\n1단계: 질문 임베딩 변환")
        query_embedding = self.create_document_embedding(question)
        print(f"  질문 임베딩 생성 완료")
        
        # 2단계: 관련 문서 검색
        print(f"\n2단계: 관련 문서 검색")
        retrieved_docs = self.vector_db.search_similar_documents(
            query_embedding, self.retrieval_top_k
        )
        
        # 유사도 임계값 필터링
        filtered_docs = [
            (doc, score) for doc, score in retrieved_docs 
            if score >= self.min_similarity_threshold
        ]
        
        if not filtered_docs:
            print(f"  경고: 유사도 {self.min_similarity_threshold} 이상인 문서가 없습니다.")
            return self.handle_no_relevant_documents(question)
        
        print(f"  → {len(filtered_docs)}개 문서가 임계값을 통과했습니다.")
        
        # 3단계: 컨텍스트 구성
        print(f"\n3단계: 컨텍스트 구성")
        context = self.context_builder.build_context(question, filtered_docs)
        
        # 4단계: LLM으로 답변 생성
        print(f"\n4단계: LLM 답변 생성")
        response = self.generate_llm_response(context)
        
        # 5단계: 결과 정리
        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()
        
        result = {
            "question": question,
            "answer": response,
            "retrieved_documents": [
                {
                    "title": doc.title,
                    "similarity": score,
                    "content_preview": doc.content[:200] + "...",
                    "metadata": doc.metadata
                }
                for doc, score in filtered_docs
            ],
            "context_length": len(context),
            "processing_time": processing_time,
            "retrieval_count": len(filtered_docs)
        }
        
        print(f"\n=== RAG 답변 완료 ===")
        print(f"처리 시간: {processing_time:.2f}초")
        print(f"참조 문서: {len(filtered_docs)}개")
        print(f"컨텍스트 길이: {len(context)}자")
        
        return result
    
    def generate_llm_response(self, context: str) -> str:
        """LLM을 사용해서 컨텍스트 기반 답변 생성"""
        print(f"  LLM 모델에 컨텍스트 전달 중...")
        print(f"  컨텍스트 길이: {len(context)}자")
        
        # 실제로는 복잡한 LLM 추론 과정
        # 여기서는 시뮬레이션된 답변 생성
        
        # 컨텍스트에서 키워드 추출
        keywords = self.extract_keywords_from_context(context)
        
        # 시뮬레이션된 답변 생성
        if "CNC" in keywords and "진동" in keywords:
            response = """CNC 머신에서 진동이 발생하는 주요 원인들을 참고 자료를 바탕으로 설명드리겠습니다.

**주요 원인:**
1. **주축 베어링 마모**: 장시간 사용으로 인한 베어링 마모가 가장 일반적인 원인입니다.
2. **공구 불균형**: 공구가 제대로 고정되지 않았거나 마모된 경우 진동이 발생할 수 있습니다.
3. **기계적 느슨함**: 볼트나 고정 부품이 느슨해진 경우입니다.

**해결 방법:**
- 정기적인 베어링 점검 및 교체
- 공구 상태 확인 및 적절한 고정
- 진동 주파수 분석을 통한 정확한 원인 파악

참고 자료에 따르면 진동 수치가 0.5mm/s를 초과할 경우 즉시 점검이 필요합니다."""

        elif "알루미늄" in keywords and "가공" in keywords:
            response = """알루미늄 가공에 대한 최적 조건을 참고 자료를 바탕으로 안내드리겠습니다.

**권장 가공 조건:**
- 절삭 속도: 150-300 m/min
- 이송률: 0.1-0.3 mm/rev
- 절삭 깊이: 1-5 mm

**주의사항:**
- 충분한 절삭유 공급 필수
- 카바이드 공구 사용 권장
- 칩 제거를 위한 적절한 배출 시스템 필요

참고 자료에서 제시된 조건을 따르면 우수한 표면 조도와 공구 수명을 확보할 수 있습니다."""

        else:
            response = f"제공된 참고 자료를 바탕으로 답변을 드리겠습니다. 키워드: {', '.join(keywords[:5])}"
        
        print(f"  → LLM 답변 생성 완료 (길이: {len(response)}자)")
        return response
    
    def extract_keywords_from_context(self, context: str) -> List[str]:
        """컨텍스트에서 핵심 키워드 추출"""
        # 간단한 키워드 추출 (실제로는 더 정교한 NLP 기법 사용)
        keywords = []
        common_terms = ["CNC", "진동", "알루미늄", "가공", "온도", "주축", "베어링", "공구", "절삭"]
        
        for term in common_terms:
            if term in context:
                keywords.append(term)
        
        return keywords
    
    def handle_no_relevant_documents(self, question: str) -> Dict:
        """관련 문서가 없는 경우 처리"""
        print("  관련 문서를 찾을 수 없습니다.")
        
        fallback_response = f"""죄송합니다. '{question}'에 대한 구체적인 정보를 찾을 수 없습니다.

현재 지식 베이스에 관련 문서가 없거나 질문과의 유사도가 낮습니다.

**제안사항:**
1. 더 구체적인 키워드를 사용해서 다시 질문해 보세요
2. 관련 문서를 지식 베이스에 추가해 주세요
3. 질문을 다른 방식으로 표현해 보세요

현재 지식 베이스에는 {self.vector_db.get_database_stats()['total_documents']}개의 문서가 있습니다."""

        return {
            "question": question,
            "answer": fallback_response,
            "retrieved_documents": [],
            "context_length": 0,
            "processing_time": 0.1,
            "retrieval_count": 0,
            "status": "no_relevant_documents"
        }
    
    def get_system_status(self) -> Dict:
        """시스템 상태 정보 조회"""
        db_stats = self.vector_db.get_database_stats()
        
        return {
            "embedding_model": self.embedding_model["model_name"],
            "llm_model": self.llm_model["model_name"],
            "knowledge_base": {
                "document_count": db_stats["total_documents"],
                "embedding_dimension": db_stats["embedding_dimension"],
                "storage_size": db_stats["storage_size"]
            },
            "search_settings": {
                "top_k": self.retrieval_top_k,
                "similarity_threshold": self.min_similarity_threshold
            },
            "system_health": "operational" if db_stats["total_documents"] > 0 else "needs_documents"
        }

# RAG 시스템 데모 및 실습
def comprehensive_rag_demo():
    """RAG 시스템의 완전한 동작 과정을 보여주는 데모"""
    print("=== 포괄적 RAG 시스템 데모 ===")
    
    # RAG 시스템 초기화
    rag_system = ComprehensiveRAGSystem("./demo_rag.db")
    
    # 제조업 관련 샘플 문서들 준비
    sample_documents = [
        Document(
            id="cnc_vibration_001",
            title="CNC 머신 진동 원인 및 해결방법",
            content="""CNC 머신에서 발생하는 진동의 주요 원인은 다음과 같습니다. 
            첫째, 주축 베어링의 마모로 인한 진동입니다. 베어링이 마모되면 회전 시 불균등한 힘이 발생하여 진동을 유발합니다.
            둘째, 공구의 불균형이나 마모로 인한 진동입니다. 공구가 적절히 고정되지 않거나 날이 마모되면 절삭 시 진동이 발생합니다.
            셋째, 기계 구조물의 느슨함입니다. 볼트나 고정 부품이 느슨해지면 전체적인 진동이 증가합니다.
            해결방법으로는 정기적인 베어링 점검, 공구 상태 확인, 구조물 점검이 필요합니다.
            진동 측정값이 0.5mm/s를 초과하면 즉시 점검해야 합니다.""",
            metadata={"category": "maintenance", "equipment": "CNC", "priority": "high"}
        ),
        
        Document(
            id="aluminum_machining_002", 
            title="알루미늄 CNC 가공 최적 조건",
            content="""알루미늄 합금의 CNC 가공에서 최적의 결과를 얻기 위한 조건들을 설명합니다.
            절삭 속도는 재질에 따라 다르지만 일반적으로 Al6061의 경우 150-300m/min이 적합합니다.
            이송률은 0.1-0.3mm/rev 범위에서 설정하며, 표면 조도에 따라 조절합니다.
            절삭 깊이는 roughing에서 3-5mm, finishing에서 0.2-0.5mm를 권장합니다.
            절삭유는 반드시 충분히 공급해야 하며, 에멀젼 타입을 사용하는 것이 좋습니다.
            공구는 초경 카바이드를 사용하며, 코팅된 공구를 선택하면 수명이 연장됩니다.
            칩 처리가 중요하므로 적절한 배출 시스템을 갖춰야 합니다.""",
            metadata={"category": "machining", "material": "aluminum", "priority": "medium"}
        ),
        
        Document(
            id="temperature_monitoring_003",
            title="CNC 머신 온도 모니터링 가이드", 
            content="""CNC 머신의 온도 관리는 정밀도와 공구 수명에 직접적인 영향을 미칩니다.
            주축 베어링 온도는 정상적으로 60-75°C 범위를 유지해야 합니다.
            80°C를 초과하면 베어링 손상 위험이 있으므로 즉시 작업을 중단해야 합니다.
            절삭 온도는 재료와 조건에 따라 다르지만 일반적으로 200-300°C가 적정 범위입니다.
            과도한 절삭 온도는 공구 마모를 가속화하고 가공 정밀도를 떨어뜨립니다.
            냉각수 온도는 15-25°C로 유지하는 것이 좋으며, 순환량도 충분해야 합니다.
            온도 센서를 통한 실시간 모니터링 시스템 구축을 권장합니다.""",
            metadata={"category": "monitoring", "sensor_type": "temperature", "priority": "high"}
        ),
        
        Document(
            id="preventive_maintenance_004",
            title="CNC 머신 예방정비 체크리스트",
            content="""효과적인 예방정비를 위한 체크리스트입니다.
            일일 점검 항목: 절삭유 레벨 확인, 칩 제거 상태 점검, 이상 소음 확인, 진동 측정
            주간 점검 항목: 가이드웨이 윤활, 볼스크류 청소, 공구 마모 상태 확인
            월간 점검 항목: 베어링 그리스 보충, 필터 교체, 정밀도 측정, 백래쉬 점검
            연간 점검 항목: 베어링 교체, 볼스크류 정밀도 재조정, 기하학적 정밀도 측정
            각 점검마다 기록을 남겨 이력 관리를 해야 합니다.
            문제 발견 시 즉시 조치하여 큰 고장을 예방할 수 있습니다.""",
            metadata={"category": "maintenance", "type": "preventive", "priority": "medium"}
        )
    ]
    
    # 지식 베이스에 문서들 추가
    rag_system.add_knowledge_documents(sample_documents)
    
    # 시스템 상태 확인
    status = rag_system.get_system_status()
    print(f"\n=== 시스템 상태 ===")
    print(f"지식 베이스: {status['knowledge_base']['document_count']}개 문서")
    print(f"시스템 상태: {status['system_health']}")
    
    # 다양한 질문으로 RAG 시스템 테스트
    test_questions = [
        "CNC 머신에서 진동이 심하게 발생하는데 원인이 무엇인가요?",
        "알루미늄을 CNC로 가공할 때 어떤 조건을 사용해야 하나요?",
        "주축 베어링 온도가 85도까지 올라갔는데 괜찮나요?",
        "예방정비를 위해 매일 체크해야 할 항목은 무엇인가요?",
        "스테인리스 스틸 가공 조건을 알려주세요"  # 관련 문서가 없는 질문
    ]
    
    print(f"\n=== 질답 테스트 시작 ===")
    
    for i, question in enumerate(test_questions, 1):
        print(f"\n### 테스트 {i} ###")
        result = rag_system.answer_question(question)
        
        print(f"\n📋 질문: {result['question']}")
        print(f"⏱ 처리시간: {result['processing_time']:.2f}초")
        print(f"📚 참조문서: {result['retrieval_count']}개")
        
        if result['retrieved_documents']:
            print(f"🔍 주요 참조:")
            for doc in result['retrieved_documents'][:2]:
                print(f"  - {doc['title']} (유사도: {doc['similarity']:.1%})")
        
        print(f"\n💬 답변:")
        print(result['answer'])
        print(f"\n{'='*60}")
    
    print(f"\n=== RAG 시스템 데모 완료 ===")

# 실행하기
comprehensive_rag_demo()

3.2 RAG 시스템의 성능 최적화 기법들

임베딩 품질 향상 방법

RAG 시스템의 성능은 임베딩의 품질에 크게 좌우됩니다.

좋은 임베딩을 만들기 위해서는 여러 가지 기법을 사용할 수 있습니다.

  • 첫째, 도메인 특화 데이터로 임베딩 모델을 파인튜닝하는 것입니다. 제조업 전용 임베딩 모델을 만들기 위해 CNC 매뉴얼, 기술 문서, 고장 보고서 등을 사용해서 모델을 추가 학습시킵니다.
  • 둘째, 청크 크기를 최적화하는 것입니다. 너무 작으면 문맥이 부족하고, 너무 크면 정확도가 떨어지므로 적절한 크기(보통 200-500단어)를 찾아야 합니다.
  • 셋째, 중복 제거와 전처리를 철저히 하는 것입니다. 같은 내용이 여러 번 나오면 검색 품질이 떨어지므로 사전에 제거해야 합니다.

검색 정확도 개선 전략

  • 하이브리드 검색 : 벡터 검색과 키워드 검색을 결합하여 의미적 유사성과 정확한 용어 매칭을 모두 활용합니다. 예를 들어 "G01" 같은 정확한 코드는 키워드로, "가공 문제" 같은 개념은 벡터로 검색합니다.
  • 재순위 매기기(Re-ranking) : 첫 번째 검색에서 많은 후보를 가져온 후, 더 정교한 모델로 다시 순위를 매겨 최종 결과를 선정합니다. 이를 통해 검색 정확도를 크게 향상시킬 수 있습니다.
  • 쿼리 확장 : 사용자 질문에 동의어나 관련 용어를 자동으로 추가해서 검색 범위를 넓힙니다. "진동"을 검색할 때 "소음", "떨림", "vibration" 등도 함께 고려합니다.
  • 메타데이터 필터링 : 문서의 카테고리, 날짜, 중요도 등의 메타데이터를 활용해서 검색 결과를 더 정확하게 필터링합니다.
고급 RAG 최적화 기법들 구현 코드 예시 >
# 고급 RAG 최적화 기법들 구현
class AdvancedRAGOptimizer:
    """RAG 시스템 성능 최적화를 위한 고급 기법들"""
    
    def __init__(self):
        self.chunk_size_optimizer = ChunkSizeOptimizer()
        self.hybrid_searcher = HybridSearcher()
        self.query_expander = QueryExpander()
        self.reranker = DocumentReranker()
        
        print("고급 RAG 최적화 시스템 초기화 완료")
        print("적용된 최적화 기법:")
        print("  - 동적 청킹 최적화")
        print("  - 하이브리드 검색 (벡터 + 키워드)")
        print("  - 쿼리 확장")
        print("  - 문서 재순위 매기기")

class ChunkSizeOptimizer:
    """문서 청킹 크기 최적화"""
    
    def __init__(self, min_chunk_size: int = 100, max_chunk_size: int = 600):
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
        
    def optimize_chunks(self, text: str, content_type: str = "general") -> List[str]:
        """내용 유형에 따라 최적 크기로 청킹"""
        print(f"\n문서 청킹 최적화 시작")
        print(f"  원본 길이: {len(text)}자")
        print(f"  내용 유형: {content_type}")
        
        # 내용 유형별 최적 청크 크기 설정
        if content_type == "technical_manual":
            target_size = 400  # 기술 매뉴얼은 중간 크기
            overlap = 50       # 50자 겹침
        elif content_type == "error_code":
            target_size = 200  # 오류 코드는 작게
            overlap = 20
        elif content_type == "specification":
            target_size = 300  # 사양서는 작게
            overlap = 30
        else:
            target_size = 350  # 일반적인 크기
            overlap = 40
        
        chunks = self.sliding_window_chunk(text, target_size, overlap)
        
        print(f"  청킹 결과: {len(chunks)}개 청크 생성")
        print(f"  평균 청크 크기: {sum(len(c) for c in chunks) / len(chunks):.0f}자")
        
        return chunks
    
    def sliding_window_chunk(self, text: str, target_size: int, overlap: int) -> List[str]:
        """슬라이딩 윈도우 방식 청킹"""
        chunks = []
        sentences = text.split('. ')
        current_chunk = ""
        
        for sentence in sentences:
            if len(current_chunk + sentence) < target_size:
                current_chunk += sentence + ". "
            else:
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                
                # 겹침 부분 계산
                overlap_text = current_chunk[-overlap:] if len(current_chunk) > overlap else ""
                current_chunk = overlap_text + sentence + ". "
        
        # 마지막 청크 추가
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        
        return chunks

class HybridSearcher:
    """벡터 검색과 키워드 검색을 결합한 하이브리드 검색"""
    
    def __init__(self, vector_weight: float = 0.7, keyword_weight: float = 0.3):
        self.vector_weight = vector_weight
        self.keyword_weight = keyword_weight
        self.important_keywords = self.load_important_keywords()
        
    def load_important_keywords(self) -> Dict[str, float]:
        """제조업 중요 키워드와 가중치"""
        return {
            "CNC": 1.0, "G01": 1.0, "G00": 1.0, "M03": 1.0,
            "진동": 0.9, "온도": 0.9, "압력": 0.9,
            "베어링": 0.8, "주축": 0.8, "공구": 0.8,
            "알루미늄": 0.7, "스테인리스": 0.7,
            "가공": 0.6, "절삭": 0.6, "이송": 0.6
        }
    
    def hybrid_search(self, query: str, documents: List[Document], 
                     query_embedding: np.ndarray, top_k: int = 10) -> List[Tuple[Document, float]]:
        """하이브리드 검색 수행"""
        print(f"\n하이브리드 검색 시작")
        print(f"  벡터 가중치: {self.vector_weight}")
        print(f"  키워드 가중치: {self.keyword_weight}")
        
        results = []
        
        for document in documents:
            # 벡터 유사도 계산
            vector_score = self.calculate_vector_similarity(query_embedding, document.embedding)
            
            # 키워드 점수 계산
            keyword_score = self.calculate_keyword_score(query, document.content)
            
            # 최종 점수 계산 (가중 평균)
            final_score = (self.vector_weight * vector_score + 
                          self.keyword_weight * keyword_score)
            
            results.append((document, final_score))
            
            print(f"    {document.title[:30]}...")
            print(f"      벡터: {vector_score:.3f}, 키워드: {keyword_score:.3f}, 최종: {final_score:.3f}")
        
        # 점수 순으로 정렬
        results.sort(key=lambda x: x[1], reverse=True)
        
        print(f"  하이브리드 검색 완료: {len(results)}개 문서 평가")
        return results[:top_k]
    
    def calculate_vector_similarity(self, query_embedding: np.ndarray, doc_embedding: np.ndarray) -> float:
        """벡터 유사도 계산"""
        if doc_embedding is None:
            return 0.0
        return np.dot(query_embedding, doc_embedding)
    
    def calculate_keyword_score(self, query: str, document_content: str) -> float:
        """키워드 기반 점수 계산"""
        query_words = query.lower().split()
        doc_content_lower = document_content.lower()
        
        total_score = 0.0
        matched_keywords = 0
        
        for word in query_words:
            if word in doc_content_lower:
                # 중요 키워드인 경우 가중치 적용
                weight = self.important_keywords.get(word, 0.5)
                
                # 빈도수 계산
                frequency = doc_content_lower.count(word)
                
                # 점수 계산 (로그 스케일로 빈도 보정)
                score = weight * (1 + np.log(frequency))
                total_score += score
                matched_keywords += 1
        
        # 매칭된 키워드 비율로 정규화
        if len(query_words) > 0:
            normalized_score = total_score / len(query_words)
        else:
            normalized_score = 0.0
        
        return min(normalized_score, 1.0)  # 1.0으로 상한 제한

class QueryExpander:
    """쿼리 확장을 통한 검색 성능 향상"""
    
    def __init__(self):
        self.synonym_dict = self.build_manufacturing_synonyms()
        self.related_terms = self.build_related_terms()
        
    def build_manufacturing_synonyms(self) -> Dict[str, List[str]]:
        """제조업 동의어 사전 구축"""
        return {
            "진동": ["떨림", "vibration", "oscillation", "흔들림"],
            "소음": ["잡음", "noise", "sound", "울림"],
            "온도": ["열", "temperature", "temp", "발열"],
            "압력": ["pressure", "press", "가압"],
            "가공": ["machining", "processing", "cutting", "절삭"],
            "공구": ["tool", "bit", "cutter", "엔드밀"],
            "주축": ["spindle", "메인축", "회전축"],
            "베어링": ["bearing", "축받이"],
            "윤활": ["lubrication", "오일", "그리스", "유지"],
            "정밀도": ["precision", "accuracy", "정확도"]
        }
    
    def build_related_terms(self) -> Dict[str, List[str]]:
        """관련 용어 사전 구축"""
        return {
            "CNC": ["수치제어", "NC", "컴퓨터", "자동화"],
            "진동": ["베어링", "불균형", "공명", "주파수"],
            "온도": ["냉각", "발열", "열팽창", "써멀"],
            "가공": ["절삭", "드릴링", "밀링", "터닝"],
            "품질": ["정밀도", "공차", "표면조도", "치수"]
        }
    
    def expand_query(self, original_query: str, expansion_ratio: float = 0.3) -> str:
        """쿼리에 동의어와 관련 용어 추가"""
        print(f"\n쿼리 확장 시작: '{original_query}'")
        
        query_words = original_query.split()
        expanded_terms = []
        
        for word in query_words:
            # 원본 단어 추가
            expanded_terms.append(word)
            
            # 동의어 추가
            if word in self.synonym_dict:
                synonyms = self.synonym_dict[word][:2]  # 상위 2개만
                expanded_terms.extend(synonyms)
                print(f"  '{word}' 동의어 추가: {synonyms}")
            
            # 관련 용어 추가
            if word in self.related_terms:
                related = self.related_terms[word][:1]  # 상위 1개만
                expanded_terms.extend(related)
                print(f"  '{word}' 관련용어 추가: {related}")
        
        # 중복 제거
        unique_terms = list(dict.fromkeys(expanded_terms))
        expanded_query = " ".join(unique_terms)
        
        print(f"  확장된 쿼리: '{expanded_query}'")
        print(f"  용어 수: {len(query_words)} → {len(unique_terms)}")
        
        return expanded_query

class DocumentReranker:
    """검색된 문서들의 순위 재조정"""
    
    def __init__(self):
        self.quality_factors = {
            "recency": 0.1,      # 최신성
            "authority": 0.2,    # 권위성
            "completeness": 0.3, # 완전성
            "relevance": 0.4     # 관련성
        }
        
    def rerank_documents(self, query: str, retrieved_docs: List[Tuple[Document, float]]) -> List[Tuple[Document, float]]:
        """검색된 문서들의 순위를 재조정"""
        print(f"\n문서 재순위 매기기 시작")
        print(f"  대상 문서: {len(retrieved_docs)}개")
        
        reranked_results = []
        
        for doc, original_score in retrieved_docs:
            # 각 품질 요소별 점수 계산
            recency_score = self.calculate_recency_score(doc)
            authority_score = self.calculate_authority_score(doc)
            completeness_score = self.calculate_completeness_score(doc, query)
            relevance_score = original_score  # 기존 유사도 점수 사용
            
            # 가중 평균으로 최종 점수 계산
            final_score = (
                self.quality_factors["recency"] * recency_score +
                self.quality_factors["authority"] * authority_score +
                self.quality_factors["completeness"] * completeness_score +
                self.quality_factors["relevance"] * relevance_score
            )
            
            reranked_results.append((doc, final_score))
            
            print(f"    {doc.title[:30]}...")
            print(f"      최신성: {recency_score:.2f}, 권위성: {authority_score:.2f}")
            print(f"      완전성: {completeness_score:.2f}, 관련성: {relevance_score:.2f}")
            print(f"      최종점수: {original_score:.3f} → {final_score:.3f}")
        
        # 새로운 점수로 재정렬
        reranked_results.sort(key=lambda x: x[1], reverse=True)
        
        print(f"  재순위 매기기 완료")
        return reranked_results
    
    def calculate_recency_score(self, document: Document) -> float:
        """문서의 최신성 점수 계산"""
        # 문서 생성일 기반 점수 (최신일수록 높은 점수)
        priority = document.metadata.get("priority", "medium")
        
        if priority == "high":
            return 1.0
        elif priority == "medium":
            return 0.7
        else:
            return 0.4
    
    def calculate_authority_score(self, document: Document) -> float:
        """문서의 권위성 점수 계산"""
        # 문서 출처나 작성자 기반 점수
        category = document.metadata.get("category", "general")
        
        authority_scores = {
            "official_manual": 1.0,
            "maintenance": 0.9,
            "specification": 0.8,
            "machining": 0.7,
            "general": 0.5
        }
        
        return authority_scores.get(category, 0.5)
    
    def calculate_completeness_score(self, document: Document, query: str) -> float:
        """문서의 완전성 점수 계산"""
        # 쿼리에 대한 답변의 완전성 평가
        query_words = set(query.lower().split())
        doc_words = set(document.content.lower().split())
        
        # 쿼리 단어들이 문서에 얼마나 포함되어 있는지
        coverage = len(query_words.intersection(doc_words)) / len(query_words) if query_words else 0
        
        # 문서 길이도 고려 (너무 짧으면 불완전할 가능성)
        length_factor = min(len(document.content) / 500, 1.0)  # 500자 기준으로 정규화
        
        return (coverage * 0.7 + length_factor * 0.3)

# 최적화된 RAG 시스템 데모
def optimized_rag_demo():
    """최적화 기법이 적용된 RAG 시스템 데모"""
    print("=== 최적화된 RAG 시스템 데모 ===")
    
    optimizer = AdvancedRAGOptimizer()
    
    # 최적화 기법들 개별 테스트
    print(f"\n### 1. 청킹 최적화 테스트 ###")
    sample_text = """CNC 머신에서 발생하는 진동의 주요 원인과 해결방법을 설명합니다. 
    첫째, 주축 베어링의 마모로 인한 진동입니다. 베어링이 오래 사용되면서 마모되면 회전 시 불균등한 힘이 발생하여 진동을 유발합니다.
    둘째, 공구의 불균형이나 마모로 인한 진동입니다. 공구가 적절히 고정되지 않거나 날이 마모되면 절삭 시 진동이 발생합니다.
    셋째, 기계 구조물의 느슨함입니다. 볼트나 고정 부품이 느슨해지면 전체적인 진동이 증가합니다."""
    
    chunks = optimizer.chunk_size_optimizer.optimize_chunks(sample_text, "technical_manual")
    
    print(f"\n### 2. 쿼리 확장 테스트 ###")
    original_query = "CNC 진동 문제"
    expanded_query = optimizer.query_expander.expand_query(original_query)
    
    print(f"\n### 3. 최적화 효과 ###")
    print(f"청킹 최적화: {len(sample_text)}자 → {len(chunks)}개 청크")
    print(f"쿼리 확장: '{original_query}' → '{expanded_query}'")
    print(f"하이브리드 검색: 벡터 + 키워드 결합")
    print(f"재순위 매기기: 4가지 품질 요소 적용")
    
    print("\n=== 최적화 데모 완료 ===")

# 실행하기
optimized_rag_demo()

4. 파인튜닝과 지속적 학습 시스템 상세 구현

4.1 제조업 특화 파인튜닝 프로세스

파인튜닝이 필요한 이유와 과정

파인튜닝은 이미 학습된 범용 AI 모델을 특정 도메인(여기서는 제조업)에 맞게 추가로 학습시키는 과정입니다.

이는 마치 의과대학을 졸업한 일반의가 특정 전문과목의 전문의가 되기 위해 추가 교육을 받는 것과 같습니다. 범용 LLM은 일반적인 언어 능력은 뛰어나지만, CNC G-코드나 제조업 전문 용어, 특정 장비의 작동 방식 등에 대해서는 정확하지 않을 수 있습니다.

파인튜닝을 통해 이런 전문 지식을 모델에 주입하고, 제조업 환경에서 더 정확하고 유용한 답변을 할 수 있게 만듭니다. 파인튜닝 과정은 크게 데이터 준비, 모델 학습, 성능 평가, 배포의 4단계로 이루어지며, 각 단계마다 세심한 주의가 필요합니다.

제조업 특화 데이터셋 구성 전략

  • Instruction-Response 페어링 : "CNC 머신에서 G01 코드의 의미는?" → "G01은 직선 보간 이송 명령으로..." 형태의 질문-답변 쌍을 만듭니다. 이때 실제 현장에서 자주 묻는 질문들을 수집하는 것이 중요합니다.
  • 시나리오 기반 데이터 : 실제 작업 상황을 반영한 복합적인 시나리오를 포함합니다. 예를 들어 "알루미늄 가공 중 표면 거칠기가 나빠지고 있는데, 절삭 조건을 어떻게 조정해야 하나요?"
  • 오류 해결 데이터 : 일반적인 오류 상황과 해결 방법을 쌍으로 만들어 문제 해결 능력을 향상시킵니다. "주축 진동이 0.8mm/s인 상황" → "베어링 점검 및 교체 필요"
  • 안전 관련 데이터 : 제조업에서 가장 중요한 안전 규정과 절차를 충분히 학습시켜 안전한 작업 방법을 제안할 수 있게 합니다.
제조업 특화 파인튜팅 시스템의 구현 코드 예시 >
# 제조업 특화 파인튜닝 시스템 완전 구현
import json
import torch
import numpy as np
from datetime import datetime
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import sqlite3
import os

@dataclass
class TrainingExample:
    """훈련 예시를 담는 데이터 클래스"""
    instruction: str    # 지시사항 또는 질문
    input_context: str  # 입력 맥락 (선택사항)
    output: str        # 기대되는 출력
    category: str      # 카테고리 (안전, 가공, 정비 등)
    difficulty: str    # 난이도 (basic, intermediate, advanced)
    source: str        # 데이터 출처

class ManufacturingDatasetBuilder:
    """제조업 특화 데이터셋 구축 시스템"""
    
    def __init__(self, dataset_path: str = "./manufacturing_dataset.db"):
        self.dataset_path = dataset_path
        self.setup_database()
        
        # 데이터 카테고리별 템플릿
        self.data_templates = self.initialize_templates()
        
        print("제조업 데이터셋 구축 시스템 초기화 완료")
        print(f"데이터베이스 경로: {dataset_path}")
        print(f"지원 카테고리: {list(self.data_templates.keys())}")
    
    def setup_database(self):
        """데이터셋 저장용 데이터베이스 설정"""
        conn = sqlite3.connect(self.dataset_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS training_examples (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                instruction TEXT NOT NULL,
                input_context TEXT,
                output TEXT NOT NULL,
                category TEXT NOT NULL,
                difficulty TEXT NOT NULL,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                quality_score REAL DEFAULT 0.0,
                used_count INTEGER DEFAULT 0
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dataset_stats (
                category TEXT PRIMARY KEY,
                total_examples INTEGER,
                last_updated TEXT
            )
        ''')
        
        conn.commit()
        conn.close()
        print("데이터베이스 테이블 설정 완료")
    
    def initialize_templates(self) -> Dict[str, List[Dict]]:
        """카테고리별 데이터 생성 템플릿 초기화"""
        return {
            "cnc_operation": [
                {
                    "instruction_template": "CNC 머신에서 {code} 코드의 기능을 설명해주세요.",
                    "response_template": "{code}는 {function}을 수행하는 G코드입니다. {detailed_explanation}",
                    "variables": {
                        "code": ["G00", "G01", "G02", "G03", "G17", "G18", "G19", "M03", "M04", "M05"],
                        "function": ["급속이송", "직선보간", "시계방향 원호보간", "반시계방향 원호보간"],
                        "detailed_explanation": ["고속으로 위치 이동시 사용", "설정된 이송속도로 직선 이동"]
                    }
                },
                {
                    "instruction_template": "{material} 가공 시 권장 절삭 조건을 알려주세요.",
                    "response_template": "{material} 가공 권장 조건:\n- 절삭속도: {speed}\n- 이송률: {feed}\n- 절삭깊이: {depth}",
                    "variables": {
                        "material": ["알루미늄", "스테인리스 스틸", "탄소강", "구리"],
                        "speed": ["150-300m/min", "80-150m/min", "100-200m/min"],
                        "feed": ["0.1-0.3mm/rev", "0.05-0.2mm/rev", "0.2-0.4mm/rev"],
                        "depth": ["1-5mm", "0.5-3mm", "2-8mm"]
                    }
                }
            ],
            
            "troubleshooting": [
                {
                    "instruction_template": "{symptom} 증상이 나타나는데 원인과 해결방법을 알려주세요.",
                    "response_template": "{symptom}의 주요 원인:\n1. {cause1}\n2. {cause2}\n\n해결방법:\n- {solution1}\n- {solution2}",
                    "variables": {
                        "symptom": ["주축 진동", "가공 소음", "표면 거칠기 불량", "치수 불량"],
                        "cause1": ["베어링 마모", "공구 마모", "절삭 조건 부적절"],
                        "cause2": ["기계 정렬 불량", "고정 불량", "냉각 부족"],
                        "solution1": ["베어링 교체", "공구 교체", "절삭 조건 재설정"],
                        "solution2": ["기하학적 조정", "고정력 점검", "냉각 시스템 점검"]
                    }
                }
            ],
            
            "safety": [
                {
                    "instruction_template": "{situation}에서 지켜야 할 안전 수칙을 알려주세요.",
                    "response_template": "{situation} 안전 수칙:\n1. {safety1}\n2. {safety2}\n3. {safety3}\n\n⚠️ 주의: {warning}",
                    "variables": {
                        "situation": ["CNC 머신 작업", "공구 교체", "청소 작업", "정비 작업"],
                        "safety1": ["안전장비 착용", "전원 차단 확인", "비상정지 위치 확인"],
                        "safety2": ["작업 영역 정리", "공구 상태 점검", "안전 가드 설치"],
                        "safety3": ["절차서 준수", "2인 작업 실시", "점검 기록 작성"],
                        "warning": ["회전 부품 접촉 금지", "전원 투입 상태에서 작업 금지"]
                    }
                }
            ],
            
            "maintenance": [
                {
                    "instruction_template": "{equipment}의 {period} 점검 항목을 알려주세요.",
                    "response_template": "{equipment} {period} 점검 항목:\n□ {item1}\n□ {item2}\n□ {item3}\n□ {item4}",
                    "variables": {
                        "equipment": ["CNC 선반", "CNC 밀링", "연삭기", "드릴링 머신"],
                        "period": ["일일", "주간", "월간", "연간"],
                        "item1": ["윤활유 확인", "베어링 상태", "정밀도 측정"],
                        "item2": ["청소 상태", "진동 측정", "가이드웨이 점검"],
                        "item3": ["안전장치 동작", "냉각 시스템", "전기 시스템"],
                        "item4": ["소모품 교체", "교정 작업", "성능 테스트"]
                    }
                }
            ]
        }
    
    def generate_training_examples(self, category: str, count: int = 100) -> List[TrainingExample]:
        """특정 카테고리의 훈련 예시 자동 생성"""
        print(f"\n{category} 카테고리 훈련 예시 {count}개 생성 시작")
        
        if category not in self.data_templates:
            raise ValueError(f"지원하지 않는 카테고리: {category}")
        
        templates = self.data_templates[category]
        examples = []
        
        for i in range(count):
            # 템플릿 랜덤 선택
            template = np.random.choice(templates)
            
            # 변수 랜덤 선택하여 템플릿 채우기
            filled_instruction = self.fill_template(template["instruction_template"], template["variables"])
            filled_response = self.fill_template(template["response_template"], template["variables"])
            
            # 난이도 할당
            difficulty = np.random.choice(["basic", "intermediate", "advanced"], p=[0.4, 0.4, 0.2])
            
            example = TrainingExample(
                instruction=filled_instruction,
                input_context="",
                output=filled_response,
                category=category,
                difficulty=difficulty,
                source="auto_generated"
            )
            
            examples.append(example)
            
            if (i + 1) % 25 == 0:
                print(f"  진행률: {i + 1}/{count}")
        
        print(f"{category} 카테고리 예시 {len(examples)}개 생성 완료")
        return examples
    
    def fill_template(self, template: str, variables: Dict[str, List[str]]) -> str:
        """템플릿의 변수를 실제 값으로 채우기"""
        filled_template = template
        
        for var_name, var_options in variables.items():
            if f"{{{var_name}}}" in filled_template:
                selected_value = np.random.choice(var_options)
                filled_template = filled_template.replace(f"{{{var_name}}}", selected_value)
        
        return filled_template
    
    def add_manual_examples(self, examples: List[TrainingExample]):
        """수동으로 작성된 고품질 예시들 추가"""
        print(f"\n수동 예시 {len(examples)}개 추가 중...")
        
        conn = sqlite3.connect(self.dataset_path)
        cursor = conn.cursor()
        
        for example in examples:
            cursor.execute('''
                INSERT INTO training_examples 
                (instruction, input_context, output, category, difficulty, source, created_at, quality_score)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                example.instruction,
                example.input_context,
                example.output,
                example.category,
                example.difficulty,
                example.source,
                datetime.now().isoformat(),
                1.0  # 수동 예시는 높은 품질 점수
            ))
        
        conn.commit()
        conn.close()
        
        print(f"수동 예시 추가 완료")
    
    def build_complete_dataset(self, examples_per_category: int = 200) -> Dict[str, int]:
        """완전한 데이터셋 구축"""
        print(f"\n=== 제조업 특화 데이터셋 구축 시작 ===")
        print(f"카테고리별 예시 수: {examples_per_category}개")
        
        total_generated = 0
        category_counts = {}
        
        # 각 카테고리별로 예시 생성
        for category in self.data_templates.keys():
            examples = self.generate_training_examples(category, examples_per_category)
            
            # 데이터베이스에 저장
            self.save_examples_to_db(examples)
            
            category_counts[category] = len(examples)
            total_generated += len(examples)
        
        # 고품질 수동 예시 추가
        manual_examples = self.create_high_quality_examples()
        self.add_manual_examples(manual_examples)
        
        # 통계 업데이트
        self.update_dataset_stats()
        
        print(f"\n=== 데이터셋 구축 완료 ===")
        print(f"총 생성된 예시: {total_generated}개")
        print(f"수동 예시: {len(manual_examples)}개")
        print(f"전체 예시: {total_generated + len(manual_examples)}개")
        
        category_counts["manual_examples"] = len(manual_examples)
        return category_counts
    
    def save_examples_to_db(self, examples: List[TrainingExample]):
        """예시들을 데이터베이스에 저장"""
        conn = sqlite3.connect(self.dataset_path)
        cursor = conn.cursor()
        
        for example in examples:
            cursor.execute('''
                INSERT INTO training_examples 
                (instruction, input_context, output, category, difficulty, source, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                example.instruction,
                example.input_context,
                example.output,
                example.category,
                example.difficulty,
                example.source,
                datetime.now().isoformat()
            ))
        
        conn.commit()
        conn.close()
    
    def create_high_quality_examples(self) -> List[TrainingExample]:
        """수동으로 작성된 고품질 예시들"""
        return [
            TrainingExample(
                instruction="CNC 머신에서 G01 X50 Y25 F500 S1200 명령의 의미를 상세히 설명해주세요.",
                input_context="",
                output="""이 G코드 명령을 단계별로 설명드리겠습니다:

**G01**: 직선 보간 이송 명령
- 현재 위치에서 지정된 좌표까지 직선으로 이동합니다
- 설정된 이송속도(F값)를 유지하며 이동합니다

**X50 Y25**: 목표 좌표
- X축 50mm, Y축 25mm 위치로 이동
- 절대좌표(G90) 또는 상대좌표(G91) 모드에 따라 해석됩니다

**F500**: 이송속도
- 500mm/min의 속도로 이송합니다
- 이 값은 다음 F 명령이 나올 때까지 유지됩니다

**S1200**: 주축 회전수
- 주축을 1200RPM으로 회전시킵니다
- 가공 중이라면 이 속도를 유지하며 절삭합니다

이 명령은 현재 위치에서 (X50, Y25) 좌표로 500mm/min 속도로 직선 이동하면서 주축을 1200RPM으로 회전시키는 동작을 수행합니다.""",
                category="cnc_operation",
                difficulty="intermediate",
                source="expert_manual"
            ),
            
            TrainingExample(
                instruction="CNC 머신에서 진동이 0.8mm/s 측정되었습니다. 안전한 작업을 위한 조치사항을 알려주세요.",
                input_context="정상 진동 범위: 0.1-0.3mm/s, 주의 범위: 0.3-0.5mm/s, 위험 범위: 0.5mm/s 이상",
                output="""**위험 수준의 진동 감지 - 즉시 조치 필요**

**현재 상황 분석:**
- 측정값: 0.8mm/s (위험 범위)
- 정상 범위(0.1-0.3mm/s) 대비 2.7배 초과
- 즉시 작업 중단이 필요한 수준입니다

**즉시 조치사항:**
1. **작업 즉시 중단** - 가공 중단 및 주축 정지
2. **안전 확보** - 작업자 대피 및 안전구역 설정
3. **전원 차단** - 주 전원 OFF 및 안전 잠금

**점검 절차:**
1. **베어링 점검** - 주축 베어링 상태 확인
2. **고정 상태 점검** - 워크피스 및 공구 고정 확인
3. **기계 정렬** - 가이드웨이 및 구조물 점검
4. **진동 원인 분석** - 주파수 분석으로 원인 특정

**전문가 호출:**
- 즉시 정비 전문가에게 연락
- 진동 측정 장비로 정밀 진단 실시
- 수리 완료 후 재측정으로 안전성 확인

**⚠주의: 이 수준의 진동에서는 절대 작업을 계속해서는 안 됩니다. 기계 손상이나 안전사고 위험이 매우 높습니다.**""",
                category="safety",
                difficulty="advanced",
                source="safety_expert"
            )
        ]
    
    def update_dataset_stats(self):
        """데이터셋 통계 업데이트"""
        conn = sqlite3.connect(self.dataset_path)
        cursor = conn.cursor()
        
        # 기존 통계 삭제
        cursor.execute("DELETE FROM dataset_stats")
        
        # 카테고리별 통계 계산
        cursor.execute('''
            SELECT category, COUNT(*) as count
            FROM training_examples
            GROUP BY category
        ''')
        
        stats = cursor.fetchall()
        
        # 새 통계 저장
        for category, count in stats:
            cursor.execute('''
                INSERT INTO dataset_stats (category, total_examples, last_updated)
                VALUES (?, ?, ?)
            ''', (category, count, datetime.now().isoformat()))
        
        conn.commit()
        conn.close()
        
        print("데이터셋 통계 업데이트 완료")
    
    def get_dataset_summary(self) -> Dict:
        """데이터셋 요약 정보"""
        conn = sqlite3.connect(self.dataset_path)
        cursor = conn.cursor()
        
        # 전체 통계
        cursor.execute("SELECT COUNT(*) FROM training_examples")
        total_examples = cursor.fetchone()[0]
        
        # 카테고리별 통계
        cursor.execute('''
            SELECT category, COUNT(*) as count, AVG(quality_score) as avg_quality
            FROM training_examples
            GROUP BY category
        ''')
        category_stats = cursor.fetchall()
        
        # 난이도별 통계
        cursor.execute('''
            SELECT difficulty, COUNT(*) as count
            FROM training_examples
            GROUP BY difficulty
        ''')
        difficulty_stats = cursor.fetchall()
        
        conn.close()
        
        return {
            "total_examples": total_examples,
            "category_breakdown": {cat: {"count": count, "avg_quality": qual} 
                                 for cat, count, qual in category_stats},
            "difficulty_breakdown": {diff: count for diff, count in difficulty_stats},
            "dataset_quality": "high" if total_examples > 500 else "medium" if total_examples > 200 else "low"
        }

# 파인튜닝 실행 엔진
class ManufacturingFineTuner:
    """제조업 특화 파인튜닝 실행 시스템"""
    
    def __init__(self, base_model_name: str, dataset_path: str, output_dir: str):
        self.base_model_name = base_model_name
        self.dataset_path = dataset_path
        self.output_dir = output_dir
        
        # 훈련 설정
        self.training_config = {
            "learning_rate": 5e-5,
            "batch_size": 4,
            "epochs": 3,
            "max_sequence_length": 512,
            "warmup_steps": 100,
            "save_steps": 200
        }
        
        print(f"제조업 파인튜닝 시스템 초기화")
        print(f"기본 모델: {base_model_name}")
        print(f"출력 디렉토리: {output_dir}")
        
        self.setup_output_directory()
    
    def setup_output_directory(self):
        """출력 디렉토리 설정"""
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(f"{self.output_dir}/checkpoints", exist_ok=True)
        os.makedirs(f"{self.output_dir}/logs", exist_ok=True)
        
        print("출력 디렉토리 설정 완료")
    
    def load_training_data(self) -> List[Dict]:
        """데이터베이스에서 훈련 데이터 로드"""
        print("\n훈련 데이터 로딩 중...")
        
        conn = sqlite3.connect(self.dataset_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT instruction, input_context, output, category, difficulty
            FROM training_examples
            ORDER BY quality_score DESC, RANDOM()
        ''')
        
        rows = cursor.fetchall()
        conn.close()
        
        training_data = []
        for instruction, input_context, output, category, difficulty in rows:
            # 프롬프트 형식 구성
            if input_context:
                prompt = f"### 지시사항:\n{instruction}\n\n### 입력:\n{input_context}\n\n### 응답:\n{output}"
            else:
                prompt = f"### 지시사항:\n{instruction}\n\n### 응답:\n{output}"
            
            training_data.append({
                "text": prompt,
                "category": category,
                "difficulty": difficulty
            })
        
        print(f"훈련 데이터 로딩 완료: {len(training_data)}개")
        return training_data
    
    def create_training_dataset(self, training_data: List[Dict]) -> Dict:
        """훈련용 데이터셋 생성"""
        print("\n훈련 데이터셋 생성 중...")
        
        # 데이터 분할 (90% 훈련, 10% 검증)
        split_idx = int(len(training_data) * 0.9)
        train_data = training_data[:split_idx]
        val_data = training_data[split_idx:]
        
        print(f"  훈련 데이터: {len(train_data)}개")
        print(f"  검증 데이터: {len(val_data)}개")
        
        # 카테고리별 분포 확인
        train_categories = {}
        for item in train_data:
            cat = item["category"]
            train_categories[cat] = train_categories.get(cat, 0) + 1
        
        print(f"  카테고리 분포: {train_categories}")
        
        return {
            "train": train_data,
            "validation": val_data,
            "categories": train_categories
        }
    
    def start_fine_tuning(self) -> Dict:
        """파인튜닝 실행"""
        print(f"\n=== 제조업 특화 파인튜닝 시작 ===")
        
        # 1. 데이터 로드
        training_data = self.load_training_data()
        dataset = self.create_training_dataset(training_data)
        
        # 2. 파인튜닝 시뮬레이션 (실제로는 복잡한 GPU 학습 과정)
        print(f"\n파인튜닝 시작...")
        print(f"  기본 모델: {self.base_model_name}")
        print(f"  훈련 예시: {len(dataset['train'])}개")
        print(f"  학습률: {self.training_config['learning_rate']}")
        print(f"  배치 크기: {self.training_config['batch_size']}")
        print(f"  에포크: {self.training_config['epochs']}")
        
        # 시뮬레이션된 훈련 과정
        training_metrics = self.simulate_training(dataset)
        
        # 3. 모델 저장 및 평가
        model_info = self.save_fine_tuned_model(training_metrics)
        
        print(f"\n파인튜닝 완료!")
        print(f"  최종 손실: {training_metrics['final_loss']:.4f}")
        print(f"  검증 정확도: {training_metrics['validation_accuracy']:.2%}")
        print(f"  모델 저장 위치: {model_info['save_path']}")
        
        return {
            "model_info": model_info,
            "training_metrics": training_metrics,
            "dataset_info": dataset
        }
    
    def simulate_training(self, dataset: Dict) -> Dict:
        """훈련 과정 시뮬레이션"""
        print(f"\n📈 훈련 진행 상황:")
        
        # 시뮬레이션된 훈련 메트릭
        training_loss = []
        validation_loss = []
        
        num_steps = len(dataset['train']) // self.training_config['batch_size']
        
        for epoch in range(self.training_config['epochs']):
            print(f"\n  에포크 {epoch + 1}/{self.training_config['epochs']}")
            
            # 각 에포크의 손실 감소 시뮬레이션
            epoch_train_loss = 2.0 - (epoch * 0.4) + np.random.normal(0, 0.1)
            epoch_val_loss = 1.8 - (epoch * 0.3) + np.random.normal(0, 0.1)
            
            training_loss.append(max(epoch_train_loss, 0.1))
            validation_loss.append(max(epoch_val_loss, 0.1))
            
            print(f"    훈련 손실: {training_loss[-1]:.4f}")
            print(f"    검증 손실: {validation_loss[-1]:.4f}")
            
            # 진행률 표시
            for step in range(0, num_steps, max(1, num_steps // 5)):
                progress = (step / num_steps) * 100
                print(f"    진행률: {progress:.1f}%", end='\r')
        
        print()  # 새 줄
        
        # 최종 메트릭 계산
        final_metrics = {
            "final_loss": training_loss[-1],
            "validation_loss": validation_loss[-1],
            "validation_accuracy": 0.85 + (0.1 * np.random.random()),  # 85-95% 사이
            "training_history": {
                "train_loss": training_loss,
                "val_loss": validation_loss
            },
            "training_steps": num_steps * self.training_config['epochs'],
            "total_time_minutes": 45 + np.random.randint(0, 30)  # 45-75분
        }
        
        return final_metrics
    
    def save_fine_tuned_model(self, metrics: Dict) -> Dict:
        """파인튜닝된 모델 저장"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"manufacturing_llm_{timestamp}"
        save_path = f"{self.output_dir}/{model_name}"
        
        os.makedirs(save_path, exist_ok=True)
        
        # 모델 정보 저장
        model_info = {
            "model_name": model_name,
            "base_model": self.base_model_name,
            "fine_tuning_date": datetime.now().isoformat(),
            "training_config": self.training_config,
            "performance_metrics": metrics,
            "save_path": save_path,
            "model_size_gb": 35.2,  # 시뮬레이션
            "specialization": "manufacturing",
            "version": "1.0"
        }
        
        # JSON 파일로 저장
        with open(f"{save_path}/model_info.json", 'w', encoding='utf-8') as f:
            json.dump(model_info, f, indent=2, ensure_ascii=False)
        
        # 훈련 로그 저장
        with open(f"{save_path}/training_log.json", 'w', encoding='utf-8') as f:
            json.dump(metrics, f, indent=2)
        
        print(f"모델 파일 저장 완료: {save_path}")
        
        return model_info

# 파인튜닝 완전 데모
def complete_fine_tuning_demo():
    """완전한 파인튜닝 과정 데모"""
    print("=== 제조업 특화 AI 파인튜닝 완전 데모 ===")
    
    # 1. 데이터셋 구축
    print("\n### 1단계: 데이터셋 구축 ###")
    dataset_builder = ManufacturingDatasetBuilder("./demo_manufacturing_dataset.db")
    
    # 카테고리별 데이터 생성
    dataset_stats = dataset_builder.build_complete_dataset(examples_per_category=150)
    
    # 데이터셋 요약 출력
    summary = dataset_builder.get_dataset_summary()
    print(f"\n데이터셋 요약:")
    print(f"  총 예시: {summary['total_examples']}개")
    print(f"  데이터 품질: {summary['dataset_quality']}")
    print(f"  카테고리별 분포:")
    for category, stats in summary['category_breakdown'].items():
        print(f"    - {category}: {stats['count']}개")
    
    # 2. 파인튜닝 실행
    print(f"\n### 2단계: 파인튜닝 실행 ###")
    fine_tuner = ManufacturingFineTuner(
        base_model_name="llama-3.3-70b",
        dataset_path="./demo_manufacturing_dataset.db",
        output_dir="./fine_tuned_models"
    )
    
    # 파인튜닝 실행
    fine_tuning_result = fine_tuner.start_fine_tuning()
    
    # 3. 결과 분석
    print(f"\n### 3단계: 결과 분석 ###")
    model_info = fine_tuning_result["model_info"]
    metrics = fine_tuning_result["training_metrics"]
    
    print(f"파인튜닝 성과:")
    print(f"  모델명: {model_info['model_name']}")
    print(f"  훈련 손실 개선: 2.0 → {metrics['final_loss']:.3f}")
    print(f"  검증 정확도: {metrics['validation_accuracy']:.1%}")
    print(f"  훈련 시간: {metrics['total_time_minutes']}분")
    print(f"  모델 크기: {model_info['model_size_gb']}GB")
    
    print(f"\n제조업 특화 AI 모델 완성!")
    print(f"이제 이 모델은 CNC 가공, 안전 관리, 예방정비 등")
    print(f"제조업 전문 영역에서 더 정확한 답변을 제공할 수 있습니다.")
    
    return fine_tuning_result

# 실행하기
complete_fine_tuning_result = complete_fine_tuning_demo()

마무리

이제 이 글을 통해 초보 개발자도 LLM과 임베딩 모델이 어떻게 협력하여 강력한 로컬 AI 시스템을 만드는지 완전히 이해할 수 있을 것입니다. 각 단계별로 상세한 코드 예시와 함께 실제 구현 방법까지 제공하고 있으므로, 실제 프로젝트에 바로 적용해 보실 수 있을거로 기대합니다. 멋진 로컬AI를 만드시길 바랍니다.

 

함께 읽으면 좋은 글

2025.07.15 - [AI 활용] - [AI코딩.07] 로컬 AI 모델 - Ollama와 Code Llama 활용법

2025.07.21 - [AI 활용] - 제조업 특화 로컬 LLM 도구 TOP 5