시계열 데이터를 분석할 때 제일 처음 확인하고 처리하는 일이 결측값(missing values) 입니다. 이번 포스팅에서는 시계열 데이터의 결측값을 선형 보간(linear interpolation)하는 2가지 방법을 소개하겠습니다. 

 

(1) Python 으로 결측값을 포함하는 예제 시계열 데이터 생성하기

(2) Python 의 for loop 순환문으로 시계열 데이터 결측값을 보간하기

      (interpolation sequentially using Python for loop statement)

(3) Greenplum에서 PL/Python으로 분산병렬처리하여 시계열 데이터 결측값을 보간하기

     (interpolation in parallel using PL/Python on Greenplum)

 

 

 

(1) Python 으로 결측값을 포함하는 예제 시계열 데이터 생성하기

 

샘플 시계열 데이터를 저장할 폴더를 base_dir 에 지정해주었습니다.

 

분석 단위로 사용할 Lot, Cell, Parameter, TimeStamp 의 개수를 지정해 줍니다. 아래 예에서는 각 Lot, Cell, Parameter 별로 100개의 TimeStamp 별로 측정값을 수집하고, 각 분석 단위별로 10개의 결측값을 포함하도록 설정했습니다. 

 

np.random.normal(10, 30, ts_num) 
: 측정값은 정규분포 X~N(10, 3) 의 분포로 부터 ts_num 인 100개를 난수 생성하여 만들었습니다. 

 

nan_mask = np.random.choice(np.arange(ts_num), missing_num)
ts_df_tmp.loc[nan_mask, 'measure_val'] = np.nan

: 각 분석 단위의 100 개의 측정치 중에서 무작위로 missing_num = 10 개를 뽑아서 np.nan 으로 교체하여 결측값으로 변경하였습니다.  

 

하나의 Lot에 Cell 100개, 각 Cell별 Parameter 10개, 각 Parameter 별 TimeStamp의 측정치 100개, 이중 결측치 10개를 포함한 시계열 데이터를 Lot 별로 묶어서(concat) DataFrame을 만들고, 이를 CSV 파일로 내보냅니다. 

 

#%% setting the directories and conditions
base_dir = '/Users/Documents/ts_data/'

## setting the number of IDs' conditions 
lot_num = 1000
cell_num = 100
param_num = 10
missing_num = 10
ts_num = 100 # number of TimeStamps


#%% [Step 1] generating the sample dataset

import numpy as np
import pandas as pd
import os
from itertools import chain, repeat

## defining the UDF
def ts_random_generator(lot_id, cell_num, param_num, ts_num, missing_num, base_dir):
    # blank DataFrame for saving the sample datasets later
    ts_df = pd.DataFrame()
    
    for cell_id in np.arange(cell_num):
        for param_id in np.arange(param_num):
            # making a DataFrame with colums of lot_id, cell_cd, param_id, ts_id, and measure_val
            ts_df_tmp = pd.DataFrame({
                'lot_id': list(chain.from_iterable(repeat([lot_id + 1], ts_num))), 
                'cell_id': list(chain.from_iterable(repeat([cell_id + 1], ts_num))), 
                'param_id': list(chain.from_iterable(repeat([param_id + 1], ts_num))), 
                'timestamp_id': (np.arange(ts_num) + 1), 
                'measure_val': np.random.normal(10, 3, ts_num)# X~N(mean, stddev, size)
            })
            
            # inserting the missing values randomly
            nan_mask = np.random.choice(np.arange(ts_num), missing_num)
            ts_df_tmp.loc[nan_mask, 'measure_val'] = np.nan
            
            # concatenate the generated random dataset(ts_df_tmp) to the lot based DataFrame(ts_df) 
            ts_df = pd.concat([ts_df, ts_df_tmp], axis=0)
    
    # exporting the DataFrame to local csv file
    base_dir = base_dir
    file_nm = 'lot_' + \
        str(lot_id+1).zfill(4) + \
        '.csv'
        
    ts_df.to_csv(os.path.join(base_dir, file_nm), index=False)
    #ts_df.to_csv('/Users/lhongdon/Documents/SK_ON_PoC/ts_data/lot_0001.csv')
    
    print(file_nm, "is successfully generated.") 



#%% Executing the ts_random_generator UDF above

## running the UDF above using for loop statement
for lot_id in np.arange(lot_num):
    ts_random_generator(
        lot_id, 
        cell_num, 
        param_num, 
        ts_num, 
        missing_num, 
        base_dir
        )

 

 

 

위의 코드를 실행하면 for loop 순환문이 lot_num 수만큼 돌면서 ts_random_generator() 사용자 정의함수를 실행시키면서 결측값을 포함한 시계열 데이터 샘플 CSV 파일을 생성하여 지정된 base_dir 폴더에 저장을 합니다. 

(아래 화면 캡쳐 참조)

 

sample time series data list

 

 

아래의 화면캡쳐는 결측값을 포함하는 시계열 데이터 샘플 중에서 LOT_0001 번의 예시입니다. 

 

time series data sample with missing values

 

 

 

 

(2) Python 의 for loop 순환문으로 시계열 데이터 결측값을 보간하기

      (interpolation sequentially using Python for loop statement)

 

아래 코드는 Python으로 Lot, Cell, Parameter ID 별로 for loop 순환문을 사용해서 pandas 의 interpolate() 메소드를 사용해서 시계열 데이터의 결측값을 선형 보간(linear interpolation) 한 것입니다. 

(forward fill 로 먼저 선형 보간을 해주고, 그 다음에 만약에 첫번째 행에 결측값이 있을 경우에 backward fill 로 이후 값과 같은 값으로 결측값을 채워줍니다.)

 

순차적으로 for loop 순환문을 돌기 때문에 시간이 오래 걸립니다.

 

#%% [Step 2] linear interpolation
from datetime import datetime
start_time = datetime.now()


## reading csv files in the base_dir
file_list = os.listdir(base_dir)

for file_nm in file_list:
    # by Lot
    if file_nm[-3:] == "csv":
        # read csv file
        ts_df = pd.read_csv(os.path.join(base_dir, file_nm))
        
        # blank DataFrame for saving the interpolated time series later
        ts_df_interpolated = pd.DataFrame()
        
        # cell & param ID lists
        cell_list = np.unique(ts_df['cell_id'])
        param_list = np.unique(ts_df['param_id'])
        
        # interpolation by lot, cell, and param IDs
        for cell_id in cell_list:
           for param_id in param_list:
               ts_df_tmp = ts_df[(ts_df.cell_id == cell_id) & (ts_df.param_id == param_id)]
               
               ## interpolating the missing values for equaly spaced time series data
               ts_df_tmp.sort_values(by='timestamp_id', ascending=True) # sorting by TimeStamp first
               ts_df_interpolated_tmp = ts_df_tmp.interpolate(method='values') # linear interploation
               ts_df_interpolated_tmp = ts_df_interpolated_tmp.fillna(method='bfill') # backward fill for the first missing row
               
               ts_df_interpolated = pd.concat([ts_df_interpolated, ts_df_interpolated_tmp], axis=0)
        
        # export DataFrame to local folder as a csv file
        ts_df_interpolated.to_csv(os.path.join(interpolated_dir, file_nm), index=False)
        
        print(file_nm, "is successfully interpolated.")


time_elapsed = datetime.now() - start_time
print("----------" * 5)
print("Time elapsed (hh:mm:ss.ms) {}".format(time_elapsed))
print("----------" * 5)

 

# # Before interplolation
# 3,1,1,20,11.160795506036791
# 3,1,1,21,8.155949904188175
# 3,1,1,22,3.1040644143505407
# 3,1,1,23,                   <-- missing
# 3,1,1,24,                   <-- missing
# 3,1,1,25,11.020504352275342
# 3,1,1,26,                   <-- missing
# 3,1,1,27,8.817922501760519
# 3,1,1,28,10.673174873272234
# 3,1,1,29,6.584669096660191
# 3,1,1,30,13.442427337943553

# # After interpolation
# 3,1,1,20,11.160795506036791
# 3,1,1,21,8.155949904188175
# 3,1,1,22,3.1040644143505407
# 3,1,1,23,5.742877726992141  <-- interpolated
# 3,1,1,24,8.381691039633742  <-- interpolated
# 3,1,1,25,11.020504352275342
# 3,1,1,26,9.919213427017931  <-- interpolated
# 3,1,1,27,8.81792250176052
# 3,1,1,28,10.673174873272234
# 3,1,1,29,6.584669096660191
# 3,1,1,30,13.442427337943554

 

 

아래 화면캡쳐는 선형보간하기 전에 결측값이 있을 때와, 이를 선형보간으로 값을 생성한 후의 예시입니다. 

 

linear interpolation for missing data in time series

 

 

 

아래 선 그래프의 파란색 점 부분이 원래 값에서는 결측값 이었던 것을 선형 보간(linear interpolation)으로 채워준 후의 모습입니다. 선형보간이므로 측정된 값으로 선형회귀식을 적합하고, 결측값 부분의 X 값을 입력해서 Y를 예측하는 방식으로 결측값을 보간합니다. 

 

linear interpolation of missing values in time series

 

 

 

아래 코드는 데이터가 Greenplum DB에 적재되어 있다고 했을 때, 

 (2-1) Python으로 Greenplum DB에 access하여 데이터를 Query 해와서 pandas DataFrame으로 만들고 

 (2-2) Pytnon pandas 의 interpolate() 메소드를 사용해서 선형보간을 한 다음에 

 (2-3) 선형보간된 DataFrame을 pandas 의 to_sql() 메소드를 사용해서 다시 Greenplum DB에 적재

하는 코드입니다. 이를 for loop 순환문을 사용해서 Lot 의 개수만큼 실행시켜 주었습니다. 

 

순차적으로 for loop 순환문을 돌기 때문에 시간이 오래 걸립니다.

 

#%% Greenplum credentials
user = 'username' 
password = 'password' 
host = 'ip_address'
port = 'port'
db = 'databasename'

connection_string = "postgresql://{user}:{password}@{host}:{port}/{db}".\
        format(user=user, 
               password=password, 
               host=host, 
               port=port,
               db=db)

