Coverage for yasmon/callbacks.py: 99%

266 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-28 10:57 +0000

1from loguru import logger 

2from abc import ABC, abstractmethod 

3from typing import Self, TYPE_CHECKING 

4import asyncio 

5import yaml 

6import re 

7import smtplib 

8import email 

9import ssl 

10import mimetypes 

11import pathlib 

12 

13if TYPE_CHECKING: 

14 from .tasks import AbstractTask 

15 

16 

17def process_attributes(expr: str, attrs: dict[str, str]) -> str: 

18 """ 

19 Performs attribute substitutions in `expr`. 

20 

21 This is also checking for circular dependencies (`max_depth = 42`). 

22 

23 :param expr: expression to process 

24 :param attrs: attributes to substitute 

25 

26 :raises CallbackCircularAttributeError: see documentation 

27 :raises CallbackAttributeError: see documentation 

28 

29 :return: processed expression 

30 :rtype: str 

31 """ 

32 max_depth = 42 

33 regex = re.compile(r"\{[^{}]*\}") # match attributes 

34 search = regex.search(expr) 

35 

36 depth = 0 

37 while search: 

38 try: 

39 expr = expr.format(**attrs) 

40 except KeyError as e: 

41 raise CallbackAttributeError(e) 

42 

43 if depth < max_depth: 

44 depth += 1 

45 else: 

46 raise CallbackCircularAttributeError(expr) 

47 

48 search = regex.search(expr) 

49 

50 return expr 

51 

52 

53class CallbackNotImplementedError(Exception): 

54 """ 

55 Raised if a requested callback type is not implemented. 

56 """ 

57 

58 def __init__(self, type: str, message="callback {type} not implemented"): 

59 self.message = message.format(type=type) 

60 super().__init__(self.message) 

61 

62 

63class CallbackAttributeError(Exception): 

64 """ 

65 Raised when an undefined task attribute is used by a callback. 

66 """ 

67 

68 def __init__(self, attr: str, message="undefined attribute {attr}"): 

69 self.message = message.format(attr=attr) 

70 super().__init__(self.message) 

71 

72 

73class CallbackCircularAttributeError(Exception): 

74 """ 

75 Raised when task attributes have circular dependencies. 

76 """ 

77 

78 def __init__(self, expr: str, 

79 message="{expr}\ndetected circular attributes"): 

80 self.message = message.format(expr=expr) 

81 super().__init__(self.message) 

82 

83 

84class CallbackSyntaxError(Exception): 

85 """ 

86 Raised on callback syntax error. 

87 """ 

88 

89 def __init__(self, message="callback syntax error"): 

90 self.message = message 

91 super().__init__(self.message) 

92 

93 

94class CallbackError(Exception): 

95 """ 

96 Raised on callback execution error. 

97 """ 

98 

99 def __init__(self, message="callback error"): 

100 self.message = message 

101 super().__init__(self.message) 

102 

103 

104class CallbackDict(dict): 

105 """ 

106 A dedicated `dictionary` for callbacks. 

107 """ 

108 

109 def __init__(self, mapping=(), **kwargs): 

110 super().__init__(mapping, **kwargs) 

111 

112 

113class AbstractCallback(ABC): 

114 """ 

115 Abstract class from which all callback classes are derived. 

116 

117 Derived callbacks are functors and can be used as coroutines for any 

118 of :class:`yasmon.tasks.AbstractTasks`. 

119 

120 The preferred way to instatiate a callback is from class 

121 method :func:`~from_yaml`. 

122 """ 

123 

124 @abstractmethod 

125 def __init__(self): 

126 if not self.name: 

127 self.name = "Generic Callback" 

128 logger.info(f'{self.name} ({self.__class__}) initialized') 

129 

130 @abstractmethod 

131 async def __call__(self, task: 'AbstractTask', attrs: dict[str, str]): 

132 """ 

133 Coroutine called by :class:`TaskRunner`. 

134 

135 :param task: task calling the callback 

136 :raises CallbackError: see documentation 

137 :raises CallbackAttributeError: see documentation 

138 :raises CallbackCircularAttributeError: see documentation 

139 """ 

140 logger.info(f'{self.name} ({self.__class__}) called by ' 

141 f'{task.name} ({task.__class__})') 

142 

143 @classmethod 

144 @abstractmethod 

145 def from_yaml(cls, name: str, data: str): 

146 """ 

147 A class method for constructing a callback from a YAML document. 

148 

149 :param name: unique identifier 

150 :param data: yaml data 

151 

152 :return: new instance 

153 """ 

