지난번 포스팅에서는 Python의 statsmodels  모듈을 이용하여 여러개의 숫자형 변수에 대해 집단 간 평균의 차이가 있는지를 for loop 순환문을 사용하여 검정하는 방법(rfriend.tistory.com/639)을 소개하였습니다. 

 

Python에서 for loop 문을 사용하면 순차적으로 처리 (sequential processing) 를 하게 되므로, 일원분산분석을 해야 하는 숫자형 변수의 개수가 많아질 수록 선형적으로 처리 시간이 증가하게 됩니다. 

 

Greenplum에서 PL/Python (또는 PL/R) 을 사용하면 일원분산분석의 대상의 되는 숫자형 변수가 매우 많고 데이터 크기가 크더라도 분산병렬처리 (distributed parallel processing) 하여 ANOVA test 를 처리할 수 있으므로 신속하게 분석을 할 수 있는 장점이 있습니다.

 

더불어서, 데이터가 저장되어 있는 DB에서 데이터의 이동 없이(no data I/O, no data movement), In-DB 처리/분석이 되므로 work-flow 가 간소화되고 batch scheduling 하기에도 편리한 장점이 있습니다.  

 

만약 데이터는 DB에 있고, 애플리케이션도 DB를 바라보고 있고, 분석은 Python 서버 (또는 R 서버)에서 하는 경우라면, 분석을 위해 DB에서 데이터를 samfile 로 떨구고, 이를 Python에서 pd.read_csv()로 읽어들여서 분석하고, 다시 결과를 text file로 떨구고, 이 text file을 ftp로 DB 서버로 이동하고, psql로 COPY 문으로 테이블에 insert 하는 workflow ... 관리 포인트가 많아서 정신 사납고 복잡하지요?!  

 

자, 이제 Greenplum database에서 PL/Python으로 일원분산분석을 병렬처리해서 집단 간 여러개의 개별 변수별 평균 차이가 있는지 검정을 해보겠습니다. 

 

 

(1) 여러 개의 변수를 가지는 샘플 데이터 만들기

 

정규분포로 부터 난수를 발생시켜서 3개 그룹별로 각 30개 씩의 샘플 데이터를 생성하였습니다. 숫자형 변수로는 'x1', 'x2', 'x3', 'x4'의 네 개의 변수를 생성하였습니다. 이중에서  'x1', 'x2'는 3개 집단이 모두 동일한 평균과 분산을 가지는 정규분포로 부터 샘플을 추출하였고, 반면에 'x3', 'x4'는 3개 집단 중 2개는 동일한 평균과 분산의 정규분포로 부터 샘플을 추출하고 나머지 1개 집단은 다른 평균을 가지는 정규분포로 부터 샘플을 추출하였습니다. (뒤에  one-way ANOVA 검정을 해보면 'x3', 'x4'에 대한 집단 간 평균 차이가 있는 것으로 결과가 나오겠지요?!)

 

import numpy as np
import pandas as pd

# generate 90 IDs 
id = np.arange(90) + 1 

# Create 3 groups with 30 observations in each group. 
from itertools import chain, repeat 
grp = list(chain.from_iterable((repeat(number, 30) for number in [1, 2, 3]))) 

# generate random numbers per each groups from normal distribution 
np.random.seed(1004) 

# for 'x1' from group 1, 2 and 3
x1_g1 = np.random.normal(0, 1, 30) 
x1_g2 = np.random.normal(0, 1, 30) 
x1_g3 = np.random.normal(0, 1, 30) 

# for 'x2' from group 1, 2 and 3 
x2_g1 = np.random.normal(10, 1, 30) 
x2_g2 = np.random.normal(10, 1, 30) 
x2_g3 = np.random.normal(10, 1, 30) 

# for 'x3' from group 1, 2 and 3 
x3_g1 = np.random.normal(30, 1, 30) 
x3_g2 = np.random.normal(30, 1, 30) 
x3_g3 = np.random.normal(50, 1, 30) 

# different mean 
x4_g1 = np.random.normal(50, 1, 30) 
x4_g2 = np.random.normal(50, 1, 30) 
x4_g3 = np.random.normal(20, 1, 30) 

# different mean # make a DataFrame with all together 
df = pd.DataFrame({
    'id': id, 'grp': grp, 
    'x1': np.concatenate([x1_g1, x1_g2, x1_g3]), 
    'x2': np.concatenate([x2_g1, x2_g2, x2_g3]), 
    'x3': np.concatenate([x3_g1, x3_g2, x3_g3]), 
    'x4': np.concatenate([x4_g1, x4_g2, x4_g3])}) 
    

df.head()

 

id

grp

x1

x2

x3

x4

1

1

0.594403

10.910982

29.431739

49.232193

2

1

0.402609

9.145831

28.548873

50.434544

3

1

-0.805162

9.714561

30.505179

49.459769

4

1

0.115126

8.885289

29.218484

50.040593

5

1

-0.753065

10.230208

30.072990

49.601211

 

 

위에서 만든 가상의 샘플 데이터를 Greenplum DB에 'sample_tbl' 이라는 이름의 테이블로 생성해보겠습니다.  Python pandas의  to_sql()  메소드를 사용하면 pandas DataFrame을 쉽게 Greenplum DB (또는 PostgreSQL DB)에 uploading 할 수 있습니다.  

 

# creating a table in Greenplum by importing pandas DataFrame
conn = "postgresql://gpadmin:changeme@localhost:5432/demo"

df.to_sql('sample_tbl', 
         conn, 
         schema = 'public', 
         if_exists = 'replace', 
         index = False)
 

 

 

 

Jupyter Notebook에서 Greenplum DB에 접속해서 SQL로 이후 작업을 진행하겠습니다.

(Jupyter Notebook에서 Greenplum DB access 하고 SQL query 실행하는 방법은 rfriend.tistory.com/572 참조하세요)

 

-- 여기서 부터는 Jupyter Notebook에서 실행한 것입니다. --

%load_ext sql

