123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478 |
- # Natural Language Toolkit: TextTiling
- #
- # Copyright (C) 2001-2019 NLTK Project
- # Author: George Boutsioukis
- #
- # URL: <http://nltk.org/>
- # For license information, see LICENSE.TXT
- import re
- import math
- try:
- import numpy
- except ImportError:
- pass
- from nltk.tokenize.api import TokenizerI
- BLOCK_COMPARISON, VOCABULARY_INTRODUCTION = 0, 1
- LC, HC = 0, 1
- DEFAULT_SMOOTHING = [0]
- class TextTilingTokenizer(TokenizerI):
- """Tokenize a document into topical sections using the TextTiling algorithm.
- This algorithm detects subtopic shifts based on the analysis of lexical
- co-occurrence patterns.
- The process starts by tokenizing the text into pseudosentences of
- a fixed size w. Then, depending on the method used, similarity
- scores are assigned at sentence gaps. The algorithm proceeds by
- detecting the peak differences between these scores and marking
- them as boundaries. The boundaries are normalized to the closest
- paragraph break and the segmented text is returned.
- :param w: Pseudosentence size
- :type w: int
- :param k: Size (in sentences) of the block used in the block comparison method
- :type k: int
- :param similarity_method: The method used for determining similarity scores:
- `BLOCK_COMPARISON` (default) or `VOCABULARY_INTRODUCTION`.
- :type similarity_method: constant
- :param stopwords: A list of stopwords that are filtered out (defaults to NLTK's stopwords corpus)
- :type stopwords: list(str)
- :param smoothing_method: The method used for smoothing the score plot:
- `DEFAULT_SMOOTHING` (default)
- :type smoothing_method: constant
- :param smoothing_width: The width of the window used by the smoothing method
- :type smoothing_width: int
- :param smoothing_rounds: The number of smoothing passes
- :type smoothing_rounds: int
- :param cutoff_policy: The policy used to determine the number of boundaries:
- `HC` (default) or `LC`
- :type cutoff_policy: constant
- >>> from nltk.corpus import brown
- >>> tt = TextTilingTokenizer(demo_mode=True)
- >>> text = brown.raw()[:4000]
- >>> s, ss, d, b = tt.tokenize(text)
- >>> b
- [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0]
- """
- def __init__(
- self,
- w=20,
- k=10,
- similarity_method=BLOCK_COMPARISON,
- stopwords=None,
- smoothing_method=DEFAULT_SMOOTHING,
- smoothing_width=2,
- smoothing_rounds=1,
- cutoff_policy=HC,
- demo_mode=False,
- ):
- if stopwords is None:
- from nltk.corpus import stopwords
- stopwords = stopwords.words('english')
- self.__dict__.update(locals())
- del self.__dict__['self']
- def tokenize(self, text):
- """Return a tokenized copy of *text*, where each "token" represents
- a separate topic."""
- lowercase_text = text.lower()
- paragraph_breaks = self._mark_paragraph_breaks(text)
- text_length = len(lowercase_text)
- # Tokenization step starts here
- # Remove punctuation
- nopunct_text = ''.join(
- c for c in lowercase_text if re.match("[a-z\-\' \n\t]", c)
- )
- nopunct_par_breaks = self._mark_paragraph_breaks(nopunct_text)
- tokseqs = self._divide_to_tokensequences(nopunct_text)
- # The morphological stemming step mentioned in the TextTile
- # paper is not implemented. A comment in the original C
- # implementation states that it offers no benefit to the
- # process. It might be interesting to test the existing
- # stemmers though.
- # words = _stem_words(words)
- # Filter stopwords
- for ts in tokseqs:
- ts.wrdindex_list = [
- wi for wi in ts.wrdindex_list if wi[0] not in self.stopwords
- ]
- token_table = self._create_token_table(tokseqs, nopunct_par_breaks)
- # End of the Tokenization step
- # Lexical score determination
- if self.similarity_method == BLOCK_COMPARISON:
- gap_scores = self._block_comparison(tokseqs, token_table)
- elif self.similarity_method == VOCABULARY_INTRODUCTION:
- raise NotImplementedError("Vocabulary introduction not implemented")
- else:
- raise ValueError(
- "Similarity method {} not recognized".format(self.similarity_method)
- )
- if self.smoothing_method == DEFAULT_SMOOTHING:
- smooth_scores = self._smooth_scores(gap_scores)
- else:
- raise ValueError(
- "Smoothing method {} not recognized".format(self.smoothing_method)
- )
- # End of Lexical score Determination
- # Boundary identification
- depth_scores = self._depth_scores(smooth_scores)
- segment_boundaries = self._identify_boundaries(depth_scores)
- normalized_boundaries = self._normalize_boundaries(
- text, segment_boundaries, paragraph_breaks
- )
- # End of Boundary Identification
- segmented_text = []
- prevb = 0
- for b in normalized_boundaries:
- if b == 0:
- continue
- segmented_text.append(text[prevb:b])
- prevb = b
- if prevb < text_length: # append any text that may be remaining
- segmented_text.append(text[prevb:])
- if not segmented_text:
- segmented_text = [text]
- if self.demo_mode:
- return gap_scores, smooth_scores, depth_scores, segment_boundaries
- return segmented_text
- def _block_comparison(self, tokseqs, token_table):
- """Implements the block comparison method"""
- def blk_frq(tok, block):
- ts_occs = filter(lambda o: o[0] in block, token_table[tok].ts_occurences)
- freq = sum([tsocc[1] for tsocc in ts_occs])
- return freq
- gap_scores = []
- numgaps = len(tokseqs) - 1
- for curr_gap in range(numgaps):
- score_dividend, score_divisor_b1, score_divisor_b2 = 0.0, 0.0, 0.0
- score = 0.0
- # adjust window size for boundary conditions
- if curr_gap < self.k - 1:
- window_size = curr_gap + 1
- elif curr_gap > numgaps - self.k:
- window_size = numgaps - curr_gap
- else:
- window_size = self.k
- b1 = [ts.index for ts in tokseqs[curr_gap - window_size + 1 : curr_gap + 1]]
- b2 = [ts.index for ts in tokseqs[curr_gap + 1 : curr_gap + window_size + 1]]
- for t in token_table:
- score_dividend += blk_frq(t, b1) * blk_frq(t, b2)
- score_divisor_b1 += blk_frq(t, b1) ** 2
- score_divisor_b2 += blk_frq(t, b2) ** 2
- try:
- score = score_dividend / math.sqrt(score_divisor_b1 * score_divisor_b2)
- except ZeroDivisionError:
- pass # score += 0.0
- gap_scores.append(score)
- return gap_scores
- def _smooth_scores(self, gap_scores):
- "Wraps the smooth function from the SciPy Cookbook"
- return list(
- smooth(numpy.array(gap_scores[:]), window_len=self.smoothing_width + 1)
- )
- def _mark_paragraph_breaks(self, text):
- """Identifies indented text or line breaks as the beginning of
- paragraphs"""
- MIN_PARAGRAPH = 100
- pattern = re.compile("[ \t\r\f\v]*\n[ \t\r\f\v]*\n[ \t\r\f\v]*")
- matches = pattern.finditer(text)
- last_break = 0
- pbreaks = [0]
- for pb in matches:
- if pb.start() - last_break < MIN_PARAGRAPH:
- continue
- else:
- pbreaks.append(pb.start())
- last_break = pb.start()
- return pbreaks
- def _divide_to_tokensequences(self, text):
- "Divides the text into pseudosentences of fixed size"
- w = self.w
- wrdindex_list = []
- matches = re.finditer("\w+", text)
- for match in matches:
- wrdindex_list.append((match.group(), match.start()))
- return [
- TokenSequence(i / w, wrdindex_list[i : i + w])
- for i in range(0, len(wrdindex_list), w)
- ]
- def _create_token_table(self, token_sequences, par_breaks):
- "Creates a table of TokenTableFields"
- token_table = {}
- current_par = 0
- current_tok_seq = 0
- pb_iter = par_breaks.__iter__()
- current_par_break = next(pb_iter)
- if current_par_break == 0:
- try:
- current_par_break = next(pb_iter) # skip break at 0
- except StopIteration:
- raise ValueError(
- "No paragraph breaks were found(text too short perhaps?)"
- )
- for ts in token_sequences:
- for word, index in ts.wrdindex_list:
- try:
- while index > current_par_break:
- current_par_break = next(pb_iter)
- current_par += 1
- except StopIteration:
- # hit bottom
- pass
- if word in token_table:
- token_table[word].total_count += 1
- if token_table[word].last_par != current_par:
- token_table[word].last_par = current_par
- token_table[word].par_count += 1
- if token_table[word].last_tok_seq != current_tok_seq:
- token_table[word].last_tok_seq = current_tok_seq
- token_table[word].ts_occurences.append([current_tok_seq, 1])
- else:
- token_table[word].ts_occurences[-1][1] += 1
- else: # new word
- token_table[word] = TokenTableField(
- first_pos=index,
- ts_occurences=[[current_tok_seq, 1]],
- total_count=1,
- par_count=1,
- last_par=current_par,
- last_tok_seq=current_tok_seq,
- )
- current_tok_seq += 1
- return token_table
- def _identify_boundaries(self, depth_scores):
- """Identifies boundaries at the peaks of similarity score
- differences"""
- boundaries = [0 for x in depth_scores]
- avg = sum(depth_scores) / len(depth_scores)
- stdev = numpy.std(depth_scores)
- # SB: what is the purpose of this conditional?
- if self.cutoff_policy == LC:
- cutoff = avg - stdev / 2.0
- else:
- cutoff = avg - stdev / 2.0
- depth_tuples = sorted(zip(depth_scores, range(len(depth_scores))))
- depth_tuples.reverse()
- hp = list(filter(lambda x: x[0] > cutoff, depth_tuples))
- for dt in hp:
- boundaries[dt[1]] = 1
- for dt2 in hp: # undo if there is a boundary close already
- if (
- dt[1] != dt2[1]
- and abs(dt2[1] - dt[1]) < 4
- and boundaries[dt2[1]] == 1
- ):
- boundaries[dt[1]] = 0
- return boundaries
- def _depth_scores(self, scores):
- """Calculates the depth of each gap, i.e. the average difference
- between the left and right peaks and the gap's score"""
- depth_scores = [0 for x in scores]
- # clip boundaries: this holds on the rule of thumb(my thumb)
- # that a section shouldn't be smaller than at least 2
- # pseudosentences for small texts and around 5 for larger ones.
- clip = min(max(len(scores) // 10, 2), 5)
- index = clip
- for gapscore in scores[clip:-clip]:
- lpeak = gapscore
- for score in scores[index::-1]:
- if score >= lpeak:
- lpeak = score
- else:
- break
- rpeak = gapscore
- for score in scores[index:]:
- if score >= rpeak:
- rpeak = score
- else:
- break
- depth_scores[index] = lpeak + rpeak - 2 * gapscore
- index += 1
- return depth_scores
- def _normalize_boundaries(self, text, boundaries, paragraph_breaks):
- """Normalize the boundaries identified to the original text's
- paragraph breaks"""
- norm_boundaries = []
- char_count, word_count, gaps_seen = 0, 0, 0
- seen_word = False
- for char in text:
- char_count += 1
- if char in " \t\n" and seen_word:
- seen_word = False
- word_count += 1
- if char not in " \t\n" and not seen_word:
- seen_word = True
- if gaps_seen < len(boundaries) and word_count > (
- max(gaps_seen * self.w, self.w)
- ):
- if boundaries[gaps_seen] == 1:
- # find closest paragraph break
- best_fit = len(text)
- for br in paragraph_breaks:
- if best_fit > abs(br - char_count):
- best_fit = abs(br - char_count)
- bestbr = br
- else:
- break
- if bestbr not in norm_boundaries: # avoid duplicates
- norm_boundaries.append(bestbr)
- gaps_seen += 1
- return norm_boundaries
- class TokenTableField(object):
- """A field in the token table holding parameters for each token,
- used later in the process"""
- def __init__(
- self,
- first_pos,
- ts_occurences,
- total_count=1,
- par_count=1,
- last_par=0,
- last_tok_seq=None,
- ):
- self.__dict__.update(locals())
- del self.__dict__['self']
- class TokenSequence(object):
- "A token list with its original length and its index"
- def __init__(self, index, wrdindex_list, original_length=None):
- original_length = original_length or len(wrdindex_list)
- self.__dict__.update(locals())
- del self.__dict__['self']
- # Pasted from the SciPy cookbook: http://www.scipy.org/Cookbook/SignalSmooth
- def smooth(x, window_len=11, window='flat'):
- """smooth the data using a window with requested size.
- This method is based on the convolution of a scaled window with the signal.
- The signal is prepared by introducing reflected copies of the signal
- (with the window size) in both ends so that transient parts are minimized
- in the beginning and end part of the output signal.
- :param x: the input signal
- :param window_len: the dimension of the smoothing window; should be an odd integer
- :param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
- flat window will produce a moving average smoothing.
- :return: the smoothed signal
- example::
- t=linspace(-2,2,0.1)
- x=sin(t)+randn(len(t))*0.1
- y=smooth(x)
- :see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
- scipy.signal.lfilter
- TODO: the window parameter could be the window itself if an array instead of a string
- """
- if x.ndim != 1:
- raise ValueError("smooth only accepts 1 dimension arrays.")
- if x.size < window_len:
- raise ValueError("Input vector needs to be bigger than window size.")
- if window_len < 3:
- return x
- if window not in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
- raise ValueError(
- "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
- )
- s = numpy.r_[2 * x[0] - x[window_len:1:-1], x, 2 * x[-1] - x[-1:-window_len:-1]]
- # print(len(s))
- if window == 'flat': # moving average
- w = numpy.ones(window_len, 'd')
- else:
- w = eval('numpy.' + window + '(window_len)')
- y = numpy.convolve(w / w.sum(), s, mode='same')
- return y[window_len - 1 : -window_len + 1]
- def demo(text=None):
- from nltk.corpus import brown
- from matplotlib import pylab
- tt = TextTilingTokenizer(demo_mode=True)
- if text is None:
- text = brown.raw()[:10000]
- s, ss, d, b = tt.tokenize(text)
- pylab.xlabel("Sentence Gap index")
- pylab.ylabel("Gap Scores")
- pylab.plot(range(len(s)), s, label="Gap Scores")
- pylab.plot(range(len(ss)), ss, label="Smoothed Gap scores")
- pylab.plot(range(len(d)), d, label="Depth scores")
- pylab.stem(range(len(b)), b)
- pylab.legend()
- pylab.show()
|