Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ignore-patterns=

# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
# number of processors available to use.
jobs=1
jobs=0

# Control the amount of potential inferred values when inferring a single
# object. This can help the performance when dealing with large functions or
Expand Down
32 changes: 32 additions & 0 deletions lockable/flatten.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Helper functions for flattening nested dictionaries.
"""


def flatten_json(data: dict, parent_key='', sep='.') -> dict:
"""
Flatten a nested dictionary.

Args:
- data (dict): The dictionary to flatten.
- parent_key (str, optional): The concatenated key from the parent(s). Defaults to ''.
- sep (str, optional): The separator to use between keys. Defaults to '.'.

Returns:
- dict: The flattened dictionary.
"""
items = {}
for key, value in data.items():
new_key = f"{parent_key}{sep}{key}" if parent_key else key
if isinstance(value, dict):
items.update(flatten_json(value, new_key, sep=sep))
else:
# ensure that the key doesn't overwrite an existing key
assert new_key not in items, f"Key {new_key} already exists in items"
items[new_key] = value
return items


def flatten_list(array: list) -> list:
""" Flatten a list of dictionaries """
return list(map(flatten_json, array))
3 changes: 2 additions & 1 deletion lockable/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from pydash import filter_, count_by

from lockable.flatten import flatten_list
from lockable.logger import get_logger

MODULE_LOGGER = get_logger()
Expand All @@ -26,7 +27,7 @@ def __init__(self, uri: typing.Union[str, list]):
@property
def data(self) -> list:
""" Get resources list """
return self._resources
return flatten_list(self._resources)

@abstractmethod
def reload(self) -> None: # pragma: no cover
Expand Down
13 changes: 13 additions & 0 deletions tests/test_Lockable.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,16 @@ def test_lock_many_existing_allocation(self):
self.assertTrue(end - start < 2 and end - start > 1)
self.assertTrue(os.path.exists(os.path.join(tmpdirname, 'a.pid')))
self.assertFalse(os.path.exists(os.path.join(tmpdirname, 'b.pid')))

def test_lock_nested(self):
with TemporaryDirectory() as tmpdirname:
resources = [
{"id": "a", "hostname": "myhost", "online": True, "a": 2},
{"id": "b", "hostname": "myhost", "online": True, "a": {"b": 2}}
]

lockable = Lockable(hostname='myhost', resource_list=resources, lock_folder=tmpdirname)
resource = lockable.lock({"a.b": 2})
self.assertEqual(resource.resource_id, 'b')
self.assertEqual(resource.resource_info,
{"id": "b", "hostname": "myhost", "online": True, "a.b": 2})
6 changes: 6 additions & 0 deletions tests/test_Provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,9 @@ def test_provider_http_no_response(self):
ProviderHttp.BACKOFF_FACTOR = 0
with self.assertRaises(ProviderError):
create_provider('http://localhost/resource')

def test_provider_nested_resources(self):
nested_resource = {"id": 12, "a": {"b": 2}}
flatten_resource = {"a.b": 2, "id": 12}
provider = create_provider([nested_resource])
self.assertEqual([flatten_resource], provider.data)
41 changes: 41 additions & 0 deletions tests/test_flatten.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import unittest

from lockable.flatten import flatten_json, flatten_list


class TestFlattenJson(unittest.TestCase):

def test_basic_flattening(self):
data = {"a": {"b": 1}}
result = flatten_json(data)
self.assertEqual(result, {"a.b": 1})

def test_multiple_level_flattening(self):
data = {"a": {"b": {"c": 2}}}
result = flatten_json(data)
self.assertEqual(result, {"a.b.c": 2})

def test_mixed_flattening(self):
data = {"a": {"b": 1, "c": {"d": 2, "e": {"f": 3}}}, "g": 4}
result = flatten_json(data)
self.assertEqual(result, {'a.b': 1, 'a.c.d': 2, 'a.c.e.f': 3, 'g': 4})

def test_empty_data(self):
data = {}
result = flatten_json(data)
self.assertEqual(result, {})

def test_non_nested_data(self):
data = {"a": 1, "b": 2}
result = flatten_json(data)
self.assertEqual(result, data)

def test_flatten_list_mixed(self):
data = [{"a": 1, "b": 2}, {"a": {"b": {"c": 2}}}]
result = flatten_list(data)
self.assertEqual(result, [{'a': 1, 'b': 2}, {'a.b.c': 2}])

def test_raise_if_overlapping_subfield(self):
data = {"a": {"b": 1}, "a.b": 2}
with self.assertRaises(AssertionError):
flatten_json(data)