# postgresql://Username:Password@Host:Port/Database
%sql postgresql://gpadmin:changeme@localhost:5432/demo
[Out][
'Connected: gpadmin@demo'

 

 

 

위 (1) 에서 pandas 의 to_sql() 로 importing 한 sample_tbl 테이블에서 5개 행을 조회해보겠습니다. 

 

%sql select * from sample_tbl order by id limit 5;
 * postgresql://gpadmin:***@localhost:5432/demo
5 rows affected.

[Out]
id	grp	x1	                x2	                x3	                x4
1	1	0.594403067344276	10.9109819091195	29.4317394311833	49.2321928075563
2	1	0.402608708677309	9.14583073327387	28.54887315985  	50.4345438286737
3	1	-0.805162233589535	9.71456131309311	30.5051787625131	49.4597693977764
4	1	0.115125695763445	8.88528940547472	29.2184835450055	50.0405932387396
5	1	-0.753065219532709	10.230207786414 	30.0729900069999	49.6012106088522

 

 

 

(2) 데이터 구조 변경: reshaping from wide to long

 

PL/Python에서 작업하기 쉽도록 테이블 구조를  wide format에서 long format 으로 변경하겠습니다. union all 로 해서 칼럼 갯수 만큼 위/아래로 append  해나가면 되는데요, DB 에서 이런 형식의 데이터를 관리하고 있다면 아마도 이미 long format 으로 관리하고 있을 가능성이 높습니다. (새로운 데이터가 수집되면 계속  insert into 하면서 행을 밑으로 계속 쌓아갈 것이므로...)

 

%%sql
-- reshaping a table from wide to long
drop table if exists sample_tbl_long;
create table sample_tbl_long as (
    select id, grp, 'x1' as col, x1 as val from sample_tbl
    union all 
    select id, grp, 'x2' as col, x2 as val from sample_tbl
    union all 
    select id, grp, 'x3' as col, x3 as val from sample_tbl
    union all 
    select id, grp, 'x4' as col, x4 as val from sample_tbl
) distributed randomly;

 * postgresql://gpadmin:***@localhost:5432/demo
Done.
360 rows affected.



%sql select * from sample_tbl_long order by id, grp, col limit 8;

[Out]
 * postgresql://gpadmin:***@localhost:5432/demo
8 rows affected.
id	grp	col	val
1	1	x1	0.594403067344276
1	1	x2	10.9109819091195
1	1	x3	29.4317394311833
1	1	x4	49.2321928075563
2	1	x1	0.402608708677309
2	1	x2	9.14583073327387
2	1	x3	28.54887315985
2	1	x4	50.4345438286737

 

 

 

(3) 분석 결과 반환 composite type 정의

 

일원분산분석 결과를 반환받을 때 각 분석 대상 변수 별로 (a) F-통계량, (b) p-value 의 두 개 값을 float8 데이터 형태로  반환받는 composite type 을 미리 정의해놓겠습니다. 

%%sql
-- Creating a coposite return type
drop type if exists plpy_anova_type cascade;
create type plpy_anova_type as (
    f_stat float8
    , p_val float8
);

 * postgresql://gpadmin:***@localhost:5432/demo
Done.
Done.

 

 

 

(4)  일원분산분석(one-way ANOVA) PL/Python 사용자 정의함수 정의

 

집단('grp')과 측정값('val')을 input 으로 받고, statsmodels 모듈의 sm.stats.anova_lm() 메소드로 일원분산분석을 하여 결과 테이블에서 'F-통계량'과 'p-value'만 인덱싱해서 반환하는 PL/Python 사용자 정의 함수를 정의해보겠습니다. 

 

%%sql
-- Creating the PL/Python UDF of ANOVA
drop function if exists plpy_anova_func(text[], float8[]);
create or replace function plpy_anova_func(grp text[], val float8[])
returns plpy_anova_type 
as $$
    import pandas as pd  
    import statsmodels.api as sm
    from statsmodels.formula.api import ols
    
    df = pd.DataFrame({'grp': grp, 'val': val})
    model = ols('val ~ grp', data=df).fit()
    anova_result = sm.stats.anova_lm(model, typ=1)
    return {'f_stat': anova_result.loc['grp', 'F'], 
            'p_val': anova_result.loc['grp', 'PR(>F)']}
$$ language 'plpythonu';

 * postgresql://gpadmin:***@localhost:5432/demo
Done.
Done.

 

 

 

(5) 일원분산분석(one-way ANOVA) PL/Python 함수 분산병렬처리 실행

 

PL/Python 사용자 정의함수는 SQL query 문으로 실행합니다. 이때 PL/Python 이 'F-통계량'과 'p-value'를 반환하도록 UDF를 정의했으므로 아래처럼 (plpy_anova_func(grp_arr, val_arr)).* 처럼 ().* 으로 해서 모든 결과('F-통계량' & 'p-value')를 반환하도록 해줘야 합니다. (빼먹고 실수하기 쉬우므로 ().*를 빼먹지 않도록 주의가 필요합니다)

 

이렇게 하면 변수별로 segment nodes 에서 분산병렬로 각각 처리가 되므로, 변수가 수백~수천개가 있더라도 (segment nodes가 많이 있다는 가정하에) 분산병렬처리되어 신속하게 분석을 할 수 있습니다. 그리고 결과는 바로 Greenplum DB table에 적재가 되므로 이후의 application이나 API service에서 가져다 쓰기에도 무척 편리합니다. 

 

%%sql
-- Executing the PL/Python UDF of ANOVA
drop table if exists plpy_anova_table;
create table plpy_anova_table as (
    select 
        col
        , (plpy_anova_func(grp_arr, val_arr)).*
    from (
        select
            col
            , array_agg(grp::text order by id) as grp_arr
            , array_agg(val::float8 order by id) as val_arr
        from sample_tbl_long
        group by col
    ) a
) distributed randomly;

 * postgresql://gpadmin:***@localhost:5432/demo
Done.
4 rows affected.

 

 

총 4개의 각 변수별 일원분산분석 결과를 조회해보면 아래와 같습니다. 

%%sql
select * from plpy_anova_table order by col;

[Out]
 * postgresql://gpadmin:***@localhost:5432/demo
4 rows affected.
col	f_stat	p_val
x1	0.773700830155438	0.46445029458511966
x2	0.20615939957339052	0.8140997216173114
x3	4520.512608893724	1.2379278415456727e-88
x4	9080.286130418674	1.015467388498996e-101

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

Greenplum DB는 여려개의 PostgreSQL DB를 합쳐놓은 shared-nothing architecture 의 MPP (Massively Parallel Processing) Database 입니다.  손과 발이 되는 여러개의 cluster nodes에 머리가 되는 Master host 가 조율/조정/지시해가면서 분산하여 병렬로 일을 시키고, 각 cluster nodes의 연산처리 결과를 master host 가 모아서 취합하여 최종 결과를 반환하는 방식으로 일을 하기 때문에 (1) 대용량 데이터도 (2) 매우 빠르게 처리할 수 있습니다. 

 

이번 포스팅에서는 여기에서 한발 더 나아가서, Procedural Language Extension (PL/X) 을 사용하여 Python, R, Java, C, Perl, SQL 등의 프로그래밍 언어를 Greenplum DB 내에서 사용하여 데이터의 이동 없이 분산 병렬처리하는 방법을 소개하겠습니다. 

 

Massively Parallel Processing through PL/Python on Greenplum DB

 

난수를 발생시켜서 만든 가상의 데이터셋을 사용하여 PL/Python으로 Random Forest의 Feature Importance를 숫자형 목표변수('y1_num') 그룹과 범주형 목표변수('y2_cat') 그룹 별로 분산병렬처리하는 간단한 예를 들어보겠습니다. 

 

(1) 가상의 데이터셋 만들기
   - group: 2개의 그룹

   - X: 각 그룹별 100개의 관측치별 200개의 숫자형 변수를 가지는 가상의 데이터셋

   - y: 숫자형 목표변수 'y1_num', 범주형 목표변수 'y2_cat'

      (a) y1_num = x1*7.0 + x2*6.0 - x3*4.0 + x4*5.0 + 0.001*random()

      (b) y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0
의 함수로 부터 만듦. 

(2) PL/Python 함수 정의하기
   : (a) 숫자형 목표변수('y1_num') 그룹은 Random Forest Regressor를,  (b) 범주형 목표변수('y2_cat') 그룹은 Random Forest Classifier 를 각 그룹별로 분산병렬로 훈련시킨 후,
   : 각 그룹별 Random Forest Regressor 모델별 200개의 숫자형 변수별 Feature Importance를 반환

(3) PL/Python 함수 실행하기 

(4) 각 그룹별 변수별 Random Forest 의 Feature Importance 를 조회하기

 

를 해 보겠습니다. 

 

 

 

(1) 가상의 예제 데이터셋 만들기

   - group: 2개의 그룹

         (목표변수로 하나는 숫자형, 하나는 범주형 값을 가지는 2개의 X&y 데이터셋 그룹을 생성함.)

 

   - X: 각 그룹별 100개의 관측치별 200개의 숫자형 변수를 가지는 가상의 데이터셋

         (SQL의 random() 함수로 0~1 사이의 난수 실수값을 생성함.)

 

   - y: 숫자형 목표변수 'y1_num', 범주형 목표변수 'y2_cat'

       (a) y1_num = x1*7.0 + x2*6.0 - x3*4.0 + x4*5.0 + 0.001*random()

       (b) y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0 

             (y1_num, y2_cat 값을 만들때 x 변수에 곱하는데 사용한 *7.0, *6.0, *5.0 은 아무런 의미 없습니다.

             그냥 가상의 예제 샘플 데이터를 만들려고 임의로 선택해서 곱해준 값입니다.)

 

아래의 예제는 In-DB 처리를 염두에 두고, 200개의 숫자형 X 변수들과 한개의 숫자형 y 변수를 DB의 테이블에 "col_nm"이라는 칼럼에는 변수 이름을, "col_val"에는  변수 값을 long form 으로 생성해서 저장해 놓은 것입니다. 

 

나중에 PL/Python의 함수 안에서 pandas 의 pivot_table() 함수를 사용해서 wide form 으로 DataFrame의 형태를 재구조화해서 random forest 를 분석하게 됩니다. 

 

제 맥북에서 도커로 만든 Greenplum 에는 1개의 master node와 2개의 segment nodes 가 있는데요, 편의상 cross join 으로 똑같은 칼럼과 값을 가지는 설명변수 X 데이터셋을 2의 segment nodes에 replication 해서 그룹1('grp=1'), 그룹2('grp=2')를 만들었습니다.

 

그리고 여기에 목표변수 y 값으로는 숫자형 목표변수 'y1_num' 칼럼의 값에 대해서는 그룹1('grp=1'), 범주형 목표변수 'y2_cat' 칼럼의 값에 대해서는 그룹2('grp=2')를 부여한 후에, 앞서 만든 설명변수 X 데이터셋에 union all 로 'y1_num'과 'y2_cat' 데이터를 합쳐서 최종으로 하나의 long format의 테이블을 만들었습니다. 

 

첫번째 그룹은 200개의 숫자형 X 변수 중에서 'x1', 'x2', 'x3'의 3개 변수만 숫자형 목표변수(numeric target variable) 인 'y1_num'과 관련이 있고, 나머지 194개의 설명변수와는 관련이 없게끔 y1_num = x1*7.0 + x2*6.0 + x3*5.0 + 0.001*random() 함수를 사용해서 'y1_num' 을 만들었습니다 (*7.0, *6.0, *5.0 은 가상의 예제 데이터를 만들기 위해 임의로 선택한 값으로서, 아무 이유 없습니다).  뒤에서 PL/Python으로 Random Forest Regressor 의 feature importance 결과에서 'x1', 'x2', 'x3' 변수의 Feature Importance 값이 높게 나오는지 살펴보겠습니다.  

 

두번째 그룹은 200개의 숫자형 X변수 중에서 'x4', 'x5', 'x6'의 3개 변수만 범주형 목표변수(categorical target variable) 인 'y2_cat'과 관련이 있고, 나머지 194개의 설명변수와는 연관이 없게끔 y2_cat = case when (x4*7.0 + x5*6.0 - x6*4.0 + x4*5.0 + 0.001*random()) >= 9 then 1 else 0 함수로 부터 가상으로 생성하였습니다. 뒤에서 PL/Python으로 Random Forest Classifier 의 feature importance 결과에서 'x4', 'x5', 'x6' 변수의 Feature Importance 값이 높게 나오는지 살펴보겠습니다.  

 

------------------------------------------------------------------
-- Random Forest's Feature Importance using PL/Python on Greenplum
------------------------------------------------------------------

-- (1) Generate sample data 
-- 2 groups
-- 100 observations(ID) per group
-- X: 200 numeric input variables per observation(ID)
-- y : a numeric target variable by a function of y = x1*5.0 + x2*4.5 - x3*4.0 + x4*3.5 + 0.001*random()
-- distributed by 'grp' (group)

-- (1-1) 100 IDs of observations
drop table if exists id_tmp;
create table id_tmp (
	id integer
) distributed randomly;

insert into id_tmp (select * from generate_series(1, 100, 1));

select * from id_tmp order by id limit 3;
--id
--1
--2
--3


-- (1-2) 200 X variables
drop table if exists x_tmp;
create table x_tmp (
	x integer
) distributed randomly;

insert into x_tmp (select * from generate_series(1, 200, 1));

select * from x_tmp order by x limit 3;
--x
--1
--2
--3



-- (1-3) Cross join of ID and Xs
drop table if exists id_x_tmp;
create table id_x_tmp as (
	select * from id_tmp 
	cross join x_tmp 
) distributed randomly;

select count(1) from id_x_tmp; 
-- 20,000  -- (id 100 * x 200 = 20,000)

select * from id_x_tmp order by id, x limit 3;
--id  x
--1	  1
--1	  2
--1	  3



-- (1-4) Generate X values randomly
drop table if exists x_long_tmp;
create table x_long_tmp as (
	select 
		a.id as id
		, x
		, 'x'||a.x::text as x_col
		, round(random()::numeric, 3) as x_val 
	from id_x_tmp a
) distributed randomly;

select count(1) from x_long_tmp; 
-- 20,000

select * from x_long_tmp order by id, x limit 3;
--id  x  x_col  x_val
--1	  1	 x1	    0.956
--1	  2	 x2	    0.123
--1	  3	 x3	    0.716

select min(x_val) as x_min_val, max(x_val) as x_max_val from x_long_tmp;
--x_min_val  x_max_val
--0.000	     1.000



-- (1-5) create y values
drop table if exists y_tmp;
create table y_tmp as (
	select 
		s.id
		, (s.x1*7.0 + s.x2*6.0 + s.x3*5.0 + 0.001*random()) as y1_num -- numeric
		, case when (s.x4*7.0 + s.x5*6.0 + s.x6*5.0 + 0.001*random()) >= 9 
			then 1 
			else 0 
			end as y2_cat -- categorical
	from (
		select distinct(a.id) as id, x1, x2, x3, x4, x5, x6 from x_long_tmp as a
		left join (select id, x_val as x1 from x_long_tmp where x_col = 'x1') b 
			on a.id = b.id
		left join (select id, x_val as x2 from x_long_tmp where x_col = 'x2') c 
			on a.id = c.id 
		left join (select id, x_val as x3 from x_long_tmp where x_col = 'x3') d 
			on a.id = d.id 
		left join (select id, x_val as x4 from x_long_tmp where x_col = 'x4') e 
			on a.id = e.id
		left join (select id, x_val as x5 from x_long_tmp where x_col = 'x5') f 
			on a.id = f.id
		left join (select id, x_val as x6 from x_long_tmp where x_col = 'x6') g 
			on a.id = g.id
	) s
) distributed randomly;

select count(1) from y_tmp;
--100

select * from y_tmp order by id limit 5;
--id  y1_num            y2_cat
--1	11.0104868695838	1
--2	10.2772997177048	0
--3	7.81790575686749	0
--4	8.89387259676540	1
--5	2.47530914815422	1




--  (1-6) replicate X table to all clusters 
--        by the number of 'y' varialbes. (in this case, there are 2 y variables, 'y1_num' and 'y2_cat'
drop table if exists long_x_grp_tmp;
create table long_x_grp_tmp as (
	select 
		b.grp as grp
		, a.id as id
		, a.x_col as col_nm
		, a.x_val as col_val
	from x_long_tmp as a
	cross join (
		select generate_series(1, c.y_col_cnt) as grp
		from (
			select (count(distinct column_name) - 1) as y_col_cnt 
			from information_schema.columns 
				where table_name = 'y_tmp' and table_schema = 'public') c
		) as b -- number of clusters
) distributed randomly;


select count(1) from long_x_grp_tmp;
-- 40,000   -- 2 (y_col_cnt) * 20,000 (x_col_cnt)

select * from long_x_grp_tmp order by id limit 5;
--grp  id   col_nm  col_val
--1	1	x161	0.499
--2	1	x114	0.087
--1	1	x170	0.683
--2	1	x4	    0.037
--2	1	x45	    0.995



-- (1-7) create table in long format with x and y 
drop table if exists long_x_y;
create table long_x_y as (
	select x.*
	from long_x_grp_tmp as x
	union all 
	select 1::int as grp, y1.id as id, 'y1_num'::text as col_nm, y1.y1_num as col_val 
	from y_tmp as y1 
	union all 
	select 2::int as grp, y2.id as id, 'y2_cat'::text as col_nm, y2.y2_cat as col_val
	from y_tmp as y2
) distributed randomly;

select count(1) from long_x_y; 
-- 40,200 (x 40,000 + y1_num 100 + y2_cat 100)

select grp, count(1) from long_x_y group by 1 order by 1;
--grp  count
--1	   20100
--2	   20100

select * from long_x_y where grp=1 order by id, col_nm desc limit 5;
--grp  id   col_nm  col_val
--1	   1	y1_num	11.010
--1	   1	x99	     0.737
--1	   1	x98	     0.071
--1	   1	x97	     0.223
--1	   1	x96	     0.289

select * from long_x_y where grp=2 order by id, col_nm desc limit 5;
--grp  id   col_nm  col_val
--2	   1	y2_cat	1.0
--2	   1	x99	    0.737
--2	   1	x98	    0.071
--2	   1	x97	    0.223
--2	   1	x96	    0.289


-- drop temparary tables
drop table if exists id_tmp;
drop table if exists x_tmp;
drop table if exists id_x_tmp;
drop table if exists x_long_tmp;
drop table if exists y_tmp;
drop table if exists long_x_grp_tmp;


 

 

 

(2) PL/Python 사용자 정의함수 정의하기

 

- (2-1) composite return type 정의하기

 

PL/Python로 분산병렬로 연산한 Random Forest의 feature importance (또는 variable importance) 결과를 반환할 때 텍스트 데이터 유형의 '목표변수 이름(y_col_nm)', '설명변수 이름(x_col_nm)'과 '변수 중요도(feat_impo)' 의 array 형태로 반환하게 됩니다. 반환하는 데이터가 '텍스트'와 'float8' 로 서로 다른 데이터 유형이 섞여 있으므로 composite type 의 return type 을 만들어줍니다. 그리고 PL/Python은 array 형태로 반환하므로 text[], text[] 과 같이 '[]' 로서 array 형태로 반환함을 명시합니다.  

-- define composite return type
drop type if exists plpy_rf_feat_impo_type cascade;
create type plpy_rf_feat_impo_type as (
	y_col_nm text[]
	, x_col_nm text[]
	, feat_impo float8[]
);

 

 

 

- (2-2) Random Forest feature importance 결과를 반환하는 PL/Python 함수 정의하기

 

PL/Python 사용자 정의 함수를 정의할 때는 아래와 같이 PostgreSQL의 Procedural Language 함수 정의하는 표준 SQL 문을 사용합니다. 

input data 는 array 형태이므로 칼럼 이름 뒤에 데이터 유형에는 '[]'를 붙여줍니다. 

중간의  $$ ... python code block ... $$ 부분에 pure python code 를 넣어줍니다. 

제일 마지막에 PL/X 언어로서 language 'plpythonu' 으로 PL/Python 임을 명시적으로 지정해줍니다. 

 

create or replace function function_name(column1  data_type1[], column2 data_type2[], ...) 
returns return_type as $$
    ... python code block ...
$$ language 'plpythonu';

 

 

만약 PL/Container 를 사용한다면 명령 프롬프트 창에서 아래처럼 $ plcontainer runtime-show  로 Runtime ID를 확인 한 후에,   

[gpadmin@mdw ~]$ plcontainer runtime-show
PL/Container Runtime Configuration:
---------------------------------------------------------
  Runtime ID: plc_python3_shared
  Linked Docker Image: pivotaldata/plcontainer_python3_shared:devel
  Runtime Setting(s):
  Shared Directory:
  ---- Shared Directory From HOST '/usr/local/greenplum-db/./bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
---------------------------------------------------------

 

 

PL/Python 코드블록의 시작 부분에 $$ # container: container_Runtime_ID  로서 사용하고자 하는 docker container 의 runtime ID를 지정해주고, 제일 마지막 부분에 $$ language 'plcontainer'; 로 확장 언어를 'plcontainer'로 지정해주면 됩니다.  PL/Container를 사용하면 최신의 Python 3.x 버전을 사용할 수 있는 장점이 있습니다. 


create or replace function function_name(column1  data_type1[], column2 data_type2[], ...) 
returns return_type as
$$
# container: plc_python3_shared
... python code block ...
$$ LANGUAGE 'plcontainer'; 

 

 

아래 코드에서는 array 형태의 'id', 'col_nm', 'col_val'의 3개 칼럼을 input 으로 받아서 먼저 pandas DataFrame으로 만들어 준 후에, 이를 pandas pivot_table() 함수를 사용해서 long form --> wide form 으로 데이터를 재구조화 해주었습니다. 

 

다음으로, 숫자형의 목표변수('y1_num')를 가지는 그룹1 데이터셋에 대해서는 sklearn 패키지의 RandomForestRegressor 클래스를 사용해서 Random Forest 모델을 훈련하고, 범주형의 목표변수('y2_cat')를 가지는 그룹2의 데이터셋에 대해서는 sklearn 패키지의 RandomForestClassifier 클래스를 사용하여 모델을 훈련하였습니다. 그리고  'rf_regr_fitted.feature_importances_' , 'rf_clas_fitted.feature_importances_'를 사용해서 200개의 각 변수별 feature importance 속성을 리스트로 가져왔습니다. 

 

마지막에 return {'y_col_nm': y_col_nm, 'x_col_nm': x_col_nm_list, 'feat_impo': feat_impo} 에서 전체 변수 리스트와 변수 중요도 연산 결과를 array 형태로 반환하게 했습니다. 

 

----------------------------------
-- PL/Python UDF for Random Forest
----------------------------------

-- define PL/Python function
drop function if exists plpy_rf_feat_impo_func(text[], text[], text[]);
create or replace function plpy_rf_feat_impo_func(
	id_arr text[]
	, col_nm_arr text[]
	, col_val_arr text[]
) returns plpy_rf_feat_impo_type as 
$$
#import numpy as np 
import pandas as pd

# making a DataFrame
xy_df = pd.DataFrame({
    'id': id_arr
    , 'col_nm': col_nm_arr
    , 'col_val': col_val_arr
})

# pivoting a table
xy_pvt = pd.pivot_table(xy_df
                        , index = ['id']
                        , columns = 'col_nm'
                        , values = 'col_val'
                        , aggfunc = 'first'
                        , fill_value = 0)

X = xy_pvt[xy_pvt.columns.difference(['y1_num', 'y2_cat'])]
X = X.astype(float)
x_col_nm_list = X.columns

# UDF for Feature Importance by RandomForestRegressor
def rf_regr_feat_impo(X, y):
	
	# training RandomForestRegressor
	from sklearn.ensemble import RandomForestRegressor
	rf_regr = RandomForestRegressor(n_estimators=200)
	rf_regr_fitted = rf_regr.fit(X, y)
	
	# The impurity-based feature importances.
	rf_regr_feat_impo = rf_regr_fitted.feature_importances_
	return rf_regr_feat_impo


# UDF for Feature Importance by RandomForestClassifier
def rf_clas_feat_impo(X, y):
	
	# training  RandomForestClassifier with balanced class_weight
	from sklearn.ensemble import RandomForestClassifier
	rf_clas = RandomForestClassifier(n_estimators=200, class_weight='balanced')
	rf_clas_fitted = rf_clas.fit(X, y)
	
	# The impurity-based feature importances.
	rf_clas_feat_impo = rf_clas_fitted.feature_importances_
	return rf_clas_feat_impo
	
	
# training RandomForest and getting variable(feature) importance
if 'y1_num' in xy_pvt.columns:
	y_target = 'y1_num'
	y = xy_pvt[y_target]
	feat_impo = rf_regr_feat_impo(X, y)

if 'y2_cat' in xy_pvt.columns:
	y_target = 'y2_cat'
	y = xy_pvt[y_target]
	y = y.astype(int)
	feat_impo = rf_clas_feat_impo(X, y)

feat_impo_df = pd.DataFrame({
	'y_col_nm': y_target
	, 'x_col_nm': x_col_nm_list
	, 'feat_impo': feat_impo
})

# returning the results of feature importances
return {
	'y_col_nm': feat_impo_df['y_col_nm'] 
	, 'x_col_nm': feat_impo_df['x_col_nm']
	, 'feat_impo': feat_impo_df['feat_impo']
	}
    
$$ language 'plpythonu';

 

 

 

(3) PL/Python 함수 실행하기 

 

PL/Python 함수를 실행할 때는 표준 SQL Query 문의 "SELECT group_name, pl_python_function() FROM table_name" 처럼 함수를 SELECT 문으로 직접 호출해서 사용합니다. 

 

PL/Python의 input 으로 array 형태의 데이터를 넣어주므로, 아래처럼 FROM 절의 sub query 에 array_agg() 함수로 먼저 데이터를 'grp' 그룹 별로 array aggregation 하였습니다. 

 

PL/Python 함수의 전체 결과를 모두 반환할 것이므로 (plpy_rf_var_impo_func()).* 처럼 함수를 모두 감싼 후에 ().* 를 사용하였습니다. (실수해서 빼먹기 쉬우므로 유의하시기 바랍니다.)

 

목표변수가 숫자형('y1_num')과 범주형('y2_cat')'별로 그룹1과 그룹2로 나누어서, 'grp' 그룹별로 분산병렬로 Random Forest 분석이 진행되며, Variable importance 결과를 'grp' 그룹 ID를 기준으로 분산해서 저장(distributed by (grp);)하게끔 해주었습니다.   

 

-- execute PL/Python function
drop table if exists rf_feat_impo_result;
create table rf_feat_impo_result as (
	select 
		a.grp 
		, (plpy_rf_feat_impo_func(
			a.id_arr
			, a.col_nm_arr
			, a.col_val_arr
		)).* 
		from (
			select 
				c.grp 
				, array_agg(c.id::text order by id) as id_arr
				, array_agg(c.col_nm::text order by id) as col_nm_arr
				, array_agg(c.col_val::text order by id) as col_val_arr
			from long_x_y as c
			group by grp
			) a
) distributed by (grp);

 

 

 

(4) 각 그룹별 변수별 Random Forest 의 Feature Importance 조회하기

 

위의 (3)번을 실행해서 나온 결과를 조회하면 아래와 같이 'grp=1', 'grp=2' 별로 각 칼럼별로 Random Forest에 의해 계산된 변수 중요도(variable importance) 가 array 형태로 저장되어 있음을 알 수 있습니다. 

 

select count(1) from rf_feat_impo_result; 
-- 2

-- results in array-format
select * from rf_feat_impo_result order by grp;

plpython_random_forest_feature_importance_array

 

위의 array 형태의 결과는 사람이 눈으로 보기에 불편하므로, unnest() 함수를 써서 long form 으로 길게 풀어서 결과를 조회해 보겠습니다. 

 

이번 예제에서는 난수로 생성한 X설명변수에 임의로 함수를 사용해서 숫자형 목표변수('y1_num')를 가지는 그룹1에 대해서는 'x1', 'x2', 'x3' 의 순서대로 변수가 중요하고, 범주형 목표변수('y2_cat')를 가지는 그룹2에서는  'x4', 'x5', 'x6'의 순서대로 변수가 중요하게 가상의 예제 데이터셋을 만들어주었습니다.  (random() 함수로 난수를 생성해서 예제 데이터셋을 만들었으므로, 매번 실행할 때마다 숫자는 달라집니다). 

 

아래 feature importance 결과를 보니, 역시 그룹1의 데이터셋에 대해서는 'x1', 'x2', 'x3' 변수가 중요하게 나왔고, 그룹2의 데이터셋에 대해서는 'x4', 'x5', 'x6' 변수가 중요하다고 나왔네요. 

 

-- display the results using unnest()
select 
	grp
	, unnest(y_col_nm) as y_col_nm
	, unnest(x_col_nm) as x_col_nm
	, unnest(feat_impo) as feat_impo 
from rf_feat_impo_result
where grp = 1
order by feat_impo desc 
limit 10;
--grp    y_col_nm      x_col_nm      feat_impo
--1	   y1_num	 x1	       0.4538784064497847
--1	   y1_num	 x2	       0.1328532144509229
--1	   y1_num	 x3	       0.10484121806286809
--1	   y1_num	 x34           0.006843343319633915
--1	   y1_num	 x42           0.006804819286213849
--1	   y1_num	 x182          0.005771113354638556
--1	   y1_num	 x143          0.005220090515711377
--1	   y1_num	 x154          0.005101366229848041
--1	   y1_num	 x46           0.004571420249598611
--1	   y1_num	 x57           0.004375780774099066

select 
	grp
	, unnest(y_col_nm) as y_col_nm
	, unnest(x_col_nm) as x_col_nm
	, unnest(feat_impo) as feat_impo 
from rf_feat_impo_result
where grp = 2
order by feat_impo desc 
limit 10;
--grp    y_col_nm      x_col_nm      feat_impo
--2	   y2_cat	 x4	       0.07490484681851341
--2	   y2_cat	 x5	       0.04099924609654107
--2	   y2_cat	 x6	       0.03431643243509608
--2	   y2_cat	 x12           0.01474464870781392
--2	   y2_cat	 x40           0.013865405628514437
--2	   y2_cat	 x37           0.013435535581862938
--2	   y2_cat	 x167          0.013236591006394367
--2	   y2_cat	 x133  	       0.012570295279560963
--2	   y2_cat	 x142  	       0.012177597741973058
--2	   y2_cat	 x116          0.011713289042962961


-- The end. 

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요! :-)

 

728x90
반응형
Posted by Rfriend
,

지난번 포스팅 (rfriend.tistory.com/631) 에서는 Greenplum DB에서 PostGIS 를 사용하여 좌표 인식, 변환, 계산을 하기 위해서 필요한 좌표계 참조 테이블인 PostGIS의 spatial_ref_sys 테이블을 모든 segment nodes 에 복제하는 방법을 소개하였습니다. 

 

이번 포스팅에서는 PostGIS의 원래의 spatial_ref_sys 테이블에는 없는 좌표계 (Coordinate Reference System) 를 사용해서 두 개의 좌표계간 변환을 하는 방법을 소개하겠습니다. 

 

(방법 1) spatial_ref_sys 테이블에 새로운 좌표계를 등록하고 (즉, insert into spatial_ref_sys 테이블),

            --> PostGIS의 ST_Transform() 함수를 사용해서 좌표계 간 변환하기

(방법 2) PostGIS_Transform_Geometry() 함수를 사용해서 직접 새로운 좌표계로 변환하기

 

 

transforming Coordinate Reference System in PostGIS, PostgreSQL, Greenplum

 

예로서 KATECH 좌표계와  WGS84 좌표계 간 변환하는 방법을 소개하겠습니다.

참고로 KATECH 좌표계는 국내의 포털 사이트의 지도 API 서비스나 국내 네비게이션에서 사용하고 있는 비공식 좌표계로서, 원점으로 위도 38도, 경도 128도의 단일원점을 사용하기 때문에 TM128 좌표계라고도 합니다. KATECH 좌표계는 국내 CNS(자동차 항법장치)를 위해서 만들어진 것으로 3개(서부,중부,동부)의 투영원점을 위도38도, 경도127도 30분의 단일원점으로 통합한 것입니다. (* 출처: hmjkor.tistory.com/377).

 

 

(방법 1)  (step 1) spatial_ref_sys 테이블에 새로운 좌표계를 등록하고 (즉, insert into spatial_ref_sys 테이블),

            --> (step 2) PostGIS의 ST_Transform() 함수를 사용해서 좌표계 간 변환하기

 

(step 1) spatial_ref_sys 테이블에  미등록되어 있는 KATECH 좌표계를 등록하기 

 

* 사전에 Greenplum DB 의 모든 segment nodes에 spatial_ref_sys 테이블이 복제(replication)되어 있어야 합니다.

  (참고: rfriend.tistory.com/631)  

* 아래처럼 insert into 쿼리 구문으로 좌표계 정보를 spatial_ref_sys 테이블에 추가해줍니다. 이때, SRID 번호는 사용자가 알아서 지정해주면 됩니다. 

* 아래 KATECH 좌표계의 정보는 다시 한번 각 사용처에서 사용하고 있는 것과 동일한 것인지 다시 한번 확인해보시기 바랍니다.(critical 한 정보이므로 반드시 아래 숫자 맞는지 다시 한번 double check 해보세요!) 

 

-- inserting KATECH CRS into spatial_ref_sys table
INSERT into spatial_ref_sys (srid, auth_name, auth_srid, proj4text, srtext) 
values (
    10000, 
    'sr-org', 
    8030, 
    '+proj=tmerc +lat_0=38 +lon_0=128 +k=0.9999 +x_0=400000 +y_0=600000 +ellps=bessel +towgs84=-145.907,505.034,685.756,-1.162,2.347,1.592,6.342 +units=m +no_defs', 
    'PROJCS["Katech",GEOGCS["Bessel 1841",
    DATUM["unknown",SPHEROID["bessel",6377397.155,299.1528128],
    TOWGS84[-145.907,505.034,685.756,-1.162,2.347,1.592,6.342]],PRIMEM["Greenwich",0],
    UNIT["degree",0.0174532925199433]],
    PROJECTION["Transverse_Mercator"],
    PARAMETER["latitude_of_origin",38],
    PARAMETER["central_meridian",128],
    PARAMETER["scale_factor",0.9999],
    PARAMETER["false_easting",400000],
    PARAMETER["false_northing",600000],
    UNIT["Meter",1]]'
);   

 

 

