Skip to content

V1

OPCUAConnectionManager

Thread-safe connection manager for OPC UA clients with connection pooling and circuit breaker pattern.

Maintains a pool of connections keyed by (url, user_name) to avoid creating new connections for every write operation. Uses circuit breaker to fail fast when servers are unreachable.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
class OPCUAConnectionManager:
    """
    Thread-safe connection manager for OPC UA clients with connection pooling
    and circuit breaker pattern.

    Maintains a pool of connections keyed by (url, user_name) to avoid creating
    new connections for every write operation. Uses circuit breaker to fail fast
    when servers are unreachable.

    Locking model:
        * ``_lock`` (class level) guards creation of the singleton instance.
        * ``_global_lock`` guards the per-key lock table, creation of the
          shared ThreadLoop and the pool-wide operations ``close_all`` and
          ``get_pool_stats``.
        * Each pool key has its own lock that serializes connect / disconnect
          for that key, so a slow server only blocks its own callers.
    """

    _instance: Optional["OPCUAConnectionManager"] = None
    _lock = threading.Lock()

    # Circuit breaker: how long to wait before trying a failed server again
    CIRCUIT_BREAKER_TIMEOUT_SECONDS = 2.0

    def __new__(cls) -> "OPCUAConnectionManager":
        """Singleton pattern to ensure one connection manager across the application."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set the flag BEFORE publishing the instance: another
                    # thread may read cls._instance without the lock and go
                    # straight into __init__, which reads _initialized.
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        # __init__ runs on every OPCUAConnectionManager() call; only the first
        # call may set up state, otherwise the pool would be wiped.
        if self._initialized:
            return
        self._connections: Dict[str, Client] = {}
        self._connection_locks: Dict[str, threading.Lock] = {}
        self._connection_metadata: Dict[str, dict] = {}
        self._connection_failures: Dict[str, float] = (
            {}
        )  # key -> timestamp of last failure
        self._global_lock = threading.Lock()
        self._tloop: Optional[ThreadLoop] = None
        self._initialized = True
        logger.debug("OPC UA Connection Manager initialized")

    def _get_tloop(self) -> ThreadLoop:
        """
        Get or create the shared ThreadLoop for all clients.

        Creation is serialized on ``_global_lock``: callers hold only their own
        per-key lock, so without it two keys connecting at the same time could
        each start a ThreadLoop and one of them would be leaked.
        """
        with self._global_lock:
            if self._tloop is None or not self._tloop.is_alive():
                logger.debug("OPC UA Connection Manager creating shared ThreadLoop")
                self._tloop = ThreadLoop(timeout=120)
                self._tloop.start()
            return self._tloop

    def _stop_tloop(self) -> None:
        """
        Stop the shared ThreadLoop if it exists.

        Errors while stopping are logged at debug level and swallowed. The
        caller is expected to hold ``_global_lock`` (``close_all`` does).
        """
        if self._tloop is not None and self._tloop.is_alive():
            logger.debug("OPC UA Connection Manager stopping shared ThreadLoop")
            try:
                self._tloop.loop.call_soon_threadsafe(self._tloop.loop.stop)
                self._tloop.join(timeout=2.0)
            except Exception as exc:
                logger.debug(f"OPC UA Connection Manager ThreadLoop stop error: {exc}")
            self._tloop = None

    def _get_connection_key(self, url: str, user_name: Optional[str]) -> str:
        """Generate a unique key for connection pooling (``"<url>|<user_name>"``)."""
        return f"{url}|{user_name or ''}"

    def _get_connection_lock(self, key: str) -> threading.Lock:
        """Get or create a lock for a specific connection."""
        with self._global_lock:
            if key not in self._connection_locks:
                self._connection_locks[key] = threading.Lock()
            return self._connection_locks[key]

    def _create_client(
        self,
        url: str,
        user_name: Optional[str],
        password: Optional[str],
        timeout: int,
    ) -> Client:
        """
        Create and configure a new OPC UA client using the shared ThreadLoop.

        Credentials are applied only when both ``user_name`` and ``password``
        are non-empty. The returned client is not connected yet.
        """
        logger.debug(f"OPC UA Connection Manager creating client for {url}")
        tloop = self._get_tloop()
        client = Client(url=url, tloop=tloop, sync_wrapper_timeout=timeout)
        if user_name and password:
            client.set_user(user_name)
            client.set_password(password)
        return client

    def _connect_with_retry(
        self,
        client: Client,
        url: str,
        max_retries: int = 3,
        base_backoff: float = 1.0,
    ) -> None:
        """
        Connect to OPC UA server with retry logic and exponential backoff.

        Args:
            client: The OPC UA client to connect
            url: Server URL (for logging)
            max_retries: Maximum number of connection attempts; values below 1
                are treated as 1 so that at least one attempt is always made
            base_backoff: Base delay between retries (seconds), doubles each retry

        Raises:
            Exception: If all connection attempts fail. The message is prefixed
                with ``AUTH ERROR``, ``NETWORK ERROR`` or ``CONNECTION ERROR``.
        """
        # Guard against max_retries <= 0, which would otherwise skip the loop
        # and report a failure without ever contacting the server.
        attempts = max(1, max_retries)
        last_exception = None

        for attempt in range(attempts):
            try:
                logger.debug(
                    f"OPC UA Connection Manager connecting to {url} "
                    f"(attempt {attempt + 1}/{attempts})"
                )
                client.connect()
                logger.info(
                    f"OPC UA Connection Manager successfully connected to {url}"
                )
                return
            except BadUserAccessDenied as exc:
                # Auth errors should not be retried - they will keep failing
                logger.error(f"OPC UA Connection Manager authentication failed: {exc}")
                raise Exception(f"AUTH ERROR: {exc}")
            except OSError as exc:
                last_exception = exc
                logger.warning(
                    f"OPC UA Connection Manager network error on attempt {attempt + 1}: {exc}"
                )
            except Exception as exc:
                last_exception = exc
                logger.warning(
                    f"OPC UA Connection Manager connection error on attempt {attempt + 1}: "
                    f"{type(exc).__name__}: {exc}"
                )

            # Don't sleep after the last attempt
            if attempt < attempts - 1:
                backoff_time = base_backoff * (2**attempt)
                logger.debug(
                    f"OPC UA Connection Manager waiting {backoff_time}s before retry"
                )
                time.sleep(backoff_time)

        # All retries exhausted
        logger.error(
            f"OPC UA Connection Manager failed to connect to {url} "
            f"after {attempts} attempts"
        )
        if isinstance(last_exception, OSError):
            raise Exception(
                f"NETWORK ERROR: Failed to connect after {attempts} attempts. Last error: {last_exception}"
            )
        raise Exception(
            f"CONNECTION ERROR: Failed to connect after {attempts} attempts. Last error: {last_exception}"
        )

    def _is_circuit_open(self, key: str) -> bool:
        """
        Check if circuit breaker is open (server recently failed).
        Returns True if we should NOT attempt connection (fail fast).

        An expired failure record is removed as a side effect.
        """
        failed_at = self._connection_failures.get(key)
        if failed_at is None:
            return False

        if time.time() - failed_at < self.CIRCUIT_BREAKER_TIMEOUT_SECONDS:
            return True

        # Timeout expired, clear the failure record
        self._connection_failures.pop(key, None)
        return False

    def _record_failure(self, key: str) -> None:
        """Record a connection failure for circuit breaker."""
        self._connection_failures[key] = time.time()

    def _clear_failure(self, key: str) -> None:
        """Clear failure record after successful connection."""
        self._connection_failures.pop(key, None)

    def get_connection(
        self,
        url: str,
        user_name: Optional[str],
        password: Optional[str],
        timeout: int,
        max_retries: int = 1,
        base_backoff: float = 0.0,
    ) -> Client:
        """
        Get a connection from the pool or create a new one.

        This method is thread-safe and will reuse existing connections.
        Uses circuit breaker pattern to fail fast for recently failed servers.
        A pooled client is returned as-is; it is not health-checked here, so
        callers should use ``invalidate_connection`` after an operation fails.

        Args:
            url: OPC UA server URL
            user_name: Optional username for authentication
            password: Optional password for authentication
            timeout: Connection timeout in seconds
            max_retries: Maximum number of connection attempts (default 1)
            base_backoff: Base delay between retries (default 0)

        Returns:
            A connected OPC UA client

        Raises:
            Exception: If connection fails or circuit breaker is open
        """
        key = self._get_connection_key(url, user_name)
        lock = self._get_connection_lock(key)

        with lock:
            # Circuit breaker: fail fast if server recently failed
            if self._is_circuit_open(key):
                logger.debug(
                    f"OPC UA Connection Manager circuit breaker open for {url}, "
                    f"failing fast (will retry in {self.CIRCUIT_BREAKER_TIMEOUT_SECONDS}s)"
                )
                raise Exception(
                    f"CIRCUIT OPEN: Server {url} recently failed, skipping connection attempt. "
                    f"Will retry after {self.CIRCUIT_BREAKER_TIMEOUT_SECONDS}s."
                )

            # Check if we have an existing connection
            if key in self._connections:
                logger.debug(f"OPC UA Connection Manager reusing connection for {url}")
                return self._connections[key]

            # Create new connection
            try:
                client = self._create_client(url, user_name, password, timeout)
                self._connect_with_retry(client, url, max_retries, base_backoff)

                # Success - clear any failure record and store in pool
                self._clear_failure(key)
                self._connections[key] = client
                self._connection_metadata[key] = {
                    "url": url,
                    "user_name": user_name,
                    "password": password,
                    "timeout": timeout,
                    "connected_at": datetime.now(),
                }

                return client
            except Exception:
                # Record failure for circuit breaker
                self._record_failure(key)
                raise

    def _safe_disconnect(self, client: Client) -> None:
        """Safely disconnect a client, swallowing any errors."""
        try:
            logger.debug("OPC UA Connection Manager disconnecting client")
            client.disconnect()
        except Exception as exc:
            logger.debug(
                f"OPC UA Connection Manager disconnect error (non-fatal): {exc}"
            )

    def _drop_connection(self, key: str) -> bool:
        """
        Remove the pooled client for ``key`` and disconnect it.

        The caller must hold the per-key lock. ``pop`` is used instead of
        check-then-delete because ``close_all`` clears the pool under a
        different lock and could empty it between the check and the delete.

        Returns:
            True if a client was pooled for ``key`` and has been dropped.
        """
        client = self._connections.pop(key, None)
        if client is None:
            return False
        self._connection_metadata.pop(key, None)
        self._safe_disconnect(client)
        return True

    def release_connection(
        self, url: str, user_name: Optional[str], force_close: bool = False
    ) -> None:
        """
        Release a connection back to the pool.

        By default, connections are kept alive for reuse. Set force_close=True
        to immediately close the connection.

        Args:
            url: OPC UA server URL
            user_name: Optional username used for the connection
            force_close: If True, close the connection instead of keeping it
        """
        if not force_close:
            # Connection stays in pool for reuse
            return

        key = self._get_connection_key(url, user_name)
        lock = self._get_connection_lock(key)

        with lock:
            if self._drop_connection(key):
                logger.debug(f"OPC UA Connection Manager closed connection for {url}")

    def invalidate_connection(self, url: str, user_name: Optional[str]) -> None:
        """
        Invalidate a connection, forcing it to be recreated on next use.

        Call this when a connection error occurs during an operation to ensure
        the next operation gets a fresh connection. Does nothing when no
        connection is pooled for (url, user_name).

        Args:
            url: OPC UA server URL
            user_name: Optional username used for the connection
        """
        key = self._get_connection_key(url, user_name)
        lock = self._get_connection_lock(key)

        with lock:
            if self._drop_connection(key):
                logger.debug(
                    f"OPC UA Connection Manager invalidated connection for {url}"
                )

    def close_all(self) -> None:
        """Close all connections in the pool and stop the shared ThreadLoop."""
        with self._global_lock:
            # Iterate over a snapshot: per-key operations may still remove
            # entries while we are disconnecting.
            for client in list(self._connections.values()):
                self._safe_disconnect(client)
            self._connections.clear()
            self._connection_metadata.clear()
            self._stop_tloop()
            logger.info("OPC UA Connection Manager closed all connections")

    def get_pool_stats(self) -> dict:
        """
        Get statistics about the connection pool.

        Returns:
            A dict with ``total_connections`` (number of pooled clients) and
            ``connections``, a list of ``url`` / ``user_name`` / ``connected_at``
            (ISO 8601 string) entries. Stored passwords are never included.
        """
        with self._global_lock:
            total_connections = len(self._connections)
            # Snapshot first: get_connection inserts metadata under a per-key
            # lock, and iterating the live dict could raise
            # "dictionary changed size during iteration".
            metadata_snapshot = list(self._connection_metadata.values())

        return {
            "total_connections": total_connections,
            "connections": [
                {
                    "url": meta["url"],
                    "user_name": meta["user_name"],
                    "connected_at": meta["connected_at"].isoformat(),
                }
                for meta in metadata_snapshot
            ],
        }

__new__()

Singleton pattern to ensure one connection manager across the application.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
35
36
37
38
39
40
41
42
def __new__(cls) -> "OPCUAConnectionManager":
    """
    Singleton pattern to ensure one connection manager across the application.

    The first call creates the instance under ``cls._lock``; every later call
    returns that same object.
    """
    if cls._instance is None:
        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance between the unlocked check and acquiring the lock.
            if cls._instance is None:
                instance = super().__new__(cls)
                # Set the flag BEFORE publishing the instance. A thread that
                # sees cls._instance on the unlocked path proceeds straight to
                # __init__, which reads _initialized.
                instance._initialized = False
                cls._instance = instance
    return cls._instance

close_all()

Close all connections in the pool and stop the shared ThreadLoop.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
332
333
334
335
336
337
338
339
340
def close_all(self) -> None:
    """
    Close all connections in the pool and stop the shared ThreadLoop.

    Runs entirely under ``_global_lock``. Each client is disconnected via
    ``_safe_disconnect`` before the pool and its metadata are emptied.
    """
    with self._global_lock:
        # Only the clients are needed; iterate over a snapshot of the values
        # so the pool can be mutated safely afterwards.
        for client in list(self._connections.values()):
            self._safe_disconnect(client)
        self._connections.clear()
        self._connection_metadata.clear()
        self._stop_tloop()
        logger.info("OPC UA Connection Manager closed all connections")

get_connection(url, user_name, password, timeout, max_retries=1, base_backoff=0.0)

Get a connection from the pool or create a new one.

This method is thread-safe and will reuse existing healthy connections. Uses circuit breaker pattern to fail fast for recently failed servers.

Parameters:

Name Type Description Default
url str

OPC UA server URL

required
user_name Optional[str]

Optional username for authentication

required
password Optional[str]

Optional password for authentication

required
timeout int

Connection timeout in seconds

required
max_retries int

Maximum number of connection attempts (default 1)

1
base_backoff float

Base delay between retries (default 0)

0.0

Returns:

Type Description
Client

A connected OPC UA client

Raises:

Type Description
Exception

If connection fails or circuit breaker is open

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def get_connection(
    self,
    url: str,
    user_name: Optional[str],
    password: Optional[str],
    timeout: int,
    max_retries: int = 1,
    base_backoff: float = 0.0,
) -> Client:
    """
    Return the pooled client for (url, user_name), connecting first if needed.

    All work happens while holding the lock for this pool key. If the circuit
    breaker is open for the key, the call fails immediately without touching
    the network. Otherwise an already pooled client is returned as-is, or a
    new client is created, connected and stored in the pool.

    Args:
        url: OPC UA server URL
        user_name: Optional username for authentication
        password: Optional password for authentication
        timeout: Connection timeout in seconds
        max_retries: Maximum number of connection attempts (default 1)
        base_backoff: Base delay between retries (default 0)

    Returns:
        A connected OPC UA client

    Raises:
        Exception: If connection fails or circuit breaker is open
    """
    pool_key = self._get_connection_key(url, user_name)

    with self._get_connection_lock(pool_key):
        # A recent failure for this key means we refuse to try again yet.
        if self._is_circuit_open(pool_key):
            logger.debug(
                f"OPC UA Connection Manager circuit breaker open for {url}, "
                f"failing fast (will retry in {self.CIRCUIT_BREAKER_TIMEOUT_SECONDS}s)"
            )
            raise Exception(
                f"CIRCUIT OPEN: Server {url} recently failed, skipping connection attempt. "
                f"Will retry after {self.CIRCUIT_BREAKER_TIMEOUT_SECONDS}s."
            )

        # Pool hit: hand back the stored client.
        if pool_key in self._connections:
            logger.debug(f"OPC UA Connection Manager reusing connection for {url}")
            return self._connections[pool_key]

        # Pool miss: build, connect and register a new client. Any error on
        # this path is recorded for the circuit breaker and then re-raised.
        try:
            fresh_client = self._create_client(url, user_name, password, timeout)
            self._connect_with_retry(fresh_client, url, max_retries, base_backoff)

            self._clear_failure(pool_key)
            self._connections[pool_key] = fresh_client
            self._connection_metadata[pool_key] = dict(
                url=url,
                user_name=user_name,
                password=password,
                timeout=timeout,
                connected_at=datetime.now(),
            )
        except Exception:
            self._record_failure(pool_key)
            raise
        else:
            return fresh_client

get_pool_stats()

Get statistics about the connection pool.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def get_pool_stats(self) -> dict:
    """
    Get statistics about the connection pool.

    Returns:
        A dict with ``total_connections`` (number of pooled clients) and
        ``connections``, a list with one ``url`` / ``user_name`` /
        ``connected_at`` (ISO 8601 string) entry per metadata record.
    """
    with self._global_lock:
        total_connections = len(self._connections)
        # Take a snapshot before iterating: metadata is inserted and removed
        # under per-connection locks, so iterating the live dict could raise
        # "dictionary changed size during iteration".
        metadata_snapshot = list(self._connection_metadata.values())

    return {
        "total_connections": total_connections,
        "connections": [
            {
                "url": meta["url"],
                "user_name": meta["user_name"],
                "connected_at": meta["connected_at"].isoformat(),
            }
            for meta in metadata_snapshot
        ],
    }

invalidate_connection(url, user_name)

Invalidate a connection, forcing it to be recreated on next use.

Call this when a connection error occurs during an operation to ensure the next operation gets a fresh connection.

Parameters:

Name Type Description Default
url str

OPC UA server URL

required
user_name Optional[str]

Optional username used for the connection

required
Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def invalidate_connection(self, url: str, user_name: Optional[str]) -> None:
    """
    Invalidate a connection, forcing it to be recreated on next use.

    Call this when a connection error occurs during an operation to ensure
    the next operation gets a fresh connection. Does nothing when no
    connection is pooled for (url, user_name).

    Args:
        url: OPC UA server URL
        user_name: Optional username used for the connection
    """
    key = self._get_connection_key(url, user_name)

    with self._get_connection_lock(key):
        # pop() rather than check-then-delete: close_all() empties the pool
        # under a different lock, so the entry can vanish between a membership
        # test and the delete, which would raise KeyError.
        client = self._connections.pop(key, None)
        if client is None:
            return
        self._connection_metadata.pop(key, None)
        self._safe_disconnect(client)
        logger.debug(
            f"OPC UA Connection Manager invalidated connection for {url}"
        )

release_connection(url, user_name, force_close=False)

Release a connection back to the pool.

By default, connections are kept alive for reuse. Set force_close=True to immediately close the connection.

Parameters:

Name Type Description Default
url str

OPC UA server URL

required
user_name Optional[str]

Optional username used for the connection

required
force_close bool

If True, close the connection instead of keeping it

False
Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def release_connection(
    self, url: str, user_name: Optional[str], force_close: bool = False
) -> None:
    """
    Release a connection back to the pool.

    By default, connections are kept alive for reuse, so this is a no-op.
    Set force_close=True to immediately close the connection and remove it
    from the pool.

    Args:
        url: OPC UA server URL
        user_name: Optional username used for the connection
        force_close: If True, close the connection instead of keeping it
    """
    if not force_close:
        # Connection stays in pool for reuse
        return

    key = self._get_connection_key(url, user_name)

    with self._get_connection_lock(key):
        # pop() rather than check-then-delete: close_all() empties the pool
        # under a different lock, so the entry can vanish between a membership
        # test and the delete, which would raise KeyError.
        client = self._connections.pop(key, None)
        if client is None:
            return
        self._connection_metadata.pop(key, None)
        self._safe_disconnect(client)
        logger.debug(f"OPC UA Connection Manager closed connection for {url}")

UnsupportedTypeError

Bases: Exception

Raised when an unsupported value type is specified

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
370
371
372
373
class UnsupportedTypeError(Exception):
    """Error signalling that the requested value type is not supported."""

get_available_namespaces(client)

Get list of available namespaces from OPC UA server. Returns the single-element placeholder list ["<unable to fetch namespaces>"] (not an empty list) if unable to fetch namespaces.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
778
779
780
781
782
783
784
785
786
787
788
789
790
def get_available_namespaces(client: Client) -> List[str]:
    """
    Get list of available namespaces from OPC UA server.

    This is a best-effort helper: any failure is logged at INFO level and a
    placeholder is returned instead of raising.

    Args:
        client: OPC UA client to query

    Returns:
        The result of the server's namespace-array query, or the
        single-element placeholder list ``["<unable to fetch namespaces>"]``
        (not an empty list) if the namespaces cannot be fetched.
    """
    try:
        # Wrap the async client method so it can be called synchronously on
        # the given client.
        get_namespace_array = sync_async_client_method(AsyncClient.get_namespace_array)(
            client
        )
        return get_namespace_array()
    except Exception as exc:
        logger.info(f"Failed to get namespace array (non-fatal): {exc}")
        return ["<unable to fetch namespaces>"]

get_connection_manager()

Get the global OPC UA connection manager instance.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
362
363
364
365
366
367
def get_connection_manager() -> OPCUAConnectionManager:
    """
    Return the module-wide OPC UA connection manager.

    The manager is created on first use and cached in the module-level
    ``_connection_manager`` variable for all later calls.
    """
    global _connection_manager
    if _connection_manager is not None:
        return _connection_manager

    _connection_manager = OPCUAConnectionManager()
    return _connection_manager

get_node_data_type(var)

Get the data type of an OPC UA node. Returns a string representation of the type, or "Unknown" if unable to read.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
802
803
804
805
806
807
808
809
810
811
def get_node_data_type(var) -> str:
    """
    Describe the data type of an OPC UA node as a string.

    Calls ``var.read_data_type_as_variant_type()`` and converts the result
    with ``str``. Any error is logged at INFO level and reported as
    "Unknown" instead of being raised.
    """
    try:
        variant_type = var.read_data_type_as_variant_type()
        type_name = str(variant_type)
    except Exception as err:
        logger.info(f"Unable to read node data type: {err}")
        type_name = "Unknown"
    return type_name

opc_connect_and_write_value(url, namespace, user_name, password, object_name, variable_name, value, timeout, node_lookup_mode='hierarchical', value_type='String', max_retries=1, retry_backoff_seconds=0.0)

Connect to OPC UA server and write a value using connection pooling.

Uses the connection manager to reuse existing connections. If no connection exists, attempts to create one. Fails fast on connection errors to avoid blocking the pipeline.

Parameters:

Name Type Description Default
url str

OPC UA server URL

required
namespace str

Namespace URI or index

required
user_name Optional[str]

Optional username for authentication

required
password Optional[str]

Optional password for authentication

required
object_name str

Target object path

required
variable_name str

Variable to write

required
value Union[bool, float, int, str]

Value to write

required
timeout int

Connection timeout in seconds

required
node_lookup_mode Literal['hierarchical', 'direct']

Path lookup strategy ('hierarchical' or 'direct')

'hierarchical'
value_type str

OPC UA data type for the value

'String'
max_retries int

Maximum number of connection attempts (default 1 = no retries)

1
retry_backoff_seconds float

Base delay between retries (default 0 = no delay)

0.0

Returns:

Type Description
Tuple[bool, str]

Tuple of (error_status, message); error_status is False when the value was written successfully and True when the write failed.

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
def opc_connect_and_write_value(
    url: str,
    namespace: str,
    user_name: Optional[str],
    password: Optional[str],
    object_name: str,
    variable_name: str,
    value: Union[bool, float, int, str],
    timeout: int,
    node_lookup_mode: Literal["hierarchical", "direct"] = "hierarchical",
    value_type: str = "String",
    max_retries: int = 1,
    retry_backoff_seconds: float = 0.0,
) -> Tuple[bool, str]:
    """
    Write a single value to an OPC UA server through the shared connection pool.

    A client is obtained from the connection manager (which reuses a pooled
    connection or opens a new one) and the write is delegated to
    ``_opc_write_value``. This function never raises: every exception is
    turned into an error tuple. Unless the exception (or its direct
    ``__cause__``) is one of ``USER_CONFIG_ERROR_TYPES``, the pooled
    connection is invalidated so the next call reconnects.

    Args:
        url: OPC UA server URL
        namespace: Namespace URI or index
        user_name: Optional username for authentication
        password: Optional password for authentication
        object_name: Target object path
        variable_name: Variable to write
        value: Value to write
        timeout: Connection timeout in seconds
        node_lookup_mode: Path lookup strategy ('hierarchical' or 'direct')
        value_type: OPC UA data type for the value
        max_retries: Maximum number of connection attempts (default 1 = no retries)
        retry_backoff_seconds: Base delay between retries (default 0 = no delay)

    Returns:
        Tuple of (error_status, message); error_status is False on success
        and True when the write failed.
    """
    logger.debug(
        f"OPC Writer attempting to write value={value} to {url}/{object_name}/{variable_name}"
    )

    manager = get_connection_manager()

    try:
        # Reuses a pooled client when one exists, otherwise connects.
        pooled_client = manager.get_connection(
            url=url,
            user_name=user_name,
            password=password,
            timeout=timeout,
            max_retries=max_retries,
            base_backoff=retry_backoff_seconds,
        )

        _opc_write_value(
            client=pooled_client,
            namespace=namespace,
            object_name=object_name,
            variable_name=variable_name,
            value=value,
            node_lookup_mode=node_lookup_mode,
            value_type=value_type,
        )

        logger.debug(
            f"OPC Writer successfully wrote value to {url}/{object_name}/{variable_name}"
        )
        return False, "Value set successfully"

    except Exception as exc:
        # A user configuration error may be the exception itself or the
        # explicit cause it was raised from.
        cause = exc.__cause__
        caused_by_user_config = isinstance(exc, USER_CONFIG_ERROR_TYPES) or bool(
            cause and isinstance(cause, USER_CONFIG_ERROR_TYPES)
        )

        if caused_by_user_config:
            # The connection itself is fine, so keep it pooled.
            logger.error(f"OPC Writer configuration error: {type(exc).__name__}: {exc}")
        else:
            logger.warning(
                f"OPC Writer error (invalidating connection): {type(exc).__name__}: {exc}"
            )
            manager.invalidate_connection(url, user_name)

        failure_message = (
            f"Failed to write {value} to {object_name}:{variable_name} in {url}. Error: {exc}"
        )
        return True, failure_message

safe_disconnect(client)

Safely disconnect from OPC UA server, swallowing any errors

Source code in inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
793
794
795
796
797
798
799
def safe_disconnect(client: Client) -> None:
    """
    Disconnect ``client`` from the OPC UA server on a best-effort basis.

    Any exception raised while disconnecting is logged at debug level and
    suppressed, so this function never raises.
    """
    try:
        logger.debug("OPC Writer disconnecting from server")
        client.disconnect()
    except Exception as err:
        logger.debug(f"OPC Writer disconnect error (non-fatal): {err}")
    return None