After a few years of working with Airflow for job scheduling, I have settled on Prefect these days. It fixes all the problems I had with Airflow and is, in my opinion, a clear upgrade.
After implementing our logging and monitoring with Logz.io, a hosted ELK solution, I found it useful to ship the logs from our Prefect flow runs there as well, to benefit from centralized logging, monitoring, and alerting.
After a bit of googling, I put together a simple solution. It relies on the fact that you can add handlers to a Python logger very easily; all I needed was the right JSON formatter and a small utility function.
import logging
import logging.handlers
from datetime import datetime
from pathlib import Path

from prefect.utilities.logging import get_logger
from pythonjsonlogger import jsonlogger


class PrefectJsonFormatter(jsonlogger.JsonFormatter):
    """
    Log formatter producing JSON records that ELK can parse easily.
    """

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        # Add the fields Elasticsearch/Kibana expect on each record
        log_record['@timestamp'] = datetime.utcnow().isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name


def setup_json_logging():
    log_file = '/var/log/platform/prefect.log'
    prefect_logger = get_logger()
    try:
        Path(log_file).parent.mkdir(parents=True, exist_ok=True)
        # Rotate the file daily and keep five days of history
        file_handler = logging.handlers.TimedRotatingFileHandler(
            log_file, when='D', backupCount=5, utc=True)
        file_handler.setFormatter(PrefectJsonFormatter())
        prefect_logger.addHandler(file_handler)
    except OSError:
        prefect_logger.exception("Unable to set up file logging.")
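With this in place, every record Prefect logs ends up in the file as one JSON object per line, roughly like this (the timestamp and message here are illustrative):

{"message": "Beginning Flow run for 'ETL'", "@timestamp": "2021-04-02T09:15:03.123456", "level": "INFO", "logger": "prefect.FlowRunner"}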
Now all I need to do is call the helper function when defining a flow. Something like this:
import prefect
from prefect import task, Flow

from etl_utils.prefect_logging import setup_json_logging

setup_json_logging()


@task
def extract():
    """Get a list of data"""
    return [1, 2, 3]


@task
def transform(data):
    """Multiply the input by 10"""
    return [i * 10 for i in data]


@task
def load(data):
    """Print the data to indicate it was received"""
    print("Here's your data: {}".format(data))


with Flow('ETL') as flow:
    e = extract()
    t = transform(e)
    l = load(t)
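For a quick local test, running the flow in-process is enough to see the JSON records show up in the log file:

state = flow.run()
assert state.is_successful()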
The only thing I do not like about this solution is having to set up logging in every flow. If you know of a way to do this through Prefect configuration so it applies to all flows globally, please let me know.

To ship the logs from the file to Logz.io, I'm using Filebeat.
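The relevant part of the Filebeat configuration is just a log input pointed at the file plus the Logz.io output. Here is a minimal sketch, assuming the log path above; the shipping token, listener host, and CA certificate path are placeholders that come from your Logz.io account and their setup docs:

filebeat.inputs:
- type: log
  paths:
    - /var/log/platform/prefect.log
  fields:
    logzio_codec: json
    token: <YOUR_LOGZIO_SHIPPING_TOKEN>
  fields_under_root: true

output.logstash:
  hosts: ["<LOGZIO_LISTENER_HOST>:5015"]
  ssl:
    certificate_authorities: ['<PATH_TO_LOGZIO_CA_CERT>']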