In the last article we covered the basic concepts and architecture of Airflow, and we learned that Airflow has three major components: the webserver, the scheduler, and the executor. This article looks at the scheduler in detail by diving into some of the source code (version: 1.10.1).
Key Concepts
We already introduced the key concepts of Airflow; let's recap the ones used by the airflow scheduler here.
- Dag Files: Python files that could contain DAGs. A DAG can be defined in a single Python file or across multiple Python files packaged into an egg file (see the example file after this list).
- Dag (Directed Acyclic Graph): a workflow which glues all the tasks with inter-dependencies.
- Task: a parameterized instance of an operator/sensor which represents a unit of actual work to be executed.
- DagRun: an object representing an instantiation of the DAG in time.
- TaskInstance: an object representing an instantiation of the Task in time.
- DagBag: a collection of DAGs, parsed out of a folder tree, with high-level configuration settings such as which database to use as a backend and which executor to use to fire off tasks.
- Job: Jobs are processing items with state and duration that aren't task instances. There are three types of job in Airflow: SchedulerJob, LocalTaskJob, and BackfillJob.
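To make these concepts concrete, here is a minimal, hypothetical DAG file (the dag_id, task ids and schedule are invented for illustration, using the Airflow 1.10 operator imports). Files like this one are what the scheduler parses:
# example_dag.py - a hypothetical file living in the DAGs folder
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# The DAG glues the tasks together and defines the schedule.
dag = DAG(
    dag_id='example_dag',
    start_date=datetime(2019, 1, 1),
    schedule_interval=timedelta(days=1),  # one DagRun per day
)

# Each operator instance is a Task; a concrete run of it is a TaskInstance.
extract = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
load = BashOperator(task_id='load', bash_command='echo load', dag=dag)

# Inter-task dependency: load only runs after extract succeeds.
extract >> load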
Process Loop
The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To kick it off, all you need to do is execute the airflow scheduler command. Let's look at how Airflow parses this command and starts the process loop.
First, airflow uses argparse to parse the command and invoke the scheduler function in airflow.bin.cli. The scheduler function then creates a SchedulerJob and calls its run method. The SchedulerJob saves itself to the database, starts the executor and creates a DagFileProcessorManager. Eventually, the SchedulerJob runs a loop that periodically invokes the processor_manager.heartbeat method.
All of Airflow's daemon services are started by running an airflow {serve_name} command. Basically, the airflow command is a Python command-line script. We can look at the source code to see how it starts the service.
from airflow.bin.cli import CLIFactory
if __name__ == '__main__':
parser = CLIFactory.get_parser()
args = parser.parse_args()
args.func(args)
class CLIFactory(object):
args = {
'dag_id': Arg(("dag_id",), "The id of the dag"),
'task_id': Arg(("task_id",), "The id of the task"),
...
}
subparsers = (
...
{
'func': scheduler,
'help': "Start a scheduler instance",
'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
'log_file'),
},
...
)
subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
dag_subparsers = (
'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause')
There are two important mappings in CLIFactory: args maps each command-line argument name to its default value and help text, and subparsers_dict maps each sub-command name to the Python function that implements it.
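The dispatch itself is plain argparse. The following standalone sketch (simplified names, not the real CLIFactory) shows the pattern: each sub-command registers a handler via set_defaults(func=...), and args.func(args) invokes it.
# Simplified sketch of the argparse dispatch pattern used by the airflow CLI.
import argparse

def scheduler(args):
    print("would start a SchedulerJob for dag_id=%s" % args.dag_id)

def get_parser():
    parser = argparse.ArgumentParser(prog='airflow')
    subparsers = parser.add_subparsers(help='sub-command help')
    sp = subparsers.add_parser('scheduler', help='Start a scheduler instance')
    sp.add_argument('-d', '--dag_id', help='The id of the dag to schedule')
    sp.set_defaults(func=scheduler)  # maps the sub-command to its handler function
    return parser

if __name__ == '__main__':
    args = get_parser().parse_args()  # e.g. `airflow scheduler -d my_dag`
    args.func(args)                   # dispatches to scheduler(args)
In airflow.bin.cli, the scheduler sub-command dispatches to the scheduler function below: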
@cli_utils.action_logging
def scheduler(args):
print(settings.HEADER)
job = jobs.SchedulerJob(
dag_id=args.dag_id,
subdir=process_subdir(args.subdir),
run_duration=args.run_duration,
num_runs=args.num_runs,
do_pickle=args.do_pickle)
if args.daemon:
pid, stdout, stderr, log_file = setup_locations("scheduler",
...
ctx = daemon.DaemonContext(
...
with ctx:
job.run()
...
else:
...
job.run()
airflow/jobs.py#SchedulerJob.run
def run(self):
Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
# Adding an entry in the DB
with create_session() as session:
self.state = State.RUNNING
session.add(self)
session.commit()
...
try:
self._execute()
# In case of max runs or max duration
self.state = State.SUCCESS
except SystemExit as e:
...
finally:
self.end_date = timezone.utcnow()
session.merge(self)
session.commit()
def _execute(self):
raise NotImplementedError("This method needs to be overridden")
We can see that the scheduler service is actually an airflow.jobs.SchedulerJob instance running its run method. The job adds itself to the job table in the database and then runs the _execute method, wrapped in handling for both normal and abnormal exits. Now let us look at the _execute method.
airflow/jobs.py#SchedulerJob._execute
def _execute(self):
# Use multiple processes to parse and generate tasks for the
# DAGs in parallel. By processing them in separate processes,
# we can get parallelism and isolation from potentially harmful
# user code.
# Build up a list of Python files that could contain DAGs
known_file_paths = list_py_file_paths(self.subdir)
processor_manager = DagFileProcessorManager(self.subdir,
known_file_paths,
self.max_threads,
self.file_process_interval,
self.num_runs,
processor_factory)
try:
self._execute_helper(processor_manager)
finally:
self.log.info("Exited execute loop")
# Kill all child processes on exit since we don't want to leave
# them as orphaned.
pids_to_kill = processor_manager.get_all_pids()
...
We can see that the _execute method creates a processor_manager, and the processor_manager uses processor_factory to create a processor for each DAG file path. We can also see that it uses several settings configured in the airflow.cfg file:
file_process_interval=conf.getint('scheduler', 'min_file_process_interval'),
self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
self.max_threads = conf.getint('scheduler', 'max_threads')
# How often to scan the DAGs directory for new files. Default to 5 minutes.
self.dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval')
# How often to print out DAG file processing stats to the log. Default to
# 30 seconds.
self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval')
# Parse and schedule each file no faster than this interval. Default
# to 3 minutes.
self.file_process_interval = file_process_interval
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
if run_duration is None:
self.run_duration = conf.getint('scheduler', 'run_duration')
The _execute method then runs the _execute_helper method.
airflow/jobs.py#SchedulerJob._execute_helper
def _execute_helper(self, processor_manager):
self.executor.start()
## self.log.info("Resetting orphaned tasks for active dag runs")
self.reset_state_for_orphaned_tasks()
execute_start_time = timezone.utcnow()
# Use this value initially
known_file_paths = processor_manager.file_paths
# For the execute duration, parse and schedule DAGs
while (timezone.utcnow() - execute_start_time).total_seconds() < self.run_duration or self.run_duration < 0:
self.log.debug("Starting Loop...")
# last_dag_dir_refresh_time is initialized to the current time before the loop (elided)
elapsed_time_since_refresh = (timezone.utcnow() - last_dag_dir_refresh_time).total_seconds()
if elapsed_time_since_refresh > self.dag_dir_list_interval:
# Build up a list of Python files that could contain DAGs
known_file_paths = list_py_file_paths(self.subdir)
last_dag_dir_refresh_time = timezone.utcnow()
processor_manager.set_file_paths(known_file_paths)
self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)
simple_dags = processor_manager.heartbeat()
The _execute_helper method starts the executor and then calls processor_manager.heartbeat() in a loop for the configured run_duration, re-scanning the DAGs folder for new files every dag_dir_list_interval seconds.
DagFileProcessorManager Heartbeat
As we have seen, processor_manager.heartbeat is the most important method in the process loop, so let's dive into it.
The processor_manager maintains a list of DagFileProcessors that process the DAG files. The maximum size of this list is configured by the max_threads option in the scheduler section of airflow.cfg. In the heartbeat method, the processor_manager kicks off new DagFileProcessors, and each DagFileProcessor kicks off a new multiprocessing.Process to process a DAG definition file; heartbeat also reads the results from the finished processors. Each of these processes creates its own SchedulerJob instance: when the process starts, it uses the scheduler_job.process_file method to process the DAG file and puts the result on a queue. The processor_manager collects all the results produced by processors that have finished since the last call, and starts more processors if there are free slots.
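The underlying pattern is an ordinary multiprocessing producer/consumer setup. The following self-contained sketch (not Airflow code; the names are invented) shows one worker process per file putting its result on a queue while the parent collects the results, which is essentially what DagFileProcessor and the manager do:
# Standalone sketch of the "one process per DAG file, results via a queue" pattern.
import multiprocessing

def process_file(file_path, result_queue):
    # Stand-in for scheduler_job.process_file: parse the file and report a result.
    result_queue.put((file_path, 'parsed'))

if __name__ == '__main__':
    file_paths = ['dag_a.py', 'dag_b.py']
    result_queue = multiprocessing.Queue()
    processors = {}

    # Start one child process per file (Airflow bounds this by max_threads).
    for path in file_paths:
        p = multiprocessing.Process(target=process_file, args=(path, result_queue))
        p.start()
        processors[path] = p

    # Collect one result per child, then reap the processes.
    for _ in file_paths:
        print(result_queue.get())
    for p in processors.values():
        p.join()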
airflow/utils/dag_processing.py#DagFileProcessorManager.heartbeat
def heartbeat(self):
# get finished_processors and running_processors
finished_processors = {}
running_processors = {}
for file_path, processor in self._processors.items():
if processor.done:
...
finished_processors[file_path] = processor
...
else:
running_processors[file_path] = processor
self._processors = running_processors
# Collect all the DAGs that were found in the processed files
simple_dags = []
for file_path, processor in finished_processors.items():
if processor.result is None:
...
else:
for simple_dag in processor.result:
simple_dags.append(simple_dag)
# Generate more file paths to process if we processed all the files
# already.
...
# Start more processors if we have enough slots and files to process
while (self._parallelism - len(self._processors) > 0 and len(self._file_path_queue) > 0):
file_path = self._file_path_queue.pop(0)
processor = self._processor_factory(file_path)
processor.start()
self._processors[file_path] = processor
# Update scheduler heartbeat count.
...
return simple_dags
airflow/jobs.py#DagFileProcessor
class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
...
def _launch_process(result_queue,...):
...
def helper():
scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
result = scheduler_job.process_file(file_path, pickle_dags)
result_queue.put(result)
...
p = multiprocessing.Process(target=helper,
args=(),
name="{}-Process".format(thread_name))
p.start()
return p
...
def start(self):
"""
Launch the process and start processing the DAG.
"""
self._process = DagFileProcessor._launch_process(
self._result_queue,
self.file_path,
self._pickle_dags,
self._dag_id_white_list,
"DagFileProcessor{}".format(self._instance_id))
self._start_time = timezone.utcnow()
SchedulerJob Process File
Let's keep diving into the scheduler_job.process_file method.
The scheduler_job.process_file method first creates a DagBag instance for the DAG file path. During DagBag initialization, the Python modules at the file path are loaded, using a different standard-library mechanism depending on whether the path is a zip archive.
After the modules are loaded, DagBag collects all the DAGs defined in them.
SchedulerJob then gets all the DAGs from the DagBag, syncs their state to the database, and collects the DAGs which are not paused.
SchedulerJob iterates over all the un-paused DAGs and processes them. Processing includes:
- Calculate next_run_date for the DAG and create the appropriate DagRun(s) in the DB.
- Create the appropriate TaskInstance(s) in the DB for new DagRuns.
- Find all active DagRuns for the DAG and iterate over their unscheduled TaskInstances. If the dependencies of a TaskInstance are met, update the state of the TaskInstance to SCHEDULED.
- Send emails for tasks that have missed SLAs.
airflow/jobs.py#SchedulerJob.process_file
def process_file(self, file_path, pickle_dags=False, session=None):
simple_dags = []
try:
dagbag = models.DagBag(file_path)
except Exception:
...
return []
...
# Save individual DAGs in the ORM and update DagModel.last_scheduled_time
for dag in dagbag.dags.values():
dag.sync_to_db()
paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values() if dag.is_paused]
# Pickle the DAGs (if necessary) and put them into a SimpleDag
for dag_id in dagbag.dags:
dag = dagbag.get_dag(dag_id)
...
# Only return DAGs that are not paused
if dag_id not in paused_dag_ids:
simple_dags.append(SimpleDag(dag, pickle_id=pickle_id))
if len(self.dag_ids) > 0:
dags = [dag for dag in dagbag.dags.values()
if dag.dag_id in self.dag_ids and
dag.dag_id not in paused_dag_ids]
else:
dags = [dag for dag in dagbag.dags.values()
if not dag.parent_dag and
dag.dag_id not in paused_dag_ids]
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?)
ti_keys_to_schedule = []
self._process_dags(dagbag, dags, ti_keys_to_schedule)
for ti_key in ti_keys_to_schedule:
dag = dagbag.dags[ti_key[0]]
task = dag.get_task(ti_key[1])
# NOTE: create TaskInstance
ti = models.TaskInstance(task, ti_key[2])
ti.refresh_from_db(session=session, lock_for_update=True)
...
dep_context = DepContext(deps=QUEUE_DEPS, ignore_task_deps=True)
...
if ti.are_dependencies_met(
dep_context=dep_context,
session=session,
verbose=True):
# Task starts out in the scheduled state. All tasks in the
# scheduled state will be sent to the executor
ti.state = State.SCHEDULED
# Also save this task instance to the DB.
self.log.info("Creating / updating %s in ORM", ti)
session.merge(ti)
# commit batch
session.commit()
# Record import errors into the ORM
...
# kill_zombies
...
return simple_dags
airflow/models/__init__.py#DagBag.process_file
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
...
mods = []
is_zipfile = zipfile.is_zipfile(filepath)
if not is_zipfile:
...
org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
mod_name = ...
...
with timeout(configuration.conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
try:
m = imp.load_source(mod_name, filepath)
mods.append(m)
except Exception as e:
...
else:
zip_file = zipfile.ZipFile(filepath)
for mod in zip_file.infolist():
head, _ = os.path.split(mod.filename)
mod_name, ext = os.path.splitext(mod.filename)
if not head and (ext == '.py' or ext == '.pyc'):
...
try:
sys.path.insert(0, filepath)
m = importlib.import_module(mod_name)
mods.append(m)
except Exception as e:
...
for m in mods:
for dag in list(m.__dict__.values()):
if isinstance(dag, DAG):
...
try:
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
if isinstance(dag._schedule_interval, six.string_types):
croniter(dag._schedule_interval)
found_dags.append(dag)
found_dags += dag.subdags
except ...
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
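For the zip branch above, the modules have to sit at the top level of the archive (that is what the head check enforces). A hypothetical packaged DAG can be built and imported like this (my_dag.py and my_dags.zip are made-up names):
# Hypothetical example: package a DAG module at the top level of a zip archive,
# the layout DagBag.process_file expects for zipped DAGs.
import importlib
import sys
import zipfile

with zipfile.ZipFile('my_dags.zip', 'w') as zf:
    # The module must be at the archive root, not in a sub-directory.
    zf.write('my_dag.py', arcname='my_dag.py')

# Mirror the zip branch: put the archive itself on sys.path and import by module name.
sys.path.insert(0, 'my_dags.zip')
m = importlib.import_module('my_dag')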
airflow/jobs.py#SchedulerJob._process_dags
def _process_dags(self, dagbag, dags, tis_out):
for dag in dags:
...
dag_run = self.create_dag_run(dag)
if dag_run:
self.log.info("Created %s", dag_run)
self._process_task_instances(dag, tis_out)
self.manage_slas(dag)
models.DagStat.update([d.dag_id for d in dags])
airflow/jobs.py#SchedulerJob.create_dag_run
def create_dag_run(self, dag, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
Returns DagRun if one is scheduled. Otherwise returns None.
"""
if dag.schedule_interval:
active_runs = DagRun.find(...)
# return if already reached maximum active runs and no timeout setting
if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
return
timedout_runs = 0
for dr in active_runs:
if (dr.start_date and dag.dagrun_timeout and dr.start_date < timezone.utcnow() - dag.dagrun_timeout):
dr.state = State.FAILED
dr.end_date = timezone.utcnow()
dag.handle_callback(dr, success=False, reason='dagrun_timeout', session=session)
timedout_runs += 1
session.commit()
if len(active_runs) - timedout_runs >= dag.max_active_runs:
return
# this query should be replaced by find dagrun
last_scheduled_run = session.query(func.max(DagRun.execution_date)).filter_by(...).scalar()
# don't schedule @once again
if dag.schedule_interval == '@once' and last_scheduled_run:
return None
# don't do scheduler catchup for dag's that don't have dag.catchup = True
if not (dag.catchup or dag.schedule_interval == '@once'):
...
dag.start_date = ... # next_schedule_time_before_now
next_run_date = None
if not last_scheduled_run:
# First run
next_run_date = ... # minimal start_time of dag tasks
else:
next_run_date = dag.following_schedule(last_scheduled_run)
# make sure backfills are also considered
...
# don't ever schedule prior to the dag's start_date
...
# don't ever schedule in the future
...
# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':
period_end = next_run_date
elif next_run_date:
period_end = dag.following_schedule(next_run_date)
# Don't schedule a dag beyond its end_date (as specified by the dag param)
if next_run_date and dag.end_date and next_run_date > dag.end_date:
return
# Don't schedule a dag beyond its end_date (as specified by the task params)
# Get the min task end date, which may come from the dag.default_args
...
if next_run_date and period_end and period_end <= timezone.utcnow():
# create a dagrun and save it to db
next_run = dag.create_dagrun(
run_id=DagRun.ID_PREFIX + next_run_date.isoformat(),
execution_date=next_run_date,
start_date=timezone.utcnow(),
state=State.RUNNING,
external_trigger=False
)
return next_run
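The net effect of the period_end check is that a run for execution_date X is only created once the whole schedule period starting at X has passed. Here is a rough standalone sketch of this rule for a daily schedule (simplified, using a plain timedelta instead of dag.following_schedule):
# Simplified sketch of the create_dag_run timing rule for a daily schedule_interval.
from datetime import datetime, timedelta

schedule_interval = timedelta(days=1)
last_scheduled_run = datetime(2019, 1, 1)    # execution_date of the latest DagRun
now = datetime(2019, 1, 3, 0, 5)             # pretend "utcnow"

next_run_date = last_scheduled_run + schedule_interval   # 2019-01-02
period_end = next_run_date + schedule_interval           # 2019-01-03

if period_end <= now:
    # The period 2019-01-02 .. 2019-01-03 is over, so the run can be created.
    print("create DagRun with execution_date", next_run_date)
else:
    print("too early, the period covered by", next_run_date, "has not finished")
This is why a daily DAG's run for 2019-01-02 only shows up shortly after midnight on 2019-01-03.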
airflow/jobs.py#SchedulerJob._process_task_instances
@provide_session
def _process_task_instances(self, dag, queue, session=None):
"""
This method schedules the tasks for a single DAG by looking at the
active DAG runs and adding task instances that should run to the
queue.
"""
# update the state of the previously active dag runs
dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
active_dag_runs = []
for run in dag_runs:
self.log.info("Examining DAG run %s", run)
# don't consider runs that are executed in the future
...
if len(active_dag_runs) >= dag.max_active_runs:
self.log.info("Active dag runs > max_active_run.")
continue
# skip backfill dagruns for now as long as they are not really scheduled
if run.is_backfill:
continue
# todo: run.dag is transient but needs to be set
run.dag = dag
# todo: preferably the integrity check happens at dag collection time
run.verify_integrity(session=session)
run.update_state(session=session)
if run.state == State.RUNNING:
make_transient(run)
active_dag_runs.append(run)
for run in active_dag_runs:
self.log.debug("Examining active DAG run: %s", run)
# this needs a fresh session sometimes tis get detached
# Get a set of task instance related to this task for a specific date range.
tis = run.get_task_instances(state=(State.NONE, State.UP_FOR_RETRY))
# this loop is quite slow as it uses are_dependencies_met for
# every task (in ti.is_runnable). This is also called in
# update_state above which has already checked these tasks
for ti in tis:
task = dag.get_task(ti.task_id)
# fixme: ti.task is transient but needs to be set
ti.task = task
# future: remove adhoc
if task.adhoc:
continue
if ti.are_dependencies_met(
dep_context=DepContext(flag_upstream_failed=True),
session=session):
self.log.debug('Queuing task: %s', ti)
queue.append(ti.key)
Overview
The Airflow scheduler spins up a process loop that monitors the DAGs folder and stays in sync with the DAG objects it contains, periodically (every minute or so) collecting DAG parsing results and inspecting active tasks to see whether they can be scheduled.