'Parallel Processing using Procedural Language R'에 해당되는 글 1건

  1. 2020.04.05 [Greenplum, PostgreSQL DB] PL/R 을 위한 데이터셋 array aggregation 준비하는 3가지 방법 4

Greenplum 혹은 PostgreSQL DB에서 PL/R (Procudural Language R Extension) 분석을 위해서는 (1) PL/R 사용자 정의 함수 정의 (define PL/R UDF), (2) array aggregation 하여 데이터 준비 (Preparation of data by array aggregation), (3) PL/R 사용자 정의 함수를 호출하여 실행 (execute PL/R UDF) 의 순서로 진행이 됩니다. 


이번 포스팅에서는 Greenplum 혹은 PostgreSQL DB에서 PL/R (Procedural Language R)을 사용해서 In-DB analytics 를 하기 위해서 array 형태로 데이터를 준비하는 3가지 방법을 소개하겠습니다. 


1. 열(column)을 기준으로 여러개의 행(row)을 그룹별로 array aggregation

2. 행과 열을 기준으로 그룹별로 2D (2-dimensional) array aggregation 

3. 문자열로 string aggregation 하고 PL/R 코드 안에서 R로 데이터셋 변환하기



[ Workflow and Output Image of PL/R on Greenplum, PostgreSQL DB ]




예제로 사용할 PL/R 분석은 'a'와 'b' 두 개의 그룹 별로 x1, x2 두 숫자형 변수 간의 상관계수(correlation coefficients)를 계산하는 업무입니다. 


예제로 사용할 간단한 테이블을 먼저 만들어보겠습니다. 



--create schema and sample table

create schema test;


drop table if exists test.tbl;

create table test.tbl (

grp text not null

, x1 int

, x2 int

);


insert into test.tbl (grp, x1, x2) values 

('a', 1, 2)

, ('a', 2, 5)

, ('a', 3, 4)

, ('b', 1, 8)

, ('b', 2, 7)

, ('b', 3, 3);



select * from test.tbl;

 






  1. 열(column)을 기준으로 여러개의 행(row)을 그룹별로 array aggregation


첫번째는 SQL 의 array_agg() 함수를 사용해서 그룹('grp') 별로 x1, x2 각 칼럼을 기준으로 여러개의 행을 array 형태로 aggregation 하는 방법입니다. 칼럼 기준으로 array aggregation 을 하기 때문에 PL/R 사용자 정의 함수 안에서 각 칼럼을 인자로 받아서 정의하기에 직관적으로 이해하기 쉽고 사용이 편리한 장점이 있습니다. 또 각 칼럼 별로 데이터 유형 (data type)이 서로 다를 경우 (가령, 칼럼이 텍스트, 정수형, 부동소수형 등으로 서로 다른 경우) 각 칼럼 별로 array aggregation 을 하기 때문에 각자 데이터 유형에 맞추어서 해주면 되는 점도 편리합니다. 


다만, 칼럼의 개수가 많을 경우에는 일일이 array aggregation 하고, 또 PL/R 사용자 정의 함수 안에서 이들 칼럼을 다시 인자로 받아서 data frame 으로 만들거나 할 때 손이 많이 가서 번거로울 수 있습니다. 그리고, 그룹 별로 array aggregation 을 했을 때 만약 데이터 크기가 크다면 PL/R을 실행할 때 데이터 I/O 에 다소 시간이 소요될 수 있습니다. 



-----------------------------------------------

-- (1) data preparation : array_agg() by column

-----------------------------------------------

drop table if exists test.array_agg_by_col;

create table test.array_agg_by_col as (

select 

grp

, array_agg(x1) as x1_agg

, array_agg(x2) as x2_agg

from test.tbl 

group by grp

) distributed by (grp);


select * from test.array_agg_by_col order by grp;





-- define PL/R UDF

drop function if exists test.plr_cor(int[], int[]);

create or replace function test.plr_cor(x1 int[], x2 int[]) 

returns float8

as

$$ 

corr_coef <- cor(x1, x2)

return (corr_coef)

$$ language 'plr';




-- execute PL/R UDF

select 

grp

, test.plr_cor(x1_agg, x2_agg) as corr_coef

from test.array_agg_by_col;







  2. 행과 열을 기준으로 그룹별로 2D (2-dimensional) array aggregation  


두번째 방법은 Apache MADlib 의 madlib.matrix_agg() 함수를 사용해서 2차원 배열의 행렬을 만드는 것입니다. 만약 칼럼별 데이터 유형이 모두 숫자형이고 또 칼럼의 개수가 많아서(가령, 수십~수백개) 일일이 array_agg() 를 하기가 번거롭다면 madlib.matrix_agg() 함수를 사용하는 것이 상대적으로 2D array aggregation 하기도 쉽고 또 PL/R 사용자 정의 함수 안에서 데이터 변환을 해서 이용하기도 편리합니다. 