(step 2)  PostGIS 의 ST_Transform() 함수를 사용해서  WGS 84 (SRID 4326) 좌표계를 KATECH (SRID 10000, 위에서 지정한 번호로서, SRID 번호는 지정하기 나름임) 좌표계로 변환. 

 

-- (1st method) using ST_Transform() and CRS information from spatial_ref_sys table
SELECT ST_Transform(ST_SetSRID(ST_Point(-123.365556, 48.428611), 4326), 10000) AS katech_geom;

 

 

 

(방법 2) PostGIS_Transform_Geometry() 함수를 사용해서 직접 새로운 좌표계로 변환하기

 

두번째는 spatial_ref_sys 테이블에 새로운 좌표계를 등록할 필요없이, PostGIS_Transform_Geometry() 함수안에 (a) 기존의 변환하려는 대상 좌표계 : proj_from, (b) 앞으로 변환하려고 하는 기준 좌표계 : proj_to 를 직접 넣어주는 방식입니다. 

위의 (방법 1)에서 사용한 PostGIS의 ST_Transform() 함수의 소스 코드를 까서 살펴보면 그 안에 PostGIS_Transform_Geometry() 함수를 사용하고 있습니다. 

 

아래의 예는 table_with_geometry 라는 from 절의 테이블에서 x_axis, y_axis 의 경도, 위도 좌표를 가져와서 25를 더하고 뺀 값을  ST_MakeEnvelope() 함수로 만든 사각형 Polygon 의 좌표를  WGS 84 에서 KATECH 좌표계로 PostGIS_Transform_Geometry() 함수를 써서 변환해본 것입니다. 

 

