Configuration
Jobs can be configured to run from the core_lib.yaml file. for example:
CoreLib to run this job at startup by providing the initial_deplay parameter with one of the values boot or startup
core_lib:
...
jobs:
- initial_delay: 2h32m
frequency: 16:00
handler:
class: some_package.MyJob
params:
some_param: some value
- initial_delay: startup
frequency: None
handler:
class: some_package.SomeJob
params:
load_jobs()
core_lib.core_lib.CoreLib.load_jobs() [source]
Used to load instantiate all the jobs present in the config file using instantiate_config_group_generator_dict and start to schedule them based on the frequency.
def load_jobs(self, config: DictConfig, job_to_data_handler: dict = {}):
Arguments
config(DictConfig):jobssections from theCore-Libconfig.job_to_data_handler(dict): Data handlers for the jobs listed in the config, thekeyshould be thejob nameand thevalueshould be thehandler name.
Example
from core_lib.core_lib import CoreLib
class YourCoreLib(CoreLib):
def __init__(self, config: DictConfig):
...
self.load_jobs(self.config.core_lib.your_core_lib.jobs, {'job_name': self,...})
Core-Lib provides core_lib.jobs.JobScheduler class that can schedule core_lib.jobs.Job instances to run in a separate thread.
Job class
core_lib.jobs.job.Job [source]
Job class provide an abstract method called run. to create a custom Job simply do.
class MyJob(Job):
def run(self):
self.core_lib.user.do_somthing()
The local variable self.core_lib will be automatically populated by CoreLib when running using configuration (see below).
In case you want to create the job manually. pass the CoreLib instance using the job function set_data_handler, Or using the constructor.
Example
my_job.set_data_handler(my_core_lib)
JobScheduler class
core_lib.jobs.job_scheduler.JobScheduler [source]
JobScheduler provides 2 main functions
-
schedule()*core_lib.jobs.job_scheduler.JobScheduler.schedule() * [source] -
schedule_once()*core_lib.jobs.job_scheduler.JobScheduler.schedule_once() * [source]
# Run the job, and repeat by frequency
def schedule(self, initial_delay: str, frequency: str, job: Job):
...
# Run the job a single time
def schedule_once(self, initial_delay: str, job: Job):
...
Arguments
initial_delay(str): The initial time after which therun()function will be called for the first time.frequency(str): The time interval after whichrun()function will be called.job(Job): Is the instance ofJobclass created by user that implementsrun()andinitialized()
The parameters initial_delay and frequency are string parsed by the library pytimeparse.
Example
from core_lib.jobs.job import Job
from core_lib.jobs.job_scheduler import JobScheduler
class UpdateCache(Job):
def initialized(self, data_handler):
pass
def run(self):
# code to update your cache
scheduler = JobScheduler()
job = UpdateCache()
# updates the cache 1 second after the schedule_once() is called
scheduler.schedule_once('1s', job)
# updates the cache 1 second after the job is scheduled and keeps updating every 30 minutes until stop() is called.
scheduler.schedule('1s', '30m', job)
scheduler.stop()# stops the ongoing job schedule
If a job fails while running the
Exceptionand the message will be logged by theJobSchedulerfor the same.
Core-Lib Instance
When a Job is run by the configuration file, it will automatically populate the local core_lib variable of a job