Coverage for python/lsst/resources/davutils.py: 24%
1099 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 08:30 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 08:30 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import base64
15import enum
16import io
17import json
18import logging
19import os
20import posixpath
21import random
22import re
23import stat
24import threading
25import time
26import uuid
27import xml.etree.ElementTree as eTree
28from datetime import UTC, datetime
29from http import HTTPStatus
30from typing import Any, BinaryIO
32try:
33 from typing import override # Python 3.12+
34except ImportError:
35 from typing_extensions import override # Python 3.11
37from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
39try:
40 import fsspec
41 from fsspec.spec import AbstractFileSystem
42except ImportError:
43 fsspec = None
44 AbstractFileSystem = type
46import yaml
47from astropy import units as u
48from urllib3 import PoolManager, make_headers
49from urllib3.response import HTTPResponse
50from urllib3.util import Retry, Timeout, Url, parse_url
52from lsst.utils.logging import getLogger
53from lsst.utils.timer import time_this
# Use the same logger as `dav.py`: the logger name is this module's name
# with its trailing ".davutils" component replaced by ".dav".
log = getLogger(__name__.replace(".davutils", ".dav"))
def normalize_path(path: str | None) -> str:
    """Normalize a path intended to be part of a URL.

    A path of the form "///a/b/c///../d/e/" would be normalized as "/a/b/d/e".
    The returned path is always absolute, i.e. starts by "/" and never
    ends by "/" except when the path is exactly "/". It contains neither
    "." nor ".." components, nor consecutive "/".

    Parameters
    ----------
    path : `str`, optional
        Path to normalize (e.g., '/path/to/..///normalize/'). `None` and
        the empty string are both normalized as "/".

    Returns
    -------
    path : `str`
        Normalized path (e.g., '/path/normalize').
    """
    if not path:
        return "/"

    # Anchor the path at the root *before* normalizing, so that leading "."
    # or ".." components of a relative path are resolved instead of leaking
    # into the result (e.g. "../a" must give "/a", not "/../a").
    # posixpath.normpath() preserves exactly two leading slashes, hence the
    # lstrip() before prepending a single "/".
    return "/" + posixpath.normpath("/" + path).lstrip("/")
def normalize_url(url: str, preserve_scheme: bool = False, preserve_path: bool = True) -> str:
    """Return `url` reduced to scheme, host, port and a normalized path.

    Parameters
    ----------
    url : `str`
        URL to normalize (e.g., 'davs://example.org:1234///path/to//../dir/').
    preserve_scheme : `bool`
        If True the scheme of `url` is kept as is. Otherwise any "dav" in
        the scheme is replaced by "http" (so 'dav' becomes 'http' and 'davs'
        becomes 'https'). A URL without scheme always gets 'http'.
    preserve_path : `bool`
        If True, the normalized path of `url` is kept in the returned URL,
        otherwise the returned URL has '/' as path.

    Returns
    -------
    url : `str`
        Normalized URL (e.g. 'https://example.org:1234/path/to/dir').
    """
    components = parse_url(url)

    # Select the scheme of the normalized URL.
    original_scheme = components.scheme
    if original_scheme is None:
        normalized_scheme = "http"
    elif preserve_scheme:
        normalized_scheme = original_scheme
    else:
        normalized_scheme = original_scheme.replace("dav", "http")

    # Select the path of the normalized URL.
    normalized_path = "/"
    if preserve_path:
        normalized_path = normalize_path(components.path)

    # Only scheme, host, port and path are carried over to the result.
    normalized = Url(
        scheme=normalized_scheme,
        host=components.host,
        port=components.port,
        path=normalized_path,
    )
    return normalized.url
def redact_url(url: str) -> str:
    """Return a modified `url` with authorization query redacted.

    The goal is that this method should be used for logging URLs to avoid
    leaking authorization tokens.

    Parameters
    ----------
    url : `str`
        URL to redact.

    Returns
    -------
    redacted_url : `str`
        For instance, when called with an URL like:

        https://host.example.org:1234/a/b/c/file.data?key1=value1&key2=value2&authz=token#fragment

        the returned value would be:

        https://host.example.org:1234/a/b/c/file.data?key1=value1&key2=value2&authz=....#fragment

    Notes
    -----
    The query string is parsed and re-encoded, so its escaping may differ
    from the one used in `url`. Query parameters with an empty value are
    preserved.
    """
    parsed_url = urlparse(url)

    # keep_blank_values=True: without it parse_qsl() silently drops the
    # parameters which have an empty value and the logged URL would no longer
    # reflect the request which was actually sent.
    redacted_query = [
        (key, "...." if key == "authz" else value)
        for key, value in parse_qsl(parsed_url.query, keep_blank_values=True)
    ]
    return urlunparse(parsed_url._replace(query=urlencode(redacted_query)))
class DavConfig:
    """Configurable settings a webDAV client must use when interacting with a
    particular storage endpoint.

    Parameters
    ----------
    config : `dict`, optional
        Dictionary of configurable settings for the webdav endpoint which
        base URL is `config["base_url"]`. Settings absent from the
        dictionary take the value of the corresponding ``DEFAULT_*`` class
        attribute. If `None`, all settings take their default value.

        For instance, if `config["base_url"]` is

            "davs://webdav.example.org:1234/"

        any object of class `DavResourcePath` like

            "davs://webdav.example.org:1234/path/to/any/file"

        will use the settings in this configuration to configure its client.

    Raises
    ------
    ValueError
        Raised if the requested checksum algorithm is not one of
        `ACCEPTED_CHECKSUMS` or if the scheme of a frontend URL differs
        from the scheme of the base URL.
    """

    # Timeout in seconds to establish a network connection with the remote
    # server.
    DEFAULT_TIMEOUT_CONNECT: float = 10.0

    # Timeout in seconds to read the response to a request sent to a server.
    # This is total time for reading both the headers and the response body.
    # It must be large enough to allow for upload and download of files
    # of typical size the webdav client supports.
    DEFAULT_TIMEOUT_READ: float = 300.0

    # Maximum number of network connections to persist against a single
    # "host:port" pair. If this endpoint client needs to issue more
    # simultaneous requests than this number, additional network connections
    # will be created but won't be persisted after use.
    DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST: int = 20

    # Size of the buffer (in mebibytes, i.e. 1024*1024 bytes) the webdav
    # client of this endpoint will use when sending requests and receiving
    # responses.
    DEFAULT_BUFFER_SIZE: int = 5

    # Size of the block (in mebibytes, i.e. 1024*1024 bytes) the webdav
    # client of this endpoint will use for making partial reads. Each partial
    # read will request at least this number of bytes, unless the total size
    # of the file is lower than this value.
    DEFAULT_BLOCK_SIZE: int = 1

    # Number of times to retry requests before failing. Retry happens only
    # under certain conditions.
    DEFAULT_RETRIES: int = 4

    # Minimal and maximal retry backoff (in seconds) for the client to compute
    # the wait time before retrying a request.
    # A value in this interval is randomly selected as the backoff factor
    # every time a request is retried.
    DEFAULT_RETRY_BACKOFF_MIN: float = 1.0
    DEFAULT_RETRY_BACKOFF_MAX: float = 3.0

    # Path to a directory or certificate bundle file where the certificates
    # of the trusted certificate authorities can be found.
    # Those certificates will be used by the client of the webdav endpoint
    # to verify the server's host certificate.
    # If None, the certificates trusted by the system are used.
    DEFAULT_TRUSTED_AUTHORITIES: str | None = None

    # User name and password for the client to authenticate to the server.
    # If specified, HTTP basic authentication is used on all requests.
    DEFAULT_USER_NAME: str | None = None
    DEFAULT_USER_PASSWORD: str | None = None

    # Path to the client certificate and associated private key the webdav
    # client must present to the server for authentication purposes.
    # If None, no client certificate is presented.
    DEFAULT_USER_CERT: str | None = None
    DEFAULT_USER_KEY: str | None = None

    # Token the webdav client must send to the server for authentication
    # purposes. The token may be the value of the token itself or the path
    # to a file where the token can be found.
    DEFAULT_TOKEN: str | None = None

    # If this option is set to True, the webdav client attempts to reuse
    # the network connection to the server as long as possible. Note that
    # the server can unilaterally decide to close the connection.
    # If disabled, the connection is closed after each request.
    DEFAULT_REUSE_CONNECTION: bool = True

    # Default checksum algorithm to request the server to compute on every
    # file upload. Not all servers support this.
    # See RFC 3230 for details.
    DEFAULT_REQUEST_CHECKSUM: str | None = None

    # If this option is set to True, the webdav client can return objects
    # compliant to the fsspec specification.
    # See: https://filesystem-spec.readthedocs.io
    DEFAULT_ENABLE_FSSPEC: bool = True

    # If this option is set to True, memory usage is computed and reported
    # when executing in debug mode. Computing memory usage is costly, so only
    # set this when debugging.
    DEFAULT_COLLECT_MEMORY_USAGE: bool = False

    # Accepted checksum algorithms. Must be lowercase.
    ACCEPTED_CHECKSUMS: list[str] = ["adler32", "md5", "sha-256", "sha-512"]

    def __init__(self, config: dict | None = None) -> None:
        if config is None:
            config = {}

        # "_default_" is the placeholder base URL of a configuration which
        # is not bound to any particular endpoint.
        if (base_url := expand_vars(config.get("base_url"))) is None:
            self._base_url = "_default_"
        else:
            self._base_url = normalize_url(base_url, preserve_path=False)

        self._timeout_connect: float = float(config.get("timeout_connect", DavConfig.DEFAULT_TIMEOUT_CONNECT))
        self._timeout_read: float = float(config.get("timeout_read", DavConfig.DEFAULT_TIMEOUT_READ))
        self._persistent_connections_per_host: int = int(
            config.get(
                "persistent_connections_per_host",
                DavConfig.DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST,
            )
        )

        # Buffer and block sizes are configured in mebibytes but stored in
        # bytes.
        self._buffer_size: int = 1_048_576 * int(config.get("buffer_size", DavConfig.DEFAULT_BUFFER_SIZE))
        self._block_size: int = 1_048_576 * int(config.get("block_size", DavConfig.DEFAULT_BLOCK_SIZE))

        self._retries: int = int(config.get("retries", DavConfig.DEFAULT_RETRIES))
        self._retry_backoff_min: float = float(
            config.get("retry_backoff_min", DavConfig.DEFAULT_RETRY_BACKOFF_MIN)
        )
        self._retry_backoff_max: float = float(
            config.get("retry_backoff_max", DavConfig.DEFAULT_RETRY_BACKOFF_MAX)
        )

        # Settings which may reference environment variables go through
        # expand_vars().
        self._trusted_authorities: str | None = expand_vars(
            config.get("trusted_authorities", DavConfig.DEFAULT_TRUSTED_AUTHORITIES)
        )
        self._user_name: str | None = expand_vars(config.get("user_name", DavConfig.DEFAULT_USER_NAME))
        self._user_password: str | None = expand_vars(
            config.get("user_password", DavConfig.DEFAULT_USER_PASSWORD)
        )
        self._user_cert: str | None = expand_vars(config.get("user_cert", DavConfig.DEFAULT_USER_CERT))
        self._user_key: str | None = expand_vars(config.get("user_key", DavConfig.DEFAULT_USER_KEY))
        self._token: str | None = expand_vars(config.get("token", DavConfig.DEFAULT_TOKEN))

        self._reuse_connection: bool = config.get("reuse_connection", DavConfig.DEFAULT_REUSE_CONNECTION)
        self._enable_fsspec: bool = config.get("enable_fsspec", DavConfig.DEFAULT_ENABLE_FSSPEC)

        # Must be initialized after self._base_url, which it reads.
        self._frontend_urls: list[str] = self._init_frontend_urls(config=config)

        self._collect_memory_usage: bool = config.get(
            "collect_memory_usage", DavConfig.DEFAULT_COLLECT_MEMORY_USAGE
        )

        # The checksum algorithm is compared case-insensitively and stored
        # in lowercase.
        self._request_checksum: str | None = config.get(
            "request_checksum", DavConfig.DEFAULT_REQUEST_CHECKSUM
        )
        if self._request_checksum is not None:
            self._request_checksum = self._request_checksum.lower()
            if self._request_checksum not in DavConfig.ACCEPTED_CHECKSUMS:
                raise ValueError(
                    f"Value for checksum algorithm {self._request_checksum} for storage endpoint "
                    f"{self._base_url} is not among the accepted values: {DavConfig.ACCEPTED_CHECKSUMS}"
                )

    def _init_frontend_urls(self, config: dict | None = None) -> list[str]:
        """Return the normalized, path-stripped URLs of the frontend servers
        found under key "frontend_base_urls" of `config`.

        Duplicate URLs are removed and the order in which the URLs appear
        in the configuration is preserved.

        Raises
        ------
        ValueError
            Raised if the scheme of a frontend URL differs from the scheme
            of this configuration's base URL.
        """
        if config is None:
            return []

        # Expand environment variables in each URL and normalize it. URLs
        # for which the expansion yields None are ignored.
        expanded_urls = (expand_vars(url) for url in config.get("frontend_base_urls", []))
        normalized_urls = (
            normalize_url(expanded_url, preserve_path=False)
            for expanded_url in expanded_urls
            if expanded_url is not None
        )

        # Eliminate duplicate URLs. dict.fromkeys() is used instead of set()
        # to keep the order of the configuration file, so that the resulting
        # list is identical from one run to the next.
        frontend_urls: list[str] = list(dict.fromkeys(normalized_urls))

        # Check that the scheme of this client's base URL is identical to
        # the scheme of the frontend server URLs.
        base_url_scheme = parse_url(self._base_url).scheme
        for url in frontend_urls:
            if base_url_scheme != parse_url(url).scheme:
                raise ValueError(
                    f"inconsistent scheme in frontend URL {url} for endpoint "
                    f"with base URL {self._base_url}"
                )

        return frontend_urls

    @property
    def base_url(self) -> str:
        """Normalized, path-stripped URL of the endpoint, or "_default_"."""
        return self._base_url

    @property
    def timeout_connect(self) -> float:
        """Timeout, in seconds, to establish a network connection."""
        return self._timeout_connect

    @property
    def timeout_read(self) -> float:
        """Timeout, in seconds, to read the response to a request."""
        return self._timeout_read

    @property
    def persistent_connections_per_host(self) -> int:
        """Maximum number of connections to persist per "host:port"."""
        return self._persistent_connections_per_host

    @property
    def buffer_size(self) -> int:
        """Size of the buffer, in bytes."""
        return self._buffer_size

    @property
    def block_size(self) -> int:
        """Size of the block used for partial reads, in bytes."""
        return self._block_size

    @property
    def retries(self) -> int:
        """Number of times to retry a request before failing."""
        return self._retries

    @property
    def retry_backoff_min(self) -> float:
        """Lower bound, in seconds, of the retry backoff."""
        return self._retry_backoff_min

    @property
    def retry_backoff_max(self) -> float:
        """Upper bound, in seconds, of the retry backoff."""
        return self._retry_backoff_max

    @property
    def trusted_authorities(self) -> str | None:
        """Path to the trusted certificate authorities, if configured."""
        return self._trusted_authorities

    @property
    def token(self) -> str | None:
        """Token, or path to the file containing it, if configured."""
        return self._token

    @property
    def reuse_connection(self) -> bool:
        """Whether network connections are to be reused across requests."""
        return self._reuse_connection

    @property
    def request_checksum(self) -> str | None:
        """Lowercase name of the checksum algorithm to request, if any."""
        return self._request_checksum

    @property
    def user_cert(self) -> str | None:
        """Path to the client certificate, if configured."""
        return self._user_cert

    @property
    def user_key(self) -> str | None:
        """Path to the private key associated to the client certificate."""
        # If no user certificate was specified in the configuration,
        # ignore the private key, even if it was provided.
        if self._user_cert is None:
            return None

        # If we have a user certificate but not a private key, assume the
        # private key is included in the same file as the user certificate.
        # That is typically the case when using a X.509 grid proxy as
        # client certificate.
        return self._user_cert if self._user_key is None else self._user_key

    @property
    def user_name(self) -> str | None:
        """User name for HTTP basic authentication, if configured."""
        return self._user_name

    @property
    def user_password(self) -> str | None:
        """Password for HTTP basic authentication, if configured."""
        return self._user_password

    @property
    def enable_fsspec(self) -> bool:
        """Whether the client can return fsspec-compliant objects."""
        return self._enable_fsspec

    @property
    def collect_memory_usage(self) -> bool:
        """Whether memory usage is to be computed and reported."""
        return self._collect_memory_usage

    @property
    def frontend_urls(self) -> list[str]:
        """Normalized URLs of the frontend servers, without duplicates."""
        return self._frontend_urls
