Coverage for yasmon/callbacks.py: 99%
266 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-28 10:57 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-28 10:57 +0000
1from loguru import logger
2from abc import ABC, abstractmethod
3from typing import Self, TYPE_CHECKING
4import asyncio
5import yaml
6import re
7import smtplib
8import email
9import ssl
10import mimetypes
11import pathlib
13if TYPE_CHECKING:
14 from .tasks import AbstractTask
def process_attributes(expr: str, attrs: dict[str, str]) -> str:
    """
    Performs attribute substitutions in `expr`.

    Substitution is repeated for as long as a ``{...}`` placeholder is
    found in the expression, so attribute values may themselves refer to
    other attributes. The number of passes is bounded to detect circular
    dependencies (`max_depth = 42`).

    :param expr: expression to process
    :param attrs: attributes to substitute

    :raises CallbackCircularAttributeError: see documentation
    :raises CallbackAttributeError: see documentation

    :return: processed expression
    :rtype: str
    """
    max_depth = 42
    placeholder = re.compile(r"\{[^{}]*\}")  # match attributes

    depth = 0
    while placeholder.search(expr):
        try:
            expr = expr.format(**attrs)
        except (KeyError, IndexError) as err:
            # KeyError: a named attribute is missing from `attrs`.
            # IndexError: a positional field such as `{}` or `{0}`; it can
            # never be resolved because only keyword values are supplied.
            raise CallbackAttributeError(err) from err

        # The pass that exhausts the budget raises, even if it happened
        # to resolve the last placeholder.
        if depth >= max_depth:
            raise CallbackCircularAttributeError(expr)
        depth += 1

    return expr
class CallbackNotImplementedError(Exception):
    """
    Signals that the requested callback type has no implementation.

    The rendered text is available as ``message``.
    """

    def __init__(self, type: str, message="callback {type} not implemented"):
        # `message` is a template; `{type}` is filled with the given type.
        rendered = message.format(type=type)
        self.message = rendered
        super().__init__(rendered)
class CallbackAttributeError(Exception):
    """
    Signals that a callback referred to a task attribute that is not
    defined.

    The rendered text is available as ``message``.
    """

    def __init__(self, attr: str, message="undefined attribute {attr}"):
        # `message` is a template; `{attr}` is filled with the given value.
        rendered = message.format(attr=attr)
        self.message = rendered
        super().__init__(rendered)
class CallbackCircularAttributeError(Exception):
    """
    Signals that task attributes depend on each other in a cycle.

    The rendered text is available as ``message``.
    """

    def __init__(self, expr: str,
                 message="{expr}\ndetected circular attributes"):
        # `message` is a template; `{expr}` is filled with the expression.
        rendered = message.format(expr=expr)
        self.message = rendered
        super().__init__(rendered)
class CallbackSyntaxError(Exception):
    """
    Signals a syntax error in a callback definition.

    The given text is available as ``message``.
    """

    def __init__(self, message="callback syntax error"):
        super().__init__(message)
        self.message = message
class CallbackError(Exception):
    """
    Signals a failure while a callback was being executed.

    The given text is available as ``message``.
    """

    def __init__(self, message="callback error"):
        super().__init__(message)
        self.message = message
class CallbackDict(dict):
    """
    A `dictionary` type reserved for holding callbacks.

    Construction arguments are handed to :class:`dict` unchanged.
    """

    def __init__(self, mapping=(), **kwargs):
        dict.__init__(self, mapping, **kwargs)
class AbstractCallback(ABC):
    """
    Abstract class from which all callback classes are derived.

    Derived callbacks are functors and can be used as coroutines for any
    of :class:`yasmon.tasks.AbstractTask`.

    The preferred way to instantiate a callback is from class
    method :func:`~from_yaml`.
    """

    @abstractmethod
    def __init__(self):
        """
        Ensure the callback has a name and log its initialization.

        Subclasses are expected to assign ``name`` before delegating
        here; a missing or empty name is replaced by a generic one.
        """
        # getattr() keeps a subclass that never assigned `name` from
        # failing with AttributeError before the fallback can apply.
        if not getattr(self, 'name', None):
            self.name = "Generic Callback"
        logger.info(f'{self.name} ({self.__class__}) initialized')

    @abstractmethod
    async def __call__(self, task: 'AbstractTask', attrs: dict[str, str]):
        """
        Coroutine called by :class:`TaskRunner`.

        This base implementation only logs the call.

        :param task: task calling the callback
        :param attrs: attributes supplied by the caller (not used by this
            base implementation)
        :raises CallbackError: see documentation
        :raises CallbackAttributeError: see documentation
        :raises CallbackCircularAttributeError: see documentation
        """
        logger.info(f'{self.name} ({self.__class__}) called by '
                    f'{task.name} ({task.__class__})')

    @classmethod
    @abstractmethod
    def from_yaml(cls, name: str, data: str):
        """
        A class method for constructing a callback from a YAML document.

        This base implementation only logs the definition; it does not
        build or return an instance.

        :param name: unique identifier
        :param data: yaml data

        :return: new instance
        """
        logger.debug(f'{name} defined form yaml \n{data}')
