Requirements

  • Python 3
  • Preinstalled packages:
requests==2.24.0
pandas==1.3.2

You can copy paste packages name and version and put them into separated file “requirements.txt”. Then, using terminal, install with a signle command

pip install -r requirements.txt

Steps to accomplish

  1. Prepare the environment
  2. Make an API call
  3. Extract all found news articles
  4. Export data to CSV file:
    • From Python Dictionary
    • From Pandas table

1. Prepare the environment

This step consists of:

  • import packages

  • set environmental variables (API Key, URL of the News API, etc)

  • define a correct work folder

The following code illustrates the above steps:

# Import packages
# Default packages
import time
import csv
import os
import json

# Preinstalled packages
import requests
import pandas


# Define desired work folder, where you want to save your .csv files
# Windows Example
os.chdir('C:\\Users\\user_name\\PycharmProjects\\extract_news_data')
# Linux Example
os.chdir('/mnt/c/Users/user_name/PycharmProjects/extract_news_data')

# URL of our News API
base_url = 'https://api.newscatcherapi.com/v2/search'

# Your API key
X_API_KEY = 'PUT_YOUR_API_KEY'

Receive your API key by registering at app.newscatcherapi.com

2. Make API call

Let’s take it easy and try to make a single call. For example, we would like to look for all mentions of 3 popular cryptocurrencies Bitcoin, Ethereum, and Dogecoin.

In order to make a call, we need to set headers and parameters. In parameters, I am also filtering on articles in English as well as narrow down the search top 10 000 the most trustful news sources based on rank variable . Default timeperiod is set to 1 week, so no need to define this parameter.

# Put your API key to headers in order to be authorized to perform a call
headers = {'x-api-key': X_API_KEY}

# Define your desired parameters
params = {
    'q': 'Bitcoin AND Ethereum AND Dogecoin',
    'lang': 'en',
    'to_rank': 10000,
    'page_size': 100,
    'page': 1
    }

# Make a simple call with both headers and params
response = requests.get(base_url, headers=headers, params=params)

# Encode received results
results = json.loads(response.text.encode())
if response.status_code == 200:
    print('Done')
else:
    print(results)
    print('ERROR: API call failed.')

If the status_code is not 200, the error message should give you a clear idea of what was wrong

Here are the results that we received:

As you can see, we found 253 articles mentioning all three popular cryptocurrencies in one article. Another parameter worth looking at is “total_pages”. It shows how many API calls you will have to make in order to extract all found news articles. We will use it later in the guide. Besides, you can explore further by looking at each article separately. All of them are stored in “articles” JSON Key.

(Optional) Visualize data with pandas package

To be able to look at all 100 articles at the same time, let’s create a pandas table from the “articles” Key.

# Import data into pandas
pandas_table = pd.DataFrame(results['articles'])

Article represented by a pandas table

Now, you can have a closer look at the first 100 articles, before extracting the remaining.

3. Extract all news articles found

At this stage, we are already confident that an API call returns expected results. The next step is to extract all found news articles using “total_pages” value.

One thing to keep in mind is that I am using Free Trial API Key, where the frequency of API calls is limited to 1 call/second. So, to not be penalized for overuse, I make my code wait for one second between each call.

# Variable to store all found news articles
all_news_articles = []

# Ensure that we start from page 1
params['page'] = 1

# Infinite loop which ends when all articles are extracted
while True:

    # Wait for 1 second between each call
    time.sleep(1)

    # GET Call from previous section enriched with some logs
    response = requests.get(base_url, headers=headers, params=params)
    results = json.loads(response.text.encode())
    if response.status_code == 200:
        print(f'Done for page number => {params["page"]}')


        # Adding your parameters to each result to be able to explore afterwards
        for i in results['articles']:
            i['used_params'] = str(params)


        # Storing all found articles
        all_news_articles.extend(results['articles'])

        # Ensuring to cover all pages by incrementing "page" value at each iteration
        params['page'] += 1
        if params['page'] > results['total_pages']:
            print("All articles have been extracted")
            break
        else:
            print(f'Proceed extracting page number => {params["page"]}')
    else:
        print(results)
        print(f'ERROR: API call failed for page number => {params["page"]}')
        break

