Skip to content

Commit d0b6234

Browse files
committed
update code
1 parent 6f2b021 commit d0b6234

File tree

58 files changed

+626
-509
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+626
-509
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
libsasl2-dev
3535
3636
- name: Cache Cargo
37-
uses: Swatinem/rust-cache@2
37+
uses: Swatinem/rust-cache@v2
3838
with:
3939
shared-key: "linux-build"
4040

cli/cli/src/repl.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -343,11 +343,7 @@ impl Repl {
343343
.set_header(
344344
vec!["Command", "Usage"]
345345
.into_iter()
346-
.map(|s| {
347-
Cell::new(s)
348-
.fg(Color::Cyan)
349-
.add_attribute(Attribute::Bold)
350-
})
346+
.map(|s| Cell::new(s).fg(Color::Cyan).add_attribute(Attribute::Bold))
351347
.collect::<Vec<_>>(),
352348
)
353349
.add_row(vec!["HELP", "Show this message"])

python/functionstream-api/src/fs_api/context.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,24 @@
2222

2323
class Context(abc.ABC):
2424
"""Context object"""
25-
25+
2626
@abc.abstractmethod
2727
def emit(self, data: bytes, channel: int = 0):
2828
pass
29-
29+
3030
@abc.abstractmethod
3131
def emit_watermark(self, watermark: int, channel: int = 0):
3232
pass
33-
33+
3434
@abc.abstractmethod
3535
def getOrCreateKVStore(self, name: str) -> KvStore:
3636
pass
37-
37+
3838
@abc.abstractmethod
3939
def getConfig(self) -> Dict[str, str]:
4040
"""
4141
Get global configuration Map
42-
42+
4343
Returns:
4444
Dict[str, str]: Configuration dictionary
4545
"""

python/functionstream-api/src/fs_api/driver.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def init(self, ctx: Context, config: dict):
2222
@abc.abstractmethod
2323
def process(self, ctx: Context, source_id: int, data: bytes):
2424
pass
25-
25+
2626
@abc.abstractmethod
2727
def process_watermark(self, ctx: Context, source_id: int, watermark: int):
2828
pass
@@ -34,7 +34,7 @@ def take_checkpoint(self, ctx: Context, checkpoint_id: int):
3434
@abc.abstractmethod
3535
def check_heartbeat(self, ctx: Context) -> bool:
3636
pass
37-
37+
3838
@abc.abstractmethod
3939
def close(self, ctx: Context):
4040
pass

python/functionstream-api/src/fs_api/store/iterator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717

1818
class KvIterator(abc.ABC):
19-
19+
2020
@abc.abstractmethod
2121
def has_next(self) -> bool:
2222
pass
23-
23+
2424
@abc.abstractmethod
2525
def next(self) -> Optional[Tuple[bytes, bytes]]:
2626
pass

python/functionstream-api/src/fs_api/store/store.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,49 +23,49 @@
2323

2424

2525
class KvStore(abc.ABC):
26-
26+
2727
@abc.abstractmethod
2828
def put_state(self, key: bytes, value: bytes):
2929
pass
30-
30+
3131
@abc.abstractmethod
3232
def get_state(self, key: bytes) -> Optional[bytes]:
3333
pass
34-
34+
3535
@abc.abstractmethod
3636
def delete_state(self, key: bytes):
3737
pass
38-
38+
3939
@abc.abstractmethod
4040
def list_states(self, start_inclusive: bytes, end_exclusive: bytes) -> List[bytes]:
4141
pass
42-
42+
4343
@abc.abstractmethod
4444
def put(self, key: ComplexKey, value: bytes):
4545
pass
46-
46+
4747
@abc.abstractmethod
4848
def get(self, key: ComplexKey) -> Optional[bytes]:
4949
pass
50-
50+
5151
@abc.abstractmethod
5252
def delete(self, key: ComplexKey):
5353
pass
54-
54+
5555
@abc.abstractmethod
5656
def merge(self, key: ComplexKey, value: bytes):
5757
pass
58-
58+
5959
@abc.abstractmethod
6060
def delete_prefix(self, key: ComplexKey):
6161
pass
62-
62+
6363
@abc.abstractmethod
64-
def list_complex(self, key_group: bytes, key: bytes, namespace: bytes,
64+
def list_complex(self, key_group: bytes, key: bytes, namespace: bytes,
6565
start_inclusive: bytes, end_exclusive: bytes) -> List[bytes]:
6666
pass
6767

68-
68+
6969
@abc.abstractmethod
7070
def scan_complex(self, key_group: bytes, key: bytes, namespace: bytes) -> KvIterator:
7171
pass

python/functionstream-client/src/fs_client/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,4 +503,4 @@ def __enter__(self):
503503
return self
504504

505505
def __exit__(self, exc_type, exc_val, exc_tb):
506-
self.close()
506+
self.close()

python/functionstream-client/src/fs_client/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,4 @@ def _convert_grpc_error(e: grpc.RpcError) -> FsError:
137137
message = f"{details} (gRPC code: {code.name if code else 'NONE'})"
138138

139139
# 3. Return the instance (caller should raise it)
140-
return exception_cls(message, grpc_code=code)
140+
return exception_cls(message, grpc_code=code)

python/functionstream-runtime/build.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,4 @@ def main():
273273

274274

275275
if __name__ == "__main__":
276-
main()
276+
main()

python/functionstream-runtime/src/fs_runtime/runner.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def fs_exec(class_name: str, modules: List[Tuple[str, bytes]]) -> None:
3232
global _DRIVER
3333

3434
try:
35-
35+
3636
# Load all modules in order
3737
loaded_modules = {}
3838
for module_name, module_bytes in modules:
@@ -49,19 +49,19 @@ def fs_exec(class_name: str, modules: List[Tuple[str, bytes]]) -> None:
4949

5050
# Create the module
5151
module = importlib.util.module_from_spec(spec)
52-
52+
5353
# Execute the module source code
5454
code = compile(module_source, f"<{module_name}>", "exec")
5555
exec(code, module.__dict__)
56-
56+
5757
# Add the module to sys.modules
5858
sys.modules[module_name] = module
5959
loaded_modules[module_name] = module
60-
60+
6161
# Try to find the class in all loaded modules
6262
ProcessorClass = None
6363
found_in_module = None
64-
64+
6565
# First, try to find in the last module (most likely location)
6666
if modules:
6767
last_module_name = modules[-1][0]
@@ -70,103 +70,103 @@ def fs_exec(class_name: str, modules: List[Tuple[str, bytes]]) -> None:
7070
if hasattr(module, class_name):
7171
ProcessorClass = getattr(module, class_name)
7272
found_in_module = last_module_name
73-
73+
7474
# If not found, search in all modules
7575
if ProcessorClass is None:
7676
for module_name, module in loaded_modules.items():
7777
if hasattr(module, class_name):
7878
ProcessorClass = getattr(module, class_name)
7979
found_in_module = module_name
8080
break
81-
81+
8282
if ProcessorClass is None:
8383
module_names = [name for name, _ in modules]
8484
raise RuntimeError(f"Class '{class_name}' not found in any of the loaded modules: {module_names}")
85-
85+
8686
if not issubclass(ProcessorClass, FSProcessorDriver):
8787
raise TypeError(f"Class '{class_name}' must be a subclass of FSProcessorDriver")
8888

8989
_DRIVER = ProcessorClass()
90-
90+
9191
except Exception as e:
9292
raise RuntimeError(f"Failed to load class '{class_name}' from modules: {e}") from e
9393

9494
class WitWorld:
95-
95+
9696
def fs_init(self, config: List[Tuple[str, str]]) -> None:
9797
global _DRIVER, _CONTEXT
98-
98+
9999
config_dict = convert_config_to_dict(config)
100-
100+
101101
_CONTEXT = WitContext(config_dict)
102-
102+
103103
if _DRIVER:
104104
try:
105105
_DRIVER.init(_CONTEXT, _CONTEXT._CONFIG)
106106
except Exception:
107107
pass
108-
108+
109109
def fs_process(self, source_id: int, data: bytes) -> None:
110110
global _DRIVER, _CONTEXT
111111
if not _DRIVER or not _CONTEXT:
112112
return
113-
113+
114114
try:
115115
_DRIVER.process(_CONTEXT, source_id, data)
116116
except Exception:
117117
pass
118-
118+
119119
def fs_process_watermark(self, source_id: int, watermark: int) -> None:
120120
global _DRIVER, _CONTEXT
121121
if not _DRIVER or not _CONTEXT:
122122
return
123-
123+
124124
try:
125125
_DRIVER.process_watermark(_CONTEXT, source_id, watermark)
126126
except Exception:
127127
pass
128-
128+
129129
def fs_take_checkpoint(self, checkpoint_id: int) -> None:
130130
global _DRIVER, _CONTEXT
131131
if not _DRIVER or not _CONTEXT:
132132
return
133-
133+
134134
try:
135135
_DRIVER.take_checkpoint(_CONTEXT, checkpoint_id)
136136
except Exception:
137137
pass
138-
138+
139139
def fs_check_heartbeat(self) -> bool:
140140
global _DRIVER, _CONTEXT
141141
if not _DRIVER or not _CONTEXT:
142142
return False
143-
143+
144144
try:
145145
return _DRIVER.check_heartbeat(_CONTEXT)
146146
except Exception:
147147
return False
148-
148+
149149
def fs_close(self) -> None:
150150
global _DRIVER, _CONTEXT
151-
151+
152152
if _DRIVER and _CONTEXT:
153153
try:
154154
_DRIVER.close(_CONTEXT)
155155
except Exception:
156156
pass
157-
157+
158158
_DRIVER = None
159159
_CONTEXT = None
160-
160+
161161
def fs_exec(self, class_name: str, modules: List[Tuple[str, bytes]]) -> None:
162162
fs_exec(class_name, modules)
163-
163+
164164
def fs_custom(self, payload: bytes) -> bytes:
165165
global _DRIVER, _CONTEXT
166-
166+
167167
if not _DRIVER or not _CONTEXT:
168168
raise RuntimeError("Driver or Context not initialized")
169-
169+
170170
return _DRIVER.custom(payload)
171171

172172

0 commit comments

Comments
 (0)