-
Notifications
You must be signed in to change notification settings - Fork 2
/
omnivore_client.py
140 lines (126 loc) · 4.68 KB
/
omnivore_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import httpx
import logging
import json
import uuid
from typing import Optional, Dict, Any
import os
logger = logging.getLogger(__name__)
class OmnivoreClient:
def __init__(self, api_key: str):
self.api_key = api_key
self.api_url = "https://api-prod.omnivore.app/api/graphql"
self.label = os.environ.get("OMNIVORE_LABEL", "SlackSaved")
async def search_url(self, url: str) -> bool:
querystring = {
"after": "null",
"first": "10",
"query": f'url:"{url}"'
}
payload = {
"query": """
query Search($after: String, $first: Int, $query: String) {
search(after: $after, first: $first, query: $query) {
... on SearchSuccess {
edges {
node {
id
title
url
}
}
pageInfo {
hasNextPage
endCursor
totalCount
}
}
... on SearchError {
errorCodes
}
}
}
""",
"operationName": "Search"
}
headers = {
"Content-Type": "application/json",
"User-Agent": "OmnivoreClient/1.0",
"Authorization": self.api_key
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(self.api_url, data=json.dumps(payload), headers=headers, params=querystring)
response.raise_for_status()
result = response.json()
logger.debug(f"Search URL response: {json.dumps(result, indent=2)}")
if "data" in result and "search" in result["data"]:
search_result = result["data"]["search"]
if "edges" in search_result:
for edge in search_result["edges"]:
if edge["node"]["url"] == url:
logger.info(f"Exact URL match found in Omnivore: {url}")
return True
logger.info(f"Exact URL not found in Omnivore: {url}")
return False
else:
logger.info(f"No search results for URL: {url}")
return False
logger.warning(f"Unexpected search result format for URL: {url}")
return False
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred while searching Omnivore: {e}")
raise
except Exception as e:
logger.error(f"An error occurred while searching Omnivore: {str(e)}")
raise
async def save_url(self, url: str) -> Optional[Dict[str, Any]]:
url = url.rstrip('>') if url else url
# Check if the URL already exists
if await self.search_url(url):
logger.info(f"URL already exists, skipping save: {url}")
return None
payload = {
"query": """
mutation SaveUrl($input: SaveUrlInput!) {
saveUrl(input: $input) {
... on SaveSuccess {
url
clientRequestId
}
... on SaveError {
errorCodes
message
}
}
}
""",
"variables": {
"input": {
"clientRequestId": str(uuid.uuid4()),
"source": "api",
"url": url,
"labels": [{"name": self.label}]
}
}
}
headers = {
"Content-Type": "application/json",
"Authorization": self.api_key
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(self.api_url, json=payload, headers=headers)
response.raise_for_status()
result = response.json()
if "data" in result and isinstance(result["data"], dict):
logger.info(f"Successfully saved URL to Omnivore: {url}")
return result
else:
logger.error("Unexpected response format from Omnivore API")
return None
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred while saving to Omnivore: {e}")
raise
except Exception as e:
logger.error(f"An error occurred while saving to Omnivore: {str(e)}")
raise