이렇게 수작업으로 하면 Greenplum DB의 각 segment nodes에 복제되어 있는 spatial_ref_sys 테이블에 (방법 1 - a) 처럼 KATECH 좌표계를 미리 등록 (insert into) 하지 않고 바로 사용할 수 있는 장점이 있습니다. 하지만, 이런 좌표계 변환을 여러 사용처에서, 반복적으로 해야 하는 경우라면 매번 이렇게 복잡하게 좌표계 정보를 직접 입력하는 것은 번거로고 어려운 뿐만 아니라, human error 를 유발할 위험도 다분히 있기 때문에 추천하지는 않습니다. 

 

-- (2nd Method) using PostGIS_Transform_Geometry() function directly

SELECT  
    PostGIS_Transform_Geometry(
        -- geom
        ST_Union(ST_MakeEnvelope(x_axis::integer -25, y_axis::integer - 25, x_axis::integer + 25, y_axis::integer + 25, 6645))
        -- proj_from
        , ‘+proj=tmerc +lat_0=38 +lon_0=128 +k=0.9999 +x_0=400000 +y_0=600000 _ellps=bessel +units=m +no_defs +towgs84=-115.80,474.99,674.11,1.16,-2.31,-1.63,6.43’
        -- proj_to
        , ‘+proj=tmerc +lat_0=38 +lon_0=128 +k=0.9999 +x_0=400000 +y_0=600000 +ellps=bessel +towgs84=-145.907,505.034,685.756,-1.162,2.347,1.592,6.342 +units=m +no_defs’
        -- SRID
        , 5179)
FROM table_with_geometry …;

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다.

행복한 데이터 과학자 되세요! :-)

 

728x90
반응형
Posted by Rfriend
,

PostgreSQL database에서 오픈소스 PostGIS 를 사용하여 지리공간 데이터 변환, 연산 및 분석을 하는데 있어 첫번째로 챙겨야 하는 것이 있다면 지리공간의 기준이 되는 좌표계(Coordinate Reference System)인  SRID (Spatial Reference IDentifier) 를 spatial_ref_sys 테이블의 값으로 설정해주는 것입니다.  

 

SRID 별로 지리공간 좌표 참조 정보가 들어있는 spatial_ref_sys 테이블은 아래와 같이 srid, auth_name, auth_srid, srtext, proj4text 의 칼럼으로 구성되어 있습니다. 

 

예를 들어서, WGS(World Geodetic System) 84 좌표계는 SRID = 4326 으로 조회를 하면 됩니다. 

 

 

 

SRID 설정은 PostGIS의  ST_SetSRID() 함수를 사용합니다. 

SELECT ST_SetSRID(ST_Point(-123.365556, 48.428611), 4326) AS wgs84long_lat; 
wgs84long_lat
-------------------------
POINT(-123.365556 48.428611)

 

 

PostgreSQL DB 에서 PostGIS 를 사용해서 두 개의 좌표계 간에 좌표 변환을 하려면  ST_Transform() 함수를 사용하면 간단하게 좌표 변환을 할 수 있습니다. 그런데 만약 Greenplum DB 에서 PostGIS 의 좌표변환 함수인 ST_Transform() 함수를 사용한다면 아래와 같은 에러가 발생할 것입니다. (몇 년 전에 POC를 하는데 아래 에러 해결하려고 workaround 만드느라 반나절 고생했던게 생각나네요. ^^;;;)

 

"ERROR: function cannot execute on QE slice because it accesses relation spatial_ref_sys" 

 

이런 에러가 발생하는 이유는 Greenplum database의 경우 PostgreSQL 엔진을 기반으로 하고 있지만,  Shared nothing 아키텍쳐의 MPP (Massively Parallel Processing) database 로서, ST_Transform() 함수 실행 시 각 segment nodes 가 spatial_ref_sys 좌표계 테이블을 참조하지 못하기 때문입니다. Greenplum DB 에서 이 에러를 해결하기 위해서는 spatial_ref_sys 좌표계 정보가 들어있는 테이블을 여러개의 segment nodes에 복제를 해주면 됩니다.  

 

Greenplum 6.x 버전 부터는 'DISTRIBUTED REPLICATED' 를 사용해서 쉽게 테이블을 각 segment nodes 에 복제할 수 있습니다.  (Greenplum 5.x 버전에서는  CROSS JOIN 을 사용해서 각 segment nodes 에 복제해주면 됩니다.)

 

 

[ Greenplum 6.x 버전에서 spatial_ref_sys 테이블을 각 segment nodes 에 복제하여 생성하는 절차 ]

 

(1) 기존의 spatial_ref_sys 테이블을 spatial_ref_sys_old 로 테이블 이름을 바꿔줍니다. 

(2) spatial_ref_sys 라는 이름의 테이블을 'DISTRIBUTED REPLICATED' 모드로 해서 각 segment nodes에 복제해서 생성해줍니다. 이때 칼럼 이름과 속성, 제약조건은 아래의 SQL query 를 그래도 복사해서 사용하시면 됩니다. 

(3) 위의 (1)번에서 이름을 바꿔놓았던 기존의 테이블인  spatial_ref_sys_old 테이블에서 SELECT 문으로 데이터를 조회해와서 새로 각 테이블에 복제 모드로 생성해놓은  spatial_ref_sys 테이블에 데이터를 삽입해줍니다. 

 

-- (1) changing the spatial_ref_sys table's name
ALTER TABLE spatial_ref_sys RENAME TO spatial_ref_sys_old;


-- (2) creating spatial_ref_sys table using DISTRIBUTED REPLICATED
CREATE TABLE spatial_ref_sys(
    srid int4 NOT NULL
    , auth_name VARCHAR(256) NULL
    , auth_srid INT4 NULL
    , srtext VARCHAR(2048) NULL
    , proj4text TEXT NOT NULL
    , CONSTRAINT spatial_ref_sys_pkey_1 PRIMARY KEY (srid)
    , CONSTRAINT spatial_ref_sys_srid_check_1 CHECK (((srid > 0) AND (srid <= 998999)))
)
DISTRIBUTED REPLICATED;


-- (3) inserting spatial_ref_sys_old data inot replicated segments' tables
INSERT INTO spatial_ref_sys SELECT * FROM spatial_ref_sys_old;

 

 

 

이제 spatial_ref_sys 테이블이 모든 segment nodes에 복제가 되었으니 Greenplum DB에서 PostGIS의 ST_Transform() 함수를 사용해서 좌표계 SRID 4326 (WGS 84) 인 데이터를 좌표계 SRID 3785 로 변환해보겠습니다. 

 

-- Mark a point as WGS 84 long lat and then transform to web mercator (Spherical Mercator) --
SELECT ST_Transform(ST_SetSRID(ST_Point(-123.365556, 48.428611), 4326), 3785) AS spere_merc; 

spere_merc
---------------------------------
SRID=3785;POINT(-13732990.8753491 6178458.96425423)

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요! :-)

 

728x90
반응형
Posted by Rfriend
,
지난번 포스팅에서는 PostgreSQL, Greenplum database에서 SQL과 Apache MADlib을 활용해서 대용량의 연속형 데이터에 대한 In-DB 상관관계 분석(Correlation Analysis in Database)에 대해서 알아보았습니다.

이번 포스팅에서는 상관관계 분석에서 한발 더 나아가서, 설명변수(독립변수) X와 목표변수(종속변수) Y 간의 선형/인과관계를 모델링하는 선형 회귀모형(Linear Regression)을 훈련(train)하고 예측(predict)하는 방법을 소개하겠습니다.

PostgreSQL, Greenplum database에서 대용량 데이터에 대해
(1) SQL로 선형 회귀모형 적합하고 모수 확인하기
(2) SQL로 그룹별로 선형 회귀모형 적합하고 예측하기
(3) Apache MADlib으로 다중 선형 회귀모형 적합하기
(4) Apache MADlib으로 그룹별로 다중 선형 회귀모형 적합하고 예측하기




먼저, 예제로 사용할 데이터로 4개의 연속형 데이터('sepal_length', 'sepal_width', 'petal_length', 'petal_width')와 1개의 범주형 데이터('class_name')를 가진 iris 데이터셋으로 테이블을 만들어보겠습니다.



-- Iris data table
DROP TABLE IF EXISTS iris;
CREATE TABLE iris (id INT, sepal_length FLOAT, sepal_width FLOAT,
                    petal_length FLOAT, petal_width FLOAT,
                   class_name text);
INSERT INTO iris VALUES
(1,5.1,3.5,1.4,0.2,'Iris-setosa'),
(2,4.9,3.0,1.4,0.2,'Iris-setosa'),
(3,4.7,3.2,1.3,0.2,'Iris-setosa'),
(4,4.6,3.1,1.5,0.2,'Iris-setosa'),
(5,5.0,3.6,1.4,0.2,'Iris-setosa'),
(6,5.4,3.9,1.7,0.4,'Iris-setosa'),
(7,4.6,3.4,1.4,0.3,'Iris-setosa'),
(8,5.0,3.4,1.5,0.2,'Iris-setosa'),
(9,4.4,2.9,1.4,0.2,'Iris-setosa'),
(10,4.9,3.1,1.5,0.1,'Iris-setosa'),
(11,7.0,3.2,4.7,1.4,'Iris-versicolor'),
(12,6.4,3.2,4.5,1.5,'Iris-versicolor'),
(13,6.9,3.1,4.9,1.5,'Iris-versicolor'),
(14,5.5,2.3,4.0,1.3,'Iris-versicolor'),
(15,6.5,2.8,4.6,1.5,'Iris-versicolor'),
(16,5.7,2.8,4.5,1.3,'Iris-versicolor'),
(17,6.3,3.3,4.7,1.6,'Iris-versicolor'),
(18,4.9,2.4,3.3,1.0,'Iris-versicolor'),
(19,6.6,2.9,4.6,1.3,'Iris-versicolor'),
(20,5.2,2.7,3.9,1.4,'Iris-versicolor'),
(21,6.3,3.3,6.0,2.5,'Iris-virginica'),
(22,5.8,2.7,5.1,1.9,'Iris-virginica'),
(23,7.1,3.0,5.9,2.1,'Iris-virginica'),
(24,6.3,2.9,5.6,1.8,'Iris-virginica'),
(25,6.5,3.0,5.8,2.2,'Iris-virginica'),
(26,7.6,3.0,6.6,2.1,'Iris-virginica'),
(27,4.9,2.5,4.5,1.7,'Iris-virginica'),
(28,7.3,2.9,6.3,1.8,'Iris-virginica'),
(29,6.7,2.5,5.8,1.8,'Iris-virginica'),
(30,7.2,3.6,6.1,2.5,'Iris-virginica');

SELECT * FROM iris ORDER BY id LIMIT 5;





  (1) SQL로 선형 회귀모형 적합하고 모수 확인하기


PostgreSQL 에서 설명변수(독립변수) X 1개와 목표변수(종속변수) Y 와의 선형 회귀모형을 적합할 수 있습니다. 대신에 하나의 함수로 한번에 선형 회귀모형을 적합하는 것은 아니구요, REGR_SLOPE(Y, X) 함수로 기울기(slope)를 구하고, REGR_INTERCEPT(Y, X)로 Y절편을 구할 수 있습니다.



-- Python으로 산점도와 선형회귀선을 겹쳐서 그래보면 아래와 같습니다.



## Scatter Plot using Python seaborn package


import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
plt.rcParams['figure.figsize'] = [12, 8]


iris = sns.load_dataset('iris')


sns.regplot(x=iris['petal_length'],
            y=iris['petal_width'],
            fit_reg=True)

plt.title('Scatter Plot with Regression Line', fontsize=16)
plt.show()

 




REGR_COUNT(Y, X)는 관측치의 개수, REGR_AVGY(Y, X) 는 Y의 평균 값, REGR_AVGX(Y, X) 는 X의 평균 값을 구해줍니다.


그리고 REGR_R2(Y, X)는 적합된 선형회귀모형의 설명력을 보여주는 결정계수(coefficient of determination)를 구해줍니다.



----------------------------
-- (1) PostgreSQL functions
----------------------------
-- Training a Regression using PostgreSQL regr_slope(Y, X), regr_intercept(Y, X) function
DROP TABLE IF EXISTS iris_regr_postgres;
CREATE TABLE iris_regr_postgres AS (
SELECT
    'petal_width' AS y_var_nm
    , 'petal_length' AS x_var_nm
    , REGR_SLOPE(petal_width, petal_length)
    , REGR_INTERCEPT(petal_width, petal_length)
    , REGR_R2(petal_width, petal_length)
    , REGR_AVGY(petal_width, petal_length)
    , REGR_AVGX(petal_width, petal_length)
    , REGR_COUNT(petal_width, petal_length)
FROM  iris
);



SELECT * FROM iris_regr_postgres;







  (2) SQL로 그룹별로 선형 회귀모형 적합하고 예측하기


