-
Notifications
You must be signed in to change notification settings - Fork 2
/
google_search_api.py
105 lines (90 loc) · 4.26 KB
/
google_search_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import os
import requests
from dotenv import load_dotenv
from typing import List, Dict, Optional
import logging
# Load settings from a local .env file (python-dotenv) at import time, so that
# GoogleSearchAPI can read 'api_key' and 'cx' through os.getenv().
load_dotenv()
class SearchResults:
    """Container for formatted search hits that renders as readable text.

    ``results`` is an iterable of dicts. ``__str__`` reads the keys
    ``title``, ``link``, ``snippet`` and ``image`` from each dict; all of
    them are optional.
    """

    def __init__(self, results):
        """Store the result dicts on ``self.results``.

        ``None`` is treated as "no results" so that ``str()`` on the instance
        yields an empty string instead of failing during iteration.
        """
        self.results = results if results is not None else []

    def __str__(self):
        """Return all results as text, one ``---``-delimited block per result.

        ``Title`` and ``Link`` lines are always emitted (with a placeholder
        when the key is missing); ``Snippet`` and ``Image`` lines appear only
        when the corresponding key is present. An empty result set yields
        ``""``.
        """
        # Collect the pieces and join once instead of growing a string with +=.
        parts = []
        for result in self.results:
            parts.append("---\n")
            parts.append(f"Title: {result.get('title', 'Title not found')}\n")
            parts.append(f"Link: {result.get('link', 'Link not found')}\n")
            if 'snippet' in result:
                parts.append(f"Snippet: {result['snippet']}\n")
            if 'image' in result:
                parts.append(f"Image: {result['image']}\n")
            parts.append("---\n")
        return "".join(parts)
class GoogleSearchAPI:
    """Small client for the Google Custom Search JSON API.

    Credentials are read from the ``api_key`` and ``cx`` environment
    variables when the instance is created.
    """

    # The loop in ``response`` never asks for more than this many items per request.
    PAGE_SIZE = 10
    # Seconds to wait for the server before giving up on a request.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Read the API key and search engine ID from the environment.

        Raises:
            ValueError: if ``api_key`` or ``cx`` is unset or empty.
        """
        self.base_url = "https://www.googleapis.com/customsearch/v1"
        self.headers = {"Content-Type": "application/json"}
        self.api_key = os.getenv('api_key')
        if not self.api_key:
            raise ValueError("API key not found. Set the api_key environment variable.")
        self.cx = os.getenv('cx')
        if not self.cx:
            raise ValueError("CX (Search Engine ID) not found. Set the cx environment variable.")

    def response(self, method: str, query: str, max_results: int = 10) -> "SearchResults":
        """Run a search and return up to ``max_results`` formatted results.

        Pages are fetched one after another, following the ``nextPage`` entry
        of each reply, until enough items are collected, the API reports no
        further page, a page comes back empty, or a request fails. A failed
        request is logged and whatever was collected so far is returned.

        Args:
            method: ``'image'`` requests an image search; any other value
                performs a regular search.
            query: Search terms as plain text. The value is URL-encoded by
                ``requests``, so it must not be pre-encoded by the caller.
            max_results: Upper bound on the number of results returned.

        Returns:
            A ``SearchResults`` wrapping the formatted result dicts.

        Raises:
            ValueError: if ``query`` is empty.
        """
        if not query:
            raise ValueError("Query not found. Please enter a query and try again.")

        all_results: List[Dict] = []
        start_index = 1
        while len(all_results) < max_results:
            # Passing a params dict lets requests escape every value, so
            # characters such as '&', '#' or '+' in the query cannot corrupt
            # or inject URL parameters.
            params = {
                "q": query,
                "key": self.api_key,
                "cx": self.cx,
                "num": min(max_results - len(all_results), self.PAGE_SIZE),
                "start": start_index,
            }
            if method == 'image':
                params["searchType"] = "image"

            try:
                reply = requests.get(
                    self.base_url,
                    params=params,
                    headers=self.headers,
                    timeout=self.REQUEST_TIMEOUT,
                )
                reply.raise_for_status()
                payload = reply.json()  # parse the body once per page
            except (requests.exceptions.RequestException, ValueError) as e:
                # HTTP error messages embed the request URL, which carries the
                # API key; mask it before it reaches the log.
                logging.error("An error occurred: %s", str(e).replace(self.api_key, "***"))
                break

            items = payload.get("items", [])
            all_results.extend(items)

            next_page = payload.get("queries", {}).get("nextPage")
            # Stop on an empty page as well: without new items the loop could
            # otherwise keep requesting pages without making progress.
            if not items or not next_page:
                break
            next_start = next_page[0].get("startIndex")
            if next_start is None:
                break
            start_index = next_start

        formatted_results = [self.results(result, method) for result in all_results[:max_results]]
        return SearchResults(formatted_results)

    def results(self, result: Dict, search_type: str) -> Dict:
        """Reduce one raw API item to the fields used for display.

        For ``search_type == 'image'`` the dict has ``title``, ``link`` (the
        page the image appears on) and ``image``; otherwise it has ``link``,
        ``title`` and ``snippet``. Each value is the first string found among
        the candidate paths, or ``"Not found"``.
        """
        if search_type == 'image':
            return {
                "title": self.field_search(result, "title"),
                "link": self.field_search(result, "image.contextLink"),
                "image": self.field_search(result, "link", "image.thumbnailLink"),
            }
        return {
            "link": self.field_search(result, "link", "pagemap.metatags.0.og:url"),
            "title": self.field_search(
                result,
                "pagemap.metatags.0.og:title",
                "pagemap.metatags.0.twitter:title",
                "title",
                "pagemap.metatags.0.og:image:alt",
                "pagemap.metatags.0.twitter:image:alt",
            ),
            "snippet": self.field_search(
                result,
                "pagemap.metatags.0.og:description",
                "pagemap.metatags.0.twitter:description",
                "snippet",
            ),
        }

    def field_search(self, result: Dict, *field_paths: str) -> str:
        """Return the first string value found at any of ``field_paths``.

        Each path is a dot-separated sequence of dict keys and list indexes
        (for example ``"pagemap.metatags.0.og:title"``). Paths are tried in
        order; a path counts only when every segment resolves and the final
        value is a string.

        Returns:
            The matching string, or ``"Not found"`` when no path matches.
        """
        for path in field_paths:
            value = result
            for key in path.split('.'):
                if isinstance(value, dict) and key in value:
                    value = value[key]
                elif isinstance(value, list) and key.isdecimal() and int(key) < len(value):
                    value = value[int(key)]
                else:
                    # Segment cannot be resolved: abandon this path entirely
                    # rather than returning whatever was reached so far.
                    break
            else:
                if isinstance(value, str):
                    return value
        return "Not found"