지난번 포스팅에서는 LangChain을 사용해서 RAG (Retrieval-Agumented Generation) 을 구현해보았습니다. 

그때 RunnableParallel() 메소드를 사용해서 사용자 질문을 RunnablePassThrough()와 Retriever 에 동시에 병렬로 처리하는 방법을 한번 소개한 적이 있습니다. 

 

이번 포스팅에서는 LangChain의 RunnableParallel() 을 사용해서 독립적인 다수의 Chain들을 병렬로 처리하는 방법을 소개하겠습니다. 

 

LangChain RunnableParallel() 로 Chain 병렬처리하기 (run multiple processes in parallel using RunnableParallel)

 

 

먼저, langchain, openai 모듈이 설치되어 있지 않다면 터미널에서 pip install 을 사용해서 먼저 설치하시기 바랍니다. (만약 Jupyter Notebook을 사용중이라면, 셀의 제일 앞부분에 '!' 를 추가해주세요)

 

! pip install langchain openai

 

 

 

예시에서 진행한 과업은

 - 사용자로 부터 topic 을 인풋으로 받아서

 - ChatOpenAI 모델을 사용해서 (1) Joke, (2) Poem, (3) Question 을 생성

하기 입니다. 

 

이들 3개의 각 과업에 '|' 을 사용해서 PomtptTemplate + ChatModel + OutputParser 를 chaining 했습니다. 이들 3개 과업의 chain 들은 동일한 topic을 인풋으로 받아서 서로 독립적으로 처리가 가능합니다. 

 

이처럼 독립적으로 수행 가능한 3개의 chain을 'RunnableParallel()' 을 사용해서 병렬로 처리합니다. 그리고 아웃풋은 Dictionary 형태로 chian 의 이름과 매핑이 되어서 생성한 답변을 반환합니다. (아래 제일 마지막 줄의 {'joke': xxx, 'poem': xxx, 'question': xxxx} 결과 참조)

 

from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

## define PromptTemplates
joke_template = ChatPromptTemplate.from_template("tell me a joke about {topic}")
poem_template = ChatPromptTemplate.from_template("write a 2-line poem about {topic}")
question_template = ChatPromptTemplate.from_template("make a question about {topic}")


## define a ChatModel
#openai_api_key = "sk-xxxxxx..." # set with your API key
model = ChatOpenAI(openai_api_key=OPENAI_API_KEY) 


## define a OutputParser
parser = StrOutputParser()


## Chaining Prompt + ChatModel + OutputParser
joke_chain = joke_template | model | parser
poem_chain = poem_template | model | parser
question_chain = question_template | model | parser

## execute multiple runnables in parallel using RunnableParallel()
map_chain = RunnableParallel(
   joke=joke_chain, 
   poem=poem_chain, 
   qeustion=question_chain
)


# Run all chains
map_chain.invoke({"topic": "snowman"})


# {'joke': 'Why did the snowman go to the spa?\n\nBecause he wanted to chill out and relax!',
#  'poem': "Snowman's frozen smile,\nMelts away in winter's warmth.",
#  'qeustion': 'What are some creative ways to decorate a snowman?'}

 

 

 

RunnableParallel()을 사용하기 전에 각 과업의 chain을 개별적으로 수행했을 때의 소요 시간과, RunnableParallel()을 사용해서 3개의 chain을 동시에 병렬로 처리했을 때의 소요 시간을 측정해서 비교해보았습니다. 그랬더니, RunnableParallel()로 병렬처리했을 때의 시간과 개별로 처리했을 때의 시간이 별 차이가 없음을 알 수 있습니다! 

 

 

[ 'joke' chain 단독 수행 소요 시간: 11s +- 112ms ]

 

%%timeit
joke_chain.invoke({"topic":"snowman"})

# 11 s ± 112 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

 

 

 

[ 'poem' chain 단독 수행 소요 시간: 891 ms +- 197ms ]

 

%%timeit
poem_chain.invoke({"topic": "snowman"})

# 891 ms ± 197 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

 

 

 

[ 'question' chain 단독 수행 소요 시간: 751ms +- 128ms ]

 

%%timeit
question_chain.invoke({"topic": "snowman"})

# 751 ms ± 128 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

 

 

 

[ 'map' chain 으로 3개의 모든 chain 병렬 처리 소요 시간: 922ms +- 101ms]

 

%%timeit
map_chain.invoke({"topic": "snowman"})

# 922 ms ± 101 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

 

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이전 포스팅에서 스펙트럼 분석(spectrum analysis, spectral analysis, frequency domain analysis) 에 대해서 소개하였습니다. ==> https://rfriend.tistory.com/690 참조

 

이번 포스팅에서는 Greenplum 에서 PL/Python (Procedural Language)을 활용하여 여러개 그룹의 시계열 데이터에 대해 스펙트럼 분석을 분산병렬처리하는 방법을 소개하겠습니다. (spectrum analysis in parallel using PL/Python on Greenplum database)

 

 

spectrum analysis in parallel using PL/Python on Greenplum

 

 

(1) 다른 주파수를 가진 3개 그룹의 샘플 데이터셋 생성

 

먼저, spectrum 모듈에서 data_cosine() 메소드를 사용하여 주파수(frequency)가 100, 200, 250 인 3개 그룹의 코사인 파동 샘플 데이터를 생성해보겠습니다. (노이즈 A=0.1 만큼이 추가된 1,024개 코사인 데이터 포인트를 가진 예제 데이터) 

 

