Coverage for python/lsst/scarlet/lite/operators.py: 13%

162 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-30 08:24 +0000

1from __future__ import annotations 

2 

3from typing import Any, Callable, Sequence, cast 

4 

5import numpy as np 

6import numpy.typing as npt 

7from lsst.scarlet.lite.detect_pybind11 import get_connected_pixels # type: ignore 

8from lsst.scarlet.lite.operators_pybind11 import new_monotonicity # type: ignore 

9 

10from .bbox import Box 

11 

12 

13def prox_connected(morph: np.ndarray, centers: Sequence[Sequence[int]]) -> np.ndarray: 

14 """Remove all pixels not connected to the center of a source. 

15 

16 Parameters 

17 ---------- 

18 morph: 

19 The morphology that is being constrained. 

20 centers: 

21 The `(cy, cx)` center of any sources that all pixels must be 

22 connected to. 

23 

24 Returns 

25 ------- 

26 result: 

27 The morphology with all pixels that are not connected to a center 

28 postion set to zero. 

29 """ 

30 result = np.zeros(morph.shape, dtype=bool) 

31 

32 for center in centers: 

33 unchecked = np.ones(morph.shape, dtype=bool) 

34 cy, cx = center 

35 cy = int(cy) 

36 cx = int(cx) 

37 bounds = np.array([cy, cy, cx, cx]).astype(np.int32) 

38 # Update the result in place with the pixels connected to this center 

39 get_connected_pixels(cy, cx, morph, unchecked, result, bounds, 0) 

40 

41 return result * morph 

42 

43 

44class Monotonicity: 

45 """Class to implement Monotonicity 

46 

47 Callable class that applies monotonicity as a pseudo proximal 

48 operator (actually a projection operator) to *a* radially 

49 monotonic solution. 

50 

51 Notes 

52 ----- 

53 This differs from monotonicity in the main scarlet branch because 

54 this stores a single monotonicity operator to set the weights for all 

55 of the pixels up to the size of the largest shape expected, 

56 and only needs to be created once _per blend_, as opposed to 

57 once _per source_.. 

58 This class is then called with the source morphology 

59 to make monotonic and the location of the "center" of the image, 

60 and the full weight matrix is sliced accordingly. 

61 

62 Parameters 

63 ---------- 

64 shape: 

65 The shape of the full operator. 

66 This must be larger than the largest possible object size 

67 in the blend. 

68 dtype: 

69 The numpy ``dtype`` of the output image. 

70 auto_update: 

71 If ``True`` the operator will update its shape if a image is 

72 too big to fit in the current operator. 

73 fit_radius: 

74 Pixels within `fit_radius` of the center of the array to make 

75 monotonic are checked to see if they have more flux than the center 

76 pixel. If they do, the pixel with larger flux is used as the center. 

77 """ 

78 

79 def __init__( 

80 self, 

81 shape: tuple[int, int], 

82 dtype: npt.DTypeLike = float, 

83 auto_update: bool = True, 

84 fit_radius: int = 1, 

85 ): 

86 # Initialize defined variables 

87 self.weights: np.ndarray | None = None 

88 self.distance: np.ndarray | None = None 

89 self.sizes: tuple[int, int, int, int] | None = None 

90 self.dtype = dtype 

91 self.auto_update = auto_update 

92 self.fit_radius = fit_radius 

93 self.update(shape) 

94 

95 @property 

96 def shape(self) -> tuple[int, int]: 

97 """The 2D shape of the largest component that can be made monotonic 

98 

99 Returns 

100 ------- 

101 result: 

102 The shape of the oeprator. 

103 """ 

104 return cast(tuple[int, int], cast(np.ndarray, self.weights).shape[1:]) 

105 

106 @property 

107 def center(self) -> tuple[int, int]: 

108 """The center of the full operator 

109 

110 Returns 

111 ------- 

112 result: 

113 The center of the full operator. 

114 """ 

115 shape = self.shape 

