# -*- coding: utf-8 -*-
# Natural Language Toolkit: Twitter client
#
# Copyright (C) 2001-2019 NLTK Project
# Author: Ewan Klein <ewan@inf.ed.ac.uk>
#         Lorenzo Rubio <lrnzcig@gmail.com>
# URL: <http://nltk.org/>
# For license information, see LICENSE.TXT

"""
NLTK Twitter client

This module offers methods for collecting and processing Tweets. Most of the
functionality depends on access to the Twitter APIs, and this is handled via
the third party Twython library.

If one of the methods below returns an integer, it is probably a `Twitter
error code <https://dev.twitter.com/overview/api/response-codes>`_. For
example, the response of '420' means that you have reached the limit of the
requests you can currently make to the Twitter API. Currently, `rate limits
for the search API <https://dev.twitter.com/rest/public/rate-limiting>`_ are
divided into 15 minute windows.
"""
import datetime
import gzip
import itertools
import json
import os
import time

import requests
from twython import Twython, TwythonStreamer
from twython.exceptions import TwythonRateLimitError, TwythonError

from nltk.twitter.api import TweetHandlerI, BasicTweetHandler
from nltk.twitter.util import credsfromfile, guess_path
  32. class Streamer(TwythonStreamer):
  33. """
  34. Retrieve data from the Twitter Streaming API.
  35. The streaming API requires
  36. `OAuth 1.0 <http://en.wikipedia.org/wiki/OAuth>`_ authentication.
  37. """
  38. def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret):
  39. self.handler = None
  40. self.do_continue = True
  41. TwythonStreamer.__init__(
  42. self, app_key, app_secret, oauth_token, oauth_token_secret
  43. )
  44. def register(self, handler):
  45. """
  46. Register a method for handling Tweets.
  47. :param TweetHandlerI handler: method for viewing
  48. """
  49. self.handler = handler
  50. def on_success(self, data):
  51. """
  52. :param data: response from Twitter API
  53. """
  54. if self.do_continue:
  55. if self.handler is not None:
  56. if 'text' in data:
  57. self.handler.counter += 1
  58. self.handler.handle(data)
  59. self.do_continue = self.handler.do_continue()
  60. else:
  61. raise ValueError("No data handler has been registered.")
  62. else:
  63. self.disconnect()
  64. self.handler.on_finish()
  65. def on_error(self, status_code, data):
  66. """
  67. :param status_code: The status code returned by the Twitter API
  68. :param data: The response from Twitter API
  69. """
  70. print(status_code)
  71. def sample(self):
  72. """
  73. Wrapper for 'statuses / sample' API call
  74. """
  75. while self.do_continue:
  76. # Stream in an endless loop until limit is reached. See twython
  77. # issue 288: https://github.com/ryanmcgrath/twython/issues/288
  78. # colditzjb commented on 9 Dec 2014
  79. try:
  80. self.statuses.sample()
  81. except requests.exceptions.ChunkedEncodingError as e:
  82. if e is not None:
  83. print("Error (stream will continue): {0}".format(e))
  84. continue
  85. def filter(self, track='', follow='', lang='en'):
  86. """
  87. Wrapper for 'statuses / filter' API call
  88. """
  89. while self.do_continue:
  90. # Stream in an endless loop until limit is reached
  91. try:
  92. if track == '' and follow == '':
  93. msg = "Please supply a value for 'track', 'follow'"
  94. raise ValueError(msg)
  95. self.statuses.filter(track=track, follow=follow, lang=lang)
  96. except requests.exceptions.ChunkedEncodingError as e:
  97. if e is not None:
  98. print("Error (stream will continue): {0}".format(e))
  99. continue
  100. class Query(Twython):
  101. """
  102. Retrieve data from the Twitter REST API.
  103. """
  104. def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret):
  105. self.handler = None
  106. self.do_continue = True
  107. Twython.__init__(self, app_key, app_secret, oauth_token, oauth_token_secret)
  108. def register(self, handler):
  109. """
  110. Register a method for handling Tweets.
  111. :param TweetHandlerI handler: method for viewing or writing Tweets to a file.
  112. """
  113. self.handler = handler
  114. def expand_tweetids(self, ids_f, verbose=True):
  115. """
  116. Given a file object containing a list of Tweet IDs, fetch the
  117. corresponding full Tweets from the Twitter API.
  118. The API call `statuses/lookup` will fail to retrieve a Tweet if the
  119. user has deleted it.
  120. This call to the Twitter API is rate-limited. See
  121. <https://dev.twitter.com/rest/reference/get/statuses/lookup> for details.
  122. :param ids_f: input file object consisting of Tweet IDs, one to a line
  123. :return: iterable of Tweet objects in JSON format
  124. """
  125. ids = [line.strip() for line in ids_f if line]
  126. if verbose:
  127. print("Counted {0} Tweet IDs in {1}.".format(len(ids), ids_f))
  128. # The Twitter endpoint takes lists of up to 100 ids, so we chunk the
  129. # ids.
  130. id_chunks = [ids[i : i + 100] for i in range(0, len(ids), 100)]
  131. chunked_tweets = (self.lookup_status(id=chunk) for chunk in id_chunks)
  132. return itertools.chain.from_iterable(chunked_tweets)
  133. def _search_tweets(self, keywords, limit=100, lang='en'):
  134. """
  135. Assumes that the handler has been informed. Fetches Tweets from
  136. search_tweets generator output and passses them to handler
  137. :param str keywords: A list of query terms to search for, written as\
  138. a comma-separated string.
  139. :param int limit: Number of Tweets to process
  140. :param str lang: language
  141. """
  142. while True:
  143. tweets = self.search_tweets(
  144. keywords=keywords, limit=limit, lang=lang, max_id=self.handler.max_id
  145. )
  146. for tweet in tweets:
  147. self.handler.handle(tweet)
  148. if not (self.handler.do_continue() and self.handler.repeat):
  149. break
  150. self.handler.on_finish()
  151. def search_tweets(
  152. self,
  153. keywords,
  154. limit=100,
  155. lang='en',
  156. max_id=None,
  157. retries_after_twython_exception=0,
  158. ):
  159. """
  160. Call the REST API ``'search/tweets'`` endpoint with some plausible
  161. defaults. See `the Twitter search documentation
  162. <https://dev.twitter.com/rest/public/search>`_ for more information
  163. about admissible search parameters.
  164. :param str keywords: A list of query terms to search for, written as\
  165. a comma-separated string
  166. :param int limit: Number of Tweets to process
  167. :param str lang: language
  168. :param int max_id: id of the last tweet fetched
  169. :param int retries_after_twython_exception: number of retries when\
  170. searching Tweets before raising an exception
  171. :rtype: python generator
  172. """
  173. if not self.handler:
  174. # if no handler is provided, `BasicTweetHandler` provides minimum
  175. # functionality for limiting the number of Tweets retrieved
  176. self.handler = BasicTweetHandler(limit=limit)
  177. count_from_query = 0
  178. if max_id:
  179. self.handler.max_id = max_id
  180. else:
  181. results = self.search(
  182. q=keywords, count=min(100, limit), lang=lang, result_type='recent'
  183. )
  184. count = len(results['statuses'])
  185. if count == 0:
  186. print("No Tweets available through REST API for those keywords")
  187. return
  188. count_from_query = count
  189. self.handler.max_id = results['statuses'][count - 1]['id'] - 1
  190. for result in results['statuses']:
  191. yield result
  192. self.handler.counter += 1
  193. if self.handler.do_continue() == False:
  194. return
  195. # Pagination loop: keep fetching Tweets until the desired count is
  196. # reached while dealing with Twitter rate limits.
  197. retries = 0
  198. while count_from_query < limit:
  199. try:
  200. mcount = min(100, limit - count_from_query)
  201. results = self.search(
  202. q=keywords,
  203. count=mcount,
  204. lang=lang,
  205. max_id=self.handler.max_id,
  206. result_type='recent',
  207. )
  208. except TwythonRateLimitError as e:
  209. print("Waiting for 15 minutes -{0}".format(e))
  210. time.sleep(15 * 60) # wait 15 minutes
  211. continue
  212. except TwythonError as e:
  213. print("Fatal error in Twython request -{0}".format(e))
  214. if retries_after_twython_exception == retries:
  215. raise e
  216. retries += 1
  217. count = len(results['statuses'])
  218. if count == 0:
  219. print("No more Tweets available through rest api")
  220. return
  221. count_from_query += count
  222. # the max_id is also present in the Tweet metadata
  223. # results['search_metadata']['next_results'], but as part of a
  224. # query and difficult to fetch. This is doing the equivalent
  225. # (last tweet id minus one)
  226. self.handler.max_id = results['statuses'][count - 1]['id'] - 1
  227. for result in results['statuses']:
  228. yield result
  229. self.handler.counter += 1
  230. if self.handler.do_continue() == False:
  231. return
  232. def user_info_from_id(self, userids):
  233. """
  234. Convert a list of userIDs into a variety of information about the users.
  235. See <https://dev.twitter.com/rest/reference/get/users/show>.
  236. :param list userids: A list of integer strings corresponding to Twitter userIDs
  237. :rtype: list(json)
  238. """
  239. return [self.show_user(user_id=userid) for userid in userids]
  240. def user_tweets(self, screen_name, limit, include_rts='false'):
  241. """
  242. Return a collection of the most recent Tweets posted by the user
  243. :param str user: The user's screen name; the initial '@' symbol\
  244. should be omitted
  245. :param int limit: The number of Tweets to recover; 200 is the maximum allowed
  246. :param str include_rts: Whether to include statuses which have been\
  247. retweeted by the user; possible values are 'true' and 'false'
  248. """
  249. data = self.get_user_timeline(
  250. screen_name=screen_name, count=limit, include_rts=include_rts
  251. )
  252. for item in data:
  253. self.handler.handle(item)
  254. class Twitter(object):
  255. """
  256. Wrapper class with restricted functionality and fewer options.
  257. """
  258. def __init__(self):
  259. self._oauth = credsfromfile()
  260. self.streamer = Streamer(**self._oauth)
  261. self.query = Query(**self._oauth)
  262. def tweets(
  263. self,
  264. keywords='',
  265. follow='',
  266. to_screen=True,
  267. stream=True,
  268. limit=100,
  269. date_limit=None,
  270. lang='en',
  271. repeat=False,
  272. gzip_compress=False,
  273. ):
  274. """
  275. Process some Tweets in a simple manner.
  276. :param str keywords: Keywords to use for searching or filtering
  277. :param list follow: UserIDs to use for filtering Tweets from the public stream
  278. :param bool to_screen: If `True`, display the tweet texts on the screen,\
  279. otherwise print to a file
  280. :param bool stream: If `True`, use the live public stream,\
  281. otherwise search past public Tweets
  282. :param int limit: The number of data items to process in the current\
  283. round of processing.
  284. :param tuple date_limit: The date at which to stop collecting\
  285. new data. This should be entered as a tuple which can serve as the\
  286. argument to `datetime.datetime`.\
  287. E.g. `date_limit=(2015, 4, 1, 12, 40)` for 12:30 pm on April 1 2015.
  288. Note that, in the case of streaming, this is the maximum date, i.e.\
  289. a date in the future; if not, it is the minimum date, i.e. a date\
  290. in the past
  291. :param str lang: language
  292. :param bool repeat: A flag to determine whether multiple files should\
  293. be written. If `True`, the length of each file will be set by the\
  294. value of `limit`. Use only if `to_screen` is `False`. See also
  295. :py:func:`handle`.
  296. :param gzip_compress: if `True`, output files are compressed with gzip.
  297. """
  298. if stream:
  299. upper_date_limit = date_limit
  300. lower_date_limit = None
  301. else:
  302. upper_date_limit = None
  303. lower_date_limit = date_limit
  304. if to_screen:
  305. handler = TweetViewer(
  306. limit=limit,
  307. upper_date_limit=upper_date_limit,
  308. lower_date_limit=lower_date_limit,
  309. )
  310. else:
  311. handler = TweetWriter(
  312. limit=limit,
  313. upper_date_limit=upper_date_limit,
  314. lower_date_limit=lower_date_limit,
  315. repeat=repeat,
  316. gzip_compress=gzip_compress,
  317. )
  318. if to_screen:
  319. handler = TweetViewer(limit=limit)
  320. else:
  321. if stream:
  322. upper_date_limit = date_limit
  323. lower_date_limit = None
  324. else:
  325. upper_date_limit = None
  326. lower_date_limit = date_limit
  327. handler = TweetWriter(
  328. limit=limit,
  329. upper_date_limit=upper_date_limit,
  330. lower_date_limit=lower_date_limit,
  331. repeat=repeat,
  332. gzip_compress=gzip_compress,
  333. )
  334. if stream:
  335. self.streamer.register(handler)
  336. if keywords == '' and follow == '':
  337. self.streamer.sample()
  338. else:
  339. self.streamer.filter(track=keywords, follow=follow, lang=lang)
  340. else:
  341. self.query.register(handler)
  342. if keywords == '':
  343. raise ValueError("Please supply at least one keyword to search for.")
  344. else:
  345. self.query._search_tweets(keywords, limit=limit, lang=lang)
  346. class TweetViewer(TweetHandlerI):
  347. """
  348. Handle data by sending it to the terminal.
  349. """
  350. def handle(self, data):
  351. """
  352. Direct data to `sys.stdout`
  353. :return: return ``False`` if processing should cease, otherwise return ``True``.
  354. :rtype: bool
  355. :param data: Tweet object returned by Twitter API
  356. """
  357. text = data['text']
  358. print(text)
  359. self.check_date_limit(data)
  360. if self.do_stop:
  361. return
  362. def on_finish(self):
  363. print('Written {0} Tweets'.format(self.counter))
  364. class TweetWriter(TweetHandlerI):
  365. """
  366. Handle data by writing it to a file.
  367. """
  368. def __init__(
  369. self,
  370. limit=2000,
  371. upper_date_limit=None,
  372. lower_date_limit=None,
  373. fprefix='tweets',
  374. subdir='twitter-files',
  375. repeat=False,
  376. gzip_compress=False,
  377. ):
  378. """
  379. The difference between the upper and lower date limits depends on
  380. whether Tweets are coming in an ascending date order (i.e. when
  381. streaming) or descending date order (i.e. when searching past Tweets).
  382. :param int limit: number of data items to process in the current\
  383. round of processing.
  384. :param tuple upper_date_limit: The date at which to stop collecting new\
  385. data. This should be entered as a tuple which can serve as the\
  386. argument to `datetime.datetime`. E.g. `upper_date_limit=(2015, 4, 1, 12,\
  387. 40)` for 12:30 pm on April 1 2015.
  388. :param tuple lower_date_limit: The date at which to stop collecting new\
  389. data. See `upper_data_limit` for formatting.
  390. :param str fprefix: The prefix to use in creating file names for Tweet\
  391. collections.
  392. :param str subdir: The name of the directory where Tweet collection\
  393. files should be stored.
  394. :param bool repeat: flag to determine whether multiple files should be\
  395. written. If `True`, the length of each file will be set by the value\
  396. of `limit`. See also :py:func:`handle`.
  397. :param gzip_compress: if `True`, ouput files are compressed with gzip.
  398. """
  399. self.fprefix = fprefix
  400. self.subdir = guess_path(subdir)
  401. self.gzip_compress = gzip_compress
  402. self.fname = self.timestamped_file()
  403. self.repeat = repeat
  404. self.output = None
  405. TweetHandlerI.__init__(self, limit, upper_date_limit, lower_date_limit)
  406. def timestamped_file(self):
  407. """
  408. :return: timestamped file name
  409. :rtype: str
  410. """
  411. subdir = self.subdir
  412. fprefix = self.fprefix
  413. if subdir:
  414. if not os.path.exists(subdir):
  415. os.mkdir(subdir)
  416. fname = os.path.join(subdir, fprefix)
  417. fmt = '%Y%m%d-%H%M%S'
  418. timestamp = datetime.datetime.now().strftime(fmt)
  419. if self.gzip_compress:
  420. suffix = '.gz'
  421. else:
  422. suffix = ''
  423. outfile = '{0}.{1}.json{2}'.format(fname, timestamp, suffix)
  424. return outfile
  425. def handle(self, data):
  426. """
  427. Write Twitter data as line-delimited JSON into one or more files.
  428. :return: return `False` if processing should cease, otherwise return `True`.
  429. :param data: tweet object returned by Twitter API
  430. """
  431. if self.startingup:
  432. if self.gzip_compress:
  433. self.output = gzip.open(self.fname, 'w')
  434. else:
  435. self.output = open(self.fname, 'w')
  436. print('Writing to {0}'.format(self.fname))
  437. json_data = json.dumps(data)
  438. if self.gzip_compress:
  439. self.output.write((json_data + "\n").encode('utf-8'))
  440. else:
  441. self.output.write(json_data + "\n")
  442. self.check_date_limit(data)
  443. if self.do_stop:
  444. return
  445. self.startingup = False
  446. def on_finish(self):
  447. print('Written {0} Tweets'.format(self.counter))
  448. if self.output:
  449. self.output.close()
  450. def do_continue(self):
  451. if self.repeat == False:
  452. return TweetHandlerI.do_continue(self)
  453. if self.do_stop:
  454. # stop for a functional cause (e.g. date limit)
  455. return False
  456. if self.counter == self.limit:
  457. # repeat is True, thus close output file and
  458. # create a new one
  459. self._restart_file()
  460. return True
  461. def _restart_file(self):
  462. self.on_finish()
  463. self.fname = self.timestamped_file()
  464. self.startingup = True
  465. self.counter = 0