Greenplum DB는 여려개의 PostgreSQL DB를 합쳐놓은 shared-nothing architecture 의 MPP (Massively Parallel Processing) Database 입니다.  손과 발이 되는 여러개의 cluster nodes에 머리가 되는 Master host 가 조율/조정/지시해가면서 분산하여 병렬로 일을 시키고, 각 cluster nodes의 연산처리 결과를 master host 가 모아서 취합하여 최종 결과를 반환하는 방식으로 일을 하기 때문에 (1) 대용량 데이터도 (2) 매우 빠르게 처리할 수 있습니다. 

 

이번 포스팅에서는 여기에서 한발 더 나아가서, Procedural Language Extension (PL/X) 을 사용하여 Python, R, Java, C, Perl, SQL 등의 프로그래밍 언어를 Greenplum DB 내에서 사용하여 데이터의 이동 없이 분산 병렬처리하는 방법을 소개하겠습니다. 

 

Massively Parallel Processing through PL/Python on Greenplum DB

 

난수를 발생시켜서 만든 가상의 데이터셋을 사용하여 PL/Python으로 Random Forest의 Feature Importance를 숫자형 목표변수('y1_num') 그룹과 범주형 목표변수('y2_cat') 그룹 별로 분산병렬처리하는 간단한 예를 들어보겠습니다. 

 

(1) 가상의 데이터셋 만들기
   - group: 2개의 그룹

   - X: 각 그룹별 100개의 관측치별 200개의 숫자형 변수를 가지는 가상의 데이터셋

   - y: 숫자형 목표변수 'y1_num', 범주형 목표변수 'y2_cat'

      (a) y1_num = x1*7.0 + x2*6.0 - x3*4.0 + x4*5.0 + 0.001*random()

      (b) y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0
의 함수로 부터 만듦. 

(2) PL/Python 함수 정의하기
   : (a) 숫자형 목표변수('y1_num') 그룹은 Random Forest Regressor를,  (b) 범주형 목표변수('y2_cat') 그룹은 Random Forest Classifier 를 각 그룹별로 분산병렬로 훈련시킨 후,
   : 각 그룹별 Random Forest Regressor 모델별 200개의 숫자형 변수별 Feature Importance를 반환

(3) PL/Python 함수 실행하기 

(4) 각 그룹별 변수별 Random Forest 의 Feature Importance 를 조회하기

 

를 해 보겠습니다. 

 

 

 

(1) 가상의 예제 데이터셋 만들기

   - group: 2개의 그룹

         (목표변수로 하나는 숫자형, 하나는 범주형 값을 가지는 2개의 X&y 데이터셋 그룹을 생성함.)

 

   - X: 각 그룹별 100개의 관측치별 200개의 숫자형 변수를 가지는 가상의 데이터셋

         (SQL의 random() 함수로 0~1 사이의 난수 실수값을 생성함.)

 

   - y: 숫자형 목표변수 'y1_num', 범주형 목표변수 'y2_cat'

       (a) y1_num = x1*7.0 + x2*6.0 - x3*4.0 + x4*5.0 + 0.001*random()

       (b) y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0 

             (y1_num, y2_cat 값을 만들때 x 변수에 곱하는데 사용한 *7.0, *6.0, *5.0 은 아무런 의미 없습니다.

             그냥 가상의 예제 샘플 데이터를 만들려고 임의로 선택해서 곱해준 값입니다.)

 

아래의 예제는 In-DB 처리를 염두에 두고, 200개의 숫자형 X 변수들과 한개의 숫자형 y 변수를 DB의 테이블에 "col_nm"이라는 칼럼에는 변수 이름을, "col_val"에는  변수 값을 long form 으로 생성해서 저장해 놓은 것입니다. 

 

나중에 PL/Python의 함수 안에서 pandas 의 pivot_table() 함수를 사용해서 wide form 으로 DataFrame의 형태를 재구조화해서 random forest 를 분석하게 됩니다. 

 

제 맥북에서 도커로 만든 Greenplum 에는 1개의 master node와 2개의 segment nodes 가 있는데요, 편의상 cross join 으로 똑같은 칼럼과 값을 가지는 설명변수 X 데이터셋을 2의 segment nodes에 replication 해서 그룹1('grp=1'), 그룹2('grp=2')를 만들었습니다.

 