class DavConfigPool:
    """Registry of configurable settings for all known webDAV endpoints.

    Parameters
    ----------
    filename : `str`, optional
        Name of an environment variable whose value is the path of the
        configuration file to load. That path can itself include
        environment variables (e.g. '$HOME/path/to/config.yaml') or '~'
        (e.g. '~/path/to/config.yaml').

        If `filename` is `None` or if no environment variable of that name
        is set, no file is loaded and the default configuration is used
        for all endpoints.

        The configuration file is a YAML file with the structure below:

        - base_url: "davs://webdav1.example.org:1234/"
          persistent_connections_per_host: 10
          timeout_connect: 20.0
          timeout_read: 120.0
          retries: 3
          retry_backoff_min: 1.0
          retry_backoff_max: 3.0
          user_cert: "${X509_USER_PROXY}"
          user_key: "${X509_USER_PROXY}"
          token: "/path/to/bearer/token/file"
          trusted_authorities: "/etc/grid-security/certificates"
          buffer_size: 5
          enable_fsspec: false
          request_checksum: "md5"
          collect_memory_usage: false

        - base_url: "davs://webdav2.example.org:1234/"
          user_name: "user"
          user_password: "password"
          persistent_connections_per_host: 5
          reuse_connection: false
          ...

        All settings are optional. If no settings are found in the
        configuration file for a particular webDAV endpoint, sensible
        defaults will be used.

    Raises
    ------
    ValueError
        Raised if the configuration file contains more than one
        configuration for the same endpoint.

    Notes
    -----
    There is only a single instance of this class. This singleton is
    intended to be initialized when the module is imported the first time.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, filename: str | None = None) -> DavConfigPool:
        # Create the single instance at most once, even when several threads
        # call the constructor simultaneously.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, filename: str | None = None) -> None:
        # Create a default configuration. This configuration is
        # used when a URL does not match any of the endpoints in the
        # configuration.
        self._default_config: DavConfig = DavConfig()

        # The key of this dictionary is the base URL of the webDAV endpoint
        # as returned by `DavConfig.base_url`.
        self._configs: dict[str, DavConfig] = {}

        # Load the configuration from the file we have been provided with,
        # if any.
        if filename is None:
            return

        # `filename` is the name of an environment variable which value is
        # the path of the configuration file. Nothing is loaded when that
        # variable is not set.
        if (filename := os.getenv(filename)) is None:
            return

        # Expand environment variables and '~' in the file name, if any.
        filename = os.path.expanduser(os.path.expandvars(filename))
        with open(filename) as file:
            # yaml.safe_load() returns None for an empty file: consider it
            # as a configuration file with no endpoints instead of failing
            # with an obscure TypeError when iterating over None.
            config_items = yaml.safe_load(file) or []

        for config_item in config_items:
            config = DavConfig(config_item)
            if config.base_url in self._configs:
                # We already have a configuration for the same endpoint.
                # That is likely a human error in the configuration file.
                raise ValueError(
                    f"configuration file {filename} contains two configurations for "
                    f"endpoint {config.base_url}"
                )
            self._configs[config.base_url] = config

    def get_config_for_url(self, url: str) -> DavConfig:
        """Return the configuration to use a webDAV client when interacting
        with the server which hosts the resource at `url`.

        Parameters
        ----------
        url : `str`
            URL for which to obtain a configuration.

        Returns
        -------
        config : `DavConfig`
            Configuration of the endpoint which hosts `url` if there is one
            in this pool, the default configuration otherwise.
        """
        # Select the configuration for the endpoint of the provided URL.
        normalized_url: str = normalize_url(url, preserve_path=False)
        if (config := self._configs.get(normalized_url)) is not None:
            return config

        # No config was found for the specified URL. Use the default.
        return self._default_config

    def _destroy(self) -> None:
        """Destroy this class singleton instance.

        Helper method to be used in tests to reset global configuration.
        """
        with DavConfigPool._lock:
            DavConfigPool._instance = None
def make_retry(config: DavConfig) -> Retry:
    """Build the ``urllib3.util.Retry`` policy described by `config`.

    Parameters
    ----------
    config : `DavConfig`
        Configurable settings for a webDAV storage endpoint.

    Returns
    -------
    retry : `urllib3.util.Retry`
        Retry object to be used when creating a ``urllib3.PoolManager``.
    """
    # Backoff factor to apply between attempts after the second try
    # (seconds). A random value within the configured interval is drawn to
    # prevent all the clients which started at the same time (even on
    # different hosts) from overwhelming the server by sending requests at
    # the same time.
    lowest: float = config.retry_backoff_min
    highest: float = config.retry_backoff_max
    jittered_backoff = lowest + (highest - lowest) * random.random()

    # Uppercased HTTP verbs we retry on. Only idempotent requests are
    # automatically retried.
    retriable_methods = frozenset(
        ("COPY", "DELETE", "GET", "HEAD", "MKCOL", "OPTIONS", "PROPFIND", "PUT")
    )

    # HTTP status codes we force a retry on.
    retriable_statuses = frozenset(
        (
            HTTPStatus.TOO_MANY_REQUESTS,  # 429
            HTTPStatus.INTERNAL_SERVER_ERROR,  # 500
            HTTPStatus.BAD_GATEWAY,  # 502
            HTTPStatus.SERVICE_UNAVAILABLE,  # 503
            HTTPStatus.GATEWAY_TIMEOUT,  # 504
        )
    )

    return Retry(
        # Total number of retries to allow. Takes precedence over the
        # per-category counts below.
        total=2 * config.retries,
        # Retries allowed on connection-related errors, on read errors
        # and on bad status codes, respectively.
        connect=config.retries,
        read=config.retries,
        status=config.retries,
        backoff_factor=jittered_backoff,
        allowed_methods=retriable_methods,
        status_forcelist=retriable_statuses,
        # Respect the "Retry-After" header on the status codes above.
        respect_retry_after_header=True,
    )
class DavClientPool:
    """Container of reusable webDAV clients, each one specifically configured
    to talk to a single storage endpoint.

    Parameters
    ----------
    config_pool : `DavConfigPool`
        Pool of all known webDAV client configurations.

    Notes
    -----
    There is a single instance of this class. This singleton is intended to
    be initialized when the module is imported the first time.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, config_pool: DavConfigPool) -> DavClientPool:
        # Fast path: the singleton already exists.
        if cls._instance is not None:
            return cls._instance

        # Slow path: create the singleton unless another thread did it
        # while we were waiting for the lock.
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, config_pool: DavConfigPool) -> None:
        # Source of the configuration of each newly created client.
        self._config_pool: DavConfigPool = config_pool

        # Reusable clients, keyed by the path-stripped URL of the endpoint
        # each of them interacts with, as returned by normalize_url().
        self._clients: dict[str, DavClient] = {}

    def get_client_for_url(self, url: str) -> DavClient:
        """Return a client for interacting with the endpoint where `url`
        is hosted.

        Parameters
        ----------
        url : `str`
            URL for which to obtain a client.

        Notes
        -----
        If a client for that endpoint already exists it is reused,
        otherwise a new client is created with the appropriate configuration
        for interacting with the storage endpoint and saved for serving
        subsequent requests.
        """
        endpoint = normalize_url(url, preserve_path=False)

        # Reuse the client of this endpoint if we already have one.
        existing = self._clients.get(endpoint)
        if existing is not None:
            return existing

        with DavClientPool._lock:
            # Another thread may have created the client while we were
            # waiting for the lock: check again before creating one.
            client = self._clients.get(endpoint)
            if client is None:
                config: DavConfig = self._config_pool.get_config_for_url(endpoint)
                client = self._make_client(endpoint, config)
                self._clients[endpoint] = client

            return client

    def _make_client(self, url: str, config: DavConfig) -> DavClient:
        """Make a webDAV client for interacting with the server at `url`."""
        # Use a generic client to retrieve the details of the server, so
        # that we can build a client for that specific server
        # implementation.
        probe = DavClient(url, config)
        details = probe.get_server_details(url)
        server_id = details.get("Server", None)

        # Keep None when the server did not tell whether it accepts ranges.
        accepts_ranges: bool | str | None = details.get("Accept-Ranges", None)
        if accepts_ranges is not None:
            accepts_ranges = accepts_ranges == "bytes"

        # Select the class of the client from the identity of the server.
        # A generic webDAV client is used for unidentified servers.
        client_class = DavClient
        if server_id is not None:
            if server_id.startswith("dCache/"):
                client_class = DavClientDCache
            elif server_id.startswith("XrootD/"):
                client_class = DavClientXrootD

        return client_class(url, config, accepts_ranges)

    def _destroy(self) -> None:
        """Destroy this class singleton instance.

        Helper method to be used in tests to reset global configuration.
        """
        with DavClientPool._lock:
            DavClientPool._instance = None
