Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 212 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,218 @@ def test_depth_limit(self, mock_ct_headers, mock_session):

assert parse_tweet_result(self.SAMPLE_TWEET_RESULT, depth=3) is None

@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_article_atomic_image_block_renders_markdown_image(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))

client = TwitterClient.__new__(TwitterClient)
client._ct_init_attempted = True
client._client_transaction = None

result = copy.deepcopy(self.SAMPLE_TWEET_RESULT)
result["article"] = {
"article_results": {
"result": {
"title": "Article title",
"content_state": {
"blocks": [
{"key": "a", "type": "unstyled", "text": "Intro", "entityRanges": []},
{"key": "b", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 0}]},
{"key": "c", "type": "unstyled", "text": "Outro", "entityRanges": []},
],
"entityMap": {
"0": {
"type": "IMAGE",
"mutability": "IMMUTABLE",
"data": {
"caption": "A cat",
"original_url": "https://pbs.twimg.com/media/cat.jpg",
},
}
},
},
}
}
}

tweet = parse_tweet_result(result)
assert tweet is not None
assert tweet.article_title == "Article title"
assert tweet.article_text == "Intro\n\n![A cat](https://pbs.twimg.com/media/cat.jpg)\n\nOutro"

@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_article_atomic_image_block_supports_list_entity_map_and_media_entities(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))

client = TwitterClient.__new__(TwitterClient)
client._ct_init_attempted = True
client._client_transaction = None

result = copy.deepcopy(self.SAMPLE_TWEET_RESULT)
result["article"] = {
"article_results": {
"result": {
"title": "Article title",
"content_state": {
"blocks": [
{"key": "a", "type": "unstyled", "text": "Intro", "entityRanges": []},
{"key": "b", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 2}]},
{"key": "c", "type": "unstyled", "text": "Outro", "entityRanges": []},
],
"entityMap": [
{"key": "2", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2030504404391194624"}]}}}
],
},
"media_entities": [
{
"media_id": "2030504404391194624",
"media_info": {
"original_img_url": "https://pbs.twimg.com/media/example.png"
},
}
],
}
}
}

tweet = parse_tweet_result(result)
assert tweet is not None
assert tweet.article_text == "Intro\n\n![](https://pbs.twimg.com/media/example.png)\n\nOutro"

@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_article_real_shape_odysseus_like_payload_renders_two_images(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))

client = TwitterClient.__new__(TwitterClient)
client._ct_init_attempted = True
client._client_transaction = None

result = copy.deepcopy(self.SAMPLE_TWEET_RESULT)
result["article"] = {
"article_results": {
"result": {
"title": "Harness Engineering Is Cybernetics",
"content_state": {
"blocks": [
{"key": "a", "type": "unstyled", "text": "First paragraph", "entityRanges": []},
{"key": "b", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 2}]},
{"key": "c", "type": "unstyled", "text": "Middle paragraph", "entityRanges": []},
{"key": "d", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 5}]},
{"key": "e", "type": "unstyled", "text": "Last paragraph", "entityRanges": []},
],
"entityMap": [
{"key": "5", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2030414996266741760"}]}}},
{"key": "2", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2030504404391194624"}]}}},
],
},
"media_entities": [
{
"media_id": "2030504404391194624",
"media_info": {
"original_img_url": "https://pbs.twimg.com/media/HC3M_2qacAA7mej.png"
},
},
{
"media_id": "2030414996266741760",
"media_info": {
"original_img_url": "https://pbs.twimg.com/media/HC17rnca8AAQgjt.jpg"
},
},
],
}
}
}

tweet = parse_tweet_result(result)
assert tweet is not None
assert tweet.article_text == (
"First paragraph\n\n"
"![](https://pbs.twimg.com/media/HC3M_2qacAA7mej.png)\n\n"
"Middle paragraph\n\n"
"![](https://pbs.twimg.com/media/HC17rnca8AAQgjt.jpg)\n\n"
"Last paragraph"
)

@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_article_real_shape_elvissun_like_payload_renders_caption_and_three_images(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))

