딥상어동의 딥한 프로그래밍/엔지니어링

우당탕탕 슬랙 메시지 저장기(2) - 게시글과 쓰레드 조회하기

딥상어동의 딥한생각 2023. 5. 21. 23:56
슬랙 무료 플랜의 경우, 90일 단위로 주기적으로 메시지 함을 비웁니다.
이에 따라 추후 백업 및 활용 용도로 메시지를 저장하는 프로젝트를 진행했었습니다.

 

https://gibles-deepmind.tistory.com/entry/%EC%9A%B0%EB%8B%B9%ED%83%95%ED%83%95-%EC%8A%AC%EB%9E%99-%EB%A9%94%EC%8B%9C%EC%A7%80-%EC%A0%80%EC%9E%A5%EA%B8%B01-%EC%8A%AC%EB%9E%99-%EB%A9%94%EC%8B%9C%EC%A7%80-%EB%84%8C-%EB%88%84%EA%B5%AC%EB%83%90

 

우당탕탕 슬랙 메시지 저장기(1) - 슬랙 메시지 넌 누구냐?

슬랙 무료 플랜의 경우, 90일 단위로 주기적으로 메시지 함을 비웁니다. 이에 따라 추후 백업 및 활용 용도로 메시지를 저장하는 프로젝트를 진행했었습니다. ※음슴체주의※ - 슬랙에서 데이터

gibles-deepmind.tistory.com

- 지난 시간에는 슬랙 앱 생성 방법과 토큰 구조 그리고 기본적인 테이블 구조에 대해서 설명했다

- 이 글을 읽기 전에 위 글을 먼저 읽는 것을 추천한다. 

- 테이블은 크게 세 가지 종류임. 유저 테이블 / 채널 테이블 / 슬랙 대화 테이블

- 유저 테이블은 위와 같이 user_id, real_name, display_name 3가지 컬럼으로 구성되어 있다. 유저명을 구분하기 위한 목적으로 저장한다.

- 채널 테이블은 위와 같이 channel_id, channel_name, num_member 3가지 컬럼으로 구성되어 있다. 슬랙에서 message API요청을 하기 위해서는 채널 명이 필요하다. 그래서, 위 채널 리스트를 받아서 API 파라미터 값으로 넣는다.

- 특히, 유저 테이블과 채널 테이블은 매 번 업데이트 한다. 왜냐하면, 혹여나 새로운 유저가 생길 수도 있고 새로운 채널이 생길 수도 있기 때문이다.

- 지난 시간 내용은 이 정도로 전달 하면 될 것 같다. 이번 시간에는 메시지 데이터에 대해서 다루어보려고 한다. 

- 전체 코드는 아래에서 살펴볼 수 있다. (아직 정리중..)

https://github.com/ddongmiin/geultto_genie_bot

 

GitHub - ddongmiin/geultto_genie_bot: 글또 데이터 수집 슬랙 봇 개발기

글또 데이터 수집 슬랙 봇 개발기. Contribute to ddongmiin/geultto_genie_bot development by creating an account on GitHub.

github.com


사용 API

(본 글은 성윤님께서 진행하신 genie 프로젝트를 기반으로 작성하였습니다.)

https://github.com/geultto/genie


- API는 총 4종류를 사용한다.

API 종류 링크
유저 리스트 https://api.slack.com/methods/users.list
채널 리스트 https://api.slack.com/methods/conversations.list
게시글 조회 https://api.slack.com/methods/conversations.history
게시글 쓰레드 조회 https://api.slack.com/methods/conversations.replies

- 앞서, 유저 리스트와 채널 리스트는 설명했었고 본 글에서는 게시글에 대해서 다루려고 한다.

- 본격적으로 들어가기전에 한 가지 염두할 부분이 있다.

https://api.slack.com/docs/rate-limits

 

Rate Limits

All good things in moderation: how rate limiting works throughout the platform.

api.slack.com

- 슬랙은 API 요청 제한량을 Tier로 구분한다. 

- 우리가 사용하려는 API들의 Tier는 3티어로 50+ per minute를 권장한다.

- 그래서, time.sleep을 걸지 않고 무한정 요청할 경우 rate limit 에러가 발생할 수 있다.

- 따라서, time.sleep을 걸고 사용하는 것이 좋다.


게시글 조회