#%%
# helper function: query to pandas DataFrame
def gpdb_query(query):
    import psycopg2 as pg
    import pandas as pd
    
    conn = pg.connect(connection_string)
    cursor = conn.cursor()
    
    cursor.execute(query)
    col_names = [desc[0] for desc in cursor.description]
    
    result_df = pd.DataFrame(cursor.fetchall(), columns=col_names)
    
    cursor.close()
    conn.close()
    
    return result_df


#%% 
# UDF for running a query

def interpolator(lot_id):
    
    #import pandas as pd
    
    query = """
        SELECT * 
        FROM ts_data 
        WHERE 
            lot_id = {lot_id}
    """.format(
            lot_id = lot_id)
    
    ts_df = gpdb_query(query)
    ts_df = ts_df.astype({
        'measure_val': float
        })
    
    ## interpolating the missing values for equaly spaced time series data
    ts_df_interpolated = pd.DataFrame()
    
    for cell_id in (np.arange(cell_num)+1):
        for param_id in (np.arange(param_num)+1):
            ts_df_tmp = ts_df[(ts_df.cell_id == cell_id) & (ts_df.param_id == param_id)]
    
            ts_df_tmp.sort_values(by='timestamp_id', ascending=True) # sorting by TimeStamp first
            ts_df_interpolated_tmp = ts_df_tmp.interpolate(method='values') # linear interploation
            ts_df_interpolated_tmp = ts_df_interpolated_tmp.fillna(method='bfill') # backward fill for the first missing row
            
            ts_df_interpolated = pd.concat([ts_df_interpolated, ts_df_interpolated_tmp], axis=0)
    
    # export DataFrame to local folder as a csv file
    #ts_df_interpolated.to_csv(os.path.join(interpolated_dir, file_nm), index=False)        
    #print(file_nm, "is successfully interpolated.")
    
    return ts_df_interpolated



#%% 
# UDF for importing pandas DataFrame to Greenplum DB
def gpdb_importer(lot_id, connection_string):
    
    import sqlalchemy
    from sqlalchemy import create_engine
    
    engine = create_engine(connection_string)
    
    # interpolation
    ts_data_interpolated = interpolator(lot_id)
    
    # inserting to Greenplum    
    ts_data_interpolated.to_sql(
        name = 'ts_data_interpolated_python', 
        con = engine, 
        schema = 'equipment', 
        if_exists = 'append', 
        index = False, 
        dtype = {'lot_id': sqlalchemy.types.INTEGER(), 
                 'cell_id': sqlalchemy.types.INTEGER(), 
                 'param_id': sqlalchemy.types.INTEGER(),
                 'timestamp_id': sqlalchemy.types.INTEGER(), 
                 'measure_val': sqlalchemy.types.Float(precision=6)
                 })
    

#%%
from datetime import datetime
start_time = datetime.now()

import pandas as pd
import os
import numpy as np

for lot_id in (np.arange(lot_num)+1):
    gpdb_importer(lot_id, connection_string)
    print("lot_id", lot_id, "is successfully interpolated.")

time_elapsed = datetime.now() - start_time
print("----------" * 5)
print("Time elapsed (hh:mm:ss.ms) {}".format(time_elapsed))
print("----------" * 5)

 

 

 

 

(3) Greenplum에서 PL/Python으로 분산병렬처리하여 시계열 데이터 결측값을 보간하기

     (interpolation in parallel using PL/Python on Greenplum)

Greenplum에서 PL/Python으로 병렬처리할 때는 (a) 사용자 정의 함수(UDF) 정의, (b) 사용자 정의 함수 실행의 두 단계를 거칩니다. 

 

Greenplum DB에서 PL/Python으로 분산병렬처리를 하면 위의 (2)번에서 Python으로 for loop 순환문으로 순차처리한 것 대비 Greenplum DB 내 노드의 개수에 비례하여 처리 속도가 줄어들게 됩니다. (가령, 노드가 8개이면 병렬처리의 총 처리 소요시간은 순차처리했을 때의 총 소요시간의 1/8 로 줄어듭니다.) 

 

 

parallel processing using PL/Python on Greenplum DB

 

 

(3-1) PL/Python 으로 시계열 데이터 결측값을 선형보간하는 사용자 정의함수 정의 (define a UDF)

 

-- defining the PL/Python UDF
DROP FUNCTION IF EXISTS plpy_interp(numeric[]);
CREATE OR REPLACE FUNCTION plpy_interp(measure_val_arr numeric[]) 
RETURNS numeric[]
AS $$
	import numpy as np
	import pandas as pd
	
	measure_val = np.array(measure_val_arr, dtype='float')
	
	ts_df = pd.DataFrame({
	   'measure_val': measure_val
	    })
	
	# interpolation by lot, cell, and param IDs               
	ts_df_interpolated = ts_df.interpolate(method='values') # linear interploation
	ts_df_interpolated = ts_df_interpolated.fillna(method='bfill') # backward fill for the first missing row
	
	return ts_df_interpolated['measure_val']
	        
$$ LANGUAGE 'plpythonu';

 

 

 

(3-2) 위에서 정의한 시계열 데이터 결측값을 선형보간하는 PL/Python 사용자 정의함수 실행

 

 

PL/Python의 input 으로는 SQL의 array_agg() 함수를 사용해서 만든 Array 데이터를 사용하며, PL/Python에서는 SQL의 Array를 Python의 List 로 변환(converting) 합니다. 

 

-- array aggregation as an input
DROP TABLE IF EXISTS tab1;
CREATE TEMPORARY TABLE tab1 AS
		SELECT 
			lot_id
			, cell_id
			, param_id
			, ARRAY_AGG(timestamp_id ORDER BY timestamp_id) AS timestamp_id_arr
			, ARRAY_AGG(measure_val ORDER BY timestamp_id) AS measure_val_arr
		FROM ts_data
		GROUP BY lot_id, cell_id, param_id
DISTRIBUTED RANDOMLY ;
		
ANALYZE tab1;		


-- executing the PL/Python UDF
DROP TABLE IF EXISTS ts_data_interpolated;
CREATE TABLE ts_data_interpolated AS (
	SELECT 
		lot_id 
		, cell_id 
		, param_id 
		, timestamp_id_arr
		, plpy_interp(measure_val_arr) AS measure_val_arr -- plpython UDF
	FROM tab1 AS a 
) DISTRIBUTED BY (lot_id);

 

 

 

아래 코드는 numeric array 형태로 반환한 선형보간 후의 데이터를 unnest() 함수를 사용해서 보기에 편하도록 long format 으로 풀어준 것입니다. 

 

-- display the interpolated result
SELECT 
	lot_id
	, cell_id 
	, param_id
	, UNNEST(timestamp_id_arr) AS timestamp_id
	, UNNEST(measure_val_arr) AS measure_val
FROM ts_data_interpolated
WHERE lot_id = 1 AND cell_id = 1 AND param_id = 1
ORDER BY lot_id, cell_id, param_id, timestamp_id
LIMIT 100;

 

 

결측값을 포함하고 있는 원래 데이터셋을 아래 SQL query 로 조회해서, 위의 선형보간 된 후의 데이터셋과 비교해볼 수 있습니다. 

 

-- original dataset with missing value
SELECT 
	lot_id
	, cell_id 
	, param_id
	, timestamp_id
	, measure_val
FROM ts_data
WHERE lot_id = 1 AND cell_id = 1 AND param_id = 1
ORDER BY lot_id, cell_id, param_id, timestamp_id
LIMIT 100;

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이전 포스팅에서 스펙트럼 분석(spectrum analysis, spectral analysis, frequency domain analysis) 에 대해서 소개하였습니다. ==> https://rfriend.tistory.com/690 참조

 

이번 포스팅에서는 Greenplum 에서 PL/Python (Procedural Language)을 활용하여 여러개 그룹의 시계열 데이터에 대해 스펙트럼 분석을 분산병렬처리하는 방법을 소개하겠습니다. (spectrum analysis in parallel using PL/Python on Greenplum database)

 

 

spectrum analysis in parallel using PL/Python on Greenplum

 

 

(1) 다른 주파수를 가진 3개 그룹의 샘플 데이터셋 생성

 

먼저, spectrum 모듈에서 data_cosine() 메소드를 사용하여 주파수(frequency)가 100, 200, 250 인 3개 그룹의 코사인 파동 샘플 데이터를 생성해보겠습니다. (노이즈 A=0.1 만큼이 추가된 1,024개 코사인 데이터 포인트를 가진 예제 데이터) 

 

## generating 3 cosine signals with frequency of (100Hz, 200Hz, 250Hz) respectively
## buried in white noise (amplitude 0.1), a length of N=1024 and the sampling is 1024Hz.
from spectrum import data_cosine

data1 = data_cosine(N=1024, A=0.1, sampling=1024, freq=100)
data2 = data_cosine(N=1024, A=0.1, sampling=1024, freq=200)
data3 = data_cosine(N=1024, A=0.1, sampling=1024, freq=250)

 

 

 

다음으로 Python pandas 모듈을 사용해서 'grp' 라는 칼럼에 'a', 'b', 'c'의 구분자를 추가하고, 'val' 칼럼에는 위에서 생성한 각기 다른 주파수를 가지는 3개의 샘플 데이터셋을 값으로 가지는 DataFrame을 생성합니다. 

 

## making a pandas DataFrame with a group name
import pandas as pd

df1 = pd.DataFrame({'grp': 'a', 'val': data1})
df2 = pd.DataFrame({'grp': 'b', 'val': data2})
df3 = pd.DataFrame({'grp': 'c', 'val': data3})

df = pd.concat([df1, df2, df3])


df.shape
# (3072, 2)


df.head()
# 	grp	val
# 0	a	1.056002
# 1	a	0.863020
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723

 

 

 

sqlalchemy 모듈의 create_engine("driver://user:password@host:port/database") 메소드를 사용해서 Greenplum 데이터베이스에 접속한 후에 pandas의 DataFrame.to_sql() 메소드를 사용해서 위에서 만든 pandas DataFrame을 Greenplum DB에 import 하도록 하겠습니다. 

 