116 cx = (shape[1] - 1) // 2 

117 cy = (shape[0] - 1) // 2 

118 return cy, cx 

119 

120 def update(self, shape: tuple[int, int]): 

121 """Update the operator with a new shape 

122 

123 Parameters 

124 ---------- 

125 shape: 

126 The new shape 

127 """ 

128 if len(shape) != 2: 

129 msg = f"Monotonicity is a 2D operator but received shape with {len(shape)} dimensions" 

130 raise ValueError(msg) 

131 if shape[0] % 2 == 0 or shape[1] % 2 == 0: 

132 raise ValueError(f"The shape must be odd, got {shape}") 

133 # Use the center of the operator as the center 

134 # and calculate the distance to each pixel from the center 

135 cx = (shape[1] - 1) // 2 

136 cy = (shape[0] - 1) // 2 

137 _x = np.arange(shape[1], dtype=self.dtype) - cx 

138 _y = np.arange(shape[0], dtype=self.dtype) - cy 

139 x, y = np.meshgrid(_x, _y) 

140 distance = np.sqrt(x**2 + y**2) 

141 

142 # Calculate the distance from each pixel to its 8 nearest neighbors 

143 neighbor_dist = np.zeros((9,) + distance.shape, dtype=self.dtype) 

144 neighbor_dist[0, 1:, 1:] = distance[1:, 1:] - distance[:-1, :-1] 

145 neighbor_dist[1, 1:, :] = distance[1:, :] - distance[:-1, :] 

146 neighbor_dist[2, 1:, :-1] = distance[1:, :-1] - distance[:-1, 1:] 

147 neighbor_dist[3, :, 1:] = distance[:, 1:] - distance[:, :-1] 

148 

149 # For the center pixel, set the distance to 1 just so that it is 

150 # non-zero 

151 neighbor_dist[4, cy, cx] = 1 

152 neighbor_dist[5, :, :-1] = distance[:, :-1] - distance[:, 1:] 

153 neighbor_dist[6, :-1, 1:] = distance[:-1, 1:] - distance[1:, :-1] 

154 neighbor_dist[7, :-1, :] = distance[:-1, :] - distance[1:, :] 

155 neighbor_dist[8, :-1, :-1] = distance[:-1, :-1] - distance[1:, 1:] 

156 

157 # Calculate the difference in angle to the center 

158 # from each pixel to its 8 nearest neighbors 

159 angles = np.arctan2(y, x) 

160 angle_diff = np.zeros((9,) + angles.shape, dtype=self.dtype) 

161 angle_diff[0, 1:, 1:] = angles[1:, 1:] - angles[:-1, :-1] 

162 angle_diff[1, 1:, :] = angles[1:, :] - angles[:-1, :] 

163 angle_diff[2, 1:, :-1] = angles[1:, :-1] - angles[:-1, 1:] 

164 angle_diff[3, :, 1:] = angles[:, 1:] - angles[:, :-1] 

165 # For the center pixel, on the center will have a non-zero cosine, 

166 # which is used as the weight. 

167 angle_diff[4] = 1 

168 angle_diff[4, cy, cx] = 0 

169 angle_diff[5, :, :-1] = angles[:, :-1] - angles[:, 1:] 

170 angle_diff[6, :-1, 1:] = angles[:-1, 1:] - angles[1:, :-1] 

171 angle_diff[7, :-1, :] = angles[:-1, :] - angles[1:, :] 

172 angle_diff[8, :-1, :-1] = angles[:-1, :-1] - angles[1:, 1:] 

173 

174 # Use cos(theta) to set the weights, then normalize 

175 # This gives more weight to neighboring pixels that are more closely 

176 # aligned with the vector pointing toward the center. 

177 weights = np.cos(angle_diff) 

178 weights[neighbor_dist <= 0] = 0 

179 # Adjust for the discontinuity at theta = 2pi 

180 weights[weights < 0] = -weights[weights < 0] 