다음으로 'class_name' 범주('iris_setosa', 'iris_versicolor', 'iris_virginica') 그룹별로 1개 설명변수 'petal_length'와 종속변수 'petal_width'의 관계를 모형화하는 선형 회귀모형을 적합해보겠습니다.


위의 (1)번 SQL query에 SELECT 문에 그룹 칼럼('class_name')을 넣어주고, FROM 절 다음에 GROUP BY 그룹 칼럼('class_name') 을 넣어주면 됩니다. 모델 3개가 잘 적합되었습니다.



-- Regression by Groups
DROP TABLE IF EXISTS iris_regr_grp_postgres;
CREATE TABLE iris_regr_grp_postgres AS (
SELECT
    class_name AS group_nm
    , 'petal_width' AS y_var_nm
    , 'petal_length' AS x_var_nm
    , REGR_SLOPE(petal_width, petal_length)
    , REGR_INTERCEPT(petal_width, petal_length)
    , REGR_R2(petal_width, petal_length)
    , REGR_AVGY(petal_width, petal_length)
    , REGR_AVGX(petal_width, petal_length)
    , REGR_COUNT(petal_width, petal_length)
FROM  iris
GROUP BY class_name
ORDER BY class_name
);



SELECT * FROM iris_regr_grp_postgres ORDER BY group_nm;





이제 위에서 적합한 class_name별 3개 모델(기울기 slope, Y절편 intercept)의 모수를 활용해서 아래의 수식을 사용해서 예측을 해보겠습니다.



-- Prediction
SELECT
    iris.class_name
    , iris.id
    , iris.petal_width AS y_petal_width
    , (iris.petal_length * m.regr_slope + m.regr_intercept) AS pred_petal_width
FROM iris, iris_regr_grp_postgres m
WHERE iris.class_name = m.group_nm
ORDER BY id;







  (3) Apache MADlib으로 다중 선형 회귀모형(Multiple Linear Regression) 적합하기


SQL 기반의 오픈소스 Apache MADlib의 madlib.linregr_train() 함수를 사용하여 PostgreSQL, Greenplum database에서 여러개의 설명변수를 사용하는 다중 선형회귀모형 (multiple linear regression)을 적합할 수 있습니다.


위의 (1)번 PostgreSQL 의 기울기, Y절편 함수에서는 설명변수 X로 1개의 칼럼만을 사용하는 한계가 있었습니다. 그리고 기울기와 Y절편, R^2 등을 구하기 위해 개별 함수를 사용해야 하는 불편함이 있었습니다.


반면에, MADlib의 madlib.linregr_train() 함수는 source table, output table, dependent variable, ARRAY[1, independent variables] 의 순서대로 칼럼 이름을 넣어주면 됩니다. 그러면 회귀계수(coef), 결정계수(r2), 설명변수별 표준화오차(std_err)와 t통계량(t_stats), p값(p_values), condition_no, 관측치 개수(num_rows_processed), 결측치 개수(num_missing_rows_skipped), 분산공분산(variance_covariance) 의 결과를 반환합니다.



----------------
-- (2) MADlib
----------------
-- Multivariate Regression using MADlib
DROP TABLE IF EXISTS iris_regr, iris_regr_summary;
SELECT madlib.linregr_train(
    'iris'              -- source table
    , 'iris_regr'      -- output table
    , 'petal_width' -- dependent variable
    , 'ARRAY[1, petal_length, sepal_length]' -- independent variables
);

SELECT * FROM iris_regr;





위의 선형 회귀모형 적합 결과를 좀더 보기 좋도록 UNNEST() 를 사용해서 설명변수별로 구분해서 풀어서 제시해보겠습니다.



SELECT
    UNNEST(ARRAY['intercept', 'petal_length', 'sepal_length']) AS var_nm
    , UNNEST(coef) AS coef
    , UNNEST(std_err) AS std_err
    , UNNEST(t_stats) AS t_stats
    , UNNEST(p_values) AS p_values
FROM iris_regr;






  (4) Apache MADlib으로 그룹별로 다중 선형 회귀모형 적합하고 예측하기


이번에는 class_name 범주의 그룹별로 다중 선형회귀모형을 적합해 보겠습니다.


madlib.linregr_tarin() 함수의 5번째 인자에 Grouping Column으로서 'class_name' 을 넣어주면 됩니다.



-- Multiple Regression by Group using MADlib
DROP TABLE IF EXISTS iris_regr_grp, iris_regr_grp_summary;
SELECT madlib.linregr_train(
    'iris'          -- source table
    , 'iris_regr_grp'   -- output table
    , 'petal_width' -- dependent variable
    , 'ARRAY[1, petal_length, sepal_length]' -- indepent variables
    , 'class_name'  -- grouping column
);

--SELECT * FROM iris_regr_grp;

SELECT
    class_name
    , UNNEST(ARRAY['intercept', 'petal_length', 'sepal_length']) AS var_nm
    , UNNEST(coef) AS coef
    , UNNEST(std_err) AS std_err
    , UNNEST(t_stats) AS t_stats
    , UNNEST(p_values) AS p_values
FROM iris_regr_grp;





위에서 3개 범주 그룹별로 적합한 모델을 사용해서 madlib.linregr_predict() 함수로 예측을 해보겠습니다. 이때 WHERE 조건절에 input dataset의 class_name 범주와 모델 테이블의 class_name 이 같아야 한다는 조건을 추가해줍니다.



-- Prediction
SELECT iris.*,
       madlib.linregr_predict( m.coef,
                               ARRAY[1,petal_length,sepal_length]
                             ) AS predict
FROM iris, iris_regr_grp m
WHERE  iris.class_name = m.class_name
ORDER BY id
LIMIT 10;





[Reference]
* PostgreSQL 9.4: https://www.postgresql.org/docs/9.4/functions-aggregate.html
* Apache MADlib : https://madlib.apache.org/docs/latest/group__grp__linreg.html


이번 포스팅이 많은 도움이 되었기를 바랍니다.
행복한 데이터 과학자 되세요! :-)



728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum database에서 SQL, MADlib 함수, PL/R, PL/Python을 사용해서 연속형 데이터에 대한 요약통계량을 구하는 방법을 소개하겠습니다.  무척 쉬운 내용이므로 쉬어가는 코너 정도로 가볍게 생각해주시면 좋겠습니다. ^^


PostgreSQL, Greenplum database 에서 연속형 데이터에 대해 그룹별로, 

(1) SQL 로 요약통계량 구하기

(2) Apache MADlib 으로 요약통계량 구하기





참고로, 이번 포스팅에서는 PostgreSQL 9.4, Greenplum 6.10.1 버전을 사용하였으며, PostgreSQL 9.4 버전보다 낮은 버전을 사용하면 최빈값(mode), 사분위부(percentile) 구하는 함수를 사용할 수 없습니다. 


먼저, 예제로 사용하기 위해 '나이'의 연속형 데이터와 '성별'의 범주형 데이터 (그룹)를 가진 간단한 테이블을 만들어보겠습니다. 결측값(missing value)도 성별 그룹별로 몇 개 넣어봤습니다. 



DROP TABLE IF EXISTS cust;

CREATE TABLE cust (id INT, age INT, gender TEXT);

INSERT INTO cust VALUES

(1,NULL,'M'),

(2,NULL,'M'),

(3,25,'M'),

(4,28,'M'),

(5,27,'M'),

(6,25,'M'),

(7,26,'M'),

(8,29,'M'),

(9,25,'M'),

(10,27,'M'),

(11,NULL,'F'),

(12,23,'F'),

(13,25,'F'),

(14,23,'F'),

(15,24,'F'),

(16,26,'F'),

(17,23,'F'),

(18,24,'F'),

(19,22,'F'),

(20,23,'F');

 




 (1) SQL로 연속형 데이터의 그룹별 요약통계량 구하기


함수가 굳이 설명을 안해도 될 정도로 간단하므로 길게 설명하지는 않겠습니다. 


표준편차 STDDEV() 와 분산 VARIANCE() 함수는 표본표준편차(sample standard deviation), 표본분산(sample variance) 를 계산해줍니다. 만약 모표준편차(population standard deviation), 모분산(population variance)를 구하고 싶으면 STDDEV_POP(), VAR_POP() 함수를 사용하면 됩니다. 


PostgreSQL 9.4 버전 이상부터 최빈값(MODE), 백분위수(Percentile) 함수가 생겨서 정렬한 후에 집계하는 기능이 매우 편리해졌습니다. (MODE(), PERCENTILE_DISC() 함수를 사용하지 않고 pure SQL로 최빈값과 백분위수를 구하려면 query 가 꽤 길어집니다.)



SELECT

    gender AS group_by_value

    , 'age' AS target_column

    , COUNT(*) AS row_count

    , COUNT(DISTINCT age) AS distinct_values

    , AVG(age)

    , VARIANCE(age)

    , STDDEV(age)

    , MIN(age)

    , PERCENTILE_DISC(0.25) WITHIN GROUP (ORDER BY age) AS first_quartile

    , MEDIAN(age)

    , PERCENTILE_DISC(0.75) WITHIN GROUP (ORDER BY age) AS third_quartile

    , MAX(age)

    , MODE() WITHIN GROUP (ORDER BY age) -- over PostgreSQL 9.4

FROM cust

WHERE age IS NOT NULL

GROUP BY gender

ORDER BY gender;





성별 그룹별로 연령(age) 칼럼의 결측값 개수를 구해보겠습니다. 

결측값 개수는 WHERE age IS NULL 로 조건절을 주고 COUNT(*)로 행의 개수를 세어주면 됩니다. 



SELECT 

    gender

    , COUNT(*) AS missing_count

FROM cust

WHERE age IS NULL

GROUP BY gender

ORDER BY gender;


Out[5]:
gendermissing_count
F1
M2





위의 집계/ 요약통계량과 결측값 개수를 하나의 조회 결과로 보려면 아래처럼 Join 을 해주면 됩니다.



WITH summary_tbl AS (
    SELECT
        gender AS group_by_value
        , 'age' AS target_column
        , COUNT(*) AS row_count
        , COUNT(DISTINCT age) AS distinct_values
        , AVG(age)
        , VARIANCE(age)
        , STDDEV(age)
        , MIN(age)
        , PERCENTILE_DISC(0.25) WITHIN GROUP (ORDER BY age) AS first_quartile
        , MEDIAN(age)
        , PERCENTILE_DISC(0.75) WITHIN GROUP (ORDER BY age) AS third_quartile
        , MAX(age)
        , MODE() WITHIN GROUP (ORDER BY age)
    FROM cust
    WHERE age IS NOT NULL
    GROUP BY gender
    ORDER BY gender
), missing_tbl AS (
    SELECT
        gender AS group_by_value
        , COUNT(*) AS missing_count
    FROM cust
    WHERE age IS NULL
    GROUP BY gender
)
SELECT a.*, b.missing_count
FROM summary_tbl a LEFT JOIN missing_tbl b USING(group_by_value)
;

 




  (2) Apache MADlib으로 연속형 데이터의 그룹별 요약통계량 구하기


Apache MADlib의 madlib.summary() 함수를 사용하면 단 몇 줄의 코드만으로 위의 (1)번에서 SQL 집계 함수를 사용해서 길게 짠 코드를 대신해서 매우 깔끔하고 간단하게 구할 수 있습니다. 


아래는 (1)번의 결과를 얻기위해 성별(gender) 연령(age) 칼럼의 집계/요약데이터를 구하는 madlib.summary() 함수 예시입니다. 


Target columns 위치에는 1 개 이상의 분석을 원하는 연속형 데이터 칼럼을 추가로 넣어주기만 하면 되므로 (1) 번의 pure SQL 대비 훨씬 편리한 측면이 있습니다! 


그리고 그룹별로 구분해서 집계/요약하고 싶으면 Grouping columns 위치에 기준 칼럼 이름을 넣어주기만 하면 되므로 역시 (1)번의 pure SQL 대비 훨씬 편리합니다!



DROP TABLE IF EXISTS cust_summary;

SELECT madlib.summary('cust'     -- Source table

                      ,'cust_summary'   -- Output table

                      , 'age'                -- Target columns

                      , 'gender'            -- Grouping columns

);






madlib.summary() 함수의 결과 테이블에서 조회할 수 있는 집계/요약통계량 칼럼 리스트는 아래와 같습니다. 



SELECT column_name

FROM INFORMATION_SCHEMA.COLUMNS

    WHERE TABLE_SCHEMA = 'public'

        AND TABLE_NAME    = 'cust_summary'

    ORDER BY ORDINAL_POSITION;

Out[7]:
column_name
group_by
group_by_value
target_column
column_number
data_type
row_count
distinct_values
missing_values
blank_values
fraction_missing
fraction_blank
positive_values
negative_values
zero_values
mean
variance
confidence_interval
min
max
first_quartile
median
third_quartile
most_frequent_values
mfv_frequencies

 



[Reference]

* PostgreSQL aggregate functions: https://www.postgresql.org/docs/9.4/functions-aggregate.html

* Apache MADlib summary function: https://madlib.apache.org/docs/v1.11/group__grp__summary.html



