Coverage for python/lsst/resources/davutils.py: 24%

1099 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-30 01:28 -0700

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import base64 

15import enum 

16import io 

17import json 

18import logging 

19import os 

20import posixpath 

21import random 

22import re 

23import stat 

24import threading 

25import time 

26import uuid 

27import xml.etree.ElementTree as eTree 

28from datetime import UTC, datetime 

29from http import HTTPStatus 

30from typing import Any, BinaryIO 

31 

32try: 

33 from typing import override # Python 3.12+ 

34except ImportError: 

35 from typing_extensions import override # Python 3.11 

36 

37from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse 

38 

39try: 

40 import fsspec 

41 from fsspec.spec import AbstractFileSystem 

42except ImportError: 

43 fsspec = None 

44 AbstractFileSystem = type 

45 

46import yaml 

47from astropy import units as u 

48from urllib3 import PoolManager, make_headers 

49from urllib3.response import HTTPResponse 

50from urllib3.util import Retry, Timeout, Url, parse_url 

51 

52from lsst.utils.logging import getLogger 

53from lsst.utils.timer import time_this 

54 

# Use the same logger as `dav.py`: the logger name is this module's name
# with the ".davutils" component replaced by ".dav".
log = getLogger(__name__.replace(".davutils", ".dav"))

57 

58 

59def normalize_path(path: str | None) -> str: 

60 """Normalize a path intended to be part of a URL. 

61 

62 A path of the form "///a/b/c///../d/e/" would be normalized as "/a/b/d/e". 

63 The returned path is always absolute, i.e. starts by "/" and never 

64 ends by "/" except when the path is exactly "/" and does not contain 

65 "." nor "..". It does not contain consecutive "/" either. 

66 

67 Parameters 

68 ---------- 

69 path : `str`, optional 

70 Path to normalize (e.g., '/path/to/..///normalize/'). 

71 

72 Returns 

73 ------- 

74 url : `str` 

75 Normalized URL (e.g., '/path/normalize'). 

76 """ 

77 return "/" if not path else "/" + posixpath.normpath(path).lstrip("/") 

78 

79 

def normalize_url(url: str, preserve_scheme: bool = False, preserve_path: bool = True) -> str:
    """Return a normalized form of `url` made of scheme, host, port and path.

    Unless `preserve_scheme` is set, the scheme of the returned URL has any
    occurrence of 'dav' replaced by 'http' (so 'dav' becomes 'http' and
    'davs' becomes 'https'). A URL without scheme gets the 'http' scheme.

    Parameters
    ----------
    url : `str`
        URL to normalize (e.g., 'davs://example.org:1234///path/to//../dir/').
    preserve_scheme : `bool`
        If True the scheme of `url` is kept as is. Otherwise the scheme
        of the returned normalized URL will be 'http' or 'https'.
    preserve_path : `bool`
        If True, the (normalized) path of `url` is kept in the returned
        URL, otherwise the returned URL has '/' as path.

    Returns
    -------
    url : `str`
        Normalized URL (e.g. 'https://example.org:1234/path/to/dir').
    """
    components = parse_url(url)

    # Select the scheme of the normalized URL.
    scheme = components.scheme
    if scheme is None:
        scheme = "http"
    elif not preserve_scheme:
        scheme = scheme.replace("dav", "http")

    # Select the path of the normalized URL.
    path = "/"
    if preserve_path:
        path = normalize_path(components.path)

    # Only scheme, host, port and path are carried over to the result.
    normalized = Url(scheme=scheme, host=components.host, port=components.port, path=path)
    return normalized.url

107 

108 

def redact_url(url: str) -> str:
    """Return a modified `url` with authorization query redacted.

    The goal is that this method should be used for logging URLs to avoid
    leaking authorization tokens.

    Parameters
    ----------
    url : `str`
        URL to redact.

    Returns
    -------
    redacted_url : `str`
        For instance, when called with an URL like:

        https://host.example.org:1234/a/b/c/file.data?key1=value1&key2=value2&authz=token#fragment

        the returned value would be:

        https://host.example.org:1234/a/b/c/file.data?key1=value1&key2=value2&authz=....#fragment

    Notes
    -----
    The query string is parsed and then re-encoded with `urlencode`, so the
    encoding of the query of the returned URL may differ from the encoding
    used in `url`.
    """
    parsed_url = urlparse(url)

    # Keep the query parameters with a blank value (e.g. "flag=" or an empty
    # "authz="), so that the redacted URL has the same parameters as the
    # original one. By default `parse_qsl` silently drops them.
    redacted_query: list[tuple[str, str]] = [
        (key, "...." if key == "authz" else value)
        for key, value in parse_qsl(parsed_url.query, keep_blank_values=True)
    ]

    return urlunparse(parsed_url._replace(query=urlencode(redacted_query)))

138 

139 

class DavConfig:
    """Configurable settings a webDAV client must use when interacting with a
    particular storage endpoint.

    Parameters
    ----------
    config : `dict[str, str]`
        Dictionary of configurable settings for the webdav endpoint which
        base URL is `config["base_url"]`. Any setting absent from the
        dictionary takes the value of the corresponding ``DEFAULT_*`` class
        attribute.

        For instance, if `config["base_url"]` is

            "davs://webdav.example.org:1234/"

        any object of class `DavResourcePath` like

            "davs://webdav.example.org:1234/path/to/any/file"

        will use the settings in this configuration to configure its client.

    Raises
    ------
    ValueError
        Raised if the requested checksum algorithm is not among the accepted
        ones, or if the scheme of a frontend URL differs from the scheme of
        the base URL.
    """

    # Timeout in seconds to establish a network connection with the remote
    # server.
    DEFAULT_TIMEOUT_CONNECT: float = 10.0

    # Timeout in seconds to read the response to a request sent to a server.
    # This is total time for reading both the headers and the response body.
    # It must be large enough to allow for upload and download of files
    # of typical size the webdav client supports.
    DEFAULT_TIMEOUT_READ: float = 300.0

    # Maximum number of network connections to persist against a single
    # "host:port" pair. If this endpoint client needs to issue more
    # simultaneous requests than this number, additional network connections
    # will be created but won't be persisted after use.
    DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST: int = 20

    # Size of the buffer (in mebibytes, i.e. 1024*1024 bytes) the webdav
    # client of this endpoint will use when sending requests and receiving
    # responses.
    DEFAULT_BUFFER_SIZE: int = 5

    # Size of the block (in mebibytes, i.e. 1024*1024 bytes) the webdav
    # client of this endpoint will use for making partial reads. Each partial
    # read will request at least this number of bytes, unless the total size
    # of the file is lower than this value.
    DEFAULT_BLOCK_SIZE: int = 1

    # Number of times to retry requests before failing. Retry happens only
    # under certain conditions.
    DEFAULT_RETRIES: int = 4

    # Minimal and maximal retry backoff (in seconds) for the client to compute
    # the wait time before retrying a request.
    # A value in this interval is randomly selected as the backoff factor
    # every time a request is retried.
    DEFAULT_RETRY_BACKOFF_MIN: float = 1.0
    DEFAULT_RETRY_BACKOFF_MAX: float = 3.0

    # Path to a directory or certificate bundle file where the certificates
    # of the trusted certificate authorities can be found.
    # Those certificates will be used by the client of the webdav endpoint
    # to verify the server's host certificate.
    # If None, the certificates trusted by the system are used.
    DEFAULT_TRUSTED_AUTHORITIES: str | None = None

    # User name and password for the client to authenticate to the server.
    # If specified, HTTP basic authentication is used on all requests.
    DEFAULT_USER_NAME: str | None = None
    DEFAULT_USER_PASSWORD: str | None = None

    # Path to the client certificate and associated private key the webdav
    # client must present to the server for authentication purposes.
    # If None, no client certificate is presented.
    DEFAULT_USER_CERT: str | None = None
    DEFAULT_USER_KEY: str | None = None

    # Token the webdav client must send to the server for authentication
    # purposes. The token may be the value of the token itself or the path
    # to a file where the token can be found.
    DEFAULT_TOKEN: str | None = None

    # If this option is set to True, the webdav client attempts to reuse
    # the network connection to the server as long as possible. Note that
    # the server can unilaterally decide to close the connection.
    # If disabled, the connection is closed after each request.
    DEFAULT_REUSE_CONNECTION: bool = True

    # Default checksum algorithm to request the server to compute on every
    # file upload. Not all servers support this.
    # See RFC 3230 for details.
    DEFAULT_REQUEST_CHECKSUM: str | None = None

    # If this option is set to True, the webdav client can return objects
    # compliant to the fsspec specification.
    # See: https://filesystem-spec.readthedocs.io
    DEFAULT_ENABLE_FSSPEC: bool = True

    # If this option is set to True, memory usage is computed and reported
    # when executing in debug mode. Computing memory usage is costly, so only
    # set this when debugging.
    DEFAULT_COLLECT_MEMORY_USAGE: bool = False

    # Accepted checksum algorithms. Must be lowercase.
    ACCEPTED_CHECKSUMS: list[str] = ["adler32", "md5", "sha-256", "sha-512"]

    def __init__(self, config: dict | None = None) -> None:
        if config is None:
            config = {}

        # When no base URL is configured, this object is identified by the
        # placeholder "_default_". Otherwise, the base URL is reduced to its
        # normalized scheme, host and port, with "/" as path.
        if (base_url := expand_vars(config.get("base_url"))) is None:
            self._base_url = "_default_"
        else:
            self._base_url = normalize_url(base_url, preserve_path=False)

        self._timeout_connect: float = float(config.get("timeout_connect", DavConfig.DEFAULT_TIMEOUT_CONNECT))
        self._timeout_read: float = float(config.get("timeout_read", DavConfig.DEFAULT_TIMEOUT_READ))
        self._persistent_connections_per_host: int = int(
            config.get(
                "persistent_connections_per_host",
                DavConfig.DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST,
            )
        )

        # Buffer and block sizes are configured in mebibytes but stored
        # in bytes.
        self._buffer_size: int = 1_048_576 * int(config.get("buffer_size", DavConfig.DEFAULT_BUFFER_SIZE))
        self._block_size: int = 1_048_576 * int(config.get("block_size", DavConfig.DEFAULT_BLOCK_SIZE))
        self._retries: int = int(config.get("retries", DavConfig.DEFAULT_RETRIES))
        self._retry_backoff_min: float = float(
            config.get("retry_backoff_min", DavConfig.DEFAULT_RETRY_BACKOFF_MIN)
        )
        self._retry_backoff_max: float = float(
            config.get("retry_backoff_max", DavConfig.DEFAULT_RETRY_BACKOFF_MAX)
        )
        self._trusted_authorities: str | None = expand_vars(
            config.get("trusted_authorities", DavConfig.DEFAULT_TRUSTED_AUTHORITIES)
        )
        self._user_name: str | None = expand_vars(config.get("user_name", DavConfig.DEFAULT_USER_NAME))
        self._user_password: str | None = expand_vars(
            config.get("user_password", DavConfig.DEFAULT_USER_PASSWORD)
        )
        self._user_cert: str | None = expand_vars(config.get("user_cert", DavConfig.DEFAULT_USER_CERT))
        self._user_key: str | None = expand_vars(config.get("user_key", DavConfig.DEFAULT_USER_KEY))
        self._token: str | None = expand_vars(config.get("token", DavConfig.DEFAULT_TOKEN))
        self._reuse_connection: bool = config.get("reuse_connection", DavConfig.DEFAULT_REUSE_CONNECTION)
        self._enable_fsspec: bool = config.get("enable_fsspec", DavConfig.DEFAULT_ENABLE_FSSPEC)

        # The frontend URLs are checked against the base URL, so this must
        # be done after `self._base_url` is set.
        self._frontend_urls: list[str] = self._init_frontend_urls(config=config)
        self._collect_memory_usage: bool = config.get(
            "collect_memory_usage", DavConfig.DEFAULT_COLLECT_MEMORY_USAGE
        )
        self._request_checksum: str | None = config.get(
            "request_checksum", DavConfig.DEFAULT_REQUEST_CHECKSUM
        )
        if self._request_checksum is not None:
            # The name of the checksum algorithm is case insensitive in the
            # configuration, but it is stored in lowercase.
            self._request_checksum = self._request_checksum.lower()
            if self._request_checksum not in DavConfig.ACCEPTED_CHECKSUMS:
                raise ValueError(
                    f"""Value for checksum algorithm {self._request_checksum} for storage endpoint """
                    f"""{self._base_url} is not among the accepted values: {DavConfig.ACCEPTED_CHECKSUMS}"""
                )

    def _init_frontend_urls(self, config: dict | None = None) -> list[str]:
        """Return the normalized URLs of the frontend servers found in
        `config`.

        Parameters
        ----------
        config : `dict`, optional
            Configuration settings. The frontend URLs are read from its
            "frontend_base_urls" entry, if present.

        Returns
        -------
        frontend_urls : `list` [ `str` ]
            Normalized frontend URLs, without duplicates, in the order of
            their first appearance in the configuration.

        Raises
        ------
        ValueError
            Raised if the scheme of a frontend URL differs from the scheme
            of this configuration's base URL.
        """
        if config is None:
            return []

        # Initialize the URLs of the frontend servers, if present in
        # the configuration.
        frontend_urls: list[str] = []
        for url in config.get("frontend_base_urls", []):
            # Expand environment variables in this URL
            if (expanded_url := expand_vars(url)) is not None:
                frontend_urls.append(normalize_url(expanded_url, preserve_path=False))

        # Eliminate duplicate URLs. Use a dictionary instead of a set so
        # that the order of the URLs in the configuration is preserved and
        # the result is the same from one execution to the next.
        frontend_urls = list(dict.fromkeys(frontend_urls))

        # Check that the scheme of this client's base URL is identical to
        # the scheme of the frontend server URLs.
        base_url_scheme = parse_url(self._base_url).scheme
        for url in frontend_urls:
            if base_url_scheme != parse_url(url).scheme:
                raise ValueError(
                    f"""inconsistent scheme in frontend URL {url} for endpoint """
                    f"""with base URL {self._base_url}"""
                )

        return frontend_urls

    @property
    def base_url(self) -> str:
        return self._base_url

    @property
    def timeout_connect(self) -> float:
        return self._timeout_connect

    @property
    def timeout_read(self) -> float:
        return self._timeout_read

    @property
    def persistent_connections_per_host(self) -> int:
        return self._persistent_connections_per_host

    @property
    def buffer_size(self) -> int:
        return self._buffer_size

    @property
    def block_size(self) -> int:
        return self._block_size

    @property
    def retries(self) -> int:
        return self._retries

    @property
    def retry_backoff_min(self) -> float:
        return self._retry_backoff_min

    @property
    def retry_backoff_max(self) -> float:
        return self._retry_backoff_max

    @property
    def trusted_authorities(self) -> str | None:
        return self._trusted_authorities

    @property
    def token(self) -> str | None:
        return self._token

    @property
    def reuse_connection(self) -> bool:
        return self._reuse_connection

    @property
    def request_checksum(self) -> str | None:
        return self._request_checksum

    @property
    def user_cert(self) -> str | None:
        return self._user_cert

    @property
    def user_key(self) -> str | None:
        # If no user certificate was specified in the configuration,
        # ignore the private key, even if it was provided.
        if self._user_cert is None:
            return None

        # If we have a user certificate but not a private key, assume the
        # private key is included in the same file as the user certificate.
        # That is typically the case when using a X.509 grid proxy as
        # client certificate.
        return self._user_cert if self._user_key is None else self._user_key

    @property
    def user_name(self) -> str | None:
        return self._user_name

    @property
    def user_password(self) -> str | None:
        return self._user_password

    @property
    def enable_fsspec(self) -> bool:
        return self._enable_fsspec

    @property
    def collect_memory_usage(self) -> bool:
        return self._collect_memory_usage

    @property
    def frontend_urls(self) -> list[str]:
        return self._frontend_urls

414 

415 

