Massoud Mazar

Sharing The Knowledge


Ship Prefect logs using filebeat

After a few years of working with Airflow for job scheduling, I have settled on Prefect. It fixes all the problems I had with Airflow and has been the best upgrade, in my opinion.

After implementing our logging and monitoring with Logz.io, a hosted ELK solution, I found it useful to ship the logs from our Prefect flow runs there as well, to benefit from centralized logging, monitoring, and alerting.

After a bit of googling, I put together a simple solution. It relies on the fact that you can attach extra handlers to a Python logger very easily; all I needed was the right JSON formatter and a small utility function.

import logging
import logging.handlers
from datetime import datetime
from pathlib import Path
from prefect.utilities.logging import get_logger
from pythonjsonlogger import jsonlogger

class PrefectJsonFormatter(jsonlogger.JsonFormatter):
    """
    Log formatter producing JSON records that ELK can parse easily.
    """

    def add_fields(self, log_record, record, message_dict):
        super(PrefectJsonFormatter, self).add_fields(
                log_record, record, message_dict)

        # Fields Logz.io/ELK expect on every record
        log_record['@timestamp'] = datetime.utcnow().isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name

def setup_json_logging():
    log_file = '/var/log/platform/prefect.log'
    prefect_logger = get_logger()
    try:
        # Create the log directory if needed, then rotate the file
        # daily, keeping 5 days of history
        Path(log_file).parent.mkdir(parents=True, exist_ok=True)
        file_handler = logging.handlers.TimedRotatingFileHandler(
                log_file, when='D', backupCount=5, utc=True)
        file_handler.setFormatter(PrefectJsonFormatter())
        prefect_logger.addHandler(file_handler)
    except OSError:
        prefect_logger.exception("Unable to set up file logging.")
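
One caveat I should mention (the guard below is my addition, not part of the original snippet): if setup_json_logging() runs more than once in the same process, the Prefect logger accumulates duplicate handlers and every line gets written twice. A short check at the top of the function, right after getting the logger, avoids that:

    # Hypothetical guard for setup_json_logging(): bail out if a
    # rotating file handler is already attached to the logger
    if any(isinstance(h, logging.handlers.TimedRotatingFileHandler)
           for h in prefect_logger.handlers):
        return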

 

Now all I need to do is call the helper function when defining a flow. Something like this:

from prefect import task, Flow
from etl_utils.prefect_logging import setup_json_logging

setup_json_logging()

@task
def extract():
    """Get a list of data"""
    return [1, 2, 3]

@task
def transform(data):
    """Multiply the input by 10"""
    return [i * 10 for i in data]

@task
def load(data):
    """Print the data to indicate it was received"""
    print("Here's your data: {}".format(data))

with Flow('ETL') as flow:
    e = extract()
    t = transform(e)
    l = load(t)
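
To sanity-check the whole thing locally (this snippet is my addition, not part of the original flow definition), Prefect 1.x can run the flow in-process:

if __name__ == '__main__':
    # Executes the ETL flow locally; log records from the run are
    # appended to /var/log/platform/prefect.log as JSON lines
    flow.run()

Each record carries the @timestamp, level, and logger fields the formatter adds, so the file ends up as one JSON object per line, which is exactly the shape filebeat ships well.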

 

To ship the logs to Logz.io, I'm using filebeat (a sample config sketch is below). The only thing I don't like about this solution is having to set up logging in every flow. If you know of a way to do this through Prefect's configuration so it applies to all flows globally, please let me know.
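
For reference, here is roughly what the filebeat side can look like. Treat it as a sketch to be checked against the Logz.io docs: the listener host and port, the token placeholder, and the certificate path are all account- and version-specific assumptions, not values from the original setup.

filebeat.inputs:
- type: log
  paths:
    - /var/log/platform/prefect.log
  fields:
    # logzio_codec tells Logz.io the lines are already JSON;
    # replace the token placeholder with your log shipping token
    logzio_codec: json
    token: YOUR-LOGZIO-SHIPPING-TOKEN
  fields_under_root: true
output.logstash:
  hosts: ["listener.logz.io:5015"]
  ssl:
    # Path to the Logz.io public certificate on your machine
    certificate_authorities: ['/etc/pki/tls/certs/logzio.crt']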