반면에, 만약 각 칼럼 별 데이터 유형이 서로 다르고 숫자형이 아닌 텍스트 등이 들어있다면 사용할 수가 없습니다


MADlib 의 함수를 사용하는 것이므로 Greenplum DB에 MADlib을 미리 설치해두어야 합니다. 

* Apache MADlib : https://madlib.apache.org/



------------------------------------------------------

-- (2) data preparation : 2D array MADlib matrix_agg()

--     : only with same data types

------------------------------------------------------


drop table if exists test.tbl_matrix_agg;

create table test.tbl_matrix_agg as (

select 

grp

, madlib.matrix_agg(array[x1, x2]) as mat_agg

from test.tbl 

group by grp

) distributed by (grp);


select * from test.tbl_matrix_agg;




-- define PL/R UDF

drop function if exists test.plr_cor_2(float8[]);

create or replace function test.plr_cor_2(mat_agg float8[]) 

returns float8

as

$$

df <- data.frame(mat_agg)

colnames(df) <- c("x1", "x2")

corr_coef <- with(df, cor(x1, x2))

return (corr_coef)

$$ language 'plr';




-- execute PL/R UDF

select 

grp

, test.plr_cor_2(mat_agg) as corr_coef

from test.tbl_matrix_agg;





 

  3. 문자열로 string aggregation 한 후 PL/R 코드 안에서 R 로 데이터셋 변환하기


PL/R 함수에 input으로 들어갈 데이터를 준비하는 세번째 방법은 데이터를 텍스트로 변환해서 SQL의 string_agg() 함수를 사용하여 구분자(delimiter, 가령 ',' 나 '|' 등)를 값 사이에 추가하여 그룹별로 aggregation 하는 것입니다. 


string aggregation을 사용하면 다양한 데이터 유형 (가령, 텍스트, 정수, 부동소수형 등)이 섞여 있는 다수의 칼럼을 그룹 별 & 행(row) 별로 aggregation 할 수 있고, 또 array aggregation 대비 상대적으로 데이터 크기를 줄여서 PL/R 실행 시 데이터 I/O 시간을 다소 줄일 수 있는 장점이 있습니다. 


반면에, PL/R 함수 안에서 R로 string aggregation 되어 있는 데이터 덩어리를 구분자(delimiter)를 기준으로 분리(split) 하고 transpose 해서 R에서 분석하기에 적합한 형태로 데이터 전처리를 해주어야 하는 번거로움이 있습니다.  아래에 (3-1) base 패키지의 strsplit() 함수를 이용한 전처리와, (3-2) data.table 패키지의 tstrsplit() 함수를 이용한 전처리로 나누어서 각각 예시를 들어보았습니다. 


PL/R 함수를 SQL editor 에서 짜면서 디버깅을 하려면 input, return type 을 정의해주면서 해야하기 때문에 무척 고달플 수 있습니다. 따라서 제일 빠르고 또 정확한 방법은 RStudio 같은 R 전용 IDE에서 샘플 데이터로 R code 에 에러, 버그가 없도록 clean R codes block 을 작성한 후에, 이를 PL/R 코드의 $$ R codes block $$ 안에 추가하는 방식입니다. 노파심에 다시 한번 말씀드리자면, DB 에서 PL/R 코드 돌려가면서 디버깅 하는 것은 고통스러울 수 있으니 R codes 가 정확하게 작동하는 bug-free codes 인지 먼저 명확하게 확인한 후에 PL/R 코드의 $$ ~ $$ 사이에 넣고 실행하기 바랍니다. 



