Source code for nti.contentfragments.censor

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
algorithms for content censoring.

The algorithms contained in here are trivially simple.
We could do much better, for example, with prefix trees.
See https://hkn.eecs.berkeley.edu/~dyoo/python/ahocorasick/
and http://pypi.python.org/pypi/trie/0.1.1

If efficiency really matters, and we have many different filters to apply,
we would need to do a better job of pipelining to avoid copies.

.. $Id: censor.py 85352 2016-03-26 19:08:54Z carlos.sanchez $
"""

from __future__ import print_function, absolute_import, division
__docformat__ = "restructuredtext en"

# pylint:disable=useless-object-inheritance

logger = __import__('logging').getLogger(__name__)

import re
import array
import codecs
import io

from zope import component
from zope import interface

from zope.event import notify

import html5lib

from lxml import etree

from html5lib import treebuilders

from zope.cachedescriptors.property import Lazy

from .interfaces import CensoredContentEvent
from .interfaces import IHTMLContentFragment
from .interfaces import ICensoredContentPolicy
from .interfaces import UnicodeContentFragment
from .interfaces import ICensoredContentScanner
from .interfaces import ICensoredContentStrategy
from .interfaces import CensoredHTMLContentFragment
from .interfaces import CensoredUnicodeContentFragment
from .interfaces import IPunctuationMarkExpressionPlus
from .interfaces import ICensoredUnicodeContentFragment

etree_tostring = getattr(etree, 'tostring')
resource_string = __import__('pkg_resources').resource_string

PY2 = bytes is str
_ARRAY_CHAR_TYPE = 'u'
text_type = str if not PY2 else unicode # pylint:disable=undefined-variable

def punkt_re_char(lang='en'):
    return component.getUtility(IPunctuationMarkExpressionPlus, name=lang)


def _get_censored_fragment(org_fragment, new_fragment, factory=CensoredUnicodeContentFragment):
    try:
        result = org_fragment.censored(new_fragment)
    except AttributeError:
        result = factory(new_fragment)
    # We used to check this and then do alsoProvides if it wasn't there,
    # but this is only called with two factory arguments, both of which
    # provide the interface.
    assert ICensoredUnicodeContentFragment.providedBy(result)
    return result

@interface.implementer(ICensoredContentStrategy)
class SimpleReplacementCensoredContentStrategy(object):

    def __init__(self, replacement_char=u'*'):
        assert len(replacement_char) == 1
        self._replacement_array = array.array(_ARRAY_CHAR_TYPE, replacement_char)

    def censor_ranges(self, content_fragment, censored_ranges):
        # Since we will be replacing each range with its equal length
        # of content and not shortening, then sorting the ranges doesn't matter
        content_fragment = content_fragment.decode('utf-8') \
            if isinstance(content_fragment, bytes) else content_fragment
        buf = array.array(_ARRAY_CHAR_TYPE, content_fragment)

        for start, end in censored_ranges:
            buf[start:end] = self._replacement_array * (end - start)

        new_fragment = buf.tounicode()
        result = _get_censored_fragment(content_fragment, new_fragment)
        return result
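
# Illustrative sketch (not part of the original module): the strategy only needs
# a fragment and an iterable of (start, end) pairs, so it can be exercised
# directly, without any scanner or component registrations.
def _example_replacement_strategy():  # pragma: no cover (illustrative only)
    strategy = SimpleReplacementCensoredContentStrategy(replacement_char=u'*')
    # Replace characters 2 through 8 ('badword') with an equal number of asterisks.
    result = strategy.censor_ranges(u'A badword here', [(2, 9)])
    assert result == u'A ******* here'
    assert ICensoredUnicodeContentFragment.providedBy(result)
    return result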

class BasicScanner(object):

    def test_range(self, new_range, yielded):
        for t in yielded:
            if new_range[0] >= t[0] and new_range[1] <= t[1]:
                # new_range is entirely included in something we already yielded
                return False
        return True

    def do_scan(self, fragment, ranges):
        """
        do_scan is passed a fragment that is guaranteed to be unicode
        and lower case.
        """
        raise NotImplementedError()

    def scan(self, content_fragment):
        yielded = []  # A simple, inefficient way of making sure we don't send overlapping ranges
        content_fragment = content_fragment.decode('utf-8') \
            if isinstance(content_fragment, bytes) else content_fragment
        content_fragment = content_fragment.lower()
        result = self.do_scan(content_fragment, yielded)
        return result

@interface.implementer(ICensoredContentScanner)
class TrivialMatchScanner(BasicScanner):

    def __init__(self, prohibited_values=()):
        # normalize case, ignore blanks
        # In this implementation, the most common values should
        # clearly go at the front of the list
        self.prohibited_values = [x.lower() for x in prohibited_values if x]

    def do_scan(self, content_fragment, yielded):
        for the_word in self.prohibited_values:
            # Find all occurrences of each prohibited word,
            # one at a time
            idx = content_fragment.find(the_word, 0)
            while idx != -1:
                match_range = (idx, idx + len(the_word))
                if self.test_range(match_range, yielded):
                    yield match_range
                idx = content_fragment.find(the_word, match_range[1])
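
# Illustrative sketch (not part of the original module): a minimal scan-and-censor
# round trip with the trivial scanner, supplying the prohibited word directly
# instead of loading the bundled word lists.
def _example_trivial_scan():  # pragma: no cover (illustrative only)
    scanner = TrivialMatchScanner(prohibited_values=(u'badword',))
    strategy = SimpleReplacementCensoredContentStrategy()
    fragment = u'This has a BadWord in it'
    # scan() lower-cases the fragment before matching, so matching is
    # case-insensitive while the replacement preserves the surrounding text.
    result = strategy.censor_ranges(fragment, scanner.scan(fragment))
    assert result == u'This has a ******* in it'
    return result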

@interface.implementer(ICensoredContentScanner)
class WordMatchScanner(BasicScanner):

    def __init__(self, white_words=(), prohibited_words=()):
        self.white_words = tuple([word.lower() for word in white_words])
        self.prohibited_words = tuple([word.lower() for word in prohibited_words])

    @Lazy
    def char_tester(self):
        return re.compile(punkt_re_char())

    def _test_start(self, idx, content_fragment):
        result = idx == 0 or self.char_tester.match(content_fragment[idx - 1])
        return result

    def _test_end(self, idx, content_fragment):
        result = idx == len(content_fragment) or self.char_tester.match(content_fragment[idx])
        return result

    def _find_ranges(self, word_list, content_fragment):
        for the_word in word_list:
            # Find all occurrences of each word one by one
            idx = content_fragment.find(the_word, 0)
            while idx != -1:
                endidx = idx + len(the_word)
                match_range = (idx, endidx)
                if self._test_start(idx, content_fragment) and \
                   self._test_end(endidx, content_fragment):
                    yield match_range
                idx = content_fragment.find(the_word, endidx)

    def do_scan(self, content_fragment, yielded):
        # Here, 'yielded' is the ranges we examine and either guarantee
        # they are good, or guarantee they are bad
        ranges = self._find_ranges(self.white_words, content_fragment)
        yielded.extend(ranges)

        # yield/return any prohibited_words
        ranges = self._find_ranges(self.prohibited_words, content_fragment)
        for match_range in ranges:
            if self.test_range(match_range, yielded):
                yield match_range
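
# Illustrative sketch (not part of the original module): unlike the trivial
# scanner, this scanner only reports whole words, using the registered
# IPunctuationMarkExpressionPlus utility as its word-boundary test. The example
# therefore assumes this package's ZCML configuration has been loaded (e.g. via
# zope.configuration) so that punkt_re_char() can find that utility.
def _example_word_scan():  # pragma: no cover (illustrative only)
    scanner = WordMatchScanner(prohibited_words=(u'ass',))
    # The occurrence embedded in 'class' fails the boundary tests and is not
    # reported; only the stand-alone word at the end is.
    assert list(scanner.scan(u'the class discussed ass')) == [(20, 23)]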

@interface.implementer(ICensoredContentScanner)
class PipeLineMatchScanner(BasicScanner):

    def __init__(self, scanners=()):
        self.scanners = tuple(scanners)

    def do_scan(self, content_fragment, yielded):
        for s in self.scanners:
            matched_ranges = s.do_scan(content_fragment, yielded)
            for match_range in matched_ranges:
                if self.test_range(match_range, yielded):
                    yield match_range
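
# Illustrative sketch (not part of the original module): the pipeline passes one
# shared 'yielded' list to every scanner, so ranges recorded by an earlier scanner
# (for example, white-listed words found by a WordMatchScanner) suppress
# overlapping matches from later scanners. As above, this assumes the
# IPunctuationMarkExpressionPlus utility is registered.
def _example_pipeline_scan():  # pragma: no cover (illustrative only)
    pipeline = PipeLineMatchScanner([
        WordMatchScanner(white_words=(u'class',)),
        TrivialMatchScanner(prohibited_values=(u'ass',)),
    ])
    # The trivial scanner finds 'ass' inside 'class', but that range is entirely
    # contained in the white-word range already recorded, so it is dropped.
    assert list(pipeline.scan(u'my class')) == []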

def _read(fname, rot13):
    data = resource_string(__name__, fname)
    data_text = data.decode('utf-8')
    # Go through StringIO for universal newline handling
    src = io.StringIO(data_text)
    if rot13:
        words = {codecs.encode(x, 'rot13').strip().lower() for x in src.readlines()}
    else:
        words = {x.strip().lower() for x in src.readlines()}
    return frozenset(words)

_white_words = _read('white_list.txt', False)
_prohibited_words = _read('prohibited_words.txt', True)
_profane_words = _read('profanity_list.txt', True)


@interface.implementer(ICensoredContentScanner)
def _word_profanity_scanner():
    """
    External files are stored in rot13.
    """
    return WordMatchScanner(_white_words, _prohibited_words)


@interface.implementer(ICensoredContentScanner)
def _word_plus_trivial_profanity_scanner():
    return PipeLineMatchScanner([_word_profanity_scanner(),
                                 TrivialMatchScanner(_profane_words)])
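
# Illustrative sketch (not part of the original module): the bundled prohibited
# and profanity lists ship rot13-encoded, one word per line, and _read() decodes
# them at import time. rot13 is its own inverse, so the same transform both
# prepares a plain word list for storage and recovers it again.
def _example_rot13_roundtrip():  # pragma: no cover (illustrative only)
    encoded = codecs.encode(u'word', 'rot13')   # u'jbeq'
    assert codecs.encode(encoded, 'rot13') == u'word'
    return encoded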

@interface.implementer(ICensoredContentPolicy)
class DefaultCensoredContentPolicy(object):
    """
    A content censoring policy that looks up the default scanner and
    strategy utilities and uses them.

    This package does not register this policy as an adapter for anything;
    you must do that yourself, on (content-fragment, target-object). It can
    also be registered as a utility or instantiated directly with no
    arguments.
    """

    def __init__(self, fragment=None, target=None):
        pass

    def censor(self, fragment, target):
        if IHTMLContentFragment.providedBy(fragment):
            result = self.censor_html(fragment, target)
        else:
            result = self.censor_text(fragment, target)
        return result

    def censor_text(self, fragment, target):
        scanner = component.getUtility(ICensoredContentScanner)
        strat = component.getUtility(ICensoredContentStrategy)
        return strat.censor_ranges(fragment, scanner.scan(fragment))

    def censor_html(self, fragment, target):
        result = None
        try:
            p = html5lib.HTMLParser(tree=treebuilders.getTreeBuilder("lxml"),
                                    namespaceHTMLElements=False)
            doc = p.parse(fragment)
            for node in doc.iter():
                for name in ('text', 'tail'):
                    text = getattr(node, name, None)
                    if text:
                        text = self.censor_text(UnicodeContentFragment(text), target)
                        setattr(node, name, text)
            docstr = etree_tostring(doc, encoding=text_type)
            # be sure to return the best interface
            result = _get_censored_fragment(fragment, docstr, CensoredHTMLContentFragment)
        except Exception:
            result = self.censor_text(fragment, target)
        return result
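
# Illustrative sketch (not part of the original module): the policy is typically
# registered by the application as a named multi-adapter on (content-fragment
# interface, target interface), where the name is the schema field being
# assigned. IBlogPost below is a hypothetical target interface; real applications
# would usually do this in ZCML rather than in code.
def _example_register_default_policy():  # pragma: no cover (illustrative only)
    from nti.contentfragments.interfaces import IUnicodeContentFragment

    class IBlogPost(interface.Interface):
        """A hypothetical content type whose 'body' field should be censored."""

    component.provideAdapter(DefaultCensoredContentPolicy,
                             adapts=(IUnicodeContentFragment, IBlogPost),
                             provides=ICensoredContentPolicy,
                             name=u'body')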

@interface.implementer(ICensoredContentPolicy)
class NoOpCensoredContentPolicy(object):
    """
    A content censoring policy that does no censoring whatsoever.

    This package does not register this policy as an adapter for anything;
    you must do that yourself, on (content-fragment, target-object). It can
    also be registered as a utility or instantiated directly with no
    arguments.
    """

    def __init__(self, *args, **kwargs):
        pass

    def censor(self, fragment, _target):
        return fragment

from nti.schema.interfaces import BeforeTextAssignedEvent

def censor_before_text_assigned(fragment, target, event):
    """
    Watches for field values to be assigned, and looks for specific policies
    for the given object and field name to handle censoring. If such a policy
    is found and returns something that is not the original fragment, the
    event is updated (and so the value assigned to the target is also updated).
    """
    if ICensoredUnicodeContentFragment.providedBy(fragment):
        # Nothing to do, already censored
        return None, None

    # Does somebody want to censor assigning values of fragments' type to
    # objects of target's type to the field named event.name?
    policy = component.queryMultiAdapter((fragment, target),
                                         ICensoredContentPolicy,
                                         name=event.name)
    if policy is not None:
        censored_fragment = policy.censor(fragment, target)
        if censored_fragment is not fragment and censored_fragment != fragment:
            event.object = censored_fragment

            # notify censoring
            context = event.context or target
            notify(CensoredContentEvent(fragment, censored_fragment,
                                        event.name, context))

            # as an optimization when we are called directly
            return event.object, True
    return fragment, False
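
# Illustrative sketch (not part of the original module): this hook is meant to run
# as a subscriber for nti.schema's before-assignment events, registered for
# (fragment, target, event). If your application's configuration does not already
# subscribe it, a programmatic registration could look like the following
# (IBeforeTextAssignedEvent is assumed to be the matching event interface in
# nti.schema.interfaces; applications normally do this in ZCML, usually with a
# more specific target interface than Interface).
def _example_subscribe_censoring_hook():  # pragma: no cover (illustrative only)
    from zope.interface import Interface
    from nti.schema.interfaces import IBeforeTextAssignedEvent
    component.provideHandler(censor_before_text_assigned,
                             adapts=(Interface, Interface, IBeforeTextAssignedEvent))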

def censor_before_assign_components_of_sequence(sequence, target, event):
    """
    Register this adapter for (usually any) sequence, some specific interface
    target, and the :class:`nti.schema.interfaces.IBeforeSequenceAssignedEvent`,
    and it will iterate across the items of the sequence and attempt to censor
    each of them.

    This package DOES NOT register this subscriber.
    """
    if sequence is None:
        return

    # There are many optimization opportunities here
    s2 = []
    _changed = False
    evt = BeforeTextAssignedEvent(None, event.name, event.context)
    for obj in sequence:
        evt.object = obj
        val, changed = censor_before_text_assigned(obj, target, evt)
        _changed |= changed
        s2.append(val)

    # only copy the list/tuple/whatever if we need to
    if _changed:
        event.object = type(event.object)(s2)

def censor_assign(fragment, target, field_name):
    """
    Perform manual censoring of assigning an object to a field.
    """
    evt = BeforeTextAssignedEvent(fragment, field_name, target)
    return censor_before_text_assigned(fragment, target, evt)[0]
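
# Illustrative sketch (not part of the original module): censor_assign can be
# called directly when no field event is in flight, for instance on freshly
# received user text. The target object and field name here are hypothetical;
# whether anything is actually censored depends on which ICensoredContentPolicy
# adapters the application has registered for (fragment, target) under that name.
def _example_manual_censor(note, text):  # pragma: no cover (illustrative only)
    fragment = UnicodeContentFragment(text)
    return censor_assign(fragment, note, u'body')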