class ShellCallback(AbstractCallback):
    """
    Callback implementing shell command execution.
    """

    def __init__(self, name: str, cmd: str) -> None:
        """
        :param name: unique identifier
        :param cmd: command to be executed
        """
        self.name = name
        self.cmd = cmd
        super().__init__()

    async def __call__(self, task: 'AbstractTask', attrs: dict[str, str]):
        """
        Substitute attributes into the command, run it through the shell
        and log whatever it wrote to stdout and stderr.

        :param task: task calling the callback
        :param attrs: attributes to substitute into the command

        :raises CallbackAttributeError: see documentation
        :raises CallbackCircularAttributeError: see documentation

        :return: raw ``(stdout, stderr)`` bytes of the command
        """
        await super().__call__(task, attrs)

        # Attribute errors propagate to the caller unchanged.
        # NOTE(review): attribute values are substituted verbatim and the
        # result is run by the shell; if attributes can carry untrusted
        # text they need quoting (e.g. shlex.quote) before they get here.
        cmd = process_attributes(self.cmd, attrs)

        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

        stdout, stderr = await proc.communicate()

        # Decoding is for logging only; undecodable bytes must not make
        # the callback fail after the command has already run.
        out = stdout.decode(errors='replace')
        err = stderr.decode(errors='replace')

        if out:
            logger.info(f'callback {self.name} stdout:\n {out}')

        if err:
            logger.error(f'callback {self.name} stderr:\n {err}')

        return stdout, stderr

    @classmethod
    def from_yaml(cls, name: str, data: str) -> Self:
        """
        :class:`ShellCallback` can be also constructed from a YAML snippet.

        .. code:: yaml

            command: ls -lah /path/to/some/dir/

        :param name: unique identifier
        :param data: YAML snippet

        :raises CallbackSyntaxError: if the snippet is not valid YAML, is
            not a mapping, or ``command`` is missing or not a string

        :return: new instance
        :rtype: ShellCallback
        """
        super().from_yaml(name, data)
        try:
            parsed = yaml.safe_load(data)
        except yaml.YAMLError as err:
            raise CallbackSyntaxError(err) from err

        # An empty document loads as None and a bare scalar as str/int;
        # neither supports the key lookups below.
        if not isinstance(parsed, dict):
            raise CallbackSyntaxError(
                f"in callback {name} expected a mapping")

        if 'command' not in parsed:
            raise CallbackSyntaxError(
                f"in callback {name} missing command")

        cmd = parsed['command']
        if not isinstance(cmd, str):
            raise CallbackSyntaxError(
                f"in callback {name} not a string")

        return cls(name, cmd)
class LoggerCallback(AbstractCallback):
    """
    Callback implementing logger calls
    """

    def __init__(self, name: str, level: str, message: str) -> None:
        """
        :param name: unique identifier
        :param level: logging level
        :param message: message to pass to logger
        """
        self.name = name
        self.level = level
        self.message = message
        super().__init__()

    async def __call__(self, task: 'AbstractTask',
                       attrs: dict[str, str]) -> None:
        """
        Substitute attributes into the message and log it at the
        configured level.

        :param task: task calling the callback
        :param attrs: attributes to substitute into the message

        :raises CallbackAttributeError: see documentation
        :raises CallbackCircularAttributeError: see documentation
        """
        await super().__call__(task, attrs)

        # Attribute errors propagate to the caller unchanged.
        message = process_attributes(self.message, attrs)

        # `level` is used as the name of the logger method to call.
        log = getattr(logger, self.level)
        log(message)

    @classmethod
    def from_yaml(cls, name: str, data: str) -> Self:
        """
        :class:`LoggerCallback` can be also constructed from a YAML snippet.

        .. code:: yaml

            level: [error | info | debug | ... (see Loguru docs)]
            message: message

        :param name: unique identifier
        :param data: YAML snippet

        :raises CallbackSyntaxError: if the snippet is not valid YAML, is
            not a mapping, or ``level``/``message`` is missing or invalid

        :return: new instance
        :rtype: LoggerCallback
        """
        super().from_yaml(name, data)
        try:
            parsed = yaml.safe_load(data)
        except yaml.YAMLError as err:
            raise CallbackSyntaxError(err) from err

        # An empty document loads as None and a bare scalar as str/int;
        # neither supports the key lookups below.
        if not isinstance(parsed, dict):
            raise CallbackSyntaxError(
                f"in callback {name} expected a mapping")

        imp_levels = (
            'trace',
            'debug',
            'info',
            'success',
            'warning',
            'error',
            'critical',
        )

        if 'level' not in parsed:
            raise CallbackSyntaxError(
                f"in callback {name} missing logger level")

        level = parsed['level']
        if level not in imp_levels:
            raise CallbackSyntaxError(
                f"in callback {name} invalid logger level {level}")

        if 'message' not in parsed:
            raise CallbackSyntaxError(
                f"in callback {name} missing message")

        message = parsed['message']
        if not isinstance(message, str):
            raise CallbackSyntaxError(
                f"in callback {name} message not a string")

        return cls(name, level, message)
