diff --git a/tests/test_client.py b/tests/test_client.py index b26098c..a79e7d4 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -513,6 +513,218 @@ def test_depth_limit(self, mock_ct_headers, mock_session): assert parse_tweet_result(self.SAMPLE_TWEET_RESULT, depth=3) is None + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_article_atomic_image_block_renders_markdown_image(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._ct_init_attempted = True + client._client_transaction = None + + result = copy.deepcopy(self.SAMPLE_TWEET_RESULT) + result["article"] = { + "article_results": { + "result": { + "title": "Article title", + "content_state": { + "blocks": [ + {"key": "a", "type": "unstyled", "text": "Intro", "entityRanges": []}, + {"key": "b", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 0}]}, + {"key": "c", "type": "unstyled", "text": "Outro", "entityRanges": []}, + ], + "entityMap": { + "0": { + "type": "IMAGE", + "mutability": "IMMUTABLE", + "data": { + "caption": "A cat", + "original_url": "https://pbs.twimg.com/media/cat.jpg", + }, + } + }, + }, + } + } + } + + tweet = parse_tweet_result(result) + assert tweet is not None + assert tweet.article_title == "Article title" + assert tweet.article_text == "Intro\n\n![A cat](https://pbs.twimg.com/media/cat.jpg)\n\nOutro" + + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_article_atomic_image_block_supports_list_entity_map_and_media_entities(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._ct_init_attempted = True + client._client_transaction = None + + result = copy.deepcopy(self.SAMPLE_TWEET_RESULT) + result["article"] = { + "article_results": { + "result": { + "title": "Article title", + "content_state": { + "blocks": [ + {"key": "a", "type": "unstyled", "text": "Intro", "entityRanges": []}, + {"key": "b", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 2}]}, + {"key": "c", "type": "unstyled", "text": "Outro", "entityRanges": []}, + ], + "entityMap": [ + {"key": "2", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2030504404391194624"}]}}} + ], + }, + "media_entities": [ + { + "media_id": "2030504404391194624", + "media_info": { + "original_img_url": "https://pbs.twimg.com/media/example.png" + }, + } + ], + } + } + } + + tweet = parse_tweet_result(result) + assert tweet is not None + assert tweet.article_text == "Intro\n\n![](https://pbs.twimg.com/media/example.png)\n\nOutro" + + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_article_real_shape_odysseus_like_payload_renders_two_images(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._ct_init_attempted = True + client._client_transaction = None + + result = copy.deepcopy(self.SAMPLE_TWEET_RESULT) + result["article"] = { + "article_results": { + "result": { + "title": "Harness Engineering Is Cybernetics", + "content_state": { + "blocks": [ + {"key": "a", "type": "unstyled", "text": "First paragraph", "entityRanges": []}, + {"key": "b", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 2}]}, + {"key": "c", "type": "unstyled", "text": "Middle paragraph", "entityRanges": []}, + {"key": "d", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 5}]}, + {"key": "e", "type": "unstyled", "text": "Last paragraph", "entityRanges": []}, + ], + "entityMap": [ + {"key": "5", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2030414996266741760"}]}}}, + {"key": "2", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2030504404391194624"}]}}}, + ], + }, + "media_entities": [ + { + "media_id": "2030504404391194624", + "media_info": { + "original_img_url": "https://pbs.twimg.com/media/HC3M_2qacAA7mej.png" + }, + }, + { + "media_id": "2030414996266741760", + "media_info": { + "original_img_url": "https://pbs.twimg.com/media/HC17rnca8AAQgjt.jpg" + }, + }, + ], + } + } + } + + tweet = parse_tweet_result(result) + assert tweet is not None + assert tweet.article_text == ( + "First paragraph\n\n" + "![](https://pbs.twimg.com/media/HC3M_2qacAA7mej.png)\n\n" + "Middle paragraph\n\n" + "![](https://pbs.twimg.com/media/HC17rnca8AAQgjt.jpg)\n\n" + "Last paragraph" + ) + + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_article_real_shape_elvissun_like_payload_renders_caption_and_three_images(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._ct_init_attempted = True + client._client_transaction = None + + result = copy.deepcopy(self.SAMPLE_TWEET_RESULT) + result["article"] = { + "article_results": { + "result": { + "title": "OpenClaw + Codex/ClaudeCode Agent Swarm", + "content_state": { + "blocks": [ + {"key": "a", "type": "unstyled", "text": "Intro", "entityRanges": []}, + {"key": "b", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 0}]}, + {"key": "c", "type": "unstyled", "text": "Diagram intro", "entityRanges": []}, + {"key": "d", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 1}]}, + {"key": "e", "type": "unstyled", "text": "Context comparison", "entityRanges": []}, + {"key": "f", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 2}]}, + ], + "entityMap": [ + { + "key": "0", + "value": { + "type": "MEDIA", + "data": { + "caption": "before Jan: CC/codex only | after Jan: Openclaw orchestrates CC/codex", + "mediaItems": [{"mediaId": "2025660629109895168"}], + }, + }, + }, + {"key": "1", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2025790010293669888"}]}}}, + {"key": "2", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2025780043406864384"}]}}}, + ], + }, + "media_entities": [ + { + "media_id": "2025660629109895168", + "media_info": { + "original_img_url": "https://pbs.twimg.com/media/HByXnBmW8AANOl9.jpg" + }, + }, + { + "media_id": "2025790010293669888", + "media_info": { + "original_img_url": "https://pbs.twimg.com/media/HB0NSAEW0AAYPOF.jpg" + }, + }, + { + "media_id": "2025780043406864384", + "media_info": { + "original_img_url": "https://pbs.twimg.com/media/HB0EN2hXcAAbGi9.png" + }, + }, + ], + } + } + } + + tweet = parse_tweet_result(result) + assert tweet is not None + assert tweet.article_text == ( + "Intro\n\n" + "![before Jan: CC/codex only | after Jan: Openclaw orchestrates CC/codex](https://pbs.twimg.com/media/HByXnBmW8AANOl9.jpg)\n\n" + "Diagram intro\n\n" + "![](https://pbs.twimg.com/media/HB0NSAEW0AAYPOF.jpg)\n\n" + "Context comparison\n\n" + "![](https://pbs.twimg.com/media/HB0EN2hXcAAbGi9.png)" + ) + # ── TwitterAPIError ────────────────────────────────────────────────────── diff --git a/twitter_cli/parser.py b/twitter_cli/parser.py index bb964a8..25d1fff 100644 --- a/twitter_cli/parser.py +++ b/twitter_cli/parser.py @@ -113,6 +113,139 @@ def _extract_author(user_data, user_legacy): # ── Article parsing ────────────────────────────────────────────────────── +def _find_article_image_url(value): + # type: (Any) -> Optional[str] + """Best-effort extraction of the original image URL from article entity data.""" + if isinstance(value, dict): + for key in ( + "original_img_url", + "originalImgUrl", + "original_url", + "originalUrl", + "media_url_https", + "mediaUrlHttps", + "media_url", + "mediaUrl", + "url", + "src", + "uri", + ): + candidate = value.get(key) + if isinstance(candidate, str) and candidate.strip(): + lowered = candidate.lower() + if ( + lowered.startswith("https://pbs.twimg.com/") + or lowered.endswith((".jpg", ".jpeg", ".png", ".gif", ".webp")) + or any(ext in lowered for ext in (".jpg?", ".jpeg?", ".png?", ".gif?", ".webp?")) + ): + return candidate.strip() + for nested in value.values(): + found = _find_article_image_url(nested) + if found: + return found + return None + if isinstance(value, list): + for item in value: + found = _find_article_image_url(item) + if found: + return found + return None + + +def _find_article_caption(value): + # type: (Any) -> Optional[str] + """Best-effort extraction of image caption/alt text from article entity data.""" + if isinstance(value, dict): + for key in ("caption", "alt", "alt_text", "altText", "title", "name"): + candidate = value.get(key) + if isinstance(candidate, str) and candidate.strip(): + return candidate.strip() + for nested in value.values(): + found = _find_article_caption(nested) + if found: + return found + return None + if isinstance(value, list): + for item in value: + found = _find_article_caption(item) + if found: + return found + return None + + +def _normalize_article_entity_map(entity_map): + # type: (Any) -> Dict[str, Any] + """Normalize Draft.js entityMap that may arrive as dict or [{key, value}, ...].""" + if isinstance(entity_map, dict): + normalized = {} # type: Dict[str, Any] + for key, value in entity_map.items(): + normalized[str(key)] = value + return normalized + if isinstance(entity_map, list): + normalized = {} # type: Dict[str, Any] + for item in entity_map: + if not isinstance(item, dict): + continue + key = item.get("key") + value = item.get("value") + if key is None or value is None: + continue + normalized[str(key)] = value + return normalized + return {} + + +def _extract_article_media_url_map(article_results): + # type: (Dict[str, Any]) -> Dict[str, str] + """Map article media ids/keys to original image URLs when article entities reference IDs only.""" + media_url_map = {} # type: Dict[str, str] + media_candidates = [] # type: List[Any] + + cover_media = article_results.get("cover_media") + if cover_media: + media_candidates.append(cover_media) + media_candidates.extend(article_results.get("media_entities") or []) + + for media in media_candidates: + if not isinstance(media, dict): + continue + media_info = media.get("media_info") or {} + image_url = _find_article_image_url(media_info) or _find_article_image_url(media) + if not image_url: + continue + for key in ("media_id", "media_key", "id"): + candidate = media.get(key) + if isinstance(candidate, str) and candidate: + media_url_map[candidate] = image_url + return media_url_map + + +def _extract_article_images(block, entity_map, media_url_map): + # type: (Dict[str, Any], Dict[str, Any], Dict[str, str]) -> List[str] + """Convert atomic Draft.js image entities to Markdown image lines.""" + parts = [] # type: List[str] + for entity_range in block.get("entityRanges", []) or []: + if not isinstance(entity_range, dict): + continue + entity_key = entity_range.get("key") + entity = entity_map.get(str(entity_key)) if entity_key is not None else None + if not isinstance(entity, dict): + continue + image_url = _find_article_image_url(entity) + if not image_url: + media_items = _deep_get(entity, "data", "mediaItems") or [] + for media_item in media_items: + media_id = media_item.get("mediaId") if isinstance(media_item, dict) else None + if isinstance(media_id, str) and media_id in media_url_map: + image_url = media_url_map[media_id] + break + if not image_url: + continue + caption = _find_article_caption(entity) or "" + parts.append("![%s](%s)" % (caption, image_url)) + return parts + + def _parse_article(tweet_data): # type: (Dict[str, Any]) -> Dict[str, Any] """Extract Twitter Article data (long-form content) from a tweet. @@ -130,12 +263,17 @@ def _parse_article(tweet_data): if not blocks: return {"article_title": title, "article_text": None} + entity_map = _normalize_article_entity_map(content_state.get("entityMap", {})) + media_url_map = _extract_article_media_url_map(article_results) + # Convert draft.js blocks to Markdown parts = [] # type: List[str] ordered_counter = 0 for block in blocks: block_type = block.get("type", "unstyled") # type: str if block_type == "atomic": + parts.extend(_extract_article_images(block, entity_map, media_url_map)) + ordered_counter = 0 continue text = block.get("text", "") # type: str if not text: