-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathembedding_manager.py
More file actions
308 lines (253 loc) · 10.2 KB
/
embedding_manager.py
File metadata and controls
308 lines (253 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
Módulo para gerenciar embeddings de posts usando Ollama e ChromaDB.
"""
import ollama
from typing import List, Dict, Any
from chromadb import Client, Settings
from chromadb.config import Settings as ChromaSettings
import chromadb
from pathlib import Path
import json
class EmbeddingManager:
"""Classe para gerenciar embeddings e armazenamento vetorial."""
def __init__(
self,
collection_name: str = "instagram_posts",
embedding_model: str = "mxbai-embed-large",
persist_dir: str = "./chroma_db"
):
"""
Inicializa o gerenciador de embeddings.
Args:
collection_name: Nome da coleção no ChromaDB
embedding_model: Modelo Ollama para embeddings
persist_dir: Diretório para persistir o banco vetorial
"""
self.embedding_model = embedding_model
self.collection_name = collection_name
self.persist_dir = Path(persist_dir)
# Cria diretório se não existir
self.persist_dir.mkdir(exist_ok=True)
# Inicializa ChromaDB
self.client = chromadb.PersistentClient(path=str(self.persist_dir))
# Cria ou recupera coleção
try:
self.collection = self.client.get_collection(name=collection_name)
print(f"✓ Coleção '{collection_name}' carregada com {self.collection.count()} documentos")
except:
self.collection = self.client.create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
print(f"✓ Nova coleção '{collection_name}' criada")
def generate_embedding(self, text: str) -> List[float]:
"""
Gera embedding para um texto usando Ollama.
Args:
text: Texto para gerar embedding
Returns:
Lista de floats representando o embedding
"""
try:
response = ollama.embeddings(
model=self.embedding_model,
prompt=text
)
return response['embedding']
except Exception as e:
print(f"Erro ao gerar embedding: {e}")
raise
def add_posts(self, posts: List[Dict[str, Any]], batch_size: int = 100):
"""
Adiciona posts e notícias ao banco vetorial.
Args:
posts: Lista de posts/notícias processados
batch_size: Tamanho do lote para processamento
"""
total = len(posts)
print(f"\nIniciando indexação de {total} documentos...")
for i in range(0, total, batch_size):
batch = posts[i:i + batch_size]
documents = []
metadatas = []
ids = []
embeddings = []
for post in batch:
# Prepara documento
doc_text = post['text']
# Metadados base
metadata = {
'profile': post['profile'],
'url': post['url'],
'timestamp': post['timestamp'].isoformat(),
'likesCount': post['likesCount'],
'commentsCount': post['commentsCount'],
'type': post['type'],
'content_type': post.get('content_type', 'instagram_post'),
}
# Metadados específicos por tipo de conteúdo
if post.get('content_type') == 'news':
# Metadados de notícias
metadata.update({
'title': post.get('title', '')[:500],
'description': post.get('description', '')[:500],
'publisher_name': post.get('publisher_name', ''),
'publisher_link': post.get('publisher_link', ''),
'country': post.get('country', ''),
'language': post.get('language', ''),
})
else:
# Metadados de posts do Instagram
metadata.update({
'caption': post.get('caption', '')[:500],
'hashtags': json.dumps(post.get('hashtags', [])),
'mentions': json.dumps(post.get('mentions', [])),
})
# Gera embedding
embedding = self.generate_embedding(doc_text)
documents.append(doc_text)
metadatas.append(metadata)
ids.append(f"{post['profile']}_{post['id']}")
embeddings.append(embedding)
# Adiciona lote ao ChromaDB
self.collection.add(
documents=documents,
metadatas=metadatas,
ids=ids,
embeddings=embeddings
)
progress = min(i + batch_size, total)
print(f"Progresso: {progress}/{total} documentos indexados ({(progress/total)*100:.1f}%)")
print(f"✓ Indexação concluída! Total de documentos: {self.collection.count()}")
def search(
self,
query: str,
n_results: int = 5,
profile_filter: str = None,
content_type_filter: str = None
) -> Dict[str, Any]:
"""
Busca posts/notícias relevantes baseado em uma query.
Args:
query: Texto da busca
n_results: Número de resultados a retornar
profile_filter: Filtrar por perfil específico (opcional)
content_type_filter: Filtrar por tipo de conteúdo: 'instagram_post' ou 'news' (opcional)
Returns:
Dicionário com resultados da busca
"""
# Gera embedding da query
query_embedding = self.generate_embedding(query)
# Prepara filtros
where = {}
if profile_filter:
where['profile'] = profile_filter
if content_type_filter:
where['content_type'] = content_type_filter
# Busca no ChromaDB
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
where=where if where else None
)
return results
def clear_collection(self):
"""Remove todos os documentos da coleção."""
try:
self.client.delete_collection(name=self.collection_name)
self.collection = self.client.create_collection(
name=self.collection_name,
metadata={"hnsw:space": "cosine"}
)
print(f"✓ Coleção '{self.collection_name}' limpa")
except Exception as e:
print(f"Erro ao limpar coleção: {e}")
def get_stats(self) -> Dict[str, Any]:
"""
Retorna estatísticas sobre a coleção.
Returns:
Dicionário com estatísticas
"""
count = self.collection.count()
# Pega TODOS os documentos para garantir que obtém todos os perfis
if count > 0:
# Busca todos os documentos (só metadados, não documentos completos)
all_data = self.collection.get(
limit=count, # Pega todos
include=['metadatas'] # Só metadados para ser mais rápido
)
profiles = set()
for metadata in all_data['metadatas']:
profiles.add(metadata['profile'])
# Ordena os perfis alfabeticamente
profiles_list = sorted(list(profiles))
return {
'total_documents': count,
'profiles': profiles_list,
'collection_name': self.collection_name,
'embedding_model': self.embedding_model
}
return {
'total_documents': 0,
'profiles': [],
'collection_name': self.collection_name,
'embedding_model': self.embedding_model
}
def check_ollama_model(model_name: str) -> bool:
"""
Verifica se um modelo está disponível no Ollama.
Args:
model_name: Nome do modelo
Returns:
True se o modelo está disponível
"""
try:
models = ollama.list()
available_models = [model['name'] for model in models.get('models', [])]
# Verifica se o modelo ou uma variante está disponível
for available in available_models:
if model_name in available:
return True
return False
except Exception as e:
print(f"Erro ao verificar modelos Ollama: {e}")
return False
def install_embedding_model(model_name: str = "mxbai-embed-large"):
"""
Instala modelo de embedding via Ollama.
Args:
model_name: Nome do modelo a instalar
"""
print(f"\n📥 Instalando modelo de embedding: {model_name}")
print("Isso pode levar alguns minutos...")
try:
ollama.pull(model_name)
print(f"✓ Modelo {model_name} instalado com sucesso!")
except Exception as e:
print(f"✗ Erro ao instalar modelo: {e}")
print("\nTente instalar manualmente com:")
print(f" ollama pull {model_name}")
def main():
"""Função de teste do módulo."""
import sys
# Verifica se modelo de embedding está instalado
embedding_model = "mxbai-embed-large"
print("=== Verificando Modelos Ollama ===\n")
if not check_ollama_model(embedding_model):
print(f"⚠️ Modelo {embedding_model} não encontrado")
response = input("Deseja instalar agora? (s/n): ")
if response.lower() == 's':
install_embedding_model(embedding_model)
else:
print("Operação cancelada. Execute 'ollama pull mxbai-embed-large' manualmente.")
sys.exit(1)
else:
print(f"✓ Modelo {embedding_model} disponível")
# Teste básico
manager = EmbeddingManager()
print(f"\n=== Estatísticas da Coleção ===\n")
stats = manager.get_stats()
for key, value in stats.items():
print(f"{key}: {value}")
if __name__ == "__main__":
main()