Google Colab에 PostgreSQL과 pgvector를 설치하면 Python 과 함께 Text, Image Embeddings 변환, 저장 및 Semantic Search 등에 사용할 수 있어서 편리합니다. 

 

 

1. Google Colab 은 무엇인가? 

 

Google Colab, 또는 Colaboratory는 Google에서 제공하는 무료 Jupyter 노트북 환경입니다. 이 서비스는 클라우드 기반으로 운영되며, 데이터 분석, 머신 러닝, 교육 및 연구를 위한 플랫폼으로 널리 사용됩니다. 주요 특징과 장점은 다음과 같습니다. 

 

- 웹 기반 인터페이스: 설치가 필요 없으며, 웹 브라우저를 통해 접근하고 사용할 수 있습니다. 이는 사용자가 어디에서나 쉽게 작업을 시작할 수 있게 해줍니다. 

 

- 무료 접근: 기본적인 사용은 무료이며, 누구나 Google 계정을 통해 접근할 수 있습니다. 

 

- GPU 및 TPU 지원: 데이터 과학과 머신 러닝 작업을 위해 고성능 컴퓨팅 자원인 GPU와 TPU를 무료로 사용할 수 있습니다. 

 

- Python 지원: Python 프로그래밍 언어와 다양한 라이브러리(NumPy, Pandas, Matplotlib 등)를 지원합니다. 또한 TensorFlow, PyTorch 같은 머신 러닝 라이브러리를 사용할 수 있습니다. 

 

- 협업: Google 드라이브와의 통합을 통해 쉽게 공유하고, 다른 사용자와 협업할 수 있습니다. 문서 형식의 노트북에서 직접 코드를 작성하고 실행할 수 있어 팀워크에 유용합니다. 

 

- 교육 및 연구 목적: 교육과 연구를 위한 훌륭한 도구로, 대학 강의, 워크샵, 개인 프로젝트 등 다양한 목적으로 활용됩니다. 

 

Google Colab은 특히 하드웨어 리소스에 제한이 있는 사용자나 빠른 프로토타이핑을 원하는 데이터 과학자 및 연구자들에게 매우 유용합니다. 

 

 

2. PostgreSQL 은 무엇인가? 

 

PostgreSQL은 고급 오픈 소스 관계형 데이터베이스 관리 시스템(RDBMS)입니다. PostgreSQL은 강건함, 확장성, 그리고 SQL 표준 준수로 잘 알려져 있습니다. PostgreSQL은 복잡한 쿼리, 외래 키, 트리거, 뷰, 트랜잭션의 무결성, 다중 버전 동시성 제어 등 다양한 기능을 제공합니다. 간단한 웹 애플리케이션부터 복잡한 데이터 웨어하우징 및 지리공간 데이터 분석에 이르기까지 다양한 애플리케이션에 적합한 선택입니다.  

 

 

3. pgvector extension은 무엇인가? 


pgvector는 고차원 벡터 공간에서의 효율적인 유사성 검색을 위해 설계된 PostgreSQL 확장 기능(an extension for Pogres for efficient similarity search in high-dimensional vector spaces)입니다. 이미지, 텍스트, 오디오 등에 대한 임베딩과 같은 벡터가 일반적인 머신 러닝 애플리케이션에 특히 유용합니다. pgvector는 추천 시스템, 이미지 검색, 자연어 처리 애플리케이션과 같은 작업에 필수적인 빠른 최근접 이웃 검색을 지원합니다. 

pgvector의 주요 측면은 다음과 같습니다. 

- 벡터 데이터 타입 (Vector Data Type): 벡터를 저장하기 위한 새로운 데이터 타입을 도입합니다.  
- 벡터용 인덱싱 (Indexing for Vectors): 고차원 데이터에서 검색 성능을 향상시키는 벡터에 최적화된 인덱싱 방법을 제공합니다. 

  : IVFFlat (Inverted File with Flat Compression), HNSW (Hierarchical Navigable Small World) 
- PostgreSQL과의 통합 (Integration with PostreSQL): 강력한 데이터베이스 기능을 활용하여 PostgreSQL과 원활하게 작동합니다. 
- 머신 러닝 파이프라인에서의 사용 (Use in Machine Learning Pipelines): 임베딩과 같은 머신 러닝 모델 출력의 저장 및 쿼리에 이상적입니다. 

 

PostgreSQL과 pgvector가 오픈소스이고 확장성(Scalability) 이 뛰어나다는 점은 다른 전용 Vector DB 대비 큰 강점입니다. (아래 블로그 포스팅 참고)

 

* Why did we replace Pinecone with PGVecotr?:  

https://medium.com/@jeffreyip54/why-we-replaced-pinecone-with-pgvector-2f679d253eba

 

PostgreSQL과 pgvector의 결합은 특히 머신 러닝 모델을 포함하는 복잡한 데이터 집약적 애플리케이션을 친숙하고 강력한 데이터베이스 환경 내에서 처리할 수 있게 합니다. 

 

 

 

이번 포스팅에서는 

 

(1) Google Colab에 PostgreSQL 설치하기

(2) Google Colab에 설치된 PostgreSQL에 pgvector extension 설치하기

 

에 대해서 소개하겠습니다. 

Google Colab, PostgreSQL, pgvector extension

 

 

 

(1) Google Colab에 PostgreSQL 설치하기

(How to install PostgreSQL in Google Colab?)

 

- PostgreSQL 설치 

 

!sudo apt-get -y -qq update
!sudo apt-get -y -qq install postgresql

 

 

- PostgreSQL 서버 서비스 시작하기

 

!sudo service postgresql start

 

 

- User와 Password 설정하기

 

# Setup a password 'postgres' for username 'postgres'
!sudo -u postgres psql -U postgres -c "ALTER USER postgres PASSWORD 'postgres';"

 

 

- 'dev' 데이터베이스 만들기

 

# Setup a database with name 'dev' to be used
!sudo -u postgres psql -U postgres -c "DROP DATABASE IF EXISTS dev;"
!sudo -u postgres psql -U postgres -c "CREATE DATABASE dev;"

 

 

- 'dev' 데이터베이스에 연결하기

 

# set connection
%env DATABASE_URL=postgresql://postgres:postgres@localhost:5432/dev

 

 

- %load_ext sql 로 SQL 확장 모듈 로드하기 

 

%load_ext는 Jupyter Notebook과 같은 IPython 환경에서 사용하는 매직 명령어 중 하나입니다. 이 명령어는 확장(extension) 모듈을 로드하고 활성화하는 데 사용됩니다. 확장 모듈은 추가 기능을 제공하며, %load_ext sql을 사용하여 SQL 쿼리를 실행할 수 있습니다. 

 

# To load the sql extention to start using %%sql
%load_ext sql

 

 

- SQL query 테스트 

 

# You can start executing postgres sql commands
%%sql
select version();

