PR

ローカル環境から AWS へ!Gmail スケジュール連携ツール移行ガイド – 第2段階:Gmail からのメール取得と内容の読み取り

モニタにPythonコードが表示され、AWSのロゴとGmail、Google Calendarのアイコンが配置されたイメージ。サーバーレス移行と連携処理の概念を示す。 技術・IT
AWS LambdaによるGmail連携スケジュール管理のサーバーレス化イメージ

前回の第1段階では、AWS でツールを動かすための土台となる AWS Lambda、AWS Secrets Manager、Amazon EventBridge の基本的な設定と、Google Calendar API の利用準備、Gmail のプログラム用パスワードの取得について解説しました。

第2段階となる今回は、いよいよ Lambda 関数に、Gmail からメールを取得し、その内容を読み取るための Python コードを記述していきます。

前回の記事はこちら↓

1. Lambda 関数に Python コードを記述する準備

まずは、AWS Lambda コンソールで、前回作成した Lambda 関数 (gmail-calendar-sync など) の画面を開いてください。

画面中央の「コードソース」という部分に、オンラインのエディタが表示されています。ここに、Gmail からメールを取得し、内容を処理するための Python コードを記述していきます。

2. Gmail への接続とメールの取得 – gmail_connector.py の作成

コードを整理するために、Gmail への接続やメールの取得に関する処理を別のファイル (gmail_connector.py) に記述し、Lambda 関数のメインファイル (main.py) から呼び出すようにします。

まずは、Lambda 関数のコードエディタの左側にあるファイルツリーの「+」ボタンをクリックし、「新しいファイルを作成」を選択します。ファイル名として gmail_connector.py と入力し、「作成」をクリックします。

gmail_connector.py には以下の Python コードを記述します。

Python

import imaplib
import email
from email.header import decode_header
import os
import json
from datetime import datetime, timedelta

def connect_gmail(username, password):
    """Gmail に接続し、IMAP4_SSL オブジェクトを返します。"""
    try:
        print("ログ: Gmail への接続を開始...")
        mail = imaplib.IMAP4_SSL('imap.gmail.com')
        print("ログ: IMAP4_SSL オブジェクトを作成しました。")
        mail.login(username, password)
        print(f"ログ: Gmail へのログインに成功しました (ユーザー: {username})。")
        return mail
    except Exception as e:
        print(f"エラー (Gmail 接続): {e}")
        return None

def select_inbox(mail):
    """受信トレイを選択します。"""
    try:
        mail.select('inbox')
        print("ログ: 受信トレイを選択しました。")
    except Exception as e:
        print(f"エラー (受信トレイ選択): {e}")

def search_unread_emails(mail, sender, days=7):
    """指定された送信者からの過去 days 日以内の未読メールを検索します。"""
    date_limit = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y")
    search_query = f'(UNSEEN FROM "{sender}" SINCE "{date_limit}")'
    try:
        status, email_ids = mail.search(None, search_query)
        if status == 'OK' and email_ids[0]:
            return email_ids[0].split()
        return []
    except Exception as e:
        print(f"エラー (未読メール検索): {e}")
        return []

def fetch_email_data(mail, email_id):
    """指定されたメール ID のメールデータを取得します。"""
    try:
        print(f"ログ: メール ID {email_id.decode()} のデータ取得...")
        status, msg_data = mail.fetch(email_id, '(RFC822)')
        if status == 'OK':
            return email.message_from_bytes(msg_data[0][1])
        else:
            print(f"エラー (メール取得): メール ID {email_id.decode()} のデータの取得に失敗しました: {msg_data}")
            return None
    except Exception as e:
        print(f"エラー (メールデータ取得): {e}")
        return None

