Tasks
import asyncio, os
from fastapi import FastAPI
from easyjobs.workers.worker import EasyJobsWorker
# FastAPI application that hosts the worker.
server = FastAPI()


@server.on_event('startup')
async def setup():
    """Create the 'ETL' worker on server startup and register its tasks.

    Registers three groups of tasks on the worker:

    * extract -> transform -> load -> compute, chained through ``run_after``
      (extract is also scheduled with a cron expression),
    * compute <- configure_environment <- deploy_environment / prepare_db,
      chained through ``run_before``,
    * pipeline, which awaits ``compute`` directly.
    """
    worker = await EasyJobsWorker.create(
        server,
        server_secret='abcd1234',
        manager_host='0.0.0.0',
        manager_port=8220,
        manager_secret='abcd1234',
        jobs_queue='ETL',
        max_tasks_per_worker=5,
    )

    # Cron expression used as the schedule for extract.
    every_minute = '* * * * *'
    # Arguments used when extract is invoked via its schedule.
    default_args = {'kwargs': {'url': ['http://stats']}}

    # ---- plain helpers (not registered as tasks) -------------------------

    async def get_data(url):
        """Log the requested url and return a fixed sample payload."""
        print(f"get_data: {url}")
        return {'a': 1, 'b': 2, 'c': 3}

    async def load_db(data: dict):
        """Stand-in for a database insert; only builds a status message."""
        # A real implementation would insert here, e.g.:
        # await db.tables['transformed'].insert(**data)
        return f"data {data} loaded to db"

    async def send_email(address: str, message: str):
        """Stand-in for sending mail; only builds a status message."""
        return f"email sent to {address}"

    # ---- extract / transform / load chain --------------------------------

    @worker.task(
        run_after=['transform'],
        schedule=every_minute,
        default_args=default_args,
    )
    async def extract(url: str):
        """Fetch data for ``url`` and hand it on under the 'data' key."""
        print("extract started")
        fetched = await get_data(url)
        print("extract finished")
        return {'data': fetched}

    @worker.task(run_after=['load'])
    async def transform(data: dict):
        """Replace every value of ``data`` (in place) with ``int(value) + 2``."""
        print("transform started")
        # Iterate over a snapshot so the dict can be updated while looping.
        for key, value in list(data.items()):
            data[key] = int(value) + 2
        print("transform finished")
        return {'data': data}

    @worker.task(on_failure='failure_notify', run_after=['compute'])
    async def load(data):
        """Pass ``data`` to load_db and forward it unchanged."""
        print("load started")
        await load_db(data)
        print("load finished")
        return {'data': data}

    @worker.task()
    async def failure_notify(job_failed):
        """Email the admin address about ``job_failed`` and return it."""
        await send_email('admin@company.io', job_failed)
        return job_failed

    # ---- environment chain (linked via run_before) ------------------------

    @worker.task()
    async def deploy_environment():
        """Simulate a 5 second deployment."""
        print("deploy_environment - started")
        await asyncio.sleep(5)
        done = "deploy_environment - completed"
        print(done)
        return done

    @worker.task()
    async def prepare_db():
        """Simulate a 5 second database preparation."""
        print("prepare_db - started")
        await asyncio.sleep(5)
        done = "prepare_db - completed"
        print(done)
        return done

    @worker.task(run_before=['deploy_environment', 'prepare_db'])
    async def configure_environment():
        """Simulate a 5 second configuration step."""
        print("configure_environment - starting")
        await asyncio.sleep(5)
        done = "configure_environment - finished"
        print(done)
        return done

    # Set before the subprocess task below is registered.
    os.environ['WORKER_TASK_DIR'] = '/home/josh/Documents/python/easyjobs'

    @worker.task(subprocess=True, run_before=['configure_environment'])
    async def compute(data: dict):
        """Subprocess task: the registered method itself contains no logic."""
        pass

    @worker.task()
    async def pipeline():
        """Await compute with a sample payload and return its result."""
        print("pipline started")
        result = await compute(data={'test': 'data'})
        print(f"pipline - result is {result} - finished")
        return result
