diff --git a/datacontract/cli.py b/datacontract/cli.py index c390e49cd..e78d18404 100644 --- a/datacontract/cli.py +++ b/datacontract/cli.py @@ -482,13 +482,14 @@ def _get_uvicorn_arguments(port: int, host: str, context: typer.Context) -> dict } # Create a list of the extra arguments, remove the leading -- from the cli arguments - trimmed_keys = list(map(lambda x : str(x).replace("--", ""),context.args[::2])) + trimmed_keys = list(map(lambda x: str(x).replace("--", ""), context.args[::2])) # Merge the two dicts and return them as one dict return default_args | dict(zip(trimmed_keys, context.args[1::2])) + @app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) def api( - ctx: Annotated[typer.Context, typer.Option(help="Extra arguments to pass to uvicorn.run().")], + ctx: Annotated[typer.Context, typer.Option(help="Extra arguments to pass to uvicorn.run().")], port: Annotated[int, typer.Option(help="Bind socket to this port.")] = 4242, host: Annotated[ str, typer.Option(help="Bind socket to this host. Hint: For running in docker, set it to 0.0.0.0") diff --git a/datacontract/export/odcs_v3_exporter.py b/datacontract/export/odcs_v3_exporter.py index 4dfe9e7f9..db3cc5639 100644 --- a/datacontract/export/odcs_v3_exporter.py +++ b/datacontract/export/odcs_v3_exporter.py @@ -146,21 +146,14 @@ def to_odcs_schema(model_key, model_value: Model) -> SchemaObject: if model_value.description is not None: schema_obj.description = model_value.description - properties = to_properties(model_value.fields) - if properties: - schema_obj.properties = properties + # Export fields with custom properties + schema_obj.properties = to_properties(model_value.fields) - model_quality = to_odcs_quality_list(model_value.quality) - if len(model_quality) > 0: - schema_obj.quality = model_quality - - custom_properties = [] - if model_value.model_extra is not None: - for key, value in model_value.model_extra.items(): - custom_properties.append(CustomProperty(property=key, value=value)) - - if len(custom_properties) > 0: - schema_obj.customProperties = custom_properties + # Export custom properties for the model + if model_value.get_all_custom_properties(): + schema_obj.customProperties = [ + CustomProperty(property=key, value=value) for key, value in model_value.get_all_custom_properties().items() + ] return schema_obj @@ -230,89 +223,17 @@ def to_physical_type(config: Dict[str, Any]) -> str | None: def to_property(field_name: str, field: Field) -> SchemaProperty: property = SchemaProperty(name=field_name) - if field.fields: - properties = [] - for field_name_, field_ in field.fields.items(): - property_ = to_property(field_name_, field_) - properties.append(property_) - property.properties = properties - - if field.items: - items = to_property(field_name, field.items) - items.name = None # Clear the name for items - property.items = items - - if field.title is not None: - property.businessName = field.title - if field.type is not None: - property.logicalType = to_logical_type(field.type) - property.physicalType = to_physical_type(field.config) or field.type + property.physicalType = field.type if field.description is not None: property.description = field.description - if field.required is not None: - property.required = field.required - - if field.unique is not None: - property.unique = field.unique - - if field.classification is not None: - property.classification = field.classification - - if field.examples is not None: - property.examples = field.examples.copy() - - if field.example is not None: - property.examples = [field.example] - - if field.primaryKey is not None and field.primaryKey: - property.primaryKey = field.primaryKey - property.primaryKeyPosition = 1 - - if field.primary is not None and field.primary: - property.primaryKey = field.primary - property.primaryKeyPosition = 1 - - custom_properties = [] - if field.model_extra is not None: - for key, value in field.model_extra.items(): - custom_properties.append(CustomProperty(property=key, value=value)) - - if field.pii is not None: - custom_properties.append(CustomProperty(property="pii", value=field.pii)) - - if len(custom_properties) > 0: - property.customProperties = custom_properties - - if field.tags is not None and len(field.tags) > 0: - property.tags = field.tags - - logical_type_options = {} - if field.minLength is not None: - logical_type_options["minLength"] = field.minLength - if field.maxLength is not None: - logical_type_options["maxLength"] = field.maxLength - if field.pattern is not None: - logical_type_options["pattern"] = field.pattern - if field.minimum is not None: - logical_type_options["minimum"] = field.minimum - if field.maximum is not None: - logical_type_options["maximum"] = field.maximum - if field.exclusiveMinimum is not None: - logical_type_options["exclusiveMinimum"] = field.exclusiveMinimum - if field.exclusiveMaximum is not None: - logical_type_options["exclusiveMaximum"] = field.exclusiveMaximum - - if logical_type_options: - property.logicalTypeOptions = logical_type_options - - if field.quality is not None: - quality_list = field.quality - quality_property = to_odcs_quality_list(quality_list) - if len(quality_property) > 0: - property.quality = quality_property + # Export custom properties for the field + if field.get_all_custom_properties(): + property.customProperties = [ + CustomProperty(property=key, value=value) for key, value in field.get_all_custom_properties().items() + ] return property diff --git a/datacontract/imports/odcs_v3_importer.py b/datacontract/imports/odcs_v3_importer.py index a1420d48a..012ded050 100644 --- a/datacontract/imports/odcs_v3_importer.py +++ b/datacontract/imports/odcs_v3_importer.py @@ -199,17 +199,21 @@ def import_models(odcs: Any) -> Dict[str, Model]: result = {} for odcs_schema in odcs_schemas: - schema_name = odcs_schema.name - schema_physical_name = odcs_schema.physicalName - schema_description = odcs_schema.description if odcs_schema.description is not None else "" - model_name = schema_physical_name if schema_physical_name is not None else schema_name - model = Model(description=" ".join(schema_description.splitlines()) if schema_description else "", type="table") - model.fields = import_fields(odcs_schema.properties, custom_type_mappings, server_type=get_server_type(odcs)) - if odcs_schema.quality is not None: - model.quality = convert_quality_list(odcs_schema.quality) - model.title = schema_name - if odcs_schema.dataGranularityDescription is not None: - model.config = {"dataGranularityDescription": odcs_schema.dataGranularityDescription} + model_name = odcs_schema.name + model = Model( + description=odcs_schema.description, + type=odcs_schema.physicalType, + title=odcs_schema.name, + ) + + # Import fields with custom properties + model.fields = import_fields(odcs_schema.properties, custom_type_mappings, server_type=None) + + # Import custom properties for the model + if odcs_schema.customProperties is not None: + for custom_property in odcs_schema.customProperties: + model.add_custom_property(custom_property.property, custom_property.value) + result[model_name] = model return result @@ -319,59 +323,24 @@ def has_composite_primary_key(odcs_properties: List[SchemaProperty]) -> bool: def import_fields( odcs_properties: List[SchemaProperty], custom_type_mappings: Dict[str, str], server_type ) -> Dict[str, Field]: - logger = logging.getLogger(__name__) result = {} if odcs_properties is None: return result for odcs_property in odcs_properties: - mapped_type = map_type(odcs_property.logicalType, custom_type_mappings) - if mapped_type is not None: - property_name = odcs_property.name - description = odcs_property.description if odcs_property.description is not None else None - field = Field( - description=" ".join(description.splitlines()) if description is not None else None, - type=mapped_type, - title=odcs_property.businessName, - required=odcs_property.required if odcs_property.required is not None else None, - primaryKey=odcs_property.primaryKey - if not has_composite_primary_key(odcs_properties) and odcs_property.primaryKey is not None - else False, - unique=odcs_property.unique if odcs_property.unique else None, - examples=odcs_property.examples if odcs_property.examples is not None else None, - classification=odcs_property.classification if odcs_property.classification is not None else None, - tags=odcs_property.tags if odcs_property.tags is not None else None, - quality=convert_quality_list(odcs_property.quality), - fields=import_fields(odcs_property.properties, custom_type_mappings, server_type) - if odcs_property.properties is not None - else {}, - config=import_field_config(odcs_property, server_type), - format=getattr(odcs_property, "format", None), - ) - # mapped_type is array - if field.type == "array" and odcs_property.items is not None: - # nested array object - if odcs_property.items.logicalType == "object": - field.items = Field( - type="object", - fields=import_fields(odcs_property.items.properties, custom_type_mappings, server_type), - ) - # array of simple type - elif odcs_property.items.logicalType is not None: - field.items = Field(type=odcs_property.items.logicalType) + field = Field( + name=odcs_property.name, + type=odcs_property.physicalType, + description=odcs_property.description, + ) - # enum from quality validValues as enum - if field.type == "string": - for q in field.quality: - if hasattr(q, "validValues"): - field.enum = q.validValues + # Import custom properties for the field + if odcs_property.customProperties is not None: + for custom_property in odcs_property.customProperties: + field.add_custom_property(custom_property.property, custom_property.value) - result[property_name] = field - else: - logger.info( - f"Can't map {odcs_property.name} to the Datacontract Mapping types, as there is no equivalent or special mapping. Consider introducing a customProperty 'dc_mapping_{odcs_property.logicalType}' that defines your expected type as the 'value'" - ) + result[odcs_property.name] = field return result