def get_email_body(msg):
    """メールの本文を取得します。"""
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            cdispo = str(part.get('Content-Disposition'))
            if ctype == 'text/plain' and 'attachment' not in cdispo:
                try:
                    payload = part.get_payload(decode=True)
                    charset = part.get_charset()
                    if isinstance(payload, bytes):
                        body = payload.decode(charset or 'utf-8', errors='ignore')
                    else:
                        body = payload
                    return body
                except Exception as e:
                    print(f"ログ (本文デコード): エラー: {e}")
                    return str(part.get_payload())
    else:
        try:
            payload = msg.get_payload(decode=True)
            charset = msg.get_charset()
            if isinstance(payload, bytes):
                return payload.decode(charset or 'utf-8', errors='ignore')
            return payload
        except Exception as e:
            print(f"ログ (本文デコード): エラー: {e}")
            return str(msg.get_payload())
    return body

def get_sender_email(msg):
    """メールの送信者メールアドレスを取得します。"""
    sender = msg.get('From')
    if sender:
        try:
            decoded_sender = ""
            for header, charset in decode_header(sender):
                if isinstance(header, bytes):
                    decoded_sender += header.decode(charset or 'utf-8')
                else:
                    decoded_sender += header
            match = email.utils.parseaddr(decoded_sender)[1]
            if match:
                return match
        except Exception as e:
            print(f"エラー (送信者デコード): {e}")
            return sender
    return None

def logout_gmail(mail):
    """Gmail からログアウトします。"""
    try:
        mail.logout()
        print("ログ: Gmail からログアウトしました。")
    except Exception as e:
        print(f"エラー (Gmail ログアウト): {e}")
gmail_connector.py の説明

このファイルには、以下の処理が定義されています。

  • connect_gmail:Gmail への接続を行います。
  • select_inbox:受信トレイを選択します。
  • search_unread_emails:指定した送信者からの未読メールを検索します。
  • fetch_email_data:指定したメールのデータを取得します。
  • get_email_body:メールの本文を取得します(multipart やエンコーディングを考慮した処理)。
  • get_sender_email:メールの送信者を取得します。
  • logout_gmail:Gmail からログアウトします。

この gmail_connector.py には、Gmail への接続、メールボックスの選択、未読メールの検索、メールデータの取得、件名と送信者のデコード、様々な文字エンコーディングに対応したメール本文の取得、Gmail からのログアウトといった、IMAP 関連の基本的な処理を関数として定義しています。

get_email_body 関数では、メールが multipart 形式かそうでないかで処理を分け、各パートやメッセージ全体の Content-Type ヘッダーから文字エンコーディングを取得し、本文をデコードしています。bytes 型のペイロードの場合にはデコード処理を行い、エラーが発生した場合にはエラーログを出力し、可能な範囲で情報を返却するようにしています。

3. Lambda 関数のメインコード (main.py) の記述

次に、Lambda 関数のメインファイル (main.py) を作成(または既存のファイルを編集)し、gmail_connector.py の関数を呼び出してメールを取得し、ログに出力する処理を記述します。

main.py に以下の Python コードを記述します。

Python

import os
import json
from gmail_connector import connect_gmail, select_inbox, search_unread_emails, fetch_email_data, logout_gmail, get_email_body, get_sender_email
import boto3

def get_environment_variable(variable_name, default=None):
    """環境変数を取得し、存在しない場合はデフォルト値を返します。"""
    return os.environ.get(variable_name, default)

def get_secret_from_secrets_manager(secret_name, region_name=None):
    """AWS Secrets Manager からシークレットの値を取得します。"""
    if region_name is None:
        region_name = get_environment_variable("AWS_REGION", "ap-northeast-1")
    client = boto3.client("secretsmanager", region_name=region_name)
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
        secret = get_secret_value_response['SecretString']
        return json.loads(secret)
    except Exception as e:
        print(f"エラー (Secrets Manager): シークレット '{secret_name}' の取得に失敗しました: {e}")
        raise