Task Flow
Registering Tasks
Tasks can be registered on a Manager or Worker by referencing the instance's .task decorator / function:
# task arguments
def task(
    namespace: str = 'DEFAULT',  # manager only: queue the task is registered within
    on_failure: Optional[str] = None,  # name of the task to create if this task's run fails
    retry_policy: Optional[str] = 'retry_once',  # retry_once, retry_always, never
    run_before: Optional[list] = None,  # jobs run in parallel before this task starts
    run_after: Optional[list] = None,  # jobs created afterwards, given this task's result as argument
    subprocess: Optional[bool] = False,  # whether the task is created via a subprocess
    schedule: Optional[str] = None,  # cron schedule on which the Job Manager invokes the task
    default_args: Optional[dict] = None,  # arguments used when the task is invoked via schedule
) -> Callable:
Task register arguments:
- namespace - Manager only, Defaults to 'DEFAULT' - Determines what queue task is registered within, methods can be registered within multiple namespaces. Workers inherit jobs_queue, from creation.
- on_failure - Default Unspecified - Will attempt to create a job with on_failure=<task_name> if the task run resulted in a failure.
- retry_policy - Defaults to retry_policy='retry_once', with other possible values [retry_always, never].
- run_before - List - Runs the listed jobs in parallel before starting the task.
- run_after - Defaults Unspecified - Will create a job with run_after=<task_name>, using the results of the current task as the argument for the run_after task.
- subprocess - Defaults False - Defines whether a task should be created via a subprocess.
- schedule - Default Unspecified - Defines a cron schedule on which the Job Manager will invoke the Task automatically.
- default_args - Default Unspecified - Required if task takes arguments, and used when a task is invoked via schedule.
@worker.task(on_failure='send_failure_email')
async def finance_work(employee_id: str, employee_data: dict):
    """Do the finance work for one employee.

    Registered with on_failure='send_failure_email'.
    """
    # finance_results stands for whatever the real work produces
    # (placeholder in this example -- it is not defined here).
    return finance_results
@worker.task(retry_policy='retry_always')
async def send_failure_email(reason):
    """Send a failure notification email and return a confirmation message.

    Registered with retry_policy='retry_always'; the retry_policy values
    listed for task registration are retry_once, retry_always and never.

    Args:
        reason: description of the failure to include in the email.

    Returns:
        A string confirming that the email was sent for ``reason``.
    """
    # send email with reason
    return f"email sent for {reason}"
@manager.task(namespace='general', run_after=['more_general_work'])
async def general_work(general_data: dict):
    """Do the general work.

    Registered in the 'general' namespace with
    run_after=['more_general_work'].
    """
    # general_results stands for whatever the real work produces
    # (placeholder in this example -- it is not defined here).
    return general_results
@manager.task(namespace='general')
async def more_general_work(general_results):
    """Do extra work on ``general_results`` and return a fixed marker string."""
    # extra work on general_results
    return "more_general_results"
Tip
Worker tasks which do not contain I/O bound tasks (Network, Web Requests / Database queries) and run beyond 10 seconds should be placed within a task subprocess definition. This allows the current worker thread to continue servicing other concurrent tasks.
Note
Tasks created with subprocess=True will create a new process (using a separate & uncontended python GIL), run until completed / failed, and then report the results back to EasyJobsManager. EasyJobsManager will provide the results to the worker, releasing a task reservation (allowing more work to complete using the results).
Subprocess Usage
Example - Worker
# Required environment variable: directory holding the subprocess templates
# (e.g. basic_blocking.py).
os.environ['WORKER_TASK_DIR'] = '/home/codemation/blocking_funcs/'


@worker.task(subprocess=True)
async def basic_blocking(a, b, c):
    """Subprocess task: the registered method itself contains no logic."""
$ ls /home/codemation/blocking_funcs
advanced_blocking.py basic_blocking.py
Example - Manager
# manager
# Required environment variables. Values assigned to os.environ must be
# strings, so the task directory path is quoted like the other values.
os.environ['MANAGER_HOST'] = '0.0.0.0'
os.environ['MANAGER_PORT'] = '8220'
os.environ['WORKER_TASK_DIR'] = '/home/codemation/blocking_funcs/'


@manager.task(subprocess=True)
async def advanced_blocking(a, b, c):
    """Subprocess task: the registered method itself contains no logic.

    The arguments are listed for readability.
    """
    pass
Tip
- Methods registered with 'subprocess=True' do not contain logic
- Arguments improve readability, but do not affect functionality (except in template)
Subprocess Template
# /home/codemation/blocking_funcs/basic_blocking.py
import time
from easyjobs.workers.task import subprocess
@subprocess
def work(a, b, c):
    """
    insert blocking / non-blocking work here

    Sleeps for 5 seconds and returns a dict whose 'result' message
    includes the values of a, b and c.
    """
    time.sleep(5)  # Blocking
    # f-string so that {a} {b} {c} are interpolated rather than emitted literally
    return {'result': f'I slept for 5 seconds - blocking with {a} {b} {c}'}


if __name__ == '__main__':
    # Called without arguments; presumably the @subprocess decorator supplies
    # a, b and c -- TODO confirm against easyjobs.workers.task.
    work()
Note
The method name work() is not ultimately significant, but should match what appears within the (if __name__ == '__main__':) block of code: