import datetime
import json
import os
import time
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from newscatcher import NewscatcherApi
from newscatcher.core.api_error import ApiError

load_dotenv()
API_KEY = os.getenv("NEWSCATCHER_API_KEY")
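# Optional guard (not in the original walkthrough): fail fast when the key is
# missing instead of sending unauthenticated requests later.
if not API_KEY:
    raise RuntimeError("NEWSCATCHER_API_KEY is not set; add it to your environment or a .env file.")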
def retrieve_week_of_data(
client: NewscatcherApi,
query: str,
start_date: datetime.datetime,
end_date: datetime.datetime,
output_file: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
Retrieve a week of historical data using daily aggregation.
Args:
client: Configured NewscatcherApi client instance
query: Search query string
start_date: Start date for the week
end_date: End date for the week
output_file: Filename for JSON output (without .json extension)
Returns:
List of all articles retrieved for the entire week
"""
results = []
# Step 1: Get daily data volumes
try:
aggregation_response = client.aggregation.post(
q=query,
from_=start_date,
to=end_date,
aggregation_by="day",
lang=["en"],
)
# Log daily volumes for planning
if aggregation_response.aggregations:
print("Daily data volumes:")
aggregation_data = aggregation_response.aggregations[0]["aggregation_count"]
for day_data in aggregation_data:
print(
f" {day_data['time_frame']}: {day_data['article_count']} articles"
)
print(f"Total articles expected: {aggregation_response.total_hits}")
except ApiError as e:
print(f"Error getting aggregation data: {e.status_code} - {e.body}")
return results
# Step 2: Process each day in the week
current_date = start_date.date()
end_date_only = end_date.date()
while current_date <= end_date_only:
# Set time bounds for the current day
day_start = datetime.datetime.combine(current_date, datetime.time.min)
day_end = datetime.datetime.combine(current_date, datetime.time.max)
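        # datetime.time.min / datetime.time.max bound the day at 00:00:00 and
        # 23:59:59.999999, so consecutive daily windows never overlap.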
print(f"Processing {current_date}")
current_page = 1
total_pages = 1
daily_articles = 0
# Step 3: Paginate through the day's data
while current_page <= total_pages:
try:
response = client.search.post(
q=query,
from_=day_start,
to=day_end,
lang=["en"],
page=current_page,
page_size=100,
)
# Add articles to results (store as original JSON/dict)
if response.articles:
# Convert to dict to ensure JSON serialization works
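                    # Assumption: the SDK returns model objects; if they are
                    # Pydantic models, article.dict() or article.model_dump()
                    # may serialize nested fields more cleanly than __dict__.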
for article in response.articles:
results.append(article.__dict__)
daily_articles += len(response.articles)
# Get pagination info from response
total_pages = response.total_pages or 1
current_page += 1
print(f" Retrieved page {current_page - 1} of {total_pages}")
# Add delay between requests to respect rate limits
time.sleep(1)
except ApiError as e:
                if e.status_code == 408:
                    print(
                        " Request timeout. The time window might contain too many articles."
                    )
                    # Daily windows rarely hit this; skip the day here and, if it
                    # recurs, subdivide the day into smaller windows (see
                    # retrieve_day_in_hourly_windows below for a sketch).
                    break
elif e.status_code == 429:
print(" Rate limit hit. Waiting longer...")
time.sleep(5)
continue # Retry the same page
else:
print(f" API Error ({e.status_code}): {e.body}")
# Break the pagination loop on non-recoverable errors
break
except Exception as e:
print(f" Unexpected error: {e}")
break
print(
f" Completed {current_date}, retrieved {daily_articles} articles for this day"
)
print(f" Total articles so far: {len(results)}")
# Move to next day
current_date += datetime.timedelta(days=1)
# Save results if output file specified
if output_file and results:
save_articles_to_json(results, output_file)
return results
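# Illustrative fallback for 408 timeouts on a daily window (not part of the
# original walkthrough): split one day into hourly windows and reuse the same
# search call and pagination pattern shown above.
def retrieve_day_in_hourly_windows(
    client: NewscatcherApi,
    query: str,
    day: datetime.date,
) -> List[Dict[str, Any]]:
    """Sketch: retrieve one very dense day in 24 one-hour windows."""
    articles: List[Dict[str, Any]] = []
    for hour in range(24):
        window_start = datetime.datetime.combine(day, datetime.time(hour=hour))
        window_end = window_start + datetime.timedelta(hours=1, microseconds=-1)
        current_page = 1
        total_pages = 1
        while current_page <= total_pages:
            try:
                response = client.search.post(
                    q=query,
                    from_=window_start,
                    to=window_end,
                    lang=["en"],
                    page=current_page,
                    page_size=100,
                )
                if response.articles:
                    articles.extend(a.__dict__ for a in response.articles)
                total_pages = response.total_pages or 1
                current_page += 1
                time.sleep(1)
            except ApiError as e:
                print(f"  Hourly window {window_start}: API error {e.status_code}")
                break
    return articles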
def save_articles_to_json(articles: List[Dict[str, Any]], filename: str):
"""Save articles array to JSON file."""
json_filename = f"{filename}.json"
with open(json_filename, "w", encoding="utf-8") as f:
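        # default=str stringifies values json cannot serialize natively (e.g. datetime objects).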
json.dump(articles, f, indent=2, ensure_ascii=False, default=str)
print(f"Saved {len(articles)} articles to {json_filename}")
def main():
"""Test the weekly data retrieval function with transport strike query."""
client = NewscatcherApi(api_key=API_KEY)
# Define the test week (adjust dates as needed)
start_date = datetime.datetime(2025, 5, 15)
end_date = datetime.datetime(2025, 5, 22)
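    # Note: the daily loop includes both endpoints, so 15-22 May covers
    # 8 calendar days; use an end_date of 21 May for exactly one week.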
# Your complex transport strike query
query = '(airport OR "freight port" OR train) AND (strike OR "union protest" OR "planned closure" OR "worker dispute") AND NOT (past OR historical OR ended)'
try:
print(
f"Testing weekly data retrieval from {start_date.date()} to {end_date.date()}"
)
print(f"Query: {query}")
print("=" * 80)
# Generate output filename based on date range
output_file = f"transport_strikes_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}"
articles = retrieve_week_of_data(
client, query, start_date, end_date, output_file=output_file
)
print("=" * 80)
print(f"SUCCESS: Retrieved {len(articles)} articles total")
# Optional: Show some sample data
if articles:
print("\nSample articles:")
for i, article in enumerate(articles[:3]): # Show first 3 articles
print(f"{i+1}. {article.get('title', 'No title')}")
print(f" Published: {article.get('published_date', 'Unknown date')}")
print(f" Source: {article.get('name_source', 'Unknown source')}")
print(f" URL: {article.get('link', 'No URL')}")
print()
except Exception as error:
print(f"FAILED: {error}")
if __name__ == "__main__":
main()