class DavConfigPool:
    """Registry of configurable settings for all known webDAV endpoints.

    Parameters
    ----------
    filename : `str`, optional
        Name of the environment variable which value is the path of the
        configuration file to load. That path can itself include environment
        variables or "~", e.g. '$HOME/path/to/config.yaml'.

        If `filename` is `None` or if that environment variable is not set,
        no configuration file is loaded and the default settings are used
        for all the endpoints.

        The configuration file is a YAML file with the structure below:

        - base_url: "davs://webdav1.example.org:1234/"
          persistent_connections_per_host: 10
          timeout_connect: 20.0
          timeout_read: 120.0
          retries: 3
          retry_backoff_min: 1.0
          retry_backoff_max: 3.0
          user_cert: "${X509_USER_PROXY}"
          user_key: "${X509_USER_PROXY}"
          token: "/path/to/bearer/token/file"
          trusted_authorities: "/etc/grid-security/certificates"
          buffer_size: 5
          enable_fsspec: false
          request_checksum: "md5"
          collect_memory_usage: false

        - base_url: "davs://webdav2.example.org:1234/"
          user_name: "user"
          user_password: "password"
          persistent_connections_per_host: 5
          reuse_connection: false
          ...

        All settings are optional. If no settings are found in the
        configuration file for a particular webDAV endpoint, sensible
        defaults will be used.

    Raises
    ------
    ValueError
        Raised if the configuration file contains more than one
        configuration for the same endpoint.

    Notes
    -----
    There is only a single instance of this class. This singleton is
    intended to be initialized when the module is imported the first time.
    Its creation is protected by a lock.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, filename: str | None = None) -> DavConfigPool:
        # Check again after acquiring the lock: another thread may have
        # created the instance in the meantime.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, filename: str | None = None) -> None:
        # Create a default configuration. This configuration is
        # used when a URL does not match any of the endpoints in the
        # configuration.
        self._default_config: DavConfig = DavConfig()

        # The key of this dictionary is the base URL of the webDAV endpoint,
        # as exposed by `DavConfig.base_url`.
        self._configs: dict[str, DavConfig] = {}

        # Load the configuration from the file we have been provided with,
        # if any.
        if filename is None:
            return

        # `filename` is the name of an environment variable which value is
        # the path of the configuration file. That path can include
        # environment variables (e.g. "$HOME/path/to/config.yaml") or "~"
        # (e.g. "~/path/to/config.yaml").
        if (filename := os.getenv(filename)) is not None:
            # Expand environment variables and '~' in the file name, if any.
            filename = os.path.expandvars(filename)
            filename = os.path.expanduser(filename)
            with open(filename) as file:
                # An empty YAML document is loaded as None: consider it as
                # a configuration file without any endpoint, so that the
                # default settings are used for all the endpoints.
                config_items = yaml.safe_load(file) or []

            for config_item in config_items:
                config = DavConfig(config_item)
                if config.base_url in self._configs:
                    # We already have a configuration for the same
                    # endpoint. That is likely a human error in
                    # the configuration file.
                    raise ValueError(
                        f"""configuration file {filename} contains two configurations for """
                        f"""endpoint {config.base_url}"""
                    )

                self._configs[config.base_url] = config

    def get_config_for_url(self, url: str) -> DavConfig:
        """Return the configuration to use a webDAV client when interacting
        with the server which hosts the resource at `url`.

        Parameters
        ----------
        url : `str`
            URL for which to obtain a configuration.

        Returns
        -------
        config : `DavConfig`
            Configuration of the endpoint which hosts `url`, or the default
            configuration if that endpoint is not among the configured ones.
        """
        # Select the configuration for the endpoint of the provided URL.
        normalized_url: str = normalize_url(url, preserve_path=False)
        if (config := self._configs.get(normalized_url)) is not None:
            return config

        # No config was found for the specified URL. Use the default.
        return self._default_config

    def _destroy(self) -> None:
        """Destroy this class singleton instance.

        Helper method to be used in tests to reset global configuration.
        """
        with DavConfigPool._lock:
            DavConfigPool._instance = None

537 

538 

def make_retry(config: DavConfig) -> Retry:
    """Build the ``urllib3.util.Retry`` policy described by `config`.

    Parameters
    ----------
    config : `DavConfig`
        Configurable settings for a webDAV storage endpoint.

    Returns
    -------
    retry : `urllib3.util.Retry`
        Retry object to be used when creating a ``urllib3.PoolManager``.
    """
    # Backoff factor to apply between attempts after the second try
    # (seconds). It is randomly selected between the configured minimum and
    # maximum, to prevent all the clients which started at the same time
    # (even on different hosts) from overwhelming the server by sending
    # their requests at the same time.
    lowest: float = config.retry_backoff_min
    highest: float = config.retry_backoff_max
    backoff_factor = lowest + (highest - lowest) * random.random()

    # Uppercased HTTP method verbs that we should retry on. We only
    # automatically retry idempotent requests.
    retriable_methods = frozenset(
        ("COPY", "DELETE", "GET", "HEAD", "MKCOL", "OPTIONS", "PROPFIND", "PUT")
    )

    # HTTP status codes that we should force a retry on.
    retriable_statuses = frozenset(
        (
            HTTPStatus.TOO_MANY_REQUESTS,  # 429
            HTTPStatus.INTERNAL_SERVER_ERROR,  # 500
            HTTPStatus.BAD_GATEWAY,  # 502
            HTTPStatus.SERVICE_UNAVAILABLE,  # 503
            HTTPStatus.GATEWAY_TIMEOUT,  # 504
        )
    )

    return Retry(
        # Total number of retries to allow. Takes precedence over the
        # other counts.
        total=2 * config.retries,
        # Number of retries on connection-related errors, on read errors
        # and on bad status codes, respectively.
        connect=config.retries,
        read=config.retries,
        status=config.retries,
        backoff_factor=backoff_factor,
        allowed_methods=retriable_methods,
        status_forcelist=retriable_statuses,
        # Respect the "Retry-After" header on the status codes defined
        # above.
        respect_retry_after_header=True,
    )

598 

599 

class DavClientPool:
    """Container of reusable webDAV clients, each one specifically configured
    to talk to a single storage endpoint.

    Parameters
    ----------
    config_pool : `DavConfigPool`
        Pool of all known webDAV client configurations.

    Notes
    -----
    There is a single instance of this class. This singleton is intended to
    be initialized when the module is imported the first time. Both the
    creation of the singleton and the creation of new clients are protected
    by a lock.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, config_pool: DavConfigPool) -> DavClientPool:
        if cls._instance is None:
            with cls._lock:
                # Another thread may have created the instance while we were
                # waiting for the lock.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, config_pool: DavConfigPool) -> None:
        self._config_pool: DavConfigPool = config_pool

        # Reusable clients, indexed by the path-stripped URL of the endpoint
        # each of them interacts with, as returned by `normalize_url` (e.g.
        # "https://host.example.org:1234/").
        self._clients: dict[str, DavClient] = {}

    def get_client_for_url(self, url: str) -> DavClient:
        """Return a client for interacting with the endpoint where `url`
        is hosted.

        Parameters
        ----------
        url : `str`
            URL for which to obtain a client.

        Returns
        -------
        client : `DavClient`
            Client for the endpoint which hosts `url`.

        Notes
        -----
        If a client for that endpoint already exists it is reused, otherwise
        a new client is created with the appropriate configuration for
        interacting with the storage endpoint.
        """
        url = normalize_url(url, preserve_path=False)

        # Fast path: reuse the existing client for this endpoint, if any.
        client = self._clients.get(url)
        if client is not None:
            return client

        # No client for this endpoint was found. Create a new one and save it
        # for serving subsequent requests, unless another thread created it
        # while we were waiting for the lock.
        with DavClientPool._lock:
            client = self._clients.get(url)
            if client is not None:
                return client

            config: DavConfig = self._config_pool.get_config_for_url(url)
            self._clients[url] = self._make_client(url, config)

        return self._clients[url]

    def _make_client(self, url: str, config: DavConfig) -> DavClient:
        """Make a webDAV client for interacting with the server at `url`."""
        # Use a generic client to retrieve the details of the server, so
        # that we can build a client for that specific server
        # implementation.
        probe = DavClient(url, config)
        server_details = probe.get_server_details(url)
        server_id = server_details.get("Server", None)

        # `accepts_ranges` remains None when the server details don't
        # include "Accept-Ranges".
        accepts_ranges: bool | str | None = server_details.get("Accept-Ranges", None)
        if accepts_ranges is not None:
            accepts_ranges = accepts_ranges == "bytes"

        # Use a generic webDAV client unless the server is identified as
        # a dCache or a XrootD webDAV server.
        if server_id is not None:
            if server_id.startswith("dCache/"):
                return DavClientDCache(url, config, accepts_ranges)
            if server_id.startswith("XrootD/"):
                return DavClientXrootD(url, config, accepts_ranges)

        return DavClient(url, config, accepts_ranges)

    def _destroy(self) -> None:
        """Destroy this class singleton instance.

        Helper method to be used in tests to reset global configuration.
        """
        with DavClientPool._lock:
            DavClientPool._instance = None

701 

702 

class DavFileSizeCache:
    """Helper class to cache file sizes of recently uploaded files.

    Parameters
    ----------
    default_timeout : `float`, optional
        Default validity period, in seconds, of the entries in this cache.
        The validity period for a specific entry can be specified when the
        entry is added to the cache (see `update_size` method). Only the
        value given the first time the singleton is initialized is retained.

    Notes
    -----
    There is a single instance of this class shared by several `DavClient`
    objects. All the accesses to the cache are protected by a lock.

    Caching file sizes helps preventing sending requests to the server for
    retrieving the size of recently uploaded files. This is in particular
    intended to efficiently serve `Butler` requests for the size of a file it
    just wrote to the datastore.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls) -> DavFileSizeCache:
        if cls._instance is None:
            with cls._lock:
                # Check again: the instance may have been created while we
                # were waiting for the lock.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, default_timeout: float = 60.0) -> None:
        with DavFileSizeCache._lock:
            # This method is executed every time the singleton is retrieved,
            # so don't reset a cache which has already been initialized.
            if hasattr(self, "_cache"):
                return

            self._default_timeout: float = default_timeout

            # The key of the cache dictionary is a URL of the form
            #
            #     "https://host.example.org:1234/path/to/file".
            #
            # The value is a triplet (file_size, last_updated, timeout):
            #   - 'file_size' is the size of the file in bytes,
            #   - 'last_updated' is the time when this entry was added to
            #     the cache or last updated, in seconds since epoch,
            #   - 'timeout' is the validity period of this cache entry, in
            #     seconds, counted from 'last_updated'.
            self._cache: dict[str, tuple[int, float, float]] = {}

    def invalidate(self, url: str) -> None:
        """Remove the cache entry for `url`, if any.

        Parameters
        ----------
        url : `str`
            URL of the file which cache entry must be invalidated.
        """
        with DavFileSizeCache._lock:
            self._cache.pop(url, None)

    def update_size(self, url: str, size: int | None, timeout: float | None = None) -> None:
        """Record in the cache that the file at `url` has a size of `size`
        bytes. This entry is considered valid for a period of `timeout`
        seconds from now.

        Parameters
        ----------
        url : `str`
            URL of the file which size is to be cached.
        size : `int` or `None`
            Size in bytes of the file at `url`. If this value is `None`, the
            cache is not modified.
        timeout : `float` or `None`, optional
            The validity period, in seconds, this size is to be considered
            valid. If not specified, the default value specified when this
            object was created will be used for this cache entry.
        """
        if size is not None:
            validity = timeout if timeout is not None else self._default_timeout
            with DavFileSizeCache._lock:
                self._cache[url] = (size, time.time(), validity)

    def get_size(self, url: str) -> int | None:
        """Retrieve the cached value of the size of the file at `url`.

        Parameters
        ----------
        url : `str`
            URL of the file to retrieve the size for.

        Returns
        -------
        size : `int` or `None`
            The cached value of the size of file at `url` if any value was
            found in the cache, `None` otherwise.
            `None` is also returned if there is a cached value but its
            validity period has expired. In this case, the entry associated to
            `url` is removed from the cache.
        """
        with DavFileSizeCache._lock:
            entry = self._cache.get(url, None)
            if entry is None:
                # There is no entry in the cache for this URL.
                return None

            size, last_updated, timeout = entry
            expires_at = last_updated + timeout
            if time.time() <= expires_at:
                # This entry is still valid.
                return size

            # This entry is no longer valid. Remove it from the cache.
            self._cache.pop(url)
            return None

819 

820 

def unexpected_status_error(method: str, url: str, resp: HTTPResponse) -> Exception:
    """Build an exception describing the unexpected response `resp`.

    The exception is returned, not raised: raising it is up to the caller.

    Parameters
    ----------
    method : `str`
        The method name triggering the error.
    url : `str`
        The URL that caused the error. It is redacted before being included
        in the error message.
    resp : `HTTPResponse`
        The error response.

    Returns
    -------
    error : `ValueError`
        Exception which message includes the request method, the redacted
        URL, the status and reason of the response and the response body,
        if not empty.
    """
    message = f"Unexpected response to HTTP request {method} {redact_url(url)}: {resp.status} {resp.reason}"

    # The body of an error response is not guaranteed to be valid UTF-8.
    # Replace the bytes which cannot be decoded instead of failing with
    # a UnicodeDecodeError, which would hide the actual HTTP error.
    body = resp.data.decode(errors="replace")
    if body:
        message += f" [response body: {body}]"

    return ValueError(message)

839 

840 

841class DavClient: 

842 """WebDAV client, configured to talk to a single storage endpoint. 

843 

844 Instances of this class are thread-safe. 

845 

846 Parameters 

847 ---------- 

848 url : `str` 

849 Root URL of the storage endpoint (e.g. 

850 "https://host.example.org:1234/"). 

851 config : `DavConfig` 

852 Configuration to initialize this client. 

853 accepts_ranges : `bool` | `None` 

854 Indicate whether the remote server accepts the ``Range`` header in GET 

855 requests. 

856 """ 

857 

def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
    # Guards this client's fields against concurrent modification.
    self._lock = threading.Lock()

    # Root URL of the server this client talks to, of the form
    # "davs://host.example.org:1234/".
    self._base_url: str = url

    # Settings of the storage endpoint this client talks to.
    self._config: DavConfig = config

    # Authorizer for outgoing requests; None when the configuration
    # provides neither a token nor a user name and password.
    self._authorizer: Authorizer | None = self._make_authorizer(config=config)

    # Pool manager through which requests are sent to the server.
    self._pool_manager: PoolManager = self._make_pool_manager(config=config)

    # Parser of the responses to PROPFIND requests.
    self._propfind_parser: DavPropfindParser = DavPropfindParser()

    # Whether the server accepts a "Range" header in GET requests.
    # None means "not known yet": the value is initialized lazily.
    self._accepts_ranges: bool | None = accepts_ranges

    # Whether a COPY request can be used to duplicate a file within a
    # single webDAV server. Subclasses can overwrite this according to
    # the server capabilities and its compliance to the webDAV RFC.
    self._can_duplicate: bool = True

    # Sizes of the files this client recently uploaded to the server.
    self._file_size_cache = DavFileSizeCache()

893 

894 def _make_authorizer(self, config: DavConfig) -> Authorizer | None: 

895 # If a token was specified in the configuration settings for this 

896 # endpoint, prefer it as the authentication method, even if other 

897 # authentication settings were also specified. 

898 if config.token is not None: 

899 return TokenAuthorizer(token=config.token) 

900 elif config.user_name is not None and config.user_password is not None: 

901 return BasicAuthorizer(user_name=config.user_name, user_password=config.user_password) 

902 

903 return None 

904 

905 def _make_pool_manager(self, config: DavConfig) -> PoolManager: 

906 # Prepare the trusted authorities certificates 

907 ca_certs, ca_cert_dir = None, None 

908 if config.trusted_authorities is not None: 

909 if os.path.isdir(config.trusted_authorities): 

910 ca_cert_dir = config.trusted_authorities 

911 elif os.path.isfile(config.trusted_authorities): 

912 ca_certs = config.trusted_authorities 

913 else: 

914 raise FileNotFoundError( 

915 f"Trusted authorities file or directory {config.trusted_authorities} does not exist" 

916 ) 

917 

918 # If a token was specified for this endpoint don't use the 

919 # <user certificate, private key> pair, even if they were also 

920 # specified. 

921 user_cert, user_key = None, None 

922 if config.token is None: 

923 user_cert = config.user_cert 

924 user_key = config.user_key 

925 

926 # Pool manager for sending requests. Connections in this pool manager 

927 # are generally left open by the client but the front-end server may 

928 # choose to close them in some specific situations. For instance, 

929 # whe serving a PUT request, the front server may redirect to a 

930 # backend server and close the network connection making it 

931 # unsuable for subsequent requests. 

932 # 

933 # In addition, the client may also choose to explicitly close the 

934 # network connection after receiving a response. 

935 return PoolManager( 

936 # Number of connection pools to cache before discarding the least 

937 # recently used pool. Each connection pool manages network 

938 # connections to a single host, so this is basically the number 

939 # of "host:port" we persist network connections to. 

940 num_pools=200, 

941 # Number of connections to the same "host:port" to persist for 

942 # later reuse. More than 1 is useful in multithreaded situations. 

943 # If more than this number of network connections are needed at 

944 # a particular moment, they will be created and discarded after 

945 # use. 

946 maxsize=config.persistent_connections_per_host, 

947 # Retry configuration to use by default with requests sent to 

948 # host in the front end. 

949 retries=make_retry(config), 

950 # Socket timeout in seconds for each individual connection. 

951 timeout=Timeout( 

952 connect=config.timeout_connect, 

953 read=config.timeout_read, 

954 ), 

955 # Size in bytes of the buffer for reading/writing data from/to 

956 # the underlying socket. 

957 blocksize=config.buffer_size, 

958 # Client certificate and private key for esablishing TLS 

959 # connections. If None, no client certificate is sent to the 

960 # server. Only relevant for endpoints using secure HTTP protocol. 

961 cert_file=user_cert, 

962 key_file=user_key, 

963 # We require verification of the server certificate. 

964 cert_reqs="CERT_REQUIRED", 

965 # Directory where the certificates of the trusted certificate 

966 # authorities can be found. The contents of that directory 

967 # must be as expected by OpenSSL. 

968 ca_cert_dir=ca_cert_dir, 

969 # Path to a file of concatenated CA certificates in PEM format. 

970 ca_certs=ca_certs, 

971 ) 

972 

def get_server_details(self, url: str) -> dict[str, str]:
    """Retrieve details about the server and check that it advertises
    compliance to class 1 of the webDAV protocol.

    Parameters
    ----------
    url : `str`
        URL to check.

    Returns
    -------
    details : `dict[str, str]`
        Values of the "Server" and "Accept-Ranges" headers found in the
        response to an OPTIONS request, keyed by header name. A header
        the server did not send is absent from the dictionary.

        Examples of values for the "Server" header are 'dCache/9.2.4' or
        'XrootD/v5.7.1'.

    Raises
    ------
    ValueError
        If the response has no "DAV" header or if that header does not
        list compliance class "1".
    """
    # We don't rely on webDAV locks, so compliance class 1 is enough for
    # our purposes. All webDAV servers must advertise at least class "1".
    # Compliance classes are documented in
    #    http://www.webdav.org/specs/rfc4918.html#dav.compliance.classes
    #
    # Examples of values for the DAV header are:
    #   DAV: 1, 2
    #   DAV: 1, <http://apache.org/dav/propset/fs/1>
    resp = self.options(url)
    if "DAV" not in resp.headers:
        raise ValueError(f"Server of {resp.geturl()} does not implement webDAV protocol")

    compliance_classes = [item.replace(" ", "") for item in resp.headers.get("DAV").split(",")]
    if "1" not in compliance_classes:
        raise ValueError(
            f"Server of {resp.geturl()} does not advertise required compliance to webDAV protocol class 1"
        )

    # Not all servers include a 'Server' header in their response to an
    # OPTIONS request, so only report the headers actually present.
    return {
        header: value
        for header in ("Server", "Accept-Ranges")
        if (value := resp.headers.get(header, None)) is not None
    }

1026 

1027 def _get_response_url(self, resp: HTTPResponse, default_url: str) -> str: 

1028 """Return the URL that response `resp` was obtained from. 

