-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathphase1_handler.py
More file actions
238 lines (199 loc) · 8.96 KB
/
phase1_handler.py
File metadata and controls
238 lines (199 loc) · 8.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""Listens to incoming comments from reddit, checks if they match the 'subreddit recommendation' pattern, and saves them to the DB"""
import logging
import re
import json
import consts
import racb_db
import reddit_instantiator
import my_i18n as i18n
import repost_detector
import sub_name_string_match
def handle_incoming_comment(comment):
    """Process one incoming comment and act on it if it names a subreddit.

    The comment is ignored when its body does not match the subreddit
    pattern, when it is distinguished as a moderator comment, when the
    submission title contains a prohibited phrase, or when either the source
    or the target subreddit is in ``consts.SUB_BLACKLIST``.

    Otherwise, in order:
      * if the target subreddit is the one the comment was posted in, a reply
        is posted saying so;
      * if a post with the same content is found in the target subreddit, a
        reply pointing at the first such post is posted;
      * if the search failed because the target subreddit does not exist, a
        reply saying so is posted;
      * else, if the comment is a top-level comment, it is stored through
        ``racb_db.add_comment``.
    """
    logging.debug(f'Handling a comment: {comment.permalink}')

    target_subreddit = check_pattern(comment)
    if target_subreddit is None:
        return
    if is_mod_post(comment) or title_contains_prohibited_phrases(comment):
        return

    source_subreddit = comment.subreddit.display_name.lower()
    target_lowered = target_subreddit.lower()
    if target_lowered in consts.SUB_BLACKLIST or source_subreddit in consts.SUB_BLACKLIST:
        return

    if target_lowered == source_subreddit:
        logging.info('Found "source_subreddit=target_subreddt" comment. Replying to source comment.')
        reply_to_source_equals_target_comment(comment)
        return

    search_outcome = get_posts_with_same_content(comment, target_subreddit)
    if search_outcome.posts_found:
        logging.info('Found post with same content. Replying to source comment.')
        reply_to_same_content_post_comment(comment, target_subreddit, search_outcome.posts[0])
        return

    target_is_missing = (
        search_outcome.unable_to_search
        and search_outcome.unable_to_search_reason == 'SUBREDDIT_DOES_NOT_EXIST'
    )
    if target_is_missing:
        logging.info('Found a reference to a subreddit that does not exist. Replying to source comment.')
        reply_to_nonexistent_target_subreddit_comment(comment, target_subreddit)
        return

    # Only top-level comments are stored.
    if is_top_level_comment(comment):
        logging.info(f'Match found: {comment.permalink}')
        racb_db.add_comment(comment)
def is_mod_post(comment):
    """Return True when the comment's ``distinguished`` value is 'moderator'."""
    distinguished_as = comment.distinguished
    return distinguished_as == 'moderator'
def is_top_level_comment(comment):
    """Return True when the comment's parent id equals its link id."""
    parent_id = comment.parent_id
    link_id = comment.link_id
    return parent_id == link_id
# Case-insensitive pattern matching any of the listed words as a whole word.
prohibited_phrases = re.compile(
    '|'.join(
        r'\b' + phrase + r'\b'
        for phrase in ('sub', 'subs', 'subreddit', 'subreddits')
    ),
    flags=re.IGNORECASE,
)


def title_contains_prohibited_phrases(comment):
    """Return True when the title of the comment's submission contains a prohibited phrase."""
    return bool(prohibited_phrases.search(comment.submission.title))
# A subreddit name is between 2 and 24 characters long.
# The pattern is compiled once, at import time.
subreddit_regex = re.compile(r'^(/)?r/(?P<sub_name>[a-zA-Z0-9_]{2,24})$')


def check_pattern(comment):
    """Return the subreddit name referenced by the comment body, or None.

    The body matches only when it consists of ``r/<name>`` or ``/r/<name>``
    and nothing else.
    """
    match = subreddit_regex.search(comment.body)
    return match.group('sub_name') if match is not None else None