그리고 여기에 목표변수 y 값으로는 숫자형 목표변수 'y1_num' 칼럼의 값에 대해서는 그룹1('grp=1'), 범주형 목표변수 'y2_cat' 칼럼의 값에 대해서는 그룹2('grp=2')를 부여한 후에, 앞서 만든 설명변수 X 데이터셋에 union all 로 'y1_num'과 'y2_cat' 데이터를 합쳐서 최종으로 하나의 long format의 테이블을 만들었습니다. 

 

첫번째 그룹은 200개의 숫자형 X 변수 중에서 'x1', 'x2', 'x3'의 3개 변수만 숫자형 목표변수(numeric target variable) 인 'y1_num'과 관련이 있고, 나머지 194개의 설명변수와는 관련이 없게끔 y1_num = x1*7.0 + x2*6.0 + x3*5.0 + 0.001*random() 함수를 사용해서 'y1_num' 을 만들었습니다 (*7.0, *6.0, *5.0 은 가상의 예제 데이터를 만들기 위해 임의로 선택한 값으로서, 아무 이유 없습니다).  뒤에서 PL/Python으로 Random Forest Regressor 의 feature importance 결과에서 'x1', 'x2', 'x3' 변수의 Feature Importance 값이 높게 나오는지 살펴보겠습니다.  

 

두번째 그룹은 200개의 숫자형 X변수 중에서 'x4', 'x5', 'x6'의 3개 변수만 범주형 목표변수(categorical target variable) 인 'y2_cat'과 관련이 있고, 나머지 194개의 설명변수와는 연관이 없게끔 y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0 함수로 부터 가상으로 생성하였습니다. 뒤에서 PL/Python으로 Random Forest Classifier 의 feature importance 결과에서 'x4', 'x5', 'x6' 변수의 Feature Importance 값이 높게 나오는지 살펴보겠습니다.  

 

------------------------------------------------------------------
-- Random Forest's Feature Importance using PL/Python on Greenplum
------------------------------------------------------------------

-- (1) Generate sample data 
-- 2 groups
-- 100 observations(ID) per group
-- X: 200 numeric input variables per observation(ID)
-- y : a numeric target variable by a function of y = x1*5.0 + x2*4.5 - x3*4.0 + x4*3.5 + 0.001*random()
-- distributed by 'grp' (group)

-- (1-1) 100 IDs of observations
drop table if exists id_tmp;
create table id_tmp (
	id integer
) distributed randomly;

insert into id_tmp (select * from generate_series(1, 100, 1));

select * from id_tmp order by id limit 3;
--id
--1
--2
--3


-- (1-2) 200 X variables
drop table if exists x_tmp;
create table x_tmp (
	x integer
) distributed randomly;

insert into x_tmp (select * from generate_series(1, 200, 1));

select * from x_tmp order by x limit 3;
--x
--1
--2
--3



-- (1-3) Cross join of ID and Xs
drop table if exists id_x_tmp;
create table id_x_tmp as (
	select * from id_tmp 
	cross join x_tmp 
) distributed randomly;

select count(1) from id_x_tmp; 
-- 20,000  -- (id 100 * x 200 = 20,000)

select * from id_x_tmp order by id, x limit 3;
--id  x
--1	  1
--1	  2
--1	  3



-- (1-4) Generate X values randomly
drop table if exists x_long_tmp;
create table x_long_tmp as (
	select 
		a.id as id
		, x
		, 'x'||a.x::text as x_col
		, round(random()::numeric, 3) as x_val 
	from id_x_tmp a
) distributed randomly;

select count(1) from x_long_tmp; 
-- 20,000

select * from x_long_tmp order by id, x limit 3;
--id  x  x_col  x_val
--1	  1	 x1	    0.956
--1	  2	 x2	    0.123
--1	  3	 x3	    0.716

select min(x_val) as x_min_val, max(x_val) as x_max_val from x_long_tmp;
--x_min_val  x_max_val
--0.000	     1.000