1029 

1030 If `resp` contains no redirection history, return `default_url`. 

1031 """ 

1032 if resp.retries is None: 

1033 return default_url 

1034 

1035 if len(resp.retries.history) == 0: 

1036 return default_url 

1037 

1038 return str(resp.retries.history[-1].redirect_location) 

1039 

1040 def _rewrite_url_for_frontend(self, url: str) -> str: 

1041 """Return a URL to reach one of the frontend servers that serves 

1042 requests sent against `url`. 

1043 

1044 Parameters 

1045 ---------- 

1046 url : `str` 

1047 Target URL. 

1048 

1049 Returns 

1050 ------- 

1051 url: `str` 

1052 URL to reach one of this client's frontend servers. If `url` does 

1053 not target this client's frontend servers, the returned value 

1054 is `url` unmodified. 

1055 """ 

1056 # Do nothing if this URL does not match this client's base URL. This 

1057 # happens, for instance, when `url` is a redirection to a backend 

1058 # server, so we don't want to rewrite it. 

1059 # 

1060 # Also, don't rewrite the URL if we don't have frontends configured 

1061 # for this client. 

1062 if not self._config.frontend_urls or not url.startswith(self._base_url): 

1063 return url 

1064 

1065 # Randomly select one of the configured frontends and return a modified 

1066 # URL which uses the selected frontend instead of the original one. 

1067 return random.choice(self._config.frontend_urls) + url.removeprefix(self._base_url) 

1068 

def _request(
    self,
    method: str,
    url: str,
    headers: dict[str, str] | None = None,
    body: BinaryIO | bytes | str | None = None,
    preload_content: bool = True,
    redirect: bool = True,
    pool_manager: PoolManager | None = None,
    **kwargs: Any,
) -> HTTPResponse:
    """Send a generic HTTP request and return the response.

    Parameters
    ----------
    method : `str`
        Request method, e.g. 'GET', 'PUT', 'PROPFIND'.
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request. The dictionary passed by the
        caller is copied, never modified.
    body : `BinaryIO` or `bytes` or `str` or `None`, optional
        Request body.
    preload_content : `bool`, optional
        If True, the response body is downloaded and can be retrieved
        via the returned response `.data` property. If False, the
        caller needs to call `.read()` on the returned response object to
        download the body, either entirely in one call or by chunks.
    redirect : `bool`, optional
        If True, automatically handle redirects. If False, the returned
        response may contain a redirection to another location.
    pool_manager : `PoolManager`, optional
        Pool manager to use for sending this request. If not provided,
        this client's pool manager is used.
    **kwargs : `Any`
        Keyword arguments to pass unmodified to
        `urllib3.PoolManager.request()`.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the request as received from the server.
    """
    # Retrieve the URL we must use to send this request to one of this
    # client's configured frontend servers.
    url = self._rewrite_url_for_frontend(url)

    # If this client is configured not to reuse the network connection
    # with the server, add a "Connection: close" header to this request.
    #
    # However, if the caller has explicitly specified a "Connection"
    # header, whatever its value, don't modify it.
    headers = {} if headers is None else dict(headers)
    if "Connection" not in headers and not self._config.reuse_connection:
        headers.update({"Connection": "close"})

    # If an authorizer (basic or token) is configured for this client,
    # allow it to set the "Authorization" header of this outgoing request.
    if self._authorizer is not None:
        self._authorizer.set_authorization(headers)

    # Only the redacted form of the URL may reach the logs, whichever
    # log record it ends up in.
    redacted_url = redact_url(url)

    if log.isEnabledFor(logging.DEBUG):
        annotation = ""
        if method == "GET" and "Range" in headers:
            byte_range = headers.get("Range", "").removeprefix("bytes=")
            annotation = f" (byte range: {byte_range})"

        log.debug("sending request %s %s%s", method, redacted_url, annotation)

    if pool_manager is None:
        pool_manager = self._pool_manager

    with time_this(
        log,
        msg="%s %s",
        args=(method, redacted_url),
        mem_usage=self._config.collect_memory_usage,
        mem_unit=u.mebibyte,
    ):
        return pool_manager.request(
            method,
            url,
            body=body,
            headers=headers,
            preload_content=preload_content,
            redirect=redirect,
            **kwargs,
        )

1157 

1158 def _options( 

1159 self, 

1160 url: str, 

1161 headers: dict[str, str] | None = None, 

1162 pool_manager: PoolManager | None = None, 

1163 ) -> HTTPResponse: 

1164 """Send a HTTP OPTIONS request and return the response unmodified. 

1165 

1166 Parameters 

1167 ---------- 

1168 url : `str` 

1169 Target URL. 

1170 headers : `dict[str, str]`, optional 

1171 Headers to sent with the request. 

1172 pool_manager : `PoolManager`, optional 

1173 Pool manager to use to send this request. 

1174 

1175 Returns 

1176 ------- 

1177 resp: `HTTPResponse` 

1178 Response to the request as received from the server. 

1179 

1180 Notes 

1181 ----- 

1182 This method is intended for subclasses to override when needed. 

1183 """ 

1184 return self._request("OPTIONS", url=url, headers=headers, pool_manager=pool_manager) 

1185 

1186 def _copy( 

1187 self, 

1188 url: str, 

1189 headers: dict[str, str] | None = None, 

1190 preload_content: bool = True, 

1191 pool_manager: PoolManager | None = None, 

1192 ) -> HTTPResponse: 

1193 """Send a webDAV COPY request and return the response unmodified. 

1194 

1195 Parameters 

1196 ---------- 

1197 url : `str` 

1198 Target URL. 

1199 headers : `dict[str, str]`, optional 

1200 Headers to sent with the request. 

1201 pool_manager : `PoolManager`, optional 

1202 Pool manager to use to send this request. 

1203 

1204 Notes 

1205 ----- 

1206 This method is intended for subclasses to override when needed. 

1207 """ 

1208 return self._request( 

1209 "COPY", url=url, headers=headers, preload_content=preload_content, pool_manager=pool_manager 

1210 ) 

1211 

1212 def _delete( 

1213 self, 

1214 url: str, 

1215 headers: dict[str, str] | None = None, 

1216 pool_manager: PoolManager | None = None, 

1217 ) -> HTTPResponse: 

1218 """Send a HTTP DELETE request and return the response unmodified. 

1219 

1220 Parameters 

1221 ---------- 

1222 url : `str` 

1223 Target URL. 

1224 headers : `dict[str, str]`, optional 

1225 Headers to sent with the request. 

1226 pool_manager : `PoolManager`, optional 

1227 Pool manager to use to send this request. 

1228 

1229 Notes 

1230 ----- 

1231 This method is intended for subclasses to override when needed. 

1232 """ 

1233 return self._request("DELETE", url=url, headers=headers, pool_manager=pool_manager) 

1234 

1235 def _get( 

1236 self, 

1237 url: str, 

1238 headers: dict[str, str] | None = None, 

1239 preload_content: bool = True, 

1240 redirect: bool = True, 

1241 pool_manager: PoolManager | None = None, 

1242 ) -> HTTPResponse: 

1243 """Send a HTTP GET request and return the response unmodified. 

1244 

1245 Parameters 

1246 ---------- 

1247 url : `str` 

1248 Target URL. 

1249 headers : `dict[str, str]`, optional 

1250 Headers to sent with the request. 

1251 preload_content : `bool`, optional 

1252 If True, the response body is downloaded and can be retrieved 

1253 via the returned response `.data` property. If False, the 

1254 caller needs to call the `.read()` on the returned response 

1255 object to download the body. 

1256 redirect : `bool`, optional 

1257 If True, follow redirections. 

1258 pool_manager : `PoolManager`, optional 

1259 Pool manager to send the request through. 

1260 

1261 Returns 

1262 ------- 

1263 resp: `HTTPResponse` 

1264 Response to the GET request as received from the server. 

1265 

1266 Notes 

1267 ----- 

1268 This method is intended for subclasses to override when needed. 

1269 """ 

1270 return self._request( 

1271 "GET", 

1272 url=url, 

1273 headers=headers, 

1274 preload_content=preload_content, 

1275 redirect=redirect, 

1276 pool_manager=pool_manager, 

1277 ) 

1278 

1279 def _head( 

1280 self, 

1281 url: str, 

1282 headers: dict[str, str] | None = None, 

1283 pool_manager: PoolManager | None = None, 

1284 ) -> HTTPResponse: 

1285 """Send a HTTP HEAD request and return the response. 

1286 

1287 Parameters 

1288 ---------- 

1289 url : `str` 

1290 Target URL. 

1291 headers : `bool` 

1292 If the target URL is not found, raise an exception. Otherwise 

1293 just return the response. 

1294 pool_manager : `PoolManager`, optional 

1295 Pool manager to use to send this request. 

1296 

1297 Notes 

1298 ----- 

1299 This method is intended for subclasses to override when needed. 

1300 """ 

1301 return self._request("HEAD", url=url, headers=headers, pool_manager=pool_manager) 

1302 

1303 def _mkcol( 

1304 self, 

1305 url: str, 

1306 headers: dict[str, str] | None = None, 

1307 pool_manager: PoolManager | None = None, 

1308 ) -> HTTPResponse: 

1309 """Send a webDAV MKCOL request and return the response unmodified. 

1310 

1311 Parameters 

1312 ---------- 

1313 url : `str` 

1314 Target URL. 

1315 headers : `dict[str, str]`, optional 

1316 Headers to sent with the request. 

1317 pool_manager : `PoolManager`, optional 

1318 Pool manager to use to send this request. 

1319 

1320 Notes 

1321 ----- 

1322 This method is intended for subclasses to override when needed. 

1323 """ 

1324 return self._request("MKCOL", url=url, headers=headers, pool_manager=pool_manager) 

1325 

1326 def _move( 

1327 self, 

1328 url: str, 

1329 headers: dict[str, str] | None = None, 

1330 pool_manager: PoolManager | None = None, 

1331 ) -> HTTPResponse: 

1332 """Send a webDAV MOVE request and return the response unmodified. 

1333 

1334 Parameters 

1335 ---------- 

1336 url : `str` 

1337 Target URL. 

1338 headers : `dict[str, str]`, optional 

1339 Headers to sent with the request. 

1340 pool_manager : `PoolManager`, optional 

1341 Pool manager to use to send this request. 

1342 

1343 Notes 

1344 ----- 

1345 This method is intended for subclasses to override when needed. 

1346 """ 

1347 return self._request("MOVE", url=url, headers=headers, pool_manager=pool_manager) 

1348 

1349 def _propfind( 

1350 self, 

1351 url: str, 

1352 headers: dict[str, str] | None = None, 

1353 body: str = "", 

1354 pool_manager: PoolManager | None = None, 

1355 ) -> HTTPResponse: 

1356 """Send a webDAV PROPFIND request and return the response unmodified. 

1357 

1358 Parameters 

1359 ---------- 

1360 url : `str` 

1361 Target URL. 

1362 headers : `dict[str, str]`, optional 

1363 Headers to sent with the request. 

1364 body : `str`, optional 

1365 Request body. 

1366 pool_manager : `PoolManager`, optional 

1367 Pool manager to use to send this request. 

1368 

1369 Notes 

1370 ----- 

1371 This method is intended for subclasses to override when needed. 

1372 """ 

1373 return self._request("PROPFIND", url=url, headers=headers, body=body, pool_manager=pool_manager) 

1374 

1375 def _put( 

1376 self, 

1377 url: str, 

1378 headers: dict[str, str] | None = None, 

1379 body: BinaryIO | bytes = b"", 

1380 preload_content: bool = True, 

1381 redirect: bool = True, 

1382 pool_manager: PoolManager | None = None, 

1383 ) -> HTTPResponse: 

1384 """Send a HTTP PUT request and return the response unmodified. 

1385 

1386 Parameters 

1387 ---------- 

1388 url : `str` 

1389 Target URL. 

1390 headers : `dict[str, str]`, optional 

1391 Headers to sent with the request. 

1392 body : `BinaryIO` or `bytes`, optional 

1393 Request body. 

1394 preload_content : `bool`, optional 

1395 If True, the response body is downloaded and can be retrieved 

1396 via the returned response `.data` property. If False, the 

1397 caller needs to call the `.read()` on the returned response 

1398 object to download the body. 

1399 redirect : `bool`, optional 

1400 If True, follow redirections. 

1401 pool_manager : `PoolManager`, optional 

1402 Pool manager to send the request through. 

1403 

1404 Returns 

1405 ------- 

1406 resp: `HTTPResponse` 

1407 Response to the PUT request as received from the server. 

1408 

1409 Notes 

1410 ----- 

1411 This method is intended for subclasses to override when needed. 

