[DevOps] ElasticSearch to CSV export

[DevOps] ElasticSearch to CSV export

안녕하세요? 정리하는 개발자 워니즈입니다. 이번시간에는 ElasticSearch의 데이터를 CSV로 export 하는 내용에 대해서 정리를 해보겠습니다.

필자는 DevOps 업무를 하고 있습니다. 그러다보니 최대한 자동화를 통해서 업무를 효율적으로 하는 부분에 대해서도 신경을 많이 쓰고 있습니다.

어느날인가 개발팀분이 저에게 DM을 주셔서 elasticsearch to csv가 가능한지 문의가 들어왔습니다. 하지만 필자도 그동안 ES에 대한 데이터 적재 및 대시보드 구성만 해봤었고 export 시키는 내용에 대해서는 정확하게 알지를 못했습니다 .

ES도 결국 DB와 같은 데이터 record를 적재하고 있고, 이런 부분들을 query를 통해서 충분히 가져올 수 있을 것이라고 생각했습니다.

관련해서 위의 내용을 정리해보도록 하겠습니다.

1. ES query 호출해보기

우선 간단하게나마 ES를 호출해보기로했습니다. 그러기 위해서는 ES에 query를 하는 방식을 이해해야 합니다. 필자가 요구하는 사항은 각 Index별로 전체의 데이터를 가져오는 것이기 때문에 전체 조회 query를 확인했습니다.

  • Match_all / match_nonde

match_all 쿼리는 지정된 index의 모든 document를 검색하는 방법입니다.즉, 특별한 검색어 없이 모든 document를 가져오고 싶을 때 사용합니다.SQL로 치면 WHERE 절이 없는 SELECT문과 같습니다.

{  
   "query":{  
      "match_all":{}
   }
}

그런데 위처럼 조회를 하면 결과가 전체 records를 가져오지 않고 return 하는 records가 전체보다는 작은 수치였습니다.
Elasticsearch query to return all records

위의 내용을 확인하여, size값을 추가하기로 했습니다.

{  
   "size": 1000,
   "query":{  
      "match_all":{}
   }
}

인덱스별로 최대 1000을 넘지 않는것을 확인 한 뒤, 위와 같이 size값을 주어, 전체 레코즈를 가져올 수 있도록 설정했습니다.

2. python을 활용한 호출부 구현

위의 내용으로 하나의 Index에 대해서 전체 데이터를 가져오는 부분을 구현해보기로했습니다.

    headers = {
        'authority': '{ES 주소}',
        'authorization': 'Basic {계정 정보}',
        'content-type': 'application/json',
        'sec-fetch-site': 'same-origin',
        'sec-fetch-mode': 'cors',
        'accept-language': 'ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7',
    }
    params = (
        ('path', index+'/_search'),
        ('method', 'POST'),
    )

    ## query 요청 
    json_data = {
        'size': 1000,
        'query': {
            'match_all': {},
        },
    }
    response = requests.post('{ES 주소}/api/console/proxy', headers=headers, params=params, json=json_data)

위와 같이 호출하니 정상적으로 데이터를 받아왔습니다. 위의 내용을 함수로 구현하여 인덱스에 대해서 레코즈를 가져오는 부분으로 구현했습니다.

3. ES Index list 가져오기

상위 작업에서 인덱스를 파라미터로 넣으면, 전체 레코즈를 가져오도록 설계를 했습니다. 이제는 전체 ES 리스트를 조회하고 파라미터를 준비하기로했습니다.

ES의 query builder에서 DEV Tools를 통해서 전체 index를 가져오는 내용을 호출했습니다.

GET /_cat/indices?v

위와 같이 호출을 하게 되면, 마치 Excel의 Sheet 형식으로 데이터가 리턴이 됩니다. 거기서 필요한 인덱스 정보만을 취득하여 파라미터로 준비하기로해습니다.

4. pandas를 이용한 CSV export

위에서 Index List를 통해서 한개의 Index에 해당하는 모든 레코즈를 가져오도록 설계를 했습니다. 그리고 이내용을 json파일로 만들어서 저장하도록 했습니다.

Pythonpandas를 이용해서 json 데이터를 읽고 해당 내용을 CSV로 변환하기로했습니다.

