-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumers.py
78 lines (58 loc) · 2.04 KB
/
consumers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import time
import json
import sqlite3
import datetime
import requests
import feedparser
from DTOs import Post
class BaseConsumer:
    """Base class for consumers that track already-seen posts in SQLite.

    Instances hold the feed location in ``url`` and an open connection to
    the ``rss-aggregator.db`` database in ``conn``.
    """

    def __init__(self, url):
        """Store the feed URL and open the history database.

        Args:
            url: Location of the feed this consumer reads.
        """
        self.url = url
        self.conn = sqlite3.connect('rss-aggregator.db')

    def seen_before(self, _id):
        """Check whether a post with the given id has been seen before.

        Args:
            _id: Identifier to look up in the ``id`` column of the
                ``history`` table.

        Returns:
            True if at least one ``history`` row has that id, else False.
        """
        cursor = self.conn.cursor()
        try:
            # Bind the id as a parameter instead of interpolating it into
            # the SQL text: feed ids are untrusted strings (often URLs), so
            # interpolation either produces invalid SQL or allows injection.
            cursor.execute(
                'SELECT 1 FROM history WHERE id = ? LIMIT 1', (_id,))
            return cursor.fetchone() is not None
        finally:
            cursor.close()
class RSSConsumer(BaseConsumer):
    """Consumer that turns unseen RSS entries into ``Post`` objects."""

    def get_new_posts(self):
        """Return posts for every entry at self.url that was not seen before.

        Each post is built from the entry's title, a text made of the
        entry's link and description separated by a newline, and the
        entry's id.
        """
        feed = feedparser.parse(self.url)
        return [
            Post(
                entry.title,
                "{}\n{}".format(entry.link, entry.description),
                entry.id,
            )
            for entry in feed.entries
            if not self.seen_before(entry.id)
        ]
class RSSLinkContentConsumer(BaseConsumer):
    """Consumer that fetches the document linked from each unseen RSS entry."""

    # Seconds to wait for each linked page. requests applies no timeout by
    # default, so without one a stalled server would block the run forever.
    # Override on a subclass or instance to tune it.
    REQUEST_TIMEOUT = 10

    def get_new_posts(self):
        """Collect the linked documents of all unseen entries at self.url.

        Parses the feed, skips entries whose id has been seen before, and
        follows the link of each remaining entry. Only responses with a
        2xx status produce a post; other statuses are skipped.

        Returns:
            A list of ``Post`` objects whose body is a dict holding the
            fetched document under ``"text"`` and the entry's RSS id
            under ``"_id"``.

        Raises:
            requests.RequestException: If fetching a link fails or does
                not answer within ``REQUEST_TIMEOUT`` seconds.
        """
        rss = feedparser.parse(self.url)
        posts = []
        for item in rss.entries:
            if self.seen_before(item.id):
                continue
            response = requests.get(item.link, timeout=self.REQUEST_TIMEOUT)
            if 200 <= response.status_code <= 299:
                body = {
                    "text": response.text,
                    "_id": item.id,
                }
                posts.append(Post(item.title, body, item.id))
        return posts
class ConsoleConsumer:
    """Consumer that builds posts from text typed at the console."""

    def get_new_posts(self):
        """Prompt for posts until the user breaks out, then return them.

        Repeatedly asks for a title and a content. The loop ends on Ctrl-C
        (KeyboardInterrupt) or on end of input (EOFError, e.g. Ctrl-D or an
        exhausted piped stdin). A post whose prompts were interrupted
        partway through is discarded.

        Returns:
            A list of the ``Post`` objects entered before the loop ended.
        """
        posts = []
        while True:
            try:
                title = input('title: ')
                content = input('content: ')
            except (KeyboardInterrupt, EOFError):
                # End of input is also a way of "breaking out"; without
                # this the posts collected so far would be lost to a crash.
                break
            posts.append(Post(title, content))
        return posts