https://api.slack.com/methods/conversations.replies

 

conversations.replies API method

Retrieve a thread of messages posted to a conversation

api.slack.com

- 게시글 조회 API는 conversations.history이고 Python에서는 app을 생성한 다음 conversations_history 메서드로 호출한다. 

- 이 메서드는 여러 가지 파라미터를 받는다. 

파라미터 설명
token 슬랙 앱의 토큰이다. 앞선 글에서 관련 내용을 확인할 수 있다.
channel 메시지를 받아올 채널아이디이다.
limit 한 번에 몇 개의 메시지를 받아올지를 결정한다. 슬랙 권장은 200개인데, 최대 1,000개로 입력할 수 있다. 
latest unixtime으로 end_date정도로 생각하면 될 것 같다.
oldest unixtime으로 start_date정도로 생각하면 될 것 같다.
inclusive latest와 oldest를 포함할지 여부이다 true로 입력해주면 된다.
import os
from typing import List, Dict
import json
from datetime import datetime
import ssl
import certifi

import pandas as pd
from slack_sdk import WebClient
from slack_bolt import App


class SlackMessageRetriever:
    """
    슬랙 게시글 수집에 필요한 기능들을 정리했습니다.
    """

    def __init__(self, env_name: str):
        self.token = os.environ.get(env_name)
        self.ssl_context = ssl.create_default_context(cafile=certifi.where())
        self.app = App(client=WebClient(token=self.token, ssl=self.ssl_context))

    def read_post_from_slack(
        self, start_unixtime: float, end_unixtime: float, channel_id: str
    ) -> List[Dict]:
        """
        conversations_history API를 이용해 슬랙 게시글을 불러옵니다.

        Parameters
        ----------
        start_unixtime : float
            게시글 시작일시입니다. Unixtime값을 넣어줘야 합니다.
        end_unixtime : float
            게시글 종료일시입니다. Unixtime값을 넣어줘야 합니다.
        channel_id : str
            수집할 채널명입니다.

        Returns
        -------
        List[Dict]
            list dict 형태로 게시글이 출력됩니다.
        """
        response = self.app.client.conversations_history(
            channel=channel_id,
            limit=1000,
            inclusive="true",
            oldest=start_unixtime,
            latest=end_unixtime,
        )
        return response["messages"]

- 나의 경우, 위와 같이 별도 클래스를 구성하여 slack_app을 상속받아 사용하였다. 

- 몇 가지 설명할 부분이 있다. 

- self.ssl_context = ssl.create_default_context(cafile=certifi.where())

- 이건 간혹 ssl 인증 에러가 나는 경우가 있어 추가하였다. (에러가 없어지기는 했는데, time.sleep을 걸어서인지 이 코드 때문인지 잘 모르겠다. 일단 그대로 두었다.) 

- read_post_from_slack의 return값에는 [{}, {}, ....] 형태로 메시지들이 저장되어 있다. 

- 해당 메시지는 어떻게 파싱할까?

    def convert_post_to_dict(channel_id: str, post: Dict) -> Dict:
        """
        post dict내에서 필요한 정보만 수집합니다.

        Parameters
        ----------
        channel_id : str
            채널명은 dict안에 없으므로 파라미터로 추가합니다.
        post : Dict
            dict 형태의 post입니다.

        Returns
        -------
        Dict
            채널id, 메시지타입(post), 게시글id, 유저id, 작성시간, 작성일자, 게시글, 리액션(종류, 사람, 숫자)을 수집합니다.
        """
        return {
            "channel_id": channel_id,
            "message_type": "post",
            "post_id": post["user"]
            + "-"
            + datetime.fromtimestamp(float(post["ts"])).strftime("%Y-%m-%d-%H-%M-%S-%f"),
            "user_id": post["user"],
            "createtime": datetime.fromtimestamp(float(post["ts"])).strftime(
                "%Y-%m-%dT%H:%M:%S.%f"
            ),
            "tddate": datetime.fromtimestamp(float(post["ts"])).strftime("%Y-%m-%d"),
            "text": post["text"],
            "reactions": json.dumps(
                [
                    {
                        "name": react_dict["name"],
                        "user_id": react_dict["users"],
                        "count": react_dict["count"],
                    }
                    for react_dict in post.get("reactions", [])
                ],
                ensure_ascii=False,
            ),
        }

