이번 포스팅에서는 PostgreSQL, Greenplum database에서 Array의 특정 위치의 값을 가져오는 방법, 특히 PL/Python 사용자 정의 함수를 이용해서 Array의 여러개 위치에서 값을 가져오는 방법을 소개하겠습니다. 

 

PostgreSQL, Greenplum database 에서 

(1) Array 에서 특정 위치의 하나의 원소 값 가져오기

(2) Array 에서 시작~끝 위치의 원소 값을 Slicing 해서 가져오기

(3) Array 에서 여러개 위치의 원소 값을 PL/Python 사용자 정의함수를 사용해서 가져오기

 

 

How to get multiple elements in an Array using PL/Python on PostgreSQL, Greenplum database

 

 

 

먼저 Array를 포함하는 예제로 사용할 간단할 테이블을 만들어보겠습니다. 

 

-- creating a sample table with array column
DROP TABLE IF EXISTS arr_sample_tbl;
CREATE TABLE arr_sample_tbl (
	grp TEXT
	, x_arr iNTEGER[]
);

INSERT INTO arr_sample_tbl VALUES 
('a',  ARRAY[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
, ('b', ARRAY[11, 12, 13, 14, 15, 16, 17, 18, 19, 20])
, ('c', ARRAY[21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
, ('d', ARRAY[31, 32, 33, 34, 35, 36, 37, 38, 39, 40])
;

SELECT * FROM arr_sample_tbl ORDER BY grp;
--grp|x_arr                          |
-----+-------------------------------+
--a  |{1,2,3,4,5,6,7,8,9,10}         |
--b  |{11,12,13,14,15,16,17,18,19,20}|
--c  |{21,22,23,24,25,26,27,28,29,30}|
--d  |{31,32,33,34,35,36,37,38,39,40}|

 

 

 

(1) Array 에서 특정 위치의 하나의 원소 값 가져오기

 

PostgreSQL/ Greenplum Array 칼럼에 array_column[position::int] 처럼 [ ] 안에 위치 값을 하나의 정수로 넣어주면 됩니다. 아래 예에서는 x_arr Array 칼럼의 3번째 위치한 값을 가져와 봤습니다. 

 

-- getting a value in an array using postion
SELECT 
	grp
	, x_arr[3] -- POSITION INDEX ok
FROM arr_sample_tbl 
ORDER BY grp
;
--grp|x_arr|
-----+-----+
--a  |    3|
--b  |   13|
--c  |   23|
--d  |   33|

 

 

 

(2) Array 에서 시작~끝 위치의 원소 값을 Slicing 해서 가져오기

 

PostgreSQL/ Greenplum Array 칼럼에서 array_column[start_position:end_position] 구문으로 "시작 위치 ~ 끝 위치" 까지의 연속된 원소 값들을 Slicing 해올 수 있습니다. 아래 예제에서는 x_arr 의 Array 칼럼에서 1번째부터 3번째 위치까지의 연속된 원소값들을 Slicing 해왔습니다. 

 

-- slicing from start position to end position
SELECT 
	grp
	, x_arr[1:3] -- SLICING ok
FROM arr_sample_tbl 
ORDER BY grp
;
--grp|x_arr     |
-----+----------+
--a  |{1,2,3}   |
--b  |{11,12,13}|
--c  |{21,22,23}|
--d  |{31,32,33}|

 

 

 

(3) Array 에서 여러개 위치의 원소 값을 PL/Python 사용자 정의함수를 사용해서 가져오기

 

위의 (2)번에서는 Array에서 연속된 (시작 위치 ~ 끝 위치) 까지의 값을 Slicing 해왔는데요, 만약 연속된 위치의 여러개 값이 아니라 Array 안에 띄엄띄엄 있는 여러개의 원소 값들을 위치로 가져오고 싶을 때는 SQL syntax error 가 발생합니다. 

 

-- SQL Error [42601]: ERROR: syntax error at or near ","
SELECT 
	grp
	, x_arr[1, 4, 6, 9]  -- SQL Error -_-;;;
FROM arr_sample_tbl 
ORDER BY grp
;
--SQL Error [42601]: ERROR: syntax error at or near ","
--  Position: 24
--
--Error position: line: 528 pos: 23

 

 

PostgreSQL, Greenplum DB에서 Array 내에 띄엄띄엄 있는 여러개 위치의 원소값을 가져오고 싶을 때 아래의 PL/Python 사용자 정의함수를 사용해보세요. 참고로, Input으로 집어넣는 DB col_position 위치 값은 1부터 시작하는 반면, PL/Python의 내부 코드블록에서는 Python array의 위치 index가 0 부터 시작하기 때문에 np.array(col_postion) - 1 처럼 1을 빼줬습니다. 

아래 예제에서는 x_arr 칼럼의 1, 4, 6, 9 번째 위치에 있는 Array 복수개 원소들을 복수의 위치값을 사용해서 가져왔습니다. Greenplum 을 사용하면 분산병렬처리가 되기 때문에 PL/Python 사용자 정의함수의 처리 속도도 무척 빠르답니다! 

 

-- UDF: selecting multiple elements using PL/Python in PostgreSQL, Greenplum
DROP FUNCTION IF EXISTS plpy_arr_multi_idx_select(int[], int[]);
CREATE OR REPLACE FUNCTION plpy_arr_multi_idx_select(
          x_arr int[], col_position int[]
          ) 
RETURNS int[]
AS $$
    import numpy as np
    
    # INPUT ARRAY
    arr = np.array(x_arr)
    
    # COLUMN POSITION 
    # Python starts from 0 index. (vs. SQL starts from 1)
    # , so -1 from col_position
    col_position_arr = np.array(col_position) - 1
    
    # getting data by positional indexing
    arr_selected = arr[col_position_arr]
    
    return arr_selected

$$ LANGUAGE 'plpythonu';


-- executing the PL/Python UDF above
-- getting multiple values in an array 
-- using PL/Python UDF in PostgreSQL, Greenplum DB
SELECT 
	grp
	, plpy_arr_multi_idx_select( -- PL/Python USER DEFINED Funtion
		x_arr                -- INPUT ARRAY
		, ARRAY[1, 4, 6, 9]  -- COLUMN POSITION
	) AS selected_values
FROM arr_sample_tbl 
ORDER BY grp
;

--grp|selected_values|
-----+---------------+
--a  |{1,4,6,9}      |
--b  |{11,14,16,19}  |
--c  |{21,24,26,29}  |
--d  |{31,34,36,39}  |

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum Database에서 문자열에 대한 함수 3가지를 소개하겠습니다. 

 

(1) STRING_AGG() : 개별 문자열을 그룹별로 하나의 문자열로 합치기

     (concatenate individual strings into a string by group) 

(2) STRING_TO_ARRAY() : 문자열을 구분자를 기준으로 나누어서 여러개의 원소를 가진 Array 로 만들기

      (splits a string into array elements using supplied delimiter)

(3) UNNEST(STRING_TO_ARRAY()) : 문자열 원소를 가진 ARRAY를 나누어서 개별 문자열 행으로 풀기

       (the opposite of STRING_AGG(), splits array elements and unnest it into multiple rows)

 

 

concatenating strings into a string, unnest string array into multiple rows in PostgreSQL, Greenplum

 

 

예제로 사용할 country 테이블을 만들어보겠습니다. 

 

---------------------------------------------------------------------------------------------
-- String functions and operators in PostgreSQL, Greenplum Database
-- string_agg(), string_to_array(), unnest(string_to_array())
----------------------------------------------------------------------------------------------

DROP TABLE IF EXISTS country;
CREATE TABLE country (
	continent  TEXT
	, country TEXT 
);
INSERT INTO country VALUES 
('Asia', 'Korea')
, ('Asia', 'China')
, ('Asia', 'Japan')
, ('Ameria', 'USA')
, ('Ameria', 'Canada')
, ('Ameria', 'Mexico')
, ('Europe', 'UK')
, ('Europe', 'Fance')
, ('Europe', 'Germany');


SELECT * FROM country ORDER BY 1, 2;
--continent|country|
-----------+-------+
--Ameria   |Canada |
--Ameria   |Mexico |
--Ameria   |USA    |
--Asia     |China  |
--Asia     |Japan  |
--Asia     |Korea  |
--Europe   |Fance  |
--Europe   |Germany|
--Europe   |UK     |

 

 

 

(1) STRING_AGG() : 개별 문자열을 그룹별로 하나의 문자열로 합치기

     (concatenate individual strings into a string by group) 

 

-- (1) string_agg()
-- : non-null input values concatenated into a string, separated by delimiter
-- syntax: string_agg(expression, delimiter [order by])
DROP TABLE IF EXISTS country_agg_tbl;
CREATE TABLE country_agg_tbl AS (
	SELECT 
		continent
		, STRING_AGG(country, ',') AS country_agg
	FROM country 
	GROUP BY continent
); 

SELECT * FROM country_agg_tbl ORDER BY 1, 2;
--continent|country_agg      |
-----------+-----------------+
--Ameria   |USA,Canada,Mexico|
--Asia     |Korea,China,Japan|
--Europe   |UK,Fance,Germany |

 

 

 

(2) STRING_TO_ARRAY() : 문자열을 구분자를 기준으로 나누어서 여러개의 원소를 가진 Array 로 만들기

      (splits a string into array elements using supplied delimiter)

 

-- (2) string_to_array()
-- : splits string into array elements using supplied delimiter and optional null string
-- syntax: string_to_array(text, text [, text])


SELECT 
	continent
	, STRING_TO_ARRAY(country_agg, ',') AS country_array
FROM country_agg_tbl
ORDER BY 1;

--continent|country_array      |
-----------+-------------------+
--Ameria   |{USA,Canada,Mexico}|
--Asia     |{Korea,China,Japan}|
--Europe   |{UK,Fance,Germany} |

 

 

옵션으로 세번째 매개변수 위치에 "NULL 처리할 문자열"을 지정할 수 있습니다. 아래 예에서는 'USA' 문자열에 대해서 NULL 처리하였습니다. 

 

-- NULL string optional
SELECT 
	continent
	, STRING_TO_ARRAY(
		country_agg -- string
		, ','       -- delimiter
		, 'USA' -- NULL string
		) AS country_array
FROM country_agg_tbl
ORDER BY 1;

--continent|country_array       |
-----------+--------------------+
--Ameria   |{NULL,Canada,Mexico}|   -- 'USA' --> NULL
--Asia     |{Korea,China,Japan} |
--Europe   |{UK,Fance,Germany}  |

 

 

 

(3) UNNEST(STRING_TO_ARRAY()) : 문자열 원소를 가진 ARRAY를 나누어서 개별 문자열 행으로 풀기

       (the opposite of STRING_AGG(), splits array elements and unnest it into multiple rows)

 

-- (3) unnest(string_to_array())
-- splits array elements and unnest it into multiple rows
-- the opposite of string_agg()

SELECT 
	continent
	, UNNEST(STRING_TO_ARRAY(country_agg, ',')) AS country
FROM country_agg_tbl
ORDER BY 1, 2;

--continent|country|
-----------+-------+
--Ameria   |Canada |
--Ameria   |Mexico |
--Ameria   |USA    |
--Asia     |China  |
--Asia     |Japan  |
--Asia     |Korea  |
--Europe   |Fance  |
--Europe   |Germany|
--Europe   |UK     |

 

[Reference]

* PostgreSQL's STRING_AGG() function
   : https://www.postgresql.org/docs/9.4/functions-aggregate.html 

* PostgreSQL's STRING_TO_ARRAY() function
   : https://www.postgresql.org/docs/9.1/functions-array.html

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 Python의 popen() 메소드PL/Python 사용자 정의 함수를 사용하여 

 

(1) Python 의 popen() 메소드 소개

(2) Greenplum DB에 설치된 Package 이름 리스트 확인

(3) Greenplum DB에 설치된 Docker Container Image 리스트 확인

(4) Greenplum DB에 등록된 PL/Container Runtime ID 확인

 

하는 방법을 소개하겠습니다. 

 

 

(1) Python 의 popen() 메소드 소개

 

Python 메서드 popen() 은 시스템 명령어(command) 실행 결과를 가져올 때 사용합니다. popen() 메소드의 반환 값은 파이프에 연결된 열린 파일 객체이며, 모드가 'r'(기본값) 이면 읽기모드이고, 'w' 이면 쓰기모드입니다. Buffering 크기도 정수로 설정할 수 있습니다. 

 

python os.popen() method

 

 

아래 예시는 Python의 os.popen() 메소드를 사용해 'ifconfig' command 를 실행하여 IP 주소를 확인해본 것입니다. 

 

import os

ifconfig = os.popen('ifconfig')
ifconfig_info = ifconfig.read()
ifconfig.close()

print(ifconfig_info)

# lo0: flags=xxxx<UP,LOOPBACK,...
# 	options=xxxx<RXCSUM,TXCSUM,...
# 	inet xxxxxxx netmask 0xff000000 
# 	inet6 ::xx prefixlen xxx 
# 	inet6 fe80::xxxx prefixlen xx scopeid xx 
# 	nd6 options=xxx<PERFORMNUD,DAD>
# gif0: flags=xxx<POINTOPOINT,MULTICAST> mtu 1280
# stf0: flags=0<> mtu xxxx
# en5: flags=xxx<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
# 	ether ac:de:48:00:11:22 
# 	inet6 fe80::aede:48ff:fe00:1122%en5 prefixlen xx scopeid 0x4 
# 	nd6 options=xxx<PERFORMNUD,DAD>
# 	media: autoselect (100baseTX <full-duplex>)
# 	status: active
# ap1: flags=xxxx<BROADCAST,SIMPLEX,MULTICAST> mtu xxxx
# 	options=xxx<CHANNEL_IO>
# 	ether aa:66:5a:20:77:d2 
# 	media: autoselect
# ...

 

 

 

(2) Greenplum DB에 설치된 Package 이름 리스트 확인

 

아래 코드는 os.popen() 메소드로 'gppkg -q --all' 시스템 명령어(command)를 실행해서 Greenplum DB 에 설치된 Package 이름 리스트를 확인하는 PL/Python 사용자 정의 함수입니다.  split() 후에 7번째 위치 이후의 값만 가져오면 됩니다. 

 

--- UDF for getting all package's list on Greenplum DB
DROP FUNCTION IF EXISTS plpy_gppkg_info_func();
CREATE OR REPLACE FUNCTION plpy_gppkg_info_func() 
RETURNS TABLE (LIST text) 
AS $$

	import os
	
	process = os.popen('gppkg -q --all')
	processed = process.read()
	process.close()
	
	# get only packages names 
	result = processed.split()[7:]
	
	return result

$$ LANGUAGE 'plpythonu';


-- Run the UDF above
SELECT plpy_gppkg_info_func() AS gppkg_info;
--gppkg_info                               |
------------------------------------------+
--DataSciencePython-2.0.5                 |
--DataScienceR-2.0.2                      |
--plr-3.0.4                               |
--plcontainer-2.1.5                       |
--pivotal_greenplum_backup_restore-1.25.0 |
--madlib-1.20.0_1                         |

 

 

 

 

(3) Greenplum DB에 설치된 Docker Container Image 리스트 확인

 

아래 코드는 os.popen() 메소드로 'plcontainer image-list' 시스템 명령어(command)를 실행해서 Greenplum DB 에 설치된 Docker Container Image 이름 리스트를 확인하는 PL/Python 사용자 정의 함수입니다.  줄바꿈 '\n' 구분자로 split() 하여서 결과값을 반환받았습니다. 

 

---- UDF for  displaying the installed Docker images on the local host use
DROP FUNCTION IF EXISTS plpy_container_imagelist_func();
CREATE OR REPLACE FUNCTION plpy_container_imagelist_func() 
RETURNS TABLE (LIST text) 
AS $$

	import os
	
	processed = os.popen('plcontainer image-list').read().split('\n')
	
	return processed

$$ LANGUAGE 'plpythonu';


-- Run the UDF above
SELECT plpy_container_imagelist_func() AS container_image_info;
--container_image_info                                                                    |
------------------------------------------------------------------------------------------+
--REPOSITORY                               TAG       IMAGE ID       CREATED         SIZE  |
--pivotaldata/plcontainer_python3_shared   devel     e4cda7f63158   20 months ago   6.71GB|
--pivotaldata/plcontainer_r_shared         devel     3a2472dec133   3 years ago     1.19GB|
--pivotaldata/plcontainer_python_shared    devel     c94546182146   3 years ago     2.34GB|
--                                                                                        |

 

 

(4) Greenplum DB에 등록된 PL/Container Runtime ID 확인

 

아래 코드는 os.popen() 메소드로 'plcontainer runtime-show' 시스템 명령어(command)를 실행해서 Greenplum DB 에 등록된 PL/Container Runtime ID 리스트를 확인하는 PL/Python 사용자 정의 함수입니다.  

'plcontainer runtime-add -r plc_python3_shared -i' 이런식으로 Docker Container Image 정보를 PL/Container configuration file 에 추가 등록해준 정보를 'plcontainer runtime-show' 로 확인할 수 있습니다. 

 

----- UDF for listing the names of the runtimes you created and added to the PL/Container XML file.
DROP FUNCTION IF EXISTS plpy_container_runtimeshow_func();
CREATE OR REPLACE FUNCTION plpy_container_runtimeshow_func() 
RETURNS TABLE (LIST text) 
AS $$

	import os
	
	processed = os.popen('plcontainer runtime-show').read().split('\n')
	
	return processed

$$ LANGUAGE 'plpythonu';


-- Run the UDF above
SELECT plpy_container_runtimeshow_func() AS container_runtimeshow_info;

--PL/Container Runtime Configuration: 
-----------------------------------------------------------
--  Runtime ID: plc_python3_shared
--  Linked Docker Image: pivotaldata/plcontainer_python3_shared:devel
--  Runtime Setting(s): 
--  Shared Directory: 
--  ---- Shared Directory From HOST '/usr/local/greenplum-db-6.22.1/bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
-----------------------------------------------------------
--
-----------------------------------------------------------
--  Runtime ID: plc_r_shared
--  Linked Docker Image: pivotaldata/plcontainer_r_shared:devel
--  Runtime Setting(s): 
--  Shared Directory: 
--  ---- Shared Directory From HOST '/usr/local/greenplum-db-6.22.1/bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
-----------------------------------------------------------
--
-----------------------------------------------------------
--  Runtime ID: plc_python_shared
--  Linked Docker Image: pivotaldata/plcontainer_python_shared:devel
--  Runtime Setting(s): 
--  Shared Directory: 
--  ---- Shared Directory From HOST '/usr/local/greenplum-db-6.22.1/bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
-----------------------------------------------------------

 

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 SQL Query 의 실행 순서 (SQL Query order of execution) 에 대해서 알아보겠습니다. 

 

 

1. 왜 SQL Query 실행 순서를 알아야 하는가? (Why does SQL Query Order matter?)

SQL Query 처리 순서는 Query 구문이 평가되는 순서를 정의합니다. Query 의 각 부분은 순차적으로(sequentially) 실행이 되므로, 쿼리의 실행 순서를 이해해야지만 어느 부분에서 무슨 결과에 접근할 수 있는지를 알 수 있습니다. SQL Query 순서를 정확하게 이해하는 것은 자주 접하게 되는 Query 의 도전사항, 혹은 실행되지 않는 Query의 문제를 해결하는데 필수적입니다. 그리고 SQL Query 의 처리성능을 최적화하고 속도를 향상시키는데에도 Query 실행순서를 이해하는게 큰 기여를 할 수 있습니다.  

 

 

 

2. SQL Query 실행 순서 (SQL Query order of execution)

 

아래는 일반적인 SQL Query 의 구문입니다. 

 

--------------------
-- SQL Query Syntax
--------------------
SELECT DISTINCT column, AGG_FUNC(column_or_expression), …
FROM mytable
    JOIN another_table
      ON mytable.column = another_table.column
    WHERE constraint_expression
    GROUP BY column
    HAVING constraint_expression
    ORDER BY column ASC/DESC
    LIMIT count OFFSET COUNT;

 

 

SQL Query 의 실행 순서는 아래와 같습니다. 

 

순서 구문 기능
1   FROM + JOIN 대상이 되는 소스 테이블에서 데이터를 선택하고 Join 함.
2   WHERE 1번에서 선택되고 Join 된 데이터를 Where 조건절에 맞게 필터링함. 
3   GROUP BY Group by 절의 기준 별로 데이터가 집계(aggregate)됨. 
4   HAVING 집계된 데이터를 Having 조건절에 맞게 필터링함. 
5   SELECT 최종 데이터를 반환함. 
6   ORDER BY 최종 데이터를 Order by 절의 변수를 기준으로 정렬(sorting)함. 
7   LIMIT/ OFFSET 행의 개수만큼 반환되는 데이터의 수를 제한(limit)함. 

 

 

아래는 이미지는 SQL Query 처리 순서(by Brij Kishore Pandey, LinkedIn/Twitter @brijpandeyji)를 도식화한 것인데요, 직관적으로 알기 쉽게 풀어놓아서 인용해봅니다. 

 

SQL Queries run in this order, by Brij Kishore Pandey

 

 

(순서 1) FROM + JOIN 절

 

 SQL 의 FROM +JOIN 절은 Query 에서 제일 먼저 실행되는 부분입니다. 따라서 메모리를 많이 사용하는 두 개의 큰 테이블을 Join 하기 전에 대상 테이블의 크기를 제한하거나 미리 집계(pre-agregate tables)를 해놓는다면 성능의 향상을 기대할 수 있습니다.

많은 SQL planner 들은 다른 Query 들의 최적화를 돕기 위해 로직과 다른 유형의 Join을 사용합니다.  가령, 아래의 SQL Query 의 경우, SQL Planner 는 where 절에 있는 조건절인 (table_a 에서 '30 <= age < 40' 로 필터) 를 먼저 실행하고, 그 다음에 Join 을 하는 것이 성능에 유리하고, 결과는 표준 SQL 처리 순서와 동일하게 반환하게 됩니다. 

 

select
 count(*)
from
 table_a as a
join
 table_b as b
on
 a.id =  b.id
where
 a.age >= 30 and a.age < 40

 

PostgreSQL, Greenplum database 에서는 'EXPLAIN' 절을 사용해서 Query Plan 을 볼 수 있습니다. 

 

 

 

(순서 2) WHERE 절

 

WHERE 절은 테이블의 칼럼 값으로 선택되고 Join 된 테이블 데이터를 제한하는데 사용됩니다. Where 절의 조건을 만족하지 않는 개별 행은 제거됩니다. Where 절에는 숫자형, 문자형, 날짜형, 블리언 등 어떤 데이터 유형도 사용가능합니다. 

단, 대부분의 DB에서 Alias 는 사용할 수 없습니다. 

 

 

 

(순서 3) GROUP BY 절

 

GROUP BY 절은 데이터의 각 개별 값들을 GROUP BY 'X' 의 각 그룹별로 sum(), count(), average(), min(), max() 등과 같은 집계 함수(aggregation functions)의 계산된 하나의 결과 값으로 요약해줍니다. 

 

 

 

(순서 4) HAVING 절

 

SQL Query 에 GROUP BY 절이 있다면, HAVING 절의 제약 조건이 그룹별로 집계된 행에 적용되어서, HAVING 절의 조건을 만족시키지 못하는 그룹별 집계된 행(grouped rows) 를 제거하게 됩니다. WHERE 절처럼, 대부분의 DB에서 HAVING 절은 Alias 를 사용할 수 없습니다. 

 

 

(순서 5) SELECT 절 

 

위의 순서 1, 2, 3, 4 를 처리한 후의 데이터에 대해서 드디어 SELECT 절을 실행하여 칼럼 값을 계산하고 가져옵니다. 

 

 

 

(순서 6) ORDER BY 절 

 

위의 순서 1, 2, 3, 4, 5 번이 차례대로 실행된 결과의 데이터에 대해서 ORDER BY 절의 칼럼을 기준으로 오름차순(Ascending order) 또는 내림차순(Descending order) 으로 정렬을 해줍니다.

Query의 SELECT 절이 연산이 끝난 상태이므로, ORDER BY 절에서는 Alias 를 참조할 수 있습니다.   

 

 

 

(순서 7) LIMIT / OFFSET 절

 

마지막으로, LIMIT 과 OFFSET 절을 사용해서 최종으로 반환되는 결과 값에서 행의 개수를 제한합니다. 

 

  * LIMIT number: 숫자 만큼의 행 출력

  * OFFSET number: 숫자의 행부터 출력

 

예) select * from mytable LIMIT 10 OFFSET 5;

      ==> mytable 의 "5번째 행부터 (OFFSET 5)", "10개의 행을 출력(LIMIT 10)"

 

 

 

[Reference]

[1] SQL Query Order of Execution, by Sisense Team
: https://www.sisense.com/blog/sql-query-order-of-operations/

[2] SQL Lesson 12: Order of execution of a Query
: https://sqlbolt.com/lesson/select_queries_order_of_execution

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요~! :-)

 

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum에서 차원이 다른 Array를 2D Array로 Aggregation 할 때, 그룹별 Array의 최대 길이를 계산해서 동적으로 최대길이에 모자란 부분만큼 '0'으로 채워서(padding) 차원을 동일하게 맞춘 후에 2D Array로 Aggregation 하는 방법을 소개하겠습니다. 

 

(1) 가상의 시계열 데이터 생성

(2) 2D Array Aggregation 하는 사용자 정의 함수(UDF) 정의

(3) 차원이 다를 경우 2D Array Aggregation Error 발생
      : cannot concatenate incompatible arrays

(4) 그룹별 Array의 차원을 일치시켜서 2D Array Aggregation 하는 방법

 

 

 

 

 

(1) 가상의 시계열 데이터 생성

 

먼저, 예제로 사용할 시계열 데이터로 이루어진 테이블을 만들어보겠습니다.

10의 ID 그룹이 있고, 각 ID별로 20개의 param_id 그룹이 있으며, 각 param_id별로 또 20개의 ts(TimeStamp)별로 측정값(measure_val)이 있는 시계열 데이터셋을 가진 테이블입니다. 

 

이때 무작위로 생성한 난수가 0.5보다 작을 경우는 난수를 측정값으로 사용하고, 그렇지 않을 경우는 결측값(NULL)으로 해서 measure_val 칼럼을 만들어주었습니다. 

 

-- (1) create sample dataset table
DROP TABLE IF EXISTS my_tbl;
CREATE TABLE my_tbl AS (
    SELECT 
        d.*
        -- about 50% NULL, 50% measured values
        , CASE WHEN random() < 0.5 THEN round(random()::numeric, 2) ELSE NULL 
            END AS measure_val 
    FROM (
        SELECT 
            a.*    -- id
            , b.*  -- param_id
            , c.* -- ts (TimeStamp)
        FROM 
            (SELECT generate_series(1, 10) AS id) AS a 
        CROSS JOIN 
            (SELECT generate_series(1, 20) AS param_id) AS b
        CROSS JOIN 
            (SELECT generate_series(1, 20) AS ts) AS c
    ) AS d
);

-- the measure_vals will be different due to random number generation
SELECT * FROM my_tbl ORDER BY 1, 2, 3 LIMIT 10;
--id|param_id|ts|measure_val|
----+--------+--+-----------+
-- 1|       1| 1|       0.93|
-- 1|       1| 2|           |
-- 1|       1| 3|       0.87|
-- 1|       1| 4|       0.41|
-- 1|       1| 5|       0.57|
-- 1|       1| 6|           |
-- 1|       1| 7|       0.60|
-- 1|       1| 8|       0.67|
-- 1|       1| 9|       0.02|
-- 1|       1|10|       0.21|


-- total number of rows: 10 * 20 * 20 = 4,000
SELECT count(1) AS row_cnt FROM my_tbl; 
--row_cnt|
---------+
--   4000|

 

 

 

PostgreSQL 에 있는 array_agg() 함수를 사용해서 id별, param_id별 그룹을 기준으로 param_id, ts, measure_val을 1D array로 aggregation 해보면 아래와 같습니다. 이때 조건절에 "WHERE measure_val IS NOT NULL"을 부여해서 결측값을 제거해주면 각 id, param_id 그룹별로 array 내 원소(element)의 개수가 서로 달라지게 됩니다.  (<-- 이 부분이 나중에 (3)번에서 1D array를 2D array로 aggregation 할 때 1D array의 차원이 달라서 2D array로 묶을 수 없다는 에러를 발생시키게 됩니다.)

 

-- 1D array aggregation using array_agg() function
SELECT 
    id
    , param_id
    , array_agg(param_id) AS param_id_arr
    , array_agg(ts ORDER BY ts) AS ts_arr 
    , array_agg(measure_val ORDER BY ts) AS measure_val_arr
FROM my_tbl
WHERE measure_val IS NOT NULL 
GROUP BY 1, 2
ORDER BY 1, 2
LIMIT 5

 

 

 

 

(2) 2D Array Aggregation 하는 사용자 정의 함수(UDF) 정의

 

PostgreSQL에는 1D array를 2D array로 묶어주는 내장함수가 없으므로 아래와 같이 사용자 정의 함수(User Defined Function)를 만들고, Select 문으로 사용자 정의함수를 호출해서 사용하면 됩니다. 

 

인풋으로 사용하는 데이터 유형별로 각각 사용자 정의함수를 정의해주는데요, 아래는 정수(Integer) 데이터 타입에 대한 1D array를 2D array로 묶어주는 사용자 정의함수입니다. 

 

-- UDF for 2D array aggregation for INTEGER
DROP FUNCTION IF EXISTS array_append_2d_int(integer[][], integer[]) CASCADE;
CREATE OR REPLACE FUNCTION array_append_2d_int(integer[][], integer[])
    RETURNS integer[][]
    LANGUAGE SQL
    AS 'select array_cat($1, ARRAY[$2])'
    IMMUTABLE
;

DROP AGGREGATE IF EXISTS array_agg_array_int(integer[]) CASCADE;
CREATE ORDERED AGGREGATE array_agg_array_int(integer[])
(
    SFUNC = array_append_2d_int
    , STYPE = integer[][]
);

 

 

 

아래는 NUMERIC 데이터 유형을 원소로 가지는 1D array를 2D array 로 묶어주는 사용자 정의 함수입니다. 

 

-- UDF for 2D array aggregation (NUMERIC)
DROP FUNCTION IF EXISTS array_append_2d_numeric(NUMERIC[][], NUMERIC[]) CASCADE;
CREATE OR REPLACE FUNCTION array_append_2d_numeric(NUMERIC[][], NUMERIC[])
    RETURNS NUMERIC[][]
    LANGUAGE SQL
    AS 'select array_cat($1, ARRAY[$2])'
    IMMUTABLE
;

DROP AGGREGATE IF EXISTS array_append_2d_numeric(NUMERIC[]) CASCADE;
CREATE ORDERED AGGREGATE array_append_2d_numeric(NUMERIC[])
(
    SFUNC = array_append_2d_numeric
    , STYPE = NUMERIC[][]
);

 

 

 

 

(3) 차원이 다를 경우 2D Array Aggregation Error 발생
      : cannot concatenate incompatible arrays

 

아래의 SQL Query 처럼 각 Array의 원소(element) 개수가 다를 경우, 이를 2D Array로 합치려고(concatenation) 하면 Array의 차원이 일치하지 않는다는(incompatible) 에러가 발생합니다. 

 

"SQL Error [2202E]: ERROR: cannot concatenate incompatible arrays  (seg4 slice1 10.0.1.238:6004 pid=7073)
  Detail: Arrays with differing element dimensions are not compatible for concatenation.
  Where: SQL function "array_append_2d_numeric" statement 1"

 

--SQL Error [2202E]: ERROR: cannot concatenate incompatible arrays  (seg0 slice1 10.0.1.238:6000 pid=30201)
--  Detail: Arrays with differing element dimensions are not compatible for concatenation.
--  Where: SQL function "array_append_2d_numeric" statement 1

SELECT 
	a.id
	, array_agg_array_int(param_id_arr ORDER BY param_id) AS param_id_arr2d
	, array_agg_array_int(ts_arr ORDER BY param_id) AS ts_arr2d
	, array_agg_array_numeric(measure_val_arr ORDER BY param_id) AS measure_val_arr2d
FROM (
	SELECT 
		id
		, param_id
		, array_agg(param_id) AS param_id_arr
		, array_agg(ts ORDER BY ts) AS ts_arr 
		, array_agg(measure_val ORDER BY ts) AS measure_val_arr
	FROM my_tbl
	WHERE measure_val IS NOT NULL 
	GROUP BY 1, 2
) AS a 
GROUP BY 1;

-- it eraises an ERROR: cannot concatenate incompatible arrays

 

 

 

(4) 그룹별 Array의 차원을 일치시켜서 2D Array Aggregation 하는 방법

 

1D array의 원소 개수가 서로 달라서 2D array 로 묶을 수 없을 경우(ERROR: cannot concatenate incompatible arrays), 아래의 절차를 따라서 각 그룹별 1D array의 차원을 일치시킨 후에 2D array로 묶을 수 있습니다. 

 

(a) 묶고자 하는 기준이 되는 그룹별로 array_agg() 함수를 사용해서 1D array로 묶고, 

(b) 그룹별로 1D array들의 각 원소 개수를 세어서 그룹별로 1D array의 최대 길이 (arr_max_length)를 계산하고, 

(c) 그룹별로 1D array의 최대 길이 모자라는 개수만큼 '0'으로 각 1D Array를 채워서(padding) 차원을 일치시킨 후에, 

(d) 그룹별로 1D array를 2D array로 (2)번에서 정의한 사용자 정의함수를 사용해서 묶어주기

 

-- Dynamic 2D array aggregation for arrays with NULL and different dimentions
DROP TABLE IF EXISTS my_tbl_2d_arr;
CREATE TABLE my_tbl_2d_arr AS (
WITH t AS (
	SELECT 
		id
		, param_id
		, array_agg(param_id ORDER BY param_id, ts) 
            AS param_id_arr
		, array_agg(ts ORDER BY param_id, ts) 
            AS ts_arr
		, array_agg(measure_val ORDER BY param_id, ts) 
            AS measure_val_arr
	FROM 	my_tbl
		WHERE measure_val IS NOT NULL
		GROUP BY 1, 2
), t2 AS (
	SELECT 
		id 
		, max(array_length(measure_val_arr, 1)) 
            AS arr_max_length
	FROM t
	GROUP BY 1
), t_padded AS (
	SELECT 
		t.id
		, t.param_id
		, array_cat(
			t.param_id_arr, 
			array_fill(NULL::INTEGER, ARRAY[arr_max_length 
            	        - COALESCE(array_length(t.param_id_arr, 1), 0)])
			) AS param_id_arr_padded
		, array_cat(
			t.ts_arr, 
			array_fill(NULL::INTEGER, ARRAY[arr_max_length 
            	        - COALESCE(array_length(t.ts_arr, 1), 0)])
			) AS ts_arr_padded
		, array_cat(
			t.measure_val_arr, 
			array_fill(NULL::NUMERIC, ARRAY[arr_max_length 
            	        - COALESCE(array_length(t.measure_val_arr, 1), 0)])
			) AS measure_val_arr_padded
	FROM t, t2
	WHERE t.id = t2.id
) 
SELECT 
	a.id
	, array_agg_array_int(param_id_arr_padded ORDER BY param_id) 
    	AS param_id_arr2d
	, array_agg_array_int(ts_arr_padded ORDER BY param_id) 
    	AS ts_arr2d
	, array_agg_array_numeric(measure_val_arr_padded ORDER BY param_id) 
    	AS measure_val_arr2d
FROM t_padded AS a 
GROUP BY 1
);


SELECT * FROM my_tbl_2d_arr ORDER BY 1 LIMIT 5;

 

 

다음 포스팅에서는 그룹별로 묶어놓은 2D array를 그룹별 1D array 로 unnest 하는 방법(https://rfriend.tistory.com/728)를 참고하세요. 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

시계열 데이터를 분석할 때 제일 처음 확인하고 처리하는 일이 결측값(missing values) 입니다. 이번 포스팅에서는 시계열 데이터의 결측값을 선형 보간(linear interpolation)하는 2가지 방법을 소개하겠습니다. 

 

(1) Python 으로 결측값을 포함하는 예제 시계열 데이터 생성하기

(2) Python 의 for loop 순환문으로 시계열 데이터 결측값을 보간하기

      (interpolation sequentially using Python for loop statement)

(3) Greenplum에서 PL/Python으로 분산병렬처리하여 시계열 데이터 결측값을 보간하기

     (interpolation in parallel using PL/Python on Greenplum)

 

 

 

(1) Python 으로 결측값을 포함하는 예제 시계열 데이터 생성하기

 

샘플 시계열 데이터를 저장할 폴더를 base_dir 에 지정해주었습니다.

 

분석 단위로 사용할 Lot, Cell, Parameter, TimeStamp 의 개수를 지정해 줍니다. 아래 예에서는 각 Lot, Cell, Parameter 별로 100개의 TimeStamp 별로 측정값을 수집하고, 각 분석 단위별로 10개의 결측값을 포함하도록 설정했습니다. 

 

np.random.normal(10, 30, ts_num) 
: 측정값은 정규분포 X~N(10, 3) 의 분포로 부터 ts_num 인 100개를 난수 생성하여 만들었습니다. 

 

nan_mask = np.random.choice(np.arange(ts_num), missing_num)
ts_df_tmp.loc[nan_mask, 'measure_val'] = np.nan

: 각 분석 단위의 100 개의 측정치 중에서 무작위로 missing_num = 10 개를 뽑아서 np.nan 으로 교체하여 결측값으로 변경하였습니다.  

 

하나의 Lot에 Cell 100개, 각 Cell별 Parameter 10개, 각 Parameter 별 TimeStamp의 측정치 100개, 이중 결측치 10개를 포함한 시계열 데이터를 Lot 별로 묶어서(concat) DataFrame을 만들고, 이를 CSV 파일로 내보냅니다. 

 

#%% setting the directories and conditions
base_dir = '/Users/Documents/ts_data/'

## setting the number of IDs' conditions 
lot_num = 1000
cell_num = 100
param_num = 10
missing_num = 10
ts_num = 100 # number of TimeStamps


#%% [Step 1] generating the sample dataset

import numpy as np
import pandas as pd
import os
from itertools import chain, repeat

## defining the UDF
def ts_random_generator(lot_id, cell_num, param_num, ts_num, missing_num, base_dir):
    # blank DataFrame for saving the sample datasets later
    ts_df = pd.DataFrame()
    
    for cell_id in np.arange(cell_num):
        for param_id in np.arange(param_num):
            # making a DataFrame with colums of lot_id, cell_cd, param_id, ts_id, and measure_val
            ts_df_tmp = pd.DataFrame({
                'lot_id': list(chain.from_iterable(repeat([lot_id + 1], ts_num))), 
                'cell_id': list(chain.from_iterable(repeat([cell_id + 1], ts_num))), 
                'param_id': list(chain.from_iterable(repeat([param_id + 1], ts_num))), 
                'timestamp_id': (np.arange(ts_num) + 1), 
                'measure_val': np.random.normal(10, 3, ts_num)# X~N(mean, stddev, size)
            })
            
            # inserting the missing values randomly
            nan_mask = np.random.choice(np.arange(ts_num), missing_num)
            ts_df_tmp.loc[nan_mask, 'measure_val'] = np.nan
            
            # concatenate the generated random dataset(ts_df_tmp) to the lot based DataFrame(ts_df) 
            ts_df = pd.concat([ts_df, ts_df_tmp], axis=0)
    
    # exporting the DataFrame to local csv file
    base_dir = base_dir
    file_nm = 'lot_' + \
        str(lot_id+1).zfill(4) + \
        '.csv'
        
    ts_df.to_csv(os.path.join(base_dir, file_nm), index=False)
    #ts_df.to_csv('/Users/lhongdon/Documents/SK_ON_PoC/ts_data/lot_0001.csv')
    
    print(file_nm, "is successfully generated.") 



#%% Executing the ts_random_generator UDF above

## running the UDF above using for loop statement
for lot_id in np.arange(lot_num):
    ts_random_generator(
        lot_id, 
        cell_num, 
        param_num, 
        ts_num, 
        missing_num, 
        base_dir
        )

 

 

 

위의 코드를 실행하면 for loop 순환문이 lot_num 수만큼 돌면서 ts_random_generator() 사용자 정의함수를 실행시키면서 결측값을 포함한 시계열 데이터 샘플 CSV 파일을 생성하여 지정된 base_dir 폴더에 저장을 합니다. 

(아래 화면 캡쳐 참조)

 

sample time series data list

 

 

아래의 화면캡쳐는 결측값을 포함하는 시계열 데이터 샘플 중에서 LOT_0001 번의 예시입니다. 

 

time series data sample with missing values

 

 

 

 

(2) Python 의 for loop 순환문으로 시계열 데이터 결측값을 보간하기

      (interpolation sequentially using Python for loop statement)

 

아래 코드는 Python으로 Lot, Cell, Parameter ID 별로 for loop 순환문을 사용해서 pandas 의 interpolate() 메소드를 사용해서 시계열 데이터의 결측값을 선형 보간(linear interpolation) 한 것입니다. 

(forward fill 로 먼저 선형 보간을 해주고, 그 다음에 만약에 첫번째 행에 결측값이 있을 경우에 backward fill 로 이후 값과 같은 값으로 결측값을 채워줍니다.)

 

순차적으로 for loop 순환문을 돌기 때문에 시간이 오래 걸립니다.

 

#%% [Step 2] linear interpolation
from datetime import datetime
start_time = datetime.now()


## reading csv files in the base_dir
file_list = os.listdir(base_dir)

for file_nm in file_list:
    # by Lot
    if file_nm[-3:] == "csv":
        # read csv file
        ts_df = pd.read_csv(os.path.join(base_dir, file_nm))
        
        # blank DataFrame for saving the interpolated time series later
        ts_df_interpolated = pd.DataFrame()
        
        # cell & param ID lists
        cell_list = np.unique(ts_df['cell_id'])
        param_list = np.unique(ts_df['param_id'])
        
        # interpolation by lot, cell, and param IDs
        for cell_id in cell_list:
           for param_id in param_list:
               ts_df_tmp = ts_df[(ts_df.cell_id == cell_id) & (ts_df.param_id == param_id)]
               
               ## interpolating the missing values for equaly spaced time series data
               ts_df_tmp.sort_values(by='timestamp_id', ascending=True) # sorting by TimeStamp first
               ts_df_interpolated_tmp = ts_df_tmp.interpolate(method='values') # linear interploation
               ts_df_interpolated_tmp = ts_df_interpolated_tmp.fillna(method='bfill') # backward fill for the first missing row
               
               ts_df_interpolated = pd.concat([ts_df_interpolated, ts_df_interpolated_tmp], axis=0)
        
        # export DataFrame to local folder as a csv file
        ts_df_interpolated.to_csv(os.path.join(interpolated_dir, file_nm), index=False)
        
        print(file_nm, "is successfully interpolated.")


time_elapsed = datetime.now() - start_time
print("----------" * 5)
print("Time elapsed (hh:mm:ss.ms) {}".format(time_elapsed))
print("----------" * 5)

 

# # Before interplolation
# 3,1,1,20,11.160795506036791
# 3,1,1,21,8.155949904188175
# 3,1,1,22,3.1040644143505407
# 3,1,1,23,                   <-- missing
# 3,1,1,24,                   <-- missing
# 3,1,1,25,11.020504352275342
# 3,1,1,26,                   <-- missing
# 3,1,1,27,8.817922501760519
# 3,1,1,28,10.673174873272234
# 3,1,1,29,6.584669096660191
# 3,1,1,30,13.442427337943553

# # After interpolation
# 3,1,1,20,11.160795506036791
# 3,1,1,21,8.155949904188175
# 3,1,1,22,3.1040644143505407
# 3,1,1,23,5.742877726992141  <-- interpolated
# 3,1,1,24,8.381691039633742  <-- interpolated
# 3,1,1,25,11.020504352275342
# 3,1,1,26,9.919213427017931  <-- interpolated
# 3,1,1,27,8.81792250176052
# 3,1,1,28,10.673174873272234
# 3,1,1,29,6.584669096660191
# 3,1,1,30,13.442427337943554

 

 

아래 화면캡쳐는 선형보간하기 전에 결측값이 있을 때와, 이를 선형보간으로 값을 생성한 후의 예시입니다. 

 

linear interpolation for missing data in time series

 

 

 

아래 선 그래프의 파란색 점 부분이 원래 값에서는 결측값 이었던 것을 선형 보간(linear interpolation)으로 채워준 후의 모습입니다. 선형보간이므로 측정된 값으로 선형회귀식을 적합하고, 결측값 부분의 X 값을 입력해서 Y를 예측하는 방식으로 결측값을 보간합니다. 

 

linear interpolation of missing values in time series

 

 

 

아래 코드는 데이터가 Greenplum DB에 적재되어 있다고 했을 때, 

 (2-1) Python으로 Greenplum DB에 access하여 데이터를 Query 해와서 pandas DataFrame으로 만들고 

 (2-2) Pytnon pandas 의 interpolate() 메소드를 사용해서 선형보간을 한 다음에 

 (2-3) 선형보간된 DataFrame을 pandas 의 to_sql() 메소드를 사용해서 다시 Greenplum DB에 적재

하는 코드입니다. 이를 for loop 순환문을 사용해서 Lot 의 개수만큼 실행시켜 주었습니다. 

 

순차적으로 for loop 순환문을 돌기 때문에 시간이 오래 걸립니다.

 

#%% Greenplum credentials
user = 'username' 
password = 'password' 
host = 'ip_address'
port = 'port'
db = 'databasename'

connection_string = "postgresql://{user}:{password}@{host}:{port}/{db}".\
        format(user=user, 
               password=password, 
               host=host, 
               port=port,
               db=db)

#%%
# helper function: query to pandas DataFrame
def gpdb_query(query):
    import psycopg2 as pg
    import pandas as pd
    
    conn = pg.connect(connection_string)
    cursor = conn.cursor()
    
    cursor.execute(query)
    col_names = [desc[0] for desc in cursor.description]
    
    result_df = pd.DataFrame(cursor.fetchall(), columns=col_names)
    
    cursor.close()
    conn.close()
    
    return result_df


#%% 
# UDF for running a query

def interpolator(lot_id):
    
    #import pandas as pd
    
    query = """
        SELECT * 
        FROM ts_data 
        WHERE 
            lot_id = {lot_id}
    """.format(
            lot_id = lot_id)
    
    ts_df = gpdb_query(query)
    ts_df = ts_df.astype({
        'measure_val': float
        })
    
    ## interpolating the missing values for equaly spaced time series data
    ts_df_interpolated = pd.DataFrame()
    
    for cell_id in (np.arange(cell_num)+1):
        for param_id in (np.arange(param_num)+1):
            ts_df_tmp = ts_df[(ts_df.cell_id == cell_id) & (ts_df.param_id == param_id)]
    
            ts_df_tmp.sort_values(by='timestamp_id', ascending=True) # sorting by TimeStamp first
            ts_df_interpolated_tmp = ts_df_tmp.interpolate(method='values') # linear interploation
            ts_df_interpolated_tmp = ts_df_interpolated_tmp.fillna(method='bfill') # backward fill for the first missing row
            
            ts_df_interpolated = pd.concat([ts_df_interpolated, ts_df_interpolated_tmp], axis=0)
    
    # export DataFrame to local folder as a csv file
    #ts_df_interpolated.to_csv(os.path.join(interpolated_dir, file_nm), index=False)        
    #print(file_nm, "is successfully interpolated.")
    
    return ts_df_interpolated



#%% 
# UDF for importing pandas DataFrame to Greenplum DB
def gpdb_importer(lot_id, connection_string):
    
    import sqlalchemy
    from sqlalchemy import create_engine
    
    engine = create_engine(connection_string)
    
    # interpolation
    ts_data_interpolated = interpolator(lot_id)
    
    # inserting to Greenplum    
    ts_data_interpolated.to_sql(
        name = 'ts_data_interpolated_python', 
        con = engine, 
        schema = 'equipment', 
        if_exists = 'append', 
        index = False, 
        dtype = {'lot_id': sqlalchemy.types.INTEGER(), 
                 'cell_id': sqlalchemy.types.INTEGER(), 
                 'param_id': sqlalchemy.types.INTEGER(),
                 'timestamp_id': sqlalchemy.types.INTEGER(), 
                 'measure_val': sqlalchemy.types.Float(precision=6)
                 })
    

#%%
from datetime import datetime
start_time = datetime.now()

import pandas as pd
import os
import numpy as np

for lot_id in (np.arange(lot_num)+1):
    gpdb_importer(lot_id, connection_string)
    print("lot_id", lot_id, "is successfully interpolated.")

time_elapsed = datetime.now() - start_time
print("----------" * 5)
print("Time elapsed (hh:mm:ss.ms) {}".format(time_elapsed))
print("----------" * 5)

 

 

 

 

(3) Greenplum에서 PL/Python으로 분산병렬처리하여 시계열 데이터 결측값을 보간하기

     (interpolation in parallel using PL/Python on Greenplum)

Greenplum에서 PL/Python으로 병렬처리할 때는 (a) 사용자 정의 함수(UDF) 정의, (b) 사용자 정의 함수 실행의 두 단계를 거칩니다. 

 

Greenplum DB에서 PL/Python으로 분산병렬처리를 하면 위의 (2)번에서 Python으로 for loop 순환문으로 순차처리한 것 대비 Greenplum DB 내 노드의 개수에 비례하여 처리 속도가 줄어들게 됩니다. (가령, 노드가 8개이면 병렬처리의 총 처리 소요시간은 순차처리했을 때의 총 소요시간의 1/8 로 줄어듭니다.) 

 

 

parallel processing using PL/Python on Greenplum DB

 

 

(3-1) PL/Python 으로 시계열 데이터 결측값을 선형보간하는 사용자 정의함수 정의 (define a UDF)

 

-- defining the PL/Python UDF
DROP FUNCTION IF EXISTS plpy_interp(numeric[]);
CREATE OR REPLACE FUNCTION plpy_interp(measure_val_arr numeric[]) 
RETURNS numeric[]
AS $$
	import numpy as np
	import pandas as pd
	
	measure_val = np.array(measure_val_arr, dtype='float')
	
	ts_df = pd.DataFrame({
	   'measure_val': measure_val
	    })
	
	# interpolation by lot, cell, and param IDs               
	ts_df_interpolated = ts_df.interpolate(method='values') # linear interploation
	ts_df_interpolated = ts_df_interpolated.fillna(method='bfill') # backward fill for the first missing row
	
	return ts_df_interpolated['measure_val']
	        
$$ LANGUAGE 'plpythonu';

 

 

 

(3-2) 위에서 정의한 시계열 데이터 결측값을 선형보간하는 PL/Python 사용자 정의함수 실행

 

 

PL/Python의 input 으로는 SQL의 array_agg() 함수를 사용해서 만든 Array 데이터를 사용하며, PL/Python에서는 SQL의 Array를 Python의 List 로 변환(converting) 합니다. 

 

-- array aggregation as an input
DROP TABLE IF EXISTS tab1;
CREATE TEMPORARY TABLE tab1 AS
		SELECT 
			lot_id
			, cell_id
			, param_id
			, ARRAY_AGG(timestamp_id ORDER BY timestamp_id) AS timestamp_id_arr
			, ARRAY_AGG(measure_val ORDER BY timestamp_id) AS measure_val_arr
		FROM ts_data
		GROUP BY lot_id, cell_id, param_id
DISTRIBUTED RANDOMLY ;
		
ANALYZE tab1;		


-- executing the PL/Python UDF
DROP TABLE IF EXISTS ts_data_interpolated;
CREATE TABLE ts_data_interpolated AS (
	SELECT 
		lot_id 
		, cell_id 
		, param_id 
		, timestamp_id_arr
		, plpy_interp(measure_val_arr) AS measure_val_arr -- plpython UDF
	FROM tab1 AS a 
) DISTRIBUTED BY (lot_id);

 

 

 

아래 코드는 numeric array 형태로 반환한 선형보간 후의 데이터를 unnest() 함수를 사용해서 보기에 편하도록 long format 으로 풀어준 것입니다. 

 

-- display the interpolated result
SELECT 
	lot_id
	, cell_id 
	, param_id
	, UNNEST(timestamp_id_arr) AS timestamp_id
	, UNNEST(measure_val_arr) AS measure_val
FROM ts_data_interpolated
WHERE lot_id = 1 AND cell_id = 1 AND param_id = 1
ORDER BY lot_id, cell_id, param_id, timestamp_id
LIMIT 100;

 

 

결측값을 포함하고 있는 원래 데이터셋을 아래 SQL query 로 조회해서, 위의 선형보간 된 후의 데이터셋과 비교해볼 수 있습니다. 

 

-- original dataset with missing value
SELECT 
	lot_id
	, cell_id 
	, param_id
	, timestamp_id
	, measure_val
FROM ts_data
WHERE lot_id = 1 AND cell_id = 1 AND param_id = 1
ORDER BY lot_id, cell_id, param_id, timestamp_id
LIMIT 100;

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이전 포스팅에서 스펙트럼 분석(spectrum analysis, spectral analysis, frequency domain analysis) 에 대해서 소개하였습니다. ==> https://rfriend.tistory.com/690 참조

 

이번 포스팅에서는 Greenplum 에서 PL/Python (Procedural Language)을 활용하여 여러개 그룹의 시계열 데이터에 대해 스펙트럼 분석을 분산병렬처리하는 방법을 소개하겠습니다. (spectrum analysis in parallel using PL/Python on Greenplum database)

 

 

spectrum analysis in parallel using PL/Python on Greenplum

 

 

(1) 다른 주파수를 가진 3개 그룹의 샘플 데이터셋 생성

 

먼저, spectrum 모듈에서 data_cosine() 메소드를 사용하여 주파수(frequency)가 100, 200, 250 인 3개 그룹의 코사인 파동 샘플 데이터를 생성해보겠습니다. (노이즈 A=0.1 만큼이 추가된 1,024개 코사인 데이터 포인트를 가진 예제 데이터) 

 

## generating 3 cosine signals with frequency of (100Hz, 200Hz, 250Hz) respectively
## buried in white noise (amplitude 0.1), a length of N=1024 and the sampling is 1024Hz.
from spectrum import data_cosine

data1 = data_cosine(N=1024, A=0.1, sampling=1024, freq=100)
data2 = data_cosine(N=1024, A=0.1, sampling=1024, freq=200)
data3 = data_cosine(N=1024, A=0.1, sampling=1024, freq=250)

 

 

 

다음으로 Python pandas 모듈을 사용해서 'grp' 라는 칼럼에 'a', 'b', 'c'의 구분자를 추가하고, 'val' 칼럼에는 위에서 생성한 각기 다른 주파수를 가지는 3개의 샘플 데이터셋을 값으로 가지는 DataFrame을 생성합니다. 

 

## making a pandas DataFrame with a group name
import pandas as pd

df1 = pd.DataFrame({'grp': 'a', 'val': data1})
df2 = pd.DataFrame({'grp': 'b', 'val': data2})
df3 = pd.DataFrame({'grp': 'c', 'val': data3})

df = pd.concat([df1, df2, df3])


df.shape
# (3072, 2)


df.head()
# 	grp	val
# 0	a	1.056002
# 1	a	0.863020
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723

 

 

 

sqlalchemy 모듈의 create_engine("driver://user:password@host:port/database") 메소드를 사용해서 Greenplum 데이터베이스에 접속한 후에 pandas의 DataFrame.to_sql() 메소드를 사용해서 위에서 만든 pandas DataFrame을 Greenplum DB에 import 하도록 하겠습니다. 

 

이때 index = True, indx_label = 'id' 를 꼭 설정해주어야만 합니다. 그렇지 않을 경우 Greenplum DB에 데이터가 import 될 때 시계열 데이터의 특성이 sequence 가 없이 순서가 뒤죽박죽이 되어서, 이후에 스펙트럼 분석을 할 수 없게 됩니다. 

 

## importing data to Greenplum using pandas 
import sqlalchemy
from sqlalchemy import create_engine

# engine = sqlalchemy.create_engine("postgresql://user:password@host:port/database")
engine = create_engine("postgresql://user:password@ip:5432/database") # set with yours

df.to_sql(name = 'data_cosine', 
          con = engine, 
          schema = 'public', 
          if_exists = 'replace', # {'fail', 'replace', 'append'), default to 'fail'
          index = True, 
          index_label = 'id', 
          chunksize = 100, 
          dtype = {
              'id': sqlalchemy.types.INTEGER(), 
              'grp': sqlalchemy.types.TEXT(), 
              'val': sqlalchemy.types.Float(precision=6)
          })
          
          
          
SELECT * FROM data_cosine order by grp, id LIMIT 5;
# id	grp	val
# 0	a	1.056
# 1	a	0.86302
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723


SELECT count(1) AS cnt FROM data_cosine;
# cnt
# 3072

 

 

 

(2) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의 함수 정의 (UDF definition)

 

아래의 스펙트럼 분석은 Python scipy 모듈의 signal() 메소드를 사용하였습니다. (spectrum 모듈의 Periodogram() 메소드를 사용해도 동일합니다. https://rfriend.tistory.com/690  참조) 

(Greenplum database의 master node와 segment nodes 에는 numpy와 scipy 모듈이 각각 미리 설치되어 있어야 합니다.)

 

사용자 정의함수의 인풋으로는 (a) 시계열 데이터 array 와 (b) sampling frequency 를 받으며, 아웃풋으로는 스펙트럼 분석을 통해 추출한 주파수(frequency, spectrum)를 텍스트(혹은 int)로 반환하도록 하였습니다. 

 

DROP FUNCTION IF EXISTS spectrum_func(float8[], int);
CREATE OR REPLACE FUNCTION spectrum_func(x float8[], fs int) 
RETURNS text 
AS $$
    from scipy import signal
    import numpy as np
    
    # x: Time series of measurement values
    # fs: Sampling frequency of the x time series. Defaults to 1.0.
    f, PSD = signal.periodogram(x, fs=fs)
    freq = np.argmax(PSD)
    
    return freq
    
$$ LANGUAGE 'plpythonu';

 

 

 

(3) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의함수 실행
       (Execution of Spectrum Analysis in parallel on Greenplum)

 

위의 (2)번에서 정의한 스펙트럼 분석 PL/Python 사용자 정의함수 spectrum_func(x, fs) 를 Select 문으로 호출하여 실행합니다. FROM 절에는 sub query로 input에 해당하는 시계열 데이터를 ARRAY_AGG() 함수를 사용해 array 로 묶어주는데요, 이때 ARRAY_AGG(val::float8 ORDER BY id) 로 id 기준으로 반드시 시간의 순서에 맞게 정렬을 해주어야 제대로 스펙트럼 분석이 진행이 됩니다

 

SELECT 
    grp
    , spectrum_func(val_agg, 1024)::int AS freq
FROM (
    SELECT 
        grp
        , ARRAY_AGG(val::float8 ORDER BY id) AS val_agg
    FROM data_cosine
    GROUP BY grp
) a
ORDER BY grp; 

# grp	freq
# a	100
# b	200
# c	250

 

 

우리가 (1)번에서 주파수가 각 100, 200, 250인 샘플 데이터셋을 만들었는데요, 스펙트럼 분석을 해보니 정확하게 주파수를 도출해 내었네요!. 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

Database에서 가상의 샘플 데이터를 만들어서 SQL이 버그없이 잘 작동하는지 확인을 한다든지, DB의 성능을 테스트 해봐야 할 때가 있습니다. 

 

이번 포스팅에서는 PostgreSQL, Greenplum DB를 사용해서 정규분포(Normal Distribution)로부터 난수 (random number)를 생성하여 샘플 테이블을 만들어보겠습니다. 

 

(1) 테이블 생성 : create table

(2) 정규분포로 부터 난수 생성하는 사용자 정의 함수 정의 : random_normal(count, mean, stddev)

(3) 테이블에 정규분포로 부터 생성한 난수 추가하기 : generate_series(), to_char(), insert into

(4) Instance 별 데이터 개수 확인하기 : count() group by gp_segment_id

 

creating a sample table using random numbers in PostgreSQL, Greenplum

 

 

아래 SQL 예제 코드는 PostgreSQL 9.4.26 버전, Greenplum 6.19.2 버전에서, Greenplum Database 의 분산 저장, 분산병렬처리 고려해서 작성하였습니다. 

 

-- version check
SELECT version();
-- PostgreSQL 9.4.26 (Greenplum Database 6.19.2 build commit:0e1f6307eb4e368b79cbf67a0dc6af53362d26c0) on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Feb 14 2022 23:03:52

 

 

(1) 테이블 생성 : create table

 

CREATE TABLE 함수를 사용해서 분석의 대상 기준으로 사용할 문자열의 cell_id, param_id 와 숫자형의 측정값 meas_val 을 가지는 빈 껍데기 samp_tbl 테이블을 만들어보겠습니다. WITH 절에 압축 옵션을 설정하였으며, DISTRIBUTED BY (cell_id) 로 분산 저장하는 기준을 설정해주었습니다.

 

분산키를 잘 잡아주는 것이 향후 분산병렬처리의 성능을 좌지우지 합니다. 분석이나 데이터 처리(조인 등) 기준이 되고, 한쪽 노드로 쏠리지 않고 골고루 분산시킬 수 있는 분산키를 설정해주어야 합니다. 

 

-- creating a sample table
DROP TABLE IF EXISTS samp_tbl;
CREATE TABLE samp_tbl (
	cell_id varchar(10)
	, param_id varchar(2)
	, meas_val numeric
) WITH(appendonly=TRUE, compresslevel=7, compresstype=zstd) 
DISTRIBUTED BY (cell_id);

 

 

 

(2) 정규분포로 부터 난수 생성하는 사용자 정의 함수 정의 : random_normal(count, mean, stddev)

 

PostgreSQL 버전 10 이상부터 정규분포(normal distribution)로 부터 난수를 생성(generating random numbers) 하는 함수 normal_rand() 를 쓸 수 있습니다. 

https://www.postgresql.org/docs/current/tablefunc.html

-- over PostgreSQL version 10
-- Produces a set of normally distributed random values.
normal_rand ( numvals integer, mean float8, stddev float8 ) → setof float8

 

 

PostgreSQL 9.6 이전 버전에서는 PL/Python, PL/R, PL/SQL 로 정규분포로 부터 난수를 생성하는 사용자 정의함수를 정의해서 사용해야 합니다. (아래는 PL/SQL 이구요, PL/Python이나 PL/R 로도 가능해요)

 

-- UDF of random number generator from a normal distribution, X~N(mean, stddev)
-- random_normal() built-in function over PostgreSQL version 10.x
DROP FUNCTION IF EXISTS random_normal(INTEGER, DOUBLE PRECISION, DOUBLE PRECISION);
CREATE OR REPLACE FUNCTION random_normal(
    count INTEGER DEFAULT 1,
    mean DOUBLE PRECISION DEFAULT 0.0,
    stddev DOUBLE PRECISION DEFAULT 1.0
    ) RETURNS SETOF DOUBLE PRECISION
      RETURNS NULL ON NULL INPUT AS $$
        DECLARE
            u DOUBLE PRECISION;
            v DOUBLE PRECISION;
            s DOUBLE PRECISION;
        BEGIN
            WHILE count > 0 LOOP
                u = RANDOM() * 2 - 1; -- range: -1.0 <= u < 1.0
                v = RANDOM() * 2 - 1; -- range: -1.0 <= v < 1.0
                s = u^2 + v^2;

                IF s != 0.0 AND s < 1.0 THEN
                    s = SQRT(-2 * LN(s) / s);

                    RETURN NEXT mean + stddev * s * u;
                    count = count - 1;

                    IF count > 0 THEN
                        RETURN NEXT mean + stddev * s * v;
                        count = count - 1;
                    END IF;
                END IF;
            END LOOP;
        END;
    $$ LANGUAGE plpgsql;
    
    
    -- credit: https://bugfactory.io/blog/generating-random-numbers-according-to-a-continuous-probability-distribution-with-postgresql/

 

 

 

(3) 테이블에 정규분포로 부터 생성한 난수 추가하기 : generate_series(), to_char(), insert into

 

이제 위의 (1)번에서 생성한 samp_tbl 테이블에 insert into 구문을 사용해서 가상의 샘플 데이터 추가해보겠습니다. 이때 From 절에서 generate_series(from, to) 함수를 사용해서 정수의 수열을 생성해주고, SELECT 절의 TO_CHAR(a, '0000000000'), TO_CHAR(b, '00') 에서 generate_series()에서 생성한 정수를 자리수가 10자리, 2자리인 문자열로 바꾸어줍니다. (빈 자리는 '0'으로 자리수만큼 채워줍니다.) TRIP() 함수는 화이트 스페이스를 제거해줍니다. 

 

-- inserting data
-- cell_id 1,000 * param_id 4 * meas_val 25 = 100,000 rows in total
-- good cases 99,999,000 vs. bad cases 1,000 (cell_id 10 * param_id 4 * meas_val 25 = 1,000 rows) 
-- cell_id '000000001' will be used as a control group (good case) later.   
-- it took 8 min. 4 sec.
TRUNCATE TABLE samp_tbl;
INSERT INTO samp_tbl 
SELECT 
	trim(to_char(a, '0000000000')) AS cell_id
	, trim(to_char(b, '00')) AS param_id
	, random_normal(25, 0, 1) AS meas_val -- X~N(0, 1), from Normal distribution
FROM generate_series(1, 1000) AS a -- cell_id
	, generate_series(1, 4) AS b -- param_id
;

 

 

 

(4) Instance 별 데이터 개수 확인하기 : count() group by gp_segment_id

 

위의 (1)~(3)번에서 테이블을 만들고, 가짜 데이터를 정규분포로 부터 난수를 발생시켜서 테이블에 추가를 하였으니, Greenplum의 각 nodes 에 골고루 잘 분산이 되었는지 확인을 해보겠습니다. (아래는 AWS에서 2개 노드, 노드별 6개 instance, 총 12개 instances 환경에서 테스트한 것임)

 

-- check segments in Greenplum
-- 2 nodes * 6 instances = 12 instances in total
SELECT gp_segment_id, count(1) 
FROM samp_tbl 
	GROUP BY gp_segment_id 
	ORDER BY gp_segment_id;
    
--gp_segment_id|count
---------------+-----+
--            0| 7400|
--            1| 8300|
--            2| 8200|
--            3| 8500|
--            4| 7800|
--            5| 6600|
--            6| 9400|
--            7| 9400|
--            8| 7600|
--            9| 8900|
--           10| 8200|
--           11| 9700|


-- totoal number of rows: cell_id 1,000 * param_id 4 * measured_value 25 = 100,000
SELECT count(1) FROM samp_tbl; 
--count |
--------+
--100000|


-- X ~ N(0, 1) approximately
SELECT avg(meas_val), stddev(meas_val) FROM samp_tbl;
--avg               |stddev           |
-------------------+-----------------+
--0.005474367|0.995105289|



SELECT * FROM samp_tbl ORDER BY cell_id, param_id LIMIT 25;
--cell_id   |param_id|meas_val          |
------------+--------+------------------+
--0000000001|01      |-0.531695967165547|
--0000000001|01      |-0.108739177377124|
--0000000001|01      | 0.568470878445752|
--0000000001|01      |0.0202499172346384|
--0000000001|01      | 0.733808732215974|
--0000000001|01      | 0.217977459614905|
--0000000001|01      |-0.819498864258696|
--0000000001|01      | -1.15053271252296|
--0000000001|01      |  0.27459170410016|
--0000000001|01      |-0.360160392758718|
--0000000001|01      | 0.180482978307365|
--0000000001|01      | 0.903190145608135|
--0000000001|01      |-0.546983465499866|
--0000000001|01      |  2.10019183187282|
--0000000001|01      | 0.500516025880425|
--0000000001|01      | -1.46928655599126|
--0000000001|01      |-0.224673782111734|
--0000000001|01      | 0.600268991904523|
--0000000001|01      |-0.233178028377569|
--0000000001|01      |0.0753960434547863|
--0000000001|01      | -2.86355579238885|
--0000000001|01      | -2.25814837725797|
--0000000001|01      |   1.4013348575359|
--0000000001|01      |-0.445684149707259|
--0000000001|01      | -1.03404850229361|

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum DB에서 여러개의 문자열을 'OR' 조건으로 매칭하는 3가지 SQL 방법을 소개하겠습니다. 

 

(1) LIKE '%string1%' OR LIKE '%string2%' ... 

(2) ANY(ARRAY['%string1%', '%string2%', ...])

(3) regular expression matching: ~ '(string1|string2|...)'

 

 

 

먼저 예제로 사용할 샘플 테이블을 만들어보겠습니다. 과일가게에서 장바구니 ID별로 구매한 과일 품목이 문자열로 들어있는 테이블입니다. 

 

-- create a sample table
DROP TABLE IF EXISTS basket_tbl;
CREATE TABLE basket_tbl (
	id int
	, item text
);


INSERT INTO basket_tbl VALUES 
(1, 'orange, apple, grape')
, (2, 'guava, apple, durian')
, (3, 'strawberry, lime, leomon')
, (4, 'mango, mangosteen, plum')
, (5, 'plum, guava, peach');

SELECT * FROM basket_tbl ORDER BY id;
--id|item                    |
----+------------------------+
-- 1|orange, apple, grape    |
-- 2|guava, apple, durian    |
-- 3|strawberry, lime, leomon|
-- 4|mango, mangosteen, plum |
-- 5|plum, guava, peach      |

 

 

위의 샘플 테이블의 item 칼럼의 문자열에서 'apple', 'orange', 'peach' 중에 하나라도(OR) 문자열이 매칭(string matching)이 되면 SELECT 문으로 조회를 해오는 SQL query 를 3가지 방법으로 작성해보겠습니다. 

 

 

(1) LIKE '%string1%' OR LIKE '%string2%' ... 

 

가장 단순한 반면에, 조건절 항목이 많아질 경우 SQL query 가 굉장히 길어지고 비효율적인 단점이 있습니다. 

 

-- (1) multiple LIKE '%string1%' OR LIKE '%string2%' OR...
SELECT * 
FROM basket_tbl 
WHERE item LIKE '%apple%' 
	OR item LIKE '%orange%'
	OR item LIKE '%peach%'
ORDER BY id;

--id|item                |
----+--------------------+
-- 1|orange, apple, grape|
-- 2|guava, apple, durian|
-- 5|plum, guava, peach  |

 

 

(2) ANY(ARRAY['%string1%', '%string2%', ...])

 

문자열 매칭 조건절의 각 문자열 항목을 ARRAY[] 에 나열을 해주고, any() 연산자를 사용해서 이들 문자열 조건 중에서 하나라도 매칭이 되면 반환을 하도록 하는 방법입니다. 위의 (1)번 보다는 SQL query 가 짧고 깔끔해졌습니다. 

 

-- (2) ANY(ARRAY['%string1%', '%string2%',...])
SELECT * 
FROM basket_tbl 
WHERE item LIKE ANY(ARRAY['%apple%', '%orange%', '%peach%'])
ORDER BY id;

--id|item                |
----+--------------------+
-- 1|orange, apple, grape|
-- 2|guava, apple, durian|
-- 5|plum, guava, peach  |

 

 

 

(3) regular expression matching: ~ '(string1|string2|...)'

 

마지막으로, 정규표현식(regular expression) '~'을 이용해서 복수의 문자열을 OR 조건(수직바 '|')으로 매칭하는 방법입니다. '%'를 사용하지 않아도 되므로 (1), (2) 와 비교했을 때 가장 SQL query 가 간단한 방법입니다. 

 

-- (3) regular expression match: ~ '(string1|string2|...)'
SELECT * 
FROM basket_tbl 
WHERE item ~ '(apple|orange|peach)'
ORDER BY id;

--id|item                |
----+--------------------+
-- 1|orange, apple, grape|
-- 2|guava, apple, durian|
-- 5|plum, guava, peach  |

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요~!  :-)

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum DB의 Window Function 의 함수 특징, 함수별 구문 사용법에 대해서 알아보겠습니다. Window Function을 알아두면 편리하고 또 강력한 SQL query 를 사용할 수 있습니다. 특히 MPP (Massively Parallel Processing) Architecture 의 Greenplum DB 에서는 Window Function 실행 시 분산병렬처리가 되기 때문에 성능 면에서 매우 우수합니다. 

 

Window Function 은 현재 행과 관련된 테이블 행 집합(a set of table rows)에 대해 계산을 수행합니다. 이는 집계 함수(aggregate function)로 수행할 수 있는 계산 유형과 비슷합니다. 그러나 일반적인 집계 함수와 달리 Window Function을 사용하면 행이 단일 출력 행으로 그룹화되지 않으며 행은 별도의 ID를 유지합니다.

 

아래 화면의 예시는 AVG() 함수로 평균을 구하는데 있어,

   (a) 전체 평균 집계: AVG() --> 한 개의 행 반환

   (b) 그룹별 평균 집계: AVG() ... GROUP BY --> 그룹의 개수만큼 행 반환

   (c) Window Function: AVG() OVER (PARTITION BY)  --> ID의 개수만큼 행 반환

별로 차이를 비교해보았습니다. 

 

[ PostgreSQL, Greenplum: Aggregation vs. Aggregation by Groups vs. Window Function ]

 

PostgreSQL, Greenplum: Aggregation vs. Aggregation by Groups vs. Window Function

 

 

 

먼저, 부서(depname), 직원번호(empno), 급여(salary) 의 칼럼을 가지는 간단한 예제 테이블 empsalary 을 만들고 데이터를 입력해보겠습니다. 

 

-- (0) making a sample table as an example 
DROP TABLE IF EXISTS empsalary;
CREATE TABLE empsalary (
	depname TEXT 
	, empno INT 
	, salary INT
);

INSERT INTO empsalary  (depname, empno, salary) 
VALUES 
('sales', 1, 5000)
, ('personnel', 2, 3900)
, ('sales', 3, 4800)
, ('sales', 4, 4800)
, ('personnel', 5, 3500)
, ('develop', 7, 4200)
, ('develop', 8, 6000)
, ('develop', 9, 4500)
, ('develop', 10, 5200)
, ('develop' , 11, 5200);

SELECT * FROM empsalary ORDER BY  empno LIMIT 2;

--depname  |empno|salary|
-----------+-----+------+
--sales    |    1|  5000|
--personnel|    2|  3900|

 

 

 

(1) AVG()  vs.  AVG() GROUP BY  vs.  AVG() OVER (PARTITION BY ORDER BY)

 

아래는 (a) AVG() 로 전체 급여의 전체 평균 집계, (b) AVG() GROUP BY depname 로 부서 그룹별 평균 집계, (c) AVG() OVER (PARTITION BY depname) 의 Window Function을 사용해서 부서 집합 별 평균을 계산해서 직원번호 ID의 행별로 결과를 반환하는 것을 비교해 본 것입니다. 반환하는 행의 개수, 결과를 유심히 비교해보시면 aggregate function 과 window function 의 차이를 이해하는데 도움이 될거예요. 

 

-- (0) aggregation returns 1 row.
SELECT 
	AVG(salary) 
FROM empsalary;

--avg                  |
-----------------------+
--4710.0000000000000000|


-- (0) aggregation by groups returns rows with the number of groups
SELECT 
	depname
	, AVG(salary) 
FROM empsalary 
GROUP BY depname 
ORDER BY depname;

--depname  |avg                  |
-----------+---------------------+
--develop  |5020.000000000000000
--personnel|3700.000000000000000
--sales    |4866.6666666666666667|


-- (1) Window functions have an OVER(PARTITION BY xx) clause. 
-- any function without an OVER clause is not a window function, but rather an aggregate or single-row (scalar) function.
SELECT 
	depname
	, empno
	, salary
	, AVG(salary) 
		OVER (PARTITION BY depname) 
		AS avg_dep
FROM empsalary;

--depname  |empno|salary|avg_dep              |
-----------+-----+------+---------------------+
--develop  |    7|  4200|5020.0000000000000000|
--develop  |    8|  6000|5020.0000000000000000|
--develop  |    9|  4500|5020.0000000000000000|
--develop  |   10|  5200|5020.0000000000000000|
--develop  |   11|  5200|5020.0000000000000000|
--personnel|    5|  3500|3700.0000000000000000|
--personnel|    2|  3900|3700.0000000000000000|
--sales    |    4|  4800|4866.6666666666666667|
--sales    |    3|  4800|4866.6666666666666667|
--sales    |    1|  5000|4866.6666666666666667|

 

 

 

아래는 PostgreSQL, Greenplum 의 Window Function Syntax 구문입니다.

  - window_function(매개변수) 바로 다음에 OVER() 가 있으며,

  - OVER() 안에 PARTITION BY 로 연산이 실행될 집단(set) 기준을 지정해주고 

  - OVER() 안에 ORDER BY 로 시간(time), 순서(sequence)가 중요할 경우 정렬 기준을 지정해줍니다. 

 

-- PostgreSQL Window Function Syntax
WINDOW_FUNCTION(arg1, arg2,..) OVER ( 
	[PARTITION BY partition_expression] 
	[ORDER BY sort_expression [ASC | DESC] 
	[NULLS {FIRST | LAST }]
	)

 

PostgreSQL, Greenplum Window Function Syntax

 

 

PostgreSQL의 Window Functions  중에서 제가 그래도 자주 쓰는 함수로 AVG() OVER(), SUM() OVER(), RANK() OVER(), LAG() OVER(), LEAD() OVER(), FIRST_VALUE() OVER(), LAST_VALUE() OVER(), NTILE() OVER(), ROW_NUMBER() OVER() 등의 일부 함수 (제 맘대로... ㅎㅎ)에 대해서 아래에 예시를 들어보겠습니다. 

 

 

 

(2) RANK() OVER ([PARTITION BY] ORDER BY) : 순위

 

아래 예는 PARTITION BY depname 로  '부서' 집단별로 구분해서, ORDER BY salary DESC로 급여 내림차순으로 정렬한 후의 직원별 순위(rank)를 계산해서 직원 ID 행별로 순위를 반환해줍니다.  

 

PARTITION BY 집단 내에서 ORDER BY 정렬 기준칼럼의 값이 동일할 경우 순위는 동일한 값을 가지며, 동일한 순위의 개수만큼 감안해서 그 다음 순위의 값은 순위가 바뀝니다. (가령, develop 부서의 경우 순위가 1, 2, 2, 4, 5, 로서 동일 순위 '2'가 두명 있고, 급여가 네번째인 사람의 순위는 '4'가 되었음.)

 

-- (2) You can control the order 
--    in which rows are processed by window functions using ORDER BY within OVER. 
SELECT 
	depname
	, empno
	, salary
	, RANK() OVER (PARTITION BY depname ORDER BY salary DESC) 
FROM empsalary;

--depname  |empno|salary|rank|
-----------+-----+------+----+
--personnel|    2|  3900|   1|
--personnel|    5|  3500|   2|
------------------------------------- set 1 (personnel)
--sales    |    1|  5000|   1|
--sales    |    3|  4800|   2|
--sales    |    4|  4800|   2|
------------------------------------- set 2 (sales)
--develop  |    8|  6000|   1|
--develop  |   11|  5200|   2|
--develop  |   10|  5200|   2|
--develop  |    9|  4500|   4|
--develop  |    7|  4200|   5|
------------------------------------- set 3 (develop)

 

 

 

(3) SUM() OVER () : PARTITION BY, ORDER BY 는 생략 가능

 

만약 집단별로 구분해서 연산할 필요가 없다면 OVER() 구문 안에서 PARTITON BY 는 생략할 수 있습니다. 

만약 시간이나 순서가 중요하지 않다면 OVER() 구문 안에서 ORDER BY 는 생략할 수 있습니다. 

 

아래 예에서는 SUM(salary) OVER () 를 사용해서 전체 직원의 급여 평균을 계산해서 각 직원 ID의 개수 만큼 행을 반환했습니다. 

 

-- (3) ORDER BY can be omitted if the ordering of rows is not important. 
-- It is also possible to omit PARTITION BY, 
-- in which case there is just one partition containing all the rows.
SELECT 
	salary
	, SUM(salary) OVER ()  -- same result
FROM empsalary;

--salary|sum  |
--------+-----+
--  5000|47100|
--  3900|47100|
--  4800|47100|
--  4800|47100|
--  3500|47100|
--  4200|47100|
--  6000|47100|
--  4500|47100|
--  5200|47100|
--  5200|47100|

 

 

 

(4) SUM() OVER (ORDER BY) : ORDER BY 구문이 추가되면 누적 합으로 결과가 달라짐

 

만약 PARTITION BY 가 없어서 집단별 구분없이 전체 데이터셋을 대상으로 연산을 하게 될 때, SUM(salary) OVER (ORDER BY salary) 처럼 OVER () 절 안에 ORDER BY 를 추가하게 되면 salary 를 기준으로 정렬이 된 상태에서 누적 합이 계산되므로 위의 (3)번과 차이점을 알아두기 바랍니다. 

 

-- (4) But if we add an ORDER BY clause, we get very different results:
SELECT 
	salary
	, SUM(salary) OVER (ORDER BY salary) 
FROM empsalary;

--salary|sum  |
--------+-----+
--  3500| 3500|
--  3900| 7400|
--  4200|11600|
--  4500|16100|
--  4800|25700|
--  4800|25700|
--  5000|30700|
--  5200|41100|
--  5200|41100|
--  6000|47100|

 

 

 

(5) LAG(expression, offset) OVER (ORDER BY), LEAD(expression, offset) OVER (ORDER BY)

 

이번에는 시간(time), 순서(sequence)가 중요한 시계열 데이터(Time Series data) 로 예제 테이블을 만들어보겠습니다. 시계열 데이터에 대해 Window Function 을 사용하게 되면 OVER (ORDER BY timestamp) 처럼 ORDER BY 를 꼭 포함시켜줘야 겠습니다. 

 

-- (5) LAG() over (), LEAD() over ()

-- making a sample TimeSeries table
DROP TABLE IF EXISTS ts;
CREATE TABLE ts (
	dt DATE
	, id INT
	, val INT
);

INSERT INTO  ts (dt, id, val) VALUES 
('2022-02-10', 1, 25)
, ('2022-02-11', 1, 28)
, ('2022-02-12', 1, 35)
, ('2022-02-13', 1, 34)
, ('2022-02-14', 1, 39)
, ('2022-02-10', 2, 40)
, ('2022-02-11', 2, 35)
, ('2022-02-12', 2, 30)
, ('2022-02-13', 2, 25)
, ('2022-02-14', 2, 15);

SELECT * FROM ts ORDER BY id, dt;

--dt        |id|val|
------------+--+---+
--2022-02-10| 1| 25|
--2022-02-11| 1| 28|
--2022-02-12| 1| 35|
--2022-02-13| 1| 34|
--2022-02-14| 1| 39|
--2022-02-10| 2| 40|
--2022-02-11| 2| 35|
--2022-02-12| 2| 30|
--2022-02-13| 2| 25|
--2022-02-14| 2| 15|

 

 

 

LAG(expression, offset) OVER (PARTITION BY id ORDER BY timestamp) 윈도우 함수는 ORDER BY timestamp 기준으로 정렬을 한 상태에서, id 집합 내에서 현재 행에서 offset 만큼 앞에 있는 행의 값(a row which comes before the current row)을 가져옵니다.  아래의 예를 살펴보는 것이 이해하기 빠르고 쉬울거예요. 

 

-- (5-1) LAG() function to access a row which comes before the current row 
--       at a specific physical offset.
SELECT 
	dt
	, id 
	, val
	, LAG(val, 1) OVER (PARTITION BY id ORDER BY dt) AS lag_val_1
	, LAG(val, 2) OVER (PARTITION BY id ORDER BY dt) AS lag_val_2
FROM ts 
;
--dt        |id|val|lag_val_1|lag_val_2|
------------+--+---+---------+---------+
--2022-02-10| 1| 25|         |         |
--2022-02-11| 1| 28|       25|         |
--2022-02-12| 1| 35|       28|       25|
--2022-02-13| 1| 34|       35|       28|
--2022-02-14| 1| 39|       34|       35|
--2022-02-10| 2| 40|         |         |
--2022-02-11| 2| 35|       40|         |
--2022-02-12| 2| 30|       35|       40|
--2022-02-13| 2| 25|       30|       35|
--2022-02-14| 2| 15|       25|       30|

 

 

 

LEAD(expression, offset) OVER (PARTITION BY id ORDER BY timestamp) 윈도우 함수는 ORDER BY timestamp 기준으로 정렬을 한 후에, id 집합 내에서 현재 행에서 offset 만큼 뒤에 있는 행의 값(a row which follows the current row)을 가져옵니다.  아래의 예를 살펴보는 것이 이해하기 빠르고 쉬울거예요. 

 

-- (5-2) LEAD() function to access a row that follows the current row, 
--       at a specific physical offset.
SELECT 
	dt
	, id 
	, val
	, LEAD(val, 1) OVER (PARTITION BY id ORDER BY dt) AS lead_val_1
	, LEAD(val, 2) OVER (PARTITION BY id ORDER BY dt) AS lead_val_2
FROM ts 
;
--dt        |id|val|lead_val_1|lead_val_2|
------------+--+---+----------+----------+
--2022-02-10| 1| 25|        28|        35|
--2022-02-11| 1| 28|        35|        34|
--2022-02-12| 1| 35|        34|        39|
--2022-02-13| 1| 34|        39|          |
--2022-02-14| 1| 39|          |          |
--2022-02-10| 2| 40|        35|        30|
--2022-02-11| 2| 35|        30|        25|
--2022-02-12| 2| 30|        25|        15|
--2022-02-13| 2| 25|        15|          |
--2022-02-14| 2| 15|          |          |

 

 

 

(6) FIRST_VALUE() OVER (), LAST_VALUE() OVER () 

 

OVER (PARTITION BY id ORDER BY dt) 로 id 집합 내에서 dt 순서를 기준으로 정렬한 상태에서, FIRST_VALUE() 는 첫번째 값을 반환하며, LAST_VALUE() 는 마지막 값을 반환합니다. 

 

OVER() 절 안에 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING 은 partion by 집합 내의 처음과 끝의 모든 행의 범위를 다 고려하라는 의미입니다. 

 

-- (6) FIRST_VALUE() OVER (), LAST_VALUE() OVER ()

-- The FIRST_VALUE() function returns a value evaluated 
--  against the first row in a sorted partition of a result set.

-- The LAST_VALUE() function returns a value evaluated 
--  against the last row in a sorted partition of a result set.

-- The RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING clause defined 
-- the frame starting from the first row and ending at the last row of each partition.

SELECT 
	dt
	, id 
	, val 
	, FIRST_VALUE(val) 
		OVER (
			PARTITION BY id 
			ORDER BY dt 
			RANGE BETWEEN UNBOUNDED PRECEDING 
				AND UNBOUNDED FOLLOWING) AS first_val
	, LAST_VALUE(val) 
		OVER (
			PARTITION BY id 
			ORDER BY dt 
			RANGE BETWEEN UNBOUNDED PRECEDING 
				AND UNBOUNDED FOLLOWING) AS last_val
FROM ts
;
--dt        |id|val|first_val|last_val|
------------+--+---+---------+--------+
--2022-02-10| 1| 25|       25|      39|
--2022-02-11| 1| 28|       25|      39|
--2022-02-12| 1| 35|       25|      39|
--2022-02-13| 1| 34|       25|      39|
--2022-02-14| 1| 39|       25|      39|
--2022-02-10| 2| 40|       40|      15|
--2022-02-11| 2| 35|       40|      15|
--2022-02-12| 2| 30|       40|      15|
--2022-02-13| 2| 25|       40|      15|
--2022-02-14| 2| 15|       40|      15|

 

 

 

(7) NTILE(buckets) OVER (PARTITION BY ORDER BY)

 

NTINE(buckets) 의 buckets 개수 만큼 가능한 동일한 크기(equal size)를 가지는 집단으로 나누어줍니다.

아래 예 NTILE(2) OVER (PARTITION BY id ORDER BY val) 는 id 집합 내에서 val 을 기준으로 정렬을 한 상태에서 NTILE(2) 의 buckets = 2 개 만큼의 동일한 크기를 가지는 집단으로 나누어주었습니다.

 

짝수개면 정확하게 동일한 크기로 나누었을텐데요, id 집단 내 행의 개수가 5개로 홀수개인데 2개의 집단으로 나누려다 보니 가능한 동일한 크기인 3개, 2개로 나뉘었네요. 

 

-- (7) NTILE() function allows you to divide ordered rows in the partition 
--     into a specified number of ranked groups as EQUAL SIZE as possible.
SELECT 
	dt 
	, id 
	, val
	, NTILE(2) OVER (PARTITION BY id ORDER BY val) AS ntile_val
FROM ts 
ORDER BY id, val
;

--dt        |id|val|ntile_val|
------------+--+---+---------+
--2022-02-10| 1| 25|        1|
--2022-02-11| 1| 28|        1|
--2022-02-13| 1| 34|        1|
--2022-02-12| 1| 35|        2|
--2022-02-14| 1| 39|        2|
--2022-02-14| 2| 15|        1|
--2022-02-13| 2| 25|        1|
--2022-02-12| 2| 30|        1|
--2022-02-11| 2| 35|        2|
--2022-02-10| 2| 40|        2|

 

 

 

(8) ROW_NUMBER() OVER (ORDER BY)

 

아래의 예 ROW_NUMBER() OVER (PARTITION BY id ORDER BY dt) 는 id 집합 내에서 dt 를 기준으로 올림차순 정렬 (sort in ascending order) 한 상태에서, 1 부터 시작해서 하나씩 증가시켜가면서 행 번호 (row number) 를 부여한 것입니다. 집합 내에서 특정 기준으로 정렬한 상태에서 특정 순서/위치의 값을 가져오기 한다거나, 집합 내에서 unique 한 ID 를 생성하고 싶을 때 종종 사용합니다. 

 

-- (8) ROW_NUMBER() : Number the current row within its partition starting from 1.
SELECT 
	dt
	, id 
	, val 
	, ROW_NUMBER() OVER (PARTITION BY id ORDER BY dt) AS seq_no
FROM ts 
;
--dt        |id|val|seq_no|
------------+--+---+------+
--2022-02-10| 1| 25|     1|
--2022-02-11| 1| 28|     2|
--2022-02-12| 1| 35|     3|
--2022-02-13| 1| 34|     4|
--2022-02-14| 1| 39|     5|
--2022-02-10| 2| 40|     1|
--2022-02-11| 2| 35|     2|
--2022-02-12| 2| 30|     3|
--2022-02-13| 2| 25|     4|
--2022-02-14| 2| 15|     5|

 

 

[ Reference ]

* PostgreSQL Window Functions
   : https://www.postgresql.org/docs/9.6/tutorial-window.html

* PostgreSQL Window Functions Tutorial
   : https://www.postgresqltutorial.com/postgresql-window-function/

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,