이때 index = True, indx_label = 'id' 를 꼭 설정해주어야만 합니다. 그렇지 않을 경우 Greenplum DB에 데이터가 import 될 때 시계열 데이터의 특성이 sequence 가 없이 순서가 뒤죽박죽이 되어서, 이후에 스펙트럼 분석을 할 수 없게 됩니다. 

 

## importing data to Greenplum using pandas 
import sqlalchemy
from sqlalchemy import create_engine

# engine = sqlalchemy.create_engine("postgresql://user:password@host:port/database")
engine = create_engine("postgresql://user:password@ip:5432/database") # set with yours

df.to_sql(name = 'data_cosine', 
          con = engine, 
          schema = 'public', 
          if_exists = 'replace', # {'fail', 'replace', 'append'), default to 'fail'
          index = True, 
          index_label = 'id', 
          chunksize = 100, 
          dtype = {
              'id': sqlalchemy.types.INTEGER(), 
              'grp': sqlalchemy.types.TEXT(), 
              'val': sqlalchemy.types.Float(precision=6)
          })
          
          
          
SELECT * FROM data_cosine order by grp, id LIMIT 5;
# id	grp	val
# 0	a	1.056
# 1	a	0.86302
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723


SELECT count(1) AS cnt FROM data_cosine;
# cnt
# 3072

 

 

 

(2) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의 함수 정의 (UDF definition)

 

