import os import sys import json import serial import signal import logging import argparse from threading import Thread from contextlib import contextmanager, ExitStack from signal import SIGINT, SIGTERM from serial.tools import list_ports from datetime import datetime as dt from lib.logger import set_logger from lib.config import LogConfig def parsing_argument(): parser = argparse.ArgumentParser(description="USB-UART Serial logging.") # parser.add_argument('-p', '--serial-path', type=str, default="/dev/ttyUSB2", help="Path to device file") # parser.add_argument('-b', '--baudrate', type=int, default=115200, help="Baudrate") parser.add_argument('-f', '--log-file', type=str, default="logs/main.log", help="Name of log file") parser.add_argument('-d', '--log-level', type=str, default="info", choices=['notset', 'info', 'debug', 'warning', 'error', 'critical'], help="Set log level") parser.add_argument('-c', '--log-config', type=str, default="log-config.yml", help="Config for logging") args = parser.parse_args() # device = os.path.basename(args.serial_path) # if len(args.log_file) == 0: # args.log_file = f"{device}.log" return args class SerialLogger(): def __init__(self, config): self.set_params(config) # self.name = if not hasattr(self, 'log_file') or len(self.log_file) == 0: self.log_file = f"logs/{self.name}.log" if not hasattr(self, 'log_level'): self.log_level='info' self.logger = set_logger(name=self.name, filename=self.log_file, level=self.log_level) self.stop=False print("") self.logger.info("-----------------------------------------------") self.logger.info(f"Logger launched {dt.now().strftime('%Y/%m/%d %a %H:%M:%S')}") self.logger.info(f"Target device: {self.device}, Baudrate: {self.baudrate}") self.logger.info(f"Log level: {self.log_level.upper()}, saved in \'{self.log_file}\'") self.logger.info("-----------------------------------------------\n") def set_params(self, config: dict): for k, v in config.items(): # if type(v) is dict: # set_params(v) setattr(self, k, v) @contextmanager def run(self): thread = Thread(target=self.listen, name=f"{self.name}") thread.start() try: yield finally: self.stop = True thread.join() def listen(self): tmp="" with serial.Serial(self.device, self.baudrate) as dev: while not self.stop: data = dev.readline(dev.inWaiting()).decode("latin1") if data: if "Ramdisk addr" in data: self.logger.info("") self.logger.info("-----------------------------------------") self.logger.info(f"FPGA Initialized. {dt.now().strftime('%Y/%m/%d %a %H:%M:%S')}") self.logger.info("-----------------------------------------") if data[-1] == '\n': tmp += data.strip() self.logger.info(tmp) tmp = "" else: tmp+= data.strip() #self.logger.info(data.strip()) def main(args=None): LOG = set_logger(filename=args.log_file, level=args.log_level) LOG.info("-----------------------------------------------") LOG.info(f"Logger launched {dt.now().strftime('%Y/%m/%d %a %H:%M:%S')}") LOG.info(f"Logging configs: {args.log_config}") LOG.info(f"Log level: {args.log_level.upper()}, saved in \'{args.log_file}\'") LOG.info("-----------------------------------------------\n") LOG.info("Available ports") for i, p in enumerate(list_ports.comports()): LOG.info(f"{i+1}) Deivce: {p.device}") LOG.info(f"\tDescription : {p.description}") LOG.info(f"\tHWID : {p.hwid}") LOG.info(f"\tSerial # : {p.serial_number}") LOG.info(f"\tLocation : {p.location}") LOG.info(f"\tManufacturer : {p.manufacturer}") LOG.info(f"\tProduct : {p.product}") LOG.info(f"\tInterface : {p.interface}\n") def signal_handler(sig, frame): sign = signal.Signals(sig) print("") LOG.info(f"Stop program by {signal.strsignal(sign)} ({sign.name})\n") configs = LogConfig(args.log_config).Config with ExitStack() as stack: for cfg in configs: LOG.info(f"Logging config:\n{json.dumps(cfg, indent=4)}") sl = SerialLogger(cfg) stack.enter_context(sl.run()) signal.signal(SIGINT, signal_handler) signal.signal(SIGTERM, signal_handler) print('\t To terminate the program, press Ctrl-c\n') signal.pause() return 0 def test(): config = LogConfig("log-config.yml") print(type(config.Config[0])) pass if __name__ == "__main__": # test() sys.exit( main(parsing_argument()) )