321class MailCallback(AbstractCallback):
322 """
323 Callback implementing sending simple notification mails with
324 attachments.
325 """
327 def __init__(self, name: str, host: str, port: int, login: str,
328 password: str, security: str, toaddr: str, subject: str,
329 fromaddr: str, message: str, attach: list[str],
330 delay: int) -> None:
331 """
332 :param name: unique callback identifier
333 :param host: smtp host
334 :param port: smtp port
335 :param login: login name
336 :param password: password
337 :param security: ``starttls`` or ``ssl``
338 :param toaddr: to address
339 :param subject: mail subject
340 :param fromaddr: from address
341 :param message: message
342 """
343 self.name = name
344 self.host = host
345 self.port = port
346 self.login = login
347 self.password = password
348 self.security = security
349 self.toaddr = toaddr
350 self.subject = subject
351 self.fromaddr = fromaddr
352 self.message = message
353 self.attach = attach
354 self.delay = delay
355 super().__init__()
357 def send_message_starttls(self, message: email.message.EmailMessage):
358 """
359 Send message using STARTTLS.
360 """
362 context = ssl.create_default_context()
363 server = smtplib.SMTP(self.host, self.port)
364 try:
365 server.starttls(context=context)
366 server.login(self.login, self.password)
367 server.send_message(message)
368 except Exception:
369 raise
370 finally:
371 server.quit()
373 def send_message_ssl(self, message: email.message.EmailMessage):
374 """
375 Send message using SSL.
376 """
378 try:
379 server = smtplib.SMTP_SSL(self.host, self.port)
380 server.login(self.login, self.password)
381 server.send_message(message)
382 except Exception:
383 raise
384 finally:
385 server.quit()
387 def process_attachments(self, message: email.message.EmailMessage,
388 attrs: dict[str, str]) -> None:
389 """
390 Process list of attachments from ``attach`` and attach these to
391 ``message``.
393 :raises CallbackError: if path to file does not exist
394 """
396 try:
397 for attachment in self.attach:
398 processed_path = process_attributes(attachment, attrs)
399 filepath = pathlib.Path(processed_path)
400 filename = filepath.name
401 mimetype = mimetypes.guess_type(filename)
402 if mimetype[0]:
403 data = mimetype[0].split('/')
404 attachment_mimetype = {
405 'maintype': data[0],
406 'subtype': data[1]
407 }
408 else:
409 attachment_mimetype = {
410 'maintype': 'text',
411 'subtype': 'plain'
412 }
413 with open(filepath, 'rb') as fh:
414 attachment_content = fh.read()
415 message.add_attachment(
416 attachment_content,
417 maintype=attachment_mimetype.get('maintype'),
418 subtype=attachment_mimetype.get('subtype'),
419 filename=filename)
420 except Exception as err:
421 raise CallbackError(
422 f"in callback '{self.name}' exception "
423 f"'{err.__class__.__name__}' was raised, {err}")
425 async def __call__(self, task: 'AbstractTask',
426 attrs: dict[str, str]) -> None:
427 await super().__call__(task, attrs)
429 if self.delay > 0:
430 logger.debug(f'{self.name} ({self.__class__}) '
431 f'delayed for {self.delay}')
432 await asyncio.sleep(self.delay)
434 try:
435 message = email.message.EmailMessage()
436 message['Subject'] = process_attributes(self.subject, attrs)
437 message['From'] = process_attributes(self.fromaddr, attrs)
438 message['To'] = process_attributes(self.toaddr, attrs)
439 content = process_attributes(self.message, attrs)
440 message.set_content(content)
441 self.process_attachments(message, attrs)
442 except CallbackError:
443 raise
444 except CallbackAttributeError:
445 raise
446 except CallbackCircularAttributeError:
447 raise
449 try:
450 loop = asyncio.get_event_loop()
451 match self.security:
452 case 'starttls':
453 await loop.run_in_executor(
454 None, self.send_message_starttls, message)
455 case 'ssl':
456 await loop.run_in_executor(
457 None, self.send_message_ssl, message)
458 logger.debug(f'{self.name} ({self.__class__}) '
459 f'sent a mail')
460 except Exception as err:
461 raise CallbackError(
462 f"in callback '{self.name}' exception "
463 f"'{err.__class__.__name__}' was raised, {err}")
465 @classmethod
466 def from_yaml(cls, name: str, data: str) -> Self:
467 """
468 :class:`MailCallback` can be also constructed from a YAML snippet.
470 .. code:: yaml
472 host: [SMTP_HOST]
473 port: [SMTP_PORT]
474 login: [SMTP_LOGIN]
475 password: [SMTP_PASSWORD]
476 security: [starttls | ssl]
477 from: account@host.com
478 to: account@anotherhost.com
479 subject: Some subject with an attribute {subject}
480 message: Some message with attributes {message} {date}
481 attach:
482 - patch/to/file1
483 - patch/to/file2
484 delay: 42
486 :param name: unique identifier
487 :param data: YAML snippet
489 :return: new instance
490 :rtype: MailCallback
491 """
492 super().from_yaml(name, data)
494 try:
495 parsed = yaml.safe_load(data)
496 except yaml.YAMLError as err:
497 raise CallbackSyntaxError(err)
499 required_keys = [
500 'host',
501 'port',
502 'login',
503 'password',
504 'security',
505 'to',
506 'subject',
507 'from',
508 'message'
509 ]
511 for required_key in required_keys:
512 if required_key not in parsed:
513 raise CallbackSyntaxError(
514 f"in callback '{name}' missing '{required_key}'")
516 host = parsed['host']
517 port = parsed['port']
518 login = parsed['login']
519 password = parsed['password']
520 security = parsed['security']
521 toaddr = parsed['to']
522 subject = parsed['subject']
523 fromaddr = parsed['from']
524 message = parsed['message']
526 attach: list[str] = []
527 if 'attach' in parsed:
528 if not isinstance(parsed['attach'], list):
529 raise CallbackSyntaxError(
530 f"in callback '{name}' 'attach' not a list")
532 for attachment in parsed['attach']:
533 if not isinstance(attachment, str):
534 raise CallbackSyntaxError(
535 f"in callback '{name}' invalid attachment")
537 attach.append(attachment)
539 if not isinstance(host, str):
540 raise CallbackSyntaxError(
541 f"in callback '{name}' 'host' not a str")
543 if not isinstance(login, str):
544 raise CallbackSyntaxError(
545 f"in callback '{name}' 'login' not a str")
547 if not isinstance(password, str):
548 raise CallbackSyntaxError(
549 f"in callback '{name}' 'password' not a str")
551 if not isinstance(security, str):
552 raise CallbackSyntaxError(
553 f"in callback '{name}' 'security' not a str")
555 if not isinstance(toaddr, str):
556 raise CallbackSyntaxError(
557 f"in callback '{name}' 'to' not a str")
559 if not isinstance(subject, str):
560 raise CallbackSyntaxError(
561 f"in callback '{name}' 'subject' not a str")
563 if not isinstance(fromaddr, str):
564 raise CallbackSyntaxError(
565 f"in callback '{name}' 'from' not a str")
567 if not isinstance(message, str):
568 raise CallbackSyntaxError(
569 f"in callback '{name}' 'message' not a str")
571 if not isinstance(port, int):
572 raise CallbackSyntaxError(
573 f"in callback '{name}' 'port' not an int")
575 if 'delay' in parsed:
576 delay = parsed['delay']
577 if not isinstance(delay, int):
578 raise CallbackSyntaxError(
579 f"in callback '{name}' 'delay' not an int")
580 else:
581 delay = 0
583 if security not in ['starttls', 'ssl']:
584 raise CallbackSyntaxError(
585 f"in callback '{name}' invalid "
586 f"security '{security}' value")
588 return cls(name, host, port, login, password, security,
589 toaddr, subject, fromaddr, message, attach, delay)