import logging
import rdflib
import requests
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import models
from django.urls import reverse
from django.utils.html import format_html, strip_tags
from parasolr.django.indexing import ModelIndexable
from mep.accounts.event_set import EventSetMixin
from mep.accounts.partial_date import (DatePrecisionField, PartialDate,
from mep.books.utils import generate_sort_title, nonstop_words, work_slug
from mep.common.models import Named, Notable, TrackChangesModel
from mep.common.validators import verify_latlon
from mep.people.models import Person
logger = logging.getLogger(__name__)
[docs]class PublisherPlace(Named, Notable):
'''Model for place where publishers are located'''
# NOTE: Using decimal field here to set precision on the head
# FloatField uses float, which can introduce unexpected rounding.
# This would let us have measurements down to the tree level, if necessary
# QUESTION: Do we want to add a Geonames ID for this?
# NOTE(review): both DecimalField(...) calls below stop right after the
# opening parenthesis in this copy; their arguments are not visible here.
# The "[docs]" prefix on the class line and the missing indentation are
# extraction artifacts, not source. Restore from the repository before
# changing this class.
latitude = models.DecimalField(
longitude = models.DecimalField(
class Publisher(Named, Notable):
    '''Model for publishers.

    Declares no fields of its own; everything comes from the
    :class:`Named` and :class:`Notable` base classes.
    '''
class Genre(Named):
    '''Genres of items from OCLC.

    Declares no fields of its own; everything comes from the
    :class:`Named` base class.
    '''
[docs]class Subject(models.Model):
'''Linked data subjects for describing :class:`Item`'''
# NOTE(review): this class is damaged in this copy (indentation lost,
# "[docs]" markers fused onto lines, several statements cut off). The notes
# below mark each visible gap; restore from the repository before editing.
#: name/label for the subject (required but not required unique)
name = models.CharField(max_length=255)
#: linked data URI for the subject
uri = models.URLField("URI", help_text="Subject URI", unique=True)
#: rdf type for the subject
rdf_type = models.URLField("RDF Type")
def __str__(self):
# NOTE(review): the first element of the format tuple is missing here
return '%s (%s)' % (, self.uri)
def __repr__(self):
# NOTE(review): the format tuple is cut off after self.uri
return '<Subject %s (%s)>' % (self.uri,
[docs] @classmethod
def create_from_uri(cls, uri):
'''Initialize a new :class:`Subject` from a URI. Loads the URI
as an :class:`rdflib.Graph` in order to pull the preferred label
and RDF type for the URI.'''
# as for OCLC code, using requests to load RDF content
# for more fine-grained control and fewer errors for batch work,
# and current SSL support (i.e. for VIAF)
graph = rdflib.Graph()
request_uri = uri
uriref = rdflib.URIRef(uri)
# worldcat FAST URIs don't support content negotiation,
# so explicitly request RDF content based on known URL format
request_headers = {}
# NOTE(review): the URL prefixes passed to startswith() below are empty
# strings in this copy; as written the first branch would always match.
# The original prefixes (presumably FAST and LoC base URLs) must be restored.
if uri.startswith(''):
request_uri = '%s.rdf.xml' % request_uri.rstrip('/')
elif uri.startswith(''):
# at least one LOC url is redirecting alternately
# to an HTML version and json-ld, so explicitly request json-ld
request_uri = '%s.jsonld' % request_uri
# NOTE(review): with indentation lost it is unclear whether the next
# assignment belongs to a branch above or to a missing else clause.
request_headers = {'accept': 'application/rdf+xml'}
# NOTE(review): no timeout is passed to requests.get here.
response = requests.get(request_uri, headers=request_headers)
# exclude html responses, since they can't be parsed
# (some LOC json requests are redirecting to html)
# Possibly useful? LoC responses include a X-PrefLabel header,
# could just use that (and make type optional)
# NOTE(review): the status code compared against is missing below.
if response.status_code == and \
not response.headers['content-type'].startswith('text/html'):
parse_opts = {}
# some results return json-ld, and rdflib does not autodetect
if response.headers['content-type'] == 'application/ld+json':
parse_opts['format'] = 'json-ld'
graph.parse(data=response.content.decode(), **parse_opts)
label_opts = {}
# viaf records include multiple languages and some records
# have language codes for them; try with language filter first
# NOTE(review): the substring tested against uri is empty in this copy.
if '' in uri:
label_opts['lang'] = 'en-US'
labels = graph.preferredLabel(uriref, **label_opts)
# if no labels were found with language tag, try without
if not labels:
labels = graph.preferredLabel(uriref)
# if still no labels, bail out
# NOTE(review): the body of this "bail out" branch is missing.
if not labels:
# preferred label returns a list of predicate, object
# use the object for the first result
name = str(labels[0][1])
rdf_type = str(graph.value(uriref, rdflib.RDF.type))
# NOTE(review): the create(...) call is cut off after name=name,
return Subject.objects.create(uri=uri, name=name,
# if the request failed or was not usable, log the error
logger.warning('Error creating Subject for %s (response %s)',
uri, response.status_code)
[docs]class WorkSignalHandlers:
'''Signal handlers for indexing :class:`Work` records when
related records are saved or deleted.'''
# NOTE(review): every handler in this class is incomplete in this copy.
# The "if raw or not" guards have no second operand or body, the
# Work.objects.filter(...) calls have no arguments, the logger.debug(...)
# calls are cut off after the message, and no reindexing call is visible.
# event_save and event_delete contain only comments. Work.index_depends_on
# also refers to format_save / format_delete handlers that do not appear in
# the visible part of this class. Restore from the repository before editing.
[docs] @staticmethod
def creatortype_save(sender, instance=None, raw=False, **_kwargs):
'''reindex all associated works when a creator type is changed'''
# raw = saved as presented; don't query the database
if raw or not
# if any members are associated
works = Work.objects.filter(
if works.exists():
logger.debug('creator type save, reindexing %d related works',
[docs] @staticmethod
def creatortype_delete(sender, instance, **_kwargs):
'''reindex all associated works when a creator type is deleted'''
work_ids = Work.objects.filter( \
.values_list('id', flat=True)
if work_ids:
logger.debug('creator type delete, reindexing %d related works',
# find the items based on the list of ids to reindex
works = Work.objects.filter(id__in=list(work_ids))
[docs] @staticmethod
def person_save(sender, instance=None, raw=False, **_kwargs):
'''reindex all works associated via creator when a person is saved'''
# raw = saved as presented; don't query the database
if raw or not
# if any members are associated
works = Work.objects.filter(
if works.exists():
logger.debug('person save, reindexing %d related works',
[docs] @staticmethod
def person_delete(sender, instance, **_kwargs):
'''reindex all works associated via creator when a person is deleted'''
work_ids = Work.objects.filter( \
.values_list('id', flat=True)
if work_ids:
logger.debug('person delete, reindexing %d related works',
# find the items based on the list of ids to reindex
works = Work.objects.filter(id__in=list(work_ids))
[docs] @staticmethod
def creator_change(sender, instance=None, raw=False, **_kwargs):
'''reindex associated work when a creator record is changed'''
# raw = saved as presented; don't query the database
if raw or not
logger.debug('creator change, reindexing %s',
# delete the association so cards will index without the account
[docs] @staticmethod
def event_save(sender=None, instance=None, raw=False, **kwargs):
'''when an event is saved, reindex associated work if there is one'''
# raw = saved as presented; don't query the database
if raw or not
# if any books are associated
[docs] @staticmethod
def event_delete(sender, instance, **kwargs):
'''when an event is deleted, reindex people associated
with the corresponding account.'''
# get a list of ids for deleted event
class WorkQuerySet(models.QuerySet):
    '''Custom :class:`models.QuerySet` for :class:`Work`'''

    def count_events(self):
        '''Annotate the queryset with counts for events, borrows,
        and purchases.

        No aliases are passed to ``annotate``; each count is declared
        with ``distinct=True``.

        :returns: the annotated queryset
        '''
        return self.annotate(
            models.Count('event', distinct=True),
            models.Count('event__borrow', distinct=True),
            models.Count('event__purchase', distinct=True),
        )
class Work(TrackChangesModel, Notable, ModelIndexable, EventSetMixin):
    '''Work record for an item that circulated in the library or was
    otherwise referenced in library activities.'''

    #: Message that will be read to users of assistive technology in place of
    #: the uncertainty icon.
    UNCERTAINTY_MESSAGE = "Work data is uncertain or incomplete."

    #: mep id from stub records imported from xml
    # NOTE: mep_id has null=True so we can enforce the unique constraint but
    # still allow items with no mep id
    mep_id = models.CharField(
        max_length=255, blank=True, unique=True,
        verbose_name='MEP ID', null=True)
    #: title of the work
    title = models.CharField(
        max_length=255, blank=True,
        help_text='Title of the work in English')
    #: sort form of the title
    sort_title = models.CharField(
        max_length=255, blank=True,
        help_text='Sort title autogenerated from title on record save.')
    #: year of publication
    year = models.PositiveSmallIntegerField(
        blank=True, null=True, verbose_name='Date of Publication')
    #: linked data URI for the work
    uri = models.URLField(
        blank=True, verbose_name='Work URI',
        help_text="Linked data URI for this work")
    #: linked data URI for the edition
    edition_uri = models.URLField(
        blank=True, verbose_name='Edition URI',
        help_text="Linked data URI for this edition, if known")
    #: link to a page offering ebook versions
    # The help text is built with implicit string concatenation rather than
    # a backslash continuation inside the literal, so that indenting the
    # second line cannot leak whitespace into the text.
    ebook_url = models.URLField(
        blank=True, verbose_name='eBook URL',
        help_text='Link to a webpage where one or more ebook versions can be '
                  'downloaded, e.g. Project Gutenberg page for this item')
    #: format of the work; set to null if the format record is deleted
    work_format = models.ForeignKey(
        Format, verbose_name='Format', null=True, blank=True,
        help_text='Format, e.g. book or periodical', on_delete=models.SET_NULL)

    #: update timestamp
    updated_at = models.DateTimeField(auto_now=True, null=True)

    # direct access to all creator persons, using Creator as through model
    creators = models.ManyToManyField(Person, through='Creator')
    #: optional genres, from OCLC record
    genres = models.ManyToManyField(
        Genre, blank=True, help_text='Genre(s) from OCLC record')
    #: optional subjects, from OCLC record
    subjects = models.ManyToManyField(Subject, blank=True)

    #: a field for notes publicly displayed on the website
    public_notes = models.TextField(
        blank=True, help_text='Notes for display on the public website. ' +
        ' Use markdown for formatting.')

    #: slug for use in urls
    # NOTE: null=True is required to avoid a validation error
    # when submitting the admin edit form with no slug
    # NOTE(review): the help text below has no space between
    # "autogenerated." and "Editing"; left unchanged because help_text is
    # part of the field definition -- confirm migration impact before fixing.
    slug = models.SlugField(
        max_length=255, unique=True, blank=True, null=True,
        help_text='Short, durable, unique identifier for use in URLs. ' +
        'Save and continue editing to have a new slug autogenerated.' +
        'Editing will change the public, citable URL for books.')

    # override default manager with customized version
    objects = WorkQuerySet.as_manager()

    class Meta:
        ordering = ['sort_title']
[docs] def save(self, *args, **kwargs):
# NOTE(review): this method is damaged in this copy (indentation lost,
# statements missing); the notes below mark each visible gap.
# override save to ensure mep ID is None rather than empty string
# if not set
if not self.mep_id:
self.mep_id = None
# if slug is empty generate
# NOTE(review): the body of the next "if" is missing; only a comment
# follows it. Presumably a slug is generated here -- confirm.
if not self.slug:
# recalculate sort title in case title has changed
self.sort_title = generate_sort_title(self.title)
# if slug has changed, save the old one as a past slug
# (skip if record is not yet saved)
# NOTE(review): the first operand of "and" and the body of this "if"
# (the code that stores the past slug) are missing.
if and self.has_changed('slug'):
super(Work, self).save(*args, **kwargs)
[docs] def validate_unique(self, exclude=None):
# customize uniqueness validation to ensure new slugs don't
# conflict with past slugs
# NOTE(review): the condition below ends in a line continuation whose
# second line is missing in this copy, and no call to the parent
# validate_unique is visible -- restore both from the repository.
if PastWorkSlug.objects.filter(slug=self.slug) \
raise ValidationError('Slug is not unique ' +
'(conflicts with previously used slugs)')
def __repr__(self):
# provide pk for easy lookup and string for recognition
# NOTE(review): the operand before "or '??'" is missing in this copy
# (presumably the primary key, per the comment above -- confirm).
return '<Work pk:%s %s>' % ( or '??', str(self))
def __str__(self):
    '''Return the title followed by the year in parentheses.

    Either part is omitted when unset; a fixed placeholder is returned
    when both are missing.
    '''
    year_str = '(%s)' % self.year if self.year else ''
    # strip() removes the stray space left when title or year is absent
    str_value = ('%s %s' % (self.title, year_str)).strip()
    return str_value or '(No title, year)'
[docs] def creator_by_type(self, creator_type):
'''return work creators of a single type, e.g. author'''
# NOTE(review): the left-hand side of the "== creator_type" filter is
# missing in this copy; the attribute compared against is not visible.
return [creator.person for creator in self.creator_set.all()
if == creator_type]
def get_absolute_url(self):
    '''Return the public url to view this work's detail page.

    The url is resolved from the ``books:book-detail`` route using the
    work's slug as the only positional argument.
    '''
    return reverse('books:book-detail', args=[self.slug])
def is_uncertain(self):
    '''Returns True if the Work's notes indicate that it should show an
    uncertainty icon via the UNCERTAINTYICON tag.

    NOTE(review): ``index_data`` reads this as an attribute
    (``self.is_uncertain``), which suggests a ``@property`` decorator was
    lost from this copy -- confirm against the repository.
    '''
    return "UNCERTAINTYICON" in self.notes
def creator_names(self):
'''list of all creator names, including authors'''
# NOTE(review): the element expression of the comprehension is missing
# in this copy (presumably the creator's name -- confirm).
return [ for creator in self.creators.all()]
def authors(self):
    '''work creators with type author

    Delegates to ``creator_by_type`` with the type ``'Author'``.

    NOTE(review): ``author_list`` and ``sort_author_list`` iterate
    ``self.authors`` without calling it, which suggests a ``@property``
    decorator was lost from this copy -- confirm against the repository.
    '''
    return self.creator_by_type('Author')
[docs] def author_list(self):
'''semicolon separated list of author names'''
# NOTE(review): the element expression of the comprehension is missing
# in this copy (presumably the author's name -- confirm).
return '; '.join([ for auth in self.authors])
# label attached to the method as a function attribute
author_list.verbose_name = 'Authors'
def sort_author_list(self):
    '''semicolon separated list of author sort names

    Reads ``self.authors`` as an iterable of objects with a
    ``sort_name`` attribute; returns an empty string when there are none.
    '''
    return '; '.join(auth.sort_name for auth in self.authors)
def editors(self):
    '''work creators with type editor

    Delegates to ``creator_by_type`` with the type ``'Editor'``.
    '''
    return self.creator_by_type('Editor')
def translators(self):
    '''work creators with type translator

    Delegates to ``creator_by_type`` with the type ``'Translator'``.
    '''
    return self.creator_by_type('Translator')
def event_count(self):
'''Number of events of any kind associated with this work.'''
# use database annotation if present; otherwise use queryset
# NOTE(review): the fallback argument of getattr is cut off in this copy.
return getattr(self, 'event__count',
def borrow_count(self):
'''Number of times this work was borrowed.'''
# use database annotation if present; otherwise use queryset
# NOTE(review): the fallback argument of getattr is cut off in this copy.
return getattr(self, 'event__borrow__count',
def purchase_count(self):
'''Number of times this work was purchased.'''
# use database annotation if present; otherwise use queryset
# NOTE(review): the fallback argument of getattr is cut off in this copy.
return getattr(self, 'event__purchase__count',
[docs] def admin_url(self):
'''URL to edit this record in the admin site'''
# NOTE(review): the args list is empty in this copy; a change-view route
# normally needs the record identifier, so an element was presumably lost
# here -- confirm against the repository.
return reverse('admin:books_work_change', args=[])
# label attached to the method as a function attribute
admin_url.verbose_name = 'Admin Link'
def has_uri(self):
    '''Is the URI set for this work?

    Returns True for any value of ``uri`` other than the empty string.
    '''
    return self.uri != ''
# function attributes used when this method is shown as a column:
# render as a boolean and order by the uri field
has_uri.boolean = True
has_uri.admin_order_field = 'uri'
[docs] def subject_list(self):
'''semicolon separated list of subject names'''
# NOTE(review): the element expression of the comprehension is missing
# in this copy (presumably the subject's name -- confirm).
return '; '.join([ for subj in self.subjects.all()])
[docs] def genre_list(self):
'''semicolon separated list of genres'''
# NOTE(review): the element expression of the comprehension is missing
# in this copy (presumably the genre's name -- confirm).
return '; '.join([ for genre in self.genres.all()])
# Maps related models / relations to the signal handlers to run for them.
# NOTE(review): the closing braces of the inner dictionaries and of the outer
# dictionary are missing in this copy. Presumably each key maps to its own
# two-entry {signal name: handler} dictionary -- confirm before editing.
# format_save / format_delete are referenced here but are not defined in the
# visible part of WorkSignalHandlers.
index_depends_on = {
'creators': {
'post_save': WorkSignalHandlers.person_save,
'pre_delete': WorkSignalHandlers.person_delete,
'books.Creator': {
'post_save': WorkSignalHandlers.creator_change,
'post_delete': WorkSignalHandlers.creator_change,
'books.CreatorType': {
'post_save': WorkSignalHandlers.creatortype_save,
'pre_delete': WorkSignalHandlers.creatortype_delete,
'books.Format': {
'post_save': WorkSignalHandlers.format_save,
'pre_delete': WorkSignalHandlers.format_delete,
'accounts.Event': {
'post_save': WorkSignalHandlers.event_save,
'pre_delete': WorkSignalHandlers.event_delete,
@classmethod
def items_to_index(cls):
    '''Modify the queryset used for indexing in bulk; prefetch
    creators, annotate event counts.

    :returns: ``cls.objects`` with ``creator_set`` prefetched and the
        ``count_events`` annotations applied
    '''
    return cls.objects.prefetch_related('creator_set').count_events()
[docs] def index_data(self):
'''data for indexing in Solr'''
# NOTE(review): this method is damaged in this copy. The key/value lines
# below have lost the statement that wrapped them (presumably an update of
# index_data with a dictionary -- confirm), the element expression in the
# 'authors_t' comprehension is missing, and indentation is lost, so it is
# not visible which lines are conditional on earliest_date.
# self.format() is called here but no format method is visible in this file.
# Several values (self.authors, self.sort_author_list, self.creator_names,
# self.is_uncertain, self.event_count) are read as attributes rather than
# called.
index_data = super().index_data()
'title_t': self.title,
'sort_title_isort': self.sort_title,
'slug_s': self.slug,
'authors_t': [ for a in self.authors] if self.authors else None,
'sort_authors_t': [str(a) for a in self.authors] if self.authors else None,
'sort_authors_isort': self.sort_author_list,
'creators_t': self.creator_names,
'pub_date_i': self.year,
'format_s_lower': self.format(),
'notes_txt_en': self.public_notes,
'is_uncertain_b': self.is_uncertain,
'event_count_i': self.event_count,
'admin_notes_txt_en': self.notes,
'edition_titles': [ed.title for ed in self.edition_set.all()],
earliest_date = self.earliest_date()
if earliest_date:
# NOTE: doesn't matter if partial, will still sort as expected
index_data['first_event_date_i'] = earliest_date.strftime('%Y%m%d')
# if there is at least one date, also include circulation years
index_data['event_years_is'] = self.event_years
return index_data
def first_known_interaction(self):
'''date of the earliest known interaction for this item'''
# NOTE(review): the query chain below ends in a line continuation whose
# final line is missing in this copy. The step that orders the events and
# picks a single one is presumably what was lost -- confirm.
# search for the earliest start date, excluding any borrow
# or purchase events with unknown years
first_event = self.event_set \
.filter(start_date__isnull=False) \
.exclude(borrow__start_date_precision__knownyear=False) \
.exclude(purchase__start_date_precision__knownyear=False) \
if first_event:
return first_event.start_date
[docs] def populate_from_worldcat(self, worldcat_entity):
'''Set work URI, edition URI, genre, item type, and subjects
based on a WorldCat record.'''
# NOTE(review): this method is damaged in this copy. The body of the genre
# loop is missing, the "try:" matching the "except" below is missing, the
# Format.objects.get(...) and logger.error(...) calls are cut off, and the
# code that appends a new subject and assigns the subjects is not visible
# (only its comments remain).
# work URI apparently not available in all cases; set to
# empty string instead of None/null
self.uri = worldcat_entity.work_uri or ''
self.edition_uri = worldcat_entity.item_uri
# add associations for genres, creating if necessary
for genre in worldcat_entity.genres:
# types will be prepopulated to work with OCLC search results
# (predominantly books and periodicals), but in future
# we may need a method to create format from uri as for subjects
# NOTE(review): the print() call below looks like leftover debugging
# output; the rest of this module reports through logger.
if worldcat_entity.item_type:
print('entity item type %s' % worldcat_entity.item_type)
self.work_format = Format.objects.get(
except ObjectDoesNotExist:
logger.error('Unexpected item type %s',
subject_uris = worldcat_entity.subjects
if subject_uris:
# find existing subjects already in the database
subjects = list(Subject.objects.filter(uri__in=subject_uris))
# create any new subjects that don't already exist
new_subject_uris = set(subject_uris) - \
set(subj.uri for subj in subjects)
for subject_uri in new_subject_uris:
# try to create the subject
subject = Subject.create_from_uri(subject_uri)
# if successful, add to the list
if subject:
# set subjects on this item (replacing any previously set)
[docs] def generate_slug(self):
'''Generate a slug for this work based on author and title
and ensure it is unique.'''
# NOTE(review): the argument of .exclude(...) in the query below is missing
# in this copy (presumably it excludes this record itself -- confirm), and
# indentation is lost, so the nesting of the conditionals is not visible.
# Strategy visible here: start from work_slug(self); on collision retry
# with 4 and then 5 title words when the title has enough non-stop words;
# finally append "-N", where N is one more than the largest numeric suffix
# already in use (or 2 when there is none).
self.slug = work_slug(self)
# check for duplicates
dupe_slugs = Work.objects.filter(slug__startswith=self.slug) \
.exclude( \
.order_by('slug') \
.values_list('slug', flat=True)
if dupe_slugs.count() and self.slug in dupe_slugs:
nonstop_title_words = nonstop_words(self.title)
# if title has more than three words, use the 4th for uniqueness
if len(nonstop_title_words) > 3:
self.slug = work_slug(self, max_words=4)
# if 4-word title slug is still not unique, try 5
if self.slug in dupe_slugs and len(nonstop_title_words) > 4:
self.slug = work_slug(self, max_words=5)
# if still not unique, add a number
if self.slug in dupe_slugs:
prefix = '%s-' % self.slug
# get all the endings attached to this slug (i.e. unclear-##)
suffixes = [slug[len(prefix):] for slug in dupe_slugs
if slug.startswith(prefix)]
# get the largest numeric suffix
values = [int(num) for num in suffixes if num.isnumeric()]
slug_count = max(values) if values else 1
# use the next number for the current slug
self.slug = '%s-%s' % (self.slug, slug_count + 1)
[docs]class PastWorkSlug(models.Model):
'''A slug that was previously associated with a :class:`Work`;
preserved so that former slugs will resolve to the correct work.'''
# NOTE(review): the ForeignKey call below is cut off after related_name in
# this copy; its remaining arguments are not visible.
#: work record this slug belonged to
work = models.ForeignKey(Work, related_name='past_slugs',
#: slug
# NOTE(review): max_length here is 100 while Work.slug allows 255; confirm
# that a long work slug can still be stored as a past slug.
slug = models.SlugField(
max_length=100, unique=True,
help_text='Short, durable, unique identifier for use in URLs. ' +
'Editing will change the public, citable URL for library books.')
[docs]class Edition(Notable):
'''A specific known edition of a :class:`Work` that circulated.'''
# NOTE(review): this class is damaged in this copy (indentation lost,
# "[docs]" markers fused onto lines, several statements cut off). The notes
# below mark each visible gap; restore from the repository before editing.
# NOTE(review): the ForeignKey call below is cut off after help_text.
work = models.ForeignKey(
Work, help_text='Generic Work associated with this edition.',
title = models.CharField(
max_length=255, blank=True,
help_text='Title of this edition, if different from associated work')
volume = models.PositiveSmallIntegerField(blank=True, null=True)
number = models.CharField(max_length=255, blank=True, null=True)
date = models.DateField(blank=True, null=True,
help_text='Date of Publication for this edition')
date_precision = DatePrecisionField(blank=True, null=True)
# NOTE(review): PartialDateMixin is used below, but the import line that
# would provide it is truncated in this copy.
partial_date = PartialDate('date', 'date_precision',
PartialDateMixin.UNKNOWN_YEAR, label='publication date')
season = models.CharField(max_length=255, blank=True,
help_text='Spell out month or season if part of numbering')
edition = models.CharField(max_length=255, blank=True)
uri = models.URLField(
blank=True, verbose_name='URI',
help_text="Linked data URI for this edition, if known")
#: update timestamp
updated_at = models.DateTimeField(auto_now=True, null=True)
# direct access to all creator persons, using Creator as through model
creators = models.ManyToManyField(Person, through='EditionCreator')
publisher = models.ManyToManyField(Publisher, blank=True)
pub_places = models.ManyToManyField(
PublisherPlace, blank=True, verbose_name="Places of Publication")
# language model foreign key may be added in future
class Meta:
ordering = ['date', 'volume']
def __repr__(self):
# provide pk for easy lookup and string for recognition
# NOTE(review): the operand before "or '??'" is missing in this copy.
return '<Edition pk:%s %s>' % ( or '??', self)
def __str__(self):
# simple string representation
# NOTE(review): operands between the "or" keywords are missing in the
# list below, the list is never closed, and the body of
# "if self.season:" is missing.
parts = [
self.title or or '??',
'(%s)' % (self.partial_date or or '??', ),
if self.volume:
parts.append('vol. %s' % self.volume)
if self.number:
parts.append('no. %s' % self.number)
if self.season:
# include edition?
return ' '.join(parts)
[docs] def display_html(self):
'''Render volume/issue citation with formatting, suitable
for inclusion on a webpage.'''
# NOTE(review): the season/year condition and expression below are cut
# off in this copy, and no visible line adds season_year to parts.
parts = []
if self.volume:
parts.append('Vol. %s' % self.volume)
if self.number:
parts.append('no. %s' % self.number)
if self.season or
season_year = '%s %s' % (
self.season, if else '')
citation = ', '.join(parts)
if self.title:
return format_html('{} <br/><em>{}</em>',
citation, self.title)
return citation
[docs] def display_text(self):
'''text-only version of volume/issue citation'''
# strips the markup produced by display_html
return strip_tags(self.display_html())
class CreatorType(Named, Notable):
    '''Type of creator role a person can have in relation to a work;
    author, editor, translator, etc.'''

    #: position of this type when creator types are listed; also the
    #: default ordering for the model
    order = models.PositiveSmallIntegerField(
        help_text='order in which creator types will be listed')

    class Meta:
        ordering = ['order']
[docs]class Creator(Notable):
# Links a Person to a Work with a CreatorType; it is named as the
# "through" model of Work.creators. All three foreign keys cascade on delete.
creator_type = models.ForeignKey(CreatorType, on_delete=models.CASCADE)
person = models.ForeignKey(Person, on_delete=models.CASCADE)
work = models.ForeignKey(Work, on_delete=models.CASCADE)
order = models.PositiveSmallIntegerField(
blank=True, null=True,
help_text='Order for multiple creators of the same type (optional)')
class Meta:
# sort by creator type order, then explicit order, then person sort name
ordering = ['creator_type__order', 'order', 'person__sort_name']
def __str__(self):
# NOTE(review): the format tuple is cut off after self.creator_type in
# this copy; the third value is not visible.
return '%s %s %s' % (self.person, self.creator_type,
class EditionCreator(Notable):
    '''Creator specific to an :class:`Edition` of a :class:`Work`.'''

    # all three relations cascade on delete
    creator_type = models.ForeignKey(CreatorType, on_delete=models.CASCADE)
    person = models.ForeignKey(Person, on_delete=models.CASCADE)
    edition = models.ForeignKey(Edition, on_delete=models.CASCADE)

    def __str__(self):
        '''String representation: person, creator type, edition.'''
        return '%s %s %s' % (self.person, self.creator_type, self.edition)