ジャコ Lab

プログラミング関連のメモ帳的ブログです

AWS AppSync Events が罠すぎた件

AWS AppSync Events が罠すぎた件
AWS AppSync Events が罠すぎた件

ご無沙汰しております!
Obsidian にメモをまとめていますが、書かなくなってしまいました!w

プロフィールにはこう書いてあります。

調べたものは後でまとめようとメモしているのですが、メモ止まりで全然やらないので調べたときに書こうと思います。

このブログの目的が果たせなくなっていますw


さて、本日は、これはメッチャハマった!!!という内容で、
これは残しておかねば・・・と思ったものになります。

以前、普通の AppSync でもハマっていたので AWS AppSync 嫌いになりそうです。

はじめに

AWS AppSync Events は、GraphQLを使わずにリアルタイム通信を実現できるAWSの新しいサービスです。従来のAppSyncがGraphQLベースだったのに対し、AppSync Eventsはより軽量でシンプルなWebSocket接続を提供します。

しかし、実際にPythonでBoto3を使わずにWebSocket接続を試みたところ、AWSコンソールの表示や公式ドキュメントの断片的な情報だけでは分からない複数のトラップにハマりました。本記事では、これらの罠とその解決策を共有します。

1. AWSコンソールのエンドポイント表示が不親切 (序章)

AWS AppSync のダッシュボードには以下のように記載されています。

名前 API タイプ HTTP エンドポイント リアルタイムエンドポイント API ID プライマリ認証モード 作成済み
my-event-api イベント https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com xxxxxxxxxxxxxxxxxxxxxxxxxx API_KEY YYYY/MM/DD HH:mm TZ

まぁ、これだけ見ると wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com にアクセスすれば良さそうですよね...

実際、以下のコードを実行すると接続できます。

import asyncio
import os

import websockets

WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com"
API_KEY = "xxxxxxxxxxxx"


class AppSyncEventClient:
    def __init__(self):
        self.wss_endpoint = WSS_ENDPOINT 
        self.api_key = API_KEY
        self.websocket = None

    async def connect(self):
        """AppSync Event WebSocketに接続"""
        try:
            # WebSocket接続
            self.websocket = await websockets.connect(self.wss_endpoint)
            print("WebSocket接続が確立されました")
            return True
        except Exception as e:
            print(f"接続に失敗しました: {e}")
            return False

    async def disconnect(self):
        """WebSocket接続を切断"""
        if self.websocket:
            await self.websocket.close()
            print("WebSocket接続を切断しました")


async def main():
    client = AppSyncEventClient()
    if await client.connect():
        await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
$ python app.py 
WebSocket接続が確立されました
WebSocket接続を切断しました
うんうん。順調。

しかし、次のイベント送信で失敗してしまいます。

connection_init イベント

コードを少し書き換えて connection_init イベントを送信してみます。

    import asyncio
    import json
    import os

    WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com"
    API_KEY = "xxxxxxxxxxxx"


    class AppSyncEventClient:
        def __init__(self):
            self.wss_endpoint = WSS_ENDPOINT
            self.api_key = API_KEY
            self.websocket = None

        async def connect(self):
            """AppSync Event WebSocketに接続"""
            try:
                # WebSocket接続
                self.websocket = await websockets.connect(self.wss_endpoint)
                print("WebSocket接続が確立されました")
                return True
            except Exception as e:
                print(f"接続に失敗しました: {e}")
                return False

        async def disconnect(self):
            """WebSocket接続を切断"""
            if self.websocket:
                await self.websocket.close()
                print("WebSocket接続を切断しました")

+       async def connection_init(self):
+           """connection_init イベント発行"""
+           message = {
+               "type": "connection_init",
+               "authorization": {
+                   "x-api-key": self.api_key
+               }
+           }
+           await self.websocket.send(json.dumps(message))
+           print("connection_initメッセージを送信しました:")
+
+           try:
+               await self.__wait_for_connection_ack()
+           except asyncio.TimeoutError:
+               print("connection_ack待機タイムアウト")
+               return False
+           except Exception as e:
+               print(f"connection_ack待機エラー: {e}")
+               return False
+
+       async def __wait_for_connection_ack(self):
+           """connection_ack メッセージを待機"""
+           print("connection_initメッセージを送信しました:")
+           try:
+               response = await asyncio.wait_for(self.websocket.recv(), timeout=30.0)
+               response_data = json.loads(response)
+               if response_data.get("type") == "connection_ack":
+                   timeout_ms = response_data.get("connectionTimeoutMs", 300000)
+                   print(f"connection_ack受信 (タイムアウト: {timeout_ms}ms)")
+                   return True
+               else:
+                   print(f"予期しない応答: {response_data}")
+                   return False
+           except asyncio.TimeoutError:
+               print("connection_ack メッセージ待機タイムアウト")
+               return None
+           except Exception as e:
+               print(f"connection_ack メッセージ待機エラー: {e}")
+               return None


    async def main():
        client = AppSyncEventClient()
        if await client.connect():