다음번 포스팅에서는 PostgreSQL, Greenplum에서 SQL과 Apache MADlib을 이용하여 상관계수, 상관계수 행렬을 구하는 방법(https://rfriend.tistory.com/581)을 소개하겠습니다.


이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!



728x90
반응형
Posted by Rfriend
,

지난번 포스팅에서는 Python의 ipython-sql, psycopg2 패키지를 사용하여 Jupyter Notebook에서 PostgreSQL, Greenplum database에 접속할 수 있는 4가지 방법(https://rfriend.tistory.com/577)을 소개하였습니다.


이번 포스팅에서는 Python의 ipython-sql, psycopg2 패키지를 사용하여 PostgreSQL, Greenplum database에 SQL query를 할 때 Jupyter Notebook의 로컬 변수를 SQL query에 대입하여 변수값을 동적으로 대체해가면서 query 할 수 있는 3가지 방법을 소개하겠습니다.


Python의 로컬 변수를 SQL query 문에 사용할 수 있으므로 Python과 PostgreSQL, Greenplum DB를 서로 연동해서 데이터분석과 프로그래밍을 하는 경우 매우 강력하고 유용하게 사용할 수 있습니다.


(방법 1) Variable Substitution:  %sql SELECT :variable_name

(방법 2) Variable Substitution:  %sql SELECT {variable_name}

(방법 3) Variable Substitution:  %sql SELECT $variable_name





  (0) 필요 Python 패키지 사전 설치


아래의 SQLAlchemy, psycopg2, ipython-sql, pgspecial, sql_magic 중에서 아직 설치가 안된 패키지가 있다면 아래처럼 명령 프롬프트 창에서 Python의 패키지를 설치해줍니다.



-- (명령 프롬프트 창에서 pip 로 설치)

$ pip install --upgrade pip

$ pip install sqlalchemy

$ pip install psycopg2

$ pip install ipython-sql==0.3.9

$ pip install pgspecial

 



ipython-sql 패키지로 Jupyter Notebook에서 Greenplum database에 접속한 후에, 예제로 사용할 간단한 houses 테이블을 만들어보겠습니다.



%load_ext sql


%sql postgresql://gpadmin:changeme@localhost/demo

[Out] 'Connected: gpadmin@demo'


%sql select version();

 * postgresql://gpadmin:***@localhost/demo
1 rows affected.
Out[3]:
version
PostgreSQL 9.4.24 (Greenplum Database 6.10.1 build commit:efba04ce26ebb29b535a255a5e95d1f5ebfde94e) on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Aug 13 2020 02:56:29


%%sql
DROP TABLE IF EXISTS houses;
CREATE TABLE houses (id INT, tax INT, bedroom INT, bath FLOAT, price INT,
            size INT, lot INT, region TEXT);
INSERT INTO houses VALUES
  (1 ,  590 ,       2 ,    1 ,  50000 ,  770 , 22100, 'seoul'),
  (2 , 1050 ,       3 ,    2 ,  85000 , 1410 , 12000, 'seoul'),
  (3 ,   20 ,       3 ,    1 ,  22500 , 1060 ,  3500, 'busan'),
  (4 ,  870 ,       2 ,    2 ,  90000 , 1300 , 17500, 'seoul'),
  (5 , 1320 ,       3 ,    2 , 133000 , 1500 , 30000, 'bundang'),
  (6 , 1350 ,       2 ,    1 ,  90500 ,  820 , 25700, 'bundang'),
  (7 , 2790 ,       3 ,  2.5 , 260000 , 2130 , 25000, 'busan'),
  (8 ,  680 ,       2 ,    1 , 142500 , 1170 , 22000, 'busan'),
  (9 , 1840 ,       3 ,    2 , 160000 , 1500 , 19000, 'inchon'),
 (10 , 3680 ,       4 ,    2 , 240000 , 2790 , 20000, 'seoul'),
 (11 , 1660 ,       3 ,    1 ,  87000 , 1030 , 17500, 'inchon'),
 (12 , 1620 ,       3 ,    2 , 118600 , 1250 , 20000, 'busan'),
 (13 , 3100 ,       3 ,    2 , 140000 , 1760 , 38000, 'bundang'),
 (14 , 2070 ,       2 ,    3 , 148000 , 1550 , 14000, 'bundang'),
 (15 ,  650 ,       3 ,  1.5 ,  65000 , 1450 , 12000, 'inchon');





  (방법 1) Variable Substitution:   %sql SELECT :variable_name


첫번째 방법은 :variable_name 과 같은 named style 을 사용해서 Jupyter Notebook에서 local namespace에 생성한 변수 이름을 SQL query에 넣어서 동적으로 값을 바꾸어 가면서 query 를 해보겠습니다.


(1-1) :variable_name 으로 SQL의 정수(integer) 변수값 대체



id_var = (1, 5, 15)

%sql SELECT * FROM houses WHERE id IN :id_var;

[Out]

* postgresql://gpadmin:***@localhost/demo 3 rows affected.

Out[123]:
idtaxbedroombathpricesizelotregion
159021.05000077022100seoul
5132032.0133000150030000bundang
1565031.565000145012000inchon

 



(1-2) :variable_name 으로 SQL의 문자형(character) 변수값 대체



region_var = 'seoul'

%sql SELECT * FROM houses WHERE region = :region_var;

[Out]

* postgresql://gpadmin:***@localhost/demo 4 rows affected.

Out[125]:
idtaxbedroombathpricesizelotregion
10368042.0240000279020000seoul
159021.05000077022100seoul
2105032.085000141012000seoul
487022.090000130017500seoul


 




  (방법 2) Variable Substitution:  %sql SELECT {variable_name}


(2-1) {variable_name} 으로 SQL의 정수(integer) 변수값 대체



id_var = (1, 5, 15)

%sql SELECT * FROM houses WHERE id IN {id_var};

[Out]

* postgresql://gpadmin:***@localhost:5432/demo 3 rows affected.

Out[126]:
idtaxbedroombathpricesizelotregion
159021.05000077022100seoul
5132032.0133000150030000bundang
1565031.565000145012000inchon

 



(2-2) '{variable_name}' 으로 SQL의 문자형(character) 변수값 대체



region_var = 'seoul'

%sql SELECT * FROM houses WHERE region = '{region_var}';

[Out]

* postgresql://gpadmin:***@localhost:5432/demo 4 rows affected.

Out[127]:
idtaxbedroombathpricesizelotregion
159021.05000077022100seoul
10368042.0240000279020000seoul
2105032.085000141012000seoul
487022.090000130017500

seoul




  (방법 3) Variable Substitution:  %sql SELECT $variable_name


(3-1) $variable_name 으로 SQL의 정수형(integer) 변수값 대체



id_var = (1, 5, 15)

%sql SELECT * FROM houses WHERE id IN $id_var;

[Out]

* postgresql://gpadmin:***@localhost:5432/demo 3 rows affected.

Out[128]:
idtaxbedroombathpricesizelotregion
159021.05000077022100seoul
5132032.0133000150030000bundang
1565031.565000145012000inchon

 



(3-2) $variable_name 으로 SQL의 문자형(character) 변수값 대체


localname space에 문자형의 bind parameter 값 입력해줄 때 큰따옴표(" ")로 감싸주고, 그 안에 작은따옴표(' ')로 값 입력해주도록 하세요. 그냥 작은따옴표(' ')만 했더니 칼럼으로 인식을 해서 에러가 나네요.



region_var = "'seoul'"

%sql SELECT * FROM houses WHERE region = $region_var;

[Out]

* postgresql://gpadmin:***@localhost:5432/demo 3 rows affected.

Out[171]:
idtaxbedroombathpricesizelotregion
2105032.085000141012000seoul
487022.090000130017500seoul
10368042.0240000279020000seoul

 



* Reference: https://pypi.org/project/ipython-sql/



이번 포스팅이 많은 도움이 되었기를 바랍니다.

행복한 데이터 과학자 되세요!



728x90
반응형
Posted by Rfriend
,

지난번 포스팅에서는 Python의 ipython-sql, pgspecial 패키지를 사용하여 Jupyter Notebook 에서 PostgreSQL, Greenplum database 에 접속(access)하고 SQL query, meta-commands 를 하는 방법(https://rfriend.tistory.com/572)을 간략하게 소개하였습니다.


이번 포스팅에서는 psycopg2 와 ipython-sql 패키지를 사용하여 Jupyter Notebook에서 PostgreSQL, Greenplum database 에 접속하는 4가지 방법을 추가로 소개하겠습니다.


특히, 사용자이름(username), 비밀번호(password), 호스트(host), 포트(port), 데이터베이스(database) 등과 같이 보안이 요구되고 다른 사용자에게 노출이나 공유가 되면 곤란한 정보들(DB Credentials)을 Jupyter Notebook에서 표기/노출하지 않고 별도의 파일로 보관하면서, 이를 불러와서 DB access 할 수 있는 방법들에 주안점을 두고 소개하겠습니다.


-- Jupyter Notebook 의 Cell 안에서 DB credentials 직접 입력 (* 외부 노출되므로 권장하지 않음)

(1) %sql postgresql://Username:Password@Host:Port/Database

(2) %sql $connection_string


-- 별도의 폴더에 별도의 파일로 DB credentials 관리하고, 이를 불러와서 Jupyter Notebook에 입력

    (* 보안유지 되므로 권장함)

(3) config.py 별도 파일 & %sql $connection_string

(4) db_credentions 별도 파일 & %config SqlMagic.dsn_filename = db_cred_path





(0) Python 패키지 사전 설치


먼저, 명령 프롬프트 창에서 아래의 PostgreSQL, Greenplum database에 접속하고 SQL query 를 하기 위해 필요한 Python 패키지들을 pip로 설치해줍니다.



-- (명령 프롬프트 창에서 설치)

$ pip install --upgrade pip

$ pip install sqlalchemy

$ pip install psycopg2

$ pip install ipython-sql==0.3.9

$ pip install pgspecial

$ pip install sql_magic

 



-- Jupyter Notebook 의 Cell 안에서 DB credentials 직접 입력

   (* 편리하기는 하지만, DB 접속 정보가 외부에 노출되므로 권장하지 않음. )


 (1) %sql postgresql://Username:Password@Host:Port/Database


가장 편리한 방법은 SQLAlchemy 표준 URL (database-driver://Username:Password@Host:Port/Database) 에 따라 PostgreSQL, Greenplum database에 connection engine을 생성하여 접속하는 방법입니다.


하지만, 이 방법은 Jupyter Notebook에 DB 접속정보가 고스란히 노출되기 때문에 만약 다른 조직, 팀원 간에 협업을 하고 notebook 파일을 공유해야할 일이 생길 경우 보안 방침에 위배가 되므로 권장하는 방법은 아닙니다.



%load_ext sql


# postgresql://Username:Password@Host:Port/Database
%sql postgresql://gpadmin:changeme@localhost:5432/demo

[Out] 'Connected: gpadmin@demo'







 (2) %sql $connection_string


%sql $connection_string 문으로 Jupyter Notebook에서 동적으로 DB credentials 를 Python string format 으로 입력받아서 PostgreSQL, Greenplum database에 접속할 수 있습니다. 


다만, 아래처럼 DB credentials 를 Jupyter notebook 의 Cell 안에서 직접 입력하면 DB 접속 정보가 외부로 노출되는 문제가 있습니다.



%load_ext sql


# DB credentials

username = "gpadmin"
password = "changeme"
host = "localhost"
port = "5432"
database = "demo"


# connection strings using Python string format
connection_string = "postgresql://{user}:{password}@{host}:{port}/{db}".format(
    user=username,
    password=password,
    host=host,
    port=port,
    db=database)


# dynamic access dredentials
%sql $connection_string

 





-- 별도의 폴더에 별도의 파일로 DB credentials 관리하고, 이를 불러와서 Jupyter Notebook에 입력

    (* 보안유지 되므로 권장함)


 (3) config.py 별도 파일 & %sql $connection_string


세번째 방법은 DB Credentials 정보를 별도의 파일에 분리해서 만들어놓고, 이를 불러와서 DB connect 하는 방법입니다. 아래에 예를 들어보면, (폴더, 파일 이름은 각자 알아서 정해주면 됨)


(a) HOME directory 밑에 DB credentials 파일을 넣어둘 'db_cred' 라는 이름의 폴더 만들고,

(b) 'db_cred' 폴더 안에 'gpdb_credentials.py', '__init__.py' 라는 이름의 2개의 Python 파일을 생성함.

     'gpdb_credentials.py' 파일에는 Dictionary (Key : Value 짝) 형태로 Username, Password, Host, Port, Database 정보를 입력해줌. 여러개의 Database 별로 credentials 정보를 각각 다른 이름의 Dictionary 로 하나의 파일 안에 생성해놓을 수 있음.

     '__init__.py' 파일은 내용은 비어있으며, 해당 폴더의 Python 파일을 패키지로 만들기 위해 생성해줌.

(c) Jupyter Notebook 을 작업하는 Directory 에서도 HOME directory 밑의 'db_cred' 폴더에 접근해서 'gpdb_credentials.py' 파일에 접근할 수 있도록 sys.path.append(cred_path) 로 Python 의 Path 에 추가해줌. (sys.path.append(cred_path))

(d) 작업을 하는 Jupyter Notebook 에서 'from gpdb_credentials import demo_db' 문으로 gpdb_credentials.py 파일에서 'demo_db' Dictionary 를 불러옴.

(e) 'demo_db' Dictionary 에서 DB connection에 필요한 정보를 파싱해옴. (dict['key'] 인덱싱)

(f) %sql $ 문 뒤에 (e)에서 파싱해서 만든 connection_string을 입력해서 DB connect 함.



%load_ext sql


# put a folder and DB credential files at HOME directory

import os
homedir = os.getenv('HOME')
cred_path = os.path.join(homedir, 'db_cred')


# add a 'cred_path' for interpreter to search
import sys
sys.path.append(cred_path)


# import DB credentials from 'gpdb_credentials.py' dictionary file.

from gpdb_credentials import demo_db


# parsing DB credentials and connect to Greenplum using %sql $connection_string

username = demo_db['Username']
password = demo_db['Password']
host = demo_db['Host']
port = demo_db['Port']
database = demo_db['Database']

connection_string = "postgresql://{user}:{password}@{host}:{port}/{db}".format(
    user=username,
    password=password,
    host=host,
    port=port,
    db=database)

%sql $connection_string






(4) db_credentials 별도 파일 & %config SqlMagic.dsn_filename = db_cred_path


명령 프롬프트 창에서 아래처럼 0.3.9 버전의 ipython-sql을 설치해줍니다. (최신 버전은 0.4.0 이지만 Python 3.x. 버전의 ipython-sql 0.4.0 버전에 DSN connections 를 하는데 있어 config 를 반환하지 않는 bug가 있습니다. config bug fix 되기 전까지는 0.3.9 버전으로 사용하기 바랍니다.)


-- 명령 프롬프트 창에서 ipython-sql 0.3.9 버전 설치

pip install ipython-sql==0.3.9


(a) PostgreSQL, Greenplum database 접속 정보(connection info.)를 별도의 configuration file 에 저장하여 HOME directory 밑에 보관합니다. 이때 2개 이상의 복수의 DB credentials 정보를 [DB alias] 로 구분해서 하나의 configuration file에 저장해서 사용할 수 있습니다.


파일 이름을 ".odbc.ini", ".dsn.ini" 처럼 "."으로 시작하면 '숨김 파일(hidden file)'이 되어 평상시에는 탐색기, Finder에서는 볼 수가 없으므로 DB 접속정보를 관리하는데 좀더 보안에 유리합니다.

(참고로, Windows OS에서 숨김파일을 보려면, Windows 탐색기에서 [구성] > [폴더 및 검색 옵션] > [폴더 옵션] 대화상자에서 [보기] 탭을 클릭 > [고급 설정]에서 "숨김 파일 밒 폴더 표시"를 선택하면 됩니다.

Mac OS 에서는 Finder에서 "Shift + Command + ." 동시에 눌러주면 숨김 파일이 표시됩니다.)


(b) Jupyter Notebook에서 ipython-sql 로 DSN connections 을 할 수 있습니다.

    %config SqlMagic.dsn_filename = "$homedir/.odbc.ini"


(c) DB connect 된 이후에 제일 처음으로 %sql 로 SQL query 할 때 DB credentions 의 DB alias 를 [ ] 안에 넣어서 명시를 해주고(예: %sql [demo_db] SELECT version();), 그 다음부터 %sql 문으로 SQL query 할 때는 DB alias 를 안써주고 바로 SQL query 를 하면 됩니다.


(d) Jupyter Notebook의 중간 Cell 에서 사용(connect)하려는 DB를 바꾸고 싶으면 %sql [DB_alias2] SELECT .... 처럼 [DB_alias] 부분에 다른 DB alias 이름을 명시해주고 SQL query 를 하면, 그 이후 Cell 부터는 새로운 DB 를 connect 해서 query를 할 수 있습니다.

(예: %sql [dev_db] SELECT COUNT(*) FROM tbl;)



%load_ext sql


import os
homedir = os.getenv('HOME')

# parse and configure gpdb credentials and access to GPDB
%config SqlMagic.dsn_filename = "$homedir/.odbc.ini"

# put [alias_name] after %sql in the first line
%sql [demo_db] SELECT version();

[Out] * postgresql://gpadmin:***@localhost:5432/demo

1 rows affected.



* Reference: https://pypi.org/project/ipython-sql/


다음 포스팅에서는 ipython-sql 로 PostgreSQL, Greenplum database에 접속하여 Jupyter Notebook 의 로컬변수로 동적으로 SQL query 하는 3가지 방법(https://rfriend.tistory.com/578)을 소개하겠습니다.


이번 포스팅이 많은 도움이 되었기를 바랍니다.

행복한 데이터 과학자 되세요!



728x90
반응형
Posted by Rfriend
,

Python의 SQLAlchemy (모든 DB), psycopg2 (PostgreSQL, Greenplum) 패키지를 이용하여 Spyder 나 Pycharm 과 같은 IDE에서 PostgreSQL, Greenplum database에 접속(access) 하고 SQL query 를 할 수 있습니다.

 

 

 

그렇다면 Jupyter Notebook 에서도 DB access, SQL query를 할 수 있으면 편하겠지요?

 

이번 포스팅에서는 Python의 ipython-sql 과 pgspecial 패키지를 이용하여 PostgreSQL, Greenplum database 에 Jupyter Notebook으로 접속(access DB)하여 SQL query 를 하는 방법을 소개하겠습니다.

 


(1) ipython-sql 로 PostgreSQL, Greenplum DB 접속(access)하기

(2) ipython-sql 로 PostgreSQL, Greenplum DB에 SQL query 하기 (%sql, %%sql)

(3) pgspecial 로 PostgreSQL, Greenplum DB에 meta-commands query 하기 (\l, \dn, \dt)

 

 

 

 

 

(0) 사전 설치가 필요한 Python 패키지 리스트

 

명령 프롬프트 창에서 아래의 5개 패키지에 대해서 pip 로 설치해주시기 바랍니다.

 

sqlalchemy, psycopg2는 PostgreSQL, Greenplum DB 접속(access, connection)를 위해서 필요한 Python 패키지 입니다.

 

ipython-sql, sql_magic은 IPython으로 Jupyter Notebook에서 DB access, SQL query 를 하기 위해 필요한 Python 패키지입니다.

 

pgspecial은 Jupyter Notebook에서 PostgreSQL, Greenplum DB에 meta-commands (역슬래쉬 \ 로 시작하는, psql 에서 사용하는 \l, \dn, \dt 명령문) 를 위해 필요한 Python 패키지입니다.

 

ipython-sql 의 경우 2020.12월 현재 0.4.0 버전 (python 3.x) 이 최신인데요, %config로 DB access 하는 명령문의 bug가 아직 fix가 안되어 있어서, 아래처럼 ipython-sql==0.3.9 로 한단계 낮은 버전으로 설치해주세요.

 

 

-- 명령 프롬프트 창에서 pip 로 python 패키지 설치

 

$ pip install --upgrade pip

$ pip install sqlalchemy

$ pip install psycopg2

$ pip install ipython-sql==0.3.9

$ pip install pgspecial

$ pip install sql_magic

 

 

만약 psycopg2 모듈을 pip 로 설치하다가 에러가 나면 아래처럼 wheel package 를 이용해서 psycopg2-binary 로 설치해보세요. 

 

    Error: pg_config executable not found.    

    pg_config is required to build psycopg2 from source.  Please add the directory

    containing pg_config to the $PATH or specify the full executable path with the

    option:   

        python setup.py build_ext --pg-config /path/to/pg_config build ..

    or with the pg_config option in 'setup.cfg'.

 

 

$ pip install psycopg2-binary

 

 

 (1) ipython-sql 로 PostgreSQL, Greenplum DB 접속(access)하기

 

%load_ext sql 로 IPython의 sql 을 로딩하여 %sql 또는 %%sql magic 명령문을 사용할 수 있습니다.

PostgreSQL, Greenplum database에 접속할 때는 SQLAlchemy 의 표준 URL connect strings 를 사용합니다.

 

[ SQLAlchemy 의 표준 Database URL]

 

 dialect+driver://username:password@hoat:port/database

 

아래의 db credentials 로 Greenplum database에 접속할 때의 예입니다.

- driver: postgresql

- username: gpadmin

- password: changeme

- host: localhost

- port: 5432

- database: demo

 

 

%load_ext sql

 

# postgresql://Username:Password@Host:Port/Database
%sql postgresql://gpadmin:changeme@localhost:5432/demo

[Out] 'Connected: gpadmin@testdb'

 

 

 

 

* 위의 %sql SQLAlchemy 표준 URL 방법 외에 Jupyter Notebook에서 PostgreSQL, Greenplum DB에 접속하는 다른 3가지 추가 방법은 https://rfriend.tistory.com/577 를 참고하세요.

 

 

 

 (2) ipython-sql 로 PostgreSQL, Greenplum DB에 SQL query 하기 (%sql, %%sql)

 

Jupyter Notebook의 Cell 안에 1줄 SQL query일 경우는 %sql 로 시작하고, 2줄 이상 SQL query 일 경우에는 %%sql 로 시작합니다.

 

(2-1) %sql : 1줄의 SQL query

 

1줄짜리 SELECT 문으로 PostgreSQL의 버전을 확인해보겠습니다.

 

 

%sql SELECT version();

 

[Out]

version

PostgreSQL 9.4.24 (Greenplum Database 6.10.1 build commit:efba04ce26ebb29b535a255a5e95d1f5ebfde94e) on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Aug 13 2020 02:56:29

 

 

 

 

 

 

(2-1) %%sql : 2줄 이상의 SQL query

 

아래 예제는 pg_catalog.pg_tables 테이블에서 스키마 이름이 pg_catalog, information_schema 가 아닌 테이블을 조회하여 1개만 반환해보는 query 입니다.

 

 

%%sql
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND
    schemaname != 'information_schema'
LIMIT 1;

 

 

 

 

 

 (3) pgspecial 로 PostgreSQL, Greenplum DB에 meta-commands (\l, \dn, \dt)

 

meta-commands 는 psql 에서 역슬래쉬(\)와 함께 사용해서 데이터베이스, 스키마, 테이블 등을 조회할 때 사용하는 명령어를 말합니다. pgspecial 패키지는 Jupyter Notebook에서 meta-commands 를 사용할 수 있게 해줍니다.

 

(3-1) database 조회 : %sql \l  (역슬래쉬 + L)

 

 

%sql \l

 

[Out]

Name

Owner

Encoding

Collate

Ctype

Access privileges

demo

gpadmin

UTF8

en_US.utf8

en_US.utf8

=Tc/gpadmin

gpadmin=CTc/gpadmin

dsuser=CTc/gpadmin

gpperfmon

gpadmin

UTF8

en_US.utf8

en_US.utf8

None

postgres

gpadmin

UTF8

en_US.utf8

en_US.utf8

None

template0

gpadmin

UTF8

en_US.utf8

en_US.utf8

=c/gpadmin

gpadmin=CTc/gpadmin

template1

gpadmin

UTF8

en_US.utf8

en_US.utf8

=c/gpadmin
gpadmin=CTc/gpadmin

 

 

 
(3-2) Schema 조회 : %sql \dn
 

 

%sql \dn

 

[Out]

Name Owner
gp_toolkit gpadmin
madlib dsuser14
public gpadmin

 

 

 

 
(3-3) Table 조회 : %sql \dt
 
아래 예제는 public 스키마에서 "ab" 로 시작하는 모든 테이블(public.ab*)을 조회한 것입니다.
 

 

%sql \dt public.ab*

 

[Out]

Schema Name Type Owner
public abalone table

gpadmin

public abalone_corr table

gpadmin

public abalone_corr_summary table

gpadmin

public abalone_correlations table

gpadmin

 

 

 

이상으로 ipython-sql, pgspecial 패키지를 사용해서 PostgreSQL, Greenplum database에 접속하고 SQL query, meta-commands 하는 방법에 대한 가장 기본적이고 개략적인 소개를 마치겠습니다.
 
* 다음번 포스팅에서는 SQLAlchemy, psycopg2, ipython-sql 로 Jupyter Notebook 에서 PostgreSQL, Greenplum database에 접속하는 4가지 방법(https://rfriend.tistory.com/577)에 대한 소소한 팁을 추가로 소개하겠습니다.
 
* ipython-sql 로 PostgreSQL, Greenplum database에 접속하여 Jupyter Notebook 의 로컬변수로 동적으로 SQL query 하는 3가지 방법https://rfriend.tistory.com/578 를 참고하세요.
 
* ipython-sql로 PostgreSQL, Greenplum database에 접속하여 Jupyter Notebook에서 SQL query한 결과를 pandas DataFrame으로 가져오는 3가지 방법https://rfriend.tistory.com/579 를 참고하세요.
 
이번 포스팅이 많은 도움이 되었기를 바랍니다.
행복한 데이터 과학자 되세요!

 

 

 

728x90
반응형
Posted by Rfriend
,

만약 한개 당 1분 걸리는 동일한 프로세스의 100개의 일을 한 명이서 한다면 100분이 걸릴텐데요, 이것을 100명에게 일을 1개씩 나누어서 동시에 분산해서 시킨다면 1분(+취합하는 시간 약간) 밖에 안걸릴 것입니다. 1명이 100명을 이길 수는 없기 때문입니다. 


대용량 데이터에 대해서 빠른 성능으로 통계나 기계학습의 고급 분석을 처리해야 하는 경우라면 Greenplum 과 같은 MPP (Massively Parallel Processing) 아키텍처 기반의 DB에서 R 이나 Python 언어로 작성한 알고리즘을 In-DB에서 PL/R, PL/Python을 사용해서 분산 병렬 처리할 수 있습니다. 


이번 포스팅에서는 Greenplum DB에서 PL/R (Procedural Language R) 을 사용해서 분산 병렬처리(distributed parallel processing하여 그룹별로 선형회귀모형을 각각 적합하고 예측하는 방법을 소개하겠습니다. 모든 연산이 In-DB 에서 일어나기 때문에 데이터 I/O 가 없으므로 I/O 시간을 절약하고 architecture 와  workflow를 간단하게 가져갈 수 있는 장점도 있습니다. (vs. DB 에서 local R 로 데이터 말아서 내리고, local R로 모형 적합 / 예측 후, 이 결과를 다시 DB에 insert 하고 하는 복잡한 절차가 필요 없음)



이번에 소개할 간단한 예제로 사용할 데이터셋은 abalone 공개 데이터셋으로서, 성 (sex) 별로 구분하여 무게(shucked_weight)와 지름(diameter) 설명변수를 input으로 하여 껍질의 고리 개수(rings)를 추정하는 선형회귀모형을 적합하고, 예측하는 문제입니다. 


이러한 일을 성별 F, M, I 별로 순차적으로 하는 것이 아니라, Greenplum DB 에서 성별 F, M, I 별로 PL/R로 분산 병렬처리하여 동시에 수행하는 방법입니다. 



  (1) abalone 데이터셋으로 테이블 만들기


먼저, abalone 데이터셋을 공개 데이터셋 웹사이트에서 가져와서 External table을 만들고, 이로부터 abalone table 을 생성해보겠습니다. 



---------------------------------

-- Linear Regression in Parallel 

-- using PL/R

---------------------------------


-- Dataset for example: abalone dataset from the UC Irvine Machine Learning Repository

-- url: http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data

-- Create an external web table

DROP EXTERNAL TABLE IF EXISTS abalone_external;

CREATE EXTERNAL WEB TABLE abalone_external(

sex text 

, length float8

, diameter float8

, height float8

, whole_weight float8

, shucked_weight float8

, viscera_weight float8

, shell_weight float8

, rings integer -- target variable to predict

) LOCATION('http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data') 

FORMAT 'CSV' 

(null as '?');



-- Create a table of abalone

DROP TABLE IF EXISTS abalone;

CREATE TABLE abalone AS 

SELECT * FROM abalone_external

DISTRIBUTED BY (sex);



-- Viewing data distribution

SELECT gp_segment_id, COUNT(*) AS row_cnt

FROM abalone

GROUP BY gp_segment_id;




-- Check data

SELECT * FROM abalone LIMIT 5;






  (2) Train, Test set 테이블 분할 (train, test set split)


다음으로 MADlib(https://madlib.apache.org/) 의 madlib.train_test_split() 함수를 사용해서 abalone 원 테이블을 train set, test set 테이블로 분할(split into train and test set) 해보겠습니다. 



---------------------------

-- Train, Test set split

---------------------------

-- Check the number of observations per sex group(F, I, M)
SELECT
 sex, COUNT(*) FROM abalone GROUP BY sex;




-- Train, Test set split

DROP TABLE IF EXISTS out_train, out_test;

SELECT madlib.train_test_split(

'abalone',    -- Source table

'out',     -- Output table

    0.8,       -- train_proportion

    NULL,      -- Default = 1 - train_proportion = 0.5

    'sex', -- Strata definition

    'rings, shucked_weight, diameter', -- Columns to output

    FALSE,      -- Sample with replacement

    TRUE);     -- Separate output tables



SELECT * FROM out_train LIMIT 5;




SELECT sex, count(*) FROM out_train GROUP BY sex;




SELECT sex, count(*) FROM out_test GROUP BY sex;






  (3) array aggregation 하여 PL/R에서 사용할 데이터셋 준비하기


좀 낯설을 수도 있는데요, PL/R 에서는 array 를 input으로 받으므로 array_agg() 함수를 사용해서 설명변수를 칼럼별로 array aggregation 해줍니다. 이때 성별(sex) 로 모형을 각각 병렬로 적합할 것이므로 group by sex 로 해서 성별로 따로 따로 array aggregation 을 해줍니다. 이렇게 해주면 long format으로 여러개의 열(row)에 들어있던 값들이 성별로 구분이 되어서 하나의 array( { } )에 모두 들어가게 됩니다. (아래 이미지 참조)


 

-- Data Preparation

-- : array aggregation using array_agg()

DROP TABLE IF EXISTS abalone_array;

CREATE TABLE abalone_array AS 

SELECT

sex::text -- group

, array_agg(rings::float8) as rings             -- y

, array_agg(shucked_weight::float8) as s_weight -- x1

, array_agg(diameter::float8) as diameter       -- x2

FROM out_train

GROUP BY sex

DISTRIBUTED BY (sex);


SELECT * FROM abalone_array;






  (4) 선형회귀모형 적합하는 PL/R 사용자 정의 함수 정의하기 (Define PL/R UDF)


선형회귀모형 PL/R 의 반환받는 값을 두가지 유형으로 나누어서 각각 소개하겠습니다.


(4-1) 적합된 회귀모형의 회귀계수 (coefficients) 를 반환하기

(4-2) 적합된 회귀모형 자체(fitted model itself)를 반환하기 



먼저, (4-1) 적합된 회귀모형의 회귀계수를 반환하는 PL/R 함수를 정의하는 방법을 소개하겠습니다. 


R의 lm() 함수를 사용하여 다중 선형회귀모형을 적합(fit a model)하면, summary(fitted_model)$coef 는 추정된 회귀계수(coef_est), 표준오차(std_error), T통계량(t_stat), P-값 (p_value) 를 반환합니다. 


CREATE OR REPLAE FUNCTION pl_r_funtion_name(column_name data_type[], ...) 으로 PL/R 함수 이름과 인자로 받는 칼럼 이름, 데이터 유형을 정의해주고, 


이들 모형 적합 후의 추정된 회귀계수와 통계량을 Greenplum DB에 반환하려면 데이터 유형이 여러개이므로 composit type 을 별도로 정의('lm_abalone_type')해주어고, PL/R 사용자 정의함수에서 returns setof lm_abalone_type 처럼 써주면 됩니다.


그 다음에, $$ pure R codes block $$ LANGUAGE 'plr' 형식으로 R codes 를 통째로 $$ ~~~~ $$ 안에 넣어주면 됩니다. 



----------------------------------------------------------------

-- (4-1) PL/R : Linear Regression Model's Coefficients --> Predict

----------------------------------------------------------------


-- Return Types

DROP TYPE IF EXISTS lm_abalone_type CASCADE;

CREATE TYPE lm_abalone_type AS (

variable text

, coef_est float

, std_error float

, t_stat float

, p_value float

);


-- PL/R User Defined Function

DROP FUNCTION IF EXISTS plr_lm_train(float8[], float8[], float8[]);

CREATE OR REPLACE FUNCTION plr_lm_train(

rings float8[]

, s_weight float8[]

, diameter float8[]

) RETURNS SETOF lm_abalone_type AS

$$

m1 <- lm(rings ~ s_weight + diameter)

m1_s <- summary(m1)$coef

temp_m1 <- data.frame(rownames(m1_s), m1_s)

return(temp_m1)

$$

LANGUAGE 'plr';

 





 (5) PL/R 실행하기 (execute PL/R UDF in Greenplum DB)


위의 (4)에서 정의한 성별(per sex) 선형회귀모형을 적합하여 회귀계수와 통계량을 반환하는 PL/R 사용자 정의함수를 Greenplum DB 안에서 병렬처리하여 실행해보겠습니다. 


select sex, (plr_lm_train(rings, s_weight, diameter)).* from abalone_array 처럼 위의 (4)번에서 정의한 PL/R 함수에 (3)번에서 준비한 array 가 들어있는 테이블의 칼럼을 써주고, from 절에 array 테이블 이름을 써주면 됩니다. 


이때 테이블에 return 받는 값들이 composit type의 여러개 칼럼들이므로 (plr_udf(column, ...)).* 처럼 PL/R 함수를 괄호 ( ) 로 싸주고 끝에 '*' (asterisk) 를 붙여줍니다. ( * 를 빼먹으면 여러개의 칼럼별로 나누어지지 않고 한개의 칼럼에 튜플로 모든 칼럼이 뭉쳐서 들어갑니다)



-- Execution of Linear Regression PL/R 

DROP TABLE IF EXISTS lm_abalone_model_coef;

CREATE TABLE lm_abalone_model_coef AS (

SELECT sex, (plr_lm_train(rings, s_weight, diameter)).* 

FROM abalone_array

) DISTRIBUTED BY (sex);


SELECT * FROM lm_abalone_model_coef;







  (6) 적합한 선형회귀모형을 사용해 test set에 대해 예측하기 (prediction on test set)


위의 (5)번에서 적합한 성별 선형회귀모형들의 회귀계수 (coefficients per sex groups) 를 사용해서 test set 의 데이터셋에 대해 PL/R 함수로 분산 병렬처리하여 rings 값을 예측해보겠습니다. (training 도 분산병렬처리, prediction/ scoring 도 역시 분산병렬처리!)


먼저, 예측하는 PL/R 함수에 넣어줄 test set을 array aggregation 해줍니다. 


다음으로, ID별로 실제값(actual rings)과 예측한 값(predicted rings)을 반환받을 수 있도록 composite type 을 정의해줍니다. 


그 다음엔 추정된 회귀계수를 사용해서 예측할 수 있도록 행렬 곱 연산을 하는 PL/R 함수(plr_lm_coef_predict())를 정의해줍니다. 


마지막으로 예측하는 PL/R 함수를 실행해줍니다. 



------------------------------------------------

-- Prediction and Model Evaluation for Test set

------------------------------------------------


-- Preparation of test set in aggregated array

DROP TABLE IF EXISTS test_array;

CREATE TABLE test_array AS 

SELECT

sex::text

, array_agg(rings::float8) as rings             -- y

, array_agg(shucked_weight::float8) as s_weight -- x1

, array_agg(diameter::float8) as diameter       --x2

FROM out_test

GROUP BY sex

DISTRIBUTED BY (sex);


SELECT * FROM test_array;




-- Define composite data type for predicted return values

DROP TYPE IF EXISTS lm_predicted_type CASCADE;

CREATE TYPE lm_predicted_type AS (

id int

, actual float

, pred float

);



-- Define PL/R UDF of prediction per groups using linear regression coefficients

DROP FUNCTION IF EXISTS plr_lm_coef_predict(float8[], float8[], float8[], float8[]);

CREATE OR REPLACE FUNCTION plr_lm_coef_predict(

rings float8[]

, s_weight float8[]

, diameter float8[]

, coef_est float8[]

) RETURNS SETOF lm_predicted_type AS

$$

actual <- rings # y

intercept <- 1

X <- cbind(intercept, s_weight, diameter) # X matrix

coef_est <- matrix(coef_est) # coefficients matrix

predicted <- X %*% coef_est  # matrix multiplication

df_actual_pred <- data.frame(actual, predicted)

id <- as.numeric(rownames(df_actual_pred))

return(data.frame(id, df_actual_pred))

$$

LANGUAGE 'plr';



-- Execute PL/R Prediction UDF

DROP TABLE IF EXISTS out_coef_predict;

CREATE TABLE out_coef_predict AS (

SELECT sex, (plr_lm_coef_predict(c.rings, c.s_weight, c.diameter, c.coef_est)).*

FROM (

SELECT a.*, b.coef_est

FROM test_array a, 

(SELECT sex, array_agg(coef_est) AS coef_est FROM lm_abalone_model_coef GROUP BY sex) b

WHERE a.sex = b.sex

) c

) DISTRIBUTED BY (sex);



-- Compare 'actual' vs. 'predicted'

SELECT * FROM out_coef_predict WHERE sex = 'F' ORDER BY sex, id LIMIT 10;







  (7) 회귀모형 자체를 Serialize 해서 DB에 저장하고, Unserialize 해서 예측하기


위의 (4)번~(6번) 까지는 적합된 회귀모형의 회귀계수와 통계량을 반환하고, 이를 이용해 예측을 해보았다면, 이번에는 


- (4-2) 적합된 회귀모형 자체(model itself)를 Serialize 하여 DB에 저장하고 (인코딩)

- 이를 DB에서 읽어와서 Unserialize 한 후 (디코딩), 예측하기

- 단, 이때 예측값의 95% 신뢰구간 (95% confidence interval) 도 같이 반환하기


를 해보겠습니다. 



--------------------------------------------------------------------

-- (2) PL/R : Linear Model --> Serialize --> Deserialize --> Predict

--------------------------------------------------------------------


-- PL/R User Defined Function Definition

DROP FUNCTION IF EXISTS plr_lm_model(float8[], float8[], float8[]);

CREATE OR REPLACE FUNCTION plr_lm_model(

    rings float8[]

, s_weight float8[]

, diameter float8[]

) RETURNS bytea -- serialized model as a byte array

AS

$$

lr_model <- lm(rings ~ s_weight + diameter)

return (serialize(lr_model, NULL))

$$

LANGUAGE 'plr';


-- Execution of Linear Regression PL/R 

DROP TABLE IF EXISTS lm_abalone_model;

CREATE TABLE lm_abalone_model AS (

SELECT sex, plr_lm_model(rings, s_weight, diameter) AS serialized_model

FROM abalone_array

) DISTRIBUTED BY (sex);



-- We can not read serialized model

SELECT * FROM lm_abalone_model;





DROP TYPE IF EXISTS lm_predicted_interval_type CASCADE;

CREATE TYPE lm_predicted_interval_type AS (

id int

, actual float

, pred float

, lwr float

, upr float

);


-- PL/R function to read a serialized PL/R model

DROP FUNCTION IF EXISTS plr_lm_model_predict(float8[], float8[], float8[], bytea);

CREATE OR REPLACE FUNCTION plr_lm_model_predict(

rings float8[]

, s_weight float8[]

, diameter float8[]

, serialized_model bytea

) RETURNS SETOF lm_predicted_interval_type 

AS

$$

model <- unserialize(serialized_model)

actual <- rings # y

X <- data.frame(s_weight, diameter) # new data X

predicted <- predict(model, newdata = X, interval = "confidence")

df_actual_pred <- data.frame(actual, predicted)

id <- as.numeric(rownames(df_actual_pred))

return (data.frame(id, df_actual_pred))

$$

LANGUAGE 'plr';



-- Predict

DROP TABLE IF EXISTS out_model_predict;

CREATE TABLE out_model_predict AS (

SELECT sex, (plr_lm_model_predict(c.rings, c.s_weight, c.diameter, c.serialized_model)).*

FROM (

SELECT a.*, b.serialized_model

FROM test_array a, lm_abalone_model b

WHERE a.sex = b.sex

) c

) DISTRIBUTED BY (sex);


SELECT * FROM out_model_predict WHERE sex = 'F' ORDER BY sex, id LIMIT 10;




[Greenplum & PostgreSQL] MADlib 을 활용한 그룹별 선형회귀모형 분산병렬 적합 및 예측은 https://rfriend.tistory.com/533 를 참고하세요. 



[References]


많은 도움이 되었기를 바랍니다. 

이번 포스팅이 도움이 되었다면 아래의 '공감~'를 꾹 눌러주세요. 



728x90
반응형
Posted by Rfriend
,