-- version
-- PostgreSQL 14.9 (Ubuntu 14.9-0ubuntu0.22.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0, 64-bit

 

 

 

(2) Google Colab의 PostgreSQL에 pgvector extension 설치하기

(How to install pgvector extension in Google Colab?)

 

- git clone 해서 pgvector extension 설치 파일 다운로드하기

 

!git clone --branch v0.5.1 https://github.com/pgvector/pgvector.git

 

 

- pgvector 설치 파일이 다운로드 되어있는 폴더로 경로 변경

(Google Colab의 마운트 한 후, content 폴더에 저장됨)

 

%cd /content/pgvector

 

 

- pgvector 설치

 

!make
!make install

 

 

- pgvector 확장 실행: CREATE EXTENSION vector;

  1. PostgreSQL 서버에 로그인
  2. PostgreSQL의 확장 벡터를 생성하려는 데이터베이스로 연결
  3. 확장 벡터를 생성하려는 데이터베이스에서 다음 명령어를 실행 

(데이터베이스 별로 최초 1회만 실행해주면 됩니다)

%%sql
CREATE EXTENSION IF NOT EXISTS vector;

 

 

 

참고로, pgvector 에서 제공하는 연산자는 아래의 6개가 있습니다. 사용자 질문과 문서 간 텍스트 임베딩에 대한 Semantic Search 에 코사인 유사도 (1 - Cosine Distance) 가 많이 사용됩니다. 

 

연산자 (operator) 설명 (description)
+ 요소 별 더하기 (Element-wise Addition)
- 요소 별 빼기 (Element-wise Subtraction)
* 요소 별 곱하기 (Element-wise Multiplication)
<-> 유클리드 거리 (Euclidean Distance)
<#> 음의 내적 (Negative Inner Product)
<=> 코사인 거리 (Cosine Distance)

 

 

[ Reference ]

* Postres pgvector: https://github.com/pgvector/pgvector

* Vector Indexes in Postgres using pgvector: IVFFlat vs. HNSW: 
https://tembo.io/blog/vector-indexes-in-pgvector/

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,
[알림] * 본 포스팅 글은 Ahmed Rachid Hazourli (Greenplum Data Engineer in VMware) 가 medium.com 에 2023.5.29일에 "Building large-scale AI-powered search in Greenplum using pgvector and OpenAI"
 라는 제목으로 포스팅한 글을 저자의 동의를 얻어서 한국어로 번역한 것입니다.

 

 

Greenplum의 pgvector와 OpenAI를 이용하여 대규모 AI 기반 검색 구축하기
(Building large-scale AI-powered search in Greenplum using pgvector and OpenAI)

 

[들어가는 글]

지난 몇 년간 ChatGPT와 같은 AI 모델의 기하급수적인 발전은 많은 조직이 생성 AI(Generative AI) 및 LLM(Large Language Model)을 출시하여 사용자 경험을 향상시키고 텍스트에서 이미지, 비디오에 이르기까지 비정형 데이터의 잠재력을 최대한 활용하도록 영감을 주었습니다.

이 블로그 글에서는 Greenplum 데이터 웨어하우스 내에서 pgvector 확장의 벡터 유사성 검색(vector similarity search) 기능을 활용하고 이를 OpenAI 모델과 결합하여 페타바이트급 대규모 텍스트 데이터에서 귀중한 통찰력을 추출하고 Greenplum의 놀라운 MPP 아키텍처(Massively Parallel Processing Architecture)를 활용하는 방법에 대해 알아보겠습니다.

 

large-scale AI-powered search using Greenplum pgvector and OpenAI

 

 

 

도입 (Introduction): 


기업들은 AI를 위해 데이터 플랫폼을 확장하고 챗봇, 추천 시스템 또는 검색 엔진에 대용량 언어 모델을 사용할 수 있는 기술과 방법을 찾기 시작했습니다. 

그러나 한 가지 구체적인 과제는 이러한 AI 모델을 관리 및 배포하고 ML 생성 임베딩(ML-generated embeddings)을 규모있게 저장 및 쿼리하는 것이었습니다.

 


임베딩이란 무엇입니까? (What are embeddings?)


임베딩(Embeddings)은 데이터 또는 텍스트, 이미지 또는 오디오와 같은 복잡한 객체를 고차원 공간의 숫자들의 리스트로 변환하는 것을 말합니다.

Embedding model, 임베딩 모델

* 이미지 출처: OpenAI

 

 

 

 

이 기술은 데이터의 의미와 맥락(의미론적 관계, semantic relationships) 및 데이터 내의 복잡한 관계와 패턴(구문론적 관계, syntactic relationship)에 대한 지식을 캡처/이해할 수 있게 해주는 모든 기계학습(ML) 또는 딥러닝(DL) 알고리듬에 사용됩니다.

 

Embedding Model, 임베딩 모델

* 이미지 출처: https://www.pinecone.io/learn/vector-embeddings/

 

 

 

정보 검색, 이미지 분류, 자연어 처리 등 다양한 애플리케이션에 대해 벡터 표현(vector representations) 결과를 사용할 수 있습니다. 

Object - Vector - Task

* 이미지 출처: https://dev.to/josethz00/vector-databases-5df1

 

 

 

 

다음 다이어그램은 2D 공간에서 단어 임베딩(woed embeddings in 2D space)이 어떻게 보여지는지를 시각적으로 나타냅니다.

word embedding in 2D space

* 이미지 출처: https://neon.tech/blog/building-an-ai-powered-chatbot-using-vercel-openai-and-postgres


의미론적으로 유사한 단어들이 임베딩에서 서로 가까이 있다는 것을 알 수 있습니다. 예를 들어, "사과"라는 단어는 "개"나 "고양이"보다 "오렌지"에 더 가깝습니다.

임베딩을 생성한 후, 회사는 벡터 공간 내에서 유사성 검색(similarity searches)을 수행하고 제품 추천 시스템과 같은 AI 애플리케이션을 구축할 수 있습니다. 

 

 

 

pgvector를 사용하여 Greenplum에 임베딩 저장하기
(Storing embeddings in Greeplum using pgvector)

 

Greenplum 7은 pgvector 확장(pgvector extension) 덕분에 벡터 임베딩을 대규모로 저장하고 쿼리할 준비가 잘 되어 있습니다. 이를 통해 Greenplum 데이터 웨어하우스에 벡터 데이터베이스(vector database) 기능을 제공하여 사용자가 빠른 검색과 효율적인 유사성 검색을 수행할 수 있습니다. 

 

Greenplum의 pgvector 를 사용하여 ML 지원 응용프로그램에 대한 데이터베이스를 설정, 운영 및 확장할 수 있습니다.

예를 들어, 스트리밍 서비스는 pgvector를 사용하여 방금 본 것과 유사한 영화 추천 목록을 제공할 수 있습니다.

 

movie recommendations using embeddings

 

 

 

왜 Greenplum 이고 pgvector 인가? 

 

많은 기업이 다른 벡터 데이터베이스를 관리하지 않고도 엔터프라이즈 데이터 웨어하우스 내에서 벡터 의미 검색(vector semantic searches)을 저장, 쿼리 및 수행하려고 합니다.

다행히 Greenplum과 pgvector를 결합하면 AI 모델의 임베딩을 사용하여 빠르고 확장 가능한 애플리케이션을 구축하고 더 빨리 운영에 들어갈 수 있습니다. 

 

 

 

 

pgvector와 OpenAI를 사용하여 Greenplum에서 제품 설명서에 사용할 AI-Assistant를 구축하기.

 


문맥: 

우리 모두는 이전에 ChatGPT와 같은 챗봇을 사용한 적이 있으며 캐주얼하고 범용적인 질문에 적합하다는 것을 알았습니다. 하지만, 깊고 도메인별 지식이 필요할 때 ChatGPT는 부족하다는 것을 알아차렸을 수도 있습니다. 또한, 그것은 지식의 격차를 메우기 위해 답을 만들고 결코 출처를 언급하지 않습니다.

하지만 어떻게 이것을 개선할 수 있을까요? 적합한 데이터 소스를 정확하게 검색하고 질문에 답변하는 ChatGPT를 구축하려면 어떻게 해야 할까요?

 


답변:

이 질문에 대한 대답은 제품 설명서를 검색 가능하게 만들고 작업별 프롬프트를 OpenAI에 제공하면 결과가 더 신뢰할 수 있다는 것입니다. 즉, 사용자가 질문할 때 Greenplum 테이블에서 적합한 데이터 세트를 검색하도록 pgvector에게 요청합니다. 그런 다음 사용자의 질문에 답변하기 위한 참조 문서(reference document)로 OpenAI에 제공합니다. 

 

 

 

실제 임베딩 적용하기:

이 섹션에서는 임베딩을 실제 적용한 모습을 살펴보고, 임베딩 저장을 용이하게 하고 벡터의 가장 가까운 이웃에 대한 쿼리를 가능하게 하는 Greenplum에 대한 오픈 소스 pgvector 확장을 사용하는 방법을 배울 것입니다.

다음 그림과 같이 OpenAI를 사용하여 지능형 챗봇을 구축하고 시맨틱 텍스트 검색을 통해 Greenplum, RabbitMQ, Gemfire, VMware SQL 및 VMware Data Service Manager에 대한 자세한 기술적 질문에 답변할 수 있는 VMware 데이터 솔루션에 대한 도메인별 지식을 얻을 수 있도록 지원함으로써 이 기능을 시연합니다:

 

Greenplum Database, RabbitMQ, Gemfire, OpenAI ChatGPT

 

 

주요 절차는 다음과 같습니다. 

 

main steps of using Greenplum and OpenAI

 

 

 

1. pgvector extension 을 설치하고 활성화합니다.

 

pgvector 를 설치한 후에 Greenplum에서 벡터 임베딩의 저장을 시작하고 다음과 같이 pgvector 실행을 활성화하여 의미 검색(semantic searches)을 수행할 수 있습니다:

CREATE EXTENSION vector;

 

 

 

2. VECTOR 데이터 유형으로 제품 설명서 테이블 만들기

 

다음 SQL 쿼리로 제품 설명서와 임베딩을 저장할 테이블을 만들어 보겠습니다:

 

CREATE TABLE tanzu_documents (
  id bigserial primary key,
  content text,
  embedding vector(1536)
)
DISTRIBUTED BY (id)
;

 

pgvector는 벡터(VECTOR data-type)라고 불리는 새로운 데이터 유형을 도입합니다. 우리는 위의 쿼리 코드에서 벡터 데이터 유형으로 임베딩 열을 만들었습니다. 벡터의 크기는 벡터가 얼마나 많은 차원을 보유하는지 정의합니다. OpenAI의 text—embedding-ada-002 모델은 1,536개의 차원을 출력하므로 벡터 크기에 사용할 것입니다.

이 게시물에서 OpenAI API를 사용하고 있으므로 다음을 실행하는 모든 Greenplum 호스트에 openai 패키지를 설치합니다:

 

gpssh -f gphostsfile -e 'pip3 install -y openai'

 

또한 이 임베딩을 생성한 원본 제품 설명서 텍스트를 저장하기 위해 content 라는 text 열을 만듭니다.

 

참고: 위의 table은 Greenplum 세그먼트에 걸쳐 "id" 열을 기준으로 분산 저장(distributed by the “id”)되며, pgvector extension은 Greenplum 기능과 완벽하게 작동합니다. 따라서 분산저장에서 파티셔닝에 이르기까지 Greenplum의 MPP(Massiviely Parallel Processing) 기능에 대량의 데이터를 관리하고 검색하는 pgvector의 효율성을 추가하면 Greenplum 사용자는 확장 가능한 규모있는 AI 애플리케이션을 구축할 수 있습니다.

 

 

 

3. OpenAI 임베딩 가져오기 위한 Greenplum PL/Python 함수 

 

이제 문서에 대한 임베딩을 생성해야 합니다. 여기서는 OpenAI의 text-message-ada-002 모델 API를 사용하여 텍스트에서 임베딩을 생성합니다.

가장 좋은 방법은 Greenplum 데이터베이스 내에 PL/Python3u 절차적 언어(Procedural Language)를 사용하여 Python 함수를 생성하는 것입니다. 다음 Greenplum Python 함수는 각 입력 문서에 대한 벡터 임베딩(vector embeddings)을 반환합니다.

 

CREATE OR REPLACE FUNCTION get_embeddings(content text)
RETURNS VECTOR
AS
$$

  import openai
  import os
  text = content
  openai.api_key = os.getenv("OPENAI_API_KEY")
  response = openai.Embedding.create(
         model="text-embedding-ada-002",
         input = text.replace("\n"," ")
     )
  
  embedding = response['data'][0]['embedding']
  return embedding

$$ LANGUAGE PLPYTHON3U;

* 참고: OpenAI API key 생성하는 방법은 https://rfriend.tistory.com/794 를 참고하세요. 

 

 

 

4. Greenplum 테이블에 데이터 넣기


원본 텍스트를 tanzu_documents 테이블, 특히 content 열에 로딩한 다음, embedding 열을 업데이트하고 이전에 생성된 get_messages Python 함수를 사용하여 모든 컨텐츠에 대해 OpenAI 임베딩을 생성합니다:

 

UPDATE tanzu_documents SET embedding  = get_embeddings(content);

 

 

 

5. 첫 번째 의미론적 검색 (Semantic Search) 질의


pgvector의 코사인 거리를 사용하여 (<=> 연산자를 사용하여) 첫 번째 의미 검색 쿼리를 만들고, 질문과 가장 유사한 텍스트(즉, 최소 거리를 가진 텍스트)를 찾아보겠습니다: Greenplum 설치 방법? (How to install Greenplum? )

 

WITH cte_question_embedding AS 
  (
SELECT 
	get_embeddings(
    	'How to create an external table in Greenplum 
         using PXF to read from an Oracle database ?'
         ) 
        AS question_embeddings 
) 

SELECT 
	id
    , content
    , embedding <=> cte_question_embedding.question_embeddings AS distance 
FROM tanzu_documents, cte_question_embedding  
ORDER BY embedding <=> cte_question_embedding.question_embeddings ASC 
LIMIT 1;

 

pgvector는 유사성을 계산하는 데 사용할 수 있는 세 가지 새로운 연산자를 소개합니다: 
 - (1) 유클리드 거리 (Euclidean distance)(L2 거리): <->, 
 - (2) 음의 내적 (Negative inner product): <#>, 
 - (3) 코사인 거리 (Cosine distance): <=> 

* 참고: 유클리드 거리 (Euclidean distance)는 https://rfriend.tistory.com/199 를 참고하세요.

            코사인 거리 (Cosine distance)는 https://rfriend.tistory.com/319 를 참고하세요. 

 

 

 

SELECT 문은 다음 출력을 반환해야 합니다:

 

id       | 640
content  | title: Accessing External Data with PXF 
           -- Data managed by your organisation may already reside in external sources
           -- such as Hadoop, object stores, and other SQL databases. 
           -- The Greenplum Platform Extension Framework \(PXF\) provides access 
           -- to this external data via built-in connectors that map an external 
           -- data source to a Greenplum Database table definition. 
           -- PXF is installed with Hadoop and Object Storage connectors. 
           -- These connectors enable you to read external data stored in text, 
           -- Avro, JSON, RCFile, Parquet, SequenceFile, and ORC formats. 
           -- You can use the JDBC connector to access an external SQL database. 
           -- > **Note** In previous versions of the Greenplum Database, 
           -- you may have used the `gphdfs` external table protocol to access 
           -- data stored in Hadoop. Greenplum Database version 6.0.0 
           -- removes the `gphdfs` protocol. Use PXF and the `pxf` external table 
           -- protocol to access Hadoop in Greenplum Database version 6.x. 
           -- The Greenplum Platform Extension Framework includes 
           -- a C-language extension and a Java service. 
           -- After configuring and initialising PXF, you start a single 
           -- PXF JVM process on each Greenplum Database segment host. 
           -- This long-running process concurrently serves multiple query requests. 
           -- For detailed information about the architecture of and using PXF, 
           -- refer to the [Greenplum Platform Extension Framework \(PXF\)]
           -- (https://docs.vmware.com/en/VMware-Greenplum-Platform-Extension-Framework
           -- /6.6/greenplum-platform-extension-framework/overview_pxf.html) documentation. 
           -- **Parent topic:** [Working with External Data]
           -- (../external/g-working-with-file-based-ext-tables.html) **Parent topic:** 
           -- [Loading and Unloading Data](../load/topics/g-loading-and-unloading-data.html)
distance | 0.12006528354516588

 

 

 

6. 유사성 검색 SQL 함수:


많은 임베딩에 대해 유사성 검색을 수행할 예정이기 때문에, 이를 위한 SQL 사용자 정의 함수를 생성합니다:

CREATE OR REPLACE FUNCTION match_documents (
  query_embedding VECTOR(1536),
  match_threshold FLOAT,
  match_count INT
)

RETURNS TABLE (
  id BIGINT,
  content TEXT,
  similarity FLOAT
)

AS $$

  SELECT
    documents.id,
    documents.content,
    1 - (documents.embedding <=> query_embedding) AS similarity
  FROM tanzu_documents documents
  WHERE 1 - (documents.embedding <=> query_embedding) > match_threshold
  ORDER BY similarity DESC
  LIMIT match_count;

$$ LANGUAGE SQL STABLE;

 

 


위에서 정의한 match_documents 함수를 사용하여 다음과 같이 가장 유사한 텍스트를 OpenAI 모델에 제공합니다:

 

SELECT t.id, t.content, t.similarity
  FROM match_documents(
      (select get_embeddings(
          'How to create an external table in Greenplum using PXF 
           to read from an Oracle database ?')) 
      , 0.8
      , 1) t
;
id         | 640
content    | title: Accessing External Data with PXF 
	-- Data managed by your organisation may already reside in external sources 
    -- such as Hadoop, object stores, and other SQL databases. 
    -- The Greenplum Platform Extension Framework \(PXF\) provides access 
    -- to this external data via built-in connectors 
    -- that map an external data source to a Greenplum Database table definition. 
    -- PXF is installed with Hadoop and Object Storage connectors. 
    -- These connectors enable you to read external data stored in text, Avro, 
    -- JSON, RCFile, Parquet, SequenceFile, and ORC formats. 
    -- You can use the JDBC connector to access an external SQL database. 
    -- > **Note** In previous versions of the Greenplum Database, 
    -- you may have used the `gphdfs` external table protocol to access data 
    -- stored in Hadoop. Greenplum Database version 6.0.0 removes the `gphdfs` protocol. 
    -- Use PXF and the `pxf` external table protocol to access Hadoop in 
    -- Greenplum Database version 6.x. 
    -- The Greenplum Platform Extension Framework includes a C-language extension 
    -- and a Java service. After configuring and initialising PXF, 
    -- you start a single PXF JVM process on each Greenplum Database segment host. 
    -- This long-running process concurrently serves multiple query requests. 
    -- For detailed information about the architecture of and using PXF, 
    -- refer to the [Greenplum Platform Extension Framework \(PXF\)]
    -- (https://docs.vmware.com/en/VMware-Greenplum-Platform-Extension-Framework/6.6/
    -- greenplum-platform-extension-framework/overview_pxf.html) documentation. 
    -- **Parent topic:** [Working with External Data](../external/g-working-with-file-based-ext-tables.html) 
    -- **Parent topic:** [Loading and Unloading Data](../load/topics/g-loading-and-unloading-data.html)
similarity | 0.8775289173395486

 

 

 

7. 벡터 인덱싱 (Vectors Indexing):

 

우리의 테이블은 임베딩과 함께 시간이 지남에 따라 커질 수 있으며, 수십억 개의 벡터에 걸쳐 의미 검색을 수행하기를 원할 것입니다.

pgvector의 뛰어난 점은 쿼리 속도를 높이고 검색 속도를 높일 수 있는 인덱싱(Indexing) 기능입니다.

벡터 인덱스는 정확한 최근접이웃 검색(ANN/KNN, Nearest Neighbour)을 수행합니다. 벡터는 유사성에 따라 그룹화되지 않으므로 순차적 검색(sequential scan)을 통해 가장 가까운 이웃을 찾는 작업은 느리며, 유사성에 따라 정렬을 빠르게 하는 것이 중요합니다(ORDER BY 절). 

각 거리 연산자에는 서로 다른 유형의 인덱스가 필요합니다. 시작할 때 적절한 수의 lists 는 1백만개 행까지는 1,000개, 1백만개 이상의 경우 sqrt(행) 개입니다. 코사인 거리로 정렬하기 때문에 vector_cosine_ops 인덱스를 사용합니다. 

 

-- Create a Vector Index 
CREATE INDEX ON tanzu_documents 
USING ivfflat (embedding vector_cosine_ops)
WITH
  (lists = 300);

-- Analyze table
ANALYZE tanzu_documents;

 

pgvector 인덱싱에 대한 자세한 내용은 여기에서 확인하십시오.
https://github.com/pgvector/pgvector#indexing

 

 

 

8. 관련 답변에 적합한 데이터 세트를 OpenAI 모델에 제공합니다. 


사용자의 인풋과 사용자 인풋에 가장 유사한 텍스트 둘 다를 인풋으로 사용해서 OPenAI 모델에게 답하도록 질문하는 PL/Python 함수를 정의합니다.  

CREATE FUNCTION ask_openai(user_input text, document text)
RETURNS TEXT
AS
$$

   import openai
   import os

   openai.api_key = os.getenv("OPENAI_API_KEY")
   search_string = user_input
   docs_text = document

   messages = [
   	{"role": "system",
    "content": "You concisely answer questions based on text provided to you."}
    ]

   prompt = """Answer the user's prompt or question:

   {search_string}

   by summarising the following text:

   {docs_text}

   Keep your answer direct and concise. Provide code snippets where applicable.
   The question is about a Greenplum / PostgreSQL database. 
   You can enrich the answer with other Greenplum or PostgreSQL-relevant details 
   if applicable.""".format(
   		search_string=search_string, 
        docs_text=docs_text
        )

   messages.append({"role": "user", "content": prompt})

   response = openai.ChatCompletion.create(
   		model="gpt-3.5-turbo", 
        messages=messages
        )
   
   return response.choices[0]["message"]["content"]

$$ LANGUAGE PLPYTHON3U;

 

 

 

9. 더 똑똑한 검색 기능 만들기

 

앞서 언급했듯이, ChatGPT는 기존의 문서만 반환하지 않습니다. ChatGPT는 다양한 정보를 이해해서 하나의 응집력있는 대답으로 만들 수 있습니다. 이를 위해 GPT에 관련 문서와 이 답변을 생성하는 데 사용할 수 있는 프롬프트를 제공해야 합니다.

마지막 단계로, 우리는 지능형 AI-Assistant 애플리케이션을 서비스하기 위해 이전 기능을 단일 프로세스로 결합해야 합니다. 

이전 기능과 임베딩은 프롬프트를 2단계 프로세스로 분할하여 이 문제를 해결할 수 있습니다: 

  1. 임베딩 데이터베이스에 질문과 가장 관련성이 높은 문서를 조회합니다.
  2. 이러한 문서를 OpenAI 모델이 답변에서 참조할 컨텍스트로 삽입합니다.

CREATE OR REPLACE FUNCTION intelligent_ai_assistant(
  user_input TEXT
)

RETURNS TABLE (
  content TEXT
)
LANGUAGE SQL STABLE
AS $$

  SELECT
    ask_openai(user_input, 
              (SELECT t.content 
                FROM match_documents(
                      (SELECT get_embeddings(user_input)) , 
                        0.8, 
                        1) t
                )
    );

$$;

 

위의 SQL 함수는 사용자 입력을 가져다가 임베딩으로 변환하고, tanzu_documents 테이블에 대해 pgvector를 사용하여 의미론적 텍스트 검색을 수행하여 가장 관련성이 높은 문서를 찾고, 마지막으로 이를 OpenAI API 호출에 대한 참조 텍스트로 제공하여 최종 답변을 반환합니다.

 

 

 

10. OpenAI 및 Streamlit 🎈를 활용한 시맨틱 텍스트 검색 기능으로 강화된 자체 챗봇 구축 🤖

 

마지막으로, 우리는 문서를 이해하고 pgvector 시맨틱 텍스트 검색과 함께 Greenplum 데이터 웨어하우스를 사용하는 Streamlit 🎈 챗봇 🤖를 개발했습니다.

챗봇 스트림릿 애플리케이션은 https://greenplum-pgvector-chatbot.streamlit.app/에서 이용할 수 있습니다. 

 

streamlit chatbot

 

소스 코드는 https://github.com/ahmedrachid/streamlit-chatbot-greenplum 에서 확인할 수 있습니다

 

🚀 결론

결론적으로, 확장 가능한 AI 애플리케이션을 구축하고자 하는 기업은 Greenplum 의 대규모 병렬 처리 기능 및 성능을 pgvector 연산과 결합함으로써, 방대한 양의 벡터 임베딩 및 비정형 데이터에 대해 빠른 검색, 유사성 및 의미 검색을 수행할 수 있습니다.



참조 (References): 

1. Open-source Greenplum data warehouse
   : https://greenplum.org/
2. VMware Greenplum data warehouse
   : https://docs.vmware.com/en/VMware-Tanzu-Greenplum/index.html
3. pgvector extension - Open-source vector similarity search for Postgres
   : https://github.com/pgvector/pgvector

 

 

읽어주셔서 감사합니다! 어떠한 의견이나 제안도 환영합니다!
여기에서 다른 Greenplum 기사를 확인하십시오.

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum DB에서 Window 함수를 사용해서 그룹별로 요약 통계량을 구하고, 이 통계량을 이용해서 새로운 변수를 만드는 방법을 소개하겠습니다. 

 

(1) 전체 평균: MAX(value) OVER(PARTITION BY NULL) 을 계산해서 새로운 변수 만들기

(2) 그룹별 평균: MAX(value) OVER(PARTITION BY group_col) 을 계산해서 새로운 변수 만들기

 

 

먼저, 예제로 사용할 테이블을 만들어보겠습니다. 

 

-------------------------------------------------------------------
-- Aggregation by groupby using Window Function
-------------------------------------------------------------------

-- creating a sample table
DROP TABLE IF EXISTS tbl;
CREATE TABLE tbl (
	grp TEXT 
	, id INT
	, val INT
);

INSERT INTO tbl VALUES 
  ('a', 1, 4)
, ('a', 2, 1)
, ('a', 3, 3)
, ('a', 4, 5)
, ('a', 5, 2)
, ('b', 6, 7)
, ('b', 7, 5)
, ('b', 8, 8)
, ('b', 9, 10)
, ('b', 10, 9)
;

SELECT * FROM tbl ORDER BY 1, 2;
--grp|id|val|
-----+--+---+
--a  | 1|  4|
--a  | 2|  1|
--a  | 3|  3|
--a  | 4|  5|
--a  | 5|  2|
--b  | 6|  7|
--b  | 7|  5|
--b  | 8|  8|
--b  | 9| 10|
--b  |10|  9|

 

 

 

(1) 전체 평균: MAX(value) OVER (PARTITION BY NULL) 을 계산해서 새로운 변수 만들기

 

아래 예에서는 그룹별 구분없이 전체 최대값을 계산해서 MAX(val) OVER(PARTITION BY NULL) 해서 원래 값을 나누어주어서 새로운 변수 val_max_ration 를 만들어주었습니다. 

 

-- MAX value for all values using Window Function OVER(PARTITION BY NULL)
SELECT 
	a.*
	, val::NUMERIC / MAX(val) OVER(PARTITION BY NULL) 
		AS val_max_ratio 
FROM tbl AS a
ORDER BY 1, 2
;

--grp|id|val|val_max_ratio         |
-----+--+---+----------------------+
--a  | 1|  4|0.40000000000000000000|
--a  | 2|  1|0.10000000000000000000|
--a  | 3|  3|0.30000000000000000000|
--a  | 4|  5|0.50000000000000000000|
--a  | 5|  2|0.20000000000000000000|
--b  | 6|  7|0.70000000000000000000|
--b  | 7|  5|0.50000000000000000000|
--b  | 8|  8|0.80000000000000000000|
--b  | 9| 10|1.00000000000000000000|
--b  |10|  9|0.90000000000000000000|

 

 

 

(2) 그룹별 평균: MAX(value) OVER(PARTITION BY group_col) 을 계산해서 새로운 변수 만들기

 

아래의 예에서는 MAX(val) OVER(PARTITION BY grp)그룹별 최대값을 구해서 원래 값을 나누어줌으로써 그룹별 최대값 대비 값의 비율(val_grp_max_ration) 이라는 새로운 변수를 만들었습니다. 

 

-- MAX value by Group using Window Function OVER(PARTITION BY grp)
SELECT 
	a.*
	, val::NUMERIC / MAX(val) OVER(PARTITION BY grp) 
		AS val_grp_max_ratio 
FROM tbl AS a
ORDER BY 1, 2
;

--grp|id|val|val_grp_max_ratio         |
-----+--+---+----------------------+
--a  | 1|  4|0.80000000000000000000|
--a  | 2|  1|0.20000000000000000000|
--a  | 3|  3|0.60000000000000000000|
--a  | 4|  5|1.00000000000000000000|
--a  | 5|  2|0.40000000000000000000|
--b  | 6|  7|0.70000000000000000000|
--b  | 7|  5|0.50000000000000000000|
--b  | 8|  8|0.80000000000000000000|
--b  | 9| 10|1.00000000000000000000|
--b  |10|  9|0.90000000000000000000|

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum database에서 Array의 특정 위치의 값을 가져오는 방법, 특히 PL/Python 사용자 정의 함수를 이용해서 Array의 여러개 위치에서 값을 가져오는 방법을 소개하겠습니다. 

 

PostgreSQL, Greenplum database 에서 

(1) Array 에서 특정 위치의 하나의 원소 값 가져오기

(2) Array 에서 시작~끝 위치의 원소 값을 Slicing 해서 가져오기

(3) Array 에서 여러개 위치의 원소 값을 PL/Python 사용자 정의함수를 사용해서 가져오기

 

 

How to get multiple elements in an Array using PL/Python on PostgreSQL, Greenplum database

 

 

 

먼저 Array를 포함하는 예제로 사용할 간단할 테이블을 만들어보겠습니다. 

 

-- creating a sample table with array column
DROP TABLE IF EXISTS arr_sample_tbl;
CREATE TABLE arr_sample_tbl (
	grp TEXT
	, x_arr iNTEGER[]
);

INSERT INTO arr_sample_tbl VALUES 
('a',  ARRAY[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
, ('b', ARRAY[11, 12, 13, 14, 15, 16, 17, 18, 19, 20])
, ('c', ARRAY[21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
, ('d', ARRAY[31, 32, 33, 34, 35, 36, 37, 38, 39, 40])
;

SELECT * FROM arr_sample_tbl ORDER BY grp;
--grp|x_arr                          |
-----+-------------------------------+
--a  |{1,2,3,4,5,6,7,8,9,10}         |
--b  |{11,12,13,14,15,16,17,18,19,20}|
--c  |{21,22,23,24,25,26,27,28,29,30}|
--d  |{31,32,33,34,35,36,37,38,39,40}|

 

 

 

(1) Array 에서 특정 위치의 하나의 원소 값 가져오기

 

PostgreSQL/ Greenplum Array 칼럼에 array_column[position::int] 처럼 [ ] 안에 위치 값을 하나의 정수로 넣어주면 됩니다. 아래 예에서는 x_arr Array 칼럼의 3번째 위치한 값을 가져와 봤습니다. 

 

-- getting a value in an array using postion
SELECT 
	grp
	, x_arr[3] -- POSITION INDEX ok
FROM arr_sample_tbl 
ORDER BY grp
;
--grp|x_arr|
-----+-----+
--a  |    3|
--b  |   13|
--c  |   23|
--d  |   33|

 

 

 

(2) Array 에서 시작~끝 위치의 원소 값을 Slicing 해서 가져오기

 

PostgreSQL/ Greenplum Array 칼럼에서 array_column[start_position:end_position] 구문으로 "시작 위치 ~ 끝 위치" 까지의 연속된 원소 값들을 Slicing 해올 수 있습니다. 아래 예제에서는 x_arr 의 Array 칼럼에서 1번째부터 3번째 위치까지의 연속된 원소값들을 Slicing 해왔습니다. 

 

-- slicing from start position to end position
SELECT 
	grp
	, x_arr[1:3] -- SLICING ok
FROM arr_sample_tbl 
ORDER BY grp
;
--grp|x_arr     |
-----+----------+
--a  |{1,2,3}   |
--b  |{11,12,13}|
--c  |{21,22,23}|
--d  |{31,32,33}|

 

 

 

(3) Array 에서 여러개 위치의 원소 값을 PL/Python 사용자 정의함수를 사용해서 가져오기

 

위의 (2)번에서는 Array에서 연속된 (시작 위치 ~ 끝 위치) 까지의 값을 Slicing 해왔는데요, 만약 연속된 위치의 여러개 값이 아니라 Array 안에 띄엄띄엄 있는 여러개의 원소 값들을 위치로 가져오고 싶을 때는 SQL syntax error 가 발생합니다. 

 

-- SQL Error [42601]: ERROR: syntax error at or near ","
SELECT 
	grp
	, x_arr[1, 4, 6, 9]  -- SQL Error -_-;;;
FROM arr_sample_tbl 
ORDER BY grp
;
--SQL Error [42601]: ERROR: syntax error at or near ","
--  Position: 24
--
--Error position: line: 528 pos: 23

 

 

PostgreSQL, Greenplum DB에서 Array 내에 띄엄띄엄 있는 여러개 위치의 원소값을 가져오고 싶을 때 아래의 PL/Python 사용자 정의함수를 사용해보세요. 참고로, Input으로 집어넣는 DB col_position 위치 값은 1부터 시작하는 반면, PL/Python의 내부 코드블록에서는 Python array의 위치 index가 0 부터 시작하기 때문에 np.array(col_postion) - 1 처럼 1을 빼줬습니다. 

아래 예제에서는 x_arr 칼럼의 1, 4, 6, 9 번째 위치에 있는 Array 복수개 원소들을 복수의 위치값을 사용해서 가져왔습니다. Greenplum 을 사용하면 분산병렬처리가 되기 때문에 PL/Python 사용자 정의함수의 처리 속도도 무척 빠르답니다! 

 

-- UDF: selecting multiple elements using PL/Python in PostgreSQL, Greenplum
DROP FUNCTION IF EXISTS plpy_arr_multi_idx_select(int[], int[]);
CREATE OR REPLACE FUNCTION plpy_arr_multi_idx_select(
          x_arr int[], col_position int[]
          ) 
RETURNS int[]
AS $$
    import numpy as np
    
    # INPUT ARRAY
    arr = np.array(x_arr)
    
    # COLUMN POSITION 
    # Python starts from 0 index. (vs. SQL starts from 1)
    # , so -1 from col_position
    col_position_arr = np.array(col_position) - 1
    
    # getting data by positional indexing
    arr_selected = arr[col_position_arr]
    
    return arr_selected

$$ LANGUAGE 'plpythonu';


-- executing the PL/Python UDF above
-- getting multiple values in an array 
-- using PL/Python UDF in PostgreSQL, Greenplum DB
SELECT 
	grp
	, plpy_arr_multi_idx_select( -- PL/Python USER DEFINED Funtion
		x_arr                -- INPUT ARRAY
		, ARRAY[1, 4, 6, 9]  -- COLUMN POSITION
	) AS selected_values
FROM arr_sample_tbl 
ORDER BY grp
;

--grp|selected_values|
-----+---------------+
--a  |{1,4,6,9}      |
--b  |{11,14,16,19}  |
--c  |{21,24,26,29}  |
--d  |{31,34,36,39}  |

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum Database에서 문자열에 대한 함수 3가지를 소개하겠습니다. 

 

(1) STRING_AGG() : 개별 문자열을 그룹별로 하나의 문자열로 합치기

     (concatenate individual strings into a string by group) 

(2) STRING_TO_ARRAY() : 문자열을 구분자를 기준으로 나누어서 여러개의 원소를 가진 Array 로 만들기

      (splits a string into array elements using supplied delimiter)

(3) UNNEST(STRING_TO_ARRAY()) : 문자열 원소를 가진 ARRAY를 나누어서 개별 문자열 행으로 풀기

       (the opposite of STRING_AGG(), splits array elements and unnest it into multiple rows)

 

 

concatenating strings into a string, unnest string array into multiple rows in PostgreSQL, Greenplum

 

 

예제로 사용할 country 테이블을 만들어보겠습니다. 

 

---------------------------------------------------------------------------------------------
-- String functions and operators in PostgreSQL, Greenplum Database
-- string_agg(), string_to_array(), unnest(string_to_array())
----------------------------------------------------------------------------------------------

DROP TABLE IF EXISTS country;
CREATE TABLE country (
	continent  TEXT
	, country TEXT 
);
INSERT INTO country VALUES 
('Asia', 'Korea')
, ('Asia', 'China')
, ('Asia', 'Japan')
, ('Ameria', 'USA')
, ('Ameria', 'Canada')
, ('Ameria', 'Mexico')
, ('Europe', 'UK')
, ('Europe', 'Fance')
, ('Europe', 'Germany');


SELECT * FROM country ORDER BY 1, 2;
--continent|country|
-----------+-------+
--Ameria   |Canada |
--Ameria   |Mexico |
--Ameria   |USA    |
--Asia     |China  |
--Asia     |Japan  |
--Asia     |Korea  |
--Europe   |Fance  |
--Europe   |Germany|
--Europe   |UK     |

 

 

 

(1) STRING_AGG() : 개별 문자열을 그룹별로 하나의 문자열로 합치기

     (concatenate individual strings into a string by group) 

 

-- (1) string_agg()
-- : non-null input values concatenated into a string, separated by delimiter
-- syntax: string_agg(expression, delimiter [order by])
DROP TABLE IF EXISTS country_agg_tbl;
CREATE TABLE country_agg_tbl AS (
	SELECT 
		continent
		, STRING_AGG(country, ',') AS country_agg
	FROM country 
	GROUP BY continent
); 

SELECT * FROM country_agg_tbl ORDER BY 1, 2;
--continent|country_agg      |
-----------+-----------------+
--Ameria   |USA,Canada,Mexico|
--Asia     |Korea,China,Japan|
--Europe   |UK,Fance,Germany |

 

 

 

(2) STRING_TO_ARRAY() : 문자열을 구분자를 기준으로 나누어서 여러개의 원소를 가진 Array 로 만들기

      (splits a string into array elements using supplied delimiter)

 

-- (2) string_to_array()
-- : splits string into array elements using supplied delimiter and optional null string
-- syntax: string_to_array(text, text [, text])


SELECT 
	continent
	, STRING_TO_ARRAY(country_agg, ',') AS country_array
FROM country_agg_tbl
ORDER BY 1;

--continent|country_array      |
-----------+-------------------+
--Ameria   |{USA,Canada,Mexico}|
--Asia     |{Korea,China,Japan}|
--Europe   |{UK,Fance,Germany} |

 

 

옵션으로 세번째 매개변수 위치에 "NULL 처리할 문자열"을 지정할 수 있습니다. 아래 예에서는 'USA' 문자열에 대해서 NULL 처리하였습니다. 

 

-- NULL string optional
SELECT 
	continent
	, STRING_TO_ARRAY(
		country_agg -- string
		, ','       -- delimiter
		, 'USA' -- NULL string
		) AS country_array
FROM country_agg_tbl
ORDER BY 1;

--continent|country_array       |
-----------+--------------------+
--Ameria   |{NULL,Canada,Mexico}|   -- 'USA' --> NULL
--Asia     |{Korea,China,Japan} |
--Europe   |{UK,Fance,Germany}  |

 

 

 

(3) UNNEST(STRING_TO_ARRAY()) : 문자열 원소를 가진 ARRAY를 나누어서 개별 문자열 행으로 풀기

       (the opposite of STRING_AGG(), splits array elements and unnest it into multiple rows)

 

-- (3) unnest(string_to_array())
-- splits array elements and unnest it into multiple rows
-- the opposite of string_agg()

SELECT 
	continent
	, UNNEST(STRING_TO_ARRAY(country_agg, ',')) AS country
FROM country_agg_tbl
ORDER BY 1, 2;

--continent|country|
-----------+-------+
--Ameria   |Canada |
--Ameria   |Mexico |
--Ameria   |USA    |
--Asia     |China  |
--Asia     |Japan  |
--Asia     |Korea  |
--Europe   |Fance  |
--Europe   |Germany|
--Europe   |UK     |

 

[Reference]

* PostgreSQL's STRING_AGG() function
   : https://www.postgresql.org/docs/9.4/functions-aggregate.html 

* PostgreSQL's STRING_TO_ARRAY() function
   : https://www.postgresql.org/docs/9.1/functions-array.html

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 Python의 popen() 메소드PL/Python 사용자 정의 함수를 사용하여 

 

(1) Python 의 popen() 메소드 소개

(2) Greenplum DB에 설치된 Package 이름 리스트 확인

(3) Greenplum DB에 설치된 Docker Container Image 리스트 확인

(4) Greenplum DB에 등록된 PL/Container Runtime ID 확인

 

하는 방법을 소개하겠습니다. 

 

 

(1) Python 의 popen() 메소드 소개

 

Python 메서드 popen() 은 시스템 명령어(command) 실행 결과를 가져올 때 사용합니다. popen() 메소드의 반환 값은 파이프에 연결된 열린 파일 객체이며, 모드가 'r'(기본값) 이면 읽기모드이고, 'w' 이면 쓰기모드입니다. Buffering 크기도 정수로 설정할 수 있습니다. 

 

python os.popen() method

 

 

아래 예시는 Python의 os.popen() 메소드를 사용해 'ifconfig' command 를 실행하여 IP 주소를 확인해본 것입니다. 

 

import os

ifconfig = os.popen('ifconfig')
ifconfig_info = ifconfig.read()
ifconfig.close()

print(ifconfig_info)

# lo0: flags=xxxx<UP,LOOPBACK,...
# 	options=xxxx<RXCSUM,TXCSUM,...
# 	inet xxxxxxx netmask 0xff000000 
# 	inet6 ::xx prefixlen xxx 
# 	inet6 fe80::xxxx prefixlen xx scopeid xx 
# 	nd6 options=xxx<PERFORMNUD,DAD>
# gif0: flags=xxx<POINTOPOINT,MULTICAST> mtu 1280
# stf0: flags=0<> mtu xxxx
# en5: flags=xxx<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
# 	ether ac:de:48:00:11:22 
# 	inet6 fe80::aede:48ff:fe00:1122%en5 prefixlen xx scopeid 0x4 
# 	nd6 options=xxx<PERFORMNUD,DAD>
# 	media: autoselect (100baseTX <full-duplex>)
# 	status: active
# ap1: flags=xxxx<BROADCAST,SIMPLEX,MULTICAST> mtu xxxx
# 	options=xxx<CHANNEL_IO>
# 	ether aa:66:5a:20:77:d2 
# 	media: autoselect
# ...

 

 

 

(2) Greenplum DB에 설치된 Package 이름 리스트 확인

 

아래 코드는 os.popen() 메소드로 'gppkg -q --all' 시스템 명령어(command)를 실행해서 Greenplum DB 에 설치된 Package 이름 리스트를 확인하는 PL/Python 사용자 정의 함수입니다.  split() 후에 7번째 위치 이후의 값만 가져오면 됩니다. 

 

--- UDF for getting all package's list on Greenplum DB
DROP FUNCTION IF EXISTS plpy_gppkg_info_func();
CREATE OR REPLACE FUNCTION plpy_gppkg_info_func() 
RETURNS TABLE (LIST text) 
AS $$

	import os
	
	process = os.popen('gppkg -q --all')
	processed = process.read()
	process.close()
	
	# get only packages names 
	result = processed.split()[7:]
	
	return result

$$ LANGUAGE 'plpythonu';


-- Run the UDF above
SELECT plpy_gppkg_info_func() AS gppkg_info;
--gppkg_info                               |
------------------------------------------+
--DataSciencePython-2.0.5                 |
--DataScienceR-2.0.2                      |
--plr-3.0.4                               |
--plcontainer-2.1.5                       |
--pivotal_greenplum_backup_restore-1.25.0 |
--madlib-1.20.0_1                         |

 

 

 

 

(3) Greenplum DB에 설치된 Docker Container Image 리스트 확인

 

아래 코드는 os.popen() 메소드로 'plcontainer image-list' 시스템 명령어(command)를 실행해서 Greenplum DB 에 설치된 Docker Container Image 이름 리스트를 확인하는 PL/Python 사용자 정의 함수입니다.  줄바꿈 '\n' 구분자로 split() 하여서 결과값을 반환받았습니다. 

 

---- UDF for  displaying the installed Docker images on the local host use
DROP FUNCTION IF EXISTS plpy_container_imagelist_func();
CREATE OR REPLACE FUNCTION plpy_container_imagelist_func() 
RETURNS TABLE (LIST text) 
AS $$

	import os
	
	processed = os.popen('plcontainer image-list').read().split('\n')
	
	return processed

$$ LANGUAGE 'plpythonu';


-- Run the UDF above
SELECT plpy_container_imagelist_func() AS container_image_info;
--container_image_info                                                                    |
------------------------------------------------------------------------------------------+
--REPOSITORY                               TAG       IMAGE ID       CREATED         SIZE  |
--pivotaldata/plcontainer_python3_shared   devel     e4cda7f63158   20 months ago   6.71GB|
--pivotaldata/plcontainer_r_shared         devel     3a2472dec133   3 years ago     1.19GB|
--pivotaldata/plcontainer_python_shared    devel     c94546182146   3 years ago     2.34GB|
--                                                                                        |

 

 

(4) Greenplum DB에 등록된 PL/Container Runtime ID 확인

 

아래 코드는 os.popen() 메소드로 'plcontainer runtime-show' 시스템 명령어(command)를 실행해서 Greenplum DB 에 등록된 PL/Container Runtime ID 리스트를 확인하는 PL/Python 사용자 정의 함수입니다.  

'plcontainer runtime-add -r plc_python3_shared -i' 이런식으로 Docker Container Image 정보를 PL/Container configuration file 에 추가 등록해준 정보를 'plcontainer runtime-show' 로 확인할 수 있습니다. 

 

----- UDF for listing the names of the runtimes you created and added to the PL/Container XML file.
DROP FUNCTION IF EXISTS plpy_container_runtimeshow_func();
CREATE OR REPLACE FUNCTION plpy_container_runtimeshow_func() 
RETURNS TABLE (LIST text) 
AS $$

	import os
	
	processed = os.popen('plcontainer runtime-show').read().split('\n')
	
	return processed

$$ LANGUAGE 'plpythonu';


-- Run the UDF above
SELECT plpy_container_runtimeshow_func() AS container_runtimeshow_info;

--PL/Container Runtime Configuration: 
-----------------------------------------------------------
--  Runtime ID: plc_python3_shared
--  Linked Docker Image: pivotaldata/plcontainer_python3_shared:devel
--  Runtime Setting(s): 
--  Shared Directory: 
--  ---- Shared Directory From HOST '/usr/local/greenplum-db-6.22.1/bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
-----------------------------------------------------------
--
-----------------------------------------------------------
--  Runtime ID: plc_r_shared
--  Linked Docker Image: pivotaldata/plcontainer_r_shared:devel
--  Runtime Setting(s): 
--  Shared Directory: 
--  ---- Shared Directory From HOST '/usr/local/greenplum-db-6.22.1/bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
-----------------------------------------------------------
--
-----------------------------------------------------------
--  Runtime ID: plc_python_shared
--  Linked Docker Image: pivotaldata/plcontainer_python_shared:devel
--  Runtime Setting(s): 
--  Shared Directory: 
--  ---- Shared Directory From HOST '/usr/local/greenplum-db-6.22.1/bin/plcontainer_clients' to Container '/clientdir', access mode is 'ro'
-----------------------------------------------------------

 

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 SQL Query 의 실행 순서 (SQL Query order of execution) 에 대해서 알아보겠습니다. 

 

 

1. 왜 SQL Query 실행 순서를 알아야 하는가? (Why does SQL Query Order matter?)

SQL Query 처리 순서는 Query 구문이 평가되는 순서를 정의합니다. Query 의 각 부분은 순차적으로(sequentially) 실행이 되므로, 쿼리의 실행 순서를 이해해야지만 어느 부분에서 무슨 결과에 접근할 수 있는지를 알 수 있습니다. SQL Query 순서를 정확하게 이해하는 것은 자주 접하게 되는 Query 의 도전사항, 혹은 실행되지 않는 Query의 문제를 해결하는데 필수적입니다. 그리고 SQL Query 의 처리성능을 최적화하고 속도를 향상시키는데에도 Query 실행순서를 이해하는게 큰 기여를 할 수 있습니다.  

 

 

 

2. SQL Query 실행 순서 (SQL Query order of execution)

 

아래는 일반적인 SQL Query 의 구문입니다. 

 

--------------------
-- SQL Query Syntax
--------------------
SELECT DISTINCT column, AGG_FUNC(column_or_expression), …
FROM mytable
    JOIN another_table
      ON mytable.column = another_table.column
    WHERE constraint_expression
    GROUP BY column
    HAVING constraint_expression
    ORDER BY column ASC/DESC
    LIMIT count OFFSET COUNT;

 

 

SQL Query 의 실행 순서는 아래와 같습니다. 

 

순서 구문 기능
1   FROM + JOIN 대상이 되는 소스 테이블에서 데이터를 선택하고 Join 함.
2   WHERE 1번에서 선택되고 Join 된 데이터를 Where 조건절에 맞게 필터링함. 
3   GROUP BY Group by 절의 기준 별로 데이터가 집계(aggregate)됨. 
4   HAVING 집계된 데이터를 Having 조건절에 맞게 필터링함. 
5   SELECT 최종 데이터를 반환함. 
6   ORDER BY 최종 데이터를 Order by 절의 변수를 기준으로 정렬(sorting)함. 
7   LIMIT/ OFFSET 행의 개수만큼 반환되는 데이터의 수를 제한(limit)함. 

 

 

아래는 이미지는 SQL Query 처리 순서(by Brij Kishore Pandey, LinkedIn/Twitter @brijpandeyji)를 도식화한 것인데요, 직관적으로 알기 쉽게 풀어놓아서 인용해봅니다. 

 

SQL Queries run in this order, by Brij Kishore Pandey

 

 

(순서 1) FROM + JOIN 절

 

 SQL 의 FROM +JOIN 절은 Query 에서 제일 먼저 실행되는 부분입니다. 따라서 메모리를 많이 사용하는 두 개의 큰 테이블을 Join 하기 전에 대상 테이블의 크기를 제한하거나 미리 집계(pre-agregate tables)를 해놓는다면 성능의 향상을 기대할 수 있습니다.

많은 SQL planner 들은 다른 Query 들의 최적화를 돕기 위해 로직과 다른 유형의 Join을 사용합니다.  가령, 아래의 SQL Query 의 경우, SQL Planner 는 where 절에 있는 조건절인 (table_a 에서 '30 <= age < 40' 로 필터) 를 먼저 실행하고, 그 다음에 Join 을 하는 것이 성능에 유리하고, 결과는 표준 SQL 처리 순서와 동일하게 반환하게 됩니다. 

 

select
 count(*)
from
 table_a as a
join
 table_b as b
on
 a.id =  b.id
where
 a.age >= 30 and a.age < 40

 

PostgreSQL, Greenplum database 에서는 'EXPLAIN' 절을 사용해서 Query Plan 을 볼 수 있습니다. 

 

 

 

(순서 2) WHERE 절

 

WHERE 절은 테이블의 칼럼 값으로 선택되고 Join 된 테이블 데이터를 제한하는데 사용됩니다. Where 절의 조건을 만족하지 않는 개별 행은 제거됩니다. Where 절에는 숫자형, 문자형, 날짜형, 블리언 등 어떤 데이터 유형도 사용가능합니다. 

단, 대부분의 DB에서 Alias 는 사용할 수 없습니다. 

 

 

 

(순서 3) GROUP BY 절

 

GROUP BY 절은 데이터의 각 개별 값들을 GROUP BY 'X' 의 각 그룹별로 sum(), count(), average(), min(), max() 등과 같은 집계 함수(aggregation functions)의 계산된 하나의 결과 값으로 요약해줍니다. 

 

 

 

(순서 4) HAVING 절

 

SQL Query 에 GROUP BY 절이 있다면, HAVING 절의 제약 조건이 그룹별로 집계된 행에 적용되어서, HAVING 절의 조건을 만족시키지 못하는 그룹별 집계된 행(grouped rows) 를 제거하게 됩니다. WHERE 절처럼, 대부분의 DB에서 HAVING 절은 Alias 를 사용할 수 없습니다. 

 

 

(순서 5) SELECT 절 

 

위의 순서 1, 2, 3, 4 를 처리한 후의 데이터에 대해서 드디어 SELECT 절을 실행하여 칼럼 값을 계산하고 가져옵니다. 

 

 

 

(순서 6) ORDER BY 절 

 

위의 순서 1, 2, 3, 4, 5 번이 차례대로 실행된 결과의 데이터에 대해서 ORDER BY 절의 칼럼을 기준으로 오름차순(Ascending order) 또는 내림차순(Descending order) 으로 정렬을 해줍니다.

Query의 SELECT 절이 연산이 끝난 상태이므로, ORDER BY 절에서는 Alias 를 참조할 수 있습니다.   

 

 

 

(순서 7) LIMIT / OFFSET 절

 

마지막으로, LIMIT 과 OFFSET 절을 사용해서 최종으로 반환되는 결과 값에서 행의 개수를 제한합니다. 

 

  * LIMIT number: 숫자 만큼의 행 출력

  * OFFSET number: 숫자의 행부터 출력

 

예) select * from mytable LIMIT 10 OFFSET 5;

      ==> mytable 의 "5번째 행부터 (OFFSET 5)", "10개의 행을 출력(LIMIT 10)"

 

 

 

[Reference]

[1] SQL Query Order of Execution, by Sisense Team
: https://www.sisense.com/blog/sql-query-order-of-operations/

[2] SQL Lesson 12: Order of execution of a Query
: https://sqlbolt.com/lesson/select_queries_order_of_execution

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요~! :-)

 

 

728x90
반응형
Posted by Rfriend
,

이번 포스팅에서는 PostgreSQL, Greenplum에서 차원이 다른 Array를 2D Array로 Aggregation 할 때, 그룹별 Array의 최대 길이를 계산해서 동적으로 최대길이에 모자란 부분만큼 '0'으로 채워서(padding) 차원을 동일하게 맞춘 후에 2D Array로 Aggregation 하는 방법을 소개하겠습니다. 

 

(1) 가상의 시계열 데이터 생성

(2) 2D Array Aggregation 하는 사용자 정의 함수(UDF) 정의

(3) 차원이 다를 경우 2D Array Aggregation Error 발생
      : cannot concatenate incompatible arrays

(4) 그룹별 Array의 차원을 일치시켜서 2D Array Aggregation 하는 방법

 

 

 

 

 

(1) 가상의 시계열 데이터 생성

 

먼저, 예제로 사용할 시계열 데이터로 이루어진 테이블을 만들어보겠습니다.

10의 ID 그룹이 있고, 각 ID별로 20개의 param_id 그룹이 있으며, 각 param_id별로 또 20개의 ts(TimeStamp)별로 측정값(measure_val)이 있는 시계열 데이터셋을 가진 테이블입니다. 

 

이때 무작위로 생성한 난수가 0.5보다 작을 경우는 난수를 측정값으로 사용하고, 그렇지 않을 경우는 결측값(NULL)으로 해서 measure_val 칼럼을 만들어주었습니다. 

 

-- (1) create sample dataset table
DROP TABLE IF EXISTS my_tbl;
CREATE TABLE my_tbl AS (
    SELECT 
        d.*
        -- about 50% NULL, 50% measured values
        , CASE WHEN random() < 0.5 THEN round(random()::numeric, 2) ELSE NULL 
            END AS measure_val 
    FROM (
        SELECT 
            a.*    -- id
            , b.*  -- param_id
            , c.* -- ts (TimeStamp)
        FROM 
            (SELECT generate_series(1, 10) AS id) AS a 
        CROSS JOIN 
            (SELECT generate_series(1, 20) AS param_id) AS b
        CROSS JOIN 
            (SELECT generate_series(1, 20) AS ts) AS c
    ) AS d
);

-- the measure_vals will be different due to random number generation
SELECT * FROM my_tbl ORDER BY 1, 2, 3 LIMIT 10;
--id|param_id|ts|measure_val|
----+--------+--+-----------+
-- 1|       1| 1|       0.93|
-- 1|       1| 2|           |
-- 1|       1| 3|       0.87|
-- 1|       1| 4|       0.41|
-- 1|       1| 5|       0.57|
-- 1|       1| 6|           |
-- 1|       1| 7|       0.60|
-- 1|       1| 8|       0.67|
-- 1|       1| 9|       0.02|
-- 1|       1|10|       0.21|


-- total number of rows: 10 * 20 * 20 = 4,000
SELECT count(1) AS row_cnt FROM my_tbl; 
--row_cnt|
---------+
--   4000|

 

 

 

PostgreSQL 에 있는 array_agg() 함수를 사용해서 id별, param_id별 그룹을 기준으로 param_id, ts, measure_val을 1D array로 aggregation 해보면 아래와 같습니다. 이때 조건절에 "WHERE measure_val IS NOT NULL"을 부여해서 결측값을 제거해주면 각 id, param_id 그룹별로 array 내 원소(element)의 개수가 서로 달라지게 됩니다.  (<-- 이 부분이 나중에 (3)번에서 1D array를 2D array로 aggregation 할 때 1D array의 차원이 달라서 2D array로 묶을 수 없다는 에러를 발생시키게 됩니다.)

 

-- 1D array aggregation using array_agg() function
SELECT 
    id
    , param_id
    , array_agg(param_id) AS param_id_arr
    , array_agg(ts ORDER BY ts) AS ts_arr 
    , array_agg(measure_val ORDER BY ts) AS measure_val_arr
FROM my_tbl
WHERE measure_val IS NOT NULL 
GROUP BY 1, 2
ORDER BY 1, 2
LIMIT 5

 

 

 

 

(2) 2D Array Aggregation 하는 사용자 정의 함수(UDF) 정의

 

PostgreSQL에는 1D array를 2D array로 묶어주는 내장함수가 없으므로 아래와 같이 사용자 정의 함수(User Defined Function)를 만들고, Select 문으로 사용자 정의함수를 호출해서 사용하면 됩니다. 

 

인풋으로 사용하는 데이터 유형별로 각각 사용자 정의함수를 정의해주는데요, 아래는 정수(Integer) 데이터 타입에 대한 1D array를 2D array로 묶어주는 사용자 정의함수입니다. 

 

-- UDF for 2D array aggregation for INTEGER
DROP FUNCTION IF EXISTS array_append_2d_int(integer[][], integer[]) CASCADE;
CREATE OR REPLACE FUNCTION array_append_2d_int(integer[][], integer[])
    RETURNS integer[][]
    LANGUAGE SQL
    AS 'select array_cat($1, ARRAY[$2])'
    IMMUTABLE
;

DROP AGGREGATE IF EXISTS array_agg_array_int(integer[]) CASCADE;
CREATE ORDERED AGGREGATE array_agg_array_int(integer[])
(
    SFUNC = array_append_2d_int
    , STYPE = integer[][]
);

 

 

 

아래는 NUMERIC 데이터 유형을 원소로 가지는 1D array를 2D array 로 묶어주는 사용자 정의 함수입니다. 

 

-- UDF for 2D array aggregation (NUMERIC)
DROP FUNCTION IF EXISTS array_append_2d_numeric(NUMERIC[][], NUMERIC[]) CASCADE;
CREATE OR REPLACE FUNCTION array_append_2d_numeric(NUMERIC[][], NUMERIC[])
    RETURNS NUMERIC[][]
    LANGUAGE SQL
    AS 'select array_cat($1, ARRAY[$2])'
    IMMUTABLE
;

DROP AGGREGATE IF EXISTS array_append_2d_numeric(NUMERIC[]) CASCADE;
CREATE ORDERED AGGREGATE array_append_2d_numeric(NUMERIC[])
(
    SFUNC = array_append_2d_numeric
    , STYPE = NUMERIC[][]
);

 

 

 

 

(3) 차원이 다를 경우 2D Array Aggregation Error 발생
      : cannot concatenate incompatible arrays

 

아래의 SQL Query 처럼 각 Array의 원소(element) 개수가 다를 경우, 이를 2D Array로 합치려고(concatenation) 하면 Array의 차원이 일치하지 않는다는(incompatible) 에러가 발생합니다. 

 

"SQL Error [2202E]: ERROR: cannot concatenate incompatible arrays  (seg4 slice1 10.0.1.238:6004 pid=7073)
  Detail: Arrays with differing element dimensions are not compatible for concatenation.
  Where: SQL function "array_append_2d_numeric" statement 1"

 

--SQL Error [2202E]: ERROR: cannot concatenate incompatible arrays  (seg0 slice1 10.0.1.238:6000 pid=30201)
--  Detail: Arrays with differing element dimensions are not compatible for concatenation.
--  Where: SQL function "array_append_2d_numeric" statement 1

SELECT 
	a.id
	, array_agg_array_int(param_id_arr ORDER BY param_id) AS param_id_arr2d
	, array_agg_array_int(ts_arr ORDER BY param_id) AS ts_arr2d
	, array_agg_array_numeric(measure_val_arr ORDER BY param_id) AS measure_val_arr2d
FROM (
	SELECT 
		id
		, param_id
		, array_agg(param_id) AS param_id_arr
		, array_agg(ts ORDER BY ts) AS ts_arr 
		, array_agg(measure_val ORDER BY ts) AS measure_val_arr
	FROM my_tbl
	WHERE measure_val IS NOT NULL 
	GROUP BY 1, 2
) AS a 
GROUP BY 1;

-- it eraises an ERROR: cannot concatenate incompatible arrays

 

 

 

(4) 그룹별 Array의 차원을 일치시켜서 2D Array Aggregation 하는 방법

 

1D array의 원소 개수가 서로 달라서 2D array 로 묶을 수 없을 경우(ERROR: cannot concatenate incompatible arrays), 아래의 절차를 따라서 각 그룹별 1D array의 차원을 일치시킨 후에 2D array로 묶을 수 있습니다. 

 

(a) 묶고자 하는 기준이 되는 그룹별로 array_agg() 함수를 사용해서 1D array로 묶고, 

(b) 그룹별로 1D array들의 각 원소 개수를 세어서 그룹별로 1D array의 최대 길이 (arr_max_length)를 계산하고, 

(c) 그룹별로 1D array의 최대 길이 모자라는 개수만큼 '0'으로 각 1D Array를 채워서(padding) 차원을 일치시킨 후에, 

(d) 그룹별로 1D array를 2D array로 (2)번에서 정의한 사용자 정의함수를 사용해서 묶어주기

 

-- Dynamic 2D array aggregation for arrays with NULL and different dimentions
DROP TABLE IF EXISTS my_tbl_2d_arr;
CREATE TABLE my_tbl_2d_arr AS (
WITH t AS (
	SELECT 
		id
		, param_id
		, array_agg(param_id ORDER BY param_id, ts) 
            AS param_id_arr
		, array_agg(ts ORDER BY param_id, ts) 
            AS ts_arr
		, array_agg(measure_val ORDER BY param_id, ts) 
            AS measure_val_arr
	FROM 	my_tbl
		WHERE measure_val IS NOT NULL
		GROUP BY 1, 2
), t2 AS (
	SELECT 
		id 
		, max(array_length(measure_val_arr, 1)) 
            AS arr_max_length
	FROM t
	GROUP BY 1
), t_padded AS (
	SELECT 
		t.id
		, t.param_id
		, array_cat(
			t.param_id_arr, 
			array_fill(NULL::INTEGER, ARRAY[arr_max_length 
            	        - COALESCE(array_length(t.param_id_arr, 1), 0)])
			) AS param_id_arr_padded
		, array_cat(
			t.ts_arr, 
			array_fill(NULL::INTEGER, ARRAY[arr_max_length 
            	        - COALESCE(array_length(t.ts_arr, 1), 0)])
			) AS ts_arr_padded
		, array_cat(
			t.measure_val_arr, 
			array_fill(NULL::NUMERIC, ARRAY[arr_max_length 
            	        - COALESCE(array_length(t.measure_val_arr, 1), 0)])
			) AS measure_val_arr_padded
	FROM t, t2
	WHERE t.id = t2.id
) 
SELECT 
	a.id
	, array_agg_array_int(param_id_arr_padded ORDER BY param_id) 
    	AS param_id_arr2d
	, array_agg_array_int(ts_arr_padded ORDER BY param_id) 
    	AS ts_arr2d
	, array_agg_array_numeric(measure_val_arr_padded ORDER BY param_id) 
    	AS measure_val_arr2d
FROM t_padded AS a 
GROUP BY 1
);


SELECT * FROM my_tbl_2d_arr ORDER BY 1 LIMIT 5;

 

 

다음 포스팅에서는 그룹별로 묶어놓은 2D array를 그룹별 1D array 로 unnest 하는 방법(https://rfriend.tistory.com/728)를 참고하세요. 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

시계열 데이터를 분석할 때 제일 처음 확인하고 처리하는 일이 결측값(missing values) 입니다. 이번 포스팅에서는 시계열 데이터의 결측값을 선형 보간(linear interpolation)하는 2가지 방법을 소개하겠습니다. 

 

(1) Python 으로 결측값을 포함하는 예제 시계열 데이터 생성하기

(2) Python 의 for loop 순환문으로 시계열 데이터 결측값을 보간하기

      (interpolation sequentially using Python for loop statement)

(3) Greenplum에서 PL/Python으로 분산병렬처리하여 시계열 데이터 결측값을 보간하기

     (interpolation in parallel using PL/Python on Greenplum)

 

 

 

(1) Python 으로 결측값을 포함하는 예제 시계열 데이터 생성하기

 

샘플 시계열 데이터를 저장할 폴더를 base_dir 에 지정해주었습니다.

 

분석 단위로 사용할 Lot, Cell, Parameter, TimeStamp 의 개수를 지정해 줍니다. 아래 예에서는 각 Lot, Cell, Parameter 별로 100개의 TimeStamp 별로 측정값을 수집하고, 각 분석 단위별로 10개의 결측값을 포함하도록 설정했습니다. 

 

np.random.normal(10, 30, ts_num) 
: 측정값은 정규분포 X~N(10, 3) 의 분포로 부터 ts_num 인 100개를 난수 생성하여 만들었습니다. 

 

nan_mask = np.random.choice(np.arange(ts_num), missing_num)
ts_df_tmp.loc[nan_mask, 'measure_val'] = np.nan

: 각 분석 단위의 100 개의 측정치 중에서 무작위로 missing_num = 10 개를 뽑아서 np.nan 으로 교체하여 결측값으로 변경하였습니다.  

 

하나의 Lot에 Cell 100개, 각 Cell별 Parameter 10개, 각 Parameter 별 TimeStamp의 측정치 100개, 이중 결측치 10개를 포함한 시계열 데이터를 Lot 별로 묶어서(concat) DataFrame을 만들고, 이를 CSV 파일로 내보냅니다. 

 

#%% setting the directories and conditions
base_dir = '/Users/Documents/ts_data/'

## setting the number of IDs' conditions 
lot_num = 1000
cell_num = 100
param_num = 10
missing_num = 10
ts_num = 100 # number of TimeStamps


#%% [Step 1] generating the sample dataset

import numpy as np
import pandas as pd
import os
from itertools import chain, repeat

## defining the UDF
def ts_random_generator(lot_id, cell_num, param_num, ts_num, missing_num, base_dir):
    # blank DataFrame for saving the sample datasets later
    ts_df = pd.DataFrame()
    
    for cell_id in np.arange(cell_num):
        for param_id in np.arange(param_num):
            # making a DataFrame with colums of lot_id, cell_cd, param_id, ts_id, and measure_val
            ts_df_tmp = pd.DataFrame({
                'lot_id': list(chain.from_iterable(repeat([lot_id + 1], ts_num))), 
                'cell_id': list(chain.from_iterable(repeat([cell_id + 1], ts_num))), 
                'param_id': list(chain.from_iterable(repeat([param_id + 1], ts_num))), 
                'timestamp_id': (np.arange(ts_num) + 1), 
                'measure_val': np.random.normal(10, 3, ts_num)# X~N(mean, stddev, size)
            })
            
            # inserting the missing values randomly
            nan_mask = np.random.choice(np.arange(ts_num), missing_num)
            ts_df_tmp.loc[nan_mask, 'measure_val'] = np.nan
            
            # concatenate the generated random dataset(ts_df_tmp) to the lot based DataFrame(ts_df) 
            ts_df = pd.concat([ts_df, ts_df_tmp], axis=0)
    
    # exporting the DataFrame to local csv file
    base_dir = base_dir
    file_nm = 'lot_' + \
        str(lot_id+1).zfill(4) + \
        '.csv'
        
    ts_df.to_csv(os.path.join(base_dir, file_nm), index=False)
    #ts_df.to_csv('/Users/lhongdon/Documents/SK_ON_PoC/ts_data/lot_0001.csv')
    
    print(file_nm, "is successfully generated.") 



#%% Executing the ts_random_generator UDF above

## running the UDF above using for loop statement
for lot_id in np.arange(lot_num):
    ts_random_generator(
        lot_id, 
        cell_num, 
        param_num, 
        ts_num, 
        missing_num, 
        base_dir
        )

 

 

 

위의 코드를 실행하면 for loop 순환문이 lot_num 수만큼 돌면서 ts_random_generator() 사용자 정의함수를 실행시키면서 결측값을 포함한 시계열 데이터 샘플 CSV 파일을 생성하여 지정된 base_dir 폴더에 저장을 합니다. 

(아래 화면 캡쳐 참조)

 

sample time series data list

 

 

아래의 화면캡쳐는 결측값을 포함하는 시계열 데이터 샘플 중에서 LOT_0001 번의 예시입니다. 

 

time series data sample with missing values

 

 

 

 

(2) Python 의 for loop 순환문으로 시계열 데이터 결측값을 보간하기

      (interpolation sequentially using Python for loop statement)

 

아래 코드는 Python으로 Lot, Cell, Parameter ID 별로 for loop 순환문을 사용해서 pandas 의 interpolate() 메소드를 사용해서 시계열 데이터의 결측값을 선형 보간(linear interpolation) 한 것입니다. 

(forward fill 로 먼저 선형 보간을 해주고, 그 다음에 만약에 첫번째 행에 결측값이 있을 경우에 backward fill 로 이후 값과 같은 값으로 결측값을 채워줍니다.)

 

순차적으로 for loop 순환문을 돌기 때문에 시간이 오래 걸립니다.

 

#%% [Step 2] linear interpolation
from datetime import datetime
start_time = datetime.now()


## reading csv files in the base_dir
file_list = os.listdir(base_dir)

for file_nm in file_list:
    # by Lot
    if file_nm[-3:] == "csv":
        # read csv file
        ts_df = pd.read_csv(os.path.join(base_dir, file_nm))
        
        # blank DataFrame for saving the interpolated time series later
        ts_df_interpolated = pd.DataFrame()
        
        # cell & param ID lists
        cell_list = np.unique(ts_df['cell_id'])
        param_list = np.unique(ts_df['param_id'])
        
        # interpolation by lot, cell, and param IDs
        for cell_id in cell_list:
           for param_id in param_list:
               ts_df_tmp = ts_df[(ts_df.cell_id == cell_id) & (ts_df.param_id == param_id)]
               
               ## interpolating the missing values for equaly spaced time series data
               ts_df_tmp.sort_values(by='timestamp_id', ascending=True) # sorting by TimeStamp first
               ts_df_interpolated_tmp = ts_df_tmp.interpolate(method='values') # linear interploation
               ts_df_interpolated_tmp = ts_df_interpolated_tmp.fillna(method='bfill') # backward fill for the first missing row
               
               ts_df_interpolated = pd.concat([ts_df_interpolated, ts_df_interpolated_tmp], axis=0)
        
        # export DataFrame to local folder as a csv file
        ts_df_interpolated.to_csv(os.path.join(interpolated_dir, file_nm), index=False)
        
        print(file_nm, "is successfully interpolated.")


time_elapsed = datetime.now() - start_time
print("----------" * 5)
print("Time elapsed (hh:mm:ss.ms) {}".format(time_elapsed))
print("----------" * 5)

 

# # Before interplolation
# 3,1,1,20,11.160795506036791
# 3,1,1,21,8.155949904188175
# 3,1,1,22,3.1040644143505407
# 3,1,1,23,                   <-- missing
# 3,1,1,24,                   <-- missing
# 3,1,1,25,11.020504352275342
# 3,1,1,26,                   <-- missing
# 3,1,1,27,8.817922501760519
# 3,1,1,28,10.673174873272234
# 3,1,1,29,6.584669096660191
# 3,1,1,30,13.442427337943553

# # After interpolation
# 3,1,1,20,11.160795506036791
# 3,1,1,21,8.155949904188175
# 3,1,1,22,3.1040644143505407
# 3,1,1,23,5.742877726992141  <-- interpolated
# 3,1,1,24,8.381691039633742  <-- interpolated
# 3,1,1,25,11.020504352275342
# 3,1,1,26,9.919213427017931  <-- interpolated
# 3,1,1,27,8.81792250176052
# 3,1,1,28,10.673174873272234
# 3,1,1,29,6.584669096660191
# 3,1,1,30,13.442427337943554

 

 

아래 화면캡쳐는 선형보간하기 전에 결측값이 있을 때와, 이를 선형보간으로 값을 생성한 후의 예시입니다. 

 

linear interpolation for missing data in time series

 

 

 

아래 선 그래프의 파란색 점 부분이 원래 값에서는 결측값 이었던 것을 선형 보간(linear interpolation)으로 채워준 후의 모습입니다. 선형보간이므로 측정된 값으로 선형회귀식을 적합하고, 결측값 부분의 X 값을 입력해서 Y를 예측하는 방식으로 결측값을 보간합니다. 

 

linear interpolation of missing values in time series

 

 

 

아래 코드는 데이터가 Greenplum DB에 적재되어 있다고 했을 때, 

 (2-1) Python으로 Greenplum DB에 access하여 데이터를 Query 해와서 pandas DataFrame으로 만들고 

 (2-2) Pytnon pandas 의 interpolate() 메소드를 사용해서 선형보간을 한 다음에 

 (2-3) 선형보간된 DataFrame을 pandas 의 to_sql() 메소드를 사용해서 다시 Greenplum DB에 적재

하는 코드입니다. 이를 for loop 순환문을 사용해서 Lot 의 개수만큼 실행시켜 주었습니다. 

 

순차적으로 for loop 순환문을 돌기 때문에 시간이 오래 걸립니다.

 

#%% Greenplum credentials
user = 'username' 
password = 'password' 
host = 'ip_address'
port = 'port'
db = 'databasename'

connection_string = "postgresql://{user}:{password}@{host}:{port}/{db}".\
        format(user=user, 
               password=password, 
               host=host, 
               port=port,
               db=db)

#%%
# helper function: query to pandas DataFrame
def gpdb_query(query):
    import psycopg2 as pg
    import pandas as pd
    
    conn = pg.connect(connection_string)
    cursor = conn.cursor()
    
    cursor.execute(query)
    col_names = [desc[0] for desc in cursor.description]
    
    result_df = pd.DataFrame(cursor.fetchall(), columns=col_names)
    
    cursor.close()
    conn.close()
    
    return result_df


#%% 
# UDF for running a query

def interpolator(lot_id):
    
    #import pandas as pd
    
    query = """
        SELECT * 
        FROM ts_data 
        WHERE 
            lot_id = {lot_id}
    """.format(
            lot_id = lot_id)
    
    ts_df = gpdb_query(query)
    ts_df = ts_df.astype({
        'measure_val': float
        })
    
    ## interpolating the missing values for equaly spaced time series data
    ts_df_interpolated = pd.DataFrame()
    
    for cell_id in (np.arange(cell_num)+1):
        for param_id in (np.arange(param_num)+1):
            ts_df_tmp = ts_df[(ts_df.cell_id == cell_id) & (ts_df.param_id == param_id)]
    
            ts_df_tmp.sort_values(by='timestamp_id', ascending=True) # sorting by TimeStamp first
            ts_df_interpolated_tmp = ts_df_tmp.interpolate(method='values') # linear interploation
            ts_df_interpolated_tmp = ts_df_interpolated_tmp.fillna(method='bfill') # backward fill for the first missing row
            
            ts_df_interpolated = pd.concat([ts_df_interpolated, ts_df_interpolated_tmp], axis=0)
    
    # export DataFrame to local folder as a csv file
    #ts_df_interpolated.to_csv(os.path.join(interpolated_dir, file_nm), index=False)        
    #print(file_nm, "is successfully interpolated.")
    
    return ts_df_interpolated



#%% 
# UDF for importing pandas DataFrame to Greenplum DB
def gpdb_importer(lot_id, connection_string):
    
    import sqlalchemy
    from sqlalchemy import create_engine
    
    engine = create_engine(connection_string)
    
    # interpolation
    ts_data_interpolated = interpolator(lot_id)
    
    # inserting to Greenplum    
    ts_data_interpolated.to_sql(
        name = 'ts_data_interpolated_python', 
        con = engine, 
        schema = 'equipment', 
        if_exists = 'append', 
        index = False, 
        dtype = {'lot_id': sqlalchemy.types.INTEGER(), 
                 'cell_id': sqlalchemy.types.INTEGER(), 
                 'param_id': sqlalchemy.types.INTEGER(),
                 'timestamp_id': sqlalchemy.types.INTEGER(), 
                 'measure_val': sqlalchemy.types.Float(precision=6)
                 })
    

#%%
from datetime import datetime
start_time = datetime.now()

import pandas as pd
import os
import numpy as np

for lot_id in (np.arange(lot_num)+1):
    gpdb_importer(lot_id, connection_string)
    print("lot_id", lot_id, "is successfully interpolated.")

time_elapsed = datetime.now() - start_time
print("----------" * 5)
print("Time elapsed (hh:mm:ss.ms) {}".format(time_elapsed))
print("----------" * 5)

 

 

 

 

(3) Greenplum에서 PL/Python으로 분산병렬처리하여 시계열 데이터 결측값을 보간하기

     (interpolation in parallel using PL/Python on Greenplum)

Greenplum에서 PL/Python으로 병렬처리할 때는 (a) 사용자 정의 함수(UDF) 정의, (b) 사용자 정의 함수 실행의 두 단계를 거칩니다. 

 

Greenplum DB에서 PL/Python으로 분산병렬처리를 하면 위의 (2)번에서 Python으로 for loop 순환문으로 순차처리한 것 대비 Greenplum DB 내 노드의 개수에 비례하여 처리 속도가 줄어들게 됩니다. (가령, 노드가 8개이면 병렬처리의 총 처리 소요시간은 순차처리했을 때의 총 소요시간의 1/8 로 줄어듭니다.) 

 

 

parallel processing using PL/Python on Greenplum DB

 

 

(3-1) PL/Python 으로 시계열 데이터 결측값을 선형보간하는 사용자 정의함수 정의 (define a UDF)

 

-- defining the PL/Python UDF
DROP FUNCTION IF EXISTS plpy_interp(numeric[]);
CREATE OR REPLACE FUNCTION plpy_interp(measure_val_arr numeric[]) 
RETURNS numeric[]
AS $$
	import numpy as np
	import pandas as pd
	
	measure_val = np.array(measure_val_arr, dtype='float')
	
	ts_df = pd.DataFrame({
	   'measure_val': measure_val
	    })
	
	# interpolation by lot, cell, and param IDs               
	ts_df_interpolated = ts_df.interpolate(method='values') # linear interploation
	ts_df_interpolated = ts_df_interpolated.fillna(method='bfill') # backward fill for the first missing row
	
	return ts_df_interpolated['measure_val']
	        
$$ LANGUAGE 'plpythonu';

 

 

 

(3-2) 위에서 정의한 시계열 데이터 결측값을 선형보간하는 PL/Python 사용자 정의함수 실행

 

 

PL/Python의 input 으로는 SQL의 array_agg() 함수를 사용해서 만든 Array 데이터를 사용하며, PL/Python에서는 SQL의 Array를 Python의 List 로 변환(converting) 합니다. 

 

-- array aggregation as an input
DROP TABLE IF EXISTS tab1;
CREATE TEMPORARY TABLE tab1 AS
		SELECT 
			lot_id
			, cell_id
			, param_id
			, ARRAY_AGG(timestamp_id ORDER BY timestamp_id) AS timestamp_id_arr
			, ARRAY_AGG(measure_val ORDER BY timestamp_id) AS measure_val_arr
		FROM ts_data
		GROUP BY lot_id, cell_id, param_id
DISTRIBUTED RANDOMLY ;
		
ANALYZE tab1;		


-- executing the PL/Python UDF
DROP TABLE IF EXISTS ts_data_interpolated;
CREATE TABLE ts_data_interpolated AS (
	SELECT 
		lot_id 
		, cell_id 
		, param_id 
		, timestamp_id_arr
		, plpy_interp(measure_val_arr) AS measure_val_arr -- plpython UDF
	FROM tab1 AS a 
) DISTRIBUTED BY (lot_id);

 

 

 

아래 코드는 numeric array 형태로 반환한 선형보간 후의 데이터를 unnest() 함수를 사용해서 보기에 편하도록 long format 으로 풀어준 것입니다. 

 

-- display the interpolated result
SELECT 
	lot_id
	, cell_id 
	, param_id
	, UNNEST(timestamp_id_arr) AS timestamp_id
	, UNNEST(measure_val_arr) AS measure_val
FROM ts_data_interpolated
WHERE lot_id = 1 AND cell_id = 1 AND param_id = 1
ORDER BY lot_id, cell_id, param_id, timestamp_id
LIMIT 100;

 

 

결측값을 포함하고 있는 원래 데이터셋을 아래 SQL query 로 조회해서, 위의 선형보간 된 후의 데이터셋과 비교해볼 수 있습니다. 

 

-- original dataset with missing value
SELECT 
	lot_id
	, cell_id 
	, param_id
	, timestamp_id
	, measure_val
FROM ts_data
WHERE lot_id = 1 AND cell_id = 1 AND param_id = 1
ORDER BY lot_id, cell_id, param_id, timestamp_id
LIMIT 100;

 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,

이전 포스팅에서 스펙트럼 분석(spectrum analysis, spectral analysis, frequency domain analysis) 에 대해서 소개하였습니다. ==> https://rfriend.tistory.com/690 참조

 

이번 포스팅에서는 Greenplum 에서 PL/Python (Procedural Language)을 활용하여 여러개 그룹의 시계열 데이터에 대해 스펙트럼 분석을 분산병렬처리하는 방법을 소개하겠습니다. (spectrum analysis in parallel using PL/Python on Greenplum database)

 

 

spectrum analysis in parallel using PL/Python on Greenplum

 

 

(1) 다른 주파수를 가진 3개 그룹의 샘플 데이터셋 생성

 

먼저, spectrum 모듈에서 data_cosine() 메소드를 사용하여 주파수(frequency)가 100, 200, 250 인 3개 그룹의 코사인 파동 샘플 데이터를 생성해보겠습니다. (노이즈 A=0.1 만큼이 추가된 1,024개 코사인 데이터 포인트를 가진 예제 데이터) 

 

## generating 3 cosine signals with frequency of (100Hz, 200Hz, 250Hz) respectively
## buried in white noise (amplitude 0.1), a length of N=1024 and the sampling is 1024Hz.
from spectrum import data_cosine

data1 = data_cosine(N=1024, A=0.1, sampling=1024, freq=100)
data2 = data_cosine(N=1024, A=0.1, sampling=1024, freq=200)
data3 = data_cosine(N=1024, A=0.1, sampling=1024, freq=250)

 

 

 

다음으로 Python pandas 모듈을 사용해서 'grp' 라는 칼럼에 'a', 'b', 'c'의 구분자를 추가하고, 'val' 칼럼에는 위에서 생성한 각기 다른 주파수를 가지는 3개의 샘플 데이터셋을 값으로 가지는 DataFrame을 생성합니다. 

 

## making a pandas DataFrame with a group name
import pandas as pd

df1 = pd.DataFrame({'grp': 'a', 'val': data1})
df2 = pd.DataFrame({'grp': 'b', 'val': data2})
df3 = pd.DataFrame({'grp': 'c', 'val': data3})

df = pd.concat([df1, df2, df3])


df.shape
# (3072, 2)


df.head()
# 	grp	val
# 0	a	1.056002
# 1	a	0.863020
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723

 

 

 

sqlalchemy 모듈의 create_engine("driver://user:password@host:port/database") 메소드를 사용해서 Greenplum 데이터베이스에 접속한 후에 pandas의 DataFrame.to_sql() 메소드를 사용해서 위에서 만든 pandas DataFrame을 Greenplum DB에 import 하도록 하겠습니다. 

 

이때 index = True, indx_label = 'id' 를 꼭 설정해주어야만 합니다. 그렇지 않을 경우 Greenplum DB에 데이터가 import 될 때 시계열 데이터의 특성이 sequence 가 없이 순서가 뒤죽박죽이 되어서, 이후에 스펙트럼 분석을 할 수 없게 됩니다. 

 

## importing data to Greenplum using pandas 
import sqlalchemy
from sqlalchemy import create_engine

# engine = sqlalchemy.create_engine("postgresql://user:password@host:port/database")
engine = create_engine("postgresql://user:password@ip:5432/database") # set with yours

df.to_sql(name = 'data_cosine', 
          con = engine, 
          schema = 'public', 
          if_exists = 'replace', # {'fail', 'replace', 'append'), default to 'fail'
          index = True, 
          index_label = 'id', 
          chunksize = 100, 
          dtype = {
              'id': sqlalchemy.types.INTEGER(), 
              'grp': sqlalchemy.types.TEXT(), 
              'val': sqlalchemy.types.Float(precision=6)
          })
          
          
          
SELECT * FROM data_cosine order by grp, id LIMIT 5;
# id	grp	val
# 0	a	1.056
# 1	a	0.86302
# 2	a	0.463375
# 3	a	-0.311347
# 4	a	-0.756723


SELECT count(1) AS cnt FROM data_cosine;
# cnt
# 3072

 

 

 

(2) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의 함수 정의 (UDF definition)

 

아래의 스펙트럼 분석은 Python scipy 모듈의 signal() 메소드를 사용하였습니다. (spectrum 모듈의 Periodogram() 메소드를 사용해도 동일합니다. https://rfriend.tistory.com/690  참조) 

(Greenplum database의 master node와 segment nodes 에는 numpy와 scipy 모듈이 각각 미리 설치되어 있어야 합니다.)

 

사용자 정의함수의 인풋으로는 (a) 시계열 데이터 array 와 (b) sampling frequency 를 받으며, 아웃풋으로는 스펙트럼 분석을 통해 추출한 주파수(frequency, spectrum)를 텍스트(혹은 int)로 반환하도록 하였습니다. 

 

DROP FUNCTION IF EXISTS spectrum_func(float8[], int);
CREATE OR REPLACE FUNCTION spectrum_func(x float8[], fs int) 
RETURNS text 
AS $$
    from scipy import signal
    import numpy as np
    
    # x: Time series of measurement values
    # fs: Sampling frequency of the x time series. Defaults to 1.0.
    f, PSD = signal.periodogram(x, fs=fs)
    freq = np.argmax(PSD)
    
    return freq
    
$$ LANGUAGE 'plpythonu';

 

 

 

(3) 스펙트럼 분석을 분산병렬처리하는 PL/Python 사용자 정의함수 실행
       (Execution of Spectrum Analysis in parallel on Greenplum)

 

위의 (2)번에서 정의한 스펙트럼 분석 PL/Python 사용자 정의함수 spectrum_func(x, fs) 를 Select 문으로 호출하여 실행합니다. FROM 절에는 sub query로 input에 해당하는 시계열 데이터를 ARRAY_AGG() 함수를 사용해 array 로 묶어주는데요, 이때 ARRAY_AGG(val::float8 ORDER BY id) 로 id 기준으로 반드시 시간의 순서에 맞게 정렬을 해주어야 제대로 스펙트럼 분석이 진행이 됩니다

 

SELECT 
    grp
    , spectrum_func(val_agg, 1024)::int AS freq
FROM (
    SELECT 
        grp
        , ARRAY_AGG(val::float8 ORDER BY id) AS val_agg
    FROM data_cosine
    GROUP BY grp
) a
ORDER BY grp; 

# grp	freq
# a	100
# b	200
# c	250

 

 

우리가 (1)번에서 주파수가 각 100, 200, 250인 샘플 데이터셋을 만들었는데요, 스펙트럼 분석을 해보니 정확하게 주파수를 도출해 내었네요!. 

 

이번 포스팅이 많은 도움이 되었기를 바랍니다. 

행복한 데이터 과학자 되세요!  :-)

 

728x90
반응형
Posted by Rfriend
,