181 weights = weights / np.sum(weights, axis=0)[None, :, :] 

182 

183 # Store the parameters needed later 

184 self.weights = weights 

185 self.distance = distance 

186 self.sizes = (cy, cx, shape[0] - cy, shape[1] - cx) 

187 

188 def check_size(self, shape: tuple[int, int], center: tuple[int, int], update: bool = True): 

189 """Check to see if the operator can be applied 

190 

191 Parameters 

192 ---------- 

193 shape: 

194 The shape of the image to apply monotonicity. 

195 center: 

196 The location (in `shape`) of the point where the monotonicity will 

197 be taken from. 

198 update: 

199 When ``True`` the operator will update itself so that an image 

200 with shape `shape` can be made monotonic about the `center`. 

201 

202 Raises 

203 ------ 

204 ValueError: 

205 Raised when an array with shape `shape` does not fit in the 

206 current operator and `update` is `False`. 

207 """ 

208 sizes = np.array(tuple(center) + (shape[0] - center[0], shape[1] - center[1])) 

209 if np.any(sizes > self.sizes): 

210 if update: 

211 size = 2 * np.max(sizes) + 1 

212 self.update((size, size)) 

213 else: 

214 raise ValueError(f"Cannot apply monotonicity to image with shape {shape} at {center}") 

215 

216 def __call__(self, image: np.ndarray, center: tuple[int, int]) -> np.ndarray: 

217 """Make an input image monotonic about a center pixel 

218 

219 Parameters 

220 ---------- 

221 image: 

222 The image to make monotonic. 

223 center: 

224 The ``(y, x)`` location _in image coordinates_ to make the 

225 center of the monotonic region. 

226 

227 Returns 

228 ------- 

229 result: 

230 The input image is updated in place, but also returned from this 

231 method. 

232 """ 

233 # Check for a better center 

234 center = get_peak(image, center, self.fit_radius) 

235 

236 # Check that the operator can fit the image 

237 self.check_size(cast(tuple[int, int], image.shape), center, self.auto_update) 

238 

239 # Create the bounding box to slice the weights and distance as needed 

240 cy, cx = self.center 

241 py, px = center 

242 bbox = Box((9,) + image.shape, origin=(0, cy - py, cx - px)) 

243 weights = cast(np.ndarray, self.weights)[bbox.slices] 

244 indices = np.argsort(cast(np.ndarray, self.distance)[bbox.slices[1:]].flatten()) 

245 coords = np.unravel_index(indices, image.shape) 

246 

247 # Pad the image by 1 so that we don't have to worry about 

248 # weights on the edges. 

249 result_shape = (image.shape[0] + 2, image.shape[1] + 2) 

250 result = np.zeros(result_shape, dtype=image.dtype) 

251 result[1:-1, 1:-1] = image 

252 new_monotonicity(coords[0], coords[1], weights, result) 

253 image[:] = result[1:-1, 1:-1] 

254 return image 

255 

256 def __copy__(self) -> Monotonicity: 

257 """Create a shallow copy of the operator 

258 

259 Returns 

260 ------- 

261 result: 

262 A copy of the operator. 

263 """ 

264 new = Monotonicity(self.shape, self.dtype, self.auto_update, self.fit_radius) 

265 return new 

266 

267 def __deepcopy__(self, memo: dict[int, Any] | None = None) -> Monotonicity: 

268 """Create a deep copy of the operator 

269 

270 Parameters 

271 ---------- 

272 memo: 

273 The memoization dictionary for deep copies. 

274 

275 Returns 

276 ------- 

277 result: 

278 A copy of the operator. 

279 """ 

280 return self.__copy__() 

281 

282 

283def get_peak(image: np.ndarray, center: tuple[int, int], radius: int = 1) -> tuple[int, int]: 