+           await client.connection_init()
            await client.disconnect()


    if __name__ == "__main__":
        asyncio.run(main())
$ python app.py 
WebSocket接続が確立されました
connection_initメッセージを送信しました:
予期しない応答: {'payload': {'errors': [{'message': 'NoProtocolError', 'errorCode': 400}]}, 'type': 'connection_error'}
WebSocket接続を切断しました
NoProtocol Error...

ここで分かったのが サブプロトコル という存在です。

確かに Chrome の DevTool を確認しても以下のような情報が入っていますね。

sec-websocket-protocol
aws-appsync-event-ws, header-{BASE64_ENCODE_STRING}

2. サブプロトコルでの認証が必要

docs.aws.amazon.com

こちらに従い、サブプロトコルを設定してみます。

    import asyncio
    import base64
    import json
    import os

    import websockets

+   HTTP_ENDPOINT = "https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com"
    WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com"
    API_KEY = "xxxxxxxxxxxx"


    class AppSyncEventClient:
        def __init__(self):
            self.http_endpoint = HTTP_ENDPOINT
            self.wss_endpoint = WSS_ENDPOINT
            self.api_key = API_KEY
            self.websocket = None

        async def connect(self):
            """AppSync Event WebSocketに接続"""
            try:
+               # 認証subprotocolを作成
+               auth_protocol = self.__create_auth_subprotocol()

+               # WebSocket接続
-               self.websocket = await websockets.connect(self.wss_endpoint)
+               self.websocket = await websockets.connect(self.wss_endpoint, subprotocols=["aws-appsync-event-ws", auth_protocol])
                print("WebSocket接続が確立されました")
                return True
            except Exception as e:
                print(f"接続に失敗しました: {e}")
                return False

+       def __create_auth_subprotocol(self):
+           """認証用のサブプロトコルを作成"""
+           http_host = self.http_endpoint.replace('https://', '').split('/')[0]
+           auth_obj = {
+               "host": http_host,
+               "x-api-key": self.api_key
+           }

+           # Base64エンコード
+           auth_json = json.dumps(auth_obj, separators=(',', ':'))
+           auth_b64 = base64.b64encode(auth_json.encode()).decode()
+           return f"header-{auth_b64}"

        async def disconnect(self):
            """WebSocket接続を切断"""
            if self.websocket:
                await self.websocket.close()
                print("WebSocket接続を切断しました")

        async def connection_init(self):
            """connection_init イベント発行"""
            message = {
                "type": "connection_init",
                "authorization": {
                    "x-api-key": self.api_key
                }
            }
            await self.websocket.send(json.dumps(message))
            print("connection_initメッセージを送信しました:")

            try:
                await self.__wait_for_connection_ack()
            except asyncio.TimeoutError:
                print("connection_ack待機タイムアウト")
                return False
            except Exception as e:
                print(f"connection_ack待機エラー: {e}")
                return False

        async def __wait_for_connection_ack(self):
            """connection_ack メッセージを待機"""
            try:
                response = await asyncio.wait_for(self.websocket.recv(), timeout=30.0)
                response_data = json.loads(response)

                if response_data.get("type") == "connection_ack":
                    timeout_ms = response_data.get(
                        "connectionTimeoutMs", 300000)
                    print(f"connection_ack受信 (タイムアウト: {timeout_ms}ms)")
                    return True
                else:
                    print(f"予期しない応答: {response_data}")
                    return False
            except asyncio.TimeoutError:
                print("connection_ack メッセージ待機タイムアウト")
                return None
            except Exception as e:
                print(f"connection_ack メッセージ待機エラー: {e}")
                return None


    async def main():
        client = AppSyncEventClient()
        if await client.connect():
            await client.connection_init()
            await client.disconnect()

    if __name__ == "__main__":
        asyncio.run(main())
