File: //var/opt/nydus/ops/primordial/log/__init__.py
# -*- coding: utf-8 -*-
from datetime import datetime
from logging.handlers import SysLogHandler
import logging
import logging.config
import os
import socket
from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, SocketType
from typing import Any, Dict, Optional, Tuple # pylint: disable=W0611
from pkg_resources import resource_stream
import yaml
# If Flask is installed, x-request-id headers are included in logs.
try:
import flask
except ImportError:
flask = None # type: ignore
import pytz
from primordial.timeutils import iso8601_utc
from primordial.log.envelope import Envelope
from primordial.log.envelopedata import ConfigLogData
# Keeping unused imports here to prevent breaking change to hfs, where these classes are referenced via primordial.log
from primordial.log.extra import PerfLogExtra, DevLogExtra, BALogExtra # pylint: disable=unused-import
# Port used in the default ``address`` of HFSSyslogHandler.
SYSLOG_DEFAULT_PORT = 514
# Version number written right after the priority in the header built by HFSSyslogHandler.emit.
SYSLOG_FORMAT_VERSION = 1
# Marker that VerticalServiceStructuredFormatter.format prepends to the JSON payload.
COOKIE = '@cee:'
# Maps stdlib logging levels to the '<N>' prefix that VerticalServiceSystemdFormatter
# puts in front of every formatted message.
# NOTE(review): in the standard library logging.WARN is an alias of logging.WARNING and
# logging.FATAL is an alias of logging.CRITICAL, so those pairs are duplicate keys and the
# later entry wins. CRITICAL records therefore effectively get '<1>', not '<2>' -- confirm
# which prefix is intended before relying on it.
PREFIX_MAP = {
    logging.DEBUG: '<7>',
    logging.INFO: '<6>',
    logging.NOTSET: '<6>',
    logging.WARNING: '<4>',
    logging.WARN: '<4>',
    logging.ERROR: '<3>',
    logging.CRITICAL: '<2>',
    logging.FATAL: '<1>'
}
# Module-level logger named after this package.
LOG = logging.getLogger(__name__)
class VerticalServiceStructuredFormatter(logging.Formatter):
    """Structured (JSON) log formatter for HFS vertical services.

    Each record is wrapped in an :class:`Envelope` together with the
    configuration data captured at construction time, and the envelope's JSON
    is returned prefixed with the module-level ``COOKIE`` marker.
    """

    def __init__(self,
                 *args,
                 log_type: str = ConfigLogData.DEFAULT_TYPE,
                 log_sub_type: str = ConfigLogData.DEFAULT_SUB_TYPE,
                 datacenter: str = ConfigLogData.DEFAULT_DATACENTER,
                 environment: str = ConfigLogData.DEFAULT_ENVIRONMENT,
                 **kwargs) -> None:
        """Build the formatter and remember the static envelope fields.

        :param log_type: value stored under ``ConfigLogData.TYPE_FIELD``
        :param log_sub_type: value stored under ``ConfigLogData.SUB_TYPE_FIELD``
        :param datacenter: value stored under ``ConfigLogData.DATACENTER_FIELD``
        :param environment: value stored under ``ConfigLogData.ENVIRONMENT_FIELD``

        Positional and remaining keyword arguments are forwarded unchanged to
        :class:`logging.Formatter`.
        """
        super().__init__(*args, **kwargs)
        field_values = (
            (ConfigLogData.TYPE_FIELD, log_type),
            (ConfigLogData.SUB_TYPE_FIELD, log_sub_type),
            (ConfigLogData.DATACENTER_FIELD, datacenter),
            (ConfigLogData.ENVIRONMENT_FIELD, environment),
        )
        config_data = ConfigLogData()
        for field_name, field_value in field_values:
            # Keep whatever ``set`` returns so the fluent chain is reproduced exactly.
            config_data = config_data.set(field_name, field_value)
        self.configLogData = config_data

    def format(self, record: logging.LogRecord) -> str:
        """Return ``COOKIE`` followed by the JSON envelope built for *record*."""
        envelope = Envelope.getEnvelope(record, self.configLogData)
        payload = envelope.getJSON()
        return COOKIE + payload
