-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocessing.py
More file actions
123 lines (115 loc) · 4.83 KB
/
processing.py
File metadata and controls
123 lines (115 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import json
import time
from names_translator import Transliterator
class Profinder:
    """Collect members of selected chats and search them by (transliterated) name.

    On construction the instance lists the client's dialogs, keeps those whose
    name is in ``groups``, and then fills ``self.users`` either by iterating
    the participants of every kept chat or by loading a previously saved dump.

    Attributes:
        client: Object providing ``iter_dialogs()``, ``get_entity()`` and
            ``iter_participants()`` (presumably a Telethon ``TelegramClient``
            -- confirm against the caller).
        dialogs: Maps dialog id -> dialog name for the selected chats.
        users: Maps user id -> record with the keys ``name``, ``username``,
            ``phone``, ``link``, ``chat_ids`` and ``chat_names``.
        loop: Event loop used to drive the coroutines synchronously.
        groups: Container of chat names to scan.
        sleep: Optional pause, in seconds, applied after each scanned chat.

    Dump file format: one user per line, ``<user_id>:<json record>``.
    """

    def __init__(self, client, groups, save_dump=None, dump=None, sleep=None):
        """Build the user index.

        Args:
            client: Client used for all API calls (see class docstring).
            groups: Names of the chats whose members should be indexed.
            save_dump: If truthy, path the collected users are written to.
            dump: If truthy, path of an existing dump to load instead of
                querying chat participants.
            sleep: Optional number of seconds to pause after each chat.
        """
        self.client = client
        self.dialogs = {}
        self.users = {}
        self.loop = asyncio.get_event_loop()
        self.groups = groups
        self.sleep = sleep

        # Dialogs are always fetched, even when users come from a dump file.
        self.loop.run_until_complete(self.__get_dialogs())
        if dump:
            self._load_dump(dump)
        else:
            self.loop.run_until_complete(self.__get_users())
        if save_dump:
            self._save_dump(save_dump)

    def _load_dump(self, path):
        """Merge the records stored in the dump file ``path`` into ``self.users``.

        Lines that cannot be parsed are reported with ``print`` and skipped.
        """
        with open(path, 'r', encoding='utf-8') as dump_file:
            for line in dump_file:
                # Only the first ':' separates the id; the JSON payload may
                # itself contain colons.
                uid_text, _, payload = line.partition(':')
                try:
                    uid = int(uid_text)
                    data = json.loads(payload)
                except ValueError as err:  # includes json.JSONDecodeError
                    print(err)
                    continue
                self.users[uid] = data

    def _save_dump(self, path):
        """Write ``self.users`` to ``path``, one ``<user_id>:<json>`` line per user."""
        with open(path, 'w', encoding='utf-8') as dump_file:
            dump_file.writelines(
                f'{uid}:{json.dumps(record)}\n'
                for uid, record in self.users.items()
            )
        # Report the file that was actually written (the original code
        # printed the *input* dump path here).
        print(f'Saved results to {path}')

    @staticmethod
    def _full_name(user):
        """Return ``'<first_name> <last_name>'`` built from the parts that are set.

        A missing part is skipped. The space after the first name is kept even
        when there is no last name, so records stay identical to the ones
        found in previously written dumps.
        """
        first = getattr(user, 'first_name', None)
        last = getattr(user, 'last_name', None)
        full_name = ''
        if isinstance(first, str):
            full_name += first + ' '
        if isinstance(last, str):
            full_name += last
        return full_name

    @staticmethod
    def _matches_any(full_name, variants):
        """Return True if, for some variant, every token occurs in ``full_name``.

        Args:
            full_name: Stored display name of a user.
            variants: Iterable of token lists; tokens must already be
                lower-cased. Matching is a case-insensitive substring test.
        """
        haystack = full_name.lower()
        return any(
            all(token in haystack for token in tokens)
            for tokens in variants
        )

    async def __get_dialogs(self):
        """Record every dialog with a negative id whose name is in ``self.groups``."""
        async for dialog in self.client.iter_dialogs():
            if dialog.id < 0 and dialog.name in self.groups:
                self.dialogs[dialog.id] = dialog.name

    async def __get_users(self):
        """Fill ``self.users`` with the participants of every selected chat.

        A user seen in several chats gets one record whose ``chat_ids`` and
        ``chat_names`` list every chat they were found in.
        """
        for chat_id, chat_name in self.dialogs.items():
            chat = await self.client.get_entity(chat_id)
            print('Dumping users from chat:', chat_name)
            async for user in self.client.iter_participants(chat, aggressive=False):
                record = self.users.get(user.id)
                if record is not None:
                    record['chat_ids'].append(chat_id)
                    record['chat_names'].append(chat_name)
                    continue

                full_name = self._full_name(user)
                if user.username:
                    link = f'https://t.me/{user.username}'
                else:
                    # No public username: fall back to a Markdown mention link.
                    link = f'[{full_name}](tg://user?id={user.id})'
                self.users[user.id] = {
                    'name': full_name,
                    'username': user.username,
                    'phone': user.phone,
                    'link': link,
                    'chat_ids': [chat_id],
                    'chat_names': [chat_name],
                }
            if self.sleep:
                # Non-blocking pause between chats; time.sleep() would stall
                # the event loop (and any client background tasks) meanwhile.
                await asyncio.sleep(self.sleep)
        print('Number of users:', len(self.users))

    def __extend_names(self, names):
        """Placeholder; not implemented."""
        pass

    def findUser(self, user_names):
        """Synchronously search the indexed users.

        Args:
            user_names: Iterable of queries of the form ``'First Last'`` (the
                last name is optional).

        Returns:
            dict mapping user id -> user record for every user whose stored
            name matches at least one transliteration variant of a query.
        """
        return self.loop.run_until_complete(self.__findUser(user_names))

    async def __findUser(self, user_names):
        """Match ``user_names`` against the locally stored user names.

        Matching is done locally on purpose: per the original author's note,
        an earlier version used the Telegram API's server-side participant
        search (``iter_participants(..., search=name)``) and failed on big
        data.
        """
        tr = Transliterator()

        # Transliteration does not depend on the user being inspected, so all
        # variants are computed once up front instead of once per user.
        variants = []
        for name in user_names:
            # split() tolerates repeated/leading/trailing whitespace; pad so a
            # single-word query still yields a (first, last) pair.
            first, last = (name.split() + ['', ''])[:2]
            for variant in tr.transliterate(
                    first, last, '', use_ukrainian_transliteration=False):
                variants.append(variant.lower().split(' '))

        return {
            user_id: user
            for user_id, user in self.users.items()
            if self._matches_any(user['name'], variants)
        }