154 logger.debug(f'{name} defined form yaml \n{data}') 

155 

156 

157class ShellCallback(AbstractCallback): 

158 """ 

159 Callback implementing shell command execution. 

160 """ 

161 

162 def __init__(self, name: str, cmd: str) -> None: 

163 """ 

164 :param name: unique identifier 

165 :param cmd: command to be executed 

166 """ 

167 self.name = name 

168 self.cmd = cmd 

169 super().__init__() 

170 

171 async def __call__(self, task: 'AbstractTask', attrs: dict[str, str]): 

172 await super().__call__(task, attrs) 

173 

174 try: 

175 cmd = process_attributes(self.cmd, attrs) 

176 except CallbackAttributeError: 

177 raise 

178 except CallbackCircularAttributeError: 

179 raise 

180 

181 proc = await asyncio.create_subprocess_shell( 

182 cmd, 

183 stdin=asyncio.subprocess.PIPE, 

184 stdout=asyncio.subprocess.PIPE, 

185 stderr=asyncio.subprocess.PIPE) 

186 

187 stdout, stderr = await proc.communicate() 

188 out = stdout.decode() 

189 err = stderr.decode() 

190 

191 if out: 

192 logger.info(f'callback {self.name} stdout:\n {out}') 

193 

194 if err: 

195 logger.error(f'callback {self.name} stderr:\n {err}') 

196 

197 return stdout, stderr 

198 

199 @classmethod 

200 def from_yaml(cls, name: str, data: str) -> Self: 

201 """ 

202 :class:`ShellCallback` can be also constructed from a YAML snippet. 

203 

204 .. code:: yaml 

205 

206 command: ls -lah /path/to/some/dir/ 

207 

208 :param name: unique identifier 

209 :param data: YAML snippet 

210 

211 :return: new instance 

212 :rtype: ShellCallback 

213 """ 

214 super().from_yaml(name, data) 

215 try: 

216 parsed = yaml.safe_load(data) 

217 except yaml.YAMLError as err: 

218 raise CallbackSyntaxError(err) 

219 

220 if 'command' not in parsed: 

221 raise CallbackSyntaxError(f"""\ 

222 in callback {name} missig command 

223 """) 

224 

225 if not isinstance(parsed['command'], str): 

226 raise CallbackSyntaxError(f"""\ 

227 in callback {name} not a string 

228 """) 

229 

230 cmd = parsed["command"] 

231 return cls(name, cmd) 

232 

233 

234class LoggerCallback(AbstractCallback): 

235 """ 

236 Callback implementing logger calls 

237 """ 

238 

239 def __init__(self, name: str, level: str, message: str) -> None: 

240 """ 

241 :param name: unique identifier 

242 :param level: logging level 

243 :param message: message to pass to logger 

244 """ 

245 self.name = name 

246 self.level = level 

247 self.message = message 

248 super().__init__() 

249 

250 async def __call__(self, task: 'AbstractTask', 

251 attrs: dict[str, str]) -> None: 

252 await super().__call__(task, attrs) 

253 

254 try: 

255 message = process_attributes(self.message, attrs) 

256 except CallbackAttributeError: 

257 raise 

258 except CallbackCircularAttributeError: 

259 raise 

260 

261 method = getattr(logger, self.level) 

262 method(message) 

263 

264 @classmethod 

265 def from_yaml(cls, name: str, data: str) -> Self: 

266 """ 

267 :class:`LoggerCallback` can be also constructed from a YAML snippet. 

268 

269 .. code:: yaml 

270 

271 level: [error | info | debug | ... (see Loguru docs)] 

272 message: message 

273 

274 :param name: unique identifier 

275 :param data: YAML snippet 

276 

277 :return: new instance 

278 :rtype: LoggerCallback 

279 """ 

280 super().from_yaml(name, data) 

281 try: 

282 parsed = yaml.safe_load(data) 

283 except yaml.YAMLError as err: 

284 raise CallbackSyntaxError(err) 

285 

286 imp_levels = [ 

287 'trace', 

288 'debug', 

289 'info', 

290 'success', 

291 'warning', 

292 'error', 

293 'critical' 

294 ] 

295 

296 if 'level' not in parsed: 

297 raise CallbackSyntaxError(f"""\ 

298 in callback {name} missig logger level 

299 """) 

300 

301 level = parsed["level"] 

302 if level not in imp_levels: 

303 raise CallbackSyntaxError(f"""\ 

304 in callback {name} invalid logger level {level} 

305 """) 

306 

307 if 'message' not in parsed: 

308 raise CallbackSyntaxError(f"""\ 

309 in callback {name} missig message 

310 """) 

311 

312 if not isinstance(parsed['message'], str): 

313 raise CallbackSyntaxError(f"""\ 

314 in callback {name} message not a string 

315 """) 

316 

317 message = parsed["message"] 

318 return cls(name, level, message) 

319 

320 

321class MailCallback(AbstractCallback): 

322 """ 

323 Callback implementing sending simple notification mails with 

324 attachments. 

325 """ 

326 

327 def __init__(self, name: str, host: str, port: int, login: str, 

328 password: str, security: str, toaddr: str, subject: str, 

329 fromaddr: str, message: str, attach: list[str], 

330 delay: int) -> None: 

331 """ 

332 :param name: unique callback identifier 

333 :param host: smtp host 

334 :param port: smtp port 

335 :param login: login name 

336 :param password: password 

337 :param security: ``starttls`` or ``ssl`` 

338 :param toaddr: to address 

339 :param subject: mail subject 

340 :param fromaddr: from address 

341 :param message: message 

342 """ 

343 self.name = name 

344 self.host = host 

345 self.port = port 

346 self.login = login 

347 self.password = password 

348 self.security = security 

349 self.toaddr = toaddr 

350 self.subject = subject 

351 self.fromaddr = fromaddr 

352 self.message = message 

353 self.attach = attach 

354 self.delay = delay 

355 super().__init__() 

356 

357 def send_message_starttls(self, message: email.message.EmailMessage): 

358 """ 

359 Send message using STARTTLS. 

360 """ 

361 

362 context = ssl.create_default_context() 

363 server = smtplib.SMTP(self.host, self.port) 

364 try: 

365 server.starttls(context=context) 

366 server.login(self.login, self.password) 

367 server.send_message(message) 

368 except Exception: 

369 raise 

370 finally: 

371 server.quit() 

372 

373 def send_message_ssl(self, message: email.message.EmailMessage): 

374 """ 

375 Send message using SSL. 

376 """ 

377 

378 try: 

379 server = smtplib.SMTP_SSL(self.host, self.port) 

380 server.login(self.login, self.password) 

381 server.send_message(message) 

382 except Exception: 

383 raise 

384 finally: 

385 server.quit() 

386 

387 def process_attachments(self, message: email.message.EmailMessage, 

388 attrs: dict[str, str]) -> None: 

389 """ 

390 Process list of attachments from ``attach`` and attach these to 

391 ``message``. 

392 

393 :raises CallbackError: if path to file does not exist 

394 """ 

395 

396 try: 

397 for attachment in self.attach: 

398 processed_path = process_attributes(attachment, attrs) 

399 filepath = pathlib.Path(processed_path) 

400 filename = filepath.name 

401 mimetype = mimetypes.guess_type(filename) 

402 if mimetype[0]: 

403 data = mimetype[0].split('/') 

404 attachment_mimetype = { 

405 'maintype': data[0], 

406 'subtype': data[1] 

407 } 

408 else: 

409 attachment_mimetype = { 

410 'maintype': 'text', 

411 'subtype': 'plain' 

412 } 

413 with open(filepath, 'rb') as fh: 

414 attachment_content = fh.read() 

415 message.add_attachment( 

416 attachment_content, 

417 maintype=attachment_mimetype.get('maintype'), 

418 subtype=attachment_mimetype.get('subtype'), 

419 filename=filename) 

420 except Exception as err: 

421 raise CallbackError( 

422 f"in callback '{self.name}' exception " 

423 f"'{err.__class__.__name__}' was raised, {err}") 

424 

425 async def __call__(self, task: 'AbstractTask', 

426 attrs: dict[str, str]) -> None: 

427 await super().__call__(task, attrs) 

428 

429 if self.delay > 0: 

430 logger.debug(f'{self.name} ({self.__class__}) ' 

431 f'delayed for {self.delay}') 

432 await asyncio.sleep(self.delay) 

433 

434 try: 

435 message = email.message.EmailMessage() 

436 message['Subject'] = process_attributes(self.subject, attrs) 

437 message['From'] = process_attributes(self.fromaddr, attrs) 

438 message['To'] = process_attributes(self.toaddr, attrs) 

439 content = process_attributes(self.message, attrs) 

440 message.set_content(content) 

441 self.process_attachments(message, attrs) 

442 except CallbackError: 

443 raise 

444 except CallbackAttributeError: 

445 raise 

446 except CallbackCircularAttributeError: 

447 raise 

448 

449 try: 

450 loop = asyncio.get_event_loop() 

451 match self.security: 

452 case 'starttls': 

