'PL/Container'에 해당되는 글 1건

  1. 2022.10.23 [PostgreSQL, Greenplum] 스펙트럼 분석 PL/Python을 활용한 병렬처리 1

이전 포스팅에서 스펙트럼 분석(spectrum analysis, spectral analysis, frequency domain analysis) 에 대해서 소개하였습니다. ==> https://rfriend.tistory.com/690 참조

 

이번 포스팅에서는 Greenplum 에서 PL/Python (Procedural Language)을 활용하여 여러개 그룹의 시계열 데이터에 대해 스펙트럼 분석을 분산병렬처리하는 방법을 소개하겠습니다. (spectrum analysis in parallel using PL/Python on Greenplum database)

 

 

spectrum analysis in parallel using PL/Python on Greenplum

 

 

(1) 다른 주파수를 가진 3개 그룹의 샘플 데이터셋 생성

 

먼저, spectrum 모듈에서 data_cosine() 메소드를 사용하여 주파수(frequency)가 100, 200, 250 인 3개 그룹의 코사인 파동 샘플 데이터를 생성해보겠습니다. (노이즈 A=0.1 만큼이 추가된 1,024개 코사인 데이터 포인트를 가진 예제 데이터) 

 

## generating 3 cosine signals with frequency of (100Hz, 200Hz, 250Hz) respectively
## buried in white noise (amplitude 0.1), a length of N=1024 and the sampling is 1024Hz.
from spectrum import data_cosine

data1 = data_cosine(N=1024, A=0.1, sampling=1024, freq=100)
data2 = data_cosine(N=1024, A=0.1, sampling=1024, freq=200)
data3 = data_cosine(N=1024, A=0.1, sampling=1024, freq=250)

 

 

 

다음으로 Python pandas 모듈을 사용해서 'grp' 라는 칼럼에 'a', 'b', 'c'의 구분자를 추가하고, 'val' 칼럼에는 위에서 생성한 각기 다른 주파수를 가지는 3개의 샘플 데이터셋을 값으로 가지는 DataFrame을 생성합니다. 

 

## making a pandas DataFrame with a group name
import pandas as pd

df1 = pd.DataFrame({'grp': 'a', 'val': data1})
df2 = pd.DataFrame({'grp': 'b', 'val': data2})
df3 = pd.DataFrame({'grp': 'c', 'val': data3})

df = pd.concat([df1, df2, df3])


df.shape
# (3072, 2)


df.head()
# 	grp	val
# 0	a	1.056002
# 1	a	0.863020
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723

 

 

 

sqlalchemy 모듈의 create_engine("driver://user:password@host:port/database") 메소드를 사용해서 Greenplum 데이터베이스에 접속한 후에 pandas의 DataFrame.to_sql() 메소드를 사용해서 위에서 만든 pandas DataFrame을 Greenplum DB에 import 하도록 하겠습니다. 

 

이때 index = True, indx_label = 'id' 를 꼭 설정해주어야만 합니다. 그렇지 않을 경우 Greenplum DB에 데이터가 import 될 때 시계열 데이터의 특성이 sequence 가 없이 순서가 뒤죽박죽이 되어서, 이후에 스펙트럼 분석을 할 수 없게 됩니다. 

 

## importing data to Greenplum using pandas 
import sqlalchemy
from sqlalchemy import create_engine

# engine = sqlalchemy.create_engine("postgresql://user:password@host:port/database")
engine = create_engine("postgresql://user:password@ip:5432/database") # set with yours

df.to_sql(name = 'data_cosine', 
          con = engine, 
          schema = 'public', 
          if_exists = 'replace', # {'fail', 'replace', 'append'), default to 'fail'
          index = True, 
          index_label = 'id', 
          chunksize = 100, 
          dtype = {
              'id': sqlalchemy.types.INTEGER(), 
              'grp': sqlalchemy.types.TEXT(), 
              'val': sqlalchemy.types.Float(precision=6)
          })
          
          
          
SELECT * FROM data_cosine order by grp, id LIMIT 5;
# id	grp	val
# 0	a	1.056
# 1	a	0.86302
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723


SELECT count(1) AS cnt FROM data_cosine;
# cnt
# 3072

 

 

 

(2) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의 함수 정의 (UDF definition)

 

아래의 스펙트럼 분석은 Python scipy 모듈의 signal() 메소드를 사용하였습니다. (spectrum 모듈의 Periodogram() 메소드를 사용해도 동일합니다. https://rfriend.tistory.com/690  참조) 

(Greenplum database의 master node와 segment nodes 에는 numpy와 scipy 모듈이 각각 미리 설치되어 있어야 합니다.)

 

사용자 정의함수의 인풋으로는 (a) 시계열 데이터 array 와 (b) sampling frequency 를 받으며, 아웃풋으로는 스펙트럼 분석을 통해 추출한 주파수(frequency, spectrum)를 텍스트(혹은 int)로 반환하도록 하였습니다. 

 

DROP FUNCTION IF EXISTS spectrum_func(float8[], int);
CREATE OR REPLACE FUNCTION spectrum_func(x float8[], fs int) 
RETURNS text 
AS $$
    from scipy import signal
    import numpy as np
    
    # x: Time series of measurement values
    # fs: Sampling frequency of the x time series. Defaults to 1.0.
    f, PSD = signal.periodogram(x, fs=fs)
    freq = np.argmax(PSD)
    
    return freq
    
$$ LANGUAGE 'plpythonu';

 

 

 

(3) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의함수 실행
       (Execution of Spectrum Analysis in parallel on Greenplum)

 

위의 (2)번에서 정의한 스펙트럼 분석 PL/Python 사용자 정의함수 spectrum_func(x, fs) 를 Select 문으로 호출하여 실행합니다. FROM 절에는 sub query로 input에 해당하는 시계열 데이터를 ARRAY_AGG() 함수를 사용해 array 로 묶어주는데요, 이때 ARRAY_AGG(val::float8 ORDER BY id) 로 id 기준으로 반드시 시간의 순서에 맞게 정렬을 해주어야 제대로 스펙트럼 분석이 진행이 됩니다

 

SELECT 
    grp
    , spectrum_func(val_agg, 1024)::int AS freq
FROM (
    SELECT 
        grp
        , ARRAY_AGG(val::float8 ORDER BY id) AS val_agg
    FROM data_cosine
    GROUP BY grp
) a
ORDER BY grp; 

# grp	freq
# a	100
# b	200
# c	250

 

 

우리가 (1)번에서 주파수가 각 100, 200, 250인 샘플 데이터셋을 만들었는데요, 스펙트럼 분석을 해보니 정확하게 주파수를 도출해 내었네요!. 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,