diff --git a/src/hamming.py b/src/hamming.py new file mode 100644 index 0000000..521668a --- /dev/null +++ b/src/hamming.py @@ -0,0 +1,66 @@ +def reducent_bits(n): + """ + Рассчитывает редуцентные биты p + """ + p = 0 + while (2**p) < n + p + 1: + p += 1 + return p + +def encode(string): + """ + Принимает строку из битов + Кодирует строку кодом Хэмминга + Возвращает строку битов в кодировке Хэмминга + """ + bits = [int(b) for b in string] + n = len(bits) + p = reducent_bits(n) + + encoded_bits = [0] * (n + p) + + idx = 0 + for i in range(1, len(encoded_bits) + 1): + if not(i & (i - 1) == 0): + encoded_bits[i-1] = bits[idx] + idx += 1 + + for i in range(p): + parity_pos = 2**i + checked_bits = [] + for j in range(1, len(encoded_bits) + 1): + if j & parity_pos: + checked_bits.append(encoded_bits[j-1]) + + encoded_bits[parity_pos-1] = sum(checked_bits) % 2 + + return "".join(map(str, encoded_bits)) + +def decode(encoded_string): + """ + Принимает закодированную строку из 0 и 1 + Декодирует и проверяет + В случае ошибки возвращает -1 (успех) или индекс позиции с ошибкой + """ + received_bits = [int(b) for b in encoded_string] + n = len(received_bits) + p = 0 + while (2**p) < n: + p += 1 + + status = 0 + + for i in range(p): + parity_pos = 2**i + checked_bits = [] + for j in range(1, n + 1): + if j & parity_pos: + checked_bits.append(received_bits[j-1]) + + if sum(checked_bits) % 2 != 0: + status += parity_pos + + if status == 0: + return -1 + else: + return status \ No newline at end of file diff --git a/src/walker.py b/src/walker.py new file mode 100644 index 0000000..934914c --- /dev/null +++ b/src/walker.py @@ -0,0 +1,54 @@ +import random +from collections import deque + +class WalkersAlias: + def __init__(self, events_and_probs): + if not events_and_probs: + raise ValueError("Список событий пуст") + + self.events = [item[0] for item in events_and_probs] + probs = [item[1] for item in events_and_probs] + self.ln = len(self.events) + + if min(probs) < 0: + raise ValueError("Вероятность не может быть отрицательной") + + result = sum(probs) + if not(abs(result - 1.0) < 1e-6): + raise ValueError("Сумма вероятностей должна быть близка 1") + + columns_probs = [p * self.ln for p in probs] + + self.alias_table = [-1] * self.ln + self.prob_table = [0.0] * self.ln + + short = deque([i for i, p in enumerate(columns_probs) if p < 1.0]) + long = deque([i for i, p in enumerate(columns_probs) if p >= 1.0]) + + while short and long: + shrt = short.popleft() + lng = long.popleft() + + self.prob_table[shrt] = columns_probs[shrt] + self.alias_table[shrt] = lng + + columns_probs[lng] -= (1.0 - columns_probs[shrt]) + + if columns_probs[lng] < 1.0: + short.append(lng) + else: + long.append(lng) + + while short: + self.prob_table[short.popleft()] = 1.0 + while long: + self.prob_table[long.popleft()] = 1.0 + + def get_random(self): + xi = random.randint(0, self.ln - 1) + eta = random.random() + + if eta <= self.prob_table[xi]: + return self.events[xi] + else: + return self.events[self.alias_table[xi]] \ No newline at end of file diff --git a/test/test_hamming.py b/test/test_hamming.py new file mode 100644 index 0000000..8469ff5 --- /dev/null +++ b/test/test_hamming.py @@ -0,0 +1,12 @@ +from src.hamming import encode, decode + +def test_encode(): + assert encode('1011') == '0110011' + assert encode('11010110') == '001010100110' + +def test_decode(): + encoded_data4 = '0110011' + assert decode(encoded_data4) == -1 + + encoded_data8 = '001010100110' + assert decode(encoded_data8) == -1 \ No newline at end of file diff --git a/test/test_walker.py b/test/test_walker.py new file mode 100644 index 0000000..9c76c8a --- /dev/null +++ b/test/test_walker.py @@ -0,0 +1,38 @@ +import math +from src.walker import WalkersAlias + +N_TESTS = 100000 +INACCURACY = 0.01 + +def test_same_distribution(): + distribution = [("A", 0.5), ("B", 0.5)] + walker = WalkersAlias(distribution) + results = {"A": 0, "B": 0} + + for _ in range(N_TESTS): + event = walker.get_random() + results[event] += 1 + + assert math.isclose(results["A"] / N_TESTS, 0.5, abs_tol=INACCURACY) + assert math.isclose(results["B"] / N_TESTS, 0.5, abs_tol=INACCURACY) + +def test_multiple_events(): + distribution = [("A", 0.65), ("B", 0.1), ("C", 0.2), ("D", 0.05)] + walker = WalkersAlias(distribution) + results = {e: 0 for e, p in distribution} + + for _ in range(N_TESTS): + event = walker.get_random() + results[event] += 1 + + assert math.isclose(results["A"] / N_TESTS, 0.65, abs_tol=INACCURACY) + assert math.isclose(results["B"] / N_TESTS, 0.1, abs_tol=INACCURACY) + assert math.isclose(results["C"] / N_TESTS, 0.2, abs_tol=INACCURACY) + assert math.isclose(results["D"] / N_TESTS, 0.05, abs_tol=INACCURACY) + +def test_inaccuracy(): + p = 0.1 + distribution = [("A", p)] * 10 + walker = WalkersAlias(distribution) + + assert walker.ln == 10 \ No newline at end of file