class VerticalServiceSystemdFormatter(logging.Formatter):
    """Formatter that prepends a ``<N>`` level prefix for HFS vertical services.

    The prefix is looked up in ``PREFIX_MAP`` by the record's numeric level;
    levels without an entry get no prefix.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Return the standard formatted message with the level prefix in front."""
        level_prefix = PREFIX_MAP.get(record.levelno, '')
        rendered = super().format(record)
        return level_prefix + rendered
class VerticalServiceRequestLoggingFilter(logging.Filter):
    """Log filter that tags records with the current Flask request id.

    The filter never rejects a record. When Flask is importable and a request
    context is active, the ``x-request-id`` header (if present and non-empty)
    is stored under ``record.extra['request_id']``.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Annotate *record* with the request id when available; always return True."""
        # Nothing to annotate without Flask or outside of a request.
        if not flask or not flask.has_request_context():
            return True

        if not hasattr(record, 'extra'):
            record.extra = {}  # type: ignore

        header_value = flask.request.headers.get('x-request-id')
        if header_value:
            record.extra['request_id'] = header_value  # type: ignore
        return True
class HFSSyslogHandler(SysLogHandler):
    """A customized log handler for HFS syslogs.

    Given a log record, builds a syslog header (priority, version, timestamp,
    hostname, ident, pid, message id), appends the formatted record and sends
    the result to ``address`` over UDP or TCP. The socket is opened lazily on
    the first emit and re-opened after any socket error.
    """

    def __init__(self,  # pylint: disable=W0231
                 ident: Optional[str] = None,
                 address: Tuple[str, int] = ('localhost', SYSLOG_DEFAULT_PORT),
                 facility: int = SysLogHandler.LOG_USER,
                 socktype: SocketType = SOCK_DGRAM) -> None:  # type: ignore
        """Produce a new HFS syslog handler.

        :param ident: application name written into the header; falls back to
            the ``SYSLOG_IDENT`` environment variable and then to ``'python'``
        :param address: ``(host, port)`` of the syslog server
        :param facility: syslog facility used when encoding the priority
        :param socktype: ``SOCK_DGRAM`` (UDP) or ``SOCK_STREAM`` (TCP)
        :raises TypeError: For socket types other than TCP or UDP
        """
        if socktype not in (SOCK_DGRAM, SOCK_STREAM):
            raise TypeError('HFSSyslogHandler only supports TCP and UDP AF_INET sockets')
        # NB: we avoid the standard:
        #   super(HFSSyslogHandler, self).__init__(address=address, facility=facility, socktype=socktype)
        # here in preference for the manual init directly below and the implementation of the
        # _connect_socket method. The notable thing here is that we're limiting our ability to handle
        # UNIX style sockets, and, we have added a restrictive timeout for our socket connections.
        # We may want to revisit this later. --EA
        logging.Handler.__init__(self)  # pylint: disable=non-parent-init-called
        self.ident = ident or os.getenv('SYSLOG_IDENT') or 'python'
        self.address = address
        self.facility = facility
        self.socktype = socktype  # type: ignore
        self.socket = None  # type: Optional[socket.socket]
        self.unixsocket = False  # required by base close()

    def close(self) -> None:
        """Ensure socket object, as expected by base class."""
        # The socket is created lazily, so it may still be None here; the base
        # class close() is handed a real socket object to close in that case.
        if self.socket is None:
            self._set_socket()
        super().close()

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record.

        The record is formatted, and an RFC-compliant message is sent to the syslog server.
        No structured data segment is created and message id is always nil ('-').

        Socket errors are not reported: the socket is discarded and a new one
        is opened on the next call. Any other failure goes to ``handleError``.

        :param record: The record to be emitted.
        """
        try:
            # mapPriority() translates the logging level name into a syslog priority name.
            # NOTE: https://github.com/python/typeshed/issues/2577
            prio = self.encodePriority(self.facility,
                                       self.mapPriority(record.levelname))  # type: ignore
            timestamp = iso8601_utc(datetime.fromtimestamp(record.created, pytz.utc))
            # HOSTNAME is frequently not exported to the process environment. Fall back to
            # the system hostname, then to the syslog nil value, so the header never
            # carries the literal text "None".
            hostname = os.getenv('HOSTNAME') or socket.gethostname() or '-'
            pid = record.process if record.process is not None else 0
            message_id = '-'
            header = '<%d>%d %s %s %s %d %s' % (
                prio, SYSLOG_FORMAT_VERSION, timestamp, hostname, self.ident, pid, message_id
            )
            structured_data = '-'  # we don't support structured data
            message = self.format(record)
            ascii_part = header + ' ' + structured_data + ' '
            utf8_part = message + '\n'
            # we don't insert the UTF-8 BOM because rsyslog neither wants it nor handles it properly
            encoded_message = ascii_part.encode('ascii') + utf8_part.encode('utf8')

            try:
                if self.socket is None:
                    self._connect_socket()
                # NOTE: Types are ignored here because of https://github.com/python/mypy/issues/4805
                if self.socktype == SOCK_DGRAM:
                    self.socket.sendto(encoded_message, self.address)  # type: ignore
                else:
                    self.socket.sendall(encoded_message)  # type: ignore
            except OSError:
                # socket error. close it and we'll reconnect next time
                self._discard_socket()
        except Exception:  # pylint: disable=broad-except
            self.handleError(record)

    def _connect_socket(self) -> None:
        """Create the socket, apply a 0.5s timeout and, for TCP, connect to ``address``."""
        self._set_socket()
        self.socket.settimeout(.5)  # type: ignore
        if self.socktype == SOCK_STREAM:
            self.socket.connect(self.address)  # type: ignore

    def _set_socket(self) -> None:
        """Create a fresh, unconnected AF_INET socket of the configured type."""
        self.socket = socket.socket(AF_INET, self.socktype)

    def _discard_socket(self) -> None:
        """Close and forget the current socket, tolerating that none was ever created."""
        sock, self.socket = self.socket, None
        if sock is not None:
            sock.close()
def init_logging(name: str = __name__,
                 filename: str = "logging.yaml",
                 override_loggers: Optional[Dict[str, Any]] = None,
                 log_type: str = ConfigLogData.DEFAULT_TYPE,
                 log_sub_type: str = ConfigLogData.DEFAULT_SUB_TYPE,
                 datacenter: Optional[str] = None,
                 environment: Optional[str] = None) -> None:
    """
    Initialize logging, updating the existing log config loggers dict with an override_loggers dict.

    The YAML dict config is loaded from a package resource, every formatter
    built by ``primordial.log.VerticalServiceStructuredFormatter`` is given the
    envelope settings passed here, and the result is applied with
    ``logging.config.dictConfig``. Warnings are routed into logging afterwards.

    It's possible, since we have many ways to run the system, that init_logging
    be called more than once. This is allowed; each call applies the
    configuration again.

    :param name: package name where the logging YAML config file is located
    :param filename: filename of the logging YAML config
    :param override_loggers: a dict of overrides to the default log dictconfig
    :param log_type: log type injected into structured formatters
    :param log_sub_type: log sub type injected into structured formatters
    :param datacenter: datacenter injected into structured formatters;
        ``ConfigLogData.DEFAULT_DATACENTER`` when None
    :param environment: environment injected (upper-cased) into structured
        formatters; ``ConfigLogData.DEFAULT_ENVIRONMENT`` when None
    """
    datacenter = datacenter if datacenter is not None else ConfigLogData.DEFAULT_DATACENTER
    environment = environment.upper() if environment is not None else ConfigLogData.DEFAULT_ENVIRONMENT

    # Close the resource stream explicitly instead of leaving it to the garbage collector.
    stream = resource_stream(name, filename)
    try:
        config = yaml.safe_load(stream.read().decode())
    finally:
        stream.close()

    structured_factory = 'primordial.log.VerticalServiceStructuredFormatter'
    # A config without a formatters section is valid for dictConfig; treat it as empty.
    for formatter in (config.get('formatters') or {}).values():
        if formatter.get('()') == structured_factory:
            formatter['log_type'] = log_type
            formatter['log_sub_type'] = log_sub_type
            formatter['environment'] = environment
            formatter['datacenter'] = datacenter

    if override_loggers is not None:
        # Create the loggers section on demand so overrides work with minimal configs.
        loggers = config.get('loggers') or {}
        loggers.update(override_loggers)
        config['loggers'] = loggers

    logging.config.dictConfig(config)
    logging.captureWarnings(True)