class DavFileSizeCache:
    """Helper class to cache file sizes of recently uploaded files.

    Parameters
    ----------
    default_timeout : `float`, optional
        Default validity period, in seconds, of the entries in this cache.
        The validity period for a specific entry can be specified when the
        entry is added to the cache (see `update_size` method).
        Only the value provided the first time this class is instantiated
        is used: it is ignored on subsequent instantiations.

    Notes
    -----
    There is a single instance of this class shared by several `DavClient`
    objects. Accesses to the cache are serialized by a lock.

    Caching file sizes helps preventing sending requests to the server for
    retrieving the size of recently uploaded files. This is in particular
    intended to efficiently serve `Butler` requests for the size of a file it
    just wrote to the datastore.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, default_timeout: float = 60.0) -> DavFileSizeCache:
        # `default_timeout` is not used here but must be accepted: Python
        # passes the constructor arguments to both __new__ and __init__, so
        # without it DavFileSizeCache(default_timeout=...) raises TypeError.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, default_timeout: float = 60.0) -> None:
        # The key of the cache dictionary is a URL of the form
        #
        #    "https://host.example.org:1234/path/to/file".
        #
        # The value is a triplet (file_size, last_updated, timeout) where:
        #   - 'file_size' is the size of the file in bytes,
        #   - 'last_updated' is the time when this entry was added to the
        #     cache or last updated, in seconds since epoch,
        #   - 'timeout' is the validity period of this cache entry, in
        #     seconds, understood from 'last_updated'.
        with DavFileSizeCache._lock:
            # __init__ is executed every time the class is instantiated:
            # don't wipe out the contents of the existing singleton.
            if not hasattr(self, "_cache"):
                self._default_timeout: float = default_timeout
                self._cache: dict[str, tuple[int, float, float]] = {}

    def invalidate(self, url: str) -> None:
        """Invalidate the cache entry for `url`, if any.

        Parameters
        ----------
        url : `str`
            URL of the file which cache entry must be invalidated.
        """
        with DavFileSizeCache._lock:
            self._cache.pop(url, None)

    def update_size(self, url: str, size: int | None, timeout: float | None = None) -> None:
        """Update the cache with an entry for `url` which has a size of `size`
        bytes. This entry is considered valid for a period of `timeout`
        seconds from now.

        Parameters
        ----------
        url : `str`
            URL of the file which size is to be cached.
        size : `int` or `None`
            Size in bytes of the file at `url`. If this value is `None`, the
            cache is not modified.
        timeout : `float` or `None`, optional
            The validity period, in seconds, this size is to be considered
            valid. If not specified, the default value specified when this
            object was created will be used for this cache entry.
        """
        if size is None:
            return

        if timeout is None:
            timeout = self._default_timeout

        with DavFileSizeCache._lock:
            self._cache[url] = (size, time.time(), timeout)

    def get_size(self, url: str) -> int | None:
        """Retrieve the cached value of the size of file at `url`.

        Parameters
        ----------
        url : `str`
            URL of the file to retrieve the size for.

        Returns
        -------
        size : `int` or `None`
            The cached value of the size of file at `url` if any value was
            found in the cache, `None` otherwise.
            `None` is also returned if there is a cached value but its
            validity period has expired. In this case, the entry associated to
            `url` is removed from the cache.
        """
        with DavFileSizeCache._lock:
            entry = self._cache.get(url)
            if entry is None:
                # There is no entry in the cache for this URL.
                return None

            size, last_updated, timeout = entry
            if time.time() <= last_updated + timeout:
                # This entry is still valid.
                return size

            # This entry is no longer valid. Remove it from the cache.
            del self._cache[url]
            return None
def unexpected_status_error(method: str, url: str, resp: HTTPResponse) -> Exception:
    """Build the exception which reports the unexpected response `resp`.

    The exception is returned, not raised: it is up to the caller to
    raise it.

    Parameters
    ----------
    method : `str`
        The method name triggering the error.
    url : `str`
        The URL that caused the error. Its authorization query, if any, is
        redacted in the error message.
    resp : `HTTPResponse`
        The error response.

    Returns
    -------
    error : `ValueError`
        Exception which message includes the request method, the redacted
        URL, the status and reason of the response and its body, if not
        empty.
    """
    message = f"Unexpected response to HTTP request {method} {redact_url(url)}: {resp.status} {resp.reason}"

    # The body of an error response is not guaranteed to be valid UTF-8.
    # Replace the bytes which cannot be decoded so that building the error
    # message never raises UnicodeDecodeError, which would hide the actual
    # error being reported.
    body = resp.data.decode(errors="replace")
    if body:
        message += f" [response body: {body}]"

    return ValueError(message)
841class DavClient:
842 """WebDAV client, configured to talk to a single storage endpoint.
844 Instances of this class are thread-safe.
846 Parameters
847 ----------
848 url : `str`
849 Root URL of the storage endpoint (e.g.
850 "https://host.example.org:1234/").
851 config : `DavConfig`
852 Configuration to initialize this client.
853 accepts_ranges : `bool` | `None`
854 Indicate whether the remote server accepts the ``Range`` header in GET
855 requests.
856 """
858 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
859 # Lock to protect this client fields from concurrent modification.
860 self._lock = threading.Lock()
862 # Base URL of the server this client will interact with.
863 # It is of the form: "davs://host.example.org:1234/"
864 self._base_url: str = url
866 # Configuration settings for the storage endpoint this client
867 # will interact with.
868 self._config: DavConfig = config
870 # Make the authorizer for this client's requests.
871 self._authorizer: Authorizer | None = self._make_authorizer(config=self._config)
873 # Make the pool manager for this client to use for sending
874 # requests to the server.
875 self._pool_manager: PoolManager = self._make_pool_manager(config=self._config)
877 # Parser of PROPFIND responses.
878 self._propfind_parser: DavPropfindParser = DavPropfindParser()
880 # Does the remote server accept a "Range" header in GET requests?
881 # This field is lazy initialized.
882 self._accepts_ranges: bool | None = accepts_ranges
884 # Can this client use a COPY request to duplicate files within a
885 # single webDAV server?
886 # Subclasses can overwrite this setting according to the server
887 # capabilities and compliance to webDAV RFC.
888 self._can_duplicate: bool = True
890 # Cache to store sizes of files this client has recently uploaded
891 # to the server.
892 self._file_size_cache = DavFileSizeCache()
def _make_authorizer(self, config: DavConfig) -> Authorizer | None:
    """Build the authorizer matching the credentials found in `config`.

    Parameters
    ----------
    config : `DavConfig`
        Configuration settings of the endpoint.

    Returns
    -------
    authorizer : `Authorizer` or `None`
        A token authorizer if a token is configured, otherwise a basic
        authorizer if both user name and password are configured,
        otherwise `None`.
    """
    # A token always wins over any other authentication setting which
    # may also be present in the configuration.
    if config.token is not None:
        return TokenAuthorizer(token=config.token)

    # Basic authentication requires both the user name and the password.
    has_basic_credentials = config.user_name is not None and config.user_password is not None
    if not has_basic_credentials:
        return None

    return BasicAuthorizer(user_name=config.user_name, user_password=config.user_password)
def _make_pool_manager(self, config: DavConfig) -> PoolManager:
    """Build the pool manager used to send requests to the server.

    Parameters
    ----------
    config : `DavConfig`
        Configuration settings of the endpoint.

    Returns
    -------
    pool_manager : `PoolManager`
        Pool manager configured from `config`.

    Raises
    ------
    FileNotFoundError
        Raised if the configured trusted authorities path is neither an
        existing directory nor an existing file.
    """
    # Locate the certificates of the trusted authorities: the configured
    # path may be either a directory or a single file.
    authorities = config.trusted_authorities
    ca_certs = ca_cert_dir = None
    if authorities is not None:
        if os.path.isdir(authorities):
            ca_cert_dir = authorities
        elif os.path.isfile(authorities):
            ca_certs = authorities
        else:
            raise FileNotFoundError(f"Trusted authorities file or directory {authorities} does not exist")

    # The <user certificate, private key> pair is ignored when a token is
    # configured for this endpoint, even if the pair is also specified.
    use_client_certificate = config.token is None
    user_cert = config.user_cert if use_client_certificate else None
    user_key = config.user_key if use_client_certificate else None

    # Connections in this pool manager are generally left open by the
    # client but the front-end server may choose to close them in some
    # specific situations. For instance, when serving a PUT request, the
    # front server may redirect to a backend server and close the network
    # connection, making it unusable for subsequent requests.
    #
    # In addition, the client may also choose to explicitly close the
    # network connection after receiving a response.
    pool_options: dict[str, Any] = {
        # Number of connection pools to cache before discarding the least
        # recently used pool. Each connection pool manages network
        # connections to a single host, so this is basically the number
        # of "host:port" we persist network connections to.
        "num_pools": 200,
        # Number of connections to the same "host:port" to persist for
        # later reuse. More than 1 is useful in multithreaded situations.
        # Connections needed beyond this number are created and discarded
        # after use.
        "maxsize": config.persistent_connections_per_host,
        # Default retry configuration for requests sent to the front end.
        "retries": make_retry(config),
        # Socket timeout in seconds for each individual connection.
        "timeout": Timeout(connect=config.timeout_connect, read=config.timeout_read),
        # Size in bytes of the buffer for reading/writing data from/to
        # the underlying socket.
        "blocksize": config.buffer_size,
        # Client certificate and private key for establishing TLS
        # connections. If None, no client certificate is sent to the
        # server. Only relevant for endpoints using secure HTTP protocol.
        "cert_file": user_cert,
        "key_file": user_key,
        # We require verification of the server certificate.
        "cert_reqs": "CERT_REQUIRED",
        # Directory where the certificates of the trusted certificate
        # authorities can be found. Its contents must be laid out as
        # expected by OpenSSL.
        "ca_cert_dir": ca_cert_dir,
        # Path to a file of concatenated CA certificates in PEM format.
        "ca_certs": ca_certs,
    }
    return PoolManager(**pool_options)
def get_server_details(self, url: str) -> dict[str, str]:
    """Retrieve the details of the server and check that it advertises
    compliance to class 1 of the webDAV protocol.

    Parameters
    ----------
    url : `str`
        URL to check.

    Returns
    -------
    details : `dict[str, str]`
        Possible keys are "Server" and "Accept-Ranges". A key is absent
        if the server did not include the corresponding header in its
        response to the OPTIONS request. Values are the header values,
        e.g. 'dCache/9.2.4' or 'XrootD/v5.7.1' for "Server".

    Raises
    ------
    ValueError
        Raised if the response has no "DAV" header or if that header does
        not list compliance class "1".
    """
    # We don't rely on webDAV locks, so a server complying to class 1 is
    # enough for our purposes. All webDAV servers must advertise at least
    # compliance class "1". Compliance classes are documented in
    # http://www.webdav.org/specs/rfc4918.html#dav.compliance.classes
    #
    # Examples of values for header DAV are:
    #   DAV: 1, 2
    #   DAV: 1, <http://apache.org/dav/propset/fs/1>
    response = self.options(url)
    response_headers = response.headers
    if "DAV" not in response_headers:
        raise ValueError(f"Server of {response.geturl()} does not implement webDAV protocol")

    compliance_classes = response_headers.get("DAV").replace(" ", "").split(",")
    if "1" not in compliance_classes:
        raise ValueError(
            f"Server of {response.geturl()} does not advertise required compliance to webDAV protocol class 1"
        )

    # Not all servers include a 'Server' or 'Accept-Ranges' header in their
    # response to an OPTIONS request: only report the ones we received.
    return {
        header: value
        for header in ("Server", "Accept-Ranges")
        if (value := response_headers.get(header, None)) is not None
    }
def _get_response_url(self, resp: HTTPResponse, default_url: str) -> str:
    """Return the URL that response `resp` was obtained from.

    The location of the last entry of the retry history of `resp` is
    returned. If `resp` carries no retry information or its history is
    empty, `default_url` is returned instead.
    """
    retries = resp.retries
    if retries is None or len(retries.history) == 0:
        return default_url

    last_entry = retries.history[-1]
    return str(last_entry.redirect_location)
def _rewrite_url_for_frontend(self, url: str) -> str:
    """Return a URL to reach one of the frontend servers that serves
    requests sent against `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    url : `str`
        URL to reach one of this client's frontend servers. If `url` does
        not target this client's frontend servers, or if no frontend is
        configured, the returned value is `url` unmodified.
    """
    frontends = self._config.frontend_urls

    # Only rewrite URLs which start with this client's base URL: other
    # URLs (for instance a redirection to a backend server) must be left
    # untouched. Nothing to rewrite either when no frontend is configured.
    if frontends and url.startswith(self._base_url):
        # Pick one of the configured frontends at random and substitute it
        # for the base URL.
        selected = random.choice(frontends)
        return selected + url.removeprefix(self._base_url)

    return url
def _request(
    self,
    method: str,
    url: str,
    headers: dict[str, str] | None = None,
    body: BinaryIO | bytes | str | None = None,
    preload_content: bool = True,
    redirect: bool = True,
    pool_manager: PoolManager | None = None,
    **kwargs: Any,
) -> HTTPResponse:
    """Send a generic HTTP request and return the response.

    Parameters
    ----------
    method : `str`
        Request method, e.g. 'GET', 'PUT', 'PROPFIND'.
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request. The caller's dictionary is not
        modified.
    body : `BinaryIO` or `bytes` or `str` or `None`, optional
        Request body.
    preload_content : `bool`, optional
        If True, the response body is downloaded and can be retrieved
        via the returned response `.data` property. If False, the
        caller needs to call `.read()` on the returned response object to
        download the body, either entirely in one call or by chunks.
    redirect : `bool`, optional
        If True, automatically handle redirects. If False, the returned
        response may contain a redirection to another location.
    pool_manager : `PoolManager`, optional
        Pool manager to use for sending this request. If not provided,
        this client's pool manager is used.
    **kwargs : `Any`
        Keyword arguments to pass unmodified to
        `urllib3.PoolManager.request()`.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the request as received from the server.
    """
    # Retrieve the URL we must use to send this request to one of this
    # client's configured frontend servers.
    url = self._rewrite_url_for_frontend(url)

    # If this client is configured not to reuse the network connection
    # with the server, add a "Connection: close" header to this request.
    #
    # However, if the caller has explicitly specified a "Connection"
    # header, whatever its value, don't modify it.
    headers = {} if headers is None else dict(headers)
    if "Connection" not in headers and not self._config.reuse_connection:
        headers["Connection"] = "close"

    # If an authorizer (basic or token) is configured for this client,
    # allow it to set the "Authorization" header of this outgoing request.
    if self._authorizer is not None:
        self._authorizer.set_authorization(headers)

    # Every log record emitted for this request must use the same redacted
    # form of the URL: the timer message below previously received the raw
    # URL while the debug message used the redacted one.
    redacted_url = redact_url(url)

    if log.isEnabledFor(logging.DEBUG):
        annotation = ""
        if method == "GET" and "Range" in headers:
            byte_range = headers.get("Range", "").removeprefix("bytes=")
            annotation = f" (byte range: {byte_range})"

        log.debug("sending request %s %s%s", method, redacted_url, annotation)

    if pool_manager is None:
        pool_manager = self._pool_manager

    with time_this(
        log,
        msg="%s %s",
        args=(method, redacted_url),
        mem_usage=self._config.collect_memory_usage,
        mem_unit=u.mebibyte,
    ):
        return pool_manager.request(
            method,
            url,
            body=body,
            headers=headers,
            preload_content=preload_content,
            redirect=redirect,
            **kwargs,
        )
def _options(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send an HTTP OPTIONS request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    pool_manager : `PoolManager`, optional
        Pool manager to use to send this request.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the request as received from the server.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    response = self._request("OPTIONS", url=url, headers=headers, pool_manager=pool_manager)
    return response
def _copy(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    preload_content: bool = True,
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send a webDAV COPY request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    preload_content : `bool`, optional
        Forwarded unmodified to `_request`.
    pool_manager : `PoolManager`, optional
        Pool manager to use to send this request.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    request_options: dict[str, Any] = {
        "url": url,
        "headers": headers,
        "preload_content": preload_content,
        "pool_manager": pool_manager,
    }
    return self._request("COPY", **request_options)
def _delete(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send an HTTP DELETE request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    pool_manager : `PoolManager`, optional
        Pool manager to use to send this request.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    response = self._request("DELETE", url=url, headers=headers, pool_manager=pool_manager)
    return response
def _get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    preload_content: bool = True,
    redirect: bool = True,
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send an HTTP GET request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    preload_content : `bool`, optional
        If True, the response body is downloaded and can be retrieved
        via the returned response `.data` property. If False, the
        caller needs to call `.read()` on the returned response object
        to download the body.
    redirect : `bool`, optional
        If True, follow redirections.
    pool_manager : `PoolManager`, optional
        Pool manager to send the request through.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the GET request as received from the server.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    request_options: dict[str, Any] = {
        "url": url,
        "headers": headers,
        "preload_content": preload_content,
        "redirect": redirect,
        "pool_manager": pool_manager,
    }
    return self._request("GET", **request_options)
def _head(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send an HTTP HEAD request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    pool_manager : `PoolManager`, optional
        Pool manager to use to send this request.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    response = self._request("HEAD", url=url, headers=headers, pool_manager=pool_manager)
    return response
def _mkcol(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send a webDAV MKCOL request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    pool_manager : `PoolManager`, optional
        Pool manager to use to send this request.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    response = self._request("MKCOL", url=url, headers=headers, pool_manager=pool_manager)
    return response
def _move(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send a webDAV MOVE request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    pool_manager : `PoolManager`, optional
        Pool manager to use to send this request.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    response = self._request("MOVE", url=url, headers=headers, pool_manager=pool_manager)
    return response
def _propfind(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    body: str = "",
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send a webDAV PROPFIND request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    body : `str`, optional
        Request body.
    pool_manager : `PoolManager`, optional
        Pool manager to use to send this request.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    request_options: dict[str, Any] = {
        "url": url,
        "headers": headers,
        "body": body,
        "pool_manager": pool_manager,
    }
    return self._request("PROPFIND", **request_options)
def _put(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    body: BinaryIO | bytes = b"",
    preload_content: bool = True,
    redirect: bool = True,
    pool_manager: PoolManager | None = None,
) -> HTTPResponse:
    """Send an HTTP PUT request and return the response unmodified.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    body : `BinaryIO` or `bytes`, optional
        Request body.
    preload_content : `bool`, optional
        If True, the response body is downloaded and can be retrieved
        via the returned response `.data` property. If False, the
        caller needs to call `.read()` on the returned response object
        to download the body.
    redirect : `bool`, optional
        If True, follow redirections.
    pool_manager : `PoolManager`, optional
        Pool manager to send the request through.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the PUT request as received from the server.

    Notes
    -----
    This method is intended for subclasses to override when needed.
    """
    # A PUT request is only retried when its body is an instance of
    # `bytes`. A stream body (e.g. an `io.BufferedReader`) may be totally
    # or partially consumed by a failed attempt, so a retry could upload
    # an incomplete or even empty file.
    #
    # Retries cannot be disabled by setting `retries` to False since that
    # would also disable redirections: the integer value zero (0) must be
    # used instead. See
    # https://urllib3.readthedocs.io/en/stable/user-guide.html#retrying-requests
    extra_options: dict[Any, Any] = {}
    if not isinstance(body, bytes):
        extra_options["retries"] = 0

    return self._request(
        "PUT",
        url=url,
        headers=headers,
        body=body,
        preload_content=preload_content,
        redirect=redirect,
        pool_manager=pool_manager,
        **extra_options,
    )
def head(
    self,
    url: str,
    headers: dict[str, str] | None = None,
) -> HTTPResponse:
    """Send an HTTP HEAD request and return the response only if
    successful.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the HEAD request, when its status is 200 (OK).

    Raises
    ------
    FileNotFoundError
        Raised if the server responds with status 404 (Not Found).
    """
    request_headers = {} if headers is None else dict(headers)
    resp = self._head(url=url, headers=request_headers)

    status = resp.status
    if status == HTTPStatus.OK:
        return resp
    if status == HTTPStatus.NOT_FOUND:
        raise FileNotFoundError(f"No file found at {resp.geturl()}")

    # Any other status is unexpected for a HEAD request.
    raise unexpected_status_error("HEAD", url, resp)
def get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    preload_content: bool = True,
    redirect: bool = True,
) -> tuple[str, HTTPResponse]:
    """Send an HTTP GET request.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    preload_content : `bool`, optional
        If True, the response body is downloaded and can be retrieved
        via the returned response `.data` property. If False, the
        caller needs to call `.read()` on the returned response object
        to download the body.
    redirect : `bool`, optional
        If True, follow redirections.

    Returns
    -------
    url : `str`
        The URL we used to obtain this response. It may be different from
        the URL passed as argument in case of redirection.
    resp : `HTTPResponse`
        Response to the GET request as received from the server.

    Raises
    ------
    FileNotFoundError
        Raised if the server responds with status 404 (Not Found).
    """
    # Send the GET request to the frontend servers.
    request_headers = {} if headers is None else dict(headers)
    resp = self._get(
        url,
        headers=request_headers,
        preload_content=preload_content,
        redirect=redirect,
    )

    status = resp.status
    if status in (HTTPStatus.OK, HTTPStatus.PARTIAL_CONTENT):
        return self._get_response_url(resp, default_url=url), resp

    if status == HTTPStatus.NOT_FOUND:
        raise FileNotFoundError(f"No file found at {resp.geturl()}")

    if status in resp.REDIRECT_STATUSES and not redirect:
        # This response is a redirection but we are asked not to follow
        # redirections, so hand it over to the caller as is.
        return self._get_response_url(resp, default_url=url), resp

    raise unexpected_status_error("GET", url, resp)
def options(
    self,
    url: str,
    headers: dict[str, str] | None = None,
) -> HTTPResponse:
    """Send an HTTP OPTIONS request and return the response on success.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict` [`str`, `str`], optional
        Headers to send with the request.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the request as received from the server, when its
        status is 200 (OK) or 201 (Created).
    """
    resp = self._options(url=url, headers=headers)
    if resp.status in (HTTPStatus.OK, HTTPStatus.CREATED):
        return resp

    # Any other status is unexpected for an OPTIONS request.
    raise unexpected_status_error("OPTIONS", url, resp)
1546 def propfind(
1547 self,
1548 url: str,
1549 headers: dict[str, str] | None = None,
1550 body: str = "",
1551 depth: str = "0",
1552 ) -> HTTPResponse:
1553 """Send a HTTP PROPFIND request and return the unmodified response on
1554 success.
1556 Parameters
1557 ----------
1558 url : `str`
1559 Target URL.
1560 headers : `dict[str, str]`, optional
1561 Headers to sent with the request.
1562 body : `str`, optional
1563 Request body.
1564 depth : `str`, optional
1565 ???.
1566 """
1567 headers = {} if headers is None else dict(headers)
1568 headers.update(
1569 {
1570 "Depth": depth,
1571 "Content-Type": 'application/xml; charset="utf-8"',
1572 "Content-Length": str(len(body)),
1573 }
1574 )
1575 resp = self._propfind(url=url, headers=headers, body=body)
1576 match resp.status:
1577 case HTTPStatus.MULTI_STATUS | HTTPStatus.NOT_FOUND:
1578 return resp
1579 case _:
1580 raise unexpected_status_error("PROPFIND", url, resp)
1582 def put(
1583 self,
1584 url: str,
1585 headers: dict[str, str] | None = None,
1586 data: BinaryIO | bytes = b"",
1587 ) -> int | None:
1588 """Send a HTTP PUT request.
1590 Parameters
1591 ----------
1592 url : `str`
1593 Target URL.
1594 headers : `dict[str, str]`, optional
1595 Headers to sent with the request.
1596 data : `BinaryIO` or `bytes`
1597 Request body.
1599 Returns
1600 -------
1601 size : `int | None`
1602 The size in bytes of the file uploaded. Can be `None` if the size
1603 could not be retrieved.
1604 """
1605 # Send a PUT request with empty body and handle redirection. This
1606 # is useful if the server redirects us; since we cannot rewind the
1607 # data we are uploading, we don't start uploading data until we
1608 # connect to the server that will actually serve our request.
1609 frontend_headers = {} if headers is None else dict(headers)
1610 frontend_headers.update({"Content-Length": "0"})
1611 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False)
1612 match resp.status:
1613 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
1614 redirect_url = url
1615 case status if status in resp.REDIRECT_STATUSES:
1616 redirect_url = resp.headers.get("Location")
1617 case _:
1618 raise unexpected_status_error("PUT", url, resp)
1620 # We may have been redirectred. Upload the file contents to
1621 # its final destination.
1623 # Ask the server to compute and record a checksum of the uploaded
1624 # file contents, for later integrity checks. Since we don't compute
1625 # the digest ourselves while uploading the data, we cannot control
1626 # after the request is complete that the data we uploaded is
1627 # identical to the data recorded by the server, but at least the
1628 # server has recorded a digest of the data it stored.
1629 #
1630 # See RFC-3230 for details and
1631 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
1632 # for the list of supported digest algorithhms.
1633 #
1634 # In addition, note that not all servers implement this RFC so
1635 # the checksum reqquest may be ignored by the server.
1636 backend_headers = {} if headers is None else dict(headers)
1637 if (checksum := self._config.request_checksum) is not None:
1638 backend_headers.update({"Want-Digest": checksum})
1640 resp = self._put(redirect_url, body=data, headers=backend_headers)
1641 match resp.status:
1642 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
1643 # Send a HEAD request to retrieve the size of the file we
1644 # just uploaded
1645 resp = self.head(redirect_url)
1646 size = int(resp.headers.get("Content-Length", -1))
1647 return None if size == -1 else size
1648 case _:
1649 raise unexpected_status_error("PUT", redirect_url, resp)
def _get_temporary_basename(self, basename: str, prefix: str) -> str:
    """Return a basename for a temporary file.

    The result has the form "<prefix>.<random UUID>.<basename>".
    """
    components = (prefix, str(uuid.uuid4()), basename)
    return ".".join(components)
def _split_parent_and_basename(self, url: str) -> tuple[str, str]:
    """Return the URL of the parent directory and the basename from
    `url`.

    The path of `url` is normalized before being split. All the other
    components of `url` (scheme, auth, host, port, query and fragment) are
    copied unmodified into the returned parent URL.
    """
    components: Url = parse_url(url)
    path = normalize_path(components.path)

    # Head and tail of the normalized path are respectively the parent
    # directory path and the basename.
    parent_path, basename = posixpath.dirname(path), posixpath.basename(path)

    parent = Url(
        scheme=components.scheme,
        auth=components.auth,
        host=components.host,
        port=components.port,
        path=parent_path,
        query=components.query,
        fragment=components.fragment,
    )
    return parent.url, basename
def _parent(self, url: str) -> str:
    """Return the URL of the parent directory of `url`."""
    return self._split_parent_and_basename(url)[0]
def _make_temporary_url(self, url: str, prefix: str = ".tmp") -> str:
    """Return the URL of a temporary file based on `url`.

    The temporary file is located in the same directory as `url` and its
    basename is built from `prefix`, a unique identifier and the basename
    of `url`.

    Parameters
    ----------
    url : `str`
        URL the temporary URL is derived from.
    prefix : `str`, optional
        Prefix of the basename of the temporary file.

    Returns
    -------
    url : `str`
        URL of the temporary file. The query and fragment of the parent
        URL, if any, are preserved.
    """
    # Local import: only this method needs these helpers.
    from urllib.parse import urlsplit, urlunsplit

    parent_url, basename = self._split_parent_and_basename(url)
    temporary_basename = self._get_temporary_basename(basename=basename, prefix=prefix)

    # The parent URL may carry a query string and a fragment, so the
    # temporary basename must be appended to its *path* component and not
    # to the URL string itself. Stripping the trailing "/" avoids
    # generating "//" when the parent is the root directory.
    parts = urlsplit(parent_url)
    temporary_path = f"{parts.path.rstrip('/')}/{temporary_basename}"
    return urlunsplit(parts._replace(path=temporary_path))
def exists(self, url: str) -> bool:
    """Return True if a file or directory exists at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    result : `bool`
        True if there is an object at `url`.
    """
    metadata = self.stat(url)
    return metadata.exists
def size(self, url: str) -> int:
    """Return the size in bytes of the resource at `url`.

    If `url` designates a directory, the size is zero.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    size : `int`
        The number of bytes of the resource located at `url`.

    Raises
    ------
    FileNotFoundError
        Raised if there is no file or directory at `url`.
    """
    # Serve the size from the cache of recently uploaded files if we can.
    cached_size = self._file_size_cache.get_size(url)
    if cached_size is not None:
        return cached_size

    # Otherwise ask the server.
    metadata = self.stat(url)
    if metadata.exists:
        return metadata.size

    raise FileNotFoundError(f"No file or directory found at {url}")
def is_dir(self, url: str) -> bool:
    """Return True if a directory exists at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    result : `bool`
        True if there is a directory at `url`.
    """
    metadata = self.stat(url)
    return metadata.is_dir
def mkcol(self, url: str) -> None:
    """Create a directory at `url`.

    If a directory already exists at `url` no error is returned nor
    exception is raised. Missing parent directories are created as needed.

    Parameters
    ----------
    url : `str`
        Target URL.

    Raises
    ------
    ValueError
        Raised if the directory could not be created, including when the
        request still fails after creating the missing parent directories.
    """
    # Statuses considered a success: CREATED, or METHOD_NOT_ALLOWED which
    # is treated as "the directory already exists".
    success = (HTTPStatus.CREATED, HTTPStatus.METHOD_NOT_ALLOWED)

    resp = self._mkcol(url=url)
    if resp.status in success:
        return

    if resp.status == HTTPStatus.CONFLICT:
        # The parent directory does not exist. Create it first, except
        # if the parent's path is "/", and then retry.
        parent = self._parent(url)
        if not parent.endswith("/"):
            self.mkcol(parent)
            resp = self._mkcol(url=url)
            # The outcome of the second attempt must be checked as well,
            # otherwise a failure to create the directory goes unnoticed.
            if resp.status in success:
                return

    raise ValueError(f"Can not create directory {resp.geturl()}: status {resp.status} {resp.reason}")
def stat(self, url: str) -> DavFileMetadata:
    """Return some properties of the file or directory located at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    result : `DavFileMetadata`
        Details of the resource at `url`. If no resource was found at
        that URL no exception is raised. Instead the returned details allow
        for detecting that the resource does not exist.

        The returned value should include fields to determine
        if there is a file or a directory at that `url` and if so, its
        size and kind (file or directory). Other fields may also be
        included depending on the implementation of the webDAV protocol
        by the server.
    """
    # Request the minimum set of DAV properties.
    body = (
        """<?xml version="1.0" encoding="utf-8"?>"""
        """<D:propfind xmlns:D="DAV:">"""
        """<D:prop>"""
        """<D:resourcetype/>"""
        """<D:getcontentlength/>"""
        """<D:getlastmodified/>"""
        """</D:prop>"""
        """</D:propfind>"""
    )
    resp = self.propfind(url, body=body, depth="0")
    status = resp.status

    if status == HTTPStatus.NOT_FOUND:
        # Nothing at that URL: return metadata built only from the href.
        relative_href = url.replace(self._base_url, "", 1)
        return DavFileMetadata(base_url=self._base_url, href=relative_href)

    if status == HTTPStatus.MULTI_STATUS:
        # With depth "0" only the first parsed entry is used.
        first_property = self._propfind_parser.parse(resp.data)[0]
        return DavFileMetadata.from_property(base_url=self._base_url, property=first_property)

    raise unexpected_status_error("PROPFIND", url, resp)
def info(self, url: str, name: str | None = None) -> dict[str, Any]:
    """Return the details about the file or directory at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.
    name : `str`, optional
        Name of the object to be included in the returned value. If None,
        the `url` is used as name.

    Returns
    -------
    result : `dict`
        For an existing file, the returned value has the form:

        .. code-block:: json

            {
                "name": name,
                "size": 1234,
                "type": "file",
                "last_modified":
                    datetime.datetime(2025, 4, 10, 15, 12, 51, 227854),
                "checksums": {
                    "adler32": "0fc5f83f",
                    "md5": "1f57339acdec099c6c0a41f8e3d5fcd0",
                }
            }

        For an existing directory, the returned value has the form:

        .. code-block:: json

            {
                "name": name,
                "size": 0,
                "type": "directory",
                "last_modified":
                    datetime.datetime(2025, 4, 10, 15, 12, 51, 227854),
                "checksums": {},
            }

        For a non-existing file or directory, the returned value has the
        form:

        .. code-block:: json

            {
                "name": name,
                "size": None,
                "type": None,
                "last_modified": datetime.datetime(1, 1, 1, 0, 0),
                "checksums": {},
            }

    Notes
    -----
    The format of the returned dictionary is inspired by and compatible
    with `fsspec`.

    The size of existing directories is always zero. The `checksums`
    dictionary is empty for directories and may be empty for files if the
    server does not compute and store the checksum of the files it stores.
    """
    object_name = url if name is None else name
    metadata = self.stat(url)

    if not metadata.exists:
        # Placeholder values for a resource which does not exist.
        return {
            "name": object_name,
            "type": None,
            "size": None,
            "last_modified": datetime.min,
            "checksums": {},
        }

    kind = "directory" if metadata.is_dir else "file"
    return {
        "name": object_name,
        "type": kind,
        "size": metadata.size,
        "last_modified": metadata.last_modified,
        "checksums": metadata.checksums,
    }
def move(self, source_url: str, destination_url: str, overwrite: bool = False) -> HTTPResponse:
    """Send a webDAV MOVE request and return the response unmodified.

    Parameters
    ----------
    source_url : `str`
        Source URL.
    destination_url : `str`
        Destination URL.
    overwrite : `bool`, optional
        Overwrite the destination if it exists.

    Returns
    -------
    resp : `HTTPResponse`
        The unmodified response received from the server.
    """
    # The "Overwrite" header carries "T" (true) or "F" (false).
    overwrite_flag = "T" if overwrite else "F"
    move_headers = {"Destination": destination_url, "Overwrite": overwrite_flag}
    return self._move(source_url, headers=move_headers)
def read_dir(self, url: str) -> list[DavFileMetadata]:
    """List the metadata of the entries contained in the directory at
    `url`.

    If `url` designates a file, only the details of that file are
    returned.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    result : `list[DavFileMetadata]`
        One item per file or directory reported by the server for `url`,
        excluding the entry whose href is the path of `url` itself.

    Raises
    ------
    FileNotFoundError
        Raised if the server responds with status 404.
    """
    propfind_body = (
        """<?xml version="1.0" encoding="utf-8"?>"""
        """<D:propfind xmlns:D="DAV:"><D:prop>"""
        """<D:resourcetype/><D:getcontentlength/><D:getlastmodified/><D:displayname/>"""
        """</D:prop></D:propfind>"""
    )
    resp = self.propfind(url, body=propfind_body, depth="1")
    if resp.status == HTTPStatus.NOT_FOUND:
        raise FileNotFoundError(f"No directory found at {resp.geturl()}")
    if resp.status != HTTPStatus.MULTI_STATUS:
        raise unexpected_status_error("PROPFIND", url, resp)

    # href under which the traversed directory is expected to report
    # itself: the path component of `url` with exactly one trailing "/".
    url_path = parse_url(url).path
    this_dir_href = "/" if url_path is None else url_path.rstrip("/") + "/"

    # Keep every file, and every directory except the one we are
    # traversing. The comparison is made against the href as delivered by
    # the PROPFIND parser.
    return [
        DavFileMetadata.from_property(base_url=self._base_url, property=entry)
        for entry in self._propfind_parser.parse(resp.data)
        if entry.is_file or (entry.is_dir and entry.href != this_dir_href)
    ]
def read(self, url: str) -> tuple[str, bytes]:
    """Fetch the whole contents of the file located at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    url : `str`
        Backend URL from which the data was obtained, as reported by
        ``self.get``.
    data : `bytes`
        Contents of the file.

    Notes
    -----
    The caller must ensure that the resource at `url` is a file, not
    a directory.
    """
    served_from, response = self.get(url)
    payload = response.data
    return served_from, payload
def read_range(
    self,
    url: str,
    start: int,
    end: int | None,
    headers: dict[str, str] | None = None,
    release_backend: bool = True,
) -> tuple[str, bytes]:
    """Download a byte range of the file located at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.
    start : `int`
        Starting byte offset of the range to download.
    end : `int` or `None`
        Ending byte offset of the range to download. When `None`, the
        range is left open-ended (``bytes=<start>-``).
    headers : `dict[str,str]`, optional
        Specific headers to send with the GET request. The dictionary is
        copied, never modified.
    release_backend : `bool`, optional
        Whether to send ``Connection: close`` (`True`) or
        ``Connection: keep-alive`` (`False`) to the backend server we get
        redirected to, if any.

    Returns
    -------
    backend_url : `str`
        URL used to retrieve this data. If the server redirected us
        this is the URL we were redirected to.
    data : `bytes`
        Partial contents of the file.

    Notes
    -----
    The caller must ensure that the resource at `url` is a file, not
    a directory. This is important because some webDAV servers respond
    with an HTML document when asked for reading a directory.
    """
    byte_range = f"bytes={start}-" if end is None else f"bytes={start}-{end}"
    range_headers = {"Accept-Encoding": "identity", "Range": byte_range}
    caller_headers = {} if headers is None else dict(headers)

    # First talk to the frontend server without following redirections
    # automatically: the "Connection" header must only be sent to the
    # backend server (if any), never to the frontend.
    frontend_headers = {**caller_headers, **range_headers}
    final_url, resp = self.get(url, headers=frontend_headers, redirect=False)
    if resp.status == HTTPStatus.PARTIAL_CONTENT:
        return final_url, resp.data
    if resp.status not in resp.REDIRECT_STATUSES:
        raise unexpected_status_error("GET (with 'Range' header)", url, resp)

    # The frontend redirected us to a backend server. Follow the
    # redirection ourselves, telling the backend whether the connection
    # must be released once the request is served.
    redirect_url = resp.headers.get("Location")
    log.debug("GET request to %s got redirected to %s", url, redirect_url)

    connection = "close" if release_backend else "keep-alive"
    backend_headers = {**caller_headers, **range_headers, "Connection": connection}
    final_url, resp = self.get(redirect_url, headers=backend_headers)
    if resp.status != HTTPStatus.PARTIAL_CONTENT:
        raise unexpected_status_error("GET (with 'Range' header)", redirect_url, resp)

    return final_url, resp.data
2070 def _write_response_body_to_file(self, resp: HTTPResponse, filename: str, chunk_size: int) -> int:
2071 """Write the the response body to a local file.
2073 Parameters
2074 ----------
2075 resp : `HTTPResponse`
2076 The HTTP Response to read the body from.
2077 filename : `str`
2078 Local file to write the content to. If the file already exists,
2079 it will be rewritten.
2080 chunk_size : `int`
2081 Size of the chunks to write to `filename`.
2083 Returns
2084 -------
2085 count: `int`
2086 Number of bytes written to `filename`.
2087 """
2088 try:
2089 # Read the response body into a pre-allocated memory buffer and
2090 # write the buffer content to the destination file avoiding
2091 # copies if possible.
2092 content_length = 0
2093 with open(filename, "wb", buffering=0) as file:
2094 view = memoryview(bytearray(chunk_size))
2095 while True:
2096 if (count := resp.readinto(view)) > 0: # type: ignore
2097 content_length += count
2098 file.write(view[:count])
2099 else:
2100 break
2102 # Check that the expected and actual content lengths match.
2103 # Perform this check only when the body of the response was not
2104 # encoded by the server.
2105 expected_length: int = int(resp.headers.get("Content-Length", -1))
2106 if (
2107 "Content-Encoding" not in resp.headers
2108 and expected_length != -1
2109 and expected_length != content_length
2110 ):
2111 raise ValueError(
2112 f"Size of downloaded file does not match value in Content-Length header for "
2113 f"{resp.geturl()}: expecting {expected_length} and got {content_length} bytes"
2114 )
2116 return content_length
2117 finally:
2118 # Release the connection
2119 resp.drain_conn()
2120 resp.release_conn()
def download(self, url: str, filename: str, chunk_size: int) -> int:
    """Stream the content of the file at `url` into a local file.

    Parameters
    ----------
    url : `str`
        Target URL.
    filename : `str`
        Local file to write the content to. If the file already exists,
        it will be rewritten.
    chunk_size : `int`
        Size of the chunks to write to `filename`.

    Returns
    -------
    count : `int`
        Number of bytes written to `filename`.

    Notes
    -----
    The caller must ensure that the resource at `url` is a file, not
    a directory.
    """
    # Do not preload the body: it is consumed chunk by chunk while being
    # written to the local file.
    _backend_url, body_stream = self.get(url, preload_content=False)
    return self._write_response_body_to_file(body_stream, filename, chunk_size)
def write(self, url: str, data: BinaryIO | bytes) -> int | None:
    """Create or rewrite a remote file at `url` with `data` as its
    contents.

    Parameters
    ----------
    url : `str`
        Target URL.
    data : `BinaryIO` or `bytes`
        Data to upload, either as a sequence of bytes or as a binary
        file-like object.

    Returns
    -------
    size : `int | None`
        The size in bytes of the file uploaded. Can be `None` if the size
        could not be retrieved.

    Notes
    -----
    If a file already exists at `url` it will be rewritten.

    The data is first uploaded to a temporary URL which is then renamed
    to `url`. If any of those steps fails, removal of the temporary file
    is attempted and the error which caused the failure is raised.
    """
    # According to RFC 4918, the parent directory of the file must
    # exist before we can write to it. So create it first and then
    # upload.
    self.mkcol(self._parent(url))

    # Compute the temporary URL before entering the `try` block, so that
    # the clean up code below never refers to an unbound name.
    temporary_url = self._make_temporary_url(url)
    try:
        # Upload to a temporary file and rename to the final name.
        size = self.put(temporary_url, data=data)
        self.rename(temporary_url, url, overwrite=True, create_parent=False)

        # Update the file size cache with this size
        self._file_size_cache.update_size(url, size)
        return size
    except Exception:
        # Upload failed. Attempt to remove the temporary file. This is
        # best effort: a failure to clean up must not hide the error
        # which made the upload fail.
        try:
            self.delete(temporary_url)
        except Exception:
            log.debug("Could not remove temporary file %s", temporary_url, exc_info=True)
        raise
def checksums(self, url: str) -> dict[str, str]:
    """Return the checksums of the contents of file located at `url`.

    The checksums are retrieved from the storage endpoint. There may be
    none if the storage endpoint does not automatically expose the
    checksums it computes.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    checksums : `dict[str, str]`
        Checksums reported for the file at `url`, or an empty dictionary
        when `url` is not a file. The key of the dictionary is the
        lowercased name of the checksum algorithm (e.g. "md5",
        "adler32"). The value is the lowercased checksum itself
        (e.g. "78441cec2479ec8b545c4d6699f542da").

    Raises
    ------
    FileNotFoundError
        Raised if nothing exists at `url`.
    """
    metadata = self.stat(url)
    if metadata.exists:
        return metadata.checksums if metadata.is_file else {}

    raise FileNotFoundError(f"No file found at {url}")
def delete(self, url: str) -> None:
    """Delete the file or directory at `url`.

    The absence of a file or directory at `url` is not considered an
    error.

    Parameters
    ----------
    url : `str`
        Target URL.

    Raises
    ------
    ValueError
        Raised if the server responds with a status other than 200, 202,
        204 or 404.

    Notes
    -----
    If `url` designates a directory, some webDAV servers recursively
    remove the directory and its contents. Others, only remove the
    directory if it is empty.

    For a consistent behavior, the caller must check what kind of object
    the target URL is and walk the hierarchy removing all objects.
    """
    resp = self._delete(url)
    tolerated = (
        HTTPStatus.OK,
        HTTPStatus.ACCEPTED,
        HTTPStatus.NO_CONTENT,
        HTTPStatus.NOT_FOUND,
    )
    if resp.status not in tolerated:
        raise ValueError(f"Unable to delete resource {resp.geturl()}: status {resp.status} {resp.reason}")

    # Invalidate the entry for this file in our cache, if any
    self._file_size_cache.invalidate(url)
def accepts_ranges(self, url: str) -> bool:
    """Return `True` if the server supports a 'Range' header in
    GET requests against `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    accepts_ranges : `bool`
        `True` if the ``Accept-Ranges`` header of the response to a HEAD
        request is exactly ``bytes``. The answer is computed once and
        reused for any other URL served by this client.
    """
    # Once determined for one URL, the answer is assumed to hold for any
    # file the server serves, so only probe the server when we don't know
    # yet.
    if self._accepts_ranges is None:
        with self._lock:
            # Check again now that we hold the lock: the value may have
            # been set since the check above.
            if self._accepts_ranges is None:
                advertised = self.head(url).headers.get("Accept-Ranges", "")
                self._accepts_ranges = advertised == "bytes"

    return self._accepts_ranges
@property
def supports_duplicate(self) -> bool:
    """Whether this client can duplicate a file within the storage
    endpoint, as recorded in the ``_can_duplicate`` attribute.
    """
    can_duplicate: bool = self._can_duplicate
    return can_duplicate
def copy(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
    """Copy the file at `source_url` to `destination_url` in the same
    storage endpoint, using a webDAV COPY request.

    Parameters
    ----------
    source_url : `str`
        URL of the source file.
    destination_url : `str`
        URL of the destination file. Its parent directory must exist.
    overwrite : `bool`
        If True and a file exists at `destination_url` it will be
        overwritten. Otherwise an exception is raised.

    Raises
    ------
    ValueError
        Raised if the server responds with a status other than 201 or
        204.
    """
    resp = self._copy(
        source_url,
        headers={"Destination": destination_url, "Overwrite": "T" if overwrite else "F"},
    )
    if resp.status not in (HTTPStatus.CREATED, HTTPStatus.NO_CONTENT):
        raise ValueError(
            f"Could not copy {resp.geturl()} to {destination_url}: "
            f"status {resp.status} {resp.reason}"
        )

    # The destination has new contents: forget its cached size, if any.
    self._file_size_cache.invalidate(destination_url)
def duplicate(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
    """Copy the file at `source_url` to `destination_url` in the same
    storage endpoint, creating the destination's parent directory first.

    Parameters
    ----------
    source_url : `str`
        URL of the source file.
    destination_url : `str`
        URL of the destination file. Its parent directory is created if
        necessary.
    overwrite : `bool`
        If True and a file exists at `destination_url` it will be
        overwritten. Otherwise an exception is raised.

    Raises
    ------
    NotImplementedError
        Raised if `source_url` designates a directory.
    """
    # Only files can be duplicated.
    if self.is_dir(source_url):
        raise NotImplementedError(f"copy is not implemented for directory {source_url}")

    # COPY may fail if the destination's parent directory does not exist,
    # depending on the server implementation of RFC 4918, so make sure it
    # exists before copying.
    self.mkcol(self._parent(destination_url))
    self.copy(source_url=source_url, destination_url=destination_url, overwrite=overwrite)
def rename(
    self,
    source_url: str,
    destination_url: str,
    overwrite: bool = False,
    create_parent: bool = True,
) -> None:
    """Rename (move) the file at `source_url` to `destination_url` in the
    same storage endpoint.

    Parameters
    ----------
    source_url : `str`
        URL of the source file.
    destination_url : `str`
        URL of the destination file. Its parent directory must exist,
        unless `create_parent` is `True`.
    overwrite : `bool`, optional
        If True and a file exists at `destination_url` it will be
        overwritten. Otherwise an exception is raised.
    create_parent : `bool`, optional
        Whether to create the parent directory of `destination_url`
        before moving.

    Raises
    ------
    ValueError
        Raised if the server responds with a status other than 200, 201
        or 204.
    """
    # MOVE may fail if the destination's parent directory does not exist,
    # depending on the server implementation of RFC 4918.
    if create_parent:
        self.mkcol(self._parent(destination_url))

    resp = self.move(source_url=source_url, destination_url=destination_url, overwrite=overwrite)
    if resp.status in (HTTPStatus.OK, HTTPStatus.CREATED, HTTPStatus.NO_CONTENT):
        # The destination has new contents: forget its cached size.
        self._file_size_cache.invalidate(destination_url)
        return

    raise ValueError(
        f"Could not move file {resp.geturl()} to {destination_url}: status {resp.status} "
        f"{resp.reason}"
    )
def generate_presigned_get_url(self, url: str, expiration_time_seconds: int) -> str:
    """Return a pre-signed URL that can be used to retrieve this resource
    using an HTTP GET without supplying any access credentials.

    This implementation does not sign URLs and always raises.

    Parameters
    ----------
    url : `str`
        Target URL.
    expiration_time_seconds : `int`
        Number of seconds until the generated URL is no longer valid.

    Returns
    -------
    url : `str`
        HTTP URL signed for GET.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation.
    """
    message = f"URL signing is not supported by server for {self}"
    raise NotImplementedError(message)
def generate_presigned_put_url(self, url: str, expiration_time_seconds: int) -> str:
    """Return a pre-signed URL that can be used to upload a file to this
    path using an HTTP PUT without supplying any access credentials.

    This implementation does not sign URLs and always raises.

    Parameters
    ----------
    url : `str`
        Target URL.
    expiration_time_seconds : `int`
        Number of seconds until the generated URL is no longer valid.

    Returns
    -------
    url : `str`
        HTTP URL signed for PUT.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation.
    """
    message = f"URL signing is not supported by server for {self}"
    raise NotImplementedError(message)
class ActivityCaveat(enum.Enum):
    """Activities for which a macaroon can be requested from a dCache or
    XRootD webDAV server.
    """

    # Requested when a macaroon is needed for retrieving a file.
    DOWNLOAD = 1

    # Requested when a macaroon is needed for uploading a file.
    UPLOAD = 2
2409class DavClientURLSigner(DavClient):
2410 """WebDAV client which supports signing of URL for upload and download.
2412 Instances of this class are thread-safe.
2414 Parameters
2415 ----------
2416 url : `str`
2417 Root URL of the storage endpoint
2418 (e.g. "https://host.example.org:1234/").
2419 config : `DavConfig`
2420 Configuration to initialize this client.
2421 accepts_ranges : `bool` | `None`
2422 Indicate whether the remote server accepts the ``Range`` header in GET
2423 requests.
2424 """
2426 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
2427 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges)
2429 def generate_presigned_get_url(self, url: str, expiration_time_seconds: int) -> str:
2430 """Return a pre-signed URL that can be used to retrieve the resource
2431 at `url` using an HTTP GET without supplying any access credentials.
2433 Parameters
2434 ----------
2435 url : `str`
2436 URL of an existing file.
2437 expiration_time_seconds : `int`
2438 Number of seconds until the generated URL is no longer valid.
2440 Returns
2441 -------
2442 url : `str`
2443 HTTP URL signed for GET.
2445 Notes
2446 -----
2447 Although the returned URL allows for downloading the file at `url`
2448 without supplying credentials, the HTTP client must be configured
2449 to accept the certificate the server will present if the client wants
2450 validate it. The server's certificate may be issued by a certificate
2451 authority unknown to the client.
2452 """
2453 macaroon: str = self._get_macaroon(url, ActivityCaveat.DOWNLOAD, expiration_time_seconds)
2454 return f"{url}?authz={macaroon}"
2456 def generate_presigned_put_url(self, url: str, expiration_time_seconds: int) -> str:
2457 """Return a pre-signed URL that can be used to upload a file to `url`
2458 using an HTTP PUT without supplying any access credentials.
2460 Parameters
2461 ----------
2462 url : `str`
2463 URL of an existing file.
2464 expiration_time_seconds : `int`
2465 Number of seconds until the generated URL is no longer valid.
2467 Returns
2468 -------
2469 url : `str`
2470 HTTP URL signed for PUT.
2472 Notes
2473 -----
2474 Although the returned URL allows for uploading a file to `url`
2475 without supplying credentials, the HTTP client must be configured
2476 to accept the certificate the server will present if the client wants
2477 validate it. The server's certificate may be issued by a certificate
2478 authority unknown to the client.
2479 """
2480 macaroon: str = self._get_macaroon(url, ActivityCaveat.UPLOAD, expiration_time_seconds)
2481 return f"{url}?authz={macaroon}"
2483 def _get_macaroon(self, url: str, activity: ActivityCaveat, expiration_time_seconds: int) -> str:
2484 """Return a macaroon for uploading or downloading the file at `url`.
2486 Parameters
2487 ----------
2488 url : `str`
2489 URL of an existing file.
2490 activity : `ActivityCaveat`
2491 the activity the macaroon is requested for.
2492 expiration_time_seconds : `int`
2493 Requested duration of the macaroon, in seconds.
2495 Returns
2496 -------
2497 macaroon : `str`
2498 Macaroon to be used with `url` in a GET or PUT request.
2499 """
2500 # dCache and XRootD webDAV servers support delivery of macaroons.
2501 #
2502 # For details about dCache macaroons see:
2503 # https://www.dcache.org/manuals/UserGuide-9.2/macaroons.shtml
2504 match activity:
2505 case ActivityCaveat.DOWNLOAD:
2506 activity_caveat = "DOWNLOAD,LIST"
2507 case ActivityCaveat.UPLOAD:
2508 activity_caveat = "UPLOAD,LIST,DELETE,MANAGE"
2510 # Retrieve a macaroon for the requested activities and duration
2511 headers = {"Content-Type": "application/macaroon-request"}
2512 body = {
2513 "caveats": [
2514 f"activity:{activity_caveat}",
2515 ],
2516 "validity": f"PT{expiration_time_seconds}S",
2517 }
2518 resp = self._request("POST", url, headers=headers, body=json.dumps(body))
2519 if resp.status != HTTPStatus.OK:
2520 raise ValueError(
2521 f"Could not retrieve a macaroon for URL {resp.geturl()}, status: {resp.status} {resp.reason}"
2522 )
2524 # We are expecting the body of the response to be formatted in JSON.
2525 # dCache sets the 'Content-Type' of the response to 'application/json'
2526 # but XRootD does not set any 'Content-Type' header 8-[
2527 #
2528 # An example of a response body returned by dCache is shown below:
2529 # {
2530 # "macaroon": "MDA[...]Qo",
2531 # "uri": {
2532 # "targetWithMacaroon": "https://dcache.example.org/?authz=MD...",
2533 # "baseWithMacaroon": "https://dcache.example.org/?authz=MD...",
2534 # "target": "https://dcache.example.org/",
2535 # "base": "https://dcache.example.org/"
2536 # }
2537 # }
2538 #
2539 # An example of a response body returned by XRootD is shown below:
2540 # {
2541 # "macaroon": "MDA[...]Qo",
2542 # "expires_in": 86400
2543 # }
2544 try:
2545 response_body = json.loads(resp.data.decode())
2546 except json.JSONDecodeError:
2547 raise ValueError(f"Could not deserialize response to POST request for URL {resp.geturl()}")
2549 if "macaroon" in response_body:
2550 return response_body["macaroon"]
2552 raise ValueError(f"Could not retrieve macaroon for URL {resp.geturl()}")
2554 @override
2555 def duplicate(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
2556 """Copy the file at `source_url` to `destination_url` in the same
2557 storage endpoint.
2559 Parameters
2560 ----------
2561 source_url : `str`
2562 URL of the source file.
2563 destination_url : `str`
2564 URL of the destination file. Its parent directory must exist.
2565 overwrite : `bool`
2566 If True and a file exists at `destination_url` it will be
2567 overwritten. Otherwise an exception is raised.
2568 """
2569 # Check the source is a file
2570 if self.is_dir(source_url):
2571 raise NotImplementedError(f"copy is not implemented for directory {source_url}")
2573 # Neither dCache nor XrootD currently implement the COPY
2574 # webDAV method as documented in
2575 #
2576 # http://www.webdav.org/specs/rfc4918.html#METHOD_COPY
2577 #
2578 # (See issues DM-37603 and DM-37651 for details)
2579 # With those servers use third-party copy instead.
2580 return self._copy_via_third_party(source_url, destination_url, overwrite)
2582 def _copy_via_third_party(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
2583 """Copy the file at `source_url` to `destination_url` in the same
2584 storage endpoint using the third-party copy functionality
2585 implemented by dCache and XRootD servers.
2587 Parameters
2588 ----------
2589 source_url : `str`
2590 URL of the source file.
2591 destination_url : `str`
2592 URL of the destination file. Its parent directory must exist.
2593 overwrite : `bool`
2594 If True and a file exists at `destination_url` it will be
2595 overwritten. Otherwise an exception is raised.
2596 """
2597 # To implement COPY we use dCache's third-party copy mechanism
2598 # documented at:
2599 #
2600 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#third-party-transfers
2601 #
2602 # The reason is that dCache does not correctly implement webDAV's COPY
2603 # method. See https://github.com/dCache/dcache/issues/6950
2605 # Create the destination's parent directory first because COPY may
2606 # fail if it does not exist, depending on the server implementation
2607 # of RFC 4918.
2608 destination_parent = self._parent(destination_url)
2609 self.mkcol(destination_parent)
2611 # Retrieve a macaroon for downloading the source
2612 download_macaroon = self._get_macaroon(source_url, ActivityCaveat.DOWNLOAD, 300)
2614 # Prepare and send the COPY request
2615 try:
2616 headers = {
2617 "Source": source_url,
2618 "TransferHeaderAuthorization": f"Bearer {download_macaroon}",
2619 "Credential": "none",
2620 "Depth": "0",
2621 "Overwrite": "T" if overwrite else "F",
2622 "RequireChecksumVerification": "false",
2623 }
2624 resp = self._copy(destination_url, headers=headers, preload_content=False)
2625 match resp.status:
2626 case HTTPStatus.CREATED:
2627 return
2628 case HTTPStatus.ACCEPTED:
2629 pass
2630 case _:
2631 raise ValueError(
2632 f"Unable to copy resource {resp.geturl()}; status: {resp.status} {resp.reason}"
2633 )
2635 # Analyse the response to the COPY request that the server has
2636 # not completed yet.
2637 content_type = resp.headers.get("Content-Type")
2638 if content_type != "text/perf-marker-stream":
2639 raise ValueError(
2640 f"""Unexpected Content-Type {content_type} in response to COPY request from """
2641 f"""{source_url} to {destination_url}"""
2642 )
2644 # Read the performance markers in the response body until we get
2645 # a "success" or "failure" notification.
2646 #
2647 # Documentation:
2648 # https://dcache.org/manuals/UserGuide-10.2/webdav.shtml#third-party-transfers
2649 for marker in io.TextIOWrapper(resp): # type: ignore
2650 marker = marker.rstrip("\n")
2651 if marker == "": # EOF
2652 raise ValueError(
2653 f"""Copying file from {source_url} to {destination_url} failed: """
2654 """could not get response from server"""
2655 )
2656 elif marker.startswith("failure:"):
2657 raise ValueError(
2658 f"""Copying file from {source_url} to {destination_url} failed with error: """
2659 f"""{marker}"""
2660 )
2661 elif marker.startswith("success:"):
2662 return
2663 finally:
2664 resp.drain_conn()
2667class DavClientDCache(DavClientURLSigner):
2668 """Client for interacting with a dCache webDAV server.
2670 Instances of this class are thread-safe.
2672 Parameters
2673 ----------
2674 url : `str`
2675 Root URL of the storage endpoint
2676 (e.g. "https://host.example.org:1234/").
2677 config : `DavConfig`
2678 Configuration to initialize this client.
2679 accepts_ranges : `bool` | `None`
2680 Indicate whether the remote server accepts the ``Range`` header in GET
2681 requests.
2682 """
2684 # Regular expression to parse dCache's response body of a successful
2685 # PUT request. Such a response body is of the form:
2686 #
2687 # "104857600 bytes uploaded\r\n\r\n"
2688 #
2689 rex: re.Pattern = re.compile(r"^(\d*) bytes uploaded", re.IGNORECASE | re.ASCII)
def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
    super().__init__(url=url, config=config, accepts_ranges=accepts_ranges)

    # As of dCache v10.2.14, the webDAV door leaves the network connection
    # unusable for us for sending subsequent requests after serving
    # GET, PUT, DELETE, etc., but leaves the connection intact after
    # serving MKCOL, MOVE and PROPFIND requests.
    #
    # So send MKCOL, MOVE and PROPFIND requests through one dedicated
    # pool manager, shared by the three methods: its network connections
    # get reused, which avoids establishing the TCP+TLS connection for
    # each of those requests.
    shared_pool_manager = self._make_pool_manager(self._config)
    self._propfind_pool_manager = shared_pool_manager
    self._move_pool_manager = shared_pool_manager
    self._mkcol_pool_manager = shared_pool_manager

    # dCache does not deliver macaroons when we are not using a secure
    # channel to interact with the door. In that case, we can not use
    # third party copy and dCache does not correctly support the COPY
    # method as stated in RFC-4918.
    uses_secure_channel = self._base_url.startswith("https://")
    self._can_duplicate = uses_secure_channel
2716 @override
2717 def _mkcol(
2718 self,
2719 url: str,
2720 headers: dict[str, str] | None = None,
2721 pool_manager: PoolManager | None = None,
2722 ) -> HTTPResponse:
2723 """Inherits doc string."""
2724 return self._request("MKCOL", url=url, headers=headers, pool_manager=self._mkcol_pool_manager)
2726 @override
2727 def _move(
2728 self,
2729 url: str,
2730 headers: dict[str, str] | None = None,
2731 pool_manager: PoolManager | None = None,
2732 ) -> HTTPResponse:
2733 """Inherits doc string."""
2734 return self._request("MOVE", url=url, headers=headers, pool_manager=self._move_pool_manager)
2736 @override
2737 def _propfind(
2738 self,
2739 url: str,
2740 headers: dict[str, str] | None = None,
2741 body: str = "",
2742 pool_manager: PoolManager | None = None,
2743 ) -> HTTPResponse:
2744 """Inherits doc string."""
2745 return self._request(
2746 "PROPFIND", url=url, headers=headers, body=body, pool_manager=self._propfind_pool_manager
2747 )
@override
def put(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    data: BinaryIO | bytes = b"",
) -> int | None:
    # Docstring inherited.
    caller_headers = {} if headers is None else dict(headers)

    # Send a PUT request with empty body to the dCache frontend server to
    # get redirected to the backend.
    #
    # Details:
    # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#redirection
    frontend_headers = {**caller_headers, "Content-Length": "0", "Expect": "100-continue"}
    is_zero_length = isinstance(data, bytes) and len(data) == 0
    if is_zero_length:
        # We are uploading an empty file. Don't send the "Expect" header
        # so that the dCache door handles this PUT request itself without
        # redirecting us to a pool.
        del frontend_headers["Expect"]

    success_statuses = (HTTPStatus.OK, HTTPStatus.CREATED, HTTPStatus.NO_CONTENT)
    resp = self._put(url, headers=frontend_headers, body=b"", redirect=False)
    if resp.status in success_statuses:
        redirect_url = url
    elif resp.status in resp.REDIRECT_STATUSES:
        redirect_url = resp.headers.get("Location")
    else:
        raise unexpected_status_error("PUT", url, resp)

    # If we are uploading an empty file, there is nothing more to do.
    if is_zero_length:
        return 0

    # We may have been redirected to a backend server. Upload the file
    # contents to its final destination. Explicitly ask the server to close
    # this network connection after serving this PUT request to release
    # the associated dCache mover.
    backend_headers = {**caller_headers, "Connection": "close"}

    # Ask dCache to compute and record a checksum of the uploaded
    # file contents, for later integrity checks. Since we don't compute
    # the digest ourselves while uploading the data, we cannot verify
    # after the request is complete that the data we uploaded is
    # identical to the data recorded by the server, but at least the
    # server has recorded a digest of the data it stored.
    #
    # See RFC-3230 for details and
    # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
    # for the list of supported digest algorithms.
    checksum = self._config.request_checksum
    if checksum is not None:
        backend_headers["Want-Digest"] = checksum

    resp = self._put(redirect_url, body=data, headers=backend_headers)
    if resp.status not in success_statuses:
        raise unexpected_status_error("PUT", redirect_url, resp)

    # Parse the response body and extract the number of bytes
    # uploaded. This allows us to avoid sending a HEAD request
    # to retrieve the file size.
    uploaded = DavClientDCache.rex.match(resp.data.decode())
    return int(uploaded.group(1)) if uploaded else None
@override
def download(self, url: str, filename: str, chunk_size: int) -> int:
    """Download the content of a file and write it to local file,
    following ourselves the redirection to the backend server, if any.

    Parameters
    ----------
    url : `str`
        Target URL.
    filename : `str`
        Local file to write the content to. If the file already exists,
        it will be rewritten.
    chunk_size : `int`
        Size of the chunks to write to `filename`.

    Returns
    -------
    count : `int`
        Number of bytes written to `filename`.

    Notes
    -----
    The caller must ensure that the resource at `url` is a file, not
    a directory.
    """
    # Send a GET request without following redirection to get redirected
    # to the backend server.
    _, frontend_resp = self.get(url, preload_content=False, redirect=False)
    if frontend_resp.status == HTTPStatus.OK:
        # We were not redirected. Consume this response.
        return self._write_response_body_to_file(frontend_resp, filename, chunk_size)
    if frontend_resp.status not in frontend_resp.REDIRECT_STATUSES:
        raise unexpected_status_error("GET", url, frontend_resp)

    # We were redirected. Drain and release the response we received from
    # the frontend server so that the connection can be reused.
    frontend_resp.drain_conn()
    frontend_resp.release_conn()

    # Send a GET request to the backend server and ask it to close the
    # HTTP connection to force closing the network connection.
    redirect_url = frontend_resp.headers.get("Location")
    _, backend_resp = self.get(redirect_url, headers={"Connection": "close"}, preload_content=False)
    if backend_resp.status != HTTPStatus.OK:
        raise unexpected_status_error("GET", redirect_url, backend_resp)

    return self._write_response_body_to_file(backend_resp, filename, chunk_size)
@override
def read(self, url: str) -> tuple[str, bytes]:
    """Return the contents of the file located at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    url : `str`
        Backend URL from which the data was obtained.
    data : `bytes`
        Contents of the file.

    Notes
    -----
    The caller is responsible for ensuring that `url` designates a file
    and not a directory.
    """
    # Contact the frontend server without following redirections, so
    # that we learn the location of the backend server.
    frontend_url, frontend_resp = self.get(url, redirect=False)
    if frontend_resp.status == HTTPStatus.OK:
        return frontend_url, frontend_resp.data

    if frontend_resp.status not in frontend_resp.REDIRECT_STATUSES:
        raise unexpected_status_error("GET", url, frontend_resp)

    # We were redirected: retrieve the contents from the backend server
    # and ask it to close the HTTP connection once done.
    location = frontend_resp.headers.get("Location")
    backend_url, backend_resp = self.get(location, headers={"Connection": "close"})
    if backend_resp.status != HTTPStatus.OK:
        raise unexpected_status_error("GET", location, backend_resp)

    return backend_url, backend_resp.data
@override
def write(self, url: str, data: BinaryIO | bytes) -> int | None:
    """Create or rewrite a remote file at `url` with `data` as its
    contents.

    Parameters
    ----------
    url : `str`
        Target URL.
    data : `BinaryIO` or `bytes`
        Sequence of bytes, or binary stream, to upload.

    Returns
    -------
    size : `int | None`
        The size in bytes of the file uploaded. Can be `None` if the size
        could not be retrieved.

    Notes
    -----
    If a file already exists at `url` it will be rewritten.

    The data is first uploaded to a temporary URL and then renamed to
    `url`. If the upload or the rename fails, removal of the temporary
    file is attempted and the original exception is re-raised.
    """
    # dCache will automatically create all the parent directories so we
    # don't need to explicitly create them. Although this is not compliant
    # to RFC 4918, this is advantageous because it avoids several
    # round-trips to the server for creating all the directories
    # before actually uploading the data.

    # Build the temporary URL before entering the `try` block: the cleanup
    # handler refers to it, so it must be bound whenever the handler runs.
    temporary_url = self._make_temporary_url(url)
    try:
        # Upload to a temporary file and rename to the final name.
        size = self.put(temporary_url, data=data)
        self.rename(temporary_url, url, overwrite=True, create_parent=False)
    except Exception:
        # Upload failed. Attempt to remove the temporary file.
        self.delete(temporary_url)
        raise

    # The temporary file no longer exists at this point, so updating the
    # file size cache is done outside of the cleanup scope.
    self._file_size_cache.update_size(url, size)
    return size
@override
def mkcol(self, url: str) -> None:
    """Create a directory at `url`.

    If a directory already exists at `url` no error is returned nor
    exception is raised. An exception is raised if a file exists at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.
    """
    # A "MKCOL" request to dCache does not automatically create all
    # the intermediate directories if they do not exist. However, a
    # "PUT" request of a file does create the directory hierarchy.
    #
    # We exploit that to create directory hierarchies: we first create an
    # empty file with a random name and then we remove it. As a side
    # effect, the target directory will be created.
    #
    # Creating a directory this way implies two requests to the server
    # ("PUT" and "DELETE"), while using "MKCOL" would on average imply
    # one request per inexisting directory in the hierarchy. When
    # directory hierarchies are relatively deep, requiring two
    # requests per hierarchy is better than sending a "MKCOL" request
    # per directory in the hierarchy.

    # Build the temporary URL before entering the `try` block: the
    # `finally` clause refers to it, so it must be bound whenever that
    # clause runs. Otherwise a failure to build the URL would be masked
    # by an `UnboundLocalError`.
    temporary_url = self._make_temporary_url(url=f"{url}/mkcol")
    try:
        self.put(temporary_url, data=b"")
    finally:
        self.delete(temporary_url)
@override
def info(self, url: str, name: str | None = None) -> dict[str, Any]:
    # Docstring inherited.

    # Ask for the live DAV properties and, in addition, for the checksums
    # that dCache recorded about this file.
    propfind_body = (
        """<?xml version="1.0" encoding="utf-8"?>"""
        """<D:propfind xmlns:D="DAV:" xmlns:dcache="http://www.dcache.org/2013/webdav">"""
        """<D:prop>"""
        """<D:resourcetype/>"""
        """<D:getcontentlength/>"""
        """<D:getlastmodified/>"""
        """<D:displayname/>"""
        """<dcache:Checksums/>"""
        """</D:prop>"""
        """</D:propfind>"""
    )

    # Values returned when there is nothing at `url`.
    details: dict[str, Any] = {
        "name": url if name is None else name,
        "type": None,
        "size": None,
        "last_modified": datetime.min,
        "checksums": {},
    }

    resp = self.propfind(url, body=propfind_body, depth="0")
    if resp.status == HTTPStatus.NOT_FOUND:
        return details

    if resp.status != HTTPStatus.MULTI_STATUS:
        raise unexpected_status_error("PROPFIND", url, resp)

    # Only the first entry of the response is used.
    first_entry = self._propfind_parser.parse(resp.data)[0]
    metadata = DavFileMetadata.from_property(base_url=self._base_url, property=first_entry)
    details["type"] = "directory" if metadata.is_dir else "file"
    details["size"] = metadata.size
    details["last_modified"] = metadata.last_modified
    details["checksums"] = metadata.checksums
    return details
3030class DavClientXrootD(DavClientURLSigner):
3031 """Client for interacting with a XrootD webDAV server.
3033 Instances of this class are thread-safe.
3035 Parameters
3036 ----------
3037 url : `str`
3038 Root URL of the storage endpoint
3039 (e.g. "https://host.example.org:1234/").
3040 config : `DavConfig`
3041 Configuration to initialize this client.
3042 accepts_ranges : `bool` | `None`
3043 Indicate whether the remote server accepts the ``Range`` header in GET
3044 requests.
3045 """
3047 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
3048 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges)
3050 @override
3051 def put(
3052 self,
3053 url: str,
3054 headers: dict[str, str] | None = None,
3055 data: BinaryIO | bytes = b"",
3056 ) -> int | None:
3057 # Docstring inherited.
3058 # Send a PUT request with empty body to the XRootD frontend server to
3059 # get redirected to the backend.
3060 frontend_headers = {} if headers is None else dict(headers)
3061 frontend_headers.update({"Content-Length": "0", "Expect": "100-continue"})
3062 for attempt in range(max_attempts := 3):
3063 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False)
3064 if resp.status in (
3065 HTTPStatus.OK,
3066 HTTPStatus.CREATED,
3067 HTTPStatus.NO_CONTENT,
3068 ):
3069 redirect_url = url
3070 break
3071 elif resp.status in resp.REDIRECT_STATUSES:
3072 redirect_url = resp.headers.get("Location")
3073 break
3074 elif resp.status == HTTPStatus.LOCKED:
3075 # Sometimes XRootD servers respond with status code LOCKED and
3076 # response body of the form:
3077 #
3078 # "Output file /path/to/file is already opened by 1 writer;
3079 # open denied."
3080 #
3081 # If we get such a response, try again, unless we reached
3082 # the maximum number of attempts.
3083 if attempt == max_attempts - 1:
3084 raise ValueError(
3085 f"""Unexpected response to HTTP request PUT {resp.geturl()}: status {resp.status} """
3086 f"""{resp.reason} [{resp.data.decode()}] after {max_attempts} attempts"""
3087 )
3089 # Wait a bit and try again
3090 log.warning(
3091 f"""got unexpected response status {HTTPStatus.LOCKED} Locked for PUT {resp.geturl()} """
3092 f"""(attempt {attempt}/{max_attempts}), retrying..."""
3093 )
3094 time.sleep((attempt + 1) * 0.100)
3095 continue
3096 else:
3097 raise unexpected_status_error("PUT", url, resp)
3099 # We were redirected to a backend server. Upload the file contents to
3100 # its final destination.
3102 # XRootD backend servers typically use a single port number for
3103 # accepting connections from clients. It is therefore beneficial
3104 # to keep those connections open, if the server allows.
3106 # Ask the server to compute and record a checksum of the uploaded
3107 # file contents, for later integrity checks. Since we don't compute
3108 # the digest ourselves while uploading the data, we cannot control
3109 # after the request is complete that the data we uploaded is
3110 # identical to the data recorded by the server, but at least the
3111 # server has recorded a digest of the data it stored.
3112 #
3113 # See RFC-3230 for details and
3114 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
3115 # for the list of supported digest algorithhms.
3116 #
3117 # In addition, note that not all servers implement this RFC so
3118 # the checksum reqquest may be ignored by the server.
3119 backend_headers = {} if headers is None else dict(headers)
3120 if (checksum := self._config.request_checksum) is not None:
3121 backend_headers.update({"Want-Digest": checksum})
3123 resp = self._put(redirect_url, body=data, headers=backend_headers)
3124 match resp.status:
3125 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
3126 # Send a HEAD request to retrieve the size of the file we
3127 # just uploaded.
3128 resp = self.head(redirect_url)
3129 size = int(resp.headers.get("Content-Length", -1))
3130 return None if size == -1 else size
3131 case _:
3132 raise unexpected_status_error("PUT", redirect_url, resp)
3134 @override
3135 def info(self, url: str, name: str | None = None) -> dict[str, Any]:
3136 # XRootD does not include checksums in the response to PROPFIND
3137 # request. We need to send a specific HEAD request to retrieve
3138 # the ADLER32 checksum.
3139 #
3140 # If found, the checksum is included in the response header "Digest",
3141 # which is of the form:
3142 #
3143 # Digest: adler32=0e4709f2
3144 result = super().info(url, name)
3145 if result["type"] == "file":
3146 headers: dict[str, str] = {"Want-Digest": "adler32"}
3147 resp = self.head(url=url, headers=headers)
3148 if (digest := resp.headers.get("Digest")) is not None:
3149 value = digest.split("=")[1]
3150 result["checksums"].update({"adler32": value})
3152 return result
3154 @override
3155 def write(self, url: str, data: BinaryIO | bytes) -> int | None:
3156 """Create or rewrite a remote file at `url` with `data` as its
3157 contents.
3159 Parameters
3160 ----------
3161 url : `str`
3162 Target URL.
3163 data : `bytes`
3164 Sequence of bytes to upload.
3166 Returns
3167 -------
3168 size : `int | None`
3169 The size in bytes of the file uploaded. Can be `None` if the size
3170 could not be retrieved.
3172 Notes
3173 -----
3174 If a file already exists at `url` it will be rewritten.
3175 """
3176 # XRootD will automatically create all the parent directories so we
3177 # don't need to explicitly create them. Although this is not compliant
3178 # to RFC 4918, this is advantageous because it avoids several
3179 # round-trips to the server for creating all the directories
3180 # before actually uploading the data.
3181 try:
3182 # Upload to a temporary file and rename to the final name.
3183 temporary_url = self._make_temporary_url(url)
3184 size = self.put(temporary_url, data=data)
3185 self.rename(temporary_url, url, overwrite=True, create_parent=False)
3187 # Update the file size cache with this size
3188 self._file_size_cache.update_size(url, size)
3189 return size
3190 except Exception:
3191 # Upload failed. Attempt to remove the temporary file.
3192 self.delete(temporary_url)
3193 raise
3195 @override
3196 def mkcol(self, url: str) -> None:
3197 """Create a directory at `url`.
3199 If a directory already exists at `url` no error is returned nor
3200 exception is raised. An exception is raised if a file exists at `url`.
3202 Parameters
3203 ----------
3204 url : `str`
3205 Target URL.
3206 """
3207 # XRootD automatically creates all the intermediate directories.
3208 resp = self._mkcol(url)
3209 match resp.status:
3210 case HTTPStatus.CREATED:
3211 return
3212 case HTTPStatus.METHOD_NOT_ALLOWED:
3213 # XRootD returns "405 Method Not Allowed" when either a file
3214 # or a directory already exists at `url`
3215 stat = self.stat(url)
3216 if stat.is_dir:
3217 # A directory exists at `url`. Nothing more to do.
3218 return
3219 elif stat.is_file:
3220 raise NotADirectoryError(
3221 f"Can not create a directory because a file already exists at {resp.geturl()}"
3222 )
3223 case _:
3224 raise ValueError(
3225 f"Can not create directory {resp.geturl()}: status {resp.status} {resp.reason}"
3226 )
3228 @override
3229 def stat(self, url: str) -> DavFileMetadata:
3230 # Docstring inherited.
3231 # XRootD v5.9.1 responds "200 OK" to a HEAD request against an
3232 # existing file. When the target URL is a directory, it also responds
3233 # "200 OK". In both cases the response header "Content-Length"
3234 # is present but has different meaning. If the target URL is a file,
3235 # the header value is the size in bytes of the file. If the target
3236 # URL is a directory, the header value is the number of items in
3237 # the directory.
3238 #
3239 # So there is not an easy way to determine if the target URL is a
3240 # file or a directory from the response to a HEAD request.
3241 #
3242 # When the target URL is a directory and we ask for a digest, the
3243 # server responds "409 Conflict". We use this behavior to
3244 # discriminate between a file and a directory.
3245 #
3246 # Note that XRootD does not include the "Last-Modified" header in the
3247 # response to a HEAD request so we cannot include the last modified
3248 # time in the value returned by this method.
3249 resp = self._head(url, headers={"Want-Digest": "adler32"})
3250 match resp.status:
3251 case HTTPStatus.OK:
3252 # There is a file at target URL
3253 if "Content-Length" in resp.headers:
3254 href = url.replace(self._base_url, "", 1)
3255 size = int(resp.headers.get("Content-Length"))
3256 return DavFileMetadata(self._base_url, href=href, exists=True, is_dir=False, size=size)
3257 else:
3258 raise ValueError(
3259 f"""Expecting Content-Length header to be present in """
3260 f"""response to HTTP HEAD {resp.geturl()}: status {resp.status} """
3261 f"""{resp.reason} [{resp.data.decode()}] but could not find it"""
3262 )
3263 case HTTPStatus.CONFLICT:
3264 # There is a directory at target URL
3265 href = url.replace(self._base_url, "", 1)
3266 return DavFileMetadata(self._base_url, href=href, exists=True, is_dir=True)
3267 case HTTPStatus.NOT_FOUND:
3268 # There is neither a file nor a directory at target URL
3269 return DavFileMetadata(base_url=url, exists=False)
3270 case _:
3271 raise unexpected_status_error("HEAD", url, resp)
class DavFileMetadata:
    """Container for attributes of interest of a webDAV file or directory.

    Parameters
    ----------
    base_url : `str`
        Base URL.
    href : `str`, optional
        Path component that can be added to the base URL. If it is not
        empty, it is appended to `base_url` separated by a single "/".
    name : `str`, optional
        Name.
    exists : `bool`, optional
        Whether file or directory exist.
    size : `int`, optional
        Size of file.
    is_dir : `bool`, optional
        Whether the URL points to a directory or file.
    last_modified : `datetime.datetime`, optional
        Last modified date.
    checksums : `dict` [ `str`, `str` ] | `None`, optional
        Checksums. The mapping is copied.
    """

    def __init__(
        self,
        base_url: str,
        href: str = "",
        name: str = "",
        exists: bool = False,
        size: int = -1,
        is_dir: bool = False,
        last_modified: datetime = datetime.min,
        checksums: dict[str, str] | None = None,
    ):
        if not href:
            self._url: str = base_url
        else:
            # Make sure the base URL and the path are separated by "/" even
            # when `href` does not start with one, otherwise the first path
            # component would be glued to the host name.
            path = href if href.startswith("/") else "/" + href
            self._url = base_url.rstrip("/") + path

        self._href: str = href
        self._name: str = name
        self._exists: bool = exists
        self._size: int = size
        self._is_dir: bool = is_dir
        self._last_modified: datetime = last_modified
        self._checksums: dict[str, str] = {} if checksums is None else dict(checksums)

    @staticmethod
    def from_property(base_url: str, property: DavProperty) -> DavFileMetadata:
        """Create an instance from the values in `property`.

        Parameters
        ----------
        base_url : `str`
            Base URL.
        property : `DavProperty`
            Properties to associate with URL.

        Returns
        -------
        metadata : `DavFileMetadata`
            Metadata initialized from the values of `property`.
        """
        return DavFileMetadata(
            base_url=base_url,
            href=property.href,
            name=property.name,
            exists=property.exists,
            size=property.size,
            is_dir=property.is_dir,
            last_modified=property.last_modified,
            checksums=dict(property.checksums),
        )

    def __str__(self) -> str:
        return (
            f"""{self._url} {self._href} {self._name} {self._exists} {self._size} {self._is_dir} """
            f"""{self._checksums}"""
        )

    @property
    def url(self) -> str:
        return self._url

    @property
    def href(self) -> str:
        return self._href

    @property
    def name(self) -> str:
        return self._name

    @property
    def exists(self) -> bool:
        return self._exists

    @property
    def size(self) -> int:
        # -1 when the resource does not exist, 0 for directories.
        if not self._exists:
            return -1

        return 0 if self._is_dir else self._size

    @property
    def is_dir(self) -> bool:
        return self._exists and self._is_dir

    @property
    def is_file(self) -> bool:
        return self._exists and not self._is_dir

    @property
    def last_modified(self) -> datetime:
        return self._last_modified

    @property
    def checksums(self) -> dict[str, str]:
        return self._checksums
3385class DavProperty:
3386 """Helper class to encapsulate select live DAV properties of a single
3387 resource, as retrieved via a PROPFIND request.
3389 Parameters
3390 ----------
3391 response : `eTree.Element` or `None`
3392 The XML response defining the DAV property.
3393 """
3395 # Regular expression to compare against the 'status' element of a
3396 # PROPFIND response's 'propstat' element.
3397 _status_ok_rex = re.compile(r"^HTTP/.* 200 .*$", re.IGNORECASE)
3399 def __init__(self, response: eTree.Element | None):
3400 self._href: str = ""
3401 self._displayname: str = ""
3402 self._collection: bool = False
3403 self._getlastmodified: str = ""
3404 self._getcontentlength: int = -1
3405 self._checksums: dict[str, str] = {}
3407 if response is not None:
3408 self._parse(response)
3410 def _parse(self, response: eTree.Element) -> None:
3411 # Extract 'href'.
3412 if (element := response.find("./{DAV:}href")) is not None:
3413 # We need to use "str(element.text)"" instead of "element.text" to
3414 # keep mypy happy.
3415 self._href = str(element.text).strip()
3416 else:
3417 raise ValueError(
3418 "Property 'href' expected but not found in PROPFIND response: "
3419 f"{eTree.tostring(response, encoding='unicode')}"
3420 )
3422 for propstat in response.findall("./{DAV:}propstat"):
3423 # Only extract properties of interest with status OK.
3424 status = propstat.find("./{DAV:}status")
3425 if status is None or not self._status_ok_rex.match(str(status.text)):
3426 continue
3428 for prop in propstat.findall("./{DAV:}prop"):
3429 # Parse "collection".
3430 if (element := prop.find("./{DAV:}resourcetype/{DAV:}collection")) is not None:
3431 self._collection = True
3433 # Parse "getlastmodified".
3434 if (element := prop.find("./{DAV:}getlastmodified")) is not None:
3435 self._getlastmodified = str(element.text)
3437 # Parse "getcontentlength".
3438 if (element := prop.find("./{DAV:}getcontentlength")) is not None:
3439 self._getcontentlength = int(str(element.text))
3441 # Parse "displayname".
3442 if (element := prop.find("./{DAV:}displayname")) is not None:
3443 self._displayname = str(element.text)
3445 # Parse "Checksums"
3446 if (element := prop.find("./{http://www.dcache.org/2013/webdav}Checksums")) is not None:
3447 self._checksums = self._parse_checksums(element.text)
3449 # Some webDAV servers don't include the 'displayname' property in the
3450 # response so try to infer it from the value of the 'href' property.
3451 # Depending on the server the href value may end with '/'.
3452 if not self._displayname:
3453 self._displayname = os.path.basename(self._href.rstrip("/"))
3455 # Some webDAV servers do not append a "/" to the href of directories.
3456 # Ensure we include a single final "/" in our response.
3457 if self._collection:
3458 self._href = self._href.rstrip("/") + "/"
3460 # Force a size of 0 for collections.
3461 if self._collection:
3462 self._getcontentlength = 0
3464 def _parse_checksums(self, checksums: str | None) -> dict[str, str]:
3465 # checksums argument is of the form
3466 # md5=MyS/wljSzI9WYiyrsuyoxw==,adler32=23b104f2
3467 result: dict[str, str] = {}
3468 if checksums is not None:
3469 for checksum in checksums.split(","):
3470 if (pos := checksum.find("=")) != -1:
3471 algorithm, value = (checksum[:pos].lower(), checksum[pos + 1 :])
3472 if algorithm == "md5":
3473 # dCache documentation about how it encodes the
3474 # MD5 checksum:
3475 #
3476 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#checksums
3477 result[algorithm] = bytes.hex(base64.standard_b64decode(value))
3478 else:
3479 result[algorithm] = value
3481 return result
3483 @property
3484 def exists(self) -> bool:
3485 # It is either a directory or a file with length of at least zero
3486 return self._collection or self._getcontentlength >= 0
3488 @property
3489 def is_dir(self) -> bool:
3490 return self._collection
3492 @property
3493 def is_file(self) -> bool:
3494 return not self._collection
3496 @property
3497 def last_modified(self) -> datetime:
3498 if not self._getlastmodified:
3499 return datetime.min
3501 # Last modified timestamp is of the form:
3502 # 'Wed, 12 Mar 2025 10:11:13 GMT'
3503 return datetime.strptime(self._getlastmodified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=UTC)
3505 @property
3506 def size(self) -> int:
3507 return self._getcontentlength
3509 @property
3510 def name(self) -> str:
3511 return self._displayname
3513 @property
3514 def href(self) -> str:
3515 return self._href
3517 @property
3518 def checksums(self) -> dict[str, str]:
3519 return self._checksums
class DavPropfindParser:
    """Parser of the XML body returned by a webDAV server in response to
    a PROPFIND request.
    """

    def __init__(self) -> None:
        pass

    def parse(self, body: bytes) -> list[DavProperty]:
        """Convert the XML-encoded response body of a PROPFIND request into
        a list of properties, one per 'response' element.

        Parameters
        ----------
        body : `bytes`
            XML-encoded response body to a PROPFIND request.

        Returns
        -------
        responses : `list` [ `DavProperty` ]
            Parsed content of the response.

        Raises
        ------
        ValueError
            Raised if `body` does not contain any 'response' element.
        """
        # The body of the response to a PROPFIND request looks like
        # (indented for readability):
        #
        # <?xml version="1.0" encoding="UTF-8"?>
        # <D:multistatus xmlns:D="DAV:">
        #     <D:response>
        #         <D:href>path/to/resource</D:href>
        #         <D:propstat>
        #             <D:prop>
        #                 <D:resourcetype>
        #                     <D:collection xmlns:D="DAV:"/>
        #                 </D:resourcetype>
        #                 <D:getlastmodified>
        #                     Fri, 27 Jan 2023 13:59:01 GMT
        #                 </D:getlastmodified>
        #                 <D:getcontentlength>
        #                     12345
        #                 </D:getcontentlength>
        #             </D:prop>
        #             <D:status>
        #                 HTTP/1.1 200 OK
        #             </D:status>
        #         </D:propstat>
        #     </D:response>
        #     <D:response>
        #         ...
        #     </D:response>
        # </D:multistatus>
        decoded_body: str = body.decode().strip()
        root = eTree.fromstring(decoded_body)

        # Build one property object per 'response' element found directly
        # under the root element.
        properties = [DavProperty(node) for node in root.findall("./{DAV:}response")]
        if not properties:
            # Could not parse the body
            raise ValueError(f"Unable to parse response for PROPFIND request: {decoded_body}")

        return properties
class Authorizer:
    """Common ancestor of the classes which attach an 'Authorization' header
    to a HTTP request.
    """

    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information.

        Notes
        -----
        Concrete subclasses must implement this method.
        """
        raise NotImplementedError

    def _is_file_protected(self, filepath: str) -> bool:
        """Tell whether the permissions of the file at `filepath` grant
        access to its owner only.

        Parameters
        ----------
        filepath : `str`
            Path of a local file.

        Returns
        -------
        protected : `bool`
            `True` if `filepath` is a regular file, its owner has at least
            one permission on it and neither its group nor the other users
            have any.
        """
        if not os.path.isfile(filepath):
            return False

        permissions = stat.S_IMODE(os.stat(filepath).st_mode)
        granted_to_owner = bool(permissions & stat.S_IRWXU)
        granted_to_anyone_else = bool(permissions & (stat.S_IRWXG | stat.S_IRWXO))
        return granted_to_owner and not granted_to_anyone_else

    def _read_if_modified_since(
        self, filename: str | None, timestamp: float
    ) -> tuple[str, float] | tuple[None, None]:
        """Return the contents of local file `filename` unless it was last
        modified before `timestamp`.

        Parameters
        ----------
        filename : `str`, optional
            Path of a local file.

        timestamp : `float`
            Timestamp to compare against the last modification time of
            `filename`. The file is not read if its modification time is
            older than `timestamp`.

        Returns
        -------
        result : `tuple[str, float]`
            Tuple of (contents of file `filename` without trailing newlines,
            time of the read operation).

            The returned value is `tuple[None, None]` if `filename` is
            `None` or if the file was not read.
        """
        unchanged: tuple[None, None] = (None, None)
        if filename is None:
            return unchanged

        if os.stat(filename).st_mtime < timestamp:
            return unchanged

        with open(filename) as stream:
            # Record the time before reading the contents.
            read_at = time.time()
            contents = stream.read()

        return (contents.rstrip("\n"), read_at)
class TokenAuthorizer(Authorizer):
    """Authorizer which adds a bearer token 'Authorization' header to each
    request.

    Parameters
    ----------
    token : `str`
        Either the token itself or the path to a local file containing the
        value of the token. If `token` is a file, its permissions must only
        allow access by its owner.

    Raises
    ------
    PermissionError
        Raised if `token` is the path to a file which is accessible by
        users other than its owner.
    """

    def __init__(self, token: str | None = None) -> None:
        self._token = None
        self._token_path = None
        self._time_of_last_read: float = -1.0

        if token is not None:
            self._token = token
            if os.path.isfile(token):
                # The value of `token` is the path to a file: make sure the
                # file is protected before loading the token it contains.
                self._token_path = os.path.abspath(token)
                if not self._is_file_protected(self._token_path):
                    raise PermissionError(
                        f"""Authorization token file at {self._token_path} must be protected for access only """
                        """by its owner"""
                    )
                self._update_token()

    def _update_token(self) -> None:
        """Reload the token from the token file (if any) unless the file
        was last modified before the last time we read it.
        """
        if self._token_path is None:
            return

        contents, read_at = self._read_if_modified_since(self._token_path, self._time_of_last_read)
        if contents is not None and read_at is not None:
            # Remember the new token value and when we read it.
            self._token = contents
            self._time_of_last_read = read_at

    @override
    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Nothing is added if this authorizer has no token.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information.
        """
        if self._token is not None:
            self._update_token()
            headers["Authorization"] = f"Bearer {self._token}"
class BasicAuthorizer(Authorizer):
    """Attach a 'Authorization' header to each request using Basic
    authentication.

    Parameters
    ----------
    user_name : `str`
        User name. It is always used verbatim: unlike `user_password`, it
        is never interpreted as the path to a file.
    user_password : `str`
        Can be either the path to a local file which contains the
        value of the password or the password itself. If `user_password` is a
        file it must be protected so that only the owner can read and write it.

    Raises
    ------
    PermissionError
        Raised if `user_password` is the path to a file which is accessible
        by users other than its owner.

    Notes
    -----
    If either `user_name` or `user_password` is `None`, the authorizer is
    inactive: `set_authorization` leaves the headers untouched.
    """

    def __init__(self, user_name: str | None = None, user_password: str | None = None) -> None:
        # Always initialize every attribute, even when the credentials are
        # incomplete, so that `set_authorization` can be safely called on
        # any instance.
        self._user_name: str | None = user_name
        self._user_password: str | None = user_password
        self._user_password_path: str | None = None
        self._time_of_last_read: float = -1.0
        self._header_value: str = ""

        if user_name is None or user_password is None:
            return

        if os.path.isfile(user_password):
            # The value in `user_password` is the path to a file. Check
            # the file is protected and read its contents.
            self._user_password_path = os.path.abspath(user_password)
            if not self._is_file_protected(self._user_password_path):
                raise PermissionError(
                    f"""Password file at {self._user_password_path} must be protected for access only """
                    """by its owner"""
                )
            self._update_password()
        else:
            self._update_header_value()

    def _update_header_value(self) -> None:
        """Compute the value of the 'Authorization' header using HTTP basic
        authorization.
        """
        basic_auth_header = make_headers(basic_auth=f"{self._user_name}:{self._user_password}")
        self._header_value = basic_auth_header["authorization"]

    def _update_password(self) -> None:
        """Update the password of this authorizer if the file it is stored in
        has been modified since the last time we read it.
        """
        if self._user_password_path is None:
            return

        password, time_of_last_read = self._read_if_modified_since(
            self._user_password_path, self._time_of_last_read
        )
        if password is None or time_of_last_read is None:
            return

        # Update the password, the last time we read it and re-compute the
        # value of the "Authorization" header.
        self._user_password = password
        self._time_of_last_read = time_of_last_read
        self._update_header_value()

    @override
    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Nothing is added if the user name or the password is missing.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information.
        """
        if self._user_name is None or self._user_password is None:
            return

        self._update_password()
        headers["Authorization"] = self._header_value
def expand_vars(path: str | None) -> str | None:
    """Replace the environment variables found in `path` by their values.

    Parameters
    ----------
    path : `str` or `None`
        Absolute or relative path which may include an environment variable
        (e.g. '$HOME/path/to/my/file').

    Returns
    -------
    path : `str` or `None`
        The path with the values of the environment variables expanded,
        or `None` if `path` is `None`.
    """
    if path is None:
        return None

    return os.path.expandvars(path)
def dump_response(method: str, resp: HTTPResponse, dump_body: bool = False) -> None:
    """Dump response for debugging purposes.

    The URL, status, reason and headers of the response are logged at
    debug level.

    Parameters
    ----------
    method : `str`
        Method name to include in log output.
    resp : `HTTPResponse`
        Response to dump.
    dump_body : `bool`, optional
        Whether to also log the length, in bytes, of the response body.
    """
    log.debug("%s %s", method, resp.geturl())
    log.debug(" %s %s", resp.status, resp.reason)

    for header, value in resp.headers.items():
        log.debug(" %s: %s", header, value)

    if dump_body:
        # Measure the raw body: decoding it is not needed to get its length
        # and would raise for bodies which are not valid UTF-8.
        log.debug(" response body length: %d", len(resp.data))