3-1. base 패키지의 strsplit() 함수를 이용하여 텍스트 파싱
     (text parsing using base package's strsplit() function)


base 패키지 안의 strsplit() 함수를 사용해서 텍스트를 구분자(delimiter)를 기준으로 분리(split) 하고, 이를 do.call 로 "cbind" 함수를 여러번 호출해서 세로로 묶어서 데이터 프레임을 만드는 방식입니다. 아래의 예시처럼 코드가 좀 복잡하고 어렵게 보일 수 있습니다. ㅜ_ㅜ


DB에서 SQL로 string_agg() 함수를 사용하려면 대상이 되는 칼럼을 text로 데이터 유형 변환 (type casting)을 먼저 해주어야 합니다. (아래 예시에서는 integer 형태인 x1, x2 를 x1::text, x2::text 를 사용해 text 형태로 변환 후 string_agg() 적용함)


R strsplit() 함수의 구분자는 DB에서 string_agg() 함수로 aggregation 할 때 사용했던 구분자로 설정해줍니다. (아래 예시에서는 구분자로 수직막대기 '|' 를 사용하였음)


*  Base R 패키지의 문자열 처리 함수 참고 : https://rfriend.tistory.com/37



--------------------------------------------------

-- (3-a) data preparation : string_agg() by column

--------------------------------------------------

drop table if exists test.string_agg_by_col;

create table test.string_agg_by_col as (

select 

grp

, string_agg(x1::text, '|') as x1_str_agg

, string_agg(x2::text, '|') as x2_str_agg

from test.tbl 

group by grp

) distributed by (grp);


select * from test.string_agg_by_col order by grp;



-- define PL/R UDF : (3-a) using DataFrame

drop function if exists test.plr_cor_3(text, text);

create or replace function test.plr_cor_3(x1 text, x2 text) 

returns float8

as

$$ 

# make a temp DataFrame

df_tmp <- data.frame(x1, x2)

# split by delimiter and reshape it in a long format

split_func <- function(x){

options(stringsAsFactors = FALSE) # Not to read strings as factors

df_split <- as.data.frame(

do.call('cbind'

, strsplit(as.character(x)

                                     set delimiter with yours

    , split="|"

    , fixed=T)))

return (df_split)

}

df <- data.frame(lapply(df_tmp, split_func))

colnames(df) <- c("x1", "x2") # set column names

# convert a data type from text to numeric

df <- data.frame(sapply(df, as.numeric))

# calculate correlation coefficients

corr_coef <- with(df, cor(x1, x2))

return (corr_coef)

$$ language 'plr';



-- execute PL/R UDF

select 

grp

, test.plr_cor_3(x1_str_agg, x2_str_agg) as corr_coef

from test.string_agg_by_col

order by grp asc;






3-2. data.table 패키지의 tstrsplit() 함수를 이용하여 텍스트 파싱 

      (text parsing using data.table package's tstrsplit() function)


data.table 패키지의 tstrsplit() 함수는 strsplit() 함수와 transpose 를 하나로 합쳐놓은 역할을 하는 함수로서, 위의 base 패키지를 사용한 파싱 대비 상대적으로 간편하고 깔끔하며 또 빠릅니다. 


data.table 패키지 안의 tstrsplit() 함수를 사용한다고 했으므로 사전에 Greenplum, PostgreSQL DB에 R data.table 패키지를 설치해두어야 합니다(Greenplum의 경우 각 segment node에 모두 설치 필요). 그리고 PL/R 함수 안에서는 library(data.table) 로 패키지를 로딩해주어야 합니다. 


R tstrsplit() 함수의 구분자는 DB에서 string_agg() 함수로 aggregation 할 때 사용했던 구분자로 설정해줍니다. (아래 예시에서는 구분자로 수직막대기 '|' 를 사용하였음)


* R data.table package's tstrsplit() function : https://www.rdocumentation.org/packages/data.table/versions/1.12.8/topics/tstrsplit 



--------------------------------------------------

-- (3-b) data preparation : string_agg() by column

--------------------------------------------------

drop table if exists test.string_agg_by_col;

create table test.string_agg_by_col as (

select 

grp

string_agg(x1::text'|'as x1_str_agg

string_agg(x2::text'|'as x2_str_agg

from test.tbl 

group by grp

) distributed by (grp);


select * from test.string_agg_by_col order by grp;




-- define PL/R UDF : (3-b) using data.table tstrsplit() function

drop function if exists test.plr_cor_4(text, text);

create or replace function test.plr_cor_4(x1 text, x2 text) 

returns float8

as

$$ 

library(data.table)

# make a temp DataTable

dt_tmp <- data.table(x1, x2)

# split by delimiter and reshape it in a long format

dt_split_func <- function(x){

dt_split <- data.table(tstrsplit(x, split="|", fixed=T))

return(dt_split)

}

df <- data.frame(lapply(dt_tmp, dt_split_func))

colnames(df) <- c("x1", "x2") # set column names

# convert a data type from text to numeric

df <- data.frame(sapply(df, as.numeric))

# calculate correlation coefficients

corr_coef <- with(df, cor(x1, x2))

return (corr_coef)

$$ language 'plr';



-- execute PL/R UDF

select 

grp

, test.plr_cor_4(x1_str_agg, x2_str_agg) as corr_coef

from test.string_agg_by_col;





많은 도움이 되었기를 바랍니다. 

이번 포스팅이 도움이 되었다면 아래의 '공감~'를 꾹 눌러주세요. :-)



728x90
반응형
Posted by Rfriend
,