client = TwitterClient.__new__(TwitterClient)
client._ct_init_attempted = True
client._client_transaction = None

result = copy.deepcopy(self.SAMPLE_TWEET_RESULT)
result["article"] = {
"article_results": {
"result": {
"title": "OpenClaw + Codex/ClaudeCode Agent Swarm",
"content_state": {
"blocks": [
{"key": "a", "type": "unstyled", "text": "Intro", "entityRanges": []},
{"key": "b", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 0}]},
{"key": "c", "type": "unstyled", "text": "Diagram intro", "entityRanges": []},
{"key": "d", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 1}]},
{"key": "e", "type": "unstyled", "text": "Context comparison", "entityRanges": []},
{"key": "f", "type": "atomic", "text": " ", "entityRanges": [{"offset": 0, "length": 1, "key": 2}]},
],
"entityMap": [
{
"key": "0",
"value": {
"type": "MEDIA",
"data": {
"caption": "before Jan: CC/codex only | after Jan: Openclaw orchestrates CC/codex",
"mediaItems": [{"mediaId": "2025660629109895168"}],
},
},
},
{"key": "1", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2025790010293669888"}]}}},
{"key": "2", "value": {"type": "MEDIA", "data": {"mediaItems": [{"mediaId": "2025780043406864384"}]}}},
],
},
"media_entities": [
{
"media_id": "2025660629109895168",
"media_info": {
"original_img_url": "https://pbs.twimg.com/media/HByXnBmW8AANOl9.jpg"
},
},
{
"media_id": "2025790010293669888",
"media_info": {
"original_img_url": "https://pbs.twimg.com/media/HB0NSAEW0AAYPOF.jpg"
},
},
{
"media_id": "2025780043406864384",
"media_info": {
"original_img_url": "https://pbs.twimg.com/media/HB0EN2hXcAAbGi9.png"
},
},
],
}
}
}

tweet = parse_tweet_result(result)
assert tweet is not None
assert tweet.article_text == (
"Intro\n\n"
"![before Jan: CC/codex only | after Jan: Openclaw orchestrates CC/codex](https://pbs.twimg.com/media/HByXnBmW8AANOl9.jpg)\n\n"
"Diagram intro\n\n"
"![](https://pbs.twimg.com/media/HB0NSAEW0AAYPOF.jpg)\n\n"
"Context comparison\n\n"
"![](https://pbs.twimg.com/media/HB0EN2hXcAAbGi9.png)"
)


# ── TwitterAPIError ──────────────────────────────────────────────────────

Expand Down
138 changes: 138 additions & 0 deletions twitter_cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,139 @@ def _extract_author(user_data, user_legacy):
# ── Article parsing ──────────────────────────────────────────────────────


def _find_article_image_url(value):
# type: (Any) -> Optional[str]
"""Best-effort extraction of the original image URL from article entity data."""
if isinstance(value, dict):
for key in (
"original_img_url",
"originalImgUrl",
"original_url",
"originalUrl",
"media_url_https",
"mediaUrlHttps",
"media_url",
"mediaUrl",
"url",
"src",
"uri",
):
candidate = value.get(key)
if isinstance(candidate, str) and candidate.strip():
lowered = candidate.lower()
if (
lowered.startswith("https://pbs.twimg.com/")
or lowered.endswith((".jpg", ".jpeg", ".png", ".gif", ".webp"))
or any(ext in lowered for ext in (".jpg?", ".jpeg?", ".png?", ".gif?", ".webp?"))
):
return candidate.strip()
for nested in value.values():
found = _find_article_image_url(nested)
if found:
return found
return None
if isinstance(value, list):
for item in value:
found = _find_article_image_url(item)
if found:
return found
return None


def _find_article_caption(value):
# type: (Any) -> Optional[str]
"""Best-effort extraction of image caption/alt text from article entity data."""
if isinstance(value, dict):
for key in ("caption", "alt", "alt_text", "altText", "title", "name"):
candidate = value.get(key)
if isinstance(candidate, str) and candidate.strip():
return candidate.strip()
for nested in value.values():
found = _find_article_caption(nested)
if found:
return found
return None
if isinstance(value, list):
for item in value:
found = _find_article_caption(item)
if found:
return found
return None