- channel_id는 message안에 없는 값이라 별도로 추가해준다.

- 앞선 글에서 게시글의 유저id와 게시시간(unixtime)을 합치면 게시글과 쓰레드를 구분할 수 있는 키 값이 된다고 얘기했었다.

"post_id": post["user"]
+ "-"
+ datetime.fromtimestamp(float(post["ts"])).strftime("%Y-%m-%d-%H-%M-%S-%f"), 

- 그 내용이 위 코드에 표현되어 있다. 메시지안의 게시글=ts를 나노세컨즈 단위의 문자열로 파싱한다.

- createtime은 작성 시간을 표기한다. fromtimestamp를 사용하면 unixtime이 자동으로 kst 기준으로 변환된다. 

- tddate는 일자로 나타내어 파티션 키로 사용하였다. (해당 데이터를 빅쿼리에 적재했는데 빅쿼리에서는 시간 관련 타입만 파티션 키로 사용할 수 있다.)

- reactions에서는 이모지와 이모지를 입력한 유저명 그리고 count수를 기록할 수 있다. 여기서 한가지 주의사항이 잇다. 

"reactions": json.dumps(
[
{
"name": react_dict["name"],
"user_id": react_dict["users"],
"count": react_dict["count"],
}
for react_dict in thread.get("reactions", [])
],
ensure_ascii=False,
),

- 간혹 한글로 표기된 이모티콘 파싱이 잘 안되는 경우가 있는데 위와 같이 json.dumps에서 ensure_ascii를 False로 체킹해주면 한글도 잘 표기된다. 

- 여기까지 진행하면 위와 같은 데이터를 볼 수 있다. text와 text에 대한 reactions도 모두 기록한다.


게시글 쓰레드 조회


https://api.slack.com/methods/conversations.replies

 

conversations.replies API method

Retrieve a thread of messages posted to a conversation

api.slack.com

- 쓰레드 조회 API의 경우 게시글 조회 API와 거의 동일하나

- ts즉 게시글의 unixtime을 파라미터로 받는다는 차이점이 있다.

    def read_thread_from_slack(self, channel_id: str, thread_ts: float) -> List[Dict]:
        """
        conversations_replies API를 이용해 슬랙 게시글의 쓰레드를 불러옵니다.

        Parameters
        ----------
        channel_id : str
            수집할 채널 명입니다.
        thread_ts : float
            게시글 작성 시간(unixtime)을 넣어줘야 헤당 게시글의 쓰레드를 확인할 수 있습니다.

        Returns
        -------
        List[Dict]
            list dict 형태로 출력됩니다.
            0번째 인덱스는 게시글이므로 제외합니다.
        """
        thread = self.app.client.conversations_replies(channel=channel_id, ts=thread_ts)
        return thread["messages"]

- 쓰레드의 경우 conversations_replies 메서드를 사용한다. ts는 게시글의 unixtime이다.

- 쓰레드는 어떻게 파싱할까?

    def convert_thread_to_dict(channel_id: str, thread: Dict) -> Dict:
        """
        thread dict내에서 필요한 정보만 수집합니다.

        Parameters
        ----------
        channel_id : str
            채널명은 dict안에 없으므로 파라미터로 추가합니다.
        thread : Dict
            dict 형태의 thread입니다.

        Returns
        -------
        Dict
            채널id, 메시지타입(thread), 게시글id, 유저id, 작성시간, 작성일자, 게시글, 리액션(종류, 사람, 숫자)을 수집합니다.
        """
        parent_user_id = (
            thread["parent_user_id"]
            if "parent_user_id" in thread.keys()
            else thread["root"]["user"]
        )
        return {
            "channel_id": channel_id,
            "message_type": "thread",
            "post_id": parent_user_id
            + "-"
            + datetime.fromtimestamp(float(thread["thread_ts"])).strftime("%Y-%m-%d-%H-%M-%S-%f"),
            "user_id": thread["user"],
            "createtime": datetime.fromtimestamp(float(thread["ts"])).strftime(
                "%Y-%m-%dT%H:%M:%S.%f"
            ),
            "tddate": datetime.fromtimestamp(float(thread["ts"])).strftime("%Y-%m-%d"),
            "text": thread["text"],
            "reactions": json.dumps(
                [
                    {
                        "name": react_dict["name"],
                        "user_id": react_dict["users"],
                        "count": react_dict["count"],
                    }
                    for react_dict in thread.get("reactions", [])
                ],
                ensure_ascii=False,
            ),
        }

- 게시글과 거의 동일해서 차이점 있는 부분만 설명하겠다. 

            "post_id": parent_user_id
            + "-"
            + datetime.fromtimestamp(float(thread["thread_ts"])).strftime("%Y-%m-%d-%H-%M-%S-%f"),

- post_id에서 parent_user_id를 받는다.

- 이 parent_user_id는 게시글을 작성한 유저의 id이다. 

        parent_user_id = (
            thread["parent_user_id"]
            if "parent_user_id" in thread.keys()
            else thread["root"]["user"]
        )

- thread안에 parent_user_id가 남기도 하지만 간혹 전체 채널로 전송하기 버튼을 체크하는 경우 root안에 포함되는 경우가 있다. 그래서 ['root']로 한번 더 처리해준다.

- 여기까지 진행하고 나면 포스트와 거의 동일한 구조로 쓰레드를 받을 수 있다.


진행하는 과정에서 있었던 사건들


 

- 진행하는 과정에서 토큰이 노출되는 경우가 있었다. 물론, public을 하기 전에 당연히 다 지우고 올렸지만.. 내가 생각못했던 예전 커밋에 기록이 남아 있었던 것...

- 당시에는 상당히 당황했었는데.. 지금 생각해보면 토큰이나 키파일을 그냥 환경변수로 지정해주면 될 일이었다. 

- 성윤님께서 최초에 작성해주신 코드에도 환경변수로 지정하라고 되어 있었다.. 

- 실제 서비스였더라면.. 정말 진땀이 났을 것 같다.

- 여튼 지금은 구글키도 슬랙토큰도 사이좋게 환경변수로 지정해두었다.

os.environ.get(env_name)

- 받아올때도 위와 같이 환경변수로 받아온다.

 

- 그리고, 시간 관련된 변수들을 넣으면서 몇 가지? 어려운 부분들이 있었다.

- createtime과 tddate는 DATETIME과 DATE이다. 각각

- 처음에 timestamp로 넣었더니 계속 utc표시가 붙어서 createtime은 DATETIME으로 넣었고

- tddate는 아무리 해도... 이상하게 값이 안들어갔다.

https://github.com/apache/airflow/issues/26248

 

BaseSQLToGCSOperator Parquet Format Fails to Write Dates/JSON · Issue #26248 · apache/airflow

Apache Airflow Provider(s) google Versions of Apache Airflow Providers 8.3.0 Apache Airflow version 2.3.4 Operating System OSX Deployment Virtualenv installation Deployment details No response What...

github.com

- 위 이슈였는데, 너무 해결이 안돼서 답답했었다.

- 해결법은 생각외로 심플했는데

https://pandas-gbq.readthedocs.io/_/downloads/en/latest/pdf/

- gbq가이드 문서에 잇는 타입 그대로 넣어주는 것

        message_df["tddate"] = pd.to_datetime(message_df["tddate"], format="%Y-%m-%d")
        message_df["createtime"] = pd.to_datetime(
            message_df["createtime"], format="%Y-%m-%dT%H:%M:%S.%f"
        )

- 지금 생각해봐도 굉장히 어이없지만, 그냥 datetime64[ns] 타입으로 바꿔주니 해결되는 문제였다.

 

- 스키마를 사전에 설정하는 부분도 살짝 애먹었었는데

https://github.com/zzsza/kyle-school/blob/master/week6/data/bike_schema.json

 

GitHub - zzsza/kyle-school: 쏘카 데이터 그룹 사내 신입/인턴을 대상으로 한 카일스쿨

쏘카 데이터 그룹 사내 신입/인턴을 대상으로 한 카일스쿨. Contribute to zzsza/kyle-school development by creating an account on GitHub.

github.com

- 성윤님이 말씀주신대로 위 파일과 같이 사전에 스키마를 설정하니 편하게 관리할 수 있다.


마무리하며..


- 여튼 이렇게 우여곡절끝에 기간이 만료되기 전에 데이터를 저장할 수 있었다.

- 다음 시간에는 빅쿼리 적재 관련해서 글을 작성하고 본 시리즈를 마무리하려고 한다.