From 081742a0c9283b351f4c49006891584dce584d54 Mon Sep 17 00:00:00 2001 From: mrajkowski Date: Tue, 24 Jan 2017 13:33:19 -0800 Subject: [PATCH] Add files via upload Reasons for putting the u2py.py module into Open Source: - It will be a good set of 'real world' Python code for us end-users - Customers and partners will be able to submit fixes and improvements to the code - Foster a Python Community that is asking for this platform to be used - Leverage innovation from the community, increasing the value and quality of our Python project. At this point the u2py.py code is the same copyrighted code that is shipped with the product, I am setting up as a pull request, and will wait for Product Management to get the correct licensing statement before it is accepted. --- U2/u2py.py/UniData/u2py.py | 258 ++++++++++++++++++++++++++++++++++++ U2/u2py.py/UniVerse/u2py.py | 256 +++++++++++++++++++++++++++++++++++ 2 files changed, 514 insertions(+) create mode 100644 U2/u2py.py/UniData/u2py.py create mode 100644 U2/u2py.py/UniVerse/u2py.py diff --git a/U2/u2py.py/UniData/u2py.py b/U2/u2py.py/UniData/u2py.py new file mode 100644 index 0000000..3dcea19 --- /dev/null +++ b/U2/u2py.py/UniData/u2py.py @@ -0,0 +1,258 @@ +# +# Rocket Software Confidential +# OCO Source Materials +# Copyright (C) Rocket Software. 2016, 2016 +# + +"""U2 module for Python, it allows Python developers to +1) call U2BASIC catalogued subroutines +2) run U2 ECL/TCL commands +3) read/write U2 files +4) handle U2 dynamic arrays +5) manage U2 SELECT list +6) control U2 transactions +""" +import os, sys +import random +from _u2py import * +from _u2py import call as _call +from _u2py import _File, _Command, _List, _Subroutine, _DynArray + + +# +# public functions +# +def run(cmdtext, capture=False): + ''' + run(cmdtext, [capture=False]) -> run a TCL/ECL command + if capture is True, return the output of the command as a string + ''' + cmd = Command(cmdtext) + if capture is True: + return cmd.run(True) + else: + cmd.run() + return None + + +def call(subname, *args): + newargs = _call(subname, *args) + for i, arg in enumerate(newargs): + newarg = DynArray(arg) + newargs[i] = newarg + return newargs +call.__doc__ = _call.__doc__ + +# +# public classes +# +class Command(_Command): + __doc__ = _Command.__doc__ + + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + + def run(self, capture=False): + ''' + run([capture=False]) -> None -- run the TCL/ECL command, similar to running a command from U2 prompt + if capture is True, return the output of the command as a string + ''' + if capture is True: + u2_save_stdout = os.dup(sys.stdout.fileno()) + u2_save_stderr = os.dup(sys.stderr.fileno()) + capture_file_name = "u2pyruncapture-{pid}-{random_number}".format(pid = os.getpid(), random_number = random.randrange(10000)) + repstdout = open(capture_file_name, 'w+t',encoding='UTF8', errors="ignore") + os.dup2(repstdout.fileno(), sys.stdout.fileno()) + os.dup2(repstdout.fileno(), sys.stderr.fileno()) + + super().run() + + os.dup2(u2_save_stdout, sys.stdout.fileno()) + os.dup2(u2_save_stderr, sys.stderr.fileno()) + + repstdout.seek(0,0) + cmd_output = repstdout.read() + repstdout.close() + os.close(u2_save_stdout) + os.close(u2_save_stderr) + try: + os.unlink(capture_file_name) + except: + pass + return cmd_output + else: + super().run() + return None + + pass + + + +class File(_File): + __doc__ = _File.__doc__ + + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + def read(self, *args): + return DynArray(super().read(*args)) + read.__doc__ = _File.read.__doc__ + + def readv(self, *args): + return DynArray(super().readv(*args)) + readv.__doc__ = _File.readv.__doc__ + + def readnamedfields(self, *args): + return DynArray(super().readnamedfields(*args)) + readnamedfields.__doc__ = _File.readnamedfields.__doc__ + + pass + + + +class List(_List): + __doc__ = _List.__doc__ + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + def readlist(self): + return DynArray(super().readlist()) + readlist.__doc__ = _List.readlist.__doc__ + + def next(self): + return DynArray(super().next()) + next.__doc__ = _List.next.__doc__ + + def __next__(self): + return DynArray(super().__next__()) + + pass + + + +class Subroutine(_Subroutine): + __doc__ = _Subroutine.__doc__ + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + def call(self): + super().call() + for i, arg in enumerate(self.args): + newarg = DynArray(arg) + self.args[i] = newarg + call.__doc__ = _Subroutine.call.__doc__ + + pass + + +class DynArray(_DynArray): + __doc__ = _DynArray.__doc__ + + def __init__(self, *args): + if len(args) == 1 and isinstance(args[0], list): + self._normalize_list(args[0]) + return + else: + super().__init__(*args) + + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + def extract(self, *args, **kwargs): + return DynArray(super().extract(*args, **kwargs)) + extract.__doc__ = _DynArray.extract.__doc__ + + def field(self, *args, **kwargs): + return DynArray(super().field(*args, **kwargs)) + field.__doc__ = _DynArray.field.__doc__ + + def format(self, *args): + return DynArray(super().format(*args)) + format.__doc__ = _DynArray.format.__doc__ + + def iconv(self, *args): + return DynArray(super().iconv(*args)) + iconv.__doc__ = _DynArray.iconv.__doc__ + + def oconv(self, *args): + return DynArray(super().oconv(*args)) + oconv.__doc__ = _DynArray.oconv.__doc__ + + def to_list(self): + ''' + to_list() ->a Python list object -- split the DynArray object into a Python list object using FM/VM/SM as delimiters + ''' + + if len(self._bytes) == 0: + return [] + + nested_list = self._bytes.split(FM) + for i, f in enumerate(nested_list): + if f.find(VM) == -1 and f.find(SM) == -1: + pass + else: + nested_list[i] = f.split(VM) + for j, v in enumerate(nested_list[i]): + if v.find(SM) == -1: + pass + else: + nested_list[i][j] = v.split(SM) + + + _stringize_list(nested_list) + + return nested_list + + def _normalize_list(self, l): + barray = bytearray() + _bytesize_list(barray, l, mark_idx = 1) + self._bytes = bytes(barray) + + + +# +# internal helper functions +# + +def _stringize_list(l): + for i, f in enumerate(l): + if isinstance(f, list): + _stringize_list(f) + else: + l[i] = str(f, config.encoding) + +def _bytesize_item(barray, i, mark_idx): + if isinstance(i, list): + _bytesize_list(barray, i, mark_idx) + else: + barray.extend(_bytesize(i)) + +def _bytesize_list(barray, l, mark_idx): + if l == []: + return + + mark = _B_MARKS[mark_idx] + for i in l[:-1]: + _bytesize_item(barray, i, mark_idx + 1) + barray.extend(mark) + + _bytesize_item(barray, l[-1], mark_idx + 1) + +def _bytesize(v): + if v is None: + return b'' + + if isinstance(v, bytes) or isinstance(v, bytearray): + return bytes(v) + else: + return bytes(str(v), config.encoding, errors = 'replace') + +def _repr_wrapper(__class__, self): + return super(__class__, self).__repr__().replace("_u2py._", "u2py.", 1) + + +# +# internal module level constants +# +_B_MARKS=[IM, FM, VM, SM, TM] diff --git a/U2/u2py.py/UniVerse/u2py.py b/U2/u2py.py/UniVerse/u2py.py new file mode 100644 index 0000000..6fc1ac1 --- /dev/null +++ b/U2/u2py.py/UniVerse/u2py.py @@ -0,0 +1,256 @@ +# +# Rocket Software Confidential +# OCO Source Materials +# Copyright (C) Rocket Software. 2016, 2016 +# + +"""U2 module for Python, it allows Python developers to +1) call U2BASIC catalogued subroutines +2) run U2 ECL/TCL commands +3) read/write U2 files +4) handle U2 dynamic arrays +5) manage U2 SELECT list +6) control U2 transactions +""" +import os, sys +import random +from _u2py import * +from _u2py import call as _call +from _u2py import _File, _Command, _List, _Subroutine, _DynArray + + +# +# public functions +# +def run(cmdtext, capture=False): + ''' + run(cmdtext, [capture=False]) -> run a TCL/ECL command + if capture is True, return the output of the command as a string + ''' + cmd = Command(cmdtext) + if capture is True: + return cmd.run(True) + else: + cmd.run() + return None + + +def call(subname, *args): + newargs = _call(subname, *args) + for i, arg in enumerate(newargs): + newarg = DynArray(arg) + newargs[i] = newarg + return newargs +call.__doc__ = _call.__doc__ + +# +# public classes +# +class Command(_Command): + __doc__ = _Command.__doc__ + + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + + def run(self, capture=False): + ''' + run([capture=False]) -> None -- run the TCL/ECL command, similar to running a command from U2 prompt + if capture is True, return the output of the command as a string + ''' + if capture is True: + u2_save_stdout = os.dup(sys.stdout.fileno()) + u2_save_stderr = os.dup(sys.stderr.fileno()) + capture_file_name = "u2pyruncapture-{pid}-{random_number}".format(pid = os.getpid(), random_number = random.randrange(10000)) + repstdout = open(capture_file_name, 'w+t') + os.dup2(repstdout.fileno(), sys.stdout.fileno()) + os.dup2(repstdout.fileno(), sys.stderr.fileno()) + + super().run() + + os.dup2(u2_save_stdout, sys.stdout.fileno()) + os.dup2(u2_save_stderr, sys.stderr.fileno()) + + repstdout.seek(0,0) + cmd_output = repstdout.read() + repstdout.close() + try: + os.unlink(capture_file_name) + except: + pass + return cmd_output + else: + super().run() + return None + + pass + + + +class File(_File): + __doc__ = _File.__doc__ + + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + def read(self, *args): + return DynArray(super().read(*args)) + read.__doc__ = _File.read.__doc__ + + def readv(self, *args): + return DynArray(super().readv(*args)) + readv.__doc__ = _File.readv.__doc__ + + def readnamedfields(self, *args): + return DynArray(super().readnamedfields(*args)) + readnamedfields.__doc__ = _File.readnamedfields.__doc__ + + pass + + + +class List(_List): + __doc__ = _List.__doc__ + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + def readlist(self): + return DynArray(super().readlist()) + readlist.__doc__ = _List.readlist.__doc__ + + def next(self): + return DynArray(super().next()) + next.__doc__ = _List.next.__doc__ + + def __next__(self): + return DynArray(super().__next__()) + + pass + + + +class Subroutine(_Subroutine): + __doc__ = _Subroutine.__doc__ + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + def call(self): + super().call() + for i, arg in enumerate(self.args): + newarg = DynArray(arg) + self.args[i] = newarg + call.__doc__ = _Subroutine.call.__doc__ + + pass + + +class DynArray(_DynArray): + __doc__ = _DynArray.__doc__ + + def __init__(self, *args): + if len(args) == 1 and isinstance(args[0], list): + self._normalize_list(args[0]) + return + else: + super().__init__(*args) + + def __repr__(self): + return _repr_wrapper(self.__class__, self) + + def extract(self, *args, **kwargs): + return DynArray(super().extract(*args, **kwargs)) + extract.__doc__ = _DynArray.extract.__doc__ + + def field(self, *args, **kwargs): + return DynArray(super().field(*args, **kwargs)) + field.__doc__ = _DynArray.field.__doc__ + + def format(self, *args): + return DynArray(super().format(*args)) + format.__doc__ = _DynArray.format.__doc__ + + def iconv(self, *args): + return DynArray(super().iconv(*args)) + iconv.__doc__ = _DynArray.iconv.__doc__ + + def oconv(self, *args): + return DynArray(super().oconv(*args)) + oconv.__doc__ = _DynArray.oconv.__doc__ + + def to_list(self): + ''' + to_list() ->a Python list object -- split the DynArray object into a Python list object using FM/VM/SM as delimiters + ''' + + if len(self._bytes) == 0: + return [] + + nested_list = self._bytes.split(FM) + for i, f in enumerate(nested_list): + if f.find(VM) == -1 and f.find(SM) == -1: + pass + else: + nested_list[i] = f.split(VM) + for j, v in enumerate(nested_list[i]): + if v.find(SM) == -1: + pass + else: + nested_list[i][j] = v.split(SM) + + + _stringize_list(nested_list) + + return nested_list + + def _normalize_list(self, l): + barray = bytearray() + _bytesize_list(barray, l, mark_idx = 1) + self._bytes = bytes(barray) + + + +# +# internal helper functions +# + +def _stringize_list(l): + for i, f in enumerate(l): + if isinstance(f, list): + _stringize_list(f) + else: + l[i] = str(f, config.encoding) + +def _bytesize_item(barray, i, mark_idx): + if isinstance(i, list): + _bytesize_list(barray, i, mark_idx) + else: + barray.extend(_bytesize(i)) + +def _bytesize_list(barray, l, mark_idx): + if l == []: + return + + mark = _B_MARKS[mark_idx] + for i in l[:-1]: + _bytesize_item(barray, i, mark_idx + 1) + barray.extend(mark) + + _bytesize_item(barray, l[-1], mark_idx + 1) + +def _bytesize(v): + if v is None: + return b'' + + if isinstance(v, bytes) or isinstance(v, bytearray): + return bytes(v) + else: + return bytes(str(v), config.encoding, errors = 'replace') + +def _repr_wrapper(__class__, self): + return super(__class__, self).__repr__().replace("_u2py._", "u2py.", 1) + + +# +# internal module level constants +# +_B_MARKS=[IM, FM, VM, SM, TM] \ No newline at end of file