Skip to content

V1

UnsupportedTypeError

Bases: Exception

Raised when an unsupported value type is specified

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
18
19
20
21
class UnsupportedTypeError(Exception):
    """Error signalling that the requested value type is not supported."""

get_available_namespaces(client)

Get list of available namespaces from OPC UA server. Returns a single-element placeholder list (`["<unable to fetch namespaces>"]`) if unable to fetch namespaces.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
382
383
384
385
386
387
388
389
390
391
392
393
394
def get_available_namespaces(client: Client) -> List[str]:
    """
    Get the list of available namespaces from the OPC UA server.

    This is a best-effort lookup: any exception raised while fetching the
    namespace array is logged at INFO level and swallowed.

    Args:
        client: OPC UA client used to query the server.

    Returns:
        The namespace array reported by the server. If it cannot be fetched,
        a single-element list containing the placeholder string
        "<unable to fetch namespaces>" is returned. Note that this is NOT an
        empty list, so callers must not use emptiness to detect failure.
    """
    try:
        # Wrap the async client method so it can be invoked synchronously
        # against the given client.
        get_namespace_array = sync_async_client_method(AsyncClient.get_namespace_array)(
            client
        )
        return get_namespace_array()
    except Exception as exc:
        logger.info(f"Failed to get namespace array (non-fatal): {exc}")
        # Placeholder keeps the result non-empty and printable
        # (presumably for diagnostics/error messages; confirm against callers).
        return ["<unable to fetch namespaces>"]

get_node_data_type(var)

Get the data type of an OPC UA node. Returns a string representation of the type, or "Unknown" if unable to read.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
406
407
408
409
410
411
412
413
414
415
def get_node_data_type(var) -> str:
    """
    Describe the data type of an OPC UA node as text.

    Args:
        var: Node object exposing ``read_data_type_as_variant_type()``.

    Returns:
        ``str()`` of the value returned by the node's
        ``read_data_type_as_variant_type()`` call, or the literal "Unknown"
        when reading or converting it raises any exception (the failure is
        logged at INFO level).
    """
    try:
        variant_type = var.read_data_type_as_variant_type()
        # Conversion stays inside the try so a failing __str__ is also covered.
        type_name = str(variant_type)
    except Exception as read_error:
        logger.info(f"Unable to read node data type: {read_error}")
        return "Unknown"
    return type_name

safe_disconnect(client)

Safely disconnect from OPC UA server, swallowing any errors

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
397
398
399
400
401
402
403
def safe_disconnect(client: Client) -> None:
    """
    Disconnect from the OPC UA server on a best-effort basis.

    Any exception raised while disconnecting is logged at DEBUG level and
    suppressed, so this function never propagates a disconnect failure.

    Args:
        client: OPC UA client to disconnect.
    """
    try:
        logger.debug("OPC Writer disconnecting from server")
        client.disconnect()
    except Exception as disconnect_error:
        failure_message = (
            f"OPC Writer disconnect error (non-fatal): {disconnect_error}"
        )
        logger.debug(failure_message)