284 """Search around a location for the maximum flux 

285 

286 For monotonicity it is important to start at the brightest pixel 

287 in the center of the source. This may be off by a pixel or two, 

288 so we search for the correct center before applying 

289 monotonic_tree. 

290 

291 Parameters 

292 ---------- 

293 image: 

294 The image of the source. 

295 center: 

296 The suggested center of the source. 

297 radius: 

298 The number of pixels around the `center` to search 

299 for a higher flux value. 

300 

301 Returns 

302 ------- 

303 new_center: 

304 The true center of the source. 

305 """ 

306 cy, cx = int(round(center[0])), int(round(center[1])) 

307 y0 = np.max([cy - radius, 0]) 

308 x0 = np.max([cx - radius, 0]) 

309 y_slice = slice(y0, cy + radius + 1) 

310 x_slice = slice(x0, cx + radius + 1) 

311 subset = image[y_slice, x_slice] 

312 center = cast(tuple[int, int], np.unravel_index(np.argmax(subset), subset.shape)) 

313 return center[0] + y0, center[1] + x0 

314 

315 

316def prox_monotonic_mask( 

317 x: np.ndarray, 

318 center: tuple[int, int], 

319 center_radius: int = 1, 

320 variance: float = 0.0, 

321 max_iter: int = 3, 

322) -> tuple[np.ndarray, np.ndarray, tuple[int, int, int, int]]: 

323 """Apply monotonicity from any path from the center 

324 

325 Parameters 

326 ---------- 

327 x: 

328 The input image that the mask is created for. 

329 center: 

330 The location of the center of the mask. 

331 center_radius: 

332 Radius from the center pixel to search for a better center 

333 (ie. a pixel in `X` with higher flux than the pixel given by 

334 `center`). 

335 If `center_radius == 0` then the `center` pixel is assumed 

336 to be correct. 

337 variance: 

338 The average variance in the image. 

339 This is used to allow pixels to be non-monotonic up to `variance`, 

340 so setting `variance=0` will force strict monotonicity in the mask. 

341 max_iter: 

342 Maximum number of iterations to interpolate non-monotonic pixels. 

343 

344 Returns 

345 ------- 

346 valid: 

347 Boolean array of pixels that are monotonic. 

348 model: 

349 The model with invalid pixels masked out. 

350 bounds: 

351 The bounds of the valid monotonic pixels. 

352 """ 

353 from lsst.scarlet.lite.operators_pybind11 import ( 

354 get_valid_monotonic_pixels, 

355 linear_interpolate_invalid_pixels, 

356 ) 

357 

358 if center_radius > 0: 

359 i, j = get_peak(x, center, center_radius) 

360 else: 

361 i, j = int(np.round(center[0])), int(np.round(center[1])) 

362 unchecked = np.ones(x.shape, dtype=bool) 

363 unchecked[i, j] = False 

364 orphans = np.zeros(x.shape, dtype=bool) 

365 # This is the bounding box of the result 

366 bounds = np.array([i, i, j, j], dtype=np.int32) 

367 # Get all of the monotonic pixels 

368 get_valid_monotonic_pixels(i, j, x, unchecked, orphans, variance, bounds, 0) 

369 # Set the initial model to the exact input in the valid pixels 

370 model = x.copy() 

371 

372 it = 0 

373 

374 while np.sum(orphans & unchecked) > 0 and it < max_iter: 

375 it += 1 

376 all_i, all_j = np.where(orphans) 

377 linear_interpolate_invalid_pixels(all_i, all_j, unchecked, model, orphans, variance, True, bounds) 

378 valid = ~unchecked & ~orphans 

379 # Clear all of the invalid pixels from the input image 

380 model = model * valid 

381 return valid, model, tuple(bounds) # type: ignore 

382 

383 

384def uncentered_operator( 

385 x: np.ndarray, 

386 func: Callable, 

387 center: tuple[int, int] | None = None, 

388 fill: float | None = None, 

389 **kwargs, 

390) -> np.ndarray: 