def _normalize_article_entity_map(entity_map):
# type: (Any) -> Dict[str, Any]
"""Normalize Draft.js entityMap that may arrive as dict or [{key, value}, ...]."""
if isinstance(entity_map, dict):
normalized = {} # type: Dict[str, Any]
for key, value in entity_map.items():
normalized[str(key)] = value
return normalized
if isinstance(entity_map, list):
normalized = {} # type: Dict[str, Any]
for item in entity_map:
if not isinstance(item, dict):
continue
key = item.get("key")
value = item.get("value")
if key is None or value is None:
continue
normalized[str(key)] = value
return normalized
return {}


def _extract_article_media_url_map(article_results):
    # type: (Dict[str, Any]) -> Dict[str, str]
    """Build a ``media id/key -> image URL`` map for id-only article entities.

    Gathers the cover media (when present) and every ``media_entities``
    entry, extracts an image URL from each via ``_find_article_image_url``
    (preferring the nested ``media_info``), and records it under each
    string identifier the entry exposes (``media_id``/``media_key``/``id``).
    """
    url_by_id = {}  # type: Dict[str, str]

    candidates = []  # type: List[Any]
    cover = article_results.get("cover_media")
    if cover:
        candidates.append(cover)
    candidates.extend(article_results.get("media_entities") or [])

    for entry in candidates:
        if not isinstance(entry, dict):
            continue
        info = entry.get("media_info") or {}
        image_url = _find_article_image_url(info) or _find_article_image_url(entry)
        if not image_url:
            continue
        for id_field in ("media_id", "media_key", "id"):
            identifier = entry.get(id_field)
            # Only non-empty string identifiers are usable as map keys.
            if isinstance(identifier, str) and identifier:
                url_by_id[identifier] = image_url
    return url_by_id


def _extract_article_images(block, entity_map, media_url_map):
    # type: (Dict[str, Any], Dict[str, Any], Dict[str, str]) -> List[str]
    """Render an atomic Draft.js block's image entities as Markdown lines.

    For each entity range in the block, the referenced entity is looked up
    in the normalized ``entity_map``. A URL is taken directly from the
    entity when present, otherwise resolved through ``media_url_map`` via
    the entity's ``data.mediaItems`` ids. Captionless images get empty alt
    text. Entities with no resolvable URL are skipped.
    """
    rendered = []  # type: List[str]
    for entity_range in block.get("entityRanges", []) or []:
        if not isinstance(entity_range, dict):
            continue

        ref = entity_range.get("key")
        entity = None if ref is None else entity_map.get(str(ref))
        if not isinstance(entity, dict):
            continue

        image_url = _find_article_image_url(entity)
        if not image_url:
            # Fall back to id-based resolution through media_url_map.
            for media_item in _deep_get(entity, "data", "mediaItems") or []:
                media_id = media_item.get("mediaId") if isinstance(media_item, dict) else None
                if isinstance(media_id, str) and media_id in media_url_map:
                    image_url = media_url_map[media_id]
                    break
        if not image_url:
            continue

        alt_text = _find_article_caption(entity) or ""
        rendered.append("![%s](%s)" % (alt_text, image_url))
    return rendered


def _parse_article(tweet_data):
# type: (Dict[str, Any]) -> Dict[str, Any]
"""Extract Twitter Article data (long-form content) from a tweet.
Expand All @@ -130,12 +263,17 @@ def _parse_article(tweet_data):
if not blocks:
return {"article_title": title, "article_text": None}

entity_map = _normalize_article_entity_map(content_state.get("entityMap", {}))
media_url_map = _extract_article_media_url_map(article_results)

# Convert draft.js blocks to Markdown
parts = [] # type: List[str]
ordered_counter = 0
for block in blocks:
block_type = block.get("type", "unstyled") # type: str
if block_type == "atomic":
parts.extend(_extract_article_images(block, entity_map, media_url_map))
ordered_counter = 0
continue
text = block.get("text", "") # type: str
if not text:
Expand Down