import pandas as pd

# json file read
def read_json(filename: str) -> dict:
    try:
        with open(filename, "r") as f:
            data = json.loads(f.read())
    except:
        raise Exception(f"Reading {filename} file encountered an error")
    return data

def create_dataframe(data: list) -> pd.DataFrame:
    # Declare an empty dataframe to append records
    dataframe = pd.DataFrame()
    # Looping through each record
    for d in data:
        # Normalize the column levels
        record = pd.json_normalize(d)
        # Append it to the dataframe
        dataframe = dataframe.append(record, ignore_index=True)
    return dataframe

# pandas json to csv
data = read_json(filename="result-1.json")
dataframe = create_dataframe(data=data)
dataframe.to_csv("result.csv", index=False)

마지막 세줄이 제일 중요합니다.

data = read_json(filename="result-1.json")

json파일을 읽고 data라는 변수에 할당합니다.

dataframe = create_dataframe(data=data)

pandas를 통해서 위에서 읽은 data 변수를 파라미터로 넣고 해당 내용을 dataframe으로 만들어줍니다.

dataframe.to_csv("result.csv", index=False)

위에서 읽은 dataframe형태를 CSV 파일로 export하면서 최종 완료합니다.

5. 최종 아키텍처

import requests
import json
from time import sleep
import pandas as pd

resultArray = []

# ES http request를 통한 데이터 획득
def getDataFromES(index) :
    headers = {
        'authority': '{ES 주소}',
        'authorization': 'Basic {ES 계정}',
        'content-type': 'application/json',
        'sec-fetch-site': 'same-origin',
        'sec-fetch-mode': 'cors',
        'accept-language': 'ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7',
    }
    params = (
        ('path', index+'/_search'),
        ('method', 'POST'),
    )

    ## query 요청 
    json_data = {
        'size': 1000,
        'query': {
            'match_all': {},
        },
    }
    response = requests.post('{ES 주소}/api/console/proxy', headers=headers, params=params, json=json_data)

    sleep(1)
    print(index + " 완료")
    return response

# 2depth의 json을 nomalize 하는 함수.
def normalize_json(data: dict) -> dict:
    new_data = dict()
    for key, value in data.items():
        if not isinstance(value, dict):
            new_data[key] = value
        else:
            for k, v in value.items():
                new_data[key + "_" + k] = v
    return new_data


# json file read
def read_json(filename: str) -> dict:
    try:
        with open(filename, "r") as f:
            data = json.loads(f.read())
    except:
        raise Exception(f"Reading {filename} file encountered an error")
    return data


def create_dataframe(data: list) -> pd.DataFrame:
    # Declare an empty dataframe to append records
    dataframe = pd.DataFrame()
    # Looping through each record
    for d in data:
        # Normalize the column levels
        record = pd.json_normalize(d)
        # Append it to the dataframe
        dataframe = dataframe.append(record, ignore_index=True)
    return dataframe


#index 리스트 
indexList = ['sample-index-2020-08-25',
'sample-index-2020-08-26',]

# resultArray객체에 모든 jsonobject 추가.
for index in indexList:
    result = getDataFromES(index)
    jsonObject= json.loads(result.text)
    jsonArray = jsonObject.get("hits").get("hits")

    for list in jsonArray:
        new_data = normalize_json(data=list)
        resultArray.append(new_data)


f = open("result-1.json", "w")
f.write(json.dumps(resultArray))
f.close()

# pandas json to csv
data = read_json(filename="result-1.json")
dataframe = create_dataframe(data=data)
dataframe.rename(columns={}, inplace=True)

dataframe.to_csv("result.csv", index=False)


최종코드는 위와 같고, 위의 내용을 그림으로 표현한 내용은 아래와 같습니다.

6. 마치며..

간단한 프로그램을 통해서 전체 ES의 데이터를 export하는 프로그램을 만들어봤습니다. 간단하지만 csv로 export를 통해서 무엇인가를 자동화한거 같아서 매우 뿌듯하면서도 재밌었습니다.

다음시간에는 pandas에 대해서도 정리를 해보는 시간을 갖도록 하겠습니다.

7. 참고

convert nested json to csv in python

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다