From e88bd573aad360771f93bbb091ffbc09c8c43ecf Mon Sep 17 00:00:00 2001
From: yashsinghcodes
Date: Tue, 24 Dec 2024 01:59:07 +0530
Subject: [PATCH] improve filter_list cache handling

The "in cache key" check in filter_list previously issued one get_cache
and one set_cache request per list item. Preload the cache key once per
run, compare against the in-memory copy, buffer appended values, and
flush them in batches of 400 (plus a final flush on the last item).
Results larger than roughly 1 MB are gzip-compressed and base64-encoded
before being returned.
---
 shuffle-tools/1.2.0/src/app.py | 131 ++++++++++++++++++++++++++-------
 1 file changed, 105 insertions(+), 26 deletions(-)

diff --git a/shuffle-tools/1.2.0/src/app.py b/shuffle-tools/1.2.0/src/app.py
index 593433a5..175abcd8 100644
--- a/shuffle-tools/1.2.0/src/app.py
+++ b/shuffle-tools/1.2.0/src/app.py
@@ -9,6 +9,7 @@
 import tempfile
 import zipfile
 import base64
+import gzip
 import ipaddress
 import hashlib
 from io import StringIO
@@ -53,6 +54,8 @@ def __init__(self, redis, logger, console_logger=None):
         :param logger:
         :param console_logger:
         """
+        self.cache_update_buffer = []
+        self.shared_cache = {}
         super().__init__(redis, logger, console_logger)
 
     def router(self):
@@ -661,6 +664,62 @@ def check_wildcard(self, wildcardstring, matching_string):
 
         return False
 
+    def preload_cache(self, key):
+        org_id = self.full_execution["workflow"]["execution_org"]["id"]
+        url = f"{self.url}/api/v1/orgs/{org_id}/get_cache"
+        data = {
+            "workflow_id": self.full_execution["workflow"]["id"],
+            "execution_id": self.current_execution_id,
+            "authorization": self.authorization,
+            "org_id": org_id,
+            "key": key,
+        }
+        get_response = requests.post(url, json=data, verify=False)
+        response_data = get_response.json()
+        if "value" in response_data:
+            raw_value = response_data["value"]
+            if isinstance(raw_value, str):
+                try:
+                    parsed = json.loads(raw_value)
+                except json.JSONDecodeError:
+                    parsed = [raw_value]
+            else:
+                parsed = raw_value
+
+            if not isinstance(parsed, list):
+                parsed = [parsed]
+
+            response_data["value"] = parsed
+        return response_data  # the normalized copy, not a reparse of the raw body
+
+    def check_compression(self, obj, threshold=1_000_000):
+        data_bytes = json.dumps(obj).encode("utf-8")
+        if len(data_bytes) > threshold:
+            return True
+        return False
+
+    def compress_data(self, obj):
+        data_bytes = json.dumps(obj).encode("utf-8")
+        compressed_data = gzip.compress(data_bytes)
+        return base64.b64encode(compressed_data).decode("utf-8")
+
+    def update_cache(self, key):
+        org_id = self.full_execution["workflow"]["execution_org"]["id"]
+        url = f"{self.url}/api/v1/orgs/{org_id}/set_cache"
+        data = {
+            "workflow_id": self.full_execution["workflow"]["id"],
+            "execution_id": self.current_execution_id,
+            "authorization": self.authorization,
+            "org_id": org_id,
+            "key": key,
+            "value": json.dumps(self.shared_cache["value"]),
+        }
+
+        set_response = requests.post(url, json=data, verify=False)
+        self.cache_update_buffer = []  # flushed; start a new batch
+        return set_response.json()
+
+
     def filter_list(self, input_list, field, check, value, opposite):
 
         # Remove hashtags on the fly
@@ -876,12 +935,20 @@ def filter_list(self, input_list, field, check, value, opposite):
                     failed_list.append(item)
 
             elif check == "in cache key":
+                if not self.shared_cache:  # preload once per run
+                    self.shared_cache = self.preload_cache(key=value)
+
                 ret = self.check_cache_contains(value, tmp, "true")
+
                 if ret["success"] == True and ret["found"] == True:
                     new_list.append(item)
                 else:
                     failed_list.append(item)
 
+                if len(self.cache_update_buffer) > 400 or (item == input_list[-1] and len(self.cache_update_buffer) > 0):
+                    self.update_cache(value)  # flush buffered appends in batches
+
+
                 #return {
                 #    "success": True,
                 #    "found": False,
@@ -931,13 +998,16 @@
             failed_list = tmplist
 
         try:
-            return json.dumps(
-                {
+            data = {
                     "success": True,
                     "valid": new_list,
                     "invalid": failed_list,
                 }
-            )
+            if self.check_compression(data):
+                data = self.compress_data(data)  # base64-encoded gzip for large results
+                return data
+
+            return json.dumps(data)
 
             # new_list = json.dumps(new_list)
         except json.decoder.JSONDecodeError as e:
             return json.dumps(
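
[Reviewer note, not part of the patch] With the hunk above, large
filter_list results come back as a base64-encoded gzip string while small
ones stay plain JSON, so a consumer has to detect which form it received.
A minimal sketch of the matching decode side, assuming the same
gzip+base64 scheme introduced above (maybe_decompress is a hypothetical
helper name, not part of this change):

    import base64
    import gzip
    import json

    def maybe_decompress(payload):
        # Small results are plain JSON and pass straight through.
        try:
            return json.loads(payload)
        except json.JSONDecodeError:
            pass
        # Otherwise assume the gzip+base64 encoding used by compress_data().
        raw = gzip.decompress(base64.b64decode(payload))
        return json.loads(raw)
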
@@ -1737,7 +1807,6 @@ def escape_html(self, input_data):
 
     def check_cache_contains(self, key, value, append):
         org_id = self.full_execution["workflow"]["execution_org"]["id"]
-        url = "%s/api/v1/orgs/%s/get_cache" % (self.url, org_id)
         data = {
             "workflow_id": self.full_execution["workflow"]["id"],
             "execution_id": self.current_execution_id,
@@ -1766,7 +1835,7 @@
                 value = json.dumps(value)
             except Exception as e:
                 pass
-        
+
         if not isinstance(value, str):
             value = str(value)
@@ -1778,11 +1847,13 @@
             append = False
 
         if "success" not in allvalues:
-            get_response = requests.post(url, json=data, verify=False)
+            #get_response = requests.post(url, json=data, verify=False)
+            pass
 
         try:
             if "success" not in allvalues:
-                allvalues = get_response.json()
+                #allvalues = get_response.json()
+                allvalues = self.shared_cache  # reuse the preloaded copy instead of refetching
 
             try:
                 if allvalues["value"] == None or allvalues["value"] == "null":
@@ -1799,6 +1870,7 @@
             set_response = requests.post(set_url, json=data, verify=False)
             try:
                 allvalues = set_response.json()
+                self.shared_cache = self.preload_cache(key=key)  # keep the local copy in sync
 
                 #allvalues["key"] = key
                 #return allvalues
@@ -1830,19 +1902,26 @@
         if allvalues["value"] == None or allvalues["value"] == "null":
             allvalues["value"] = "[]"
 
-        allvalues["value"] = str(allvalues["value"])
+        if isinstance(allvalues["value"], str):
+            try:
+                allvalues["value"] = json.loads(allvalues["value"])
+            except json.JSONDecodeError:
+                self.logger.info("[WARNING] Failed inner value cache parsing")
+                allvalues["value"] = [allvalues["value"]]
+
+        if not isinstance(allvalues["value"], list):
+            allvalues["value"] = [allvalues["value"]]
 
         try:
-            parsedvalue = json.loads(allvalues["value"])
+            parsedvalue = allvalues["value"]  # already normalized to a list above
         except json.decoder.JSONDecodeError as e:
-            parsedvalue = [str(allvalues["value"])]
-        except Exception as e:
-            parsedvalue = [str(allvalues["value"])]
+            parsedvalue = allvalues["value"]
 
         try:
             for item in parsedvalue:
                 #return "%s %s" % (item, value)
-                if item == value:
+                self.logger.info(f"{item} == {value}")
+                if str(item) == str(value):
                     if not append:
                         try:
                             newdata = json.loads(json.dumps(data))
@@ -1858,7 +1937,7 @@
                             "reason": "Found and not appending!",
                             "key": key,
                             "search": value,
-                            "value": json.loads(allvalues["value"]),
+                            "value": allvalues["value"],
                         }
                     else:
                         return {
@@ -1867,10 +1946,10 @@
                             "reason": "Found, was appending, but item already exists",
                             "key": key,
                             "search": value,
-                            "value": json.loads(allvalues["value"]),
+                            "value": allvalues["value"],
                         }
-
-                # Lol
+
+                # Lol
                     break
         except Exception as e:
             parsedvalue = [str(parsedvalue)]
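
[Reviewer note, not part of the patch] The hunks above replace the old
str()-based handling with a normalize-to-list rule for the cached value:
null markers become an empty list, strings are parsed as JSON when
possible, and scalars are wrapped. A standalone sketch of that rule,
using a hypothetical normalize_cache_value name:

    import json

    def normalize_cache_value(raw):
        # Missing or null cache values act like an empty list.
        if raw is None or raw == "null":
            return []
        # Strings are parsed as JSON when possible, else wrapped as-is.
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except json.JSONDecodeError:
                return [raw]
        return raw if isinstance(raw, list) else [raw]

    assert normalize_cache_value(None) == []
    assert normalize_cache_value('["a", "b"]') == ["a", "b"]
    assert normalize_cache_value("plain") == ["plain"]
    assert normalize_cache_value(5) == [5]
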
@@ -1886,18 +1965,18 @@
                 "value": json.loads(allvalues["value"]),
             }
 
-        new_value = parsedvalue
-        if new_value == None:
-            new_value = [value]
+        #parsedvalue.append(value)
 
-        new_value.append(value)
-        data["value"] = json.dumps(new_value)
+        #data["value"] = json.dumps(parsedvalue)
 
-        set_url = "%s/api/v1/orgs/%s/set_cache" % (self.url, org_id)
-        response = requests.post(set_url, json=data, verify=False)
+        if isinstance(allvalues["value"], list) and value not in allvalues["value"]:  # type check before membership test
+            self.cache_update_buffer.append(value)
+            allvalues["value"].append(value)
+        #set_url = "%s/api/v1/orgs/%s/set_cache" % (self.url, org_id)
+        #response = requests.post(set_url, json=data, verify=False)
         exception = ""
         try:
-            allvalues = response.json()
+            #allvalues = response.json()
 
             #return allvalues
             return {
                 "success": True,
                 "found": False,
                 "reason": "Appended as it didn't exist",
                 "key": key,
                 "search": value,
-                "value": new_value,
+                "value": parsedvalue,
             }
         except Exception as e:
             exception = e
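
[Reviewer note, not part of the patch] Taken together, the change turns
one get_cache and one set_cache round trip per item into a single preload
plus batched flushes. A condensed sketch of that pattern under the same
assumptions, where fetch_cache and push_cache are hypothetical stand-ins
for the Shuffle get_cache/set_cache endpoints:

    def filter_against_cache(items, key, fetch_cache, push_cache,
                             flush_threshold=400):
        # One read up front, mirroring preload_cache().
        cache = [str(v) for v in fetch_cache(key)]
        buffer, matched, unmatched = [], [], []
        for item in items:
            if str(item) in cache:
                matched.append(item)
            else:
                unmatched.append(item)
                cache.append(str(item))  # append-on-miss, as in check_cache_contains
                buffer.append(item)
            # Batched writes replace one set_cache call per item.
            if len(buffer) > flush_threshold:
                push_cache(key, cache)
                buffer.clear()
        if buffer:
            push_cache(key, cache)  # final flush for the tail of the list
        return matched, unmatched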