-- (1-5) create y values
drop table if exists y_tmp;
create table y_tmp as (
	select 
		s.id
		, (s.x1*7.0 + s.x2*6.0 + s.x3*5.0 + 0.001*random()) as y1_num -- numeric
		, case when (s.x4*7.0 + s.x5*6.0 + s.x6*5.0 + 0.001*random()) >= 9 
			then 1 
			else 0 
			end as y2_cat -- categorical
	from (
		select distinct(a.id) as id, x1, x2, x3, x4, x5, x6 from x_long_tmp as a
		left join (select id, x_val as x1 from x_long_tmp where x_col = 'x1') b 
			on a.id = b.id
		left join (select id, x_val as x2 from x_long_tmp where x_col = 'x2') c 
			on a.id = c.id 
		left join (select id, x_val as x3 from x_long_tmp where x_col = 'x3') d 
			on a.id = d.id 
		left join (select id, x_val as x4 from x_long_tmp where x_col = 'x4') e 
			on a.id = e.id
		left join (select id, x_val as x5 from x_long_tmp where x_col = 'x5') f 
			on a.id = f.id
		left join (select id, x_val as x6 from x_long_tmp where x_col = 'x6') g 
			on a.id = g.id
	) s
) distributed randomly;

select count(1) from y_tmp;
--100

select * from y_tmp order by id limit 5;
--id  y1_num            y2_cat
--1	11.0104868695838	1
--2	10.2772997177048	0
--3	7.81790575686749	0
--4	8.89387259676540	1
--5	2.47530914815422	1