아래의 스펙트럼 분석은 Python scipy 모듈의 signal() 메소드를 사용하였습니다. (spectrum 모듈의 Periodogram() 메소드를 사용해도 동일합니다. https://rfriend.tistory.com/690  참조) 

(Greenplum database의 master node와 segment nodes 에는 numpy와 scipy 모듈이 각각 미리 설치되어 있어야 합니다.)

 

사용자 정의함수의 인풋으로는 (a) 시계열 데이터 array 와 (b) sampling frequency 를 받으며, 아웃풋으로는 스펙트럼 분석을 통해 추출한 주파수(frequency, spectrum)를 텍스트(혹은 int)로 반환하도록 하였습니다. 

 

DROP FUNCTION IF EXISTS spectrum_func(float8[], int);
CREATE OR REPLACE FUNCTION spectrum_func(x float8[], fs int) 
RETURNS text 
AS $$
    from scipy import signal
    import numpy as np
    
    # x: Time series of measurement values
    # fs: Sampling frequency of the x time series. Defaults to 1.0.
    f, PSD = signal.periodogram(x, fs=fs)
    freq = np.argmax(PSD)
    
    return freq
    
$$ LANGUAGE 'plpythonu';

 

 

 

(3) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의함수 실행
       (Execution of Spectrum Analysis in parallel on Greenplum)

 

위의 (2)번에서 정의한 스펙트럼 분석 PL/Python 사용자 정의함수 spectrum_func(x, fs) 를 Select 문으로 호출하여 실행합니다. FROM 절에는 sub query로 input에 해당하는 시계열 데이터를 ARRAY_AGG() 함수를 사용해 array 로 묶어주는데요, 이때 ARRAY_AGG(val::float8 ORDER BY id) 로 id 기준으로 반드시 시간의 순서에 맞게 정렬을 해주어야 제대로 스펙트럼 분석이 진행이 됩니다

 

SELECT 
    grp
    , spectrum_func(val_agg, 1024)::int AS freq
FROM (
    SELECT 
        grp
        , ARRAY_AGG(val::float8 ORDER BY id) AS val_agg
    FROM data_cosine
    GROUP BY grp
) a
ORDER BY grp; 

# grp	freq
# a	100
# b	200
# c	250

 

 

우리가 (1)번에서 주파수가 각 100, 200, 250인 샘플 데이터셋을 만들었는데요, 스펙트럼 분석을 해보니 정확하게 주파수를 도출해 내었네요!. 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

지난번 포스팅에서는 Python의 statsmodels  모듈을 이용하여 여러개의 숫자형 변수에 대해 집단 간 평균의 차이가 있는지를 for loop 순환문을 사용하여 검정하는 방법(rfriend.tistory.com/639)을 소개하였습니다. 

 

Python에서 for loop 문을 사용하면 순차적으로 처리 (sequential processing) 를 하게 되므로, 일원분산분석을 해야 하는 숫자형 변수의 개수가 많아질 수록 선형적으로 처리 시간이 증가하게 됩니다. 

 

Greenplum에서 PL/Python (또는 PL/R) 을 사용하면 일원분산분석의 대상의 되는 숫자형 변수가 매우 많고 데이터 크기가 크더라도 분산병렬처리 (distributed parallel processing) 하여 ANOVA test 를 처리할 수 있으므로 신속하게 분석을 할 수 있는 장점이 있습니다.

 

더불어서, 데이터가 저장되어 있는 DB에서 데이터의 이동 없이(no data I/O, no data movement), In-DB 처리/분석이 되므로 work-flow 가 간소화되고 batch scheduling 하기에도 편리한 장점이 있습니다.  

 

만약 데이터는 DB에 있고, 애플리케이션도 DB를 바라보고 있고, 분석은 Python 서버 (또는 R 서버)에서 하는 경우라면, 분석을 위해 DB에서 데이터를 samfile 로 떨구고, 이를 Python에서 pd.read_csv()로 읽어들여서 분석하고, 다시 결과를 text file로 떨구고, 이 text file을 ftp로 DB 서버로 이동하고, psql로 COPY 문으로 테이블에 insert 하는 workflow ... 관리 포인트가 많아서 정신 사납고 복잡하지요?!  

 

자, 이제 Greenplum database에서 PL/Python으로 일원분산분석을 병렬처리해서 집단 간 여러개의 개별 변수별 평균 차이가 있는지 검정을 해보겠습니다. 

 

 

(1) 여러 개의 변수를 가지는 샘플 데이터 만들기

 

정규분포로 부터 난수를 발생시켜서 3개 그룹별로 각 30개 씩의 샘플 데이터를 생성하였습니다. 숫자형 변수로는 'x1', 'x2', 'x3', 'x4'의 네 개의 변수를 생성하였습니다. 이중에서  'x1', 'x2'는 3개 집단이 모두 동일한 평균과 분산을 가지는 정규분포로 부터 샘플을 추출하였고, 반면에 'x3', 'x4'는 3개 집단 중 2개는 동일한 평균과 분산의 정규분포로 부터 샘플을 추출하고 나머지 1개 집단은 다른 평균을 가지는 정규분포로 부터 샘플을 추출하였습니다. (뒤에  one-way ANOVA 검정을 해보면 'x3', 'x4'에 대한 집단 간 평균 차이가 있는 것으로 결과가 나오겠지요?!)

 

import numpy as np
import pandas as pd

# generate 90 IDs 
id = np.arange(90) + 1 

# Create 3 groups with 30 observations in each group. 
from itertools import chain, repeat 
grp = list(chain.from_iterable((repeat(number, 30) for number in [1, 2, 3]))) 

# generate random numbers per each groups from normal distribution 
np.random.seed(1004) 

# for 'x1' from group 1, 2 and 3
x1_g1 = np.random.normal(0, 1, 30) 
x1_g2 = np.random.normal(0, 1, 30) 
x1_g3 = np.random.normal(0, 1, 30) 

# for 'x2' from group 1, 2 and 3 
x2_g1 = np.random.normal(10, 1, 30) 
x2_g2 = np.random.normal(10, 1, 30) 
x2_g3 = np.random.normal(10, 1, 30) 

# for 'x3' from group 1, 2 and 3 
x3_g1 = np.random.normal(30, 1, 30) 
x3_g2 = np.random.normal(30, 1, 30) 
x3_g3 = np.random.normal(50, 1, 30) 

# different mean 
x4_g1 = np.random.normal(50, 1, 30) 
x4_g2 = np.random.normal(50, 1, 30) 
x4_g3 = np.random.normal(20, 1, 30) 

# different mean # make a DataFrame with all together 
df = pd.DataFrame({
    'id': id, 'grp': grp, 
    'x1': np.concatenate([x1_g1, x1_g2, x1_g3]), 
    'x2': np.concatenate([x2_g1, x2_g2, x2_g3]), 
    'x3': np.concatenate([x3_g1, x3_g2, x3_g3]), 
    'x4': np.concatenate([x4_g1, x4_g2, x4_g3])}) 
    

df.head()

 

id

grp

x1

x2

x3

x4

1

1

0.594403

10.910982

29.431739

49.232193

2

1

0.402609

9.145831

28.548873

50.434544

3

1

-0.805162

9.714561

30.505179

49.459769

4

1

0.115126

8.885289

29.218484

50.040593

5

1

-0.753065

10.230208

30.072990

49.601211

 

 

위에서 만든 가상의 샘플 데이터를 Greenplum DB에 'sample_tbl' 이라는 이름의 테이블로 생성해보겠습니다.  Python pandas의  to_sql()  메소드를 사용하면 pandas DataFrame을 쉽게 Greenplum DB (또는 PostgreSQL DB)에 uploading 할 수 있습니다.  

 

# creating a table in Greenplum by importing pandas DataFrame
conn = "postgresql://gpadmin:changeme@localhost:5432/demo"

df.to_sql('sample_tbl', 
         conn, 
         schema = 'public', 
         if_exists = 'replace', 
         index = False)
 

 

 

 

Jupyter Notebook에서 Greenplum DB에 접속해서 SQL로 이후 작업을 진행하겠습니다.

(Jupyter Notebook에서 Greenplum DB access 하고 SQL query 실행하는 방법은 rfriend.tistory.com/572 참조하세요)

 

-- 여기서 부터는 Jupyter Notebook에서 실행한 것입니다. --

%load_ext sql

# postgresql://Username:Password@Host:Port/Database
%sql postgresql://gpadmin:changeme@localhost:5432/demo
[Out][
'Connected: gpadmin@demo'

 

 

 

위 (1) 에서 pandas 의 to_sql() 로 importing 한 sample_tbl 테이블에서 5개 행을 조회해보겠습니다. 

 

%sql select * from sample_tbl order by id limit 5;
 * postgresql://gpadmin:***@localhost:5432/demo
5 rows affected.

[Out]
id	grp	x1	                x2	                x3	                x4
1	1	0.594403067344276	10.9109819091195	29.4317394311833	49.2321928075563
2	1	0.402608708677309	9.14583073327387	28.54887315985  	50.4345438286737
3	1	-0.805162233589535	9.71456131309311	30.5051787625131	49.4597693977764
4	1	0.115125695763445	8.88528940547472	29.2184835450055	50.0405932387396
5	1	-0.753065219532709	10.230207786414 	30.0729900069999	49.6012106088522

 

 

 

(2) 데이터 구조 변경: reshaping from wide to long

 

PL/Python에서 작업하기 쉽도록 테이블 구조를  wide format에서 long format 으로 변경하겠습니다. union all 로 해서 칼럼 갯수 만큼 위/아래로 append  해나가면 되는데요, DB 에서 이런 형식의 데이터를 관리하고 있다면 아마도 이미 long format 으로 관리하고 있을 가능성이 높습니다. (새로운 데이터가 수집되면 계속  insert into 하면서 행을 밑으로 계속 쌓아갈 것이므로...)

 

%%sql
-- reshaping a table from wide to long
drop table if exists sample_tbl_long;
create table sample_tbl_long as (
    select id, grp, 'x1' as col, x1 as val from sample_tbl
    union all 
    select id, grp, 'x2' as col, x2 as val from sample_tbl
    union all 
    select id, grp, 'x3' as col, x3 as val from sample_tbl
    union all 
    select id, grp, 'x4' as col, x4 as val from sample_tbl
) distributed randomly;

 * postgresql://gpadmin:***@localhost:5432/demo
Done.
360 rows affected.



%sql select * from sample_tbl_long order by id, grp, col limit 8;

[Out]
 * postgresql://gpadmin:***@localhost:5432/demo
8 rows affected.
id	grp	col	val
1	1	x1	0.594403067344276
1	1	x2	10.9109819091195
1	1	x3	29.4317394311833
1	1	x4	49.2321928075563
2	1	x1	0.402608708677309
2	1	x2	9.14583073327387
2	1	x3	28.54887315985
2	1	x4	50.4345438286737

 

 

 

(3) 분석 결과 반환 composite type 정의

 

일원분산분석 결과를 반환받을 때 각 분석 대상 변수 별로 (a) F-통계량, (b) p-value 의 두 개 값을 float8 데이터 형태로  반환받는 composite type 을 미리 정의해놓겠습니다. 

%%sql
-- Creating a coposite return type
drop type if exists plpy_anova_type cascade;
create type plpy_anova_type as (
    f_stat float8
    , p_val float8
);

 * postgresql://gpadmin:***@localhost:5432/demo
Done.
Done.

 

 

 

(4)  일원분산분석(one-way ANOVA) PL/Python 사용자 정의함수 정의

 

집단('grp')과 측정값('val')을 input 으로 받고, statsmodels 모듈의 sm.stats.anova_lm() 메소드로 일원분산분석을 하여 결과 테이블에서 'F-통계량'과 'p-value'만 인덱싱해서 반환하는 PL/Python 사용자 정의 함수를 정의해보겠습니다. 

 

%%sql
-- Creating the PL/Python UDF of ANOVA
drop function if exists plpy_anova_func(text[], float8[]);
create or replace function plpy_anova_func(grp text[], val float8[])
returns plpy_anova_type 
as $$
    import pandas as pd  
    import statsmodels.api as sm
    from statsmodels.formula.api import ols
    
    df = pd.DataFrame({'grp': grp, 'val': val})
    model = ols('val ~ grp', data=df).fit()
    anova_result = sm.stats.anova_lm(model, typ=1)
    return {'f_stat': anova_result.loc['grp', 'F'], 
            'p_val': anova_result.loc['grp', 'PR(>F)']}
$$ language 'plpythonu';

 * postgresql://gpadmin:***@localhost:5432/demo
Done.
Done.

 

 

 

(5) 일원분산분석(one-way ANOVA) PL/Python 함수 분산병렬처리 실행

 

PL/Python 사용자 정의함수는 SQL query 문으로 실행합니다. 이때 PL/Python 이 'F-통계량'과 'p-value'를 반환하도록 UDF를 정의했으므로 아래처럼 (plpy_anova_func(grp_arr, val_arr)).* 처럼 ().* 으로 해서 모든 결과('F-통계량' & 'p-value')를 반환하도록 해줘야 합니다. (빼먹고 실수하기 쉬우므로 ().*를 빼먹지 않도록 주의가 필요합니다)

 

이렇게 하면 변수별로 segment nodes 에서 분산병렬로 각각 처리가 되므로, 변수가 수백~수천개가 있더라도 (segment nodes가 많이 있다는 가정하에) 분산병렬처리되어 신속하게 분석을 할 수 있습니다. 그리고 결과는 바로 Greenplum DB table에 적재가 되므로 이후의 application이나 API service에서 가져다 쓰기에도 무척 편리합니다. 

 

%%sql
-- Executing the PL/Python UDF of ANOVA
drop table if exists plpy_anova_table;
create table plpy_anova_table as (
    select 
        col
        , (plpy_anova_func(grp_arr, val_arr)).*
    from (
        select
            col
            , array_agg(grp::text order by id) as grp_arr
            , array_agg(val::float8 order by id) as val_arr
        from sample_tbl_long
        group by col
    ) a
) distributed randomly;

 * postgresql://gpadmin:***@localhost:5432/demo
Done.
4 rows affected.

 

 

총 4개의 각 변수별 일원분산분석 결과를 조회해보면 아래와 같습니다. 

%%sql
select * from plpy_anova_table order by col;

[Out]
 * postgresql://gpadmin:***@localhost:5432/demo
4 rows affected.
col	f_stat	p_val
x1	0.773700830155438	0.46445029458511966
x2	0.20615939957339052	0.8140997216173114
x3	4520.512608893724	1.2379278415456727e-88
x4	9080.286130418674	1.015467388498996e-101

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

Greenplum DB는 여려개의 PostgreSQL DB를 합쳐놓은 shared-nothing architecture 의 MPP (Massively Parallel Processing) Database 입니다.  손과 발이 되는 여러개의 cluster nodes에 머리가 되는 Master host 가 조율/조정/지시해가면서 분산하여 병렬로 일을 시키고, 각 cluster nodes의 연산처리 결과를 master host 가 모아서 취합하여 최종 결과를 반환하는 방식으로 일을 하기 때문에 (1) 대용량 데이터도 (2) 매우 빠르게 처리할 수 있습니다. 

 

이번 포스팅에서는 여기에서 한발 더 나아가서, Procedural Language Extension (PL/X) 을 사용하여 Python, R, Java, C, Perl, SQL 등의 프로그래밍 언어를 Greenplum DB 내에서 사용하여 데이터의 이동 없이 분산 병렬처리하는 방법을 소개하겠습니다. 

 

Massively Parallel Processing through PL/Python on Greenplum DB

 

난수를 발생시켜서 만든 가상의 데이터셋을 사용하여 PL/Python으로 Random Forest의 Feature Importance를 숫자형 목표변수('y1_num') 그룹과 범주형 목표변수('y2_cat') 그룹 별로 분산병렬처리하는 간단한 예를 들어보겠습니다. 

 

(1) 가상의 데이터셋 만들기
   - group: 2개의 그룹

   - X: 각 그룹별 100개의 관측치별 200개의 숫자형 변수를 가지는 가상의 데이터셋

   - y: 숫자형 목표변수 'y1_num', 범주형 목표변수 'y2_cat'

      (a) y1_num = x1*7.0 + x2*6.0 - x3*4.0 + x4*5.0 + 0.001*random()

      (b) y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0
의 함수로 부터 만듦. 

(2) PL/Python 함수 정의하기
   : (a) 숫자형 목표변수('y1_num') 그룹은 Random Forest Regressor를,  (b) 범주형 목표변수('y2_cat') 그룹은 Random Forest Classifier 를 각 그룹별로 분산병렬로 훈련시킨 후,
   : 각 그룹별 Random Forest Regressor 모델별 200개의 숫자형 변수별 Feature Importance를 반환

(3) PL/Python 함수 실행하기 

(4) 각 그룹별 변수별 Random Forest 의 Feature Importance 를 조회하기

 

를 해 보겠습니다. 

 

 

 

(1) 가상의 예제 데이터셋 만들기

   - group: 2개의 그룹

         (목표변수로 하나는 숫자형, 하나는 범주형 값을 가지는 2개의 X&y 데이터셋 그룹을 생성함.)

 

   - X: 각 그룹별 100개의 관측치별 200개의 숫자형 변수를 가지는 가상의 데이터셋

         (SQL의 random() 함수로 0~1 사이의 난수 실수값을 생성함.)

 

   - y: 숫자형 목표변수 'y1_num', 범주형 목표변수 'y2_cat'

       (a) y1_num = x1*7.0 + x2*6.0 - x3*4.0 + x4*5.0 + 0.001*random()

       (b) y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0 

             (y1_num, y2_cat 값을 만들때 x 변수에 곱하는데 사용한 *7.0, *6.0, *5.0 은 아무런 의미 없습니다.

             그냥 가상의 예제 샘플 데이터를 만들려고 임의로 선택해서 곱해준 값입니다.)

 

아래의 예제는 In-DB 처리를 염두에 두고, 200개의 숫자형 X 변수들과 한개의 숫자형 y 변수를 DB의 테이블에 "col_nm"이라는 칼럼에는 변수 이름을, "col_val"에는  변수 값을 long form 으로 생성해서 저장해 놓은 것입니다. 

 

나중에 PL/Python의 함수 안에서 pandas 의 pivot_table() 함수를 사용해서 wide form 으로 DataFrame의 형태를 재구조화해서 random forest 를 분석하게 됩니다. 

 

제 맥북에서 도커로 만든 Greenplum 에는 1개의 master node와 2개의 segment nodes 가 있는데요, 편의상 cross join 으로 똑같은 칼럼과 값을 가지는 설명변수 X 데이터셋을 2의 segment nodes에 replication 해서 그룹1('grp=1'), 그룹2('grp=2')를 만들었습니다.

 

그리고 여기에 목표변수 y 값으로는 숫자형 목표변수 'y1_num' 칼럼의 값에 대해서는 그룹1('grp=1'), 범주형 목표변수 'y2_cat' 칼럼의 값에 대해서는 그룹2('grp=2')를 부여한 후에, 앞서 만든 설명변수 X 데이터셋에 union all 로 'y1_num'과 'y2_cat' 데이터를 합쳐서 최종으로 하나의 long format의 테이블을 만들었습니다. 

 

첫번째 그룹은 200개의 숫자형 X 변수 중에서 'x1', 'x2', 'x3'의 3개 변수만 숫자형 목표변수(numeric target variable) 인 'y1_num'과 관련이 있고, 나머지 194개의 설명변수와는 관련이 없게끔 y1_num = x1*7.0 + x2*6.0 + x3*5.0 + 0.001*random() 함수를 사용해서 'y1_num' 을 만들었습니다 (*7.0, *6.0, *5.0 은 가상의 예제 데이터를 만들기 위해 임의로 선택한 값으로서, 아무 이유 없습니다).  뒤에서 PL/Python으로 Random Forest Regressor 의 feature importance 결과에서 'x1', 'x2', 'x3' 변수의 Feature Importance 값이 높게 나오는지 살펴보겠습니다.  

 

두번째 그룹은 200개의 숫자형 X변수 중에서 'x4', 'x5', 'x6'의 3개 변수만 범주형 목표변수(categorical target variable) 인 'y2_cat'과 관련이 있고, 나머지 194개의 설명변수와는 연관이 없게끔 y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0 함수로 부터 가상으로 생성하였습니다. 뒤에서 PL/Python으로 Random Forest Classifier 의 feature importance 결과에서 'x4', 'x5', 'x6' 변수의 Feature Importance 값이 높게 나오는지 살펴보겠습니다.  

 

------------------------------------------------------------------
-- Random Forest's Feature Importance using PL/Python on Greenplum
------------------------------------------------------------------

-- (1) Generate sample data 
-- 2 groups
-- 100 observations(ID) per group
-- X: 200 numeric input variables per observation(ID)
-- y : a numeric target variable by a function of y = x1*5.0 + x2*4.5 - x3*4.0 + x4*3.5 + 0.001*random()
-- distributed by 'grp' (group)

-- (1-1) 100 IDs of observations
drop table if exists id_tmp;
create table id_tmp (
	id integer
) distributed randomly;

insert into id_tmp (select * from generate_series(1, 100, 1));

select * from id_tmp order by id limit 3;
--id
--1
--2
--3


-- (1-2) 200 X variables
drop table if exists x_tmp;
create table x_tmp (
	x integer
) distributed randomly;

insert into x_tmp (select * from generate_series(1, 200, 1));

select * from x_tmp order by x limit 3;
--x
--1
--2
--3



-- (1-3) Cross join of ID and Xs
drop table if exists id_x_tmp;
create table id_x_tmp as (
	select * from id_tmp 
	cross join x_tmp 
) distributed randomly;

select count(1) from id_x_tmp; 
-- 20,000  -- (id 100 * x 200 = 20,000)

select * from id_x_tmp order by id, x limit 3;
--id  x
--1	  1
--1	  2
--1	  3



-- (1-4) Generate X values randomly
drop table if exists x_long_tmp;
create table x_long_tmp as (
	select 
		a.id as id
		, x
		, 'x'||a.x::text as x_col
		, round(random()::numeric, 3) as x_val 
	from id_x_tmp a
) distributed randomly;

select count(1) from x_long_tmp; 
-- 20,000

select * from x_long_tmp order by id, x limit 3;
--id  x  x_col  x_val
--1	  1	 x1	    0.956
--1	  2	 x2	    0.123
--1	  3	 x3	    0.716

select min(x_val) as x_min_val, max(x_val) as x_max_val from x_long_tmp;
--x_min_val  x_max_val
--0.000	     1.000



-- (1-5) create y values
drop table if exists y_tmp;
create table y_tmp as (
	select 
		s.id
		, (s.x1*7.0 + s.x2*6.0 + s.x3*5.0 + 0.001*random()) as y1_num -- numeric
		, case when (s.x4*7.0 + s.x5*6.0 + s.x6*5.0 + 0.001*random()) >= 9 
			then 1 
			else 0 
			end as y2_cat -- categorical
	from (
		select distinct(a.id) as id, x1, x2, x3, x4, x5, x6 from x_long_tmp as a
		left join (select id, x_val as x1 from x_long_tmp where x_col = 'x1') b 
			on a.id = b.id
		left join (select id, x_val as x2 from x_long_tmp where x_col = 'x2') c 
			on a.id = c.id 
		left join (select id, x_val as x3 from x_long_tmp where x_col = 'x3') d 
			on a.id = d.id 
		left join (select id, x_val as x4 from x_long_tmp where x_col = 'x4') e 
			on a.id = e.id
		left join (select id, x_val as x5 from x_long_tmp where x_col = 'x5') f 
			on a.id = f.id
		left join (select id, x_val as x6 from x_long_tmp where x_col = 'x6') g 
			on a.id = g.id
	) s
) distributed randomly;

select count(1) from y_tmp;
--100

select * from y_tmp order by id limit 5;
--id  y1_num            y2_cat
--1	11.0104868695838	1
--2	10.2772997177048	0
--3	7.81790575686749	0
--4	8.89387259676540	1
--5	2.47530914815422	1




--  (1-6) replicate X table to all clusters 
--        by the number of 'y' varialbes. (in this case, there are 2 y variables, 'y1_num' and 'y2_cat'
drop table if exists long_x_grp_tmp;
create table long_x_grp_tmp as (
	select 
		b.grp as grp
		, a.id as id
		, a.x_col as col_nm
		, a.x_val as col_val
	from x_long_tmp as a
	cross join (
		select generate_series(1, c.y_col_cnt) as grp
		from (
			select (count(distinct column_name) - 1) as y_col_cnt 
			from information_schema.columns 
				where table_name = 'y_tmp' and table_schema = 'public') c
		) as b -- number of clusters
) distributed randomly;


select count(1) from long_x_grp_tmp;
-- 40,000   -- 2 (y_col_cnt) * 20,000 (x_col_cnt)

select * from long_x_grp_tmp order by id limit 5;
--grp  id   col_nm  col_val
--1	1	x161	0.499
--2	1	x114	0.087
--1	1	x170	0.683
--2	1	x4	    0.037
--2	1	x45	    0.995



-- (1-7) create table in long format with x and y 
drop table if exists long_x_y;
create table long_x_y as (
	select x.*
	from long_x_grp_tmp as x
	union all 
	select 1::int as grp, y1.id as id, 'y1_num'::text as col_nm, y1.y1_num as col_val 
	from y_tmp as y1 
	union all 
	select 2::int as grp, y2.id as id, 'y2_cat'::text as col_nm, y2.y2_cat as col_val
	from y_tmp as y2
) distributed randomly;

select count(1) from long_x_y; 
-- 40,200 (x 40,000 + y1_num 100 + y2_cat 100)

select grp, count(1) from long_x_y group by 1 order by 1;
--grp  count
--1	   20100
--2	   20100

select * from long_x_y where grp=1 order by id, col_nm desc limit 5;
--grp  id   col_nm  col_val
--1	   1	y1_num	11.010
--1	   1	x99	     0.737
--1	   1	x98	     0.071
--1	   1	x97	     0.223
--1	   1	x96	     0.289

select * from long_x_y where grp=2 order by id, col_nm desc limit 5;
--grp  id   col_nm  col_val
--2	   1	y2_cat	1.0
--2	   1	x99	    0.737
--2	   1	x98	    0.071
--2	   1	x97	    0.223
--2	   1	x96	    0.289


-- drop temparary tables
drop table if exists id_tmp;
drop table if exists x_tmp;
drop table if exists id_x_tmp;
drop table if exists x_long_tmp;
drop table if exists y_tmp;
drop table if exists long_x_grp_tmp;


 

 

 

(2) PL/Python 사용자 정의함수 정의하기

 

- (2-1) composite return type 정의하기

 

PL/Python로 분산병렬로 연산한 Random Forest의 feature importance (또는 variable importance) 결과를 반환할 때 텍스트 데이터 유형의 '목표변수 이름(y_col_nm)', '설명변수 이름(x_col_nm)'과 '변수 중요도(feat_impo)' 의 array 형태로 반환하게 됩니다. 반환하는 데이터가 '텍스트'와 'float8' 로 서로 다른 데이터 유형이 섞여 있으므로 composite type 의 return type 을 만들어줍니다. 그리고 PL/Python은 array 형태로 반환하므로 text[], text[] 과 같이 '[]' 로서 array 형태로 반환함을 명시합니다.  

-- define composite return type
drop type if exists plpy_rf_feat_impo_type cascade;
create type plpy_rf_feat_impo_type as (
	y_col_nm text[]
	, x_col_nm text[]
	, feat_impo float8[]
);

 

 

 

- (2-2) Random Forest feature importance 결과를 반환하는 PL/Python 함수 정의하기

 

PL/Python 사용자 정의 함수를 정의할 때는 아래와 같이 PostgreSQL의 Procedural Language 함수 정의하는 표준 SQL 문을 사용합니다. 

input data 는 array 형태이므로 칼럼 이름 뒤에 데이터 유형에는 '[]'를 붙여줍니다. 

중간의  $$ ... python code block ... $$ 부분에 pure python code 를 넣어줍니다. 

제일 마지막에 PL/X 언어로서 language 'plpythonu' 으로 PL/Python 임을 명시적으로 지정해줍니다. 

 

create or replace function function_name(column1  data_type1[], column2 data_type2[], ...) 
returns return_type as $$
    ... python code block ...
$$ language 'plpythonu';

 

 

만약 PL/Container 를 사용한다면 명령 프롬프트 창에서 아래처럼 $ plcontainer runtime-show  로 Runtime ID를 확인 한 후에,   

[gpadmin@mdw ~]$ plcontainer runtime-show
PL/Container Runtime Configuration:
---------------------------------------------------------
  Runtime ID: plc_python3_shared
  Linked Docker Image: pivotaldata/plcontainer_python3_shared:devel
  Runtime Setting(s):
  Shared Directory:
  ---- Shared Directory From HOST '/usr/local/greenplum-db/./bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
---------------------------------------------------------

 

 

PL/Python 코드블록의 시작 부분에 $$ # container: container_Runtime_ID  로서 사용하고자 하는 docker container 의 runtime ID를 지정해주고, 제일 마지막 부분에 $$ language 'plcontainer'; 로 확장 언어를 'plcontainer'로 지정해주면 됩니다.  PL/Container를 사용하면 최신의 Python 3.x 버전을 사용할 수 있는 장점이 있습니다. 


create or replace function function_name(column1  data_type1[], column2 data_type2[], ...) 
returns return_type as
$$
# container: plc_python3_shared
... python code block ...
$$ LANGUAGE 'plcontainer'; 

 

 

아래 코드에서는 array 형태의 'id', 'col_nm', 'col_val'의 3개 칼럼을 input 으로 받아서 먼저 pandas DataFrame으로 만들어 준 후에, 이를 pandas pivot_table() 함수를 사용해서 long form --> wide form 으로 데이터를 재구조화 해주었습니다. 

 

다음으로, 숫자형의 목표변수('y1_num')를 가지는 그룹1 데이터셋에 대해서는 sklearn 패키지의 RandomForestRegressor 클래스를 사용해서 Random Forest 모델을 훈련하고, 범주형의 목표변수('y2_cat')를 가지는 그룹2의 데이터셋에 대해서는 sklearn 패키지의 RandomForestClassifier 클래스를 사용하여 모델을 훈련하였습니다. 그리고  'rf_regr_fitted.feature_importances_' , 'rf_clas_fitted.feature_importances_'를 사용해서 200개의 각 변수별 feature importance 속성을 리스트로 가져왔습니다. 

 

마지막에 return {'y_col_nm': y_col_nm, 'x_col_nm': x_col_nm_list, 'feat_impo': feat_impo} 에서 전체 변수 리스트와 변수 중요도 연산 결과를 array 형태로 반환하게 했습니다. 

 

----------------------------------
-- PL/Python UDF for Random Forest
----------------------------------

-- define PL/Python function
drop function if exists plpy_rf_feat_impo_func(text[], text[], text[]);
create or replace function plpy_rf_feat_impo_func(
	id_arr text[]
	, col_nm_arr text[]
	, col_val_arr text[]
) returns plpy_rf_feat_impo_type as 
$$
#import numpy as np 
import pandas as pd

# making a DataFrame
xy_df = pd.DataFrame({
    'id': id_arr
    , 'col_nm': col_nm_arr
    , 'col_val': col_val_arr
})

# pivoting a table
xy_pvt = pd.pivot_table(xy_df
                        , index = ['id']
                        , columns = 'col_nm'
                        , values = 'col_val'
                        , aggfunc = 'first'
                        , fill_value = 0)

X = xy_pvt[xy_pvt.columns.difference(['y1_num', 'y2_cat'])]
X = X.astype(float)
x_col_nm_list = X.columns

# UDF for Feature Importance by RandomForestRegressor
def rf_regr_feat_impo(X, y):
	
	# training RandomForestRegressor
	from sklearn.ensemble import RandomForestRegressor
	rf_regr = RandomForestRegressor(n_estimators=200)
	rf_regr_fitted = rf_regr.fit(X, y)
	
	# The impurity-based feature importances.
	rf_regr_feat_impo = rf_regr_fitted.feature_importances_
	return rf_regr_feat_impo


# UDF for Feature Importance by RandomForestClassifier
def rf_clas_feat_impo(X, y):
	
	# training  RandomForestClassifier with balanced class_weight
	from sklearn.ensemble import RandomForestClassifier
	rf_clas = RandomForestClassifier(n_estimators=200, class_weight='balanced')
	rf_clas_fitted = rf_clas.fit(X, y)
	
	# The impurity-based feature importances.
	rf_clas_feat_impo = rf_clas_fitted.feature_importances_
	return rf_clas_feat_impo
	
	
# training RandomForest and getting variable(feature) importance
if 'y1_num' in xy_pvt.columns:
	y_target = 'y1_num'
	y = xy_pvt[y_target]
	feat_impo = rf_regr_feat_impo(X, y)

if 'y2_cat' in xy_pvt.columns:
	y_target = 'y2_cat'
	y = xy_pvt[y_target]
	y = y.astype(int)
	feat_impo = rf_clas_feat_impo(X, y)

feat_impo_df = pd.DataFrame({
	'y_col_nm': y_target
	, 'x_col_nm': x_col_nm_list
	, 'feat_impo': feat_impo
})

# returning the results of feature importances
return {
	'y_col_nm': feat_impo_df['y_col_nm'] 
	, 'x_col_nm': feat_impo_df['x_col_nm']
	, 'feat_impo': feat_impo_df['feat_impo']
	}
    
$$ language 'plpythonu';

 

 

 

(3) PL/Python 함수 실행하기 

 

PL/Python 함수를 실행할 때는 표준 SQL Query 문의 "SELECT group_name, pl_python_function() FROM table_name" 처럼 함수를 SELECT 문으로 직접 호출해서 사용합니다. 

 

PL/Python의 input 으로 array 형태의 데이터를 넣어주므로, 아래처럼 FROM 절의 sub query 에 array_agg() 함수로 먼저 데이터를 'grp' 그룹 별로 array aggregation 하였습니다. 

 

PL/Python 함수의 전체 결과를 모두 반환할 것이므로 (plpy_rf_var_impo_func()).* 처럼 함수를 모두 감싼 후에 ().* 를 사용하였습니다. (실수해서 빼먹기 쉬우므로 유의하시기 바랍니다.)

 

목표변수가 숫자형('y1_num')과 범주형('y2_cat')'별로 그룹1과 그룹2로 나누어서, 'grp' 그룹별로 분산병렬로 Random Forest 분석이 진행되며, Variable importance 결과를 'grp' 그룹 ID를 기준으로 분산해서 저장(distributed by (grp);)하게끔 해주었습니다.   

 

-- execute PL/Python function
drop table if exists rf_feat_impo_result;
create table rf_feat_impo_result as (
	select 
		a.grp 
		, (plpy_rf_feat_impo_func(
			a.id_arr
			, a.col_nm_arr
			, a.col_val_arr
		)).* 
		from (
			select 
				c.grp 
				, array_agg(c.id::text order by id) as id_arr
				, array_agg(c.col_nm::text order by id) as col_nm_arr
				, array_agg(c.col_val::text order by id) as col_val_arr
			from long_x_y as c
			group by grp
			) a
) distributed by (grp);

 

 

 

(4) 각 그룹별 변수별 Random Forest 의 Feature Importance 조회하기

 

위의 (3)번을 실행해서 나온 결과를 조회하면 아래와 같이 'grp=1', 'grp=2' 별로 각 칼럼별로 Random Forest에 의해 계산된 변수 중요도(variable importance) 가 array 형태로 저장되어 있음을 알 수 있습니다. 

 

select count(1) from rf_feat_impo_result; 
-- 2

-- results in array-format
select * from rf_feat_impo_result order by grp;

plpython_random_forest_feature_importance_array

 

위의 array 형태의 결과는 사람이 눈으로 보기에 불편하므로, unnest() 함수를 써서 long form 으로 길게 풀어서 결과를 조회해 보겠습니다. 

 

이번 예제에서는 난수로 생성한 X설명변수에 임의로 함수를 사용해서 숫자형 목표변수('y1_num')를 가지는 그룹1에 대해서는 'x1', 'x2', 'x3' 의 순서대로 변수가 중요하고, 범주형 목표변수('y2_cat')를 가지는 그룹2에서는  'x4', 'x5', 'x6'의 순서대로 변수가 중요하게 가상의 예제 데이터셋을 만들어주었습니다.  (random() 함수로 난수를 생성해서 예제 데이터셋을 만들었으므로, 매번 실행할 때마다 숫자는 달라집니다). 

 

아래 feature importance 결과를 보니, 역시 그룹1의 데이터셋에 대해서는 'x1', 'x2', 'x3' 변수가 중요하게 나왔고, 그룹2의 데이터셋에 대해서는 'x4', 'x5', 'x6' 변수가 중요하다고 나왔네요. 

 

-- display the results using unnest()
select 
	grp
	, unnest(y_col_nm) as y_col_nm
	, unnest(x_col_nm) as x_col_nm
	, unnest(feat_impo) as feat_impo 
from rf_feat_impo_result
where grp = 1
order by feat_impo desc 
limit 10;
--grp    y_col_nm      x_col_nm      feat_impo
--1	   y1_num	 x1	       0.4538784064497847
--1	   y1_num	 x2	       0.1328532144509229
--1	   y1_num	 x3	       0.10484121806286809
--1	   y1_num	 x34           0.006843343319633915
--1	   y1_num	 x42           0.006804819286213849
--1	   y1_num	 x182          0.005771113354638556
--1	   y1_num	 x143          0.005220090515711377
--1	   y1_num	 x154          0.005101366229848041
--1	   y1_num	 x46           0.004571420249598611
--1	   y1_num	 x57           0.004375780774099066

select 
	grp
	, unnest(y_col_nm) as y_col_nm
	, unnest(x_col_nm) as x_col_nm
	, unnest(feat_impo) as feat_impo 
from rf_feat_impo_result
where grp = 2
order by feat_impo desc 
limit 10;
--grp    y_col_nm      x_col_nm      feat_impo
--2	   y2_cat	 x4	       0.07490484681851341
--2	   y2_cat	 x5	       0.04099924609654107
--2	   y2_cat	 x6	       0.03431643243509608
--2	   y2_cat	 x12           0.01474464870781392
--2	   y2_cat	 x40           0.013865405628514437
--2	   y2_cat	 x37           0.013435535581862938
--2	   y2_cat	 x167          0.013236591006394367
--2	   y2_cat	 x133  	       0.012570295279560963
--2	   y2_cat	 x142  	       0.012177597741973058
--2	   y2_cat	 x116          0.011713289042962961


-- The end. 

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요! :-)

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum database에서 SQL, MADlib 함수, PL/R, PL/Python을 사용해서 연속형 데이터에 대한 요약통계량을 구하는 방법을 소개하겠습니다.  무척 쉬운 내용이므로 쉬어가는 코너 정도로 가볍게 생각해주시면 좋겠습니다. ^^


PostgreSQL, Greenplum database 에서 연속형 데이터에 대해 그룹별로, 

(1) SQL 로 요약통계량 구하기

(2) Apache MADlib 으로 요약통계량 구하기





참고로, 이번 포스팅에서는 PostgreSQL 9.4, Greenplum 6.10.1 버전을 사용하였으며, PostgreSQL 9.4 버전보다 낮은 버전을 사용하면 최빈값(mode), 사분위부(percentile) 구하는 함수를 사용할 수 없습니다. 


먼저, 예제로 사용하기 위해 '나이'의 연속형 데이터와 '성별'의 범주형 데이터 (그룹)를 가진 간단한 테이블을 만들어보겠습니다. 결측값(missing value)도 성별 그룹별로 몇 개 넣어봤습니다. 



DROP TABLE IF EXISTS cust;

CREATE TABLE cust (id INT, age INT, gender TEXT);

INSERT INTO cust VALUES

(1,NULL,'M'),

(2,NULL,'M'),

(3,25,'M'),

(4,28,'M'),

(5,27,'M'),

(6,25,'M'),

(7,26,'M'),

(8,29,'M'),

(9,25,'M'),

(10,27,'M'),

(11,NULL,'F'),

(12,23,'F'),

(13,25,'F'),

(14,23,'F'),

(15,24,'F'),

(16,26,'F'),

(17,23,'F'),

(18,24,'F'),

(19,22,'F'),

(20,23,'F');

 




 (1) SQL로 연속형 데이터의 그룹별 요약통계량 구하기


함수가 굳이 설명을 안해도 될 정도로 간단하므로 길게 설명하지는 않겠습니다. 


표준편차 STDDEV() 와 분산 VARIANCE() 함수는 표본표준편차(sample standard deviation), 표본분산(sample variance) 를 계산해줍니다. 만약 모표준편차(population standard deviation), 모분산(population variance)를 구하고 싶으면 STDDEV_POP(), VAR_POP() 함수를 사용하면 됩니다. 


PostgreSQL 9.4 버전 이상부터 최빈값(MODE), 백분위수(Percentile) 함수가 생겨서 정렬한 후에 집계하는 기능이 매우 편리해졌습니다. (MODE(), PERCENTILE_DISC() 함수를 사용하지 않고 pure SQL로 최빈값과 백분위수를 구하려면 query 가 꽤 길어집니다.)



SELECT

    gender AS group_by_value

    , 'age' AS target_column

    , COUNT(*) AS row_count

    , COUNT(DISTINCT age) AS distinct_values

    , AVG(age)

    , VARIANCE(age)

    , STDDEV(age)

    , MIN(age)

    , PERCENTILE_DISC(0.25) WITHIN GROUP (ORDER BY age) AS first_quartile

    , MEDIAN(age)

    , PERCENTILE_DISC(0.75) WITHIN GROUP (ORDER BY age) AS third_quartile

    , MAX(age)

    , MODE() WITHIN GROUP (ORDER BY age) -- over PostgreSQL 9.4

FROM cust

WHERE age IS NOT NULL

GROUP BY gender

ORDER BY gender;





성별 그룹별로 연령(age) 칼럼의 결측값 개수를 구해보겠습니다. 

결측값 개수는 WHERE age IS NULL 로 조건절을 주고 COUNT(*)로 행의 개수를 세어주면 됩니다. 



SELECT 

    gender

    , COUNT(*) AS missing_count

FROM cust

WHERE age IS NULL

GROUP BY gender

ORDER BY gender;


Out[5]:
gendermissing_count
F1
M2





위의 집계/ 요약통계량과 결측값 개수를 하나의 조회 결과로 보려면 아래처럼 Join 을 해주면 됩니다.



WITH summary_tbl AS (
    SELECT
        gender AS group_by_value
        , 'age' AS target_column
        , COUNT(*) AS row_count
        , COUNT(DISTINCT age) AS distinct_values
        , AVG(age)
        , VARIANCE(age)
        , STDDEV(age)
        , MIN(age)
        , PERCENTILE_DISC(0.25) WITHIN GROUP (ORDER BY age) AS first_quartile
        , MEDIAN(age)
        , PERCENTILE_DISC(0.75) WITHIN GROUP (ORDER BY age) AS third_quartile
        , MAX(age)
        , MODE() WITHIN GROUP (ORDER BY age)
    FROM cust
    WHERE age IS NOT NULL
    GROUP BY gender
    ORDER BY gender
), missing_tbl AS (
    SELECT
        gender AS group_by_value
        , COUNT(*) AS missing_count
    FROM cust
    WHERE age IS NULL
    GROUP BY gender
)
SELECT a.*, b.missing_count
FROM summary_tbl a LEFT JOIN missing_tbl b USING(group_by_value)
;

 




  (2) Apache MADlib으로 연속형 데이터의 그룹별 요약통계량 구하기


Apache MADlib의 madlib.summary() 함수를 사용하면 단 몇 줄의 코드만으로 위의 (1)번에서 SQL 집계 함수를 사용해서 길게 짠 코드를 대신해서 매우 깔끔하고 간단하게 구할 수 있습니다. 


아래는 (1)번의 결과를 얻기위해 성별(gender) 연령(age) 칼럼의 집계/요약데이터를 구하는 madlib.summary() 함수 예시입니다. 


Target columns 위치에는 1 개 이상의 분석을 원하는 연속형 데이터 칼럼을 추가로 넣어주기만 하면 되므로 (1) 번의 pure SQL 대비 훨씬 편리한 측면이 있습니다! 


그리고 그룹별로 구분해서 집계/요약하고 싶으면 Grouping columns 위치에 기준 칼럼 이름을 넣어주기만 하면 되므로 역시 (1)번의 pure SQL 대비 훨씬 편리합니다!



DROP TABLE IF EXISTS cust_summary;

SELECT madlib.summary('cust'     -- Source table

                      ,'cust_summary'   -- Output table

                      , 'age'                -- Target columns

                      , 'gender'            -- Grouping columns

);






madlib.summary() 함수의 결과 테이블에서 조회할 수 있는 집계/요약통계량 칼럼 리스트는 아래와 같습니다. 



SELECT column_name

FROM INFORMATION_SCHEMA.COLUMNS

    WHERE TABLE_SCHEMA = 'public'

        AND TABLE_NAME    = 'cust_summary'

    ORDER BY ORDINAL_POSITION;

Out[7]:
column_name
group_by
group_by_value
target_column
column_number
data_type
row_count
distinct_values
missing_values
blank_values
fraction_missing
fraction_blank
positive_values
negative_values
zero_values
mean
variance
confidence_interval
min
max
first_quartile
median
third_quartile
most_frequent_values
mfv_frequencies

 



[Reference]

* PostgreSQL aggregate functions: https://www.postgresql.org/docs/9.4/functions-aggregate.html

* Apache MADlib summary function: https://madlib.apache.org/docs/v1.11/group__grp__summary.html



다음번 포스팅에서는 PostgreSQL, Greenplum에서 SQL과 Apache MADlib을 이용하여 상관계수, 상관계수 행렬을 구하는 방법(https://rfriend.tistory.com/581)을 소개하겠습니다.


이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!



728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 도커 허브(Docker Hub)에서 Greenplum Database(이하 GPDB)에 MADlib, PL/R, PL/Python이 설치된 Docker Image를 내려받아 분석 환경을 구성하는 방법을 소개하겠습니다. 


이번 포스팅에서 소개하는 gpdb-analytics 도커 이미지는 개인이 집이나 회사에서 GPDB로 MADlib, PL/R, PL/Python 사용하면서 테스트해보고 공부할 때 간편하게 사용할 수 있는 용도로 만든 것입니다. 



[사전 준비] Dokcer Install


Docker Image를 이용하여 GPDB+분석툴을 설치할 것이므로, 먼저 Docker Hub (https://hub.docker.com/)에서 회원가입을 하고, https://www.docker.com/products/docker-desktop 사이트에서 자신의 OS에 맞는 Docker를 다운로드 받아 설치하시기 바랍니다. 




단, Windows OS 사용자의 경우는 (1) Windows 10 Professional or Enterprise 64-bit 의 경우 'Docker CE for Windows'를 다운받아 설치하시구요, 그 이전 버전 혹은 Home Edition 버전 Windows OS 이용하시는 분의 경우는 'Docker Toolbox'를 다운로드 받아서 설치하시기 바랍니다. 





[ Docker Hub에서 gpdb-analytics 도커 이미지 내려받아서 GPDB 분석 환경 구성하기 ]


1. Docker Hub에서 gpdb-analytics 도커 이미지 내려받기 (docker pull)





(터미널 사용)

## Docker 이미지 내려 받기

$ docker pull hdlee2u/gpdb-analytics

## Docker 이미지 확인 하기

$ docker images

REPOSITORY               TAG                 IMAGE ID            CREATED             SIZE
centos                     7                   d123f4e55e12        9 months ago        197MB
hdlee2u/gpdb-base        latest              bfe4e63b8e81         2 years ago           1.17GB

hdlee2u/gpdb-analytics  latest           3be773a1a7e1        About a minute ago   4.93GB

 




2. 도커 이미지를 실행하여 Docker Container 를 생성하고, GPDB 분석 환경 시작하기



## Docker 이미지를 실행/ 5432 기본 포트로, ssh 2022포트를 사용하여 접근 가능하도록 Docker 컨테이너 생성

docker run -i -d -p 5432:5432 -p 28080:28080 --name gpdb-ds --hostname mdw hdlee2u/gpdb-analytics /usr/sbin/sshd -D


## Docker 컨테이너 목록 확인

$ docker ps

CONTAINER ID    IMAGE                   COMMAND                   CREATED             STATUS              PORTS                   NAMES

7518fd48450a        575a7d45999d        "/bin/bash"              1 minute ago         Up 6 hours                  0.0.0.0:5432->5432/tcp, 0.0.0.0:28080->28080/tcp   gpdb-ds



## Start GPDB and Use psql


$ docker exec -it gpdb-ds /bin/bash

[root@mdw /]# su - gpadmin
[gpadmin@mdw ~]$ gpstart -a

20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Starting gpstart with args: -a
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Gathering information and validating the environment...
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Greenplum Binary Version: 'postgres (Greenplum Database) 5.10.2 build commit:b3c02f3acd880e2d676dacea36be015e4a3826d4'
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Greenplum Catalog Version: '301705051'
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[WARNING]:-postmaster.pid file exists on Master, checking if recovery startup required
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Commencing recovery startup checks
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Have lock file /tmp/.s.PGSQL.5432 but no process running on port 5432
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-No Master instance process, entering recovery startup mode
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Clearing Master instance lock files
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Clearing Master instance pid file
20180821:04:45:08:000043 gpstart:mdw:gpadmin-[INFO]:-Starting Master instance in admin mode
20180821:04:45:10:000043 gpstart:mdw:gpadmin-[INFO]:-Obtaining Greenplum Master catalog information
20180821:04:45:10:000043 gpstart:mdw:gpadmin-[INFO]:-Obtaining Segment details from master...
20180821:04:45:10:000043 gpstart:mdw:gpadmin-[INFO]:-Setting new master era
20180821:04:45:10:000043 gpstart:mdw:gpadmin-[INFO]:-Commencing forced instance shutdown
20180821:04:45:12:000043 gpstart:mdw:gpadmin-[INFO]:-Starting Master instance in admin mode
20180821:04:45:13:000043 gpstart:mdw:gpadmin-[INFO]:-Obtaining Greenplum Master catalog information
20180821:04:45:13:000043 gpstart:mdw:gpadmin-[INFO]:-Obtaining Segment details from master...
20180821:04:45:13:000043 gpstart:mdw:gpadmin-[INFO]:-Setting new master era
20180821:04:45:13:000043 gpstart:mdw:gpadmin-[INFO]:-Master Started...
20180821:04:45:13:000043 gpstart:mdw:gpadmin-[INFO]:-Shutting down master
20180821:04:45:14:000043 gpstart:mdw:gpadmin-[INFO]:-Commencing parallel segment instance startup, please wait...
...
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-Process results...
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-----------------------------------------------------
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-   Successful segment starts                                            = 2
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-   Failed segment starts                                                = 0
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-   Skipped segment starts (segments are marked down in configuration)   = 0
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-----------------------------------------------------
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-Successfully started 2 of 2 segment instances
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-----------------------------------------------------
20180821:04:45:17:000043 gpstart:mdw:gpadmin-[INFO]:-Starting Master instance mdw directory /data/master/gpseg-1
20180821:04:45:18:000043 gpstart:mdw:gpadmin-[INFO]:-Command pg_ctl reports Master mdw instance active
20180821:04:45:18:000043 gpstart:mdw:gpadmin-[INFO]:-No standby master configured.  skipping...
20180821:04:45:18:000043 gpstart:mdw:gpadmin-[INFO]:-Database successfully started
[gpadmin@mdw ~]$
[gpadmin@mdw ~]$

[gpadmin@mdw ~]$ psql
psql (8.3.23)
Type "help" for help.

gpadmin=# \dn
List of schemas
Name | Owner
--------------------+---------
gp_toolkit | gpadmin
information_schema | gpadmin
madlib | gpadmin
pg_aoseg | gpadmin
pg_bitmapindex | gpadmin
pg_catalog | gpadmin
pg_toast | gpadmin
public | gpadmin
(8 rows)

gpadmin=# \q




3. PGAdmin IV 로 GPDB 연결하기


GPDB 5.x 버전에서 SQL Query를 할 때 PGAdmin IV 를 사용합니다. (GPDB 5.x 버전에서는 PGAdmin III 는 작동하지 않으며, 반대로 GPDB 4.x 버전에서는 PGAdmin IV가 작동하지 않고 대신 PGAdmin III만 작동합니다)


PGAdmin IV 는 https://www.pgadmin.org/download/ 에서 다운로드 하여 설치하면 됩니다. 


  • Host : localhost
  • Port : 5432
  • Maintenance DB : gpadmin
  • Username : gpadmin
  • Password : pivotal
  • Group: Servers
  • Ternel port: 22






4. Jupyter Notebook으로 GPDB 연결하기


4-0. Python 2.7 version의 Anaconda 설치


https://www.anaconda.com/download 에서 자신의 OS에 맞는 Python 2.7 version의 Anaconda를 설치합니다. GPDB 5.x 버전은 Python 2.7 version을 지원합니다. 




Anaconda를 설치하였으면, Anaconda Navigator를 실행한 후 base(root) 환경(즉, python 2.7)에서 'Jupyter Notebook'의 Launch 단추를 눌러서 Jupyter Notebook을 실행합니다. 





터미널을 이용해서 가상환경을 조회, 선택하고 Jupyter Notebook을 실행할 수도 있습니다. 




# 가상 환경 리스트 조회

$ conda env list

 

# 가상 환경 선택 (가상환경 리스트에서 python 2.7 버전 선택, windows의 경우: activate env_name)

$ source activate env_name


# Jupyter Notebook 실행

$ jupyter notebook






4-1. pip install 로 추가로 필요한 Python 패키지 설치하기



(터미널에서)

$ pip install --upgrade pip


$ pip install psycopg2

$ pip install sqlalchemy

$ pip install sql_magic

$ pip install math

$ pip install textwrap

$ pip install os

$ pip install Ipython


$ pip install ipywidgets

$ jupyter nbextension enable --py widgetsnbextension

$ pip install pygraphviz





4-2.  Jupyter Notebook에서 DB Connection 설정하기

4-2-1. Python packages importing



# Common modules

import numpy as np

import pandas as pd

from pandas import DataFrame

from pandas import Series

import sklearn

import math

import textwrap as tw


# For DB Connecton

import psycopg2

from sqlalchemy import create_engine

import sql_magic


# For reproducibility

np.random.seed(2622)


# Directory

import os

 



4-2-2. Visualization Parms Setup



import matplotlib as mpl

import matplotlib.pyplot as plt

import seaborn as sns


# To draw plots in jupyter notebook

#%matplotlib inline

%pylab inline


from pylab import rcParams

rcParams['figure.figsize'] = 12, 8

rcParams['axes.labelsize'] = 14

rcParams['xtick.labelsize'] = 12

rcParams['ytick.labelsize'] = 12


pd.set_option('display.max_columns', None)

pd.set_option('display.max_colwidth', 1000)


# Display

import ipywidgets as widgets

import IPython.display as ipd

from IPython.display import display


# interpret string as markdown

def printmd(string):

    ipd.display(ipd.Markdown(string))


# seaborn style

sns.set(style="darkgrid")




4-2-3. Greenplum Database Connection Setup



# put your own GPDB information

user = 'gpadmin'

password = 'pivotal'

host = 'localhost'

db = 'gpadmin'


connection_string = "postgresql://{user}:{password}@{host}/{db}".\

    format(user=user, 

           password=password, 

           host=host, 

           db=db)

    

conn = psycopg2.connect(connection_string)

cur = conn.cursor()

conn.autocommit = True




# helper function

def query_gpdb(query): 


    cur.execute(query)


    colnames = [desc[0] for desc in cur.description]

    return DataFrame(cur.fetchall(), columns=colnames)

 




4-2-4. sql_magic Setup

https://github.com/pivotal-legacy/sql_magic

sql_magic is Jupyter magic for writing SQL to interact with Greenplum/PostgreSQL database, Hive and Spark. Query results are saved directly to a Pandas dataframe.



# sql_magic package and ext sql_magic to query GPDB

%load_ext sql_magic

#%reload_ext sql_magic


# sql_magic

postgres_engine = create_engine(connection_string)


%config SQL.conn_name = 'postgres_engine'


# '%execsql' for sql execution, 

# '%read_sql' for reading table as a DataFrame format

from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)

@register_cell_magic

def execsql(line, cell):

       _ = postgres_engine.execute(cell)

       return

 




드디어 GPDB를 개인 컴퓨터에서 테스트, 공부용으로 간편하게(? ^^;) 설치하여 보았습니다. 수고 많으셨습니다. 




Jupyter Notebook 에서 sql 매직 언어로 DB를 조회할 수 있습니다. 


# GPDB 버전 확인

%read_sql select version();



# GPDB instance 확인 (하나의 서버에 1개 master, 2개 segment가 설치된 경우임)

%read_sql select * from pg_catalog.gp_segment_configuration 



# MADlib version 확인

%read_sql select madlib.version();



# PL/Languages 확인


sql query가 두 줄 이상일 경우 %%read_sql 처럼 % 두개를 앞에 써줍니다. (sql query 가 한 줄일 경우 %read_sql)

%%read_sql 

select * 

    from pg_catalog.pg_language;



# Table 생성

%%execsql

drop table if exists tmp;

create table tmp (

    id int, 

    var1 varchar(10)

    );




5. Docker Container 중단, 재시작, 작동 중인 컨테이너 목록 확인



## Docker  컨테이너 중단, 재시작, 목록 확인

$ docker stop gpdb-ds

$ docker start gpdb-ds

$ docker ps

CONTAINER ID    IMAGE                   COMMAND                   CREATED             STATUS              PORTS                   NAMES
7518fd48450a        575a7d45999d        "/bin/bash"              2 minutes ago         Up 6 hours                  0.0.0.0:5432->5432/tcp, 0.0.0.0:28080->28080/tcp   gpdb-ds

 



많은 도움이 되었기를 바랍니다. 



---------------

혹시 아래와 같은 에러가 발생하는 경우 http://rfriend.tistory.com/396 포스팅을 참고하세요. 



Error response from daemon: driver failed programming external connectivity on endpoint gpdb-ds (d519c381360008f0ac0e8d756e97aeb0538075ee1b7e35862a0eaedf887181f1): Error starting userland proxy: Bind for 0.0.0.0:5432 failed: port is already allocated

Error: failed to start containers: gpdb-ds 





728x90
반응형
Posted by Rfriend
,