def lambda_handler(event, context):
    print("ログ: Lambda 関数の実行を開始しました。")
    secret_name = get_environment_variable("SECRET_NAME")
    if not secret_name:
        print("エラー: 環境変数 SECRET_NAME が設定されていません (Gmail)。")
        return { 'statusCode': 500, 'body': json.dumps('エラー: 環境変数 SECRET_NAME が設定されていません (Gmail)。') }

    secrets = get_secret_from_secrets_manager(secret_name)
    if not secrets:
        print(f"エラー: Gmail のシークレット '{secret_name}' の取得に失敗しました。")
        return { 'statusCode': 500, 'body': json.dumps('エラー: Gmail のシークレットの取得に失敗しました。') }

    gmail_user = secrets.get("gmail_address")
    gmail_password = secrets.get("gmail_password")
    garoon_sender = secrets.get("garoon_sender")

    if not gmail_user or not gmail_password or not garoon_sender:
        print("エラー: 必要な Gmail のシークレット (アドレス、パスワード、送信者) が見つかりません。")
        return { 'statusCode': 500, 'body': json.dumps('エラー: Gmail のシークレットが見つかりません。') }

    mail = connect_gmail(gmail_user, gmail_password)
    if not mail:
        return { 'statusCode': 500, 'body': json.dumps('エラー: Gmail への接続に失敗しました。') }

    try:
        select_inbox(mail)
        email_ids = search_unread_emails(mail, garoon_sender)
        print(f"ログ: 未読メールの ID を取得しました: {email_ids}")

        for email_id in email_ids:
            msg = fetch_email_data(mail, email_id)
            if msg:
                subject = msg.get('Subject')
                sender = get_sender_email(msg)
                body = get_email_body(msg)
                print(f"ログ: メールを受信しました (ID: {email_id.decode()}, 件名: {subject}, 送信者: {sender})")
                print(f"ログ: メール本文:\n{body}")

    except Exception as e:
        print(f"エラー (Lambda Handler): {e}")
        return { 'statusCode': 500, 'body': json.dumps(f'エラー: {str(e)}') }
    finally:
        logout_gmail(mail)
        print("ログ: Gmail からログアウトしました。")

    return { 'statusCode': 200, 'body': json.dumps('未読メールの確認を完了しました。') }

この main.py では、以下の処理を行っています。

  1. 環境変数 SECRET_NAME から Secrets Manager のシークレット名を取得します。
  2. get_secret_from_secrets_manager 関数を使って、Secrets Manager から Gmail の認証情報(メールアドレス、パスワード、Garoon の送信元メールアドレス)を取得します。
  3. gmail_connector.pyconnect_gmail 関数を使って Gmail に接続します。
  4. select_inbox 関数で受信トレイを選択します。
  5. search_unread_emails 関数で、指定された送信者からの未読メールを検索します。
  6. 取得した各メールの ID に対して、fetch_email_data 関数でメールのデータを取得し、decode_subject で件名をデコード、get_sender_email で送信者メールアドレスを取得、get_email_body で本文を取得してログに出力します。
  7. 最後に、logout_gmail 関数で Gmail からログアウトします。

重要な設定:

  • Lambda 関数の環境変数に SECRET_NAME を設定し、Secrets Manager で作成したシークレットの名前(例: gmail-calendar-credentials)を入力してください。

4. Lambda 関数のテスト実行

作成したコードをテストしてみましょう。AWS Lambda コンソールの Lambda 関数の画面で、「テスト」タブを開き、「新しいテストイベントを作成」をクリックします。

イベントテンプレートは「hello-world」などのデフォルトのもので構いません。イベント名は任意で入力し、「作成」をクリックします。

作成したテストイベントを選択し、「テスト」ボタンをクリックします。

CloudWatch Logs を確認すると、Lambda 関数の実行ログが出力されているはずです。Gmail への接続、未読メールの検索、メールの内容のログ出力などが正常に行われているか確認してください。

もしエラーが発生した場合は、ログの内容を確認し、コードに誤りがないか見直してください。

まとめ

この第2段階では、Lambda 関数から Gmail に接続し、未読メールを取得してその内容(件名、送信者、本文)を読み取るための Python コードを作成しました。

次の第3段階では、取得したメールの本文を解析し、スケジュール情報(開始時間、終了時間、タイトルなど)を抽出する処理を実装していきます。

コメント

タイトルとURLをコピーしました