ご無沙汰しております!
Obsidian にメモをまとめていますが、書かなくなってしまいました!w
プロフィールにはこう書いてあります。
調べたものは後でまとめようとメモしているのですが、メモ止まりで全然やらないので調べたときに書こうと思います。
このブログの目的が果たせなくなっていますw
さて、本日は、これはメッチャハマった!!!という内容で、
これは残しておかねば・・・と思ったものになります。
はじめに
AWS AppSync Events は、GraphQLを使わずにリアルタイム通信を実現できるAWSの新しいサービスです。従来のAppSyncがGraphQLベースだったのに対し、AppSync Eventsはより軽量でシンプルなWebSocket接続を提供します。
しかし、実際にPythonでBoto3を使わずにWebSocket接続を試みたところ、AWSコンソールの表示や公式ドキュメントの断片的な情報だけでは分からない複数のトラップにハマりました。本記事では、これらの罠とその解決策を共有します。
1. AWSコンソールのエンドポイント表示が不親切 (序章)
AWS AppSync のダッシュボードには以下のように記載されています。
名前 | API タイプ | HTTP エンドポイント | リアルタイムエンドポイント | API ID | プライマリ認証モード | 作成済み |
---|---|---|---|---|---|---|
my-event-api | イベント | https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com | wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com | xxxxxxxxxxxxxxxxxxxxxxxxxx | API_KEY | YYYY/MM/DD HH:mm TZ |
まぁ、これだけ見ると wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com
にアクセスすれば良さそうですよね...
実際、以下のコードを実行すると接続できます。
import asyncio import os import websockets WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com" API_KEY = "xxxxxxxxxxxx" class AppSyncEventClient: def __init__(self): self.wss_endpoint = WSS_ENDPOINT self.api_key = API_KEY self.websocket = None async def connect(self): """AppSync Event WebSocketに接続""" try: # WebSocket接続 self.websocket = await websockets.connect(self.wss_endpoint) print("WebSocket接続が確立されました") return True except Exception as e: print(f"接続に失敗しました: {e}") return False async def disconnect(self): """WebSocket接続を切断""" if self.websocket: await self.websocket.close() print("WebSocket接続を切断しました") async def main(): client = AppSyncEventClient() if await client.connect(): await client.disconnect() if __name__ == "__main__": asyncio.run(main())
$ python app.py WebSocket接続が確立されました WebSocket接続を切断しました
しかし、次のイベント送信で失敗してしまいます。
connection_init イベント
コードを少し書き換えて connection_init
イベントを送信してみます。
import asyncio import json import os WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com" API_KEY = "xxxxxxxxxxxx" class AppSyncEventClient: def __init__(self): self.wss_endpoint = WSS_ENDPOINT self.api_key = API_KEY self.websocket = None async def connect(self): """AppSync Event WebSocketに接続""" try: # WebSocket接続 self.websocket = await websockets.connect(self.wss_endpoint) print("WebSocket接続が確立されました") return True except Exception as e: print(f"接続に失敗しました: {e}") return False async def disconnect(self): """WebSocket接続を切断""" if self.websocket: await self.websocket.close() print("WebSocket接続を切断しました") + async def connection_init(self): + """connection_init イベント発行""" + message = { + "type": "connection_init", + "authorization": { + "x-api-key": self.api_key + } + } + await self.websocket.send(json.dumps(message)) + print("connection_initメッセージを送信しました:") + + try: + await self.__wait_for_connection_ack() + except asyncio.TimeoutError: + print("connection_ack待機タイムアウト") + return False + except Exception as e: + print(f"connection_ack待機エラー: {e}") + return False + + async def __wait_for_connection_ack(self): + """connection_ack メッセージを待機""" + print("connection_initメッセージを送信しました:") + try: + response = await asyncio.wait_for(self.websocket.recv(), timeout=30.0) + response_data = json.loads(response) + if response_data.get("type") == "connection_ack": + timeout_ms = response_data.get("connectionTimeoutMs", 300000) + print(f"connection_ack受信 (タイムアウト: {timeout_ms}ms)") + return True + else: + print(f"予期しない応答: {response_data}") + return False + except asyncio.TimeoutError: + print("connection_ack メッセージ待機タイムアウト") + return None + except Exception as e: + print(f"connection_ack メッセージ待機エラー: {e}") + return None async def main(): client = AppSyncEventClient() if await client.connect(): + await client.connection_init() await client.disconnect() if __name__ == "__main__": asyncio.run(main())
$ python app.py WebSocket接続が確立されました connection_initメッセージを送信しました: 予期しない応答: {'payload': {'errors': [{'message': 'NoProtocolError', 'errorCode': 400}]}, 'type': 'connection_error'} WebSocket接続を切断しました
ここで分かったのが サブプロトコル という存在です。
確かに Chrome の DevTool を確認しても以下のような情報が入っていますね。
sec-websocket-protocol aws-appsync-event-ws, header-{BASE64_ENCODE_STRING}
2. サブプロトコルでの認証が必要
こちらに従い、サブプロトコルを設定してみます。
import asyncio import base64 import json import os import websockets + HTTP_ENDPOINT = "https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com" WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com" API_KEY = "xxxxxxxxxxxx" class AppSyncEventClient: def __init__(self): self.http_endpoint = HTTP_ENDPOINT self.wss_endpoint = WSS_ENDPOINT self.api_key = API_KEY self.websocket = None async def connect(self): """AppSync Event WebSocketに接続""" try: + # 認証subprotocolを作成 + auth_protocol = self.__create_auth_subprotocol() + # WebSocket接続 - self.websocket = await websockets.connect(self.wss_endpoint) + self.websocket = await websockets.connect(self.wss_endpoint, subprotocols=["aws-appsync-event-ws", auth_protocol]) print("WebSocket接続が確立されました") return True except Exception as e: print(f"接続に失敗しました: {e}") return False + def __create_auth_subprotocol(self): + """認証用のサブプロトコルを作成""" + http_host = self.http_endpoint.replace('https://', '').split('/')[0] + auth_obj = { + "host": http_host, + "x-api-key": self.api_key + } + # Base64エンコード + auth_json = json.dumps(auth_obj, separators=(',', ':')) + auth_b64 = base64.b64encode(auth_json.encode()).decode() + return f"header-{auth_b64}" async def disconnect(self): """WebSocket接続を切断""" if self.websocket: await self.websocket.close() print("WebSocket接続を切断しました") async def connection_init(self): """connection_init イベント発行""" message = { "type": "connection_init", "authorization": { "x-api-key": self.api_key } } await self.websocket.send(json.dumps(message)) print("connection_initメッセージを送信しました:") try: await self.__wait_for_connection_ack() except asyncio.TimeoutError: print("connection_ack待機タイムアウト") return False except Exception as e: print(f"connection_ack待機エラー: {e}") return False async def __wait_for_connection_ack(self): """connection_ack メッセージを待機""" try: response = await asyncio.wait_for(self.websocket.recv(), timeout=30.0) response_data = json.loads(response) if response_data.get("type") == "connection_ack": timeout_ms = response_data.get( "connectionTimeoutMs", 300000) print(f"connection_ack受信 (タイムアウト: {timeout_ms}ms)") return True else: print(f"予期しない応答: {response_data}") return False except asyncio.TimeoutError: print("connection_ack メッセージ待機タイムアウト") return None except Exception as e: print(f"connection_ack メッセージ待機エラー: {e}") return None async def main(): client = AppSyncEventClient() if await client.connect(): await client.connection_init() await client.disconnect() if __name__ == "__main__": asyncio.run(main())
$ python app.py WebSocket接続が確立されました connection_initメッセージを送信しました: 予期しない応答: {'payload': {'errors': [{'message': 'SubProtocolNotSupportedError', 'errorCode': 400}]}, 'type': 'connection_error'} WebSocket接続を切断しました
3. AWSコンソールのエンドポイント表示が不親切 (解決編)
ふと公式ドキュメントの方に目をやると以下のような情報が目に入りました。
ここに気づくまでに、かなりの時間を要しました。
AWS AppSync Events HTTP endpoint https://example1234567890000.appsync-api.us-east-1.amazonaws.com/event
AWS AppSync Events real-time endpoint wss://example1234567890000.appsync-realtime-api.us-east-1.amazonaws.com/event/realtime
/event/realtime
だと...??
import asyncio import base64 import json import os import websockets - HTTP_ENDPOINT = "https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com" + HTTP_ENDPOINT = "https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com/event" - WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com" + WSS_ENDPOINT = "wss://xxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com/event/realtime" API_KEY = "xxxxxxxxxxxx" class AppSyncEventClient: def __init__(self): self.http_endpoint = HTTP_ENDPOINT self.wss_endpoint = WSS_ENDPOINT self.api_key = API_KEY self.websocket = None async def connect(self): """AppSync Event WebSocketに接続""" try: # 認証subprotocolを作成 auth_protocol = self.__create_auth_subprotocol() # WebSocket接続 self.websocket = await websockets.connect(self.wss_endpoint, subprotocols=["aws-appsync-event-ws", auth_protocol]) print("WebSocket接続が確立されました") return True except Exception as e: print(f"接続に失敗しました: {e}") return False def __create_auth_subprotocol(self): """認証用のサブプロトコルを作成""" http_host = self.http_endpoint.replace('https://', '').split('/')[0] auth_obj = { "host": http_host, "x-api-key": self.api_key } # Base64エンコード auth_json = json.dumps(auth_obj, separators=(',', ':')) auth_b64 = base64.b64encode(auth_json.encode()).decode() return f"header-{auth_b64}" async def disconnect(self): """WebSocket接続を切断""" if self.websocket: await self.websocket.close() print("WebSocket接続を切断しました") async def connection_init(self): """connection_init イベント発行""" message = { "type": "connection_init", "authorization": { "x-api-key": self.api_key } } await self.websocket.send(json.dumps(message)) print("connection_initメッセージを送信しました:") try: await self.__wait_for_connection_ack() except asyncio.TimeoutError: print("connection_ack待機タイムアウト") return False except Exception as e: print(f"connection_ack待機エラー: {e}") return False async def __wait_for_connection_ack(self): """connection_ack メッセージを待機""" try: response = await asyncio.wait_for(self.websocket.recv(), timeout=30.0) response_data = json.loads(response) if response_data.get("type") == "connection_ack": timeout_ms = response_data.get( "connectionTimeoutMs", 300000) print(f"connection_ack受信 (タイムアウト: {timeout_ms}ms)") return True else: print(f"予期しない応答: {response_data}") return False except asyncio.TimeoutError: print("connection_ack メッセージ待機タイムアウト") return None except Exception as e: print(f"connection_ack メッセージ待機エラー: {e}") return None async def main(): client = AppSyncEventClient() if await client.connect(): await client.connection_init() await client.disconnect() if __name__ == "__main__": asyncio.run(main())
$ python app.py WebSocket接続が確立されました connection_initメッセージを送信しました: connection_ack受信 (タイムアウト: 300000ms) WebSocket接続を切断しました
キタ━━━━(゚∀゚)━━━━!!