1412 """ 

1413 # Disable retries when we know the request is not idempotent. In 

1414 # particular, when the body of the request is an `io.BufferedReader`, 

1415 # any attempt to use that body may totally or partially consume it. 

1416 # That means that in case of a retry, the last successful attempt may 

1417 # end up uploading an incomplete body and, as a consequence, the 

1418 # resulting uploaded file may be either incomplete or have a length 

1419 # of zero. 

1420 # 

1421 # So we only retry a PUT request when the body is an instance of 

1422 # `bytes`. 

1423 # 

1424 # Note that we cannot set `retries` to False since that setting would 

1425 # also disable redirection. To disable retries only, we must explicitly 

1426 # set the `retries` keyword argument to integer value zero (0). 

1427 # 

1428 # See documentation: 

1429 # https://urllib3.readthedocs.io/en/stable/user-guide.html#retrying-requests 

1430 kwargs: dict[Any, Any] = {} if isinstance(body, bytes) else {"retries": 0} 

1431 

1432 return self._request( 

1433 "PUT", 

1434 url=url, 

1435 headers=headers, 

1436 body=body, 

1437 preload_content=preload_content, 

1438 redirect=redirect, 

1439 pool_manager=pool_manager, 

1440 **kwargs, 

1441 ) 

1442 

def head(
    self,
    url: str,
    headers: dict[str, str] | None = None,
) -> HTTPResponse:
    """Send a HTTP HEAD request and return the response only if the
    request was successful.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the HEAD request, when its status is OK.

    Raises
    ------
    FileNotFoundError
        If the response status is NOT_FOUND.
    """
    request_headers = {} if headers is None else dict(headers)
    resp = self._head(url=url, headers=request_headers)

    status = resp.status
    if status == HTTPStatus.OK:
        return resp
    if status == HTTPStatus.NOT_FOUND:
        raise FileNotFoundError(f"No file found at {resp.geturl()}")

    # Any other status is reported with the error built by
    # `unexpected_status_error`.
    raise unexpected_status_error("HEAD", url, resp)

1468 

def get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    preload_content: bool = True,
    redirect: bool = True,
) -> tuple[str, HTTPResponse]:
    """Send a HTTP GET request.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict[str, str]`, optional
        Headers to send with the request.
    preload_content : `bool`, optional
        If True, the response body is downloaded and available through
        the `.data` property of the returned response. If False, the
        caller must call `.read()` on the returned response to download
        the body.
    redirect : `bool`, optional
        If True, follow redirections.

    Returns
    -------
    url : `str`
        The URL the response was obtained from. It may differ from the
        `url` argument in case of redirection.
    resp : `HTTPResponse`
        Response to the GET request as received from the server.

    Raises
    ------
    FileNotFoundError
        If the response status is NOT_FOUND.
    """
    # Send the GET request to the frontend servers.
    request_headers = {} if headers is None else dict(headers)
    resp = self._get(
        url,
        headers=request_headers,
        preload_content=preload_content,
        redirect=redirect,
    )

    status = resp.status
    if status == HTTPStatus.OK or status == HTTPStatus.PARTIAL_CONTENT:
        return self._get_response_url(resp, default_url=url), resp

    if status == HTTPStatus.NOT_FOUND:
        raise FileNotFoundError(f"No file found at {resp.geturl()}")

    if status in resp.REDIRECT_STATUSES and not redirect:
        # This is a redirection and the caller asked us not to follow
        # redirections: return the response as is.
        return self._get_response_url(resp, default_url=url), resp

    raise unexpected_status_error("GET", url, resp)

1519 

def options(
    self,
    url: str,
    headers: dict[str, str] | None = None,
) -> HTTPResponse:
    """Send a HTTP OPTIONS request and return the response on success.

    Parameters
    ----------
    url : `str`
        Target URL.
    headers : `dict` [`str`, `str`], optional
        Headers to send with the request.

    Returns
    -------
    resp : `HTTPResponse`
        Response to the request as received from the server, when its
        status is OK or CREATED.
    """
    resp = self._options(url=url, headers=headers)

    status = resp.status
    if status == HTTPStatus.OK or status == HTTPStatus.CREATED:
        return resp

    raise unexpected_status_error("OPTIONS", url, resp)

1545 

1546 def propfind( 

1547 self, 

1548 url: str, 

1549 headers: dict[str, str] | None = None, 

1550 body: str = "", 

1551 depth: str = "0", 

1552 ) -> HTTPResponse: 

1553 """Send a HTTP PROPFIND request and return the unmodified response on 

1554 success. 

1555 

1556 Parameters 

1557 ---------- 

1558 url : `str` 

1559 Target URL. 

1560 headers : `dict[str, str]`, optional 

1561 Headers to sent with the request. 

1562 body : `str`, optional 

1563 Request body. 

1564 depth : `str`, optional 

1565 ???. 

1566 """ 

1567 headers = {} if headers is None else dict(headers) 

1568 headers.update( 

1569 { 

1570 "Depth": depth, 

1571 "Content-Type": 'application/xml; charset="utf-8"', 

1572 "Content-Length": str(len(body)), 

1573 } 

1574 ) 

1575 resp = self._propfind(url=url, headers=headers, body=body) 

1576 match resp.status: 

1577 case HTTPStatus.MULTI_STATUS | HTTPStatus.NOT_FOUND: 

1578 return resp 

1579 case _: 

1580 raise unexpected_status_error("PROPFIND", url, resp) 

1581 

1582 def put( 

1583 self, 

1584 url: str, 

1585 headers: dict[str, str] | None = None, 

1586 data: BinaryIO | bytes = b"", 

1587 ) -> int | None: 

1588 """Send a HTTP PUT request. 

1589 

1590 Parameters 

1591 ---------- 

1592 url : `str` 

1593 Target URL. 

1594 headers : `dict[str, str]`, optional 

1595 Headers to sent with the request. 

1596 data : `BinaryIO` or `bytes` 

1597 Request body. 

1598 

1599 Returns 

1600 ------- 

1601 size : `int | None` 

1602 The size in bytes of the file uploaded. Can be `None` if the size 

1603 could not be retrieved. 

1604 """ 

1605 # Send a PUT request with empty body and handle redirection. This 

1606 # is useful if the server redirects us; since we cannot rewind the 

1607 # data we are uploading, we don't start uploading data until we 

1608 # connect to the server that will actually serve our request. 

1609 frontend_headers = {} if headers is None else dict(headers) 

1610 frontend_headers.update({"Content-Length": "0"}) 

1611 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False) 

1612 match resp.status: 

1613 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

1614 redirect_url = url 

1615 case status if status in resp.REDIRECT_STATUSES: 

1616 redirect_url = resp.headers.get("Location") 

1617 case _: 

1618 raise unexpected_status_error("PUT", url, resp) 

1619 

1620 # We may have been redirectred. Upload the file contents to 

1621 # its final destination. 

1622 

1623 # Ask the server to compute and record a checksum of the uploaded 

1624 # file contents, for later integrity checks. Since we don't compute 

1625 # the digest ourselves while uploading the data, we cannot control 

1626 # after the request is complete that the data we uploaded is 

1627 # identical to the data recorded by the server, but at least the 

1628 # server has recorded a digest of the data it stored. 

1629 # 

1630 # See RFC-3230 for details and 

1631 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml 

1632 # for the list of supported digest algorithhms. 

1633 # 

1634 # In addition, note that not all servers implement this RFC so 

1635 # the checksum reqquest may be ignored by the server. 

1636 backend_headers = {} if headers is None else dict(headers) 

1637 if (checksum := self._config.request_checksum) is not None: 

1638 backend_headers.update({"Want-Digest": checksum}) 

1639 

1640 resp = self._put(redirect_url, body=data, headers=backend_headers) 

1641 match resp.status: 

1642 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

1643 # Send a HEAD request to retrieve the size of the file we 

1644 # just uploaded 

1645 resp = self.head(redirect_url) 

1646 size = int(resp.headers.get("Content-Length", -1)) 

1647 return None if size == -1 else size 

1648 case _: 

1649 raise unexpected_status_error("PUT", redirect_url, resp) 

1650 

1651 def _get_temporary_basename(self, basename: str, prefix: str) -> str: 

1652 """Return a basename for a temporary file.""" 

1653 unique_id = str(uuid.uuid4()) 

1654 return f"{prefix}.{unique_id}.{basename}" 

1655 

def _split_parent_and_basename(self, url: str) -> tuple[str, str]:
    """Split `url` into the URL of its parent directory and its basename.

    The path of `url` is normalized before being split. All the other
    components of `url` are carried over unchanged to the parent URL.
    """
    components: Url = parse_url(url)
    parent_path, basename = posixpath.split(normalize_path(components.path))
    parent = Url(
        scheme=components.scheme,
        auth=components.auth,
        host=components.host,
        port=components.port,
        path=parent_path,
        query=components.query,
        fragment=components.fragment,
    )
    return parent.url, basename

1674 

1675 def _parent(self, url: str) -> str: 

1676 """Return the URL of the parent directory to `url`.""" 

1677 parent_url, _ = self._split_parent_and_basename(url) 

1678 return parent_url 

1679 

1680 def _make_temporary_url(self, url: str, prefix: str = ".tmp") -> str: 

1681 """Return the URL of a temporary file based on `url`.""" 

1682 parent_url, basename = self._split_parent_and_basename(url) 

1683 temporary_basename = self._get_temporary_basename(basename=basename, prefix=prefix) 

1684 return f"{parent_url}/{temporary_basename}" 

1685 

def exists(self, url: str) -> bool:
    """Tell whether a file or a directory exists at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    result : `bool`
        True if there is an object at `url`.
    """
    metadata = self.stat(url)
    return metadata.exists

1700 

def size(self, url: str) -> int:
    """Return the size in bytes of the resource at `url`.

    If `url` designates a directory, the size is zero.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    size : `int`
        The number of bytes of the resource located at `url`.

    Raises
    ------
    FileNotFoundError
        If there is neither a file nor a directory at `url`.
    """
    # Prefer the value of our cache, if it has one for this URL.
    cached_size = self._file_size_cache.get_size(url)
    if cached_size is not None:
        return cached_size

    # Otherwise ask the server.
    metadata = self.stat(url)
    if metadata.exists:
        return metadata.size

    raise FileNotFoundError(f"No file or directory found at {url}")

1725 

def is_dir(self, url: str) -> bool:
    """Tell whether a directory exists at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    result : `bool`
        True if there is a directory at `url`.
    """
    metadata = self.stat(url)
    return metadata.is_dir

1740 

def mkcol(self, url: str) -> None:
    """Create a directory at `url`, creating missing parents as needed.

    Parameters
    ----------
    url : `str`
        Target URL.

    Raises
    ------
    ValueError
        Raised if the server answers the MKCOL request (or its retry after
        the parent was created) with a status other than 201 Created or
        405 Method Not Allowed.

    Notes
    -----
    A 405 Method Not Allowed response is treated as "already exists" and
    is not an error.

    NOTE(review): a server that also answers 405 when a *file* exists at
    `url` would make this method return silently in that case -- confirm
    against the servers in use.
    """
    resp = self._mkcol(url=url)
    if resp.status == HTTPStatus.CONFLICT:
        # The parent directory does not exist. Create it first, except
        # if the parent's path is "/", then retry.
        parent = self._parent(url)
        if not parent.endswith("/"):
            self.mkcol(parent)
        resp = self._mkcol(url=url)

    # Validate the final response, including the one obtained from the
    # retry: a failed retry must not be reported as a success.
    if resp.status not in (HTTPStatus.CREATED, HTTPStatus.METHOD_NOT_ALLOWED):
        raise ValueError(
            f"Can not create directory {resp.geturl()}: status {resp.status} {resp.reason}"
        )

1767 

def stat(self, url: str) -> DavFileMetadata:
    """Retrieve the basic properties of the resource located at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    result : `DavFileMetadata`
        Metadata built from the first entry of the server's PROPFIND
        response. When the server answers 404 Not Found no exception is
        raised: a `DavFileMetadata` built only from the base URL and the
        href of `url` is returned instead, which lets the caller detect
        that the resource does not exist.

    Raises
    ------
    Exception
        The error built by `unexpected_status_error` for any response
        status other than 207 Multi-Status or 404 Not Found.
    """
    # Ask only for the properties we need: kind, size and modification time.
    propfind_body = (
        '<?xml version="1.0" encoding="utf-8"?>'
        '<D:propfind xmlns:D="DAV:">'
        "<D:prop><D:resourcetype/><D:getcontentlength/><D:getlastmodified/></D:prop>"
        "</D:propfind>"
    )
    resp = self.propfind(url, body=propfind_body, depth="0")

    if resp.status == HTTPStatus.MULTI_STATUS:
        # With depth "0" only the entry describing `url` itself is used.
        first_entry = self._propfind_parser.parse(resp.data)[0]
        return DavFileMetadata.from_property(base_url=self._base_url, property=first_entry)

    if resp.status == HTTPStatus.NOT_FOUND:
        relative_href = url.replace(self._base_url, "", 1)
        return DavFileMetadata(base_url=self._base_url, href=relative_href)

    raise unexpected_status_error("PROPFIND", url, resp)

1810 

def info(self, url: str, name: str | None = None) -> dict[str, Any]:
    """Describe the file or directory at `url` as a dictionary.

    Parameters
    ----------
    url : `str`
        Target URL.
    name : `str`, optional
        Value stored under the ``"name"`` key of the result. If `None`,
        `url` is used.

    Returns
    -------
    result : `dict`
        Dictionary with the keys ``"name"``, ``"type"``, ``"size"``,
        ``"last_modified"`` and ``"checksums"``, in that order.

        For an existing resource, ``"type"`` is ``"directory"`` or
        ``"file"`` and the other values are copied from the metadata
        returned by `stat`:

        .. code-block:: python

            {
                "name": name,
                "type": "file",
                "size": 1234,
                "last_modified":
                    datetime.datetime(2025, 4, 10, 15, 12, 51, 227854),
                "checksums": {
                    "adler32": "0fc5f83f",
                    "md5": "1f57339acdec099c6c0a41f8e3d5fcd0",
                },
            }

        For a resource that does not exist:

        .. code-block:: python

            {
                "name": name,
                "type": None,
                "size": None,
                "last_modified": datetime.datetime(1, 1, 1, 0, 0),
                "checksums": {},
            }

    Notes
    -----
    The layout of the returned dictionary is inspired by and compatible
    with `fsspec`.
    """
    label = url if name is None else name
    metadata = self.stat(url)

    if not metadata.exists:
        # Placeholder values for a missing resource.
        return {
            "name": label,
            "type": None,
            "size": None,
            "last_modified": datetime.min,
            "checksums": {},
        }

    kind = "directory" if metadata.is_dir else "file"
    return {
        "name": label,
        "type": kind,
        "size": metadata.size,
        "last_modified": metadata.last_modified,
        "checksums": metadata.checksums,
    }

1896 

def move(self, source_url: str, destination_url: str, overwrite: bool = False) -> HTTPResponse:
    """Issue a webDAV MOVE request and hand back the raw response.

    Parameters
    ----------
    source_url : `str`
        Source URL.
    destination_url : `str`
        Destination URL, sent in the ``Destination`` request header.
    overwrite : `bool`, optional
        Sent in the ``Overwrite`` request header as ``"T"`` when true and
        ``"F"`` otherwise.

    Returns
    -------
    resp : `HTTPResponse`
        The response received from the server. Its status is not
        interpreted here.
    """
    overwrite_flag = "T" if overwrite else "F"
    return self._move(
        source_url,
        headers={"Destination": destination_url, "Overwrite": overwrite_flag},
    )

1919 

def read_dir(self, url: str) -> list[DavFileMetadata]:
    """List the metadata of the entries found in the directory at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    result : `list` [`DavFileMetadata`]
        One item per file entry of the PROPFIND (depth 1) response, plus
        one per directory entry whose href differs from the href computed
        for `url` itself.

    Raises
    ------
    FileNotFoundError
        Raised if the server answers 404 Not Found.
    Exception
        The error built by `unexpected_status_error` for any other status
        different from 207 Multi-Status.
    """
    body = (
        '<?xml version="1.0" encoding="utf-8"?>'
        '<D:propfind xmlns:D="DAV:">'
        "<D:prop>"
        "<D:resourcetype/>"
        "<D:getcontentlength/>"
        "<D:getlastmodified/>"
        "<D:displayname/>"
        "</D:prop>"
        "</D:propfind>"
    )
    resp = self.propfind(url, body=body, depth="1")
    if resp.status == HTTPStatus.NOT_FOUND:
        raise FileNotFoundError(f"No directory found at {resp.geturl()}")
    if resp.status != HTTPStatus.MULTI_STATUS:
        raise unexpected_status_error("PROPFIND", url, resp)

    # Href of the directory being listed, always with a trailing "/".
    url_path = parse_url(url).path
    own_href = "/" if url_path is None else url_path.rstrip("/") + "/"

    # Keep every file entry; keep directory entries except the one that
    # describes the directory we are traversing.
    return [
        DavFileMetadata.from_property(base_url=self._base_url, property=entry)
        for entry in self._propfind_parser.parse(resp.data)
        if entry.is_file or (entry.is_dir and entry.href != own_href)
    ]

1969 

def read(self, url: str) -> tuple[str, bytes]:
    """Fetch the whole content of the file located at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    url : `str`
        URL reported by `get` as the one the data was obtained from.
    data : `bytes`
        The ``data`` attribute of the response returned by `get`.

    Notes
    -----
    The caller must ensure that the resource at `url` is a file, not
    a directory.
    """
    origin_url, response = self.get(url)
    payload = response.data
    return origin_url, payload

1992 

def read_range(
    self,
    url: str,
    start: int,
    end: int | None,
    headers: dict[str, str] | None = None,
    release_backend: bool = True,
) -> tuple[str, bytes]:
    """Fetch part of the content of the file located at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.
    start : `int`
        First byte offset placed in the ``Range`` request header.
    end : `int` or `None`
        Last byte offset placed in the ``Range`` request header. If
        `None`, an open-ended range (``bytes=<start>-``) is requested.
    headers : `dict` [`str`, `str`], optional
        Extra headers to send with the GET requests. The dictionary is
        copied, not modified.
    release_backend : `bool`, optional
        Only used when the request is redirected: selects between
        ``Connection: close`` (true) and ``Connection: keep-alive``
        (false) for the request sent to the redirection target.

    Returns
    -------
    backend_url : `str`
        URL reported by `get` for the request that delivered the data.
    data : `bytes`
        Partial content of the file.

    Notes
    -----
    The caller must ensure that the resource at `url` is a file, not
    a directory. This is important because some webDAV servers respond
    with an HTML document when asked for reading a directory.
    """
    byte_range = f"bytes={start}-" if end is None else f"bytes={start}-{end}"
    range_headers = {"Accept-Encoding": "identity", "Range": byte_range}
    caller_headers = dict(headers) if headers is not None else {}

    # First ask the frontend server without following redirections: the
    # "Connection" header must only be sent to the backend server (if any).
    frontend_headers = {**caller_headers, **range_headers}
    final_url, resp = self.get(url, headers=frontend_headers, redirect=False)
    if resp.status == HTTPStatus.PARTIAL_CONTENT:
        return final_url, resp.data
    if resp.status not in resp.REDIRECT_STATUSES:
        raise unexpected_status_error("GET (with 'Range' header)", url, resp)

    # We were redirected. Follow the redirection ourselves so that we can
    # tell the backend whether to keep the connection open.
    redirect_url = resp.headers.get("Location")
    log.debug("GET request to %s got redirected to %s", url, redirect_url)

    connection_policy = "close" if release_backend else "keep-alive"
    backend_headers = {**caller_headers, **range_headers, "Connection": connection_policy}

    final_url, resp = self.get(redirect_url, headers=backend_headers)
    if resp.status != HTTPStatus.PARTIAL_CONTENT:
        raise unexpected_status_error("GET (with 'Range' header)", redirect_url, resp)
    return final_url, resp.data

2069 

2070 def _write_response_body_to_file(self, resp: HTTPResponse, filename: str, chunk_size: int) -> int: 

2071 """Write the the response body to a local file. 

2072 

2073 Parameters 

2074 ---------- 

2075 resp : `HTTPResponse` 

2076 The HTTP Response to read the body from. 

2077 filename : `str` 

2078 Local file to write the content to. If the file already exists, 

2079 it will be rewritten. 

2080 chunk_size : `int` 

2081 Size of the chunks to write to `filename`. 

2082 

2083 Returns 

2084 ------- 

2085 count: `int` 

2086 Number of bytes written to `filename`. 

2087 """ 

2088 try: 

2089 # Read the response body into a pre-allocated memory buffer and 

2090 # write the buffer content to the destination file avoiding 

2091 # copies if possible. 

2092 content_length = 0 

2093 with open(filename, "wb", buffering=0) as file: 

2094 view = memoryview(bytearray(chunk_size)) 

2095 while True: 

2096 if (count := resp.readinto(view)) > 0: # type: ignore 

2097 content_length += count 

2098 file.write(view[:count]) 

2099 else: 

2100 break 

2101 

2102 # Check that the expected and actual content lengths match. 

2103 # Perform this check only when the body of the response was not 

2104 # encoded by the server. 

2105 expected_length: int = int(resp.headers.get("Content-Length", -1)) 

2106 if ( 

2107 "Content-Encoding" not in resp.headers 

2108 and expected_length != -1 

2109 and expected_length != content_length 

2110 ): 

2111 raise ValueError( 

2112 f"Size of downloaded file does not match value in Content-Length header for " 

2113 f"{resp.geturl()}: expecting {expected_length} and got {content_length} bytes" 

2114 ) 

2115 

2116 return content_length 

2117 finally: 

2118 # Release the connection 

2119 resp.drain_conn() 

2120 resp.release_conn() 

2121 

def download(self, url: str, filename: str, chunk_size: int) -> int:
    """Copy the content of the remote file at `url` into a local file.

    Parameters
    ----------
    url : `str`
        Target URL.
    filename : `str`
        Local file to write the content to. If the file already exists,
        it will be rewritten.
    chunk_size : `int`
        Size of the chunks to write to `filename`.

    Returns
    -------
    count : `int`
        Number of bytes written to `filename`.

    Notes
    -----
    The caller must ensure that the resource at `url` is a file, not
    a directory.
    """
    # Do not preload the body: it is streamed to disk chunk by chunk.
    _backend_url, response = self.get(url, preload_content=False)
    return self._write_response_body_to_file(response, filename, chunk_size)

2147 

def write(self, url: str, data: BinaryIO | bytes) -> int | None:
    """Create or rewrite a remote file at `url` with `data` as its
    contents.

    The data is uploaded to a temporary URL next to `url` and then renamed
    to `url`, so a failed upload does not leave a partial file at `url`.

    Parameters
    ----------
    url : `str`
        Target URL.
    data : `BinaryIO` or `bytes`
        Content to upload.

    Returns
    -------
    size : `int | None`
        The size in bytes of the file uploaded, as reported by `put`. Can
        be `None` if the size could not be retrieved.

    Notes
    -----
    If a file already exists at `url` it will be rewritten.

    If the upload or the rename fails, removal of the temporary file is
    attempted and the error is re-raised.
    """
    # According to RFC 4918, the parent directory of the file must
    # exist before we can write to it. So create it first and then
    # upload.
    self.mkcol(self._parent(url))

    # Compute the temporary URL before entering the `try` block: the
    # cleanup code below needs it, and an error raised while computing it
    # must reach the caller unchanged (nothing has been uploaded yet).
    temporary_url = self._make_temporary_url(url)

    try:
        # Upload to a temporary file and rename to the final name.
        size = self.put(temporary_url, data=data)
        self.rename(temporary_url, url, overwrite=True, create_parent=False)

        # Update the file size cache with this size
        self._file_size_cache.update_size(url, size)
        return size
    except Exception:
        # Upload failed. Attempt to remove the temporary file.
        self.delete(temporary_url)
        raise

2187 

def checksums(self, url: str) -> dict[str, str]:
    """Return the checksums of the contents of the file located at `url`.

    The checksums are retrieved from the storage endpoint. There may be
    none if the storage endpoint does not automatically expose the
    checksums it computes.

    Parameters
    ----------
    url : `str`
        Target URL.

    Returns
    -------
    checksums : `dict` [`str`, `str`]
        The ``checksums`` of the metadata returned by `stat` when `url`
        designates a file, and an empty dictionary otherwise.
        The key of the dictionary is the lowercased name of the checksum
        algorithm (e.g. "md5", "adler32"). The value is the lowercased
        checksum itself (e.g. "78441cec2479ec8b545c4d6699f542da").

    Raises
    ------
    FileNotFoundError
        Raised if nothing exists at `url`.
    """
    metadata = self.stat(url)
    if not metadata.exists:
        raise FileNotFoundError(f"No file found at {url}")

    if metadata.is_file:
        return metadata.checksums
    return {}

2213 

def delete(self, url: str) -> None:
    """Remove the file or directory at `url`.

    A 404 Not Found answer is not considered an error.

    Parameters
    ----------
    url : `str`
        Target URL.

    Raises
    ------
    ValueError
        Raised if the server answers with a status other than 200, 202,
        204 or 404.

    Notes
    -----
    If `url` designates a directory, some webDAV servers recursively
    remove the directory and its contents. Others only remove the
    directory if it is empty.

    For a consistent behavior, the caller must check what kind of object
    the target URL is and walk the hierarchy removing all objects.
    """
    resp = self._delete(url)
    acceptable = (
        HTTPStatus.OK,
        HTTPStatus.ACCEPTED,
        HTTPStatus.NO_CONTENT,
        HTTPStatus.NOT_FOUND,
    )
    if resp.status not in acceptable:
        raise ValueError(
            f"Unable to delete resource {resp.geturl()}: status {resp.status} {resp.reason}"
        )

    # Whatever was cached for this URL is no longer valid.
    self._file_size_cache.invalidate(url)

2242 

def accepts_ranges(self, url: str) -> bool:
    """Tell whether the server honors a 'Range' header in GET requests
    against `url`.

    Parameters
    ----------
    url : `str`
        Target URL. Only used for the HEAD request sent when the answer
        is not known yet.

    Returns
    -------
    result : `bool`
        `True` if the ``Accept-Ranges`` header of the response to a HEAD
        request is exactly ``"bytes"``.

    Notes
    -----
    The answer obtained for one URL is remembered and reused for every
    other URL, on the assumption that the server implements the feature
    for any file it serves.
    """
    if self._accepts_ranges is None:
        with self._lock:
            # Check again now that we hold the lock: the value may have
            # been set while we were waiting for it.
            if self._accepts_ranges is None:
                advertised = self.head(url).headers.get("Accept-Ranges", "")
                self._accepts_ranges = advertised == "bytes"

    return self._accepts_ranges

2263 

@property
def supports_duplicate(self) -> bool:
    """Whether the server this client talks to implements the webDAV
    COPY method, as recorded in ``_can_duplicate``.
    """
    can_duplicate = self._can_duplicate
    return can_duplicate

2270 

def copy(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
    """Copy the file at `source_url` to `destination_url` within the same
    storage endpoint, using a webDAV COPY request.

    Parameters
    ----------
    source_url : `str`
        URL of the source file.
    destination_url : `str`
        URL of the destination file. Its parent directory must exist.
    overwrite : `bool`
        Sent in the ``Overwrite`` request header as ``"T"`` when true and
        ``"F"`` otherwise.

    Raises
    ------
    ValueError
        Raised if the server answers with a status other than 201 Created
        or 204 No Content.
    """
    overwrite_flag = "T" if overwrite else "F"
    resp = self._copy(
        source_url,
        headers={"Destination": destination_url, "Overwrite": overwrite_flag},
    )
    if resp.status not in (HTTPStatus.CREATED, HTTPStatus.NO_CONTENT):
        raise ValueError(
            f"Could not copy {resp.geturl()} to {destination_url}: status {resp.status} {resp.reason}"
        )

    # The destination has new content: drop any size cached for it.
    self._file_size_cache.invalidate(destination_url)

2297 

def duplicate(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
    """Copy the file at `source_url` to `destination_url` within the same
    storage endpoint, creating the destination's parent directory first.

    Parameters
    ----------
    source_url : `str`
        URL of the source file.
    destination_url : `str`
        URL of the destination file. Its parent directory is created if
        necessary.
    overwrite : `bool`
        If True and a file exists at `destination_url` it will be
        overwritten. Otherwise an exception is raised.

    Raises
    ------
    NotImplementedError
        Raised if `source_url` designates a directory.
    """
    # Only files can be duplicated.
    if self.is_dir(source_url):
        raise NotImplementedError(f"copy is not implemented for directory {source_url}")

    # COPY may fail if the destination's parent does not exist, depending
    # on the server implementation of RFC 4918, so create it beforehand.
    self.mkcol(self._parent(destination_url))
    self.copy(source_url=source_url, destination_url=destination_url, overwrite=overwrite)

2323 

2324 def rename( 

2325 self, 

2326 source_url: str, 

2327 destination_url: str, 

2328 overwrite: bool = False, 

2329 create_parent: bool = True, 

2330 ) -> None: 

2331 """Rename (move) the file at `source_url` to `destination_url` in the 

2332 same storage endpoint. 

2333 

2334 Parameters 

2335 ---------- 

2336 source_url : `str` 

2337 URL of the source file. 

2338 destination_url : `str` 

2339 URL of the destination file. Its parent directory must exist. 

2340 overwrite : `bool`, optional 

2341 If True and a file exists at `destination_url` it will be 

2342 overwritten. Otherwise an exception is raised. 

2343 create_parent : `bool`, optional 

2344 Whether to create the parent. 

2345 """ 

2346 # Create the destination's parent directory first because MOVE may 

2347 # fail if it does not exist, depending on the server implementation 

2348 # of RFC 4918. 

2349 if create_parent: 

2350 destination_parent = self._parent(destination_url) 

2351 self.mkcol(destination_parent) 

2352 

2353 resp = self.move(source_url=source_url, destination_url=destination_url, overwrite=overwrite) 

2354 match resp.status: 

2355 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

2356 self._file_size_cache.invalidate(destination_url) 

2357 case _: 

2358 raise ValueError( 

2359 f"""Could not move file {resp.geturl()} to {destination_url}: status {resp.status} """ 

2360 f"""{resp.reason}""" 

2361 ) 

2362 

def generate_presigned_get_url(self, url: str, expiration_time_seconds: int) -> str:
    """Return a pre-signed URL usable to download the resource with an
    HTTP GET without supplying any access credentials.

    This implementation does not support URL signing and always raises.

    Parameters
    ----------
    url : `str`
        Target URL.
    expiration_time_seconds : `int`
        Number of seconds until the generated URL is no longer valid.

    Returns
    -------
    url : `str`
        HTTP URL signed for GET.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation.
    """
    reason = f"URL signing is not supported by server for {self}"
    raise NotImplementedError(reason)

2380 

def generate_presigned_put_url(self, url: str, expiration_time_seconds: int) -> str:
    """Return a pre-signed URL usable to upload a file with an HTTP PUT
    without supplying any access credentials.

    This implementation does not support URL signing and always raises.

    Parameters
    ----------
    url : `str`
        Target URL.
    expiration_time_seconds : `int`
        Number of seconds until the generated URL is no longer valid.

    Returns
    -------
    url : `str`
        HTTP URL signed for PUT.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation.
    """
    reason = f"URL signing is not supported by server for {self}"
    raise NotImplementedError(reason)

2398 

2399 

class ActivityCaveat(enum.Enum):
    """Activities a macaroon can be requested for from dCache or XRootD
    webDAV servers.
    """

    # auto() numbers the members from 1 in declaration order, so
    # DOWNLOAD is 1 and UPLOAD is 2.
    DOWNLOAD = enum.auto()
    UPLOAD = enum.auto()

2407 

2408 

2409class DavClientURLSigner(DavClient): 

2410 """WebDAV client which supports signing of URL for upload and download. 

2411 

2412 Instances of this class are thread-safe. 

2413 

2414 Parameters 

2415 ---------- 

2416 url : `str` 

2417 Root URL of the storage endpoint 

2418 (e.g. "https://host.example.org:1234/"). 

2419 config : `DavConfig` 

2420 Configuration to initialize this client. 

2421 accepts_ranges : `bool` | `None` 

2422 Indicate whether the remote server accepts the ``Range`` header in GET 

2423 requests. 

2424 """ 

2425 

2426 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None: 

2427 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges) 

2428 

2429 def generate_presigned_get_url(self, url: str, expiration_time_seconds: int) -> str: 

2430 """Return a pre-signed URL that can be used to retrieve the resource 

2431 at `url` using an HTTP GET without supplying any access credentials. 

2432 

2433 Parameters 

2434 ---------- 

2435 url : `str` 

2436 URL of an existing file. 

2437 expiration_time_seconds : `int` 

2438 Number of seconds until the generated URL is no longer valid. 

2439 

2440 Returns 

2441 ------- 

2442 url : `str` 

2443 HTTP URL signed for GET. 

2444 

2445 Notes 

2446 ----- 

2447 Although the returned URL allows for downloading the file at `url` 

2448 without supplying credentials, the HTTP client must be configured 

2449 to accept the certificate the server will present if the client wants 

2450 validate it. The server's certificate may be issued by a certificate 

2451 authority unknown to the client. 

2452 """ 

2453 macaroon: str = self._get_macaroon(url, ActivityCaveat.DOWNLOAD, expiration_time_seconds) 

2454 return f"{url}?authz={macaroon}" 

2455 

2456 def generate_presigned_put_url(self, url: str, expiration_time_seconds: int) -> str: 

2457 """Return a pre-signed URL that can be used to upload a file to `url` 

2458 using an HTTP PUT without supplying any access credentials. 

2459 

2460 Parameters 

2461 ---------- 

2462 url : `str` 

2463 URL of an existing file. 

2464 expiration_time_seconds : `int` 

2465 Number of seconds until the generated URL is no longer valid. 

2466 

2467 Returns 

2468 ------- 

2469 url : `str` 

2470 HTTP URL signed for PUT. 

2471 

2472 Notes 

2473 ----- 

2474 Although the returned URL allows for uploading a file to `url` 

2475 without supplying credentials, the HTTP client must be configured 

2476 to accept the certificate the server will present if the client wants 

2477 validate it. The server's certificate may be issued by a certificate 

2478 authority unknown to the client. 

2479 """ 

2480 macaroon: str = self._get_macaroon(url, ActivityCaveat.UPLOAD, expiration_time_seconds) 

2481 return f"{url}?authz={macaroon}" 

2482 

2483 def _get_macaroon(self, url: str, activity: ActivityCaveat, expiration_time_seconds: int) -> str: 

2484 """Return a macaroon for uploading or downloading the file at `url`. 

2485 

2486 Parameters 

2487 ---------- 

2488 url : `str` 

2489 URL of an existing file. 

2490 activity : `ActivityCaveat` 

2491 the activity the macaroon is requested for. 

2492 expiration_time_seconds : `int` 

2493 Requested duration of the macaroon, in seconds. 

2494 

2495 Returns 

2496 ------- 

2497 macaroon : `str` 

2498 Macaroon to be used with `url` in a GET or PUT request. 

2499 """ 

2500 # dCache and XRootD webDAV servers support delivery of macaroons. 

2501 # 

2502 # For details about dCache macaroons see: 

2503 # https://www.dcache.org/manuals/UserGuide-9.2/macaroons.shtml 

2504 match activity: 

2505 case ActivityCaveat.DOWNLOAD: 

2506 activity_caveat = "DOWNLOAD,LIST" 

2507 case ActivityCaveat.UPLOAD: 

2508 activity_caveat = "UPLOAD,LIST,DELETE,MANAGE" 

2509 

2510 # Retrieve a macaroon for the requested activities and duration 

2511 headers = {"Content-Type": "application/macaroon-request"} 

2512 body = { 

2513 "caveats": [ 

2514 f"activity:{activity_caveat}", 

2515 ], 

2516 "validity": f"PT{expiration_time_seconds}S", 

2517 } 

2518 resp = self._request("POST", url, headers=headers, body=json.dumps(body)) 

2519 if resp.status != HTTPStatus.OK: 

2520 raise ValueError( 

2521 f"Could not retrieve a macaroon for URL {resp.geturl()}, status: {resp.status} {resp.reason}" 

2522 ) 

2523 

2524 # We are expecting the body of the response to be formatted in JSON. 

2525 # dCache sets the 'Content-Type' of the response to 'application/json' 

2526 # but XRootD does not set any 'Content-Type' header 8-[ 

2527 # 

2528 # An example of a response body returned by dCache is shown below: 

2529 # { 

2530 # "macaroon": "MDA[...]Qo", 

2531 # "uri": { 

2532 # "targetWithMacaroon": "https://dcache.example.org/?authz=MD...", 

2533 # "baseWithMacaroon": "https://dcache.example.org/?authz=MD...", 

2534 # "target": "https://dcache.example.org/", 

2535 # "base": "https://dcache.example.org/" 

2536 # } 

2537 # } 

2538 # 

2539 # An example of a response body returned by XRootD is shown below: 

2540 # { 

2541 # "macaroon": "MDA[...]Qo", 

2542 # "expires_in": 86400 

2543 # } 

2544 try: 

2545 response_body = json.loads(resp.data.decode()) 

2546 except json.JSONDecodeError: 

2547 raise ValueError(f"Could not deserialize response to POST request for URL {resp.geturl()}") 

2548 

2549 if "macaroon" in response_body: 

2550 return response_body["macaroon"] 

2551 

2552 raise ValueError(f"Could not retrieve macaroon for URL {resp.geturl()}") 

2553 

2554 @override 

2555 def duplicate(self, source_url: str, destination_url: str, overwrite: bool = False) -> None: 

2556 """Copy the file at `source_url` to `destination_url` in the same 

2557 storage endpoint. 

2558 

2559 Parameters 

2560 ---------- 

2561 source_url : `str` 

2562 URL of the source file. 

2563 destination_url : `str` 

2564 URL of the destination file. Its parent directory must exist. 

2565 overwrite : `bool` 

2566 If True and a file exists at `destination_url` it will be 

2567 overwritten. Otherwise an exception is raised. 

2568 """ 

2569 # Check the source is a file 

2570 if self.is_dir(source_url): 

2571 raise NotImplementedError(f"copy is not implemented for directory {source_url}") 

2572 

2573 # Neither dCache nor XrootD currently implement the COPY 

2574 # webDAV method as documented in 

2575 # 

2576 # http://www.webdav.org/specs/rfc4918.html#METHOD_COPY 

2577 # 

2578 # (See issues DM-37603 and DM-37651 for details) 

2579 # With those servers use third-party copy instead. 

2580 return self._copy_via_third_party(source_url, destination_url, overwrite) 

2581 

2582 def _copy_via_third_party(self, source_url: str, destination_url: str, overwrite: bool = False) -> None: 

2583 """Copy the file at `source_url` to `destination_url` in the same 

2584 storage endpoint using the third-party copy functionality 

2585 implemented by dCache and XRootD servers. 

2586 

2587 Parameters 

2588 ---------- 

2589 source_url : `str` 

2590 URL of the source file. 

2591 destination_url : `str` 

2592 URL of the destination file. Its parent directory must exist. 

2593 overwrite : `bool` 

2594 If True and a file exists at `destination_url` it will be 

2595 overwritten. Otherwise an exception is raised. 

2596 """ 

2597 # To implement COPY we use dCache's third-party copy mechanism 

2598 # documented at: 

2599 # 

2600 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#third-party-transfers 

2601 # 

2602 # The reason is that dCache does not correctly implement webDAV's COPY 

2603 # method. See https://github.com/dCache/dcache/issues/6950 

2604 

2605 # Create the destination's parent directory first because COPY may 

2606 # fail if it does not exist, depending on the server implementation 

2607 # of RFC 4918. 

2608 destination_parent = self._parent(destination_url) 

2609 self.mkcol(destination_parent) 

2610 

2611 # Retrieve a macaroon for downloading the source 

2612 download_macaroon = self._get_macaroon(source_url, ActivityCaveat.DOWNLOAD, 300) 

2613 

2614 # Prepare and send the COPY request 

2615 try: 

2616 headers = { 

2617 "Source": source_url, 

2618 "TransferHeaderAuthorization": f"Bearer {download_macaroon}", 

2619 "Credential": "none", 

2620 "Depth": "0", 

2621 "Overwrite": "T" if overwrite else "F", 

2622 "RequireChecksumVerification": "false", 

2623 } 

2624 resp = self._copy(destination_url, headers=headers, preload_content=False) 

2625 match resp.status: 

2626 case HTTPStatus.CREATED: 

2627 return 

2628 case HTTPStatus.ACCEPTED: 

2629 pass 

2630 case _: 

2631 raise ValueError( 

2632 f"Unable to copy resource {resp.geturl()}; status: {resp.status} {resp.reason}" 

2633 ) 

2634 

2635 # Analyse the response to the COPY request that the server has 

2636 # not completed yet. 

2637 content_type = resp.headers.get("Content-Type") 

2638 if content_type != "text/perf-marker-stream": 

2639 raise ValueError( 

2640 f"""Unexpected Content-Type {content_type} in response to COPY request from """ 

2641 f"""{source_url} to {destination_url}""" 

2642 ) 

2643 

2644 # Read the performance markers in the response body until we get 

2645 # a "success" or "failure" notification. 

2646 # 

2647 # Documentation: 

2648 # https://dcache.org/manuals/UserGuide-10.2/webdav.shtml#third-party-transfers 

2649 for marker in io.TextIOWrapper(resp): # type: ignore 

2650 marker = marker.rstrip("\n") 

2651 if marker == "": # EOF 

2652 raise ValueError( 

2653 f"""Copying file from {source_url} to {destination_url} failed: """ 

2654 """could not get response from server""" 

2655 ) 

2656 elif marker.startswith("failure:"): 

2657 raise ValueError( 

2658 f"""Copying file from {source_url} to {destination_url} failed with error: """ 

2659 f"""{marker}""" 

2660 ) 

2661 elif marker.startswith("success:"): 

2662 return 

2663 finally: 

2664 resp.drain_conn() 

2665 

2666 

2667class DavClientDCache(DavClientURLSigner): 

2668 """Client for interacting with a dCache webDAV server. 

2669 

2670 Instances of this class are thread-safe. 

2671 

2672 Parameters 

2673 ---------- 

2674 url : `str` 

2675 Root URL of the storage endpoint 

2676 (e.g. "https://host.example.org:1234/"). 

2677 config : `DavConfig` 

2678 Configuration to initialize this client. 

2679 accepts_ranges : `bool` | `None` 

2680 Indicate whether the remote server accepts the ``Range`` header in GET 

2681 requests. 

2682 """ 

2683 

2684 # Regular expression to parse dCache's response body of a successful 

2685 # PUT request. Such a response body is of the form: 

2686 # 

2687 # "104857600 bytes uploaded\r\n\r\n" 

2688 # 

2689 rex: re.Pattern = re.compile(r"^(\d*) bytes uploaded", re.IGNORECASE | re.ASCII) 

2690 

2691 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None: 

2692 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges) 

2693 

2694 # Create a specialized pool manager for sending requests to dCache 

2695 # webdav door, in particular for retrieving metadata. 

2696 # 

2697 # As of dCache v10.2.14, the webDAV door leaves the network connection 

2698 # unusable for us for sending subsequent requests after serving 

2699 # GET, PUT, DELETE, etc., but leaves the connection intact after 

2700 # serving MKCOL, MOVE and PROPFIND requests. 

2701 # We take advantage of that by using a dedicated pool manager for 

2702 # those requests, so that the network connections managed by that pool 

2703 # be reused. This avoids establishing the TCP+TLS connection for each 

2704 # request. 

2705 pool_manager = self._make_pool_manager(self._config) 

2706 self._propfind_pool_manager = pool_manager 

2707 self._move_pool_manager = pool_manager 

2708 self._mkcol_pool_manager = pool_manager 

2709 

2710 # dCache does not deliver macaroons when we are not using a secure 

2711 # channel to interact with the door. In that case, we can not use 

2712 # third party copy and dCache does not correctly support the COPY 

2713 # method as stated in RFC-4918. 

2714 self._can_duplicate = self._base_url.startswith("https://") 

2715 

2716 @override 

2717 def _mkcol( 

2718 self, 

2719 url: str, 

2720 headers: dict[str, str] | None = None, 

2721 pool_manager: PoolManager | None = None, 

2722 ) -> HTTPResponse: 

2723 """Inherits doc string.""" 

2724 return self._request("MKCOL", url=url, headers=headers, pool_manager=self._mkcol_pool_manager) 

2725 

2726 @override 

2727 def _move( 

2728 self, 

2729 url: str, 

2730 headers: dict[str, str] | None = None, 

2731 pool_manager: PoolManager | None = None, 

2732 ) -> HTTPResponse: 

2733 """Inherits doc string.""" 

2734 return self._request("MOVE", url=url, headers=headers, pool_manager=self._move_pool_manager) 

2735 

2736 @override 

2737 def _propfind( 

2738 self, 

2739 url: str, 

2740 headers: dict[str, str] | None = None, 

2741 body: str = "", 

2742 pool_manager: PoolManager | None = None, 

2743 ) -> HTTPResponse: 

2744 """Inherits doc string.""" 

2745 return self._request( 

2746 "PROPFIND", url=url, headers=headers, body=body, pool_manager=self._propfind_pool_manager 

2747 ) 

2748 

2749 @override 

2750 def put( 

2751 self, 

2752 url: str, 

2753 headers: dict[str, str] | None = None, 

2754 data: BinaryIO | bytes = b"", 

2755 ) -> int | None: 

2756 # Docstring inherited. 

2757 # Send a PUT request with empty body to the dCache frontend server to 

2758 # get redirected to the backend. 

2759 # 

2760 # Details: 

2761 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#redirection 

2762 frontend_headers = {} if headers is None else dict(headers) 

2763 frontend_headers.update({"Content-Length": "0", "Expect": "100-continue"}) 

2764 if is_zero_length := isinstance(data, bytes) and len(data) == 0: 

2765 # We are uploading an empty file. Don't send the "Expect" header 

2766 # so that the dCache door handles this PUT request itself without 

2767 # redirecting us to a pool. 

2768 frontend_headers.pop("Expect") 

2769 

2770 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False) 

2771 match resp.status: 

2772 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

2773 redirect_url = url 

2774 case status if status in resp.REDIRECT_STATUSES: 

2775 redirect_url = resp.headers.get("Location") 

2776 case _: 

2777 raise unexpected_status_error("PUT", url, resp) 

2778 

2779 # If we are uploading an empty file, there is nothing more to do. 

2780 if is_zero_length: 

2781 return 0 

2782 

2783 # We may have beend redirected to a backend server. Upload the file 

2784 # contents to its final destination. Explicitly ask the server to close 

2785 # this network connection after serving this PUT request to release 

2786 # the associated dCache mover. 

2787 backend_headers = {} if headers is None else dict(headers) 

2788 backend_headers.update({"Connection": "close"}) 

2789 

2790 # Ask dCache to compute and record a checksum of the uploaded 

2791 # file contents, for later integrity checks. Since we don't compute 

2792 # the digest ourselves while uploading the data, we cannot control 

2793 # after the request is complete that the data we uploaded is 

2794 # identical to the data recorded by the server, but at least the 

2795 # server has recorded a digest of the data it stored. 

2796 # 

2797 # See RFC-3230 for details and 

2798 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml 

2799 # for the list of supported digest algorithhms. 

2800 if (checksum := self._config.request_checksum) is not None: 

2801 backend_headers.update({"Want-Digest": checksum}) 

2802 

2803 resp = self._put(redirect_url, body=data, headers=backend_headers) 

2804 match resp.status: 

2805 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

2806 # Parse the response body and extract the number of bytes 

2807 # uploaded. This allows us to avoid sending a HEAD request 

2808 # to retrieve the file size. 

2809 response_body = resp.data.decode() 

2810 if match := DavClientDCache.rex.match(response_body): 

2811 return int(match.group(1)) 

2812 else: 

2813 return None 

2814 case _: 

2815 raise unexpected_status_error("PUT", redirect_url, resp) 

2816 

2817 @override 

2818 def download(self, url: str, filename: str, chunk_size: int) -> int: 

2819 """Download the content of a file and write it to local file. 

2820 

2821 Parameters 

2822 ---------- 

2823 url : `str` 

2824 Target URL. 

2825 filename : `str` 

2826 Local file to write the content to. If the file already exists, 

2827 it will be rewritten. 

2828 chunk_size : `int` 

2829 Size of the chunks to write to `filename`. 

2830 

2831 Returns 

2832 ------- 

2833 count: `int` 

2834 Number of bytes written to `filename`. 

2835 

2836 Notes 

2837 ----- 

2838 The caller must ensure that the resource at `url` is a file, not 

2839 a directory. 

2840 """ 

2841 # Send a GET request without following redirection to get redirected 

2842 # to the backend server. 

2843 _, resp = self.get(url, preload_content=False, redirect=False) 

2844 match resp.status: 

2845 case HTTPStatus.OK: 

2846 # We were not redirected. Consume this response. 

2847 return self._write_response_body_to_file(resp, filename, chunk_size) 

2848 case status if status not in resp.REDIRECT_STATUSES: 

2849 raise unexpected_status_error("GET", url, resp) 

2850 case _: 

2851 # We were redirected. Follow this redirection. 

2852 pass 

2853 

2854 # Drain and release the response we received from the frontend server 

2855 # so that the connection can be reused. 

2856 resp.drain_conn() 

2857 resp.release_conn() 

2858 

2859 # We were redirected to a backend server. Send a GET request to the 

2860 # backend server and ask it to close the HTTP connection to force 

2861 # closing the network connection. 

2862 redirect_url = resp.headers.get("Location") 

2863 _, resp = self.get(redirect_url, headers={"Connection": "close"}, preload_content=False) 

2864 match resp.status: 

2865 case HTTPStatus.OK: 

2866 return self._write_response_body_to_file(resp, filename, chunk_size) 

2867 case _: 

2868 raise unexpected_status_error("GET", redirect_url, resp) 

2869 

2870 @override 

2871 def read(self, url: str) -> tuple[str, bytes]: 

2872 """Download the contents of file located at `url`. 

2873 

2874 Parameters 

2875 ---------- 

2876 url : `str` 

2877 Target URL. 

2878 

2879 Returns 

2880 ------- 

2881 url: `str` 

2882 Backend URL from which the data was obtained. 

2883 data: `bytes` 

2884 Contents of the file. 

2885 

2886 Notes 

2887 ----- 

2888 The caller must ensure that the resource at `url` is a file, not 

2889 a directory. 

2890 """ 

2891 # Send a GET request without following redirection to get redirected 

2892 # to the backend server. 

2893 backend_url, resp = self.get(url, redirect=False) 

2894 match resp.status: 

2895 case HTTPStatus.OK: 

2896 return backend_url, resp.data 

2897 case status if status in resp.REDIRECT_STATUSES: 

2898 redirect_url = resp.headers.get("Location") 

2899 case _: 

2900 raise unexpected_status_error("GET", url, resp) 

2901 

2902 # We were redirected. Send a GET request to the backend server 

2903 # and ask it to close the HTTP connection to force closing the 

2904 # network connection. 

2905 final_url, resp = self.get(redirect_url, headers={"Connection": "close"}) 

2906 match resp.status: 

2907 case HTTPStatus.OK: 

2908 return final_url, resp.data 

2909 case _: 

2910 raise unexpected_status_error("GET", redirect_url, resp) 

2911 

2912 @override 

2913 def write(self, url: str, data: BinaryIO | bytes) -> int | None: 

2914 """Create or rewrite a remote file at `url` with `data` as its 

2915 contents. 

2916 

2917 Parameters 

2918 ---------- 

2919 url : `str` 

2920 Target URL. 

2921 data : `bytes` 

2922 Sequence of bytes to upload. 

2923 

2924 Returns 

2925 ------- 

2926 size : `int | None` 

2927 The size in bytes of the file uploaded. Can be `None` if the size 

2928 could not be retrieved. 

2929 

2930 Notes 

2931 ----- 

2932 If a file already exists at `url` it will be rewritten. 

2933 """ 

2934 # dCache will automatically create all the parent directories so we 

2935 # don't need to explicitly create them. Although this is not compliant 

2936 # to RFC 4918, this is advantageous because it avoids several 

2937 # round-trips to the server for creating all the directories 

2938 # before actually uploading the data. 

2939 try: 

2940 # Upload to a temporary file and rename to the final name. 

2941 temporary_url = self._make_temporary_url(url) 

2942 size = self.put(temporary_url, data=data) 

2943 self.rename(temporary_url, url, overwrite=True, create_parent=False) 

2944 

2945 # Update the file size cache with this size 

2946 self._file_size_cache.update_size(url, size) 

2947 return size 

2948 except Exception: 

2949 # Upload failed. Attempt to remove the temporary file. 

2950 self.delete(temporary_url) 

2951 raise 

2952 

2953 @override 

2954 def mkcol(self, url: str) -> None: 

2955 """Create a directory at `url`. 

2956 

2957 If a directory already exists at `url` no error is returned nor 

2958 exception is raised. An exception is raised if a file exists at `url`. 

2959 

2960 Parameters 

2961 ---------- 

2962 url : `str` 

2963 Target URL. 

2964 """ 

2965 # A "MKCOL" request to dCache does not automatically create all 

2966 # the intermediate directories if they do not exist. However, a 

2967 # "PUT" request of a file does create the directory hierarchy. 

2968 # 

2969 # We exploit that to create directory hierarchies: we first create an 

2970 # empty file with a random name and then we remove it. As a side 

2971 # effect, the target directory will be created. 

2972 # 

2973 # Creating a directory this way implies two requests to the server 

2974 # ("PUT" and "DELETE"), while using "MKCOL" would on average imply 

2975 # one request per inexisting directory in the hierarchy. When 

2976 # directory hierarchies are relatively deep, requiring two 

2977 # requests per hierarchy is better than sending a "MKCOL" request 

2978 # per directory in the hierarchy. 

2979 try: 

2980 temporary_url = self._make_temporary_url(url=f"{url}/mkcol") 

2981 self.put(temporary_url, data=b"") 

2982 finally: 

2983 self.delete(temporary_url) 

2984 

2985 @override 

2986 def info(self, url: str, name: str | None = None) -> dict[str, Any]: 

2987 # Docstring inherited. 

2988 result: dict[str, Any] = { 

2989 "name": name if name is not None else url, 

2990 "type": None, 

2991 "size": None, 

2992 "last_modified": datetime.min, 

2993 "checksums": {}, 

2994 } 

2995 

2996 # Request live DAV properties as well as the checksums that dCache 

2997 # recorded about this file. 

2998 body = ( 

2999 """<?xml version="1.0" encoding="utf-8"?>""" 

3000 """<D:propfind xmlns:D="DAV:" xmlns:dcache="http://www.dcache.org/2013/webdav">""" 

3001 """<D:prop>""" 

3002 """<D:resourcetype/>""" 

3003 """<D:getcontentlength/>""" 

3004 """<D:getlastmodified/>""" 

3005 """<D:displayname/>""" 

3006 """<dcache:Checksums/>""" 

3007 """</D:prop>""" 

3008 """</D:propfind>""" 

3009 ) 

3010 resp = self.propfind(url, body=body, depth="0") 

3011 match resp.status: 

3012 case HTTPStatus.NOT_FOUND: 

3013 return result 

3014 case HTTPStatus.MULTI_STATUS: 

3015 property = self._propfind_parser.parse(resp.data)[0] 

3016 metadata = DavFileMetadata.from_property(base_url=self._base_url, property=property) 

3017 result.update( 

3018 { 

3019 "type": "directory" if metadata.is_dir else "file", 

3020 "size": metadata.size, 

3021 "last_modified": metadata.last_modified, 

3022 "checksums": metadata.checksums, 

3023 } 

3024 ) 

3025 return result 

3026 case _: 

3027 raise unexpected_status_error("PROPFIND", url, resp) 

3028 

3029 

3030class DavClientXrootD(DavClientURLSigner): 

3031 """Client for interacting with a XrootD webDAV server. 

3032 

3033 Instances of this class are thread-safe. 

3034 

3035 Parameters 

3036 ---------- 

3037 url : `str` 

3038 Root URL of the storage endpoint 

3039 (e.g. "https://host.example.org:1234/"). 

3040 config : `DavConfig` 

3041 Configuration to initialize this client. 

3042 accepts_ranges : `bool` | `None` 

3043 Indicate whether the remote server accepts the ``Range`` header in GET 

3044 requests. 

3045 """ 

3046 

3047 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None: 

3048 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges) 

3049 

3050 @override 

3051 def put( 

3052 self, 

3053 url: str, 

3054 headers: dict[str, str] | None = None, 

3055 data: BinaryIO | bytes = b"", 

3056 ) -> int | None: 

3057 # Docstring inherited. 

3058 # Send a PUT request with empty body to the XRootD frontend server to 

3059 # get redirected to the backend. 

3060 frontend_headers = {} if headers is None else dict(headers) 

3061 frontend_headers.update({"Content-Length": "0", "Expect": "100-continue"}) 

3062 for attempt in range(max_attempts := 3): 

3063 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False) 

3064 if resp.status in ( 

3065 HTTPStatus.OK, 

3066 HTTPStatus.CREATED, 

3067 HTTPStatus.NO_CONTENT, 

3068 ): 

3069 redirect_url = url 

3070 break 

3071 elif resp.status in resp.REDIRECT_STATUSES: 

3072 redirect_url = resp.headers.get("Location") 

3073 break 

3074 elif resp.status == HTTPStatus.LOCKED: 

3075 # Sometimes XRootD servers respond with status code LOCKED and 

3076 # response body of the form: 

3077 # 

3078 # "Output file /path/to/file is already opened by 1 writer; 

3079 # open denied." 

3080 # 

3081 # If we get such a response, try again, unless we reached 

3082 # the maximum number of attempts. 

3083 if attempt == max_attempts - 1: 

3084 raise ValueError( 

3085 f"""Unexpected response to HTTP request PUT {resp.geturl()}: status {resp.status} """ 

3086 f"""{resp.reason} [{resp.data.decode()}] after {max_attempts} attempts""" 

3087 ) 

3088 

3089 # Wait a bit and try again 

3090 log.warning( 

3091 f"""got unexpected response status {HTTPStatus.LOCKED} Locked for PUT {resp.geturl()} """ 

3092 f"""(attempt {attempt}/{max_attempts}), retrying...""" 

3093 ) 

3094 time.sleep((attempt + 1) * 0.100) 

3095 continue 

3096 else: 

3097 raise unexpected_status_error("PUT", url, resp) 

3098 

3099 # We were redirected to a backend server. Upload the file contents to 

3100 # its final destination. 

3101 

3102 # XRootD backend servers typically use a single port number for 

3103 # accepting connections from clients. It is therefore beneficial 

3104 # to keep those connections open, if the server allows. 

3105 

3106 # Ask the server to compute and record a checksum of the uploaded 

3107 # file contents, for later integrity checks. Since we don't compute 

3108 # the digest ourselves while uploading the data, we cannot control 

3109 # after the request is complete that the data we uploaded is 

3110 # identical to the data recorded by the server, but at least the 

3111 # server has recorded a digest of the data it stored. 

3112 # 

3113 # See RFC-3230 for details and 

3114 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml 

3115 # for the list of supported digest algorithhms. 

3116 # 

3117 # In addition, note that not all servers implement this RFC so 

3118 # the checksum reqquest may be ignored by the server. 

3119 backend_headers = {} if headers is None else dict(headers) 

3120 if (checksum := self._config.request_checksum) is not None: 

3121 backend_headers.update({"Want-Digest": checksum}) 

3122 

3123 resp = self._put(redirect_url, body=data, headers=backend_headers) 

3124 match resp.status: 

3125 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

3126 # Send a HEAD request to retrieve the size of the file we 

3127 # just uploaded. 

3128 resp = self.head(redirect_url) 

3129 size = int(resp.headers.get("Content-Length", -1)) 

3130 return None if size == -1 else size 

3131 case _: 

3132 raise unexpected_status_error("PUT", redirect_url, resp) 

3133 

3134 @override 

3135 def info(self, url: str, name: str | None = None) -> dict[str, Any]: 

3136 # XRootD does not include checksums in the response to PROPFIND 

3137 # request. We need to send a specific HEAD request to retrieve 

3138 # the ADLER32 checksum. 

3139 # 

3140 # If found, the checksum is included in the response header "Digest", 

3141 # which is of the form: 

3142 # 

3143 # Digest: adler32=0e4709f2 

3144 result = super().info(url, name) 

3145 if result["type"] == "file": 

3146 headers: dict[str, str] = {"Want-Digest": "adler32"} 

3147 resp = self.head(url=url, headers=headers) 

3148 if (digest := resp.headers.get("Digest")) is not None: 

3149 value = digest.split("=")[1] 

3150 result["checksums"].update({"adler32": value}) 

3151 

3152 return result 

3153 

3154 @override 

3155 def write(self, url: str, data: BinaryIO | bytes) -> int | None: 

3156 """Create or rewrite a remote file at `url` with `data` as its 

3157 contents. 

3158 

3159 Parameters 

3160 ---------- 

3161 url : `str` 

3162 Target URL. 

3163 data : `bytes` 

3164 Sequence of bytes to upload. 

3165 

3166 Returns 

3167 ------- 

3168 size : `int | None` 

3169 The size in bytes of the file uploaded. Can be `None` if the size 

3170 could not be retrieved. 

3171 

3172 Notes 

3173 ----- 

3174 If a file already exists at `url` it will be rewritten. 

3175 """ 

3176 # XRootD will automatically create all the parent directories so we 

3177 # don't need to explicitly create them. Although this is not compliant 

3178 # to RFC 4918, this is advantageous because it avoids several 

3179 # round-trips to the server for creating all the directories 

3180 # before actually uploading the data. 

3181 try: 

3182 # Upload to a temporary file and rename to the final name. 

3183 temporary_url = self._make_temporary_url(url) 

3184 size = self.put(temporary_url, data=data) 

3185 self.rename(temporary_url, url, overwrite=True, create_parent=False) 

3186 

3187 # Update the file size cache with this size 

3188 self._file_size_cache.update_size(url, size) 

3189 return size 

3190 except Exception: 

3191 # Upload failed. Attempt to remove the temporary file. 

3192 self.delete(temporary_url) 

3193 raise 

3194 

3195 @override 

3196 def mkcol(self, url: str) -> None: 

3197 """Create a directory at `url`. 

3198 

3199 If a directory already exists at `url` no error is returned nor 

3200 exception is raised. An exception is raised if a file exists at `url`. 

3201 

3202 Parameters 

3203 ---------- 

3204 url : `str` 

3205 Target URL. 

3206 """ 

3207 # XRootD automatically creates all the intermediate directories. 

3208 resp = self._mkcol(url) 

3209 match resp.status: 

3210 case HTTPStatus.CREATED: 

3211 return 

3212 case HTTPStatus.METHOD_NOT_ALLOWED: 

3213 # XRootD returns "405 Method Not Allowed" when either a file 

3214 # or a directory already exists at `url` 

3215 stat = self.stat(url) 

3216 if stat.is_dir: 

3217 # A directory exists at `url`. Nothing more to do. 

3218 return 

3219 elif stat.is_file: 

3220 raise NotADirectoryError( 

3221 f"Can not create a directory because a file already exists at {resp.geturl()}" 

3222 ) 

3223 case _: 

3224 raise ValueError( 

3225 f"Can not create directory {resp.geturl()}: status {resp.status} {resp.reason}" 

3226 ) 

3227 

3228 @override 

3229 def stat(self, url: str) -> DavFileMetadata: 

3230 # Docstring inherited. 

3231 # XRootD v5.9.1 responds "200 OK" to a HEAD request against an 

3232 # existing file. When the target URL is a directory, it also responds 

3233 # "200 OK". In both cases the response header "Content-Length" 

3234 # is present but has different meaning. If the target URL is a file, 

3235 # the header value is the size in bytes of the file. If the target 

3236 # URL is a directory, the header value is the number of items in 

3237 # the directory. 

3238 # 

3239 # So there is not an easy way to determine if the target URL is a 

3240 # file or a directory from the response to a HEAD request. 

3241 # 

3242 # When the target URL is a directory and we ask for a digest, the 

3243 # server responds "409 Conflict". We use this behavior to 

3244 # discriminate between a file and a directory. 

3245 # 

3246 # Note that XRootD does not include the "Last-Modified" header in the 

3247 # response to a HEAD request so we cannot include the last modified 

3248 # time in the value returned by this method. 

3249 resp = self._head(url, headers={"Want-Digest": "adler32"}) 

3250 match resp.status: 

3251 case HTTPStatus.OK: 

3252 # There is a file at target URL 

3253 if "Content-Length" in resp.headers: 

3254 href = url.replace(self._base_url, "", 1) 

3255 size = int(resp.headers.get("Content-Length")) 

3256 return DavFileMetadata(self._base_url, href=href, exists=True, is_dir=False, size=size) 

3257 else: 

3258 raise ValueError( 

3259 f"""Expecting Content-Length header to be present in """ 

3260 f"""response to HTTP HEAD {resp.geturl()}: status {resp.status} """ 

3261 f"""{resp.reason} [{resp.data.decode()}] but could not find it""" 

3262 ) 

3263 case HTTPStatus.CONFLICT: 

3264 # There is a directory at target URL 

3265 href = url.replace(self._base_url, "", 1) 

3266 return DavFileMetadata(self._base_url, href=href, exists=True, is_dir=True) 

3267 case HTTPStatus.NOT_FOUND: 

3268 # There is neither a file nor a directory at target URL 

3269 return DavFileMetadata(base_url=url, exists=False) 

3270 case _: 

3271 raise unexpected_status_error("HEAD", url, resp) 

3272 

3273 

class DavFileMetadata:
    """Container for attributes of interest of a webDAV file or directory.

    Parameters
    ----------
    base_url : `str`
        Base URL.
    href : `str`, optional
        Path component that can be added to the base URL. A missing leading
        "/" is tolerated: exactly one "/" separates the base URL from the
        path in the resulting URL.
    name : `str`, optional
        Name.
    exists : `bool`, optional
        Whether file or directory exist.
    size : `int`, optional
        Size of file.
    is_dir : `bool`, optional
        Whether the URL points to a directory or file.
    last_modified : `datetime`, optional
        Last modified date.
    checksums : `dict` [ `str`, `str` ] | `None`, optional
        Checksums. The mapping is copied.
    """

    def __init__(
        self,
        base_url: str,
        href: str = "",
        name: str = "",
        exists: bool = False,
        size: int = -1,
        is_dir: bool = False,
        last_modified: datetime = datetime.min,
        checksums: dict[str, str] | None = None,
    ):
        if not href:
            self._url: str = base_url
        else:
            # Join the base URL and the path with exactly one "/", whether or
            # not `href` starts with one. Without this, a relative `href`
            # would be glued to the host (or last path segment) of the base
            # URL.
            separator = "" if href.startswith("/") else "/"
            self._url = base_url.rstrip("/") + separator + href
        self._href: str = href
        self._name: str = name
        self._exists: bool = exists
        self._size: int = size
        self._is_dir: bool = is_dir
        self._last_modified: datetime = last_modified
        self._checksums: dict[str, str] = {} if checksums is None else dict(checksums)

    @staticmethod
    def from_property(base_url: str, property: DavProperty) -> DavFileMetadata:
        """Create an instance from the values in `property`.

        Parameters
        ----------
        base_url : `str`
            Base URL.
        property : `DavProperty`
            Properties to associate with URL.

        Returns
        -------
        metadata : `DavFileMetadata`
            New instance initialized from the attributes of `property`.
        """
        return DavFileMetadata(
            base_url=base_url,
            href=property.href,
            name=property.name,
            exists=property.exists,
            size=property.size,
            is_dir=property.is_dir,
            last_modified=property.last_modified,
            checksums=dict(property.checksums),
        )

    def __str__(self) -> str:
        return (
            f"""{self._url} {self._href} {self._name} {self._exists} {self._size} {self._is_dir} """
            f"""{self._checksums}"""
        )

    @property
    def url(self) -> str:
        return self._url

    @property
    def href(self) -> str:
        return self._href

    @property
    def name(self) -> str:
        return self._name

    @property
    def exists(self) -> bool:
        return self._exists

    @property
    def size(self) -> int:
        # -1 when the resource does not exist, 0 for directories, the
        # recorded size otherwise.
        if not self._exists:
            return -1

        return 0 if self._is_dir else self._size

    @property
    def is_dir(self) -> bool:
        return self._exists and self._is_dir

    @property
    def is_file(self) -> bool:
        return self._exists and not self._is_dir

    @property
    def last_modified(self) -> datetime:
        return self._last_modified

    @property
    def checksums(self) -> dict[str, str]:
        return self._checksums

3383 

3384 

3385class DavProperty: 

3386 """Helper class to encapsulate select live DAV properties of a single 

3387 resource, as retrieved via a PROPFIND request. 

3388 

3389 Parameters 

3390 ---------- 

3391 response : `eTree.Element` or `None` 

3392 The XML response defining the DAV property. 

3393 """ 

3394 

3395 # Regular expression to compare against the 'status' element of a 

3396 # PROPFIND response's 'propstat' element. 

3397 _status_ok_rex = re.compile(r"^HTTP/.* 200 .*$", re.IGNORECASE) 

3398 

3399 def __init__(self, response: eTree.Element | None): 

3400 self._href: str = "" 

3401 self._displayname: str = "" 

3402 self._collection: bool = False 

3403 self._getlastmodified: str = "" 

3404 self._getcontentlength: int = -1 

3405 self._checksums: dict[str, str] = {} 

3406 

3407 if response is not None: 

3408 self._parse(response) 

3409 

3410 def _parse(self, response: eTree.Element) -> None: 

3411 # Extract 'href'. 

3412 if (element := response.find("./{DAV:}href")) is not None: 

3413 # We need to use "str(element.text)"" instead of "element.text" to 

3414 # keep mypy happy. 

3415 self._href = str(element.text).strip() 

3416 else: 

3417 raise ValueError( 

3418 "Property 'href' expected but not found in PROPFIND response: " 

3419 f"{eTree.tostring(response, encoding='unicode')}" 

3420 ) 

3421 

3422 for propstat in response.findall("./{DAV:}propstat"): 

3423 # Only extract properties of interest with status OK. 

3424 status = propstat.find("./{DAV:}status") 

3425 if status is None or not self._status_ok_rex.match(str(status.text)): 

3426 continue 

3427 

3428 for prop in propstat.findall("./{DAV:}prop"): 

3429 # Parse "collection". 

3430 if (element := prop.find("./{DAV:}resourcetype/{DAV:}collection")) is not None: 

3431 self._collection = True 

3432 

3433 # Parse "getlastmodified". 

3434 if (element := prop.find("./{DAV:}getlastmodified")) is not None: 

3435 self._getlastmodified = str(element.text) 

3436 

3437 # Parse "getcontentlength". 

3438 if (element := prop.find("./{DAV:}getcontentlength")) is not None: 

3439 self._getcontentlength = int(str(element.text)) 

3440 

3441 # Parse "displayname". 

3442 if (element := prop.find("./{DAV:}displayname")) is not None: 

3443 self._displayname = str(element.text) 

3444 

3445 # Parse "Checksums" 

3446 if (element := prop.find("./{http://www.dcache.org/2013/webdav}Checksums")) is not None: 

3447 self._checksums = self._parse_checksums(element.text) 

3448 

3449 # Some webDAV servers don't include the 'displayname' property in the 

3450 # response so try to infer it from the value of the 'href' property. 

3451 # Depending on the server the href value may end with '/'. 

3452 if not self._displayname: 

3453 self._displayname = os.path.basename(self._href.rstrip("/")) 

3454 

3455 # Some webDAV servers do not append a "/" to the href of directories. 

3456 # Ensure we include a single final "/" in our response. 

3457 if self._collection: 

3458 self._href = self._href.rstrip("/") + "/" 

3459 

3460 # Force a size of 0 for collections. 

3461 if self._collection: 

3462 self._getcontentlength = 0 

3463 

3464 def _parse_checksums(self, checksums: str | None) -> dict[str, str]: 

3465 # checksums argument is of the form 

3466 # md5=MyS/wljSzI9WYiyrsuyoxw==,adler32=23b104f2 

3467 result: dict[str, str] = {} 

3468 if checksums is not None: 

3469 for checksum in checksums.split(","): 

3470 if (pos := checksum.find("=")) != -1: 

3471 algorithm, value = (checksum[:pos].lower(), checksum[pos + 1 :]) 

3472 if algorithm == "md5": 

3473 # dCache documentation about how it encodes the 

3474 # MD5 checksum: 

3475 # 

3476 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#checksums 

3477 result[algorithm] = bytes.hex(base64.standard_b64decode(value)) 

3478 else: 

3479 result[algorithm] = value 

3480 

3481 return result 

3482 

3483 @property 

3484 def exists(self) -> bool: 

3485 # It is either a directory or a file with length of at least zero 

3486 return self._collection or self._getcontentlength >= 0 

3487 

@property
def is_dir(self) -> bool:
    """Whether this resource is a collection (i.e. a directory)."""
    return self._collection

3491 

@property
def is_file(self) -> bool:
    """Whether this resource is not a collection (i.e. not a directory)."""
    return not self._collection

3495 

3496 @property 

3497 def last_modified(self) -> datetime: 

3498 if not self._getlastmodified: 

3499 return datetime.min 

3500 

3501 # Last modified timestamp is of the form: 

3502 # 'Wed, 12 Mar 2025 10:11:13 GMT' 

3503 return datetime.strptime(self._getlastmodified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=UTC) 

3504 

@property
def size(self) -> int:
    """Content length of this resource, as stored at parse time (forced
    to 0 for collections).
    """
    return self._getcontentlength

3508 

@property
def name(self) -> str:
    """Display name of this resource; when the server response carries no
    'displayname', it is the last component of the href.
    """
    return self._displayname

3512 

@property
def href(self) -> str:
    """The href of this resource; for collections it ends with a single
    final "/".
    """
    return self._href

3516 

@property
def checksums(self) -> dict[str, str]:
    """Checksums of this resource keyed by lower-cased algorithm name, as
    parsed from the dCache 'Checksums' property.
    """
    return self._checksums

3520 

3521 

class DavPropfindParser:
    """Helper class to parse the response body of a PROPFIND request."""

    def __init__(self) -> None:
        """Create a parser; it holds no state."""

    def parse(self, body: bytes) -> list[DavProperty]:
        """Parse the XML-encoded contents of the response body to a webDAV
        PROPFIND request.

        Parameters
        ----------
        body : `bytes`
            XML-encoded response body to a PROPFIND request.

        Returns
        -------
        responses : `list` [ `DavProperty` ]
            Parsed content of the response, one item per 'response'
            element found in `body`.

        Raises
        ------
        ValueError
            Raised if `body` contains no 'response' element.

        Notes
        -----
        It is expected that there is at least one response in `body`,
        otherwise this function raises.
        """
        # A response body to a PROPFIND request is of the form (indented for
        # readability):
        #
        # <?xml version="1.0" encoding="UTF-8"?>
        # <D:multistatus xmlns:D="DAV:">
        #     <D:response>
        #         <D:href>path/to/resource</D:href>
        #         <D:propstat>
        #             <D:prop>
        #                 <D:resourcetype>
        #                     <D:collection xmlns:D="DAV:"/>
        #                 </D:resourcetype>
        #                 <D:getlastmodified>
        #                     Fri, 27 Jan 2023 13:59:01 GMT
        #                 </D:getlastmodified>
        #                 <D:getcontentlength>
        #                     12345
        #                 </D:getcontentlength>
        #             </D:prop>
        #             <D:status>
        #                 HTTP/1.1 200 OK
        #             </D:status>
        #         </D:propstat>
        #     </D:response>
        #     <D:response>
        #         ...
        #     </D:response>
        #     <D:response>
        #         ...
        #     </D:response>
        # </D:multistatus>

        # Build one DavProperty per 'response' element found directly under
        # the root element of the document.
        xml_text: str = body.decode().strip()
        multistatus = eTree.fromstring(xml_text)
        properties = [DavProperty(node) for node in multistatus.findall("./{DAV:}response")]

        if not properties:
            # Could not parse the body
            raise ValueError(f"Unable to parse response for PROPFIND request: {xml_text}")

        return properties

3591 

3592 

3593class Authorizer: 

3594 """Base class for attaching an 'Authorization' header to a HTTP request.""" 

3595 

3596 def set_authorization(self, headers: dict[str, str]) -> None: 

3597 """Add the 'Authorization' header to `headers`. 

3598 

3599 Parameters 

3600 ---------- 

3601 headers : `dict` [ `str`, `str` ] 

3602 Dict to augment with authorization information. 

3603 

3604 Notes 

3605 ----- 

3606 This method must be implemented by concrete subclasses. 

3607 """ 

3608 raise NotImplementedError 

3609 

3610 def _is_file_protected(self, filepath: str) -> bool: 

3611 """Return true if the permissions of file at `filepath` only allow for 

3612 access by its owner. 

3613 

3614 Parameters 

3615 ---------- 

3616 filepath : `str` 

3617 Path of a local file. 

3618 """ 

3619 if not os.path.isfile(filepath): 

3620 return False 

3621 

3622 mode = stat.S_IMODE(os.stat(filepath).st_mode) 

3623 owner_accessible = bool(mode & stat.S_IRWXU) 

3624 group_accessible = bool(mode & stat.S_IRWXG) 

3625 other_accessible = bool(mode & stat.S_IRWXO) 

3626 return owner_accessible and not group_accessible and not other_accessible 

3627 

3628 def _read_if_modified_since( 

3629 self, filename: str | None, timestamp: float 

3630 ) -> tuple[str, float] | tuple[None, None]: 

3631 """Read local file `filename` if its modification time is more 

3632 recent than `timestamp`. 

3633 

3634 Parameters 

3635 ---------- 

3636 filename : `str`, optional 

3637 Path of a local file. 

3638 

3639 timestamp: `float`, optional 

3640 Timestamp to compare against the last modification time of 

3641 `filename`. The contents of file at `filename` is only read if its 

3642 modification time is more recent than `timestamp`. 

3643 

3644 Returns 

3645 ------- 

3646 result: `tuple[str, float]` 

3647 tuple of (contents of file `filename`, timestamp of the read 

3648 operation). 

3649 

3650 If `filename` is `None`, the returned value is `tuple[None, None]`. 

3651 """ 

3652 if filename is None: 

3653 return (None, None) 

3654 

3655 if os.stat(filename).st_mtime < timestamp: 

3656 return (None, None) 

3657 

3658 with open(filename) as file: 

3659 time_of_last_read = time.time() 

3660 return (file.read().rstrip("\n"), time_of_last_read) 

3661 

3662 

class TokenAuthorizer(Authorizer):
    """Attach a bearer token 'Authorization' header to each request.

    Parameters
    ----------
    token : `str`, optional
        Can be either the path to a local file which contains the
        value of the token or the token itself. If `token` is a file
        it must be protected so that only its owner can access it.
        If `None`, no header is added to requests.

    Raises
    ------
    PermissionError
        Raised if `token` is the path of an existing file which is
        accessible by users other than its owner.
    """

    def __init__(self, token: str | None = None) -> None:
        self._token: str | None = None
        self._token_path: str | None = None
        self._time_of_last_read: float = -1.0
        if token is None:
            return

        self._token = token
        if not os.path.isfile(token):
            # `token` is the value of the token itself.
            return

        # `token` is the path of a file: check it is protected and load
        # the token it contains.
        self._token_path = os.path.abspath(token)
        if not self._is_file_protected(self._token_path):
            raise PermissionError(
                f"Authorization token file at {self._token_path} must be protected for access only "
                "by its owner"
            )
        self._update_token()

    def _update_token(self) -> None:
        """Reload the token from its file (if any) unless the file is older
        than the last time we read it.
        """
        if self._token_path is None:
            return None

        fresh_token, read_time = self._read_if_modified_since(self._token_path, self._time_of_last_read)
        if fresh_token is not None and read_time is not None:
            # Remember both the new token value and when we read it.
            self._token = fresh_token
            self._time_of_last_read = read_time

    @override
    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information. It is left
            untouched if this authorizer has no token.
        """
        if self._token is None:
            return

        # Pick up any change in the token file before using the token.
        self._update_token()
        headers["Authorization"] = f"Bearer {self._token}"

3719 

3720 

class BasicAuthorizer(Authorizer):
    """Attach a 'Authorization' header to each request using Basic
    authentication.

    Parameters
    ----------
    user_name : `str`, optional
        The user name. It is used verbatim to build the header.
    user_password : `str`, optional
        Can be either the path to a local file which contains the
        value of the password or the password itself. If `user_password` is a
        file it must be protected so that only its owner can access it.

    Raises
    ------
    PermissionError
        Raised if `user_password` is the path of an existing file which is
        accessible by users other than its owner.

    Notes
    -----
    If either `user_name` or `user_password` is `None`, the authorizer is
    inert: `set_authorization` leaves the headers untouched.
    """

    def __init__(self, user_name: str | None = None, user_password: str | None = None) -> None:
        # Initialize every attribute before any early return so that
        # `set_authorization` can always be called safely, even when no
        # credentials were provided.
        self._user_name: str | None = None
        self._user_password: str | None = None
        self._user_password_path: str | None = None
        self._time_of_last_read: float = -1.0
        self._header_value: str = ""

        if user_name is None or user_password is None:
            return

        self._user_name = user_name
        self._user_password = user_password

        if os.path.isfile(user_password):
            # The value in `user_password` is the path to a file. Check
            # the file is protected and read its contents.
            self._user_password_path = os.path.abspath(user_password)
            if not self._is_file_protected(self._user_password_path):
                raise PermissionError(
                    f"""Password file at {self._user_password_path} must be protected for access only """
                    """by its owner"""
                )
            self._update_password()
        else:
            self._update_header_value()

    def _update_header_value(self) -> None:
        """Compute the value of the 'Authorization' header using HTTP basic
        authorization.
        """
        basic_auth_header = make_headers(basic_auth=f"{self._user_name}:{self._user_password}")
        self._header_value = basic_auth_header["authorization"]

    def _update_password(self) -> None:
        """Update the password of this authorizer unless the file it is
        stored in is older than the last time we read it.
        """
        if self._user_password_path is None:
            return None

        password, time_of_last_read = self._read_if_modified_since(
            self._user_password_path, self._time_of_last_read
        )
        if password is None or time_of_last_read is None:
            return

        # Update the password, the last time we read it and re-compute the
        # value of the "Authorization" header.
        self._user_password = password
        self._time_of_last_read = time_of_last_read
        self._update_header_value()

    @override
    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information. It is left
            untouched if this authorizer has no user name or no password.
        """
        if self._user_name is None or self._user_password is None:
            return

        # Pick up any change in the password file before using the header.
        self._update_password()
        headers["Authorization"] = self._header_value

3800 

3801 

3802def expand_vars(path: str | None) -> str | None: 

3803 """Expand the environment variables in `path` and return the path with 

3804 the value of the variable expanded. 

3805 

3806 Parameters 

3807 ---------- 

3808 path : `str` or `None` 

3809 Abolute or relative path which may include an environment variable 

3810 (e.g. '$HOME/path/to/my/file'). 

3811 

3812 Returns 

3813 ------- 

3814 path: `str` 

3815 The path with the values of the environment variables expanded. 

3816 """ 

3817 return None if path is None else os.path.expandvars(path) 

3818 

3819 

3820def dump_response(method: str, resp: HTTPResponse, dump_body: bool = False) -> None: 

3821 """Dump response for debugging purposes. 

3822 

3823 Parameters 

3824 ---------- 

3825 method : `str` 

3826 Method name to include in log output. 

3827 resp : `HTTPResponse` 

3828 Response to dump. 

3829 dump_body : `bool`, optional 

3830 Whether or not to issue a debug log message. 

3831 """ 

3832 log.debug("%s %s", method, resp.geturl()) 

3833 log.debug(" %s %s", resp.status, resp.reason) 

3834 

3835 for header, value in resp.headers.items(): 

3836 log.debug(" %s: %s", header, value) 

3837 

3838 if dump_body: 

3839 log.debug(" response body length: %d", len(resp.data.decode()))