391 """Only apply the operator on a centered patch 

392 

393 In some cases, for example symmetry, an operator might not make 

394 sense outside of a centered box. This operator only updates 

395 the portion of `X` inside the centered region. 

396 

397 Parameters 

398 ---------- 

399 x: 

400 The parameter to update. 

401 func: 

402 The function (or operator) to apply to `x`. 

403 center: 

404 The location of the center of the sub-region to 

405 apply `func` to `x`. 

406 fill: 

407 The value to fill the region outside of centered 

408 `sub-region`, for example `0`. If `fill` is `None` 

409 then only the subregion is updated and the rest of 

410 `x` remains unchanged. 

411 

412 Returns 

413 ------- 

414 result: 

415 `x`, with an operator applied based on the shifted center. 

416 """ 

417 if center is None: 

418 py, px = cast(tuple[int, int], np.unravel_index(np.argmax(x), x.shape)) 

419 else: 

420 py, px = center 

421 cy, cx = np.array(x.shape) // 2 

422 

423 # Fast path: skip the slicing only when the full array is 

424 # already a valid input for ``func`` (odd-odd shape with the 

425 # peak at the geometric center). Even-shaped arrays must take 

426 # the slicing path below so the +1 correction yields an 

427 # odd-shaped subarray centered on the peak. 

428 if py == cy and px == cx and x.shape[0] % 2 == 1 and x.shape[1] % 2 == 1: 

429 return func(x, **kwargs) 

430 

431 dy = int(round(2 * (py - cy))) 

432 dx = int(round(2 * (px - cx))) 

433 if not x.shape[0] % 2: 

434 dy += 1 

435 if not x.shape[1] % 2: 

436 dx += 1 

437 if dx < 0: 

438 xslice = slice(None, dx) 

439 else: 

440 xslice = slice(dx, None) 

441 if dy < 0: 

442 yslice = slice(None, dy) 

443 else: 

444 yslice = slice(dy, None) 

445 

446 if fill is not None: 

447 _x = np.ones(x.shape, x.dtype) * fill 

448 _x[yslice, xslice] = func(x[yslice, xslice], **kwargs) 

449 x[:] = _x 

450 else: 

451 x[yslice, xslice] = func(x[yslice, xslice], **kwargs) 

452 

453 return x 

454 

455 

456def prox_sdss_symmetry(x: np.ndarray): 

457 """SDSS/HSC symmetry operator 

458 

459 This function uses the *minimum* of the two 

460 symmetric pixels in the update. Symmetry is enforced about the 

461 geometric center of ``x``: for odd-shaped axes that is the 

462 center pixel, for even-shaped axes it is the half-pixel offset 

463 between the two central pixels. Callers that need integer-pixel 

464 symmetry on an even-shaped array should go through 

465 ``prox_uncentered_symmetry``, which slices to an odd-shaped 

466 subregion before calling this helper. 

467 

468 Parameters 

469 ---------- 

470 x: 

471 The array to make symmetric. 

472 

473 Returns 

474 ------- 

475 result: 

476 The updated `x`. 

477 """ 

478 symmetric = np.fliplr(np.flipud(x)) 

479 x[:] = np.min([x, symmetric], axis=0) 

480 return x 

481 

482 

483def prox_uncentered_symmetry( 

484 x: np.ndarray, 

485 center: tuple[int, int] | None = None, 

486 fill: float | None = None, 

487) -> np.ndarray: 

488 """Symmetry with off-center peak 

489 

490 Symmetrize X for all pixels with a symmetric partner. 

491 

492 Parameters 

493 ---------- 

494 x: 

495 The parameter to update. 

496 center: 

497 The center pixel coordinates to apply the symmetry operator. 

498 fill: 

499 The value to fill the region that cannot be made symmetric. 

500 When `fill` is `None` then the region of `X` that is not symmetric 

501 is not constrained. 

502 

503 Returns 

504 ------- 

505 result: 

506 The update function based on the specified parameters. 

507 """ 

508 return uncentered_operator(x, prox_sdss_symmetry, center, fill=fill)