def get_posts_with_same_content(comment, subreddit):
    """Look in `subreddit` for posts with the same content as the comment's submission.

    The subreddit is first searched for submissions whose URL equals
    ``comment.submission.url``. If that search succeeds but finds nothing,
    ``repost_detector.get_reposts_in_sub`` is consulted as a fallback.

    Returns:
        An object with these attributes:
          * ``posts_found`` - True when at least one matching post was found.
          * ``posts`` - list of the matching posts (empty when none).
          * ``unable_to_search`` - True when the search could not be run.
          * ``unable_to_search_reason`` - 'SUBREDDIT_IS_BANNED',
            'SUBREDDIT_DOES_NOT_EXIST', 'SUBREDDIT_IS_PRIVATE', or None.

    Raises:
        Any exception raised by the search whose message is not one of the
        recognised ones is re-raised unchanged.
    """
    class Result:
        def __init__(self):
            # Instance attributes, so the list is never shared between results.
            self.posts_found = False
            self.posts = []
            self.unable_to_search = False
            self.unable_to_search_reason = None

    result = Result()
    reddit = reddit_instantiator.get_reddit_instance()
    try:
        # The format of the query string is explained here: https://github.com/praw-dev/praw/issues/880
        query = f'url:"{comment.submission.url}"'
        submissions = reddit.subreddit(subreddit).search(query=query, sort='new', time_filter='all')
        # iterate over submissions to fetch them
        submissions = list(submissions)
    except Exception as e:  # TODO change exception type to be specific
        # An exception without arguments cannot be one of the recognised
        # cases; fall through to the re-raise instead of failing on args[0].
        error_message = e.args[0] if e.args else None

        # when reddit tries redirecting a search query of a link to the submission page,
        # that means 0 results were found for the search query
        if error_message == 'Redirect to /submit':
            return result

        # when reddit redirects to /subreddits/search that means the subreddit doesn't exist
        if error_message in ('Redirect to /subreddits/search', 'received 404 HTTP response'):
            reason = None
            response_text = getattr(getattr(e, 'response', None), 'text', None)
            if response_text:
                try:
                    response_obj = json.loads(response_text)
                except json.JSONDecodeError:
                    response_obj = None
                # The body may be valid JSON without a 'reason' key, or not
                # a JSON object at all; neither should raise from here.
                if isinstance(response_obj, dict):
                    reason = response_obj.get('reason')
            result.unable_to_search = True
            if reason == 'banned':
                result.unable_to_search_reason = 'SUBREDDIT_IS_BANNED'
            else:
                result.unable_to_search_reason = 'SUBREDDIT_DOES_NOT_EXIST'
            return result

        # this error is received when the subreddit is private
        # "You must be invited to visit this community"
        if error_message == 'received 403 HTTP response':
            result.unable_to_search = True
            result.unable_to_search_reason = 'SUBREDDIT_IS_PRIVATE'
            return result

        raise

    if submissions:
        result.posts_found = True
        result.posts = submissions
        return result

    # Nothing found by URL: fall back to the repost detector.
    prior_posts = repost_detector.get_reposts_in_sub(comment, subreddit)
    if prior_posts:
        result.posts_found = True
        result.posts = [reddit.submission(id=p['post_id']) for p in prior_posts]
    return result
def reply_to_source_equals_target_comment(source_comment):
    """Reply to a comment that names the very subreddit it was posted in.

    The reply text is the 'THATS_WHERE_WE_ARE' translated string, and it is
    posted through the reddit instance of ``SAME_SUBREDDIT_BOT_NAME``.
    """
    bot_name = reddit_instantiator.SAME_SUBREDDIT_BOT_NAME
    reply_text = i18n.get_translated_string(
        'THATS_WHERE_WE_ARE',
        source_comment.subreddit.display_name,
        bot_name=bot_name,
    )
    get_comment_with_different_praw_instance(source_comment, bot_name).reply(reply_text)
