core_lib.core_lib.CoreLibListener [source]
An abstract class that implements ObserverListener.
The functions provided by this class are called by the Core-Lib
class methods to notify you when Core-Lib
is being created or destroyed.
start_core_lib()
core_lib.core_lib.CoreLib.start_core_lib() [source]
Calls the fire_core_lib_ready()
function and also sets the value of the private variable self._core_lib_started
created in the init
to True
.
def start_core_lib(self):
Example
from core_lib.core_lib_listener import CoreLibListener
from core_lib.core_lib import CoreLib
class YourCoreLib(CoreLib):
def __init__(self, config: DictConfig):
...
self.load_jobs(self.config.core_lib.your_core_lib.jobs, {'job_name': self,...})
class Listener(CoreLibListener):
def on_core_lib_ready(self):
# Do some task here
pass
def on_core_lib_destroy(self):
# Cleanups here
pass
core_lib = YourCoreLib(config)
core_lib.attach_listener(Listener())
core_lib.start_core_lib()
attach_listener()
core_lib.core_lib.CoreLib.attach_listener() [source]
Attaches a CoreLibListener
implemented class to the private observer self._observer
created in the init
.
def attach_listener(self, core_lib_listener: CoreLibListener):
Arguments
core_lib_listener
(CoreLibListener)
: An inherited class that implements the abstract functions in CoreLibListener
.
Example
from core_lib.core_lib_listener import CoreLibListener
from core_lib.core_lib import CoreLib
class YourCoreLib(CoreLib):
def __init__(self, config: DictConfig):
...
self.load_jobs(self.config.core_lib.your_core_lib.jobs, {'job_name': self,...})
class Listener(CoreLibListener):
def on_core_lib_ready(self):
# Do some task here
pass
def on_core_lib_destroy(self):
# Cleanups here
pass
core_lib = YourCoreLib(config)
core_lib.attach_listener(Listener())
detach_listener()
core_lib.core_lib.CoreLib.detach_listener() [source]
Detaches a CoreLibListener
implemented class from the private observer self._observer
created in the init
.
def detach_listener(self, core_lib_listener: CoreLibListener):
Arguments
core_lib_listener
(CoreLibListener)
: An inherited class that implements the abstract functions in CoreLibListener
.
Example
from core_lib.core_lib_listener import CoreLibListener
from core_lib.core_lib import CoreLib
class YourCoreLib(CoreLib):
def __init__(self, config: DictConfig):
...
self.load_jobs(self.config.core_lib.your_core_lib.jobs, {'job_name': self,...})
class Listener(CoreLibListener):
def on_core_lib_ready(self):
# Do some task here
pass
def on_core_lib_destroy(self):
# Cleanups here
pass
core_lib = YourCoreLib(config)
listener = Listener()
core_lib.attach_listener(listener)
core_lib.detach_listener(listener)
fire_core_lib_ready()
core_lib.core_lib.CoreLib.fire_core_lib_ready() [source]
Notifies the observer listener with key CoreLibListener.CoreLibEventType.CORE_LIB_READY
and value None
and calls the implemented on_core_lib_ready
function from the attached listener. This function is called in the start_core_lib()
function.
def fire_core_lib_ready(self):
fire_core_lib_destroy()
core_lib.core_lib.CoreLib.fire_core_lib_destroy() [source]
Notifies the observer listener with key CoreLibListener.CoreLibEventType.CORE_LIB_DESTROY
and value None
and calls the implemented on_core_lib_destroy
function from the attached listener. This function is called in the __del__
method of the class.
def fire_core_lib_destroy(self):
Functions
on_core_lib_ready()
core_lib.core_lib.CoreLibListener.on_core_lib_ready() [source]
Abstract method that will be called in the start_core_lib()
function and should take care of the task to be performed at the start of the Core-Lib
.
@abstractmethod
def on_core_lib_ready(self):
on_core_lib_destroy()
core_lib.core_lib.CoreLibListener.on_core_lib_destroy() [source]
Abstract method that will be called in the destructor of the CoreLib
class and should take care of the cleanups to be performed.
@abstractmethod
def on_core_lib_destroy(self):
update()
core_lib.core_lib.CoreLibListener.update() [source]
An implemented method from the ObserverListener
class, this method takes care of calling the respective functions on_core_lib_ready()
and on_core_lib_destroy()
on key
change.
def update(self, key: str, value):
Arguments
key
(str)
: The key that the listener listens to for changes.
value
: Any value to be stored.
Example
Implementation of a CoreLibListener
class.
from core_lib.core_lib_listener import CoreLibListener
from core_lib.core_lib import CoreLib
class YourCoreLib(CoreLib):
def __init__(self, config: DictConfig):
...
self.load_jobs(self.config.core_lib.your_core_lib.jobs, {'job_name': self,...})
class Listener(CoreLibListener):
def on_core_lib_ready(self):
# Do some task here
pass
def on_core_lib_destroy(self):
# Cleanups here
pass
core_lib = YourCoreLib(config)
core_lib.attach_listener(Listener())
__del__()
core_lib.core_lib.CoreLib.__del__() [source]
Destructor method of the class, calls fire_core_lib_destroy()
function.
def __del__(self):
Example
from core_lib.core_lib_listener import CoreLibListener
from core_lib.core_lib import CoreLib
class YourCoreLib(CoreLib):
def __init__(self, config: DictConfig):
...
self.load_jobs(self.config.core_lib.your_core_lib.jobs, {'job_name': self,...})
class Listener(CoreLibListener):
def on_core_lib_ready(self):
# Do some task here
pass
def on_core_lib_destroy(self):
# Cleanups here
pass
core_lib = YourCoreLib(config)
del core_lib