--  (1-6) replicate X table to all clusters 
--        by the number of 'y' varialbes. (in this case, there are 2 y variables, 'y1_num' and 'y2_cat'
drop table if exists long_x_grp_tmp;
create table long_x_grp_tmp as (
	select 
		b.grp as grp
		, a.id as id
		, a.x_col as col_nm
		, a.x_val as col_val
	from x_long_tmp as a
	cross join (
		select generate_series(1, c.y_col_cnt) as grp
		from (
			select (count(distinct column_name) - 1) as y_col_cnt 
			from information_schema.columns 
				where table_name = 'y_tmp' and table_schema = 'public') c
		) as b -- number of clusters
) distributed randomly;


select count(1) from long_x_grp_tmp;
-- 40,000   -- 2 (y_col_cnt) * 20,000 (x_col_cnt)

select * from long_x_grp_tmp order by id limit 5;
--grp  id   col_nm  col_val
--1	1	x161	0.499
--2	1	x114	0.087
--1	1	x170	0.683
--2	1	x4	    0.037
--2	1	x45	    0.995



-- (1-7) create table in long format with x and y 
drop table if exists long_x_y;
create table long_x_y as (
	select x.*
	from long_x_grp_tmp as x
	union all 
	select 1::int as grp, y1.id as id, 'y1_num'::text as col_nm, y1.y1_num as col_val 
	from y_tmp as y1 
	union all 
	select 2::int as grp, y2.id as id, 'y2_cat'::text as col_nm, y2.y2_cat as col_val
	from y_tmp as y2
) distributed randomly;

select count(1) from long_x_y; 
-- 40,200 (x 40,000 + y1_num 100 + y2_cat 100)

select grp, count(1) from long_x_y group by 1 order by 1;
--grp  count
--1	   20100
--2	   20100

select * from long_x_y where grp=1 order by id, col_nm desc limit 5;
--grp  id   col_nm  col_val
--1	   1	y1_num	11.010
--1	   1	x99	     0.737
--1	   1	x98	     0.071
--1	   1	x97	     0.223
--1	   1	x96	     0.289

select * from long_x_y where grp=2 order by id, col_nm desc limit 5;
--grp  id   col_nm  col_val
--2	   1	y2_cat	1.0
--2	   1	x99	    0.737
--2	   1	x98	    0.071
--2	   1	x97	    0.223
--2	   1	x96	    0.289


-- drop temparary tables
drop table if exists id_tmp;
drop table if exists x_tmp;
drop table if exists id_x_tmp;
drop table if exists x_long_tmp;
drop table if exists y_tmp;
drop table if exists long_x_grp_tmp;


 

 

 

(2) PL/Python 사용자 정의함수 정의하기

 

- (2-1) composite return type 정의하기

 

PL/Python로 분산병렬로 연산한 Random Forest의 feature importance (또는 variable importance) 결과를 반환할 때 텍스트 데이터 유형의 '목표변수 이름(y_col_nm)', '설명변수 이름(x_col_nm)'과 '변수 중요도(feat_impo)' 의 array 형태로 반환하게 됩니다. 반환하는 데이터가 '텍스트'와 'float8' 로 서로 다른 데이터 유형이 섞여 있으므로 composite type 의 return type 을 만들어줍니다. 그리고 PL/Python은 array 형태로 반환하므로 text[], text[] 과 같이 '[]' 로서 array 형태로 반환함을 명시합니다.  

-- define composite return type
drop type if exists plpy_rf_feat_impo_type cascade;
create type plpy_rf_feat_impo_type as (
	y_col_nm text[]
	, x_col_nm text[]
	, feat_impo float8[]
);

 

 

 

- (2-2) Random Forest feature importance 결과를 반환하는 PL/Python 함수 정의하기

 

PL/Python 사용자 정의 함수를 정의할 때는 아래와 같이 PostgreSQL의 Procedural Language 함수 정의하는 표준 SQL 문을 사용합니다. 

input data 는 array 형태이므로 칼럼 이름 뒤에 데이터 유형에는 '[]'를 붙여줍니다. 

중간의  $$ ... python code block ... $$ 부분에 pure python code 를 넣어줍니다. 

제일 마지막에 PL/X 언어로서 language 'plpythonu' 으로 PL/Python 임을 명시적으로 지정해줍니다. 

 

create or replace function function_name(column1  data_type1[], column2 data_type2[], ...) 
returns return_type as $$
    ... python code block ...
$$ language 'plpythonu';

 

 

만약 PL/Container 를 사용한다면 명령 프롬프트 창에서 아래처럼 $ plcontainer runtime-show  로 Runtime ID를 확인 한 후에,   

[gpadmin@mdw ~]$ plcontainer runtime-show
PL/Container Runtime Configuration:
---------------------------------------------------------
  Runtime ID: plc_python3_shared
  Linked Docker Image: pivotaldata/plcontainer_python3_shared:devel
  Runtime Setting(s):
  Shared Directory:
  ---- Shared Directory From HOST '/usr/local/greenplum-db/./bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
---------------------------------------------------------

 

 

PL/Python 코드블록의 시작 부분에 $$ # container: container_Runtime_ID  로서 사용하고자 하는 docker container 의 runtime ID를 지정해주고, 제일 마지막 부분에 $$ language 'plcontainer'; 로 확장 언어를 'plcontainer'로 지정해주면 됩니다.  PL/Container를 사용하면 최신의 Python 3.x 버전을 사용할 수 있는 장점이 있습니다. 


create or replace function function_name(column1  data_type1[], column2 data_type2[], ...) 
returns return_type as
$$
# container: plc_python3_shared
... python code block ...
$$ LANGUAGE 'plcontainer'; 

 

 

아래 코드에서는 array 형태의 'id', 'col_nm', 'col_val'의 3개 칼럼을 input 으로 받아서 먼저 pandas DataFrame으로 만들어 준 후에, 이를 pandas pivot_table() 함수를 사용해서 long form --> wide form 으로 데이터를 재구조화 해주었습니다. 

 

다음으로, 숫자형의 목표변수('y1_num')를 가지는 그룹1 데이터셋에 대해서는 sklearn 패키지의 RandomForestRegressor 클래스를 사용해서 Random Forest 모델을 훈련하고, 범주형의 목표변수('y2_cat')를 가지는 그룹2의 데이터셋에 대해서는 sklearn 패키지의 RandomForestClassifier 클래스를 사용하여 모델을 훈련하였습니다. 그리고  'rf_regr_fitted.feature_importances_' , 'rf_clas_fitted.feature_importances_'를 사용해서 200개의 각 변수별 feature importance 속성을 리스트로 가져왔습니다. 

 

마지막에 return {'y_col_nm': y_col_nm, 'x_col_nm': x_col_nm_list, 'feat_impo': feat_impo} 에서 전체 변수 리스트와 변수 중요도 연산 결과를 array 형태로 반환하게 했습니다. 

 

----------------------------------
-- PL/Python UDF for Random Forest
----------------------------------

-- define PL/Python function
drop function if exists plpy_rf_feat_impo_func(text[], text[], text[]);
create or replace function plpy_rf_feat_impo_func(
	id_arr text[]
	, col_nm_arr text[]
	, col_val_arr text[]
) returns plpy_rf_feat_impo_type as 
$$
#import numpy as np 
import pandas as pd

# making a DataFrame
xy_df = pd.DataFrame({
    'id': id_arr
    , 'col_nm': col_nm_arr
    , 'col_val': col_val_arr
})

# pivoting a table
xy_pvt = pd.pivot_table(xy_df
                        , index = ['id']
                        , columns = 'col_nm'
                        , values = 'col_val'
                        , aggfunc = 'first'
                        , fill_value = 0)

X = xy_pvt[xy_pvt.columns.difference(['y1_num', 'y2_cat'])]
X = X.astype(float)
x_col_nm_list = X.columns

# UDF for Feature Importance by RandomForestRegressor
def rf_regr_feat_impo(X, y):
	
	# training RandomForestRegressor
	from sklearn.ensemble import RandomForestRegressor
	rf_regr = RandomForestRegressor(n_estimators=200)
	rf_regr_fitted = rf_regr.fit(X, y)
	
	# The impurity-based feature importances.
	rf_regr_feat_impo = rf_regr_fitted.feature_importances_
	return rf_regr_feat_impo


# UDF for Feature Importance by RandomForestClassifier
def rf_clas_feat_impo(X, y):
	
	# training  RandomForestClassifier with balanced class_weight
	from sklearn.ensemble import RandomForestClassifier
	rf_clas = RandomForestClassifier(n_estimators=200, class_weight='balanced')
	rf_clas_fitted = rf_clas.fit(X, y)
	
	# The impurity-based feature importances.
	rf_clas_feat_impo = rf_clas_fitted.feature_importances_
	return rf_clas_feat_impo
	
	
# training RandomForest and getting variable(feature) importance
if 'y1_num' in xy_pvt.columns:
	y_target = 'y1_num'
	y = xy_pvt[y_target]
	feat_impo = rf_regr_feat_impo(X, y)

if 'y2_cat' in xy_pvt.columns:
	y_target = 'y2_cat'
	y = xy_pvt[y_target]
	y = y.astype(int)
	feat_impo = rf_clas_feat_impo(X, y)

feat_impo_df = pd.DataFrame({
	'y_col_nm': y_target
	, 'x_col_nm': x_col_nm_list
	, 'feat_impo': feat_impo
})

# returning the results of feature importances
return {
	'y_col_nm': feat_impo_df['y_col_nm'] 
	, 'x_col_nm': feat_impo_df['x_col_nm']
	, 'feat_impo': feat_impo_df['feat_impo']
	}
    
$$ language 'plpythonu';

 

 

 

(3) PL/Python 함수 실행하기 

 

PL/Python 함수를 실행할 때는 표준 SQL Query 문의 "SELECT group_name, pl_python_function() FROM table_name" 처럼 함수를 SELECT 문으로 직접 호출해서 사용합니다. 

 

PL/Python의 input 으로 array 형태의 데이터를 넣어주므로, 아래처럼 FROM 절의 sub query 에 array_agg() 함수로 먼저 데이터를 'grp' 그룹 별로 array aggregation 하였습니다. 

 

PL/Python 함수의 전체 결과를 모두 반환할 것이므로 (plpy_rf_var_impo_func()).* 처럼 함수를 모두 감싼 후에 ().* 를 사용하였습니다. (실수해서 빼먹기 쉬우므로 유의하시기 바랍니다.)

 

목표변수가 숫자형('y1_num')과 범주형('y2_cat')'별로 그룹1과 그룹2로 나누어서, 'grp' 그룹별로 분산병렬로 Random Forest 분석이 진행되며, Variable importance 결과를 'grp' 그룹 ID를 기준으로 분산해서 저장(distributed by (grp);)하게끔 해주었습니다.   

 

-- execute PL/Python function
drop table if exists rf_feat_impo_result;
create table rf_feat_impo_result as (
	select 
		a.grp 
		, (plpy_rf_feat_impo_func(
			a.id_arr
			, a.col_nm_arr
			, a.col_val_arr
		)).* 
		from (
			select 
				c.grp 
				, array_agg(c.id::text order by id) as id_arr
				, array_agg(c.col_nm::text order by id) as col_nm_arr
				, array_agg(c.col_val::text order by id) as col_val_arr
			from long_x_y as c
			group by grp
			) a
) distributed by (grp);

 

 

 

(4) 각 그룹별 변수별 Random Forest 의 Feature Importance 조회하기

 

위의 (3)번을 실행해서 나온 결과를 조회하면 아래와 같이 'grp=1', 'grp=2' 별로 각 칼럼별로 Random Forest에 의해 계산된 변수 중요도(variable importance) 가 array 형태로 저장되어 있음을 알 수 있습니다. 

 

select count(1) from rf_feat_impo_result; 
-- 2

-- results in array-format
select * from rf_feat_impo_result order by grp;

plpython_random_forest_feature_importance_array

 

위의 array 형태의 결과는 사람이 눈으로 보기에 불편하므로, unnest() 함수를 써서 long form 으로 길게 풀어서 결과를 조회해 보겠습니다. 

 

이번 예제에서는 난수로 생성한 X설명변수에 임의로 함수를 사용해서 숫자형 목표변수('y1_num')를 가지는 그룹1에 대해서는 'x1', 'x2', 'x3' 의 순서대로 변수가 중요하고, 범주형 목표변수('y2_cat')를 가지는 그룹2에서는  'x4', 'x5', 'x6'의 순서대로 변수가 중요하게 가상의 예제 데이터셋을 만들어주었습니다.  (random() 함수로 난수를 생성해서 예제 데이터셋을 만들었으므로, 매번 실행할 때마다 숫자는 달라집니다). 

 

아래 feature importance 결과를 보니, 역시 그룹1의 데이터셋에 대해서는 'x1', 'x2', 'x3' 변수가 중요하게 나왔고, 그룹2의 데이터셋에 대해서는 'x4', 'x5', 'x6' 변수가 중요하다고 나왔네요. 

 

-- display the results using unnest()
select 
	grp
	, unnest(y_col_nm) as y_col_nm
	, unnest(x_col_nm) as x_col_nm
	, unnest(feat_impo) as feat_impo 
from rf_feat_impo_result
where grp = 1
order by feat_impo desc 
limit 10;
--grp    y_col_nm      x_col_nm      feat_impo
--1	   y1_num	 x1	       0.4538784064497847
--1	   y1_num	 x2	       0.1328532144509229
--1	   y1_num	 x3	       0.10484121806286809
--1	   y1_num	 x34           0.006843343319633915
--1	   y1_num	 x42           0.006804819286213849
--1	   y1_num	 x182          0.005771113354638556
--1	   y1_num	 x143          0.005220090515711377
--1	   y1_num	 x154          0.005101366229848041
--1	   y1_num	 x46           0.004571420249598611
--1	   y1_num	 x57           0.004375780774099066

select 
	grp
	, unnest(y_col_nm) as y_col_nm
	, unnest(x_col_nm) as x_col_nm
	, unnest(feat_impo) as feat_impo 
from rf_feat_impo_result
where grp = 2
order by feat_impo desc 
limit 10;
--grp    y_col_nm      x_col_nm      feat_impo
--2	   y2_cat	 x4	       0.07490484681851341
--2	   y2_cat	 x5	       0.04099924609654107
--2	   y2_cat	 x6	       0.03431643243509608
--2	   y2_cat	 x12           0.01474464870781392
--2	   y2_cat	 x40           0.013865405628514437
--2	   y2_cat	 x37           0.013435535581862938
--2	   y2_cat	 x167          0.013236591006394367
--2	   y2_cat	 x133  	       0.012570295279560963
--2	   y2_cat	 x142  	       0.012177597741973058
--2	   y2_cat	 x116          0.011713289042962961


-- The end. 

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요! :-)

 

728x90
반응형
Posted by Rfriend
,