$ python app.py 
WebSocket接続が確立されました
connection_initメッセージを送信しました:
予期しない応答: {'payload': {'errors': [{'message': 'SubProtocolNotSupportedError', 'errorCode': 400}]}, 'type': 'connection_error'}
WebSocket接続を切断しました
SubProtocolNotSupportedError...

3. AWSコンソールのエンドポイント表示が不親切 (解決編)

ふと公式ドキュメントの方に目をやると以下のような情報が目に入りました。
ここに気づくまでに、かなりの時間を要しました。

AWS AppSync Events HTTP endpoint https://example1234567890000.appsync-api.us-east-1.amazonaws.com/event

AWS AppSync Events real-time endpoint wss://example1234567890000.appsync-realtime-api.us-east-1.amazonaws.com/event/realtime

おい待て.../event/realtimeだと...??
    import asyncio
    import base64
    import json
    import os

    import websockets

-   HTTP_ENDPOINT = "https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com"
+   HTTP_ENDPOINT = "https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com/event"
-   WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com"
+   WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com/event/realtime"
    API_KEY = "xxxxxxxxxxxx"


    class AppSyncEventClient:
        def __init__(self):
            self.http_endpoint = HTTP_ENDPOINT
            self.wss_endpoint = WSS_ENDPOINT
            self.api_key = API_KEY
            self.websocket = None

        async def connect(self):
            """AppSync Event WebSocketに接続"""
            try:
                # 認証subprotocolを作成
                auth_protocol = self.__create_auth_subprotocol()

                # WebSocket接続
                self.websocket = await websockets.connect(self.wss_endpoint, subprotocols=["aws-appsync-event-ws", auth_protocol])
                print("WebSocket接続が確立されました")
                return True
            except Exception as e:
                print(f"接続に失敗しました: {e}")
                return False

        def __create_auth_subprotocol(self):
            """認証用のサブプロトコルを作成"""
            http_host = self.http_endpoint.replace('https://', '').split('/')[0]
            auth_obj = {
                "host": http_host,
                "x-api-key": self.api_key
            }

            # Base64エンコード
            auth_json = json.dumps(auth_obj, separators=(',', ':'))
            auth_b64 = base64.b64encode(auth_json.encode()).decode()
            return f"header-{auth_b64}"

        async def disconnect(self):
            """WebSocket接続を切断"""
            if self.websocket:
                await self.websocket.close()
                print("WebSocket接続を切断しました")

        async def connection_init(self):
            """connection_init イベント発行"""
            message = {
                "type": "connection_init",
                "authorization": {
                    "x-api-key": self.api_key
                }
            }
            await self.websocket.send(json.dumps(message))
            print("connection_initメッセージを送信しました:")

            try:
                await self.__wait_for_connection_ack()
            except asyncio.TimeoutError:
                print("connection_ack待機タイムアウト")
                return False
            except Exception as e:
                print(f"connection_ack待機エラー: {e}")
                return False

        async def __wait_for_connection_ack(self):
            """connection_ack メッセージを待機"""
            try:
                response = await asyncio.wait_for(self.websocket.recv(), timeout=30.0)
                response_data = json.loads(response)

                if response_data.get("type") == "connection_ack":
                    timeout_ms = response_data.get(
                        "connectionTimeoutMs", 300000)
                    print(f"connection_ack受信 (タイムアウト: {timeout_ms}ms)")
                    return True
                else:
                    print(f"予期しない応答: {response_data}")
                    return False
            except asyncio.TimeoutError:
                print("connection_ack メッセージ待機タイムアウト")
                return None
            except Exception as e:
                print(f"connection_ack メッセージ待機エラー: {e}")
                return None


    async def main():
        client = AppSyncEventClient()
        if await client.connect():
            await client.connection_init()
            await client.disconnect()

    if __name__ == "__main__":
        asyncio.run(main())
$ python app.py 
WebSocket接続が確立されました
connection_initメッセージを送信しました:
connection_ack受信 (タイムアウト: 300000ms)
WebSocket接続を切断しました
\('ω')/ウオオオオオアアアーーーッ!
キタ━━━━(゚∀゚)━━━━!!

まとめ

ドキュメントはちゃんと読もう。その上でダッシュボードに記載のエンドポイントがもっと親切ならば...
そして、boto3 を使っているならもっと簡単に行っているはず...