## generating 3 cosine signals with frequency of (100Hz, 200Hz, 250Hz) respectively
## buried in white noise (amplitude 0.1), a length of N=1024 and the sampling is 1024Hz.
from spectrum import data_cosine

data1 = data_cosine(N=1024, A=0.1, sampling=1024, freq=100)
data2 = data_cosine(N=1024, A=0.1, sampling=1024, freq=200)
data3 = data_cosine(N=1024, A=0.1, sampling=1024, freq=250)

 

 

 

다음으로 Python pandas 모듈을 사용해서 'grp' 라는 칼럼에 'a', 'b', 'c'의 구분자를 추가하고, 'val' 칼럼에는 위에서 생성한 각기 다른 주파수를 가지는 3개의 샘플 데이터셋을 값으로 가지는 DataFrame을 생성합니다. 

 

## making a pandas DataFrame with a group name
import pandas as pd

df1 = pd.DataFrame({'grp': 'a', 'val': data1})
df2 = pd.DataFrame({'grp': 'b', 'val': data2})
df3 = pd.DataFrame({'grp': 'c', 'val': data3})

df = pd.concat([df1, df2, df3])


df.shape
# (3072, 2)


df.head()
# 	grp	val
# 0	a	1.056002
# 1	a	0.863020
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723

 

 

 

sqlalchemy 모듈의 create_engine("driver://user:password@host:port/database") 메소드를 사용해서 Greenplum 데이터베이스에 접속한 후에 pandas의 DataFrame.to_sql() 메소드를 사용해서 위에서 만든 pandas DataFrame을 Greenplum DB에 import 하도록 하겠습니다. 

 

이때 index = True, indx_label = 'id' 를 꼭 설정해주어야만 합니다. 그렇지 않을 경우 Greenplum DB에 데이터가 import 될 때 시계열 데이터의 특성이 sequence 가 없이 순서가 뒤죽박죽이 되어서, 이후에 스펙트럼 분석을 할 수 없게 됩니다. 

 

## importing data to Greenplum using pandas 
import sqlalchemy
from sqlalchemy import create_engine

# engine = sqlalchemy.create_engine("postgresql://user:password@host:port/database")
engine = create_engine("postgresql://user:password@ip:5432/database") # set with yours

df.to_sql(name = 'data_cosine', 
          con = engine, 
          schema = 'public', 
          if_exists = 'replace', # {'fail', 'replace', 'append'), default to 'fail'
          index = True, 
          index_label = 'id', 
          chunksize = 100, 
          dtype = {
              'id': sqlalchemy.types.INTEGER(), 
              'grp': sqlalchemy.types.TEXT(), 
              'val': sqlalchemy.types.Float(precision=6)
          })
          
          
          
SELECT * FROM data_cosine order by grp, id LIMIT 5;
# id	grp	val
# 0	a	1.056
# 1	a	0.86302
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723


SELECT count(1) AS cnt FROM data_cosine;
# cnt
# 3072

 

 

 

(2) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의 함수 정의 (UDF definition)

 

아래의 스펙트럼 분석은 Python scipy 모듈의 signal() 메소드를 사용하였습니다. (spectrum 모듈의 Periodogram() 메소드를 사용해도 동일합니다. https://rfriend.tistory.com/690  참조) 

(Greenplum database의 master node와 segment nodes 에는 numpy와 scipy 모듈이 각각 미리 설치되어 있어야 합니다.)

 

사용자 정의함수의 인풋으로는 (a) 시계열 데이터 array 와 (b) sampling frequency 를 받으며, 아웃풋으로는 스펙트럼 분석을 통해 추출한 주파수(frequency, spectrum)를 텍스트(혹은 int)로 반환하도록 하였습니다. 

 

DROP FUNCTION IF EXISTS spectrum_func(float8[], int);
CREATE OR REPLACE FUNCTION spectrum_func(x float8[], fs int) 
RETURNS text 
AS $$
    from scipy import signal
    import numpy as np
    
    # x: Time series of measurement values
    # fs: Sampling frequency of the x time series. Defaults to 1.0.
    f, PSD = signal.periodogram(x, fs=fs)
    freq = np.argmax(PSD)
    
    return freq
    
$$ LANGUAGE 'plpythonu';

 

 

 

(3) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의함수 실행
       (Execution of Spectrum Analysis in parallel on Greenplum)

 

위의 (2)번에서 정의한 스펙트럼 분석 PL/Python 사용자 정의함수 spectrum_func(x, fs) 를 Select 문으로 호출하여 실행합니다. FROM 절에는 sub query로 input에 해당하는 시계열 데이터를 ARRAY_AGG() 함수를 사용해 array 로 묶어주는데요, 이때 ARRAY_AGG(val::float8 ORDER BY id) 로 id 기준으로 반드시 시간의 순서에 맞게 정렬을 해주어야 제대로 스펙트럼 분석이 진행이 됩니다

 

SELECT 
    grp
    , spectrum_func(val_agg, 1024)::int AS freq
FROM (
    SELECT 
        grp
        , ARRAY_AGG(val::float8 ORDER BY id) AS val_agg
    FROM data_cosine
    GROUP BY grp
) a
ORDER BY grp; 

# grp	freq
# a	100
# b	200
# c	250

 

 

우리가 (1)번에서 주파수가 각 100, 200, 250인 샘플 데이터셋을 만들었는데요, 스펙트럼 분석을 해보니 정확하게 주파수를 도출해 내었네요!. 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,