Source code for ppa.archive.gale

import json
import logging
import pathlib
import time

import pymarc
import requests
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from pairtree import PairtreeStorageFactory, storage_exceptions

from ppa import __version__ as ppa_version

logger = logging.getLogger(__name__)


[docs] def get_local_ocr(item_id): """ Load local OCR page text for the specified Gale volume, if available. This requires a base directory (specified by GALE_LOCAL_OCR) to be configured and assumes the following organization: * Volume-level directories are organized in stub directories that correspond to every third number (e.g., CW0128905397 --> 193). So, a Gale volume's OCR data is located in the following directory: GALE_LOCAL_OCR / stub_dir / item_id.json * Page text is stored as a JSON dictionary with keys based on Gale page numbers, which is a 4-digit string (e.g., "0004"). Raises a FileNotFoundError if the local OCR page text does not exist. """ ocr_dir = getattr(settings, "GALE_LOCAL_OCR", None) if not ocr_dir: raise ImproperlyConfigured( "GALE_LOCAL_OCR configuration is required for indexing Gale page content" ) # check that the id looks as expected (appease github codeql security concerns) # first two characters are CW or CB; rest of the id is numeric if not all([item_id[:2] in ["CW", "CB"], item_id[2:].isnumeric()]): raise ValueError(f"{item_id} is not a valid Gale item identifier") # files are in stub directories; following conventions set in ppa-nlp stub_dir = item_id[::3][1:] ocr_path = pathlib.Path(ocr_dir, stub_dir, f"{item_id}.json") with ocr_path.open() as ocrfile: return json.load(ocrfile)
[docs] class GaleAPIError(Exception): """Base exception class for Gale API errors"""
[docs] class GaleItemForbidden(GaleAPIError): """Permission denied to access item in Gale API"""
[docs] class GaleUnauthorized(GaleAPIError): """Permission not authorized for Gale API access"""
[docs] class GaleItemNotFound(GaleAPIError): """Item not found in Gale API"""
[docs] class GaleAPI: """Minimal Gale API client with functionality need for PPA import. Requires **GALE_API_USERNAME** configured in Django settings. Automatically uses the configured username to retrieve an API key when needed, and has logic to refresh the API key when it expires (30 minutes). If **TECHNICAL_CONTACT** is configured in Django settings, it will be included in request headers when making API calls. Implemented as a singleton; instanciating the class will return the same shared instance every time. """ #: base URL for all API requests api_root = "https://api.gale.com/api" #: shared singleton instance; populated on first instantiation instance = None def __new__(cls): # implement as a singleton # adapted from https://softwareengineering.stackexchange.com/a/333710 # if no instance has been initialized, create and store on the class if cls.instance is None: cls.instance = super().__new__(cls) # return the instance return cls.instance def __init__(self): # NOTE: copied from hathi.py base api class; should be generalized # into a common base class if/when we add a third provider # first make sure we have a username configured try: self.username = settings.GALE_API_USERNAME except AttributeError: raise ImproperlyConfigured( "GALE_API_USERNAME configuration is required for Gale API" ) # create a request session, for request pooling self.session = requests.Session() # set a user-agent header, but preserve requests version information headers = { "User-Agent": "ppa-django/%s (%s)" % (ppa_version, self.session.headers["User-Agent"]) } # include technical contact as From header, if set tech_contact = getattr(settings, "TECHNICAL_CONTACT", None) if tech_contact: headers["From"] = tech_contact self.session.headers.update(headers) def _make_request( self, url, params=None, requires_api_key=True, stream=False, retry=0 ): """Make a GET request with the configured session. Takes a url relative to :attr:`api_root`, optional dictionary of parameters for the request, and flags to indicate if the request needs an API key, should be streamed, or is a retry.""" # NOTE: also copied from hathi.py # Returns the response for status 200 OK; raises # :class:`HathiItemNotFound` for 404 and :class:`HathiItemForbidden` # for 403. # ''' rqst_url = "%s/%s" % (self.api_root, url) rqst_opts = {} if params: rqst_opts["params"] = params.copy() # add api key to parameters if neded for this request if requires_api_key: if "params" not in rqst_opts: rqst_opts["params"] = {} rqst_opts["params"]["api_key"] = self.api_key resp = self.session.get(rqst_url, stream=stream, **rqst_opts) logger.debug( "get %s %s: %f sec", rqst_url, resp.status_code, resp.elapsed.total_seconds(), ) if resp.status_code == requests.codes.ok: return resp if resp.status_code == requests.codes.not_found: raise GaleItemNotFound # when api key expires, API returns: # HTTP Status 401 - Authentication Failed: Invalid or Expired API key # If we get a 401 on a request that requires an api key, try getting a new one if resp.status_code == requests.codes.unauthorized: # If we get a 401 on a request that requires an api key, # get a fresh key and then try the same request again if requires_api_key and retry < 1: self.refresh_api_key() return self._make_request( url, params=params, requires_api_key=requires_api_key, stream=stream, retry=retry + 1, ) # response is html error, not json; could try # extracting h1, but not sure it's worth parsing raise GaleUnauthorized() if resp.status_code == requests.codes.forbidden: # forbidden results return a message # NOTE that item requests for invalid ids may return 403 raise GaleItemForbidden(resp.json()["message"]) # raise anything else as a generic error with status code # getting 406 not acceptable in some cases # (attempt to access item with invalid item id) raise GaleAPIError(resp.status_code)
[docs] def get_api_key(self): """Get a new API key to use for requests in the next 30 minutes.""" # GALE API requires use of an API key, which lasts for 30 minutes # request a new one when needed using configured username response = self._make_request( "tools/generate_key", {"user": self.username}, requires_api_key=False ) return response.json()["apiKey"]
_api_key = None @property def api_key(self): """Property for current api key. Uses :meth:`get_api_key` to request a new one when needed.""" if self._api_key is None: self._api_key = self.get_api_key() return self._api_key
[docs] def refresh_api_key(self): """clear cached api key and request a new one""" self._api_key = None assert self.api_key # populate new key through property
[docs] def get_item(self, item_id): """Get the full record for a single item""" # full id looks like GALE|CW###### or GALE|CB####### # using streaming makes a *significant* difference in response time, # especially for larger results response = self._make_request("v1/item/GALE%%7C%s" % item_id, stream=True) if response: return response.json()
[docs] def get_item_pages(self, item_id, gale_record=None): """Return a generator of page content for the specified digitized work from the Gale API. Takes an optional gale_record parameter (item record as returned by Gale API), to avoid making an extra API call if data is already available.""" if gale_record is None: gale_record = self.get_item(item_id) local_ocr_text = None try: # Use higher quality local OCR text if available local_ocr_text = get_local_ocr(item_id) except FileNotFoundError: logger.warning(f"Local OCR not found for {item_id}") except json.decoder.JSONDecodeError: logger.warning(f"JSON decode error on local OCR file for {item_id}") # iterate through the pages in the response for page in gale_record["pageResponse"]["pages"]: page_number = page["pageNumber"] # Use local OCR text if we have it, with fallback to Gale # OCR. Set a tag to indicate when local OCR is present. tags = [] ocr_text = None if local_ocr_text and page_number in local_ocr_text: ocr_text = local_ocr_text.get(page_number) # if we have content for this page, set tag to indicate local ocr. # If page is present but content is the empty string # (i.e., blank page), still set tag since it was the local OCR # that determined the page was blank tags = ["local_ocr"] # If page is not present in the data, use Gale OCR as fallback else: # don't warn for every page when no OCR text is found if local_ocr_text: logger.warning(f"No local OCR for {item_id} {page_number}") # try getting the ocr from the gale api result # (may be empty, since some pages have no text) ocr_text = page.get("ocrText") info = { "page_id": page_number, "content": ocr_text, # Don't set label when there isn't one. Fallback labels are set in the # common page indexing code. "label": page.get("folioNumber"), "tags": tags, # image id needed for thumbnail url; use solr dynamic field "image_id_s": page["image"]["id"], # index image url since we will need it when Gale API changes # (expect to be present in Gale API; may not be present in unit tests) "image_url_s": page["image"].get("url"), } yield info
# MARC records needed for import and metadata are stored in a local pairtree. # currently used for Gale/ECCO content
[docs] def get_marc_storage(): """return pairtree storage for marc records""" return PairtreeStorageFactory().get_store( store_dir=settings.MARC_DATA, uri_base="info:local/" )
[docs] class MARCRecordNotFound(Exception): """record not found in local MARC record storage"""
[docs] def get_marc_record(marc_id): """get a marc record from the pairtree storage by Gale ESTC id""" start_time = time.time() try: marc_object = get_marc_storage().get_object(marc_id) with marc_object.get_bytestream("marc.dat", streamable=True) as marcfile: reader = pymarc.MARCReader(marcfile, to_unicode=True, file_encoding="utf-8") record = [rec for rec in reader][0] logger.debug( "Loaded MARC record for %s in %.5fs" % (marc_id, time.time() - start_time) ) except storage_exceptions.PartNotFoundException: raise MARCRecordNotFound(marc_id) return record