Basic scripts and utils for OpenAlex querying
MaximilianRoessler committed Aug 20, 2024
1 parent e3b3dfa commit 0a6e188
Showing 4 changed files with 341 additions and 0 deletions.
57 changes: 57 additions & 0 deletions config.yml
@@ -0,0 +1,57 @@
api:
  base_url: "https://api.openalex.org/works?"
  per_page: 200

logging:
  level: INFO
  format: '%(asctime)s - %(levelname)s - %(message)s'
  file: 'output/gcr_research.log'

query_sets:
  - name: combined_gcr_query
    query_name: "Combined GCR and Resilience Query"
    url: "https://api.openalex.org/works?filter=default.search:((%22global+catastrophic+risk%22+OR+%22existential+risk%22)+AND+(resilience+OR+mitigation)+AND+(pandemic+OR+nuclear+OR+volcano+OR+famine))+OR+((resilience+OR+mitigation)+AND+(%22nuclear+winter%22+OR+%22coronal+mass+ejection%22+OR+%22abrupt+sunlight+reduction%22+OR+%22extreme+pandemic%22))"

  - name: ("global catastrophic risk" OR "existential risk") AND ((resilience) OR (mitigation)) AND (pandemic)
    query_name: "GCR Pandemic"
    url: "https://api.openalex.org/works?filter=default.search:(%22global+catastrophic+risk%22+OR+%22existential+risk%22)+AND+((resilience)+OR+(mitigation))+AND+(pandemic)"

  - name: ("global catastrophic risk" OR "existential risk") AND ((resilience) OR (mitigation)) AND (nuclear)
    query_name: "GCR Nuclear"
    url: "https://api.openalex.org/works?filter=default.search:(%22global+catastrophic+risk%22+OR+%22existential+risk%22)+AND+((resilience)+OR+(mitigation))+AND+(nuclear)"

  - name: ("global catastrophic risk" OR "existential risk") AND ((resilience) OR (mitigation)) AND (volcano)
    query_name: "GCR Volcano"
    url: "https://api.openalex.org/works?filter=default.search:(%22global+catastrophic+risk%22+OR+%22existential+risk%22)+AND+((resilience)+OR+(mitigation))+AND+(volcano)"

  - name: ("global catastrophic risk" OR "existential risk") AND ((resilience) OR (mitigation)) AND (famine)
    query_name: "GCR Famine"
    url: "https://api.openalex.org/works?filter=default.search:(%22global+catastrophic+risk%22+OR+%22existential+risk%22)+AND+((resilience)+OR+(mitigation))+AND+(famine)"

  - name: ((resilience) OR (mitigation)) AND ("nuclear winter")
    query_name: "Non-GCR Nuclear Winter"
    url: "https://api.openalex.org/works?filter=default.search:((resilience)+OR+(mitigation))+AND+(%22nuclear+winter%22)"

  - name: ((resilience) OR (mitigation)) AND ("coronal mass ejection")
    query_name: "Non-GCR Coronal Mass Ejection"
    url: "https://api.openalex.org/works?filter=default.search:((resilience)+OR+(mitigation))+AND+(%22coronal+mass+ejection%22)"

  - name: ((resilience) OR (mitigation)) AND ("abrupt sunlight reduction")
    query_name: "Non-GCR Abrupt Sunlight Reduction"
    url: "https://api.openalex.org/works?filter=default.search:((resilience)+OR+(mitigation))+AND+(%22abrupt+sunlight+reduction%22)"

  - name: ((resilience) OR (mitigation)) AND ("geomagnetic storm")
    query_name: "Non-GCR geomagnetic storm"
    url: "https://api.openalex.org/works?filter=default.search:((resilience)+OR+(mitigation))+AND+(%22geomagnetic+storm%22)"

  - name: ((resilience) OR (mitigation)) AND ("extreme pandemic")
    query_name: "Non-GCR Extreme Pandemic"
    url: "https://api.openalex.org/works?filter=default.search:((resilience)+OR+(mitigation))+AND+(%22extreme+pandemic%22)"

- name: ((resilience) OR (mitigation)) AND ("global catastropic biological risk")
query_name: "Non-GCR Global Catastrophic Biological Risk"
url: "https://api.openalex.org/works?filter=default.search:((resilience)+OR+(mitigation))+AND+(%22global+catastrophic+biological+risk%22)"

output:
  directory: 'output'
  ris_file: 'all_results.ris'
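
For orientation, each query_sets entry pairs a human-readable query_name (used for logging and as a result key) with a name that doubles as the cache key and a fully encoded OpenAlex search URL. A minimal sketch of how an entry is consumed, assuming the file above is saved as config.yml in the working directory (src/main.py below does the real processing):

import yaml

with open("config.yml", encoding="utf-8") as f:
    config = yaml.safe_load(f)

for query_set in config["query_sets"]:
    # 'query_name' labels results in logs; 'name' becomes <name>_results.csv in the cache
    print(query_set["query_name"], "->", query_set["url"][:80], "...")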
127 changes: 127 additions & 0 deletions src/main.py
@@ -0,0 +1,127 @@
"""Main script for processing OpenAlex queries for GCR research."""

import os
import argparse
from collections import defaultdict
import yaml
from query_processor import QueryProcessor
from utils import setup_logging, save_to_ris, analyze_symmetric_difference

def analyze_results(all_results):
"""
Analyze query results to count article occurrences across different searches.
Args:
all_results (dict): Dictionary of DataFrames containing query results.
Returns:
tuple: Total unique articles, articles in multiple searches, articles in single search.
"""
article_occurrences = defaultdict(list)
for query_name, df in all_results.items():
for _, row in df.iterrows():
article_occurrences[row['id']].append(query_name)
multiple_searches = [
id for id, queries in article_occurrences.items() if len(queries) > 1
]
single_search = [
id for id, queries in article_occurrences.items() if len(queries) == 1
]
return len(article_occurrences), len(multiple_searches), len(single_search)

def main():
"""Process OpenAlex queries for GCR research based on configuration."""
parser = argparse.ArgumentParser(description="Process OpenAlex queries for GCR research.")
parser.add_argument('--config', default='config/config.yml', help='Path to config file')
parser.add_argument('--symmetric-difference', nargs=2, metavar=('QUERY1', 'QUERY2'),
help='Compute symmetric difference between two queries')
parser.add_argument('--force-refresh', action='store_true',
help='Force refresh all queries, ignoring cache')
args = parser.parse_args()

with open(args.config, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)

logger = setup_logging(config['logging'])

output_dir = config['output']['directory']
os.makedirs(output_dir, exist_ok=True)

processor = QueryProcessor(config['api'], logger, output_dir)

all_results = {}
all_articles = []

for query_set in config['query_sets']:
logger.info("Processing query: %s", query_set['query_name'])
if args.force_refresh:
results = processor.fetch_all_data(query_set['url'], query_set['name'])
else:
results = processor.load_from_cache(query_set['name'])
if results is None:
results = processor.fetch_all_data(query_set['url'], query_set['name'])
all_results[query_set['query_name']] = results
all_articles.extend(results.to_dict('records'))
logger.info("Number of articles for %s: %d", query_set['query_name'], len(results))

total_articles, multiple_searches, single_search = analyze_results(all_results)

logger.info("Total unique articles: %d", total_articles)
logger.info("Articles appearing in multiple searches: %d", multiple_searches)
logger.info("Articles appearing in only one search: %d", single_search)

save_to_ris(all_articles, os.path.join(output_dir, config['output']['ris_file']))
logger.info("All results saved to RIS file: %s", config['output']['ris_file'])

if args.symmetric_difference:
query1, query2 = args.symmetric_difference
if query1 in all_results and query2 in all_results:
report, df1_only, df2_only = analyze_symmetric_difference(
all_results[query1], all_results[query2], query1, query2, output_dir
)
logger.info("\nSymmetric Difference Analysis:\n%s", report)
# Output overview for symmetric difference
logger.info("\nOverview of articles unique to each query:")
logger.info("Articles unique to %s:", query1)
for _, row in df1_only.iterrows():
logger.info("Title: %s", row['title'])
logger.info("Authors: %s", row['authors'])
logger.info("Year: %d", row['publication_year'])
logger.info("DOI: %s", row['doi'])
logger.info("---")
logger.info("Articles unique to %s:", query2)
for _, row in df2_only.iterrows():
logger.info("Title: %s", row['title'])
logger.info("Authors: %s", row['authors'])
logger.info("Year: %d", row['publication_year'])
logger.info("DOI: %s", row['doi'])
logger.info("---")
report_file = os.path.join(
output_dir, f"symmetric_difference_{query1}_{query2}_report.txt"
)
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
f.write("\n\nOverview of articles unique to each query:\n")
f.write(f"\nArticles unique to {query1}:\n")
for _, row in df1_only.iterrows():
f.write(f"Title: {row['title']}\n")
f.write(f"Authors: {row['authors']}\n")
f.write(f"Year: {row['publication_year']}\n")
f.write(f"DOI: {row['doi']}\n")
f.write("---\n")
f.write(f"\nArticles unique to {query2}:\n")
for _, row in df2_only.iterrows():
f.write(f"Title: {row['title']}\n")
f.write(f"Authors: {row['authors']}\n")
f.write(f"Year: {row['publication_year']}\n")
f.write(f"DOI: {row['doi']}\n")
f.write("---\n")
        else:
            logger.error("One or both of the specified queries for the symmetric "
                         "difference were not found.")

    logger.info("All queries processed.")

if __name__ == "__main__":
    main()
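Typical invocations, assuming the repository root as the working directory. Note that the --config default points at config/config.yml, while this commit adds config.yml at the root, so the path is passed explicitly here; the two --symmetric-difference arguments must match query_name values from the config, since results are keyed by query_name:

python src/main.py --config config.yml
python src/main.py --config config.yml --force-refresh
python src/main.py --config config.yml --symmetric-difference "GCR Pandemic" "GCR Nuclear"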
92 changes: 92 additions & 0 deletions src/query_processor.py
@@ -0,0 +1,92 @@
"""Module for processing OpenAlex queries."""

import os
from typing import List, Dict, Any
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import requests
import pandas as pd

class QueryProcessor:
"""Processes API queries, fetches results, and manages caching."""

def __init__(self, api_config: Dict[str, Any], logger, output_dir: str):
self.per_page = api_config['per_page']
self.logger = logger
self.output_dir = output_dir

def get_page(self, url: str) -> tuple[List[Dict[str, Any]], int, str]:
"""Fetch a single page of results from the API."""
response = requests.get(url, timeout=30) # Add a 30-second timeout
response.raise_for_status()
data = response.json()
total_results = data['meta']['count']
results = data.get('results', [])
next_cursor = data['meta'].get('next_cursor')
return results, total_results, next_cursor

def process_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract relevant information from API results."""
processed = []
for result in results:
authors = [author['author']['display_name'] for author in result.get('authorships', [])]
processed.append({
'id': result['id'],
'title': result['title'],
'authors': ', '.join(authors),
'publication_year': result['publication_year'],
'journal': result.get('host_venue', {}).get('display_name', ''),
'doi': result.get('doi', ''),
})
return processed

def update_url_with_cursor(self, url: str, cursor: str) -> str:
"""Update the URL with a new cursor value."""
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
query_params['cursor'] = [cursor]
query_params['per-page'] = [str(self.per_page)]
new_query = urlencode(query_params, doseq=True)
return urlunparse(parsed_url._replace(query=new_query))

def get_cache_filename(self, query_name: str) -> str:
"""Generate a filename for caching based on the query name."""
return os.path.join(self.output_dir, f"{query_name}_results.csv")

def load_from_cache(self, query_name: str) -> pd.DataFrame:
"""Load results from a cached CSV file."""
cache_file = self.get_cache_filename(query_name)
if os.path.exists(cache_file):
self.logger.info(f"Loading cached results for {query_name}")
return pd.read_csv(cache_file)
return None

def save_to_cache(self, df: pd.DataFrame, query_name: str):
"""Save results to a CSV file for caching."""
cache_file = self.get_cache_filename(query_name)
df.to_csv(cache_file, index=False)
self.logger.info(f"Cached results saved for {query_name}")

    def fetch_all_data(self, url: str, query_name: str) -> pd.DataFrame:
        """Fetch all pages of results for the given URL, using cache if available."""
        cached_results = self.load_from_cache(query_name)
        if cached_results is not None:
            return cached_results

        all_results = []
        cursor = '*'
        while cursor:
            current_url = self.update_url_with_cursor(url, cursor)
            self.logger.info(f"Fetching URL: {current_url}")
            results, total_count, next_cursor = self.get_page(current_url)
            if not results:
                break
            processed_results = self.process_results(results)
            all_results.extend(processed_results)
            self.logger.info(f"Fetched {len(all_results)} out of {total_count} results")
            cursor = next_cursor
            if not cursor:
                break
        df = pd.DataFrame(all_results)
        self.save_to_cache(df, query_name)
        return df
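As a standalone sketch of the fetch-and-cache flow (illustrative, not part of the commit): QueryProcessor only needs per_page from the api config, a logger, and an output directory; fetch_all_data then follows OpenAlex cursor pagination (cursor=* on the first request, meta.next_cursor afterwards) and caches the result as <name>_results.csv. The snippet assumes it is run from src/ so the import resolves, and it issues real API requests:

import logging
import os

from query_processor import QueryProcessor

os.makedirs('output', exist_ok=True)
logging.basicConfig(level=logging.INFO)

processor = QueryProcessor({'per_page': 200}, logging.getLogger(__name__), 'output')
# URL taken from the "Non-GCR Nuclear Winter" entry in config.yml
url = ("https://api.openalex.org/works?filter=default.search:"
       "((resilience)+OR+(mitigation))+AND+(%22nuclear+winter%22)")
df = processor.fetch_all_data(url, 'nuclear_winter_demo')  # cached as output/nuclear_winter_demo_results.csv
print(len(df), "records fetched")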
65 changes: 65 additions & 0 deletions src/utils.py
@@ -0,0 +1,65 @@
"""Utility functions for data processing and file operations."""

import logging
from typing import Dict, Any, List, Tuple
import pandas as pd

def setup_logging(config: Dict[str, Any]) -> logging.Logger:
"""Set up logging based on configuration."""
logging.basicConfig(
level=config['level'],
format=config['format'],
filename=config['file']
)
return logging.getLogger(__name__)

def save_results(df: pd.DataFrame, filename: str):
"""Save results to a CSV file."""
df.to_csv(filename, index=False)
print(f"Results saved to {filename}")

def save_to_ris(articles: List[Dict[str, Any]], filename: str):
"""Save all articles to a single RIS file."""
with open(filename, 'w', encoding='utf-8') as f:
for article in articles:
f.write("TY - JOUR\n")
f.write(f"TI - {article['title']}\n")
# Handle 'authors' field
authors = article.get('authors', '')
if isinstance(authors, str):
for author in authors.split(', '):
f.write(f"AU - {author}\n")
elif isinstance(authors, list):
for author in authors:
f.write(f"AU - {author}\n")
f.write(f"PY - {article['publication_year']}\n")
f.write(f"JO - {article['journal']}\n")
f.write(f"DO - {article['doi']}\n")
f.write(f"UR - https://doi.org/{article['doi']}\n")
f.write(f"ID - {article['id']}\n")
f.write("ER - \n\n")
print(f"All results saved to RIS file: {filename}")

def compute_symmetric_difference(
    df1: pd.DataFrame, df2: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Compute the symmetric difference between two DataFrames."""
    df1_only = df1[~df1['id'].isin(df2['id'])]
    df2_only = df2[~df2['id'].isin(df1['id'])]
    return df1_only, df2_only

def analyze_symmetric_difference(
    df1: pd.DataFrame, df2: pd.DataFrame, name1: str, name2: str, output_dir: str
) -> Tuple[str, pd.DataFrame, pd.DataFrame]:
    """Analyze and return a string report of the symmetric difference between two DataFrames."""
    df1_only, df2_only = compute_symmetric_difference(df1, df2)
    report = f"Symmetric Difference Analysis between {name1} and {name2}:\n"
    report += f"Total articles in {name1}: {len(df1)}\n"
    report += f"Total articles in {name2}: {len(df2)}\n"
    report += f"Articles unique to {name1}: {len(df1_only)}\n"
    report += f"Articles unique to {name2}: {len(df2_only)}\n"
    report += f"Articles in common: {len(df1) - len(df1_only)}\n"
    # Save the symmetric differences
    save_results(df1_only, f"{output_dir}/symmetric_difference_{name1}_only.csv")
    save_results(df2_only, f"{output_dir}/symmetric_difference_{name2}_only.csv")
    return report, df1_only, df2_only
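
A quick, self-contained check of the symmetric-difference helpers (illustrative data; assumes it is run from src/ so the utils import resolves, and that an output directory exists for the two CSVs analyze_symmetric_difference writes):

import os

import pandas as pd

from utils import analyze_symmetric_difference

os.makedirs('output', exist_ok=True)
df_a = pd.DataFrame({'id': ['W1', 'W2'], 'title': ['A', 'B']})
df_b = pd.DataFrame({'id': ['W2', 'W3'], 'title': ['B', 'C']})
report, a_only, b_only = analyze_symmetric_difference(df_a, df_b, 'QueryA', 'QueryB', 'output')
print(report)                 # 1 article unique to each query, 1 in common
print(a_only['id'].tolist())  # ['W1']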