453 await loop.run_in_executor( 

454 None, self.send_message_starttls, message) 

455 case 'ssl': 

456 await loop.run_in_executor( 

457 None, self.send_message_ssl, message) 

458 logger.debug(f'{self.name} ({self.__class__}) ' 

459 f'sent a mail') 

460 except Exception as err: 

461 raise CallbackError( 

462 f"in callback '{self.name}' exception " 

463 f"'{err.__class__.__name__}' was raised, {err}") 

464 

465 @classmethod 

466 def from_yaml(cls, name: str, data: str) -> Self: 

467 """ 

468 :class:`MailCallback` can be also constructed from a YAML snippet. 

469 

470 .. code:: yaml 

471 

472 host: [SMTP_HOST] 

473 port: [SMTP_PORT] 

474 login: [SMTP_LOGIN] 

475 password: [SMTP_PASSWORD] 

476 security: [starttls | ssl] 

477 from: account@host.com 

478 to: account@anotherhost.com 

479 subject: Some subject with an attribute {subject} 

480 message: Some message with attributes {message} {date} 

481 attach: 

482 - patch/to/file1 

483 - patch/to/file2 

484 delay: 42 

485 

486 :param name: unique identifier 

487 :param data: YAML snippet 

488 

489 :return: new instance 

490 :rtype: MailCallback 

491 """ 

492 super().from_yaml(name, data) 

493 

494 try: 

495 parsed = yaml.safe_load(data) 

496 except yaml.YAMLError as err: 

497 raise CallbackSyntaxError(err) 

498 

499 required_keys = [ 

500 'host', 

501 'port', 

502 'login', 

503 'password', 

504 'security', 

505 'to', 

506 'subject', 

507 'from', 

508 'message' 

509 ] 

510 

511 for required_key in required_keys: 

512 if required_key not in parsed: 

513 raise CallbackSyntaxError( 

514 f"in callback '{name}' missing '{required_key}'") 

515 

516 host = parsed['host'] 

517 port = parsed['port'] 

518 login = parsed['login'] 

519 password = parsed['password'] 

520 security = parsed['security'] 

521 toaddr = parsed['to'] 

522 subject = parsed['subject'] 

523 fromaddr = parsed['from'] 

524 message = parsed['message'] 

525 

526 attach: list[str] = [] 

527 if 'attach' in parsed: 

528 if not isinstance(parsed['attach'], list): 

529 raise CallbackSyntaxError( 

530 f"in callback '{name}' 'attach' not a list") 

531 

532 for attachment in parsed['attach']: 

533 if not isinstance(attachment, str): 

534 raise CallbackSyntaxError( 

535 f"in callback '{name}' invalid attachment") 

536 

537 attach.append(attachment) 

538 

539 if not isinstance(host, str): 

540 raise CallbackSyntaxError( 

541 f"in callback '{name}' 'host' not a str") 

542 

543 if not isinstance(login, str): 

544 raise CallbackSyntaxError( 

545 f"in callback '{name}' 'login' not a str") 

546 

547 if not isinstance(password, str): 

548 raise CallbackSyntaxError( 

549 f"in callback '{name}' 'password' not a str") 

550 

551 if not isinstance(security, str): 

552 raise CallbackSyntaxError( 

553 f"in callback '{name}' 'security' not a str") 

554 

555 if not isinstance(toaddr, str): 

556 raise CallbackSyntaxError( 

557 f"in callback '{name}' 'to' not a str") 

558 

559 if not isinstance(subject, str): 

560 raise CallbackSyntaxError( 

561 f"in callback '{name}' 'subject' not a str") 

562 

563 if not isinstance(fromaddr, str): 

564 raise CallbackSyntaxError( 

565 f"in callback '{name}' 'from' not a str") 

566 

567 if not isinstance(message, str): 

568 raise CallbackSyntaxError( 

569 f"in callback '{name}' 'message' not a str") 

570 

571 if not isinstance(port, int): 

572 raise CallbackSyntaxError( 

573 f"in callback '{name}' 'port' not an int") 

574 

575 if 'delay' in parsed: 

576 delay = parsed['delay'] 

577 if not isinstance(delay, int): 

578 raise CallbackSyntaxError( 

579 f"in callback '{name}' 'delay' not an int") 

580 else: 

581 delay = 0 

582 

583 if security not in ['starttls', 'ssl']: 

584 raise CallbackSyntaxError( 

585 f"in callback '{name}' invalid " 

586 f"security '{security}' value") 

587 

588 return cls(name, host, port, login, password, security, 

589 toaddr, subject, fromaddr, message, attach, delay)