def reply_to_nonexistent_target_subreddit_comment(source_comment, target_subreddit):
    """Reply to a comment that names a subreddit which does not exist.

    The reply is assembled from:
      * the 'NONEXISTENT_SUBREDDIT' message;
      * either a 'MAYBE_TYPO' note (no available similar subreddit) or an
        'ALTERNATE_SUBS_SUGGESTION' list of available similar subreddits;
      * a 'PROMPT_NONEXISTENT_SUBREDDIT_CREATION' paragraph, when the target
        name has a valid length for a new subreddit;
      * the suffix returned by ``i18n.get_suffix``.

    It is posted through the reddit instance of ``SUB_DOESNT_EXIST_BOT_NAME``.
    """
    bot_name = reddit_instantiator.SUB_DOESNT_EXIST_BOT_NAME
    source_subreddit = source_comment.subreddit.display_name

    def translated(key, **extra):
        # Every message here is fetched without its suffix; the suffix is
        # appended once, at the very end.
        return i18n.get_translated_string(key, source_subreddit, add_suffix=False, **extra)

    intro = translated('NONEXISTENT_SUBREDDIT').format(target_subreddit=target_subreddit)

    candidates = sub_name_string_match.get_matches(target_subreddit)
    available = [name for name in candidates if is_subreddit_available(name)]

    if available:
        suggestion_template = translated('ALTERNATE_SUBS_SUGGESTION')
        suggestion_lines = '\n'.join(
            [get_subreddit_suggestion_list_line(name) for name in available]
        )
        suggestion = suggestion_template.format(alternate_subreddits_string=suggestion_lines)
        text = f'{intro}\n\n{suggestion}'
    else:
        typo_note = translated('MAYBE_TYPO')
        text = f'{intro} {typo_note}'

    if is_subreddit_name_length_valid(target_subreddit):
        creation_prompt = translated('PROMPT_NONEXISTENT_SUBREDDIT_CREATION', bot_name=bot_name)
        creation_prompt = creation_prompt.format(target_subreddit=target_subreddit)
        text = f'{text}\n\n{creation_prompt}'

    text += i18n.get_suffix(subreddit=target_subreddit, bot_name=bot_name)

    get_comment_with_different_praw_instance(source_comment, bot_name).reply(text)
def is_subreddit_name_length_valid(subreddit_name):
    """Return True when the name is 3 to 24 characters long (inclusive)."""
    shortest_allowed, longest_allowed = 3, 24
    name_length = len(subreddit_name)
    return not (name_length < shortest_allowed or name_length > longest_allowed)
def is_subreddit_available(subreddit_name):
    """Return True when the subreddit's subscriber count can be read.

    Any ordinary error raised while reading it yields False.
    KeyboardInterrupt and SystemExit are deliberately not swallowed.
    """
    reddit = reddit_instantiator.get_reddit_instance()
    try:
        # Only the attribute access matters, not its value.
        # NOTE(review): presumably this access is what triggers the actual
        # request to reddit - confirm against PRAW's lazy-loading behaviour.
        reddit.subreddit(subreddit_name).subscribers
    except Exception:
        return False
    return True
def get_subreddit_suggestion_list_line(subreddit_name):
    """Build one bullet-list line describing a suggested subreddit.

    The line has the form ``* r/<name> (subscribers: <count>)``, with
    ``**NSFW**, `` placed before ``subscribers`` when the subreddit is
    flagged ``over18``. The count uses thousands separators.
    """
    sub_obj = reddit_instantiator.get_reddit_instance().subreddit(subreddit_name)
    nsfw_marker = '**NSFW**, ' if sub_obj.over18 else ''
    return f'* r/{subreddit_name} ({nsfw_marker}subscribers: {sub_obj.subscribers:,})'
# TODO: make up a better name for this function
def reply_to_same_content_post_comment(source_comment, target_subreddit, post_with_same_content):
    """Reply to a comment whose target subreddit already has the same content.

    The reply text is the 'FOUND_POST_WITH_SAME_CONTENT' translated string,
    formatted with the found post's permalink and the target subreddit. It
    is posted through the reddit instance of ``SAME_POST_BOT_NAME``.
    """
    bot_name = reddit_instantiator.SAME_POST_BOT_NAME
    template = i18n.get_translated_string(
        'FOUND_POST_WITH_SAME_CONTENT',
        source_comment.subreddit.display_name,
        bot_name=bot_name,
    )
    reply_text = template.format(
        same_content_post_url=post_with_same_content.permalink,
        target_subreddit=target_subreddit,
    )
    get_comment_with_different_praw_instance(source_comment, bot_name).reply(reply_text)
def get_comment_with_different_praw_instance(comment, username):
    """Return the same comment (looked up by id) from `username`'s reddit instance."""
    other_reddit = reddit_instantiator.get_reddit_instance(username=username)
    return other_reddit.comment(id=comment.id)