슬랙 무료 플랜의 경우, 90일 단위로 주기적으로 메시지 함을 비웁니다.
이에 따라 추후 백업 및 활용 용도로 메시지를 저장하는 프로젝트를 진행했었습니다.
- 지난 시간에는 슬랙 앱 생성 방법과 토큰 구조 그리고 기본적인 테이블 구조에 대해서 설명했다
- 이 글을 읽기 전에 위 글을 먼저 읽는 것을 추천한다.
- 테이블은 크게 세 가지 종류임. 유저 테이블 / 채널 테이블 / 슬랙 대화 테이블
- 유저 테이블은 위와 같이 user_id, real_name, display_name 3가지 컬럼으로 구성되어 있다. 유저명을 구분하기 위한 목적으로 저장한다.
- 채널 테이블은 위와 같이 channel_id, channel_name, num_member 3가지 컬럼으로 구성되어 있다. 슬랙에서 message API요청을 하기 위해서는 채널 명이 필요하다. 그래서, 위 채널 리스트를 받아서 API 파라미터 값으로 넣는다.
- 특히, 유저 테이블과 채널 테이블은 매 번 업데이트 한다. 왜냐하면, 혹여나 새로운 유저가 생길 수도 있고 새로운 채널이 생길 수도 있기 때문이다.
- 지난 시간 내용은 이 정도로 전달 하면 될 것 같다. 이번 시간에는 메시지 데이터에 대해서 다루어보려고 한다.
- 전체 코드는 아래에서 살펴볼 수 있다. (아직 정리중..)
https://github.com/ddongmiin/geultto_genie_bot
사용 API
(본 글은 성윤님께서 진행하신 genie 프로젝트를 기반으로 작성하였습니다.)
https://github.com/geultto/genie
- API는 총 4종류를 사용한다.
- 앞서, 유저 리스트와 채널 리스트는 설명했었고 본 글에서는 게시글에 대해서 다루려고 한다.
- 본격적으로 들어가기전에 한 가지 염두할 부분이 있다.
https://api.slack.com/docs/rate-limits
- 슬랙은 API 요청 제한량을 Tier로 구분한다.
- 우리가 사용하려는 API들의 Tier는 3티어로 50+ per minute를 권장한다.
- 그래서, time.sleep을 걸지 않고 무한정 요청할 경우 rate limit 에러가 발생할 수 있다.
- 따라서, time.sleep을 걸고 사용하는 것이 좋다.
게시글 조회
https://api.slack.com/methods/conversations.replies
- 게시글 조회 API는 conversations.history이고 Python에서는 app을 생성한 다음 conversations_history 메서드로 호출한다.
- 이 메서드는 여러 가지 파라미터를 받는다.
파라미터 | 설명 |
token | 슬랙 앱의 토큰이다. 앞선 글에서 관련 내용을 확인할 수 있다. |
channel | 메시지를 받아올 채널아이디이다. |
limit | 한 번에 몇 개의 메시지를 받아올지를 결정한다. 슬랙 권장은 200개인데, 최대 1,000개로 입력할 수 있다. |
latest | unixtime으로 end_date정도로 생각하면 될 것 같다. |
oldest | unixtime으로 start_date정도로 생각하면 될 것 같다. |
inclusive | latest와 oldest를 포함할지 여부이다 true로 입력해주면 된다. |
import os
from typing import List, Dict
import json
from datetime import datetime
import ssl
import certifi
import pandas as pd
from slack_sdk import WebClient
from slack_bolt import App
class SlackMessageRetriever:
"""
슬랙 게시글 수집에 필요한 기능들을 정리했습니다.
"""
def __init__(self, env_name: str):
self.token = os.environ.get(env_name)
self.ssl_context = ssl.create_default_context(cafile=certifi.where())
self.app = App(client=WebClient(token=self.token, ssl=self.ssl_context))
def read_post_from_slack(
self, start_unixtime: float, end_unixtime: float, channel_id: str
) -> List[Dict]:
"""
conversations_history API를 이용해 슬랙 게시글을 불러옵니다.
Parameters
----------
start_unixtime : float
게시글 시작일시입니다. Unixtime값을 넣어줘야 합니다.
end_unixtime : float
게시글 종료일시입니다. Unixtime값을 넣어줘야 합니다.
channel_id : str
수집할 채널명입니다.
Returns
-------
List[Dict]
list dict 형태로 게시글이 출력됩니다.
"""
response = self.app.client.conversations_history(
channel=channel_id,
limit=1000,
inclusive="true",
oldest=start_unixtime,
latest=end_unixtime,
)
return response["messages"]
- 나의 경우, 위와 같이 별도 클래스를 구성하여 slack_app을 상속받아 사용하였다.
- 몇 가지 설명할 부분이 있다.
- self.ssl_context = ssl.create_default_context(cafile=certifi.where())
- 이건 간혹 ssl 인증 에러가 나는 경우가 있어 추가하였다. (에러가 없어지기는 했는데, time.sleep을 걸어서인지 이 코드 때문인지 잘 모르겠다. 일단 그대로 두었다.)
- read_post_from_slack의 return값에는 [{}, {}, ....] 형태로 메시지들이 저장되어 있다.
- 해당 메시지는 어떻게 파싱할까?
def convert_post_to_dict(channel_id: str, post: Dict) -> Dict:
"""
post dict내에서 필요한 정보만 수집합니다.
Parameters
----------
channel_id : str
채널명은 dict안에 없으므로 파라미터로 추가합니다.
post : Dict
dict 형태의 post입니다.
Returns
-------
Dict
채널id, 메시지타입(post), 게시글id, 유저id, 작성시간, 작성일자, 게시글, 리액션(종류, 사람, 숫자)을 수집합니다.
"""
return {
"channel_id": channel_id,
"message_type": "post",
"post_id": post["user"]
+ "-"
+ datetime.fromtimestamp(float(post["ts"])).strftime("%Y-%m-%d-%H-%M-%S-%f"),
"user_id": post["user"],
"createtime": datetime.fromtimestamp(float(post["ts"])).strftime(
"%Y-%m-%dT%H:%M:%S.%f"
),
"tddate": datetime.fromtimestamp(float(post["ts"])).strftime("%Y-%m-%d"),
"text": post["text"],
"reactions": json.dumps(
[
{
"name": react_dict["name"],
"user_id": react_dict["users"],
"count": react_dict["count"],
}
for react_dict in post.get("reactions", [])
],
ensure_ascii=False,
),
}
- channel_id는 message안에 없는 값이라 별도로 추가해준다.
- 앞선 글에서 게시글의 유저id와 게시시간(unixtime)을 합치면 게시글과 쓰레드를 구분할 수 있는 키 값이 된다고 얘기했었다.
- 그 내용이 위 코드에 표현되어 있다. 메시지안의 게시글=ts를 나노세컨즈 단위의 문자열로 파싱한다.
- createtime은 작성 시간을 표기한다. fromtimestamp를 사용하면 unixtime이 자동으로 kst 기준으로 변환된다.
- tddate는 일자로 나타내어 파티션 키로 사용하였다. (해당 데이터를 빅쿼리에 적재했는데 빅쿼리에서는 시간 관련 타입만 파티션 키로 사용할 수 있다.)
- reactions에서는 이모지와 이모지를 입력한 유저명 그리고 count수를 기록할 수 있다. 여기서 한가지 주의사항이 잇다.
- 간혹 한글로 표기된 이모티콘 파싱이 잘 안되는 경우가 있는데 위와 같이 json.dumps에서 ensure_ascii를 False로 체킹해주면 한글도 잘 표기된다.
- 여기까지 진행하면 위와 같은 데이터를 볼 수 있다. text와 text에 대한 reactions도 모두 기록한다.
게시글 쓰레드 조회
https://api.slack.com/methods/conversations.replies
- 쓰레드 조회 API의 경우 게시글 조회 API와 거의 동일하나
- ts즉 게시글의 unixtime을 파라미터로 받는다는 차이점이 있다.
def read_thread_from_slack(self, channel_id: str, thread_ts: float) -> List[Dict]:
"""
conversations_replies API를 이용해 슬랙 게시글의 쓰레드를 불러옵니다.
Parameters
----------
channel_id : str
수집할 채널 명입니다.
thread_ts : float
게시글 작성 시간(unixtime)을 넣어줘야 헤당 게시글의 쓰레드를 확인할 수 있습니다.
Returns
-------
List[Dict]
list dict 형태로 출력됩니다.
0번째 인덱스는 게시글이므로 제외합니다.
"""
thread = self.app.client.conversations_replies(channel=channel_id, ts=thread_ts)
return thread["messages"]
- 쓰레드의 경우 conversations_replies 메서드를 사용한다. ts는 게시글의 unixtime이다.
- 쓰레드는 어떻게 파싱할까?
def convert_thread_to_dict(channel_id: str, thread: Dict) -> Dict:
"""
thread dict내에서 필요한 정보만 수집합니다.
Parameters
----------
channel_id : str
채널명은 dict안에 없으므로 파라미터로 추가합니다.
thread : Dict
dict 형태의 thread입니다.
Returns
-------
Dict
채널id, 메시지타입(thread), 게시글id, 유저id, 작성시간, 작성일자, 게시글, 리액션(종류, 사람, 숫자)을 수집합니다.
"""
parent_user_id = (
thread["parent_user_id"]
if "parent_user_id" in thread.keys()
else thread["root"]["user"]
)
return {
"channel_id": channel_id,
"message_type": "thread",
"post_id": parent_user_id
+ "-"
+ datetime.fromtimestamp(float(thread["thread_ts"])).strftime("%Y-%m-%d-%H-%M-%S-%f"),
"user_id": thread["user"],
"createtime": datetime.fromtimestamp(float(thread["ts"])).strftime(
"%Y-%m-%dT%H:%M:%S.%f"
),
"tddate": datetime.fromtimestamp(float(thread["ts"])).strftime("%Y-%m-%d"),
"text": thread["text"],
"reactions": json.dumps(
[
{
"name": react_dict["name"],
"user_id": react_dict["users"],
"count": react_dict["count"],
}
for react_dict in thread.get("reactions", [])
],
ensure_ascii=False,
),
}
- 게시글과 거의 동일해서 차이점 있는 부분만 설명하겠다.
"post_id": parent_user_id
+ "-"
+ datetime.fromtimestamp(float(thread["thread_ts"])).strftime("%Y-%m-%d-%H-%M-%S-%f"),
- post_id에서 parent_user_id를 받는다.
- 이 parent_user_id는 게시글을 작성한 유저의 id이다.
parent_user_id = (
thread["parent_user_id"]
if "parent_user_id" in thread.keys()
else thread["root"]["user"]
)
- thread안에 parent_user_id가 남기도 하지만 간혹 전체 채널로 전송하기 버튼을 체크하는 경우 root안에 포함되는 경우가 있다. 그래서 ['root']로 한번 더 처리해준다.
- 여기까지 진행하고 나면 포스트와 거의 동일한 구조로 쓰레드를 받을 수 있다.
진행하는 과정에서 있었던 사건들
- 진행하는 과정에서 토큰이 노출되는 경우가 있었다. 물론, public을 하기 전에 당연히 다 지우고 올렸지만.. 내가 생각못했던 예전 커밋에 기록이 남아 있었던 것...
- 당시에는 상당히 당황했었는데.. 지금 생각해보면 토큰이나 키파일을 그냥 환경변수로 지정해주면 될 일이었다.
- 성윤님께서 최초에 작성해주신 코드에도 환경변수로 지정하라고 되어 있었다..
- 실제 서비스였더라면.. 정말 진땀이 났을 것 같다.
- 여튼 지금은 구글키도 슬랙토큰도 사이좋게 환경변수로 지정해두었다.
os.environ.get(env_name)
- 받아올때도 위와 같이 환경변수로 받아온다.
- 그리고, 시간 관련된 변수들을 넣으면서 몇 가지? 어려운 부분들이 있었다.
- createtime과 tddate는 DATETIME과 DATE이다. 각각
- 처음에 timestamp로 넣었더니 계속 utc표시가 붙어서 createtime은 DATETIME으로 넣었고
- tddate는 아무리 해도... 이상하게 값이 안들어갔다.
https://github.com/apache/airflow/issues/26248
- 위 이슈였는데, 너무 해결이 안돼서 답답했었다.
- 해결법은 생각외로 심플했는데
https://pandas-gbq.readthedocs.io/_/downloads/en/latest/pdf/
- gbq가이드 문서에 잇는 타입 그대로 넣어주는 것
message_df["tddate"] = pd.to_datetime(message_df["tddate"], format="%Y-%m-%d")
message_df["createtime"] = pd.to_datetime(
message_df["createtime"], format="%Y-%m-%dT%H:%M:%S.%f"
)
- 지금 생각해봐도 굉장히 어이없지만, 그냥 datetime64[ns] 타입으로 바꿔주니 해결되는 문제였다.
- 스키마를 사전에 설정하는 부분도 살짝 애먹었었는데
https://github.com/zzsza/kyle-school/blob/master/week6/data/bike_schema.json
- 성윤님이 말씀주신대로 위 파일과 같이 사전에 스키마를 설정하니 편하게 관리할 수 있다.
마무리하며..
- 여튼 이렇게 우여곡절끝에 기간이 만료되기 전에 데이터를 저장할 수 있었다.
- 다음 시간에는 빅쿼리 적재 관련해서 글을 작성하고 본 시리즈를 마무리하려고 한다.
'딥상어동의 딥한 프로그래밍 > 엔지니어링' 카테고리의 다른 글
MY FIRST DBT - (1) Bigquery 연결해보기 + 폴더 구조에 대한 가벼운 이해 (5) | 2024.11.10 |
---|---|
Airflow DAG 개념 톺아보기 (설치/실행 과정 포함) (4) | 2024.10.13 |
우당탕탕 슬랙 메시지 저장기(1) - 슬랙 메시지 넌 누구냐? (8) | 2023.05.07 |
데이터 마트에서는 뭘 파나요?(feat. OLTP, OLAP) (14) | 2023.03.26 |
[Bigquery] 지난 며칠 간 Python과 연동하여 사용한 소감 (0) | 2023.01.18 |
제 블로그에 와주셔서 감사합니다! 다들 오늘 하루도 좋은 일 있으시길~~
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!