print(f'Number of extracted articles => {str(len(all_news_articles))}')

In summary, we iterate through all available pages, extract news articles and store them in one variable called “all_news_article”. We also add used parameters to each article, so when exploring you can see where it comes from. You can always delete this part of code if you do not want to have this information in your CSV file.

(Optional) Having multiple queries

Imagine that you want to extract news data from multiple queries at one time. So, instead of searching for articles where all 3 popular cryptocurrencies are mentioned, you would like to look for each of them separately and adding “business” as a topic. In this case, you will have multiple parameters and you will have to add one more iteration.

Here is how the params variable looks like:

params = [
  {
    q: "Bitcoin",
    lang: "en",
    to_rank: 10000,
    topic: "business",
    page_size: 100,
    page: 1,
  },
  {
    q: "Ethereum",
    lang: "en",
    to_rank: 10000,
    topic: "business",
    page_size: 100,
    page: 1,
  },
  {
    q: "Dogecoin",
    lang: "en",
    to_rank: 10000,
    topic: "business",
    page_size: 100,
    page: 1,
  },
];

In the code, we added one more iteration and put “separated_param” inside requests.get function.

# Variable to store all found news articles, mp stands for "multiple queries"
all_news_articles_mp = []

# Infinite loop which ends when all articles are extracted
for separated_param in params:

    print(f'Query in use => {str(separated_param)}')

    while True:
        # Wait for 1 second between each call
        time.sleep(1)

        # GET Call from previous section enriched with some logs
        response = requests.get(base_url, headers=headers, params=separated_param)
        results = json.loads(response.text.encode())
        if response.status_code == 200:
            print(f'Done for page number => {separated_param["page"]}')


            # Adding your parameters to each result to be able to explore afterwards
            for i in results['articles']:
                i['used_params'] = str(separated_param)


            # Storing all found articles
            all_news_articles_mp.extend(results['articles'])

            # Ensuring to cover all pages by incrementing "page" value at each iteration
            separated_param['page'] += 1
            if separated_param['page'] > results['total_pages']:
                print("All articles have been extracted")
                break
            else:
                print(f'Proceed extracting page number => {separated_param["page"]}')
        else:
            print(results)
            print(f'ERROR: API call failed for page number => {separated_param["page"]}')
            break

print(f'Number of extracted articles => {str(len(all_news_articles_mp))}')

One more important thing is to deduplicate results. Right now we extract articles from 3 different queries. But, as we saw before, there can be mentions of all 3 of them in the same article. So different queries can bring the same articles. That is why we have “_id” value generated for each article. ID is created by decoding both title and clean_url (Web Domain Name of the News Source).

Here is how you can deduplicate results in Python:

# Define variables
unique_ids = []
all_news_articles = []

# Iterate on each article and check whether we saw this _id before
for article in all_news_articles_mp:
    if article['_id'] not in unique_ids:
        unique_ids.append(article['_id'])
        all_news_articles.append(article)

4. Export data to CSV file

4.1 From Python dictionary

You can save a file directly from the dictionary generated previously:

field_names = list(all_news_articles[0].keys())
# Generate CSV file from dict
with open('extracted_news_articles.csv', 'w', encoding="utf-8", newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=field_names, delimiter=";")
    writer.writeheader()
    writer.writerows(all_news_articles)

4.2 From Pandas table

Or create a Pandas table, check the results and then generate a CSV:

# Generate CSV from Pandas table
# Create Pandas table
pandas_table = pd.DataFrame(all_news_articles)

# Generate CSV
pandas_table.to_csv('extracted_news_articles.csv', encoding='utf-8', sep=';')