-
Notifications
You must be signed in to change notification settings - Fork 0
Add walker and hamming and tests for them #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| def reducent_bits(n): | ||
| """ | ||
| Рассчитывает редуцентные биты p | ||
| """ | ||
| p = 0 | ||
| while (2**p) < n + p + 1: | ||
| p += 1 | ||
| return p | ||
|
|
||
| def encode(string): | ||
| """ | ||
| Принимает строку из битов | ||
| Кодирует строку кодом Хэмминга | ||
| Возвращает строку битов в кодировке Хэмминга | ||
| """ | ||
| bits = [int(b) for b in string] | ||
| n = len(bits) | ||
| p = reducent_bits(n) | ||
|
|
||
| encoded_bits = [0] * (n + p) | ||
|
|
||
| idx = 0 | ||
| for i in range(1, len(encoded_bits) + 1): | ||
| if not(i & (i - 1) == 0): | ||
| encoded_bits[i-1] = bits[idx] | ||
| idx += 1 | ||
|
|
||
| for i in range(p): | ||
| parity_pos = 2**i | ||
| checked_bits = [] | ||
| for j in range(1, len(encoded_bits) + 1): | ||
| if j & parity_pos: | ||
| checked_bits.append(encoded_bits[j-1]) | ||
|
|
||
| encoded_bits[parity_pos-1] = sum(checked_bits) % 2 | ||
|
|
||
| return "".join(map(str, encoded_bits)) | ||
|
|
||
| def decode(encoded_string): | ||
| """ | ||
| Принимает закодированную строку из 0 и 1 | ||
| Декодирует и проверяет | ||
| В случае ошибки возвращает -1 (успех) или индекс позиции с ошибкой | ||
| """ | ||
| received_bits = [int(b) for b in encoded_string] | ||
| n = len(received_bits) | ||
| p = 0 | ||
| while (2**p) < n: | ||
| p += 1 | ||
|
|
||
| status = 0 | ||
|
|
||
| for i in range(p): | ||
| parity_pos = 2**i | ||
| checked_bits = [] | ||
| for j in range(1, n + 1): | ||
| if j & parity_pos: | ||
| checked_bits.append(received_bits[j-1]) | ||
|
|
||
| if sum(checked_bits) % 2 != 0: | ||
| status += parity_pos | ||
|
|
||
| if status == 0: | ||
| return -1 | ||
| else: | ||
| return status | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| import random | ||
| from collections import deque | ||
|
|
||
| class WalkersAlias: | ||
| def __init__(self, events_and_probs): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Названия переменных очень странные. Вы вообще не используете принятую у нас терминологию: донор, рецепиент, барьер -- всего этого нет |
||
| if not events_and_probs: | ||
| raise ValueError("Список событий пуст") | ||
|
|
||
| self.events = [item[0] for item in events_and_probs] | ||
| probs = [item[1] for item in events_and_probs] | ||
| self.ln = len(self.events) | ||
|
|
||
| if min(probs) < 0: | ||
| raise ValueError("Вероятность не может быть отрицательной") | ||
|
|
||
| result = sum(probs) | ||
| if not(abs(result - 1.0) < 1e-6): | ||
| raise ValueError("Сумма вероятностей должна быть близка 1") | ||
|
Comment on lines
+16
to
+18
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Почему "близка"? Она должна быть ему равна |
||
|
|
||
| columns_probs = [p * self.ln for p in probs] | ||
|
|
||
| self.alias_table = [-1] * self.ln | ||
| self.prob_table = [0.0] * self.ln | ||
|
|
||
| short = deque([i for i, p in enumerate(columns_probs) if p < 1.0]) | ||
| long = deque([i for i, p in enumerate(columns_probs) if p >= 1.0]) | ||
|
|
||
| while short and long: | ||
| shrt = short.popleft() | ||
| lng = long.popleft() | ||
|
|
||
| self.prob_table[shrt] = columns_probs[shrt] | ||
| self.alias_table[shrt] = lng | ||
|
|
||
| columns_probs[lng] -= (1.0 - columns_probs[shrt]) | ||
|
|
||
| if columns_probs[lng] < 1.0: | ||
| short.append(lng) | ||
| else: | ||
| long.append(lng) | ||
|
|
||
| while short: | ||
| self.prob_table[short.popleft()] = 1.0 | ||
| while long: | ||
| self.prob_table[long.popleft()] = 1.0 | ||
|
|
||
| def get_random(self): | ||
| xi = random.randint(0, self.ln - 1) | ||
| eta = random.random() | ||
|
|
||
| if eta <= self.prob_table[xi]: | ||
| return self.events[xi] | ||
| else: | ||
| return self.events[self.alias_table[xi]] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| from src.hamming import encode, decode | ||
|
|
||
| def test_encode(): | ||
| assert encode('1011') == '0110011' | ||
| assert encode('11010110') == '001010100110' | ||
|
|
||
| def test_decode(): | ||
| encoded_data4 = '0110011' | ||
| assert decode(encoded_data4) == -1 | ||
|
|
||
| encoded_data8 = '001010100110' | ||
| assert decode(encoded_data8) == -1 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| import math | ||
| from src.walker import WalkersAlias | ||
|
|
||
| N_TESTS = 100000 | ||
| INACCURACY = 0.01 | ||
|
|
||
| def test_same_distribution(): | ||
| distribution = [("A", 0.5), ("B", 0.5)] | ||
| walker = WalkersAlias(distribution) | ||
| results = {"A": 0, "B": 0} | ||
|
|
||
| for _ in range(N_TESTS): | ||
| event = walker.get_random() | ||
| results[event] += 1 | ||
|
|
||
| assert math.isclose(results["A"] / N_TESTS, 0.5, abs_tol=INACCURACY) | ||
| assert math.isclose(results["B"] / N_TESTS, 0.5, abs_tol=INACCURACY) | ||
|
|
||
| def test_multiple_events(): | ||
| distribution = [("A", 0.65), ("B", 0.1), ("C", 0.2), ("D", 0.05)] | ||
| walker = WalkersAlias(distribution) | ||
| results = {e: 0 for e, p in distribution} | ||
|
|
||
| for _ in range(N_TESTS): | ||
| event = walker.get_random() | ||
| results[event] += 1 | ||
|
|
||
| assert math.isclose(results["A"] / N_TESTS, 0.65, abs_tol=INACCURACY) | ||
| assert math.isclose(results["B"] / N_TESTS, 0.1, abs_tol=INACCURACY) | ||
| assert math.isclose(results["C"] / N_TESTS, 0.2, abs_tol=INACCURACY) | ||
| assert math.isclose(results["D"] / N_TESTS, 0.05, abs_tol=INACCURACY) | ||
|
|
||
| def test_inaccuracy(): | ||
| p = 0.1 | ||
| distribution = [("A", p)] * 10 | ||
| walker = WalkersAlias(distribution) | ||
|
|
||
| assert walker.ln == 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Что такое "редуцентные" биты? Документация должна пояснять функцию. Почему возвращает число, а не список, если она рассчитывает биты?